# Directory Structure

```
├── .cursor
│   └── rules
│       └── logo-creation.mdc
├── .gitignore
├── .python-version
├── config
│   ├── __init__.py
│   └── settings.py
├── Dockerfile
├── downloads
│   ├── db-icon-1_128x128.png
│   ├── db-icon-1_32x32.png
│   ├── db-icon-1.png
│   ├── fighter_jet.glb
│   ├── skKKrkhF_XplQxNPUPrFX_f09176a2fab045d0945f724a3833b470.png
│   ├── y8c1zcRFBHv00oJ3mnonf_8293637079c74a8a8570c655a55904c9_128x128.png
│   ├── y8c1zcRFBHv00oJ3mnonf_8293637079c74a8a8570c655a55904c9_32x32.png
│   ├── y8c1zcRFBHv00oJ3mnonf_8293637079c74a8a8570c655a55904c9.png
│   ├── zEChDDxjUQrMQebsjJxEk_3e241e40750a4293bc1230f064b691be_128x128.png
│   ├── zEChDDxjUQrMQebsjJxEk_3e241e40750a4293bc1230f064b691be_32x32.png
│   └── zEChDDxjUQrMQebsjJxEk_3e241e40750a4293bc1230f064b691be.png
├── LICENSE
├── pyproject.toml
├── README.md
├── requirements.txt
├── routes
│   └── scale_image.py
├── run_server.py
├── server.py
├── tools
│   ├── __init__.py
│   ├── background_removal.py
│   ├── image_download.py
│   ├── image_gen.py
│   └── image_scaling.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
1 | 3.13
2 | 
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
 1 | # Python-generated files
 2 | __pycache__/
 3 | *.py[oc]
 4 | build/
 5 | dist/
 6 | wheels/
 7 | *.egg-info
 8 | 
 9 | # Virtual environments
10 | .venv
11 | 
12 | # Environment variables
13 | .env
14 | 
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
 1 | # MCP Tool Server for Logo Generation
 2 | 
 3 | This server provides logo generation capabilities using FAL AI, with tools for image generation, background removal, and automatic scaling.
 4 | 
 5 | ## Demo
 6 | 
 7 | [![MCP Tool Server Demo](https://img.youtube.com/vi/Miemu1xEZng/0.jpg)](https://www.youtube.com/watch?v=Miemu1xEZng)
 8 | 
 9 | ## Installation
10 | 
11 | 1. Install `uv` (Astral's fast Python package and project manager):
12 | 
13 | ```bash
14 | curl -LsSf https://astral.sh/uv/install.sh | sh
15 | ```
16 | 
17 | 2. Create and activate a virtual environment:
18 | 
19 | ```bash
20 | uv venv
21 | source .venv/bin/activate  # On Unix/macOS
22 | # or
23 | .venv\Scripts\activate     # On Windows
24 | ```
25 | 
26 | 3. Install dependencies:
27 | 
28 | ```bash
29 | uv pip install -r requirements.txt
30 | ```
31 | 
32 | 4. Set up your environment variables:
33 |    - Create a `.env` file in the root directory
34 |    - Add your FAL AI API key:
35 | 
36 | ```bash
37 | FAL_KEY=your_fal_ai_key_here
38 | ```
39 | 
40 | ## Running the Server
41 | 
42 | Start the server with:
43 | 
44 | ```bash
45 | python run_server.py
46 | ```
47 | 
48 | The server will be available at `http://127.0.0.1:7777`
49 | 
50 | ### Troubleshooting
51 | 
52 | If you encounter a `FileNotFoundError` on Windows when running the server, make sure you're running the command from the project's root directory. If the issue persists, update to the latest version of the repository, which includes Windows compatibility fixes.
53 | 
54 | For Windows users specifically:
55 | 
56 | 1. Make sure you've activated your virtual environment with `.venv\Scripts\activate`
57 | 2. Run the server from the root directory of the project with `python run_server.py`
58 | 3. If you see any path-related errors, please report them in the issues section of the repository
59 | 
60 | ## Cursor IDE Configuration
61 | 
62 | 1. Open Cursor Settings
63 | 2. Navigate to the MCP section
64 | 3. Add the following configuration:
65 |    - URL: `http://127.0.0.1:7777/sse`
66 |    - Connection Type: `SSE`
67 |    - Enable the connection
68 | 
69 | ## Notes
70 | 
71 | - Always reference `@logo-creation.mdc` in your Cursor Composer for consistent results
72 | - Steps are defined in `@logo-creation.mdc` but tools can be used independently
73 | - All generated logos will be saved in the `downloads` directory
74 | - Each logo is automatically generated in three sizes:
75 |   - Original size
76 |   - 32x32 pixels
77 |   - 128x128 pixels
78 | - All logos maintain transparency in their final PNG format
79 | - Prompts created by the agent follow the examples and prompt structure defined in `server.py`; edit that file to customize the prompt structure
80 | - You can use the generate_image tool to generate any image you want, not just logos
81 | 
82 | ## Requirements
83 | 
84 | - Python 3.13+ (see `.python-version` and `pyproject.toml`)
85 | - FAL AI API key (required for image generation)
86 | - Active internet connection
87 | 
88 | ## References
89 | 
90 | - [Cursor MCP Documentation](https://docs.cursor.com/context/model-context-protocol)
91 | - [Model Context Protocol Introduction](https://modelcontextprotocol.io/introduction)
92 | - [FAL AI Dashboard](https://fal.ai/dashboard)
93 | 
94 | ---
95 | 
96 | If you find this tool helpful, you can [buy me a coffee](https://buymeacoffee.com/sshtunnelvision) ☕️ to support development!
97 | 
```
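
The end-to-end flow described in the Notes above (generate, remove background, download, scale) can also be exercised directly from Python. A minimal sketch, assuming `FAL_KEY` is set and the snippet is run from the project root; the saved filename is illustrative:

```python
# Sketch: drive the full logo pipeline from Python.
# Assumes FAL_KEY is set and the script is run from the project root.
import asyncio

from tools import generate_image, remove_background, download_image_from_url, scale_image


async def make_logo() -> None:
    url = await generate_image("pine tree logo, 2D flat design, white background")
    if not url.startswith("http"):
        print(url)  # error message from the tool
        return

    transparent_url = await remove_background(url)
    print(await download_image_from_url(transparent_url, output_dir="downloads"))

    # download_image_from_url reports where the file was saved; pass that path
    # to scale_image to produce the 32x32 and 128x128 variants, e.g.:
    # print(await scale_image("downloads/<saved-file>.png"))


if __name__ == "__main__":
    asyncio.run(make_logo())
```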

--------------------------------------------------------------------------------
/config/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
1 | FROM python:3.13-slim
2 | WORKDIR /app
3 | COPY requirements.txt .
4 | RUN pip install --no-cache-dir -r requirements.txt
5 | COPY . .
6 | EXPOSE 7777
7 | CMD ["python", "server.py"]
```

--------------------------------------------------------------------------------
/config/settings.py:
--------------------------------------------------------------------------------

```python
1 | # config/settings.py
2 | import os
3 | 
4 | # Load environment variables (e.g., from a .env file or system env)
5 | FAL_API_KEY = os.getenv("FAL_API_KEY")  # Set this in your environment or .env; server.py falls back to FAL_API_KEY when FAL_KEY is unset
```
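
Note that `FAL_API_KEY` is read once at import time, so any `.env` file must be loaded before this module is imported (as `server.py` does with `load_dotenv()`). A minimal sketch of that ordering:

```python
# Sketch: load .env before importing settings so FAL_API_KEY is populated.
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current directory into os.environ

from config.settings import FAL_API_KEY  # evaluated now, after load_dotenv()

print("FAL_API_KEY configured:", bool(FAL_API_KEY))
```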

--------------------------------------------------------------------------------
/tools/__init__.py:
--------------------------------------------------------------------------------

```python
 1 | from .image_gen import generate_image
 2 | from .background_removal import remove_background
 3 | from .image_download import download_image_from_url
 4 | from .image_scaling import scale_image
 5 | 
 6 | __all__ = [
 7 |     'generate_image',
 8 |     'remove_background',
 9 |     'download_image_from_url',
10 |     'scale_image'
11 | ]
12 | 
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [project]
 2 | name = "mcp-tool-server"
 3 | version = "0.1.0"
 4 | description = "MCP tool server for logo generation with FAL AI"
 5 | readme = "README.md"
 6 | requires-python = ">=3.13"
 7 | dependencies = [
 8 |     "aiohttp>=3.9",          # used by tools/image_download.py
 9 |     "fal-client>=0.5.9",
10 |     "fastapi>=0.115.11",
11 |     "mcp[cli]>=1.3.0",
12 |     "pillow>=10.0",          # used by tools/image_scaling.py
13 |     "python-dotenv>=1.0.1",
14 |     "sse-starlette>=2.2.1",
15 |     "uvicorn>=0.34.0",
16 |     "watchdog>=4.0",         # used by run_server.py auto-reload
17 | ]
18 | 
```

--------------------------------------------------------------------------------
/routes/scale_image.py:
--------------------------------------------------------------------------------

```python
 1 | from fastapi import APIRouter, HTTPException
 2 | from pydantic import BaseModel
 3 | from typing import List, Tuple
 4 | from tools import scale_image
 5 | 
 6 | router = APIRouter()
 7 | 
 8 | class ScaleImageRequest(BaseModel):
 9 |     input_path: str
10 |     sizes: List[Tuple[int, int]] = [(32, 32), (128, 128)]
11 | 
12 | @router.post("/scale-image")
13 | async def scale_image_route(request: ScaleImageRequest):
14 |     """
15 |     Scale an image to specified sizes while preserving transparency.
16 |     """
17 |     try:
18 |         result = await scale_image(request.input_path, request.sizes)
19 |         return {"message": result}
20 |     except Exception as e:
21 |         raise HTTPException(status_code=500, detail=str(e)) 
```
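
This router is not mounted by `server.py`; if you want the HTTP endpoint in addition to the MCP tool, it could be wired up roughly like this (a sketch, assuming the absolute import above and an illustrative input path):

```python
# Sketch: expose the /scale-image HTTP route on a FastAPI app.
from fastapi import FastAPI

from routes.scale_image import router as scale_image_router

app = FastAPI()
app.include_router(scale_image_router)
# POST /scale-image with {"input_path": "downloads/logo.png"} then calls scale_image.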

--------------------------------------------------------------------------------
/tools/image_gen.py:
--------------------------------------------------------------------------------

```python
 1 | # tools/image_gen.py
 2 | from typing import Optional
 3 | import fal_client
 4 | import asyncio
 5 | import os
 6 | 
 7 | async def generate_image(prompt: str, model: str = "fal-ai/ideogram/v2", aspect_ratio: str = "1:1", expand_prompt: bool = True, style: str = "auto", negative_prompt: str = "") -> str:
 8 |     """
 9 |     Generate an image using FAL AI based on a text prompt.
10 |     """
11 |     fal_key = os.getenv("FAL_KEY")
12 |     print(f"FAL_KEY in environment: {fal_key[:4] if fal_key else 'Not set'}...")
13 | 
14 |     def on_queue_update(update):
15 |         if isinstance(update, fal_client.InProgress):
16 |             for log in update.logs:
17 |                 print(log["message"])
18 | 
19 |     try:
20 |         loop = asyncio.get_event_loop()
21 |         result = await loop.run_in_executor(
22 |             None,
23 |             lambda: fal_client.subscribe(
24 |                 model,
25 |                 arguments={
26 |                     "prompt": prompt,
27 |                     "aspect_ratio": aspect_ratio,
28 |                     "expand_prompt": expand_prompt,
29 |                     "style": style,
30 |                     "negative_prompt": negative_prompt
31 |                 },
32 |                 with_logs=True,
33 |                 on_queue_update=on_queue_update,
34 |             )
35 |         )
36 |         print(f"Raw FAL response: {result}")
37 |         if result and isinstance(result, dict) and "images" in result and len(result["images"]) > 0:
38 |             return result["images"][0]["url"]
39 |         return "Image generation completed, but no URL returned."
40 |     except Exception as e:
41 |         return f"Error generating image: {str(e)}"
```
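
A minimal sketch of calling the generator outside the MCP server, assuming `FAL_KEY` is exported:

```python
# Sketch: call generate_image directly (FAL_KEY must be set in the environment).
import asyncio

from tools.image_gen import generate_image

url = asyncio.run(generate_image(
    "coffee cup icon, 2D flat design, simple line art style, white background",
    aspect_ratio="1:1",
    style="design",
))
print(url)  # FAL-hosted image URL on success, otherwise an error string
```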

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
 1 | # This file was autogenerated by uv via the following command:
 2 | #    uv pip compile -o requirements.txt pyproject.toml
 3 | annotated-types==0.7.0
 4 |     # via pydantic
 5 | anyio==4.8.0
 6 |     # via
 7 |     #   httpx
 8 |     #   mcp
 9 |     #   sse-starlette
10 |     #   starlette
11 | certifi==2025.1.31
12 |     # via
13 |     #   httpcore
14 |     #   httpx
15 | click==8.1.8
16 |     # via
17 |     #   typer
18 |     #   uvicorn
19 | fal-client==0.5.9
20 |     # via mcp-tool-server (pyproject.toml)
21 | fastapi==0.115.11
22 |     # via mcp-tool-server (pyproject.toml)
23 | h11==0.14.0
24 |     # via
25 |     #   httpcore
26 |     #   uvicorn
27 | httpcore==1.0.7
28 |     # via httpx
29 | httpx==0.28.1
30 |     # via
31 |     #   fal-client
32 |     #   mcp
33 | httpx-sse==0.4.0
34 |     # via
35 |     #   fal-client
36 |     #   mcp
37 | idna==3.10
38 |     # via
39 |     #   anyio
40 |     #   httpx
41 | markdown-it-py==3.0.0
42 |     # via rich
43 | mcp==1.3.0
44 |     # via mcp-tool-server (pyproject.toml)
45 | mdurl==0.1.2
46 |     # via markdown-it-py
47 | pydantic==2.10.6
48 |     # via
49 |     #   fastapi
50 |     #   mcp
51 |     #   pydantic-settings
52 | pydantic-core==2.27.2
53 |     # via pydantic
54 | pydantic-settings==2.8.1
55 |     # via mcp
56 | pygments==2.19.1
57 |     # via rich
58 | python-dotenv==1.0.1
59 |     # via
60 |     #   mcp-tool-server (pyproject.toml)
61 |     #   mcp
62 |     #   pydantic-settings
63 | rich==13.9.4
64 |     # via typer
65 | shellingham==1.5.4
66 |     # via typer
67 | sniffio==1.3.1
68 |     # via anyio
69 | sse-starlette==2.2.1
70 |     # via
71 |     #   mcp-tool-server (pyproject.toml)
72 |     #   mcp
73 | starlette==0.46.0
74 |     # via
75 |     #   fastapi
76 |     #   mcp
77 |     #   sse-starlette
78 | typer==0.15.2
79 |     # via mcp
80 | typing-extensions==4.12.2
81 |     # via
82 |     #   fastapi
83 |     #   pydantic
84 |     #   pydantic-core
85 |     #   typer
86 | uvicorn==0.34.0
87 |     # via
88 |     #   mcp-tool-server (pyproject.toml)
89 |     #   mcp
90 | 
```

--------------------------------------------------------------------------------
/tools/image_scaling.py:
--------------------------------------------------------------------------------

```python
 1 | from PIL import Image
 2 | import os
 3 | from typing import List, Tuple
 4 | 
 5 | async def scale_image(input_path: str, sizes: List[Tuple[int, int]] = [(32, 32), (128, 128)]) -> str:
 6 |     """
 7 |     Scale an image to multiple specified sizes while preserving transparency.
 8 |     
 9 |     Args:
10 |         input_path: Path to the input image
11 |         sizes: List of (width, height) tuples for desired output sizes
12 |     
13 |     Returns:
14 |         str: Message indicating where the scaled images were saved
15 |     """
16 |     try:
17 |         if not os.path.exists(input_path):
18 |             return f"Error: Input file {input_path} does not exist"
19 | 
20 |         # Open the image while preserving transparency
21 |         with Image.open(input_path) as img:
22 |             # Convert to RGBA if not already
23 |             if img.mode != 'RGBA':
24 |                 img = img.convert('RGBA')
25 |             
26 |             # Get the base filename and directory
27 |             directory = os.path.dirname(input_path)
28 |             filename = os.path.splitext(os.path.basename(input_path))[0]
29 |             
30 |             scaled_files = []
31 |             # Create scaled versions
32 |             for width, height in sizes:
33 |                 # Resize the image using high-quality resampling
34 |                 scaled = img.resize((width, height), Image.Resampling.LANCZOS)
35 |                 
36 |                 # Generate output filename
37 |                 output_filename = f"{filename}_{width}x{height}.png"
38 |                 output_path = os.path.join(directory, output_filename)
39 |                 
40 |                 # Save with transparency
41 |                 scaled.save(output_path, "PNG")
42 |                 scaled_files.append(output_path)
43 |             
44 |             return f"Successfully created scaled versions: {', '.join(scaled_files)}"
45 |             
46 |     except Exception as e:
47 |         return f"Error scaling image: {str(e)}" 
```
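
A quick usage sketch; the input path points at one of the sample files in `downloads/`:

```python
# Sketch: scale an existing PNG to the default 32x32 and 128x128 sizes.
import asyncio

from tools.image_scaling import scale_image

message = asyncio.run(scale_image("downloads/db-icon-1.png"))
print(message)  # lists the *_32x32.png and *_128x128.png files it wrote
```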

--------------------------------------------------------------------------------
/tools/background_removal.py:
--------------------------------------------------------------------------------

```python
 1 | import base64
 2 | from typing import Optional
 3 | import fal_client
 4 | import asyncio
 5 | import os
 6 | from .image_download import download_image_from_url
 7 | 
 8 | def is_base64(s: str) -> bool:
 9 |     """Check if a string is base64 encoded."""
10 |     try:
11 |         # Check if string starts with data URI scheme
12 |         if s.startswith('data:image'):
13 |             # Extract the base64 part after the comma
14 |             base64_str = s.split(',')[1]
15 |             # Try to decode it
16 |             base64.b64decode(base64_str)
17 |             return True
18 |     except Exception:
19 |         pass
20 |     return False
21 | 
22 | async def remove_background(
23 |     image_url: str,
24 |     sync_mode: bool = True,
25 |     crop_to_bbox: bool = False
26 | ) -> str:
27 |     """
28 |     Remove background from an image using FAL AI.
29 |     """
30 |     fal_key = os.getenv("FAL_KEY")
31 |     print(f"FAL_KEY in environment: {fal_key[:4] if fal_key else 'Not set'}...")
32 | 
33 |     try:
34 |         loop = asyncio.get_event_loop()
35 |         result = await loop.run_in_executor(
36 |             None,
37 |             lambda: fal_client.subscribe(
38 |                 "fal-ai/bria/background/remove",
39 |                 arguments={
40 |                     "image_url": image_url,
41 |                     "sync_mode": sync_mode
42 |                 }
43 |             )
44 |         )
45 |         
46 |         # Handle the response according to the new schema
47 |         if isinstance(result, dict) and "image" in result:
48 |             image_data = result["image"]
49 |             if "url" in image_data:
50 |                 print("Successfully removed background from image")
51 |                 return image_data["url"]  # Return the FAL-hosted URL directly
52 |             else:
53 |                 return "Background removal completed, but no image URL was returned"
54 |         else:
55 |             return f"Unexpected response format: {str(result)}"
56 |     except Exception as e:
57 |         return f"Error removing background: {str(e)}" 
```
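
Usage sketch; the URL is a placeholder for a FAL-hosted image such as the one returned by `generate_image`:

```python
# Sketch: remove the background from a FAL-hosted image.
# The URL below is a placeholder for the output of generate_image.
import asyncio

from tools.background_removal import remove_background

result = asyncio.run(remove_background("https://fal.media/files/<generated-image>.png"))
print(result)  # URL of the transparent PNG on success, otherwise an error string
```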

--------------------------------------------------------------------------------
/tools/image_download.py:
--------------------------------------------------------------------------------

```python
 1 | from typing import Optional
 2 | import aiohttp
 3 | import asyncio
 4 | import os
 5 | from urllib.parse import urlparse
 6 | import mimetypes
 7 | 
 8 | async def download_image_from_url(image_url: str, output_dir: str = "downloads") -> str:
 9 |     """
10 |     Download an image from a URL and save it locally.
11 |     """
12 |     try:
13 |         # Create downloads directory if it doesn't exist
14 |         os.makedirs(output_dir, exist_ok=True)
15 | 
16 |         # Extract filename from URL or generate one
17 |         parsed_url = urlparse(image_url)
18 |         filename = os.path.basename(parsed_url.path)
19 |         if not filename:
20 |             # If no filename in URL, create one based on timestamp
21 |             content_type = mimetypes.guess_type(image_url)[0]
22 |             ext = mimetypes.guess_extension(content_type) if content_type else '.jpg'
23 |             filename = f"image_{int(asyncio.get_event_loop().time())}{ext}"
24 | 
25 |         output_path = os.path.join(output_dir, filename)
26 | 
27 |         async with aiohttp.ClientSession() as session:
28 |             async with session.get(image_url) as response:
29 |                 if response.status != 200:
30 |                     return f"Error downloading image: HTTP {response.status}"
31 |                 
32 |                 # Verify it's an image from content-type
33 |                 content_type = response.headers.get('content-type', '')
34 |                 if not content_type.startswith('image/'):
35 |                     return f"Error: URL does not point to an image (content-type: {content_type})"
36 | 
37 |                 # Download and save the image
38 |                 with open(output_path, 'wb') as f:
39 |                     while True:
40 |                         chunk = await response.content.read(8192)
41 |                         if not chunk:
42 |                             break
43 |                         f.write(chunk)
44 | 
45 |         return f"Image successfully downloaded to: {output_path}"
46 |     except Exception as e:
47 |         return f"Error downloading image: {str(e)}" 
```
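
Usage sketch; the URL is a placeholder for any direct image link:

```python
# Sketch: download a remote image into downloads/.
import asyncio

from tools.image_download import download_image_from_url

print(asyncio.run(download_image_from_url(
    "https://example.com/logo.png",  # placeholder URL
    output_dir="downloads",
)))
```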

--------------------------------------------------------------------------------
/run_server.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python
  2 | """
  3 | Server runner script with clean shutdown handling and auto-reload.
  4 | This script runs the server in a subprocess, handles Ctrl+C properly,
  5 | and automatically restarts the server when files change.
  6 | """
  7 | 
  8 | import os
  9 | import signal
 10 | import subprocess
 11 | import sys
 12 | import time
 13 | import threading
 14 | from watchdog.observers import Observer
 15 | from watchdog.events import FileSystemEventHandler
 16 | 
 17 | # Flag to indicate if we should restart the server
 18 | restart_server = False
 19 | # Flag to indicate if we're shutting down
 20 | shutting_down = False
 21 | 
 22 | class FileChangeHandler(FileSystemEventHandler):
 23 |     def on_any_event(self, event):
 24 |         global restart_server
 25 |         # Skip temporary files and __pycache__ directories
 26 |         if (event.src_path.endswith('.pyc') or 
 27 |             '__pycache__' in event.src_path or 
 28 |             '.git' in event.src_path or
 29 |             event.is_directory):
 30 |             return
 31 |         
 32 |         # Only restart for Python files
 33 |         if event.src_path.endswith('.py'):
 34 |             print(f"\n[RELOAD] Detected change in {event.src_path}")
 35 |             restart_server = True
 36 | 
 37 | def start_file_watcher(directory):
 38 |     """Start watching for file changes in the specified directory."""
 39 |     event_handler = FileChangeHandler()
 40 |     observer = Observer()
 41 |     observer.schedule(event_handler, directory, recursive=True)
 42 |     observer.start()
 43 |     return observer
 44 | 
 45 | def run_server():
 46 |     """Run the server process and handle its lifecycle."""
 47 |     global restart_server, shutting_down
 48 |     
 49 |     # Get the path to server.py in the same directory as this script
 50 |     current_dir = os.path.dirname(os.path.abspath(__file__)) if __file__ else "."
 51 |     server_path = os.path.join(current_dir, "server.py")
 52 |     
 53 |     # Start the server as a subprocess
 54 |     server_process = subprocess.Popen(
 55 |         [sys.executable, server_path],
 56 |         stdout=subprocess.PIPE,
 57 |         stderr=subprocess.STDOUT,
 58 |         universal_newlines=True,
 59 |         bufsize=1  # Line buffered
 60 |     )
 61 |     
 62 |     # Print server output in real-time
 63 |     def print_output():
 64 |         for line in server_process.stdout:
 65 |             if not shutting_down:  # Only print if we're not shutting down
 66 |                 print(line, end='')
 67 |     
 68 |     # Start a thread to print output
 69 |     output_thread = threading.Thread(target=print_output)
 70 |     output_thread.daemon = True
 71 |     output_thread.start()
 72 |     
 73 |     # Monitor the server process
 74 |     while server_process.poll() is None:
 75 |         if restart_server:
 76 |             print("\n[RELOAD] Restarting server due to file changes...")
 77 |             server_process.terminate()
 78 |             try:
 79 |                 server_process.wait(timeout=2)
 80 |             except subprocess.TimeoutExpired:
 81 |                 server_process.kill()
 82 |                 server_process.wait()
 83 |             restart_server = False
 84 |             return True  # Signal to restart
 85 |         time.sleep(0.1)
 86 |     
 87 |     # If we get here, the server exited on its own
 88 |     return_code = server_process.poll()
 89 |     print(f"\nServer exited with code {return_code}")
 90 |     return False  # Signal not to restart
 91 | 
 92 | def main():
 93 |     global restart_server, shutting_down
 94 |     
 95 |     print("Starting MCP Tool Server with clean shutdown handling and auto-reload...")
 96 |     
 97 |     # Get the current directory (where this script is located)
 98 |     current_dir = os.path.dirname(os.path.abspath(__file__)) if __file__ else "."
 99 |     
100 |     # Create downloads directory if it doesn't exist
101 |     downloads_dir = os.path.join(current_dir, "downloads")
102 |     if not os.path.exists(downloads_dir):
103 |         os.makedirs(downloads_dir)
104 |         print(f"Created downloads directory at: {downloads_dir}")
105 |     
106 |     # Start file watcher
107 |     observer = start_file_watcher(current_dir)
108 |     
109 |     # Function to handle Ctrl+C
110 |     def signal_handler(sig, frame):
111 |         global shutting_down
112 |         print("\nReceived shutdown signal. Terminating server...")
113 |         shutting_down = True
114 |         observer.stop()
115 |         sys.exit(0)
116 |     
117 |     # Register signal handlers
118 |     signal.signal(signal.SIGINT, signal_handler)
119 |     signal.signal(signal.SIGTERM, signal_handler)
120 |     
121 |     # Run the server, restarting as needed
122 |     try:
123 |         while True:
124 |             should_restart = run_server()
125 |             if not should_restart:
126 |                 break
127 |             time.sleep(0.5)  # Small delay before restart
128 |     except KeyboardInterrupt:
129 |         signal_handler(signal.SIGINT, None)
130 |     finally:
131 |         observer.stop()
132 |         observer.join()
133 |     
134 |     return 0
135 | 
136 | if __name__ == "__main__":
137 |     sys.exit(main()) 
```

--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------

```python
  1 | # server.py
  2 | import asyncio
  3 | import click
  4 | from mcp.server.models import InitializationOptions
  5 | import mcp.types as types
  6 | from mcp.server import NotificationOptions, Server
  7 | from tools.image_gen import generate_image
  8 | from tools.background_removal import remove_background
  9 | from tools.image_download import download_image_from_url
 10 | from tools.image_scaling import scale_image
 11 | from typing import Optional
 12 | import os
 13 | import sys
 14 | from dotenv import load_dotenv
 15 | from fastapi import FastAPI
 16 | from mcp.server.sse import SseServerTransport
 17 | from starlette.routing import Mount, Route
 18 | import signal
 19 | import uvicorn
 20 | 
 21 | # Debug: Print current working directory
 22 | print(f"Current working directory: {os.getcwd()}")
 23 | 
 24 | # Load environment variables
 25 | print("Loading environment variables...")
 26 | load_dotenv(verbose=True)
 27 | print(f"Environment after load_dotenv: FAL_KEY={'*' * len(os.getenv('FAL_KEY')) if os.getenv('FAL_KEY') else 'Not found'}")
 28 | 
 29 | # Initialize the server
 30 | app = FastAPI(debug=True)
 31 | server = Server("image-gen-server")
 32 | sse = SseServerTransport("/messages/")
 33 | 
 34 | # Force exit on SIGINT (Ctrl+C)
 35 | def force_exit_handler(sig, frame):
 36 |     print("\nForce exiting server...")
 37 |     os._exit(0)  # Force immediate exit
 38 | 
 39 | # Register signal handlers
 40 | signal.signal(signal.SIGINT, force_exit_handler)
 41 | signal.signal(signal.SIGTERM, force_exit_handler)
 42 | 
 43 | # Add shutdown event handler
 44 | @app.on_event("shutdown")
 45 | async def shutdown_event():
 46 |     print("Shutting down server gracefully...")
 47 |     # Cancel all tasks
 48 |     tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
 49 |     for task in tasks:
 50 |         task.cancel()
 51 |     
 52 |     # Wait briefly for tasks to cancel, then force exit if needed
 53 |     try:
 54 |         await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=1.0)
 55 |         print("All tasks cancelled successfully")
 56 |     except asyncio.TimeoutError:
 57 |         print("Timeout waiting for tasks to cancel, forcing exit")
 58 |         os._exit(0)
 59 | 
 60 | @server.list_resources()
 61 | async def handle_list_resources() -> list[types.Resource]:
 62 |     """List available resources."""
 63 |     return []
 64 | 
 65 | @server.read_resource()
 66 | async def handle_read_resource(uri: str) -> str:
 67 |     """Read a specific resource."""
 68 |     raise ValueError(f"Unsupported resource: {uri}")
 69 | 
 70 | @server.list_prompts()
 71 | async def handle_list_prompts() -> list[types.Prompt]:
 72 |     """List available prompts."""
 73 |     return []
 74 | 
 75 | @server.get_prompt()
 76 | async def handle_get_prompt(name: str, arguments: dict[str, str] | None) -> types.GetPromptResult:
 77 |     """Get a specific prompt."""
 78 |     raise ValueError(f"Unknown prompt: {name}")
 79 | 
 80 | @server.list_tools()
 81 | async def handle_list_tools() -> list[types.Tool]:
 82 |     """List available tools."""
 83 |     return [
 84 |         types.Tool(
 85 |             name="generate_image",
 86 |             description="Generate an image from a text prompt using FAL AI. For best results with logos and icons, use the format: '[subject], 2D flat design, [optional style details], white background'. Example: 'pine tree logo, 2D flat design, minimal geometric style, white background'",
 87 |             inputSchema={
 88 |                 "type": "object",
 89 |                 "properties": {
 90 |                     "prompt": {
 91 |                         "type": "string",
 92 |                         "description": "Text prompt to generate the image. Recommended format: '[subject], 2D flat design, [optional style details], white background'",
 93 |                         "examples": [
 94 |                             "mountain peak logo, 2D flat design, minimalist geometric shapes, white background",
 95 |                             "coffee cup icon, 2D flat design, simple line art style, white background",
 96 |                             "fox mascot, 2D flat design, modern geometric shapes, white background"
 97 |                         ]
 98 |                     },
 99 |                     "model": {
100 |                         "type": "string",
101 |                         "description": "Model to use for generation",
102 |                         "default": "fal-ai/ideogram/v2",
103 |                         "enum": ["fal-ai/ideogram/v2"]
104 |                     },
105 |                     "aspect_ratio": {
106 |                         "type": "string",
107 |                         "description": "The aspect ratio of the generated image",
108 |                         "default": "1:1",
109 |                         "enum": ["10:16", "16:10", "9:16", "16:9", "4:3", "3:4", "1:1", "1:3", "3:1", "3:2", "2:3"]
110 |                     },
111 |                     "expand_prompt": {
112 |                         "type": "boolean",
113 |                         "description": "Whether to expand the prompt with MagicPrompt functionality",
114 |                         "default": True
115 |                     },
116 |                     "style": {
117 |                         "type": "string",
118 |                         "description": "The style of the generated image",
119 |                         "default": "auto",
120 |                         "enum": ["auto", "general", "realistic", "design", "render_3D", "anime"]
121 |                     },
122 |                     "negative_prompt": {
123 |                         "type": "string",
124 |                         "description": "A negative prompt to avoid in the generated image",
125 |                         "default": ""
126 |                     }
127 |                 },
128 |                 "required": ["prompt"]
129 |             }
130 |         ),
131 |         types.Tool(
132 |             name="remove_background",
133 |             description="Remove background from an image using FAL AI",
134 |             inputSchema={
135 |                 "type": "object",
136 |                 "properties": {
137 |                     "image_url": {
138 |                         "type": "string",
139 |                         "description": "Input image url"
140 |                     },
141 |                     "sync_mode": {
142 |                         "type": "boolean",
143 |                         "description": "If true, wait for the image to be generated and uploaded before returning",
144 |                         "default": True
145 |                     },
146 |                     "crop_to_bbox": {
147 |                         "type": "boolean",
148 |                         "description": "If true, crop the result to a bounding box around the subject",
149 |                         "default": False
150 |                     }
151 |                 },
152 |                 "required": ["image_url"]
153 |             }
154 |         ),
155 |         types.Tool(
156 |             name="download_image",
157 |             description="Download an image from a URL and save it locally",
158 |             inputSchema={
159 |                 "type": "object",
160 |                 "properties": {
161 |                     "image_url": {
162 |                         "type": "string",
163 |                         "description": "URL of the image to download"
164 |                     },
165 |                     "output_dir": {
166 |                         "type": "string",
167 |                         "description": "Directory to save the downloaded image",
168 |                         "default": "downloads"
169 |                     }
170 |                 },
171 |                 "required": ["image_url"]
172 |             }
173 |         ),
174 |         types.Tool(
175 |             name="scale_image",
176 |             description="Scale an image to multiple sizes while preserving transparency",
177 |             inputSchema={
178 |                 "type": "object",
179 |                 "properties": {
180 |                     "input_path": {
181 |                         "type": "string",
182 |                         "description": "Path to the input image to scale"
183 |                     },
184 |                     "sizes": {
185 |                         "type": "array",
186 |                         "items": {
187 |                             "type": "array",
188 |                             "items": {"type": "integer"},
189 |                             "minItems": 2,
190 |                             "maxItems": 2
191 |                         },
192 |                         "description": "List of [width, height] pairs for desired output sizes",
193 |                         "default": [[32, 32], [128, 128]]
194 |                     }
195 |                 },
196 |                 "required": ["input_path"]
197 |             }
198 |         )
199 |     ]
200 | 
201 | class ImageGenToolHandler:
202 |     def validate_prompt(self, prompt: str) -> bool:
203 |         """
204 |         Validate that the prompt is not empty.
205 |         """
206 |         return bool(prompt and prompt.strip())
207 | 
208 |     async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent]:
209 |         prompt = arguments.get("prompt")
210 |         if not prompt or not self.validate_prompt(prompt):
211 |             return [types.TextContent(
212 |                 type="text", 
213 |                 text="Error: Prompt cannot be empty"
214 |             )]
215 |             
216 |         print(f"Generating image with prompt: {prompt}")
217 |         result = await generate_image(
218 |             prompt=prompt,
219 |             model=arguments.get("model", "fal-ai/ideogram/v2"),
220 |             aspect_ratio=arguments.get("aspect_ratio", "1:1"),
221 |             expand_prompt=arguments.get("expand_prompt", True),
222 |             style=arguments.get("style", "auto"),
223 |             negative_prompt=arguments.get("negative_prompt", "")
224 |         )
225 |         print(f"Image generation result: {result}")
226 |         if result.startswith("http"):
227 |             return [types.TextContent(type="text", text=f"Generated image URL: {result}")]
228 |         return [types.TextContent(type="text", text=result)]
229 | 
230 | class BackgroundRemovalToolHandler:
231 |     async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent]:
232 |         print(f"Removing background from image: {arguments.get('image_url')}")
233 |         result = await remove_background(
234 |             arguments.get("image_url"),
235 |             arguments.get("sync_mode", True),
236 |             arguments.get("crop_to_bbox", False)
237 |         )
238 |         print(f"Background removal result: {result}")
239 |         if result.startswith("http"):
240 |             return [types.TextContent(type="text", text=f"Background removed image URL: {result}")]
241 |         return [types.TextContent(type="text", text=result)]
242 | 
243 | class ImageDownloadToolHandler:
244 |     async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent]:
245 |         print(f"Downloading image from: {arguments.get('image_url')}")
246 |         result = await download_image_from_url(
247 |             arguments.get("image_url"),
248 |             arguments.get("output_dir", "downloads")
249 |         )
250 |         print(f"Download result: {result}")
251 |         return [types.TextContent(type="text", text=result)]
252 | 
253 | class ImageScalingToolHandler:
254 |     async def handle(self, name: str, arguments: dict | None) -> list[types.TextContent | types.ImageContent]:
255 |         print(f"Scaling image: {arguments.get('input_path')}")
256 |         result = await scale_image(
257 |             arguments.get("input_path"),
258 |             arguments.get("sizes", [(32, 32), (128, 128)])
259 |         )
260 |         print(f"Scaling result: {result}")
261 |         return [types.TextContent(type="text", text=result)]
262 | 
263 | tool_handlers = {
264 |     "generate_image": ImageGenToolHandler(),
265 |     "remove_background": BackgroundRemovalToolHandler(),
266 |     "download_image": ImageDownloadToolHandler(),
267 |     "scale_image": ImageScalingToolHandler()
268 | }
269 | 
270 | @server.call_tool()
271 | async def handle_call_tool(
272 |     name: str,
273 |     arguments: dict | None
274 | ) -> list[types.TextContent | types.ImageContent]:
275 |     """Handle tool execution requests."""
276 |     if name in tool_handlers:
277 |         return await tool_handlers[name].handle(name, arguments)
278 |     else:
279 |         raise ValueError(f"Unknown tool: {name}")
280 | 
281 | async def handle_sse(request):
282 |     async with sse.connect_sse(
283 |         request.scope, request.receive, request._send
284 |     ) as streams:
285 |         await server.run(
286 |             streams[0],
287 |             streams[1],
288 |             InitializationOptions(
289 |                 server_name="image-gen-server",
290 |                 server_version="0.1.0",
291 |                 capabilities=server.get_capabilities(
292 |                     notification_options=NotificationOptions(),
293 |                     experimental_capabilities={},
294 |                 ),
295 |             ),
296 |         )
297 | 
298 | @click.command()
299 | @click.option("--port", default=7777, help="Port to listen on")
300 | def main(port: int) -> int:
301 |     # Ensure FAL_KEY is set
302 |     fal_key = os.getenv("FAL_KEY")
303 |     if not fal_key:
304 |         print("Warning: FAL_KEY environment variable not found, checking FAL_API_KEY...")
305 |         fal_key = os.getenv("FAL_API_KEY")
306 |         if not fal_key:
307 |             print("Error: Neither FAL_KEY nor FAL_API_KEY environment variables are set")
308 |             sys.exit(1)
309 |         os.environ["FAL_KEY"] = fal_key
310 | 
311 |     print("Starting image generation server...")
312 | 
313 |     # Add routes
314 |     app.add_route("/sse", handle_sse)
315 |     app.mount("/messages", sse.handle_post_message)
316 | 
317 |     # Cool ASCII art log
318 |     print("""
319 |     ===========================================
320 |           🚀 MCP Server is LIVE! 🚀
321 |     ------------------------------------------- 
322 |     |  Status: Running                        |
323 |     |  Transport: SSE                         |
324 |     |  URL: http://127.0.0.1:{}              |
325 |     |  Ready for Cursor MCP client            |
326 |     |  Auto-reload: Enabled                   |
327 |     |  Force exit on Ctrl+C: Enabled          |
328 |     ------------------------------------------- 
329 |     Listening for requests... 🎉
330 |     ===========================================
331 |     """.format(port))
332 | 
333 |     # Configure uvicorn with a short timeout for graceful shutdown
334 |     config = uvicorn.Config(
335 |         app=app,
336 |         host="127.0.0.1",
337 |         port=port,
338 |         reload=True,
339 |         reload_dirs=["mcp_tool_server"],
340 |         workers=1,
341 |         timeout_graceful_shutdown=1  # Only wait 1 second for graceful shutdown
342 |     )
343 |     
344 |     # Run with a custom server instance that has a shorter timeout
345 |     server = uvicorn.Server(config)
346 |     
347 |     try:
348 |         server.run()
349 |     except KeyboardInterrupt:
350 |         print("KeyboardInterrupt received, forcing exit...")
351 |         os._exit(0)
352 |     
353 |     return 0
354 | 
355 | if __name__ == "__main__":
356 |     main()
```
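
To sanity-check the SSE endpoint outside Cursor, the MCP Python client can connect and list the registered tools. A minimal sketch, assuming the server is running on port 7777 and using the `mcp` package's SSE client (mcp>=1.3.0):

```python
# Sketch: connect to the running server over SSE and list its tools.
# Assumes the server is up at http://127.0.0.1:7777 and mcp>=1.3.0 is installed.
import asyncio

from mcp import ClientSession
from mcp.client.sse import sse_client


async def list_tools() -> None:
    async with sse_client("http://127.0.0.1:7777/sse") as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            result = await session.list_tools()
            for tool in result.tools:
                print(tool.name)


if __name__ == "__main__":
    asyncio.run(list_tools())
```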