#
tokens: 24874/50000 17/17 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── LICENSE
├── mcp_server.py
├── pyproject.toml
├── README.md
├── resource
│   └── img
│       ├── ali.png
│       ├── gzh_code.jpg
│       ├── img_1.png
│       ├── img_2.png
│       └── planet.jpg
├── src
│   ├── __init__.py
│   ├── baostock_data_source.py
│   ├── data_source_interface.py
│   ├── formatting
│   │   ├── __init__.py
│   │   └── markdown_formatter.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── analysis.py
│   │   ├── base.py
│   │   ├── date_utils.py
│   │   ├── financial_reports.py
│   │   ├── helpers.py
│   │   ├── indices.py
│   │   ├── macroeconomic.py
│   │   ├── market_overview.py
│   │   └── stock_market.py
│   └── utils.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/
AGENTS.md
# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# Pipfile.lock

# PEP 582; used by PDM, PEP 582 proposal
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# VS Code settings
.vscode/

docs/





```

--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------

```python
# This file makes src a Python package

```

--------------------------------------------------------------------------------
/src/tools/__init__.py:
--------------------------------------------------------------------------------

```python
# Initialization file for tools package

```

--------------------------------------------------------------------------------
/src/formatting/__init__.py:
--------------------------------------------------------------------------------

```python
# Initialization file for formatting package

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "a-share-mcp"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "baostock>=0.8.9",
    "httpx>=0.28.1",
    "mcp[cli]>=1.2.0",
    "pandas>=2.2.3",
    "annotated-types>=0.7.0",
    "anyio>=4.9.0",
    "certifi>=2025.4.26",
    "click>=8.1.8",
    "colorama>=0.4.6",
    "h11>=0.16.0",
    "httpcore>=1.0.9",
    "httpx-sse>=0.4.0",
    "idna>=3.10",
    "markdown-it-py>=3.0.0",
    "mdurl>=0.1.2",
    "numpy>=2.2.5",
    "pydantic>=2.11.3",
    "pydantic-core>=2.33.1",
    "pydantic-settings>=2.9.1",
    "pygments>=2.19.1",
    "python-dateutil>=2.9.0",
    "python-dotenv>=1.1.0",
    "pytz>=2025.2",
    "rich>=14.0.0",
    "shellingham>=1.5.4",
    "six>=1.17.0",
    "sniffio>=1.3.1",
    "sse-starlette>=2.3.3",
    "starlette>=0.46.2",
    "tabulate>=0.9.0",
    "typer>=0.15.3",
    "typing-extensions>=4.13.2",
    "typing-inspection>=0.4.0",
    "tzdata>=2025.2",
    "uvicorn>=0.34.2",
]

```

--------------------------------------------------------------------------------
/src/utils.py:
--------------------------------------------------------------------------------

```python
# Utility functions, including the Baostock login context manager and logging setup
import baostock as bs
import os
import sys
import logging
from contextlib import contextmanager
from .data_source_interface import LoginError

# --- Logging Setup ---
def setup_logging(level=logging.INFO):
    """Install the application's basic logging configuration.

    Args:
        level: Threshold handed to ``logging.basicConfig``
            (``logging.INFO`` unless overridden).
    """
    record_layout = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    timestamp_layout = '%Y-%m-%d %H:%M:%S'
    logging.basicConfig(level=level, format=record_layout, datefmt=timestamp_layout)
    # Overly chatty dependencies can be quietened here, for example:
    # logging.getLogger("mcp").setLevel(logging.WARNING)

# Get a logger instance for this module (optional, but good practice)
logger = logging.getLogger(__name__)

# --- Baostock Context Manager ---
@contextmanager
def _stdout_silenced():
    """Temporarily point the process-level stdout descriptor at os.devnull.

    The original descriptor is always restored, even when the wrapped code
    raises, so a failing login/logout can never leave stdout redirected or
    leak the duplicated descriptor.
    """
    # Push out anything already buffered so it is not swallowed by devnull.
    sys.stdout.flush()
    target_fd = sys.stdout.fileno()
    backup_fd = os.dup(target_fd)
    try:
        null_fd = os.open(os.devnull, os.O_WRONLY)
        try:
            os.dup2(null_fd, target_fd)
        finally:
            os.close(null_fd)
        yield
    finally:
        try:
            # Drain text printed while silenced; otherwise it would sit in
            # Python's buffer and be emitted after the descriptor is restored.
            sys.stdout.flush()
        finally:
            os.dup2(backup_fd, target_fd)
            os.close(backup_fd)


@contextmanager
def baostock_login_context():
    """Context manager to handle Baostock login and logout, suppressing stdout messages.

    Logs in on entry and logs out on exit (also when the body raises). The
    stdout descriptor is redirected to os.devnull only for the duration of
    the ``bs.login()`` / ``bs.logout()`` calls.

    Raises:
        LoginError: If ``bs.login()`` reports a non-'0' error code.
    """
    with _stdout_silenced():
        logger.debug("Attempting Baostock login...")
        lg = bs.login()
        logger.debug(f"Login result: code={lg.error_code}, msg={lg.error_msg}")

    if lg.error_code != '0':
        # Log error before raising
        logger.error(f"Baostock login failed: {lg.error_msg}")
        raise LoginError(f"Baostock login failed: {lg.error_msg}")

    logger.info("Baostock login successful.")
    try:
        yield  # API calls happen here
    finally:
        with _stdout_silenced():
            logger.debug("Attempting Baostock logout...")
            bs.logout()
            logger.debug("Logout completed.")
        logger.info("Baostock logout successful.")

# You can add other utility functions or classes here if needed

```

--------------------------------------------------------------------------------
/src/formatting/markdown_formatter.py:
--------------------------------------------------------------------------------

```python
"""
Markdown formatting utilities for A-Share MCP Server.
"""
import pandas as pd
import logging
import json

logger = logging.getLogger(__name__)

# Configuration: Max rows to display in string outputs to protect context length
MAX_MARKDOWN_ROWS = 250


def format_df_to_markdown(df: pd.DataFrame, max_rows: int = None) -> str:
    """Render a DataFrame as a Markdown table, capping the number of rows.

    Args:
        df: Frame to render. ``None`` or an empty frame yields a placeholder.
        max_rows: Row cap for the output; ``None`` selects MAX_MARKDOWN_ROWS.

    Returns:
        The Markdown table, preceded by a truncation note when rows were
        dropped. A placeholder string is returned for missing/empty data and
        an error string when the conversion itself fails.
    """
    if df is None or df.empty:
        logger.warning("Attempted to format an empty DataFrame to Markdown.")
        return "(No data available to display)"

    row_cap = MAX_MARKDOWN_ROWS if max_rows is None else max_rows
    total = df.shape[0]
    shown = min(total, row_cap)
    preview = df.head(shown)
    was_cut = total > shown

    try:
        table = preview.to_markdown(index=False)
    except Exception as e:
        logger.error("Error converting DataFrame to Markdown: %s", e, exc_info=True)
        return "Error: Could not format data into Markdown table."

    if not was_cut:
        return table
    notes = f"rows truncated to {shown} from {total}"
    return f"Note: Data truncated ({notes}).\n\n{table}"


def format_table_output(
    df: pd.DataFrame,
    format: str = "markdown",
    max_rows: int | None = None,
    meta: dict | None = None,
) -> str:
    """Formats a DataFrame into the requested string format with optional meta.

    Args:
        df: Data to format.
        format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.
        max_rows: Optional max rows to include (defaults depend on formatters).
        meta: Optional metadata dict to include (prepended for markdown, embedded for json).

    Returns:
        A string suitable for tool responses.
    """
    fmt = (format or "markdown").lower()

    # Normalize row cap
    if max_rows is None:
        max_rows = MAX_MARKDOWN_ROWS if fmt == "markdown" else MAX_MARKDOWN_ROWS

    total_rows = 0 if df is None else int(df.shape[0])
    rows_to_show = 0 if df is None else min(total_rows, max_rows)
    truncated = total_rows > rows_to_show
    df_display = df.head(rows_to_show) if df is not None else pd.DataFrame()

    if fmt == "markdown":
        header = ""
        if meta:
            # Render a compact meta header
            lines = ["Meta:"]
            for k, v in meta.items():
                lines.append(f"- {k}: {v}")
            header = "\n".join(lines) + "\n\n"
        return header + format_df_to_markdown(df_display, max_rows=max_rows)

    if fmt == "csv":
        try:
            return df_display.to_csv(index=False)
        except Exception as e:
            logger.error("Error converting DataFrame to CSV: %s", e, exc_info=True)
            return "Error: Could not format data into CSV."

    if fmt == "json":
        try:
            payload = {
                "data": [] if df_display is None else df_display.to_dict(orient="records"),
                "meta": {
                    **(meta or {}),
                    "total_rows": total_rows,
                    "returned_rows": rows_to_show,
                    "truncated": truncated,
                    "columns": [] if df_display is None else list(df_display.columns),
                },
            }
            return json.dumps(payload, ensure_ascii=False)
        except Exception as e:
            logger.error("Error converting DataFrame to JSON: %s", e, exc_info=True)
            return "Error: Could not format data into JSON."

    # Fallback to markdown if unknown format
    logger.warning("Unknown format '%s', falling back to markdown", fmt)
    return format_df_to_markdown(df_display, max_rows=max_rows)

```

--------------------------------------------------------------------------------
/src/tools/helpers.py:
--------------------------------------------------------------------------------

```python
"""
Helper tools for code normalization and constants discovery.
These are agent-friendly utilities with clear, unambiguous parameters.
"""
import logging
import re
from typing import Optional

from mcp.server.fastmcp import FastMCP

logger = logging.getLogger(__name__)


def register_helpers_tools(app: FastMCP):
    """
    Register helper/utility tools with the MCP app.

    Two tools are registered: ``normalize_stock_code`` and
    ``list_tool_constants``.

    Args:
        app: The FastMCP app instance the tools are attached to.
    """
    # Compiled once per registration. Case-insensitivity is passed through
    # ``flags`` because an inline ``(?i)`` anywhere but the very start of a
    # pattern is rejected by ``re`` on Python 3.11+.
    prefixed_code = re.compile(r"(sh|sz)\.?(\d{6})", re.IGNORECASE)
    suffixed_code = re.compile(r"(\d{6})\.?(sh|sz)", re.IGNORECASE)
    bare_code = re.compile(r"\d{6}")

    @app.tool()
    def normalize_stock_code(code: str) -> str:
        """
        Normalize a stock code to Baostock format.

        Rules:
            - If 6 digits and starts with '6' -> 'sh.<code>'
            - If 6 digits and starts with other -> 'sz.<code>'
            - Accept '600000.SH'/'000001.SZ' -> lower and reorder to 'sh.600000'/'sz.000001'
            - Accept 'sh600000'/'sz000001' -> insert dot

        Args:
            code: Raw stock code (e.g., '600000', '000001.SZ', 'sh600000').

        Returns:
            Normalized code like 'sh.600000' or an error string if invalid.

        Examples:
            - normalize_stock_code('600000') -> 'sh.600000'
            - normalize_stock_code('000001.SZ') -> 'sz.000001'
        """
        logger.info("Tool 'normalize_stock_code' called with input=%s", code)
        try:
            raw = (code or "").strip()
            if not raw:
                return "Error: 'code' is required."

            # Exchange prefix first: 'sh600000', 'SZ.000001', 'sh.600000'.
            m = prefixed_code.fullmatch(raw)
            if m:
                return f"{m.group(1).lower()}.{m.group(2)}"

            # Exchange suffix: '600000.SH', '000001sz'.
            m = suffixed_code.fullmatch(raw)
            if m:
                return f"{m.group(2).lower()}.{m.group(1)}"

            # Bare 6-digit code: infer the exchange from the leading digit.
            if bare_code.fullmatch(raw):
                exchange = "sh" if raw.startswith("6") else "sz"
                return f"{exchange}.{raw}"

            return "Error: Unsupported code format. Examples: 'sh.600000', '600000', '000001.SZ'."
        except Exception as e:
            logger.exception("Exception in normalize_stock_code: %s", e)
            return f"Error: {e}"

    @app.tool()
    def list_tool_constants(kind: Optional[str] = None) -> str:
        """
        List valid constants for tool parameters.

        Args:
            kind: Optional filter: 'frequency' | 'adjust_flag' | 'year_type' | 'index'. If None, show all.

        Returns:
            Markdown table(s) of constants and meanings.
        """
        logger.info("Tool 'list_tool_constants' called kind=%s", kind or "all")
        freq = [
            ("d", "daily"), ("w", "weekly"), ("m", "monthly"),
            ("5", "5 minutes"), ("15", "15 minutes"), ("30", "30 minutes"), ("60", "60 minutes"),
        ]
        adjust = [("1", "forward adjusted"), ("2", "backward adjusted"), ("3", "unadjusted")]
        year_type = [("report", "announcement year"), ("operate", "ex-dividend year")]
        index = [("hs300", "CSI 300"), ("sz50", "SSE 50"), ("zz500", "CSI 500")]

        sections = []

        def as_md(title: str, rows):
            # One titled two-column Markdown table per constant family.
            if not rows:
                return ""
            header = f"### {title}\n\n| value | meaning |\n|---|---|\n"
            lines = [f"| {v} | {m} |" for (v, m) in rows]
            return header + "\n".join(lines) + "\n"

        # An empty filter selects every section.
        k = (kind or "").strip().lower()
        if k in ("", "frequency"):
            sections.append(as_md("frequency", freq))
        if k in ("", "adjust_flag"):
            sections.append(as_md("adjust_flag", adjust))
        if k in ("", "year_type"):
            sections.append(as_md("year_type", year_type))
        if k in ("", "index"):
            sections.append(as_md("index", index))

        out = "\n".join(s for s in sections if s)
        if not out:
            return "Error: Invalid kind. Use one of 'frequency', 'adjust_flag', 'year_type', 'index'."
        return out


```

--------------------------------------------------------------------------------
/src/data_source_interface.py:
--------------------------------------------------------------------------------

```python
# Defines the abstract interface for financial data sources
from abc import ABC, abstractmethod
import pandas as pd
from typing import Optional, List

class DataSourceError(Exception):
    """Common base class for all data source errors."""


class LoginError(DataSourceError):
    """Signals that logging in to the data source failed."""


class NoDataFoundError(DataSourceError):
    """Signals that a query returned no data."""


class FinancialDataSource(ABC):
    """
    Contract that every financial data source backend must fulfil.

    Concrete subclasses wrap a specific financial data API (e.g., Baostock,
    Akshare) and expose it through the methods declared here.
    """

    @abstractmethod
    def get_historical_k_data(
        self,
        code: str,
        start_date: str,
        end_date: str,
        frequency: str = "d",
        adjust_flag: str = "3",
        fields: Optional[List[str]] = None,
    ) -> pd.DataFrame:
        """
        Return historical K-line (OHLCV) data for one stock.

        Args:
            code: Stock code, e.g. 'sh.600000' or 'sz.000001'.
            start_date: First day of the range, 'YYYY-MM-DD'.
            end_date: Last day of the range, 'YYYY-MM-DD'.
            frequency: Bar frequency; the accepted values depend on the
                       underlying source (e.g., 'd' daily, 'w' weekly,
                       'm' monthly, '5'/'15'/'30'/'60' minute bars).
                       Defaults to 'd'.
            adjust_flag: Price adjustment mode; the accepted values depend on
                         the source (e.g., '1' forward adjusted, '2' backward
                         adjusted, '3' non-adjusted). Defaults to '3'.
            fields: Specific fields to retrieve. When None, the implementation
                    chooses its default field set.

        Returns:
            A pandas DataFrame of K-line data whose columns correspond to the
            requested fields.

        Raises:
            LoginError: If login to the data source fails.
            NoDataFoundError: If no data is found for the query.
            DataSourceError: For other data source related errors.
            ValueError: If input parameters are invalid.
        """

    @abstractmethod
    def get_stock_basic_info(self, code: str) -> pd.DataFrame:
        """
        Return basic information about one stock.

        Args:
            code: Stock code, e.g. 'sh.600000' or 'sz.000001'.

        Returns:
            A pandas DataFrame of basic stock information. Structure and
            columns depend on the underlying data source; typically name,
            industry, listing date, etc.

        Raises:
            LoginError: If login to the data source fails.
            NoDataFoundError: If no data is found for the query.
            DataSourceError: For other data source related errors.
            ValueError: If the input code is invalid.
        """

    @abstractmethod
    def get_trade_dates(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> pd.DataFrame:
        """Return trading dates information within a range."""

    @abstractmethod
    def get_all_stock(self, date: Optional[str] = None) -> pd.DataFrame:
        """Return the list of all stocks and their trading status on a given date."""

    @abstractmethod
    def get_deposit_rate_data(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> pd.DataFrame:
        """Return benchmark deposit rates."""

    @abstractmethod
    def get_loan_rate_data(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> pd.DataFrame:
        """Return benchmark loan rates."""

    @abstractmethod
    def get_required_reserve_ratio_data(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        year_type: str = '0',
    ) -> pd.DataFrame:
        """Return required reserve ratio data."""

    @abstractmethod
    def get_money_supply_data_month(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> pd.DataFrame:
        """Return monthly money supply data (M0, M1, M2)."""

    @abstractmethod
    def get_money_supply_data_year(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> pd.DataFrame:
        """Return yearly money supply data (M0, M1, M2 - year end balance)."""

    # Note: SHIBOR is not implemented in current Baostock bindings; no abstract method here.

```

--------------------------------------------------------------------------------
/src/tools/macroeconomic.py:
--------------------------------------------------------------------------------

```python
"""
Macroeconomic tools for the MCP server.
Fetch interest rates, money supply data, and more with consistent options.
"""
import logging
from typing import Optional

from mcp.server.fastmcp import FastMCP
from src.data_source_interface import FinancialDataSource
from src.tools.base import call_macro_data_tool

logger = logging.getLogger(__name__)


def register_macroeconomic_tools(app: FastMCP, active_data_source: FinancialDataSource):
    """
    Register macroeconomic data tools with the MCP app.

    Every tool delegates to ``call_macro_data_tool``, passing the matching
    method of ``active_data_source`` together with the caller's options.

    Args:
        app: The FastMCP app instance
        active_data_source: The active financial data source
    """

    @app.tool()
    def get_deposit_rate_data(start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """
        Fetches benchmark deposit rates (demand and time deposits) within a date range.

        Args:
            start_date: Optional. Start date in 'YYYY-MM-DD' format.
            end_date: Optional. End date in 'YYYY-MM-DD' format.
            limit: Maximum number of rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Deposit rate data in the requested format, or an error message.
        """
        return call_macro_data_tool(
            "get_deposit_rate_data",
            active_data_source.get_deposit_rate_data,
            "Deposit Rate",
            start_date, end_date,
            limit=limit, format=format
        )

    @app.tool()
    def get_loan_rate_data(start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """
        Fetches benchmark loan rates within a date range.

        Args:
            start_date: Optional. Start date in 'YYYY-MM-DD' format.
            end_date: Optional. End date in 'YYYY-MM-DD' format.
            limit: Maximum number of rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Loan rate data in the requested format, or an error message.
        """
        return call_macro_data_tool(
            "get_loan_rate_data",
            active_data_source.get_loan_rate_data,
            "Loan Rate",
            start_date, end_date,
            limit=limit, format=format
        )

    @app.tool()
    def get_required_reserve_ratio_data(start_date: Optional[str] = None, end_date: Optional[str] = None, year_type: str = '0', limit: int = 250, format: str = "markdown") -> str:
        """
        Fetches required reserve ratio data within a date range.

        Args:
            start_date: Optional. Start date in 'YYYY-MM-DD' format.
            end_date: Optional. End date in 'YYYY-MM-DD' format.
            year_type: Optional. Year type for date filtering. '0' for announcement date (default),
                    '1' for effective date.
            limit: Maximum number of rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Required reserve ratio data in the requested format, or an error message.
        """
        # Basic validation for year_type
        if year_type not in ['0', '1']:
            logger.warning(f"Invalid year_type requested: {year_type}")
            return f"Error: Invalid year_type '{year_type}'. Valid options are '0' (announcement date) or '1' (effective date)."

        return call_macro_data_tool(
            "get_required_reserve_ratio_data",
            active_data_source.get_required_reserve_ratio_data,
            "Required Reserve Ratio",
            start_date, end_date,
            limit=limit, format=format,
            # Keyword must match the FinancialDataSource interface signature,
            # which declares this parameter as ``year_type``.
            year_type=year_type
        )

    @app.tool()
    def get_money_supply_data_month(start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """
        Fetches monthly money supply data (M0, M1, M2) within a date range.

        Args:
            start_date: Optional. Start date in 'YYYY-MM' format.
            end_date: Optional. End date in 'YYYY-MM' format.
            limit: Maximum number of rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Monthly money supply data in the requested format, or an error message.
        """
        # Add specific validation for YYYY-MM format if desired
        return call_macro_data_tool(
            "get_money_supply_data_month",
            active_data_source.get_money_supply_data_month,
            "Monthly Money Supply",
            start_date, end_date,
            limit=limit, format=format
        )

    @app.tool()
    def get_money_supply_data_year(start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """
        Fetches yearly money supply data (M0, M1, M2 - year end balance) within a date range.

        Args:
            start_date: Optional. Start year in 'YYYY' format.
            end_date: Optional. End year in 'YYYY' format.
            limit: Maximum number of rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Yearly money supply data in the requested format, or an error message.
        """
        # Add specific validation for YYYY format if desired
        return call_macro_data_tool(
            "get_money_supply_data_year",
            active_data_source.get_money_supply_data_year,
            "Yearly Money Supply",
            start_date, end_date,
            limit=limit, format=format
        )

    # Note: SHIBOR queries are not provided by the current baostock bindings, so no corresponding tool is implemented.

```

--------------------------------------------------------------------------------
/src/tools/base.py:
--------------------------------------------------------------------------------

```python
"""
Base utilities for MCP tools.
Shared helpers for calling data sources with consistent formatting and errors.
"""
import logging
from typing import Callable, Optional
import pandas as pd

from src.formatting.markdown_formatter import format_df_to_markdown, format_table_output
from src.data_source_interface import NoDataFoundError, LoginError, DataSourceError

logger = logging.getLogger(__name__)


def call_financial_data_tool(
    tool_name: str,
    # Pass the bound method like active_data_source.get_profit_data
    data_source_method: Callable,
    data_type_name: str,
    code: str,
    year: str,
    quarter: int,
    *,
    limit: int = 250,
    format: str = "markdown",
) -> str:
    """
    Shared driver for the per-quarter financial data tools.

    Validates the year/quarter pair, invokes the data source method and
    formats the resulting frame; every failure is turned into an
    "Error: ..." string instead of propagating.

    Args:
        tool_name: Name of the tool for logging
        data_source_method: Method to call on the data source
        data_type_name: Type of financial data (for logging and meta)
        code: Stock code
        year: Year to query (4-digit string)
        quarter: Quarter to query (1-4)
        limit: Maximum number of rows in the formatted output
        format: Output format forwarded to format_table_output

    Returns:
        Formatted string with results or error message
    """
    period = f"{year}Q{quarter}"
    logger.info(f"Tool '{tool_name}' called for {code}, {period}")
    try:
        # Basic validation
        year_is_valid = year.isdigit() and len(year) == 4
        if not year_is_valid:
            logger.warning(f"Invalid year format requested: {year}")
            return f"Error: Invalid year '{year}'. Please provide a 4-digit year."
        if not (1 <= quarter <= 4):
            logger.warning(f"Invalid quarter requested: {quarter}")
            return f"Error: Invalid quarter '{quarter}'. Must be between 1 and 4."

        # Delegate to the already instantiated data source
        frame = data_source_method(code=code, year=year, quarter=quarter)
        logger.info(
            f"Successfully retrieved {data_type_name} data for {code}, {period}.")
        return format_table_output(
            frame,
            format=format,
            max_rows=limit,
            meta=dict(code=code, year=year, quarter=quarter, dataset=data_type_name),
        )

    except NoDataFoundError as err:
        logger.warning(f"NoDataFoundError for {code}, {period}: {err}")
        return f"Error: {err}"
    except LoginError as err:
        logger.error(f"LoginError for {code}: {err}")
        return f"Error: Could not connect to data source. {err}"
    except DataSourceError as err:
        logger.error(f"DataSourceError for {code}: {err}")
        return f"Error: An error occurred while fetching data. {err}"
    except ValueError as err:
        logger.warning(f"ValueError processing request for {code}: {err}")
        return f"Error: Invalid input parameter. {err}"
    except Exception as err:
        logger.exception(
            f"Unexpected Exception processing {tool_name} for {code}: {err}")
        return f"Error: An unexpected error occurred: {err}"


def call_macro_data_tool(
    tool_name: str,
    data_source_method: Callable,
    data_type_name: str,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    *,
    limit: int = 250,
    format: str = "markdown",
    **kwargs  # For extra params like year_type
) -> str:
    """
    Shared driver for the macroeconomic data tools.

    Invokes the data source method for the given date range and formats the
    resulting frame; every failure is turned into an "Error: ..." string
    instead of propagating.

    Args:
        tool_name: Name of the tool for logging
        data_source_method: Method to call on the data source
        data_type_name: Type of data (for logging and meta)
        start_date: Optional start date
        end_date: Optional end date
        limit: Maximum number of rows in the formatted output
        format: Output format forwarded to format_table_output
        **kwargs: Additional keyword arguments to pass to data_source_method

    Returns:
        Formatted string with results or error message
    """
    span = f"from {start_date or 'default'} to {end_date or 'default'}"
    extras = f", extra_args={kwargs}" if kwargs else ""
    logger.info(f"Tool '{tool_name}' called {span}{extras}")
    try:
        # Delegate to the data source, forwarding any extra keyword arguments
        frame = data_source_method(start_date=start_date, end_date=end_date, **kwargs)
        logger.info(f"Successfully retrieved {data_type_name} data.")
        meta = {"dataset": data_type_name, "start_date": start_date, "end_date": end_date}
        if kwargs:
            # Extra arguments are only reported when some were supplied
            meta["extra"] = kwargs
        return format_table_output(frame, format=format, max_rows=limit, meta=meta)
    except NoDataFoundError as err:
        logger.warning(f"NoDataFoundError: {err}")
        return f"Error: {err}"
    except LoginError as err:
        logger.error(f"LoginError: {err}")
        return f"Error: Could not connect to data source. {err}"
    except DataSourceError as err:
        logger.error(f"DataSourceError: {err}")
        return f"Error: An error occurred while fetching data. {err}"
    except ValueError as err:
        logger.warning(f"ValueError: {err}")
        return f"Error: Invalid input parameter. {err}"
    except Exception as err:
        logger.exception(f"Unexpected Exception processing {tool_name}: {err}")
        return f"Error: An unexpected error occurred: {err}"


def call_index_constituent_tool(
    tool_name: str,
    data_source_method: Callable,
    index_name: str,
    date: Optional[str] = None,
    *,
    limit: int = 250,
    format: str = "markdown",
) -> str:
    """
    Shared driver for the index constituent tools.

    Invokes the data source method for the given date and formats the
    resulting frame; every failure is turned into an "Error: ..." string
    instead of propagating.

    Args:
        tool_name: Name of the tool for logging
        data_source_method: Method to call on the data source
        index_name: Name of the index (for logging and meta)
        date: Optional date to query; reported as 'latest' when omitted
        limit: Maximum number of rows in the formatted output
        format: Output format forwarded to format_table_output

    Returns:
        Formatted string with results or error message
    """
    as_of = date or "latest"
    logger.info(f"Tool '{tool_name}' called for date={as_of}")
    try:
        # Add date validation if desired
        frame = data_source_method(date=date)
        logger.info(
            f"Successfully retrieved {index_name} constituents for {as_of}.")
        return format_table_output(
            frame,
            format=format,
            max_rows=limit,
            meta={"index": index_name, "as_of": as_of},
        )
    except NoDataFoundError as err:
        logger.warning(f"NoDataFoundError: {err}")
        return f"Error: {err}"
    except LoginError as err:
        logger.error(f"LoginError: {err}")
        return f"Error: Could not connect to data source. {err}"
    except DataSourceError as err:
        logger.error(f"DataSourceError: {err}")
        return f"Error: An error occurred while fetching data. {err}"
    except ValueError as err:
        logger.warning(f"ValueError: {err}")
        return f"Error: Invalid input parameter. {err}"
    except Exception as err:
        logger.exception(f"Unexpected Exception processing {tool_name}: {err}")
        return f"Error: An unexpected error occurred: {err}"

```

--------------------------------------------------------------------------------
/src/tools/market_overview.py:
--------------------------------------------------------------------------------

```python
"""
Market overview tools for the MCP server.
Includes trading calendar, stock list, and discovery helpers.
"""
import logging
from typing import Optional

from mcp.server.fastmcp import FastMCP
from src.data_source_interface import FinancialDataSource, NoDataFoundError, LoginError, DataSourceError
from src.formatting.markdown_formatter import format_df_to_markdown, format_table_output

logger = logging.getLogger(__name__)


def register_market_overview_tools(app: FastMCP, active_data_source: FinancialDataSource):
    """
    Register market overview tools with the MCP app.

    Registers ``get_trade_dates``, ``get_all_stock``, ``search_stocks`` and
    ``get_suspensions`` through ``app.tool()``. Every tool returns a string and
    reports failures as an ``"Error: ..."`` message instead of raising.

    Args:
        app: The FastMCP app instance
        active_data_source: The active financial data source
    """

    @app.tool()
    def get_trade_dates(start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """
        Fetch trading dates within a specified range.

        Args:
            start_date: Optional. Start date in 'YYYY-MM-DD' format. Defaults to 2015-01-01 if None.
            end_date: Optional. End date in 'YYYY-MM-DD' format. Defaults to the current date if None.
            limit: Max rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Table with 'is_trading_day' (1=trading, 0=non-trading) in the requested format,
            or an error message.
        """
        logger.info(
            f"Tool 'get_trade_dates' called for range {start_date or 'default'} to {end_date or 'default'}")
        try:
            # Add date validation if desired
            df = active_data_source.get_trade_dates(
                start_date=start_date, end_date=end_date)
            logger.info("Successfully retrieved trade dates.")
            meta = {"start_date": start_date or "default", "end_date": end_date or "default"}
            return format_table_output(df, format=format, max_rows=limit, meta=meta)

        except NoDataFoundError as e:
            logger.warning(f"NoDataFoundError: {e}")
            return f"Error: {e}"
        except LoginError as e:
            logger.error(f"LoginError: {e}")
            return f"Error: Could not connect to data source. {e}"
        except DataSourceError as e:
            logger.error(f"DataSourceError: {e}")
            return f"Error: An error occurred while fetching data. {e}"
        except ValueError as e:
            logger.warning(f"ValueError: {e}")
            return f"Error: Invalid input parameter. {e}"
        except Exception as e:
            logger.exception(
                f"Unexpected Exception processing get_trade_dates: {e}")
            return f"Error: An unexpected error occurred: {e}"

    @app.tool()
    def get_all_stock(date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """
        Fetch a list of all stocks (A-shares and indices) and their trading status for a date.

        Args:
            date: Optional. The date in 'YYYY-MM-DD' format. If None, uses the current date.
            limit: Max rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Table listing stock codes and trading status (1=trading, 0=suspended) in the
            requested format, or an error message.
        """
        logger.info(
            f"Tool 'get_all_stock' called for date={date or 'default'}")
        try:
            # Add date validation if desired
            df = active_data_source.get_all_stock(date=date)
            logger.info(
                f"Successfully retrieved stock list for {date or 'default'}.")
            meta = {"as_of": date or "default"}
            return format_table_output(df, format=format, max_rows=limit, meta=meta)

        except NoDataFoundError as e:
            logger.warning(f"NoDataFoundError: {e}")
            return f"Error: {e}"
        except LoginError as e:
            logger.error(f"LoginError: {e}")
            return f"Error: Could not connect to data source. {e}"
        except DataSourceError as e:
            logger.error(f"DataSourceError: {e}")
            return f"Error: An error occurred while fetching data. {e}"
        except ValueError as e:
            logger.warning(f"ValueError: {e}")
            return f"Error: Invalid input parameter. {e}"
        except Exception as e:
            logger.exception(
                f"Unexpected Exception processing get_all_stock: {e}")
            return f"Error: An unexpected error occurred: {e}"

    @app.tool()
    def search_stocks(keyword: str, date: Optional[str] = None, limit: int = 50, format: str = "markdown") -> str:
        """
        Search stocks by code substring on a date.

        The keyword is matched literally and case-insensitively against the
        stock code; it is not interpreted as a regular expression.

        Args:
            keyword: Substring to match in the stock code (e.g., '600', '000001').
            date: Optional 'YYYY-MM-DD'. If None, uses current date.
            limit: Max rows to return. Defaults to 50.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Matching stock codes with their trading status.
        """
        logger.info("Tool 'search_stocks' called keyword=%s, date=%s, limit=%s, format=%s", keyword, date or "default", limit, format)
        try:
            if not keyword or not keyword.strip():
                return "Error: 'keyword' is required (substring of code)."
            df = active_data_source.get_all_stock(date=date)
            if df is None or df.empty:
                return "(No data available to display)"
            kw = keyword.strip().lower()
            # baostock returns 'code' like 'sh.600000'.
            # regex=False: the keyword is a plain substring. With the pandas
            # default (regex=True) the '.' in 'sh.600' would match any character
            # and inputs such as '(' or '*' would raise a regex error.
            filtered = df[df["code"].str.lower().str.contains(kw, regex=False, na=False)]
            meta = {"keyword": keyword, "as_of": date or "current"}
            return format_table_output(filtered, format=format, max_rows=limit, meta=meta)
        except Exception as e:
            logger.exception("Exception processing search_stocks: %s", e)
            return f"Error: An unexpected error occurred: {e}"

    @app.tool()
    def get_suspensions(date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """
        List suspended stocks for a date.

        Args:
            date: Optional 'YYYY-MM-DD'. If None, uses current date.
            limit: Max rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Table of stocks where tradeStatus==0.
        """
        logger.info("Tool 'get_suspensions' called date=%s, limit=%s, format=%s", date or "current", limit, format)
        try:
            df = active_data_source.get_all_stock(date=date)
            if df is None or df.empty:
                return "(No data available to display)"
            # tradeStatus: '1' trading, '0' suspended
            if "tradeStatus" not in df.columns:
                return "Error: 'tradeStatus' column not present in data source response."
            suspended = df[df["tradeStatus"] == '0']
            # total_suspended is the full match count, before max_rows is applied.
            meta = {"as_of": date or "current", "total_suspended": int(suspended.shape[0])}
            return format_table_output(suspended, format=format, max_rows=limit, meta=meta)
        except Exception as e:
            logger.exception("Exception processing get_suspensions: %s", e)
            return f"Error: An unexpected error occurred: {e}"

```

--------------------------------------------------------------------------------
/src/tools/financial_reports.py:
--------------------------------------------------------------------------------

```python
"""
Financial report tools for the MCP server.
Clear, strongly-typed parameters; consistent output options.
"""
import logging
from typing import List, Optional

from mcp.server.fastmcp import FastMCP
from src.data_source_interface import FinancialDataSource
from src.tools.base import call_financial_data_tool

logger = logging.getLogger(__name__)


def register_financial_report_tools(app: FastMCP, active_data_source: FinancialDataSource):
    """
    Register financial report related tools with the MCP app.

    The six quarterly tools delegate to ``call_financial_data_tool``. The two
    date-range report tools share the private ``_fetch_report`` helper defined
    below, so every tool returns a string and reports failures as an
    ``"Error: ..."`` message.

    Args:
        app: The FastMCP app instance
        active_data_source: The active financial data source
    """
    # Imported here because this module's top-level imports do not bring
    # these names into scope.
    from src.data_source_interface import DataSourceError, LoginError, NoDataFoundError
    from src.formatting.markdown_formatter import format_table_output

    def _fetch_report(
        tool_name: str,
        fetch,
        description: str,
        dataset: str,
        code: str,
        start_date: str,
        end_date: str,
        limit: int,
        format: str,
    ) -> str:
        """Fetch a date-range report via ``fetch`` and format it, mapping errors to messages."""
        logger.info(
            f"Tool '{tool_name}' called for {code} ({start_date} to {end_date})")
        try:
            # Add date validation if desired
            df = fetch(code=code, start_date=start_date, end_date=end_date)
            logger.info(
                f"Successfully retrieved {description} reports for {code}.")
            meta = {"code": code, "start_date": start_date, "end_date": end_date, "dataset": dataset}
            return format_table_output(df, format=format, max_rows=limit, meta=meta)

        # Same error mapping as the other tools. "No data" is an expected
        # outcome for these reports, so it is a warning, not a traceback.
        except NoDataFoundError as e:
            logger.warning(f"NoDataFoundError: {e}")
            return f"Error: {e}"
        except LoginError as e:
            logger.error(f"LoginError: {e}")
            return f"Error: Could not connect to data source. {e}"
        except DataSourceError as e:
            logger.error(f"DataSourceError: {e}")
            return f"Error: An error occurred while fetching data. {e}"
        except ValueError as e:
            logger.warning(f"ValueError: {e}")
            return f"Error: Invalid input parameter. {e}"
        except Exception as e:
            logger.exception(
                f"Exception processing {tool_name} for {code}: {e}")
            return f"Error: An unexpected error occurred: {e}"

    @app.tool()
    def get_profit_data(code: str, year: str, quarter: int, limit: int = 250, format: str = "markdown") -> str:
        """
        Get quarterly profitability data (e.g., ROE, net profit margin) for a stock.

        Args:
            code: The stock code (e.g., 'sh.600000').
            year: The 4-digit year (e.g., '2023').
            quarter: The quarter (1, 2, 3, or 4).
            limit: Max rows to return. Defaults to 250.
            format: Output format. Defaults to 'markdown'.

        Returns:
            Profitability metrics table.
        """
        return call_financial_data_tool(
            "get_profit_data",
            active_data_source.get_profit_data,
            "Profitability",
            code, year, quarter,
            limit=limit, format=format
        )

    @app.tool()
    def get_operation_data(code: str, year: str, quarter: int, limit: int = 250, format: str = "markdown") -> str:
        """
        Get quarterly operation capability data (e.g., turnover ratios) for a stock.

        Args:
            code: The stock code (e.g., 'sh.600000').
            year: The 4-digit year (e.g., '2023').
            quarter: The quarter (1, 2, 3, or 4).
            limit: Max rows to return. Defaults to 250.
            format: Output format. Defaults to 'markdown'.

        Returns:
            Operation capability metrics table.
        """
        return call_financial_data_tool(
            "get_operation_data",
            active_data_source.get_operation_data,
            "Operation Capability",
            code, year, quarter,
            limit=limit, format=format
        )

    @app.tool()
    def get_growth_data(code: str, year: str, quarter: int, limit: int = 250, format: str = "markdown") -> str:
        """
        Get quarterly growth capability data (e.g., YOY growth rates) for a stock.

        Args:
            code: The stock code (e.g., 'sh.600000').
            year: The 4-digit year (e.g., '2023').
            quarter: The quarter (1, 2, 3, or 4).
            limit: Max rows to return. Defaults to 250.
            format: Output format. Defaults to 'markdown'.

        Returns:
            Growth capability metrics table.
        """
        return call_financial_data_tool(
            "get_growth_data",
            active_data_source.get_growth_data,
            "Growth Capability",
            code, year, quarter,
            limit=limit, format=format
        )

    @app.tool()
    def get_balance_data(code: str, year: str, quarter: int, limit: int = 250, format: str = "markdown") -> str:
        """
        Get quarterly balance sheet / solvency data (e.g., current ratio, debt ratio) for a stock.

        Args:
            code: The stock code (e.g., 'sh.600000').
            year: The 4-digit year (e.g., '2023').
            quarter: The quarter (1, 2, 3, or 4).
            limit: Max rows to return. Defaults to 250.
            format: Output format. Defaults to 'markdown'.

        Returns:
            Balance sheet metrics table.
        """
        return call_financial_data_tool(
            "get_balance_data",
            active_data_source.get_balance_data,
            "Balance Sheet",
            code, year, quarter,
            limit=limit, format=format
        )

    @app.tool()
    def get_cash_flow_data(code: str, year: str, quarter: int, limit: int = 250, format: str = "markdown") -> str:
        """
        Get quarterly cash flow data (e.g., CFO/Operating Revenue ratio) for a stock.

        Args:
            code: The stock code (e.g., 'sh.600000').
            year: The 4-digit year (e.g., '2023').
            quarter: The quarter (1, 2, 3, or 4).
            limit: Max rows to return. Defaults to 250.
            format: Output format. Defaults to 'markdown'.

        Returns:
            Cash flow metrics table.
        """
        return call_financial_data_tool(
            "get_cash_flow_data",
            active_data_source.get_cash_flow_data,
            "Cash Flow",
            code, year, quarter,
            limit=limit, format=format
        )

    @app.tool()
    def get_dupont_data(code: str, year: str, quarter: int, limit: int = 250, format: str = "markdown") -> str:
        """
        Get quarterly DuPont analysis data (ROE decomposition) for a stock.

        Args:
            code: The stock code (e.g., 'sh.600000').
            year: The 4-digit year (e.g., '2023').
            quarter: The quarter (1, 2, 3, or 4).
            limit: Max rows to return. Defaults to 250.
            format: Output format. Defaults to 'markdown'.

        Returns:
            DuPont analysis metrics table.
        """
        return call_financial_data_tool(
            "get_dupont_data",
            active_data_source.get_dupont_data,
            "DuPont Analysis",
            code, year, quarter,
            limit=limit, format=format
        )

    @app.tool()
    def get_performance_express_report(code: str, start_date: str, end_date: str, limit: int = 250, format: str = "markdown") -> str:
        """
        Fetches performance express reports (业绩快报) for a stock within a date range.
        Note: Companies are not required to publish these except in specific cases.

        Args:
            code: The stock code (e.g., 'sh.600000').
            start_date: Start date (for report publication/update) in 'YYYY-MM-DD' format.
            end_date: End date (for report publication/update) in 'YYYY-MM-DD' format.
            limit: Max rows to return. Defaults to 250.
            format: Output format. Defaults to 'markdown'.

        Returns:
            Table with performance express report data or an error message.
        """
        return _fetch_report(
            "get_performance_express_report",
            active_data_source.get_performance_express_report,
            "performance express",
            "performance_express",
            code, start_date, end_date, limit, format,
        )

    @app.tool()
    def get_forecast_report(code: str, start_date: str, end_date: str, limit: int = 250, format: str = "markdown") -> str:
        """
        Fetches performance forecast reports (业绩预告) for a stock within a date range.
        Note: Companies are not required to publish these except in specific cases.

        Args:
            code: The stock code (e.g., 'sh.600000').
            start_date: Start date (for report publication/update) in 'YYYY-MM-DD' format.
            end_date: End date (for report publication/update) in 'YYYY-MM-DD' format.
            limit: Max rows to return. Defaults to 250.
            format: Output format. Defaults to 'markdown'.

        Returns:
            Table with performance forecast report data or an error message.
        """
        return _fetch_report(
            "get_forecast_report",
            active_data_source.get_forecast_report,
            "performance forecast",
            "forecast",
            code, start_date, end_date, limit, format,
        )

```

--------------------------------------------------------------------------------
/src/tools/indices.py:
--------------------------------------------------------------------------------

```python
"""
Index-related tools for the MCP server.
Includes index constituents and industry utilities with clear, discoverable parameters.
"""
import logging
from typing import Optional, List

from mcp.server.fastmcp import FastMCP
from src.data_source_interface import FinancialDataSource
from src.tools.base import call_index_constituent_tool
from src.formatting.markdown_formatter import format_table_output

logger = logging.getLogger(__name__)


def register_index_tools(app: FastMCP, active_data_source: FinancialDataSource):
    """
    Register index related tools with the MCP app.

    Registers industry lookups (``get_stock_industry``, ``list_industries``,
    ``get_industry_members``) and index constituent tools. The constituent
    tools all go through ``call_index_constituent_tool`` so they share one
    formatting and error-handling path.

    Args:
        app: The FastMCP app instance
        active_data_source: The active financial data source
    """

    @app.tool()
    def get_stock_industry(code: Optional[str] = None, date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """
        Get industry classification for a specific stock or all stocks on a date.

        Args:
            code: Optional stock code in Baostock format (e.g., 'sh.600000'). If None, returns all.
            date: Optional 'YYYY-MM-DD'. If None, uses the latest available date.
            limit: Max rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Table with industry data in the requested format, or an error message.
        """
        log_msg = f"Tool 'get_stock_industry' called for code={code or 'all'}, date={date or 'latest'}"
        logger.info(log_msg)
        try:
            # Add date validation if desired
            df = active_data_source.get_stock_industry(code=code, date=date)
            logger.info(
                f"Successfully retrieved industry data for {code or 'all'}, {date or 'latest'}.")
            meta = {"code": code or "all", "as_of": date or "latest"}
            return format_table_output(df, format=format, max_rows=limit, meta=meta)

        except Exception as e:
            logger.exception(
                f"Exception processing get_stock_industry: {e}")
            return f"Error: An unexpected error occurred: {e}"

    @app.tool()
    def get_sz50_stocks(date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """
        Fetches the constituent stocks of the SSE 50 Index for a given date.

        Args:
            date: Optional. The date in 'YYYY-MM-DD' format. If None, uses the latest available date.
            limit: Max rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Table with SSE 50 constituent stocks or an error message.
        """
        # 'sz50' is the SSE 50 index (see get_index_constituents below); the
        # label is also emitted as meta["index"], so it must name the right index.
        return call_index_constituent_tool(
            "get_sz50_stocks",
            active_data_source.get_sz50_stocks,
            "SSE 50",
            date,
            limit=limit, format=format
        )

    @app.tool()
    def get_hs300_stocks(date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """
        Fetch the constituent stocks of the CSI 300 Index for a given date.

        Args:
            date: Optional 'YYYY-MM-DD'. If None, uses the latest available date.
            limit: Max rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Table with CSI 300 constituent stocks or an error message.
        """
        return call_index_constituent_tool(
            "get_hs300_stocks",
            active_data_source.get_hs300_stocks,
            "CSI 300",
            date,
            limit=limit, format=format
        )

    @app.tool()
    def get_zz500_stocks(date: Optional[str] = None, limit: int = 250, format: str = "markdown") -> str:
        """
        Fetch the constituent stocks of the CSI 500 Index for a given date.

        Args:
            date: Optional 'YYYY-MM-DD'. If None, uses the latest available date.
            limit: Max rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Table with CSI 500 constituent stocks or an error message.
        """
        return call_index_constituent_tool(
            "get_zz500_stocks",
            active_data_source.get_zz500_stocks,
            "CSI 500",
            date,
            limit=limit, format=format
        )

    @app.tool()
    def get_index_constituents(
        index: str,
        date: Optional[str] = None,
        limit: int = 250,
        format: str = "markdown",
    ) -> str:
        """
        Get constituents for a major index.

        Args:
            index: One of 'hs300' (CSI 300), 'sz50' (SSE 50), 'zz500' (CSI 500).
            date: Optional 'YYYY-MM-DD'. If None, uses the latest available date.
            limit: Max rows to return (pagination helper). Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Table of index constituents in the requested format. Defaults to Markdown.

        Examples:
            - get_index_constituents(index='hs300')
            - get_index_constituents(index='sz50', date='2024-12-31', format='json', limit=100)
        """
        logger.info(
            f"Tool 'get_index_constituents' called index={index}, date={date or 'latest'}, limit={limit}, format={format}")
        try:
            key = (index or "").strip().lower()
            fetchers = {
                "hs300": active_data_source.get_hs300_stocks,
                "sz50": active_data_source.get_sz50_stocks,
                "zz500": active_data_source.get_zz500_stocks,
            }
            fetch = fetchers.get(key)
            if fetch is None:
                return "Error: Invalid index. Valid options are 'hs300', 'sz50', 'zz500'."

            # Use the shared helper so this tool handles errors like the
            # per-index tools. Passing `key` as the index name keeps
            # meta["index"] equal to the normalized key.
            return call_index_constituent_tool(
                "get_index_constituents",
                fetch,
                key,
                date,
                limit=limit, format=format
            )
        except Exception as e:
            logger.exception("Exception processing get_index_constituents: %s", e)
            return f"Error: An unexpected error occurred: {e}"

    @app.tool()
    def list_industries(date: Optional[str] = None, format: str = "markdown") -> str:
        """
        List distinct industries for a given date.

        Args:
            date: Optional 'YYYY-MM-DD'. If None, uses the latest available date.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            One-column table of industries.
        """
        logger.info("Tool 'list_industries' called date=%s", date or "latest")
        try:
            df = active_data_source.get_stock_industry(code=None, date=date)
            if df is None or df.empty:
                return "(No data available to display)"
            # Fall back to the last column when no 'industry' column exists.
            col = "industry" if "industry" in df.columns else df.columns[-1]
            out = df[[col]].drop_duplicates().sort_values(by=col)
            out = out.rename(columns={col: "industry"})
            meta = {"as_of": date or "latest", "count": int(out.shape[0])}
            # max_rows is the full row count so the list is never truncated.
            return format_table_output(out, format=format, max_rows=out.shape[0], meta=meta)
        except Exception as e:
            logger.exception("Exception processing list_industries: %s", e)
            return f"Error: An unexpected error occurred: {e}"

    @app.tool()
    def get_industry_members(
        industry: str,
        date: Optional[str] = None,
        limit: int = 250,
        format: str = "markdown",
    ) -> str:
        """
        Get all stocks that belong to a given industry on a date.

        Args:
            industry: Exact industry name to filter by (see list_industries).
                Leading/trailing whitespace is ignored.
            date: Optional 'YYYY-MM-DD'. If None, uses the latest available date.
            limit: Max rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            Table of stocks in the given industry.
        """
        logger.info(
            "Tool 'get_industry_members' called industry=%s, date=%s, limit=%s, format=%s",
            industry, date or "latest", limit, format,
        )
        try:
            if not industry or not industry.strip():
                return "Error: 'industry' is required. Call list_industries() to discover available values."
            # Match on the stripped name: the check above is also done on the
            # stripped value, so stray whitespace must not cause an empty result.
            wanted = industry.strip()
            df = active_data_source.get_stock_industry(code=None, date=date)
            if df is None or df.empty:
                return "(No data available to display)"
            # Fall back to the last column when no 'industry' column exists.
            col = "industry" if "industry" in df.columns else df.columns[-1]
            filtered = df[df[col] == wanted].copy()
            meta = {"industry": wanted, "as_of": date or "latest"}
            return format_table_output(filtered, format=format, max_rows=limit, meta=meta)
        except Exception as e:
            logger.exception("Exception processing get_industry_members: %s", e)
            return f"Error: An unexpected error occurred: {e}"

```

--------------------------------------------------------------------------------
/src/tools/date_utils.py:
--------------------------------------------------------------------------------

```python
"""
Date utility tools for the MCP server.
Convenience helpers around trading days and analysis timeframes.
"""
import logging
from datetime import datetime, timedelta
import calendar

from mcp.server.fastmcp import FastMCP
from src.data_source_interface import FinancialDataSource

logger = logging.getLogger(__name__)


def register_date_utils_tools(app: FastMCP, active_data_source: FinancialDataSource):
    """
    Register date utility tools with the MCP app.

    Registers ``get_latest_trading_date``, ``get_market_analysis_timeframe``,
    ``is_trading_day``, ``previous_trading_day`` and ``next_trading_day``
    through ``app.tool()``. All tools return plain strings.

    Args:
        app: The FastMCP app instance
        active_data_source: The active financial data source
    """

    @app.tool()
    def get_latest_trading_date() -> str:
        """
        Get the latest trading date up to today.

        Looks back 30 calendar days from today. If no trading day is found in
        that window, or the lookup fails, today's date is returned instead.

        Returns:
            The latest trading date in 'YYYY-MM-DD' format.
        """
        logger.info("Tool 'get_latest_trading_date' called")
        now = datetime.now()
        today = now.strftime("%Y-%m-%d")
        try:
            # Use a trailing window, not the current calendar month. A window
            # of day 1..28 misses sessions on the 29th-31st and has no answer
            # before the month's first session.
            start_date = (now - timedelta(days=30)).strftime("%Y-%m-%d")

            df = active_data_source.get_trade_dates(
                start_date=start_date, end_date=today)
            if df is None or df.empty:
                logger.warning("No trading dates found before today, returning today's date")
                return today

            flag_col = 'is_trading_day' if 'is_trading_day' in df.columns else df.columns[-1]
            day_col = 'calendar_date' if 'calendar_date' in df.columns else df.columns[0]
            trading_days = df.loc[df[flag_col] == '1', day_col]

            # Dates are compared as strings; this assumes 'YYYY-MM-DD' values,
            # which sort chronologically.
            latest_trading_date = max(
                (str(day) for day in trading_days if str(day) <= today),
                default=None,
            )

            if latest_trading_date:
                logger.info("Latest trading date found: %s", latest_trading_date)
                return latest_trading_date

            logger.warning("No trading dates found before today, returning today's date")
            return today

        except Exception as e:
            # Best effort: fall back to today rather than failing the tool.
            logger.exception("Error determining latest trading date: %s", e)
            return today

    @app.tool()
    def get_market_analysis_timeframe(period: str = "recent") -> str:
        """
        Get a market analysis timeframe label tuned for current calendar context.

        Args:
            period: One of 'recent' (default), 'quarter', 'half_year', 'year'.
                Any other value falls back to "from the first day of the
                previous month until today".

        Returns:
            A human-friendly label plus ISO range, like "2025年1月-3月 (ISO: 2025-01-01 to 2025-03-31)".
        """
        logger.info(
            f"Tool 'get_market_analysis_timeframe' called with period={period}")

        now = datetime.now()
        end_date = now

        # start_date is always the first day of a month. middle_date only
        # affects the label, via the three-part form built below.
        if period == "recent":
            if now.day < 15:
                # Early in the month: reach back two months.
                if now.month == 1:
                    start_date = datetime(now.year - 1, 11, 1)
                    middle_date = datetime(now.year - 1, 12, 1)
                elif now.month == 2:
                    start_date = datetime(now.year, 1, 1)
                    middle_date = start_date
                else:
                    start_date = datetime(now.year, now.month - 2, 1)
                    middle_date = datetime(now.year, now.month - 1, 1)
            else:
                # Later in the month: previous month is enough.
                if now.month == 1:
                    start_date = datetime(now.year - 1, 12, 1)
                    middle_date = start_date
                else:
                    start_date = datetime(now.year, now.month - 1, 1)
                    middle_date = start_date

        elif period == "quarter":
            if now.month <= 3:
                start_date = datetime(now.year - 1, now.month + 9, 1)
            else:
                start_date = datetime(now.year, now.month - 3, 1)
            middle_date = start_date

        elif period == "half_year":
            if now.month <= 6:
                start_date = datetime(now.year - 1, now.month + 6, 1)
            else:
                start_date = datetime(now.year, now.month - 6, 1)
            # Midpoint: three months after the start, wrapping into the next year.
            middle_date = datetime(start_date.year, start_date.month + 3, 1) if start_date.month <= 9 else \
                datetime(start_date.year + 1, start_date.month - 9, 1)

        elif period == "year":
            start_date = datetime(now.year - 1, now.month, 1)
            # Midpoint: six months after the start, wrapping into the next year.
            middle_date = datetime(start_date.year, start_date.month + 6, 1) if start_date.month <= 6 else \
                datetime(start_date.year + 1, start_date.month - 6, 1)
        else:
            # Unknown period: previous month through today.
            if now.month == 1:
                start_date = datetime(now.year - 1, 12, 1)
            else:
                start_date = datetime(now.year, now.month - 1, 1)
            middle_date = start_date

        # The range ends today; start_date is already a first-of-month date.
        end_iso_date = f"{end_date.year}-{end_date.month:02d}-{end_date.day:02d}"
        start_iso_date = f"{start_date.year}-{start_date.month:02d}-01"

        if start_date.year != end_date.year:
            date_range = f"{start_date.year}年{start_date.month}月-{end_date.year}年{end_date.month}月"
        elif middle_date.month != start_date.month and middle_date.month != end_date.month:
            date_range = f"{start_date.year}年{start_date.month}月-{middle_date.month}月-{end_date.month}月"
        elif start_date.month != end_date.month:
            date_range = f"{start_date.year}年{start_date.month}月-{end_date.month}月"
        else:
            date_range = f"{start_date.year}年{start_date.month}月"

        result = f"{date_range} (ISO: {start_iso_date} to {end_iso_date})"
        logger.info(f"Generated market analysis timeframe: {result}")
        return result

    @app.tool()
    def is_trading_day(date: str) -> str:
        """
        Check whether a given date is a trading day.

        Args:
            date: 'YYYY-MM-DD'.

        Returns:
            'Yes' or 'No', or an 'Error: ...' message if the lookup fails.

        Examples:
            - is_trading_day('2025-01-03')
        """
        logger.info("Tool 'is_trading_day' called date=%s", date)
        try:
            df = active_data_source.get_trade_dates(start_date=date, end_date=date)
            if df is None or df.empty:
                return "No"
            # Fall back to the last column when 'is_trading_day' is absent.
            flag_col = 'is_trading_day' if 'is_trading_day' in df.columns else df.columns[-1]
            val = str(df.iloc[0][flag_col])
            return "Yes" if val == '1' else "No"
        except Exception as e:
            logger.exception("Exception processing is_trading_day: %s", e)
            return f"Error: {e}"

    @app.tool()
    def previous_trading_day(date: str) -> str:
        """
        Get the previous trading day before a given date.

        Searches the 30 calendar days before ``date``.

        Args:
            date: 'YYYY-MM-DD'.

        Returns:
            The previous trading day in 'YYYY-MM-DD'. If none found nearby, returns input date.
            Returns an 'Error: ...' message if the input cannot be parsed or the lookup fails.
        """
        logger.info("Tool 'previous_trading_day' called date=%s", date)
        try:
            d = datetime.strptime(date, "%Y-%m-%d")
            start = (d - timedelta(days=30)).strftime("%Y-%m-%d")
            end = date
            df = active_data_source.get_trade_dates(start_date=start, end_date=end)
            if df is None or df.empty:
                return date
            flag_col = 'is_trading_day' if 'is_trading_day' in df.columns else df.columns[-1]
            day_col = 'calendar_date' if 'calendar_date' in df.columns else df.columns[0]
            # Strictly before the input date; the last row after sorting is the closest.
            candidates = df[(df[flag_col] == '1') & (df[day_col] < date)].sort_values(by=day_col)
            if candidates.empty:
                return date
            return str(candidates.iloc[-1][day_col])
        except Exception as e:
            logger.exception("Exception processing previous_trading_day: %s", e)
            return f"Error: {e}"

    @app.tool()
    def next_trading_day(date: str) -> str:
        """
        Get the next trading day after a given date.

        Searches the 30 calendar days after ``date``.

        Args:
            date: 'YYYY-MM-DD'.

        Returns:
            The next trading day in 'YYYY-MM-DD'. If none found nearby, returns input date.
            Returns an 'Error: ...' message if the input cannot be parsed or the lookup fails.
        """
        logger.info("Tool 'next_trading_day' called date=%s", date)
        try:
            d = datetime.strptime(date, "%Y-%m-%d")
            start = date
            end = (d + timedelta(days=30)).strftime("%Y-%m-%d")
            df = active_data_source.get_trade_dates(start_date=start, end_date=end)
            if df is None or df.empty:
                return date
            flag_col = 'is_trading_day' if 'is_trading_day' in df.columns else df.columns[-1]
            day_col = 'calendar_date' if 'calendar_date' in df.columns else df.columns[0]
            # Strictly after the input date; the first row after sorting is the closest.
            candidates = df[(df[flag_col] == '1') & (df[day_col] > date)].sort_values(by=day_col)
            if candidates.empty:
                return date
            return str(candidates.iloc[0][day_col])
        except Exception as e:
            logger.exception("Exception processing next_trading_day: %s", e)
            return f"Error: {e}"


```

--------------------------------------------------------------------------------
/src/tools/stock_market.py:
--------------------------------------------------------------------------------

```python
"""
Stock market tools for the MCP server.
Historical prices, basic info, dividends, and adjust factors with clear options.
"""
import logging
from typing import List, Optional

from mcp.server.fastmcp import FastMCP
from src.data_source_interface import FinancialDataSource, NoDataFoundError, LoginError, DataSourceError
from src.formatting.markdown_formatter import format_df_to_markdown, format_table_output

logger = logging.getLogger(__name__)


def register_stock_market_tools(app: FastMCP, active_data_source: FinancialDataSource):
    """
    Register stock market data tools with the MCP app.

    Args:
        app: The FastMCP app instance
        active_data_source: The active financial data source
    """

    @app.tool()
    def get_historical_k_data(
        code: str,
        start_date: str,
        end_date: str,
        frequency: str = "d",
        adjust_flag: str = "3",
        fields: Optional[List[str]] = None,
        limit: int = 250,
        format: str = "markdown",
    ) -> str:
        """
        Fetches historical K-line (OHLCV) data for a Chinese A-share stock.

        Args:
            code: The stock code in Baostock format (e.g., 'sh.600000', 'sz.000001').
            start_date: Start date in 'YYYY-MM-DD' format.
            end_date: End date in 'YYYY-MM-DD' format.
            frequency: Data frequency. Valid options (from Baostock):
                         'd': daily
                         'w': weekly
                         'm': monthly
                         '5': 5 minutes
                         '15': 15 minutes
                         '30': 30 minutes
                         '60': 60 minutes
                       Defaults to 'd'.
            adjust_flag: Adjustment flag for price/volume. Valid options (from Baostock):
                           '1': Backward adjusted (后复权, hfq)
                           '2': Forward adjusted (前复权, qfq)
                           '3': Non-adjusted (不复权)
                         Defaults to '3'.
            fields: Optional list of specific data fields to retrieve (must be valid Baostock fields).
                    If None or empty, default fields will be used (e.g., date, code, open, high, low, close, volume, amount, pctChg).
            limit: Max rows to return. Defaults to 250.
            format: Output format: 'markdown' | 'json' | 'csv'. Defaults to 'markdown'.

        Returns:
            The K-line data table rendered in the requested `format`, or a string
            starting with 'Error:' when validation or the data fetch fails.
            `limit` is handed to the formatter as its row cap, so the table might
            be truncated if the result set is too large.
        """
        logger.info(
            f"Tool 'get_historical_k_data' called for {code} ({start_date}-{end_date}, freq={frequency}, adj={adjust_flag}, fields={fields})")
        try:
            # Reject unsupported options up front so the caller gets a precise
            # message instead of an opaque data-source error.
            valid_freqs = ['d', 'w', 'm', '5', '15', '30', '60']
            valid_adjusts = ['1', '2', '3']
            if frequency not in valid_freqs:
                logger.warning(f"Invalid frequency requested: {frequency}")
                return f"Error: Invalid frequency '{frequency}'. Valid options are: {valid_freqs}"
            if adjust_flag not in valid_adjusts:
                logger.warning(f"Invalid adjust_flag requested: {adjust_flag}")
                return f"Error: Invalid adjust_flag '{adjust_flag}'. Valid options are: {valid_adjusts}"

            # Call the injected data source
            df = active_data_source.get_historical_k_data(
                code=code,
                start_date=start_date,
                end_date=end_date,
                frequency=frequency,
                adjust_flag=adjust_flag,
                fields=fields,
            )
            # Format the result; the query parameters travel along as metadata.
            logger.info(
                f"Successfully retrieved K-data for {code}, formatting output.")
            meta = {"code": code, "start_date": start_date, "end_date": end_date, "frequency": frequency, "adjust_flag": adjust_flag}
            return format_table_output(df, format=format, max_rows=limit, meta=meta)

        except NoDataFoundError as e:
            logger.warning(f"NoDataFoundError for {code}: {e}")
            return f"Error: {e}"
        except LoginError as e:
            logger.error(f"LoginError for {code}: {e}")
            return f"Error: Could not connect to data source. {e}"
        except DataSourceError as e:
            logger.error(f"DataSourceError for {code}: {e}")
            return f"Error: An error occurred while fetching data. {e}"
        except ValueError as e:
            logger.warning(f"ValueError processing request for {code}: {e}")
            return f"Error: Invalid input parameter. {e}"
        except Exception as e:
            # Catch-all for unexpected errors
            logger.exception(
                f"Unexpected Exception processing get_historical_k_data for {code}: {e}")
            return f"Error: An unexpected error occurred: {e}"

    @app.tool()
    def get_stock_basic_info(code: str, fields: Optional[List[str]] = None, format: str = "markdown") -> str:
        """
        Look up the basic profile of a single Chinese A-share stock.

        Args:
            code: The stock code in Baostock format (e.g., 'sh.600000', 'sz.000001').
            fields: Optional column selection applied to the basic info
                    (e.g., ['code', 'code_name', 'industry', 'listingDate']).
                    When None or empty, every column the data source returns is kept.
            format: Output format handed to the formatter (e.g. 'markdown', 'json', 'csv').
                    Defaults to 'markdown'.

        Returns:
            Basic stock information in the requested format, or an 'Error: ...'
            message when the lookup fails.
        """
        logger.info(
            f"Tool 'get_stock_basic_info' called for {code} (fields={fields})")
        try:
            # Column selection is delegated to the data source via `fields`.
            info_df = active_data_source.get_stock_basic_info(
                code=code,
                fields=fields,
            )

            logger.info(
                f"Successfully retrieved basic info for {code}, formatting output.")
            # Basic info is small, so the row cap equals the number of rows fetched.
            row_cap = 0 if info_df is None else info_df.shape[0]
            return format_table_output(
                info_df,
                format=format,
                max_rows=row_cap,
                meta={"code": code},
            )

        except NoDataFoundError as err:
            logger.warning(f"NoDataFoundError for {code}: {err}")
            return f"Error: {err}"
        except LoginError as err:
            logger.error(f"LoginError for {code}: {err}")
            return f"Error: Could not connect to data source. {err}"
        except DataSourceError as err:
            logger.error(f"DataSourceError for {code}: {err}")
            return f"Error: An error occurred while fetching data. {err}"
        except ValueError as err:
            logger.warning(f"ValueError processing request for {code}: {err}")
            return f"Error: Invalid input parameter or requested field not available. {err}"
        except Exception as err:
            # Last-resort handler: log with traceback and report to the caller.
            logger.exception(
                f"Unexpected Exception processing get_stock_basic_info for {code}: {err}")
            return f"Error: An unexpected error occurred: {err}"

    @app.tool()
    def get_dividend_data(code: str, year: str, year_type: str = "report", limit: int = 250, format: str = "markdown") -> str:
        """
        Fetches dividend information for a given stock code and year.

        Args:
            code: The stock code in Baostock format (e.g., 'sh.600000', 'sz.000001').
            year: The year to query as four ASCII digits (e.g., '2023').
            year_type: Type of year. Valid options (from Baostock):
                         'report': Announcement year (预案公告年份)
                         'operate': Ex-dividend year (除权除息年份)
                       Defaults to 'report'.
            limit: Max rows handed to the output formatter. Defaults to 250.
            format: Output format handed to the formatter (e.g. 'markdown', 'json', 'csv').
                    Defaults to 'markdown'.

        Returns:
            Dividend records table in the requested format, or an 'Error: ...'
            message when validation or the data fetch fails.
        """
        logger.info(
            f"Tool 'get_dividend_data' called for {code}, year={year}, year_type={year_type}")
        try:
            # Basic validation
            if year_type not in ['report', 'operate']:
                logger.warning(f"Invalid year_type requested: {year_type}")
                return f"Error: Invalid year_type '{year_type}'. Valid options are: 'report', 'operate'"
            # str.isdigit() alone also accepts non-ASCII digits (full-width,
            # superscript, ...), so require ASCII explicitly.
            if not (year.isascii() and year.isdigit() and len(year) == 4):
                logger.warning(f"Invalid year format requested: {year}")
                return f"Error: Invalid year '{year}'. Please provide a 4-digit year."

            df = active_data_source.get_dividend_data(
                code=code, year=year, year_type=year_type)
            logger.info(
                f"Successfully retrieved dividend data for {code}, year {year}.")
            meta = {"code": code, "year": year, "year_type": year_type}
            return format_table_output(df, format=format, max_rows=limit, meta=meta)

        except NoDataFoundError as e:
            logger.warning(f"NoDataFoundError for {code}, year {year}: {e}")
            return f"Error: {e}"
        except LoginError as e:
            logger.error(f"LoginError for {code}: {e}")
            return f"Error: Could not connect to data source. {e}"
        except DataSourceError as e:
            logger.error(f"DataSourceError for {code}: {e}")
            return f"Error: An error occurred while fetching data. {e}"
        except ValueError as e:
            logger.warning(f"ValueError processing request for {code}: {e}")
            return f"Error: Invalid input parameter. {e}"
        except Exception as e:
            # Catch-all so the tool always answers with a string.
            logger.exception(
                f"Unexpected Exception processing get_dividend_data for {code}: {e}")
            return f"Error: An unexpected error occurred: {e}"

    @app.tool()
    def get_adjust_factor_data(code: str, start_date: str, end_date: str, limit: int = 250, format: str = "markdown") -> str:
        """
        Retrieve the price adjustment factors of one stock over a date range.
        The factors follow Baostock's "涨跌幅复权算法" and help when computing adjusted prices.

        Args:
            code: The stock code in Baostock format (e.g., 'sh.600000', 'sz.000001').
            start_date: Start date in 'YYYY-MM-DD' format.
            end_date: End date in 'YYYY-MM-DD' format.
            limit: Max rows handed to the output formatter. Defaults to 250.
            format: Output format handed to the formatter (e.g. 'markdown', 'json', 'csv').
                    Defaults to 'markdown'.

        Returns:
            Adjustment factors table, or an 'Error: ...' message on failure.
        """
        logger.info(
            f"Tool 'get_adjust_factor_data' called for {code} ({start_date} to {end_date})")
        try:
            # The dates are forwarded as-is; no validation happens at this layer.
            factors = active_data_source.get_adjust_factor_data(
                code=code,
                start_date=start_date,
                end_date=end_date,
            )
            logger.info(
                f"Successfully retrieved adjustment factor data for {code}.")
            return format_table_output(
                factors,
                format=format,
                max_rows=limit,
                meta={"code": code, "start_date": start_date, "end_date": end_date},
            )

        except NoDataFoundError as err:
            logger.warning(f"NoDataFoundError for {code}: {err}")
            return f"Error: {err}"
        except LoginError as err:
            logger.error(f"LoginError for {code}: {err}")
            return f"Error: Could not connect to data source. {err}"
        except DataSourceError as err:
            logger.error(f"DataSourceError for {code}: {err}")
            return f"Error: An error occurred while fetching data. {err}"
        except ValueError as err:
            logger.warning(f"ValueError processing request for {code}: {err}")
            return f"Error: Invalid input parameter. {err}"
        except Exception as err:
            # Last-resort handler: log with traceback and report to the caller.
            logger.exception(
                f"Unexpected Exception processing get_adjust_factor_data for {code}: {err}")
            return f"Error: An unexpected error occurred: {err}"

```

--------------------------------------------------------------------------------
/src/baostock_data_source.py:
--------------------------------------------------------------------------------

```python
# Implementation of the FinancialDataSource interface using Baostock
import baostock as bs
import pandas as pd
from typing import List, Optional
import logging
from .data_source_interface import FinancialDataSource, DataSourceError, NoDataFoundError, LoginError
from .utils import baostock_login_context

# Get a logger instance for this module
logger = logging.getLogger(__name__)

# Fields requested from Baostock for K-line queries when the caller does not
# supply an explicit field list (see BaostockDataSource.get_historical_k_data).
DEFAULT_K_FIELDS = [
    "date", "code", "open", "high", "low", "close", "preclose",
    "volume", "amount", "adjustflag", "turn", "tradestatus",
    "pctChg", "peTTM", "pbMRQ", "psTTM", "pcfNcfTTM", "isST"
]

# Default column set for basic stock info.
# NOTE(review): not referenced by the code visible in this section - confirm
# whether it is still used elsewhere.
DEFAULT_BASIC_FIELDS = [
    "code", "tradeStatus", "code_name"
    # Add more default fields as needed, e.g., "industry", "listingDate"
]

# Helper function to reduce repetition in financial data fetching


def _fetch_financial_data(
    bs_query_func,
    data_type_name: str,
    code: str,
    year: str,
    quarter: int
) -> pd.DataFrame:
    """
    Run one Baostock quarterly query and return its rows as a DataFrame.

    Args:
        bs_query_func: Callable invoked as
            ``bs_query_func(code=..., year=..., quarter=...)``. Its result must
            expose ``error_code``, ``error_msg``, ``next()``,
            ``get_row_data()`` and ``fields``.
        data_type_name: Human-readable label used in log and error messages.
        code: Stock code forwarded to the query.
        year: Year forwarded to the query.
        quarter: Quarter forwarded to the query.

    Returns:
        A DataFrame with one row per record and ``rs.fields`` as columns.

    Raises:
        NoDataFoundError: Baostock reports error code '10002' or a
            "no record found" message, or the result set is empty.
        DataSourceError: Any other Baostock error code, or an unexpected
            exception (chained as ``__cause__``).
        LoginError, ValueError: Propagated unchanged when raised inside the
            query section.
    """
    logger.info(
        f"Fetching {data_type_name} data for {code}, year={year}, quarter={quarter}")
    try:
        with baostock_login_context():
            # Assuming all these functions take code, year, quarter
            rs = bs_query_func(code=code, year=year, quarter=quarter)

            if rs.error_code != '0':
                logger.error(
                    f"Baostock API error ({data_type_name}) for {code}: {rs.error_msg} (code: {rs.error_code})")
                if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                    raise NoDataFoundError(
                        f"No {data_type_name} data found for {code}, {year}Q{quarter}. Baostock msg: {rs.error_msg}")
                else:
                    raise DataSourceError(
                        f"Baostock API error fetching {data_type_name} data: {rs.error_msg} (code: {rs.error_code})")

            # Drain the cursor-style result set row by row.
            data_list = []
            while rs.next():
                data_list.append(rs.get_row_data())

            if not data_list:
                logger.warning(
                    f"No {data_type_name} data found for {code}, {year}Q{quarter} (empty result set from Baostock).")
                raise NoDataFoundError(
                    f"No {data_type_name} data found for {code}, {year}Q{quarter} (empty result set).")

            result_df = pd.DataFrame(data_list, columns=rs.fields)
            logger.info(
                f"Retrieved {len(result_df)} {data_type_name} records for {code}, {year}Q{quarter}.")
            return result_df

    except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
        logger.warning(
            f"Caught known error fetching {data_type_name} data for {code}: {type(e).__name__}")
        # Bare raise re-raises the active exception with its traceback untouched.
        raise
    except Exception as e:
        logger.exception(
            f"Unexpected error fetching {data_type_name} data for {code}: {e}")
        # Chain explicitly so the root cause is recorded on the wrapper.
        raise DataSourceError(
            f"Unexpected error fetching {data_type_name} data for {code}: {e}") from e

# Helper function to reduce repetition for index constituent data fetching


def _fetch_index_constituent_data(
    bs_query_func,
    index_name: str,
    date: Optional[str] = None
) -> pd.DataFrame:
    """
    Run one Baostock index-constituent query and return its rows as a DataFrame.

    Args:
        bs_query_func: Callable invoked as ``bs_query_func(date=...)``. Its
            result must expose ``error_code``, ``error_msg``, ``next()``,
            ``get_row_data()`` and ``fields``.
        index_name: Human-readable index label used in log and error messages.
        date: Query date forwarded to Baostock; ``None`` is passed through
            unchanged (logged as 'latest').

    Returns:
        A DataFrame with one row per constituent and ``rs.fields`` as columns.

    Raises:
        NoDataFoundError: Baostock reports error code '10002' or a
            "no record found" message, or the result set is empty.
        DataSourceError: Any other Baostock error code, or an unexpected
            exception (chained as ``__cause__``).
        LoginError, ValueError: Propagated unchanged when raised inside the
            query section.
    """
    logger.info(
        f"Fetching {index_name} constituents for date={date or 'latest'}")
    try:
        with baostock_login_context():
            # date is optional, defaults to latest
            rs = bs_query_func(date=date)

            if rs.error_code != '0':
                logger.error(
                    f"Baostock API error ({index_name} Constituents) for date {date}: {rs.error_msg} (code: {rs.error_code})")
                if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                    raise NoDataFoundError(
                        f"No {index_name} constituent data found for date {date}. Baostock msg: {rs.error_msg}")
                else:
                    raise DataSourceError(
                        f"Baostock API error fetching {index_name} constituents: {rs.error_msg} (code: {rs.error_code})")

            # Drain the cursor-style result set row by row.
            data_list = []
            while rs.next():
                data_list.append(rs.get_row_data())

            if not data_list:
                logger.warning(
                    f"No {index_name} constituent data found for date {date} (empty result set).")
                raise NoDataFoundError(
                    f"No {index_name} constituent data found for date {date} (empty result set).")

            result_df = pd.DataFrame(data_list, columns=rs.fields)
            logger.info(
                f"Retrieved {len(result_df)} {index_name} constituents for date {date or 'latest'}.")
            return result_df

    except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
        logger.warning(
            f"Caught known error fetching {index_name} constituents for date {date}: {type(e).__name__}")
        # Bare raise re-raises the active exception with its traceback untouched.
        raise
    except Exception as e:
        logger.exception(
            f"Unexpected error fetching {index_name} constituents for date {date}: {e}")
        # Chain explicitly so the root cause is recorded on the wrapper.
        raise DataSourceError(
            f"Unexpected error fetching {index_name} constituents for date {date}: {e}") from e

# Helper function to reduce repetition for macroeconomic data fetching


def _fetch_macro_data(
    bs_query_func,
    data_type_name: str,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    **kwargs  # For extra params like yearType
) -> pd.DataFrame:
    """
    Run one Baostock macroeconomic query and return its rows as a DataFrame.

    Args:
        bs_query_func: Callable invoked as
            ``bs_query_func(start_date=..., end_date=..., **kwargs)``. Its
            result must expose ``error_code``, ``error_msg``, ``next()``,
            ``get_row_data()`` and ``fields``.
        data_type_name: Human-readable label used in log and error messages.
        start_date: Start of the range, forwarded as-is (``None`` allowed).
        end_date: End of the range, forwarded as-is (``None`` allowed).
        **kwargs: Extra keyword arguments forwarded verbatim to the query.

    Returns:
        A DataFrame with one row per record and ``rs.fields`` as columns.

    Raises:
        NoDataFoundError: Baostock reports error code '10002' or a
            "no record found" message, or the result set is empty.
        DataSourceError: Any other Baostock error code, or an unexpected
            exception (chained as ``__cause__``).
        LoginError, ValueError: Propagated unchanged when raised inside the
            query section.
    """
    date_range_log = f"from {start_date or 'default'} to {end_date or 'default'}"
    kwargs_log = f", extra_args={kwargs}" if kwargs else ""
    logger.info(f"Fetching {data_type_name} data {date_range_log}{kwargs_log}")
    try:
        with baostock_login_context():
            rs = bs_query_func(start_date=start_date,
                               end_date=end_date, **kwargs)

            if rs.error_code != '0':
                logger.error(
                    f"Baostock API error ({data_type_name}): {rs.error_msg} (code: {rs.error_code})")
                if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                    raise NoDataFoundError(
                        f"No {data_type_name} data found for the specified criteria. Baostock msg: {rs.error_msg}")
                else:
                    raise DataSourceError(
                        f"Baostock API error fetching {data_type_name} data: {rs.error_msg} (code: {rs.error_code})")

            # Drain the cursor-style result set row by row.
            data_list = []
            while rs.next():
                data_list.append(rs.get_row_data())

            if not data_list:
                logger.warning(
                    f"No {data_type_name} data found for the specified criteria (empty result set).")
                raise NoDataFoundError(
                    f"No {data_type_name} data found for the specified criteria (empty result set).")

            result_df = pd.DataFrame(data_list, columns=rs.fields)
            logger.info(
                f"Retrieved {len(result_df)} {data_type_name} records.")
            return result_df

    except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
        logger.warning(
            f"Caught known error fetching {data_type_name} data: {type(e).__name__}")
        # Bare raise re-raises the active exception with its traceback untouched.
        raise
    except Exception as e:
        logger.exception(
            f"Unexpected error fetching {data_type_name} data: {e}")
        # Chain explicitly so the root cause is recorded on the wrapper.
        raise DataSourceError(
            f"Unexpected error fetching {data_type_name} data: {e}") from e


class BaostockDataSource(FinancialDataSource):
    """
    Concrete implementation of FinancialDataSource using the Baostock library.
    """

    def _format_fields(self, fields: Optional[List[str]], default_fields: List[str]) -> str:
        """Formats the list of fields into a comma-separated string for Baostock."""
        if fields is None or not fields:
            logger.debug(
                f"No specific fields requested, using defaults: {default_fields}")
            return ",".join(default_fields)
        # Basic validation: ensure requested fields are strings
        if not all(isinstance(f, str) for f in fields):
            raise ValueError("All items in the fields list must be strings.")
        logger.debug(f"Using requested fields: {fields}")
        return ",".join(fields)

    def get_historical_k_data(
        self,
        code: str,
        start_date: str,
        end_date: str,
        frequency: str = "d",
        adjust_flag: str = "3",
        fields: Optional[List[str]] = None,
    ) -> pd.DataFrame:
        """
        Fetches historical K-line data using Baostock.

        Args:
            code: Stock code forwarded to Baostock.
            start_date: Start date forwarded to Baostock.
            end_date: End date forwarded to Baostock.
            frequency: Bar frequency forwarded to Baostock. Defaults to 'd'.
            adjust_flag: Forwarded as Baostock's ``adjustflag``. Defaults to '3'.
            fields: Explicit field names to request. When None or empty, a
                default set matching `frequency` is used, because Baostock
                accepts different indicator sets for daily, weekly/monthly and
                minute bars.

        Returns:
            A DataFrame with one row per bar and ``rs.fields`` as columns.

        Raises:
            NoDataFoundError: Baostock reports error code '10002' or a
                "no record found" message, or the result set is empty.
            DataSourceError: Any other Baostock error code, or an unexpected
                exception (chained as ``__cause__``).
            ValueError: `fields` contains a non-string item.
            LoginError: Propagated unchanged when raised inside the query section.
        """
        logger.info(
            f"Fetching K-data for {code} ({start_date} to {end_date}), freq={frequency}, adjust={adjust_flag}")
        try:
            # DEFAULT_K_FIELDS is the daily indicator set. Weekly/monthly and
            # minute bars support fewer indicators, and Baostock rejects a
            # request that names unsupported ones.
            if frequency in ("w", "m"):
                default_fields = [
                    "date", "code", "open", "high", "low", "close",
                    "volume", "amount", "adjustflag", "turn", "pctChg",
                ]
            elif frequency in ("5", "15", "30", "60"):
                default_fields = [
                    "date", "time", "code", "open", "high", "low", "close",
                    "volume", "amount", "adjustflag",
                ]
            else:
                default_fields = DEFAULT_K_FIELDS

            formatted_fields = self._format_fields(fields, default_fields)
            logger.debug(
                f"Requesting fields from Baostock: {formatted_fields}")

            with baostock_login_context():
                rs = bs.query_history_k_data_plus(
                    code,
                    formatted_fields,
                    start_date=start_date,
                    end_date=end_date,
                    frequency=frequency,
                    adjustflag=adjust_flag
                )

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (K-data) for {code}: {rs.error_msg} (code: {rs.error_code})")
                    # Check common error codes, e.g., for no data
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':  # Example error code
                        raise NoDataFoundError(
                            f"No historical data found for {code} in the specified range. Baostock msg: {rs.error_msg}")
                    else:
                        raise DataSourceError(
                            f"Baostock API error fetching K-data: {rs.error_msg} (code: {rs.error_code})")

                # Drain the cursor-style result set row by row.
                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No historical data found for {code} in range (empty result set from Baostock).")
                    raise NoDataFoundError(
                        f"No historical data found for {code} in the specified range (empty result set).")

                # Crucial: Use rs.fields for column names
                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(f"Retrieved {len(result_df)} records for {code}.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            # Re-raise known errors; bare raise keeps the traceback untouched.
            logger.warning(
                f"Caught known error fetching K-data for {code}: {type(e).__name__}")
            raise
        except Exception as e:
            # Wrap unexpected errors
            # Use logger.exception to include traceback
            logger.exception(
                f"Unexpected error fetching K-data for {code}: {e}")
            raise DataSourceError(
                f"Unexpected error fetching K-data for {code}: {e}") from e

    def get_stock_basic_info(self, code: str, fields: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Fetches basic stock information using Baostock.

        Args:
            code: Stock code forwarded to ``bs.query_stock_basic``.
            fields: Optional column names to keep from the result. The selection
                happens after the query; requested names that are not present
                in the result are dropped and reported in a warning log.

        Returns:
            A DataFrame with ``rs.fields`` as columns, reduced to the available
            requested columns when `fields` is given.

        Raises:
            NoDataFoundError: Baostock reports error code '10002' or a
                "no record found" message, or the result set is empty.
            DataSourceError: Any other Baostock error code, or an unexpected
                exception (chained as ``__cause__``).
            ValueError: `fields` was given but none of its names exist in the result.
            LoginError: Propagated unchanged when raised inside the query section.
        """
        logger.info(f"Fetching basic info for {code}")
        try:
            # Note: query_stock_basic doesn't seem to have a fields parameter in docs,
            # but we keep the signature consistent. It returns a fixed set.
            # We will use the `fields` argument post-query to select columns if needed.
            logger.debug(
                f"Requesting basic info for {code}. Optional fields requested: {fields}")

            with baostock_login_context():
                rs = bs.query_stock_basic(code=code)

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (Basic Info) for {code}: {rs.error_msg} (code: {rs.error_code})")
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                        raise NoDataFoundError(
                            f"No basic info found for {code}. Baostock msg: {rs.error_msg}")
                    else:
                        raise DataSourceError(
                            f"Baostock API error fetching basic info: {rs.error_msg} (code: {rs.error_code})")

                # Drain the cursor-style result set row by row.
                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No basic info found for {code} (empty result set from Baostock).")
                    raise NoDataFoundError(
                        f"No basic info found for {code} (empty result set).")

                # Crucial: Use rs.fields for column names
                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(
                    f"Retrieved basic info for {code}. Columns: {result_df.columns.tolist()}")

                # Optional: Select subset of columns if `fields` argument was provided
                if fields:
                    available_cols = [
                        col for col in fields if col in result_df.columns]
                    if not available_cols:
                        raise ValueError(
                            f"None of the requested fields {fields} are available in the basic info result.")
                    # Surface partially unavailable selections instead of
                    # dropping them without a trace.
                    missing_cols = [
                        col for col in fields if col not in result_df.columns]
                    if missing_cols:
                        logger.warning(
                            f"Requested basic info fields not available for {code} and ignored: {missing_cols}")
                    logger.debug(
                        f"Selecting columns: {available_cols} from basic info for {code}")
                    result_df = result_df[available_cols]

                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching basic info for {code}: {type(e).__name__}")
            # Bare raise re-raises the active exception with its traceback untouched.
            raise
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching basic info for {code}: {e}")
            # Chain explicitly so the root cause is recorded on the wrapper.
            raise DataSourceError(
                f"Unexpected error fetching basic info for {code}: {e}") from e

    def get_dividend_data(self, code: str, year: str, year_type: str = "report") -> pd.DataFrame:
        """
        Fetches dividend information using Baostock.

        Args:
            code: Stock code forwarded to ``bs.query_dividend_data``.
            year: Year forwarded to the query.
            year_type: Forwarded as Baostock's ``yearType``. Defaults to 'report'.

        Returns:
            A DataFrame with one row per dividend record and ``rs.fields`` as columns.

        Raises:
            NoDataFoundError: Baostock reports error code '10002' or a
                "no record found" message, or the result set is empty.
            DataSourceError: Any other Baostock error code, or an unexpected
                exception (chained as ``__cause__``).
            LoginError, ValueError: Propagated unchanged when raised inside the
                query section.
        """
        logger.info(
            f"Fetching dividend data for {code}, year={year}, year_type={year_type}")
        try:
            with baostock_login_context():
                rs = bs.query_dividend_data(
                    code=code, year=year, yearType=year_type)

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (Dividend) for {code}: {rs.error_msg} (code: {rs.error_code})")
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                        raise NoDataFoundError(
                            f"No dividend data found for {code} and year {year}. Baostock msg: {rs.error_msg}")
                    else:
                        raise DataSourceError(
                            f"Baostock API error fetching dividend data: {rs.error_msg} (code: {rs.error_code})")

                # Drain the cursor-style result set row by row.
                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No dividend data found for {code}, year {year} (empty result set from Baostock).")
                    raise NoDataFoundError(
                        f"No dividend data found for {code}, year {year} (empty result set).")

                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(
                    f"Retrieved {len(result_df)} dividend records for {code}, year {year}.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching dividend data for {code}: {type(e).__name__}")
            # Bare raise re-raises the active exception with its traceback untouched.
            raise
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching dividend data for {code}: {e}")
            # Chain explicitly so the root cause is recorded on the wrapper.
            raise DataSourceError(
                f"Unexpected error fetching dividend data for {code}: {e}") from e

    def get_adjust_factor_data(self, code: str, start_date: str, end_date: str) -> pd.DataFrame:
        """
        Fetches adjustment factor data using Baostock.

        Args:
            code: Stock code forwarded to ``bs.query_adjust_factor``.
            start_date: Start date forwarded to the query.
            end_date: End date forwarded to the query.

        Returns:
            A DataFrame with one row per adjustment record and ``rs.fields`` as columns.

        Raises:
            NoDataFoundError: Baostock reports error code '10002' or a
                "no record found" message, or the result set is empty.
            DataSourceError: Any other Baostock error code, or an unexpected
                exception (chained as ``__cause__``).
            LoginError, ValueError: Propagated unchanged when raised inside the
                query section.
        """
        logger.info(
            f"Fetching adjustment factor data for {code} ({start_date} to {end_date})")
        try:
            with baostock_login_context():
                rs = bs.query_adjust_factor(
                    code=code, start_date=start_date, end_date=end_date)

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (Adjust Factor) for {code}: {rs.error_msg} (code: {rs.error_code})")
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                        raise NoDataFoundError(
                            f"No adjustment factor data found for {code} in the specified range. Baostock msg: {rs.error_msg}")
                    else:
                        raise DataSourceError(
                            f"Baostock API error fetching adjust factor data: {rs.error_msg} (code: {rs.error_code})")

                # Drain the cursor-style result set row by row.
                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No adjustment factor data found for {code} in range (empty result set from Baostock).")
                    raise NoDataFoundError(
                        f"No adjustment factor data found for {code} in the specified range (empty result set).")

                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(
                    f"Retrieved {len(result_df)} adjustment factor records for {code}.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching adjust factor data for {code}: {type(e).__name__}")
            # Bare raise re-raises the active exception with its traceback untouched.
            raise
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching adjust factor data for {code}: {e}")
            # Chain explicitly so the root cause is recorded on the wrapper.
            raise DataSourceError(
                f"Unexpected error fetching adjust factor data for {code}: {e}") from e

    def get_profit_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        """Return quarterly profitability records by delegating to `_fetch_financial_data` with `bs.query_profit_data`."""
        return _fetch_financial_data(
            bs_query_func=bs.query_profit_data,
            data_type_name="Profitability",
            code=code,
            year=year,
            quarter=quarter,
        )

    def get_operation_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        """Return quarterly operation capability records by delegating to `_fetch_financial_data` with `bs.query_operation_data`."""
        return _fetch_financial_data(
            bs_query_func=bs.query_operation_data,
            data_type_name="Operation Capability",
            code=code,
            year=year,
            quarter=quarter,
        )

    def get_growth_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        """Return quarterly growth capability records by delegating to `_fetch_financial_data` with `bs.query_growth_data`."""
        return _fetch_financial_data(
            bs_query_func=bs.query_growth_data,
            data_type_name="Growth Capability",
            code=code,
            year=year,
            quarter=quarter,
        )

    def get_balance_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        """Return quarterly balance sheet (solvency) records by delegating to `_fetch_financial_data` with `bs.query_balance_data`."""
        return _fetch_financial_data(
            bs_query_func=bs.query_balance_data,
            data_type_name="Balance Sheet",
            code=code,
            year=year,
            quarter=quarter,
        )

    def get_cash_flow_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        """Return quarterly cash flow records by delegating to `_fetch_financial_data` with `bs.query_cash_flow_data`."""
        return _fetch_financial_data(
            bs_query_func=bs.query_cash_flow_data,
            data_type_name="Cash Flow",
            code=code,
            year=year,
            quarter=quarter,
        )

    def get_dupont_data(self, code: str, year: str, quarter: int) -> pd.DataFrame:
        """Return quarterly DuPont analysis data for ``code`` from Baostock.

        Thin wrapper: hands ``bs.query_dupont_data`` and the label
        "DuPont Analysis" to ``_fetch_financial_data`` together with the
        caller's ``code``, ``year`` and ``quarter``, and returns its result.
        """
        return _fetch_financial_data(
            bs.query_dupont_data,
            "DuPont Analysis",
            code,
            year,
            quarter,
        )

    def get_performance_express_report(self, code: str, start_date: str, end_date: str) -> pd.DataFrame:
        """Fetch performance express reports for one stock using Baostock.

        Args:
            code: Stock code, forwarded unchanged to
                ``bs.query_performance_express_report``.
            start_date: Start of the query range, forwarded unchanged.
            end_date: End of the query range, forwarded unchanged.

        Returns:
            A DataFrame with one row per record yielded by the Baostock
            result set; column names come from ``rs.fields``.

        Raises:
            NoDataFoundError: If Baostock reports "no record found" or error
                code ``'10002'``, or if the result set is empty.
            DataSourceError: For any other non-zero Baostock error code, or
                when an unexpected exception occurs (the original exception
                is attached as ``__cause__``).
            LoginError, ValueError: Re-raised unchanged when raised inside
                the login/query block.
        """
        logger.info(
            f"Fetching Performance Express Report for {code} ({start_date} to {end_date})")
        try:
            with baostock_login_context():
                rs = bs.query_performance_express_report(
                    code=code, start_date=start_date, end_date=end_date)

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (Perf Express) for {code}: {rs.error_msg} (code: {rs.error_code})")
                    # Map the "nothing there" responses to the dedicated
                    # exception; everything else is a generic source error.
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                        raise NoDataFoundError(
                            f"No performance express report found for {code} in range {start_date}-{end_date}. Baostock msg: {rs.error_msg}")
                    raise DataSourceError(
                        f"Baostock API error fetching performance express report: {rs.error_msg} (code: {rs.error_code})")

                # Drain the cursor-style result set row by row.
                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No performance express report found for {code} in range {start_date}-{end_date} (empty result set).")
                    raise NoDataFoundError(
                        f"No performance express report found for {code} in range {start_date}-{end_date} (empty result set).")

                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(
                    f"Retrieved {len(result_df)} performance express report records for {code}.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching performance express report for {code}: {type(e).__name__}")
            # Bare raise propagates the active exception as-is.
            raise
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching performance express report for {code}: {e}")
            # Chain explicitly so the root cause is preserved as __cause__.
            raise DataSourceError(
                f"Unexpected error fetching performance express report for {code}: {e}") from e

    def get_forecast_report(self, code: str, start_date: str, end_date: str) -> pd.DataFrame:
        """Fetch performance forecast reports for one stock using Baostock.

        Args:
            code: Stock code, forwarded unchanged to
                ``bs.query_forecast_report``.
            start_date: Start of the query range, forwarded unchanged.
            end_date: End of the query range, forwarded unchanged.

        Returns:
            A DataFrame with one row per record yielded by the Baostock
            result set; column names come from ``rs.fields``.

        Raises:
            NoDataFoundError: If Baostock reports "no record found" or error
                code ``'10002'``, or if the result set is empty.
            DataSourceError: For any other non-zero Baostock error code, or
                when an unexpected exception occurs (the original exception
                is attached as ``__cause__``).
            LoginError, ValueError: Re-raised unchanged when raised inside
                the login/query block.
        """
        logger.info(
            f"Fetching Performance Forecast Report for {code} ({start_date} to {end_date})")
        try:
            with baostock_login_context():
                rs = bs.query_forecast_report(
                    code=code, start_date=start_date, end_date=end_date)
                # NOTE(review): Baostock docs mention pagination for this query,
                # but no paging argument is passed here. The row loop below
                # presumably walks every page via rs.next() - confirm.

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (Forecast) for {code}: {rs.error_msg} (code: {rs.error_code})")
                    # Map the "nothing there" responses to the dedicated
                    # exception; everything else is a generic source error.
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                        raise NoDataFoundError(
                            f"No performance forecast report found for {code} in range {start_date}-{end_date}. Baostock msg: {rs.error_msg}")
                    raise DataSourceError(
                        f"Baostock API error fetching performance forecast report: {rs.error_msg} (code: {rs.error_code})")

                # Drain the cursor-style result set row by row.
                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No performance forecast report found for {code} in range {start_date}-{end_date} (empty result set).")
                    raise NoDataFoundError(
                        f"No performance forecast report found for {code} in range {start_date}-{end_date} (empty result set).")

                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(
                    f"Retrieved {len(result_df)} performance forecast report records for {code}.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching performance forecast report for {code}: {type(e).__name__}")
            # Bare raise propagates the active exception as-is.
            raise
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching performance forecast report for {code}: {e}")
            # Chain explicitly so the root cause is preserved as __cause__.
            raise DataSourceError(
                f"Unexpected error fetching performance forecast report for {code}: {e}") from e

    def get_stock_industry(self, code: Optional[str] = None, date: Optional[str] = None) -> pd.DataFrame:
        """Fetch industry classification data using Baostock.

        Args:
            code: Optional stock code, forwarded unchanged to
                ``bs.query_stock_industry``; logged as "all" when falsy.
            date: Optional date, forwarded unchanged; logged as "latest"
                when falsy.

        Returns:
            A DataFrame with one row per record yielded by the Baostock
            result set; column names come from ``rs.fields``.

        Raises:
            NoDataFoundError: If Baostock reports "no record found" or error
                code ``'10002'``, or if the result set is empty.
            DataSourceError: For any other non-zero Baostock error code, or
                when an unexpected exception occurs (the original exception
                is attached as ``__cause__``).
            LoginError, ValueError: Re-raised unchanged when raised inside
                the login/query block.
        """
        log_msg = f"Fetching industry data for code={code or 'all'}, date={date or 'latest'}"
        logger.info(log_msg)
        try:
            with baostock_login_context():
                rs = bs.query_stock_industry(code=code, date=date)

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (Industry) for {code}, {date}: {rs.error_msg} (code: {rs.error_code})")
                    # Map the "nothing there" responses to the dedicated
                    # exception; everything else is a generic source error.
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                        raise NoDataFoundError(
                            f"No industry data found for {code}, {date}. Baostock msg: {rs.error_msg}")
                    raise DataSourceError(
                        f"Baostock API error fetching industry data: {rs.error_msg} (code: {rs.error_code})")

                # Drain the cursor-style result set row by row.
                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No industry data found for {code}, {date} (empty result set).")
                    raise NoDataFoundError(
                        f"No industry data found for {code}, {date} (empty result set).")

                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(
                    f"Retrieved {len(result_df)} industry records for {code or 'all'}, {date or 'latest'}.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching industry data for {code}, {date}: {type(e).__name__}")
            # Bare raise propagates the active exception as-is.
            raise
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching industry data for {code}, {date}: {e}")
            # Chain explicitly so the root cause is preserved as __cause__.
            raise DataSourceError(
                f"Unexpected error fetching industry data for {code}, {date}: {e}") from e

    def get_sz50_stocks(self, date: Optional[str] = None) -> pd.DataFrame:
        """Fetch SSE 50 index constituents using Baostock.

        ``bs.query_sz50_stocks`` serves the Shanghai Stock Exchange 50 index
        (the "sz" in the name is pinyin for "Shang Zheng"), so the label
        handed to the shared helper is "SSE 50" rather than "SZSE 50", which
        would name the Shenzhen exchange.

        Args:
            date: Optional query date, forwarded unchanged to
                ``_fetch_index_constituent_data``.

        Returns:
            Whatever ``_fetch_index_constituent_data`` returns for this query
            (annotated as a DataFrame).
        """
        return _fetch_index_constituent_data(bs.query_sz50_stocks, "SSE 50", date)

    def get_hs300_stocks(self, date: Optional[str] = None) -> pd.DataFrame:
        """Return CSI 300 index constituents from Baostock.

        Thin wrapper: hands ``bs.query_hs300_stocks``, the label "CSI 300"
        and the optional ``date`` to ``_fetch_index_constituent_data`` and
        returns its result.
        """
        return _fetch_index_constituent_data(
            bs.query_hs300_stocks,
            "CSI 300",
            date,
        )

    def get_zz500_stocks(self, date: Optional[str] = None) -> pd.DataFrame:
        """Return CSI 500 index constituents from Baostock.

        Thin wrapper: hands ``bs.query_zz500_stocks``, the label "CSI 500"
        and the optional ``date`` to ``_fetch_index_constituent_data`` and
        returns its result.
        """
        return _fetch_index_constituent_data(
            bs.query_zz500_stocks,
            "CSI 500",
            date,
        )

    def get_trade_dates(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Fetch trading dates using Baostock.

        Args:
            start_date: Optional range start, forwarded unchanged to
                ``bs.query_trade_dates``; logged as "default" when falsy.
            end_date: Optional range end, forwarded unchanged; logged as
                "default" when falsy.

        Returns:
            A DataFrame with one row per record yielded by the Baostock
            result set; column names come from ``rs.fields``.

        Raises:
            DataSourceError: For any non-zero Baostock error code, or when
                an unexpected exception occurs (the original exception is
                attached as ``__cause__``).
            NoDataFoundError: If the result set is empty.
            LoginError, ValueError: Re-raised unchanged when raised inside
                the login/query block.
        """
        logger.info(
            f"Fetching trade dates from {start_date or 'default'} to {end_date or 'default'}")
        try:
            with baostock_login_context():  # Login might not be strictly needed for this, but keeping consistent
                rs = bs.query_trade_dates(
                    start_date=start_date, end_date=end_date)

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (Trade Dates): {rs.error_msg} (code: {rs.error_code})")
                    # Unlike the other queries, no "no record found" mapping is
                    # done here: every API error becomes a DataSourceError.
                    raise DataSourceError(
                        f"Baostock API error fetching trade dates: {rs.error_msg} (code: {rs.error_code})")

                # Drain the cursor-style result set row by row.
                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    # This case should ideally not happen if the API returns a valid range
                    logger.warning(
                        f"No trade dates returned for range {start_date}-{end_date} (empty result set).")
                    raise NoDataFoundError(
                        f"No trade dates found for range {start_date}-{end_date} (empty result set).")

                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(f"Retrieved {len(result_df)} trade date records.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching trade dates: {type(e).__name__}")
            # Bare raise propagates the active exception as-is.
            raise
        except Exception as e:
            logger.exception(f"Unexpected error fetching trade dates: {e}")
            # Chain explicitly so the root cause is preserved as __cause__.
            raise DataSourceError(
                f"Unexpected error fetching trade dates: {e}") from e

    def get_all_stock(self, date: Optional[str] = None) -> pd.DataFrame:
        """Fetch the list of all stocks for a given date using Baostock.

        Args:
            date: Optional date, passed to ``bs.query_all_stock`` as its
                ``day`` argument; logged as "default" when falsy.

        Returns:
            A DataFrame with one row per record yielded by the Baostock
            result set; column names come from ``rs.fields``.

        Raises:
            NoDataFoundError: If Baostock reports "no record found" or error
                code ``'10002'``, or if the result set is empty.
            DataSourceError: For any other non-zero Baostock error code, or
                when an unexpected exception occurs (the original exception
                is attached as ``__cause__``).
            LoginError, ValueError: Re-raised unchanged when raised inside
                the login/query block.
        """
        logger.info(f"Fetching all stock list for date={date or 'default'}")
        try:
            with baostock_login_context():
                rs = bs.query_all_stock(day=date)

                if rs.error_code != '0':
                    logger.error(
                        f"Baostock API error (All Stock) for date {date}: {rs.error_msg} (code: {rs.error_code})")
                    # NOTE(review): unverified whether Baostock actually emits
                    # the "no record found" / '10002' response for this query.
                    if "no record found" in rs.error_msg.lower() or rs.error_code == '10002':
                        raise NoDataFoundError(
                            f"No stock data found for date {date}. Baostock msg: {rs.error_msg}")
                    raise DataSourceError(
                        f"Baostock API error fetching all stock list: {rs.error_msg} (code: {rs.error_code})")

                # Drain the cursor-style result set row by row.
                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(
                        f"No stock list returned for date {date} (empty result set).")
                    raise NoDataFoundError(
                        f"No stock list found for date {date} (empty result set).")

                result_df = pd.DataFrame(data_list, columns=rs.fields)
                logger.info(
                    f"Retrieved {len(result_df)} stock records for date {date or 'default'}.")
                return result_df

        except (LoginError, NoDataFoundError, DataSourceError, ValueError) as e:
            logger.warning(
                f"Caught known error fetching all stock list for date {date}: {type(e).__name__}")
            # Bare raise propagates the active exception as-is.
            raise
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching all stock list for date {date}: {e}")
            # Chain explicitly so the root cause is preserved as __cause__.
            raise DataSourceError(
                f"Unexpected error fetching all stock list for date {date}: {e}") from e

    def get_deposit_rate_data(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Return benchmark deposit rates from Baostock.

        Thin wrapper: hands ``bs.query_deposit_rate_data``, the label
        "Deposit Rate" and the optional date bounds to ``_fetch_macro_data``
        and returns its result.
        """
        return _fetch_macro_data(
            bs.query_deposit_rate_data,
            "Deposit Rate",
            start_date,
            end_date,
        )

    def get_loan_rate_data(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Return benchmark loan rates from Baostock.

        Thin wrapper: hands ``bs.query_loan_rate_data``, the label
        "Loan Rate" and the optional date bounds to ``_fetch_macro_data``
        and returns its result.
        """
        return _fetch_macro_data(
            bs.query_loan_rate_data,
            "Loan Rate",
            start_date,
            end_date,
        )

    def get_required_reserve_ratio_data(self, start_date: Optional[str] = None, end_date: Optional[str] = None, year_type: str = '0') -> pd.DataFrame:
        """Return required reserve ratio data from Baostock.

        Thin wrapper around ``_fetch_macro_data`` using
        ``bs.query_required_reserve_ratio_data`` and the label
        "Required Reserve Ratio". In addition to the optional date bounds,
        ``year_type`` is forwarded as the extra keyword argument ``yearType``.
        """
        return _fetch_macro_data(
            bs.query_required_reserve_ratio_data,
            "Required Reserve Ratio",
            start_date,
            end_date,
            yearType=year_type,
        )

    def get_money_supply_data_month(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Return monthly money supply data (M0, M1, M2) from Baostock.

        Thin wrapper: hands ``bs.query_money_supply_data_month``, the label
        "Monthly Money Supply" and the optional date bounds to
        ``_fetch_macro_data`` and returns its result.
        """
        # Dates for this query are expected by Baostock in YYYY-MM form.
        return _fetch_macro_data(
            bs.query_money_supply_data_month,
            "Monthly Money Supply",
            start_date,
            end_date,
        )

    def get_money_supply_data_year(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> pd.DataFrame:
        """Return yearly money supply data (M0, M1, M2 year-end balance) from Baostock.

        Thin wrapper: hands ``bs.query_money_supply_data_year``, the label
        "Yearly Money Supply" and the optional date bounds to
        ``_fetch_macro_data`` and returns its result.
        """
        # Dates for this query are expected by Baostock in YYYY form.
        return _fetch_macro_data(
            bs.query_money_supply_data_year,
            "Yearly Money Supply",
            start_date,
            end_date,
        )

    # Note: SHIBOR data is not available through the Baostock API bindings currently in use, so it is not implemented.

```