# Directory Structure
```
├── .env.example
├── .github
│ └── workflows
│ ├── ruff.yml
│ └── workflow.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── chatmcp.yaml
├── Dockerfile
├── docs
│ └── images
│ └── logo.png
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│ └── mcp_server
│ ├── __init__.py
│ ├── application.py
│ ├── config
│ │ ├── __init__.py
│ │ └── config.py
│ ├── consts
│ │ ├── __init__.py
│ │ └── consts.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── cdn
│ │ │ ├── __init__.py
│ │ │ ├── cdn.py
│ │ │ └── tools.py
│ │ ├── media_processing
│ │ │ ├── __init__.py
│ │ │ ├── processing.py
│ │ │ ├── tools.py
│ │ │ └── utils.py
│ │ ├── storage
│ │ │ ├── __init__.py
│ │ │ ├── resource.py
│ │ │ ├── storage.py
│ │ │ └── tools.py
│ │ └── version
│ │ ├── __init__.py
│ │ ├── tools.py
│ │ └── version.py
│ ├── README.md
│ ├── resource
│ │ ├── __init__.py
│ │ └── resource.py
│ ├── server.py
│ └── tools
│ ├── __init__.py
│ └── tools.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.12
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
.env
.env.dora
.env.kodo
src/mcp_server/test.py
```
--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------
```
# S3/Kodo 认证信息
QINIU_ACCESS_KEY=your_access_key
QINIU_SECRET_KEY=your_secret_key
QINIU_REGION_NAME=your_region
QINIU_ENDPOINT_URL=endpoint_url # eg:https://s3.your_region.qiniucs.com
QINIU_BUCKETS=bucket1,bucket2,bucket3
```
--------------------------------------------------------------------------------
/.github/workflows/workflow.yml:
--------------------------------------------------------------------------------
```yaml
```
--------------------------------------------------------------------------------
/src/mcp_server/config/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/mcp_server/consts/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/mcp_server/resource/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/mcp_server/tools/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/mcp_server/core/version/version.py:
--------------------------------------------------------------------------------
```python
# Version string returned by the "version" tool; it matches the ``version``
# field declared in pyproject.toml.
VERSION = '1.2.3'
```
--------------------------------------------------------------------------------
/src/mcp_server/consts/consts.py:
--------------------------------------------------------------------------------
```python
# Name passed to ``logging.getLogger`` by the package's modules so that they
# all share one logger.
LOGGER_NAME = "qiniu-mcp"
```
--------------------------------------------------------------------------------
/src/mcp_server/core/version/__init__.py:
--------------------------------------------------------------------------------
```python
from .tools import register_tools
from ...config import config
def load(cfg: config.Config):
    """Register the version tool.

    :param cfg: server configuration; accepted but not used by this module.
    """
    register_tools()


# ``load`` is the only public name of this package.
__all__ = ["load"]
```
--------------------------------------------------------------------------------
/.github/workflows/ruff.yml:
--------------------------------------------------------------------------------
```yaml
name: Ruff
on: [push, pull_request]
jobs:
ruff:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: astral-sh/ruff-action@v3
```
--------------------------------------------------------------------------------
/src/mcp_server/core/cdn/__init__.py:
--------------------------------------------------------------------------------
```python
from .tools import register_tools
from ...config import config
from .cdn import CDNService
def load(cfg: config.Config):
    """Build the CDN service from ``cfg`` and register its tools.

    :param cfg: server configuration passed to ``CDNService``.
    """
    register_tools(CDNService(cfg))


__all__ = ["load"]
```
--------------------------------------------------------------------------------
/src/mcp_server/core/media_processing/__init__.py:
--------------------------------------------------------------------------------
```python
from . import processing
from .tools import register_tools
from ...config import config
def load(cfg: config.Config):
    """Build the media-processing service from ``cfg`` and register its tools.

    :param cfg: server configuration; passed both to the service and to
        ``register_tools``.
    """
    register_tools(cfg, processing.MediaProcessingService(cfg))


__all__ = ["load"]
```
--------------------------------------------------------------------------------
/src/mcp_server/__init__.py:
--------------------------------------------------------------------------------
```python
import logging
from .consts import consts
from .server import main
# Configure logging
# The root logger is set to ERROR here, so with this configuration alone the
# INFO message below is filtered out.
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(consts.LOGGER_NAME)
logger.info("Initializing MCP server package")
# ``main`` (re-exported from server.py) is the package's only public name.
__all__ = ["main"]
```
--------------------------------------------------------------------------------
/src/mcp_server/core/storage/__init__.py:
--------------------------------------------------------------------------------
```python
from .storage import StorageService
from .tools import register_tools
from .resource import register_resource_provider
from ...config import config
def load(cfg: config.Config):
    """Build the storage service and register its tools and resource provider.

    :param cfg: server configuration passed to ``StorageService``.
    """
    service = StorageService(cfg)
    # Tools first, then the resource provider.
    for register in (register_tools, register_resource_provider):
        register(service)


__all__ = ["load"]
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "qiniu-mcp-server"
version = "1.2.3"
description = "A MCP server project of Qiniu."
requires-python = ">=3.12"
authors = [
{ name = "Qiniu", email = "[email protected]" },
]
keywords = ["qiniu", "mcp", "llm"]
dependencies = [
"aioboto3>=13.2.0",
"fastjsonschema>=2.21.1",
"httpx>=0.28.1",
"mcp[cli]>=1.0.0",
"openai>=1.66.3",
"pip>=25.0.1",
"python-dotenv>=1.0.1",
"qiniu>=7.16.0",
]
[build-system]
requires = [ "hatchling",]
build-backend = "hatchling.build"
[project.scripts]
qiniu-mcp-server = "mcp_server:main"
[tool.hatch.build.targets.wheel]
packages = ["src/mcp_server"]
```
--------------------------------------------------------------------------------
/src/mcp_server/core/version/tools.py:
--------------------------------------------------------------------------------
```python
from mcp import types
from . import version
from ...tools import tools
class _ToolImpl:
    """Holder for the server's ``version`` tool."""

    @tools.tool_meta(
        types.Tool(
            name="version",
            description="qiniu mcp server version info.",
            inputSchema={"type": "object", "required": []},
        )
    )
    def version(self, **kwargs) -> list[types.TextContent]:
        """Return the server version as a single text content item.

        Any keyword arguments are accepted and ignored.
        """
        version_text = types.TextContent(type="text", text=version.VERSION)
        return [version_text]
def register_tools():
    """Register the ``version`` tool with the shared tool registry."""
    tools.auto_register_tools([_ToolImpl().version])
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
# Build stage: install the locked dependencies and the project with uv.
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS uv
WORKDIR /app
ENV UV_COMPILE_BYTECODE=1
ENV UV_LINK_MODE=copy
# First sync installs only the locked dependencies (--no-install-project);
# uv.lock and pyproject.toml are bind-mounted rather than copied.
RUN --mount=type=cache,target=/root/.cache/uv \
    --mount=type=bind,source=uv.lock,target=uv.lock \
    --mount=type=bind,source=pyproject.toml,target=pyproject.toml \
    uv sync --frozen --no-install-project --no-dev --no-editable
# Add the sources, then sync again to install the project itself.
ADD . /app
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --frozen --no-dev --no-editable
# Runtime stage: only the virtual environment is copied from the build stage.
FROM python:3.12-slim-bookworm
WORKDIR /app
#COPY --from=uv /root/.local /root/.local
COPY --from=uv --chown=app:app /app/.venv /app/.venv
# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"
# The server reads its settings from the QINIU_* environment variables;
# supply them when running the container (e.g. `docker run -e QINIU_ACCESS_KEY=...`).
ENTRYPOINT ["qiniu-mcp-server"]
```
--------------------------------------------------------------------------------
/chatmcp.yaml:
--------------------------------------------------------------------------------
```yaml
params:
type: object
properties:
QINIU_ACCESS_KEY:
type: string
description: The access key for your Qiniu account.
QINIU_SECRET_KEY:
type: string
description: The secret key for your Qiniu account.
QINIU_REGION_NAME:
type: string
description: The region name for your config of Qiniu buckets.
QINIU_ENDPOINT_URL:
type: string
description: The endpoint URL for your config of Qiniu buckets. eg:https://s3.your_region.qiniucs.com.
QINIU_BUCKETS:
type: string
description: The buckets of Qiniu, If there are multiple extra items, separate them with commas. eg:bucket1,bucket2.
required:
- QINIU_ACCESS_KEY
- QINIU_SECRET_KEY
- QINIU_REGION_NAME
- QINIU_ENDPOINT_URL
- QINIU_BUCKETS
uvx:
command:
| uvx qiniu-mcp-server
config:
| {
"mcpServers": {
"qiniu-mcp-server": {
"command": "uvx",
"args": [
"qiniu-mcp-server"
],
"env": {
"QINIU_ACCESS_KEY": "YOUR QINIU ACCESS KEY",
"QINIU_SECRET_KEY": "YOUR QINIU SECRET KEY",
"QINIU_REGION_NAME": "YOUR QINIU REGION NAME",
"QINIU_ENDPOINT_URL": "YOUR QINIU ENDPOINT URL"
}
}
}
}
```
--------------------------------------------------------------------------------
/src/mcp_server/application.py:
--------------------------------------------------------------------------------
```python
import logging
from contextlib import aclosing
import mcp.types as types
from mcp.types import EmptyResult
from mcp import LoggingLevel
from mcp.server.lowlevel import Server
from mcp.types import Tool, AnyUrl
from . import core
from .consts import consts
from .resource import resource
from .tools import tools
logger = logging.getLogger(consts.LOGGER_NAME)
# Runs at import time. Presumably this is where the core packages register
# their tools and resource providers -- confirm in core/__init__.py.
core.load()
# Low-level MCP server instance that the handlers below are attached to.
server = Server("qiniu-mcp-server")
@server.set_logging_level()
async def set_logging_level(level: LoggingLevel) -> EmptyResult:
    """Apply the log level requested by the client to the package logger.

    :param level: MCP logging level name (lowercase, syslog style).
    :return: an empty result acknowledging the request.
    """
    # The stdlib ``logging`` module only recognises upper-case level names and
    # has no NOTICE/ALERT/EMERGENCY levels, so map those to the closest stdlib
    # level and upper-case the rest.
    stdlib_equivalents = {
        "notice": "INFO",
        "alert": "CRITICAL",
        "emergency": "CRITICAL",
    }
    requested = str(level).lower()
    logger.setLevel(stdlib_equivalents.get(requested, requested.upper()))
    await server.request_context.session.send_log_message(
        level="warning", data=f"Log level set to {level}", logger=consts.LOGGER_NAME
    )
    return EmptyResult()
@server.list_resources()
async def list_resources(**kwargs) -> list[types.Resource]:
    """Collect every resource yielded by the registered providers into a list.

    Keyword arguments are forwarded unchanged to ``resource.list_resources``.
    """
    # ``aclosing`` guarantees the async generator is closed when we are done.
    async with aclosing(resource.list_resources(**kwargs)) as stream:
        return [item async for item in stream]
@server.read_resource()
async def read_resource(uri: AnyUrl) -> str:
    """Delegate reading ``uri`` to the resource registry and return its result."""
    return await resource.read_resource(uri)
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    """Return the metadata of every tool in the tool registry."""
    return tools.all_tools()
@server.call_tool()
async def call_tool(name: str, arguments: dict):
    """Invoke the registered tool ``name`` with ``arguments`` via the tool registry."""
    return await tools.call_tool(name, arguments)
```
--------------------------------------------------------------------------------
/src/mcp_server/core/media_processing/processing.py:
--------------------------------------------------------------------------------
```python
import qiniu
from ...config import config
class MediaProcessingService:
    """Thin wrapper around ``qiniu.PersistentFop`` for persistent processing."""

    def __init__(self, cfg: config.Config):
        """Keep the configuration and build the Qiniu credentials from it."""
        self.cfg = cfg
        self.auth = qiniu.Auth(cfg.access_key, cfg.secret_key)

    def execute_fop(
        self,
        bucket: str,
        key: str,
        fops: str = None,
        persistent_type: int = None,
        workflow_template_id: str = None,
        pipeline: str = None,
        notify_url: str = None,
    ) -> dict:
        """
        Execute persistent processing on an object.

        :param bucket: passed to ``qiniu.PersistentFop``
        :param key: passed to ``PersistentFop.execute``
        :param fops: passed to ``PersistentFop.execute``
        :param persistent_type: passed to ``PersistentFop.execute``
        :param workflow_template_id: passed to ``PersistentFop.execute``
        :param pipeline: passed to ``qiniu.PersistentFop``
        :param notify_url: passed to ``qiniu.PersistentFop``
        :return: dict; the persistent id is stored under the key ``persistentId``
        """
        fop_client = qiniu.PersistentFop(
            auth=self.auth, bucket=bucket, pipeline=pipeline, notify_url=notify_url
        )
        # The SDK returns (body, response info); only the body is passed on.
        response_body, _ = fop_client.execute(
            key=key,
            fops=fops,
            persistent_type=persistent_type,
            workflow_template_id=workflow_template_id,
        )
        return response_body

    def get_fop_status(self, persistent_id: str) -> dict:
        """
        Query the execution status of a persistent processing (fop) task.

        :param persistent_id: id of the task to look up
        :return: dict
            Status of the persistent processing, see https://developer.qiniu.com/dora/1294/persistent-processing-status-query-prefop
        """
        # An empty bucket name is passed when constructing the client for a status query.
        fop_client = qiniu.PersistentFop(auth=self.auth, bucket="")
        response_body, _ = fop_client.get_status(persistent_id=persistent_id)
        return response_body
```
--------------------------------------------------------------------------------
/src/mcp_server/resource/resource.py:
--------------------------------------------------------------------------------
```python
import logging
from abc import abstractmethod
from typing import Dict, AsyncGenerator, Iterable
from mcp import types
from mcp.server.lowlevel.helper_types import ReadResourceContents
from ..consts import consts
logger = logging.getLogger(consts.LOGGER_NAME)
# What a provider's ``read_resource`` may return: text, raw bytes, or an
# iterable of ``ReadResourceContents`` items.
ResourceContents = str | bytes | Iterable[ReadResourceContents]
class ResourceProvider:
    """Base class for resource providers, keyed by the URI scheme they serve.

    NOTE(review): the methods are marked ``@abstractmethod`` but the class does
    not derive from ``abc.ABC``, so instantiation is not actually prevented.
    """

    def __init__(self, scheme: str):
        self.scheme = scheme

    @abstractmethod
    async def list_resources(self, **kwargs) -> list[types.Resource]:
        """List the resources this provider exposes (to be overridden)."""

    @abstractmethod
    async def read_resource(self, uri: types.AnyUrl, **kwargs) -> ResourceContents:
        """Read the resource identified by ``uri`` (to be overridden)."""
# Registry of resource providers, keyed by the URI scheme each one serves.
_all_resource_providers: Dict[str, ResourceProvider] = {}
async def list_resources(**kwargs) -> AsyncGenerator[types.Resource, None]:
    """Yield the resources of every registered provider, one provider at a time.

    Keyword arguments are forwarded to each provider's ``list_resources``.
    Nothing is yielded when no provider is registered.
    """
    for registered in _all_resource_providers.values():
        for item in await registered.list_resources(**kwargs):
            yield item
async def read_resource(uri: types.AnyUrl, **kwargs) -> ResourceContents:
    """Read ``uri`` through the provider registered for its scheme.

    :param uri: resource URI; its ``scheme`` selects the provider.
    :param kwargs: forwarded to the provider's ``read_resource``.
    :return: the provider's result, or ``""`` when no provider is registered at all.
    :raises ValueError: if providers exist but none handles the URI's scheme.
    """
    if not _all_resource_providers:
        return ""
    provider = _all_resource_providers.get(uri.scheme)
    if provider is None:
        # Fail with a clear message instead of an AttributeError on ``None``.
        raise ValueError(f"No resource provider registered for scheme: {uri.scheme}")
    return await provider.read_resource(uri=uri, **kwargs)
def register_resource_provider(provider: ResourceProvider):
    """Register a resource provider under its scheme; duplicate schemes are rejected."""
    scheme = provider.scheme
    if scheme not in _all_resource_providers:
        _all_resource_providers[scheme] = provider
        return
    raise ValueError(f"Resource Provider {scheme} already registered")
# Public API of this module.
__all__ = [
    "ResourceContents",
    "ResourceProvider",
    "list_resources",
    "read_resource",
    "register_resource_provider",
]
```
--------------------------------------------------------------------------------
/src/mcp_server/server.py:
--------------------------------------------------------------------------------
```python
import asyncio
import logging
import anyio
import click
from . import application
from .consts import consts
logger = logging.getLogger(consts.LOGGER_NAME)
# Logged when this module is imported, i.e. before ``main`` is invoked.
logger.info("Starting MCP server")
# Static sample texts. NOTE(review): not referenced anywhere else in this
# module -- confirm whether it is still needed.
SAMPLE_RESOURCES = {
    "greeting": "Hello! This is a MCP Server for Qiniu.",
    "help": "This server provides a few resources and tools for Qiniu.",
    "about": "This is the MCP server implementation.",
}
@click.command()
@click.option("--port", default=8000, help="Port to listen on for SSE")
@click.option(
    "--transport",
    type=click.Choice(["stdio", "sse"]),
    default="stdio",
    help="Transport type",
)
def main(port: int, transport: str) -> int:
    """Run the MCP server over stdio (default) or SSE.

    :param port: port uvicorn listens on; only used for the ``sse`` transport.
    :param transport: ``"stdio"`` or ``"sse"``.
    :return: ``0`` once the server has stopped.
    """
    app = application.server

    if transport != "sse":
        # Transport-specific modules are imported lazily, only for the chosen mode.
        from mcp.server.stdio import stdio_server

        async def serve_stdio():
            async with stdio_server() as streams:
                reader, writer = streams[0], streams[1]
                await app.run(reader, writer, app.create_initialization_options())

        anyio.run(serve_stdio)
        return 0

    from mcp.server.sse import SseServerTransport
    from starlette.applications import Starlette
    from starlette.routing import Mount, Route

    sse = SseServerTransport("/messages/")

    async def handle_sse(request):
        # One MCP session per SSE connection.
        async with sse.connect_sse(
            request.scope, request.receive, request._send
        ) as streams:
            reader, writer = streams[0], streams[1]
            await app.run(reader, writer, app.create_initialization_options())

    routes = [
        Route("/sse", endpoint=handle_sse),
        Mount("/messages/", app=sse.handle_post_message),
    ]
    starlette_app = Starlette(debug=True, routes=routes)

    import uvicorn

    uvicorn.run(starlette_app, host="0.0.0.0", port=port)
    return 0
if __name__ == "__main__":
    # ``main`` is a synchronous click command that starts its own event loop
    # (anyio.run / uvicorn.run), so it must be called directly; wrapping its
    # return value in asyncio.run() would fail because it is not a coroutine.
    main()
```
--------------------------------------------------------------------------------
/src/mcp_server/config/config.py:
--------------------------------------------------------------------------------
```python
import logging
import os
from typing import List
from attr import dataclass
from dotenv import load_dotenv
from ..consts import consts
# Names of the environment variables the configuration is read from.
_CONFIG_ENV_KEY_ACCESS_KEY = "QINIU_ACCESS_KEY"
_CONFIG_ENV_KEY_SECRET_KEY = "QINIU_SECRET_KEY"
_CONFIG_ENV_KEY_ENDPOINT_URL = "QINIU_ENDPOINT_URL"
_CONFIG_ENV_KEY_REGION_NAME = "QINIU_REGION_NAME"
# Comma-separated list of bucket names.
_CONFIG_ENV_KEY_BUCKETS = "QINIU_BUCKETS"
logger = logging.getLogger(consts.LOGGER_NAME)
# Load environment variables at package initialization
load_dotenv()
@dataclass
class Config:
    """Qiniu connection settings; ``load_config`` fills it from environment variables."""

    # NOTE: ``dataclass`` is imported from ``attr`` in this module, not from
    # the stdlib ``dataclasses`` module.
    access_key: str  # from QINIU_ACCESS_KEY
    secret_key: str  # from QINIU_SECRET_KEY
    endpoint_url: str  # from QINIU_ENDPOINT_URL
    region_name: str  # from QINIU_REGION_NAME
    buckets: List[str]  # parsed from the comma-separated QINIU_BUCKETS
def load_config() -> Config:
    """Build the configuration from environment variables.

    Each missing or empty string setting falls back to its own ``YOUR_QINIU_*``
    placeholder. The secret key is deliberately not logged.

    :return: the populated ``Config``.
    """
    # Each field is checked on its own; the previous fallback conditions all
    # re-tested ``access_key`` by copy-paste.
    config = Config(
        access_key=os.getenv(_CONFIG_ENV_KEY_ACCESS_KEY) or "YOUR_QINIU_ACCESS_KEY",
        secret_key=os.getenv(_CONFIG_ENV_KEY_SECRET_KEY) or "YOUR_QINIU_SECRET_KEY",
        endpoint_url=os.getenv(_CONFIG_ENV_KEY_ENDPOINT_URL) or "YOUR_QINIU_ENDPOINT_URL",
        region_name=os.getenv(_CONFIG_ENV_KEY_REGION_NAME) or "YOUR_QINIU_REGION_NAME",
        buckets=_get_configured_buckets_from_env(),
    )
    logger.info(f"Configured access_key: {config.access_key}")
    logger.info(f"Configured endpoint_url: {config.endpoint_url}")
    logger.info(f"Configured region_name: {config.region_name}")
    logger.info(f"Configured buckets: {config.buckets}")
    return config
def _get_configured_buckets_from_env() -> List[str]:
    """Parse the comma-separated bucket list from the environment.

    Surrounding whitespace is stripped and empty entries (for example from a
    trailing or doubled comma) are dropped.

    :return: the bucket names, or an empty list when the variable is unset or empty.
    """
    raw_value = os.getenv(_CONFIG_ENV_KEY_BUCKETS)
    if not raw_value:
        return []
    return [name for name in (part.strip() for part in raw_value.split(",")) if name]
```
--------------------------------------------------------------------------------
/src/mcp_server/core/cdn/cdn.py:
--------------------------------------------------------------------------------
```python
import logging
from qiniu import CdnManager, Auth
from qiniu.http import ResponseInfo
from typing import List, Optional, Dict
from pydantic import BaseModel
from dataclasses import dataclass
from ...consts import consts
from ...config import config
# Package-wide logger shared with the other modules.
logger = logging.getLogger(consts.LOGGER_NAME)
# NOTE(review): this is a pydantic ``BaseModel`` that is additionally wrapped in
# the stdlib ``@dataclass`` decorator; in this module instances are only built
# through ``model_validate``. Confirm the decorator is intentional.
@dataclass
class PrefetchUrlsResult(BaseModel):
    """Result of ``CDNService.prefetch_urls``, validated from the SDK's response body."""

    code: Optional[int] = None  # status code reported in the response body
    error: Optional[str] = None  # error message, if any
    requestId: Optional[str] = None
    invalidUrls: Optional[List[str]] = None
    quotaDay: Optional[int] = None  # reported by the tool as today's prefetch quota
    surplusDay: Optional[int] = None  # reported by the tool as today's remaining quota
# NOTE(review): this is a pydantic ``BaseModel`` that is additionally wrapped in
# the stdlib ``@dataclass`` decorator; in this module instances are only built
# through ``model_validate``. Confirm the decorator is intentional.
@dataclass
class RefreshResult(BaseModel):
    """Result of ``CDNService.refresh``, validated from the SDK's response body."""

    code: Optional[int] = None  # status code reported in the response body
    error: Optional[str] = None  # error message, if any
    requestId: Optional[str] = None
    taskIds: Optional[Dict[str, str]] = None
    invalidUrls: Optional[List[str]] = None
    invalidDirs: Optional[List[str]] = None
    urlQuotaDay: Optional[int] = None  # reported by the tool as today's URL refresh quota
    urlSurplusDay: Optional[int] = None  # reported as today's remaining URL refresh quota
    dirQuotaDay: Optional[int] = None  # reported as today's directory refresh quota
    dirSurplusDay: Optional[int] = None  # reported as today's remaining directory refresh quota
def _raise_if_resp_error(resp: ResponseInfo):
if resp.ok():
return
raise RuntimeError(f"qiniu response error: {str(resp)}")
class CDNService:
    """CDN operations (prefetch and refresh) backed by ``qiniu.CdnManager``."""

    def __init__(self, cfg: config.Config):
        """Create the CDN manager from the access/secret key in ``cfg``."""
        auth = Auth(access_key=cfg.access_key, secret_key=cfg.secret_key)
        self._cdn_manager = CdnManager(auth)

    def prefetch_urls(self, urls: Optional[List[str]] = None) -> PrefetchUrlsResult:
        """Submit ``urls`` for CDN prefetch.

        :param urls: URLs to prefetch; must be non-empty.
        :return: the validated response body.
        :raises ValueError: if ``urls`` is missing or empty.
        :raises RuntimeError: if the response is not OK.
        """
        # ``None`` default instead of a shared mutable ``[]`` default.
        if not urls:
            raise ValueError("urls is empty")
        info, resp = self._cdn_manager.prefetch_urls(urls)
        _raise_if_resp_error(resp)
        return PrefetchUrlsResult.model_validate(info)

    def refresh(
        self,
        urls: Optional[List[str]] = None,
        dirs: Optional[List[str]] = None,
    ) -> RefreshResult:
        """Refresh CDN caches for the given URLs and/or directories.

        :param urls: URLs to refresh.
        :param dirs: directories to refresh.
        :return: the validated response body.
        :raises ValueError: if both ``urls`` and ``dirs`` are missing or empty.
        :raises RuntimeError: if the response is not OK.
        """
        if not urls and not dirs:
            raise ValueError("urls and dirs cannot be empty")
        # The SDK still receives lists, as it did with the former ``[]`` defaults.
        info, resp = self._cdn_manager.refresh_urls_and_dirs(urls or [], dirs or [])
        _raise_if_resp_error(resp)
        return RefreshResult.model_validate(info)
# Public API of this module.
__all__ = [
    "PrefetchUrlsResult",
    "RefreshResult",
    "CDNService",
]
```
--------------------------------------------------------------------------------
/src/mcp_server/tools/tools.py:
--------------------------------------------------------------------------------
```python
import functools
import inspect
import asyncio
import logging
import fastjsonschema
from typing import List, Dict, Callable, Optional, Union, Awaitable
from dataclasses import dataclass
from mcp import types
from .. import consts
# NOTE(review): ``consts`` is imported here via ``from .. import consts`` while
# the other modules use ``from ..consts import consts``; confirm that
# ``LOGGER_NAME`` resolves as intended through this import.
logger = logging.getLogger(consts.LOGGER_NAME)
# List of content items a tool returns.
ToolResult = list[types.TextContent | types.ImageContent | types.EmbeddedResource]
# Synchronous tool implementation.
ToolFunc = Callable[..., ToolResult]
# Coroutine-based tool implementation.
AsyncToolFunc = Callable[..., Awaitable[ToolResult]]
@dataclass
class _ToolEntry:
    """Registry record for one tool; ``register_tool`` sets exactly one of ``func``/``async_func``."""

    meta: types.Tool  # tool metadata, including its input schema
    func: Optional[ToolFunc]  # set for synchronous tools, otherwise None
    async_func: Optional[AsyncToolFunc]  # set for coroutine tools, otherwise None
    input_validator: Optional[Callable[..., None]]  # compiled from meta.inputSchema
# Global tool registry, keyed by tool name.
_all_tools: Dict[str, _ToolEntry] = {}
def all_tools() -> List[types.Tool]:
    """Return the metadata of every registered tool.

    :raises ValueError: if no tool has been registered.
    """
    if len(_all_tools) == 0:
        raise ValueError("No tools registered")
    return [entry.meta for entry in _all_tools.values()]
def register_tool(
    meta: types.Tool,
    func: Union[ToolFunc, AsyncToolFunc],
) -> None:
    """Register a tool under ``meta.name``; duplicate names are rejected.

    :raises ValueError: if a tool with the same name is already registered.
    """
    name = meta.name
    if name in _all_tools:
        raise ValueError(f"Tool {name} already registered")

    # Coroutine functions go into ``async_func``; everything else into ``func``.
    is_async = inspect.iscoroutinefunction(func)
    _all_tools[name] = _ToolEntry(
        meta=meta,
        func=None if is_async else func,
        async_func=func if is_async else None,
        input_validator=fastjsonschema.compile(meta.inputSchema),
    )
def tool_meta(meta: types.Tool):
    """Decorator factory that attaches ``meta`` to a tool function.

    The decorated function is wrapped (sync or async, matching the original)
    and the wrapper carries ``meta`` in its ``tool_meta`` attribute, which is
    what ``auto_register_tools`` looks for.
    """

    def decorator(func):
        if inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                return await func(*args, **kwargs)

        else:

            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)

        setattr(wrapper, "tool_meta", meta)
        return wrapper

    return decorator
def auto_register_tools(func_list: list[Union[ToolFunc, AsyncToolFunc]]):
    """Register every function in ``func_list`` using its ``tool_meta`` attribute.

    :raises ValueError: if a function does not carry a ``tool_meta`` attribute.
    """
    for candidate in func_list:
        if not hasattr(candidate, "tool_meta"):
            raise ValueError("func must have tool_meta attribute")
        register_tool(meta=getattr(candidate, "tool_meta"), func=candidate)
async def call_tool(name: str, arguments: dict) -> ToolResult:
    """Validate the arguments and execute a registered tool.

    :param name: name of the tool to run.
    :param arguments: tool arguments; ``None`` is treated as no arguments and
        entries whose value is ``None`` are dropped before validation.
    :return: the tool's result.
    :raises ValueError: if the tool is unknown or the arguments fail schema validation.
    :raises RuntimeError: if the tool itself raises; the original error is chained.
    """
    # The tool must exist.
    if (tool_entry := _all_tools.get(name)) is None:
        raise ValueError(f"Tool {name} not found")

    # Drop None values, otherwise schema validation rejects them.
    arguments = {k: v for k, v in (arguments or {}).items() if v is not None}
    try:
        tool_entry.input_validator(arguments)
    except fastjsonschema.JsonSchemaException as e:
        raise ValueError(f"Invalid arguments for tool {name}: {e}") from e

    try:
        if tool_entry.async_func is not None:
            # Coroutine tools are awaited directly.
            return await tool_entry.async_func(**arguments)
        if tool_entry.func is not None:
            # Synchronous tools run in the default thread pool so they do not
            # block the event loop. We are inside a coroutine, so the running
            # loop is the right one to use.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                None, functools.partial(tool_entry.func, **arguments)
            )
        raise ValueError(f"Unexpected tool entry: {tool_entry}")
    except Exception as e:
        raise RuntimeError(f"Tool {name} execution error: {str(e)}") from e
# Explicitly exported public API.
__all__ = [
    "all_tools",
    "register_tool",
    "call_tool",
    "tool_meta",
    "auto_register_tools",
]
```
--------------------------------------------------------------------------------
/src/mcp_server/core/storage/resource.py:
--------------------------------------------------------------------------------
```python
import asyncio
import logging
import base64
from mcp import types
from urllib.parse import unquote
from mcp.server.lowlevel.helper_types import ReadResourceContents
from .storage import StorageService
from ...consts import consts
from ...resource import resource
from ...resource.resource import ResourceContents
# Package-wide logger shared with the other modules.
logger = logging.getLogger(consts.LOGGER_NAME)
class _ResourceProvider(resource.ResourceProvider):
    """Resource provider that exposes bucket objects under the ``s3`` scheme."""

    def __init__(self, storage: StorageService):
        super().__init__("s3")
        self.storage = storage

    def _guess_mime_type(self, object_key: str) -> str:
        """Pick the MIME type advertised for ``object_key`` in resource listings."""
        import mimetypes

        if self.storage.is_markdown_file(object_key):
            return "text/markdown"
        if self.storage.is_image_file(object_key):
            # Advertise the real image type (jpeg, gif, ...) when the extension
            # is known; fall back to the previous fixed value otherwise.
            guessed, _ = mimetypes.guess_type(object_key)
            if guessed and guessed.startswith("image/"):
                return guessed
            return "image/png"
        return "text/plain"

    async def list_resources(
        self, prefix: str = "", max_keys: int = 20, **kwargs
    ) -> list[types.Resource]:
        """
        List the objects of the configured buckets as resources.

        Args:
            prefix: Passed to ``storage.list_buckets`` to select buckets
            max_keys: Maximum number of keys requested per bucket, default 20
                (any upper bound is not enforced in this method)

        Returns:
            One ``types.Resource`` per object whose key does not end with "/".
            A failure to list one bucket is logged and that bucket is skipped;
            a failure to list the buckets themselves is logged and re-raised.
        """
        resources = []
        logger.debug("Starting to list resources")
        logger.debug(f"Configured buckets: {self.storage.config.buckets}")
        try:
            buckets = await self.storage.list_buckets(prefix)

            async def process_bucket(bucket):
                bucket_name = bucket["Name"]
                logger.debug(f"Processing bucket: {bucket_name}")
                try:
                    # List objects in the bucket with a reasonable limit
                    objects = await self.storage.list_objects(
                        bucket_name, max_keys=max_keys
                    )
                    for obj in objects:
                        # Skip entries without a key and "directory" placeholders.
                        if "Key" not in obj or obj["Key"].endswith("/"):
                            continue
                        object_key = obj["Key"]
                        resource_entry = types.Resource(
                            uri=f"s3://{bucket_name}/{object_key}",
                            name=object_key,
                            mimeType=self._guess_mime_type(object_key),
                            description=str(obj),
                        )
                        resources.append(resource_entry)
                        logger.debug(f"Added resource: {resource_entry.uri}")
                except Exception as e:
                    logger.error(
                        f"Error listing objects in bucket {bucket_name}: {str(e)}"
                    )

            # At most three buckets are processed at the same time.
            semaphore = asyncio.Semaphore(3)

            async def process_bucket_with_semaphore(bucket):
                async with semaphore:
                    await process_bucket(bucket)

            # Process buckets concurrently
            await asyncio.gather(
                *[process_bucket_with_semaphore(bucket) for bucket in buckets]
            )
        except Exception as e:
            logger.error(f"Error listing buckets: {str(e)}")
            raise
        logger.info(f"Returning {len(resources)} resources")
        return resources

    async def read_resource(self, uri: types.AnyUrl, **kwargs) -> ResourceContents:
        """
        Read the object addressed by an ``s3://bucket/key`` URI.

        Returns:
            A single-item list of ``ReadResourceContents`` carrying the object's
            content type and body; image bodies are base64-encoded to text.

        Raises:
            ValueError: if the URI does not start with ``s3://`` or has no key part.
        """
        uri_str = str(uri)
        logger.debug(f"Reading resource: {uri_str}")
        if not uri_str.startswith("s3://"):
            raise ValueError("Invalid S3 URI")
        # Parse the S3 URI
        path = uri_str[5:]  # Remove "s3://"
        path = unquote(path)  # Decode URL-encoded characters
        parts = path.split("/", 1)
        if len(parts) < 2:
            raise ValueError("Invalid S3 URI format")
        bucket = parts[0]
        key = parts[1]
        response = await self.storage.get_object(bucket, key)
        file_content = response["Body"]
        content_type = response.get("ContentType", "application/octet-stream")
        # The response depends on the content type: images are base64-encoded.
        if content_type.startswith("image/"):
            file_content = base64.b64encode(file_content).decode("utf-8")
        return [ReadResourceContents(mime_type=content_type, content=file_content)]
def register_resource_provider(storage: StorageService):
    """Wrap ``storage`` in the ``s3`` resource provider and register it."""
    resource.register_resource_provider(_ResourceProvider(storage))
```
--------------------------------------------------------------------------------
/src/mcp_server/core/cdn/tools.py:
--------------------------------------------------------------------------------
```python
from .cdn import CDNService
from ...consts import consts
from ...tools import tools
import logging
from mcp import types
from typing import Optional, List
# Package-wide logger shared with the other modules.
logger = logging.getLogger(consts.LOGGER_NAME)
def _build_base_list(
code: Optional[int],
error: Optional[str],
request_id: Optional[str],
) -> List[str]:
rets = []
if code:
rets.append(f"Status Code: {code}")
if error:
rets.append(f"Message: {error}")
if request_id:
rets.append(f"RequestID: {request_id}")
return rets
class _ToolImpl:
    """CDN tools (prefetch and refresh) exposed through the tool registry."""

    def __init__(self, cdn: CDNService):
        self._cdn = cdn

    @staticmethod
    def _is_success(code: Optional[int]) -> bool:
        """Return True for a 2xx status code; a missing code is not a success."""
        return code is not None and code // 100 == 2

    @tools.tool_meta(
        types.Tool(
            name="cdn_prefetch_urls",
            description="Newly added resources are proactively retrieved by the CDN and stored on its cache nodes in advance. Users simply submit the resource URLs, and the CDN automatically triggers the prefetch process.",
            inputSchema={
                "type": "object",
                "additionalProperties": False,
                "properties": {
                    "urls": {
                        "type": "array",
                        "description": "List of individual URLs to prefetch (max 60 items). Must be full URLs with protocol, e.g. 'http://example.com/file.zip'",
                        "items": {
                            "type": "string",
                            "format": "uri",
                            "pattern": "^https?://",
                            "examples": [
                                "https://cdn.example.com/images/photo.jpg",
                                "http://static.example.com/downloads/app.exe",
                            ],
                        },
                        "maxItems": 60,
                        "minItems": 1,
                    }
                },
                "required": ["urls"],
            },
        )
    )
    def prefetch_urls(self, **kwargs) -> list[types.TextContent]:
        """Prefetch the given URLs and report the outcome as text.

        Keyword arguments are forwarded to ``CDNService.prefetch_urls``.
        Quota lines are only included when the status code is 2xx.
        """
        ret = self._cdn.prefetch_urls(**kwargs)
        rets = _build_base_list(ret.code, ret.error, ret.requestId)
        if ret.invalidUrls:
            rets.append(f"Invalid URLs: {ret.invalidUrls}")
        # ``code`` is optional in the result model, so guard against None
        # before doing arithmetic on it.
        if self._is_success(ret.code):
            if ret.quotaDay is not None:
                rets.append(f"Today's prefetch quota: {ret.quotaDay}")
            if ret.surplusDay is not None:
                rets.append(f"Today's remaining quota: {ret.surplusDay}")
        return [
            types.TextContent(
                type="text",
                text="\n".join(rets),
            )
        ]

    @tools.tool_meta(
        types.Tool(
            name="cdn_refresh",
            description="This function marks resources cached on CDN nodes as expired. When users access these resources again, the CDN nodes will fetch the latest version from the origin server and store them anew.",
            inputSchema={
                "type": "object",
                "additionalProperties": False,  # undefined properties are not allowed
                "properties": {
                    "urls": {
                        "type": "array",
                        "items": {
                            "type": "string",
                            "format": "uri",
                            "pattern": "^https?://",  # URL must start with http:// or https://
                            "examples": ["http://bar.foo.com/index.html"],
                        },
                        "maxItems": 60,
                        "description": "List of exact URLs to refresh (max 60 items). Must be full URLs with protocol, e.g. 'http://example.com/path/page.html'",
                    },
                    "dirs": {
                        "type": "array",
                        "items": {
                            "type": "string",
                            "pattern": "^https?://.*/(\\*|$)",  # starts with http:// or https:// and ends with / or /*
                            "examples": [
                                "http://bar.foo.com/dir/",
                                "http://bar.foo.com/images/*",
                            ],
                        },
                        "maxItems": 10,
                        "description": "List of directory patterns to refresh (max 10 items). Must end with '/' or '/*' to indicate directory scope",
                    },
                }
            },
        )
    )
    def refresh(self, **kwargs) -> list[types.TextContent]:
        """Refresh the given URLs/directories and report the outcome as text.

        Keyword arguments are forwarded to ``CDNService.refresh``. Quota lines
        are only included when the status code is 2xx. ``taskIds`` from the
        result is intentionally not reported.
        """
        ret = self._cdn.refresh(**kwargs)
        rets = _build_base_list(ret.code, ret.error, ret.requestId)
        if ret.invalidUrls:
            rets.append(f"Invalid URLs list: {ret.invalidUrls}")
        if ret.invalidDirs:
            rets.append(f"Invalid dirs: {ret.invalidDirs}")
        # ``code`` is optional in the result model, so guard against None
        # before doing arithmetic on it.
        if self._is_success(ret.code):
            if ret.urlQuotaDay is not None:
                rets.append(f"Today's URL refresh quota: {ret.urlQuotaDay}")
            if ret.urlSurplusDay is not None:
                rets.append(f"Today's remaining URL refresh quota: {ret.urlSurplusDay}")
            if ret.dirQuotaDay is not None:
                rets.append(f"Today's directory refresh quota: {ret.dirQuotaDay}")
            if ret.dirSurplusDay is not None:
                rets.append(
                    f"Today's remaining directory refresh quota: {ret.dirSurplusDay}"
                )
        return [
            types.TextContent(
                type="text",
                text="\n".join(rets),
            )
        ]
def register_tools(cdn: CDNService):
    """Register the CDN tools (refresh first, then prefetch) for ``cdn``."""
    impl = _ToolImpl(cdn)
    cdn_tools = [impl.refresh, impl.prefetch_urls]
    tools.auto_register_tools(cdn_tools)
```
--------------------------------------------------------------------------------
/src/mcp_server/core/storage/storage.py:
--------------------------------------------------------------------------------
```python
import aioboto3
import logging
import qiniu
from typing import List, Dict, Any, Optional
from botocore.config import Config as S3Config
from ...config import config
from ...consts import consts
# Package-wide logger shared with the other modules.
logger = logging.getLogger(consts.LOGGER_NAME)
class StorageService:
def __init__(self, cfg: config.Config = None):
# Configure boto3 with retries and timeouts
self.s3_config = S3Config(
retries=dict(max_attempts=2, mode="adaptive"),
connect_timeout=30,
read_timeout=60,
max_pool_connections=50,
)
self.config = cfg
self.s3_session = aioboto3.Session()
self.auth = qiniu.Auth(cfg.access_key, cfg.secret_key)
self.bucket_manager = qiniu.BucketManager(self.auth, preferred_scheme="https")
def get_object_url(
    self, bucket: str, key: str, disable_ssl: bool = False, expires: int = 3600
) -> list[dict[str, Any]]:
    """Build download URLs for ``key`` on every usable domain of ``bucket``.

    Args:
        bucket: Bucket whose bound domains are looked up.
        key: Object key appended to each domain.
        disable_ssl: When true the URLs use ``http``; otherwise ``https``.
        expires: Passed as ``expires`` to ``auth.private_download_url`` when
            the bucket is reported as private.

    Returns:
        A list of ``{"object_url": ..., "domain_type": "cdn" | "origin"}``
        dicts, one per domain that is not frozen and has a ``domain`` value.

    Raises:
        Exception: If the domain query or the bucket-info query does not
            return status 200, or if the domain list is empty.
    """
    # Look up the download domains bound to the bucket.
    # NOTE(review): this reaches into a name-mangled private method of the
    # SDK's BucketManager and may break on SDK upgrades.
    domains_getter = getattr(self.bucket_manager, "_BucketManager__uc_do_with_retrier")
    domains_list, domain_response = domains_getter("/v3/domains?tbl={0}".format(bucket))
    if domain_response.status_code != 200:
        raise Exception(
            f"get bucket domain error:{domain_response.exception} reqId:{domain_response.req_id}"
        )
    if not domains_list:
        raise Exception(
            f"get bucket domain error:domains_list is empty reqId:{domain_response.req_id}"
        )

    http_schema = "http" if disable_ssl else "https"
    object_urls = []
    for domain in domains_list:
        # Skip frozen domains.
        if domain.get("freeze_types") is not None:
            continue
        domain_url = domain.get("domain")
        if domain_url is None:
            continue
        domain_type = domain.get("domaintype")
        object_urls.append({
            "object_url": f"{http_schema}://{domain_url}/{key}",
            "domain_type": "cdn" if domain_type is None or domain_type == 0 else "origin",
        })

    bucket_info, bucket_info_response = self.bucket_manager.bucket_info(bucket)
    # Check the bucket-info response itself; the earlier code re-checked the
    # (already validated) domain response here, so failures went unnoticed.
    if bucket_info_response.status_code != 200:
        raise Exception(
            f"get bucket info error:{bucket_info_response.exception} reqId:{bucket_info_response.req_id}"
        )

    if bucket_info["private"] != 0:
        # Private bucket: every URL has to be signed.
        for url_info in object_urls:
            url_info["object_url"] = self.auth.private_download_url(
                url_info["object_url"], expires=expires
            )
    return object_urls
async def list_buckets(self, prefix: Optional[str] = None) -> List[dict]:
    """List the configured buckets, optionally filtered by name prefix.

    Only buckets whose name appears in ``self.config.buckets`` are returned;
    when no buckets are configured the result is empty and no request is
    made.

    Args:
        prefix: When given and non-empty, keep only buckets whose name
            starts with this prefix.

    Returns:
        At most 50 bucket entries as returned by the S3 ``list_buckets``
        call.
    """
    allowed = self.config.buckets
    if not allowed:
        return []

    max_buckets = 50
    async with self.s3_session.client(
        "s3",
        aws_access_key_id=self.config.access_key,
        aws_secret_access_key=self.config.secret_key,
        endpoint_url=self.config.endpoint_url,
        region_name=self.config.region_name,
        # Apply the retry/timeout settings, as get_object does.
        config=self.s3_config,
    ) as s3:
        response = await s3.list_buckets()

    buckets = [
        bucket for bucket in response.get("Buckets", []) if bucket["Name"] in allowed
    ]
    if prefix:
        # A prefix filter must match the start of the name; a plain ">"
        # comparison would also keep every lexicographically later bucket.
        buckets = [b for b in buckets if b["Name"].startswith(prefix)]
    return buckets[:max_buckets]
async def list_objects(
    self, bucket: str, prefix: str = "", max_keys: int = 20, start_after: str = ""
) -> List[dict]:
    """List objects of ``bucket`` through the S3 ``list_objects_v2`` call.

    Args:
        bucket: Bucket to list. When buckets are configured and this one is
            not among them, a warning is logged and ``[]`` is returned.
        prefix: Forwarded as ``Prefix``.
        max_keys: Forwarded as ``MaxKeys``; a string value is converted with
            ``int`` and values above 100 are capped at 100.
        start_after: Forwarded as ``StartAfter``.

    Returns:
        The ``Contents`` list of the response, or ``[]`` when it is absent.
    """
    if self.config.buckets and bucket not in self.config.buckets:
        logger.warning(f"Bucket {bucket} not in configured bucket list")
        return []

    if isinstance(max_keys, str):
        max_keys = int(max_keys)
    max_keys = min(max_keys, 100)

    async with self.s3_session.client(
        "s3",
        aws_access_key_id=self.config.access_key,
        aws_secret_access_key=self.config.secret_key,
        endpoint_url=self.config.endpoint_url,
        region_name=self.config.region_name,
        # Apply the retry/timeout settings, as get_object does.
        config=self.s3_config,
    ) as s3:
        response = await s3.list_objects_v2(
            Bucket=bucket,
            Prefix=prefix,
            MaxKeys=max_keys,
            StartAfter=start_after,
        )
        return response.get("Contents", [])
async def get_object(self, bucket: str, key: str) -> Dict[str, Any]:
    """Fetch an object and return the S3 response with its body fully read.

    When buckets are configured and ``bucket`` is not one of them, a warning
    is logged and an empty dict is returned. Otherwise the ``Body`` stream
    of the ``get_object`` response is drained and replaced by the joined
    bytes before the response dict is returned.
    """
    if self.config.buckets and bucket not in self.config.buckets:
        logger.warning(f"Bucket {bucket} not in configured bucket list")
        return {}

    client_ctx = self.s3_session.client(
        "s3",
        aws_access_key_id=self.config.access_key,
        aws_secret_access_key=self.config.secret_key,
        endpoint_url=self.config.endpoint_url,
        region_name=self.config.region_name,
        config=self.s3_config,
    )
    async with client_ctx as s3:
        response = await s3.get_object(Bucket=bucket, Key=key)
        body_stream = response["Body"]

        # Drain the stream chunk by chunk, then swap the stream for the
        # complete payload.
        pieces = [piece async for piece in body_stream]
        response["Body"] = b"".join(pieces)
        return response
def upload_text_data(self, bucket: str, key: str, data: str, overwrite: bool = False) -> list[dict[str, Any]]:
    """Upload ``data`` as UTF-8 bytes under ``key`` and return its URLs.

    Args:
        bucket: Target bucket.
        key: Object key to store the data under.
        data: Text to upload; it is encoded as UTF-8.
        overwrite: When false the upload policy sets ``insertOnly`` to 1;
            when true it sets ``insertOnly`` to 0 and an explicit
            ``bucket:key`` scope.

    Returns:
        The result of ``self.get_object_url(bucket, key)``.

    Raises:
        Exception: If the upload response status is not 200.
    """
    policy = {"insertOnly": 1}
    if overwrite:
        policy["insertOnly"] = 0
        policy["scope"] = f"{bucket}:{key}"

    token = self.auth.upload_token(bucket=bucket, key=key, policy=policy)
    # Only the response info is needed; the parsed body is ignored.
    _, info = qiniu.put_data(up_token=token, key=key, data=bytes(data, encoding="utf-8"))
    if info.status_code != 200:
        raise Exception(f"Failed to upload object: {info}")

    return self.get_object_url(bucket, key)
def upload_local_file(self, bucket: str, key: str, file_path: str, overwrite: bool = False) -> list[dict[str, Any]]:
    """Upload the file at ``file_path`` under ``key`` and return its URLs.

    Args:
        bucket: Target bucket.
        key: Object key to store the file under.
        file_path: Path of the local file handed to ``qiniu.put_file``.
        overwrite: When false the upload policy sets ``insertOnly`` to 1;
            when true it sets ``insertOnly`` to 0 and an explicit
            ``bucket:key`` scope.

    Returns:
        The result of ``self.get_object_url(bucket, key)``.

    Raises:
        Exception: If the upload response status is not 200.
    """
    policy = {"insertOnly": 1}
    if overwrite:
        policy["insertOnly"] = 0
        policy["scope"] = f"{bucket}:{key}"

    token = self.auth.upload_token(bucket=bucket, key=key, policy=policy)
    # Only the response info is needed; the parsed body is ignored.
    _, info = qiniu.put_file(up_token=token, key=key, file_path=file_path)
    if info.status_code != 200:
        raise Exception(f"Failed to upload object: {info}")

    return self.get_object_url(bucket, key)
def fetch_object(self, bucket: str, key: str, url: str):
    """Have the bucket manager fetch ``url`` into ``bucket`` under ``key``.

    Returns the result of ``self.get_object_url(bucket, key)``; raises
    ``Exception`` when the fetch response status is not 200.
    """
    _, fetch_info = self.bucket_manager.fetch(url, bucket, key=key)
    fetched = fetch_info.status_code == 200
    if not fetched:
        raise Exception(f"Failed to fetch object: {fetch_info}")
    return self.get_object_url(bucket, key)
def is_text_file(self, key: str) -> bool:
    """Return True when ``key`` ends, ignoring case, with a known text extension."""
    text_suffixes = (
        ".ini",
        ".conf",
        ".py",
        ".js",
        ".xml",
        ".yml",
        ".properties",
        ".txt",
        ".log",
        ".json",
        ".yaml",
        ".md",
        ".csv",
        ".html",
        ".css",
        ".sh",
        ".bash",
        ".cfg",
    )
    # str.endswith accepts a tuple and checks every suffix in one call.
    return key.lower().endswith(text_suffixes)
def is_image_file(self, key: str) -> bool:
    """Return True when ``key`` ends, ignoring case, with a known image extension."""
    image_suffixes = (
        ".gif",
        ".png",
        ".jpg",
        ".bmp",
        ".jpeg",
        ".tiff",
        ".webp",
        ".svg",
    )
    lowered = key.lower()
    return lowered.endswith(image_suffixes)
def is_markdown_file(self, key: str) -> bool:
    """Return True when ``key`` ends, ignoring case, with ``.md``."""
    markdown_suffix = ".md"
    return key.lower().endswith(markdown_suffix)
```
--------------------------------------------------------------------------------
/src/mcp_server/core/storage/tools.py:
--------------------------------------------------------------------------------
```python
import logging
import base64
from mcp import types
from mcp.types import ImageContent, TextContent
from .storage import StorageService
from ...consts import consts
from ...tools import tools
# Module-level logger, obtained under the name defined in consts.LOGGER_NAME.
logger = logging.getLogger(consts.LOGGER_NAME)
# Description text reused for the "bucket" property of the tool input schemas in this module.
_BUCKET_DESC = "Qiniu Cloud Storage bucket Name"
class _ToolImpl:
def __init__(self, storage: StorageService):
    """Keep the storage service that the tool handlers of this class delegate to."""
    self.storage = storage
@tools.tool_meta(
    types.Tool(
        name="list_buckets",
        description="Return the Bucket you configured based on the conditions.",
        inputSchema={
            "type": "object",
            "properties": {
                "prefix": {
                    "type": "string",
                    "description": "Bucket prefix. The listed Buckets will be filtered based on this prefix, and only those matching the prefix will be output.",
                },
            },
            "required": [],
        },
    )
)
async def list_buckets(self, **kwargs) -> list[types.TextContent]:
    """Tool handler: forward the arguments to ``storage.list_buckets``.

    The returned bucket list is rendered with ``str`` into a single text
    content item.
    """
    bucket_list = await self.storage.list_buckets(**kwargs)
    rendered = str(bucket_list)
    return [types.TextContent(type="text", text=rendered)]
@tools.tool_meta(
    types.Tool(
        name="list_objects",
        description="List objects in Qiniu Cloud, list a part each time, you can set start_after to continue listing, when the number of listed objects is less than max_keys, it means that all files are listed. start_after can be the key of the last file in the previous listing.",
        inputSchema={
            "type": "object",
            "properties": {
                "bucket": {
                    "type": "string",
                    "description": _BUCKET_DESC,
                },
                "max_keys": {
                    "type": "integer",
                    "description": "Sets the max number of keys returned, default: 20",
                },
                "prefix": {
                    "type": "string",
                    "description": "Specify the prefix of the operation response key. Only keys that meet this prefix will be listed.",
                },
                "start_after": {
                    "type": "string",
                    "description": "start_after is where you want Qiniu Cloud to start listing from. Qiniu Cloud starts listing after this specified key. start_after can be any key in the bucket.",
                },
            },
            "required": ["bucket"],
        },
    )
)
async def list_objects(self, **kwargs) -> list[types.TextContent]:
    """Tool handler: forward the arguments to ``storage.list_objects``.

    The returned object list is rendered with ``str`` into a single text
    content item.
    """
    listing = await self.storage.list_objects(**kwargs)
    rendered = str(listing)
    return [types.TextContent(type="text", text=rendered)]
@tools.tool_meta(
    types.Tool(
        name="get_object",
        description="Get an object contents from Qiniu Cloud bucket. In the GetObject request, specify the full key name for the object.",
        inputSchema={
            "type": "object",
            "properties": {
                "bucket": {
                    "type": "string",
                    "description": _BUCKET_DESC,
                },
                "key": {
                    "type": "string",
                    "description": "Key of the object to get.",
                },
            },
            "required": ["bucket", "key"],
        },
    )
)
async def get_object(self, **kwargs) -> list[ImageContent] | list[TextContent]:
    """Tool handler: fetch an object and return it as image or text content.

    The arguments are forwarded to ``storage.get_object``. A response whose
    ``ContentType`` starts with ``image/`` is returned as base64-encoded
    image content; anything else is returned as text.

    Returns:
        A single-element list with either an ``ImageContent`` or a
        ``TextContent``. When the storage layer returns a response without
        a ``Body`` (it returns an empty dict for a bucket outside the
        configured list), a text message is returned instead of raising.
    """
    response = await self.storage.get_object(**kwargs)
    if "Body" not in response:
        return [
            types.TextContent(
                type="text",
                text=(
                    "No content returned for the requested object; check that "
                    f"bucket {kwargs.get('bucket')} is in the configured bucket list"
                ),
            )
        ]

    file_content = response["Body"]
    content_type = response.get("ContentType", "application/octet-stream")

    # Choose the response shape from the content type.
    if content_type.startswith("image/"):
        base64_data = base64.b64encode(file_content).decode("utf-8")
        return [
            types.ImageContent(
                type="image", data=base64_data, mimeType=content_type
            )
        ]

    if isinstance(file_content, bytes):
        # Non-UTF-8 payloads must not crash the tool call; undecodable
        # bytes are substituted with the replacement character.
        text_content = file_content.decode("utf-8", errors="replace")
    else:
        text_content = str(file_content)
    return [types.TextContent(type="text", text=text_content)]
@tools.tool_meta(
    types.Tool(
        name="upload_text_data",
        description="Upload text data to Qiniu bucket.",
        inputSchema={
            "type": "object",
            "properties": {
                "bucket": {
                    "type": "string",
                    "description": _BUCKET_DESC,
                },
                "key": {
                    "type": "string",
                    "description": "The key under which a file is saved in Qiniu Cloud Storage serves as the unique identifier for the file within that space, typically using the filename.",
                },
                "data": {
                    "type": "string",
                    "description": "The data to upload.",
                },
                "overwrite": {
                    "type": "boolean",
                    "description": "Whether to overwrite the existing object if it already exists.",
                },
            },
            "required": ["bucket", "key", "data"],
        },
    )
)
def upload_text_data(self, **kwargs) -> list[types.TextContent]:
    """Tool handler: forward the arguments to ``storage.upload_text_data``.

    The value it returns is rendered with ``str`` into a single text
    content item.
    """
    uploaded_urls = self.storage.upload_text_data(**kwargs)
    rendered = str(uploaded_urls)
    return [types.TextContent(type="text", text=rendered)]
@tools.tool_meta(
    types.Tool(
        name="upload_local_file",
        description="Upload a local file to Qiniu bucket.",
        inputSchema={
            "type": "object",
            "properties": {
                "bucket": {
                    "type": "string",
                    "description": _BUCKET_DESC,
                },
                "key": {
                    "type": "string",
                    "description": "The key under which a file is saved in Qiniu Cloud Storage serves as the unique identifier for the file within that space, typically using the filename.",
                },
                "file_path": {
                    "type": "string",
                    "description": "The file path of file to upload.",
                },
                "overwrite": {
                    "type": "boolean",
                    "description": "Whether to overwrite the existing object if it already exists.",
                },
            },
            "required": ["bucket", "key", "file_path"],
        },
    )
)
def upload_local_file(self, **kwargs) -> list[types.TextContent]:
    """Tool handler: forward the arguments to ``storage.upload_local_file``.

    The value it returns is rendered with ``str`` into a single text
    content item.
    """
    uploaded_urls = self.storage.upload_local_file(**kwargs)
    rendered = str(uploaded_urls)
    return [types.TextContent(type="text", text=rendered)]
@tools.tool_meta(
    types.Tool(
        name="fetch_object",
        description="Fetch a http object to Qiniu bucket.",
        inputSchema={
            "type": "object",
            "properties": {
                "bucket": {
                    "type": "string",
                    "description": _BUCKET_DESC,
                },
                "key": {
                    "type": "string",
                    "description": "The key under which a file is saved in Qiniu Cloud Storage serves as the unique identifier for the file within that space, typically using the filename.",
                },
                "url": {
                    "type": "string",
                    "description": "The URL of the object to fetch.",
                },
            },
            "required": ["bucket", "key", "url"],
        },
    )
)
def fetch_object(self, **kwargs) -> list[types.TextContent]:
    """Tool handler: forward the arguments to ``storage.fetch_object``.

    The value it returns is rendered with ``str`` into a single text
    content item.
    """
    fetched_urls = self.storage.fetch_object(**kwargs)
    rendered = str(fetched_urls)
    return [types.TextContent(type="text", text=rendered)]
@tools.tool_meta(
    types.Tool(
        name="get_object_url",
        description="Get the file download URL, and note that the Bucket where the file is located must be bound to a domain name. If using Qiniu Cloud test domain, HTTPS access will not be available, and users need to make adjustments for this themselves.",
        inputSchema={
            "type": "object",
            "properties": {
                "bucket": {
                    "type": "string",
                    "description": _BUCKET_DESC,
                },
                "key": {
                    "type": "string",
                    "description": "Key of the object to get.",
                },
                "disable_ssl": {
                    "type": "boolean",
                    # The default (SSL not disabled) yields HTTPS URLs; the
                    # previous text wrongly named HTTP as the default.
                    "description": "Whether to disable SSL. By default, it is not disabled (HTTPS protocol is used). If disabled, the HTTP protocol will be used.",
                },
                "expires": {
                    "type": "integer",
                    "description": "Token expiration time (in seconds) for download links. When the bucket is private, a signed Token is required to access file objects. Public buckets do not require Token signing.",
                },
            },
            "required": ["bucket", "key"],
        },
    )
)
def get_object_url(self, **kwargs) -> list[types.TextContent]:
    """Tool handler: forward the arguments to ``storage.get_object_url``.

    The URL list it returns is rendered with ``str`` into a single text
    content item.
    """
    urls = self.storage.get_object_url(**kwargs)
    return [types.TextContent(type="text", text=str(urls))]
def register_tools(storage: StorageService):
    """Build the storage tool implementation and register its handlers.

    The listed bound methods are handed to ``tools.auto_register_tools``.
    """
    impl = _ToolImpl(storage)
    # NOTE(review): _ToolImpl.fetch_object is defined but is not part of
    # this list - confirm whether leaving it unregistered is intentional.
    handlers = [
        impl.list_buckets,
        impl.list_objects,
        impl.get_object,
        impl.upload_text_data,
        impl.upload_local_file,
        impl.get_object_url,
    ]
    tools.auto_register_tools(handlers)
```
--------------------------------------------------------------------------------
/src/mcp_server/core/media_processing/tools.py:
--------------------------------------------------------------------------------
```python
import logging
import qiniu
from mcp import types
from . import utils
from .processing import MediaProcessingService
from ...config import config
from ...consts import consts
from ...tools import tools
# Module-level logger, obtained under the name defined in consts.LOGGER_NAME.
logger = logging.getLogger(consts.LOGGER_NAME)
# Description text reused for the "object_url" property of the tool input schemas in this module.
_OBJECT_URL_DESC = "The URL of the image. This can be a URL obtained via the GetObjectURL tool or a URL generated by other Fop tools. Length Constraints: Minimum length of 1."
class _ToolImpl:
def __init__(self, cfg: config.Config, cli: MediaProcessingService):
    """Create the Qiniu auth object from ``cfg`` and keep the media processing client."""
    # Auth built from the configured access/secret key pair; passed to
    # utils.url_add_processing_func by the image tools.
    self.auth = qiniu.Auth(cfg.access_key, cfg.secret_key)
    # Media processing service used by get_fop_status.
    self.client = cli
@tools.tool_meta(
    types.Tool(
        name="image_scale_by_percent",
        description="""Image scaling tool that resizes images based on a percentage and returns information about the scaled image.
The information includes the object_url of the scaled image, which users can directly use for HTTP GET requests to retrieve the image content or open in a browser to view the file.
The image must be stored in a Qiniu Cloud Bucket.
Supported original image formats: psd, jpeg, png, gif, webp, tiff, bmp, avif, heic. Image width and height cannot exceed 30,000 pixels, and total pixels cannot exceed 150 million.
""",
        inputSchema={
            "type": "object",
            "properties": {
                "object_url": {
                    "type": "string",
                    "description": _OBJECT_URL_DESC
                },
                "percent": {
                    "type": "integer",
                    "description": "Scaling percentage, range [1,999]. For example: 90 means the image width and height are reduced to 90% of the original; 200 means the width and height are enlarged to 200% of the original.",
                    "minimum": 1,
                    "maximum": 999
                },
            },
            "required": ["object_url", "percent"],
        },
    )
)
def image_scale_by_percent(
    self, **kwargs
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Tool handler: build a URL that scales an image by a percentage.

    Reads ``object_url`` and ``percent`` from the keyword arguments and
    appends an ``imageMogr2/thumbnail/!<percent>p`` processing function to
    the URL through ``utils.url_add_processing_func``.

    Returns:
        A single text content item: either a validation message
        (``object_url`` missing, ``percent`` not an integer, or ``percent``
        outside 1..999) or ``str({"object_url": <processed url>})``.
    """
    object_url = kwargs.get("object_url", "")
    percent = kwargs.get("percent", "")
    if not object_url:
        return [types.TextContent(type="text", text="object_url is required")]

    # A missing or non-numeric percent is reported as a validation message
    # instead of escaping as ValueError/TypeError from int().
    try:
        percent_int = int(percent)
    except (TypeError, ValueError):
        return [
            types.TextContent(
                type="text", text="percent must be an integer between 1 and 999"
            )
        ]
    if percent_int < 1 or percent_int > 999:
        return [
            types.TextContent(type="text", text="percent must be between 1 and 999")
        ]

    # Build the function from the validated integer, not the raw input.
    func = f"imageMogr2/thumbnail/!{percent_int}p"
    object_url = utils.url_add_processing_func(auth=self.auth, url=object_url, func=func)
    return [
        types.TextContent(
            type="text",
            text=str(
                {
                    "object_url": object_url,
                }
            ),
        )
    ]
@tools.tool_meta(
    types.Tool(
        name="image_scale_by_size",
        description="""Image scaling tool that resizes images based on a specified width or height and returns information about the scaled image.
The information includes the object_url of the scaled image, which users can directly use for HTTP GET requests to retrieve the image content or open in a browser to view the file.
The image must be stored in a Qiniu Cloud Bucket.
Supported original image formats: psd, jpeg, png, gif, webp, tiff, bmp, avif, heic. Image width and height cannot exceed 30,000 pixels, and total pixels cannot exceed 150 million.
""",
        inputSchema={
            "type": "object",
            "properties": {
                "object_url": {
                    "type": "string",
                    "description": _OBJECT_URL_DESC
                },
                "width": {
                    "type": "integer",
                    "description": "Specifies the width for image scaling. The image will be scaled to the specified width, and the height will be adjusted proportionally.",
                    "minimum": 1
                },
                "height": {
                    "type": "integer",
                    "description": "Specifies the height for image scaling. The image will be scaled to the specified height, and the width will be adjusted proportionally.",
                    "minimum": 1
                },
            },
            "required": ["object_url"]
        },
    )
)
def image_scale_by_size(
    self, **kwargs
) -> list[types.TextContent]:
    """Tool handler: build a URL that scales an image to a width and/or height.

    Reads ``object_url``, ``width`` and ``height`` from the keyword
    arguments and appends an ``imageMogr2/thumbnail/<width>x<height>``
    processing function to the URL through
    ``utils.url_add_processing_func``. A dimension that is absent or
    ``None`` is left empty in the ``<width>x<height>`` part.

    Returns:
        A single text content item: either a validation message
        (``object_url`` missing, or neither dimension given) or
        ``str({"object_url": <processed url>})``.
    """
    object_url = kwargs.get("object_url", "")
    if not object_url:
        return [types.TextContent(type="text", text="object_url is required")]

    # Treat an explicit null like an omitted value so it is not rendered
    # as the literal text "None" inside the processing function.
    width = kwargs.get("width")
    height = kwargs.get("height")
    width = "" if width is None else width
    height = "" if height is None else height

    size = f"{width}x{height}"
    if size == "x":
        return [
            types.TextContent(
                type="text", text="At least one width or height must be set"
            )
        ]

    func = f"imageMogr2/thumbnail/{size}"
    object_url = utils.url_add_processing_func(auth=self.auth, url=object_url, func=func)
    return [
        types.TextContent(
            type="text",
            text=str(
                {
                    "object_url": object_url,
                }
            ),
        )
    ]
@tools.tool_meta(
    types.Tool(
        name="image_round_corner",
        description="""Image rounded corner tool that processes images based on width, height, and corner radius, returning information about the processed image.
If only radius_x or radius_y is set, the other parameter will be assigned the same value, meaning horizontal and vertical parameters will be identical.
The information includes the object_url of the processed image, which users can directly use for HTTP GET requests to retrieve the image content or open in a browser to view the file.
The image must be stored in a Qiniu Cloud Bucket.
Supported original image formats: psd, jpeg, png, gif, webp, tiff, bmp, avif, heic. Image width and height cannot exceed 30,000 pixels, and total pixels cannot exceed 150 million.
Corner radius supports pixels and percentages, but cannot be negative. Pixels are represented by numbers, e.g., 200 means 200px; percentages use !xp, e.g., !25p means 25%.""",
        inputSchema={
            "type": "object",
            "properties": {
                "object_url": {
                    "type": "string",
                    "description": _OBJECT_URL_DESC
                },
                "radius_x": {
                    "type": "string",
                    "description": "Parameter for horizontal corner size. Can use: pixel values (e.g., 200 for 200px) or percentages (e.g., !25p for 25%), all non-negative values."
                },
                "radius_y": {
                    "type": "string",
                    "description": "Parameter for vertical corner size. Can use: pixel values (e.g., 200 for 200px) or percentages (e.g., !25p for 25%), all non-negative values."
                },
            },
            "required": ["object_url"],
        }
    )
)
def image_round_corner(self, **kwargs) -> list[types.TextContent]:
    """Tool handler: build a URL that rounds the corners of an image.

    Reads ``object_url``, ``radius_x`` and ``radius_y`` from the keyword
    arguments and appends a ``roundPic/radiusx/<x>/radiusy/<y>`` processing
    function to the URL through ``utils.url_add_processing_func``. When only
    one radius is given it is used for both axes.

    Returns:
        A single text content item: either a validation message
        (``object_url`` missing, or no radius given) or
        ``str({"object_url": <processed url>})``.
    """
    object_url = kwargs.get("object_url", "")
    if not object_url:
        return [
            types.TextContent(
                type="text",
                text="object_url is required"
            )
        ]

    # Normalise to strings so a numeric radius (e.g. 200) is accepted and a
    # missing/None radius becomes the empty string.
    radius_x = kwargs.get("radius_x")
    radius_y = kwargs.get("radius_y")
    radius_x = "" if radius_x is None else str(radius_x)
    radius_y = "" if radius_y is None else str(radius_y)

    # The earlier check compared a boolean with None, so it could never
    # fire and an empty "roundPic/radiusx//radiusy/" function was produced.
    if not radius_x and not radius_y:
        return [
            types.TextContent(
                type="text",
                text="At least one of radius_x or radius_y must be set"
            )
        ]

    if not radius_x:
        radius_x = radius_y
    elif not radius_y:
        radius_y = radius_x

    func = f"roundPic/radiusx/{radius_x}/radiusy/{radius_y}"
    object_url = utils.url_add_processing_func(auth=self.auth, url=object_url, func=func)
    return [
        types.TextContent(
            type="text",
            text=str({
                "object_url": object_url,
            })
        )
    ]
@tools.tool_meta(
    types.Tool(
        name="image_info",
        description="Retrieves basic image information, including image format, size, and color model.",
        inputSchema={
            "type": "object",
            "properties": {
                "object_url": {
                    "type": "string",
                    "description": _OBJECT_URL_DESC
                },
            },
            "required": ["object_url"],
        },
    )
)
def image_info(self, **kwargs) -> list[types.TextContent]:
    """Tool handler: build a URL carrying the ``imageInfo`` processing function.

    Returns a single text content item: ``"object_url is required"`` when
    no URL was supplied, otherwise ``str({"object_url": <processed url>})``.
    """
    source_url = kwargs.get("object_url", "")
    if not source_url:
        message = "object_url is required"
        return [types.TextContent(type="text", text=message)]

    info_url = utils.url_add_processing_func(
        auth=self.auth, url=source_url, func="imageInfo"
    )
    payload = {"object_url": info_url}
    return [types.TextContent(type="text", text=str(payload))]
@tools.tool_meta(
    types.Tool(
        name="get_fop_status",
        description="Retrieves the execution status of a Fop operation.",
        inputSchema={
            "type": "object",
            "properties": {
                "persistent_id": {
                    "type": "string",
                    "description": "Operation ID returned from executing a Fop operation",
                },
            },
            "required": ["persistent_id"],
        },
    )
)
def get_fop_status(self, **kwargs) -> list[types.TextContent]:
    """Tool handler: forward the arguments to ``client.get_fop_status``.

    The value it returns is rendered with ``str`` into a single text
    content item.
    """
    fop_status = self.client.get_fop_status(**kwargs)
    rendered = str(fop_status)
    return [types.TextContent(type="text", text=rendered)]
def register_tools(cfg: config.Config, cli: MediaProcessingService):
    """Build the media processing tool implementation and register its handlers.

    The listed bound methods are handed to ``tools.auto_register_tools``.
    """
    impl = _ToolImpl(cfg, cli)
    # NOTE(review): _ToolImpl.get_fop_status is defined but is not part of
    # this list - confirm whether leaving it unregistered is intentional.
    handlers = [
        impl.image_scale_by_percent,
        impl.image_scale_by_size,
        impl.image_round_corner,
        impl.image_info,
    ]
    tools.auto_register_tools(handlers)
```