# Directory Structure
```
├── .gitignore
├── config
│   ├── config_template.ini
│   ├── providex.ini
│   └── sqlite.ini
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│   └── odbc_mcp
│       ├── __init__.py
│       ├── b1-odbc.py
│       ├── config.py
│       ├── odbc.py
│       └── server.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python build artifacts
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environment
env/
venv/
ENV/
.venv/
# Environment and config files with secrets
.env
*.log
# IDE files
.idea/
.vscode/
*.swp
*.swo
.DS_Store
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# ODBC MCP Server
An MCP (Model Context Protocol) server that enables LLM tools like Claude Desktop to query databases via ODBC connections. This server allows Claude and other MCP clients to access, analyze, and generate insights from database data while maintaining security and read-only safeguards.
## Features
- Connect to any ODBC-compatible database
- Support for multiple database connections
- Flexible configuration through config files or Claude Desktop settings
- Read-only safeguards to prevent data modification
- Easy installation with UV package manager
- Detailed error reporting and logging
## Prerequisites
- Python 3.10 or higher
- UV package manager
- ODBC drivers for your database(s) installed on your system
- For Sage 100 Advanced: ProvideX ODBC driver
## Installation
```bash
git clone https://github.com/tylerstoltz/mcp-odbc.git
cd mcp-odbc
uv venv
.venv\Scripts\activate # On macOS/Linux (untested): source .venv/bin/activate
uv pip install -e .
```
## Configuration
The server can be configured through:
1. A dedicated config file
2. Environment variables
3. Claude Desktop configuration
### General Configuration Setup
Create a configuration file (`.ini`) with your database connection details:
```ini
[SERVER]
default_connection = my_database
max_rows = 1000
timeout = 30
[my_database]
dsn = MyDatabaseDSN
username = your_username
password = your_password
readonly = true
```
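Under the hood, the server assembles these fields into a plain ODBC connection string and hands it to `pyodbc`. The sketch below is only an illustration of that mapping, using the placeholder DSN and credentials from the example above:
```python
import pyodbc

# Fields from the [my_database] section above (placeholders, not real credentials)
conn_str = "DSN=MyDatabaseDSN;UID=your_username;PWD=your_password"

# Roughly what the server does when it opens the connection (timeout from [SERVER])
connection = pyodbc.connect(conn_str, timeout=30)
print(connection.getinfo(pyodbc.SQL_DBMS_NAME))
connection.close()
```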
### SQLite Configuration
For SQLite databases with ODBC:
```ini
[SERVER]
default_connection = sqlite_db
max_rows = 1000
timeout = 30
[sqlite_db]
dsn = SQLite_DSN_Name
readonly = true
```
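Before pointing the server at a SQLite DSN, you can sanity-check the DSN directly with `pyodbc`. This is a minimal sketch that assumes a SQLite ODBC driver is installed and a DSN named `SQLite_DSN_Name` exists:
```python
import pyodbc

# Placeholder DSN name; use the DSN you created in the ODBC Data Source Administrator
conn = pyodbc.connect("DSN=SQLite_DSN_Name", timeout=30)
cursor = conn.cursor()
cursor.execute("SELECT sqlite_version()")
print(cursor.fetchone()[0])
conn.close()
```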
### Sage 100 ProvideX Configuration
ProvideX requires special configuration for compatibility. Use this minimal configuration for best results:
```ini
[SERVER]
default_connection = sage100
max_rows = 1000
timeout = 60
[sage100]
dsn = YOUR_PROVIDEX_DSN
username = your_username
password = your_password
company = YOUR_COMPANY_CODE
readonly = true
```
**Important notes for ProvideX:**
- Use a minimal configuration - adding extra parameters may cause connection issues
- Always set `readonly = true` for safety
- The `company` parameter is required for Sage 100 connections
- Avoid changing connection attributes after the connection is established; in particular, autocommit must be set when the connection is created (see the sketch below)
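The server detects ProvideX connections and passes `autocommit=True` to `pyodbc.connect()` instead of changing it afterwards, which ProvideX typically rejects with `Driver not capable`. A minimal sketch of that approach, using the placeholder values from the config above (the `company` setting is appended to the connection string):
```python
import pyodbc

# Placeholder DSN, credentials, and company code from the [sage100] section above
conn_str = "DSN=YOUR_PROVIDEX_DSN;UID=your_username;PWD=your_password;company=YOUR_COMPANY_CODE"

# autocommit must be set at connect time; toggling it on an open ProvideX
# connection usually fails with "Driver not capable"
conn = pyodbc.connect(conn_str, timeout=60, autocommit=True)
```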
### Claude Desktop Integration
To configure the server in Claude Desktop:
1. Open or create `claude_desktop_config.json`:
- Windows: `%APPDATA%\Claude\claude_desktop_config.json`
- macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
2. Add MCP server configuration:
```json
{
"mcpServers": {
"odbc": {
"command": "uv",
"args": [
"--directory",
"C:\\path\\to\\mcp-odbc",
"run",
"odbc-mcp-server",
"--config",
"C:\\path\\to\\mcp-odbc\\config\\your_config.ini"
]
}
}
}
```
## Usage
### Starting the Server Manually
```bash
# Start with default configuration
odbc-mcp-server
# Start with a specific config file
odbc-mcp-server --config path/to/config.ini
```
### Using with Claude Desktop
1. Configure the server in Claude Desktop's config file as shown above
2. Restart Claude Desktop
3. The ODBC tools will automatically appear in the MCP tools list
### Available MCP Tools
The ODBC MCP server provides these tools:
1. **list-connections**: Lists all configured database connections
2. **list-available-dsns**: Lists all available DSNs on the system
3. **test-connection**: Tests a database connection and returns information
4. **list-tables**: Lists all tables in the database
5. **get-table-schema**: Gets schema information for a table
6. **execute-query**: Executes an SQL query and returns results
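Each tool takes JSON arguments that match its input schema. For example, a client calling **execute-query** sends something like the following (shown here as a Python dict for readability; all values are placeholders):
```python
# Arguments for the "execute-query" tool (shape only; values are placeholders)
execute_query_args = {
    "sql": "SELECT TOP 10 * FROM Customer",   # required
    "connection_name": "my_database",         # optional; falls back to default_connection
    "max_rows": 100,                          # optional; falls back to the server's max_rows
}
```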
## Example Queries
Try these prompts in Claude Desktop after connecting the server:
- "Show me all the tables in the database"
- "What's the schema of the Customer table?"
- "Run a query to get the first 10 customers"
- "Find all orders placed in the last 30 days"
- "Analyze the sales data by region and provide insights"
## Troubleshooting
### Connection Issues
If you encounter connection problems:
1. Verify your ODBC drivers are installed correctly
2. Test your DSN using the ODBC Data Source Administrator
3. Check connection parameters in your config file
4. Look for detailed error messages in Claude Desktop logs
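The first two checks can also be done from a short Python session, using the same `pyodbc` calls the server relies on. A minimal sketch (the DSN and credentials are placeholders):
```python
import pyodbc

# List installed ODBC drivers and configured DSNs
print(pyodbc.drivers())
print(pyodbc.dataSources())

# Try the DSN directly, independent of the MCP server
conn = pyodbc.connect("DSN=MyDatabaseDSN;UID=your_username;PWD=your_password", timeout=30)
print("Connected to:", conn.getinfo(pyodbc.SQL_DBMS_NAME))
conn.close()
```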
### ProvideX-Specific Issues
For Sage 100/ProvideX:
1. Use minimal connection configuration (DSN, username, password, company)
2. Make sure the Company parameter is correct
3. Use the special ProvideX configuration template
4. If you encounter `Driver not capable` errors, make sure autocommit is set at connection time rather than changed afterwards (the server does this automatically for ProvideX DSNs)
### Missing Tables
If tables aren't showing up:
1. Verify user permissions for the database account
2. Check if the company code is correct (for Sage 100)
3. Try using fully qualified table names (schema.table)
## License
MIT License - Copyright (c) 2024
```
--------------------------------------------------------------------------------
/config/sqlite.ini:
--------------------------------------------------------------------------------
```
; SQLite ODBC Configuration
; Example configuration for connecting to a SQLite database via ODBC
[SERVER]
; Default connection to use if not specified
default_connection = sqlite_db
; Maximum rows to return per query (default: 1000)
max_rows = 1000
; Query timeout in seconds (default: 30)
timeout = 30
[sqlite_db]
; DSN-based connection - replace with your SQLite DSN name
dsn = SQLite_DSN_Name
; SQLite typically doesn't need username/password
; readonly = true enforces read-only mode for safety
readonly = true
```
--------------------------------------------------------------------------------
/config/config_template.ini:
--------------------------------------------------------------------------------
```
; ODBC MCP Server Configuration Template
; Copy this file and rename to config.ini
[SERVER]
; Default connection to use if not specified
default_connection = example_dsn
; Maximum rows to return per query (default: 1000)
max_rows = 1000
; Query timeout in seconds (default: 30)
timeout = 30
; Example DSN-based connection
[example_dsn]
dsn = MyDSN
username = username
password = password
; Enforce read-only mode (default: true)
readonly = true
; Example full connection string
[example_connstr]
connection_string = Driver={SQL Server};Server=myserver;Database=mydatabase;UID=username;PWD=password;
readonly = true
; Example DSN-less connection
[example_dsnless]
driver = SQL Server
server = myserver
database = mydatabase
username = username
password = password
readonly = true
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "odbc-mcp-server"
version = "0.1.0"
description = "MCP server for ODBC database connections"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "MIT" }
authors = [
{ name = "Tyler Stoltz", email = "[email protected]" }
]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: Microsoft :: Windows",
]
dependencies = [
"mcp>=0.1.0",
"pyodbc>=4.0.34",
"pydantic>=2.0.0",
"configparser>=5.0.0",
]
[project.scripts]
odbc-mcp-server = "odbc_mcp:main"
[project.optional-dependencies]
dev = [
"black>=23.0.0",
"pytest>=7.0.0",
]
[tool.hatch.build.targets.wheel]
packages = ["src/odbc_mcp"]
[tool.pytest.ini_options]
testpaths = ["tests"]
```
--------------------------------------------------------------------------------
/config/providex.ini:
--------------------------------------------------------------------------------
```
; Sage 100 ProvideX ODBC Configuration
; Minimal configuration for best compatibility with Sage 100 Advanced
[SERVER]
; Default connection to use if not specified
default_connection = sage100
; Maximum rows to return per query
max_rows = 1000
; Query timeout in seconds
timeout = 60
[sage100]
; IMPORTANT: For ProvideX compatibility, use minimal settings
; Replace with your ProvideX DSN (usually something like SOTAMAS90)
dsn = YOUR_PROVIDEX_DSN
; Replace with your actual username
username = YOUR_USERNAME
; Replace with your actual password
password = YOUR_PASSWORD
; Company code (required for Sage 100) - typically a 3-letter code
company = YOUR_COMPANY_CODE
; Always keep readonly mode for safety
readonly = true
; NOTE: For ProvideX compatibility:
; - Use minimal settings as shown above
; - Avoid adding extra parameters that may cause connection issues
; - The connection uses autocommit=True automatically for ProvideX drivers
```
--------------------------------------------------------------------------------
/src/odbc_mcp/__init__.py:
--------------------------------------------------------------------------------
```python
"""
ODBC MCP Server package.
Provides MCP tools for querying databases via ODBC.
"""
import asyncio
import argparse
import sys
from pathlib import Path
from .server import ODBCMCPServer
def main():
"""Main entry point for the package."""
parser = argparse.ArgumentParser(description="ODBC MCP Server")
parser.add_argument(
"--config", "-c",
help="Path to configuration file",
type=str
)
args = parser.parse_args()
try:
server = ODBCMCPServer(args.config)
asyncio.run(server.run())
except KeyboardInterrupt:
print("Server shutting down...")
sys.exit(0)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# Expose key classes at package level
from .config import ServerConfig, ODBCConnection
from .odbc import ODBCHandler
__all__ = ['ODBCMCPServer', 'ServerConfig', 'ODBCConnection', 'ODBCHandler', 'main']
```
--------------------------------------------------------------------------------
/src/odbc_mcp/config.py:
--------------------------------------------------------------------------------
```python
"""
Configuration management for ODBC MCP Server.
Handles loading and validating ODBC connection settings.
"""
import os
import json
from pathlib import Path
from typing import Dict, Optional, List, Any
import configparser
from pydantic import BaseModel, Field, validator
class ODBCConnection(BaseModel):
"""ODBC connection configuration model."""
name: str
connection_string: Optional[str] = None
dsn: Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None
driver: Optional[str] = None
server: Optional[str] = None
database: Optional[str] = None
additional_params: Dict[str, str] = Field(default_factory=dict)
readonly: bool = True # Enforce read-only mode
@validator('connection_string', 'dsn', 'username', 'password', 'driver', 'server', 'database', pre=True)
def empty_str_to_none(cls, v):
"""Convert empty strings to None."""
if v == "":
return None
return v
def get_connection_string(self) -> str:
"""Generate complete connection string for pyodbc."""
# If a full connection string is provided, use it
if self.connection_string:
return self.connection_string
# Otherwise build from components
parts = []
if self.dsn:
parts.append(f"DSN={self.dsn}")
if self.driver:
parts.append(f"Driver={{{self.driver}}}")
if self.server:
parts.append(f"Server={self.server}")
if self.database:
parts.append(f"Database={self.database}")
if self.username:
parts.append(f"UID={self.username}")
if self.password:
parts.append(f"PWD={self.password}")
# Add any additional parameters
for key, value in self.additional_params.items():
parts.append(f"{key}={value}")
return ";".join(parts)
class ServerConfig(BaseModel):
"""Main server configuration."""
connections: Dict[str, ODBCConnection] = Field(default_factory=dict)
default_connection: Optional[str] = None
max_rows: int = 1000 # Default limit for query results
timeout: int = 30 # Default timeout in seconds
@validator('default_connection')
def check_default_connection(cls, v, values):
"""Ensure default_connection references a valid connection."""
if v is not None and v not in values.get('connections', {}):
raise ValueError(f"Default connection '{v}' not found in configured connections")
return v
def load_from_ini(file_path: str) -> ServerConfig:
"""Load configuration from an INI file."""
if not os.path.exists(file_path):
raise FileNotFoundError(f"Configuration file not found: {file_path}")
config = configparser.ConfigParser()
config.read(file_path)
connections = {}
default_connection = None
max_rows = 1000
timeout = 30
# Extract server config
if 'SERVER' in config:
server_config = config['SERVER']
default_connection = server_config.get('default_connection')
max_rows = server_config.getint('max_rows', 1000)
timeout = server_config.getint('timeout', 30)
# Extract connection configs
for section in config.sections():
if section == 'SERVER':
continue
# This is a connection section
connection_config = dict(config[section])
# Handle additional parameters (anything not specifically processed)
additional_params = {}
for key, value in connection_config.items():
if key not in ['connection_string', 'dsn', 'username', 'password',
'driver', 'server', 'database', 'readonly']:
additional_params[key] = value
# Create the connection object
readonly = connection_config.get('readonly', 'true').lower() in ['true', 'yes', '1', 'on']
connection = ODBCConnection(
name=section,
connection_string=connection_config.get('connection_string'),
dsn=connection_config.get('dsn'),
username=connection_config.get('username'),
password=connection_config.get('password'),
driver=connection_config.get('driver'),
server=connection_config.get('server'),
database=connection_config.get('database'),
additional_params=additional_params,
readonly=readonly
)
connections[section] = connection
return ServerConfig(
connections=connections,
default_connection=default_connection,
max_rows=max_rows,
timeout=timeout
)
def load_from_claude_config(claude_config_path: Optional[str] = None) -> Optional[ServerConfig]:
"""
Load configuration from Claude Desktop's config file.
Looks for a configuration section under mcpServerEnv.odbc_mcp_server
"""
if claude_config_path is None:
# Default paths for Claude Desktop config
if os.name == 'nt': # Windows
claude_config_path = os.path.join(os.environ.get('APPDATA', ''), 'Claude', 'claude_desktop_config.json')
else: # macOS
claude_config_path = os.path.expanduser('~/Library/Application Support/Claude/claude_desktop_config.json')
if not os.path.exists(claude_config_path):
return None
try:
with open(claude_config_path, 'r') as f:
claude_config = json.load(f)
# Check if our server config exists
if 'mcpServerEnv' not in claude_config or 'odbc_mcp_server' not in claude_config['mcpServerEnv']:
return None
odbc_config = claude_config['mcpServerEnv']['odbc_mcp_server']
# Process connections
connections = {}
for conn_name, conn_config in odbc_config.get('connections', {}).items():
connections[conn_name] = ODBCConnection(
name=conn_name,
**conn_config
)
return ServerConfig(
connections=connections,
default_connection=odbc_config.get('default_connection'),
max_rows=odbc_config.get('max_rows', 1000),
timeout=odbc_config.get('timeout', 30)
)
except Exception as e:
print(f"Error loading Claude config: {e}")
return None
def load_config(config_path: Optional[str] = None) -> ServerConfig:
"""
Load configuration from file or Claude Desktop config.
Order of precedence:
1. Specified config file path
2. ENV variable ODBC_MCP_CONFIG
3. Claude Desktop config
4. Default config path (./config/config.ini)
"""
# Check specified path
if config_path and os.path.exists(config_path):
return load_from_ini(config_path)
# Check environment variable
env_config_path = os.environ.get('ODBC_MCP_CONFIG')
if env_config_path and os.path.exists(env_config_path):
return load_from_ini(env_config_path)
# Try Claude Desktop config
claude_config = load_from_claude_config()
if claude_config:
return claude_config
# Try default path
default_path = os.path.join(os.path.dirname(__file__), '..', '..', 'config', 'config.ini')
if os.path.exists(default_path):
return load_from_ini(default_path)
# If no config found, raise error
raise FileNotFoundError(
"No configuration found. Please provide a config file or set the ODBC_MCP_CONFIG environment variable."
)
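# --- Illustrative usage (not part of the package API) ---
# A minimal sketch of how ODBCConnection assembles a connection string.
# The DSN, credentials, and company code below are placeholders.
if __name__ == "__main__":
    example = ODBCConnection(
        name="example",
        dsn="MyDSN",
        username="username",
        password="password",
        additional_params={"company": "ABC"},
    )
    # Expected output: DSN=MyDSN;UID=username;PWD=password;company=ABC
    print(example.get_connection_string())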
```
--------------------------------------------------------------------------------
/src/odbc_mcp/server.py:
--------------------------------------------------------------------------------
```python
"""
MCP Server implementation for ODBC connections.
Provides tools for database querying via the Model Context Protocol.
"""
import asyncio
import os
import sys
import json
import logging
from typing import Dict, List, Any, Optional
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
import mcp.types as types
from .config import load_config, ServerConfig
from .odbc import ODBCHandler
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stderr)
]
)
logger = logging.getLogger("odbc-mcp-server")
class ODBCMCPServer:
"""
MCP Server that provides tools for ODBC database connectivity.
"""
def __init__(self, config_path: Optional[str] = None):
"""Initialize the server with configuration."""
try:
self.config = load_config(config_path)
self.odbc = ODBCHandler(self.config)
self.server = Server("odbc-mcp-server")
# Register tool handlers
self._register_tools()
logger.info(f"Initialized ODBC MCP Server with {len(self.config.connections)} connections")
except Exception as e:
logger.error(f"Failed to initialize server: {e}")
raise
def _register_tools(self):
"""Register all MCP tools."""
@self.server.list_tools()
async def list_tools() -> List[types.Tool]:
"""List available tools for the MCP client."""
return [
types.Tool(
name="list-connections",
description="List all configured database connections",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
types.Tool(
name="list-available-dsns",
description="List all available DSNs on the system",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
types.Tool(
name="test-connection",
description="Test a database connection and return information",
inputSchema={
"type": "object",
"properties": {
"connection_name": {
"type": "string",
"description": "Name of the connection to test (optional, uses default if not specified)"
}
},
"required": []
}
),
types.Tool(
name="list-tables",
description="List all tables in the database",
inputSchema={
"type": "object",
"properties": {
"connection_name": {
"type": "string",
"description": "Name of the connection to use (optional, uses default if not specified)"
}
},
"required": []
}
),
types.Tool(
name="get-table-schema",
description="Get schema information for a table",
inputSchema={
"type": "object",
"properties": {
"table_name": {
"type": "string",
"description": "Name of the table to describe (required)"
},
"connection_name": {
"type": "string",
"description": "Name of the connection to use (optional, uses default if not specified)"
}
},
"required": ["table_name"]
}
),
types.Tool(
name="execute-query",
description="Execute an SQL query and return results",
inputSchema={
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "SQL query to execute (required)"
},
"connection_name": {
"type": "string",
"description": "Name of the connection to use (optional, uses default if not specified)"
},
"max_rows": {
"type": "integer",
"description": "Maximum number of rows to return (optional, uses default if not specified)"
}
},
"required": ["sql"]
}
)
]
@self.server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
"""Handle tool execution requests."""
arguments = arguments or {}
try:
if name == "list-connections":
connections = self.odbc.list_connections()
result = {
"connections": connections,
"default_connection": self.config.default_connection
}
return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "list-available-dsns":
dsns = self.odbc.get_available_dsns()
return [types.TextContent(type="text", text=json.dumps(dsns, indent=2))]
elif name == "test-connection":
connection_name = arguments.get("connection_name")
result = self.odbc.test_connection(connection_name)
return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "list-tables":
connection_name = arguments.get("connection_name")
tables = self.odbc.list_tables(connection_name)
# Format the results for better readability
result_text = "### Tables:\n\n"
for table in tables:
schema_prefix = f"{table['schema']}." if table['schema'] else ""
result_text += f"- {schema_prefix}{table['name']}\n"
return [types.TextContent(type="text", text=result_text)]
elif name == "get-table-schema":
table_name = arguments.get("table_name")
if not table_name:
raise ValueError("Table name is required")
connection_name = arguments.get("connection_name")
columns = self.odbc.get_table_schema(table_name, connection_name)
# Format the results for better readability
result_text = f"### Schema for table {table_name}:\n\n"
result_text += "| Column | Type | Size | Nullable |\n"
result_text += "| ------ | ---- | ---- | -------- |\n"
for column in columns:
result_text += f"| {column['name']} | {column['type']} | {column['size']} | {'Yes' if column['nullable'] else 'No'} |\n"
return [types.TextContent(type="text", text=result_text)]
elif name == "execute-query":
sql = arguments.get("sql")
if not sql:
raise ValueError("SQL query is required")
connection_name = arguments.get("connection_name")
max_rows = arguments.get("max_rows")
column_names, rows = self.odbc.execute_query(sql, connection_name, max_rows)
# Format the results as a markdown table
if not column_names:
return [types.TextContent(type="text", text="Query executed successfully, but no results were returned.")]
# Create the results table
result_text = "### Query Results:\n\n"
# Add the header row
result_text += "| " + " | ".join(column_names) + " |\n"
# Add the separator row
result_text += "| " + " | ".join(["---"] * len(column_names)) + " |\n"
# Add the data rows
for row in rows:
result_text += "| " + " | ".join(str(value) if value is not None else "NULL" for value in row) + " |\n"
# Add the row count
result_text += f"\n\n_Returned {len(rows)} rows_"
# Check if we hit the row limit
if max_rows and len(rows) >= max_rows:
result_text += f" _(limited to {max_rows} rows)_"
return [types.TextContent(type="text", text=result_text)]
else:
raise ValueError(f"Unknown tool: {name}")
except Exception as e:
logger.error(f"Error executing tool {name}: {e}")
error_message = f"Error executing {name}: {str(e)}"
return [types.TextContent(type="text", text=error_message)]
async def run(self):
"""Run the MCP server."""
try:
initialization_options = InitializationOptions(
server_name="odbc-mcp-server",
server_version="0.1.0",
capabilities=self.server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
)
async with stdio_server() as (read_stream, write_stream):
logger.info("Starting ODBC MCP Server")
await self.server.run(
read_stream,
write_stream,
initialization_options,
)
except Exception as e:
logger.error(f"Server error: {e}")
raise
finally:
# Clean up connections
self.odbc.close_all_connections()
```
--------------------------------------------------------------------------------
/src/odbc_mcp/b1-odbc.py:
--------------------------------------------------------------------------------
```python
"""
ODBC connection and query management.
Handles database connections and provides methods for executing queries.
"""
import pyodbc
import re
from typing import List, Dict, Any, Optional, Tuple, Union
from .config import ODBCConnection, ServerConfig
class ODBCHandler:
"""Handles ODBC connections and query execution."""
def __init__(self, config: ServerConfig):
"""Initialize with server configuration."""
self.config = config
self.connections = config.connections
self.default_connection = config.default_connection
self.max_rows = config.max_rows
self.timeout = config.timeout
self.active_connections: Dict[str, pyodbc.Connection] = {}
def __del__(self):
"""Ensure all connections are closed on deletion."""
self.close_all_connections()
def close_all_connections(self):
"""Close all active database connections."""
for conn_name, conn in self.active_connections.items():
try:
conn.close()
except Exception:
pass
self.active_connections = {}
def get_connection(self, connection_name: Optional[str] = None) -> pyodbc.Connection:
"""
Get a database connection by name or use the default.
Args:
connection_name: Name of the connection to use, or None for default
Returns:
pyodbc.Connection: Active database connection
Raises:
ValueError: If connection name doesn't exist
ConnectionError: If connection fails
"""
# Use default if not specified
if connection_name is None:
if self.default_connection is None:
if len(self.connections) == 1:
# If only one connection is defined, use it
connection_name = list(self.connections.keys())[0]
else:
raise ValueError("No default connection specified and multiple connections exist")
else:
connection_name = self.default_connection
# Check if connection exists
if connection_name not in self.connections:
raise ValueError(f"Connection '{connection_name}' not found in configuration")
# Return existing connection if available
if connection_name in self.active_connections:
try:
# Test the connection with a simple query
self.active_connections[connection_name].cursor().execute("SELECT 1")
return self.active_connections[connection_name]
except Exception:
# Connection is stale, close it
try:
self.active_connections[connection_name].close()
except Exception:
pass
del self.active_connections[connection_name]
# Create new connection
connection_config = self.connections[connection_name]
conn_str = connection_config.get_connection_string()
try:
connection = pyodbc.connect(conn_str, timeout=self.timeout)
# Set read-only if specified
if connection_config.readonly:
connection.setdecoding(pyodbc.SQL_CHAR, encoding='utf-8')
connection.setdecoding(pyodbc.SQL_WCHAR, encoding='utf-8')
connection.setencoding(encoding='utf-8')
self.active_connections[connection_name] = connection
return connection
except Exception as e:
raise ConnectionError(f"Failed to connect to '{connection_name}': {str(e)}")
def list_connections(self) -> List[str]:
"""List all available connection names."""
return list(self.connections.keys())
def get_available_dsns(self) -> List[Dict[str, str]]:
"""
Get a list of all available DSNs on the system.
Returns:
List of dictionaries containing DSN information
"""
dsns = []
for dsn_info in pyodbc.dataSources().items():
dsns.append({
"name": dsn_info[0],
"driver": dsn_info[1]
})
return dsns
def list_tables(self, connection_name: Optional[str] = None) -> List[Dict[str, str]]:
"""
List all tables in the database.
Args:
connection_name: Name of the connection to use, or None for default
Returns:
List of dictionaries with table information
"""
connection = self.get_connection(connection_name)
cursor = connection.cursor()
tables = []
try:
for table_info in cursor.tables():
if table_info.table_type == 'TABLE':
tables.append({
"catalog": table_info.table_cat or "",
"schema": table_info.table_schem or "",
"name": table_info.table_name,
"type": table_info.table_type
})
return tables
except Exception as e:
# For some ODBC drivers that don't support table enumeration,
# fallback to a SQL query if possible
try:
sql_tables = []
cursor.execute("SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")
for row in cursor.fetchall():
sql_tables.append({
"catalog": row[0] or "",
"schema": row[1] or "",
"name": row[2],
"type": row[3]
})
return sql_tables
except Exception:
# If everything fails, raise the original error
raise ConnectionError(f"Failed to list tables: {str(e)}")
def get_table_schema(self, table_name: str, connection_name: Optional[str] = None) -> List[Dict[str, Any]]:
"""
Get schema information for a table.
Args:
table_name: Name of the table
connection_name: Name of the connection to use, or None for default
Returns:
List of dictionaries with column information
"""
connection = self.get_connection(connection_name)
cursor = connection.cursor()
# Try to extract schema and table name
schema_parts = table_name.split('.')
if len(schema_parts) > 1:
schema_name = schema_parts[0]
table_name = schema_parts[1]
else:
schema_name = None
columns = []
try:
# Use metadata API if available
column_metadata = cursor.columns(table=table_name, schema=schema_name)
for column in column_metadata:
columns.append({
"name": column.column_name,
"type": column.type_name,
"size": column.column_size,
"nullable": column.nullable == 1,
"position": column.ordinal_position
})
# If we got column info, return it
if columns:
return columns
# Otherwise, try SQL approach
raise Exception("No columns found")
except Exception:
# Try SQL approach for drivers that don't support metadata
try:
sql = f"SELECT * FROM {table_name} WHERE 1=0"
cursor.execute(sql)
columns = []
for i, column in enumerate(cursor.description):
columns.append({
"name": column[0],
"type": self._get_type_name(column[1]),
"size": column[3],
"nullable": column[6] == 1,
"position": i+1
})
return columns
except Exception as e:
raise ValueError(f"Failed to get schema for table '{table_name}': {str(e)}")
def _get_type_name(self, type_code: int) -> str:
"""Convert ODBC type code to type name."""
type_map = {
pyodbc.SQL_CHAR: "CHAR",
pyodbc.SQL_VARCHAR: "VARCHAR",
pyodbc.SQL_LONGVARCHAR: "LONGVARCHAR",
pyodbc.SQL_WCHAR: "WCHAR",
pyodbc.SQL_WVARCHAR: "WVARCHAR",
pyodbc.SQL_WLONGVARCHAR: "WLONGVARCHAR",
pyodbc.SQL_DECIMAL: "DECIMAL",
pyodbc.SQL_NUMERIC: "NUMERIC",
pyodbc.SQL_SMALLINT: "SMALLINT",
pyodbc.SQL_INTEGER: "INTEGER",
pyodbc.SQL_REAL: "REAL",
pyodbc.SQL_FLOAT: "FLOAT",
pyodbc.SQL_DOUBLE: "DOUBLE",
pyodbc.SQL_BIT: "BIT",
pyodbc.SQL_TINYINT: "TINYINT",
pyodbc.SQL_BIGINT: "BIGINT",
pyodbc.SQL_BINARY: "BINARY",
pyodbc.SQL_VARBINARY: "VARBINARY",
pyodbc.SQL_LONGVARBINARY: "LONGVARBINARY",
pyodbc.SQL_TYPE_DATE: "DATE",
pyodbc.SQL_TYPE_TIME: "TIME",
pyodbc.SQL_TYPE_TIMESTAMP: "TIMESTAMP",
pyodbc.SQL_SS_VARIANT: "SQL_VARIANT",
pyodbc.SQL_SS_UDT: "UDT",
pyodbc.SQL_SS_XML: "XML",
pyodbc.SQL_SS_TIME2: "TIME",
pyodbc.SQL_SS_TIMESTAMPOFFSET: "TIMESTAMPOFFSET",
}
return type_map.get(type_code, f"UNKNOWN({type_code})")
def is_read_only_query(self, sql: str) -> bool:
"""
Check if an SQL query is read-only.
Args:
sql: SQL query to check
Returns:
bool: True if the query is read-only, False otherwise
"""
# Remove comments and normalize whitespace
sql = re.sub(r'--.*?(\n|$)', ' ', sql)
sql = re.sub(r'/\*.*?\*/', ' ', sql, flags=re.DOTALL)
sql = ' '.join(sql.split()).strip().upper()
# Check for data modification statements
data_modification_patterns = [
r'^\s*INSERT\s+INTO',
r'^\s*UPDATE\s+',
r'^\s*DELETE\s+FROM',
r'^\s*DROP\s+',
r'^\s*CREATE\s+',
r'^\s*ALTER\s+',
r'^\s*TRUNCATE\s+',
r'^\s*GRANT\s+',
r'^\s*REVOKE\s+',
r'^\s*MERGE\s+',
r'^\s*EXEC\s+',
r'^\s*EXECUTE\s+',
r'^\s*CALL\s+',
r'^\s*SET\s+',
r'^\s*USE\s+',
]
for pattern in data_modification_patterns:
if re.search(pattern, sql):
return False
# If no modification patterns are found, it's likely read-only
return True
def execute_query(self, sql: str, connection_name: Optional[str] = None,
max_rows: Optional[int] = None) -> Tuple[List[str], List[List[Any]]]:
"""
Execute an SQL query and return results.
Args:
sql: SQL query to execute
connection_name: Name of the connection to use, or None for default
max_rows: Maximum number of rows to return, or None for default
Returns:
Tuple of column names and result rows
"""
# Check if query is read-only for connections with readonly flag
connection = self.get_connection(connection_name)
connection_config = self.connections[connection_name or self.default_connection]
if connection_config.readonly and not self.is_read_only_query(sql):
raise ValueError("Write operations are not allowed on read-only connections")
# Set max rows limit
if max_rows is None:
max_rows = self.max_rows
# Execute the query
cursor = connection.cursor()
cursor.execute(sql)
# Get column names
column_names = [column[0] for column in cursor.description] if cursor.description else []
# Fetch results with row limit
results = []
row_count = 0
for row in cursor:
formatted_row = []
for value in row:
# Convert specific ODBC types to strings for JSON compatibility
if isinstance(value, (bytearray, bytes)):
formatted_row.append(str(value))
else:
formatted_row.append(value)
results.append(formatted_row)
row_count += 1
if row_count >= max_rows:
break
return column_names, results
def test_connection(self, connection_name: Optional[str] = None) -> Dict[str, Any]:
"""
Test a database connection and return information.
Args:
connection_name: Name of the connection to use, or None for default
Returns:
Dictionary with connection status and info
"""
try:
# Get connection
conn = self.get_connection(connection_name)
cursor = conn.cursor()
# Get database info
database_info = {}
try:
cursor.execute("SELECT @@version")
version = cursor.fetchone()
if version:
database_info["version"] = version[0]
except Exception:
# Some databases don't support @@version
pass
# Get connection info
conn_info = {
"driver_name": conn.getinfo(pyodbc.SQL_DRIVER_NAME) if hasattr(conn, 'getinfo') else "Unknown",
"driver_version": conn.getinfo(pyodbc.SQL_DRIVER_VER) if hasattr(conn, 'getinfo') else "Unknown",
"database_name": conn.getinfo(pyodbc.SQL_DATABASE_NAME) if hasattr(conn, 'getinfo') else "Unknown",
"dbms_name": conn.getinfo(pyodbc.SQL_DBMS_NAME) if hasattr(conn, 'getinfo') else "Unknown",
"dbms_version": conn.getinfo(pyodbc.SQL_DBMS_VER) if hasattr(conn, 'getinfo') else "Unknown",
}
return {
"status": "connected",
"connection_name": connection_name or self.default_connection,
"connection_info": conn_info,
"database_info": database_info
}
except Exception as e:
return {
"status": "error",
"connection_name": connection_name or self.default_connection,
"error": str(e)
}
```
--------------------------------------------------------------------------------
/src/odbc_mcp/odbc.py:
--------------------------------------------------------------------------------
```python
"""
ODBC connection and query management.
Handles database connections and provides methods for executing queries.
"""
import pyodbc
import re
from typing import List, Dict, Any, Optional, Tuple, Union
from .config import ODBCConnection, ServerConfig
class ODBCHandler:
"""Handles ODBC connections and query execution."""
def __init__(self, config: ServerConfig):
"""Initialize with server configuration."""
self.config = config
self.connections = config.connections
self.default_connection = config.default_connection
self.max_rows = config.max_rows
self.timeout = config.timeout
self.active_connections: Dict[str, pyodbc.Connection] = {}
def __del__(self):
"""Ensure all connections are closed on deletion."""
self.close_all_connections()
def close_all_connections(self):
"""Close all active database connections."""
for conn_name, conn in self.active_connections.items():
try:
conn.close()
except Exception:
pass
self.active_connections = {}
def get_connection(self, connection_name: Optional[str] = None) -> pyodbc.Connection:
"""
Get a database connection by name or use the default.
Args:
connection_name: Name of the connection to use, or None for default
Returns:
pyodbc.Connection: Active database connection
Raises:
ValueError: If connection name doesn't exist
ConnectionError: If connection fails
"""
# Use default if not specified
if connection_name is None:
if self.default_connection is None:
if len(self.connections) == 1:
# If only one connection is defined, use it
connection_name = list(self.connections.keys())[0]
else:
raise ValueError("No default connection specified and multiple connections exist")
else:
connection_name = self.default_connection
# Check if connection exists
if connection_name not in self.connections:
raise ValueError(f"Connection '{connection_name}' not found in configuration")
# Return existing connection if available
if connection_name in self.active_connections:
try:
# Test the connection with a simple query
self.active_connections[connection_name].cursor().execute("SELECT 1")
return self.active_connections[connection_name]
except Exception:
# Connection is stale, close it
try:
self.active_connections[connection_name].close()
except Exception:
pass
del self.active_connections[connection_name]
# Create new connection
connection_config = self.connections[connection_name]
conn_str = connection_config.get_connection_string()
try:
# Detect if this is ProvideX or has ProvideX in the connection string
is_providex = "PROVIDEX" in conn_str.upper() or connection_name.upper() == "SAGE100"
# Special handling for ProvideX
if is_providex:
# For ProvideX, explicitly set autocommit at connection time
connection = pyodbc.connect(conn_str, timeout=self.timeout, autocommit=True)
else:
# For other drivers, use the standard connection approach
connection = pyodbc.connect(conn_str, timeout=self.timeout)
# Set encoding options
connection.setdecoding(pyodbc.SQL_CHAR, encoding='utf-8')
connection.setdecoding(pyodbc.SQL_WCHAR, encoding='utf-8')
connection.setencoding(encoding='utf-8')
self.active_connections[connection_name] = connection
return connection
except Exception as e:
raise ConnectionError(f"Failed to connect to '{connection_name}': {str(e)}")
def list_connections(self) -> List[str]:
"""List all available connection names."""
return list(self.connections.keys())
def get_available_dsns(self) -> List[Dict[str, str]]:
"""
Get a list of all available DSNs on the system.
Returns:
List of dictionaries containing DSN information
"""
dsns = []
for dsn_info in pyodbc.dataSources().items():
dsns.append({
"name": dsn_info[0],
"driver": dsn_info[1]
})
return dsns
def list_tables(self, connection_name: Optional[str] = None) -> List[Dict[str, str]]:
"""
List all tables in the database.
Args:
connection_name: Name of the connection to use, or None for default
Returns:
List of dictionaries with table information
"""
connection = self.get_connection(connection_name)
cursor = connection.cursor()
tables = []
try:
for table_info in cursor.tables():
if table_info.table_type == 'TABLE':
tables.append({
"catalog": table_info.table_cat or "",
"schema": table_info.table_schem or "",
"name": table_info.table_name,
"type": table_info.table_type
})
return tables
except Exception as e:
# For some ODBC drivers that don't support table enumeration,
# fallback to a SQL query if possible
try:
sql_tables = []
cursor.execute("SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")
for row in cursor.fetchall():
sql_tables.append({
"catalog": row[0] or "",
"schema": row[1] or "",
"name": row[2],
"type": row[3]
})
return sql_tables
except Exception:
# If everything fails, raise the original error
raise ConnectionError(f"Failed to list tables: {str(e)}")
def get_table_schema(self, table_name: str, connection_name: Optional[str] = None) -> List[Dict[str, Any]]:
"""
Get schema information for a table.
Args:
table_name: Name of the table
connection_name: Name of the connection to use, or None for default
Returns:
List of dictionaries with column information
"""
connection = self.get_connection(connection_name)
cursor = connection.cursor()
# Try to extract schema and table name
schema_parts = table_name.split('.')
if len(schema_parts) > 1:
schema_name = schema_parts[0]
table_name = schema_parts[1]
else:
schema_name = None
columns = []
try:
# Use metadata API if available
column_metadata = cursor.columns(table=table_name, schema=schema_name)
for column in column_metadata:
columns.append({
"name": column.column_name,
"type": column.type_name,
"size": column.column_size,
"nullable": column.nullable == 1,
"position": column.ordinal_position
})
# If we got column info, return it
if columns:
return columns
# Otherwise, try SQL approach
raise Exception("No columns found")
except Exception:
# Try SQL approach for drivers that don't support metadata
try:
sql = f"SELECT * FROM {table_name} WHERE 1=0"
cursor.execute(sql)
columns = []
for i, column in enumerate(cursor.description):
columns.append({
"name": column[0],
"type": self._get_type_name(column[1]),
"size": column[3],
"nullable": column[6] == 1,
"position": i+1
})
return columns
except Exception as e:
raise ValueError(f"Failed to get schema for table '{table_name}': {str(e)}")
def _get_type_name(self, type_code: int) -> str:
"""Convert ODBC type code to type name."""
type_map = {
pyodbc.SQL_CHAR: "CHAR",
pyodbc.SQL_VARCHAR: "VARCHAR",
pyodbc.SQL_LONGVARCHAR: "LONGVARCHAR",
pyodbc.SQL_WCHAR: "WCHAR",
pyodbc.SQL_WVARCHAR: "WVARCHAR",
pyodbc.SQL_WLONGVARCHAR: "WLONGVARCHAR",
pyodbc.SQL_DECIMAL: "DECIMAL",
pyodbc.SQL_NUMERIC: "NUMERIC",
pyodbc.SQL_SMALLINT: "SMALLINT",
pyodbc.SQL_INTEGER: "INTEGER",
pyodbc.SQL_REAL: "REAL",
pyodbc.SQL_FLOAT: "FLOAT",
pyodbc.SQL_DOUBLE: "DOUBLE",
pyodbc.SQL_BIT: "BIT",
pyodbc.SQL_TINYINT: "TINYINT",
pyodbc.SQL_BIGINT: "BIGINT",
pyodbc.SQL_BINARY: "BINARY",
pyodbc.SQL_VARBINARY: "VARBINARY",
pyodbc.SQL_LONGVARBINARY: "LONGVARBINARY",
pyodbc.SQL_TYPE_DATE: "DATE",
pyodbc.SQL_TYPE_TIME: "TIME",
pyodbc.SQL_TYPE_TIMESTAMP: "TIMESTAMP",
pyodbc.SQL_SS_VARIANT: "SQL_VARIANT",
pyodbc.SQL_SS_UDT: "UDT",
pyodbc.SQL_SS_XML: "XML",
pyodbc.SQL_SS_TIME2: "TIME",
pyodbc.SQL_SS_TIMESTAMPOFFSET: "TIMESTAMPOFFSET",
}
return type_map.get(type_code, f"UNKNOWN({type_code})")
def is_read_only_query(self, sql: str) -> bool:
"""
Check if an SQL query is read-only.
Args:
sql: SQL query to check
Returns:
bool: True if the query is read-only, False otherwise
"""
# Remove comments and normalize whitespace
sql = re.sub(r'--.*?(\n|$)', ' ', sql)
sql = re.sub(r'/\*.*?\*/', ' ', sql, flags=re.DOTALL)
sql = ' '.join(sql.split()).strip().upper()
# Check for data modification statements
data_modification_patterns = [
r'^\s*INSERT\s+INTO',
r'^\s*UPDATE\s+',
r'^\s*DELETE\s+FROM',
r'^\s*DROP\s+',
r'^\s*CREATE\s+',
r'^\s*ALTER\s+',
r'^\s*TRUNCATE\s+',
r'^\s*GRANT\s+',
r'^\s*REVOKE\s+',
r'^\s*MERGE\s+',
r'^\s*EXEC\s+',
r'^\s*EXECUTE\s+',
r'^\s*CALL\s+',
r'^\s*SET\s+',
r'^\s*USE\s+',
]
for pattern in data_modification_patterns:
if re.search(pattern, sql):
return False
# If no modification patterns are found, it's likely read-only
return True
def execute_query(self, sql: str, connection_name: Optional[str] = None,
max_rows: Optional[int] = None) -> Tuple[List[str], List[List[Any]]]:
"""
Execute an SQL query and return results.
Args:
sql: SQL query to execute
connection_name: Name of the connection to use, or None for default
max_rows: Maximum number of rows to return, or None for default
Returns:
Tuple of column names and result rows
"""
# Check if query is read-only for connections with readonly flag
connection = self.get_connection(connection_name)
connection_config = self.connections[connection_name or self.default_connection]
if connection_config.readonly and not self.is_read_only_query(sql):
raise ValueError("Write operations are not allowed on read-only connections")
# Set max rows limit
if max_rows is None:
max_rows = self.max_rows
# Execute the query
cursor = connection.cursor()
cursor.execute(sql)
# Get column names
column_names = [column[0] for column in cursor.description] if cursor.description else []
# Fetch results with row limit
results = []
row_count = 0
for row in cursor:
formatted_row = []
for value in row:
# Convert specific ODBC types to strings for JSON compatibility
if isinstance(value, (bytearray, bytes)):
formatted_row.append(str(value))
else:
formatted_row.append(value)
results.append(formatted_row)
row_count += 1
if row_count >= max_rows:
break
return column_names, results
def test_connection(self, connection_name: Optional[str] = None) -> Dict[str, Any]:
"""
Test a database connection and return information.
Args:
connection_name: Name of the connection to use, or None for default
Returns:
Dictionary with connection status and info
"""
try:
# Get connection
conn = self.get_connection(connection_name)
cursor = conn.cursor()
# Get database info
database_info = {}
try:
cursor.execute("SELECT @@version")
version = cursor.fetchone()
if version:
database_info["version"] = version[0]
except Exception:
# Some databases don't support @@version
pass
# Get connection info
conn_info = {
"driver_name": conn.getinfo(pyodbc.SQL_DRIVER_NAME) if hasattr(conn, 'getinfo') else "Unknown",
"driver_version": conn.getinfo(pyodbc.SQL_DRIVER_VER) if hasattr(conn, 'getinfo') else "Unknown",
"database_name": conn.getinfo(pyodbc.SQL_DATABASE_NAME) if hasattr(conn, 'getinfo') else "Unknown",
"dbms_name": conn.getinfo(pyodbc.SQL_DBMS_NAME) if hasattr(conn, 'getinfo') else "Unknown",
"dbms_version": conn.getinfo(pyodbc.SQL_DBMS_VER) if hasattr(conn, 'getinfo') else "Unknown",
}
return {
"status": "connected",
"connection_name": connection_name or self.default_connection,
"connection_info": conn_info,
"database_info": database_info
}
except Exception as e:
return {
"status": "error",
"connection_name": connection_name or self.default_connection,
"error": str(e)
}
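# --- Illustrative usage (not part of the package API) ---
# A minimal sketch of the read-only guard in isolation; it needs no database,
# only an empty ServerConfig, so it can be run directly as a quick check.
if __name__ == "__main__":
    handler = ODBCHandler(ServerConfig())
    print(handler.is_read_only_query("SELECT * FROM Customer"))          # True
    print(handler.is_read_only_query("DELETE FROM Customer WHERE 1=1"))  # False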
```