#
tokens: 7041/50000 18/18 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .env.example
├── .github
│   └── workflows
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── changelogithub.config.json
├── examples
│   ├── data
│   │   ├── README.txt
│   │   ├── rice_yields.csv
│   │   ├── temperature_data.csv
│   │   └── wheat_yields.csv
│   ├── llamaindex-with-opendal-mcp.py
│   └── README.md
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│   └── mcp_server_opendal
│       ├── __init__.py
│       ├── resource.py
│       └── server.py
├── tests
│   └── test_integration_fs.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.12

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv
.ruff_cache
.env

```

--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------

```
MCP_HOST=localhost
MCP_PORT=8000
OPENAI_API_KEY=your-openai-api-key
OPENAI_MODEL=your-openai-model
OPENAI_ENDPOINT=your-openai-endpoint
OPENDAL_FS_TYPE=fs
OPENDAL_FS_ROOT=./examples/
```

--------------------------------------------------------------------------------
/examples/data/README.txt:
--------------------------------------------------------------------------------

```
This directory contains research data on the impact of climate change on crop yields from 2010 to 2022 in different regions.
============================

This directory contains research data on the impact of climate change on crop yields from 2010 to 2022 in different regions.

File descriptions:
- wheat_yields.csv: Wheat yield data
- rice_yields.csv: Rice yield data
- temperature_data.csv: Temperature data
```

--------------------------------------------------------------------------------
/examples/README.md:
--------------------------------------------------------------------------------

```markdown
# LlamaIndex Agent Example with OpenDAL MCP

## Start the MCP Server

To run this example, you need to have a MCP server running.

Make sure you are in the root directory of the project, set the environment variables first:

- `OPENDAL_FS_TYPE=fs`
- `OPENDAL_FS_ROOT=./examples/`

Then, run the following command:

```bash
uv sync  # To install the project, this should be done only once
uv run mcp-server-opendal --transport sse
```

## Run the Example

Set the environment variables below.

- `MCP_HOST`: The host of the MCP server
- `MCP_PORT`: The port of the MCP server
- `OPENAI_API_KEY`: The API key of the OpenAI API
- `OPENAI_MODEL`: The model of the OpenAI API
- `OPENAI_ENDPOINT`: The endpoint of the OpenAI API

Then, run the example with the following command:

```bash
uv run examples/llamaindex-with-opendal-mcp.py
```

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Model Context Protocol Server for Apache OpenDAL™
A Model Context Protocol (MCP) server implementation that provides access to various storage services via [Apache OpenDAL™](https://opendal.apache.org/).

[![PyPI - Version](https://img.shields.io/pypi/v/mcp-server-opendal)](https://pypi.org/project/mcp-server-opendal/)
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/mcp-server-opendal)](https://pypi.org/project/mcp-server-opendal/)
[![Lint and Test](https://github.com/Xuanwo/mcp-server-opendal/actions/workflows/test.yml/badge.svg)](https://github.com/Xuanwo/mcp-server-opendal/actions/workflows/test.yml)

## Features

- Seamless access to multiple storage services including S3, Azure Blob Storage, Google Cloud Storage, and more
- List files and directories from storage services
- Read file contents with automatic text/binary detection
- Environment variable based configuration

## Installation

```shell
pip install mcp-server-opendal
```

## Usage with Claude Desktop

Add the following to `claude_desktop_config.json`:

```json
{
  "mcpServers": {
    "opendal": {
      "command": "uvx",
      "args": [
        "mcp-server-opendal"
      ],
      "env": {
        "YOUR_ENV_VAR": "YOUR_ENV_VALUE"
      }
    }
  }
}
```

It requires `uv` to be installed on your machine. Check the [official documentation](https://docs.astral.sh/uv/getting-started/installation/) for installation guides.

## Environment variables

Configure storage services by setting environment variables. Each service requires a prefix and specific configuration options.

For example, to configure an S3 service with alias "mys3":

```
OPENDAL_MYS3_TYPE=s3
OPENDAL_MYS3_BUCKET=mybucket
OPENDAL_MYS3_REGION=us-east-1
OPENDAL_MYS3_ENDPOINT=http://localhost:9000
OPENDAL_MYS3_ACCESS_KEY_ID=myaccesskey
OPENDAL_MYS3_SECRET_ACCESS_KEY=mysecretkey
```

Then you can use tool like `read` and `list` with `mys3://path/to/file`.

`mcp-server-opendal` will also load from `.env`.

## Development

```shell
npx @modelcontextprotocol/inspector \
  uv run mcp-server-opendal
```

```

--------------------------------------------------------------------------------
/changelogithub.config.json:
--------------------------------------------------------------------------------

```json
{
  "types": {
    "break": { "title": "💥 Breaking Changes" },
    "feat": { "title": "🚀 Features" },
    "fix": { "title": "🐞 Bug Fixes" },
    "doc": { "title": "📝 Documentation" },
    "chore": { "title": "💻 Chores" }
  }
}

```

--------------------------------------------------------------------------------
/src/mcp_server_opendal/__init__.py:
--------------------------------------------------------------------------------

```python
import logging

from . import server


def main():
    """Main entry point for the package."""
    logging.basicConfig(level=logging.DEBUG)
    server.main()


# Optionally expose other important items at package level
__all__ = ["main", "server"]

```

--------------------------------------------------------------------------------
/examples/data/temperature_data.csv:
--------------------------------------------------------------------------------

```
Year,North Average Temperature(°C),South Average Temperature(°C),National Average Temperature(°C)
2010,22.0,24.0,23.0
2011,22.2,24.25,23.22
2012,22.4,24.5,23.44
2013,22.6,24.75,23.66
2014,22.8,25.0,23.88
2015,23.0,25.25,24.1
2016,23.2,25.5,24.32
2017,23.4,25.75,24.54
2018,23.6,26.0,24.76
2019,23.8,26.25,24.98
2020,24.0,26.5,25.2
2021,24.2,26.75,25.42
2022,24.4,27.0,25.64
```

--------------------------------------------------------------------------------
/examples/data/rice_yields.csv:
--------------------------------------------------------------------------------

```
Year,Region,Average Temperature(°C),Yield(ton/hectare)
2010,South,24.0,5.772211109586192
2011,South,24.25,5.768775076745289
2012,South,24.5,5.633577850775186
2013,South,24.75,5.567733432553301
2014,South,25.0,5.546675121908924
2015,South,25.25,5.338465977162644
2016,South,25.5,5.35486878347953
2017,South,25.75,5.349000767493246
2018,South,26.0,5.114887795954057
2019,South,26.25,5.003123064631635
2020,South,26.5,4.910301072246474
2021,South,26.75,4.830848429519767
2022,South,27.0,4.839301775701944
```

--------------------------------------------------------------------------------
/examples/data/wheat_yields.csv:
--------------------------------------------------------------------------------

```
Year,Region,Average Temperature(°C),Yield(ton/hectare)
2010,North,22.0,4.170916869556414
2011,North,22.2,4.127563812254134
2012,North,22.4,4.113646614676481
2013,North,22.6,4.036069023417953
2014,North,22.8,3.9702148178835747
2015,North,23.0,3.9581433256795266
2016,North,23.2,3.842753248007842
2017,North,23.4,3.846889161483347
2018,North,23.6,3.714642481399471
2019,North,23.8,3.7133893798272284
2020,North,24.0,3.6473729560326555
2021,North,24.2,3.564664051294111
2022,North,24.4,3.5251559539742284
```

--------------------------------------------------------------------------------
/.github/workflows/test.yml:
--------------------------------------------------------------------------------

```yaml
name: Lint and Test

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: [ubuntu-latest]
        python-version: ["3.12"]

    steps:
      - uses: actions/checkout@v4

      - name: Setup Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install uv
        uses: astral-sh/setup-uv@v5
        with:
          version: latest

      - name: Install dependencies
        run: |
          uv sync --dev

      - name: Code check
        run: |
          uv run ruff check .
          uv run ruff format --check .

      - name: Run tests
        run: |
          uv run pytest -xvs tests/ 
```

--------------------------------------------------------------------------------
/.github/workflows/release.yml:
--------------------------------------------------------------------------------

```yaml
name: Release

on:
  push:
    tags:
      - "*"

jobs:
  release-note:
    name: Generate release notes
    runs-on: ubuntu-latest
    permissions:
      contents: write

    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - uses: actions/setup-node@v4
        with:
          node-version: lts/*

      - run: npx changelogithub
        continue-on-error: true
        env:
          GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}

  build-package:
    name: Build & inspect our package.
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - uses: hynek/build-and-inspect-python-package@v2

  release-pypi:
    name: Publish distributions to PyPI
    permissions:
      id-token: write
    environment:
      name: pypi
      url: https://pypi.org/project/mcp-server-opendal/${{ github.ref_name }}
    runs-on: ubuntu-latest
    needs: build-package

    steps:
      - name: Download packages built by build-and-inspect-python-package
        uses: actions/download-artifact@v4
        with:
          name: Packages
          path: dist

      - name: Upload package to PyPI
        uses: pypa/gh-action-pypi-publish@release/v1

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp-server-opendal"
description = "A Model Context Protocol server providing tools to access storage services for usage by LLMs"
readme = "README.md"
requires-python = ">=3.12"
license = { text = "Apache-2.0" }
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: Apache Software License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.12",
]
dependencies = [
    "mcp>=1.0.0",
    "opendal>=0.45.16,<0.46.0",
    "python-dotenv>=1.0.1",
]
dynamic = ["version"]

[build-system]
requires = ["hatchling", "hatch-vcs>=0.3.0"]
build-backend = "hatchling.build"

[dependency-groups]
dev = ["pytest>=8.3.5", "pytest-asyncio>=0.25.3", "ruff>=0.7.3"]
examples = [
    "llama-index>=0.12.22",
    "llama-index-llms-openai-like>=0.3.4",
    "llama-index-tools-mcp>=0.1.0",
]

[project.scripts]
mcp-server-opendal = "mcp_server_opendal:main"

[tool.hatch.version]
source = "vcs"
fallback-version = "0.0.0"

[tool.hatch.build.targets.sdist]
only-include = ["src/", "examples/", "tests/"]

[tool.ruff]
line-length = 88
src = ["src"]
exclude = ["tests/fixtures"]
target-version = "py312"

[tool.ruff.lint]
extend-select = [
    "I",    # isort
    "B",    # flake8-bugbear
    "C4",   # flake8-comprehensions
    "PGH",  # pygrep-hooks
    "RUF",  # ruff
    "W",    # pycodestyle
    "YTT",  # flake8-2020
]
extend-ignore = ["B018", "B019", "B905"]

[tool.ruff.lint.mccabe]
max-complexity = 10

```

--------------------------------------------------------------------------------
/examples/llamaindex-with-opendal-mcp.py:
--------------------------------------------------------------------------------

```python
import asyncio
import os

from dotenv import load_dotenv
from llama_index.core.agent import ReActAgent, ReActChatFormatter
from llama_index.core.agent.react.prompts import REACT_CHAT_SYSTEM_HEADER
from llama_index.core.llms import ChatMessage
from llama_index.llms.openai_like import OpenAILike
from llama_index.tools.mcp import BasicMCPClient, McpToolSpec

load_dotenv()


# MCP Server Connection Parameters
MCP_HOST = os.getenv("MCP_HOST")
MCP_PORT = os.getenv("MCP_PORT")

SYSTEM_PROMPT = """\
You are an agricultural research assistant. I am researching the impact of climate change on crop yields.
I need you to help me analyze some research data stored in the file system.

Please first list the files in the `fs://data/` directory,
Then you must read the file `README.txt` to understand the data,
then read the necessary CSV files, and summarize the main findings.
"""


async def get_agent(tools: McpToolSpec):
    tools = await tools.to_tool_list_async()
    agent = ReActAgent.from_tools(
        llm=OpenAILike(
            model=os.getenv("OPENAI_MODEL"),
            api_base=os.getenv("OPENAI_ENDPOINT"),
            api_key=os.getenv("OPENAI_API_KEY"),
            is_chat_model=True,
        ),
        tools=list(tools),
        react_chat_formatter=ReActChatFormatter(
            system_header=SYSTEM_PROMPT + "\n" + REACT_CHAT_SYSTEM_HEADER,
        ),
        max_iterations=20,
        verbose=True,
    )
    return agent


async def handle_user_message(message_content: str, agent: ReActAgent):
    user_message = ChatMessage.from_str(role="user", content=message_content)
    response = await agent.achat(message=user_message.content)
    print(response.response)


async def main():
    mcp_tool = McpToolSpec(client=BasicMCPClient(f"http://{MCP_HOST}:{MCP_PORT}/sse"))

    agent = await get_agent(mcp_tool)
    try:
        await handle_user_message("What is the main finding of the data?", agent)
    except Exception as e:
        print(f"Unexpected error: {type(e)}, {e}")


if __name__ == "__main__":
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/tests/test_integration_fs.py:
--------------------------------------------------------------------------------

```python
import base64
import json
import shutil
import tempfile
from pathlib import Path

import pytest

from mcp_server_opendal.server import (
    OPENDAL_OPTIONS,
    get_info,
    list,
    mcp,
    read,
    register_resources,
)


@pytest.fixture
def test_files():
    """Create a temporary directory with test files"""
    # Create a temporary directory
    temp_dir = tempfile.mkdtemp()

    try:
        # Create some test files and directories
        root_path = Path(temp_dir)

        # Create a text file
        text_file = root_path / "test_text.txt"
        text_file.write_text(
            "This is a test text file\nSecond line content", encoding="utf-8"
        )

        # Create a binary file
        bin_file = root_path / "binary_file.bin"
        bin_file.write_bytes(b"\x00\x01\x02\x03\xff\xfe\xab\xcd")

        # Create a JSON file
        json_file = root_path / "config.json"
        json_data = {"name": "test", "value": 123, "enabled": True}
        json_file.write_text(
            json.dumps(json_data, ensure_ascii=False), encoding="utf-8"
        )

        # Create a subdirectory and a nested file
        subdir = root_path / "subdir"
        subdir.mkdir()
        nested_file = subdir / "nested_file.log"
        nested_file.write_text("Subdirectory file content", encoding="utf-8")

        yield temp_dir
    finally:
        # Clean up the temporary directory
        shutil.rmtree(temp_dir)


@pytest.fixture
def setup_env(test_files, monkeypatch):
    """Set environment variables and OpenDAL configuration"""
    OPENDAL_OPTIONS.clear()
    OPENDAL_OPTIONS.update(
        {
            "fs_type": "fs",
            "fs_root": test_files,
        }
    )

    monkeypatch.setenv("OPENDAL_FS_TYPE", "fs")
    monkeypatch.setenv("OPENDAL_FS_ROOT", test_files)

    # Re-register resources
    register_resources()

    yield


@pytest.mark.asyncio
async def test_list_resources(setup_env):
    """Test listing available resources"""
    resources = await mcp.list_resources()

    assert len(resources) > 0

    resource_schemes = [str(r.uri.scheme) for r in resources if hasattr(r, "uri")]
    assert "fs" in resource_schemes


@pytest.mark.asyncio
async def test_list_directory_contents(setup_env, test_files):
    """Test listing directory contents"""
    result = await list("fs://")

    assert "test_text.txt" in result
    assert "binary_file.bin" in result
    assert "config.json" in result
    assert "subdir" in result

    subdir_result = await list("fs://subdir/")
    assert "nested_file.log" in subdir_result


@pytest.mark.asyncio
async def test_read_text_file(setup_env):
    """Test reading a text file"""
    result = await read("fs://test_text.txt")

    assert "This is a test text file\nSecond line content" in result["content"]
    assert result["mime_type"] in ["text/plain", None]


@pytest.mark.asyncio
async def test_read_binary_file(setup_env):
    """Test reading a binary file"""
    result = await read("fs://binary_file.bin")

    expected_binary = b"\x00\x01\x02\x03\xff\xfe\xab\xcd"
    decoded = base64.b64decode(result["content"])
    assert decoded == expected_binary
    assert result.get("is_binary", False) is True


@pytest.mark.asyncio
async def test_read_json_file(setup_env):
    """Test reading a JSON file"""
    result = await read("fs://config.json")

    assert "test" in result["content"]
    assert result["mime_type"] in ["application/json", "text/plain", None]


@pytest.mark.asyncio
async def test_read_json_file_with_read(setup_env):
    """Test reading a JSON file with read"""
    result = await read("fs://config.json")

    assert "test" in result["content"]
    assert result["mime_type"] in ["application/json", "text/plain", None]


@pytest.mark.asyncio
async def test_get_file_info(setup_env):
    """Test getting file information"""
    result = await get_info("fs://test_text.txt")

    assert "test_text.txt" in result
    content = "This is a test text file\nSecond line content"
    expected_size = len(content.encode("utf-8"))
    assert f"Size: {expected_size} bytes" in result


@pytest.mark.asyncio
async def test_read_nested_file(setup_env):
    """Test reading a nested file"""
    result = await read("fs://subdir/nested_file.log")

    assert "Subdirectory file content" in result["content"]

```

--------------------------------------------------------------------------------
/src/mcp_server_opendal/resource.py:
--------------------------------------------------------------------------------

```python
import logging
import os
from typing import Any, List, Tuple, Union

import opendal
from dotenv import load_dotenv
from mcp.server.fastmcp.resources import Resource
from opendal import Entry, Metadata
from opendal.layers import RetryLayer
from pydantic import Field

logger = logging.getLogger("mcp_server_opendal")

# all opendal related environment variables
load_dotenv()
OPENDAL_OPTIONS = {
    k.replace("OPENDAL_", "").lower(): v.lower()
    for k, v in os.environ.items()
    if k.startswith("OPENDAL_")
}


class OpendalResource(Resource):
    """
    OpenDAL Resource provider that handles interactions with different storage services.
    Acts both as a FastMCP Resource and as an interface to OpenDAL operations.

    This resource provider will read the environment variables for the given scheme and use them to configure the opendal operator.

    For example, if the scheme is "mys3", the environment variables should be:

    ```
    OPENDAL_MYS3_TYPE=s3
    OPENDAL_MYS3_BUCKET=mybucket
    OPENDAL_MYS3_REGION=us-east-1
    OPENDAL_MYS3_ENDPOINT=http://localhost:9000
    OPENDAL_MYS3_ACCESS_KEY_ID=myaccesskey
    OPENDAL_MYS3_SECRET_ACCESS_KEY=mysecretkey
    ```
    """

    scheme: str = Field("", description="Storage scheme (e.g., s3, fs)")
    op: Any = None

    def __init__(self, scheme: str):
        scheme = scheme.lower()
        # Configure OpenDAL operator
        opendal_type = OPENDAL_OPTIONS.get(f"{scheme}_type")
        opendal_options = {
            k.replace(f"{scheme}_", ""): v
            for k, v in OPENDAL_OPTIONS.items()
            if k.startswith(f"{scheme}_")
        }
        logger.debug(f"Initializing OpendalResource with options: {opendal_options}")

        # Initialize FastMCP Resource
        super().__init__(
            uri=f"{scheme}://",
            name=f"{scheme} storage",
            description=f"Storage service accessed via OpenDAL {scheme} protocol",
            mime_type="application/vnd.folder",  # for containers/directories
        )

        # Initialize OpenDAL operator
        self.scheme = scheme
        self.op = opendal.AsyncOperator(opendal_type, **opendal_options).layer(
            RetryLayer()
        )
        logger.debug(f"Initialized OpendalResource: {self.op}")

    async def read(self) -> str:
        """
        Read method for scheme-level resources, only returns descriptive information
        Actual file content is obtained through path-specific methods
        """
        info = f"OpenDAL {self.scheme} storage resource.\n\n"
        info += f"To access specific files, use: {self.scheme}://path/to/file\n"
        info += f"To list directory contents, use the 'list' tool with: {self.scheme}://path/to/dir\n"
        return info

    async def list(
        self, prefix: Union[str, os.PathLike], max_keys: int = 1000
    ) -> List[Entry]:
        """List entries with the given prefix"""
        logger.debug(f"Listing entries with prefix: {prefix}")

        if max_keys <= 0:
            return []

        entries = []

        it = await self.op.list(prefix)

        async for entry in it:
            logger.debug(f"Listing entry: {entry}")
            entries.append(entry)
            if len(entries) >= max_keys:
                break

        return entries

    async def read_path(self, path: Union[str, os.PathLike]) -> bytes:
        """Read content from a specific path"""
        logger.debug(f"Reading path: {path}")
        return await self.op.read(path)

    async def stat(self, path: Union[str, os.PathLike]) -> Metadata:
        """Get metadata for a specific path"""
        logger.debug(f"Statting path: {path}")
        return await self.op.stat(path)

    def is_text_file(self, path: Union[str, os.PathLike]) -> bool:
        """Determine if a file is text-based by its extension"""
        text_extensions = {
            ".txt",
            ".log",
            ".json",
            ".xml",
            ".yml",
            ".yaml",
            ".md",
            ".csv",
            ".ini",
            ".conf",
            ".py",
            ".js",
            ".html",
            ".css",
            ".sh",
            ".bash",
            ".cfg",
            ".properties",
        }
        return any(path.lower().endswith(ext) for ext in text_extensions)


def parse_uri(uri: str) -> Tuple[OpendalResource, str]:
    """Parse a URI into a resource and path"""
    from urllib.parse import unquote, urlparse

    logger.debug(f"Parsing URI: {uri}")
    parsed = urlparse(uri)

    scheme = parsed.scheme
    path = parsed.netloc + parsed.path
    path = unquote(path)  # Decode URL-encoded characters
    return (OpendalResource(scheme), path)

```

--------------------------------------------------------------------------------
/src/mcp_server_opendal/server.py:
--------------------------------------------------------------------------------

```python
import argparse
import base64
import logging
import os
from typing import Any, Dict

from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

from mcp_server_opendal.resource import OPENDAL_OPTIONS, OpendalResource, parse_uri

load_dotenv()
default_log_level = os.environ.get("LOG_LEVEL", "INFO").upper()
log_level = getattr(logging, default_log_level, logging.INFO)

# Initialize the FastMCP server - Use the default log level
mcp = FastMCP("opendal_service", log_level=default_log_level)

# Configure logging - still use numeric constants to configure standard logging
logging.basicConfig(level=log_level)
logger = logging.getLogger("mcp_server_opendal")


def register_resources():
    """Register all OpenDAL resources"""
    # Get all available schemes
    schemes = {k.split("_")[0] for k in OPENDAL_OPTIONS.keys()}

    # Clean existing resources and register new resources
    for scheme in schemes:
        try:
            resource = OpendalResource(scheme)
            mcp.add_resource(resource)
            logger.info(f"Registered OpenDAL resource for scheme: {scheme}")
        except Exception as e:
            logger.error(
                f"Failed to register OpenDAL resource for scheme {scheme}: {e}"
            )


# Register resources
register_resources()


# Create a resource template, used to dynamically generate resources
@mcp.resource("{scheme}://{path}")
async def opendal_resource(scheme: str, path: str) -> Dict[str, Any]:
    """
    Access files in OpenDAL service

    Args:
        scheme: storage service scheme
        path: file path

    Returns:
        Dictionary containing file content and metadata
    """
    logger.debug(f"Reading template resource content: {scheme}://{path}")
    try:
        resource = OpendalResource(scheme)
        data = await resource.read_path(path)
        metadata = await resource.stat(path)

        if resource.is_text_file(path):
            return {
                "content": data.decode("utf-8"),
                "mime_type": metadata.content_type or "text/plain",
                "size": metadata.content_length,
                "is_binary": False,
            }
        else:
            return {
                "content": base64.b64encode(data).decode("ascii"),
                "mime_type": metadata.content_type or "application/octet-stream",
                "size": metadata.content_length,
                "is_binary": True,
            }
    except Exception as e:
        logger.error(f"Failed to read resource: {e!s}")
        return {"error": str(e)}


# Modify the list tool to ensure the path ends with a slash
@mcp.tool()
async def list(uri: str) -> str:
    """
    List files in OpenDAL service

    Args:
        uri: resource URI, e.g. mys3://path/to/dir

    Returns:
        String containing directory content
    """
    logger.debug(f"Listing directory content: {uri}")
    try:
        resource, path = parse_uri(uri)

        # Ensure directory path ends with a slash
        if path and not path.endswith("/"):
            path = path + "/"

        entries = await resource.list(path)

        return str(entries)
    except Exception as e:
        logger.error(f"Failed to list directory content: {e!s}")
        return f"Error: {e!s}"


# Read content of file
@mcp.tool()
async def read(uri: str) -> Dict[str, Any]:
    """
    Read file content from OpenDAL service

    Args:
        uri: resource URI, e.g. mys3://path/to/file

    Returns:
        File content or error information
    """
    logger.debug(f"Reading file content: {uri}")
    try:
        resource, path = parse_uri(uri)
        # Directly call the resource function to get content
        return await opendal_resource(resource.scheme, path)
    except Exception as e:
        logger.error(f"Failed to read file content: {e!s}")
        return {"error": str(e)}


# Get file metadata
@mcp.tool()
async def get_info(uri: str) -> str:
    """
    Get metadata of file in OpenDAL service

    Args:
        uri: resource URI, e.g. mys3://path/to/file

    Returns:
        File metadata information
    """
    logger.debug(f"Getting file info: {uri}")
    try:
        resource, path = parse_uri(uri)
        metadata = await resource.stat(path)

        result = f"File: {path}\n"
        result += f"Size: {metadata.content_length} bytes\n"
        result += f"Type: {metadata.content_type}\n"

        return result
    except Exception as e:
        logger.error(f"Failed to get file info: {e!s}")
        return f"Error: {e!s}"


def main():
    parser = argparse.ArgumentParser(description="OpenDAL MCP server")
    parser.add_argument(
        "--transport",
        type=str,
        choices=["stdio", "sse"],
        default="stdio",
        help="Transport method (stdio or sse)",
    )

    args = parser.parse_args()

    if args.transport == "sse":
        mcp.run("sse")
    else:
        mcp.run("stdio")


if __name__ == "__main__":
    main()

```