#
tokens: 24327/50000 28/28 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── LICENSE
├── mcp_server.py
├── pyproject.toml
├── README.md
├── resource
│   └── img
│       ├── ali.png
│       ├── gzh_code.jpg
│       ├── img_1.png
│       ├── img_2.png
│       └── planet.jpg
├── src
│   ├── __init__.py
│   ├── baostock_data_source.py
│   ├── data_source_interface.py
│   ├── formatting
│   │   ├── __init__.py
│   │   └── markdown_formatter.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── tool_runner.py
│   │   └── validation.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── analysis.py
│   │   ├── date_utils.py
│   │   ├── financial_reports.py
│   │   ├── helpers.py
│   │   ├── indices.py
│   │   ├── macroeconomic.py
│   │   ├── market_overview.py
│   │   └── stock_market.py
│   ├── use_cases
│   │   ├── __init__.py
│   │   ├── analysis.py
│   │   ├── date_utils.py
│   │   ├── financial_reports.py
│   │   ├── helpers.py
│   │   ├── indices.py
│   │   ├── macroeconomic.py
│   │   ├── market_overview.py
│   │   └── stock_market.py
│   └── utils.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/
AGENTS.md
# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# Pipfile.lock

# PEP 582; used by PDM, PEP 582 proposal
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# VS Code settings
.vscode/

docs/

# Local tests (do not commit real-data logs or scripts)
test/






```

--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------

```python
# This file makes src a Python package

```

--------------------------------------------------------------------------------
/src/tools/__init__.py:
--------------------------------------------------------------------------------

```python
# Initialization file for tools package

```

--------------------------------------------------------------------------------
/src/formatting/__init__.py:
--------------------------------------------------------------------------------

```python
# Initialization file for formatting package

```

--------------------------------------------------------------------------------
/src/use_cases/__init__.py:
--------------------------------------------------------------------------------

```python
"""Use case layer to keep tools thin and consistent."""

```

--------------------------------------------------------------------------------
/src/services/__init__.py:
--------------------------------------------------------------------------------

```python
"""Shared services for validation and execution helpers."""

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "a-share-mcp"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "baostock>=0.8.9",
    "httpx>=0.28.1",
    "mcp[cli]>=1.2.0",
    "pandas>=2.2.3",
    "annotated-types>=0.7.0",
    "anyio>=4.9.0",
    "certifi>=2025.4.26",
    "click>=8.1.8",
    "colorama>=0.4.6",
    "h11>=0.16.0",
    "httpcore>=1.0.9",
    "httpx-sse>=0.4.0",
    "idna>=3.10",
    "markdown-it-py>=3.0.0",
    "mdurl>=0.1.2",
    "numpy>=2.2.5",
    "pydantic>=2.11.3",
    "pydantic-core>=2.33.1",
    "pydantic-settings>=2.9.1",
    "pygments>=2.19.1",
    "python-dateutil>=2.9.0",
    "python-dotenv>=1.1.0",
    "pytz>=2025.2",
    "rich>=14.0.0",
    "shellingham>=1.5.4",
    "six>=1.17.0",
    "sniffio>=1.3.1",
    "sse-starlette>=2.3.3",
    "starlette>=0.46.2",
    "tabulate>=0.9.0",
    "typer>=0.15.3",
    "typing-extensions>=4.13.2",
    "typing-inspection>=0.4.0",
    "tzdata>=2025.2",
    "uvicorn>=0.34.2",
]

[tool.uv]
dev-dependencies = [
    "pytest>=8.3.0",
]

```

--------------------------------------------------------------------------------
/src/tools/analysis.py:
--------------------------------------------------------------------------------

```python
"""
Analysis tools for MCP server.
Delegates heavy lifting to use-case layer.
"""
import logging

from mcp.server.fastmcp import FastMCP
from src.data_source_interface import FinancialDataSource
from src.services.tool_runner import run_tool_with_handling
from src.use_cases.analysis import build_stock_analysis_report

logger = logging.getLogger(__name__)


def register_analysis_tools(app: FastMCP, active_data_source: FinancialDataSource):
    """Register analysis tools."""

    @app.tool()
    def get_stock_analysis(code: str, analysis_type: str = "fundamental") -> str:
        """
        提供基于数据的股票分析报告,而非投资建议。

        Args:
            code: 股票代码,如'sh.600000'
            analysis_type: 'fundamental'|'technical'|'comprehensive'
        """
        logger.info(f"Tool 'get_stock_analysis' called for {code}, type={analysis_type}")
        return run_tool_with_handling(
            lambda: build_stock_analysis_report(active_data_source, code=code, analysis_type=analysis_type),
            context=f"get_stock_analysis:{code}:{analysis_type}",
        )

```

--------------------------------------------------------------------------------
/src/use_cases/helpers.py:
--------------------------------------------------------------------------------

```python
"""Helper use cases for normalization utilities."""
import re

from src.services.validation import validate_non_empty_str


def normalize_stock_code_logic(code: str) -> str:
    validate_non_empty_str(code, "code")
    raw = code.strip()

    m = re.fullmatch(r"(?i)(sh|sz)[.]?(\d{6})", raw)
    if m:
        ex, num = m.group(1).lower(), m.group(2)
        return f"{ex}.{num}"

    m2 = re.fullmatch(r"(\d{6})[.]?(?i:(sh|sz))", raw)
    if m2:
        num, ex = m2.group(1), m2.group(2).lower()
        return f"{ex}.{num}"

    m3 = re.fullmatch(r"(\d{6})", raw)
    if m3:
        num = m3.group(1)
        ex = "sh" if num.startswith("6") else "sz"
        return f"{ex}.{num}"

    raise ValueError("Unsupported code format. Examples: 'sh.600000', '600000', '000001.SZ'.")


def normalize_index_code_logic(code: str) -> str:
    validate_non_empty_str(code, "code")
    raw = code.strip().upper()
    if raw in {"000300", "CSI300", "HS300"}:
        return "sh.000300"
    if raw in {"000016", "SSE50", "SZ50"}:
        return "sh.000016"
    if raw in {"000905", "ZZ500", "CSI500"}:
        return "sh.000905"
    raise ValueError("Unsupported index code. Examples: 000300/CSI300/HS300, 000016, 000905.")

```

--------------------------------------------------------------------------------
/src/services/tool_runner.py:
--------------------------------------------------------------------------------

```python
"""Common error handling for MCP tools."""
import logging
from typing import Callable

from src.data_source_interface import NoDataFoundError, LoginError, DataSourceError

logger = logging.getLogger(__name__)


def run_tool_with_handling(action: Callable[[], str], context: str) -> str:
    """
    Executes a callable and normalizes exceptions to user-friendly strings.

    Args:
        action: Callable returning a string (typically formatted output).
        context: Short description for logs.
    """
    try:
        return action()
    except NoDataFoundError as e:
        logger.warning(f"{context}: No data found: {e}")
        return f"Error: {e}"
    except LoginError as e:
        logger.error(f"{context}: Login error: {e}")
        return f"Error: Could not connect to data source. {e}"
    except DataSourceError as e:
        logger.error(f"{context}: Data source error: {e}")
        return f"Error: An error occurred while fetching data. {e}"
    except ValueError as e:
        logger.warning(f"{context}: Validation error: {e}")
        return f"Error: Invalid input parameter. {e}"
    except Exception as e:  # Catch-all
        logger.exception(f"{context}: Unexpected error: {e}")
        return f"Error: An unexpected error occurred: {e}"

```

--------------------------------------------------------------------------------
/src/services/validation.py:
--------------------------------------------------------------------------------

```python
"""Validation utilities for tool inputs."""
from typing import Iterable

VALID_FREQS = ["d", "w", "m", "5", "15", "30", "60"]
VALID_ADJUST_FLAGS = ["1", "2", "3"]
VALID_FORMATS = ["markdown", "json", "csv"]
VALID_YEAR_TYPES = ["report", "operate"]
VALID_RESERVE_YEAR_TYPES = ["0", "1", "2"]


def _ensure_in(value: str, allowed: Iterable[str], label: str) -> None:
    if value not in allowed:
        raise ValueError(f"Invalid {label} '{value}'. Valid options are: {list(allowed)}")


def validate_frequency(frequency: str) -> None:
    _ensure_in(frequency, VALID_FREQS, "frequency")


def validate_adjust_flag(adjust_flag: str) -> None:
    _ensure_in(adjust_flag, VALID_ADJUST_FLAGS, "adjust_flag")


def validate_output_format(fmt: str) -> None:
    _ensure_in(fmt, VALID_FORMATS, "format")


def validate_year(year: str) -> None:
    if not year.isdigit() or len(year) != 4:
        raise ValueError(f"Invalid year '{year}'. Please provide a 4-digit year.")


def validate_year_type(year_type: str) -> None:
    _ensure_in(year_type, VALID_YEAR_TYPES, "year_type")


def validate_quarter(quarter: int) -> None:
    if quarter not in (1, 2, 3, 4):
        raise ValueError("Invalid quarter. Must be between 1 and 4.")


def validate_non_empty_str(value: str, label: str) -> None:
    if value is None or not str(value).strip():
        raise ValueError(f"'{label}' is required.")


def validate_index_key(value: str, mapping: dict) -> str:
    key = mapping.get(value.lower()) if isinstance(value, str) else None
    if not key:
        raise ValueError(f"Invalid index '{value}'. Valid options: {sorted(set(mapping.values()))}")
    return key


def validate_year_type_reserve(year_type: str) -> None:
    _ensure_in(year_type, VALID_RESERVE_YEAR_TYPES, "year_type")


def validate_limit(limit: int) -> None:
    if limit <= 0:
        raise ValueError("limit must be positive.")

```

--------------------------------------------------------------------------------
/src/use_cases/market_overview.py:
--------------------------------------------------------------------------------

```python
"""Use cases for market overview tools."""
from typing import Optional

from src.data_source_interface import FinancialDataSource
from src.formatting.markdown_formatter import format_table_output
from src.services.validation import validate_output_format, validate_non_empty_str


def fetch_trade_dates(data_source: FinancialDataSource, *, start_date: Optional[str], end_date: Optional[str], limit: int, format: str) -> str:
    validate_output_format(format)
    df = data_source.get_trade_dates(start_date=start_date, end_date=end_date)
    meta = {"start_date": start_date or "default", "end_date": end_date or "default"}
    return format_table_output(df, format=format, max_rows=limit, meta=meta)


def fetch_all_stock(data_source: FinancialDataSource, *, date: Optional[str], limit: int, format: str) -> str:
    validate_output_format(format)
    df = data_source.get_all_stock(date=date)
    meta = {"as_of": date or "default"}
    return format_table_output(df, format=format, max_rows=limit, meta=meta)


def fetch_search_stocks(data_source: FinancialDataSource, *, keyword: str, date: Optional[str], limit: int, format: str) -> str:
    validate_output_format(format)
    validate_non_empty_str(keyword, "keyword")
    df = data_source.get_all_stock(date=date)
    if df is None or df.empty:
        return "(No data available to display)"
    kw = keyword.strip().lower()
    filtered = df[df["code"].str.lower().str.contains(kw, na=False)]
    meta = {"keyword": keyword, "as_of": date or "current"}
    return format_table_output(filtered, format=format, max_rows=limit, meta=meta)


def fetch_suspensions(data_source: FinancialDataSource, *, date: Optional[str], limit: int, format: str) -> str:
    validate_output_format(format)
    df = data_source.get_all_stock(date=date)
    if df is None or df.empty:
        return "(No data available to display)"
    if "tradeStatus" not in df.columns:
        raise ValueError("'tradeStatus' column not present in data source response.")
    suspended = df[df["tradeStatus"] == '0']
    meta = {"as_of": date or "current", "total_suspended": int(suspended.shape[0])}
    return format_table_output(suspended, format=format, max_rows=limit, meta=meta)

```

--------------------------------------------------------------------------------
/src/utils.py:
--------------------------------------------------------------------------------

```python
# Utility functions, including the Baostock login context manager and logging setup
import baostock as bs
import os
import sys
import logging
from contextlib import contextmanager
from .data_source_interface import LoginError

# --- Logging Setup ---
def setup_logging(level=logging.INFO):
    """Configures basic logging for the application."""
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    # Optionally silence logs from dependencies if they are too verbose
    # logging.getLogger("mcp").setLevel(logging.WARNING)

# Get a logger instance for this module (optional, but good practice)
logger = logging.getLogger(__name__)

# --- Baostock Context Manager ---
@contextmanager
def baostock_login_context():
    """Context manager to handle Baostock login and logout, suppressing stdout messages."""
    # Redirect stdout to suppress login/logout messages
    original_stdout_fd = sys.stdout.fileno()
    saved_stdout_fd = os.dup(original_stdout_fd)
    devnull_fd = os.open(os.devnull, os.O_WRONLY)

    os.dup2(devnull_fd, original_stdout_fd)
    os.close(devnull_fd)

    logger.debug("Attempting Baostock login...")
    lg = bs.login()
    logger.debug(f"Login result: code={lg.error_code}, msg={lg.error_msg}")

    # Restore stdout
    os.dup2(saved_stdout_fd, original_stdout_fd)
    os.close(saved_stdout_fd)

    if lg.error_code != '0':
        # Log error before raising
        logger.error(f"Baostock login failed: {lg.error_msg}")
        raise LoginError(f"Baostock login failed: {lg.error_msg}")

    logger.info("Baostock login successful.")
    try:
        yield  # API calls happen here
    finally:
        # Redirect stdout again for logout
        original_stdout_fd = sys.stdout.fileno()
        saved_stdout_fd = os.dup(original_stdout_fd)
        devnull_fd = os.open(os.devnull, os.O_WRONLY)

        os.dup2(devnull_fd, original_stdout_fd)
        os.close(devnull_fd)

        logger.debug("Attempting Baostock logout...")
        bs.logout()
        logger.debug("Logout completed.")

        # Restore stdout
        os.dup2(saved_stdout_fd, original_stdout_fd)
        os.close(saved_stdout_fd)
        logger.info("Baostock logout successful.")

# You can add other utility functions or classes here if needed

```

--------------------------------------------------------------------------------
/src/use_cases/indices.py:
--------------------------------------------------------------------------------

```python
"""Use cases for index and industry related tools."""
from typing import Optional

from src.data_source_interface import FinancialDataSource
from src.formatting.markdown_formatter import format_table_output
from src.services.validation import validate_output_format, validate_index_key, validate_non_empty_str

INDEX_MAP = {
    "hs300": "hs300",
    "沪深300": "hs300",
    "zz500": "zz500",
    "中证500": "zz500",
    "sz50": "sz50",
    "上证50": "sz50",
}


def fetch_stock_industry(data_source: FinancialDataSource, *, code: Optional[str], date: Optional[str], limit: int, format: str) -> str:
    validate_output_format(format)
    df = data_source.get_stock_industry(code=code, date=date)
    meta = {"code": code or "all", "as_of": date or "latest"}
    return format_table_output(df, format=format, max_rows=limit, meta=meta)


def fetch_index_constituents(data_source: FinancialDataSource, *, index: str, date: Optional[str], limit: int, format: str) -> str:
    validate_output_format(format)
    key = validate_index_key(index, INDEX_MAP)
    if key == "hs300":
        df = data_source.get_hs300_stocks(date=date)
    elif key == "sz50":
        df = data_source.get_sz50_stocks(date=date)
    else:
        df = data_source.get_zz500_stocks(date=date)
    meta = {"index": key, "as_of": date or "latest"}
    return format_table_output(df, format=format, max_rows=limit, meta=meta)


def fetch_list_industries(data_source: FinancialDataSource, *, date: Optional[str], format: str) -> str:
    validate_output_format(format)
    df = data_source.get_stock_industry(code=None, date=date)
    if df is None or df.empty:
        return "(No data available to display)"
    col = "industry" if "industry" in df.columns else df.columns[-1]
    out = df[[col]].drop_duplicates().sort_values(by=col)
    out = out.rename(columns={col: "industry"})
    meta = {"as_of": date or "latest", "count": int(out.shape[0])}
    return format_table_output(out, format=format, max_rows=out.shape[0], meta=meta)


def fetch_industry_members(data_source: FinancialDataSource, *, industry: str, date: Optional[str], limit: int, format: str) -> str:
    validate_output_format(format)
    validate_non_empty_str(industry, "industry")
    df = data_source.get_stock_industry(code=None, date=date)
    if df is None or df.empty:
        return "(No data available to display)"
    col = "industry" if "industry" in df.columns else df.columns[-1]
    filtered = df[df[col] == industry].copy()
    meta = {"industry": industry, "as_of": date or "latest"}
    return format_table_output(filtered, format=format, max_rows=limit, meta=meta)

```

--------------------------------------------------------------------------------
/src/use_cases/macroeconomic.py:
--------------------------------------------------------------------------------

```python
"""Use cases for macroeconomic data tools."""
from typing import Optional

from src.data_source_interface import FinancialDataSource
from src.formatting.markdown_formatter import format_table_output
from src.services.validation import validate_output_format, validate_year_type_reserve


def fetch_deposit_rate_data(data_source: FinancialDataSource, *, start_date: Optional[str], end_date: Optional[str], limit: int, format: str) -> str:
    validate_output_format(format)
    df = data_source.get_deposit_rate_data(start_date=start_date, end_date=end_date)
    meta = {"dataset": "deposit_rate", "start_date": start_date, "end_date": end_date}
    return format_table_output(df, format=format, max_rows=limit, meta=meta)


def fetch_loan_rate_data(data_source: FinancialDataSource, *, start_date: Optional[str], end_date: Optional[str], limit: int, format: str) -> str:
    validate_output_format(format)
    df = data_source.get_loan_rate_data(start_date=start_date, end_date=end_date)
    meta = {"dataset": "loan_rate", "start_date": start_date, "end_date": end_date}
    return format_table_output(df, format=format, max_rows=limit, meta=meta)


def fetch_required_reserve_ratio_data(data_source: FinancialDataSource, *, start_date: Optional[str], end_date: Optional[str], year_type: str, limit: int, format: str) -> str:
    validate_output_format(format)
    validate_year_type_reserve(year_type)
    df = data_source.get_required_reserve_ratio_data(start_date=start_date, end_date=end_date, year_type=year_type)
    meta = {"dataset": "required_reserve_ratio", "start_date": start_date, "end_date": end_date, "year_type": year_type}
    return format_table_output(df, format=format, max_rows=limit, meta=meta)


def fetch_money_supply_data_month(data_source: FinancialDataSource, *, start_date: Optional[str], end_date: Optional[str], limit: int, format: str) -> str:
    validate_output_format(format)
    df = data_source.get_money_supply_data_month(start_date=start_date, end_date=end_date)
    meta = {"dataset": "money_supply_month", "start_date": start_date, "end_date": end_date}
    return format_table_output(df, format=format, max_rows=limit, meta=meta)


def fetch_money_supply_data_year(data_source: FinancialDataSource, *, start_date: Optional[str], end_date: Optional[str], limit: int, format: str) -> str:
    validate_output_format(format)
    df = data_source.get_money_supply_data_year(start_date=start_date, end_date=end_date)
    meta = {"dataset": "money_supply_year", "start_date": start_date, "end_date": end_date}
    return format_table_output(df, format=format, max_rows=limit, meta=meta)

```

--------------------------------------------------------------------------------
/src/use_cases/stock_market.py:
--------------------------------------------------------------------------------

```python
"""Stock market use cases orchestrating data fetch and formatting."""
from typing import List, Optional

import pandas as pd

from src.data_source_interface import FinancialDataSource
from src.formatting.markdown_formatter import format_table_output
from src.services.validation import (
    validate_adjust_flag,
    validate_frequency,
    validate_output_format,
    validate_year,
    validate_year_type,
)


def fetch_historical_k_data(
    data_source: FinancialDataSource,
    *,
    code: str,
    start_date: str,
    end_date: str,
    frequency: str = "d",
    adjust_flag: str = "3",
    fields: Optional[List[str]] = None,
    limit: int = 250,
    format: str = "markdown",
) -> str:
    validate_frequency(frequency)
    validate_adjust_flag(adjust_flag)
    validate_output_format(format)

    df = data_source.get_historical_k_data(
        code=code,
        start_date=start_date,
        end_date=end_date,
        frequency=frequency,
        adjust_flag=adjust_flag,
        fields=fields,
    )
    meta = {
        "code": code,
        "start_date": start_date,
        "end_date": end_date,
        "frequency": frequency,
        "adjust_flag": adjust_flag,
    }
    return format_table_output(df, format=format, max_rows=limit, meta=meta)


def fetch_stock_basic_info(
    data_source: FinancialDataSource,
    *,
    code: str,
    fields: Optional[List[str]] = None,
    format: str = "markdown",
) -> str:
    validate_output_format(format)
    df = data_source.get_stock_basic_info(code=code, fields=fields)
    meta = {"code": code}
    return format_table_output(df, format=format, max_rows=df.shape[0] if df is not None else 0, meta=meta)


def fetch_dividend_data(
    data_source: FinancialDataSource,
    *,
    code: str,
    year: str,
    year_type: str = "report",
    limit: int = 250,
    format: str = "markdown",
) -> str:
    validate_year(year)
    validate_year_type(year_type)
    validate_output_format(format)

    df = data_source.get_dividend_data(code=code, year=year, year_type=year_type)
    meta = {"code": code, "year": year, "year_type": year_type}
    return format_table_output(df, format=format, max_rows=limit, meta=meta)


def fetch_adjust_factor_data(
    data_source: FinancialDataSource,
    *,
    code: str,
    start_date: str,
    end_date: str,
    limit: int = 250,
    format: str = "markdown",
) -> str:
    validate_output_format(format)
    df = data_source.get_adjust_factor_data(code=code, start_date=start_date, end_date=end_date)
    meta = {"code": code, "start_date": start_date, "end_date": end_date}
    return format_table_output(df, format=format, max_rows=limit, meta=meta)

```

--------------------------------------------------------------------------------
/src/tools/helpers.py:
--------------------------------------------------------------------------------

```python
"""
Helper tools for code normalization and constants discovery.
Uses shared validation and helper logic.
"""
import logging
from typing import Optional

from mcp.server.fastmcp import FastMCP
from src.services.tool_runner import run_tool_with_handling
from src.use_cases.helpers import normalize_index_code_logic, normalize_stock_code_logic

logger = logging.getLogger(__name__)


def register_helpers_tools(app: FastMCP):
    """Register helper/utility tools with the MCP app."""

    @app.tool()
    def normalize_stock_code(code: str) -> str:
        """Normalize a stock code to Baostock format."""
        logger.info("Tool 'normalize_stock_code' called with input=%s", code)
        return run_tool_with_handling(
            lambda: normalize_stock_code_logic(code),
            context="normalize_stock_code",
        )

    @app.tool()
    def normalize_index_code(code: str) -> str:
        """Normalize common index codes to Baostock format."""
        logger.info("Tool 'normalize_index_code' called with input=%s", code)
        return run_tool_with_handling(
            lambda: normalize_index_code_logic(code),
            context="normalize_index_code",
        )

    @app.tool()
    def list_tool_constants(kind: Optional[str] = None) -> str:
        """
        List valid constants for tool parameters.

        Args:
            kind: Optional filter: 'frequency' | 'adjust_flag' | 'year_type' | 'index'. If None, show all.
        """
        logger.info("Tool 'list_tool_constants' called kind=%s", kind or "all")
        freq = [
            ("d", "daily"), ("w", "weekly"), ("m", "monthly"),
            ("5", "5 minutes"), ("15", "15 minutes"), ("30", "30 minutes"), ("60", "60 minutes"),
        ]
        adjust = [("1", "forward adjusted"), ("2", "backward adjusted"), ("3", "unadjusted")]
        year_type = [("report", "announcement year"), ("operate", "ex-dividend year")]
        index = [("hs300", "CSI 300"), ("sz50", "SSE 50"), ("zz500", "CSI 500")]

        sections = []

        def as_md(title: str, rows):
            if not rows:
                return ""
            header = f"### {title}\n\n| value | meaning |\n|---|---|\n"
            lines = [f"| {v} | {m} |" for (v, m) in rows]
            return header + "\n".join(lines) + "\n"

        k = (kind or "").strip().lower()
        if k in ("", "frequency"):
            sections.append(as_md("frequency", freq))
        if k in ("", "adjust_flag"):
            sections.append(as_md("adjust_flag", adjust))
        if k in ("", "year_type"):
            sections.append(as_md("year_type", year_type))
        if k in ("", "index"):
            sections.append(as_md("index", index))

        out = "\n".join(s for s in sections if s)
        if not out:
            return "Error: Invalid kind. Use one of 'frequency', 'adjust_flag', 'year_type', 'index'."
        return out

```

--------------------------------------------------------------------------------
/src/tools/macroeconomic.py:
--------------------------------------------------------------------------------

```python
"""
Macroeconomic tools for the MCP server.
Delegates to use cases with shared validation and error handling.
"""
import logging
from typing import Optional

from mcp.server.fastmcp import FastMCP
from src.data_source_interface import FinancialDataSource
from src.services.tool_runner import run_tool_with_handling
from src.use_cases.macroeconomic import (
    fetch_deposit_rate_data,
    fetch_loan_rate_data,
    fetch_money_supply_data_month,
    fetch_money_supply_data_year,
    fetch_required_reserve_ratio_data,
)

logger = logging.getLogger(__name__)


def register_macroeconomic_tools(app: FastMCP, active_data_source: FinancialDataSource):
    """Register macroeconomic tools."""

    @app.tool()
    def get_deposit_rate_data(start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """Benchmark deposit rates."""
        return run_tool_with_handling(
            lambda: fetch_deposit_rate_data(active_data_source, start_date=start_date, end_date=end_date, limit=limit, format=format),
            context="get_deposit_rate_data",
        )

    @app.tool()
    def get_loan_rate_data(start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """Benchmark loan rates."""
        return run_tool_with_handling(
            lambda: fetch_loan_rate_data(active_data_source, start_date=start_date, end_date=end_date, limit=limit, format=format),
            context="get_loan_rate_data",
        )

    @app.tool()
    def get_required_reserve_ratio_data(start_date: Optional[str] = None, end_date: Optional[str] = None, year_type: str = '0', limit: int = 250, format: str = "markdown") -> str:
        """Required reserve ratio data."""
        return run_tool_with_handling(
            lambda: fetch_required_reserve_ratio_data(
                active_data_source, start_date=start_date, end_date=end_date, year_type=year_type, limit=limit, format=format
            ),
            context="get_required_reserve_ratio_data",
        )

    @app.tool()
    def get_money_supply_data_month(start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """Monthly money supply data."""
        return run_tool_with_handling(
            lambda: fetch_money_supply_data_month(
                active_data_source, start_date=start_date, end_date=end_date, limit=limit, format=format
            ),
            context="get_money_supply_data_month",
        )

    @app.tool()
    def get_money_supply_data_year(start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """Yearly money supply data."""
        return run_tool_with_handling(
            lambda: fetch_money_supply_data_year(
                active_data_source, start_date=start_date, end_date=end_date, limit=limit, format=format
            ),
            context="get_money_supply_data_year",
        )

```

--------------------------------------------------------------------------------
/src/tools/date_utils.py:
--------------------------------------------------------------------------------

```python
"""
Date utility tools for the MCP server.
Delegates to use-case layer for consistent behavior.
"""
import logging
from typing import Optional

from mcp.server.fastmcp import FastMCP
from src.data_source_interface import FinancialDataSource
from src.services.tool_runner import run_tool_with_handling
from src.use_cases import date_utils as uc_date

logger = logging.getLogger(__name__)


def register_date_utils_tools(app: FastMCP, active_data_source: FinancialDataSource):
    """Register date utility tools."""

    @app.tool()
    def get_latest_trading_date() -> str:
        """Get the latest trading date up to today."""
        logger.info("Tool 'get_latest_trading_date' called")
        return run_tool_with_handling(
            lambda: uc_date.get_latest_trading_date(active_data_source),
            context="get_latest_trading_date",
        )

    @app.tool()
    def get_market_analysis_timeframe(period: str = "recent") -> str:
        """Return a human-friendly timeframe label."""
        logger.info(f"Tool 'get_market_analysis_timeframe' called with period={period}")
        return run_tool_with_handling(
            lambda: uc_date.get_market_analysis_timeframe(period=period),
            context="get_market_analysis_timeframe",
        )

    @app.tool()
    def is_trading_day(date: str) -> str:
        """Check if a specific date is a trading day."""
        return run_tool_with_handling(
            lambda: uc_date.is_trading_day(active_data_source, date=date),
            context=f"is_trading_day:{date}",
        )

    @app.tool()
    def previous_trading_day(date: str) -> str:
        """Get the previous trading day before the given date."""
        return run_tool_with_handling(
            lambda: uc_date.previous_trading_day(active_data_source, date=date),
            context=f"previous_trading_day:{date}",
        )

    @app.tool()
    def next_trading_day(date: str) -> str:
        """Get the next trading day after the given date."""
        return run_tool_with_handling(
            lambda: uc_date.next_trading_day(active_data_source, date=date),
            context=f"next_trading_day:{date}",
        )

    @app.tool()
    def get_last_n_trading_days(days: int = 5) -> str:
        """Return the last N trading dates."""
        return run_tool_with_handling(
            lambda: uc_date.get_last_n_trading_days(active_data_source, days=days),
            context=f"get_last_n_trading_days:{days}",
        )

    @app.tool()
    def get_recent_trading_range(days: int = 5) -> str:
        """Return a date range string covering the recent N trading days."""
        return run_tool_with_handling(
            lambda: uc_date.get_recent_trading_range(active_data_source, days=days),
            context=f"get_recent_trading_range:{days}",
        )

    @app.tool()
    def get_month_end_trading_dates(year: int) -> str:
        """Return month-end trading dates for a given year."""
        return run_tool_with_handling(
            lambda: uc_date.get_month_end_trading_dates(active_data_source, year=year),
            context=f"get_month_end_trading_dates:{year}",
        )

```

--------------------------------------------------------------------------------
/src/tools/indices.py:
--------------------------------------------------------------------------------

```python
"""
Index-related tools for the MCP server.
Delegates to use-case layer for validation and formatting.
"""
import logging
from typing import Optional

from mcp.server.fastmcp import FastMCP
from src.data_source_interface import FinancialDataSource
from src.services.tool_runner import run_tool_with_handling
from src.use_cases.indices import (
    fetch_index_constituents,
    fetch_industry_members,
    fetch_list_industries,
    fetch_stock_industry,
)

logger = logging.getLogger(__name__)


def register_index_tools(app: FastMCP, active_data_source: FinancialDataSource):
    """Register index related tools with the MCP app."""

    @app.tool()
    def get_stock_industry(code: Optional[str] = None, date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """Get industry classification for a specific stock or all stocks on a date."""
        logger.info(f"Tool 'get_stock_industry' called for code={code or 'all'}, date={date or 'latest'}")
        return run_tool_with_handling(
            lambda: fetch_stock_industry(active_data_source, code=code, date=date, limit=limit, format=format),
            context=f"get_stock_industry:{code or 'all'}",
        )

    @app.tool()
    def get_sz50_stocks(date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """SZSE 50 constituents."""
        return run_tool_with_handling(
            lambda: fetch_index_constituents(active_data_source, index="sz50", date=date, limit=limit, format=format),
            context="get_sz50_stocks",
        )

    @app.tool()
    def get_hs300_stocks(date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """CSI 300 constituents."""
        return run_tool_with_handling(
            lambda: fetch_index_constituents(active_data_source, index="hs300", date=date, limit=limit, format=format),
            context="get_hs300_stocks",
        )

    @app.tool()
    def get_zz500_stocks(date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """CSI 500 constituents."""
        return run_tool_with_handling(
            lambda: fetch_index_constituents(active_data_source, index="zz500", date=date, limit=limit, format=format),
            context="get_zz500_stocks",
        )

    @app.tool()
    def get_index_constituents(index: str, date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """Generic index constituent fetch (hs300/sz50/zz500)."""
        return run_tool_with_handling(
            lambda: fetch_index_constituents(active_data_source, index=index, date=date, limit=limit, format=format),
            context=f"get_index_constituents:{index}",
        )

    @app.tool()
    def list_industries(date: Optional[str] = None, format: str = "markdown") -> str:
        """List distinct industries for a given date."""
        logger.info("Tool 'list_industries' called date=%s", date or "latest")
        return run_tool_with_handling(
            lambda: fetch_list_industries(active_data_source, date=date, format=format),
            context="list_industries",
        )

    @app.tool()
    def get_industry_members(industry: str, date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """Get all stocks in a given industry on a date."""
        logger.info("Tool 'get_industry_members' called industry=%s, date=%s", industry, date or "latest")
        return run_tool_with_handling(
            lambda: fetch_industry_members(active_data_source, industry=industry, date=date, limit=limit, format=format),
            context=f"get_industry_members:{industry}",
        )

```

--------------------------------------------------------------------------------
/src/formatting/markdown_formatter.py:
--------------------------------------------------------------------------------

```python
"""
Markdown formatting utilities for A-Share MCP Server.
"""
import pandas as pd
import logging
import json

logger = logging.getLogger(__name__)

# Configuration: Max rows to display in string outputs to protect context length
MAX_MARKDOWN_ROWS = 250


def format_df_to_markdown(df: pd.DataFrame, max_rows: int = None) -> str:
    """Formats a Pandas DataFrame to a Markdown string with row truncation.

    Args:
        df: The DataFrame to format
        max_rows: Maximum rows to include in output. Defaults to MAX_MARKDOWN_ROWS if None.

    Returns:
        A markdown formatted string representation of the DataFrame
    """
    if df is None or df.empty:
        logger.warning("Attempted to format an empty DataFrame to Markdown.")
        return "(No data available to display)"

    if max_rows is None:
        max_rows = MAX_MARKDOWN_ROWS

    original_rows = df.shape[0]
    rows_to_show = min(original_rows, max_rows)
    df_display = df.head(rows_to_show)

    truncated = original_rows > rows_to_show

    try:
        markdown_table = df_display.to_markdown(index=False)
    except Exception as e:
        logger.error("Error converting DataFrame to Markdown: %s", e, exc_info=True)
        return "Error: Could not format data into Markdown table."

    if truncated:
        notes = f"rows truncated to {rows_to_show} from {original_rows}"
        return f"Note: Data truncated ({notes}).\n\n{markdown_table}"
    return markdown_table


def format_table_output(
    df: pd.DataFrame,
    format: str = "markdown",
    max_rows: int | None = None,
    meta: dict | None = None,
) -> str:
    """Formats a DataFrame into the requested string format with optional meta.

    Args:
        df: Data to format.
        format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.
        max_rows: Optional max rows to include (defaults depend on formatters).
        meta: Optional metadata dict to include (prepended for markdown, embedded for json).

    Returns:
        A string suitable for tool responses.
    """
    fmt = (format or "markdown").lower()

    # Normalize row cap
    if max_rows is None:
        max_rows = MAX_MARKDOWN_ROWS if fmt == "markdown" else MAX_MARKDOWN_ROWS

    total_rows = 0 if df is None else int(df.shape[0])
    rows_to_show = 0 if df is None else min(total_rows, max_rows)
    truncated = total_rows > rows_to_show
    df_display = df.head(rows_to_show) if df is not None else pd.DataFrame()

    if fmt == "markdown":
        header = ""
        if meta:
            # Render a compact meta header
            lines = ["Meta:"]
            for k, v in meta.items():
                lines.append(f"- {k}: {v}")
            header = "\n".join(lines) + "\n\n"
        return header + format_df_to_markdown(df_display, max_rows=max_rows)

    if fmt == "csv":
        try:
            return df_display.to_csv(index=False)
        except Exception as e:
            logger.error("Error converting DataFrame to CSV: %s", e, exc_info=True)
            return "Error: Could not format data into CSV."

    if fmt == "json":
        try:
            payload = {
                "data": [] if df_display is None else df_display.to_dict(orient="records"),
                "meta": {
                    **(meta or {}),
                    "total_rows": total_rows,
                    "returned_rows": rows_to_show,
                    "truncated": truncated,
                    "columns": [] if df_display is None else list(df_display.columns),
                },
            }
            return json.dumps(payload, ensure_ascii=False)
        except Exception as e:
            logger.error("Error converting DataFrame to JSON: %s", e, exc_info=True)
            return "Error: Could not format data into JSON."

    # Fallback to markdown if unknown format
    logger.warning("Unknown format '%s', falling back to markdown", fmt)
    return format_df_to_markdown(df_display, max_rows=max_rows)

```

--------------------------------------------------------------------------------
/src/tools/market_overview.py:
--------------------------------------------------------------------------------

```python
"""
Market overview tools for the MCP server.
Includes trading calendar, stock list, and discovery helpers.
"""
import logging
from typing import Optional

from mcp.server.fastmcp import FastMCP
from src.data_source_interface import FinancialDataSource
from src.services.tool_runner import run_tool_with_handling
from src.use_cases.market_overview import (
    fetch_all_stock,
    fetch_search_stocks,
    fetch_suspensions,
    fetch_trade_dates,
)

logger = logging.getLogger(__name__)


def register_market_overview_tools(app: FastMCP, active_data_source: FinancialDataSource):
    """
    Register market overview tools with the MCP app.

    Args:
        app: The FastMCP app instance
        active_data_source: The active financial data source
    """

    @app.tool()
    def get_trade_dates(start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """
        Fetch trading dates within a specified range.

        Args:
            start_date: Optional. Start date in 'YYYY-MM-DD' format. Defaults to 2015-01-01 if None.
            end_date: Optional. End date in 'YYYY-MM-DD' format. Defaults to the current date if None.

        Returns:
            Markdown table with 'is_trading_day' (1=trading, 0=non-trading).
        """
        logger.info(f"Tool 'get_trade_dates' called for range {start_date or 'default'} to {end_date or 'default'}")
        return run_tool_with_handling(
            lambda: fetch_trade_dates(active_data_source, start_date=start_date, end_date=end_date, limit=limit, format=format),
            context="get_trade_dates",
        )

    @app.tool()
    def get_all_stock(date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """
        Fetch a list of all stocks (A-shares and indices) and their trading status for a date.

        Args:
            date: Optional. The date in 'YYYY-MM-DD' format. If None, uses the current date.

        Returns:
            Markdown table listing stock codes and trading status (1=trading, 0=suspended).
        """
        logger.info(f"Tool 'get_all_stock' called for date={date or 'default'}")
        return run_tool_with_handling(
            lambda: fetch_all_stock(active_data_source, date=date, limit=limit, format=format),
            context=f"get_all_stock:{date or 'default'}",
        )

    @app.tool()
    def search_stocks(keyword: str, date: Optional[str] = None, limit: int = 50, format: str = "markdown") -> str:
        """
        Search stocks by code substring on a date.

        Args:
            keyword: Substring to match in the stock code (e.g., '600', '000001').
            date: Optional 'YYYY-MM-DD'. If None, uses current date.
            limit: Max rows to return. Defaults to 50.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Matching stock codes with their trading status.
        """
        logger.info("Tool 'search_stocks' called keyword=%s, date=%s, limit=%s, format=%s", keyword, date or "default", limit, format)
        return run_tool_with_handling(
            lambda: fetch_search_stocks(active_data_source, keyword=keyword, date=date, limit=limit, format=format),
            context=f"search_stocks:{keyword}",
        )

    @app.tool()
    def get_suspensions(date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """
        List suspended stocks for a date.

        Args:
            date: Optional 'YYYY-MM-DD'. If None, uses current date.
            limit: Max rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Table of stocks where tradeStatus==0.
        """
        logger.info("Tool 'get_suspensions' called date=%s, limit=%s, format=%s", date or "current", limit, format)
        return run_tool_with_handling(
            lambda: fetch_suspensions(active_data_source, date=date, limit=limit, format=format),
            context=f"get_suspensions:{date or 'current'}",
        )

```

--------------------------------------------------------------------------------
/src/use_cases/financial_reports.py:
--------------------------------------------------------------------------------

```python
"""Use cases for financial report related tools."""
from typing import Optional

from src.data_source_interface import FinancialDataSource
from src.formatting.markdown_formatter import format_table_output
from src.services.validation import (
    validate_output_format,
    validate_quarter,
    validate_year,
)


def _format_financial_df(df, *, code: str, year: str | None, quarter: Optional[int], dataset: str, format: str, limit: int) -> str:
    meta = {"code": code, "dataset": dataset}
    if year:
        meta["year"] = year
    if quarter is not None:
        meta["quarter"] = quarter
    return format_table_output(df, format=format, max_rows=limit, meta=meta)


def fetch_profit_data(data_source: FinancialDataSource, *, code: str, year: str, quarter: int, limit: int, format: str) -> str:
    validate_year(year)
    validate_quarter(quarter)
    validate_output_format(format)
    df = data_source.get_profit_data(code=code, year=year, quarter=quarter)
    return _format_financial_df(df, code=code, year=year, quarter=quarter, dataset="Profitability", format=format, limit=limit)


def fetch_operation_data(data_source: FinancialDataSource, *, code: str, year: str, quarter: int, limit: int, format: str) -> str:
    validate_year(year)
    validate_quarter(quarter)
    validate_output_format(format)
    df = data_source.get_operation_data(code=code, year=year, quarter=quarter)
    return _format_financial_df(df, code=code, year=year, quarter=quarter, dataset="Operation Capability", format=format, limit=limit)


def fetch_growth_data(data_source: FinancialDataSource, *, code: str, year: str, quarter: int, limit: int, format: str) -> str:
    validate_year(year)
    validate_quarter(quarter)
    validate_output_format(format)
    df = data_source.get_growth_data(code=code, year=year, quarter=quarter)
    return _format_financial_df(df, code=code, year=year, quarter=quarter, dataset="Growth", format=format, limit=limit)


def fetch_balance_data(data_source: FinancialDataSource, *, code: str, year: str, quarter: int, limit: int, format: str) -> str:
    validate_year(year)
    validate_quarter(quarter)
    validate_output_format(format)
    df = data_source.get_balance_data(code=code, year=year, quarter=quarter)
    return _format_financial_df(df, code=code, year=year, quarter=quarter, dataset="Balance Sheet", format=format, limit=limit)


def fetch_cash_flow_data(data_source: FinancialDataSource, *, code: str, year: str, quarter: int, limit: int, format: str) -> str:
    validate_year(year)
    validate_quarter(quarter)
    validate_output_format(format)
    df = data_source.get_cash_flow_data(code=code, year=year, quarter=quarter)
    return _format_financial_df(df, code=code, year=year, quarter=quarter, dataset="Cash Flow", format=format, limit=limit)


def fetch_dupont_data(data_source: FinancialDataSource, *, code: str, year: str, quarter: int, limit: int, format: str) -> str:
    validate_year(year)
    validate_quarter(quarter)
    validate_output_format(format)
    df = data_source.get_dupont_data(code=code, year=year, quarter=quarter)
    return _format_financial_df(df, code=code, year=year, quarter=quarter, dataset="Dupont", format=format, limit=limit)


def fetch_performance_express_report(data_source: FinancialDataSource, *, code: str, start_date: str, end_date: str, limit: int, format: str) -> str:
    validate_output_format(format)
    df = data_source.get_performance_express_report(code=code, start_date=start_date, end_date=end_date)
    meta = {"code": code, "start_date": start_date, "end_date": end_date, "dataset": "Performance Express"}
    return format_table_output(df, format=format, max_rows=limit, meta=meta)


def fetch_forecast_report(data_source: FinancialDataSource, *, code: str, start_date: str, end_date: str, limit: int, format: str) -> str:
    validate_output_format(format)
    df = data_source.get_forecast_report(code=code, start_date=start_date, end_date=end_date)
    meta = {"code": code, "start_date": start_date, "end_date": end_date, "dataset": "Forecast"}
    return format_table_output(df, format=format, max_rows=limit, meta=meta)

```

--------------------------------------------------------------------------------
/src/tools/financial_reports.py:
--------------------------------------------------------------------------------

```python
"""
Financial report tools for the MCP server.
Thin wrappers delegating to use cases with shared validation and error handling.
"""
import logging
from typing import Optional

from mcp.server.fastmcp import FastMCP
from src.data_source_interface import FinancialDataSource
from src.services.tool_runner import run_tool_with_handling
from src.use_cases.financial_reports import (
    fetch_balance_data,
    fetch_cash_flow_data,
    fetch_dupont_data,
    fetch_forecast_report,
    fetch_growth_data,
    fetch_operation_data,
    fetch_performance_express_report,
    fetch_profit_data,
)

logger = logging.getLogger(__name__)


def register_financial_report_tools(app: FastMCP, active_data_source: FinancialDataSource):
    """
    Register financial report related tools with the MCP app.
    """

    @app.tool()
    def get_profit_data(code: str, year: str, quarter: int, limit: int = 250, format: str = "markdown") -> str:
        """Quarterly profitability data."""
        return run_tool_with_handling(
            lambda: fetch_profit_data(active_data_source, code=code, year=year, quarter=quarter, limit=limit, format=format),
            context=f"get_profit_data:{code}:{year}Q{quarter}",
        )

    @app.tool()
    def get_operation_data(code: str, year: str, quarter: int, limit: int = 250, format: str = "markdown") -> str:
        """Quarterly operation capability data."""
        return run_tool_with_handling(
            lambda: fetch_operation_data(active_data_source, code=code, year=year, quarter=quarter, limit=limit, format=format),
            context=f"get_operation_data:{code}:{year}Q{quarter}",
        )

    @app.tool()
    def get_growth_data(code: str, year: str, quarter: int, limit: int = 250, format: str = "markdown") -> str:
        """Quarterly growth capability data."""
        return run_tool_with_handling(
            lambda: fetch_growth_data(active_data_source, code=code, year=year, quarter=quarter, limit=limit, format=format),
            context=f"get_growth_data:{code}:{year}Q{quarter}",
        )

    @app.tool()
    def get_balance_data(code: str, year: str, quarter: int, limit: int = 250, format: str = "markdown") -> str:
        """Quarterly balance sheet data."""
        return run_tool_with_handling(
            lambda: fetch_balance_data(active_data_source, code=code, year=year, quarter=quarter, limit=limit, format=format),
            context=f"get_balance_data:{code}:{year}Q{quarter}",
        )

    @app.tool()
    def get_cash_flow_data(code: str, year: str, quarter: int, limit: int = 250, format: str = "markdown") -> str:
        """Quarterly cash flow data."""
        return run_tool_with_handling(
            lambda: fetch_cash_flow_data(active_data_source, code=code, year=year, quarter=quarter, limit=limit, format=format),
            context=f"get_cash_flow_data:{code}:{year}Q{quarter}",
        )

    @app.tool()
    def get_dupont_data(code: str, year: str, quarter: int, limit: int = 250, format: str = "markdown") -> str:
        """Quarterly Dupont analysis data."""
        return run_tool_with_handling(
            lambda: fetch_dupont_data(active_data_source, code=code, year=year, quarter=quarter, limit=limit, format=format),
            context=f"get_dupont_data:{code}:{year}Q{quarter}",
        )

    @app.tool()
    def get_performance_express_report(code: str, start_date: str, end_date: str, limit: int = 250, format: str = "markdown") -> str:
        """Performance express report within date range."""
        return run_tool_with_handling(
            lambda: fetch_performance_express_report(
                active_data_source, code=code, start_date=start_date, end_date=end_date, limit=limit, format=format
            ),
            context=f"get_performance_express_report:{code}:{start_date}-{end_date}",
        )

    @app.tool()
    def get_forecast_report(code: str, start_date: str, end_date: str, limit: int = 250, format: str = "markdown") -> str:
        """Earnings forecast report within date range."""
        return run_tool_with_handling(
            lambda: fetch_forecast_report(
                active_data_source, code=code, start_date=start_date, end_date=end_date, limit=limit, format=format
            ),
            context=f"get_forecast_report:{code}:{start_date}-{end_date}",
        )

```

--------------------------------------------------------------------------------
/src/use_cases/date_utils.py:
--------------------------------------------------------------------------------

```python
"""Use cases for date utility tools."""
import calendar
from datetime import datetime, timedelta
from typing import Optional

import pandas as pd

from src.data_source_interface import FinancialDataSource


def _fetch_trading_days(data_source: FinancialDataSource, start_date: str, end_date: str) -> pd.DataFrame:
    return data_source.get_trade_dates(start_date=start_date, end_date=end_date)


def get_latest_trading_date(data_source: FinancialDataSource) -> str:
    today = datetime.now().strftime("%Y-%m-%d")
    start_date = datetime.now().replace(day=1).strftime("%Y-%m-%d")
    end_date = datetime.now().replace(day=28).strftime("%Y-%m-%d")
    df = _fetch_trading_days(data_source, start_date=start_date, end_date=end_date)
    valid_trading_days = df[df["is_trading_day"] == "1"]["calendar_date"].tolist()
    latest_trading_date = None
    for dstr in valid_trading_days:
        if dstr <= today and (latest_trading_date is None or dstr > latest_trading_date):
            latest_trading_date = dstr
    return latest_trading_date or today


def get_market_analysis_timeframe(period: str = "recent") -> str:
    now = datetime.now()
    end_date = now
    if period == "recent":
        if now.day < 15:
            if now.month == 1:
                start_date = datetime(now.year - 1, 11, 1)
            else:
                prev_month = now.month - 1
                start_month = prev_month if prev_month > 0 else 12
                start_year = now.year if prev_month > 0 else now.year - 1
                start_date = datetime(start_year, start_month, 1)
        else:
            start_date = datetime(now.year, now.month, 1)
    elif period == "quarter":
        quarter = (now.month - 1) // 3 + 1
        start_month = (quarter - 1) * 3 + 1
        start_date = datetime(now.year, start_month, 1)
    elif period == "half_year":
        start_month = 1 if now.month <= 6 else 7
        start_date = datetime(now.year, start_month, 1)
    elif period == "year":
        start_date = datetime(now.year, 1, 1)
    else:
        raise ValueError("Invalid period. Use 'recent', 'quarter', 'half_year', or 'year'.")
    return f"{start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}"


def is_trading_day(data_source: FinancialDataSource, *, date: str) -> str:
    df = _fetch_trading_days(data_source, start_date=date, end_date=date)
    if df.empty:
        return "未知"
    row = df.iloc[0]
    return "是" if str(row.get("is_trading_day", "")) == "1" else "否"


def previous_trading_day(data_source: FinancialDataSource, *, date: str) -> str:
    target = datetime.strptime(date, "%Y-%m-%d")
    start = (target - timedelta(days=31)).strftime("%Y-%m-%d")
    df = _fetch_trading_days(data_source, start_date=start, end_date=date)
    days = df[df["is_trading_day"] == "1"]["calendar_date"].tolist()
    prev = max([d for d in days if d < date], default=None)
    return prev or date


def next_trading_day(data_source: FinancialDataSource, *, date: str) -> str:
    target = datetime.strptime(date, "%Y-%m-%d")
    end = (target + timedelta(days=31)).strftime("%Y-%m-%d")
    df = _fetch_trading_days(data_source, start_date=date, end_date=end)
    days = df[df["is_trading_day"] == "1"]["calendar_date"].tolist()
    next_day = min([d for d in days if d > date], default=None)
    return next_day or date


def get_last_n_trading_days(data_source: FinancialDataSource, *, days: int) -> str:
    today = datetime.now()
    start = (today - timedelta(days=days * 2)).strftime("%Y-%m-%d")
    end = today.strftime("%Y-%m-%d")
    df = _fetch_trading_days(data_source, start_date=start, end_date=end)
    trading_days = df[df["is_trading_day"] == "1"]["calendar_date"].tolist()
    return ", ".join(trading_days[-days:]) if trading_days else ""


def get_recent_trading_range(data_source: FinancialDataSource, *, days: int) -> str:
    today = datetime.now()
    start = (today - timedelta(days=days * 2)).strftime("%Y-%m-%d")
    end = today.strftime("%Y-%m-%d")
    df = _fetch_trading_days(data_source, start_date=start, end_date=end)
    trading_days = df[df["is_trading_day"] == "1"]["calendar_date"].tolist()
    if not trading_days:
        return ""
    return f"{trading_days[-days]} 至 {trading_days[-1]}" if len(trading_days) >= days else f"{trading_days[0]} 至 {trading_days[-1]}"


def get_month_end_trading_dates(data_source: FinancialDataSource, *, year: int) -> str:
    results = []
    for month in range(1, 13):
        last_day = calendar.monthrange(year, month)[1]
        start_date = datetime(year, month, last_day - 7).strftime("%Y-%m-%d")
        end_date = datetime(year, month, last_day).strftime("%Y-%m-%d")
        df = _fetch_trading_days(data_source, start_date=start_date, end_date=end_date)
        trading_days = df[df["is_trading_day"] == "1"]["calendar_date"].tolist()
        if trading_days:
            results.append(trading_days[-1])
    return ", ".join(results)

```

--------------------------------------------------------------------------------
/src/tools/stock_market.py:
--------------------------------------------------------------------------------

```python
"""
Stock market tools for the MCP server.
Thin wrappers that delegate to use cases with shared validation and error handling.
"""
import logging
from typing import List, Optional

from mcp.server.fastmcp import FastMCP
from src.data_source_interface import FinancialDataSource
from src.services.tool_runner import run_tool_with_handling
from src.use_cases.stock_market import (
    fetch_adjust_factor_data,
    fetch_dividend_data,
    fetch_historical_k_data,
    fetch_stock_basic_info,
)

logger = logging.getLogger(__name__)


def register_stock_market_tools(app: FastMCP, active_data_source: FinancialDataSource):
    """
    Register stock market data tools with the MCP app.

    Args:
        app: The FastMCP app instance
        active_data_source: The active financial data source
    """

    @app.tool()
    def get_historical_k_data(
        code: str,
        start_date: str,
        end_date: str,
        frequency: str = "d",
        adjust_flag: str = "3",
        fields: Optional[List[str]] = None,
        limit: int = 250,
        format: str = "markdown",
    ) -> str:
        """
        Fetches historical K-line (OHLCV) data for a Chinese A-share stock.

        Args:
            code: The stock code in Baostock format (e.g., 'sh.600000', 'sz.000001').
            start_date: Start date in 'YYYY-MM-DD' format.
            end_date: End date in 'YYYY-MM-DD' format.
            frequency: Data frequency. Valid options (from Baostock):
                         'd': daily
                         'w': weekly
                         'm': monthly
                         '5': 5 minutes
                         '15': 15 minutes
                         '30': 30 minutes
                         '60': 60 minutes
                       Defaults to 'd'.
            adjust_flag: Adjustment flag for price/volume. Valid options (from Baostock):
                           '1': Forward adjusted (后复权)
                           '2': Backward adjusted (前复权)
                           '3': Non-adjusted (不复权)
                         Defaults to '3'.
            fields: Optional list of specific data fields to retrieve (must be valid Baostock fields).
                    If None or empty, default fields will be used (e.g., date, code, open, high, low, close, volume, amount, pctChg).
            limit: Max rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

            Returns:
                A Markdown formatted string containing the K-line data table, or an error message.
                The table might be truncated if the result set is too large.
            """
        logger.info(
            f"Tool 'get_historical_k_data' called for {code} ({start_date}-{end_date}, freq={frequency}, adj={adjust_flag}, fields={fields})"
        )
        return run_tool_with_handling(
            lambda: fetch_historical_k_data(
                active_data_source,
                code=code,
                start_date=start_date,
                end_date=end_date,
                frequency=frequency,
                adjust_flag=adjust_flag,
                fields=fields,
                limit=limit,
                format=format,
            ),
            context=f"get_historical_k_data:{code}",
        )

    @app.tool()
    def get_stock_basic_info(code: str, fields: Optional[List[str]] = None, format: str = "markdown") -> str:
        """
        Fetches basic information for a given Chinese A-share stock.

        Args:
            code: The stock code in Baostock format (e.g., 'sh.600000', 'sz.000001').
            fields: Optional list to select specific columns from the available basic info
                    (e.g., ['code', 'code_name', 'industry', 'listingDate']).
                    If None or empty, returns all available basic info columns from Baostock.

        Returns:
            Basic stock information in the requested format.
        """
        logger.info(f"Tool 'get_stock_basic_info' called for {code} (fields={fields})")
        return run_tool_with_handling(
            lambda: fetch_stock_basic_info(
                active_data_source, code=code, fields=fields, format=format
            ),
            context=f"get_stock_basic_info:{code}",
        )

    @app.tool()
    def get_dividend_data(code: str, year: str, year_type: str = "report", limit: int = 250, format: str = "markdown") -> str:
        """
        Fetches dividend information for a given stock code and year.

        Args:
            code: The stock code in Baostock format (e.g., 'sh.600000', 'sz.000001').
            year: The year to query (e.g., '2023').
            year_type: Type of year. Valid options (from Baostock):
                         'report': Announcement year (预案公告年份)
                         'operate': Ex-dividend year (除权除息年份)
                       Defaults to 'report'.

        Returns:
            Dividend records table.
        """
        logger.info(f"Tool 'get_dividend_data' called for {code}, year={year}, year_type={year_type}")
        return run_tool_with_handling(
            lambda: fetch_dividend_data(
                active_data_source,
                code=code,
                year=year,
                year_type=year_type,
                limit=limit,
                format=format,
            ),
            context=f"get_dividend_data:{code}:{year}",
        )

    @app.tool()
    def get_adjust_factor_data(code: str, start_date: str, end_date: str, limit: int = 250, format: str = "markdown") -> str:
        """
        Fetches adjustment factor data for a given stock code and date range.
        Uses Baostock's "涨跌幅复权算法" factors. Useful for calculating adjusted prices.

        Args:
            code: The stock code in Baostock format (e.g., 'sh.600000', 'sz.000001').
            start_date: Start date in 'YYYY-MM-DD' format.
            end_date: End date in 'YYYY-MM-DD' format.

        Returns:
            Adjustment factors table.
        """
        logger.info(f"Tool 'get_adjust_factor_data' called for {code} ({start_date} to {end_date})")
        return run_tool_with_handling(
            lambda: fetch_adjust_factor_data(
                active_data_source,
                code=code,
                start_date=start_date,
                end_date=end_date,
                limit=limit,
                format=format,
            ),
            context=f"get_adjust_factor_data:{code}",
        )

```

--------------------------------------------------------------------------------
/src/data_source_interface.py:
--------------------------------------------------------------------------------

```python
# Defines the abstract interface for financial data sources
from abc import ABC, abstractmethod
import pandas as pd
from typing import Optional, List

class DataSourceError(Exception):
    """Base exception for data source errors."""
    pass


class LoginError(DataSourceError):
    """Exception raised for login failures to the data source."""
    pass


class NoDataFoundError(DataSourceError):
    """Exception raised when no data is found for the given query."""
    pass


class FinancialDataSource(ABC):
    """
    Abstract base class defining the interface for financial data sources.
    Implementations of this class provide access to specific financial data APIs
    (e.g., Baostock, Akshare).
    """

    @abstractmethod
    def get_historical_k_data(
        self,
        code: str,
        start_date: str,
        end_date: str,
        frequency: str = "d",
        adjust_flag: str = "3",
        fields: Optional[List[str]] = None,
    ) -> pd.DataFrame:
        """
        Fetches historical K-line (OHLCV) data for a given stock code.

        Args:
            code: The stock code (e.g., 'sh.600000', 'sz.000001').
            start_date: Start date in 'YYYY-MM-DD' format.
            end_date: End date in 'YYYY-MM-DD' format.
            frequency: Data frequency. Common values depend on the underlying
                       source (e.g., 'd' for daily, 'w' for weekly, 'm' for monthly,
                       '5', '15', '30', '60' for minutes). Defaults to 'd'.
            adjust_flag: Adjustment flag for historical data. Common values
                         depend on the source (e.g., '1' for forward adjusted,
                         '2' for backward adjusted, '3' for non-adjusted).
                         Defaults to '3'.
            fields: Optional list of specific fields to retrieve. If None,
                    retrieves default fields defined by the implementation.

        Returns:
            A pandas DataFrame containing the historical K-line data, with
            columns corresponding to the requested fields.

        Raises:
            LoginError: If login to the data source fails.
            NoDataFoundError: If no data is found for the query.
            DataSourceError: For other data source related errors.
            ValueError: If input parameters are invalid.
        """
        pass

    @abstractmethod
    def get_stock_basic_info(self, code: str) -> pd.DataFrame:
        """
        Fetches basic information for a given stock code.

        Args:
            code: The stock code (e.g., 'sh.600000', 'sz.000001').

        Returns:
            A pandas DataFrame containing the basic stock information.
            The structure and columns depend on the underlying data source.
            Typically contains info like name, industry, listing date, etc.

        Raises:
            LoginError: If login to the data source fails.
            NoDataFoundError: If no data is found for the query.
            DataSourceError: For other data source related errors.
            ValueError: If the input code is invalid.
        """
        pass

    @abstractmethod
    def get_trade_dates(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Fetches trading dates information within a range."""
        pass

    @abstractmethod
    def get_all_stock(self, date: Optional[str] = None) -> pd.DataFrame:
        """Fetches list of all stocks and their trading status on a given date."""
        pass

    @abstractmethod
    def get_deposit_rate_data(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Fetches benchmark deposit rates."""
        pass

    @abstractmethod
    def get_loan_rate_data(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Fetches benchmark loan rates."""
        pass

    @abstractmethod
    def get_required_reserve_ratio_data(self, start_date: Optional[str] = None, end_date: Optional[str] = None, year_type: str = '0') -> pd.DataFrame:
        """Fetches required reserve ratio data."""
        pass

    @abstractmethod
    def get_money_supply_data_month(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Fetches monthly money supply data (M0, M1, M2)."""
        pass

    @abstractmethod
    def get_money_supply_data_year(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Fetches yearly money supply data (M0, M1, M2 - year end balance)."""
        pass

    @abstractmethod
    def get_dividend_data(self, code: str, year: str, year_type: str = "report") -> pd.DataFrame:
        """Fetches dividend information for a stock and year."""
        pass

    @abstractmethod
    def get_adjust_factor_data(self, code: str, start_date: str, end_date: str) -> pd.DataFrame:
        """Fetches adjustment factor data used for price adjustments."""
        pass

    # Financial report datasets
    @abstractmethod
    def get_profit_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        pass

    @abstractmethod
    def get_operation_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        pass

    @abstractmethod
    def get_growth_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        pass

    @abstractmethod
    def get_balance_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        pass

    @abstractmethod
    def get_cash_flow_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        pass

    @abstractmethod
    def get_dupont_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        pass

    @abstractmethod
    def get_performance_express_report(self, code: str, start_date: str, end_date: str) -> pd.DataFrame:
        pass

    @abstractmethod
    def get_forecast_report(self, code: str, start_date: str, end_date: str) -> pd.DataFrame:
        pass

    # Index / industry
    @abstractmethod
    def get_stock_industry(self, code: Optional[str] = None, date: Optional[str] = None) -> pd.DataFrame:
        pass

    @abstractmethod
    def get_hs300_stocks(self, date: Optional[str] = None) -> pd.DataFrame:
        pass

    @abstractmethod
    def get_sz50_stocks(self, date: Optional[str] = None) -> pd.DataFrame:
        pass

    @abstractmethod
    def get_zz500_stocks(self, date: Optional[str] = None) -> pd.DataFrame:
        pass

    # Market overview
    @abstractmethod
    def get_all_stock(self, date: Optional[str] = None) -> pd.DataFrame:
        pass
    # Note: SHIBOR is not implemented in current Baostock bindings; no abstract method here.

```

--------------------------------------------------------------------------------
/src/baostock_data_source.py:
--------------------------------------------------------------------------------

```python
# Implementation of the FinancialDataSource interface using Baostock
import baostock as bs
import pandas as pd
from typing import List, Optional
import logging
from .data_source_interface import FinancialDataSource, DataSourceError, NoDataFoundError, LoginError
from .utils import baostock_login_context

# Get a logger instance for this module
logger = logging.getLogger(__name__)

DEFAULT_K_FIELDS = [
    "date", "code", "open", "high", "low", "close", "preclose",
    "volume", "amount", "adjustflag", "turn", "tradestatus",
    "pctChg", "peTTM", "pbMRQ", "psTTM", "pcfNcfTTM", "isST"
]

DEFAULT_BASIC_FIELDS = [
    "code", "tradeStatus", "code_name"
    # Add more default fields as needed, e.g., "industry", "listingDate"
]

# Helper function to reduce repetition in financial data fetching


def _fetch_financial_data(
    bs_query_func,
    data_type_name: str,
    code: str,
    year: str,
    quarter: int
) -> pd.DataFrame:
    logger.info(
        f"Fetching {data_type_name} data for {code}, year={year}, quarter={quarter}")
    try:
        with baostock_login_context():
            # Assuming all these functions take code, year, quarter
            rs = bs_query_func(code=code, year=year, quarter=quarter)

            if rs.error_code != '0':
                logger.error(
                    f"Baostock API error ({data_type_name}) for {code}: {rs.error_msg} (code: {rs.error_code})")
                if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                    raise NoDataFoundError(
                        f"No {data_type_name} data found for {code}, {year}Q{quarter}. Baostock msg: {rs.error_msg}")
                else:
                    raise DataSourceError(
                        f"Baostock API error fetching {data_type_name} data: {rs.error_msg} (code: {rs.error_code})")

            data_list = []
            while rs.next():
                data_list.append(rs.get_row_data())

            if not data_list:
                logger.warning(
                    f"No {data_type_name} data found for {code}, {year}Q{quarter} (empty result set from Baostock).")
                raise NoDataFoundError(
                    f"No {data_type_name} data found for {code}, {year}Q{quarter} (empty result set).")

            result_df = pd.DataFrame(data_list, columns=rs.fields)
            logger.info(
                f"Retrieved {len(result_df)} {data_type_name} records for {code}, {year}Q{quarter}.")
            return result_df

    except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
        logger.warning(
            f"Caught known error fetching {data_type_name} data for {code}: {type(e).__name__}")
        raise e
    except Exception as e:
        logger.exception(
            f"Unexpected error fetching {data_type_name} data for {code}: {e}")
        raise DataSourceError(
            f"Unexpected error fetching {data_type_name} data for {code}: {e}")

# Helper function to reduce repetition for index constituent data fetching


def _fetch_index_constituent_data(
    bs_query_func,
    index_name: str,
    date: Optional[str] = None
) -> pd.DataFrame:
    logger.info(
        f"Fetching {index_name} constituents for date={date or 'latest'}")
    try:
        with baostock_login_context():
            # date is optional, defaults to latest
            rs = bs_query_func(date=date)

            if rs.error_code != '0':
                logger.error(
                    f"Baostock API error ({index_name} Constituents) for date {date}: {rs.error_msg} (code: {rs.error_code})")
                if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                    raise NoDataFoundError(
                        f"No {index_name} constituent data found for date {date}. Baostock msg: {rs.error_msg}")
                else:
                    raise DataSourceError(
                        f"Baostock API error fetching {index_name} constituents: {rs.error_msg} (code: {rs.error_code})")

            data_list = []
            while rs.next():
                data_list.append(rs.get_row_data())

            if not data_list:
                logger.warning(
                    f"No {index_name} constituent data found for date {date} (empty result set).")
                raise NoDataFoundError(
                    f"No {index_name} constituent data found for date {date} (empty result set).")

            result_df = pd.DataFrame(data_list, columns=rs.fields)
            logger.info(
                f"Retrieved {len(result_df)} {index_name} constituents for date {date or 'latest'}.")
            return result_df

    except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
        logger.warning(
            f"Caught known error fetching {index_name} constituents for date {date}: {type(e).__name__}")
        raise e
    except Exception as e:
        logger.exception(
            f"Unexpected error fetching {index_name} constituents for date {date}: {e}")
        raise DataSourceError(
            f"Unexpected error fetching {index_name} constituents for date {date}: {e}")

# Helper function to reduce repetition for macroeconomic data fetching


def _fetch_macro_data(
    bs_query_func,
    data_type_name: str,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    **kwargs  # For extra params like yearType
) -> pd.DataFrame:
    date_range_log = f"from {start_date or 'default'} to {end_date or 'default'}"
    kwargs_log = f", extra_args={kwargs}" if kwargs else ""
    logger.info(f"Fetching {data_type_name} data {date_range_log}{kwargs_log}")
    try:
        with baostock_login_context():
            rs = bs_query_func(start_date=start_date,
                               end_date=end_date, **kwargs)

            if rs.error_code != '0':
                logger.error(
                    f"Baostock API error ({data_type_name}): {rs.error_msg} (code: {rs.error_code})")
                if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                    raise NoDataFoundError(
                        f"No {data_type_name} data found for the specified criteria. Baostock msg: {rs.error_msg}")
                else:
                    raise DataSourceError(
                        f"Baostock API error fetching {data_type_name} data: {rs.error_msg} (code: {rs.error_code})")

            data_list = []
            while rs.next():
                data_list.append(rs.get_row_data())

            if not data_list:
                logger.warning(
                    f"No {data_type_name} data found for the specified criteria (empty result set).")
                raise NoDataFoundError(
                    f"No {data_type_name} data found for the specified criteria (empty result set).")

            result_df = pd.DataFrame(data_list, columns=rs.fields)
            logger.info(
                f"Retrieved {len(result_df)} {data_type_name} records.")
            return result_df

    except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
        logger.warning(
            f"Caught known error fetching {data_type_name} data: {type(e).__name__}")
        raise e
    except Exception as e:
        logger.exception(
            f"Unexpected error fetching {data_type_name} data: {e}")
        raise DataSourceError(
            f"Unexpected error fetching {data_type_name} data: {e}")


class BaostockDataSource(FinancialDataSource):
    """
    Concrete implementation of FinancialDataSource using the Baostock library.
    """

    def _format_fields(self, fields: Optional[List[str]], default_fields: List[str]) -> str:
        """Formats the list of fields into a comma-separated string for Baostock."""
        if fields is None or not fields:
            logger.debug(
                f"No specific fields requested, using defaults: {default_fields}")
            return ",".join(default_fields)
        # Basic validation: ensure requested fields are strings
        if not all(isinstance(f, str) for f in fields):
            raise ValueError("All items in the fields list must be strings.")
        logger.debug(f"Using requested fields: {fields}")
        return ",".join(fields)

    def get_historical_k_data(
        self,
        code: str,
        start_date: str,
        end_date: str,
        frequency: str = "d",
        adjust_flag: str = "3",
        fields: Optional[List[str]] = None,
    ) -> pd.DataFrame:
        """Fetches historical K-line data using Baostock."""
        logger.info(
            f"Fetching K-data for {code} ({start_date} to {end_date}), freq={frequency}, adjust={adjust_flag}")
        try:
            formatted_fields = self._format_fields(fields, DEFAULT_K_FIELDS)
            logger.debug(
                f"Requesting fields from Baostock: {formatted_fields}")

            with baostock_login_context():
                rs = bs.query_history_k_data_plus(
                    code,
                    formatted_fields,
                    start_date=start_date,
                    end_date=end_date,
                    frequency=frequency,
                    adjustflag=adjust_flag
                )

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (K-data) for {code}: {rs.error_msg} (code: {rs.error_code})")
                    # Check common error codes, e.g., for no data
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':  # Example error code
                        raise NoDataFoundError(
                            f"No historical data found for {code} in the specified range. Baostock msg: {rs.error_msg}")
                    else:
                        raise DataSourceError(
                            f"Baostock API error fetching K-data: {rs.error_msg} (code: {rs.error_code})")

                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No historical data found for {code} in range (empty result set from Baostock).")
                    raise NoDataFoundError(
                        f"No historical data found for {code} in the specified range (empty result set).")

                # Crucial: Use rs.fields for column names
                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(f"Retrieved {len(result_df)} records for {code}.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            # Re-raise known errors
            logger.warning(
                f"Caught known error fetching K-data for {code}: {type(e).__name__}")
            raise e
        except Exception as e:
            # Wrap unexpected errors
            # Use logger.exception to include traceback
            logger.exception(
                f"Unexpected error fetching K-data for {code}: {e}")
            raise DataSourceError(
                f"Unexpected error fetching K-data for {code}: {e}")

    def get_stock_basic_info(self, code: str, fields: Optional[List[str]] = None) -> pd.DataFrame:
        """Fetches basic stock information using Baostock."""
        logger.info(f"Fetching basic info for {code}")
        try:
            # Note: query_stock_basic doesn't seem to have a fields parameter in docs,
            # but we keep the signature consistent. It returns a fixed set.
            # We will use the `fields` argument post-query to select columns if needed.
            logger.debug(
                f"Requesting basic info for {code}. Optional fields requested: {fields}")

            with baostock_login_context():
                # Example: Fetch basic info; adjust API call if needed based on baostock docs
                # rs = bs.query_stock_basic(code=code, code_name=code_name) # If supporting name lookup
                rs = bs.query_stock_basic(code=code)

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (Basic Info) for {code}: {rs.error_msg} (code: {rs.error_code})")
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                        raise NoDataFoundError(
                            f"No basic info found for {code}. Baostock msg: {rs.error_msg}")
                    else:
                        raise DataSourceError(
                            f"Baostock API error fetching basic info: {rs.error_msg} (code: {rs.error_code})")

                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No basic info found for {code} (empty result set from Baostock).")
                    raise NoDataFoundError(
                        f"No basic info found for {code} (empty result set).")

                # Crucial: Use rs.fields for column names
                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(
                    f"Retrieved basic info for {code}. Columns: {result_df.columns.tolist()}")

                # Optional: Select subset of columns if `fields` argument was provided
                if fields:
                    available_cols = [
                        col for col in fields if col in result_df.columns]
                    if not available_cols:
                        raise ValueError(
                            f"None of the requested fields {fields} are available in the basic info result.")
                    logger.debug(
                        f"Selecting columns: {available_cols} from basic info for {code}")
                    result_df = result_df[available_cols]

                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching basic info for {code}: {type(e).__name__}")
            raise e
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching basic info for {code}: {e}")
            raise DataSourceError(
                f"Unexpected error fetching basic info for {code}: {e}")

    def get_dividend_data(self, code: str, year: str, year_type: str = "report") -> pd.DataFrame:
        """Fetches dividend information using Baostock."""
        logger.info(
            f"Fetching dividend data for {code}, year={year}, year_type={year_type}")
        try:
            with baostock_login_context():
                rs = bs.query_dividend_data(
                    code=code, year=year, yearType=year_type)

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (Dividend) for {code}: {rs.error_msg} (code: {rs.error_code})")
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                        raise NoDataFoundError(
                            f"No dividend data found for {code} and year {year}. Baostock msg: {rs.error_msg}")
                    else:
                        raise DataSourceError(
                            f"Baostock API error fetching dividend data: {rs.error_msg} (code: {rs.error_code})")

                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No dividend data found for {code}, year {year} (empty result set from Baostock).")
                    raise NoDataFoundError(
                        f"No dividend data found for {code}, year {year} (empty result set).")

                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(
                    f"Retrieved {len(result_df)} dividend records for {code}, year {year}.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching dividend data for {code}: {type(e).__name__}")
            raise e
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching dividend data for {code}: {e}")
            raise DataSourceError(
                f"Unexpected error fetching dividend data for {code}: {e}")

    def get_adjust_factor_data(self, code: str, start_date: str, end_date: str) -> pd.DataFrame:
        """Fetches adjustment factor data using Baostock."""
        logger.info(
            f"Fetching adjustment factor data for {code} ({start_date} to {end_date})")
        try:
            with baostock_login_context():
                rs = bs.query_adjust_factor(
                    code=code, start_date=start_date, end_date=end_date)

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (Adjust Factor) for {code}: {rs.error_msg} (code: {rs.error_code})")
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                        raise NoDataFoundError(
                            f"No adjustment factor data found for {code} in the specified range. Baostock msg: {rs.error_msg}")
                    else:
                        raise DataSourceError(
                            f"Baostock API error fetching adjust factor data: {rs.error_msg} (code: {rs.error_code})")

                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No adjustment factor data found for {code} in range (empty result set from Baostock).")
                    raise NoDataFoundError(
                        f"No adjustment factor data found for {code} in the specified range (empty result set).")

                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(
                    f"Retrieved {len(result_df)} adjustment factor records for {code}.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching adjust factor data for {code}: {type(e).__name__}")
            raise e
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching adjust factor data for {code}: {e}")
            raise DataSourceError(
                f"Unexpected error fetching adjust factor data for {code}: {e}")

    def get_profit_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        """Fetches quarterly profitability data using Baostock."""
        return _fetch_financial_data(bs.query_profit_data, "Profitability", code, year, quarter)

    def get_operation_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        """Fetches quarterly operation capability data using Baostock."""
        return _fetch_financial_data(bs.query_operation_data, "Operation Capability", code, year, quarter)

    def get_growth_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        """Fetches quarterly growth capability data using Baostock."""
        return _fetch_financial_data(bs.query_growth_data, "Growth Capability", code, year, quarter)

    def get_balance_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        """Fetches quarterly balance sheet data (solvency) using Baostock."""
        return _fetch_financial_data(bs.query_balance_data, "Balance Sheet", code, year, quarter)

    def get_cash_flow_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        """Fetches quarterly cash flow data using Baostock."""
        return _fetch_financial_data(bs.query_cash_flow_data, "Cash Flow", code, year, quarter)

    def get_dupont_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        """Fetches quarterly DuPont analysis data using Baostock."""
        return _fetch_financial_data(bs.query_dupont_data, "DuPont Analysis", code, year, quarter)

    def get_performance_express_report(self, code: str, start_date: str, end_date: str) -> pd.DataFrame:
        """Fetches performance express reports (业绩快报) using Baostock."""
        logger.info(
            f"Fetching Performance Express Report for {code} ({start_date} to {end_date})")
        try:
            with baostock_login_context():
                rs = bs.query_performance_express_report(
                    code=code, start_date=start_date, end_date=end_date)

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (Perf Express) for {code}: {rs.error_msg} (code: {rs.error_code})")
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                        raise NoDataFoundError(
                            f"No performance express report found for {code} in range {start_date}-{end_date}. Baostock msg: {rs.error_msg}")
                    else:
                        raise DataSourceError(
                            f"Baostock API error fetching performance express report: {rs.error_msg} (code: {rs.error_code})")

                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No performance express report found for {code} in range {start_date}-{end_date} (empty result set).")
                    raise NoDataFoundError(
                        f"No performance express report found for {code} in range {start_date}-{end_date} (empty result set).")

                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(
                    f"Retrieved {len(result_df)} performance express report records for {code}.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching performance express report for {code}: {type(e).__name__}")
            raise e
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching performance express report for {code}: {e}")
            raise DataSourceError(
                f"Unexpected error fetching performance express report for {code}: {e}")

    def get_forecast_report(self, code: str, start_date: str, end_date: str) -> pd.DataFrame:
        """Fetches performance forecast reports (业绩预告) using Baostock."""
        logger.info(
            f"Fetching Performance Forecast Report for {code} ({start_date} to {end_date})")
        try:
            with baostock_login_context():
                rs = bs.query_forecast_report(
                    code=code, start_date=start_date, end_date=end_date)
                # Note: Baostock docs mention pagination for this, but the Python API doesn't seem to expose it directly.
                # We fetch all available pages in the loop below.

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (Forecast) for {code}: {rs.error_msg} (code: {rs.error_code})")
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                        raise NoDataFoundError(
                            f"No performance forecast report found for {code} in range {start_date}-{end_date}. Baostock msg: {rs.error_msg}")
                    else:
                        raise DataSourceError(
                            f"Baostock API error fetching performance forecast report: {rs.error_msg} (code: {rs.error_code})")

                data_list = []
                while rs.next():  # Loop should handle pagination implicitly if rs manages it
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No performance forecast report found for {code} in range {start_date}-{end_date} (empty result set).")
                    raise NoDataFoundError(
                        f"No performance forecast report found for {code} in range {start_date}-{end_date} (empty result set).")

                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(
                    f"Retrieved {len(result_df)} performance forecast report records for {code}.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching performance forecast report for {code}: {type(e).__name__}")
            raise e
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching performance forecast report for {code}: {e}")
            raise DataSourceError(
                f"Unexpected error fetching performance forecast report for {code}: {e}")

    def get_stock_industry(self, code: Optional[str] = None, date: Optional[str] = None) -> pd.DataFrame:
        """Fetches industry classification using Baostock."""
        log_msg = f"Fetching industry data for code={code or 'all'}, date={date or 'latest'}"
        logger.info(log_msg)
        try:
            with baostock_login_context():
                rs = bs.query_stock_industry(code=code, date=date)

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (Industry) for {code}, {date}: {rs.error_msg} (code: {rs.error_code})")
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                        raise NoDataFoundError(
                            f"No industry data found for {code}, {date}. Baostock msg: {rs.error_msg}")
                    else:
                        raise DataSourceError(
                            f"Baostock API error fetching industry data: {rs.error_msg} (code: {rs.error_code})")

                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No industry data found for {code}, {date} (empty result set).")
                    raise NoDataFoundError(
                        f"No industry data found for {code}, {date} (empty result set).")

                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(
                    f"Retrieved {len(result_df)} industry records for {code or 'all'}, {date or 'latest'}.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching industry data for {code}, {date}: {type(e).__name__}")
            raise e
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching industry data for {code}, {date}: {e}")
            raise DataSourceError(
                f"Unexpected error fetching industry data for {code}, {date}: {e}")

    def get_sz50_stocks(self, date: Optional[str] = None) -> pd.DataFrame:
        """Fetches SZSE 50 index constituents using Baostock."""
        return _fetch_index_constituent_data(bs.query_sz50_stocks, "SZSE 50", date)

    def get_hs300_stocks(self, date: Optional[str] = None) -> pd.DataFrame:
        """Fetches CSI 300 index constituents using Baostock."""
        return _fetch_index_constituent_data(bs.query_hs300_stocks, "CSI 300", date)

    def get_zz500_stocks(self, date: Optional[str] = None) -> pd.DataFrame:
        """Fetches CSI 500 index constituents using Baostock."""
        return _fetch_index_constituent_data(bs.query_zz500_stocks, "CSI 500", date)

    def get_trade_dates(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Fetches trading dates using Baostock."""
        logger.info(
            f"Fetching trade dates from {start_date or 'default'} to {end_date or 'default'}")
        try:
            with baostock_login_context():  # Login might not be strictly needed for this, but keeping consistent
                rs = bs.query_trade_dates(
                    start_date=start_date, end_date=end_date)

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (Trade Dates): {rs.error_msg} (code: {rs.error_code})")
                    # Unlikely to have 'no record found' for dates, but handle API errors
                    raise DataSourceError(
                        f"Baostock API error fetching trade dates: {rs.error_msg} (code: {rs.error_code})")

                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    # This case should ideally not happen if the API returns a valid range
                    logger.warning(
                        f"No trade dates returned for range {start_date}-{end_date} (empty result set).")
                    raise NoDataFoundError(
                        f"No trade dates found for range {start_date}-{end_date} (empty result set).")

                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(f"Retrieved {len(result_df)} trade date records.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching trade dates: {type(e).__name__}")
            raise e
        except Exception as e:
            logger.exception(f"Unexpected error fetching trade dates: {e}")
            raise DataSourceError(
                f"Unexpected error fetching trade dates: {e}")

    def get_all_stock(self, date: Optional[str] = None) -> pd.DataFrame:
        """Fetches all stock list for a given date using Baostock."""
        logger.info(f"Fetching all stock list for date={date or 'default'}")
        try:
            with baostock_login_context():
                rs = bs.query_all_stock(day=date)

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (All Stock) for date {date}: {rs.error_msg} (code: {rs.error_code})")
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':  # Check if this applies
                        raise NoDataFoundError(
                            f"No stock data found for date {date}. Baostock msg: {rs.error_msg}")
                    else:
                        raise DataSourceError(
                            f"Baostock API error fetching all stock list: {rs.error_msg} (code: {rs.error_code})")

                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No stock list returned for date {date} (empty result set).")
                    raise NoDataFoundError(
                        f"No stock list found for date {date} (empty result set).")

                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(
                    f"Retrieved {len(result_df)} stock records for date {date or 'default'}.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching all stock list for date {date}: {type(e).__name__}")
            raise e
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching all stock list for date {date}: {e}")
            raise DataSourceError(
                f"Unexpected error fetching all stock list for date {date}: {e}")

    def get_deposit_rate_data(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Fetches benchmark deposit rates using Baostock."""
        return _fetch_macro_data(bs.query_deposit_rate_data, "Deposit Rate", start_date, end_date)

    def get_loan_rate_data(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Fetches benchmark loan rates using Baostock."""
        return _fetch_macro_data(bs.query_loan_rate_data, "Loan Rate", start_date, end_date)

    def get_required_reserve_ratio_data(self, start_date: Optional[str] = None, end_date: Optional[str] = None, year_type: str = '0') -> pd.DataFrame:
        """Fetches required reserve ratio data using Baostock."""
        # Note the extra yearType parameter handled by kwargs
        return _fetch_macro_data(bs.query_required_reserve_ratio_data, "Required Reserve Ratio", start_date, end_date, yearType=year_type)

    def get_money_supply_data_month(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Fetches monthly money supply data (M0, M1, M2) using Baostock."""
        # Baostock expects YYYY-MM format for dates here
        return _fetch_macro_data(bs.query_money_supply_data_month, "Monthly Money Supply", start_date, end_date)

    def get_money_supply_data_year(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Fetches yearly money supply data (M0, M1, M2 - year end balance) using Baostock."""
        # Baostock expects YYYY format for dates here
        return _fetch_macro_data(bs.query_money_supply_data_year, "Yearly Money Supply", start_date, end_date)

    # Note: SHIBOR is not available in current Baostock API bindings used; not implemented.

```