# Directory Structure

```
├── .gitignore
├── .python-version
├── pyproject.toml
├── README.md
├── sql_mcp.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.13
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# MCP SQL Server

A FastMCP server that provides SQL database interaction tools via a conversational AI interface.

## Overview

This project creates a server that exposes MS SQL Server operations through a conversational AI interface. It uses the FastMCP framework to provide tools for querying and manipulating SQL data, allowing users to interact with databases using natural language.

## Features

- Execute SQL queries and view results
- List available tables in the database
- Describe table structure with column information
- Execute non-query operations (INSERT, UPDATE, DELETE)
- List available ODBC drivers on the system
- View database information and server details

## Requirements

- Python 3.13+ (as required by `pyproject.toml`)
- pyodbc
- asyncio (included in the Python standard library)
- FastMCP framework (from the `mcp[cli]` package)
- Microsoft SQL Server
- ODBC Driver 17 for SQL Server

## Installation

1. Install Python dependencies:

   ```bash
   pip install pyodbc "mcp[cli]"
   ```

2. Ensure Microsoft SQL Server and the ODBC Driver 17 for SQL Server are installed.

3. Configure the connection settings in `sql_mcp.py`:

   ```python
   # Connection parameters
   SERVER = "server\\instance"  # Change to your SQL Server instance
   DATABASE = "db_name"  # Change to your database name
   ```

## Usage

Run the server:

```bash
python sql_mcp.py
```

The server will initialize and establish a connection to the specified SQL Server database.

## Available Tools

### query_sql

Execute a SQL query and return the results.

```
query_sql(query: str = None) -> str
```

- If no query is provided, it defaults to `SELECT * FROM [dbo].[Table_1]`
- Returns query results as a formatted string

### list_tables

List all tables available in the database.

```
list_tables() -> str
```

- Returns a list of table names as a string

### describe_table

Get the structure of a specific table.

```
describe_table(table_name: str) -> str
```

- `table_name`: Name of the table to describe
- Returns column information including names and data types

### execute_nonquery

Execute INSERT, UPDATE, DELETE or other non-query SQL statements.

```
execute_nonquery(sql: str) -> str
```

- `sql`: The SQL statement to execute
- Returns operation results, including number of affected rows
- Automatically handles transactions (commit/rollback)

### list_odbc_drivers

List all available ODBC drivers on the system.

```
list_odbc_drivers() -> str
```

- Returns a comma-separated list of installed ODBC drivers

### database_info

Get general information about the connected database.

```
database_info() -> str
```

- Returns server name, database name, SQL Server version, current server time, and table count

## Architecture

The server uses an asynchronous architecture to avoid blocking operations:

1. **Lifecycle Management**: The `app_lifespan` context manager handles database connection setup and teardown.
2. **Non-blocking Operations**: Database operations run in a separate thread using `asyncio.get_event_loop().run_in_executor()` to prevent blocking the main event loop.
3. **Error Handling**: Every tool catches exceptions and returns the error message to the client as a string.

## Error Handling

The server handles various error conditions:

- Database connection failures
- SQL query syntax errors
- Table not found errors
- Permission-related issues

Connection failures and unexpected `query_sql` errors are logged, and every error is returned to the client as a message.

## Customization

To add new database tools or modify existing ones, follow the pattern used in the existing tools:

```python
@mcp.tool()
async def your_new_tool(ctx: Context, param1: str) -> str:
    """Documentation for your tool"""
    try:
        conn = ctx.request_context.lifespan_context["conn"]

        if conn is None:
            return "Database connection is not available."

        def your_db_operation():
            # Your database operations here
            pass

        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(None, your_db_operation)

        # Process and return results
        return "Your result"
    except Exception as e:
        return f"Error: {str(e)}"
```

## Security Considerations

- The server uses Windows Authentication ("Trusted_Connection=yes")
- Consider implementing input validation for SQL queries to prevent SQL injection
- Restrict database user permissions based on the principle of least privilege

## Troubleshooting

Common issues:

1. **Connection errors**: Verify the SQL Server instance name and ensure it's running
2. **ODBC driver errors**: Confirm ODBC Driver 17 for SQL Server is installed
3. **Permission errors**: Check that the Windows user running the application has appropriate SQL Server permissions

## License

[Your License Information]

## Contact

[Your Contact Information]
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "sql-mcp-server"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "mcp[cli]>=1.5.0",
    "pyodbc",
]
```

--------------------------------------------------------------------------------
/sql_mcp.py:
--------------------------------------------------------------------------------

```python
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
import logging
import sys
import pyodbc
import asyncio
from mcp.server.fastmcp import Context, FastMCP

# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("server_module")

# Connection parameters
SERVER = "LEGION\\SQLEXPRESS"
DATABASE = "test"

@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[dict]:
    """Manage application lifecycle with type-safe context"""
    logger.debug("Initializing database connection")
    conn = None
    try:
        # Connect using a loop.run_in_executor to avoid blocking
        def connect_db():
            connection_string = (
                f"DRIVER={{ODBC Driver 17 for SQL Server}};"
                f"SERVER={SERVER};"
                f"DATABASE={DATABASE};"
                f"Trusted_Connection=yes;"
            )
            logger.debug(f"Connection string: {connection_string}")
            return pyodbc.connect(connection_string)

        loop = asyncio.get_event_loop()
        conn = await loop.run_in_executor(None, connect_db)
        logger.debug("Database connection established successfully")
    except Exception as e:
        logger.error(f"Database connection error: {type(e).__name__}: {str(e)}", exc_info=True)
        # Continue without database; conn stays None and tools report it as unavailable

    try:
        # Yield a dictionary instead of a dataclass to match example.
        # The yield sits outside the connection try/except so that an error raised
        # while the server is running cannot trigger a second yield.
        yield {"conn": conn}
    finally:
        if conn:
            logger.debug("Closing database connection")
            await asyncio.get_event_loop().run_in_executor(None, conn.close)

# Create an MCP server with the lifespan
mcp = FastMCP("My MS SQL Integrated App", lifespan=app_lifespan)

@mcp.tool()
async def query_sql(ctx: Context, query: str = None) -> str:
    """
    Tool to query the SQL database with a custom query.

    Args:
        query: The SQL query to execute. If not provided, will run a default query.

    Returns:
        The query results as a string.
    """
    try:
        # Access the connection using dictionary access
        conn = ctx.request_context.lifespan_context["conn"]

        if conn is None:
            return "Database connection is not available. Check server logs for details."

        # Use default query if none provided
        if not query:
            query = "SELECT * FROM [dbo].[Table_1]"

        logger.debug(f"Executing query: {query}")

        # Execute query in a non-blocking way
        def run_query():
            cursor = conn.cursor()
            try:
                cursor.execute(query)
                if cursor.description:  # Check if the query returns results
                    columns = [column[0] for column in cursor.description]
                    results = []
                    for row in cursor.fetchall():
                        results.append(dict(zip(columns, row)))
                    return {"success": True, "results": results, "rowCount": len(results)}
                else:
                    # For non-SELECT queries (INSERT, UPDATE, etc.)
                    return {"success": True, "rowCount": cursor.rowcount, "message": f"Query affected {cursor.rowcount} rows"}
            except Exception as e:
                return {"success": False, "error": str(e)}
            finally:
                cursor.close()

        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(None, run_query)

        if result["success"]:
            if "results" in result:
                return f"Query results: {result['results']}"
            else:
                return result["message"]
        else:
            return f"Query error: {result['error']}"
    except Exception as e:
        logger.error(f"Query execution error: {type(e).__name__}: {str(e)}")
        return f"Error: {str(e)}"

@mcp.tool()
async def list_tables(ctx: Context) -> str:
    """List all tables in the database that can be queried."""
    try:
        conn = ctx.request_context.lifespan_context["conn"]

        if conn is None:
            return "Database connection is not available."

        def get_tables():
            cursor = conn.cursor()
            cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE='BASE TABLE'")
            tables = [row[0] for row in cursor.fetchall()]
            cursor.close()
            return tables

        loop = asyncio.get_event_loop()
        tables = await loop.run_in_executor(None, get_tables)

        return f"Available tables: {tables}"
    except Exception as e:
        return f"Error listing tables: {str(e)}"

@mcp.tool()
async def describe_table(ctx: Context, table_name: str) -> str:
    """
    Get the structure of a specific table.

    Args:
        table_name: Name of the table to describe

    Returns:
        Column information for the specified table
    """
    try:
        conn = ctx.request_context.lifespan_context["conn"]

        if conn is None:
            return "Database connection is not available."

        def get_structure():
            cursor = conn.cursor()
            # Pass table_name as a bound parameter instead of interpolating it
            # into the SQL text, so it cannot alter the statement
            cursor.execute(
                "SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH "
                "FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ?",
                table_name,
            )
            columns = []
            for row in cursor.fetchall():
                col_name, data_type, max_length = row
                if max_length:
                    columns.append(f"{col_name} ({data_type}({max_length}))")
                else:
                    columns.append(f"{col_name} ({data_type})")
            cursor.close()
            return columns

        loop = asyncio.get_event_loop()
        structure = await loop.run_in_executor(None, get_structure)

        if structure:
            return f"Structure of table '{table_name}':\n" + "\n".join(structure)
        else:
            return f"Table '{table_name}' not found or has no columns."
    except Exception as e:
        return f"Error describing table: {str(e)}"

@mcp.tool()
async def execute_nonquery(ctx: Context, sql: str) -> str:
    """
    Execute a non-query SQL statement (INSERT, UPDATE, DELETE, etc.).

    Args:
        sql: The SQL statement to execute

    Returns:
        Result of the operation
    """
    try:
        conn = ctx.request_context.lifespan_context["conn"]

        if conn is None:
            return "Database connection is not available."

        def run_nonquery():
            try:
                cursor = conn.cursor()
                cursor.execute(sql)
                row_count = cursor.rowcount
                # Commit changes
                conn.commit()
                cursor.close()
                return {"success": True, "rowCount": row_count}
            except Exception as e:
                # Rollback in case of error
                conn.rollback()
                return {"success": False, "error": str(e)}

        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(None, run_nonquery)

        if result["success"]:
            return f"Operation successful. Rows affected: {result['rowCount']}"
        else:
            return f"Operation failed: {result['error']}"
    except Exception as e:
        return f"Error executing SQL: {str(e)}"

@mcp.tool()
async def list_odbc_drivers(ctx: Context) -> str:
    """List available ODBC drivers on the system"""
    try:
        def get_drivers():
            return pyodbc.drivers()

        drivers = await asyncio.get_event_loop().run_in_executor(None, get_drivers)
        return f"Available ODBC drivers: {', '.join(drivers)}"
    except Exception as e:
        return f"Error listing drivers: {str(e)}"

@mcp.tool()
async def database_info(ctx: Context) -> str:
    """Get general information about the connected database"""
    try:
        conn = ctx.request_context.lifespan_context["conn"]

        if conn is None:
            return "Database connection is not available."

        def get_info():
            cursor = conn.cursor()
            # Get SQL Server version
            cursor.execute("SELECT @@VERSION")
            version = cursor.fetchone()[0]

            # Get database name, current server time, and table count
            cursor.execute("""
                SELECT
                    DB_NAME() AS DatabaseName,
                    CONVERT(VARCHAR(50), GETDATE(), 120) AS CurrentDateTime,
                    (SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE='BASE TABLE') AS TableCount
            """)
            db_info = cursor.fetchone()
            cursor.close()

            return {
                "version": version,
                "database": db_info[0],
                "current_time": db_info[1],
                "table_count": db_info[2]
            }

        loop = asyncio.get_event_loop()
        info = await loop.run_in_executor(None, get_info)

        # @@VERSION spans several lines; report only the first one
        version_line = info["version"].splitlines()[0]

        return (
            f"Database Information:\n"
            f"Server: {SERVER}\n"
            f"Database: {info['database']}\n"
            f"Server Version: {version_line}\n"
            f"Current Server Time: {info['current_time']}\n"
            f"Number of Tables: {info['table_count']}"
        )
    except Exception as e:
        return f"Error getting database info: {str(e)}"

# Run the server
if __name__ == "__main__":
    try:
        logger.info("Starting MCP server")
        mcp.run()
    except Exception as e:
        logger.critical(f"Server startup failed: {e}", exc_info=True)
        sys.exit(1)
```
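
The README's security notes suggest validating SQL input before it reaches the database. The sketch below shows one way that could look for `query_sql`, which is meant for reads. The helper names `is_single_select` and `quote_identifier` are hypothetical and do not exist in `sql_mcp.py`, and the check is deliberately naive: it does not parse SQL, so it also rejects a semicolon inside a string literal and does not catch `SELECT ... INTO`. Treat it as a first filter on top of least-privilege database permissions, not a replacement for them.

```python
import re

# Hypothetical helpers for illustration; not part of sql_mcp.py.
_SELECT_START = re.compile(r"^\s*select\b", re.IGNORECASE)


def is_single_select(sql: str) -> bool:
    """Return True if `sql` looks like exactly one SELECT statement.

    Naive by design: any semicolon other than trailing ones is treated
    as a statement separator and causes rejection.
    """
    statement = sql.strip().rstrip(";").strip()
    if not statement or ";" in statement:
        return False
    return bool(_SELECT_START.match(statement))


def quote_identifier(name: str) -> str:
    """Bracket-quote a T-SQL identifier, doubling any closing bracket."""
    return "[" + name.replace("]", "]]") + "]"
```

A tool could call `is_single_select(query)` before handing the text to `cursor.execute` and return an error string when it fails. `quote_identifier` is only needed where an identifier must appear in the SQL text itself; values should still be passed as bound parameters.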