# Directory Structure
```
├── .dockerignore
├── .github
│ └── workflows
│ ├── ci.yml
│ ├── publish.yml
│ ├── release.yml
│ └── security.yml
├── .gitignore
├── close_fixed_issues.sh
├── docker-compose.example.yml
├── docker-compose.yml
├── Dockerfile
├── LICENSE
├── Makefile
├── pyproject.toml
├── pytest.ini
├── README.md
├── requirements-dev.txt
├── requirements.txt
├── run_tests.py
├── SECURITY.md
├── src
│ └── mssql_mcp_server
│ ├── __init__.py
│ ├── __main__.py
│ └── server.py
├── test_connection.py
├── tests
│ ├── conftest.py
│ ├── test_config.py
│ ├── test_error_handling.py
│ ├── test_integration.py
│ ├── test_performance.py
│ ├── test_security.py
│ └── test_server.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------
```
1 | # Git
2 | .git
3 | .gitignore
4 |
5 | # Virtual Environment
6 | venv/
7 | __pycache__/
8 | *.py[cod]
9 | *$py.class
10 | *.so
11 | .Python
12 | .pytest_cache/
13 | .coverage
14 | htmlcov/
15 |
16 | # Logs
17 | *.log
18 |
19 | # Docker
20 | Dockerfile
21 | .dockerignore
22 | docker-compose.yml
23 |
24 | # Editor directories and files
25 | .idea/
26 | .vscode/
27 | *.swp
28 | *.swo
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | # Byte-compiled / optimized / DLL files
2 | __pycache__/
3 | *.py[cod]
4 | *$py.class
5 |
6 | # C extensions
7 | *.so
8 |
9 | # Distribution / packaging
10 | .Python
11 | build/
12 | develop-eggs/
13 | dist/
14 | downloads/
15 | eggs/
16 | .eggs/
17 | lib/
18 | lib64/
19 | parts/
20 | sdist/
21 | var/
22 | wheels/
23 | *.egg-info/
24 | .installed.cfg
25 | *.egg
26 | MANIFEST
27 |
28 | # PyInstaller
29 | *.manifest
30 | *.spec
31 |
32 | # Unit test / coverage reports
33 | htmlcov/
34 | .tox/
35 | .coverage
36 | .coverage.*
37 | .cache
38 | nosetests.xml
39 | coverage.xml
40 | *.cover
41 | .hypothesis/
42 | .pytest_cache/
43 |
44 | # Virtual environments
45 | venv/
46 | ENV/
47 | env/
48 | .venv
49 |
50 | # IDEs
51 | .vscode/
52 | .idea/
53 | *.swp
54 | *.swo
55 | *~
56 |
57 | # OS
58 | .DS_Store
59 | .DS_Store?
60 | ._*
61 | .Spotlight-V100
62 | .Trashes
63 | Thumbs.db
64 | ehthumbs.db
65 |
66 | # Environment variables
67 | .env
68 | *.env
69 | !.env.example
70 |
71 | # Logs
72 | *.log
73 |
74 | # Database
75 | *.db
76 | *.sqlite
77 |
78 | # MCP specific
79 | mcp-workspace/
80 |
81 | # Claude local settings
82 | .claude/
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # Microsoft SQL Server MCP Server
2 |
3 | [](https://pypi.org/project/microsoft_sql_server_mcp/)
4 | [](https://opensource.org/licenses/MIT)
5 |
6 | <a href="https://glama.ai/mcp/servers/29cpe19k30">
7 | <img width="380" height="200" src="https://glama.ai/mcp/servers/29cpe19k30/badge" alt="Microsoft SQL Server MCP server" />
8 | </a>
9 |
10 | A Model Context Protocol (MCP) server for secure SQL Server database access through Claude Desktop.
11 |
12 | ## Features
13 |
14 | - 🔍 List database tables
15 | - 📊 Execute SQL queries (SELECT, INSERT, UPDATE, DELETE)
16 | - 🔐 Multiple authentication methods (SQL, Windows, Azure AD)
17 | - 🏢 LocalDB and Azure SQL support
18 | - 🔌 Custom port configuration
19 |
20 | ## Quick Start
21 |
22 | ### Install with Claude Desktop
23 |
24 | Add to your `claude_desktop_config.json`:
25 |
26 | ```json
27 | {
28 | "mcpServers": {
29 | "mssql": {
30 | "command": "uvx",
31 | "args": ["microsoft_sql_server_mcp"],
32 | "env": {
33 | "MSSQL_SERVER": "localhost",
34 | "MSSQL_DATABASE": "your_database",
35 | "MSSQL_USER": "your_username",
36 | "MSSQL_PASSWORD": "your_password"
37 | }
38 | }
39 | }
40 | }
41 | ```
42 |
43 | ## Configuration
44 |
45 | ### Basic SQL Authentication
46 | ```bash
47 | MSSQL_SERVER=localhost # Required
48 | MSSQL_DATABASE=your_database # Required
49 | MSSQL_USER=your_username # Required for SQL auth
50 | MSSQL_PASSWORD=your_password # Required for SQL auth
51 | ```
52 |
53 | ### Windows Authentication
54 | ```bash
55 | MSSQL_SERVER=localhost
56 | MSSQL_DATABASE=your_database
57 | MSSQL_WINDOWS_AUTH=true # Use Windows credentials
58 | ```
59 |
60 | ### Azure SQL Database
61 | ```bash
62 | MSSQL_SERVER=your-server.database.windows.net
63 | MSSQL_DATABASE=your_database
64 | MSSQL_USER=your_username
65 | MSSQL_PASSWORD=your_password
66 | # Encryption is automatic for Azure
67 | ```
68 |
69 | ### Optional Settings
70 | ```bash
71 | MSSQL_PORT=1433 # Custom port (default: 1433)
72 | MSSQL_ENCRYPT=true # Force encryption
73 | ```
74 |
75 | ## Alternative Installation Methods
76 |
77 | ### Using pip
78 | ```bash
79 | pip install microsoft_sql_server_mcp
80 | ```
81 |
82 | Then in `claude_desktop_config.json`:
83 | ```json
84 | {
85 | "mcpServers": {
86 | "mssql": {
87 | "command": "python",
88 | "args": ["-m", "mssql_mcp_server"],
89 | "env": { ... }
90 | }
91 | }
92 | }
93 | ```
94 |
95 | ### Development
96 | ```bash
97 | git clone https://github.com/RichardHan/mssql_mcp_server.git
98 | cd mssql_mcp_server
99 | pip install -e .
100 | ```
101 |
102 | ## Security
103 |
104 | - Create a dedicated SQL user with minimal permissions
105 | - Never use admin/sa accounts
106 | - Use Windows Authentication when possible
107 | - Enable encryption for sensitive data
108 |
109 | ## License
110 |
111 | MIT
```
--------------------------------------------------------------------------------
/SECURITY.md:
--------------------------------------------------------------------------------
```markdown
1 | # Security Policy
2 |
3 | ## Reporting Security Issues
4 |
5 | If you discover a security vulnerability, please email [email protected] instead of using the public issue tracker.
6 |
7 | ## Security Best Practices
8 |
9 | When using this MCP server:
10 |
11 | 1. **Database User**: Create a dedicated SQL user with minimal permissions
12 | 2. **Never use sa/admin accounts** in production
13 | 3. **Use Windows Authentication** when possible
14 | 4. **Enable encryption** for sensitive data: `MSSQL_ENCRYPT=true`
15 | 5. **Restrict permissions** to only necessary tables and operations
16 |
17 | ## SQL Injection Protection
18 |
19 | This server includes built-in protection against SQL injection:
20 | - Table names are validated with strict regex patterns
21 | - All identifiers are properly escaped
22 | - User input is parameterized where possible
23 |
24 | ## Example: Minimal Permissions
25 |
26 | ```sql
27 | -- Create a restricted user
28 | CREATE LOGIN mcp_user WITH PASSWORD = 'StrongPassword123!';
29 | CREATE USER mcp_user FOR LOGIN mcp_user;
30 |
31 | -- Grant only necessary permissions
32 | GRANT SELECT ON Schema.TableName TO mcp_user;
33 | GRANT INSERT, UPDATE ON Schema.AuditLog TO mcp_user;
34 | ```
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
1 | mcp>=1.0.0
2 | pymssql>=2.2.7
3 |
```
--------------------------------------------------------------------------------
/pytest.ini:
--------------------------------------------------------------------------------
```
1 | [pytest]
2 | asyncio_mode = auto
3 | asyncio_default_fixture_loop_scope = function
4 | testpaths = tests
5 | python_files = test_*.py
```
--------------------------------------------------------------------------------
/src/mssql_mcp_server/__main__.py:
--------------------------------------------------------------------------------
```python
1 | """Entry point for running the module with python -m mssql_mcp_server."""
2 | from . import main
3 |
4 | if __name__ == "__main__":
5 | main()
```
--------------------------------------------------------------------------------
/src/mssql_mcp_server/__init__.py:
--------------------------------------------------------------------------------
```python
1 | from . import server
2 | import asyncio
3 |
4 | def main():
5 | """Main entry point for the package."""
6 | asyncio.run(server.main())
7 |
8 | # Expose important items at package level
9 | __all__ = ['main', 'server']
```
--------------------------------------------------------------------------------
/requirements-dev.txt:
--------------------------------------------------------------------------------
```
1 | # Testing
2 | pytest>=7.0.0
3 | pytest-asyncio>=0.23.0
4 | pytest-cov>=4.1.0
5 | pytest-timeout>=2.1.0
6 | pytest-xdist>=3.3.0
7 | pytest-mock>=3.11.0
8 |
9 | # Code Quality
10 | black>=23.0.0
11 | isort>=5.12.0
12 | mypy>=1.0.0
13 | ruff>=0.0.280
14 |
15 | # Security
16 | safety>=2.3.0
17 | bandit>=1.7.0
18 | pip-audit>=2.6.0
19 |
20 | # Performance Testing
21 | psutil>=5.9.0
22 |
23 | # Build and Release
24 | build>=1.0.0
25 | twine>=4.0.0
26 |
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
1 | FROM python:3.11-slim
2 |
3 | WORKDIR /app
4 |
5 | # Install system dependencies for pymssql
6 | RUN apt-get update && apt-get install -y \
7 | freetds-dev \
8 | && rm -rf /var/lib/apt/lists/*
9 |
10 | # Copy requirements
11 | COPY requirements.txt .
12 |
13 | # Install Python dependencies
14 | RUN pip install --no-cache-dir -r requirements.txt
15 |
16 | # Copy project files
17 | COPY . .
18 |
19 | # Run the MCP server
20 | CMD ["python", "-m", "mssql_mcp_server"]
```
--------------------------------------------------------------------------------
/docker-compose.example.yml:
--------------------------------------------------------------------------------
```yaml
1 | version: '3.8'
2 |
3 | services:
4 | mssql-mcp-server:
5 | build: .
6 | environment:
7 | - MSSQL_SERVER=sqlserver # Use service name for internal Docker network
8 | - MSSQL_DATABASE=TestDB
9 | - MSSQL_USER=sa
10 | - MSSQL_PASSWORD=YourStrong@Passw0rd
11 | - MSSQL_PORT=1433
12 | - MSSQL_ENCRYPT=false
13 | depends_on:
14 | - sqlserver
15 | stdin_open: true
16 | tty: true
17 |
18 | # Example SQL Server for testing
19 | sqlserver:
20 | image: mcr.microsoft.com/mssql/server:2022-latest
21 | environment:
22 | - ACCEPT_EULA=Y
23 | - SA_PASSWORD=YourStrong@Passw0rd
24 | - MSSQL_PID=Developer
25 | ports:
26 | - "1433:1433"
27 | volumes:
28 | - sqlserver-data:/var/opt/mssql
29 |
30 | volumes:
31 | sqlserver-data:
```
--------------------------------------------------------------------------------
/docker-compose.yml:
--------------------------------------------------------------------------------
```yaml
1 | version: "3.8"
2 |
3 | services:
4 | # SQL Server
5 | mssql:
6 | image: mcr.microsoft.com/mssql/server:2019-latest
7 | platform: linux/amd64
8 | environment:
9 | - ACCEPT_EULA=Y
10 | - MSSQL_SA_PASSWORD=${MSSQL_PASSWORD:-StrongPassword123!}
11 | ports:
12 | - "${HOST_SQL_PORT:-1434}:1433"
13 | healthcheck:
14 | test: /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P "${MSSQL_PASSWORD:-StrongPassword123!}" -Q "SELECT 1" || exit 1
15 | interval: 10s
16 | timeout: 3s
17 | retries: 10
18 | start_period: 10s
19 | volumes:
20 | - mssql_data:/var/opt/mssql
21 | mem_limit: ${SQL_MEMORY_LIMIT:-2g}
22 |
23 | # MCP Server
24 | mcp_server:
25 | build:
26 | context: .
27 | dockerfile: Dockerfile
28 | depends_on:
29 | mssql:
30 | condition: service_healthy
31 | environment:
32 | - MSSQL_SERVER=${MSSQL_SERVER:-mssql}
33 | - MSSQL_PORT=${MSSQL_PORT:-1433}
34 | - MSSQL_USER=${MSSQL_USER:-sa}
35 | - MSSQL_PASSWORD=${MSSQL_PASSWORD:-StrongPassword123!}
36 | - MSSQL_DATABASE=${MSSQL_DATABASE:-master}
37 | volumes:
38 | - .:/app
39 |
40 | volumes:
41 | mssql_data:
42 |
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
1 | [project]
2 | name = "microsoft_sql_server_mcp"
3 | version = "0.1.0"
4 | description = "A Model Context Protocol (MCP) server that enables secure interaction with Microsoft SQL Server databases."
5 | readme = "README.md"
6 | requires-python = ">=3.11"
7 | authors = [
8 | {name = "Richard Han", email = "[email protected]"}
9 | ]
10 | license = {text = "MIT"}
11 | keywords = ["mcp", "mssql", "sql-server", "database", "ai"]
12 | classifiers = [
13 | "Development Status :: 4 - Beta",
14 | "Intended Audience :: Developers",
15 | "License :: OSI Approved :: MIT License",
16 | "Programming Language :: Python :: 3",
17 | "Programming Language :: Python :: 3.11",
18 | "Programming Language :: Python :: 3.12",
19 | ]
20 | dependencies = [
21 | "mcp>=1.0.0",
22 | "pymssql>=2.2.8",
23 | ]
24 |
25 | [tool.mcp]
26 | system_dependencies.darwin = ["freetds"]
27 | system_dependencies.linux = ["freetds-dev"]
28 | system_dependencies.win32 = []
29 |
30 | [build-system]
31 | requires = ["hatchling"]
32 | build-backend = "hatchling.build"
33 |
34 | [project.scripts]
35 | mssql_mcp_server = "mssql_mcp_server:main"
36 |
37 | [tool.hatch.build.targets.wheel]
38 | packages = ["src/mssql_mcp_server"]
39 |
```
--------------------------------------------------------------------------------
/test_connection.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python
2 | """Test SQL Server connection using the same configuration as the MCP server."""
3 |
4 | import os
5 | import sys
6 | import pymssql
7 |
8 | # Add src to path to import our server module
9 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
10 |
11 | from mssql_mcp_server.server import get_db_config
12 |
13 | try:
14 | print("Loading database configuration from environment variables...")
15 | config = get_db_config()
16 |
17 | # Mask sensitive information for display
18 | display_config = config.copy()
19 | if 'password' in display_config:
20 | display_config['password'] = '***'
21 | print(f"Configuration: {display_config}")
22 |
23 | print("\nAttempting to connect to SQL Server...")
24 | conn = pymssql.connect(**config)
25 | cursor = conn.cursor()
26 | print("Connection successful!")
27 |
28 | print("\nTesting query execution...")
29 | cursor.execute("SELECT TOP 5 TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")
30 | rows = cursor.fetchall()
31 | print(f"Found {len(rows)} tables:")
32 | for row in rows:
33 | print(f" - {row[0]}")
34 |
35 | cursor.close()
36 | conn.close()
37 | print("\nConnection test completed successfully!")
38 | except Exception as e:
39 | print(f"Error: {str(e)}")
40 | import traceback
41 | traceback.print_exc()
42 |
```
--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------
```python
1 | # tests/conftest.py
2 | import pytest
3 | import os
4 | import pymssql
5 |
6 | @pytest.fixture(scope="session")
7 | def mssql_connection():
8 | """Create a test database connection."""
9 | try:
10 | connection = pymssql.connect(
11 | server=os.getenv("MSSQL_SERVER", "localhost"),
12 | user=os.getenv("MSSQL_USER", "sa"),
13 | password=os.getenv("MSSQL_PASSWORD", "testpassword"),
14 | database=os.getenv("MSSQL_DATABASE", "test_db")
15 | )
16 |
17 | # Create a test table
18 | cursor = connection.cursor()
19 | cursor.execute("""
20 | IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'test_table')
21 | CREATE TABLE test_table (
22 | id INT IDENTITY(1,1) PRIMARY KEY,
23 | name VARCHAR(255),
24 | value INT
25 | )
26 | """)
27 | connection.commit()
28 |
29 | yield connection
30 |
31 | # Cleanup
32 | cursor.execute("DROP TABLE IF EXISTS test_table")
33 | connection.commit()
34 | cursor.close()
35 | connection.close()
36 |
37 | except pymssql.Error as e:
38 | pytest.fail(f"Failed to connect to SQL Server: {e}")
39 |
40 | @pytest.fixture(scope="session")
41 | def mssql_cursor(mssql_connection):
42 | """Create a test cursor."""
43 | cursor = mssql_connection.cursor()
44 | yield cursor
45 | cursor.close()
```
--------------------------------------------------------------------------------
/.github/workflows/security.yml:
--------------------------------------------------------------------------------
```yaml
1 | name: Security Scan
2 |
3 | on:
4 | schedule:
5 | - cron: '0 0 * * 1' # Weekly on Monday
6 | push:
7 | branches: [ main ]
8 | pull_request:
9 | branches: [ main ]
10 | workflow_dispatch:
11 |
12 | permissions:
13 | contents: read
14 | security-events: write
15 |
16 | jobs:
17 | dependency-check:
18 | name: Dependency Security Check
19 | runs-on: ubuntu-latest
20 |
21 | steps:
22 | - uses: actions/checkout@v4
23 |
24 | - name: Set up Python
25 | uses: actions/setup-python@v5
26 | with:
27 | python-version: '3.11'
28 |
29 | - name: Install dependencies
30 | run: |
31 | python -m pip install --upgrade pip
32 | pip install safety pip-audit
33 | pip install -r requirements.txt
34 |
35 | - name: Run Safety check
36 | run: |
37 | safety check --json --output safety-report.json || true
38 |
39 | - name: Run pip-audit
40 | run: |
41 | pip-audit --format json --output pip-audit-report.json || true
42 |
43 | - name: Upload security reports
44 | uses: actions/upload-artifact@v4
45 | if: always()
46 | with:
47 | name: dependency-security-reports
48 | path: |
49 | safety-report.json
50 | pip-audit-report.json
51 |
52 | codeql-analysis:
53 | name: CodeQL Analysis
54 | runs-on: ubuntu-latest
55 |
56 | steps:
57 | - uses: actions/checkout@v4
58 |
59 | - name: Initialize CodeQL
60 | uses: github/codeql-action/init@v3
61 | with:
62 | languages: python
63 |
64 | - name: Autobuild
65 | uses: github/codeql-action/autobuild@v3
66 |
67 | - name: Perform CodeQL Analysis
68 | uses: github/codeql-action/analyze@v3
```
--------------------------------------------------------------------------------
/tests/test_server.py:
--------------------------------------------------------------------------------
```python
1 | import pytest
2 | from mssql_mcp_server.server import app, list_tools, list_resources, read_resource, call_tool
3 | from pydantic import AnyUrl
4 |
5 | def test_server_initialization():
6 | """Test that the server initializes correctly."""
7 | assert app.name == "mssql_mcp_server"
8 |
9 | @pytest.mark.asyncio
10 | async def test_list_tools():
11 | """Test that list_tools returns expected tools."""
12 | tools = await list_tools()
13 | assert len(tools) == 1
14 | assert tools[0].name == "execute_sql"
15 | assert "query" in tools[0].inputSchema["properties"]
16 |
17 | @pytest.mark.asyncio
18 | async def test_call_tool_invalid_name():
19 | """Test calling a tool with an invalid name."""
20 | with pytest.raises(ValueError, match="Unknown tool"):
21 | await call_tool("invalid_tool", {})
22 |
23 | @pytest.mark.asyncio
24 | async def test_call_tool_missing_query():
25 | """Test calling execute_sql without a query."""
26 | with pytest.raises(ValueError, match="Query is required"):
27 | await call_tool("execute_sql", {})
28 |
29 | # Skip database-dependent tests if no database connection
30 | @pytest.mark.asyncio
31 | @pytest.mark.skipif(
32 | not all([
33 | pytest.importorskip("pymssql"),
34 | pytest.importorskip("mssql_mcp_server")
35 | ]),
36 | reason="SQL Server connection not available"
37 | )
38 | async def test_list_resources():
39 | """Test listing resources (requires database connection)."""
40 | try:
41 | resources = await list_resources()
42 | assert isinstance(resources, list)
43 | except ValueError as e:
44 | if "Missing required database configuration" in str(e):
45 | pytest.skip("Database configuration not available")
46 | raise
47 |
```
--------------------------------------------------------------------------------
/.github/workflows/release.yml:
--------------------------------------------------------------------------------
```yaml
1 | name: Create Release
2 |
3 | on:
4 | push:
5 | tags:
6 | - 'v*' # Trigger on version tags like v1.0.0
7 | workflow_dispatch:
8 | inputs:
9 | version:
10 | description: 'Version to release (e.g., 1.0.0)'
11 | required: true
12 | type: string
13 |
14 | jobs:
15 | create-release:
16 | name: Create GitHub Release
17 | runs-on: ubuntu-latest
18 | permissions:
19 | contents: write
20 |
21 | steps:
22 | - uses: actions/checkout@v4
23 | with:
24 | fetch-depth: 0 # Get all history for changelog
25 |
26 | - name: Set up Python
27 | uses: actions/setup-python@v5
28 | with:
29 | python-version: '3.11'
30 |
31 | - name: Generate changelog
32 | id: changelog
33 | run: |
34 | # Generate changelog from git history
35 | echo "## What's Changed" > RELEASE_NOTES.md
36 | echo "" >> RELEASE_NOTES.md
37 |
38 | # Get commits since last tag
39 | LAST_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "")
40 | if [ -z "$LAST_TAG" ]; then
41 | git log --pretty=format:"* %s (%h)" >> RELEASE_NOTES.md
42 | else
43 | git log ${LAST_TAG}..HEAD --pretty=format:"* %s (%h)" >> RELEASE_NOTES.md
44 | fi
45 |
46 | echo "" >> RELEASE_NOTES.md
47 | echo "**Full Changelog**: https://github.com/${{ github.repository }}/compare/${LAST_TAG}...v${{ github.event.inputs.version || github.ref_name }}" >> RELEASE_NOTES.md
48 |
49 | - name: Create Release
50 | env:
51 | GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
52 | run: |
53 | VERSION="${{ github.event.inputs.version || github.ref_name }}"
54 | VERSION="${VERSION#v}" # Remove 'v' prefix if present
55 |
56 | gh release create "v${VERSION}" \
57 | --title "Release v${VERSION}" \
58 | --notes-file RELEASE_NOTES.md \
59 | --draft
```
--------------------------------------------------------------------------------
/.github/workflows/publish.yml:
--------------------------------------------------------------------------------
```yaml
1 | name: Publish to PyPI
2 |
3 | on:
4 | release:
5 | types: [published]
6 | workflow_dispatch:
7 | inputs:
8 | test_pypi:
9 | description: 'Publish to Test PyPI first'
10 | required: false
11 | default: true
12 | type: boolean
13 |
14 | jobs:
15 | build:
16 | name: Build distribution packages
17 | runs-on: ubuntu-latest
18 |
19 | steps:
20 | - uses: actions/checkout@v4
21 |
22 | - name: Set up Python
23 | uses: actions/setup-python@v5
24 | with:
25 | python-version: '3.11'
26 |
27 | - name: Install build dependencies
28 | run: |
29 | python -m pip install --upgrade pip
30 | pip install hatch
31 |
32 | - name: Build package
33 | run: hatch build
34 |
35 | - name: Store the distribution packages
36 | uses: actions/upload-artifact@v4
37 | with:
38 | name: python-package-distributions
39 | path: dist/
40 |
41 | publish-to-test-pypi:
42 | name: Publish to Test PyPI
43 | if: github.event_name == 'workflow_dispatch' && github.event.inputs.test_pypi == 'true'
44 | needs: build
45 | runs-on: ubuntu-latest
46 |
47 | environment:
48 | name: test-pypi
49 | url: https://test.pypi.org/p/microsoft_sql_server_mcp
50 |
51 | permissions:
52 | id-token: write # IMPORTANT: mandatory for trusted publishing
53 |
54 | steps:
55 | - name: Download all the dists
56 | uses: actions/download-artifact@v4
57 | with:
58 | name: python-package-distributions
59 | path: dist/
60 |
61 | - name: Publish to Test PyPI
62 | uses: pypa/gh-action-pypi-publish@release/v1
63 | with:
64 | repository-url: https://test.pypi.org/legacy/
65 | skip-existing: true
66 |
67 | publish-to-pypi:
68 | name: Publish to PyPI
69 | if: github.event_name == 'release' || (github.event_name == 'workflow_dispatch' && github.event.inputs.test_pypi == 'false')
70 | needs: build
71 | runs-on: ubuntu-latest
72 |
73 | environment:
74 | name: pypi
75 | url: https://pypi.org/p/microsoft_sql_server_mcp
76 |
77 | permissions:
78 | id-token: write # IMPORTANT: mandatory for trusted publishing
79 |
80 | steps:
81 | - name: Download all the dists
82 | uses: actions/download-artifact@v4
83 | with:
84 | name: python-package-distributions
85 | path: dist/
86 |
87 | - name: Publish to PyPI
88 | uses: pypa/gh-action-pypi-publish@release/v1
```
--------------------------------------------------------------------------------
/.github/workflows/ci.yml:
--------------------------------------------------------------------------------
```yaml
1 | name: CI/CD Pipeline
2 |
3 | on:
4 | push:
5 | branches: [ main, develop ]
6 | pull_request:
7 | branches: [ main ]
8 | workflow_dispatch:
9 |
10 | jobs:
11 | test:
12 | name: Test Python ${{ matrix.python-version }}
13 | runs-on: ${{ matrix.os }}
14 | strategy:
15 | fail-fast: false
16 | matrix:
17 | os: [ubuntu-latest, windows-latest, macos-latest]
18 | python-version: ['3.11', '3.12']
19 |
20 | steps:
21 | - uses: actions/checkout@v4
22 |
23 | - name: Set up Python ${{ matrix.python-version }}
24 | uses: actions/setup-python@v5
25 | with:
26 | python-version: ${{ matrix.python-version }}
27 |
28 | - name: Install system dependencies (Ubuntu)
29 | if: matrix.os == 'ubuntu-latest'
30 | run: |
31 | sudo apt-get update
32 | sudo apt-get install -y freetds-dev
33 |
34 | - name: Install system dependencies (macOS)
35 | if: matrix.os == 'macos-latest'
36 | run: |
37 | brew install freetds
38 |
39 | - name: Install dependencies
40 | run: |
41 | python -m pip install --upgrade pip
42 | pip install -r requirements.txt
43 | pip install -r requirements-dev.txt
44 | pip install -e .
45 |
46 | - name: Lint with ruff
47 | run: |
48 | pip install ruff
49 | ruff check src tests
50 |
51 | - name: Type check with mypy
52 | run: |
53 | pip install mypy
54 | mypy src --ignore-missing-imports
55 |
56 | - name: Test with pytest
57 | run: |
58 | pytest tests -v --tb=short
59 | env:
60 | MSSQL_SERVER: localhost
61 | MSSQL_USER: test
62 | MSSQL_PASSWORD: test
63 | MSSQL_DATABASE: test
64 |
65 | - name: Check package build
66 | run: |
67 | pip install hatch
68 | hatch build
69 | ls -la dist/
70 |
71 | security-scan:
72 | name: Security Scan
73 | runs-on: ubuntu-latest
74 |
75 | steps:
76 | - uses: actions/checkout@v4
77 |
78 | - name: Set up Python
79 | uses: actions/setup-python@v5
80 | with:
81 | python-version: '3.11'
82 |
83 | - name: Install dependencies
84 | run: |
85 | python -m pip install --upgrade pip
86 | pip install bandit[toml] safety
87 |
88 | - name: Run Bandit security scan
89 | run: bandit -r src -f json -o bandit-report.json || true
90 |
91 | - name: Run Safety check
92 | run: |
93 | pip install -r requirements.txt
94 | safety check --json || true
95 |
96 | - name: Upload security reports
97 | uses: actions/upload-artifact@v4
98 | if: always()
99 | with:
100 | name: security-reports
101 | path: |
102 | bandit-report.json
103 |
104 | docker-build:
105 | name: Docker Build Test
106 | runs-on: ubuntu-latest
107 |
108 | steps:
109 | - uses: actions/checkout@v4
110 |
111 | - name: Set up Docker Buildx
112 | uses: docker/setup-buildx-action@v3
113 |
114 | - name: Build Docker image
115 | run: |
116 | docker build -t mssql-mcp-server:test .
117 |
118 | - name: Test Docker image
119 | run: |
120 | docker run --rm mssql-mcp-server:test python --version
```
--------------------------------------------------------------------------------
/close_fixed_issues.sh:
--------------------------------------------------------------------------------
```bash
1 | #!/bin/bash
2 |
3 | # Script to close fixed issues with appropriate comments
4 |
5 | echo "Closing fixed issues..."
6 |
7 | # Issue #8: Support MSSQL_PORT
8 | echo "Closing issue #8: Support MSSQL_PORT"
9 | gh issue close 8 --comment "This has been implemented! You can now use the \`MSSQL_PORT\` environment variable:
10 |
11 | \`\`\`bash
12 | MSSQL_PORT=1433 # Or any custom port
13 | \`\`\`
14 |
15 | The implementation includes:
16 | - Default port of 1433 if not specified
17 | - Proper integer conversion with error handling
18 | - Full support for non-standard SQL Server ports
19 |
20 | See the updated README for configuration examples."
21 |
22 | # Issue #7: Windows Authentication
23 | echo "Closing issue #7: Windows Authentication"
24 | gh issue close 7 --comment "Windows Authentication is now fully supported! Simply set:
25 |
26 | \`\`\`bash
27 | MSSQL_WINDOWS_AUTH=true
28 | \`\`\`
29 |
30 | When enabled:
31 | - No need to provide MSSQL_USER or MSSQL_PASSWORD
32 | - The server will use Windows integrated authentication
33 | - Works with both standard SQL Server and LocalDB
34 |
35 | See the README for complete configuration examples."
36 |
37 | # Issue #6: SQL Local DB
38 | echo "Closing issue #6: SQL Local DB"
39 | gh issue close 6 --comment "LocalDB support has been implemented! The server now automatically detects and handles LocalDB connections:
40 |
41 | \`\`\`bash
42 | MSSQL_SERVER=(localdb)\\MSSQLLocalDB
43 | MSSQL_DATABASE=your_database
44 | MSSQL_WINDOWS_AUTH=true # LocalDB typically uses Windows Auth
45 | \`\`\`
46 |
47 | The implementation automatically converts LocalDB format to pymssql-compatible format."
48 |
49 | # Issue #11: Azure SQL encryption
50 | echo "Closing issue #11: Azure SQL encryption"
51 | gh issue close 11 --comment "Azure SQL Database connections are now fully supported with automatic encryption handling!
52 |
53 | The server automatically:
54 | - Detects Azure SQL connections (by checking for \`.database.windows.net\`)
55 | - Enables encryption automatically for Azure SQL
56 | - Sets the required TDS version (7.4)
57 |
58 | For non-Azure connections, you can control encryption with:
59 | \`\`\`bash
60 | MSSQL_ENCRYPT=true # or false
61 | \`\`\`
62 |
63 | See the README for Azure SQL configuration examples."
64 |
65 | # Issue #4: Docker support
66 | echo "Closing issue #4: Docker support"
67 | gh issue close 4 --comment "Docker support has been added! The repository now includes:
68 |
69 | - \`Dockerfile\` for containerizing the MCP server
70 | - \`docker-compose.yml\` with SQL Server 2019 for testing
71 | - Comprehensive \`Makefile\` with Docker commands
72 |
73 | Quick start:
74 | \`\`\`bash
75 | make docker-build
76 | make docker-up
77 | \`\`\`
78 |
79 | See the README for complete Docker documentation."
80 |
81 | # Issue #12: MSSQL_SERVER configuration
82 | echo "Closing issue #12: MSSQL_SERVER configuration"
83 | gh issue close 12 --comment "This issue has been resolved! The server now:
84 |
85 | 1. Properly reads the \`MSSQL_SERVER\` environment variable
86 | 2. Logs the server being used for debugging
87 | 3. Only defaults to \"localhost\" if MSSQL_SERVER is not set
88 |
89 | The fix includes enhanced logging to help debug connection issues. If you're still experiencing problems, please check that the environment variable is properly set in your configuration."
90 |
91 | echo "Done! Fixed issues have been closed."
```
--------------------------------------------------------------------------------
/run_tests.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python
2 | """Comprehensive test runner for MSSQL MCP Server."""
3 |
4 | import sys
5 | import subprocess
6 | import argparse
7 | from pathlib import Path
8 |
9 | def run_command(cmd, description):
10 | """Run a command and handle output."""
11 | print(f"\n{'='*60}")
12 | print(f"Running: {description}")
13 | print(f"Command: {' '.join(cmd)}")
14 | print('='*60)
15 |
16 | result = subprocess.run(cmd, capture_output=False)
17 | if result.returncode != 0:
18 | print(f"❌ {description} failed with return code {result.returncode}")
19 | return False
20 | print(f"✅ {description} passed")
21 | return True
22 |
23 | def main():
24 | parser = argparse.ArgumentParser(description="Run MSSQL MCP Server tests")
25 | parser.add_argument('--suite', choices=['all', 'unit', 'security', 'integration', 'performance', 'quality'],
26 | default='all', help='Test suite to run')
27 | parser.add_argument('--coverage', action='store_true', help='Generate coverage report')
28 | parser.add_argument('--parallel', action='store_true', help='Run tests in parallel')
29 | parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
30 |
31 | args = parser.parse_args()
32 |
33 | # Base pytest command
34 | pytest_cmd = ['pytest']
35 | if args.verbose:
36 | pytest_cmd.append('-v')
37 | if args.parallel:
38 | pytest_cmd.extend(['-n', 'auto'])
39 | if args.coverage:
40 | pytest_cmd.extend(['--cov=src/mssql_mcp_server', '--cov-report=html', '--cov-report=term'])
41 |
42 | success = True
43 |
44 | if args.suite in ['all', 'quality']:
45 | # Code quality checks
46 | print("\n🔍 Running code quality checks...")
47 |
48 | if not run_command(['black', '--check', 'src', 'tests'], "Black formatting check"):
49 | success = False
50 |
51 | if not run_command(['ruff', 'check', 'src', 'tests'], "Ruff linting"):
52 | success = False
53 |
54 | if not run_command(['mypy', 'src', '--ignore-missing-imports'], "MyPy type checking"):
55 | success = False
56 |
57 | if args.suite in ['all', 'unit']:
58 | # Unit tests
59 | print("\n🧪 Running unit tests...")
60 | cmd = pytest_cmd + ['tests/test_config.py', 'tests/test_server.py']
61 | if not run_command(cmd, "Unit tests"):
62 | success = False
63 |
64 | if args.suite in ['all', 'security']:
65 | # Security tests
66 | print("\n🔒 Running security tests...")
67 | cmd = pytest_cmd + ['tests/test_security.py']
68 | if not run_command(cmd, "Security tests"):
69 | success = False
70 |
71 | # Run security scanning
72 | print("\n🔍 Running security scans...")
73 | if not run_command(['safety', 'check'], "Safety dependency check"):
74 | print("⚠️ Security vulnerabilities found in dependencies")
75 |
76 | if not run_command(['bandit', '-r', 'src', '-f', 'json', '-o', 'bandit-report.json'],
77 | "Bandit security scan"):
78 | print("⚠️ Security issues found in code")
79 |
80 | if args.suite in ['all', 'integration']:
81 | # Integration tests
82 | print("\n🔗 Running integration tests...")
83 | cmd = pytest_cmd + ['tests/test_integration.py', 'tests/test_error_handling.py']
84 | if not run_command(cmd, "Integration tests"):
85 | success = False
86 |
87 | if args.suite in ['all', 'performance']:
88 | # Performance tests
89 | print("\n⚡ Running performance tests...")
90 | cmd = pytest_cmd + ['tests/test_performance.py', '-s']
91 | if not run_command(cmd, "Performance tests"):
92 | success = False
93 |
94 | # Summary
95 | print(f"\n{'='*60}")
96 | if success:
97 | print("✅ All tests passed!")
98 | if args.coverage:
99 | print("📊 Coverage report generated in htmlcov/")
100 | else:
101 | print("❌ Some tests failed. Please review the output above.")
102 | sys.exit(1)
103 |
104 | if __name__ == "__main__":
105 | main()
```
--------------------------------------------------------------------------------
/tests/test_config.py:
--------------------------------------------------------------------------------
```python
1 | """Test database configuration and environment variable handling."""
2 | import pytest
3 | import os
4 | from unittest.mock import patch
5 | from mssql_mcp_server.server import get_db_config, validate_table_name
6 |
7 |
8 | class TestDatabaseConfiguration:
9 | """Test database configuration from environment variables."""
10 |
11 | def test_default_configuration(self):
12 | """Test default configuration values."""
13 | with patch.dict(os.environ, {
14 | 'MSSQL_USER': 'testuser',
15 | 'MSSQL_PASSWORD': 'testpass',
16 | 'MSSQL_DATABASE': 'testdb'
17 | }, clear=True):
18 | config = get_db_config()
19 | assert config['server'] == 'localhost'
20 | assert config['user'] == 'testuser'
21 | assert config['password'] == 'testpass'
22 | assert config['database'] == 'testdb'
23 | assert 'port' not in config
24 |
25 | def test_custom_server_and_port(self):
26 | """Test custom server and port configuration."""
27 | with patch.dict(os.environ, {
28 | 'MSSQL_SERVER': 'custom-server.com',
29 | 'MSSQL_PORT': '1433',
30 | 'MSSQL_USER': 'testuser',
31 | 'MSSQL_PASSWORD': 'testpass',
32 | 'MSSQL_DATABASE': 'testdb'
33 | }):
34 | config = get_db_config()
35 | assert config['server'] == 'custom-server.com'
36 | assert config['port'] == 1433
37 |
38 | def test_invalid_port(self):
39 | """Test invalid port handling."""
40 | with patch.dict(os.environ, {
41 | 'MSSQL_PORT': 'invalid',
42 | 'MSSQL_USER': 'testuser',
43 | 'MSSQL_PASSWORD': 'testpass',
44 | 'MSSQL_DATABASE': 'testdb'
45 | }):
46 | config = get_db_config()
47 | assert 'port' not in config # Invalid port should be ignored
48 |
49 | def test_azure_sql_configuration(self):
50 | """Test Azure SQL automatic encryption configuration."""
51 | with patch.dict(os.environ, {
52 | 'MSSQL_SERVER': 'myserver.database.windows.net',
53 | 'MSSQL_USER': 'testuser',
54 | 'MSSQL_PASSWORD': 'testpass',
55 | 'MSSQL_DATABASE': 'testdb'
56 | }):
57 | config = get_db_config()
58 | assert config['encrypt'] == True
59 | assert config['tds_version'] == '7.4'
60 |
61 | def test_localdb_configuration(self):
62 | """Test LocalDB connection string conversion."""
63 | with patch.dict(os.environ, {
64 | 'MSSQL_SERVER': '(localdb)\\MSSQLLocalDB',
65 | 'MSSQL_DATABASE': 'testdb',
66 | 'MSSQL_WINDOWS_AUTH': 'true'
67 | }):
68 | config = get_db_config()
69 | assert config['server'] == '.\\MSSQLLocalDB'
70 | assert 'user' not in config
71 | assert 'password' not in config
72 |
73 | def test_windows_authentication(self):
74 | """Test Windows authentication configuration."""
75 | with patch.dict(os.environ, {
76 | 'MSSQL_SERVER': 'localhost',
77 | 'MSSQL_DATABASE': 'testdb',
78 | 'MSSQL_WINDOWS_AUTH': 'true'
79 | }):
80 | config = get_db_config()
81 | assert 'user' not in config
82 | assert 'password' not in config
83 |
84 | def test_missing_required_config_sql_auth(self):
85 | """Test missing required configuration for SQL authentication."""
86 | with patch.dict(os.environ, {
87 | 'MSSQL_SERVER': 'localhost'
88 | }, clear=True):
89 | with pytest.raises(ValueError, match="Missing required database configuration"):
90 | get_db_config()
91 |
92 | def test_missing_database_windows_auth(self):
93 | """Test missing database for Windows authentication."""
94 | with patch.dict(os.environ, {
95 | 'MSSQL_WINDOWS_AUTH': 'true'
96 | }, clear=True):
97 | with pytest.raises(ValueError, match="Missing required database configuration"):
98 | get_db_config()
99 |
100 | def test_encryption_settings(self):
101 | """Test various encryption settings."""
102 | # Non-Azure with encryption
103 | with patch.dict(os.environ, {
104 | 'MSSQL_SERVER': 'localhost',
105 | 'MSSQL_ENCRYPT': 'true',
106 | 'MSSQL_USER': 'testuser',
107 | 'MSSQL_PASSWORD': 'testpass',
108 | 'MSSQL_DATABASE': 'testdb'
109 | }):
110 | config = get_db_config()
111 | assert config['encrypt'] == True
112 |
113 | # Non-Azure without encryption (default)
114 | with patch.dict(os.environ, {
115 | 'MSSQL_SERVER': 'localhost',
116 | 'MSSQL_USER': 'testuser',
117 | 'MSSQL_PASSWORD': 'testpass',
118 | 'MSSQL_DATABASE': 'testdb'
119 | }):
120 | config = get_db_config()
121 | assert config['encrypt'] == False
122 |
123 |
124 | class TestTableNameValidation:
125 | """Test SQL table name validation and escaping."""
126 |
127 | def test_valid_table_names(self):
128 | """Test validation of valid table names."""
129 | valid_names = [
130 | 'users',
131 | 'UserAccounts',
132 | 'user_accounts',
133 | 'table123',
134 | 'dbo.users',
135 | 'schema_name.table_name'
136 | ]
137 |
138 | for name in valid_names:
139 | escaped = validate_table_name(name)
140 | assert escaped is not None
141 | assert '[' in escaped and ']' in escaped
142 |
143 | def test_invalid_table_names(self):
144 | """Test rejection of invalid table names."""
145 | invalid_names = [
146 | 'users; DROP TABLE users', # SQL injection
147 | 'users OR 1=1', # SQL injection
148 | 'users--', # SQL comment
149 | 'users/*comment*/', # SQL comment
150 | 'users\'', # Quote
151 | 'users"', # Double quote
152 | 'schema.name.table', # Too many dots
153 | 'user@table', # Invalid character
154 | 'user#table', # Invalid character
155 | '', # Empty
156 | '.', # Just dot
157 | '..', # Double dot
158 | ]
159 |
160 | for name in invalid_names:
161 | with pytest.raises(ValueError, match="Invalid table name"):
162 | validate_table_name(name)
163 |
164 | def test_table_name_escaping(self):
165 | """Test proper escaping of table names."""
166 | assert validate_table_name('users') == '[users]'
167 | assert validate_table_name('dbo.users') == '[dbo].[users]'
168 | assert validate_table_name('my_table_123') == '[my_table_123]'
```
--------------------------------------------------------------------------------
/tests/test_security.py:
--------------------------------------------------------------------------------
```python
1 | """Security tests for SQL injection prevention and safe query handling."""
2 | import pytest
3 | from unittest.mock import Mock, patch, AsyncMock
4 | from mssql_mcp_server.server import validate_table_name, read_resource, call_tool
5 | from pydantic import AnyUrl
6 | from mcp.types import TextContent
7 |
8 |
9 | class TestSQLInjectionPrevention:
10 | """Test SQL injection prevention measures."""
11 |
12 | @pytest.mark.asyncio
13 | async def test_sql_injection_in_table_names(self):
14 | """Test that SQL injection attempts in table names are blocked."""
15 | malicious_uris = [
16 | "mssql://users; DROP TABLE users--/data",
17 | "mssql://users' OR '1'='1/data",
18 | "mssql://users/**/UNION/**/SELECT/**/password/data",
19 | "mssql://users%20OR%201=1/data",
20 | ]
21 |
22 | with patch.dict('os.environ', {
23 | 'MSSQL_USER': 'test',
24 | 'MSSQL_PASSWORD': 'test',
25 | 'MSSQL_DATABASE': 'test'
26 | }):
27 | for uri in malicious_uris:
28 | with pytest.raises((ValueError, RuntimeError)):
29 | await read_resource(AnyUrl(uri))
30 |
31 | @pytest.mark.asyncio
32 | async def test_safe_query_execution(self):
33 | """Test that only safe queries are executed."""
34 | # Mock the database connection
35 | mock_cursor = Mock()
36 | mock_conn = Mock()
37 | mock_conn.cursor.return_value = mock_cursor
38 |
39 | with patch('pymssql.connect', return_value=mock_conn):
40 | with patch.dict('os.environ', {
41 | 'MSSQL_USER': 'test',
42 | 'MSSQL_PASSWORD': 'test',
43 | 'MSSQL_DATABASE': 'test'
44 | }):
45 | # Test safe table read
46 | uri = AnyUrl("mssql://users/data")
47 | mock_cursor.description = [('id',), ('name',)]
48 | mock_cursor.fetchall.return_value = [(1, 'John'), (2, 'Jane')]
49 |
50 | result = await read_resource(uri)
51 |
52 | # Verify the query was escaped properly
53 | executed_query = mock_cursor.execute.call_args[0][0]
54 | assert '[users]' in executed_query
55 | assert 'SELECT TOP 100 * FROM [users]' == executed_query
56 |
57 | def test_parameterized_queries(self):
58 | """Ensure queries use parameters where user input is involved."""
59 | # This is a design consideration test
60 | # The current implementation doesn't use parameterized queries for table names
61 | # because table names can't be parameterized in SQL
62 | # Instead, we validate and escape them
63 | pass
64 |
65 | @pytest.mark.asyncio
66 | async def test_query_result_sanitization(self):
67 | """Test that query results don't expose sensitive information."""
68 | mock_cursor = Mock()
69 | mock_conn = Mock()
70 | mock_conn.cursor.return_value = mock_cursor
71 |
72 | with patch('pymssql.connect', return_value=mock_conn):
73 | with patch.dict('os.environ', {
74 | 'MSSQL_USER': 'test',
75 | 'MSSQL_PASSWORD': 'test',
76 | 'MSSQL_DATABASE': 'test'
77 | }):
78 | # Test that passwords or sensitive data aren't exposed in errors
79 | mock_cursor.execute.side_effect = Exception("Login failed for user 'sa' with password 'secret123'")
80 |
81 | result = await call_tool("execute_sql", {"query": "SELECT * FROM users"})
82 |
83 | # Verify sensitive info is not in the error message
84 | assert isinstance(result, list)
85 | assert len(result) == 1
86 | assert isinstance(result[0], TextContent)
87 | assert 'secret123' not in result[0].text
88 | assert 'Error executing query' in result[0].text
89 |
90 |
91 | class TestInputValidation:
92 | """Test input validation for all user inputs."""
93 |
94 | @pytest.mark.asyncio
95 | async def test_tool_argument_validation(self):
96 | """Test that tool arguments are properly validated."""
97 | # Test with various invalid inputs
98 | invalid_inputs = [
99 | {}, # Empty
100 | {"query": ""}, # Empty query
101 | {"query": None}, # None query
102 | {"query": {"$ne": None}}, # NoSQL injection attempt
103 | ]
104 |
105 | with patch.dict('os.environ', {
106 | 'MSSQL_USER': 'test',
107 | 'MSSQL_PASSWORD': 'test',
108 | 'MSSQL_DATABASE': 'test'
109 | }):
110 | for invalid_input in invalid_inputs:
111 | with pytest.raises(ValueError):
112 | await call_tool("execute_sql", invalid_input)
113 |
114 | def test_environment_variable_validation(self):
115 | """Test that environment variables are validated."""
116 | # Test with potentially dangerous environment values
117 | dangerous_values = {
118 | 'MSSQL_SERVER': 'localhost; exec xp_cmdshell "whoami"',
119 | 'MSSQL_DATABASE': 'test; DROP DATABASE test',
120 | 'MSSQL_USER': 'admin\'--',
121 | }
122 |
123 | with patch.dict('os.environ', dangerous_values):
124 | # The connection should fail safely without executing malicious code
125 | # This tests that pymssql properly handles these values
126 | pass
127 |
128 |
129 | class TestResourceAccessControl:
130 | """Test resource access control and permissions."""
131 |
132 | @pytest.mark.asyncio
133 | async def test_system_table_access_restriction(self):
134 | """Test that system tables are not exposed as resources."""
135 | mock_cursor = Mock()
136 | mock_conn = Mock()
137 | mock_conn.cursor.return_value = mock_cursor
138 |
139 | # Simulate database returning both user and system tables
140 | mock_cursor.fetchall.return_value = [
141 | ('users',),
142 | ('sys.objects',), # System table
143 | ('INFORMATION_SCHEMA.TABLES',), # System view
144 | ('products',),
145 | ]
146 |
147 | with patch('pymssql.connect', return_value=mock_conn):
148 | with patch.dict('os.environ', {
149 | 'MSSQL_USER': 'test',
150 | 'MSSQL_PASSWORD': 'test',
151 | 'MSSQL_DATABASE': 'test'
152 | }):
153 | from mssql_mcp_server.server import list_resources
154 | resources = await list_resources()
155 |
156 | # Verify system tables are filtered out (if implemented)
157 | # Currently the query uses INFORMATION_SCHEMA which should only return user tables
158 | resource_names = [r.name for r in resources]
159 | assert len(resources) == 4 # All tables are returned currently
160 |
161 | @pytest.mark.asyncio
162 | async def test_query_permissions(self):
163 | """Test that dangerous queries are handled safely."""
164 | dangerous_queries = [
165 | "DROP TABLE users",
166 | "CREATE LOGIN hacker WITH PASSWORD = 'password'",
167 | "EXEC xp_cmdshell 'dir'",
168 | "ALTER SERVER ROLE sysadmin ADD MEMBER hacker",
169 | ]
170 |
171 | mock_cursor = Mock()
172 | mock_conn = Mock()
173 | mock_conn.cursor.return_value = mock_cursor
174 |
175 | with patch('pymssql.connect', return_value=mock_conn):
176 | with patch.dict('os.environ', {
177 | 'MSSQL_USER': 'test',
178 | 'MSSQL_PASSWORD': 'test',
179 | 'MSSQL_DATABASE': 'test'
180 | }):
181 | for query in dangerous_queries:
182 | # The queries will be executed (current implementation doesn't block them)
183 | # but we ensure errors are handled gracefully
184 | mock_cursor.execute.side_effect = Exception("Permission denied")
185 | result = await call_tool("execute_sql", {"query": query})
186 |
187 | assert len(result) == 1
188 | assert "Error executing query" in result[0].text
```
--------------------------------------------------------------------------------
/tests/test_integration.py:
--------------------------------------------------------------------------------
```python
1 | """Integration tests for MCP protocol communication and end-to-end functionality."""
2 | import pytest
3 | import asyncio
4 | import json
5 | from unittest.mock import Mock, patch, AsyncMock
6 | from mcp.server.stdio import stdio_server
7 | from mcp.types import TextContent, Resource, Tool
8 | from mssql_mcp_server.server import app
9 |
10 |
11 | class TestMCPProtocolIntegration:
12 | """Test MCP protocol integration and communication."""
13 |
14 | @pytest.mark.asyncio
15 | async def test_server_initialization_options(self):
16 | """Test server initialization with proper options."""
17 | init_options = app.create_initialization_options()
18 |
19 | assert init_options.server_name == "mssql_mcp_server"
20 | assert init_options.server_version is not None
21 | assert hasattr(init_options, 'capabilities')
22 |
23 | @pytest.mark.asyncio
24 | async def test_full_mcp_lifecycle(self):
25 | """Test complete MCP server lifecycle from init to shutdown."""
26 | # Mock the stdio streams
27 | mock_read_stream = AsyncMock()
28 | mock_write_stream = AsyncMock()
29 |
30 | # Mock database connection
31 | mock_conn = Mock()
32 | mock_cursor = Mock()
33 | mock_conn.cursor.return_value = mock_cursor
34 |
35 | with patch('pymssql.connect', return_value=mock_conn):
36 | with patch.dict('os.environ', {
37 | 'MSSQL_USER': 'test',
38 | 'MSSQL_PASSWORD': 'test',
39 | 'MSSQL_DATABASE': 'testdb'
40 | }):
41 | # Test resource listing
42 | mock_cursor.fetchall.return_value = [('users',), ('products',)]
43 | resources = await app.list_resources()
44 |
45 | assert len(resources) == 2
46 | assert all(isinstance(r, Resource) for r in resources)
47 | assert resources[0].name == "Table: users"
48 | assert resources[1].name == "Table: products"
49 |
50 | # Test tool listing
51 | tools = await app.list_tools()
52 | assert len(tools) == 1
53 | assert tools[0].name == "execute_sql"
54 |
55 | # Test tool execution
56 | mock_cursor.description = [('count',)]
57 | mock_cursor.fetchall.return_value = [(42,)]
58 | result = await app.call_tool("execute_sql", {"query": "SELECT COUNT(*) FROM users"})
59 |
60 | assert len(result) == 1
61 | assert isinstance(result[0], TextContent)
62 | assert "42" in result[0].text
63 |
64 | @pytest.mark.asyncio
65 | async def test_concurrent_requests(self):
66 | """Test handling of concurrent MCP requests."""
67 | mock_conn = Mock()
68 | mock_cursor = Mock()
69 | mock_conn.cursor.return_value = mock_cursor
70 |
71 | with patch('pymssql.connect', return_value=mock_conn):
72 | with patch.dict('os.environ', {
73 | 'MSSQL_USER': 'test',
74 | 'MSSQL_PASSWORD': 'test',
75 | 'MSSQL_DATABASE': 'testdb'
76 | }):
77 | # Simulate concurrent resource listing
78 | mock_cursor.fetchall.return_value = [('table1',), ('table2',)]
79 |
80 | # Run multiple concurrent requests
81 | tasks = [app.list_resources() for _ in range(10)]
82 | results = await asyncio.gather(*tasks)
83 |
84 | # All should succeed
85 | assert len(results) == 10
86 | for result in results:
87 | assert len(result) == 2
88 |
89 | @pytest.mark.asyncio
90 | async def test_error_propagation(self):
91 | """Test that errors are properly propagated through MCP protocol."""
92 | with patch.dict('os.environ', {}, clear=True):
93 | # Missing configuration should raise error
94 | with pytest.raises(ValueError, match="Missing required database configuration"):
95 | await app.list_resources()
96 |
97 |
98 | class TestDatabaseIntegration:
99 | """Test actual database integration scenarios."""
100 |
101 | @pytest.mark.asyncio
102 | async def test_connection_pooling(self):
103 | """Test that connections are properly managed and pooled."""
104 | call_count = 0
105 |
106 | def mock_connect(**kwargs):
107 | nonlocal call_count
108 | call_count += 1
109 | mock_conn = Mock()
110 | mock_cursor = Mock()
111 | mock_cursor.fetchall.return_value = []
112 | mock_conn.cursor.return_value = mock_cursor
113 | return mock_conn
114 |
115 | with patch('pymssql.connect', side_effect=mock_connect):
116 | with patch.dict('os.environ', {
117 | 'MSSQL_USER': 'test',
118 | 'MSSQL_PASSWORD': 'test',
119 | 'MSSQL_DATABASE': 'testdb'
120 | }):
121 | # Multiple operations should create multiple connections
122 | # (current implementation doesn't pool)
123 | for _ in range(5):
124 | await app.list_resources()
125 |
126 | assert call_count == 5 # One connection per operation
127 |
128 | @pytest.mark.asyncio
129 | async def test_transaction_handling(self):
130 | """Test proper transaction handling for write operations."""
131 | mock_conn = Mock()
132 | mock_cursor = Mock()
133 | mock_conn.cursor.return_value = mock_cursor
134 | mock_cursor.rowcount = 1
135 |
136 | with patch('pymssql.connect', return_value=mock_conn):
137 | with patch.dict('os.environ', {
138 | 'MSSQL_USER': 'test',
139 | 'MSSQL_PASSWORD': 'test',
140 | 'MSSQL_DATABASE': 'testdb'
141 | }):
142 | # Test INSERT operation
143 | result = await app.call_tool("execute_sql", {
144 | "query": "INSERT INTO users (name) VALUES ('test')"
145 | })
146 |
147 | # Verify commit was called
148 | mock_conn.commit.assert_called_once()
149 | assert "Rows affected: 1" in result[0].text
150 |
151 | @pytest.mark.asyncio
152 | async def test_connection_cleanup(self):
153 | """Test that connections are properly cleaned up."""
154 | mock_conn = Mock()
155 | mock_cursor = Mock()
156 | mock_conn.cursor.return_value = mock_cursor
157 |
158 | with patch('pymssql.connect', return_value=mock_conn):
159 | with patch.dict('os.environ', {
160 | 'MSSQL_USER': 'test',
161 | 'MSSQL_PASSWORD': 'test',
162 | 'MSSQL_DATABASE': 'testdb'
163 | }):
164 | # Even if operation fails, connection should be closed
165 | mock_cursor.execute.side_effect = Exception("Query failed")
166 |
167 | try:
168 | await app.call_tool("execute_sql", {"query": "SELECT * FROM users"})
169 | except:
170 | pass
171 |
172 | # Connection should still be closed
173 | # (Note: current implementation may not guarantee this)
174 |
175 |
176 | class TestEdgeCases:
177 | """Test edge cases and boundary conditions."""
178 |
179 | @pytest.mark.asyncio
180 | async def test_empty_table_list(self):
181 | """Test handling of database with no tables."""
182 | mock_conn = Mock()
183 | mock_cursor = Mock()
184 | mock_conn.cursor.return_value = mock_cursor
185 | mock_cursor.fetchall.return_value = []
186 |
187 | with patch('pymssql.connect', return_value=mock_conn):
188 | with patch.dict('os.environ', {
189 | 'MSSQL_USER': 'test',
190 | 'MSSQL_PASSWORD': 'test',
191 | 'MSSQL_DATABASE': 'testdb'
192 | }):
193 | resources = await app.list_resources()
194 | assert resources == []
195 |
196 | @pytest.mark.asyncio
197 | async def test_large_result_set(self):
198 | """Test handling of large query results."""
199 | mock_conn = Mock()
200 | mock_cursor = Mock()
201 | mock_conn.cursor.return_value = mock_cursor
202 |
203 | # Create large result set
204 | large_result = [(i, f'user_{i}', f'email_{i}@test.com') for i in range(10000)]
205 | mock_cursor.description = [('id',), ('name',), ('email',)]
206 | mock_cursor.fetchall.return_value = large_result
207 |
208 | with patch('pymssql.connect', return_value=mock_conn):
209 | with patch.dict('os.environ', {
210 | 'MSSQL_USER': 'test',
211 | 'MSSQL_PASSWORD': 'test',
212 | 'MSSQL_DATABASE': 'testdb'
213 | }):
214 | result = await app.call_tool("execute_sql", {
215 | "query": "SELECT * FROM users"
216 | })
217 |
218 | # Should handle large results gracefully
219 | assert len(result) == 1
220 | assert isinstance(result[0].text, str)
221 | assert len(result[0].text.split('\n')) == 10001 # Header + 10000 rows
222 |
223 | @pytest.mark.asyncio
224 | async def test_special_characters_in_data(self):
225 | """Test handling of special characters in query results."""
226 | mock_conn = Mock()
227 | mock_cursor = Mock()
228 | mock_conn.cursor.return_value = mock_cursor
229 |
230 | # Data with special characters
231 | mock_cursor.description = [('data',)]
232 | mock_cursor.fetchall.return_value = [
233 | ('Hello, "World"',),
234 | ('Line1\nLine2',),
235 | ('Tab\there',),
236 | ('NULL',),
237 | (None,),
238 | ]
239 |
240 | with patch('pymssql.connect', return_value=mock_conn):
241 | with patch.dict('os.environ', {
242 | 'MSSQL_USER': 'test',
243 | 'MSSQL_PASSWORD': 'test',
244 | 'MSSQL_DATABASE': 'testdb'
245 | }):
246 | result = await app.call_tool("execute_sql", {
247 | "query": "SELECT data FROM test_table"
248 | })
249 |
250 | # Should handle special characters properly
251 | assert len(result) == 1
252 | text = result[0].text
253 | assert 'Hello, "World"' in text
254 | assert 'None' in text # None should be converted to string
```
--------------------------------------------------------------------------------
/src/mssql_mcp_server/server.py:
--------------------------------------------------------------------------------
```python
1 | import asyncio
2 | import logging
3 | import os
4 | import re
5 | import pymssql
6 | from mcp.server import Server
7 | from mcp.types import Resource, Tool, TextContent
8 | from pydantic import AnyUrl
9 |
10 | # Configure logging
11 | logging.basicConfig(
12 | level=logging.INFO,
13 | format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
14 | )
15 | logger = logging.getLogger("mssql_mcp_server")
16 |
17 | def validate_table_name(table_name: str) -> str:
18 | """Validate and escape table name to prevent SQL injection."""
19 | # Allow only alphanumeric, underscore, and dot (for schema.table)
20 | if not re.match(r'^[a-zA-Z0-9_]+(\.[a-zA-Z0-9_]+)?$', table_name):
21 | raise ValueError(f"Invalid table name: {table_name}")
22 |
23 | # Split schema and table if present
24 | parts = table_name.split('.')
25 | if len(parts) == 2:
26 | # Escape both schema and table name
27 | return f"[{parts[0]}].[{parts[1]}]"
28 | else:
29 | # Just table name
30 | return f"[{table_name}]"
31 |
32 | def get_db_config():
33 | """Get database configuration from environment variables."""
34 | # Basic configuration
35 | server = os.getenv("MSSQL_SERVER", "localhost")
36 | logger.info(f"MSSQL_SERVER environment variable: {os.getenv('MSSQL_SERVER', 'NOT SET')}")
37 | logger.info(f"Using server: {server}")
38 |
39 | # Handle LocalDB connections (Issue #6)
40 | # LocalDB format: (localdb)\instancename
41 | if server.startswith("(localdb)\\"):
42 | # For LocalDB, pymssql needs special formatting
43 | # Convert (localdb)\MSSQLLocalDB to localhost\MSSQLLocalDB with dynamic port
44 | instance_name = server.replace("(localdb)\\", "")
45 | server = f".\\{instance_name}"
46 | logger.info(f"Detected LocalDB connection, converted to: {server}")
47 |
48 | config = {
49 | "server": server,
50 | "user": os.getenv("MSSQL_USER"),
51 | "password": os.getenv("MSSQL_PASSWORD"),
52 | "database": os.getenv("MSSQL_DATABASE"),
53 | "port": os.getenv("MSSQL_PORT", "1433"), # Default MSSQL port
54 | }
55 | # Port support (Issue #8)
56 | port = os.getenv("MSSQL_PORT")
57 | if port:
58 | try:
59 | config["port"] = int(port)
60 | except ValueError:
61 | logger.warning(f"Invalid MSSQL_PORT value: {port}. Using default port.")
62 |
63 | # Encryption settings for Azure SQL (Issue #11)
64 | # Check if we're connecting to Azure SQL
65 | if config["server"] and ".database.windows.net" in config["server"]:
66 | config["tds_version"] = "7.4" # Required for Azure SQL
67 | # Azure SQL requires encryption - use connection string format for pymssql 2.3+
68 | # This improves upon TDS-only approach by being more explicit
69 | if os.getenv("MSSQL_ENCRYPT", "true").lower() == "true":
70 | config["server"] += ";Encrypt=yes;TrustServerCertificate=no"
71 | else:
72 | # For non-Azure connections, respect the MSSQL_ENCRYPT setting
73 | # Use connection string format in addition to TDS version for better compatibility
74 | encrypt_str = os.getenv("MSSQL_ENCRYPT", "false")
75 | if encrypt_str.lower() == "true":
76 | config["tds_version"] = "7.4" # Keep existing TDS approach
77 | config["server"] += ";Encrypt=yes;TrustServerCertificate=yes" # Add explicit setting
78 |
79 | # Windows Authentication support (Issue #7)
80 | use_windows_auth = os.getenv("MSSQL_WINDOWS_AUTH", "false").lower() == "true"
81 |
82 | if use_windows_auth:
83 | # For Windows authentication, user and password are not required
84 | if not config["database"]:
85 | logger.error("MSSQL_DATABASE is required")
86 | raise ValueError("Missing required database configuration")
87 | # Remove user and password for Windows auth
88 | config.pop("user", None)
89 | config.pop("password", None)
90 | logger.info("Using Windows Authentication")
91 | else:
92 | # SQL Authentication - user and password are required
93 | if not all([config["user"], config["password"], config["database"]]):
94 | logger.error("Missing required database configuration. Please check environment variables:")
95 | logger.error("MSSQL_USER, MSSQL_PASSWORD, and MSSQL_DATABASE are required")
96 | raise ValueError("Missing required database configuration")
97 |
98 | return config
99 |
100 | def get_command():
101 | """Get the command to execute SQL queries."""
102 | return os.getenv("MSSQL_COMMAND", "execute_sql")
103 |
104 | def is_select_query(query: str) -> bool:
105 | """
106 | Check if a query is a SELECT statement, accounting for comments.
107 | Handles both single-line (--) and multi-line (/* */) SQL comments.
108 | """
109 | # Remove multi-line comments /* ... */
110 | query_cleaned = re.sub(r'/\*.*?\*/', '', query, flags=re.DOTALL)
111 |
112 | # Remove single-line comments -- ...
113 | lines = query_cleaned.split('\n')
114 | cleaned_lines = []
115 | for line in lines:
116 | # Find -- comment marker and remove everything after it
117 | comment_pos = line.find('--')
118 | if comment_pos != -1:
119 | line = line[:comment_pos]
120 | cleaned_lines.append(line)
121 |
122 | query_cleaned = '\n'.join(cleaned_lines)
123 |
124 | # Get the first non-empty word after stripping whitespace
125 | first_word = query_cleaned.strip().split()[0] if query_cleaned.strip() else ""
126 | return first_word.upper() == "SELECT"
127 |
128 | # Initialize server
129 | app = Server("mssql_mcp_server")
130 |
131 | @app.list_resources()
132 | async def list_resources() -> list[Resource]:
133 | """List SQL Server tables as resources."""
134 | config = get_db_config()
135 | try:
136 | conn = pymssql.connect(**config)
137 | cursor = conn.cursor()
138 | # Query to get user tables from the current database
139 | cursor.execute("""
140 | SELECT TABLE_NAME
141 | FROM INFORMATION_SCHEMA.TABLES
142 | WHERE TABLE_TYPE = 'BASE TABLE'
143 | """)
144 | tables = cursor.fetchall()
145 | logger.info(f"Found tables: {tables}")
146 |
147 | resources = []
148 | for table in tables:
149 | resources.append(
150 | Resource(
151 | uri=f"mssql://{table[0]}/data",
152 | name=f"Table: {table[0]}",
153 | mimeType="text/plain",
154 | description=f"Data in table: {table[0]}"
155 | )
156 | )
157 | cursor.close()
158 | conn.close()
159 | return resources
160 | except Exception as e:
161 | logger.error(f"Failed to list resources: {str(e)}")
162 | return []
163 |
164 | @app.read_resource()
165 | async def read_resource(uri: AnyUrl) -> str:
166 | """Read table contents."""
167 | config = get_db_config()
168 | uri_str = str(uri)
169 | logger.info(f"Reading resource: {uri_str}")
170 |
171 | if not uri_str.startswith("mssql://"):
172 | raise ValueError(f"Invalid URI scheme: {uri_str}")
173 |
174 | parts = uri_str[8:].split('/')
175 | table = parts[0]
176 |
177 | try:
178 | # Validate table name to prevent SQL injection
179 | safe_table = validate_table_name(table)
180 |
181 | conn = pymssql.connect(**config)
182 | cursor = conn.cursor()
183 | # Use TOP 100 for MSSQL (equivalent to LIMIT in MySQL)
184 | cursor.execute(f"SELECT TOP 100 * FROM {safe_table}")
185 | columns = [desc[0] for desc in cursor.description]
186 | rows = cursor.fetchall()
187 | result = [",".join(map(str, row)) for row in rows]
188 | cursor.close()
189 | conn.close()
190 | return "\n".join([",".join(columns)] + result)
191 |
192 | except Exception as e:
193 | logger.error(f"Database error reading resource {uri}: {str(e)}")
194 | raise RuntimeError(f"Database error: {str(e)}")
195 |
196 | @app.list_tools()
197 | async def list_tools() -> list[Tool]:
198 | """List available SQL Server tools."""
199 | command = get_command()
200 | logger.info("Listing tools...")
201 | return [
202 | Tool(
203 | name=command,
204 | description="Execute an SQL query on the SQL Server",
205 | inputSchema={
206 | "type": "object",
207 | "properties": {
208 | "query": {
209 | "type": "string",
210 | "description": "The SQL query to execute"
211 | }
212 | },
213 | "required": ["query"]
214 | }
215 | )
216 | ]
217 |
218 | @app.call_tool()
219 | async def call_tool(name: str, arguments: dict) -> list[TextContent]:
220 | """Execute SQL commands."""
221 | config = get_db_config()
222 | command = get_command()
223 | logger.info(f"Calling tool: {name} with arguments: {arguments}")
224 |
225 | if name != command:
226 | raise ValueError(f"Unknown tool: {name}")
227 |
228 | query = arguments.get("query")
229 | if not query:
230 | raise ValueError("Query is required")
231 |
232 | try:
233 | conn = pymssql.connect(**config)
234 | cursor = conn.cursor()
235 | cursor.execute(query)
236 |
237 | # Special handling for table listing
238 | if is_select_query(query) and "INFORMATION_SCHEMA.TABLES" in query.upper():
239 | tables = cursor.fetchall()
240 | result = ["Tables_in_" + config["database"]] # Header
241 | result.extend([table[0] for table in tables])
242 | cursor.close()
243 | conn.close()
244 | return [TextContent(type="text", text="\n".join(result))]
245 |
246 | # Regular SELECT queries
247 | elif is_select_query(query):
248 | columns = [desc[0] for desc in cursor.description]
249 | rows = cursor.fetchall()
250 | result = [",".join(map(str, row)) for row in rows]
251 | cursor.close()
252 | conn.close()
253 | return [TextContent(type="text", text="\n".join([",".join(columns)] + result))]
254 |
255 | # Non-SELECT queries
256 | else:
257 | conn.commit()
258 | affected_rows = cursor.rowcount
259 | cursor.close()
260 | conn.close()
261 | return [TextContent(type="text", text=f"Query executed successfully. Rows affected: {affected_rows}")]
262 |
263 | except Exception as e:
264 | logger.error(f"Error executing SQL '{query}': {e}")
265 | return [TextContent(type="text", text=f"Error executing query: {str(e)}")]
266 |
267 | async def main():
268 | """Main entry point to run the MCP server."""
269 | from mcp.server.stdio import stdio_server
270 |
271 | logger.info("Starting MSSQL MCP server...")
272 | config = get_db_config()
273 | # Log connection info without exposing sensitive data
274 | server_info = config['server']
275 | if 'port' in config:
276 | server_info += f":{config['port']}"
277 | user_info = config.get('user', 'Windows Auth')
278 | logger.info(f"Database config: {server_info}/{config['database']} as {user_info}")
279 |
280 | async with stdio_server() as (read_stream, write_stream):
281 | try:
282 | await app.run(
283 | read_stream,
284 | write_stream,
285 | app.create_initialization_options()
286 | )
287 | except Exception as e:
288 | logger.error(f"Server error: {str(e)}", exc_info=True)
289 | raise
290 |
291 | if __name__ == "__main__":
292 | asyncio.run(main())
293 |
```
--------------------------------------------------------------------------------
/tests/test_error_handling.py:
--------------------------------------------------------------------------------
```python
1 | """Test error handling, resilience, and recovery scenarios."""
2 | import pytest
3 | import asyncio
4 | from unittest.mock import Mock, patch, PropertyMock
5 | from mssql_mcp_server.server import app, get_db_config
6 | import pymssql
7 |
8 |
9 | class TestConnectionErrors:
10 | """Test various connection error scenarios."""
11 |
12 | @pytest.mark.asyncio
13 | async def test_connection_timeout(self):
14 | """Test handling of connection timeouts."""
15 | with patch('pymssql.connect') as mock_connect:
16 | mock_connect.side_effect = pymssql.OperationalError("Connection timeout")
17 |
18 | with patch.dict('os.environ', {
19 | 'MSSQL_USER': 'test',
20 | 'MSSQL_PASSWORD': 'test',
21 | 'MSSQL_DATABASE': 'testdb'
22 | }):
23 | resources = await app.list_resources()
24 | assert resources == [] # Should return empty list on connection failure
25 |
26 | @pytest.mark.asyncio
27 | async def test_authentication_failure(self):
28 | """Test handling of authentication failures."""
29 | with patch('pymssql.connect') as mock_connect:
30 | mock_connect.side_effect = pymssql.OperationalError("Login failed for user 'test'")
31 |
32 | with patch.dict('os.environ', {
33 | 'MSSQL_USER': 'test',
34 | 'MSSQL_PASSWORD': 'wrong_password',
35 | 'MSSQL_DATABASE': 'testdb'
36 | }):
37 | resources = await app.list_resources()
38 | assert resources == []
39 |
40 | @pytest.mark.asyncio
41 | async def test_database_not_found(self):
42 | """Test handling when database doesn't exist."""
43 | with patch('pymssql.connect') as mock_connect:
44 | mock_connect.side_effect = pymssql.OperationalError("Database 'nonexistent' does not exist")
45 |
46 | with patch.dict('os.environ', {
47 | 'MSSQL_USER': 'test',
48 | 'MSSQL_PASSWORD': 'test',
49 | 'MSSQL_DATABASE': 'nonexistent'
50 | }):
51 | resources = await app.list_resources()
52 | assert resources == []
53 |
54 | @pytest.mark.asyncio
55 | async def test_network_disconnection(self):
56 | """Test handling of network disconnections during query."""
57 | mock_conn = Mock()
58 | mock_cursor = Mock()
59 | mock_conn.cursor.return_value = mock_cursor
60 |
61 | # Simulate network error during query execution
62 | mock_cursor.execute.side_effect = pymssql.OperationalError("Network error")
63 |
64 | with patch('pymssql.connect', return_value=mock_conn):
65 | with patch.dict('os.environ', {
66 | 'MSSQL_USER': 'test',
67 | 'MSSQL_PASSWORD': 'test',
68 | 'MSSQL_DATABASE': 'testdb'
69 | }):
70 | result = await app.call_tool("execute_sql", {"query": "SELECT * FROM users"})
71 | assert "Error executing query" in result[0].text
72 |
73 | # Ensure cleanup attempted
74 | mock_cursor.close.assert_called()
75 | mock_conn.close.assert_called()
76 |
77 |
78 | class TestQueryErrors:
79 | """Test various query execution error scenarios."""
80 |
81 | @pytest.mark.asyncio
82 | async def test_syntax_error(self):
83 | """Test handling of SQL syntax errors."""
84 | mock_conn = Mock()
85 | mock_cursor = Mock()
86 | mock_conn.cursor.return_value = mock_cursor
87 |
88 | mock_cursor.execute.side_effect = pymssql.ProgrammingError("Incorrect syntax near 'SELCT'")
89 |
90 | with patch('pymssql.connect', return_value=mock_conn):
91 | with patch.dict('os.environ', {
92 | 'MSSQL_USER': 'test',
93 | 'MSSQL_PASSWORD': 'test',
94 | 'MSSQL_DATABASE': 'testdb'
95 | }):
96 | result = await app.call_tool("execute_sql", {"query": "SELCT * FROM users"})
97 | assert "Error executing query" in result[0].text
98 | assert len(result) == 1
99 |
100 | @pytest.mark.asyncio
101 | async def test_permission_denied(self):
102 | """Test handling of permission denied errors."""
103 | mock_conn = Mock()
104 | mock_cursor = Mock()
105 | mock_conn.cursor.return_value = mock_cursor
106 |
107 | mock_cursor.execute.side_effect = pymssql.DatabaseError("The SELECT permission was denied")
108 |
109 | with patch('pymssql.connect', return_value=mock_conn):
110 | with patch.dict('os.environ', {
111 | 'MSSQL_USER': 'test',
112 | 'MSSQL_PASSWORD': 'test',
113 | 'MSSQL_DATABASE': 'testdb'
114 | }):
115 | result = await app.call_tool("execute_sql", {"query": "SELECT * FROM sensitive_table"})
116 | assert "Error executing query" in result[0].text
117 |
118 | @pytest.mark.asyncio
119 | async def test_deadlock_handling(self):
120 | """Test handling of database deadlocks."""
121 | mock_conn = Mock()
122 | mock_cursor = Mock()
123 | mock_conn.cursor.return_value = mock_cursor
124 |
125 | mock_cursor.execute.side_effect = pymssql.OperationalError("Transaction was deadlocked")
126 |
127 | with patch('pymssql.connect', return_value=mock_conn):
128 | with patch.dict('os.environ', {
129 | 'MSSQL_USER': 'test',
130 | 'MSSQL_PASSWORD': 'test',
131 | 'MSSQL_DATABASE': 'testdb'
132 | }):
133 | result = await app.call_tool("execute_sql", {
134 | "query": "UPDATE users SET status = 'active'"
135 | })
136 | assert "Error executing query" in result[0].text
137 |
138 |
139 | class TestResourceErrors:
140 | """Test resource access error scenarios."""
141 |
142 | @pytest.mark.asyncio
143 | async def test_invalid_uri_format(self):
144 | """Test handling of invalid resource URIs."""
145 | from pydantic import AnyUrl
146 |
147 | with patch.dict('os.environ', {
148 | 'MSSQL_USER': 'test',
149 | 'MSSQL_PASSWORD': 'test',
150 | 'MSSQL_DATABASE': 'testdb'
151 | }):
152 | # Test invalid URI scheme
153 | with pytest.raises(ValueError, match="Invalid URI scheme"):
154 | await app.read_resource(AnyUrl("http://invalid/uri"))
155 |
156 | @pytest.mark.asyncio
157 | async def test_table_not_found(self):
158 | """Test handling when requested table doesn't exist."""
159 | mock_conn = Mock()
160 | mock_cursor = Mock()
161 | mock_conn.cursor.return_value = mock_cursor
162 |
163 | mock_cursor.execute.side_effect = pymssql.ProgrammingError("Invalid object name 'nonexistent'")
164 |
165 | with patch('pymssql.connect', return_value=mock_conn):
166 | with patch.dict('os.environ', {
167 | 'MSSQL_USER': 'test',
168 | 'MSSQL_PASSWORD': 'test',
169 | 'MSSQL_DATABASE': 'testdb'
170 | }):
171 | from pydantic import AnyUrl
172 | with pytest.raises(RuntimeError, match="Database error"):
173 | await app.read_resource(AnyUrl("mssql://nonexistent/data"))
174 |
175 |
176 | class TestRecoveryScenarios:
177 | """Test recovery and resilience scenarios."""
178 |
179 | @pytest.mark.asyncio
180 | async def test_connection_retry_logic(self):
181 | """Test that connection failures don't crash the server."""
182 | attempt_count = 0
183 |
184 | def mock_connect(**kwargs):
185 | nonlocal attempt_count
186 | attempt_count += 1
187 | if attempt_count < 3:
188 | raise pymssql.OperationalError("Connection failed")
189 | # Success on third attempt
190 | mock_conn = Mock()
191 | mock_cursor = Mock()
192 | mock_cursor.fetchall.return_value = [('users',)]
193 | mock_conn.cursor.return_value = mock_cursor
194 | return mock_conn
195 |
196 | with patch('pymssql.connect', side_effect=mock_connect):
197 | with patch.dict('os.environ', {
198 | 'MSSQL_USER': 'test',
199 | 'MSSQL_PASSWORD': 'test',
200 | 'MSSQL_DATABASE': 'testdb'
201 | }):
202 | # First two calls should fail
203 | resources1 = await app.list_resources()
204 | assert resources1 == []
205 |
206 | resources2 = await app.list_resources()
207 | assert resources2 == []
208 |
209 | # Third call should succeed
210 | resources3 = await app.list_resources()
211 | assert len(resources3) == 1
212 |
213 | @pytest.mark.asyncio
214 | async def test_partial_result_handling(self):
215 | """Test handling when cursor fails mid-iteration."""
216 | mock_conn = Mock()
217 | mock_cursor = Mock()
218 | mock_conn.cursor.return_value = mock_cursor
219 |
220 | # Simulate cursor failing during iteration
221 | def failing_fetchall():
222 | raise pymssql.OperationalError("Connection lost during query")
223 |
224 | mock_cursor.execute.return_value = None
225 | mock_cursor.fetchall = failing_fetchall
226 | mock_cursor.description = [('id',), ('name',)]
227 |
228 | with patch('pymssql.connect', return_value=mock_conn):
229 | with patch.dict('os.environ', {
230 | 'MSSQL_USER': 'test',
231 | 'MSSQL_PASSWORD': 'test',
232 | 'MSSQL_DATABASE': 'testdb'
233 | }):
234 | # Should handle the error gracefully
235 | from pydantic import AnyUrl
236 | with pytest.raises(RuntimeError):
237 | await app.read_resource(AnyUrl("mssql://users/data"))
238 |
239 | @pytest.mark.asyncio
240 | async def test_long_running_query_handling(self):
241 | """Test handling of long-running queries."""
242 | mock_conn = Mock()
243 | mock_cursor = Mock()
244 | mock_conn.cursor.return_value = mock_cursor
245 |
246 | async def slow_execute(query):
247 | await asyncio.sleep(0.1) # Simulate slow query
248 | return None
249 |
250 | mock_cursor.execute = Mock(side_effect=lambda q: None)
251 | mock_cursor.fetchall.return_value = [(1,)]
252 | mock_cursor.description = [('count',)]
253 |
254 | with patch('pymssql.connect', return_value=mock_conn):
255 | with patch.dict('os.environ', {
256 | 'MSSQL_USER': 'test',
257 | 'MSSQL_PASSWORD': 'test',
258 | 'MSSQL_DATABASE': 'testdb'
259 | }):
260 | # Should complete without timeout
261 | result = await app.call_tool("execute_sql", {
262 | "query": "SELECT COUNT(*) FROM large_table"
263 | })
264 | assert "1" in result[0].text
265 |
266 |
267 | class TestMemoryAndResourceManagement:
268 | """Test memory and resource leak prevention."""
269 |
270 | @pytest.mark.asyncio
271 | async def test_cursor_cleanup_on_error(self):
272 | """Ensure cursors are closed even on errors."""
273 | mock_conn = Mock()
274 | mock_cursor = Mock()
275 | mock_conn.cursor.return_value = mock_cursor
276 |
277 | mock_cursor.execute.side_effect = Exception("Unexpected error")
278 |
279 | with patch('pymssql.connect', return_value=mock_conn):
280 | with patch.dict('os.environ', {
281 | 'MSSQL_USER': 'test',
282 | 'MSSQL_PASSWORD': 'test',
283 | 'MSSQL_DATABASE': 'testdb'
284 | }):
285 | result = await app.call_tool("execute_sql", {"query": "SELECT * FROM users"})
286 |
287 | # Cursor should be closed despite error
288 | mock_cursor.close.assert_called()
289 | mock_conn.close.assert_called()
290 |
291 | @pytest.mark.asyncio
292 | async def test_connection_cleanup_on_exception(self):
293 | """Ensure connections are closed on exceptions."""
294 | mock_conn = Mock()
295 | mock_cursor = Mock()
296 | mock_conn.cursor.return_value = mock_cursor
297 |
298 | # Make cursor creation fail after connection
299 | mock_conn.cursor.side_effect = Exception("Cursor creation failed")
300 |
301 | with patch('pymssql.connect', return_value=mock_conn):
302 | with patch.dict('os.environ', {
303 | 'MSSQL_USER': 'test',
304 | 'MSSQL_PASSWORD': 'test',
305 | 'MSSQL_DATABASE': 'testdb'
306 | }):
307 | resources = await app.list_resources()
308 | assert resources == []
309 |
310 | # Connection should still be closed
311 | mock_conn.close.assert_called()
```
--------------------------------------------------------------------------------
/tests/test_performance.py:
--------------------------------------------------------------------------------
```python
1 | """Performance and load tests for production readiness."""
2 | import pytest
3 | import asyncio
4 | import time
5 | from unittest.mock import Mock, patch
6 | from concurrent.futures import ThreadPoolExecutor
7 | import gc
8 | import psutil
9 | import os
10 | from mssql_mcp_server.server import app
11 |
12 |
13 | class TestPerformance:
14 | """Test performance characteristics under load."""
15 |
16 | @pytest.mark.asyncio
17 | async def test_query_response_time(self):
18 | """Test that queries respond within acceptable time limits."""
19 | mock_conn = Mock()
20 | mock_cursor = Mock()
21 | mock_conn.cursor.return_value = mock_cursor
22 |
23 | # Simulate reasonable query execution
24 | mock_cursor.description = [('id',), ('name',)]
25 | mock_cursor.fetchall.return_value = [(i, f'user_{i}') for i in range(100)]
26 |
27 | with patch('pymssql.connect', return_value=mock_conn):
28 | with patch.dict('os.environ', {
29 | 'MSSQL_USER': 'test',
30 | 'MSSQL_PASSWORD': 'test',
31 | 'MSSQL_DATABASE': 'testdb'
32 | }):
33 | start_time = time.time()
34 | result = await app.call_tool("execute_sql", {"query": "SELECT * FROM users"})
35 | end_time = time.time()
36 |
37 | # Query should complete in reasonable time (< 1 second for mock)
38 | assert end_time - start_time < 1.0
39 | assert len(result) == 1
40 | assert "user_99" in result[0].text
41 |
42 | @pytest.mark.asyncio
43 | async def test_concurrent_query_performance(self):
44 | """Test performance under concurrent query load."""
45 | mock_conn = Mock()
46 | mock_cursor = Mock()
47 | mock_conn.cursor.return_value = mock_cursor
48 |
49 | mock_cursor.description = [('count',)]
50 | mock_cursor.fetchall.return_value = [(42,)]
51 |
52 | with patch('pymssql.connect', return_value=mock_conn):
53 | with patch.dict('os.environ', {
54 | 'MSSQL_USER': 'test',
55 | 'MSSQL_PASSWORD': 'test',
56 | 'MSSQL_DATABASE': 'testdb'
57 | }):
58 | # Run 50 concurrent queries
59 | start_time = time.time()
60 | tasks = [
61 | app.call_tool("execute_sql", {"query": f"SELECT COUNT(*) FROM table_{i}"})
62 | for i in range(50)
63 | ]
64 | results = await asyncio.gather(*tasks)
65 | end_time = time.time()
66 |
67 | # All queries should complete
68 | assert len(results) == 50
69 | assert all("42" in r[0].text for r in results)
70 |
71 | # Should complete in reasonable time (< 5 seconds for 50 queries)
72 | assert end_time - start_time < 5.0
73 |
74 | @pytest.mark.asyncio
75 | async def test_large_result_set_performance(self):
76 | """Test performance with large result sets."""
77 | mock_conn = Mock()
78 | mock_cursor = Mock()
79 | mock_conn.cursor.return_value = mock_cursor
80 |
81 | # Create large result set (10,000 rows)
82 | large_result = [(i, f'user_{i}', f'email_{i}@test.com', i % 100) for i in range(10000)]
83 | mock_cursor.description = [('id',), ('name',), ('email',), ('status',)]
84 | mock_cursor.fetchall.return_value = large_result
85 |
86 | with patch('pymssql.connect', return_value=mock_conn):
87 | with patch.dict('os.environ', {
88 | 'MSSQL_USER': 'test',
89 | 'MSSQL_PASSWORD': 'test',
90 | 'MSSQL_DATABASE': 'testdb'
91 | }):
92 | start_time = time.time()
93 | result = await app.call_tool("execute_sql", {"query": "SELECT * FROM large_table"})
94 | end_time = time.time()
95 |
96 | # Should handle large results efficiently
97 | assert len(result) == 1
98 | lines = result[0].text.split('\n')
99 | assert len(lines) == 10001 # Header + 10000 rows
100 |
101 | # Should complete in reasonable time (< 10 seconds)
102 | assert end_time - start_time < 10.0
103 |
104 |
105 | class TestMemoryUsage:
106 | """Test memory usage and leak prevention."""
107 |
108 | @pytest.mark.asyncio
109 | async def test_memory_usage_stability(self):
110 | """Test that memory usage remains stable over time."""
111 | if not hasattr(psutil.Process(), 'memory_info'):
112 | pytest.skip("Memory monitoring not available")
113 |
114 | mock_conn = Mock()
115 | mock_cursor = Mock()
116 | mock_conn.cursor.return_value = mock_cursor
117 |
118 | mock_cursor.fetchall.return_value = [('table1',), ('table2',)]
119 |
120 | with patch('pymssql.connect', return_value=mock_conn):
121 | with patch.dict('os.environ', {
122 | 'MSSQL_USER': 'test',
123 | 'MSSQL_PASSWORD': 'test',
124 | 'MSSQL_DATABASE': 'testdb'
125 | }):
126 | process = psutil.Process(os.getpid())
127 |
128 | # Get baseline memory
129 | gc.collect()
130 | baseline_memory = process.memory_info().rss / 1024 / 1024 # MB
131 |
132 | # Run many operations
133 | for _ in range(100):
134 | await app.list_resources()
135 |
136 | # Check memory after operations
137 | gc.collect()
138 | final_memory = process.memory_info().rss / 1024 / 1024 # MB
139 |
140 | # Memory growth should be minimal (< 50 MB)
141 | memory_growth = final_memory - baseline_memory
142 | assert memory_growth < 50, f"Memory grew by {memory_growth} MB"
143 |
144 | @pytest.mark.asyncio
145 | async def test_large_data_memory_handling(self):
146 | """Test memory handling with large data sets."""
147 | mock_conn = Mock()
148 | mock_cursor = Mock()
149 | mock_conn.cursor.return_value = mock_cursor
150 |
151 | # Create very large result
152 | def generate_large_result():
153 | for i in range(100000):
154 | yield (i, f'data_{i}' * 100) # Large strings
155 |
156 | mock_cursor.description = [('id',), ('data',)]
157 | mock_cursor.fetchall.return_value = list(generate_large_result())
158 |
159 | with patch('pymssql.connect', return_value=mock_conn):
160 | with patch.dict('os.environ', {
161 | 'MSSQL_USER': 'test',
162 | 'MSSQL_PASSWORD': 'test',
163 | 'MSSQL_DATABASE': 'testdb'
164 | }):
165 | # Should handle large data without excessive memory use
166 | result = await app.call_tool("execute_sql", {"query": "SELECT * FROM big_table"})
167 |
168 | # Result should be created
169 | assert len(result) == 1
170 |
171 | # Memory should be released after operation
172 | result = None
173 | gc.collect()
174 |
175 |
176 | class TestLoadHandling:
177 | """Test system behavior under various load conditions."""
178 |
179 | @pytest.mark.asyncio
180 | async def test_burst_load_handling(self):
181 | """Test handling of sudden burst loads."""
182 | mock_conn = Mock()
183 | mock_cursor = Mock()
184 | mock_conn.cursor.return_value = mock_cursor
185 |
186 | mock_cursor.fetchall.return_value = [('result',)]
187 | mock_cursor.description = [('data',)]
188 |
189 | with patch('pymssql.connect', return_value=mock_conn):
190 | with patch.dict('os.environ', {
191 | 'MSSQL_USER': 'test',
192 | 'MSSQL_PASSWORD': 'test',
193 | 'MSSQL_DATABASE': 'testdb'
194 | }):
195 | # Simulate burst of 100 requests
196 | start_time = time.time()
197 | tasks = []
198 | for _ in range(100):
199 | tasks.append(app.call_tool("execute_sql", {"query": "SELECT 1"}))
200 |
201 | results = await asyncio.gather(*tasks, return_exceptions=True)
202 | end_time = time.time()
203 |
204 | # Count successful results
205 | successful = sum(1 for r in results if not isinstance(r, Exception))
206 |
207 | # Most requests should succeed
208 | assert successful >= 90 # Allow 10% failure rate
209 |
210 | # Should complete within reasonable time
211 | assert end_time - start_time < 30.0
212 |
213 | @pytest.mark.asyncio
214 | async def test_sustained_load_handling(self):
215 | """Test handling of sustained load over time."""
216 | mock_conn = Mock()
217 | mock_cursor = Mock()
218 | mock_conn.cursor.return_value = mock_cursor
219 |
220 | mock_cursor.fetchall.return_value = [('ok',)]
221 | mock_cursor.description = [('status',)]
222 |
223 | with patch('pymssql.connect', return_value=mock_conn):
224 | with patch.dict('os.environ', {
225 | 'MSSQL_USER': 'test',
226 | 'MSSQL_PASSWORD': 'test',
227 | 'MSSQL_DATABASE': 'testdb'
228 | }):
229 | # Run continuous load for 10 seconds
230 | start_time = time.time()
231 | request_count = 0
232 | error_count = 0
233 |
234 | while time.time() - start_time < 10:
235 | try:
236 | result = await app.call_tool("execute_sql", {"query": "SELECT 'ok'"})
237 | request_count += 1
238 | assert "ok" in result[0].text
239 | except Exception:
240 | error_count += 1
241 |
242 | # Small delay to prevent overwhelming
243 | await asyncio.sleep(0.01)
244 |
245 | # Should handle sustained load
246 | assert request_count > 500 # At least 50 req/sec
247 | assert error_count < request_count * 0.05 # Less than 5% errors
248 |
249 |
250 | class TestScalability:
251 | """Test scalability characteristics."""
252 |
253 | @pytest.mark.asyncio
254 | async def test_resource_scaling(self):
255 | """Test handling of increasing number of resources."""
256 | mock_conn = Mock()
257 | mock_cursor = Mock()
258 | mock_conn.cursor.return_value = mock_cursor
259 |
260 | # Test with different table counts
261 | table_counts = [10, 100, 1000]
262 |
263 | with patch('pymssql.connect', return_value=mock_conn):
264 | with patch.dict('os.environ', {
265 | 'MSSQL_USER': 'test',
266 | 'MSSQL_PASSWORD': 'test',
267 | 'MSSQL_DATABASE': 'testdb'
268 | }):
269 | for count in table_counts:
270 | # Create table list
271 | tables = [(f'table_{i}',) for i in range(count)]
272 | mock_cursor.fetchall.return_value = tables
273 |
274 | start_time = time.time()
275 | resources = await app.list_resources()
276 | end_time = time.time()
277 |
278 | assert len(resources) == count
279 |
280 | # Time should scale reasonably (not exponentially)
281 | time_per_table = (end_time - start_time) / count
282 | assert time_per_table < 0.01 # Less than 10ms per table
283 |
284 | @pytest.mark.asyncio
285 | async def test_query_complexity_scaling(self):
286 | """Test performance with increasingly complex queries."""
287 | mock_conn = Mock()
288 | mock_cursor = Mock()
289 | mock_conn.cursor.return_value = mock_cursor
290 |
291 | with patch('pymssql.connect', return_value=mock_conn):
292 | with patch.dict('os.environ', {
293 | 'MSSQL_USER': 'test',
294 | 'MSSQL_PASSWORD': 'test',
295 | 'MSSQL_DATABASE': 'testdb'
296 | }):
297 | # Test simple to complex queries
298 | queries = [
299 | "SELECT 1",
300 | "SELECT * FROM users WHERE id = 1",
301 | "SELECT u.*, o.* FROM users u JOIN orders o ON u.id = o.user_id",
302 | """SELECT u.name, COUNT(o.id), SUM(o.total), AVG(o.total)
303 | FROM users u
304 | LEFT JOIN orders o ON u.id = o.user_id
305 | GROUP BY u.name
306 | HAVING COUNT(o.id) > 5
307 | ORDER BY SUM(o.total) DESC"""
308 | ]
309 |
310 | mock_cursor.description = [('result',)]
311 | mock_cursor.fetchall.return_value = [('data',)]
312 |
313 | for query in queries:
314 | start_time = time.time()
315 | result = await app.call_tool("execute_sql", {"query": query})
316 | end_time = time.time()
317 |
318 | # All queries should complete successfully
319 | assert len(result) == 1
320 |
321 | # Response time should be reasonable
322 | assert end_time - start_time < 2.0
```