# Directory Structure
```
├── .gitignore
├── .python-version
├── pyproject.toml
├── README.md
├── sql_mcp.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.13
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# MCP SQL Server
A FastMCP server that provides SQL database interaction tools via a conversational AI interface.
## Overview
This project creates a server that exposes MS SQL Server operations through a conversational AI interface. It uses the FastMCP framework to provide tools for querying and manipulating SQL data, allowing users to interact with databases using natural language.
## Features
- Execute SQL queries and view results
- List available tables in the database
- Describe table structure with column information
- Execute non-query operations (INSERT, UPDATE, DELETE)
- List available ODBC drivers on the system
- View database information and server details
## Requirements
- Python 3.7+
- pyodbc
- asyncio
- FastMCP framework
- Microsoft SQL Server
- ODBC Driver 17 for SQL Server
## Installation
1. Install Python dependencies:
```bash
pip install pyodbc asyncio fastmcp
```
2. Ensure you have Microsoft SQL Server installed and the ODBC Driver 17 for SQL Server.
3. Configure the connection settings in the script:
```python
# Connection parameters
SERVER = "server\\instance" # Change to your SQL Server instance
DATABASE = "db_name" # Change to your database name
```
## Usage
Run the server:
```bash
python mcp_sql_server.py
```
The server will initialize and establish a connection to the specified SQL Server database.
## Available Tools
### query_sql
Execute a SQL query and return the results.
```
query_sql(query: str = None) -> str
```
- If no query is provided, it defaults to `SELECT * FROM [dbo].[Table_1]`
- Returns query results as a formatted string
### list_tables
List all tables available in the database.
```
list_tables() -> str
```
- Returns a list of table names as a string
### describe_table
Get the structure of a specific table.
```
describe_table(table_name: str) -> str
```
- `table_name`: Name of the table to describe
- Returns column information including names and data types
### execute_nonquery
Execute INSERT, UPDATE, DELETE or other non-query SQL statements.
```
execute_nonquery(sql: str) -> str
```
- `sql`: The SQL statement to execute
- Returns operation results, including number of affected rows
- Automatically handles transactions (commit/rollback)
### list_odbc_drivers
List all available ODBC drivers on the system.
```
list_odbc_drivers() -> str
```
- Returns a comma-separated list of installed ODBC drivers
### database_info
Get general information about the connected database.
```
database_info() -> str
```
- Returns server name, database name, SQL Server version, current server time, and table count
## Architecture
The server uses an asynchronous architecture to avoid blocking operations:
1. **Lifecycle Management**: The `app_lifespan` context manager handles database connection setup and teardown.
2. **Non-blocking Operations**: Database operations run in a separate thread using `asyncio.get_event_loop().run_in_executor()` to prevent blocking the main event loop.
3. **Error Handling**: All operations include comprehensive error handling with useful error messages.
## Error Handling
The server handles various error conditions:
- Database connection failures
- SQL query syntax errors
- Table not found errors
- Permission-related issues
All errors are logged and appropriate error messages are returned to the client.
## Customization
To add new database tools or modify existing ones, follow the pattern used in the existing tools:
```python
@mcp.tool()
async def your_new_tool(ctx: Context, param1: str) -> str:
"""Documentation for your tool"""
try:
conn = ctx.request_context.lifespan_context["conn"]
if conn is None:
return "Database connection is not available."
def your_db_operation():
# Your database operations here
pass
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, your_db_operation)
# Process and return results
return "Your result"
except Exception as e:
return f"Error: {str(e)}"
```
## Security Considerations
- The server uses Windows Authentication ("Trusted_Connection=yes")
- Consider implementing input validation for SQL queries to prevent SQL injection
- Restrict database user permissions based on the principle of least privilege
## Troubleshooting
Common issues:
1. **Connection errors**: Verify the SQL Server instance name and ensure it's running
2. **ODBC driver errors**: Confirm ODBC Driver 17 for SQL Server is installed
3. **Permission errors**: Check that the Windows user running the application has appropriate SQL Server permissions
## License
[Your License Information]
## Contact
[Your Contact Information]
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "sql-mcp-server"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"mcp[cli]>=1.5.0",
"pyodbc",
]
```
--------------------------------------------------------------------------------
/sql_mcp.py:
--------------------------------------------------------------------------------
```python
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
import logging
import sys
import pyodbc
import asyncio
from mcp.server.fastmcp import Context, FastMCP
# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("server_module")
# Connection parameters
SERVER = "LEGION\\SQLEXPRESS"
DATABASE = "test"
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[dict]:
"""Manage application lifecycle with type-safe context"""
logger.debug("Initializing database connection")
conn = None
try:
# Connect using a loop.run_in_executor to avoid blocking
def connect_db():
connection_string = (
f"DRIVER={{ODBC Driver 17 for SQL Server}};"
f"SERVER={SERVER};"
f"DATABASE={DATABASE};"
f"Trusted_Connection=yes;"
)
logger.debug(f"Connection string: {connection_string}")
return pyodbc.connect(connection_string)
loop = asyncio.get_event_loop()
conn = await loop.run_in_executor(None, connect_db)
logger.debug("Database connection established successfully")
# Yield a dictionary instead of a dataclass to match example
yield {"conn": conn}
except Exception as e:
logger.error(f"Database connection error: {type(e).__name__}: {str(e)}", exc_info=True)
# Continue without database but with empty dict
yield {"conn": None}
finally:
if conn:
logger.debug("Closing database connection")
await asyncio.get_event_loop().run_in_executor(None, conn.close)
# Create an MCP server with the lifespan
mcp = FastMCP("My MS SQL Integrated App", lifespan=app_lifespan)
@mcp.tool()
async def query_sql(ctx: Context, query: str = None) -> str:
"""
Tool to query the SQL database with a custom query.
Args:
query: The SQL query to execute. If not provided, will run a default query.
Returns:
The query results as a string.
"""
try:
# Access the connection using dictionary access
conn = ctx.request_context.lifespan_context["conn"]
if conn is None:
return "Database connection is not available. Check server logs for details."
# Use default query if none provided
if not query:
query = "SELECT * FROM [dbo].[Table_1]"
logger.debug(f"Executing query: {query}")
# Execute query in a non-blocking way
def run_query():
cursor = conn.cursor()
try:
cursor.execute(query)
if cursor.description: # Check if the query returns results
columns = [column[0] for column in cursor.description]
results = []
for row in cursor.fetchall():
results.append(dict(zip(columns, row)))
return {"success": True, "results": results, "rowCount": len(results)}
else:
# For non-SELECT queries (INSERT, UPDATE, etc.)
return {"success": True, "rowCount": cursor.rowcount, "message": f"Query affected {cursor.rowcount} rows"}
except Exception as e:
return {"success": False, "error": str(e)}
finally:
cursor.close()
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, run_query)
if result["success"]:
if "results" in result:
return f"Query results: {result['results']}"
else:
return result["message"]
else:
return f"Query error: {result['error']}"
except Exception as e:
logger.error(f"Query execution error: {type(e).__name__}: {str(e)}")
return f"Error: {str(e)}"
@mcp.tool()
async def list_tables(ctx: Context) -> str:
"""List all tables in the database that can be queried."""
try:
conn = ctx.request_context.lifespan_context["conn"]
if conn is None:
return "Database connection is not available."
def get_tables():
cursor = conn.cursor()
cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE='BASE TABLE'")
tables = [row[0] for row in cursor.fetchall()]
cursor.close()
return tables
loop = asyncio.get_event_loop()
tables = await loop.run_in_executor(None, get_tables)
return f"Available tables: {tables}"
except Exception as e:
return f"Error listing tables: {str(e)}"
@mcp.tool()
async def describe_table(ctx: Context, table_name: str) -> str:
"""
Get the structure of a specific table.
Args:
table_name: Name of the table to describe
Returns:
Column information for the specified table
"""
try:
conn = ctx.request_context.lifespan_context["conn"]
if conn is None:
return "Database connection is not available."
def get_structure():
cursor = conn.cursor()
cursor.execute(f"SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name}'")
columns = []
for row in cursor.fetchall():
col_name, data_type, max_length = row
if max_length:
columns.append(f"{col_name} ({data_type}({max_length}))")
else:
columns.append(f"{col_name} ({data_type})")
cursor.close()
return columns
loop = asyncio.get_event_loop()
structure = await loop.run_in_executor(None, get_structure)
if structure:
return f"Structure of table '{table_name}':\n" + "\n".join(structure)
else:
return f"Table '{table_name}' not found or has no columns."
except Exception as e:
return f"Error describing table: {str(e)}"
@mcp.tool()
async def execute_nonquery(ctx: Context, sql: str) -> str:
"""
Execute a non-query SQL statement (INSERT, UPDATE, DELETE, etc.).
Args:
sql: The SQL statement to execute
Returns:
Result of the operation
"""
try:
conn = ctx.request_context.lifespan_context["conn"]
if conn is None:
return "Database connection is not available."
def run_nonquery():
try:
cursor = conn.cursor()
cursor.execute(sql)
row_count = cursor.rowcount
# Commit changes
conn.commit()
cursor.close()
return {"success": True, "rowCount": row_count}
except Exception as e:
# Rollback in case of error
conn.rollback()
return {"success": False, "error": str(e)}
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, run_nonquery)
if result["success"]:
return f"Operation successful. Rows affected: {result['rowCount']}"
else:
return f"Operation failed: {result['error']}"
except Exception as e:
return f"Error executing SQL: {str(e)}"
@mcp.tool()
async def list_odbc_drivers(ctx: Context) -> str:
"""List available ODBC drivers on the system"""
try:
def get_drivers():
return pyodbc.drivers()
drivers = await asyncio.get_event_loop().run_in_executor(None, get_drivers)
return f"Available ODBC drivers: {', '.join(drivers)}"
except Exception as e:
return f"Error listing drivers: {str(e)}"
@mcp.tool()
async def database_info(ctx: Context) -> str:
"""Get general information about the connected database"""
try:
conn = ctx.request_context.lifespan_context["conn"]
if conn is None:
return "Database connection is not available."
def get_info():
cursor = conn.cursor()
# Get SQL Server version
cursor.execute("SELECT @@VERSION")
version = cursor.fetchone()[0]
# Get database name and size
cursor.execute("""
SELECT
DB_NAME() AS DatabaseName,
CONVERT(VARCHAR(50), GETDATE(), 120) AS CurrentDateTime,
(SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE='BASE TABLE') AS TableCount
""")
db_info = cursor.fetchone()
cursor.close()
return {
"version": version,
"database": db_info[0],
"current_time": db_info[1],
"table_count": db_info[2]
}
loop = asyncio.get_event_loop()
info = await loop.run_in_executor(None, get_info)
return (
f"Database Information:\n"
f"Server: {SERVER}\n"
f"Database: {info['database']}\n"
f"Server Version: {info['version'].split('\\n')[0]}\n"
f"Current Server Time: {info['current_time']}\n"
f"Number of Tables: {info['table_count']}"
)
except Exception as e:
return f"Error getting database info: {str(e)}"
# Run the server
if __name__ == "__main__":
try:
logger.info("Starting MCP server")
mcp.run()
except Exception as e:
logger.critical(f"Server startup failed: {e}", exc_info=True)
sys.exit(1)
```