# Directory Structure
```
├── .env
├── .gitignore
├── LICENSE
├── mcp_sqlalchemy_server
│   ├── __init__.py
│   └── server.py
├── pyproject.toml
└── README.md
```
# Files
--------------------------------------------------------------------------------
/.env:
--------------------------------------------------------------------------------
```
ODBC_DSN=VOS
ODBC_USER=dba
ODBC_PASSWORD=dba
API_KEY=sk-xxx
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
#.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
---
# MCP Server ODBC via SQLAlchemy
A lightweight MCP (Model Context Protocol) server for ODBC built with **FastMCP** and **pyodbc**. This server is compatible with Virtuoso DBMS and other DBMS backends that implement a SQLAlchemy provider.

---
## Features
- **Get Schemas**: Fetch and list all schema names from the connected database.
- **Get Tables**: Retrieve table information for specific schemas or all schemas.
- **Describe Table**: Generate a detailed description of table structures, including:
  - Column names and data types
  - Nullable attributes
  - Primary and foreign keys
- **Search Tables**: Filter and retrieve tables based on name substrings.
- **Execute Stored Procedures**: In the case of Virtuoso, execute stored procedures and retrieve results.
- **Execute Queries**:
- JSONL result format: Optimized for structured responses.
- Markdown table format: Ideal for reporting and visualization.
---
## Prerequisites
1. **Install uv**:
```bash
pip install uv
```
Or use Homebrew:
```bash
brew install uv
```
2. **unixODBC Runtime Environment Checks**:
1. Check installation configuration (i.e., location of key INI files) by running: `odbcinst -j`
2. List available data source names by running: `odbcinst -q -s`
3. **ODBC DSN Setup**: Configure your ODBC Data Source Name (`~/.odbc.ini`) for the target database. Example for Virtuoso DBMS:
```
[VOS]
Description = OpenLink Virtuoso
Driver = /path/to/virtodbcu_r.so
Database = Demo
Address = localhost:1111
WideAsUTF16 = Yes
```
4. **SQLAlchemy URL Binding**: Use the format:
```
virtuoso+pyodbc://user:password@VOS
```
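You can sanity-check the URL binding with a short SQLAlchemy snippet (a minimal sketch; it assumes a Virtuoso SQLAlchemy dialect package is installed and the `VOS` DSN from the previous step exists):
```python
# Minimal connectivity check for the SQLAlchemy URL binding (illustrative).
from sqlalchemy import create_engine, text

engine = create_engine("virtuoso+pyodbc://dba:dba@VOS")
with engine.connect() as connection:
    print(connection.execute(text("SELECT 1")).scalar())  # expect: 1
```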
---
## Installation
Clone this repository:
```bash
git clone https://github.com/OpenLinkSoftware/mcp-sqlalchemy-server.git
cd mcp-sqlalchemy-server
```
## Environment Variables
Update your `.env` by overriding the defaults to match your preferences:
```
ODBC_DSN=VOS
ODBC_USER=dba
ODBC_PASSWORD=dba
API_KEY=xxx
```
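These variables are read at startup via `python-dotenv` in `mcp_sqlalchemy_server/server.py`; a minimal sketch of that lookup:
```python
# How server.py resolves connection settings from .env (sketch).
import os
from dotenv import load_dotenv

load_dotenv()                           # reads .env from the working directory
DB_DSN = os.getenv("ODBC_DSN")          # e.g. "VOS"
DB_UID = os.getenv("ODBC_USER")
DB_PWD = os.getenv("ODBC_PASSWORD")
API_KEY = os.getenv("API_KEY", "none")  # defaults to "none" when unset
```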
---
## Configuration
For **Claude Desktop** users:
Add the following to `claude_desktop_config.json`:
```json
{
  "mcpServers": {
    "my_database": {
      "command": "uv",
      "args": ["--directory", "/path/to/mcp-sqlalchemy-server", "run", "mcp-sqlalchemy-server"],
      "env": {
        "ODBC_DSN": "dsn_name",
        "ODBC_USER": "username",
        "ODBC_PASSWORD": "password",
        "API_KEY": "sk-xxx"
      }
    }
  }
}
```
---
# Usage
## Database Management System (DBMS) Connection URLs
Here are example pyodbc connection URLs for the database management systems that have been tested with this MCP server.
| Database | URL Format |
|---------------|-----------------------------------------------|
| Virtuoso DBMS | `virtuoso+pyodbc://user:password@ODBC_DSN` |
| PostgreSQL | `postgresql://user:password@localhost/dbname` |
| MySQL | `mysql+pymysql://user:password@localhost/dbname` |
| SQLite | `sqlite:///path/to/database.db` |
Once connected, you can interact with your database through Claude, leveraging Claude's AI capabilities in your conversations.
## Tools Provided
### Overview
|name|description|
|---|---|
|podbc_get_schemas|List database schemas accessible to the connected database management system (DBMS).|
|podbc_get_tables|List tables associated with a selected database schema.|
|podbc_describe_table|Describe a table in a designated database schema, including column names, data types, null handling, autoincrement, primary key, and foreign keys.|
|podbc_filter_table_names|List tables, based on a substring pattern from the `q` input field, associated with a selected database schema.|
|podbc_query_database|Execute a SQL query and return results in JSONL format.|
|podbc_execute_query|Execute a SQL query, with an optional row limit and query parameters, and return results in JSONL format.|
|podbc_execute_query_md|Execute a SQL query and return results in Markdown table format.|
|podbc_spasql_query|Execute a SPASQL query and return results.|
|podbc_sparql_query|Execute a SPARQL query and return results.|
|podbc_virtuoso_support_ai|Interact with the Virtuoso Support Assistant/Agent, a Virtuoso-specific feature for interacting with LLMs.|
### Detailed Description
- **podbc_get_schemas**
  - Retrieve and return a list of all schema names from the connected database.
  - Input parameters:
    - `user` (string, optional): Database username. Defaults to "demo".
    - `password` (string, optional): Database password. Defaults to "demo".
    - `dsn` (string, optional): ODBC data source name. Defaults to "Local Virtuoso".
  - Returns a JSON string array of schema names.
- **podbc_get_tables**
  - Retrieve and return a list containing information about tables in a specified schema. If no schema is provided, uses the connection's default schema.
  - Input parameters:
    - `schema` (string, optional): Database schema to filter tables. Defaults to connection default.
    - `user` (string, optional): Database username. Defaults to "demo".
    - `password` (string, optional): Database password. Defaults to "demo".
    - `dsn` (string, optional): ODBC data source name. Defaults to "Local Virtuoso".
  - Returns a JSON string containing table information (e.g., TABLE_CAT, TABLE_SCHEM, TABLE_NAME, TABLE_TYPE).
- **podbc_filter_table_names**
  - Filters and returns information about tables whose names contain a specific substring.
  - Input parameters:
    - `q` (string, required): The substring to search for within table names.
    - `schema` (string, optional): Database schema to filter tables. Defaults to connection default.
    - `user` (string, optional): Database username. Defaults to "demo".
    - `password` (string, optional): Database password. Defaults to "demo".
    - `dsn` (string, optional): ODBC data source name. Defaults to "Local Virtuoso".
  - Returns a JSON string containing information for matching tables.
- **podbc_describe_table**
  - Retrieve and return detailed information about the columns of a specific table.
  - Input parameters:
    - `schema` (string, required): The database schema name containing the table.
    - `table` (string, required): The name of the table to describe.
    - `user` (string, optional): Database username. Defaults to "demo".
    - `password` (string, optional): Database password. Defaults to "demo".
    - `dsn` (string, optional): ODBC data source name. Defaults to "Local Virtuoso".
  - Returns a JSON string describing the table's columns (e.g., COLUMN_NAME, TYPE_NAME, COLUMN_SIZE, IS_NULLABLE).
- **podbc_query_database**
  - Execute a standard SQL query and return the results in JSONL format.
  - Input parameters:
    - `query` (string, required): The SQL query string to execute.
    - `user` (string, optional): Database username. Defaults to "demo".
    - `password` (string, optional): Database password. Defaults to "demo".
    - `dsn` (string, optional): ODBC data source name. Defaults to "Local Virtuoso".
  - Returns query results as a JSONL string.
- **podbc_execute_query**
  - Execute a standard SQL query, with an optional row limit and query parameters, and return the results in JSON Lines (JSONL) format (one JSON object per line). A hypothetical result is sketched just after this list.
  - Input parameters:
    - `query` (string, required): The SQL query string to execute.
    - `max_rows` (number, optional): Maximum number of rows to return. Defaults to 100.
    - `params` (object, optional): Parameters to bind to the query.
    - `user` (string, optional): Database username. Defaults to "demo".
    - `password` (string, optional): Database password. Defaults to "demo".
    - `dsn` (string, optional): ODBC data source name. Defaults to "Local Virtuoso".
  - Returns query results as a JSONL string.
- **podbc_execute_query_md**
  - Execute a standard SQL query and return the results formatted as a Markdown table.
  - Input parameters:
    - `query` (string, required): The SQL query string to execute.
    - `max_rows` (number, optional): Maximum number of rows to return. Defaults to 100.
    - `params` (object, optional): Parameters to bind to the query.
    - `user` (string, optional): Database username. Defaults to "demo".
    - `password` (string, optional): Database password. Defaults to "demo".
    - `dsn` (string, optional): ODBC data source name. Defaults to "Local Virtuoso".
  - Returns query results as a Markdown table string.
- **podbc_spasql_query**
  - Execute a SPASQL (SQL/SPARQL hybrid) query and return results. This is a Virtuoso-specific feature.
  - Input parameters:
    - `query` (string, required): The SPASQL query string.
    - `max_rows` (number, optional): Maximum number of rows to return. Defaults to 20.
    - `timeout` (number, optional): Query timeout in milliseconds. Defaults to 300000.
    - `user` (string, optional): Database username. Defaults to "demo".
    - `password` (string, optional): Database password. Defaults to "demo".
    - `dsn` (string, optional): ODBC data source name. Defaults to "Local Virtuoso".
  - Returns the result from the underlying stored procedure call (e.g., `Demo.demo.execute_spasql_query`).
- **podbc_sparql_query**
  - Execute a SPARQL query and return results. This is a Virtuoso-specific feature.
  - Input parameters:
    - `query` (string, required): The SPARQL query string.
    - `format` (string, optional): Desired result format. Defaults to 'json'.
    - `timeout` (number, optional): Query timeout in milliseconds. Defaults to 30000.
    - `user` (string, optional): Database username. Defaults to "demo".
    - `password` (string, optional): Database password. Defaults to "demo".
    - `dsn` (string, optional): ODBC data source name. Defaults to "Local Virtuoso".
  - Returns the result from the underlying function call (e.g., `"UB".dba."sparqlQuery"`).
- **podbc_virtuoso_support_ai**
  - Utilizes a Virtuoso-specific AI Assistant function, passing a prompt and optional API key. This is a Virtuoso-specific feature.
  - Input parameters:
    - `prompt` (string, required): The prompt text for the AI function.
    - `api_key` (string, optional): API key for the AI service. Defaults to "none".
    - `user` (string, optional): Database username. Defaults to "demo".
    - `password` (string, optional): Database password. Defaults to "demo".
    - `dsn` (string, optional): ODBC data source name. Defaults to "Local Virtuoso".
  - Returns the result from the AI Support Assistant function call (e.g., `DEMO.DBA.OAI_VIRTUOSO_SUPPORT_AI`).
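For illustration, a `podbc_execute_query` call such as `SELECT TOP 2 CustomerID, CompanyName FROM Demo.demo.Customers` against the Virtuoso demo database might return JSONL along these lines (hypothetical values):
```json
{"CustomerID": "ALFKI", "CompanyName": "Alfreds Futterkiste"}
{"CustomerID": "ANATR", "CompanyName": "Ana Trujillo Emparedados y helados"}
```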
---
## Troubleshooting
For easier troubleshooting:
1. Install the MCP Inspector:
```bash
npm install -g @modelcontextprotocol/inspector
```
2. Start the inspector:
```bash
npx @modelcontextprotocol/inspector uv --directory /path/to/mcp-sqlalchemy-server run mcp-sqlalchemy-server
```
Access the provided URL to troubleshoot server interactions.
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "mcp-sqlalchemy-server"
version = "0.3.1"
description = "A simple MCP ODBC server using FastMCP and pyodbc"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"mcp[cli]>=1.4.1",
"pyodbc>=5.2.0",
"python-dotenv>=1.0.1",
]
[[project.authors]]
name = "Sergey Malinin"
email = "[email protected]"
[build-system]
requires = [ "hatchling",]
build-backend = "hatchling.build"
[project.scripts]
mcp-sqlalchemy-server = "mcp_sqlalchemy_server:main"
```
--------------------------------------------------------------------------------
/mcp_sqlalchemy_server/__init__.py:
--------------------------------------------------------------------------------
```python
import argparse
import logging
from .server import (
podbc_get_schemas,
podbc_get_tables,
podbc_describe_table,
podbc_filter_table_names,
podbc_execute_query,
podbc_execute_query_md,
mcp,
podbc_query_database,
podbc_spasql_query,
podbc_sparql_query,
podbc_virtuoso_support_ai,
podbc_sparql_func,
podbc_sparql_get_entity_types,
podbc_sparql_get_entity_types_detailed,
podbc_sparql_get_entity_types_samples,
podbc_sparql_get_ontologies
)
# Optionally expose other important items at package level
__all__ = [
"podbc_get_schemas",
"podbc_get_tables",
"podbc_describe_table",
"podbc_filter_table_names",
"podbc_execute_query",
"podbc_execute_query_md",
"podbc_query_database",
"podbc_spasql_query",
"podbc_sparql_query",
"podbc_virtuoso_support_ai",
"podbc_sparql_func",
"podbc_sparql_get_entity_types",
"podbc_sparql_get_entity_types_detailed",
"podbc_sparql_get_entity_types_samples",
"podbc_sparql_get_ontologies"
]
def main():
parser = argparse.ArgumentParser(description="MCP SQLAlchemy Server")
parser.add_argument("--transport", type=str, default="stdio", choices=["stdio", "sse"],
help="Transport mode: stdio or sse")
args = parser.parse_args()
logging.info(f"Starting server with transport={args.transport} ")
mcp.run(transport=args.transport)
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/mcp_sqlalchemy_server/server.py:
--------------------------------------------------------------------------------
```python
from collections import defaultdict
import os
import logging
from dotenv import load_dotenv
import pyodbc
from typing import Any, Dict, List, Optional
import json
from mcp.server.fastmcp import FastMCP
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Retrieve database connection details from environment variables
DB_UID = os.getenv("ODBC_USER")
DB_PWD = os.getenv("ODBC_PASSWORD")
DB_DSN = os.getenv("ODBC_DSN")
MAX_LONG_DATA = int(os.getenv("MAX_LONG_DATA", "4096"))
API_KEY = os.getenv("API_KEY", "none")
### Database ###
def get_connection(readonly=True, uid: Optional[str] = None, pwd: Optional[str] = None,
dsn: Optional[str] = None) -> pyodbc.Connection:
dsn = DB_DSN if dsn is None else dsn
uid = DB_UID if uid is None else uid
pwd = DB_PWD if pwd is None else pwd
if dsn is None:
raise ValueError("ODBC_DSN environment variable is not set.")
if uid is None:
raise ValueError("ODBC_USER environment variable is not set.")
if pwd is None:
raise ValueError("ODBC_PASSWORD environment variable is not set.")
dsn_string = f"DSN={dsn};UID={uid};PWD={pwd}"
logging.info(f"DSN:{dsn} UID:{uid}")
# connection_string="DSN=VOS;UID=dba;PWD=dba"
return pyodbc.connect(dsn_string, autocommit=True, readonly=readonly)
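# Illustrative usage (not invoked by the server itself): with no arguments,
# get_connection() falls back to the ODBC_DSN/ODBC_USER/ODBC_PASSWORD values
# loaded from .env above, e.g.:
#   conn = get_connection()                     # read-only, env credentials
#   conn = get_connection(False, "dba", "dba")  # writable, explicit credentials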
### Constants ###
### MCP ###
mcp = FastMCP('mcp-sqlalchemy-server', transport=["stdio", "sse"])
@mcp.tool(
name="podbc_get_schemas",
description="Retrieve and return a list of all schema names from the connected database."
)
def podbc_get_schemas(user:Optional[str]=None, password:Optional[str]=None, dsn:Optional[str]=None) -> str:
"""
Retrieve and return a list of all schema names from the connected database.
Args:
user (Optional[str]=None): Optional username.
password (Optional[str]=None): Optional password.
dsn (Optional[str]=None): Optional dsn name.
Returns:
str: A list of schema names.
"""
try:
with get_connection(True, user, password, dsn) as conn:
cursor = conn.cursor()
rs = cursor.tables(table=None, catalog="%", schema=None, tableType=None);
catalogs = {row[0] for row in rs.fetchall()}
return json.dumps(list(catalogs))
except pyodbc.Error as e:
logging.error(f"Error retrieving schemas: {e}")
raise
@mcp.tool(
name="podbc_get_tables",
description="Retrieve and return a list containing information about tables in specified schema, if empty uses connection default"
)
def podbc_get_tables(Schema: Optional[str] = None, user:Optional[str]=None,
password:Optional[str]=None, dsn:Optional[str]=None) -> str:
"""
Retrieve and return a list containing information about tables.
If `schema` is None, returns tables for all schemas.
If `schema` is not None, returns tables for the specified schema.
Args:
        Schema (Optional[str]): The name of the schema to retrieve tables for. If None, retrieves tables for all schemas.
user (Optional[str]=None): Optional username.
password (Optional[str]=None): Optional password.
dsn (Optional[str]=None): Optional dsn name.
Returns:
str: A list containing information about tables.
"""
cat = "%" if Schema is None else Schema
try:
with get_connection(True, user, password, dsn) as conn:
cursor = conn.cursor()
            rs = cursor.tables(table=None, catalog=cat, schema="%", tableType="TABLE")
results = []
for row in rs:
results.append({"TABLE_CAT":row[0], "TABLE_SCHEM":row[1], "TABLE_NAME":row[2]})
return json.dumps(results, indent=2)
except pyodbc.Error as e:
logging.error(f"Error retrieving tables: {e}")
raise
@mcp.tool(
name="podbc_describe_table",
description="Retrieve and return a dictionary containing the definition of a table, including column names, data types, nullable,"
" autoincrement, primary key, and foreign keys."
)
def podbc_describe_table(Schema:str, table: str, user:Optional[str]=None,
password:Optional[str]=None, dsn:Optional[str]=None) -> str:
"""
Retrieve and return a dictionary containing the definition of a table, including column names, data types, nullable, autoincrement, primary key, and foreign keys.
    If `Schema` is None, searches for the table definition across all schemas.
    If `Schema` is not None, returns the table definition for the specified table in the specified schema.
    Args:
        Schema (str): The name of the schema containing the table. If None, searches all schemas.
table (str): The name of the table to retrieve the definition for.
user (Optional[str]=None): Optional username.
password (Optional[str]=None): Optional password.
dsn (Optional[str]=None): Optional dsn name.
Returns:
str: A dictionary containing the table definition, including column names, data types, nullable, autoincrement, primary key, and foreign keys.
"""
cat = "%" if Schema is None else Schema
table_definition = {}
try:
with get_connection(True, user, password, dsn) as conn:
rc, tbl = _has_table(conn, cat=cat, table=table)
if rc:
table_definition = _get_table_info(conn, cat=tbl.get("cat"), sch=tbl.get("sch"), table=tbl.get("name"))
return json.dumps(table_definition, indent=2)
except pyodbc.Error as e:
logging.error(f"Error retrieving table definition: {e}")
raise
def _has_table(conn, cat:str, table:str):
with conn.cursor() as cursor:
row = cursor.tables(table=table, catalog=cat, schema=None, tableType=None).fetchone()
if row:
return True, {"cat":row[0], "sch": row[1], "name":row[2]}
else:
return False, {}
def _get_columns(conn, cat: str, sch: str, table:str):
with conn.cursor() as cursor:
ret = []
for row in cursor.columns(table=table, catalog=cat, schema=sch):
ret.append({
"name":row[3],
"type":row[5],
"column_size": row[6],
# "decimal_digits":row[8],
"num_prec_radix":row[9],
"nullable":False if row[10]==0 else True,
"default":row[12]
})
return ret
def _get_pk_constraint(conn, cat: str, sch: str, table:str):
with conn.cursor() as cursor:
ret = None
rs = cursor.primaryKeys(table=table, catalog=cat, schema=sch).fetchall()
if len(rs) > 0:
ret = { "constrained_columns": [row[3] for row in rs],
"name": rs[0][5]
}
return ret
def _get_foreign_keys(conn, cat: str, sch: str, table:str):
def fkey_rec():
return {
"name": None,
"constrained_columns": [],
"referred_cat": None,
"referred_schem": None,
"referred_table": None,
"referred_columns": [],
"options": {},
}
fkeys = defaultdict(fkey_rec)
with conn.cursor() as cursor:
rs = cursor.foreignKeys(foreignTable=table, foreignCatalog=cat, foreignSchema=sch)
for row in rs:
rec = fkeys[row[11]] #.FK_NAME
rec["name"] = row[11] #.FK_NAME
c_cols = rec["constrained_columns"]
c_cols.append(row[7]) #.FKCOLUMN_NAME)
r_cols = rec["referred_columns"]
r_cols.append(row[3]) #.PKCOLUMN_NAME)
if not rec["referred_table"]:
rec["referred_table"] = row[2] #.PKTABLE_NAME
rec["referred_schem"] = row[1] #.PKTABLE_OWNER
rec["referred_cat"] = row[0] #.PKTABLE_CAT
return list(fkeys.values())
def _get_table_info(conn, cat:str, sch: str, table: str) -> Dict[str, Any]:
try:
columns = _get_columns(conn, cat=cat, sch=sch, table=table)
        pk = _get_pk_constraint(conn, cat=cat, sch=sch, table=table)
        primary_keys = pk['constrained_columns'] if pk else []
foreign_keys = _get_foreign_keys(conn, cat=cat, sch=sch, table=table)
table_info = {
"TABLE_CAT": cat,
"TABLE_SCHEM": sch,
"TABLE_NAME": table,
"columns": columns,
"primary_keys": primary_keys,
"foreign_keys": foreign_keys
}
for column in columns:
column["primary_key"] = column['name'] in primary_keys
return table_info
except pyodbc.Error as e:
logging.error(f"Error retrieving table info: {e}")
raise
@mcp.tool(
name="podbc_filter_table_names",
description="Retrieve and return a list containing information about tables whose names contain the substring 'q' ."
)
def podbc_filter_table_names(q: str, Schema: Optional[str] = None, user:Optional[str]=None, password:Optional[str]=None,
dsn:Optional[str]=None) -> str:
"""
Retrieve and return a list containing information about tables whose names contain the substring 'q'
Args:
        q (str): The substring to filter table names by.
        Schema (Optional[str]): Database schema to search. If None, searches all schemas.
user (Optional[str]=None): Optional username.
password (Optional[str]=None): Optional password.
dsn (Optional[str]=None): Optional dsn name.
Returns:
str: A list containing information about tables whose names contain the substring 'q'.
"""
cat = "%" if Schema is None else Schema
try:
with get_connection(True, user, password, dsn) as conn:
cursor = conn.cursor()
            rs = cursor.tables(table=None, catalog=cat, schema='%', tableType="TABLE")
results = []
for row in rs:
if q in row[2]:
results.append({"TABLE_CAT":row[0], "TABLE_SCHEM":row[1], "TABLE_NAME":row[2]})
return json.dumps(results, indent=2)
except pyodbc.Error as e:
logging.error(f"Error filtering table names: {e}")
raise
@mcp.tool(
name="podbc_execute_query",
description="Execute a SQL query and return results in JSONL format."
)
def podbc_execute_query(query: str, max_rows: int = 100, params: Optional[Dict[str, Any]] = None,
user:Optional[str]=None, password:Optional[str]=None, dsn:Optional[str]=None) -> str:
"""
Execute a SQL query and return results in JSONL format.
Args:
query (str): The SQL query to execute.
max_rows (int): Maximum number of rows to return. Default is 100.
params (Optional[Dict[str, Any]]): Optional dictionary of parameters to pass to the query.
user (Optional[str]=None): Optional username.
password (Optional[str]=None): Optional password.
dsn (Optional[str]=None): Optional dsn name.
Returns:
str: Results in JSONL format.
"""
try:
with get_connection(True, user, password, dsn) as conn:
cursor = conn.cursor()
rs = cursor.execute(query) if params is None else cursor.execute(query, params)
columns = [column[0] for column in rs.description]
results = []
for row in rs:
rs_dict = dict(zip(columns, row))
truncated_row = {key: (str(value)[:MAX_LONG_DATA] if value is not None else None) for key, value in rs_dict.items()}
results.append(truncated_row)
if len(results) >= max_rows:
break
# Convert the results to JSONL format
jsonl_results = "\n".join(json.dumps(row) for row in results)
# Return the JSONL formatted results
return jsonl_results
except pyodbc.Error as e:
logging.error(f"Error executing query: {e}")
raise
@mcp.tool(
name="podbc_execute_query_md",
description="Execute a SQL query and return results in Markdown table format."
)
def podbc_execute_query_md(query: str, max_rows: int = 100, params: Optional[Dict[str, Any]] = None,
user:Optional[str]=None, password:Optional[str]=None, dsn:Optional[str]=None) -> str:
"""
Execute a SQL query and return results in Markdown table format.
Args:
query (str): The SQL query to execute.
max_rows (int): Maximum number of rows to return. Default is 100.
params (Optional[Dict[str, Any]]): Optional dictionary of parameters to pass to the query.
user (Optional[str]=None): Optional username.
password (Optional[str]=None): Optional password.
dsn (Optional[str]=None): Optional dsn name.
Returns:
str: Results in Markdown table format.
"""
try:
with get_connection(True, user, password, dsn) as conn:
cursor = conn.cursor()
rs = cursor.execute(query) if params is None else cursor.execute(query, params)
columns = [column[0] for column in rs.description]
results = []
for row in rs:
rs_dict = dict(zip(columns, row))
truncated_row = {key: (str(value)[:MAX_LONG_DATA] if value is not None else None) for key, value in rs_dict.items()}
results.append(truncated_row)
if len(results) >= max_rows:
break
# Create the Markdown table header
md_table = "| " + " | ".join(columns) + " |\n"
md_table += "| " + " | ".join(["---"] * len(columns)) + " |\n"
# Add rows to the Markdown table
for row in results:
md_table += "| " + " | ".join(str(row[col]) for col in columns) + " |\n"
# Return the Markdown formatted results
return md_table
except pyodbc.Error as e:
logging.error(f"Error executing query: {e}")
raise
@mcp.tool(
name="podbc_query_database",
description="Execute a SQL query and return results in JSONL format."
)
def podbc_query_database(query: str, user:Optional[str]=None, password:Optional[str]=None,
dsn:Optional[str]=None) -> str:
"""
Execute a SQL query and return results in JSONL format.
Args:
query (str): The SQL query to execute.
user (Optional[str]=None): Optional username.
password (Optional[str]=None): Optional password.
dsn (Optional[str]=None): Optional dsn name.
Returns:
str: Results in JSONL format.
"""
try:
with get_connection(True, user, password, dsn) as conn:
cursor = conn.cursor()
rs = cursor.execute(query)
columns = [column[0] for column in rs.description]
results = []
for row in rs:
rs_dict = dict(zip(columns, row))
truncated_row = {key: (str(value)[:MAX_LONG_DATA] if value is not None else None) for key, value in rs_dict.items()}
results.append(truncated_row)
# Convert the results to JSONL format
jsonl_results = "\n".join(json.dumps(row) for row in results)
# Return the JSONL formatted results
return jsonl_results
except pyodbc.Error as e:
logging.error(f"Error executing query: {e}")
raise
@mcp.tool(
name="podbc_spasql_query",
description="Execute a SPASQL query and return results."
)
def podbc_spasql_query(query: str, max_rows:Optional[int] = 20, timeout:Optional[int] = 300000,
user:Optional[str]=None, password:Optional[str]=None, dsn:Optional[str]=None) -> str:
"""
    Execute a SPASQL query and return results.
    Args:
        query (str): The SPASQL query to execute.
        max_rows (int): Maximum number of rows to return. Default is 20.
        timeout (int): Query timeout in milliseconds. Default is 300000.
user (Optional[str]=None): Optional username.
password (Optional[str]=None): Optional password.
dsn (Optional[str]=None): Optional dsn name.
Returns:
str: Results in requested format as string.
"""
try:
with get_connection(True, user, password, dsn) as conn:
cursor = conn.cursor()
cmd = f"select Demo.demo.execute_spasql_query(charset_recode(?, '_WIDE_', 'UTF-8'), ?, ?) as result"
rs = cursor.execute(cmd, (query, max_rows, timeout,)).fetchone()
return rs[0]
except pyodbc.Error as e:
logging.error(f"Error executing query: {e}")
raise
@mcp.tool(
name="podbc_virtuoso_support_ai",
description="Tool to use the Virtuoso AI support function"
)
def podbc_virtuoso_support_ai(prompt: str, api_key:Optional[str]=None, user:Optional[str]=None,
password:Optional[str]=None, dsn:Optional[str]=None) -> str:
"""
Tool to use the Virtuoso AI support function
Args:
prompt (str): AI prompt text (required).
api_key (str): API key for AI service (optional).
user (Optional[str]=None): Optional username.
password (Optional[str]=None): Optional password.
dsn (Optional[str]=None): Optional dsn name.
Returns:
str: Results data in JSON.
"""
try:
_api_key = api_key if api_key is not None else API_KEY
with get_connection(True, user, password, dsn) as conn:
cursor = conn.cursor()
cmd = f"select DEMO.DBA.OAI_VIRTUOSO_SUPPORT_AI(?, ?) as result"
rs = cursor.execute(cmd, (prompt, _api_key,)).fetchone()
return rs[0]
except pyodbc.Error as e:
logging.error(f"Error executing request")
raise pyodbc.Error("Error executing request")
@mcp.tool(
name="podbc_sparql_func",
description="Tool to use the SPARQL AI support function"
)
def podbc_sparql_func(prompt: str, api_key:Optional[str]=None, user:Optional[str]=None,
password:Optional[str]=None, dsn:Optional[str]=None) -> str:
"""
Call SPARQL AI func.
Args:
prompt (str): The prompt.
api_key (str): optional.
user (Optional[str]=None): Optional username.
password (Optional[str]=None): Optional password.
dsn (Optional[str]=None): Optional dsn name.
Returns:
str: Results data in JSON.
"""
try:
_api_key = api_key if api_key is not None else API_KEY
with get_connection(True, user, password, dsn) as conn:
cursor = conn.cursor()
cmd = f"select DEMO.DBA.OAI_SPARQL_FUNC(?, ?) as result"
rs = cursor.execute(cmd, (prompt, _api_key,)).fetchone()
return rs[0]
except pyodbc.Error as e:
logging.error(f"Error executing request")
raise pyodbc.Error("Error executing request")
@mcp.tool(
    name="podbc_sparql_query",
    description="Execute a SPARQL query and return results."
)
def podbc_sparql_query(query: str, format: Optional[str] = "json", timeout: Optional[int] = 30000,
                       user: Optional[str] = None, password: Optional[str] = None, dsn: Optional[str] = None) -> str:
    """
    Execute a SPARQL query and return results (exposed as the podbc_sparql_query tool).
    Args:
        query (str): The SPARQL query to execute.
        format (Optional[str]="json"): Desired result format.
        timeout (Optional[int]=30000): Query timeout in milliseconds.
        user (Optional[str]=None): Optional username.
        password (Optional[str]=None): Optional password.
        dsn (Optional[str]=None): Optional dsn name.
    """
    try:
        with get_connection(True, user, password, dsn) as conn:
            cursor = conn.cursor()
            cmd = "select Demo.demo.execute_spasql_query(charset_recode(?, '_WIDE_', 'UTF-8'), ?, ?) as result"
            rs = cursor.execute(cmd, (query, format, timeout,)).fetchone()
            return rs[0]
    except pyodbc.Error as e:
        logging.error(f"Error executing query: {e}")
        raise
@mcp.tool(
name="podbc_sparql_get_entity_types",
description="This query retrieves all entity types in the RDF graph, along with their labels and comments if available. "
"It filters out blank nodes and ensures that only IRI types are returned. "
"The LIMIT clause is set to 100 to restrict the number of entity types returned. "
)
def podbc_sparql_get_entity_types(user:Optional[str]=None, password:Optional[str]=None, dsn:Optional[str]=None) -> str:
"""
Execute a SPARQL query and return results.
Args:
user (Optional[str]=None): Optional username.
password (Optional[str]=None): Optional password.
dsn (Optional[str]=None): Optional dsn name.
Returns:
str: Results in requested format as string.
"""
query = """
SELECT DISTINCT * FROM (
SPARQL
PREFIX owl: <http://www.w3.org/2002/07/owl#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
SELECT ?o
WHERE {
GRAPH ?g {
?s a ?o .
OPTIONAL {
?s rdfs:label ?label .
FILTER (LANG(?label) = "en" || LANG(?label) = "")
}
OPTIONAL {
?s rdfs:comment ?comment .
FILTER (LANG(?comment) = "en" || LANG(?comment) = "")
}
FILTER (isIRI(?o) && !isBlank(?o))
}
}
LIMIT 100
) AS x
"""
return podbc_query_database(query, user=user, password=password, dsn=dsn)
@mcp.tool(
name="podbc_sparql_get_entity_types_detailed",
description="This query retrieves all entity types in the RDF graph, along with their labels and comments if available. "
"It filters out blank nodes and ensures that only IRI types are returned. "
"The LIMIT clause is set to 100 to restrict the number of entity types returned."
)
def podbc_sparql_get_entity_types_detailed(user:Optional[str]=None, password:Optional[str]=None, dsn:Optional[str]=None) -> str:
"""
Execute a SPARQL query and return results.
Args:
user (Optional[str]=None): Optional username.
password (Optional[str]=None): Optional password.
dsn (Optional[str]=None): Optional dsn name.
Returns:
str: Results in requested format as string.
"""
query = """
SELECT * FROM (
SPARQL
PREFIX owl: <http://www.w3.org/2002/07/owl#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
SELECT ?o, (SAMPLE(?label) AS ?label), (SAMPLE(?comment) AS ?comment)
WHERE {
GRAPH ?g {
?s a ?o .
OPTIONAL {?o rdfs:label ?label . FILTER (LANG(?label) = "en" || LANG(?label) = "")}
OPTIONAL {?o rdfs:comment ?comment . FILTER (LANG(?comment) = "en" || LANG(?comment) = "")}
FILTER (isIRI(?o) && !isBlank(?o))
}
}
GROUP BY ?o
ORDER BY ?o
LIMIT 20
) AS results
"""
return podbc_query_database(query, user=user, password=password, dsn=dsn)
@mcp.tool(
name="podbc_sparql_get_entity_types_samples",
description="This query retrieves samples of entities for each type in the RDF graph, along with their labels and counts. "
"It groups by entity type and orders the results by sample count in descending order. "
"Note: The LIMIT clause is set to 20 to restrict the number of entity types returned."
)
def podbc_sparql_get_entity_types_samples(user:Optional[str]=None, password:Optional[str]=None, dsn:Optional[str]=None) -> str:
"""
Execute a SPARQL query and return results.
Args:
user (Optional[str]=None): Optional username.
password (Optional[str]=None): Optional password.
dsn (Optional[str]=None): Optional dsn name.
Returns:
str: Results in requested format as string.
"""
query = """
SELECT * FROM (
SPARQL
PREFIX owl: <http://www.w3.org/2002/07/owl#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
SELECT (SAMPLE(?s) AS ?sample), ?slabel, (COUNT(*) AS ?sampleCount), (?o AS ?entityType), ?olabel
WHERE {
GRAPH ?g {
?s a ?o .
OPTIONAL {?s rdfs:label ?slabel . FILTER (LANG(?slabel) = \"en\" || LANG(?slabel) = \"\")}
FILTER (isIRI(?s) && !isBlank(?s))
OPTIONAL {?o rdfs:label ?olabel . FILTER (LANG(?olabel) = \"en\" || LANG(?olabel) = \"\")}
FILTER (isIRI(?o) && !isBlank(?o))
}
}
GROUP BY ?slabel ?o ?olabel
ORDER BY DESC(?sampleCount) ?o ?slabel ?olabel
LIMIT 20
) AS results
"""
return podbc_query_database(query, user=user, password=password, dsn=dsn)
@mcp.tool(
name="podbc_sparql_get_ontologies",
description="This query retrieves all ontologies in the RDF graph, along with their labels and comments if available."
)
def podbc_sparql_get_ontologies(user:Optional[str]=None, password:Optional[str]=None, dsn:Optional[str]=None) -> str:
"""
Execute a SPARQL query and return results.
Args:
user (Optional[str]=None): Optional username.
password (Optional[str]=None): Optional password.
dsn (Optional[str]=None): Optional dsn name.
Returns:
str: Results in requested format as string.
"""
query = """
SELECT * FROM (
SPARQL
PREFIX owl: <http://www.w3.org/2002/07/owl#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
SELECT ?s, ?label, ?comment
WHERE {
GRAPH ?g {
?s a owl:Ontology .
OPTIONAL {
?s rdfs:label ?label .
FILTER (LANG(?label) = "en" || LANG(?label) = "")
}
OPTIONAL {
?s rdfs:comment ?comment .
FILTER (LANG(?comment) = "en" || LANG(?comment) = "")
}
FILTER (isIRI(?s) && !isBlank(?s))
}
}
LIMIT 100
) AS x
"""
return podbc_query_database(query, user=user, password=password, dsn=dsn)
if __name__ == "__main__":
mcp.run()
```