# Directory Structure
```
├── .env.example
├── .github
│   └── workflows
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── changelogithub.config.json
├── examples
│   ├── data
│   │   ├── README.txt
│   │   ├── rice_yields.csv
│   │   ├── temperature_data.csv
│   │   └── wheat_yields.csv
│   ├── llamaindex-with-opendal-mcp.py
│   └── README.md
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│   └── mcp_server_opendal
│       ├── __init__.py
│       ├── resource.py
│       └── server.py
├── tests
│   └── test_integration_fs.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.12
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
.ruff_cache
.env
```
--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------
```
MCP_HOST=localhost
MCP_PORT=8000
OPENAI_API_KEY=your-openai-api-key
OPENAI_MODEL=your-openai-model
OPENAI_ENDPOINT=your-openai-endpoint
OPENDAL_FS_TYPE=fs
OPENDAL_FS_ROOT=./examples/
```
--------------------------------------------------------------------------------
/examples/data/README.txt:
--------------------------------------------------------------------------------
```
This directory contains research data on the impact of climate change on crop yields from 2010 to 2022 in different regions.
============================
File descriptions:
- wheat_yields.csv: Wheat yield data
- rice_yields.csv: Rice yield data
- temperature_data.csv: Temperature data
```
--------------------------------------------------------------------------------
/examples/README.md:
--------------------------------------------------------------------------------
```markdown
# LlamaIndex Agent Example with OpenDAL MCP

## Start the MCP Server

To run this example, you need to have an MCP server running.

Make sure you are in the root directory of the project and set the environment variables first:

- `OPENDAL_FS_TYPE=fs`
- `OPENDAL_FS_ROOT=./examples/`

Then, run the following command:

```bash
uv sync  # Install the project; this only needs to be done once
uv run mcp-server-opendal --transport sse
```

## Run the Example

Set the environment variables below:

- `MCP_HOST`: The host of the MCP server
- `MCP_PORT`: The port of the MCP server
- `OPENAI_API_KEY`: Your OpenAI API key
- `OPENAI_MODEL`: The OpenAI model to use
- `OPENAI_ENDPOINT`: The OpenAI API endpoint

Then, run the example with the following command:

```bash
uv run examples/llamaindex-with-opendal-mcp.py
```
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# Model Context Protocol Server for Apache OpenDAL™

A Model Context Protocol (MCP) server implementation that provides access to various storage services via [Apache OpenDAL™](https://opendal.apache.org/).

[](https://pypi.org/project/mcp-server-opendal/)
[](https://pypi.org/project/mcp-server-opendal/)
[](https://github.com/Xuanwo/mcp-server-opendal/actions/workflows/test.yml)

## Features

- Seamless access to multiple storage services including S3, Azure Blob Storage, Google Cloud Storage, and more
- List files and directories from storage services
- Read file contents with automatic text/binary detection
- Environment variable based configuration

## Installation

```shell
pip install mcp-server-opendal
```
## Usage with Claude Desktop

Add the following to `claude_desktop_config.json`:

```json
{
  "mcpServers": {
    "opendal": {
      "command": "uvx",
      "args": [
        "mcp-server-opendal"
      ],
      "env": {
        "YOUR_ENV_VAR": "YOUR_ENV_VALUE"
      }
    }
  }
}
```

This requires `uv` to be installed on your machine. Check the [official documentation](https://docs.astral.sh/uv/getting-started/installation/) for installation instructions.
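
For instance, a configuration that exposes the bundled `examples/` directory over a local filesystem alias might look like the sketch below (the `fs` alias and values mirror `.env.example`; adjust them for your setup):

```json
{
  "mcpServers": {
    "opendal": {
      "command": "uvx",
      "args": ["mcp-server-opendal"],
      "env": {
        "OPENDAL_FS_TYPE": "fs",
        "OPENDAL_FS_ROOT": "./examples/"
      }
    }
  }
}
```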
## Environment variables

Configure storage services by setting environment variables. Each service requires a prefix and specific configuration options.

For example, to configure an S3 service with alias "mys3":

```
OPENDAL_MYS3_TYPE=s3
OPENDAL_MYS3_BUCKET=mybucket
OPENDAL_MYS3_REGION=us-east-1
OPENDAL_MYS3_ENDPOINT=http://localhost:9000
OPENDAL_MYS3_ACCESS_KEY_ID=myaccesskey
OPENDAL_MYS3_SECRET_ACCESS_KEY=mysecretkey
```

Then you can use tools like `read` and `list` with URIs such as `mys3://path/to/file`.

`mcp-server-opendal` will also load environment variables from `.env`.
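
A local filesystem service is configured the same way; for example, the `fs` alias used by the bundled example (values taken from `.env.example`):

```
OPENDAL_FS_TYPE=fs
OPENDAL_FS_ROOT=./examples/
```

With this configuration, `list` and `read` accept URIs like `fs://data/README.txt`.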
## Development

```shell
npx @modelcontextprotocol/inspector \
  uv run mcp-server-opendal
```
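
To run lint and tests locally, you can use the same commands as the CI workflow (`.github/workflows/test.yml`):

```shell
uv sync --dev
uv run ruff check .
uv run ruff format --check .
uv run pytest -xvs tests/
```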
```
--------------------------------------------------------------------------------
/changelogithub.config.json:
--------------------------------------------------------------------------------
```json
{
  "types": {
    "break": { "title": "💥 Breaking Changes" },
    "feat": { "title": "🚀 Features" },
    "fix": { "title": "🐞 Bug Fixes" },
    "doc": { "title": "📝 Documentation" },
    "chore": { "title": "💻 Chores" }
  }
}
```
--------------------------------------------------------------------------------
/src/mcp_server_opendal/__init__.py:
--------------------------------------------------------------------------------
```python
import logging

from . import server


def main():
    """Main entry point for the package."""
    logging.basicConfig(level=logging.DEBUG)
    server.main()


# Optionally expose other important items at package level
__all__ = ["main", "server"]
```
--------------------------------------------------------------------------------
/examples/data/temperature_data.csv:
--------------------------------------------------------------------------------
```
Year,North Average Temperature(°C),South Average Temperature(°C),National Average Temperature(°C)
2010,22.0,24.0,23.0
2011,22.2,24.25,23.22
2012,22.4,24.5,23.44
2013,22.6,24.75,23.66
2014,22.8,25.0,23.88
2015,23.0,25.25,24.1
2016,23.2,25.5,24.32
2017,23.4,25.75,24.54
2018,23.6,26.0,24.76
2019,23.8,26.25,24.98
2020,24.0,26.5,25.2
2021,24.2,26.75,25.42
2022,24.4,27.0,25.64
```
--------------------------------------------------------------------------------
/examples/data/rice_yields.csv:
--------------------------------------------------------------------------------
```
Year,Region,Average Temperature(°C),Yield(ton/hectare)
2010,South,24.0,5.772211109586192
2011,South,24.25,5.768775076745289
2012,South,24.5,5.633577850775186
2013,South,24.75,5.567733432553301
2014,South,25.0,5.546675121908924
2015,South,25.25,5.338465977162644
2016,South,25.5,5.35486878347953
2017,South,25.75,5.349000767493246
2018,South,26.0,5.114887795954057
2019,South,26.25,5.003123064631635
2020,South,26.5,4.910301072246474
2021,South,26.75,4.830848429519767
2022,South,27.0,4.839301775701944
```
--------------------------------------------------------------------------------
/examples/data/wheat_yields.csv:
--------------------------------------------------------------------------------
```
Year,Region,Average Temperature(°C),Yield(ton/hectare)
2010,North,22.0,4.170916869556414
2011,North,22.2,4.127563812254134
2012,North,22.4,4.113646614676481
2013,North,22.6,4.036069023417953
2014,North,22.8,3.9702148178835747
2015,North,23.0,3.9581433256795266
2016,North,23.2,3.842753248007842
2017,North,23.4,3.846889161483347
2018,North,23.6,3.714642481399471
2019,North,23.8,3.7133893798272284
2020,North,24.0,3.6473729560326555
2021,North,24.2,3.564664051294111
2022,North,24.4,3.5251559539742284
```
--------------------------------------------------------------------------------
/.github/workflows/test.yml:
--------------------------------------------------------------------------------
```yaml
name: Lint and Test

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: [ubuntu-latest]
        python-version: ["3.12"]
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install uv
        uses: astral-sh/setup-uv@v5
        with:
          version: latest

      - name: Install dependencies
        run: |
          uv sync --dev

      - name: Code check
        run: |
          uv run ruff check .
          uv run ruff format --check .

      - name: Run tests
        run: |
          uv run pytest -xvs tests/
```
--------------------------------------------------------------------------------
/.github/workflows/release.yml:
--------------------------------------------------------------------------------
```yaml
name: Release

on:
  push:
    tags:
      - "*"

jobs:
  release-note:
    name: Generate release notes
    runs-on: ubuntu-latest
    permissions:
      contents: write
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - uses: actions/setup-node@v4
        with:
          node-version: lts/*
      - run: npx changelogithub
        continue-on-error: true
        env:
          GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}

  build-package:
    name: Build & inspect our package.
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - uses: hynek/build-and-inspect-python-package@v2

  release-pypi:
    name: Publish distributions to PyPI
    permissions:
      id-token: write
    environment:
      name: pypi
      url: https://pypi.org/project/mcp-server-opendal/${{ github.ref_name }}
    runs-on: ubuntu-latest
    needs: build-package
    steps:
      - name: Download packages built by build-and-inspect-python-package
        uses: actions/download-artifact@v4
        with:
          name: Packages
          path: dist
      - name: Upload package to PyPI
        uses: pypa/gh-action-pypi-publish@release/v1
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "mcp-server-opendal"
description = "A Model Context Protocol server providing tools to access storage services for usage by LLMs"
readme = "README.md"
requires-python = ">=3.12"
license = { text = "Apache-2.0" }
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: Apache Software License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.12",
]
dependencies = [
    "mcp>=1.0.0",
    "opendal>=0.45.16,<0.46.0",
    "python-dotenv>=1.0.1",
]
dynamic = ["version"]

[build-system]
requires = ["hatchling", "hatch-vcs>=0.3.0"]
build-backend = "hatchling.build"

[dependency-groups]
dev = ["pytest>=8.3.5", "pytest-asyncio>=0.25.3", "ruff>=0.7.3"]
examples = [
    "llama-index>=0.12.22",
    "llama-index-llms-openai-like>=0.3.4",
    "llama-index-tools-mcp>=0.1.0",
]

[project.scripts]
mcp-server-opendal = "mcp_server_opendal:main"

[tool.hatch.version]
source = "vcs"
fallback-version = "0.0.0"

[tool.hatch.build.targets.sdist]
only-include = ["src/", "examples/", "tests/"]

[tool.ruff]
line-length = 88
src = ["src"]
exclude = ["tests/fixtures"]
target-version = "py312"

[tool.ruff.lint]
extend-select = [
    "I",   # isort
    "B",   # flake8-bugbear
    "C4",  # flake8-comprehensions
    "PGH", # pygrep-hooks
    "RUF", # ruff
    "W",   # pycodestyle
    "YTT", # flake8-2020
]
extend-ignore = ["B018", "B019", "B905"]

[tool.ruff.lint.mccabe]
max-complexity = 10
```
--------------------------------------------------------------------------------
/examples/llamaindex-with-opendal-mcp.py:
--------------------------------------------------------------------------------
```python
import asyncio
import os

from dotenv import load_dotenv
from llama_index.core.agent import ReActAgent, ReActChatFormatter
from llama_index.core.agent.react.prompts import REACT_CHAT_SYSTEM_HEADER
from llama_index.core.llms import ChatMessage
from llama_index.llms.openai_like import OpenAILike
from llama_index.tools.mcp import BasicMCPClient, McpToolSpec

load_dotenv()

# MCP Server Connection Parameters
MCP_HOST = os.getenv("MCP_HOST")
MCP_PORT = os.getenv("MCP_PORT")

SYSTEM_PROMPT = """\
You are an agricultural research assistant. I am researching the impact of climate change on crop yields.
I need you to help me analyze some research data stored in the file system.
Please first list the files in the `fs://data/` directory,
Then you must read the file `README.txt` to understand the data,
then read the necessary CSV files, and summarize the main findings.
"""


async def get_agent(tools: McpToolSpec):
    tools = await tools.to_tool_list_async()
    agent = ReActAgent.from_tools(
        llm=OpenAILike(
            model=os.getenv("OPENAI_MODEL"),
            api_base=os.getenv("OPENAI_ENDPOINT"),
            api_key=os.getenv("OPENAI_API_KEY"),
            is_chat_model=True,
        ),
        tools=list(tools),
        react_chat_formatter=ReActChatFormatter(
            system_header=SYSTEM_PROMPT + "\n" + REACT_CHAT_SYSTEM_HEADER,
        ),
        max_iterations=20,
        verbose=True,
    )
    return agent


async def handle_user_message(message_content: str, agent: ReActAgent):
    user_message = ChatMessage.from_str(role="user", content=message_content)
    response = await agent.achat(message=user_message.content)
    print(response.response)


async def main():
    mcp_tool = McpToolSpec(client=BasicMCPClient(f"http://{MCP_HOST}:{MCP_PORT}/sse"))
    agent = await get_agent(mcp_tool)
    try:
        await handle_user_message("What is the main finding of the data?", agent)
    except Exception as e:
        print(f"Unexpected error: {type(e)}, {e}")


if __name__ == "__main__":
    asyncio.run(main())
```
--------------------------------------------------------------------------------
/tests/test_integration_fs.py:
--------------------------------------------------------------------------------
```python
import base64
import json
import shutil
import tempfile
from pathlib import Path

import pytest

from mcp_server_opendal.server import (
    OPENDAL_OPTIONS,
    get_info,
    list,
    mcp,
    read,
    register_resources,
)


@pytest.fixture
def test_files():
    """Create a temporary directory with test files"""
    # Create a temporary directory
    temp_dir = tempfile.mkdtemp()
    try:
        # Create some test files and directories
        root_path = Path(temp_dir)

        # Create a text file
        text_file = root_path / "test_text.txt"
        text_file.write_text(
            "This is a test text file\nSecond line content", encoding="utf-8"
        )

        # Create a binary file
        bin_file = root_path / "binary_file.bin"
        bin_file.write_bytes(b"\x00\x01\x02\x03\xff\xfe\xab\xcd")

        # Create a JSON file
        json_file = root_path / "config.json"
        json_data = {"name": "test", "value": 123, "enabled": True}
        json_file.write_text(
            json.dumps(json_data, ensure_ascii=False), encoding="utf-8"
        )

        # Create a subdirectory and a nested file
        subdir = root_path / "subdir"
        subdir.mkdir()
        nested_file = subdir / "nested_file.log"
        nested_file.write_text("Subdirectory file content", encoding="utf-8")

        yield temp_dir
    finally:
        # Clean up the temporary directory
        shutil.rmtree(temp_dir)


@pytest.fixture
def setup_env(test_files, monkeypatch):
    """Set environment variables and OpenDAL configuration"""
    OPENDAL_OPTIONS.clear()
    OPENDAL_OPTIONS.update(
        {
            "fs_type": "fs",
            "fs_root": test_files,
        }
    )
    monkeypatch.setenv("OPENDAL_FS_TYPE", "fs")
    monkeypatch.setenv("OPENDAL_FS_ROOT", test_files)

    # Re-register resources
    register_resources()

    yield


@pytest.mark.asyncio
async def test_list_resources(setup_env):
    """Test listing available resources"""
    resources = await mcp.list_resources()
    assert len(resources) > 0
    resource_schemes = [str(r.uri.scheme) for r in resources if hasattr(r, "uri")]
    assert "fs" in resource_schemes


@pytest.mark.asyncio
async def test_list_directory_contents(setup_env, test_files):
    """Test listing directory contents"""
    result = await list("fs://")
    assert "test_text.txt" in result
    assert "binary_file.bin" in result
    assert "config.json" in result
    assert "subdir" in result

    subdir_result = await list("fs://subdir/")
    assert "nested_file.log" in subdir_result


@pytest.mark.asyncio
async def test_read_text_file(setup_env):
    """Test reading a text file"""
    result = await read("fs://test_text.txt")
    assert "This is a test text file\nSecond line content" in result["content"]
    assert result["mime_type"] in ["text/plain", None]


@pytest.mark.asyncio
async def test_read_binary_file(setup_env):
    """Test reading a binary file"""
    result = await read("fs://binary_file.bin")
    expected_binary = b"\x00\x01\x02\x03\xff\xfe\xab\xcd"
    decoded = base64.b64decode(result["content"])
    assert decoded == expected_binary
    assert result.get("is_binary", False) is True


@pytest.mark.asyncio
async def test_read_json_file(setup_env):
    """Test reading a JSON file"""
    result = await read("fs://config.json")
    assert "test" in result["content"]
    assert result["mime_type"] in ["application/json", "text/plain", None]


@pytest.mark.asyncio
async def test_read_json_file_with_read(setup_env):
    """Test reading a JSON file with read"""
    result = await read("fs://config.json")
    assert "test" in result["content"]
    assert result["mime_type"] in ["application/json", "text/plain", None]


@pytest.mark.asyncio
async def test_get_file_info(setup_env):
    """Test getting file information"""
    result = await get_info("fs://test_text.txt")
    assert "test_text.txt" in result
    content = "This is a test text file\nSecond line content"
    expected_size = len(content.encode("utf-8"))
    assert f"Size: {expected_size} bytes" in result


@pytest.mark.asyncio
async def test_read_nested_file(setup_env):
    """Test reading a nested file"""
    result = await read("fs://subdir/nested_file.log")
    assert "Subdirectory file content" in result["content"]
```
--------------------------------------------------------------------------------
/src/mcp_server_opendal/resource.py:
--------------------------------------------------------------------------------
```python
import logging
import os
from typing import Any, List, Tuple, Union

import opendal
from dotenv import load_dotenv
from mcp.server.fastmcp.resources import Resource
from opendal import Entry, Metadata
from opendal.layers import RetryLayer
from pydantic import Field

logger = logging.getLogger("mcp_server_opendal")

# all opendal related environment variables
load_dotenv()
OPENDAL_OPTIONS = {
    k.replace("OPENDAL_", "").lower(): v.lower()
    for k, v in os.environ.items()
    if k.startswith("OPENDAL_")
}


class OpendalResource(Resource):
    """
    OpenDAL Resource provider that handles interactions with different storage services.

    Acts both as a FastMCP Resource and as an interface to OpenDAL operations.

    This resource provider will read the environment variables for the given scheme
    and use them to configure the opendal operator.

    For example, if the scheme is "mys3", the environment variables should be:

    ```
    OPENDAL_MYS3_TYPE=s3
    OPENDAL_MYS3_BUCKET=mybucket
    OPENDAL_MYS3_REGION=us-east-1
    OPENDAL_MYS3_ENDPOINT=http://localhost:9000
    OPENDAL_MYS3_ACCESS_KEY_ID=myaccesskey
    OPENDAL_MYS3_SECRET_ACCESS_KEY=mysecretkey
    ```
    """

    scheme: str = Field("", description="Storage scheme (e.g., s3, fs)")
    op: Any = None

    def __init__(self, scheme: str):
        scheme = scheme.lower()

        # Configure OpenDAL operator
        opendal_type = OPENDAL_OPTIONS.get(f"{scheme}_type")
        opendal_options = {
            k.replace(f"{scheme}_", ""): v
            for k, v in OPENDAL_OPTIONS.items()
            if k.startswith(f"{scheme}_")
        }
        logger.debug(f"Initializing OpendalResource with options: {opendal_options}")

        # Initialize FastMCP Resource
        super().__init__(
            uri=f"{scheme}://",
            name=f"{scheme} storage",
            description=f"Storage service accessed via OpenDAL {scheme} protocol",
            mime_type="application/vnd.folder",  # for containers/directories
        )

        # Initialize OpenDAL operator
        self.scheme = scheme
        self.op = opendal.AsyncOperator(opendal_type, **opendal_options).layer(
            RetryLayer()
        )
        logger.debug(f"Initialized OpendalResource: {self.op}")

    async def read(self) -> str:
        """
        Read method for scheme-level resources, only returns descriptive information
        Actual file content is obtained through path-specific methods
        """
        info = f"OpenDAL {self.scheme} storage resource.\n\n"
        info += f"To access specific files, use: {self.scheme}://path/to/file\n"
        info += f"To list directory contents, use the 'list' tool with: {self.scheme}://path/to/dir\n"
        return info

    async def list(
        self, prefix: Union[str, os.PathLike], max_keys: int = 1000
    ) -> List[Entry]:
        """List entries with the given prefix"""
        logger.debug(f"Listing entries with prefix: {prefix}")

        if max_keys <= 0:
            return []

        entries = []
        it = await self.op.list(prefix)
        async for entry in it:
            logger.debug(f"Listing entry: {entry}")
            entries.append(entry)
            if len(entries) >= max_keys:
                break

        return entries

    async def read_path(self, path: Union[str, os.PathLike]) -> bytes:
        """Read content from a specific path"""
        logger.debug(f"Reading path: {path}")
        return await self.op.read(path)

    async def stat(self, path: Union[str, os.PathLike]) -> Metadata:
        """Get metadata for a specific path"""
        logger.debug(f"Statting path: {path}")
        return await self.op.stat(path)

    def is_text_file(self, path: Union[str, os.PathLike]) -> bool:
        """Determine if a file is text-based by its extension"""
        text_extensions = {
            ".txt",
            ".log",
            ".json",
            ".xml",
            ".yml",
            ".yaml",
            ".md",
            ".csv",
            ".ini",
            ".conf",
            ".py",
            ".js",
            ".html",
            ".css",
            ".sh",
            ".bash",
            ".cfg",
            ".properties",
        }
        return any(path.lower().endswith(ext) for ext in text_extensions)


def parse_uri(uri: str) -> Tuple[OpendalResource, str]:
    """Parse a URI into a resource and path"""
    from urllib.parse import unquote, urlparse

    logger.debug(f"Parsing URI: {uri}")

    parsed = urlparse(uri)
    scheme = parsed.scheme
    path = parsed.netloc + parsed.path
    path = unquote(path)  # Decode URL-encoded characters

    return (OpendalResource(scheme), path)
```
--------------------------------------------------------------------------------
/src/mcp_server_opendal/server.py:
--------------------------------------------------------------------------------
```python
import argparse
import base64
import logging
import os
from typing import Any, Dict

from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

from mcp_server_opendal.resource import OPENDAL_OPTIONS, OpendalResource, parse_uri

load_dotenv()

default_log_level = os.environ.get("LOG_LEVEL", "INFO").upper()
log_level = getattr(logging, default_log_level, logging.INFO)

# Initialize the FastMCP server - Use the default log level
mcp = FastMCP("opendal_service", log_level=default_log_level)

# Configure logging - still use numeric constants to configure standard logging
logging.basicConfig(level=log_level)
logger = logging.getLogger("mcp_server_opendal")


def register_resources():
    """Register all OpenDAL resources"""
    # Get all available schemes
    schemes = {k.split("_")[0] for k in OPENDAL_OPTIONS.keys()}

    # Clean existing resources and register new resources
    for scheme in schemes:
        try:
            resource = OpendalResource(scheme)
            mcp.add_resource(resource)
            logger.info(f"Registered OpenDAL resource for scheme: {scheme}")
        except Exception as e:
            logger.error(
                f"Failed to register OpenDAL resource for scheme {scheme}: {e}"
            )


# Register resources
register_resources()


# Create a resource template, used to dynamically generate resources
@mcp.resource("{scheme}://{path}")
async def opendal_resource(scheme: str, path: str) -> Dict[str, Any]:
    """
    Access files in OpenDAL service

    Args:
        scheme: storage service scheme
        path: file path

    Returns:
        Dictionary containing file content and metadata
    """
    logger.debug(f"Reading template resource content: {scheme}://{path}")

    try:
        resource = OpendalResource(scheme)
        data = await resource.read_path(path)
        metadata = await resource.stat(path)

        if resource.is_text_file(path):
            return {
                "content": data.decode("utf-8"),
                "mime_type": metadata.content_type or "text/plain",
                "size": metadata.content_length,
                "is_binary": False,
            }
        else:
            return {
                "content": base64.b64encode(data).decode("ascii"),
                "mime_type": metadata.content_type or "application/octet-stream",
                "size": metadata.content_length,
                "is_binary": True,
            }
    except Exception as e:
        logger.error(f"Failed to read resource: {e!s}")
        return {"error": str(e)}


# Modify the list tool to ensure the path ends with a slash
@mcp.tool()
async def list(uri: str) -> str:
    """
    List files in OpenDAL service

    Args:
        uri: resource URI, e.g. mys3://path/to/dir

    Returns:
        String containing directory content
    """
    logger.debug(f"Listing directory content: {uri}")

    try:
        resource, path = parse_uri(uri)

        # Ensure directory path ends with a slash
        if path and not path.endswith("/"):
            path = path + "/"

        entries = await resource.list(path)
        return str(entries)
    except Exception as e:
        logger.error(f"Failed to list directory content: {e!s}")
        return f"Error: {e!s}"


# Read content of file
@mcp.tool()
async def read(uri: str) -> Dict[str, Any]:
    """
    Read file content from OpenDAL service

    Args:
        uri: resource URI, e.g. mys3://path/to/file

    Returns:
        File content or error information
    """
    logger.debug(f"Reading file content: {uri}")

    try:
        resource, path = parse_uri(uri)
        # Directly call the resource function to get content
        return await opendal_resource(resource.scheme, path)
    except Exception as e:
        logger.error(f"Failed to read file content: {e!s}")
        return {"error": str(e)}


# Get file metadata
@mcp.tool()
async def get_info(uri: str) -> str:
    """
    Get metadata of file in OpenDAL service

    Args:
        uri: resource URI, e.g. mys3://path/to/file

    Returns:
        File metadata information
    """
    logger.debug(f"Getting file info: {uri}")

    try:
        resource, path = parse_uri(uri)
        metadata = await resource.stat(path)

        result = f"File: {path}\n"
        result += f"Size: {metadata.content_length} bytes\n"
        result += f"Type: {metadata.content_type}\n"

        return result
    except Exception as e:
        logger.error(f"Failed to get file info: {e!s}")
        return f"Error: {e!s}"


def main():
    parser = argparse.ArgumentParser(description="OpenDAL MCP server")
    parser.add_argument(
        "--transport",
        type=str,
        choices=["stdio", "sse"],
        default="stdio",
        help="Transport method (stdio or sse)",
    )
    args = parser.parse_args()

    if args.transport == "sse":
        mcp.run("sse")
    else:
        mcp.run("stdio")


if __name__ == "__main__":
    main()
```