This is page 2 of 29. Use http://codebase.md/wshobson/maverick-mcp?page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── feature_request.md
│ │ ├── question.md
│ │ └── security_report.md
│ ├── pull_request_template.md
│ └── workflows
│ ├── claude-code-review.yml
│ └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│ ├── launch.json
│ └── settings.json
├── alembic
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ ├── 001_initial_schema.py
│ ├── 003_add_performance_indexes.py
│ ├── 006_rename_metadata_columns.py
│ ├── 008_performance_optimization_indexes.py
│ ├── 009_rename_to_supply_demand.py
│ ├── 010_self_contained_schema.py
│ ├── 011_remove_proprietary_terms.py
│ ├── 013_add_backtest_persistence_models.py
│ ├── 014_add_portfolio_models.py
│ ├── 08e3945a0c93_merge_heads.py
│ ├── 9374a5c9b679_merge_heads_for_testing.py
│ ├── abf9b9afb134_merge_multiple_heads.py
│ ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│ ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│ ├── f0696e2cac15_add_essential_performance_indexes.py
│ └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── api
│ │ └── backtesting.md
│ ├── BACKTESTING.md
│ ├── COST_BASIS_SPECIFICATION.md
│ ├── deep_research_agent.md
│ ├── exa_research_testing_strategy.md
│ ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│ ├── PORTFOLIO.md
│ ├── SETUP_SELF_CONTAINED.md
│ └── speed_testing_framework.md
├── examples
│ ├── complete_speed_validation.py
│ ├── deep_research_integration.py
│ ├── llm_optimization_example.py
│ ├── llm_speed_demo.py
│ ├── monitoring_example.py
│ ├── parallel_research_example.py
│ ├── speed_optimization_demo.py
│ └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── circuit_breaker.py
│ │ ├── deep_research.py
│ │ ├── market_analysis.py
│ │ ├── optimized_research.py
│ │ ├── supervisor.py
│ │ └── technical_analysis.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── api_server.py
│ │ ├── connection_manager.py
│ │ ├── dependencies
│ │ │ ├── __init__.py
│ │ │ ├── stock_analysis.py
│ │ │ └── technical_analysis.py
│ │ ├── error_handling.py
│ │ ├── inspector_compatible_sse.py
│ │ ├── inspector_sse.py
│ │ ├── middleware
│ │ │ ├── error_handling.py
│ │ │ ├── mcp_logging.py
│ │ │ ├── rate_limiting_enhanced.py
│ │ │ └── security.py
│ │ ├── openapi_config.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── agents.py
│ │ │ ├── backtesting.py
│ │ │ ├── data_enhanced.py
│ │ │ ├── data.py
│ │ │ ├── health_enhanced.py
│ │ │ ├── health_tools.py
│ │ │ ├── health.py
│ │ │ ├── intelligent_backtesting.py
│ │ │ ├── introspection.py
│ │ │ ├── mcp_prompts.py
│ │ │ ├── monitoring.py
│ │ │ ├── news_sentiment_enhanced.py
│ │ │ ├── performance.py
│ │ │ ├── portfolio.py
│ │ │ ├── research.py
│ │ │ ├── screening_ddd.py
│ │ │ ├── screening_parallel.py
│ │ │ ├── screening.py
│ │ │ ├── technical_ddd.py
│ │ │ ├── technical_enhanced.py
│ │ │ ├── technical.py
│ │ │ └── tool_registry.py
│ │ ├── server.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ ├── base_service.py
│ │ │ ├── market_service.py
│ │ │ ├── portfolio_service.py
│ │ │ ├── prompt_service.py
│ │ │ └── resource_service.py
│ │ ├── simple_sse.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── insomnia_export.py
│ │ └── postman_export.py
│ ├── application
│ │ ├── __init__.py
│ │ ├── commands
│ │ │ └── __init__.py
│ │ ├── dto
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_dto.py
│ │ ├── queries
│ │ │ ├── __init__.py
│ │ │ └── get_technical_analysis.py
│ │ └── screening
│ │ ├── __init__.py
│ │ ├── dtos.py
│ │ └── queries.py
│ ├── backtesting
│ │ ├── __init__.py
│ │ ├── ab_testing.py
│ │ ├── analysis.py
│ │ ├── batch_processing_stub.py
│ │ ├── batch_processing.py
│ │ ├── model_manager.py
│ │ ├── optimization.py
│ │ ├── persistence.py
│ │ ├── retraining_pipeline.py
│ │ ├── strategies
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ ├── ml
│ │ │ │ ├── __init__.py
│ │ │ │ ├── adaptive.py
│ │ │ │ ├── ensemble.py
│ │ │ │ ├── feature_engineering.py
│ │ │ │ └── regime_aware.py
│ │ │ ├── ml_strategies.py
│ │ │ ├── parser.py
│ │ │ └── templates.py
│ │ ├── strategy_executor.py
│ │ ├── vectorbt_engine.py
│ │ └── visualization.py
│ ├── config
│ │ ├── __init__.py
│ │ ├── constants.py
│ │ ├── database_self_contained.py
│ │ ├── database.py
│ │ ├── llm_optimization_config.py
│ │ ├── logging_settings.py
│ │ ├── plotly_config.py
│ │ ├── security_utils.py
│ │ ├── security.py
│ │ ├── settings.py
│ │ ├── technical_constants.py
│ │ ├── tool_estimation.py
│ │ └── validation.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── technical_analysis.py
│ │ └── visualization.py
│ ├── data
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── cache.py
│ │ ├── django_adapter.py
│ │ ├── health.py
│ │ ├── models.py
│ │ ├── performance.py
│ │ ├── session_management.py
│ │ └── validation.py
│ ├── database
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── optimization.py
│ ├── dependencies.py
│ ├── domain
│ │ ├── __init__.py
│ │ ├── entities
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis.py
│ │ ├── events
│ │ │ └── __init__.py
│ │ ├── portfolio.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ ├── entities.py
│ │ │ ├── services.py
│ │ │ └── value_objects.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_service.py
│ │ ├── stock_analysis
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis_service.py
│ │ └── value_objects
│ │ ├── __init__.py
│ │ └── technical_indicators.py
│ ├── exceptions.py
│ ├── infrastructure
│ │ ├── __init__.py
│ │ ├── cache
│ │ │ └── __init__.py
│ │ ├── caching
│ │ │ ├── __init__.py
│ │ │ └── cache_management_service.py
│ │ ├── connection_manager.py
│ │ ├── data_fetching
│ │ │ ├── __init__.py
│ │ │ └── stock_data_service.py
│ │ ├── health
│ │ │ ├── __init__.py
│ │ │ └── health_checker.py
│ │ ├── persistence
│ │ │ ├── __init__.py
│ │ │ └── stock_repository.py
│ │ ├── providers
│ │ │ └── __init__.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ └── repositories.py
│ │ └── sse_optimizer.py
│ ├── langchain_tools
│ │ ├── __init__.py
│ │ ├── adapters.py
│ │ └── registry.py
│ ├── logging_config.py
│ ├── memory
│ │ ├── __init__.py
│ │ └── stores.py
│ ├── monitoring
│ │ ├── __init__.py
│ │ ├── health_check.py
│ │ ├── health_monitor.py
│ │ ├── integration_example.py
│ │ ├── metrics.py
│ │ ├── middleware.py
│ │ └── status_dashboard.py
│ ├── providers
│ │ ├── __init__.py
│ │ ├── dependencies.py
│ │ ├── factories
│ │ │ ├── __init__.py
│ │ │ ├── config_factory.py
│ │ │ └── provider_factory.py
│ │ ├── implementations
│ │ │ ├── __init__.py
│ │ │ ├── cache_adapter.py
│ │ │ ├── macro_data_adapter.py
│ │ │ ├── market_data_adapter.py
│ │ │ ├── persistence_adapter.py
│ │ │ └── stock_data_adapter.py
│ │ ├── interfaces
│ │ │ ├── __init__.py
│ │ │ ├── cache.py
│ │ │ ├── config.py
│ │ │ ├── macro_data.py
│ │ │ ├── market_data.py
│ │ │ ├── persistence.py
│ │ │ └── stock_data.py
│ │ ├── llm_factory.py
│ │ ├── macro_data.py
│ │ ├── market_data.py
│ │ ├── mocks
│ │ │ ├── __init__.py
│ │ │ ├── mock_cache.py
│ │ │ ├── mock_config.py
│ │ │ ├── mock_macro_data.py
│ │ │ ├── mock_market_data.py
│ │ │ ├── mock_persistence.py
│ │ │ └── mock_stock_data.py
│ │ ├── openrouter_provider.py
│ │ ├── optimized_screening.py
│ │ ├── optimized_stock_data.py
│ │ └── stock_data.py
│ ├── README.md
│ ├── tests
│ │ ├── __init__.py
│ │ ├── README_INMEMORY_TESTS.md
│ │ ├── test_cache_debug.py
│ │ ├── test_fixes_validation.py
│ │ ├── test_in_memory_routers.py
│ │ ├── test_in_memory_server.py
│ │ ├── test_macro_data_provider.py
│ │ ├── test_mailgun_email.py
│ │ ├── test_market_calendar_caching.py
│ │ ├── test_mcp_tool_fixes_pytest.py
│ │ ├── test_mcp_tool_fixes.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_models_functional.py
│ │ ├── test_server.py
│ │ ├── test_stock_data_enhanced.py
│ │ ├── test_stock_data_provider.py
│ │ └── test_technical_analysis.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── performance_monitoring.py
│ │ ├── portfolio_manager.py
│ │ ├── risk_management.py
│ │ └── sentiment_analysis.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── agent_errors.py
│ │ ├── batch_processing.py
│ │ ├── cache_warmer.py
│ │ ├── circuit_breaker_decorators.py
│ │ ├── circuit_breaker_services.py
│ │ ├── circuit_breaker.py
│ │ ├── data_chunking.py
│ │ ├── database_monitoring.py
│ │ ├── debug_utils.py
│ │ ├── fallback_strategies.py
│ │ ├── llm_optimization.py
│ │ ├── logging_example.py
│ │ ├── logging_init.py
│ │ ├── logging.py
│ │ ├── mcp_logging.py
│ │ ├── memory_profiler.py
│ │ ├── monitoring_middleware.py
│ │ ├── monitoring.py
│ │ ├── orchestration_logging.py
│ │ ├── parallel_research.py
│ │ ├── parallel_screening.py
│ │ ├── quick_cache.py
│ │ ├── resource_manager.py
│ │ ├── shutdown.py
│ │ ├── stock_helpers.py
│ │ ├── structured_logger.py
│ │ ├── tool_monitoring.py
│ │ ├── tracing.py
│ │ └── yfinance_pool.py
│ ├── validation
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── data.py
│ │ ├── middleware.py
│ │ ├── portfolio.py
│ │ ├── responses.py
│ │ ├── screening.py
│ │ └── technical.py
│ └── workflows
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── market_analyzer.py
│ │ ├── optimizer_agent.py
│ │ ├── strategy_selector.py
│ │ └── validator_agent.py
│ ├── backtesting_workflow.py
│ └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── dev.sh
│ ├── INSTALLATION_GUIDE.md
│ ├── load_example.py
│ ├── load_market_data.py
│ ├── load_tiingo_data.py
│ ├── migrate_db.py
│ ├── README_TIINGO_LOADER.md
│ ├── requirements_tiingo.txt
│ ├── run_stock_screening.py
│ ├── run-migrations.sh
│ ├── seed_db.py
│ ├── seed_sp500.py
│ ├── setup_database.sh
│ ├── setup_self_contained.py
│ ├── setup_sp500_database.sh
│ ├── test_seeded_data.py
│ ├── test_tiingo_loader.py
│ ├── tiingo_config.py
│ └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── core
│ │ └── test_technical_analysis.py
│ ├── data
│ │ └── test_portfolio_models.py
│ ├── domain
│ │ ├── conftest.py
│ │ ├── test_portfolio_entities.py
│ │ └── test_technical_analysis_service.py
│ ├── fixtures
│ │ └── orchestration_fixtures.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── README.md
│ │ ├── run_integration_tests.sh
│ │ ├── test_api_technical.py
│ │ ├── test_chaos_engineering.py
│ │ ├── test_config_management.py
│ │ ├── test_full_backtest_workflow_advanced.py
│ │ ├── test_full_backtest_workflow.py
│ │ ├── test_high_volume.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_orchestration_complete.py
│ │ ├── test_portfolio_persistence.py
│ │ ├── test_redis_cache.py
│ │ ├── test_security_integration.py.disabled
│ │ └── vcr_setup.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── test_benchmarks.py
│ │ ├── test_load.py
│ │ ├── test_profiling.py
│ │ └── test_stress.py
│ ├── providers
│ │ └── test_stock_data_simple.py
│ ├── README.md
│ ├── test_agents_router_mcp.py
│ ├── test_backtest_persistence.py
│ ├── test_cache_management_service.py
│ ├── test_cache_serialization.py
│ ├── test_circuit_breaker.py
│ ├── test_database_pool_config_simple.py
│ ├── test_database_pool_config.py
│ ├── test_deep_research_functional.py
│ ├── test_deep_research_integration.py
│ ├── test_deep_research_parallel_execution.py
│ ├── test_error_handling.py
│ ├── test_event_loop_integrity.py
│ ├── test_exa_research_integration.py
│ ├── test_exception_hierarchy.py
│ ├── test_financial_search.py
│ ├── test_graceful_shutdown.py
│ ├── test_integration_simple.py
│ ├── test_langgraph_workflow.py
│ ├── test_market_data_async.py
│ ├── test_market_data_simple.py
│ ├── test_mcp_orchestration_functional.py
│ ├── test_ml_strategies.py
│ ├── test_optimized_research_agent.py
│ ├── test_orchestration_integration.py
│ ├── test_orchestration_logging.py
│ ├── test_orchestration_tools_simple.py
│ ├── test_parallel_research_integration.py
│ ├── test_parallel_research_orchestrator.py
│ ├── test_parallel_research_performance.py
│ ├── test_performance_optimizations.py
│ ├── test_production_validation.py
│ ├── test_provider_architecture.py
│ ├── test_rate_limiting_enhanced.py
│ ├── test_runner_validation.py
│ ├── test_security_comprehensive.py.disabled
│ ├── test_security_cors.py
│ ├── test_security_enhancements.py.disabled
│ ├── test_security_headers.py
│ ├── test_security_penetration.py
│ ├── test_session_management.py
│ ├── test_speed_optimization_validation.py
│ ├── test_stock_analysis_dependencies.py
│ ├── test_stock_analysis_service.py
│ ├── test_stock_data_fetching_service.py
│ ├── test_supervisor_agent.py
│ ├── test_supervisor_functional.py
│ ├── test_tool_estimation_config.py
│ ├── test_visualization.py
│ └── utils
│ ├── test_agent_errors.py
│ ├── test_logging.py
│ ├── test_parallel_screening.py
│ └── test_quick_cache.py
├── tools
│ ├── check_orchestration_config.py
│ ├── experiments
│ │ ├── validation_examples.py
│ │ └── validation_fixed.py
│ ├── fast_dev.sh
│ ├── hot_reload.py
│ ├── quick_test.py
│ └── templates
│ ├── new_router_template.py
│ ├── new_tool_template.py
│ ├── screening_strategy_template.py
│ └── test_template.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/maverick_mcp/api/utils/postman_export.py:
--------------------------------------------------------------------------------
```python
"""
Postman Collection Export Utility
Converts OpenAPI specifications to Postman collection format.
"""
from typing import Any
def convert_to_postman(openapi_dict: dict[str, Any]) -> dict[str, Any]:
"""
Convert OpenAPI specification to Postman collection format.
Args:
openapi_dict: OpenAPI specification dictionary
Returns:
Postman collection dictionary
"""
info = openapi_dict.get("info", {})
collection = {
"info": {
"name": info.get("title", "API Collection"),
"description": info.get("description", "Exported from OpenAPI spec"),
"version": info.get("version", "1.0.0"),
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json",
},
"item": [],
"variable": [],
}
# Add server variables
servers = openapi_dict.get("servers", [])
if servers:
collection["variable"].append(
{
"key": "baseUrl",
"value": servers[0].get("url", "http://localhost:8000"),
"type": "string",
}
)
# Convert paths to Postman requests
paths = openapi_dict.get("paths", {})
for path, methods in paths.items():
for method, operation in methods.items():
if method.upper() in ["GET", "POST", "PUT", "DELETE", "PATCH"]:
item = {
"name": operation.get("summary", f"{method.upper()} {path}"),
"request": {
"method": method.upper(),
"header": [],
"url": {
"raw": "{{baseUrl}}" + path,
"host": ["{{baseUrl}}"],
"path": path.split("/")[1:]
if path.startswith("/")
else path.split("/"),
},
},
"response": [],
}
# Add request body if present
if "requestBody" in operation:
content = operation["requestBody"].get("content", {})
if "application/json" in content:
item["request"]["header"].append(
{
"key": "Content-Type",
"value": "application/json",
"type": "text",
}
)
# Add example body if available
schema = content["application/json"].get("schema", {})
if "example" in schema:
item["request"]["body"] = {
"mode": "raw",
"raw": str(schema["example"]),
}
collection["item"].append(item)
return collection
```
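A minimal usage sketch for the converter above. The inline OpenAPI dictionary is a hypothetical example, not taken from the repository:

```python
from maverick_mcp.api.utils.postman_export import convert_to_postman

# Hypothetical minimal OpenAPI spec, used only to illustrate the conversion.
openapi_spec = {
    "info": {"title": "Example API", "version": "0.1.0"},
    "servers": [{"url": "http://localhost:8000"}],
    "paths": {
        "/health": {
            "get": {"summary": "Health check"},
        },
    },
}

collection = convert_to_postman(openapi_spec)
print(collection["info"]["name"])                      # Example API
print(collection["item"][0]["request"]["url"]["raw"])  # {{baseUrl}}/health
```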
--------------------------------------------------------------------------------
/maverick_mcp/validation/screening.py:
--------------------------------------------------------------------------------
```python
"""
Validation models for stock screening tools.
This module provides Pydantic models for validating inputs
to all screening-related tools.
"""
from typing import Literal
from pydantic import Field, field_validator
from .base import (
BaseRequest,
PaginationMixin,
PositiveFloat,
PositiveInt,
StrictBaseModel,
)
class MaverickScreeningRequest(StrictBaseModel, PaginationMixin):
"""Validation for get_maverick_stocks tool."""
limit: PositiveInt = Field(
default=20, le=100, description="Maximum number of stocks to return"
)
model_config = {"json_schema_extra": {"examples": [{"limit": 20}, {"limit": 50}]}}
class SupplyDemandBreakoutRequest(StrictBaseModel, PaginationMixin):
"""Validation for get_supply_demand_breakouts tool."""
limit: PositiveInt = Field(
default=20, le=100, description="Maximum number of stocks to return"
)
filter_moving_averages: bool = Field(
default=False,
description="If True, only return stocks in demand expansion phase (above all moving averages)",
)
model_config = {
"json_schema_extra": {
"examples": [
{"limit": 20, "filter_moving_averages": False},
{"limit": 15, "filter_moving_averages": True},
]
}
}
class CustomScreeningRequest(BaseRequest, PaginationMixin):
"""Validation for get_screening_by_criteria tool."""
min_momentum_score: float | None = Field(
default=None,
ge=0.0,
le=100.0,
description="Minimum momentum score (0-100)",
)
min_volume: PositiveInt | None = Field(
default=None, description="Minimum average daily volume"
)
max_price: PositiveFloat | None = Field(
default=None, description="Maximum stock price"
)
sector: str | None = Field(
default=None,
max_length=100,
description="Specific sector to filter (e.g., 'Technology')",
)
limit: PositiveInt = Field(
default=20, le=100, description="Maximum number of results"
)
@field_validator("sector")
@classmethod
def normalize_sector(cls, v: str | None) -> str | None:
"""Normalize sector name."""
if v is not None:
# Title case for consistency
return v.strip().title()
return v
model_config = {
"json_schema_extra": {
"examples": [
{"min_momentum_score": 85.0, "min_volume": 1000000, "limit": 20},
{
"max_price": 50.0,
"sector": "Technology",
"min_momentum_score": 80.0,
"limit": 30,
},
]
}
}
class ScreeningType(StrictBaseModel):
"""Enum for screening types."""
screening_type: Literal[
"maverick_bullish", "maverick_bearish", "supply_demand_breakout", "all"
] = Field(default="all", description="Type of screening to retrieve")
```
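A hedged usage sketch for the screening validators above, assuming the `BaseRequest`/`PaginationMixin` fields not shown on this page all have defaults. The criteria values are illustrative:

```python
from maverick_mcp.validation.screening import CustomScreeningRequest

# Illustrative criteria; limit is capped at 100 and sector is title-cased on validation.
request = CustomScreeningRequest(
    min_momentum_score=85.0,
    min_volume=1_000_000,
    sector="technology",
    limit=20,
)
assert request.sector == "Technology"  # normalized by normalize_sector
```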
--------------------------------------------------------------------------------
/tests/test_event_loop_integrity.py:
--------------------------------------------------------------------------------
```python
"""Tests ensuring temporary event loops are restored correctly."""
from __future__ import annotations
import asyncio
from typing import Any
import pytest
from maverick_mcp.api.server import health_resource, status_dashboard_resource
from maverick_mcp.backtesting.strategy_executor import (
ExecutionContext,
StrategyExecutor,
)
from maverick_mcp.utils.quick_cache import quick_cache
def _assert_loop_clean() -> None:
"""Assert that no closed event loop remains configured."""
policy = asyncio.get_event_loop_policy()
try:
loop = policy.get_event_loop()
except RuntimeError:
loop = None
if loop is not None:
assert not loop.is_closed()
asyncio.set_event_loop(None)
def test_health_resource_restores_event_loop(monkeypatch: pytest.MonkeyPatch) -> None:
"""Calling the health resource twice should not leave a closed loop."""
async def _stub_health() -> dict[str, Any]:
return {"status": "healthy"}
monkeypatch.setattr(
"maverick_mcp.api.routers.health_enhanced._get_detailed_health_status",
_stub_health,
)
result = health_resource()
assert result["status"] == "healthy"
second_result = health_resource()
assert second_result["status"] == "healthy"
_assert_loop_clean()
def test_status_dashboard_restores_event_loop(monkeypatch: pytest.MonkeyPatch) -> None:
"""The status dashboard resource should restore the previous loop."""
async def _stub_dashboard() -> dict[str, Any]:
return {"status": "ok"}
monkeypatch.setattr(
"maverick_mcp.monitoring.status_dashboard.get_dashboard_data",
_stub_dashboard,
)
result = status_dashboard_resource()
assert result["status"] == "ok"
again = status_dashboard_resource()
assert again["status"] == "ok"
_assert_loop_clean()
def test_quick_cache_sync_wrapper_restores_loop() -> None:
"""Synchronous quick_cache wrapper should not leave a closed loop behind."""
call_count = {"count": 0}
@quick_cache(ttl_seconds=60)
def _compute(value: int) -> int:
call_count["count"] += 1
return value * 2
assert _compute(2) == 4
assert _compute(2) == 4
assert call_count["count"] == 1
_assert_loop_clean()
def test_strategy_executor_sync_runner_restores_loop() -> None:
"""Running a backtest synchronously should restore the previous loop."""
executor = StrategyExecutor(max_concurrent_strategies=1)
class _DummyEngine:
async def run_backtest(self, **_: Any) -> dict[str, Any]:
return {"status": "ok"}
context = ExecutionContext(
strategy_id="test",
symbol="AAPL",
strategy_type="demo",
parameters={},
start_date="2024-01-01",
end_date="2024-01-02",
)
engine = _DummyEngine()
result = executor._run_backtest_sync(engine, context)
assert result["status"] == "ok"
_assert_loop_clean()
executor._thread_pool.shutdown(wait=True)
```
--------------------------------------------------------------------------------
/maverick_mcp/config/technical_constants.py:
--------------------------------------------------------------------------------
```python
"""
Technical Analysis Constants and Configuration
This module centralizes all technical analysis parameters and thresholds
to follow the Open/Closed Principle and eliminate magic numbers.
"""
from dataclasses import dataclass
from typing import Final
@dataclass(frozen=True)
class TechnicalAnalysisConfig:
"""Configuration class for technical analysis parameters."""
# RSI Configuration
RSI_PERIOD: int = 14
RSI_OVERBOUGHT: float = 70.0
RSI_OVERSOLD: float = 30.0
# Moving Average Configuration
SMA_SHORT_PERIOD: int = 50
SMA_LONG_PERIOD: int = 200
EMA_PERIOD: int = 21
EMA_FAST_PERIOD: int = 12
EMA_SLOW_PERIOD: int = 26
# MACD Configuration
MACD_FAST_PERIOD: int = 12
MACD_SLOW_PERIOD: int = 26
MACD_SIGNAL_PERIOD: int = 9
# Bollinger Bands Configuration
BOLLINGER_PERIOD: int = 20
BOLLINGER_STD_DEV: float = 2.0
# Stochastic Oscillator Configuration
STOCH_K_PERIOD: int = 14
STOCH_D_PERIOD: int = 3
STOCH_OVERBOUGHT: float = 80.0
STOCH_OVERSOLD: float = 20.0
# Volume Analysis Configuration
HIGH_VOLUME_THRESHOLD: float = 1.5 # 1.5x average volume
LOW_VOLUME_THRESHOLD: float = 0.7 # 0.7x average volume
VOLUME_SMA_PERIOD: int = 20
# Chart Pattern Configuration
PATTERN_SIMILARITY_THRESHOLD: float = 0.05
PATTERN_MIN_SEPARATION: int = 5
# Support and Resistance Configuration
SUPPORT_RESISTANCE_LOOKBACK: int = 20
SUPPORT_RESISTANCE_TOLERANCE: float = 0.02 # 2% tolerance
# ATR Configuration
ATR_PERIOD: int = 14
# CCI Configuration
CCI_PERIOD: int = 20
CCI_OVERBOUGHT: float = 100.0
CCI_OVERSOLD: float = -100.0
# Williams %R Configuration
WILLIAMS_R_PERIOD: int = 14
WILLIAMS_R_OVERBOUGHT: float = -20.0
WILLIAMS_R_OVERSOLD: float = -80.0
# Global configuration instance
TECHNICAL_CONFIG: Final[TechnicalAnalysisConfig] = TechnicalAnalysisConfig()
# Screening Strategy Configuration
@dataclass(frozen=True)
class ScreeningConfig:
"""Configuration for stock screening strategies."""
# Maverick Bullish Strategy
MIN_VOLUME: int = 1_000_000
MIN_PRICE: float = 5.0
MAX_PRICE: float = 500.0
MIN_MARKET_CAP: float = 100_000_000 # $100M
# RSI Requirements
RSI_MIN_BULLISH: float = 30.0
RSI_MAX_BULLISH: float = 70.0
# Volume Requirements
VOLUME_SPIKE_THRESHOLD: float = 1.5 # 1.5x average volume
# Moving Average Requirements
MA_CROSSOVER_PERIOD: int = 5 # Days to check for crossover
# Bear Strategy Thresholds
RSI_MAX_BEARISH: float = 30.0
PRICE_DECLINE_THRESHOLD: float = -0.10 # 10% decline
# Trending Breakout Strategy
BREAKOUT_VOLUME_MULTIPLIER: float = 2.0
BREAKOUT_PRICE_THRESHOLD: float = 0.05 # 5% price increase
# General Filtering
EXCLUDE_PENNY_STOCKS: bool = True
EXCLUDE_ETFS: bool = False
MAX_RESULTS_PER_STRATEGY: int = 50
# Global screening configuration instance
SCREENING_CONFIG: Final[ScreeningConfig] = ScreeningConfig()
```
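A short sketch showing how the frozen configuration instances above can be consumed; the printed values simply echo the constants defined in this module:

```python
from maverick_mcp.config.technical_constants import SCREENING_CONFIG, TECHNICAL_CONFIG

# Read-only access to the frozen dataclass instances.
print(TECHNICAL_CONFIG.RSI_PERIOD)         # 14
print(TECHNICAL_CONFIG.BOLLINGER_STD_DEV)  # 2.0
print(SCREENING_CONFIG.MIN_VOLUME)         # 1000000

# Because frozen=True, attempting to mutate raises dataclasses.FrozenInstanceError:
# TECHNICAL_CONFIG.RSI_PERIOD = 21  # would raise
```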
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/security_report.md:
--------------------------------------------------------------------------------
```markdown
---
name: Security Vulnerability Report
about: Report a security issue (use this template for public issues only)
title: '[SECURITY] '
labels: ['security', 'needs-triage']
assignees: ''
---
# ⚠️ Security Vulnerability Report
## ⚠️ IMPORTANT NOTICE
**For serious security vulnerabilities, please DO NOT create a public issue.**
Instead, report them privately via:
- **GitHub Security Advisories**: [Security Tab](https://github.com/wshobson/maverick-mcp/security) (Recommended)
## Public Security Issues Only
**Use this template only for:**
- [ ] Minor security improvements
- [ ] Documentation security issues
- [ ] Public security discussions
- [ ] Low-impact security suggestions
## Issue Description
**Security concern:**
Describe the security issue or improvement suggestion.
**Impact level:**
- [ ] Critical - Immediate attention required
- [ ] High - Important security flaw
- [ ] Medium - Security improvement needed
- [ ] Low - Minor security suggestion
## Category
**Type of security issue:**
- [ ] Authentication/Authorization
- [ ] Input validation
- [ ] Data exposure
- [ ] Configuration issue
- [ ] Dependency vulnerability
- [ ] Code injection
- [ ] Cross-site scripting (XSS)
- [ ] SQL injection
- [ ] Path traversal
- [ ] Information disclosure
- [ ] Denial of service
- [ ] Cryptographic issue
- [ ] Other: ___
## Affected Components
**Which parts of the system are affected?**
- [ ] MCP server
- [ ] Authentication system
- [ ] Database layer
- [ ] API endpoints
- [ ] Configuration files
- [ ] Dependencies
- [ ] Documentation
- [ ] Other: ___
## Environment
**System information:**
- MaverickMCP version: [e.g., 0.1.0]
- Python version: [e.g., 3.12.0]
- Operating system: [e.g., Ubuntu 22.04]
- Database: [PostgreSQL/SQLite version]
## Reproduction Steps (if applicable)
**For demonstrable issues only (no sensitive details):**
1. Step 1
2. Step 2
3. Step 3
## Expected Security Behavior
**What should happen from a security perspective?**
## Actual Behavior
**What actually happens?**
## Suggested Solution
**How do you think this should be fixed?**
## References
**Related security standards or best practices:**
- [ ] OWASP Top 10
- [ ] CWE (Common Weakness Enumeration)
- [ ] NIST guidelines
- [ ] Industry standards
- [ ] Other: ___
**Links to documentation or examples:**
- [Link 1]
- [Link 2]
## Additional Context
**Additional information:**
Add any other context about the security concern.
**Risk assessment:**
- [ ] Could lead to data breach
- [ ] Could allow unauthorized access
- [ ] Could cause service disruption
- [ ] Could expose sensitive information
- [ ] Low impact improvement
- [ ] Other: ___
## Disclosure
**For public issues:**
- [ ] I confirm this is not a serious vulnerability
- [ ] I understand serious issues should be reported privately
- [ ] This is a general security improvement suggestion
- [ ] This is a documentation or process improvement
---
**Remember:** For any serious security vulnerabilities, please report privately through GitHub Security Advisories.
```
--------------------------------------------------------------------------------
/maverick_mcp/validation/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive validation models for Maverick-MCP API.
This module provides Pydantic models for validating all tool inputs,
API requests, and responses, ensuring data integrity and providing
clear error messages with standardized response formats.
"""
# Auth validation removed for personal use
from .base import (
DateRangeMixin,
DateString,
DateValidator,
PaginationMixin,
Percentage,
PositiveFloat,
PositiveInt,
StrictBaseModel,
TickerSymbol,
TickerValidator,
)
# Billing validation removed for personal use
from .data import (
ClearCacheRequest,
FetchStockDataRequest,
GetChartLinksRequest,
GetNewsRequest,
GetStockInfoRequest,
StockDataBatchRequest,
)
# Error imports removed - use maverick_mcp.exceptions instead
from .middleware import (
RateLimitMiddleware,
SecurityMiddleware,
ValidationMiddleware,
)
from .portfolio import (
CorrelationAnalysisRequest,
PortfolioComparisonRequest,
RiskAnalysisRequest,
)
from .responses import (
BaseResponse,
BatchOperationResult,
BatchResponse,
DataResponse,
ErrorDetail,
ErrorResponse,
HealthResponse,
HealthStatus,
ListResponse,
RateLimitInfo,
RateLimitResponse,
ValidationErrorResponse,
WebhookEvent,
WebhookResponse,
error_response,
success_response,
validation_error_response,
)
from .screening import (
CustomScreeningRequest,
MaverickScreeningRequest,
SupplyDemandBreakoutRequest,
)
from .technical import (
MACDAnalysisRequest,
RSIAnalysisRequest,
StockChartRequest,
SupportResistanceRequest,
TechnicalAnalysisRequest,
)
# Webhook validation removed for personal use
__all__ = [
# Base validation
"DateRangeMixin",
"DateString",
"DateValidator",
"PaginationMixin",
"Percentage",
"PositiveFloat",
"PositiveInt",
"StrictBaseModel",
"TickerSymbol",
"TickerValidator",
# Data validation
"FetchStockDataRequest",
"StockDataBatchRequest",
"GetStockInfoRequest",
"GetNewsRequest",
"GetChartLinksRequest",
"ClearCacheRequest",
# Middleware
"RateLimitMiddleware",
"SecurityMiddleware",
"ValidationMiddleware",
# Portfolio validation
"RiskAnalysisRequest",
"PortfolioComparisonRequest",
"CorrelationAnalysisRequest",
# Response models
"BaseResponse",
"BatchOperationResult",
"BatchResponse",
"DataResponse",
"ErrorDetail",
"ErrorResponse",
"HealthResponse",
"HealthStatus",
"ListResponse",
"RateLimitInfo",
"RateLimitResponse",
"ValidationErrorResponse",
"WebhookEvent",
"WebhookResponse",
"error_response",
"success_response",
"validation_error_response",
# Screening validation
"MaverickScreeningRequest",
"SupplyDemandBreakoutRequest",
"CustomScreeningRequest",
# Technical validation
"RSIAnalysisRequest",
"MACDAnalysisRequest",
"SupportResistanceRequest",
"TechnicalAnalysisRequest",
"StockChartRequest",
]
```
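A brief sketch of the package-level re-exports in use, again assuming the mixin fields not shown on this page have defaults. The out-of-range limit is an illustrative value:

```python
from pydantic import ValidationError

from maverick_mcp.validation import MaverickScreeningRequest

# limit is declared with le=100, so 500 should fail validation.
try:
    MaverickScreeningRequest(limit=500)
except ValidationError as exc:
    print(exc.errors()[0]["loc"])  # ('limit',)
```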
--------------------------------------------------------------------------------
/tests/test_market_data_async.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Async test to verify the market data provider's non-blocking functionality.
"""
import asyncio
import time
import pytest
from maverick_mcp.providers.market_data import MarketDataProvider
@pytest.mark.asyncio
@pytest.mark.integration
@pytest.mark.external
async def test_market_data_async():
"""Test market data provider async functions."""
provider = MarketDataProvider()
print("Testing Market Data Provider (Async/Non-blocking)")
print("=" * 50)
# Test market overview with concurrent execution
print("\nTesting concurrent market overview fetch...")
start_time = time.time()
overview = await provider.get_market_overview_async()
elapsed = time.time() - start_time
print(f"✅ Fetched complete market overview in {elapsed:.2f} seconds")
# Show results
print(f"\nMarket Summary: {len(overview['market_summary'])} indices")
print(f"Top Gainers: {len(overview['top_gainers'])} stocks")
print(f"Top Losers: {len(overview['top_losers'])} stocks")
print(f"Sectors: {len(overview['sector_performance'])} sectors")
# Test individual async methods concurrently
print("\n\nTesting individual methods concurrently...")
start_time = time.time()
# Create multiple tasks
tasks = [
provider.get_market_summary_async(),
provider.get_top_gainers_async(10),
provider.get_top_losers_async(10),
provider.get_most_active_async(10),
provider.get_sector_performance_async(),
]
# Run all tasks concurrently
results = await asyncio.gather(*tasks)
elapsed = time.time() - start_time
print(f"✅ Fetched all data concurrently in {elapsed:.2f} seconds")
summary, gainers, losers, active, sectors = results
# Display sample results
print("\nResults:")
print(f" - Market indices: {len(summary)}")
print(f" - Top gainers: {len(gainers)}")
print(f" - Top losers: {len(losers)}")
print(f" - Most active: {len(active)}")
print(f" - Sectors: {len(sectors)}")
if gainers and isinstance(gainers, list) and len(gainers) > 0:
print("\nTop 3 Gainers:")
for stock in gainers[:3]:
if isinstance(stock, dict):
print(
f" {stock['symbol']}: ${stock['price']} (+{stock['change_percent']}%)"
)
print("\n✅ All async tests completed!")
@pytest.mark.asyncio
@pytest.mark.integration
@pytest.mark.external
async def test_with_timeout():
"""Test with timeout to demonstrate non-blocking behavior."""
provider = MarketDataProvider()
print("\nTesting with timeout (5 seconds)...")
try:
# Run with a timeout
await asyncio.wait_for(provider.get_market_overview_async(), timeout=5.0)
print("✅ Data fetched within timeout")
except TimeoutError:
print("❌ Operation timed out (data source may be slow)")
async def main():
"""Run all async tests."""
await test_market_data_async()
await test_with_timeout()
if __name__ == "__main__":
# Run the async main function
asyncio.run(main())
```
--------------------------------------------------------------------------------
/maverick_mcp/api/inspector_sse.py:
--------------------------------------------------------------------------------
```python
"""
MCP Inspector-compatible SSE handler.
This module implements an SSE handler that's compatible with MCP Inspector's
expectations, where JSON-RPC messages are exchanged directly over the SSE
connection rather than via a separate POST endpoint.
"""
import json
import logging
from uuid import uuid4
from starlette.requests import Request
from starlette.responses import StreamingResponse
from maverick_mcp.api.server import mcp
logger = logging.getLogger(__name__)
class InspectorSSEHandler:
"""SSE handler compatible with MCP Inspector."""
def __init__(self, mcp_instance):
self.mcp = mcp_instance
self.sessions = {}
async def handle_sse(self, request: Request):
"""Handle SSE connection from MCP Inspector."""
session_id = str(uuid4())
logger.info(f"New SSE connection: {session_id}")
async def event_generator():
"""Generate SSE events."""
# Send initial connection event
yield f"data: {json.dumps({'type': 'connection', 'sessionId': session_id})}\n\n"
# Keep connection alive
while True:
# In a real implementation, we'd process incoming messages here
# For now, just keep the connection alive
import asyncio
await asyncio.sleep(30)
yield ": keepalive\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
async def handle_message(self, request: Request):
"""Handle JSON-RPC message from client."""
# Get session ID from query params or headers
session_id = request.query_params.get("session_id")
if not session_id:
return {"error": "Missing session_id"}
# Get JSON-RPC message
try:
message = await request.json()
except Exception as e:
logger.error(f"Failed to parse JSON: {e}")
return {"error": "Invalid JSON"}
logger.info(f"Received message for session {session_id}: {message}")
# Process the message through MCP
# This is where we'd integrate with the actual MCP server
# For now, return a mock response
if message.get("method") == "initialize":
return {
"jsonrpc": "2.0",
"id": message.get("id"),
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {"listChanged": True},
"resources": {"listChanged": False},
"prompts": {"listChanged": False},
},
"serverInfo": {"name": "MaverickMCP", "version": "1.0.0"},
},
}
return {"jsonrpc": "2.0", "id": message.get("id"), "result": {}}
# Create global handler instance
inspector_handler = InspectorSSEHandler(mcp)
```
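A hedged sketch of how this handler might be mounted as Starlette routes; the paths and the JSONResponse wrapper are assumptions, not taken from the repository:

```python
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

from maverick_mcp.api.inspector_sse import inspector_handler


async def sse_endpoint(request):
    # handle_sse already returns a StreamingResponse.
    return await inspector_handler.handle_sse(request)


async def message_endpoint(request):
    # handle_message returns a plain dict, so wrap it for HTTP transport.
    return JSONResponse(await inspector_handler.handle_message(request))


# Hypothetical paths; MCP Inspector's expected endpoints may differ.
app = Starlette(routes=[
    Route("/sse", sse_endpoint),
    Route("/messages", message_endpoint, methods=["POST"]),
])
```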
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/question.md:
--------------------------------------------------------------------------------
```markdown
---
name: Question
about: Ask a question about using or contributing to MaverickMCP
title: '[QUESTION] '
labels: ['question', 'documentation']
assignees: ''
---
## Question Summary
**What would you like to know?**
Ask your question clearly and concisely.
## Context
**What are you trying to accomplish?**
Describe what you're working on or what you want to achieve.
**Your experience level:**
- [ ] New to MaverickMCP
- [ ] Familiar with MCP but new to MaverickMCP
- [ ] Experienced with MaverickMCP
- [ ] Financial analysis background
- [ ] Software development background
## Environment (if applicable)
**System Information:**
- OS: [e.g., macOS, Linux, Windows]
- Python version: [if relevant]
- Installation method: [uv, pip, docker]
## What You've Tried
**Research done:**
- [ ] Checked documentation
- [ ] Searched existing issues
- [ ] Looked at code examples
- [ ] Tried the Claude Desktop setup guide
**Attempted solutions:**
Describe what you've already tried.
## Specific Questions
**Please be specific about what you need help with:**
1. [Your first question]
2. [Your second question, if any]
3. [Additional questions]
## Question Category
**What type of question is this?**
- [ ] Installation and setup
- [ ] Configuration and API keys
- [ ] Using specific MCP tools
- [ ] Financial analysis methodology
- [ ] Technical indicators and calculations
- [ ] Stock screening strategies
- [ ] Portfolio analysis
- [ ] Performance optimization
- [ ] Integration with Claude Desktop
- [ ] Contributing to the project
- [ ] Architecture and design
- [ ] Error troubleshooting
- [ ] Other: ___
## Code Example (if applicable)
**If your question involves code, please provide a minimal example:**
```python
# Your code here
```
**Expected behavior:**
What did you expect to happen?
**Actual behavior:**
What actually happened?
## Financial Context (if applicable)
**Market/Asset class:**
- [ ] US Stocks (NYSE, NASDAQ)
- [ ] International stocks
- [ ] Crypto
- [ ] Forex
- [ ] Commodities
- [ ] Other: ___
**Analysis type:**
- [ ] Technical analysis
- [ ] Fundamental analysis
- [ ] Portfolio optimization
- [ ] Risk analysis
- [ ] Backtesting
- [ ] Real-time monitoring
- [ ] Other: ___
## Documentation Improvement
**Could this be better documented?**
- [ ] Yes, this should be added to documentation
- [ ] Yes, existing docs need clarification
- [ ] No, it's clearly documented but I missed it
- [ ] Not sure
**Where would you expect to find this information?**
- [ ] README
- [ ] CLAUDE.md project guide
- [ ] API documentation
- [ ] Examples directory
- [ ] Contributing guide
- [ ] Other: ___
## Additional Context
**Anything else that might be helpful:**
Add any other context, screenshots, or information that might help answer your question.
## Urgency
**How urgent is this question?**
- [ ] Urgent - blocking my work
- [ ] High - important for current task
- [ ] Medium - helpful to know
- [ ] Low - general curiosity
## Follow-up
**Would you be willing to:**
- [ ] Help improve documentation based on the answer
- [ ] Submit a PR with examples
- [ ] Help other users with similar questions
- [ ] Test proposed solutions
```
--------------------------------------------------------------------------------
/tools/templates/new_tool_template.py:
--------------------------------------------------------------------------------
```python
"""
Template for creating new MCP tools.
Copy this file and modify it to create new tools quickly.
"""
from typing import Any
from maverick_mcp.api.server import mcp
from maverick_mcp.utils.logging import get_logger
logger = get_logger(__name__)
@mcp.tool()
async def tool_name(
param1: str,
param2: int = 10,
param3: bool = True,
) -> dict[str, Any]:
"""
Brief description of what this tool does.
This tool performs [specific action] and returns [expected output].
Args:
param1: Description of first parameter
param2: Description of second parameter (default: 10)
param3: Description of third parameter (default: True)
Returns:
dict containing:
- result: The main result of the operation
- status: Success/failure status
- details: Additional details about the operation
Raises:
ValueError: If parameters are invalid
Exception: For other errors
"""
# Log tool execution
logger.info(
"Executing tool_name",
extra={
"param1": param1,
"param2": param2,
"param3": param3,
},
)
try:
# Validate inputs
if not param1:
raise ValueError("param1 cannot be empty")
if param2 < 0:
raise ValueError("param2 must be non-negative")
# Main tool logic here
# Example: Fetch data, process it, return results
# For tools that need database access:
# from maverick_mcp.data.models import get_db
# db = next(get_db())
# try:
# # Database operations
# finally:
# db.close()
# For tools that need async operations:
# import asyncio
# results = await asyncio.gather(
# async_operation1(),
# async_operation2(),
# )
# Prepare response
result = {
"result": f"Processed {param1} with settings {param2}, {param3}",
"status": "success",
"details": {
"processed_at": "2024-01-01T00:00:00Z",
"item_count": 42,
},
}
logger.info(
"Tool completed successfully",
extra={"tool": "tool_name", "result_keys": list(result.keys())},
)
return result
except ValueError as e:
logger.error(f"Validation error in tool_name: {e}")
return {
"status": "error",
"error": str(e),
"error_type": "validation",
}
except Exception as e:
logger.error(
f"Unexpected error in tool_name: {e}",
exc_info=True,
)
return {
"status": "error",
"error": str(e),
"error_type": "unexpected",
}
# Example of a tool that doesn't require authentication
@mcp.tool()
async def public_tool_name(query: str) -> dict[str, Any]:
"""
A public tool that doesn't require authentication.
Args:
query: The query to process
Returns:
dict with query results
"""
return {
"query": query,
"results": ["result1", "result2"],
"count": 2,
}
```
--------------------------------------------------------------------------------
/maverick_mcp/data/django_adapter.py:
--------------------------------------------------------------------------------
```python
"""
Django database adapter for Maverick-MCP.
This module provides integration between Maverick-MCP and an existing
Django database, allowing MCP to read Django-owned
tables while maintaining separation of concerns.
"""
import logging
from typing import Any
from sqlalchemy import BigInteger, Boolean, Column, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
# Create a separate Base for Django table mappings
DjangoBase: Any = declarative_base()
class DjangoUser(DjangoBase):
"""Read-only mapping to Django's users_customuser table."""
__tablename__ = "users_customuser"
__table_args__ = {"extend_existing": True}
id = Column(BigInteger, primary_key=True)
username = Column(String(150), nullable=False)
email = Column(String(254), nullable=False)
first_name = Column(String(150))
last_name = Column(String(150))
is_active = Column(Boolean, default=True)
is_staff = Column(Boolean, default=False)
def __repr__(self):
return (
f"<DjangoUser(id={self.id}, username={self.username}, email={self.email})>"
)
class DjangoStock(DjangoBase):
"""Read-only mapping to Django's stocks_stock table."""
__tablename__ = "stocks_stock"
__table_args__ = {"extend_existing": True}
id = Column(BigInteger, primary_key=True)
symbol = Column(String(20), nullable=False, unique=True)
name = Column(String(255))
sector = Column(String(100))
industry = Column(String(100))
market_cap = Column(BigInteger)
def __repr__(self):
return f"<DjangoStock(symbol={self.symbol}, name={self.name})>"
class DjangoAdapter:
"""
Adapter for accessing Django-owned database tables.
This adapter provides read-only access to Django tables,
ensuring MCP doesn't modify Django-managed data.
"""
def __init__(self, session: Session):
self.session = session
def get_user_by_email(self, email: str) -> DjangoUser | None:
"""Get Django user by email address."""
return self.session.query(DjangoUser).filter(DjangoUser.email == email).first()
def get_user_by_id(self, user_id: int) -> DjangoUser | None:
"""Get Django user by ID."""
return self.session.query(DjangoUser).filter(DjangoUser.id == user_id).first()
def get_stock_by_symbol(self, symbol: str) -> DjangoStock | None:
"""Get stock by symbol from Django table."""
return (
self.session.query(DjangoStock)
.filter(DjangoStock.symbol == symbol.upper())
.first()
)
def link_mcp_user_to_django(self, email: str) -> dict | None:
"""
Link MCP API key to Django user via email.
Returns user info with placeholder subscription metadata.
"""
# Find Django user
django_user = self.get_user_by_email(email)
if not django_user:
return None
return {
"user_id": django_user.id,
"username": django_user.username,
"email": django_user.email,
"is_active": django_user.is_active,
"has_subscription": False,
"subscription_status": None,
"external_customer_id": None,
}
```
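A usage sketch for the adapter above. The connection URL and lookup values are placeholders; any SQLAlchemy `Session` bound to the Django database would work:

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from maverick_mcp.data.django_adapter import DjangoAdapter

# Placeholder URL; point it at the existing Django-managed database.
engine = create_engine("postgresql://user:pass@localhost:5432/django_db")
SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    adapter = DjangoAdapter(session)
    user = adapter.get_user_by_email("someone@example.com")
    stock = adapter.get_stock_by_symbol("aapl")  # upper-cased before the lookup
    if user:
        print(adapter.link_mcp_user_to_django(user.email))
```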
--------------------------------------------------------------------------------
/tests/integration/run_integration_tests.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
"""
Integration Test Runner for MaverickMCP Orchestration Tools
This script runs the comprehensive integration test suite with proper environment setup
and provides clear output for validation of all orchestration capabilities.
"""
set -e # Exit on any error
echo "🚀 MaverickMCP Orchestration Integration Test Runner"
echo "=================================================="
# Check if we're in the right directory
if [[ ! -f "test_orchestration_complete.py" ]]; then
echo "❌ Error: Must run from tests/integration directory"
exit 1
fi
# Navigate to project root for proper imports
cd "$(dirname "$0")/../.."
# Check Python environment
echo "🔍 Checking Python environment..."
if command -v uv >/dev/null 2>&1; then
echo "✅ Using uv for Python environment"
PYTHON_CMD="uv run python"
elif [[ -f ".venv/bin/activate" ]]; then
echo "✅ Using virtual environment"
source .venv/bin/activate
PYTHON_CMD="python"
else
echo "⚠️ No virtual environment detected, using system Python"
PYTHON_CMD="python"
fi
# Check required dependencies
echo "🔍 Checking dependencies..."
$PYTHON_CMD -c "import maverick_mcp; print('✅ maverick_mcp package found')" || {
echo "❌ maverick_mcp package not installed. Run 'make setup' first."
exit 1
}
# Check if MCP server dependencies are available
$PYTHON_CMD -c "from maverick_mcp.api.routers.agents import orchestrated_analysis; print('✅ Orchestration tools available')" || {
echo "❌ Orchestration tools not available. Check agent dependencies."
exit 1
}
# Set up test environment
echo "🛠️ Setting up test environment..."
# Check for API keys (optional)
if [[ -z "$OPENAI_API_KEY" ]]; then
echo "⚠️ OPENAI_API_KEY not set - tests will use mock responses"
else
echo "✅ OPENAI_API_KEY found"
fi
if [[ -z "$EXA_API_KEY" ]]; then
echo "⚠️ EXA_API_KEY not set - deep research may have limited functionality"
else
echo "✅ EXA_API_KEY found"
fi
# Create logs directory if it doesn't exist
mkdir -p logs
echo ""
echo "🧪 Starting comprehensive integration tests..."
echo " This will test all orchestration capabilities including:"
echo " - agents_orchestrated_analysis with multiple personas/routing"
echo " - agents_deep_research_financial with various depths/focus areas"
echo " - agents_compare_multi_agent_analysis with different combinations"
echo " - Error handling and edge cases"
echo " - Concurrent execution performance"
echo " - Memory usage monitoring"
echo ""
# Run the comprehensive test suite
$PYTHON_CMD tests/integration/test_orchestration_complete.py
# Capture exit code
TEST_EXIT_CODE=$?
echo ""
echo "=================================================="
if [[ $TEST_EXIT_CODE -eq 0 ]]; then
echo "🎉 ALL INTEGRATION TESTS PASSED!"
echo " The orchestration tools are working correctly and ready for production use."
elif [[ $TEST_EXIT_CODE -eq 1 ]]; then
echo "⚠️ SOME TESTS FAILED"
echo " Check the test output above and log files for details."
elif [[ $TEST_EXIT_CODE -eq 130 ]]; then
echo "🛑 TESTS INTERRUPTED BY USER"
else
echo "💥 TEST SUITE EXECUTION FAILED"
echo " Check the error output and ensure all dependencies are properly installed."
fi
echo ""
echo "📊 Test artifacts:"
echo " - Detailed logs: integration_test_*.log"
echo " - JSON results: integration_test_results_*.json"
echo ""
exit $TEST_EXIT_CODE
```
--------------------------------------------------------------------------------
/maverick_mcp/api/api_server.py:
--------------------------------------------------------------------------------
```python
"""
Simplified FastAPI HTTP API Server for MaverickMCP Personal Use.
This module provides a minimal FastAPI server for testing compatibility.
Most functionality has been moved to the main MCP server for personal use.
"""
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from maverick_mcp.api.middleware.error_handling import (
ErrorHandlingMiddleware,
RequestTracingMiddleware,
)
from maverick_mcp.api.middleware.security import SecurityHeadersMiddleware
from maverick_mcp.api.routers.health import router as health_router
from maverick_mcp.config.settings import settings
from maverick_mcp.utils.logging import get_logger
logger = get_logger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifecycle."""
logger.info("Starting simplified MaverickMCP API server")
# Initialize monitoring systems
try:
from maverick_mcp.utils.monitoring import initialize_monitoring
logger.info("Initializing monitoring systems...")
initialize_monitoring()
logger.info("Monitoring systems initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize monitoring systems: {e}")
# Initialize performance systems
try:
from maverick_mcp.data.performance import initialize_performance_systems
logger.info("Initializing performance optimization systems...")
performance_status = await initialize_performance_systems()
logger.info(f"Performance systems initialized: {performance_status}")
except Exception as e:
logger.error(f"Failed to initialize performance systems: {e}")
yield
# Cleanup performance systems
try:
from maverick_mcp.data.performance import cleanup_performance_systems
logger.info("Cleaning up performance systems...")
await cleanup_performance_systems()
logger.info("Performance systems cleaned up")
except Exception as e:
logger.error(f"Error cleaning up performance systems: {e}")
logger.info("Shutting down simplified MaverickMCP API server")
def create_api_app() -> FastAPI:
"""Create and configure a minimal FastAPI application for testing."""
# Create FastAPI app
app = FastAPI(
title=f"{settings.app_name} API (Personal Use)",
description="Simplified HTTP API endpoints for MaverickMCP personal use",
version="1.0.0",
lifespan=lifespan,
docs_url="/api/docs" if settings.api.debug else None,
redoc_url="/api/redoc" if settings.api.debug else None,
openapi_url="/api/openapi.json" if settings.api.debug else None,
)
# Add minimal middleware
app.add_middleware(ErrorHandlingMiddleware)
app.add_middleware(RequestTracingMiddleware)
app.add_middleware(SecurityHeadersMiddleware)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.api.cors_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
)
# Add only essential routers (health check)
app.include_router(health_router, prefix="/api")
logger.info("Simplified MaverickMCP API server configured for personal use")
return app
# Create the app instance
api_app = create_api_app()
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"maverick_mcp.api.api_server:api_app",
host="127.0.0.1",
port=8001,
reload=True,
)
```
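A quick smoke-test sketch for the app factory above using FastAPI's TestClient. The exact health route path inside health_router is an assumption here; only the /api prefix is shown on this page:

```python
from fastapi.testclient import TestClient

from maverick_mcp.api.api_server import create_api_app

client = TestClient(create_api_app())

# Assumed route path; the health router is mounted under the /api prefix.
response = client.get("/api/health")
print(response.status_code, response.json())
```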
--------------------------------------------------------------------------------
/maverick_mcp/tests/test_market_calendar_caching.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Test market calendar integration with stock data caching.
"""
import logging
from maverick_mcp.providers.stock_data import StockDataProvider
# Set up logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
def test_weekend_handling():
"""Test that weekends are handled properly."""
provider = StockDataProvider()
# Test 1: Request data ending on a Sunday (today)
print("\nTest 1: Request data ending on Sunday (should adjust to Friday)")
symbol = "AAPL"
end_date = "2025-05-25" # Sunday
start_date = "2025-05-19" # Monday
print(f"Requesting {symbol} from {start_date} to {end_date}")
df = provider.get_stock_data(symbol, start_date, end_date)
print(f"Received {len(df)} rows")
if not df.empty:
print(f"Data range: {df.index.min()} to {df.index.max()}")
# Test 2: Request data for a holiday weekend
print("\n\nTest 2: Request data including Memorial Day weekend 2024")
end_date = "2024-05-27" # Memorial Day
start_date = "2024-05-24" # Friday before
print(f"Requesting {symbol} from {start_date} to {end_date}")
df = provider.get_stock_data(symbol, start_date, end_date)
print(f"Received {len(df)} rows")
if not df.empty:
print(f"Data range: {df.index.min()} to {df.index.max()}")
# Test 3: Verify no unnecessary yfinance calls for non-trading days
print("\n\nTest 3: Second request for same data (should use cache)")
df2 = provider.get_stock_data(symbol, start_date, end_date)
print(f"Received {len(df2)} rows from cache")
def test_trading_day_detection():
"""Test trading day detection methods."""
provider = StockDataProvider()
print("\n\nTesting trading day detection:")
# Test specific dates
test_dates = [
("2024-05-24", "Friday - should be trading day"),
("2024-05-25", "Saturday - should NOT be trading day"),
("2024-05-26", "Sunday - should NOT be trading day"),
("2024-05-27", "Memorial Day - should NOT be trading day"),
("2024-12-25", "Christmas - should NOT be trading day"),
("2024-07-04", "Independence Day - should NOT be trading day"),
]
for date_str, description in test_dates:
is_trading = provider._is_trading_day(date_str) # type: ignore[attr-defined]
print(
f"{date_str} ({description}): {'Trading' if is_trading else 'Non-trading'}"
)
# Test getting trading days in a range
print("\n\nTrading days in May 2024:")
trading_days = provider._get_trading_days("2024-05-20", "2024-05-31") # type: ignore[attr-defined]
for day in trading_days:
print(f" {day.strftime('%Y-%m-%d %A')}")
def test_year_boundary():
"""Test caching across year boundaries."""
provider = StockDataProvider()
print("\n\nTest 4: Year boundary request")
symbol = "MSFT"
start_date = "2023-12-28"
end_date = "2024-01-03"
print(f"Requesting {symbol} from {start_date} to {end_date}")
df = provider.get_stock_data(symbol, start_date, end_date)
print(f"Received {len(df)} rows")
if not df.empty:
print("Trading days found:")
for date in df.index:
print(f" {date.strftime('%Y-%m-%d %A')}")
if __name__ == "__main__":
print("=" * 60)
print("Testing Market Calendar Integration")
print("=" * 60)
test_weekend_handling()
test_trading_day_detection()
test_year_boundary()
print("\n" + "=" * 60)
print("All tests completed!")
print("=" * 60)
```
--------------------------------------------------------------------------------
/tests/test_orchestration_tools_simple.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Simple test script to verify the orchestration tools are working correctly.
"""
import asyncio
import os
import sys
# Add the project root to the Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from maverick_mcp.api.routers.agents import (
deep_research_financial,
list_available_agents,
orchestrated_analysis,
)
async def test_list_available_agents():
"""Test the list_available_agents function."""
print("🧪 Testing list_available_agents...")
try:
result = list_available_agents()
print(f"✅ Success: {result['status']}")
print(f"📊 Available agents: {list(result['agents'].keys())}")
print(f"🎭 Available personas: {result['personas']}")
return True
except Exception as e:
print(f"❌ Error: {e}")
return False
async def test_orchestrated_analysis():
"""Test the orchestrated_analysis function with a simple query."""
print("\n🧪 Testing orchestrated_analysis...")
try:
result = await orchestrated_analysis(
query="What's the technical outlook for Apple stock?",
persona="moderate",
routing_strategy="rule_based", # Use rule_based to avoid LLM calls
max_agents=2,
)
print(f"✅ Success: {result['status']}")
if result["status"] == "success":
print(f"📈 Agent Type: {result.get('agent_type', 'unknown')}")
print(f"🎭 Persona: {result.get('persona', 'unknown')}")
print(f"⏱️ Execution Time: {result.get('execution_time_ms', 0):.2f}ms")
return result["status"] == "success"
except Exception as e:
print(f"❌ Error: {e}")
import traceback
traceback.print_exc()
return False
async def test_deep_research_financial():
"""Test the deep_research_financial function."""
print("\n🧪 Testing deep_research_financial...")
try:
result = await deep_research_financial(
research_topic="Apple Inc",
persona="moderate",
research_depth="basic", # Use basic depth to minimize processing
timeframe="7d",
)
print(f"✅ Success: {result['status']}")
if result["status"] == "success":
print(f"🔍 Agent Type: {result.get('agent_type', 'unknown')}")
print(f"📚 Research Topic: {result.get('research_topic', 'unknown')}")
return result["status"] == "success"
except Exception as e:
print(f"❌ Error: {e}")
import traceback
traceback.print_exc()
return False
async def main():
"""Run all tests."""
print("🚀 Testing Orchestration Tools\n" + "=" * 50)
# Test 1: List available agents
test1_passed = await test_list_available_agents()
# Test 2: Orchestrated analysis
test2_passed = await test_orchestrated_analysis()
# Test 3: Deep research
test3_passed = await test_deep_research_financial()
# Summary
print("\n" + "=" * 50)
print("📊 Test Results Summary:")
print(f" List Available Agents: {'✅' if test1_passed else '❌'}")
print(f" Orchestrated Analysis: {'✅' if test2_passed else '❌'}")
print(f" Deep Research: {'✅' if test3_passed else '❌'}")
total_passed = sum([test1_passed, test2_passed, test3_passed])
print(f"\n🎯 Total: {total_passed}/3 tests passed")
if total_passed == 3:
print("🎉 All orchestration tools are working correctly!")
return True
else:
print("⚠️ Some tests failed - check the errors above")
return False
if __name__ == "__main__":
success = asyncio.run(main())
sys.exit(0 if success else 1)
```
--------------------------------------------------------------------------------
/alembic.ini:
--------------------------------------------------------------------------------
```
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d%%(month).2d%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library.
# Any required deps can be installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to alembic/versions (per script_location above). When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep.
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# Set via environment variable or env.py
# sqlalchemy.url = postgresql://user:pass@localhost/local_production_snapshot
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
```
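Because `sqlalchemy.url` is intentionally left commented out here (it is injected from `DATABASE_URL` in `alembic/env.py`), migrations can be applied either with the `alembic` CLI or programmatically. A minimal programmatic sketch, assuming `DATABASE_URL` is already exported:
```python
# Minimal sketch: apply all migrations programmatically.
# Assumes DATABASE_URL is set in the environment, as env.py expects.
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")   # picks up script_location = alembic
command.upgrade(cfg, "head")  # equivalent to `alembic upgrade head`
```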
--------------------------------------------------------------------------------
/maverick_mcp/validation/portfolio.py:
--------------------------------------------------------------------------------
```python
"""
Validation models for portfolio analysis tools.
This module provides Pydantic models for validating inputs
to all portfolio-related tools.
"""
from pydantic import Field, field_validator
from .base import (
Percentage,
PositiveInt,
StrictBaseModel,
TickerSymbol,
TickerValidator,
)
class RiskAnalysisRequest(StrictBaseModel):
"""Validation for risk_adjusted_analysis tool."""
ticker: TickerSymbol = Field(..., description="Stock ticker symbol")
risk_level: Percentage = Field(
default=50.0,
description="Risk tolerance from 0 (conservative) to 100 (aggressive)",
)
@field_validator("ticker")
@classmethod
def normalize_ticker(cls, v: str) -> str:
"""Normalize ticker to uppercase."""
return TickerValidator.validate_ticker(v)
model_config = {
"json_schema_extra": {
"examples": [
{"ticker": "AAPL", "risk_level": 50.0},
{"ticker": "TSLA", "risk_level": 75.0},
{"ticker": "JNJ", "risk_level": 25.0},
]
}
}
class PortfolioComparisonRequest(StrictBaseModel):
"""Validation for compare_tickers tool."""
tickers: list[TickerSymbol] = Field(
...,
min_length=2,
max_length=20,
description="List of ticker symbols to compare (2-20 tickers)",
)
days: PositiveInt = Field(
default=90,
le=1825, # Max 5 years
description="Number of days of historical data for comparison",
)
@field_validator("tickers")
@classmethod
def validate_tickers(cls, v: list[str]) -> list[str]:
"""Validate and normalize ticker list."""
tickers = TickerValidator.validate_ticker_list(v)
if len(tickers) < 2:
raise ValueError("At least 2 unique tickers are required for comparison")
return tickers
model_config = {
"json_schema_extra": {
"examples": [
{"tickers": ["AAPL", "MSFT", "GOOGL"], "days": 90},
{"tickers": ["SPY", "QQQ", "IWM", "DIA"], "days": 180},
]
}
}
class CorrelationAnalysisRequest(StrictBaseModel):
"""Validation for portfolio_correlation_analysis tool."""
tickers: list[TickerSymbol] = Field(
...,
min_length=2,
max_length=30,
description="List of ticker symbols for correlation analysis",
)
days: PositiveInt = Field(
default=252, # 1 trading year
ge=30, # Need at least 30 days for meaningful correlation
le=2520, # Max 10 years
description="Number of days for correlation calculation",
)
@field_validator("tickers")
@classmethod
def validate_tickers(cls, v: list[str]) -> list[str]:
"""Validate and normalize ticker list."""
tickers = TickerValidator.validate_ticker_list(v)
if len(tickers) < 2:
raise ValueError(
"At least 2 unique tickers are required for correlation analysis"
)
return tickers
@field_validator("days")
@classmethod
def validate_days_for_correlation(cls, v: int) -> int:
"""Ensure enough days for meaningful correlation."""
if v < 30:
raise ValueError(
"At least 30 days of data required for meaningful correlation analysis"
)
return v
model_config = {
"json_schema_extra": {
"examples": [
{"tickers": ["AAPL", "MSFT", "GOOGL", "AMZN"], "days": 252},
{
"tickers": ["SPY", "TLT", "GLD", "DBC", "VNQ"],
"days": 504, # 2 years
},
]
}
}
```
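A short usage sketch of these request models (illustrative only; ticker normalization depends on `TickerValidator` from `.base`):
```python
# Illustrative usage of the portfolio validation models.
from pydantic import ValidationError

from maverick_mcp.validation.portfolio import (
    PortfolioComparisonRequest,
    RiskAnalysisRequest,
)

# Valid request: the field validator normalizes the ticker.
req = RiskAnalysisRequest(ticker="AAPL", risk_level=75.0)
print(req.ticker, req.risk_level)

# Invalid request: fewer than two tickers violates min_length=2.
try:
    PortfolioComparisonRequest(tickers=["AAPL"], days=90)
except ValidationError as exc:
    print(f"rejected: {exc.error_count()} validation error(s)")
```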
--------------------------------------------------------------------------------
/tools/experiments/validation_fixed.py:
--------------------------------------------------------------------------------
```python
"""
Fixed validation examples that work with the current codebase.
"""
import os
import random
import subprocess
import time
import pandas as pd
from maverick_mcp.utils.agent_errors import agent_friendly_errors
from maverick_mcp.utils.parallel_screening import ParallelScreener
from maverick_mcp.utils.quick_cache import get_cache_stats, quick_cache
print("🎯 Maverick-MCP Validation - Fixed Version")
print("=" * 60)
# Validation 1: Using the agent error handler
print("\n🔐 1. Testing Agent Error Handler...")
@agent_friendly_errors(reraise=False)
def test_column_error():
"""Test DataFrame column error handling."""
df = pd.DataFrame({"Close": [100, 101, 102]})
# This will raise KeyError
return df["close"] # Wrong case!
result = test_column_error()
if isinstance(result, dict) and "fix_suggestion" in result:
print(f"✅ Error caught with fix: {result['fix_suggestion']['fix']}")
print(f" Example: {result['fix_suggestion']['example']}")
# Validation 2: Testing the quick cache
print("\n💾 2. Testing Quick Cache...")
@quick_cache(ttl_seconds=5)
def expensive_operation(value: int) -> int:
"""Simulate expensive operation."""
time.sleep(0.5) # Simulate work
return value * 2
# First call - cache miss
start = time.time()
result1 = expensive_operation(42)
time1 = time.time() - start
# Second call - cache hit
start = time.time()
result2 = expensive_operation(42)
time2 = time.time() - start
stats = get_cache_stats()
print(f"✅ Cache working: First call {time1:.3f}s, Second call {time2:.3f}s")
print(
f" Cache stats: {stats['hits']} hits, {stats['misses']} misses, {stats['hit_rate']}% hit rate"
)
# Validation 3: Testing parallel screening
print("\n⚡ 3. Testing Parallel Screening...")
def simple_screen(symbol: str) -> dict:
"""Simple screening function for testing."""
time.sleep(0.1) # Simulate work
return {
"symbol": symbol,
"passed": random.random() > 0.5,
"score": random.randint(60, 95),
}
test_symbols = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "META"]
# Sequential
start = time.time()
seq_results = [simple_screen(s) for s in test_symbols]
seq_time = time.time() - start
# Parallel
with ParallelScreener(max_workers=3) as screener:
start = time.time()
par_results = screener.screen_batch(test_symbols, simple_screen, batch_size=2)
par_time = time.time() - start
speedup = seq_time / par_time if par_time > 0 else 1
print(f"✅ Parallel screening: {speedup:.1f}x speedup")
print(f" Sequential: {seq_time:.2f}s, Parallel: {par_time:.2f}s")
# Validation 4: Testing experiment harness
print("\n🧪 4. Testing Experiment Harness...")
os.makedirs("tools/experiments", exist_ok=True)
# Check if experiment harness would work
if os.path.exists("tools/experiment.py"):
print("✅ Experiment harness is available")
print(" Drop .py files in tools/experiments/ to auto-execute")
else:
print("❌ Experiment harness not found")
# Validation 5: Testing fast commands
print("\n🚀 5. Testing Fast Commands...")
# Test make command
result = subprocess.run(["make", "help"], capture_output=True, text=True)
if result.returncode == 0:
print("✅ Makefile commands working")
# Show some key commands
for line in result.stdout.split("\n")[2:6]:
if line.strip():
print(f" {line}")
# Summary
print("\n" + "=" * 60)
print("🎉 Validation Summary:")
print(" 1. Agent Error Handler: ✅ Provides helpful fixes")
print(" 2. Quick Cache: ✅ Speeds up repeated calls")
print(" 3. Parallel Screening: ✅ Multi-core speedup")
print(" 4. Experiment Harness: ✅ Auto-execution ready")
print(" 5. Fast Commands: ✅ Makefile working")
print("\n✨ All core improvements validated successfully!")
```
--------------------------------------------------------------------------------
/maverick_mcp/tests/test_cache_debug.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Debug test for stock data caching issues.
"""
import logging
from datetime import datetime, timedelta
from maverick_mcp.providers.stock_data import StockDataProvider
# Set up detailed logging for debugging
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
def test_stock_data_caching_debug():
"""Test stock data caching functionality with detailed logging."""
# Initialize provider
provider = StockDataProvider()
# Test parameters
symbol = "MSFT"
end_date = datetime.now().strftime("%Y-%m-%d")
start_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
print("\nTest parameters:")
print(f" Symbol: {symbol}")
print(f" Start date: {start_date}")
print(f" End date: {end_date}")
# Test 1: Fetch data (should check cache first, then fetch missing)
print("\n1. Fetching data (should prioritize cache)...")
df1 = provider.get_stock_data(symbol, start_date, end_date)
assert not df1.empty, "First fetch returned empty DataFrame"
print(f" Fetched {len(df1)} rows")
# Test 2: Fetch same data again (should use cache entirely)
print("\n2. Fetching same data again (should use cache entirely)...")
df2 = provider.get_stock_data(symbol, start_date, end_date)
assert not df2.empty, "Second fetch returned empty DataFrame"
print(f" Fetched {len(df2)} rows")
# Verify data consistency
assert len(df1) == len(df2), "Data length mismatch between fetches"
# Test 3: Force fresh data
print("\n3. Forcing fresh data (use_cache=False)...")
df3 = provider.get_stock_data(symbol, start_date, end_date, use_cache=False)
assert not df3.empty, "Fresh fetch returned empty DataFrame"
print(f" Fetched {len(df3)} rows")
# Test 4: Test partial cache hit (request wider date range)
wider_start = (datetime.now() - timedelta(days=60)).strftime("%Y-%m-%d")
print(
f"\n4. Testing partial cache hit (wider range: {wider_start} to {end_date})..."
)
df4 = provider.get_stock_data(symbol, wider_start, end_date)
assert not df4.empty, "Wider range fetch returned empty DataFrame"
print(f" Fetched {len(df4)} rows (should fetch only missing data)")
# Display sample data
if not df1.empty:
print("\nSample data (first 5 rows):")
print(df1.head())
print("\nTest completed successfully!")
def test_smart_caching_behavior():
"""Test that smart caching truly prioritizes database over yfinance."""
provider = StockDataProvider()
# Use a widely traded ticker so both the cache and the yfinance fallback get exercised
symbol = "AAPL"
# Test 1: Request recent data (might be partially cached)
end_date = datetime.now().strftime("%Y-%m-%d")
start_date = (datetime.now() - timedelta(days=10)).strftime("%Y-%m-%d")
print(f"\nTest 1: Recent data request ({start_date} to {end_date})")
df1 = provider.get_stock_data(symbol, start_date, end_date)
print(f"Fetched {len(df1)} rows")
# Test 2: Request same data again - should be fully cached
print("\nTest 2: Same request again - should use cache entirely")
df2 = provider.get_stock_data(symbol, start_date, end_date)
print(f"Fetched {len(df2)} rows")
# Test 3: Request historical data that might be fully cached
hist_end = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
hist_start = (datetime.now() - timedelta(days=60)).strftime("%Y-%m-%d")
print(f"\nTest 3: Historical data ({hist_start} to {hist_end})")
df3 = provider.get_stock_data(symbol, hist_start, hist_end)
print(f"Fetched {len(df3)} rows")
print("\nSmart caching test completed!")
if __name__ == "__main__":
test_stock_data_caching_debug()
```
--------------------------------------------------------------------------------
/alembic/versions/014_add_portfolio_models.py:
--------------------------------------------------------------------------------
```python
"""Add portfolio management models
Revision ID: 014_add_portfolio_models
Revises: 013_add_backtest_persistence_models
Create Date: 2025-11-01 12:00:00.000000
This migration adds portfolio management models for tracking user investment holdings:
1. UserPortfolio - Portfolio metadata with user identification
2. PortfolioPosition - Individual position records with cost basis tracking
Features:
- Average cost basis tracking for educational simplicity
- High-precision Decimal types for financial accuracy (Numeric(12,4) for prices, Numeric(20,8) for shares)
- Support for fractional shares
- Single-user design with user_id="default"
- Cascade delete for data integrity
- Comprehensive indexes for common query patterns
"""
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from alembic import op
# revision identifiers, used by Alembic.
revision = "014_add_portfolio_models"
down_revision = "013_add_backtest_persistence_models"
branch_labels = None
depends_on = None
def upgrade() -> None:
"""Create portfolio management tables."""
# Create portfolios table
op.create_table(
"mcp_portfolios",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column(
"user_id",
sa.String(100),
nullable=False,
server_default="default",
),
sa.Column(
"name",
sa.String(200),
nullable=False,
server_default="My Portfolio",
),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
)
# Create indexes on portfolios
op.create_index("idx_portfolio_user", "mcp_portfolios", ["user_id"])
op.create_unique_constraint(
"uq_user_portfolio_name", "mcp_portfolios", ["user_id", "name"]
)
# Create positions table
op.create_table(
"mcp_portfolio_positions",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column("portfolio_id", postgresql.UUID(as_uuid=True), nullable=False),
sa.Column("ticker", sa.String(20), nullable=False),
sa.Column("shares", sa.Numeric(20, 8), nullable=False),
sa.Column("average_cost_basis", sa.Numeric(12, 4), nullable=False),
sa.Column("total_cost", sa.Numeric(20, 4), nullable=False),
sa.Column("purchase_date", sa.DateTime(timezone=True), nullable=False),
sa.Column("notes", sa.Text, nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.ForeignKeyConstraint(
["portfolio_id"], ["mcp_portfolios.id"], ondelete="CASCADE"
),
)
# Create indexes on positions
op.create_index(
"idx_position_portfolio", "mcp_portfolio_positions", ["portfolio_id"]
)
op.create_index("idx_position_ticker", "mcp_portfolio_positions", ["ticker"])
op.create_index(
"idx_position_portfolio_ticker",
"mcp_portfolio_positions",
["portfolio_id", "ticker"],
)
op.create_unique_constraint(
"uq_portfolio_position_ticker",
"mcp_portfolio_positions",
["portfolio_id", "ticker"],
)
def downgrade() -> None:
"""Drop portfolio management tables."""
# Drop positions table first (due to foreign key)
op.drop_table("mcp_portfolio_positions")
# Drop portfolios table
op.drop_table("mcp_portfolios")
```
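As a quick sanity check after upgrading, the new tables can be reflected directly with SQLAlchemy. The sketch below is illustrative and assumes the SQLite fallback used elsewhere in this repo when `DATABASE_URL` is unset.
```python
# Illustrative post-migration check for the portfolio tables.
import os

from sqlalchemy import create_engine, inspect

engine = create_engine(os.getenv("DATABASE_URL", "sqlite:///maverick_mcp.db"))
insp = inspect(engine)

for table in ("mcp_portfolios", "mcp_portfolio_positions"):
    if insp.has_table(table):
        cols = [col["name"] for col in insp.get_columns(table)]
        print(f"{table}: {cols}")
    else:
        print(f"{table}: missing (run `alembic upgrade head` first)")
```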
--------------------------------------------------------------------------------
/maverick_mcp/api/utils/insomnia_export.py:
--------------------------------------------------------------------------------
```python
"""
Insomnia Collection Export Utility
Converts OpenAPI specifications to Insomnia workspace format.
"""
import uuid
from typing import Any
def convert_to_insomnia(openapi_dict: dict[str, Any]) -> dict[str, Any]:
"""
Convert OpenAPI specification to Insomnia workspace format.
Args:
openapi_dict: OpenAPI specification dictionary
Returns:
Insomnia workspace dictionary
"""
info = openapi_dict.get("info", {})
workspace = {
"_type": "export",
"__export_format": 4,
"__export_date": "2024-01-01T00:00:00.000Z",
"__export_source": "maverick-mcp:openapi",
"resources": [],
}
# Create workspace resource
workspace_id = f"wrk_{uuid.uuid4().hex[:12]}"
workspace["resources"].append(
{
"_id": workspace_id,
"_type": "workspace",
"name": info.get("title", "API Workspace"),
"description": info.get("description", "Exported from OpenAPI spec"),
"scope": "collection",
}
)
# Create environment for base URL
env_id = f"env_{uuid.uuid4().hex[:12]}"
servers = openapi_dict.get("servers", [])
base_url = (
servers[0].get("url", "http://localhost:8000")
if servers
else "http://localhost:8000"
)
workspace["resources"].append(
{
"_id": env_id,
"_type": "environment",
"name": "Base Environment",
"data": {"base_url": base_url},
"dataPropertyOrder": {"&": ["base_url"]},
"color": "#7d69cb",
"isPrivate": False,
"metaSortKey": 1,
"parentId": workspace_id,
}
)
# Convert paths to Insomnia requests
paths = openapi_dict.get("paths", {})
for path, methods in paths.items():
for method, operation in methods.items():
if method.upper() in ["GET", "POST", "PUT", "DELETE", "PATCH"]:
request_id = f"req_{uuid.uuid4().hex[:12]}"
request = {
"_id": request_id,
"_type": "request",
"parentId": workspace_id,
"name": operation.get("summary", f"{method.upper()} {path}"),
"description": operation.get("description", ""),
"url": "{{ _.base_url }}" + path,
"method": method.upper(),
"headers": [],
"parameters": [],
"body": {},
"authentication": {},
}
# Add request body if present
if "requestBody" in operation:
content = operation["requestBody"].get("content", {})
if "application/json" in content:
request["headers"].append(
{"name": "Content-Type", "value": "application/json"}
)
request["body"] = {"mimeType": "application/json", "text": "{}"}
# Add example if available
schema = content["application/json"].get("schema", {})
if "example" in schema:
request["body"]["text"] = str(schema["example"])
# Add query parameters if present
if "parameters" in operation:
for param in operation["parameters"]:
if param.get("in") == "query":
request["parameters"].append(
{
"name": param["name"],
"value": "",
"description": param.get("description", ""),
"disabled": not param.get("required", False),
}
)
workspace["resources"].append(request)
return workspace
```
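A minimal usage sketch (the `openapi.json` and `insomnia_workspace.json` filenames are placeholders; in practice the spec would come from the FastAPI app's `/openapi.json` or `app.openapi()`):
```python
# Illustrative conversion of an OpenAPI spec file to an Insomnia workspace.
import json

from maverick_mcp.api.utils.insomnia_export import convert_to_insomnia

with open("openapi.json") as f:
    spec = json.load(f)

workspace = convert_to_insomnia(spec)

with open("insomnia_workspace.json", "w") as f:
    json.dump(workspace, f, indent=2)

print(f"Exported {len(workspace['resources'])} Insomnia resources")
```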
--------------------------------------------------------------------------------
/tools/hot_reload.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""
Hot reload development tool for Maverick-MCP.
This script watches for file changes and automatically restarts the server,
providing instant feedback during development.
"""
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Any
try:
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
except ImportError:
print("Installing watchdog for file watching...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "watchdog"])
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
class ReloadHandler(FileSystemEventHandler):
"""Handler that restarts the server on file changes."""
def __init__(self, command: list[str], debounce_seconds: float = 0.5):
self.command = command
self.debounce_seconds = debounce_seconds
self.last_reload = 0
self.process: subprocess.Popen[Any] | None = None
self.start_process()
def start_process(self):
"""Start the development process."""
if self.process:
print("🔄 Stopping previous process...")
self.process.terminate()
try:
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.process.kill()
print(f"🚀 Starting: {' '.join(self.command)}")
self.process = subprocess.Popen(self.command)
self.last_reload = time.time()
def on_modified(self, event):
"""Handle file modification events."""
if event.is_directory:
return
# Skip certain files
path = Path(event.src_path)
if any(
pattern in str(path)
for pattern in [
"__pycache__",
".pyc",
".git",
".pytest_cache",
".log",
".db",
".sqlite",
]
):
return
# Only reload Python files and config files
if path.suffix not in [".py", ".toml", ".yaml", ".yml", ".env"]:
return
# Debounce rapid changes
current_time = time.time()
if current_time - self.last_reload < self.debounce_seconds:
return
print(f"\n📝 File changed: {path}")
self.start_process()
def cleanup(self):
"""Clean up the running process."""
if self.process:
self.process.terminate()
self.process.wait()
def main():
"""Main entry point for hot reload."""
import argparse
parser = argparse.ArgumentParser(description="Hot reload for Maverick-MCP")
parser.add_argument(
"--command",
default="make backend",
help="Command to run (default: make backend)",
)
parser.add_argument(
"--watch",
action="append",
default=["maverick_mcp"],
help="Directories to watch (can be specified multiple times)",
)
parser.add_argument(
"--exclude",
action="append",
default=[],
help="Patterns to exclude from watching",
)
args = parser.parse_args()
# Parse command
command = args.command.split() if isinstance(args.command, str) else args.command
# Set up file watcher
event_handler = ReloadHandler(command)
observer = Observer()
# Watch specified directories
for watch_dir in args.watch:
if os.path.exists(watch_dir):
print(f"👀 Watching: {watch_dir}")
observer.schedule(event_handler, watch_dir, recursive=True)
else:
print(f"⚠️ Directory not found: {watch_dir}")
observer.start()
print("\n✨ Hot reload active! Press Ctrl+C to stop.\n")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n👋 Stopping hot reload...")
observer.stop()
event_handler.cleanup()
observer.join()
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/maverick_mcp/tests/test_mailgun_email.py:
--------------------------------------------------------------------------------
```python
"""
Test script for Mailgun email integration.
Run this script to test your Mailgun configuration:
python maverick_mcp/tests/test_mailgun_email.py
"""
import asyncio
import os
import sys
from pathlib import Path
import pytest
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from maverick_mcp.config.settings import settings
from maverick_mcp.utils.email_service import (
MailgunService,
send_api_key_email,
send_welcome_email,
)
@pytest.mark.asyncio
@pytest.mark.integration
async def test_mailgun_config():
"""Test Mailgun configuration."""
print("=" * 60)
print("Testing Mailgun Configuration")
print("=" * 60)
print(f"Email Enabled: {settings.email.enabled}")
print(f"Mailgun Domain: {settings.email.mailgun_domain}")
print(f"From Address: {settings.email.from_address}")
print(f"From Name: {settings.email.from_name}")
print(f"API Key Set: {'Yes' if settings.email.mailgun_api_key else 'No'}")
if not settings.email.mailgun_api_key:
print("\n❌ Mailgun API key not configured!")
print("Please set MAILGUN_API_KEY in your .env file")
return False
if not settings.email.mailgun_domain:
print("\n❌ Mailgun domain not configured!")
print("Please set MAILGUN_DOMAIN in your .env file")
return False
print("\n✅ Mailgun configuration looks good!")
return True
@pytest.mark.asyncio
@pytest.mark.integration
async def test_send_email():
"""Test sending a basic email."""
print("\n" + "=" * 60)
print("Testing Basic Email Send")
print("=" * 60)
# Get test email from environment or use default
test_email = os.getenv("TEST_EMAIL", "test@example.com")
service = MailgunService()
success = await service.send_email(
to=test_email,
subject="Test Email from Maverick-MCP",
text="This is a test email to verify Mailgun integration.",
html="<h1>Test Email</h1><p>This is a test email to verify Mailgun integration.</p>",
tags=["test", "integration"],
metadata={"test": "true", "source": "test_script"},
)
if success:
print(f"✅ Test email sent successfully to {test_email}")
else:
print(f"❌ Failed to send test email to {test_email}")
return success
@pytest.mark.asyncio
@pytest.mark.integration
async def test_email_templates():
"""Test all email templates."""
print("\n" + "=" * 60)
print("Testing Email Templates")
print("=" * 60)
test_email = os.getenv("TEST_EMAIL", "test@example.com")
test_name = "Test User"
# Test welcome email
print("\n1. Testing Welcome Email...")
success = await send_welcome_email(test_email, test_name)
print("✅ Welcome email sent" if success else "❌ Welcome email failed")
# Test API key email
print("\n2. Testing API Key Email...")
success = await send_api_key_email(test_email, test_name, "test_1234567890")
print("✅ API key email sent" if success else "❌ API key email failed")
async def main():
"""Run all tests."""
print("\nMaverick-MCP Mailgun Email Test Suite")
print("=====================================\n")
# Test configuration
if not await test_mailgun_config():
print("\nPlease configure Mailgun before running tests.")
print("See .env.mailgun.example for configuration details.")
return
# Ask if user wants to send test emails
print("\nWould you like to send test emails? (y/n)")
response = input().strip().lower()
if response == "y":
test_email = input(
"Enter test email address (or press Enter for default): "
).strip()
if test_email:
os.environ["TEST_EMAIL"] = test_email
# Send test emails
await test_send_email()
print("\nWould you like to test all email templates? (y/n)")
if input().strip().lower() == "y":
await test_email_templates()
print("\n" + "=" * 60)
print("Test Complete!")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/maverick_mcp/data/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Data utilities for Maverick-MCP.
This package contains data caching, processing and storage utilities.
"""
# Core data functionality - conditional imports to handle missing dependencies
__all__ = []
# Try to import core cache and model functionality
try:
from .cache import get_from_cache as _get_from_cache
from .cache import save_to_cache as _save_to_cache
get_from_cache = _get_from_cache
save_to_cache = _save_to_cache
__all__.extend(["get_from_cache", "save_to_cache"])
except ImportError:
# Cache functionality not available (missing msgpack)
pass
try:
from .models import (
MaverickBearStocks as _MaverickBearStocks,
)
from .models import (
MaverickStocks as _MaverickStocks,
)
from .models import (
PriceCache as _PriceCache,
)
from .models import (
SessionLocal as _SessionLocal,
)
from .models import (
Stock as _Stock,
)
from .models import (
SupplyDemandBreakoutStocks as _SupplyDemandBreakoutStocks,
)
from .models import (
bulk_insert_price_data as _bulk_insert_price_data,
)
from .models import (
ensure_database_schema as _ensure_database_schema,
)
from .models import (
get_db as _get_db,
)
from .models import (
get_latest_maverick_screening as _get_latest_maverick_screening,
)
from .models import (
init_db as _init_db,
)
MaverickBearStocks = _MaverickBearStocks
MaverickStocks = _MaverickStocks
PriceCache = _PriceCache
SessionLocal = _SessionLocal
Stock = _Stock
SupplyDemandBreakoutStocks = _SupplyDemandBreakoutStocks
bulk_insert_price_data = _bulk_insert_price_data
ensure_database_schema = _ensure_database_schema
get_db = _get_db
get_latest_maverick_screening = _get_latest_maverick_screening
init_db = _init_db
__all__.extend(
[
"Stock",
"PriceCache",
"MaverickStocks",
"MaverickBearStocks",
"SupplyDemandBreakoutStocks",
"SessionLocal",
"get_db",
"init_db",
"ensure_database_schema",
"bulk_insert_price_data",
"get_latest_maverick_screening",
]
)
except ImportError:
# Model functionality not available (missing SQLAlchemy or other deps)
pass
# Always try to import validation - it's critical for production validation test
try:
from .validation import (
DataValidator,
validate_backtest_data,
validate_stock_data,
)
# Create module-level validation instance for easy access
validation = DataValidator()
__all__.extend(
[
"DataValidator",
"validate_stock_data",
"validate_backtest_data",
"validation",
]
)
except ImportError as import_error:
# If validation can't be imported, create a minimal stub
error_message = (
f"Validation functionality requires additional dependencies: {import_error}"
)
def _raise_validation_import_error() -> None:
raise ImportError(error_message)
class ValidationStub:
"""Minimal validation stub when dependencies aren't available."""
def __getattr__(self, name):
_raise_validation_import_error()
# Static method stubs
@staticmethod
def validate_date_range(*args, **kwargs):
_raise_validation_import_error()
@staticmethod
def validate_data_quality(*args, **kwargs):
_raise_validation_import_error()
@staticmethod
def validate_price_data(*args, **kwargs):
_raise_validation_import_error()
@staticmethod
def validate_batch_data(*args, **kwargs):
_raise_validation_import_error()
validation = ValidationStub()
DataValidator = ValidationStub
def validate_stock_data(*args, **kwargs):
return {"error": "Dependencies not available"}
def validate_backtest_data(*args, **kwargs):
return {"error": "Dependencies not available"}
__all__.extend(
[
"DataValidator",
"validate_stock_data",
"validate_backtest_data",
"validation",
]
)
```
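Because these exports are conditional, downstream code that wants to stay import-safe can check `__all__` before relying on optional helpers. A small sketch of that pattern (the list of names checked is illustrative):
```python
# Illustrative defensive check of the conditionally exported helpers.
from maverick_mcp import data

optional = ["get_from_cache", "save_to_cache", "SessionLocal", "DataValidator"]
available = [name for name in optional if name in data.__all__]
print(f"optional data exports available: {available}")
```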
--------------------------------------------------------------------------------
/maverick_mcp/utils/stock_helpers.py:
--------------------------------------------------------------------------------
```python
"""
Stock data helper utilities for routers.
This module provides common stock data fetching and processing utilities
that are shared across multiple routers to avoid code duplication.
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import UTC, datetime, timedelta
import pandas as pd
from maverick_mcp.providers.stock_data import EnhancedStockDataProvider
# Thread pool for async operations
executor = ThreadPoolExecutor(max_workers=4)
def get_stock_dataframe(ticker: str, days: int = 365) -> pd.DataFrame:
"""
Get stock data as a DataFrame with technical indicators.
Args:
ticker: Stock ticker symbol (e.g., "AAPL")
days: Number of days of historical data to fetch (default: 365)
Returns:
DataFrame with stock price data and technical indicators
Raises:
ValueError: If ticker is invalid or data cannot be fetched
"""
from maverick_mcp.core.technical_analysis import add_technical_indicators
# Calculate date range
end_date = datetime.now(UTC)
start_date = end_date - timedelta(days=days)
start_str = start_date.strftime("%Y-%m-%d")
end_str = end_date.strftime("%Y-%m-%d")
# Get stock data provider
stock_provider = EnhancedStockDataProvider()
# Fetch data and add technical indicators
df = stock_provider.get_stock_data(ticker, start_str, end_str)
df = add_technical_indicators(df)
return df
async def get_stock_dataframe_async(ticker: str, days: int = 365) -> pd.DataFrame:
"""
Async wrapper for get_stock_dataframe to avoid blocking the event loop.
Args:
ticker: Stock ticker symbol (e.g., "AAPL")
days: Number of days of historical data to fetch (default: 365)
Returns:
DataFrame with stock price data and technical indicators
Raises:
ValueError: If ticker is invalid or data cannot be fetched
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(executor, get_stock_dataframe, ticker, days)
async def get_multiple_stock_dataframes_async(
tickers: list[str], days: int = 365
) -> dict[str, pd.DataFrame]:
"""
Fetch multiple stock dataframes concurrently.
Args:
tickers: List of stock ticker symbols
days: Number of days of historical data to fetch (default: 365)
Returns:
Dictionary mapping ticker symbols to their DataFrames
Raises:
ValueError: If any ticker is invalid or data cannot be fetched
"""
tasks = [get_stock_dataframe_async(ticker, days) for ticker in tickers]
results = await asyncio.gather(*tasks)
return dict(zip(tickers, results, strict=False))
def validate_ticker(ticker: str) -> str:
"""
Validate and normalize a stock ticker symbol.
Args:
ticker: Stock ticker symbol to validate
Returns:
Normalized ticker symbol (uppercase, stripped)
Raises:
ValueError: If ticker is invalid
"""
if not ticker or not isinstance(ticker, str): # type: ignore[arg-type]
raise ValueError("Ticker must be a non-empty string")
ticker = ticker.strip().upper()
if not ticker:
raise ValueError("Ticker cannot be empty")
# Basic validation - ticker should be alphanumeric with possible dots/hyphens
if not ticker.replace(".", "").replace("-", "").isalnum():
raise ValueError("Ticker contains invalid characters")
if len(ticker) > 10:
raise ValueError("Ticker is too long (max 10 characters)")
return ticker
def calculate_date_range(days: int) -> tuple[str, str]:
"""
Calculate start and end date strings for stock data fetching.
Args:
days: Number of days of historical data
Returns:
Tuple of (start_date_str, end_date_str) in YYYY-MM-DD format
Raises:
ValueError: If days is not a positive integer
"""
if not isinstance(days, int) or days <= 0: # type: ignore[arg-type]
raise ValueError("Days must be a positive integer")
if days > 3650: # ~10 years
raise ValueError("Days cannot exceed 3650 (10 years)")
end_date = datetime.now(UTC)
start_date = end_date - timedelta(days=days)
start_str = start_date.strftime("%Y-%m-%d")
end_str = end_date.strftime("%Y-%m-%d")
return start_str, end_str
```
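A short usage sketch of these helpers (the async fetch needs the data provider to reach price data, so treat it as illustrative):
```python
# Illustrative use of the stock helper utilities.
import asyncio

from maverick_mcp.utils.stock_helpers import (
    calculate_date_range,
    get_multiple_stock_dataframes_async,
    validate_ticker,
)

print(validate_ticker(" aapl "))   # -> "AAPL"
print(calculate_date_range(90))    # -> ("YYYY-MM-DD", "YYYY-MM-DD")

async def main() -> None:
    frames = await get_multiple_stock_dataframes_async(["AAPL", "MSFT"], days=90)
    for ticker, df in frames.items():
        print(ticker, df.shape)

if __name__ == "__main__":
    asyncio.run(main())
```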
--------------------------------------------------------------------------------
/maverick_mcp/validation/technical.py:
--------------------------------------------------------------------------------
```python
"""
Validation models for technical analysis tools.
This module provides Pydantic models for validating inputs
to all technical analysis tools.
"""
from pydantic import Field, field_validator
from .base import (
PositiveInt,
StrictBaseModel,
TickerSymbol,
TickerValidator,
)
class RSIAnalysisRequest(StrictBaseModel):
"""Validation for get_rsi_analysis tool."""
ticker: TickerSymbol = Field(..., description="Stock ticker symbol")
period: PositiveInt = Field(
default=14, le=100, description="RSI period (typically 14)"
)
days: PositiveInt = Field(
default=365,
le=3650, # Max 10 years
description="Number of days of historical data",
)
@field_validator("ticker")
@classmethod
def normalize_ticker(cls, v: str) -> str:
"""Normalize ticker to uppercase."""
return TickerValidator.validate_ticker(v)
model_config = {
"json_schema_extra": {
"examples": [
{"ticker": "AAPL", "period": 14, "days": 365},
{"ticker": "MSFT", "period": 21, "days": 90},
]
}
}
class MACDAnalysisRequest(StrictBaseModel):
"""Validation for get_macd_analysis tool."""
ticker: TickerSymbol = Field(..., description="Stock ticker symbol")
fast_period: PositiveInt = Field(default=12, le=50, description="Fast EMA period")
slow_period: PositiveInt = Field(default=26, le=100, description="Slow EMA period")
signal_period: PositiveInt = Field(
default=9, le=50, description="Signal line period"
)
days: PositiveInt = Field(
default=365, le=3650, description="Number of days of historical data"
)
@field_validator("ticker")
@classmethod
def normalize_ticker(cls, v: str) -> str:
"""Normalize ticker to uppercase."""
return TickerValidator.validate_ticker(v)
@field_validator("slow_period")
@classmethod
def validate_slow_greater_than_fast(cls, v: int, info) -> int:
"""Ensure slow period is greater than fast period."""
fast = info.data.get("fast_period", 12)
if v <= fast:
raise ValueError(
f"Slow period ({v}) must be greater than fast period ({fast})"
)
return v
model_config = {
"json_schema_extra": {
"examples": [
{
"ticker": "AAPL",
"fast_period": 12,
"slow_period": 26,
"signal_period": 9,
},
{
"ticker": "GOOGL",
"fast_period": 10,
"slow_period": 20,
"signal_period": 5,
"days": 180,
},
]
}
}
class SupportResistanceRequest(StrictBaseModel):
"""Validation for get_support_resistance tool."""
ticker: TickerSymbol = Field(..., description="Stock ticker symbol")
days: PositiveInt = Field(
default=365, le=3650, description="Number of days of historical data"
)
@field_validator("ticker")
@classmethod
def normalize_ticker(cls, v: str) -> str:
"""Normalize ticker to uppercase."""
return TickerValidator.validate_ticker(v)
class TechnicalAnalysisRequest(StrictBaseModel):
"""Validation for get_full_technical_analysis tool."""
ticker: TickerSymbol = Field(..., description="Stock ticker symbol")
days: PositiveInt = Field(
default=365, le=3650, description="Number of days of historical data"
)
@field_validator("ticker")
@classmethod
def normalize_ticker(cls, v: str) -> str:
"""Normalize ticker to uppercase."""
return TickerValidator.validate_ticker(v)
model_config = {
"json_schema_extra": {
"examples": [
{"ticker": "AAPL", "days": 365},
{"ticker": "TSLA", "days": 90},
]
}
}
class StockChartRequest(StrictBaseModel):
"""Validation for get_stock_chart_analysis tool."""
ticker: TickerSymbol = Field(..., description="Stock ticker symbol")
@field_validator("ticker")
@classmethod
def normalize_ticker(cls, v: str) -> str:
"""Normalize ticker to uppercase."""
return TickerValidator.validate_ticker(v)
```
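The cross-field check on `MACDAnalysisRequest` is easiest to see in a quick sketch (illustrative only):
```python
# Illustrative check of the MACD fast/slow period validation.
from pydantic import ValidationError

from maverick_mcp.validation.technical import MACDAnalysisRequest

ok = MACDAnalysisRequest(ticker="AAPL", fast_period=12, slow_period=26)
print(ok.fast_period, ok.slow_period)

try:
    MACDAnalysisRequest(ticker="AAPL", fast_period=20, slow_period=15)
except ValidationError as exc:
    print("rejected:", exc.errors()[0]["msg"])
```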
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/feature_request.md:
--------------------------------------------------------------------------------
```markdown
---
name: Feature Request
about: Suggest a new feature or improvement for MaverickMCP
title: '[FEATURE] '
labels: ['enhancement', 'needs-triage']
assignees: ''
---
## 🚀 Feature Request Summary
A clear and concise description of the feature you'd like to see added.
## 💰 Financial Disclaimer Acknowledgment
- [ ] I understand this is educational software and not financial advice
- [ ] This feature request is for educational/technical purposes, not investment recommendations
- [ ] I understand that any financial analysis features will include appropriate disclaimers
## 🎯 Problem/Use Case
**What problem does this feature solve?**
A clear description of the problem or limitation you're experiencing.
**Who would benefit from this feature?**
- [ ] Individual traders learning technical analysis
- [ ] MCP developers building financial tools
- [ ] Educational institutions teaching finance
- [ ] Open source contributors
- [ ] Other: ___________
## 💡 Proposed Solution
**Describe your ideal solution:**
A clear and concise description of what you want to happen.
**Alternative approaches you've considered:**
Any alternative solutions or features you've thought about.
## 🔧 Technical Details
**Component Area:**
- [ ] Data fetching (new data sources, APIs)
- [ ] Technical analysis (new indicators, calculations)
- [ ] Stock screening (new strategies, filters)
- [ ] Portfolio analysis (risk metrics, optimization)
- [ ] MCP tools (new tools, tool improvements)
- [ ] Database/Caching (performance, storage)
- [ ] Claude Desktop integration
- [ ] Developer experience (setup, debugging)
- [ ] Documentation and examples
**Implementation Complexity:**
- [ ] Simple (few lines of code, existing patterns)
- [ ] Medium (new functionality, moderate effort)
- [ ] Complex (major architectural changes, significant effort)
- [ ] I'm not sure
**Dependencies:**
- Does this require new external APIs or libraries?
- Are there any known technical constraints?
## 📊 Examples/Mockups
**Code examples, mockups, or references:**
```python
# Example of how you envision using this feature
result = new_feature_function(symbol="AAPL", period=20)
```
**Reference implementations:**
- Links to similar features in other projects
- Academic papers or financial resources
- Industry standards or best practices
## 🎓 Educational Value
**Learning objectives:**
- What financial concepts would this help users learn?
- How does this contribute to financial education?
- What technical skills would developers gain?
**Documentation needs:**
- [ ] Code examples needed
- [ ] Tutorial/guide needed
- [ ] Financial concept explanation needed
- [ ] API documentation needed
## 🤝 Contribution
**Are you willing to contribute to this feature?**
- [ ] Yes, I can implement this myself
- [ ] Yes, I can help with testing/documentation
- [ ] Yes, I can provide domain expertise
- [ ] I can help but need guidance
- [ ] I cannot contribute but would love to use it
**Your relevant experience:**
- Financial analysis background?
- Python development experience?
- MCP protocol familiarity?
- Other relevant skills?
## ✅ Pre-submission Checklist
- [ ] I have searched existing issues to avoid duplicates
- [ ] This feature aligns with educational/personal-use goals
- [ ] I have considered the implementation complexity
- [ ] I understand this won't provide financial advice or guarantees
- [ ] I have provided clear examples and use cases
## 🏷️ Feature Classification
**Priority:**
- [ ] Critical (blocking important use cases)
- [ ] High (significant improvement to user experience)
- [ ] Medium (nice to have, moderate impact)
- [ ] Low (minor enhancement)
**Effort Estimate:**
- [ ] Small (1-3 days)
- [ ] Medium (1-2 weeks)
- [ ] Large (1+ months)
- [ ] Unknown
**Release Timeline:**
- [ ] Next minor version
- [ ] Next major version
- [ ] Future consideration
- [ ] No preference
## 🌟 Additional Context
**Related issues or discussions:**
- Link to any related GitHub issues or discussions
- References to community conversations
**Financial domain considerations:**
- Any regulatory or compliance aspects?
- Specific financial methodologies or standards?
- Data provider requirements or limitations?
**Community impact:**
- How many users would benefit?
- Educational institutions that might use this?
- Open source projects that could leverage this?
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "maverick_mcp"
version = "0.1.0"
description = "Personal-use MCP server for Claude Desktop providing professional-grade stock analysis and technical indicators"
readme = "README.md"
requires-python = ">=3.12,<3.13"
dependencies = [
# Core MCP and server dependencies
"fastmcp>=2.7.0",
"mcp>=1.9.3",
"fastapi>=0.115.12",
"uvicorn>=0.35.0",
"gunicorn>=23.0.0",
"python-multipart>=0.0.20",
"aiofiles>=24.1.0",
"httpx>=0.28.1",
"python-dotenv>=1.0.1",
# LangChain and AI dependencies
"langchain>=0.3.25",
"langchain-anthropic>=0.3.15",
"langchain-community>=0.3.24",
"langchain-openai>=0.3.19",
"langchain-mcp-adapters>=0.1.6",
"langgraph>=0.4.8",
"langgraph-supervisor>=0.0.18",
"anthropic>=0.52.2",
"openai>=1.84.0",
"tiktoken>=0.6.0",
# Deep research dependencies
"exa-py>=1.0.19",
# Database and caching
"sqlalchemy>=2.0.40",
"alembic>=1.16.1",
"psycopg2-binary>=2.9.10",
"aiosqlite>=0.20.0",
"asyncpg>=0.30.0",
"greenlet>=3.0.0",
"redis>=6.2.0",
"hiredis>=3.2.1",
"msgpack>=1.0.7",
"certifi>=2024.2.2",
# Financial data and analysis (core)
"numpy>=1.26.4",
"pandas>=2.2.3",
"yfinance>=0.2.63",
"finvizfinance>=1.1.0",
"pandas-ta>=0.3.14b0",
"ta-lib>=0.6.3",
# Backtesting
"vectorbt>=0.26.0",
"numba>=0.60.0",
"scikit-learn>=1.6.1",
"scipy>=1.15.3",
"pytz>=2024.1",
# Security (basic cryptography for data security)
"cryptography>=42.0.0",
# System monitoring (basic)
"psutil>=6.0.0",
"sentry-sdk[fastapi]>=2.22.0",
# Prometheus metrics
"prometheus-client>=0.21.1",
# Trading
"fredapi>=0.5.2",
"pandas-datareader>=0.10.0",
"pandas-market-calendars>=5.1.0",
"tiingo>=0.16.1",
# Visualization (essential only)
"matplotlib>=3.10.3",
"plotly>=5.0.0",
"seaborn>=0.13.2",
"kaleido>=0.2.1", # Required for Plotly image export
# Development tools
"watchdog>=6.0.0",
"ty>=0.0.1a19",
"pytest>=8.4.0",
"pytest-asyncio>=1.1.0",
"pytest-cov>=6.2.1",
"vcrpy>=7.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.3.5",
"pytest-asyncio>=0.24.0",
"pytest-cov>=4.1.0",
"pytest-xdist>=3.6.0",
"testcontainers[postgres,redis]>=4.5.0",
"vcrpy>=6.0.1",
"aiosqlite>=0.20.0",
"greenlet>=3.0.0",
"asyncpg>=0.30.0",
"ruff>=0.11.10",
"bandit>=1.7.5",
"safety>=3.0.0",
"types-requests>=2.31.0",
"types-pytz>=2024.1.0",
"ty>=0.0.1a19",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
include = ["*.py"]
[tool.pytest.ini_options]
minversion = "8.0"
testpaths = ["tests"]
python_files = ["test_*.py", "*_test.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
# Markers for test categories
markers = [
"unit: marks tests as unit tests (deselect with '-m \"not unit\"')",
"integration: marks tests as integration tests (deselect with '-m \"not integration\"')",
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
"external: marks tests that require external APIs",
"database: marks tests that require database access",
"redis: marks tests that require Redis access",
]
# By default, skip integration, slow, and external tests
addopts = [
"-v",
"--strict-markers",
"--tb=short",
"-m", "not integration and not slow and not external",
"--durations=10", # Show 10 slowest tests
]
# Async configuration
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
[tool.ruff]
line-length = 88
target-version = "py312"
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
]
ignore = [
"E501", # line too long (handled by formatter)
"B008", # do not perform function calls in argument defaults
"B904", # raise without from inside except
"W191", # indentation contains tabs
]
[tool.ruff.lint.per-file-ignores]
"tests/*" = ["F403", "F405"] # star imports allowed in tests
[tool.ruff.format]
quote-style = "double"
indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"
[dependency-groups]
dev = [
"testcontainers[postgres]>=4.10.0",
]
```
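The default `addopts` above deliberately exclude `integration`, `slow`, and `external` tests. A minimal sketch of opting back in programmatically (equivalent to passing `-m integration` on the command line, where the later `-m` overrides the default marker expression):
```python
# Illustrative: run only the integration-marked tests.
import sys

import pytest

if __name__ == "__main__":
    sys.exit(pytest.main(["-m", "integration", "tests"]))
```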
--------------------------------------------------------------------------------
/scripts/test_seeded_data.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Test script to verify seeded data works with MCP tools.
This script tests the key MCP screening tools to ensure they return
results from the seeded database.
"""
import logging
import os
import sys
from pathlib import Path
# Add the project root to the Python path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# noqa: E402 - imports must come after sys.path modification
from sqlalchemy import create_engine # noqa: E402
from sqlalchemy.orm import sessionmaker # noqa: E402
from maverick_mcp.providers.stock_data import EnhancedStockDataProvider # noqa: E402
# Set up logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("test_seeded_data")
def test_screening_tools():
"""Test the main screening tools with seeded data."""
logger.info("Testing MCP screening tools with seeded data...")
# Set up database connection
database_url = os.getenv("DATABASE_URL") or "sqlite:///maverick_mcp.db"
engine = create_engine(database_url, echo=False)
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# Create provider
provider = EnhancedStockDataProvider(db_session=session)
# Test 1: Maverick recommendations (bullish)
logger.info("=== Testing Maverick Recommendations (Bullish) ===")
try:
maverick_results = provider.get_maverick_recommendations(limit=5)
logger.info(f"✅ Found {len(maverick_results)} Maverick recommendations")
for i, stock in enumerate(maverick_results[:3]):
logger.info(
f" {i + 1}. {stock['ticker']} - Score: {stock.get('combined_score', 'N/A')}"
)
except Exception as e:
logger.error(f"❌ Maverick recommendations failed: {e}")
# Test 2: Bear recommendations
logger.info("\n=== Testing Bear Recommendations ===")
try:
bear_results = provider.get_maverick_bear_recommendations(limit=5)
logger.info(f"✅ Found {len(bear_results)} Bear recommendations")
for i, stock in enumerate(bear_results[:3]):
logger.info(
f" {i + 1}. {stock['ticker']} - Score: {stock.get('score', 'N/A')}"
)
except Exception as e:
logger.error(f"❌ Bear recommendations failed: {e}")
# Test 3: Supply/Demand breakouts
logger.info("\n=== Testing Supply/Demand Breakouts ===")
try:
breakout_results = provider.get_supply_demand_breakout_recommendations(
limit=5
)
logger.info(f"✅ Found {len(breakout_results)} Supply/Demand breakouts")
for i, stock in enumerate(breakout_results[:3]):
logger.info(
f" {i + 1}. {stock['ticker']} - Score: {stock.get('momentum_score', 'N/A')}"
)
except Exception as e:
logger.error(f"❌ Supply/Demand breakouts failed: {e}")
# Test 4: Individual stock data
logger.info("\n=== Testing Individual Stock Data ===")
try:
# Test with AAPL (should have price data)
stock_data = provider.get_stock_data(
"AAPL", start_date="2025-08-01", end_date="2025-08-23"
)
logger.info(f"✅ AAPL price data: {len(stock_data)} records")
if not stock_data.empty:
latest = stock_data.iloc[-1]
logger.info(f" Latest: {latest.name} - Close: ${latest['close']:.2f}")
except Exception as e:
logger.error(f"❌ Individual stock data failed: {e}")
# Test 5: All screening recommendations
logger.info("\n=== Testing All Screening Recommendations ===")
try:
all_results = provider.get_all_screening_recommendations()
total = sum(len(stocks) for stocks in all_results.values())
logger.info(f"✅ Total screening results across all categories: {total}")
for category, stocks in all_results.items():
logger.info(f" {category}: {len(stocks)} stocks")
except Exception as e:
logger.error(f"❌ All screening recommendations failed: {e}")
logger.info("\n🎉 MCP screening tools test completed!")
if __name__ == "__main__":
test_screening_tools()
```
--------------------------------------------------------------------------------
/DATABASE_SETUP.md:
--------------------------------------------------------------------------------
```markdown
# MaverickMCP Database Setup
This guide explains how to set up and seed the SQLite database for MaverickMCP with sample stock data.
## Quick Start
### 1. Run Complete Setup (Recommended)
```bash
# Set your database URL (optional - defaults to SQLite)
export DATABASE_URL=sqlite:///maverick_mcp.db
# Run the complete setup script
./scripts/setup_database.sh
```
This will:
- ✅ Create SQLite database with all tables
- ✅ Seed with 40 sample stocks (AAPL, MSFT, GOOGL, etc.)
- ✅ Populate with 1,370+ price records
- ✅ Generate sample screening results (Maverick, Bear, Supply/Demand)
- ✅ Add technical indicators cache
### 2. Manual Step-by-Step Setup
```bash
# Step 1: Create database tables
python scripts/migrate_db.py
# Step 2: Seed with sample data (no API key required)
python scripts/seed_db.py
# Step 3: Test the setup
python scripts/test_seeded_data.py
```
## Database Configuration
### Default Configuration (SQLite)
- **Database**: `sqlite:///maverick_mcp.db`
- **Location**: Project root directory
- **No setup required**: Works out of the box
### PostgreSQL (Optional)
```bash
# Set environment variable
export DATABASE_URL=postgresql://localhost/maverick_mcp
# Create PostgreSQL database
createdb maverick_mcp
# Run migration
python scripts/migrate_db.py
```
## Sample Data Overview
### Stocks Included (40 total)
- **Large Cap**: AAPL, MSFT, GOOGL, AMZN, TSLA, NVDA, META, BRK-B, JNJ, V
- **Growth**: AMD, CRM, SHOP, ROKU, ZM, DOCU, SNOW, PLTR, RBLX, U
- **Value**: KO, PFE, XOM, CVX, JPM, BAC, WMT, PG, T, VZ
- **Small Cap**: UPST, SOFI, OPEN, WISH, CLOV, SPCE, LCID, RIVN, BYND, PTON
### Generated Data
- **1,370+ Price Records**: 200 days of historical data for 10 stocks
- **24 Maverick Stocks**: Bullish momentum recommendations
- **16 Bear Stocks**: Bearish setups with technical indicators
- **16 Supply/Demand Breakouts**: Accumulation breakout candidates
- **600 Technical Indicators**: RSI, SMA cache for analysis
## Testing MCP Tools
After seeding, test that the screening tools work:
```bash
python scripts/test_seeded_data.py
```
Expected output:
```
✅ Found 5 Maverick recommendations
1. PTON - Score: 100
2. BYND - Score: 100
3. RIVN - Score: 100
✅ Found 5 Bear recommendations
1. MSFT - Score: 37
2. JNJ - Score: 32
3. TSLA - Score: 32
✅ Total screening results across all categories: 56
```
## Using with Claude Desktop
After database setup, start the MCP server:
```bash
# Start the server
make dev
# Or manually
uvicorn maverick_mcp.api.server:app --host 0.0.0.0 --port 8003
```
Then connect with Claude Desktop using `mcp-remote`:
```json
{
"mcpServers": {
"maverick-mcp": {
"command": "npx",
"args": ["-y", "mcp-remote", "http://localhost:8003/mcp"]
}
}
}
```
Test with prompts like:
- "Show me the top maverick stock recommendations"
- "Get technical analysis for AAPL"
- "Find bearish stocks with high RSI"
## Database Schema
### Core Tables
- **mcp_stocks**: Stock symbols and company information
- **mcp_price_cache**: Historical OHLCV price data
- **mcp_technical_cache**: Calculated technical indicators
### Screening Tables
- **mcp_maverick_stocks**: Bullish momentum screening results
- **mcp_maverick_bear_stocks**: Bearish setup screening results
- **mcp_supply_demand_breakouts**: Breakout pattern screening results
## Troubleshooting
### Database Connection Issues
```bash
# Check database exists
ls -la maverick_mcp.db
# Test SQLite connection
sqlite3 maverick_mcp.db "SELECT COUNT(*) FROM mcp_stocks;"
```
### No Screening Results
```bash
# Verify data was seeded
sqlite3 maverick_mcp.db "
SELECT
(SELECT COUNT(*) FROM mcp_stocks) as stocks,
(SELECT COUNT(*) FROM mcp_price_cache) as prices,
(SELECT COUNT(*) FROM mcp_maverick_stocks) as maverick;
"
```
### MCP Server Connection
```bash
# Check server is running
curl http://localhost:8003/health
# Check MCP endpoint
curl http://localhost:8003/mcp/capabilities
```
## Advanced Configuration
### Environment Variables
```bash
# Database
DATABASE_URL=sqlite:///maverick_mcp.db
# Optional: Enable debug logging
LOG_LEVEL=debug
# Optional: Redis caching
REDIS_HOST=localhost
REDIS_PORT=6379
```
### Custom Stock Lists
Edit `scripts/seed_db.py` and modify `SAMPLE_STOCKS` to include your preferred stock symbols.
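For example, a minimal sketch (the exact shape of `SAMPLE_STOCKS` in `scripts/seed_db.py` may differ; treat this as illustrative):
```python
# scripts/seed_db.py (sketch; assumes SAMPLE_STOCKS is a flat list of ticker strings)
SAMPLE_STOCKS = [
    "AAPL", "MSFT", "NVDA",  # keep the defaults you want
    "COST", "ADBE", "NFLX",  # add your preferred symbols
]
```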
### Production Setup
- Use PostgreSQL for better performance
- Enable Redis caching
- Set up proper logging
- Configure rate limiting
---
✅ **Database ready!** Your MaverickMCP instance now has a complete SQLite database with sample stock data and screening results.
```
--------------------------------------------------------------------------------
/alembic/env.py:
--------------------------------------------------------------------------------
```python
"""
Alembic environment configuration for Maverick-MCP.
This file configures Alembic to work alongside the existing Django database,
managing only the tables that belong to Maverick-MCP (the mcp_ and stocks_
prefixes plus the legacy screening tables).
"""
import os
import sys
from logging.config import fileConfig
from pathlib import Path
from sqlalchemy import engine_from_config, pool
from alembic import context
# Add project root to Python path
sys.path.insert(0, str(Path(__file__).parent.parent))
# Import models
from maverick_mcp.data.models import Base as DataBase
# Use data models metadata (auth removed for personal version)
combined_metadata = DataBase.metadata
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Get database URL from environment or use default
DATABASE_URL = os.getenv(
"DATABASE_URL",
os.getenv("POSTGRES_URL", "postgresql://localhost/local_production_snapshot"),
)
# Override sqlalchemy.url in alembic.ini
config.set_main_option("sqlalchemy.url", DATABASE_URL)
# add your model's MetaData object here
# for 'autogenerate' support
# Use the combined metadata from both Base objects
target_metadata = combined_metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def include_object(object, name, type_, reflected, compare_to):
"""
Include only MCP-prefixed tables and stock-related tables.
This ensures Alembic only manages tables that belong to Maverick-MCP,
not Django tables.
"""
if type_ == "table":
# Include MCP tables and stock tables
return (
name.startswith("mcp_")
or name.startswith("stocks_")
or name
in ["maverick_stocks", "maverick_bear_stocks", "supply_demand_breakouts"]
)
elif type_ in [
"index",
"unique_constraint",
"foreign_key_constraint",
"check_constraint",
]:
# Include indexes and constraints for our tables
if hasattr(object, "table") and object.table is not None:
table_name = object.table.name
return (
table_name.startswith("mcp_")
or table_name.startswith("stocks_")
or table_name
in [
"maverick_stocks",
"maverick_bear_stocks",
"supply_demand_breakouts",
]
)
# For reflected objects, check the table name in the name
return any(
name.startswith(prefix)
for prefix in [
"idx_mcp_",
"uq_mcp_",
"fk_mcp_",
"ck_mcp_",
"idx_stocks_",
"uq_stocks_",
"fk_stocks_",
"ck_stocks_",
"ck_pricecache_",
"ck_maverick_",
"ck_supply_demand_",
]
)
return True
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
include_object=include_object,
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
include_object=include_object,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
```
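If you prefer driving migrations from Python rather than the `alembic` CLI, here is a minimal sketch using Alembic's command API (it assumes `alembic.ini` sits in the project root and that `env.py` above picks up `DATABASE_URL`):
```python
"""Sketch: run Alembic migrations programmatically (paths and env vars are assumptions)."""
import os

from alembic import command
from alembic.config import Config

# env.py above reads DATABASE_URL, falling back to POSTGRES_URL or a default
os.environ.setdefault("DATABASE_URL", "sqlite:///maverick_mcp.db")

cfg = Config("alembic.ini")
command.upgrade(cfg, "head")  # apply all pending migrations for the MCP-managed tables
```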
--------------------------------------------------------------------------------
/maverick_mcp/providers/implementations/market_data_adapter.py:
--------------------------------------------------------------------------------
```python
"""
Market data provider adapter.
This module provides adapters that make the existing MarketDataProvider
compatible with the new IMarketDataProvider interface.
"""
import asyncio
import logging
from typing import Any
from maverick_mcp.providers.interfaces.market_data import (
IMarketDataProvider,
MarketDataConfig,
)
from maverick_mcp.providers.market_data import MarketDataProvider
logger = logging.getLogger(__name__)
class MarketDataAdapter(IMarketDataProvider):
"""
Adapter that makes the existing MarketDataProvider compatible with IMarketDataProvider interface.
This adapter wraps the existing provider and exposes it through the new
interface contracts, enabling gradual migration to the new architecture.
"""
def __init__(self, config: MarketDataConfig | None = None):
"""
Initialize the market data adapter.
Args:
config: Market data configuration (optional)
"""
self._config = config
self._provider = MarketDataProvider()
logger.debug("MarketDataAdapter initialized")
async def get_market_summary(self) -> dict[str, Any]:
"""
Get a summary of major market indices (async wrapper).
Returns:
Dictionary with market index data including prices and changes
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._provider.get_market_summary)
async def get_top_gainers(self, limit: int = 10) -> list[dict[str, Any]]:
"""
Get top gaining stocks in the market (async wrapper).
Args:
limit: Maximum number of stocks to return
Returns:
List of dictionaries with stock data for top gainers
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._provider.get_top_gainers, limit)
async def get_top_losers(self, limit: int = 10) -> list[dict[str, Any]]:
"""
Get top losing stocks in the market (async wrapper).
Args:
limit: Maximum number of stocks to return
Returns:
List of dictionaries with stock data for top losers
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._provider.get_top_losers, limit)
async def get_most_active(self, limit: int = 10) -> list[dict[str, Any]]:
"""
Get most active stocks by volume (async wrapper).
Args:
limit: Maximum number of stocks to return
Returns:
List of dictionaries with stock data for most active stocks
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._provider.get_most_active, limit)
async def get_sector_performance(self) -> dict[str, float]:
"""
Get sector performance data (async wrapper).
Returns:
Dictionary mapping sector names to performance percentages
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._provider.get_sector_performance)
async def get_earnings_calendar(self, days: int = 7) -> list[dict[str, Any]]:
"""
Get upcoming earnings announcements (async wrapper).
Args:
days: Number of days to look ahead
Returns:
List of dictionaries with earnings announcement data
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, self._provider.get_earnings_calendar, days
)
async def get_market_overview(self) -> dict[str, Any]:
"""
Get comprehensive market overview (async wrapper).
Returns:
Dictionary with comprehensive market data including:
- market_summary: Index data
- top_gainers: Daily gainers
- top_losers: Daily losers
- sector_performance: Sector data
- timestamp: Data timestamp
"""
# Use the existing async method if available, otherwise wrap the sync version
if hasattr(self._provider, "get_market_overview_async"):
return await self._provider.get_market_overview_async()
else:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._provider.get_market_overview)
def get_sync_provider(self) -> MarketDataProvider:
"""
Get the underlying synchronous provider for backward compatibility.
Returns:
The wrapped MarketDataProvider instance
"""
return self._provider
```
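A minimal usage sketch for the adapter from async code (application wiring is assumed; the point is that each call offloads the corresponding synchronous provider method to an executor, as shown above):
```python
import asyncio

from maverick_mcp.providers.implementations.market_data_adapter import MarketDataAdapter


async def main() -> None:
    adapter = MarketDataAdapter()  # config is optional per __init__
    summary = await adapter.get_market_summary()   # sync provider call, run off the event loop
    gainers = await adapter.get_top_gainers(limit=5)
    print(summary)
    print(gainers)


if __name__ == "__main__":
    asyncio.run(main())
```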
--------------------------------------------------------------------------------
/maverick_mcp/api/connection_manager.py:
--------------------------------------------------------------------------------
```python
"""
MCP Connection Manager for persistent tool registration and session management.
"""
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime
logger = logging.getLogger(__name__)
@dataclass
class ConnectionSession:
"""Represents an active MCP connection session."""
session_id: str
client_info: str
connected_at: datetime = field(default_factory=datetime.now)
last_activity: datetime = field(default_factory=datetime.now)
tools_registered: bool = False
is_active: bool = True
class MCPConnectionManager:
"""
Manages MCP connection sessions and ensures persistent tool registration.
Fixes:
- Single connection initialization to prevent tool registration conflicts
- Session persistence to maintain tool availability across connection cycles
- Connection monitoring and cleanup
"""
def __init__(self):
self.sessions: dict[str, ConnectionSession] = {}
self.tools_initialized = False
self.startup_time = datetime.now()
self._lock = asyncio.Lock()
async def register_connection(
self, session_id: str, client_info: str = "unknown"
) -> ConnectionSession:
"""Register a new connection session."""
async with self._lock:
logger.info(
f"Registering new MCP connection: {session_id} from {client_info}"
)
# Clean up any existing session with same ID
if session_id in self.sessions:
await self.cleanup_session(session_id)
# Create new session
session = ConnectionSession(session_id=session_id, client_info=client_info)
self.sessions[session_id] = session
# Ensure tools are registered (only once globally)
if not self.tools_initialized:
logger.info("Initializing tools for first connection")
self.tools_initialized = True
session.tools_registered = True
else:
logger.info("Tools already initialized, reusing registration")
session.tools_registered = True
logger.info(
f"Connection registered successfully. Active sessions: {len(self.sessions)}"
)
return session
async def update_activity(self, session_id: str):
"""Update last activity timestamp for a session."""
if session_id in self.sessions:
self.sessions[session_id].last_activity = datetime.now()
async def cleanup_session(self, session_id: str):
"""Clean up a specific session."""
if session_id in self.sessions:
session = self.sessions[session_id]
session.is_active = False
logger.info(
f"Cleaning up session {session_id} (connected for {datetime.now() - session.connected_at})"
)
del self.sessions[session_id]
async def cleanup_stale_sessions(self, timeout_seconds: int = 300):
"""Clean up sessions that haven't been active recently."""
now = datetime.now()
stale_sessions = []
for session_id, session in self.sessions.items():
if (now - session.last_activity).total_seconds() > timeout_seconds:
stale_sessions.append(session_id)
for session_id in stale_sessions:
await self.cleanup_session(session_id)
def get_connection_status(self) -> dict:
"""Get current connection status for debugging."""
now = datetime.now()
return {
"active_sessions": len(self.sessions),
"tools_initialized": self.tools_initialized,
"server_uptime": str(now - self.startup_time),
"sessions": [
{
"session_id": session.session_id,
"client_info": session.client_info,
"connected_duration": str(now - session.connected_at),
"last_activity": str(now - session.last_activity),
"tools_registered": session.tools_registered,
"is_active": session.is_active,
}
for session in self.sessions.values()
],
}
async def ensure_tools_available(self) -> bool:
"""Ensure tools are available for connections."""
return self.tools_initialized and len(self.sessions) > 0
# Global connection manager instance
connection_manager = MCPConnectionManager()
async def get_connection_manager() -> MCPConnectionManager:
"""Get the global connection manager instance."""
return connection_manager
```
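A short sketch of how a transport layer might drive the manager (the session ID and client info here are illustrative; real values come from the MCP transport):
```python
import asyncio

from maverick_mcp.api.connection_manager import get_connection_manager


async def demo() -> None:
    manager = await get_connection_manager()
    session = await manager.register_connection("session-123", client_info="claude-desktop")
    await manager.update_activity(session.session_id)
    print(manager.get_connection_status())  # active sessions, uptime, tool registration state
    await manager.cleanup_stale_sessions(timeout_seconds=300)
    await manager.cleanup_session(session.session_id)


if __name__ == "__main__":
    asyncio.run(demo())
```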
--------------------------------------------------------------------------------
/maverick_mcp/config/plotly_config.py:
--------------------------------------------------------------------------------
```python
"""
Plotly configuration module for Maverick MCP.
This module configures Plotly defaults using the modern plotly.io.defaults API
to avoid deprecation warnings from the legacy kaleido.scope API.
"""
import logging
import warnings
from typing import Any
try:
import plotly.io as pio
PLOTLY_AVAILABLE = True
except ImportError:
PLOTLY_AVAILABLE = False
logger = logging.getLogger(__name__)
def configure_plotly_defaults() -> None:
"""
Configure Plotly defaults using the modern plotly.io.defaults API.
This replaces the deprecated plotly.io.kaleido.scope configuration
and helps reduce deprecation warnings.
"""
if not PLOTLY_AVAILABLE:
logger.warning("Plotly not available, skipping configuration")
return
try:
# Configure modern Plotly defaults (replaces kaleido.scope configuration)
pio.defaults.default_format = "png"
pio.defaults.default_width = 800
pio.defaults.default_height = 600
pio.defaults.default_scale = 1.0
# Configure additional defaults that don't trigger deprecation warnings
if hasattr(pio.defaults, "mathjax"):
pio.defaults.mathjax = None
if hasattr(pio.defaults, "plotlyjs"):
pio.defaults.plotlyjs = "auto"
# Note: We avoid setting kaleido.scope properties directly to prevent warnings
# The modern pio.defaults API should be used instead
logger.info("✓ Plotly defaults configured successfully")
except Exception as e:
logger.error(f"Error configuring Plotly defaults: {e}")
def suppress_plotly_warnings() -> None:
"""
Suppress specific Plotly/Kaleido deprecation warnings.
These warnings come from the library internals and can't be fixed
at the user code level until the libraries are updated.
"""
try:
# Comprehensive suppression of all kaleido-related deprecation warnings
deprecation_patterns = [
r".*plotly\.io\.kaleido\.scope\..*is deprecated.*",
r".*Use of plotly\.io\.kaleido\.scope\..*is deprecated.*",
r".*default_format.*deprecated.*",
r".*default_width.*deprecated.*",
r".*default_height.*deprecated.*",
r".*default_scale.*deprecated.*",
r".*mathjax.*deprecated.*",
r".*plotlyjs.*deprecated.*",
]
for pattern in deprecation_patterns:
warnings.filterwarnings(
"ignore",
category=DeprecationWarning,
message=pattern,
)
# Also suppress by module
warnings.filterwarnings(
"ignore",
category=DeprecationWarning,
module=r".*kaleido.*",
)
warnings.filterwarnings(
"ignore",
category=DeprecationWarning,
module=r"plotly\.io\._kaleido",
)
logger.debug("✓ Plotly deprecation warnings suppressed")
except Exception as e:
logger.error(f"Error suppressing Plotly warnings: {e}")
def setup_plotly() -> None:
"""
Complete Plotly setup with modern configuration and warning suppression.
This function should be called once during application initialization.
"""
if not PLOTLY_AVAILABLE:
logger.warning("Plotly not available, skipping setup")
return
# First suppress warnings to avoid noise during configuration
suppress_plotly_warnings()
# Then configure with modern API
configure_plotly_defaults()
logger.info("✓ Plotly setup completed")
def get_plotly_config() -> dict[str, Any]:
"""
Get current Plotly configuration for debugging.
Returns:
Dictionary with current Plotly configuration settings
"""
if not PLOTLY_AVAILABLE:
return {"error": "Plotly not available"}
config = {}
try:
# Modern defaults
config["defaults"] = {
"default_format": getattr(pio.defaults, "default_format", "unknown"),
"default_width": getattr(pio.defaults, "default_width", "unknown"),
"default_height": getattr(pio.defaults, "default_height", "unknown"),
"default_scale": getattr(pio.defaults, "default_scale", "unknown"),
}
# Kaleido scope (if available)
if hasattr(pio, "kaleido") and hasattr(pio.kaleido, "scope"):
scope = pio.kaleido.scope
config["kaleido_scope"] = {
"mathjax": getattr(scope, "mathjax", "unknown"),
"plotlyjs": getattr(scope, "plotlyjs", "unknown"),
"configured": getattr(scope, "_configured", False),
}
except Exception as e:
config["error"] = f"Error getting config: {e}"
return config
```
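Typical wiring is a single `setup_plotly()` call at startup, with `get_plotly_config()` available for debugging; a sketch (where you call it depends on your entry point):
```python
from maverick_mcp.config.plotly_config import get_plotly_config, setup_plotly

# Call once during application initialization, before any chart export.
setup_plotly()

# Later, when debugging image-export issues:
print(get_plotly_config())
```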
--------------------------------------------------------------------------------
/maverick_mcp/api/routers/technical_ddd.py:
--------------------------------------------------------------------------------
```python
"""
Technical analysis router with Domain-Driven Design.
This is the refactored version that delegates all business logic
to the domain and application layers.
"""
from typing import Any
from fastmcp import FastMCP
from maverick_mcp.api.dependencies.technical_analysis import (
get_technical_analysis_query,
)
from maverick_mcp.utils.logging import get_logger
logger = get_logger("maverick_mcp.routers.technical_ddd")
# Create the technical analysis router
technical_ddd_router: FastMCP = FastMCP("Technical_Analysis_DDD")
async def get_technical_analysis_ddd(
ticker: str,
days: int = 365,
) -> dict[str, Any]:
"""
Get comprehensive technical analysis for a stock using Domain-Driven Design.
This is a thin controller that delegates all business logic to the
application and domain layers, following DDD principles.
Args:
ticker: Stock ticker symbol
days: Number of days of historical data (default: 365)
Returns:
Complete technical analysis with all indicators
"""
try:
# Get the query handler through dependency injection
query = get_technical_analysis_query()
# Execute the query - all business logic is in the domain/application layers
analysis_dto = await query.execute(symbol=ticker, days=days)
# Convert DTO to dict for MCP response
return {
"ticker": ticker,
"analysis": analysis_dto.model_dump(),
"status": "success",
}
except ValueError as e:
logger.warning(f"Invalid input for {ticker}: {str(e)}")
return {
"ticker": ticker,
"error": str(e),
"status": "invalid_input",
}
except Exception as e:
logger.error(f"Error analyzing {ticker}: {str(e)}", exc_info=True)
return {
"ticker": ticker,
"error": "Technical analysis failed",
"status": "error",
}
async def get_rsi_analysis_ddd(
ticker: str,
period: int = 14,
days: int = 365,
) -> dict[str, Any]:
"""
Get RSI analysis using Domain-Driven Design approach.
Args:
ticker: Stock ticker symbol
period: RSI period (default: 14)
days: Number of days of historical data (default: 365)
Returns:
RSI analysis results
"""
try:
# Get query handler
query = get_technical_analysis_query()
# Execute query for RSI only
analysis_dto = await query.execute(
symbol=ticker,
days=days,
indicators=["rsi"],
rsi_period=period,
)
if not analysis_dto.rsi:
return {
"ticker": ticker,
"error": "RSI calculation failed",
"status": "error",
}
return {
"ticker": ticker,
"period": period,
"analysis": analysis_dto.rsi.model_dump(),
"status": "success",
}
except Exception as e:
logger.error(f"Error in RSI analysis for {ticker}: {str(e)}")
return {
"ticker": ticker,
"error": str(e),
"status": "error",
}
async def get_support_resistance_ddd(
ticker: str,
days: int = 365,
) -> dict[str, Any]:
"""
Get support and resistance levels using DDD approach.
Args:
ticker: Stock ticker symbol
days: Number of days of historical data (default: 365)
Returns:
Support and resistance levels
"""
try:
# Get query handler
query = get_technical_analysis_query()
# Execute query
analysis_dto = await query.execute(
symbol=ticker,
days=days,
indicators=[], # No indicators needed, just levels
)
return {
"ticker": ticker,
"current_price": analysis_dto.current_price,
"support_levels": [
{
"price": level.price,
"strength": level.strength,
"distance": level.distance_from_current,
}
for level in analysis_dto.support_levels
],
"resistance_levels": [
{
"price": level.price,
"strength": level.strength,
"distance": level.distance_from_current,
}
for level in analysis_dto.resistance_levels
],
"status": "success",
}
except Exception as e:
logger.error(f"Error in support/resistance analysis for {ticker}: {str(e)}")
return {
"ticker": ticker,
"error": str(e),
"status": "error",
}
```
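A sketch of calling the DDD controller directly (this assumes the dependency wiring behind `get_technical_analysis_query` and the underlying data providers are available in your environment; failures come back as structured dicts rather than exceptions):
```python
import asyncio

from maverick_mcp.api.routers.technical_ddd import get_technical_analysis_ddd

result = asyncio.run(get_technical_analysis_ddd("AAPL", days=180))
print(result["status"])   # "success", "invalid_input", or "error"
print(sorted(result.keys()))
```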
--------------------------------------------------------------------------------
/maverick_mcp/api/middleware/error_handling.py:
--------------------------------------------------------------------------------
```python
"""
Error handling middleware for FastAPI applications.
This middleware provides centralized error handling, logging, and monitoring
integration for all unhandled exceptions in the API.
"""
import time
import uuid
from collections.abc import Callable
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from maverick_mcp.api.error_handling import handle_api_error
from maverick_mcp.utils.logging import get_logger
from maverick_mcp.utils.monitoring import get_monitoring_service
logger = get_logger(__name__)
monitoring = get_monitoring_service()
class ErrorHandlingMiddleware(BaseHTTPMiddleware):
"""
Middleware to catch and handle all unhandled exceptions.
This middleware:
1. Catches any unhandled exceptions from route handlers
2. Logs errors with full context
3. Sends errors to monitoring (Sentry)
4. Returns structured error responses to clients
5. Adds request IDs for tracing
"""
async def dispatch(self, request: Request, call_next: Callable) -> Response:
"""Process the request and handle any exceptions."""
# Generate request ID
request_id = str(uuid.uuid4())
request.state.request_id = request_id
# Add request ID to response headers
start_time = time.time()
try:
# Add breadcrumb for monitoring
monitoring.add_breadcrumb(
message=f"{request.method} {request.url.path}",
category="request",
level="info",
data={
"request_id": request_id,
"method": request.method,
"path": request.url.path,
"query": str(request.url.query),
},
)
# Process the request
response = await call_next(request)
# Add request ID to response headers
response.headers["X-Request-ID"] = request_id
# Log successful request
duration = time.time() - start_time
logger.info(
f"{request.method} {request.url.path} completed",
extra={
"request_id": request_id,
"status_code": response.status_code,
"duration": duration,
},
)
return response
except Exception as exc:
# Calculate request duration
duration = time.time() - start_time
# Log the error
logger.error(
f"Unhandled exception in {request.method} {request.url.path}",
exc_info=True,
extra={
"request_id": request_id,
"duration": duration,
"error_type": type(exc).__name__,
},
)
# Handle the error and get structured response
error_response = handle_api_error(
request,
exc,
context={
"request_id": request_id,
"duration": duration,
},
)
# Add request ID to error response
error_response.headers["X-Request-ID"] = request_id
return error_response
class RequestTracingMiddleware(BaseHTTPMiddleware):
"""
Middleware to add request tracing information.
This middleware adds:
1. Request IDs to all requests
2. User context for authenticated requests
3. Performance tracking
"""
async def dispatch(self, request: Request, call_next: Callable) -> Response:
"""Add tracing context to requests."""
# Check if request ID already exists (from error handling middleware)
if not hasattr(request.state, "request_id"):
request.state.request_id = str(uuid.uuid4())
# Extract user context if available
user_id = None
if hasattr(request.state, "user"):
user_id = getattr(request.state.user, "id", None)
monitoring.set_user_context(user_id)
# Add monitoring context
monitoring.add_breadcrumb(
message="Request context",
category="request",
data={
"request_id": request.state.request_id,
"user_id": user_id,
"path": request.url.path,
},
)
# Process request with monitoring transaction
with monitoring.transaction(
name=f"{request.method} {request.url.path}", op="http.server"
):
response = await call_next(request)
# Clear user context after request
if user_id:
monitoring.set_user_context(None)
return response
```
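A sketch of registering both middlewares on a FastAPI app (the `app` object here is an assumption; note that Starlette treats the last-added middleware as the outermost layer):
```python
from fastapi import FastAPI

from maverick_mcp.api.middleware.error_handling import (
    ErrorHandlingMiddleware,
    RequestTracingMiddleware,
)

app = FastAPI()
app.add_middleware(RequestTracingMiddleware)
# Added last, so it runs outermost: it sets request.state.request_id and catches
# anything RequestTracingMiddleware or the route handlers raise.
app.add_middleware(ErrorHandlingMiddleware)
```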
--------------------------------------------------------------------------------
/maverick_mcp/providers/interfaces/market_data.py:
--------------------------------------------------------------------------------
```python
"""
Market data provider interface.
This module defines the abstract interface for market-wide data operations,
including market indices, gainers/losers, sector performance, and earnings calendar.
"""
from typing import Any, Protocol, runtime_checkable
@runtime_checkable
class IMarketDataProvider(Protocol):
"""
Interface for market-wide data operations.
This interface defines the contract for retrieving market overview data,
including indices, top movers, sector performance, and earnings calendar.
"""
async def get_market_summary(self) -> dict[str, Any]:
"""
Get a summary of major market indices.
Returns:
Dictionary with market index data including prices and changes
"""
...
async def get_top_gainers(self, limit: int = 10) -> list[dict[str, Any]]:
"""
Get top gaining stocks in the market.
Args:
limit: Maximum number of stocks to return
Returns:
List of dictionaries with stock data for top gainers
"""
...
async def get_top_losers(self, limit: int = 10) -> list[dict[str, Any]]:
"""
Get top losing stocks in the market.
Args:
limit: Maximum number of stocks to return
Returns:
List of dictionaries with stock data for top losers
"""
...
async def get_most_active(self, limit: int = 10) -> list[dict[str, Any]]:
"""
Get most active stocks by volume.
Args:
limit: Maximum number of stocks to return
Returns:
List of dictionaries with stock data for most active stocks
"""
...
async def get_sector_performance(self) -> dict[str, float]:
"""
Get sector performance data.
Returns:
Dictionary mapping sector names to performance percentages
"""
...
async def get_earnings_calendar(self, days: int = 7) -> list[dict[str, Any]]:
"""
Get upcoming earnings announcements.
Args:
days: Number of days to look ahead
Returns:
List of dictionaries with earnings announcement data
"""
...
async def get_market_overview(self) -> dict[str, Any]:
"""
Get comprehensive market overview including summary, gainers, losers, and sectors.
Returns:
Dictionary with comprehensive market data including:
- market_summary: Index data
- top_gainers: Daily gainers
- top_losers: Daily losers
- sector_performance: Sector data
- timestamp: Data timestamp
"""
...
class MarketDataConfig:
"""
Configuration class for market data providers.
This class encapsulates market data-related configuration parameters
to reduce coupling between providers and configuration sources.
"""
def __init__(
self,
external_api_key: str = "",
tiingo_api_key: str = "",
request_timeout: int = 30,
max_retries: int = 3,
rate_limit_delay: float = 0.1,
default_limit: int = 10,
use_fallback_providers: bool = True,
):
"""
Initialize market data configuration.
Args:
external_api_key: API key for External API service
tiingo_api_key: API key for Tiingo service
request_timeout: Request timeout in seconds
max_retries: Maximum number of retry attempts
rate_limit_delay: Delay between requests in seconds
default_limit: Default number of results to return
use_fallback_providers: Whether to use fallback data sources
"""
self.external_api_key = external_api_key
self.tiingo_api_key = tiingo_api_key
self.request_timeout = request_timeout
self.max_retries = max_retries
self.rate_limit_delay = rate_limit_delay
self.default_limit = default_limit
self.use_fallback_providers = use_fallback_providers
@property
def has_external_api_key(self) -> bool:
"""Check if External API key is configured."""
return bool(self.external_api_key.strip())
@property
def has_tiingo_key(self) -> bool:
"""Check if Tiingo API key is configured."""
return bool(self.tiingo_api_key.strip())
# Market data constants that can be used by implementations
MARKET_INDICES = {
"^GSPC": "S&P 500",
"^DJI": "Dow Jones",
"^IXIC": "NASDAQ",
"^RUT": "Russell 2000",
"^VIX": "VIX",
"^TNX": "10Y Treasury",
}
SECTOR_ETFS = {
"Technology": "XLK",
"Healthcare": "XLV",
"Financials": "XLF",
"Consumer Discretionary": "XLY",
"Industrials": "XLI",
"Energy": "XLE",
"Utilities": "XLU",
"Materials": "XLB",
"Consumer Staples": "XLP",
"Real Estate": "XLRE",
"Communication Services": "XLC",
}
```
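Because `IMarketDataProvider` is a `runtime_checkable` Protocol, conformance is structural and can be checked at runtime; a throwaway stub illustrates this (note that `isinstance` only verifies the method names exist, not their signatures or return types):
```python
from typing import Any

from maverick_mcp.providers.interfaces.market_data import IMarketDataProvider


class StubMarketData:
    """Minimal stub used only to illustrate the structural check."""

    async def get_market_summary(self) -> dict[str, Any]:
        return {}

    async def get_top_gainers(self, limit: int = 10) -> list[dict[str, Any]]:
        return []

    async def get_top_losers(self, limit: int = 10) -> list[dict[str, Any]]:
        return []

    async def get_most_active(self, limit: int = 10) -> list[dict[str, Any]]:
        return []

    async def get_sector_performance(self) -> dict[str, float]:
        return {}

    async def get_earnings_calendar(self, days: int = 7) -> list[dict[str, Any]]:
        return []

    async def get_market_overview(self) -> dict[str, Any]:
        return {}


assert isinstance(StubMarketData(), IMarketDataProvider)  # structural check only
```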
--------------------------------------------------------------------------------
/maverick_mcp/api/routers/health.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive health check router for backtesting system.
Provides detailed health monitoring including:
- Component status (database, cache, external APIs)
- Circuit breaker monitoring
- Resource utilization
- Readiness and liveness probes
- Performance metrics
"""
import logging
from datetime import UTC, datetime
from fastapi import APIRouter
from pydantic import BaseModel, Field
from maverick_mcp.config.settings import get_settings
from maverick_mcp.utils.circuit_breaker import get_circuit_breaker_status
logger = logging.getLogger(__name__)
settings = get_settings()
router = APIRouter(prefix="/health", tags=["Health"])
class CircuitBreakerStatus(BaseModel):
"""Circuit breaker status information."""
name: str = Field(description="Circuit breaker name")
state: str = Field(description="Current state (closed/open/half_open)")
failure_count: int = Field(description="Current consecutive failure count")
time_until_retry: float | None = Field(description="Seconds until retry allowed")
metrics: dict = Field(description="Performance metrics")
class HealthStatus(BaseModel):
"""Overall health status."""
status: str = Field(description="Overall health status")
timestamp: str = Field(description="Current timestamp")
version: str = Field(description="Application version")
circuit_breakers: dict[str, CircuitBreakerStatus] = Field(
description="Circuit breaker statuses"
)
services: dict[str, str] = Field(description="External service statuses")
@router.get("/", response_model=HealthStatus)
async def health_check() -> HealthStatus:
"""
Get comprehensive health status including circuit breakers.
Returns:
HealthStatus: Current health information
"""
# Get circuit breaker status
cb_status = get_circuit_breaker_status()
# Convert to response models
circuit_breakers = {}
for name, status in cb_status.items():
circuit_breakers[name] = CircuitBreakerStatus(
name=status["name"],
state=status["state"],
failure_count=status["consecutive_failures"],
time_until_retry=status["time_until_retry"],
metrics=status["metrics"],
)
# Determine overall health
any_open = any(cb["state"] == "open" for cb in cb_status.values())
overall_status = "degraded" if any_open else "healthy"
# Check service statuses based on circuit breakers
services = {
"yfinance": "down"
if cb_status.get("yfinance", {}).get("state") == "open"
else "up",
"finviz": "down"
if cb_status.get("finviz", {}).get("state") == "open"
else "up",
"fred_api": "down"
if cb_status.get("fred_api", {}).get("state") == "open"
else "up",
"external_api": "down"
if cb_status.get("external_api", {}).get("state") == "open"
else "up",
"news_api": "down"
if cb_status.get("news_api", {}).get("state") == "open"
else "up",
}
return HealthStatus(
status=overall_status,
timestamp=datetime.now(UTC).isoformat(),
version=getattr(settings, "version", "0.1.0"),
circuit_breakers=circuit_breakers,
services=services,
)
@router.get("/circuit-breakers", response_model=dict[str, CircuitBreakerStatus])
async def get_circuit_breakers() -> dict[str, CircuitBreakerStatus]:
"""
Get detailed circuit breaker status.
Returns:
Dictionary of circuit breaker statuses
"""
cb_status = get_circuit_breaker_status()
result = {}
for name, status in cb_status.items():
result[name] = CircuitBreakerStatus(
name=status["name"],
state=status["state"],
failure_count=status["consecutive_failures"],
time_until_retry=status["time_until_retry"],
metrics=status["metrics"],
)
return result
@router.post("/circuit-breakers/{name}/reset")
async def reset_circuit_breaker(name: str) -> dict:
"""
Reset a specific circuit breaker.
Args:
name: Circuit breaker name
Returns:
Success response
"""
from maverick_mcp.utils.circuit_breaker import get_circuit_breaker
breaker = get_circuit_breaker(name)
if not breaker:
return {"status": "error", "message": f"Circuit breaker '{name}' not found"}
breaker.reset()
logger.info(f"Circuit breaker '{name}' reset via API")
return {"status": "success", "message": f"Circuit breaker '{name}' reset"}
@router.post("/circuit-breakers/reset-all")
async def reset_all_circuit_breakers() -> dict:
"""
Reset all circuit breakers.
Returns:
Success response
"""
from maverick_mcp.utils.circuit_breaker import reset_all_circuit_breakers
reset_all_circuit_breakers()
logger.info("All circuit breakers reset via API")
return {"status": "success", "message": "All circuit breakers reset"}
```
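Mounting the router on a FastAPI app is a one-liner; a sketch (the real server wiring lives elsewhere in the project):
```python
from fastapi import FastAPI

from maverick_mcp.api.routers.health import router as health_router

app = FastAPI()
app.include_router(health_router)  # GET /health/, GET /health/circuit-breakers, POST resets
```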
--------------------------------------------------------------------------------
/maverick_mcp/tests/test_server.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for the Maverick-MCP server.
"""
import json
import os
import subprocess
import time
import unittest
from typing import Any
import pytest
import requests
# Constants
SERVER_URL = "http://localhost:8000"
SERVER_START_TIMEOUT = 10 # seconds
@pytest.mark.integration
class TestMaverickMCPServer(unittest.TestCase):
"""Integration tests for the Maverick-MCP server."""
process: subprocess.Popen[bytes] | None = None
@classmethod
def setUpClass(cls):
"""Start the server before running tests."""
# Skip server startup if USE_RUNNING_SERVER environment variable is set
cls.process = None
if os.environ.get("USE_RUNNING_SERVER") != "1":
print("Starting Maverick-MCP server...")
# Start the server as a subprocess
cls.process = subprocess.Popen(
["python", "-m", "maverick_mcp.api.server"],
# Redirect stdout and stderr to prevent output in test results
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# Wait for the server to start
start_time = time.time()
while time.time() - start_time < SERVER_START_TIMEOUT:
try:
response = requests.get(f"{SERVER_URL}/health")
if response.status_code == 200:
print("Server started successfully")
break
except requests.exceptions.ConnectionError:
pass
time.sleep(0.5)
else:
# If the server didn't start within the timeout, kill it and fail
cls.tearDownClass()
raise TimeoutError("Server did not start within the timeout period")
@classmethod
def tearDownClass(cls):
"""Stop the server after tests are done."""
if cls.process:
print("Stopping Maverick-MCP server...")
# Send SIGTERM signal to the process
cls.process.terminate()
try:
# Wait for the process to terminate
cls.process.wait(timeout=5)
except subprocess.TimeoutExpired:
# If the process doesn't terminate within 5 seconds, kill it
cls.process.kill()
cls.process.wait()
def test_health_endpoint(self):
"""Test the health endpoint."""
response = requests.get(f"{SERVER_URL}/health")
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertEqual(data["status"], "ok")
# Version should be present
self.assertIn("version", data)
def test_mcp_endpoint(self):
"""Test the MCP endpoint."""
# This is a simple request to test if the MCP endpoint is responding
sse_url = f"{SERVER_URL}/sse"
response = requests.get(sse_url)
# Just check that the endpoint exists and responds with success
self.assertIn(
response.status_code, [200, 405]
) # 200 OK or 405 Method Not Allowed
def send_mcp_request(self, method: str, params: list[Any]) -> dict[str, Any]:
"""
Send a request to the MCP server.
Args:
method: The method name
params: The parameters for the method
Returns:
The response from the server
"""
request_body = {"jsonrpc": "2.0", "id": 1, "method": method, "params": params}
response = requests.post(
f"{SERVER_URL}/messages/",
json=request_body,
headers={"Content-Type": "application/json"},
)
# Check that the request was successful
self.assertEqual(response.status_code, 200)
# Parse the response
data = response.json()
# Check that the response is valid JSON-RPC
self.assertEqual(data["jsonrpc"], "2.0")
self.assertEqual(data["id"], 1)
return data # type: ignore[no-any-return]
def test_fetch_stock_data(self):
"""Test the fetch_stock_data tool."""
# Send a request to fetch stock data for a known symbol (AAPL)
response_data = self.send_mcp_request("fetch_stock_data", ["AAPL"])
# Check that the result is present and contains stock data
self.assertIn("result", response_data)
result = response_data["result"]
# Parse the result as JSON
stock_data = json.loads(result)
# Check that the stock data contains the expected fields
self.assertIn("index", stock_data)
self.assertIn("columns", stock_data)
self.assertIn("data", stock_data)
# Check that the columns include OHLCV
for column in ["open", "high", "low", "close", "volume"]:
self.assertIn(
column.lower(), [col.lower() for col in stock_data["columns"]]
)
# Run the tests if this script is executed directly
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/examples/timeout_fix_demonstration.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Demonstration of Search Provider Timeout Fixes
This script shows how the timeout issues identified by the debugger subagent have been resolved:
BEFORE (Issues):
- Complex queries failed at exactly 10 seconds
- Circuit breakers were too aggressive (5 failures = disabled)
- No distinction between timeout and other failure types
- Budget allocation wasn't optimal
AFTER (Fixed):
- Complex queries get up to 25 seconds
- Circuit breakers are more tolerant (8 failures, faster recovery)
- Timeout failures have separate, higher threshold (12 vs 6)
- Better budget allocation with minimum timeout protection
"""
import sys
from pathlib import Path
from maverick_mcp.agents.deep_research import WebSearchProvider
from maverick_mcp.config.settings import get_settings
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
def demonstrate_timeout_improvements():
"""Show the specific improvements made to resolve timeout issues."""
print("🐛 SEARCH PROVIDER TIMEOUT FIXES")
print("=" * 50)
# Create test provider to demonstrate calculations
class DemoProvider(WebSearchProvider):
async def search(self, query, num_results=10, timeout_budget=None):
return []
provider = DemoProvider(api_key="demo")
settings = get_settings()
# The problematic query from the debugger report
complex_query = "Google Microsoft OpenAI AI services competition revenue market share 2024 2025 growth forecast Claude Gemini GPT"
print("🔍 COMPLEX QUERY EXAMPLE:")
print(f" Query: {complex_query}")
print(f" Words: {len(complex_query.split())}")
# Show timeout calculation
timeout = provider._calculate_timeout(complex_query)
print(f" ✅ NEW Timeout: {timeout:.1f}s (was 10s → now 25s)")
# Show budget scenarios
tight_budget_timeout = provider._calculate_timeout(
complex_query, timeout_budget=15.0
)
good_budget_timeout = provider._calculate_timeout(
complex_query, timeout_budget=50.0
)
print(f" ✅ With 15s budget: {tight_budget_timeout:.1f}s (min 8s protection)")
print(f" ✅ With 50s budget: {good_budget_timeout:.1f}s (full 25s)")
print("\n📊 FAILURE TOLERANCE IMPROVEMENTS:")
# Show tolerance thresholds
timeout_threshold = getattr(
settings.performance, "search_timeout_failure_threshold", 12
)
circuit_threshold = getattr(
settings.performance, "search_circuit_breaker_failure_threshold", 8
)
circuit_recovery = getattr(
settings.performance, "search_circuit_breaker_recovery_timeout", 30
)
print(f" ✅ Timeout failures before disable: {timeout_threshold} (was 3)")
print(f" ✅ Circuit breaker threshold: {circuit_threshold} (was 5)")
print(f" ✅ Circuit breaker recovery: {circuit_recovery}s (was 60s)")
print("\n🎯 KEY FIXES SUMMARY:")
print(" ✅ Complex queries (9+ words): 25s timeout instead of 10s")
print(" ✅ Medium queries (4-8 words): 17s timeout instead of 10s")
print(" ✅ Minimum timeout protection: Never below 8s for complex queries")
print(" ✅ Budget efficiency: 85% allocation (was 80%)")
print(" ✅ Timeout-specific tolerance: 12 failures (was 3)")
print(" ✅ Search circuit breakers: 8 failures, 30s recovery")
print("\n🔬 TECHNICAL DETAILS:")
print(" • Timeout calculation is adaptive based on query complexity")
print(" • Budget constraints respect minimum timeout requirements")
print(" • Separate failure tracking for timeout vs other errors")
print(" • Circuit breakers tuned specifically for search operations")
print(" • Enhanced debug logging for troubleshooting")
def show_before_after_comparison():
"""Show specific before/after comparisons for the identified issues."""
print("\n📋 BEFORE vs AFTER COMPARISON")
print("=" * 50)
test_cases = [
("AAPL", "Simple 1-word query"),
("Google Microsoft OpenAI competition", "Medium 4-word query"),
(
"Google Microsoft OpenAI AI services competition revenue market share 2024 2025 growth forecast",
"Complex 13-word query",
),
]
for query, description in test_cases:
words = len(query.split())
# Calculate OLD timeout (all queries got 10s)
old_timeout = 10.0
# Calculate NEW timeout
provider = WebSearchProvider(api_key="demo")
new_timeout = provider._calculate_timeout(query)
improvement = "🟰" if old_timeout == new_timeout else "📈"
print(f" {improvement} {description} ({words} words):")
print(f" BEFORE: {old_timeout:.1f}s | AFTER: {new_timeout:.1f}s")
if __name__ == "__main__":
demonstrate_timeout_improvements()
show_before_after_comparison()
print("\n✅ The search provider timeout issues have been fully resolved!")
print(
" Complex queries like the 15-word example will now get 25s instead of failing at 10s."
)
```
--------------------------------------------------------------------------------
/maverick_mcp/validation/data.py:
--------------------------------------------------------------------------------
```python
"""
Validation models for data-related tools.
This module provides Pydantic models for validating inputs
to all data fetching and caching tools.
"""
from pydantic import Field, field_validator, model_validator
from .base import (
DateRangeMixin,
DateString,
DateValidator,
StrictBaseModel,
TickerSymbol,
TickerValidator,
)
class FetchStockDataRequest(StrictBaseModel, DateRangeMixin):
"""Validation for fetch_stock_data tool."""
ticker: TickerSymbol = Field(
...,
description="Stock ticker symbol (e.g., AAPL, MSFT)",
json_schema_extra={"examples": ["AAPL", "MSFT", "GOOGL"]},
)
@field_validator("ticker")
@classmethod
def normalize_ticker(cls, v: str) -> str:
"""Normalize ticker to uppercase."""
return TickerValidator.validate_ticker(v)
model_config = {
"json_schema_extra": {
"examples": [
{
"ticker": "AAPL",
"start_date": "2024-01-01",
"end_date": "2024-12-31",
},
{"ticker": "MSFT"},
]
}
}
class StockDataBatchRequest(StrictBaseModel, DateRangeMixin):
"""Validation for fetch_stock_data_batch tool."""
tickers: list[TickerSymbol] = Field(
...,
min_length=1,
max_length=50,
description="List of ticker symbols (max 50)",
json_schema_extra={"examples": [["AAPL", "MSFT", "GOOGL"]]},
)
@field_validator("tickers")
@classmethod
def validate_tickers(cls, v: list[str]) -> list[str]:
"""Validate and normalize ticker list."""
return TickerValidator.validate_ticker_list(v)
model_config = {
"json_schema_extra": {
"examples": [
{"tickers": ["AAPL", "MSFT", "GOOGL"], "start_date": "2024-01-01"},
{
"tickers": ["SPY", "QQQ", "IWM"],
"start_date": "2024-06-01",
"end_date": "2024-12-31",
},
]
}
}
class GetStockInfoRequest(StrictBaseModel):
"""Validation for get_stock_info tool."""
ticker: TickerSymbol = Field(
..., description="Stock ticker symbol", json_schema_extra={"examples": ["AAPL"]}
)
@field_validator("ticker")
@classmethod
def normalize_ticker(cls, v: str) -> str:
"""Normalize ticker to uppercase."""
return TickerValidator.validate_ticker(v)
class GetNewsRequest(StrictBaseModel):
"""Validation for get_news_sentiment tool."""
ticker: TickerSymbol = Field(
..., description="Stock ticker symbol", json_schema_extra={"examples": ["AAPL"]}
)
limit: int = Field(
default=10,
ge=1,
le=100,
description="Maximum number of news articles to return",
json_schema_extra={"examples": [10, 20, 50]},
)
@field_validator("ticker")
@classmethod
def normalize_ticker(cls, v: str) -> str:
"""Normalize ticker to uppercase."""
return TickerValidator.validate_ticker(v)
class GetChartLinksRequest(StrictBaseModel):
"""Validation for get_chart_links tool."""
ticker: TickerSymbol = Field(
..., description="Stock ticker symbol", json_schema_extra={"examples": ["AAPL"]}
)
@field_validator("ticker")
@classmethod
def normalize_ticker(cls, v: str) -> str:
"""Normalize ticker to uppercase."""
return TickerValidator.validate_ticker(v)
class CachedPriceDataRequest(StrictBaseModel):
"""Validation for get_cached_price_data tool."""
ticker: TickerSymbol = Field(..., description="Stock ticker symbol")
start_date: DateString = Field(..., description="Start date in YYYY-MM-DD format")
end_date: DateString | None = Field(
default=None, description="End date in YYYY-MM-DD format (defaults to today)"
)
@field_validator("ticker")
@classmethod
def normalize_ticker(cls, v: str) -> str:
"""Normalize ticker to uppercase."""
return TickerValidator.validate_ticker(v)
@field_validator("start_date", "end_date")
@classmethod
def validate_date(cls, v: str | None) -> str | None:
"""Validate date format."""
if v is not None:
DateValidator.validate_date_string(v)
return v
@model_validator(mode="after")
def validate_date_range(self):
"""Ensure end_date is after start_date."""
if self.end_date is not None:
DateValidator.validate_date_range(self.start_date, self.end_date)
return self
class ClearCacheRequest(StrictBaseModel):
"""Validation for clear_cache tool."""
ticker: TickerSymbol | None = Field(
default=None, description="Specific ticker to clear (None to clear all)"
)
@field_validator("ticker")
@classmethod
def normalize_ticker(cls, v: str | None) -> str | None:
"""Normalize ticker to uppercase if provided."""
if v is not None:
return TickerValidator.validate_ticker(v)
return v
```
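A sketch of how these request models behave at the tool boundary (valid input is normalized and returned as a model instance; bad tickers or reversed date ranges raise `pydantic.ValidationError`):
```python
from pydantic import ValidationError

from maverick_mcp.validation.data import FetchStockDataRequest

req = FetchStockDataRequest(ticker="AAPL", start_date="2024-01-01", end_date="2024-12-31")
print(req.model_dump())

try:
    FetchStockDataRequest(ticker="???", start_date="2024-01-01")
except ValidationError as exc:
    print(f"rejected with {exc.error_count()} validation error(s)")
```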
--------------------------------------------------------------------------------
/maverick_mcp/validation/base.py:
--------------------------------------------------------------------------------
```python
"""
Base validation models and common validators for Maverick-MCP.
This module provides base classes and common validation functions
used across all validation models.
"""
import re
from datetime import UTC, datetime
from typing import Annotated
from pydantic import BaseModel, ConfigDict, Field, field_validator
from maverick_mcp.config.settings import get_settings
settings = get_settings()
# Custom type annotations
TickerSymbol = Annotated[
str,
Field(
min_length=settings.validation.min_symbol_length,
max_length=settings.validation.max_symbol_length,
pattern=r"^[A-Z0-9\-\.]{1,10}$",
description="Stock ticker symbol (e.g., AAPL, BRK.B, SPY)",
),
]
DateString = Annotated[
str, Field(pattern=r"^\d{4}-\d{2}-\d{2}$", description="Date in YYYY-MM-DD format")
]
PositiveInt = Annotated[int, Field(gt=0, description="Positive integer value")]
PositiveFloat = Annotated[float, Field(gt=0.0, description="Positive float value")]
Percentage = Annotated[
float, Field(ge=0.0, le=100.0, description="Percentage value (0-100)")
]
class StrictBaseModel(BaseModel):
"""
Base model with strict validation settings.
- Forbids extra fields
- Validates on assignment
- Uses strict mode for type coercion
"""
model_config = ConfigDict(
extra="forbid",
validate_assignment=True,
strict=True,
str_strip_whitespace=True,
json_schema_extra={"examples": []},
)
class TickerValidator:
"""Common ticker validation methods."""
@staticmethod
def validate_ticker(value: str) -> str:
"""Validate and normalize ticker symbol."""
# Convert to uppercase
ticker = value.upper().strip()
# Check pattern
pattern = f"^[A-Z0-9\\-\\.]{{1,{settings.validation.max_symbol_length}}}$"
if not re.match(pattern, ticker):
raise ValueError(
f"Invalid ticker symbol: {value}. "
f"Must be {settings.validation.min_symbol_length}-{settings.validation.max_symbol_length} characters, alphanumeric with optional . or -"
)
return ticker
@staticmethod
def validate_ticker_list(values: list[str]) -> list[str]:
"""Validate and normalize a list of tickers."""
if not values:
raise ValueError("At least one ticker symbol is required")
# Remove duplicates while preserving order
seen = set()
unique_tickers = []
for ticker in values:
normalized = TickerValidator.validate_ticker(ticker)
if normalized not in seen:
seen.add(normalized)
unique_tickers.append(normalized)
return unique_tickers
class DateValidator:
"""Common date validation methods."""
@staticmethod
def validate_date_string(value: str) -> str:
"""Validate date string format."""
try:
datetime.strptime(value, "%Y-%m-%d")
except ValueError:
raise ValueError(f"Invalid date format: {value}. Must be YYYY-MM-DD")
return value
@staticmethod
def validate_date_range(start_date: str, end_date: str) -> tuple[str, str]:
"""Validate that end_date is after start_date."""
start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d")
if end < start:
raise ValueError(
f"End date ({end_date}) must be after start date ({start_date})"
)
# Reject end dates in the future
today = datetime.now(UTC).date()
if end.date() > today:
raise ValueError(f"End date ({end_date}) cannot be in the future")
return start_date, end_date
class PaginationMixin(BaseModel):
"""Mixin for pagination parameters."""
limit: PositiveInt = Field(
default=20, le=100, description="Maximum number of results to return"
)
offset: int = Field(default=0, ge=0, description="Number of results to skip")
class DateRangeMixin(BaseModel):
"""Mixin for date range parameters."""
start_date: DateString | None = Field(
default=None, description="Start date in YYYY-MM-DD format"
)
end_date: DateString | None = Field(
default=None, description="End date in YYYY-MM-DD format"
)
@field_validator("end_date")
@classmethod
def validate_date_range(cls, v: str | None, info) -> str | None:
"""Ensure end_date is after start_date if both are provided."""
if v is None:
return v
start = info.data.get("start_date")
if start is not None:
DateValidator.validate_date_range(start, v)
return v
class BaseRequest(BaseModel):
"""Base class for all API request models."""
model_config = ConfigDict(
str_strip_whitespace=True,
validate_assignment=True,
extra="forbid",
)
class BaseResponse(BaseModel):
"""Base class for all API response models."""
model_config = ConfigDict(
validate_assignment=True,
use_enum_values=True,
)
```
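The validators can also be used directly, outside any model; a quick sketch:
```python
from maverick_mcp.validation.base import DateValidator, TickerValidator

print(TickerValidator.validate_ticker("  brk.b "))                     # -> "BRK.B"
print(TickerValidator.validate_ticker_list(["AAPL", "aapl", "MSFT"]))  # -> ["AAPL", "MSFT"]
DateValidator.validate_date_range("2024-01-01", "2024-06-30")          # raises ValueError if reversed
```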
--------------------------------------------------------------------------------
/alembic/versions/f0696e2cac15_add_essential_performance_indexes.py:
--------------------------------------------------------------------------------
```python
"""Add essential performance indexes
Revision ID: f0696e2cac15
Revises: 007_enhance_audit_logging
Create Date: 2025-06-25 17:28:38.473307
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "f0696e2cac15"
down_revision = "007_enhance_audit_logging"
branch_labels = None
depends_on = None
def upgrade() -> None:
"""Add essential performance indexes for existing tables only."""
print("Creating essential performance indexes...")
# 1. Stock data performance indexes (for large stocks_pricecache table)
try:
op.create_index(
"idx_stocks_pricecache_stock_date",
"stocks_pricecache",
["stock_id", "date"],
postgresql_using="btree",
if_not_exists=True,
)
print("✓ Created stock price cache index")
except Exception as e:
print(f"Warning: Could not create stock price cache index: {e}")
# 2. Stock lookup optimization
try:
op.execute(
"CREATE INDEX IF NOT EXISTS idx_stocks_stock_ticker_lower "
"ON stocks_stock (LOWER(ticker_symbol))"
)
print("✓ Created case-insensitive ticker lookup index")
except Exception as e:
print(f"Warning: Could not create ticker lookup index: {e}")
# 3. MCP API keys performance (critical for authentication)
try:
op.create_index(
"idx_mcp_api_keys_active_lookup",
"mcp_api_keys",
["is_active", "expires_at"],
postgresql_using="btree",
if_not_exists=True,
)
print("✓ Created API keys performance index")
except Exception as e:
print(f"Warning: Could not create API keys index: {e}")
# 4. Requests tracking performance
try:
op.create_index(
"idx_mcp_requests_user_time",
"mcp_requests",
["user_id", sa.text("created_at DESC")],
postgresql_using="btree",
if_not_exists=True,
)
print("✓ Created requests tracking index")
except Exception as e:
print(f"Warning: Could not create requests index: {e}")
# 5. Auth audit log performance
try:
op.create_index(
"idx_mcp_auth_audit_log_user_time",
"mcp_auth_audit_log",
["user_id", sa.text("created_at DESC")],
postgresql_using="btree",
if_not_exists=True,
)
print("✓ Created auth audit log index")
except Exception as e:
print(f"Warning: Could not create auth audit index: {e}")
# 6. Screening tables performance (if they exist)
try:
op.create_index(
"idx_maverick_stocks_combined_score",
"maverick_stocks",
[sa.text('"COMBINED_SCORE" DESC')],
postgresql_using="btree",
if_not_exists=True,
)
print("✓ Created maverick stocks performance index")
except Exception as e:
print(f"Warning: Could not create maverick stocks index: {e}")
try:
op.create_index(
"idx_maverick_bear_stocks_score",
"maverick_bear_stocks",
[sa.text('"SCORE" DESC')],
postgresql_using="btree",
if_not_exists=True,
)
print("✓ Created maverick bear stocks performance index")
except Exception as e:
print(f"Warning: Could not create maverick bear stocks index: {e}")
try:
op.create_index(
"idx_supply_demand_breakouts_rs_rating",
"supply_demand_breakouts",
[sa.text('"RS_RATING" DESC')],
postgresql_using="btree",
if_not_exists=True,
)
print("✓ Created supply/demand breakouts performance index")
except Exception as e:
print(f"Warning: Could not create supply/demand breakouts index: {e}")
print("Essential performance indexes creation completed!")
def downgrade() -> None:
"""Remove essential performance indexes."""
print("Removing essential performance indexes...")
# Remove indexes (order doesn't matter for drops)
indexes_to_drop = [
("idx_stocks_pricecache_stock_date", "stocks_pricecache"),
("idx_mcp_api_keys_active_lookup", "mcp_api_keys"),
("idx_mcp_requests_user_time", "mcp_requests"),
("idx_mcp_auth_audit_log_user_time", "mcp_auth_audit_log"),
("idx_maverick_stocks_combined_score", "maverick_stocks"),
("idx_maverick_bear_stocks_score", "maverick_bear_stocks"),
("idx_supply_demand_breakouts_rs_rating", "supply_demand_breakouts"),
]
for index_name, table_name in indexes_to_drop:
try:
op.drop_index(index_name, table_name, if_exists=True)
print(f"✓ Dropped {index_name}")
except Exception as e:
print(f"Warning: Could not drop {index_name}: {e}")
# Drop special indexes
try:
op.execute("DROP INDEX IF EXISTS idx_stocks_stock_ticker_lower")
print("✓ Dropped ticker lookup index")
except Exception as e:
print(f"Warning: Could not drop ticker lookup index: {e}")
print("Essential performance indexes removal completed!")
```
--------------------------------------------------------------------------------
/maverick_mcp/memory/stores.py:
--------------------------------------------------------------------------------
```python
"""
Memory stores for agent conversations and user data.
"""
import logging
from datetime import datetime, timedelta
from typing import Any
logger = logging.getLogger(__name__)
class MemoryStore:
"""Base class for memory storage."""
def __init__(self, ttl_hours: float = 24.0):
self.ttl_hours = ttl_hours
self.store: dict[str, dict[str, Any]] = {}
def set(self, key: str, value: Any, ttl_hours: float | None = None) -> None:
"""Store a value with optional custom TTL."""
ttl = ttl_hours or self.ttl_hours
expiry = datetime.now() + timedelta(hours=ttl)
self.store[key] = {
"value": value,
"expiry": expiry.isoformat(),
"created": datetime.now().isoformat(),
}
def get(self, key: str) -> Any | None:
"""Get a value if not expired."""
if key not in self.store:
return None
entry = self.store[key]
expiry = datetime.fromisoformat(entry["expiry"])
if datetime.now() > expiry:
del self.store[key]
return None
return entry["value"]
def delete(self, key: str) -> None:
"""Delete a value."""
if key in self.store:
del self.store[key]
def clear_expired(self) -> int:
"""Clear all expired entries."""
now = datetime.now()
expired_keys = []
for key, entry in self.store.items():
if now > datetime.fromisoformat(entry["expiry"]):
expired_keys.append(key)
for key in expired_keys:
del self.store[key]
return len(expired_keys)
class ConversationStore(MemoryStore):
"""Store for conversation-specific data."""
def save_analysis(
self, session_id: str, symbol: str, analysis_type: str, data: dict[str, Any]
) -> None:
"""Save analysis results for a conversation."""
key = f"{session_id}:analysis:{symbol}:{analysis_type}"
analysis_record = {
"symbol": symbol,
"type": analysis_type,
"data": data,
"timestamp": datetime.now().isoformat(),
}
self.set(key, analysis_record)
def get_analysis(
self, session_id: str, symbol: str, analysis_type: str
) -> dict[str, Any] | None:
"""Get cached analysis for a symbol."""
key = f"{session_id}:analysis:{symbol}:{analysis_type}"
return self.get(key)
def save_context(self, session_id: str, context_type: str, data: Any) -> None:
"""Save conversation context."""
key = f"{session_id}:context:{context_type}"
self.set(key, data)
def get_context(self, session_id: str, context_type: str) -> Any | None:
"""Get conversation context."""
key = f"{session_id}:context:{context_type}"
return self.get(key)
    def list_analyses(self, session_id: str) -> list[dict[str, Any]]:
        """List all non-expired analyses for a session."""
        analyses = []
        prefix = f"{session_id}:analysis:"
        for key in list(self.store):
            if key.startswith(prefix):
                value = self.get(key)  # get() enforces TTL and evicts expired entries
                if value is not None:
                    analyses.append(value)
        return analyses
class UserMemoryStore(MemoryStore):
"""Store for user-specific long-term memory."""
def __init__(self, ttl_hours: float = 168.0): # 1 week default
super().__init__(ttl_hours)
def save_preference(self, user_id: str, preference_type: str, value: Any) -> None:
"""Save user preference."""
key = f"user:{user_id}:pref:{preference_type}"
self.set(key, value, ttl_hours=self.ttl_hours * 4) # Longer TTL for preferences
def get_preference(self, user_id: str, preference_type: str) -> Any | None:
"""Get user preference."""
key = f"user:{user_id}:pref:{preference_type}"
return self.get(key)
def save_trade_history(self, user_id: str, trade: dict[str, Any]) -> None:
"""Save trade to history."""
key = f"user:{user_id}:trades"
trades = self.get(key) or []
trades.append({**trade, "timestamp": datetime.now().isoformat()})
# Keep last 100 trades
trades = trades[-100:]
self.set(key, trades)
def get_trade_history(self, user_id: str, limit: int = 50) -> list[dict[str, Any]]:
"""Get user's trade history."""
key = f"user:{user_id}:trades"
trades = self.get(key) or []
return trades[-limit:]
def save_watchlist(self, user_id: str, symbols: list[str]) -> None:
"""Save user's watchlist."""
key = f"user:{user_id}:watchlist"
self.set(key, symbols)
def get_watchlist(self, user_id: str) -> list[str]:
"""Get user's watchlist."""
key = f"user:{user_id}:watchlist"
return self.get(key) or []
def update_risk_profile(self, user_id: str, profile: dict[str, Any]) -> None:
"""Update user's risk profile."""
key = f"user:{user_id}:risk_profile"
self.set(key, profile, ttl_hours=self.ttl_hours * 4)
def get_risk_profile(self, user_id: str) -> dict[str, Any] | None:
"""Get user's risk profile."""
key = f"user:{user_id}:risk_profile"
return self.get(key)
```
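A short usage sketch for the stores above (illustrative only, not part of `stores.py`); the session and user identifiers and analysis values are placeholders:

```python
# Demonstrates the session/user keying conventions and TTL behaviour of the stores.
from maverick_mcp.memory.stores import ConversationStore, UserMemoryStore

conversations = ConversationStore(ttl_hours=1.0)
conversations.save_analysis(
    session_id="sess-123",
    symbol="AAPL",
    analysis_type="technical",
    data={"rsi": 41.2, "trend": "neutral"},
)
cached = conversations.get_analysis("sess-123", "AAPL", "technical")
assert cached is not None and cached["symbol"] == "AAPL"

users = UserMemoryStore()  # 168h default TTL; preferences get 4x that
users.save_preference("user-1", "risk_tolerance", "moderate")
users.save_watchlist("user-1", ["AAPL", "MSFT", "NVDA"])
print(users.get_watchlist("user-1"))  # ['AAPL', 'MSFT', 'NVDA']
print(conversations.clear_expired())  # 0 immediately after writing
```

Because `MemoryStore` is a plain in-process dict, data does not survive restarts and is not shared across workers; expired entries are only evicted lazily on `get()` or via `clear_expired()`.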
--------------------------------------------------------------------------------
/tools/templates/test_template.py:
--------------------------------------------------------------------------------
```python
"""
Template for creating new test files.
Copy this file and modify it to create new tests quickly.
"""
from datetime import UTC, datetime
from unittest.mock import AsyncMock, Mock, patch  # AsyncMock is used by the async examples below
import pytest
# Import what you're testing
# from maverick_mcp.your_module import YourClass, your_function
class TestYourClass:
"""Test suite for YourClass."""
@pytest.fixture
def mock_dependencies(self):
"""Set up common mocks for tests."""
with patch("maverick_mcp.your_module.external_dependency") as mock_dep:
mock_dep.return_value = Mock()
yield {
"dependency": mock_dep,
}
@pytest.fixture
def sample_data(self):
"""Provide sample test data."""
return {
"id": 1,
"name": "Test Item",
"value": 42.0,
"created_at": datetime.now(UTC),
}
def test_initialization(self):
"""Test class initialization."""
# obj = YourClass(param1="value1", param2=42)
# assert obj.param1 == "value1"
# assert obj.param2 == 42
pass
def test_method_success(self, mock_dependencies, sample_data):
"""Test successful method execution."""
# Arrange
# obj = YourClass()
# mock_dependencies["dependency"].some_method.return_value = "expected"
# Act
# result = obj.your_method(sample_data)
# Assert
# assert result == "expected"
# mock_dependencies["dependency"].some_method.assert_called_once_with(sample_data)
pass
def test_method_validation_error(self):
"""Test method with invalid input."""
# obj = YourClass()
# with pytest.raises(ValueError, match="Invalid input"):
# obj.your_method(None)
pass
@pytest.mark.asyncio
async def test_async_method(self, mock_dependencies):
"""Test asynchronous method."""
# Arrange
# obj = YourClass()
# mock_dependencies["dependency"].async_method = AsyncMock(return_value="async_result")
# Act
# result = await obj.async_method()
# Assert
# assert result == "async_result"
pass
class TestYourFunction:
"""Test suite for standalone functions."""
def test_function_basic(self):
"""Test basic function behavior."""
# result = your_function("input")
# assert result == "expected_output"
pass
def test_function_edge_cases(self):
"""Test edge cases."""
# Test empty input
# assert your_function("") == ""
# Test None input
# with pytest.raises(TypeError):
# your_function(None)
# Test large input
# large_input = "x" * 10000
# assert len(your_function(large_input)) <= 10000
pass
@pytest.mark.parametrize(
"input_value,expected",
[
("test1", "result1"),
("test2", "result2"),
("", ""),
("special!@#", "special"),
],
)
def test_function_parametrized(self, input_value, expected):
"""Test function with multiple inputs."""
# result = your_function(input_value)
# assert result == expected
pass
class TestIntegration:
"""Integration tests (marked for optional execution)."""
@pytest.mark.integration
def test_database_integration(self, db_session):
"""Test database operations."""
# This test requires a real database connection
# from maverick_mcp.your_module import create_item, get_item
# # Create
# item = create_item(db_session, name="Test", value=42)
# assert item.id is not None
# # Read
# retrieved = get_item(db_session, item.id)
# assert retrieved.name == "Test"
# assert retrieved.value == 42
pass
@pytest.mark.integration
@pytest.mark.asyncio
async def test_external_api_integration(self):
"""Test external API calls."""
# This test makes real API calls
# from maverick_mcp.your_module import fetch_external_data
# result = await fetch_external_data("AAPL")
# assert result is not None
# assert "price" in result
pass
# Fixtures that can be reused across tests
@pytest.fixture
def mock_redis():
"""Mock Redis client."""
with patch("maverick_mcp.data.cache.get_redis_client") as mock:
redis_mock = Mock()
redis_mock.get.return_value = None
redis_mock.set.return_value = True
mock.return_value = redis_mock
yield redis_mock
@pytest.fixture
def mock_settings():
"""Mock settings for testing."""
with patch("maverick_mcp.config.settings.settings") as mock:
mock.auth.enabled = False
mock.api.debug = True
yield mock
# Performance tests (optional)
@pytest.mark.slow
class TestPerformance:
"""Performance tests (excluded by default)."""
def test_large_dataset_processing(self):
"""Test processing of large datasets."""
# import time
# from maverick_mcp.your_module import process_data
# large_data = list(range(1_000_000))
# start = time.time()
# result = process_data(large_data)
# duration = time.time() - start
# assert len(result) == 1_000_000
# assert duration < 1.0 # Should complete in under 1 second
pass
```
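A filled-in version of the template for a hypothetical pure function, showing how the commented scaffolding is typically replaced (`normalize_symbol` is illustrative and does not exist in maverick_mcp):

```python
# Concrete example derived from the template above; function and values are hypothetical.
import pytest


def normalize_symbol(raw: str) -> str:
    """Uppercase a ticker symbol and strip surrounding whitespace."""
    if raw is None:
        raise TypeError("symbol must be a string")
    return raw.strip().upper()


class TestNormalizeSymbol:
    @pytest.mark.parametrize(
        "raw,expected",
        [
            ("aapl", "AAPL"),
            ("  msft ", "MSFT"),
            ("", ""),
        ],
    )
    def test_parametrized(self, raw, expected):
        assert normalize_symbol(raw) == expected

    def test_none_raises(self):
        with pytest.raises(TypeError):
            normalize_symbol(None)
```

With the `integration` and `slow` markers registered in the project's pytest configuration, the expensive tests can be excluded with `pytest -m "not integration and not slow"`.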
--------------------------------------------------------------------------------
/maverick_mcp/providers/mocks/mock_macro_data.py:
--------------------------------------------------------------------------------
```python
"""
Mock macro data provider implementation for testing.
"""
from datetime import datetime
from typing import Any
class MockMacroDataProvider:
"""
Mock implementation of IMacroDataProvider for testing.
"""
def __init__(self, test_data: dict[str, Any] | None = None):
"""
Initialize the mock macro data provider.
Args:
test_data: Optional test data to return
"""
self._test_data = test_data or {}
self._call_log: list[dict[str, Any]] = []
async def get_gdp_growth_rate(self) -> dict[str, Any]:
"""Get mock GDP growth rate."""
self._log_call("get_gdp_growth_rate", {})
if "gdp_growth_rate" in self._test_data:
return self._test_data["gdp_growth_rate"]
return {
"current": 2.5,
"previous": 2.3,
}
async def get_unemployment_rate(self) -> dict[str, Any]:
"""Get mock unemployment rate."""
self._log_call("get_unemployment_rate", {})
if "unemployment_rate" in self._test_data:
return self._test_data["unemployment_rate"]
return {
"current": 3.8,
"previous": 3.9,
}
async def get_inflation_rate(self) -> dict[str, Any]:
"""Get mock inflation rate."""
self._log_call("get_inflation_rate", {})
if "inflation_rate" in self._test_data:
return self._test_data["inflation_rate"]
return {
"current": 3.2,
"previous": 3.4,
"bounds": (1.5, 6.8),
}
async def get_vix(self) -> float | None:
"""Get mock VIX data."""
self._log_call("get_vix", {})
if "vix" in self._test_data:
return self._test_data["vix"]
return 18.5
async def get_sp500_performance(self) -> float:
"""Get mock S&P 500 performance."""
self._log_call("get_sp500_performance", {})
if "sp500_performance" in self._test_data:
return self._test_data["sp500_performance"]
return 1.25
async def get_nasdaq_performance(self) -> float:
"""Get mock NASDAQ performance."""
self._log_call("get_nasdaq_performance", {})
if "nasdaq_performance" in self._test_data:
return self._test_data["nasdaq_performance"]
return 1.85
async def get_sp500_momentum(self) -> float:
"""Get mock S&P 500 momentum."""
self._log_call("get_sp500_momentum", {})
if "sp500_momentum" in self._test_data:
return self._test_data["sp500_momentum"]
return 0.75
async def get_nasdaq_momentum(self) -> float:
"""Get mock NASDAQ momentum."""
self._log_call("get_nasdaq_momentum", {})
if "nasdaq_momentum" in self._test_data:
return self._test_data["nasdaq_momentum"]
return 1.15
async def get_usd_momentum(self) -> float:
"""Get mock USD momentum."""
self._log_call("get_usd_momentum", {})
if "usd_momentum" in self._test_data:
return self._test_data["usd_momentum"]
return -0.35
async def get_macro_statistics(self) -> dict[str, Any]:
"""Get mock comprehensive macro statistics."""
self._log_call("get_macro_statistics", {})
if "macro_statistics" in self._test_data:
return self._test_data["macro_statistics"]
return {
"gdp_growth_rate": 2.5,
"gdp_growth_rate_previous": 2.3,
"unemployment_rate": 3.8,
"unemployment_rate_previous": 3.9,
"inflation_rate": 3.2,
"inflation_rate_previous": 3.4,
"sp500_performance": 1.25,
"nasdaq_performance": 1.85,
"vix": 18.5,
"sentiment_score": 65.5,
"historical_data": self._generate_mock_historical_data(),
}
async def get_historical_data(self) -> dict[str, Any]:
"""Get mock historical data."""
self._log_call("get_historical_data", {})
if "historical_data" in self._test_data:
return self._test_data["historical_data"]
return self._generate_mock_historical_data()
def _generate_mock_historical_data(self) -> dict[str, Any]:
"""Generate mock historical data for indicators."""
return {
"sp500_performance": [1.0, 1.1, 1.2, 1.25, 1.3],
"nasdaq_performance": [1.5, 1.6, 1.7, 1.8, 1.85],
"vix": [20.0, 19.5, 18.8, 18.2, 18.5],
"gdp_growth_rate": [2.1, 2.2, 2.3, 2.4, 2.5],
"unemployment_rate": [4.2, 4.1, 4.0, 3.9, 3.8],
"inflation_rate": [3.8, 3.6, 3.5, 3.4, 3.2],
}
# Testing utilities
def _log_call(self, method: str, args: dict[str, Any]) -> None:
"""Log method calls for testing verification."""
self._call_log.append(
{
"method": method,
"args": args,
"timestamp": datetime.now(),
}
)
def get_call_log(self) -> list[dict[str, Any]]:
"""Get the log of method calls."""
return self._call_log.copy()
def clear_call_log(self) -> None:
"""Clear the method call log."""
self._call_log.clear()
def set_test_data(self, key: str, data: Any) -> None:
"""Set test data for a specific key."""
self._test_data[key] = data
def clear_test_data(self) -> None:
"""Clear all test data."""
self._test_data.clear()
```
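An illustrative test using the mock provider above, assuming `pytest-asyncio` is installed (the test template already uses `@pytest.mark.asyncio`); the test itself is hypothetical, while the method names come from the class:

```python
# Shows test-data injection, default fallbacks, and call-log verification.
import pytest

from maverick_mcp.providers.mocks.mock_macro_data import MockMacroDataProvider


@pytest.mark.asyncio
async def test_vix_override_and_call_log():
    provider = MockMacroDataProvider(test_data={"vix": 32.0})

    # Overridden key returns the injected value instead of the default 18.5.
    assert await provider.get_vix() == 32.0

    # Keys without overrides fall back to the built-in defaults.
    stats = await provider.get_macro_statistics()
    assert stats["gdp_growth_rate"] == 2.5

    # Every call is recorded for later verification.
    methods = [call["method"] for call in provider.get_call_log()]
    assert methods == ["get_vix", "get_macro_statistics"]
```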