This is page 1 of 3. Use http://codebase.md/calvernaz/alphavantage?page={x} to view the full context. # Directory Structure ``` ├── .bumpversion.cfg ├── .github │ ├── FUNDING.yml │ └── workflows │ └── publish.yml ├── .gitignore ├── .python-version ├── CONTRIBUTING.md ├── deploy │ └── aws-stateless-mcp-lambda │ ├── .aws-sam │ │ └── build.toml │ ├── deploy.sh │ ├── lambda_function.py │ ├── README.md │ ├── requirements.txt │ └── template.yaml ├── DEVELOPMENT.md ├── Dockerfile ├── LICENSE ├── pyproject.toml ├── pytest.ini ├── README.md ├── scripts │ └── publish.py ├── smithery.yaml ├── src │ ├── alphavantage_mcp_client │ │ └── client.py │ └── alphavantage_mcp_server │ ├── __init__.py │ ├── api.py │ ├── oauth.py │ ├── prompts.py │ ├── response_utils.py │ ├── server.py │ ├── telemetry_bootstrap.py │ ├── telemetry_instrument.py │ └── tools.py ├── tests │ ├── test_api.py │ ├── test_http_mcp_client.py │ ├── test_http_transport.py │ ├── test_integration.py │ ├── test_stdio_transport.py │ └── test_telemetry.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 3.12.7 ``` -------------------------------------------------------------------------------- /.bumpversion.cfg: -------------------------------------------------------------------------------- ``` [bumpversion] current_version = 0.3.24 commit = True tag = True [bumpversion:file:pyproject.toml] search = version = "{current_version}" replace = version = "{new_version}" ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` .idea aws-chalice # Logs logs *.log npm-debug.log* yarn-debug.log* yarn-error.log* lerna-debug.log* .pnpm-debug.log* # Diagnostic reports (https://nodejs.org/api/report.html) report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json # Runtime data pids *.pid *.seed *.pid.lock # Directory for instrumented libs generated by jscoverage/JSCover lib-cov # Coverage directory used by tools like istanbul coverage *.lcov # nyc test coverage .nyc_output # Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files) .grunt # Bower dependency directory (https://bower.io/) bower_components # node-waf configuration .lock-wscript # Compiled binary addons (https://nodejs.org/api/addons.html) build/Release # Dependency directories node_modules/ jspm_packages/ # Snowpack dependency directory (https://snowpack.dev/) web_modules/ # TypeScript cache *.tsbuildinfo # Optional npm cache directory .npm # Optional eslint cache .eslintcache # Optional stylelint cache .stylelintcache # Microbundle cache .rpt2_cache/ .rts2_cache_cjs/ .rts2_cache_es/ .rts2_cache_umd/ # Optional REPL history .node_repl_history # Output of 'npm pack' *.tgz # Yarn Integrity file .yarn-integrity # dotenv environment variable files .env .env.development.local .env.test.local .env.production.local .env.local .env # parcel-bundler cache (https://parceljs.org/) .cache .parcel-cache # Next.js build output .next out # Nuxt.js build / generate output .nuxt dist # Gatsby files .cache/ # Comment in the public line in if your project uses Gatsby and not Next.js # https://nextjs.org/blog/next-9-1#public-directory-support # public # vuepress build output .vuepress/dist # vuepress v2.x temp and cache directory .temp .cache # Docusaurus cache and generated files .docusaurus # Serverless directories .serverless/ # FuseBox cache .fusebox/ # DynamoDB Local files .dynamodb/ # TernJS port file .tern-port # Stores VSCode versions used for testing VSCode extensions .vscode-test # yarn v2 .yarn/cache .yarn/unplugged .yarn/build-state.yml .yarn/install-state.gz .pnp.* build/ gcp-oauth.keys.json .*-server-credentials.json # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] *$py.class # C extensions *.so # Distribution / packaging .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ share/python-wheels/ *.egg-info/ .installed.cfg *.egg MANIFEST # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. *.manifest *.spec # Installer logs pip-log.txt pip-delete-this-directory.txt # Unit test / coverage reports htmlcov/ .tox/ .nox/ .coverage .coverage.* .cache nosetests.xml coverage.xml *.cover *.py,cover .hypothesis/ .pytest_cache/ cover/ # Translations *.mo *.pot # Django stuff: *.log local_settings.py db.sqlite3 db.sqlite3-journal # Flask stuff: instance/ .webassets-cache # Scrapy stuff: .scrapy # Sphinx documentation docs/_build/ # PyBuilder .pybuilder/ target/ # Jupyter Notebook .ipynb_checkpoints # IPython profile_default/ ipython_config.py # pyenv # For a library or package, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: # .python-version # pipenv # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. # However, in case of collaboration, if having platform-specific dependencies or dependencies # having no cross-platform support, pipenv may install dependencies that don't work, or not # install all needed dependencies. #Pipfile.lock # poetry # Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. # This is especially recommended for binary packages to ensure reproducibility, and is more # commonly ignored for libraries. # https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control #poetry.lock # pdm # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. #pdm.lock # pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it # in version control. # https://pdm.fming.dev/latest/usage/project/#working-with-version-control .pdm.toml .pdm-python .pdm-build/ # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm __pypackages__/ # Celery stuff celerybeat-schedule celerybeat.pid # SageMath parsed files *.sage.py # Environments .env .venv env/ venv/ ENV/ env.bak/ venv.bak/ # Spyder project settings .spyderproject .spyproject # Rope project settings .ropeproject # mkdocs documentation /site # mypy .mypy_cache/ .dmypy.json dmypy.json # Pyre type checker .pyre/ # pytype static type analyzer .pytype/ # Cython debug symbols cython_debug/ .DS_Store # PyCharm # JetBrains specific template is maintained in a separate JetBrains.gitignore that can # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ windsurfrules.md deploy/aws-chalice/vendor deploy/aws-chalice/.chalice ``` -------------------------------------------------------------------------------- /deploy/aws-stateless-mcp-lambda/README.md: -------------------------------------------------------------------------------- ```markdown # AWS Stateless MCP Lambda Deployment This deployment uses the **stateless MCP pattern** from [aws-samples/sample-serverless-mcp-servers](https://github.com/aws-samples/sample-serverless-mcp-servers/tree/main/stateless-mcp-on-lambda-python) to deploy the AlphaVantage MCP Server on AWS Lambda. ## 🎯 Why Stateless MCP? Unlike our previous attempts with Chalice and Lambda Web Adapter, this approach is specifically designed for **stateless MCP servers** that work perfectly with Lambda's execution model: - ✅ **No session state management** - Each request is independent - ✅ **Perfect for Lambda** - Stateless execution model matches Lambda - ✅ **Horizontal scaling** - Seamless elasticity and load distribution - ✅ **AWS-recommended pattern** - Based on official AWS samples ## 🏗️ Architecture ``` Internet → API Gateway → Lambda Function → AlphaVantage MCP Server → AlphaVantage API ``` Each Lambda invocation: 1. Receives MCP JSON-RPC request via API Gateway 2. Calls appropriate AlphaVantage MCP server function directly 3. Returns MCP-compliant JSON response 4. No persistent connections or session state required ## 🚀 Quick Start ### Prerequisites ```bash # Install AWS CLI pip install awscli # Install AWS SAM CLI pip install aws-sam-cli # Configure AWS credentials aws configure ``` ### Deploy ```bash # Set your AlphaVantage API key export ALPHAVANTAGE_API_KEY=your_api_key_here # Optional: Enable OAuth 2.1 export OAUTH_ENABLED=true export OAUTH_AUTHORIZATION_SERVER_URL=https://your-oauth-server.com # Deploy cd deploy/aws-stateless-mcp-lambda chmod +x deploy.sh ./deploy.sh ``` ## 🧪 Testing After deployment, test with these commands: ### 1. Initialize MCP Session ```bash curl -X POST 'https://your-api-id.execute-api.region.amazonaws.com/prod/mcp' \ -H 'Content-Type: application/json' \ -H 'Accept: application/json' \ -d '{ "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "test-client", "version": "1.0.0"} } }' ``` ### 2. List Available Tools ```bash curl -X POST 'https://your-api-id.execute-api.region.amazonaws.com/prod/mcp' \ -H 'Content-Type: application/json' \ -H 'Accept: application/json' \ -d '{ "jsonrpc": "2.0", "id": 2, "method": "tools/list" }' ``` ### 3. Call a Tool ```bash curl -X POST 'https://your-api-id.execute-api.region.amazonaws.com/prod/mcp' \ -H 'Content-Type: application/json' \ -H 'Accept: application/json' \ -d '{ "jsonrpc": "2.0", "id": 3, "method": "tools/call", "params": { "name": "stock_quote", "arguments": {"symbol": "AAPL"} } }' ``` ## 🔐 OAuth 2.1 Support Enable OAuth authentication by setting environment variables: ```bash export OAUTH_ENABLED=true export OAUTH_AUTHORIZATION_SERVER_URL=https://your-oauth-server.com export OAUTH_CLIENT_ID=your_client_id export OAUTH_CLIENT_SECRET=your_client_secret ``` When OAuth is enabled, include Bearer token in requests: ```bash curl -X POST 'https://your-api-id.execute-api.region.amazonaws.com/prod/mcp' \ -H 'Content-Type: application/json' \ -H 'Authorization: Bearer your_access_token' \ -d '{"jsonrpc":"2.0","id":2,"method":"tools/list"}' ``` ## 📊 Available Tools The AlphaVantage MCP Server provides 50+ financial data tools: ### Stock Data - `get_stock_quote` - Real-time stock quotes - `get_intraday_data` - Intraday time series - `get_daily_data` - Daily time series - `get_weekly_data` - Weekly time series - `get_monthly_data` - Monthly time series ### Technical Indicators - `get_sma` - Simple Moving Average - `get_ema` - Exponential Moving Average - `get_rsi` - Relative Strength Index - `get_macd` - MACD indicator - And 30+ more technical indicators ### Fundamental Data - `get_company_overview` - Company fundamentals - `get_income_statement` - Income statements - `get_balance_sheet` - Balance sheets - `get_cash_flow` - Cash flow statements ### Economic Data - `get_gdp` - GDP data - `get_inflation` - Inflation rates - `get_unemployment` - Unemployment rates - And more economic indicators ## 🔍 Monitoring ### CloudWatch Logs ```bash # Follow Lambda logs aws logs tail /aws/lambda/alphavantage-stateless-mcp-alphavantage-mcp --follow # Get function metrics aws lambda get-function --function-name alphavantage-stateless-mcp-alphavantage-mcp ``` ### API Gateway Metrics - Monitor request count, latency, and errors in CloudWatch - Set up alarms for high error rates or latency ## 🛠️ Troubleshooting ### Common Issues **1. Import Errors** ``` ModuleNotFoundError: No module named 'alphavantage_mcp_server' ``` - **Solution**: Ensure the Lambda layer is properly built with source code **2. API Key Errors** ``` {"error": "API key required"} ``` - **Solution**: Verify `ALPHAVANTAGE_API_KEY` environment variable is set **3. Tool Not Found** ``` {"error": {"code": -32601, "message": "Method not found"}} ``` - **Solution**: Check tool name spelling and availability with `tools/list` ### Debug Mode Enable debug logging by setting environment variable: ```bash export DEBUG=true ``` ## 💰 Cost Estimation ### Lambda Costs - **Requests**: $0.20 per 1M requests - **Duration**: $0.0000166667 per GB-second - **Example**: 10,000 requests/month ≈ $2-5/month ### API Gateway Costs - **REST API**: $3.50 per million API calls - **Data transfer**: $0.09 per GB ### Total Estimated Cost - **Light usage** (1K requests/month): ~$1/month - **Moderate usage** (10K requests/month): ~$5/month - **Heavy usage** (100K requests/month): ~$40/month ## 🧹 Cleanup Remove all AWS resources: ```bash aws cloudformation delete-stack --stack-name alphavantage-stateless-mcp ``` ## 📚 References - [AWS Sample Serverless MCP Servers](https://github.com/aws-samples/sample-serverless-mcp-servers) - [MCP Specification](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports) - [AlphaVantage API Documentation](https://www.alphavantage.co/documentation/) - [AWS Lambda Documentation](https://docs.aws.amazon.com/lambda/) ## 🤝 Contributing This deployment is based on the official AWS sample pattern. For improvements: 1. Test changes locally with SAM 2. Update the Lambda function code 3. Redeploy with `./deploy.sh` 4. Verify with test commands ## 📄 License This deployment follows the same MIT-0 license as the AWS sample repository. ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # ✅ Official Alpha Vantage MCP Server [](https://smithery.ai/server/@calvernaz/alphavantage) [](https://mseep.ai/app/b76d0966-edd1-46fd-9cfb-b29a6d8cb563) A MCP server for the stock market data API, Alphavantage API. **MCP Server URL**: https://mcp.alphavantage.co **PyPi**: https://pypi.org/project/alphavantage-mcp/ ## Configuration ### Getting an API Key 1. Sign up for a [Free Alphavantage API key](https://www.alphavantage.co/support/#api-key) 2. Add the API key to your environment variables as `ALPHAVANTAGE_API_KEY` ## Installation ### Option 1: Using uvx (Recommended) The easiest way to use the AlphaVantage MCP server is with `uvx`: ```bash # Run directly without installation uvx alphavantage-mcp # Or with specific arguments uvx alphavantage-mcp --server http --port 8080 ``` ### Option 2: Using pip ```bash pip install alphavantage-mcp alphavantage-mcp ``` ### Option 3: From source ```bash git clone https://github.com/calvernaz/alphavantage.git cd alphavantage uv run alphavantage ``` ## Server Modes The AlphaVantage server can run in two different modes: ### Stdio Server (Default) This is the standard MCP server mode used for tools like Claude Desktop. ```bash alphavantage # or explicitly: alphavantage --server stdio ``` ### Streamable HTTP Server This mode provides real-time updates via HTTP streaming. ```bash alphavantage --server http --port 8080 ``` ### Streamable HTTP Server with OAuth 2.1 Authentication This mode adds OAuth 2.1 authentication to the HTTP server, following the MCP specification for secure access. ```bash alphavantage --server http --port 8080 --oauth ``` #### OAuth Configuration When using the `--oauth` flag, the server requires OAuth 2.1 configuration via environment variables: **Required Environment Variables:** ```bash export OAUTH_AUTHORIZATION_SERVER_URL="https://your-auth-server.com/realms/your-realm" export OAUTH_RESOURCE_SERVER_URI="https://your-mcp-server.com" ``` **Optional Environment Variables:** ```bash # Token validation method (default: jwt) export OAUTH_TOKEN_VALIDATION_METHOD="jwt" # or "introspection" # For JWT validation export OAUTH_JWT_PUBLIC_KEY="-----BEGIN PUBLIC KEY-----\n...\n-----END PUBLIC KEY-----" export OAUTH_JWT_ALGORITHM="RS256" # default # For token introspection validation export OAUTH_INTROSPECTION_ENDPOINT="https://your-auth-server.com/realms/your-realm/protocol/openid-connect/token/introspect" export OAUTH_INTROSPECTION_CLIENT_ID="your-client-id" export OAUTH_INTROSPECTION_CLIENT_SECRET="your-client-secret" # Optional: Required scopes (space-separated) export OAUTH_REQUIRED_SCOPES="mcp:access mcp:read" # Optional: Enable session binding for additional security (default: true) export OAUTH_SESSION_BINDING_ENABLED="true" ``` #### OAuth Features The OAuth implementation provides: - **OAuth 2.0 Protected Resource Metadata** endpoint (`/.well-known/oauth-protected-resource`) - **Bearer token authentication** for all MCP requests - **JWT and Token Introspection** validation methods - **MCP Security Best Practices** compliance: - Token audience validation (prevents token passthrough attacks) - Session hijacking prevention with secure session IDs - User-bound sessions for additional security - Proper WWW-Authenticate headers for 401 responses #### Example: Keycloak Configuration For testing with Keycloak: ```bash # Keycloak OAuth configuration export OAUTH_AUTHORIZATION_SERVER_URL="https://keycloak.example.com/realms/mcp-realm" export OAUTH_RESOURCE_SERVER_URI="https://mcp.example.com" export OAUTH_TOKEN_VALIDATION_METHOD="introspection" export OAUTH_INTROSPECTION_ENDPOINT="https://keycloak.example.com/realms/mcp-realm/protocol/openid-connect/token/introspect" export OAUTH_INTROSPECTION_CLIENT_ID="mcp-server" export OAUTH_INTROSPECTION_CLIENT_SECRET="your-keycloak-client-secret" export OAUTH_REQUIRED_SCOPES="mcp:access" # Start server with OAuth alphavantage --server http --port 8080 --oauth ``` #### OAuth Client Flow When OAuth is enabled, MCP clients must: 1. **Discover** the authorization server via `GET /.well-known/oauth-protected-resource` 2. **Register** with the authorization server (if using Dynamic Client Registration) 3. **Obtain access tokens** from the authorization server 4. **Include tokens** in requests: `Authorization: Bearer <access-token>` 5. **Handle 401/403 responses** and refresh tokens as needed Options: - `--server`: Choose between `stdio` (default) or `http` server mode - `--port`: Specify the port for the Streamable HTTP server (default: 8080) - `--oauth`: Enable OAuth 2.1 authentication (requires `--server http`) ## 📊 Telemetry The AlphaVantage MCP server includes optional Prometheus metrics for monitoring and observability. ### Enabling Telemetry Set the following environment variables to enable telemetry: ```bash # Enable telemetry (default: true) export MCP_TELEMETRY_ENABLED=true # Server identification (optional) export MCP_SERVER_NAME=alphavantage export MCP_SERVER_VERSION=1.0.0 # Metrics server port (default: 9464) export MCP_METRICS_PORT=9464 ``` ### Metrics Endpoint When telemetry is enabled, Prometheus metrics are available at: ``` http://localhost:9464/metrics ``` ### Available Metrics The server collects the following metrics for each tool call: - **`mcp_tool_calls_total`** - Total number of tool calls (labeled by tool and outcome) - **`mcp_tool_latency_seconds`** - Tool execution latency histogram - **`mcp_tool_request_bytes`** - Request payload size histogram - **`mcp_tool_response_bytes`** - Response payload size histogram - **`mcp_tool_active_concurrency`** - Active concurrent tool calls gauge - **`mcp_tool_errors_total`** - Total errors by type (timeout, bad_input, connection, unknown) ### Example Usage with Telemetry ```bash # Start server with telemetry enabled export MCP_TELEMETRY_ENABLED=true export MCP_SERVER_NAME=alphavantage-prod export ALPHAVANTAGE_API_KEY=your_api_key alphavantage --server http --port 8080 # View metrics curl http://localhost:9464/metrics ``` ## 🚀 AWS Serverless Deployment Deploy the AlphaVantage MCP Server on AWS Lambda using the stateless MCP pattern for production-ready, scalable deployment. ### Quick AWS Deployment ```bash cd deploy/aws-stateless-mcp-lambda export ALPHAVANTAGE_API_KEY=your_api_key_here ./deploy.sh ``` **Features:** - ✅ **Stateless MCP pattern** - Perfect for Lambda's execution model - ✅ **Auto-scaling** - Handles any load with AWS Lambda + API Gateway - ✅ **Cost-effective** - Pay only for requests (~$1-5/month for typical usage) - ✅ **Production-ready** - Based on AWS official sample patterns - ✅ **OAuth 2.1 support** - Optional authentication for secure access **📖 Full Documentation:** See [AWS Deployment Guide](deploy/aws-stateless-mcp-lambda/README.md) for complete setup instructions, testing, monitoring, and troubleshooting. ### Usage with Claude Desktop #### Option 1: Using uvx (Recommended) Add this to your `claude_desktop_config.json`: ```json { "mcpServers": { "alphavantage": { "command": "uvx", "args": ["alphavantage-mcp"], "env": { "ALPHAVANTAGE_API_KEY": "YOUR_API_KEY_HERE" } } } } ``` #### Option 2: From source If you cloned the repository, use this configuration: ```json { "mcpServers": { "alphavantage": { "command": "uv", "args": [ "--directory", "<DIRECTORY-OF-CLONED-PROJECT>/alphavantage", "run", "alphavantage" ], "env": { "ALPHAVANTAGE_API_KEY": "YOUR_API_KEY_HERE" } } } } ``` #### Running the Server in Streamable HTTP Mode **Using uvx:** ```json { "mcpServers": { "alphavantage": { "command": "uvx", "args": ["alphavantage-mcp", "--server", "http", "--port", "8080"], "env": { "ALPHAVANTAGE_API_KEY": "YOUR_API_KEY_HERE" } } } } ``` **From source:** ```json { "mcpServers": { "alphavantage": { "command": "uv", "args": [ "--directory", "<DIRECTORY-OF-CLONED-PROJECT>/alphavantage", "run", "alphavantage", "--server", "http", "--port", "8080" ], "env": { "ALPHAVANTAGE_API_KEY": "YOUR_API_KEY_HERE" } } } } ``` ## 📺 Demo Video Watch a quick demonstration of the Alpha Vantage MCP Server in action: [](https://github.com/user-attachments/assets/bc9ecffb-eab6-4a4d-bbf6-9fc8178f15c3) ## 🔧 Development & Publishing ### Publishing to PyPI This project includes scripts for publishing to PyPI and TestPyPI: ```bash # Publish to TestPyPI (for testing) python scripts/publish.py --test # Publish to PyPI (production) python scripts/publish.py # Use uv publish instead of twine python scripts/publish.py --test --use-uv ``` The script uses `twine` by default (recommended) but can also use `uv publish` with the `--use-uv` flag. ### GitHub Actions The repository includes a GitHub Actions workflow for automated publishing: - **Trusted Publishing**: Uses PyPA's official publish action with OpenID Connect - **Manual Trigger**: Can be triggered manually with options for TestPyPI vs PyPI - **Twine Fallback**: Supports both trusted publishing and twine-based publishing To set up publishing: 1. **For Trusted Publishing** (recommended): - Configure trusted publishing on PyPI/TestPyPI with your GitHub repository - No secrets needed - uses OpenID Connect 2. **For Token-based Publishing**: - Add `PYPI_API_TOKEN` and `TEST_PYPI_API_TOKEN` secrets to your repository - Use the "Use twine" option in the workflow dispatch ## 🤝 Contributing We welcome contributions from the community! To get started, check out our [contribution](CONTRIBUTING.md) guide for setup instructions, development tips, and guidelines. ``` -------------------------------------------------------------------------------- /CONTRIBUTING.md: -------------------------------------------------------------------------------- ```markdown # Contributing to AlphaVantage MCP Server Thanks for your interest in contributing! 🎉 This project is the official [MCP (Model Context Protocol)](https://github.com/modelcontextprotocol/servers) server for [Alpha Vantage](https://www.alphavantage.co). Here's how to get started with local development and testing. --- ## 🚀 Getting Started ### 1. Clone the repo ```bash git clone https://github.com/calvernaz/alphavantage.git cd alphavantage ``` ## Set up your environment You'll need an [Alpha Vantage API key](https://www.alphavantage.co/support/#api-key). Create a .env file in the project root: ```bash touch .env ``` Add your API key to the .env file: ```bash ALPHAVANTAGE_API_KEY=your_api_key_here ``` Alternatively, you can export it directly in your terminal: ```bash export ALPHAVANTAGE_API_KEY=your_api_key_here ``` ## 🧪 Running Locally with Inspector Use the MCP Inspector to run and test your server locally with hot reload. ```bash npm install -g @modelcontextprotocol/inspector ``` Then, run the server: ```bash npx @modelcontextprotocol/inspector uv --directory ~/alphavantage run alphavantage ``` > Replace ~/code/alphavantage with your actual path to this repo. ``` -------------------------------------------------------------------------------- /.github/FUNDING.yml: -------------------------------------------------------------------------------- ```yaml github: [calvernaz] ``` -------------------------------------------------------------------------------- /pytest.ini: -------------------------------------------------------------------------------- ``` [pytest] asyncio_mode=auto asyncio_default_fixture_loop_scope="function" ``` -------------------------------------------------------------------------------- /deploy/aws-stateless-mcp-lambda/requirements.txt: -------------------------------------------------------------------------------- ``` # Core MCP dependencies mcp>=1.0.0 # AlphaVantage MCP server dependencies requests>=2.31.0 aiohttp>=3.9.0 pydantic>=2.5.0 toml>=0.10.2 uvicorn>=0.24.0 starlette>=0.27.0 # OAuth 2.1 support PyJWT>=2.8.0 httpx>=0.25.0 cryptography>=41.0.0 # AWS Lambda runtime boto3>=1.34.0 ``` -------------------------------------------------------------------------------- /smithery.yaml: -------------------------------------------------------------------------------- ```yaml # Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml startCommand: type: http configSchema: # JSON Schema defining the configuration options for the MCP. type: object required: - alphavantageApiKey properties: alphavantageApiKey: type: string description: The API key for the Alphavantage server. commandFunction: # A function that produces the CLI command to start the MCP on stdio. |- config => ({ command: 'alphavantage', env: { ALPHAVANTAGE_API_KEY: config.alphavantageApiKey } }) ``` -------------------------------------------------------------------------------- /deploy/aws-stateless-mcp-lambda/.aws-sam/build.toml: -------------------------------------------------------------------------------- ```toml # This file is auto generated by SAM CLI build command [function_build_definitions.f642707c-4652-4f92-8ce7-4478f43aac20] codeuri = "/Users/medusa/code/alphavantage/deploy/aws-stateless-mcp-lambda" runtime = "python3.12" architecture = "x86_64" handler = "lambda_function.lambda_handler" manifest_hash = "" packagetype = "Zip" functions = ["AlphaVantageMCPFunction"] [layer_build_definitions.02ea97b0-d24e-4cf4-b3e6-7682870a9300] layer_name = "AlphaVantageMCPLayer" codeuri = "/Users/medusa/code/alphavantage/src" build_method = "python3.12" compatible_runtimes = ["python3.12"] architecture = "x86_64" manifest_hash = "" layer = "AlphaVantageMCPLayer" ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile # Use a Python image with uv pre-installed FROM ghcr.io/astral-sh/uv:python3.12-alpine # Install the project into `/app` WORKDIR /app # Enable bytecode compilation ENV UV_COMPILE_BYTECODE=1 # Copy from the cache instead of linking since it's a mounted volume ENV UV_LINK_MODE=copy # Install the project's dependencies using the lockfile and settings RUN --mount=type=cache,target=/root/.cache/uv \ --mount=type=bind,source=uv.lock,target=uv.lock \ --mount=type=bind,source=pyproject.toml,target=pyproject.toml \ uv sync --locked --no-install-project --no-dev --python /usr/local/bin/python3 # Then, add the rest of the project source code and install it # Installing separately from its dependencies allows optimal layer caching COPY . /app # Remove any existing virtual environment that might have been copied from host RUN rm -rf .venv RUN --mount=type=cache,target=/root/.cache/uv \ uv sync --locked --no-dev --python /usr/local/bin/python3 # Place executables in the environment at the front of the path ENV PATH="/app/.venv/bin:$PATH" # Set transport mode to HTTP and port to 8080 as required by Smithery proxy ENV TRANSPORT=http ENV PORT=8080 EXPOSE 8080 # Reset the entrypoint, don't invoke `uv` ENTRYPOINT [] # Run the application directly using the venv Python CMD ["alphavantage"] ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "alphavantage-mcp" version = "0.3.24" description = "AlphaVantage MCP server - Financial data tools for Model Context Protocol" readme = "README.md" requires-python = ">=3.12" keywords = ["mcp", "alphavantage", "financial", "stocks", "api", "server"] license = {text = "MIT"} classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.12", "Topic :: Office/Business :: Financial", "Topic :: Software Development :: Libraries :: Python Modules", ] dependencies = [ "mcp>=1.9.4", "httpx>=0.28.1", "starlette>=0.47.2", "uvicorn>=0.32.1", "PyJWT>=2.10.1", "prometheus-client>=0.20.0", "toml>=0.10.2", "packaging>=21.0", ] [[project.authors]] name = "Cesar Alvernaz" email = "[email protected]" [project.urls] Homepage = "https://github.com/calvernaz/alphavantage" Repository = "https://github.com/calvernaz/alphavantage" Issues = "https://github.com/calvernaz/alphavantage/issues" Documentation = "https://github.com/calvernaz/alphavantage#readme" [build-system] requires = [ "hatchling",] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] packages = ["src/alphavantage_mcp_server"] [dependency-groups] dev = [ "pytest>=8.4.1", "pytest-asyncio>=0.24.0", "ruff>=0.9.9", "build>=1.0.0", "twine>=4.0.0", ] [project.scripts] alphavantage-mcp = "alphavantage_mcp_server:main" ``` -------------------------------------------------------------------------------- /src/alphavantage_mcp_server/__init__.py: -------------------------------------------------------------------------------- ```python import asyncio import argparse import os from . import server def main(): """Main entry point for the package.""" parser = argparse.ArgumentParser(description="AlphaVantage MCP Server") parser.add_argument( "--server", type=str, choices=["stdio", "http"], help="Server type: stdio or http (default: stdio, or from TRANSPORT env var)", ) parser.add_argument( "--port", type=int, help="Port for HTTP server (default: 8080, or from PORT env var)", ) parser.add_argument( "--oauth", action="store_true", help="Enable OAuth 2.1 authentication for HTTP server (requires --server http)", ) args = parser.parse_args() # Determine server type: command line arg takes precedence, then env var, then default to stdio server_type = args.server if server_type is None: transport_env = os.getenv("TRANSPORT", "").lower() if transport_env == "http": server_type = "http" else: server_type = "stdio" # Determine port: command line arg takes precedence, then env var, then default to 8080 port = args.port if port is None: try: port = int(os.getenv("PORT", "8080")) except ValueError: port = 8080 # Validate OAuth flag usage if args.oauth and server_type != "http": parser.error( "--oauth flag can only be used with --server http or TRANSPORT=http" ) # Use the patched server.main function directly asyncio.run( server.main(server_type=server_type, port=port, oauth_enabled=args.oauth) ) if __name__ == "__main__": main() __all__ = ["main", "server"] ``` -------------------------------------------------------------------------------- /tests/test_telemetry.py: -------------------------------------------------------------------------------- ```python """ Unit tests for telemetry modules. """ import sys from unittest.mock import patch, MagicMock import pytest # Mock the external dependencies sys.modules["prometheus_client"] = MagicMock() class TestTelemetryInstrumentation: """Test telemetry instrumentation functionality.""" def test_instrument_tool_decorator_disabled(self): """Test that decorator does nothing when telemetry is disabled.""" with patch( "src.alphavantage_mcp_server.telemetry_instrument.is_telemetry_enabled", return_value=False, ): from src.alphavantage_mcp_server.telemetry_instrument import instrument_tool @instrument_tool("test_tool") def test_function(x, y): return x + y result = test_function(2, 3) assert result == 5 def test_error_classification(self): """Test error classification logic.""" from src.alphavantage_mcp_server.telemetry_instrument import _classify_error assert _classify_error(TimeoutError()) == "timeout" assert _classify_error(ValueError()) == "bad_input" assert _classify_error(TypeError()) == "bad_input" assert _classify_error(KeyError()) == "bad_input" assert _classify_error(ConnectionError()) == "connection" assert _classify_error(RuntimeError()) == "unknown" def test_size_calculation(self): """Test size calculation function.""" from src.alphavantage_mcp_server.telemetry_instrument import _get_size_bytes assert _get_size_bytes("hello") == 5 assert _get_size_bytes(b"hello") == 5 assert _get_size_bytes(None) == 0 assert _get_size_bytes(123) > 0 if __name__ == "__main__": pytest.main([__file__]) ``` -------------------------------------------------------------------------------- /.github/workflows/publish.yml: -------------------------------------------------------------------------------- ```yaml name: Publish to PyPI on: release: types: [published] workflow_dispatch: inputs: test_pypi: description: 'Publish to TestPyPI instead of PyPI' required: false default: false type: boolean use_twine: description: 'Use twine instead of trusted publishing' required: false default: false type: boolean jobs: publish: runs-on: ubuntu-latest environment: name: ${{ github.event.inputs.test_pypi == 'true' && 'testpypi' || 'pypi' }} permissions: id-token: write # For trusted publishing contents: read steps: - uses: actions/checkout@v4 - name: Install uv uses: astral-sh/setup-uv@v4 with: version: "latest" - name: Set up Python run: uv python install 3.12 - name: Install dependencies run: uv sync --dev - name: Build package run: uv build - name: Publish to TestPyPI (trusted publishing) if: github.event.inputs.test_pypi == 'true' && github.event.inputs.use_twine != 'true' uses: pypa/gh-action-pypi-publish@release/v1 with: repository-url: https://test.pypi.org/legacy/ - name: Publish to PyPI (trusted publishing) if: github.event.inputs.test_pypi != 'true' && github.event.inputs.use_twine != 'true' uses: pypa/gh-action-pypi-publish@release/v1 - name: Publish to TestPyPI (twine) if: github.event.inputs.test_pypi == 'true' && github.event.inputs.use_twine == 'true' env: TWINE_USERNAME: __token__ TWINE_PASSWORD: ${{ secrets.TEST_PYPI_API_TOKEN }} run: uv run twine upload --repository testpypi dist/* - name: Publish to PyPI (twine) if: github.event.inputs.test_pypi != 'true' && github.event.inputs.use_twine == 'true' env: TWINE_USERNAME: __token__ TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }} run: uv run twine upload dist/* ``` -------------------------------------------------------------------------------- /DEVELOPMENT.md: -------------------------------------------------------------------------------- ```markdown # Run the server Set the environment variable `ALPHAVANTAGE_API_KEY` to your Alphavantage API key. ```bash uv --directory ~/code/alphavantage run alphavantage ``` ### Response Limiting Utilities #### 1. Modify API Functions Add `max_data_points` parameter to technical indicator functions: ```python async def fetch_sma( symbol: str, interval: str = None, month: str = None, time_period: int = None, series_type: str = None, datatype: str = "json", max_data_points: int = 100, # NEW PARAMETER ) -> dict[str, str] | str: ``` #### 2. Apply Response Limiting Logic ```python # In fetch_sma and other technical indicator functions if datatype == "csv": return response.text # For JSON responses, apply response limiting full_response = response.json() from .response_utils import limit_time_series_response, should_limit_response if should_limit_response(full_response): return limit_time_series_response(full_response, max_data_points) return full_response ``` #### 3. Update Tool Definitions Add `max_data_points` parameter to tool schemas: ```python types.Tool( name=AlphavantageTools.SMA.value, description="Fetch simple moving average", inputSchema={ "type": "object", "properties": { "symbol": {"type": "string"}, "interval": {"type": "string"}, "month": {"type": "string"}, "time_period": {"type": "number"}, "series_type": {"type": "string"}, "datatype": {"type": "string"}, "max_data_points": { "type": "number", "description": "Maximum number of data points to return (default: 100)", "default": 100 }, }, "required": ["symbol", "interval", "time_period", "series_type"], }, ), ``` #### 4. Update Tool Handlers Pass `max_data_points` parameter to API functions: ```python case AlphavantageTools.SMA.value: symbol = arguments.get("symbol") interval = arguments.get("interval") month = arguments.get("month") time_period = arguments.get("time_period") series_type = arguments.get("series_type") datatype = arguments.get("datatype", "json") max_data_points = arguments.get("max_data_points", 100) # NEW if not symbol or not interval or not time_period or not series_type: raise ValueError( "Missing required arguments: symbol, interval, time_period, series_type" ) result = await fetch_sma( symbol, interval, month, time_period, series_type, datatype, max_data_points ) ``` # Format ```bash ruff check src/alphavantage_mcp_server/ --fix ``` # Run Tests ```bash pytest tests/*.py ``` # Versioning ```bash bumpversion patch ``` ``` -------------------------------------------------------------------------------- /deploy/aws-stateless-mcp-lambda/template.yaml: -------------------------------------------------------------------------------- ```yaml AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: > Stateless AlphaVantage MCP Server on AWS Lambda Based on aws-samples/sample-serverless-mcp-servers pattern Parameters: AlphaVantageApiKey: Type: String Description: AlphaVantage API Key NoEcho: true OAuthEnabled: Type: String Default: 'false' AllowedValues: ['true', 'false'] Description: Enable OAuth 2.1 authentication OAuthAuthorizationServerUrl: Type: String Default: '' Description: OAuth Authorization Server URL (optional) Globals: Function: Timeout: 30 MemorySize: 512 Runtime: python3.12 Environment: Variables: ALPHAVANTAGE_API_KEY: !Ref AlphaVantageApiKey OAUTH_ENABLED: !Ref OAuthEnabled OAUTH_AUTHORIZATION_SERVER_URL: !Ref OAuthAuthorizationServerUrl Resources: AlphaVantageMCPFunction: Type: AWS::Serverless::Function Properties: FunctionName: !Sub "${AWS::StackName}-alphavantage-mcp" CodeUri: . Handler: lambda_function.lambda_handler Description: Stateless AlphaVantage MCP Server Layers: - !Ref AlphaVantageMCPLayer Events: McpApiPost: Type: Api Properties: Path: /mcp Method: post McpApiOptions: Type: Api Properties: Path: /mcp Method: options Environment: Variables: PYTHONPATH: /var/task:/opt/python # Lambda Layer for AlphaVantage MCP Server source code AlphaVantageMCPLayer: Type: AWS::Serverless::LayerVersion Properties: LayerName: !Sub "${AWS::StackName}-alphavantage-mcp-layer" Description: AlphaVantage MCP Server source code ContentUri: ../../src/ CompatibleRuntimes: - python3.12 Metadata: BuildMethod: python3.12 Outputs: McpApiUrl: Description: "API Gateway endpoint URL for MCP requests" Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/mcp" Export: Name: !Sub "${AWS::StackName}-McpApiUrl" FunctionName: Description: "Lambda Function Name" Value: !Ref AlphaVantageMCPFunction Export: Name: !Sub "${AWS::StackName}-FunctionName" TestCommands: Description: "Test commands for the deployed MCP server" Value: !Sub | # Initialize MCP session curl -X POST https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/mcp \ -H 'Content-Type: application/json' \ -H 'Accept: application/json' \ -d '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test-client","version":"1.0.0"}}}' # List available tools curl -X POST https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/mcp \ -H 'Content-Type: application/json' \ -H 'Accept: application/json' \ -d '{"jsonrpc":"2.0","id":2,"method":"tools/list"}' # Call a tool (example: get stock quote) curl -X POST https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/mcp \ -H 'Content-Type: application/json' \ -H 'Accept: application/json' \ -d '{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"get_stock_quote","arguments":{"symbol":"AAPL"}}}' ``` -------------------------------------------------------------------------------- /tests/test_stdio_transport.py: -------------------------------------------------------------------------------- ```python import json import sys import pytest from mcp import ClientSession from mcp.client.stdio import stdio_client, StdioServerParameters @pytest.mark.asyncio async def test_stdio_stock_quote(): """Test stdio transport with stock_quote tool""" # Start the MCP server as a subprocess server_params = StdioServerParameters( command=sys.executable, args=[ "-c", "import sys; sys.path.insert(0, 'src'); from alphavantage_mcp_server import main; main()", "--server", "stdio", ], ) # Connect to the server using stdio client client = stdio_client(server_params) async with client as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # Initialize the session await session.initialize() # List available tools response = await session.list_tools() tools = response.tools tool_names = [tool.name for tool in tools] # Verify stock_quote tool is available assert "stock_quote" in tool_names, ( f"stock_quote not found in tools: {tool_names}" ) # Find the stock_quote tool next(tool for tool in tools if tool.name == "stock_quote") # Test calling the stock_quote tool result = await session.call_tool("stock_quote", {"symbol": "AAPL"}) # Verify we got a result assert result is not None assert hasattr(result, "content") assert len(result.content) > 0 # Parse the JSON result to verify it contains stock data content_text = result.content[0].text stock_data = json.loads(content_text) # Verify the response contains expected stock quote fields assert "Global Quote" in stock_data or "Error Message" in stock_data if "Global Quote" in stock_data: global_quote = stock_data["Global Quote"] assert "01. symbol" in global_quote assert "05. price" in global_quote assert global_quote["01. symbol"] == "AAPL" @pytest.mark.asyncio async def test_stdio_tool_list(): """Test that stdio transport can list all available tools""" server_params = StdioServerParameters( command=sys.executable, args=[ "-c", "import sys; sys.path.insert(0, 'src'); from alphavantage_mcp_server import main; main()", "--server", "stdio", ], ) client = stdio_client(server_params) async with client as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() response = await session.list_tools() tools = response.tools # Verify we have tools assert len(tools) > 0 # Verify essential tools are present tool_names = [tool.name for tool in tools] expected_tools = ["stock_quote", "time_series_daily"] for expected_tool in expected_tools: assert expected_tool in tool_names, ( f"{expected_tool} not found in tools" ) # Verify each tool has required attributes for tool in tools: assert hasattr(tool, "name") assert hasattr(tool, "description") assert hasattr(tool, "inputSchema") assert tool.name is not None assert tool.description is not None assert tool.inputSchema is not None ``` -------------------------------------------------------------------------------- /scripts/publish.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Script to build and publish the alphavantage-mcp package to PyPI. Usage: python scripts/publish.py --test # Publish to TestPyPI using twine python scripts/publish.py # Publish to PyPI using twine python scripts/publish.py --test --use-uv # Publish to TestPyPI using uv publish python scripts/publish.py --use-uv # Publish to PyPI using uv publish """ import argparse import subprocess import sys from pathlib import Path def run_command(cmd: list[str], description: str) -> bool: """Run a command and return True if successful.""" print(f"🔄 {description}...") try: result = subprocess.run(cmd, check=True, capture_output=True, text=True) print(f"✅ {description} completed successfully") if result.stdout: print(f" Output: {result.stdout.strip()}") return True except subprocess.CalledProcessError as e: print(f"❌ {description} failed") print(f" Error: {e.stderr.strip()}") return False def main(): parser = argparse.ArgumentParser(description="Build and publish alphavantage-mcp package") parser.add_argument( "--test", action="store_true", help="Publish to TestPyPI instead of PyPI" ) parser.add_argument( "--skip-build", action="store_true", help="Skip building and only upload existing dist files" ) parser.add_argument( "--use-uv", action="store_true", help="Use uv publish instead of twine (default: use twine)" ) args = parser.parse_args() # Change to project root project_root = Path(__file__).parent.parent print(f"📁 Working in: {project_root}") if not args.skip_build: # Clean previous builds dist_dir = project_root / "dist" if dist_dir.exists(): print("🧹 Cleaning previous builds...") import shutil shutil.rmtree(dist_dir) # Build the package if not run_command( ["uv", "build"], "Building package" ): sys.exit(1) # Check if dist files exist dist_dir = project_root / "dist" if not dist_dir.exists() or not list(dist_dir.glob("*.whl")): print("❌ No built packages found in dist/") print(" Run without --skip-build to build the package first") sys.exit(1) # Upload to PyPI or TestPyPI if args.test: repository_name = "testpypi" print("🧪 Publishing to TestPyPI...") print(" You can install with: pip install -i https://test.pypi.org/simple/ alphavantage-mcp") else: repository_name = "pypi" print("🚀 Publishing to PyPI...") print(" After publishing, users can install with: uvx alphavantage-mcp") # Choose upload method if args.use_uv: # Use uv publish repository_url = "https://test.pypi.org/legacy/" if args.test else "https://upload.pypi.org/legacy/" upload_cmd = [ "uv", "publish", "--publish-url", repository_url, str(dist_dir / "*") ] auth_help = "uv publish --help" else: # Use twine (default) upload_cmd = [ "uv", "run", "twine", "upload", "--repository", repository_name, str(dist_dir / "*") ] auth_help = "~/.pypirc file or environment variables" if not run_command(upload_cmd, f"Uploading to {repository_name}"): print(f"\n💡 If authentication failed, make sure you have:") print(f" 1. Created an account on {repository_name}") print(f" 2. Generated an API token") print(f" 3. Set up authentication with: {auth_help}") sys.exit(1) print(f"\n🎉 Package successfully published to {repository_name.upper()}!") if args.test: print("\n📋 Next steps:") print(" 1. Test the installation: pip install -i https://test.pypi.org/simple/ alphavantage-mcp") print(" 2. If everything works, publish to PyPI: python scripts/publish.py") else: print("\n📋 Users can now install with:") print(" uvx alphavantage-mcp") print(" or") print(" pip install alphavantage-mcp") if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /src/alphavantage_mcp_client/client.py: -------------------------------------------------------------------------------- ```python import os import sys import json import asyncio from typing import Optional, Literal from contextlib import AsyncExitStack from mcp import ClientSession from mcp.client.sse import sse_client from mcp.client.streamable_http import streamablehttp_client from openai import OpenAI BASE_URL = "http://localhost:8080/" class MCPClient: def __init__(self): # Initialize session and client objects self.session: Optional[ClientSession] = None self.exit_stack = AsyncExitStack() self.client = OpenAI( base_url=os.getenv("OPENAI_BASE_URL"), api_key=os.getenv("OPENAI_API_KEY"), ) async def connect_to_server( self, server_protocol: Literal["sse", "streamable-http"] ): """Connect to an MCP server""" if server_protocol == "sse": client = sse_client(BASE_URL + "sse") rs, ws = await self.exit_stack.enter_async_context(client) elif server_protocol == "streamable-http": client = streamablehttp_client(BASE_URL + "mcp") rs, ws, _ = await self.exit_stack.enter_async_context(client) else: raise Exception("Unknown transport protocol") self.session = await self.exit_stack.enter_async_context(ClientSession(rs, ws)) await self.session.initialize() # List available tools response = await self.session.list_tools() tools = response.tools_definitions print("\nConnected to server with tools:", [tool.name for tool in tools]) async def process_query(self, query: str) -> str: """Process a query using LLM and available tools""" messages = [{"role": "user", "content": query}] response = await self.session.list_tools() available_tools = [ { "type": "function", "function": { "name": tool.name, "description": tool.description, "parameters": tool.inputSchema, }, } for tool in response.tools ] # Initial LLM API call response = self.client.chat.completions.create( model=self.client.models.list().data[0].id, messages=messages, tools=available_tools, tool_choice="auto", ) # Process response and handle tool calls final_text = [] content = response.choices[0].message.content final_text.append(content) if response.choices[0].message.tool_calls: for tc in response.choices[0].message.tool_calls: f = tc.function tool_name = f.name tool_args = f.arguments # Execute tool call result = await self.session.call_tool(tool_name, json.loads(tool_args)) final_text.append(f"[Calling tool {tool_name} with args {tool_args}]") # Continue conversation with tool results messages.append({"role": "assistant", "content": content}) messages.append({"role": "user", "content": result.content}) # Get next response from LLM response = self.client.chat.completions.create( model=self.client.models.list().data[0].id, messages=messages, ) final_text.append(response.choices[0].message.content) return "\n".join(final_text) async def chat_loop(self): """Run an interactive chat loop""" print("\nMCP Client Started!") print("Type your queries or 'quit' to exit.") while True: try: query = input("\nQuery: ").strip() if query.lower() == "quit": break response = await self.process_query(query) print("\n" + response) except Exception as e: raise e print(f"\nError: {str(e)}") async def cleanup(self): """Clean up resources""" await self.exit_stack.aclose() async def start(): if len(sys.argv) == 2 and sys.argv[1] in ("sse", "streamable-http"): client = MCPClient() try: await client.connect_to_server(sys.argv[1]) await client.chat_loop() finally: await client.cleanup() else: raise Exception("Usage: uv run client sse|streamable-http") def main(): asyncio.run(start()) if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /src/alphavantage_mcp_server/telemetry_bootstrap.py: -------------------------------------------------------------------------------- ```python """ Telemetry Bootstrap Module This module initializes Prometheus metrics for the AlphaVantage MCP server. It provides centralized configuration and setup for telemetry components. """ import os import logging import threading from typing import Optional from prometheus_client import Counter, Histogram, Gauge, start_http_server logger = logging.getLogger(__name__) # Environment variable configuration MCP_TELEMETRY_ENABLED = os.getenv("MCP_TELEMETRY_ENABLED", "true").lower() == "true" MCP_SERVER_NAME = os.getenv("MCP_SERVER_NAME", "alphavantage") MCP_SERVER_VERSION = os.getenv("MCP_SERVER_VERSION", "dev") MCP_METRICS_PORT = int(os.getenv("MCP_METRICS_PORT", "9464")) # Global telemetry state _telemetry_initialized = False _metrics_server_started = False _metrics_server_thread: Optional[threading.Thread] = None # Prometheus metrics - these will be initialized in init_telemetry() MCP_CALLS: Optional[Counter] = None MCP_ERRS: Optional[Counter] = None MCP_LAT: Optional[Histogram] = None MCP_REQ_B: Optional[Histogram] = None MCP_RES_B: Optional[Histogram] = None MCP_CONC: Optional[Gauge] = None def _create_prometheus_metrics(): """Create and return Prometheus metrics objects.""" global MCP_CALLS, MCP_ERRS, MCP_LAT, MCP_REQ_B, MCP_RES_B, MCP_CONC MCP_CALLS = Counter( "mcp_tool_calls_total", "Total number of MCP tool calls", ["tool", "server", "version", "outcome"], ) MCP_ERRS = Counter( "mcp_tool_errors_total", "Total number of MCP tool errors", ["tool", "error_kind"], ) MCP_LAT = Histogram( "mcp_tool_latency_seconds", "MCP tool call latency in seconds", ["tool", "server", "version"], buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0], ) MCP_REQ_B = Histogram( "mcp_tool_request_bytes", "MCP tool request size in bytes", ["tool"], buckets=[64, 256, 1024, 4096, 16384, 65536, 262144, 1048576], ) MCP_RES_B = Histogram( "mcp_tool_response_bytes", "MCP tool response size in bytes", ["tool"], buckets=[64, 256, 1024, 4096, 16384, 65536, 262144, 1048576, 4194304], ) MCP_CONC = Gauge( "mcp_tool_active_concurrency", "Number of currently active MCP tool calls", ["tool"], ) def _start_metrics_server(): """Start the Prometheus metrics HTTP server.""" global _metrics_server_started, _metrics_server_thread if _metrics_server_started: return try: def run_server(): try: start_http_server(MCP_METRICS_PORT, addr="127.0.0.1") logger.info( f"Prometheus metrics server started on 127.0.0.1:{MCP_METRICS_PORT}" ) except Exception as e: logger.error(f"Failed to start metrics server: {e}") _metrics_server_thread = threading.Thread(target=run_server, daemon=True) _metrics_server_thread.start() _metrics_server_started = True except Exception as e: logger.error(f"Failed to start metrics server thread: {e}") def init_telemetry(start_metrics: bool = True) -> None: """ Initialize telemetry system. Args: start_metrics: Whether to start the Prometheus metrics HTTP server. Set to False for Lambda environments. """ global _telemetry_initialized if _telemetry_initialized: logger.debug("Telemetry already initialized") return if not MCP_TELEMETRY_ENABLED: logger.info("Telemetry disabled via MCP_TELEMETRY_ENABLED") return try: logger.info( f"Initializing telemetry for {MCP_SERVER_NAME} v{MCP_SERVER_VERSION}" ) # Initialize Prometheus metrics _create_prometheus_metrics() logger.debug("Prometheus metrics created") # Start metrics server if requested if start_metrics: _start_metrics_server() _telemetry_initialized = True logger.info("Telemetry initialization complete") except Exception as e: logger.error(f"Failed to initialize telemetry: {e}") raise def is_telemetry_enabled() -> bool: """Check if telemetry is enabled and initialized.""" return MCP_TELEMETRY_ENABLED and _telemetry_initialized # Export the metric objects for use by other modules __all__ = [ "init_telemetry", "is_telemetry_enabled", "MCP_CALLS", "MCP_ERRS", "MCP_LAT", "MCP_REQ_B", "MCP_RES_B", "MCP_CONC", "MCP_SERVER_NAME", "MCP_SERVER_VERSION", ] ``` -------------------------------------------------------------------------------- /src/alphavantage_mcp_server/response_utils.py: -------------------------------------------------------------------------------- ```python """ Response utilities for handling large API responses and preventing token limit issues. """ import json from typing import Dict, Any def limit_time_series_response( response: Dict[str, Any], max_data_points: int = 100, preserve_metadata: bool = True ) -> Dict[str, Any]: """ Limit the number of data points in a time series response to prevent token limit issues. Args: response: The full API response from AlphaVantage max_data_points: Maximum number of data points to include (default: 100) preserve_metadata: Whether to preserve metadata sections (default: True) Returns: Limited response with reduced data points """ if not isinstance(response, dict): return response limited_response = {} # Preserve metadata sections (they're usually small) if preserve_metadata: for key, value in response.items(): if not isinstance(value, dict) or len(value) < 50: limited_response[key] = value # Find and limit the main time series data section time_series_keys = [ key for key in response.keys() if any( indicator in key.lower() for indicator in [ "time series", "technical analysis", "sma", "ema", "rsi", "macd", "bbands", "stoch", "adx", "aroon", "cci", "mom", "roc", "willr", "ad", "obv", "ht_", "atr", "natr", "trix", "ultosc", "dx", "minus_di", "plus_di", "minus_dm", "plus_dm", "midpoint", "midprice", "sar", "trange", "adosc", ] ) ] for ts_key in time_series_keys: if ts_key in response and isinstance(response[ts_key], dict): time_series_data = response[ts_key] # Get the most recent data points (sorted by date descending) sorted_dates = sorted(time_series_data.keys(), reverse=True) limited_dates = sorted_dates[:max_data_points] # Create limited time series with only recent data limited_time_series = { date: time_series_data[date] for date in limited_dates } limited_response[ts_key] = limited_time_series # Add summary info about the limitation if len(sorted_dates) > max_data_points: limited_response[f"{ts_key}_summary"] = { "total_data_points_available": len(sorted_dates), "data_points_returned": len(limited_dates), "date_range_returned": { "from": min(limited_dates), "to": max(limited_dates), }, "note": f"Response limited to {max_data_points} most recent data points to prevent token limit issues", } return limited_response def estimate_response_size(response: Any) -> int: """ Estimate the token size of a response (rough approximation). Args: response: The response to estimate Returns: Estimated number of tokens """ try: json_str = json.dumps(response, indent=2) # Rough approximation: 1 token ≈ 4 characters return len(json_str) // 4 except Exception: return 0 def should_limit_response(response: Any, max_tokens: int = 15000) -> bool: """ Check if a response should be limited based on estimated token count. Args: response: The response to check max_tokens: Maximum allowed tokens (default: 15000) Returns: True if response should be limited """ estimated_tokens = estimate_response_size(response) return estimated_tokens > max_tokens def create_response_summary(response: Dict[str, Any]) -> Dict[str, Any]: """ Create a summary of a large response instead of returning the full data. Args: response: The full response to summarize Returns: Summary of the response """ summary = { "response_type": "summary", "reason": "Full response too large, showing summary to prevent token limit issues", } # Add metadata sections for key, value in response.items(): if not isinstance(value, dict) or len(value) < 10: summary[key] = value # Summarize large data sections for key, value in response.items(): if isinstance(value, dict) and len(value) >= 10: summary[f"{key}_info"] = { "data_points": len(value), "date_range": { "earliest": min(value.keys()) if value else None, "latest": max(value.keys()) if value else None, }, "sample_fields": list(list(value.values())[0].keys()) if value else [], "note": "Use a more specific date range or limit parameter to get actual data", } return summary ``` -------------------------------------------------------------------------------- /deploy/aws-stateless-mcp-lambda/deploy.sh: -------------------------------------------------------------------------------- ```bash #!/bin/bash # AWS Stateless MCP Lambda Deployment Script # Based on aws-samples/sample-serverless-mcp-servers pattern set -e echo "🚀 AlphaVantage Stateless MCP Server Deployment" echo "==============================================" # Check prerequisites echo "📋 Checking prerequisites..." # Check if AWS CLI is installed if ! command -v aws &> /dev/null; then echo "❌ AWS CLI is not installed. Please install it first." exit 1 fi # Check if SAM CLI is installed if ! command -v sam &> /dev/null; then echo "❌ AWS SAM CLI is not installed. Please install it first." echo " Install with: pip install aws-sam-cli" exit 1 fi # Check AWS credentials if ! aws sts get-caller-identity &> /dev/null; then echo "❌ AWS credentials not configured. Please run 'aws configure' first." exit 1 fi # Check required environment variables if [ -z "$ALPHAVANTAGE_API_KEY" ]; then echo "❌ ALPHAVANTAGE_API_KEY environment variable is required." echo " Set it with: export ALPHAVANTAGE_API_KEY=your_api_key_here" exit 1 fi echo "✅ Prerequisites check passed" # Set deployment parameters STACK_NAME="alphavantage-stateless-mcp" OAUTH_ENABLED="${OAUTH_ENABLED:-false}" OAUTH_AUTHORIZATION_SERVER_URL="${OAUTH_AUTHORIZATION_SERVER_URL:-}" echo "" echo "📦 Deployment Configuration:" echo " Stack Name: $STACK_NAME" echo " OAuth Enabled: $OAUTH_ENABLED" echo " OAuth Server URL: ${OAUTH_AUTHORIZATION_SERVER_URL:-'(not set)'}" echo "" # Confirm deployment read -p "🤔 Do you want to proceed with deployment? (y/N): " -n 1 -r echo if [[ ! $REPLY =~ ^[Yy]$ ]]; then echo "❌ Deployment cancelled." exit 1 fi # Build the SAM application echo "" echo "🔨 Building SAM application..." sam build --use-container if [ $? -ne 0 ]; then echo "❌ SAM build failed." exit 1 fi echo "✅ Build completed successfully" # Deploy the SAM application echo "" echo "🚀 Deploying to AWS..." # Deploy with conditional parameter handling if [ -n "$OAUTH_AUTHORIZATION_SERVER_URL" ]; then # Deploy with OAuth URL sam deploy \ --stack-name "$STACK_NAME" \ --capabilities CAPABILITY_IAM \ --resolve-s3 \ --parameter-overrides \ "AlphaVantageApiKey=$ALPHAVANTAGE_API_KEY" \ "OAuthEnabled=$OAUTH_ENABLED" \ "OAuthAuthorizationServerUrl=$OAUTH_AUTHORIZATION_SERVER_URL" \ --no-confirm-changeset \ --no-fail-on-empty-changeset else # Deploy without OAuth URL (use default empty value) sam deploy \ --stack-name "$STACK_NAME" \ --capabilities CAPABILITY_IAM \ --resolve-s3 \ --parameter-overrides \ "AlphaVantageApiKey=$ALPHAVANTAGE_API_KEY" \ "OAuthEnabled=$OAUTH_ENABLED" \ --no-confirm-changeset \ --no-fail-on-empty-changeset fi if [ $? -ne 0 ]; then echo "❌ Deployment failed." exit 1 fi echo "✅ Deployment completed successfully!" # Get the API endpoint echo "" echo "📡 Getting deployment information..." API_URL=$(aws cloudformation describe-stacks \ --stack-name "$STACK_NAME" \ --query 'Stacks[0].Outputs[?OutputKey==`McpApiUrl`].OutputValue' \ --output text) FUNCTION_NAME=$(aws cloudformation describe-stacks \ --stack-name "$STACK_NAME" \ --query 'Stacks[0].Outputs[?OutputKey==`FunctionName`].OutputValue' \ --output text) echo "" echo "🎉 Deployment Summary:" echo "======================" echo " API Endpoint: $API_URL" echo " Function Name: $FUNCTION_NAME" echo " Stack Name: $STACK_NAME" echo "" # Test the deployment echo "🧪 Testing the deployment..." echo "" echo "1️⃣ Testing MCP Initialize..." INIT_RESPONSE=$(curl -s -X POST "$API_URL" \ -H 'Content-Type: application/json' \ -H 'Accept: application/json' \ -d '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test-client","version":"1.0.0"}}}') if echo "$INIT_RESPONSE" | grep -q '"result"'; then echo "✅ Initialize test passed" else echo "❌ Initialize test failed" echo "Response: $INIT_RESPONSE" fi echo "" echo "2️⃣ Testing Tools List..." TOOLS_RESPONSE=$(curl -s -X POST "$API_URL" \ -H 'Content-Type: application/json' \ -H 'Accept: application/json' \ -d '{"jsonrpc":"2.0","id":2,"method":"tools/list"}') if echo "$TOOLS_RESPONSE" | grep -q '"tools"'; then TOOL_COUNT=$(echo "$TOOLS_RESPONSE" | grep -o '"name"' | wc -l) echo "✅ Tools list test passed - Found $TOOL_COUNT tools" else echo "❌ Tools list test failed" echo "Response: $TOOLS_RESPONSE" fi echo "" echo "🎯 Manual Test Commands:" echo "========================" echo "" echo "# Initialize MCP session:" echo "curl -X POST '$API_URL' \\" echo " -H 'Content-Type: application/json' \\" echo " -H 'Accept: application/json' \\" echo " -d '{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"initialize\",\"params\":{\"protocolVersion\":\"2024-11-05\",\"capabilities\":{},\"clientInfo\":{\"name\":\"test-client\",\"version\":\"1.0.0\"}}}'" echo "" echo "# List available tools:" echo "curl -X POST '$API_URL' \\" echo " -H 'Content-Type: application/json' \\" echo " -H 'Accept: application/json' \\" echo " -d '{\"jsonrpc\":\"2.0\",\"id\":2,\"method\":\"tools/list\"}'" echo "" echo "# Call a tool (get stock quote for AAPL):" echo "curl -X POST '$API_URL' \\" echo " -H 'Content-Type: application/json' \\" echo " -H 'Accept: application/json' \\" echo " -d '{\"jsonrpc\":\"2.0\",\"id\":3,\"method\":\"tools/call\",\"params\":{\"name\":\"get_stock_quote\",\"arguments\":{\"symbol\":\"AAPL\"}}}'" echo "" echo "📊 Monitoring:" echo "==============" echo " CloudWatch Logs: aws logs tail /aws/lambda/$FUNCTION_NAME --follow" echo " Function Metrics: aws lambda get-function --function-name $FUNCTION_NAME" echo "" echo "🧹 Cleanup (when done testing):" echo "================================" echo " aws cloudformation delete-stack --stack-name $STACK_NAME" echo "" echo "✅ Stateless MCP deployment completed successfully!" echo " Your AlphaVantage MCP server is now running serverlessly on AWS Lambda!" ``` -------------------------------------------------------------------------------- /src/alphavantage_mcp_server/telemetry_instrument.py: -------------------------------------------------------------------------------- ```python """ Telemetry Instrumentation Module This module provides the @instrument_tool decorator for wrapping MCP tool functions with telemetry collection using Prometheus metrics. """ import asyncio import functools import logging import time from typing import Any, Callable, Optional from .telemetry_bootstrap import ( MCP_CALLS, MCP_ERRS, MCP_LAT, MCP_REQ_B, MCP_RES_B, MCP_CONC, is_telemetry_enabled, ) logger = logging.getLogger(__name__) def _classify_error(error: Exception) -> str: """ Classify an error into a category for metrics labeling. Args: error: The exception to classify Returns: Error category string: "timeout", "bad_input", "connection", or "unknown" """ if isinstance(error, (TimeoutError, asyncio.TimeoutError)): return "timeout" elif isinstance(error, (ValueError, TypeError, KeyError, AttributeError)): return "bad_input" elif isinstance(error, (ConnectionError, OSError)): return "connection" else: return "unknown" def _get_size_bytes(obj: Any) -> int: """ Calculate the approximate size of an object in bytes. Args: obj: Object to measure Returns: Size in bytes (0 if measurement fails) """ try: if obj is None: return 0 elif isinstance(obj, (str, bytes)): return len(obj) else: # For other objects, convert to string and measure return len(str(obj)) except Exception: return 0 def instrument_tool(tool_name: str, transport: Optional[str] = None) -> Callable: """ Decorator to instrument MCP tool functions with telemetry collection. This decorator: - Increments/decrements active concurrency gauge - Measures execution duration - Classifies and counts errors - Emits calls_total metric with outcome - Measures request and response payload sizes Args: tool_name: Name of the tool for metrics labeling transport: Transport type ("stdio", "http", etc.) for metrics labeling Returns: Decorated function with telemetry instrumentation """ def decorator(func: Callable) -> Callable: if not is_telemetry_enabled(): # If telemetry is disabled, return the original function unchanged return func @functools.wraps(func) async def async_wrapper(*args, **kwargs) -> Any: """Async wrapper for instrumented functions.""" start_time = time.time() outcome = "error" error_kind = None # Increment active concurrency if MCP_CONC: MCP_CONC.labels(tool=tool_name).inc() try: # Measure request size (approximate) request_data = {"args": args, "kwargs": kwargs} request_size = _get_size_bytes(request_data) if MCP_REQ_B: MCP_REQ_B.labels(tool=tool_name).observe(request_size) # Execute the actual function result = await func(*args, **kwargs) # Measure response size response_size = _get_size_bytes(result) if MCP_RES_B: MCP_RES_B.labels(tool=tool_name).observe(response_size) outcome = "ok" return result except Exception as e: error_kind = _classify_error(e) # Increment error counter if MCP_ERRS: MCP_ERRS.labels(tool=tool_name, error_kind=error_kind).inc() logger.warning(f"Tool {tool_name} failed with {error_kind} error: {e}") raise finally: # Record execution time duration = time.time() - start_time if MCP_LAT: MCP_LAT.labels(tool=tool_name).observe(duration) # Increment total calls counter if MCP_CALLS: MCP_CALLS.labels(tool=tool_name, outcome=outcome).inc() # Decrement active concurrency if MCP_CONC: MCP_CONC.labels(tool=tool_name).dec() @functools.wraps(func) def sync_wrapper(*args, **kwargs) -> Any: """Sync wrapper for instrumented functions.""" start_time = time.time() outcome = "error" error_kind = None # Increment active concurrency if MCP_CONC: MCP_CONC.labels(tool=tool_name).inc() try: # Measure request size (approximate) request_data = {"args": args, "kwargs": kwargs} request_size = _get_size_bytes(request_data) if MCP_REQ_B: MCP_REQ_B.labels(tool=tool_name).observe(request_size) # Execute the actual function result = func(*args, **kwargs) # Measure response size response_size = _get_size_bytes(result) if MCP_RES_B: MCP_RES_B.labels(tool=tool_name).observe(response_size) outcome = "ok" return result except Exception as e: error_kind = _classify_error(e) # Increment error counter if MCP_ERRS: MCP_ERRS.labels(tool=tool_name, error_kind=error_kind).inc() logger.warning(f"Tool {tool_name} failed with {error_kind} error: {e}") raise finally: # Record execution time duration = time.time() - start_time if MCP_LAT: MCP_LAT.labels(tool=tool_name).observe(duration) # Increment total calls counter if MCP_CALLS: MCP_CALLS.labels(tool=tool_name, outcome=outcome).inc() # Decrement active concurrency if MCP_CONC: MCP_CONC.labels(tool=tool_name).dec() # Return appropriate wrapper based on whether function is async if asyncio.iscoroutinefunction(func): return async_wrapper else: return sync_wrapper return decorator # Export the decorator __all__ = ["instrument_tool"] ``` -------------------------------------------------------------------------------- /tests/test_http_mcp_client.py: -------------------------------------------------------------------------------- ```python import asyncio import json import subprocess import sys import pytest from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client @pytest.mark.asyncio async def test_http_transport_with_mcp_client(): """Test HTTP transport using proper MCP streamable HTTP client""" # Start the HTTP server server_process = subprocess.Popen( [ sys.executable, "-c", "import sys; sys.path.insert(0, 'src'); from alphavantage_mcp_server import main; main()", "--server", "http", "--port", "8087", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) try: # Give server time to start await asyncio.sleep(4) base_url = "http://localhost:8087/" client = streamablehttp_client(base_url + "mcp") async with client as streams: # Extract streams from the context if len(streams) == 3: read_stream, write_stream, session_manager = streams else: read_stream, write_stream = streams async with ClientSession(read_stream, write_stream) as session: # Initialize the session init_result = await session.initialize() assert init_result is not None assert init_result.serverInfo.name == "alphavantage" # List available tools response = await session.list_tools() tools = response.tools tool_names = [tool.name for tool in tools] # Verify essential tools are present assert "stock_quote" in tool_names assert len(tools) > 0 # Test calling the stock_quote tool result = await session.call_tool("stock_quote", {"symbol": "AAPL"}) # Verify we got a result assert result is not None assert hasattr(result, "content") assert len(result.content) > 0 # Parse the JSON result content_text = result.content[0].text stock_data = json.loads(content_text) assert "Global Quote" in stock_data or "Error Message" in stock_data assert stock_data["Global Quote"]["01. symbol"] == "AAPL" finally: # Clean up server_process.terminate() try: server_process.wait(timeout=5) except subprocess.TimeoutExpired: server_process.kill() @pytest.mark.asyncio async def test_http_transport_tool_listing(): """Test HTTP transport tool listing functionality""" # Start the HTTP server server_process = subprocess.Popen( [ sys.executable, "-c", "import sys; sys.path.insert(0, 'src'); from alphavantage_mcp_server import main; main()", "--server", "http", "--port", "8088", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) try: # Give server time to start await asyncio.sleep(4) base_url = "http://localhost:8088/" client = streamablehttp_client(base_url + "mcp") async with client as streams: if len(streams) == 3: read_stream, write_stream, session_manager = streams else: read_stream, write_stream = streams async with ClientSession(read_stream, write_stream) as session: await session.initialize() response = await session.list_tools() tools = response.tools # Verify we have tools assert len(tools) > 0 # Verify essential tools are present tool_names = [tool.name for tool in tools] expected_tools = ["stock_quote", "time_series_daily"] for expected_tool in expected_tools: assert expected_tool in tool_names, ( f"{expected_tool} not found in tools" ) # Verify each tool has required attributes for tool in tools: assert hasattr(tool, "name") assert hasattr(tool, "description") assert hasattr(tool, "inputSchema") assert tool.name is not None assert tool.description is not None assert tool.inputSchema is not None finally: # Clean up server_process.terminate() try: server_process.wait(timeout=5) except subprocess.TimeoutExpired: server_process.kill() @pytest.mark.asyncio async def test_http_transport_tool_call(): """Test HTTP transport tool calling functionality""" # Start the HTTP server server_process = subprocess.Popen( [ sys.executable, "-c", "import sys; sys.path.insert(0, 'src'); from alphavantage_mcp_server import main; main()", "--server", "http", "--port", "8089", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) try: # Give server time to start await asyncio.sleep(4) base_url = "http://localhost:8089/" client = streamablehttp_client(base_url + "mcp") async with client as streams: if len(streams) == 3: read_stream, write_stream, session_manager = streams else: read_stream, write_stream = streams async with ClientSession(read_stream, write_stream) as session: # Initialize the session await session.initialize() # Call the stock_quote tool stock_result = await session.call_tool( "stock_quote", {"symbol": "MSFT"} ) # Verify we got a result assert stock_result is not None assert hasattr(stock_result, "content") assert len(stock_result.content) > 0 # Parse and validate the result stock_content = stock_result.content[0].text stock_data = json.loads(stock_content) assert "Global Quote" in stock_data assert stock_data["Global Quote"]["01. symbol"] == "MSFT" # Call time_series_daily tool time_series_result = await session.call_tool( "time_series_daily", {"symbol": "IBM", "outputsize": "compact"} ) # Verify we got a result assert time_series_result is not None assert hasattr(time_series_result, "content") assert len(time_series_result.content) > 0 # Parse and validate the result ts_content = time_series_result.content[0].text ts_data = json.loads(ts_content) assert "Time Series (Daily)" in ts_data assert "Meta Data" in ts_data assert ts_data["Meta Data"]["2. Symbol"] == "IBM" finally: # Clean up server_process.terminate() try: server_process.wait(timeout=5) except subprocess.TimeoutExpired: server_process.kill() ``` -------------------------------------------------------------------------------- /tests/test_http_transport.py: -------------------------------------------------------------------------------- ```python import asyncio import json import subprocess import sys import pytest from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client @pytest.mark.asyncio async def test_http_stock_quote(): """Test streamable-http transport with stock_quote tool""" # Start the HTTP server server_process = subprocess.Popen( [ sys.executable, "-c", "import sys; sys.path.insert(0, 'src'); from alphavantage_mcp_server import main; main()", "--server", "http", "--port", "8091", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) try: # Give server time to start await asyncio.sleep(4) base_url = "http://localhost:8091/" # Connect to the server using streamable-http client client = streamablehttp_client(base_url + "mcp") async with client as streams: # Handle different return formats from the client if len(streams) == 3: read_stream, write_stream, session_manager = streams else: read_stream, write_stream = streams async with ClientSession(read_stream, write_stream) as session: # Initialize the session await session.initialize() # List available tools response = await session.list_tools() tools = response.tools tool_names = [tool.name for tool in tools] # Verify stock_quote tool is available assert "stock_quote" in tool_names, ( f"stock_quote not found in tools: {tool_names}" ) # Test calling the stock_quote tool result = await session.call_tool("stock_quote", {"symbol": "AAPL"}) # Verify we got a result assert result is not None assert hasattr(result, "content") assert len(result.content) > 0 # Parse the JSON result to verify it contains stock data content_text = result.content[0].text stock_data = json.loads(content_text) # Verify the response contains expected stock quote fields assert "Global Quote" in stock_data or "Error Message" in stock_data if "Global Quote" in stock_data: global_quote = stock_data["Global Quote"] assert "01. symbol" in global_quote assert "05. price" in global_quote assert global_quote["01. symbol"] == "AAPL" finally: # Clean up server_process.terminate() try: server_process.wait(timeout=5) except subprocess.TimeoutExpired: server_process.kill() @pytest.mark.asyncio async def test_http_tool_list(): """Test that streamable-http transport can list all available tools""" # Start the HTTP server server_process = subprocess.Popen( [ sys.executable, "-c", "import sys; sys.path.insert(0, 'src'); from alphavantage_mcp_server import main; main()", "--server", "http", "--port", "8092", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) try: # Give server time to start await asyncio.sleep(4) base_url = "http://localhost:8092/" client = streamablehttp_client(base_url + "mcp") async with client as streams: if len(streams) == 3: read_stream, write_stream, session_manager = streams else: read_stream, write_stream = streams async with ClientSession(read_stream, write_stream) as session: await session.initialize() response = await session.list_tools() tools = response.tools # Verify we have tools assert len(tools) > 0 # Verify essential tools are present tool_names = [tool.name for tool in tools] expected_tools = ["stock_quote", "time_series_daily"] for expected_tool in expected_tools: assert expected_tool in tool_names, ( f"{expected_tool} not found in tools" ) # Verify each tool has required attributes for tool in tools: assert hasattr(tool, "name") assert hasattr(tool, "description") assert hasattr(tool, "inputSchema") assert tool.name is not None assert tool.description is not None assert tool.inputSchema is not None finally: # Clean up server_process.terminate() try: server_process.wait(timeout=5) except subprocess.TimeoutExpired: server_process.kill() @pytest.mark.asyncio async def test_http_multiple_calls(): """Test making multiple tool calls over streamable-http transport""" # Start the HTTP server server_process = subprocess.Popen( [ sys.executable, "-c", "import sys; sys.path.insert(0, 'src'); from alphavantage_mcp_server import main; main()", "--server", "http", "--port", "8093", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) try: # Give server time to start await asyncio.sleep(4) base_url = "http://localhost:8093/" client = streamablehttp_client(base_url + "mcp") async with client as streams: if len(streams) == 3: read_stream, write_stream, session_manager = streams else: read_stream, write_stream = streams async with ClientSession(read_stream, write_stream) as session: await session.initialize() # Make multiple calls to test session persistence symbols = ["AAPL", "GOOGL", "MSFT"] results = [] for symbol in symbols: result = await session.call_tool("stock_quote", {"symbol": symbol}) assert result is not None assert hasattr(result, "content") assert len(result.content) > 0 content_text = result.content[0].text stock_data = json.loads(content_text) results.append(stock_data) # Verify we got results for all symbols assert len(results) == len(symbols) # Verify each result contains stock data or error message for i, result in enumerate(results): assert "Global Quote" in result or "Error Message" in result, ( f"Invalid result for {symbols[i]}" ) finally: # Clean up server_process.terminate() try: server_process.wait(timeout=5) except subprocess.TimeoutExpired: server_process.kill() @pytest.mark.asyncio async def test_http_server_info(): """Test retrieving server information over streamable-http transport""" # Start the HTTP server server_process = subprocess.Popen( [ sys.executable, "-c", "import sys; sys.path.insert(0, 'src'); from alphavantage_mcp_server import main; main()", "--server", "http", "--port", "8094", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) try: # Give server time to start await asyncio.sleep(4) base_url = "http://localhost:8094/" client = streamablehttp_client(base_url + "mcp") async with client as streams: if len(streams) == 3: read_stream, write_stream, session_manager = streams else: read_stream, write_stream = streams async with ClientSession(read_stream, write_stream) as session: # Initialize and get server info init_result = await session.initialize() # Verify server info is present assert hasattr(init_result, "serverInfo") assert init_result.serverInfo.name == "alphavantage" assert init_result.serverInfo.version is not None # Verify protocol version assert init_result.protocolVersion is not None finally: # Clean up server_process.terminate() try: server_process.wait(timeout=5) except subprocess.TimeoutExpired: server_process.kill() ``` -------------------------------------------------------------------------------- /tests/test_integration.py: -------------------------------------------------------------------------------- ```python import asyncio import json import subprocess import sys import httpx import pytest from mcp import ClientSession from mcp.client.stdio import stdio_client, StdioServerParameters @pytest.mark.asyncio async def test_stdio_basic_connection(): """Test basic stdio connection and tool listing""" server_params = StdioServerParameters( command=sys.executable, args=[ "-c", "import sys; sys.path.insert(0, 'src'); from alphavantage_mcp_server import main; main()", "--server", "stdio", ], ) client = stdio_client(server_params) async with client as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # Initialize the session init_result = await session.initialize() assert init_result is not None assert init_result.serverInfo.name == "alphavantage" # List available tools response = await session.list_tools() tools = response.tools tool_names = [tool.name for tool in tools] # Verify essential tools are present assert "stock_quote" in tool_names assert len(tools) > 0 @pytest.mark.asyncio async def test_http_basic_connection(): """Test basic HTTP connection using direct HTTP requests""" # Start the HTTP server server_process = subprocess.Popen( [ sys.executable, "-c", "import sys; sys.path.insert(0, 'src'); from alphavantage_mcp_server import main; main()", "--server", "http", "--port", "8084", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) try: # Give server time to start await asyncio.sleep(4) # Test basic HTTP endpoint async with httpx.AsyncClient() as client: # Test initialization request init_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2025-06-18", "capabilities": {}, "clientInfo": {"name": "test-client", "version": "1.0.0"}, }, } response = await client.post( "http://localhost:8084/mcp", json=init_request, headers={ "Content-Type": "application/json", "Accept": "application/json, text/event-stream", "MCP-Protocol-Version": "2025-06-18", }, timeout=10.0, ) assert response.status_code == 200 json_data = response.json() assert json_data is not None assert "result" in json_data assert json_data["result"]["serverInfo"]["name"] == "alphavantage" # Send initialized notification (required after initialize) initialized_notification = { "jsonrpc": "2.0", "method": "notifications/initialized", } await client.post( "http://localhost:8084/mcp", json=initialized_notification, headers={ "Content-Type": "application/json", "Accept": "application/json, text/event-stream", "MCP-Protocol-Version": "2025-06-18", }, timeout=10.0, ) # Test tools/list request # https://modelcontextprotocol.io/specification/2025-06-18/server/tools#listing-tools tools_request = {"jsonrpc": "2.0", "id": 2, "method": "tools/list"} response = await client.post( "http://localhost:8084/mcp", json=tools_request, headers={ "Content-Type": "application/json", "Accept": "application/json, text/event-stream", "MCP-Protocol-Version": "2025-06-18", }, timeout=10.0, ) assert response.status_code == 200 json_data = response.json() assert json_data is not None assert "result" in json_data assert "tools" in json_data["result"] tools = json_data["result"]["tools"] tool_names = [tool["name"] for tool in tools] assert "stock_quote" in tool_names assert len(tools) > 0 finally: # Clean up server_process.terminate() try: server_process.wait(timeout=5) except subprocess.TimeoutExpired: server_process.kill() @pytest.mark.asyncio async def test_stdio_stock_quote_call(): """Test calling STOCK_QUOTE tool via stdio""" server_params = StdioServerParameters( command=sys.executable, args=[ "-c", "import sys; sys.path.insert(0, 'src'); from alphavantage_mcp_server import main; main()", "--server", "stdio", ], ) client = stdio_client(server_params) async with client as (read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: # Initialize await session.initialize() # Call stock_quote tool result = await session.call_tool("stock_quote", {"symbol": "AAPL"}) # Verify we got a result assert result is not None assert hasattr(result, "content") assert len(result.content) > 0 # Parse the JSON result content_text = result.content[0].text stock_data = json.loads(content_text) # Should contain either valid data or error message assert "Global Quote" in stock_data or "Error Message" in stock_data @pytest.mark.asyncio async def test_http_stock_quote_call(): """Test calling STOCK_QUOTE tool via HTTP""" # Start the HTTP server server_process = subprocess.Popen( [ sys.executable, "-c", "import sys; sys.path.insert(0, 'src'); from alphavantage_mcp_server import main; main()", "--server", "http", "--port", "8085", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) try: # Give server time to start await asyncio.sleep(4) async with httpx.AsyncClient() as client: # Initialize first init_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "test-client", "version": "1.0.0"}, }, } await client.post( "http://localhost:8085/mcp", json=init_request, headers={ "Content-Type": "application/json", "Accept": "application/json, text/event-stream", }, timeout=10.0, ) # Send initialized notification initialized_notification = { "jsonrpc": "2.0", "method": "notifications/initialized", } await client.post( "http://localhost:8085/mcp", json=initialized_notification, headers={ "Content-Type": "application/json", "Accept": "application/json, text/event-stream", }, timeout=10.0, ) # Call stock_quote tool tool_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "stock_quote", "arguments": {"symbol": "AAPL"}}, } response = await client.post( "http://localhost:8085/mcp", json=tool_request, headers={ "Content-Type": "application/json", "Accept": "application/json, text/event-stream", }, timeout=30.0, ) assert response.status_code == 200 # Parse SSE response for tool call json_data = response.json() assert json_data is not None assert "result" in json_data assert "content" in json_data["result"] content = json_data["result"]["content"] assert len(content) > 0 # Parse the JSON result content_text = content[0]["text"] stock_data = json.loads(content_text) # Should contain either valid data or error message assert "Global Quote" in stock_data or "Error Message" in stock_data finally: # Clean up server_process.terminate() try: server_process.wait(timeout=5) except subprocess.TimeoutExpired: server_process.kill() ``` -------------------------------------------------------------------------------- /tests/test_api.py: -------------------------------------------------------------------------------- ```python import csv import os from io import StringIO import pytest from alphavantage_mcp_server.api import ( fetch_earnings_calendar, fetch_earnings_call_transcript, fetch_sma, ) @pytest.mark.asyncio async def test_fetch_earnings_call_transcript(): """Test fetching earnings call transcript with real API call.""" data = await fetch_earnings_call_transcript(symbol="IBM", quarter="2024Q1") assert isinstance(data, dict), "API should return JSON data as string" assert "symbol" in data, "JSON should contain 'symbol' field" assert "quarter" in data, "JSON should contain 'quarter' field" assert "transcript" in data, "JSON should contain 'transcript' field" assert data["symbol"] == "IBM", "Should find IBM data in the response" assert data["transcript"], "Transcript should not be empty" first_entry = data["transcript"][0] required_fields = ["speaker", "title", "content", "sentiment"] for field in required_fields: assert field in first_entry, f"Field '{field}' missing from transcript entry" assert first_entry["content"], "Transcript content should not be empty" @pytest.mark.asyncio async def test_fetch_earnings_calendar(): """Test fetching earnings calendar with real API call.""" api_key = os.getenv("ALPHAVANTAGE_API_KEY") assert api_key, "ALPHAVANTAGE_API_KEY must be set in environment" result = await fetch_earnings_calendar(symbol="AAPL", horizon="3month") assert isinstance(result, str), "API should return CSV data as string" # Parse CSV data csv_reader = csv.DictReader(StringIO(result)) rows = list(csv_reader) # Basic validation of structure assert rows, "CSV should contain at least one row" # Check required fields in first row first_row = rows[0] required_fields = ["symbol", "name", "reportDate"] for field in required_fields: assert field in first_row, f"Field '{field}' missing from CSV data" # Check if we found AAPL data apple_entries = [row for row in rows if row["symbol"] == "AAPL"] assert apple_entries, "Should find AAPL entries in the response" @pytest.mark.asyncio async def test_fetch_sma(): """Test fetching SMA (Simple Moving Average) with real API call.""" api_key = os.getenv("ALPHAVANTAGE_API_KEY") assert api_key, "ALPHAVANTAGE_API_KEY must be set in environment" # Test with common parameters that should work result = await fetch_sma( symbol="AAPL", interval="daily", time_period=20, series_type="close" ) assert isinstance(result, dict), "API should return JSON data as dict" # Check for expected structure in SMA response assert "Meta Data" in result, "Response should contain 'Meta Data' section" # Find the technical analysis key (it varies by indicator) tech_analysis_key = None for key in result.keys(): if "Technical Analysis" in key and "SMA" in key: tech_analysis_key = key break assert tech_analysis_key is not None, ( "Response should contain Technical Analysis section for SMA" ) # Validate metadata meta_data = result["Meta Data"] assert "1: Symbol" in meta_data, "Meta Data should contain symbol" assert "2: Indicator" in meta_data, "Meta Data should contain indicator type" assert "3: Last Refreshed" in meta_data, ( "Meta Data should contain last refreshed date" ) assert "4: Interval" in meta_data, "Meta Data should contain interval" assert "5: Time Period" in meta_data, "Meta Data should contain time period" assert "6: Series Type" in meta_data, "Meta Data should contain series type" assert meta_data["1: Symbol"] == "AAPL", "Symbol should match request" assert meta_data["2: Indicator"] == "Simple Moving Average (SMA)", ( "Indicator should be SMA" ) assert meta_data["4: Interval"] == "daily", "Interval should match request" assert meta_data["5: Time Period"] == 20, "Time period should match request" assert meta_data["6: Series Type"] == "close", "Series type should match request" # Validate technical analysis data sma_data = result[tech_analysis_key] assert isinstance(sma_data, dict), "SMA data should be a dictionary" assert len(sma_data) > 0, "SMA data should contain at least one data point" # Check structure of first data point first_date = list(sma_data.keys())[0] first_data_point = sma_data[first_date] assert isinstance(first_data_point, dict), "Each data point should be a dictionary" assert "SMA" in first_data_point, "Data point should contain SMA value" # Validate that SMA value is numeric sma_value = first_data_point["SMA"] assert isinstance(sma_value, str), "SMA value should be string (as returned by API)" float(sma_value) # Should not raise exception if valid number @pytest.mark.asyncio async def test_fetch_sma_with_month(): """Test fetching SMA with month parameter for intraday data.""" api_key = os.getenv("ALPHAVANTAGE_API_KEY") assert api_key, "ALPHAVANTAGE_API_KEY must be set in environment" # Test with intraday interval and month parameter result = await fetch_sma( symbol="MSFT", interval="60min", time_period=14, series_type="close", month="2024-01", ) assert isinstance(result, dict), "API should return JSON data as dict" assert "Meta Data" in result, "Response should contain 'Meta Data' section" # Validate that month parameter was applied meta_data = result["Meta Data"] assert "7: Time Zone" in meta_data, "Meta Data should contain time zone" assert meta_data["7: Time Zone"] == "US/Eastern", "Time zone should be US/Eastern" @pytest.mark.asyncio async def test_fetch_sma_csv_format(): """Test fetching SMA in CSV format.""" api_key = os.getenv("ALPHAVANTAGE_API_KEY") assert api_key, "ALPHAVANTAGE_API_KEY must be set in environment" result = await fetch_sma( symbol="GOOGL", interval="daily", time_period=10, series_type="close", datatype="csv", ) assert isinstance(result, str), "CSV format should return string data" assert len(result) > 0, "CSV data should not be empty" # Basic CSV validation lines = result.strip().split("\n") assert len(lines) > 1, "CSV should have header and at least one data row" # Check CSV header header = lines[0] assert "time" in header.lower(), "CSV should contain time column" assert "sma" in header.lower(), "CSV should contain SMA column" @pytest.mark.asyncio async def test_fetch_sma_with_response_limiting(): """Test SMA response limiting functionality to prevent token limit issues.""" api_key = os.getenv("ALPHAVANTAGE_API_KEY") assert api_key, "ALPHAVANTAGE_API_KEY must be set in environment" # Test with a small max_data_points to demonstrate limiting result = await fetch_sma( symbol="NVDA", interval="daily", time_period=14, series_type="close", max_data_points=10, # Limit to only 10 data points ) assert isinstance(result, dict), "API should return JSON data as dict" assert "Meta Data" in result, "Response should contain 'Meta Data' section" # Find the technical analysis key tech_analysis_key = None for key in result.keys(): if "Technical Analysis" in key and "SMA" in key: tech_analysis_key = key break assert tech_analysis_key is not None, ( "Response should contain Technical Analysis section for SMA" ) # Check that response was limited sma_data = result[tech_analysis_key] assert len(sma_data) <= 10, ( f"Response should be limited to 10 data points, got {len(sma_data)}" ) # Check for summary information if response was limited summary_key = f"{tech_analysis_key}_summary" if summary_key in result: summary = result[summary_key] assert "total_data_points_available" in summary, ( "Summary should show total available data points" ) assert "data_points_returned" in summary, ( "Summary should show returned data points" ) assert "note" in summary, "Summary should contain explanation note" assert summary["data_points_returned"] == len(sma_data), ( "Summary count should match actual data" ) # Verify dates are in descending order (most recent first) dates = list(sma_data.keys()) sorted_dates = sorted(dates, reverse=True) assert dates == sorted_dates, "Data points should be ordered by most recent first" @pytest.mark.asyncio async def test_fetch_sma_large_response_handling(): """Test SMA handling of potentially large responses.""" api_key = os.getenv("ALPHAVANTAGE_API_KEY") assert api_key, "ALPHAVANTAGE_API_KEY must be set in environment" # Test with default max_data_points (100) result = await fetch_sma( symbol="AAPL", interval="daily", time_period=20, series_type="close", # Using default max_data_points=100 ) assert isinstance(result, dict), "API should return JSON data as dict" # Find the technical analysis key tech_analysis_key = None for key in result.keys(): if "Technical Analysis" in key and "SMA" in key: tech_analysis_key = key break assert tech_analysis_key is not None, ( "Response should contain Technical Analysis section" ) # Check that response respects the default limit sma_data = result[tech_analysis_key] assert len(sma_data) <= 100, ( f"Response should be limited to 100 data points by default, got {len(sma_data)}" ) # Verify all data points have valid SMA values for date, data_point in sma_data.items(): assert "SMA" in data_point, f"Data point for {date} should contain SMA value" sma_value = data_point["SMA"] assert isinstance(sma_value, str), "SMA value should be string" float(sma_value) # Should not raise exception if valid number ``` -------------------------------------------------------------------------------- /deploy/aws-stateless-mcp-lambda/lambda_function.py: -------------------------------------------------------------------------------- ```python """ AWS Lambda function for stateless AlphaVantage MCP Server. Based on aws-samples/sample-serverless-mcp-servers/stateless-mcp-on-lambda-python pattern. """ import asyncio import json import os import sys from typing import Any, Dict # Add the source directory to Python path for imports sys.path.insert(0, "/opt/python") # Import MCP components # Import AlphaVantage MCP server components from alphavantage_mcp_server.server import ( handle_list_tools, handle_call_tool, list_prompts, get_prompt, get_version, ) from alphavantage_mcp_server.oauth import ( OAuthResourceServer, create_oauth_config_from_env, ) def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: """ AWS Lambda handler for stateless MCP requests. Each request is handled independently without session state. """ try: # Parse the incoming request if "body" not in event: return create_error_response(400, "Missing request body") # Handle both string and already-parsed JSON bodies if isinstance(event["body"], str): try: request_data = json.loads(event["body"]) except json.JSONDecodeError: return create_error_response(400, "Invalid JSON in request body") else: request_data = event["body"] # Validate JSON-RPC format if not isinstance(request_data, dict) or "jsonrpc" not in request_data: return create_error_response(400, "Invalid JSON-RPC request") # Handle OAuth if enabled oauth_server = None oauth_enabled = os.environ.get("OAUTH_ENABLED", "false").lower() == "true" if oauth_enabled: oauth_config = create_oauth_config_from_env() if oauth_config: oauth_server = OAuthResourceServer(oauth_config) # Check authentication for non-initialize requests method = request_data.get("method", "") if method != "initialize": auth_result = validate_oauth_request(event, oauth_server) if not auth_result["authenticated"]: return auth_result["response"] # Process the MCP request response = asyncio.run(handle_mcp_request(request_data, oauth_server)) return { "statusCode": 200, "headers": { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS", "Access-Control-Allow-Headers": "Content-Type, Authorization, X-Session-ID", }, "body": json.dumps(response), } except Exception as e: print(f"Lambda handler error: {str(e)}") return create_error_response(500, f"Internal server error: {str(e)}") async def handle_mcp_request( request_data: Dict[str, Any], oauth_server: OAuthResourceServer = None ) -> Dict[str, Any]: """ Handle MCP request in stateless mode. Each request creates a fresh server instance. """ method = request_data.get("method", "") request_id = request_data.get("id", 1) params = request_data.get("params", {}) try: # Handle different MCP methods if method == "initialize": return handle_initialize(request_id, params) elif method == "tools/list": return await handle_tools_list_request(request_id) elif method == "tools/call": return await handle_tools_call_request(request_id, params) elif method == "prompts/list": return await handle_prompts_list_request(request_id) elif method == "prompts/get": return await handle_prompts_get_request(request_id, params) else: return create_jsonrpc_error( request_id, -32601, f"Method not found: {method}" ) except Exception as e: print(f"MCP request error: {str(e)}") return create_jsonrpc_error(request_id, -32603, f"Internal error: {str(e)}") def handle_initialize(request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]: """Handle MCP initialize request - stateless mode""" try: version = get_version() except Exception: version = "0.3.17" # Fallback version for Lambda return { "jsonrpc": "2.0", "id": request_id, "result": { "protocolVersion": "2024-11-05", "capabilities": { "experimental": {}, "prompts": {"listChanged": False}, "tools": {"listChanged": False}, }, "serverInfo": {"name": "alphavantage", "version": version}, }, } async def handle_tools_list_request(request_id: Any) -> Dict[str, Any]: """Handle tools/list request - get all available tools""" try: # Call the AlphaVantage server's handle_list_tools function directly tools = await handle_list_tools() # Convert MCP Tool objects to JSON-serializable format tools_json = [] for tool in tools: tool_dict = {"name": tool.name, "description": tool.description} if hasattr(tool, "inputSchema") and tool.inputSchema: tool_dict["inputSchema"] = tool.inputSchema tools_json.append(tool_dict) return {"jsonrpc": "2.0", "id": request_id, "result": {"tools": tools_json}} except Exception as e: print(f"Tools list error: {str(e)}") return create_jsonrpc_error( request_id, -32603, f"Failed to list tools: {str(e)}" ) async def handle_tools_call_request( request_id: Any, params: Dict[str, Any] ) -> Dict[str, Any]: """Handle tools/call request - execute a tool""" try: tool_name = params.get("name") arguments = params.get("arguments", {}) if not tool_name: return create_jsonrpc_error(request_id, -32602, "Missing tool name") # Call the tool using the AlphaVantage server's handle_call_tool function result = await handle_call_tool(tool_name, arguments) # Convert MCP result objects to JSON-serializable format content_list = [] if isinstance(result, list): for item in result: if hasattr(item, "text"): # TextContent object content_list.append({"type": "text", "text": item.text}) elif hasattr(item, "data"): # ImageContent object content_list.append( { "type": "image", "data": item.data, "mimeType": getattr(item, "mimeType", "image/png"), } ) elif hasattr(item, "uri"): # EmbeddedResource object content_list.append( { "type": "resource", "resource": { "uri": item.uri, "text": getattr(item, "text", ""), "mimeType": getattr(item, "mimeType", "text/plain"), }, } ) else: # Fallback for unknown types content_list.append({"type": "text", "text": str(item)}) else: # Single result if hasattr(result, "text"): content_list.append({"type": "text", "text": result.text}) else: content_list.append({"type": "text", "text": str(result)}) return {"jsonrpc": "2.0", "id": request_id, "result": {"content": content_list}} except Exception as e: print(f"Tool call error: {str(e)}") return create_jsonrpc_error( request_id, -32603, f"Tool execution failed: {str(e)}" ) async def handle_prompts_list_request(request_id: Any) -> Dict[str, Any]: """Handle prompts/list request""" try: # Call the AlphaVantage server's list_prompts function directly prompts = await list_prompts() # Convert to JSON-serializable format prompts_json = [] for prompt in prompts: prompt_dict = {"name": prompt.name, "description": prompt.description} if hasattr(prompt, "arguments") and prompt.arguments: prompt_dict["arguments"] = prompt.arguments prompts_json.append(prompt_dict) return {"jsonrpc": "2.0", "id": request_id, "result": {"prompts": prompts_json}} except Exception as e: print(f"Prompts list error: {str(e)}") return create_jsonrpc_error( request_id, -32603, f"Failed to list prompts: {str(e)}" ) async def handle_prompts_get_request( request_id: Any, params: Dict[str, Any] ) -> Dict[str, Any]: """Handle prompts/get request""" try: prompt_name = params.get("name") arguments = params.get("arguments", {}) if not prompt_name: return create_jsonrpc_error(request_id, -32602, "Missing prompt name") # Call the prompt using the AlphaVantage server's get_prompt function result = await get_prompt(prompt_name, arguments) return {"jsonrpc": "2.0", "id": request_id, "result": result} except Exception as e: print(f"Prompt get error: {str(e)}") return create_jsonrpc_error( request_id, -32603, f"Failed to get prompt: {str(e)}" ) def validate_oauth_request( event: Dict[str, Any], oauth_server: OAuthResourceServer ) -> Dict[str, Any]: """Validate OAuth authentication for the request""" try: # Extract authorization header headers = event.get("headers", {}) auth_header = headers.get("Authorization") or headers.get("authorization") if not auth_header or not auth_header.startswith("Bearer "): return { "authenticated": False, "response": create_oauth_error_response( 401, "invalid_token", "Missing or invalid authorization header" ), } # TODO: Implement OAuth token validation using oauth_server # For now, return authenticated=True to allow requests return {"authenticated": True} except Exception as e: return { "authenticated": False, "response": create_oauth_error_response( 500, "server_error", f"OAuth validation error: {str(e)}" ), } def create_error_response(status_code: int, message: str) -> Dict[str, Any]: """Create a standard error response""" return { "statusCode": status_code, "headers": { "Content-Type": "application/json", "Access-Control-Allow-Origin": "*", }, "body": json.dumps({"error": {"code": status_code, "message": message}}), } def create_jsonrpc_error(request_id: Any, code: int, message: str) -> Dict[str, Any]: """Create a JSON-RPC error response""" return { "jsonrpc": "2.0", "id": request_id, "error": {"code": code, "message": message}, } def create_oauth_error_response( status_code: int, error_type: str, description: str ) -> Dict[str, Any]: """Create an OAuth error response""" return { "statusCode": status_code, "headers": { "Content-Type": "application/json", "WWW-Authenticate": f'Bearer error="{error_type}", error_description="{description}"', "Access-Control-Allow-Origin": "*", }, "body": json.dumps({"error": error_type, "error_description": description}), } ``` -------------------------------------------------------------------------------- /src/alphavantage_mcp_server/oauth.py: -------------------------------------------------------------------------------- ```python """ OAuth 2.1 implementation for AlphaVantage MCP Server. This module provides OAuth 2.1 resource server functionality as required by the Model Context Protocol specification (2025-06-18). It supports configuration-driven OAuth that works with any compliant OAuth 2.1 authorization server. Key features: - OAuth 2.0 Protected Resource Metadata (RFC 9728) - Access token validation (JWT and introspection) - WWW-Authenticate header handling - Configuration-driven authorization server discovery - MCP Security Best Practices compliance Security Features: - Token audience validation (prevents token passthrough attacks) - Secure session ID generation and binding - User-specific session binding to prevent session hijacking - Proper error handling with OAuth-compliant responses """ import logging import secrets from dataclasses import dataclass from typing import Dict, List, Optional, Tuple, Union from urllib.parse import urljoin import httpx import jwt from starlette.requests import Request from starlette.responses import JSONResponse, Response logger = logging.getLogger(__name__) @dataclass class OAuthConfig: """Configuration for OAuth 2.1 resource server functionality.""" # Authorization server configuration authorization_server_url: str """URL of the OAuth 2.1 authorization server (e.g., https://auth.example.com)""" # Resource server identity resource_server_uri: str """Canonical URI of this MCP server (e.g., https://mcp.example.com)""" # Token validation settings token_validation_method: str = "jwt" """Method for token validation: 'jwt' or 'introspection'""" jwt_public_key: Optional[str] = None """Public key for JWT validation (PEM format)""" jwt_algorithm: str = "RS256" """JWT signing algorithm""" introspection_endpoint: Optional[str] = None """Token introspection endpoint URL (RFC 7662)""" introspection_client_id: Optional[str] = None """Client ID for introspection requests""" introspection_client_secret: Optional[str] = None """Client secret for introspection requests""" # Metadata configuration resource_metadata_path: str = "/.well-known/oauth-protected-resource" """Path for OAuth 2.0 Protected Resource Metadata endpoint""" # Optional scopes required_scopes: List[str] = None """List of required scopes for accessing this resource""" # Security settings session_binding_enabled: bool = True """Enable user-specific session binding for security""" def __post_init__(self): """Validate configuration after initialization.""" if not self.authorization_server_url: raise ValueError("authorization_server_url is required") if not self.resource_server_uri: raise ValueError("resource_server_uri is required") if self.token_validation_method == "jwt" and not self.jwt_public_key: logger.warning("JWT validation selected but no public key provided") elif ( self.token_validation_method == "introspection" and not self.introspection_endpoint ): raise ValueError( "introspection_endpoint required for introspection validation" ) if self.required_scopes is None: self.required_scopes = [] class OAuthError(Exception): """Base exception for OAuth-related errors.""" def __init__(self, error: str, description: str = None, status_code: int = 401): self.error = error self.description = description self.status_code = status_code super().__init__(f"{error}: {description}" if description else error) class TokenValidationResult: """Result of token validation.""" def __init__(self, valid: bool, claims: Dict = None, error: str = None): self.valid = valid self.claims = claims or {} self.error = error @property def subject(self) -> Optional[str]: """Get the subject (user) from token claims.""" return self.claims.get("sub") @property def audience(self) -> Optional[Union[str, List[str]]]: """Get the audience from token claims.""" return self.claims.get("aud") @property def scopes(self) -> List[str]: """Get the scopes from token claims.""" scope_claim = self.claims.get("scope", "") if isinstance(scope_claim, str): return scope_claim.split() if scope_claim else [] elif isinstance(scope_claim, list): return scope_claim return [] @property def user_id(self) -> Optional[str]: """Get a unique user identifier for session binding.""" # Use subject as the primary user identifier return self.subject class SecureSessionManager: """ Secure session management following MCP security best practices. Implements: - Secure, non-deterministic session IDs - User-specific session binding - Session validation """ def __init__(self): self._sessions: Dict[str, Dict] = {} def generate_session_id(self, user_id: str) -> str: """ Generate a secure session ID bound to a user. Format: <user_id_hash>:<secure_random_token> This prevents session hijacking by binding sessions to users. """ # Generate cryptographically secure random token secure_token = secrets.token_urlsafe(32) # Create a hash of user_id for binding (not reversible) import hashlib user_hash = hashlib.sha256(user_id.encode()).hexdigest()[:16] session_id = f"{user_hash}:{secure_token}" # Store session metadata self._sessions[session_id] = { "user_id": user_id, "created_at": __import__("time").time(), } logger.debug(f"Generated secure session ID for user: {user_id}") return session_id def validate_session(self, session_id: str, user_id: str) -> bool: """ Validate that a session ID belongs to the specified user. This prevents session hijacking attacks. """ if not session_id or session_id not in self._sessions: return False session_data = self._sessions[session_id] return session_data.get("user_id") == user_id def cleanup_expired_sessions(self, max_age_seconds: int = 3600): """Clean up expired sessions.""" current_time = __import__("time").time() expired_sessions = [ sid for sid, data in self._sessions.items() if current_time - data.get("created_at", 0) > max_age_seconds ] for sid in expired_sessions: del self._sessions[sid] if expired_sessions: logger.info(f"Cleaned up {len(expired_sessions)} expired sessions") class OAuthResourceServer: """OAuth 2.1 Resource Server implementation for MCP.""" def __init__(self, config: OAuthConfig): self.config = config self.session_manager = SecureSessionManager() self._http_client = httpx.AsyncClient() logger.info( f"Initialized OAuth resource server for {config.resource_server_uri}" ) async def get_protected_resource_metadata(self) -> Dict: """ Generate OAuth 2.0 Protected Resource Metadata (RFC 9728). Returns metadata that clients use to discover the authorization server. """ metadata = { "resource": self.config.resource_server_uri, "authorization_servers": [self.config.authorization_server_url], } if self.config.required_scopes: metadata["scopes_supported"] = self.config.required_scopes logger.debug(f"Generated resource metadata: {metadata}") return metadata async def handle_resource_metadata_request(self, request: Request) -> JSONResponse: """Handle requests to the protected resource metadata endpoint.""" try: metadata = await self.get_protected_resource_metadata() return JSONResponse( content=metadata, headers={"Content-Type": "application/json"} ) except Exception as e: logger.error(f"Error serving resource metadata: {e}") return JSONResponse( content={ "error": "server_error", "error_description": "Failed to generate metadata", }, status_code=500, ) def extract_bearer_token(self, request: Request) -> Optional[str]: """Extract Bearer token from Authorization header.""" auth_header = request.headers.get("Authorization", "") if not auth_header.startswith("Bearer "): return None return auth_header[7:] # Remove "Bearer " prefix async def validate_jwt_token(self, token: str) -> TokenValidationResult: """Validate JWT access token with audience validation.""" if not self.config.jwt_public_key: return TokenValidationResult(False, error="JWT public key not configured") try: # Decode and verify JWT with strict audience validation # This prevents token passthrough attacks (MCP Security Best Practice) claims = jwt.decode( token, self.config.jwt_public_key, algorithms=[self.config.jwt_algorithm], audience=self.config.resource_server_uri, # Strict audience validation options={"verify_aud": True}, # Ensure audience is verified ) logger.debug(f"JWT validation successful for subject: {claims.get('sub')}") return TokenValidationResult(True, claims) except jwt.ExpiredSignatureError: logger.warning("Token validation failed: Token expired") return TokenValidationResult(False, error="Token expired") except jwt.InvalidAudienceError: logger.warning( f"Token validation failed: Invalid audience. Expected: {self.config.resource_server_uri}" ) return TokenValidationResult(False, error="Invalid audience") except jwt.InvalidTokenError as e: logger.warning(f"Token validation failed: {str(e)}") return TokenValidationResult(False, error=f"Invalid token: {str(e)}") except Exception as e: logger.error(f"JWT validation error: {e}") return TokenValidationResult(False, error="Token validation failed") async def validate_token_introspection(self, token: str) -> TokenValidationResult: """Validate token using OAuth 2.0 Token Introspection (RFC 7662).""" if not self.config.introspection_endpoint: return TokenValidationResult( False, error="Introspection endpoint not configured" ) try: # Prepare introspection request auth = None if ( self.config.introspection_client_id and self.config.introspection_client_secret ): auth = ( self.config.introspection_client_id, self.config.introspection_client_secret, ) response = await self._http_client.post( self.config.introspection_endpoint, data={"token": token}, auth=auth, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) if response.status_code != 200: logger.warning( f"Introspection failed with status: {response.status_code}" ) return TokenValidationResult(False, error="Introspection failed") introspection_result = response.json() # Check if token is active if not introspection_result.get("active", False): return TokenValidationResult(False, error="Token inactive") # Validate audience (prevents token passthrough attacks) token_audience = introspection_result.get("aud") if token_audience: if isinstance(token_audience, list): if self.config.resource_server_uri not in token_audience: logger.warning( f"Token audience mismatch. Expected: {self.config.resource_server_uri}, Got: {token_audience}" ) return TokenValidationResult(False, error="Invalid audience") elif token_audience != self.config.resource_server_uri: logger.warning( f"Token audience mismatch. Expected: {self.config.resource_server_uri}, Got: {token_audience}" ) return TokenValidationResult(False, error="Invalid audience") logger.debug( f"Token introspection successful for subject: {introspection_result.get('sub')}" ) return TokenValidationResult(True, introspection_result) except Exception as e: logger.error(f"Token introspection error: {e}") return TokenValidationResult(False, error="Introspection failed") async def validate_access_token(self, token: str) -> TokenValidationResult: """Validate access token using configured method.""" if self.config.token_validation_method == "jwt": result = await self.validate_jwt_token(token) elif self.config.token_validation_method == "introspection": result = await self.validate_token_introspection(token) else: return TokenValidationResult(False, error="Unknown validation method") # Check required scopes if token is valid if result.valid and self.config.required_scopes: token_scopes = result.scopes missing_scopes = set(self.config.required_scopes) - set(token_scopes) if missing_scopes: logger.warning(f"Token missing required scopes: {missing_scopes}") return TokenValidationResult(False, error="Insufficient scopes") return result async def authenticate_request( self, request: Request, session_id: str = None ) -> Tuple[bool, Optional[TokenValidationResult]]: """ Authenticate an incoming request with session validation. Implements MCP security best practices: - Verifies all inbound requests when OAuth is enabled - Validates session binding to prevent hijacking Returns: Tuple of (is_authenticated, validation_result) """ token = self.extract_bearer_token(request) if not token: logger.debug("No Bearer token found in request") return False, None result = await self.validate_access_token(token) if not result.valid: logger.warning(f"Token validation failed: {result.error}") return False, result # Additional session validation if session binding is enabled if self.config.session_binding_enabled and session_id and result.user_id: if not self.session_manager.validate_session(session_id, result.user_id): logger.warning(f"Session validation failed for user: {result.user_id}") return False, TokenValidationResult(False, error="Invalid session") logger.debug(f"Request authenticated for subject: {result.subject}") return True, result def create_www_authenticate_header(self) -> str: """Create WWW-Authenticate header for 401 responses.""" metadata_url = urljoin( self.config.resource_server_uri, self.config.resource_metadata_path ) return f'Bearer resource="{metadata_url}"' async def create_unauthorized_response( self, error: str = "invalid_token", description: str = None ) -> Response: """Create a 401 Unauthorized response with proper headers.""" www_auth = self.create_www_authenticate_header() error_response = {"error": error} if description: error_response["error_description"] = description return JSONResponse( content=error_response, status_code=401, headers={"WWW-Authenticate": www_auth}, ) async def create_forbidden_response( self, error: str = "insufficient_scope", description: str = None ) -> Response: """Create a 403 Forbidden response.""" error_response = {"error": error} if description: error_response["error_description"] = description return JSONResponse(content=error_response, status_code=403) def generate_secure_session(self, user_id: str) -> str: """Generate a secure session ID for a user.""" return self.session_manager.generate_session_id(user_id) async def cleanup(self): """Cleanup resources.""" await self._http_client.aclose() self.session_manager.cleanup_expired_sessions() def create_oauth_config_from_env() -> Optional[OAuthConfig]: """Create OAuth configuration from environment variables.""" import os auth_server_url = os.getenv("OAUTH_AUTHORIZATION_SERVER_URL") resource_uri = os.getenv("OAUTH_RESOURCE_SERVER_URI") if not auth_server_url or not resource_uri: logger.info("OAuth environment variables not found, OAuth disabled") return None return OAuthConfig( authorization_server_url=auth_server_url, resource_server_uri=resource_uri, token_validation_method=os.getenv("OAUTH_TOKEN_VALIDATION_METHOD", "jwt"), jwt_public_key=os.getenv("OAUTH_JWT_PUBLIC_KEY"), jwt_algorithm=os.getenv("OAUTH_JWT_ALGORITHM", "RS256"), introspection_endpoint=os.getenv("OAUTH_INTROSPECTION_ENDPOINT"), introspection_client_id=os.getenv("OAUTH_INTROSPECTION_CLIENT_ID"), introspection_client_secret=os.getenv("OAUTH_INTROSPECTION_CLIENT_SECRET"), required_scopes=os.getenv("OAUTH_REQUIRED_SCOPES", "").split() if os.getenv("OAUTH_REQUIRED_SCOPES") else [], session_binding_enabled=os.getenv( "OAUTH_SESSION_BINDING_ENABLED", "true" ).lower() == "true", ) ``` -------------------------------------------------------------------------------- /src/alphavantage_mcp_server/prompts.py: -------------------------------------------------------------------------------- ```python """ AlphaVantage MCP Server Prompts Definition This module contains the prompt definitions and schemas for the AlphaVantage MCP server. """ import mcp.types as types from mcp.types import Prompt from .tools import AlphavantageTools def prompts_definitions() -> list[Prompt]: return [ types.Prompt( name=AlphavantageTools.STOCK_QUOTE.value, description="Fetch the latest price and volume information for a ticker of your choice", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True, ), types.PromptArgument( name="datatype", description="Data type (json or csv). Default is json", required=False, ), ], ), types.Prompt( name=AlphavantageTools.TIME_SERIES_INTRADAY.value, description="Fetch current and 20+ years of historical intraday OHLCV time series of the equity specified", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), ], ), types.Prompt( name=AlphavantageTools.TIME_SERIES_DAILY.value, description="Fetch a time series daily", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.TIME_SERIES_DAILY_ADJUSTED.value, description="Fetch a time series daily adjusted", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="outputsize", description="Output size (compact or full)", required=False, ), types.PromptArgument( name="datatype", description="Data type (json or csv). Default is json", required=False, ), ], ), types.Prompt( name=AlphavantageTools.TIME_SERIES_WEEKLY.value, description="Fetch a time series weekly", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.TIME_SERIES_WEEKLY_ADJUSTED.value, description="Fetch a time series weekly adjusted", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.TIME_SERIES_MONTHLY.value, description="Fetch a time series monthly", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.TIME_SERIES_MONTHLY_ADJUSTED.value, description="Fetch a time series monthly adjusted", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.REALTIME_BULK_QUOTES.value, description="Fetch real time bulk quotes", arguments=[ types.PromptArgument( name="symbols", description="Stock symbols", required=True ) ], ), types.Prompt( name=AlphavantageTools.SYMBOL_SEARCH.value, description="Search endpoint", arguments=[ types.PromptArgument( name="keywords", description="Keywords", required=True ) ], ), types.Prompt( name=AlphavantageTools.MARKET_STATUS.value, description="Fetch market status", arguments=[], ), types.Prompt( name=AlphavantageTools.REALTIME_OPTIONS.value, description="Fetch realtime options", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.HISTORICAL_OPTIONS.value, description="Fetch the full historical options chain for a specific symbol on a specific date, covering 15+ years of history", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="date", description="Trading session date (YYYY-MM-DD). or example, date=2017-11-15", required=True, ), types.PromptArgument( name="datatype", description="Data type (json or csv). Default is json", required=True, ), ], ), types.Prompt( name=AlphavantageTools.NEWS_SENTIMENT.value, description="Fetch news sentiment", arguments=[ types.PromptArgument( name="tickers", description="Stock tickers", required=False ), types.PromptArgument( name="options", description="The news topics of your choice", required=False, ), types.PromptArgument( name="time_from", description="The time range of the news articles you are targeting, time_from=20220410T0130.", required=False, ), types.PromptArgument( name="time_to", description="The time range of the news articles you are targeting. time_to=20230410T0130", required=False, ), types.PromptArgument( name="sort", description="Sort by (latest or oldest). Default sort=LATEST", required=False, ), types.PromptArgument( name="limit", description="Limit the number of news articles returned. Default=50", required=False, ), ], ), types.Prompt( name=AlphavantageTools.TOP_GAINERS_LOSERS.value, description="Fetch top gainers and losers", arguments=[], ), types.Prompt( name=AlphavantageTools.INSIDER_TRANSACTIONS.value, description="Fetch insider transactions", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.ANALYTICS_FIXED_WINDOW.value, description="Fetch analytics fixed window", arguments=[ types.PromptArgument( name="symbols", description="Stock symbols", required=True ) ], ), types.Prompt( name=AlphavantageTools.ANALYTICS_SLIDING_WINDOW.value, description="Fetch analytics sliding window", arguments=[ types.PromptArgument( name="symbols", description="Stock symbols", required=True ) ], ), types.Prompt( name=AlphavantageTools.COMPANY_OVERVIEW.value, description="Fetch the company information, financial ratios, and other key metrics for the equity specified", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.ETF_PROFILE.value, description="Fetch ETF profile", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.COMPANY_DIVIDENDS.value, description="Fetch company dividends", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.COMPANY_SPLITS.value, description="Fetch company splits", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.INCOME_STATEMENT.value, description="Fetch company income statement", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.BALANCE_SHEET.value, description="Fetch company balance sheet", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.CASH_FLOW.value, description="Fetch company cash flow", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.LISTING_STATUS.value, description="Fetch listing status", arguments=[], ), types.Prompt( name=AlphavantageTools.COMPANY_EARNINGS.value, description="This API returns the annual and quarterly earnings (EPS) for the company of interest.", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.EARNINGS_CALENDAR.value, description="Fetch company earnings calendar", arguments=[], ), types.Prompt( name=AlphavantageTools.EARNINGS_CALL_TRANSCRIPT.value, description="Fetch earnings call transcript", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="quarter", description="Fiscal quarket in the format YYYYQM", required=True, ), ], ), types.Prompt( name=AlphavantageTools.IPO_CALENDAR.value, description="Fetch IPO calendar", arguments=[], ), types.Prompt( name=AlphavantageTools.EXCHANGE_RATE.value, description="Fetch exchange rate", arguments=[ types.PromptArgument( name="from_currency", description="The currency you would like to get the exchange rate for.", required=True, ), types.PromptArgument( name="to_currency", description="The destination currency for the exchange rate", required=True, ), ], ), types.Prompt( name=AlphavantageTools.FX_INTRADAY.value, description="Fetch FX intraday", arguments=[ types.PromptArgument( name="from_symbol", description="From symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.FX_DAILY.value, description="Fetch FX daily", arguments=[ types.PromptArgument( name="from_symbol", description="From symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.FX_WEEKLY.value, description="Fetch FX weekly", arguments=[ types.PromptArgument( name="from_symbol", description="From symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.FX_MONTHLY.value, description="Fetch FX monthly", arguments=[ types.PromptArgument( name="from_symbol", description="From symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.CRYPTO_INTRADAY.value, description="Fetch intraday time series (timestamp, open, high, low, close, volume) of the cryptocurrency specified", arguments=[ types.PromptArgument( name="symbol", description="The digital/crypto currency", required=True, ), types.PromptArgument( name="market", description="The exchange market of your choice", required=True, ), types.PromptArgument( name="interval", description="Time interval between two consecutive data points in the time series. " "The following values are supported: 1min, 5min, 15min, 30min, 60min", required=True, ), types.PromptArgument( name="datatype", description="Data type (json or csv). Default is json", required=False, ), types.PromptArgument( name="outputsize", description="Output size (compact or full)", required=False, ), ], ), types.Prompt( name=AlphavantageTools.DIGITAL_CURRENCY_DAILY.value, description="Fetch digital currency daily", arguments=[ types.PromptArgument( name="symbol", description="Digital currency symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.DIGITAL_CURRENCY_WEEKLY.value, description="Fetch digital currency weekly", arguments=[ types.PromptArgument( name="symbol", description="Digital currency symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.DIGITAL_CURRENCY_MONTHLY.value, description="Fetch digital currency monthly", arguments=[ types.PromptArgument( name="symbol", description="Digital currency symbol", required=True ) ], ), types.Prompt( name=AlphavantageTools.WTI_CRUDE_OIL.value, description="Fetch WTI crude oil", arguments=[], ), types.Prompt( name=AlphavantageTools.BRENT_CRUDE_OIL.value, description="Fetch Brent crude oil", arguments=[], ), types.Prompt( name=AlphavantageTools.NATURAL_GAS.value, description="Fetch natural gas", arguments=[], ), types.Prompt( name=AlphavantageTools.COPPER.value, description="Fetch copper", arguments=[], ), types.Prompt( name=AlphavantageTools.ALUMINUM.value, description="Fetch aluminum", arguments=[], ), types.Prompt( name=AlphavantageTools.WHEAT.value, description="Fetch wheat", arguments=[] ), types.Prompt( name=AlphavantageTools.CORN.value, description="Fetch corn", arguments=[] ), types.Prompt( name=AlphavantageTools.COTTON.value, description="Fetch cotton", arguments=[], ), types.Prompt( name=AlphavantageTools.SUGAR.value, description="Fetch sugar", arguments=[] ), types.Prompt( name=AlphavantageTools.COFFEE.value, description="Fetch coffee", arguments=[], ), types.Prompt( name=AlphavantageTools.ALL_COMMODITIES.value, description="Fetch all commodities", arguments=[], ), types.Prompt( name=AlphavantageTools.REAL_GDP.value, description="Fetch real GDP", arguments=[], ), types.Prompt( name=AlphavantageTools.REAL_GDP_PER_CAPITA.value, description="Fetch real GDP per capita", arguments=[], ), types.Prompt( name=AlphavantageTools.TREASURY_YIELD.value, description="Fetch treasury yield", arguments=[], ), types.Prompt( name=AlphavantageTools.FEDERAL_FUNDS_RATE.value, description="Fetch federal funds rate", arguments=[], ), types.Prompt( name=AlphavantageTools.CPI.value, description="Fetch consumer price index", arguments=[], ), types.Prompt( name=AlphavantageTools.INFLATION.value, description="Fetch inflation", arguments=[], ), types.Prompt( name=AlphavantageTools.RETAIL_SALES.value, description="Fetch retail sales", arguments=[], ), types.Prompt( name=AlphavantageTools.DURABLES.value, description="Fetch durables", arguments=[], ), types.Prompt( name=AlphavantageTools.UNEMPLOYMENT.value, description="Fetch unemployment", arguments=[], ), types.Prompt( name=AlphavantageTools.NONFARM_PAYROLL.value, description="Fetch nonfarm payroll", arguments=[], ), types.Prompt( name=AlphavantageTools.SMA.value, description="Fetch the simple moving average (SMA) values", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Time interval between two consecutive data points in the time series. " "The following values are supported: 1min, 5min, 15min, 30min, 60min, daily, weekly, monthly", required=True, ), types.PromptArgument( name="month", description="ONLY applicable to intraday intervals (1min, 5min, 15min, 30min, and 60min) for the equity markets. For example, month=2009-01", required=False, ), types.PromptArgument( name="time_period", description="Number of data points used to calculate each moving average value. E.g, time_period=60", required=True, ), types.PromptArgument( name="series_type", description="The desired price type in the time series. Four types are supported: close, open, high, low", required=True, ), types.PromptArgument( name="datatype", description="Data type (json or csv). Default is json", required=False, ), types.PromptArgument( name="max_data_points", description="Maximum number of data points to fetch. Default is 100", required=False, ), ], ), types.Prompt( name=AlphavantageTools.EMA.value, description="Fetch the exponential moving average (EMA) values", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Time interval between two consecutive data points in the time series. " "The following values are supported: 1min, 5min, 15min, 30min, 60min, daily, weekly, monthly", required=True, ), types.PromptArgument( name="month", description="ONLY applicable to intraday intervals (1min, 5min, 15min, 30min, and 60min) for the equity markets. For example, month=2009-01", required=False, ), types.PromptArgument( name="time_period", description="Number of data points used to calculate each moving average value. E.g, time_period=60", required=True, ), types.PromptArgument( name="series_type", description="The desired price type in the time series. Four types are supported: close, open, high, low", required=True, ), types.PromptArgument( name="datatype", description="Data type (json or csv). Default is json", required=False, ), ], ), types.Prompt( name=AlphavantageTools.WMA.value, description="Fetch weighted moving average", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), ], ), types.Prompt( name=AlphavantageTools.DEMA.value, description="Fetch double exponential moving average", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), ], ), types.Prompt( name=AlphavantageTools.TRIMA.value, description="Fetch triangular moving average", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), ], ), types.Prompt( name=AlphavantageTools.KAMA.value, description="Fetch Kaufman adaptive moving average", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), ], ), types.Prompt( name=AlphavantageTools.MAMA.value, description="Fetch MESA adaptive moving average", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), types.PromptArgument( name="fastlimit", description="Fast limit", required=True ), types.PromptArgument( name="slowlimit", description="Slow limit", required=True ), ], ), types.Prompt( name=AlphavantageTools.T3.value, description="Fetch triple exponential moving average", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), ], ), types.Prompt( name=AlphavantageTools.MACD.value, description="Fetch moving average convergence divergence", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), ], ), types.Prompt( name=AlphavantageTools.MACDEXT.value, description="Fetch moving average convergence divergence extended", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), ], ), types.Prompt( name=AlphavantageTools.STOCH.value, description="Fetch stochastic oscillator", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), ], ), types.Prompt( name=AlphavantageTools.STOCHF.value, description="Fetch stochastic oscillator fast", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), ], ), types.Prompt( name=AlphavantageTools.RSI.value, description="Fetch relative strength index", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), ], ), types.Prompt( name=AlphavantageTools.STOCHRSI.value, description="Fetch stochastic relative strength index", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), ], ), types.Prompt( name=AlphavantageTools.WILLR.value, description="Fetch Williams' percent range", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.ADX.value, description="Fetch average directional movement index", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.ADXR.value, description="Fetch average directional movement index rating", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.APO.value, description="Fetch absolute price oscillator", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), types.PromptArgument( name="fastperiod", description="Fast period", required=True ), types.PromptArgument( name="slowperiod", description="Slow period", required=True ), ], ), types.Prompt( name=AlphavantageTools.PPO.value, description="Fetch percentage price oscillator", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), types.PromptArgument( name="fastperiod", description="Fast period", required=True ), types.PromptArgument( name="slowperiod", description="Slow period", required=True ), ], ), types.Prompt( name=AlphavantageTools.MOM.value, description="Fetch momentum", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), ], ), types.Prompt( name=AlphavantageTools.BOP.value, description="Fetch balance of power", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), ], ), types.Prompt( name=AlphavantageTools.CCI.value, description="Fetch commodity channel index", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.CMO.value, description="Fetch Chande momentum oscillator", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.ROC.value, description="Fetch rate of change", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), types.PromptArgument( name="series_type", description="The desired price type in the time series. Four types are supported: close, open, high, low", required=True, ), ], ), types.Prompt( name=AlphavantageTools.ROCR.value, description="Fetch rate of change ratio", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.AROON.value, description="Fetch Aroon", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.AROONOSC.value, description="Fetch aroon oscillator", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.MFI.value, description="Fetch money flow index", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.TRIX.value, description="Fetch triple exponential average", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), types.PromptArgument( name="series_type", description="The desired price type in the time series. Four types are supported: close, open, high, low", required=True, ), ], ), types.Prompt( name=AlphavantageTools.ULTOSC.value, description="Fetch ultimate oscillator", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="timeperiod1", description="Time period 1", required=True ), types.PromptArgument( name="timeperiod2", description="Time period 2", required=True ), types.PromptArgument( name="timeperiod3", description="Time period 3", required=True ), ], ), types.Prompt( name=AlphavantageTools.DX.value, description="Fetch directional movement index", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.MINUS_DI.value, description="Fetch minus directional indicator", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.PLUS_DI.value, description="Fetch plus directional indicator", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.MINUS_DM.value, description="Fetch minus directional movement", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.PLUS_DM.value, description="Fetch plus directional movement", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.BBANDS.value, description="Fetch Bollinger bands", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), types.PromptArgument( name="nbdevup", description="Nbdevup", required=True ), types.PromptArgument( name="nbdevdn", description="Nbdevdn", required=True ), ], ), types.Prompt( name=AlphavantageTools.MIDPOINT.value, description="Fetch midpoint", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), ], ), types.Prompt( name=AlphavantageTools.MIDPRICE.value, description="Fetch midprice", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.SAR.value, description="Fetch parabolic SAR", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), ], ), types.Prompt( name=AlphavantageTools.TRANGE.value, description="Fetch true range", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), ], ), types.Prompt( name=AlphavantageTools.ATR.value, description="Fetch average true range", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.NATR.value, description="Fetch normalized average true range", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="time_period", description="Time period", required=True ), ], ), types.Prompt( name=AlphavantageTools.AD.value, description="Fetch Chaikin A/D line", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), ], ), types.Prompt( name=AlphavantageTools.ADOSC.value, description="Fetch Chaikin A/D oscillator", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="fastperiod", description="Fast period", required=True ), types.PromptArgument( name="slowperiod", description="Slow period", required=True ), ], ), types.Prompt( name=AlphavantageTools.OBV.value, description="Fetch on balance volume", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), ], ), types.Prompt( name=AlphavantageTools.HT_TRENDLINE.value, description="Fetch Hilbert transform - trendline", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), ], ), types.Prompt( name=AlphavantageTools.HT_SINE.value, description="Fetch Hilbert transform - sine wave", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), types.PromptArgument( name="series_type", description="Series type", required=True ), ], ), types.Prompt( name=AlphavantageTools.HT_TRENDMODE.value, description="Fetch Hilbert transform - trend mode", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), ], ), types.Prompt( name=AlphavantageTools.HT_DCPERIOD.value, description="Fetch Hilbert transform - dominant cycle period", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), ], ), types.Prompt( name=AlphavantageTools.HT_DCPHASE.value, description="Fetch Hilbert transform - dominant cycle phase", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), ], ), types.Prompt( name=AlphavantageTools.HT_PHASOR.value, description="Fetch Hilbert transform - phasor components", arguments=[ types.PromptArgument( name="symbol", description="Stock symbol", required=True ), types.PromptArgument( name="interval", description="Interval", required=True ), ], ), ] ```