# Directory Structure ``` ├── .dockerignore ├── .github │ └── workflows │ ├── ci.yml │ ├── publish.yml │ ├── release.yml │ └── security.yml ├── .gitignore ├── close_fixed_issues.sh ├── docker-compose.example.yml ├── docker-compose.yml ├── Dockerfile ├── LICENSE ├── Makefile ├── pyproject.toml ├── pytest.ini ├── README.md ├── requirements-dev.txt ├── requirements.txt ├── run_tests.py ├── SECURITY.md ├── src │ └── mssql_mcp_server │ ├── __init__.py │ ├── __main__.py │ └── server.py ├── test_connection.py ├── tests │ ├── conftest.py │ ├── test_config.py │ ├── test_error_handling.py │ ├── test_integration.py │ ├── test_performance.py │ ├── test_security.py │ └── test_server.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.dockerignore: -------------------------------------------------------------------------------- ``` 1 | # Git 2 | .git 3 | .gitignore 4 | 5 | # Virtual Environment 6 | venv/ 7 | __pycache__/ 8 | *.py[cod] 9 | *$py.class 10 | *.so 11 | .Python 12 | .pytest_cache/ 13 | .coverage 14 | htmlcov/ 15 | 16 | # Logs 17 | *.log 18 | 19 | # Docker 20 | Dockerfile 21 | .dockerignore 22 | docker-compose.yml 23 | 24 | # Editor directories and files 25 | .idea/ 26 | .vscode/ 27 | *.swp 28 | *.swo ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` 1 | # Byte-compiled / optimized / DLL files 2 | __pycache__/ 3 | *.py[cod] 4 | *$py.class 5 | 6 | # C extensions 7 | *.so 8 | 9 | # Distribution / packaging 10 | .Python 11 | build/ 12 | develop-eggs/ 13 | dist/ 14 | downloads/ 15 | eggs/ 16 | .eggs/ 17 | lib/ 18 | lib64/ 19 | parts/ 20 | sdist/ 21 | var/ 22 | wheels/ 23 | *.egg-info/ 24 | .installed.cfg 25 | *.egg 26 | MANIFEST 27 | 28 | # PyInstaller 29 | *.manifest 30 | *.spec 31 | 32 | # Unit test / coverage reports 33 | htmlcov/ 34 | .tox/ 35 | .coverage 36 | .coverage.* 37 | .cache 38 | nosetests.xml 39 | coverage.xml 40 | *.cover 41 | .hypothesis/ 42 | .pytest_cache/ 43 | 44 | # Virtual environments 45 | venv/ 46 | ENV/ 47 | env/ 48 | .venv 49 | 50 | # IDEs 51 | .vscode/ 52 | .idea/ 53 | *.swp 54 | *.swo 55 | *~ 56 | 57 | # OS 58 | .DS_Store 59 | .DS_Store? 60 | ._* 61 | .Spotlight-V100 62 | .Trashes 63 | Thumbs.db 64 | ehthumbs.db 65 | 66 | # Environment variables 67 | .env 68 | *.env 69 | !.env.example 70 | 71 | # Logs 72 | *.log 73 | 74 | # Database 75 | *.db 76 | *.sqlite 77 | 78 | # MCP specific 79 | mcp-workspace/ 80 | 81 | # Claude local settings 82 | .claude/ ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | # Microsoft SQL Server MCP Server 2 | 3 | [](https://pypi.org/project/microsoft_sql_server_mcp/) 4 | [](https://opensource.org/licenses/MIT) 5 | 6 | <a href="https://glama.ai/mcp/servers/29cpe19k30"> 7 | <img width="380" height="200" src="https://glama.ai/mcp/servers/29cpe19k30/badge" alt="Microsoft SQL Server MCP server" /> 8 | </a> 9 | 10 | A Model Context Protocol (MCP) server for secure SQL Server database access through Claude Desktop. 11 | 12 | ## Features 13 | 14 | - 🔍 List database tables 15 | - 📊 Execute SQL queries (SELECT, INSERT, UPDATE, DELETE) 16 | - 🔐 Multiple authentication methods (SQL, Windows, Azure AD) 17 | - 🏢 LocalDB and Azure SQL support 18 | - 🔌 Custom port configuration 19 | 20 | ## Quick Start 21 | 22 | ### Install with Claude Desktop 23 | 24 | Add to your `claude_desktop_config.json`: 25 | 26 | ```json 27 | { 28 | "mcpServers": { 29 | "mssql": { 30 | "command": "uvx", 31 | "args": ["microsoft_sql_server_mcp"], 32 | "env": { 33 | "MSSQL_SERVER": "localhost", 34 | "MSSQL_DATABASE": "your_database", 35 | "MSSQL_USER": "your_username", 36 | "MSSQL_PASSWORD": "your_password" 37 | } 38 | } 39 | } 40 | } 41 | ``` 42 | 43 | ## Configuration 44 | 45 | ### Basic SQL Authentication 46 | ```bash 47 | MSSQL_SERVER=localhost # Required 48 | MSSQL_DATABASE=your_database # Required 49 | MSSQL_USER=your_username # Required for SQL auth 50 | MSSQL_PASSWORD=your_password # Required for SQL auth 51 | ``` 52 | 53 | ### Windows Authentication 54 | ```bash 55 | MSSQL_SERVER=localhost 56 | MSSQL_DATABASE=your_database 57 | MSSQL_WINDOWS_AUTH=true # Use Windows credentials 58 | ``` 59 | 60 | ### Azure SQL Database 61 | ```bash 62 | MSSQL_SERVER=your-server.database.windows.net 63 | MSSQL_DATABASE=your_database 64 | MSSQL_USER=your_username 65 | MSSQL_PASSWORD=your_password 66 | # Encryption is automatic for Azure 67 | ``` 68 | 69 | ### Optional Settings 70 | ```bash 71 | MSSQL_PORT=1433 # Custom port (default: 1433) 72 | MSSQL_ENCRYPT=true # Force encryption 73 | ``` 74 | 75 | ## Alternative Installation Methods 76 | 77 | ### Using pip 78 | ```bash 79 | pip install microsoft_sql_server_mcp 80 | ``` 81 | 82 | Then in `claude_desktop_config.json`: 83 | ```json 84 | { 85 | "mcpServers": { 86 | "mssql": { 87 | "command": "python", 88 | "args": ["-m", "mssql_mcp_server"], 89 | "env": { ... } 90 | } 91 | } 92 | } 93 | ``` 94 | 95 | ### Development 96 | ```bash 97 | git clone https://github.com/RichardHan/mssql_mcp_server.git 98 | cd mssql_mcp_server 99 | pip install -e . 100 | ``` 101 | 102 | ## Security 103 | 104 | - Create a dedicated SQL user with minimal permissions 105 | - Never use admin/sa accounts 106 | - Use Windows Authentication when possible 107 | - Enable encryption for sensitive data 108 | 109 | ## License 110 | 111 | MIT ``` -------------------------------------------------------------------------------- /SECURITY.md: -------------------------------------------------------------------------------- ```markdown 1 | # Security Policy 2 | 3 | ## Reporting Security Issues 4 | 5 | If you discover a security vulnerability, please email [email protected] instead of using the public issue tracker. 6 | 7 | ## Security Best Practices 8 | 9 | When using this MCP server: 10 | 11 | 1. **Database User**: Create a dedicated SQL user with minimal permissions 12 | 2. **Never use sa/admin accounts** in production 13 | 3. **Use Windows Authentication** when possible 14 | 4. **Enable encryption** for sensitive data: `MSSQL_ENCRYPT=true` 15 | 5. **Restrict permissions** to only necessary tables and operations 16 | 17 | ## SQL Injection Protection 18 | 19 | This server includes built-in protection against SQL injection: 20 | - Table names are validated with strict regex patterns 21 | - All identifiers are properly escaped 22 | - User input is parameterized where possible 23 | 24 | ## Example: Minimal Permissions 25 | 26 | ```sql 27 | -- Create a restricted user 28 | CREATE LOGIN mcp_user WITH PASSWORD = 'StrongPassword123!'; 29 | CREATE USER mcp_user FOR LOGIN mcp_user; 30 | 31 | -- Grant only necessary permissions 32 | GRANT SELECT ON Schema.TableName TO mcp_user; 33 | GRANT INSERT, UPDATE ON Schema.AuditLog TO mcp_user; 34 | ``` ``` -------------------------------------------------------------------------------- /requirements.txt: -------------------------------------------------------------------------------- ``` 1 | mcp>=1.0.0 2 | pymssql>=2.2.7 3 | ``` -------------------------------------------------------------------------------- /pytest.ini: -------------------------------------------------------------------------------- ``` 1 | [pytest] 2 | asyncio_mode = auto 3 | asyncio_default_fixture_loop_scope = function 4 | testpaths = tests 5 | python_files = test_*.py ``` -------------------------------------------------------------------------------- /src/mssql_mcp_server/__main__.py: -------------------------------------------------------------------------------- ```python 1 | """Entry point for running the module with python -m mssql_mcp_server.""" 2 | from . import main 3 | 4 | if __name__ == "__main__": 5 | main() ``` -------------------------------------------------------------------------------- /src/mssql_mcp_server/__init__.py: -------------------------------------------------------------------------------- ```python 1 | from . import server 2 | import asyncio 3 | 4 | def main(): 5 | """Main entry point for the package.""" 6 | asyncio.run(server.main()) 7 | 8 | # Expose important items at package level 9 | __all__ = ['main', 'server'] ``` -------------------------------------------------------------------------------- /requirements-dev.txt: -------------------------------------------------------------------------------- ``` 1 | # Testing 2 | pytest>=7.0.0 3 | pytest-asyncio>=0.23.0 4 | pytest-cov>=4.1.0 5 | pytest-timeout>=2.1.0 6 | pytest-xdist>=3.3.0 7 | pytest-mock>=3.11.0 8 | 9 | # Code Quality 10 | black>=23.0.0 11 | isort>=5.12.0 12 | mypy>=1.0.0 13 | ruff>=0.0.280 14 | 15 | # Security 16 | safety>=2.3.0 17 | bandit>=1.7.0 18 | pip-audit>=2.6.0 19 | 20 | # Performance Testing 21 | psutil>=5.9.0 22 | 23 | # Build and Release 24 | build>=1.0.0 25 | twine>=4.0.0 26 | ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile 1 | FROM python:3.11-slim 2 | 3 | WORKDIR /app 4 | 5 | # Install system dependencies for pymssql 6 | RUN apt-get update && apt-get install -y \ 7 | freetds-dev \ 8 | && rm -rf /var/lib/apt/lists/* 9 | 10 | # Copy requirements 11 | COPY requirements.txt . 12 | 13 | # Install Python dependencies 14 | RUN pip install --no-cache-dir -r requirements.txt 15 | 16 | # Copy project files 17 | COPY . . 18 | 19 | # Run the MCP server 20 | CMD ["python", "-m", "mssql_mcp_server"] ``` -------------------------------------------------------------------------------- /docker-compose.example.yml: -------------------------------------------------------------------------------- ```yaml 1 | version: '3.8' 2 | 3 | services: 4 | mssql-mcp-server: 5 | build: . 6 | environment: 7 | - MSSQL_SERVER=sqlserver # Use service name for internal Docker network 8 | - MSSQL_DATABASE=TestDB 9 | - MSSQL_USER=sa 10 | - MSSQL_PASSWORD=YourStrong@Passw0rd 11 | - MSSQL_PORT=1433 12 | - MSSQL_ENCRYPT=false 13 | depends_on: 14 | - sqlserver 15 | stdin_open: true 16 | tty: true 17 | 18 | # Example SQL Server for testing 19 | sqlserver: 20 | image: mcr.microsoft.com/mssql/server:2022-latest 21 | environment: 22 | - ACCEPT_EULA=Y 23 | - SA_PASSWORD=YourStrong@Passw0rd 24 | - MSSQL_PID=Developer 25 | ports: 26 | - "1433:1433" 27 | volumes: 28 | - sqlserver-data:/var/opt/mssql 29 | 30 | volumes: 31 | sqlserver-data: ``` -------------------------------------------------------------------------------- /docker-compose.yml: -------------------------------------------------------------------------------- ```yaml 1 | version: "3.8" 2 | 3 | services: 4 | # SQL Server 5 | mssql: 6 | image: mcr.microsoft.com/mssql/server:2019-latest 7 | platform: linux/amd64 8 | environment: 9 | - ACCEPT_EULA=Y 10 | - MSSQL_SA_PASSWORD=${MSSQL_PASSWORD:-StrongPassword123!} 11 | ports: 12 | - "${HOST_SQL_PORT:-1434}:1433" 13 | healthcheck: 14 | test: /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P "${MSSQL_PASSWORD:-StrongPassword123!}" -Q "SELECT 1" || exit 1 15 | interval: 10s 16 | timeout: 3s 17 | retries: 10 18 | start_period: 10s 19 | volumes: 20 | - mssql_data:/var/opt/mssql 21 | mem_limit: ${SQL_MEMORY_LIMIT:-2g} 22 | 23 | # MCP Server 24 | mcp_server: 25 | build: 26 | context: . 27 | dockerfile: Dockerfile 28 | depends_on: 29 | mssql: 30 | condition: service_healthy 31 | environment: 32 | - MSSQL_SERVER=${MSSQL_SERVER:-mssql} 33 | - MSSQL_PORT=${MSSQL_PORT:-1433} 34 | - MSSQL_USER=${MSSQL_USER:-sa} 35 | - MSSQL_PASSWORD=${MSSQL_PASSWORD:-StrongPassword123!} 36 | - MSSQL_DATABASE=${MSSQL_DATABASE:-master} 37 | volumes: 38 | - .:/app 39 | 40 | volumes: 41 | mssql_data: 42 | ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml 1 | [project] 2 | name = "microsoft_sql_server_mcp" 3 | version = "0.1.0" 4 | description = "A Model Context Protocol (MCP) server that enables secure interaction with Microsoft SQL Server databases." 5 | readme = "README.md" 6 | requires-python = ">=3.11" 7 | authors = [ 8 | {name = "Richard Han", email = "[email protected]"} 9 | ] 10 | license = {text = "MIT"} 11 | keywords = ["mcp", "mssql", "sql-server", "database", "ai"] 12 | classifiers = [ 13 | "Development Status :: 4 - Beta", 14 | "Intended Audience :: Developers", 15 | "License :: OSI Approved :: MIT License", 16 | "Programming Language :: Python :: 3", 17 | "Programming Language :: Python :: 3.11", 18 | "Programming Language :: Python :: 3.12", 19 | ] 20 | dependencies = [ 21 | "mcp>=1.0.0", 22 | "pymssql>=2.2.8", 23 | ] 24 | 25 | [tool.mcp] 26 | system_dependencies.darwin = ["freetds"] 27 | system_dependencies.linux = ["freetds-dev"] 28 | system_dependencies.win32 = [] 29 | 30 | [build-system] 31 | requires = ["hatchling"] 32 | build-backend = "hatchling.build" 33 | 34 | [project.scripts] 35 | mssql_mcp_server = "mssql_mcp_server:main" 36 | 37 | [tool.hatch.build.targets.wheel] 38 | packages = ["src/mssql_mcp_server"] 39 | ``` -------------------------------------------------------------------------------- /test_connection.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python 2 | """Test SQL Server connection using the same configuration as the MCP server.""" 3 | 4 | import os 5 | import sys 6 | import pymssql 7 | 8 | # Add src to path to import our server module 9 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src')) 10 | 11 | from mssql_mcp_server.server import get_db_config 12 | 13 | try: 14 | print("Loading database configuration from environment variables...") 15 | config = get_db_config() 16 | 17 | # Mask sensitive information for display 18 | display_config = config.copy() 19 | if 'password' in display_config: 20 | display_config['password'] = '***' 21 | print(f"Configuration: {display_config}") 22 | 23 | print("\nAttempting to connect to SQL Server...") 24 | conn = pymssql.connect(**config) 25 | cursor = conn.cursor() 26 | print("Connection successful!") 27 | 28 | print("\nTesting query execution...") 29 | cursor.execute("SELECT TOP 5 TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'") 30 | rows = cursor.fetchall() 31 | print(f"Found {len(rows)} tables:") 32 | for row in rows: 33 | print(f" - {row[0]}") 34 | 35 | cursor.close() 36 | conn.close() 37 | print("\nConnection test completed successfully!") 38 | except Exception as e: 39 | print(f"Error: {str(e)}") 40 | import traceback 41 | traceback.print_exc() 42 | ``` -------------------------------------------------------------------------------- /tests/conftest.py: -------------------------------------------------------------------------------- ```python 1 | # tests/conftest.py 2 | import pytest 3 | import os 4 | import pymssql 5 | 6 | @pytest.fixture(scope="session") 7 | def mssql_connection(): 8 | """Create a test database connection.""" 9 | try: 10 | connection = pymssql.connect( 11 | server=os.getenv("MSSQL_SERVER", "localhost"), 12 | user=os.getenv("MSSQL_USER", "sa"), 13 | password=os.getenv("MSSQL_PASSWORD", "testpassword"), 14 | database=os.getenv("MSSQL_DATABASE", "test_db") 15 | ) 16 | 17 | # Create a test table 18 | cursor = connection.cursor() 19 | cursor.execute(""" 20 | IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'test_table') 21 | CREATE TABLE test_table ( 22 | id INT IDENTITY(1,1) PRIMARY KEY, 23 | name VARCHAR(255), 24 | value INT 25 | ) 26 | """) 27 | connection.commit() 28 | 29 | yield connection 30 | 31 | # Cleanup 32 | cursor.execute("DROP TABLE IF EXISTS test_table") 33 | connection.commit() 34 | cursor.close() 35 | connection.close() 36 | 37 | except pymssql.Error as e: 38 | pytest.fail(f"Failed to connect to SQL Server: {e}") 39 | 40 | @pytest.fixture(scope="session") 41 | def mssql_cursor(mssql_connection): 42 | """Create a test cursor.""" 43 | cursor = mssql_connection.cursor() 44 | yield cursor 45 | cursor.close() ``` -------------------------------------------------------------------------------- /.github/workflows/security.yml: -------------------------------------------------------------------------------- ```yaml 1 | name: Security Scan 2 | 3 | on: 4 | schedule: 5 | - cron: '0 0 * * 1' # Weekly on Monday 6 | push: 7 | branches: [ main ] 8 | pull_request: 9 | branches: [ main ] 10 | workflow_dispatch: 11 | 12 | permissions: 13 | contents: read 14 | security-events: write 15 | 16 | jobs: 17 | dependency-check: 18 | name: Dependency Security Check 19 | runs-on: ubuntu-latest 20 | 21 | steps: 22 | - uses: actions/checkout@v4 23 | 24 | - name: Set up Python 25 | uses: actions/setup-python@v5 26 | with: 27 | python-version: '3.11' 28 | 29 | - name: Install dependencies 30 | run: | 31 | python -m pip install --upgrade pip 32 | pip install safety pip-audit 33 | pip install -r requirements.txt 34 | 35 | - name: Run Safety check 36 | run: | 37 | safety check --json --output safety-report.json || true 38 | 39 | - name: Run pip-audit 40 | run: | 41 | pip-audit --format json --output pip-audit-report.json || true 42 | 43 | - name: Upload security reports 44 | uses: actions/upload-artifact@v4 45 | if: always() 46 | with: 47 | name: dependency-security-reports 48 | path: | 49 | safety-report.json 50 | pip-audit-report.json 51 | 52 | codeql-analysis: 53 | name: CodeQL Analysis 54 | runs-on: ubuntu-latest 55 | 56 | steps: 57 | - uses: actions/checkout@v4 58 | 59 | - name: Initialize CodeQL 60 | uses: github/codeql-action/init@v3 61 | with: 62 | languages: python 63 | 64 | - name: Autobuild 65 | uses: github/codeql-action/autobuild@v3 66 | 67 | - name: Perform CodeQL Analysis 68 | uses: github/codeql-action/analyze@v3 ``` -------------------------------------------------------------------------------- /tests/test_server.py: -------------------------------------------------------------------------------- ```python 1 | import pytest 2 | from mssql_mcp_server.server import app, list_tools, list_resources, read_resource, call_tool 3 | from pydantic import AnyUrl 4 | 5 | def test_server_initialization(): 6 | """Test that the server initializes correctly.""" 7 | assert app.name == "mssql_mcp_server" 8 | 9 | @pytest.mark.asyncio 10 | async def test_list_tools(): 11 | """Test that list_tools returns expected tools.""" 12 | tools = await list_tools() 13 | assert len(tools) == 1 14 | assert tools[0].name == "execute_sql" 15 | assert "query" in tools[0].inputSchema["properties"] 16 | 17 | @pytest.mark.asyncio 18 | async def test_call_tool_invalid_name(): 19 | """Test calling a tool with an invalid name.""" 20 | with pytest.raises(ValueError, match="Unknown tool"): 21 | await call_tool("invalid_tool", {}) 22 | 23 | @pytest.mark.asyncio 24 | async def test_call_tool_missing_query(): 25 | """Test calling execute_sql without a query.""" 26 | with pytest.raises(ValueError, match="Query is required"): 27 | await call_tool("execute_sql", {}) 28 | 29 | # Skip database-dependent tests if no database connection 30 | @pytest.mark.asyncio 31 | @pytest.mark.skipif( 32 | not all([ 33 | pytest.importorskip("pymssql"), 34 | pytest.importorskip("mssql_mcp_server") 35 | ]), 36 | reason="SQL Server connection not available" 37 | ) 38 | async def test_list_resources(): 39 | """Test listing resources (requires database connection).""" 40 | try: 41 | resources = await list_resources() 42 | assert isinstance(resources, list) 43 | except ValueError as e: 44 | if "Missing required database configuration" in str(e): 45 | pytest.skip("Database configuration not available") 46 | raise 47 | ``` -------------------------------------------------------------------------------- /.github/workflows/release.yml: -------------------------------------------------------------------------------- ```yaml 1 | name: Create Release 2 | 3 | on: 4 | push: 5 | tags: 6 | - 'v*' # Trigger on version tags like v1.0.0 7 | workflow_dispatch: 8 | inputs: 9 | version: 10 | description: 'Version to release (e.g., 1.0.0)' 11 | required: true 12 | type: string 13 | 14 | jobs: 15 | create-release: 16 | name: Create GitHub Release 17 | runs-on: ubuntu-latest 18 | permissions: 19 | contents: write 20 | 21 | steps: 22 | - uses: actions/checkout@v4 23 | with: 24 | fetch-depth: 0 # Get all history for changelog 25 | 26 | - name: Set up Python 27 | uses: actions/setup-python@v5 28 | with: 29 | python-version: '3.11' 30 | 31 | - name: Generate changelog 32 | id: changelog 33 | run: | 34 | # Generate changelog from git history 35 | echo "## What's Changed" > RELEASE_NOTES.md 36 | echo "" >> RELEASE_NOTES.md 37 | 38 | # Get commits since last tag 39 | LAST_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "") 40 | if [ -z "$LAST_TAG" ]; then 41 | git log --pretty=format:"* %s (%h)" >> RELEASE_NOTES.md 42 | else 43 | git log ${LAST_TAG}..HEAD --pretty=format:"* %s (%h)" >> RELEASE_NOTES.md 44 | fi 45 | 46 | echo "" >> RELEASE_NOTES.md 47 | echo "**Full Changelog**: https://github.com/${{ github.repository }}/compare/${LAST_TAG}...v${{ github.event.inputs.version || github.ref_name }}" >> RELEASE_NOTES.md 48 | 49 | - name: Create Release 50 | env: 51 | GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} 52 | run: | 53 | VERSION="${{ github.event.inputs.version || github.ref_name }}" 54 | VERSION="${VERSION#v}" # Remove 'v' prefix if present 55 | 56 | gh release create "v${VERSION}" \ 57 | --title "Release v${VERSION}" \ 58 | --notes-file RELEASE_NOTES.md \ 59 | --draft ``` -------------------------------------------------------------------------------- /.github/workflows/publish.yml: -------------------------------------------------------------------------------- ```yaml 1 | name: Publish to PyPI 2 | 3 | on: 4 | release: 5 | types: [published] 6 | workflow_dispatch: 7 | inputs: 8 | test_pypi: 9 | description: 'Publish to Test PyPI first' 10 | required: false 11 | default: true 12 | type: boolean 13 | 14 | jobs: 15 | build: 16 | name: Build distribution packages 17 | runs-on: ubuntu-latest 18 | 19 | steps: 20 | - uses: actions/checkout@v4 21 | 22 | - name: Set up Python 23 | uses: actions/setup-python@v5 24 | with: 25 | python-version: '3.11' 26 | 27 | - name: Install build dependencies 28 | run: | 29 | python -m pip install --upgrade pip 30 | pip install hatch 31 | 32 | - name: Build package 33 | run: hatch build 34 | 35 | - name: Store the distribution packages 36 | uses: actions/upload-artifact@v4 37 | with: 38 | name: python-package-distributions 39 | path: dist/ 40 | 41 | publish-to-test-pypi: 42 | name: Publish to Test PyPI 43 | if: github.event_name == 'workflow_dispatch' && github.event.inputs.test_pypi == 'true' 44 | needs: build 45 | runs-on: ubuntu-latest 46 | 47 | environment: 48 | name: test-pypi 49 | url: https://test.pypi.org/p/microsoft_sql_server_mcp 50 | 51 | permissions: 52 | id-token: write # IMPORTANT: mandatory for trusted publishing 53 | 54 | steps: 55 | - name: Download all the dists 56 | uses: actions/download-artifact@v4 57 | with: 58 | name: python-package-distributions 59 | path: dist/ 60 | 61 | - name: Publish to Test PyPI 62 | uses: pypa/gh-action-pypi-publish@release/v1 63 | with: 64 | repository-url: https://test.pypi.org/legacy/ 65 | skip-existing: true 66 | 67 | publish-to-pypi: 68 | name: Publish to PyPI 69 | if: github.event_name == 'release' || (github.event_name == 'workflow_dispatch' && github.event.inputs.test_pypi == 'false') 70 | needs: build 71 | runs-on: ubuntu-latest 72 | 73 | environment: 74 | name: pypi 75 | url: https://pypi.org/p/microsoft_sql_server_mcp 76 | 77 | permissions: 78 | id-token: write # IMPORTANT: mandatory for trusted publishing 79 | 80 | steps: 81 | - name: Download all the dists 82 | uses: actions/download-artifact@v4 83 | with: 84 | name: python-package-distributions 85 | path: dist/ 86 | 87 | - name: Publish to PyPI 88 | uses: pypa/gh-action-pypi-publish@release/v1 ``` -------------------------------------------------------------------------------- /.github/workflows/ci.yml: -------------------------------------------------------------------------------- ```yaml 1 | name: CI/CD Pipeline 2 | 3 | on: 4 | push: 5 | branches: [ main, develop ] 6 | pull_request: 7 | branches: [ main ] 8 | workflow_dispatch: 9 | 10 | jobs: 11 | test: 12 | name: Test Python ${{ matrix.python-version }} 13 | runs-on: ${{ matrix.os }} 14 | strategy: 15 | fail-fast: false 16 | matrix: 17 | os: [ubuntu-latest, windows-latest, macos-latest] 18 | python-version: ['3.11', '3.12'] 19 | 20 | steps: 21 | - uses: actions/checkout@v4 22 | 23 | - name: Set up Python ${{ matrix.python-version }} 24 | uses: actions/setup-python@v5 25 | with: 26 | python-version: ${{ matrix.python-version }} 27 | 28 | - name: Install system dependencies (Ubuntu) 29 | if: matrix.os == 'ubuntu-latest' 30 | run: | 31 | sudo apt-get update 32 | sudo apt-get install -y freetds-dev 33 | 34 | - name: Install system dependencies (macOS) 35 | if: matrix.os == 'macos-latest' 36 | run: | 37 | brew install freetds 38 | 39 | - name: Install dependencies 40 | run: | 41 | python -m pip install --upgrade pip 42 | pip install -r requirements.txt 43 | pip install -r requirements-dev.txt 44 | pip install -e . 45 | 46 | - name: Lint with ruff 47 | run: | 48 | pip install ruff 49 | ruff check src tests 50 | 51 | - name: Type check with mypy 52 | run: | 53 | pip install mypy 54 | mypy src --ignore-missing-imports 55 | 56 | - name: Test with pytest 57 | run: | 58 | pytest tests -v --tb=short 59 | env: 60 | MSSQL_SERVER: localhost 61 | MSSQL_USER: test 62 | MSSQL_PASSWORD: test 63 | MSSQL_DATABASE: test 64 | 65 | - name: Check package build 66 | run: | 67 | pip install hatch 68 | hatch build 69 | ls -la dist/ 70 | 71 | security-scan: 72 | name: Security Scan 73 | runs-on: ubuntu-latest 74 | 75 | steps: 76 | - uses: actions/checkout@v4 77 | 78 | - name: Set up Python 79 | uses: actions/setup-python@v5 80 | with: 81 | python-version: '3.11' 82 | 83 | - name: Install dependencies 84 | run: | 85 | python -m pip install --upgrade pip 86 | pip install bandit[toml] safety 87 | 88 | - name: Run Bandit security scan 89 | run: bandit -r src -f json -o bandit-report.json || true 90 | 91 | - name: Run Safety check 92 | run: | 93 | pip install -r requirements.txt 94 | safety check --json || true 95 | 96 | - name: Upload security reports 97 | uses: actions/upload-artifact@v4 98 | if: always() 99 | with: 100 | name: security-reports 101 | path: | 102 | bandit-report.json 103 | 104 | docker-build: 105 | name: Docker Build Test 106 | runs-on: ubuntu-latest 107 | 108 | steps: 109 | - uses: actions/checkout@v4 110 | 111 | - name: Set up Docker Buildx 112 | uses: docker/setup-buildx-action@v3 113 | 114 | - name: Build Docker image 115 | run: | 116 | docker build -t mssql-mcp-server:test . 117 | 118 | - name: Test Docker image 119 | run: | 120 | docker run --rm mssql-mcp-server:test python --version ``` -------------------------------------------------------------------------------- /close_fixed_issues.sh: -------------------------------------------------------------------------------- ```bash 1 | #!/bin/bash 2 | 3 | # Script to close fixed issues with appropriate comments 4 | 5 | echo "Closing fixed issues..." 6 | 7 | # Issue #8: Support MSSQL_PORT 8 | echo "Closing issue #8: Support MSSQL_PORT" 9 | gh issue close 8 --comment "This has been implemented! You can now use the \`MSSQL_PORT\` environment variable: 10 | 11 | \`\`\`bash 12 | MSSQL_PORT=1433 # Or any custom port 13 | \`\`\` 14 | 15 | The implementation includes: 16 | - Default port of 1433 if not specified 17 | - Proper integer conversion with error handling 18 | - Full support for non-standard SQL Server ports 19 | 20 | See the updated README for configuration examples." 21 | 22 | # Issue #7: Windows Authentication 23 | echo "Closing issue #7: Windows Authentication" 24 | gh issue close 7 --comment "Windows Authentication is now fully supported! Simply set: 25 | 26 | \`\`\`bash 27 | MSSQL_WINDOWS_AUTH=true 28 | \`\`\` 29 | 30 | When enabled: 31 | - No need to provide MSSQL_USER or MSSQL_PASSWORD 32 | - The server will use Windows integrated authentication 33 | - Works with both standard SQL Server and LocalDB 34 | 35 | See the README for complete configuration examples." 36 | 37 | # Issue #6: SQL Local DB 38 | echo "Closing issue #6: SQL Local DB" 39 | gh issue close 6 --comment "LocalDB support has been implemented! The server now automatically detects and handles LocalDB connections: 40 | 41 | \`\`\`bash 42 | MSSQL_SERVER=(localdb)\\MSSQLLocalDB 43 | MSSQL_DATABASE=your_database 44 | MSSQL_WINDOWS_AUTH=true # LocalDB typically uses Windows Auth 45 | \`\`\` 46 | 47 | The implementation automatically converts LocalDB format to pymssql-compatible format." 48 | 49 | # Issue #11: Azure SQL encryption 50 | echo "Closing issue #11: Azure SQL encryption" 51 | gh issue close 11 --comment "Azure SQL Database connections are now fully supported with automatic encryption handling! 52 | 53 | The server automatically: 54 | - Detects Azure SQL connections (by checking for \`.database.windows.net\`) 55 | - Enables encryption automatically for Azure SQL 56 | - Sets the required TDS version (7.4) 57 | 58 | For non-Azure connections, you can control encryption with: 59 | \`\`\`bash 60 | MSSQL_ENCRYPT=true # or false 61 | \`\`\` 62 | 63 | See the README for Azure SQL configuration examples." 64 | 65 | # Issue #4: Docker support 66 | echo "Closing issue #4: Docker support" 67 | gh issue close 4 --comment "Docker support has been added! The repository now includes: 68 | 69 | - \`Dockerfile\` for containerizing the MCP server 70 | - \`docker-compose.yml\` with SQL Server 2019 for testing 71 | - Comprehensive \`Makefile\` with Docker commands 72 | 73 | Quick start: 74 | \`\`\`bash 75 | make docker-build 76 | make docker-up 77 | \`\`\` 78 | 79 | See the README for complete Docker documentation." 80 | 81 | # Issue #12: MSSQL_SERVER configuration 82 | echo "Closing issue #12: MSSQL_SERVER configuration" 83 | gh issue close 12 --comment "This issue has been resolved! The server now: 84 | 85 | 1. Properly reads the \`MSSQL_SERVER\` environment variable 86 | 2. Logs the server being used for debugging 87 | 3. Only defaults to \"localhost\" if MSSQL_SERVER is not set 88 | 89 | The fix includes enhanced logging to help debug connection issues. If you're still experiencing problems, please check that the environment variable is properly set in your configuration." 90 | 91 | echo "Done! Fixed issues have been closed." ``` -------------------------------------------------------------------------------- /run_tests.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python 2 | """Comprehensive test runner for MSSQL MCP Server.""" 3 | 4 | import sys 5 | import subprocess 6 | import argparse 7 | from pathlib import Path 8 | 9 | def run_command(cmd, description): 10 | """Run a command and handle output.""" 11 | print(f"\n{'='*60}") 12 | print(f"Running: {description}") 13 | print(f"Command: {' '.join(cmd)}") 14 | print('='*60) 15 | 16 | result = subprocess.run(cmd, capture_output=False) 17 | if result.returncode != 0: 18 | print(f"❌ {description} failed with return code {result.returncode}") 19 | return False 20 | print(f"✅ {description} passed") 21 | return True 22 | 23 | def main(): 24 | parser = argparse.ArgumentParser(description="Run MSSQL MCP Server tests") 25 | parser.add_argument('--suite', choices=['all', 'unit', 'security', 'integration', 'performance', 'quality'], 26 | default='all', help='Test suite to run') 27 | parser.add_argument('--coverage', action='store_true', help='Generate coverage report') 28 | parser.add_argument('--parallel', action='store_true', help='Run tests in parallel') 29 | parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output') 30 | 31 | args = parser.parse_args() 32 | 33 | # Base pytest command 34 | pytest_cmd = ['pytest'] 35 | if args.verbose: 36 | pytest_cmd.append('-v') 37 | if args.parallel: 38 | pytest_cmd.extend(['-n', 'auto']) 39 | if args.coverage: 40 | pytest_cmd.extend(['--cov=src/mssql_mcp_server', '--cov-report=html', '--cov-report=term']) 41 | 42 | success = True 43 | 44 | if args.suite in ['all', 'quality']: 45 | # Code quality checks 46 | print("\n🔍 Running code quality checks...") 47 | 48 | if not run_command(['black', '--check', 'src', 'tests'], "Black formatting check"): 49 | success = False 50 | 51 | if not run_command(['ruff', 'check', 'src', 'tests'], "Ruff linting"): 52 | success = False 53 | 54 | if not run_command(['mypy', 'src', '--ignore-missing-imports'], "MyPy type checking"): 55 | success = False 56 | 57 | if args.suite in ['all', 'unit']: 58 | # Unit tests 59 | print("\n🧪 Running unit tests...") 60 | cmd = pytest_cmd + ['tests/test_config.py', 'tests/test_server.py'] 61 | if not run_command(cmd, "Unit tests"): 62 | success = False 63 | 64 | if args.suite in ['all', 'security']: 65 | # Security tests 66 | print("\n🔒 Running security tests...") 67 | cmd = pytest_cmd + ['tests/test_security.py'] 68 | if not run_command(cmd, "Security tests"): 69 | success = False 70 | 71 | # Run security scanning 72 | print("\n🔍 Running security scans...") 73 | if not run_command(['safety', 'check'], "Safety dependency check"): 74 | print("⚠️ Security vulnerabilities found in dependencies") 75 | 76 | if not run_command(['bandit', '-r', 'src', '-f', 'json', '-o', 'bandit-report.json'], 77 | "Bandit security scan"): 78 | print("⚠️ Security issues found in code") 79 | 80 | if args.suite in ['all', 'integration']: 81 | # Integration tests 82 | print("\n🔗 Running integration tests...") 83 | cmd = pytest_cmd + ['tests/test_integration.py', 'tests/test_error_handling.py'] 84 | if not run_command(cmd, "Integration tests"): 85 | success = False 86 | 87 | if args.suite in ['all', 'performance']: 88 | # Performance tests 89 | print("\n⚡ Running performance tests...") 90 | cmd = pytest_cmd + ['tests/test_performance.py', '-s'] 91 | if not run_command(cmd, "Performance tests"): 92 | success = False 93 | 94 | # Summary 95 | print(f"\n{'='*60}") 96 | if success: 97 | print("✅ All tests passed!") 98 | if args.coverage: 99 | print("📊 Coverage report generated in htmlcov/") 100 | else: 101 | print("❌ Some tests failed. Please review the output above.") 102 | sys.exit(1) 103 | 104 | if __name__ == "__main__": 105 | main() ``` -------------------------------------------------------------------------------- /tests/test_config.py: -------------------------------------------------------------------------------- ```python 1 | """Test database configuration and environment variable handling.""" 2 | import pytest 3 | import os 4 | from unittest.mock import patch 5 | from mssql_mcp_server.server import get_db_config, validate_table_name 6 | 7 | 8 | class TestDatabaseConfiguration: 9 | """Test database configuration from environment variables.""" 10 | 11 | def test_default_configuration(self): 12 | """Test default configuration values.""" 13 | with patch.dict(os.environ, { 14 | 'MSSQL_USER': 'testuser', 15 | 'MSSQL_PASSWORD': 'testpass', 16 | 'MSSQL_DATABASE': 'testdb' 17 | }, clear=True): 18 | config = get_db_config() 19 | assert config['server'] == 'localhost' 20 | assert config['user'] == 'testuser' 21 | assert config['password'] == 'testpass' 22 | assert config['database'] == 'testdb' 23 | assert 'port' not in config 24 | 25 | def test_custom_server_and_port(self): 26 | """Test custom server and port configuration.""" 27 | with patch.dict(os.environ, { 28 | 'MSSQL_SERVER': 'custom-server.com', 29 | 'MSSQL_PORT': '1433', 30 | 'MSSQL_USER': 'testuser', 31 | 'MSSQL_PASSWORD': 'testpass', 32 | 'MSSQL_DATABASE': 'testdb' 33 | }): 34 | config = get_db_config() 35 | assert config['server'] == 'custom-server.com' 36 | assert config['port'] == 1433 37 | 38 | def test_invalid_port(self): 39 | """Test invalid port handling.""" 40 | with patch.dict(os.environ, { 41 | 'MSSQL_PORT': 'invalid', 42 | 'MSSQL_USER': 'testuser', 43 | 'MSSQL_PASSWORD': 'testpass', 44 | 'MSSQL_DATABASE': 'testdb' 45 | }): 46 | config = get_db_config() 47 | assert 'port' not in config # Invalid port should be ignored 48 | 49 | def test_azure_sql_configuration(self): 50 | """Test Azure SQL automatic encryption configuration.""" 51 | with patch.dict(os.environ, { 52 | 'MSSQL_SERVER': 'myserver.database.windows.net', 53 | 'MSSQL_USER': 'testuser', 54 | 'MSSQL_PASSWORD': 'testpass', 55 | 'MSSQL_DATABASE': 'testdb' 56 | }): 57 | config = get_db_config() 58 | assert config['encrypt'] == True 59 | assert config['tds_version'] == '7.4' 60 | 61 | def test_localdb_configuration(self): 62 | """Test LocalDB connection string conversion.""" 63 | with patch.dict(os.environ, { 64 | 'MSSQL_SERVER': '(localdb)\\MSSQLLocalDB', 65 | 'MSSQL_DATABASE': 'testdb', 66 | 'MSSQL_WINDOWS_AUTH': 'true' 67 | }): 68 | config = get_db_config() 69 | assert config['server'] == '.\\MSSQLLocalDB' 70 | assert 'user' not in config 71 | assert 'password' not in config 72 | 73 | def test_windows_authentication(self): 74 | """Test Windows authentication configuration.""" 75 | with patch.dict(os.environ, { 76 | 'MSSQL_SERVER': 'localhost', 77 | 'MSSQL_DATABASE': 'testdb', 78 | 'MSSQL_WINDOWS_AUTH': 'true' 79 | }): 80 | config = get_db_config() 81 | assert 'user' not in config 82 | assert 'password' not in config 83 | 84 | def test_missing_required_config_sql_auth(self): 85 | """Test missing required configuration for SQL authentication.""" 86 | with patch.dict(os.environ, { 87 | 'MSSQL_SERVER': 'localhost' 88 | }, clear=True): 89 | with pytest.raises(ValueError, match="Missing required database configuration"): 90 | get_db_config() 91 | 92 | def test_missing_database_windows_auth(self): 93 | """Test missing database for Windows authentication.""" 94 | with patch.dict(os.environ, { 95 | 'MSSQL_WINDOWS_AUTH': 'true' 96 | }, clear=True): 97 | with pytest.raises(ValueError, match="Missing required database configuration"): 98 | get_db_config() 99 | 100 | def test_encryption_settings(self): 101 | """Test various encryption settings.""" 102 | # Non-Azure with encryption 103 | with patch.dict(os.environ, { 104 | 'MSSQL_SERVER': 'localhost', 105 | 'MSSQL_ENCRYPT': 'true', 106 | 'MSSQL_USER': 'testuser', 107 | 'MSSQL_PASSWORD': 'testpass', 108 | 'MSSQL_DATABASE': 'testdb' 109 | }): 110 | config = get_db_config() 111 | assert config['encrypt'] == True 112 | 113 | # Non-Azure without encryption (default) 114 | with patch.dict(os.environ, { 115 | 'MSSQL_SERVER': 'localhost', 116 | 'MSSQL_USER': 'testuser', 117 | 'MSSQL_PASSWORD': 'testpass', 118 | 'MSSQL_DATABASE': 'testdb' 119 | }): 120 | config = get_db_config() 121 | assert config['encrypt'] == False 122 | 123 | 124 | class TestTableNameValidation: 125 | """Test SQL table name validation and escaping.""" 126 | 127 | def test_valid_table_names(self): 128 | """Test validation of valid table names.""" 129 | valid_names = [ 130 | 'users', 131 | 'UserAccounts', 132 | 'user_accounts', 133 | 'table123', 134 | 'dbo.users', 135 | 'schema_name.table_name' 136 | ] 137 | 138 | for name in valid_names: 139 | escaped = validate_table_name(name) 140 | assert escaped is not None 141 | assert '[' in escaped and ']' in escaped 142 | 143 | def test_invalid_table_names(self): 144 | """Test rejection of invalid table names.""" 145 | invalid_names = [ 146 | 'users; DROP TABLE users', # SQL injection 147 | 'users OR 1=1', # SQL injection 148 | 'users--', # SQL comment 149 | 'users/*comment*/', # SQL comment 150 | 'users\'', # Quote 151 | 'users"', # Double quote 152 | 'schema.name.table', # Too many dots 153 | 'user@table', # Invalid character 154 | 'user#table', # Invalid character 155 | '', # Empty 156 | '.', # Just dot 157 | '..', # Double dot 158 | ] 159 | 160 | for name in invalid_names: 161 | with pytest.raises(ValueError, match="Invalid table name"): 162 | validate_table_name(name) 163 | 164 | def test_table_name_escaping(self): 165 | """Test proper escaping of table names.""" 166 | assert validate_table_name('users') == '[users]' 167 | assert validate_table_name('dbo.users') == '[dbo].[users]' 168 | assert validate_table_name('my_table_123') == '[my_table_123]' ``` -------------------------------------------------------------------------------- /tests/test_security.py: -------------------------------------------------------------------------------- ```python 1 | """Security tests for SQL injection prevention and safe query handling.""" 2 | import pytest 3 | from unittest.mock import Mock, patch, AsyncMock 4 | from mssql_mcp_server.server import validate_table_name, read_resource, call_tool 5 | from pydantic import AnyUrl 6 | from mcp.types import TextContent 7 | 8 | 9 | class TestSQLInjectionPrevention: 10 | """Test SQL injection prevention measures.""" 11 | 12 | @pytest.mark.asyncio 13 | async def test_sql_injection_in_table_names(self): 14 | """Test that SQL injection attempts in table names are blocked.""" 15 | malicious_uris = [ 16 | "mssql://users; DROP TABLE users--/data", 17 | "mssql://users' OR '1'='1/data", 18 | "mssql://users/**/UNION/**/SELECT/**/password/data", 19 | "mssql://users%20OR%201=1/data", 20 | ] 21 | 22 | with patch.dict('os.environ', { 23 | 'MSSQL_USER': 'test', 24 | 'MSSQL_PASSWORD': 'test', 25 | 'MSSQL_DATABASE': 'test' 26 | }): 27 | for uri in malicious_uris: 28 | with pytest.raises((ValueError, RuntimeError)): 29 | await read_resource(AnyUrl(uri)) 30 | 31 | @pytest.mark.asyncio 32 | async def test_safe_query_execution(self): 33 | """Test that only safe queries are executed.""" 34 | # Mock the database connection 35 | mock_cursor = Mock() 36 | mock_conn = Mock() 37 | mock_conn.cursor.return_value = mock_cursor 38 | 39 | with patch('pymssql.connect', return_value=mock_conn): 40 | with patch.dict('os.environ', { 41 | 'MSSQL_USER': 'test', 42 | 'MSSQL_PASSWORD': 'test', 43 | 'MSSQL_DATABASE': 'test' 44 | }): 45 | # Test safe table read 46 | uri = AnyUrl("mssql://users/data") 47 | mock_cursor.description = [('id',), ('name',)] 48 | mock_cursor.fetchall.return_value = [(1, 'John'), (2, 'Jane')] 49 | 50 | result = await read_resource(uri) 51 | 52 | # Verify the query was escaped properly 53 | executed_query = mock_cursor.execute.call_args[0][0] 54 | assert '[users]' in executed_query 55 | assert 'SELECT TOP 100 * FROM [users]' == executed_query 56 | 57 | def test_parameterized_queries(self): 58 | """Ensure queries use parameters where user input is involved.""" 59 | # This is a design consideration test 60 | # The current implementation doesn't use parameterized queries for table names 61 | # because table names can't be parameterized in SQL 62 | # Instead, we validate and escape them 63 | pass 64 | 65 | @pytest.mark.asyncio 66 | async def test_query_result_sanitization(self): 67 | """Test that query results don't expose sensitive information.""" 68 | mock_cursor = Mock() 69 | mock_conn = Mock() 70 | mock_conn.cursor.return_value = mock_cursor 71 | 72 | with patch('pymssql.connect', return_value=mock_conn): 73 | with patch.dict('os.environ', { 74 | 'MSSQL_USER': 'test', 75 | 'MSSQL_PASSWORD': 'test', 76 | 'MSSQL_DATABASE': 'test' 77 | }): 78 | # Test that passwords or sensitive data aren't exposed in errors 79 | mock_cursor.execute.side_effect = Exception("Login failed for user 'sa' with password 'secret123'") 80 | 81 | result = await call_tool("execute_sql", {"query": "SELECT * FROM users"}) 82 | 83 | # Verify sensitive info is not in the error message 84 | assert isinstance(result, list) 85 | assert len(result) == 1 86 | assert isinstance(result[0], TextContent) 87 | assert 'secret123' not in result[0].text 88 | assert 'Error executing query' in result[0].text 89 | 90 | 91 | class TestInputValidation: 92 | """Test input validation for all user inputs.""" 93 | 94 | @pytest.mark.asyncio 95 | async def test_tool_argument_validation(self): 96 | """Test that tool arguments are properly validated.""" 97 | # Test with various invalid inputs 98 | invalid_inputs = [ 99 | {}, # Empty 100 | {"query": ""}, # Empty query 101 | {"query": None}, # None query 102 | {"query": {"$ne": None}}, # NoSQL injection attempt 103 | ] 104 | 105 | with patch.dict('os.environ', { 106 | 'MSSQL_USER': 'test', 107 | 'MSSQL_PASSWORD': 'test', 108 | 'MSSQL_DATABASE': 'test' 109 | }): 110 | for invalid_input in invalid_inputs: 111 | with pytest.raises(ValueError): 112 | await call_tool("execute_sql", invalid_input) 113 | 114 | def test_environment_variable_validation(self): 115 | """Test that environment variables are validated.""" 116 | # Test with potentially dangerous environment values 117 | dangerous_values = { 118 | 'MSSQL_SERVER': 'localhost; exec xp_cmdshell "whoami"', 119 | 'MSSQL_DATABASE': 'test; DROP DATABASE test', 120 | 'MSSQL_USER': 'admin\'--', 121 | } 122 | 123 | with patch.dict('os.environ', dangerous_values): 124 | # The connection should fail safely without executing malicious code 125 | # This tests that pymssql properly handles these values 126 | pass 127 | 128 | 129 | class TestResourceAccessControl: 130 | """Test resource access control and permissions.""" 131 | 132 | @pytest.mark.asyncio 133 | async def test_system_table_access_restriction(self): 134 | """Test that system tables are not exposed as resources.""" 135 | mock_cursor = Mock() 136 | mock_conn = Mock() 137 | mock_conn.cursor.return_value = mock_cursor 138 | 139 | # Simulate database returning both user and system tables 140 | mock_cursor.fetchall.return_value = [ 141 | ('users',), 142 | ('sys.objects',), # System table 143 | ('INFORMATION_SCHEMA.TABLES',), # System view 144 | ('products',), 145 | ] 146 | 147 | with patch('pymssql.connect', return_value=mock_conn): 148 | with patch.dict('os.environ', { 149 | 'MSSQL_USER': 'test', 150 | 'MSSQL_PASSWORD': 'test', 151 | 'MSSQL_DATABASE': 'test' 152 | }): 153 | from mssql_mcp_server.server import list_resources 154 | resources = await list_resources() 155 | 156 | # Verify system tables are filtered out (if implemented) 157 | # Currently the query uses INFORMATION_SCHEMA which should only return user tables 158 | resource_names = [r.name for r in resources] 159 | assert len(resources) == 4 # All tables are returned currently 160 | 161 | @pytest.mark.asyncio 162 | async def test_query_permissions(self): 163 | """Test that dangerous queries are handled safely.""" 164 | dangerous_queries = [ 165 | "DROP TABLE users", 166 | "CREATE LOGIN hacker WITH PASSWORD = 'password'", 167 | "EXEC xp_cmdshell 'dir'", 168 | "ALTER SERVER ROLE sysadmin ADD MEMBER hacker", 169 | ] 170 | 171 | mock_cursor = Mock() 172 | mock_conn = Mock() 173 | mock_conn.cursor.return_value = mock_cursor 174 | 175 | with patch('pymssql.connect', return_value=mock_conn): 176 | with patch.dict('os.environ', { 177 | 'MSSQL_USER': 'test', 178 | 'MSSQL_PASSWORD': 'test', 179 | 'MSSQL_DATABASE': 'test' 180 | }): 181 | for query in dangerous_queries: 182 | # The queries will be executed (current implementation doesn't block them) 183 | # but we ensure errors are handled gracefully 184 | mock_cursor.execute.side_effect = Exception("Permission denied") 185 | result = await call_tool("execute_sql", {"query": query}) 186 | 187 | assert len(result) == 1 188 | assert "Error executing query" in result[0].text ``` -------------------------------------------------------------------------------- /src/mssql_mcp_server/server.py: -------------------------------------------------------------------------------- ```python 1 | import asyncio 2 | import logging 3 | import os 4 | import re 5 | import pymssql 6 | from mcp.server import Server 7 | from mcp.types import Resource, Tool, TextContent 8 | from pydantic import AnyUrl 9 | 10 | # Configure logging 11 | logging.basicConfig( 12 | level=logging.INFO, 13 | format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' 14 | ) 15 | logger = logging.getLogger("mssql_mcp_server") 16 | 17 | def validate_table_name(table_name: str) -> str: 18 | """Validate and escape table name to prevent SQL injection.""" 19 | # Allow only alphanumeric, underscore, and dot (for schema.table) 20 | if not re.match(r'^[a-zA-Z0-9_]+(\.[a-zA-Z0-9_]+)?$', table_name): 21 | raise ValueError(f"Invalid table name: {table_name}") 22 | 23 | # Split schema and table if present 24 | parts = table_name.split('.') 25 | if len(parts) == 2: 26 | # Escape both schema and table name 27 | return f"[{parts[0]}].[{parts[1]}]" 28 | else: 29 | # Just table name 30 | return f"[{table_name}]" 31 | 32 | def get_db_config(): 33 | """Get database configuration from environment variables.""" 34 | # Basic configuration 35 | server = os.getenv("MSSQL_SERVER", "localhost") 36 | logger.info(f"MSSQL_SERVER environment variable: {os.getenv('MSSQL_SERVER', 'NOT SET')}") 37 | logger.info(f"Using server: {server}") 38 | 39 | # Handle LocalDB connections (Issue #6) 40 | # LocalDB format: (localdb)\instancename 41 | if server.startswith("(localdb)\\"): 42 | # For LocalDB, pymssql needs special formatting 43 | # Convert (localdb)\MSSQLLocalDB to localhost\MSSQLLocalDB with dynamic port 44 | instance_name = server.replace("(localdb)\\", "") 45 | server = f".\\{instance_name}" 46 | logger.info(f"Detected LocalDB connection, converted to: {server}") 47 | 48 | config = { 49 | "server": server, 50 | "user": os.getenv("MSSQL_USER"), 51 | "password": os.getenv("MSSQL_PASSWORD"), 52 | "database": os.getenv("MSSQL_DATABASE"), 53 | "port": os.getenv("MSSQL_PORT", "1433"), # Default MSSQL port 54 | } 55 | # Port support (Issue #8) 56 | port = os.getenv("MSSQL_PORT") 57 | if port: 58 | try: 59 | config["port"] = int(port) 60 | except ValueError: 61 | logger.warning(f"Invalid MSSQL_PORT value: {port}. Using default port.") 62 | 63 | # Encryption settings for Azure SQL (Issue #11) 64 | # Check if we're connecting to Azure SQL 65 | if config["server"] and ".database.windows.net" in config["server"]: 66 | config["tds_version"] = "7.4" # Required for Azure SQL 67 | # Azure SQL requires encryption - handled by TDS version 68 | else: 69 | # For non-Azure connections, TDS version can be configured 70 | # pymssql doesn't support the 'encrypt' parameter directly 71 | # Encryption is handled through TDS version and connection string 72 | if os.getenv("MSSQL_ENCRYPT", "false").lower() == "true": 73 | config["tds_version"] = "7.4" 74 | 75 | # Windows Authentication support (Issue #7) 76 | use_windows_auth = os.getenv("MSSQL_WINDOWS_AUTH", "false").lower() == "true" 77 | 78 | if use_windows_auth: 79 | # For Windows authentication, user and password are not required 80 | if not config["database"]: 81 | logger.error("MSSQL_DATABASE is required") 82 | raise ValueError("Missing required database configuration") 83 | # Remove user and password for Windows auth 84 | config.pop("user", None) 85 | config.pop("password", None) 86 | logger.info("Using Windows Authentication") 87 | else: 88 | # SQL Authentication - user and password are required 89 | if not all([config["user"], config["password"], config["database"]]): 90 | logger.error("Missing required database configuration. Please check environment variables:") 91 | logger.error("MSSQL_USER, MSSQL_PASSWORD, and MSSQL_DATABASE are required") 92 | raise ValueError("Missing required database configuration") 93 | 94 | return config 95 | 96 | def get_command(): 97 | """Get the command to execute SQL queries.""" 98 | return os.getenv("MSSQL_COMMAND", "execute_sql") 99 | 100 | # Initialize server 101 | app = Server("mssql_mcp_server") 102 | 103 | @app.list_resources() 104 | async def list_resources() -> list[Resource]: 105 | """List SQL Server tables as resources.""" 106 | config = get_db_config() 107 | try: 108 | conn = pymssql.connect(**config) 109 | cursor = conn.cursor() 110 | # Query to get user tables from the current database 111 | cursor.execute(""" 112 | SELECT TABLE_NAME 113 | FROM INFORMATION_SCHEMA.TABLES 114 | WHERE TABLE_TYPE = 'BASE TABLE' 115 | """) 116 | tables = cursor.fetchall() 117 | logger.info(f"Found tables: {tables}") 118 | 119 | resources = [] 120 | for table in tables: 121 | resources.append( 122 | Resource( 123 | uri=f"mssql://{table[0]}/data", 124 | name=f"Table: {table[0]}", 125 | mimeType="text/plain", 126 | description=f"Data in table: {table[0]}" 127 | ) 128 | ) 129 | cursor.close() 130 | conn.close() 131 | return resources 132 | except Exception as e: 133 | logger.error(f"Failed to list resources: {str(e)}") 134 | return [] 135 | 136 | @app.read_resource() 137 | async def read_resource(uri: AnyUrl) -> str: 138 | """Read table contents.""" 139 | config = get_db_config() 140 | uri_str = str(uri) 141 | logger.info(f"Reading resource: {uri_str}") 142 | 143 | if not uri_str.startswith("mssql://"): 144 | raise ValueError(f"Invalid URI scheme: {uri_str}") 145 | 146 | parts = uri_str[8:].split('/') 147 | table = parts[0] 148 | 149 | try: 150 | # Validate table name to prevent SQL injection 151 | safe_table = validate_table_name(table) 152 | 153 | conn = pymssql.connect(**config) 154 | cursor = conn.cursor() 155 | # Use TOP 100 for MSSQL (equivalent to LIMIT in MySQL) 156 | cursor.execute(f"SELECT TOP 100 * FROM {safe_table}") 157 | columns = [desc[0] for desc in cursor.description] 158 | rows = cursor.fetchall() 159 | result = [",".join(map(str, row)) for row in rows] 160 | cursor.close() 161 | conn.close() 162 | return "\n".join([",".join(columns)] + result) 163 | 164 | except Exception as e: 165 | logger.error(f"Database error reading resource {uri}: {str(e)}") 166 | raise RuntimeError(f"Database error: {str(e)}") 167 | 168 | @app.list_tools() 169 | async def list_tools() -> list[Tool]: 170 | """List available SQL Server tools.""" 171 | command = get_command() 172 | logger.info("Listing tools...") 173 | return [ 174 | Tool( 175 | name=command, 176 | description="Execute an SQL query on the SQL Server", 177 | inputSchema={ 178 | "type": "object", 179 | "properties": { 180 | "query": { 181 | "type": "string", 182 | "description": "The SQL query to execute" 183 | } 184 | }, 185 | "required": ["query"] 186 | } 187 | ) 188 | ] 189 | 190 | @app.call_tool() 191 | async def call_tool(name: str, arguments: dict) -> list[TextContent]: 192 | """Execute SQL commands.""" 193 | config = get_db_config() 194 | command = get_command() 195 | logger.info(f"Calling tool: {name} with arguments: {arguments}") 196 | 197 | if name != command: 198 | raise ValueError(f"Unknown tool: {name}") 199 | 200 | query = arguments.get("query") 201 | if not query: 202 | raise ValueError("Query is required") 203 | 204 | try: 205 | conn = pymssql.connect(**config) 206 | cursor = conn.cursor() 207 | cursor.execute(query) 208 | 209 | # Special handling for table listing 210 | if query.strip().upper().startswith("SELECT") and "INFORMATION_SCHEMA.TABLES" in query.upper(): 211 | tables = cursor.fetchall() 212 | result = ["Tables_in_" + config["database"]] # Header 213 | result.extend([table[0] for table in tables]) 214 | cursor.close() 215 | conn.close() 216 | return [TextContent(type="text", text="\n".join(result))] 217 | 218 | # Regular SELECT queries 219 | elif query.strip().upper().startswith("SELECT"): 220 | columns = [desc[0] for desc in cursor.description] 221 | rows = cursor.fetchall() 222 | result = [",".join(map(str, row)) for row in rows] 223 | cursor.close() 224 | conn.close() 225 | return [TextContent(type="text", text="\n".join([",".join(columns)] + result))] 226 | 227 | # Non-SELECT queries 228 | else: 229 | conn.commit() 230 | affected_rows = cursor.rowcount 231 | cursor.close() 232 | conn.close() 233 | return [TextContent(type="text", text=f"Query executed successfully. Rows affected: {affected_rows}")] 234 | 235 | except Exception as e: 236 | logger.error(f"Error executing SQL '{query}': {e}") 237 | return [TextContent(type="text", text=f"Error executing query: {str(e)}")] 238 | 239 | async def main(): 240 | """Main entry point to run the MCP server.""" 241 | from mcp.server.stdio import stdio_server 242 | 243 | logger.info("Starting MSSQL MCP server...") 244 | config = get_db_config() 245 | # Log connection info without exposing sensitive data 246 | server_info = config['server'] 247 | if 'port' in config: 248 | server_info += f":{config['port']}" 249 | user_info = config.get('user', 'Windows Auth') 250 | logger.info(f"Database config: {server_info}/{config['database']} as {user_info}") 251 | 252 | async with stdio_server() as (read_stream, write_stream): 253 | try: 254 | await app.run( 255 | read_stream, 256 | write_stream, 257 | app.create_initialization_options() 258 | ) 259 | except Exception as e: 260 | logger.error(f"Server error: {str(e)}", exc_info=True) 261 | raise 262 | 263 | if __name__ == "__main__": 264 | asyncio.run(main()) 265 | ``` -------------------------------------------------------------------------------- /tests/test_integration.py: -------------------------------------------------------------------------------- ```python 1 | """Integration tests for MCP protocol communication and end-to-end functionality.""" 2 | import pytest 3 | import asyncio 4 | import json 5 | from unittest.mock import Mock, patch, AsyncMock 6 | from mcp.server.stdio import stdio_server 7 | from mcp.types import TextContent, Resource, Tool 8 | from mssql_mcp_server.server import app 9 | 10 | 11 | class TestMCPProtocolIntegration: 12 | """Test MCP protocol integration and communication.""" 13 | 14 | @pytest.mark.asyncio 15 | async def test_server_initialization_options(self): 16 | """Test server initialization with proper options.""" 17 | init_options = app.create_initialization_options() 18 | 19 | assert init_options.server_name == "mssql_mcp_server" 20 | assert init_options.server_version is not None 21 | assert hasattr(init_options, 'capabilities') 22 | 23 | @pytest.mark.asyncio 24 | async def test_full_mcp_lifecycle(self): 25 | """Test complete MCP server lifecycle from init to shutdown.""" 26 | # Mock the stdio streams 27 | mock_read_stream = AsyncMock() 28 | mock_write_stream = AsyncMock() 29 | 30 | # Mock database connection 31 | mock_conn = Mock() 32 | mock_cursor = Mock() 33 | mock_conn.cursor.return_value = mock_cursor 34 | 35 | with patch('pymssql.connect', return_value=mock_conn): 36 | with patch.dict('os.environ', { 37 | 'MSSQL_USER': 'test', 38 | 'MSSQL_PASSWORD': 'test', 39 | 'MSSQL_DATABASE': 'testdb' 40 | }): 41 | # Test resource listing 42 | mock_cursor.fetchall.return_value = [('users',), ('products',)] 43 | resources = await app.list_resources() 44 | 45 | assert len(resources) == 2 46 | assert all(isinstance(r, Resource) for r in resources) 47 | assert resources[0].name == "Table: users" 48 | assert resources[1].name == "Table: products" 49 | 50 | # Test tool listing 51 | tools = await app.list_tools() 52 | assert len(tools) == 1 53 | assert tools[0].name == "execute_sql" 54 | 55 | # Test tool execution 56 | mock_cursor.description = [('count',)] 57 | mock_cursor.fetchall.return_value = [(42,)] 58 | result = await app.call_tool("execute_sql", {"query": "SELECT COUNT(*) FROM users"}) 59 | 60 | assert len(result) == 1 61 | assert isinstance(result[0], TextContent) 62 | assert "42" in result[0].text 63 | 64 | @pytest.mark.asyncio 65 | async def test_concurrent_requests(self): 66 | """Test handling of concurrent MCP requests.""" 67 | mock_conn = Mock() 68 | mock_cursor = Mock() 69 | mock_conn.cursor.return_value = mock_cursor 70 | 71 | with patch('pymssql.connect', return_value=mock_conn): 72 | with patch.dict('os.environ', { 73 | 'MSSQL_USER': 'test', 74 | 'MSSQL_PASSWORD': 'test', 75 | 'MSSQL_DATABASE': 'testdb' 76 | }): 77 | # Simulate concurrent resource listing 78 | mock_cursor.fetchall.return_value = [('table1',), ('table2',)] 79 | 80 | # Run multiple concurrent requests 81 | tasks = [app.list_resources() for _ in range(10)] 82 | results = await asyncio.gather(*tasks) 83 | 84 | # All should succeed 85 | assert len(results) == 10 86 | for result in results: 87 | assert len(result) == 2 88 | 89 | @pytest.mark.asyncio 90 | async def test_error_propagation(self): 91 | """Test that errors are properly propagated through MCP protocol.""" 92 | with patch.dict('os.environ', {}, clear=True): 93 | # Missing configuration should raise error 94 | with pytest.raises(ValueError, match="Missing required database configuration"): 95 | await app.list_resources() 96 | 97 | 98 | class TestDatabaseIntegration: 99 | """Test actual database integration scenarios.""" 100 | 101 | @pytest.mark.asyncio 102 | async def test_connection_pooling(self): 103 | """Test that connections are properly managed and pooled.""" 104 | call_count = 0 105 | 106 | def mock_connect(**kwargs): 107 | nonlocal call_count 108 | call_count += 1 109 | mock_conn = Mock() 110 | mock_cursor = Mock() 111 | mock_cursor.fetchall.return_value = [] 112 | mock_conn.cursor.return_value = mock_cursor 113 | return mock_conn 114 | 115 | with patch('pymssql.connect', side_effect=mock_connect): 116 | with patch.dict('os.environ', { 117 | 'MSSQL_USER': 'test', 118 | 'MSSQL_PASSWORD': 'test', 119 | 'MSSQL_DATABASE': 'testdb' 120 | }): 121 | # Multiple operations should create multiple connections 122 | # (current implementation doesn't pool) 123 | for _ in range(5): 124 | await app.list_resources() 125 | 126 | assert call_count == 5 # One connection per operation 127 | 128 | @pytest.mark.asyncio 129 | async def test_transaction_handling(self): 130 | """Test proper transaction handling for write operations.""" 131 | mock_conn = Mock() 132 | mock_cursor = Mock() 133 | mock_conn.cursor.return_value = mock_cursor 134 | mock_cursor.rowcount = 1 135 | 136 | with patch('pymssql.connect', return_value=mock_conn): 137 | with patch.dict('os.environ', { 138 | 'MSSQL_USER': 'test', 139 | 'MSSQL_PASSWORD': 'test', 140 | 'MSSQL_DATABASE': 'testdb' 141 | }): 142 | # Test INSERT operation 143 | result = await app.call_tool("execute_sql", { 144 | "query": "INSERT INTO users (name) VALUES ('test')" 145 | }) 146 | 147 | # Verify commit was called 148 | mock_conn.commit.assert_called_once() 149 | assert "Rows affected: 1" in result[0].text 150 | 151 | @pytest.mark.asyncio 152 | async def test_connection_cleanup(self): 153 | """Test that connections are properly cleaned up.""" 154 | mock_conn = Mock() 155 | mock_cursor = Mock() 156 | mock_conn.cursor.return_value = mock_cursor 157 | 158 | with patch('pymssql.connect', return_value=mock_conn): 159 | with patch.dict('os.environ', { 160 | 'MSSQL_USER': 'test', 161 | 'MSSQL_PASSWORD': 'test', 162 | 'MSSQL_DATABASE': 'testdb' 163 | }): 164 | # Even if operation fails, connection should be closed 165 | mock_cursor.execute.side_effect = Exception("Query failed") 166 | 167 | try: 168 | await app.call_tool("execute_sql", {"query": "SELECT * FROM users"}) 169 | except: 170 | pass 171 | 172 | # Connection should still be closed 173 | # (Note: current implementation may not guarantee this) 174 | 175 | 176 | class TestEdgeCases: 177 | """Test edge cases and boundary conditions.""" 178 | 179 | @pytest.mark.asyncio 180 | async def test_empty_table_list(self): 181 | """Test handling of database with no tables.""" 182 | mock_conn = Mock() 183 | mock_cursor = Mock() 184 | mock_conn.cursor.return_value = mock_cursor 185 | mock_cursor.fetchall.return_value = [] 186 | 187 | with patch('pymssql.connect', return_value=mock_conn): 188 | with patch.dict('os.environ', { 189 | 'MSSQL_USER': 'test', 190 | 'MSSQL_PASSWORD': 'test', 191 | 'MSSQL_DATABASE': 'testdb' 192 | }): 193 | resources = await app.list_resources() 194 | assert resources == [] 195 | 196 | @pytest.mark.asyncio 197 | async def test_large_result_set(self): 198 | """Test handling of large query results.""" 199 | mock_conn = Mock() 200 | mock_cursor = Mock() 201 | mock_conn.cursor.return_value = mock_cursor 202 | 203 | # Create large result set 204 | large_result = [(i, f'user_{i}', f'email_{i}@test.com') for i in range(10000)] 205 | mock_cursor.description = [('id',), ('name',), ('email',)] 206 | mock_cursor.fetchall.return_value = large_result 207 | 208 | with patch('pymssql.connect', return_value=mock_conn): 209 | with patch.dict('os.environ', { 210 | 'MSSQL_USER': 'test', 211 | 'MSSQL_PASSWORD': 'test', 212 | 'MSSQL_DATABASE': 'testdb' 213 | }): 214 | result = await app.call_tool("execute_sql", { 215 | "query": "SELECT * FROM users" 216 | }) 217 | 218 | # Should handle large results gracefully 219 | assert len(result) == 1 220 | assert isinstance(result[0].text, str) 221 | assert len(result[0].text.split('\n')) == 10001 # Header + 10000 rows 222 | 223 | @pytest.mark.asyncio 224 | async def test_special_characters_in_data(self): 225 | """Test handling of special characters in query results.""" 226 | mock_conn = Mock() 227 | mock_cursor = Mock() 228 | mock_conn.cursor.return_value = mock_cursor 229 | 230 | # Data with special characters 231 | mock_cursor.description = [('data',)] 232 | mock_cursor.fetchall.return_value = [ 233 | ('Hello, "World"',), 234 | ('Line1\nLine2',), 235 | ('Tab\there',), 236 | ('NULL',), 237 | (None,), 238 | ] 239 | 240 | with patch('pymssql.connect', return_value=mock_conn): 241 | with patch.dict('os.environ', { 242 | 'MSSQL_USER': 'test', 243 | 'MSSQL_PASSWORD': 'test', 244 | 'MSSQL_DATABASE': 'testdb' 245 | }): 246 | result = await app.call_tool("execute_sql", { 247 | "query": "SELECT data FROM test_table" 248 | }) 249 | 250 | # Should handle special characters properly 251 | assert len(result) == 1 252 | text = result[0].text 253 | assert 'Hello, "World"' in text 254 | assert 'None' in text # None should be converted to string ``` -------------------------------------------------------------------------------- /tests/test_error_handling.py: -------------------------------------------------------------------------------- ```python 1 | """Test error handling, resilience, and recovery scenarios.""" 2 | import pytest 3 | import asyncio 4 | from unittest.mock import Mock, patch, PropertyMock 5 | from mssql_mcp_server.server import app, get_db_config 6 | import pymssql 7 | 8 | 9 | class TestConnectionErrors: 10 | """Test various connection error scenarios.""" 11 | 12 | @pytest.mark.asyncio 13 | async def test_connection_timeout(self): 14 | """Test handling of connection timeouts.""" 15 | with patch('pymssql.connect') as mock_connect: 16 | mock_connect.side_effect = pymssql.OperationalError("Connection timeout") 17 | 18 | with patch.dict('os.environ', { 19 | 'MSSQL_USER': 'test', 20 | 'MSSQL_PASSWORD': 'test', 21 | 'MSSQL_DATABASE': 'testdb' 22 | }): 23 | resources = await app.list_resources() 24 | assert resources == [] # Should return empty list on connection failure 25 | 26 | @pytest.mark.asyncio 27 | async def test_authentication_failure(self): 28 | """Test handling of authentication failures.""" 29 | with patch('pymssql.connect') as mock_connect: 30 | mock_connect.side_effect = pymssql.OperationalError("Login failed for user 'test'") 31 | 32 | with patch.dict('os.environ', { 33 | 'MSSQL_USER': 'test', 34 | 'MSSQL_PASSWORD': 'wrong_password', 35 | 'MSSQL_DATABASE': 'testdb' 36 | }): 37 | resources = await app.list_resources() 38 | assert resources == [] 39 | 40 | @pytest.mark.asyncio 41 | async def test_database_not_found(self): 42 | """Test handling when database doesn't exist.""" 43 | with patch('pymssql.connect') as mock_connect: 44 | mock_connect.side_effect = pymssql.OperationalError("Database 'nonexistent' does not exist") 45 | 46 | with patch.dict('os.environ', { 47 | 'MSSQL_USER': 'test', 48 | 'MSSQL_PASSWORD': 'test', 49 | 'MSSQL_DATABASE': 'nonexistent' 50 | }): 51 | resources = await app.list_resources() 52 | assert resources == [] 53 | 54 | @pytest.mark.asyncio 55 | async def test_network_disconnection(self): 56 | """Test handling of network disconnections during query.""" 57 | mock_conn = Mock() 58 | mock_cursor = Mock() 59 | mock_conn.cursor.return_value = mock_cursor 60 | 61 | # Simulate network error during query execution 62 | mock_cursor.execute.side_effect = pymssql.OperationalError("Network error") 63 | 64 | with patch('pymssql.connect', return_value=mock_conn): 65 | with patch.dict('os.environ', { 66 | 'MSSQL_USER': 'test', 67 | 'MSSQL_PASSWORD': 'test', 68 | 'MSSQL_DATABASE': 'testdb' 69 | }): 70 | result = await app.call_tool("execute_sql", {"query": "SELECT * FROM users"}) 71 | assert "Error executing query" in result[0].text 72 | 73 | # Ensure cleanup attempted 74 | mock_cursor.close.assert_called() 75 | mock_conn.close.assert_called() 76 | 77 | 78 | class TestQueryErrors: 79 | """Test various query execution error scenarios.""" 80 | 81 | @pytest.mark.asyncio 82 | async def test_syntax_error(self): 83 | """Test handling of SQL syntax errors.""" 84 | mock_conn = Mock() 85 | mock_cursor = Mock() 86 | mock_conn.cursor.return_value = mock_cursor 87 | 88 | mock_cursor.execute.side_effect = pymssql.ProgrammingError("Incorrect syntax near 'SELCT'") 89 | 90 | with patch('pymssql.connect', return_value=mock_conn): 91 | with patch.dict('os.environ', { 92 | 'MSSQL_USER': 'test', 93 | 'MSSQL_PASSWORD': 'test', 94 | 'MSSQL_DATABASE': 'testdb' 95 | }): 96 | result = await app.call_tool("execute_sql", {"query": "SELCT * FROM users"}) 97 | assert "Error executing query" in result[0].text 98 | assert len(result) == 1 99 | 100 | @pytest.mark.asyncio 101 | async def test_permission_denied(self): 102 | """Test handling of permission denied errors.""" 103 | mock_conn = Mock() 104 | mock_cursor = Mock() 105 | mock_conn.cursor.return_value = mock_cursor 106 | 107 | mock_cursor.execute.side_effect = pymssql.DatabaseError("The SELECT permission was denied") 108 | 109 | with patch('pymssql.connect', return_value=mock_conn): 110 | with patch.dict('os.environ', { 111 | 'MSSQL_USER': 'test', 112 | 'MSSQL_PASSWORD': 'test', 113 | 'MSSQL_DATABASE': 'testdb' 114 | }): 115 | result = await app.call_tool("execute_sql", {"query": "SELECT * FROM sensitive_table"}) 116 | assert "Error executing query" in result[0].text 117 | 118 | @pytest.mark.asyncio 119 | async def test_deadlock_handling(self): 120 | """Test handling of database deadlocks.""" 121 | mock_conn = Mock() 122 | mock_cursor = Mock() 123 | mock_conn.cursor.return_value = mock_cursor 124 | 125 | mock_cursor.execute.side_effect = pymssql.OperationalError("Transaction was deadlocked") 126 | 127 | with patch('pymssql.connect', return_value=mock_conn): 128 | with patch.dict('os.environ', { 129 | 'MSSQL_USER': 'test', 130 | 'MSSQL_PASSWORD': 'test', 131 | 'MSSQL_DATABASE': 'testdb' 132 | }): 133 | result = await app.call_tool("execute_sql", { 134 | "query": "UPDATE users SET status = 'active'" 135 | }) 136 | assert "Error executing query" in result[0].text 137 | 138 | 139 | class TestResourceErrors: 140 | """Test resource access error scenarios.""" 141 | 142 | @pytest.mark.asyncio 143 | async def test_invalid_uri_format(self): 144 | """Test handling of invalid resource URIs.""" 145 | from pydantic import AnyUrl 146 | 147 | with patch.dict('os.environ', { 148 | 'MSSQL_USER': 'test', 149 | 'MSSQL_PASSWORD': 'test', 150 | 'MSSQL_DATABASE': 'testdb' 151 | }): 152 | # Test invalid URI scheme 153 | with pytest.raises(ValueError, match="Invalid URI scheme"): 154 | await app.read_resource(AnyUrl("http://invalid/uri")) 155 | 156 | @pytest.mark.asyncio 157 | async def test_table_not_found(self): 158 | """Test handling when requested table doesn't exist.""" 159 | mock_conn = Mock() 160 | mock_cursor = Mock() 161 | mock_conn.cursor.return_value = mock_cursor 162 | 163 | mock_cursor.execute.side_effect = pymssql.ProgrammingError("Invalid object name 'nonexistent'") 164 | 165 | with patch('pymssql.connect', return_value=mock_conn): 166 | with patch.dict('os.environ', { 167 | 'MSSQL_USER': 'test', 168 | 'MSSQL_PASSWORD': 'test', 169 | 'MSSQL_DATABASE': 'testdb' 170 | }): 171 | from pydantic import AnyUrl 172 | with pytest.raises(RuntimeError, match="Database error"): 173 | await app.read_resource(AnyUrl("mssql://nonexistent/data")) 174 | 175 | 176 | class TestRecoveryScenarios: 177 | """Test recovery and resilience scenarios.""" 178 | 179 | @pytest.mark.asyncio 180 | async def test_connection_retry_logic(self): 181 | """Test that connection failures don't crash the server.""" 182 | attempt_count = 0 183 | 184 | def mock_connect(**kwargs): 185 | nonlocal attempt_count 186 | attempt_count += 1 187 | if attempt_count < 3: 188 | raise pymssql.OperationalError("Connection failed") 189 | # Success on third attempt 190 | mock_conn = Mock() 191 | mock_cursor = Mock() 192 | mock_cursor.fetchall.return_value = [('users',)] 193 | mock_conn.cursor.return_value = mock_cursor 194 | return mock_conn 195 | 196 | with patch('pymssql.connect', side_effect=mock_connect): 197 | with patch.dict('os.environ', { 198 | 'MSSQL_USER': 'test', 199 | 'MSSQL_PASSWORD': 'test', 200 | 'MSSQL_DATABASE': 'testdb' 201 | }): 202 | # First two calls should fail 203 | resources1 = await app.list_resources() 204 | assert resources1 == [] 205 | 206 | resources2 = await app.list_resources() 207 | assert resources2 == [] 208 | 209 | # Third call should succeed 210 | resources3 = await app.list_resources() 211 | assert len(resources3) == 1 212 | 213 | @pytest.mark.asyncio 214 | async def test_partial_result_handling(self): 215 | """Test handling when cursor fails mid-iteration.""" 216 | mock_conn = Mock() 217 | mock_cursor = Mock() 218 | mock_conn.cursor.return_value = mock_cursor 219 | 220 | # Simulate cursor failing during iteration 221 | def failing_fetchall(): 222 | raise pymssql.OperationalError("Connection lost during query") 223 | 224 | mock_cursor.execute.return_value = None 225 | mock_cursor.fetchall = failing_fetchall 226 | mock_cursor.description = [('id',), ('name',)] 227 | 228 | with patch('pymssql.connect', return_value=mock_conn): 229 | with patch.dict('os.environ', { 230 | 'MSSQL_USER': 'test', 231 | 'MSSQL_PASSWORD': 'test', 232 | 'MSSQL_DATABASE': 'testdb' 233 | }): 234 | # Should handle the error gracefully 235 | from pydantic import AnyUrl 236 | with pytest.raises(RuntimeError): 237 | await app.read_resource(AnyUrl("mssql://users/data")) 238 | 239 | @pytest.mark.asyncio 240 | async def test_long_running_query_handling(self): 241 | """Test handling of long-running queries.""" 242 | mock_conn = Mock() 243 | mock_cursor = Mock() 244 | mock_conn.cursor.return_value = mock_cursor 245 | 246 | async def slow_execute(query): 247 | await asyncio.sleep(0.1) # Simulate slow query 248 | return None 249 | 250 | mock_cursor.execute = Mock(side_effect=lambda q: None) 251 | mock_cursor.fetchall.return_value = [(1,)] 252 | mock_cursor.description = [('count',)] 253 | 254 | with patch('pymssql.connect', return_value=mock_conn): 255 | with patch.dict('os.environ', { 256 | 'MSSQL_USER': 'test', 257 | 'MSSQL_PASSWORD': 'test', 258 | 'MSSQL_DATABASE': 'testdb' 259 | }): 260 | # Should complete without timeout 261 | result = await app.call_tool("execute_sql", { 262 | "query": "SELECT COUNT(*) FROM large_table" 263 | }) 264 | assert "1" in result[0].text 265 | 266 | 267 | class TestMemoryAndResourceManagement: 268 | """Test memory and resource leak prevention.""" 269 | 270 | @pytest.mark.asyncio 271 | async def test_cursor_cleanup_on_error(self): 272 | """Ensure cursors are closed even on errors.""" 273 | mock_conn = Mock() 274 | mock_cursor = Mock() 275 | mock_conn.cursor.return_value = mock_cursor 276 | 277 | mock_cursor.execute.side_effect = Exception("Unexpected error") 278 | 279 | with patch('pymssql.connect', return_value=mock_conn): 280 | with patch.dict('os.environ', { 281 | 'MSSQL_USER': 'test', 282 | 'MSSQL_PASSWORD': 'test', 283 | 'MSSQL_DATABASE': 'testdb' 284 | }): 285 | result = await app.call_tool("execute_sql", {"query": "SELECT * FROM users"}) 286 | 287 | # Cursor should be closed despite error 288 | mock_cursor.close.assert_called() 289 | mock_conn.close.assert_called() 290 | 291 | @pytest.mark.asyncio 292 | async def test_connection_cleanup_on_exception(self): 293 | """Ensure connections are closed on exceptions.""" 294 | mock_conn = Mock() 295 | mock_cursor = Mock() 296 | mock_conn.cursor.return_value = mock_cursor 297 | 298 | # Make cursor creation fail after connection 299 | mock_conn.cursor.side_effect = Exception("Cursor creation failed") 300 | 301 | with patch('pymssql.connect', return_value=mock_conn): 302 | with patch.dict('os.environ', { 303 | 'MSSQL_USER': 'test', 304 | 'MSSQL_PASSWORD': 'test', 305 | 'MSSQL_DATABASE': 'testdb' 306 | }): 307 | resources = await app.list_resources() 308 | assert resources == [] 309 | 310 | # Connection should still be closed 311 | mock_conn.close.assert_called() ``` -------------------------------------------------------------------------------- /tests/test_performance.py: -------------------------------------------------------------------------------- ```python 1 | """Performance and load tests for production readiness.""" 2 | import pytest 3 | import asyncio 4 | import time 5 | from unittest.mock import Mock, patch 6 | from concurrent.futures import ThreadPoolExecutor 7 | import gc 8 | import psutil 9 | import os 10 | from mssql_mcp_server.server import app 11 | 12 | 13 | class TestPerformance: 14 | """Test performance characteristics under load.""" 15 | 16 | @pytest.mark.asyncio 17 | async def test_query_response_time(self): 18 | """Test that queries respond within acceptable time limits.""" 19 | mock_conn = Mock() 20 | mock_cursor = Mock() 21 | mock_conn.cursor.return_value = mock_cursor 22 | 23 | # Simulate reasonable query execution 24 | mock_cursor.description = [('id',), ('name',)] 25 | mock_cursor.fetchall.return_value = [(i, f'user_{i}') for i in range(100)] 26 | 27 | with patch('pymssql.connect', return_value=mock_conn): 28 | with patch.dict('os.environ', { 29 | 'MSSQL_USER': 'test', 30 | 'MSSQL_PASSWORD': 'test', 31 | 'MSSQL_DATABASE': 'testdb' 32 | }): 33 | start_time = time.time() 34 | result = await app.call_tool("execute_sql", {"query": "SELECT * FROM users"}) 35 | end_time = time.time() 36 | 37 | # Query should complete in reasonable time (< 1 second for mock) 38 | assert end_time - start_time < 1.0 39 | assert len(result) == 1 40 | assert "user_99" in result[0].text 41 | 42 | @pytest.mark.asyncio 43 | async def test_concurrent_query_performance(self): 44 | """Test performance under concurrent query load.""" 45 | mock_conn = Mock() 46 | mock_cursor = Mock() 47 | mock_conn.cursor.return_value = mock_cursor 48 | 49 | mock_cursor.description = [('count',)] 50 | mock_cursor.fetchall.return_value = [(42,)] 51 | 52 | with patch('pymssql.connect', return_value=mock_conn): 53 | with patch.dict('os.environ', { 54 | 'MSSQL_USER': 'test', 55 | 'MSSQL_PASSWORD': 'test', 56 | 'MSSQL_DATABASE': 'testdb' 57 | }): 58 | # Run 50 concurrent queries 59 | start_time = time.time() 60 | tasks = [ 61 | app.call_tool("execute_sql", {"query": f"SELECT COUNT(*) FROM table_{i}"}) 62 | for i in range(50) 63 | ] 64 | results = await asyncio.gather(*tasks) 65 | end_time = time.time() 66 | 67 | # All queries should complete 68 | assert len(results) == 50 69 | assert all("42" in r[0].text for r in results) 70 | 71 | # Should complete in reasonable time (< 5 seconds for 50 queries) 72 | assert end_time - start_time < 5.0 73 | 74 | @pytest.mark.asyncio 75 | async def test_large_result_set_performance(self): 76 | """Test performance with large result sets.""" 77 | mock_conn = Mock() 78 | mock_cursor = Mock() 79 | mock_conn.cursor.return_value = mock_cursor 80 | 81 | # Create large result set (10,000 rows) 82 | large_result = [(i, f'user_{i}', f'email_{i}@test.com', i % 100) for i in range(10000)] 83 | mock_cursor.description = [('id',), ('name',), ('email',), ('status',)] 84 | mock_cursor.fetchall.return_value = large_result 85 | 86 | with patch('pymssql.connect', return_value=mock_conn): 87 | with patch.dict('os.environ', { 88 | 'MSSQL_USER': 'test', 89 | 'MSSQL_PASSWORD': 'test', 90 | 'MSSQL_DATABASE': 'testdb' 91 | }): 92 | start_time = time.time() 93 | result = await app.call_tool("execute_sql", {"query": "SELECT * FROM large_table"}) 94 | end_time = time.time() 95 | 96 | # Should handle large results efficiently 97 | assert len(result) == 1 98 | lines = result[0].text.split('\n') 99 | assert len(lines) == 10001 # Header + 10000 rows 100 | 101 | # Should complete in reasonable time (< 10 seconds) 102 | assert end_time - start_time < 10.0 103 | 104 | 105 | class TestMemoryUsage: 106 | """Test memory usage and leak prevention.""" 107 | 108 | @pytest.mark.asyncio 109 | async def test_memory_usage_stability(self): 110 | """Test that memory usage remains stable over time.""" 111 | if not hasattr(psutil.Process(), 'memory_info'): 112 | pytest.skip("Memory monitoring not available") 113 | 114 | mock_conn = Mock() 115 | mock_cursor = Mock() 116 | mock_conn.cursor.return_value = mock_cursor 117 | 118 | mock_cursor.fetchall.return_value = [('table1',), ('table2',)] 119 | 120 | with patch('pymssql.connect', return_value=mock_conn): 121 | with patch.dict('os.environ', { 122 | 'MSSQL_USER': 'test', 123 | 'MSSQL_PASSWORD': 'test', 124 | 'MSSQL_DATABASE': 'testdb' 125 | }): 126 | process = psutil.Process(os.getpid()) 127 | 128 | # Get baseline memory 129 | gc.collect() 130 | baseline_memory = process.memory_info().rss / 1024 / 1024 # MB 131 | 132 | # Run many operations 133 | for _ in range(100): 134 | await app.list_resources() 135 | 136 | # Check memory after operations 137 | gc.collect() 138 | final_memory = process.memory_info().rss / 1024 / 1024 # MB 139 | 140 | # Memory growth should be minimal (< 50 MB) 141 | memory_growth = final_memory - baseline_memory 142 | assert memory_growth < 50, f"Memory grew by {memory_growth} MB" 143 | 144 | @pytest.mark.asyncio 145 | async def test_large_data_memory_handling(self): 146 | """Test memory handling with large data sets.""" 147 | mock_conn = Mock() 148 | mock_cursor = Mock() 149 | mock_conn.cursor.return_value = mock_cursor 150 | 151 | # Create very large result 152 | def generate_large_result(): 153 | for i in range(100000): 154 | yield (i, f'data_{i}' * 100) # Large strings 155 | 156 | mock_cursor.description = [('id',), ('data',)] 157 | mock_cursor.fetchall.return_value = list(generate_large_result()) 158 | 159 | with patch('pymssql.connect', return_value=mock_conn): 160 | with patch.dict('os.environ', { 161 | 'MSSQL_USER': 'test', 162 | 'MSSQL_PASSWORD': 'test', 163 | 'MSSQL_DATABASE': 'testdb' 164 | }): 165 | # Should handle large data without excessive memory use 166 | result = await app.call_tool("execute_sql", {"query": "SELECT * FROM big_table"}) 167 | 168 | # Result should be created 169 | assert len(result) == 1 170 | 171 | # Memory should be released after operation 172 | result = None 173 | gc.collect() 174 | 175 | 176 | class TestLoadHandling: 177 | """Test system behavior under various load conditions.""" 178 | 179 | @pytest.mark.asyncio 180 | async def test_burst_load_handling(self): 181 | """Test handling of sudden burst loads.""" 182 | mock_conn = Mock() 183 | mock_cursor = Mock() 184 | mock_conn.cursor.return_value = mock_cursor 185 | 186 | mock_cursor.fetchall.return_value = [('result',)] 187 | mock_cursor.description = [('data',)] 188 | 189 | with patch('pymssql.connect', return_value=mock_conn): 190 | with patch.dict('os.environ', { 191 | 'MSSQL_USER': 'test', 192 | 'MSSQL_PASSWORD': 'test', 193 | 'MSSQL_DATABASE': 'testdb' 194 | }): 195 | # Simulate burst of 100 requests 196 | start_time = time.time() 197 | tasks = [] 198 | for _ in range(100): 199 | tasks.append(app.call_tool("execute_sql", {"query": "SELECT 1"})) 200 | 201 | results = await asyncio.gather(*tasks, return_exceptions=True) 202 | end_time = time.time() 203 | 204 | # Count successful results 205 | successful = sum(1 for r in results if not isinstance(r, Exception)) 206 | 207 | # Most requests should succeed 208 | assert successful >= 90 # Allow 10% failure rate 209 | 210 | # Should complete within reasonable time 211 | assert end_time - start_time < 30.0 212 | 213 | @pytest.mark.asyncio 214 | async def test_sustained_load_handling(self): 215 | """Test handling of sustained load over time.""" 216 | mock_conn = Mock() 217 | mock_cursor = Mock() 218 | mock_conn.cursor.return_value = mock_cursor 219 | 220 | mock_cursor.fetchall.return_value = [('ok',)] 221 | mock_cursor.description = [('status',)] 222 | 223 | with patch('pymssql.connect', return_value=mock_conn): 224 | with patch.dict('os.environ', { 225 | 'MSSQL_USER': 'test', 226 | 'MSSQL_PASSWORD': 'test', 227 | 'MSSQL_DATABASE': 'testdb' 228 | }): 229 | # Run continuous load for 10 seconds 230 | start_time = time.time() 231 | request_count = 0 232 | error_count = 0 233 | 234 | while time.time() - start_time < 10: 235 | try: 236 | result = await app.call_tool("execute_sql", {"query": "SELECT 'ok'"}) 237 | request_count += 1 238 | assert "ok" in result[0].text 239 | except Exception: 240 | error_count += 1 241 | 242 | # Small delay to prevent overwhelming 243 | await asyncio.sleep(0.01) 244 | 245 | # Should handle sustained load 246 | assert request_count > 500 # At least 50 req/sec 247 | assert error_count < request_count * 0.05 # Less than 5% errors 248 | 249 | 250 | class TestScalability: 251 | """Test scalability characteristics.""" 252 | 253 | @pytest.mark.asyncio 254 | async def test_resource_scaling(self): 255 | """Test handling of increasing number of resources.""" 256 | mock_conn = Mock() 257 | mock_cursor = Mock() 258 | mock_conn.cursor.return_value = mock_cursor 259 | 260 | # Test with different table counts 261 | table_counts = [10, 100, 1000] 262 | 263 | with patch('pymssql.connect', return_value=mock_conn): 264 | with patch.dict('os.environ', { 265 | 'MSSQL_USER': 'test', 266 | 'MSSQL_PASSWORD': 'test', 267 | 'MSSQL_DATABASE': 'testdb' 268 | }): 269 | for count in table_counts: 270 | # Create table list 271 | tables = [(f'table_{i}',) for i in range(count)] 272 | mock_cursor.fetchall.return_value = tables 273 | 274 | start_time = time.time() 275 | resources = await app.list_resources() 276 | end_time = time.time() 277 | 278 | assert len(resources) == count 279 | 280 | # Time should scale reasonably (not exponentially) 281 | time_per_table = (end_time - start_time) / count 282 | assert time_per_table < 0.01 # Less than 10ms per table 283 | 284 | @pytest.mark.asyncio 285 | async def test_query_complexity_scaling(self): 286 | """Test performance with increasingly complex queries.""" 287 | mock_conn = Mock() 288 | mock_cursor = Mock() 289 | mock_conn.cursor.return_value = mock_cursor 290 | 291 | with patch('pymssql.connect', return_value=mock_conn): 292 | with patch.dict('os.environ', { 293 | 'MSSQL_USER': 'test', 294 | 'MSSQL_PASSWORD': 'test', 295 | 'MSSQL_DATABASE': 'testdb' 296 | }): 297 | # Test simple to complex queries 298 | queries = [ 299 | "SELECT 1", 300 | "SELECT * FROM users WHERE id = 1", 301 | "SELECT u.*, o.* FROM users u JOIN orders o ON u.id = o.user_id", 302 | """SELECT u.name, COUNT(o.id), SUM(o.total), AVG(o.total) 303 | FROM users u 304 | LEFT JOIN orders o ON u.id = o.user_id 305 | GROUP BY u.name 306 | HAVING COUNT(o.id) > 5 307 | ORDER BY SUM(o.total) DESC""" 308 | ] 309 | 310 | mock_cursor.description = [('result',)] 311 | mock_cursor.fetchall.return_value = [('data',)] 312 | 313 | for query in queries: 314 | start_time = time.time() 315 | result = await app.call_tool("execute_sql", {"query": query}) 316 | end_time = time.time() 317 | 318 | # All queries should complete successfully 319 | assert len(result) == 1 320 | 321 | # Response time should be reasonable 322 | assert end_time - start_time < 2.0 ```