# Directory Structure
```
├── .gitignore
├── LICENSE
├── mcp-config.json
├── Readme.md
├── requirements.txt
├── server_cli.py
├── server_combine_terminal.py
├── server.py
└── setup.py
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
yolo_service.log
*.pt
runs/
```
--------------------------------------------------------------------------------
/Readme.md:
--------------------------------------------------------------------------------
```markdown
# YOLO MCP Service
A YOLO (You Only Look Once) computer vision service that integrates with Claude AI through the Model Context Protocol (MCP). It lets Claude perform object detection, segmentation, classification, and real-time camera analysis using YOLO models.

## Features
- Object detection, segmentation, classification, and pose estimation
- Real-time camera integration for live object detection
- Support for model training, validation, and export
- Comprehensive image analysis combining multiple models
- Support for both file paths and base64-encoded images
- Seamless integration with Claude AI
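As a rough illustration of the two input forms mentioned above, the sketch below turns an image file into a base64 string and back using only the standard library. The helper names `encode_image_file` and `decode_image_data` are hypothetical and are not part of the service.

```python
import base64


def encode_image_file(path: str) -> str:
    """Read a file and return its contents as a base64 string (hypothetical helper)."""
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("ascii")


def decode_image_data(image_data: str) -> bytes:
    """Turn a base64 string back into the raw image bytes (hypothetical helper)."""
    return base64.b64decode(image_data)
```

A string produced this way would be passed as `image_data` with `is_path` left at `false`; a plain file path would be passed with `is_path` set to `true`.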
## Setup Instructions
### Prerequisites
- Python 3.10 or higher
- Git (optional, for cloning the repository)
### Environment Setup
1. Create a directory for the project and navigate to it:
```bash
mkdir yolo-mcp-service
cd yolo-mcp-service
```
2. Download the project files or clone the repository:
```bash
# If you have the files, copy them to this directory
# If using git:
git clone https://github.com/GongRzhe/YOLO-MCP-Server.git .
```
3. Create a virtual environment:
```bash
# On Windows
python -m venv .venv
# On macOS/Linux
python3 -m venv .venv
```
4. Activate the virtual environment:
```bash
# On Windows
.venv\Scripts\activate
# On macOS/Linux
source .venv/bin/activate
```
5. Run the setup script:
```bash
python setup.py
```
The setup script will:
- Check your Python version
- Create a virtual environment (if not already created)
- Install required dependencies
- Generate an MCP configuration file (mcp-config.json)
- Output configuration information for different MCP clients including Claude
6. Note the output from the setup script, which will look similar to:
```
MCP configuration has been written to: /path/to/mcp-config.json
MCP configuration for Cursor:
/path/to/.venv/bin/python /path/to/server.py
MCP configuration for Windsurf/Claude Desktop:
{
  "mcpServers": {
    "yolo-service": {
      "command": "/path/to/.venv/bin/python",
      "args": [
        "/path/to/server.py"
      ],
      "env": {
        "PYTHONPATH": "/path/to"
      }
    }
  }
}
To use with Claude Desktop, merge this configuration into: /path/to/claude_desktop_config.json
```
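The interpreter path in this output depends on the platform: the virtual environment keeps its executables under `Scripts` on Windows and under `bin` elsewhere. A minimal sketch of that rule follows; the function name `venv_python` is an assumption made for illustration.

```python
import os
import platform


def venv_python(venv_path: str, system: str) -> str:
    """Return the interpreter path inside a virtual environment.

    `system` is the value of platform.system(), e.g. "Windows", "Linux", "Darwin".
    """
    if system == "Windows":
        return os.path.join(venv_path, "Scripts", "python.exe")
    return os.path.join(venv_path, "bin", "python")


if __name__ == "__main__":
    # Print the interpreter path for a .venv in the current directory
    print(venv_python(os.path.abspath(".venv"), platform.system()))
```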
### Downloading YOLO Models
Before using the service, you need to download the YOLO models. The service looks for models in the following directories:
- The current directory where the service is running
- A `models` subdirectory
- Any other directory configured in the `CONFIG["model_dirs"]` variable in server.py
Create a models directory and download some common models:
```bash
# Create models directory
mkdir models
# Download YOLOv8n for basic object detection
curl -L https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt -o models/yolov8n.pt
# Download YOLOv8n-seg for segmentation
curl -L https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-seg.pt -o models/yolov8n-seg.pt
# Download YOLOv8n-cls for classification
curl -L https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-cls.pt -o models/yolov8n-cls.pt
# Download YOLOv8n-pose for pose estimation
curl -L https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-pose.pt -o models/yolov8n-pose.pt
```
For Windows PowerShell users:
```powershell
# Create models directory
mkdir models
# Download models using Invoke-WebRequest
Invoke-WebRequest -Uri "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt" -OutFile "models/yolov8n.pt"
Invoke-WebRequest -Uri "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-seg.pt" -OutFile "models/yolov8n-seg.pt"
Invoke-WebRequest -Uri "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-cls.pt" -OutFile "models/yolov8n-cls.pt"
Invoke-WebRequest -Uri "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-pose.pt" -OutFile "models/yolov8n-pose.pt"
```
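The directory search described above can be pictured as a first-match lookup. The sketch below is a simplified illustration, not the server's exact code: `find_model` is a hypothetical name, and it checks with `os.path.isfile`, which is an assumption.

```python
import os
from typing import List, Optional


def find_model(model_name: str, model_dirs: List[str]) -> Optional[str]:
    """Return the first existing path for model_name, or None if it is not found."""
    for directory in model_dirs:
        candidate = os.path.join(directory, model_name)
        if os.path.isfile(candidate):
            return candidate
    return None


if __name__ == "__main__":
    # Look in the current directory first, then in ./models
    print(find_model("yolov8n.pt", [".", "./models"]))
```

Because the first match wins, a model placed in the current directory shadows a file with the same name in `models`.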
### Configuring Claude
To use this service with Claude:
1. For Claude web: Set up the service on your local machine and use the configuration provided by the setup script in your MCP client.
2. For Claude Desktop:
   - Run the setup script and note the configuration output
   - Locate your Claude Desktop configuration file (the path is provided in the setup script output)
   - Add or merge the configuration into your Claude Desktop configuration file
   - Restart Claude Desktop
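If the Claude Desktop configuration file already lists other servers, the merge step can be done by hand or with a few lines of Python. The sketch below is one possible approach under stated assumptions: `merge_mcp_servers` is a hypothetical helper, and it assumes both files keep their servers under the `mcpServers` key as in the setup script output.

```python
import json
from typing import Any, Dict


def merge_mcp_servers(existing: Dict[str, Any], generated: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `existing` with the servers from `generated` added.

    Entries with the same server name are overwritten by `generated`;
    every other key in `existing` is kept unchanged.
    """
    merged = dict(existing)
    servers = dict(existing.get("mcpServers", {}))
    servers.update(generated.get("mcpServers", {}))
    merged["mcpServers"] = servers
    return merged


if __name__ == "__main__":
    # Small in-memory demonstration with placeholder paths
    current = {"mcpServers": {"other-service": {"command": "other"}}}
    new = {"mcpServers": {"yolo-service": {"command": "/path/to/.venv/bin/python",
                                           "args": ["/path/to/server.py"]}}}
    print(json.dumps(merge_mcp_servers(current, new), indent=2))
```

Keeping a backup of the original configuration file before writing the merged result is a sensible precaution.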
## Using YOLO Tools in Claude
### 1. First Check Available Models
Always check which models are available on your system first:
```
I'd like to use the YOLO tools. Can you first check which models are available on my system?
<function_calls>
<invoke name="list_available_models">
</invoke>
</function_calls>
```
### 2. Detecting Objects in an Image
For analyzing an image file on your computer:
```
Can you analyze this image file for objects?
<function_calls>
<invoke name="analyze_image_from_path">
<parameter name="image_path">/path/to/your/image.jpg</parameter>
<parameter name="confidence">0.3</parameter>
</invoke>
</function_calls>
```
You can also specify a different model:
```
Can you analyze this image using a different model?
<function_calls>
<invoke name="analyze_image_from_path">
<parameter name="image_path">/path/to/your/image.jpg</parameter>
<parameter name="model_name">yolov8n.pt</parameter>
<parameter name="confidence">0.4</parameter>
</invoke>
</function_calls>
```
### 3. Running Comprehensive Image Analysis
For more detailed analysis that combines object detection, classification, and more:
```
Can you perform a comprehensive analysis on this image?
<function_calls>
<invoke name="comprehensive_image_analysis">
<parameter name="image_path">/path/to/your/image.jpg</parameter>
<parameter name="confidence">0.3</parameter>
</invoke>
</function_calls>
```
### 4. Image Segmentation
For identifying object boundaries and creating segmentation masks:
```
Can you perform image segmentation on this photo?
<function_calls>
<invoke name="segment_objects">
<parameter name="image_data">/path/to/your/image.jpg</parameter>
<parameter name="is_path">true</parameter>
<parameter name="model_name">yolov8n-seg.pt</parameter>
</invoke>
</function_calls>
```
### 5. Image Classification
For classifying the entire image content:
```
What does this image show? Can you classify it?
<function_calls>
<invoke name="classify_image">
<parameter name="image_data">/path/to/your/image.jpg</parameter>
<parameter name="is_path">true</parameter>
<parameter name="model_name">yolov8n-cls.pt</parameter>
<parameter name="top_k">5</parameter>
</invoke>
</function_calls>
```
### 6. Using Your Computer's Camera
Start real-time object detection using your computer's camera:
```
Can you turn on my camera and detect objects in real-time?
<function_calls>
<invoke name="start_camera_detection">
<parameter name="model_name">yolov8n.pt</parameter>
<parameter name="confidence">0.3</parameter>
</invoke>
</function_calls>
```
Get the latest camera detections:
```
What are you seeing through my camera right now?
<function_calls>
<invoke name="get_camera_detections">
</invoke>
</function_calls>
```
Stop the camera when finished:
```
Please turn off the camera.
<function_calls>
<invoke name="stop_camera_detection">
</invoke>
</function_calls>
```
### 7. Advanced Model Operations
#### Training a Custom Model
```
I want to train a custom object detection model on my dataset.
<function_calls>
<invoke name="train_model">
<parameter name="dataset_path">/path/to/your/dataset</parameter>
<parameter name="model_name">yolov8n.pt</parameter>
<parameter name="epochs">50</parameter>
</invoke>
</function_calls>
```
#### Validating a Model
```
Can you validate the performance of my model on a test dataset?
<function_calls>
<invoke name="validate_model">
<parameter name="model_path">/path/to/your/trained/model.pt</parameter>
<parameter name="data_path">/path/to/validation/dataset</parameter>
</invoke>
</function_calls>
```
#### Exporting a Model to Different Formats
```
I need to export my YOLO model to ONNX format.
<function_calls>
<invoke name="export_model">
<parameter name="model_path">/path/to/your/model.pt</parameter>
<parameter name="format">onnx</parameter>
</invoke>
</function_calls>
```
### 8. Testing Connection
Check if the YOLO service is running correctly:
```
Is the YOLO service running correctly?
<function_calls>
<invoke name="test_connection">
</invoke>
</function_calls>
```
## Troubleshooting
### Camera Issues
If the camera doesn't work, try different camera IDs:
```
<function_calls>
<invoke name="start_camera_detection">
<parameter name="camera_id">1</parameter> <!-- Try 0, 1, or 2 -->
</invoke>
</function_calls>
```
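To find out which camera ID works before asking Claude to start detection, the IDs can be probed locally. The sketch below is a minimal illustration, assuming `opencv-python` is installed; `first_working_camera` is a hypothetical helper and is not one of the service's tools.

```python
from typing import Any, Callable, Iterable, Optional


def _open_with_opencv(camera_id: int) -> Any:
    # Imported lazily so the helper can be exercised without OpenCV installed
    import cv2
    return cv2.VideoCapture(camera_id)


def first_working_camera(
    candidate_ids: Iterable[int] = (0, 1, 2),
    open_capture: Callable[[int], Any] = _open_with_opencv,
) -> Optional[int]:
    """Return the first camera ID that opens and delivers a frame, else None."""
    for camera_id in candidate_ids:
        capture = open_capture(camera_id)
        try:
            if capture.isOpened():
                ok, _frame = capture.read()
                if ok:
                    return camera_id
        finally:
            # Always release the device so later callers can open it
            capture.release()
    return None


if __name__ == "__main__":
    print(first_working_camera())
```

The ID it returns can then be passed as `camera_id` to `start_camera_detection`.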
### Model Not Found
If a model is not found, make sure you've downloaded it to one of the configured directories:
```
<function_calls>
<invoke name="get_model_directories">
</invoke>
</function_calls>
```
### Performance Issues
For better performance with limited resources, use the smaller models (e.g., yolov8n.pt instead of yolov8x.pt).
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
mcp
ultralytics
opencv-python
numpy
pillow
```
--------------------------------------------------------------------------------
/mcp-config.json:
--------------------------------------------------------------------------------
```json
{
  "mcpServers": {
    "yolo-service": {
      "command": "D:\\BackDataService\\YOLO-MCP-Server\\.venv\\Scripts\\python.exe",
      "args": [
        "D:\\BackDataService\\YOLO-MCP-Server\\server.py"
      ],
      "env": {
        "PYTHONPATH": "D:\\BackDataService\\YOLO-MCP-Server"
      }
    }
  }
}
```
--------------------------------------------------------------------------------
/setup.py:
--------------------------------------------------------------------------------
```python
# Import necessary Python standard libraries
import os # For operating with file system, handling files and directory paths
import json # For processing JSON format data
import subprocess # For creating and managing subprocesses
import sys # For accessing Python interpreter related variables and functions
import platform # For getting current operating system information
def setup_venv():
    """
    Function to set up Python virtual environment
    Features:
    - Checks if Python version meets requirements (3.10+)
    - Creates Python virtual environment (if it doesn't exist)
    - Installs required dependencies in the newly created virtual environment
    No parameters required
    Returns: Path to Python interpreter in the virtual environment
    """
    # Check Python version
    python_version = sys.version_info
    if python_version.major < 3 or (python_version.major == 3 and python_version.minor < 10):
        print("Error: Python 3.10 or higher is required.")
        sys.exit(1)
    # Get absolute path of the directory containing the current script
    base_path = os.path.abspath(os.path.dirname(__file__))
    # Set virtual environment directory path, will create a directory named '.venv' under base_path
    venv_path = os.path.join(base_path, '.venv')
    # Flag whether a new virtual environment was created
    venv_created = False
    # Check if virtual environment already exists
    if not os.path.exists(venv_path):
        print("Creating virtual environment...")
        # Use Python's venv module to create virtual environment
        # sys.executable gets the path of the current Python interpreter
        subprocess.run([sys.executable, '-m', 'venv', venv_path], check=True)
        print("Virtual environment created successfully!")
        venv_created = True
    else:
        print("Virtual environment already exists.")
    # Determine pip and python executable paths based on operating system
    is_windows = platform.system() == "Windows"
    if is_windows:
        pip_path = os.path.join(venv_path, 'Scripts', 'pip.exe')
        python_path = os.path.join(venv_path, 'Scripts', 'python.exe')
    else:
        pip_path = os.path.join(venv_path, 'bin', 'pip')
        python_path = os.path.join(venv_path, 'bin', 'python')
    # Install or update dependencies
    print("\nInstalling requirements...")
    # Create requirements.txt with necessary packages for YOLO MCP server
    requirements = [
        "mcp",  # Model Context Protocol for server
        "ultralytics",  # YOLO models
        "opencv-python",  # For camera operations
        "numpy",  # Numerical operations
        "pillow"  # Image processing
    ]
    requirements_path = os.path.join(base_path, 'requirements.txt')
    with open(requirements_path, 'w') as f:
        f.write('\n'.join(requirements))
    # Update pip using the python executable (more reliable method)
    try:
        subprocess.run([python_path, '-m', 'pip', 'install', '--upgrade', 'pip'], check=True)
        print("Pip upgraded successfully.")
    except subprocess.CalledProcessError:
        print("Warning: Pip upgrade failed, continuing with existing version.")
    # Install requirements
    subprocess.run([pip_path, 'install', '-r', requirements_path], check=True)
    print("Requirements installed successfully!")
    return python_path
def generate_mcp_config(python_path):
    """
    Function to generate MCP (Model Context Protocol) configuration file for YOLO service
    Features:
    - Creates configuration containing Python interpreter path and server script path
    - Saves configuration as JSON format file
    - Prints configuration information for different MCP clients
    Parameters:
    - python_path: Path to Python interpreter in the virtual environment
    Returns: None
    """
    # Get absolute path of the directory containing the current script
    base_path = os.path.abspath(os.path.dirname(__file__))
    # Path to YOLO MCP server script
    server_script_path = os.path.join(base_path, 'server.py')
    # Create MCP configuration dictionary
    config = {
        "mcpServers": {
            "yolo-service": {
                "command": python_path,
                "args": [server_script_path],
                "env": {
                    "PYTHONPATH": base_path
                }
            }
        }
    }
    # Save configuration to JSON file
    config_path = os.path.join(base_path, 'mcp-config.json')
    with open(config_path, 'w') as f:
        json.dump(config, f, indent=2)  # indent=2 gives the JSON file good formatting
    # Print configuration information
    print(f"\nMCP configuration has been written to: {config_path}")
    print(f"\nMCP configuration for Cursor:\n\n{python_path} {server_script_path}")
    print("\nMCP configuration for Windsurf/Claude Desktop:")
    print(json.dumps(config, indent=2))
    # Provide instructions for adding configuration to Claude Desktop configuration file
    if platform.system() == "Windows":
        claude_config_path = os.path.expandvars("%APPDATA%\\Claude\\claude_desktop_config.json")
    else:  # macOS path; this branch is also taken on any other non-Windows system
        claude_config_path = os.path.expanduser("~/Library/Application Support/Claude/claude_desktop_config.json")
    print(f"\nTo use with Claude Desktop, merge this configuration into: {claude_config_path}")
# Code executed when the script is run directly (not imported)
if __name__ == '__main__':
    # Execute main functions in sequence:
    # 1. Set up virtual environment and install dependencies
    python_path = setup_venv()
    # 2. Generate MCP configuration file
    generate_mcp_config(python_path)
    print("\nSetup complete! You can now use the YOLO MCP server with compatible clients.")
```
--------------------------------------------------------------------------------
/server_combine_terminal.py:
--------------------------------------------------------------------------------
```python
# server_combine_terminal.py - CLI version (returns the command string only)
import fnmatch
import os
import base64
import time
import threading
import json
import tempfile
import platform
from io import BytesIO
from typing import List, Dict, Any, Optional, Union
import numpy as np
from PIL import Image
from mcp.server.fastmcp import FastMCP
# Set up logging configuration
import os.path
import sys
import logging
import contextlib
import signal
import atexit
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("yolo_service.log"),
        logging.StreamHandler(sys.stderr)
    ]
)
camera_startup_status = None # Will store error details if startup fails
camera_last_error = None
logger = logging.getLogger('yolo_service')
# Global variables for camera control
camera_running = False
camera_thread = None
detection_results = []
camera_last_access_time = 0
CAMERA_INACTIVITY_TIMEOUT = 60 # Auto-shutdown after 60 seconds of inactivity
def load_image(image_source, is_path=False):
    """
    Load image from file path or base64 data
    Args:
        image_source: File path or base64 encoded image data
        is_path: Whether image_source is a file path
    Returns:
        PIL Image object
    """
    try:
        if is_path:
            # Load image from file path
            if os.path.exists(image_source):
                return Image.open(image_source)
            else:
                raise FileNotFoundError(f"Image file not found: {image_source}")
        else:
            # Load image from base64 data
            image_bytes = base64.b64decode(image_source)
            return Image.open(BytesIO(image_bytes))
    except Exception as e:
        raise ValueError(f"Failed to load image: {str(e)}")
# Modified function to just return the command string
def run_yolo_cli(command_args, capture_output=True, timeout=60):
    """
    Return the YOLO CLI command string without executing it
    Args:
        command_args: List of command arguments to pass to yolo CLI
        capture_output: Not used, kept for compatibility with original function
        timeout: Not used, kept for compatibility with original function
    Returns:
        Dictionary containing the command string
    """
    # Build the complete command
    cmd = ["yolo"] + command_args
    cmd_str = " ".join(cmd)
    # Log the command
    logger.info(f"Would run YOLO CLI command: {cmd_str}")
    # Return the command string in a similar structure as the original function
    return {
        "success": True,
        "command": cmd_str,
        "would_execute": True,
        "note": "CLI execution disabled, showing command only"
    }
# Create MCP server
mcp = FastMCP("YOLO_Service")
# Global configuration
CONFIG = {
    "model_dirs": [
        ".",  # Current directory
        "./models",  # Models subdirectory
        os.path.join(os.path.dirname(os.path.abspath(__file__)), "models"),
    ]
}
# Function to save base64 data to temp file
def save_base64_to_temp(base64_data, prefix="image", suffix=".jpg"):
    """Save base64 encoded data to a temporary file and return the path"""
    try:
        # Decode first so that invalid input does not leave an empty temp file behind
        image_data = base64.b64decode(base64_data)
        # Create a temporary file
        fd, temp_path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
        # Write data to file
        with os.fdopen(fd, 'wb') as temp_file:
            temp_file.write(image_data)
        return temp_path
    except Exception as e:
        logger.error(f"Error saving base64 to temp file: {str(e)}")
        raise ValueError(f"Failed to save base64 data: {str(e)}")
@mcp.tool()
def get_model_directories() -> Dict[str, Any]:
    """Get information about configured model directories and available models"""
    directories = []
    for directory in CONFIG["model_dirs"]:
        dir_info = {
            "path": directory,
            "exists": os.path.exists(directory),
            "is_directory": os.path.isdir(directory) if os.path.exists(directory) else False,
            "models": []
        }
        if dir_info["exists"] and dir_info["is_directory"]:
            for filename in os.listdir(directory):
                if filename.endswith(".pt"):
                    dir_info["models"].append(filename)
        directories.append(dir_info)
    return {
        "configured_directories": CONFIG["model_dirs"],
        "directory_details": directories,
        "available_models": list_available_models(),
        "loaded_models": []  # No longer track loaded models with CLI approach
    }
@mcp.tool()
def detect_objects(
    image_data: str,
    model_name: str = "yolov8n.pt",
    confidence: float = 0.25,
    save_results: bool = False,
    is_path: bool = False
) -> Dict[str, Any]:
    """
    Return the YOLO CLI command for object detection without executing it
    Args:
        image_data: Base64 encoded image or file path (if is_path=True)
        model_name: YOLO model name
        confidence: Detection confidence threshold
        save_results: Whether to save results to disk
        is_path: Whether image_data is a file path
    Returns:
        Dictionary containing command that would be executed
    """
    try:
        # Determine source path
        if is_path:
            source_path = image_data
            if not os.path.exists(source_path):
                return {
                    "error": f"Image file not found: {source_path}",
                    "source": source_path
                }
        else:
            # For base64, we would save to temp file, but we'll just indicate this
            source_path = "[temp_file_from_base64]"
        # Determine full model path
        model_path = None
        for directory in CONFIG["model_dirs"]:
            potential_path = os.path.join(directory, model_name)
            if os.path.exists(potential_path):
                model_path = potential_path
                break
        if model_path is None:
            available = list_available_models()
            available_str = ", ".join(available) if available else "none"
            return {
                "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
                "source": image_data if is_path else "base64_image"
            }
        # Setup output directory for save_results
        output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
        # Build YOLO CLI command
        cmd_args = [
            "detect",  # Task
            "predict",  # Mode
            f"model={model_path}",
            f"source={source_path}",
            f"conf={confidence}",
            "format=json",  # Request JSON output for parsing
        ]
        if save_results:
            cmd_args.append(f"project={output_dir}")
            cmd_args.append("save=True")
        else:
            cmd_args.append("save=False")
        # Get command string without executing
        result = run_yolo_cli(cmd_args)
        # Return command information
        return {
            "status": "command_generated",
            "model_used": model_name,
            "model_path": model_path,
            "source": source_path,
            "command": result["command"],
            "note": "Command generated but not executed - detection results would be returned from actual execution",
            "parameters": {
                "confidence": confidence,
                "save_results": save_results,
                "is_path": is_path,
                "output_dir": output_dir if save_results else None
            }
        }
    except Exception as e:
        logger.error(f"Error in detect_objects command generation: {str(e)}")
        return {
            "error": f"Failed to generate detection command: {str(e)}",
            "source": image_data if is_path else "base64_image"
        }
@mcp.tool()
def segment_objects(
    image_data: str,
    model_name: str = "yolov11n-seg.pt",
    confidence: float = 0.25,
    save_results: bool = False,
    is_path: bool = False
) -> Dict[str, Any]:
    """
    Return the YOLO CLI command for segmentation without executing it
    Args:
        image_data: Base64 encoded image or file path (if is_path=True)
        model_name: YOLO segmentation model name
        confidence: Detection confidence threshold
        save_results: Whether to save results to disk
        is_path: Whether image_data is a file path
    Returns:
        Dictionary containing command that would be executed
    """
    try:
        # Determine source path
        if is_path:
            source_path = image_data
            if not os.path.exists(source_path):
                return {
                    "error": f"Image file not found: {source_path}",
                    "source": source_path
                }
        else:
            # For base64, we would save to temp file, but we'll just indicate this
            source_path = "[temp_file_from_base64]"
        # Determine full model path
        model_path = None
        for directory in CONFIG["model_dirs"]:
            potential_path = os.path.join(directory, model_name)
            if os.path.exists(potential_path):
                model_path = potential_path
                break
        if model_path is None:
            available = list_available_models()
            available_str = ", ".join(available) if available else "none"
            return {
                "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
                "source": image_data if is_path else "base64_image"
            }
        # Setup output directory for save_results
        output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
        # Build YOLO CLI command
        cmd_args = [
            "segment",  # Task
            "predict",  # Mode
            f"model={model_path}",
            f"source={source_path}",
            f"conf={confidence}",
            "format=json",  # Request JSON output for parsing
        ]
        if save_results:
            cmd_args.append(f"project={output_dir}")
            cmd_args.append("save=True")
        else:
            cmd_args.append("save=False")
        # Get command string without executing
        result = run_yolo_cli(cmd_args)
        # Return command information
        return {
            "status": "command_generated",
            "model_used": model_name,
            "model_path": model_path,
            "source": source_path,
            "command": result["command"],
            "note": "Command generated but not executed - segmentation results would be returned from actual execution",
            "parameters": {
                "confidence": confidence,
                "save_results": save_results,
                "is_path": is_path,
                "output_dir": output_dir if save_results else None
            }
        }
    except Exception as e:
        logger.error(f"Error in segment_objects command generation: {str(e)}")
        return {
            "error": f"Failed to generate segmentation command: {str(e)}",
            "source": image_data if is_path else "base64_image"
        }
@mcp.tool()
def classify_image(
    image_data: str,
    model_name: str = "yolov11n-cls.pt",
    top_k: int = 5,
    save_results: bool = False,
    is_path: bool = False
) -> Dict[str, Any]:
    """
    Return the YOLO CLI command for image classification without executing it
    Args:
        image_data: Base64 encoded image or file path (if is_path=True)
        model_name: YOLO classification model name
        top_k: Number of top categories to return
        save_results: Whether to save results to disk
        is_path: Whether image_data is a file path
    Returns:
        Dictionary containing command that would be executed
    """
    try:
        # Determine source path
        if is_path:
            source_path = image_data
            if not os.path.exists(source_path):
                return {
                    "error": f"Image file not found: {source_path}",
                    "source": source_path
                }
        else:
            # For base64, we would save to temp file, but we'll just indicate this
            source_path = "[temp_file_from_base64]"
        # Determine full model path
        model_path = None
        for directory in CONFIG["model_dirs"]:
            potential_path = os.path.join(directory, model_name)
            if os.path.exists(potential_path):
                model_path = potential_path
                break
        if model_path is None:
            available = list_available_models()
            available_str = ", ".join(available) if available else "none"
            return {
                "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
                "source": image_data if is_path else "base64_image"
            }
        # Setup output directory for save_results
        output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
        # Build YOLO CLI command
        cmd_args = [
            "classify",  # Task
            "predict",  # Mode
            f"model={model_path}",
            f"source={source_path}",
            "format=json",  # Request JSON output for parsing
        ]
        if save_results:
            cmd_args.append(f"project={output_dir}")
            cmd_args.append("save=True")
        else:
            cmd_args.append("save=False")
        # Get command string without executing
        result = run_yolo_cli(cmd_args)
        # Return command information
        return {
            "status": "command_generated",
            "model_used": model_name,
            "model_path": model_path,
            "source": source_path,
            "command": result["command"],
            "note": "Command generated but not executed - classification results would be returned from actual execution",
            "parameters": {
                "top_k": top_k,
                "save_results": save_results,
                "is_path": is_path,
                "output_dir": output_dir if save_results else None
            }
        }
    except Exception as e:
        logger.error(f"Error in classify_image command generation: {str(e)}")
        return {
            "error": f"Failed to generate classification command: {str(e)}",
            "source": image_data if is_path else "base64_image"
        }
@mcp.tool()
def track_objects(
    image_data: str,
    model_name: str = "yolov8n.pt",
    confidence: float = 0.25,
    tracker: str = "bytetrack.yaml",
    save_results: bool = False
) -> Dict[str, Any]:
    """
    Return the YOLO CLI command for object tracking without executing it
    Args:
        image_data: Base64 encoded image
        model_name: YOLO model name
        confidence: Detection confidence threshold
        tracker: Tracker name to use (e.g., 'bytetrack.yaml', 'botsort.yaml')
        save_results: Whether to save results to disk
    Returns:
        Dictionary containing command that would be executed
    """
    try:
        # For base64, we would save to temp file, but we'll just indicate this
        source_path = "[temp_file_from_base64]"
        # Determine full model path
        model_path = None
        for directory in CONFIG["model_dirs"]:
            potential_path = os.path.join(directory, model_name)
            if os.path.exists(potential_path):
                model_path = potential_path
                break
        if model_path is None:
            available = list_available_models()
            available_str = ", ".join(available) if available else "none"
            return {
                "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
            }
        # Setup output directory for save_results
        output_dir = os.path.join(tempfile.gettempdir(), "yolo_track_results")
        # Build YOLO CLI command
        cmd_args = [
            "track",  # Combined task and mode for tracking
            f"model={model_path}",
            f"source={source_path}",
            f"conf={confidence}",
            f"tracker={tracker}",
            "format=json",  # Request JSON output for parsing
        ]
        if save_results:
            cmd_args.append(f"project={output_dir}")
            cmd_args.append("save=True")
        else:
            cmd_args.append("save=False")
        # Get command string without executing
        result = run_yolo_cli(cmd_args)
        # Return command information
        return {
            "status": "command_generated",
            "model_used": model_name,
            "model_path": model_path,
            "source": source_path,
            "command": result["command"],
            "note": "Command generated but not executed - tracking results would be returned from actual execution",
            "parameters": {
                "confidence": confidence,
                "tracker": tracker,
                "save_results": save_results,
                "output_dir": output_dir if save_results else None
            }
        }
    except Exception as e:
        logger.error(f"Error in track_objects command generation: {str(e)}")
        return {
            "error": f"Failed to generate tracking command: {str(e)}"
        }
@mcp.tool()
def train_model(
    dataset_path: str,
    model_name: str = "yolov8n.pt",
    epochs: int = 100,
    imgsz: int = 640,
    batch: int = 16,
    name: str = "yolo_custom_model",
    project: str = "runs/train"
) -> Dict[str, Any]:
    """
    Return the YOLO CLI command for model training without executing it
    Args:
        dataset_path: Path to YOLO format dataset
        model_name: Base model to start with
        epochs: Number of training epochs
        imgsz: Image size for training
        batch: Batch size
        name: Name for the training run
        project: Project directory
    Returns:
        Dictionary containing command that would be executed
    """
    # Validate dataset path
    if not os.path.exists(dataset_path):
        return {"error": f"Dataset not found: {dataset_path}"}
    # Determine full model path
    model_path = None
    for directory in CONFIG["model_dirs"]:
        potential_path = os.path.join(directory, model_name)
        if os.path.exists(potential_path):
            model_path = potential_path
            break
    if model_path is None:
        available = list_available_models()
        available_str = ", ".join(available) if available else "none"
        return {
            "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
        }
    # Determine task type based on model name
    task = "detect"  # Default task
    if "seg" in model_name:
        task = "segment"
    elif "pose" in model_name:
        task = "pose"
    elif "cls" in model_name:
        task = "classify"
    elif "obb" in model_name:
        task = "obb"
    # Build YOLO CLI command
    cmd_args = [
        task,  # Task
        "train",  # Mode
        f"model={model_path}",
        f"data={dataset_path}",
        f"epochs={epochs}",
        f"imgsz={imgsz}",
        f"batch={batch}",
        f"name={name}",
        f"project={project}"
    ]
    # Get command string without executing
    result = run_yolo_cli(cmd_args)
    # Return command information
    return {
        "status": "command_generated",
        "model_used": model_name,
        "model_path": model_path,
        "command": result["command"],
        "note": "Command generated but not executed - training would start with actual execution",
        "parameters": {
            "dataset_path": dataset_path,
            "epochs": epochs,
            "imgsz": imgsz,
            "batch": batch,
            "name": name,
            "project": project,
            "task": task
        }
    }
@mcp.tool()
def validate_model(
    model_path: str,
    data_path: str,
    imgsz: int = 640,
    batch: int = 16
) -> Dict[str, Any]:
    """
    Return the YOLO CLI command for model validation without executing it
    Args:
        model_path: Path to YOLO model (.pt file)
        data_path: Path to YOLO format validation dataset
        imgsz: Image size for validation
        batch: Batch size
    Returns:
        Dictionary containing command that would be executed
    """
    # Validate model path
    if not os.path.exists(model_path):
        return {"error": f"Model file not found: {model_path}"}
    # Validate dataset path
    if not os.path.exists(data_path):
        return {"error": f"Dataset not found: {data_path}"}
    # Determine task type based on model name
    model_name = os.path.basename(model_path)
    task = "detect"  # Default task
    if "seg" in model_name:
        task = "segment"
    elif "pose" in model_name:
        task = "pose"
    elif "cls" in model_name:
        task = "classify"
    elif "obb" in model_name:
        task = "obb"
    # Build YOLO CLI command
    cmd_args = [
        task,  # Task
        "val",  # Mode
        f"model={model_path}",
        f"data={data_path}",
        f"imgsz={imgsz}",
        f"batch={batch}"
    ]
    # Get command string without executing
    result = run_yolo_cli(cmd_args)
    # Return command information
    return {
        "status": "command_generated",
        "model_path": model_path,
        "command": result["command"],
        "note": "Command generated but not executed - validation would begin with actual execution",
        "parameters": {
            "data_path": data_path,
            "imgsz": imgsz,
            "batch": batch,
            "task": task
        }
    }
@mcp.tool()
def export_model(
model_path: str,
format: str = "onnx",
imgsz: int = 640
) -> Dict[str, Any]:
"""
Return the YOLO CLI command for model export without executing it
Args:
model_path: Path to YOLO model (.pt file)
format: Export format (onnx, torchscript, openvino, etc.)
imgsz: Image size for export
Returns:
Dictionary containing command that would be executed
"""
# Validate model path
if not os.path.exists(model_path):
return {"error": f"Model file not found: {model_path}"}
# Valid export formats
valid_formats = [
"torchscript", "onnx", "openvino", "engine", "coreml", "saved_model",
"pb", "tflite", "edgetpu", "tfjs", "paddle"
]
if format not in valid_formats:
return {"error": f"Invalid export format: {format}. Valid formats include: {', '.join(valid_formats)}"}
# Build YOLO CLI command
cmd_args = [
"export", # Combined task and mode for export
f"model={model_path}",
f"format={format}",
f"imgsz={imgsz}"
]
# Get command string without executing
result = run_yolo_cli(cmd_args)
# Return command information
return {
"status": "command_generated",
"model_path": model_path,
"command": result["command"],
"note": "Command generated but not executed - export would begin with actual execution",
"parameters": {
"format": format,
"imgsz": imgsz,
"expected_output": f"{os.path.splitext(model_path)[0]}.{format}"
}
}
@mcp.tool()
def list_available_models() -> List[str]:
"""List available YOLO models that actually exist on disk in any configured directory"""
# Common YOLO model patterns
model_patterns = [
"yolo11*.pt", # Official Ultralytics YOLO11 weight names (e.g. yolo11n.pt)
"yolov11*.pt",
"yolov8*.pt"
]
# Find all existing models in all configured directories
available_models = set()
for directory in CONFIG["model_dirs"]:
if not os.path.exists(directory):
continue
# Check for model files directly
for filename in os.listdir(directory):
if filename.endswith(".pt") and any(
fnmatch.fnmatch(filename, pattern) for pattern in model_patterns
):
available_models.add(filename)
# Convert to sorted list
result = sorted(list(available_models))
if not result:
logger.warning("No model files found in configured directories.")
return ["No models available - download models to any of these directories: " + ", ".join(CONFIG["model_dirs"])]
return result
@mcp.tool()
def start_camera_detection(
model_name: str = "yolov8n.pt",
confidence: float = 0.25,
camera_id: int = 0
) -> Dict[str, Any]:
"""
Return the YOLO CLI command for starting camera detection without executing it
Args:
model_name: YOLO model name to use
confidence: Detection confidence threshold
camera_id: Camera device ID (0 is usually the default camera)
Returns:
Dictionary containing command that would be executed
"""
# Determine full model path
model_path = None
for directory in CONFIG["model_dirs"]:
potential_path = os.path.join(directory, model_name)
if os.path.exists(potential_path):
model_path = potential_path
break
if model_path is None:
available = list_available_models()
available_str = ", ".join(available) if available else "none"
return {
"error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
}
# Determine task type based on model name
task = "detect" # Default task
if "seg" in model_name:
task = "segment"
elif "pose" in model_name:
task = "pose"
elif "cls" in model_name:
task = "classify"
# Build YOLO CLI command
cmd_args = [
task, # Task
"predict", # Mode
f"model={model_path}",
f"source={camera_id}", # Camera source ID
f"conf={confidence}",
"format=json",
"save=False", # Don't save frames by default
"show=True" # Show GUI window for camera view
]
# Get command string without executing
result = run_yolo_cli(cmd_args)
# Return command information
return {
"status": "command_generated",
"model_used": model_name,
"model_path": model_path,
"command": result["command"],
"note": "Command generated but not executed - camera would start with actual execution",
"parameters": {
"confidence": confidence,
"camera_id": camera_id,
"task": task
}
}
@mcp.tool()
def stop_camera_detection() -> Dict[str, Any]:
"""
Simulate stopping camera detection (no actual command to execute)
Returns:
Information message
"""
return {
"status": "command_generated",
"message": "To stop camera detection, close the YOLO window or press 'q' in the terminal",
"note": "Since commands are not executed, no actual camera is running"
}
@mcp.tool()
def get_camera_detections() -> Dict[str, Any]:
"""
Simulate getting latest camera detections (no actual command to execute)
Returns:
Information message
"""
return {
"status": "command_generated",
"message": "Camera detections would be returned here if a camera was running",
"note": "Since commands are not executed, no camera is running and no detections are available"
}
@mcp.tool()
def comprehensive_image_analysis(
image_path: str,
confidence: float = 0.25,
save_results: bool = False
) -> Dict[str, Any]:
"""
Return the YOLO CLI commands for comprehensive image analysis without executing them
Args:
image_path: Path to the image file
confidence: Detection confidence threshold
save_results: Whether to save results to disk
Returns:
Dictionary containing commands that would be executed
"""
if not os.path.exists(image_path):
return {"error": f"Image file not found: {image_path}"}
commands = []
# 1. Object detection
detect_result = detect_objects(
image_data=image_path,
model_name="yolov11n.pt",
confidence=confidence,
save_results=save_results,
is_path=True
)
if "command" in detect_result:
commands.append({
"task": "object_detection",
"command": detect_result["command"]
})
# 2. Scene classification
try:
cls_result = classify_image(
image_data=image_path,
model_name="yolov8n-cls.pt",
top_k=3,
save_results=save_results,
is_path=True
)
if "command" in cls_result:
commands.append({
"task": "classification",
"command": cls_result["command"]
})
except Exception as e:
logger.error(f"Error generating classification command: {str(e)}")
# 3. Pose detection if available
for directory in CONFIG["model_dirs"]:
pose_model_path = os.path.join(directory, "yolov8n-pose.pt")
if os.path.exists(pose_model_path):
# Build YOLO CLI command for pose detection
cmd_args = [
"pose", # Task
"predict", # Mode
f"model={pose_model_path}",
f"source={image_path}",
f"conf={confidence}",
"format=json",
]
if save_results:
output_dir = os.path.join(tempfile.gettempdir(), "yolo_pose_results")
cmd_args.append(f"project={output_dir}")
cmd_args.append("save=True")
else:
cmd_args.append("save=False")
result = run_yolo_cli(cmd_args)
commands.append({
"task": "pose_detection",
"command": result["command"]
})
break
return {
"status": "commands_generated",
"image_path": image_path,
"commands": commands,
"note": "Commands generated but not executed - comprehensive analysis would occur with actual execution",
"parameters": {
"confidence": confidence,
"save_results": save_results
}
}
@mcp.tool()
def analyze_image_from_path(
image_path: str,
model_name: str = "yolov8n.pt",
confidence: float = 0.25,
save_results: bool = False
) -> Dict[str, Any]:
"""
Return the YOLO CLI command for image analysis without executing it
Args:
image_path: Path to the image file
model_name: YOLO model name
confidence: Detection confidence threshold
save_results: Whether to save results to disk
Returns:
Dictionary containing command that would be executed
"""
try:
# Call detect_objects function with is_path=True
return detect_objects(
image_data=image_path,
model_name=model_name,
confidence=confidence,
save_results=save_results,
is_path=True
)
except Exception as e:
return {
"error": f"Failed to generate analysis command: {str(e)}",
"image_path": image_path
}
@mcp.tool()
def test_connection() -> Dict[str, Any]:
"""
Test if YOLO CLI service is available
Returns:
Status information and available tools
"""
# Build a simple YOLO CLI version command
cmd_args = ["--version"]
result = run_yolo_cli(cmd_args)
return {
"status": "YOLO CLI command generator is running",
"command_mode": "Command generation only, no execution",
"version_command": result["command"],
"available_models": list_available_models(),
"available_tools": [
"list_available_models", "detect_objects", "segment_objects",
"classify_image", "track_objects", "train_model", "validate_model",
"export_model", "start_camera_detection", "stop_camera_detection",
"get_camera_detections", "test_connection",
# Additional tools
"analyze_image_from_path",
"comprehensive_image_analysis"
],
"note": "This service only generates YOLO commands without executing them"
}
# Entry point: log startup details, then run the MCP server over stdio
if __name__ == "__main__":
import platform
logger.info("Starting YOLO CLI command generator service")
logger.info(f"Platform: {platform.system()} {platform.release()}")
logger.info("⚠️ Commands will be generated but NOT executed")
# Initialize and run server
logger.info("Starting MCP server...")
mcp.run(transport='stdio')
```
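The same substring checks for picking a YOLO task are repeated in `train_model`, `validate_model`, and `start_camera_detection` above. As a minimal sketch, they could be factored into one helper. The name `infer_task` and the `_TASK_MARKERS` table are hypothetical and not part of the repository; the marker-to-task mapping is copied from the branches in the file.

```python
import os

# Hypothetical lookup table (not in the repository): substring -> YOLO task,
# checked in the same order as the if/elif chains in server_cli.py.
_TASK_MARKERS = (
    ("seg", "segment"),
    ("pose", "pose"),
    ("cls", "classify"),
    ("obb", "obb"),
)


def infer_task(model_name: str) -> str:
    """Return the YOLO task implied by a weights filename; default is 'detect'."""
    # Look only at the filename so directory names cannot trigger a match.
    base = os.path.basename(model_name)
    for marker, task in _TASK_MARKERS:
        if marker in base:
            return task
    return "detect"
```

With this helper, `infer_task("yolov8n-seg.pt")` gives `"segment"` and `infer_task("yolov8n.pt")` falls back to `"detect"`. Matching on the basename is a design choice in this sketch: a path such as `segmentation/yolov8n.pt` would otherwise be classified as a segmentation model.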
--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------
```python
# server.py
import fnmatch
import os
import base64
import cv2
import time
import threading
from io import BytesIO
from typing import List, Dict, Any, Optional, Union
import numpy as np
from PIL import Image
from mcp.server.fastmcp import FastMCP
from ultralytics import YOLO
# Standard-library imports for logging, stdout redirection, and shutdown handling
import sys
import logging
import contextlib
import signal
import atexit
# Set up logging configuration (log file plus stderr)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("yolo_service.log"),
logging.StreamHandler(sys.stderr)
]
)
logger = logging.getLogger('yolo_service')
# Global variables for camera control
camera_running = False
camera_thread = None
detection_results = []
camera_last_access_time = 0
CAMERA_INACTIVITY_TIMEOUT = 60 # Auto-shutdown after 60 seconds of inactivity
@contextlib.contextmanager
def redirect_stdout_to_stderr():
old_stdout = sys.stdout
sys.stdout = sys.stderr
try:
yield
finally:
sys.stdout = old_stdout
def camera_watchdog_thread():
"""Monitor thread that auto-stops the camera after inactivity"""
global camera_running, camera_last_access_time
logger.info("Camera watchdog thread started")
while True:
# Sleep for a short time to avoid excessive CPU usage
time.sleep(5)
# Check if camera is running
if camera_running:
current_time = time.time()
elapsed_time = current_time - camera_last_access_time
# If no access for more than the timeout, auto-stop
if elapsed_time > CAMERA_INACTIVITY_TIMEOUT:
logger.info(f"Auto-stopping camera after {elapsed_time:.1f} seconds of inactivity")
stop_camera_detection()
else:
# If camera is not running, no need to check frequently
time.sleep(10)
def load_image(image_source, is_path=False):
"""
Load image from file path or base64 data
Args:
image_source: File path or base64 encoded image data
is_path: Whether image_source is a file path
Returns:
PIL Image object
"""
try:
if is_path:
# Load image from file path
if os.path.exists(image_source):
return Image.open(image_source)
else:
raise FileNotFoundError(f"Image file not found: {image_source}")
else:
# Load image from base64 data
image_bytes = base64.b64decode(image_source)
return Image.open(BytesIO(image_bytes))
except Exception as e:
raise ValueError(f"Failed to load image: {str(e)}")
# Create MCP server
mcp = FastMCP("YOLO_Service")
# Global model cache
models = {}
def get_model(model_name: str = "yolov8n.pt") -> YOLO:
"""Get or load YOLO model from any of the configured model directories"""
if model_name in models:
return models[model_name]
# Try to find the model in any of the configured directories
model_path = None
for directory in CONFIG["model_dirs"]:
potential_path = os.path.join(directory, model_name)
if os.path.exists(potential_path):
model_path = potential_path
break
if model_path is None:
available = list_available_models()
available_str = ", ".join(available) if available else "none"
raise FileNotFoundError(f"Model '{model_name}' not found in any configured directories. Available models: {available_str}")
# Load and cache the model - with stdout redirected
logger.info(f"Loading model: {model_name} from {model_path}")
with redirect_stdout_to_stderr():
models[model_name] = YOLO(model_path)
return models[model_name]
# Global configuration
CONFIG = {
"model_dirs": [
".", # Current directory
"./models", # Models subdirectory
os.path.join(os.path.dirname(os.path.abspath(__file__)), "models"), # Absolute path to models
# Add any other potential model directories here
]
}
# Tool reporting the configured model directories and the models found in them
@mcp.tool()
def get_model_directories() -> Dict[str, Any]:
"""Get information about configured model directories and available models"""
directories = []
for directory in CONFIG["model_dirs"]:
dir_info = {
"path": directory,
"exists": os.path.exists(directory),
"is_directory": os.path.isdir(directory) if os.path.exists(directory) else False,
"models": []
}
if dir_info["exists"] and dir_info["is_directory"]:
for filename in os.listdir(directory):
if filename.endswith(".pt"):
dir_info["models"].append(filename)
directories.append(dir_info)
return {
"configured_directories": CONFIG["model_dirs"],
"directory_details": directories,
"available_models": list_available_models(),
"loaded_models": list(models.keys())
}
@mcp.tool()
def detect_objects(
image_data: str,
model_name: str = "yolov8n.pt",
confidence: float = 0.25,
save_results: bool = False,
is_path: bool = False
) -> Dict[str, Any]:
"""
Detect objects in an image using YOLO
Args:
image_data: Base64 encoded image or file path (if is_path=True)
model_name: YOLO model name
confidence: Detection confidence threshold
save_results: Whether to save results to disk
is_path: Whether image_data is a file path
Returns:
Dictionary containing detection results
"""
try:
# Load image (supports path or base64)
image = load_image(image_data, is_path=is_path)
# Load model and perform detection - with stdout redirected
model = get_model(model_name)
with redirect_stdout_to_stderr(): # Ensure all YOLO outputs go to stderr
results = model.predict(image, conf=confidence, save=save_results)
# Format results
formatted_results = []
for result in results:
boxes = result.boxes
detections = []
for i in range(len(boxes)):
box = boxes[i]
x1, y1, x2, y2 = box.xyxy[0].tolist()
confidence = float(box.conf[0])
class_id = int(box.cls[0])
class_name = result.names[class_id]
detections.append({
"box": [x1, y1, x2, y2],
"confidence": confidence,
"class_id": class_id,
"class_name": class_name
})
formatted_results.append({
"detections": detections,
"image_shape": result.orig_shape
})
return {
"results": formatted_results,
"model_used": model_name,
"total_detections": sum(len(r["detections"]) for r in formatted_results),
"source": image_data if is_path else "base64_image"
}
except Exception as e:
logger.error(f"Error in detect_objects: {str(e)}")
return {
"error": f"Failed to detect objects: {str(e)}",
"source": image_data if is_path else "base64_image"
}
@mcp.tool()
def segment_objects(
image_data: str,
model_name: str = "yolov11n-seg.pt",
confidence: float = 0.25,
save_results: bool = False,
is_path: bool = False
) -> Dict[str, Any]:
"""
Perform instance segmentation on an image using YOLO
Args:
image_data: Base64 encoded image or file path (if is_path=True)
model_name: YOLO segmentation model name
confidence: Detection confidence threshold
save_results: Whether to save results to disk
is_path: Whether image_data is a file path
Returns:
Dictionary containing segmentation results
"""
try:
# Load image (supports path or base64)
image = load_image(image_data, is_path=is_path)
# Load model and perform segmentation
model = get_model(model_name)
with redirect_stdout_to_stderr(): # Keep YOLO console output off stdout
results = model.predict(image, conf=confidence, save=save_results)
# Format results
formatted_results = []
for result in results:
if not hasattr(result, 'masks') or result.masks is None:
continue
boxes = result.boxes
masks = result.masks
segments = []
for i in range(len(boxes)):
box = boxes[i]
mask = masks[i].data[0].cpu().numpy() if masks else None
x1, y1, x2, y2 = box.xyxy[0].tolist()
confidence = float(box.conf[0])
class_id = int(box.cls[0])
class_name = result.names[class_id]
segment = {
"box": [x1, y1, x2, y2],
"confidence": confidence,
"class_id": class_id,
"class_name": class_name
}
if mask is not None:
# Convert binary mask to simplified format for API response
segment["mask"] = mask.tolist()
segments.append(segment)
formatted_results.append({
"segments": segments,
"image_shape": result.orig_shape
})
return {
"results": formatted_results,
"model_used": model_name,
"total_segments": sum(len(r["segments"]) for r in formatted_results),
"source": image_data if is_path else "base64_image"
}
except Exception as e:
return {
"error": f"Failed to segment objects: {str(e)}",
"source": image_data if is_path else "base64_image"
}
@mcp.tool()
def classify_image(
image_data: str,
model_name: str = "yolov11n-cls.pt",
top_k: int = 5,
save_results: bool = False,
is_path: bool = False
) -> Dict[str, Any]:
"""
Classify an image using YOLO classification model
Args:
image_data: Base64 encoded image or file path (if is_path=True)
model_name: YOLO classification model name
top_k: Number of top categories to return (capped at 5, because results are read from probs.top5)
save_results: Whether to save results to disk
is_path: Whether image_data is a file path
Returns:
Dictionary containing classification results
"""
try:
# Load image (supports path or base64)
image = load_image(image_data, is_path=is_path)
# Load model and perform classification
model = get_model(model_name)
with redirect_stdout_to_stderr(): # Keep YOLO console output off stdout
results = model.predict(image, save=save_results)
# Format results
formatted_results = []
for result in results:
if not hasattr(result, 'probs') or result.probs is None:
continue
probs = result.probs
top_indices = probs.top5
top_probs = probs.top5conf.tolist()
top_classes = [result.names[idx] for idx in top_indices]
classifications = [
{"class_id": int(idx), "class_name": name, "probability": float(prob)}
for idx, name, prob in zip(top_indices[:top_k], top_classes[:top_k], top_probs[:top_k])
]
formatted_results.append({
"classifications": classifications,
"image_shape": result.orig_shape
})
return {
"results": formatted_results,
"model_used": model_name,
"top_k": top_k,
"source": image_data if is_path else "base64_image"
}
except Exception as e:
return {
"error": f"Failed to classify image: {str(e)}",
"source": image_data if is_path else "base64_image"
}
@mcp.tool()
def track_objects(
image_data: str,
model_name: str = "yolov8n.pt",
confidence: float = 0.25,
tracker: str = "bytetrack.yaml",
save_results: bool = False
) -> Dict[str, Any]:
"""
Run YOLO tracking on a single base64-encoded image
Args:
image_data: Base64 encoded image
model_name: YOLO model name
confidence: Detection confidence threshold
tracker: Tracker name to use (e.g., 'bytetrack.yaml', 'botsort.yaml')
save_results: Whether to save results to disk
Returns:
Dictionary containing tracking results
"""
# Decode Base64 image
image_bytes = base64.b64decode(image_data)
image = Image.open(BytesIO(image_bytes))
# Load model and perform tracking
model = get_model(model_name)
# Keep YOLO console output off stdout
with redirect_stdout_to_stderr():
results = model.track(image, conf=confidence, tracker=tracker, save=save_results)
# Format results
formatted_results = []
for result in results:
if not hasattr(result, 'boxes') or result.boxes is None:
continue
boxes = result.boxes
tracks = []
for i in range(len(boxes)):
box = boxes[i]
x1, y1, x2, y2 = box.xyxy[0].tolist()
confidence = float(box.conf[0])
class_id = int(box.cls[0])
class_name = result.names[class_id]
# Extract track ID (if any)
track_id = int(box.id[0]) if box.id is not None else None
track = {
"box": [x1, y1, x2, y2],
"confidence": confidence,
"class_id": class_id,
"class_name": class_name,
"track_id": track_id
}
tracks.append(track)
formatted_results.append({
"tracks": tracks,
"image_shape": result.orig_shape
})
return {
"results": formatted_results,
"model_used": model_name,
"tracker": tracker,
"total_tracks": sum(len(r["tracks"]) for r in formatted_results)
}
# Training tool: Ultralytics console output is redirected to stderr
@mcp.tool()
def train_model(
dataset_path: str,
model_name: str = "yolov8n.pt",
epochs: int = 100,
imgsz: int = 640,
batch: int = 16,
name: str = "yolo_custom_model",
project: str = "runs/train"
) -> Dict[str, Any]:
"""
Train a YOLO model on a custom dataset
Args:
dataset_path: Path to YOLO format dataset
model_name: Base model to start with
epochs: Number of training epochs
imgsz: Image size for training
batch: Batch size
name: Name for the training run
project: Project directory
Returns:
Dictionary containing training results
"""
# Validate dataset path
if not os.path.exists(dataset_path):
return {"error": f"Dataset not found: {dataset_path}"}
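# Initialize model (report a missing or unreadable model as an error result)
try:
model = get_model(model_name)
except Exception as e:
return {"error": f"Failed to load model: {str(e)}"}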
# Train model
try:
# Keep YOLO console output off stdout
with redirect_stdout_to_stderr():
results = model.train(
data=dataset_path,
epochs=epochs,
imgsz=imgsz,
batch=batch,
name=name,
project=project
)
# Get best model path
best_model_path = os.path.join(project, name, "weights", "best.pt")
return {
"status": "success",
"model_path": best_model_path,
"epochs_completed": epochs,
"final_metrics": {
"precision": float(results.results_dict.get("metrics/precision(B)", 0)),
"recall": float(results.results_dict.get("metrics/recall(B)", 0)),
"mAP50": float(results.results_dict.get("metrics/mAP50(B)", 0)),
"mAP50-95": float(results.results_dict.get("metrics/mAP50-95(B)", 0))
}
}
except Exception as e:
return {"error": f"Training failed: {str(e)}"}
# Validation tool: Ultralytics console output is redirected to stderr
@mcp.tool()
def validate_model(
model_path: str,
data_path: str,
imgsz: int = 640,
batch: int = 16
) -> Dict[str, Any]:
"""
Validate a YOLO model on a dataset
Args:
model_path: Path to YOLO model (.pt file)
data_path: Path to YOLO format validation dataset
imgsz: Image size for validation
batch: Batch size
Returns:
Dictionary containing validation results
"""
# Validate model path
if not os.path.exists(model_path):
return {"error": f"Model file not found: {model_path}"}
# Validate dataset path
if not os.path.exists(data_path):
return {"error": f"Dataset not found: {data_path}"}
# Load model
try:
model = get_model(model_path)
except Exception as e:
return {"error": f"Failed to load model: {str(e)}"}
# Validate model
try:
# Keep YOLO console output off stdout
with redirect_stdout_to_stderr():
results = model.val(data=data_path, imgsz=imgsz, batch=batch)
return {
"status": "success",
"metrics": {
"precision": float(results.results_dict.get("metrics/precision(B)", 0)),
"recall": float(results.results_dict.get("metrics/recall(B)", 0)),
"mAP50": float(results.results_dict.get("metrics/mAP50(B)", 0)),
"mAP50-95": float(results.results_dict.get("metrics/mAP50-95(B)", 0))
}
}
except Exception as e:
return {"error": f"Validation failed: {str(e)}"}
# Export tool: Ultralytics console output is redirected to stderr
@mcp.tool()
def export_model(
model_path: str,
format: str = "onnx",
imgsz: int = 640
) -> Dict[str, Any]:
"""
Export a YOLO model to different formats
Args:
model_path: Path to YOLO model (.pt file)
format: Export format (onnx, torchscript, openvino, etc.)
imgsz: Image size for export
Returns:
Dictionary containing export results
"""
# Validate model path
if not os.path.exists(model_path):
return {"error": f"Model file not found: {model_path}"}
# Valid export formats
valid_formats = [
"torchscript", "onnx", "openvino", "engine", "coreml", "saved_model",
"pb", "tflite", "edgetpu", "tfjs", "paddle"
]
if format not in valid_formats:
return {"error": f"Invalid export format: {format}. Valid formats include: {', '.join(valid_formats)}"}
# Load model
try:
model = get_model(model_path)
except Exception as e:
return {"error": f"Failed to load model: {str(e)}"}
# Export model
try:
# Keep YOLO console output off stdout
with redirect_stdout_to_stderr():
export_path = model.export(format=format, imgsz=imgsz)
return {
"status": "success",
"export_path": str(export_path),
"format": format
}
except Exception as e:
return {"error": f"Export failed: {str(e)}"}
# Model info resource: property access is wrapped so any output goes to stderr
@mcp.resource("model_info/{model_name}")
def get_model_info(model_name: str) -> Dict[str, Any]:
"""
Get information about a YOLO model
Args:
model_name: YOLO model name
Returns:
Dictionary containing model information
"""
try:
model = get_model(model_name)
# Get model task
task = 'detect' # Default task
if 'seg' in model_name:
task = 'segment'
elif 'pose' in model_name:
task = 'pose'
elif 'cls' in model_name:
task = 'classify'
elif 'obb' in model_name:
task = 'obb'
# Make sure any model property access that might trigger output is wrapped
with redirect_stdout_to_stderr():
yaml_str = str(model.yaml)
pt_path = str(model.pt_path) if hasattr(model, 'pt_path') else None
class_names = model.names
# Get model info
return {
"model_name": model_name,
"task": task,
"yaml": yaml_str,
"pt_path": pt_path,
"class_names": class_names
}
except Exception as e:
return {"error": f"Failed to get model info: {str(e)}"}
# Model listing tool: warnings go through the logger instead of print
@mcp.tool()
def list_available_models() -> List[str]:
"""List available YOLO models that actually exist on disk in any configured directory"""
# Common YOLO model patterns
model_patterns = [
"yolo11*.pt", # Official Ultralytics YOLO11 weight names (e.g. yolo11n.pt)
"yolov11*.pt",
"yolov8*.pt"
]
# Find all existing models in all configured directories
available_models = set()
for directory in CONFIG["model_dirs"]:
if not os.path.exists(directory):
continue
# Check for model files directly
for filename in os.listdir(directory):
if filename.endswith(".pt") and any(
fnmatch.fnmatch(filename, pattern) for pattern in model_patterns
):
available_models.add(filename)
# Convert to sorted list
result = sorted(list(available_models))
if not result:
# Warn through the logger rather than print to keep stdout clean
logger.warning("No model files found in configured directories.")
return ["No models available - download models to any of these directories: " + ", ".join(CONFIG["model_dirs"])]
return result
# Camera detection background thread (state is held in the globals declared near the top of the file)
def camera_detection_thread(model_name, confidence, fps_limit=30, camera_id=0):
"""Background thread for camera detection"""
global camera_running, detection_results
# Load model
try:
with redirect_stdout_to_stderr():
model = get_model(model_name)
logger.info(f"Model {model_name} loaded successfully")
except Exception as e:
logger.error(f"Error loading model: {str(e)}")
camera_running = False
detection_results.append({
"timestamp": time.time(),
"error": f"Failed to load model: {str(e)}",
"detections": []
})
return
# Try the requested camera first, then fall back to camera IDs 0 to 2
cap = None
error_message = ""
candidate_ids = [camera_id] + [i for i in range(3) if i != camera_id]
for cam_id in candidate_ids:
try:
logger.info(f"Attempting to open camera with ID {cam_id}...")
cap = cv2.VideoCapture(cam_id)
if cap.isOpened():
logger.info(f"Successfully opened camera {cam_id}")
camera_id = cam_id # Reuse the working ID if the camera must be restarted
break
except Exception as e:
error_message = f"Error opening camera {cam_id}: {str(e)}"
logger.error(error_message)
# Check if any camera was successfully opened
if cap is None or not cap.isOpened():
logger.error("Error: Could not open any camera.")
camera_running = False
detection_results.append({
"timestamp": time.time(),
"error": "Failed to open camera. Make sure camera is connected and not in use by another application.",
"camera_status": "unavailable",
"detections": []
})
return
# Get camera properties for diagnostics
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
fps = cap.get(cv2.CAP_PROP_FPS)
logger.info(f"Camera properties: {width}x{height} at {fps} FPS")
# Calculate frame interval based on fps_limit
frame_interval = 1.0 / fps_limit
frame_count = 0
error_count = 0
while camera_running:
start_time = time.time()
try:
ret, frame = cap.read()
if not ret:
logger.warning(f"Error: Failed to capture frame (attempt {error_count+1}).")
error_count += 1
# Add error to detection results
detection_results.append({
"timestamp": time.time(),
"error": f"Failed to capture frame (attempt {error_count})",
"camera_status": "error",
"detections": []
})
# If we have consistent failures, try to restart the camera
if error_count >= 5:
logger.warning("Too many frame capture errors, attempting to restart camera...")
cap.release()
time.sleep(1)
cap = cv2.VideoCapture(camera_id)
error_count = 0
if not cap.isOpened():
logger.error("Failed to reopen camera after errors.")
break
time.sleep(1) # Wait before trying again
continue
# Reset error count on successful frame capture
error_count = 0
frame_count += 1
# Perform detection on frame
with redirect_stdout_to_stderr(): # Keep YOLO console output off stdout
results = model.predict(frame, conf=confidence)
# Trim the buffer so that only the most recent entries are kept (at most 10 after the append below)
while len(detection_results) >= 10:
detection_results.pop(0)
# Format results
for result in results:
boxes = result.boxes
detections = []
for i in range(len(boxes)):
box = boxes[i]
x1, y1, x2, y2 = box.xyxy[0].tolist()
conf = float(box.conf[0])
class_id = int(box.cls[0])
class_name = result.names[class_id]
detections.append({
"box": [x1, y1, x2, y2],
"confidence": conf,
"class_id": class_id,
"class_name": class_name
})
detection_results.append({
"timestamp": time.time(),
"frame_count": frame_count,
"detections": detections,
"camera_status": "running",
"image_shape": result.orig_shape
})
# Log occasional status
if frame_count % 30 == 0:
logger.info(f"Camera running: processed {frame_count} frames")
detection_count = sum(len(r.get("detections", [])) for r in detection_results if "detections" in r)
logger.info(f"Total detections in current buffer: {detection_count}")
# Limit FPS by waiting if necessary
elapsed = time.time() - start_time
if elapsed < frame_interval:
time.sleep(frame_interval - elapsed)
except Exception as e:
logger.error(f"Error in camera thread: {str(e)}")
detection_results.append({
"timestamp": time.time(),
"error": f"Exception in camera processing: {str(e)}",
"camera_status": "error",
"detections": []
})
time.sleep(1) # Wait before continuing
# Clean up
logger.info("Shutting down camera...")
if cap is not None:
cap.release()
@mcp.tool()
def start_camera_detection(
model_name: str = "yolov8n.pt",
confidence: float = 0.25,
camera_id: int = 0
) -> Dict[str, Any]:
"""
Start realtime object detection using the computer's camera
Args:
model_name: YOLO model name to use
confidence: Detection confidence threshold
camera_id: Camera device ID (0 is usually the default camera)
Returns:
Status of camera detection
"""
global camera_thread, camera_running, detection_results, camera_last_access_time
# Check if already running
if camera_running:
# Update last access time
camera_last_access_time = time.time()
return {"status": "success", "message": "Camera detection is already running"}
# Clear previous results
detection_results = []
# First, try to check if OpenCV is properly installed
try:
cv2_version = cv2.__version__
logger.info(f"OpenCV version: {cv2_version}")
except Exception as e:
logger.error(f"OpenCV not properly installed: {str(e)}")
return {
"status": "error",
"message": f"OpenCV not properly installed: {str(e)}",
"solution": "Please check OpenCV installation"
}
# Start detection thread
camera_running = True
camera_last_access_time = time.time() # Update access time
camera_thread = threading.Thread(
target=camera_detection_thread,
args=(model_name, confidence, 30, camera_id),
daemon=True
)
camera_thread.start()
# Add initial status to detection results
detection_results.append({
"timestamp": time.time(),
"system_info": {
"os": platform.system() if 'platform' in globals() else "Unknown",
"opencv_version": cv2.__version__,
"camera_id": camera_id
},
"camera_status": "starting",
"detections": []
})
return {
"status": "success",
"message": f"Started camera detection using model {model_name}",
"model": model_name,
"confidence": confidence,
"camera_id": camera_id,
"auto_shutdown": f"Camera will auto-shutdown after {CAMERA_INACTIVITY_TIMEOUT} seconds of inactivity",
"note": "If camera doesn't work, try different camera_id values (0, 1, or 2)"
}
@mcp.tool()
def stop_camera_detection() -> Dict[str, Any]:
    """
    Stop realtime camera detection
    Returns:
        Status message
    """
    global camera_running
    if not camera_running:
        return {"status": "error", "message": "Camera detection is not running"}
    logger.info("Stopping camera detection by user request")
    camera_running = False
    # Wait for thread to terminate
    if camera_thread and camera_thread.is_alive():
        camera_thread.join(timeout=2.0)
    return {
        "status": "success",
        "message": "Stopped camera detection"
    }
@mcp.tool()
def get_camera_detections() -> Dict[str, Any]:
"""
Get the latest detections from the camera
Returns:
Dictionary with recent detections
"""
global detection_results, camera_thread, camera_last_access_time
# Update the last access time whenever this function is called
if camera_running:
camera_last_access_time = time.time()
# Check if thread is alive
thread_alive = camera_thread is not None and camera_thread.is_alive()
# If camera_running is True but thread is dead, there's an issue
    if camera_running and not thread_alive:
        return {
            "status": "error",
            "message": "Camera thread has stopped unexpectedly",
            "is_running": False,
            "camera_status": "error",
            "thread_alive": thread_alive,
            "detections": detection_results,
            "count": len(detection_results),
            "solution": "Try restarting the camera with a different camera_id"
        }
if not camera_running:
return {
"status": "error",
"message": "Camera detection is not running",
"is_running": False,
"camera_status": "stopped"
}
# Check for errors in detection results
errors = [result.get("error") for result in detection_results if "error" in result]
recent_errors = errors[-5:] if errors else []
# Count actual detections
detection_count = sum(len(result.get("detections", [])) for result in detection_results if "detections" in result)
    return {
        "status": "success",
        "is_running": camera_running,
        "thread_alive": thread_alive,
        "detections": detection_results,
        "count": len(detection_results),
        "total_detections": detection_count,
        "recent_errors": recent_errors if recent_errors else None,
        "camera_status": "error" if recent_errors else "running",
        "inactivity_timeout": {
            # Clamp at zero so a pending auto-shutdown never reports negative time
            "seconds_remaining": max(0, int(CAMERA_INACTIVITY_TIMEOUT - (time.time() - camera_last_access_time))),
            "last_access": camera_last_access_time
        }
    }
def cleanup_resources():
    """Clean up resources when the server is shutting down"""
    global camera_running
    logger.info("Cleaning up resources...")
    # Stop camera if it's running
    if camera_running:
        logger.info("Shutting down camera during server exit")
        camera_running = False
        # Give the camera thread a moment to clean up
        if camera_thread and camera_thread.is_alive():
            camera_thread.join(timeout=2.0)
    logger.info("Cleanup complete")
atexit.register(cleanup_resources)
def signal_handler(sig, frame):
    """Handle termination signals"""
    logger.info(f"Received signal {sig}, shutting down...")
    cleanup_resources()
    sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def start_watchdog():
    """Start the camera watchdog thread"""
    watchdog = threading.Thread(
        target=camera_watchdog_thread,
        daemon=True
    )
    watchdog.start()
    return watchdog
@mcp.tool()
def comprehensive_image_analysis(
image_path: str,
confidence: float = 0.25,
save_results: bool = False
) -> Dict[str, Any]:
"""
Perform comprehensive analysis on an image by combining multiple model results
Args:
image_path: Path to the image file
confidence: Detection confidence threshold
save_results: Whether to save results to disk
Returns:
Dictionary containing comprehensive analysis results
"""
try:
if not os.path.exists(image_path):
return {"status": "error", "image_path": image_path, "error": f"Image file not found: {image_path}"}
# Load image
image = load_image(image_path, is_path=True)
analysis_results = {}
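# 1. Object detection
# NOTE: Ultralytics publishes its YOLO11 weights as "yolo11n.pt" (no "v"), so this file name may need adjusting to match the weights you have
object_model = get_model("yolov11n.pt")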
with redirect_stdout_to_stderr():  # Keep model output off stdout, which carries the MCP stdio transport
object_results = object_model.predict(image, conf=confidence, save=save_results)
# Process object detection results
detected_objects = []
for result in object_results:
boxes = result.boxes
for i in range(len(boxes)):
box = boxes[i]
conf = float(box.conf[0])
class_id = int(box.cls[0])
class_name = result.names[class_id]
detected_objects.append({
"class_name": class_name,
"confidence": conf
})
analysis_results["objects"] = detected_objects
# 2. Scene classification
try:
cls_model = get_model("yolov8n-cls.pt")
with redirect_stdout_to_stderr():  # Keep model output off stdout, which carries the MCP stdio transport
cls_results = cls_model.predict(image, save=False)
scene_classifications = []
for result in cls_results:
if hasattr(result, 'probs') and result.probs is not None:
probs = result.probs
top_indices = probs.top5
top_probs = probs.top5conf.tolist()
top_classes = [result.names[idx] for idx in top_indices]
for idx, name, prob in zip(top_indices[:3], top_classes[:3], top_probs[:3]):
scene_classifications.append({
"class_name": name,
"probability": float(prob)
})
analysis_results["scene"] = scene_classifications
except Exception as e:
analysis_results["scene_error"] = str(e)
# 3. Human pose detection
try:
pose_model = get_model("yolov8n-pose.pt")
with redirect_stdout_to_stderr():  # Keep model output off stdout, which carries the MCP stdio transport
pose_results = pose_model.predict(image, conf=confidence, save=False)
detected_poses = []
for result in pose_results:
if hasattr(result, 'keypoints') and result.keypoints is not None:
boxes = result.boxes
keypoints = result.keypoints
for i in range(len(boxes)):
box = boxes[i]
conf = float(box.conf[0])
detected_poses.append({
"person_confidence": conf,
"has_keypoints": keypoints[i].data.shape[1] if keypoints else 0
})
analysis_results["poses"] = detected_poses
except Exception as e:
analysis_results["pose_error"] = str(e)
# 4. Comprehensive task description
tasks = []
# Detect main objects
main_objects = [obj["class_name"] for obj in detected_objects if obj["confidence"] > 0.5]
if "person" in main_objects:
tasks.append("Person Detection")
# Check for weapon objects
weapon_objects = ["sword", "knife", "katana", "gun", "pistol", "rifle"]
weapons = [obj for obj in main_objects if any(weapon in obj.lower() for weapon in weapon_objects)]
if weapons:
tasks.append(f"Weapon Detection ({', '.join(weapons)})")
# Count people
person_count = main_objects.count("person")
if person_count > 0:
tasks.append(f"Person Count ({person_count} people)")
# Pose analysis
if "poses" in analysis_results and analysis_results["poses"]:
tasks.append("Human Pose Analysis")
# Scene classification
if "scene" in analysis_results and analysis_results["scene"]:
scene_types = [scene["class_name"] for scene in analysis_results["scene"][:2]]
tasks.append(f"Scene Classification ({', '.join(scene_types)})")
analysis_results["identified_tasks"] = tasks
# Return comprehensive results
        return {
            "status": "success",
            "image_path": image_path,
            "analysis": analysis_results,
            "summary": ("Tasks identified in the image: " + ", ".join(tasks)) if tasks else "No clear tasks identified"
        }
except Exception as e:
return {
"status": "error",
"image_path": image_path,
"error": f"Comprehensive analysis failed: {str(e)}"
}
@mcp.tool()
def analyze_image_from_path(
    image_path: str,
    model_name: str = "yolov8n.pt",
    confidence: float = 0.25,
    save_results: bool = False
) -> Dict[str, Any]:
    """
    Analyze image from file path using YOLO
    Args:
        image_path: Path to the image file
        model_name: YOLO model name
        confidence: Detection confidence threshold
        save_results: Whether to save results to disk
    Returns:
        Dictionary containing detection results
    """
    try:
        # Call detect_objects function with is_path=True
        return detect_objects(
            image_data=image_path,
            model_name=model_name,
            confidence=confidence,
            save_results=save_results,
            is_path=True
        )
    except Exception as e:
        return {
            "error": f"Failed to analyze image: {str(e)}",
            "image_path": image_path
        }
@mcp.tool()
def test_connection() -> Dict[str, Any]:
    """
    Test if YOLO MCP service is running properly
    Returns:
        Status information and available tools
    """
    return {
        "status": "YOLO MCP service is running normally",
        "available_models": list_available_models(),
        "available_tools": [
            "list_available_models", "detect_objects", "segment_objects",
            "classify_image", "detect_poses", "detect_oriented_objects",
            "track_objects", "train_model", "validate_model",
            "export_model", "start_camera_detection", "stop_camera_detection",
            "get_camera_detections", "test_connection",
            # Additional tools
            "analyze_image_from_path",
            "comprehensive_image_analysis"
        ],
        "new_features": [
            "Support for loading images directly from file paths",
            "Support for comprehensive image analysis with task identification",
            "All detection functions support both file paths and base64 data"
        ]
    }
# Entry point: start the watchdog, then serve MCP requests over stdio
if __name__ == "__main__":
    logger.info("Starting YOLO MCP service")
    # Start the camera watchdog thread
    watchdog_thread = start_watchdog()
    # Initialize and run server
    mcp.run(transport='stdio')
```
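The camera loop in `server.py` above keeps recent results in a plain list that the capture thread and the MCP tools both touch, and trims it by hand. The following is a minimal sketch of one alternative, assuming a bounded `collections.deque` guarded by a lock would be acceptable there. `DetectionBuffer`, `snapshot`, and the sample records are hypothetical names used only for illustration; they are not part of the repository.

```python
import threading
import time
from collections import deque
from typing import Any, Deque, Dict, List


class DetectionBuffer:
    """Thread-safe ring buffer holding the most recent detection records."""

    def __init__(self, maxlen: int = 10) -> None:
        # deque(maxlen=...) silently drops the oldest entry once it is full
        self._items: Deque[Dict[str, Any]] = deque(maxlen=maxlen)
        self._lock = threading.Lock()

    def append(self, record: Dict[str, Any]) -> None:
        with self._lock:
            self._items.append(record)

    def snapshot(self) -> List[Dict[str, Any]]:
        # Return a copy so callers never iterate while the writer mutates
        with self._lock:
            return list(self._items)

    def clear(self) -> None:
        with self._lock:
            self._items.clear()


# Simulate 25 processed frames; only the 10 newest records survive
buffer = DetectionBuffer(maxlen=10)
for frame_count in range(1, 26):
    buffer.append({"timestamp": time.time(), "frame_count": frame_count, "detections": []})

recent = buffer.snapshot()
print(len(recent), recent[0]["frame_count"], recent[-1]["frame_count"])  # 10 16 25
```

With this shape, error records and successful frames share the same cap, so the buffer cannot grow past `maxlen` no matter which code path appends to it.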
--------------------------------------------------------------------------------
/server_cli.py:
--------------------------------------------------------------------------------
```python
# server_cli.py - CLI version
import fnmatch
import os
import base64
import cv2
import time
import threading
import subprocess
import json
import tempfile
import platform
import sys
import logging
import contextlib
import signal
import atexit
from io import BytesIO
from typing import List, Dict, Any, Optional, Union
import numpy as np
from PIL import Image
from mcp.server.fastmcp import FastMCP
# Set up logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("yolo_service.log"),
        logging.StreamHandler(sys.stderr)
    ]
)
camera_startup_status = None # Will store error details if startup fails
camera_last_error = None
logger = logging.getLogger('yolo_service')
# Global variables for camera control
camera_running = False
camera_thread = None
detection_results = []
camera_last_access_time = 0
CAMERA_INACTIVITY_TIMEOUT = 60 # Auto-shutdown after 60 seconds of inactivity
def camera_watchdog_thread():
    """Monitor thread that auto-stops the camera after inactivity"""
    global camera_running, camera_last_access_time
    logger.info("Camera watchdog thread started")
    while True:
        # Sleep for a short time to avoid excessive CPU usage
        time.sleep(5)
        # Check if camera is running
        if camera_running:
            current_time = time.time()
            elapsed_time = current_time - camera_last_access_time
            # If no access for more than the timeout, auto-stop
            if elapsed_time > CAMERA_INACTIVITY_TIMEOUT:
                logger.info(f"Auto-stopping camera after {elapsed_time:.1f} seconds of inactivity")
                stop_camera_detection()
        else:
            # If camera is not running, no need to check frequently
            time.sleep(10)
def load_image(image_source, is_path=False):
    """
    Load image from file path or base64 data
    Args:
        image_source: File path or base64 encoded image data
        is_path: Whether image_source is a file path
    Returns:
        PIL Image object
    """
    try:
        if is_path:
            # Load image from file path
            if os.path.exists(image_source):
                return Image.open(image_source)
            else:
                raise FileNotFoundError(f"Image file not found: {image_source}")
        else:
            # Load image from base64 data
            image_bytes = base64.b64decode(image_source)
            return Image.open(BytesIO(image_bytes))
    except Exception as e:
        raise ValueError(f"Failed to load image: {str(e)}")
# Helper that runs YOLO CLI commands in a subprocess
def run_yolo_cli(command_args, capture_output=True, timeout=60):
    """
    Run YOLO CLI command and return the results
    Args:
        command_args: List of command arguments to pass to yolo CLI
        capture_output: Whether to capture and return command output
        timeout: Command timeout in seconds
    Returns:
        Dictionary with a "success" flag plus the command output or error details
    """
    # Build the complete command
    cmd = ["yolo"] + command_args
    # Log the command
    logger.info(f"Running YOLO CLI command: {' '.join(cmd)}")
    try:
        # Run the command
        result = subprocess.run(
            cmd,
            capture_output=capture_output,
            text=True,
            check=False, # Don't raise exception on non-zero exit
            timeout=timeout
        )
        # Check for errors
        if result.returncode != 0:
            logger.error(f"YOLO CLI command failed with code {result.returncode}")
            logger.error(f"stderr: {result.stderr}")
            return {
                "success": False,
                "error": result.stderr,
                "command": " ".join(cmd),
                "returncode": result.returncode
            }
        # Return the result
        if capture_output:
            return {
                "success": True,
                "stdout": result.stdout,
                "stderr": result.stderr,
                "command": " ".join(cmd)
            }
        else:
            return {"success": True, "command": " ".join(cmd)}
    except subprocess.TimeoutExpired:
        logger.error(f"YOLO CLI command timed out after {timeout} seconds")
        return {
            "success": False,
            "error": f"Command timed out after {timeout} seconds",
            "command": " ".join(cmd)
        }
    except Exception as e:
        # Also reached when the `yolo` executable is not on PATH
        logger.error(f"Error running YOLO CLI command: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "command": " ".join(cmd)
        }
# Create MCP server
mcp = FastMCP("YOLO_Service")
# Global configuration
CONFIG = {
"model_dirs": [
".", # Current directory
"./models", # Models subdirectory
os.path.join(os.path.dirname(os.path.abspath(__file__)), "models"),
]
}
# Function to save base64 data to temp file
def save_base64_to_temp(base64_data, prefix="image", suffix=".jpg"):
    """Save base64 encoded data to a temporary file and return the path"""
    try:
        # Decode first so invalid input never leaves an empty temp file or open descriptor behind
        image_data = base64.b64decode(base64_data)
        # Create a temporary file
        fd, temp_path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
        # Write data to file
        with os.fdopen(fd, 'wb') as temp_file:
            temp_file.write(image_data)
        return temp_path
    except Exception as e:
        logger.error(f"Error saving base64 to temp file: {str(e)}")
        raise ValueError(f"Failed to save base64 data: {str(e)}") from e
@mcp.tool()
def get_model_directories() -> Dict[str, Any]:
    """Get information about configured model directories and available models"""
    directories = []
    for directory in CONFIG["model_dirs"]:
        dir_info = {
            "path": directory,
            "exists": os.path.exists(directory),
            "is_directory": os.path.isdir(directory) if os.path.exists(directory) else False,
            "models": []
        }
        if dir_info["exists"] and dir_info["is_directory"]:
            for filename in os.listdir(directory):
                if filename.endswith(".pt"):
                    dir_info["models"].append(filename)
        directories.append(dir_info)
    return {
        "configured_directories": CONFIG["model_dirs"],
        "directory_details": directories,
        "available_models": list_available_models(),
        "loaded_models": [] # No longer track loaded models with CLI approach
    }
@mcp.tool()
def detect_objects(
image_data: str,
model_name: str = "yolov8n.pt",
confidence: float = 0.25,
save_results: bool = False,
is_path: bool = False
) -> Dict[str, Any]:
"""
Detect objects in an image using YOLO CLI
Args:
image_data: Base64 encoded image or file path (if is_path=True)
model_name: YOLO model name
confidence: Detection confidence threshold
save_results: Whether to save results to disk
is_path: Whether image_data is a file path
Returns:
Dictionary containing detection results
"""
try:
# Determine source path
if is_path:
source_path = image_data
if not os.path.exists(source_path):
return {
"error": f"Image file not found: {source_path}",
"source": source_path
}
else:
# Save base64 data to temp file
source_path = save_base64_to_temp(image_data)
        # Determine full model path
        model_path = None
        for directory in CONFIG["model_dirs"]:
            potential_path = os.path.join(directory, model_name)
            if os.path.exists(potential_path):
                model_path = potential_path
                break
        if model_path is None:
            # Remove the temp file created for base64 input before returning early
            if not is_path and os.path.exists(source_path):
                os.remove(source_path)
            available = list_available_models()
            available_str = ", ".join(available) if available else "none"
            return {
                "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
                "source": image_data if is_path else "base64_image"
            }
# Setup output directory if saving results
output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
if save_results and not os.path.exists(output_dir):
os.makedirs(output_dir)
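        # Build YOLO CLI command
        cmd_args = [
            "detect", # Task
            "predict", # Mode
            f"model={model_path}",
            f"source={source_path}",
            f"conf={confidence}",
            # NOTE: Ultralytics documents `format` as an export setting, so predict may print
            # no JSON to stdout; the parser below then returns an empty result list
            "format=json", # Request JSON output for parsing
        ]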
if save_results:
cmd_args.append(f"project={output_dir}")
cmd_args.append("save=True")
else:
cmd_args.append("save=False")
# Run YOLO CLI command
result = run_yolo_cli(cmd_args)
# Clean up temp file if we created one
if not is_path:
try:
os.remove(source_path)
except Exception as e:
logger.warning(f"Failed to clean up temp file {source_path}: {str(e)}")
# Check for command success
if not result["success"]:
return {
"error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
"command": result.get("command", ""),
"source": image_data if is_path else "base64_image"
}
# Parse JSON output from stdout
try:
# Try to find JSON in the output
json_start = result["stdout"].find("{")
json_end = result["stdout"].rfind("}")
if json_start >= 0 and json_end > json_start:
json_str = result["stdout"][json_start:json_end+1]
detection_data = json.loads(json_str)
else:
# If no JSON found, create a basic response with info from stderr
return {
"results": [],
"model_used": model_name,
"total_detections": 0,
"source": image_data if is_path else "base64_image",
"command_output": result["stderr"]
}
# Format results
formatted_results = []
# Parse detection data from YOLO JSON output
if "predictions" in detection_data:
detections = []
for pred in detection_data["predictions"]:
# Extract box coordinates
box = pred.get("box", {})
x1, y1, x2, y2 = box.get("x1", 0), box.get("y1", 0), box.get("x2", 0), box.get("y2", 0)
# Extract class information
confidence = pred.get("confidence", 0)
class_name = pred.get("name", "unknown")
class_id = pred.get("class", -1)
detections.append({
"box": [x1, y1, x2, y2],
"confidence": confidence,
"class_id": class_id,
"class_name": class_name
})
# Get image dimensions if available
image_shape = [
detection_data.get("width", 0),
detection_data.get("height", 0)
]
formatted_results.append({
"detections": detections,
"image_shape": image_shape
})
return {
"results": formatted_results,
"model_used": model_name,
"total_detections": sum(len(r["detections"]) for r in formatted_results),
"source": image_data if is_path else "base64_image",
"save_dir": output_dir if save_results else None
}
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON from YOLO output: {e}")
logger.error(f"Output: {result['stdout']}")
return {
"error": f"Failed to parse YOLO results: {str(e)}",
"command": result.get("command", ""),
"source": image_data if is_path else "base64_image",
"stdout": result.get("stdout", ""),
"stderr": result.get("stderr", "")
}
except Exception as e:
logger.error(f"Error in detect_objects: {str(e)}")
return {
"error": f"Failed to detect objects: {str(e)}",
"source": image_data if is_path else "base64_image"
}
@mcp.tool()
def segment_objects(
image_data: str,
model_name: str = "yolov11n-seg.pt",
confidence: float = 0.25,
save_results: bool = False,
is_path: bool = False
) -> Dict[str, Any]:
"""
Perform instance segmentation on an image using YOLO CLI
Args:
image_data: Base64 encoded image or file path (if is_path=True)
model_name: YOLO segmentation model name
confidence: Detection confidence threshold
save_results: Whether to save results to disk
is_path: Whether image_data is a file path
Returns:
Dictionary containing segmentation results
"""
try:
# Determine source path
if is_path:
source_path = image_data
if not os.path.exists(source_path):
return {
"error": f"Image file not found: {source_path}",
"source": source_path
}
else:
# Save base64 data to temp file
source_path = save_base64_to_temp(image_data)
        # Determine full model path
        model_path = None
        for directory in CONFIG["model_dirs"]:
            potential_path = os.path.join(directory, model_name)
            if os.path.exists(potential_path):
                model_path = potential_path
                break
        if model_path is None:
            # Remove the temp file created for base64 input before returning early
            if not is_path and os.path.exists(source_path):
                os.remove(source_path)
            available = list_available_models()
            available_str = ", ".join(available) if available else "none"
            return {
                "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
                "source": image_data if is_path else "base64_image"
            }
# Setup output directory if saving results
output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
if save_results and not os.path.exists(output_dir):
os.makedirs(output_dir)
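        # Build YOLO CLI command
        cmd_args = [
            "segment", # Task
            "predict", # Mode
            f"model={model_path}",
            f"source={source_path}",
            f"conf={confidence}",
            # NOTE: Ultralytics documents `format` as an export setting, so predict may print
            # no JSON to stdout; the parser below then returns an empty result list
            "format=json", # Request JSON output for parsing
        ]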
if save_results:
cmd_args.append(f"project={output_dir}")
cmd_args.append("save=True")
else:
cmd_args.append("save=False")
# Run YOLO CLI command
result = run_yolo_cli(cmd_args)
# Clean up temp file if we created one
if not is_path:
try:
os.remove(source_path)
except Exception as e:
logger.warning(f"Failed to clean up temp file {source_path}: {str(e)}")
# Check for command success
if not result["success"]:
return {
"error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
"command": result.get("command", ""),
"source": image_data if is_path else "base64_image"
}
# Parse JSON output from stdout
try:
# Try to find JSON in the output
json_start = result["stdout"].find("{")
json_end = result["stdout"].rfind("}")
if json_start >= 0 and json_end > json_start:
json_str = result["stdout"][json_start:json_end+1]
segmentation_data = json.loads(json_str)
else:
# If no JSON found, create a basic response with info from stderr
return {
"results": [],
"model_used": model_name,
"total_segments": 0,
"source": image_data if is_path else "base64_image",
"command_output": result["stderr"]
}
# Format results
formatted_results = []
# Parse segmentation data from YOLO JSON output
if "predictions" in segmentation_data:
segments = []
for pred in segmentation_data["predictions"]:
# Extract box coordinates
box = pred.get("box", {})
x1, y1, x2, y2 = box.get("x1", 0), box.get("y1", 0), box.get("x2", 0), box.get("y2", 0)
# Extract class information
confidence = pred.get("confidence", 0)
class_name = pred.get("name", "unknown")
class_id = pred.get("class", -1)
segment = {
"box": [x1, y1, x2, y2],
"confidence": confidence,
"class_id": class_id,
"class_name": class_name
}
# Extract mask if available
if "mask" in pred:
segment["mask"] = pred["mask"]
segments.append(segment)
# Get image dimensions if available
image_shape = [
segmentation_data.get("width", 0),
segmentation_data.get("height", 0)
]
formatted_results.append({
"segments": segments,
"image_shape": image_shape
})
return {
"results": formatted_results,
"model_used": model_name,
"total_segments": sum(len(r["segments"]) for r in formatted_results),
"source": image_data if is_path else "base64_image",
"save_dir": output_dir if save_results else None
}
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON from YOLO output: {e}")
logger.error(f"Output: {result['stdout']}")
return {
"error": f"Failed to parse YOLO results: {str(e)}",
"command": result.get("command", ""),
"source": image_data if is_path else "base64_image",
"stdout": result.get("stdout", ""),
"stderr": result.get("stderr", "")
}
except Exception as e:
logger.error(f"Error in segment_objects: {str(e)}")
return {
"error": f"Failed to segment objects: {str(e)}",
"source": image_data if is_path else "base64_image"
}
@mcp.tool()
def classify_image(
image_data: str,
model_name: str = "yolov11n-cls.pt",
top_k: int = 5,
save_results: bool = False,
is_path: bool = False
) -> Dict[str, Any]:
"""
Classify an image using YOLO classification model via CLI
Args:
image_data: Base64 encoded image or file path (if is_path=True)
model_name: YOLO classification model name
top_k: Number of top categories to return
save_results: Whether to save results to disk
is_path: Whether image_data is a file path
Returns:
Dictionary containing classification results
"""
try:
# Determine source path
if is_path:
source_path = image_data
if not os.path.exists(source_path):
return {
"error": f"Image file not found: {source_path}",
"source": source_path
}
else:
# Save base64 data to temp file
source_path = save_base64_to_temp(image_data)
        # Determine full model path
        model_path = None
        for directory in CONFIG["model_dirs"]:
            potential_path = os.path.join(directory, model_name)
            if os.path.exists(potential_path):
                model_path = potential_path
                break
        if model_path is None:
            # Remove the temp file created for base64 input before returning early
            if not is_path and os.path.exists(source_path):
                os.remove(source_path)
            available = list_available_models()
            available_str = ", ".join(available) if available else "none"
            return {
                "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
                "source": image_data if is_path else "base64_image"
            }
# Setup output directory if saving results
output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
if save_results and not os.path.exists(output_dir):
os.makedirs(output_dir)
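        # Build YOLO CLI command
        cmd_args = [
            "classify", # Task
            "predict", # Mode
            f"model={model_path}",
            f"source={source_path}",
            # NOTE: Ultralytics documents `format` as an export setting, so predict may print
            # no JSON to stdout; the parser below then returns an empty result list
            "format=json", # Request JSON output for parsing
        ]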
if save_results:
cmd_args.append(f"project={output_dir}")
cmd_args.append("save=True")
else:
cmd_args.append("save=False")
# Run YOLO CLI command
result = run_yolo_cli(cmd_args)
# Clean up temp file if we created one
if not is_path:
try:
os.remove(source_path)
except Exception as e:
logger.warning(f"Failed to clean up temp file {source_path}: {str(e)}")
# Check for command success
if not result["success"]:
return {
"error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
"command": result.get("command", ""),
"source": image_data if is_path else "base64_image"
}
# Parse JSON output from stdout
try:
# Try to find JSON in the output
json_start = result["stdout"].find("{")
json_end = result["stdout"].rfind("}")
if json_start >= 0 and json_end > json_start:
json_str = result["stdout"][json_start:json_end+1]
classification_data = json.loads(json_str)
else:
# If no JSON found, create a basic response with info from stderr
return {
"results": [],
"model_used": model_name,
"top_k": top_k,
"source": image_data if is_path else "base64_image",
"command_output": result["stderr"]
}
# Format results
formatted_results = []
# Parse classification data from YOLO JSON output
if "predictions" in classification_data:
classifications = []
predictions = classification_data["predictions"]
# Predictions could be an array of classifications
                for i, pred in enumerate(predictions[:top_k]):
                    class_name = pred.get("name", f"class_{i}")
                    confidence = pred.get("confidence", 0)
                    classifications.append({
                        # Prefer the reported class index; fall back to the rank position
                        "class_id": pred.get("class", i),
                        "class_name": class_name,
                        "probability": confidence
                    })
# Get image dimensions if available
image_shape = [
classification_data.get("width", 0),
classification_data.get("height", 0)
]
formatted_results.append({
"classifications": classifications,
"image_shape": image_shape
})
return {
"results": formatted_results,
"model_used": model_name,
"top_k": top_k,
"source": image_data if is_path else "base64_image",
"save_dir": output_dir if save_results else None
}
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON from YOLO output: {e}")
logger.error(f"Output: {result['stdout']}")
return {
"error": f"Failed to parse YOLO results: {str(e)}",
"command": result.get("command", ""),
"source": image_data if is_path else "base64_image",
"stdout": result.get("stdout", ""),
"stderr": result.get("stderr", "")
}
except Exception as e:
logger.error(f"Error in classify_image: {str(e)}")
return {
"error": f"Failed to classify image: {str(e)}",
"source": image_data if is_path else "base64_image"
}
@mcp.tool()
def track_objects(
image_data: str,
model_name: str = "yolov8n.pt",
confidence: float = 0.25,
tracker: str = "bytetrack.yaml",
save_results: bool = False
) -> Dict[str, Any]:
"""
Track objects in an image sequence using YOLO CLI
Args:
image_data: Base64 encoded image
model_name: YOLO model name
confidence: Detection confidence threshold
tracker: Tracker name to use (e.g., 'bytetrack.yaml', 'botsort.yaml')
save_results: Whether to save results to disk
Returns:
Dictionary containing tracking results
"""
try:
# Save base64 data to temp file
source_path = save_base64_to_temp(image_data)
        # Determine full model path
        model_path = None
        for directory in CONFIG["model_dirs"]:
            potential_path = os.path.join(directory, model_name)
            if os.path.exists(potential_path):
                model_path = potential_path
                break
        if model_path is None:
            # Remove the temp file before returning early
            if os.path.exists(source_path):
                os.remove(source_path)
            available = list_available_models()
            available_str = ", ".join(available) if available else "none"
            return {
                "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
            }
# Setup output directory if saving results
output_dir = os.path.join(tempfile.gettempdir(), "yolo_track_results")
if save_results and not os.path.exists(output_dir):
os.makedirs(output_dir)
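        # Build YOLO CLI command
        cmd_args = [
            "track", # Combined task and mode for tracking
            f"model={model_path}",
            f"source={source_path}",
            f"conf={confidence}",
            f"tracker={tracker}",
            # NOTE: Ultralytics documents `format` as an export setting, so track may print
            # no JSON to stdout; the parser below then returns an empty result list
            "format=json", # Request JSON output for parsing
        ]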
if save_results:
cmd_args.append(f"project={output_dir}")
cmd_args.append("save=True")
else:
cmd_args.append("save=False")
# Run YOLO CLI command
result = run_yolo_cli(cmd_args)
# Clean up temp file
try:
os.remove(source_path)
except Exception as e:
logger.warning(f"Failed to clean up temp file {source_path}: {str(e)}")
# Check for command success
if not result["success"]:
return {
"error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
"command": result.get("command", ""),
}
# Parse JSON output from stdout
try:
# Try to find JSON in the output
json_start = result["stdout"].find("{")
json_end = result["stdout"].rfind("}")
if json_start >= 0 and json_end > json_start:
json_str = result["stdout"][json_start:json_end+1]
tracking_data = json.loads(json_str)
else:
# If no JSON found, create a basic response
return {
"results": [],
"model_used": model_name,
"tracker": tracker,
"total_tracks": 0,
"command_output": result["stderr"]
}
# Format results
formatted_results = []
# Parse tracking data from YOLO JSON output
if "predictions" in tracking_data:
tracks = []
for pred in tracking_data["predictions"]:
# Extract box coordinates
box = pred.get("box", {})
x1, y1, x2, y2 = box.get("x1", 0), box.get("y1", 0), box.get("x2", 0), box.get("y2", 0)
# Extract class and tracking information
# (named det_confidence so the `confidence` argument is not overwritten)
det_confidence = pred.get("confidence", 0)
class_name = pred.get("name", "unknown")
class_id = pred.get("class", -1)
track_id = pred.get("id", -1)
track = {
"box": [x1, y1, x2, y2],
"confidence": det_confidence,
"class_id": class_id,
"class_name": class_name,
"track_id": track_id
}
tracks.append(track)
# Get image dimensions if available
image_shape = [
tracking_data.get("width", 0),
tracking_data.get("height", 0)
]
formatted_results.append({
"tracks": tracks,
"image_shape": image_shape
})
return {
"results": formatted_results,
"model_used": model_name,
"tracker": tracker,
"total_tracks": sum(len(r["tracks"]) for r in formatted_results),
"save_dir": output_dir if save_results else None
}
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON from YOLO output: {e}")
logger.error(f"Output: {result['stdout']}")
return {
"error": f"Failed to parse YOLO results: {str(e)}",
"command": result.get("command", ""),
"stdout": result.get("stdout", ""),
"stderr": result.get("stderr", "")
}
except Exception as e:
logger.error(f"Error in track_objects: {str(e)}")
return {
"error": f"Failed to track objects: {str(e)}"
}
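# --- Illustrative sketch, not part of the original server code ---------------
# The "determine full model path" loop used in track_objects is repeated in
# several other tools in this file. One possible way to factor it out is a small
# helper such as the one below. The name `resolve_model_path` and its
# `model_dirs` parameter are hypothetical; a caller in this module would
# presumably pass CONFIG["model_dirs"].
import os
from typing import Iterable, Optional
def resolve_model_path(model_name: str, model_dirs: Iterable[str]) -> Optional[str]:
    """Return the first existing path for model_name under model_dirs, else None."""
    for directory in model_dirs:
        candidate = os.path.join(directory, model_name)
        if os.path.exists(candidate):
            return candidate
    return None
# Possible usage inside a tool (assumption, shown as a comment only):
#   model_path = resolve_model_path(model_name, CONFIG["model_dirs"])
#   if model_path is None: ...return the "model not found" error dictionary...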
@mcp.tool()
def train_model(
dataset_path: str,
model_name: str = "yolov8n.pt",
epochs: int = 100,
imgsz: int = 640,
batch: int = 16,
name: str = "yolo_custom_model",
project: str = "runs/train"
) -> Dict[str, Any]:
"""
Train a YOLO model on a custom dataset using CLI
Args:
dataset_path: Path to YOLO format dataset
model_name: Base model to start with
epochs: Number of training epochs
imgsz: Image size for training
batch: Batch size
name: Name for the training run
project: Project directory
Returns:
Dictionary containing training results
"""
# Validate dataset path
if not os.path.exists(dataset_path):
return {"error": f"Dataset not found: {dataset_path}"}
# Determine full model path
model_path = None
for directory in CONFIG["model_dirs"]:
potential_path = os.path.join(directory, model_name)
if os.path.exists(potential_path):
model_path = potential_path
break
if model_path is None:
available = list_available_models()
available_str = ", ".join(available) if available else "none"
return {
"error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
}
# Create project directory if it doesn't exist
if not os.path.exists(project):
os.makedirs(project)
# Determine task type based on model name
task = "detect" # Default task
if "seg" in model_name:
task = "segment"
elif "pose" in model_name:
task = "pose"
elif "cls" in model_name:
task = "classify"
elif "obb" in model_name:
task = "obb"
# Build YOLO CLI command
cmd_args = [
task, # Task
"train", # Mode
f"model={model_path}",
f"data={dataset_path}",
f"epochs={epochs}",
f"imgsz={imgsz}",
f"batch={batch}",
f"name={name}",
f"project={project}"
]
# Run YOLO CLI command - with longer timeout
logger.info(f"Starting model training with {epochs} epochs - this may take a while...")
result = run_yolo_cli(cmd_args, timeout=epochs * 300) # 5 minutes per epoch
# Check for command success
if not result["success"]:
return {
"error": f"Training failed: {result.get('error', 'Unknown error')}",
"command": result.get("command", ""),
"stderr": result.get("stderr", "")
}
# Determine path to best model weights
best_model_path = os.path.join(project, name, "weights", "best.pt")
# Determine metrics from stdout if possible
metrics = {}
try:
# Look for metrics in output
stdout = result.get("stdout", "")
# Extract metrics from training output
import re
precision_match = re.search(r"Precision: ([\d\.]+)", stdout)
recall_match = re.search(r"Recall: ([\d\.]+)", stdout)
map50_match = re.search(r"mAP50: ([\d\.]+)", stdout)
map_match = re.search(r"mAP50-95: ([\d\.]+)", stdout)
if precision_match:
metrics["precision"] = float(precision_match.group(1))
if recall_match:
metrics["recall"] = float(recall_match.group(1))
if map50_match:
metrics["mAP50"] = float(map50_match.group(1))
if map_match:
metrics["mAP50-95"] = float(map_match.group(1))
except Exception as e:
logger.warning(f"Failed to parse metrics from training output: {str(e)}")
return {
"status": "success",
"model_path": best_model_path,
"epochs_completed": epochs,
"final_metrics": metrics,
"training_log_sample": result.get("stdout", "")[:1000] + "..." if len(result.get("stdout", "")) > 1000 else result.get("stdout", "")
}
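# --- Illustrative sketch, not part of the original server code ---------------
# train_model scrapes metrics from stdout with patterns such as "Precision: ...".
# If the CLI prints its metrics as a table instead, those patterns will not
# match and final_metrics stays empty. A possible alternative, ASSUMING the
# training run writes a results.csv file into <project>/<name>/ whose metric
# columns are prefixed with "metrics/" (verify this against the installed
# Ultralytics version), is to read the last row of that file. The helper name
# `read_final_metrics` is hypothetical.
import csv
import os
from typing import Dict
def read_final_metrics(results_csv: str) -> Dict[str, float]:
    """Return the 'metrics/*' columns from the last row of results_csv as floats."""
    if not os.path.exists(results_csv):
        return {}
    with open(results_csv, newline="") as fh:
        rows = list(csv.DictReader(fh))
    if not rows:
        return {}
    metrics: Dict[str, float] = {}
    for key, value in rows[-1].items():
        # Skip malformed cells; header names may carry padding spaces
        if key is None or not isinstance(value, str):
            continue
        name = key.strip()
        if not name.startswith("metrics/"):
            continue
        try:
            metrics[name] = float(value.strip())
        except ValueError:
            continue
    return metrics
# Possible usage (assumption, shown as a comment only):
#   metrics = read_final_metrics(os.path.join(project, name, "results.csv"))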
@mcp.tool()
def validate_model(
model_path: str,
data_path: str,
imgsz: int = 640,
batch: int = 16
) -> Dict[str, Any]:
"""
Validate a YOLO model on a dataset using CLI
Args:
model_path: Path to YOLO model (.pt file)
data_path: Path to YOLO format validation dataset
imgsz: Image size for validation
batch: Batch size
Returns:
Dictionary containing validation results
"""
# Validate model path
if not os.path.exists(model_path):
return {"error": f"Model file not found: {model_path}"}
# Validate dataset path
if not os.path.exists(data_path):
return {"error": f"Dataset not found: {data_path}"}
# Determine task type based on model name
model_name = os.path.basename(model_path)
task = "detect" # Default task
if "seg" in model_name:
task = "segment"
elif "pose" in model_name:
task = "pose"
elif "cls" in model_name:
task = "classify"
elif "obb" in model_name:
task = "obb"
# Build YOLO CLI command
cmd_args = [
task, # Task
"val", # Mode
f"model={model_path}",
f"data={data_path}",
f"imgsz={imgsz}",
f"batch={batch}"
]
# Run YOLO CLI command
result = run_yolo_cli(cmd_args, timeout=300) # 5 minute timeout
# Check for command success
if not result["success"]:
return {
"error": f"Validation failed: {result.get('error', 'Unknown error')}",
"command": result.get("command", ""),
"stderr": result.get("stderr", "")
}
# Extract metrics from validation output
metrics = {}
try:
stdout = result.get("stdout", "")
import re
precision_match = re.search(r"Precision: ([\d\.]+)", stdout)
recall_match = re.search(r"Recall: ([\d\.]+)", stdout)
map50_match = re.search(r"mAP50: ([\d\.]+)", stdout)
map_match = re.search(r"mAP50-95: ([\d\.]+)", stdout)
if precision_match:
metrics["precision"] = float(precision_match.group(1))
if recall_match:
metrics["recall"] = float(recall_match.group(1))
if map50_match:
metrics["mAP50"] = float(map50_match.group(1))
if map_match:
metrics["mAP50-95"] = float(map_match.group(1))
except Exception as e:
logger.warning(f"Failed to parse metrics from validation output: {str(e)}")
return {
"status": "success",
"metrics": metrics,
"validation_output": result.get("stdout", "")[:1000] + "..." if len(result.get("stdout", "")) > 1000 else result.get("stdout", "")
}
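# --- Illustrative sketch, not part of the original server code ---------------
# train_model, validate_model and the camera thread each repeat the same
# if/elif chain that maps a model file name to a YOLO task. A minimal sketch of
# a shared helper follows; the name `infer_task` is hypothetical. It keeps the
# same substring checks, in the same order, as the chains above, so it should be
# given a file name rather than a full path (a directory called "seg" would
# otherwise match).
from typing import Tuple
_TASK_MARKERS: Tuple[Tuple[str, str], ...] = (
    ("seg", "segment"),
    ("pose", "pose"),
    ("cls", "classify"),
    ("obb", "obb"),
)
def infer_task(model_name: str) -> str:
    """Return the YOLO task implied by model_name, defaulting to 'detect'."""
    for marker, task in _TASK_MARKERS:
        if marker in model_name:
            return task
    return "detect"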
@mcp.tool()
def export_model(
    model_path: str,
    format: str = "onnx",
    imgsz: int = 640
) -> Dict[str, Any]:
    """
    Export a YOLO model to different formats using CLI
    Args:
        model_path: Path to YOLO model (.pt file)
        format: Export format (onnx, torchscript, openvino, etc.)
        imgsz: Image size for export
    Returns:
        Dictionary containing export results
    """
    # Validate model path
    if not os.path.exists(model_path):
        return {"error": f"Model file not found: {model_path}"}
    # Valid export formats
    valid_formats = [
        "torchscript", "onnx", "openvino", "engine", "coreml", "saved_model",
        "pb", "tflite", "edgetpu", "tfjs", "paddle"
    ]
    if format not in valid_formats:
        return {"error": f"Invalid export format: {format}. Valid formats include: {', '.join(valid_formats)}"}
    # Build YOLO CLI command
    cmd_args = [
        "export",  # Combined task and mode for export
        f"model={model_path}",
        f"format={format}",
        f"imgsz={imgsz}"
    ]
    # Run YOLO CLI command
    result = run_yolo_cli(cmd_args, timeout=300)  # 5 minute timeout
    # Check for command success
    if not result["success"]:
        return {
            "error": f"Export failed: {result.get('error', 'Unknown error')}",
            "command": result.get("command", ""),
            "stderr": result.get("stderr", "")
        }
    # Try to determine export path
    export_path = None
    try:
        # Model path without extension
        base_path = os.path.splitext(model_path)[0]
        # Expected export paths based on format
        format_extensions = {
            "torchscript": ".torchscript",
            "onnx": ".onnx",
            "openvino": "_openvino_model",
            "engine": ".engine",
            "coreml": ".mlmodel",
            "saved_model": "_saved_model",
            "pb": ".pb",
            "tflite": ".tflite",
            "edgetpu": "_edgetpu.tflite",
            "tfjs": "_web_model",
            "paddle": "_paddle_model"
        }
        expected_ext = format_extensions.get(format, "")
        expected_path = base_path + expected_ext
        # Check if the exported file or directory exists
        if os.path.exists(expected_path):
            export_path = expected_path
    except Exception as e:
        logger.warning(f"Failed to determine export path: {str(e)}")
    # Return at most the first 1000 characters of the CLI output
    stdout = result.get("stdout", "")
    return {
        "status": "success",
        "export_path": export_path,
        "format": format,
        "export_output": stdout[:1000] + "..." if len(stdout) > 1000 else stdout
    }
@mcp.tool()
def list_available_models() -> List[str]:
    """List available YOLO models that actually exist on disk in any configured directory"""
    # Common YOLO model patterns.
    # Ultralytics publishes its YOLO11 weights as "yolo11*.pt" (no "v"); the
    # "yolov11*.pt" pattern is kept for files that were renamed locally.
    model_patterns = [
        "yolo11*.pt",
        "yolov11*.pt",
        "yolov8*.pt"
    ]
    # Find all existing models in all configured directories
    available_models = set()
    for directory in CONFIG["model_dirs"]:
        if not os.path.exists(directory):
            continue
        # Check for model files directly
        for filename in os.listdir(directory):
            if filename.endswith(".pt") and any(
                fnmatch.fnmatch(filename, pattern) for pattern in model_patterns
            ):
                available_models.add(filename)
    # Convert to sorted list
    result = sorted(available_models)
    if not result:
        logger.warning("No model files found in configured directories.")
        return ["No models available - download models to any of these directories: " + ", ".join(CONFIG["model_dirs"])]
    return result
# Camera detection functions using CLI instead of Python API
def camera_detection_thread(model_name, confidence, fps_limit=30, camera_id=0):
"""Background thread for camera detection using YOLO CLI"""
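global camera_running, detection_results, camera_last_access_time, camera_startup_status, camera_last_error
# Bind `process` before the try block so the finally clause can test it even on an early return
process = None
try: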
# Create a unique directory for camera results
output_dir = os.path.join(tempfile.gettempdir(), f"yolo_camera_{int(time.time())}")
os.makedirs(output_dir, exist_ok=True)
# Determine full model path
model_path = None
for directory in CONFIG["model_dirs"]:
potential_path = os.path.join(directory, model_name)
if os.path.exists(potential_path):
model_path = potential_path
break
if model_path is None:
error_msg = f"Model {model_name} not found in any configured directories"
logger.error(error_msg)
camera_running = False
camera_startup_status = {
"success": False,
"error": error_msg,
"timestamp": time.time()
}
detection_results.append({
"timestamp": time.time(),
"error": "Failed to load model: Model not found",
"camera_status": "error",
"detections": []
})
return
# Log camera start
logger.info(f"Starting camera detection with model {model_name}, camera ID {camera_id}")
detection_results.append({
"timestamp": time.time(),
"system_info": {
"os": platform.system() if 'platform' in globals() else "Unknown",
"camera_id": camera_id
},
"camera_status": "starting",
"detections": []
})
# Determine task type based on model name
task = "detect" # Default task
if "seg" in model_name:
task = "segment"
elif "pose" in model_name:
task = "pose"
elif "cls" in model_name:
task = "classify"
# Build YOLO CLI command
base_cmd_args = [
task, # Task
"predict", # Mode
f"model={model_path}",
f"source={camera_id}", # Camera source ID
f"conf={confidence}",
"format=json",
"save=False", # Don't save frames by default
"show=False" # Don't show GUI window
]
# First verify YOLO command is available
logger.info("Verifying YOLO CLI availability before starting camera...")
check_cmd = ["yolo", "--version"]
try:
check_result = subprocess.run(
check_cmd,
capture_output=True,
text=True,
check=False,
timeout=10
)
if check_result.returncode != 0:
error_msg = f"YOLO CLI check failed with code {check_result.returncode}: {check_result.stderr}"
logger.error(error_msg)
camera_running = False
camera_startup_status = {
"success": False,
"error": error_msg,
"timestamp": time.time()
}
detection_results.append({
"timestamp": time.time(),
"error": error_msg,
"camera_status": "error",
"detections": []
})
return
logger.info(f"YOLO CLI is available: {check_result.stdout.strip()}")
except Exception as e:
error_msg = f"Error checking YOLO CLI: {str(e)}"
logger.error(error_msg)
camera_running = False
camera_startup_status = {
"success": False,
"error": error_msg,
"timestamp": time.time()
}
detection_results.append({
"timestamp": time.time(),
"error": error_msg,
"camera_status": "error",
"detections": []
})
return
# Set up subprocess for ongoing camera capture
process = None
frame_count = 0
error_count = 0
start_time = time.time()
# Start YOLO CLI process
cmd_str = "yolo " + " ".join(base_cmd_args)
logger.info(f"Starting YOLO CLI process: {cmd_str}")
try:
process = subprocess.Popen(
["yolo"] + base_cmd_args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1, # Line buffered
)
# Wait a moment to check if the process immediately fails
time.sleep(1)
if process.poll() is not None:
error_msg = f"YOLO process failed to start (exit code {process.returncode})"
stderr_output = process.stderr.read()
logger.error(f"{error_msg} - STDERR: {stderr_output}")
camera_running = False
camera_startup_status = {
"success": False,
"error": error_msg,
"stderr": stderr_output,
"timestamp": time.time()
}
detection_results.append({
"timestamp": time.time(),
"error": error_msg,
"stderr": stderr_output,
"camera_status": "error",
"detections": []
})
return
# Process started successfully
camera_startup_status = {
"success": True,
"timestamp": time.time()
}
# Handle camera stream
while camera_running:
# Read output line from process
stdout_line = process.stdout.readline().strip()
if not stdout_line:
# Check if process is still running
if process.poll() is not None:
error_msg = f"YOLO process ended unexpectedly with code {process.returncode}"
stderr_output = process.stderr.read()
logger.error(f"{error_msg} - STDERR: {stderr_output}")
camera_running = False
camera_last_error = {
"error": error_msg,
"stderr": stderr_output,
"timestamp": time.time()
}
detection_results.append({
"timestamp": time.time(),
"error": error_msg,
"camera_status": "error",
"stderr": stderr_output,
"detections": []
})
break
time.sleep(0.1) # Short sleep to avoid CPU spin
continue
# Try to parse JSON output from YOLO
try:
# Find JSON in the output line
json_start = stdout_line.find("{")
if json_start >= 0:
json_str = stdout_line[json_start:]
detection_data = json.loads(json_str)
frame_count += 1
# Process detection data
if "predictions" in detection_data:
detections = []
for pred in detection_data["predictions"]:
# Extract box coordinates
box = pred.get("box", {})
x1, y1, x2, y2 = box.get("x1", 0), box.get("y1", 0), box.get("x2", 0), box.get("y2", 0)
# Extract class information
confidence = pred.get("confidence", 0)
class_name = pred.get("name", "unknown")
class_id = pred.get("class", -1)
detections.append({
"box": [x1, y1, x2, y2],
"confidence": confidence,
"class_id": class_id,
"class_name": class_name
})
# Update detection results (keep only the last 10)
if len(detection_results) >= 10:
detection_results.pop(0)
# Get image dimensions if available
image_shape = [
detection_data.get("width", 0),
detection_data.get("height", 0)
]
detection_results.append({
"timestamp": time.time(),
"frame_count": frame_count,
"detections": detections,
"camera_status": "running",
"image_shape": image_shape
})
# Update last access time when processing frames
camera_last_access_time = time.time()
# Log occasional status
if frame_count % 30 == 0:
fps = frame_count / (time.time() - start_time)
logger.info(f"Camera running: processed {frame_count} frames ({fps:.1f} FPS)")
detection_count = sum(len(r.get("detections", [])) for r in detection_results if "detections" in r)
logger.info(f"Total detections in current buffer: {detection_count}")
except json.JSONDecodeError:
# Not all lines will be valid JSON, that's normal
pass
except Exception as e:
error_msg = f"Error processing camera output: {str(e)}"
logger.warning(error_msg)
error_count += 1
if error_count > 10:
logger.error("Too many processing errors, stopping camera")
camera_running = False
camera_last_error = {
"error": "Too many processing errors",
"timestamp": time.time()
}
break
except Exception as e:
error_msg = f"Error in camera process management: {str(e)}"
logger.error(error_msg)
camera_running = False
camera_startup_status = {
"success": False,
"error": error_msg,
"timestamp": time.time()
}
detection_results.append({
"timestamp": time.time(),
"error": error_msg,
"camera_status": "error",
"detections": []
})
return
except Exception as e:
error_msg = f"Error in camera thread: {str(e)}"
logger.error(error_msg)
camera_running = False
camera_startup_status = {
"success": False,
"error": error_msg,
"timestamp": time.time()
}
detection_results.append({
"timestamp": time.time(),
"error": error_msg,
"camera_status": "error",
"detections": []
})
finally:
# Clean up
logger.info("Shutting down camera...")
camera_running = False
if process is not None and process.poll() is None:
try:
# Terminate process
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill() # Force kill if terminate doesn't work
except Exception as e:
logger.error(f"Error terminating YOLO process: {str(e)}")
logger.info("Camera detection stopped")
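# --- Illustrative sketch, not part of the original server code ---------------
# The camera thread keeps "only the last 10" results by calling pop(0) on a
# list, and only does so on frames that contain predictions, so error and
# status entries can accumulate. A bounded collections.deque discards the
# oldest entry automatically on every append. The helper name
# `make_detection_buffer` is hypothetical. Adopting it would also require the
# rest of the module to call .clear() instead of rebinding the list and to wrap
# the buffer in list(...) before returning it from a tool.
from collections import deque
from typing import Any, Deque, Dict
def make_detection_buffer(max_frames: int = 10) -> Deque[Dict[str, Any]]:
    """Create a buffer that retains only the most recent max_frames entries."""
    return deque(maxlen=max_frames)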
@mcp.tool()
def start_camera_detection(
model_name: str = "yolov8n.pt",
confidence: float = 0.25,
camera_id: int = 0
) -> Dict[str, Any]:
"""
Start realtime object detection using the computer's camera via YOLO CLI
Args:
model_name: YOLO model name to use
confidence: Detection confidence threshold
camera_id: Camera device ID (0 is usually the default camera)
Returns:
Status of camera detection
"""
global camera_thread, camera_running, detection_results, camera_last_access_time, camera_startup_status, camera_last_error
# Reset status variables
camera_startup_status = None
camera_last_error = None
# Check if already running
if camera_running:
# Update last access time
camera_last_access_time = time.time()
return {"status": "success", "message": "Camera detection is already running"}
# Clear previous results
detection_results = []
# First check if YOLO CLI is available
try:
version_check = run_yolo_cli(["--version"], timeout=10)
if not version_check["success"]:
return {
"status": "error",
"message": "YOLO CLI not available or not properly installed",
"details": version_check.get("error", "Unknown error"),
"solution": "Please make sure the 'yolo' command is in your PATH"
}
except Exception as e:
error_msg = f"Error checking YOLO CLI: {str(e)}"
logger.error(error_msg)
return {
"status": "error",
"message": error_msg,
"solution": "Please make sure the 'yolo' command is in your PATH"
}
# Start detection thread
camera_running = True
camera_last_access_time = time.time() # Update access time
camera_thread = threading.Thread(
target=camera_detection_thread,
args=(model_name, confidence, 30, camera_id),
daemon=True
)
camera_thread.start()
# Give the thread a moment to initialize and potentially fail
time.sleep(2)
# Check if the thread has reported any startup issues
if camera_startup_status and not camera_startup_status.get("success", False):
# Camera thread encountered an error during startup
return {
"status": "error",
"message": "Camera detection failed to start",
"details": camera_startup_status,
"solution": "Check logs for detailed error information"
}
# Thread is running, camera should be starting
return {
"status": "success",
"message": f"Started camera detection using model {model_name}",
"model": model_name,
"confidence": confidence,
"camera_id": camera_id,
"auto_shutdown": f"Camera will auto-shutdown after {CAMERA_INACTIVITY_TIMEOUT} seconds of inactivity",
"note": "If camera doesn't work, try different camera_id values (0, 1, or 2)"
}
@mcp.tool()
def stop_camera_detection() -> Dict[str, Any]:
    """
    Stop realtime camera detection
    Returns:
        Status message
    """
    global camera_running
    if not camera_running:
        return {"status": "error", "message": "Camera detection is not running"}
    logger.info("Stopping camera detection by user request")
    camera_running = False
    # Wait for thread to terminate
    if camera_thread and camera_thread.is_alive():
        camera_thread.join(timeout=2.0)
    return {
        "status": "success",
        "message": "Stopped camera detection"
    }
@mcp.tool()
def get_camera_detections() -> Dict[str, Any]:
"""
Get the latest detections from the camera
Returns:
Dictionary with recent detections
"""
global detection_results, camera_thread, camera_last_access_time, camera_startup_status, camera_last_error
# Update the last access time whenever this function is called
if camera_running:
camera_last_access_time = time.time()
# Check if thread is alive
thread_alive = camera_thread is not None and camera_thread.is_alive()
# If camera_running is False, check if we have startup status information
if not camera_running and camera_startup_status and not camera_startup_status.get("success", False):
return {
"status": "error",
"message": "Camera detection failed to start",
"is_running": False,
"camera_status": "error",
"startup_error": camera_startup_status,
"solution": "Check logs for detailed error information"
}
# If camera_running is True but thread is dead, there's an issue
if camera_running and not thread_alive:
return {
"status": "error",
"message": "Camera thread has stopped unexpectedly",
"is_running": False,
"camera_status": "error",
"thread_alive": thread_alive,
"last_error": camera_last_error,
"detections": detection_results,
"count": len(detection_results),
"solution": "Please try restarting the camera with a different camera_id"
}
if not camera_running:
return {
"status": "error",
"message": "Camera detection is not running",
"is_running": False,
"camera_status": "stopped"
}
# Check for errors in detection results
errors = [result.get("error") for result in detection_results if "error" in result]
recent_errors = errors[-5:] if errors else []
# Count actual detections
detection_count = sum(len(result.get("detections", [])) for result in detection_results if "detections" in result)
return {
"status": "success",
"is_running": camera_running,
"thread_alive": thread_alive,
"detections": detection_results,
"count": len(detection_results),
"total_detections": detection_count,
"recent_errors": recent_errors if recent_errors else None,
"camera_status": "error" if recent_errors else "running",
"inactivity_timeout": {
"seconds_remaining": int(CAMERA_INACTIVITY_TIMEOUT - (time.time() - camera_last_access_time)),
"last_access": camera_last_access_time
}
}
@mcp.tool()
def comprehensive_image_analysis(
image_path: str,
confidence: float = 0.25,
save_results: bool = False
) -> Dict[str, Any]:
"""
Perform comprehensive analysis on an image by combining multiple CLI model results
Args:
image_path: Path to the image file
confidence: Detection confidence threshold
save_results: Whether to save results to disk
Returns:
Dictionary containing comprehensive analysis results
"""
try:
if not os.path.exists(image_path):
return {"error": f"Image file not found: {image_path}"}
analysis_results = {}
# 1. Object detection
logger.info("Running object detection for comprehensive analysis")
object_result = detect_objects(
image_data=image_path,
model_name="yolov11n.pt",
confidence=confidence,
save_results=save_results,
is_path=True
)
# Process object detection results
detected_objects = []
if "results" in object_result and object_result["results"]:
for result in object_result["results"]:
for obj in result.get("detections", []):
detected_objects.append({
"class_name": obj.get("class_name", "unknown"),
"confidence": obj.get("confidence", 0)
})
analysis_results["objects"] = detected_objects
# 2. Scene classification
try:
logger.info("Running classification for comprehensive analysis")
cls_result = classify_image(
image_data=image_path,
model_name="yolov8n-cls.pt",
top_k=3,
save_results=False,
is_path=True
)
scene_classifications = []
if "results" in cls_result and cls_result["results"]:
for result in cls_result["results"]:
for cls in result.get("classifications", []):
scene_classifications.append({
"class_name": cls.get("class_name", "unknown"),
"probability": cls.get("probability", 0)
})
analysis_results["scene"] = scene_classifications
except Exception as e:
logger.error(f"Error during scene classification: {str(e)}")
analysis_results["scene_error"] = str(e)
# 3. Human pose detection (if pose model is available)
try:
# Locate the pose model and remember its full path
pose_model_path = None
for directory in CONFIG["model_dirs"]:
candidate = os.path.join(directory, "yolov8n-pose.pt")
if os.path.exists(candidate):
pose_model_path = candidate
break
if pose_model_path is not None:
logger.info("Running pose detection for comprehensive analysis")
# Build YOLO CLI command for pose detection
cmd_args = [
"pose", # Task
"predict", # Mode
f"model={pose_model_path}", # Use the path that was found, not the bare file name
f"source={image_path}",
f"conf={confidence}",
"format=json",
]
result = run_yolo_cli(cmd_args)
if result["success"]:
# Parse JSON output
json_start = result["stdout"].find("{")
json_end = result["stdout"].rfind("}")
if json_start >= 0 and json_end > json_start:
json_str = result["stdout"][json_start:json_end+1]
pose_data = json.loads(json_str)
detected_poses = []
if "predictions" in pose_data:
for pred in pose_data["predictions"]:
# Use a distinct name so the `confidence` argument is not overwritten
person_confidence = pred.get("confidence", 0)
keypoints = pred.get("keypoints", [])
detected_poses.append({
"person_confidence": person_confidence,
"has_keypoints": len(keypoints) if keypoints else 0
})
analysis_results["poses"] = detected_poses
else:
analysis_results["pose_error"] = "Pose model not available"
except Exception as e:
logger.error(f"Error during pose detection: {str(e)}")
analysis_results["pose_error"] = str(e)
# 4. Comprehensive task description
tasks = []
# Detect main objects
main_objects = [obj["class_name"] for obj in detected_objects if obj["confidence"] > 0.5]
if "person" in main_objects:
tasks.append("Person Detection")
# Check for weapon objects
weapon_objects = ["sword", "knife", "katana", "gun", "pistol", "rifle"]
weapons = [obj for obj in main_objects if any(weapon in obj.lower() for weapon in weapon_objects)]
if weapons:
tasks.append(f"Weapon Detection ({', '.join(weapons)})")
# Count people
person_count = main_objects.count("person")
if person_count > 0:
tasks.append(f"Person Count ({person_count} people)")
# Pose analysis
if "poses" in analysis_results and analysis_results["poses"]:
tasks.append("Human Pose Analysis")
# Scene classification
if "scene" in analysis_results and analysis_results["scene"]:
scene_types = [scene["class_name"] for scene in analysis_results["scene"][:2]]
tasks.append(f"Scene Classification ({', '.join(scene_types)})")
analysis_results["identified_tasks"] = tasks
# Return comprehensive results
return {
"status": "success",
"image_path": image_path,
"analysis": analysis_results,
"summary": "Tasks identified in the image: " + ", ".join(tasks) if tasks else "No clear tasks identified"
}
except Exception as e:
return {
"status": "error",
"image_path": image_path,
"error": f"Comprehensive analysis failed: {str(e)}"
}
@mcp.tool()
def analyze_image_from_path(
    image_path: str,
    model_name: str = "yolov8n.pt",
    confidence: float = 0.25,
    save_results: bool = False
) -> Dict[str, Any]:
    """
    Analyze image from file path using YOLO CLI
    Args:
        image_path: Path to the image file
        model_name: YOLO model name
        confidence: Detection confidence threshold
        save_results: Whether to save results to disk
    Returns:
        Dictionary containing detection results
    """
    try:
        # Call detect_objects function with is_path=True
        return detect_objects(
            image_data=image_path,
            model_name=model_name,
            confidence=confidence,
            save_results=save_results,
            is_path=True
        )
    except Exception as e:
        return {
            "error": f"Failed to analyze image: {str(e)}",
            "image_path": image_path
        }
@mcp.tool()
def test_connection() -> Dict[str, Any]:
    """
    Test if YOLO CLI service is running properly
    Returns:
        Status information and available tools
    """
    # Test YOLO CLI availability
    try:
        version_result = run_yolo_cli(["--version"], timeout=10)
        yolo_version = version_result.get("stdout", "Unknown") if version_result.get("success") else "Not available"
        # Clean up version string
        if "ultralytics" in yolo_version.lower():
            yolo_version = yolo_version.strip()
        else:
            yolo_version = "YOLO CLI not found or not responding correctly"
    except Exception as e:
        yolo_version = f"Error checking YOLO CLI: {str(e)}"
    return {
        "status": "YOLO CLI service is running normally",
        "yolo_version": yolo_version,
        "available_models": list_available_models(),
        "available_tools": [
            "list_available_models", "detect_objects", "segment_objects",
            "classify_image", "track_objects", "train_model", "validate_model",
            "export_model", "start_camera_detection", "stop_camera_detection",
            "get_camera_detections", "test_connection",
            # Additional tools
            "analyze_image_from_path",
            "comprehensive_image_analysis"
        ],
        "features": [
            "All detection functions use YOLO CLI rather than Python API",
            "Support for loading images directly from file paths",
            "Support for comprehensive image analysis with task identification",
            "Support for camera detection using YOLO CLI"
        ]
    }
def cleanup_resources():
    """Clean up resources when the server is shutting down"""
    global camera_running
    logger.info("Cleaning up resources...")
    # Stop camera if it's running
    if camera_running:
        logger.info("Shutting down camera during server exit")
        camera_running = False
        # Give the camera thread a moment to clean up
        if camera_thread and camera_thread.is_alive():
            camera_thread.join(timeout=2.0)
    logger.info("Cleanup complete")
def signal_handler(sig, frame):
    """Handle termination signals"""
    logger.info(f"Received signal {sig}, shutting down...")
    cleanup_resources()
    sys.exit(0)
def start_watchdog():
    """Start the camera watchdog thread"""
    watchdog = threading.Thread(
        target=camera_watchdog_thread,
        daemon=True
    )
    watchdog.start()
    return watchdog
# Register cleanup functions
atexit.register(cleanup_resources)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Modify the main execution section
if __name__ == "__main__":
    import platform
    logger.info("Starting YOLO CLI service")
    logger.info(f"Platform: {platform.system()} {platform.release()}")
    # Test if YOLO CLI is available
    try:
        test_result = run_yolo_cli(["--version"], timeout=10)
        if test_result["success"]:
            logger.info(f"YOLO CLI available: {test_result.get('stdout', '').strip()}")
        else:
            logger.warning(f"YOLO CLI test failed: {test_result.get('stderr', '')}")
            logger.warning("Service may not function correctly without YOLO CLI available")
    except Exception as e:
        logger.error(f"Error testing YOLO CLI: {str(e)}")
        logger.warning("Service may not function correctly without YOLO CLI available")
    # Start the camera watchdog thread
    watchdog_thread = start_watchdog()
    # Initialize and run server
    logger.info("Starting MCP server...")
    mcp.run(transport='stdio')
```
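Several tools in the file above locate JSON in the CLI output by slicing from the first `{` to the last `}` of stdout. That slice can fail to parse when log lines containing braces appear before or after the payload. The block below is a minimal sketch of a more tolerant scan, assuming the payload is a single JSON object somewhere in the text; the function name `extract_first_json_object` is hypothetical and is not part of the server.

```python
import json
from typing import Any, Optional


def extract_first_json_object(text: str) -> Optional[Any]:
    """Return the first decodable JSON object found in text, or None.

    Every "{" is tried as a possible start; JSONDecoder.raw_decode stops at the
    end of the object it parsed, so trailing log output is ignored.
    """
    decoder = json.JSONDecoder()
    pos = text.find("{")
    while pos != -1:
        try:
            obj, _end = decoder.raw_decode(text, pos)
            return obj
        except json.JSONDecodeError:
            # Not valid JSON from this brace; move on to the next candidate
            pos = text.find("{", pos + 1)
    return None


if __name__ == "__main__":
    sample = 'Loading model {fast}\n{"predictions": []}\nDone in 0.2s }'
    print(extract_first_json_object(sample))
```

Because the scan returns at the first object that decodes, a caller would still need to check that the result has the expected keys (for example `"predictions"`) before using it.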