#
tokens: 10644/50000 17/17 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── docker-compose.yml
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README-zh.md
├── README.md
├── requirements.txt
└── src
    └── mysql_mcp_server_pro
        ├── __init__.py
        ├── cli.py
        ├── config
        │   ├── __init__.py
        │   ├── .env
        │   ├── dbconfig.py
        │   └── event_store.py
        ├── exception
        │   ├── __init__.py
        │   └── exceptions.py
        ├── handles
        │   ├── __init__.py
        │   ├── base.py
        │   ├── execute_sql.py
        │   ├── get_chinese_initials.py
        │   ├── get_db_health_index_usage.py
        │   ├── get_db_health_running.py
        │   ├── get_table_desc.py
        │   ├── get_table_index.py
        │   ├── get_table_lock.py
        │   ├── get_table_name.py
        │   ├── optimize_sql.py
        │   └── use_prompt_queryTableData.py
        ├── oauth
        │   ├── __init__.py
        │   ├── config.py
        │   ├── middleware.py
        │   ├── routes.py
        │   └── token_handler.py
        ├── prompts
        │   ├── __init__.py
        │   ├── AnalysisMySqlIssues.py
        │   ├── BasePrompt.py
        │   └── QueryTableData.py
        ├── server.py
        ├── templates
        │   └── login.html
        └── utils
            ├── __init__.py
            ├── database_pool.py
            └── execute_sql_util.py
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
.env

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
[![简体中文](https://img.shields.io/badge/简体中文-点击查看-orange)](README-zh.md)
[![English](https://img.shields.io/badge/English-Click-yellow)](README.md)
[![MseeP.ai Security Assessment Badge](https://mseep.net/mseep-audited.png)](https://mseep.ai/app/wenb1n-dev-mysql-mcp-server-pro)
[![MCPHub](https://img.shields.io/badge/mcphub-audited-blue)](https://mcphub.com/mcp-servers/wenb1n-dev/mysql_mcp_server_pro)


# mcp_mysql_server_pro

## Introduction
mcp_mysql_server_pro is not just about MySQL CRUD operations, but also includes database anomaly analysis capabilities and makes it easy for developers to extend with custom tools.

- Supports all Model Context Protocol (MCP) transfer modes (STDIO, SSE, Streamable Http)
- Supports OAuth2.0
- Supports multiple SQL execution, separated by ";"
- Supports querying database table names and fields based on table comments
- Supports SQL execution plan analysis
- Supports Chinese field to pinyin conversion
- Supports table lock analysis
- Supports database health status analysis
- Supports permission control with three roles: readonly, writer, and admin
    ```
    "readonly": ["SELECT", "SHOW", "DESCRIBE", "EXPLAIN"],  # Read-only permissions
    "writer": ["SELECT", "SHOW", "DESCRIBE", "EXPLAIN", "INSERT", "UPDATE", "DELETE"],  # Read-write permissions
    "admin": ["SELECT", "SHOW", "DESCRIBE", "EXPLAIN", "INSERT", "UPDATE", "DELETE", 
             "CREATE", "ALTER", "DROP", "TRUNCATE"]  # Administrator permissions
    ```
- Supports prompt template invocation


## Tool List
| Tool Name                  | Description                                                                                                                                                                                                              |
|----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 
| execute_sql                | SQL execution tool that can execute ["SELECT", "SHOW", "DESCRIBE", "EXPLAIN", "INSERT", "UPDATE", "DELETE", "CREATE", "ALTER", "DROP", "TRUNCATE"] commands based on permission configuration                            |
| get_chinese_initials       | Convert Chinese field names to pinyin initials                                                                                                                                                                           |
| get_db_health_running      | Analyze MySQL health status (connection status, transaction status, running status, lock status detection)                                                                                                               |
| get_table_desc             | Search for table structures in the database based on table names, supporting multi-table queries                                                                                                                         |
| get_table_index            | Search for table indexes in the database based on table names, supporting multi-table queries                                                                                                                            |
| get_table_lock             | Check if there are row-level locks or table-level locks in the current MySQL server                                                                                                                                      |
| get_table_name             | Search for table names in the database based on table comments and descriptions                                                                                                                                          |
| get_db_health_index_usage  | Get the index usage of the currently connected mysql database, including redundant index situations, poorly performing index situations, and the top 5 unused index situations with query times greater than 30 seconds  | 
| optimize_sql               | Professional SQL performance optimization tool, providing expert optimization suggestions based on MySQL execution plans, table structure information, table data volume, and table indexes.                            |
| use_prompt_queryTableData | Use built-in prompts to let the model construct a chain call of tools in mcp (not a commonly used fixed tool, you need to modify the code to enable it, see this class for details) |

## Prompt List
| Prompt Name                | Description                                                                                                                           |
|---------------------------|---------------------------------------------------------------------------------------------------------------------------------------| 
| analyzing-mysql-prompt    | This is a prompt for analyzing MySQL-related issues                                                                                    |
| query-table-data-prompt   | This is a prompt for querying table data using tools. If description is empty, it will be initialized as a MySQL database query assistant |

## Usage Instructions

### Installation and Configuration
1. Install Package
```bash
pip install mysql_mcp_server_pro
```

2. Configure Environment Variables
Create a `.env` file with the following content:
```bash
# MySQL Database Configuration
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=your_username
MYSQL_PASSWORD=your_password
MYSQL_DATABASE=your_database
# Optional, default is 'readonly'. Available values: readonly, writer, admin
MYSQL_ROLE=readonly
```

3. Run Service
```bash
# SSE mode
mysql_mcp_server_pro --mode sse --envfile /path/to/.env

## Streamable Http mode (default)
mysql_mcp_server_pro --envfile /path/to/.env

# Streamable Http  oauth Authentication
mysql_mcp_server_pro --oauth true

```

4. mcp client

go to see see "Use uv to start the service"
^_^


Note:
- The `.env` file should be placed in the directory where you run the command or use --envfile parameter to specify the path
- You can also set these variables directly in your environment
- Make sure the database configuration is correct and can connect

### Run with uvx, Client Configuration
- This method can be used directly in MCP-supported clients, no need to download the source code. For example, Tongyi Qianwen plugin, trae editor, etc.
```json
{
	"mcpServers": {
		"mysql": {
			"command": "uvx",
			"args": [
				"--from",
				"mysql_mcp_server_pro",
				"mysql_mcp_server_pro",
				"--mode",
				"stdio"
			],
			"env": {
				"MYSQL_HOST": "192.168.x.xxx",
				"MYSQL_PORT": "3306",
				"MYSQL_USER": "root",
				"MYSQL_PASSWORD": "root",
				"MYSQL_DATABASE": "a_llm",
				"MYSQL_ROLE": "admin"
			}
		}
	}
}
```

### Local Development with Streamable Http mode

- Use uv to start the service

Add the following content to your mcp client tools, such as cursor, cline, etc.

mcp json as follows:
```
{
  "mcpServers": {
    "mysql_mcp_server_pro": {
      "name": "mysql_mcp_server_pro",
      "type": "streamableHttp",
      "description": "",
      "isActive": true,
      "url": "http://localhost:3000/mcp/"
    }
  }
}
```

Modify the .env file content to update the database connection information with your database details:
```
# MySQL Database Configuration
MYSQL_HOST=192.168.xxx.xxx
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=root
MYSQL_DATABASE=a_llm
MYSQL_ROLE=admin
```

Start commands:
```
# Download dependencies
uv sync

# Start
uv run -m mysql_mcp_server_pro.server

# Custom env file location
uv run -m mysql_mcp_server_pro.server --envfile /path/to/.env

# oauth Authentication
uv run -m mysql_mcp_server_pro.server --oauth true
```

### Local Development with SSE Mode

- Use uv to start the service

Add the following content to your mcp client tools, such as cursor, cline, etc.

mcp json as follows:
```
{
  "mcpServers": {
    "mysql_mcp_server_pro": {
      "name": "mysql_mcp_server_pro",
      "description": "",
      "isActive": true,
      "url": "http://localhost:9000/sse"
    }
  }
}
```

Modify the .env file content to update the database connection information with your database details:
```
# MySQL Database Configuration
MYSQL_HOST=192.168.xxx.xxx
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=root
MYSQL_DATABASE=a_llm
MYSQL_ROLE=admin
```

Start commands:
```
# Download dependencies
uv sync

# Start
uv run -m mysql_mcp_server_pro.server --mode sse

# Custom env file location
uv run -m mysql_mcp_server_pro.server --mode sse --envfile /path/to/.env
```

### Local Development with STDIO Mode

Add the following content to your mcp client tools, such as cursor, cline, etc.

mcp json as follows:
```
{
  "mcpServers": {
      "operateMysql": {
        "isActive": true,
        "name": "operateMysql",
        "command": "uv",
        "args": [
          "--directory",
          "/Volumes/mysql_mcp_server_pro/src/mysql_mcp_server_pro",    # Replace this with your project path
          "run",
          "-m",
          "mysql_mcp_server_pro.server",
          "--mode",
          "stdio"
        ],
        "env": {
          "MYSQL_HOST": "localhost",
          "MYSQL_PORT": "3306",
          "MYSQL_USER": "root", 
          "MYSQL_PASSWORD": "123456",
          "MYSQL_DATABASE": "a_llm",
          "MYSQL_ROLE": "admin"
       }
    }
  }
} 
```

## Custom Tool Extensions
1. Add a new tool class in the handles package, inherit from BaseHandler, and implement get_tool_description and run_tool methods

2. Import the new tool in __init__.py to make it available in the server

## OAuth2.0 Authentication
1. Start the authentication service. By default, it uses the built-in OAuth 2.0 password mode authentication. You can modify your own authentication service address in the env file.
```aiignore
uv run -m mysql_mcp_server_pro.server --oauth true
```

2. Visit the authentication service at http://localhost:3000/login. Default username and password are configured in the env file.
   ![image](https://github.com/user-attachments/assets/ec8a629e-62f9-4b93-b3cc-442b3d2dc46f)


3. Copy the token and add it to the request headers, for example:
   ![image](https://github.com/user-attachments/assets/a5451e35-bddd-4e49-8aa9-a4178d30ec88)

```json
{
  "mcpServers": {
    "mysql_mcp_server_pro": {
      "name": "mysql_mcp_server_pro",
      "type": "streamableHttp",
      "description": "",
      "isActive": true,
      "url": "http://localhost:3000/mcp/",
      "headers": {
        "authorization": "bearer TOKEN_VALUE"
      }
    }
  }
}
```

## Examples
1. Create a new table and insert data, prompt format as follows:
```
# Task
   Create an organizational structure table with the following structure: department name, department number, parent department, is valid.
# Requirements
 - Table name: department
 - Common fields need indexes
 - Each field needs comments, table needs comment
 - Generate 5 real data records after creation
```
![image](https://github.com/user-attachments/assets/34118993-2a4c-4804-92f8-7fba9df89190)
![image](https://github.com/user-attachments/assets/f8299f01-c321-4dbf-b5de-13ba06885cc1)


2. Query data based on table comments, prompt as follows:
```
Search for data with Department name 'Executive Office' in Department organizational structure table
```
![image](https://github.com/user-attachments/assets/dcf96603-548e-42d9-9217-78e569be7a8d)


3. Analyze slow SQL, prompt as follows:
```
select * from t_jcsjzx_hjkq_cd_xsz_sk xsz
left join t_jcsjzx_hjkq_jcd jcd on jcd.cddm = xsz.cddm 
Based on current index situation, review execution plan and provide optimization suggestions in markdown format, including table index status, execution details, and optimization recommendations
```

4. Analyze SQL deadlock issues, prompt as follows:
```
update t_admin_rms_zzjg set sfyx = '0' where xh = '1' is stuck, please analyze the cause
```
![image](https://github.com/user-attachments/assets/25bca1cd-854c-4591-ac6e-32d464b12066)


5. Analyze the health status prompt as follows
```
Check the current health status of MySQL
```
![image](https://github.com/user-attachments/assets/1f221ab8-59bf-402c-a15a-ec3eba1eea59)

```

--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/exception/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/cli.py:
--------------------------------------------------------------------------------

```python
from mysql_mcp_server_pro.server import main

def stdio_entry():
    main()

```

--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/config/__init__.py:
--------------------------------------------------------------------------------

```python
from .dbconfig import get_db_config,get_role_permissions

__all__ = [
    "get_db_config",
    "get_role_permissions",
]
```

--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/prompts/__init__.py:
--------------------------------------------------------------------------------

```python
from .AnalysisMySqlIssues import AnalysisMySqlIssues
from .QueryTableData import QueryTableData

__all__ = [
    "AnalysisMySqlIssues",
    "QueryTableData",
]
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
mcp>=1.8.0
mysql-connector-python>=9.2.0
pymysql>=1.1.1
pypinyin>=0.54.0
python-dotenv>=1.1.0
starlette>=0.46.1
uvicorn>=0.34.0
PyJWT>=2.8.0
sqlalchemy>=2.0.0
```

--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/oauth/__init__.py:
--------------------------------------------------------------------------------

```python
from .config import oauth_config
from .token_handler import TokenHandler
from .middleware import OAuthMiddleware
from .routes import login, login_page

__all__ = [
    "oauth_config",
    "TokenHandler",
    "OAuthMiddleware",
    "login",
    "login_page"
] 
```

--------------------------------------------------------------------------------
/docker-compose.yml:
--------------------------------------------------------------------------------

```yaml
version: '3.8'

services:
  mysql-mcp-server:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: mysql-mcp-server
    ports:
      - "9000:9000"  # 如果你的应用监听 9000 端口,请根据实际情况调整
    environment:
      MYSQL_HOST: ${MYSQL_HOST:-localhost}
      MYSQL_PORT: ${MYSQL_PORT:-3306}
      MYSQL_USER: ${MYSQL_USER:-root}
      MYSQL_PASSWORD: ${MYSQL_PASSWORD:-123456}
      MYSQL_DATABASE: ${MYSQL_DATABASE:-a_llm}
      MYSQL_ROLE: ${MYSQL_ROLE:-admin}
      PYTHONPATH: /app/src
    restart: unless-stopped
    command: ["python", "-m", "mysql_mcp_server_pro.server"]
```

--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/handles/__init__.py:
--------------------------------------------------------------------------------

```python
from .execute_sql import ExecuteSQL
from .get_chinese_initials import GetChineseInitials
from .get_table_desc import GetTableDesc
from .get_table_index import GetTableIndex
from .get_table_lock import GetTableLock
from .get_table_name import GetTableName
from .get_db_health_running import GetDBHealthRunning
from .get_db_health_index_usage import GetDBHealthIndexUsage
from .use_prompt_queryTableData import UsePromptQueryTableData
from .optimize_sql import OptimizeSql

__all__ = [
    "ExecuteSQL",
    "GetChineseInitials",
    "GetTableDesc",
    "GetTableIndex",
    "GetTableLock",
    "GetTableName",
    "GetDBHealthRunning",
    "GetDBHealthIndexUsage",
    "UsePromptQueryTableData",
    "OptimizeSql"
]
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
# Use the official Python image from the Docker Hub
FROM python:3.11-slim

# Set the working directory in the container
WORKDIR /app

# Copy the requirements file into the container
COPY requirements.txt .

# Install the dependencies specified in the requirements file
RUN pip install --no-cache-dir -r requirements.txt

# Copy the current directory contents into the container at /app
COPY src/ /app/src

# Set environment variables for MySQL (these can be overwritten with `docker run -e`)
ENV MYSQL_HOST=localhost
ENV MYSQL_PORT=3306
ENV MYSQL_USER=root
ENV MYSQL_PASSWORD=123456
ENV MYSQL_DATABASE=a_llm
ENV MYSQL_ROLE=admin
ENV PYTHONPATH=/app/src

# Command to run the server
CMD ["python", "-m", "mysql_mcp_server_pro.server"]
```

--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/handles/get_db_health_running.py:
--------------------------------------------------------------------------------

```python
from typing import Dict, Any, Sequence

from mcp import Tool
from mcp.types import TextContent

from .base import BaseHandler

from mysql_mcp_server_pro.handles import (
    ExecuteSQL
)

execute_sql = ExecuteSQL()

class GetDBHealthRunning(BaseHandler):
    name = "get_db_health_running"
    description = (
        "获取当前mysql的健康状态(Analyze MySQL health status )"
    )

    def get_tool_description(self) -> Tool:
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {

                }
            }
        )

    async def run_tool(self, arguments: Dict[str, Any]) -> Sequence[TextContent]:
        lock_result = await self.get_lock(arguments)
        processlist_result = await self.get_processlist(arguments)
        status_result = await self.get_status(arguments)
        trx_result = await self.get_trx(arguments)


        # 合并结果
        combined_result = []
        combined_result.extend(processlist_result)
        combined_result.extend(lock_result)
        combined_result.extend(trx_result)
        combined_result.extend(status_result)

        return combined_result

    """
        获取连接情况
    """
    async def get_processlist(self, arguments: Dict[str, Any]) -> Sequence[TextContent]:
        try:
            sql = "SHOW FULL PROCESSLIST;SHOW VARIABLES LIKE 'max_connections';"

            return await execute_sql.run_tool({"query": sql})
        except Exception as e:
            return [TextContent(type="text", text=f"执行查询时出错: {str(e)}")]

    """
        获取运行情况
    """
    async def get_status(self, arguments: Dict[str, Any]) -> Sequence[TextContent]:
        try:
            sql = "SHOW ENGINE INNODB STATUS;"

            return await execute_sql.run_tool({"query": sql})
        except Exception as e:
            return [TextContent(type="text", text=f"执行查询时出错: {str(e)}")]

    """
        获取事务情况
    """
    async def get_trx(self, arguments: Dict[str, Any]) -> Sequence[TextContent]:
        try:
            sql = "SELECT * FROM INFORMATION_SCHEMA.INNODB_TRX;"
            return await execute_sql.run_tool({"query": sql})
        except Exception as e:
            return [TextContent(type="text", text=f"执行查询时出错: {str(e)}")]


    """
        获取锁情况
    """
    async def get_lock(self, arguments: Dict[str, Any]) -> Sequence[TextContent]:
        try:
            sql = "SHOW OPEN TABLES WHERE In_use > 0;select * from information_schema.innodb_locks;select * from information_schema.innodb_lock_waits;"
            sql += "select * from performance_schema.data_lock_waits;"
            sql += "select * from performance_schema.data_locks;"


            return await execute_sql.run_tool({"query": sql})
        except Exception as e:
            return [TextContent(type="text", text=f"执行查询时出错: {str(e)}")]
```

--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/handles/get_db_health_index_usage.py:
--------------------------------------------------------------------------------

```python
from typing import Dict, Any, Sequence

from mcp import Tool
from mcp.types import TextContent

from .base import BaseHandler
from mysql_mcp_server_pro.config import get_db_config
from mysql_mcp_server_pro.handles import (
    ExecuteSQL
)

execute_sql = ExecuteSQL()

class GetDBHealthIndexUsage(BaseHandler):
    name = "get_db_health_index_usage"
    description = (
        "获取当前连接的mysql库的索引使用情况,包含冗余索引情况、性能较差的索引情况、未使用索引且查询时间大于30秒top5情况"
        + "(Get the index usage of the currently connected mysql database, including redundant index situations, "
        +  "poorly performing index situations, and the top 5 unused index situations with query times greater than 30 seconds)"
    )

    def get_tool_description(self) -> Tool:
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {

                }
            }
        )

    async def run_tool(self, arguments: Dict[str, Any]) -> Sequence[TextContent]:
        config = get_db_config()

        count_zero_result = await self.get_count_zero(arguments,config)
        max_time_result = await self.get_max_timer(arguments,config)
        not_used_index_result = await self.get_not_used_index(arguments,config)


        # 合并结果
        combined_result = []
        combined_result.extend(count_zero_result)
        combined_result.extend(max_time_result)
        combined_result.extend(not_used_index_result)


        return combined_result

    """
        获取冗余索引情况
    """
    async def get_count_zero(self, arguments: Dict[str, Any], config) -> Sequence[TextContent]:
        try:
            sql = "SELECT object_name,index_name,count_star from performance_schema.table_io_waits_summary_by_index_usage "
            sql += f"WHERE object_schema = '{config['database']}' and count_star = 0 AND sum_timer_wait = 0 ;"

            return await execute_sql.run_tool({"query": sql})
        except Exception as e:
            return [TextContent(type="text", text=f"执行查询时出错: {str(e)}")]


    """
        获取性能较差的索引情况
    """
    async def get_max_timer(self, arguments: Dict[str, Any], config) -> Sequence[TextContent]:
        try:
            sql = "SELECT object_schema,object_name,index_name,(max_timer_wait / 1000000000000) max_timer_wait "
            sql += f"FROM performance_schema.table_io_waits_summary_by_index_usage where object_schema = '{config['database']}' "
            sql += "and index_name is not null ORDER BY  max_timer_wait DESC;"

            return await execute_sql.run_tool({"query": sql})
        except Exception as e:
            return [TextContent(type="text", text=f"执行查询时出错: {str(e)}")]

    """
        获取未使用索引查询时间大于30秒的top5情况
    """
    async def get_not_used_index(self, arguments: Dict[str, Any], config) -> Sequence[TextContent]:
        try:
            sql = "SELECT object_schema,object_name, (max_timer_wait / 1000000000000) max_timer_wait "
            sql += f"FROM performance_schema.table_io_waits_summary_by_index_usage where object_schema = '{config['database']}' "
            sql += "and index_name IS null and max_timer_wait > 30000000000000 ORDER BY max_timer_wait DESC limit 5;"

            return await execute_sql.run_tool({"query": sql})
        except Exception as e:
            return [TextContent(type="text", text=f"执行查询时出错: {str(e)}")]
```

--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/config/event_store.py:
--------------------------------------------------------------------------------

```python
"""
In-memory event store for demonstrating resumability functionality.

This is a simple implementation intended for examples and testing,
not for production use where a persistent storage solution would be more appropriate.
"""

import logging
from collections import deque
from dataclasses import dataclass
from uuid import uuid4

from mcp.server.streamable_http import (
    EventCallback,
    EventId,
    EventMessage,
    EventStore,
    StreamId,
)
from mcp.types import JSONRPCMessage

logger = logging.getLogger(__name__)


@dataclass
class EventEntry:
    """
    Represents an event entry in the event store.
    """

    event_id: EventId
    stream_id: StreamId
    message: JSONRPCMessage


class InMemoryEventStore(EventStore):
    """
    Simple in-memory implementation of the EventStore interface for resumability.
    This is primarily intended for examples and testing, not for production use
    where a persistent storage solution would be more appropriate.

    This implementation keeps only the last N events per stream for memory efficiency.
    """

    def __init__(self, max_events_per_stream: int = 100):
        """Initialize the event store.

        Args:
            max_events_per_stream: Maximum number of events to keep per stream
        """
        self.max_events_per_stream = max_events_per_stream
        # for maintaining last N events per stream
        self.streams: dict[StreamId, deque[EventEntry]] = {}
        # event_id -> EventEntry for quick lookup
        self.event_index: dict[EventId, EventEntry] = {}

    async def store_event(
        self, stream_id: StreamId, message: JSONRPCMessage
    ) -> EventId:
        """Stores an event with a generated event ID."""
        event_id = str(uuid4())
        event_entry = EventEntry(
            event_id=event_id, stream_id=stream_id, message=message
        )

        # Get or create deque for this stream
        if stream_id not in self.streams:
            self.streams[stream_id] = deque(maxlen=self.max_events_per_stream)

        # If deque is full, the oldest event will be automatically removed
        # We need to remove it from the event_index as well
        if len(self.streams[stream_id]) == self.max_events_per_stream:
            oldest_event = self.streams[stream_id][0]
            self.event_index.pop(oldest_event.event_id, None)

        # Add new event
        self.streams[stream_id].append(event_entry)
        self.event_index[event_id] = event_entry

        return event_id

    async def replay_events_after(
        self,
        last_event_id: EventId,
        send_callback: EventCallback,
    ) -> StreamId | None:
        """Replays events that occurred after the specified event ID."""
        if last_event_id not in self.event_index:
            logger.warning(f"Event ID {last_event_id} not found in store")
            return None

        # Get the stream and find events after the last one
        last_event = self.event_index[last_event_id]
        stream_id = last_event.stream_id
        stream_events = self.streams.get(last_event.stream_id, deque())

        # Events in deque are already in chronological order
        found_last = False
        for event in stream_events:
            if found_last:
                await send_callback(EventMessage(event.message, event.event_id))
            elif event.event_id == last_event_id:
                found_last = True

        return stream_id

```

--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/oauth/routes.py:
--------------------------------------------------------------------------------

```python
import os
import hashlib
import time

from starlette.responses import JSONResponse, HTMLResponse
from starlette.requests import Request
from pathlib import Path

from .token_handler import TokenHandler
from .config import oauth_config


async def login_page(request: Request) -> HTMLResponse:
    """返回登录页面"""
    try:
        templates_dir = Path(__file__).parent.parent / "templates"
        login_html = templates_dir / "login.html"
        
        with open(login_html, "r", encoding="utf-8") as f:
            content = f.read()
        
        return HTMLResponse(content)
    except Exception as e:
        return HTMLResponse(
            content=f"<h1>错误</h1><p>加载登录页面失败: {str(e)}</p>",
            status_code=500
        )

async def login(request: Request) -> JSONResponse:
    """
    OAuth 2.0密码模式端点
    
    请求格式:
    POST /mcp/auth/login
    Content-Type: application/json
    {
        "grant_type": "password",
        "username": "用户名",
        "password": "密码",
        "client_id": "客户端ID",
        "client_secret": "客户端密钥"
    }
    """
    # 检查 Accept 头部
    accept_header = request.headers.get("accept", "*/*")
    if accept_header != "*/*" and "application/json" not in accept_header:
        return JSONResponse(
            {"error": "not_acceptable", "error_description": "Client must accept application/json response"},
            status_code=406
        )

    try:
        data = await request.json()
        
        # 验证授权类型
        grant_type = data.get("grant_type")
        if grant_type not in oauth_config.GRANT_TYPES:
            return JSONResponse(
                {"error": "unsupported_grant_type"},
                status_code=400
            )
            
        # 验证客户端凭据
        client_id = data.get("client_id")
        client_secret = data.get("client_secret")
        
        if not client_id or not client_secret:
            return JSONResponse(
                {"error": "invalid_client"},
                status_code=401
            )
            
        if client_id != oauth_config.CLIENT_ID or client_secret != oauth_config.CLIENT_SECRET:
            return JSONResponse(
                {"error": "invalid_client"},
                status_code=401
            )
        
        if grant_type == "password":
            username = data.get("username")
            #password = data.get("password")

            encrypted_password = data.get("password")  # 从前端接收的加密密码

            if not username or not encrypted_password:
                return JSONResponse(
                    {"error": "invalid_request", "error_description": "Missing username or password"},
                    status_code=400
                )

            # 获取时间戳和盐值
            timestamp = request.headers.get("X-Timestamp")
            salt = request.headers.get("X-Salt")

            if not timestamp or not salt:
                return JSONResponse(
                    {"error": "invalid_request", "error_description": "Missing security parameters"},
                    status_code=400
                )

            # 验证时间戳是否在有效期内(5分钟)
            try:
                ts = int(timestamp) / 1000  # 转换为秒
                current_time = time.time()
                if current_time - ts > 300:  # 5分钟过期
                    return JSONResponse(
                        {"error": "invalid_request", "error_description": "Request expired"},
                        status_code=400
                    )
            except (ValueError, TypeError):
                return JSONResponse(
                    {"error": "invalid_request", "error_description": "Invalid timestamp"},
                    status_code=400
                )

            # 验证密码
            # 1. 使用与前端相同的加密方式验证
            # 第一次哈希:密码 + 盐
            first_hash = hashlib.sha256((os.getenv("OAUTH_USER_PASSWORD", "admin") + salt).encode()).hexdigest()
            # 第二次哈希:第一次哈希结果 + 时间戳
            expected_hash = hashlib.sha256((first_hash + timestamp).encode()).hexdigest()


            # 这里应该添加实际的用户验证逻辑
            if username == os.getenv("OAUTH_USER_NAME", "admin") and encrypted_password == expected_hash :
                # 生成令牌
                access_token, refresh_token, access_expires, refresh_expires = TokenHandler.create_tokens(
                    user_id="1",  # 这里应该是实际的用户ID
                    username=username
                )
                
                # 返回标准的OAuth2.0响应
                return JSONResponse(
                    TokenHandler.create_token_response(
                        access_token,
                        refresh_token,
                        access_expires,
                        refresh_expires
                    )
                )
            
            return JSONResponse(
                {"error": "invalid_grant"},
                status_code=401
            )
            
        elif grant_type == "refresh_token":
            refresh_token = data.get("refresh_token")
            if not refresh_token:
                return JSONResponse(
                    {"error": "invalid_request"},
                    status_code=400
                )
                
            # 验证刷新令牌
            payload = TokenHandler.verify_token(refresh_token)
            if not payload or payload.get("type") != "refresh_token":
                return JSONResponse(
                    {"error": "invalid_grant"},
                    status_code=401
                )
                
            # 生成新的令牌
            access_token, new_refresh_token, access_expires, refresh_expires = TokenHandler.create_tokens(
                user_id=payload["sub"],
                username=payload["username"]
            )
            
            return JSONResponse(
                TokenHandler.create_token_response(
                    access_token,
                    new_refresh_token,
                    access_expires,
                    refresh_expires
                )
            )
    
    except Exception as e:
        return JSONResponse(
            {"error": "server_error", "error_description": str(e)},
            status_code=500
        ) 
```

--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/templates/login.html:
--------------------------------------------------------------------------------

```html
<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>MySQL MCP Server Pro - 登录</title>
    <link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600&display=swap" rel="stylesheet">
    <!-- 添加 crypto-js 库 -->
    <script src="https://cdnjs.cloudflare.com/ajax/libs/crypto-js/4.1.1/crypto-js.min.js"></script>
    <style>
        :root {
            --primary-color: #2196F3;
            --primary-dark: #1976D2;
            --text-primary: #212121;
            --text-secondary: #757575;
            --background: #f5f5f5;
            --surface: #FFFFFF;
            --error: #F44336;
            --success: #4CAF50;
        }

        * {
            margin: 0;
            padding: 0;
            box-sizing: border-box;
        }

        body {
            font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
            background: var(--background);
            color: var(--text-primary);
            line-height: 1.6;
            min-height: 100vh;
            display: flex;
            justify-content: center;
            align-items: center;
        }

        /* 登录表单样式 */
        .login-container {
            background: white;
            padding: 2rem;
            border-radius: 8px;
            box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
            width: 100%;
            max-width: 400px;
        }

        .header {
            text-align: center;
            margin-bottom: 2rem;
        }

        .title {
            font-size: 1.5rem;
            color: var(--text-primary);
            margin-bottom: 0.5rem;
        }

        .author {
            color: var(--text-secondary);
            font-size: 0.9rem;
        }

        .form-group {
            margin-bottom: 1.5rem;
        }

        label {
            display: block;
            margin-bottom: 0.5rem;
            color: var(--text-primary);
            font-weight: 500;
        }

        input {
            width: 100%;
            padding: 0.75rem;
            border: 1px solid #ddd;
            border-radius: 6px;
            font-size: 0.95rem;
        }

        input:focus {
            outline: none;
            border-color: var(--primary-color);
        }

        .submit-button {
            width: 100%;
            padding: 0.75rem;
            background: var(--primary-color);
            color: white;
            border: none;
            border-radius: 6px;
            font-size: 1rem;
            font-weight: 500;
            cursor: pointer;
        }

        .submit-button:hover {
            background: var(--primary-dark);
        }

        .error-message {
            color: var(--error);
            margin-top: 1rem;
            text-align: center;
            display: none;
        }

        /* Token展示界面样式 */
        .token-container {
            display: none;
            position: fixed;
            top: 0;
            left: 0;
            right: 0;
            bottom: 0;
            background: var(--background);
            padding: 1.5rem;
        }

        .token-content {
            max-width: 900px;
            margin: 0 auto;
        }

        .token-header {
            margin-bottom: 1rem;
            text-align: center;
        }

        .token-header h1 {
            font-size: 1.25rem;
            margin-bottom: 0.25rem;
        }

        .token-header .subtitle {
            font-size: 0.9rem;
            color: var(--text-secondary);
        }

        .token-section {
            background: white;
            padding: 1.25rem;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.05);
            margin-bottom: 0.75rem;
        }

        .token-section h2 {
            font-size: 1rem;
            margin-bottom: 0.75rem;
            display: flex;
            align-items: center;
            gap: 0.5rem;
        }

        .token-value {
            font-family: 'Monaco', 'Consolas', monospace;
            word-break: break-all;
            background: var(--background);
            padding: 0.75rem;
            border-radius: 6px;
            border: 1px solid #ddd;
            font-size: 0.85rem;
            margin-bottom: 0.5rem;
            line-height: 1.3;
        }

        .token-info {
            margin: 0.5rem 0;
            color: var(--text-secondary);
            font-size: 0.85rem;
        }

        .token-info p {
            margin: 0.25rem 0;
            display: flex;
            align-items: center;
            gap: 0.5rem;
        }

        .copy-button {
            background: var(--success);
            color: white;
            border: none;
            padding: 0.5rem 1rem;
            border-radius: 4px;
            cursor: pointer;
            font-size: 0.85rem;
            display: inline-flex;
            align-items: center;
            gap: 0.5rem;
        }

        .copy-button:hover {
            opacity: 0.9;
        }

        .code-block {
            background: #1E1E1E;
            color: #E0E0E0;
            padding: 0.75rem;
            border-radius: 6px;
            overflow-x: auto;
            font-family: 'Monaco', 'Consolas', monospace;
            font-size: 0.8rem;
            line-height: 1.3;
            margin: 0.5rem 0;
        }

        .github-card {
            text-align: center;
            padding: 0.75rem;
            background: linear-gradient(135deg, #2196F3 0%, #1976D2 100%);
            color: white;
            border-radius: 6px;
            margin-top: 0.5rem;
            font-size: 0.85rem;
        }

        .github-card a {
            color: white;
            text-decoration: none;
            font-weight: 500;
            display: inline-flex;
            align-items: center;
            gap: 0.25rem;
        }

        .github-card a:hover {
            text-decoration: underline;
        }
    </style>
</head>
<body>
    <!-- 登录表单 -->
    <div class="login-container">
        <div class="header">
            <h1 class="title">MySQL MCP Server Pro</h1>
            <div class="author">by wenb1n</div>
        </div>
        <form id="loginForm">
            <div class="form-group">
                <label for="username">用户名</label>
                <input type="text" id="username" name="username" required>
            </div>
            <div class="form-group">
                <label for="password">密码</label>
                <input type="password" id="password" name="password" required>
            </div>
            <button type="submit" class="submit-button">登录</button>
        </form>
        <div id="errorMessage" class="error-message"></div>
    </div>

    <!-- Token展示界面 -->
    <div id="tokenContainer" class="token-container">
        <div class="token-content">
            <div class="token-header">
                <h1>MySQL MCP Server Pro</h1>
                <div class="subtitle">by wenb1n</div>
            </div>

            <div class="token-section">
                <h2>🔑 访问令牌 (Access Token)</h2>
                <div id="tokenValue" class="token-value"></div>
                <div class="token-info">
                    <p>⏰ 过期时间:<span id="expireTime"></span></p>
                </div>
                <button id="copyButton" class="copy-button">📋 复制Token</button>
            </div>

            <div class="token-section">
                <h2>🔄 刷新令牌 (Refresh Token)</h2>
                <div id="refreshTokenValue" class="token-value"></div>
                <div class="token-info">
                    <p>使用刷新令牌获取新的访问令牌:</p>
                </div>
                <div class="code-block">
                    <pre id="refreshTokenExample">curl -X POST http://localhost:3000/mcp/auth/login \
  -H "Content-Type: application/json" \
  -d '{
    "grant_type": "refresh_token",
    "refresh_token": "your-refresh-token",
    "client_id": "mysql-mcp-client",
    "client_secret": "mysql-mcp-secret"
  }'</pre>
                </div>
            </div>

            <div class="github-card">
                <a href="https://github.com/wenb1n-dev/mysql_mcp_server_pro" target="_blank">
                    ⭐️ 如果觉得好用,请帮忙点个 Star 支持一下!
                </a>
            </div>
        </div>
    </div>

    <script>
        // 生成随机盐值
        function generateSalt(length = 16) {
            const chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
            let salt = '';
            for (let i = 0; i < length; i++) {
                salt += chars.charAt(Math.floor(Math.random() * chars.length));
            }
            return salt;
        }

        // 加密密码
        function encryptPassword(password, salt, timestamp) {
            // 第一次哈希:密码 + 盐
            const firstHash = CryptoJS.SHA256(password + salt).toString();
            // 第二次哈希:第一次哈希结果 + 时间戳
            const finalHash = CryptoJS.SHA256(firstHash + timestamp).toString();
            return finalHash;
        }

        document.getElementById('loginForm').addEventListener('submit', async (e) => {
            e.preventDefault();
            const username = document.getElementById('username').value;
            const password = document.getElementById('password').value;
            
            try {
                // 生成安全参数
                const salt = generateSalt();
                const timestamp = Date.now().toString();
                const encryptedPassword = encryptPassword(password, salt, timestamp);
                
                const response = await fetch('/mcp/auth/login', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json',
                        'Accept': 'application/json',
                        'X-Timestamp': timestamp,  // 添加时间戳头
                        'X-Salt': salt,           // 添加盐值头
                    },
                    body: JSON.stringify({
                        grant_type: 'password',
                        username: username,
                        password: encryptedPassword,  // 发送加密后的密码
                        client_id: 'mysql-mcp-client',
                        client_secret: 'mysql-mcp-secret'
                    })
                });

                const data = await response.json();
                
                if (response.ok) {
                    // 显示token信息
                    const tokenContainer = document.getElementById('tokenContainer');
                    const tokenValue = document.getElementById('tokenValue');
                    const expireTime = document.getElementById('expireTime');
                    const refreshTokenValue = document.getElementById('refreshTokenValue');
                    
                    tokenValue.textContent = data.access_token;
                    expireTime.innerHTML = `
                        <div style="display: flex; flex-direction: column; gap: 0.25rem;">
                            <div>🔑 访问令牌过期时间: ${data.expire_time}</div>
                            <div>🔄 刷新令牌过期时间: ${data.refresh_token_expire_time}</div>
                        </div>
                    `;
                    refreshTokenValue.textContent = data.refresh_token;
                    
                    // 更新示例代码中的刷新令牌
                    const example = document.getElementById('refreshTokenExample');
                    example.textContent = example.textContent.replace('your-refresh-token', data.refresh_token);
                    
                    tokenContainer.style.display = 'block';
                    document.querySelector('.login-container').style.display = 'none';
                } else {
                    const errorMessage = document.getElementById('errorMessage');
                    errorMessage.textContent = data.error_description || data.error || '登录失败';
                    errorMessage.style.display = 'block';
                }
            } catch (error) {
                const errorMessage = document.getElementById('errorMessage');
                errorMessage.textContent = '网络错误,请稍后重试';
                errorMessage.style.display = 'block';
            }
        });

        document.getElementById('copyButton').addEventListener('click', function() {
            const token = document.getElementById('tokenValue').textContent;
            if (token) {
                navigator.clipboard.writeText(token).then(() => {
                    this.textContent = '✓ 已复制';
                    setTimeout(() => {
                        this.innerHTML = '📋 复制Token';
                    }, 2000);
                }).catch(() => {
                    alert('复制失败,请手动复制');
                });
            }
        });
    </script>
</body>
</html> 
```