# Directory Structure

```
├── .gitignore
├── pyproject.toml
├── README.md
├── src
│   └── mcp_nano_banana
│       ├── __init__.py
│       ├── main.py
│       └── test_gemini.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
1 | vnev
2 | venv
3 | .env
4 | build
5 | 
6 | mcp_image_generator.egg-info
7 | mcp_nano_banana.egg-info
8 | __pycache__
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | # MCP Nano Banana
  2 | 
  3 | [![PyPI Version](https://img.shields.io/pypi/v/mcp-nano-banana.svg)](https://pypi.org/project/mcp-nano-banana/)
  4 | 
  5 | This project is an MCP (Model Context Protocol) server that generates images using the Google Gemini API.
  6 | 
  7 | ## Description
  8 | 
  9 | This server implements the Model Context Protocol to expose two tools, `generate_image` and `edit_image`, to a compatible AI model. `generate_image` accepts a text prompt, uses the Google Gemini API to generate an image, uploads the result to ImgBB, and returns a JSON string containing the hosted image URL. `edit_image` accepts an image URL and a text prompt and returns the edited image as a URL.
 10 | 
 11 | ## To use the server with Claude Desktop or other applications
 12 | 
 13 | You need a Google Gemini API key and an ImgBB API key to use this server.
 14 | 
 15 | Visit https://api.imgbb.com/ to generate an ImgBB API key. ImgBB is used to store and host the generated images online.
 16 | 
 17 | ```json
 18 | {
 19 |   "mcpServers": {
 20 |     "mcp-nano-banana": {
 21 |         "command": "uvx",
 22 |         "args": [
 23 |             "mcp-nano-banana"
 24 |         ],
 25 |         "env": {
 26 |             "GEMINI_API_KEY": "YOUR_API_KEY_HERE",
 27 |             "IMGBB_API_KEY": "YOUR_API_KEY_HERE"
 28 |         }
 29 |     }
 30 |   }
 31 | }
 32 | ```
 33 | 
 34 | 
 35 | ## Dev Setup
 36 | 
 37 | ### 1. Dependencies
 38 | 
 39 | This project uses Python, and its dependencies are defined in `pyproject.toml`. You can install them with `pip` or `uv`:
 40 | 
 41 | ```bash
 42 | pip install .
 43 | # Or
 44 | uv sync
 45 | ```
 46 | 
 47 | This will install `mcp`, `google-generativeai`, and other required packages.
 48 | 
 49 | ### 2. API Key
 50 | 
 51 | You need a Google Gemini API key and an ImgBB API key to use this server.
 52 | 
 53 | Visit https://api.imgbb.com/ to generate an ImgBB API key. ImgBB is used to store and host the generated images online.
 54 | 
 55 | 1.  Create a file named `.env` in the root of the project.
 56 | 2.  Add your API keys to the `.env` file in the following format:
 57 | 
 58 | ```
 59 |     GEMINI_API_KEY="YOUR_API_KEY_HERE"
 60 |     IMGBB_API_KEY="YOUR_API_KEY_HERE"
 61 | ```
 62 | 
 63 | ## Running the Server
 64 | 
 65 | This server is designed to be run as a subprocess by an MCP client or using the `mcp` command-line tool. The server listens for requests on `stdio`.
 66 | 
 67 | ```bash
 68 | uvx --from git+https://github.com/GuilhermeAumo/mcp-nano-banana mcp-nano-banana
 69 | ```
 70 | 
 71 | 
 72 | ## Publishing a new PyPI version
 73 | To publish a new version of this package to PyPI:
 74 | 
 75 | 1. **Update the version**  
 76 |    Edit the `version` field in `pyproject.toml` to the new version number.
 77 | 
 78 | 2. **Build the package**  
 79 |    Run:
 80 |    ```bash
 81 |    uv build
 82 |    ```
 83 |    This will create `.tar.gz` and `.whl` files in the `dist/` directory.
 84 | 
 85 | 3. **Upload to PyPI**  
 86 |    ```bash
 87 |    uv publish
 88 |    ```
 89 | 
 90 | 4. **Tag the release (optional but recommended)**  
 91 |    Commit the changes to GitHub first, then:
 92 |   
 93 |    ```bash
 94 |    git tag v<new-version>
 95 |    git push --tags
 96 |    ```
 97 | 
 98 | **Note:**  
 99 | - You need a PyPI account and must be listed as a maintainer of the project.
100 | 
101 | For more details, see the [Python Packaging User Guide](https://packaging.python.org/tutorials/packaging-projects/).
102 | 
```
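
The tools in `main.py` return a JSON string. As a minimal sketch, assuming the envelope built by `create_success_response` and `create_error_response` (`success` and `data` on success; `error`, `error_type`, `message`, and optional `details` on failure), a client could unpack a result as follows. `ToolCallError` and `parse_tool_result` are hypothetical names used only for illustration; they are not part of this package.

```python
import json
from typing import Any


class ToolCallError(Exception):
    """Hypothetical client-side error type; not defined by the server."""

    def __init__(self, error_type: str, message: str, details: Any = None):
        super().__init__(f"{error_type}: {message}")
        self.error_type = error_type
        self.details = details


def parse_tool_result(raw: str) -> str:
    """Return the hosted image URL from a tool result, or raise ToolCallError.

    Assumes the JSON envelope produced by create_success_response and
    create_error_response in main.py.
    """
    payload = json.loads(raw)

    # Error envelope: {"error": true, "error_type": ..., "message": ...}
    if payload.get("error"):
        raise ToolCallError(
            payload.get("error_type", "unknown"),
            payload.get("message", ""),
            payload.get("details"),
        )

    # Success envelope: {"success": true, "data": {"url": ...}}
    if not payload.get("success"):
        raise ToolCallError("malformed_response", "neither success nor error flag is set")

    return payload["data"]["url"]
```

Raising on the error envelope keeps the calling code on a single happy path, while `error_type` stays available for callers that want to branch on, for example, `validation_error` versus `timeout_error`.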

--------------------------------------------------------------------------------
/src/mcp_nano_banana/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [project]
 2 | name = "mcp-nano-banana"
 3 | version = "0.2.1"
 4 | requires-python = ">=3.10"
 5 | description = "An MCP server that generates images using the Google Gemini API."
 6 | readme = "README.md"
 7 | dependencies = [
 8 |     "mcp[cli]>=1.13",
 9 |     "google-generativeai>=0.5.4",
10 |     "python-dotenv>=1.0.1",
11 |     "pillow>=11.3.0",
12 | ]
13 | 
14 | [project.urls]
15 | Homepage = "https://github.com/GuilhermeAumo/mcp-nano-banana"
16 | Repository = "https://github.com/GuilhermeAumo/mcp-nano-banana"
17 | 
18 | [project.scripts]
19 | mcp-nano-banana = "mcp_nano_banana.main:main"
20 | 
21 | [tool.setuptools.packages.find]
22 | where = ["src"]
23 | 
```

--------------------------------------------------------------------------------
/src/mcp_nano_banana/test_gemini.py:
--------------------------------------------------------------------------------

```python
 1 | import json
 2 | import os
 3 | import base64
 4 | import google.generativeai as genai
 5 | from dotenv import load_dotenv
 6 | import requests
 7 | 
 8 | load_dotenv()
 9 | 
10 | genai.configure(api_key=os.getenv('GEMINI_API_KEY'))
11 | model = genai.GenerativeModel('gemini-2.5-flash-image-preview')
12 | 
13 | prompt = "Create nano-sized banana in a lab setting."
14 | response = model.generate_content([prompt])
15 | response = response.to_dict()
16 | 
17 | bytes_data = response["candidates"][0]["content"]["parts"][-1]["inline_data"]["data"]
18 | 
19 | generated_img = base64.b64decode(bytes_data)
20 | with open('edited_nano_banana.png', 'wb') as out:
21 |     out.write(generated_img)
22 | 
23 | 
24 | # Upload image to ImgBB
25 | # --- STEP 1: Ensure 'edited_nano_banana.png' exists and is a valid, non-empty image ---
26 | try:
27 |     with open('edited_nano_banana.png', 'rb') as image_file:
28 |         # Read the binary data and encode it to a Base64 string
29 |         generated_img_b64 = base64.b64encode(image_file.read()).decode('utf-8')
30 |     
31 |     if not generated_img_b64:
32 |         raise ValueError("The image file 'edited_nano_banana.png' is empty.")
33 | 
34 | except FileNotFoundError:
35 |     raise FileNotFoundError("The image file 'edited_nano_banana.png' was not found. Please ensure it exists.")
36 | 
37 | 
38 | # --- STEP 2: Build and send the correct POST request to ImgBB ---
39 | IMGBB_API_KEY = os.getenv("IMGBB_API_KEY")
40 | if not IMGBB_API_KEY:
41 |     raise ValueError("IMGBB_API_KEY environment variable not set or .env file is missing.")
42 | 
43 | upload_url = "https://api.imgbb.com/1/upload"
44 | 
45 | # All parameters go into the 'data' payload for the POST request
46 | payload = {
47 |     "key": IMGBB_API_KEY,
48 |     "image": generated_img_b64,  # The Base64 string is the 'image' field
49 |     "name": "nano_banana.png"   # Optional: specify a name for the file
50 | }
51 | 
52 | try:
53 |     print("Uploading image to ImgBB...")
54 |     # Use the data= parameter, NOT files=
55 |     resp = requests.post(upload_url, data=payload, timeout=60) # Increased timeout for larger files
56 |     
57 |     resp.raise_for_status() # Raise an error for bad status codes (4xx or 5xx)
58 | 
59 |     resp_json = resp.json()
60 | 
61 |     # ImgBB's success indicator is the 'data' key in the response
62 |     if "data" not in resp_json:
63 |         raise Exception(f"Imgbb upload failed. Response: {resp_json}")
64 | 
65 |     uploaded_url = resp_json["data"]["url"]
66 |     print(f"Success! Image uploaded to {uploaded_url}")
67 | 
68 | except requests.exceptions.HTTPError as err:
69 |     print(f"HTTP error occurred: {err}")
70 |     print(f"Response body: {err.response.text}")
71 |     raise
72 | 
```
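
The script above reads the image from the last part of the first candidate. As a hedged alternative, assuming the dictionary returned by `response.to_dict()` has the shape the script indexes into (`candidates`, then `content`, then `parts`, then `inline_data.data` holding base64 text), the sketch below scans every part for the first one that carries inline data instead of relying on its position. `extract_inline_image` is a hypothetical helper, not part of the repository.

```python
import base64
from typing import Any, Dict, Optional


def extract_inline_image(response_dict: Dict[str, Any]) -> Optional[bytes]:
    """Return the decoded bytes of the first inline image part, or None.

    Assumes the response dictionary layout used in test_gemini.py.
    """
    for candidate in response_dict.get("candidates", []):
        parts = (candidate.get("content") or {}).get("parts", [])
        for part in parts:
            # Text-only parts have no "inline_data" key and are skipped.
            data = (part.get("inline_data") or {}).get("data")
            if data:
                return base64.b64decode(data)
    return None
```

Returning `None` instead of raising lets the caller decide whether a text-only response is an error or simply needs a retry.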

--------------------------------------------------------------------------------
/src/mcp_nano_banana/main.py:
--------------------------------------------------------------------------------

```python
  1 | import asyncio
  2 | import logging
  3 | import os
  4 | import base64
  5 | import uuid
  6 | import json
  7 | import httpx
  8 | from dotenv import load_dotenv
  9 | from mcp.server.fastmcp import FastMCP
 10 | import google.generativeai as genai
 11 | import requests
 12 | from PIL import Image
 13 | from io import BytesIO
 14 | from urllib.parse import urlparse
 15 | from typing import Dict, Any, Optional
 16 | import re
 17 | 
 18 | # Configure logging
 19 | logging.basicConfig(level=logging.INFO)
 20 | logger = logging.getLogger(__name__)
 21 | 
 22 | # --- Error Handling Classes ---
 23 | class ImageGenerationError(Exception):
 24 |     """Custom exception for image generation errors"""
 25 |     pass
 26 | 
 27 | class ImageUploadError(Exception):
 28 |     """Custom exception for image upload errors"""
 29 |     pass
 30 | 
 31 | class ValidationError(Exception):
 32 |     """Custom exception for input validation errors"""
 33 |     pass
 34 | 
 35 | class APIError(Exception):
 36 |     """Custom exception for API-related errors"""
 37 |     pass
 38 | 
 39 | # --- Utility Functions ---
 40 | def validate_prompt(prompt: str) -> None:
 41 |     """Validate image generation prompt"""
 42 |     if not prompt or not isinstance(prompt, str):
 43 |         raise ValidationError("Prompt must be a non-empty string")
 44 |     
 45 |     if len(prompt.strip()) == 0:
 46 |         raise ValidationError("Prompt cannot be empty or only whitespace")
 47 |     
 48 |     if len(prompt) > 1000:
 49 |         raise ValidationError("Prompt is too long (maximum 1000 characters)")
 50 |     
 51 |     # Check for potentially problematic content
 52 |     if any(char in prompt for char in ['<', '>', '&', '"', "'"]):
 53 |         logger.warning("Prompt contains potentially problematic characters")
 54 | 
 55 | def validate_image_url(url: str) -> None:
 56 |     """Validate image URL"""
 57 |     if not url or not isinstance(url, str):
 58 |         raise ValidationError("Image URL must be a non-empty string")
 59 |     
 60 |     try:
 61 |         parsed = urlparse(url)
 62 |     except ValueError as e:
 63 |         raise ValidationError(f"Invalid URL format: {str(e)}")
 64 |     # Checked outside the try block so these errors are not caught and re-wrapped
 65 |     if not parsed.scheme or not parsed.netloc:
 66 |         raise ValidationError("Invalid URL format")
 67 |     if parsed.scheme not in ['http', 'https']:
 68 |         raise ValidationError("URL must use HTTP or HTTPS protocol")
 69 | 
 70 | def validate_environment_variables() -> Dict[str, str]:
 71 |     """Validate required environment variables"""
 72 |     errors = []
 73 |     env_vars = {}
 74 |     
 75 |     # Check GEMINI_API_KEY
 76 |     gemini_key = os.getenv("GEMINI_API_KEY")
 77 |     if not gemini_key:
 78 |         errors.append("GEMINI_API_KEY environment variable not set")
 79 |     elif not gemini_key.strip():
 80 |         errors.append("GEMINI_API_KEY environment variable is empty")
 81 |     else:
 82 |         env_vars['GEMINI_API_KEY'] = gemini_key
 83 |     
 84 |     # Check IMGBB_API_KEY
 85 |     imgbb_key = os.getenv("IMGBB_API_KEY")
 86 |     if not imgbb_key:
 87 |         errors.append("IMGBB_API_KEY environment variable not set")
 88 |     elif not imgbb_key.strip():
 89 |         errors.append("IMGBB_API_KEY environment variable is empty")
 90 |     else:
 91 |         env_vars['IMGBB_API_KEY'] = imgbb_key
 92 |     
 93 |     if errors:
 94 |         raise ValidationError(f"Environment validation failed: {'; '.join(errors)}")
 95 |     
 96 |     return env_vars
 97 | 
 98 | def create_error_response(error_type: str, message: str, details: Optional[Dict[str, Any]] = None) -> str:
 99 |     """Create a standardized error response"""
100 |     error_response = {
101 |         "error": True,
102 |         "error_type": error_type,
103 |         "message": message,
104 |         "timestamp": asyncio.get_event_loop().time() if asyncio.get_event_loop().is_running() else None
105 |     }
106 |     
107 |     if details:
108 |         error_response["details"] = details
109 |     
110 |     return json.dumps(error_response)
111 | 
112 | def create_success_response(data: Any) -> str:
113 |     """Create a standardized success response"""
114 |     success_response = {
115 |         "success": True,
116 |         "data": data,
117 |         "timestamp": asyncio.get_event_loop().time() if asyncio.get_event_loop().is_running() else None
118 |     }
119 |     return json.dumps(success_response)
120 | 
121 | # --- MCP Server Setup ---
122 | # Create a FastMCP server instance
123 | mcp = FastMCP(
124 |     name="image_generator_mcp_server",
125 | )
126 | logger.info(f"MCP server '{mcp.name}' created.")
127 | 
128 | 
129 | # --- Tool Definition ---
130 | @mcp.tool(
131 |     name="generate_image",
132 |         description="Generates an image based on a text prompt using the Gemini API and returns the image as a URL.",
133 | )
134 | async def generate_image(prompt: str) -> str:
135 |     """
136 |     Generates an image from a text prompt and returns the URL of the image.
137 |     """
138 |     try:
139 |         # Input validation
140 |         validate_prompt(prompt)
141 |         
142 |         # Environment validation
143 |         env_vars = validate_environment_variables()
144 |         
145 |         logger.info(f"Tool 'generate_image' called with prompt: '{prompt}'")
146 | 
147 |         # Image generation with specific error handling
148 |         try:
149 |             model = genai.GenerativeModel('gemini-2.5-flash-image-preview')
150 |             
151 |             # Generate content with timeout handling
152 |             response = await asyncio.wait_for(
153 |                 model.generate_content_async([f"Generate a high-quality, detailed image of: {prompt}"]),
154 |                 timeout=120  # 2 minute timeout for generation
155 |             )
156 |             
157 |             if not response:
158 |                 raise ImageGenerationError("Gemini API returned empty response")
159 |             
160 |             response_dict = response.to_dict()
161 |             
162 |             # Validate response structure
163 |             if "candidates" not in response_dict:
164 |                 raise ImageGenerationError("Invalid response structure: missing 'candidates' field")
165 |             
166 |             if not response_dict["candidates"]:
167 |                 raise ImageGenerationError("No candidates returned from Gemini API")
168 |             
169 |             candidate = response_dict["candidates"][0]
170 |             if "content" not in candidate:
171 |                 raise ImageGenerationError("Invalid candidate structure: missing 'content' field")
172 |             
173 |             if "parts" not in candidate["content"]:
174 |                 raise ImageGenerationError("Invalid content structure: missing 'parts' field")
175 |             
176 |             parts = candidate["content"]["parts"]
177 |             if not parts:
178 |                 raise ImageGenerationError("No parts returned in content")
179 |             
180 |             last_part = parts[-1]
181 |             if "inline_data" not in last_part:
182 |                 raise ImageGenerationError("Last part does not contain image data")
183 |             
184 |             if "data" not in last_part["inline_data"]:
185 |                 raise ImageGenerationError("Image data field is missing")
186 |             
187 |             image_data_base64 = last_part["inline_data"]["data"]
188 |             
189 |             # Validate base64 data
190 |             if not image_data_base64:
191 |                 raise ImageGenerationError("Empty image data received")
192 |             
193 |             # Test if base64 is valid
194 |             try:
195 |                 base64.b64decode(image_data_base64, validate=True)
196 |             except Exception as e:
197 |                 raise ImageGenerationError(f"Invalid base64 image data: {str(e)}")
198 |                 
199 |         except asyncio.TimeoutError:
200 |             logger.error("Image generation timed out")
201 |             return create_error_response(
202 |                 "timeout_error",
203 |                 "Image generation timed out after 2 minutes",
204 |                 {"timeout_seconds": 120}
205 |             )
206 |         except genai.types.BlockedPromptException as e:
207 |             logger.error(f"Prompt blocked by Gemini API: {e}")
208 |             return create_error_response(
209 |                 "content_policy_error",
210 |                 "Prompt was blocked by content policy",
211 |                 {"blocked_reason": str(e)}
212 |             )
213 |         except genai.types.StopCandidateException as e:
214 |             logger.error(f"Generation stopped by Gemini API: {e}")
215 |             return create_error_response(
216 |                 "generation_stopped_error",
217 |                 "Image generation was stopped by the API",
218 |                 {"stop_reason": str(e)}
219 |             )
220 |         except genai.types.SafetySettingsException as e:
221 |             logger.error(f"Safety settings violation: {e}")
222 |             return create_error_response(
223 |                 "safety_violation_error",
224 |                 "Prompt violates safety settings",
225 |                 {"violation_details": str(e)}
226 |             )
227 |         except genai.types.APIError as e:
228 |             logger.error(f"Gemini API error: {e}")
229 |             return create_error_response(
230 |                 "api_error",
231 |                 f"Gemini API error: {str(e)}",
232 |                 {"api_error_code": getattr(e, 'code', 'unknown')}
233 |             )
234 |         except ImageGenerationError as e:
235 |             logger.error(f"Image generation error: {e}")
236 |             return create_error_response("image_generation_error", str(e))
237 |         except Exception as e:
238 |             logger.exception(f"Unexpected error during image generation: {e}")
239 |             return create_error_response(
240 |                 "unexpected_error",
241 |                 f"Unexpected error during image generation: {str(e)}"
242 |             )
243 | 
244 |         # Image upload with specific error handling
245 |         try:
246 |             upload_url = "https://api.imgbb.com/1/upload"
247 |             
248 |             # Validate image size (ImgBB has a 32MB limit)
249 |             image_size = len(base64.b64decode(image_data_base64))
250 |             if image_size > 32 * 1024 * 1024:  # 32MB
251 |                 raise ImageUploadError(f"Image too large: {image_size} bytes (max 32MB)")
252 |             
253 |             payload = {
254 |                 "key": env_vars['IMGBB_API_KEY'],
255 |                 "image": image_data_base64,
256 |                 "name": f"{uuid.uuid4()}"
257 |             }
258 |             
259 |             # Upload with timeout and retry logic
260 |             max_retries = 3
261 |             for attempt in range(max_retries):
262 |                 try:
263 |                     resp = requests.post(upload_url, data=payload, timeout=60)
264 |                     resp.raise_for_status()
265 |                     break
266 |                 except requests.exceptions.Timeout:
267 |                     if attempt == max_retries - 1:
268 |                         raise ImageUploadError("Upload timed out after multiple attempts")
269 |                     logger.warning(f"Upload attempt {attempt + 1} timed out, retrying...")
270 |                     await asyncio.sleep(2 ** attempt)  # Exponential backoff
271 |                 except requests.exceptions.ConnectionError as e:
272 |                     if attempt == max_retries - 1:
273 |                         raise ImageUploadError(f"Connection error during upload: {str(e)}")
274 |                     logger.warning(f"Connection error on attempt {attempt + 1}, retrying...")
275 |                     await asyncio.sleep(2 ** attempt)
276 |             
277 |             resp_json = resp.json()
278 |             
279 |             # Validate ImgBB response
280 |             if "data" not in resp_json:
281 |                 error_msg = resp_json.get("error", {}).get("message", "Unknown error")
282 |                 raise ImageUploadError(f"ImgBB upload failed: {error_msg}")
283 |             
284 |             if "url" not in resp_json["data"]:
285 |                 raise ImageUploadError("ImgBB response missing URL field")
286 |             
287 |             uploaded_url = resp_json["data"]["url"]
288 |             
289 |             # Validate the returned URL
290 |             validate_image_url(uploaded_url)
291 |             
292 |             logger.info(f"Image uploaded successfully to {uploaded_url}")
293 |             return create_success_response({"url": uploaded_url})
294 |             
295 |         except requests.exceptions.HTTPError as e:
296 |             status_code = e.response.status_code
297 |             if status_code == 400:
298 |                 error_msg = "Bad request to ImgBB API"
299 |             elif status_code == 401:
300 |                 error_msg = "Invalid ImgBB API key"
301 |             elif status_code == 403:
302 |                 error_msg = "ImgBB API access forbidden"
303 |             elif status_code == 413:
304 |                 error_msg = "Image file too large for ImgBB"
305 |             elif status_code == 429:
306 |                 error_msg = "ImgBB API rate limit exceeded"
307 |             elif status_code >= 500:
308 |                 error_msg = "ImgBB server error"
309 |             else:
310 |                 error_msg = f"HTTP error {status_code}"
311 |             
312 |             logger.error(f"ImgBB HTTP error: {e}")
313 |             return create_error_response(
314 |                 "upload_http_error",
315 |                 error_msg,
316 |                 {"status_code": status_code, "response_text": e.response.text}
317 |             )
318 |         except ImageUploadError as e:
319 |             logger.error(f"Image upload error: {e}")
320 |             return create_error_response("image_upload_error", str(e))
321 |         except Exception as e:
322 |             logger.exception(f"Unexpected error during image upload: {e}")
323 |             return create_error_response(
324 |                 "unexpected_error",
325 |                 f"Unexpected error during image upload: {str(e)}"
326 |             )
327 | 
328 |     except ValidationError as e:
329 |         logger.error(f"Validation error: {e}")
330 |         return create_error_response("validation_error", str(e))
331 |     except Exception as e:
332 |         logger.exception(f"Unexpected error in generate_image: {e}")
333 |         return create_error_response(
334 |             "unexpected_error",
335 |             f"Unexpected error: {str(e)}"
336 |         )
337 |     
338 | 
339 | @mcp.tool(
340 |     name="edit_image",
341 |     description="Edits an existing image based on a text prompt using the Gemini API. Takes an image URL and a prompt, then returns the edited image as a URL.",
342 | )
343 | async def edit_image(image_url: str, prompt: str) -> str:
344 |     """
345 |     Edits an existing image from a URL based on a text prompt and returns the edited image as a URL.
346 |     """
347 |     try:
348 |         # Input validation
349 |         validate_prompt(prompt)
350 |         validate_image_url(image_url)
351 |         
352 |         # Environment validation
353 |         env_vars = validate_environment_variables()
354 |         
355 |         logger.info(f"Tool 'edit_image' called with image_url: '{image_url}' and prompt: '{prompt}'")
356 | 
357 |         # Image download with specific error handling
358 |         try:
359 |             # Download the image from the URL with timeout and retry logic
360 |             max_retries = 3
361 |             image_data = None
362 |             
363 |             for attempt in range(max_retries):
364 |                 try:
365 |                     response = requests.get(image_url, timeout=30)
366 |                     response.raise_for_status()
367 |                     
368 |                     # Check content type
369 |                     content_type = response.headers.get('content-type', '').lower()
370 |                     if not any(img_type in content_type for img_type in ['image/', 'application/octet-stream']):
371 |                         raise ValidationError(f"URL does not point to an image. Content-Type: {content_type}")
372 |                     
373 |                     # Check file size (10MB limit for download)
374 |                     if len(response.content) > 10 * 1024 * 1024:
375 |                         raise ValidationError("Image file too large (max 10MB)")
376 |                     
377 |                     image_data = response.content
378 |                     break
379 |                     
380 |                 except requests.exceptions.Timeout:
381 |                     if attempt == max_retries - 1:
382 |                         raise ImageGenerationError("Image download timed out after multiple attempts")
383 |                     logger.warning(f"Download attempt {attempt + 1} timed out, retrying...")
384 |                     await asyncio.sleep(2 ** attempt)
385 |                 except requests.exceptions.ConnectionError as e:
386 |                     if attempt == max_retries - 1:
387 |                         raise ImageGenerationError(f"Connection error during image download: {str(e)}")
388 |                     logger.warning(f"Connection error on attempt {attempt + 1}, retrying...")
389 |                     await asyncio.sleep(2 ** attempt)
390 |                 except requests.exceptions.HTTPError as e:
391 |                     status_code = e.response.status_code
392 |                     if status_code == 404:
393 |                         raise ValidationError("Image not found at the provided URL")
394 |                     elif status_code == 403:
395 |                         raise ValidationError("Access forbidden to the image URL")
396 |                     elif status_code == 410:
397 |                         raise ValidationError("Image is no longer available at the provided URL")
398 |                     elif status_code >= 500:
399 |                         if attempt == max_retries - 1:
400 |                             raise ImageGenerationError(f"Server error downloading image: {status_code}")
401 |                         logger.warning(f"Server error {status_code} on attempt {attempt + 1}, retrying...")
402 |                         await asyncio.sleep(2 ** attempt)
403 |                     else:
404 |                         raise ImageGenerationError(f"HTTP error downloading image: {status_code}")
405 |             
406 |             if not image_data:
407 |                 raise ImageGenerationError("Failed to download image after all retry attempts")
408 |             
409 |             # Validate and process image
410 |             try:
411 |                 image = Image.open(BytesIO(image_data))
412 |                 
413 |                 # Validate image format
414 |                 if image.format not in ['JPEG', 'PNG', 'WEBP', 'BMP', 'GIF']:
415 |                     raise ValidationError(f"Unsupported image format: {image.format}")
416 |                 
417 |                 # Check image dimensions
418 |                 width, height = image.size
419 |                 if width > 4096 or height > 4096:
420 |                     raise ValidationError(f"Image too large: {width}x{height} (max 4096x4096)")
421 |                 
422 |                 if width < 1 or height < 1:
423 |                     raise ValidationError("Invalid image dimensions")
424 |                 
425 |                 # Convert to RGB if necessary (for compatibility)
426 |                 if image.mode not in ['RGB', 'RGBA']:
427 |                     image = image.convert('RGB')
428 |                 
429 |             except ValidationError:
430 |                 raise  # Keep our own validation failures from being re-wrapped below
431 |             except Exception as e:
432 |                 if "cannot identify image file" in str(e).lower():
433 |                     raise ValidationError("Invalid image file format or corrupted image")
434 |                 raise ImageGenerationError(f"Error processing image: {str(e)}")
435 |         except ValidationError as e:
436 |             logger.error(f"Image validation error: {e}")
437 |             return create_error_response("validation_error", str(e))
438 |         except ImageGenerationError as e:
439 |             logger.error(f"Image download error: {e}")
440 |             return create_error_response("image_download_error", str(e))
441 |         except Exception as e:
442 |             logger.exception(f"Unexpected error during image download: {e}")
443 |             return create_error_response(
444 |                 "unexpected_error",
445 |                 f"Unexpected error during image download: {str(e)}"
446 |             )
447 | 
448 |         # Image editing with specific error handling
449 |         try:
450 |             model = genai.GenerativeModel('gemini-2.5-flash-image-preview')
451 | 
452 |             # Generate content with timeout handling
453 |             response = await asyncio.wait_for(
454 |                 model.generate_content_async([prompt, image]),
455 |                 timeout=120  # 2 minute timeout for editing
456 |             )
457 |             
458 |             if not response:
459 |                 raise ImageGenerationError("Gemini API returned empty response")
460 |             
461 |             response_dict = response.to_dict()
462 |             
463 |             # Validate response structure (same as generate_image)
464 |             if "candidates" not in response_dict:
465 |                 raise ImageGenerationError("Invalid response structure: missing 'candidates' field")
466 |             
467 |             if not response_dict["candidates"]:
468 |                 raise ImageGenerationError("No candidates returned from Gemini API")
469 |             
470 |             candidate = response_dict["candidates"][0]
471 |             if "content" not in candidate:
472 |                 raise ImageGenerationError("Invalid candidate structure: missing 'content' field")
473 |             
474 |             if "parts" not in candidate["content"]:
475 |                 raise ImageGenerationError("Invalid content structure: missing 'parts' field")
476 |             
477 |             parts = candidate["content"]["parts"]
478 |             if not parts:
479 |                 raise ImageGenerationError("No parts returned in content")
480 |             
481 |             last_part = parts[-1]
482 |             if "inline_data" not in last_part:
483 |                 raise ImageGenerationError("Last part does not contain image data")
484 |             
485 |             if "data" not in last_part["inline_data"]:
486 |                 raise ImageGenerationError("Image data field is missing")
487 |             
488 |             image_data_base64 = last_part["inline_data"]["data"]
489 |             
490 |             # Validate base64 data
491 |             if not image_data_base64:
492 |                 raise ImageGenerationError("Empty image data received")
493 |             
494 |             # Test if base64 is valid
495 |             try:
496 |                 base64.b64decode(image_data_base64, validate=True)
497 |             except Exception as e:
498 |                 raise ImageGenerationError(f"Invalid base64 image data: {str(e)}") from e
499 |                 
500 |         except asyncio.TimeoutError:
501 |             logger.error("Image editing timed out")
502 |             return create_error_response(
503 |                 "timeout_error",
504 |                 "Image editing timed out after 2 minutes",
505 |                 {"timeout_seconds": 120}
506 |             )
507 |         except genai.types.BlockedPromptException as e:
508 |             logger.error(f"Prompt blocked by Gemini API: {e}")
509 |             return create_error_response(
510 |                 "content_policy_error",
511 |                 "Prompt was blocked by content policy",
512 |                 {"blocked_reason": str(e)}
513 |             )
514 |         except genai.types.StopCandidateException as e:
515 |             logger.error(f"Editing stopped by Gemini API: {e}")
516 |             return create_error_response(
517 |                 "generation_stopped_error",
518 |                 "Image editing was stopped by the API",
519 |                 {"stop_reason": str(e)}
520 |             )
521 |         except genai.types.SafetySettingsException as e:
522 |             logger.error(f"Safety settings violation: {e}")
523 |             return create_error_response(
524 |                 "safety_violation_error",
525 |                 "Prompt violates safety settings",
526 |                 {"violation_details": str(e)}
527 |             )
528 |         except genai.types.APIError as e:
529 |             logger.error(f"Gemini API error: {e}")
530 |             return create_error_response(
531 |                 "api_error",
532 |                 f"Gemini API error: {str(e)}",
533 |                 {"api_error_code": getattr(e, 'code', 'unknown')}
534 |             )
535 |         except ImageGenerationError as e:
536 |             logger.error(f"Image editing error: {e}")
537 |             return create_error_response("image_editing_error", str(e))
538 |         except Exception as e:
539 |             logger.exception(f"Unexpected error during image editing: {e}")
540 |             return create_error_response(
541 |                 "unexpected_error",
542 |                 f"Unexpected error during image editing: {str(e)}"
543 |             )
544 | 
545 |         # Image upload with specific error handling (same as generate_image)
546 |         try:
547 |             upload_url = "https://api.imgbb.com/1/upload"
548 |             
549 |             # Validate image size (ImgBB has a 32MB limit)
550 |             image_size = len(base64.b64decode(image_data_base64))
551 |             if image_size > 32 * 1024 * 1024:  # 32MB
552 |                 raise ImageUploadError(f"Image too large: {image_size} bytes (max 32MB)")
553 |             
554 |             payload = {
555 |                 "key": env_vars['IMGBB_API_KEY'],
556 |                 "image": image_data_base64,
557 |                 "name": str(uuid.uuid4())
558 |             }
559 |             
560 |             # Upload with timeout and retry logic
561 |             max_retries = 3
562 |             for attempt in range(max_retries):
563 |                 try:
564 |                     resp = await asyncio.to_thread(requests.post, upload_url, data=payload, timeout=60)  # run the blocking call off the event loop
565 |                     resp.raise_for_status()
566 |                     break
567 |                 except requests.exceptions.Timeout:
568 |                     if attempt == max_retries - 1:
569 |                         raise ImageUploadError("Upload timed out after multiple attempts")
570 |                     logger.warning(f"Upload attempt {attempt + 1} timed out, retrying...")
571 |                     await asyncio.sleep(2 ** attempt)  # Exponential backoff
572 |                 except requests.exceptions.ConnectionError as e:
573 |                     if attempt == max_retries - 1:
574 |                         raise ImageUploadError(f"Connection error during upload: {str(e)}")
575 |                     logger.warning(f"Connection error on attempt {attempt + 1}, retrying...")
576 |                     await asyncio.sleep(2 ** attempt)
577 |             
578 |             resp_json = resp.json()
579 |             
580 |             # Validate ImgBB response
581 |             if "data" not in resp_json:
582 |                 error_msg = resp_json.get("error", {}).get("message", "Unknown error")
583 |                 raise ImageUploadError(f"ImgBB upload failed: {error_msg}")
584 |             
585 |             if "url" not in resp_json["data"]:
586 |                 raise ImageUploadError("ImgBB response missing URL field")
587 |             
588 |             uploaded_url = resp_json["data"]["url"]
589 |             
590 |             # Validate the returned URL
591 |             validate_image_url(uploaded_url)
592 |             
593 |             logger.info(f"Edited image uploaded successfully to {uploaded_url}")
594 |             return create_success_response({"url": uploaded_url})
595 |             
596 |         except requests.exceptions.HTTPError as e:
597 |             status_code = e.response.status_code
598 |             if status_code == 400:
599 |                 error_msg = "Bad request to ImgBB API"
600 |             elif status_code == 401:
601 |                 error_msg = "Invalid ImgBB API key"
602 |             elif status_code == 403:
603 |                 error_msg = "ImgBB API access forbidden"
604 |             elif status_code == 413:
605 |                 error_msg = "Image file too large for ImgBB"
606 |             elif status_code == 429:
607 |                 error_msg = "ImgBB API rate limit exceeded"
608 |             elif status_code >= 500:
609 |                 error_msg = "ImgBB server error"
610 |             else:
611 |                 error_msg = f"HTTP error {status_code}"
612 |             
613 |             logger.error(f"ImgBB HTTP error: {e}")
614 |             return create_error_response(
615 |                 "upload_http_error",
616 |                 error_msg,
617 |                 {"status_code": status_code, "response_text": e.response.text}
618 |             )
619 |         except ImageUploadError as e:
620 |             logger.error(f"Image upload error: {e}")
621 |             return create_error_response("image_upload_error", str(e))
622 |         except Exception as e:
623 |             logger.exception(f"Unexpected error during image upload: {e}")
624 |             return create_error_response(
625 |                 "unexpected_error",
626 |                 f"Unexpected error during image upload: {str(e)}"
627 |             )
628 | 
629 |     except ValidationError as e:
630 |         logger.error(f"Validation error: {e}")
631 |         return create_error_response("validation_error", str(e))
632 |     except Exception as e:
633 |         logger.exception(f"Unexpected error in edit_image: {e}")
634 |         return create_error_response(
635 |             "unexpected_error",
636 |             f"Unexpected error: {str(e)}"
637 |         )
638 | 
639 | 
640 | def main():
641 |     try:
642 |         # Validate environment variables
643 |         env_vars = validate_environment_variables()
644 |         
645 |         # Configure the Gemini API client
646 |         genai.configure(api_key=env_vars['GEMINI_API_KEY'])
647 |         logger.info("Gemini API configured successfully.")
648 |         logger.info("ImgBB API key loaded successfully.")
649 | 
650 |         logger.info("Starting MCP server via mcp.run()...")
651 |         mcp.run()  # assumes mcp is a FastMCP instance, whose run() is synchronous and starts its own event loop
652 |         
653 |     except ValidationError as e:
654 |         logger.error(f"Environment validation failed: {e}")
655 |         raise
656 |     except Exception as e:
657 |         logger.exception(f"Failed to start MCP server: {e}")
658 |         raise
659 | 
660 | if __name__ == "__main__":
661 |     main()
662 | 
```
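
The response validation in `edit_image` is marked "same as generate_image", so the same chain of checks appears to be repeated in both tools. As a minimal sketch, assuming the response dict has the shape those checks imply, the chain could live in one helper. The name `extract_image_base64` is hypothetical and does not exist in `main.py`; `ImageGenerationError` is redefined here only so the snippet runs on its own.

```python
import base64
import binascii


class ImageGenerationError(Exception):
    """Stand-in for the exception class of the same name in main.py."""


def extract_image_base64(response_dict: dict) -> str:
    """Return the base64 image payload from a Gemini-style response dict.

    Hypothetical helper: it mirrors, in order, the inline checks that
    edit_image performs on response.to_dict().
    """
    if "candidates" not in response_dict:
        raise ImageGenerationError("Invalid response structure: missing 'candidates' field")
    if not response_dict["candidates"]:
        raise ImageGenerationError("No candidates returned from Gemini API")

    candidate = response_dict["candidates"][0]
    if "content" not in candidate:
        raise ImageGenerationError("Invalid candidate structure: missing 'content' field")
    if "parts" not in candidate["content"]:
        raise ImageGenerationError("Invalid content structure: missing 'parts' field")

    parts = candidate["content"]["parts"]
    if not parts:
        raise ImageGenerationError("No parts returned in content")

    # The image is expected in the last part; earlier parts may hold text.
    last_part = parts[-1]
    if "inline_data" not in last_part:
        raise ImageGenerationError("Last part does not contain image data")
    if "data" not in last_part["inline_data"]:
        raise ImageGenerationError("Image data field is missing")

    image_data_base64 = last_part["inline_data"]["data"]
    if not image_data_base64:
        raise ImageGenerationError("Empty image data received")

    # validate=True rejects characters outside the base64 alphabet.
    try:
        base64.b64decode(image_data_base64, validate=True)
    except (binascii.Error, ValueError) as e:
        raise ImageGenerationError(f"Invalid base64 image data: {e}") from e

    return image_data_base64


if __name__ == "__main__":
    # Fabricated sample payload, used only to exercise the helper.
    encoded = base64.b64encode(b"fake image bytes").decode("ascii")
    sample = {
        "candidates": [
            {"content": {"parts": [{"text": "done"}, {"inline_data": {"data": encoded}}]}}
        ]
    }
    print(extract_image_base64(sample) == encoded)
```

With a helper like this, both tools could call `extract_image_base64(response.to_dict())` inside their existing `try` blocks, and the `except ImageGenerationError` handlers would keep working unchanged.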