#
tokens: 4011/50000 7/7 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── .python-version
├── LICENSE
├── package.json
├── pyproject.toml
├── README.md
├── src
│   └── mcp_server_oracle
│       ├── __init__.py
│       └── oracle_tools.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.12

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# UV
#   Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#uv.lock

# poetry
#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# Ruff stuff:
.ruff_cache/

# PyPI configuration file
.pypirc

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# mcp-server-oracle
Model Context Protocol server to access oracle

[![Python 3.12](https://img.shields.io/badge/python-3.12-blue.svg)](https://www.python.org/downloads/release/python-3120/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

## Demos


https://github.com/user-attachments/assets/dc4e377b-4efb-43e6-85fa-93ed852fe21f



## Quickstart

To try this in Claude Desktop app, add this to your claude config files:

```json
{
  "mcpServers": {
    "mcp-server-oracle": {
      "command": "uvx",
      "args": [
        "mcp-server-oracle"
      ],
      "env": {
        "ORACLE_CONNECTION_STRING": "username/password@hostname:port/service_name"
      }
    }
  }
}
```

### Prerequisites

- UV (pacakge manager)
- Python 3.12+
- Claude Desktop

### Installation

#### Claude Desktop Configuration

Add the server configuration to your Claude Desktop config file:

**MacOS**: `~/Library/Application\ Support/Claude/claude_desktop_config.json`  
**Windows**: `%APPDATA%/Claude/claude_desktop_config.json`


## Contributing

1. Fork the repository from [mcp-server-oracle](https://github.com/hdcola/mcp-server-oracle)
2. Create your feature branch
3. Commit your changes
4. Push to the branch
5. Open a Pull Request

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

```

--------------------------------------------------------------------------------
/package.json:
--------------------------------------------------------------------------------

```json
{
  "name": "mcp-server-oracle",
  "version": "1.0.1",
  "description": "",
  "main": "index.js",
  "scripts": {
    "inspector": "npx @modelcontextprotocol/inspector uv run main.py"
  },
  "keywords": [],
  "author": "hdcola",
  "license": "MIT",
  "packageManager": "[email protected]"
}

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp-server-oracle"
version = "1.0.2"
description = "Model Context Protocol server to access oracle database"
authors = [
    { name = "hdcola", email = "[email protected]" }
]
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "mcp[cli]>=1.5.0",
    "oracledb>=3.0.0",
    "python-dotenv>=1.0.1",
]

[project.scripts]
mcp-server-oracle = "mcp_server_oracle:main"
dev = "mcp_server_oracle:dev"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

```

--------------------------------------------------------------------------------
/src/mcp_server_oracle/__init__.py:
--------------------------------------------------------------------------------

```python
import os
import sys
import signal
from typing import Any
from mcp.server.fastmcp import FastMCP
from . import oracle_tools
from dotenv import load_dotenv


# Load the environment variables
load_dotenv()

# Initialize the FastMCP server
mcp = FastMCP("mcp-server-oracle")

oracle_tools.connection_string = os.getenv("ORACLE_CONNECTION_STRING")


@mcp.tool()
async def list_tables() -> str:
    """Get a list of all tables in the oracle database

    Args:
        None
    """
    return await oracle_tools.list_tables()


@mcp.tool()
async def describe_table(table_name: str) -> str:
    """Get a description of a table in the oracle database"

    Args:
        table_name (string): The name of the table to describe
    """
    return await oracle_tools.describe_table(table_name)


@mcp.tool()
async def read_query(query: str) -> str:
    """Execute SELECT queries to read data from the oracle database

    Args:
        query (string): The SELECT query to execute
    """
    return await oracle_tools.read_query(query)

@mcp.tool()
async def exec_dml_sql(execsql: str) -> str:
    """Execute insert/update/delete/truncate to the oracle database

    Args:
        query (string): The sql to execute
    """
    return await oracle_tools.exec_dml_sql(execsql)

@mcp.tool()
async def exec_ddl_sql(execsql: str) -> str:
    """Execute create/drop/alter to the oracle database

    Args:
        query (string): The sql to execute
    """
    return await oracle_tools.exec_ddl_sql(execsql)

@mcp.tool()
async def exec_pro_sql(execsql: str) -> str:
    """Execute PL/SQL code blocks including stored procedures, functions and anonymous blocks

    Args:
        execsql (string): The PL/SQL code block to execute
    """
    return await oracle_tools.exec_pro_sql(execsql)


def main() -> None:
    mcp.run(transport='stdio')


def dev() -> None:
    """
    Development function that handles Ctrl+C gracefully.
    This function calls main() but catches KeyboardInterrupt to allow 
    clean exit when user presses Ctrl+C.
    """
    print("mcp server starting", file=sys.stderr)

    # Define signal handler for cleaner exit
    def signal_handler(sig, frame):
        print("\nShutting down mcp server...", file=sys.stderr)
        sys.exit(0)

    # Register the signal handler for SIGINT (Ctrl+C)
    signal.signal(signal.SIGINT, signal_handler)

    try:
        # Run the server with proper exception handling
        main()
    except KeyboardInterrupt:
        print("\nShutting down mcp server...", file=sys.stderr)
        sys.exit(0)
    except Exception as e:
        print(f"\nError: {e}", file=sys.stderr)
        sys.exit(1)

```

--------------------------------------------------------------------------------
/src/mcp_server_oracle/oracle_tools.py:
--------------------------------------------------------------------------------

```python
import oracledb
import asyncio

connection_string = ""

# 初始化为 Thin 模式
oracledb.init_oracle_client(lib_dir=None)

async def list_tables() -> list:
    tables = []
    try:
        # Run database operations in a separate thread
        def db_operation():
            result_tables = []
            with oracledb.connect(connection_string) as conn:
                cursor = conn.cursor()
                cursor.execute(
                    "SELECT table_name FROM user_tables ORDER BY table_name")
                for row in cursor:
                    result_tables.append(row[0])
            return '\n'.join(result_tables)

        return await asyncio.to_thread(db_operation)
    except oracledb.DatabaseError as e:
        print('Error occurred:', e)
        return str(e)


async def describe_table(table_name: str) -> str:
    try:
        # Run database operations in a separate thread
        def db_operation(table):
            with oracledb.connect(connection_string) as conn:
                cursor = conn.cursor()

                # Create CSV headers
                result = [
                    "COLUMN_NAME,DATA_TYPE,NULLABLE,DATA_LENGTH,PRIMARY_KEY,FOREIGN_KEY"]

                # Get primary key columns
                pk_columns = []
                cursor.execute(
                    """
                    SELECT cols.column_name
                    FROM all_constraints cons, all_cons_columns cols
                    WHERE cons.constraint_type = 'P'
                    AND cons.constraint_name = cols.constraint_name
                    AND cons.owner = cols.owner
                    AND cols.table_name = :table_name
                    """,
                    table_name=table.upper()
                )
                for row in cursor:
                    pk_columns.append(row[0])

                # Get foreign key columns and references
                fk_info = {}
                cursor.execute(
                    """
                    SELECT a.column_name, c_pk.table_name as referenced_table, b.column_name as referenced_column
                    FROM all_cons_columns a
                    JOIN all_constraints c ON a.owner = c.owner AND a.constraint_name = c.constraint_name
                    JOIN all_constraints c_pk ON c.r_owner = c_pk.owner AND c.r_constraint_name = c_pk.constraint_name
                    JOIN all_cons_columns b ON c_pk.owner = b.owner AND c_pk.constraint_name = b.constraint_name
                    WHERE c.constraint_type = 'R'
                    AND a.table_name = :table_name
                    """,
                    table_name=table.upper()
                )
                for row in cursor:
                    fk_info[row[0]] = f"{row[1]}.{row[2]}"

                # Get main column information
                cursor.execute(
                    """
                    SELECT column_name, data_type, nullable, data_length 
                    FROM user_tab_columns 
                    WHERE table_name = :table_name 
                    ORDER BY column_id
                    """,
                    table_name=table.upper()
                )

                rows_found = False
                for row in cursor:
                    rows_found = True
                    column_name = row[0]
                    data_type = row[1]
                    nullable = row[2]
                    data_length = str(row[3])
                    is_pk = "YES" if column_name in pk_columns else "NO"
                    fk_ref = fk_info.get(column_name, "NO")

                    # Format as CSV row
                    result.append(
                        f"{column_name},{data_type},{nullable},{data_length},{is_pk},{fk_ref}")

                if not rows_found:
                    return f"Table {table} not found or has no columns."

                return '\n'.join(result)

        return await asyncio.to_thread(db_operation, table_name)
    except oracledb.DatabaseError as e:
        print('Error occurred:', e)
        return str(e)


async def read_query(query: str) -> str:
    try:
        # Check if the query is a SELECT statement
        if not query.strip().upper().startswith('SELECT'):
            return "Error: Only SELECT statements are supported."

        # Run database operations in a separate thread
        def db_operation(query):
            with oracledb.connect(connection_string) as conn:
                cursor = conn.cursor()
                cursor.execute(query)  # Execute query first

                # Get column names after executing the query
                columns = [col[0] for col in cursor.description]
                result = [','.join(columns)]  # Add column headers

                # Process each row
                for row in cursor:
                    # Convert each value in the tuple to string
                    string_values = [
                        str(val) if val is not None else "NULL" for val in row]
                    result.append(','.join(string_values))

                return '\n'.join(result)

        return await asyncio.to_thread(db_operation, query)
    except oracledb.DatabaseError as e:
        print('Error occurred:', e)
        return str(e)


async def exec_dml_sql(execsql: str) -> str:
    try:
        # 检查SQL语句是否包含DML关键字
        sql_upper = execsql.upper()
        if not any(keyword in sql_upper for keyword in ['INSERT', 'DELETE', 'TRUNCATE', 'UPDATE']):
            return "Error: Only INSERT, DELETE, TRUNCATE or UPDATE statements are supported."
        
        # Run database operations in a separate thread
        def db_operation(query):
            with oracledb.connect(connection_string) as conn:
                cursor = conn.cursor()
                # 执行DML语句
                cursor.execute(query)
                # 获取影响的行数
                rows_affected = cursor.rowcount
                # 提交事务
                conn.commit()
                # 返回执行结果
                return f"执行成功: 影响了 {rows_affected} 行数据"

        return await asyncio.to_thread(db_operation, execsql)
    except oracledb.DatabaseError as e:
        print('Error occurred:', e)
        return str(e)

async def exec_ddl_sql(execsql: str) -> str:
    try:
        # 检查SQL语句是否包含ddl关键字
        sql_upper = execsql.upper()
        if not any(keyword in sql_upper for keyword in ['CREATE', 'ALTER', 'DROP']):
            return "Error: Only CREATE, ALTER, DROP statements are supported."
        
        def db_operation(query):
            with oracledb.connect(connection_string) as conn:
                cursor = conn.cursor()
                cursor.execute(query)
                return "DDL语句执行成功"

        return await asyncio.to_thread(db_operation, execsql)
    except oracledb.DatabaseError as e:
        print('Error occurred:', e)
        return str(e)

async def exec_pro_sql(execsql: str) -> str:
    try:
        # Run database operations in a separate thread
        def db_operation(query):
            with oracledb.connect(connection_string) as conn:
                cursor = conn.cursor()
                # 执行PL/SQL代码块
                cursor.execute(query)
                # 如果有输出参数或返回值,尝试获取
                try:
                    result = cursor.fetchall()
                    if result:
                        # 将结果格式化为字符串
                        return '\n'.join(','.join(str(col) if col is not None else 'NULL' for col in row) for row in result)
                except oracledb.DatabaseError:
                    # 如果没有结果集,说明是存储过程或无返回值的PL/SQL块
                    pass
                # 提交事务
                conn.commit()
                return "PL/SQL代码块执行成功"

        return await asyncio.to_thread(db_operation, execsql)
    except oracledb.DatabaseError as e:
        print('Error occurred:', e)
        return str(e)
    except oracledb.DatabaseError as e:
        print('Error occurred:', e)
        return str(e)

if __name__ == "__main__":
    # Create and run the async event loop
    async def main():
        # print(await list_tables())
        print(await describe_table('CONCAT'))

    asyncio.run(main())

```