#
tokens: 3255/50000 5/5 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── .python-version
├── pyproject.toml
├── README.md
├── sql_mcp.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.13

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# MCP SQL Server

A FastMCP server that provides SQL database interaction tools via a conversational AI interface.

## Overview

This project creates a server that exposes MS SQL Server operations through a conversational AI interface. It uses the FastMCP framework to provide tools for querying and manipulating SQL data, allowing users to interact with databases using natural language.

## Features

- Execute SQL queries and view results
- List available tables in the database
- Describe table structure with column information
- Execute non-query operations (INSERT, UPDATE, DELETE)
- List available ODBC drivers on the system
- View database information and server details

## Requirements

- Python 3.7+
- pyodbc
- asyncio
- FastMCP framework
- Microsoft SQL Server
- ODBC Driver 17 for SQL Server

## Installation

1. Install Python dependencies:

```bash
pip install pyodbc asyncio fastmcp
```

2. Ensure you have Microsoft SQL Server installed and the ODBC Driver 17 for SQL Server.

3. Configure the connection settings in the script:

```python
# Connection parameters
SERVER = "server\\instance"  # Change to your SQL Server instance
DATABASE = "db_name"              # Change to your database name
```

## Usage

Run the server:

```bash
python mcp_sql_server.py
```

The server will initialize and establish a connection to the specified SQL Server database.

## Available Tools

### query_sql

Execute a SQL query and return the results.

```
query_sql(query: str = None) -> str
```

- If no query is provided, it defaults to `SELECT * FROM [dbo].[Table_1]`
- Returns query results as a formatted string

### list_tables

List all tables available in the database.

```
list_tables() -> str
```

- Returns a list of table names as a string

### describe_table

Get the structure of a specific table.

```
describe_table(table_name: str) -> str
```

- `table_name`: Name of the table to describe
- Returns column information including names and data types

### execute_nonquery

Execute INSERT, UPDATE, DELETE or other non-query SQL statements.

```
execute_nonquery(sql: str) -> str
```

- `sql`: The SQL statement to execute
- Returns operation results, including number of affected rows
- Automatically handles transactions (commit/rollback)

### list_odbc_drivers

List all available ODBC drivers on the system.

```
list_odbc_drivers() -> str
```

- Returns a comma-separated list of installed ODBC drivers

### database_info

Get general information about the connected database.

```
database_info() -> str
```

- Returns server name, database name, SQL Server version, current server time, and table count

## Architecture

The server uses an asynchronous architecture to avoid blocking operations:

1. **Lifecycle Management**: The `app_lifespan` context manager handles database connection setup and teardown.

2. **Non-blocking Operations**: Database operations run in a separate thread using `asyncio.get_event_loop().run_in_executor()` to prevent blocking the main event loop.

3. **Error Handling**: All operations include comprehensive error handling with useful error messages.

## Error Handling

The server handles various error conditions:

- Database connection failures
- SQL query syntax errors
- Table not found errors
- Permission-related issues

All errors are logged and appropriate error messages are returned to the client.

## Customization

To add new database tools or modify existing ones, follow the pattern used in the existing tools:

```python
@mcp.tool()
async def your_new_tool(ctx: Context, param1: str) -> str:
    """Documentation for your tool"""
    try:
        conn = ctx.request_context.lifespan_context["conn"]
        
        if conn is None:
            return "Database connection is not available."
            
        def your_db_operation():
            # Your database operations here
            pass
            
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(None, your_db_operation)
        
        # Process and return results
        return "Your result"
    except Exception as e:
        return f"Error: {str(e)}"
```

## Security Considerations

- The server uses Windows Authentication ("Trusted_Connection=yes")
- Consider implementing input validation for SQL queries to prevent SQL injection
- Restrict database user permissions based on the principle of least privilege

## Troubleshooting

Common issues:

1. **Connection errors**: Verify the SQL Server instance name and ensure it's running
2. **ODBC driver errors**: Confirm ODBC Driver 17 for SQL Server is installed
3. **Permission errors**: Check that the Windows user running the application has appropriate SQL Server permissions

## License

[Your License Information]

## Contact

[Your Contact Information]

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "sql-mcp-server"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "mcp[cli]>=1.5.0",
    "pyodbc",
]

```

--------------------------------------------------------------------------------
/sql_mcp.py:
--------------------------------------------------------------------------------

```python
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
import logging
import sys
import pyodbc
import asyncio
from mcp.server.fastmcp import Context, FastMCP

# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("server_module")

# Connection parameters
SERVER = "LEGION\\SQLEXPRESS"
DATABASE = "test"

@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[dict]:
    """Manage application lifecycle with type-safe context"""
    logger.debug("Initializing database connection")
    conn = None
    
    try:
        # Connect using a loop.run_in_executor to avoid blocking
        def connect_db():
            connection_string = (
                f"DRIVER={{ODBC Driver 17 for SQL Server}};"
                f"SERVER={SERVER};"
                f"DATABASE={DATABASE};"
                f"Trusted_Connection=yes;"
            )
            logger.debug(f"Connection string: {connection_string}")
            return pyodbc.connect(connection_string)
            
        loop = asyncio.get_event_loop()
        conn = await loop.run_in_executor(None, connect_db)
        logger.debug("Database connection established successfully")
        
        # Yield a dictionary instead of a dataclass to match example
        yield {"conn": conn}
    except Exception as e:
        logger.error(f"Database connection error: {type(e).__name__}: {str(e)}", exc_info=True)
        # Continue without database but with empty dict
        yield {"conn": None}
    finally:
        if conn:
            logger.debug("Closing database connection")
            await asyncio.get_event_loop().run_in_executor(None, conn.close)

# Create an MCP server with the lifespan
mcp = FastMCP("My MS SQL Integrated App", lifespan=app_lifespan)

@mcp.tool()
async def query_sql(ctx: Context, query: str = None) -> str:
    """
    Tool to query the SQL database with a custom query.
    
    Args:
        query: The SQL query to execute. If not provided, will run a default query.
    
    Returns:
        The query results as a string.
    """
    try:
        # Access the connection using dictionary access
        conn = ctx.request_context.lifespan_context["conn"]
        
        if conn is None:
            return "Database connection is not available. Check server logs for details."
        
        # Use default query if none provided
        if not query:
            query = "SELECT * FROM [dbo].[Table_1]"
            
        logger.debug(f"Executing query: {query}")
        
        # Execute query in a non-blocking way
        def run_query():
            cursor = conn.cursor()
            try:
                cursor.execute(query)
                if cursor.description:  # Check if the query returns results
                    columns = [column[0] for column in cursor.description]
                    results = []
                    for row in cursor.fetchall():
                        results.append(dict(zip(columns, row)))
                    return {"success": True, "results": results, "rowCount": len(results)}
                else:
                    # For non-SELECT queries (INSERT, UPDATE, etc.)
                    return {"success": True, "rowCount": cursor.rowcount, "message": f"Query affected {cursor.rowcount} rows"}
            except Exception as e:
                return {"success": False, "error": str(e)}
            finally:
                cursor.close()
            
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(None, run_query)
        
        if result["success"]:
            if "results" in result:
                return f"Query results: {result['results']}"
            else:
                return result["message"]
        else:
            return f"Query error: {result['error']}"
    except Exception as e:
        logger.error(f"Query execution error: {type(e).__name__}: {str(e)}")
        return f"Error: {str(e)}"

@mcp.tool()
async def list_tables(ctx: Context) -> str:
    """List all tables in the database that can be queried."""
    try:
        conn = ctx.request_context.lifespan_context["conn"]
        
        if conn is None:
            return "Database connection is not available."
            
        def get_tables():
            cursor = conn.cursor()
            cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE='BASE TABLE'")
            tables = [row[0] for row in cursor.fetchall()]
            cursor.close()
            return tables
            
        loop = asyncio.get_event_loop()
        tables = await loop.run_in_executor(None, get_tables)
        
        return f"Available tables: {tables}"
    except Exception as e:
        return f"Error listing tables: {str(e)}"

@mcp.tool()
async def describe_table(ctx: Context, table_name: str) -> str:
    """
    Get the structure of a specific table.
    
    Args:
        table_name: Name of the table to describe
        
    Returns:
        Column information for the specified table
    """
    try:
        conn = ctx.request_context.lifespan_context["conn"]
        
        if conn is None:
            return "Database connection is not available."
            
        def get_structure():
            cursor = conn.cursor()
            cursor.execute(f"SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name}'")
            columns = []
            for row in cursor.fetchall():
                col_name, data_type, max_length = row
                if max_length:
                    columns.append(f"{col_name} ({data_type}({max_length}))")
                else:
                    columns.append(f"{col_name} ({data_type})")
            cursor.close()
            return columns
            
        loop = asyncio.get_event_loop()
        structure = await loop.run_in_executor(None, get_structure)
        
        if structure:
            return f"Structure of table '{table_name}':\n" + "\n".join(structure)
        else:
            return f"Table '{table_name}' not found or has no columns."
    except Exception as e:
        return f"Error describing table: {str(e)}"

@mcp.tool()
async def execute_nonquery(ctx: Context, sql: str) -> str:
    """
    Execute a non-query SQL statement (INSERT, UPDATE, DELETE, etc.).
    
    Args:
        sql: The SQL statement to execute
        
    Returns:
        Result of the operation
    """
    try:
        conn = ctx.request_context.lifespan_context["conn"]
        
        if conn is None:
            return "Database connection is not available."
            
        def run_nonquery():
            try:
                cursor = conn.cursor()
                cursor.execute(sql)
                row_count = cursor.rowcount
                # Commit changes
                conn.commit()
                cursor.close()
                return {"success": True, "rowCount": row_count}
            except Exception as e:
                # Rollback in case of error
                conn.rollback()
                return {"success": False, "error": str(e)}
            
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(None, run_nonquery)
        
        if result["success"]:
            return f"Operation successful. Rows affected: {result['rowCount']}"
        else:
            return f"Operation failed: {result['error']}"
    except Exception as e:
        return f"Error executing SQL: {str(e)}"

@mcp.tool()
async def list_odbc_drivers(ctx: Context) -> str:
    """List available ODBC drivers on the system"""
    try:
        def get_drivers():
            return pyodbc.drivers()
            
        drivers = await asyncio.get_event_loop().run_in_executor(None, get_drivers)
        return f"Available ODBC drivers: {', '.join(drivers)}"
    except Exception as e:
        return f"Error listing drivers: {str(e)}"

@mcp.tool()
async def database_info(ctx: Context) -> str:
    """Get general information about the connected database"""
    try:
        conn = ctx.request_context.lifespan_context["conn"]
        
        if conn is None:
            return "Database connection is not available."
            
        def get_info():
            cursor = conn.cursor()
            
            # Get SQL Server version
            cursor.execute("SELECT @@VERSION")
            version = cursor.fetchone()[0]
            
            # Get database name and size
            cursor.execute("""
                SELECT 
                    DB_NAME() AS DatabaseName,
                    CONVERT(VARCHAR(50), GETDATE(), 120) AS CurrentDateTime,
                    (SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE='BASE TABLE') AS TableCount
            """)
            db_info = cursor.fetchone()
            
            cursor.close()
            return {
                "version": version,
                "database": db_info[0],
                "current_time": db_info[1],
                "table_count": db_info[2]
            }
            
        loop = asyncio.get_event_loop()
        info = await loop.run_in_executor(None, get_info)
        
        return (
            f"Database Information:\n"
            f"Server: {SERVER}\n"
            f"Database: {info['database']}\n"
            f"Server Version: {info['version'].split('\\n')[0]}\n"
            f"Current Server Time: {info['current_time']}\n"
            f"Number of Tables: {info['table_count']}"
        )
    except Exception as e:
        return f"Error getting database info: {str(e)}"

# Run the server
if __name__ == "__main__":
    try:
        logger.info("Starting MCP server")
        mcp.run()
    except Exception as e:
        logger.critical(f"Server startup failed: {e}", exc_info=True)
        sys.exit(1)
```