# Directory Structure
```
├── .cursor
│   └── rules
│       └── akshare-cursor-rule.mdc
├── .DS_Store
├── .gitignore
├── claude_desktop_config.json
├── data
│   └── strong_stocks_20250303.csv
├── Dockerfile
├── install.sh
├── pyproject.toml
├── README.md
├── run_server.py
├── smithery.yaml
├── src
│   ├── .DS_Store
│   └── mcp_server_akshare
│       ├── __init__.py
│       ├── .DS_Store
│       ├── api.py
│       └── server.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
venv/
__pycache__/
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# AKShare MCP Server
A Model Context Protocol (MCP) server that provides financial data analysis capabilities using the AKShare library.
## Features
- Access to Chinese and global financial market data through AKShare
- Integration with Claude Desktop via MCP
- Support for various financial data queries and analysis
## Installation
### Using uv (recommended)
```bash
# Clone the repository
git clone https://github.com/yourusername/akshare_mcp_server.git
cd akshare_mcp_server
# Create and activate a virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies with uv
uv pip install -e .
```
### Using pip
```bash
# Clone the repository
git clone https://github.com/yourusername/akshare_mcp_server.git
cd akshare_mcp_server
# Create and activate a virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -e .
```
## Usage
### Running the server
```bash
# Activate the virtual environment
source venv/bin/activate # On Windows: venv\Scripts\activate
# Run the server
python run_server.py
```
### Integrating with Claude Desktop
1. Add the following configuration to your Claude Desktop configuration:
```json
"mcpServers": {
  "akshare-mcp": {
    "command": "uv",
    "args": [
      "--directory",
      "/path/to/akshare_mcp_server",
      "run",
      "akshare-mcp"
    ],
    "env": {
      "AKSHARE_API_KEY": "<your_api_key_if_needed>"
    }
  }
}
```
2. Restart Claude Desktop
3. Select the AKShare MCP server from the available tools
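If a Claude Desktop configuration file already exists, the entry can be merged into it instead of being pasted by hand. The following is a minimal sketch and not part of this repository: `add_akshare_server` is a hypothetical helper, and the config path you pass to it is an assumption that depends on your installation. The optional `env` block is left out here; add it if an API key is needed.

```python
import json
from pathlib import Path


def add_akshare_server(config_path: Path, project_dir: str) -> dict:
    """Merge the akshare-mcp entry into a Claude Desktop config file.

    Hypothetical helper: creates the file if it is missing and leaves
    any other configured servers untouched.
    """
    config = {}
    if config_path.exists():
        text = config_path.read_text(encoding="utf-8")
        # Treat an empty file the same as a missing one
        config = json.loads(text) if text.strip() else {}

    servers = config.setdefault("mcpServers", {})
    servers["akshare-mcp"] = {
        "command": "uv",
        "args": ["--directory", project_dir, "run", "akshare-mcp"],
    }

    config_path.write_text(json.dumps(config, indent=2), encoding="utf-8")
    return config
```

Call it with the location of your own config file and the directory where this repository was cloned, then restart Claude Desktop as described above.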
## Available Tools
The AKShare MCP server provides tools in the following categories:
- Stock data queries
- Fund data queries
- Bond data queries
- Futures data queries
- Forex data queries
- Macroeconomic data queries
- And more...
## Adding a New Tool
To add a new tool to the MCP server, follow these steps:
1. **Add a new API function in `src/mcp_server_akshare/api.py`**:
```python
async def fetch_new_data_function(param1: str, param2: str = "default") -> List[Dict[str, Any]]:
    """
    Fetch new data type.
    Args:
        param1: Description of param1
        param2: Description of param2
    """
    try:
        df = ak.akshare_function_name(param1=param1, param2=param2)
        return dataframe_to_dict(df)
    except Exception as e:
        logger.error(f"Error fetching new data: {e}")
        raise
```
2. **Add the new tool to the enum in `src/mcp_server_akshare/server.py`**:
```python
class AKShareTools(str, Enum):
    # Existing tools...
    NEW_TOOL_NAME = "new_tool_name"
```
3. **Import the new function in `src/mcp_server_akshare/server.py`**:
```python
from .api import (
    # Existing imports...
    fetch_new_data_function,
)
```
4. **Add the tool definition to the `handle_list_tools()` function**:
```python
types.Tool(
    name=AKShareTools.NEW_TOOL_NAME.value,
    description="Description of the new tool",
    inputSchema={
        "type": "object",
        "properties": {
            "param1": {"type": "string", "description": "Description of param1"},
            "param2": {"type": "string", "description": "Description of param2"},
        },
        "required": ["param1"],  # List required parameters
    },
),
```
5. **Add the tool handler in the `handle_call_tool()` function**:
```python
case AKShareTools.NEW_TOOL_NAME.value:
    param1 = arguments.get("param1")
    if not param1:
        raise ValueError("Missing required argument: param1")
    param2 = arguments.get("param2", "default")
    result = await fetch_new_data_function(
        param1=param1,
        param2=param2,
    )
```
6. **Test the new tool** by running the server and making a request to the new tool.
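Step 5 repeats the same pattern for every tool: read each argument, reject missing required ones, and fill in defaults. That pattern could be factored into a small helper, sketched below. `extract_arguments` is a hypothetical name and does not exist in this repository. Like the `if not param1` check above, it treats empty values as missing.

```python
from typing import Any, Dict, Iterable, Optional


def extract_arguments(
    arguments: Optional[Dict[str, Any]],
    required: Iterable[str],
    defaults: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Validate tool arguments and apply defaults (hypothetical helper)."""
    arguments = arguments or {}
    resolved = dict(defaults or {})
    # Caller-supplied values override defaults; explicit None counts as "not given"
    resolved.update({k: v for k, v in arguments.items() if v is not None})
    for name in required:
        # Mirrors `if not param1`: empty strings are rejected as well
        if not resolved.get(name):
            raise ValueError(f"Missing required argument: {name}")
    return resolved


params = extract_arguments(
    {"param1": "000001"},
    required=["param1"],
    defaults={"param2": "default"},
)
# params == {"param2": "default", "param1": "000001"}
```

The resulting dictionary can then be unpacked into the API call, for example `await fetch_new_data_function(**params)`.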
## Development
```bash
# Install development dependencies
uv pip install -e ".[dev]"
# Run tests
pytest
```
## Docker
You can also run the server using Docker:
```bash
# Build the Docker image
docker build -t akshare-mcp-server .
# Run the Docker container
docker run -p 8000:8000 akshare-mcp-server
```
## License
MIT
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
FROM python:3.10-slim
WORKDIR /app
# Install uv
RUN pip install uv
# Copy project files
COPY . .
# Install dependencies (--system: the image has no virtual environment for uv to target)
RUN uv pip install --system -e .
# Expose port if needed
# EXPOSE 8000
# Run the server
CMD ["python", "run_server.py"]
```
--------------------------------------------------------------------------------
/install.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
# Create and activate virtual environment
python3 -m venv venv
source venv/bin/activate
# Install uv if not already installed
pip install uv
# Install the package with uv
uv pip install -e .
echo "Installation complete. You can now run the server with 'python run_server.py'"
```
--------------------------------------------------------------------------------
/claude_desktop_config.json:
--------------------------------------------------------------------------------
```json
{
  "mcpServers": {
    "akshare-mcp": {
      "command": "uv",
      "args": [
        "--directory",
        "/Users/hlchen/CodeHub/akshare_mcp_server",
        "run",
        "akshare-mcp"
      ],
      "env": {
        "AKSHARE_API_KEY": "<insert_api_key_if_needed>"
      }
    }
  }
}
```
--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------
```yaml
name: mcp-server-akshare
version: 0.1.0
description: MCP server for AKShare financial data
license: MIT
authors:
  - name: Your Name
    email: [email protected]
dependencies:
  - akshare>=1.11.0
  - mcp>=0.1.0
  - httpx>=0.24.0
  - python-dotenv>=1.0.0
  - pandas>=2.0.0
  - numpy>=1.24.0
dev-dependencies:
  - black>=23.3.0
  - isort>=5.12.0
  - mypy>=1.3.0
  - pytest>=7.3.1
  - pytest-asyncio>=0.21.0
scripts:
  akshare-mcp: mcp_server_akshare:main
```
--------------------------------------------------------------------------------
/run_server.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""
Entry point for the AKShare MCP server.
"""
import asyncio
import logging
import sys
from src.mcp_server_akshare import main
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
if __name__ == "__main__":
    print("Starting AKShare MCP server...")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Server stopped by user.")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Error running server: {e}", exc_info=True)
        sys.exit(1)
```
--------------------------------------------------------------------------------
/src/mcp_server_akshare/__init__.py:
--------------------------------------------------------------------------------
```python
"""
MCP server for AKShare financial data.
"""
import asyncio
import logging
from typing import Optional
from .server import main as server_main
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
async def main() -> None:
    """
    Main entry point for the AKShare MCP server.
    """
    logger.info("Starting AKShare MCP server...")
    try:
        await server_main()
    except Exception as e:
        logger.error(f"Error running AKShare MCP server: {e}", exc_info=True)
        raise
if __name__ == "__main__":
    asyncio.run(main())
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "mcp-server-akshare"
version = "0.1.0"
description = "MCP server for AKShare financial data"
readme = "README.md"
requires-python = ">=3.10"
license = {text = "MIT"}
authors = [
{name = "Your Name", email = "[email protected]"},
]
dependencies = [
"akshare>=1.11.0",
"mcp>=0.1.0",
"httpx>=0.24.0",
"python-dotenv>=1.0.0",
"pandas>=2.0.0",
"numpy>=1.24.0",
]
[project.optional-dependencies]
dev = [
"black>=23.3.0",
"isort>=5.12.0",
"mypy>=1.3.0",
"pytest>=7.3.1",
"pytest-asyncio>=0.21.0",
]
[project.scripts]
akshare-mcp = "mcp_server_akshare:main"
[tool.black]
line-length = 100
target-version = ["py310"]
[tool.isort]
profile = "black"
line_length = 100
[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
```
--------------------------------------------------------------------------------
/src/mcp_server_akshare/api.py:
--------------------------------------------------------------------------------
```python
"""
API functions for interacting with AKShare.
"""
import logging
import os
from typing import Any, Dict, List, Optional, Union
import akshare as ak
import pandas as pd
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configure logging
logger = logging.getLogger(__name__)
# Optional API key (if needed for certain endpoints)
API_KEY = os.getenv("AKSHARE_API_KEY")
def dataframe_to_dict(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """
    Convert a pandas DataFrame to a list of dictionaries.
    """
    return df.to_dict(orient="records")
def dataframe_to_json(df: pd.DataFrame) -> str:
    """
    Convert a pandas DataFrame to a JSON string.
    """
    return df.to_json(orient="records", date_format="iso")
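# --- Illustrative sketch, not part of the original module ---
# Records returned by `dataframe_to_dict` may still hold values that strict JSON
# cannot represent, such as NaN floats and date/datetime objects. A post-processing
# step along these lines could normalize them before they are sent to a client.
# `make_json_safe` is a hypothetical helper name.
import datetime as _dt
import math
from typing import Any, Dict, List


def make_json_safe(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Replace non-finite floats with None and dates with ISO 8601 strings."""

    def _clean(value: Any) -> Any:
        # NaN and +/-inf have no strict JSON representation
        if isinstance(value, float) and not math.isfinite(value):
            return None
        # datetime is a subclass of date, so one check covers both
        if isinstance(value, _dt.date):
            return value.isoformat()
        return value

    return [{key: _clean(value) for key, value in row.items()} for row in records]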
async def fetch_stock_zh_a_spot() -> List[Dict[str, Any]]:
    """
    Fetch A-share stock data.
    """
    try:
        df = ak.stock_zh_a_spot()
        return dataframe_to_dict(df)
    except Exception as e:
        logger.error(f"Error fetching A-share stock data: {e}")
        raise
async def fetch_stock_zh_a_hist(
    symbol: str,
    period: str = "daily",
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    adjust: str = "",
) -> List[Dict[str, Any]]:
    """
    Fetch A-share stock historical data.
    Args:
        symbol: Stock code
        period: Data frequency, options: daily, weekly, monthly
        start_date: Start date in format YYYYMMDD (omit to use AKShare's default)
        end_date: End date in format YYYYMMDD (omit to use AKShare's default)
        adjust: Price adjustment, options: "", qfq (forward), hfq (backward)
    """
    try:
        # Forward only the dates that were supplied, so None never reaches AKShare
        kwargs: Dict[str, Any] = {"symbol": symbol, "period": period, "adjust": adjust}
        if start_date is not None:
            kwargs["start_date"] = start_date
        if end_date is not None:
            kwargs["end_date"] = end_date
        df = ak.stock_zh_a_hist(**kwargs)
        return dataframe_to_dict(df)
    except Exception as e:
        logger.error(f"Error fetching stock historical data for {symbol}: {e}")
        raise
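# --- Illustrative sketch, not part of the original module ---
# `fetch_stock_zh_a_hist` documents its dates as YYYYMMDD strings. A check like the
# one below could reject malformed input before any request is made.
# `validate_yyyymmdd` is a hypothetical helper name.
from datetime import datetime


def validate_yyyymmdd(value: str) -> str:
    """Return `value` unchanged if it is a real calendar date in YYYYMMDD form."""
    # strptime alone accepts unpadded input such as "202531", so check the shape first
    if len(value) != 8 or not (value.isascii() and value.isdigit()):
        raise ValueError(f"Expected a date in YYYYMMDD format, got {value!r}")
    try:
        datetime.strptime(value, "%Y%m%d")
    except ValueError as exc:
        raise ValueError(f"Not a valid calendar date: {value!r}") from exc
    return value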
async def fetch_stock_zh_index_spot() -> List[Dict[str, Any]]:
    """
    Fetch Chinese stock market index data.
    """
    try:
        df = ak.stock_zh_index_spot()
        return dataframe_to_dict(df)
    except Exception as e:
        logger.error(f"Error fetching stock index data: {e}")
        raise
async def fetch_stock_zh_index_daily(symbol: str) -> List[Dict[str, Any]]:
    """
    Fetch Chinese stock market index daily data.
    Args:
        symbol: Index code
    """
    try:
        df = ak.stock_zh_index_daily(symbol=symbol)
        return dataframe_to_dict(df)
    except Exception as e:
        logger.error(f"Error fetching stock index daily data for {symbol}: {e}")
        raise
async def fetch_fund_etf_category_sina(category: str = "ETF基金") -> List[Dict[str, Any]]:
    """
    Fetch ETF fund data from Sina.
    Args:
        category: Fund category
    """
    try:
        df = ak.fund_etf_category_sina(category=category)
        return dataframe_to_dict(df)
    except Exception as e:
        logger.error(f"Error fetching ETF fund data: {e}")
        raise
async def fetch_fund_etf_hist_sina(symbol: str) -> List[Dict[str, Any]]:
    """
    Fetch ETF fund historical data from Sina.
    Args:
        symbol: ETF fund code
    """
    try:
        df = ak.fund_etf_hist_sina(symbol=symbol)
        return dataframe_to_dict(df)
    except Exception as e:
        logger.error(f"Error fetching ETF fund historical data for {symbol}: {e}")
        raise
async def fetch_macro_china_gdp() -> List[Dict[str, Any]]:
    """
    Fetch China GDP data.
    """
    try:
        df = ak.macro_china_gdp()
        return dataframe_to_dict(df)
    except Exception as e:
        logger.error(f"Error fetching China GDP data: {e}")
        raise
async def fetch_macro_china_cpi() -> List[Dict[str, Any]]:
    """
    Fetch China CPI data.
    """
    try:
        df = ak.macro_china_cpi()
        return dataframe_to_dict(df)
    except Exception as e:
        logger.error(f"Error fetching China CPI data: {e}")
        raise
async def fetch_forex_spot_quote() -> List[Dict[str, Any]]:
    """
    Fetch forex spot quotes.
    """
    try:
        df = ak.forex_spot_quote()
        return dataframe_to_dict(df)
    except Exception as e:
        logger.error(f"Error fetching forex spot quotes: {e}")
        raise
async def fetch_futures_zh_spot() -> List[Dict[str, Any]]:
    """
    Fetch Chinese futures market spot data.
    """
    try:
        df = ak.futures_zh_spot()
        return dataframe_to_dict(df)
    except Exception as e:
        logger.error(f"Error fetching futures spot data: {e}")
        raise
async def fetch_bond_zh_hs_cov_spot() -> List[Dict[str, Any]]:
    """
    Fetch Chinese convertible bond data.
    """
    try:
        df = ak.bond_zh_hs_cov_spot()
        return dataframe_to_dict(df)
    except Exception as e:
        logger.error(f"Error fetching convertible bond data: {e}")
        raise
async def fetch_stock_zt_pool_strong_em(date: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Fetch strong stock pool data from East Money.
    Args:
        date: Date in format YYYYMMDD; when omitted, AKShare's default date is used
    """
    try:
        logger.info(f"Fetching strong stock pool data for date: {date}")
        # Pass the date only when one was given, so None never reaches AKShare
        df = ak.stock_zt_pool_strong_em(date=date) if date else ak.stock_zt_pool_strong_em()
        logger.info(f"Result type: {type(df)}")
        logger.info(f"Is DataFrame empty: {df.empty}")
        if not df.empty:
            logger.info(f"DataFrame shape: {df.shape}")
            logger.info(f"DataFrame columns: {df.columns.tolist()}")
            return dataframe_to_dict(df)
        else:
            logger.warning(f"No data available for date: {date}")
            # Try without date parameter as a fallback
            if date:
                logger.info("Trying again without date parameter as fallback...")
                df_fallback = ak.stock_zt_pool_strong_em()
                logger.info(f"Fallback result type: {type(df_fallback)}")
                logger.info(f"Fallback is DataFrame empty: {df_fallback.empty}")
                if not df_fallback.empty:
                    logger.info(f"Fallback DataFrame shape: {df_fallback.shape}")
                    logger.info(f"Fallback DataFrame columns: {df_fallback.columns.tolist()}")
                    return dataframe_to_dict(df_fallback)
                else:
                    logger.warning("Fallback also returned empty DataFrame")
        # Return empty list if no data is available
        return []
    except Exception as e:
        logger.error(f"Error fetching strong stock pool data for date {date}: {e}")
        raise
```
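The `fetch_*` coroutines in `api.py` call AKShare synchronously, so each call occupies the event loop until it returns. One common way around that, shown here as a minimal sketch and not as the repository's approach, is to hand the blocking call to a worker thread with `asyncio.to_thread`. In this sketch `slow_fetch` is a stand-in for a blocking `ak.*` call, and `fetch_in_thread` and `demo` are hypothetical names.

```python
import asyncio
import time
from typing import Any, Callable, Dict, List


def slow_fetch(symbol: str) -> List[Dict[str, Any]]:
    """Stand-in for a blocking AKShare call; the sleep simulates network I/O."""
    time.sleep(0.2)
    return [{"symbol": symbol, "close": 1.0}]


async def fetch_in_thread(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Run a blocking function in a worker thread so the event loop stays free."""
    return await asyncio.to_thread(func, *args, **kwargs)


async def demo() -> List[Any]:
    # Both blocking calls wait concurrently in separate threads
    return await asyncio.gather(
        fetch_in_thread(slow_fetch, "000001"),
        fetch_in_thread(slow_fetch, "600000"),
    )


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Applied to this module, the same idea would wrap the AKShare call itself, for example `df = await asyncio.to_thread(ak.stock_zh_a_spot)`, leaving the surrounding error handling unchanged.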