#
tokens: 21236/50000 1/8 files (page 2/2)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 2 of 2. Use http://codebase.md/gongrzhe/yolo-mcp-server?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .gitignore
├── LICENSE
├── mcp-config.json
├── Readme.md
├── requirements.txt
├── server_cli.py
├── server_combine_terminal.py
├── server.py
└── setup.py
```

# Files

--------------------------------------------------------------------------------
/server_cli.py:
--------------------------------------------------------------------------------

```python
   1 | # server.py - CLI version
   2 | import fnmatch
   3 | import os
   4 | import base64
   5 | import cv2
   6 | import time
   7 | import threading
   8 | import subprocess
   9 | import json
  10 | import tempfile
  11 | import platform
  12 | from io import BytesIO
  13 | from typing import List, Dict, Any, Optional, Union
  14 | import numpy as np
  15 | from PIL import Image
  16 | 
  17 | from mcp.server.fastmcp import FastMCP
  18 | 
  19 | # Set up logging configuration
  20 | import os.path
  21 | import sys
  22 | import logging
  23 | import contextlib
  24 | import signal
  25 | import atexit
  26 | 
  27 | logging.basicConfig(
  28 |     level=logging.INFO,
  29 |     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  30 |     handlers=[
  31 |         logging.FileHandler("yolo_service.log"),
  32 |         logging.StreamHandler(sys.stderr)
  33 |     ]
  34 | )
  35 | camera_startup_status = None  # Will store error details if startup fails
  36 | camera_last_error = None
  37 | logger = logging.getLogger('yolo_service')
  38 | 
  39 | # Global variables for camera control
  40 | camera_running = False
  41 | camera_thread = None
  42 | detection_results = []
  43 | camera_last_access_time = 0
  44 | CAMERA_INACTIVITY_TIMEOUT = 60  # Auto-shutdown after 60 seconds of inactivity
  45 | 
  46 | def camera_watchdog_thread():
  47 |     """Monitor thread that auto-stops the camera after inactivity"""
  48 |     global camera_running, camera_last_access_time
  49 |     
  50 |     logger.info("Camera watchdog thread started")
  51 |     
  52 |     while True:
  53 |         # Sleep for a short time to avoid excessive CPU usage
  54 |         time.sleep(5)
  55 |         
  56 |         # Check if camera is running
  57 |         if camera_running:
  58 |             current_time = time.time()
  59 |             elapsed_time = current_time - camera_last_access_time
  60 |             
  61 |             # If no access for more than the timeout, auto-stop
  62 |             if elapsed_time > CAMERA_INACTIVITY_TIMEOUT:
  63 |                 logger.info(f"Auto-stopping camera after {elapsed_time:.1f} seconds of inactivity")
  64 |                 stop_camera_detection()
  65 |         else:
  66 |             # If camera is not running, no need to check frequently
  67 |             time.sleep(10)
  68 | 
  69 | def load_image(image_source, is_path=False):
  70 |     """
  71 |     Load image from file path or base64 data
  72 |     
  73 |     Args:
  74 |         image_source: File path or base64 encoded image data
  75 |         is_path: Whether image_source is a file path
  76 |         
  77 |     Returns:
  78 |         PIL Image object
  79 |     """
  80 |     try:
  81 |         if is_path:
  82 |             # Load image from file path
  83 |             if os.path.exists(image_source):
  84 |                 return Image.open(image_source)
  85 |             else:
  86 |                 raise FileNotFoundError(f"Image file not found: {image_source}")
  87 |         else:
  88 |             # Load image from base64 data
  89 |             image_bytes = base64.b64decode(image_source)
  90 |             return Image.open(BytesIO(image_bytes))
  91 |     except Exception as e:
  92 |         raise ValueError(f"Failed to load image: {str(e)}")
  93 | 
  94 | # New function to run YOLO CLI commands
  95 | def run_yolo_cli(command_args, capture_output=True, timeout=60):
  96 |     """
  97 |     Run YOLO CLI command and return the results
  98 |     
  99 |     Args:
 100 |         command_args: List of command arguments to pass to yolo CLI
 101 |         capture_output: Whether to capture and return command output
 102 |         timeout: Command timeout in seconds
 103 |         
 104 |     Returns:
 105 |         Command output or success status
 106 |     """
 107 |     # Build the complete command
 108 |     cmd = ["yolo"] + command_args
 109 |     
 110 |     # Log the command
 111 |     logger.info(f"Running YOLO CLI command: {' '.join(cmd)}")
 112 |     
 113 |     try:
 114 |         # Run the command
 115 |         result = subprocess.run(
 116 |             cmd,
 117 |             capture_output=capture_output,
 118 |             text=True,
 119 |             check=False,  # Don't raise exception on non-zero exit
 120 |             timeout=timeout
 121 |         )
 122 |         
 123 |         # Check for errors
 124 |         if result.returncode != 0:
 125 |             logger.error(f"YOLO CLI command failed with code {result.returncode}")
 126 |             logger.error(f"stderr: {result.stderr}")
 127 |             return {
 128 |                 "success": False,
 129 |                 "error": result.stderr,
 130 |                 "command": " ".join(cmd),
 131 |                 "returncode": result.returncode
 132 |             }
 133 |         
 134 |         # Return the result
 135 |         if capture_output:
 136 |             return {
 137 |                 "success": True,
 138 |                 "stdout": result.stdout,
 139 |                 "stderr": result.stderr,
 140 |                 "command": " ".join(cmd)
 141 |             }
 142 |         else:
 143 |             return {"success": True, "command": " ".join(cmd)}
 144 |         
 145 |     except subprocess.TimeoutExpired:
 146 |         logger.error(f"YOLO CLI command timed out after {timeout} seconds")
 147 |         return {
 148 |             "success": False,
 149 |             "error": f"Command timed out after {timeout} seconds",
 150 |             "command": " ".join(cmd)
 151 |         }
 152 |     except Exception as e:
 153 |         logger.error(f"Error running YOLO CLI command: {str(e)}")
 154 |         return {
 155 |             "success": False,
 156 |             "error": str(e),
 157 |             "command": " ".join(cmd)
 158 |         }
 159 | 
 160 | # Create MCP server
 161 | mcp = FastMCP("YOLO_Service")
 162 | 
 163 | # Global configuration
 164 | CONFIG = {
 165 |     "model_dirs": [
 166 |         ".",  # Current directory
 167 |         "./models",  # Models subdirectory
 168 |         os.path.join(os.path.dirname(os.path.abspath(__file__)), "models"),
 169 |     ]
 170 | }
 171 | 
 172 | # Function to save base64 data to temp file
 173 | def save_base64_to_temp(base64_data, prefix="image", suffix=".jpg"):
 174 |     """Save base64 encoded data to a temporary file and return the path"""
 175 |     try:
 176 |         # Create a temporary file
 177 |         fd, temp_path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
 178 |         
 179 |         # Decode base64 data
 180 |         image_data = base64.b64decode(base64_data)
 181 |         
 182 |         # Write data to file
 183 |         with os.fdopen(fd, 'wb') as temp_file:
 184 |             temp_file.write(image_data)
 185 |             
 186 |         return temp_path
 187 |     except Exception as e:
 188 |         logger.error(f"Error saving base64 to temp file: {str(e)}")
 189 |         raise ValueError(f"Failed to save base64 data: {str(e)}")
 190 | 
 191 | @mcp.tool()
 192 | def get_model_directories() -> Dict[str, Any]:
 193 |     """Get information about configured model directories and available models"""
 194 |     directories = []
 195 |     
 196 |     for directory in CONFIG["model_dirs"]:
 197 |         dir_info = {
 198 |             "path": directory,
 199 |             "exists": os.path.exists(directory),
 200 |             "is_directory": os.path.isdir(directory) if os.path.exists(directory) else False,
 201 |             "models": []
 202 |         }
 203 |         
 204 |         if dir_info["exists"] and dir_info["is_directory"]:
 205 |             for filename in os.listdir(directory):
 206 |                 if filename.endswith(".pt"):
 207 |                     dir_info["models"].append(filename)
 208 |         
 209 |         directories.append(dir_info)
 210 |     
 211 |     return {
 212 |         "configured_directories": CONFIG["model_dirs"],
 213 |         "directory_details": directories,
 214 |         "available_models": list_available_models(),
 215 |         "loaded_models": []  # No longer track loaded models with CLI approach
 216 |     }
 217 | 
 218 | @mcp.tool()
 219 | def detect_objects(
 220 |     image_data: str,
 221 |     model_name: str = "yolov8n.pt",
 222 |     confidence: float = 0.25,
 223 |     save_results: bool = False,
 224 |     is_path: bool = False
 225 | ) -> Dict[str, Any]:
 226 |     """
 227 |     Detect objects in an image using YOLO CLI
 228 |     
 229 |     Args:
 230 |         image_data: Base64 encoded image or file path (if is_path=True)
 231 |         model_name: YOLO model name
 232 |         confidence: Detection confidence threshold
 233 |         save_results: Whether to save results to disk
 234 |         is_path: Whether image_data is a file path
 235 |         
 236 |     Returns:
 237 |         Dictionary containing detection results
 238 |     """
 239 |     try:
 240 |         # Determine source path
 241 |         if is_path:
 242 |             source_path = image_data
 243 |             if not os.path.exists(source_path):
 244 |                 return {
 245 |                     "error": f"Image file not found: {source_path}",
 246 |                     "source": source_path
 247 |                 }
 248 |         else:
 249 |             # Save base64 data to temp file
 250 |             source_path = save_base64_to_temp(image_data)
 251 |         
 252 |         # Determine full model path
 253 |         model_path = None
 254 |         for directory in CONFIG["model_dirs"]:
 255 |             potential_path = os.path.join(directory, model_name)
 256 |             if os.path.exists(potential_path):
 257 |                 model_path = potential_path
 258 |                 break
 259 |         
 260 |         if model_path is None:
 261 |             available = list_available_models()
 262 |             available_str = ", ".join(available) if available else "none"
 263 |             return {
 264 |                 "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
 265 |                 "source": image_data if is_path else "base64_image"
 266 |             }
 267 |         
 268 |         # Setup output directory if saving results
 269 |         output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
 270 |         if save_results and not os.path.exists(output_dir):
 271 |             os.makedirs(output_dir)
 272 |         
 273 |         # Build YOLO CLI command
 274 |         cmd_args = [
 275 |             "detect",  # Task
 276 |             "predict",  # Mode
 277 |             f"model={model_path}",
 278 |             f"source={source_path}",
 279 |             f"conf={confidence}",
 280 |             "format=json",  # Request JSON output for parsing
 281 |         ]
 282 |         
 283 |         if save_results:
 284 |             cmd_args.append(f"project={output_dir}")
 285 |             cmd_args.append("save=True")
 286 |         else:
 287 |             cmd_args.append("save=False")
 288 |         
 289 |         # Run YOLO CLI command
 290 |         result = run_yolo_cli(cmd_args)
 291 |         
 292 |         # Clean up temp file if we created one
 293 |         if not is_path:
 294 |             try:
 295 |                 os.remove(source_path)
 296 |             except Exception as e:
 297 |                 logger.warning(f"Failed to clean up temp file {source_path}: {str(e)}")
 298 |         
 299 |         # Check for command success
 300 |         if not result["success"]:
 301 |             return {
 302 |                 "error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
 303 |                 "command": result.get("command", ""),
 304 |                 "source": image_data if is_path else "base64_image"
 305 |             }
 306 |         
 307 |         # Parse JSON output from stdout
 308 |         try:
 309 |             # Try to find JSON in the output
 310 |             json_start = result["stdout"].find("{")
 311 |             json_end = result["stdout"].rfind("}")
 312 |             
 313 |             if json_start >= 0 and json_end > json_start:
 314 |                 json_str = result["stdout"][json_start:json_end+1]
 315 |                 detection_data = json.loads(json_str)
 316 |             else:
 317 |                 # If no JSON found, create a basic response with info from stderr
 318 |                 return {
 319 |                     "results": [],
 320 |                     "model_used": model_name,
 321 |                     "total_detections": 0,
 322 |                     "source": image_data if is_path else "base64_image",
 323 |                     "command_output": result["stderr"]
 324 |                 }
 325 |             
 326 |             # Format results
 327 |             formatted_results = []
 328 |             
 329 |             # Parse detection data from YOLO JSON output
 330 |             if "predictions" in detection_data:
 331 |                 detections = []
 332 |                 
 333 |                 for pred in detection_data["predictions"]:
 334 |                     # Extract box coordinates
 335 |                     box = pred.get("box", {})
 336 |                     x1, y1, x2, y2 = box.get("x1", 0), box.get("y1", 0), box.get("x2", 0), box.get("y2", 0)
 337 |                     
 338 |                     # Extract class information
 339 |                     confidence = pred.get("confidence", 0)
 340 |                     class_name = pred.get("name", "unknown")
 341 |                     class_id = pred.get("class", -1)
 342 |                     
 343 |                     detections.append({
 344 |                         "box": [x1, y1, x2, y2],
 345 |                         "confidence": confidence,
 346 |                         "class_id": class_id,
 347 |                         "class_name": class_name
 348 |                     })
 349 |                 
 350 |                 # Get image dimensions if available
 351 |                 image_shape = [
 352 |                     detection_data.get("width", 0),
 353 |                     detection_data.get("height", 0)
 354 |                 ]
 355 |                 
 356 |                 formatted_results.append({
 357 |                     "detections": detections,
 358 |                     "image_shape": image_shape
 359 |                 })
 360 |             
 361 |             return {
 362 |                 "results": formatted_results,
 363 |                 "model_used": model_name,
 364 |                 "total_detections": sum(len(r["detections"]) for r in formatted_results),
 365 |                 "source": image_data if is_path else "base64_image",
 366 |                 "save_dir": output_dir if save_results else None
 367 |             }
 368 |             
 369 |         except json.JSONDecodeError as e:
 370 |             logger.error(f"Failed to parse JSON from YOLO output: {e}")
 371 |             logger.error(f"Output: {result['stdout']}")
 372 |             
 373 |             return {
 374 |                 "error": f"Failed to parse YOLO results: {str(e)}",
 375 |                 "command": result.get("command", ""),
 376 |                 "source": image_data if is_path else "base64_image",
 377 |                 "stdout": result.get("stdout", ""),
 378 |                 "stderr": result.get("stderr", "")
 379 |             }
 380 |             
 381 |     except Exception as e:
 382 |         logger.error(f"Error in detect_objects: {str(e)}")
 383 |         return {
 384 |             "error": f"Failed to detect objects: {str(e)}",
 385 |             "source": image_data if is_path else "base64_image"
 386 |         }
 387 | 
 388 | @mcp.tool()
 389 | def segment_objects(
 390 |     image_data: str,
 391 |     model_name: str = "yolov11n-seg.pt",
 392 |     confidence: float = 0.25,
 393 |     save_results: bool = False,
 394 |     is_path: bool = False
 395 | ) -> Dict[str, Any]:
 396 |     """
 397 |     Perform instance segmentation on an image using YOLO CLI
 398 |     
 399 |     Args:
 400 |         image_data: Base64 encoded image or file path (if is_path=True)
 401 |         model_name: YOLO segmentation model name
 402 |         confidence: Detection confidence threshold
 403 |         save_results: Whether to save results to disk
 404 |         is_path: Whether image_data is a file path
 405 |         
 406 |     Returns:
 407 |         Dictionary containing segmentation results
 408 |     """
 409 |     try:
 410 |         # Determine source path
 411 |         if is_path:
 412 |             source_path = image_data
 413 |             if not os.path.exists(source_path):
 414 |                 return {
 415 |                     "error": f"Image file not found: {source_path}",
 416 |                     "source": source_path
 417 |                 }
 418 |         else:
 419 |             # Save base64 data to temp file
 420 |             source_path = save_base64_to_temp(image_data)
 421 |         
 422 |         # Determine full model path
 423 |         model_path = None
 424 |         for directory in CONFIG["model_dirs"]:
 425 |             potential_path = os.path.join(directory, model_name)
 426 |             if os.path.exists(potential_path):
 427 |                 model_path = potential_path
 428 |                 break
 429 |         
 430 |         if model_path is None:
 431 |             available = list_available_models()
 432 |             available_str = ", ".join(available) if available else "none"
 433 |             return {
 434 |                 "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
 435 |                 "source": image_data if is_path else "base64_image"
 436 |             }
 437 |         
 438 |         # Setup output directory if saving results
 439 |         output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
 440 |         if save_results and not os.path.exists(output_dir):
 441 |             os.makedirs(output_dir)
 442 |         
 443 |         # Build YOLO CLI command
 444 |         cmd_args = [
 445 |             "segment",  # Task
 446 |             "predict",  # Mode
 447 |             f"model={model_path}",
 448 |             f"source={source_path}",
 449 |             f"conf={confidence}",
 450 |             "format=json",  # Request JSON output for parsing
 451 |         ]
 452 |         
 453 |         if save_results:
 454 |             cmd_args.append(f"project={output_dir}")
 455 |             cmd_args.append("save=True")
 456 |         else:
 457 |             cmd_args.append("save=False")
 458 |         
 459 |         # Run YOLO CLI command
 460 |         result = run_yolo_cli(cmd_args)
 461 |         
 462 |         # Clean up temp file if we created one
 463 |         if not is_path:
 464 |             try:
 465 |                 os.remove(source_path)
 466 |             except Exception as e:
 467 |                 logger.warning(f"Failed to clean up temp file {source_path}: {str(e)}")
 468 |         
 469 |         # Check for command success
 470 |         if not result["success"]:
 471 |             return {
 472 |                 "error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
 473 |                 "command": result.get("command", ""),
 474 |                 "source": image_data if is_path else "base64_image"
 475 |             }
 476 |         
 477 |         # Parse JSON output from stdout
 478 |         try:
 479 |             # Try to find JSON in the output
 480 |             json_start = result["stdout"].find("{")
 481 |             json_end = result["stdout"].rfind("}")
 482 |             
 483 |             if json_start >= 0 and json_end > json_start:
 484 |                 json_str = result["stdout"][json_start:json_end+1]
 485 |                 segmentation_data = json.loads(json_str)
 486 |             else:
 487 |                 # If no JSON found, create a basic response with info from stderr
 488 |                 return {
 489 |                     "results": [],
 490 |                     "model_used": model_name,
 491 |                     "total_segments": 0,
 492 |                     "source": image_data if is_path else "base64_image",
 493 |                     "command_output": result["stderr"]
 494 |                 }
 495 |             
 496 |             # Format results
 497 |             formatted_results = []
 498 |             
 499 |             # Parse segmentation data from YOLO JSON output
 500 |             if "predictions" in segmentation_data:
 501 |                 segments = []
 502 |                 
 503 |                 for pred in segmentation_data["predictions"]:
 504 |                     # Extract box coordinates
 505 |                     box = pred.get("box", {})
 506 |                     x1, y1, x2, y2 = box.get("x1", 0), box.get("y1", 0), box.get("x2", 0), box.get("y2", 0)
 507 |                     
 508 |                     # Extract class information
 509 |                     confidence = pred.get("confidence", 0)
 510 |                     class_name = pred.get("name", "unknown")
 511 |                     class_id = pred.get("class", -1)
 512 |                     
 513 |                     segment = {
 514 |                         "box": [x1, y1, x2, y2],
 515 |                         "confidence": confidence,
 516 |                         "class_id": class_id,
 517 |                         "class_name": class_name
 518 |                     }
 519 |                     
 520 |                     # Extract mask if available
 521 |                     if "mask" in pred:
 522 |                         segment["mask"] = pred["mask"]
 523 |                     
 524 |                     segments.append(segment)
 525 |                 
 526 |                 # Get image dimensions if available
 527 |                 image_shape = [
 528 |                     segmentation_data.get("width", 0),
 529 |                     segmentation_data.get("height", 0)
 530 |                 ]
 531 |                 
 532 |                 formatted_results.append({
 533 |                     "segments": segments,
 534 |                     "image_shape": image_shape
 535 |                 })
 536 |             
 537 |             return {
 538 |                 "results": formatted_results,
 539 |                 "model_used": model_name,
 540 |                 "total_segments": sum(len(r["segments"]) for r in formatted_results),
 541 |                 "source": image_data if is_path else "base64_image",
 542 |                 "save_dir": output_dir if save_results else None
 543 |             }
 544 |             
 545 |         except json.JSONDecodeError as e:
 546 |             logger.error(f"Failed to parse JSON from YOLO output: {e}")
 547 |             logger.error(f"Output: {result['stdout']}")
 548 |             
 549 |             return {
 550 |                 "error": f"Failed to parse YOLO results: {str(e)}",
 551 |                 "command": result.get("command", ""),
 552 |                 "source": image_data if is_path else "base64_image",
 553 |                 "stdout": result.get("stdout", ""),
 554 |                 "stderr": result.get("stderr", "")
 555 |             }
 556 |             
 557 |     except Exception as e:
 558 |         logger.error(f"Error in segment_objects: {str(e)}")
 559 |         return {
 560 |             "error": f"Failed to segment objects: {str(e)}",
 561 |             "source": image_data if is_path else "base64_image"
 562 |         }
 563 | 
 564 | @mcp.tool()
 565 | def classify_image(
 566 |     image_data: str,
 567 |     model_name: str = "yolov11n-cls.pt",
 568 |     top_k: int = 5,
 569 |     save_results: bool = False,
 570 |     is_path: bool = False
 571 | ) -> Dict[str, Any]:
 572 |     """
 573 |     Classify an image using YOLO classification model via CLI
 574 |     
 575 |     Args:
 576 |         image_data: Base64 encoded image or file path (if is_path=True)
 577 |         model_name: YOLO classification model name
 578 |         top_k: Number of top categories to return
 579 |         save_results: Whether to save results to disk
 580 |         is_path: Whether image_data is a file path
 581 |         
 582 |     Returns:
 583 |         Dictionary containing classification results
 584 |     """
 585 |     try:
 586 |         # Determine source path
 587 |         if is_path:
 588 |             source_path = image_data
 589 |             if not os.path.exists(source_path):
 590 |                 return {
 591 |                     "error": f"Image file not found: {source_path}",
 592 |                     "source": source_path
 593 |                 }
 594 |         else:
 595 |             # Save base64 data to temp file
 596 |             source_path = save_base64_to_temp(image_data)
 597 |         
 598 |         # Determine full model path
 599 |         model_path = None
 600 |         for directory in CONFIG["model_dirs"]:
 601 |             potential_path = os.path.join(directory, model_name)
 602 |             if os.path.exists(potential_path):
 603 |                 model_path = potential_path
 604 |                 break
 605 |         
 606 |         if model_path is None:
 607 |             available = list_available_models()
 608 |             available_str = ", ".join(available) if available else "none"
 609 |             return {
 610 |                 "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
 611 |                 "source": image_data if is_path else "base64_image"
 612 |             }
 613 |         
 614 |         # Setup output directory if saving results
 615 |         output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
 616 |         if save_results and not os.path.exists(output_dir):
 617 |             os.makedirs(output_dir)
 618 |         
 619 |         # Build YOLO CLI command
 620 |         cmd_args = [
 621 |             "classify",  # Task
 622 |             "predict",  # Mode
 623 |             f"model={model_path}",
 624 |             f"source={source_path}",
 625 |             "format=json",  # Request JSON output for parsing
 626 |         ]
 627 |         
 628 |         if save_results:
 629 |             cmd_args.append(f"project={output_dir}")
 630 |             cmd_args.append("save=True")
 631 |         else:
 632 |             cmd_args.append("save=False")
 633 |         
 634 |         # Run YOLO CLI command
 635 |         result = run_yolo_cli(cmd_args)
 636 |         
 637 |         # Clean up temp file if we created one
 638 |         if not is_path:
 639 |             try:
 640 |                 os.remove(source_path)
 641 |             except Exception as e:
 642 |                 logger.warning(f"Failed to clean up temp file {source_path}: {str(e)}")
 643 |         
 644 |         # Check for command success
 645 |         if not result["success"]:
 646 |             return {
 647 |                 "error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
 648 |                 "command": result.get("command", ""),
 649 |                 "source": image_data if is_path else "base64_image"
 650 |             }
 651 |         
 652 |         # Parse JSON output from stdout
 653 |         try:
 654 |             # Try to find JSON in the output
 655 |             json_start = result["stdout"].find("{")
 656 |             json_end = result["stdout"].rfind("}")
 657 |             
 658 |             if json_start >= 0 and json_end > json_start:
 659 |                 json_str = result["stdout"][json_start:json_end+1]
 660 |                 classification_data = json.loads(json_str)
 661 |             else:
 662 |                 # If no JSON found, create a basic response with info from stderr
 663 |                 return {
 664 |                     "results": [],
 665 |                     "model_used": model_name,
 666 |                     "top_k": top_k,
 667 |                     "source": image_data if is_path else "base64_image",
 668 |                     "command_output": result["stderr"]
 669 |                 }
 670 |             
 671 |             # Format results
 672 |             formatted_results = []
 673 |             
 674 |             # Parse classification data from YOLO JSON output
 675 |             if "predictions" in classification_data:
 676 |                 classifications = []
 677 |                 predictions = classification_data["predictions"]
 678 |                 
 679 |                 # Predictions could be an array of classifications
 680 |                 for i, pred in enumerate(predictions[:top_k]):
 681 |                     class_name = pred.get("name", f"class_{i}")
 682 |                     confidence = pred.get("confidence", 0)
 683 |                     
 684 |                     classifications.append({
 685 |                         "class_id": i,
 686 |                         "class_name": class_name,
 687 |                         "probability": confidence
 688 |                     })
 689 |                 
 690 |                 # Get image dimensions if available
 691 |                 image_shape = [
 692 |                     classification_data.get("width", 0),
 693 |                     classification_data.get("height", 0)
 694 |                 ]
 695 |                 
 696 |                 formatted_results.append({
 697 |                     "classifications": classifications,
 698 |                     "image_shape": image_shape
 699 |                 })
 700 |             
 701 |             return {
 702 |                 "results": formatted_results,
 703 |                 "model_used": model_name,
 704 |                 "top_k": top_k,
 705 |                 "source": image_data if is_path else "base64_image",
 706 |                 "save_dir": output_dir if save_results else None
 707 |             }
 708 |             
 709 |         except json.JSONDecodeError as e:
 710 |             logger.error(f"Failed to parse JSON from YOLO output: {e}")
 711 |             logger.error(f"Output: {result['stdout']}")
 712 |             
 713 |             return {
 714 |                 "error": f"Failed to parse YOLO results: {str(e)}",
 715 |                 "command": result.get("command", ""),
 716 |                 "source": image_data if is_path else "base64_image",
 717 |                 "stdout": result.get("stdout", ""),
 718 |                 "stderr": result.get("stderr", "")
 719 |             }
 720 |             
 721 |     except Exception as e:
 722 |         logger.error(f"Error in classify_image: {str(e)}")
 723 |         return {
 724 |             "error": f"Failed to classify image: {str(e)}",
 725 |             "source": image_data if is_path else "base64_image"
 726 |         }
 727 | 
 728 | @mcp.tool()
 729 | def track_objects(
 730 |     image_data: str,
 731 |     model_name: str = "yolov8n.pt",
 732 |     confidence: float = 0.25,
 733 |     tracker: str = "bytetrack.yaml",
 734 |     save_results: bool = False
 735 | ) -> Dict[str, Any]:
 736 |     """
 737 |     Track objects in an image sequence using YOLO CLI
 738 |     
 739 |     Args:
 740 |         image_data: Base64 encoded image
 741 |         model_name: YOLO model name
 742 |         confidence: Detection confidence threshold
 743 |         tracker: Tracker name to use (e.g., 'bytetrack.yaml', 'botsort.yaml')
 744 |         save_results: Whether to save results to disk
 745 |         
 746 |     Returns:
 747 |         Dictionary containing tracking results
 748 |     """
 749 |     try:
 750 |         # Save base64 data to temp file
 751 |         source_path = save_base64_to_temp(image_data)
 752 |         
 753 |         # Determine full model path
 754 |         model_path = None
 755 |         for directory in CONFIG["model_dirs"]:
 756 |             potential_path = os.path.join(directory, model_name)
 757 |             if os.path.exists(potential_path):
 758 |                 model_path = potential_path
 759 |                 break
 760 |         
 761 |         if model_path is None:
 762 |             available = list_available_models()
 763 |             available_str = ", ".join(available) if available else "none"
 764 |             return {
 765 |                 "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
 766 |             }
 767 |         
 768 |         # Setup output directory if saving results
 769 |         output_dir = os.path.join(tempfile.gettempdir(), "yolo_track_results")
 770 |         if save_results and not os.path.exists(output_dir):
 771 |             os.makedirs(output_dir)
 772 |         
 773 |         # Build YOLO CLI command
 774 |         cmd_args = [
 775 |             "track",  # Combined task and mode for tracking
 776 |             f"model={model_path}",
 777 |             f"source={source_path}",
 778 |             f"conf={confidence}",
 779 |             f"tracker={tracker}",
 780 |             "format=json",  # Request JSON output for parsing
 781 |         ]
 782 |         
 783 |         if save_results:
 784 |             cmd_args.append(f"project={output_dir}")
 785 |             cmd_args.append("save=True")
 786 |         else:
 787 |             cmd_args.append("save=False")
 788 |         
 789 |         # Run YOLO CLI command
 790 |         result = run_yolo_cli(cmd_args)
 791 |         
 792 |         # Clean up temp file
 793 |         try:
 794 |             os.remove(source_path)
 795 |         except Exception as e:
 796 |             logger.warning(f"Failed to clean up temp file {source_path}: {str(e)}")
 797 |         
 798 |         # Check for command success
 799 |         if not result["success"]:
 800 |             return {
 801 |                 "error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
 802 |                 "command": result.get("command", ""),
 803 |             }
 804 |         
 805 |         # Parse JSON output from stdout
 806 |         try:
 807 |             # Try to find JSON in the output
 808 |             json_start = result["stdout"].find("{")
 809 |             json_end = result["stdout"].rfind("}")
 810 |             
 811 |             if json_start >= 0 and json_end > json_start:
 812 |                 json_str = result["stdout"][json_start:json_end+1]
 813 |                 tracking_data = json.loads(json_str)
 814 |             else:
 815 |                 # If no JSON found, create a basic response
 816 |                 return {
 817 |                     "results": [],
 818 |                     "model_used": model_name,
 819 |                     "tracker": tracker,
 820 |                     "total_tracks": 0,
 821 |                     "command_output": result["stderr"]
 822 |                 }
 823 |             
 824 |             # Format results
 825 |             formatted_results = []
 826 |             
 827 |             # Parse tracking data from YOLO JSON output
 828 |             if "predictions" in tracking_data:
 829 |                 tracks = []
 830 |                 
 831 |                 for pred in tracking_data["predictions"]:
 832 |                     # Extract box coordinates
 833 |                     box = pred.get("box", {})
 834 |                     x1, y1, x2, y2 = box.get("x1", 0), box.get("y1", 0), box.get("x2", 0), box.get("y2", 0)
 835 |                     
 836 |                     # Extract class and tracking information
 837 |                     confidence = pred.get("confidence", 0)
 838 |                     class_name = pred.get("name", "unknown")
 839 |                     class_id = pred.get("class", -1)
 840 |                     track_id = pred.get("id", -1)
 841 |                     
 842 |                     track = {
 843 |                         "box": [x1, y1, x2, y2],
 844 |                         "confidence": confidence,
 845 |                         "class_id": class_id,
 846 |                         "class_name": class_name,
 847 |                         "track_id": track_id
 848 |                     }
 849 |                     
 850 |                     tracks.append(track)
 851 |                 
 852 |                 # Get image dimensions if available
 853 |                 image_shape = [
 854 |                     tracking_data.get("width", 0),
 855 |                     tracking_data.get("height", 0)
 856 |                 ]
 857 |                 
 858 |                 formatted_results.append({
 859 |                     "tracks": tracks,
 860 |                     "image_shape": image_shape
 861 |                 })
 862 |             
 863 |             return {
 864 |                 "results": formatted_results,
 865 |                 "model_used": model_name,
 866 |                 "tracker": tracker,
 867 |                 "total_tracks": sum(len(r["tracks"]) for r in formatted_results),
 868 |                 "save_dir": output_dir if save_results else None
 869 |             }
 870 |             
 871 |         except json.JSONDecodeError as e:
 872 |             logger.error(f"Failed to parse JSON from YOLO output: {e}")
 873 |             logger.error(f"Output: {result['stdout']}")
 874 |             
 875 |             return {
 876 |                 "error": f"Failed to parse YOLO results: {str(e)}",
 877 |                 "command": result.get("command", ""),
 878 |                 "stdout": result.get("stdout", ""),
 879 |                 "stderr": result.get("stderr", "")
 880 |             }
 881 |             
 882 |     except Exception as e:
 883 |         logger.error(f"Error in track_objects: {str(e)}")
 884 |         return {
 885 |             "error": f"Failed to track objects: {str(e)}"
 886 |         }
 887 | 
 888 | @mcp.tool()
 889 | def train_model(
 890 |     dataset_path: str,
 891 |     model_name: str = "yolov8n.pt",
 892 |     epochs: int = 100,
 893 |     imgsz: int = 640,
 894 |     batch: int = 16,
 895 |     name: str = "yolo_custom_model",
 896 |     project: str = "runs/train"
 897 | ) -> Dict[str, Any]:
 898 |     """
 899 |     Train a YOLO model on a custom dataset using CLI
 900 |     
 901 |     Args:
 902 |         dataset_path: Path to YOLO format dataset
 903 |         model_name: Base model to start with
 904 |         epochs: Number of training epochs
 905 |         imgsz: Image size for training
 906 |         batch: Batch size
 907 |         name: Name for the training run
 908 |         project: Project directory
 909 |         
 910 |     Returns:
 911 |         Dictionary containing training results
 912 |     """
 913 |     # Validate dataset path
 914 |     if not os.path.exists(dataset_path):
 915 |         return {"error": f"Dataset not found: {dataset_path}"}
 916 |     
 917 |     # Determine full model path
 918 |     model_path = None
 919 |     for directory in CONFIG["model_dirs"]:
 920 |         potential_path = os.path.join(directory, model_name)
 921 |         if os.path.exists(potential_path):
 922 |             model_path = potential_path
 923 |             break
 924 |     
 925 |     if model_path is None:
 926 |         available = list_available_models()
 927 |         available_str = ", ".join(available) if available else "none"
 928 |         return {
 929 |             "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
 930 |         }
 931 |     
 932 |     # Create project directory if it doesn't exist
 933 |     if not os.path.exists(project):
 934 |         os.makedirs(project)
 935 |     
 936 |     # Determine task type based on model name
 937 |     task = "detect"  # Default task
 938 |     if "seg" in model_name:
 939 |         task = "segment"
 940 |     elif "pose" in model_name:
 941 |         task = "pose"
 942 |     elif "cls" in model_name:
 943 |         task = "classify"
 944 |     elif "obb" in model_name:
 945 |         task = "obb"
 946 |     
 947 |     # Build YOLO CLI command
 948 |     cmd_args = [
 949 |         task,  # Task
 950 |         "train",  # Mode
 951 |         f"model={model_path}",
 952 |         f"data={dataset_path}",
 953 |         f"epochs={epochs}",
 954 |         f"imgsz={imgsz}",
 955 |         f"batch={batch}",
 956 |         f"name={name}",
 957 |         f"project={project}"
 958 |     ]
 959 |     
 960 |     # Run YOLO CLI command - with longer timeout
 961 |     logger.info(f"Starting model training with {epochs} epochs - this may take a while...")
 962 |     result = run_yolo_cli(cmd_args, timeout=epochs * 300)  # 5 minutes per epoch
 963 |     
 964 |     # Check for command success
 965 |     if not result["success"]:
 966 |         return {
 967 |             "error": f"Training failed: {result.get('error', 'Unknown error')}",
 968 |             "command": result.get("command", ""),
 969 |             "stderr": result.get("stderr", "")
 970 |         }
 971 |     
 972 |     # Determine path to best model weights
 973 |     best_model_path = os.path.join(project, name, "weights", "best.pt")
 974 |     
 975 |     # Determine metrics from stdout if possible
 976 |     metrics = {}
 977 |     try:
 978 |         # Look for metrics in output
 979 |         stdout = result.get("stdout", "")
 980 |         
 981 |         # Extract metrics from training output
 982 |         import re
 983 |         precision_match = re.search(r"Precision: ([\d\.]+)", stdout)
 984 |         recall_match = re.search(r"Recall: ([\d\.]+)", stdout)
 985 |         map50_match = re.search(r"mAP50: ([\d\.]+)", stdout)
 986 |         map_match = re.search(r"mAP50-95: ([\d\.]+)", stdout)
 987 |         
 988 |         if precision_match:
 989 |             metrics["precision"] = float(precision_match.group(1))
 990 |         if recall_match:
 991 |             metrics["recall"] = float(recall_match.group(1))
 992 |         if map50_match:
 993 |             metrics["mAP50"] = float(map50_match.group(1))
 994 |         if map_match:
 995 |             metrics["mAP50-95"] = float(map_match.group(1))
 996 |     except Exception as e:
 997 |         logger.warning(f"Failed to parse metrics from training output: {str(e)}")
 998 |     
 999 |     return {
1000 |         "status": "success",
1001 |         "model_path": best_model_path,
1002 |         "epochs_completed": epochs,
1003 |         "final_metrics": metrics,
1004 |         "training_log_sample": result.get("stdout", "")[:1000] + "..." if len(result.get("stdout", "")) > 1000 else result.get("stdout", "")
1005 |     }
1006 | 
1007 | @mcp.tool()
1008 | def validate_model(
1009 |     model_path: str,
1010 |     data_path: str,
1011 |     imgsz: int = 640,
1012 |     batch: int = 16
1013 | ) -> Dict[str, Any]:
1014 |     """
1015 |     Validate a YOLO model on a dataset using CLI
1016 |     
1017 |     Args:
1018 |         model_path: Path to YOLO model (.pt file)
1019 |         data_path: Path to YOLO format validation dataset
1020 |         imgsz: Image size for validation
1021 |         batch: Batch size
1022 |         
1023 |     Returns:
1024 |         Dictionary containing validation results
1025 |     """
1026 |     # Validate model path
1027 |     if not os.path.exists(model_path):
1028 |         return {"error": f"Model file not found: {model_path}"}
1029 |     
1030 |     # Validate dataset path
1031 |     if not os.path.exists(data_path):
1032 |         return {"error": f"Dataset not found: {data_path}"}
1033 |     
1034 |     # Determine task type based on model name
1035 |     model_name = os.path.basename(model_path)
1036 |     task = "detect"  # Default task
1037 |     if "seg" in model_name:
1038 |         task = "segment"
1039 |     elif "pose" in model_name:
1040 |         task = "pose"
1041 |     elif "cls" in model_name:
1042 |         task = "classify"
1043 |     elif "obb" in model_name:
1044 |         task = "obb"
1045 |     
1046 |     # Build YOLO CLI command
1047 |     cmd_args = [
1048 |         task,  # Task
1049 |         "val",  # Mode
1050 |         f"model={model_path}",
1051 |         f"data={data_path}",
1052 |         f"imgsz={imgsz}",
1053 |         f"batch={batch}"
1054 |     ]
1055 |     
1056 |     # Run YOLO CLI command
1057 |     result = run_yolo_cli(cmd_args, timeout=300)  # 5 minute timeout
1058 |     
1059 |     # Check for command success
1060 |     if not result["success"]:
1061 |         return {
1062 |             "error": f"Validation failed: {result.get('error', 'Unknown error')}",
1063 |             "command": result.get("command", ""),
1064 |             "stderr": result.get("stderr", "")
1065 |         }
1066 |     
1067 |     # Extract metrics from validation output
1068 |     metrics = {}
1069 |     try:
1070 |         stdout = result.get("stdout", "")
1071 |         
1072 |         import re
1073 |         precision_match = re.search(r"Precision: ([\d\.]+)", stdout)
1074 |         recall_match = re.search(r"Recall: ([\d\.]+)", stdout)
1075 |         map50_match = re.search(r"mAP50: ([\d\.]+)", stdout)
1076 |         map_match = re.search(r"mAP50-95: ([\d\.]+)", stdout)
1077 |         
1078 |         if precision_match:
1079 |             metrics["precision"] = float(precision_match.group(1))
1080 |         if recall_match:
1081 |             metrics["recall"] = float(recall_match.group(1))
1082 |         if map50_match:
1083 |             metrics["mAP50"] = float(map50_match.group(1))
1084 |         if map_match:
1085 |             metrics["mAP50-95"] = float(map_match.group(1))
1086 |     except Exception as e:
1087 |         logger.warning(f"Failed to parse metrics from validation output: {str(e)}")
1088 |     
1089 |     return {
1090 |         "status": "success",
1091 |         "metrics": metrics,
1092 |         "validation_output": result.get("stdout", "")[:1000] + "..." if len(result.get("stdout", "")) > 1000 else result.get("stdout", "")
1093 |     }
1094 | 
1095 | @mcp.tool()
1096 | def export_model(
1097 |     model_path: str,
1098 |     format: str = "onnx",
1099 |     imgsz: int = 640
1100 | ) -> Dict[str, Any]:
1101 |     """
1102 |     Export a YOLO model to different formats using CLI
1103 |     
1104 |     Args:
1105 |         model_path: Path to YOLO model (.pt file)
1106 |         format: Export format (onnx, torchscript, openvino, etc.)
1107 |         imgsz: Image size for export
1108 |         
1109 |     Returns:
1110 |         Dictionary containing export results
1111 |     """
1112 |     # Validate model path
1113 |     if not os.path.exists(model_path):
1114 |         return {"error": f"Model file not found: {model_path}"}
1115 |     
1116 |     # Valid export formats
1117 |     valid_formats = [
1118 |         "torchscript", "onnx", "openvino", "engine", "coreml", "saved_model", 
1119 |         "pb", "tflite", "edgetpu", "tfjs", "paddle"
1120 |     ]
1121 |     
1122 |     if format not in valid_formats:
1123 |         return {"error": f"Invalid export format: {format}. Valid formats include: {', '.join(valid_formats)}"}
1124 |     
1125 |     # Build YOLO CLI command
1126 |     cmd_args = [
1127 |         "export",  # Combined task and mode for export
1128 |         f"model={model_path}",
1129 |         f"format={format}",
1130 |         f"imgsz={imgsz}"
1131 |     ]
1132 |     
1133 |     # Run YOLO CLI command
1134 |     result = run_yolo_cli(cmd_args, timeout=300)  # 5 minute timeout
1135 |     
1136 |     # Check for command success
1137 |     if not result["success"]:
1138 |         return {
1139 |             "error": f"Export failed: {result.get('error', 'Unknown error')}",
1140 |             "command": result.get("command", ""),
1141 |             "stderr": result.get("stderr", "")
1142 |         }
1143 |     
1144 |     # Try to determine export path
1145 |     export_path = None
1146 |     try:
1147 |         # Model path without extension
1148 |         base_path = os.path.splitext(model_path)[0]
1149 |         
1150 |         # Expected export paths based on format
1151 |         format_extensions = {
1152 |             "torchscript": ".torchscript",
1153 |             "onnx": ".onnx",
1154 |             "openvino": "_openvino_model",
1155 |             "engine": ".engine",
1156 |             "coreml": ".mlmodel",
1157 |             "saved_model": "_saved_model",
1158 |             "pb": ".pb",
1159 |             "tflite": ".tflite",
1160 |             "edgetpu": "_edgetpu.tflite",
1161 |             "tfjs": "_web_model",
1162 |             "paddle": "_paddle_model"
1163 |         }
1164 |         
1165 |         expected_ext = format_extensions.get(format, "")
1166 |         expected_path = base_path + expected_ext
1167 |         
1168 |         # Check if the exported file exists
1169 |         if os.path.exists(expected_path) or os.path.isdir(expected_path):
1170 |             export_path = expected_path
1171 |     except Exception as e:
1172 |         logger.warning(f"Failed to determine export path: {str(e)}")
1173 |     
1174 |     return {
1175 |         "status": "success",
1176 |         "export_path": export_path,
1177 |         "format": format,
1178 |         "export_output": result.get("stdout", "")[:1000] + "..." if len(result.get("stdout", "")) > 1000 else result.get("stdout", "")
1179 |     }
1180 | 
1181 | @mcp.tool()
1182 | def list_available_models() -> List[str]:
1183 |     """List available YOLO models that actually exist on disk in any configured directory"""
1184 |     # Common YOLO model patterns
1185 |     model_patterns = [
1186 |         "yolov11*.pt", 
1187 |         "yolov8*.pt"
1188 |     ]
1189 |     
1190 |     # Find all existing models in all configured directories
1191 |     available_models = set()
1192 |     for directory in CONFIG["model_dirs"]:
1193 |         if not os.path.exists(directory):
1194 |             continue
1195 |             
1196 |         # Check for model files directly
1197 |         for filename in os.listdir(directory):
1198 |             if filename.endswith(".pt") and any(
1199 |                 fnmatch.fnmatch(filename, pattern) for pattern in model_patterns
1200 |             ):
1201 |                 available_models.add(filename)
1202 |     
1203 |     # Convert to sorted list
1204 |     result = sorted(list(available_models))
1205 |     
1206 |     if not result:
1207 |         logger.warning("No model files found in configured directories.")
1208 |         return ["No models available - download models to any of these directories: " + ", ".join(CONFIG["model_dirs"])]
1209 |     
1210 |     return result
1211 | 
1212 | # Camera detection functions using CLI instead of Python API
1213 | def camera_detection_thread(model_name, confidence, fps_limit=30, camera_id=0):
1214 |     """Background thread for camera detection using YOLO CLI"""
1215 |     global camera_running, detection_results, camera_last_access_time, camera_startup_status, camera_last_error
1216 |     
1217 |     try:
1218 |         # Create a unique directory for camera results
1219 |         output_dir = os.path.join(tempfile.gettempdir(), f"yolo_camera_{int(time.time())}")
1220 |         os.makedirs(output_dir, exist_ok=True)
1221 |         
1222 |         # Determine full model path
1223 |         model_path = None
1224 |         for directory in CONFIG["model_dirs"]:
1225 |             potential_path = os.path.join(directory, model_name)
1226 |             if os.path.exists(potential_path):
1227 |                 model_path = potential_path
1228 |                 break
1229 |         
1230 |         if model_path is None:
1231 |             error_msg = f"Model {model_name} not found in any configured directories"
1232 |             logger.error(error_msg)
1233 |             camera_running = False
1234 |             camera_startup_status = {
1235 |                 "success": False,
1236 |                 "error": error_msg,
1237 |                 "timestamp": time.time()
1238 |             }
1239 |             detection_results.append({
1240 |                 "timestamp": time.time(),
1241 |                 "error": f"Failed to load model: Model not found",
1242 |                 "camera_status": "error",
1243 |                 "detections": []
1244 |             })
1245 |             return
1246 |         
1247 |         # Log camera start
1248 |         logger.info(f"Starting camera detection with model {model_name}, camera ID {camera_id}")
1249 |         detection_results.append({
1250 |             "timestamp": time.time(),
1251 |             "system_info": {
1252 |                 "os": platform.system() if 'platform' in globals() else "Unknown",
1253 |                 "camera_id": camera_id
1254 |             },
1255 |             "camera_status": "starting",
1256 |             "detections": []
1257 |         })
1258 |         
1259 |         # Determine task type based on model name
1260 |         task = "detect"  # Default task
1261 |         if "seg" in model_name:
1262 |             task = "segment"
1263 |         elif "pose" in model_name:
1264 |             task = "pose"
1265 |         elif "cls" in model_name:
1266 |             task = "classify"
1267 |         
1268 |         # Build YOLO CLI command
1269 |         base_cmd_args = [
1270 |             task,  # Task
1271 |             "predict",  # Mode
1272 |             f"model={model_path}",
1273 |             f"source={camera_id}",  # Camera source ID
1274 |             f"conf={confidence}",
1275 |             "format=json",
1276 |             "save=False",  # Don't save frames by default
1277 |             "show=False"   # Don't show GUI window
1278 |         ]
1279 |         
1280 |         # First verify YOLO command is available
1281 |         logger.info("Verifying YOLO CLI availability before starting camera...")
1282 |         check_cmd = ["yolo", "--version"]
1283 |         try:
1284 |             check_result = subprocess.run(
1285 |                 check_cmd,
1286 |                 capture_output=True,
1287 |                 text=True,
1288 |                 check=False,
1289 |                 timeout=10
1290 |             )
1291 |             
1292 |             if check_result.returncode != 0:
1293 |                 error_msg = f"YOLO CLI check failed with code {check_result.returncode}: {check_result.stderr}"
1294 |                 logger.error(error_msg)
1295 |                 camera_running = False
1296 |                 camera_startup_status = {
1297 |                     "success": False,
1298 |                     "error": error_msg,
1299 |                     "timestamp": time.time()
1300 |                 }
1301 |                 detection_results.append({
1302 |                     "timestamp": time.time(),
1303 |                     "error": error_msg,
1304 |                     "camera_status": "error",
1305 |                     "detections": []
1306 |                 })
1307 |                 return
1308 |                 
1309 |             logger.info(f"YOLO CLI is available: {check_result.stdout.strip()}")
1310 |         except Exception as e:
1311 |             error_msg = f"Error checking YOLO CLI: {str(e)}"
1312 |             logger.error(error_msg)
1313 |             camera_running = False
1314 |             camera_startup_status = {
1315 |                 "success": False,
1316 |                 "error": error_msg,
1317 |                 "timestamp": time.time()
1318 |             }
1319 |             detection_results.append({
1320 |                 "timestamp": time.time(),
1321 |                 "error": error_msg,
1322 |                 "camera_status": "error",
1323 |                 "detections": []
1324 |             })
1325 |             return
1326 |             
1327 |         # Set up subprocess for ongoing camera capture
1328 |         process = None
1329 |         frame_count = 0
1330 |         error_count = 0
1331 |         start_time = time.time()
1332 |         
1333 |         # Start YOLO CLI process
1334 |         cmd_str = "yolo " + " ".join(base_cmd_args)
1335 |         logger.info(f"Starting YOLO CLI process: {cmd_str}")
1336 |         
1337 |         try:
1338 |             process = subprocess.Popen(
1339 |                 ["yolo"] + base_cmd_args,
1340 |                 stdin=subprocess.PIPE,
1341 |                 stdout=subprocess.PIPE,
1342 |                 stderr=subprocess.PIPE,
1343 |                 text=True,
1344 |                 bufsize=1,  # Line buffered
1345 |             )
1346 |             
1347 |             # Wait a moment to check if the process immediately fails
1348 |             time.sleep(1)
1349 |             if process.poll() is not None:
1350 |                 error_msg = f"YOLO process failed to start (exit code {process.returncode})"
1351 |                 stderr_output = process.stderr.read()
1352 |                 logger.error(f"{error_msg} - STDERR: {stderr_output}")
1353 |                 
1354 |                 camera_running = False
1355 |                 camera_startup_status = {
1356 |                     "success": False,
1357 |                     "error": error_msg,
1358 |                     "stderr": stderr_output,
1359 |                     "timestamp": time.time()
1360 |                 }
1361 |                 detection_results.append({
1362 |                     "timestamp": time.time(),
1363 |                     "error": error_msg,
1364 |                     "stderr": stderr_output,
1365 |                     "camera_status": "error",
1366 |                     "detections": []
1367 |                 })
1368 |                 return
1369 |                 
1370 |             # Process started successfully
1371 |             camera_startup_status = {
1372 |                 "success": True,
1373 |                 "timestamp": time.time()
1374 |             }
1375 |             
1376 |             # Handle camera stream
1377 |             while camera_running:
1378 |                 # Read output line from process
1379 |                 stdout_line = process.stdout.readline().strip()
1380 |                 
1381 |                 if not stdout_line:
1382 |                     # Check if process is still running
1383 |                     if process.poll() is not None:
1384 |                         error_msg = f"YOLO process ended unexpectedly with code {process.returncode}"
1385 |                         stderr_output = process.stderr.read()
1386 |                         logger.error(f"{error_msg} - STDERR: {stderr_output}")
1387 |                         
1388 |                         camera_running = False
1389 |                         camera_last_error = {
1390 |                             "error": error_msg,
1391 |                             "stderr": stderr_output,
1392 |                             "timestamp": time.time()
1393 |                         }
1394 |                         detection_results.append({
1395 |                             "timestamp": time.time(),
1396 |                             "error": error_msg,
1397 |                             "camera_status": "error",
1398 |                             "stderr": stderr_output,
1399 |                             "detections": []
1400 |                         })
1401 |                         break
1402 |                     
1403 |                     time.sleep(0.1)  # Short sleep to avoid CPU spin
1404 |                     continue
1405 |                 
1406 |                 # Try to parse JSON output from YOLO
1407 |                 try:
1408 |                     # Find JSON in the output line
1409 |                     json_start = stdout_line.find("{")
1410 |                     if json_start >= 0:
1411 |                         json_str = stdout_line[json_start:]
1412 |                         detection_data = json.loads(json_str)
1413 |                         
1414 |                         frame_count += 1
1415 |                         
1416 |                         # Process detection data
1417 |                         if "predictions" in detection_data:
1418 |                             detections = []
1419 |                             
1420 |                             for pred in detection_data["predictions"]:
1421 |                                 # Extract box coordinates
1422 |                                 box = pred.get("box", {})
1423 |                                 x1, y1, x2, y2 = box.get("x1", 0), box.get("y1", 0), box.get("x2", 0), box.get("y2", 0)
1424 |                                 
1425 |                                 # Extract class information
1426 |                                 confidence = pred.get("confidence", 0)
1427 |                                 class_name = pred.get("name", "unknown")
1428 |                                 class_id = pred.get("class", -1)
1429 |                                 
1430 |                                 detections.append({
1431 |                                     "box": [x1, y1, x2, y2],
1432 |                                     "confidence": confidence,
1433 |                                     "class_id": class_id,
1434 |                                     "class_name": class_name
1435 |                                 })
1436 |                             
1437 |                             # Update detection results (keep only the last 10)
1438 |                             if len(detection_results) >= 10:
1439 |                                 detection_results.pop(0)
1440 |                                 
1441 |                             # Get image dimensions if available
1442 |                             image_shape = [
1443 |                                 detection_data.get("width", 0),
1444 |                                 detection_data.get("height", 0)
1445 |                             ]
1446 |                             
1447 |                             detection_results.append({
1448 |                                 "timestamp": time.time(),
1449 |                                 "frame_count": frame_count,
1450 |                                 "detections": detections,
1451 |                                 "camera_status": "running",
1452 |                                 "image_shape": image_shape
1453 |                             })
1454 |                             
1455 |                             # Update last access time when processing frames
1456 |                             camera_last_access_time = time.time()
1457 |                             
1458 |                             # Log occasional status
1459 |                             if frame_count % 30 == 0:
1460 |                                 fps = frame_count / (time.time() - start_time)
1461 |                                 logger.info(f"Camera running: processed {frame_count} frames ({fps:.1f} FPS)")
1462 |                                 detection_count = sum(len(r.get("detections", [])) for r in detection_results if "detections" in r)
1463 |                                 logger.info(f"Total detections in current buffer: {detection_count}")
1464 |                         
1465 |                 except json.JSONDecodeError:
1466 |                     # Not all lines will be valid JSON, that's normal
1467 |                     pass
1468 |                 except Exception as e:
1469 |                     error_msg = f"Error processing camera output: {str(e)}"
1470 |                     logger.warning(error_msg)
1471 |                     error_count += 1
1472 |                     
1473 |                     if error_count > 10:
1474 |                         logger.error("Too many processing errors, stopping camera")
1475 |                         camera_running = False
1476 |                         camera_last_error = {
1477 |                             "error": "Too many processing errors",
1478 |                             "timestamp": time.time()
1479 |                         }
1480 |                         break
1481 |                 
1482 |         except Exception as e:
1483 |             error_msg = f"Error in camera process management: {str(e)}"
1484 |             logger.error(error_msg)
1485 |             camera_running = False
1486 |             camera_startup_status = {
1487 |                 "success": False,
1488 |                 "error": error_msg,
1489 |                 "timestamp": time.time()
1490 |             }
1491 |             detection_results.append({
1492 |                 "timestamp": time.time(),
1493 |                 "error": error_msg,
1494 |                 "camera_status": "error",
1495 |                 "detections": []
1496 |             })
1497 |             return
1498 |             
1499 |     except Exception as e:
1500 |         error_msg = f"Error in camera thread: {str(e)}"
1501 |         logger.error(error_msg)
1502 |         camera_running = False
1503 |         camera_startup_status = {
1504 |             "success": False,
1505 |             "error": error_msg,
1506 |             "timestamp": time.time()
1507 |         }
1508 |         detection_results.append({
1509 |             "timestamp": time.time(),
1510 |             "error": error_msg,
1511 |             "camera_status": "error",
1512 |             "detections": []
1513 |         })
1514 |     
1515 |     finally:
1516 |         # Clean up
1517 |         logger.info("Shutting down camera...")
1518 |         camera_running = False
1519 |         
1520 |         if process is not None and process.poll() is None:
1521 |             try:
1522 |                 # Terminate process
1523 |                 process.terminate()
1524 |                 process.wait(timeout=5)
1525 |             except subprocess.TimeoutExpired:
1526 |                 process.kill()  # Force kill if terminate doesn't work
1527 |             except Exception as e:
1528 |                 logger.error(f"Error terminating YOLO process: {str(e)}")
1529 |         
1530 |         logger.info("Camera detection stopped")
1531 | 
1532 | 
1533 | @mcp.tool()
1534 | def start_camera_detection(
1535 |     model_name: str = "yolov8n.pt",
1536 |     confidence: float = 0.25,
1537 |     camera_id: int = 0
1538 | ) -> Dict[str, Any]:
1539 |     """
1540 |     Start realtime object detection using the computer's camera via YOLO CLI
1541 |     
1542 |     Args:
1543 |         model_name: YOLO model name to use
1544 |         confidence: Detection confidence threshold
1545 |         camera_id: Camera device ID (0 is usually the default camera)
1546 |         
1547 |     Returns:
1548 |         Status of camera detection
1549 |     """
1550 |     global camera_thread, camera_running, detection_results, camera_last_access_time, camera_startup_status, camera_last_error
1551 |     
1552 |     # Reset status variables
1553 |     camera_startup_status = None
1554 |     camera_last_error = None
1555 |     
1556 |     # Check if already running
1557 |     if camera_running:
1558 |         # Update last access time
1559 |         camera_last_access_time = time.time()
1560 |         return {"status": "success", "message": "Camera detection is already running"}
1561 |     
1562 |     # Clear previous results
1563 |     detection_results = []
1564 |     
1565 |     # First check if YOLO CLI is available
1566 |     try:
1567 |         version_check = run_yolo_cli(["--version"], timeout=10)
1568 |         if not version_check["success"]:
1569 |             return {
1570 |                 "status": "error",
1571 |                 "message": "YOLO CLI not available or not properly installed",
1572 |                 "details": version_check.get("error", "Unknown error"),
1573 |                 "solution": "Please make sure the 'yolo' command is in your PATH"
1574 |             }
1575 |     except Exception as e:
1576 |         error_msg = f"Error checking YOLO CLI: {str(e)}"
1577 |         logger.error(error_msg)
1578 |         return {
1579 |             "status": "error",
1580 |             "message": error_msg,
1581 |             "solution": "Please make sure the 'yolo' command is in your PATH"
1582 |         }
1583 |     
1584 |     # Start detection thread
1585 |     camera_running = True
1586 |     camera_last_access_time = time.time()  # Update access time
1587 |     camera_thread = threading.Thread(
1588 |         target=camera_detection_thread,
1589 |         args=(model_name, confidence, 30, camera_id),
1590 |         daemon=True
1591 |     )
1592 |     camera_thread.start()
1593 |     
1594 |     # Give the thread a moment to initialize and potentially fail
1595 |     time.sleep(2)
1596 |     
1597 |     # Check if the thread has reported any startup issues
1598 |     if camera_startup_status and not camera_startup_status.get("success", False):
1599 |         # Camera thread encountered an error during startup
1600 |         return {
1601 |             "status": "error",
1602 |             "message": "Camera detection failed to start",
1603 |             "details": camera_startup_status,
1604 |             "solution": "Check logs for detailed error information"
1605 |         }
1606 |     
1607 |     # Thread is running, camera should be starting
1608 |     return {
1609 |         "status": "success",
1610 |         "message": f"Started camera detection using model {model_name}",
1611 |         "model": model_name,
1612 |         "confidence": confidence,
1613 |         "camera_id": camera_id,
1614 |         "auto_shutdown": f"Camera will auto-shutdown after {CAMERA_INACTIVITY_TIMEOUT} seconds of inactivity",
1615 |         "note": "If camera doesn't work, try different camera_id values (0, 1, or 2)"
1616 |     }
1617 | 
1618 | 
1619 | @mcp.tool()
1620 | def stop_camera_detection() -> Dict[str, Any]:
1621 |     """
1622 |     Stop realtime camera detection
1623 |     
1624 |     Returns:
1625 |         Status message
1626 |     """
1627 |     global camera_running
1628 |     
1629 |     if not camera_running:
1630 |         return {"status": "error", "message": "Camera detection is not running"}
1631 |     
1632 |     logger.info("Stopping camera detection by user request")
1633 |     camera_running = False
1634 |     
1635 |     # Wait for thread to terminate
1636 |     if camera_thread and camera_thread.is_alive():
1637 |         camera_thread.join(timeout=2.0)
1638 |     
1639 |     return {
1640 |         "status": "success",
1641 |         "message": "Stopped camera detection"
1642 |     }
1643 | 
1644 | @mcp.tool()
1645 | def get_camera_detections() -> Dict[str, Any]:
1646 |     """
1647 |     Get the latest detections from the camera
1648 |     
1649 |     Returns:
1650 |         Dictionary with recent detections
1651 |     """
1652 |     global detection_results, camera_thread, camera_last_access_time, camera_startup_status, camera_last_error
1653 |     
1654 |     # Update the last access time whenever this function is called
1655 |     if camera_running:
1656 |         camera_last_access_time = time.time()
1657 |     
1658 |     # Check if thread is alive
1659 |     thread_alive = camera_thread is not None and camera_thread.is_alive()
1660 |     
1661 |     # If camera_running is False, check if we have startup status information
1662 |     if not camera_running and camera_startup_status and not camera_startup_status.get("success", False):
1663 |         return {
1664 |             "status": "error", 
1665 |             "message": "Camera detection failed to start",
1666 |             "is_running": False,
1667 |             "camera_status": "error",
1668 |             "startup_error": camera_startup_status,
1669 |             "solution": "Check logs for detailed error information"
1670 |         }
1671 |     
1672 |     # If camera_running is True but thread is dead, there's an issue
1673 |     if camera_running and not thread_alive:
1674 |         return {
1675 |             "status": "error", 
1676 |             "message": "Camera thread has stopped unexpectedly",
1677 |             "is_running": False,
1678 |             "camera_status": "error",
1679 |             "thread_alive": thread_alive,
1680 |             "last_error": camera_last_error,
1681 |             "detections": detection_results,
1682 |             "count": len(detection_results),
1683 |             "solution": "Please try restart the camera with a different camera_id"
1684 |         }
1685 |     
1686 |     if not camera_running:
1687 |         return {
1688 |             "status": "error", 
1689 |             "message": "Camera detection is not running",
1690 |             "is_running": False,
1691 |             "camera_status": "stopped"
1692 |         }
1693 |     
1694 |     # Check for errors in detection results
1695 |     errors = [result.get("error") for result in detection_results if "error" in result]
1696 |     recent_errors = errors[-5:] if errors else []
1697 |     
1698 |     # Count actual detections
1699 |     detection_count = sum(len(result.get("detections", [])) for result in detection_results if "detections" in result)
1700 |     
1701 |     return {
1702 |         "status": "success",
1703 |         "is_running": camera_running,
1704 |         "thread_alive": thread_alive,
1705 |         "detections": detection_results,
1706 |         "count": len(detection_results),
1707 |         "total_detections": detection_count,
1708 |         "recent_errors": recent_errors if recent_errors else None,
1709 |         "camera_status": "error" if recent_errors else "running",
1710 |         "inactivity_timeout": {
1711 |             "seconds_remaining": int(CAMERA_INACTIVITY_TIMEOUT - (time.time() - camera_last_access_time)),
1712 |             "last_access": camera_last_access_time
1713 |         }
1714 |     }
1715 | 
1716 | 
1717 | @mcp.tool()
1718 | def comprehensive_image_analysis(
1719 |     image_path: str,
1720 |     confidence: float = 0.25,
1721 |     save_results: bool = False
1722 | ) -> Dict[str, Any]:
1723 |     """
1724 |     Perform comprehensive analysis on an image by combining multiple CLI model results
1725 |     
1726 |     Args:
1727 |         image_path: Path to the image file
1728 |         confidence: Detection confidence threshold
1729 |         save_results: Whether to save results to disk
1730 |         
1731 |     Returns:
1732 |         Dictionary containing comprehensive analysis results
1733 |     """
1734 |     try:
1735 |         if not os.path.exists(image_path):
1736 |             return {"error": f"Image file not found: {image_path}"}
1737 |         
1738 |         analysis_results = {}
1739 |         
1740 |         # 1. Object detection
1741 |         logger.info("Running object detection for comprehensive analysis")
1742 |         object_result = detect_objects(
1743 |             image_data=image_path,
1744 |             model_name="yolov11n.pt",
1745 |             confidence=confidence,
1746 |             save_results=save_results,
1747 |             is_path=True
1748 |         )
1749 |         
1750 |         # Process object detection results
1751 |         detected_objects = []
1752 |         if "results" in object_result and object_result["results"]:
1753 |             for result in object_result["results"]:
1754 |                 for obj in result.get("detections", []):
1755 |                     detected_objects.append({
1756 |                         "class_name": obj.get("class_name", "unknown"),
1757 |                         "confidence": obj.get("confidence", 0)
1758 |                     })
1759 |         analysis_results["objects"] = detected_objects
1760 |         
1761 |         # 2. Scene classification
1762 |         try:
1763 |             logger.info("Running classification for comprehensive analysis")
1764 |             cls_result = classify_image(
1765 |                 image_data=image_path,
1766 |                 model_name="yolov8n-cls.pt",
1767 |                 top_k=3,
1768 |                 save_results=False,
1769 |                 is_path=True
1770 |             )
1771 |             
1772 |             scene_classifications = []
1773 |             if "results" in cls_result and cls_result["results"]:
1774 |                 for result in cls_result["results"]:
1775 |                     for cls in result.get("classifications", []):
1776 |                         scene_classifications.append({
1777 |                             "class_name": cls.get("class_name", "unknown"),
1778 |                             "probability": cls.get("probability", 0)
1779 |                         })
1780 |             analysis_results["scene"] = scene_classifications
1781 |         except Exception as e:
1782 |             logger.error(f"Error during scene classification: {str(e)}")
1783 |             analysis_results["scene_error"] = str(e)
1784 |         
1785 |         # 3. Human pose detection (if pose model is available)
1786 |         try:
1787 |             # Check if pose model exists
1788 |             pose_model_exists = False
1789 |             for directory in CONFIG["model_dirs"]:
1790 |                 if os.path.exists(os.path.join(directory, "yolov8n-pose.pt")):
1791 |                     pose_model_exists = True
1792 |                     break
1793 |             
1794 |             if pose_model_exists:
1795 |                 logger.info("Running pose detection for comprehensive analysis")
1796 |                 # Build YOLO CLI command for pose detection
1797 |                 cmd_args = [
1798 |                     "pose",  # Task
1799 |                     "predict",  # Mode
1800 |                     f"model=yolov8n-pose.pt",
1801 |                     f"source={image_path}",
1802 |                     f"conf={confidence}",
1803 |                     "format=json",
1804 |                 ]
1805 |                 
1806 |                 result = run_yolo_cli(cmd_args)
1807 |                 
1808 |                 if result["success"]:
1809 |                     # Parse JSON output
1810 |                     json_start = result["stdout"].find("{")
1811 |                     json_end = result["stdout"].rfind("}")
1812 |                     
1813 |                     if json_start >= 0 and json_end > json_start:
1814 |                         json_str = result["stdout"][json_start:json_end+1]
1815 |                         pose_data = json.loads(json_str)
1816 |                         
1817 |                         detected_poses = []
1818 |                         if "predictions" in pose_data:
1819 |                             for pred in pose_data["predictions"]:
1820 |                                 confidence = pred.get("confidence", 0)
1821 |                                 keypoints = pred.get("keypoints", [])
1822 |                                 
1823 |                                 detected_poses.append({
1824 |                                     "person_confidence": confidence,
1825 |                                     "has_keypoints": len(keypoints) if keypoints else 0
1826 |                                 })
1827 |                         
1828 |                         analysis_results["poses"] = detected_poses
1829 |             else:
1830 |                 analysis_results["pose_error"] = "Pose model not available"
1831 |                 
1832 |         except Exception as e:
1833 |             logger.error(f"Error during pose detection: {str(e)}")
1834 |             analysis_results["pose_error"] = str(e)
1835 |         
1836 |         # 4. Comprehensive task description
1837 |         tasks = []
1838 |         
1839 |         # Detect main objects
1840 |         main_objects = [obj["class_name"] for obj in detected_objects if obj["confidence"] > 0.5]
1841 |         if "person" in main_objects:
1842 |             tasks.append("Person Detection")
1843 |         
1844 |         # Check for weapon objects
1845 |         weapon_objects = ["sword", "knife", "katana", "gun", "pistol", "rifle"]
1846 |         weapons = [obj for obj in main_objects if any(weapon in obj.lower() for weapon in weapon_objects)]
1847 |         if weapons:
1848 |             tasks.append(f"Weapon Detection ({', '.join(weapons)})")
1849 |         
1850 |         # Count people
1851 |         person_count = main_objects.count("person")
1852 |         if person_count > 0:
1853 |             tasks.append(f"Person Count ({person_count} people)")
1854 |         
1855 |         # Pose analysis
1856 |         if "poses" in analysis_results and analysis_results["poses"]:
1857 |             tasks.append("Human Pose Analysis")
1858 |         
1859 |         # Scene classification
1860 |         if "scene" in analysis_results and analysis_results["scene"]:
1861 |             scene_types = [scene["class_name"] for scene in analysis_results["scene"][:2]]
1862 |             tasks.append(f"Scene Classification ({', '.join(scene_types)})")
1863 |         
1864 |         analysis_results["identified_tasks"] = tasks
1865 |         
1866 |         # Return comprehensive results
1867 |         return {
1868 |             "status": "success",
1869 |             "image_path": image_path,
1870 |             "analysis": analysis_results,
1871 |             "summary": "Tasks identified in the image: " + ", ".join(tasks) if tasks else "No clear tasks identified"
1872 |         }
1873 |     except Exception as e:
1874 |         return {
1875 |             "status": "error",
1876 |             "image_path": image_path,
1877 |             "error": f"Comprehensive analysis failed: {str(e)}"
1878 |         }
1879 | 
1880 | @mcp.tool()
1881 | def analyze_image_from_path(
1882 |     image_path: str,
1883 |     model_name: str = "yolov8n.pt",
1884 |     confidence: float = 0.25,
1885 |     save_results: bool = False
1886 | ) -> Dict[str, Any]:
1887 |     """
1888 |     Analyze image from file path using YOLO CLI
1889 |     
1890 |     Args:
1891 |         image_path: Path to the image file
1892 |         model_name: YOLO model name
1893 |         confidence: Detection confidence threshold
1894 |         save_results: Whether to save results to disk
1895 |         
1896 |     Returns:
1897 |         Dictionary containing detection results
1898 |     """
1899 |     try:
1900 |         # Call detect_objects function with is_path=True
1901 |         return detect_objects(
1902 |             image_data=image_path,
1903 |             model_name=model_name,
1904 |             confidence=confidence,
1905 |             save_results=save_results,
1906 |             is_path=True
1907 |         )
1908 |     except Exception as e:
1909 |         return {
1910 |             "error": f"Failed to analyze image: {str(e)}",
1911 |             "image_path": image_path
1912 |         }
1913 | 
1914 | @mcp.tool()
1915 | def test_connection() -> Dict[str, Any]:
1916 |     """
1917 |     Test if YOLO CLI service is running properly
1918 |     
1919 |     Returns:
1920 |         Status information and available tools
1921 |     """
1922 |     # Test YOLO CLI availability
1923 |     try:
1924 |         version_result = run_yolo_cli(["--version"], timeout=10)
1925 |         yolo_version = version_result.get("stdout", "Unknown") if version_result.get("success") else "Not available"
1926 |         
1927 |         # Clean up version string
1928 |         if "ultralytics" in yolo_version.lower():
1929 |             yolo_version = yolo_version.strip()
1930 |         else:
1931 |             yolo_version = "YOLO CLI not found or not responding correctly"
1932 |     except Exception as e:
1933 |         yolo_version = f"Error checking YOLO CLI: {str(e)}"
1934 |     
1935 |     return {
1936 |         "status": "YOLO CLI service is running normally",
1937 |         "yolo_version": yolo_version,
1938 |         "available_models": list_available_models(),
1939 |         "available_tools": [
1940 |             "list_available_models", "detect_objects", "segment_objects", 
1941 |             "classify_image", "track_objects", "train_model", "validate_model", 
1942 |             "export_model", "start_camera_detection", "stop_camera_detection", 
1943 |             "get_camera_detections", "test_connection",
1944 |             # Additional tools
1945 |             "analyze_image_from_path",
1946 |             "comprehensive_image_analysis"
1947 |         ],
1948 |         "features": [
1949 |             "All detection functions use YOLO CLI rather than Python API",
1950 |             "Support for loading images directly from file paths",
1951 |             "Support for comprehensive image analysis with task identification",
1952 |             "Support for camera detection using YOLO CLI"
1953 |         ]
1954 |     }
1955 | 
1956 | def cleanup_resources():
1957 |     """Clean up resources when the server is shutting down"""
1958 |     global camera_running
1959 |     
1960 |     logger.info("Cleaning up resources...")
1961 |     
1962 |     # Stop camera if it's running
1963 |     if camera_running:
1964 |         logger.info("Shutting down camera during server exit")
1965 |         camera_running = False
1966 |         
1967 |         # Give the camera thread a moment to clean up
1968 |         if camera_thread and camera_thread.is_alive():
1969 |             camera_thread.join(timeout=2.0)
1970 |     
1971 |     logger.info("Cleanup complete")
1972 | 
1973 | def signal_handler(sig, frame):
1974 |     """Handle termination signals"""
1975 |     logger.info(f"Received signal {sig}, shutting down...")
1976 |     cleanup_resources()
1977 |     sys.exit(0)
1978 | 
1979 | def start_watchdog():
1980 |     """Start the camera watchdog thread"""
1981 |     watchdog = threading.Thread(
1982 |         target=camera_watchdog_thread,
1983 |         daemon=True
1984 |     )
1985 |     watchdog.start()
1986 |     return watchdog
1987 | 
1988 | # Register cleanup functions
1989 | atexit.register(cleanup_resources)
1990 | signal.signal(signal.SIGINT, signal_handler)
1991 | signal.signal(signal.SIGTERM, signal_handler)
1992 | 
1993 | # Modify the main execution section
1994 | if __name__ == "__main__":
1995 |     import platform
1996 |     
1997 |     logger.info("Starting YOLO CLI service")
1998 |     logger.info(f"Platform: {platform.system()} {platform.release()}")
1999 |     
2000 |     # Test if YOLO CLI is available
2001 |     try:
2002 |         test_result = run_yolo_cli(["--version"], timeout=10)
2003 |         if test_result["success"]:
2004 |             logger.info(f"YOLO CLI available: {test_result.get('stdout', '').strip()}")
2005 |         else:
2006 |             logger.warning(f"YOLO CLI test failed: {test_result.get('stderr', '')}")
2007 |             logger.warning("Service may not function correctly without YOLO CLI available")
2008 |     except Exception as e:
2009 |         logger.error(f"Error testing YOLO CLI: {str(e)}")
2010 |         logger.warning("Service may not function correctly without YOLO CLI available")
2011 |     
2012 |     # Start the camera watchdog thread
2013 |     watchdog_thread = start_watchdog()
2014 |     
2015 |     # Initialize and run server
2016 |     logger.info("Starting MCP server...")
2017 |     mcp.run(transport='stdio')
```
Page 2/2FirstPrevNextLast