This is page 2 of 2. Use http://codebase.md/gongrzhe/yolo-mcp-server?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .gitignore
├── LICENSE
├── mcp-config.json
├── Readme.md
├── requirements.txt
├── server_cli.py
├── server_combine_terminal.py
├── server.py
└── setup.py
```
# Files
--------------------------------------------------------------------------------
/server_cli.py:
--------------------------------------------------------------------------------
```python
1 | # server.py - CLI version
2 | import fnmatch
3 | import os
4 | import base64
5 | import cv2
6 | import time
7 | import threading
8 | import subprocess
9 | import json
10 | import tempfile
11 | import platform
12 | from io import BytesIO
13 | from typing import List, Dict, Any, Optional, Union
14 | import numpy as np
15 | from PIL import Image
16 |
17 | from mcp.server.fastmcp import FastMCP
18 |
19 | # Set up logging configuration
20 | import os.path
21 | import sys
22 | import logging
23 | import contextlib
24 | import signal
25 | import atexit
26 |
# Logging: every record goes both to a log file in the working directory and to stderr.
# NOTE(review): stderr is presumably used so stdout stays free for the MCP transport - confirm.
logging.basicConfig(
    handlers=[
        logging.FileHandler("yolo_service.log"),
        logging.StreamHandler(sys.stderr),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('yolo_service')

# Camera diagnostics
camera_startup_status = None  # Will store error details if startup fails
camera_last_error = None

# Global variables for camera control
camera_running = False
camera_thread = None
detection_results = []
camera_last_access_time = 0  # Compared against time.time() by the watchdog
CAMERA_INACTIVITY_TIMEOUT = 60  # Auto-shutdown after 60 seconds of inactivity
45 |
def camera_watchdog_thread():
    """
    Watchdog loop that stops the camera once it has been idle for too long

    Runs forever. Every pass starts with a 5 second sleep; while the camera is
    not running an additional 10 second sleep follows. While the camera is
    running, the time since camera_last_access_time is compared against
    CAMERA_INACTIVITY_TIMEOUT and stop_camera_detection() is called when the
    limit is exceeded.
    """
    global camera_running, camera_last_access_time

    logger.info("Camera watchdog thread started")

    while True:
        # Base polling interval, keeps CPU usage negligible
        time.sleep(5)

        if not camera_running:
            # Nothing to supervise, back off for longer before polling again
            time.sleep(10)
            continue

        idle_seconds = time.time() - camera_last_access_time
        if idle_seconds <= CAMERA_INACTIVITY_TIMEOUT:
            continue

        logger.info(f"Auto-stopping camera after {idle_seconds:.1f} seconds of inactivity")
        stop_camera_detection()
68 |
def load_image(image_source, is_path=False):
    """
    Load image from file path or base64 data

    Args:
        image_source: File path or base64 encoded image data
        is_path: Whether image_source is a file path

    Returns:
        PIL Image object

    Raises:
        ValueError: For any failure (missing file, undecodable base64,
            unreadable image). The underlying exception is kept as
            __cause__ so the real reason stays visible in tracebacks.
    """
    try:
        if is_path:
            # Fail with an explicit message instead of whatever Image.open would report
            if not os.path.exists(image_source):
                raise FileNotFoundError(f"Image file not found: {image_source}")
            return Image.open(image_source)

        # Otherwise the source is base64 text: decode and open from memory
        image_bytes = base64.b64decode(image_source)
        return Image.open(BytesIO(image_bytes))
    except Exception as e:
        # Callers only need to handle ValueError; chain the original for debugging
        raise ValueError(f"Failed to load image: {str(e)}") from e
93 |
94 | # New function to run YOLO CLI commands
def run_yolo_cli(command_args, capture_output=True, timeout=60):
    """
    Invoke the `yolo` executable and describe the outcome as a dictionary

    Args:
        command_args: List of arguments appended after the `yolo` executable name
        capture_output: Whether stdout/stderr are captured and returned
        timeout: Seconds to wait before giving up on the command

    Returns:
        Dictionary that always contains "success" and "command". On a non-zero
        exit it also holds "error" (the captured stderr) and "returncode"; on a
        timeout or any other exception it holds "error" with a message; on
        success with capture_output it holds "stdout" and "stderr".
    """
    cmd = ["yolo"] + command_args
    command_line = " ".join(cmd)

    logger.info(f"Running YOLO CLI command: {command_line}")

    try:
        # check=False: a non-zero exit is reported through the result dict, not an exception
        completed = subprocess.run(
            cmd,
            capture_output=capture_output,
            text=True,
            check=False,
            timeout=timeout
        )

        exit_code = completed.returncode
        if exit_code != 0:
            logger.error(f"YOLO CLI command failed with code {exit_code}")
            logger.error(f"stderr: {completed.stderr}")
            return {
                "success": False,
                "error": completed.stderr,
                "command": command_line,
                "returncode": exit_code
            }

        if not capture_output:
            return {"success": True, "command": command_line}

        return {
            "success": True,
            "stdout": completed.stdout,
            "stderr": completed.stderr,
            "command": command_line
        }

    except subprocess.TimeoutExpired:
        logger.error(f"YOLO CLI command timed out after {timeout} seconds")
        return {
            "success": False,
            "error": f"Command timed out after {timeout} seconds",
            "command": command_line
        }
    except Exception as exc:
        # Anything else, e.g. the executable could not be started
        logger.error(f"Error running YOLO CLI command: {str(exc)}")
        return {
            "success": False,
            "error": str(exc),
            "command": command_line
        }
159 |
# Create MCP server
mcp = FastMCP("YOLO_Service")

# Global configuration
# "model_dirs" lists the directories searched, in this order, when a model
# file name has to be resolved to a path.
CONFIG = dict(
    model_dirs=[
        ".",  # Current directory
        "./models",  # Models subdirectory
        # "models" directory located next to this source file
        os.path.join(os.path.dirname(os.path.abspath(__file__)), "models"),
    ]
)
171 |
172 | # Function to save base64 data to temp file
def save_base64_to_temp(base64_data, prefix="image", suffix=".jpg"):
    """
    Save base64 encoded data to a temporary file and return the path

    Args:
        base64_data: Base64 encoded payload
        prefix: File name prefix passed to tempfile.mkstemp
        suffix: File name suffix passed to tempfile.mkstemp

    Returns:
        Path of the newly created temporary file. The caller owns the file
        and is responsible for deleting it.

    Raises:
        ValueError: If decoding or writing fails. No temporary file is left
            behind in that case.
    """
    temp_path = None
    try:
        # Decode first so a malformed payload fails before any file exists on disk
        image_data = base64.b64decode(base64_data)

        # Create the temporary file and write the decoded bytes
        fd, temp_path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
        with os.fdopen(fd, 'wb') as temp_file:
            temp_file.write(image_data)

        return temp_path
    except Exception as e:
        if temp_path is not None:
            # Best-effort removal of a partially written file; the original
            # error is the one worth reporting, so a failure here is ignored
            try:
                os.remove(temp_path)
            except OSError:
                pass
        logger.error(f"Error saving base64 to temp file: {str(e)}")
        raise ValueError(f"Failed to save base64 data: {str(e)}") from e
190 |
@mcp.tool()
def get_model_directories() -> Dict[str, Any]:
    """Report the configured model directories and the .pt files found in each"""

    def describe(path):
        # One entry per configured directory; "models" stays empty unless the
        # path exists and is a directory
        present = os.path.exists(path)
        is_dir = present and os.path.isdir(path)
        weights = (
            [entry for entry in os.listdir(path) if entry.endswith(".pt")]
            if is_dir
            else []
        )
        return {
            "path": path,
            "exists": present,
            "is_directory": is_dir,
            "models": weights
        }

    details = [describe(path) for path in CONFIG["model_dirs"]]

    return {
        "configured_directories": CONFIG["model_dirs"],
        "directory_details": details,
        "available_models": list_available_models(),
        "loaded_models": []  # No longer track loaded models with CLI approach
    }
217 |
@mcp.tool()
def detect_objects(
    image_data: str,
    model_name: str = "yolov8n.pt",
    confidence: float = 0.25,
    save_results: bool = False,
    is_path: bool = False
) -> Dict[str, Any]:
    """
    Detect objects in an image using YOLO CLI

    Args:
        image_data: Base64 encoded image or file path (if is_path=True)
        model_name: YOLO model name
        confidence: Detection confidence threshold
        save_results: Whether to save results to disk
        is_path: Whether image_data is a file path

    Returns:
        Dictionary containing detection results. On success it holds
        "results", "model_used", "total_detections", "source" and "save_dir";
        on failure it holds "error" plus whatever context is available.
        Exceptions are never propagated to the caller.
    """
    # Base64 payloads are never echoed back in responses, only a placeholder
    source_label = image_data if is_path else "base64_image"
    # Set only when this call created a temporary file that it must delete
    temp_path = None

    try:
        # Determine source path
        if is_path:
            source_path = image_data
            if not os.path.exists(source_path):
                return {
                    "error": f"Image file not found: {source_path}",
                    "source": source_path
                }
        else:
            # Save base64 data to temp file
            temp_path = save_base64_to_temp(image_data)
            source_path = temp_path

        # Determine full model path (first configured directory that has the file wins)
        model_path = None
        for directory in CONFIG["model_dirs"]:
            candidate = os.path.join(directory, model_name)
            if os.path.exists(candidate):
                model_path = candidate
                break

        if model_path is None:
            available = list_available_models()
            available_str = ", ".join(available) if available else "none"
            return {
                "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
                "source": source_label
            }

        # Setup output directory if saving results
        output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
        if save_results:
            # exist_ok avoids a failure when two calls create the directory at once
            os.makedirs(output_dir, exist_ok=True)

        # Build YOLO CLI command
        # NOTE(review): the parsing below assumes the CLI prints one JSON object
        # with a "predictions" list on stdout when given format=json - confirm
        # against the installed ultralytics version.
        cmd_args = [
            "detect",  # Task
            "predict",  # Mode
            f"model={model_path}",
            f"source={source_path}",
            f"conf={confidence}",
            "format=json",  # Request JSON output for parsing
        ]

        if save_results:
            cmd_args.append(f"project={output_dir}")
            cmd_args.append("save=True")
        else:
            cmd_args.append("save=False")

        # Run YOLO CLI command
        result = run_yolo_cli(cmd_args)

        # Check for command success
        if not result["success"]:
            return {
                "error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
                "command": result.get("command", ""),
                "source": source_label
            }

        # Parse JSON output from stdout: everything between the first "{" and the last "}"
        stdout = result["stdout"]
        try:
            json_start = stdout.find("{")
            json_end = stdout.rfind("}")

            if json_start < 0 or json_end <= json_start:
                # If no JSON found, create a basic response with info from stderr
                return {
                    "results": [],
                    "model_used": model_name,
                    "total_detections": 0,
                    "source": source_label,
                    "command_output": result["stderr"]
                }

            detection_data = json.loads(stdout[json_start:json_end + 1])
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON from YOLO output: {e}")
            logger.error(f"Output: {result['stdout']}")

            return {
                "error": f"Failed to parse YOLO results: {str(e)}",
                "command": result.get("command", ""),
                "source": source_label,
                "stdout": result.get("stdout", ""),
                "stderr": result.get("stderr", "")
            }

        # Format results
        formatted_results = []

        if "predictions" in detection_data:
            detections = []

            for pred in detection_data["predictions"]:
                box = pred.get("box", {})
                detections.append({
                    "box": [box.get("x1", 0), box.get("y1", 0), box.get("x2", 0), box.get("y2", 0)],
                    "confidence": pred.get("confidence", 0),
                    "class_id": pred.get("class", -1),
                    "class_name": pred.get("name", "unknown")
                })

            formatted_results.append({
                "detections": detections,
                # Image dimensions if available, otherwise zeros
                "image_shape": [
                    detection_data.get("width", 0),
                    detection_data.get("height", 0)
                ]
            })

        return {
            "results": formatted_results,
            "model_used": model_name,
            "total_detections": sum(len(r["detections"]) for r in formatted_results),
            "source": source_label,
            "save_dir": output_dir if save_results else None
        }

    except Exception as e:
        logger.error(f"Error in detect_objects: {str(e)}")
        return {
            "error": f"Failed to detect objects: {str(e)}",
            "source": source_label
        }
    finally:
        # Runs on every exit path, so the temp file is removed even when the
        # model is missing or an unexpected error occurs
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError as e:
                logger.warning(f"Failed to clean up temp file {temp_path}: {str(e)}")
387 |
@mcp.tool()
def segment_objects(
    image_data: str,
    model_name: str = "yolov11n-seg.pt",
    confidence: float = 0.25,
    save_results: bool = False,
    is_path: bool = False
) -> Dict[str, Any]:
    """
    Perform instance segmentation on an image using YOLO CLI

    Args:
        image_data: Base64 encoded image or file path (if is_path=True)
        model_name: YOLO segmentation model name
        confidence: Detection confidence threshold
        save_results: Whether to save results to disk
        is_path: Whether image_data is a file path

    Returns:
        Dictionary containing segmentation results. On success it holds
        "results", "model_used", "total_segments", "source" and "save_dir";
        on failure it holds "error" plus whatever context is available.
        Exceptions are never propagated to the caller.
    """
    # Base64 payloads are never echoed back in responses, only a placeholder
    source_label = image_data if is_path else "base64_image"
    # Set only when this call created a temporary file that it must delete
    temp_path = None

    try:
        # Determine source path
        if is_path:
            source_path = image_data
            if not os.path.exists(source_path):
                return {
                    "error": f"Image file not found: {source_path}",
                    "source": source_path
                }
        else:
            # Save base64 data to temp file
            temp_path = save_base64_to_temp(image_data)
            source_path = temp_path

        # Determine full model path (first configured directory that has the file wins)
        model_path = None
        for directory in CONFIG["model_dirs"]:
            candidate = os.path.join(directory, model_name)
            if os.path.exists(candidate):
                model_path = candidate
                break

        if model_path is None:
            available = list_available_models()
            available_str = ", ".join(available) if available else "none"
            return {
                "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
                "source": source_label
            }

        # Setup output directory if saving results
        output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
        if save_results:
            # exist_ok avoids a failure when two calls create the directory at once
            os.makedirs(output_dir, exist_ok=True)

        # Build YOLO CLI command
        # NOTE(review): the parsing below assumes the CLI prints one JSON object
        # with a "predictions" list on stdout when given format=json - confirm
        # against the installed ultralytics version.
        cmd_args = [
            "segment",  # Task
            "predict",  # Mode
            f"model={model_path}",
            f"source={source_path}",
            f"conf={confidence}",
            "format=json",  # Request JSON output for parsing
        ]

        if save_results:
            cmd_args.append(f"project={output_dir}")
            cmd_args.append("save=True")
        else:
            cmd_args.append("save=False")

        # Run YOLO CLI command
        result = run_yolo_cli(cmd_args)

        # Check for command success
        if not result["success"]:
            return {
                "error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
                "command": result.get("command", ""),
                "source": source_label
            }

        # Parse JSON output from stdout: everything between the first "{" and the last "}"
        stdout = result["stdout"]
        try:
            json_start = stdout.find("{")
            json_end = stdout.rfind("}")

            if json_start < 0 or json_end <= json_start:
                # If no JSON found, create a basic response with info from stderr
                return {
                    "results": [],
                    "model_used": model_name,
                    "total_segments": 0,
                    "source": source_label,
                    "command_output": result["stderr"]
                }

            segmentation_data = json.loads(stdout[json_start:json_end + 1])
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON from YOLO output: {e}")
            logger.error(f"Output: {result['stdout']}")

            return {
                "error": f"Failed to parse YOLO results: {str(e)}",
                "command": result.get("command", ""),
                "source": source_label,
                "stdout": result.get("stdout", ""),
                "stderr": result.get("stderr", "")
            }

        # Format results
        formatted_results = []

        if "predictions" in segmentation_data:
            segments = []

            for pred in segmentation_data["predictions"]:
                box = pred.get("box", {})
                segment = {
                    "box": [box.get("x1", 0), box.get("y1", 0), box.get("x2", 0), box.get("y2", 0)],
                    "confidence": pred.get("confidence", 0),
                    "class_id": pred.get("class", -1),
                    "class_name": pred.get("name", "unknown")
                }

                # The mask is passed through untouched when the prediction carries one
                if "mask" in pred:
                    segment["mask"] = pred["mask"]

                segments.append(segment)

            formatted_results.append({
                "segments": segments,
                # Image dimensions if available, otherwise zeros
                "image_shape": [
                    segmentation_data.get("width", 0),
                    segmentation_data.get("height", 0)
                ]
            })

        return {
            "results": formatted_results,
            "model_used": model_name,
            "total_segments": sum(len(r["segments"]) for r in formatted_results),
            "source": source_label,
            "save_dir": output_dir if save_results else None
        }

    except Exception as e:
        logger.error(f"Error in segment_objects: {str(e)}")
        return {
            "error": f"Failed to segment objects: {str(e)}",
            "source": source_label
        }
    finally:
        # Runs on every exit path, so the temp file is removed even when the
        # model is missing or an unexpected error occurs
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError as e:
                logger.warning(f"Failed to clean up temp file {temp_path}: {str(e)}")
563 |
@mcp.tool()
def classify_image(
    image_data: str,
    model_name: str = "yolov11n-cls.pt",
    top_k: int = 5,
    save_results: bool = False,
    is_path: bool = False
) -> Dict[str, Any]:
    """
    Classify an image using YOLO classification model via CLI

    Args:
        image_data: Base64 encoded image or file path (if is_path=True)
        model_name: YOLO classification model name
        top_k: Number of top categories to return
        save_results: Whether to save results to disk
        is_path: Whether image_data is a file path

    Returns:
        Dictionary containing classification results. On success it holds
        "results", "model_used", "top_k", "source" and "save_dir"; on failure
        it holds "error" plus whatever context is available. Exceptions are
        never propagated to the caller.
    """
    # Base64 payloads are never echoed back in responses, only a placeholder
    source_label = image_data if is_path else "base64_image"
    # Set only when this call created a temporary file that it must delete
    temp_path = None

    try:
        # Determine source path
        if is_path:
            source_path = image_data
            if not os.path.exists(source_path):
                return {
                    "error": f"Image file not found: {source_path}",
                    "source": source_path
                }
        else:
            # Save base64 data to temp file
            temp_path = save_base64_to_temp(image_data)
            source_path = temp_path

        # Determine full model path (first configured directory that has the file wins)
        model_path = None
        for directory in CONFIG["model_dirs"]:
            candidate = os.path.join(directory, model_name)
            if os.path.exists(candidate):
                model_path = candidate
                break

        if model_path is None:
            available = list_available_models()
            available_str = ", ".join(available) if available else "none"
            return {
                "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
                "source": source_label
            }

        # Setup output directory if saving results
        output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
        if save_results:
            # exist_ok avoids a failure when two calls create the directory at once
            os.makedirs(output_dir, exist_ok=True)

        # Build YOLO CLI command
        # NOTE(review): the parsing below assumes the CLI prints one JSON object
        # with a "predictions" list on stdout when given format=json - confirm
        # against the installed ultralytics version.
        cmd_args = [
            "classify",  # Task
            "predict",  # Mode
            f"model={model_path}",
            f"source={source_path}",
            "format=json",  # Request JSON output for parsing
        ]

        if save_results:
            cmd_args.append(f"project={output_dir}")
            cmd_args.append("save=True")
        else:
            cmd_args.append("save=False")

        # Run YOLO CLI command
        result = run_yolo_cli(cmd_args)

        # Check for command success
        if not result["success"]:
            return {
                "error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
                "command": result.get("command", ""),
                "source": source_label
            }

        # Parse JSON output from stdout: everything between the first "{" and the last "}"
        stdout = result["stdout"]
        try:
            json_start = stdout.find("{")
            json_end = stdout.rfind("}")

            if json_start < 0 or json_end <= json_start:
                # If no JSON found, create a basic response with info from stderr
                return {
                    "results": [],
                    "model_used": model_name,
                    "top_k": top_k,
                    "source": source_label,
                    "command_output": result["stderr"]
                }

            classification_data = json.loads(stdout[json_start:json_end + 1])
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON from YOLO output: {e}")
            logger.error(f"Output: {result['stdout']}")

            return {
                "error": f"Failed to parse YOLO results: {str(e)}",
                "command": result.get("command", ""),
                "source": source_label,
                "stdout": result.get("stdout", ""),
                "stderr": result.get("stderr", "")
            }

        # Format results
        formatted_results = []

        if "predictions" in classification_data:
            # Only the first top_k predictions are reported; "class_id" is the
            # position of the prediction in that list
            # NOTE(review): presumably the list is already sorted by confidence - confirm
            classifications = [
                {
                    "class_id": i,
                    "class_name": pred.get("name", f"class_{i}"),
                    "probability": pred.get("confidence", 0)
                }
                for i, pred in enumerate(classification_data["predictions"][:top_k])
            ]

            formatted_results.append({
                "classifications": classifications,
                # Image dimensions if available, otherwise zeros
                "image_shape": [
                    classification_data.get("width", 0),
                    classification_data.get("height", 0)
                ]
            })

        return {
            "results": formatted_results,
            "model_used": model_name,
            "top_k": top_k,
            "source": source_label,
            "save_dir": output_dir if save_results else None
        }

    except Exception as e:
        logger.error(f"Error in classify_image: {str(e)}")
        return {
            "error": f"Failed to classify image: {str(e)}",
            "source": source_label
        }
    finally:
        # Runs on every exit path, so the temp file is removed even when the
        # model is missing or an unexpected error occurs
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError as e:
                logger.warning(f"Failed to clean up temp file {temp_path}: {str(e)}")
727 |
@mcp.tool()
def track_objects(
    image_data: str,
    model_name: str = "yolov8n.pt",
    confidence: float = 0.25,
    tracker: str = "bytetrack.yaml",
    save_results: bool = False
) -> Dict[str, Any]:
    """
    Track objects in an image sequence using YOLO CLI

    Args:
        image_data: Base64 encoded image
        model_name: YOLO model name
        confidence: Detection confidence threshold
        tracker: Tracker name to use (e.g., 'bytetrack.yaml', 'botsort.yaml')
        save_results: Whether to save results to disk

    Returns:
        Dictionary containing tracking results. On success it holds "results",
        "model_used", "tracker", "total_tracks" and "save_dir"; on failure it
        holds "error" plus whatever context is available. Exceptions are never
        propagated to the caller.
    """
    # Path of the temporary input file once it has been created
    temp_path = None

    try:
        # Save base64 data to temp file
        temp_path = save_base64_to_temp(image_data)

        # Determine full model path (first configured directory that has the file wins)
        model_path = None
        for directory in CONFIG["model_dirs"]:
            candidate = os.path.join(directory, model_name)
            if os.path.exists(candidate):
                model_path = candidate
                break

        if model_path is None:
            available = list_available_models()
            available_str = ", ".join(available) if available else "none"
            return {
                "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
            }

        # Setup output directory if saving results
        output_dir = os.path.join(tempfile.gettempdir(), "yolo_track_results")
        if save_results:
            # exist_ok avoids a failure when two calls create the directory at once
            os.makedirs(output_dir, exist_ok=True)

        # Build YOLO CLI command
        # NOTE(review): the parsing below assumes the CLI prints one JSON object
        # with a "predictions" list on stdout when given format=json - confirm
        # against the installed ultralytics version.
        cmd_args = [
            "track",  # Combined task and mode for tracking
            f"model={model_path}",
            f"source={temp_path}",
            f"conf={confidence}",
            f"tracker={tracker}",
            "format=json",  # Request JSON output for parsing
        ]

        if save_results:
            cmd_args.append(f"project={output_dir}")
            cmd_args.append("save=True")
        else:
            cmd_args.append("save=False")

        # Run YOLO CLI command
        result = run_yolo_cli(cmd_args)

        # Check for command success
        if not result["success"]:
            return {
                "error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
                "command": result.get("command", ""),
            }

        # Parse JSON output from stdout: everything between the first "{" and the last "}"
        stdout = result["stdout"]
        try:
            json_start = stdout.find("{")
            json_end = stdout.rfind("}")

            if json_start < 0 or json_end <= json_start:
                # If no JSON found, create a basic response
                return {
                    "results": [],
                    "model_used": model_name,
                    "tracker": tracker,
                    "total_tracks": 0,
                    "command_output": result["stderr"]
                }

            tracking_data = json.loads(stdout[json_start:json_end + 1])
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON from YOLO output: {e}")
            logger.error(f"Output: {result['stdout']}")

            return {
                "error": f"Failed to parse YOLO results: {str(e)}",
                "command": result.get("command", ""),
                "stdout": result.get("stdout", ""),
                "stderr": result.get("stderr", "")
            }

        # Format results
        formatted_results = []

        if "predictions" in tracking_data:
            tracks = []

            for pred in tracking_data["predictions"]:
                box = pred.get("box", {})
                tracks.append({
                    "box": [box.get("x1", 0), box.get("y1", 0), box.get("x2", 0), box.get("y2", 0)],
                    "confidence": pred.get("confidence", 0),
                    "class_id": pred.get("class", -1),
                    "class_name": pred.get("name", "unknown"),
                    # -1 when the prediction carries no track id
                    "track_id": pred.get("id", -1)
                })

            formatted_results.append({
                "tracks": tracks,
                # Image dimensions if available, otherwise zeros
                "image_shape": [
                    tracking_data.get("width", 0),
                    tracking_data.get("height", 0)
                ]
            })

        return {
            "results": formatted_results,
            "model_used": model_name,
            "tracker": tracker,
            "total_tracks": sum(len(r["tracks"]) for r in formatted_results),
            "save_dir": output_dir if save_results else None
        }

    except Exception as e:
        logger.error(f"Error in track_objects: {str(e)}")
        return {
            "error": f"Failed to track objects: {str(e)}"
        }
    finally:
        # Runs on every exit path, so the temp file is removed even when the
        # model is missing or an unexpected error occurs
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError as e:
                logger.warning(f"Failed to clean up temp file {temp_path}: {str(e)}")
887 |
@mcp.tool()
def train_model(
    dataset_path: str,
    model_name: str = "yolov8n.pt",
    epochs: int = 100,
    imgsz: int = 640,
    batch: int = 16,
    name: str = "yolo_custom_model",
    project: str = "runs/train"
) -> Dict[str, Any]:
    """
    Train a YOLO model on a custom dataset using CLI

    Args:
        dataset_path: Path to YOLO format dataset
        model_name: Base model to start with
        epochs: Number of training epochs
        imgsz: Image size for training
        batch: Batch size
        name: Name for the training run
        project: Project directory

    Returns:
        Dictionary containing training results, or a dictionary with an
        "error" key when the dataset or model is missing or the command fails
    """
    # The dataset must exist before anything else is attempted
    if not os.path.exists(dataset_path):
        return {"error": f"Dataset not found: {dataset_path}"}

    # First configured directory that contains the model file wins
    candidates = (os.path.join(directory, model_name) for directory in CONFIG["model_dirs"])
    model_path = next((path for path in candidates if os.path.exists(path)), None)

    if model_path is None:
        available = list_available_models()
        available_str = ", ".join(available) if available else "none"
        return {
            "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
        }

    # Create project directory if it doesn't exist
    if not os.path.exists(project):
        os.makedirs(project)

    # The task is guessed from a marker in the model file name; checked in
    # this order, first match wins, "detect" when nothing matches
    task = "detect"
    for marker, task_name in (
        ("seg", "segment"),
        ("pose", "pose"),
        ("cls", "classify"),
        ("obb", "obb"),
    ):
        if marker in model_name:
            task = task_name
            break

    # Build YOLO CLI command
    cmd_args = [
        task,  # Task
        "train",  # Mode
        f"model={model_path}",
        f"data={dataset_path}",
        f"epochs={epochs}",
        f"imgsz={imgsz}",
        f"batch={batch}",
        f"name={name}",
        f"project={project}"
    ]

    # Training gets a timeout budget of 5 minutes per epoch
    logger.info(f"Starting model training with {epochs} epochs - this may take a while...")
    result = run_yolo_cli(cmd_args, timeout=epochs * 300)

    if not result["success"]:
        return {
            "error": f"Training failed: {result.get('error', 'Unknown error')}",
            "command": result.get("command", ""),
            "stderr": result.get("stderr", "")
        }

    # Location where the best weights are expected after a run
    best_model_path = os.path.join(project, name, "weights", "best.pt")

    # Metrics are scraped from the command output on a best-effort basis
    metrics = {}
    try:
        import re

        stdout = result.get("stdout", "")
        metric_patterns = (
            ("precision", r"Precision: ([\d\.]+)"),
            ("recall", r"Recall: ([\d\.]+)"),
            ("mAP50", r"mAP50: ([\d\.]+)"),
            ("mAP50-95", r"mAP50-95: ([\d\.]+)"),
        )
        found = [(key, re.search(pattern, stdout)) for key, pattern in metric_patterns]

        for key, match in found:
            if match:
                metrics[key] = float(match.group(1))
    except Exception as e:
        logger.warning(f"Failed to parse metrics from training output: {str(e)}")

    # Only the first 1000 characters of the log are returned, marked with "..." when cut
    training_log = result.get("stdout", "")
    if len(training_log) > 1000:
        log_sample = training_log[:1000] + "..."
    else:
        log_sample = training_log

    return {
        "status": "success",
        "model_path": best_model_path,
        "epochs_completed": epochs,
        "final_metrics": metrics,
        "training_log_sample": log_sample
    }
1006 |
@mcp.tool()
def validate_model(
    model_path: str,
    data_path: str,
    imgsz: int = 640,
    batch: int = 16
) -> Dict[str, Any]:
    """
    Run the YOLO CLI in ``val`` mode for a model against a dataset

    Args:
        model_path: Path to YOLO model (.pt file)
        data_path: Path to YOLO format validation dataset
        imgsz: Image size for validation
        batch: Batch size

    Returns:
        On success, a dictionary with "status", the parsed "metrics" and
        the CLI output truncated to 1000 characters. On failure, a
        dictionary with an "error" key (plus "command" and "stderr" when
        the CLI itself failed).
    """
    # Both inputs have to exist before the CLI is invoked
    if not os.path.exists(model_path):
        return {"error": f"Model file not found: {model_path}"}
    if not os.path.exists(data_path):
        return {"error": f"Dataset not found: {data_path}"}

    # The task is inferred from the model file name; the first marker
    # found wins and plain detection is the fallback
    model_name = os.path.basename(model_path)
    task_markers = (
        ("seg", "segment"),
        ("pose", "pose"),
        ("cls", "classify"),
        ("obb", "obb"),
    )
    task = next(
        (task_name for marker, task_name in task_markers if marker in model_name),
        "detect",
    )

    # yolo <task> val model=... data=... imgsz=... batch=...
    cmd_args = [
        task,
        "val",
        f"model={model_path}",
        f"data={data_path}",
        f"imgsz={imgsz}",
        f"batch={batch}"
    ]

    result = run_yolo_cli(cmd_args, timeout=300)  # 5 minute timeout

    if not result["success"]:
        return {
            "error": f"Validation failed: {result.get('error', 'Unknown error')}",
            "command": result.get("command", ""),
            "stderr": result.get("stderr", "")
        }

    # Pull whichever metrics appear in the CLI output; parsing is best
    # effort and stops at the first value that cannot be converted
    metric_patterns = (
        ("precision", r"Precision: ([\d\.]+)"),
        ("recall", r"Recall: ([\d\.]+)"),
        ("mAP50", r"mAP50: ([\d\.]+)"),
        ("mAP50-95", r"mAP50-95: ([\d\.]+)"),
    )
    metrics = {}
    try:
        import re

        cli_output = result.get("stdout", "")
        for metric_key, pattern in metric_patterns:
            found = re.search(pattern, cli_output)
            if found:
                metrics[metric_key] = float(found.group(1))
    except Exception as e:
        logger.warning(f"Failed to parse metrics from validation output: {str(e)}")

    # Only the first 1000 characters of the output are returned
    full_output = result.get("stdout", "")
    if len(full_output) > 1000:
        shown_output = full_output[:1000] + "..."
    else:
        shown_output = full_output

    return {
        "status": "success",
        "metrics": metrics,
        "validation_output": shown_output
    }
1094 |
@mcp.tool()
def export_model(
    model_path: str,
    format: str = "onnx",
    imgsz: int = 640
) -> Dict[str, Any]:
    """
    Export a YOLO model to different formats using CLI

    Args:
        model_path: Path to YOLO model (.pt file)
        format: Export format (onnx, torchscript, openvino, etc.)
        imgsz: Image size for export

    Returns:
        On success, a dictionary with "status", "export_path" (None when
        the exported artifact could not be located next to the model),
        "format" and the CLI output truncated to 1000 characters. On
        failure, a dictionary with an "error" key (plus "command" and
        "stderr" when the CLI itself failed).
    """
    # Validate model path
    if not os.path.exists(model_path):
        return {"error": f"Model file not found: {model_path}"}

    # Single source of truth: supported formats and the suffix(es) the
    # exported artifact is expected to carry after the model stem.
    # CoreML lists two candidates: recent Ultralytics releases write an
    # .mlpackage bundle, and .mlmodel is kept for older releases.
    export_suffixes = {
        "torchscript": (".torchscript",),
        "onnx": (".onnx",),
        "openvino": ("_openvino_model",),
        "engine": (".engine",),
        "coreml": (".mlpackage", ".mlmodel"),
        "saved_model": ("_saved_model",),
        "pb": (".pb",),
        "tflite": (".tflite",),
        "edgetpu": ("_edgetpu.tflite",),
        "tfjs": ("_web_model",),
        "paddle": ("_paddle_model",),
    }

    if format not in export_suffixes:
        return {"error": f"Invalid export format: {format}. Valid formats include: {', '.join(export_suffixes)}"}

    # Build YOLO CLI command
    cmd_args = [
        "export",  # Combined task and mode for export
        f"model={model_path}",
        f"format={format}",
        f"imgsz={imgsz}"
    ]

    # Run YOLO CLI command
    result = run_yolo_cli(cmd_args, timeout=300)  # 5 minute timeout

    # Check for command success
    if not result["success"]:
        return {
            "error": f"Export failed: {result.get('error', 'Unknown error')}",
            "command": result.get("command", ""),
            "stderr": result.get("stderr", "")
        }

    # Look for the artifact next to the source model; os.path.exists is
    # true for files and directories alike, so one check covers both
    export_path = None
    base_path = os.path.splitext(model_path)[0]
    for suffix in export_suffixes[format]:
        candidate = base_path + suffix
        if os.path.exists(candidate):
            export_path = candidate
            break

    stdout = result.get("stdout", "")
    return {
        "status": "success",
        "export_path": export_path,
        "format": format,
        "export_output": stdout[:1000] + "..." if len(stdout) > 1000 else stdout
    }
1180 |
@mcp.tool()
def list_available_models() -> List[str]:
    """
    List available YOLO models that actually exist on disk in any configured directory

    Returns:
        Sorted, de-duplicated model file names found in CONFIG["model_dirs"].
        When nothing is found, a single-element list holding a hint that
        names the directories that were searched.
    """
    # Common YOLO model patterns
    model_patterns = ("yolov11*.pt", "yolov8*.pt")

    available_models = set()
    for directory in CONFIG["model_dirs"]:
        # A missing entry, or one that points at a regular file, has
        # nothing to list; os.listdir would raise on the latter
        if not os.path.isdir(directory):
            continue

        try:
            filenames = os.listdir(directory)
        except OSError as e:
            # One unreadable directory must not hide models in the others
            logger.warning(f"Could not list model directory {directory}: {str(e)}")
            continue

        for filename in filenames:
            if filename.endswith(".pt") and any(
                fnmatch.fnmatch(filename, pattern) for pattern in model_patterns
            ):
                available_models.add(filename)

    result = sorted(available_models)

    if not result:
        logger.warning("No model files found in configured directories.")
        return ["No models available - download models to any of these directories: " + ", ".join(CONFIG["model_dirs"])]

    return result
1211 |
# Camera detection functions using CLI instead of Python API
def _record_camera_startup_failure(error_msg, stderr_output=None, result_error=None):
    """Publish a camera startup failure to the module-level status variables.

    Clears ``camera_running``, stores a failed ``camera_startup_status`` and
    appends an error entry to ``detection_results``.

    Args:
        error_msg: Message stored in ``camera_startup_status["error"]``.
        stderr_output: Captured stderr text; added to both records when given.
        result_error: Message for the ``detection_results`` entry when it
            should differ from ``error_msg``.
    """
    global camera_running, camera_startup_status

    camera_running = False

    status = {"success": False, "error": error_msg}
    entry = {
        "timestamp": time.time(),
        "error": error_msg if result_error is None else result_error,
    }
    if stderr_output is not None:
        status["stderr"] = stderr_output
        entry["stderr"] = stderr_output
    status["timestamp"] = time.time()
    entry["camera_status"] = "error"
    entry["detections"] = []

    camera_startup_status = status
    detection_results.append(entry)


def camera_detection_thread(model_name, confidence, fps_limit=30, camera_id=0):
    """Background thread for camera detection using YOLO CLI

    Looks up ``model_name`` in CONFIG["model_dirs"], starts
    ``yolo <task> predict`` on the camera source and turns every JSON line
    the process prints into an entry of the module-level
    ``detection_results`` buffer (at most 10 entries are kept). The loop
    runs until ``camera_running`` is cleared or the subprocess exits;
    the subprocess is terminated on the way out.

    Args:
        model_name: Model file name to look up in the configured directories
        confidence: Confidence threshold passed to the CLI as ``conf=``
        fps_limit: Accepted for interface compatibility; not used here
        camera_id: Camera source passed to the CLI as ``source=``
    """
    global camera_running, detection_results, camera_last_access_time, camera_startup_status, camera_last_error

    # Bound before the try block so the finally clause can always inspect
    # it, including on the early returns taken before the subprocess exists
    process = None

    try:
        # Determine full model path
        model_path = None
        for directory in CONFIG["model_dirs"]:
            potential_path = os.path.join(directory, model_name)
            if os.path.exists(potential_path):
                model_path = potential_path
                break

        if model_path is None:
            error_msg = f"Model {model_name} not found in any configured directories"
            logger.error(error_msg)
            _record_camera_startup_failure(
                error_msg,
                result_error="Failed to load model: Model not found",
            )
            return

        # Log camera start
        logger.info(f"Starting camera detection with model {model_name}, camera ID {camera_id}")
        detection_results.append({
            "timestamp": time.time(),
            "system_info": {
                "os": platform.system() if 'platform' in globals() else "Unknown",
                "camera_id": camera_id
            },
            "camera_status": "starting",
            "detections": []
        })

        # Determine task type based on model name (same markers as
        # validate_model, first match wins)
        task = "detect"  # Default task
        if "seg" in model_name:
            task = "segment"
        elif "pose" in model_name:
            task = "pose"
        elif "cls" in model_name:
            task = "classify"
        elif "obb" in model_name:
            task = "obb"

        # Build YOLO CLI command
        base_cmd_args = [
            task,  # Task
            "predict",  # Mode
            f"model={model_path}",
            f"source={camera_id}",  # Camera source ID
            f"conf={confidence}",
            "format=json",
            "save=False",  # Don't save frames by default
            "show=False"  # Don't show GUI window
        ]

        # First verify YOLO command is available
        logger.info("Verifying YOLO CLI availability before starting camera...")
        check_cmd = ["yolo", "--version"]
        try:
            check_result = subprocess.run(
                check_cmd,
                capture_output=True,
                text=True,
                check=False,
                timeout=10
            )

            if check_result.returncode != 0:
                error_msg = f"YOLO CLI check failed with code {check_result.returncode}: {check_result.stderr}"
                logger.error(error_msg)
                _record_camera_startup_failure(error_msg)
                return

            logger.info(f"YOLO CLI is available: {check_result.stdout.strip()}")
        except Exception as e:
            error_msg = f"Error checking YOLO CLI: {str(e)}"
            logger.error(error_msg)
            _record_camera_startup_failure(error_msg)
            return

        # Counters for the capture loop
        frame_count = 0
        error_count = 0
        start_time = time.time()

        # Start YOLO CLI process
        cmd_str = "yolo " + " ".join(base_cmd_args)
        logger.info(f"Starting YOLO CLI process: {cmd_str}")

        try:
            # NOTE(review): stderr is piped but only read after the process
            # has exited; a process that writes a lot to stderr could block
            # on a full pipe - confirm against real CLI output volume
            process = subprocess.Popen(
                ["yolo"] + base_cmd_args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,  # Line buffered
            )

            # Wait a moment to check if the process immediately fails
            time.sleep(1)
            if process.poll() is not None:
                error_msg = f"YOLO process failed to start (exit code {process.returncode})"
                stderr_output = process.stderr.read()
                logger.error(f"{error_msg} - STDERR: {stderr_output}")
                _record_camera_startup_failure(error_msg, stderr_output=stderr_output)
                return

            # Process started successfully
            camera_startup_status = {
                "success": True,
                "timestamp": time.time()
            }

            # Handle camera stream
            while camera_running:
                # Read output line from process (blocks until a line arrives)
                stdout_line = process.stdout.readline().strip()

                if not stdout_line:
                    # Check if process is still running
                    if process.poll() is not None:
                        error_msg = f"YOLO process ended unexpectedly with code {process.returncode}"
                        stderr_output = process.stderr.read()
                        logger.error(f"{error_msg} - STDERR: {stderr_output}")

                        camera_running = False
                        camera_last_error = {
                            "error": error_msg,
                            "stderr": stderr_output,
                            "timestamp": time.time()
                        }
                        detection_results.append({
                            "timestamp": time.time(),
                            "error": error_msg,
                            "camera_status": "error",
                            "stderr": stderr_output,
                            "detections": []
                        })
                        break

                    time.sleep(0.1)  # Short sleep to avoid CPU spin
                    continue

                # Try to parse JSON output from YOLO
                try:
                    # Find JSON in the output line
                    json_start = stdout_line.find("{")
                    if json_start >= 0:
                        json_str = stdout_line[json_start:]
                        detection_data = json.loads(json_str)

                        frame_count += 1

                        # Process detection data
                        if "predictions" in detection_data:
                            detections = []

                            for pred in detection_data["predictions"]:
                                # Extract box coordinates
                                box = pred.get("box", {})
                                x1, y1, x2, y2 = box.get("x1", 0), box.get("y1", 0), box.get("x2", 0), box.get("y2", 0)

                                # Per-prediction score; named apart from the
                                # ``confidence`` threshold parameter
                                pred_confidence = pred.get("confidence", 0)
                                class_name = pred.get("name", "unknown")
                                class_id = pred.get("class", -1)

                                detections.append({
                                    "box": [x1, y1, x2, y2],
                                    "confidence": pred_confidence,
                                    "class_id": class_id,
                                    "class_name": class_name
                                })

                            # Update detection results (keep only the last 10)
                            if len(detection_results) >= 10:
                                detection_results.pop(0)

                            # Get image dimensions if available
                            image_shape = [
                                detection_data.get("width", 0),
                                detection_data.get("height", 0)
                            ]

                            detection_results.append({
                                "timestamp": time.time(),
                                "frame_count": frame_count,
                                "detections": detections,
                                "camera_status": "running",
                                "image_shape": image_shape
                            })

                        # Update last access time when processing frames
                        camera_last_access_time = time.time()

                        # Log occasional status
                        if frame_count % 30 == 0:
                            fps = frame_count / (time.time() - start_time)
                            logger.info(f"Camera running: processed {frame_count} frames ({fps:.1f} FPS)")
                            detection_count = sum(len(r.get("detections", [])) for r in detection_results if "detections" in r)
                            logger.info(f"Total detections in current buffer: {detection_count}")

                except json.JSONDecodeError:
                    # Not all lines will be valid JSON, that's normal
                    pass
                except Exception as e:
                    error_msg = f"Error processing camera output: {str(e)}"
                    logger.warning(error_msg)
                    error_count += 1

                    if error_count > 10:
                        logger.error("Too many processing errors, stopping camera")
                        camera_running = False
                        camera_last_error = {
                            "error": "Too many processing errors",
                            "timestamp": time.time()
                        }
                        break

        except Exception as e:
            error_msg = f"Error in camera process management: {str(e)}"
            logger.error(error_msg)
            _record_camera_startup_failure(error_msg)
            return

    except Exception as e:
        error_msg = f"Error in camera thread: {str(e)}"
        logger.error(error_msg)
        _record_camera_startup_failure(error_msg)

    finally:
        # Clean up
        logger.info("Shutting down camera...")
        camera_running = False

        if process is not None and process.poll() is None:
            try:
                # Terminate process
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()  # Force kill if terminate doesn't work
            except Exception as e:
                logger.error(f"Error terminating YOLO process: {str(e)}")

        logger.info("Camera detection stopped")
1531 |
1532 |
@mcp.tool()
def start_camera_detection(
    model_name: str = "yolov8n.pt",
    confidence: float = 0.25,
    camera_id: int = 0
) -> Dict[str, Any]:
    """
    Launch the background camera detection thread driven by the YOLO CLI

    Args:
        model_name: YOLO model name to use
        confidence: Detection confidence threshold
        camera_id: Camera device ID (0 is usually the default camera)

    Returns:
        A dictionary whose "status" is "success" when the thread was
        started (or was already running) and "error" when the CLI is
        unavailable or the thread reported a startup failure within the
        two-second grace period.
    """
    global camera_thread, camera_running, detection_results, camera_last_access_time, camera_startup_status, camera_last_error

    # Forget whatever a previous run reported
    camera_startup_status, camera_last_error = None, None

    # A running camera only gets its inactivity timer refreshed
    if camera_running:
        camera_last_access_time = time.time()
        return {"status": "success", "message": "Camera detection is already running"}

    # Start from an empty result buffer
    detection_results = []

    # The CLI has to answer a version query before a thread is spawned
    path_hint = "Please make sure the 'yolo' command is in your PATH"
    try:
        cli_probe = run_yolo_cli(["--version"], timeout=10)
        cli_ok = cli_probe["success"]
        probe_details = None if cli_ok else cli_probe.get("error", "Unknown error")
    except Exception as e:
        error_msg = f"Error checking YOLO CLI: {str(e)}"
        logger.error(error_msg)
        return {
            "status": "error",
            "message": error_msg,
            "solution": path_hint
        }

    if not cli_ok:
        return {
            "status": "error",
            "message": "YOLO CLI not available or not properly installed",
            "details": probe_details,
            "solution": path_hint
        }

    # Mark the camera as running before the worker starts, since the
    # worker's loop is conditioned on this flag
    camera_running = True
    camera_last_access_time = time.time()
    camera_thread = threading.Thread(
        target=camera_detection_thread,
        args=(model_name, confidence, 30, camera_id),
        daemon=True
    )
    camera_thread.start()

    # Grace period in which the worker may report a startup failure
    time.sleep(2)

    if camera_startup_status and not camera_startup_status.get("success", False):
        return {
            "status": "error",
            "message": "Camera detection failed to start",
            "details": camera_startup_status,
            "solution": "Check logs for detailed error information"
        }

    # No failure reported so far; describe what was started
    started = {
        "status": "success",
        "message": f"Started camera detection using model {model_name}",
        "model": model_name,
        "confidence": confidence,
        "camera_id": camera_id,
    }
    started["auto_shutdown"] = f"Camera will auto-shutdown after {CAMERA_INACTIVITY_TIMEOUT} seconds of inactivity"
    started["note"] = "If camera doesn't work, try different camera_id values (0, 1, or 2)"
    return started
1617 |
1618 |
@mcp.tool()
def stop_camera_detection() -> Dict[str, Any]:
    """
    Ask the camera detection thread to stop

    Clears the running flag and waits up to two seconds for the worker
    thread to finish.

    Returns:
        A "success" status dictionary when the camera was running,
        otherwise an "error" status dictionary.
    """
    global camera_running

    if camera_running:
        logger.info("Stopping camera detection by user request")
        camera_running = False

        # Bounded wait so the call returns even if the worker is slow to exit
        worker = camera_thread
        if worker and worker.is_alive():
            worker.join(timeout=2.0)

        return {
            "status": "success",
            "message": "Stopped camera detection"
        }

    return {"status": "error", "message": "Camera detection is not running"}
1643 |
@mcp.tool()
def get_camera_detections() -> Dict[str, Any]:
    """
    Get the latest detections from the camera

    Calling this while the camera is running also refreshes the
    inactivity timer.

    Returns:
        Dictionary with recent detections while the camera is running.
        Otherwise an "error" status dictionary describing why nothing is
        available; it carries "startup_error" after a failed start and
        "last_error" when the worker recorded a runtime failure.
    """
    global detection_results, camera_thread, camera_last_access_time, camera_startup_status, camera_last_error

    # Update the last access time whenever this function is called
    if camera_running:
        camera_last_access_time = time.time()

    # Check if thread is alive
    thread_alive = camera_thread is not None and camera_thread.is_alive()

    # One snapshot of the shared buffer so that "detections" and "count"
    # agree even if the worker appends while the response is being built
    results_snapshot = list(detection_results)

    # If camera_running is False, check if we have startup status information
    if not camera_running and camera_startup_status and not camera_startup_status.get("success", False):
        return {
            "status": "error",
            "message": "Camera detection failed to start",
            "is_running": False,
            "camera_status": "error",
            "startup_error": camera_startup_status,
            "solution": "Check logs for detailed error information"
        }

    # If camera_running is True but thread is dead, there's an issue
    if camera_running and not thread_alive:
        return {
            "status": "error",
            "message": "Camera thread has stopped unexpectedly",
            "is_running": False,
            "camera_status": "error",
            "thread_alive": thread_alive,
            "last_error": camera_last_error,
            "detections": results_snapshot,
            "count": len(results_snapshot),
            "solution": "Please try restart the camera with a different camera_id"
        }

    if not camera_running:
        stopped = {
            "status": "error",
            "message": "Camera detection is not running",
            "is_running": False,
            "camera_status": "stopped"
        }
        # The worker clears camera_running itself when it dies, so this
        # branch is where a recorded runtime failure has to be surfaced
        if camera_last_error is not None:
            stopped["last_error"] = camera_last_error
        return stopped

    # Check for errors in detection results
    errors = [result.get("error") for result in results_snapshot if "error" in result]
    recent_errors = errors[-5:] if errors else []

    # Count actual detections
    detection_count = sum(len(result.get("detections", [])) for result in results_snapshot if "detections" in result)

    return {
        "status": "success",
        "is_running": camera_running,
        "thread_alive": thread_alive,
        "detections": results_snapshot,
        "count": len(results_snapshot),
        "total_detections": detection_count,
        "recent_errors": recent_errors if recent_errors else None,
        "camera_status": "error" if recent_errors else "running",
        "inactivity_timeout": {
            "seconds_remaining": int(CAMERA_INACTIVITY_TIMEOUT - (time.time() - camera_last_access_time)),
            "last_access": camera_last_access_time
        }
    }
1715 |
1716 |
@mcp.tool()
def comprehensive_image_analysis(
    image_path: str,
    confidence: float = 0.25,
    save_results: bool = False
) -> Dict[str, Any]:
    """
    Perform comprehensive analysis on an image by combining multiple CLI model results

    Runs object detection, image classification and (when the pose model
    is present in a configured directory) pose detection, then derives a
    list of identified tasks from the combined results.

    Args:
        image_path: Path to the image file
        confidence: Detection confidence threshold
        save_results: Whether to save results to disk

    Returns:
        Dictionary containing comprehensive analysis results. Failures of
        the classification or pose step are reported under "scene_error"
        / "pose_error" inside "analysis" instead of failing the call.
    """
    try:
        if not os.path.exists(image_path):
            return {"error": f"Image file not found: {image_path}"}

        analysis_results = {}

        # 1. Object detection
        logger.info("Running object detection for comprehensive analysis")
        object_result = detect_objects(
            image_data=image_path,
            model_name="yolov11n.pt",
            confidence=confidence,
            save_results=save_results,
            is_path=True
        )

        # Process object detection results
        detected_objects = []
        if "results" in object_result and object_result["results"]:
            for result in object_result["results"]:
                for obj in result.get("detections", []):
                    detected_objects.append({
                        "class_name": obj.get("class_name", "unknown"),
                        "confidence": obj.get("confidence", 0)
                    })
        analysis_results["objects"] = detected_objects

        # 2. Scene classification
        try:
            logger.info("Running classification for comprehensive analysis")
            cls_result = classify_image(
                image_data=image_path,
                model_name="yolov8n-cls.pt",
                top_k=3,
                save_results=False,
                is_path=True
            )

            scene_classifications = []
            if "results" in cls_result and cls_result["results"]:
                for result in cls_result["results"]:
                    for cls in result.get("classifications", []):
                        scene_classifications.append({
                            "class_name": cls.get("class_name", "unknown"),
                            "probability": cls.get("probability", 0)
                        })
            analysis_results["scene"] = scene_classifications
        except Exception as e:
            logger.error(f"Error during scene classification: {str(e)}")
            analysis_results["scene_error"] = str(e)

        # 3. Human pose detection (if pose model is available)
        try:
            # Locate the pose model and keep the path that was found, so
            # the CLI runs the very file whose existence was checked
            pose_model_path = None
            for directory in CONFIG["model_dirs"]:
                candidate = os.path.join(directory, "yolov8n-pose.pt")
                if os.path.exists(candidate):
                    pose_model_path = candidate
                    break

            if pose_model_path is not None:
                logger.info("Running pose detection for comprehensive analysis")
                # Build YOLO CLI command for pose detection
                cmd_args = [
                    "pose",  # Task
                    "predict",  # Mode
                    f"model={pose_model_path}",
                    f"source={image_path}",
                    f"conf={confidence}",
                    "format=json",
                ]

                result = run_yolo_cli(cmd_args)

                if result["success"]:
                    # Parse JSON output
                    json_start = result["stdout"].find("{")
                    json_end = result["stdout"].rfind("}")

                    if json_start >= 0 and json_end > json_start:
                        json_str = result["stdout"][json_start:json_end+1]
                        pose_data = json.loads(json_str)

                        detected_poses = []
                        if "predictions" in pose_data:
                            for pred in pose_data["predictions"]:
                                keypoints = pred.get("keypoints", [])

                                detected_poses.append({
                                    "person_confidence": pred.get("confidence", 0),
                                    "has_keypoints": len(keypoints) if keypoints else 0
                                })

                        analysis_results["poses"] = detected_poses
                else:
                    # Report the failed run instead of dropping it silently
                    analysis_results["pose_error"] = f"Pose detection failed: {result.get('error', 'Unknown error')}"
            else:
                analysis_results["pose_error"] = "Pose model not available"

        except Exception as e:
            logger.error(f"Error during pose detection: {str(e)}")
            analysis_results["pose_error"] = str(e)

        # 4. Comprehensive task description
        tasks = []

        # Detect main objects
        main_objects = [obj["class_name"] for obj in detected_objects if obj["confidence"] > 0.5]
        if "person" in main_objects:
            tasks.append("Person Detection")

        # Check for weapon objects
        weapon_objects = ["sword", "knife", "katana", "gun", "pistol", "rifle"]
        weapons = [obj for obj in main_objects if any(weapon in obj.lower() for weapon in weapon_objects)]
        if weapons:
            tasks.append(f"Weapon Detection ({', '.join(weapons)})")

        # Count people
        person_count = main_objects.count("person")
        if person_count > 0:
            tasks.append(f"Person Count ({person_count} people)")

        # Pose analysis
        if "poses" in analysis_results and analysis_results["poses"]:
            tasks.append("Human Pose Analysis")

        # Scene classification
        if "scene" in analysis_results and analysis_results["scene"]:
            scene_types = [scene["class_name"] for scene in analysis_results["scene"][:2]]
            tasks.append(f"Scene Classification ({', '.join(scene_types)})")

        analysis_results["identified_tasks"] = tasks

        # Return comprehensive results
        return {
            "status": "success",
            "image_path": image_path,
            "analysis": analysis_results,
            "summary": "Tasks identified in the image: " + ", ".join(tasks) if tasks else "No clear tasks identified"
        }
    except Exception as e:
        return {
            "status": "error",
            "image_path": image_path,
            "error": f"Comprehensive analysis failed: {str(e)}"
        }
1879 |
@mcp.tool()
def analyze_image_from_path(
    image_path: str,
    model_name: str = "yolov8n.pt",
    confidence: float = 0.25,
    save_results: bool = False
) -> Dict[str, Any]:
    """
    Run object detection on an image that is given as a file path

    Thin convenience wrapper around detect_objects with is_path=True.

    Args:
        image_path: Path to the image file
        model_name: YOLO model name
        confidence: Detection confidence threshold
        save_results: Whether to save results to disk

    Returns:
        Whatever detect_objects returns, or a dictionary with "error" and
        "image_path" if detect_objects raised.
    """
    # The path travels through the image_data parameter; is_path marks it as such
    detection_kwargs = {
        "image_data": image_path,
        "model_name": model_name,
        "confidence": confidence,
        "save_results": save_results,
        "is_path": True,
    }

    try:
        return detect_objects(**detection_kwargs)
    except Exception as e:
        return {
            "error": f"Failed to analyze image: {str(e)}",
            "image_path": image_path
        }
1913 |
@mcp.tool()
def test_connection() -> Dict[str, Any]:
    """
    Report service status together with the YOLO CLI version

    Returns:
        Dictionary with a status line, the CLI version string (or a
        description of why it could not be obtained), the models found on
        disk, and the tool and feature lists.
    """
    # Ask the CLI for its version; any problem becomes a descriptive string
    try:
        probe = run_yolo_cli(["--version"], timeout=10)
        if probe.get("success"):
            raw_version = probe.get("stdout", "Unknown")
        else:
            raw_version = "Not available"

        # Only output that mentions "ultralytics" is accepted as a version
        if "ultralytics" in raw_version.lower():
            yolo_version = raw_version.strip()
        else:
            yolo_version = "YOLO CLI not found or not responding correctly"
    except Exception as e:
        yolo_version = f"Error checking YOLO CLI: {str(e)}"

    tool_names = [
        "list_available_models", "detect_objects", "segment_objects",
        "classify_image", "track_objects", "train_model", "validate_model",
        "export_model", "start_camera_detection", "stop_camera_detection",
        "get_camera_detections", "test_connection",
        # Additional tools
        "analyze_image_from_path",
        "comprehensive_image_analysis"
    ]
    feature_notes = [
        "All detection functions use YOLO CLI rather than Python API",
        "Support for loading images directly from file paths",
        "Support for comprehensive image analysis with task identification",
        "Support for camera detection using YOLO CLI"
    ]

    return {
        "status": "YOLO CLI service is running normally",
        "yolo_version": yolo_version,
        "available_models": list_available_models(),
        "available_tools": tool_names,
        "features": feature_notes
    }
1955 |
def cleanup_resources():
    """Stop the camera, if it is running, as part of server shutdown"""
    global camera_running

    logger.info("Cleaning up resources...")

    if camera_running:
        logger.info("Shutting down camera during server exit")
        camera_running = False

        # Bounded wait for the worker thread to notice the cleared flag
        worker = camera_thread
        if worker and worker.is_alive():
            worker.join(timeout=2.0)

    logger.info("Cleanup complete")
1972 |
def signal_handler(sig, frame):
    """Log the received signal, release resources and exit with status 0"""
    shutdown_notice = f"Received signal {sig}, shutting down..."
    logger.info(shutdown_notice)
    cleanup_resources()
    raise SystemExit(0)
1978 |
def start_watchdog():
    """Spawn the camera watchdog as a daemon thread and return the thread"""
    monitor = threading.Thread(target=camera_watchdog_thread, daemon=True)
    monitor.start()
    return monitor
1987 |
# Shutdown hooks: SIGINT and SIGTERM go through signal_handler, and a normal
# interpreter exit runs cleanup_resources
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
atexit.register(cleanup_resources)
1992 |
# Script entry point: probe the CLI, start the watchdog, then serve over stdio
if __name__ == "__main__":
    import platform

    logger.info("Starting YOLO CLI service")
    logger.info(f"Platform: {platform.system()} {platform.release()}")

    # A missing or failing CLI is logged but does not prevent startup
    try:
        test_result = run_yolo_cli(["--version"], timeout=10)
        if not test_result["success"]:
            logger.warning(f"YOLO CLI test failed: {test_result.get('stderr', '')}")
            logger.warning("Service may not function correctly without YOLO CLI available")
        else:
            logger.info(f"YOLO CLI available: {test_result.get('stdout', '').strip()}")
    except Exception as e:
        logger.error(f"Error testing YOLO CLI: {str(e)}")
        logger.warning("Service may not function correctly without YOLO CLI available")

    # Background thread that stops an idle camera
    watchdog_thread = start_watchdog()

    logger.info("Starting MCP server...")
    mcp.run(transport='stdio')
```