# Directory Structure

```
├── Dockerfile
├── images
│   ├── cursor_config.png
│   ├── example.png
│   ├── logo_0.png
│   ├── logo_1.png
│   ├── logo_2.png
│   └── logo_3.png
├── LICENSE
├── proxy
│   ├── example.py
│   ├── jimeng
│   │   ├── __init__.py
│   │   ├── chat.py
│   │   ├── core.py
│   │   ├── exceptions.py
│   │   ├── images.py
│   │   ├── requirements.txt
│   │   └── utils.py
│   ├── README.md
│   └── setup.py
├── README.md
├── requirements.txt
├── server.py
└── smithery.yaml
```

# Files

--------------------------------------------------------------------------------
/proxy/jimeng/requirements.txt:
--------------------------------------------------------------------------------

```
requests>=2.31.0 
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
aiohttp>=3.8.1
aiofiles>=0.8.0
requests>=2.26.0
psutil>=5.8.0
fastmcp==0.4.1
mcp==1.2.1
brotlipy>=0.7.0
brotli==1.1.0

```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml

startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    {}
  commandFunction:
    # A JS function that produces the CLI command based on the given config to start the MCP on stdio.
    |-
    (config) => ({ command: 'uv', args: ['run', '--with', 'fastmcp', 'fastmcp', 'run', 'server.py'] })
  exampleConfig: {}

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
FROM python:3.10-slim

WORKDIR /app

# Copy dependency list and install packages
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt \
    && pip install uv

# Copy the rest of the application code
COPY . .

# Expose any needed ports (if applicable)
EXPOSE 8000

# Start the MCP server using uv and fastmcp
CMD ["uv", "run", "--with", "fastmcp", "fastmcp", "run", "server.py"]

```

--------------------------------------------------------------------------------
/proxy/setup.py:
--------------------------------------------------------------------------------

```python
# Author: 凌封 (WeChat: fengin)
# GITHUB: https://github.com/fengin/image-gen-server.git
# Related background: AI全书, https://aibook.ren

from setuptools import setup, find_packages

setup(
    name="jimeng",
    version="0.0.1",
    packages=find_packages(),
    install_requires=[
        "requests>=2.31.0"
    ],
    author="Your Name",
    author_email="[email protected]",
    description="即梦AI Python模块",
    long_description=open("jimeng/README.md", encoding="utf-8").read(),
    long_description_content_type="text/markdown",
    url="https://github.com/yourusername/jimeng",
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
    ],
    python_requires=">=3.7",
) 
```

--------------------------------------------------------------------------------
/proxy/jimeng/core.py:
--------------------------------------------------------------------------------

```python
# Author: 凌封 (WeChat: fengin)
# GITHUB: https://github.com/fengin/image-gen-server.git
# Related background: AI全书, https://aibook.ren


"""核心功能实现"""

import json
from typing import Any, Dict, Optional, Union
import requests
import logging

from . import utils
from .exceptions import API_REQUEST_FAILED, API_IMAGE_GENERATION_INSUFFICIENT_POINTS

import gzip
import brotli
import json
from io import BytesIO

# Constants
MODEL_NAME = "jimeng"
DEFAULT_ASSISTANT_ID = "513695"
VERSION_CODE = "5.8.0"
PLATFORM_CODE = "7"
DEVICE_ID = utils.generate_device_id()
WEB_ID = utils.generate_web_id()
USER_ID = utils.generate_uuid(False)
MAX_RETRY_COUNT = 3
RETRY_DELAY = 5000
FILE_MAX_SIZE = 100 * 1024 * 1024

# Request headers mimicking the Jimeng web client
FAKE_HEADERS = {
    "Accept": "application/json, text/plain, */*",
    "Accept-Encoding": "gzip, deflate, br, zstd",
    "Accept-language": "zh-CN,zh;q=0.9",
    "Cache-control": "no-cache",
    "Last-event-id": "undefined",
    "Appid": DEFAULT_ASSISTANT_ID,
    "Appvr": VERSION_CODE,
    "Origin": "https://jimeng.jianying.com",
    "Pragma": "no-cache",
    "Priority": "u=1, i",
    "Referer": "https://jimeng.jianying.com",
    "Pf": PLATFORM_CODE,
    "Sec-Ch-Ua": '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Windows"',
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
}


def acquire_token(refresh_token: str) -> str:
    """获取访问token

    目前jimeng的access_token是固定的,暂无刷新功能

    Args:
        refresh_token: 用于刷新access_token的refresh_token

    Returns:
        str: access_token
    """
    return refresh_token


def generate_cookie(token: str) -> str:
    """生成Cookie

    Args:
        token: 访问token

    Returns:
        str: Cookie字符串
    """
    return f"sessionid={token}; sessionid_ss={token}; sid_tt={token}; uid_tt={token}; uid_tt_ss={token}"


def check_result(response: requests.Response) -> Dict[str, Any]:
    """检查请求结果

    Args:
        response: 请求响应

    Returns:
        Dict: 响应数据

    Raises:
        API_IMAGE_GENERATION_INSUFFICIENT_POINTS: 积分不足
        API_REQUEST_FAILED: 请求失败
    """
    result = response.json()
    ret, errmsg, data = result.get('ret'), result.get('errmsg'), result.get('data')

    if not utils.is_finite(ret):
        return result

    if str(ret) == '0':
        return data

    if str(ret) == '5000':
        raise API_IMAGE_GENERATION_INSUFFICIENT_POINTS(f"即梦积分可能不足,{errmsg}")

    raise API_REQUEST_FAILED(f"请求jimeng失败: {errmsg}")


def request(
    method: str,
    uri: str,
    refresh_token: str,
    params: Optional[Dict] = None,
    data: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    **kwargs
) -> Dict[str, Any]:
    """请求即梦API

    Args:
        method: 请求方法
        uri: 请求路径
        refresh_token: 刷新token
        params: URL参数
        data: 请求数据
        headers: 请求头
        **kwargs: 其他参数

    Returns:
        Dict: 响应数据
    """
    token = acquire_token(refresh_token)
    device_time = utils.get_timestamp()
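    # The Sign header is an MD5 over fixed salts ("9e2c", "11ac"), the last 7
    # characters of the URI, the platform code, version code and device timestamp.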
    sign = utils.md5(f"9e2c|{uri[-7:]}|{PLATFORM_CODE}|{VERSION_CODE}|{device_time}||11ac")

    _headers = {
        **FAKE_HEADERS,
        "Cookie": generate_cookie(token),
        "Device-Time": str(device_time),
        "Sign": sign,
        "Sign-Ver": "1"
    }
    if headers:
        _headers.update(headers)

    _params = {
        "aid": DEFAULT_ASSISTANT_ID,
        "device_platform": "web",
        "region": "CN",
        "web_id": WEB_ID
    }
    if params:
        _params.update(params)

    response = requests.request(
        method=method.lower(),
        url=f"https://jimeng.jianying.com{uri}",
        params=_params,
        json=data,
        headers=_headers,
        timeout=15,
        verify=True,
        **kwargs
    )

    # Check and parse the response
    try:
        logging.debug(f'请求uri:{uri},响应状态:{response.status_code}')
        # Decompress according to Content-Encoding, falling back to the raw text
        try:
            content = decompress_response(response)
            logging.debug(f'响应结果:{content}')
        except Exception as e:
            logging.debug(f'解压失败,使用原始响应: {str(e)}')
            content = response.text
            logging.debug(f'响应结果:{content}')
        result = json.loads(content)
    except Exception:
        raise API_REQUEST_FAILED("响应格式错误")

    ret = result.get('ret')
    if ret is None:
        return result

    if str(ret) == '0':
        return result.get('data', {})

    if str(ret) == '5000':
        raise API_IMAGE_GENERATION_INSUFFICIENT_POINTS(f"[无法生成图像]: 即梦积分可能不足,{result.get('errmsg')}")

    raise API_REQUEST_FAILED(f"[请求jimeng失败]: {result.get('errmsg')}")


def decompress_response(response: requests.Response) -> str:
    """解压响应内容

    Args:
        response: 请求响应

    Returns:
        str: 解压后的内容
    """
    content = response.content
    encoding = response.headers.get('Content-Encoding', '').lower()

    if encoding == 'gzip':
        buffer = BytesIO(content)
        with gzip.GzipFile(fileobj=buffer) as f:
            content = f.read()
    elif encoding == 'br':
        content = brotli.decompress(content)
    # Support for other encodings (e.g. zstd) can be added here if needed.

    return content.decode('utf-8')
```
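
For orientation, here is a minimal usage sketch of this module, assuming the `jimeng` package under `proxy/` is installed (e.g. via its `setup.py`). The endpoint path, request body, and token value are placeholders for illustration, not endpoints defined in this repository; the real image-generation endpoints live in `images.py`, and the token is assumed to be the Jimeng `sessionid` value, as suggested by `generate_cookie()`:

```python
# Hypothetical usage of proxy/jimeng/core.py — the URI and token below are placeholders.
from jimeng import core

# sessionid value copied from a logged-in jimeng.jianying.com browser session.
refresh_token = "<your-sessionid>"

# request() signs the call (Device-Time / Sign headers), attaches the session
# cookies, decompresses the body if needed, and returns the `data` payload;
# it raises on non-zero `ret` codes.
result = core.request(
    method="post",
    uri="/path/to/endpoint",      # placeholder path
    refresh_token=refresh_token,
    data={"example": "payload"},  # placeholder request body
)
print(result)
```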

--------------------------------------------------------------------------------
/proxy/jimeng/chat.py:
--------------------------------------------------------------------------------

```python
# Author: 凌封 (WeChat: fengin)
# GITHUB: https://github.com/fengin/image-gen-server.git
# Related background: AI全书, https://aibook.ren


"""对话补全相关功能"""

import re
import time
from typing import Dict, List, Optional, Union, Generator
import random

from . import utils
from .images import generate_images, DEFAULT_MODEL
from .exceptions import API_REQUEST_PARAMS_INVALID

MAX_RETRY_COUNT = 3
RETRY_DELAY = 5000

def parse_model(model: str) -> Dict[str, Union[str, int]]:
    """解析模型参数
    
    Args:
        model: 模型名称
        
    Returns:
        Dict: 模型信息
    """
    model_name, size = model.split(':') if ':' in model else (model, None)
    if not size:
        return {
            'model': model_name,
            'width': 1024,
            'height': 1024
        }
        
    match = re.search(r'(\d+)[\W\w](\d+)', size)
    if not match:
        return {
            'model': model_name,
            'width': 1024,
            'height': 1024
        }
        
    width, height = match.groups()
    return {
        'model': model_name,
        'width': int((int(width) + 1) // 2 * 2),   # ensure the width is even
        'height': int((int(height) + 1) // 2 * 2)  # ensure the height is even
    }

async def create_completion(
    messages: List[Dict[str, str]],
    refresh_token: str,
    model: str = DEFAULT_MODEL,
    retry_count: int = 0
) -> Dict:
    """同步对话补全
    
    Args:
        messages: 消息列表
        refresh_token: 刷新token
        model: 模型名称
        retry_count: 重试次数
        
    Returns:
        Dict: 补全结果
        
    Raises:
        API_REQUEST_PARAMS_INVALID: 参数无效
    """
    try:
        if not messages:
            raise API_REQUEST_PARAMS_INVALID("消息不能为空")
            
        # Parse the model parameter
        model_info = parse_model(model)

        # Generate the images
        image_urls = generate_images(
            model=model_info['model'],
            prompt=messages[-1]['content'],
            width=model_info['width'],
            height=model_info['height'],
            refresh_token=refresh_token
        )
        
        # Build the completion response
        return {
            'id': utils.generate_uuid(),
            'model': model or model_info['model'],
            'object': 'chat.completion',
            'choices': [{
                'index': 0,
                'message': {
                    'role': 'assistant',
                    'content': ''.join(f'![image_{i}]({url})\n' for i, url in enumerate(image_urls))
                },
                'finish_reason': 'stop'
            }],
            'usage': {
                'prompt_tokens': 1,
                'completion_tokens': 1,
                'total_tokens': 2
            },
            'created': utils.get_timestamp()
        }
    except Exception as e:
        if retry_count < MAX_RETRY_COUNT:
            print(f"Response error: {str(e)}")
            print(f"Try again after {RETRY_DELAY / 1000}s...")
            await asyncio.sleep(RETRY_DELAY / 1000)
            return await create_completion(messages, refresh_token, model, retry_count + 1)
        raise e

async def create_completion_stream(
    messages: List[Dict[str, str]],
    refresh_token: str,
    model: str = DEFAULT_MODEL,
    retry_count: int = 0
) -> AsyncGenerator[Dict, None]:
    """Streaming chat completion.

    Args:
        messages: Message list.
        refresh_token: Refresh token.
        model: Model name.
        retry_count: Current retry attempt.

    Yields:
        Dict: Completion result chunks.
    """
    try:
        if not messages:
            yield {
                'id': utils.generate_uuid(),
                'model': model,
                'object': 'chat.completion.chunk',
                'choices': [{
                    'index': 0,
                    'delta': {'role': 'assistant', 'content': '消息为空'},
                    'finish_reason': 'stop'
                }]
            }
            return
            
        # Parse the model parameter
        model_info = parse_model(model)

        # Emit a "generation started" chunk
        yield {
            'id': utils.generate_uuid(),
            'model': model or model_info['model'],
            'object': 'chat.completion.chunk',
            'choices': [{
                'index': 0,
                'delta': {'role': 'assistant', 'content': '🎨 图像生成中,请稍候...'},
                'finish_reason': None
            }]
        }
        
        try:
            # Generate the images
            image_urls = generate_images(
                model=model_info['model'],
                prompt=messages[-1]['content'],
                width=model_info['width'],
                height=model_info['height'],
                refresh_token=refresh_token
            )
            
            # Emit one chunk per image URL
            for i, url in enumerate(image_urls):
                yield {
                    'id': utils.generate_uuid(),
                    'model': model or model_info['model'],
                    'object': 'chat.completion.chunk',
                    'choices': [{
                        'index': i + 1,
                        'delta': {
                            'role': 'assistant',
                            'content': f'![image_{i}]({url})\n'
                        },
                        'finish_reason': None if i < len(image_urls) - 1 else 'stop'
                    }]
                }
                
            # Emit the final completion chunk
            yield {
                'id': utils.generate_uuid(),
                'model': model or model_info['model'],
                'object': 'chat.completion.chunk',
                'choices': [{
                    'index': len(image_urls) + 1,
                    'delta': {
                        'role': 'assistant',
                        'content': '图像生成完成!'
                    },
                    'finish_reason': 'stop'
                }]
            }
                
        except Exception as e:
            # Emit an error chunk
            yield {
                'id': utils.generate_uuid(),
                'model': model or model_info['model'],
                'object': 'chat.completion.chunk',
                'choices': [{
                    'index': 1,
                    'delta': {
                        'role': 'assistant',
                        'content': f'生成图片失败: {str(e)}'
                    },
                    'finish_reason': 'stop'
                }]
            }
    except Exception as e:
        if retry_count < MAX_RETRY_COUNT:
            print(f"Response error: {str(e)}")
            print(f"Try again after {RETRY_DELAY / 1000}s...")
            await asyncio.sleep(RETRY_DELAY / 1000)
            async for chunk in create_completion_stream(messages, refresh_token, model, retry_count + 1):
                yield chunk
            return
        raise e 
```
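
For reference, a minimal driver sketch for these coroutines (not part of the repository). The token value is a placeholder, and the `messages` list follows the OpenAI-style shape the functions expect; only the last message's content is used as the prompt:

```python
# Hypothetical driver for proxy/jimeng/chat.py — the token value is a placeholder.
import asyncio

from jimeng.chat import create_completion, create_completion_stream

async def main() -> None:
    token = "<your-sessionid>"
    messages = [{"role": "user", "content": "a cat running under moonlight"}]

    # Non-streaming: returns a chat.completion dict whose content is a list of
    # markdown image links.
    completion = await create_completion(messages, token, model="jimeng:1024x1024")
    print(completion["choices"][0]["message"]["content"])

    # Streaming: yields chat.completion.chunk dicts as generation progresses.
    async for chunk in create_completion_stream(messages, token, model="jimeng:1024x1024"):
        print(chunk["choices"][0]["delta"]["content"], end="")

asyncio.run(main())
```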