This is page 22 of 29. Use http://codebase.md/wshobson/maverick-mcp?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── feature_request.md
│ │ ├── question.md
│ │ └── security_report.md
│ ├── pull_request_template.md
│ └── workflows
│ ├── claude-code-review.yml
│ └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│ ├── launch.json
│ └── settings.json
├── alembic
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ ├── 001_initial_schema.py
│ ├── 003_add_performance_indexes.py
│ ├── 006_rename_metadata_columns.py
│ ├── 008_performance_optimization_indexes.py
│ ├── 009_rename_to_supply_demand.py
│ ├── 010_self_contained_schema.py
│ ├── 011_remove_proprietary_terms.py
│ ├── 013_add_backtest_persistence_models.py
│ ├── 014_add_portfolio_models.py
│ ├── 08e3945a0c93_merge_heads.py
│ ├── 9374a5c9b679_merge_heads_for_testing.py
│ ├── abf9b9afb134_merge_multiple_heads.py
│ ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│ ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│ ├── f0696e2cac15_add_essential_performance_indexes.py
│ └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── api
│ │ └── backtesting.md
│ ├── BACKTESTING.md
│ ├── COST_BASIS_SPECIFICATION.md
│ ├── deep_research_agent.md
│ ├── exa_research_testing_strategy.md
│ ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│ ├── PORTFOLIO.md
│ ├── SETUP_SELF_CONTAINED.md
│ └── speed_testing_framework.md
├── examples
│ ├── complete_speed_validation.py
│ ├── deep_research_integration.py
│ ├── llm_optimization_example.py
│ ├── llm_speed_demo.py
│ ├── monitoring_example.py
│ ├── parallel_research_example.py
│ ├── speed_optimization_demo.py
│ └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── circuit_breaker.py
│ │ ├── deep_research.py
│ │ ├── market_analysis.py
│ │ ├── optimized_research.py
│ │ ├── supervisor.py
│ │ └── technical_analysis.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── api_server.py
│ │ ├── connection_manager.py
│ │ ├── dependencies
│ │ │ ├── __init__.py
│ │ │ ├── stock_analysis.py
│ │ │ └── technical_analysis.py
│ │ ├── error_handling.py
│ │ ├── inspector_compatible_sse.py
│ │ ├── inspector_sse.py
│ │ ├── middleware
│ │ │ ├── error_handling.py
│ │ │ ├── mcp_logging.py
│ │ │ ├── rate_limiting_enhanced.py
│ │ │ └── security.py
│ │ ├── openapi_config.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── agents.py
│ │ │ ├── backtesting.py
│ │ │ ├── data_enhanced.py
│ │ │ ├── data.py
│ │ │ ├── health_enhanced.py
│ │ │ ├── health_tools.py
│ │ │ ├── health.py
│ │ │ ├── intelligent_backtesting.py
│ │ │ ├── introspection.py
│ │ │ ├── mcp_prompts.py
│ │ │ ├── monitoring.py
│ │ │ ├── news_sentiment_enhanced.py
│ │ │ ├── performance.py
│ │ │ ├── portfolio.py
│ │ │ ├── research.py
│ │ │ ├── screening_ddd.py
│ │ │ ├── screening_parallel.py
│ │ │ ├── screening.py
│ │ │ ├── technical_ddd.py
│ │ │ ├── technical_enhanced.py
│ │ │ ├── technical.py
│ │ │ └── tool_registry.py
│ │ ├── server.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ ├── base_service.py
│ │ │ ├── market_service.py
│ │ │ ├── portfolio_service.py
│ │ │ ├── prompt_service.py
│ │ │ └── resource_service.py
│ │ ├── simple_sse.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── insomnia_export.py
│ │ └── postman_export.py
│ ├── application
│ │ ├── __init__.py
│ │ ├── commands
│ │ │ └── __init__.py
│ │ ├── dto
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_dto.py
│ │ ├── queries
│ │ │ ├── __init__.py
│ │ │ └── get_technical_analysis.py
│ │ └── screening
│ │ ├── __init__.py
│ │ ├── dtos.py
│ │ └── queries.py
│ ├── backtesting
│ │ ├── __init__.py
│ │ ├── ab_testing.py
│ │ ├── analysis.py
│ │ ├── batch_processing_stub.py
│ │ ├── batch_processing.py
│ │ ├── model_manager.py
│ │ ├── optimization.py
│ │ ├── persistence.py
│ │ ├── retraining_pipeline.py
│ │ ├── strategies
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ ├── ml
│ │ │ │ ├── __init__.py
│ │ │ │ ├── adaptive.py
│ │ │ │ ├── ensemble.py
│ │ │ │ ├── feature_engineering.py
│ │ │ │ └── regime_aware.py
│ │ │ ├── ml_strategies.py
│ │ │ ├── parser.py
│ │ │ └── templates.py
│ │ ├── strategy_executor.py
│ │ ├── vectorbt_engine.py
│ │ └── visualization.py
│ ├── config
│ │ ├── __init__.py
│ │ ├── constants.py
│ │ ├── database_self_contained.py
│ │ ├── database.py
│ │ ├── llm_optimization_config.py
│ │ ├── logging_settings.py
│ │ ├── plotly_config.py
│ │ ├── security_utils.py
│ │ ├── security.py
│ │ ├── settings.py
│ │ ├── technical_constants.py
│ │ ├── tool_estimation.py
│ │ └── validation.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── technical_analysis.py
│ │ └── visualization.py
│ ├── data
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── cache.py
│ │ ├── django_adapter.py
│ │ ├── health.py
│ │ ├── models.py
│ │ ├── performance.py
│ │ ├── session_management.py
│ │ └── validation.py
│ ├── database
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── optimization.py
│ ├── dependencies.py
│ ├── domain
│ │ ├── __init__.py
│ │ ├── entities
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis.py
│ │ ├── events
│ │ │ └── __init__.py
│ │ ├── portfolio.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ ├── entities.py
│ │ │ ├── services.py
│ │ │ └── value_objects.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_service.py
│ │ ├── stock_analysis
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis_service.py
│ │ └── value_objects
│ │ ├── __init__.py
│ │ └── technical_indicators.py
│ ├── exceptions.py
│ ├── infrastructure
│ │ ├── __init__.py
│ │ ├── cache
│ │ │ └── __init__.py
│ │ ├── caching
│ │ │ ├── __init__.py
│ │ │ └── cache_management_service.py
│ │ ├── connection_manager.py
│ │ ├── data_fetching
│ │ │ ├── __init__.py
│ │ │ └── stock_data_service.py
│ │ ├── health
│ │ │ ├── __init__.py
│ │ │ └── health_checker.py
│ │ ├── persistence
│ │ │ ├── __init__.py
│ │ │ └── stock_repository.py
│ │ ├── providers
│ │ │ └── __init__.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ └── repositories.py
│ │ └── sse_optimizer.py
│ ├── langchain_tools
│ │ ├── __init__.py
│ │ ├── adapters.py
│ │ └── registry.py
│ ├── logging_config.py
│ ├── memory
│ │ ├── __init__.py
│ │ └── stores.py
│ ├── monitoring
│ │ ├── __init__.py
│ │ ├── health_check.py
│ │ ├── health_monitor.py
│ │ ├── integration_example.py
│ │ ├── metrics.py
│ │ ├── middleware.py
│ │ └── status_dashboard.py
│ ├── providers
│ │ ├── __init__.py
│ │ ├── dependencies.py
│ │ ├── factories
│ │ │ ├── __init__.py
│ │ │ ├── config_factory.py
│ │ │ └── provider_factory.py
│ │ ├── implementations
│ │ │ ├── __init__.py
│ │ │ ├── cache_adapter.py
│ │ │ ├── macro_data_adapter.py
│ │ │ ├── market_data_adapter.py
│ │ │ ├── persistence_adapter.py
│ │ │ └── stock_data_adapter.py
│ │ ├── interfaces
│ │ │ ├── __init__.py
│ │ │ ├── cache.py
│ │ │ ├── config.py
│ │ │ ├── macro_data.py
│ │ │ ├── market_data.py
│ │ │ ├── persistence.py
│ │ │ └── stock_data.py
│ │ ├── llm_factory.py
│ │ ├── macro_data.py
│ │ ├── market_data.py
│ │ ├── mocks
│ │ │ ├── __init__.py
│ │ │ ├── mock_cache.py
│ │ │ ├── mock_config.py
│ │ │ ├── mock_macro_data.py
│ │ │ ├── mock_market_data.py
│ │ │ ├── mock_persistence.py
│ │ │ └── mock_stock_data.py
│ │ ├── openrouter_provider.py
│ │ ├── optimized_screening.py
│ │ ├── optimized_stock_data.py
│ │ └── stock_data.py
│ ├── README.md
│ ├── tests
│ │ ├── __init__.py
│ │ ├── README_INMEMORY_TESTS.md
│ │ ├── test_cache_debug.py
│ │ ├── test_fixes_validation.py
│ │ ├── test_in_memory_routers.py
│ │ ├── test_in_memory_server.py
│ │ ├── test_macro_data_provider.py
│ │ ├── test_mailgun_email.py
│ │ ├── test_market_calendar_caching.py
│ │ ├── test_mcp_tool_fixes_pytest.py
│ │ ├── test_mcp_tool_fixes.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_models_functional.py
│ │ ├── test_server.py
│ │ ├── test_stock_data_enhanced.py
│ │ ├── test_stock_data_provider.py
│ │ └── test_technical_analysis.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── performance_monitoring.py
│ │ ├── portfolio_manager.py
│ │ ├── risk_management.py
│ │ └── sentiment_analysis.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── agent_errors.py
│ │ ├── batch_processing.py
│ │ ├── cache_warmer.py
│ │ ├── circuit_breaker_decorators.py
│ │ ├── circuit_breaker_services.py
│ │ ├── circuit_breaker.py
│ │ ├── data_chunking.py
│ │ ├── database_monitoring.py
│ │ ├── debug_utils.py
│ │ ├── fallback_strategies.py
│ │ ├── llm_optimization.py
│ │ ├── logging_example.py
│ │ ├── logging_init.py
│ │ ├── logging.py
│ │ ├── mcp_logging.py
│ │ ├── memory_profiler.py
│ │ ├── monitoring_middleware.py
│ │ ├── monitoring.py
│ │ ├── orchestration_logging.py
│ │ ├── parallel_research.py
│ │ ├── parallel_screening.py
│ │ ├── quick_cache.py
│ │ ├── resource_manager.py
│ │ ├── shutdown.py
│ │ ├── stock_helpers.py
│ │ ├── structured_logger.py
│ │ ├── tool_monitoring.py
│ │ ├── tracing.py
│ │ └── yfinance_pool.py
│ ├── validation
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── data.py
│ │ ├── middleware.py
│ │ ├── portfolio.py
│ │ ├── responses.py
│ │ ├── screening.py
│ │ └── technical.py
│ └── workflows
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── market_analyzer.py
│ │ ├── optimizer_agent.py
│ │ ├── strategy_selector.py
│ │ └── validator_agent.py
│ ├── backtesting_workflow.py
│ └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── dev.sh
│ ├── INSTALLATION_GUIDE.md
│ ├── load_example.py
│ ├── load_market_data.py
│ ├── load_tiingo_data.py
│ ├── migrate_db.py
│ ├── README_TIINGO_LOADER.md
│ ├── requirements_tiingo.txt
│ ├── run_stock_screening.py
│ ├── run-migrations.sh
│ ├── seed_db.py
│ ├── seed_sp500.py
│ ├── setup_database.sh
│ ├── setup_self_contained.py
│ ├── setup_sp500_database.sh
│ ├── test_seeded_data.py
│ ├── test_tiingo_loader.py
│ ├── tiingo_config.py
│ └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── core
│ │ └── test_technical_analysis.py
│ ├── data
│ │ └── test_portfolio_models.py
│ ├── domain
│ │ ├── conftest.py
│ │ ├── test_portfolio_entities.py
│ │ └── test_technical_analysis_service.py
│ ├── fixtures
│ │ └── orchestration_fixtures.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── README.md
│ │ ├── run_integration_tests.sh
│ │ ├── test_api_technical.py
│ │ ├── test_chaos_engineering.py
│ │ ├── test_config_management.py
│ │ ├── test_full_backtest_workflow_advanced.py
│ │ ├── test_full_backtest_workflow.py
│ │ ├── test_high_volume.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_orchestration_complete.py
│ │ ├── test_portfolio_persistence.py
│ │ ├── test_redis_cache.py
│ │ ├── test_security_integration.py.disabled
│ │ └── vcr_setup.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── test_benchmarks.py
│ │ ├── test_load.py
│ │ ├── test_profiling.py
│ │ └── test_stress.py
│ ├── providers
│ │ └── test_stock_data_simple.py
│ ├── README.md
│ ├── test_agents_router_mcp.py
│ ├── test_backtest_persistence.py
│ ├── test_cache_management_service.py
│ ├── test_cache_serialization.py
│ ├── test_circuit_breaker.py
│ ├── test_database_pool_config_simple.py
│ ├── test_database_pool_config.py
│ ├── test_deep_research_functional.py
│ ├── test_deep_research_integration.py
│ ├── test_deep_research_parallel_execution.py
│ ├── test_error_handling.py
│ ├── test_event_loop_integrity.py
│ ├── test_exa_research_integration.py
│ ├── test_exception_hierarchy.py
│ ├── test_financial_search.py
│ ├── test_graceful_shutdown.py
│ ├── test_integration_simple.py
│ ├── test_langgraph_workflow.py
│ ├── test_market_data_async.py
│ ├── test_market_data_simple.py
│ ├── test_mcp_orchestration_functional.py
│ ├── test_ml_strategies.py
│ ├── test_optimized_research_agent.py
│ ├── test_orchestration_integration.py
│ ├── test_orchestration_logging.py
│ ├── test_orchestration_tools_simple.py
│ ├── test_parallel_research_integration.py
│ ├── test_parallel_research_orchestrator.py
│ ├── test_parallel_research_performance.py
│ ├── test_performance_optimizations.py
│ ├── test_production_validation.py
│ ├── test_provider_architecture.py
│ ├── test_rate_limiting_enhanced.py
│ ├── test_runner_validation.py
│ ├── test_security_comprehensive.py.disabled
│ ├── test_security_cors.py
│ ├── test_security_enhancements.py.disabled
│ ├── test_security_headers.py
│ ├── test_security_penetration.py
│ ├── test_session_management.py
│ ├── test_speed_optimization_validation.py
│ ├── test_stock_analysis_dependencies.py
│ ├── test_stock_analysis_service.py
│ ├── test_stock_data_fetching_service.py
│ ├── test_supervisor_agent.py
│ ├── test_supervisor_functional.py
│ ├── test_tool_estimation_config.py
│ ├── test_visualization.py
│ └── utils
│ ├── test_agent_errors.py
│ ├── test_logging.py
│ ├── test_parallel_screening.py
│ └── test_quick_cache.py
├── tools
│ ├── check_orchestration_config.py
│ ├── experiments
│ │ ├── validation_examples.py
│ │ └── validation_fixed.py
│ ├── fast_dev.sh
│ ├── hot_reload.py
│ ├── quick_test.py
│ └── templates
│ ├── new_router_template.py
│ ├── new_tool_template.py
│ ├── screening_strategy_template.py
│ └── test_template.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/test_parallel_research_performance.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive Performance Test Suite for Parallel Research System
This test suite validates parallel research performance against specific thresholds:
- Minimum speedup: 2 agents = 1.3x speedup, 4 agents = 2.0x speedup
- Maximum memory increase: 3x memory usage acceptable for 4x agents
- Test duration: Quick tests for CI (<30s total runtime)
Features:
- Realistic failure simulation (0%, 10%, 25% failure rates)
- Memory usage monitoring and validation
- Statistical significance testing (3+ runs per test)
- Integration with existing pytest infrastructure
- Performance markers for easy filtering
"""
import asyncio
import gc
import logging
import random
import statistics
import time
import tracemalloc
from datetime import datetime
from typing import Any
from unittest.mock import AsyncMock
import psutil
import pytest
from maverick_mcp.agents.deep_research import DeepResearchAgent
from maverick_mcp.utils.parallel_research import (
ParallelResearchConfig,
ParallelResearchOrchestrator,
ResearchResult,
ResearchTask,
)
logger = logging.getLogger(__name__)
# Performance thresholds and test configuration
PERFORMANCE_THRESHOLDS = {
"speedup": {
2: 1.3, # 2 agents should achieve 1.3x speedup minimum
4: 2.0, # 4 agents should achieve 2.0x speedup minimum
},
"memory_multiplier": 3.0, # 3x memory usage acceptable for 4x agents
"max_test_duration": 30.0, # Total test suite should complete within 30s
}
# Failure simulation configuration
FAILURE_RATES = [0.0, 0.10, 0.25] # 0%, 10%, 25% failure rates
STATISTICAL_RUNS = 3 # Number of runs for statistical significance
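# Illustrative sketch (assumed helper, not used by the tests below): how the constants
# above feed the pass/fail decision, mirroring validate_performance_thresholds(), which
# accepts 80% of the target speedup as a pass.
def _example_speedup_check(agent_count: int, sequential_s: float, parallel_s: float) -> bool:
    speedup = sequential_s / max(parallel_s, 0.001)  # same zero-division guard as the suite
    target = PERFORMANCE_THRESHOLDS["speedup"].get(agent_count, 1.0)
    return speedup >= target * 0.8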
class PerformanceMonitor:
"""Monitors CPU, memory, and timing performance during test execution."""
def __init__(self, test_name: str):
self.test_name = test_name
self.start_time: float = 0
self.start_memory: int = 0
self.peak_memory: int = 0
self.cpu_percent_samples: list[float] = []
self.process = psutil.Process()
def __enter__(self):
"""Start performance monitoring."""
# Force garbage collection for accurate memory measurement
gc.collect()
# Start memory tracing
tracemalloc.start()
# Record baseline metrics
self.start_time = time.time()
self.start_memory = self.process.memory_info().rss
self.peak_memory = self.start_memory
logger.info(f"Performance monitoring started for {self.test_name}")
return self
def sample_cpu(self):
"""Sample current CPU usage."""
try:
cpu_percent = self.process.cpu_percent()
self.cpu_percent_samples.append(cpu_percent)
# Track peak memory
current_memory = self.process.memory_info().rss
self.peak_memory = max(self.peak_memory, current_memory)
except Exception as e:
logger.warning(f"Failed to sample CPU/memory: {e}")
def __exit__(self, exc_type, exc_val, exc_tb):
"""Complete performance monitoring and return metrics."""
end_time = time.time()
end_memory = self.process.memory_info().rss
# Get memory tracing results
current, peak_traced = tracemalloc.get_traced_memory()
tracemalloc.stop()
# Calculate metrics
execution_time = end_time - self.start_time
memory_increase = end_memory - self.start_memory
memory_multiplier = end_memory / max(self.start_memory, 1)
avg_cpu = (
statistics.mean(self.cpu_percent_samples) if self.cpu_percent_samples else 0
)
self.metrics = {
"execution_time": execution_time,
"start_memory_mb": self.start_memory / (1024 * 1024),
"end_memory_mb": end_memory / (1024 * 1024),
"peak_memory_mb": max(self.peak_memory, peak_traced) / (1024 * 1024),
"memory_increase_mb": memory_increase / (1024 * 1024),
"memory_multiplier": memory_multiplier,
"avg_cpu_percent": avg_cpu,
"cpu_samples": len(self.cpu_percent_samples),
}
logger.info(
f"Performance metrics for {self.test_name}: "
f"Time: {execution_time:.3f}s, "
f"Memory: {self.metrics['start_memory_mb']:.1f}MB -> "
f"{self.metrics['end_memory_mb']:.1f}MB "
f"({memory_multiplier:.2f}x), "
f"CPU: {avg_cpu:.1f}%"
)
class MockResearchExecutor:
"""
Realistic mock executor that simulates LLM research operations with:
- Configurable failure rates and timeout scenarios
- Variable response times (100-500ms)
- Different response sizes to test memory patterns
- Structured research data that mirrors real usage
"""
def __init__(
self,
failure_rate: float = 0.0,
base_delay: float = 0.1,
delay_variance: float = 0.4,
include_timeouts: bool = False,
):
self.failure_rate = failure_rate
self.base_delay = base_delay
self.delay_variance = delay_variance
self.include_timeouts = include_timeouts
self.execution_count = 0
self.execution_log: list[dict[str, Any]] = []
async def __call__(self, task: ResearchTask) -> dict[str, Any]:
"""Execute mock research task with realistic behavior."""
self.execution_count += 1
start_time = time.time()
# Simulate realistic processing delay (100-500ms)
delay = self.base_delay + random.uniform(0, self.delay_variance)
await asyncio.sleep(delay)
# Simulate various failure modes
if random.random() < self.failure_rate:
failure_type = random.choice(
["api_timeout", "rate_limit", "auth_error", "network_error"]
)
if failure_type == "api_timeout" and self.include_timeouts:
# Simulate timeout by sleeping longer than task timeout
await asyncio.sleep(task.timeout + 1 if task.timeout else 10)
raise Exception(f"Simulated {failure_type} for task {task.task_id}")
# Generate realistic research response based on task type
response = self._generate_research_response(task)
execution_time = time.time() - start_time
self.execution_log.append(
{
"task_id": task.task_id,
"task_type": task.task_type,
"execution_time": execution_time,
"delay_simulated": delay,
"response_size": len(str(response)),
}
)
return response
def _generate_research_response(self, task: ResearchTask) -> dict[str, Any]:
"""Generate structured research data with realistic content."""
# Vary response size based on research type
insight_counts = {
"fundamental": random.randint(8, 15),
"technical": random.randint(6, 12),
"sentiment": random.randint(10, 20),
"competitive": random.randint(7, 14),
}
insight_count = insight_counts.get(task.task_type, 10)
# Generate realistic insights
insights = [
f"{task.task_type} insight {i + 1} for {task.target_topic[:20]}"
for i in range(insight_count)
]
# Add context-specific data
sources = [
{
"title": f"Source {i + 1} for {task.task_type} research",
"url": f"https://example.com/research/{task.task_type}/{i}",
"credibility_score": random.uniform(0.6, 0.95),
"published_date": datetime.now().isoformat(),
"content_snippet": f"Content from source {i + 1} "
* random.randint(10, 50),
"relevance_score": random.uniform(0.7, 1.0),
}
for i in range(random.randint(3, 8))
]
return {
"research_type": task.task_type,
"insights": insights,
"risk_factors": [
f"{task.task_type} risk factor {i + 1}"
for i in range(random.randint(3, 7))
],
"opportunities": [
f"{task.task_type} opportunity {i + 1}"
for i in range(random.randint(3, 7))
],
"sentiment": {
"direction": random.choice(["bullish", "bearish", "neutral"]),
"confidence": random.uniform(0.5, 0.9),
"consensus": random.uniform(0.4, 0.8),
},
"credibility_score": random.uniform(0.6, 0.9),
"sources": sources,
"focus_areas": task.focus_areas,
"metadata": {
"execution_time": random.uniform(0.1, 0.5),
"api_calls_made": random.randint(2, 8),
"cache_hits": random.randint(0, 4),
"cache_misses": random.randint(1, 6),
},
# Add some large data structures to test memory usage
"detailed_analysis": {
f"analysis_point_{i}": f"Detailed analysis content {i} "
* random.randint(50, 200)
for i in range(random.randint(5, 15))
},
}
def get_execution_stats(self) -> dict[str, Any]:
"""Get detailed execution statistics."""
if not self.execution_log:
return {"total_executions": 0}
execution_times = [log["execution_time"] for log in self.execution_log]
response_sizes = [log["response_size"] for log in self.execution_log]
return {
"total_executions": len(self.execution_log),
"avg_execution_time": statistics.mean(execution_times),
"median_execution_time": statistics.median(execution_times),
"avg_response_size": statistics.mean(response_sizes),
"total_response_size": sum(response_sizes),
"task_type_distribution": {
task_type: len(
[log for log in self.execution_log if log["task_type"] == task_type]
)
for task_type in [
"fundamental",
"technical",
"sentiment",
"competitive",
]
},
}
class PerformanceTester:
"""Manages performance test execution and validation."""
@staticmethod
def create_test_tasks(
agent_count: int, session_id: str = "perf_test"
) -> list[ResearchTask]:
"""Create realistic research tasks for performance testing."""
topics = [
"Apple Inc financial analysis and market outlook",
"Tesla Inc competitive position and technical analysis",
"Microsoft Corp sentiment analysis and growth prospects",
"NVIDIA Corp fundamental analysis and AI sector outlook",
"Amazon Inc market position and e-commerce trends",
"Google Inc advertising revenue and cloud competition",
]
tasks = []
for i in range(agent_count):
topic = topics[i % len(topics)]
task_types = ["fundamental", "technical", "sentiment", "competitive"]
task_type = task_types[i % len(task_types)]
task = ResearchTask(
task_id=f"{session_id}_{task_type}_{i}",
task_type=task_type,
target_topic=topic,
focus_areas=[f"focus_{i}", f"area_{i % 3}"],
priority=random.randint(5, 9),
timeout=10, # Short timeout for CI
)
tasks.append(task)
return tasks
@staticmethod
async def run_sequential_baseline(
tasks: list[ResearchTask], executor: MockResearchExecutor
) -> dict[str, Any]:
"""Run tasks sequentially to establish performance baseline."""
start_time = time.time()
results = []
for task in tasks:
try:
result = await executor(task)
results.append({"task": task, "result": result, "status": "success"})
except Exception as e:
results.append({"task": task, "error": str(e), "status": "failed"})
execution_time = time.time() - start_time
successful_results = [r for r in results if r["status"] == "success"]
return {
"execution_time": execution_time,
"successful_tasks": len(successful_results),
"failed_tasks": len(results) - len(successful_results),
"results": results,
}
@staticmethod
async def run_parallel_test(
tasks: list[ResearchTask],
config: ParallelResearchConfig,
executor: MockResearchExecutor,
) -> ResearchResult:
"""Run parallel research test with orchestrator."""
orchestrator = ParallelResearchOrchestrator(config)
# Mock synthesis callback
async def mock_synthesis(
task_results: dict[str, ResearchTask],
) -> dict[str, Any]:
successful_results = [
task.result
for task in task_results.values()
if task.status == "completed" and task.result
]
return {
"synthesis": f"Mock synthesis from {len(successful_results)} results",
"confidence_score": random.uniform(0.7, 0.9),
"key_findings": [
f"Finding {i}" for i in range(min(len(successful_results), 5))
],
}
return await orchestrator.execute_parallel_research(
tasks=tasks,
research_executor=executor,
synthesis_callback=mock_synthesis,
)
@staticmethod
def validate_performance_thresholds(
agent_count: int,
sequential_time: float,
parallel_time: float,
memory_multiplier: float,
        success_threshold: float = 0.8,  # reaching 80% of the expected speedup counts as a pass
) -> dict[str, bool]:
"""Validate performance against defined thresholds."""
speedup = sequential_time / max(parallel_time, 0.001) # Avoid division by zero
expected_speedup = PERFORMANCE_THRESHOLDS["speedup"].get(agent_count, 1.0)
max_memory_multiplier = PERFORMANCE_THRESHOLDS["memory_multiplier"]
return {
"speedup_threshold_met": speedup >= expected_speedup * success_threshold,
"memory_threshold_met": memory_multiplier <= max_memory_multiplier,
"speedup_achieved": speedup,
"speedup_expected": expected_speedup,
"memory_multiplier": memory_multiplier,
"memory_limit": max_memory_multiplier,
}
# Test fixtures
@pytest.fixture
def performance_monitor():
"""Create performance monitor for tests."""
def _create_monitor(test_name: str):
return PerformanceMonitor(test_name)
return _create_monitor
@pytest.fixture
def mock_executor_factory():
"""Factory for creating mock research executors with different configurations."""
def _create_executor(**kwargs):
return MockResearchExecutor(**kwargs)
return _create_executor
@pytest.fixture
def performance_tester():
"""Provide PerformanceTester utility."""
return PerformanceTester()
# Unit Performance Tests
@pytest.mark.unit
class TestParallelResearchPerformance:
"""Core performance tests for parallel research system."""
@pytest.mark.parametrize("agent_count", [2, 4])
@pytest.mark.parametrize(
"failure_rate", FAILURE_RATES[:2]
) # 0% and 10% for unit tests
async def test_parallel_speedup_thresholds(
self,
agent_count: int,
failure_rate: float,
performance_monitor,
mock_executor_factory,
performance_tester,
):
"""Test that parallel execution meets minimum speedup thresholds."""
test_name = f"speedup_{agent_count}agents_{int(failure_rate * 100)}pct_failure"
# Run multiple iterations for statistical significance
speedup_results = []
memory_results = []
for run in range(STATISTICAL_RUNS):
with performance_monitor(f"{test_name}_run{run}") as monitor:
# Create test configuration
config = ParallelResearchConfig(
max_concurrent_agents=agent_count,
timeout_per_agent=5,
enable_fallbacks=True,
rate_limit_delay=0.05, # Fast for testing
)
# Create tasks and executor
tasks = performance_tester.create_test_tasks(
agent_count, f"speedup_test_{run}"
)
executor = mock_executor_factory(
failure_rate=failure_rate,
base_delay=0.1,
delay_variance=0.1,
)
# Sample CPU/memory during test
monitor.sample_cpu()
# Run sequential baseline
sequential_start = time.time()
await performance_tester.run_sequential_baseline(tasks, executor)
sequential_time = time.time() - sequential_start
monitor.sample_cpu()
# Reset executor for parallel test
executor.execution_count = 0
executor.execution_log.clear()
# Run parallel test
parallel_start = time.time()
await performance_tester.run_parallel_test(tasks, config, executor)
parallel_time = time.time() - parallel_start
monitor.sample_cpu()
# Calculate metrics
speedup = sequential_time / max(parallel_time, 0.001)
speedup_results.append(speedup)
memory_results.append(monitor.metrics["memory_multiplier"])
logger.info(
f"Run {run + 1}: Sequential: {sequential_time:.3f}s, "
f"Parallel: {parallel_time:.3f}s, Speedup: {speedup:.2f}x"
)
# Statistical analysis
avg_speedup = statistics.mean(speedup_results)
median_speedup = statistics.median(speedup_results)
avg_memory_multiplier = statistics.mean(memory_results)
# Validate against thresholds
expected_speedup = PERFORMANCE_THRESHOLDS["speedup"][agent_count]
validation = performance_tester.validate_performance_thresholds(
agent_count=agent_count,
sequential_time=1.0, # Normalized
parallel_time=1.0 / avg_speedup, # Normalized
memory_multiplier=avg_memory_multiplier,
)
logger.info(
f"Performance summary for {agent_count} agents, {failure_rate * 100}% failure rate: "
f"Avg speedup: {avg_speedup:.2f}x (expected: {expected_speedup:.2f}x), "
f"Memory multiplier: {avg_memory_multiplier:.2f}x"
)
# Assertions with clear failure messages
assert validation["speedup_threshold_met"], (
f"Speedup threshold not met: achieved {avg_speedup:.2f}x, "
f"expected {expected_speedup:.2f}x (with 80% success rate)"
)
assert validation["memory_threshold_met"], (
f"Memory threshold exceeded: {avg_memory_multiplier:.2f}x > "
f"{PERFORMANCE_THRESHOLDS['memory_multiplier']}x limit"
)
# Performance characteristics validation
assert median_speedup > 1.0, "Parallel execution should show some speedup"
assert all(m < 10.0 for m in memory_results), (
"Memory usage should be reasonable"
)
async def test_performance_under_failures(
self, performance_monitor, mock_executor_factory, performance_tester
):
"""Test performance degradation under different failure scenarios."""
agent_count = 4
test_name = "failure_resilience_test"
results = {}
for failure_rate in FAILURE_RATES:
with performance_monitor(
f"{test_name}_{int(failure_rate * 100)}pct"
) as monitor:
config = ParallelResearchConfig(
max_concurrent_agents=agent_count,
timeout_per_agent=3,
enable_fallbacks=True,
rate_limit_delay=0.02,
)
tasks = performance_tester.create_test_tasks(
agent_count, f"failure_test_{int(failure_rate * 100)}"
)
executor = mock_executor_factory(
failure_rate=failure_rate,
base_delay=0.05,
include_timeouts=False, # No timeouts for this test
)
monitor.sample_cpu()
parallel_result = await performance_tester.run_parallel_test(
tasks, config, executor
)
monitor.sample_cpu()
results[failure_rate] = {
"successful_tasks": parallel_result.successful_tasks,
"failed_tasks": parallel_result.failed_tasks,
"execution_time": monitor.metrics["execution_time"],
"memory_multiplier": monitor.metrics["memory_multiplier"],
"success_rate": parallel_result.successful_tasks
/ (parallel_result.successful_tasks + parallel_result.failed_tasks),
}
# Validate failure handling - adjusted for realistic expectations
assert results[0.0]["success_rate"] == 1.0, (
"Zero failure rate should achieve 100% success"
)
assert results[0.10]["success_rate"] >= 0.7, (
"10% failure rate should maintain >70% success"
)
assert results[0.25]["success_rate"] >= 0.5, (
"25% failure rate should maintain >50% success"
)
# Validate performance doesn't degrade drastically with failures
baseline_time = results[0.0]["execution_time"]
assert results[0.10]["execution_time"] <= baseline_time * 1.5, (
"10% failure shouldn't increase time by >50%"
)
assert results[0.25]["execution_time"] <= baseline_time * 2.0, (
"25% failure shouldn't double execution time"
)
logger.info("Failure resilience test completed successfully")
async def test_memory_usage_patterns(
self, performance_monitor, mock_executor_factory, performance_tester
):
"""Test memory usage patterns across different agent counts."""
memory_results = {}
for agent_count in [1, 2, 4]:
with performance_monitor(f"memory_test_{agent_count}_agents") as monitor:
config = ParallelResearchConfig(
max_concurrent_agents=agent_count,
timeout_per_agent=5,
enable_fallbacks=True,
rate_limit_delay=0.01,
)
# Create larger dataset to test memory scaling
tasks = performance_tester.create_test_tasks(
agent_count * 2, f"memory_test_{agent_count}"
)
executor = mock_executor_factory(
failure_rate=0.0,
base_delay=0.05, # Short delay to focus on memory
)
# Force garbage collection before test
gc.collect()
monitor.sample_cpu()
# Run test with memory monitoring
result = await performance_tester.run_parallel_test(
tasks, config, executor
)
# Sample memory again
monitor.sample_cpu()
# Force another GC to see post-test memory
gc.collect()
await asyncio.sleep(0.1) # Allow cleanup
memory_results[agent_count] = {
"peak_memory_mb": monitor.metrics["peak_memory_mb"],
"memory_increase_mb": monitor.metrics["memory_increase_mb"],
"memory_multiplier": monitor.metrics["memory_multiplier"],
"successful_tasks": result.successful_tasks,
}
# Validate memory scaling is reasonable
baseline_memory = memory_results[1]["peak_memory_mb"]
memory_4x = memory_results[4]["peak_memory_mb"]
# 4x agents should not use more than 3x memory
memory_scaling = memory_4x / baseline_memory
assert memory_scaling <= PERFORMANCE_THRESHOLDS["memory_multiplier"], (
f"Memory scaling too high: {memory_scaling:.2f}x > "
f"{PERFORMANCE_THRESHOLDS['memory_multiplier']}x limit"
)
# Memory usage should scale sub-linearly (better than linear)
assert memory_scaling < 4.0, "Memory should scale sub-linearly with agent count"
logger.info(f"Memory scaling from 1 to 4 agents: {memory_scaling:.2f}x")
# Slow/Integration Performance Tests
@pytest.mark.slow
class TestParallelResearchIntegrationPerformance:
"""Integration performance tests with more realistic scenarios."""
async def test_deep_research_agent_parallel_integration(
self, performance_monitor, mock_executor_factory
):
"""Test DeepResearchAgent with parallel execution enabled."""
with performance_monitor("deep_research_agent_parallel") as monitor:
# Mock LLM for DeepResearchAgent
mock_llm = AsyncMock()
mock_llm.ainvoke.return_value.content = '{"analysis": "test analysis"}'
# Create agent with parallel execution enabled
agent = DeepResearchAgent(
llm=mock_llm,
persona="moderate",
enable_parallel_execution=True,
parallel_config=ParallelResearchConfig(
max_concurrent_agents=3,
timeout_per_agent=5,
enable_fallbacks=True,
),
)
monitor.sample_cpu()
# Mock the subagent execution to avoid real API calls
async def mock_subagent_execution(task: ResearchTask) -> dict[str, Any]:
await asyncio.sleep(0.1) # Simulate work
return {
"research_type": task.task_type,
"insights": [f"Mock insight for {task.task_type}"],
"sentiment": {"direction": "neutral", "confidence": 0.7},
"credibility_score": 0.8,
"sources": [{"title": "Mock source", "url": "http://example.com"}],
}
# Override the subagent execution method
agent._execute_subagent_task = mock_subagent_execution
# Run comprehensive research
result = await agent.research_comprehensive(
topic="Apple Inc comprehensive analysis",
session_id="integration_test",
depth="standard",
use_parallel_execution=True,
)
monitor.sample_cpu()
# Validate integration results
assert result["status"] == "success"
assert result["execution_mode"] == "parallel"
assert "parallel_execution_stats" in result
assert result["parallel_execution_stats"]["total_tasks"] > 0
assert result["execution_time_ms"] > 0
# Performance validation
execution_time_seconds = result["execution_time_ms"] / 1000
assert execution_time_seconds < 10.0, (
"Integration test should complete within 10s"
)
assert monitor.metrics["memory_multiplier"] < 5.0, (
"Memory usage should be reasonable"
)
logger.info("Deep research agent integration test passed")
@pytest.mark.parametrize(
"failure_rate", [0.25]
) # Only high failure rate for slow tests
async def test_high_failure_rate_resilience(
self,
failure_rate: float,
performance_monitor,
mock_executor_factory,
performance_tester,
):
"""Test system resilience under high failure rates."""
agent_count = 6 # More agents for integration testing
test_name = f"high_failure_resilience_{int(failure_rate * 100)}pct"
resilience_results = []
for run in range(STATISTICAL_RUNS):
with performance_monitor(f"{test_name}_run{run}") as monitor:
config = ParallelResearchConfig(
max_concurrent_agents=4, # Limit concurrency
timeout_per_agent=8, # Longer timeout for resilience
enable_fallbacks=True,
rate_limit_delay=0.1,
)
tasks = performance_tester.create_test_tasks(
agent_count, f"resilience_test_{run}"
)
executor = mock_executor_factory(
failure_rate=failure_rate,
base_delay=0.2, # Longer delays to simulate real API calls
delay_variance=0.3,
include_timeouts=True, # Include timeout scenarios
)
monitor.sample_cpu()
try:
result = await performance_tester.run_parallel_test(
tasks, config, executor
)
success_rate = result.successful_tasks / (
result.successful_tasks + result.failed_tasks
)
resilience_results.append(
{
"success_rate": success_rate,
"execution_time": monitor.metrics["execution_time"],
"memory_multiplier": monitor.metrics["memory_multiplier"],
"parallel_efficiency": result.parallel_efficiency,
}
)
except Exception as e:
logger.warning(f"Resilience test run {run} failed: {e}")
resilience_results.append(
{
"success_rate": 0.0,
"execution_time": monitor.metrics["execution_time"],
"memory_multiplier": monitor.metrics["memory_multiplier"],
"parallel_efficiency": 0.0,
}
)
monitor.sample_cpu()
# Analyze resilience results
avg_success_rate = statistics.mean(
[r["success_rate"] for r in resilience_results]
)
avg_execution_time = statistics.mean(
[r["execution_time"] for r in resilience_results]
)
# Validate system maintained reasonable performance under stress
min_acceptable_success = 0.5 # 50% success rate under 25% failure
assert avg_success_rate >= min_acceptable_success, (
f"System not resilient enough: {avg_success_rate:.2f} < {min_acceptable_success}"
)
assert avg_execution_time < 20.0, (
"High failure rate tests should still complete reasonably"
)
logger.info(
f"High failure rate resilience test: {avg_success_rate:.2f} average success rate, "
f"{avg_execution_time:.2f}s average execution time"
)
# Test Suite Performance Validation
@pytest.mark.unit
class TestSuitePerformance:
"""Validate overall test suite performance characteristics."""
async def test_total_test_duration_under_threshold(self, performance_monitor):
"""Validate that core performance tests complete within time budget."""
with performance_monitor("test_suite_duration") as monitor:
# Simulate running the key performance tests
await asyncio.sleep(0.1) # Placeholder for actual test execution
monitor.sample_cpu()
# This is a meta-test that would be updated based on actual suite performance
# For now, we validate the monitoring infrastructure works
assert monitor.metrics["execution_time"] < 1.0, (
"Meta-test should complete quickly"
)
assert monitor.metrics["memory_multiplier"] < 2.0, (
"Meta-test should use minimal memory"
)
def test_performance_threshold_configuration(self):
"""Validate performance threshold configuration is reasonable."""
# Test threshold sanity checks
assert PERFORMANCE_THRESHOLDS["speedup"][2] > 1.0, (
"2-agent speedup should exceed 1x"
)
assert (
PERFORMANCE_THRESHOLDS["speedup"][4] > PERFORMANCE_THRESHOLDS["speedup"][2]
), "4-agent speedup should exceed 2-agent"
assert PERFORMANCE_THRESHOLDS["memory_multiplier"] > 1.0, (
"Memory multiplier should allow some increase"
)
assert PERFORMANCE_THRESHOLDS["memory_multiplier"] < 10.0, (
"Memory multiplier should be reasonable"
)
assert PERFORMANCE_THRESHOLDS["max_test_duration"] > 10.0, (
"Test duration budget should be reasonable"
)
# Test failure rate configuration
assert all(0.0 <= rate <= 1.0 for rate in FAILURE_RATES), (
"Failure rates should be valid percentages"
)
assert len(set(FAILURE_RATES)) == len(FAILURE_RATES), (
"Failure rates should be unique"
)
# Test statistical significance configuration
assert STATISTICAL_RUNS >= 3, (
"Statistical runs should provide reasonable sample size"
)
assert STATISTICAL_RUNS <= 10, "Statistical runs should not be excessive for CI"
if __name__ == "__main__":
# Allow running individual performance tests for development
import sys
if len(sys.argv) > 1:
pytest.main([sys.argv[1], "-v", "-s", "--tb=short"])
else:
# Run unit performance tests by default
pytest.main([__file__, "-v", "-s", "-m", "unit", "--tb=short"])
```
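
The suite above defines speedup as the ratio of a sequential baseline to a parallel run over the same mock tasks. A minimal, self-contained sketch of that measurement, reduced to plain `asyncio` primitives (task names and delays are illustrative, not taken from the repository):

```python
import asyncio
import time


async def fake_research_task(delay: float = 0.1) -> str:
    """Stand-in for an I/O-bound research call (LLM or web API)."""
    await asyncio.sleep(delay)
    return "result"


async def measure_speedup(agent_count: int) -> float:
    # Sequential baseline: run the tasks one after another.
    t0 = time.perf_counter()
    for _ in range(agent_count):
        await fake_research_task()
    sequential = time.perf_counter() - t0

    # Parallel run: schedule the same tasks concurrently.
    t0 = time.perf_counter()
    await asyncio.gather(*(fake_research_task() for _ in range(agent_count)))
    parallel = time.perf_counter() - t0

    return sequential / max(parallel, 0.001)  # same guard the suite uses


if __name__ == "__main__":
    # Pure-wait tasks approach an agent_count-times speedup; real agents with LLM and
    # rate-limit overhead fall short, hence the 1.3x / 2.0x thresholds above.
    print(f"{asyncio.run(measure_speedup(4)):.2f}x speedup")
```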
--------------------------------------------------------------------------------
/tests/integration/test_mcp_tools.py:
--------------------------------------------------------------------------------
```python
"""
MCP Tools Integration Tests for Claude Desktop Interaction.
This test suite covers:
- All MCP tool registrations and functionality
- Tool parameter validation and error handling
- Tool response formats and data integrity
- Claude Desktop simulation and interaction patterns
- Real-world usage scenarios
- Performance and timeout handling
"""
import asyncio
import logging
from unittest.mock import Mock, patch
import numpy as np
import pytest
from fastmcp import Context
from maverick_mcp.api.routers.backtesting import setup_backtesting_tools
logger = logging.getLogger(__name__)
class MockFastMCP:
"""Mock FastMCP instance for testing tool registration."""
def __init__(self):
self.tools = {}
self.tool_functions = {}
    def tool(self, name: str | None = None):
"""Mock tool decorator."""
def decorator(func):
tool_name = name or func.__name__
self.tools[tool_name] = {
"function": func,
"name": tool_name,
"signature": self._get_function_signature(func),
}
self.tool_functions[tool_name] = func
return func
return decorator
def _get_function_signature(self, func):
"""Extract function signature for validation."""
import inspect
sig = inspect.signature(func)
return {
"parameters": list(sig.parameters.keys()),
"annotations": {k: str(v.annotation) for k, v in sig.parameters.items()},
}
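# Illustrative usage of the mock registry above (assumed example, not exercised by the
# tests below): decorating a function stores it by name together with its signature,
# which is what test_all_mcp_tools_registration introspects.
_example_mcp = MockFastMCP()
@_example_mcp.tool(name="ping")
def _example_ping() -> str:
    return "pong"
assert "ping" in _example_mcp.tool_functions
assert _example_mcp.tools["ping"]["signature"]["parameters"] == []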
class TestMCPToolsIntegration:
"""Integration tests for MCP tools and Claude Desktop interaction."""
@pytest.fixture
def mock_mcp(self):
"""Create mock FastMCP instance."""
return MockFastMCP()
@pytest.fixture
def mock_context(self):
"""Create mock MCP context."""
context = Mock(spec=Context)
context.session = {}
return context
@pytest.fixture
async def setup_tools(self, mock_mcp):
"""Set up all backtesting tools for testing."""
setup_backtesting_tools(mock_mcp)
return mock_mcp
async def test_all_mcp_tools_registration(self, setup_tools):
"""Test that all MCP tools are properly registered."""
mcp = setup_tools
# Expected tools from backtesting router
expected_tools = [
"run_backtest",
"optimize_strategy",
"walk_forward_analysis",
"monte_carlo_simulation",
"compare_strategies",
"list_strategies",
"parse_strategy",
"backtest_portfolio",
"generate_backtest_charts",
"generate_optimization_charts",
"run_ml_strategy_backtest",
"train_ml_predictor",
"analyze_market_regimes",
"create_strategy_ensemble",
]
# Check all tools are registered
registered_tools = set(mcp.tools.keys())
expected_set = set(expected_tools)
missing_tools = expected_set - registered_tools
extra_tools = registered_tools - expected_set
assert len(missing_tools) == 0, f"Missing tools: {missing_tools}"
logger.info(f"✓ All {len(registered_tools)} MCP tools registered successfully")
if extra_tools:
logger.info(f"Additional tools found: {extra_tools}")
# Validate each tool has proper signature
for tool_name, tool_info in mcp.tools.items():
assert callable(tool_info["function"]), f"Tool {tool_name} is not callable"
assert "signature" in tool_info, f"Tool {tool_name} missing signature"
return {
"registered_tools": list(registered_tools),
"tool_count": len(registered_tools),
}
async def test_run_backtest_tool_comprehensive(self, setup_tools, mock_context):
"""Test run_backtest tool with comprehensive parameter validation."""
mcp = setup_tools
tool_func = mcp.tool_functions["run_backtest"]
# Test cases with different parameter combinations
test_cases = [
{
"name": "basic_sma_cross",
"params": {
"symbol": "AAPL",
"strategy": "sma_cross",
"fast_period": "10",
"slow_period": "20",
},
"should_succeed": True,
},
{
"name": "rsi_strategy",
"params": {
"symbol": "GOOGL",
"strategy": "rsi",
"period": "14",
"oversold": "30",
"overbought": "70",
},
"should_succeed": True,
},
{
"name": "invalid_symbol",
"params": {
"symbol": "", # Empty symbol
"strategy": "sma_cross",
},
"should_succeed": False,
},
{
"name": "invalid_strategy",
"params": {
"symbol": "AAPL",
"strategy": "nonexistent_strategy",
},
"should_succeed": False,
},
{
"name": "invalid_numeric_params",
"params": {
"symbol": "AAPL",
"strategy": "sma_cross",
"fast_period": "invalid_number",
},
"should_succeed": False,
},
]
results = {}
for test_case in test_cases:
try:
# Mock the VectorBT engine to avoid actual data fetching
with patch("maverick_mcp.backtesting.VectorBTEngine") as mock_engine:
mock_instance = Mock()
mock_engine.return_value = mock_instance
# Mock successful backtest result
mock_result = {
"symbol": test_case["params"]["symbol"],
"strategy_type": test_case["params"]["strategy"],
"metrics": {
"total_return": 0.15,
"sharpe_ratio": 1.2,
"max_drawdown": -0.12,
"total_trades": 25,
},
"trades": [],
"equity_curve": [10000, 10100, 10200, 10300],
"drawdown_series": [0, -0.01, -0.02, 0],
}
mock_instance.run_backtest.return_value = mock_result
# Execute tool
result = await tool_func(mock_context, **test_case["params"])
if test_case["should_succeed"]:
assert isinstance(result, dict), (
f"Result should be dict for {test_case['name']}"
)
assert "symbol" in result, (
f"Missing symbol in result for {test_case['name']}"
)
assert "metrics" in result, (
f"Missing metrics in result for {test_case['name']}"
)
results[test_case["name"]] = {"success": True, "result": result}
logger.info(f"✓ {test_case['name']} succeeded as expected")
else:
# If we got here, it didn't fail as expected
results[test_case["name"]] = {
"success": False,
"unexpected_success": True,
}
logger.warning(
f"⚠ {test_case['name']} succeeded but was expected to fail"
)
except Exception as e:
if test_case["should_succeed"]:
results[test_case["name"]] = {"success": False, "error": str(e)}
logger.error(f"✗ {test_case['name']} failed unexpectedly: {e}")
else:
results[test_case["name"]] = {
"success": True,
"expected_error": str(e),
}
logger.info(f"✓ {test_case['name']} failed as expected: {e}")
# Calculate success rate
total_tests = len(test_cases)
successful_tests = sum(1 for r in results.values() if r.get("success", False))
success_rate = successful_tests / total_tests
assert success_rate >= 0.8, f"Success rate too low: {success_rate:.1%}"
return {"test_results": results, "success_rate": success_rate}
async def test_strategy_tools_integration(self, setup_tools, mock_context):
"""Test strategy-related tools integration."""
mcp = setup_tools
# Test list_strategies tool
list_func = mcp.tool_functions["list_strategies"]
strategies_result = await list_func(mock_context)
assert isinstance(strategies_result, dict), "list_strategies should return dict"
assert "available_strategies" in strategies_result, (
"Missing available_strategies"
)
assert "total_count" in strategies_result, "Missing total_count"
assert strategies_result["total_count"] > 0, "Should have strategies available"
logger.info(f"✓ Found {strategies_result['total_count']} available strategies")
# Test parse_strategy tool
parse_func = mcp.tool_functions["parse_strategy"]
parse_test_cases = [
"Buy when RSI is below 30 and sell when above 70",
"Use 10-day and 20-day moving average crossover",
"MACD strategy with standard parameters",
"Invalid strategy description that makes no sense",
]
parse_results = {}
for description in parse_test_cases:
try:
result = await parse_func(mock_context, description=description)
assert isinstance(result, dict), "parse_strategy should return dict"
assert "success" in result, "Missing success field"
assert "strategy" in result, "Missing strategy field"
parse_results[description] = result
status = "✓" if result["success"] else "⚠"
logger.info(
f"{status} Parsed: '{description}' -> {result['strategy'].get('strategy_type', 'unknown')}"
)
except Exception as e:
parse_results[description] = {"error": str(e)}
logger.error(f"✗ Parse failed for: '{description}' - {e}")
return {
"strategies_list": strategies_result,
"parse_results": parse_results,
}
async def test_optimization_tools_integration(self, setup_tools, mock_context):
"""Test optimization-related tools integration."""
mcp = setup_tools
# Mock VectorBT engine for optimization tests
with patch("maverick_mcp.backtesting.VectorBTEngine") as mock_engine_class:
mock_engine = Mock()
mock_engine_class.return_value = mock_engine
# Mock optimization results
mock_optimization_result = {
"best_parameters": {"fast_period": 12, "slow_period": 26},
"best_performance": {
"total_return": 0.25,
"sharpe_ratio": 1.8,
"max_drawdown": -0.08,
},
"optimization_results": [
{
"parameters": {"fast_period": 10, "slow_period": 20},
"metrics": {"sharpe_ratio": 1.2},
},
{
"parameters": {"fast_period": 12, "slow_period": 26},
"metrics": {"sharpe_ratio": 1.8},
},
],
}
mock_engine.optimize_parameters.return_value = mock_optimization_result
# Test optimize_strategy tool
optimize_func = mcp.tool_functions["optimize_strategy"]
result = await optimize_func(
mock_context,
symbol="AAPL",
strategy="sma_cross",
optimization_level="medium",
top_n=5,
)
assert isinstance(result, dict), "optimize_strategy should return dict"
logger.info("✓ optimize_strategy tool executed successfully")
# Test walk_forward_analysis tool
walk_forward_func = mcp.tool_functions["walk_forward_analysis"]
# Mock walk-forward analysis
with patch(
"maverick_mcp.backtesting.StrategyOptimizer"
) as mock_optimizer_class:
mock_optimizer = Mock()
mock_optimizer_class.return_value = mock_optimizer
mock_walk_forward_result = {
"out_of_sample_performance": {
"total_return": 0.18,
"sharpe_ratio": 1.5,
"win_rate": 0.65,
},
"windows_tested": 4,
"average_window_performance": 0.15,
}
mock_optimizer.walk_forward_analysis.return_value = (
mock_walk_forward_result
)
result = await walk_forward_func(
mock_context,
symbol="AAPL",
strategy="sma_cross",
window_size=252,
step_size=63,
)
assert isinstance(result, dict), (
"walk_forward_analysis should return dict"
)
logger.info("✓ walk_forward_analysis tool executed successfully")
return {"optimization_tests": "completed"}
async def test_ml_tools_integration(self, setup_tools, mock_context):
"""Test ML-enhanced tools integration."""
mcp = setup_tools
# Test ML strategy tools
ml_tools = [
"run_ml_strategy_backtest",
"train_ml_predictor",
"analyze_market_regimes",
"create_strategy_ensemble",
]
ml_results = {}
for tool_name in ml_tools:
if tool_name in mcp.tool_functions:
try:
tool_func = mcp.tool_functions[tool_name]
# Mock ML dependencies
with patch(
"maverick_mcp.backtesting.VectorBTEngine"
) as mock_engine:
mock_instance = Mock()
mock_engine.return_value = mock_instance
# Mock historical data
import numpy as np
import pandas as pd
dates = pd.date_range(
start="2022-01-01", end="2023-12-31", freq="D"
)
mock_data = pd.DataFrame(
{
"open": np.random.uniform(100, 200, len(dates)),
"high": np.random.uniform(100, 200, len(dates)),
"low": np.random.uniform(100, 200, len(dates)),
"close": np.random.uniform(100, 200, len(dates)),
"volume": np.random.randint(
1000000, 10000000, len(dates)
),
},
index=dates,
)
mock_instance.get_historical_data.return_value = mock_data
# Test specific ML tools
if tool_name == "run_ml_strategy_backtest":
result = await tool_func(
mock_context,
symbol="AAPL",
strategy_type="ml_predictor",
model_type="random_forest",
)
elif tool_name == "train_ml_predictor":
result = await tool_func(
mock_context,
symbol="AAPL",
model_type="random_forest",
n_estimators=100,
)
elif tool_name == "analyze_market_regimes":
result = await tool_func(
mock_context,
symbol="AAPL",
method="hmm",
n_regimes=3,
)
elif tool_name == "create_strategy_ensemble":
result = await tool_func(
mock_context,
symbols=["AAPL", "GOOGL"],
base_strategies=["sma_cross", "rsi"],
)
ml_results[tool_name] = {
"success": True,
"type": type(result).__name__,
}
logger.info(f"✓ {tool_name} executed successfully")
except Exception as e:
ml_results[tool_name] = {"success": False, "error": str(e)}
logger.error(f"✗ {tool_name} failed: {e}")
else:
ml_results[tool_name] = {"success": False, "error": "Tool not found"}
return ml_results
async def test_visualization_tools_integration(self, setup_tools, mock_context):
"""Test visualization tools integration."""
mcp = setup_tools
visualization_tools = [
"generate_backtest_charts",
"generate_optimization_charts",
]
viz_results = {}
for tool_name in visualization_tools:
if tool_name in mcp.tool_functions:
try:
tool_func = mcp.tool_functions[tool_name]
# Mock VectorBT engine and visualization dependencies
with patch(
"maverick_mcp.backtesting.VectorBTEngine"
) as mock_engine:
mock_instance = Mock()
mock_engine.return_value = mock_instance
# Mock backtest result for charts
mock_result = {
"symbol": "AAPL",
"equity_curve": [10000, 10100, 10200, 10300, 10250],
"drawdown_series": [0, -0.01, -0.02, 0, -0.005],
"trades": [
{
"entry_time": "2023-01-01",
"exit_time": "2023-02-01",
"pnl": 100,
},
{
"entry_time": "2023-03-01",
"exit_time": "2023-04-01",
"pnl": -50,
},
],
"metrics": {
"total_return": 0.15,
"sharpe_ratio": 1.2,
"max_drawdown": -0.08,
"total_trades": 10,
},
}
mock_instance.run_backtest.return_value = mock_result
# Mock visualization functions
with patch(
"maverick_mcp.backtesting.visualization.generate_equity_curve"
) as mock_equity:
with patch(
"maverick_mcp.backtesting.visualization.generate_performance_dashboard"
) as mock_dashboard:
with patch(
"maverick_mcp.backtesting.visualization.generate_trade_scatter"
) as mock_scatter:
with patch(
"maverick_mcp.backtesting.visualization.generate_optimization_heatmap"
) as mock_heatmap:
# Mock chart returns (base64 strings)
mock_chart_data = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="
mock_equity.return_value = mock_chart_data
mock_dashboard.return_value = mock_chart_data
mock_scatter.return_value = mock_chart_data
mock_heatmap.return_value = mock_chart_data
# Execute visualization tool
result = await tool_func(
mock_context,
symbol="AAPL",
strategy="sma_cross",
theme="light",
)
assert isinstance(result, dict), (
f"{tool_name} should return dict"
)
# Validate chart data
for chart_name, chart_data in result.items():
assert isinstance(chart_data, str), (
f"Chart {chart_name} should be string"
)
assert len(chart_data) > 0, (
f"Chart {chart_name} should have data"
)
viz_results[tool_name] = {
"success": True,
"charts_generated": list(result.keys()),
"chart_count": len(result),
}
logger.info(
f"✓ {tool_name} generated {len(result)} charts successfully"
)
except Exception as e:
viz_results[tool_name] = {"success": False, "error": str(e)}
logger.error(f"✗ {tool_name} failed: {e}")
else:
viz_results[tool_name] = {"success": False, "error": "Tool not found"}
return viz_results
async def test_claude_desktop_simulation(self, setup_tools, mock_context):
"""Simulate realistic Claude Desktop usage patterns."""
mcp = setup_tools
# Simulate a typical Claude Desktop session
session_commands = [
{
"command": "List available strategies",
"tool": "list_strategies",
"params": {},
},
{
"command": "Run backtest for AAPL with SMA crossover",
"tool": "run_backtest",
"params": {
"symbol": "AAPL",
"strategy": "sma_cross",
"fast_period": "10",
"slow_period": "20",
},
},
{
"command": "Compare multiple strategies",
"tool": "compare_strategies",
"params": {
"symbol": "AAPL",
"strategies": ["sma_cross", "rsi", "macd"],
},
},
{
"command": "Generate charts for backtest",
"tool": "generate_backtest_charts",
"params": {
"symbol": "AAPL",
"strategy": "sma_cross",
},
},
]
session_results = []
# Mock all necessary dependencies for simulation
with patch("maverick_mcp.backtesting.VectorBTEngine") as mock_engine_class:
mock_engine = Mock()
mock_engine_class.return_value = mock_engine
# Mock results for different tools
mock_backtest_result = {
"symbol": "AAPL",
"strategy_type": "sma_cross",
"metrics": {"total_return": 0.15, "sharpe_ratio": 1.2},
"trades": [],
"equity_curve": [10000, 10150],
"drawdown_series": [0, -0.02],
}
mock_engine.run_backtest.return_value = mock_backtest_result
# Mock comparison results
with patch(
"maverick_mcp.backtesting.BacktestAnalyzer"
) as mock_analyzer_class:
mock_analyzer = Mock()
mock_analyzer_class.return_value = mock_analyzer
mock_comparison = {
"strategy_rankings": [
{"strategy": "sma_cross", "rank": 1, "sharpe_ratio": 1.2},
{"strategy": "rsi", "rank": 2, "sharpe_ratio": 1.1},
{"strategy": "macd", "rank": 3, "sharpe_ratio": 0.9},
],
"best_strategy": "sma_cross",
}
mock_analyzer.compare_strategies.return_value = mock_comparison
# Mock visualization
with patch(
"maverick_mcp.backtesting.visualization.generate_equity_curve"
) as mock_viz:
mock_viz.return_value = "mock_chart_data"
# Execute session commands
for command_info in session_commands:
try:
start_time = asyncio.get_event_loop().time()
tool_func = mcp.tool_functions[command_info["tool"]]
result = await tool_func(
mock_context, **command_info["params"]
)
execution_time = (
asyncio.get_event_loop().time() - start_time
)
session_results.append(
{
"command": command_info["command"],
"tool": command_info["tool"],
"success": True,
"execution_time": execution_time,
"result_type": type(result).__name__,
}
)
logger.info(
f"✓ '{command_info['command']}' completed in {execution_time:.3f}s"
)
except Exception as e:
session_results.append(
{
"command": command_info["command"],
"tool": command_info["tool"],
"success": False,
"error": str(e),
}
)
logger.error(f"✗ '{command_info['command']}' failed: {e}")
# Analyze session results
total_commands = len(session_commands)
successful_commands = sum(1 for r in session_results if r.get("success", False))
success_rate = successful_commands / total_commands
avg_execution_time = np.mean(
[r.get("execution_time", 0) for r in session_results if r.get("success")]
)
assert success_rate >= 0.75, f"Session success rate too low: {success_rate:.1%}"
assert avg_execution_time < 5.0, (
f"Average execution time too high: {avg_execution_time:.3f}s"
)
logger.info(
f"Claude Desktop Simulation Results:\n"
f" • Commands Executed: {total_commands}\n"
f" • Successful: {successful_commands}\n"
f" • Success Rate: {success_rate:.1%}\n"
f" • Avg Execution Time: {avg_execution_time:.3f}s"
)
return {
"session_results": session_results,
"success_rate": success_rate,
"avg_execution_time": avg_execution_time,
}
async def test_tool_parameter_validation_comprehensive(
self, setup_tools, mock_context
):
"""Test comprehensive parameter validation across all tools."""
mcp = setup_tools
validation_tests = []
# Test parameter validation for key tools
test_cases = [
{
"tool": "run_backtest",
"valid_params": {"symbol": "AAPL", "strategy": "sma_cross"},
"invalid_params": [
{"symbol": "", "strategy": "sma_cross"}, # Empty symbol
{"symbol": "AAPL", "strategy": ""}, # Empty strategy
{
"symbol": "AAPL",
"strategy": "sma_cross",
"fast_period": "not_a_number",
}, # Invalid number
],
},
{
"tool": "optimize_strategy",
"valid_params": {"symbol": "AAPL", "strategy": "sma_cross"},
"invalid_params": [
{
"symbol": "AAPL",
"strategy": "invalid_strategy",
}, # Invalid strategy
{
"symbol": "AAPL",
"strategy": "sma_cross",
"top_n": -1,
}, # Negative top_n
],
},
]
for test_case in test_cases:
tool_name = test_case["tool"]
if tool_name in mcp.tool_functions:
tool_func = mcp.tool_functions[tool_name]
# Test valid parameters
try:
with patch("maverick_mcp.backtesting.VectorBTEngine"):
await tool_func(mock_context, **test_case["valid_params"])
validation_tests.append(
{
"tool": tool_name,
"test": "valid_params",
"success": True,
}
)
except Exception as e:
validation_tests.append(
{
"tool": tool_name,
"test": "valid_params",
"success": False,
"error": str(e),
}
)
# Test invalid parameters
for invalid_params in test_case["invalid_params"]:
try:
with patch("maverick_mcp.backtesting.VectorBTEngine"):
await tool_func(mock_context, **invalid_params)
# If we got here, validation didn't catch the error
validation_tests.append(
{
"tool": tool_name,
"test": f"invalid_params_{invalid_params}",
"success": False,
"error": "Validation should have failed but didn't",
}
)
except Exception as e:
# Expected to fail
validation_tests.append(
{
"tool": tool_name,
"test": f"invalid_params_{invalid_params}",
"success": True,
"expected_error": str(e),
}
)
# Calculate validation success rate
total_validation_tests = len(validation_tests)
successful_validations = sum(
1 for t in validation_tests if t.get("success", False)
)
validation_success_rate = (
successful_validations / total_validation_tests
if total_validation_tests > 0
else 0
)
logger.info(
f"Parameter Validation Results:\n"
f" • Total Validation Tests: {total_validation_tests}\n"
f" • Successful Validations: {successful_validations}\n"
f" • Validation Success Rate: {validation_success_rate:.1%}"
)
return {
"validation_tests": validation_tests,
"validation_success_rate": validation_success_rate,
}
if __name__ == "__main__":
# Run MCP tools integration tests
pytest.main(
[
__file__,
"-v",
"--tb=short",
"--asyncio-mode=auto",
"--timeout=300", # 5 minute timeout
"--durations=10",
]
)
```
--------------------------------------------------------------------------------
/maverick_mcp/agents/supervisor.py:
--------------------------------------------------------------------------------
```python
"""
SupervisorAgent implementation using 2025 LangGraph patterns.
Orchestrates multiple specialized agents with intelligent routing, result synthesis,
and conflict resolution for comprehensive financial analysis.
"""
import logging
from datetime import datetime
from typing import Any
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import BaseTool
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.types import Command
from maverick_mcp.agents.base import INVESTOR_PERSONAS, PersonaAwareAgent
from maverick_mcp.config.settings import get_settings
from maverick_mcp.exceptions import AgentInitializationError
from maverick_mcp.memory.stores import ConversationStore
from maverick_mcp.workflows.state import SupervisorState
logger = logging.getLogger(__name__)
settings = get_settings()
# Query routing matrix for intelligent agent selection
ROUTING_MATRIX = {
"market_screening": {
"agents": ["market"],
"primary": "market",
"parallel": False,
"confidence_threshold": 0.7,
"synthesis_required": False,
},
"technical_analysis": {
"agents": ["technical"],
"primary": "technical",
"parallel": False,
"confidence_threshold": 0.8,
"synthesis_required": False,
},
"stock_investment_decision": {
"agents": ["market", "technical"],
"primary": "technical",
"parallel": True,
"confidence_threshold": 0.85,
"synthesis_required": True,
},
"portfolio_analysis": {
"agents": ["market", "technical"],
"primary": "market",
"parallel": True,
"confidence_threshold": 0.75,
"synthesis_required": True,
},
"deep_research": {
"agents": ["research"], # Research agent handles comprehensive analysis
"primary": "research",
"parallel": False,
"confidence_threshold": 0.9,
"synthesis_required": False, # Research agent provides complete analysis
},
"company_research": {
"agents": ["research"], # Dedicated company research
"primary": "research",
"parallel": False,
"confidence_threshold": 0.85,
"synthesis_required": False,
},
"sentiment_analysis": {
"agents": ["research"], # Market sentiment analysis
"primary": "research",
"parallel": False,
"confidence_threshold": 0.8,
"synthesis_required": False,
},
"risk_assessment": {
"agents": ["market", "technical"], # Future risk agent integration
"primary": "market",
"parallel": True,
"confidence_threshold": 0.8,
"synthesis_required": True,
},
}
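# Illustrative reading of the matrix above: a "stock_investment_decision" query
# fans out to both the market and technical agents in parallel and requires a
# synthesis pass, while a "technical_analysis" query routes to the technical
# agent alone with no synthesis step.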
class QueryClassifier:
"""LLM-powered query classification with rule-based fallback."""
def __init__(self, llm: BaseChatModel):
self.llm = llm
async def classify_query(self, query: str, persona: str) -> dict[str, Any]:
"""Classify query using LLM with structured output."""
classification_prompt = f"""
Analyze this financial query and classify it for multi-agent routing.
Query: "{query}"
Investor Persona: {persona}
Classify into one of these categories:
1. market_screening - Finding stocks, sector analysis, market breadth
2. technical_analysis - Chart patterns, indicators, entry/exit points
3. stock_investment_decision - Complete analysis of specific stock(s)
4. portfolio_analysis - Portfolio optimization, risk assessment
5. deep_research - Fundamental analysis, company research, news analysis
6. risk_assessment - Position sizing, risk management, portfolio risk
Consider the complexity and return classification with confidence.
Return ONLY valid JSON in this exact format:
{{
"category": "category_name",
"confidence": 0.85,
"required_agents": ["agent1", "agent2"],
"complexity": "simple",
"estimated_execution_time_ms": 30000,
"parallel_capable": true,
"reasoning": "Brief explanation of classification"
}}
"""
try:
response = await self.llm.ainvoke(
[
SystemMessage(
content="You are a financial query classifier. Return only valid JSON."
),
HumanMessage(content=classification_prompt),
]
)
# Parse LLM response
import json
classification = json.loads(response.content.strip())
# Validate and enhance with routing matrix
category = classification.get("category", "stock_investment_decision")
routing_config = ROUTING_MATRIX.get(
category, ROUTING_MATRIX["stock_investment_decision"]
)
return {
**classification,
"routing_config": routing_config,
"timestamp": datetime.now(),
}
except Exception as e:
logger.warning(f"LLM classification failed: {e}, using rule-based fallback")
return self._rule_based_fallback(query, persona)
def _rule_based_fallback(self, query: str, persona: str) -> dict[str, Any]:
"""Rule-based classification fallback."""
query_lower = query.lower()
# Simple keyword-based classification
if any(
word in query_lower for word in ["screen", "find stocks", "scan", "search"]
):
category = "market_screening"
elif any(
word in query_lower
for word in ["chart", "technical", "rsi", "macd", "pattern"]
):
category = "technical_analysis"
elif any(
word in query_lower for word in ["portfolio", "allocation", "diversif"]
):
category = "portfolio_analysis"
elif any(
word in query_lower
for word in ["research", "fundamental", "news", "earnings"]
):
category = "deep_research"
elif any(
word in query_lower
for word in ["company", "business", "competitive", "industry"]
):
category = "company_research"
elif any(
word in query_lower for word in ["sentiment", "opinion", "mood", "feeling"]
):
category = "sentiment_analysis"
elif any(
word in query_lower for word in ["risk", "position size", "stop loss"]
):
category = "risk_assessment"
else:
category = "stock_investment_decision"
routing_config = ROUTING_MATRIX[category]
return {
"category": category,
"confidence": 0.6,
"required_agents": routing_config["agents"],
"complexity": "moderate",
"estimated_execution_time_ms": 60000,
"parallel_capable": routing_config["parallel"],
"reasoning": "Rule-based classification fallback",
"routing_config": routing_config,
"timestamp": datetime.now(),
}
class ResultSynthesizer:
"""Synthesize results from multiple agents with conflict resolution."""
def __init__(self, llm: BaseChatModel, persona):
self.llm = llm
self.persona = persona
async def synthesize_results(
self,
agent_results: dict[str, Any],
query_type: str,
conflicts: list[dict[str, Any]],
) -> dict[str, Any]:
"""Synthesize final recommendation from agent results."""
# Calculate agent weights based on query type and persona
weights = self._calculate_agent_weights(query_type, agent_results)
# Create synthesis prompt
synthesis_prompt = self._build_synthesis_prompt(
agent_results, weights, query_type, conflicts
)
# Use LLM to synthesize coherent response
synthesis_response = await self.llm.ainvoke(
[
SystemMessage(content="You are a financial analysis synthesizer."),
HumanMessage(content=synthesis_prompt),
]
)
return {
"synthesis": synthesis_response.content,
"weights_applied": weights,
"conflicts_resolved": len(conflicts),
"confidence_score": self._calculate_overall_confidence(
agent_results, weights
),
"contributing_agents": list(agent_results.keys()),
"persona_alignment": self._assess_persona_alignment(
synthesis_response.content
),
}
def _calculate_agent_weights(
self, query_type: str, agent_results: dict
) -> dict[str, float]:
"""Calculate weights for agent results based on context."""
base_weights = {
"market_screening": {"market": 0.9, "technical": 0.1},
"technical_analysis": {"market": 0.2, "technical": 0.8},
"stock_investment_decision": {"market": 0.4, "technical": 0.6},
"portfolio_analysis": {"market": 0.6, "technical": 0.4},
"deep_research": {"research": 1.0},
"company_research": {"research": 1.0},
"sentiment_analysis": {"research": 1.0},
"risk_assessment": {"market": 0.3, "technical": 0.3, "risk": 0.4},
}
weights = base_weights.get(query_type, {"market": 0.5, "technical": 0.5})
# Adjust weights based on agent confidence scores
for agent, base_weight in weights.items():
if agent in agent_results:
confidence = agent_results[agent].get("confidence_score", 0.5)
weights[agent] = base_weight * (0.5 + confidence * 0.5)
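# Worked example: base weights {"market": 0.4, "technical": 0.6} with agent
# confidences 0.8 and 0.6 become 0.4 * 0.9 = 0.36 and 0.6 * 0.8 = 0.48,
# which normalize below to roughly 0.43 / 0.57.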
# Normalize weights to sum to 1.0
total_weight = sum(weights.values())
if total_weight > 0:
weights = {k: v / total_weight for k, v in weights.items()}
return weights
def _build_synthesis_prompt(
self,
agent_results: dict[str, Any],
weights: dict[str, float],
query_type: str,
conflicts: list[dict[str, Any]],
) -> str:
"""Build synthesis prompt for LLM."""
prompt = f"""
Synthesize a comprehensive financial analysis response from multiple specialized agents.
Query Type: {query_type}
Investor Persona: {self.persona.name} - {", ".join(self.persona.characteristics)}
Agent Results:
"""
for agent, result in agent_results.items():
weight = weights.get(agent, 0.0)
prompt += f"\n{agent.upper()} Agent (Weight: {weight:.2f}):\n"
prompt += f" - Confidence: {result.get('confidence_score', 0.5)}\n"
prompt += (
f" - Analysis: {result.get('analysis', 'No analysis provided')}\n"
)
if "recommendations" in result:
prompt += f" - Recommendations: {result['recommendations']}\n"
if conflicts:
prompt += f"\nConflicts Detected ({len(conflicts)}):\n"
for i, conflict in enumerate(conflicts, 1):
prompt += f"{i}. {conflict}\n"
prompt += f"""
Please synthesize these results into a coherent, actionable response that:
1. Weighs agent inputs according to their weights and confidence scores
2. Resolves any conflicts using the {self.persona.name} investor perspective
3. Provides clear, actionable recommendations aligned with {self.persona.name} characteristics
4. Includes appropriate risk disclaimers
5. Maintains professional, confident tone
Focus on actionable insights for the {self.persona.name} investor profile.
"""
return prompt
def _calculate_overall_confidence(
self, agent_results: dict, weights: dict[str, float]
) -> float:
"""Calculate weighted overall confidence score."""
total_confidence = 0.0
total_weight = 0.0
for agent, weight in weights.items():
if agent in agent_results:
confidence = agent_results[agent].get("confidence_score", 0.5)
total_confidence += confidence * weight
total_weight += weight
return total_confidence / total_weight if total_weight > 0 else 0.5
def _assess_persona_alignment(self, synthesis_content: str) -> float:
"""Assess how well synthesis aligns with investor persona."""
# Simple keyword-based alignment scoring
persona_keywords = {
"conservative": ["stable", "dividend", "low-risk", "preservation"],
"moderate": ["balanced", "diversified", "moderate", "growth"],
"aggressive": ["growth", "momentum", "high-return", "opportunity"],
}
keywords = persona_keywords.get(self.persona.name.lower(), [])
content_lower = synthesis_content.lower()
alignment_score = sum(1 for keyword in keywords if keyword in content_lower)
return min(alignment_score / len(keywords) if keywords else 0.5, 1.0)
class SupervisorAgent(PersonaAwareAgent):
"""
Multi-agent supervisor using 2025 LangGraph patterns.
Orchestrates MarketAnalysisAgent, TechnicalAnalysisAgent, and DeepResearchAgent (when provided)
with intelligent routing, result synthesis, and conflict resolution.
"""
def __init__(
self,
llm: BaseChatModel,
agents: dict[str, PersonaAwareAgent],
persona: str = "moderate",
checkpointer: MemorySaver | None = None,
ttl_hours: int = 1,
routing_strategy: str = "llm_powered",
synthesis_mode: str = "weighted",
conflict_resolution: str = "confidence_based",
max_iterations: int = 5,
):
"""Initialize supervisor with existing agent instances."""
if not agents:
raise AgentInitializationError(
agent_type="SupervisorAgent",
reason="No agents provided for supervision",
)
# Store agent references
self.agents = agents
self.market_agent = agents.get("market")
self.technical_agent = agents.get("technical")
self.research_agent = agents.get("research") # DeepResearchAgent integration
# Configuration
self.routing_strategy = routing_strategy
self.synthesis_mode = synthesis_mode
self.conflict_resolution = conflict_resolution
self.max_iterations = max_iterations
# Ensure all agents use the same persona
persona_obj = INVESTOR_PERSONAS.get(persona, INVESTOR_PERSONAS["moderate"])
for agent in agents.values():
if hasattr(agent, "persona"):
agent.persona = persona_obj
# Get supervisor-specific tools
supervisor_tools = self._get_supervisor_tools()
# Initialize base class
super().__init__(
llm=llm,
tools=supervisor_tools,
persona=persona,
checkpointer=checkpointer or MemorySaver(),
ttl_hours=ttl_hours,
)
# Initialize components
self.conversation_store = ConversationStore(ttl_hours=ttl_hours)
self.query_classifier = QueryClassifier(llm)
self.result_synthesizer = ResultSynthesizer(llm, self.persona)
logger.info(
f"SupervisorAgent initialized with {len(agents)} agents: {list(agents.keys())}"
)
def get_state_schema(self) -> type:
"""Return SupervisorState schema."""
return SupervisorState
def _get_supervisor_tools(self) -> list[BaseTool]:
"""Get tools specific to supervision and coordination."""
from langchain_core.tools import tool
tools = []
if self.market_agent:
@tool
async def query_market_agent(
query: str,
session_id: str,
screening_strategy: str = "momentum",
max_results: int = 20,
) -> dict[str, Any]:
"""Query the market analysis agent for stock screening and market analysis."""
try:
return await self.market_agent.analyze_market(
query=query,
session_id=session_id,
screening_strategy=screening_strategy,
max_results=max_results,
)
except Exception as e:
return {"error": f"Market agent error: {str(e)}"}
tools.append(query_market_agent)
if self.technical_agent:
@tool
async def query_technical_agent(
symbol: str, timeframe: str = "1d", indicators: list[str] | None = None
) -> dict[str, Any]:
"""Query the technical analysis agent for chart analysis and indicators."""
try:
if indicators is None:
indicators = ["sma_20", "rsi", "macd"]
return await self.technical_agent.analyze_stock(
symbol=symbol, timeframe=timeframe, indicators=indicators
)
except Exception as e:
return {"error": f"Technical agent error: {str(e)}"}
tools.append(query_technical_agent)
if self.research_agent:
@tool
async def query_research_agent(
query: str,
session_id: str,
research_scope: str = "comprehensive",
max_sources: int = 50,
timeframe: str = "1m",
) -> dict[str, Any]:
"""Query the deep research agent for comprehensive research and analysis."""
try:
return await self.research_agent.research_topic(
query=query,
session_id=session_id,
research_scope=research_scope,
max_sources=max_sources,
timeframe=timeframe,
)
except Exception as e:
return {"error": f"Research agent error: {str(e)}"}
@tool
async def analyze_company_research(
symbol: str, session_id: str, include_competitive: bool = True
) -> dict[str, Any]:
"""Perform comprehensive company research and fundamental analysis."""
try:
return await self.research_agent.research_company_comprehensive(
symbol=symbol,
session_id=session_id,
include_competitive_analysis=include_competitive,
)
except Exception as e:
return {"error": f"Company research error: {str(e)}"}
@tool
async def analyze_market_sentiment_research(
topic: str, session_id: str, timeframe: str = "1w"
) -> dict[str, Any]:
"""Analyze market sentiment using deep research capabilities."""
try:
return await self.research_agent.analyze_market_sentiment(
topic=topic, session_id=session_id, timeframe=timeframe
)
except Exception as e:
return {"error": f"Sentiment analysis error: {str(e)}"}
tools.extend(
[
query_research_agent,
analyze_company_research,
analyze_market_sentiment_research,
]
)
return tools
def _build_graph(self):
"""Build supervisor graph with multi-agent coordination."""
workflow = StateGraph(SupervisorState)
# Core supervisor nodes
workflow.add_node("analyze_query", self._analyze_query)
workflow.add_node("create_execution_plan", self._create_execution_plan)
workflow.add_node("route_to_agents", self._route_to_agents)
workflow.add_node("aggregate_results", self._aggregate_results)
workflow.add_node("resolve_conflicts", self._resolve_conflicts)
workflow.add_node("synthesize_response", self._synthesize_response)
# Agent invocation nodes
if self.market_agent:
workflow.add_node("invoke_market_agent", self._invoke_market_agent)
if self.technical_agent:
workflow.add_node("invoke_technical_agent", self._invoke_technical_agent)
if self.research_agent:
workflow.add_node("invoke_research_agent", self._invoke_research_agent)
# Coordination nodes
workflow.add_node("parallel_coordinator", self._parallel_coordinator)
# Tool node
if self.tools:
from langgraph.prebuilt import ToolNode
tool_node = ToolNode(self.tools)
workflow.add_node("tools", tool_node)
# Define workflow edges
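# Topology: START -> analyze_query -> create_execution_plan -> route_to_agents,
# then conditional routing to a single agent node, the parallel coordinator, or
# the tool node; all paths converge on aggregate_results, which branches to
# resolve_conflicts when conflicts are detected before synthesize_response -> END.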
workflow.add_edge(START, "analyze_query")
workflow.add_edge("analyze_query", "create_execution_plan")
workflow.add_edge("create_execution_plan", "route_to_agents")
# Conditional routing based on execution plan
workflow.add_conditional_edges(
"route_to_agents",
self._route_decision,
{
"market_only": "invoke_market_agent"
if self.market_agent
else "synthesize_response",
"technical_only": "invoke_technical_agent"
if self.technical_agent
else "synthesize_response",
"research_only": "invoke_research_agent"
if self.research_agent
else "synthesize_response",
"parallel_execution": "parallel_coordinator",
"use_tools": "tools" if self.tools else "synthesize_response",
"synthesize": "synthesize_response",
},
)
# Agent result collection
if self.market_agent:
workflow.add_edge("invoke_market_agent", "aggregate_results")
if self.technical_agent:
workflow.add_edge("invoke_technical_agent", "aggregate_results")
if self.research_agent:
workflow.add_edge("invoke_research_agent", "aggregate_results")
workflow.add_edge("parallel_coordinator", "aggregate_results")
if self.tools:
workflow.add_edge("tools", "aggregate_results")
# Conflict detection and resolution
workflow.add_conditional_edges(
"aggregate_results",
self._check_conflicts,
{"resolve": "resolve_conflicts", "synthesize": "synthesize_response"},
)
workflow.add_edge("resolve_conflicts", "synthesize_response")
workflow.add_edge("synthesize_response", END)
return workflow.compile(checkpointer=self.checkpointer)
# Workflow node implementations follow below (several are placeholder logic).
async def coordinate_agents(
self, query: str, session_id: str, **kwargs
) -> dict[str, Any]:
"""
Main entry point for multi-agent coordination.
Args:
query: User query requiring multiple agents
session_id: Session identifier
**kwargs: Additional parameters
Returns:
Coordinated response from multiple agents
"""
start_time = datetime.now()
# Initialize supervisor state
initial_state = {
"messages": [HumanMessage(content=query)],
"persona": self.persona.name,
"session_id": session_id,
"timestamp": datetime.now(),
"query_classification": {},
"execution_plan": [],
"current_subtask_index": 0,
"routing_strategy": self.routing_strategy,
"active_agents": [],
"agent_results": {},
"agent_confidence": {},
"agent_execution_times": {},
"agent_errors": {},
"workflow_status": "planning",
"parallel_execution": False,
"dependency_graph": {},
"max_iterations": self.max_iterations,
"current_iteration": 0,
"conflicts_detected": [],
"conflict_resolution": {},
"synthesis_weights": {},
"final_recommendation_confidence": 0.0,
"synthesis_mode": self.synthesis_mode,
"total_execution_time_ms": 0.0,
"agent_coordination_overhead_ms": 0.0,
"synthesis_time_ms": 0.0,
"cache_utilization": {},
"api_calls_made": 0,
"cache_hits": 0,
"cache_misses": 0,
# Legacy fields initialized as None for backward compatibility
"query_type": None,
"subtasks": None,
"current_subtask": None,
"workflow_plan": None,
"completed_steps": None,
"pending_steps": None,
"final_recommendations": None,
"confidence_scores": None,
"risk_warnings": None,
}
# Add any additional parameters
initial_state.update(kwargs)
# Execute supervision workflow
try:
result = await self.graph.ainvoke(
initial_state,
config={
"configurable": {
"thread_id": session_id,
"checkpoint_ns": "supervisor",
}
},
)
# Calculate total execution time
execution_time = (datetime.now() - start_time).total_seconds() * 1000
result["total_execution_time_ms"] = execution_time
return self._format_supervisor_response(result)
except Exception as e:
logger.error(f"Error in supervisor coordination: {e}")
return {
"status": "error",
"error": str(e),
"total_execution_time_ms": (datetime.now() - start_time).total_seconds()
* 1000,
"agent_type": "supervisor",
}
def _format_supervisor_response(self, result: dict[str, Any]) -> dict[str, Any]:
"""Format supervisor response for consistent output."""
return {
"status": "success",
"agent_type": "supervisor",
"persona": result.get("persona"),
"query_classification": result.get("query_classification", {}),
"agents_used": result.get("active_agents", []),
"synthesis": result.get("messages", [])[-1].content
if result.get("messages")
else "No synthesis available",
"confidence_score": result.get("final_recommendation_confidence", 0.0),
"execution_time_ms": result.get("total_execution_time_ms", 0.0),
"conflicts_resolved": len(result.get("conflicts_detected", [])),
"workflow_status": result.get("workflow_status", "completed"),
}
# Placeholder implementations for workflow nodes
# These will be implemented based on the specific node logic
async def _analyze_query(self, state: SupervisorState) -> Command:
"""Analyze query to determine routing strategy and requirements."""
query = state["messages"][-1].content if state["messages"] else ""
# Classify the query
classification = await self.query_classifier.classify_query(
query, state["persona"]
)
return Command(
goto="create_execution_plan",
update={
"query_classification": classification,
"workflow_status": "analyzing",
},
)
async def _create_execution_plan(self, state: SupervisorState) -> Command:
"""Create execution plan based on query classification."""
classification = state["query_classification"]
# Create execution plan based on classification
execution_plan = [
{
"task_id": "main_analysis",
"agents": classification.get("required_agents", ["market"]),
"parallel": classification.get("parallel_capable", False),
"priority": 1,
}
]
return Command(
goto="route_to_agents",
update={"execution_plan": execution_plan, "workflow_status": "planning"},
)
async def _route_to_agents(self, state: SupervisorState) -> Command:
"""Route query to appropriate agents based on execution plan."""
# The routing target itself is selected by the conditional edges via
# _route_decision; this node only marks the workflow as executing.
return Command(update={"workflow_status": "executing"})
async def _route_decision(self, state: SupervisorState) -> str:
"""Decide routing strategy based on state."""
classification = state.get("query_classification", {})
required_agents = classification.get("required_agents", ["market"])
parallel = classification.get("parallel_capable", False)
if len(required_agents) == 1:
agent = required_agents[0]
if agent == "market" and self.market_agent:
return "market_only"
elif agent == "technical" and self.technical_agent:
return "technical_only"
elif agent == "research" and self.research_agent:
return "research_only"
elif len(required_agents) > 1 and parallel:
return "parallel_execution"
return "synthesize"
async def _parallel_coordinator(self, state: SupervisorState) -> Command:
"""Coordinate parallel execution of multiple agents."""
# This would implement parallel agent coordination
# For now, return to aggregation
return Command(
goto="aggregate_results", update={"workflow_status": "aggregating"}
)
async def _invoke_market_agent(self, state: SupervisorState) -> Command:
"""Invoke market analysis agent."""
if not self.market_agent:
return Command(
goto="aggregate_results",
update={"agent_errors": {"market": "Market agent not available"}},
)
try:
query = state["messages"][-1].content if state["messages"] else ""
result = await self.market_agent.analyze_market(
query=query, session_id=state["session_id"]
)
return Command(
goto="aggregate_results",
update={
"agent_results": {"market": result},
"active_agents": ["market"],
},
)
except Exception as e:
return Command(
goto="aggregate_results",
update={
"agent_errors": {"market": str(e)},
"active_agents": ["market"],
},
)
async def _invoke_technical_agent(self, state: SupervisorState) -> Command:
"""Invoke technical analysis agent."""
if not self.technical_agent:
return Command(
goto="aggregate_results",
update={"agent_errors": {"technical": "Technical agent not available"}},
)
# This would implement technical agent invocation
return Command(
goto="aggregate_results", update={"active_agents": ["technical"]}
)
async def _invoke_research_agent(self, state: SupervisorState) -> Command:
"""Invoke deep research agent (future implementation)."""
if not self.research_agent:
return Command(
goto="aggregate_results",
update={"agent_errors": {"research": "Research agent not available"}},
)
# Future implementation
return Command(goto="aggregate_results", update={"active_agents": ["research"]})
async def _aggregate_results(self, state: SupervisorState) -> Command:
"""Aggregate results from all agents."""
return Command(
goto="synthesize_response", update={"workflow_status": "synthesizing"}
)
def _check_conflicts(self, state: SupervisorState) -> str:
"""Check if there are conflicts between agent results."""
conflicts = state.get("conflicts_detected", [])
return "resolve" if conflicts else "synthesize"
async def _resolve_conflicts(self, state: SupervisorState) -> Command:
"""Resolve conflicts between agent recommendations."""
return Command(
goto="synthesize_response",
update={"conflict_resolution": {"strategy": "confidence_based"}},
)
async def _synthesize_response(self, state: SupervisorState) -> Command:
"""Synthesize final response from agent results."""
agent_results = state.get("agent_results", {})
conflicts = state.get("conflicts_detected", [])
classification = state.get("query_classification", {})
if agent_results:
synthesis = await self.result_synthesizer.synthesize_results(
agent_results=agent_results,
query_type=classification.get("category", "stock_investment_decision"),
conflicts=conflicts,
)
return Command(
goto="__end__",
update={
"final_recommendation_confidence": synthesis["confidence_score"],
"synthesis_weights": synthesis["weights_applied"],
"workflow_status": "completed",
"messages": state["messages"]
+ [HumanMessage(content=synthesis["synthesis"])],
},
)
else:
return Command(
goto="__end__",
update={
"workflow_status": "completed",
"messages": state["messages"]
+ [
HumanMessage(content="No agent results available for synthesis")
],
},
)
```
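The rule-based fallback in `QueryClassifier` can be exercised without any LLM, which makes the routing matrix easy to inspect in isolation. A minimal sketch, assuming the package imports cleanly in your environment; it calls the private fallback helper purely for illustration:

```python
from maverick_mcp.agents.supervisor import ROUTING_MATRIX, QueryClassifier

# The LLM is unused on the rule-based path, so None is fine for this demo.
classifier = QueryClassifier(llm=None)

result = classifier._rule_based_fallback(
    "Screen for high momentum stocks in semiconductors", persona="aggressive"
)
print(result["category"])  # -> market_screening (keyword "screen")
print(result["routing_config"] is ROUTING_MATRIX["market_screening"])  # -> True
```

The same routing-matrix entry is attached to every classification result, so downstream nodes can read `parallel` and `synthesis_required` without re-deriving them.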
--------------------------------------------------------------------------------
/maverick_mcp/backtesting/strategies/ml/regime_aware.py:
--------------------------------------------------------------------------------
```python
"""Market regime-aware trading strategies with automatic strategy switching."""
import logging
from typing import Any
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
from maverick_mcp.backtesting.strategies.base import Strategy
logger = logging.getLogger(__name__)
class MarketRegimeDetector:
"""Detect market regimes using various statistical methods."""
def __init__(
self, method: str = "hmm", n_regimes: int = 3, lookback_period: int = 50
):
"""Initialize regime detector.
Args:
method: Detection method ('hmm', 'kmeans', 'threshold')
n_regimes: Number of market regimes to detect
lookback_period: Period for regime detection
"""
self.method = method
self.n_regimes = n_regimes
self.lookback_period = lookback_period
self.scaler = StandardScaler()
# Initialize detection model
self.model = None
self.is_fitted = False
self._initialize_model()
def _initialize_model(self):
"""Initialize regime detection model with better configurations."""
if self.method == "hmm":
# The "hmm" option is approximated with a GaussianMixture, which is more numerically stable here than a full hidden Markov model
self.model = GaussianMixture(
n_components=self.n_regimes,
covariance_type="diag", # Use diagonal covariance for stability
random_state=42,
max_iter=200,
tol=1e-6,
reg_covar=1e-6, # Regularization for numerical stability
init_params="kmeans", # Better initialization
warm_start=False,
)
elif self.method == "kmeans":
self.model = KMeans(
n_clusters=self.n_regimes,
random_state=42,
n_init=10,
max_iter=500,
tol=1e-6,
algorithm="lloyd", # More stable algorithm
)
elif self.method == "threshold":
# Threshold-based regime detection
self.model = None
else:
raise ValueError(f"Unsupported regime detection method: {self.method}")
def extract_regime_features(self, data: DataFrame) -> np.ndarray:
"""Extract robust features for regime detection.
Args:
data: Price data
Returns:
Feature array with a consistent layout (empty array if data is insufficient); NaN/inf values are replaced with safe defaults
"""
try:
# Validate input data
if data is None or data.empty or len(data) < 10:
logger.debug("Insufficient data for regime feature extraction")
return np.array([])
if "close" not in data.columns:
logger.warning("Close price data not available for regime features")
return np.array([])
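# Feature vector layout (19 values unless an early return yields an empty array;
# blocks with insufficient data fall back to neutral defaults): 12 rolling return
# statistics (mean/std/skew/kurtosis over 5-, 10- and 20-day windows), 2
# trend/momentum features, 2 volatility-regime features, 2 volume features,
# and 1 high-low range feature.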
features = []
returns = data["close"].pct_change().dropna()
if len(returns) == 0:
logger.debug("No valid returns data for regime features")
return np.array([])
# Rolling statistics with robust error handling
for window in [5, 10, 20]:
if len(returns) >= window:
window_returns = returns.rolling(window)
mean_return = window_returns.mean().iloc[-1]
std_return = window_returns.std().iloc[-1]
# Robust skewness and kurtosis
if window >= 5:
skew_return = window_returns.skew().iloc[-1]
kurt_return = window_returns.kurt().iloc[-1]
else:
skew_return = 0.0
kurt_return = 0.0
# Replace NaN/inf values with sensible defaults
features.extend(
[
mean_return if np.isfinite(mean_return) else 0.0,
std_return if np.isfinite(std_return) else 0.01,
skew_return if np.isfinite(skew_return) else 0.0,
kurt_return if np.isfinite(kurt_return) else 0.0,
]
)
else:
# Default values for insufficient data
features.extend([0.0, 0.01, 0.0, 0.0])
# Enhanced technical indicators for regime detection
current_price = data["close"].iloc[-1]
# Multiple timeframe trend strength
if len(data) >= 20:
# Short-term trend (20-day)
sma_20 = data["close"].rolling(20).mean()
sma_20_value = (
float(sma_20.iloc[-1]) if not pd.isna(sma_20.iloc[-1]) else 0.0
)
if sma_20_value != 0.0:
trend_strength_20 = (current_price - sma_20_value) / sma_20_value
else:
trend_strength_20 = 0.0
features.append(
trend_strength_20 if np.isfinite(trend_strength_20) else 0.0
)
# Price momentum (rate of change)
prev_price = (
float(data["close"].iloc[-20])
if not pd.isna(data["close"].iloc[-20])
else current_price
)
if prev_price != 0.0:
momentum_20 = (current_price - prev_price) / prev_price
else:
momentum_20 = 0.0
features.append(momentum_20 if np.isfinite(momentum_20) else 0.0)
else:
features.extend([0.0, 0.0])
# Multi-timeframe volatility regime detection
if len(returns) >= 20:
vol_short = returns.rolling(20).std().iloc[-1] * np.sqrt(
252
) # Annualized
vol_medium = (
returns.rolling(60).std().iloc[-1] * np.sqrt(252)
if len(returns) >= 60
else vol_short
)
# Volatility regime indicator
vol_regime = vol_short / vol_medium if vol_medium > 0 else 1.0
features.append(vol_regime if np.isfinite(vol_regime) else 1.0)
# Absolute volatility level (normalized)
vol_level = min(vol_short / 0.3, 3.0) # Relative to a 30% annualized-volatility baseline, capped at 3x
features.append(vol_level if np.isfinite(vol_level) else 1.0)
else:
features.extend([1.0, 1.0])
# Market structure and volume features (if available)
if "volume" in data.columns and len(data) >= 10:
current_volume = data["volume"].iloc[-1]
# Volume trend
if len(data) >= 20:
volume_ma_short = data["volume"].rolling(10).mean().iloc[-1]
volume_ma_long = data["volume"].rolling(20).mean().iloc[-1]
volume_trend = (
volume_ma_short / volume_ma_long if volume_ma_long > 0 else 1.0
)
features.append(volume_trend if np.isfinite(volume_trend) else 1.0)
# Volume surge indicator
volume_surge = (
current_volume / volume_ma_long if volume_ma_long > 0 else 1.0
)
features.append(
min(volume_surge, 10.0) if np.isfinite(volume_surge) else 1.0
)
else:
features.extend([1.0, 1.0])
else:
features.extend([1.0, 1.0])
# Price dispersion (high-low range analysis)
if "high" in data.columns and "low" in data.columns and len(data) >= 10:
hl_range = (data["high"] - data["low"]) / data["close"]
avg_range = (
hl_range.rolling(20).mean().iloc[-1]
if len(data) >= 20
else hl_range.mean()
)
current_range = hl_range.iloc[-1]
range_regime = current_range / avg_range if avg_range > 0 else 1.0
features.append(range_regime if np.isfinite(range_regime) else 1.0)
else:
features.append(1.0)
feature_array = np.array(features)
# Final validation and cleaning
if len(feature_array) == 0:
return np.array([])
# Replace any remaining NaN/inf values
feature_array = np.nan_to_num(
feature_array, nan=0.0, posinf=1.0, neginf=-1.0
)
return feature_array
except Exception as e:
logger.error(f"Error extracting regime features: {e}")
return np.array([])
def detect_regime_threshold(self, data: DataFrame) -> int:
"""Detect regime using threshold-based method.
Args:
data: Price data
Returns:
Regime label (0: bear/declining, 1: sideways, 2: bull/trending)
"""
if len(data) < 20:
return 1 # Default to sideways
# Calculate trend and volatility measures
returns = data["close"].pct_change()
# Trend measure (20-day slope)
x = np.arange(20)
y = data["close"].iloc[-20:].values
trend_slope = np.polyfit(x, y, 1)[0] / y[-1] # Normalized slope
# Volatility measure
vol_20 = returns.rolling(20).std().iloc[-1] * np.sqrt(252)
# Define regime thresholds
trend_threshold = 0.001 # 0.1% daily trend threshold
vol_threshold = 0.25 # 25% annual volatility threshold
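# Example: a +0.2%/day normalized slope with 18% annualized volatility maps to
# bull (2); a -0.2%/day slope with 35% volatility maps to bear (0); anything
# else (weak trend or mixed signals) maps to sideways (1).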
# Classify regime
if trend_slope > trend_threshold and vol_20 < vol_threshold:
return 2 # Bull/trending market
elif trend_slope < -trend_threshold and vol_20 > vol_threshold:
return 0 # Bear/declining market
else:
return 1 # Sideways/uncertain market
def fit_regimes(self, data: DataFrame) -> None:
"""Fit regime detection model to historical data with enhanced robustness.
Args:
data: Historical price data
"""
if self.method == "threshold":
self.is_fitted = True
return
try:
# Need sufficient data for stable regime detection
min_required_samples = max(50, self.n_regimes * 20)
if len(data) < min_required_samples + self.lookback_period:
logger.warning(
f"Insufficient data for regime fitting: {len(data)} < {min_required_samples + self.lookback_period}"
)
self.is_fitted = True
return
# Extract features for regime detection with temporal consistency
feature_list = []
feature_consistency_count = None
# Use overlapping windows for more stable regime detection
step_size = max(1, self.lookback_period // 10)
for i in range(self.lookback_period, len(data), step_size):
window_data = data.iloc[max(0, i - self.lookback_period) : i + 1]
features = self.extract_regime_features(window_data)
if len(features) > 0 and np.all(np.isfinite(features)):
# Check feature consistency
if feature_consistency_count is None:
feature_consistency_count = len(features)
elif len(features) != feature_consistency_count:
logger.warning(
f"Feature dimension mismatch: expected {feature_consistency_count}, got {len(features)}"
)
continue
feature_list.append(features)
if len(feature_list) < min_required_samples:
logger.warning(
f"Insufficient valid samples for regime fitting: {len(feature_list)} < {min_required_samples}"
)
self.is_fitted = True
return
# Ensure we have valid feature_list before creating array
if len(feature_list) == 0:
logger.warning(
"Empty feature list after filtering, cannot create feature matrix"
)
self.is_fitted = True
return
X = np.array(feature_list)
# Additional data quality checks
if X.size == 0:
logger.warning("Empty feature matrix, cannot fit regime detector")
self.is_fitted = True
return
elif np.any(np.isnan(X)) or np.any(np.isinf(X)):
logger.warning("Found NaN or inf values in feature matrix, cleaning...")
X = np.nan_to_num(X, nan=0.0, posinf=1.0, neginf=-1.0)
# Check for zero variance features
feature_std = np.std(X, axis=0)
zero_variance_features = np.where(feature_std < 1e-8)[0]
if len(zero_variance_features) > 0:
logger.debug(
f"Found {len(zero_variance_features)} zero-variance features"
)
# Add small noise to zero-variance features
for idx in zero_variance_features:
X[:, idx] += np.random.normal(0, 1e-6, X.shape[0])
# Scale features with robust scaler
X_scaled = self.scaler.fit_transform(X)
# Fit model with better error handling
try:
if self.method == "hmm":
# For GaussianMixture, ensure numerical stability
self.model.fit(X_scaled)
# Validate fitted model
if (
not hasattr(self.model, "weights_")
or len(self.model.weights_) != self.n_regimes
):
raise ValueError("Model fitting failed - invalid weights")
# Check convergence
if not self.model.converged_:
logger.warning(
"GaussianMixture did not converge, but will proceed"
)
elif self.method == "kmeans":
self.model.fit(X_scaled)
# Validate fitted model
if (
not hasattr(self.model, "cluster_centers_")
or len(self.model.cluster_centers_) != self.n_regimes
):
raise ValueError(
"KMeans fitting failed - invalid cluster centers"
)
self.is_fitted = True
# Log fitting success with model diagnostics
if self.method == "hmm":
avg_log_likelihood = self.model.score(X_scaled) / len(X_scaled)
logger.info(
f"Fitted {self.method} regime detector with {len(X)} samples, avg log-likelihood: {avg_log_likelihood:.4f}"
)
else:
inertia = (
self.model.inertia_
if hasattr(self.model, "inertia_")
else "N/A"
)
logger.info(
f"Fitted {self.method} regime detector with {len(X)} samples, inertia: {inertia}"
)
except Exception as model_error:
logger.error(f"Model fitting failed: {model_error}")
logger.info("Falling back to threshold method")
self.method = "threshold" # Fallback to threshold method
self.is_fitted = True
except Exception as e:
logger.error(f"Error fitting regime detector: {e}")
self.is_fitted = True # Allow fallback to threshold method
def detect_current_regime(self, data: DataFrame) -> int:
"""Detect current market regime with enhanced error handling.
Args:
data: Recent price data
Returns:
Regime label (0: bear, 1: sideways, 2: bull)
"""
if not self.is_fitted:
logger.debug("Regime detector not fitted, using threshold method")
return self.detect_regime_threshold(data)
try:
if self.method == "threshold":
return self.detect_regime_threshold(data)
# Extract features for current regime
features = self.extract_regime_features(data)
if len(features) == 0:
logger.debug("No features extracted, falling back to threshold method")
return self.detect_regime_threshold(data)
# Check for non-finite features only if features array is not empty
if features.size > 0 and np.any(~np.isfinite(features)):
logger.debug("Non-finite features detected, cleaning and proceeding")
features = np.nan_to_num(features, nan=0.0, posinf=1.0, neginf=-1.0)
# Validate feature consistency with training
expected_features = (
self.scaler.n_features_in_
if hasattr(self.scaler, "n_features_in_")
else None
)
if expected_features is not None and len(features) != expected_features:
logger.warning(
f"Feature count mismatch in prediction: expected {expected_features}, got {len(features)}"
)
return self.detect_regime_threshold(data)
# Scale features and predict regime
try:
X = self.scaler.transform([features])
regime = self.model.predict(X)[0]
# Validate regime prediction
if regime < 0 or regime >= self.n_regimes:
logger.warning(
f"Invalid regime prediction: {regime}, using threshold method"
)
return self.detect_regime_threshold(data)
return int(regime)
except Exception as pred_error:
logger.debug(
f"Prediction error: {pred_error}, falling back to threshold method"
)
return self.detect_regime_threshold(data)
except Exception as e:
logger.error(f"Error detecting current regime: {e}")
return self.detect_regime_threshold(data) # Always fallback to threshold
def get_regime_probabilities(self, data: DataFrame) -> np.ndarray:
"""Get probabilities for each regime.
Args:
data: Recent price data
Returns:
Array of regime probabilities
"""
if not self.is_fitted or self.method == "threshold":
# For threshold method, return deterministic probabilities
regime = self.detect_current_regime(data)
probs = np.zeros(self.n_regimes)
probs[regime] = 1.0
return probs
try:
features = self.extract_regime_features(data)
if len(features) == 0:
return np.ones(self.n_regimes) / self.n_regimes
elif features.size > 0 and np.any(np.isnan(features)):
return np.ones(self.n_regimes) / self.n_regimes
X = self.scaler.transform([features])
if hasattr(self.model, "predict_proba"):
return self.model.predict_proba(X)[0]
else:
# For methods without probabilities, return one-hot encoding
regime = self.model.predict(X)[0]
probs = np.zeros(self.n_regimes)
probs[regime] = 1.0
return probs
except Exception as e:
logger.error(f"Error getting regime probabilities: {e}")
return np.ones(self.n_regimes) / self.n_regimes
class RegimeAwareStrategy(Strategy):
"""Strategy that switches between different strategies based on market regime."""
def __init__(
self,
regime_strategies: dict[int, Strategy],
regime_detector: MarketRegimeDetector = None,
regime_names: dict[int, str] = None,
switch_threshold: float = 0.7,
min_regime_duration: int = 5,
parameters: dict[str, Any] = None,
):
"""Initialize regime-aware strategy.
Args:
regime_strategies: Dictionary mapping regime labels to strategies
regime_detector: Market regime detector instance
regime_names: Names for each regime
switch_threshold: Probability threshold for regime switching
min_regime_duration: Minimum duration before switching regimes
parameters: Additional parameters
"""
super().__init__(parameters)
self.regime_strategies = regime_strategies
self.regime_detector = regime_detector or MarketRegimeDetector()
self.regime_names = regime_names or {0: "Bear", 1: "Sideways", 2: "Bull"}
self.switch_threshold = switch_threshold
self.min_regime_duration = min_regime_duration
# Regime tracking
self.current_regime = 1 # Start with sideways
self.regime_history = []
self.regime_duration = 0
self.regime_switches = 0
@property
def name(self) -> str:
"""Get strategy name."""
strategy_names = [s.name for s in self.regime_strategies.values()]
return f"RegimeAware({','.join(strategy_names)})"
@property
def description(self) -> str:
"""Get strategy description."""
return f"Regime-aware strategy switching between {len(self.regime_strategies)} strategies based on market conditions"
def fit_regime_detector(self, data: DataFrame) -> None:
"""Fit regime detector to historical data.
Args:
data: Historical price data
"""
self.regime_detector.fit_regimes(data)
def update_current_regime(self, data: DataFrame, current_idx: int) -> bool:
"""Update current market regime.
Args:
data: Price data
current_idx: Current index in data
Returns:
True if regime changed, False otherwise
"""
# Get regime probabilities
window_data = data.iloc[
max(0, current_idx - self.regime_detector.lookback_period) : current_idx + 1
]
regime_probs = self.regime_detector.get_regime_probabilities(window_data)
# Find most likely regime
most_likely_regime = np.argmax(regime_probs)
max_prob = regime_probs[most_likely_regime]
# Check if we should switch regimes
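# Hysteresis: switch only when the candidate regime's probability clears
# switch_threshold AND the current regime has persisted for at least
# min_regime_duration bars, which prevents rapid regime flip-flopping.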
regime_changed = False
if (
most_likely_regime != self.current_regime
and max_prob >= self.switch_threshold
and self.regime_duration >= self.min_regime_duration
):
old_regime = self.current_regime
self.current_regime = most_likely_regime
self.regime_duration = 0
self.regime_switches += 1
regime_changed = True
logger.info(
f"Regime switch: {self.regime_names.get(old_regime, old_regime)} -> "
f"{self.regime_names.get(self.current_regime, self.current_regime)} "
f"(prob: {max_prob:.3f})"
)
else:
self.regime_duration += 1
# Track regime history
self.regime_history.append(
{
"index": current_idx,
"regime": self.current_regime,
"probabilities": regime_probs.tolist(),
"duration": self.regime_duration,
"switched": regime_changed,
}
)
return regime_changed
def get_active_strategy(self) -> Strategy:
"""Get currently active strategy based on regime.
Returns:
Active strategy for current regime
"""
if self.current_regime in self.regime_strategies:
return self.regime_strategies[self.current_regime]
else:
# Fallback to first available strategy
return next(iter(self.regime_strategies.values()))
def generate_signals(self, data: DataFrame) -> tuple[Series, Series]:
"""Generate regime-aware trading signals.
Args:
data: Price data with OHLCV columns
Returns:
Tuple of (entry_signals, exit_signals) as boolean Series
"""
try:
# Validate input data
if data is None or len(data) == 0:
logger.warning("Empty or invalid data provided to generate_signals")
# Create empty Series with a dummy index to avoid empty array issues
dummy_index = pd.DatetimeIndex([pd.Timestamp.now()])
return pd.Series(False, index=dummy_index), pd.Series(
False, index=dummy_index
)
# Ensure minimum data requirements
min_required_data = max(50, self.regime_detector.lookback_period)
if len(data) < min_required_data:
logger.warning(
f"Insufficient data for regime-aware strategy: {len(data)} < {min_required_data}"
)
# Return all False signals but with valid data index
return pd.Series(False, index=data.index), pd.Series(
False, index=data.index
)
# Fit regime detector if not already done
if not self.regime_detector.is_fitted:
try:
self.fit_regime_detector(data)
except Exception as e:
logger.error(
f"Failed to fit regime detector: {e}, falling back to single strategy"
)
# Fallback to using first available strategy without regime switching
fallback_strategy = next(iter(self.regime_strategies.values()))
return fallback_strategy.generate_signals(data)
entry_signals = pd.Series(False, index=data.index)
exit_signals = pd.Series(False, index=data.index)
# Generate signals with regime awareness
current_strategy = None
for idx in range(len(data)):
# Update regime
regime_changed = self.update_current_regime(data, idx)
# Get active strategy
active_strategy = self.get_active_strategy()
# If regime changed, regenerate signals from new strategy
if regime_changed or current_strategy != active_strategy:
current_strategy = active_strategy
# Generate signals for remaining data
remaining_data = data.iloc[idx:]
if len(remaining_data) > 0:
strategy_entry, strategy_exit = (
current_strategy.generate_signals(remaining_data)
)
# Update signals for remaining period
end_idx = min(idx + len(strategy_entry), len(data))
entry_signals.iloc[idx:end_idx] = strategy_entry.iloc[
: end_idx - idx
]
exit_signals.iloc[idx:end_idx] = strategy_exit.iloc[
: end_idx - idx
]
logger.info(
f"Generated regime-aware signals with {self.regime_switches} regime switches"
)
return entry_signals, exit_signals
except Exception as e:
logger.error(f"Error generating regime-aware signals: {e}")
# Ensure we always return valid series even on error
if data is not None and len(data) > 0:
return pd.Series(False, index=data.index), pd.Series(
False, index=data.index
)
else:
# Create dummy index to avoid empty array issues
dummy_index = pd.DatetimeIndex([pd.Timestamp.now()])
return pd.Series(False, index=dummy_index), pd.Series(
False, index=dummy_index
)
def get_regime_analysis(self) -> dict[str, Any]:
"""Get analysis of regime detection and switching.
Returns:
Dictionary with regime analysis
"""
if not self.regime_history:
return {}
regime_counts = {}
regime_durations = {}
for record in self.regime_history:
regime = record["regime"]
regime_name = self.regime_names.get(regime, f"Regime_{regime}")
if regime_name not in regime_counts:
regime_counts[regime_name] = 0
regime_durations[regime_name] = []
regime_counts[regime_name] += 1
# Track regime durations
if record["switched"] and len(self.regime_history) > 1:
# Find duration of previous regime
prev_regime_start = 0
for i in range(len(self.regime_history) - 2, -1, -1):
if (
self.regime_history[i]["regime"]
!= self.regime_history[-1]["regime"]
):
prev_regime_start = i + 1
break
duration = len(self.regime_history) - prev_regime_start - 1
prev_regime = self.regime_history[prev_regime_start]["regime"]
prev_regime_name = self.regime_names.get(
prev_regime, f"Regime_{prev_regime}"
)
if prev_regime_name in regime_durations:
regime_durations[prev_regime_name].append(duration)
# Calculate average durations
avg_durations = {}
for regime_name, durations in regime_durations.items():
if durations:
avg_durations[regime_name] = np.mean(durations)
else:
avg_durations[regime_name] = 0
return {
"current_regime": self.regime_names.get(
self.current_regime, self.current_regime
),
"total_switches": self.regime_switches,
"regime_counts": regime_counts,
"average_regime_durations": avg_durations,
"regime_history": self.regime_history[-50:], # Last 50 records
"active_strategy": self.get_active_strategy().name,
}
def validate_parameters(self) -> bool:
"""Validate regime-aware strategy parameters.
Returns:
True if parameters are valid
"""
if not self.regime_strategies:
return False
if self.switch_threshold < 0 or self.switch_threshold > 1:
return False
if self.min_regime_duration < 0:
return False
# Validate individual strategies
for strategy in self.regime_strategies.values():
if not strategy.validate_parameters():
return False
return True
def get_default_parameters(self) -> dict[str, Any]:
"""Get default parameters for regime-aware strategy.
Returns:
Dictionary of default parameters
"""
return {
"switch_threshold": 0.7,
"min_regime_duration": 5,
"regime_detection_method": "hmm",
"n_regimes": 3,
"lookback_period": 50,
}
class AdaptiveRegimeStrategy(RegimeAwareStrategy):
"""Advanced regime-aware strategy with adaptive regime detection."""
def __init__(
self,
regime_strategies: dict[int, Strategy],
adaptation_frequency: int = 100,
regime_confidence_threshold: float = 0.6,
**kwargs,
):
"""Initialize adaptive regime strategy.
Args:
regime_strategies: Dictionary mapping regime labels to strategies
adaptation_frequency: How often to re-fit regime detector
regime_confidence_threshold: Minimum confidence for regime detection
**kwargs: Additional parameters for RegimeAwareStrategy
"""
super().__init__(regime_strategies, **kwargs)
self.adaptation_frequency = adaptation_frequency
self.regime_confidence_threshold = regime_confidence_threshold
self.last_adaptation = 0
@property
def name(self) -> str:
"""Get strategy name."""
return f"Adaptive{super().name}"
def adapt_regime_detector(self, data: DataFrame, current_idx: int) -> None:
"""Re-fit regime detector with recent data.
Args:
data: Price data
current_idx: Current index
"""
if current_idx - self.last_adaptation < self.adaptation_frequency:
return
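# Re-fit at most once every adaptation_frequency bars, using up to the most
# recent 500 bars (e.g. roughly every 100 trading days on daily data with the
# default adaptation_frequency=100).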
try:
# Use recent data for adaptation
adaptation_data = data.iloc[max(0, current_idx - 500) : current_idx]
if len(adaptation_data) >= self.regime_detector.lookback_period:
logger.info(f"Adapting regime detector at index {current_idx}")
self.regime_detector.fit_regimes(adaptation_data)
self.last_adaptation = current_idx
except Exception as e:
logger.error(f"Error adapting regime detector: {e}")
def generate_signals(self, data: DataFrame) -> tuple[Series, Series]:
"""Generate adaptive regime-aware signals.
Args:
data: Price data with OHLCV columns
Returns:
Tuple of (entry_signals, exit_signals) as boolean Series
"""
# Periodically adapt regime detector
for idx in range(
self.adaptation_frequency, len(data), self.adaptation_frequency
):
self.adapt_regime_detector(data, idx)
# Generate signals using parent method
return super().generate_signals(data)
```
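As a quick sanity check, `MarketRegimeDetector` can be driven with nothing more than a synthetic close-price series. A minimal sketch, assuming daily bars and the k-means method; note that clustering labels are arbitrary cluster ids, and only the threshold method carries the fixed bear/sideways/bull semantics:

```python
import numpy as np
import pandas as pd

from maverick_mcp.backtesting.strategies.ml.regime_aware import MarketRegimeDetector

# Synthetic daily closes: a drifting geometric random walk stands in for real data.
rng = np.random.default_rng(42)
returns = rng.normal(loc=0.0005, scale=0.01, size=500)
prices = 100 * np.exp(np.cumsum(returns))
data = pd.DataFrame(
    {"close": prices}, index=pd.bdate_range("2022-01-03", periods=500)
)

detector = MarketRegimeDetector(method="kmeans", n_regimes=3, lookback_period=50)
detector.fit_regimes(data)

regime = detector.detect_current_regime(data)
probs = detector.get_regime_probabilities(data)
print(f"current regime label: {regime}, probabilities: {np.round(probs, 3)}")
```

A fitted detector can then be paired with concrete `Strategy` implementations via `RegimeAwareStrategy(regime_strategies={...}, regime_detector=detector)` to produce regime-switched entry and exit signals.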
--------------------------------------------------------------------------------
/tests/integration/test_orchestration_complete.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Comprehensive Integration Test Suite for MaverickMCP Orchestration Capabilities
This test suite thoroughly validates all orchestration features including:
- agents_orchestrated_analysis with different personas and routing strategies
- agents_deep_research_financial with various research depths and focus areas
- agents_compare_multi_agent_analysis with different agent combinations
The tests simulate real Claude Desktop usage patterns and validate end-to-end
functionality with comprehensive error handling and performance monitoring.
"""
import asyncio
import json
import logging
import os
import sys
import time
import tracemalloc
from datetime import datetime
from typing import Any
# Add project root to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
# Test configuration
TEST_CONFIG = {
"timeout_seconds": 300, # 5 minutes max per test
"concurrent_limit": 3, # Maximum concurrent tests
"performance_monitoring": True,
"detailed_validation": True,
"save_results": True,
}
# Test scenarios for each orchestration tool
ORCHESTRATED_ANALYSIS_SCENARIOS = [
{
"name": "Conservative_LLM_Powered_Routing",
"query": "Analyze AAPL for long-term investment",
"persona": "conservative",
"routing_strategy": "llm_powered",
"max_agents": 2,
"parallel_execution": False,
},
{
"name": "Aggressive_Rule_Based_Routing",
"query": "Find high momentum growth stocks in tech",
"persona": "aggressive",
"routing_strategy": "rule_based",
"max_agents": 3,
"parallel_execution": True,
},
{
"name": "Moderate_Hybrid_Routing",
"query": "Portfolio analysis for MSFT GOOGL NVDA",
"persona": "moderate",
"routing_strategy": "hybrid",
"max_agents": 2,
"parallel_execution": True,
},
{
"name": "Day_Trader_Fast_Execution",
"query": "Quick technical analysis on SPY options",
"persona": "day_trader",
"routing_strategy": "rule_based",
"max_agents": 1,
"parallel_execution": False,
},
]
DEEP_RESEARCH_SCENARIOS = [
{
"name": "Basic_Company_Research",
"research_topic": "Tesla Inc stock analysis",
"persona": "moderate",
"research_depth": "basic",
"focus_areas": ["fundamentals"],
"timeframe": "7d",
},
{
"name": "Standard_Sector_Research",
"research_topic": "renewable energy sector trends",
"persona": "conservative",
"research_depth": "standard",
"focus_areas": ["market_sentiment", "competitive_landscape"],
"timeframe": "30d",
},
{
"name": "Comprehensive_Market_Research",
"research_topic": "AI and machine learning investment opportunities",
"persona": "aggressive",
"research_depth": "comprehensive",
"focus_areas": ["fundamentals", "technicals", "market_sentiment"],
"timeframe": "90d",
},
{
"name": "Exhaustive_Crypto_Research",
"research_topic": "Bitcoin and cryptocurrency market analysis",
"persona": "day_trader",
"research_depth": "exhaustive",
"focus_areas": ["technicals", "market_sentiment", "competitive_landscape"],
"timeframe": "1y",
},
]
MULTI_AGENT_COMPARISON_SCENARIOS = [
{
"name": "Market_vs_Supervisor_Stock_Analysis",
"query": "Should I invest in Apple stock now?",
"agent_types": ["market", "supervisor"],
"persona": "moderate",
},
{
"name": "Conservative_Multi_Agent_Portfolio",
"query": "Build a balanced portfolio for retirement",
"agent_types": ["market", "supervisor"],
"persona": "conservative",
},
{
"name": "Aggressive_Growth_Strategy",
"query": "Find the best growth stocks for 2025",
"agent_types": ["market", "supervisor"],
"persona": "aggressive",
},
]
ERROR_HANDLING_SCENARIOS = [
{
"tool": "orchestrated_analysis",
"params": {
"query": "", # Empty query
"persona": "invalid_persona",
"routing_strategy": "unknown_strategy",
},
},
{
"tool": "deep_research_financial",
"params": {
"research_topic": "XYZ",
"research_depth": "invalid_depth",
"focus_areas": ["invalid_area"],
},
},
{
"tool": "compare_multi_agent_analysis",
"params": {
"query": "test",
"agent_types": ["nonexistent_agent"],
"persona": "unknown",
},
},
]
class TestResult:
"""Container for individual test results."""
def __init__(self, test_name: str, tool_name: str):
self.test_name = test_name
self.tool_name = tool_name
self.start_time = time.time()
self.end_time: float | None = None
self.success = False
self.error: str | None = None
self.response: dict[str, Any] | None = None
self.execution_time_ms: float | None = None
self.memory_usage_mb: float | None = None
self.validation_results: dict[str, bool] = {}
def mark_completed(
self, success: bool, response: dict | None = None, error: str | None = None
):
"""Mark test as completed with results."""
self.end_time = time.time()
self.execution_time_ms = (self.end_time - self.start_time) * 1000
self.success = success
self.response = response
self.error = error
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"test_name": self.test_name,
"tool_name": self.tool_name,
"success": self.success,
"execution_time_ms": self.execution_time_ms,
"memory_usage_mb": self.memory_usage_mb,
"error": self.error,
"validation_results": self.validation_results,
"response_summary": self._summarize_response() if self.response else None,
}
def _summarize_response(self) -> dict[str, Any]:
"""Create summary of response for reporting."""
if not self.response:
return {}
summary = {
"status": self.response.get("status"),
"agent_type": self.response.get("agent_type"),
"persona": self.response.get("persona"),
}
# Add tool-specific summary fields
if "agents_used" in self.response:
summary["agents_used"] = self.response["agents_used"]
if "sources_analyzed" in self.response:
summary["sources_analyzed"] = self.response["sources_analyzed"]
if "agents_compared" in self.response:
summary["agents_compared"] = self.response["agents_compared"]
return summary
class IntegrationTestSuite:
"""Comprehensive integration test suite for MCP orchestration tools."""
def __init__(self):
self.setup_logging()
self.results: list[TestResult] = []
self.start_time = time.time()
self.session_id = f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
# Import MCP tools
self._import_tools()
def setup_logging(self):
"""Configure logging for test execution."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(
f"integration_test_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
),
logging.StreamHandler(),
],
)
self.logger = logging.getLogger(__name__)
def _import_tools(self):
"""Import and initialize MCP tools."""
try:
from maverick_mcp.api.routers.agents import (
compare_multi_agent_analysis,
deep_research_financial,
orchestrated_analysis,
)
self.orchestrated_analysis = orchestrated_analysis
self.deep_research_financial = deep_research_financial
self.compare_multi_agent_analysis = compare_multi_agent_analysis
self.logger.info("Successfully imported MCP orchestration tools")
except ImportError as e:
self.logger.error(f"Failed to import MCP tools: {e}")
raise RuntimeError(f"Cannot run tests without MCP tools: {e}")
def print_header(self, title: str):
"""Print formatted test section header."""
print(f"\n{'=' * 80}")
print(f" {title}")
print(f"{'=' * 80}")
def print_progress(self, current: int, total: int, test_name: str):
"""Print progress indicator."""
percentage = (current / total) * 100
print(f"[{current:2d}/{total}] ({percentage:5.1f}%) {test_name}")
async def test_orchestrated_analysis(self) -> list[TestResult]:
"""Test agents_orchestrated_analysis with various scenarios."""
self.print_header("Testing Orchestrated Analysis Tool")
results = []
for i, scenario in enumerate(ORCHESTRATED_ANALYSIS_SCENARIOS):
test_name = f"orchestrated_analysis_{scenario['name']}"
self.print_progress(i + 1, len(ORCHESTRATED_ANALYSIS_SCENARIOS), test_name)
result = TestResult(test_name, "agents_orchestrated_analysis")
try:
if TEST_CONFIG["performance_monitoring"]:
tracemalloc.start()
# Add unique session ID for each test
scenario_params = scenario.copy()
scenario_params["session_id"] = f"{self.session_id}_{test_name}"
# Execute with timeout
response = await asyncio.wait_for(
self.orchestrated_analysis(**scenario_params),
timeout=TEST_CONFIG["timeout_seconds"],
)
# Validate response
validation_results = self._validate_orchestrated_response(response)
result.validation_results = validation_results
success = (
all(validation_results.values())
and response.get("status") == "success"
)
result.mark_completed(success, response)
if TEST_CONFIG["performance_monitoring"]:
current, peak = tracemalloc.get_traced_memory()
result.memory_usage_mb = peak / 1024 / 1024
tracemalloc.stop()
self.logger.info(
f"✓ {test_name}: {'PASS' if success else 'FAIL'} "
f"({result.execution_time_ms:.0f}ms)"
)
except TimeoutError:
result.mark_completed(False, error="Test timeout")
self.logger.warning(f"✗ {test_name}: TIMEOUT")
except Exception as e:
result.mark_completed(False, error=str(e))
self.logger.error(f"✗ {test_name}: ERROR - {e}")
results.append(result)
return results
async def test_deep_research_financial(self) -> list[TestResult]:
"""Test agents_deep_research_financial with various scenarios."""
self.print_header("Testing Deep Research Financial Tool")
results = []
for i, scenario in enumerate(DEEP_RESEARCH_SCENARIOS):
test_name = f"deep_research_{scenario['name']}"
self.print_progress(i + 1, len(DEEP_RESEARCH_SCENARIOS), test_name)
result = TestResult(test_name, "agents_deep_research_financial")
try:
if TEST_CONFIG["performance_monitoring"]:
tracemalloc.start()
# Add unique session ID
scenario_params = scenario.copy()
scenario_params["session_id"] = f"{self.session_id}_{test_name}"
response = await asyncio.wait_for(
self.deep_research_financial(**scenario_params),
timeout=TEST_CONFIG["timeout_seconds"],
)
validation_results = self._validate_research_response(response)
result.validation_results = validation_results
success = (
all(validation_results.values())
and response.get("status") == "success"
)
result.mark_completed(success, response)
if TEST_CONFIG["performance_monitoring"]:
current, peak = tracemalloc.get_traced_memory()
result.memory_usage_mb = peak / 1024 / 1024
tracemalloc.stop()
self.logger.info(
f"✓ {test_name}: {'PASS' if success else 'FAIL'} "
f"({result.execution_time_ms:.0f}ms)"
)
except TimeoutError:
result.mark_completed(False, error="Test timeout")
self.logger.warning(f"✗ {test_name}: TIMEOUT")
except Exception as e:
result.mark_completed(False, error=str(e))
self.logger.error(f"✗ {test_name}: ERROR - {e}")
results.append(result)
return results
async def test_compare_multi_agent_analysis(self) -> list[TestResult]:
"""Test agents_compare_multi_agent_analysis with various scenarios."""
self.print_header("Testing Multi-Agent Comparison Tool")
results = []
for i, scenario in enumerate(MULTI_AGENT_COMPARISON_SCENARIOS):
test_name = f"multi_agent_{scenario['name']}"
self.print_progress(i + 1, len(MULTI_AGENT_COMPARISON_SCENARIOS), test_name)
result = TestResult(test_name, "agents_compare_multi_agent_analysis")
try:
if TEST_CONFIG["performance_monitoring"]:
tracemalloc.start()
scenario_params = scenario.copy()
scenario_params["session_id"] = f"{self.session_id}_{test_name}"
response = await asyncio.wait_for(
self.compare_multi_agent_analysis(**scenario_params),
timeout=TEST_CONFIG["timeout_seconds"],
)
validation_results = self._validate_comparison_response(response)
result.validation_results = validation_results
success = (
all(validation_results.values())
and response.get("status") == "success"
)
result.mark_completed(success, response)
if TEST_CONFIG["performance_monitoring"]:
current, peak = tracemalloc.get_traced_memory()
result.memory_usage_mb = peak / 1024 / 1024
tracemalloc.stop()
self.logger.info(
f"✓ {test_name}: {'PASS' if success else 'FAIL'} "
f"({result.execution_time_ms:.0f}ms)"
)
except TimeoutError:
result.mark_completed(False, error="Test timeout")
self.logger.warning(f"✗ {test_name}: TIMEOUT")
except Exception as e:
result.mark_completed(False, error=str(e))
self.logger.error(f"✗ {test_name}: ERROR - {e}")
results.append(result)
return results
async def test_error_handling(self) -> list[TestResult]:
"""Test error handling with invalid inputs."""
self.print_header("Testing Error Handling")
results = []
for i, scenario in enumerate(ERROR_HANDLING_SCENARIOS):
test_name = f"error_handling_{scenario['tool']}"
self.print_progress(i + 1, len(ERROR_HANDLING_SCENARIOS), test_name)
result = TestResult(test_name, scenario["tool"])
try:
# Get the tool function
tool_func = getattr(self, scenario["tool"])
# Add session ID
params = scenario["params"].copy()
params["session_id"] = f"{self.session_id}_{test_name}"
response = await asyncio.wait_for(
tool_func(**params),
timeout=60, # Shorter timeout for error cases
)
# For error handling tests, we expect graceful error handling
# Success means the tool returned an error response without crashing
has_error_field = (
"error" in response or response.get("status") == "error"
)
success = has_error_field and isinstance(response, dict)
result.validation_results = {"graceful_error_handling": success}
result.mark_completed(success, response)
self.logger.info(
f"✓ {test_name}: {'PASS' if success else 'FAIL'} - "
f"Graceful error handling: {has_error_field}"
)
except TimeoutError:
result.mark_completed(False, error="Test timeout")
self.logger.warning(f"✗ {test_name}: TIMEOUT")
except Exception as e:
# For error handling tests, exceptions are actually failures
result.mark_completed(False, error=f"Unhandled exception: {str(e)}")
self.logger.error(f"✗ {test_name}: UNHANDLED EXCEPTION - {e}")
results.append(result)
return results
async def test_concurrent_execution(self) -> list[TestResult]:
"""Test concurrent execution of multiple tools."""
self.print_header("Testing Concurrent Execution")
results = []
# Create concurrent test scenarios
concurrent_tasks = [
(
"concurrent_orchestrated",
self.orchestrated_analysis,
{
"query": "Analyze MSFT for investment",
"persona": "moderate",
"routing_strategy": "llm_powered",
"session_id": f"{self.session_id}_concurrent_1",
},
),
(
"concurrent_research",
self.deep_research_financial,
{
"research_topic": "Amazon business model",
"persona": "conservative",
"research_depth": "standard",
"session_id": f"{self.session_id}_concurrent_2",
},
),
(
"concurrent_comparison",
self.compare_multi_agent_analysis,
{
"query": "Best tech stocks for portfolio",
"persona": "aggressive",
"session_id": f"{self.session_id}_concurrent_3",
},
),
]
start_time = time.time()
try:
# Execute all tasks concurrently
concurrent_results = await asyncio.gather(
*[
task_func(**task_params)
for _, task_func, task_params in concurrent_tasks
],
return_exceptions=True,
)
execution_time = (time.time() - start_time) * 1000
# Process results
for i, (task_name, _, _) in enumerate(concurrent_tasks):
result = TestResult(task_name, "concurrent_execution")
if i < len(concurrent_results):
response = concurrent_results[i]
if isinstance(response, Exception):
result.mark_completed(False, error=str(response))
success = False
else:
success = (
isinstance(response, dict)
and response.get("status") != "error"
)
result.mark_completed(success, response)
result.validation_results = {"concurrent_execution": success}
self.logger.info(f"✓ {task_name}: {'PASS' if success else 'FAIL'}")
else:
result.mark_completed(False, error="No result returned")
self.logger.error(f"✗ {task_name}: No result returned")
results.append(result)
# Add overall concurrent test result
concurrent_summary = TestResult(
"concurrent_execution_summary", "performance"
)
concurrent_summary.execution_time_ms = execution_time
concurrent_summary.validation_results = {
"all_completed": len(concurrent_results) == len(concurrent_tasks),
"no_crashes": all(
not isinstance(r, Exception) for r in concurrent_results
),
"reasonable_time": execution_time < 180000, # 3 minutes
}
concurrent_summary.mark_completed(
all(concurrent_summary.validation_results.values()),
{
"concurrent_tasks": len(concurrent_tasks),
"total_time_ms": execution_time,
},
)
results.append(concurrent_summary)
self.logger.info(
f"Concurrent execution completed in {execution_time:.0f}ms"
)
except Exception as e:
error_result = TestResult("concurrent_execution_error", "performance")
error_result.mark_completed(False, error=str(e))
results.append(error_result)
self.logger.error(f"✗ Concurrent execution failed: {e}")
return results
def _validate_orchestrated_response(
self, response: dict[str, Any]
) -> dict[str, bool]:
"""Validate orchestrated analysis response format."""
validations = {
"has_status": "status" in response,
"has_agent_type": "agent_type" in response,
"has_persona": "persona" in response,
"has_session_id": "session_id" in response,
"status_is_success": response.get("status") == "success",
"has_routing_strategy": "routing_strategy" in response,
"has_execution_time": "execution_time_ms" in response
and isinstance(response.get("execution_time_ms"), int | float),
}
# Additional validations for successful responses
if response.get("status") == "success":
validations.update(
{
"has_agents_used": "agents_used" in response,
"agents_used_is_list": isinstance(
response.get("agents_used"), list
),
"has_synthesis_confidence": "synthesis_confidence" in response,
}
)
return validations
def _validate_research_response(self, response: dict[str, Any]) -> dict[str, bool]:
"""Validate deep research response format."""
validations = {
"has_status": "status" in response,
"has_agent_type": "agent_type" in response,
"has_persona": "persona" in response,
"has_research_topic": "research_topic" in response,
"status_is_success": response.get("status") == "success",
"has_research_depth": "research_depth" in response,
"has_focus_areas": "focus_areas" in response,
}
if response.get("status") == "success":
validations.update(
{
"has_sources_analyzed": "sources_analyzed" in response,
"sources_analyzed_is_numeric": isinstance(
response.get("sources_analyzed"), int | float
),
"has_research_confidence": "research_confidence" in response,
"has_validation_checks": "validation_checks_passed" in response,
}
)
return validations
def _validate_comparison_response(
self, response: dict[str, Any]
) -> dict[str, bool]:
"""Validate multi-agent comparison response format."""
validations = {
"has_status": "status" in response,
"has_query": "query" in response,
"has_persona": "persona" in response,
"status_is_success": response.get("status") == "success",
"has_agents_compared": "agents_compared" in response,
}
if response.get("status") == "success":
validations.update(
{
"agents_compared_is_list": isinstance(
response.get("agents_compared"), list
),
"has_comparison": "comparison" in response,
"comparison_is_dict": isinstance(response.get("comparison"), dict),
"has_execution_times": "execution_times_ms" in response,
"has_insights": "insights" in response,
}
)
return validations
async def run_performance_benchmark(self):
"""Run performance benchmarks for all tools."""
self.print_header("Performance Benchmarking")
benchmark_scenarios = [
(
"orchestrated_fast",
self.orchestrated_analysis,
{
"query": "Quick AAPL analysis",
"persona": "moderate",
"routing_strategy": "rule_based",
"max_agents": 1,
"parallel_execution": False,
},
),
(
"research_basic",
self.deep_research_financial,
{
"research_topic": "Microsoft",
"research_depth": "basic",
"persona": "moderate",
},
),
(
"comparison_minimal",
self.compare_multi_agent_analysis,
{
"query": "Compare AAPL vs MSFT",
"agent_types": ["market"],
"persona": "moderate",
},
),
]
performance_results = []
for test_name, tool_func, params in benchmark_scenarios:
print(f"Benchmarking {test_name}...")
# Add session ID
params["session_id"] = f"{self.session_id}_benchmark_{test_name}"
# Run multiple iterations for average performance
times = []
for i in range(3): # 3 iterations for average
start_time = time.time()
try:
await tool_func(**params)
end_time = time.time()
execution_time = (end_time - start_time) * 1000
times.append(execution_time)
except Exception as e:
self.logger.error(
f"Benchmark {test_name} iteration {i + 1} failed: {e}"
)
times.append(float("inf"))
# Calculate performance metrics
valid_times = [t for t in times if t != float("inf")]
if valid_times:
avg_time = sum(valid_times) / len(valid_times)
min_time = min(valid_times)
max_time = max(valid_times)
performance_results.append(
{
"test": test_name,
"avg_time_ms": avg_time,
"min_time_ms": min_time,
"max_time_ms": max_time,
"successful_runs": len(valid_times),
}
)
print(
f" {test_name}: Avg={avg_time:.0f}ms, Min={min_time:.0f}ms, Max={max_time:.0f}ms"
)
else:
print(f" {test_name}: All iterations failed")
return performance_results
def generate_test_report(self):
"""Generate comprehensive test report."""
self.print_header("Test Results Summary")
total_tests = len(self.results)
passed_tests = sum(1 for r in self.results if r.success)
failed_tests = total_tests - passed_tests
total_time = time.time() - self.start_time
print(f"Total Tests: {total_tests}")
print(f"Passed: {passed_tests} ({passed_tests / total_tests * 100:.1f}%)")
print(f"Failed: {failed_tests} ({failed_tests / total_tests * 100:.1f}%)")
print(f"Total Execution Time: {total_time:.2f}s")
# Group results by tool
by_tool = {}
for result in self.results:
if result.tool_name not in by_tool:
by_tool[result.tool_name] = []
by_tool[result.tool_name].append(result)
print("\nResults by Tool:")
for tool_name, tool_results in by_tool.items():
tool_passed = sum(1 for r in tool_results if r.success)
tool_total = len(tool_results)
print(
f" {tool_name}: {tool_passed}/{tool_total} passed "
f"({tool_passed / tool_total * 100:.1f}%)"
)
# Performance summary
execution_times = [
r.execution_time_ms for r in self.results if r.execution_time_ms
]
if execution_times:
avg_time = sum(execution_times) / len(execution_times)
print("\nPerformance Summary:")
print(f" Average execution time: {avg_time:.0f}ms")
print(f" Fastest test: {min(execution_times):.0f}ms")
print(f" Slowest test: {max(execution_times):.0f}ms")
# Failed tests details
failed_results = [r for r in self.results if not r.success]
if failed_results:
print("\nFailed Tests:")
for result in failed_results:
print(f" ✗ {result.test_name}: {result.error}")
return {
"summary": {
"total_tests": total_tests,
"passed_tests": passed_tests,
"failed_tests": failed_tests,
"pass_rate": passed_tests / total_tests if total_tests > 0 else 0,
"total_execution_time_s": total_time,
},
"by_tool": {
tool: {
"total": len(results),
"passed": sum(1 for r in results if r.success),
"pass_rate": sum(1 for r in results if r.success) / len(results),
}
for tool, results in by_tool.items()
},
"performance": {
"avg_execution_time_ms": avg_time if execution_times else None,
"min_execution_time_ms": min(execution_times)
if execution_times
else None,
"max_execution_time_ms": max(execution_times)
if execution_times
else None,
},
"failed_tests": [r.to_dict() for r in failed_results],
}
def save_results(self, report: dict[str, Any], performance_data: list[dict]):
"""Save detailed results to files."""
if not TEST_CONFIG["save_results"]:
return
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Save detailed results
detailed_results = {
"test_session": self.session_id,
"timestamp": timestamp,
"config": TEST_CONFIG,
"summary": report,
"detailed_results": [r.to_dict() for r in self.results],
"performance_benchmarks": performance_data,
}
with open(f"integration_test_results_{timestamp}.json", "w") as f:
json.dump(detailed_results, f, indent=2, default=str)
print(f"\nDetailed results saved to: integration_test_results_{timestamp}.json")
async def run_all_tests(self):
"""Run the complete test suite."""
self.print_header(
f"MaverickMCP Orchestration Integration Test Suite - {self.session_id}"
)
print("Test Configuration:")
for key, value in TEST_CONFIG.items():
print(f" {key}: {value}")
try:
# Run all test categories
orchestrated_results = await self.test_orchestrated_analysis()
self.results.extend(orchestrated_results)
research_results = await self.test_deep_research_financial()
self.results.extend(research_results)
comparison_results = await self.test_compare_multi_agent_analysis()
self.results.extend(comparison_results)
error_handling_results = await self.test_error_handling()
self.results.extend(error_handling_results)
concurrent_results = await self.test_concurrent_execution()
self.results.extend(concurrent_results)
# Performance benchmarks
performance_data = await self.run_performance_benchmark()
# Generate and save report
report = self.generate_test_report()
self.save_results(report, performance_data)
# Final status
total_passed = sum(1 for r in self.results if r.success)
total_tests = len(self.results)
if total_passed == total_tests:
print(f"\n🎉 ALL TESTS PASSED! ({total_passed}/{total_tests})")
return 0
else:
print(f"\n⚠️ SOME TESTS FAILED ({total_passed}/{total_tests} passed)")
return 1
except Exception as e:
self.logger.error(f"Test suite execution failed: {e}")
print(f"\n💥 TEST SUITE EXECUTION FAILED: {e}")
return 2
async def main():
"""Main test execution function."""
# Set environment variables for testing if needed
if not os.getenv("OPENAI_API_KEY"):
print("⚠️ Warning: OPENAI_API_KEY not set - tests will use mock responses")
test_suite = IntegrationTestSuite()
exit_code = await test_suite.run_all_tests()
return exit_code
if __name__ == "__main__":
# Make the script executable
import sys
try:
exit_code = asyncio.run(main())
sys.exit(exit_code)
except KeyboardInterrupt:
print("\n🛑 Tests interrupted by user")
sys.exit(130) # SIGINT exit code
except Exception as e:
print(f"\n💥 Unexpected error: {e}")
sys.exit(1)
```
--------------------------------------------------------------------------------
/tests/test_orchestration_logging.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive test suite for OrchestrationLogger functionality.
This test suite covers:
- OrchestrationLogger initialization and configuration
- Context tracking and performance metrics
- Visual indicators and structured output formatting
- Parallel execution logging
- Method call decorators and context managers
- Resource usage monitoring
- Error handling and fallback logging
"""
import asyncio
import logging
import time
from unittest.mock import Mock, patch
import pytest
from maverick_mcp.utils.orchestration_logging import (
LogColors,
OrchestrationLogger,
get_orchestration_logger,
log_agent_execution,
log_fallback_trigger,
log_method_call,
log_parallel_execution,
log_performance_metrics,
log_resource_usage,
log_synthesis_operation,
log_tool_invocation,
)
class TestLogColors:
"""Test LogColors utility class."""
def test_color_constants(self):
"""Test that color constants are defined."""
assert hasattr(LogColors, "HEADER")
assert hasattr(LogColors, "OKBLUE")
assert hasattr(LogColors, "OKCYAN")
assert hasattr(LogColors, "OKGREEN")
assert hasattr(LogColors, "WARNING")
assert hasattr(LogColors, "FAIL")
assert hasattr(LogColors, "ENDC")
assert hasattr(LogColors, "BOLD")
assert hasattr(LogColors, "UNDERLINE")
# Verify they contain ANSI escape sequences
assert LogColors.HEADER.startswith("\033[")
assert LogColors.ENDC == "\033[0m"
class TestOrchestrationLogger:
"""Test OrchestrationLogger main functionality."""
def test_logger_initialization(self):
"""Test OrchestrationLogger initialization."""
logger = OrchestrationLogger("TestComponent")
assert logger.component_name == "TestComponent"
assert logger.request_id is None
assert logger.session_context == {}
assert isinstance(logger.logger, logging.Logger)
assert logger.logger.name == "maverick_mcp.orchestration.TestComponent"
def test_set_request_context(self):
"""Test setting request context."""
logger = OrchestrationLogger("TestComponent")
# Test with explicit request_id
logger.set_request_context(
request_id="req_123", session_id="session_456", custom_param="value"
)
assert logger.request_id == "req_123"
assert logger.session_context["session_id"] == "session_456"
assert logger.session_context["request_id"] == "req_123"
assert logger.session_context["custom_param"] == "value"
def test_set_request_context_auto_id(self):
"""Test auto-generation of request ID."""
logger = OrchestrationLogger("TestComponent")
logger.set_request_context(session_id="session_789")
assert logger.request_id is not None
assert len(logger.request_id) == 8 # UUID truncated to 8 chars
assert logger.session_context["session_id"] == "session_789"
assert logger.session_context["request_id"] == logger.request_id
def test_format_message_with_context(self):
"""Test message formatting with context."""
logger = OrchestrationLogger("TestComponent")
logger.set_request_context(request_id="req_123", session_id="session_456")
formatted = logger._format_message(
"INFO", "Test message", param1="value1", param2=42
)
assert "TestComponent" in formatted
assert "req:req_123" in formatted
assert "session:session_456" in formatted
assert "Test message" in formatted
assert "param1:value1" in formatted
assert "param2:42" in formatted
def test_format_message_without_context(self):
"""Test message formatting without context."""
logger = OrchestrationLogger("TestComponent")
formatted = logger._format_message("WARNING", "Warning message", error="test")
assert "TestComponent" in formatted
assert "Warning message" in formatted
assert "error:test" in formatted
# Should not contain context brackets when no context
assert "req:" not in formatted
assert "session:" not in formatted
def test_format_message_color_coding(self):
"""Test color coding in message formatting."""
logger = OrchestrationLogger("TestComponent")
debug_msg = logger._format_message("DEBUG", "Debug message")
info_msg = logger._format_message("INFO", "Info message")
warning_msg = logger._format_message("WARNING", "Warning message")
error_msg = logger._format_message("ERROR", "Error message")
assert LogColors.OKCYAN in debug_msg
assert LogColors.OKGREEN in info_msg
assert LogColors.WARNING in warning_msg
assert LogColors.FAIL in error_msg
# All should end with reset color
assert LogColors.ENDC in debug_msg
assert LogColors.ENDC in info_msg
assert LogColors.ENDC in warning_msg
assert LogColors.ENDC in error_msg
def test_logging_methods(self):
"""Test all logging level methods."""
logger = OrchestrationLogger("TestComponent")
with (
patch.object(logger.logger, "debug") as mock_debug,
patch.object(logger.logger, "info") as mock_info,
patch.object(logger.logger, "warning") as mock_warning,
patch.object(logger.logger, "error") as mock_error,
):
logger.debug("Debug message", param="debug")
logger.info("Info message", param="info")
logger.warning("Warning message", param="warning")
logger.error("Error message", param="error")
mock_debug.assert_called_once()
mock_info.assert_called_once()
mock_warning.assert_called_once()
mock_error.assert_called_once()
def test_none_value_filtering(self):
"""Test filtering of None values in message formatting."""
logger = OrchestrationLogger("TestComponent")
formatted = logger._format_message(
"INFO",
"Test message",
param1="value1",
param2=None, # Should be filtered out
param3="value3",
)
assert "param1:value1" in formatted
assert "param2:None" not in formatted
assert "param3:value3" in formatted
class TestGlobalLoggerRegistry:
"""Test global logger registry functionality."""
def test_get_orchestration_logger_creation(self):
"""Test creation of new orchestration logger."""
logger = get_orchestration_logger("NewComponent")
assert isinstance(logger, OrchestrationLogger)
assert logger.component_name == "NewComponent"
def test_get_orchestration_logger_reuse(self):
"""Test reuse of existing orchestration logger."""
logger1 = get_orchestration_logger("ReuseComponent")
logger2 = get_orchestration_logger("ReuseComponent")
assert logger1 is logger2 # Should be the same instance
def test_multiple_component_loggers(self):
"""Test multiple independent component loggers."""
logger_a = get_orchestration_logger("ComponentA")
logger_b = get_orchestration_logger("ComponentB")
assert logger_a is not logger_b
assert logger_a.component_name == "ComponentA"
assert logger_b.component_name == "ComponentB"
class TestLogMethodCallDecorator:
"""Test log_method_call decorator functionality."""
@pytest.fixture
def sample_class(self):
"""Create sample class for decorator testing."""
class SampleClass:
def __init__(self):
self.name = "SampleClass"
@log_method_call(component="TestComponent")
async def async_method(self, param1: str, param2: int = 10):
await asyncio.sleep(0.01)
return f"result_{param1}_{param2}"
@log_method_call(component="TestComponent", include_params=False)
async def async_method_no_params(self):
return "no_params_result"
@log_method_call(component="TestComponent", include_timing=False)
async def async_method_no_timing(self):
return "no_timing_result"
@log_method_call()
def sync_method(self, value: str):
return f"sync_{value}"
return SampleClass
@pytest.mark.asyncio
async def test_async_method_decoration_success(self, sample_class):
"""Test successful async method decoration."""
instance = sample_class()
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
result = await instance.async_method("test", param2=20)
assert result == "result_test_20"
# Verify logging calls
assert mock_logger.info.call_count == 2 # Start and success
start_call = mock_logger.info.call_args_list[0][0][0]
success_call = mock_logger.info.call_args_list[1][0][0]
assert "🚀 START async_method" in start_call
assert "params:" in start_call
assert "✅ SUCCESS async_method" in success_call
assert "duration:" in success_call
@pytest.mark.asyncio
async def test_async_method_decoration_no_params(self, sample_class):
"""Test async method decoration without parameter logging."""
instance = sample_class()
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
await instance.async_method_no_params()
start_call = mock_logger.info.call_args_list[0][0][0]
assert "params:" not in start_call
@pytest.mark.asyncio
async def test_async_method_decoration_no_timing(self, sample_class):
"""Test async method decoration without timing."""
instance = sample_class()
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
await instance.async_method_no_timing()
success_call = mock_logger.info.call_args_list[1][0][0]
assert "duration:" not in success_call
@pytest.mark.asyncio
async def test_async_method_decoration_error(self, sample_class):
"""Test async method decoration with error handling."""
class ErrorClass:
@log_method_call(component="ErrorComponent")
async def failing_method(self):
raise ValueError("Test error")
instance = ErrorClass()
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
with pytest.raises(ValueError, match="Test error"):
await instance.failing_method()
# Should log error
assert mock_logger.error.called
error_call = mock_logger.error.call_args[0][0]
assert "❌ ERROR failing_method" in error_call
assert "error: Test error" in error_call
def test_sync_method_decoration(self, sample_class):
"""Test synchronous method decoration."""
instance = sample_class()
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
result = instance.sync_method("test_value")
assert result == "sync_test_value"
# Should log start and success
assert mock_logger.info.call_count == 2
assert "🚀 START sync_method" in mock_logger.info.call_args_list[0][0][0]
assert "✅ SUCCESS sync_method" in mock_logger.info.call_args_list[1][0][0]
def test_component_name_inference(self):
"""Test automatic component name inference."""
class InferenceTest:
@log_method_call()
def test_method(self):
return "test"
instance = InferenceTest()
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
instance.test_method()
# Should infer component name from class
mock_get_logger.assert_called_with("InferenceTest")
def test_result_summary_extraction(self, sample_class):
"""Test extraction of result summaries for logging."""
class ResultClass:
@log_method_call(component="ResultComponent")
async def method_with_result_info(self):
return {
"execution_mode": "parallel",
"research_confidence": 0.85,
"parallel_execution_stats": {
"successful_tasks": 3,
"total_tasks": 4,
},
}
instance = ResultClass()
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
asyncio.run(instance.method_with_result_info())
success_call = mock_logger.info.call_args_list[1][0][0]
assert "mode: parallel" in success_call
assert "confidence: 0.85" in success_call
assert "tasks: 3/4" in success_call
class TestContextManagers:
"""Test context manager utilities."""
def test_log_parallel_execution_success(self):
"""Test log_parallel_execution context manager success case."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
with log_parallel_execution("TestComponent", "test operation", 3) as logger:
assert logger == mock_logger
time.sleep(0.01) # Simulate work
# Should log start and success
assert mock_logger.info.call_count == 2
start_call = mock_logger.info.call_args_list[0][0][0]
success_call = mock_logger.info.call_args_list[1][0][0]
assert "🔄 PARALLEL_START test operation" in start_call
assert "tasks: 3" in start_call
assert "🎯 PARALLEL_SUCCESS test operation" in success_call
assert "duration:" in success_call
def test_log_parallel_execution_error(self):
"""Test log_parallel_execution context manager error case."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
with pytest.raises(ValueError, match="Test error"):
with log_parallel_execution("TestComponent", "failing operation", 2):
raise ValueError("Test error")
# Should log start and error
assert mock_logger.info.call_count == 1 # Only start
assert mock_logger.error.call_count == 1
start_call = mock_logger.info.call_args[0][0]
error_call = mock_logger.error.call_args[0][0]
assert "🔄 PARALLEL_START failing operation" in start_call
assert "💥 PARALLEL_ERROR failing operation" in error_call
assert "error: Test error" in error_call
def test_log_agent_execution_success(self):
"""Test log_agent_execution context manager success case."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
with log_agent_execution(
"fundamental", "task_123", ["earnings", "valuation"]
) as logger:
assert logger == mock_logger
time.sleep(0.01)
# Should log start and success
assert mock_logger.info.call_count == 2
start_call = mock_logger.info.call_args_list[0][0][0]
success_call = mock_logger.info.call_args_list[1][0][0]
assert "🤖 AGENT_START task_123" in start_call
assert "focus: ['earnings', 'valuation']" in start_call
assert "🎉 AGENT_SUCCESS task_123" in success_call
def test_log_agent_execution_without_focus(self):
"""Test log_agent_execution without focus areas."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
with log_agent_execution("sentiment", "task_456"):
pass
start_call = mock_logger.info.call_args_list[0][0][0]
assert "focus:" not in start_call
def test_log_agent_execution_error(self):
"""Test log_agent_execution context manager error case."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
with pytest.raises(RuntimeError, match="Agent failed"):
with log_agent_execution("technical", "task_789"):
raise RuntimeError("Agent failed")
# Should log start and error
error_call = mock_logger.error.call_args[0][0]
assert "🔥 AGENT_ERROR task_789" in error_call
assert "error: Agent failed" in error_call
class TestUtilityLoggingFunctions:
"""Test utility logging functions."""
def test_log_tool_invocation_basic(self):
"""Test basic tool invocation logging."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
log_tool_invocation("test_tool")
mock_logger.info.assert_called_once()
call_arg = mock_logger.info.call_args[0][0]
assert "🔧 TOOL_INVOKE test_tool" in call_arg
def test_log_tool_invocation_with_request_data(self):
"""Test tool invocation logging with request data."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
request_data = {
"query": "This is a test query that is longer than 50 characters to test truncation",
"research_scope": "comprehensive",
"persona": "moderate",
}
log_tool_invocation("research_tool", request_data)
call_arg = mock_logger.info.call_args[0][0]
assert "🔧 TOOL_INVOKE research_tool" in call_arg
assert (
"query: 'This is a test query that is longer than 50 charac...'"
in call_arg
)
assert "scope: comprehensive" in call_arg
assert "persona: moderate" in call_arg
def test_log_synthesis_operation(self):
"""Test synthesis operation logging."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
log_synthesis_operation(
"parallel_research", 5, "Combined insights from multiple agents"
)
call_arg = mock_logger.info.call_args[0][0]
assert "🧠 SYNTHESIS parallel_research" in call_arg
assert "inputs: 5" in call_arg
assert "output: Combined insights from multiple agents" in call_arg
def test_log_synthesis_operation_without_output(self):
"""Test synthesis operation logging without output summary."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
log_synthesis_operation("basic_synthesis", 3)
call_arg = mock_logger.info.call_args[0][0]
assert "🧠 SYNTHESIS basic_synthesis" in call_arg
assert "inputs: 3" in call_arg
assert "output:" not in call_arg
def test_log_fallback_trigger(self):
"""Test fallback trigger logging."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
log_fallback_trigger(
"ParallelOrchestrator", "API timeout", "switch to sequential"
)
mock_logger.warning.assert_called_once()
call_arg = mock_logger.warning.call_args[0][0]
assert "⚠️ FALLBACK_TRIGGER API timeout" in call_arg
assert "action: switch to sequential" in call_arg
def test_log_performance_metrics(self):
"""Test performance metrics logging."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
metrics = {
"total_tasks": 5,
"successful_tasks": 4,
"failed_tasks": 1,
"parallel_efficiency": 2.3,
"total_duration": 1.5,
}
log_performance_metrics("TestComponent", metrics)
call_arg = mock_logger.info.call_args[0][0]
assert "📊 PERFORMANCE_METRICS" in call_arg
assert "total_tasks: 5" in call_arg
assert "successful_tasks: 4" in call_arg
assert "parallel_efficiency: 2.3" in call_arg
def test_log_resource_usage_complete(self):
"""Test resource usage logging with all parameters."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
log_resource_usage(
"ResourceComponent", api_calls=15, cache_hits=8, memory_mb=45.7
)
call_arg = mock_logger.info.call_args[0][0]
assert "📈 RESOURCE_USAGE" in call_arg
assert "api_calls: 15" in call_arg
assert "cache_hits: 8" in call_arg
assert "memory_mb: 45.7" in call_arg
def test_log_resource_usage_partial(self):
"""Test resource usage logging with partial parameters."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
log_resource_usage("ResourceComponent", api_calls=10, cache_hits=None)
call_arg = mock_logger.info.call_args[0][0]
assert "api_calls: 10" in call_arg
assert "cache_hits" not in call_arg
assert "memory_mb" not in call_arg
def test_log_resource_usage_no_params(self):
"""Test resource usage logging with no valid parameters."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
log_resource_usage(
"ResourceComponent", api_calls=None, cache_hits=None, memory_mb=None
)
# Should not call logger if no valid parameters
mock_logger.info.assert_not_called()
class TestIntegratedLoggingScenarios:
"""Test integrated logging scenarios."""
@pytest.mark.asyncio
async def test_complete_parallel_research_logging(self):
"""Test complete parallel research logging scenario."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
# Simulate complete parallel research workflow
class MockResearchAgent:
@log_method_call(component="ParallelOrchestrator")
async def execute_parallel_research(self, topic: str, session_id: str):
# Set context for this research session
orchestration_logger = get_orchestration_logger(
"ParallelOrchestrator"
)
orchestration_logger.set_request_context(
session_id=session_id, research_topic=topic[:50], task_count=3
)
# Log tool invocation
log_tool_invocation(
"deep_research",
{"query": topic, "research_scope": "comprehensive"},
)
# Execute parallel tasks
with log_parallel_execution(
"ParallelOrchestrator", "research execution", 3
):
# Simulate parallel agent executions
for i, agent_type in enumerate(
["fundamental", "sentiment", "technical"]
):
with log_agent_execution(
agent_type, f"task_{i}", ["focus1", "focus2"]
):
await asyncio.sleep(0.01) # Simulate work
# Log synthesis
log_synthesis_operation(
"parallel_research_synthesis", 3, "Comprehensive analysis"
)
# Log performance metrics
log_performance_metrics(
"ParallelOrchestrator",
{
"successful_tasks": 3,
"failed_tasks": 0,
"parallel_efficiency": 2.5,
},
)
# Log resource usage
log_resource_usage(
"ParallelOrchestrator", api_calls=15, cache_hits=5
)
return {
"status": "success",
"execution_mode": "parallel",
"research_confidence": 0.85,
}
agent = MockResearchAgent()
result = await agent.execute_parallel_research(
topic="Apple Inc comprehensive analysis",
session_id="integrated_test_123",
)
# Verify comprehensive logging occurred
assert mock_logger.info.call_count >= 8 # Multiple info logs expected
assert result["status"] == "success"
def test_logging_component_isolation(self):
"""Test that different components maintain separate logging contexts."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger_a = Mock()
mock_logger_b = Mock()
# Mock different loggers for different components
def get_logger_side_effect(component_name):
if component_name == "ComponentA":
return mock_logger_a
elif component_name == "ComponentB":
return mock_logger_b
else:
return Mock()
mock_get_logger.side_effect = get_logger_side_effect
# Component A operations
log_performance_metrics("ComponentA", {"metric_a": 1})
log_resource_usage("ComponentA", api_calls=5)
# Component B operations
log_performance_metrics("ComponentB", {"metric_b": 2})
log_fallback_trigger("ComponentB", "test reason", "test action")
# Verify isolation
assert mock_logger_a.info.call_count == 2 # Performance + resource
assert mock_logger_b.info.call_count == 1 # Performance only
assert mock_logger_b.warning.call_count == 1 # Fallback trigger
@pytest.mark.asyncio
async def test_error_propagation_with_logging(self):
"""Test that errors are properly logged and propagated."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
class ErrorComponent:
@log_method_call(component="ErrorComponent")
async def failing_operation(self):
with log_parallel_execution("ErrorComponent", "failing task", 1):
with log_agent_execution("test_agent", "failing_task"):
raise RuntimeError("Simulated failure")
component = ErrorComponent()
# Should properly propagate the error while logging it
with pytest.raises(RuntimeError, match="Simulated failure"):
await component.failing_operation()
# Verify error was logged at multiple levels
assert (
mock_logger.error.call_count >= 2
) # Method and context manager errors
def test_performance_timing_accuracy(self):
"""Test timing accuracy in logging decorators and context managers."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
@log_method_call(component="TimingTest")
def timed_function():
time.sleep(0.1) # Sleep for ~100ms
return "completed"
result = timed_function()
assert result == "completed"
# Check that timing was logged
success_call = mock_logger.info.call_args_list[1][0][0]
assert "duration:" in success_call
# Extract duration (rough check - timing can be imprecise in tests)
duration_part = [
part for part in success_call.split() if "duration:" in part
][0]
duration_value = float(duration_part.split(":")[-1].replace("s", ""))
assert (
0.05 <= duration_value <= 0.5
) # Should be around 0.1s with some tolerance
class TestLoggingUnderLoad:
"""Test logging behavior under various load conditions."""
@pytest.mark.asyncio
async def test_concurrent_logging_safety(self):
"""Test that concurrent logging operations are safe."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
@log_method_call(component="ConcurrentTest")
async def concurrent_task(task_id: int):
with log_agent_execution("test_agent", f"task_{task_id}"):
await asyncio.sleep(0.01)
return f"result_{task_id}"
# Run multiple tasks concurrently
tasks = [concurrent_task(i) for i in range(5)]
results = await asyncio.gather(*tasks)
# Verify all tasks completed
assert len(results) == 5
assert all("result_" in result for result in results)
# Logging should have occurred for all tasks
assert mock_logger.info.call_count >= 10 # At least 2 per task
def test_high_frequency_logging(self):
"""Test logging performance under high frequency operations."""
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
# Perform many logging operations quickly
start_time = time.time()
for i in range(100):
log_performance_metrics(
f"Component_{i % 5}", {"operation_id": i, "timestamp": time.time()}
)
end_time = time.time()
# Should complete quickly
assert (end_time - start_time) < 1.0 # Should take less than 1 second
# All operations should have been logged
assert mock_logger.info.call_count == 100
@pytest.mark.asyncio
async def test_memory_usage_tracking(self):
"""Test that logging doesn't consume excessive memory."""
import gc
with patch(
"maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
) as mock_get_logger:
mock_logger = Mock()
mock_get_logger.return_value = mock_logger
# Get baseline memory
gc.collect()
initial_objects = len(gc.get_objects())
# Perform many logging operations
for i in range(50):
logger = OrchestrationLogger(f"TestComponent_{i}")
logger.set_request_context(
session_id=f"session_{i}",
request_id=f"req_{i}",
large_data=f"data_{'x' * 100}_{i}", # Some larger context data
)
logger.info("Test message", param1=f"value_{i}", param2=i)
# Check memory growth
gc.collect()
final_objects = len(gc.get_objects())
# Memory growth should be reasonable (not growing indefinitely)
object_growth = final_objects - initial_objects
assert object_growth < 1000 # Reasonable threshold for test
```