# Directory Structure
```
├── .gitignore
├── docker-compose.yml
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README-zh.md
├── README.md
├── requirements.txt
└── src
    └── mysql_mcp_server_pro
        ├── __init__.py
        ├── cli.py
        ├── config
        │   ├── __init__.py
        │   ├── .env
        │   ├── dbconfig.py
        │   └── event_store.py
        ├── exception
        │   ├── __init__.py
        │   └── exceptions.py
        ├── handles
        │   ├── __init__.py
        │   ├── base.py
        │   ├── execute_sql.py
        │   ├── get_chinese_initials.py
        │   ├── get_db_health_index_usage.py
        │   ├── get_db_health_running.py
        │   ├── get_table_desc.py
        │   ├── get_table_index.py
        │   ├── get_table_lock.py
        │   ├── get_table_name.py
        │   ├── optimize_sql.py
        │   └── use_prompt_queryTableData.py
        ├── oauth
        │   ├── __init__.py
        │   ├── config.py
        │   ├── middleware.py
        │   ├── routes.py
        │   └── token_handler.py
        ├── prompts
        │   ├── __init__.py
        │   ├── AnalysisMySqlIssues.py
        │   ├── BasePrompt.py
        │   └── QueryTableData.py
        ├── server.py
        ├── templates
        │   └── login.html
        └── utils
            ├── __init__.py
            ├── database_pool.py
            └── execute_sql_util.py
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
.env
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# mcp_mysql_server_pro
## Introduction
mcp_mysql_server_pro goes beyond MySQL CRUD operations: it also provides database anomaly analysis and is easy for developers to extend with custom tools.
- Supports all Model Context Protocol (MCP) transport modes (STDIO, SSE, Streamable HTTP)
- Supports OAuth2.0
- Supports executing multiple SQL statements separated by ";"
- Supports querying database table names and fields based on table comments
- Supports SQL execution plan analysis
- Supports converting Chinese field names to pinyin initials
- Supports table lock analysis
- Supports database health status analysis
- Supports permission control with three roles: readonly, writer, and admin
```
"readonly": ["SELECT", "SHOW", "DESCRIBE", "EXPLAIN"], # Read-only permissions
"writer": ["SELECT", "SHOW", "DESCRIBE", "EXPLAIN", "INSERT", "UPDATE", "DELETE"], # Read-write permissions
"admin": ["SELECT", "SHOW", "DESCRIBE", "EXPLAIN", "INSERT", "UPDATE", "DELETE",
"CREATE", "ALTER", "DROP", "TRUNCATE"] # Administrator permissions
```
- Supports prompt template invocation
## Tool List
| Tool Name | Description |
|----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| execute_sql | SQL execution tool that can execute ["SELECT", "SHOW", "DESCRIBE", "EXPLAIN", "INSERT", "UPDATE", "DELETE", "CREATE", "ALTER", "DROP", "TRUNCATE"] commands based on permission configuration |
| get_chinese_initials | Convert Chinese field names to pinyin initials |
| get_db_health_running | Analyze MySQL health status (connection status, transaction status, running status, lock status detection) |
| get_table_desc | Search for table structures in the database based on table names, supporting multi-table queries |
| get_table_index | Search for table indexes in the database based on table names, supporting multi-table queries |
| get_table_lock | Check if there are row-level locks or table-level locks in the current MySQL server |
| get_table_name | Search for table names in the database based on table comments and descriptions |
| get_db_health_index_usage | Get the index usage of the currently connected mysql database, including redundant index situations, poorly performing index situations, and the top 5 unused index situations with query times greater than 30 seconds |
| optimize_sql | Professional SQL performance optimization tool, providing expert optimization suggestions based on MySQL execution plans, table structure information, table data volume, and table indexes. |
| use_prompt_queryTableData | Use built-in prompts to let the model construct a chain call of tools in mcp (not a commonly used fixed tool, you need to modify the code to enable it, see this class for details) |
## Prompt List
| Prompt Name | Description |
|---------------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| analyzing-mysql-prompt | This is a prompt for analyzing MySQL-related issues |
| query-table-data-prompt | This is a prompt for querying table data using tools. If description is empty, it will be initialized as a MySQL database query assistant |
## Usage Instructions
### Installation and Configuration
1. Install Package
```bash
pip install mysql_mcp_server_pro
```
2. Configure Environment Variables
Create a `.env` file with the following content:
```bash
# MySQL Database Configuration
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=your_username
MYSQL_PASSWORD=your_password
MYSQL_DATABASE=your_database
# Optional, default is 'readonly'. Available values: readonly, writer, admin
MYSQL_ROLE=readonly
```
3. Run Service
```bash
# SSE mode
mysql_mcp_server_pro --mode sse --envfile /path/to/.env
# Streamable HTTP mode (default)
mysql_mcp_server_pro --envfile /path/to/.env
# Streamable Http oauth Authentication
mysql_mcp_server_pro --oauth true
```
4. MCP client
For client configuration, see "Use uv to start the service" in the Local Development sections.
Note:
- Place the `.env` file in the directory where you run the command, or pass its path with the `--envfile` parameter
- You can also set these variables directly in your environment
- Make sure the database configuration is correct and the database is reachable
### Run with uvx, Client Configuration
- This method works directly in MCP-capable clients, such as the Tongyi Qianwen plugin or the Trae editor, without downloading the source code.
```json
{
"mcpServers": {
"mysql": {
"command": "uvx",
"args": [
"--from",
"mysql_mcp_server_pro",
"mysql_mcp_server_pro",
"--mode",
"stdio"
],
"env": {
"MYSQL_HOST": "192.168.x.xxx",
"MYSQL_PORT": "3306",
"MYSQL_USER": "root",
"MYSQL_PASSWORD": "root",
"MYSQL_DATABASE": "a_llm",
"MYSQL_ROLE": "admin"
}
}
}
}
```
### Local Development with Streamable Http mode
- Use uv to start the service
Add the following to your MCP client (Cursor, Cline, etc.).
The MCP JSON is as follows:
```
{
"mcpServers": {
"mysql_mcp_server_pro": {
"name": "mysql_mcp_server_pro",
"type": "streamableHttp",
"description": "",
"isActive": true,
"url": "http://localhost:3000/mcp/"
}
}
}
```
Update the database connection details in the .env file to match your database:
```
# MySQL Database Configuration
MYSQL_HOST=192.168.xxx.xxx
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=root
MYSQL_DATABASE=a_llm
MYSQL_ROLE=admin
```
Start commands:
```
# Download dependencies
uv sync
# Start
uv run -m mysql_mcp_server_pro.server
# Custom env file location
uv run -m mysql_mcp_server_pro.server --envfile /path/to/.env
# oauth Authentication
uv run -m mysql_mcp_server_pro.server --oauth true
```
### Local Development with SSE Mode
- Use uv to start the service
Add the following to your MCP client (Cursor, Cline, etc.).
The MCP JSON is as follows:
```
{
"mcpServers": {
"mysql_mcp_server_pro": {
"name": "mysql_mcp_server_pro",
"description": "",
"isActive": true,
"url": "http://localhost:9000/sse"
}
}
}
```
Update the database connection details in the .env file to match your database:
```
# MySQL Database Configuration
MYSQL_HOST=192.168.xxx.xxx
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=root
MYSQL_DATABASE=a_llm
MYSQL_ROLE=admin
```
Start commands:
```
# Download dependencies
uv sync
# Start
uv run -m mysql_mcp_server_pro.server --mode sse
# Custom env file location
uv run -m mysql_mcp_server_pro.server --mode sse --envfile /path/to/.env
```
### Local Development with STDIO Mode
Add the following to your MCP client (Cursor, Cline, etc.), replacing the `--directory` path with your own project path.
The MCP JSON is as follows:
```
{
"mcpServers": {
"operateMysql": {
"isActive": true,
"name": "operateMysql",
"command": "uv",
"args": [
"--directory",
"/Volumes/mysql_mcp_server_pro/src/mysql_mcp_server_pro",
"run",
"-m",
"mysql_mcp_server_pro.server",
"--mode",
"stdio"
],
"env": {
"MYSQL_HOST": "localhost",
"MYSQL_PORT": "3306",
"MYSQL_USER": "root",
"MYSQL_PASSWORD": "123456",
"MYSQL_DATABASE": "a_llm",
"MYSQL_ROLE": "admin"
}
}
}
}
```
## Custom Tool Extensions
1. Add a new tool class in the `handles` package that inherits from `BaseHandler` and implements the `get_tool_description` and `run_tool` methods
2. Import the new tool in `handles/__init__.py` to make it available in the server
## OAuth2.0 Authentication
1. Start the authentication service. By default, it uses the built-in OAuth 2.0 password mode authentication. To use your own authentication service, set its address in the env file.
```aiignore
uv run -m mysql_mcp_server_pro.server --oauth true
```
2. Visit the authentication service at http://localhost:3000/login. Default username and password are configured in the env file.

3. Copy the token and add it to the request headers, for example:

```json
{
"mcpServers": {
"mysql_mcp_server_pro": {
"name": "mysql_mcp_server_pro",
"type": "streamableHttp",
"description": "",
"isActive": true,
"url": "http://localhost:3000/mcp/",
"headers": {
"authorization": "bearer TOKEN_VALUE"
}
}
}
}
```
## Examples
1. Create a new table and insert data, prompt format as follows:
```
# Task
Create an organizational structure table with the following structure: department name, department number, parent department, is valid.
# Requirements
- Table name: department
- Common fields need indexes
- Each field needs comments, table needs comment
- Generate 5 real data records after creation
```


2. Query data based on table comments, prompt as follows:
```
Search for data with Department name 'Executive Office' in Department organizational structure table
```

3. Analyze slow SQL, prompt as follows:
```
select * from t_jcsjzx_hjkq_cd_xsz_sk xsz
left join t_jcsjzx_hjkq_jcd jcd on jcd.cddm = xsz.cddm
Based on current index situation, review execution plan and provide optimization suggestions in markdown format, including table index status, execution details, and optimization recommendations
```
4. Analyze SQL deadlock issues, prompt as follows:
```
update t_admin_rms_zzjg set sfyx = '0' where xh = '1' is stuck, please analyze the cause
```

5. Analyze the health status, prompt as follows:
```
Check the current health status of MySQL
```

```
--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/exception/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/cli.py:
--------------------------------------------------------------------------------
```python
from mysql_mcp_server_pro.server import main


def stdio_entry():
    main()
```
--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/config/__init__.py:
--------------------------------------------------------------------------------
```python
from .dbconfig import get_db_config, get_role_permissions

__all__ = [
    "get_db_config",
    "get_role_permissions",
]
```
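The README lists three roles (`readonly`, `writer`, `admin`) and the statement types each may run. The sketch below shows one way such a check could work. It is illustrative only: `ROLE_PERMISSIONS` copies the table from the README, while `is_statement_allowed` and the fallback to `readonly` for unknown roles are assumptions, not the project's `get_role_permissions` implementation (`dbconfig.py` is not shown here).

```python
# Role table copied from the README; the helper below is a hypothetical sketch.
ROLE_PERMISSIONS = {
    "readonly": ["SELECT", "SHOW", "DESCRIBE", "EXPLAIN"],
    "writer": ["SELECT", "SHOW", "DESCRIBE", "EXPLAIN", "INSERT", "UPDATE", "DELETE"],
    "admin": ["SELECT", "SHOW", "DESCRIBE", "EXPLAIN", "INSERT", "UPDATE", "DELETE",
              "CREATE", "ALTER", "DROP", "TRUNCATE"],
}


def is_statement_allowed(role: str, statement: str) -> bool:
    """Return True if the statement's leading keyword is permitted for the role."""
    stripped = statement.strip()
    if not stripped:
        return False
    # The first whitespace-delimited word is treated as the statement type.
    keyword = stripped.split(None, 1)[0].upper()
    # Assumption: unknown roles fall back to the most restrictive set.
    allowed = ROLE_PERMISSIONS.get(role, ROLE_PERMISSIONS["readonly"])
    return keyword in allowed
```

This only inspects the leading keyword, so a statement such as `SELECT(1)` with no space after the keyword, or one preceded by a SQL comment, would need extra handling.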
--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/prompts/__init__.py:
--------------------------------------------------------------------------------
```python
from .AnalysisMySqlIssues import AnalysisMySqlIssues
from .QueryTableData import QueryTableData

__all__ = [
    "AnalysisMySqlIssues",
    "QueryTableData",
]
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
mcp>=1.8.0
mysql-connector-python>=9.2.0
pymysql>=1.1.1
pypinyin>=0.54.0
python-dotenv>=1.1.0
starlette>=0.46.1
uvicorn>=0.34.0
PyJWT>=2.8.0
sqlalchemy>=2.0.0
```
--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/oauth/__init__.py:
--------------------------------------------------------------------------------
```python
from .config import oauth_config
from .token_handler import TokenHandler
from .middleware import OAuthMiddleware
from .routes import login, login_page

__all__ = [
    "oauth_config",
    "TokenHandler",
    "OAuthMiddleware",
    "login",
    "login_page"
]
```
--------------------------------------------------------------------------------
/docker-compose.yml:
--------------------------------------------------------------------------------
```yaml
version: '3.8'
services:
  mysql-mcp-server:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: mysql-mcp-server
    ports:
      - "9000:9000" # Assumes the application listens on port 9000; adjust to match your setup
    environment:
      MYSQL_HOST: ${MYSQL_HOST:-localhost}
      MYSQL_PORT: ${MYSQL_PORT:-3306}
      MYSQL_USER: ${MYSQL_USER:-root}
      MYSQL_PASSWORD: ${MYSQL_PASSWORD:-123456}
      MYSQL_DATABASE: ${MYSQL_DATABASE:-a_llm}
      MYSQL_ROLE: ${MYSQL_ROLE:-admin}
      PYTHONPATH: /app/src
    restart: unless-stopped
    command: ["python", "-m", "mysql_mcp_server_pro.server"]
```
--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/handles/__init__.py:
--------------------------------------------------------------------------------
```python
from .execute_sql import ExecuteSQL
from .get_chinese_initials import GetChineseInitials
from .get_table_desc import GetTableDesc
from .get_table_index import GetTableIndex
from .get_table_lock import GetTableLock
from .get_table_name import GetTableName
from .get_db_health_running import GetDBHealthRunning
from .get_db_health_index_usage import GetDBHealthIndexUsage
from .use_prompt_queryTableData import UsePromptQueryTableData
from .optimize_sql import OptimizeSql

__all__ = [
    "ExecuteSQL",
    "GetChineseInitials",
    "GetTableDesc",
    "GetTableIndex",
    "GetTableLock",
    "GetTableName",
    "GetDBHealthRunning",
    "GetDBHealthIndexUsage",
    "UsePromptQueryTableData",
    "OptimizeSql"
]
```
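The README's "Custom Tool Extensions" section describes adding a tool by subclassing `BaseHandler`, implementing `get_tool_description` and `run_tool`, and importing the class in this `__init__.py`. The sketch below shows the shape of such a handler. Because `handles/base.py` is not shown and the `mcp` package may not be installed, `Tool`, `TextContent`, and `BaseHandler` here are stand-ins written for this example, and `EchoTableName` is a hypothetical tool; in the real project you would import the actual classes instead.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, Sequence


@dataclass
class Tool:  # stand-in for mcp.Tool
    name: str
    description: str
    inputSchema: Dict[str, Any] = field(default_factory=dict)


@dataclass
class TextContent:  # stand-in for mcp.types.TextContent
    type: str
    text: str


class BaseHandler:  # stand-in for handles/base.py, whose real contents are not shown
    name: str = ""
    description: str = ""

    def get_tool_description(self) -> Tool:
        raise NotImplementedError

    async def run_tool(self, arguments: Dict[str, Any]) -> Sequence[TextContent]:
        raise NotImplementedError


class EchoTableName(BaseHandler):
    """Hypothetical tool that echoes back the table name it was given."""

    name = "echo_table_name"
    description = "Echo the requested table name (illustrative only)"

    def get_tool_description(self) -> Tool:
        # Declare one required string argument named "table".
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {"table": {"type": "string"}},
                "required": ["table"],
            },
        )

    async def run_tool(self, arguments: Dict[str, Any]) -> Sequence[TextContent]:
        table = arguments.get("table")
        if not table:
            return [TextContent(type="text", text="Missing required argument: table")]
        return [TextContent(type="text", text=f"table = {table}")]


result = asyncio.run(EchoTableName().run_tool({"table": "department"}))
```

Following the README, the second step would be an import line and an `__all__` entry for the new class in `handles/__init__.py`, mirroring the existing entries.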
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
# Use the official Python image from the Docker Hub
FROM python:3.11-slim
# Set the working directory in the container
WORKDIR /app
# Copy the requirements file into the container
COPY requirements.txt .
# Install the dependencies specified in the requirements file
RUN pip install --no-cache-dir -r requirements.txt
# Copy the source tree into the container at /app/src
COPY src/ /app/src
# Set environment variables for MySQL (these can be overwritten with `docker run -e`)
ENV MYSQL_HOST=localhost
ENV MYSQL_PORT=3306
ENV MYSQL_USER=root
ENV MYSQL_PASSWORD=123456
ENV MYSQL_DATABASE=a_llm
ENV MYSQL_ROLE=admin
ENV PYTHONPATH=/app/src
# Command to run the server
CMD ["python", "-m", "mysql_mcp_server_pro.server"]
```
--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/handles/get_db_health_running.py:
--------------------------------------------------------------------------------
```python
from typing import Dict, Any, Sequence

from mcp import Tool
from mcp.types import TextContent

from .base import BaseHandler
from mysql_mcp_server_pro.handles import (
    ExecuteSQL
)

execute_sql = ExecuteSQL()


class GetDBHealthRunning(BaseHandler):
    name = "get_db_health_running"
    description = (
        "Analyze MySQL health status"
    )

    def get_tool_description(self) -> Tool:
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                }
            }
        )

    async def run_tool(self, arguments: Dict[str, Any]) -> Sequence[TextContent]:
        lock_result = await self.get_lock(arguments)
        processlist_result = await self.get_processlist(arguments)
        status_result = await self.get_status(arguments)
        trx_result = await self.get_trx(arguments)

        # Merge the results
        combined_result = []
        combined_result.extend(processlist_result)
        combined_result.extend(lock_result)
        combined_result.extend(trx_result)
        combined_result.extend(status_result)
        return combined_result

    async def get_processlist(self, arguments: Dict[str, Any]) -> Sequence[TextContent]:
        """Get connection status."""
        try:
            sql = "SHOW FULL PROCESSLIST;SHOW VARIABLES LIKE 'max_connections';"
            return await execute_sql.run_tool({"query": sql})
        except Exception as e:
            return [TextContent(type="text", text=f"Error executing query: {str(e)}")]

    async def get_status(self, arguments: Dict[str, Any]) -> Sequence[TextContent]:
        """Get running status."""
        try:
            sql = "SHOW ENGINE INNODB STATUS;"
            return await execute_sql.run_tool({"query": sql})
        except Exception as e:
            return [TextContent(type="text", text=f"Error executing query: {str(e)}")]

    async def get_trx(self, arguments: Dict[str, Any]) -> Sequence[TextContent]:
        """Get transaction status."""
        try:
            sql = "SELECT * FROM INFORMATION_SCHEMA.INNODB_TRX;"
            return await execute_sql.run_tool({"query": sql})
        except Exception as e:
            return [TextContent(type="text", text=f"Error executing query: {str(e)}")]

    async def get_lock(self, arguments: Dict[str, Any]) -> Sequence[TextContent]:
        """Get lock status."""
        try:
            # information_schema.innodb_locks / innodb_lock_waits exist only before MySQL 8.0;
            # performance_schema.data_locks / data_lock_waits replace them in 8.0+, so some of
            # these statements are expected to fail on any given server version.
            sql = "SHOW OPEN TABLES WHERE In_use > 0;select * from information_schema.innodb_locks;select * from information_schema.innodb_lock_waits;"
            sql += "select * from performance_schema.data_lock_waits;"
            sql += "select * from performance_schema.data_locks;"
            return await execute_sql.run_tool({"query": sql})
        except Exception as e:
            return [TextContent(type="text", text=f"Error executing query: {str(e)}")]
```
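The health-check handler above passes several statements to `execute_sql` as one `;`-separated string. How `execute_sql` splits them is not shown in this section; the sketch below is one possible approach that ignores semicolons inside quoted strings and backtick identifiers. `split_statements` is a hypothetical helper, and it does not handle SQL comments.

```python
from typing import List, Optional


def split_statements(sql: str) -> List[str]:
    """Split a SQL script on ';' while leaving quoted semicolons alone."""
    statements: List[str] = []
    current: List[str] = []
    quote: Optional[str] = None  # the quote character we are currently inside, if any
    i = 0
    while i < len(sql):
        ch = sql[i]
        if quote:
            current.append(ch)
            if ch == "\\" and quote != "`" and i + 1 < len(sql):
                # Keep the escaped character and skip past it.
                current.append(sql[i + 1])
                i += 1
            elif ch == quote:
                quote = None
        elif ch in ("'", '"', "`"):
            quote = ch
            current.append(ch)
        elif ch == ";":
            stmt = "".join(current).strip()
            if stmt:
                statements.append(stmt)
            current = []
        else:
            current.append(ch)
        i += 1
    tail = "".join(current).strip()
    if tail:
        statements.append(tail)
    return statements
```

Applied to the connection-status query above, this would yield `SHOW FULL PROCESSLIST` and `SHOW VARIABLES LIKE 'max_connections'` as two separate statements.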
--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/handles/get_db_health_index_usage.py:
--------------------------------------------------------------------------------
```python
from typing import Dict, Any, Sequence

from mcp import Tool
from mcp.types import TextContent

from .base import BaseHandler
from mysql_mcp_server_pro.config import get_db_config
from mysql_mcp_server_pro.handles import (
    ExecuteSQL
)

execute_sql = ExecuteSQL()


class GetDBHealthIndexUsage(BaseHandler):
    name = "get_db_health_index_usage"
    description = (
        "Get the index usage of the currently connected mysql database, including redundant index situations, "
        + "poorly performing index situations, and the top 5 unused index situations with query times greater than 30 seconds"
    )

    def get_tool_description(self) -> Tool:
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                }
            }
        )

    async def run_tool(self, arguments: Dict[str, Any]) -> Sequence[TextContent]:
        config = get_db_config()

        count_zero_result = await self.get_count_zero(arguments, config)
        max_time_result = await self.get_max_timer(arguments, config)
        not_used_index_result = await self.get_not_used_index(arguments, config)

        # Merge the results
        combined_result = []
        combined_result.extend(count_zero_result)
        combined_result.extend(max_time_result)
        combined_result.extend(not_used_index_result)
        return combined_result

    async def get_count_zero(self, arguments: Dict[str, Any], config) -> Sequence[TextContent]:
        """Get redundant indexes (indexes with no recorded I/O)."""
        try:
            sql = "SELECT object_name,index_name,count_star from performance_schema.table_io_waits_summary_by_index_usage "
            sql += f"WHERE object_schema = '{config['database']}' and count_star = 0 AND sum_timer_wait = 0 ;"
            return await execute_sql.run_tool({"query": sql})
        except Exception as e:
            return [TextContent(type="text", text=f"Error executing query: {str(e)}")]

    async def get_max_timer(self, arguments: Dict[str, Any], config) -> Sequence[TextContent]:
        """Get poorly performing indexes, ordered by maximum wait (timers are in picoseconds)."""
        try:
            sql = "SELECT object_schema,object_name,index_name,(max_timer_wait / 1000000000000) max_timer_wait "
            sql += f"FROM performance_schema.table_io_waits_summary_by_index_usage where object_schema = '{config['database']}' "
            sql += "and index_name is not null ORDER BY max_timer_wait DESC;"
            return await execute_sql.run_tool({"query": sql})
        except Exception as e:
            return [TextContent(type="text", text=f"Error executing query: {str(e)}")]

    async def get_not_used_index(self, arguments: Dict[str, Any], config) -> Sequence[TextContent]:
        """Get the top 5 accesses that used no index and waited longer than 30 seconds."""
        try:
            sql = "SELECT object_schema,object_name, (max_timer_wait / 1000000000000) max_timer_wait "
            sql += f"FROM performance_schema.table_io_waits_summary_by_index_usage where object_schema = '{config['database']}' "
            sql += "and index_name IS null and max_timer_wait > 30000000000000 ORDER BY max_timer_wait DESC limit 5;"
            return await execute_sql.run_tool({"query": sql})
        except Exception as e:
            return [TextContent(type="text", text=f"Error executing query: {str(e)}")]
```
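The queries above interpolate the schema name into the SQL text and hard-code the 30-second threshold as `30000000000000` picoseconds. As an alternative sketch, the same query can be built with driver placeholders and a named conversion constant. `build_unindexed_wait_query` and its parameters are hypothetical; the `%s` placeholder style assumes a DB-API driver such as PyMySQL, and whether the project's `execute_sql` tool accepts bound parameters is not shown here.

```python
from typing import Tuple

# performance_schema timer columns are reported in picoseconds.
PICOSECONDS_PER_SECOND = 10 ** 12


def build_unindexed_wait_query(
    schema: str, min_seconds: int = 30, limit: int = 5
) -> Tuple[str, Tuple[str, int, int]]:
    """Return (sql, params) for the slowest table accesses that used no index."""
    sql = (
        "SELECT object_schema, object_name, "
        f"(max_timer_wait / {PICOSECONDS_PER_SECOND}) AS max_timer_wait_seconds "
        "FROM performance_schema.table_io_waits_summary_by_index_usage "
        "WHERE object_schema = %s AND index_name IS NULL AND max_timer_wait > %s "
        "ORDER BY max_timer_wait DESC LIMIT %s"
    )
    # The schema name travels as a bound parameter, never as SQL text.
    params = (schema, min_seconds * PICOSECONDS_PER_SECOND, limit)
    return sql, params


sql, params = build_unindexed_wait_query("a_llm")
```

With a DB-API cursor, the pair would be passed as `cursor.execute(sql, params)`, which keeps a schema name containing quotes from changing the statement.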
--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/config/event_store.py:
--------------------------------------------------------------------------------
```python
"""
In-memory event store for demonstrating resumability functionality.
This is a simple implementation intended for examples and testing,
not for production use where a persistent storage solution would be more appropriate.
"""
import logging
from collections import deque
from dataclasses import dataclass
from uuid import uuid4

from mcp.server.streamable_http import (
    EventCallback,
    EventId,
    EventMessage,
    EventStore,
    StreamId,
)
from mcp.types import JSONRPCMessage

logger = logging.getLogger(__name__)


@dataclass
class EventEntry:
    """
    Represents an event entry in the event store.
    """

    event_id: EventId
    stream_id: StreamId
    message: JSONRPCMessage


class InMemoryEventStore(EventStore):
    """
    Simple in-memory implementation of the EventStore interface for resumability.
    This is primarily intended for examples and testing, not for production use
    where a persistent storage solution would be more appropriate.

    This implementation keeps only the last N events per stream for memory efficiency.
    """

    def __init__(self, max_events_per_stream: int = 100):
        """Initialize the event store.

        Args:
            max_events_per_stream: Maximum number of events to keep per stream
        """
        self.max_events_per_stream = max_events_per_stream
        # for maintaining last N events per stream
        self.streams: dict[StreamId, deque[EventEntry]] = {}
        # event_id -> EventEntry for quick lookup
        self.event_index: dict[EventId, EventEntry] = {}

    async def store_event(
        self, stream_id: StreamId, message: JSONRPCMessage
    ) -> EventId:
        """Stores an event with a generated event ID."""
        event_id = str(uuid4())
        event_entry = EventEntry(
            event_id=event_id, stream_id=stream_id, message=message
        )

        # Get or create deque for this stream
        if stream_id not in self.streams:
            self.streams[stream_id] = deque(maxlen=self.max_events_per_stream)

        # If deque is full, the oldest event will be automatically removed
        # We need to remove it from the event_index as well
        if len(self.streams[stream_id]) == self.max_events_per_stream:
            oldest_event = self.streams[stream_id][0]
            self.event_index.pop(oldest_event.event_id, None)

        # Add new event
        self.streams[stream_id].append(event_entry)
        self.event_index[event_id] = event_entry

        return event_id

    async def replay_events_after(
        self,
        last_event_id: EventId,
        send_callback: EventCallback,
    ) -> StreamId | None:
        """Replays events that occurred after the specified event ID."""
        if last_event_id not in self.event_index:
            logger.warning(f"Event ID {last_event_id} not found in store")
            return None

        # Get the stream and find events after the last one
        last_event = self.event_index[last_event_id]
        stream_id = last_event.stream_id
        stream_events = self.streams.get(last_event.stream_id, deque())

        # Events in deque are already in chronological order
        found_last = False
        for event in stream_events:
            if found_last:
                await send_callback(EventMessage(event.message, event.event_id))
            elif event.event_id == last_event_id:
                found_last = True

        return stream_id
```
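The eviction and replay behaviour of the event store above can be exercised without the `mcp` package using a reduced stand-in. `TinyEventStore` and `Entry` below are written for this example: messages are plain strings, and replay returns a list instead of invoking a callback. The bounded `deque` and the index clean-up mirror the logic in `InMemoryEventStore`.

```python
import asyncio
from collections import deque
from dataclasses import dataclass
from typing import Deque, Dict, List, Optional
from uuid import uuid4


@dataclass
class Entry:
    event_id: str
    stream_id: str
    message: str


class TinyEventStore:
    """Stand-in for InMemoryEventStore with plain-string messages."""

    def __init__(self, max_events_per_stream: int = 100):
        self.max_events_per_stream = max_events_per_stream
        self.streams: Dict[str, Deque[Entry]] = {}
        self.event_index: Dict[str, Entry] = {}

    async def store_event(self, stream_id: str, message: str) -> str:
        entry = Entry(str(uuid4()), stream_id, message)
        events = self.streams.setdefault(
            stream_id, deque(maxlen=self.max_events_per_stream)
        )
        if len(events) == self.max_events_per_stream:
            # The append below evicts events[0]; drop it from the index too.
            self.event_index.pop(events[0].event_id, None)
        events.append(entry)
        self.event_index[entry.event_id] = entry
        return entry.event_id

    async def replay_events_after(self, last_event_id: str) -> Optional[List[str]]:
        last = self.event_index.get(last_event_id)
        if last is None:
            return None  # unknown or already evicted event ID
        replayed: List[str] = []
        found = False
        for entry in self.streams[last.stream_id]:
            if found:
                replayed.append(entry.message)
            elif entry.event_id == last_event_id:
                found = True
        return replayed


async def demo():
    store = TinyEventStore(max_events_per_stream=3)
    ids = [await store.store_event("s1", f"msg-{n}") for n in range(5)]
    after_third = await store.replay_events_after(ids[2])
    evicted = await store.replay_events_after(ids[0])
    return after_third, evicted, len(store.event_index)


after_third, evicted, index_size = asyncio.run(demo())
```

With a limit of three events, the first two of five stored events are evicted, so a client resuming from one of those IDs gets nothing replayed, while a client resuming from the third event receives the fourth and fifth.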
--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/oauth/routes.py:
--------------------------------------------------------------------------------
```python
import os
import hashlib
import time
from starlette.responses import JSONResponse, HTMLResponse
from starlette.requests import Request
from pathlib import Path

from .token_handler import TokenHandler
from .config import oauth_config


async def login_page(request: Request) -> HTMLResponse:
    """Return the login page."""
    try:
        templates_dir = Path(__file__).parent.parent / "templates"
        login_html = templates_dir / "login.html"

        with open(login_html, "r", encoding="utf-8") as f:
            content = f.read()

        return HTMLResponse(content)
    except Exception as e:
        return HTMLResponse(
            content=f"<h1>Error</h1><p>Failed to load the login page: {str(e)}</p>",
            status_code=500
        )


async def login(request: Request) -> JSONResponse:
    """
    OAuth 2.0 password grant endpoint.

    Request format:
    POST /mcp/auth/login
    Content-Type: application/json
    {
        "grant_type": "password",
        "username": "<username>",
        "password": "<password>",
        "client_id": "<client ID>",
        "client_secret": "<client secret>"
    }
    """
    # Check the Accept header
    accept_header = request.headers.get("accept", "*/*")
    if accept_header != "*/*" and "application/json" not in accept_header:
        return JSONResponse(
            {"error": "not_acceptable", "error_description": "Client must accept application/json response"},
            status_code=406
        )

    try:
        data = await request.json()

        # Validate the grant type
        grant_type = data.get("grant_type")
        if grant_type not in oauth_config.GRANT_TYPES:
            return JSONResponse(
                {"error": "unsupported_grant_type"},
                status_code=400
            )

        # Validate the client credentials
        client_id = data.get("client_id")
        client_secret = data.get("client_secret")

        if not client_id or not client_secret:
            return JSONResponse(
                {"error": "invalid_client"},
                status_code=401
            )

        if client_id != oauth_config.CLIENT_ID or client_secret != oauth_config.CLIENT_SECRET:
            return JSONResponse(
                {"error": "invalid_client"},
                status_code=401
            )

        if grant_type == "password":
            username = data.get("username")
            #password = data.get("password")
            encrypted_password = data.get("password")  # Hashed password received from the front end

            if not username or not encrypted_password:
                return JSONResponse(
                    {"error": "invalid_request", "error_description": "Missing username or password"},
                    status_code=400
                )

            # Read the timestamp and salt
            timestamp = request.headers.get("X-Timestamp")
            salt = request.headers.get("X-Salt")

            if not timestamp or not salt:
                return JSONResponse(
                    {"error": "invalid_request", "error_description": "Missing security parameters"},
                    status_code=400
                )

            # Check that the timestamp is within the validity window (5 minutes)
            try:
                ts = int(timestamp) / 1000  # Convert milliseconds to seconds
                current_time = time.time()
                if current_time - ts > 300:  # Expires after 5 minutes
                    return JSONResponse(
                        {"error": "invalid_request", "error_description": "Request expired"},
                        status_code=400
                    )
            except (ValueError, TypeError):
                return JSONResponse(
                    {"error": "invalid_request", "error_description": "Invalid timestamp"},
                    status_code=400
                )

            # Verify the password
            # 1. Use the same hashing scheme as the front end
            # First hash: password + salt
            first_hash = hashlib.sha256((os.getenv("OAUTH_USER_PASSWORD", "admin") + salt).encode()).hexdigest()
            # Second hash: first hash + timestamp
            expected_hash = hashlib.sha256((first_hash + timestamp).encode()).hexdigest()

            # Real user verification logic should be added here
            if username == os.getenv("OAUTH_USER_NAME", "admin") and encrypted_password == expected_hash:
                # Generate the tokens
                access_token, refresh_token, access_expires, refresh_expires = TokenHandler.create_tokens(
                    user_id="1",  # This should be the real user ID
                    username=username
                )

                # Return a standard OAuth 2.0 response
                return JSONResponse(
                    TokenHandler.create_token_response(
                        access_token,
                        refresh_token,
                        access_expires,
                        refresh_expires
                    )
                )

            return JSONResponse(
                {"error": "invalid_grant"},
                status_code=401
            )

        elif grant_type == "refresh_token":
            refresh_token = data.get("refresh_token")
            if not refresh_token:
                return JSONResponse(
                    {"error": "invalid_request"},
                    status_code=400
                )

            # Verify the refresh token
            payload = TokenHandler.verify_token(refresh_token)
            if not payload or payload.get("type") != "refresh_token":
                return JSONResponse(
                    {"error": "invalid_grant"},
                    status_code=401
                )

            # Generate new tokens
            access_token, new_refresh_token, access_expires, refresh_expires = TokenHandler.create_tokens(
                user_id=payload["sub"],
                username=payload["username"]
            )

            return JSONResponse(
                TokenHandler.create_token_response(
                    access_token,
                    new_refresh_token,
                    access_expires,
                    refresh_expires
                )
            )

    except Exception as e:
        return JSONResponse(
            {"error": "server_error", "error_description": str(e)},
            status_code=500
        )
```
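The `login` handler above expects the `password` field to hold a double SHA-256 digest: first over password plus salt, then over that hex digest plus the millisecond timestamp, with both values sent in the `X-Salt` and `X-Timestamp` headers. The sketch below reproduces that computation from a client's point of view. `compute_login_hash` and `is_fresh` are hypothetical helpers, and the salt value is a placeholder; how the bundled login page generates its salt is not shown in this section.

```python
import hashlib
import time


def compute_login_hash(password: str, salt: str, timestamp_ms: str) -> str:
    """Mirror the server check: sha256(sha256(password + salt).hex + timestamp)."""
    first_hash = hashlib.sha256((password + salt).encode()).hexdigest()
    return hashlib.sha256((first_hash + timestamp_ms).encode()).hexdigest()


def is_fresh(timestamp_ms: str, now: float, max_age_seconds: int = 300) -> bool:
    """Same freshness rule as the server: reject requests older than five minutes."""
    return now - int(timestamp_ms) / 1000 <= max_age_seconds


timestamp = str(int(time.time() * 1000))  # milliseconds, as the server divides by 1000
salt = "example-salt"  # placeholder value chosen for this example
password_field = compute_login_hash("admin", salt, timestamp)
headers = {"X-Timestamp": timestamp, "X-Salt": salt}
```

The resulting `password_field` would go in the JSON body alongside `grant_type`, `username`, `client_id`, and `client_secret`, with `headers` attached to the same request.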
--------------------------------------------------------------------------------
/src/mysql_mcp_server_pro/templates/login.html:
--------------------------------------------------------------------------------
```html
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>MySQL MCP Server Pro - Login</title>
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600&display=swap" rel="stylesheet">
<!-- Load the crypto-js library -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/crypto-js/4.1.1/crypto-js.min.js"></script>
<style>
:root {
--primary-color: #2196F3;
--primary-dark: #1976D2;
--text-primary: #212121;
--text-secondary: #757575;
--background: #f5f5f5;
--surface: #FFFFFF;
--error: #F44336;
--success: #4CAF50;
}
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
background: var(--background);
color: var(--text-primary);
line-height: 1.6;
min-height: 100vh;
display: flex;
justify-content: center;
align-items: center;
}
/* Login form styles */
.login-container {
background: white;
padding: 2rem;
border-radius: 8px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
width: 100%;
max-width: 400px;
}
.header {
text-align: center;
margin-bottom: 2rem;
}
.title {
font-size: 1.5rem;
color: var(--text-primary);
margin-bottom: 0.5rem;
}
.author {
color: var(--text-secondary);
font-size: 0.9rem;
}
.form-group {
margin-bottom: 1.5rem;
}
label {
display: block;
margin-bottom: 0.5rem;
color: var(--text-primary);
font-weight: 500;
}
input {
width: 100%;
padding: 0.75rem;
border: 1px solid #ddd;
border-radius: 6px;
font-size: 0.95rem;
}
input:focus {
outline: none;
border-color: var(--primary-color);
}
.submit-button {
width: 100%;
padding: 0.75rem;
background: var(--primary-color);
color: white;
border: none;
border-radius: 6px;
font-size: 1rem;
font-weight: 500;
cursor: pointer;
}
.submit-button:hover {
background: var(--primary-dark);
}
.error-message {
color: var(--error);
margin-top: 1rem;
text-align: center;
display: none;
}
/* Token display styles */
.token-container {
display: none;
position: fixed;
top: 0;
left: 0;
right: 0;
bottom: 0;
background: var(--background);
padding: 1.5rem;
}
.token-content {
max-width: 900px;
margin: 0 auto;
}
.token-header {
margin-bottom: 1rem;
text-align: center;
}
.token-header h1 {
font-size: 1.25rem;
margin-bottom: 0.25rem;
}
.token-header .subtitle {
font-size: 0.9rem;
color: var(--text-secondary);
}
.token-section {
background: white;
padding: 1.25rem;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.05);
margin-bottom: 0.75rem;
}
.token-section h2 {
font-size: 1rem;
margin-bottom: 0.75rem;
display: flex;
align-items: center;
gap: 0.5rem;
}
.token-value {
font-family: 'Monaco', 'Consolas', monospace;
word-break: break-all;
background: var(--background);
padding: 0.75rem;
border-radius: 6px;
border: 1px solid #ddd;
font-size: 0.85rem;
margin-bottom: 0.5rem;
line-height: 1.3;
}
.token-info {
margin: 0.5rem 0;
color: var(--text-secondary);
font-size: 0.85rem;
}
.token-info p {
margin: 0.25rem 0;
display: flex;
align-items: center;
gap: 0.5rem;
}
.copy-button {
background: var(--success);
color: white;
border: none;
padding: 0.5rem 1rem;
border-radius: 4px;
cursor: pointer;
font-size: 0.85rem;
display: inline-flex;
align-items: center;
gap: 0.5rem;
}
.copy-button:hover {
opacity: 0.9;
}
.code-block {
background: #1E1E1E;
color: #E0E0E0;
padding: 0.75rem;
border-radius: 6px;
overflow-x: auto;
font-family: 'Monaco', 'Consolas', monospace;
font-size: 0.8rem;
line-height: 1.3;
margin: 0.5rem 0;
}
.github-card {
text-align: center;
padding: 0.75rem;
background: linear-gradient(135deg, #2196F3 0%, #1976D2 100%);
color: white;
border-radius: 6px;
margin-top: 0.5rem;
font-size: 0.85rem;
}
.github-card a {
color: white;
text-decoration: none;
font-weight: 500;
display: inline-flex;
align-items: center;
gap: 0.25rem;
}
.github-card a:hover {
text-decoration: underline;
}
</style>
</head>
<body>
<!-- Login form -->
<div class="login-container">
<div class="header">
<h1 class="title">MySQL MCP Server Pro</h1>
<div class="author">by wenb1n</div>
</div>
<form id="loginForm">
<div class="form-group">
<label for="username">Username</label>
<input type="text" id="username" name="username" required>
</div>
<div class="form-group">
<label for="password">Password</label>
<input type="password" id="password" name="password" required>
</div>
<button type="submit" class="submit-button">Log in</button>
</form>
<div id="errorMessage" class="error-message"></div>
</div>
<!-- Token display view -->
<div id="tokenContainer" class="token-container">
<div class="token-content">
<div class="token-header">
<h1>MySQL MCP Server Pro</h1>
<div class="subtitle">by wenb1n</div>
</div>
<div class="token-section">
<h2>🔑 Access Token</h2>
<div id="tokenValue" class="token-value"></div>
<div class="token-info">
<p>⏰ Expires: <span id="expireTime"></span></p>
</div>
<button id="copyButton" class="copy-button">📋 Copy Token</button>
</div>
<div class="token-section">
<h2>🔄 Refresh Token</h2>
<div id="refreshTokenValue" class="token-value"></div>
<div class="token-info">
<p>Use the refresh token to obtain a new access token:</p>
</div>
<div class="code-block">
<pre id="refreshTokenExample">curl -X POST http://localhost:3000/mcp/auth/login \
-H "Content-Type: application/json" \
-d '{
"grant_type": "refresh_token",
"refresh_token": "your-refresh-token",
"client_id": "mysql-mcp-client",
"client_secret": "mysql-mcp-secret"
}'</pre>
</div>
</div>
<div class="github-card">
<a href="https://github.com/wenb1n-dev/mysql_mcp_server_pro" target="_blank">
⭐️ If you find this useful, please give it a Star to show your support!
</a>
</div>
</div>
</div>
<script>
// Generate a random alphanumeric salt
function generateSalt(length = 16) {
const chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
let salt = '';
for (let i = 0; i < length; i++) {
salt += chars.charAt(Math.floor(Math.random() * chars.length));
}
return salt;
}
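// Hash the password (two rounds of SHA-256; despite the function name, this is hashing, not encryption)
function encryptPassword(password, salt, timestamp) {
// First hash: password + salt
const firstHash = CryptoJS.SHA256(password + salt).toString();
// Second hash: first hash (hex string) + timestamp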
const finalHash = CryptoJS.SHA256(firstHash + timestamp).toString();
return finalHash;
}
document.getElementById('loginForm').addEventListener('submit', async (e) => {
e.preventDefault();
const username = document.getElementById('username').value;
const password = document.getElementById('password').value;
try {
// Generate the security parameters
const salt = generateSalt();
const timestamp = Date.now().toString();
const encryptedPassword = encryptPassword(password, salt, timestamp);
const response = await fetch('/mcp/auth/login', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json',
'X-Timestamp': timestamp, // timestamp header
'X-Salt': salt, // salt header
},
body: JSON.stringify({
grant_type: 'password',
username: username,
password: encryptedPassword, // send the hashed password, not the plaintext
client_id: 'mysql-mcp-client',
client_secret: 'mysql-mcp-secret'
})
});
const data = await response.json();
if (response.ok) {
// Display the token information
const tokenContainer = document.getElementById('tokenContainer');
const tokenValue = document.getElementById('tokenValue');
const expireTime = document.getElementById('expireTime');
const refreshTokenValue = document.getElementById('refreshTokenValue');
tokenValue.textContent = data.access_token;
expireTime.innerHTML = `
<div style="display: flex; flex-direction: column; gap: 0.25rem;">
<div>🔑 Access token expires: ${data.expire_time}</div>
<div>🔄 Refresh token expires: ${data.refresh_token_expire_time}</div>
</div>
`;
refreshTokenValue.textContent = data.refresh_token;
// Substitute the real refresh token into the example command
const example = document.getElementById('refreshTokenExample');
example.textContent = example.textContent.replace('your-refresh-token', data.refresh_token);
tokenContainer.style.display = 'block';
document.querySelector('.login-container').style.display = 'none';
} else {
const errorMessage = document.getElementById('errorMessage');
errorMessage.textContent = data.error_description || data.error || 'Login failed';
errorMessage.style.display = 'block';
}
} catch (error) {
const errorMessage = document.getElementById('errorMessage');
errorMessage.textContent = 'Network error, please try again later';
errorMessage.style.display = 'block';
}
});
document.getElementById('copyButton').addEventListener('click', function() {
const token = document.getElementById('tokenValue').textContent;
if (token) {
navigator.clipboard.writeText(token).then(() => {
this.textContent = '✓ Copied';
setTimeout(() => {
this.textContent = '📋 Copy Token';
}, 2000);
}).catch(() => {
alert('Copy failed, please copy the token manually');
});
}
});
</script>
</body>
</html>
```
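The login page above never sends the plaintext password. It computes SHA-256 over `password + salt`, then SHA-256 over that hex digest plus the timestamp, and carries the salt and timestamp in the `X-Salt` and `X-Timestamp` headers. The following is a minimal sketch of how a server could recompute and compare that value. The function names, the 5-minute freshness window, and the assumption that the server can read the expected plaintext password are illustrative only and are not taken from this repository's `oauth` package.

```python
import hashlib
import hmac
import time
from typing import Optional


def client_hash(password: str, salt: str, timestamp: str) -> str:
    """Mirror the page's encryptPassword(): two SHA-256 rounds over hex strings."""
    first = hashlib.sha256((password + salt).encode("utf-8")).hexdigest()
    return hashlib.sha256((first + timestamp).encode("utf-8")).hexdigest()


def verify_login(
    submitted_hash: str,
    expected_password: str,  # assumption: the server can read the plaintext password
    salt: str,               # value of the X-Salt header
    timestamp: str,          # value of the X-Timestamp header (milliseconds, as sent by Date.now())
    max_age_ms: int = 300_000,      # hypothetical freshness window (5 minutes)
    now_ms: Optional[int] = None,   # injectable clock, useful for tests
) -> bool:
    """Return True if the submitted hash matches and the timestamp is fresh."""
    try:
        sent_ms = int(timestamp)
    except ValueError:
        return False  # malformed timestamp header

    current_ms = int(time.time() * 1000) if now_ms is None else now_ms
    if abs(current_ms - sent_ms) > max_age_ms:
        return False  # stale or future-dated request, possible replay

    expected = client_hash(expected_password, salt, timestamp)
    # Constant-time comparison avoids leaking how many leading characters match.
    return hmac.compare_digest(expected, submitted_hash)
```

Rejecting stale timestamps only narrows the replay window; because the salt is chosen by the client, this scheme does not replace TLS for protecting the credential in transit.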