This is page 1 of 2. Use http://codebase.md/gongrzhe/yolo-mcp-server?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .gitignore
├── LICENSE
├── mcp-config.json
├── Readme.md
├── requirements.txt
├── server_cli.py
├── server_combine_terminal.py
├── server.py
└── setup.py
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv
yolo_service.log
*.pt
runs/
```

--------------------------------------------------------------------------------
/Readme.md:
--------------------------------------------------------------------------------

```markdown
  1 | # YOLO MCP Service
  2 | 
  3 | A powerful YOLO (You Only Look Once) computer vision service that integrates with Claude AI through the Model Context Protocol (MCP). This service enables Claude to perform object detection, segmentation, classification, and real-time camera analysis using state-of-the-art YOLO models.
  4 | 
  5 | ![](https://badge.mcpx.dev?type=server 'MCP Server')
  6 | 
  7 | 
  8 | ## Features
  9 | 
 10 | - Object detection, segmentation, classification, and pose estimation
 11 | - Real-time camera integration for live object detection
 12 | - Support for model training, validation, and export
 13 | - Comprehensive image analysis combining multiple models
 14 | - Support for both file paths and base64-encoded images
 15 | - Seamless integration with Claude AI
 16 | 
 17 | ## Setup Instructions
 18 | 
 19 | ### Prerequisites
 20 | 
 21 | - Python 3.10 or higher
 22 | - Git (optional, for cloning the repository)
 23 | 
 24 | ### Environment Setup
 25 | 
 26 | 1. Create a directory for the project and navigate to it:
 27 |    ```bash
 28 |    mkdir yolo-mcp-service
 29 |    cd yolo-mcp-service
 30 |    ```
 31 | 
 32 | 2. Download the project files or clone the repository:
 33 |    ```bash
 34 |    # If you have the files, copy them to this directory
 35 |    # If using git:
 36 |    git clone https://github.com/GongRzhe/YOLO-MCP-Server.git .
 37 |    ```
 38 | 
 39 | 3. Create a virtual environment:
 40 |    ```bash
 41 |    # On Windows
 42 |    python -m venv .venv
 43 |    
 44 |    # On macOS/Linux
 45 |    python3 -m venv .venv
 46 |    ```
 47 | 
 48 | 4. Activate the virtual environment:
 49 |    ```bash
 50 |    # On Windows
 51 |    .venv\Scripts\activate
 52 |    
 53 |    # On macOS/Linux
 54 |    source .venv/bin/activate
 55 |    ```
 56 | 
 57 | 5. Run the setup script:
 58 |    ```bash
 59 |    python setup.py
 60 |    ```
 61 |    
 62 |    The setup script will:
 63 |    - Check your Python version
 64 |    - Create a virtual environment (if not already created)
 65 |    - Install required dependencies
 66 |    - Generate an MCP configuration file (mcp-config.json)
 67 |    - Output configuration information for different MCP clients including Claude
 68 | 
 69 | 6. Note the output from the setup script, which will look similar to:
 70 |    ```
 71 |    MCP configuration has been written to: /path/to/mcp-config.json
 72 |    
 73 |    MCP configuration for Cursor:
 74 |    
 75 |    /path/to/.venv/bin/python /path/to/server.py
 76 |    
 77 |    MCP configuration for Windsurf/Claude Desktop:
 78 |    {
 79 |      "mcpServers": {
 80 |        "yolo-service": {
 81 |          "command": "/path/to/.venv/bin/python",
 82 |          "args": [
 83 |            "/path/to/server.py"
 84 |          ],
 85 |          "env": {
 86 |            "PYTHONPATH": "/path/to"
 87 |          }
 88 |        }
 89 |      }
 90 |    }
 91 |    
 92 |    To use with Claude Desktop, merge this configuration into: /path/to/claude_desktop_config.json
 93 |    ```
 94 | 
 95 | ### Downloading YOLO Models
 96 | 
 97 | Before using the service, you need to download the YOLO models. The service looks for models in the following directories:
 98 | - The current directory where the service is running
 99 | - A `models` subdirectory
100 | - Any other directory configured in the `CONFIG["model_dirs"]` variable in server.py
101 | 
102 | Create a models directory and download some common models:
103 | 
104 | ```bash
105 | # Create models directory
106 | mkdir models
107 | 
108 | # Download YOLOv8n for basic object detection
109 | curl -L https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt -o models/yolov8n.pt
110 | 
111 | # Download YOLOv8n-seg for segmentation
112 | curl -L https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-seg.pt -o models/yolov8n-seg.pt
113 | 
114 | # Download YOLOv8n-cls for classification
115 | curl -L https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-cls.pt -o models/yolov8n-cls.pt
116 | 
117 | # Download YOLOv8n-pose for pose estimation
118 | curl -L https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-pose.pt -o models/yolov8n-pose.pt
119 | ```
120 | 
121 | For Windows PowerShell users:
122 | ```powershell
123 | # Create models directory
124 | mkdir models
125 | 
126 | # Download models using Invoke-WebRequest
127 | Invoke-WebRequest -Uri "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt" -OutFile "models/yolov8n.pt"
128 | Invoke-WebRequest -Uri "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-seg.pt" -OutFile "models/yolov8n-seg.pt"
129 | Invoke-WebRequest -Uri "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-cls.pt" -OutFile "models/yolov8n-cls.pt"
130 | Invoke-WebRequest -Uri "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-pose.pt" -OutFile "models/yolov8n-pose.pt"
131 | ```
132 | 
133 | ### Configuring Claude
134 | 
135 | To use this service with Claude:
136 | 
137 | 1. For Claude web: Set up the service on your local machine and use the configuration provided by the setup script in your MCP client.
138 | 
139 | 2. For Claude Desktop:
140 |    - Run the setup script and note the configuration output
141 |    - Locate your Claude Desktop configuration file (the path is provided in the setup script output)
142 |    - Add or merge the configuration into your Claude Desktop configuration file
143 |    - Restart Claude Desktop
144 | 
145 | ## Using YOLO Tools in Claude
146 | 
147 | ### 1. First Check Available Models
148 | 
149 | Always check which models are available on your system first:
150 | 
151 | ```
152 | I'd like to use the YOLO tools. Can you first check which models are available on my system?
153 | 
154 | <function_calls>
155 | <invoke name="list_available_models">
156 | </invoke>
157 | </function_calls>
158 | ```
159 | 
160 | ### 2. Detecting Objects in an Image
161 | 
162 | For analyzing an image file on your computer:
163 | 
164 | ```
165 | Can you analyze this image file for objects?
166 | 
167 | <function_calls>
168 | <invoke name="analyze_image_from_path">
169 | <parameter name="image_path">/path/to/your/image.jpg</parameter>
170 | <parameter name="confidence">0.3</parameter>
171 | </invoke>
172 | </function_calls>
173 | ```
174 | 
175 | You can also specify a different model:
176 | 
177 | ```
178 | Can you analyze this image using a different model?
179 | 
180 | <function_calls>
181 | <invoke name="analyze_image_from_path">
182 | <parameter name="image_path">/path/to/your/image.jpg</parameter>
183 | <parameter name="model_name">yolov8n.pt</parameter>
184 | <parameter name="confidence">0.4</parameter>
185 | </invoke>
186 | </function_calls>
187 | ```
188 | 
189 | ### 3. Running Comprehensive Image Analysis
190 | 
191 | For more detailed analysis that combines object detection, classification, and more:
192 | 
193 | ```
194 | Can you perform a comprehensive analysis on this image?
195 | 
196 | <function_calls>
197 | <invoke name="comprehensive_image_analysis">
198 | <parameter name="image_path">/path/to/your/image.jpg</parameter>
199 | <parameter name="confidence">0.3</parameter>
200 | </invoke>
201 | </function_calls>
202 | ```
203 | 
204 | ### 4. Image Segmentation
205 | 
206 | For identifying object boundaries and creating segmentation masks:
207 | 
208 | ```
209 | Can you perform image segmentation on this photo?
210 | 
211 | <function_calls>
212 | <invoke name="segment_objects">
213 | <parameter name="image_data">/path/to/your/image.jpg</parameter>
214 | <parameter name="is_path">true</parameter>
215 | <parameter name="model_name">yolov8n-seg.pt</parameter>
216 | </invoke>
217 | </function_calls>
218 | ```
219 | 
220 | ### 5. Image Classification
221 | 
222 | For classifying the entire image content:
223 | 
224 | ```
225 | What does this image show? Can you classify it?
226 | 
227 | <function_calls>
228 | <invoke name="classify_image">
229 | <parameter name="image_data">/path/to/your/image.jpg</parameter>
230 | <parameter name="is_path">true</parameter>
231 | <parameter name="model_name">yolov8n-cls.pt</parameter>
232 | <parameter name="top_k">5</parameter>
233 | </invoke>
234 | </function_calls>
235 | ```
236 | 
237 | ### 6. Using Your Computer's Camera
238 | 
239 | Start real-time object detection using your computer's camera:
240 | 
241 | ```
242 | Can you turn on my camera and detect objects in real-time?
243 | 
244 | <function_calls>
245 | <invoke name="start_camera_detection">
246 | <parameter name="model_name">yolov8n.pt</parameter>
247 | <parameter name="confidence">0.3</parameter>
248 | </invoke>
249 | </function_calls>
250 | ```
251 | 
252 | Get the latest camera detections:
253 | 
254 | ```
255 | What are you seeing through my camera right now?
256 | 
257 | <function_calls>
258 | <invoke name="get_camera_detections">
259 | </invoke>
260 | </function_calls>
261 | ```
262 | 
263 | Stop the camera when finished:
264 | 
265 | ```
266 | Please turn off the camera.
267 | 
268 | <function_calls>
269 | <invoke name="stop_camera_detection">
270 | </invoke>
271 | </function_calls>
272 | ```
273 | 
274 | ### 7. Advanced Model Operations
275 | 
276 | #### Training a Custom Model
277 | 
278 | ```
279 | I want to train a custom object detection model on my dataset.
280 | 
281 | <function_calls>
282 | <invoke name="train_model">
283 | <parameter name="dataset_path">/path/to/your/dataset</parameter>
284 | <parameter name="model_name">yolov8n.pt</parameter>
285 | <parameter name="epochs">50</parameter>
286 | </invoke>
287 | </function_calls>
288 | ```
289 | 
290 | #### Validating a Model
291 | 
292 | ```
293 | Can you validate the performance of my model on a test dataset?
294 | 
295 | <function_calls>
296 | <invoke name="validate_model">
297 | <parameter name="model_path">/path/to/your/trained/model.pt</parameter>
298 | <parameter name="data_path">/path/to/validation/dataset</parameter>
299 | </invoke>
300 | </function_calls>
301 | ```
302 | 
303 | #### Exporting a Model to Different Formats
304 | 
305 | ```
306 | I need to export my YOLO model to ONNX format.
307 | 
308 | <function_calls>
309 | <invoke name="export_model">
310 | <parameter name="model_path">/path/to/your/model.pt</parameter>
311 | <parameter name="format">onnx</parameter>
312 | </invoke>
313 | </function_calls>
314 | ```
315 | 
316 | ### 8. Testing Connection
317 | 
318 | Check if the YOLO service is running correctly:
319 | 
320 | ```
321 | Is the YOLO service running correctly?
322 | 
323 | <function_calls>
324 | <invoke name="test_connection">
325 | </invoke>
326 | </function_calls>
327 | ```
328 | 
329 | ## Troubleshooting
330 | 
331 | ### Camera Issues
332 | 
333 | If the camera doesn't work, try different camera IDs:
334 | 
335 | ```
336 | <function_calls>
337 | <invoke name="start_camera_detection">
338 | <parameter name="camera_id">1</parameter>  <!-- Try 0, 1, or 2 -->
339 | </invoke>
340 | </function_calls>
341 | ```
342 | 
343 | ### Model Not Found
344 | 
345 | If a model is not found, make sure you've downloaded it to one of the configured directories:
346 | 
347 | ```
348 | <function_calls>
349 | <invoke name="get_model_directories">
350 | </invoke>
351 | </function_calls>
352 | ```
353 | 
354 | ### Performance Issues
355 | 
356 | For better performance with limited resources, use the smaller models (e.g., yolov8n.pt instead of yolov8x.pt).
357 | 
```
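
The README's "Downloading YOLO Models" and "Model Not Found" sections describe a lookup across several directories. The following is a minimal sketch of that lookup, not the server's actual code. The helper names `find_model` and `list_models` are hypothetical, and the `model_dirs` argument stands in for `CONFIG["model_dirs"]`.

```python
import os
from typing import List, Optional


def find_model(model_name: str, model_dirs: List[str]) -> Optional[str]:
    """Return the first existing path for model_name, or None if it is not found.

    Hypothetical helper: model_dirs stands in for CONFIG["model_dirs"].
    """
    for directory in model_dirs:
        candidate = os.path.join(directory, model_name)
        if os.path.isfile(candidate):
            return candidate
    return None


def list_models(model_dirs: List[str]) -> List[str]:
    """Collect the sorted, de-duplicated names of all .pt files in the given directories."""
    found = set()
    for directory in model_dirs:
        if os.path.isdir(directory):
            found.update(name for name in os.listdir(directory) if name.endswith(".pt"))
    return sorted(found)
```

Directories are checked in order, so a model in the current directory would shadow a file of the same name in `models/`.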

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
mcp
ultralytics
opencv-python
numpy
pillow
```
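
A quick way to confirm that these packages are installed in the active environment is to look up their import names. This is a sketch under assumptions: the mapping below is written by hand because a distribution name can differ from its module name (`opencv-python` is imported as `cv2`, `pillow` as `PIL`), and `missing_packages` is a hypothetical helper that is not part of the repository.

```python
import importlib.util
from typing import Dict, List

# Distribution name (as in requirements.txt) -> importable top-level module name.
REQUIRED_MODULES: Dict[str, str] = {
    "mcp": "mcp",
    "ultralytics": "ultralytics",
    "opencv-python": "cv2",
    "numpy": "numpy",
    "pillow": "PIL",
}


def missing_packages(required: Dict[str, str]) -> List[str]:
    """Return the distribution names whose module cannot be found in this environment."""
    return [
        dist for dist, module in required.items()
        if importlib.util.find_spec(module) is None
    ]


if __name__ == "__main__":
    missing = missing_packages(REQUIRED_MODULES)
    print("Missing packages:", ", ".join(missing) if missing else "none")
```

Running it with the virtual environment's interpreter checks the same environment that the MCP client will launch.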

--------------------------------------------------------------------------------
/mcp-config.json:
--------------------------------------------------------------------------------

```json
{
  "mcpServers": {
    "yolo-service": {
      "command": "D:\\BackDataService\\YOLO-MCP-Server\\.venv\\Scripts\\python.exe",
      "args": [
        "D:\\BackDataService\\YOLO-MCP-Server\\server.py"
      ],
      "env": {
        "PYTHONPATH": "D:\\BackDataService\\YOLO-MCP-Server"
      }
    }
  }
}
```
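
The file above holds absolute paths from one machine; `setup.py` regenerates it for the local checkout. The README asks you to merge the generated configuration into the Claude Desktop configuration file. One possible merge is sketched below, assuming both files are JSON objects with an `mcpServers` key. `merge_mcp_servers` is a hypothetical helper, and the `other-service` entry is invented for illustration.

```python
import copy
import json
from typing import Any, Dict


def merge_mcp_servers(existing: Dict[str, Any], generated: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of existing with generated["mcpServers"] entries added or overwritten."""
    merged = copy.deepcopy(existing)
    servers = merged.setdefault("mcpServers", {})
    servers.update(copy.deepcopy(generated.get("mcpServers", {})))
    return merged


if __name__ == "__main__":
    # Illustrative inputs only; real paths come from the setup script output.
    existing = {"mcpServers": {"other-service": {"command": "node", "args": ["other.js"]}}}
    generated = {
        "mcpServers": {
            "yolo-service": {
                "command": "/path/to/.venv/bin/python",
                "args": ["/path/to/server.py"],
                "env": {"PYTHONPATH": "/path/to"},
            }
        }
    }
    print(json.dumps(merge_mcp_servers(existing, generated), indent=2))
```

Entries for other servers are preserved, and an existing `yolo-service` entry is replaced by the newly generated one.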

--------------------------------------------------------------------------------
/setup.py:
--------------------------------------------------------------------------------

```python
# Import necessary Python standard libraries
import os          # For working with the file system and building file and directory paths
import json        # For reading and writing JSON data
import subprocess  # For creating and managing subprocesses
import sys         # For accessing interpreter-related variables and functions
import platform    # For detecting the current operating system

def setup_venv():
    """
    Set up the Python virtual environment.

    Features:
    - Checks that the Python version meets the requirement (3.10+)
    - Creates the virtual environment if it doesn't exist
    - Installs the required dependencies into the virtual environment

    No parameters required

    Returns: Path to the Python interpreter in the virtual environment
    """
    # Check Python version
    python_version = sys.version_info
    if python_version.major < 3 or (python_version.major == 3 and python_version.minor < 10):
        print("Error: Python 3.10 or higher is required.")
        sys.exit(1)

    # Get absolute path of the directory containing the current script
    base_path = os.path.abspath(os.path.dirname(__file__))
    # Virtual environment directory: a directory named '.venv' under base_path
    venv_path = os.path.join(base_path, '.venv')
    # Flag whether a new virtual environment was created
    venv_created = False

    # Check if virtual environment already exists
    if not os.path.exists(venv_path):
        print("Creating virtual environment...")
        # Use Python's venv module to create the virtual environment
        # sys.executable is the path of the current Python interpreter
        subprocess.run([sys.executable, '-m', 'venv', venv_path], check=True)
        print("Virtual environment created successfully!")
        venv_created = True
    else:
        print("Virtual environment already exists.")

    # Determine the python executable path based on the operating system
    is_windows = platform.system() == "Windows"
    if is_windows:
        python_path = os.path.join(venv_path, 'Scripts', 'python.exe')
    else:
        python_path = os.path.join(venv_path, 'bin', 'python')

    # Install or update dependencies
    print("\nInstalling requirements...")

    # Create requirements.txt with the packages needed by the YOLO MCP server
    requirements = [
        "mcp",           # Model Context Protocol for server
        "ultralytics",   # YOLO models
        "opencv-python", # For camera operations
        "numpy",         # Numerical operations
        "pillow"         # Image processing
    ]

    requirements_path = os.path.join(base_path, 'requirements.txt')
    with open(requirements_path, 'w') as f:
        f.write('\n'.join(requirements))

    # Update pip through the venv's python executable (more reliable than calling pip directly)
    try:
        subprocess.run([python_path, '-m', 'pip', 'install', '--upgrade', 'pip'], check=True)
        print("Pip upgraded successfully.")
    except subprocess.CalledProcessError:
        print("Warning: Pip upgrade failed, continuing with existing version.")

    # Install requirements, again through the venv's python executable
    subprocess.run([python_path, '-m', 'pip', 'install', '-r', requirements_path], check=True)

    print("Requirements installed successfully!")

    return python_path

def generate_mcp_config(python_path):
    """
    Generate the MCP (Model Context Protocol) configuration file for the YOLO service.

    Features:
    - Creates a configuration containing the Python interpreter path and server script path
    - Saves the configuration as a JSON file
    - Prints configuration information for different MCP clients

    Parameters:
    - python_path: Path to the Python interpreter in the virtual environment

    Returns: None
    """
    # Get absolute path of the directory containing the current script
    base_path = os.path.abspath(os.path.dirname(__file__))

    # Path to YOLO MCP server script
    server_script_path = os.path.join(base_path, 'server.py')

    # Create MCP configuration dictionary
    config = {
        "mcpServers": {
            "yolo-service": {
                "command": python_path,
                "args": [server_script_path],
                "env": {
                    "PYTHONPATH": base_path
                }
            }
        }
    }

    # Save configuration to JSON file
    config_path = os.path.join(base_path, 'mcp-config.json')
    with open(config_path, 'w') as f:
        json.dump(config, f, indent=2)  # indent=2 keeps the JSON file readable

    # Print configuration information
    print(f"\nMCP configuration has been written to: {config_path}")
    print(f"\nMCP configuration for Cursor:\n\n{python_path} {server_script_path}")
    print("\nMCP configuration for Windsurf/Claude Desktop:")
    print(json.dumps(config, indent=2))

    # Show where the Claude Desktop configuration file is expected to live
    if platform.system() == "Windows":
        claude_config_path = os.path.expandvars("%APPDATA%\\Claude\\claude_desktop_config.json")
    else:  # macOS location; also printed on other non-Windows systems, where it may not apply
        claude_config_path = os.path.expanduser("~/Library/Application Support/Claude/claude_desktop_config.json")

    print(f"\nTo use with Claude Desktop, merge this configuration into: {claude_config_path}")

# Code executed when the script is run directly (not imported)
if __name__ == '__main__':
    # Execute main functions in sequence:
    # 1. Set up virtual environment and install dependencies
    python_path = setup_venv()
    # 2. Generate MCP configuration file
    generate_mcp_config(python_path)

    print("\nSetup complete! You can now use the YOLO MCP server with compatible clients.")
```
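
Because the generated configuration stores absolute paths, it goes stale when the project directory moves. A small check like the one below can flag that before an MCP client fails to launch the server. It is a sketch, not part of the repository: `check_mcp_config` is a hypothetical helper, and it assumes the configuration has the `mcpServers` layout that `generate_mcp_config` writes.

```python
import json
import os
from typing import Any, Dict, List


def check_mcp_config(config: Dict[str, Any]) -> List[str]:
    """Return human-readable problems found in an mcpServers config (empty list if none)."""
    problems = []
    servers = config.get("mcpServers", {})
    if not servers:
        problems.append("no entries under 'mcpServers'")
    for name, entry in servers.items():
        command = entry.get("command", "")
        if not os.path.isfile(command):
            problems.append(f"{name}: interpreter not found: {command}")
        for arg in entry.get("args", []):
            # Only script arguments are checked; other arguments are left alone.
            if arg.endswith(".py") and not os.path.isfile(arg):
                problems.append(f"{name}: script not found: {arg}")
    return problems


if __name__ == "__main__":
    config_path = "mcp-config.json"
    if os.path.isfile(config_path):
        with open(config_path) as f:
            for problem in check_mcp_config(json.load(f)):
                print(problem)
    else:
        print(f"{config_path} not found; run setup.py first")
```

If the check reports a missing interpreter or script, rerunning `python setup.py` rewrites `mcp-config.json` with the current paths.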

--------------------------------------------------------------------------------
/server_combine_terminal.py:
--------------------------------------------------------------------------------

```python
   1 | # server_combine_terminal.py - CLI version (returns the command string without executing it)
   2 | import fnmatch
   3 | import os
   4 | import base64
   5 | import time
   6 | import threading
   7 | import json
   8 | import tempfile
   9 | import platform
  10 | from io import BytesIO
  11 | from typing import List, Dict, Any, Optional, Union
  12 | import numpy as np
  13 | from PIL import Image
  14 | 
  15 | from mcp.server.fastmcp import FastMCP
  16 | 
  17 | # Set up logging configuration
  18 | import os.path
  19 | import sys
  20 | import logging
  21 | import contextlib
  22 | import signal
  23 | import atexit
  24 | 
  25 | logging.basicConfig(
  26 |     level=logging.INFO,
  27 |     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  28 |     handlers=[
  29 |         logging.FileHandler("yolo_service.log"),
  30 |         logging.StreamHandler(sys.stderr)
  31 |     ]
  32 | )
  33 | camera_startup_status = None  # Will store error details if startup fails
  34 | camera_last_error = None
  35 | logger = logging.getLogger('yolo_service')
  36 | 
  37 | # Global variables for camera control
  38 | camera_running = False
  39 | camera_thread = None
  40 | detection_results = []
  41 | camera_last_access_time = 0
  42 | CAMERA_INACTIVITY_TIMEOUT = 60  # Auto-shutdown after 60 seconds of inactivity
  43 | 
  44 | def load_image(image_source, is_path=False):
  45 |     """
  46 |     Load image from file path or base64 data
  47 |     
  48 |     Args:
  49 |         image_source: File path or base64 encoded image data
  50 |         is_path: Whether image_source is a file path
  51 |         
  52 |     Returns:
  53 |         PIL Image object
  54 |     """
  55 |     try:
  56 |         if is_path:
  57 |             # Load image from file path
  58 |             if os.path.exists(image_source):
  59 |                 return Image.open(image_source)
  60 |             else:
  61 |                 raise FileNotFoundError(f"Image file not found: {image_source}")
  62 |         else:
  63 |             # Load image from base64 data
  64 |             image_bytes = base64.b64decode(image_source)
  65 |             return Image.open(BytesIO(image_bytes))
  66 |     except Exception as e:
  67 |         raise ValueError(f"Failed to load image: {str(e)}")
  68 | 
  69 | # Modified function to just return the command string
  70 | def run_yolo_cli(command_args, capture_output=True, timeout=60):
  71 |     """
  72 |     Return the YOLO CLI command string without executing it
  73 |     
  74 |     Args:
  75 |         command_args: List of command arguments to pass to yolo CLI
  76 |         capture_output: Not used, kept for compatibility with original function
  77 |         timeout: Not used, kept for compatibility with original function
  78 |         
  79 |     Returns:
  80 |         Dictionary containing the command string
  81 |     """
  82 |     # Build the complete command
  83 |     cmd = ["yolo"] + command_args
  84 |     cmd_str = " ".join(cmd)
  85 |     
  86 |     # Log the command
  87 |     logger.info(f"Would run YOLO CLI command: {cmd_str}")
  88 |     
  89 |     # Return the command string in a similar structure as the original function
  90 |     return {
  91 |         "success": True,
  92 |         "command": cmd_str,
  93 |         "would_execute": True,
  94 |         "note": "CLI execution disabled, showing command only"
  95 |     }
  96 | 
  97 | # Create MCP server
  98 | mcp = FastMCP("YOLO_Service")
  99 | 
 100 | # Global configuration
 101 | CONFIG = {
 102 |     "model_dirs": [
 103 |         ".",  # Current directory
 104 |         "./models",  # Models subdirectory
 105 |         os.path.join(os.path.dirname(os.path.abspath(__file__)), "models"),
 106 |     ]
 107 | }
 108 | 
 109 | # Function to save base64 data to temp file
 110 | def save_base64_to_temp(base64_data, prefix="image", suffix=".jpg"):
 111 |     """Save base64 encoded data to a temporary file and return the path"""
 112 |     try:
 113 |         # Create a temporary file
 114 |         fd, temp_path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
 115 |         
 116 |         # Decode base64 data
 117 |         image_data = base64.b64decode(base64_data)
 118 |         
 119 |         # Write data to file
 120 |         with os.fdopen(fd, 'wb') as temp_file:
 121 |             temp_file.write(image_data)
 122 |             
 123 |         return temp_path
 124 |     except Exception as e:
 125 |         logger.error(f"Error saving base64 to temp file: {str(e)}")
 126 |         raise ValueError(f"Failed to save base64 data: {str(e)}")
 127 | 
 128 | @mcp.tool()
 129 | def get_model_directories() -> Dict[str, Any]:
 130 |     """Get information about configured model directories and available models"""
 131 |     directories = []
 132 |     
 133 |     for directory in CONFIG["model_dirs"]:
 134 |         dir_info = {
 135 |             "path": directory,
 136 |             "exists": os.path.exists(directory),
 137 |             "is_directory": os.path.isdir(directory) if os.path.exists(directory) else False,
 138 |             "models": []
 139 |         }
 140 |         
 141 |         if dir_info["exists"] and dir_info["is_directory"]:
 142 |             for filename in os.listdir(directory):
 143 |                 if filename.endswith(".pt"):
 144 |                     dir_info["models"].append(filename)
 145 |         
 146 |         directories.append(dir_info)
 147 |     
 148 |     return {
 149 |         "configured_directories": CONFIG["model_dirs"],
 150 |         "directory_details": directories,
 151 |         "available_models": list_available_models(),
 152 |         "loaded_models": []  # No longer track loaded models with CLI approach
 153 |     }
 154 | 
 155 | @mcp.tool()
 156 | def detect_objects(
 157 |     image_data: str,
 158 |     model_name: str = "yolov8n.pt",
 159 |     confidence: float = 0.25,
 160 |     save_results: bool = False,
 161 |     is_path: bool = False
 162 | ) -> Dict[str, Any]:
 163 |     """
 164 |     Return the YOLO CLI command for object detection without executing it
 165 |     
 166 |     Args:
 167 |         image_data: Base64 encoded image or file path (if is_path=True)
 168 |         model_name: YOLO model name
 169 |         confidence: Detection confidence threshold
 170 |         save_results: Whether to save results to disk
 171 |         is_path: Whether image_data is a file path
 172 |         
 173 |     Returns:
 174 |         Dictionary containing command that would be executed
 175 |     """
 176 |     try:
 177 |         # Determine source path
 178 |         if is_path:
 179 |             source_path = image_data
 180 |             if not os.path.exists(source_path):
 181 |                 return {
 182 |                     "error": f"Image file not found: {source_path}",
 183 |                     "source": source_path
 184 |                 }
 185 |         else:
 186 |             # For base64, we would save to temp file, but we'll just indicate this
 187 |             source_path = "[temp_file_from_base64]"
 188 |         
 189 |         # Determine full model path
 190 |         model_path = None
 191 |         for directory in CONFIG["model_dirs"]:
 192 |             potential_path = os.path.join(directory, model_name)
 193 |             if os.path.exists(potential_path):
 194 |                 model_path = potential_path
 195 |                 break
 196 |         
 197 |         if model_path is None:
 198 |             available = list_available_models()
 199 |             available_str = ", ".join(available) if available else "none"
 200 |             return {
 201 |                 "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
 202 |                 "source": image_data if is_path else "base64_image"
 203 |             }
 204 |         
 205 |         # Setup output directory for save_results
 206 |         output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
 207 |         
 208 |         # Build YOLO CLI command
 209 |         cmd_args = [
 210 |             "detect",  # Task
 211 |             "predict",  # Mode
 212 |             f"model={model_path}",
 213 |             f"source={source_path}",
 214 |             f"conf={confidence}",
 215 |             "format=json",  # Request JSON output for parsing
 216 |         ]
 217 |         
 218 |         if save_results:
 219 |             cmd_args.append(f"project={output_dir}")
 220 |             cmd_args.append("save=True")
 221 |         else:
 222 |             cmd_args.append("save=False")
 223 |         
 224 |         # Get command string without executing
 225 |         result = run_yolo_cli(cmd_args)
 226 |         
 227 |         # Return command information
 228 |         return {
 229 |             "status": "command_generated",
 230 |             "model_used": model_name,
 231 |             "model_path": model_path,
 232 |             "source": source_path,
 233 |             "command": result["command"],
 234 |             "note": "Command generated but not executed - detection results would be returned from actual execution",
 235 |             "parameters": {
 236 |                 "confidence": confidence,
 237 |                 "save_results": save_results,
 238 |                 "is_path": is_path,
 239 |                 "output_dir": output_dir if save_results else None
 240 |             }
 241 |         }
 242 |             
 243 |     except Exception as e:
 244 |         logger.error(f"Error in detect_objects command generation: {str(e)}")
 245 |         return {
 246 |             "error": f"Failed to generate detection command: {str(e)}",
 247 |             "source": image_data if is_path else "base64_image"
 248 |         }
 249 | 
 250 | @mcp.tool()
 251 | def segment_objects(
 252 |     image_data: str,
 253 |     model_name: str = "yolo11n-seg.pt",
 254 |     confidence: float = 0.25,
 255 |     save_results: bool = False,
 256 |     is_path: bool = False
 257 | ) -> Dict[str, Any]:
 258 |     """
 259 |     Return the YOLO CLI command for segmentation without executing it
 260 |     
 261 |     Args:
 262 |         image_data: Base64 encoded image or file path (if is_path=True)
 263 |         model_name: YOLO segmentation model name
 264 |         confidence: Detection confidence threshold
 265 |         save_results: Whether to save results to disk
 266 |         is_path: Whether image_data is a file path
 267 |         
 268 |     Returns:
 269 |         Dictionary containing command that would be executed
 270 |     """
 271 |     try:
 272 |         # Determine source path
 273 |         if is_path:
 274 |             source_path = image_data
 275 |             if not os.path.exists(source_path):
 276 |                 return {
 277 |                     "error": f"Image file not found: {source_path}",
 278 |                     "source": source_path
 279 |                 }
 280 |         else:
 281 |             # For base64, we would save to temp file, but we'll just indicate this
 282 |             source_path = "[temp_file_from_base64]"
 283 |         
 284 |         # Determine full model path
 285 |         model_path = None
 286 |         for directory in CONFIG["model_dirs"]:
 287 |             potential_path = os.path.join(directory, model_name)
 288 |             if os.path.exists(potential_path):
 289 |                 model_path = potential_path
 290 |                 break
 291 |         
 292 |         if model_path is None:
 293 |             available = list_available_models()
 294 |             available_str = ", ".join(available) if available else "none"
 295 |             return {
 296 |                 "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
 297 |                 "source": image_data if is_path else "base64_image"
 298 |             }
 299 |         
 300 |         # Setup output directory for save_results
 301 |         output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
 302 |         
 303 |         # Build YOLO CLI command
 304 |         cmd_args = [
 305 |             "segment",  # Task
 306 |             "predict",  # Mode
 307 |             f"model={model_path}",
 308 |             f"source={source_path}",
 309 |             f"conf={confidence}",
 310 |             "format=json",  # Intended to request JSON output; Ultralytics documents format= as an export option
 311 |         ]
 312 |         
 313 |         if save_results:
 314 |             cmd_args.append(f"project={output_dir}")
 315 |             cmd_args.append("save=True")
 316 |         else:
 317 |             cmd_args.append("save=False")
 318 |         
 319 |         # Get command string without executing
 320 |         result = run_yolo_cli(cmd_args)
 321 |         
 322 |         # Return command information
 323 |         return {
 324 |             "status": "command_generated",
 325 |             "model_used": model_name,
 326 |             "model_path": model_path,
 327 |             "source": source_path,
 328 |             "command": result["command"],
 329 |             "note": "Command generated but not executed - segmentation results would be returned from actual execution",
 330 |             "parameters": {
 331 |                 "confidence": confidence,
 332 |                 "save_results": save_results,
 333 |                 "is_path": is_path,
 334 |                 "output_dir": output_dir if save_results else None
 335 |             }
 336 |         }
 337 |             
 338 |     except Exception as e:
 339 |         logger.error(f"Error in segment_objects command generation: {str(e)}")
 340 |         return {
 341 |             "error": f"Failed to generate segmentation command: {str(e)}",
 342 |             "source": image_data if is_path else "base64_image"
 343 |         }
 344 | 
 345 | @mcp.tool()
 346 | def classify_image(
 347 |     image_data: str,
 348 |     model_name: str = "yolov11n-cls.pt",
 349 |     top_k: int = 5,
 350 |     save_results: bool = False,
 351 |     is_path: bool = False
 352 | ) -> Dict[str, Any]:
 353 |     """
 354 |     Return the YOLO CLI command for image classification without executing it
 355 |     
 356 |     Args:
 357 |         image_data: Base64 encoded image or file path (if is_path=True)
 358 |         model_name: YOLO classification model name
 359 |         top_k: Number of top categories to return
 360 |         save_results: Whether to save results to disk
 361 |         is_path: Whether image_data is a file path
 362 |         
 363 |     Returns:
 364 |         Dictionary containing command that would be executed
 365 |     """
 366 |     try:
 367 |         # Determine source path
 368 |         if is_path:
 369 |             source_path = image_data
 370 |             if not os.path.exists(source_path):
 371 |                 return {
 372 |                     "error": f"Image file not found: {source_path}",
 373 |                     "source": source_path
 374 |                 }
 375 |         else:
 376 |             # For base64, we would save to temp file, but we'll just indicate this
 377 |             source_path = "[temp_file_from_base64]"
 378 |         
 379 |         # Determine full model path
 380 |         model_path = None
 381 |         for directory in CONFIG["model_dirs"]:
 382 |             potential_path = os.path.join(directory, model_name)
 383 |             if os.path.exists(potential_path):
 384 |                 model_path = potential_path
 385 |                 break
 386 |         
 387 |         if model_path is None:
 388 |             available = list_available_models()
 389 |             available_str = ", ".join(available) if available else "none"
 390 |             return {
 391 |                 "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
 392 |                 "source": image_data if is_path else "base64_image"
 393 |             }
 394 |         
 395 |         # Setup output directory for save_results
 396 |         output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
 397 |         
 398 |         # Build YOLO CLI command
 399 |         cmd_args = [
 400 |             "classify",  # Task
 401 |             "predict",  # Mode
 402 |             f"model={model_path}",
 403 |             f"source={source_path}",
 404 |             "format=json",  # Intended to request JSON output; Ultralytics documents format= as an export option
 405 |         ]
 406 |         
 407 |         if save_results:
 408 |             cmd_args.append(f"project={output_dir}")
 409 |             cmd_args.append("save=True")
 410 |         else:
 411 |             cmd_args.append("save=False")
 412 |         
 413 |         # Get command string without executing
 414 |         result = run_yolo_cli(cmd_args)
 415 |         
 416 |         # Return command information
 417 |         return {
 418 |             "status": "command_generated",
 419 |             "model_used": model_name,
 420 |             "model_path": model_path,
 421 |             "source": source_path,
 422 |             "command": result["command"],
 423 |             "note": "Command generated but not executed - classification results would be returned from actual execution",
 424 |             "parameters": {
 425 |                 "top_k": top_k,
 426 |                 "save_results": save_results,
 427 |                 "is_path": is_path,
 428 |                 "output_dir": output_dir if save_results else None
 429 |             }
 430 |         }
 431 |             
 432 |     except Exception as e:
 433 |         logger.error(f"Error in classify_image command generation: {str(e)}")
 434 |         return {
 435 |             "error": f"Failed to generate classification command: {str(e)}",
 436 |             "source": image_data if is_path else "base64_image"
 437 |         }
 438 | 
 439 | @mcp.tool()
 440 | def track_objects(
 441 |     image_data: str,
 442 |     model_name: str = "yolov8n.pt",
 443 |     confidence: float = 0.25,
 444 |     tracker: str = "bytetrack.yaml",
 445 |     save_results: bool = False
 446 | ) -> Dict[str, Any]:
 447 |     """
 448 |     Return the YOLO CLI command for object tracking without executing it
 449 |     
 450 |     Args:
 451 |         image_data: Base64 encoded image (not decoded here; a placeholder source is used)
 452 |         model_name: YOLO model name
 453 |         confidence: Detection confidence threshold
 454 |         tracker: Tracker name to use (e.g., 'bytetrack.yaml', 'botsort.yaml')
 455 |         save_results: Whether to save results to disk
 456 |         
 457 |     Returns:
 458 |         Dictionary containing command that would be executed
 459 |     """
 460 |     try:
 461 |         # For base64, we would save to temp file, but we'll just indicate this
 462 |         source_path = "[temp_file_from_base64]"
 463 |         
 464 |         # Determine full model path
 465 |         model_path = None
 466 |         for directory in CONFIG["model_dirs"]:
 467 |             potential_path = os.path.join(directory, model_name)
 468 |             if os.path.exists(potential_path):
 469 |                 model_path = potential_path
 470 |                 break
 471 |         
 472 |         if model_path is None:
 473 |             available = list_available_models()
 474 |             available_str = ", ".join(available) if available else "none"
 475 |             return {
 476 |                 "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
 477 |             }
 478 |         
 479 |         # Setup output directory for save_results
 480 |         output_dir = os.path.join(tempfile.gettempdir(), "yolo_track_results")
 481 |         
 482 |         # Build YOLO CLI command
 483 |         cmd_args = [
 484 |             "track",  # Mode; the task is inferred from the model
 485 |             f"model={model_path}",
 486 |             f"source={source_path}",
 487 |             f"conf={confidence}",
 488 |             f"tracker={tracker}",
 489 |             "format=json",  # Intended to request JSON output; Ultralytics documents format= as an export option
 490 |         ]
 491 |         
 492 |         if save_results:
 493 |             cmd_args.append(f"project={output_dir}")
 494 |             cmd_args.append("save=True")
 495 |         else:
 496 |             cmd_args.append("save=False")
 497 |         
 498 |         # Get command string without executing
 499 |         result = run_yolo_cli(cmd_args)
 500 |         
 501 |         # Return command information
 502 |         return {
 503 |             "status": "command_generated",
 504 |             "model_used": model_name,
 505 |             "model_path": model_path,
 506 |             "source": source_path,
 507 |             "command": result["command"],
 508 |             "note": "Command generated but not executed - tracking results would be returned from actual execution",
 509 |             "parameters": {
 510 |                 "confidence": confidence,
 511 |                 "tracker": tracker,
 512 |                 "save_results": save_results,
 513 |                 "output_dir": output_dir if save_results else None
 514 |             }
 515 |         }
 516 |             
 517 |     except Exception as e:
 518 |         logger.error(f"Error in track_objects command generation: {str(e)}")
 519 |         return {
 520 |             "error": f"Failed to generate tracking command: {str(e)}"
 521 |         }
 522 | 
 523 | @mcp.tool()
 524 | def train_model(
 525 |     dataset_path: str,
 526 |     model_name: str = "yolov8n.pt",
 527 |     epochs: int = 100,
 528 |     imgsz: int = 640,
 529 |     batch: int = 16,
 530 |     name: str = "yolo_custom_model",
 531 |     project: str = "runs/train"
 532 | ) -> Dict[str, Any]:
 533 |     """
 534 |     Return the YOLO CLI command for model training without executing it
 535 |     
 536 |     Args:
 537 |         dataset_path: Path to YOLO format dataset
 538 |         model_name: Base model to start with
 539 |         epochs: Number of training epochs
 540 |         imgsz: Image size for training
 541 |         batch: Batch size
 542 |         name: Name for the training run
 543 |         project: Project directory
 544 |         
 545 |     Returns:
 546 |         Dictionary containing command that would be executed
 547 |     """
 548 |     # Validate dataset path
 549 |     if not os.path.exists(dataset_path):
 550 |         return {"error": f"Dataset not found: {dataset_path}"}
 551 |     
 552 |     # Determine full model path
 553 |     model_path = None
 554 |     for directory in CONFIG["model_dirs"]:
 555 |         potential_path = os.path.join(directory, model_name)
 556 |         if os.path.exists(potential_path):
 557 |             model_path = potential_path
 558 |             break
 559 |     
 560 |     if model_path is None:
 561 |         available = list_available_models()
 562 |         available_str = ", ".join(available) if available else "none"
 563 |         return {
 564 |             "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
 565 |         }
 566 |     
 567 |     # Determine task type based on model name
 568 |     task = "detect"  # Default task
 569 |     if "seg" in model_name:
 570 |         task = "segment"
 571 |     elif "pose" in model_name:
 572 |         task = "pose"
 573 |     elif "cls" in model_name:
 574 |         task = "classify"
 575 |     elif "obb" in model_name:
 576 |         task = "obb"
 577 |     
 578 |     # Build YOLO CLI command
 579 |     cmd_args = [
 580 |         task,  # Task
 581 |         "train",  # Mode
 582 |         f"model={model_path}",
 583 |         f"data={dataset_path}",
 584 |         f"epochs={epochs}",
 585 |         f"imgsz={imgsz}",
 586 |         f"batch={batch}",
 587 |         f"name={name}",
 588 |         f"project={project}"
 589 |     ]
 590 |     
 591 |     # Get command string without executing
 592 |     result = run_yolo_cli(cmd_args)
 593 |     
 594 |     # Return command information
 595 |     return {
 596 |         "status": "command_generated",
 597 |         "model_used": model_name,
 598 |         "model_path": model_path,
 599 |         "command": result["command"],
 600 |         "note": "Command generated but not executed - training would start with actual execution",
 601 |         "parameters": {
 602 |             "dataset_path": dataset_path,
 603 |             "epochs": epochs,
 604 |             "imgsz": imgsz,
 605 |             "batch": batch,
 606 |             "name": name,
 607 |             "project": project,
 608 |             "task": task
 609 |         }
 610 |     }
 611 | 
 612 | @mcp.tool()
 613 | def validate_model(
 614 |     model_path: str,
 615 |     data_path: str,
 616 |     imgsz: int = 640,
 617 |     batch: int = 16
 618 | ) -> Dict[str, Any]:
 619 |     """
 620 |     Return the YOLO CLI command for model validation without executing it
 621 |     
 622 |     Args:
 623 |         model_path: Path to YOLO model (.pt file)
 624 |         data_path: Path to YOLO format validation dataset
 625 |         imgsz: Image size for validation
 626 |         batch: Batch size
 627 |         
 628 |     Returns:
 629 |         Dictionary containing command that would be executed
 630 |     """
 631 |     # Validate model path
 632 |     if not os.path.exists(model_path):
 633 |         return {"error": f"Model file not found: {model_path}"}
 634 |     
 635 |     # Validate dataset path
 636 |     if not os.path.exists(data_path):
 637 |         return {"error": f"Dataset not found: {data_path}"}
 638 |     
 639 |     # Determine task type based on model name
 640 |     model_name = os.path.basename(model_path)
 641 |     task = "detect"  # Default task
 642 |     if "seg" in model_name:
 643 |         task = "segment"
 644 |     elif "pose" in model_name:
 645 |         task = "pose"
 646 |     elif "cls" in model_name:
 647 |         task = "classify"
 648 |     elif "obb" in model_name:
 649 |         task = "obb"
 650 |     
 651 |     # Build YOLO CLI command
 652 |     cmd_args = [
 653 |         task,  # Task
 654 |         "val",  # Mode
 655 |         f"model={model_path}",
 656 |         f"data={data_path}",
 657 |         f"imgsz={imgsz}",
 658 |         f"batch={batch}"
 659 |     ]
 660 |     
 661 |     # Get command string without executing
 662 |     result = run_yolo_cli(cmd_args)
 663 |     
 664 |     # Return command information
 665 |     return {
 666 |         "status": "command_generated",
 667 |         "model_path": model_path,
 668 |         "command": result["command"],
 669 |         "note": "Command generated but not executed - validation would begin with actual execution",
 670 |         "parameters": {
 671 |             "data_path": data_path,
 672 |             "imgsz": imgsz,
 673 |             "batch": batch,
 674 |             "task": task
 675 |         }
 676 |     }
 677 | 
 678 | @mcp.tool()
 679 | def export_model(
 680 |     model_path: str,
 681 |     format: str = "onnx",
 682 |     imgsz: int = 640
 683 | ) -> Dict[str, Any]:
 684 |     """
 685 |     Return the YOLO CLI command for model export without executing it
 686 |     
 687 |     Args:
 688 |         model_path: Path to YOLO model (.pt file)
 689 |         format: Export format (onnx, torchscript, openvino, etc.)
 690 |         imgsz: Image size for export
 691 |         
 692 |     Returns:
 693 |         Dictionary containing command that would be executed
 694 |     """
 695 |     # Validate model path
 696 |     if not os.path.exists(model_path):
 697 |         return {"error": f"Model file not found: {model_path}"}
 698 |     
 699 |     # Valid export formats
 700 |     valid_formats = [
 701 |         "torchscript", "onnx", "openvino", "engine", "coreml", "saved_model", 
 702 |         "pb", "tflite", "edgetpu", "tfjs", "paddle"
 703 |     ]
 704 |     
 705 |     if format not in valid_formats:
 706 |         return {"error": f"Invalid export format: {format}. Valid formats include: {', '.join(valid_formats)}"}
 707 |     
 708 |     # Build YOLO CLI command
 709 |     cmd_args = [
 710 |         "export",  # Mode; the task is inferred from the model
 711 |         f"model={model_path}",
 712 |         f"format={format}",
 713 |         f"imgsz={imgsz}"
 714 |     ]
 715 |     
 716 |     # Get command string without executing
 717 |     result = run_yolo_cli(cmd_args)
 718 |     
 719 |     # Return command information
 720 |     return {
 721 |         "status": "command_generated",
 722 |         "model_path": model_path,
 723 |         "command": result["command"],
 724 |         "note": "Command generated but not executed - export would begin with actual execution",
 725 |         "parameters": {
 726 |             "format": format,
 727 |             "imgsz": imgsz,
 728 |             "expected_output": f"{os.path.splitext(model_path)[0]}.{format}"
 729 |         }
 730 |     }
 731 | 
 732 | @mcp.tool()
 733 | def list_available_models() -> List[str]:
 734 |     """List available YOLO models that actually exist on disk in any configured directory"""
 735 |     # Common YOLO model patterns
 736 |     model_patterns = [
 737 |         "yolov11*.pt", 
 738 |         "yolov8*.pt"
 739 |     ]
 740 |     
 741 |     # Find all existing models in all configured directories
 742 |     available_models = set()
 743 |     for directory in CONFIG["model_dirs"]:
 744 |         if not os.path.exists(directory):
 745 |             continue
 746 |             
 747 |         # Check for model files directly
 748 |         for filename in os.listdir(directory):
 749 |             if filename.endswith(".pt") and any(
 750 |                 fnmatch.fnmatch(filename, pattern) for pattern in model_patterns
 751 |             ):
 752 |                 available_models.add(filename)
 753 |     
 754 |     # Convert to sorted list
 755 |     result = sorted(list(available_models))
 756 |     
 757 |     if not result:
 758 |         logger.warning("No model files found in configured directories.")
 759 |         return ["No models available - download models to any of these directories: " + ", ".join(CONFIG["model_dirs"])]
 760 |     
 761 |     return result
 762 | 
 763 | @mcp.tool()
 764 | def start_camera_detection(
 765 |     model_name: str = "yolov8n.pt",
 766 |     confidence: float = 0.25,
 767 |     camera_id: int = 0
 768 | ) -> Dict[str, Any]:
 769 |     """
 770 |     Return the YOLO CLI command for starting camera detection without executing it
 771 |     
 772 |     Args:
 773 |         model_name: YOLO model name to use
 774 |         confidence: Detection confidence threshold
 775 |         camera_id: Camera device ID (0 is usually the default camera)
 776 |         
 777 |     Returns:
 778 |         Dictionary containing command that would be executed
 779 |     """
 780 |     # Determine full model path
 781 |     model_path = None
 782 |     for directory in CONFIG["model_dirs"]:
 783 |         potential_path = os.path.join(directory, model_name)
 784 |         if os.path.exists(potential_path):
 785 |             model_path = potential_path
 786 |             break
 787 |     
 788 |     if model_path is None:
 789 |         available = list_available_models()
 790 |         available_str = ", ".join(available) if available else "none"
 791 |         return {
 792 |             "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
 793 |         }
 794 |     
 795 |     # Determine task type based on model name
 796 |     task = "detect"  # Default task
 797 |     if "seg" in model_name:
 798 |         task = "segment"
 799 |     elif "pose" in model_name:
 800 |         task = "pose"
 801 |     elif "cls" in model_name:
 802 |         task = "classify"
 803 |     
 804 |     # Build YOLO CLI command
 805 |     cmd_args = [
 806 |         task,  # Task
 807 |         "predict",  # Mode
 808 |         f"model={model_path}",
 809 |         f"source={camera_id}",  # Camera source ID
 810 |         f"conf={confidence}",
 811 |         "format=json",
 812 |         "save=False",  # Don't save frames by default
 813 |         "show=True"   # Show GUI window for camera view
 814 |     ]
 815 |     
 816 |     # Get command string without executing
 817 |     result = run_yolo_cli(cmd_args)
 818 |     
 819 |     # Return command information
 820 |     return {
 821 |         "status": "command_generated",
 822 |         "model_used": model_name,
 823 |         "model_path": model_path,
 824 |         "command": result["command"],
 825 |         "note": "Command generated but not executed - camera would start with actual execution",
 826 |         "parameters": {
 827 |             "confidence": confidence,
 828 |             "camera_id": camera_id,
 829 |             "task": task
 830 |         }
 831 |     }
 832 | 
 833 | @mcp.tool()
 834 | def stop_camera_detection() -> Dict[str, Any]:
 835 |     """
 836 |     Simulate stopping camera detection (no actual command to execute)
 837 |     
 838 |     Returns:
 839 |         Information message
 840 |     """
 841 |     return {
 842 |         "status": "command_generated",
 843 |         "message": "To stop camera detection, close the YOLO window or press 'q' in the terminal",
 844 |         "note": "Since commands are not executed, no actual camera is running"
 845 |     }
 846 | 
 847 | @mcp.tool()
 848 | def get_camera_detections() -> Dict[str, Any]:
 849 |     """
 850 |     Simulate getting latest camera detections (no actual command to execute)
 851 |     
 852 |     Returns:
 853 |         Information message
 854 |     """
 855 |     return {
 856 |         "status": "command_generated",
 857 |         "message": "Camera detections would be returned here if a camera were running",
 858 |         "note": "Since commands are not executed, no camera is running and no detections are available"
 859 |     }
 860 | 
 861 | @mcp.tool()
 862 | def comprehensive_image_analysis(
 863 |     image_path: str,
 864 |     confidence: float = 0.25,
 865 |     save_results: bool = False
 866 | ) -> Dict[str, Any]:
 867 |     """
 868 |     Return the YOLO CLI commands for comprehensive image analysis without executing them
 869 |     
 870 |     Args:
 871 |         image_path: Path to the image file
 872 |         confidence: Detection confidence threshold
 873 |         save_results: Whether to save results to disk
 874 |         
 875 |     Returns:
 876 |         Dictionary containing commands that would be executed
 877 |     """
 878 |     if not os.path.exists(image_path):
 879 |         return {"error": f"Image file not found: {image_path}"}
 880 |     
 881 |     commands = []
 882 |     
 883 |     # 1. Object detection
 884 |     detect_result = detect_objects(
 885 |         image_data=image_path,
 886 |         model_name="yolov11n.pt",
 887 |         confidence=confidence,
 888 |         save_results=save_results,
 889 |         is_path=True
 890 |     )
 891 |     if "command" in detect_result:
 892 |         commands.append({
 893 |             "task": "object_detection",
 894 |             "command": detect_result["command"]
 895 |         })
 896 |     
 897 |     # 2. Scene classification
 898 |     try:
 899 |         cls_result = classify_image(
 900 |             image_data=image_path,
 901 |             model_name="yolov8n-cls.pt",
 902 |             top_k=3,
 903 |             save_results=save_results,
 904 |             is_path=True
 905 |         )
 906 |         if "command" in cls_result:
 907 |             commands.append({
 908 |                 "task": "classification",
 909 |                 "command": cls_result["command"]
 910 |             })
 911 |     except Exception as e:
 912 |         logger.error(f"Error generating classification command: {str(e)}")
 913 |     
 914 |     # 3. Pose detection if available
 915 |     for directory in CONFIG["model_dirs"]:
 916 |         pose_model_path = os.path.join(directory, "yolov8n-pose.pt")
 917 |         if os.path.exists(pose_model_path):
 918 |             # Build YOLO CLI command for pose detection
 919 |             cmd_args = [
 920 |                 "pose",  # Task
 921 |                 "predict",  # Mode
 922 |                 f"model={pose_model_path}",
 923 |                 f"source={image_path}",
 924 |                 f"conf={confidence}",
 925 |                 "format=json",
 926 |             ]
 927 |             
 928 |             if save_results:
 929 |                 output_dir = os.path.join(tempfile.gettempdir(), "yolo_pose_results")
 930 |                 cmd_args.append(f"project={output_dir}")
 931 |                 cmd_args.append("save=True")
 932 |             else:
 933 |                 cmd_args.append("save=False")
 934 |             
 935 |             result = run_yolo_cli(cmd_args)
 936 |             
 937 |             commands.append({
 938 |                 "task": "pose_detection",
 939 |                 "command": result["command"]
 940 |             })
 941 |             break
 942 |     
 943 |     return {
 944 |         "status": "commands_generated",
 945 |         "image_path": image_path,
 946 |         "commands": commands,
 947 |         "note": "Commands generated but not executed - comprehensive analysis would occur with actual execution",
 948 |         "parameters": {
 949 |             "confidence": confidence,
 950 |             "save_results": save_results
 951 |         }
 952 |     }
 953 | 
 954 | @mcp.tool()
 955 | def analyze_image_from_path(
 956 |     image_path: str,
 957 |     model_name: str = "yolov8n.pt",
 958 |     confidence: float = 0.25,
 959 |     save_results: bool = False
 960 | ) -> Dict[str, Any]:
 961 |     """
 962 |     Return the YOLO CLI command for image analysis without executing it
 963 |     
 964 |     Args:
 965 |         image_path: Path to the image file
 966 |         model_name: YOLO model name
 967 |         confidence: Detection confidence threshold
 968 |         save_results: Whether to save results to disk
 969 |         
 970 |     Returns:
 971 |         Dictionary containing command that would be executed
 972 |     """
 973 |     try:
 974 |         # Call detect_objects function with is_path=True
 975 |         return detect_objects(
 976 |             image_data=image_path,
 977 |             model_name=model_name,
 978 |             confidence=confidence,
 979 |             save_results=save_results,
 980 |             is_path=True
 981 |         )
 982 |     except Exception as e:
 983 |         return {
 984 |             "error": f"Failed to generate analysis command: {str(e)}",
 985 |             "image_path": image_path
 986 |         }
 987 | 
 988 | @mcp.tool()
 989 | def test_connection() -> Dict[str, Any]:
 990 |     """
 991 |     Test if YOLO CLI service is available
 992 |     
 993 |     Returns:
 994 |         Status information and available tools
 995 |     """
 996 |     # Build a simple YOLO CLI version command
 997 |     cmd_args = ["--version"]
 998 |     result = run_yolo_cli(cmd_args)
 999 |     
1000 |     return {
1001 |         "status": "YOLO CLI command generator is running",
1002 |         "command_mode": "Command generation only, no execution",
1003 |         "version_command": result["command"],
1004 |         "available_models": list_available_models(),
1005 |         "available_tools": [
1006 |             "list_available_models", "detect_objects", "segment_objects", 
1007 |             "classify_image", "track_objects", "train_model", "validate_model", 
1008 |             "export_model", "start_camera_detection", "stop_camera_detection", 
1009 |             "get_camera_detections", "test_connection",
1010 |             # Additional tools
1011 |             "analyze_image_from_path",
1012 |             "comprehensive_image_analysis"
1013 |         ],
1014 |         "note": "This service only generates YOLO commands without executing them"
1015 |     }
1016 | 
1017 | # Main execution section
1018 | if __name__ == "__main__":
1019 |     import platform
1020 |     
1021 |     logger.info("Starting YOLO CLI command generator service")
1022 |     logger.info(f"Platform: {platform.system()} {platform.release()}")
1023 |     logger.info("⚠️ Commands will be generated but NOT executed")
1024 |     
1025 |     # Initialize and run server
1026 |     logger.info("Starting MCP server...")
1027 |     mcp.run(transport='stdio')
```
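
Most tools in `server_cli.py` repeat the same two steps: search `CONFIG["model_dirs"]` for the weight file, then guess the task from substrings in the file name. The following is a minimal sketch of how those steps could be factored into shared helpers. The names `resolve_model_path`, `infer_task`, and `TASK_MARKERS` are hypothetical and do not exist in the repository; the matching order mirrors the `seg` / `pose` / `cls` / `obb` checks used in `train_model` and `validate_model`.

```python
import os
from typing import Iterable, Optional

# Hypothetical constant: substring markers checked in the same order as
# the if/elif chains in train_model and validate_model.
TASK_MARKERS = (
    ("seg", "segment"),
    ("pose", "pose"),
    ("cls", "classify"),
    ("obb", "obb"),
)


def resolve_model_path(model_name: str, model_dirs: Iterable[str]) -> Optional[str]:
    """Return the first existing path for model_name under model_dirs, or None."""
    for directory in model_dirs:
        candidate = os.path.join(directory, model_name)
        if os.path.exists(candidate):
            return candidate
    return None


def infer_task(model_name: str) -> str:
    """Guess the YOLO task from the weight file name; fall back to 'detect'."""
    for marker, task in TASK_MARKERS:
        if marker in model_name:
            return task
    return "detect"
```

With helpers like these, a tool body could call `resolve_model_path(model_name, CONFIG["model_dirs"])` and return the existing "not found" error dictionary when the result is `None`. Note that `start_camera_detection` currently omits the `obb` check, so routing it through a shared `infer_task` would change its behavior for OBB weights.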

--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------

```python
   1 | # server.py
   2 | import fnmatch
   3 | import os
   4 | import base64
   5 | import cv2
   6 | import time
   7 | import threading
   8 | from io import BytesIO
   9 | from typing import List, Dict, Any, Optional, Union
  10 | import numpy as np
  11 | from PIL import Image
  12 | 
  13 | from mcp.server.fastmcp import FastMCP
  14 | from ultralytics import YOLO
  15 | 
  16 | # Standard-library imports for logging, stdout redirection, and shutdown handling
  17 | import atexit
  18 | import contextlib
  19 | import logging
  20 | import signal
  21 | import sys
  26 | 
  27 | # Set up logging configuration
  28 | logging.basicConfig(
  29 |     level=logging.INFO,
  30 |     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  31 |     handlers=[
  32 |         logging.FileHandler("yolo_service.log"),
  33 |         logging.StreamHandler(sys.stderr)
  34 |     ]
  35 | )
  36 | 
  37 | logger = logging.getLogger('yolo_service')
  38 | 
  39 | # Global variables for camera control
  40 | camera_running = False
  41 | camera_thread = None
  42 | detection_results = []
  43 | camera_last_access_time = 0
  44 | CAMERA_INACTIVITY_TIMEOUT = 60  # Auto-shutdown after 60 seconds of inactivity
  45 | 
  46 | @contextlib.contextmanager
  47 | def redirect_stdout_to_stderr():
  48 |     old_stdout = sys.stdout
  49 |     sys.stdout = sys.stderr
  50 |     try:
  51 |         yield
  52 |     finally:
  53 |         sys.stdout = old_stdout
  54 | 
  55 | def camera_watchdog_thread():
  56 |     """Monitor thread that auto-stops the camera after inactivity"""
  57 |     global camera_running, camera_last_access_time
  58 |     
  59 |     logger.info("Camera watchdog thread started")
  60 |     
  61 |     while True:
  62 |         # Sleep for a short time to avoid excessive CPU usage
  63 |         time.sleep(5)
  64 |         
  65 |         # Check if camera is running
  66 |         if camera_running:
  67 |             current_time = time.time()
  68 |             elapsed_time = current_time - camera_last_access_time
  69 |             
  70 |             # If no access for more than the timeout, auto-stop
  71 |             if elapsed_time > CAMERA_INACTIVITY_TIMEOUT:
  72 |                 logger.info(f"Auto-stopping camera after {elapsed_time:.1f} seconds of inactivity")
  73 |                 stop_camera_detection()
  74 |         else:
  75 |             # If camera is not running, no need to check frequently
  76 |             time.sleep(10)
  77 | 
  78 | 
  79 | def load_image(image_source, is_path=False):
  80 |     """
  81 |     Load image from file path or base64 data
  82 |     
  83 |     Args:
  84 |         image_source: File path or base64 encoded image data
  85 |         is_path: Whether image_source is a file path
  86 |         
  87 |     Returns:
  88 |         PIL Image object
  89 |     """
  90 |     try:
  91 |         if is_path:
  92 |             # Load image from file path
  93 |             if os.path.exists(image_source):
  94 |                 return Image.open(image_source)
  95 |             else:
  96 |                 raise FileNotFoundError(f"Image file not found: {image_source}")
  97 |         else:
  98 |             # Load image from base64 data
  99 |             image_bytes = base64.b64decode(image_source)
 100 |             return Image.open(BytesIO(image_bytes))
 101 |     except Exception as e:
 102 |         raise ValueError(f"Failed to load image: {str(e)}")
 103 | 
 104 | # Create MCP server
 105 | mcp = FastMCP("YOLO_Service")
 106 | 
 107 | # Global model cache
 108 | models = {}
 109 | 
 110 | def get_model(model_name: str = "yolov8n.pt") -> YOLO:
 111 |     """Get or load YOLO model from any of the configured model directories"""
 112 |     if model_name in models:
 113 |         return models[model_name]
 114 |     
 115 |     # Try to find the model in any of the configured directories
 116 |     model_path = None
 117 |     for directory in CONFIG["model_dirs"]:
 118 |         potential_path = os.path.join(directory, model_name)
 119 |         if os.path.exists(potential_path):
 120 |             model_path = potential_path
 121 |             break
 122 |     
 123 |     if model_path is None:
 124 |         available = list_available_models()
 125 |         available_str = ", ".join(available) if available else "none"
 126 |         raise FileNotFoundError(f"Model '{model_name}' not found in any configured directories. Available models: {available_str}")
 127 |     
 128 |     # Load and cache the model - with stdout redirected
 129 |     logger.info(f"Loading model: {model_name} from {model_path}")
 130 |     with redirect_stdout_to_stderr():
 131 |         models[model_name] = YOLO(model_path)
 132 |     return models[model_name]
 133 | 
 134 | # Global configuration
 135 | CONFIG = {
 136 |     "model_dirs": [
 137 |         ".",  # Current directory
 138 |         "./models",  # Models subdirectory
 139 |         os.path.join(os.path.dirname(os.path.abspath(__file__)), "models"),  # Absolute path to models
 140 |         # Add any other potential model directories here
 141 |     ]
 142 | }
 143 | 
 144 | 
 145 | 
 146 | # Tool: report the configured model directories and the models found in them
 147 | @mcp.tool()
 148 | def get_model_directories() -> Dict[str, Any]:
 149 |     """Get information about configured model directories and available models"""
 150 |     directories = []
 151 |     
 152 |     for directory in CONFIG["model_dirs"]:
 153 |         dir_info = {
 154 |             "path": directory,
 155 |             "exists": os.path.exists(directory),
 156 |             "is_directory": os.path.isdir(directory) if os.path.exists(directory) else False,
 157 |             "models": []
 158 |         }
 159 |         
 160 |         if dir_info["exists"] and dir_info["is_directory"]:
 161 |             for filename in os.listdir(directory):
 162 |                 if filename.endswith(".pt"):
 163 |                     dir_info["models"].append(filename)
 164 |         
 165 |         directories.append(dir_info)
 166 |     
 167 |     return {
 168 |         "configured_directories": CONFIG["model_dirs"],
 169 |         "directory_details": directories,
 170 |         "available_models": list_available_models(),
 171 |         "loaded_models": list(models.keys())
 172 |     }
 173 | 
 174 | @mcp.tool()
 175 | def detect_objects(
 176 |     image_data: str,
 177 |     model_name: str = "yolov8n.pt",
 178 |     confidence: float = 0.25,
 179 |     save_results: bool = False,
 180 |     is_path: bool = False
 181 | ) -> Dict[str, Any]:
 182 |     """
 183 |     Detect objects in an image using YOLO
 184 |     
 185 |     Args:
 186 |         image_data: Base64 encoded image or file path (if is_path=True)
 187 |         model_name: YOLO model name
 188 |         confidence: Detection confidence threshold
 189 |         save_results: Whether to save results to disk
 190 |         is_path: Whether image_data is a file path
 191 |         
 192 |     Returns:
 193 |         Dictionary containing detection results
 194 |     """
 195 |     try:
 196 |         # Load image (supports path or base64)
 197 |         image = load_image(image_data, is_path=is_path)
 198 |         
 199 |         # Load model and perform detection - with stdout redirected
 200 |         model = get_model(model_name)
 201 |         with redirect_stdout_to_stderr():  # Ensure all YOLO outputs go to stderr
 202 |             results = model.predict(image, conf=confidence, save=save_results)
 203 |         
 204 |         # Format results
 205 |         formatted_results = []
 206 |         for result in results:
 207 |             boxes = result.boxes
 208 |             detections = []
 209 |             
 210 |             for i in range(len(boxes)):
 211 |                 box = boxes[i]
 212 |                 x1, y1, x2, y2 = box.xyxy[0].tolist()
 213 |                 conf = float(box.conf[0])  # Avoid shadowing the confidence parameter
 214 |                 class_id = int(box.cls[0])
 215 |                 class_name = result.names[class_id]
 216 |                 
 217 |                 detections.append({
 218 |                     "box": [x1, y1, x2, y2],
 219 |                     "confidence": conf,
 220 |                     "class_id": class_id,
 221 |                     "class_name": class_name
 222 |                 })
 223 |             
 224 |             formatted_results.append({
 225 |                 "detections": detections,
 226 |                 "image_shape": result.orig_shape
 227 |             })
 228 |         
 229 |         return {
 230 |             "results": formatted_results,
 231 |             "model_used": model_name,
 232 |             "total_detections": sum(len(r["detections"]) for r in formatted_results),
 233 |             "source": image_data if is_path else "base64_image"
 234 |         }
 235 |     except Exception as e:
 236 |         logger.error(f"Error in detect_objects: {str(e)}")
 237 |         return {
 238 |             "error": f"Failed to detect objects: {str(e)}",
 239 |             "source": image_data if is_path else "base64_image"
 240 |         }
 241 | 
 242 | @mcp.tool()
 243 | def segment_objects(
 244 |     image_data: str,
 245 |     model_name: str = "yolov11n-seg.pt",
 246 |     confidence: float = 0.25,
 247 |     save_results: bool = False,
 248 |     is_path: bool = False
 249 | ) -> Dict[str, Any]:
 250 |     """
 251 |     Perform instance segmentation on an image using YOLO
 252 |     
 253 |     Args:
 254 |         image_data: Base64 encoded image or file path (if is_path=True)
 255 |         model_name: YOLO segmentation model name
 256 |         confidence: Detection confidence threshold
 257 |         save_results: Whether to save results to disk
 258 |         is_path: Whether image_data is a file path
 259 |         
 260 |     Returns:
 261 |         Dictionary containing segmentation results
 262 |     """
 263 |     try:
 264 |         # Load image (supports path or base64)
 265 |         image = load_image(image_data, is_path=is_path)
 266 |         
 267 |         # Load model and perform segmentation
 268 |         model = get_model(model_name)
 269 |         with redirect_stdout_to_stderr():  # Ensure all YOLO outputs go to stderr
 270 |             results = model.predict(image, conf=confidence, save=save_results)
 271 |         
 272 |         # Format results
 273 |         formatted_results = []
 274 |         for result in results:
 275 |             if not hasattr(result, 'masks') or result.masks is None:
 276 |                 continue
 277 |                 
 278 |             boxes = result.boxes
 279 |             masks = result.masks
 280 |             segments = []
 281 |             
 282 |             for i in range(len(boxes)):
 283 |                 box = boxes[i]
 284 |                 mask = masks[i].data[0].cpu().numpy() if masks else None
 285 |                 
 286 |                 x1, y1, x2, y2 = box.xyxy[0].tolist()
 287 |                 conf = float(box.conf[0])  # Avoid shadowing the confidence parameter
 288 |                 class_id = int(box.cls[0])
 289 |                 class_name = result.names[class_id]
 290 |                 
 291 |                 segment = {
 292 |                     "box": [x1, y1, x2, y2],
 293 |                     "confidence": conf,
 294 |                     "class_id": class_id,
 295 |                     "class_name": class_name
 296 |                 }
 297 |                 
 298 |                 if mask is not None:
 299 |                     # Convert binary mask to simplified format for API response
 300 |                     segment["mask"] = mask.tolist()
 301 |                 
 302 |                 segments.append(segment)
 303 |             
 304 |             formatted_results.append({
 305 |                 "segments": segments,
 306 |                 "image_shape": result.orig_shape
 307 |             })
 308 |         
 309 |         return {
 310 |             "results": formatted_results,
 311 |             "model_used": model_name,
 312 |             "total_segments": sum(len(r["segments"]) for r in formatted_results),
 313 |             "source": image_data if is_path else "base64_image"
 314 |         }
 315 |     except Exception as e:
 316 |         return {
 317 |             "error": f"Failed to segment objects: {str(e)}",
 318 |             "source": image_data if is_path else "base64_image"
 319 |         }
 320 | 
 321 | 
 322 | @mcp.tool()
 323 | def classify_image(
 324 |     image_data: str,
 325 |     model_name: str = "yolov11n-cls.pt",
 326 |     top_k: int = 5,
 327 |     save_results: bool = False,
 328 |     is_path: bool = False
 329 | ) -> Dict[str, Any]:
 330 |     """
 331 |     Classify an image using YOLO classification model
 332 |     
 333 |     Args:
 334 |         image_data: Base64 encoded image or file path (if is_path=True)
 335 |         model_name: YOLO classification model name
 336 |         top_k: Number of top categories to return (at most 5, since only the top-5 predictions are read)
 337 |         save_results: Whether to save results to disk
 338 |         is_path: Whether image_data is a file path
 339 |         
 340 |     Returns:
 341 |         Dictionary containing classification results
 342 |     """
 343 |     try:
 344 |         # Load image (supports path or base64)
 345 |         image = load_image(image_data, is_path=is_path)
 346 |         
 347 |         # Load model and perform classification
 348 |         model = get_model(model_name)
 349 |         with redirect_stdout_to_stderr():  # Ensure all YOLO outputs go to stderr
 350 |             results = model.predict(image, save=save_results)
 351 |         
 352 |         # Format results
 353 |         formatted_results = []
 354 |         for result in results:
 355 |             if not hasattr(result, 'probs') or result.probs is None:
 356 |                 continue
 357 |                 
 358 |             probs = result.probs
 359 |             top_indices = probs.top5
 360 |             top_probs = probs.top5conf.tolist()
 361 |             top_classes = [result.names[idx] for idx in top_indices]
 362 |             
 363 |             classifications = [
 364 |                 {"class_id": int(idx), "class_name": name, "probability": float(prob)}
 365 |                 for idx, name, prob in zip(top_indices[:top_k], top_classes[:top_k], top_probs[:top_k])
 366 |             ]
 367 |             
 368 |             formatted_results.append({
 369 |                 "classifications": classifications,
 370 |                 "image_shape": result.orig_shape
 371 |             })
 372 |         
 373 |         return {
 374 |             "results": formatted_results,
 375 |             "model_used": model_name,
 376 |             "top_k": top_k,
 377 |             "source": image_data if is_path else "base64_image"
 378 |         }
 379 |     except Exception as e:
 380 |         return {
 381 |             "error": f"Failed to classify image: {str(e)}",
 382 |             "source": image_data if is_path else "base64_image"
 383 |         }
 384 | 
 385 | 
 386 | @mcp.tool()
 387 | def track_objects(
 388 |     image_data: str,
 389 |     model_name: str = "yolov8n.pt",
 390 |     confidence: float = 0.25,
 391 |     tracker: str = "bytetrack.yaml",
 392 |     save_results: bool = False
 393 | ) -> Dict[str, Any]:
 394 |     """
 395 |     Track objects in an image sequence using YOLO
 396 |     
 397 |     Args:
 398 |         image_data: Base64 encoded image
 399 |         model_name: YOLO model name
 400 |         confidence: Detection confidence threshold
 401 |         tracker: Tracker name to use (e.g., 'bytetrack.yaml', 'botsort.yaml')
 402 |         save_results: Whether to save results to disk
 403 |         
 404 |     Returns:
 405 |         Dictionary containing tracking results
 406 |     """
 407 |     # Decode Base64 image
 408 |     image_bytes = base64.b64decode(image_data)
 409 |     image = Image.open(BytesIO(image_bytes))
 410 |     
 411 |     # Load model and perform tracking
 412 |     model = get_model(model_name)
 413 |     # Keep YOLO console output off stdout
 414 |     with redirect_stdout_to_stderr():
 415 |         results = model.track(image, conf=confidence, tracker=tracker, save=save_results)
 416 |     
 417 |     # Format results
 418 |     formatted_results = []
 419 |     for result in results:
 420 |         if not hasattr(result, 'boxes') or result.boxes is None:
 421 |             continue
 422 |             
 423 |         boxes = result.boxes
 424 |         tracks = []
 425 |         
 426 |         for i in range(len(boxes)):
 427 |             box = boxes[i]
 428 |             x1, y1, x2, y2 = box.xyxy[0].tolist()
 429 |             conf = float(box.conf[0])  # Avoid shadowing the confidence parameter
 430 |             class_id = int(box.cls[0])
 431 |             class_name = result.names[class_id]
 432 |             
 433 |             # Extract track ID (if any)
 434 |             track_id = int(box.id[0]) if box.id is not None else None
 435 |             
 436 |             track = {
 437 |                 "box": [x1, y1, x2, y2],
 438 |                 "confidence": conf,
 439 |                 "class_id": class_id,
 440 |                 "class_name": class_name,
 441 |                 "track_id": track_id
 442 |             }
 443 |             
 444 |             tracks.append(track)
 445 |         
 446 |         formatted_results.append({
 447 |             "tracks": tracks,
 448 |             "image_shape": result.orig_shape
 449 |         })
 450 |     
 451 |     return {
 452 |         "results": formatted_results,
 453 |         "model_used": model_name,
 454 |         "tracker": tracker,
 455 |         "total_tracks": sum(len(r["tracks"]) for r in formatted_results)
 456 |     }
 457 | 
 458 | # Train a model, with YOLO output redirected to stderr
 459 | @mcp.tool()
 460 | def train_model(
 461 |     dataset_path: str,
 462 |     model_name: str = "yolov8n.pt",
 463 |     epochs: int = 100,
 464 |     imgsz: int = 640,
 465 |     batch: int = 16,
 466 |     name: str = "yolo_custom_model",
 467 |     project: str = "runs/train"
 468 | ) -> Dict[str, Any]:
 469 |     """
 470 |     Train a YOLO model on a custom dataset
 471 |     
 472 |     Args:
 473 |         dataset_path: Path to YOLO format dataset
 474 |         model_name: Base model to start with
 475 |         epochs: Number of training epochs
 476 |         imgsz: Image size for training
 477 |         batch: Batch size
 478 |         name: Name for the training run
 479 |         project: Project directory
 480 |         
 481 |     Returns:
 482 |         Dictionary containing training results
 483 |     """
 484 |     # Validate dataset path
 485 |     if not os.path.exists(dataset_path):
 486 |         return {"error": f"Dataset not found: {dataset_path}"}
 487 |     
 488 |     # Initialize model
 489 |     model = get_model(model_name)
 490 |     
 491 |     # Train model
 492 |     try:
 493 |         # Keep YOLO console output off stdout
 494 |         with redirect_stdout_to_stderr():
 495 |             results = model.train(
 496 |                 data=dataset_path,
 497 |                 epochs=epochs,
 498 |                 imgsz=imgsz,
 499 |                 batch=batch,
 500 |                 name=name,
 501 |                 project=project
 502 |             )
 503 |         
 504 |         # Get best model path
 505 |         best_model_path = os.path.join(project, name, "weights", "best.pt")
 506 |         
 507 |         return {
 508 |             "status": "success",
 509 |             "model_path": best_model_path,
 510 |             "epochs_completed": epochs,
 511 |             "final_metrics": {
 512 |                 "precision": float(results.results_dict.get("metrics/precision(B)", 0)),
 513 |                 "recall": float(results.results_dict.get("metrics/recall(B)", 0)),
 514 |                 "mAP50": float(results.results_dict.get("metrics/mAP50(B)", 0)),
 515 |                 "mAP50-95": float(results.results_dict.get("metrics/mAP50-95(B)", 0))
 516 |             }
 517 |         }
 518 |     except Exception as e:
 519 |         return {"error": f"Training failed: {str(e)}"}
 520 | 
 521 | # Validate a model, with YOLO output redirected to stderr
 522 | @mcp.tool()
 523 | def validate_model(
 524 |     model_path: str,
 525 |     data_path: str,
 526 |     imgsz: int = 640,
 527 |     batch: int = 16
 528 | ) -> Dict[str, Any]:
 529 |     """
 530 |     Validate a YOLO model on a dataset
 531 |     
 532 |     Args:
 533 |         model_path: Path to YOLO model (.pt file)
 534 |         data_path: Path to YOLO format validation dataset
 535 |         imgsz: Image size for validation
 536 |         batch: Batch size
 537 |         
 538 |     Returns:
 539 |         Dictionary containing validation results
 540 |     """
 541 |     # Validate model path
 542 |     if not os.path.exists(model_path):
 543 |         return {"error": f"Model file not found: {model_path}"}
 544 |     
 545 |     # Validate dataset path
 546 |     if not os.path.exists(data_path):
 547 |         return {"error": f"Dataset not found: {data_path}"}
 548 |     
 549 |     # Load model
 550 |     try:
 551 |         model = get_model(model_path)
 552 |     except Exception as e:
 553 |         return {"error": f"Failed to load model: {str(e)}"}
 554 |     
 555 |     # Validate model
 556 |     try:
 557 |         # Keep YOLO console output off stdout
 558 |         with redirect_stdout_to_stderr():
 559 |             results = model.val(data=data_path, imgsz=imgsz, batch=batch)
 560 |         
 561 |         return {
 562 |             "status": "success",
 563 |             "metrics": {
 564 |                 "precision": float(results.results_dict.get("metrics/precision(B)", 0)),
 565 |                 "recall": float(results.results_dict.get("metrics/recall(B)", 0)),
 566 |                 "mAP50": float(results.results_dict.get("metrics/mAP50(B)", 0)),
 567 |                 "mAP50-95": float(results.results_dict.get("metrics/mAP50-95(B)", 0))
 568 |             }
 569 |         }
 570 |     except Exception as e:
 571 |         return {"error": f"Validation failed: {str(e)}"}
 572 | 
 573 | # Export a model, with YOLO output redirected to stderr
 574 | @mcp.tool()
 575 | def export_model(
 576 |     model_path: str,
 577 |     format: str = "onnx",
 578 |     imgsz: int = 640
 579 | ) -> Dict[str, Any]:
 580 |     """
 581 |     Export a YOLO model to different formats
 582 |     
 583 |     Args:
 584 |         model_path: Path to YOLO model (.pt file)
 585 |         format: Export format (onnx, torchscript, openvino, etc.)
 586 |         imgsz: Image size for export
 587 |         
 588 |     Returns:
 589 |         Dictionary containing export results
 590 |     """
 591 |     # Validate model path
 592 |     if not os.path.exists(model_path):
 593 |         return {"error": f"Model file not found: {model_path}"}
 594 |     
 595 |     # Valid export formats
 596 |     valid_formats = [
 597 |         "torchscript", "onnx", "openvino", "engine", "coreml", "saved_model", 
 598 |         "pb", "tflite", "edgetpu", "tfjs", "paddle"
 599 |     ]
 600 |     
 601 |     if format not in valid_formats:
 602 |         return {"error": f"Invalid export format: {format}. Valid formats include: {', '.join(valid_formats)}"}
 603 |     
 604 |     # Load model
 605 |     try:
 606 |         model = get_model(model_path)
 607 |     except Exception as e:
 608 |         return {"error": f"Failed to load model: {str(e)}"}
 609 |     
 610 |     # Export model
 611 |     try:
 612 |         # Keep YOLO console output off stdout
 613 |         with redirect_stdout_to_stderr():
 614 |             export_path = model.export(format=format, imgsz=imgsz)
 615 |         
 616 |         return {
 617 |             "status": "success",
 618 |             "export_path": str(export_path),
 619 |             "format": format
 620 |         }
 621 |     except Exception as e:
 622 |         return {"error": f"Export failed: {str(e)}"}
 623 | 
 624 | # Report model metadata, with property access wrapped in the stdout redirect
 625 | @mcp.resource("model_info/{model_name}")
 626 | def get_model_info(model_name: str) -> Dict[str, Any]:
 627 |     """
 628 |     Get information about a YOLO model
 629 |     
 630 |     Args:
 631 |         model_name: YOLO model name
 632 |         
 633 |     Returns:
 634 |         Dictionary containing model information
 635 |     """
 636 |     try:
 637 |         model = get_model(model_name)
 638 |         
 639 |         # Get model task
 640 |         task = 'detect'  # Default task
 641 |         if 'seg' in model_name:
 642 |             task = 'segment'
 643 |         elif 'pose' in model_name:
 644 |             task = 'pose'
 645 |         elif 'cls' in model_name:
 646 |             task = 'classify'
 647 |         elif 'obb' in model_name:
 648 |             task = 'obb'
 649 |         
 650 |         # Make sure any model property access that might trigger output is wrapped
 651 |         with redirect_stdout_to_stderr():
 652 |             yaml_str = str(model.yaml)
 653 |             pt_path = str(model.pt_path) if hasattr(model, 'pt_path') else None
 654 |             class_names = model.names
 655 |         
 656 |         # Get model info
 657 |         return {
 658 |             "model_name": model_name,
 659 |             "task": task,
 660 |             "yaml": yaml_str,
 661 |             "pt_path": pt_path,
 662 |             "class_names": class_names
 663 |         }
 664 |     except Exception as e:
 665 |         return {"error": f"Failed to get model info: {str(e)}"}
 666 | 
 667 | # List models found on disk; warnings go through the logger rather than print
 668 | @mcp.tool()
 669 | def list_available_models() -> List[str]:
 670 |     """List available YOLO models that actually exist on disk in any configured directory"""
 671 |     # Common YOLO model patterns
 672 |     model_patterns = [
 673 |         "yolov11*.pt", 
 674 |         "yolov8*.pt"
 675 |     ]
 676 |     
 677 |     # Find all existing models in all configured directories
 678 |     available_models = set()
 679 |     for directory in CONFIG["model_dirs"]:
 680 |         if not os.path.exists(directory):
 681 |             continue
 682 |             
 683 |         # Check for model files directly
 684 |         for filename in os.listdir(directory):
 685 |             if filename.endswith(".pt") and any(
 686 |                 fnmatch.fnmatch(filename, pattern) for pattern in model_patterns
 687 |             ):
 688 |                 available_models.add(filename)
 689 |     
 690 |     # Convert to sorted list
 691 |     result = sorted(list(available_models))
 692 |     
 693 |     if not result:
 694 |         # Log a warning instead of printing, so stdout stays clean
 695 |         logger.warning("No model files found in configured directories.")
 696 |         return ["No models available - download models to any of these directories: " + ", ".join(CONFIG["model_dirs"])]
 697 |     
 698 |     return result
 765 | 
 766 | 
 767 | 
 768 | # Camera detection background thread
 769 | camera_thread = None
 770 | camera_running = False
 771 | detection_results = []
 772 | 
 773 | def camera_detection_thread(model_name, confidence, fps_limit=30, camera_id=0):
 774 |     """Background thread for camera detection"""
 775 |     global camera_running, detection_results
 776 |     
 777 |     # Load model
 778 |     try:
 779 |         with redirect_stdout_to_stderr():
 780 |             model = get_model(model_name)
 781 |         logger.info(f"Model {model_name} loaded successfully")
 782 |     except Exception as e:
 783 |         logger.error(f"Error loading model: {str(e)}")
 784 |         camera_running = False
 785 |         detection_results.append({
 786 |             "timestamp": time.time(),
 787 |             "error": f"Failed to load model: {str(e)}",
 788 |             "detections": []
 789 |         })
 790 |         return
 791 |     
 792 |     # Try to open the camera, making several attempts across
 793 |     # multiple camera IDs if necessary
 794 |     cap = None
 795 |     error_message = ""
 796 |     
 797 |     # Try the requested camera ID first, then fall back to IDs 0 to 2
 798 |     for cam_id in dict.fromkeys([camera_id, 0, 1, 2]):
 799 |         try:
 800 |             logger.info(f"Attempting to open camera with ID {cam_id}...")
 801 |             cap = cv2.VideoCapture(cam_id)
 802 |             if cap.isOpened():
 803 |                 logger.info(f"Successfully opened camera {cam_id}")
 804 |                 break
 805 |         except Exception as e:
 806 |             error_message = f"Error opening camera {cam_id}: {str(e)}"
 807 |             logger.error(error_message)
 808 |     
 809 |     # Check if any camera was successfully opened
 810 |     if cap is None or not cap.isOpened():
 811 |         logger.error("Error: Could not open any camera.")
 812 |         camera_running = False
 813 |         detection_results.append({
 814 |             "timestamp": time.time(),
 815 |             "error": "Failed to open camera. Make sure camera is connected and not in use by another application.",
 816 |             "camera_status": "unavailable",
 817 |             "detections": []
 818 |         })
 819 |         return
 820 |     
 821 |     # Get camera properties for diagnostics
 822 |     width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
 823 |     height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
 824 |     fps = cap.get(cv2.CAP_PROP_FPS)
 825 |     
 826 |     logger.info(f"Camera properties: {width}x{height} at {fps} FPS")
 827 |     
 828 |     # Calculate frame interval based on fps_limit
 829 |     frame_interval = 1.0 / fps_limit
 830 |     frame_count = 0
 831 |     error_count = 0
 832 |     
 833 |     while camera_running:
 834 |         start_time = time.time()
 835 |         
 836 |         try:
 837 |             ret, frame = cap.read()
 838 |             if not ret:
 839 |                 logger.warning(f"Error: Failed to capture frame (attempt {error_count+1}).")
 840 |                 error_count += 1
 841 |                 
 842 |                 # Add error to detection results
 843 |                 detection_results.append({
 844 |                     "timestamp": time.time(),
 845 |                     "error": f"Failed to capture frame (attempt {error_count})",
 846 |                     "camera_status": "error",
 847 |                     "detections": []
 848 |                 })
 849 |                 
 850 |                 # If we have consistent failures, try to restart the camera
 851 |                 if error_count >= 5:
 852 |                     logger.warning("Too many frame capture errors, attempting to restart camera...")
 853 |                     cap.release()
 854 |                     time.sleep(1)
 855 |                     cap = cv2.VideoCapture(cam_id)  # Reopen the camera that was actually opened
 856 |                     error_count = 0
 857 |                     if not cap.isOpened():
 858 |                         logger.error("Failed to reopen camera after errors.")
 859 |                         break
 860 |                 
 861 |                 time.sleep(1)  # Wait before trying again
 862 |                 continue
 863 |             
 864 |             # Reset error count on successful frame capture
 865 |             error_count = 0
 866 |             frame_count += 1
 867 |             
 868 |             # Perform detection on frame
 869 |             with redirect_stdout_to_stderr():  # Ensure all YOLO outputs go to stderr
 870 |                 results = model.predict(frame, conf=confidence)
 871 |             
 872 |             # Update detection results (only keep the last 10)
 873 |             if len(detection_results) >= 10:
 874 |                 detection_results.pop(0)
 875 |                 
 876 |             # Format results
 877 |             for result in results:
 878 |                 boxes = result.boxes
 879 |                 detections = []
 880 |                 
 881 |                 for i in range(len(boxes)):
 882 |                     box = boxes[i]
 883 |                     x1, y1, x2, y2 = box.xyxy[0].tolist()
 884 |                     conf = float(box.conf[0])
 885 |                     class_id = int(box.cls[0])
 886 |                     class_name = result.names[class_id]
 887 |                     
 888 |                     detections.append({
 889 |                         "box": [x1, y1, x2, y2],
 890 |                         "confidence": conf,
 891 |                         "class_id": class_id,
 892 |                         "class_name": class_name
 893 |                     })
 894 |                 
 895 |                 detection_results.append({
 896 |                     "timestamp": time.time(),
 897 |                     "frame_count": frame_count,
 898 |                     "detections": detections,
 899 |                     "camera_status": "running",
 900 |                     "image_shape": result.orig_shape
 901 |                 })
 902 |             
 903 |             # Log occasional status
 904 |             if frame_count % 30 == 0:
 905 |                 logger.info(f"Camera running: processed {frame_count} frames")
 906 |                 detection_count = sum(len(r.get("detections", [])) for r in detection_results if "detections" in r)
 907 |                 logger.info(f"Total detections in current buffer: {detection_count}")
 908 |             
 909 |             # Limit FPS by waiting if necessary
 910 |             elapsed = time.time() - start_time
 911 |             if elapsed < frame_interval:
 912 |                 time.sleep(frame_interval - elapsed)
 913 |                 
 914 |         except Exception as e:
 915 |             logger.error(f"Error in camera thread: {str(e)}")
 916 |             detection_results.append({
 917 |                 "timestamp": time.time(),
 918 |                 "error": f"Exception in camera processing: {str(e)}",
 919 |                 "camera_status": "error",
 920 |                 "detections": []
 921 |             })
 922 |             time.sleep(1)  # Wait before continuing
 923 |     
 924 |     # Clean up
 925 |     logger.info("Shutting down camera...")
 926 |     if cap is not None:
 927 |         cap.release()
 928 | 
 929 | @mcp.tool()
 930 | def start_camera_detection(
 931 |     model_name: str = "yolov8n.pt",
 932 |     confidence: float = 0.25,
 933 |     camera_id: int = 0
 934 | ) -> Dict[str, Any]:
 935 |     """
 936 |     Start real-time object detection using the computer's camera
 937 |     
 938 |     Args:
 939 |         model_name: YOLO model name to use
 940 |         confidence: Detection confidence threshold
 941 |         camera_id: Camera device ID (0 is usually the default camera)
 942 |         
 943 |     Returns:
 944 |         Status of camera detection
 945 |     """
 946 |     global camera_thread, camera_running, detection_results, camera_last_access_time
 947 |     
 948 |     # Check if already running
 949 |     if camera_running:
 950 |         # Update last access time
 951 |         camera_last_access_time = time.time()
 952 |         return {"status": "success", "message": "Camera detection is already running"}
 953 |     
 954 |     # Clear previous results
 955 |     detection_results = []
 956 |     
 957 |     # First, try to check if OpenCV is properly installed
 958 |     try:
 959 |         cv2_version = cv2.__version__
 960 |         logger.info(f"OpenCV version: {cv2_version}")
 961 |     except Exception as e:
 962 |         logger.error(f"OpenCV not properly installed: {str(e)}")
 963 |         return {
 964 |             "status": "error",
 965 |             "message": f"OpenCV not properly installed: {str(e)}",
 966 |             "solution": "Please check OpenCV installation"
 967 |         }
 968 |     
 969 |     # Start detection thread
 970 |     camera_running = True
 971 |     camera_last_access_time = time.time()  # Update access time
 972 |     camera_thread = threading.Thread(
 973 |         target=camera_detection_thread,
 974 |         args=(model_name, confidence, 30, camera_id),
 975 |         daemon=True
 976 |     )
 977 |     camera_thread.start()
 978 |     
 979 |     # Add initial status to detection results
 980 |     detection_results.append({
 981 |         "timestamp": time.time(),
 982 |         "system_info": {
 983 |             "os": platform.system() if 'platform' in globals() else "Unknown",
 984 |             "opencv_version": cv2.__version__,
 985 |             "camera_id": camera_id
 986 |         },
 987 |         "camera_status": "starting",
 988 |         "detections": []
 989 |     })
 990 |     
 991 |     return {
 992 |         "status": "success",
 993 |         "message": f"Started camera detection using model {model_name}",
 994 |         "model": model_name,
 995 |         "confidence": confidence,
 996 |         "camera_id": camera_id,
 997 |         "auto_shutdown": f"Camera will auto-shutdown after {CAMERA_INACTIVITY_TIMEOUT} seconds of inactivity",
 998 |         "note": "If camera doesn't work, try different camera_id values (0, 1, or 2)"
 999 |     }
1000 | 
1001 | 
1002 | @mcp.tool()
1003 | def stop_camera_detection() -> Dict[str, Any]:
1004 |     """
1005 |     Stop real-time camera detection
1006 |     
1007 |     Returns:
1008 |         Status message
1009 |     """
1010 |     global camera_running
1011 |     
1012 |     if not camera_running:
1013 |         return {"status": "error", "message": "Camera detection is not running"}
1014 |     
1015 |     logger.info("Stopping camera detection by user request")
1016 |     camera_running = False
1017 |     
1018 |     # Wait for thread to terminate
1019 |     if camera_thread and camera_thread.is_alive():
1020 |         camera_thread.join(timeout=2.0)
1021 |     
1022 |     return {
1023 |         "status": "success",
1024 |         "message": "Stopped camera detection"
1025 |     }
1026 | 
1027 | 
1028 | @mcp.tool()
1029 | def get_camera_detections() -> Dict[str, Any]:
1030 |     """
1031 |     Get the latest detections from the camera
1032 |     
1033 |     Returns:
1034 |         Dictionary with recent detections
1035 |     """
1036 |     global detection_results, camera_thread, camera_last_access_time
1037 |     
1038 |     # Update the last access time whenever this function is called
1039 |     if camera_running:
1040 |         camera_last_access_time = time.time()
1041 |     
1042 |     # Check if thread is alive
1043 |     thread_alive = camera_thread is not None and camera_thread.is_alive()
1044 |     
1045 |     # If camera_running is True but thread is dead, there's an issue
1046 |     if camera_running and not thread_alive:
1047 |         return {
1048 |             "status": "error", 
1049 |             "message": "Camera thread has stopped unexpectedly",
1050 |             "is_running": False,
1051 |             "camera_status": "error",
1052 |             "thread_alive": thread_alive,
1053 |             "detections": detection_results,
1054 |             "count": len(detection_results),
1055 |             "solution": "Please try restarting the camera with a different camera_id"
1056 |         }
1057 |     
1058 |     if not camera_running:
1059 |         return {
1060 |             "status": "error", 
1061 |             "message": "Camera detection is not running",
1062 |             "is_running": False,
1063 |             "camera_status": "stopped"
1064 |         }
1065 |     
1066 |     # Check for errors in detection results
1067 |     errors = [result.get("error") for result in detection_results if "error" in result]
1068 |     recent_errors = errors[-5:] if errors else []
1069 |     
1070 |     # Count actual detections
1071 |     detection_count = sum(len(result.get("detections", [])) for result in detection_results if "detections" in result)
1072 |     
1073 |     return {
1074 |         "status": "success",
1075 |         "is_running": camera_running,
1076 |         "thread_alive": thread_alive,
1077 |         "detections": detection_results,
1078 |         "count": len(detection_results),
1079 |         "total_detections": detection_count,
1080 |         "recent_errors": recent_errors if recent_errors else None,
1081 |         "camera_status": "error" if recent_errors else "running",
1082 |         "inactivity_timeout": {
1083 |             "seconds_remaining": max(0, int(CAMERA_INACTIVITY_TIMEOUT - (time.time() - camera_last_access_time))),
1084 |             "last_access": camera_last_access_time
1085 |         }
1086 |     }
1087 | 
1088 | def cleanup_resources():
1089 |     """Clean up resources when the server is shutting down"""
1090 |     global camera_running
1091 |     
1092 |     logger.info("Cleaning up resources...")
1093 |     
1094 |     # Stop camera if it's running
1095 |     if camera_running:
1096 |         logger.info("Shutting down camera during server exit")
1097 |         camera_running = False
1098 |         
1099 |         # Give the camera thread a moment to clean up
1100 |         if camera_thread and camera_thread.is_alive():
1101 |             camera_thread.join(timeout=2.0)
1102 |     
1103 |     logger.info("Cleanup complete")
1104 | atexit.register(cleanup_resources)
1105 | 
1106 | def signal_handler(sig, frame):
1107 |     """Handle termination signals"""
1108 |     logger.info(f"Received signal {sig}, shutting down...")
1109 |     cleanup_resources()
1110 |     sys.exit(0)
1111 | 
1112 | signal.signal(signal.SIGINT, signal_handler)
1113 | signal.signal(signal.SIGTERM, signal_handler)
1114 | 
1115 | def start_watchdog():
1116 |     """Start the camera watchdog thread"""
1117 |     watchdog = threading.Thread(
1118 |         target=camera_watchdog_thread,
1119 |         daemon=True
1120 |     )
1121 |     watchdog.start()
1122 |     return watchdog
1123 | 
1124 | @mcp.tool()
1125 | def comprehensive_image_analysis(
1126 |     image_path: str,
1127 |     confidence: float = 0.25,
1128 |     save_results: bool = False
1129 | ) -> Dict[str, Any]:
1130 |     """
1131 |     Perform comprehensive analysis on an image by combining multiple model results
1132 |     
1133 |     Args:
1134 |         image_path: Path to the image file
1135 |         confidence: Detection confidence threshold
1136 |         save_results: Whether to save results to disk
1137 |         
1138 |     Returns:
1139 |         Dictionary containing comprehensive analysis results
1140 |     """
1141 |     try:
1142 |         if not os.path.exists(image_path):
1143 |             return {"error": f"Image file not found: {image_path}"}
1144 |         
1145 |         # Load image
1146 |         image = load_image(image_path, is_path=True)
1147 |         
1148 |         analysis_results = {}
1149 |         
1150 |         # 1. Object detection
1151 |         object_model = get_model("yolo11n.pt")
1152 |         with redirect_stdout_to_stderr():  # Keep model console output off stdout, which the stdio transport uses
1153 |             object_results = object_model.predict(image, conf=confidence, save=save_results)
1154 |         
1155 |         # Process object detection results
1156 |         detected_objects = []
1157 |         for result in object_results:
1158 |             boxes = result.boxes
1159 |             for i in range(len(boxes)):
1160 |                 box = boxes[i]
1161 |                 conf = float(box.conf[0])
1162 |                 class_id = int(box.cls[0])
1163 |                 class_name = result.names[class_id]
1164 |                 detected_objects.append({
1165 |                     "class_name": class_name,
1166 |                     "confidence": conf
1167 |                 })
1168 |         analysis_results["objects"] = detected_objects
1169 |         
1170 |         # 2. Scene classification
1171 |         try:
1172 |             cls_model = get_model("yolov8n-cls.pt")
1173 |             with redirect_stdout_to_stderr():  # Keep model console output off stdout, which the stdio transport uses
1174 |                 cls_results = cls_model.predict(image, save=False)
1175 |             
1176 |             scene_classifications = []
1177 |             for result in cls_results:
1178 |                 if hasattr(result, 'probs') and result.probs is not None:
1179 |                     probs = result.probs
1180 |                     top_indices = probs.top5
1181 |                     top_probs = probs.top5conf.tolist()
1182 |                     top_classes = [result.names[idx] for idx in top_indices]
1183 |                     
1184 |                     for idx, name, prob in zip(top_indices[:3], top_classes[:3], top_probs[:3]):
1185 |                         scene_classifications.append({
1186 |                             "class_name": name,
1187 |                             "probability": float(prob)
1188 |                         })
1189 |             analysis_results["scene"] = scene_classifications
1190 |         except Exception as e:
1191 |             analysis_results["scene_error"] = str(e)
1192 |         
1193 |         # 3. Human pose detection
1194 |         try:
1195 |             pose_model = get_model("yolov8n-pose.pt")
1196 |             with redirect_stdout_to_stderr():  # Keep model console output off stdout, which the stdio transport uses
1197 |                 pose_results = pose_model.predict(image, conf=confidence, save=False)
1198 |             
1199 |             detected_poses = []
1200 |             for result in pose_results:
1201 |                 if hasattr(result, 'keypoints') and result.keypoints is not None:
1202 |                     boxes = result.boxes
1203 |                     keypoints = result.keypoints
1204 |                     
1205 |                     for i in range(len(boxes)):
1206 |                         box = boxes[i]
1207 |                         conf = float(box.conf[0])
1208 |                         detected_poses.append({
1209 |                             "person_confidence": conf,
1210 |                             "has_keypoints": keypoints[i].data.shape[1] if keypoints else 0
1211 |                         })
1212 |             analysis_results["poses"] = detected_poses
1213 |         except Exception as e:
1214 |             analysis_results["pose_error"] = str(e)
1215 |         
1216 |         # Derive a task summary from the object, scene, and pose results above
1217 |         # 4. Comprehensive task description
1218 |         tasks = []
1219 |         
1220 |         # Detect main objects
1221 |         main_objects = [obj["class_name"] for obj in detected_objects if obj["confidence"] > 0.5]
1222 |         if "person" in main_objects:
1223 |             tasks.append("Person Detection")
1224 |         
1225 |         # Check for weapon objects
1226 |         weapon_objects = ["sword", "knife", "katana", "gun", "pistol", "rifle"]
1227 |         weapons = [obj for obj in main_objects if any(weapon in obj.lower() for weapon in weapon_objects)]
1228 |         if weapons:
1229 |             tasks.append(f"Weapon Detection ({', '.join(weapons)})")
1230 |         
1231 |         # Count people
1232 |         person_count = main_objects.count("person")
1233 |         if person_count > 0:
1234 |             tasks.append(f"Person Count ({person_count} people)")
1235 |         
1236 |         # Pose analysis
1237 |         if "poses" in analysis_results and analysis_results["poses"]:
1238 |             tasks.append("Human Pose Analysis")
1239 |         
1240 |         # Scene classification
1241 |         if "scene" in analysis_results and analysis_results["scene"]:
1242 |             scene_types = [scene["class_name"] for scene in analysis_results["scene"][:2]]
1243 |             tasks.append(f"Scene Classification ({', '.join(scene_types)})")
1244 |         
1245 |         analysis_results["identified_tasks"] = tasks
1246 |         
1247 |         # Return comprehensive results
1248 |         return {
1249 |             "status": "success",
1250 |             "image_path": image_path,
1251 |             "analysis": analysis_results,
1252 |             "summary": ("Tasks identified in the image: " + ", ".join(tasks)) if tasks else "No clear tasks identified"
1253 |         }
1254 |     except Exception as e:
1255 |         return {
1256 |             "status": "error",
1257 |             "image_path": image_path,
1258 |             "error": f"Comprehensive analysis failed: {str(e)}"
1259 |         }
1260 | 
1261 | 
1262 | @mcp.tool()
1263 | def analyze_image_from_path(
1264 |     image_path: str,
1265 |     model_name: str = "yolov8n.pt",
1266 |     confidence: float = 0.25,
1267 |     save_results: bool = False
1268 | ) -> Dict[str, Any]:
1269 |     """
1270 |     Analyze image from file path using YOLO
1271 |     
1272 |     Args:
1273 |         image_path: Path to the image file
1274 |         model_name: YOLO model name
1275 |         confidence: Detection confidence threshold
1276 |         save_results: Whether to save results to disk
1277 |         
1278 |     Returns:
1279 |         Dictionary containing detection results
1280 |     """
1281 |     try:
1282 |         # Call detect_objects function with is_path=True
1283 |         return detect_objects(
1284 |             image_data=image_path,
1285 |             model_name=model_name,
1286 |             confidence=confidence,
1287 |             save_results=save_results,
1288 |             is_path=True
1289 |         )
1290 |     except Exception as e:
1291 |         return {
1292 |             "error": f"Failed to analyze image: {str(e)}",
1293 |             "image_path": image_path
1294 |         }
1295 | 
1296 | @mcp.tool()
1297 | def test_connection() -> Dict[str, Any]:
1298 |     """
1299 |     Test if YOLO MCP service is running properly
1300 |     
1301 |     Returns:
1302 |         Status information and available tools
1303 |     """
1304 |     return {
1305 |         "status": "YOLO MCP service is running normally",
1306 |         "available_models": list_available_models(),
1307 |         "available_tools": [
1308 |             "list_available_models", "detect_objects", "segment_objects", 
1309 |             "classify_image", "detect_poses", "detect_oriented_objects", 
1310 |             "track_objects", "train_model", "validate_model", 
1311 |             "export_model", "start_camera_detection", "stop_camera_detection", 
1312 |             "get_camera_detections", "test_connection",
1313 |             # Additional tools
1314 |             "analyze_image_from_path",
1315 |             "comprehensive_image_analysis"
1316 |         ],
1317 |         "new_features": [
1318 |             "Support for loading images directly from file paths",
1319 |             "Support for comprehensive image analysis with task identification",
1320 |             "All detection functions support both file paths and base64 data"
1321 |         ]
1322 |     }
1323 | 
1324 | # Main entry point: start the watchdog, then serve MCP requests over stdio
1325 | if __name__ == "__main__":
1326 |     logger.info("Starting YOLO MCP service")
1327 |     
1328 |     # Start the camera watchdog thread
1329 |     watchdog_thread = start_watchdog()
1330 |     
1331 |     # Initialize and run server
1332 |     mcp.run(transport='stdio')
```
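
The camera loop above keeps only the last 10 results by popping the oldest entry from a plain list that the worker thread writes and the tool functions read. As a minimal sketch of one alternative (not the repository's implementation), the same bounded history could be kept in a `collections.deque` guarded by a lock. The `DetectionBuffer` class and its method names are hypothetical, introduced here only for illustration.

```python
import threading
import time
from collections import deque
from typing import Any, Dict, List


class DetectionBuffer:
    """Hypothetical thread-safe buffer that keeps only the newest entries."""

    def __init__(self, maxlen: int = 10) -> None:
        # A deque with maxlen silently drops the oldest item when full.
        self._items: deque = deque(maxlen=maxlen)
        self._lock = threading.Lock()

    def append(self, entry: Dict[str, Any]) -> None:
        """Add one per-frame result (called from the camera thread)."""
        with self._lock:
            self._items.append(entry)

    def snapshot(self) -> List[Dict[str, Any]]:
        """Return a copy so callers never iterate while the thread writes."""
        with self._lock:
            return list(self._items)

    def total_detections(self) -> int:
        """Count detections across all buffered frames."""
        return sum(len(e.get("detections", [])) for e in self.snapshot())


# Simulate 25 processed frames, each with a single fake detection.
buffer = DetectionBuffer(maxlen=10)
for frame_count in range(1, 26):
    buffer.append({
        "timestamp": time.time(),
        "frame_count": frame_count,
        "detections": [{"class_name": "person", "confidence": 0.9}],
        "camera_status": "running",
    })

snapshot = buffer.snapshot()
print(len(snapshot), snapshot[0]["frame_count"], snapshot[-1]["frame_count"])
```

With this shape, a function such as `get_camera_detections` would return `buffer.snapshot()` instead of the shared list, so the caller gets a stable copy even while the camera thread keeps appending.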