# Repository Snapshot
# Directory Structure

```
├── .env.example
├── .github
│   └── workflows
│       ├── ruff.yml
│       └── workflow.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── chatmcp.yaml
├── Dockerfile
├── docs
│   └── images
│       └── logo.png
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│   └── mcp_server
│       ├── __init__.py
│       ├── application.py
│       ├── config
│       │   ├── __init__.py
│       │   └── config.py
│       ├── consts
│       │   ├── __init__.py
│       │   └── consts.py
│       ├── core
│       │   ├── __init__.py
│       │   ├── cdn
│       │   │   ├── __init__.py
│       │   │   ├── cdn.py
│       │   │   └── tools.py
│       │   ├── media_processing
│       │   │   ├── __init__.py
│       │   │   ├── processing.py
│       │   │   ├── tools.py
│       │   │   └── utils.py
│       │   ├── storage
│       │   │   ├── __init__.py
│       │   │   ├── resource.py
│       │   │   ├── storage.py
│       │   │   └── tools.py
│       │   └── version
│       │       ├── __init__.py
│       │       ├── tools.py
│       │       └── version.py
│       ├── README.md
│       ├── resource
│       │   ├── __init__.py
│       │   └── resource.py
│       ├── server.py
│       └── tools
│           ├── __init__.py
│           └── tools.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.12

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv
.env
.env.dora
.env.kodo

src/mcp_server/test.py
```

--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------

```
# S3/Kodo 认证信息
QINIU_ACCESS_KEY=your_access_key
QINIU_SECRET_KEY=your_secret_key
QINIU_REGION_NAME=your_region
QINIU_ENDPOINT_URL=endpoint_url # eg:https://s3.your_region.qiniucs.com
QINIU_BUCKETS=bucket1,bucket2,bucket3

```

--------------------------------------------------------------------------------
/.github/workflows/workflow.yml:
--------------------------------------------------------------------------------

```yaml

```

--------------------------------------------------------------------------------
/src/mcp_server/config/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_server/consts/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_server/resource/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_server/tools/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_server/core/version/version.py:
--------------------------------------------------------------------------------

```python

VERSION = '1.2.3'
```

--------------------------------------------------------------------------------
/src/mcp_server/consts/consts.py:
--------------------------------------------------------------------------------

```python
LOGGER_NAME = "qiniu-mcp"

```

--------------------------------------------------------------------------------
/src/mcp_server/core/version/__init__.py:
--------------------------------------------------------------------------------

```python
from .tools import register_tools
from ...config import config


def load(cfg: config.Config):
    """Register the version tools; cfg is unused but kept for a uniform loader API."""
    register_tools()


__all__ = ["load"]

```

--------------------------------------------------------------------------------
/.github/workflows/ruff.yml:
--------------------------------------------------------------------------------

```yaml
name: Ruff
on: [push, pull_request]
jobs:
  ruff:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/ruff-action@v3

```

--------------------------------------------------------------------------------
/src/mcp_server/core/cdn/__init__.py:
--------------------------------------------------------------------------------

```python
from .tools import register_tools
from ...config import config
from .cdn import CDNService


def load(cfg: config.Config):
    """Build a CDNService from cfg and register the CDN tools."""
    register_tools(CDNService(cfg))


__all__ = [
    "load",
]

```

--------------------------------------------------------------------------------
/src/mcp_server/core/media_processing/__init__.py:
--------------------------------------------------------------------------------

```python
from . import processing
from .tools import register_tools
from ...config import config


def load(cfg: config.Config):
    """Create the media-processing client and register its tools."""
    client = processing.MediaProcessingService(cfg)
    register_tools(cfg, client)


__all__ = [
    "load",
]

```

--------------------------------------------------------------------------------
/src/mcp_server/__init__.py:
--------------------------------------------------------------------------------

```python
import logging

from .consts import consts
from .server import main

# Configure logging
# NOTE(review): basicConfig(ERROR) suppresses the info() call below by
# default; application.set_logging_level can lower the level at runtime.
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(consts.LOGGER_NAME)
logger.info("Initializing MCP server package")

__all__ = ["main"]

```

--------------------------------------------------------------------------------
/src/mcp_server/core/storage/__init__.py:
--------------------------------------------------------------------------------

```python
from .storage import StorageService
from .tools import register_tools
from .resource import register_resource_provider
from ...config import config


def load(cfg: config.Config):
    """Build the storage service, then register both its tools and its resource provider."""
    service = StorageService(cfg)
    register_tools(service)
    register_resource_provider(service)


__all__ = ["load"]

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "qiniu-mcp-server"
version = "1.2.3"
description = "A MCP server project of Qiniu."
requires-python = ">=3.12"
authors = [
    { name = "Qiniu", email = "[email protected]" },
]
keywords = ["qiniu", "mcp", "llm"]
dependencies = [
    "aioboto3>=13.2.0",
    "fastjsonschema>=2.21.1",
    "httpx>=0.28.1",
    "mcp[cli]>=1.0.0",
    "openai>=1.66.3",
    "pip>=25.0.1",
    "python-dotenv>=1.0.1",
    "qiniu>=7.16.0",
]

[build-system]
requires = [ "hatchling",]
build-backend = "hatchling.build"

[project.scripts]
qiniu-mcp-server = "mcp_server:main"

[tool.hatch.build.targets.wheel]
packages = ["src/mcp_server"]
```

--------------------------------------------------------------------------------
/src/mcp_server/core/version/tools.py:
--------------------------------------------------------------------------------

```python

from mcp import types

from . import version
from ...tools import tools


class _ToolImpl:
    """Implements the `version` tool; holds no state."""

    def __init__(self):
        pass

    @tools.tool_meta(
        types.Tool(
            name="version",
            description="qiniu mcp server version info.",
            inputSchema={
                "type": "object",
                "required": [],
            }
        )
    )
    def version(self, **kwargs) -> list[types.TextContent]:
        """Return the server version string (version.VERSION) as one text item."""
        return [types.TextContent(type="text", text=version.VERSION)]

def register_tools():
    """Instantiate the tool implementation and register its tools globally."""
    impl = _ToolImpl()
    tools.auto_register_tools([impl.version])
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS uv

WORKDIR /app

ENV UV_COMPILE_BYTECODE=1
ENV UV_LINK_MODE=copy

RUN --mount=type=cache,target=/root/.cache/uv \
    --mount=type=bind,source=uv.lock,target=uv.lock \
    --mount=type=bind,source=pyproject.toml,target=pyproject.toml \
    uv sync --frozen --no-install-project --no-dev --no-editable

ADD . /app
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --frozen --no-dev --no-editable

FROM python:3.12-slim-bookworm

WORKDIR /app

#COPY --from=uv /root/.local /root/.local
COPY --from=uv --chown=app:app /app/.venv /app/.venv

# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"

# when running the container, add --db-path and a bind mount to the host's db file
ENTRYPOINT ["qiniu-mcp-server"]
```

--------------------------------------------------------------------------------
/chatmcp.yaml:
--------------------------------------------------------------------------------

```yaml
params:
  type: object
  properties:
    QINIU_ACCESS_KEY:
      type: string
      description: The access key for your Qiniu account.
    QINIU_SECRET_KEY:
      type: string
      description: The secret key for your Qiniu account.
    QINIU_REGION_NAME:
      type: string
      description: The region name for your config of Qiniu buckets.
    QINIU_ENDPOINT_URL:
      type: string
      description: The endpoint URL for your config of Qiniu buckets. eg:https://s3.your_region.qiniucs.com.
    QINIU_BUCKETS:
      type: string
      description: The buckets of Qiniu, If there are multiple extra items, separate them with commas. eg:bucket1,bucket2.
  required:
    - QINIU_ACCESS_KEY
    - QINIU_SECRET_KEY
    - QINIU_REGION_NAME
    - QINIU_ENDPOINT_URL
    - QINIU_BUCKETS

uvx:
  command:
    | uvx qiniu-mcp-server
  config:
    | {
        "mcpServers": {
          "qiniu-mcp-server": {
            "command": "uvx",
            "args": [
              "qiniu-mcp-server"
            ],
            "env": {
              "QINIU_ACCESS_KEY": "YOUR QINIU ACCESS KEY",
              "QINIU_SECRET_KEY": "YOUR QINIU SECRET KEY",
              "QINIU_REGION_NAME": "YOUR QINIU REGION NAME",
              "QINIU_ENDPOINT_URL": "YOUR QINIU ENDPOINT URL"
            }
          }
        }
      }
```

--------------------------------------------------------------------------------
/src/mcp_server/application.py:
--------------------------------------------------------------------------------

```python
import logging
from contextlib import aclosing

import mcp.types as types
from mcp.types import EmptyResult

from mcp import LoggingLevel
from mcp.server.lowlevel import Server
from mcp.types import Tool, AnyUrl

from . import core
from .consts import consts
from .resource import resource
from .tools import tools


logger = logging.getLogger(consts.LOGGER_NAME)

# Load the core feature modules (see core/__init__) before the Server is
# used, so their tools and resource providers are registered up front.
core.load()
server = Server("qiniu-mcp-server")


@server.set_logging_level()
async def set_logging_level(level: LoggingLevel) -> EmptyResult:
    """Adjust the package logger's level at the client's request.

    Bug fix: MCP logging levels arrive lowercase (e.g. "warning"), but
    Python's logging module only accepts upper-case level names, so the
    original ``logger.setLevel(level.lower())`` always raised ValueError.
    MCP-only levels ("notice", "alert", "emergency") are mapped to the
    closest stdlib equivalent.
    """
    level_map = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "notice": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
        "alert": logging.CRITICAL,
        "emergency": logging.CRITICAL,
    }
    logger.setLevel(level_map.get(level.lower(), logging.INFO))
    await server.request_context.session.send_log_message(
        level="warning", data=f"Log level set to {level}", logger=consts.LOGGER_NAME
    )
    return EmptyResult()


@server.list_resources()
async def list_resources(**kwargs) -> list[types.Resource]:
    """Collect resources from every registered provider into one list."""
    async with aclosing(resource.list_resources(**kwargs)) as results:
        return [item async for item in results]


@server.read_resource()
async def read_resource(uri: AnyUrl) -> str:
    """Fetch the content of a single resource, dispatched by URI scheme."""
    content = await resource.read_resource(uri)
    return content


@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    """Advertise every registered tool's metadata to the client."""
    return list(tools.all_tools())


@server.call_tool()
async def call_tool(name: str, arguments: dict):
    """Forward a tool invocation to the tool registry."""
    result = await tools.call_tool(name, arguments)
    return result

```

--------------------------------------------------------------------------------
/src/mcp_server/core/media_processing/processing.py:
--------------------------------------------------------------------------------

```python
import qiniu

from ...config import config


class MediaProcessingService:
    """Wraps the Qiniu Dora persistent-processing (pfop) submit/status APIs."""

    def __init__(self, cfg: config.Config):
        self.cfg = cfg
        # Signing credentials for the processing API.
        self.auth = qiniu.Auth(cfg.access_key, cfg.secret_key)

    def execute_fop(
        self,
        bucket: str,
        key: str,
        fops: str = None,
        persistent_type: int = None,
        workflow_template_id: str = None,
        pipeline: str = None,
        notify_url: str = None,
    ) -> dict:
        """
        Submit a persistent (asynchronous) processing job.

        :param bucket: bucket containing the source object
        :param key: key of the object to process
        :param fops: processing command string
        :param persistent_type: persistence type flag, passed through to the API
        :param workflow_template_id: id of a pre-defined workflow template
        :param pipeline: dedicated processing queue name
        :param notify_url: callback URL notified when the job finishes
        :return: dict; the job id is available under the "persistentId" key
        """

        persistent_fop = qiniu.PersistentFop(
            auth=self.auth, bucket=bucket, pipeline=pipeline, notify_url=notify_url
        )
        # NOTE(review): `info` (HTTP response details) is discarded, so API
        # failures surface only through the returned result — confirm callers cope.
        result, info = persistent_fop.execute(
            key=key,
            fops=fops,
            persistent_type=persistent_type,
            workflow_template_id=workflow_template_id,
        )
        return result

    def get_fop_status(self, persistent_id: str) -> dict:
        """
        Query the status of a persistent processing job.

        :param persistent_id: job id returned by execute_fop
        :return: dict describing the job status; see
            https://developer.qiniu.com/dora/1294/persistent-processing-status-query-prefop
        """
        persistent_fop = qiniu.PersistentFop(auth=self.auth, bucket="")
        result, info = persistent_fop.get_status(persistent_id=persistent_id)
        return result

```

--------------------------------------------------------------------------------
/src/mcp_server/resource/resource.py:
--------------------------------------------------------------------------------

```python
import logging
from abc import abstractmethod
from typing import Dict, AsyncGenerator, Iterable

from mcp import types
from mcp.server.lowlevel.helper_types import ReadResourceContents

from ..consts import consts

logger = logging.getLogger(consts.LOGGER_NAME)

ResourceContents = str | bytes | Iterable[ReadResourceContents]

class ResourceProvider:
    """Base class for scheme-specific resource providers (e.g. "s3").

    NOTE(review): this class does not inherit abc.ABC, so @abstractmethod
    is not actually enforced at instantiation time.
    """

    def __init__(self, scheme: str):
        # URI scheme this provider handles; used as the registry key.
        self.scheme = scheme

    @abstractmethod
    async def list_resources(self, **kwargs) -> list[types.Resource]:
        """Return all resources this provider can expose."""
        pass

    @abstractmethod
    async def read_resource(self, uri: types.AnyUrl, **kwargs) -> ResourceContents:
        """Read and return the content addressed by *uri*."""
        pass


_all_resource_providers: Dict[str, ResourceProvider] = {}


async def list_resources(**kwargs) -> AsyncGenerator[types.Resource, None]:
    """Yield resources from every registered provider, in registration order."""
    if not _all_resource_providers:
        return

    for provider in _all_resource_providers.values():
        for item in await provider.list_resources(**kwargs):
            yield item


async def read_resource(uri: types.AnyUrl, **kwargs) -> ResourceContents:
    """Dispatch a read to the provider registered for the URI's scheme.

    Returns an empty string when no providers are registered at all.

    Bug fix: the original did not handle a scheme with no registered
    provider — ``_all_resource_providers.get`` returned None and the call
    crashed with AttributeError; raise a clear ValueError instead.

    :raises ValueError: when no provider is registered for uri.scheme.
    """
    if len(_all_resource_providers) == 0:
        return ""

    provider = _all_resource_providers.get(uri.scheme)
    if provider is None:
        raise ValueError(f"No resource provider registered for scheme {uri.scheme}")
    return await provider.read_resource(uri=uri, **kwargs)


def register_resource_provider(provider: ResourceProvider):
    """Register a provider under its scheme; duplicate schemes are rejected."""
    scheme = provider.scheme
    if scheme in _all_resource_providers:
        raise ValueError(f"Resource Provider {scheme} already registered")
    _all_resource_providers[scheme] = provider


__all__ = [
    "ResourceContents",
    "ResourceProvider",
    "list_resources",
    "read_resource",
    "register_resource_provider",
]

```

--------------------------------------------------------------------------------
/src/mcp_server/server.py:
--------------------------------------------------------------------------------

```python
import asyncio
import logging

import anyio
import click

from . import application
from .consts import consts

logger = logging.getLogger(consts.LOGGER_NAME)
logger.info("Starting MCP server")

# NOTE(review): not referenced anywhere in this module; possibly dead code
# left over from an earlier example — confirm before removing.
SAMPLE_RESOURCES = {
    "greeting": "Hello! This is a MCP Server for Qiniu.",
    "help": "This server provides a few resources and tools for Qiniu.",
    "about": "This is the MCP server implementation.",
}


@click.command()
@click.option("--port", default=8000, help="Port to listen on for SSE")
@click.option(
    "--transport",
    type=click.Choice(["stdio", "sse"]),
    default="stdio",
    help="Transport type",
)
def main(port: int, transport: str) -> int:
    """CLI entry point: run the MCP server over stdio (default) or SSE.

    :param port: TCP port for the SSE transport (ignored for stdio)
    :param transport: "stdio" or "sse"
    :return: process exit code (always 0)
    """
    app = application.server

    if transport == "sse":
        # Transport-specific dependencies are imported lazily so the stdio
        # path does not require starlette/uvicorn at import time.
        from mcp.server.sse import SseServerTransport
        from starlette.applications import Starlette
        from starlette.routing import Mount, Route

        sse = SseServerTransport("/messages/")

        async def handle_sse(request):
            # NOTE(review): request._send is a private Starlette attribute;
            # this mirrors the upstream MCP SSE example but may break on
            # starlette upgrades — confirm against the mcp SDK docs.
            async with sse.connect_sse(
                request.scope, request.receive, request._send
            ) as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )

        starlette_app = Starlette(
            debug=True,
            routes=[
                Route("/sse", endpoint=handle_sse),
                Mount("/messages/", app=sse.handle_post_message),
            ],
        )

        import uvicorn

        uvicorn.run(starlette_app, host="0.0.0.0", port=port)
    else:
        from mcp.server.stdio import stdio_server

        async def arun():
            # Serve over stdin/stdout until the client disconnects.
            async with stdio_server() as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )

        anyio.run(arun)

    return 0


if __name__ == "__main__":
    # Bug fix: `main` is a synchronous click command, not a coroutine —
    # `asyncio.run(main())` raised "a coroutine was expected" before the
    # server ever started. Invoke the command directly instead.
    main()

```

--------------------------------------------------------------------------------
/src/mcp_server/config/config.py:
--------------------------------------------------------------------------------

```python
import logging
import os
from typing import List
from attr import dataclass
from dotenv import load_dotenv

from ..consts import consts

_CONFIG_ENV_KEY_ACCESS_KEY = "QINIU_ACCESS_KEY"
_CONFIG_ENV_KEY_SECRET_KEY = "QINIU_SECRET_KEY"
_CONFIG_ENV_KEY_ENDPOINT_URL = "QINIU_ENDPOINT_URL"
_CONFIG_ENV_KEY_REGION_NAME = "QINIU_REGION_NAME"
_CONFIG_ENV_KEY_BUCKETS = "QINIU_BUCKETS"

logger = logging.getLogger(consts.LOGGER_NAME)

# Load environment variables at package initialization
load_dotenv()


@dataclass
class Config:
    """Runtime configuration loaded from QINIU_* environment variables."""

    access_key: str    # Qiniu access key (credential)
    secret_key: str    # Qiniu secret key (credential)
    endpoint_url: str  # S3-compatible endpoint, e.g. https://s3.<region>.qiniucs.com
    region_name: str   # region name matching the endpoint
    buckets: List[str]  # configured bucket names; empty list if none set


def load_config() -> Config:
    """Build a Config from environment variables.

    Missing values are replaced with "YOUR_QINIU_*" placeholders so the
    server still starts (and surfaces an obvious misconfiguration) instead
    of crashing on None.

    Bug fix: the original fallback checks all tested
    ``len(config.access_key)`` (copy-paste), so e.g. an empty secret_key
    with a non-empty access_key was never replaced. ``not value`` already
    covers both None and the empty string.
    """
    config = Config(
        access_key=os.getenv(_CONFIG_ENV_KEY_ACCESS_KEY),
        secret_key=os.getenv(_CONFIG_ENV_KEY_SECRET_KEY),
        endpoint_url=os.getenv(_CONFIG_ENV_KEY_ENDPOINT_URL),
        region_name=os.getenv(_CONFIG_ENV_KEY_REGION_NAME),
        buckets=_get_configured_buckets_from_env(),
    )

    if not config.access_key:
        config.access_key = "YOUR_QINIU_ACCESS_KEY"
    if not config.secret_key:
        config.secret_key = "YOUR_QINIU_SECRET_KEY"
    if not config.endpoint_url:
        config.endpoint_url = "YOUR_QINIU_ENDPOINT_URL"
    if not config.region_name:
        config.region_name = "YOUR_QINIU_REGION_NAME"

    # NOTE(review): access_key is a credential; consider masking it in logs.
    logger.info(f"Configured   access_key: {config.access_key}")
    logger.info(f"Configured endpoint_url: {config.endpoint_url}")
    logger.info(f"Configured  region_name: {config.region_name}")
    logger.info(f"Configured      buckets: {config.buckets}")
    return config


def _get_configured_buckets_from_env() -> List[str]:
    """Parse the comma-separated QINIU_BUCKETS env var into a list of names.

    Robustness fix: empty segments (from "a,,b", a trailing comma, or a
    whitespace-only entry) are dropped instead of producing "" entries.
    """
    bucket_list = os.getenv(_CONFIG_ENV_KEY_BUCKETS)
    if not bucket_list:
        return []
    return [name.strip() for name in bucket_list.split(",") if name.strip()]

```

--------------------------------------------------------------------------------
/src/mcp_server/core/cdn/cdn.py:
--------------------------------------------------------------------------------

```python
import logging

from qiniu import CdnManager, Auth
from qiniu.http import ResponseInfo
from typing import List, Optional, Dict
from pydantic import BaseModel
from dataclasses import dataclass

from ...consts import consts
from ...config import config

logger = logging.getLogger(consts.LOGGER_NAME)


class PrefetchUrlsResult(BaseModel):
    """Deserialized response of the CDN prefetch API.

    Fix: dropped the stacked ``@dataclass`` decorator — combining
    dataclasses.dataclass with pydantic.BaseModel generates conflicting
    __init__/__eq__ machinery, and BaseModel already provides all of it.
    """

    code: Optional[int] = None
    error: Optional[str] = None
    requestId: Optional[str] = None
    invalidUrls: Optional[List[str]] = None
    quotaDay: Optional[int] = None
    surplusDay: Optional[int] = None


class RefreshResult(BaseModel):
    """Deserialized response of the CDN refresh (URLs/dirs) API.

    Fix: dropped the stacked ``@dataclass`` decorator — combining
    dataclasses.dataclass with pydantic.BaseModel generates conflicting
    __init__/__eq__ machinery, and BaseModel already provides all of it.
    """

    code: Optional[int] = None
    error: Optional[str] = None
    requestId: Optional[str] = None
    taskIds: Optional[Dict[str, str]] = None
    invalidUrls: Optional[List[str]] = None
    invalidDirs: Optional[List[str]] = None
    urlQuotaDay: Optional[int] = None
    urlSurplusDay: Optional[int] = None
    dirQuotaDay: Optional[int] = None
    dirSurplusDay: Optional[int] = None


def _raise_if_resp_error(resp: ResponseInfo):
    """Raise RuntimeError when the Qiniu SDK response indicates failure."""
    if not resp.ok():
        raise RuntimeError(f"qiniu response error: {str(resp)}")


class CDNService:
    """Thin wrapper over qiniu's CdnManager for prefetch and refresh calls."""

    def __init__(self, cfg: config.Config):
        # Build an authenticated CDN manager from the configured credentials.
        auth = Auth(access_key=cfg.access_key, secret_key=cfg.secret_key)
        self._cdn_manager = CdnManager(auth)

    def prefetch_urls(self, urls: Optional[List[str]] = None) -> PrefetchUrlsResult:
        """Ask CDN nodes to fetch and cache the given URLs in advance.

        Bug fix: the mutable default argument (``urls=[]``) is replaced
        with None — a shared default list can be mutated across calls.

        :param urls: full URLs to prefetch; must be non-empty
        :raises ValueError: if no URLs are supplied
        :raises RuntimeError: if the Qiniu API responds with an error
        """
        if not urls:
            raise ValueError("urls is empty")
        info, resp = self._cdn_manager.prefetch_urls(urls)
        _raise_if_resp_error(resp)
        return PrefetchUrlsResult.model_validate(info)

    def refresh(
        self,
        urls: Optional[List[str]] = None,
        dirs: Optional[List[str]] = None,
    ) -> RefreshResult:
        """Invalidate cached URLs and/or directory prefixes on CDN nodes.

        :param urls: individual URLs to refresh
        :param dirs: directory prefixes to refresh
        :raises ValueError: if both urls and dirs are empty
        :raises RuntimeError: if the Qiniu API responds with an error
        """
        if not urls and not dirs:
            raise ValueError("urls and dirs cannot be empty")
        # Normalize None to [] for the SDK call (mutable-default fix).
        info, resp = self._cdn_manager.refresh_urls_and_dirs(urls or [], dirs or [])
        _raise_if_resp_error(resp)
        return RefreshResult.model_validate(info)


__all__ = [
    "PrefetchUrlsResult",
    "RefreshResult",
    "CDNService",
]

```

--------------------------------------------------------------------------------
/src/mcp_server/tools/tools.py:
--------------------------------------------------------------------------------

```python
import functools
import inspect
import asyncio
import logging
import fastjsonschema

from typing import List, Dict, Callable, Optional, Union, Awaitable
from dataclasses import dataclass
from mcp import types

from .. import consts

logger = logging.getLogger(consts.LOGGER_NAME)

ToolResult = list[types.TextContent | types.ImageContent | types.EmbeddedResource]
ToolFunc = Callable[..., ToolResult]
AsyncToolFunc = Callable[..., Awaitable[ToolResult]]


@dataclass
class _ToolEntry:
    # Registry record for one tool. Exactly one of func/async_func is set
    # (see register_tool); call_tool dispatches on which one is non-None.
    meta: types.Tool  # tool metadata: name, description, input schema
    func: Optional[ToolFunc]  # synchronous implementation, or None
    async_func: Optional[AsyncToolFunc]  # asynchronous implementation, or None
    input_validator: Optional[Callable[..., None]]  # compiled JSON-schema validator


# 初始化全局工具字典
_all_tools: Dict[str, _ToolEntry] = {}


def all_tools() -> List[types.Tool]:
    """Return metadata for every registered tool; raise if none registered."""
    if not _all_tools:
        raise ValueError("No tools registered")
    return [entry.meta for entry in _all_tools.values()]


def register_tool(
        meta: types.Tool,
        func: Union[ToolFunc, AsyncToolFunc],
) -> None:
    """Register a tool under meta.name; duplicate names are rejected."""
    name = meta.name
    if name in _all_tools:
        raise ValueError(f"Tool {name} already registered")

    # Route the callable into the matching slot (sync vs async) so
    # call_tool can dispatch without re-inspecting the function.
    is_async = inspect.iscoroutinefunction(func)
    _all_tools[name] = _ToolEntry(
        meta=meta,
        func=None if is_async else func,
        async_func=func if is_async else None,
        input_validator=fastjsonschema.compile(meta.inputSchema),
    )


def tool_meta(meta: types.Tool):
    """Decorator that attaches tool metadata to a function as `tool_meta`.

    The function is wrapped (preserving sync/async nature) so the metadata
    lives on the wrapper rather than mutating the original function object.
    """

    def decorator(func):
        if inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                return await func(*args, **kwargs)

        else:

            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)

        wrapper.tool_meta = meta
        return wrapper

    return decorator


def auto_register_tools(func_list: list[Union[ToolFunc, AsyncToolFunc]]):
    """Register every function carrying a tool_meta attribute; reject others."""
    for fn in func_list:
        if not hasattr(fn, "tool_meta"):
            raise ValueError("func must have tool_meta attribute")
        register_tool(meta=getattr(fn, "tool_meta"), func=fn)


async def call_tool(name: str, arguments: dict) -> ToolResult:
    """Validate arguments and execute the named tool.

    :param name: registered tool name
    :param arguments: raw argument dict from the client
    :raises ValueError: unknown tool, or arguments failing the tool's schema
    :raises RuntimeError: any error raised by the tool itself (cause chained)
    """

    # Tool existence check.
    if (tool_entry := _all_tools.get(name)) is None:
        raise ValueError(f"Tool {name} not found")

    # Drop None values before validation, otherwise optional fields
    # explicitly passed as null fail the JSON-schema check.
    arguments = {k: v for k, v in arguments.items() if v is not None}
    try:
        tool_entry.input_validator(arguments)
    except fastjsonschema.JsonSchemaException as e:
        raise ValueError(f"Invalid arguments for tool {name}: {e}")

    try:
        if tool_entry.async_func is not None:
            # Async tools run directly on the event loop.
            return await tool_entry.async_func(**arguments)
        elif tool_entry.func is not None:
            # Sync tools run in the default thread pool so they don't block
            # the event loop. Fix: get_running_loop() replaces the
            # deprecated get_event_loop() inside a coroutine.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                None,  # default executor
                lambda: tool_entry.func(**arguments),
            )
        else:
            raise ValueError(f"Unexpected tool entry: {tool_entry}")
    except Exception as e:
        raise RuntimeError(f"Tool {name} execution error: {str(e)}") from e


# 明确导出接口
__all__ = [
    "all_tools",
    "register_tool",
    "call_tool",
    "tool_meta",
    "auto_register_tools",
]

```

--------------------------------------------------------------------------------
/src/mcp_server/core/storage/resource.py:
--------------------------------------------------------------------------------

```python
import asyncio
import logging
import base64

from mcp import types
from urllib.parse import unquote

from mcp.server.lowlevel.helper_types import ReadResourceContents

from .storage import StorageService
from ...consts import consts
from ...resource import resource
from ...resource.resource import ResourceContents

logger = logging.getLogger(consts.LOGGER_NAME)


class _ResourceProvider(resource.ResourceProvider):
    """Exposes Qiniu buckets/objects as MCP resources under the s3:// scheme."""

    def __init__(self, storage: StorageService):
        super().__init__("s3")
        self.storage = storage

    async def list_resources(
        self, prefix: str = "", max_keys: int = 20, **kwargs
    ) -> list[types.Resource]:
        """
        List S3 buckets and their contents as resources with pagination
        Args:
            prefix: Prefix listing after this bucket name
            max_keys: Returns the maximum number of keys (up to 100), default 20
        """
        resources = []
        logger.debug("Starting to list resources")
        logger.debug(f"Configured buckets: {self.storage.config.buckets}")

        try:
            # Get limited number of buckets
            buckets = await self.storage.list_buckets(prefix)

            # limit concurrent operations
            async def process_bucket(bucket):
                bucket_name = bucket["Name"]
                logger.debug(f"Processing bucket: {bucket_name}")

                try:
                    # List objects in the bucket with a reasonable limit
                    objects = await self.storage.list_objects(
                        bucket_name, max_keys=max_keys
                    )

                    for obj in objects:
                        # Skip "directory" placeholder keys (trailing slash).
                        if "Key" in obj and not obj["Key"].endswith("/"):
                            object_key = obj["Key"]
                            # NOTE(review): every image gets mimeType
                            # image/png regardless of actual format — confirm
                            # whether clients rely on the exact subtype.
                            if self.storage.is_markdown_file(object_key):
                                mime_type = "text/markdown"
                            elif self.storage.is_image_file(object_key):
                                mime_type = "image/png"
                            else:
                                mime_type = "text/plain"

                            resource_entry = types.Resource(
                                uri=f"s3://{bucket_name}/{object_key}",
                                name=object_key,
                                mimeType=mime_type,
                                description=str(obj),
                            )
                            resources.append(resource_entry)
                            logger.debug(f"Added resource: {resource_entry.uri}")

                except Exception as e:
                    # A failing bucket is logged and skipped; others continue.
                    logger.error(
                        f"Error listing objects in bucket {bucket_name}: {str(e)}"
                    )

            # Use semaphore to limit concurrent bucket processing
            semaphore = asyncio.Semaphore(3)  # Limit concurrent bucket processing

            async def process_bucket_with_semaphore(bucket):
                async with semaphore:
                    await process_bucket(bucket)

            # Process buckets concurrently
            await asyncio.gather(
                *[process_bucket_with_semaphore(bucket) for bucket in buckets]
            )

        except Exception as e:
            logger.error(f"Error listing buckets: {str(e)}")
            raise

        logger.info(f"Returning {len(resources)} resources")
        return resources

    async def read_resource(self, uri: types.AnyUrl, **kwargs) -> ResourceContents:
        """
        Read content from an S3 resource and return structured response

        Returns:
            Dict containing 'contents' list with uri, mimeType, and text for each resource
        """
        uri_str = str(uri)
        logger.debug(f"Reading resource: {uri_str}")

        if not uri_str.startswith("s3://"):
            raise ValueError("Invalid S3 URI")

        # Parse the S3 URI
        path = uri_str[5:]  # Remove "s3://"
        path = unquote(path)  # Decode URL-encoded characters
        parts = path.split("/", 1)

        if len(parts) < 2:
            raise ValueError("Invalid S3 URI format")

        bucket = parts[0]
        key = parts[1]

        response = await self.storage.get_object(bucket, key)
        file_content = response["Body"]

        content_type = response.get("ContentType", "application/octet-stream")
        # Image bodies are base64-encoded; other content is returned as-is.
        if content_type.startswith("image/"):
            file_content = base64.b64encode(file_content).decode("utf-8")

        return [ReadResourceContents(mime_type=content_type, content=file_content)]


def register_resource_provider(storage: StorageService):
    """Expose the storage service as the s3:// resource provider."""
    resource.register_resource_provider(_ResourceProvider(storage))

```

--------------------------------------------------------------------------------
/src/mcp_server/core/cdn/tools.py:
--------------------------------------------------------------------------------

```python
from .cdn import CDNService
from ...consts import consts
from ...tools import tools
import logging
from mcp import types
from typing import Optional, List

logger = logging.getLogger(consts.LOGGER_NAME)


def _build_base_list(
    code: Optional[int],
    error: Optional[str],
    request_id: Optional[str],
) -> List[str]:
    rets = []
    if code:
        rets.append(f"Status Code: {code}")
    if error:
        rets.append(f"Message: {error}")
    if request_id:
        rets.append(f"RequestID: {request_id}")
    return rets


class _ToolImpl:
    """MCP tool implementations for CDN prefetch and cache-refresh operations."""

    def __init__(self, cdn: CDNService):
        self._cdn = cdn

    @tools.tool_meta(
        types.Tool(
            name="cdn_prefetch_urls",
            description="Newly added resources are proactively retrieved by the CDN and stored on its cache nodes in advance. Users simply submit the resource URLs, and the CDN automatically triggers the prefetch process.",
            inputSchema={
                "type": "object",
                "additionalProperties": False,
                "properties": {
                    "urls": {
                        "type": "array",
                        "description": "List of individual URLs to prefetch (max 60 items). Must be full URLs with protocol, e.g. 'http://example.com/file.zip'",
                        "items": {
                            "type": "string",
                            "format": "uri",
                            "pattern": "^https?://",
                            "examples": [
                                "https://cdn.example.com/images/photo.jpg",
                                "http://static.example.com/downloads/app.exe",
                            ],
                        },
                        "maxItems": 60,
                        "minItems": 1,
                    }
                },
                "required": ["urls"],
            },
        )
    )
    def prefetch_urls(self, **kwargs) -> list[types.TextContent]:
        """Submit URLs for CDN prefetch and render the API response as text.

        Returns a single TextContent summarising status, invalid URLs and
        (on success) the daily prefetch quota.
        """
        ret = self._cdn.prefetch_urls(**kwargs)

        rets = _build_base_list(ret.code, ret.error, ret.requestId)
        if ret.invalidUrls:
            rets.append(f"Invalid URLs: {ret.invalidUrls}")
        # Only report quota details on a 2xx status. Guard against a missing
        # status code first: `None // 100` would raise TypeError (the helper
        # above already treats `code` as optional).
        if ret.code and ret.code // 100 == 2:
            if ret.quotaDay is not None:
                rets.append(f"Today's prefetch quota: {ret.quotaDay}")
            if ret.surplusDay is not None:
                rets.append(f"Today's remaining quota: {ret.surplusDay}")

        return [
            types.TextContent(
                type="text",
                text="\n".join(rets),
            )
        ]

    @tools.tool_meta(
        types.Tool(
            name="cdn_refresh",
            description="This function marks resources cached on CDN nodes as expired. When users access these resources again, the CDN nodes will fetch the latest version from the origin server and store them anew.",
            inputSchema={
                "type": "object",
                "additionalProperties": False,  # reject properties not declared below
                "properties": {
                    "urls": {
                        "type": "array",
                        "items": {
                            "type": "string",
                            "format": "uri",
                            "pattern": "^https?://",  # must start with http:// or https://
                            "examples": ["http://bar.foo.com/index.html"],
                        },
                        "maxItems": 60,
                        "description": "List of exact URLs to refresh (max 60 items). Must be full URLs with protocol, e.g. 'http://example.com/path/page.html'",
                    },
                    "dirs": {
                        "type": "array",
                        "items": {
                            "type": "string",
                            "pattern": "^https?://.*/(\\*|$)",  # http(s) URL ending with '/' or '/*'
                            "examples": [
                                "http://bar.foo.com/dir/",
                                "http://bar.foo.com/images/*",
                            ],
                        },
                        "maxItems": 10,
                        "description": "List of directory patterns to refresh (max 10 items). Must end with '/' or '/*' to indicate directory scope",
                    },
                }
            },
        )
    )
    def refresh(self, **kwargs) -> list[types.TextContent]:
        """Expire cached URLs/directories on CDN nodes and render the response.

        NOTE: ``ret.taskIds`` is intentionally not surfaced to the user for now.
        """
        ret = self._cdn.refresh(**kwargs)
        rets = _build_base_list(ret.code, ret.error, ret.requestId)
        if ret.invalidUrls:
            rets.append(f"Invalid URLs list: {ret.invalidUrls}")
        if ret.invalidDirs:
            rets.append(f"Invalid dirs: {ret.invalidDirs}")

        # Only report quotas on a 2xx status; guard against a missing status
        # code (`None // 100` would raise TypeError).
        if ret.code and ret.code // 100 == 2:
            if ret.urlQuotaDay is not None:
                rets.append(f"Today's URL refresh quota: {ret.urlQuotaDay}")
            if ret.urlSurplusDay is not None:
                rets.append(f"Today's remaining URL refresh quota: {ret.urlSurplusDay}")
            if ret.dirQuotaDay is not None:
                rets.append(f"Today's directory refresh quota: {ret.dirQuotaDay}")
            if ret.dirSurplusDay is not None:
                rets.append(
                    f"Today's remaining directory refresh quota: {ret.dirSurplusDay}"
                )
        return [
            types.TextContent(
                type="text",
                text="\n".join(rets),
            )
        ]


def register_tools(cdn: CDNService):
    """Register all CDN tools with the global tool registry."""
    impl = _ToolImpl(cdn)
    tools.auto_register_tools([impl.refresh, impl.prefetch_urls])

```

--------------------------------------------------------------------------------
/src/mcp_server/core/storage/storage.py:
--------------------------------------------------------------------------------

```python
import aioboto3
import logging
import qiniu

from typing import List, Dict, Any, Optional
from botocore.config import Config as S3Config

from ...config import config
from ...consts import consts

logger = logging.getLogger(consts.LOGGER_NAME)


class StorageService:
    """Facade over Qiniu object storage.

    Listing and object reads go through the S3-compatible API (via aioboto3);
    uploads, remote-URL fetch and domain lookup use the native Qiniu SDK.
    """

    # The annotation is a forward reference so the class body does not need
    # `config` resolvable at definition time.
    def __init__(self, cfg: "config.Config" = None):
        # Configure boto3 with retries and timeouts.
        self.s3_config = S3Config(
            retries=dict(max_attempts=2, mode="adaptive"),
            connect_timeout=30,
            read_timeout=60,
            max_pool_connections=50,
        )
        self.config = cfg
        self.s3_session = aioboto3.Session()
        self.auth = qiniu.Auth(cfg.access_key, cfg.secret_key)
        self.bucket_manager = qiniu.BucketManager(self.auth, preferred_scheme="https")

    def _s3_client(self, with_config: bool = False):
        """Build an aioboto3 S3 client context manager for the configured endpoint."""
        kwargs = dict(
            aws_access_key_id=self.config.access_key,
            aws_secret_access_key=self.config.secret_key,
            endpoint_url=self.config.endpoint_url,
            region_name=self.config.region_name,
        )
        if with_config:
            kwargs["config"] = self.s3_config
        return self.s3_session.client("s3", **kwargs)

    @staticmethod
    def _upload_policy(bucket: str, key: str, overwrite: bool) -> dict:
        """Build an upload policy: insert-only by default, scoped overwrite on request."""
        if overwrite:
            return {"insertOnly": 0, "scope": f"{bucket}:{key}"}
        return {"insertOnly": 1}

    def get_object_url(
        self, bucket: str, key: str, disable_ssl: bool = False, expires: int = 3600
    ) -> list[dict[str, Any]]:
        """Build download URLs for *key* on every usable domain bound to *bucket*.

        For private buckets each URL is signed with a token valid for *expires*
        seconds.

        Returns:
            A list of dicts with ``object_url`` and ``domain_type``
            ("cdn" or "origin").

        Raises:
            Exception: if the domain or bucket-info lookup fails, or no domain
                is bound to the bucket.
        """
        # Fetch the domains bound to the bucket. This goes through a private
        # SDK helper because there is no public wrapper for /v3/domains.
        domains_getter = getattr(self.bucket_manager, "_BucketManager__uc_do_with_retrier")
        domains_list, domain_response = domains_getter('/v3/domains?tbl={0}'.format(bucket))
        if domain_response.status_code != 200:
            raise Exception(
                f"get bucket domain error:{domain_response.exception} reqId:{domain_response.req_id}"
            )

        if not domains_list:
            raise Exception(
                f"get bucket domain error:domains_list is empty reqId:{domain_response.req_id}"
            )

        http_schema = "http" if disable_ssl else "https"
        object_public_urls = []
        for domain in domains_list:
            # Skip frozen (suspended) domains.
            if domain.get("freeze_types") is not None:
                continue

            domain_url = domain.get("domain")
            if domain_url is None:
                continue

            object_public_urls.append({
                "object_url": f"{http_schema}://{domain_url}/{key}",
                # domaintype 0 (or missing) means a CDN-accelerated domain.
                "domain_type": "cdn" if domain.get("domaintype") is None or domain.get("domaintype") == 0 else "origin"
            })

        object_urls = []
        bucket_info, bucket_info_response = self.bucket_manager.bucket_info(bucket)
        # BUG FIX: this previously re-checked domain_response, silently
        # ignoring bucket_info failures.
        if bucket_info_response.status_code != 200:
            raise Exception(
                f"get bucket domain error:{bucket_info_response.exception} reqId:{bucket_info_response.req_id}"
            )
        if bucket_info["private"] != 0:
            # Private bucket: every URL must carry a signed download token.
            for url_info in object_public_urls:
                public_url = url_info.get("object_url")
                if public_url is None:
                    continue
                url_info["object_url"] = self.auth.private_download_url(public_url, expires=expires)
                object_urls.append(url_info)
        else:
            object_urls.extend(object_public_urls)
        return object_urls

    async def list_buckets(self, prefix: Optional[str] = None) -> List[dict]:
        """Return configured buckets, optionally filtered by name prefix (max 50)."""
        if not self.config.buckets:
            return []

        max_buckets = 50

        async with self._s3_client() as s3:
            # Only expose buckets explicitly present in the configuration.
            response = await s3.list_buckets()
            all_buckets = response.get("Buckets", [])

            configured_bucket_list = [
                bucket
                for bucket in all_buckets
                if bucket["Name"] in self.config.buckets
            ]

            if prefix:
                # BUG FIX: previously compared names with `>` (start-after
                # semantics) although the parameter is documented as a prefix.
                configured_bucket_list = [
                    b for b in configured_bucket_list if b["Name"].startswith(prefix)
                ]

            return configured_bucket_list[:max_buckets]

    async def list_objects(
        self, bucket: str, prefix: str = "", max_keys: int = 20, start_after: str = ""
    ) -> List[dict]:
        """List up to *max_keys* (capped at 100) objects in *bucket* under
        *prefix*, starting after key *start_after*. Returns [] for buckets
        outside the configured allow-list."""
        if self.config.buckets and bucket not in self.config.buckets:
            logger.warning(f"Bucket {bucket} not in configured bucket list")
            return []

        # Tolerate string input from loosely-typed tool callers.
        if isinstance(max_keys, str):
            max_keys = int(max_keys)
        max_keys = min(max_keys, 100)

        async with self._s3_client() as s3:
            response = await s3.list_objects_v2(
                Bucket=bucket,
                Prefix=prefix,
                MaxKeys=max_keys,
                StartAfter=start_after,
            )
            return response.get("Contents", [])

    async def get_object(self, bucket: str, key: str) -> Dict[str, Any]:
        """Download an object, returning the S3 response with ``Body`` replaced
        by the fully-read bytes. Returns {} (no error) for buckets outside the
        configured allow-list."""
        if self.config.buckets and bucket not in self.config.buckets:
            logger.warning(f"Bucket {bucket} not in configured bucket list")
            return {}

        async with self._s3_client(with_config=True) as s3:
            response = await s3.get_object(Bucket=bucket, Key=key)
            stream = response["Body"]

            # Drain the stream chunk by chunk, then substitute the raw bytes
            # so callers never touch the (now-closed) stream object.
            chunks = []
            async for chunk in stream:
                chunks.append(chunk)
            response["Body"] = b"".join(chunks)
            return response

    def upload_text_data(self, bucket: str, key: str, data: str, overwrite: bool = False) -> list[dict[str, Any]]:
        """Upload a UTF-8 text payload to *bucket*/*key* and return its download URLs.

        Raises:
            Exception: if the upload does not return HTTP 200.
        """
        policy = self._upload_policy(bucket, key, overwrite)
        token = self.auth.upload_token(bucket=bucket, key=key, policy=policy)
        ret, info = qiniu.put_data(up_token=token, key=key, data=bytes(data, encoding="utf-8"))
        if info.status_code != 200:
            raise Exception(f"Failed to upload object: {info}")

        return self.get_object_url(bucket, key)

    def upload_local_file(self, bucket: str, key: str, file_path: str, overwrite: bool = False) -> list[dict[str, Any]]:
        """Upload a local file to *bucket*/*key* and return its download URLs.

        Raises:
            Exception: if the upload does not return HTTP 200.
        """
        policy = self._upload_policy(bucket, key, overwrite)
        token = self.auth.upload_token(bucket=bucket, key=key, policy=policy)
        ret, info = qiniu.put_file(up_token=token, key=key, file_path=file_path)
        if info.status_code != 200:
            raise Exception(f"Failed to upload object: {info}")

        return self.get_object_url(bucket, key)

    def fetch_object(self, bucket: str, key: str, url: str):
        """Fetch a remote *url* into *bucket*/*key* server-side and return its
        download URLs.

        Raises:
            Exception: if the fetch does not return HTTP 200.
        """
        ret, info = self.bucket_manager.fetch(url, bucket, key=key)
        if info.status_code != 200:
            raise Exception(f"Failed to fetch object: {info}")

        return self.get_object_url(bucket, key)

    def is_text_file(self, key: str) -> bool:
        """Heuristically classify *key* as text-based by its extension."""
        text_extensions = {
            ".ini",
            ".conf",
            ".py",
            ".js",
            ".xml",
            ".yml",
            ".properties",
            ".txt",
            ".log",
            ".json",
            ".yaml",
            ".md",
            ".csv",
            ".html",
            ".css",
            ".sh",
            ".bash",
            ".cfg",
        }
        return any(key.lower().endswith(ext) for ext in text_extensions)

    def is_image_file(self, key: str) -> bool:
        """Heuristically classify *key* as an image by its extension."""
        image_extensions = {
            ".gif",
            ".png",
            ".jpg",
            ".bmp",
            ".jpeg",
            ".tiff",
            ".webp",
            ".svg",
        }
        return any(key.lower().endswith(ext) for ext in image_extensions)

    def is_markdown_file(self, key: str) -> bool:
        """Return True if *key* has a Markdown extension."""
        return key.lower().endswith(".md")
```

--------------------------------------------------------------------------------
/src/mcp_server/core/storage/tools.py:
--------------------------------------------------------------------------------

```python
import logging
import base64

from mcp import types
from mcp.types import ImageContent, TextContent

from .storage import StorageService
from ...consts import consts
from ...tools import tools

logger = logging.getLogger(consts.LOGGER_NAME)

_BUCKET_DESC = "Qiniu Cloud Storage bucket Name"

class _ToolImpl:
    """MCP tool implementations for Qiniu object-storage operations.

    Each public method is decorated with its MCP tool schema and delegates
    to the underlying StorageService.
    """

    def __init__(self, storage: StorageService):
        # Storage backend every tool call is forwarded to.
        self.storage = storage

    @tools.tool_meta(
        types.Tool(
            name="list_buckets",
            description="Return the Bucket you configured based on the conditions.",
            inputSchema={
                "type": "object",
                "properties": {
                    "prefix": {
                        "type": "string",
                        "description": "Bucket prefix. The listed Buckets will be filtered based on this prefix, and only those matching the prefix will be output.",
                    },
                },
                "required": [],
            },
        )
    )
    async def list_buckets(self, **kwargs) -> list[types.TextContent]:
        """Render the configured (optionally prefix-filtered) bucket list as text."""
        buckets = await self.storage.list_buckets(**kwargs)
        return [types.TextContent(type="text", text=str(buckets))]

    @tools.tool_meta(
        types.Tool(
            name="list_objects",
            description="List objects in Qiniu Cloud, list a part each time, you can set start_after to continue listing, when the number of listed objects is less than max_keys, it means that all files are listed. start_after can be the key of the last file in the previous listing.",
            inputSchema={
                "type": "object",
                "properties": {
                    "bucket": {
                        "type": "string",
                        "description": _BUCKET_DESC,
                    },
                    "max_keys": {
                        "type": "integer",
                        "description": "Sets the max number of keys returned, default: 20",
                    },
                    "prefix": {
                        "type": "string",
                        "description": "Specify the prefix of the operation response key. Only keys that meet this prefix will be listed.",
                    },
                    "start_after": {
                        "type": "string",
                        "description": "start_after is where you want Qiniu Cloud to start listing from. Qiniu Cloud starts listing after this specified key. start_after can be any key in the bucket.",
                    },
                },
                "required": ["bucket"],
            },
        )
    )
    async def list_objects(self, **kwargs) -> list[types.TextContent]:
        """Render one page of the bucket's object listing as text."""
        objects = await self.storage.list_objects(**kwargs)
        return [types.TextContent(type="text", text=str(objects))]

    @tools.tool_meta(
        types.Tool(
            name="get_object",
            description="Get an object contents from Qiniu Cloud bucket. In the GetObject request, specify the full key name for the object.",
            inputSchema={
                "type": "object",
                "properties": {
                    "bucket": {
                        "type": "string",
                        "description": _BUCKET_DESC,
                    },
                    "key": {
                        "type": "string",
                        "description": "Key of the object to get.",
                    },
                },
                "required": ["bucket", "key"],
            },
        )
    )
    async def get_object(self, **kwargs) -> list[ImageContent] | list[TextContent]:
        """Fetch an object and return it as ImageContent (base64) for images,
        or TextContent (UTF-8 decoded) otherwise."""
        response = await self.storage.get_object(**kwargs)
        file_content = response["Body"]
        content_type = response.get("ContentType", "application/octet-stream")

        # Return a different response type depending on the content type.
        if content_type.startswith("image/"):
            base64_data = base64.b64encode(file_content).decode("utf-8")
            return [
                types.ImageContent(
                    type="image", data=base64_data, mimeType=content_type
                )
            ]

        if isinstance(file_content, bytes):
            text_content = file_content.decode("utf-8")
        else:
            text_content = str(file_content)
        return [types.TextContent(type="text", text=text_content)]

    @tools.tool_meta(
        types.Tool(
            name="upload_text_data",
            description="Upload text data to Qiniu bucket.",
            inputSchema={
                "type": "object",
                "properties": {
                    "bucket": {
                        "type": "string",
                        "description": _BUCKET_DESC,
                    },
                    "key": {
                        "type": "string",
                        "description": "The key under which a file is saved in Qiniu Cloud Storage serves as the unique identifier for the file within that space, typically using the filename.",
                    },
                    "data": {
                        "type": "string",
                        "description": "The data to upload.",
                    },
                    "overwrite": {
                        "type": "boolean",
                        "description": "Whether to overwrite the existing object if it already exists.",
                    },
                },
                "required": ["bucket", "key", "data"],
            }
        )
    )
    def upload_text_data(self, **kwargs) -> list[types.TextContent]:
        """Upload text data and render the resulting download URLs as text."""
        urls = self.storage.upload_text_data(**kwargs)
        return [types.TextContent(type="text", text=str(urls))]

    @tools.tool_meta(
        types.Tool(
            name="upload_local_file",
            description="Upload a local file to Qiniu bucket.",
            inputSchema={
                "type": "object",
                "properties": {
                    "bucket": {
                        "type": "string",
                        "description": _BUCKET_DESC,
                    },
                    "key": {
                        "type": "string",
                        "description": "The key under which a file is saved in Qiniu Cloud Storage serves as the unique identifier for the file within that space, typically using the filename.",
                    },
                    "file_path": {
                        "type": "string",
                        "description": "The file path of file to upload.",
                    },
                    "overwrite": {
                        "type": "boolean",
                        "description": "Whether to overwrite the existing object if it already exists.",
                    },
                },
                "required": ["bucket", "key", "file_path"],
            }
        )
    )
    def upload_local_file(self, **kwargs) -> list[types.TextContent]:
        """Upload a local file and render the resulting download URLs as text."""
        urls = self.storage.upload_local_file(**kwargs)
        return [types.TextContent(type="text", text=str(urls))]

    @tools.tool_meta(
        types.Tool(
            name="fetch_object",
            description="Fetch a http object to Qiniu bucket.",
            inputSchema={
                "type": "object",
                "properties": {
                    "bucket": {
                        "type": "string",
                        "description": _BUCKET_DESC,
                    },
                    "key": {
                        "type": "string",
                        "description": "The key under which a file is saved in Qiniu Cloud Storage serves as the unique identifier for the file within that space, typically using the filename.",
                    },
                    "url": {
                        "type": "string",
                        "description": "The URL of the object to fetch.",
                    },
                },
                "required": ["bucket", "key", "url"],
            }
        )
    )
    def fetch_object(self, **kwargs) -> list[types.TextContent]:
        """Server-side fetch a remote URL into the bucket and render its URLs as text."""
        urls = self.storage.fetch_object(**kwargs)
        return [types.TextContent(type="text", text=str(urls))]

    @tools.tool_meta(
        types.Tool(
            name="get_object_url",
            description="Get the file download URL, and note that the Bucket where the file is located must be bound to a domain name. If using Qiniu Cloud test domain, HTTPS access will not be available, and users need to make adjustments for this themselves.",
            inputSchema={
                "type": "object",
                "properties": {
                    "bucket": {
                        "type": "string",
                        "description": _BUCKET_DESC,
                    },
                    "key": {
                        "type": "string",
                        "description": "Key of the object to get.",
                    },
                    "disable_ssl": {
                        "type": "boolean",
                        "description": "Whether to disable SSL. By default, it is not disabled (HTTP protocol is used). If disabled, the HTTP protocol will be used.",
                    },
                    "expires": {
                        "type": "integer",
                        "description": "Token expiration time (in seconds) for download links. When the bucket is private, a signed Token is required to access file objects. Public buckets do not require Token signing.",
                    },
                },
                "required": ["bucket", "key"],
            },
        )
    )
    def get_object_url(self, **kwargs) -> list[types.TextContent]:
        """Render the object's download URL(s) as text."""
        urls = self.storage.get_object_url(**kwargs)
        return [types.TextContent(type="text", text=str(urls))]


def register_tools(storage: StorageService):
    """Register every storage tool with the global tool registry.

    Includes ``fetch_object``, which carries tool metadata but was previously
    omitted from registration (apparent oversight).
    """
    tool_impl = _ToolImpl(storage)
    tools.auto_register_tools(
        [
            tool_impl.list_buckets,
            tool_impl.list_objects,
            tool_impl.get_object,
            tool_impl.upload_text_data,
            tool_impl.upload_local_file,
            tool_impl.fetch_object,
            tool_impl.get_object_url,
        ]
    )

```

--------------------------------------------------------------------------------
/src/mcp_server/core/media_processing/tools.py:
--------------------------------------------------------------------------------

```python
import logging

import qiniu
from mcp import types

from . import utils
from .processing import MediaProcessingService
from ...config import config
from ...consts import consts
from ...tools import tools

logger = logging.getLogger(consts.LOGGER_NAME)

_OBJECT_URL_DESC = "The URL of the image. This can be a URL obtained via the GetObjectURL tool or a URL generated by other Fop tools. Length Constraints: Minimum length of 1."


class _ToolImpl:
    """MCP tool implementations for Qiniu image-processing (Fop) operations.

    Each tool builds a processing URL by appending the appropriate
    ``imageMogr2``/``roundPic``/``imageInfo`` directive to the object URL.
    """

    def __init__(self, cfg: config.Config, cli: MediaProcessingService):
        self.auth = qiniu.Auth(cfg.access_key, cfg.secret_key)
        self.client = cli

    @tools.tool_meta(
        types.Tool(
            name="image_scale_by_percent",
            description="""Image scaling tool that resizes images based on a percentage and returns information about the scaled image.
            The information includes the object_url of the scaled image, which users can directly use for HTTP GET requests to retrieve the image content or open in a browser to view the file.
            The image must be stored in a Qiniu Cloud Bucket.
            Supported original image formats: psd, jpeg, png, gif, webp, tiff, bmp, avif, heic. Image width and height cannot exceed 30,000 pixels, and total pixels cannot exceed 150 million.
            """,
            inputSchema={
                "type": "object",
                "properties": {
                    "object_url": {
                        "type": "string",
                        "description": _OBJECT_URL_DESC
                    },
                    "percent": {
                        "type": "integer",
                        "description": "Scaling percentage, range [1,999]. For example: 90 means the image width and height are reduced to 90% of the original; 200 means the width and height are enlarged to 200% of the original.",
                        "minimum": 1,
                        "maximum": 999
                    },
                },
                "required": ["object_url", "percent"],
            },
        )
    )
    def image_scale_by_percent(
            self, **kwargs
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Scale an image by a percentage and return the processing URL."""
        object_url = kwargs.get("object_url", "")
        percent = kwargs.get("percent", "")
        if object_url is None or len(object_url) == 0:
            return [types.TextContent(type="text", text="object_url is required")]

        percent_int = int(percent)
        if percent_int < 1 or percent_int > 999:
            return [
                types.TextContent(type="text", text="percent must be between 1 and 999")
            ]

        # Use the validated integer so non-canonical inputs (e.g. "090")
        # still produce a clean directive.
        func = f"imageMogr2/thumbnail/!{percent_int}p"
        object_url = utils.url_add_processing_func(auth=self.auth, url=object_url, func=func)
        return [
            types.TextContent(
                type="text",
                text=str(
                    {
                        "object_url": object_url,
                    }
                ),
            )
        ]

    @tools.tool_meta(
        types.Tool(
            name="image_scale_by_size",
            description="""Image scaling tool that resizes images based on a specified width or height and returns information about the scaled image.
            The information includes the object_url of the scaled image, which users can directly use for HTTP GET requests to retrieve the image content or open in a browser to view the file.
            The image must be stored in a Qiniu Cloud Bucket.
            Supported original image formats: psd, jpeg, png, gif, webp, tiff, bmp, avif, heic. Image width and height cannot exceed 30,000 pixels, and total pixels cannot exceed 150 million.
            """,
            inputSchema={
                "type": "object",
                "properties": {
                    "object_url": {
                        "type": "string",
                        "description": _OBJECT_URL_DESC
                    },
                    "width": {
                        "type": "integer",
                        "description": "Specifies the width for image scaling. The image will be scaled to the specified width, and the height will be adjusted proportionally.",
                        "minimum": 1
                    },
                    "height": {
                        "type": "integer",
                        "description": "Specifies the height for image scaling. The image will be scaled to the specified height, and the width will be adjusted proportionally.",
                        "minimum": 1
                    },
                },
                "required": ["object_url"]
            },
        )
    )
    def image_scale_by_size(
            self, **kwargs
    ) -> list[types.TextContent]:
        """Scale an image to a target width and/or height and return the processing URL."""
        object_url = kwargs.get("object_url", "")
        width = kwargs.get("width", "")
        height = kwargs.get("height", "")
        if object_url is None or len(object_url) == 0:
            return [types.TextContent(type="text", text="object_url is required")]

        func = f"{width}x{height}"
        # With both dimensions absent the directive collapses to just "x".
        if len(func) == 1:
            return [
                types.TextContent(
                    type="text", text="At least one width or height must be set"
                )
            ]

        func = f"imageMogr2/thumbnail/{func}"
        object_url = utils.url_add_processing_func(auth=self.auth, url=object_url, func=func)
        return [
            types.TextContent(
                type="text",
                text=str(
                    {
                        "object_url": object_url,
                    }
                ),
            )
        ]

    @tools.tool_meta(
        types.Tool(
            name="image_round_corner",
            description="""Image rounded corner tool that processes images based on width, height, and corner radius, returning information about the processed image.
            If only radius_x or radius_y is set, the other parameter will be assigned the same value, meaning horizontal and vertical parameters will be identical.
            The information includes the object_url of the processed image, which users can directly use for HTTP GET requests to retrieve the image content or open in a browser to view the file.
            The image must be stored in a Qiniu Cloud Bucket.
            Supported original image formats: psd, jpeg, png, gif, webp, tiff, bmp, avif, heic. Image width and height cannot exceed 30,000 pixels, and total pixels cannot exceed 150 million.
            Corner radius supports pixels and percentages, but cannot be negative. Pixels are represented by numbers, e.g., 200 means 200px; percentages use !xp, e.g., !25p means 25%.""",
            inputSchema={
                "type": "object",
                "properties": {
                    "object_url": {
                        "type": "string",
                        "description": _OBJECT_URL_DESC
                    },
                    "radius_x": {
                        "type": "string",
                        "description": "Parameter for horizontal corner size. Can use: pixel values (e.g., 200 for 200px) or percentages (e.g., !25p for 25%), all non-negative values."
                    },
                    "radius_y": {
                        "type": "string",
                        "description": "Parameter for vertical corner size. Can use: pixel values (e.g., 200 for 200px) or percentages (e.g., !25p for 25%), all non-negative values."
                    },
                },
                "required": ["object_url"],
            }
        )
    )
    def image_round_corner(self, **kwargs) -> list[types.TextContent]:
        """Round an image's corners; a missing radius is mirrored from the other axis."""
        object_url = kwargs.get("object_url", "")
        radius_x = kwargs.get("radius_x", "")
        radius_y = kwargs.get("radius_y", "")
        if object_url is None or len(object_url) == 0:
            return [
                types.TextContent(
                    type="text",
                    text="object_url is required"
                )
            ]

        # BUG FIX: the condition previously ended with `is None`, comparing a
        # bool to None, so this validation never fired.
        if (radius_x is None or len(radius_x) == 0) and (radius_y is None or len(radius_y) == 0):
            return [
                types.TextContent(
                    type="text",
                    text="At least one of radius_x or radius_y must be set"
                )
            ]

        # Mirror whichever radius is missing from the one that was provided.
        if radius_x is None or len(radius_x) == 0:
            radius_x = radius_y
        elif radius_y is None or len(radius_y) == 0:
            radius_y = radius_x

        func = f"roundPic/radiusx/{radius_x}/radiusy/{radius_y}"
        object_url = utils.url_add_processing_func(auth=self.auth, url=object_url, func=func)
        return [
            types.TextContent(
                type="text",
                text=str({
                    "object_url": object_url,
                })
            )
        ]

    @tools.tool_meta(
        types.Tool(
            name="image_info",
            description="Retrieves basic image information, including image format, size, and color model.",
            inputSchema={
                "type": "object",
                "properties": {
                    "object_url": {
                        "type": "string",
                        "description": _OBJECT_URL_DESC
                    },
                },
                "required": ["object_url"],
            },
        )
    )
    def image_info(self, **kwargs) -> list[types.TextContent]:
        """Build an URL that returns basic metadata (format, size, color model)."""
        object_url = kwargs.get("object_url", "")
        if object_url is None or len(object_url) == 0:
            return [
                types.TextContent(
                    type="text",
                    text="object_url is required"
                )
            ]

        func = "imageInfo"
        object_url = utils.url_add_processing_func(auth=self.auth, url=object_url, func=func)
        return [
            types.TextContent(
                type="text",
                text=str({
                    "object_url": object_url,
                })
            )
        ]

    @tools.tool_meta(
        types.Tool(
            name="get_fop_status",
            description="Retrieves the execution status of a Fop operation.",
            inputSchema={
                "type": "object",
                "properties": {
                    "persistent_id": {
                        "type": "string",
                        "description": "Operation ID returned from executing a Fop operation",
                    },
                },
                "required": ["persistent_id"],
            },
        )
    )
    def get_fop_status(self, **kwargs) -> list[types.TextContent]:
        """Query the status of a persistent Fop job and render it as text."""
        status = self.client.get_fop_status(**kwargs)
        return [types.TextContent(type="text", text=str(status))]


def register_tools(cfg: config.Config, cli: MediaProcessingService):
    """Register all media-processing tools with the global tool registry.

    Includes ``get_fop_status``, which carries tool metadata but was
    previously omitted from registration (apparent oversight).
    """
    tool_impl = _ToolImpl(cfg, cli)
    tools.auto_register_tools(
        [
            tool_impl.image_scale_by_percent,
            tool_impl.image_scale_by_size,
            tool_impl.image_round_corner,
            tool_impl.image_info,
            tool_impl.get_fop_status,
        ]
    )

```