# Directory Structure ``` ├── .gitignore ├── pyproject.toml ├── README.md ├── src │ └── mcp_nano_banana │ ├── __init__.py │ ├── main.py │ └── test_gemini.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` 1 | vnev 2 | venv 3 | .env 4 | build 5 | 6 | mcp_image_generator.egg-info 7 | mcp_nano_banana.egg-info 8 | __pycache__ ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | # MCP Nano Banana 2 | 3 | [](https://pypi.org/project/mcp-nano-banana/) 4 | 5 | This project is an MCP (Model Context Protocol) server that generates images using the Google Gemini API. 6 | 7 | ## Description 8 | 9 | This server implements the Model Context Protocol to expose a single tool, `generate_image`, to a compatible AI model. The tool accepts a text prompt, uses the Google Gemini API to generate an image, saves the image to the `public/` directory for auditing, and returns the raw image data as a base64-encoded string. 10 | 11 | ## To use the server with Claude Desktop or other applications 12 | 13 | You need a Google Gemini API key and ImgBB API key to use this server. 14 | 15 | Access https://api.imgbb.com/ to generate a IMGBB API Key. This is used to store and host the image online. 16 | 17 | ```json 18 | { 19 | "mcpServers": { 20 | "mcp-nano-banana": { 21 | "command": "uvx", 22 | "args": [ 23 | "mcp-nano-banana" 24 | ], 25 | "env": { 26 | "GEMINI_API_KEY": "YOUR_API_KEY_HERE", 27 | "IMGBB_API_KEY": "YOUR_API_KEY_HERE" 28 | } 29 | } 30 | } 31 | } 32 | ``` 33 | 34 | 35 | ## Dev Setup 36 | 37 | ### 1. Dependencies 38 | 39 | This project uses Python and its dependencies are defined in `pyproject.toml`. You can install them using `pip`: 40 | 41 | ```bash 42 | pip install . 43 | # Or 44 | uv sync 45 | ``` 46 | 47 | This will install `mcp`, `google-generativeai`, and other required packages. 48 | 49 | ### 2. API Key 50 | 51 | You need a Google Gemini API key and ImgBB API key to use this server. 52 | 53 | Access https://api.imgbb.com/ to generate a IMGBB API Key. This is used to store and host the image online. 54 | 55 | 1. Create a file named `.env` in the root of the project. 56 | 2. Add your API key to the `.env` file in the following format: 57 | 58 | ``` 59 | GEMINI_API_KEY="YOUR_API_KEY_HERE" 60 | IMGBB_API_KEY="YOUR_API_KEY_HERE" 61 | ``` 62 | 63 | ## Running the Server 64 | 65 | This server is designed to be run as a subprocess by an MCP client or using the `mcp` command-line tool. The server listens for requests on `stdio`. 66 | 67 | ```bash 68 | uvx --from git+https://github.com/GuilhermeAumo/mcp-nano-banana mcp-nano-banana 69 | ``` 70 | 71 | 72 | ## Publishing new pipy version 73 | To publish a new version of this package to PyPI: 74 | 75 | 1. **Update the version** 76 | Edit the `version` field in `pyproject.toml` to the new version number. 77 | 78 | 2. **Build the package** 79 | Run: 80 | ```bash 81 | uv build 82 | ``` 83 | This will create `.tar.gz` and `.whl` files in the `dist/` directory. 84 | 85 | 3. **Upload to PyPI** 86 | ```bash 87 | uv publish 88 | ``` 89 | 90 | 4. **Tag the release (optional but recommended)** 91 | Commit the changes to github first, then: 92 | 93 | ```bash 94 | git tag v<new-version> 95 | git push --tags 96 | ``` 97 | 98 | **Note:** 99 | - You need a PyPI account and must be listed as a maintainer of the project. 100 | 101 | For more details, see the [Python Packaging User Guide](https://packaging.python.org/tutorials/packaging-projects/). 102 | ``` -------------------------------------------------------------------------------- /src/mcp_nano_banana/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml 1 | [project] 2 | name = "mcp-nano-banana" 3 | version = "0.2.1" 4 | requires-python = ">=3.10" 5 | description = "Um servidor MCP que gera imagens usando a Google Gemini API." 6 | readme = "README.md" 7 | dependencies = [ 8 | "mcp[cli]>=1.13", 9 | "google-generativeai>=0.5.4", 10 | "python-dotenv>=1.0.1", 11 | "pillow>=11.3.0", 12 | ] 13 | 14 | [project.urls] 15 | Homepage = "https://github.com/GuilhermeAumo/mcp-nano-banana" 16 | Repository = "https://github.com/GuilhermeAumo/mcp-nano-banana" 17 | 18 | [project.scripts] 19 | mcp-nano-banana = "mcp_nano_banana.main:main" 20 | 21 | [tool.setuptools.packages.find] 22 | where = ["src"] 23 | ``` -------------------------------------------------------------------------------- /src/mcp_nano_banana/test_gemini.py: -------------------------------------------------------------------------------- ```python 1 | import json 2 | import os 3 | import base64 4 | import google.generativeai as genai 5 | from dotenv import load_dotenv 6 | import requests 7 | 8 | load_dotenv() 9 | 10 | genai.configure(api_key=os.getenv('GEMINI_API_KEY')) 11 | model = genai.GenerativeModel('gemini-2.5-flash-image-preview') 12 | 13 | prompt = "Create nano-sized banana in a lab setting." 14 | response = model.generate_content([prompt]) 15 | response = response.to_dict() 16 | 17 | bytes_data = response["candidates"][0]["content"]["parts"][-1]["inline_data"]["data"] 18 | 19 | generated_img = base64.b64decode(bytes_data) 20 | with open('edited_nano_banana.png', 'wb') as out: 21 | out.write(generated_img) 22 | 23 | 24 | # Upload image to Imgbb.host 25 | # --- STEP 1: Ensure 'edited_nano_banana.png' exists and is a valid, non-empty image --- 26 | try: 27 | with open('edited_nano_banana.png', 'rb') as image_file: 28 | # Read the binary data and encode it to a Base64 string 29 | generated_img_b64 = base64.b64encode(image_file.read()).decode('utf-8') 30 | 31 | if not generated_img_b64: 32 | raise ValueError("The image file 'edited_nano_banana.png' is empty.") 33 | 34 | except FileNotFoundError: 35 | raise FileNotFoundError("The image file 'edited_nano_banana.png' was not found. Please ensure it exists.") 36 | 37 | 38 | # --- STEP 2: Build and send the correct POST request to ImgBB --- 39 | IMGBB_API_KEY = os.getenv("IMGBB_API_KEY") 40 | if not IMGBB_API_KEY: 41 | raise ValueError("IMGBB_API_KEY environment variable not set or .env file is missing.") 42 | 43 | upload_url = "https://api.imgbb.com/1/upload" 44 | 45 | # All parameters go into the 'data' payload for the POST request 46 | payload = { 47 | "key": IMGBB_API_KEY, 48 | "image": generated_img_b64, # The Base64 string is the 'image' field 49 | "name": "nano_banana.png" # Optional: specify a name for the file 50 | } 51 | 52 | try: 53 | print("Uploading image to ImgBB...") 54 | # Use the data= parameter, NOT files= 55 | resp = requests.post(upload_url, data=payload, timeout=60) # Increased timeout for larger files 56 | 57 | resp.raise_for_status() # Raise an error for bad status codes (4xx or 5xx) 58 | 59 | resp_json = resp.json() 60 | 61 | # ImgBB's success indicator is the 'data' key in the response 62 | if "data" not in resp_json: 63 | raise Exception(f"Imgbb upload failed. Response: {resp_json}") 64 | 65 | uploaded_url = resp_json["data"]["url"] 66 | print(f"Success! Image uploaded to {uploaded_url}") 67 | 68 | except requests.exceptions.HTTPError as err: 69 | print(f"HTTP error occurred: {err}") 70 | print(f"Response body: {err.response.text}") 71 | raise 72 | ``` -------------------------------------------------------------------------------- /src/mcp_nano_banana/main.py: -------------------------------------------------------------------------------- ```python 1 | import asyncio 2 | import logging 3 | import os 4 | import base64 5 | import uuid 6 | import json 7 | import httpx 8 | from dotenv import load_dotenv 9 | from mcp.server.fastmcp import FastMCP 10 | import google.generativeai as genai 11 | import requests 12 | from PIL import Image 13 | from io import BytesIO 14 | from urllib.parse import urlparse 15 | from typing import Dict, Any, Optional 16 | import re 17 | 18 | # Configure logging 19 | logging.basicConfig(level=logging.INFO) 20 | logger = logging.getLogger(__name__) 21 | 22 | # --- Error Handling Classes --- 23 | class ImageGenerationError(Exception): 24 | """Custom exception for image generation errors""" 25 | pass 26 | 27 | class ImageUploadError(Exception): 28 | """Custom exception for image upload errors""" 29 | pass 30 | 31 | class ValidationError(Exception): 32 | """Custom exception for input validation errors""" 33 | pass 34 | 35 | class APIError(Exception): 36 | """Custom exception for API-related errors""" 37 | pass 38 | 39 | # --- Utility Functions --- 40 | def validate_prompt(prompt: str) -> None: 41 | """Validate image generation prompt""" 42 | if not prompt or not isinstance(prompt, str): 43 | raise ValidationError("Prompt must be a non-empty string") 44 | 45 | if len(prompt.strip()) == 0: 46 | raise ValidationError("Prompt cannot be empty or only whitespace") 47 | 48 | if len(prompt) > 1000: 49 | raise ValidationError("Prompt is too long (maximum 1000 characters)") 50 | 51 | # Check for potentially problematic content 52 | if any(char in prompt for char in ['<', '>', '&', '"', "'"]): 53 | logger.warning("Prompt contains potentially problematic characters") 54 | 55 | def validate_image_url(url: str) -> None: 56 | """Validate image URL""" 57 | if not url or not isinstance(url, str): 58 | raise ValidationError("Image URL must be a non-empty string") 59 | 60 | try: 61 | parsed = urlparse(url) 62 | if not parsed.scheme or not parsed.netloc: 63 | raise ValidationError("Invalid URL format") 64 | 65 | if parsed.scheme not in ['http', 'https']: 66 | raise ValidationError("URL must use HTTP or HTTPS protocol") 67 | except Exception as e: 68 | raise ValidationError(f"Invalid URL format: {str(e)}") 69 | 70 | def validate_environment_variables() -> Dict[str, str]: 71 | """Validate required environment variables""" 72 | errors = [] 73 | env_vars = {} 74 | 75 | # Check GEMINI_API_KEY 76 | gemini_key = os.getenv("GEMINI_API_KEY") 77 | if not gemini_key: 78 | errors.append("GEMINI_API_KEY environment variable not set") 79 | elif not gemini_key.strip(): 80 | errors.append("GEMINI_API_KEY environment variable is empty") 81 | else: 82 | env_vars['GEMINI_API_KEY'] = gemini_key 83 | 84 | # Check IMGBB_API_KEY 85 | imgbb_key = os.getenv("IMGBB_API_KEY") 86 | if not imgbb_key: 87 | errors.append("IMGBB_API_KEY environment variable not set") 88 | elif not imgbb_key.strip(): 89 | errors.append("IMGBB_API_KEY environment variable is empty") 90 | else: 91 | env_vars['IMGBB_API_KEY'] = imgbb_key 92 | 93 | if errors: 94 | raise ValidationError(f"Environment validation failed: {'; '.join(errors)}") 95 | 96 | return env_vars 97 | 98 | def create_error_response(error_type: str, message: str, details: Optional[Dict[str, Any]] = None) -> str: 99 | """Create a standardized error response""" 100 | error_response = { 101 | "error": True, 102 | "error_type": error_type, 103 | "message": message, 104 | "timestamp": asyncio.get_event_loop().time() if asyncio.get_event_loop().is_running() else None 105 | } 106 | 107 | if details: 108 | error_response["details"] = details 109 | 110 | return json.dumps(error_response) 111 | 112 | def create_success_response(data: Any) -> str: 113 | """Create a standardized success response""" 114 | success_response = { 115 | "success": True, 116 | "data": data, 117 | "timestamp": asyncio.get_event_loop().time() if asyncio.get_event_loop().is_running() else None 118 | } 119 | return json.dumps(success_response) 120 | 121 | # --- MCP Server Setup --- 122 | # Create a FastMCP server instance 123 | mcp = FastMCP( 124 | name="image_generator_mcp_server", 125 | ) 126 | logger.info(f"MCP server '{mcp.name}' created.") 127 | 128 | 129 | # --- Tool Definition --- 130 | @mcp.tool( 131 | name="generate_image", 132 | description="Generates an image based on a text prompt using the Gemini API and returns the image as a url.", 133 | ) 134 | async def generate_image(prompt: str) -> str: 135 | """ 136 | Generates an image from a text prompt and returns the url of the image. 137 | """ 138 | try: 139 | # Input validation 140 | validate_prompt(prompt) 141 | 142 | # Environment validation 143 | env_vars = validate_environment_variables() 144 | 145 | logger.info(f"Tool 'generate_image' called with prompt: '{prompt}'") 146 | 147 | # Image generation with specific error handling 148 | try: 149 | model = genai.GenerativeModel('gemini-2.5-flash-image-preview') 150 | 151 | # Generate content with timeout handling 152 | response = await asyncio.wait_for( 153 | model.generate_content_async([f"Generate a high-quality, detailed image of: {prompt}"]), 154 | timeout=120 # 2 minute timeout for generation 155 | ) 156 | 157 | if not response: 158 | raise ImageGenerationError("Gemini API returned empty response") 159 | 160 | response_dict = response.to_dict() 161 | 162 | # Validate response structure 163 | if "candidates" not in response_dict: 164 | raise ImageGenerationError("Invalid response structure: missing 'candidates' field") 165 | 166 | if not response_dict["candidates"]: 167 | raise ImageGenerationError("No candidates returned from Gemini API") 168 | 169 | candidate = response_dict["candidates"][0] 170 | if "content" not in candidate: 171 | raise ImageGenerationError("Invalid candidate structure: missing 'content' field") 172 | 173 | if "parts" not in candidate["content"]: 174 | raise ImageGenerationError("Invalid content structure: missing 'parts' field") 175 | 176 | parts = candidate["content"]["parts"] 177 | if not parts: 178 | raise ImageGenerationError("No parts returned in content") 179 | 180 | last_part = parts[-1] 181 | if "inline_data" not in last_part: 182 | raise ImageGenerationError("Last part does not contain image data") 183 | 184 | if "data" not in last_part["inline_data"]: 185 | raise ImageGenerationError("Image data field is missing") 186 | 187 | image_data_base64 = last_part["inline_data"]["data"] 188 | 189 | # Validate base64 data 190 | if not image_data_base64: 191 | raise ImageGenerationError("Empty image data received") 192 | 193 | # Test if base64 is valid 194 | try: 195 | base64.b64decode(image_data_base64, validate=True) 196 | except Exception as e: 197 | raise ImageGenerationError(f"Invalid base64 image data: {str(e)}") 198 | 199 | except asyncio.TimeoutError: 200 | logger.error("Image generation timed out") 201 | return create_error_response( 202 | "timeout_error", 203 | "Image generation timed out after 2 minutes", 204 | {"timeout_seconds": 120} 205 | ) 206 | except genai.types.BlockedPromptException as e: 207 | logger.error(f"Prompt blocked by Gemini API: {e}") 208 | return create_error_response( 209 | "content_policy_error", 210 | "Prompt was blocked by content policy", 211 | {"blocked_reason": str(e)} 212 | ) 213 | except genai.types.StopCandidateException as e: 214 | logger.error(f"Generation stopped by Gemini API: {e}") 215 | return create_error_response( 216 | "generation_stopped_error", 217 | "Image generation was stopped by the API", 218 | {"stop_reason": str(e)} 219 | ) 220 | except genai.types.SafetySettingsException as e: 221 | logger.error(f"Safety settings violation: {e}") 222 | return create_error_response( 223 | "safety_violation_error", 224 | "Prompt violates safety settings", 225 | {"violation_details": str(e)} 226 | ) 227 | except genai.types.APIError as e: 228 | logger.error(f"Gemini API error: {e}") 229 | return create_error_response( 230 | "api_error", 231 | f"Gemini API error: {str(e)}", 232 | {"api_error_code": getattr(e, 'code', 'unknown')} 233 | ) 234 | except ImageGenerationError as e: 235 | logger.error(f"Image generation error: {e}") 236 | return create_error_response("image_generation_error", str(e)) 237 | except Exception as e: 238 | logger.exception(f"Unexpected error during image generation: {e}") 239 | return create_error_response( 240 | "unexpected_error", 241 | f"Unexpected error during image generation: {str(e)}" 242 | ) 243 | 244 | # Image upload with specific error handling 245 | try: 246 | upload_url = "https://api.imgbb.com/1/upload" 247 | 248 | # Validate image size (ImgBB has a 32MB limit) 249 | image_size = len(base64.b64decode(image_data_base64)) 250 | if image_size > 32 * 1024 * 1024: # 32MB 251 | raise ImageUploadError(f"Image too large: {image_size} bytes (max 32MB)") 252 | 253 | payload = { 254 | "key": env_vars['IMGBB_API_KEY'], 255 | "image": image_data_base64, 256 | "name": f"{uuid.uuid4()}" 257 | } 258 | 259 | # Upload with timeout and retry logic 260 | max_retries = 3 261 | for attempt in range(max_retries): 262 | try: 263 | resp = requests.post(upload_url, data=payload, timeout=60) 264 | resp.raise_for_status() 265 | break 266 | except requests.exceptions.Timeout: 267 | if attempt == max_retries - 1: 268 | raise ImageUploadError("Upload timed out after multiple attempts") 269 | logger.warning(f"Upload attempt {attempt + 1} timed out, retrying...") 270 | await asyncio.sleep(2 ** attempt) # Exponential backoff 271 | except requests.exceptions.ConnectionError as e: 272 | if attempt == max_retries - 1: 273 | raise ImageUploadError(f"Connection error during upload: {str(e)}") 274 | logger.warning(f"Connection error on attempt {attempt + 1}, retrying...") 275 | await asyncio.sleep(2 ** attempt) 276 | 277 | resp_json = resp.json() 278 | 279 | # Validate ImgBB response 280 | if "data" not in resp_json: 281 | error_msg = resp_json.get("error", {}).get("message", "Unknown error") 282 | raise ImageUploadError(f"ImgBB upload failed: {error_msg}") 283 | 284 | if "url" not in resp_json["data"]: 285 | raise ImageUploadError("ImgBB response missing URL field") 286 | 287 | uploaded_url = resp_json["data"]["url"] 288 | 289 | # Validate the returned URL 290 | validate_image_url(uploaded_url) 291 | 292 | logger.info(f"Image uploaded successfully to {uploaded_url}") 293 | return create_success_response({"url": uploaded_url}) 294 | 295 | except requests.exceptions.HTTPError as e: 296 | status_code = e.response.status_code 297 | if status_code == 400: 298 | error_msg = "Bad request to ImgBB API" 299 | elif status_code == 401: 300 | error_msg = "Invalid ImgBB API key" 301 | elif status_code == 403: 302 | error_msg = "ImgBB API access forbidden" 303 | elif status_code == 413: 304 | error_msg = "Image file too large for ImgBB" 305 | elif status_code == 429: 306 | error_msg = "ImgBB API rate limit exceeded" 307 | elif status_code >= 500: 308 | error_msg = "ImgBB server error" 309 | else: 310 | error_msg = f"HTTP error {status_code}" 311 | 312 | logger.error(f"ImgBB HTTP error: {e}") 313 | return create_error_response( 314 | "upload_http_error", 315 | error_msg, 316 | {"status_code": status_code, "response_text": e.response.text} 317 | ) 318 | except ImageUploadError as e: 319 | logger.error(f"Image upload error: {e}") 320 | return create_error_response("image_upload_error", str(e)) 321 | except Exception as e: 322 | logger.exception(f"Unexpected error during image upload: {e}") 323 | return create_error_response( 324 | "unexpected_error", 325 | f"Unexpected error during image upload: {str(e)}" 326 | ) 327 | 328 | except ValidationError as e: 329 | logger.error(f"Validation error: {e}") 330 | return create_error_response("validation_error", str(e)) 331 | except Exception as e: 332 | logger.exception(f"Unexpected error in generate_image: {e}") 333 | return create_error_response( 334 | "unexpected_error", 335 | f"Unexpected error: {str(e)}" 336 | ) 337 | 338 | 339 | @mcp.tool( 340 | name="edit_image", 341 | description="Edits an existing image based on a text prompt using the Gemini API. Takes an image URL and a prompt, then returns the edited image as a URL.", 342 | ) 343 | async def edit_image(image_url: str, prompt: str) -> str: 344 | """ 345 | Edits an existing image from a URL based on a text prompt and returns the edited image as a URL. 346 | """ 347 | try: 348 | # Input validation 349 | validate_prompt(prompt) 350 | validate_image_url(image_url) 351 | 352 | # Environment validation 353 | env_vars = validate_environment_variables() 354 | 355 | logger.info(f"Tool 'edit_image' called with image_url: '{image_url}' and prompt: '{prompt}'") 356 | 357 | # Image download with specific error handling 358 | try: 359 | # Download the image from the URL with timeout and retry logic 360 | max_retries = 3 361 | image_data = None 362 | 363 | for attempt in range(max_retries): 364 | try: 365 | response = requests.get(image_url, timeout=30) 366 | response.raise_for_status() 367 | 368 | # Check content type 369 | content_type = response.headers.get('content-type', '').lower() 370 | if not any(img_type in content_type for img_type in ['image/', 'application/octet-stream']): 371 | raise ValidationError(f"URL does not point to an image. Content-Type: {content_type}") 372 | 373 | # Check file size (10MB limit for download) 374 | if len(response.content) > 10 * 1024 * 1024: 375 | raise ValidationError("Image file too large (max 10MB)") 376 | 377 | image_data = response.content 378 | break 379 | 380 | except requests.exceptions.Timeout: 381 | if attempt == max_retries - 1: 382 | raise ImageGenerationError("Image download timed out after multiple attempts") 383 | logger.warning(f"Download attempt {attempt + 1} timed out, retrying...") 384 | await asyncio.sleep(2 ** attempt) 385 | except requests.exceptions.ConnectionError as e: 386 | if attempt == max_retries - 1: 387 | raise ImageGenerationError(f"Connection error during image download: {str(e)}") 388 | logger.warning(f"Connection error on attempt {attempt + 1}, retrying...") 389 | await asyncio.sleep(2 ** attempt) 390 | except requests.exceptions.HTTPError as e: 391 | status_code = e.response.status_code 392 | if status_code == 404: 393 | raise ValidationError("Image not found at the provided URL") 394 | elif status_code == 403: 395 | raise ValidationError("Access forbidden to the image URL") 396 | elif status_code == 410: 397 | raise ValidationError("Image is no longer available at the provided URL") 398 | elif status_code >= 500: 399 | if attempt == max_retries - 1: 400 | raise ImageGenerationError(f"Server error downloading image: {status_code}") 401 | logger.warning(f"Server error {status_code} on attempt {attempt + 1}, retrying...") 402 | await asyncio.sleep(2 ** attempt) 403 | else: 404 | raise ImageGenerationError(f"HTTP error downloading image: {status_code}") 405 | 406 | if not image_data: 407 | raise ImageGenerationError("Failed to download image after all retry attempts") 408 | 409 | # Validate and process image 410 | try: 411 | image = Image.open(BytesIO(image_data)) 412 | 413 | # Validate image format 414 | if image.format not in ['JPEG', 'PNG', 'WEBP', 'BMP', 'GIF']: 415 | raise ValidationError(f"Unsupported image format: {image.format}") 416 | 417 | # Check image dimensions 418 | width, height = image.size 419 | if width > 4096 or height > 4096: 420 | raise ValidationError(f"Image too large: {width}x{height} (max 4096x4096)") 421 | 422 | if width < 1 or height < 1: 423 | raise ValidationError("Invalid image dimensions") 424 | 425 | # Convert to RGB if necessary (for compatibility) 426 | if image.mode not in ['RGB', 'RGBA']: 427 | image = image.convert('RGB') 428 | 429 | except Exception as e: 430 | if "cannot identify image file" in str(e).lower(): 431 | raise ValidationError("Invalid image file format or corrupted image") 432 | else: 433 | raise ImageGenerationError(f"Error processing image: {str(e)}") 434 | 435 | except ValidationError as e: 436 | logger.error(f"Image validation error: {e}") 437 | return create_error_response("validation_error", str(e)) 438 | except ImageGenerationError as e: 439 | logger.error(f"Image download error: {e}") 440 | return create_error_response("image_download_error", str(e)) 441 | except Exception as e: 442 | logger.exception(f"Unexpected error during image download: {e}") 443 | return create_error_response( 444 | "unexpected_error", 445 | f"Unexpected error during image download: {str(e)}" 446 | ) 447 | 448 | # Image editing with specific error handling 449 | try: 450 | model = genai.GenerativeModel('gemini-2.5-flash-image-preview') 451 | 452 | # Generate content with timeout handling 453 | response = await asyncio.wait_for( 454 | model.generate_content_async([prompt, image]), 455 | timeout=120 # 2 minute timeout for editing 456 | ) 457 | 458 | if not response: 459 | raise ImageGenerationError("Gemini API returned empty response") 460 | 461 | response_dict = response.to_dict() 462 | 463 | # Validate response structure (same as generate_image) 464 | if "candidates" not in response_dict: 465 | raise ImageGenerationError("Invalid response structure: missing 'candidates' field") 466 | 467 | if not response_dict["candidates"]: 468 | raise ImageGenerationError("No candidates returned from Gemini API") 469 | 470 | candidate = response_dict["candidates"][0] 471 | if "content" not in candidate: 472 | raise ImageGenerationError("Invalid candidate structure: missing 'content' field") 473 | 474 | if "parts" not in candidate["content"]: 475 | raise ImageGenerationError("Invalid content structure: missing 'parts' field") 476 | 477 | parts = candidate["content"]["parts"] 478 | if not parts: 479 | raise ImageGenerationError("No parts returned in content") 480 | 481 | last_part = parts[-1] 482 | if "inline_data" not in last_part: 483 | raise ImageGenerationError("Last part does not contain image data") 484 | 485 | if "data" not in last_part["inline_data"]: 486 | raise ImageGenerationError("Image data field is missing") 487 | 488 | image_data_base64 = last_part["inline_data"]["data"] 489 | 490 | # Validate base64 data 491 | if not image_data_base64: 492 | raise ImageGenerationError("Empty image data received") 493 | 494 | # Test if base64 is valid 495 | try: 496 | base64.b64decode(image_data_base64, validate=True) 497 | except Exception as e: 498 | raise ImageGenerationError(f"Invalid base64 image data: {str(e)}") 499 | 500 | except asyncio.TimeoutError: 501 | logger.error("Image editing timed out") 502 | return create_error_response( 503 | "timeout_error", 504 | "Image editing timed out after 2 minutes", 505 | {"timeout_seconds": 120} 506 | ) 507 | except genai.types.BlockedPromptException as e: 508 | logger.error(f"Prompt blocked by Gemini API: {e}") 509 | return create_error_response( 510 | "content_policy_error", 511 | "Prompt was blocked by content policy", 512 | {"blocked_reason": str(e)} 513 | ) 514 | except genai.types.StopCandidateException as e: 515 | logger.error(f"Editing stopped by Gemini API: {e}") 516 | return create_error_response( 517 | "generation_stopped_error", 518 | "Image editing was stopped by the API", 519 | {"stop_reason": str(e)} 520 | ) 521 | except genai.types.SafetySettingsException as e: 522 | logger.error(f"Safety settings violation: {e}") 523 | return create_error_response( 524 | "safety_violation_error", 525 | "Prompt violates safety settings", 526 | {"violation_details": str(e)} 527 | ) 528 | except genai.types.APIError as e: 529 | logger.error(f"Gemini API error: {e}") 530 | return create_error_response( 531 | "api_error", 532 | f"Gemini API error: {str(e)}", 533 | {"api_error_code": getattr(e, 'code', 'unknown')} 534 | ) 535 | except ImageGenerationError as e: 536 | logger.error(f"Image editing error: {e}") 537 | return create_error_response("image_editing_error", str(e)) 538 | except Exception as e: 539 | logger.exception(f"Unexpected error during image editing: {e}") 540 | return create_error_response( 541 | "unexpected_error", 542 | f"Unexpected error during image editing: {str(e)}" 543 | ) 544 | 545 | # Image upload with specific error handling (same as generate_image) 546 | try: 547 | upload_url = "https://api.imgbb.com/1/upload" 548 | 549 | # Validate image size (ImgBB has a 32MB limit) 550 | image_size = len(base64.b64decode(image_data_base64)) 551 | if image_size > 32 * 1024 * 1024: # 32MB 552 | raise ImageUploadError(f"Image too large: {image_size} bytes (max 32MB)") 553 | 554 | payload = { 555 | "key": env_vars['IMGBB_API_KEY'], 556 | "image": image_data_base64, 557 | "name": f"{uuid.uuid4()}" 558 | } 559 | 560 | # Upload with timeout and retry logic 561 | max_retries = 3 562 | for attempt in range(max_retries): 563 | try: 564 | resp = requests.post(upload_url, data=payload, timeout=60) 565 | resp.raise_for_status() 566 | break 567 | except requests.exceptions.Timeout: 568 | if attempt == max_retries - 1: 569 | raise ImageUploadError("Upload timed out after multiple attempts") 570 | logger.warning(f"Upload attempt {attempt + 1} timed out, retrying...") 571 | await asyncio.sleep(2 ** attempt) # Exponential backoff 572 | except requests.exceptions.ConnectionError as e: 573 | if attempt == max_retries - 1: 574 | raise ImageUploadError(f"Connection error during upload: {str(e)}") 575 | logger.warning(f"Connection error on attempt {attempt + 1}, retrying...") 576 | await asyncio.sleep(2 ** attempt) 577 | 578 | resp_json = resp.json() 579 | 580 | # Validate ImgBB response 581 | if "data" not in resp_json: 582 | error_msg = resp_json.get("error", {}).get("message", "Unknown error") 583 | raise ImageUploadError(f"ImgBB upload failed: {error_msg}") 584 | 585 | if "url" not in resp_json["data"]: 586 | raise ImageUploadError("ImgBB response missing URL field") 587 | 588 | uploaded_url = resp_json["data"]["url"] 589 | 590 | # Validate the returned URL 591 | validate_image_url(uploaded_url) 592 | 593 | logger.info(f"Edited image uploaded successfully to {uploaded_url}") 594 | return create_success_response({"url": uploaded_url}) 595 | 596 | except requests.exceptions.HTTPError as e: 597 | status_code = e.response.status_code 598 | if status_code == 400: 599 | error_msg = "Bad request to ImgBB API" 600 | elif status_code == 401: 601 | error_msg = "Invalid ImgBB API key" 602 | elif status_code == 403: 603 | error_msg = "ImgBB API access forbidden" 604 | elif status_code == 413: 605 | error_msg = "Image file too large for ImgBB" 606 | elif status_code == 429: 607 | error_msg = "ImgBB API rate limit exceeded" 608 | elif status_code >= 500: 609 | error_msg = "ImgBB server error" 610 | else: 611 | error_msg = f"HTTP error {status_code}" 612 | 613 | logger.error(f"ImgBB HTTP error: {e}") 614 | return create_error_response( 615 | "upload_http_error", 616 | error_msg, 617 | {"status_code": status_code, "response_text": e.response.text} 618 | ) 619 | except ImageUploadError as e: 620 | logger.error(f"Image upload error: {e}") 621 | return create_error_response("image_upload_error", str(e)) 622 | except Exception as e: 623 | logger.exception(f"Unexpected error during image upload: {e}") 624 | return create_error_response( 625 | "unexpected_error", 626 | f"Unexpected error during image upload: {str(e)}" 627 | ) 628 | 629 | except ValidationError as e: 630 | logger.error(f"Validation error: {e}") 631 | return create_error_response("validation_error", str(e)) 632 | except Exception as e: 633 | logger.exception(f"Unexpected error in edit_image: {e}") 634 | return create_error_response( 635 | "unexpected_error", 636 | f"Unexpected error: {str(e)}" 637 | ) 638 | 639 | 640 | def main(): 641 | try: 642 | # Validate environment variables 643 | env_vars = validate_environment_variables() 644 | 645 | # Configure the Gemini API client 646 | genai.configure(api_key=env_vars['GEMINI_API_KEY']) 647 | logger.info("Gemini API configured successfully.") 648 | logger.info("IMGBB_API_KEY API configured successfully.") 649 | 650 | logger.info("Starting MCP server via mcp.run()...") 651 | asyncio.run(mcp.run()) 652 | 653 | except ValidationError as e: 654 | logger.error(f"Environment validation failed: {e}") 655 | raise 656 | except Exception as e: 657 | logger.exception(f"Failed to start MCP server: {e}") 658 | raise 659 | 660 | if __name__ == "__main__": 661 | main() 662 | ```