# Directory Structure

```
├── .cursor
│   └── rules
│       └── logo-creation.mdc
├── .gitignore
├── .python-version
├── config
│   ├── __init__.py
│   └── settings.py
├── Dockerfile
├── downloads
│   ├── db-icon-1_128x128.png
│   ├── db-icon-1_32x32.png
│   ├── db-icon-1.png
│   ├── fighter_jet.glb
│   ├── skKKrkhF_XplQxNPUPrFX_f09176a2fab045d0945f724a3833b470.png
│   ├── y8c1zcRFBHv00oJ3mnonf_8293637079c74a8a8570c655a55904c9_128x128.png
│   ├── y8c1zcRFBHv00oJ3mnonf_8293637079c74a8a8570c655a55904c9_32x32.png
│   ├── y8c1zcRFBHv00oJ3mnonf_8293637079c74a8a8570c655a55904c9.png
│   ├── zEChDDxjUQrMQebsjJxEk_3e241e40750a4293bc1230f064b691be_128x128.png
│   ├── zEChDDxjUQrMQebsjJxEk_3e241e40750a4293bc1230f064b691be_32x32.png
│   └── zEChDDxjUQrMQebsjJxEk_3e241e40750a4293bc1230f064b691be.png
├── LICENSE
├── pyproject.toml
├── README.md
├── requirements.txt
├── routes
│   └── scale_image.py
├── run_server.py
├── server.py
├── tools
│   ├── __init__.py
│   ├── background_removal.py
│   ├── image_download.py
│   ├── image_gen.py
│   └── image_scaling.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.13

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv

# Environment variables
.env

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# MCP Tool Server for Logo Generation

This server provides logo generation capabilities using FAL AI, with tools for image generation, background removal, and automatic scaling.

## Demo

[![MCP Tool Server Demo](https://img.youtube.com/vi/Miemu1xEZng/0.jpg)](https://www.youtube.com/watch?v=Miemu1xEZng)

## Installation

1. Install `uv` (Astral's Python package and environment manager):

```bash
curl -LsSf https://astral.sh/uv/install.sh | sh
```

2. Create and activate a virtual environment:

```bash
uv venv
source .venv/bin/activate  # On Unix/macOS
# or
.venv\Scripts\activate     # On Windows
```

3. Install dependencies:

```bash
uv pip install -r requirements.txt
uv pip install watchdog  # used by run_server.py for auto-reload; not listed in requirements.txt
```

4. Set up your environment variables:
   - Create a `.env` file in the root directory
   - Add your FAL AI API key:

```bash
FAL_KEY=your_fal_ai_key_here
```

## Running the Server

Start the server with:

```bash
python run_server.py
```

The server will be available at `http://127.0.0.1:7777`

### Troubleshooting

If you encounter a `FileNotFoundError` on Windows when running the server, make sure you're running the command from the root directory of the project. If the issue persists, try updating to the latest version of the repository which includes fixes for Windows compatibility.

For Windows users specifically:

1. Make sure you've activated your virtual environment with `.venv\Scripts\activate`
2. Run the server from the root directory of the project with `python run_server.py`
3. If you see any path-related errors, please report them in the issues section of the repository

## Cursor IDE Configuration

1. Open Cursor Settings
2. Navigate to the MCP section
3. Add the following configuration:
   - URL: `http://127.0.0.1:7777/sse`
   - Connection Type: `SSE`
   - Enable the connection
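Recent Cursor versions can also pick the server up from a project-level `.cursor/mcp.json`. A minimal sketch, assuming the current `mcpServers` schema (check Cursor's MCP docs for your version):

```json
{
  "mcpServers": {
    "logo-generation": {
      "url": "http://127.0.0.1:7777/sse"
    }
  }
}
```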

## Notes

- Always reference `@logo-creation.mdc` in your Cursor Composer for consistent results
- Steps are defined in `@logo-creation.mdc` but tools can be used independently
- All generated logos will be saved in the `downloads` directory
- Each logo is automatically generated in three sizes:
  - Original size
  - 32x32 pixels
  - 128x128 pixels
- All logos maintain transparency in their final PNG format
- Prompts created by the agent are informed by the examples and prompt structure defined in `server.py`. You can customize the prompt structure by editing `server.py`.
- You can use the generate_image tool to generate any image you want, not just logos

## Requirements

- Python 3.13+ (see `.python-version` and `pyproject.toml`)
- FAL AI API key (required for image generation)
- Active internet connection

## References

- [Cursor MCP Documentation](https://docs.cursor.com/context/model-context-protocol)
- [Model Context Protocol Introduction](https://modelcontextprotocol.io/introduction)
- [FAL AI Dashboard](https://fal.ai/dashboard)

---

If you find this tool helpful, you can [buy me a coffee](https://buymeacoffee.com/sshtunnelvision) ☕️ to support development!

```

--------------------------------------------------------------------------------
/config/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Match the version pinned in .python-version / pyproject.toml (>=3.13)
FROM python:3.13-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# The server listens on 7777 by default (see server.py --port)
EXPOSE 7777
CMD ["python", "server.py"]
```

--------------------------------------------------------------------------------
/config/settings.py:
--------------------------------------------------------------------------------

```python
# config/settings.py
import os

# Read the FAL API key from the environment (e.g. loaded from a .env file)
FAL_API_KEY = os.getenv("FAL_API_KEY")
```

--------------------------------------------------------------------------------
/tools/__init__.py:
--------------------------------------------------------------------------------

```python
from .image_gen import generate_image
from .background_removal import remove_background
from .image_download import download_image_from_url
from .image_scaling import scale_image

__all__ = [
    'generate_image',
    'remove_background',
    'download_image_from_url',
    'scale_image'
]

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp-tool-server"
version = "0.1.0"
description = "MCP tool server for logo generation with FAL AI (image generation, background removal, scaling)"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "fal-client>=0.5.9",
    "fastapi>=0.115.11",
    "mcp[cli]>=1.3.0",
    "python-dotenv>=1.0.1",
    "sse-starlette>=2.2.1",
    "uvicorn>=0.34.0",
]

```

--------------------------------------------------------------------------------
/routes/scale_image.py:
--------------------------------------------------------------------------------

```python
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List, Tuple
from tools.image_scaling import scale_image

router = APIRouter()

class ScaleImageRequest(BaseModel):
    input_path: str
    sizes: List[Tuple[int, int]] = [(32, 32), (128, 128)]

@router.post("/scale-image")
async def scale_image_route(request: ScaleImageRequest):
    """
    Scale an image to specified sizes while preserving transparency.
    """
    try:
        result = await scale_image(request.input_path, request.sizes)
        return {"message": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) 
```

--------------------------------------------------------------------------------
/tools/image_gen.py:
--------------------------------------------------------------------------------

```python
# tools/image_gen.py
import fal_client
import asyncio
import os

async def generate_image(prompt: str, model: str = "fal-ai/ideogram/v2", aspect_ratio: str = "1:1", expand_prompt: bool = True, style: str = "auto", negative_prompt: str = "") -> str:
    """
    Generate an image using FAL AI based on a text prompt.
    """
    fal_key = os.getenv("FAL_KEY")
    print(f"FAL_KEY in environment: {fal_key[:4] if fal_key else 'Not set'}...")

    def on_queue_update(update):
        if isinstance(update, fal_client.InProgress):
            for log in update.logs:
                print(log["message"])

    try:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            lambda: fal_client.subscribe(
                model,
                arguments={
                    "prompt": prompt,
                    "aspect_ratio": aspect_ratio,
                    "expand_prompt": expand_prompt,
                    "style": style,
                    "negative_prompt": negative_prompt
                },
                with_logs=True,
                on_queue_update=on_queue_update,
            )
        )
        print(f"Raw FAL response: {result}")
        if result and isinstance(result, dict) and "images" in result and len(result["images"]) > 0:
            return result["images"][0]["url"]
        return "Image generation completed, but no URL returned."
    except Exception as e:
        return f"Error generating image: {str(e)}"
```
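`generate_image` keeps the event loop responsive by pushing the blocking `fal_client.subscribe` call onto a worker thread via `run_in_executor`. A minimal sketch of that pattern, with a hypothetical `slow_square` standing in for the SDK call:

```python
import asyncio
import time

def slow_square(x: int) -> int:
    # Stand-in for a blocking SDK call such as fal_client.subscribe
    time.sleep(0.1)
    return x * x

async def main() -> int:
    loop = asyncio.get_running_loop()
    # Offload the blocking call to the default thread pool so the
    # event loop stays free to serve other requests meanwhile
    return await loop.run_in_executor(None, lambda: slow_square(7))

print(asyncio.run(main()))  # → 49
```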

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
# This file was autogenerated by uv via the following command:
#    uv pip compile -o requirements.txt pyproject.toml
annotated-types==0.7.0
    # via pydantic
anyio==4.8.0
    # via
    #   httpx
    #   mcp
    #   sse-starlette
    #   starlette
certifi==2025.1.31
    # via
    #   httpcore
    #   httpx
click==8.1.8
    # via
    #   typer
    #   uvicorn
fal-client==0.5.9
    # via mcp-tool-server (pyproject.toml)
fastapi==0.115.11
    # via mcp-tool-server (pyproject.toml)
h11==0.14.0
    # via
    #   httpcore
    #   uvicorn
httpcore==1.0.7
    # via httpx
httpx==0.28.1
    # via
    #   fal-client
    #   mcp
httpx-sse==0.4.0
    # via
    #   fal-client
    #   mcp
idna==3.10
    # via
    #   anyio
    #   httpx
markdown-it-py==3.0.0
    # via rich
mcp==1.3.0
    # via mcp-tool-server (pyproject.toml)
mdurl==0.1.2
    # via markdown-it-py
pydantic==2.10.6
    # via
    #   fastapi
    #   mcp
    #   pydantic-settings
pydantic-core==2.27.2
    # via pydantic
pydantic-settings==2.8.1
    # via mcp
pygments==2.19.1
    # via rich
python-dotenv==1.0.1
    # via
    #   mcp-tool-server (pyproject.toml)
    #   mcp
    #   pydantic-settings
rich==13.9.4
    # via typer
shellingham==1.5.4
    # via typer
sniffio==1.3.1
    # via anyio
sse-starlette==2.2.1
    # via
    #   mcp-tool-server (pyproject.toml)
    #   mcp
starlette==0.46.0
    # via
    #   fastapi
    #   mcp
    #   sse-starlette
typer==0.15.2
    # via mcp
typing-extensions==4.12.2
    # via
    #   fastapi
    #   pydantic
    #   pydantic-core
    #   typer
uvicorn==0.34.0
    # via
    #   mcp-tool-server (pyproject.toml)
    #   mcp

```

--------------------------------------------------------------------------------
/tools/image_scaling.py:
--------------------------------------------------------------------------------

```python
from PIL import Image
import os
from typing import List, Tuple

async def scale_image(input_path: str, sizes: List[Tuple[int, int]] = [(32, 32), (128, 128)]) -> str:
    """
    Scale an image to multiple specified sizes while preserving transparency.
    
    Args:
        input_path: Path to the input image
        sizes: List of (width, height) tuples for desired output sizes
    
    Returns:
        str: Message indicating where the scaled images were saved
    """
    try:
        if not os.path.exists(input_path):
            return f"Error: Input file {input_path} does not exist"

        # Open the image while preserving transparency
        with Image.open(input_path) as img:
            # Convert to RGBA if not already
            if img.mode != 'RGBA':
                img = img.convert('RGBA')
            
            # Get the base filename and directory
            directory = os.path.dirname(input_path)
            filename = os.path.splitext(os.path.basename(input_path))[0]
            
            scaled_files = []
            # Create scaled versions
            for width, height in sizes:
                # Resize the image using high-quality resampling
                scaled = img.resize((width, height), Image.Resampling.LANCZOS)
                
                # Generate output filename
                output_filename = f"{filename}_{width}x{height}.png"
                output_path = os.path.join(directory, output_filename)
                
                # Save with transparency
                scaled.save(output_path, "PNG")
                scaled_files.append(output_path)
            
            return f"Successfully created scaled versions: {', '.join(scaled_files)}"
            
    except Exception as e:
        return f"Error scaling image: {str(e)}" 
```
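The output names follow the files in `downloads/` (e.g. `db-icon-1_32x32.png`): the original basename plus `_<width>x<height>.png`, saved next to the input. A stdlib-only sketch of just that naming logic (`scaled_name` is a hypothetical helper, not part of the repo):

```python
import os

def scaled_name(input_path: str, width: int, height: int) -> str:
    # <original basename>_<width>x<height>.png, alongside the input file
    directory = os.path.dirname(input_path)
    base = os.path.splitext(os.path.basename(input_path))[0]
    return os.path.join(directory, f"{base}_{width}x{height}.png")

print(scaled_name("downloads/db-icon-1.png", 32, 32))
# → downloads/db-icon-1_32x32.png (on POSIX path separators)
```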

--------------------------------------------------------------------------------
/tools/background_removal.py:
--------------------------------------------------------------------------------

```python
import base64
import fal_client
import asyncio
import os

def is_base64(s: str) -> bool:
    """Check if a string is base64 encoded."""
    try:
        # Check if string starts with data URI scheme
        if s.startswith('data:image'):
            # Extract the base64 part after the comma
            base64_str = s.split(',')[1]
            # Try to decode it
            base64.b64decode(base64_str)
            return True
    except Exception:
        pass
    return False

async def remove_background(
    image_url: str,
    sync_mode: bool = True,
    crop_to_bbox: bool = False
) -> str:
    """
    Remove background from an image using FAL AI.
    """
    fal_key = os.getenv("FAL_KEY")
    print(f"FAL_KEY in environment: {fal_key[:4] if fal_key else 'Not set'}...")

    try:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            lambda: fal_client.subscribe(
                "fal-ai/bria/background/remove",
                arguments={
                    "image_url": image_url,
                    "sync_mode": sync_mode
                }
            )
        )
        
        # Handle the response according to the new schema
        if isinstance(result, dict) and "image" in result:
            image_data = result["image"]
            if "url" in image_data:
                print("Successfully removed background from image")
                return image_data["url"]  # Return the FAL-hosted URL directly
            else:
                return "Background removal completed, but no image URL was returned"
        else:
            return f"Unexpected response format: {str(result)}"
    except Exception as e:
        return f"Error removing background: {str(e)}" 
```
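`is_base64` accepts only `data:image/...` URIs whose payload actually decodes as base64. The same check as a self-contained sketch (`is_base64_data_uri` is a hypothetical name; it adds `validate=True`, slightly stricter than the original):

```python
import base64

def is_base64_data_uri(s: str) -> bool:
    # Accept only data-URI images whose base64 payload decodes cleanly
    try:
        if s.startswith("data:image"):
            base64.b64decode(s.split(",")[1], validate=True)
            return True
    except Exception:
        pass
    return False

payload = base64.b64encode(b"\x89PNG").decode()
print(is_base64_data_uri(f"data:image/png;base64,{payload}"))  # → True
print(is_base64_data_uri("https://example.com/logo.png"))      # → False
```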

--------------------------------------------------------------------------------
/tools/image_download.py:
--------------------------------------------------------------------------------

```python
import aiohttp
import asyncio
import os
from urllib.parse import urlparse
import mimetypes

async def download_image_from_url(image_url: str, output_dir: str = "downloads") -> str:
    """
    Download an image from a URL and save it locally.
    """
    try:
        # Create downloads directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        # Extract filename from URL or generate one
        parsed_url = urlparse(image_url)
        filename = os.path.basename(parsed_url.path)
        if not filename:
            # If no filename in URL, create one based on timestamp
            content_type = mimetypes.guess_type(image_url)[0]
            ext = mimetypes.guess_extension(content_type) if content_type else '.jpg'
            filename = f"image_{int(asyncio.get_running_loop().time())}{ext}"

        output_path = os.path.join(output_dir, filename)

        async with aiohttp.ClientSession() as session:
            async with session.get(image_url) as response:
                if response.status != 200:
                    return f"Error downloading image: HTTP {response.status}"
                
                # Verify it's an image from content-type
                content_type = response.headers.get('content-type', '')
                if not content_type.startswith('image/'):
                    return f"Error: URL does not point to an image (content-type: {content_type})"

                # Download and save the image
                with open(output_path, 'wb') as f:
                    while True:
                        chunk = await response.content.read(8192)
                        if not chunk:
                            break
                        f.write(chunk)

        return f"Image successfully downloaded to: {output_path}"
    except Exception as e:
        return f"Error downloading image: {str(e)}" 
```
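`download_image_from_url` derives the local filename from the URL path and falls back to a guessed extension when the path has none. The selection logic in isolation (`pick_filename` is a hypothetical helper; a fixed stem stands in for the timestamp the original uses):

```python
import mimetypes
import os
from urllib.parse import urlparse

def pick_filename(image_url: str, fallback_stem: str = "image") -> str:
    # Prefer the basename from the URL path; otherwise synthesize a
    # name from the guessed content type, defaulting to .jpg
    name = os.path.basename(urlparse(image_url).path)
    if name:
        return name
    content_type = mimetypes.guess_type(image_url)[0]
    ext = mimetypes.guess_extension(content_type) if content_type else ".jpg"
    return f"{fallback_stem}{ext}"

print(pick_filename("https://cdn.example.com/assets/logo.png"))  # → logo.png
print(pick_filename("https://cdn.example.com/"))                 # → image.jpg
```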

--------------------------------------------------------------------------------
/run_server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""
Server runner script with clean shutdown handling and auto-reload.
This script runs the server in a subprocess, handles Ctrl+C properly,
and automatically restarts the server when files change.
"""

import os
import signal
import subprocess
import sys
import time
import threading
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# Flag to indicate if we should restart the server
restart_server = False
# Flag to indicate if we're shutting down
shutting_down = False

class FileChangeHandler(FileSystemEventHandler):
    def on_any_event(self, event):
        global restart_server
        # Skip temporary files and __pycache__ directories
        if (event.src_path.endswith('.pyc') or 
            '__pycache__' in event.src_path or 
            '.git' in event.src_path or
            event.is_directory):
            return
        
        # Only restart for Python files
        if event.src_path.endswith('.py'):
            print(f"\n[RELOAD] Detected change in {event.src_path}")
            restart_server = True

def start_file_watcher(directory):
    """Start watching for file changes in the specified directory."""
    event_handler = FileChangeHandler()
    observer = Observer()
    observer.schedule(event_handler, directory, recursive=True)
    observer.start()
    return observer

def run_server():
    """Run the server process and handle its lifecycle."""
    global restart_server, shutting_down
    
    # Get the path to server.py in the same directory as this script
    current_dir = os.path.dirname(os.path.abspath(__file__)) if __file__ else "."
    server_path = os.path.join(current_dir, "server.py")
    
    # Start the server as a subprocess
    server_process = subprocess.Popen(
        [sys.executable, server_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
        bufsize=1  # Line buffered
    )
    
    # Print server output in real-time
    def print_output():
        for line in server_process.stdout:
            if not shutting_down:  # Only print if we're not shutting down
                print(line, end='')
    
    # Start a thread to print output
    output_thread = threading.Thread(target=print_output)
    output_thread.daemon = True
    output_thread.start()
    
    # Monitor the server process
    while server_process.poll() is None:
        if restart_server:
            print("\n[RELOAD] Restarting server due to file changes...")
            server_process.terminate()
            try:
                server_process.wait(timeout=2)
            except subprocess.TimeoutExpired:
                server_process.kill()
                server_process.wait()
            restart_server = False
            return True  # Signal to restart
        time.sleep(0.1)
    
    # If we get here, the server exited on its own
    return_code = server_process.poll()
    print(f"\nServer exited with code {return_code}")
    return False  # Signal not to restart

def main():
    global restart_server, shutting_down
    
    print("Starting MCP Tool Server with clean shutdown handling and auto-reload...")
    
    # Get the current directory (where this script is located)
    current_dir = os.path.dirname(os.path.abspath(__file__)) if __file__ else "."
    
    # Create downloads directory if it doesn't exist
    downloads_dir = os.path.join(current_dir, "downloads")
    if not os.path.exists(downloads_dir):
        os.makedirs(downloads_dir)
        print(f"Created downloads directory at: {downloads_dir}")
    
    # Start file watcher
    observer = start_file_watcher(current_dir)
    
    # Function to handle Ctrl+C
    def signal_handler(sig, frame):
        global shutting_down
        print("\nReceived shutdown signal. Terminating server...")
        shutting_down = True
        observer.stop()
        sys.exit(0)
    
    # Register signal handlers
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    # Run the server, restarting as needed
    try:
        while True:
            should_restart = run_server()
            if not should_restart:
                break
            time.sleep(0.5)  # Small delay before restart
    except KeyboardInterrupt:
        signal_handler(signal.SIGINT, None)
    finally:
        observer.stop()
        observer.join()
    
    return 0

if __name__ == "__main__":
    sys.exit(main()) 
```
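`run_server.py` stops the child with a terminate-then-kill escalation: ask politely with `terminate()`, then `kill()` if the process ignores the request within the timeout. The same pattern in isolation (`stop_process` is a hypothetical helper):

```python
import subprocess
import sys

def stop_process(proc: subprocess.Popen, timeout: float = 2.0) -> int:
    # Graceful first: SIGTERM, then SIGKILL if the child won't exit in time
    proc.terminate()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
    return proc.returncode

# Spawn a child that would otherwise sleep for a minute
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
code = stop_process(child)
print(code)  # negative signal number on POSIX, e.g. -15 for SIGTERM
```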

--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------

```python
# server.py
import asyncio
import click
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
from tools.image_gen import generate_image
from tools.background_removal import remove_background
from tools.image_download import download_image_from_url
from tools.image_scaling import scale_image
from typing import Optional
import os
import sys
from dotenv import load_dotenv
from fastapi import FastAPI
from mcp.server.sse import SseServerTransport
from starlette.routing import Mount, Route
import signal
import uvicorn

# Debug: Print current working directory
print(f"Current working directory: {os.getcwd()}")

# Load environment variables
print("Loading environment variables...")
load_dotenv(verbose=True)
print(f"Environment after load_dotenv: FAL_KEY={'*' * len(os.getenv('FAL_KEY')) if os.getenv('FAL_KEY') else 'Not found'}")

# Initialize the server
app = FastAPI(debug=True)
server = Server("image-gen-server")
sse = SseServerTransport("/messages/")

# Force exit on SIGINT (Ctrl+C)
def force_exit_handler(sig, frame):
    print("\nForce exiting server...")
    os._exit(0)  # Force immediate exit

# Register signal handlers
signal.signal(signal.SIGINT, force_exit_handler)
signal.signal(signal.SIGTERM, force_exit_handler)

# Add shutdown event handler
@app.on_event("shutdown")
async def shutdown_event():
    print("Shutting down server gracefully...")
    # Cancel all tasks
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    
    # Wait briefly for tasks to cancel, then force exit if needed
    try:
        await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=1.0)
        print("All tasks cancelled successfully")
    except asyncio.TimeoutError:
        print("Timeout waiting for tasks to cancel, forcing exit")
        os._exit(0)

@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
    """List available resources."""
    return []

@server.read_resource()
async def handle_read_resource(uri: str) -> str:
    """Read a specific resource."""
    raise ValueError(f"Unsupported resource: {uri}")

@server.list_prompts()
async def handle_list_prompts() -> list[types.Prompt]:
    """List available prompts."""
    return []

@server.get_prompt()
async def handle_get_prompt(name: str, arguments: dict[str, str] | None) -> types.GetPromptResult:
    """Get a specific prompt."""
    raise ValueError(f"Unknown prompt: {name}")

@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """List available tools."""
    return [
        types.Tool(
            name="generate_image",
            description="Generate an image from a text prompt using FAL AI. For best results with logos and icons, use the format: '[subject], 2D flat design, [optional style details], white background'. Example: 'pine tree logo, 2D flat design, minimal geometric style, white background'",
            inputSchema={
                "type": "object",
                "properties": {
                    "prompt": {
                        "type": "string",
                        "description": "Text prompt to generate the image. Recommended format: '[subject], 2D flat design, [optional style details], white background'",
                        "examples": [
                            "mountain peak logo, 2D flat design, minimalist geometric shapes, white background",
                            "coffee cup icon, 2D flat design, simple line art style, white background",
                            "fox mascot, 2D flat design, modern geometric shapes, white background"
                        ]
                    },
                    "model": {
                        "type": "string",
                        "description": "Model to use for generation",
                        "default": "fal-ai/ideogram/v2",
                        "enum": ["fal-ai/ideogram/v2"]
                    },
                    "aspect_ratio": {
                        "type": "string",
                        "description": "The aspect ratio of the generated image",
                        "default": "1:1",
                        "enum": ["10:16", "16:10", "9:16", "16:9", "4:3", "3:4", "1:1", "1:3", "3:1", "3:2", "2:3"]
                    },
                    "expand_prompt": {
                        "type": "boolean",
                        "description": "Whether to expand the prompt with MagicPrompt functionality",
                        "default": True
                    },
                    "style": {
                        "type": "string",
                        "description": "The style of the generated image",
                        "default": "auto",
                        "enum": ["auto", "general", "realistic", "design", "render_3D", "anime"]
                    },
                    "negative_prompt": {
                        "type": "string",
                        "description": "A negative prompt to avoid in the generated image",
                        "default": ""
                    }
                },
                "required": ["prompt"]
            }
        ),
        types.Tool(
            name="remove_background",
            description="Remove background from an image using FAL AI",
            inputSchema={
                "type": "object",
                "properties": {
                    "image_url": {
                        "type": "string",
                        "description": "Input image url"
                    },
                    "sync_mode": {
                        "type": "boolean",
                        "description": "If true, wait for the image to be generated and uploaded before returning",
                        "default": True
                    },
                    "crop_to_bbox": {
                        "type": "boolean",
                        "description": "If true, crop the result to a bounding box around the subject",
                        "default": False
                    }
                },
                "required": ["image_url"]
            }
        ),
        types.Tool(
            name="download_image",
            description="Download an image from a URL and save it locally",
            inputSchema={
                "type": "object",
                "properties": {
                    "image_url": {
                        "type": "string",
                        "description": "URL of the image to download"
                    },
                    "output_dir": {
                        "type": "string",
                        "description": "Directory to save the downloaded image",
                        "default": "downloads"
                    }
                },
                "required": ["image_url"]
            }
        ),
        types.Tool(
            name="scale_image",
            description="Scale an image to multiple sizes while preserving transparency",
            inputSchema={
                "type": "object",
                "properties": {
                    "input_path": {
                        "type": "string",
                        "description": "Path to the input image to scale"
                    },
                    "sizes": {
                        "type": "array",
                        "items": {
                            "type": "array",
                            "items": {"type": "integer"},
                            "minItems": 2,
                            "maxItems": 2
                        },
                        "description": "List of [width, height] pairs for desired output sizes",
                        "default": [[32, 32], [128, 128]]
                    }
                },
                "required": ["input_path"]
            }
        )
    ]

class ImageGenToolHandler:
    def validate_prompt(self, prompt: str) -> bool:
        """
        Validate that the prompt is not empty.
        """
        return bool(prompt and prompt.strip())

    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent]:
        prompt = arguments.get("prompt")
        if not prompt or not self.validate_prompt(prompt):
            return [types.TextContent(
                type="text", 
                text="Error: Prompt cannot be empty"
            )]
            
        print(f"Generating image with prompt: {prompt}")
        result = await generate_image(
            prompt=prompt,
            model=arguments.get("model", "fal-ai/ideogram/v2"),
            aspect_ratio=arguments.get("aspect_ratio", "1:1"),
            expand_prompt=arguments.get("expand_prompt", True),
            style=arguments.get("style", "auto"),
            negative_prompt=arguments.get("negative_prompt", "")
        )
        print(f"Image generation result: {result}")
        if result.startswith("http"):
            return [types.TextContent(type="text", text=f"Generated image URL: {result}")]
        return [types.TextContent(type="text", text=result)]

class BackgroundRemovalToolHandler:
    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent]:
        print(f"Removing background from image: {arguments.get('image_url')}")
        result = await remove_background(
            arguments.get("image_url"),
            arguments.get("sync_mode", True),
            arguments.get("crop_to_bbox", False)
        )
        print(f"Background removal result: {result}")
        if result.startswith("http"):
            return [types.TextContent(type="text", text=f"Background removed image URL: {result}")]
        return [types.TextContent(type="text", text=result)]

class ImageDownloadToolHandler:
    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent]:
        print(f"Downloading image from: {arguments.get('image_url')}")
        result = await download_image_from_url(
            arguments.get("image_url"),
            arguments.get("output_dir", "downloads")
        )
        print(f"Download result: {result}")
        return [types.TextContent(type="text", text=result)]

class ImageScalingToolHandler:
    async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent]:
        print(f"Scaling image: {arguments.get('input_path')}")
        result = await scale_image(
            arguments.get("input_path"),
            arguments.get("sizes", [(32, 32), (128, 128)])
        )
        print(f"Scaling result: {result}")
        return [types.TextContent(type="text", text=result)]

tool_handlers = {
    "generate_image": ImageGenToolHandler(),
    "remove_background": BackgroundRemovalToolHandler(),
    "download_image": ImageDownloadToolHandler(),
    "scale_image": ImageScalingToolHandler()
}

@server.call_tool()
async def handle_call_tool(
    name: str,
    arguments: dict | None
) -> list[types.TextContent | types.ImageContent]:
    """Handle tool execution requests."""
    if name in tool_handlers:
        return await tool_handlers[name].handle(name, arguments)
    else:
        raise ValueError(f"Unknown tool: {name}")

async def handle_sse(request):
    async with sse.connect_sse(
        request.scope, request.receive, request._send
    ) as streams:
        await server.run(
            streams[0],
            streams[1],
            InitializationOptions(
                server_name="image-gen-server",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )

@click.command()
@click.option("--port", default=7777, help="Port to listen on")
def main(port: int) -> int:
    # Ensure FAL_KEY is set
    fal_key = os.getenv("FAL_KEY")
    if not fal_key:
        print("Warning: FAL_KEY environment variable not found, checking FAL_API_KEY...")
        fal_key = os.getenv("FAL_API_KEY")
        if not fal_key:
            print("Error: Neither FAL_KEY nor FAL_API_KEY environment variables are set")
            sys.exit(1)
        os.environ["FAL_KEY"] = fal_key

    print("Starting image generation server...")

    # Add routes
    app.add_route("/sse", handle_sse)
    app.mount("/messages", sse.handle_post_message)

    # Cool ASCII art log
    print("""
    ===========================================
          🚀 MCP Server is LIVE! 🚀
    ------------------------------------------- 
    |  Status: Running                        |
    |  Transport: SSE                         |
    |  URL: http://127.0.0.1:{}              |
    |  Ready for Cursor MCP client            |
    |  Auto-reload: Enabled                   |
    |  Force exit on Ctrl+C: Enabled          |
    ------------------------------------------- 
    Listening for requests... 🎉
    ===========================================
    """.format(port))

    # Configure uvicorn with a short timeout for graceful shutdown
    config = uvicorn.Config(
        app=app,
        host="127.0.0.1",
        port=port,
        reload=True,
        reload_dirs=["mcp_tool_server"],
        workers=1,
        timeout_graceful_shutdown=1  # Only wait 1 second for graceful shutdown
    )
    
    # Run with a custom server instance that has a shorter timeout
    server = uvicorn.Server(config)
    
    try:
        server.run()
    except KeyboardInterrupt:
        print("KeyboardInterrupt received, forcing exit...")
        os._exit(0)
    
    return 0

if __name__ == "__main__":
    main()
```