# Directory Structure
```
├── .gitignore
├── LICENSE
├── mcp-config.json
├── Readme.md
├── requirements.txt
├── server_cli.py
├── server_combine_terminal.py
├── server.py
└── setup.py
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | # Python-generated files
2 | __pycache__/
3 | *.py[oc]
4 | build/
5 | dist/
6 | wheels/
7 | *.egg-info
8 |
9 | # Virtual environments
10 | .venv
11 | yolo_service.log
12 | *.pt
13 | runs/
```
--------------------------------------------------------------------------------
/Readme.md:
--------------------------------------------------------------------------------
```markdown
1 | # YOLO MCP Service
2 |
3 | A powerful YOLO (You Only Look Once) computer vision service that integrates with Claude AI through the Model Context Protocol (MCP). This service enables Claude to perform object detection, segmentation, classification, and real-time camera analysis using state-of-the-art YOLO models.
4 |
5 | 
6 |
7 |
8 | ## Features
9 |
10 | - Object detection, segmentation, classification, and pose estimation
11 | - Real-time camera integration for live object detection
12 | - Support for model training, validation, and export
13 | - Comprehensive image analysis combining multiple models
14 | - Support for both file paths and base64-encoded images
15 | - Seamless integration with Claude AI
16 |
17 | ## Setup Instructions
18 |
19 | ### Prerequisites
20 |
21 | - Python 3.10 or higher
22 | - Git (optional, for cloning the repository)
23 |
24 | ### Environment Setup
25 |
26 | 1. Create a directory for the project and navigate to it:
27 | ```bash
28 | mkdir yolo-mcp-service
29 | cd yolo-mcp-service
30 | ```
31 |
32 | 2. Download the project files or clone the repository:
33 | ```bash
34 | # If you have the files, copy them to this directory
35 | # If using git:
36 | git clone https://github.com/GongRzhe/YOLO-MCP-Server.git .
37 | ```
38 |
39 | 3. Create a virtual environment:
40 | ```bash
41 | # On Windows
42 | python -m venv .venv
43 |
44 | # On macOS/Linux
45 | python3 -m venv .venv
46 | ```
47 |
48 | 4. Activate the virtual environment:
49 | ```bash
50 | # On Windows
51 | .venv\Scripts\activate
52 |
53 | # On macOS/Linux
54 | source .venv/bin/activate
55 | ```
56 |
57 | 5. Run the setup script:
58 | ```bash
59 | python setup.py
60 | ```
61 |
62 | The setup script will:
63 | - Check your Python version
64 | - Create a virtual environment (if not already created)
65 | - Install required dependencies
66 | - Generate an MCP configuration file (mcp-config.json)
67 | - Output configuration information for different MCP clients including Claude
68 |
69 | 6. Note the output from the setup script, which will look similar to:
70 | ```
71 | MCP configuration has been written to: /path/to/mcp-config.json
72 |
73 | MCP configuration for Cursor:
74 |
75 | /path/to/.venv/bin/python /path/to/server.py
76 |
77 | MCP configuration for Windsurf/Claude Desktop:
78 | {
79 | "mcpServers": {
80 | "yolo-service": {
81 | "command": "/path/to/.venv/bin/python",
82 | "args": [
83 | "/path/to/server.py"
84 | ],
85 | "env": {
86 | "PYTHONPATH": "/path/to"
87 | }
88 | }
89 | }
90 | }
91 |
92 | To use with Claude Desktop, merge this configuration into: /path/to/claude_desktop_config.json
93 | ```
94 |
95 | ### Downloading YOLO Models
96 |
97 | Before using the service, you need to download the YOLO models. The service looks for models in the following directories:
98 | - The current directory where the service is running
99 | - A `models` subdirectory
100 | - Any other directory configured in the `CONFIG["model_dirs"]` variable in server.py
101 |
102 | Create a models directory and download some common models:
103 |
104 | ```bash
105 | # Create models directory
106 | mkdir models
107 |
108 | # Download YOLOv8n for basic object detection
109 | curl -L https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt -o models/yolov8n.pt
110 |
111 | # Download YOLOv8n-seg for segmentation
112 | curl -L https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-seg.pt -o models/yolov8n-seg.pt
113 |
114 | # Download YOLOv8n-cls for classification
115 | curl -L https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-cls.pt -o models/yolov8n-cls.pt
116 |
117 | # Download YOLOv8n-pose for pose estimation
118 | curl -L https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-pose.pt -o models/yolov8n-pose.pt
119 | ```
120 |
121 | For Windows PowerShell users:
122 | ```powershell
123 | # Create models directory
124 | mkdir models
125 |
126 | # Download models using Invoke-WebRequest
127 | Invoke-WebRequest -Uri "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt" -OutFile "models/yolov8n.pt"
128 | Invoke-WebRequest -Uri "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-seg.pt" -OutFile "models/yolov8n-seg.pt"
129 | Invoke-WebRequest -Uri "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-cls.pt" -OutFile "models/yolov8n-cls.pt"
130 | Invoke-WebRequest -Uri "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-pose.pt" -OutFile "models/yolov8n-pose.pt"
131 | ```
132 |
133 | ### Configuring Claude
134 |
135 | To use this service with Claude:
136 |
137 | 1. For Claude web: Set up the service on your local machine and use the configuration provided by the setup script in your MCP client.
138 |
139 | 2. For Claude Desktop:
140 | - Run the setup script and note the configuration output
141 | - Locate your Claude Desktop configuration file (the path is provided in the setup script output)
142 | - Add or merge the configuration into your Claude Desktop configuration file
143 | - Restart Claude Desktop
144 |
145 | ## Using YOLO Tools in Claude
146 |
147 | ### 1. First Check Available Models
148 |
149 | Always check which models are available on your system first:
150 |
151 | ```
152 | I'd like to use the YOLO tools. Can you first check which models are available on my system?
153 |
154 | <function_calls>
155 | <invoke name="list_available_models">
156 | </invoke>
157 | </function_calls>
158 | ```
159 |
160 | ### 2. Detecting Objects in an Image
161 |
162 | For analyzing an image file on your computer:
163 |
164 | ```
165 | Can you analyze this image file for objects?
166 |
167 | <function_calls>
168 | <invoke name="analyze_image_from_path">
169 | <parameter name="image_path">/path/to/your/image.jpg</parameter>
170 | <parameter name="confidence">0.3</parameter>
171 | </invoke>
172 | </function_calls>
173 | ```
174 |
175 | You can also specify a different model:
176 |
177 | ```
178 | Can you analyze this image using a different model?
179 |
180 | <function_calls>
181 | <invoke name="analyze_image_from_path">
182 | <parameter name="image_path">/path/to/your/image.jpg</parameter>
183 | <parameter name="model_name">yolov8n.pt</parameter>
184 | <parameter name="confidence">0.4</parameter>
185 | </invoke>
186 | </function_calls>
187 | ```
188 |
189 | ### 3. Running Comprehensive Image Analysis
190 |
191 | For more detailed analysis that combines object detection, classification, and more:
192 |
193 | ```
194 | Can you perform a comprehensive analysis on this image?
195 |
196 | <function_calls>
197 | <invoke name="comprehensive_image_analysis">
198 | <parameter name="image_path">/path/to/your/image.jpg</parameter>
199 | <parameter name="confidence">0.3</parameter>
200 | </invoke>
201 | </function_calls>
202 | ```
203 |
204 | ### 4. Image Segmentation
205 |
206 | For identifying object boundaries and creating segmentation masks:
207 |
208 | ```
209 | Can you perform image segmentation on this photo?
210 |
211 | <function_calls>
212 | <invoke name="segment_objects">
213 | <parameter name="image_data">/path/to/your/image.jpg</parameter>
214 | <parameter name="is_path">true</parameter>
215 | <parameter name="model_name">yolov8n-seg.pt</parameter>
216 | </invoke>
217 | </function_calls>
218 | ```
219 |
220 | ### 5. Image Classification
221 |
222 | For classifying the entire image content:
223 |
224 | ```
225 | What does this image show? Can you classify it?
226 |
227 | <function_calls>
228 | <invoke name="classify_image">
229 | <parameter name="image_data">/path/to/your/image.jpg</parameter>
230 | <parameter name="is_path">true</parameter>
231 | <parameter name="model_name">yolov8n-cls.pt</parameter>
232 | <parameter name="top_k">5</parameter>
233 | </invoke>
234 | </function_calls>
235 | ```
236 |
237 | ### 6. Using Your Computer's Camera
238 |
239 | Start real-time object detection using your computer's camera:
240 |
241 | ```
242 | Can you turn on my camera and detect objects in real-time?
243 |
244 | <function_calls>
245 | <invoke name="start_camera_detection">
246 | <parameter name="model_name">yolov8n.pt</parameter>
247 | <parameter name="confidence">0.3</parameter>
248 | </invoke>
249 | </function_calls>
250 | ```
251 |
252 | Get the latest camera detections:
253 |
254 | ```
255 | What are you seeing through my camera right now?
256 |
257 | <function_calls>
258 | <invoke name="get_camera_detections">
259 | </invoke>
260 | </function_calls>
261 | ```
262 |
263 | Stop the camera when finished:
264 |
265 | ```
266 | Please turn off the camera.
267 |
268 | <function_calls>
269 | <invoke name="stop_camera_detection">
270 | </invoke>
271 | </function_calls>
272 | ```
273 |
274 | ### 7. Advanced Model Operations
275 |
276 | #### Training a Custom Model
277 |
278 | ```
279 | I want to train a custom object detection model on my dataset.
280 |
281 | <function_calls>
282 | <invoke name="train_model">
283 | <parameter name="dataset_path">/path/to/your/dataset</parameter>
284 | <parameter name="model_name">yolov8n.pt</parameter>
285 | <parameter name="epochs">50</parameter>
286 | </invoke>
287 | </function_calls>
288 | ```
289 |
290 | #### Validating a Model
291 |
292 | ```
293 | Can you validate the performance of my model on a test dataset?
294 |
295 | <function_calls>
296 | <invoke name="validate_model">
297 | <parameter name="model_path">/path/to/your/trained/model.pt</parameter>
298 | <parameter name="data_path">/path/to/validation/dataset</parameter>
299 | </invoke>
300 | </function_calls>
301 | ```
302 |
303 | #### Exporting a Model to Different Formats
304 |
305 | ```
306 | I need to export my YOLO model to ONNX format.
307 |
308 | <function_calls>
309 | <invoke name="export_model">
310 | <parameter name="model_path">/path/to/your/model.pt</parameter>
311 | <parameter name="format">onnx</parameter>
312 | </invoke>
313 | </function_calls>
314 | ```
315 |
316 | ### 8. Testing Connection
317 |
318 | Check if the YOLO service is running correctly:
319 |
320 | ```
321 | Is the YOLO service running correctly?
322 |
323 | <function_calls>
324 | <invoke name="test_connection">
325 | </invoke>
326 | </function_calls>
327 | ```
328 |
329 | ## Troubleshooting
330 |
331 | ### Camera Issues
332 |
333 | If the camera doesn't work, try different camera IDs:
334 |
335 | ```
336 | <function_calls>
337 | <invoke name="start_camera_detection">
338 | <parameter name="camera_id">1</parameter> <!-- Try 0, 1, or 2 -->
339 | </invoke>
340 | </function_calls>
341 | ```
342 |
343 | ### Model Not Found
344 |
345 | If a model is not found, make sure you've downloaded it to one of the configured directories:
346 |
347 | ```
348 | <function_calls>
349 | <invoke name="get_model_directories">
350 | </invoke>
351 | </function_calls>
352 | ```
353 |
354 | ### Performance Issues
355 |
356 | For better performance with limited resources, use the smaller models (e.g., yolov8n.pt instead of yolov8x.pt).
357 |
```
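The README's model search order (the current directory, a `models` subdirectory, then any other configured directory) amounts to a first-match lookup. The sketch below is a minimal illustration of that idea, not the server's exact code: `find_model` and its arguments are hypothetical names, and it uses `os.path.isfile` where the server tests `os.path.exists`.

```python
import os
import tempfile
from typing import List, Optional


def find_model(model_name: str, model_dirs: List[str]) -> Optional[str]:
    """Return the first existing path for model_name, or None if no directory has it."""
    for directory in model_dirs:
        candidate = os.path.join(directory, model_name)
        if os.path.isfile(candidate):
            return candidate
    return None


if __name__ == "__main__":
    # Demo with a throwaway directory holding an empty placeholder file
    demo_dir = tempfile.mkdtemp()
    open(os.path.join(demo_dir, "yolov8n.pt"), "wb").close()

    search_dirs = [".", "./models", demo_dir]
    print(find_model("yolov8n.pt", search_dirs))
    print(find_model("not-downloaded.pt", [demo_dir]))
```

Because the first match wins, a model placed in the current directory shadows a file of the same name in `models`.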
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
1 | mcp
2 | ultralytics
3 | opencv-python
4 | numpy
5 | pillow
```
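The package names in `requirements.txt` do not always match the names used in `import` statements (`opencv-python` is imported as `cv2`, `pillow` as `PIL`). As a rough sketch for checking an environment after installation, the helper below maps each requirement to its import name and reports whether it can be found. `check_packages` and `IMPORT_NAMES` are hypothetical names introduced here for illustration.

```python
import importlib.util
from typing import Dict

# pip package name -> top-level module name used in import statements
IMPORT_NAMES = {
    "mcp": "mcp",
    "ultralytics": "ultralytics",
    "opencv-python": "cv2",
    "numpy": "numpy",
    "pillow": "PIL",
}


def check_packages(import_names: Dict[str, str]) -> Dict[str, bool]:
    """Report, per package, whether its top-level module can be located."""
    return {
        package: importlib.util.find_spec(module) is not None
        for package, module in import_names.items()
    }


if __name__ == "__main__":
    for package, available in check_packages(IMPORT_NAMES).items():
        print(f"{package}: {'ok' if available else 'missing'}")
```

The check only locates modules; it does not import them, so it stays fast even for heavy packages.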
--------------------------------------------------------------------------------
/mcp-config.json:
--------------------------------------------------------------------------------
```json
1 | {
2 | "mcpServers": {
3 | "yolo-service": {
4 | "command": "D:\\BackDataService\\YOLO-MCP-Server\\.venv\\Scripts\\python.exe",
5 | "args": [
6 | "D:\\BackDataService\\YOLO-MCP-Server\\server.py"
7 | ],
8 | "env": {
9 | "PYTHONPATH": "D:\\BackDataService\\YOLO-MCP-Server"
10 | }
11 | }
12 | }
13 | }
14 |
```
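The committed `mcp-config.json` contains absolute paths from one particular Windows machine, so on another system it will most likely need to be regenerated with `setup.py`. The sketch below shows one way to detect a stale configuration by checking that the interpreter and script paths exist. `check_mcp_config` is a hypothetical helper, not part of the repository, and treating every `.py` argument as a script path is an assumption.

```python
import os
import sys
import tempfile
from typing import Any, Dict, List


def check_mcp_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems found; an empty list means none were detected."""
    problems: List[str] = []
    servers = config.get("mcpServers", {})
    if not servers:
        problems.append("no servers defined under 'mcpServers'")
    for name, entry in servers.items():
        command = entry.get("command", "")
        if not os.path.isfile(command):
            problems.append(f"{name}: command not found: {command}")
        for arg in entry.get("args", []):
            # Assumption: arguments ending in .py are script paths
            if arg.endswith(".py") and not os.path.isfile(arg):
                problems.append(f"{name}: script not found: {arg}")
    return problems


if __name__ == "__main__":
    # A config pointing at this interpreter and a temporary placeholder script
    with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as handle:
        script_path = handle.name
    valid = {"mcpServers": {"yolo-service": {"command": sys.executable, "args": [script_path]}}}
    print(check_mcp_config(valid))

    # A config with paths that do not exist on this machine
    stale = {"mcpServers": {"yolo-service": {"command": "/no/such/python", "args": ["/no/such/server.py"]}}}
    for problem in check_mcp_config(stale):
        print(problem)
```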
--------------------------------------------------------------------------------
/setup.py:
--------------------------------------------------------------------------------
```python
1 | # Import necessary Python standard libraries
2 | import os # For file system operations: handling files and directory paths
3 | import json # For processing JSON format data
4 | import subprocess # For creating and managing subprocesses
5 | import sys # For accessing Python interpreter related variables and functions
6 | import platform # For getting current operating system information
7 |
8 | def setup_venv():
9 | """
10 | Function to set up Python virtual environment
11 |
12 | Features:
13 | - Checks if Python version meets requirements (3.10+)
14 | - Creates Python virtual environment (if it doesn't exist)
15 | - Installs required dependencies in the newly created virtual environment
16 |
17 | No parameters required
18 |
19 | Returns: Path to Python interpreter in the virtual environment
20 | """
21 | # Check Python version
22 | python_version = sys.version_info
23 | if python_version < (3, 10):
24 | print("Error: Python 3.10 or higher is required.")
25 | sys.exit(1)
26 |
27 | # Get absolute path of the directory containing the current script
28 | base_path = os.path.abspath(os.path.dirname(__file__))
29 | # Set virtual environment directory path, will create a directory named '.venv' under base_path
30 | venv_path = os.path.join(base_path, '.venv')
31 | # Flag whether a new virtual environment was created
32 | venv_created = False
33 |
34 | # Check if virtual environment already exists
35 | if not os.path.exists(venv_path):
36 | print("Creating virtual environment...")
37 | # Use Python's venv module to create virtual environment
38 | # sys.executable gets the path of the current Python interpreter
39 | subprocess.run([sys.executable, '-m', 'venv', venv_path], check=True)
40 | print("Virtual environment created successfully!")
41 | venv_created = True
42 | else:
43 | print("Virtual environment already exists.")
44 |
45 | # Determine pip and python executable paths based on operating system
46 | is_windows = platform.system() == "Windows"
47 | if is_windows:
48 | pip_path = os.path.join(venv_path, 'Scripts', 'pip.exe')
49 | python_path = os.path.join(venv_path, 'Scripts', 'python.exe')
50 | else:
51 | pip_path = os.path.join(venv_path, 'bin', 'pip')
52 | python_path = os.path.join(venv_path, 'bin', 'python')
53 |
54 | # Install or update dependencies
55 | print("\nInstalling requirements...")
56 |
57 | # Create requirements.txt with necessary packages for YOLO MCP server
58 | requirements = [
59 | "mcp", # Model Context Protocol for server
60 | "ultralytics", # YOLO models
61 | "opencv-python", # For camera operations
62 | "numpy", # Numerical operations
63 | "pillow" # Image processing
64 | ]
65 |
66 | requirements_path = os.path.join(base_path, 'requirements.txt')
67 | with open(requirements_path, 'w') as f:
68 | f.write('\n'.join(requirements))
69 |
70 | # Update pip using the python executable (more reliable method)
71 | try:
72 | subprocess.run([python_path, '-m', 'pip', 'install', '--upgrade', 'pip'], check=True)
73 | print("Pip upgraded successfully.")
74 | except subprocess.CalledProcessError:
75 | print("Warning: Pip upgrade failed, continuing with existing version.")
76 |
77 | # Install requirements
78 | subprocess.run([python_path, '-m', 'pip', 'install', '-r', requirements_path], check=True)
79 |
80 | print("Requirements installed successfully!")
81 |
82 | return python_path
83 |
84 | def generate_mcp_config(python_path):
85 | """
86 | Function to generate MCP (Model Context Protocol) configuration file for YOLO service
87 |
88 | Features:
89 | - Creates configuration containing Python interpreter path and server script path
90 | - Saves configuration as JSON format file
91 | - Prints configuration information for different MCP clients
92 |
93 | Parameters:
94 | - python_path: Path to Python interpreter in the virtual environment
95 |
96 | Returns: None
97 | """
98 | # Get absolute path of the directory containing the current script
99 | base_path = os.path.abspath(os.path.dirname(__file__))
100 |
101 | # Path to YOLO MCP server script
102 | server_script_path = os.path.join(base_path, 'server.py')
103 |
104 | # Create MCP configuration dictionary
105 | config = {
106 | "mcpServers": {
107 | "yolo-service": {
108 | "command": python_path,
109 | "args": [server_script_path],
110 | "env": {
111 | "PYTHONPATH": base_path
112 | }
113 | }
114 | }
115 | }
116 |
117 | # Save configuration to JSON file
118 | config_path = os.path.join(base_path, 'mcp-config.json')
119 | with open(config_path, 'w') as f:
120 | json.dump(config, f, indent=2) # indent=2 gives the JSON file good formatting
121 |
122 | # Print configuration information
123 | print(f"\nMCP configuration has been written to: {config_path}")
124 | print(f"\nMCP configuration for Cursor:\n\n{python_path} {server_script_path}")
125 | print("\nMCP configuration for Windsurf/Claude Desktop:")
126 | print(json.dumps(config, indent=2))
127 |
128 | # Provide instructions for adding configuration to Claude Desktop configuration file
129 | if platform.system() == "Windows":
130 | claude_config_path = os.path.expandvars("%APPDATA%\\Claude\\claude_desktop_config.json")
131 | else: # macOS path (also used as the fallback on other non-Windows systems)
132 | claude_config_path = os.path.expanduser("~/Library/Application Support/Claude/claude_desktop_config.json")
133 |
134 | print(f"\nTo use with Claude Desktop, merge this configuration into: {claude_config_path}")
135 |
136 | # Code executed when the script is run directly (not imported)
137 | if __name__ == '__main__':
138 | # Execute main functions in sequence:
139 | # 1. Set up virtual environment and install dependencies
140 | python_path = setup_venv()
141 | # 2. Generate MCP configuration file
142 | generate_mcp_config(python_path)
143 |
144 | print("\nSetup complete! You can now use the YOLO MCP server with compatible clients.")
```
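`setup.py` prints the generated configuration and asks you to merge it into `claude_desktop_config.json` by hand. One way to do that merge without losing servers that are already configured is sketched below. `merge_mcp_servers` is a hypothetical helper, and the `other-service` entry and `theme` key in the demo are made-up placeholders for whatever the existing file contains.

```python
import copy
import json
from typing import Any, Dict


def merge_mcp_servers(existing: Dict[str, Any], generated: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `existing` with the servers from `generated` added or replaced."""
    merged = copy.deepcopy(existing)
    servers = merged.setdefault("mcpServers", {})
    servers.update(copy.deepcopy(generated.get("mcpServers", {})))
    return merged


if __name__ == "__main__":
    # Placeholder for the current contents of claude_desktop_config.json
    existing = {
        "theme": "dark",
        "mcpServers": {"other-service": {"command": "/path/to/other", "args": []}},
    }
    # Same shape as the configuration printed by setup.py
    generated = {
        "mcpServers": {
            "yolo-service": {
                "command": "/path/to/.venv/bin/python",
                "args": ["/path/to/server.py"],
                "env": {"PYTHONPATH": "/path/to"},
            }
        }
    }
    print(json.dumps(merge_mcp_servers(existing, generated), indent=2))
```

The function works on copies, so the original dictionary can be written back to disk as a backup before the merged result replaces it.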
--------------------------------------------------------------------------------
/server_combine_terminal.py:
--------------------------------------------------------------------------------
```python
1 | # server_combine_terminal.py - CLI version (returns the command string only)
2 | import fnmatch
3 | import os
4 | import base64
5 | import time
6 | import threading
7 | import json
8 | import tempfile
9 | import platform
10 | from io import BytesIO
11 | from typing import List, Dict, Any, Optional, Union
12 | import numpy as np
13 | from PIL import Image
14 |
15 | from mcp.server.fastmcp import FastMCP
16 |
17 | # Set up logging configuration
18 | import os.path
19 | import sys
20 | import logging
21 | import contextlib
22 | import signal
23 | import atexit
24 |
25 | logging.basicConfig(
26 | level=logging.INFO,
27 | format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
28 | handlers=[
29 | logging.FileHandler("yolo_service.log"),
30 | logging.StreamHandler(sys.stderr)
31 | ]
32 | )
33 | camera_startup_status = None # Will store error details if startup fails
34 | camera_last_error = None
35 | logger = logging.getLogger('yolo_service')
36 |
37 | # Global variables for camera control
38 | camera_running = False
39 | camera_thread = None
40 | detection_results = []
41 | camera_last_access_time = 0
42 | CAMERA_INACTIVITY_TIMEOUT = 60 # Auto-shutdown after 60 seconds of inactivity
43 |
44 | def load_image(image_source, is_path=False):
45 | """
46 | Load image from file path or base64 data
47 |
48 | Args:
49 | image_source: File path or base64 encoded image data
50 | is_path: Whether image_source is a file path
51 |
52 | Returns:
53 | PIL Image object
54 | """
55 | try:
56 | if is_path:
57 | # Load image from file path
58 | if os.path.exists(image_source):
59 | return Image.open(image_source)
60 | else:
61 | raise FileNotFoundError(f"Image file not found: {image_source}")
62 | else:
63 | # Load image from base64 data
64 | image_bytes = base64.b64decode(image_source)
65 | return Image.open(BytesIO(image_bytes))
66 | except Exception as e:
67 | raise ValueError(f"Failed to load image: {str(e)}")
68 |
69 | # Modified function to just return the command string
70 | def run_yolo_cli(command_args, capture_output=True, timeout=60):
71 | """
72 | Return the YOLO CLI command string without executing it
73 |
74 | Args:
75 | command_args: List of command arguments to pass to yolo CLI
76 | capture_output: Not used, kept for compatibility with original function
77 | timeout: Not used, kept for compatibility with original function
78 |
79 | Returns:
80 | Dictionary containing the command string
81 | """
82 | # Build the complete command
83 | cmd = ["yolo"] + command_args
84 | cmd_str = " ".join(cmd)
85 |
86 | # Log the command
87 | logger.info(f"Would run YOLO CLI command: {cmd_str}")
88 |
89 | # Return the command string in a similar structure as the original function
90 | return {
91 | "success": True,
92 | "command": cmd_str,
93 | "would_execute": True,
94 | "note": "CLI execution disabled, showing command only"
95 | }
96 |
97 | # Create MCP server
98 | mcp = FastMCP("YOLO_Service")
99 |
100 | # Global configuration
101 | CONFIG = {
102 | "model_dirs": [
103 | ".", # Current directory
104 | "./models", # Models subdirectory
105 | os.path.join(os.path.dirname(os.path.abspath(__file__)), "models"),
106 | ]
107 | }
108 |
109 | # Function to save base64 data to temp file
110 | def save_base64_to_temp(base64_data, prefix="image", suffix=".jpg"):
111 |     """Save base64 encoded data to a temporary file and return the path"""
112 |     try:
113 |         # Decode first so invalid input does not leave an empty temp file behind
114 |         image_data = base64.b64decode(base64_data)
115 |
116 |         # Create a temporary file
117 |         fd, temp_path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
118 |
119 |         # Write data to file (os.fdopen takes ownership of fd and closes it)
120 |         with os.fdopen(fd, 'wb') as temp_file:
121 |             temp_file.write(image_data)
122 |
123 |         return temp_path
124 |     except Exception as e:
125 |         logger.error(f"Error saving base64 to temp file: {str(e)}")
126 |         raise ValueError(f"Failed to save base64 data: {str(e)}")
127 |
128 | @mcp.tool()
129 | def get_model_directories() -> Dict[str, Any]:
130 | """Get information about configured model directories and available models"""
131 | directories = []
132 |
133 | for directory in CONFIG["model_dirs"]:
134 | dir_info = {
135 | "path": directory,
136 | "exists": os.path.exists(directory),
137 | "is_directory": os.path.isdir(directory),
138 | "models": []
139 | }
140 |
141 | if dir_info["exists"] and dir_info["is_directory"]:
142 | for filename in os.listdir(directory):
143 | if filename.endswith(".pt"):
144 | dir_info["models"].append(filename)
145 |
146 | directories.append(dir_info)
147 |
148 | return {
149 | "configured_directories": CONFIG["model_dirs"],
150 | "directory_details": directories,
151 | "available_models": list_available_models(),
152 | "loaded_models": [] # No longer track loaded models with CLI approach
153 | }
154 |
155 | @mcp.tool()
156 | def detect_objects(
157 | image_data: str,
158 | model_name: str = "yolov8n.pt",
159 | confidence: float = 0.25,
160 | save_results: bool = False,
161 | is_path: bool = False
162 | ) -> Dict[str, Any]:
163 | """
164 | Return the YOLO CLI command for object detection without executing it
165 |
166 | Args:
167 | image_data: Base64 encoded image or file path (if is_path=True)
168 | model_name: YOLO model name
169 | confidence: Detection confidence threshold
170 | save_results: Whether to save results to disk
171 | is_path: Whether image_data is a file path
172 |
173 | Returns:
174 | Dictionary containing command that would be executed
175 | """
176 | try:
177 | # Determine source path
178 | if is_path:
179 | source_path = image_data
180 | if not os.path.exists(source_path):
181 | return {
182 | "error": f"Image file not found: {source_path}",
183 | "source": source_path
184 | }
185 | else:
186 | # For base64, we would save to temp file, but we'll just indicate this
187 | source_path = "[temp_file_from_base64]"
188 |
189 | # Determine full model path
190 | model_path = None
191 | for directory in CONFIG["model_dirs"]:
192 | potential_path = os.path.join(directory, model_name)
193 | if os.path.exists(potential_path):
194 | model_path = potential_path
195 | break
196 |
197 | if model_path is None:
198 | available = list_available_models()
199 | available_str = ", ".join(available) if available else "none"
200 | return {
201 | "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
202 | "source": image_data if is_path else "base64_image"
203 | }
204 |
205 | # Setup output directory for save_results
206 | output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
207 |
208 | # Build YOLO CLI command
209 | cmd_args = [
210 | "detect", # Task
211 | "predict", # Mode
212 | f"model={model_path}",
213 | f"source={source_path}",
214 | f"conf={confidence}",
215 | "format=json", # Intended to request JSON output; note that Ultralytics documents `format` as an export setting
216 | ]
217 |
218 | if save_results:
219 | cmd_args.append(f"project={output_dir}")
220 | cmd_args.append("save=True")
221 | else:
222 | cmd_args.append("save=False")
223 |
224 | # Get command string without executing
225 | result = run_yolo_cli(cmd_args)
226 |
227 | # Return command information
228 | return {
229 | "status": "command_generated",
230 | "model_used": model_name,
231 | "model_path": model_path,
232 | "source": source_path,
233 | "command": result["command"],
234 | "note": "Command generated but not executed - detection results would be returned from actual execution",
235 | "parameters": {
236 | "confidence": confidence,
237 | "save_results": save_results,
238 | "is_path": is_path,
239 | "output_dir": output_dir if save_results else None
240 | }
241 | }
242 |
243 | except Exception as e:
244 | logger.error(f"Error in detect_objects command generation: {str(e)}")
245 | return {
246 | "error": f"Failed to generate detection command: {str(e)}",
247 | "source": image_data if is_path else "base64_image"
248 | }
249 |
250 | @mcp.tool()
251 | def segment_objects(
252 | image_data: str,
253 | model_name: str = "yolov8n-seg.pt",
254 | confidence: float = 0.25,
255 | save_results: bool = False,
256 | is_path: bool = False
257 | ) -> Dict[str, Any]:
258 | """
259 | Return the YOLO CLI command for segmentation without executing it
260 |
261 | Args:
262 | image_data: Base64 encoded image or file path (if is_path=True)
263 | model_name: YOLO segmentation model name
264 | confidence: Detection confidence threshold
265 | save_results: Whether to save results to disk
266 | is_path: Whether image_data is a file path
267 |
268 | Returns:
269 | Dictionary containing command that would be executed
270 | """
271 | try:
272 | # Determine source path
273 | if is_path:
274 | source_path = image_data
275 | if not os.path.exists(source_path):
276 | return {
277 | "error": f"Image file not found: {source_path}",
278 | "source": source_path
279 | }
280 | else:
281 | # For base64, we would save to temp file, but we'll just indicate this
282 | source_path = "[temp_file_from_base64]"
283 |
284 | # Determine full model path
285 | model_path = None
286 | for directory in CONFIG["model_dirs"]:
287 | potential_path = os.path.join(directory, model_name)
288 | if os.path.exists(potential_path):
289 | model_path = potential_path
290 | break
291 |
292 | if model_path is None:
293 | available = list_available_models()
294 | available_str = ", ".join(available) if available else "none"
295 | return {
296 | "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
297 | "source": image_data if is_path else "base64_image"
298 | }
299 |
300 | # Setup output directory for save_results
301 | output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
302 |
303 | # Build YOLO CLI command
304 | cmd_args = [
305 | "segment", # Task
306 | "predict", # Mode
307 | f"model={model_path}",
308 | f"source={source_path}",
309 | f"conf={confidence}",
310 | "format=json", # Intended to request JSON output; note that Ultralytics documents `format` as an export setting
311 | ]
312 |
313 | if save_results:
314 | cmd_args.append(f"project={output_dir}")
315 | cmd_args.append("save=True")
316 | else:
317 | cmd_args.append("save=False")
318 |
319 | # Get command string without executing
320 | result = run_yolo_cli(cmd_args)
321 |
322 | # Return command information
323 | return {
324 | "status": "command_generated",
325 | "model_used": model_name,
326 | "model_path": model_path,
327 | "source": source_path,
328 | "command": result["command"],
329 | "note": "Command generated but not executed - segmentation results would be returned from actual execution",
330 | "parameters": {
331 | "confidence": confidence,
332 | "save_results": save_results,
333 | "is_path": is_path,
334 | "output_dir": output_dir if save_results else None
335 | }
336 | }
337 |
338 | except Exception as e:
339 | logger.error(f"Error in segment_objects command generation: {str(e)}")
340 | return {
341 | "error": f"Failed to generate segmentation command: {str(e)}",
342 | "source": image_data if is_path else "base64_image"
343 | }
344 |
345 | @mcp.tool()
346 | def classify_image(
347 | image_data: str,
348 | model_name: str = "yolov8n-cls.pt",
349 | top_k: int = 5,
350 | save_results: bool = False,
351 | is_path: bool = False
352 | ) -> Dict[str, Any]:
353 | """
354 | Return the YOLO CLI command for image classification without executing it
355 |
356 | Args:
357 | image_data: Base64 encoded image or file path (if is_path=True)
358 | model_name: YOLO classification model name
359 | top_k: Number of top categories to return
360 | save_results: Whether to save results to disk
361 | is_path: Whether image_data is a file path
362 |
363 | Returns:
364 | Dictionary containing command that would be executed
365 | """
366 | try:
367 | # Determine source path
368 | if is_path:
369 | source_path = image_data
370 | if not os.path.exists(source_path):
371 | return {
372 | "error": f"Image file not found: {source_path}",
373 | "source": source_path
374 | }
375 | else:
376 | # For base64, we would save to temp file, but we'll just indicate this
377 | source_path = "[temp_file_from_base64]"
378 |
379 | # Determine full model path
380 | model_path = None
381 | for directory in CONFIG["model_dirs"]:
382 | potential_path = os.path.join(directory, model_name)
383 | if os.path.exists(potential_path):
384 | model_path = potential_path
385 | break
386 |
387 | if model_path is None:
388 | available = list_available_models()
389 | available_str = ", ".join(available) if available else "none"
390 | return {
391 | "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
392 | "source": image_data if is_path else "base64_image"
393 | }
394 |
395 | # Setup output directory for save_results
396 | output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
397 |
398 | # Build YOLO CLI command
399 | cmd_args = [
400 | "classify", # Task
401 | "predict", # Mode
402 | f"model={model_path}",
403 | f"source={source_path}",
404 | "format=json", # Intended to request JSON output; note that Ultralytics documents `format` as an export setting
405 | ]
406 |
407 | if save_results:
408 | cmd_args.append(f"project={output_dir}")
409 | cmd_args.append("save=True")
410 | else:
411 | cmd_args.append("save=False")
412 |
413 | # Get command string without executing
414 | result = run_yolo_cli(cmd_args)
415 |
416 | # Return command information
417 | return {
418 | "status": "command_generated",
419 | "model_used": model_name,
420 | "model_path": model_path,
421 | "source": source_path,
422 | "command": result["command"],
423 | "note": "Command generated but not executed - classification results would be returned from actual execution",
424 | "parameters": {
425 | "top_k": top_k,
426 | "save_results": save_results,
427 | "is_path": is_path,
428 | "output_dir": output_dir if save_results else None
429 | }
430 | }
431 |
432 | except Exception as e:
433 | logger.error(f"Error in classify_image command generation: {str(e)}")
434 | return {
435 | "error": f"Failed to generate classification command: {str(e)}",
436 | "source": image_data if is_path else "base64_image"
437 | }
438 |
439 | @mcp.tool()
440 | def track_objects(
441 | image_data: str,
442 | model_name: str = "yolov8n.pt",
443 | confidence: float = 0.25,
444 | tracker: str = "bytetrack.yaml",
445 | save_results: bool = False
446 | ) -> Dict[str, Any]:
447 | """
448 | Return the YOLO CLI command for object tracking without executing it
449 |
450 | Args:
451 | image_data: Base64 encoded image
452 | model_name: YOLO model name
453 | confidence: Detection confidence threshold
454 | tracker: Tracker name to use (e.g., 'bytetrack.yaml', 'botsort.yaml')
455 | save_results: Whether to save results to disk
456 |
457 | Returns:
458 | Dictionary containing command that would be executed
459 | """
460 | try:
461 | # For base64, we would save to temp file, but we'll just indicate this
462 | source_path = "[temp_file_from_base64]"
463 |
464 | # Determine full model path
465 | model_path = None
466 | for directory in CONFIG["model_dirs"]:
467 | potential_path = os.path.join(directory, model_name)
468 | if os.path.exists(potential_path):
469 | model_path = potential_path
470 | break
471 |
472 | if model_path is None:
473 | available = list_available_models()
474 | available_str = ", ".join(available) if available else "none"
475 | return {
476 | "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
477 | }
478 |
479 | # Setup output directory for save_results
480 | output_dir = os.path.join(tempfile.gettempdir(), "yolo_track_results")
481 |
482 | # Build YOLO CLI command
483 | cmd_args = [
484 | "track", # Combined task and mode for tracking
485 | f"model={model_path}",
486 | f"source={source_path}",
487 | f"conf={confidence}",
488 | f"tracker={tracker}",
489 | "format=json", # Request JSON output for parsing
490 | ]
491 |
492 | if save_results:
493 | cmd_args.append(f"project={output_dir}")
494 | cmd_args.append("save=True")
495 | else:
496 | cmd_args.append("save=False")
497 |
498 | # Get command string without executing
499 | result = run_yolo_cli(cmd_args)
500 |
501 | # Return command information
502 | return {
503 | "status": "command_generated",
504 | "model_used": model_name,
505 | "model_path": model_path,
506 | "source": source_path,
507 | "command": result["command"],
508 | "note": "Command generated but not executed - tracking results would be returned from actual execution",
509 | "parameters": {
510 | "confidence": confidence,
511 | "tracker": tracker,
512 | "save_results": save_results,
513 | "output_dir": output_dir if save_results else None
514 | }
515 | }
516 |
517 | except Exception as e:
518 | logger.error(f"Error in track_objects command generation: {str(e)}")
519 | return {
520 | "error": f"Failed to generate tracking command: {str(e)}"
521 | }
522 |
523 | @mcp.tool()
524 | def train_model(
525 | dataset_path: str,
526 | model_name: str = "yolov8n.pt",
527 | epochs: int = 100,
528 | imgsz: int = 640,
529 | batch: int = 16,
530 | name: str = "yolo_custom_model",
531 | project: str = "runs/train"
532 | ) -> Dict[str, Any]:
533 | """
534 | Return the YOLO CLI command for model training without executing it
535 |
536 | Args:
537 | dataset_path: Path to YOLO format dataset
538 | model_name: Base model to start with
539 | epochs: Number of training epochs
540 | imgsz: Image size for training
541 | batch: Batch size
542 | name: Name for the training run
543 | project: Project directory
544 |
545 | Returns:
546 | Dictionary containing command that would be executed
547 | """
548 | # Validate dataset path
549 | if not os.path.exists(dataset_path):
550 | return {"error": f"Dataset not found: {dataset_path}"}
551 |
552 | # Determine full model path
553 | model_path = None
554 | for directory in CONFIG["model_dirs"]:
555 | potential_path = os.path.join(directory, model_name)
556 | if os.path.exists(potential_path):
557 | model_path = potential_path
558 | break
559 |
560 | if model_path is None:
561 | available = list_available_models()
562 | available_str = ", ".join(available) if available else "none"
563 | return {
564 | "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
565 | }
566 |
567 | # Determine task type based on model name
568 | task = "detect" # Default task
569 | if "seg" in model_name:
570 | task = "segment"
571 | elif "pose" in model_name:
572 | task = "pose"
573 | elif "cls" in model_name:
574 | task = "classify"
575 | elif "obb" in model_name:
576 | task = "obb"
577 |
578 | # Build YOLO CLI command
579 | cmd_args = [
580 | task, # Task
581 | "train", # Mode
582 | f"model={model_path}",
583 | f"data={dataset_path}",
584 | f"epochs={epochs}",
585 | f"imgsz={imgsz}",
586 | f"batch={batch}",
587 | f"name={name}",
588 | f"project={project}"
589 | ]
590 |
591 | # Get command string without executing
592 | result = run_yolo_cli(cmd_args)
593 |
594 | # Return command information
595 | return {
596 | "status": "command_generated",
597 | "model_used": model_name,
598 | "model_path": model_path,
599 | "command": result["command"],
600 | "note": "Command generated but not executed - training would start with actual execution",
601 | "parameters": {
602 | "dataset_path": dataset_path,
603 | "epochs": epochs,
604 | "imgsz": imgsz,
605 | "batch": batch,
606 | "name": name,
607 | "project": project,
608 | "task": task
609 | }
610 | }
611 |
612 | @mcp.tool()
613 | def validate_model(
614 | model_path: str,
615 | data_path: str,
616 | imgsz: int = 640,
617 | batch: int = 16
618 | ) -> Dict[str, Any]:
619 | """
620 | Return the YOLO CLI command for model validation without executing it
621 |
622 | Args:
623 | model_path: Path to YOLO model (.pt file)
624 | data_path: Path to YOLO format validation dataset
625 | imgsz: Image size for validation
626 | batch: Batch size
627 |
628 | Returns:
629 | Dictionary containing command that would be executed
630 | """
631 | # Validate model path
632 | if not os.path.exists(model_path):
633 | return {"error": f"Model file not found: {model_path}"}
634 |
635 | # Validate dataset path
636 | if not os.path.exists(data_path):
637 | return {"error": f"Dataset not found: {data_path}"}
638 |
639 | # Determine task type based on model name
640 | model_name = os.path.basename(model_path)
641 | task = "detect" # Default task
642 | if "seg" in model_name:
643 | task = "segment"
644 | elif "pose" in model_name:
645 | task = "pose"
646 | elif "cls" in model_name:
647 | task = "classify"
648 | elif "obb" in model_name:
649 | task = "obb"
650 |
651 | # Build YOLO CLI command
652 | cmd_args = [
653 | task, # Task
654 | "val", # Mode
655 | f"model={model_path}",
656 | f"data={data_path}",
657 | f"imgsz={imgsz}",
658 | f"batch={batch}"
659 | ]
660 |
661 | # Get command string without executing
662 | result = run_yolo_cli(cmd_args)
663 |
664 | # Return command information
665 | return {
666 | "status": "command_generated",
667 | "model_path": model_path,
668 | "command": result["command"],
669 | "note": "Command generated but not executed - validation would begin with actual execution",
670 | "parameters": {
671 | "data_path": data_path,
672 | "imgsz": imgsz,
673 | "batch": batch,
674 | "task": task
675 | }
676 | }
677 |
678 | @mcp.tool()
679 | def export_model(
680 | model_path: str,
681 | format: str = "onnx",
682 | imgsz: int = 640
683 | ) -> Dict[str, Any]:
684 | """
685 | Return the YOLO CLI command for model export without executing it
686 |
687 | Args:
688 | model_path: Path to YOLO model (.pt file)
689 | format: Export format (onnx, torchscript, openvino, etc.)
690 | imgsz: Image size for export
691 |
692 | Returns:
693 | Dictionary containing command that would be executed
694 | """
695 | # Validate model path
696 | if not os.path.exists(model_path):
697 | return {"error": f"Model file not found: {model_path}"}
698 |
699 | # Valid export formats
700 | valid_formats = [
701 | "torchscript", "onnx", "openvino", "engine", "coreml", "saved_model",
702 | "pb", "tflite", "edgetpu", "tfjs", "paddle"
703 | ]
704 |
705 | if format not in valid_formats:
706 | return {"error": f"Invalid export format: {format}. Valid formats include: {', '.join(valid_formats)}"}
707 |
708 | # Build YOLO CLI command
709 | cmd_args = [
710 | "export", # Combined task and mode for export
711 | f"model={model_path}",
712 | f"format={format}",
713 | f"imgsz={imgsz}"
714 | ]
715 |
716 | # Get command string without executing
717 | result = run_yolo_cli(cmd_args)
718 |
719 | # Return command information
720 | return {
721 | "status": "command_generated",
722 | "model_path": model_path,
723 | "command": result["command"],
724 | "note": "Command generated but not executed - export would begin with actual execution",
725 | "parameters": {
726 | "format": format,
727 | "imgsz": imgsz,
728 | "expected_output": f"{os.path.splitext(model_path)[0]}.{format}"
729 | }
730 | }
731 |
732 | @mcp.tool()
733 | def list_available_models() -> List[str]:
734 | """List available YOLO models that actually exist on disk in any configured directory"""
735 | # Common YOLO model patterns
736 | model_patterns = [
737 |         "yolov11*.pt", "yolo11*.pt",  # Ultralytics publishes YOLO11 weights as yolo11*.pt
738 | "yolov8*.pt"
739 | ]
740 |
741 | # Find all existing models in all configured directories
742 | available_models = set()
743 | for directory in CONFIG["model_dirs"]:
744 | if not os.path.exists(directory):
745 | continue
746 |
747 | # Check for model files directly
748 | for filename in os.listdir(directory):
749 | if filename.endswith(".pt") and any(
750 | fnmatch.fnmatch(filename, pattern) for pattern in model_patterns
751 | ):
752 | available_models.add(filename)
753 |
754 | # Convert to sorted list
755 |     result = sorted(available_models)
756 |
757 | if not result:
758 | logger.warning("No model files found in configured directories.")
759 | return ["No models available - download models to any of these directories: " + ", ".join(CONFIG["model_dirs"])]
760 |
761 | return result
762 |
763 | @mcp.tool()
764 | def start_camera_detection(
765 | model_name: str = "yolov8n.pt",
766 | confidence: float = 0.25,
767 | camera_id: int = 0
768 | ) -> Dict[str, Any]:
769 | """
770 | Return the YOLO CLI command for starting camera detection without executing it
771 |
772 | Args:
773 | model_name: YOLO model name to use
774 | confidence: Detection confidence threshold
775 | camera_id: Camera device ID (0 is usually the default camera)
776 |
777 | Returns:
778 | Dictionary containing command that would be executed
779 | """
780 | # Determine full model path
781 | model_path = None
782 | for directory in CONFIG["model_dirs"]:
783 | potential_path = os.path.join(directory, model_name)
784 | if os.path.exists(potential_path):
785 | model_path = potential_path
786 | break
787 |
788 | if model_path is None:
789 | available = list_available_models()
790 | available_str = ", ".join(available) if available else "none"
791 | return {
792 | "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
793 | }
794 |
795 | # Determine task type based on model name
796 | task = "detect" # Default task
797 | if "seg" in model_name:
798 | task = "segment"
799 | elif "pose" in model_name:
800 | task = "pose"
801 | elif "cls" in model_name:
802 | task = "classify"
803 |
804 | # Build YOLO CLI command
805 | cmd_args = [
806 | task, # Task
807 | "predict", # Mode
808 | f"model={model_path}",
809 | f"source={camera_id}", # Camera source ID
810 | f"conf={confidence}",
811 | "format=json",
812 | "save=False", # Don't save frames by default
813 | "show=True" # Show GUI window for camera view
814 | ]
815 |
816 | # Get command string without executing
817 | result = run_yolo_cli(cmd_args)
818 |
819 | # Return command information
820 | return {
821 | "status": "command_generated",
822 | "model_used": model_name,
823 | "model_path": model_path,
824 | "command": result["command"],
825 | "note": "Command generated but not executed - camera would start with actual execution",
826 | "parameters": {
827 | "confidence": confidence,
828 | "camera_id": camera_id,
829 | "task": task
830 | }
831 | }
832 |
833 | @mcp.tool()
834 | def stop_camera_detection() -> Dict[str, Any]:
835 | """
836 | Simulate stopping camera detection (no actual command to execute)
837 |
838 | Returns:
839 | Information message
840 | """
841 | return {
842 | "status": "command_generated",
843 | "message": "To stop camera detection, close the YOLO window or press 'q' in the terminal",
844 | "note": "Since commands are not executed, no actual camera is running"
845 | }
846 |
847 | @mcp.tool()
848 | def get_camera_detections() -> Dict[str, Any]:
849 | """
850 | Simulate getting latest camera detections (no actual command to execute)
851 |
852 | Returns:
853 | Information message
854 | """
855 | return {
856 | "status": "command_generated",
857 |         "message": "Camera detections would be returned here if a camera were running",
858 | "note": "Since commands are not executed, no camera is running and no detections are available"
859 | }
860 |
861 | @mcp.tool()
862 | def comprehensive_image_analysis(
863 | image_path: str,
864 | confidence: float = 0.25,
865 | save_results: bool = False
866 | ) -> Dict[str, Any]:
867 | """
868 | Return the YOLO CLI commands for comprehensive image analysis without executing them
869 |
870 | Args:
871 | image_path: Path to the image file
872 | confidence: Detection confidence threshold
873 | save_results: Whether to save results to disk
874 |
875 | Returns:
876 | Dictionary containing commands that would be executed
877 | """
878 | if not os.path.exists(image_path):
879 | return {"error": f"Image file not found: {image_path}"}
880 |
881 | commands = []
882 |
883 | # 1. Object detection
884 | detect_result = detect_objects(
885 | image_data=image_path,
886 | model_name="yolov11n.pt",
887 | confidence=confidence,
888 | save_results=save_results,
889 | is_path=True
890 | )
891 | if "command" in detect_result:
892 | commands.append({
893 | "task": "object_detection",
894 | "command": detect_result["command"]
895 | })
896 |
897 | # 2. Scene classification
898 | try:
899 | cls_result = classify_image(
900 | image_data=image_path,
901 | model_name="yolov8n-cls.pt",
902 | top_k=3,
903 | save_results=save_results,
904 | is_path=True
905 | )
906 | if "command" in cls_result:
907 | commands.append({
908 | "task": "classification",
909 | "command": cls_result["command"]
910 | })
911 | except Exception as e:
912 | logger.error(f"Error generating classification command: {str(e)}")
913 |
914 | # 3. Pose detection if available
915 | for directory in CONFIG["model_dirs"]:
916 | pose_model_path = os.path.join(directory, "yolov8n-pose.pt")
917 | if os.path.exists(pose_model_path):
918 | # Build YOLO CLI command for pose detection
919 | cmd_args = [
920 | "pose", # Task
921 | "predict", # Mode
922 | f"model={pose_model_path}",
923 | f"source={image_path}",
924 | f"conf={confidence}",
925 | "format=json",
926 | ]
927 |
928 | if save_results:
929 | output_dir = os.path.join(tempfile.gettempdir(), "yolo_pose_results")
930 | cmd_args.append(f"project={output_dir}")
931 | cmd_args.append("save=True")
932 | else:
933 | cmd_args.append("save=False")
934 |
935 | result = run_yolo_cli(cmd_args)
936 |
937 | commands.append({
938 | "task": "pose_detection",
939 | "command": result["command"]
940 | })
941 | break
942 |
943 | return {
944 | "status": "commands_generated",
945 | "image_path": image_path,
946 | "commands": commands,
947 | "note": "Commands generated but not executed - comprehensive analysis would occur with actual execution",
948 | "parameters": {
949 | "confidence": confidence,
950 | "save_results": save_results
951 | }
952 | }
953 |
954 | @mcp.tool()
955 | def analyze_image_from_path(
956 | image_path: str,
957 | model_name: str = "yolov8n.pt",
958 | confidence: float = 0.25,
959 | save_results: bool = False
960 | ) -> Dict[str, Any]:
961 | """
962 | Return the YOLO CLI command for image analysis without executing it
963 |
964 | Args:
965 | image_path: Path to the image file
966 | model_name: YOLO model name
967 | confidence: Detection confidence threshold
968 | save_results: Whether to save results to disk
969 |
970 | Returns:
971 | Dictionary containing command that would be executed
972 | """
973 | try:
974 | # Call detect_objects function with is_path=True
975 | return detect_objects(
976 | image_data=image_path,
977 | model_name=model_name,
978 | confidence=confidence,
979 | save_results=save_results,
980 | is_path=True
981 | )
982 | except Exception as e:
983 | return {
984 | "error": f"Failed to generate analysis command: {str(e)}",
985 | "image_path": image_path
986 | }
987 |
988 | @mcp.tool()
989 | def test_connection() -> Dict[str, Any]:
990 | """
991 | Test if YOLO CLI service is available
992 |
993 | Returns:
994 | Status information and available tools
995 | """
996 | # Build a simple YOLO CLI version command
997 | cmd_args = ["--version"]
998 | result = run_yolo_cli(cmd_args)
999 |
1000 | return {
1001 | "status": "YOLO CLI command generator is running",
1002 | "command_mode": "Command generation only, no execution",
1003 | "version_command": result["command"],
1004 | "available_models": list_available_models(),
1005 | "available_tools": [
1006 | "list_available_models", "detect_objects", "segment_objects",
1007 | "classify_image", "track_objects", "train_model", "validate_model",
1008 | "export_model", "start_camera_detection", "stop_camera_detection",
1009 | "get_camera_detections", "test_connection",
1010 | # Additional tools
1011 | "analyze_image_from_path",
1012 | "comprehensive_image_analysis"
1013 | ],
1014 | "note": "This service only generates YOLO commands without executing them"
1015 | }
1016 |
1017 | # Entry point: log startup details and run the MCP server over stdio
1018 | if __name__ == "__main__":
1019 | import platform
1020 |
1021 | logger.info("Starting YOLO CLI command generator service")
1022 | logger.info(f"Platform: {platform.system()} {platform.release()}")
1023 | logger.info("⚠️ Commands will be generated but NOT executed")
1024 |
1025 | # Initialize and run server
1026 | logger.info("Starting MCP server...")
1027 | mcp.run(transport='stdio')
```
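The tools in the file above repeat the same two steps: searching `CONFIG["model_dirs"]` for the model file and guessing the task from the model name. Below is a minimal sketch of how those steps could be factored out. The helper names `resolve_model_path` and `infer_task` and the `TASK_MARKERS` table are hypothetical and do not appear in the repository.

```python
import os
from typing import Iterable, Optional

# Hypothetical table mirroring the substring checks used by the tools above.
TASK_MARKERS = (
    ("seg", "segment"),
    ("pose", "pose"),
    ("cls", "classify"),
    ("obb", "obb"),
)


def resolve_model_path(model_name: str, model_dirs: Iterable[str]) -> Optional[str]:
    """Return the first existing path for model_name, or None if it is absent."""
    for directory in model_dirs:
        candidate = os.path.join(directory, model_name)
        if os.path.exists(candidate):
            return candidate
    return None


def infer_task(model_name: str) -> str:
    """Guess the YOLO task from the model file name, defaulting to "detect"."""
    for marker, task in TASK_MARKERS:
        if marker in model_name:
            return task
    return "detect"


if __name__ == "__main__":
    # Example: look in the current directory and a models/ subdirectory.
    print(resolve_model_path("yolov8n.pt", [".", "./models"]))
    print(infer_task("yolov8n-seg.pt"))
```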
--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------
```python
1 | # server.py
2 | import fnmatch
3 | import os
4 | import base64
5 | import cv2
6 | import time
7 | import threading
8 | from io import BytesIO
9 | from typing import List, Dict, Any, Optional, Union
10 | import numpy as np
11 | from PIL import Image
12 |
13 | from mcp.server.fastmcp import FastMCP
14 | from ultralytics import YOLO
15 |
16 | # Standard-library imports used by the service
18 | import sys
19 | import logging
20 | import contextlib
24 | import signal
25 | import atexit
26 | 
27 | # Set up logging configuration
28 | logging.basicConfig(
29 | level=logging.INFO,
30 | format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
31 | handlers=[
32 | logging.FileHandler("yolo_service.log"),
33 | logging.StreamHandler(sys.stderr)
34 | ]
35 | )
36 |
37 | logger = logging.getLogger('yolo_service')
38 |
39 | # Global variables for camera control
40 | camera_running = False
41 | camera_thread = None
42 | detection_results = []
43 | camera_last_access_time = 0
44 | CAMERA_INACTIVITY_TIMEOUT = 60 # Auto-shutdown after 60 seconds of inactivity
45 |
46 | @contextlib.contextmanager
47 | def redirect_stdout_to_stderr():
48 | old_stdout = sys.stdout
49 | sys.stdout = sys.stderr
50 | try:
51 | yield
52 | finally:
53 | sys.stdout = old_stdout
54 |
55 | def camera_watchdog_thread():
56 | """Monitor thread that auto-stops the camera after inactivity"""
57 | global camera_running, camera_last_access_time
58 |
59 | logger.info("Camera watchdog thread started")
60 |
61 | while True:
62 | # Sleep for a short time to avoid excessive CPU usage
63 | time.sleep(5)
64 |
65 | # Check if camera is running
66 | if camera_running:
67 | current_time = time.time()
68 | elapsed_time = current_time - camera_last_access_time
69 |
70 | # If no access for more than the timeout, auto-stop
71 | if elapsed_time > CAMERA_INACTIVITY_TIMEOUT:
72 | logger.info(f"Auto-stopping camera after {elapsed_time:.1f} seconds of inactivity")
73 | stop_camera_detection()
74 | else:
75 | # If camera is not running, no need to check frequently
76 | time.sleep(10)
77 |
78 |
79 | def load_image(image_source, is_path=False):
80 | """
81 | Load image from file path or base64 data
82 |
83 | Args:
84 | image_source: File path or base64 encoded image data
85 | is_path: Whether image_source is a file path
86 |
87 | Returns:
88 | PIL Image object
89 | """
90 | try:
91 | if is_path:
92 | # Load image from file path
93 | if os.path.exists(image_source):
94 | return Image.open(image_source)
95 | else:
96 | raise FileNotFoundError(f"Image file not found: {image_source}")
97 | else:
98 | # Load image from base64 data
99 | image_bytes = base64.b64decode(image_source)
100 | return Image.open(BytesIO(image_bytes))
101 | except Exception as e:
102 | raise ValueError(f"Failed to load image: {str(e)}")
103 |
104 | # Create MCP server
105 | mcp = FastMCP("YOLO_Service")
106 |
107 | # Global model cache
108 | models = {}
109 |
110 | def get_model(model_name: str = "yolov8n.pt") -> YOLO:
111 | """Get or load YOLO model from any of the configured model directories"""
112 | if model_name in models:
113 | return models[model_name]
114 |
115 | # Try to find the model in any of the configured directories
116 | model_path = None
117 | for directory in CONFIG["model_dirs"]:
118 | potential_path = os.path.join(directory, model_name)
119 | if os.path.exists(potential_path):
120 | model_path = potential_path
121 | break
122 |
123 | if model_path is None:
124 | available = list_available_models()
125 | available_str = ", ".join(available) if available else "none"
126 | raise FileNotFoundError(f"Model '{model_name}' not found in any configured directories. Available models: {available_str}")
127 |
128 | # Load and cache the model - with stdout redirected
129 | logger.info(f"Loading model: {model_name} from {model_path}")
130 | with redirect_stdout_to_stderr():
131 | models[model_name] = YOLO(model_path)
132 | return models[model_name]
133 |
134 | # Global configuration
135 | CONFIG = {
136 | "model_dirs": [
137 | ".", # Current directory
138 | "./models", # Models subdirectory
139 | os.path.join(os.path.dirname(os.path.abspath(__file__)), "models"), # Absolute path to models
140 | # Add any other potential model directories here
141 | ]
142 | }
143 |
144 |
145 |
146 | # Report the configured model directories and the models found in them
147 | @mcp.tool()
148 | def get_model_directories() -> Dict[str, Any]:
149 | """Get information about configured model directories and available models"""
150 | directories = []
151 |
152 | for directory in CONFIG["model_dirs"]:
153 | dir_info = {
154 | "path": directory,
155 | "exists": os.path.exists(directory),
156 |             "is_directory": os.path.isdir(directory),  # False when the path does not exist
157 | "models": []
158 | }
159 |
160 | if dir_info["exists"] and dir_info["is_directory"]:
161 | for filename in os.listdir(directory):
162 | if filename.endswith(".pt"):
163 | dir_info["models"].append(filename)
164 |
165 | directories.append(dir_info)
166 |
167 | return {
168 | "configured_directories": CONFIG["model_dirs"],
169 | "directory_details": directories,
170 | "available_models": list_available_models(),
171 | "loaded_models": list(models.keys())
172 | }
173 |
174 | @mcp.tool()
175 | def detect_objects(
176 | image_data: str,
177 | model_name: str = "yolov8n.pt",
178 | confidence: float = 0.25,
179 | save_results: bool = False,
180 | is_path: bool = False
181 | ) -> Dict[str, Any]:
182 | """
183 | Detect objects in an image using YOLO
184 |
185 | Args:
186 | image_data: Base64 encoded image or file path (if is_path=True)
187 | model_name: YOLO model name
188 | confidence: Detection confidence threshold
189 | save_results: Whether to save results to disk
190 | is_path: Whether image_data is a file path
191 |
192 | Returns:
193 | Dictionary containing detection results
194 | """
195 | try:
196 | # Load image (supports path or base64)
197 | image = load_image(image_data, is_path=is_path)
198 |
199 | # Load model and perform detection - with stdout redirected
200 | model = get_model(model_name)
201 | with redirect_stdout_to_stderr(): # Ensure all YOLO outputs go to stderr
202 | results = model.predict(image, conf=confidence, save=save_results)
203 |
204 | # Format results
205 | formatted_results = []
206 | for result in results:
207 | boxes = result.boxes
208 | detections = []
209 |
210 | for i in range(len(boxes)):
211 | box = boxes[i]
212 | x1, y1, x2, y2 = box.xyxy[0].tolist()
213 | confidence = float(box.conf[0])
214 | class_id = int(box.cls[0])
215 | class_name = result.names[class_id]
216 |
217 | detections.append({
218 | "box": [x1, y1, x2, y2],
219 | "confidence": confidence,
220 | "class_id": class_id,
221 | "class_name": class_name
222 | })
223 |
224 | formatted_results.append({
225 | "detections": detections,
226 | "image_shape": result.orig_shape
227 | })
228 |
229 | return {
230 | "results": formatted_results,
231 | "model_used": model_name,
232 | "total_detections": sum(len(r["detections"]) for r in formatted_results),
233 | "source": image_data if is_path else "base64_image"
234 | }
235 | except Exception as e:
236 | logger.error(f"Error in detect_objects: {str(e)}")
237 | return {
238 | "error": f"Failed to detect objects: {str(e)}",
239 | "source": image_data if is_path else "base64_image"
240 | }
241 |
242 | @mcp.tool()
243 | def segment_objects(
244 | image_data: str,
245 | model_name: str = "yolov11n-seg.pt",
246 | confidence: float = 0.25,
247 | save_results: bool = False,
248 | is_path: bool = False
249 | ) -> Dict[str, Any]:
250 | """
251 | Perform instance segmentation on an image using YOLO
252 |
253 | Args:
254 | image_data: Base64 encoded image or file path (if is_path=True)
255 | model_name: YOLO segmentation model name
256 | confidence: Detection confidence threshold
257 | save_results: Whether to save results to disk
258 | is_path: Whether image_data is a file path
259 |
260 | Returns:
261 | Dictionary containing segmentation results
262 | """
263 | try:
264 | # Load image (supports path or base64)
265 | image = load_image(image_data, is_path=is_path)
266 |
267 | # Load model and perform segmentation
268 | model = get_model(model_name)
269 |         with redirect_stdout_to_stderr():  # Send YOLO output to stderr
270 | results = model.predict(image, conf=confidence, save=save_results)
271 |
272 | # Format results
273 | formatted_results = []
274 | for result in results:
275 | if not hasattr(result, 'masks') or result.masks is None:
276 | continue
277 |
278 | boxes = result.boxes
279 | masks = result.masks
280 | segments = []
281 |
282 | for i in range(len(boxes)):
283 | box = boxes[i]
284 | mask = masks[i].data[0].cpu().numpy() if masks else None
285 |
286 | x1, y1, x2, y2 = box.xyxy[0].tolist()
287 | confidence = float(box.conf[0])
288 | class_id = int(box.cls[0])
289 | class_name = result.names[class_id]
290 |
291 | segment = {
292 | "box": [x1, y1, x2, y2],
293 | "confidence": confidence,
294 | "class_id": class_id,
295 | "class_name": class_name
296 | }
297 |
298 | if mask is not None:
299 | # Convert binary mask to simplified format for API response
300 | segment["mask"] = mask.tolist()
301 |
302 | segments.append(segment)
303 |
304 | formatted_results.append({
305 | "segments": segments,
306 | "image_shape": result.orig_shape
307 | })
308 |
309 | return {
310 | "results": formatted_results,
311 | "model_used": model_name,
312 | "total_segments": sum(len(r["segments"]) for r in formatted_results),
313 | "source": image_data if is_path else "base64_image"
314 | }
315 | except Exception as e:
316 | return {
317 | "error": f"Failed to segment objects: {str(e)}",
318 | "source": image_data if is_path else "base64_image"
319 | }
320 |
321 |
322 | @mcp.tool()
323 | def classify_image(
324 | image_data: str,
325 | model_name: str = "yolov11n-cls.pt",
326 | top_k: int = 5,
327 | save_results: bool = False,
328 | is_path: bool = False
329 | ) -> Dict[str, Any]:
330 | """
331 | Classify an image using YOLO classification model
332 |
333 | Args:
334 | image_data: Base64 encoded image or file path (if is_path=True)
335 | model_name: YOLO classification model name
336 | top_k: Number of top categories to return
337 | save_results: Whether to save results to disk
338 | is_path: Whether image_data is a file path
339 |
340 | Returns:
341 | Dictionary containing classification results
342 | """
343 | try:
344 | # Load image (supports path or base64)
345 | image = load_image(image_data, is_path=is_path)
346 |
347 | # Load model and perform classification
348 | model = get_model(model_name)
349 |         with redirect_stdout_to_stderr():  # Send YOLO output to stderr
350 | results = model.predict(image, save=save_results)
351 |
352 | # Format results
353 | formatted_results = []
354 | for result in results:
355 | if not hasattr(result, 'probs') or result.probs is None:
356 | continue
357 |
358 | probs = result.probs
359 |                 top_indices = probs.top5  # Only five entries, so top_k above 5 is capped at 5
360 | top_probs = probs.top5conf.tolist()
361 | top_classes = [result.names[idx] for idx in top_indices]
362 |
363 | classifications = [
364 | {"class_id": int(idx), "class_name": name, "probability": float(prob)}
365 | for idx, name, prob in zip(top_indices[:top_k], top_classes[:top_k], top_probs[:top_k])
366 | ]
367 |
368 | formatted_results.append({
369 | "classifications": classifications,
370 | "image_shape": result.orig_shape
371 | })
372 |
373 | return {
374 | "results": formatted_results,
375 | "model_used": model_name,
376 | "top_k": top_k,
377 | "source": image_data if is_path else "base64_image"
378 | }
379 | except Exception as e:
380 | return {
381 | "error": f"Failed to classify image: {str(e)}",
382 | "source": image_data if is_path else "base64_image"
383 | }
384 |
385 |
386 | @mcp.tool()
387 | def track_objects(
388 | image_data: str,
389 | model_name: str = "yolov8n.pt",
390 | confidence: float = 0.25,
391 | tracker: str = "bytetrack.yaml",
392 | save_results: bool = False
393 | ) -> Dict[str, Any]:
394 | """
395 | Track objects in an image sequence using YOLO
396 |
397 | Args:
398 | image_data: Base64 encoded image
399 | model_name: YOLO model name
400 | confidence: Detection confidence threshold
401 | tracker: Tracker name to use (e.g., 'bytetrack.yaml', 'botsort.yaml')
402 | save_results: Whether to save results to disk
403 |
404 | Returns:
405 | Dictionary containing tracking results
406 | """
407 | # Decode Base64 image
408 | image_bytes = base64.b64decode(image_data)
409 | image = Image.open(BytesIO(image_bytes))
410 |
411 | # Load model and perform tracking
412 | model = get_model(model_name)
413 |     # Send YOLO output to stderr during tracking
414 | with redirect_stdout_to_stderr():
415 | results = model.track(image, conf=confidence, tracker=tracker, save=save_results)
416 |
417 | # Format results
418 | formatted_results = []
419 | for result in results:
420 | if not hasattr(result, 'boxes') or result.boxes is None:
421 | continue
422 |
423 | boxes = result.boxes
424 | tracks = []
425 |
426 | for i in range(len(boxes)):
427 | box = boxes[i]
428 | x1, y1, x2, y2 = box.xyxy[0].tolist()
429 |             conf = float(box.conf[0])  # separate name so the confidence parameter is not shadowed
430 |             class_id = int(box.cls[0])
431 |             class_name = result.names[class_id]
432 |             
433 |             # Extract track ID (if any)
434 |             track_id = int(box.id[0]) if box.id is not None else None
435 |             
436 |             track = {
437 |                 "box": [x1, y1, x2, y2],
438 |                 "confidence": conf,
439 | "class_id": class_id,
440 | "class_name": class_name,
441 | "track_id": track_id
442 | }
443 |
444 | tracks.append(track)
445 |
446 | formatted_results.append({
447 | "tracks": tracks,
448 | "image_shape": result.orig_shape
449 | })
450 |
451 | return {
452 | "results": formatted_results,
453 | "model_used": model_name,
454 | "tracker": tracker,
455 | "total_tracks": sum(len(r["tracks"]) for r in formatted_results)
456 | }
457 |
458 | # Training tool: Ultralytics output is redirected to stderr during model.train()
459 | @mcp.tool()
460 | def train_model(
461 | dataset_path: str,
462 | model_name: str = "yolov8n.pt",
463 | epochs: int = 100,
464 | imgsz: int = 640,
465 | batch: int = 16,
466 | name: str = "yolo_custom_model",
467 | project: str = "runs/train"
468 | ) -> Dict[str, Any]:
469 | """
470 | Train a YOLO model on a custom dataset
471 |
472 | Args:
473 | dataset_path: Path to YOLO format dataset
474 | model_name: Base model to start with
475 | epochs: Number of training epochs
476 | imgsz: Image size for training
477 | batch: Batch size
478 | name: Name for the training run
479 | project: Project directory
480 |
481 | Returns:
482 | Dictionary containing training results
483 | """
484 | # Validate dataset path
485 | if not os.path.exists(dataset_path):
486 | return {"error": f"Dataset not found: {dataset_path}"}
487 |
488 | # Initialize model
489 | model = get_model(model_name)
490 |
491 | # Train model
492 | try:
493 |         # Keep training output off stdout, which carries the MCP stdio transport
494 | with redirect_stdout_to_stderr():
495 | results = model.train(
496 | data=dataset_path,
497 | epochs=epochs,
498 | imgsz=imgsz,
499 | batch=batch,
500 | name=name,
501 | project=project
502 | )
503 |
504 | # Get best model path
505 | best_model_path = os.path.join(project, name, "weights", "best.pt")
506 |
507 | return {
508 | "status": "success",
509 | "model_path": best_model_path,
510 | "epochs_completed": epochs,
511 | "final_metrics": {
512 | "precision": float(results.results_dict.get("metrics/precision(B)", 0)),
513 | "recall": float(results.results_dict.get("metrics/recall(B)", 0)),
514 | "mAP50": float(results.results_dict.get("metrics/mAP50(B)", 0)),
515 | "mAP50-95": float(results.results_dict.get("metrics/mAP50-95(B)", 0))
516 | }
517 | }
518 | except Exception as e:
519 | return {"error": f"Training failed: {str(e)}"}
520 |
521 | # Validation tool: Ultralytics output is redirected to stderr during model.val()
522 | @mcp.tool()
523 | def validate_model(
524 | model_path: str,
525 | data_path: str,
526 | imgsz: int = 640,
527 | batch: int = 16
528 | ) -> Dict[str, Any]:
529 | """
530 | Validate a YOLO model on a dataset
531 |
532 | Args:
533 | model_path: Path to YOLO model (.pt file)
534 | data_path: Path to YOLO format validation dataset
535 | imgsz: Image size for validation
536 | batch: Batch size
537 |
538 | Returns:
539 | Dictionary containing validation results
540 | """
541 | # Validate model path
542 | if not os.path.exists(model_path):
543 | return {"error": f"Model file not found: {model_path}"}
544 |
545 | # Validate dataset path
546 | if not os.path.exists(data_path):
547 | return {"error": f"Dataset not found: {data_path}"}
548 |
549 | # Load model
550 | try:
551 | model = get_model(model_path)
552 | except Exception as e:
553 | return {"error": f"Failed to load model: {str(e)}"}
554 |
555 | # Validate model
556 | try:
557 |         # Keep validation output off stdout, which carries the MCP stdio transport
558 | with redirect_stdout_to_stderr():
559 | results = model.val(data=data_path, imgsz=imgsz, batch=batch)
560 |
561 | return {
562 | "status": "success",
563 | "metrics": {
564 | "precision": float(results.results_dict.get("metrics/precision(B)", 0)),
565 | "recall": float(results.results_dict.get("metrics/recall(B)", 0)),
566 | "mAP50": float(results.results_dict.get("metrics/mAP50(B)", 0)),
567 | "mAP50-95": float(results.results_dict.get("metrics/mAP50-95(B)", 0))
568 | }
569 | }
570 | except Exception as e:
571 | return {"error": f"Validation failed: {str(e)}"}
572 |
573 | # Export tool: Ultralytics output is redirected to stderr during model.export()
574 | @mcp.tool()
575 | def export_model(
576 | model_path: str,
577 | format: str = "onnx",
578 | imgsz: int = 640
579 | ) -> Dict[str, Any]:
580 | """
581 | Export a YOLO model to different formats
582 |
583 | Args:
584 | model_path: Path to YOLO model (.pt file)
585 | format: Export format (onnx, torchscript, openvino, etc.)
586 | imgsz: Image size for export
587 |
588 | Returns:
589 | Dictionary containing export results
590 | """
591 | # Validate model path
592 | if not os.path.exists(model_path):
593 | return {"error": f"Model file not found: {model_path}"}
594 |
595 | # Valid export formats
596 | valid_formats = [
597 | "torchscript", "onnx", "openvino", "engine", "coreml", "saved_model",
598 | "pb", "tflite", "edgetpu", "tfjs", "paddle"
599 | ]
600 |
601 | if format not in valid_formats:
602 | return {"error": f"Invalid export format: {format}. Valid formats include: {', '.join(valid_formats)}"}
603 |
604 | # Load model
605 | try:
606 | model = get_model(model_path)
607 | except Exception as e:
608 | return {"error": f"Failed to load model: {str(e)}"}
609 |
610 | # Export model
611 | try:
612 |         # Keep export output off stdout, which carries the MCP stdio transport
613 | with redirect_stdout_to_stderr():
614 | export_path = model.export(format=format, imgsz=imgsz)
615 |
616 | return {
617 | "status": "success",
618 | "export_path": str(export_path),
619 | "format": format
620 | }
621 | except Exception as e:
622 | return {"error": f"Export failed: {str(e)}"}
623 |
624 | # Model metadata resource: property access is wrapped in stdout redirection
625 | @mcp.resource("model_info/{model_name}")
626 | def get_model_info(model_name: str) -> Dict[str, Any]:
627 | """
628 | Get information about a YOLO model
629 |
630 | Args:
631 | model_name: YOLO model name
632 |
633 | Returns:
634 | Dictionary containing model information
635 | """
636 | try:
637 | model = get_model(model_name)
638 |
639 | # Get model task
640 | task = 'detect' # Default task
641 | if 'seg' in model_name:
642 | task = 'segment'
643 | elif 'pose' in model_name:
644 | task = 'pose'
645 | elif 'cls' in model_name:
646 | task = 'classify'
647 | elif 'obb' in model_name:
648 | task = 'obb'
649 |
650 | # Make sure any model property access that might trigger output is wrapped
651 | with redirect_stdout_to_stderr():
652 | yaml_str = str(model.yaml)
653 | pt_path = str(model.pt_path) if hasattr(model, 'pt_path') else None
654 | class_names = model.names
655 |
656 | # Get model info
657 | return {
658 | "model_name": model_name,
659 | "task": task,
660 | "yaml": yaml_str,
661 | "pt_path": pt_path,
662 | "class_names": class_names
663 | }
664 | except Exception as e:
665 | return {"error": f"Failed to get model info: {str(e)}"}
666 |
667 | # List model files found on disk; warnings go through the logger, never stdout
668 | @mcp.tool()
669 | def list_available_models() -> List[str]:
670 | """List available YOLO models that actually exist on disk in any configured directory"""
671 | # Common YOLO model patterns
672 | model_patterns = [
673 | "yolov11*.pt",
674 | "yolov8*.pt"
675 | ]
676 |
677 | # Find all existing models in all configured directories
678 | available_models = set()
679 | for directory in CONFIG["model_dirs"]:
680 | if not os.path.exists(directory):
681 | continue
682 |
683 | # Check for model files directly
684 | for filename in os.listdir(directory):
685 | if filename.endswith(".pt") and any(
686 | fnmatch.fnmatch(filename, pattern) for pattern in model_patterns
687 | ):
688 | available_models.add(filename)
689 |
690 | # Convert to sorted list
691 | result = sorted(list(available_models))
692 |
693 | if not result:
694 |         # Use the logger: printing to stdout would corrupt the MCP stdio transport
695 | logger.warning("No model files found in configured directories.")
696 | return ["No models available - download models to any of these directories: " + ", ".join(CONFIG["model_dirs"])]
697 |
698 | return result
699 | @mcp.resource("model_info/{model_name}")
700 | def get_model_info(model_name: str) -> Dict[str, Any]:
701 | """
702 | Get information about a YOLO model
703 |
704 | Args:
705 | model_name: YOLO model name
706 |
707 | Returns:
708 | Dictionary containing model information
709 | """
710 | try:
711 | model = get_model(model_name)
712 |
713 | # Get model task
714 | task = 'detect' # Default task
715 | if 'seg' in model_name:
716 | task = 'segment'
717 | elif 'pose' in model_name:
718 | task = 'pose'
719 | elif 'cls' in model_name:
720 | task = 'classify'
721 | elif 'obb' in model_name:
722 | task = 'obb'
723 |
724 | # Get model info
725 | return {
726 | "model_name": model_name,
727 | "task": task,
728 | "yaml": str(model.yaml),
729 | "pt_path": str(model.pt_path) if hasattr(model, 'pt_path') else None,
730 | "class_names": model.names
731 | }
732 | except Exception as e:
733 | return {"error": f"Failed to get model info: {str(e)}"}
734 |
735 | @mcp.tool()
736 | def list_available_models() -> List[str]:
737 | """List available YOLO models that actually exist on disk in any configured directory"""
738 | # Common YOLO model patterns
739 | model_patterns = [
740 | "yolov11*.pt",
741 | "yolov8*.pt"
742 | ]
743 |
744 | # Find all existing models in all configured directories
745 | available_models = set()
746 | for directory in CONFIG["model_dirs"]:
747 | if not os.path.exists(directory):
748 | continue
749 |
750 | # Check for model files directly
751 | for filename in os.listdir(directory):
752 | if filename.endswith(".pt") and any(
753 | fnmatch.fnmatch(filename, pattern) for pattern in model_patterns
754 | ):
755 | available_models.add(filename)
756 |
757 | # Convert to sorted list
758 | result = sorted(list(available_models))
759 |
760 | if not result:
761 |         logger.warning("No model files found in configured directories.")
762 | return ["No models available - download models to any of these directories: " + ", ".join(CONFIG["model_dirs"])]
763 |
764 | return result
765 |
766 |
767 |
768 | # Camera detection background thread
769 | camera_thread = None
770 | camera_running = False
771 | detection_results = []
772 |
773 | def camera_detection_thread(model_name, confidence, fps_limit=30, camera_id=0):
774 | """Background thread for camera detection"""
775 | global camera_running, detection_results
776 |
777 | # Load model
778 | try:
779 | with redirect_stdout_to_stderr():
780 | model = get_model(model_name)
781 | logger.info(f"Model {model_name} loaded successfully")
782 | except Exception as e:
783 | logger.error(f"Error loading model: {str(e)}")
784 | camera_running = False
785 | detection_results.append({
786 | "timestamp": time.time(),
787 | "error": f"Failed to load model: {str(e)}",
788 | "detections": []
789 | })
790 | return
791 |
792 |     # Open the camera, trying several device IDs if necessary;
793 |     # cap stays None or unopened when every attempt fails
794 | cap = None
795 | error_message = ""
796 |
797 |     # Try the requested camera_id first, then fall back to the other IDs in 0-2
798 |     for cam_id in [camera_id] + [i for i in range(3) if i != camera_id]:
799 | try:
800 | logger.info(f"Attempting to open camera with ID {cam_id}...")
801 | cap = cv2.VideoCapture(cam_id)
802 | if cap.isOpened():
803 | logger.info(f"Successfully opened camera {cam_id}")
804 | break
805 | except Exception as e:
806 | error_message = f"Error opening camera {cam_id}: {str(e)}"
807 | logger.error(error_message)
808 |
809 | # Check if any camera was successfully opened
810 | if cap is None or not cap.isOpened():
811 | logger.error("Error: Could not open any camera.")
812 | camera_running = False
813 | detection_results.append({
814 | "timestamp": time.time(),
815 | "error": "Failed to open camera. Make sure camera is connected and not in use by another application.",
816 | "camera_status": "unavailable",
817 | "detections": []
818 | })
819 | return
820 |
821 | # Get camera properties for diagnostics
822 | width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
823 | height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
824 | fps = cap.get(cv2.CAP_PROP_FPS)
825 |
826 | logger.info(f"Camera properties: {width}x{height} at {fps} FPS")
827 |
828 | # Calculate frame interval based on fps_limit
829 | frame_interval = 1.0 / fps_limit
830 | frame_count = 0
831 | error_count = 0
832 |
833 | while camera_running:
834 | start_time = time.time()
835 |
836 | try:
837 | ret, frame = cap.read()
838 | if not ret:
839 | logger.warning(f"Error: Failed to capture frame (attempt {error_count+1}).")
840 | error_count += 1
841 |
842 | # Add error to detection results
843 | detection_results.append({
844 | "timestamp": time.time(),
845 | "error": f"Failed to capture frame (attempt {error_count})",
846 | "camera_status": "error",
847 | "detections": []
848 | })
849 |
850 | # If we have consistent failures, try to restart the camera
851 | if error_count >= 5:
852 | logger.warning("Too many frame capture errors, attempting to restart camera...")
853 | cap.release()
854 | time.sleep(1)
855 |                     cap = cv2.VideoCapture(cam_id)  # reopen the device that was actually opened
856 | error_count = 0
857 | if not cap.isOpened():
858 | logger.error("Failed to reopen camera after errors.")
859 | break
860 |
861 | time.sleep(1) # Wait before trying again
862 | continue
863 |
864 | # Reset error count on successful frame capture
865 | error_count = 0
866 | frame_count += 1
867 |
868 | # Perform detection on frame
869 |             with redirect_stdout_to_stderr():  # keep YOLO output off stdout
870 | results = model.predict(frame, conf=confidence)
871 |
872 | # Update detection results (only keep the last 10)
873 | if len(detection_results) >= 10:
874 | detection_results.pop(0)
875 |
876 | # Format results
877 | for result in results:
878 | boxes = result.boxes
879 | detections = []
880 |
881 | for i in range(len(boxes)):
882 | box = boxes[i]
883 | x1, y1, x2, y2 = box.xyxy[0].tolist()
884 | conf = float(box.conf[0])
885 | class_id = int(box.cls[0])
886 | class_name = result.names[class_id]
887 |
888 | detections.append({
889 | "box": [x1, y1, x2, y2],
890 | "confidence": conf,
891 | "class_id": class_id,
892 | "class_name": class_name
893 | })
894 |
895 | detection_results.append({
896 | "timestamp": time.time(),
897 | "frame_count": frame_count,
898 | "detections": detections,
899 | "camera_status": "running",
900 | "image_shape": result.orig_shape
901 | })
902 |
903 | # Log occasional status
904 | if frame_count % 30 == 0:
905 | logger.info(f"Camera running: processed {frame_count} frames")
906 | detection_count = sum(len(r.get("detections", [])) for r in detection_results if "detections" in r)
907 | logger.info(f"Total detections in current buffer: {detection_count}")
908 |
909 | # Limit FPS by waiting if necessary
910 | elapsed = time.time() - start_time
911 | if elapsed < frame_interval:
912 | time.sleep(frame_interval - elapsed)
913 |
914 | except Exception as e:
915 | logger.error(f"Error in camera thread: {str(e)}")
916 | detection_results.append({
917 | "timestamp": time.time(),
918 | "error": f"Exception in camera processing: {str(e)}",
919 | "camera_status": "error",
920 | "detections": []
921 | })
922 | time.sleep(1) # Wait before continuing
923 |
924 | # Clean up
925 | logger.info("Shutting down camera...")
926 | if cap is not None:
927 | cap.release()
928 |
929 | @mcp.tool()
930 | def start_camera_detection(
931 | model_name: str = "yolov8n.pt",
932 | confidence: float = 0.25,
933 | camera_id: int = 0
934 | ) -> Dict[str, Any]:
935 | """
936 | Start realtime object detection using the computer's camera
937 |
938 | Args:
939 | model_name: YOLO model name to use
940 | confidence: Detection confidence threshold
941 | camera_id: Camera device ID (0 is usually the default camera)
942 |
943 | Returns:
944 | Status of camera detection
945 | """
946 | global camera_thread, camera_running, detection_results, camera_last_access_time
947 |
948 | # Check if already running
949 | if camera_running:
950 | # Update last access time
951 | camera_last_access_time = time.time()
952 | return {"status": "success", "message": "Camera detection is already running"}
953 |
954 | # Clear previous results
955 | detection_results = []
956 |
957 | # First, try to check if OpenCV is properly installed
958 | try:
959 | cv2_version = cv2.__version__
960 | logger.info(f"OpenCV version: {cv2_version}")
961 | except Exception as e:
962 | logger.error(f"OpenCV not properly installed: {str(e)}")
963 | return {
964 | "status": "error",
965 | "message": f"OpenCV not properly installed: {str(e)}",
966 | "solution": "Please check OpenCV installation"
967 | }
968 |
969 | # Start detection thread
970 | camera_running = True
971 | camera_last_access_time = time.time() # Update access time
972 | camera_thread = threading.Thread(
973 | target=camera_detection_thread,
974 | args=(model_name, confidence, 30, camera_id),
975 | daemon=True
976 | )
977 | camera_thread.start()
978 |
979 | # Add initial status to detection results
980 | detection_results.append({
981 | "timestamp": time.time(),
982 | "system_info": {
983 | "os": platform.system() if 'platform' in globals() else "Unknown",
984 | "opencv_version": cv2.__version__,
985 | "camera_id": camera_id
986 | },
987 | "camera_status": "starting",
988 | "detections": []
989 | })
990 |
991 | return {
992 | "status": "success",
993 | "message": f"Started camera detection using model {model_name}",
994 | "model": model_name,
995 | "confidence": confidence,
996 | "camera_id": camera_id,
997 | "auto_shutdown": f"Camera will auto-shutdown after {CAMERA_INACTIVITY_TIMEOUT} seconds of inactivity",
998 | "note": "If camera doesn't work, try different camera_id values (0, 1, or 2)"
999 | }
1000 |
1001 |
1002 | @mcp.tool()
1003 | def stop_camera_detection() -> Dict[str, Any]:
1004 | """
1005 | Stop realtime camera detection
1006 |
1007 | Returns:
1008 | Status message
1009 | """
1010 | global camera_running
1011 |
1012 | if not camera_running:
1013 | return {"status": "error", "message": "Camera detection is not running"}
1014 |
1015 | logger.info("Stopping camera detection by user request")
1016 | camera_running = False
1017 |
1018 | # Wait for thread to terminate
1019 | if camera_thread and camera_thread.is_alive():
1020 | camera_thread.join(timeout=2.0)
1021 |
1022 | return {
1023 | "status": "success",
1024 | "message": "Stopped camera detection"
1025 | }
1026 |
1027 |
1028 | @mcp.tool()
1029 | def get_camera_detections() -> Dict[str, Any]:
1030 | """
1031 | Get the latest detections from the camera
1032 |
1033 | Returns:
1034 | Dictionary with recent detections
1035 | """
1036 | global detection_results, camera_thread, camera_last_access_time
1037 |
1038 | # Update the last access time whenever this function is called
1039 | if camera_running:
1040 | camera_last_access_time = time.time()
1041 |
1042 | # Check if thread is alive
1043 | thread_alive = camera_thread is not None and camera_thread.is_alive()
1044 |
1045 | # If camera_running is True but thread is dead, there's an issue
1046 | if camera_running and not thread_alive:
1047 | return {
1048 | "status": "error",
1049 | "message": "Camera thread has stopped unexpectedly",
1050 | "is_running": False,
1051 | "camera_status": "error",
1052 | "thread_alive": thread_alive,
1053 | "detections": detection_results,
1054 | "count": len(detection_results),
1055 |             "solution": "Please try restarting the camera with a different camera_id"
1056 | }
1057 |
1058 | if not camera_running:
1059 | return {
1060 | "status": "error",
1061 | "message": "Camera detection is not running",
1062 | "is_running": False,
1063 | "camera_status": "stopped"
1064 | }
1065 |
1066 | # Check for errors in detection results
1067 | errors = [result.get("error") for result in detection_results if "error" in result]
1068 | recent_errors = errors[-5:] if errors else []
1069 |
1070 | # Count actual detections
1071 | detection_count = sum(len(result.get("detections", [])) for result in detection_results if "detections" in result)
1072 |
1073 | return {
1074 | "status": "success",
1075 | "is_running": camera_running,
1076 | "thread_alive": thread_alive,
1077 | "detections": detection_results,
1078 | "count": len(detection_results),
1079 | "total_detections": detection_count,
1080 | "recent_errors": recent_errors if recent_errors else None,
1081 | "camera_status": "error" if recent_errors else "running",
1082 | "inactivity_timeout": {
1083 | "seconds_remaining": int(CAMERA_INACTIVITY_TIMEOUT - (time.time() - camera_last_access_time)),
1084 | "last_access": camera_last_access_time
1085 | }
1086 | }
1087 |
1088 | def cleanup_resources():
1089 | """Clean up resources when the server is shutting down"""
1090 | global camera_running
1091 |
1092 | logger.info("Cleaning up resources...")
1093 |
1094 | # Stop camera if it's running
1095 | if camera_running:
1096 | logger.info("Shutting down camera during server exit")
1097 | camera_running = False
1098 |
1099 | # Give the camera thread a moment to clean up
1100 | if camera_thread and camera_thread.is_alive():
1101 | camera_thread.join(timeout=2.0)
1102 |
1103 | logger.info("Cleanup complete")
1104 | atexit.register(cleanup_resources)
1105 |
1106 | def signal_handler(sig, frame):
1107 | """Handle termination signals"""
1108 | logger.info(f"Received signal {sig}, shutting down...")
1109 | cleanup_resources()
1110 | sys.exit(0)
1111 |
1112 | signal.signal(signal.SIGINT, signal_handler)
1113 | signal.signal(signal.SIGTERM, signal_handler)
1114 |
1115 | def start_watchdog():
1116 | """Start the camera watchdog thread"""
1117 | watchdog = threading.Thread(
1118 | target=camera_watchdog_thread,
1119 | daemon=True
1120 | )
1121 | watchdog.start()
1122 | return watchdog
1123 |
1124 | @mcp.tool()
1125 | def comprehensive_image_analysis(
1126 | image_path: str,
1127 | confidence: float = 0.25,
1128 | save_results: bool = False
1129 | ) -> Dict[str, Any]:
1130 | """
1131 | Perform comprehensive analysis on an image by combining multiple model results
1132 |
1133 | Args:
1134 | image_path: Path to the image file
1135 | confidence: Detection confidence threshold
1136 | save_results: Whether to save results to disk
1137 |
1138 | Returns:
1139 | Dictionary containing comprehensive analysis results
1140 | """
1141 | try:
1142 | if not os.path.exists(image_path):
1143 | return {"error": f"Image file not found: {image_path}"}
1144 |
1145 | # Load image
1146 | image = load_image(image_path, is_path=True)
1147 |
1148 | analysis_results = {}
1149 |
1150 | # 1. Object detection
1151 | object_model = get_model("yolov11n.pt")
1152 |         with redirect_stdout_to_stderr():  # keep YOLO output off stdout
1153 | object_results = object_model.predict(image, conf=confidence, save=save_results)
1154 |
1155 | # Process object detection results
1156 | detected_objects = []
1157 | for result in object_results:
1158 | boxes = result.boxes
1159 | for i in range(len(boxes)):
1160 | box = boxes[i]
1161 | conf = float(box.conf[0])
1162 | class_id = int(box.cls[0])
1163 | class_name = result.names[class_id]
1164 | detected_objects.append({
1165 | "class_name": class_name,
1166 | "confidence": conf
1167 | })
1168 | analysis_results["objects"] = detected_objects
1169 |
1170 | # 2. Scene classification
1171 | try:
1172 | cls_model = get_model("yolov8n-cls.pt")
1173 |             with redirect_stdout_to_stderr():  # keep YOLO output off stdout
1174 | cls_results = cls_model.predict(image, save=False)
1175 |
1176 | scene_classifications = []
1177 | for result in cls_results:
1178 | if hasattr(result, 'probs') and result.probs is not None:
1179 | probs = result.probs
1180 | top_indices = probs.top5
1181 | top_probs = probs.top5conf.tolist()
1182 | top_classes = [result.names[idx] for idx in top_indices]
1183 |
1184 | for idx, name, prob in zip(top_indices[:3], top_classes[:3], top_probs[:3]):
1185 | scene_classifications.append({
1186 | "class_name": name,
1187 | "probability": float(prob)
1188 | })
1189 | analysis_results["scene"] = scene_classifications
1190 | except Exception as e:
1191 | analysis_results["scene_error"] = str(e)
1192 |
1193 | # 3. Human pose detection
1194 | try:
1195 | pose_model = get_model("yolov8n-pose.pt")
1196 |             with redirect_stdout_to_stderr():  # keep YOLO output off stdout
1197 | pose_results = pose_model.predict(image, conf=confidence, save=False)
1198 |
1199 | detected_poses = []
1200 | for result in pose_results:
1201 | if hasattr(result, 'keypoints') and result.keypoints is not None:
1202 | boxes = result.boxes
1203 | keypoints = result.keypoints
1204 |
1205 | for i in range(len(boxes)):
1206 | box = boxes[i]
1207 | conf = float(box.conf[0])
1208 | detected_poses.append({
1209 | "person_confidence": conf,
1210 | "has_keypoints": keypoints[i].data.shape[1] if keypoints else 0
1211 | })
1212 | analysis_results["poses"] = detected_poses
1213 | except Exception as e:
1214 | analysis_results["pose_error"] = str(e)
1215 |
1216 |         # 4. Comprehensive task description:
1217 |         # summarize the findings of the passes above as a list of task labels
1218 | tasks = []
1219 |
1220 | # Detect main objects
1221 | main_objects = [obj["class_name"] for obj in detected_objects if obj["confidence"] > 0.5]
1222 | if "person" in main_objects:
1223 | tasks.append("Person Detection")
1224 |
1225 | # Check for weapon objects
1226 | weapon_objects = ["sword", "knife", "katana", "gun", "pistol", "rifle"]
1227 | weapons = [obj for obj in main_objects if any(weapon in obj.lower() for weapon in weapon_objects)]
1228 | if weapons:
1229 | tasks.append(f"Weapon Detection ({', '.join(weapons)})")
1230 |
1231 | # Count people
1232 | person_count = main_objects.count("person")
1233 | if person_count > 0:
1234 | tasks.append(f"Person Count ({person_count} people)")
1235 |
1236 | # Pose analysis
1237 | if "poses" in analysis_results and analysis_results["poses"]:
1238 | tasks.append("Human Pose Analysis")
1239 |
1240 | # Scene classification
1241 | if "scene" in analysis_results and analysis_results["scene"]:
1242 | scene_types = [scene["class_name"] for scene in analysis_results["scene"][:2]]
1243 | tasks.append(f"Scene Classification ({', '.join(scene_types)})")
1244 |
1245 | analysis_results["identified_tasks"] = tasks
1246 |
1247 | # Return comprehensive results
1248 | return {
1249 | "status": "success",
1250 | "image_path": image_path,
1251 | "analysis": analysis_results,
1252 |             "summary": ("Tasks identified in the image: " + ", ".join(tasks)) if tasks else "No clear tasks identified"
1253 | }
1254 | except Exception as e:
1255 | return {
1256 | "status": "error",
1257 | "image_path": image_path,
1258 | "error": f"Comprehensive analysis failed: {str(e)}"
1259 | }
1260 |
1261 |
1262 | @mcp.tool()
1263 | def analyze_image_from_path(
1264 | image_path: str,
1265 | model_name: str = "yolov8n.pt",
1266 | confidence: float = 0.25,
1267 | save_results: bool = False
1268 | ) -> Dict[str, Any]:
1269 | """
1270 | Analyze image from file path using YOLO
1271 |
1272 | Args:
1273 | image_path: Path to the image file
1274 | model_name: YOLO model name
1275 | confidence: Detection confidence threshold
1276 | save_results: Whether to save results to disk
1277 |
1278 | Returns:
1279 | Dictionary containing detection results
1280 | """
1281 | try:
1282 | # Call detect_objects function with is_path=True
1283 | return detect_objects(
1284 | image_data=image_path,
1285 | model_name=model_name,
1286 | confidence=confidence,
1287 | save_results=save_results,
1288 | is_path=True
1289 | )
1290 | except Exception as e:
1291 | return {
1292 | "error": f"Failed to analyze image: {str(e)}",
1293 | "image_path": image_path
1294 | }
1295 |
1296 | @mcp.tool()
1297 | def test_connection() -> Dict[str, Any]:
1298 | """
1299 | Test if YOLO MCP service is running properly
1300 |
1301 | Returns:
1302 | Status information and available tools
1303 | """
1304 | return {
1305 | "status": "YOLO MCP service is running normally",
1306 | "available_models": list_available_models(),
1307 | "available_tools": [
1308 | "list_available_models", "detect_objects", "segment_objects",
1309 | "classify_image", "detect_poses", "detect_oriented_objects",
1310 | "track_objects", "train_model", "validate_model",
1311 | "export_model", "start_camera_detection", "stop_camera_detection",
1312 | "get_camera_detections", "test_connection",
1313 | # Additional tools
1314 | "analyze_image_from_path",
1315 | "comprehensive_image_analysis"
1316 | ],
1317 | "new_features": [
1318 | "Support for loading images directly from file paths",
1319 | "Support for comprehensive image analysis with task identification",
1320 | "All detection functions support both file paths and base64 data"
1321 | ]
1322 | }
1323 |
1324 | # Entry point: start the camera watchdog, then serve MCP over stdio
1325 | if __name__ == "__main__":
1326 | logger.info("Starting YOLO MCP service")
1327 |
1328 | # Start the camera watchdog thread
1329 | watchdog_thread = start_watchdog()
1330 |
1331 | # Initialize and run server
1332 | mcp.run(transport='stdio')
```
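
Nearly every tool in `server.py` wraps its Ultralytics call in `redirect_stdout_to_stderr()`, and the server runs with `transport='stdio'`, so anything printed to stdout would mix with protocol messages. The helper itself is defined earlier in the file and is not shown in this part of the listing. The sketch below is only an illustration of how such a helper could work, assuming it simply swaps `sys.stdout` for `sys.stderr` for the duration of the block. The names `redirect_stdout_to_stderr_sketch` and `noisy_call` are hypothetical and do not appear in the repository.

```python
import contextlib
import io
import sys


@contextlib.contextmanager
def redirect_stdout_to_stderr_sketch():
    """Hypothetical stand-in: send writes to sys.stdout to sys.stderr instead."""
    original_stdout = sys.stdout
    sys.stdout = sys.stderr
    try:
        yield
    finally:
        # Always restore stdout, even if the wrapped call raises
        sys.stdout = original_stdout


def noisy_call() -> int:
    """Hypothetical library call that prints progress while it works."""
    print("progress: 50%")
    return 42


if __name__ == "__main__":
    with redirect_stdout_to_stderr_sketch():
        value = noisy_call()  # the progress line goes to stderr
    # Only this line reaches stdout
    print(value)
```

Restoring the stream in a `finally` clause matters here because the tools catch exceptions and return error dictionaries. Without it, a failed call could leave stdout pointing at stderr for the rest of the process.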