# Directory Structure
```
├── .gitignore
├── env.example
├── install.sh
├── README.md
├── requirements.txt
├── setup.py
├── src
│   ├── __init__.py
│   └── server.py
├── start.sh
├── status.sh
├── stop.sh
└── tests
    ├── __init__.py
    └── test_server.py
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# IDE
.vscode/
.idea/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db
# Project specific
config.json
# Runtime files
server.pid
server.log
nohup.out
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# MSSQL MCP Server
A Model Context Protocol (MCP) server that provides comprehensive access to Microsoft SQL Server databases. This enhanced server enables Language Models to inspect database schemas, execute queries, manage database objects, and perform advanced database operations through a standardized interface.
## 🚀 Enhanced Features
### **Complete Database Schema Traversal**
- **23 comprehensive database management tools** (expanded from 5 basic operations)
- **Full database object hierarchy exploration** - tables, views, stored procedures, indexes, schemas
- **Advanced database object management** - create, modify, delete operations
- **Intelligent resource access** - all tables and views available as MCP resources
- **Large content handling** - retrieves complete stored procedures (1400+ lines) without truncation
### **Core Capabilities**
- **Database Connection**: Connect to MSSQL Server instances with flexible authentication
- **Schema Inspection**: Complete database object exploration and management
- **Query Execution**: Execute SELECT, INSERT, UPDATE, DELETE, and DDL queries
- **Stored Procedure Management**: Create, modify, execute, and manage stored procedures
- **View Management**: Create, modify, delete, and describe views
- **Index Management**: Create, delete, and analyze indexes
- **Resource Access**: Browse table and view data as MCP resources
- **Security**: Read-only and write operations are properly separated and validated
## ⚠️ Important Usage Guidelines for Engineering Teams
### **Database Limitation**
**🔴 CRITICAL: Limit to ONE database per MCP server instance**
- This enhanced MCP server creates **23 tools per database**
- Cursor has a **40-tool limit** across all MCP servers
- Using multiple database instances will exceed Cursor's tool limit
- For multiple databases, use separate MCP server instances in different projects
### **Large Content Limitations**
**⚠️ IMPORTANT: File operations not supported within chat context**
- Large stored procedures (1400+ lines) can be retrieved and viewed in chat
- However, saving large content to files via MCP tools is not reliable due to token limits
- **For bulk data extraction**: Use standalone Python scripts with direct database connections
- **Recommended approach**: Copy-paste smaller procedures from chat, use external scripts for large ones
### **Tool Distribution**
- **Core Tools**: 5 (read_query, write_query, list_tables, describe_table, create_table)
- **Stored Procedures**: 7 tools (create, modify, delete, list, describe, execute, get_parameters)
- **Views**: 5 tools (create, modify, delete, list, describe)
- **Indexes**: 4 tools (create, delete, list, describe)
- **Schema Management**: 2 tools (list_schemas, list_all_objects)
- **Total**: 23 tools + enhanced write_query supporting all database object operations
## Installation
### Prerequisites
- Python 3.10 or higher
- ODBC Driver 17 for SQL Server
- Access to an MSSQL Server instance
### Quick Setup
1. **Clone or create the project directory:**
```bash
mkdir mcp-sqlserver && cd mcp-sqlserver
```
2. **Run the installation script:**
```bash
chmod +x install.sh
./install.sh
```
3. **Configure your database connection:**
```bash
cp env.example .env
# Edit .env with your database details
```
### Manual Installation
1. **Create virtual environment:**
```bash
python3 -m venv venv
source venv/bin/activate
```
2. **Install dependencies:**
```bash
pip install -r requirements.txt
```
3. **Install ODBC Driver (macOS):**
```bash
brew tap microsoft/mssql-release
brew install msodbcsql17 mssql-tools
```
## Configuration
Create a `.env` file with your database configuration:
```env
MSSQL_DRIVER={ODBC Driver 17 for SQL Server}
MSSQL_SERVER=your-server-address
MSSQL_DATABASE=your-database-name
MSSQL_USER=your-username
MSSQL_PASSWORD=your-password
MSSQL_PORT=1433
TrustServerCertificate=yes
```
### Configuration Options
- `MSSQL_SERVER`: Server hostname or IP address (required)
- `MSSQL_DATABASE`: Database name to connect to (required)
- `MSSQL_USER`: Username for authentication
- `MSSQL_PASSWORD`: Password for authentication
- `MSSQL_PORT`: Port number (default: 1433)
- `MSSQL_DRIVER`: ODBC driver name (default: {ODBC Driver 17 for SQL Server})
- `TrustServerCertificate`: Trust server certificate (default: yes)
- `Trusted_Connection`: Use Windows authentication (default: no)
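As a rough illustration of how these options fit together, the sketch below assembles an ODBC connection string from them, mirroring the connection test in `start.sh`. The helper name `build_connection_string` is hypothetical, and whether `src/server.py` builds its string the same way is an assumption.

```python
import os


def build_connection_string(env=None):
    """Assemble an ODBC connection string from the options listed above.

    `env` is any mapping of variable names to values; it defaults to the
    process environment. This helper is illustrative, not part of the server.
    """
    env = os.environ if env is None else env
    server = env.get("MSSQL_SERVER")
    database = env.get("MSSQL_DATABASE")
    if not server or not database:
        raise ValueError("MSSQL_SERVER and MSSQL_DATABASE are required")

    driver = env.get("MSSQL_DRIVER", "{ODBC Driver 17 for SQL Server}")
    port = env.get("MSSQL_PORT", "1433")
    parts = [f"DRIVER={driver}", f"SERVER={server},{port}", f"DATABASE={database}"]

    # Credentials are only added for SQL Server authentication.
    user = env.get("MSSQL_USER", "")
    password = env.get("MSSQL_PASSWORD", "")
    if user and password:
        parts += [f"UID={user}", f"PWD={password}"]

    parts.append(f"TrustServerCertificate={env.get('TrustServerCertificate', 'yes')}")
    parts.append(f"Trusted_Connection={env.get('Trusted_Connection', 'no')}")
    return ";".join(parts) + ";"
```

Passing a plain dictionary instead of relying on `os.environ` makes it easy to check the defaults without touching a real `.env` file.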
## Usage
### Understanding MCP Servers
MCP (Model Context Protocol) servers are designed to work with AI assistants and language models. They communicate over stdin/stdout using the JSON-RPC protocol rather than acting as traditional web services.
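To make the message format concrete, here is a minimal sketch of the JSON-RPC 2.0 requests a client might write to the server's stdin, one JSON object per line. The helper `make_request` is hypothetical, and calling `list_tables` with empty arguments is an assumption about this server's tool schema. A real session also begins with an `initialize` handshake, which MCP client libraries handle for you.

```python
import json


def make_request(request_id, method, params=None):
    """Serialize one JSON-RPC 2.0 request as a single line of JSON."""
    message = {"jsonrpc": "2.0", "id": request_id, "method": method}
    if params is not None:
        message["params"] = params
    # json.dumps emits no raw newlines, so each message fits on one line.
    return json.dumps(message)


# Ask the server which tools it offers.
list_tools = make_request(1, "tools/list")

# Invoke one tool by name (argument schema assumed to be empty here).
call_tool = make_request(2, "tools/call", {"name": "list_tables", "arguments": {}})

print(list_tools)
print(call_tool)
```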
### Running the Server
**For AI Assistant Integration:**
```bash
python3 src/server.py
```
The server will start and wait for MCP protocol messages on stdin. This is how AI assistants like Claude Desktop or other MCP clients will communicate with it.
**For Testing and Development:**
1. **Test database connection:**
```bash
python3 test_connection.py
```
2. **Check server status:**
```bash
./status.sh
```
3. **View available tables:**
```bash
# The server provides tools that can be called by MCP clients
# Direct testing requires an MCP client or testing framework
```
## Available Tools (23 Total)
The enhanced server provides comprehensive database management tools:
### **Core Database Operations (5 tools)**
1. **`read_query`** - Execute SELECT queries to read data
2. **`write_query`** - Execute INSERT, UPDATE, DELETE, and DDL queries
3. **`list_tables`** - List all tables in the database
4. **`describe_table`** - Get schema information for a specific table
5. **`create_table`** - Create new tables
### **Stored Procedure Management (7 tools)**
6. **`create_procedure`** - Create new stored procedures
7. **`modify_procedure`** - Modify existing stored procedures
8. **`delete_procedure`** - Delete stored procedures
9. **`list_procedures`** - List all stored procedures with metadata
10. **`describe_procedure`** - Get complete procedure definitions
11. **`execute_procedure`** - Execute procedures with parameters
12. **`get_procedure_parameters`** - Get detailed parameter information
### **View Management (5 tools)**
13. **`create_view`** - Create new views
14. **`modify_view`** - Modify existing views
15. **`delete_view`** - Delete views
16. **`list_views`** - List all views in the database
17. **`describe_view`** - Get view definitions and schema
### **Index Management (4 tools)**
18. **`create_index`** - Create new indexes
19. **`delete_index`** - Delete indexes
20. **`list_indexes`** - List all indexes (optionally by table)
21. **`describe_index`** - Get detailed index information
### **Schema Exploration (2 tools)**
22. **`list_schemas`** - List all schemas in the database
23. **`list_all_objects`** - List all database objects organized by schema
### **Available Resources**
Both tables and views are exposed as MCP resources with URIs like:
- `mssql://table_name/data` - Access table data in CSV format
- `mssql://view_name/data` - Access view data in CSV format
Resources provide the first 100 rows of data in CSV format for quick data exploration.
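The row cap and CSV output described above could be produced along these lines. This is a sketch only: the function name `rows_to_csv` is hypothetical, and how `src/server.py` actually fetches and formats rows is an assumption.

```python
import csv
import io
from itertools import islice


def rows_to_csv(columns, rows, limit=100):
    """Render a header plus at most `limit` rows as CSV text."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(columns)
    # islice caps the output without requiring `rows` to be a list.
    for row in islice(rows, limit):
        writer.writerow(row)
    return buffer.getvalue()
```

Using the `csv` module rather than joining strings by hand means values that contain commas or quotes are escaped correctly.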
## Database Schema Traversal Examples
### **1. Explore Database Structure**
```
# Start with schemas
list_schemas
# Get all objects in a specific schema
list_all_objects(schema_name: "dbo")
# Or get all objects across all schemas
list_all_objects()
```
### **2. Table Exploration**
```
# List all tables
list_tables
# Get detailed table information
describe_table(table_name: "YourTableName")
# Access table data as MCP resource
# URI: mssql://YourTableName/data
```
### **3. View Management**
```
# List all views
list_views
# Get view definition
describe_view(view_name: "YourViewName")
# Create a new view
create_view(view_script: "CREATE VIEW MyView AS SELECT * FROM MyTable WHERE Active = 1")
# Access view data as MCP resource
# URI: mssql://YourViewName/data
```
### **4. Stored Procedure Operations**
```
# List all procedures
list_procedures
# Get complete procedure definition (handles large procedures like wmPostPurchase)
describe_procedure(procedure_name: "YourProcedureName")
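# Saving to a file relies on your MCP client's own file tool; write_file is not one of
# this server's 23 tools and is unreliable for large content (see Large Content Limitations)
write_file(file_path: "procedure_name.sql", content: "procedure_definition")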
# Get parameter details
get_procedure_parameters(procedure_name: "YourProcedureName")
# Execute procedure
execute_procedure(procedure_name: "YourProcedureName", parameters: ["param1", "param2"])
```
### **5. Index Management**
```
# List all indexes
list_indexes()
# List indexes for specific table
list_indexes(table_name: "YourTableName")
# Get index details
describe_index(index_name: "IX_YourIndex", table_name: "YourTableName")
# Create new index
create_index(index_script: "CREATE INDEX IX_NewIndex ON MyTable (Column1, Column2)")
```
## Stored Procedure Management Examples
### **Create a Simple Stored Procedure**
```sql
CREATE PROCEDURE GetEmployeeCount
AS
BEGIN
SELECT COUNT(*) AS TotalEmployees FROM Employees
END
```
### **Create a Stored Procedure with Parameters**
```sql
CREATE PROCEDURE GetEmployeesByDepartment
@DepartmentId INT,
@MinSalary DECIMAL(10,2) = 0
AS
BEGIN
SELECT
EmployeeId,
FirstName,
LastName,
Salary,
DepartmentId
FROM Employees
WHERE DepartmentId = @DepartmentId
AND Salary >= @MinSalary
ORDER BY LastName, FirstName
END
```
### **Create a Stored Procedure with Output Parameters**
```sql
CREATE PROCEDURE GetDepartmentStats
@DepartmentId INT,
@EmployeeCount INT OUTPUT,
@AverageSalary DECIMAL(10,2) OUTPUT
AS
BEGIN
SELECT
@EmployeeCount = COUNT(*),
@AverageSalary = AVG(Salary)
FROM Employees
WHERE DepartmentId = @DepartmentId
END
```
### **Modify an Existing Stored Procedure**
```sql
ALTER PROCEDURE GetEmployeesByDepartment
@DepartmentId INT,
@MinSalary DECIMAL(10,2) = 0,
@MaxSalary DECIMAL(10,2) = 999999.99
AS
BEGIN
SELECT
EmployeeId,
FirstName,
LastName,
Salary,
DepartmentId,
HireDate
FROM Employees
WHERE DepartmentId = @DepartmentId
AND Salary BETWEEN @MinSalary AND @MaxSalary
ORDER BY Salary DESC, LastName, FirstName
END
```
## Large Content Handling
### **How It Works**
The server efficiently handles large database objects like stored procedures:
1. **Direct Retrieval**: Fetches complete content directly from SQL Server
2. **No Truncation**: Returns full procedure definitions regardless of size
3. **Chat Display**: Large procedures can be viewed in full within the chat interface
4. **Memory Efficient**: Processes content through database connection streams
### **Usage Examples**
```
# Describe a large procedure (gets complete definition)
describe_procedure(procedure_name: "wmPostPurchase")
# Works with procedures of any size (tested with 1400+ line procedures)
# Content is displayed in chat for viewing and copy-paste operations
```
### **Limitations for File Operations**
**⚠️ Important**: While large procedures can be retrieved and displayed in chat, saving them to files via MCP tools is not reliable due to inference token limits. For bulk data extraction:
1. **Small procedures**: Copy-paste from chat interface
2. **Large procedures**: Use standalone Python scripts with direct database connections
3. **Bulk operations**: Create dedicated extraction scripts outside the MCP context
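A standalone extraction script of the kind recommended above might look like the following sketch. The function names are hypothetical, the connection string is supplied by the caller, and `pyodbc` is imported lazily so that the file-naming helper can be used without the driver installed. The query reads the definition from the `sys.sql_modules` catalog view.

```python
import re
from pathlib import Path

# Looks up the full definition text of a procedure by name.
DEFINITION_SQL = "SELECT definition FROM sys.sql_modules WHERE object_id = OBJECT_ID(?)"


def output_path(procedure_name, directory="."):
    """Map a procedure name to a safe .sql file path."""
    safe = re.sub(r"[^A-Za-z0-9_.-]", "_", procedure_name)
    return Path(directory) / f"{safe}.sql"


def export_procedure(connection_string, procedure_name, directory="."):
    """Write one procedure definition to disk and return the file path."""
    import pyodbc  # imported here so the helper above works without the driver

    conn = pyodbc.connect(connection_string, timeout=10)
    try:
        row = conn.cursor().execute(DEFINITION_SQL, procedure_name).fetchone()
    finally:
        conn.close()
    if row is None or row[0] is None:
        raise LookupError(f"No definition found for {procedure_name}")

    path = output_path(procedure_name, directory)
    path.write_text(row[0], encoding="utf-8")
    return path
```

Because the script talks to the database directly, the procedure text never passes through the chat context, so token limits do not apply.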
## Integration with AI Assistants
### Claude Desktop
Add this server to your Claude Desktop configuration:
```json
{
  "mcpServers": {
    "mssql": {
      "command": "python3",
      "args": ["/path/to/mcp-sqlserver/src/server.py"],
      "cwd": "/path/to/mcp-sqlserver",
      "env": {
        "MSSQL_SERVER": "your-server",
        "MSSQL_DATABASE": "your-database",
        "MSSQL_USER": "your-username",
        "MSSQL_PASSWORD": "your-password"
      }
    }
  }
}
```
### Other MCP Clients
The server follows the standard MCP protocol and should work with any compliant MCP client.
## Development
### Project Structure
```
mcp-sqlserver/
├── src/
│ └── server.py # Main MCP server implementation with chunking system
├── tests/
│ └── test_server.py # Unit tests
├── requirements.txt # Python dependencies
├── .env # Database configuration (create from env.example)
├── env.example # Configuration template
├── install.sh # Installation script
├── start.sh # Server startup script (for development)
├── stop.sh # Server shutdown script
├── status.sh # Server status script
└── README.md # This file
```
### Testing
Run the test suite:
```bash
python -m pytest tests/
```
Test database connection:
```bash
python3 test_connection.py
```
### Logging
The server uses Python's logging module. Set the log level by modifying the `logging.basicConfig()` call in `src/server.py`.
## Security Considerations
- **Authentication**: Always use strong passwords and secure authentication
- **Network**: Ensure your database server is properly secured
- **Permissions**: Grant only necessary database permissions to the user account
- **SSL/TLS**: Use encrypted connections when possible
- **Query Validation**: The server validates query types and prevents unauthorized operations
- **DDL Operations**: Create/modify/delete operations for database objects are properly validated
- **Stored Procedure Execution**: Parameters are safely handled to prevent injection attacks
- **Large Content Handling**: Large procedures are retrieved efficiently without truncation
- **File Operations**: Write operations are validated and sandboxed
- **Read-First Approach**: Exploration tools are read-only by default for production safety
## Troubleshooting
### Common Issues
1. **Connection Failed**: Check your database server address, credentials, and network connectivity
2. **ODBC Driver Not Found**: Install Microsoft ODBC Driver 17 for SQL Server
3. **Permission Denied**: Ensure the database user has appropriate permissions
4. **Port Issues**: Verify the correct port number and firewall settings
5. **Large Content Issues**: Large procedures display in chat but cannot be saved to files via MCP tools
6. **Memory Issues**: Large content is streamed efficiently from the database
### Debug Mode
Enable debug logging by setting the log level to DEBUG in `src/server.py`:
```python
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
```
### Large Content Troubleshooting
If you encounter issues with large content:
1. **Copy-paste approach**: Use chat interface to view and copy large procedures
2. **External scripts**: Create standalone Python scripts for bulk data extraction
3. **Check memory**: Large procedures are handled efficiently by the database connection
4. **Verify permissions**: Ensure database user can access procedure definitions
5. **Test with smaller procedures**: Verify basic functionality first
### Getting Help
1. Check the server logs for detailed error messages
2. Verify your `.env` configuration
3. Test the database connection independently
4. Ensure all dependencies are installed correctly
5. For large content issues, use copy-paste from chat or create external extraction scripts
## Recent Enhancements
### **Large Content Handling (Latest)**
- Verified complete retrieval of large stored procedures without truncation
- Successfully tested with procedures like `wmPostPurchase` (1400+ lines, 57KB)
- Large procedures display fully in chat interface for viewing and copy-paste
- Efficient memory handling through database connection streaming
- **Note**: File operations via MCP tools not reliable for large content due to token limits
### **Complete Database Object Management**
- Expanded from 5 to 23 comprehensive database management tools
- Added full CRUD operations for all major database objects
- Implemented schema traversal capabilities matching SSMS functionality
- Added MCP resource access for tables and views
- Enhanced security with proper operation validation
## License
This project is open source. See the license file for details.
## Contributing
Contributions are welcome! Please feel free to submit pull requests or open issues for bugs and feature requests.
```
--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------
```python
# MSSQL MCP Server Package
```
--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------
```python
# Test package for MSSQL MCP Server
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
pyodbc>=4.0.39
pydantic>=2.0.0
python-dotenv>=1.0.1
mcp>=1.2.0
anyio>=4.5.0
asyncio-mqtt>=0.16.2
pytest>=7.0.0
pytest-asyncio>=0.21.0
```
--------------------------------------------------------------------------------
/setup.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Setup script for MSSQL MCP Server
"""
from setuptools import setup, find_packages
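# Use the README as the long description shown on the package index
with open("README.md", "r", encoding="utf-8") as fh:
    long_description = fh.read()
# Read dependencies from requirements.txt, skipping blank lines and comments
with open("requirements.txt", "r", encoding="utf-8") as fh:
    requirements = [line.strip() for line in fh if line.strip() and not line.startswith("#")]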
setup(
    name="mssql-mcp-server",
    version="1.0.0",
    author="MSSQL MCP Server",
    author_email="",
    description="A Model Context Protocol server for Microsoft SQL Server databases",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://github.com/your-username/mssql-mcp-server",
    packages=find_packages(),
    classifiers=[
        "Development Status :: 4 - Beta",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
        "Programming Language :: Python :: 3.12",
    ],
    # Matches the README prerequisite (Python 3.10 or higher)
    python_requires=">=3.10",
    install_requires=requirements,
    entry_points={
        "console_scripts": [
            "mssql-mcp-server=src.server:main",
        ],
    },
    include_package_data=True,
    zip_safe=False,
)
```
--------------------------------------------------------------------------------
/install.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
# MSSQL MCP Server Installation Script
echo "🚀 Installing MSSQL MCP Server..."
# Check if Python is installed
if ! command -v python3 &> /dev/null; then
    echo "❌ Python 3 is not installed. Please install Python 3.10 or higher."
    exit 1
fi
# Check Python version (the README lists Python 3.10 as the minimum)
python_version=$(python3 -c "import sys; print('.'.join(map(str, sys.version_info[:2])))")
required_version="3.10"
if [ "$(printf '%s\n' "$required_version" "$python_version" | sort -V | head -n1)" != "$required_version" ]; then
    echo "❌ Python $python_version is installed, but Python $required_version or higher is required."
    exit 1
fi
echo "✅ Python $python_version detected"
# Create virtual environment if it doesn't exist
if [ ! -d "venv" ]; then
echo "📦 Creating virtual environment..."
python3 -m venv venv
fi
# Activate virtual environment
echo "🔧 Activating virtual environment..."
source venv/bin/activate
# Upgrade pip
echo "⬆️ Upgrading pip..."
pip install --upgrade pip
# Install requirements
echo "📥 Installing dependencies..."
pip install -r requirements.txt
# Make server executable
chmod +x src/server.py
echo "✅ Installation complete!"
echo ""
echo "📋 Next steps:"
echo "1. Copy env.example to .env and configure your database settings"
echo "2. Run the server with: python src/server.py"
echo "3. Or activate the virtual environment and run: source venv/bin/activate && python src/server.py"
echo ""
echo "📖 For Claude Desktop integration, see README.md"
```
--------------------------------------------------------------------------------
/stop.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
# MSSQL MCP Server Stop Script
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Function to print colored output
print_error() {
echo -e "${RED}❌ $1${NC}"
}
print_success() {
echo -e "${GREEN}✅ $1${NC}"
}
print_warning() {
echo -e "${YELLOW}⚠️ $1${NC}"
}
print_info() {
echo -e "${BLUE}ℹ️ $1${NC}"
}
echo "🛑 Stopping MSSQL MCP Server..."
# Check if PID file exists
if [ ! -f "server.pid" ]; then
print_warning "No PID file found. Server may not be running."
# Check if any python processes are running the server
server_pids=$(pgrep -f "src/server.py" 2>/dev/null || true)
if [ -n "$server_pids" ]; then
print_warning "Found running server processes. Attempting to stop them..."
echo "$server_pids" | while read pid; do
if [ -n "$pid" ]; then
print_info "Stopping process $pid..."
kill $pid 2>/dev/null || true
# Wait for process to stop
for i in {1..10}; do
if ! ps -p $pid > /dev/null 2>&1; then
print_success "Process $pid stopped"
break
fi
sleep 1
done
# Force kill if still running
if ps -p $pid > /dev/null 2>&1; then
print_warning "Force killing process $pid..."
kill -9 $pid 2>/dev/null || true
fi
fi
done
print_success "All server processes stopped"
else
print_info "No running server processes found"
fi
exit 0
fi
# Read PID from file
pid=$(cat server.pid)
# Check if process is actually running
if ! ps -p $pid > /dev/null 2>&1; then
print_warning "Process $pid is not running. Cleaning up PID file..."
rm server.pid
print_success "Cleanup complete"
exit 0
fi
print_info "Stopping server process $pid..."
# Try graceful shutdown first
kill $pid 2>/dev/null
# Wait for process to stop gracefully
stopped=false
for i in {1..10}; do
if ! ps -p $pid > /dev/null 2>&1; then
stopped=true
break
fi
print_info "Waiting for graceful shutdown... ($i/10)"
sleep 1
done
if [ "$stopped" = false ]; then
print_warning "Graceful shutdown failed. Force killing process..."
kill -9 $pid 2>/dev/null || true
# Wait a bit more
sleep 2
if ps -p $pid > /dev/null 2>&1; then
print_error "Failed to stop process $pid"
exit 1
fi
fi
# Clean up PID file
rm server.pid
print_success "MSSQL MCP Server stopped successfully"
# Show final status
print_info "Server status: Stopped"
if [ -f "server.log" ]; then
print_info "Logs are available in server.log"
# Show last few lines of log
echo ""
print_info "Last few log entries:"
tail -5 server.log 2>/dev/null || print_warning "Could not read log file"
fi
```
--------------------------------------------------------------------------------
/status.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
# MSSQL MCP Server Status Script
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Function to print colored output
print_error() {
echo -e "${RED}❌ $1${NC}"
}
print_success() {
echo -e "${GREEN}✅ $1${NC}"
}
print_warning() {
echo -e "${YELLOW}⚠️ $1${NC}"
}
print_info() {
echo -e "${BLUE}ℹ️ $1${NC}"
}
echo "📊 MSSQL MCP Server Status"
echo "=========================="
# Check if PID file exists
if [ ! -f "server.pid" ]; then
print_warning "No PID file found"
# Check if any python processes are running the server
server_pids=$(pgrep -f "src/server.py" 2>/dev/null || true)
if [ -n "$server_pids" ]; then
print_warning "Server processes found running without PID file:"
echo "$server_pids" | while read pid; do
if [ -n "$pid" ]; then
echo " PID: $pid"
fi
done
print_info "Consider running './stop.sh' to clean up"
else
print_error "Server is not running"
fi
exit 1
fi
# Read PID from file
pid=$(cat server.pid)
# Check if process is actually running
if ! ps -p $pid > /dev/null 2>&1; then
print_error "Server is not running (stale PID file)"
print_info "PID file contains: $pid"
print_info "Run './stop.sh' to clean up or './start.sh' to restart"
exit 1
fi
# Server is running
print_success "Server is running"
echo " PID: $pid"
# Get process information
if command -v ps &> /dev/null; then
# Empty "name=" headers suppress the header row on both GNU and BSD/macOS ps
process_info=$(ps -p "$pid" -o pid= -o ppid= -o etime= -o pcpu= -o pmem= -o args= 2>/dev/null || true)
if [ -n "$process_info" ]; then
echo " Process Info:"
echo " $process_info"
fi
fi
# Check log file
if [ -f "server.log" ]; then
log_size=$(wc -c < server.log 2>/dev/null || echo "unknown")
log_lines=$(wc -l < server.log 2>/dev/null || echo "unknown")
print_info "Log file: server.log ($log_lines lines, $log_size bytes)"
# Show last few lines of log
echo ""
print_info "Recent log entries:"
tail -10 server.log 2>/dev/null | sed 's/^/ /' || print_warning "Could not read log file"
else
print_warning "No log file found"
fi
# Check environment configuration
echo ""
print_info "Environment Configuration:"
if [ -f ".env" ]; then
print_success ".env file exists"
# Check key variables (without showing sensitive data)
if grep -q "MSSQL_SERVER=" .env 2>/dev/null; then
server_value=$(grep "MSSQL_SERVER=" .env | cut -d'=' -f2 | sed 's/^[[:space:]]*//' | sed 's/[[:space:]]*$//')
if [ -n "$server_value" ]; then
print_success "MSSQL_SERVER is configured"
else
print_warning "MSSQL_SERVER is empty"
fi
else
print_warning "MSSQL_SERVER not found in .env"
fi
if grep -q "MSSQL_DATABASE=" .env 2>/dev/null; then
db_value=$(grep "MSSQL_DATABASE=" .env | cut -d'=' -f2 | sed 's/^[[:space:]]*//' | sed 's/[[:space:]]*$//')
if [ -n "$db_value" ]; then
print_success "MSSQL_DATABASE is configured"
else
print_warning "MSSQL_DATABASE is empty"
fi
else
print_warning "MSSQL_DATABASE not found in .env"
fi
if grep -q "MSSQL_USER=" .env 2>/dev/null; then
print_success "MSSQL_USER is configured"
else
print_info "MSSQL_USER not configured (may be using Windows auth)"
fi
else
print_error ".env file not found"
fi
# Check virtual environment
echo ""
print_info "Virtual Environment:"
if [ -d "venv" ]; then
print_success "Virtual environment exists"
if [ -n "$VIRTUAL_ENV" ]; then
print_success "Virtual environment is activated"
else
print_info "Virtual environment is not activated in current shell"
fi
else
print_warning "Virtual environment not found"
fi
echo ""
print_info "Control Commands:"
echo " Start: ./start.sh"
echo " Stop: ./stop.sh"
echo " Status: ./status.sh"
echo " Logs: tail -f server.log"
```
--------------------------------------------------------------------------------
/start.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
# MSSQL MCP Server Start Script
set -e # Exit on any error
echo "🚀 Starting MSSQL MCP Server..."
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Function to print colored output
print_error() {
echo -e "${RED}❌ $1${NC}"
}
print_success() {
echo -e "${GREEN}✅ $1${NC}"
}
print_warning() {
echo -e "${YELLOW}⚠️ $1${NC}"
}
print_info() {
echo -e "${BLUE}ℹ️ $1${NC}"
}
# Check if Python is installed
if ! command -v python3 &> /dev/null; then
    print_error "Python 3 is not installed. Please install Python 3.10 or higher."
    exit 1
fi
# Check Python version (the README lists Python 3.10 as the minimum)
python_version=$(python3 -c "import sys; print('.'.join(map(str, sys.version_info[:2])))")
required_version="3.10"
if [ "$(printf '%s\n' "$required_version" "$python_version" | sort -V | head -n1)" != "$required_version" ]; then
    print_error "Python $python_version is installed, but Python $required_version or higher is required."
    exit 1
fi
print_success "Python $python_version detected"
# Check if virtual environment exists
if [ ! -d "venv" ]; then
print_warning "Virtual environment not found. Creating one..."
python3 -m venv venv
print_success "Virtual environment created"
fi
# Activate virtual environment
print_info "Activating virtual environment..."
source venv/bin/activate
# Check if dependencies are installed
print_info "Checking dependencies..."
if ! python -c "import pyodbc, pydantic, mcp" &> /dev/null; then
print_warning "Dependencies not found. Installing..."
pip install -r requirements.txt
print_success "Dependencies installed"
fi
# Environment validation
print_info "Validating environment configuration..."
# Check if .env file exists
if [ ! -f ".env" ]; then
print_error ".env file not found!"
print_info "Please create a .env file based on env.example:"
echo ""
echo " cp env.example .env"
echo ""
print_info "Then edit .env with your database configuration."
exit 1
fi
# Load environment variables from .env file using Python to handle special characters
eval "$(python3 -c "
import os
import shlex
from dotenv import load_dotenv
load_dotenv()
for key, value in os.environ.items():
    if key.startswith('MSSQL_') or key in ['TrustServerCertificate', 'Trusted_Connection']:
        # Use shlex.quote to properly escape the value
        escaped_value = shlex.quote(value)
        print(f'export {key}={escaped_value}')
")"
# Validate required environment variables
validation_failed=false
validate_env_var() {
local var_name=$1
local var_value=${!var_name}
if [ -z "$var_value" ]; then
print_error "$var_name is not set in .env file"
validation_failed=true
return 1
else
print_success "$var_name is configured"
return 0
fi
}
# Check required variables
# '|| true' keeps 'set -e' from aborting before every problem has been reported
validate_env_var "MSSQL_SERVER" || true
validate_env_var "MSSQL_DATABASE" || true
# Check if authentication is properly configured
if [ -z "$MSSQL_USER" ] && [ "$Trusted_Connection" != "yes" ]; then
print_error "Either MSSQL_USER must be set or Trusted_Connection must be 'yes'"
validation_failed=true
fi
if [ -n "$MSSQL_USER" ] && [ -z "$MSSQL_PASSWORD" ]; then
print_error "MSSQL_PASSWORD must be set when MSSQL_USER is provided"
validation_failed=true
fi
if [ "$validation_failed" = true ]; then
print_error "Environment validation failed. Please check your .env file."
echo ""
print_info "Required variables:"
echo " - MSSQL_SERVER: Your SQL Server hostname/IP"
echo " - MSSQL_DATABASE: Database name to connect to"
echo " - MSSQL_USER: Username (if not using Windows authentication)"
echo " - MSSQL_PASSWORD: Password (if using SQL Server authentication)"
echo ""
print_info "Optional variables:"
echo " - MSSQL_PORT: Port number (default: 1433)"
echo " - MSSQL_DRIVER: ODBC driver (default: {ODBC Driver 17 for SQL Server})"
echo " - TrustServerCertificate: yes/no (default: yes)"
echo " - Trusted_Connection: yes/no (default: no)"
exit 1
fi
# Test database connection
print_info "Testing database connection..."
if python3 -c "
import os
import pyodbc
from dotenv import load_dotenv
load_dotenv()
driver = os.getenv('MSSQL_DRIVER', '{ODBC Driver 17 for SQL Server}')
server = os.getenv('MSSQL_SERVER')
database = os.getenv('MSSQL_DATABASE')
username = os.getenv('MSSQL_USER', '')
password = os.getenv('MSSQL_PASSWORD', '')
port = os.getenv('MSSQL_PORT', '1433')
trust_cert = os.getenv('TrustServerCertificate', 'yes')
trusted_conn = os.getenv('Trusted_Connection', 'no')
conn_str = f'DRIVER={driver};SERVER={server},{port};DATABASE={database};'
if username and password:
    conn_str += f'UID={username};PWD={password};'
conn_str += f'TrustServerCertificate={trust_cert};Trusted_Connection={trusted_conn};'
try:
    conn = pyodbc.connect(conn_str, timeout=10)
    cursor = conn.cursor()
    cursor.execute('SELECT 1')
    cursor.fetchone()
    conn.close()
    print('Connection successful')
except Exception as e:
    print(f'Connection failed: {e}')
    # Exit non-zero so the surrounding shell 'if' takes the failure branch
    raise SystemExit(1)
" 2>/dev/null; then
print_success "Database connection test passed"
else
print_error "Database connection test failed"
print_info "Please check your database configuration and ensure:"
echo " - SQL Server is running and accessible"
echo " - Database exists and you have access permissions"
echo " - Network connectivity is available"
echo " - ODBC Driver 17 for SQL Server is installed"
exit 1
fi
# Check if server is already running
if [ -f "server.pid" ]; then
pid=$(cat server.pid)
if ps -p "$pid" > /dev/null 2>&1; then
print_warning "Server is already running (PID: $pid)"
print_info "Use './stop.sh' to stop the server first"
exit 1
else
print_warning "Stale PID file found, removing..."
rm server.pid
fi
fi
# Start the server
print_success "All validations passed! Starting MCP server..."
echo ""
print_info "Server will run in the background. Use './stop.sh' to stop it."
print_info "Logs will be written to server.log"
echo ""
# Start server in background and save PID
nohup python3 src/server.py > server.log 2>&1 &
server_pid=$!
echo $server_pid > server.pid
# Wait a moment and check if server started successfully
sleep 2
if ps -p $server_pid > /dev/null 2>&1; then
print_success "MSSQL MCP Server started successfully (PID: $server_pid)"
print_info "Server is running in the background"
print_info "View logs with: tail -f server.log"
else
print_error "Failed to start server. Check server.log for details."
rm -f server.pid
exit 1
fi
```
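The connection test in `start.sh` interpolates `MSSQL_PASSWORD` and the other values into the ODBC connection string as-is, so a value containing `;` would be read as the start of another attribute. Below is a minimal sketch of one way to guard against that. It assumes the usual ODBC convention that a value may be wrapped in braces with any closing brace doubled. `odbc_quote` and `build_conn_str` are hypothetical helpers and are not part of this repository.

```python
def odbc_quote(value: str) -> str:
    """Brace-quote a value if it contains characters special to ODBC strings.

    Hypothetical helper, assuming the convention that '}' inside a
    brace-quoted value is escaped by doubling it.
    """
    special = set(";{}=")
    needs_quoting = any(ch in special for ch in value) or value != value.strip()
    if not needs_quoting:
        return value
    return "{" + value.replace("}", "}}") + "}"


def build_conn_str(server, database, user, password, port="1433",
                   driver="{ODBC Driver 17 for SQL Server}"):
    """Assemble a connection string with user-supplied values quoted."""
    parts = [
        f"DRIVER={driver}",  # already brace-quoted by convention
        f"SERVER={server},{port}",
        f"DATABASE={odbc_quote(database)}",
        f"UID={odbc_quote(user)}",
        f"PWD={odbc_quote(password)}",
    ]
    return ";".join(parts) + ";"


print(build_conn_str("db.example.com", "sales", "app", "p;ss}word"))
```

Values without special characters pass through unchanged, so existing `.env` files would produce the same connection string as before.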
--------------------------------------------------------------------------------
/tests/test_server.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Tests for MSSQL MCP Server
"""
import os
import pytest
import asyncio
from unittest.mock import Mock, patch, MagicMock
from src.server import MSSQLServer
class TestMSSQLServer:
    """Test cases for MSSQLServer class."""

    def test_build_connection_string_with_credentials(self):
        """Test connection string building with username/password."""
        with patch.dict(os.environ, {
            'MSSQL_SERVER': 'testserver',
            'MSSQL_DATABASE': 'testdb',
            'MSSQL_USER': 'testuser',
            'MSSQL_PASSWORD': 'testpass',
            'MSSQL_PORT': '1433',
            'MSSQL_DRIVER': '{ODBC Driver 17 for SQL Server}'
        }):
            server = MSSQLServer()
            conn_str = server.connection_string
            assert 'SERVER=testserver,1433' in conn_str
            assert 'DATABASE=testdb' in conn_str
            assert 'UID=testuser' in conn_str
            assert 'PWD=testpass' in conn_str
            assert 'DRIVER={ODBC Driver 17 for SQL Server}' in conn_str

    def test_build_connection_string_missing_required(self):
        """Test connection string building with missing required variables."""
        with patch.dict(os.environ, {}, clear=True):
            with pytest.raises(ValueError, match="MSSQL_SERVER and MSSQL_DATABASE must be set"):
                MSSQLServer()

    @patch('src.server.pyodbc.connect')
    def test_get_connection_success(self, mock_connect):
        """Test successful database connection."""
        mock_connection = Mock()
        mock_connect.return_value = mock_connection
        with patch.dict(os.environ, {
            'MSSQL_SERVER': 'testserver',
            'MSSQL_DATABASE': 'testdb'
        }):
            server = MSSQLServer()
            connection = server._get_connection()
            assert connection == mock_connection
            mock_connect.assert_called_once()

    @patch('src.server.pyodbc.connect')
    def test_get_connection_failure(self, mock_connect):
        """Test database connection failure."""
        mock_connect.side_effect = Exception("Connection failed")
        with patch.dict(os.environ, {
            'MSSQL_SERVER': 'testserver',
            'MSSQL_DATABASE': 'testdb'
        }):
            server = MSSQLServer()
            with pytest.raises(Exception, match="Connection failed"):
                server._get_connection()

    @pytest.mark.asyncio
    async def test_execute_read_query_valid(self):
        """Test executing a valid SELECT query."""
        with patch.dict(os.environ, {
            'MSSQL_SERVER': 'testserver',
            'MSSQL_DATABASE': 'testdb'
        }):
            server = MSSQLServer()
            # Mock database connection and cursor. The server enters the
            # connection with `with ... as conn`, which a plain Mock does not
            # support, so use MagicMock and make __enter__ return the same mock.
            mock_connection = MagicMock()
            mock_connection.__enter__.return_value = mock_connection
            mock_cursor = Mock()
            mock_connection.cursor.return_value = mock_cursor
            mock_cursor.fetchall.return_value = [('John', 25), ('Jane', 30)]
            mock_cursor.description = [('name',), ('age',)]
            with patch.object(server, '_get_connection', return_value=mock_connection):
                result = await server._execute_read_query("SELECT name, age FROM users")
                assert len(result) == 1
                assert "name,age" in result[0].text
                assert "John,25" in result[0].text
                assert "Jane,30" in result[0].text

    @pytest.mark.asyncio
    async def test_execute_read_query_invalid(self):
        """Test executing an invalid query (not SELECT)."""
        with patch.dict(os.environ, {
            'MSSQL_SERVER': 'testserver',
            'MSSQL_DATABASE': 'testdb'
        }):
            server = MSSQLServer()
            with pytest.raises(ValueError, match="Only SELECT queries are allowed"):
                await server._execute_read_query("DELETE FROM users")

    @pytest.mark.asyncio
    async def test_execute_write_query_valid(self):
        """Test executing a valid INSERT query."""
        with patch.dict(os.environ, {
            'MSSQL_SERVER': 'testserver',
            'MSSQL_DATABASE': 'testdb'
        }):
            server = MSSQLServer()
            # Mock database connection and cursor. MagicMock supports the
            # context-manager protocol; __enter__ returns the same mock so the
            # commit() assertion below sees the call.
            mock_connection = MagicMock()
            mock_connection.__enter__.return_value = mock_connection
            mock_cursor = Mock()
            mock_connection.cursor.return_value = mock_cursor
            mock_cursor.rowcount = 1
            with patch.object(server, '_get_connection', return_value=mock_connection):
                result = await server._execute_write_query("INSERT INTO users (name) VALUES ('Test')")
                assert len(result) == 1
                assert "1 rows affected" in result[0].text
                mock_connection.commit.assert_called_once()

    @pytest.mark.asyncio
    async def test_list_tables(self):
        """Test listing database tables."""
        with patch.dict(os.environ, {
            'MSSQL_SERVER': 'testserver',
            'MSSQL_DATABASE': 'testdb'
        }):
            server = MSSQLServer()
            # Mock database connection and cursor (MagicMock so that
            # `with ... as conn` works and yields the same mock)
            mock_connection = MagicMock()
            mock_connection.__enter__.return_value = mock_connection
            mock_cursor = Mock()
            mock_connection.cursor.return_value = mock_cursor
            mock_cursor.fetchall.return_value = [('users', 'BASE TABLE'), ('orders', 'BASE TABLE')]
            with patch.object(server, '_get_connection', return_value=mock_connection):
                result = await server._list_tables()
                assert len(result) == 1
                assert "users (BASE TABLE)" in result[0].text
                assert "orders (BASE TABLE)" in result[0].text

    @pytest.mark.asyncio
    async def test_describe_table(self):
        """Test describing a table schema."""
        with patch.dict(os.environ, {
            'MSSQL_SERVER': 'testserver',
            'MSSQL_DATABASE': 'testdb'
        }):
            server = MSSQLServer()
            # Mock database connection and cursor (MagicMock so that
            # `with ... as conn` works and yields the same mock)
            mock_connection = MagicMock()
            mock_connection.__enter__.return_value = mock_connection
            mock_cursor = Mock()
            mock_connection.cursor.return_value = mock_cursor
            mock_cursor.fetchall.return_value = [
                ('id', 'int', 'NO', None, None),
                ('name', 'varchar', 'YES', None, 255)
            ]
            with patch.object(server, '_get_connection', return_value=mock_connection):
                result = await server._describe_table('users')
                assert len(result) == 1
                assert "Schema for table 'users'" in result[0].text
                assert "id | int | NO" in result[0].text
                assert "name | varchar | YES" in result[0].text


if __name__ == "__main__":
    pytest.main([__file__])
```
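Several tests in `tests/test_server.py` patch `_get_connection`, while the server enters the returned object with `with ... as conn:`. A plain `Mock` does not implement the context-manager protocol. `MagicMock` does, but its `__enter__` returns a fresh mock unless configured otherwise. The sketch below shows a shared factory that could replace the repeated setup. `make_mock_connection` is a hypothetical helper name and is not defined anywhere in the repository.

```python
from unittest.mock import MagicMock


def make_mock_connection(rows=None, description=None, rowcount=-1):
    """Return (connection, cursor) mocks usable in `with connection as conn:`.

    Hypothetical test helper; the argument names are illustrative.
    """
    cursor = MagicMock()
    cursor.fetchall.return_value = rows or []
    cursor.description = description
    cursor.rowcount = rowcount

    connection = MagicMock()
    connection.cursor.return_value = cursor
    # Bind `conn` in `with connection as conn:` to this same mock, so that
    # assertions such as connection.commit.assert_called_once() see the calls.
    connection.__enter__.return_value = connection
    return connection, cursor


connection, cursor = make_mock_connection(
    rows=[("John", 25)], description=[("name",), ("age",)]
)
with connection as conn:
    fetched = conn.cursor().fetchall()
print(fetched)
```

A test would then pass `connection` as the `return_value` of `patch.object(server, '_get_connection', ...)`.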
--------------------------------------------------------------------------------
/src/server.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
MSSQL MCP Server
A Model Context Protocol server that provides access to Microsoft SQL Server databases.
Enables Language Models to inspect table schemas and execute SQL queries.
"""
import asyncio
import logging
import os
import sys
import traceback
from typing import Any, Dict, List, Optional, Sequence
from urllib.parse import urlparse
import pyodbc
from dotenv import load_dotenv
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import (
Resource,
Tool,
TextContent,
ImageContent,
EmbeddedResource,
LoggingLevel
)
from pydantic import AnyUrl
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("mssql-mcp-server")
class MSSQLServer:
    def __init__(self):
        self.connection_string = self._build_connection_string()
        self.server = Server("mssql-mcp-server")
        self._setup_handlers()

    def _build_connection_string(self) -> str:
        """Build MSSQL connection string from environment variables."""
        driver = os.getenv("MSSQL_DRIVER", "{ODBC Driver 17 for SQL Server}")
        server = os.getenv("MSSQL_SERVER", "localhost")
        database = os.getenv("MSSQL_DATABASE", "")
        username = os.getenv("MSSQL_USER", "")
        password = os.getenv("MSSQL_PASSWORD", "")
        port = os.getenv("MSSQL_PORT", "1433")
        trust_cert = os.getenv("TrustServerCertificate", "yes")
        trusted_conn = os.getenv("Trusted_Connection", "no")
        if not all([server, database]):
            raise ValueError("MSSQL_SERVER and MSSQL_DATABASE must be set")
        conn_str = f"DRIVER={driver};SERVER={server},{port};DATABASE={database};"
        if username and password:
            conn_str += f"UID={username};PWD={password};"
        conn_str += f"TrustServerCertificate={trust_cert};Trusted_Connection={trusted_conn};"
        return conn_str

    def _get_connection(self):
        """Get database connection."""
        try:
            return pyodbc.connect(self.connection_string)
        except Exception as e:
            logger.error(f"Failed to connect to database: {e}")
            raise

    def _setup_handlers(self):
        """Set up MCP handlers."""

        @self.server.list_resources()
        async def list_resources() -> List[Resource]:
            """List all available database tables as resources."""
            try:
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    cursor.execute("""
                        SELECT TABLE_NAME, TABLE_TYPE
                        FROM INFORMATION_SCHEMA.TABLES
                        WHERE TABLE_TYPE IN ('BASE TABLE', 'VIEW')
                        ORDER BY TABLE_TYPE, TABLE_NAME
                    """)
                    tables = cursor.fetchall()
                    resources = []
                    for table in tables:
                        table_name = table[0]
                        table_type = table[1]
                        if table_type == 'BASE TABLE':
                            resources.append(Resource(
                                uri=AnyUrl(f"mssql://{table_name}/data"),
                                name=f"Table: {table_name}",
                                description=f"Data from {table_name} table",
                                mimeType="text/csv"
                            ))
                        else:  # VIEW
                            resources.append(Resource(
                                uri=AnyUrl(f"mssql://{table_name}/data"),
                                name=f"View: {table_name}",
                                description=f"Data from {table_name} view",
                                mimeType="text/csv"
                            ))
                    return resources
            except Exception as e:
                logger.error(f"Error listing resources: {e}")
                return []

        @self.server.read_resource()
        async def read_resource(uri: AnyUrl) -> str:
            """Read data from a specific table."""
            try:
                # Parse the URI to get table name
                parsed = urlparse(str(uri))
                if parsed.scheme != "mssql":
                    raise ValueError("Invalid URI scheme")
                table_name = parsed.netloc
                if not table_name:
                    raise ValueError("Table name not specified in URI")
                # Identifiers cannot be bound as parameters, so double any
                # closing bracket to keep the name inside the [...] quoting
                safe_name = table_name.replace("]", "]]")
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    # Get first 100 rows
                    cursor.execute(f"SELECT TOP 100 * FROM [{safe_name}]")
                    rows = cursor.fetchall()
                    if not rows:
                        return "No data found"
                    # Get column names
                    columns = [desc[0] for desc in cursor.description]
                    # Format as CSV
                    csv_data = ",".join(columns) + "\n"
                    for row in rows:
                        csv_data += ",".join(str(cell) if cell is not None else "" for cell in row) + "\n"
                    return csv_data
            except Exception as e:
                logger.error(f"Error reading resource {uri}: {e}")
                return f"Error: {str(e)}"

        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            """List available tools."""
            return [
Tool(
name="read_query",
description="Execute a SELECT query to read data from the database",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "SELECT SQL query to execute"
}
},
"required": ["query"]
}
),
Tool(
name="write_query",
description="Execute an INSERT, UPDATE, or DELETE query",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "SQL query to execute (INSERT, UPDATE, DELETE)"
}
},
"required": ["query"]
}
),
Tool(
name="list_tables",
description="List all tables in the database",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="describe_table",
description="Get schema information for a specific table",
inputSchema={
"type": "object",
"properties": {
"table_name": {
"type": "string",
"description": "Name of the table to describe"
}
},
"required": ["table_name"]
}
),
Tool(
name="create_table",
description="Create a new table in the database",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "CREATE TABLE SQL statement"
}
},
"required": ["query"]
}
),
Tool(
name="create_procedure",
description="Create a new stored procedure",
inputSchema={
"type": "object",
"properties": {
"procedure_script": {
"type": "string",
"description": "Complete T-SQL script to create the stored procedure (including CREATE PROCEDURE statement)"
}
},
"required": ["procedure_script"]
}
),
Tool(
name="modify_procedure",
description="Modify an existing stored procedure",
inputSchema={
"type": "object",
"properties": {
"procedure_script": {
"type": "string",
"description": "Complete T-SQL script to alter the stored procedure (using ALTER PROCEDURE statement)"
}
},
"required": ["procedure_script"]
}
),
Tool(
name="delete_procedure",
description="Delete a stored procedure",
inputSchema={
"type": "object",
"properties": {
"procedure_name": {
"type": "string",
"description": "Name of the stored procedure to delete"
}
},
"required": ["procedure_name"]
}
),
Tool(
name="list_procedures",
description="List all stored procedures in the database",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="describe_procedure",
description="Get detailed information about a stored procedure including its definition",
inputSchema={
"type": "object",
"properties": {
"procedure_name": {
"type": "string",
"description": "Name of the stored procedure to describe"
}
},
"required": ["procedure_name"]
}
),
Tool(
name="execute_procedure",
description="Execute a stored procedure with optional parameters",
inputSchema={
"type": "object",
"properties": {
"procedure_name": {
"type": "string",
"description": "Name of the stored procedure to execute"
},
"parameters": {
"type": "array",
"description": "Optional array of parameter values",
"items": {
"type": "string"
}
}
},
"required": ["procedure_name"]
}
),
Tool(
name="get_procedure_parameters",
description="Get parameter information for a stored procedure",
inputSchema={
"type": "object",
"properties": {
"procedure_name": {
"type": "string",
"description": "Name of the stored procedure to get parameters for"
}
},
"required": ["procedure_name"]
}
),
Tool(
name="list_views",
description="List all views in the database",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="describe_view",
description="Get detailed information about a view including its definition",
inputSchema={
"type": "object",
"properties": {
"view_name": {
"type": "string",
"description": "Name of the view to describe"
}
},
"required": ["view_name"]
}
),
Tool(
name="create_view",
description="Create a new view",
inputSchema={
"type": "object",
"properties": {
"view_script": {
"type": "string",
"description": "Complete T-SQL script to create the view (including CREATE VIEW statement)"
}
},
"required": ["view_script"]
}
),
Tool(
name="modify_view",
description="Modify an existing view",
inputSchema={
"type": "object",
"properties": {
"view_script": {
"type": "string",
"description": "Complete T-SQL script to alter the view (using ALTER VIEW statement)"
}
},
"required": ["view_script"]
}
),
Tool(
name="delete_view",
description="Delete a view",
inputSchema={
"type": "object",
"properties": {
"view_name": {
"type": "string",
"description": "Name of the view to delete"
}
},
"required": ["view_name"]
}
),
Tool(
name="list_indexes",
description="List all indexes in the database",
inputSchema={
"type": "object",
"properties": {
"table_name": {
"type": "string",
"description": "Optional table name to filter indexes for a specific table"
}
}
}
),
Tool(
name="describe_index",
description="Get detailed information about an index",
inputSchema={
"type": "object",
"properties": {
"index_name": {
"type": "string",
"description": "Name of the index to describe"
},
"table_name": {
"type": "string",
"description": "Name of the table the index belongs to"
}
},
"required": ["index_name", "table_name"]
}
),
Tool(
name="create_index",
description="Create a new index",
inputSchema={
"type": "object",
"properties": {
"index_script": {
"type": "string",
"description": "Complete T-SQL script to create the index (including CREATE INDEX statement)"
}
},
"required": ["index_script"]
}
),
Tool(
name="delete_index",
description="Delete an index",
inputSchema={
"type": "object",
"properties": {
"index_name": {
"type": "string",
"description": "Name of the index to delete"
},
"table_name": {
"type": "string",
"description": "Name of the table the index belongs to"
}
},
"required": ["index_name", "table_name"]
}
),
Tool(
name="list_schemas",
description="List all schemas in the database",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="list_all_objects",
description="List all database objects (tables, views, procedures, indexes) organized by schema",
inputSchema={
"type": "object",
"properties": {
"schema_name": {
"type": "string",
"description": "Optional schema name to filter objects for a specific schema"
}
}
}
)
]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
            """Execute tool calls."""
            try:
                if name == "read_query":
                    return await self._execute_read_query(arguments["query"])
                elif name == "write_query":
                    return await self._execute_write_query(arguments["query"])
                elif name == "list_tables":
                    return await self._list_tables()
                elif name == "describe_table":
                    return await self._describe_table(arguments["table_name"])
                elif name == "create_table":
                    return await self._create_table(arguments["query"])
                elif name == "create_procedure":
                    return await self._create_procedure(arguments["procedure_script"])
                elif name == "modify_procedure":
                    return await self._modify_procedure(arguments["procedure_script"])
                elif name == "delete_procedure":
                    return await self._delete_procedure(arguments["procedure_name"])
                elif name == "list_procedures":
                    return await self._list_procedures()
                elif name == "describe_procedure":
                    return await self._describe_procedure(arguments["procedure_name"])
                elif name == "execute_procedure":
                    return await self._execute_procedure(arguments["procedure_name"], arguments.get("parameters"))
                elif name == "get_procedure_parameters":
                    return await self._get_procedure_parameters(arguments["procedure_name"])
                elif name == "list_views":
                    return await self._list_views()
                elif name == "describe_view":
                    return await self._describe_view(arguments["view_name"])
                elif name == "create_view":
                    return await self._create_view(arguments["view_script"])
                elif name == "modify_view":
                    return await self._modify_view(arguments["view_script"])
                elif name == "delete_view":
                    return await self._delete_view(arguments["view_name"])
                elif name == "list_indexes":
                    return await self._list_indexes(arguments.get("table_name"))
                elif name == "describe_index":
                    return await self._describe_index(arguments["index_name"], arguments["table_name"])
                elif name == "create_index":
                    return await self._create_index(arguments["index_script"])
                elif name == "delete_index":
                    return await self._delete_index(arguments["index_name"], arguments["table_name"])
                elif name == "list_schemas":
                    return await self._list_schemas()
                elif name == "list_all_objects":
                    return await self._list_all_objects(arguments.get("schema_name"))
                else:
                    raise ValueError(f"Unknown tool: {name}")
            except Exception as e:
                logger.error(f"Error executing tool {name}: {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]

    async def _execute_read_query(self, query: str) -> List[TextContent]:
        """Execute a SELECT query."""
        if not query.strip().upper().startswith("SELECT"):
            raise ValueError("Only SELECT queries are allowed for read_query")
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query)
            rows = cursor.fetchall()
            if not rows:
                return [TextContent(type="text", text="No results found")]
            # Get column names
            columns = [desc[0] for desc in cursor.description]
            # Format as CSV
            csv_data = ",".join(columns) + "\n"
            for row in rows:
                csv_data += ",".join(str(cell) if cell is not None else "" for cell in row) + "\n"
            return [TextContent(type="text", text=csv_data)]

    async def _execute_write_query(self, query: str) -> List[TextContent]:
        """Execute an INSERT, UPDATE, DELETE, or stored procedure operation query."""
        query_upper = query.strip().upper()
        allowed_commands = ["INSERT", "UPDATE", "DELETE", "CREATE PROCEDURE", "ALTER PROCEDURE", "DROP PROCEDURE", "EXEC", "EXECUTE", "CREATE VIEW", "ALTER VIEW", "DROP VIEW", "CREATE INDEX", "DROP INDEX"]
        if not any(query_upper.startswith(cmd) for cmd in allowed_commands):
            raise ValueError("Only INSERT, UPDATE, DELETE, CREATE/ALTER/DROP PROCEDURE, CREATE/ALTER/DROP VIEW, CREATE/DROP INDEX, and EXEC queries are allowed for write_query")
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query)
            # For DDL operations, rowcount might not be meaningful
            if query_upper.startswith(("CREATE PROCEDURE", "ALTER PROCEDURE", "DROP PROCEDURE")):
                conn.commit()
                return [TextContent(type="text", text="Stored procedure operation executed successfully.")]
            elif query_upper.startswith(("CREATE VIEW", "ALTER VIEW", "DROP VIEW")):
                conn.commit()
                return [TextContent(type="text", text="View operation executed successfully.")]
            elif query_upper.startswith(("CREATE INDEX", "DROP INDEX")):
                conn.commit()
                return [TextContent(type="text", text="Index operation executed successfully.")]
            elif query_upper.startswith(("EXEC", "EXECUTE")):
                conn.commit()
                return [TextContent(type="text", text="Stored procedure executed successfully.")]
            else:
                affected_rows = cursor.rowcount
                conn.commit()
                return [TextContent(type="text", text=f"Query executed successfully. {affected_rows} rows affected.")]

    async def _list_tables(self) -> List[TextContent]:
        """List all tables in the database."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT TABLE_NAME, TABLE_TYPE
                FROM INFORMATION_SCHEMA.TABLES
                WHERE TABLE_TYPE = 'BASE TABLE'
                ORDER BY TABLE_NAME
            """)
            tables = cursor.fetchall()
            if not tables:
                return [TextContent(type="text", text="No tables found")]
            result = "Tables in database:\n"
            for table in tables:
                result += f"- {table[0]} ({table[1]})\n"
            return [TextContent(type="text", text=result)]

    async def _describe_table(self, table_name: str) -> List[TextContent]:
        """Get schema information for a table."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT
                    COLUMN_NAME,
                    DATA_TYPE,
                    IS_NULLABLE,
                    COLUMN_DEFAULT,
                    CHARACTER_MAXIMUM_LENGTH
                FROM INFORMATION_SCHEMA.COLUMNS
                WHERE TABLE_NAME = ?
                ORDER BY ORDINAL_POSITION
            """, table_name)
            columns = cursor.fetchall()
            if not columns:
                return [TextContent(type="text", text=f"Table '{table_name}' not found")]
            result = f"Schema for table '{table_name}':\n"
            result += "Column Name | Data Type | Nullable | Default | Max Length\n"
            result += "-" * 60 + "\n"
            for col in columns:
                max_len = str(col[4]) if col[4] else "N/A"
                result += f"{col[0]} | {col[1]} | {col[2]} | {col[3] or 'NULL'} | {max_len}\n"
            return [TextContent(type="text", text=result)]

    async def _create_table(self, query: str) -> List[TextContent]:
        """Create a new table."""
        if not query.strip().upper().startswith("CREATE TABLE"):
            raise ValueError("Only CREATE TABLE statements are allowed")
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query)
            conn.commit()
            return [TextContent(type="text", text="Table created successfully")]

    async def _create_procedure(self, procedure_script: str) -> List[TextContent]:
        """Create a new stored procedure."""
        if not procedure_script.strip().upper().startswith("CREATE PROCEDURE"):
            raise ValueError("Only CREATE PROCEDURE statements are allowed")
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(procedure_script)
            conn.commit()
            return [TextContent(type="text", text="Procedure created successfully")]

    async def _modify_procedure(self, procedure_script: str) -> List[TextContent]:
        """Modify an existing stored procedure."""
        if not procedure_script.strip().upper().startswith("ALTER PROCEDURE"):
            raise ValueError("Only ALTER PROCEDURE statements are allowed")
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(procedure_script)
            conn.commit()
            return [TextContent(type="text", text="Procedure modified successfully")]

    async def _delete_procedure(self, procedure_name: str) -> List[TextContent]:
        """Delete a stored procedure."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            # Object names cannot be bound as query parameters, so the name is
            # bracket-quoted, with any closing bracket doubled to keep it quoted
            safe_name = procedure_name.replace("]", "]]")
            cursor.execute(f"DROP PROCEDURE IF EXISTS [{safe_name}]")
            conn.commit()
            return [TextContent(type="text", text=f"Procedure '{procedure_name}' deleted successfully")]

    async def _list_procedures(self) -> List[TextContent]:
        """List all stored procedures in the database."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT
                    o.name AS ProcedureName,
                    o.create_date AS CreatedDate,
                    o.modify_date AS ModifiedDate,
                    CASE
                        WHEN EXISTS (
                            SELECT 1 FROM sys.parameters p
                            WHERE p.object_id = o.object_id
                        ) THEN 'Yes'
                        ELSE 'No'
                    END AS HasParameters
                FROM sys.objects o
                WHERE o.type = 'P'
                AND o.is_ms_shipped = 0 -- Exclude system procedures
                ORDER BY o.name
            """)
            procedures = cursor.fetchall()
            if not procedures:
                return [TextContent(type="text", text="No user-defined stored procedures found")]
            result = "Stored procedures in database:\n"
            result += "Name | Created | Modified | Has Parameters\n"
            result += "-" * 60 + "\n"
            for proc in procedures:
                created = proc[1].strftime("%Y-%m-%d") if proc[1] else "N/A"
                modified = proc[2].strftime("%Y-%m-%d") if proc[2] else "N/A"
                result += f"{proc[0]} | {created} | {modified} | {proc[3]}\n"
            return [TextContent(type="text", text=result)]

    async def _describe_procedure(self, procedure_name: str) -> List[TextContent]:
        """Get detailed information about a stored procedure including its definition."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT
                    OBJECT_DEFINITION(OBJECT_ID(?)) AS ProcedureDefinition
            """, procedure_name)
            row = cursor.fetchone()
            if not row or not row[0]:
                return [TextContent(type="text", text=f"Procedure '{procedure_name}' not found")]
            definition = row[0]
            result = f"Definition for procedure '{procedure_name}':\n"
            result += "=" * 50 + "\n"
            result += definition
            return [TextContent(type="text", text=result)]

    async def _execute_procedure(self, procedure_name: str, parameters: Optional[List[str]] = None) -> List[TextContent]:
        """Execute a stored procedure with optional parameters."""
        if parameters is None:
            parameters = []
        # The procedure name cannot be bound as a parameter; double any
        # closing bracket so the name stays inside the [...] quoting
        safe_name = procedure_name.replace("]", "]]")
        with self._get_connection() as conn:
            cursor = conn.cursor()
            # Build the EXEC statement
            if parameters:
                param_placeholders = ', '.join(['?' for _ in parameters])
                exec_statement = f"EXEC [{safe_name}] {param_placeholders}"
                cursor.execute(exec_statement, *parameters)
            else:
                cursor.execute(f"EXEC [{safe_name}]")
            # Try to fetch results if any
            try:
                rows = cursor.fetchall()
                if rows:
                    # Get column names if available
                    if cursor.description:
                        columns = [desc[0] for desc in cursor.description]
                        result = f"Procedure '{procedure_name}' executed successfully.\n\nResults:\n"
                        result += ",".join(columns) + "\n"
                        for row in rows:
                            result += ",".join(str(cell) if cell is not None else "" for cell in row) + "\n"
                        return [TextContent(type="text", text=result)]
                    else:
                        return [TextContent(type="text", text=f"Procedure '{procedure_name}' executed successfully. {len(rows)} rows returned.")]
                else:
                    return [TextContent(type="text", text=f"Procedure '{procedure_name}' executed successfully. No results returned.")]
            except Exception:
                # Some procedures don't return results
                return [TextContent(type="text", text=f"Procedure '{procedure_name}' executed successfully.")]

    async def _get_procedure_parameters(self, procedure_name: str) -> List[TextContent]:
        """Get parameter information for a stored procedure."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT
                    p.parameter_id,
                    p.name AS parameter_name,
                    TYPE_NAME(p.user_type_id) AS data_type,
                    p.max_length,
                    p.precision,
                    p.scale,
                    p.is_output,
                    p.has_default_value,
                    p.default_value
                FROM sys.parameters p
                INNER JOIN sys.objects o ON p.object_id = o.object_id
                WHERE o.name = ? AND o.type = 'P'
                ORDER BY p.parameter_id
            """, procedure_name)
            parameters = cursor.fetchall()
            if not parameters:
                return [TextContent(type="text", text=f"No parameters found for procedure '{procedure_name}' or procedure does not exist")]
            result = f"Parameters for procedure '{procedure_name}':\n"
            result += "ID | Name | Data Type | Length | Precision | Scale | Output | Has Default | Default Value\n"
            result += "-" * 90 + "\n"
            for param in parameters:
                param_id = param[0]
                name = param[1] or "(return value)"
                data_type = param[2]
                max_length = param[3] if param[3] != -1 else "MAX"
                precision = param[4] if param[4] > 0 else ""
                scale = param[5] if param[5] > 0 else ""
                is_output = "Yes" if param[6] else "No"
                has_default = "Yes" if param[7] else "No"
                default_value = param[8] if param[8] else ""
                result += f"{param_id} | {name} | {data_type} | {max_length} | {precision} | {scale} | {is_output} | {has_default} | {default_value}\n"
            return [TextContent(type="text", text=result)]

    async def _list_views(self) -> List[TextContent]:
        """List all views in the database."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT TABLE_NAME
                FROM INFORMATION_SCHEMA.TABLES
                WHERE TABLE_TYPE = 'VIEW'
                ORDER BY TABLE_NAME
            """)
            views = cursor.fetchall()
            if not views:
                return [TextContent(type="text", text="No views found")]
            result = "Views in database:\n"
            for view in views:
                result += f"- {view[0]}\n"
            return [TextContent(type="text", text=result)]

    async def _describe_view(self, view_name: str) -> List[TextContent]:
        """Get detailed information about a view including its definition."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT OBJECT_DEFINITION(OBJECT_ID(?)) AS ViewDefinition
            """, view_name)
            row = cursor.fetchone()
            if not row or not row[0]:
                return [TextContent(type="text", text=f"View '{view_name}' not found")]
            definition = row[0]
            result = f"Definition for view '{view_name}':\n"
            result += "=" * 50 + "\n"
            result += definition
            return [TextContent(type="text", text=result)]

    async def _create_view(self, view_script: str) -> List[TextContent]:
        """Create a new view."""
        if not view_script.strip().upper().startswith("CREATE VIEW"):
            raise ValueError("Only CREATE VIEW statements are allowed")
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(view_script)
            conn.commit()
            return [TextContent(type="text", text="View created successfully")]

    async def _modify_view(self, view_script: str) -> List[TextContent]:
        """Modify an existing view."""
        if not view_script.strip().upper().startswith("ALTER VIEW"):
            raise ValueError("Only ALTER VIEW statements are allowed")
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(view_script)
            conn.commit()
            return [TextContent(type="text", text="View modified successfully")]
    async def _delete_view(self, view_name: str) -> List[TextContent]:
        """Delete a view."""
        # Double any closing bracket so the name cannot break out of the [...] quoting.
        quoted_view = "[" + view_name.replace("]", "]]") + "]"
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(f"DROP VIEW IF EXISTS {quoted_view}")
            conn.commit()
            return [TextContent(type="text", text=f"View '{view_name}' deleted successfully")]
    async def _list_indexes(self, table_name: Optional[str] = None) -> List[TextContent]:
        """List indexes for one table, or for all user tables when no table is given."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            if table_name:
                cursor.execute("""
                    SELECT
                        i.name AS IndexName,
                        i.type_desc AS IndexType,
                        i.is_unique AS IsUnique,
                        i.is_primary_key AS IsPrimaryKey,
                        i.is_disabled AS IsDisabled
                    FROM sys.indexes i
                    INNER JOIN sys.objects o ON i.object_id = o.object_id
                    WHERE o.name = ? AND i.name IS NOT NULL
                    ORDER BY i.name
                """, table_name)
                result = f"Indexes for table '{table_name}':\n"
                result += "Name | Type | Unique | Primary Key | Disabled\n"
                result += "-" * 60 + "\n"
            else:
                cursor.execute("""
                    SELECT
                        OBJECT_NAME(i.object_id) AS TableName,
                        i.name AS IndexName,
                        i.type_desc AS IndexType,
                        i.is_unique AS IsUnique,
                        i.is_primary_key AS IsPrimaryKey,
                        i.is_disabled AS IsDisabled
                    FROM sys.indexes i
                    INNER JOIN sys.objects o ON i.object_id = o.object_id
                    WHERE o.type = 'U' AND i.name IS NOT NULL
                    ORDER BY OBJECT_NAME(i.object_id), i.name
                """)
                result = "Indexes in database:\n"
                result += "Table | Index Name | Type | Unique | Primary Key | Disabled\n"
                result += "-" * 80 + "\n"
            indexes = cursor.fetchall()
            if not indexes:
                return [TextContent(type="text", text="No indexes found")]
            for index in indexes:
                if table_name:
                    result += f"{index[0]} | {index[1]} | {index[2]} | {index[3]} | {index[4]}\n"
                else:
                    result += f"{index[0]} | {index[1]} | {index[2]} | {index[3]} | {index[4]} | {index[5]}\n"
            return [TextContent(type="text", text=result)]
    async def _describe_index(self, index_name: str, table_name: str) -> List[TextContent]:
        """Get detailed information about an index."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT
                    i.name AS IndexName,
                    i.type_desc AS IndexType,
                    i.is_unique AS IsUnique,
                    i.is_primary_key AS IsPrimaryKey,
                    i.is_disabled AS IsDisabled
                FROM sys.indexes i
                INNER JOIN sys.objects o ON i.object_id = o.object_id
                WHERE i.name = ? AND o.name = ?
            """, index_name, table_name)
            index = cursor.fetchone()
            if not index:
                return [TextContent(type="text", text=f"Index '{index_name}' not found in table '{table_name}'")]
            result = f"Index '{index_name}' in table '{table_name}':\n"
            result += "Name | Type | Unique | Primary Key | Disabled\n"
            result += "-" * 60 + "\n"
            result += f"{index[0]} | {index[1]} | {index[2]} | {index[3]} | {index[4]}\n"
            return [TextContent(type="text", text=result)]
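    async def _create_index(self, index_script: str) -> List[TextContent]:
        """Create a new index."""
        # Allow optional modifiers between CREATE and INDEX,
        # e.g. CREATE UNIQUE NONCLUSTERED INDEX.
        keywords = index_script.upper().split()[:5]
        modifiers = {"UNIQUE", "CLUSTERED", "NONCLUSTERED", "COLUMNSTORE"}
        rest = keywords[1:]
        while rest and rest[0] in modifiers:
            rest = rest[1:]
        if keywords[:1] != ["CREATE"] or not rest or not rest[0].startswith("INDEX"):
            raise ValueError("Only CREATE INDEX statements are allowed")
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(index_script)
            conn.commit()
            return [TextContent(type="text", text="Index created successfully")]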
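    async def _delete_index(self, index_name: str, table_name: str) -> List[TextContent]:
        """Delete an index."""
        # Double any closing bracket so neither name can break out of the [...] quoting.
        quoted_index = "[" + index_name.replace("]", "]]") + "]"
        quoted_table = "[" + table_name.replace("]", "]]") + "]"
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(f"DROP INDEX IF EXISTS {quoted_index} ON {quoted_table}")
            conn.commit()
            return [TextContent(type="text", text=f"Index '{index_name}' in table '{table_name}' deleted successfully")]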
    async def _list_schemas(self) -> List[TextContent]:
        """List all schemas in the database."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT SCHEMA_NAME(schema_id) AS SchemaName
                FROM sys.schemas
                ORDER BY SCHEMA_NAME(schema_id)
            """)
            schemas = cursor.fetchall()
            if not schemas:
                return [TextContent(type="text", text="No schemas found")]
            result = "Schemas in database:\n"
            for schema in schemas:
                result += f"- {schema[0]}\n"
            return [TextContent(type="text", text=result)]
    async def _list_all_objects(self, schema_name: Optional[str] = None) -> List[TextContent]:
        """List the objects recorded in sys.objects (tables, views, procedures, ...), optionally for one schema."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            if schema_name:
                cursor.execute("""
                    SELECT
                        OBJECT_NAME(object_id) AS ObjectName,
                        type_desc AS ObjectType
                    FROM sys.objects
                    WHERE SCHEMA_NAME(schema_id) = ?
                    ORDER BY OBJECT_NAME(object_id)
                """, schema_name)
                result = f"Objects in schema '{schema_name}':\n"
            else:
                cursor.execute("""
                    SELECT
                        SCHEMA_NAME(schema_id) AS SchemaName,
                        OBJECT_NAME(object_id) AS ObjectName,
                        type_desc AS ObjectType
                    FROM sys.objects
                    ORDER BY SCHEMA_NAME(schema_id), OBJECT_NAME(object_id)
                """)
                result = "Objects in database:\n"
            objects = cursor.fetchall()
            if not objects:
                return [TextContent(type="text", text="No objects found")]
            for obj in objects:
                # The schema-filtered query returns two columns and the unfiltered
                # one returns three, so join whatever columns the row has.
                result += "- " + " | ".join(str(value) for value in obj) + "\n"
            return [TextContent(type="text", text=result)]
    async def run(self):
        """Run the MCP server."""
        try:
            logger.info("Starting MCP server...")
            async with stdio_server() as (read_stream, write_stream):
                logger.info("Server streams established")
                await self.server.run(
                    read_stream,
                    write_stream,
                    InitializationOptions(
                        server_name="mssql-mcp-server",
                        server_version="1.0.0",
                        capabilities=self.server.get_capabilities(
                            notification_options=NotificationOptions(),
                            experimental_capabilities=None,
                        ),
                    ),
                )
        except Exception as e:
            logger.error(f"Error in server run: {e}")
            logger.error(f"Traceback: {traceback.format_exc()}")
            raise
async def main():
    """Main entry point."""
    try:
        logger.info("Initializing MSSQL MCP Server...")
        server = MSSQLServer()
        logger.info("Server initialized, starting...")
        await server.run()
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error(f"Server error: {e}")
        logger.error(f"Full traceback: {traceback.format_exc()}")
        sys.exit(1)
if __name__ == "__main__":
    asyncio.run(main())
```
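The `_delete_view` and `_delete_index` handlers above place a caller-supplied name inside a bracket-quoted identifier, because `DROP` statements cannot take the name as a `?` parameter. The following is a minimal sketch of how such a name could be quoted in one place. `quote_identifier` and `drop_view_statement` are hypothetical helpers for illustration and are not part of this repository.

```python
def quote_identifier(name: str) -> str:
    """Wrap a single T-SQL identifier in square brackets, doubling any ']'.

    Hypothetical helper, not part of server.py.
    """
    if not name:
        raise ValueError("identifier must not be empty")
    if len(name) > 128:
        # SQL Server identifiers (sysname) hold at most 128 characters.
        raise ValueError("identifier is longer than 128 characters")
    # Inside [...] quoting, a literal ']' is written as ']]'.
    return "[" + name.replace("]", "]]") + "]"


def drop_view_statement(view_name: str) -> str:
    """Build a DROP VIEW statement from an untrusted view name."""
    return f"DROP VIEW IF EXISTS {quote_identifier(view_name)}"


if __name__ == "__main__":
    print(drop_view_statement("Orders"))
    print(drop_view_statement("Sales]; DROP TABLE Orders; --"))
```

With this quoting, the second call yields a statement whose whole input stays inside one bracketed identifier rather than ending the identifier early. A schema-qualified name such as `dbo.Orders` would need each part quoted separately, since this sketch treats its argument as a single identifier.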