This is page 2 of 3. Use http://codebase.md/jhacksman/openscad-mcp-server?lines=true&page={x} to view the full context. # Directory Structure ``` ├── implementation_plan.md ├── old │ ├── download_sam2_checkpoint.py │ ├── src │ │ ├── ai │ │ │ └── sam_segmentation.py │ │ ├── models │ │ │ └── threestudio_generator.py │ │ └── workflow │ │ └── image_to_model_pipeline.py │ └── test_sam2_segmentation.py ├── README.md ├── requirements.txt ├── rtfmd │ ├── decisions │ │ ├── ai-driven-code-generation.md │ │ └── export-formats.md │ ├── files │ │ └── src │ │ ├── ai │ │ │ └── ai_service.py.md │ │ ├── main.py.md │ │ ├── models │ │ │ └── code_generator.py.md │ │ └── nlp │ │ └── parameter_extractor.py.md │ ├── knowledge │ │ ├── ai │ │ │ └── natural-language-processing.md │ │ ├── nlp │ │ │ └── parameter-extraction.md │ │ └── openscad │ │ ├── export-formats.md │ │ ├── openscad-basics.md │ │ └── primitive-testing.md │ └── README.md ├── scad │ └── simple_cube.scad ├── src │ ├── __init__.py │ ├── __pycache__ │ │ └── __init__.cpython-312.pyc │ ├── ai │ │ ├── ai_service.py │ │ ├── gemini_api.py │ │ └── venice_api.py │ ├── config.py │ ├── main_remote.py │ ├── main.py │ ├── main.py.new │ ├── models │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ └── code_generator.cpython-312.pyc │ │ ├── code_generator.py │ │ ├── cuda_mvs.py │ │ └── scad_templates │ │ └── basic_shapes.scad │ ├── nlp │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ └── parameter_extractor.cpython-312.pyc │ │ └── parameter_extractor.py │ ├── openscad_wrapper │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ └── wrapper.cpython-312.pyc │ │ └── wrapper.py │ ├── printer_discovery │ │ ├── __init__.py │ │ └── printer_discovery.py │ ├── remote │ │ ├── connection_manager.py │ │ ├── cuda_mvs_client.py │ │ ├── cuda_mvs_server.py │ │ └── error_handling.py │ ├── testing │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ ├── primitive_tester.cpython-312.pyc │ │ │ └── test_primitives.cpython-312.pyc │ │ ├── primitive_tester.py │ │ └── test_primitives.py │ ├── utils │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ ├── stl_exporter.cpython-312.pyc │ │ │ └── stl_validator.cpython-312.pyc │ │ ├── cad_exporter.py │ │ ├── format_validator.py │ │ ├── stl_exporter.py │ │ ├── stl_repair.py │ │ └── stl_validator.py │ ├── visualization │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ └── renderer.cpython-312.pyc │ │ ├── headless_renderer.py │ │ ├── renderer.py │ │ └── web_interface.py │ └── workflow │ ├── image_approval.py │ └── multi_view_to_model_pipeline.py ├── test_complete_workflow.py ├── test_cuda_mvs.py ├── test_gemini_api.py ├── test_image_approval_workflow.py ├── test_image_approval.py ├── test_image_to_model_pipeline.py ├── test_model_selection.py ├── test_multi_view_pipeline.py ├── test_primitives.sh ├── test_rabbit_direct.py ├── test_remote_cuda_mvs.py └── test_venice_example.py ``` # Files -------------------------------------------------------------------------------- /src/utils/format_validator.py: -------------------------------------------------------------------------------- ```python 1 | import os 2 | import logging 3 | import zipfile 4 | import xml.etree.ElementTree as ET 5 | from typing import Tuple, Optional, Dict, Any 6 | 7 | logger = logging.getLogger(__name__) 8 | 9 | class FormatValidator: 10 | """Validates 3D model formats for compatibility with printers.""" 11 | 12 | @staticmethod 13 | def validate_3mf(file_path: str) -> Tuple[bool, Optional[str]]: 14 | """ 15 | Validate a 3MF file for compatibility with Prusa and Bambu printers. 16 | 17 | Args: 18 | file_path: Path to the 3MF file 19 | 20 | Returns: 21 | Tuple of (is_valid, error_message) 22 | """ 23 | if not os.path.exists(file_path): 24 | return False, f"File not found: {file_path}" 25 | 26 | try: 27 | # 3MF files are ZIP archives with XML content 28 | with zipfile.ZipFile(file_path, 'r') as zip_ref: 29 | # Check for required files 30 | required_files = ['3D/3dmodel.model', '[Content_Types].xml'] 31 | for req_file in required_files: 32 | try: 33 | zip_ref.getinfo(req_file) 34 | except KeyError: 35 | return False, f"Missing required file in 3MF: {req_file}" 36 | 37 | # Validate 3D model file 38 | with zip_ref.open('3D/3dmodel.model') as model_file: 39 | tree = ET.parse(model_file) 40 | root = tree.getroot() 41 | 42 | # Check for required elements 43 | if root.tag != '{http://schemas.microsoft.com/3dmanufacturing/core/2015/02}model': 44 | return False, "Invalid 3MF: Missing model element" 45 | 46 | # Verify resources section exists 47 | resources = root.find('.//{http://schemas.microsoft.com/3dmanufacturing/core/2015/02}resources') 48 | if resources is None: 49 | return False, "Invalid 3MF: Missing resources element" 50 | 51 | return True, None 52 | except Exception as e: 53 | logger.error(f"Error validating 3MF file: {str(e)}") 54 | return False, f"Error validating 3MF file: {str(e)}" 55 | 56 | @staticmethod 57 | def validate_amf(file_path: str) -> Tuple[bool, Optional[str]]: 58 | """ 59 | Validate an AMF file for compatibility with printers. 60 | 61 | Args: 62 | file_path: Path to the AMF file 63 | 64 | Returns: 65 | Tuple of (is_valid, error_message) 66 | """ 67 | if not os.path.exists(file_path): 68 | return False, f"File not found: {file_path}" 69 | 70 | try: 71 | # Parse the AMF file (XML format) 72 | tree = ET.parse(file_path) 73 | root = tree.getroot() 74 | 75 | # Check for required elements 76 | if root.tag != 'amf': 77 | return False, "Invalid AMF: Missing amf root element" 78 | 79 | # Check for at least one object 80 | objects = root.findall('./object') 81 | if not objects: 82 | return False, "Invalid AMF: No objects found" 83 | 84 | # Check that each object has a mesh 85 | for obj in objects: 86 | mesh = obj.find('./mesh') 87 | if mesh is None: 88 | return False, f"Invalid AMF: Object {obj.get('id', 'unknown')} is missing a mesh" 89 | 90 | # Check for vertices and volumes 91 | vertices = mesh.find('./vertices') 92 | volumes = mesh.findall('./volume') 93 | 94 | if vertices is None: 95 | return False, f"Invalid AMF: Mesh in object {obj.get('id', 'unknown')} is missing vertices" 96 | 97 | if not volumes: 98 | return False, f"Invalid AMF: Mesh in object {obj.get('id', 'unknown')} has no volumes" 99 | 100 | return True, None 101 | except Exception as e: 102 | logger.error(f"Error validating AMF file: {str(e)}") 103 | return False, f"Error validating AMF file: {str(e)}" 104 | 105 | @staticmethod 106 | def extract_metadata(file_path: str) -> Dict[str, Any]: 107 | """ 108 | Extract metadata from a 3MF or AMF file. 109 | 110 | Args: 111 | file_path: Path to the 3D model file 112 | 113 | Returns: 114 | Dictionary of metadata 115 | """ 116 | metadata = {} 117 | 118 | # Check file extension 119 | ext = os.path.splitext(file_path)[1].lower() 120 | 121 | try: 122 | if ext == '.3mf': 123 | with zipfile.ZipFile(file_path, 'r') as zip_ref: 124 | metadata_path = "Metadata/model_metadata.xml" 125 | try: 126 | with zip_ref.open(metadata_path) as f: 127 | tree = ET.parse(f) 128 | root = tree.getroot() 129 | 130 | for meta in root.findall('./meta'): 131 | name = meta.get('name') 132 | if name: 133 | metadata[name] = meta.text 134 | except KeyError: 135 | # Metadata file doesn't exist 136 | pass 137 | 138 | elif ext == '.amf': 139 | tree = ET.parse(file_path) 140 | root = tree.getroot() 141 | 142 | for meta in root.findall('./metadata'): 143 | name = meta.get('type') 144 | if name: 145 | metadata[name] = meta.text 146 | except Exception as e: 147 | logger.error(f"Error extracting metadata: {str(e)}") 148 | 149 | return metadata 150 | 151 | @staticmethod 152 | def check_printer_compatibility(file_path: str, printer_type: str = "prusa") -> Tuple[bool, Optional[str]]: 153 | """ 154 | Check if a 3D model file is compatible with a specific printer type. 155 | 156 | Args: 157 | file_path: Path to the 3D model file 158 | printer_type: Type of printer ("prusa" or "bambu") 159 | 160 | Returns: 161 | Tuple of (is_compatible, error_message) 162 | """ 163 | # Check file extension 164 | ext = os.path.splitext(file_path)[1].lower() 165 | 166 | # Validate based on file format 167 | if ext == '.3mf': 168 | is_valid, error = FormatValidator.validate_3mf(file_path) 169 | if not is_valid: 170 | return False, error 171 | 172 | # Additional printer-specific checks 173 | if printer_type.lower() == "prusa": 174 | # Prusa-specific checks for 3MF 175 | # For now, just basic validation is sufficient 176 | return True, None 177 | 178 | elif printer_type.lower() == "bambu": 179 | # Bambu-specific checks for 3MF 180 | # For now, just basic validation is sufficient 181 | return True, None 182 | 183 | else: 184 | return False, f"Unknown printer type: {printer_type}" 185 | 186 | elif ext == '.amf': 187 | is_valid, error = FormatValidator.validate_amf(file_path) 188 | if not is_valid: 189 | return False, error 190 | 191 | # Additional printer-specific checks 192 | if printer_type.lower() == "prusa": 193 | # Prusa-specific checks for AMF 194 | # For now, just basic validation is sufficient 195 | return True, None 196 | 197 | elif printer_type.lower() == "bambu": 198 | # Bambu-specific checks for AMF 199 | # For now, just basic validation is sufficient 200 | return True, None 201 | 202 | else: 203 | return False, f"Unknown printer type: {printer_type}" 204 | 205 | else: 206 | return False, f"Unsupported file format for printer compatibility check: {ext}" 207 | ``` -------------------------------------------------------------------------------- /old/src/models/threestudio_generator.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | threestudio integration for 3D model generation from images. 3 | """ 4 | 5 | import os 6 | import subprocess 7 | import logging 8 | import json 9 | import tempfile 10 | from typing import Dict, Any, List, Optional 11 | from pathlib import Path 12 | 13 | logger = logging.getLogger(__name__) 14 | 15 | class ThreeStudioGenerator: 16 | """ 17 | Wrapper for threestudio for 3D model generation from images. 18 | """ 19 | 20 | def __init__(self, threestudio_path: str, output_dir: str = "output/models"): 21 | """ 22 | Initialize the threestudio generator. 23 | 24 | Args: 25 | threestudio_path: Path to threestudio installation 26 | output_dir: Directory to store output files 27 | """ 28 | self.threestudio_path = threestudio_path 29 | self.output_dir = output_dir 30 | 31 | # Create output directory if it doesn't exist 32 | os.makedirs(output_dir, exist_ok=True) 33 | 34 | # Validate threestudio installation 35 | self._validate_installation() 36 | 37 | def _validate_installation(self) -> None: 38 | """ 39 | Validate threestudio installation. 40 | 41 | Raises: 42 | FileNotFoundError: If threestudio installation is not found 43 | """ 44 | if not os.path.exists(self.threestudio_path): 45 | raise FileNotFoundError(f"threestudio not found at {self.threestudio_path}") 46 | 47 | # Check for required files 48 | required_files = ["launch.py", "README.md"] 49 | for file in required_files: 50 | if not os.path.exists(os.path.join(self.threestudio_path, file)): 51 | raise FileNotFoundError(f"Required file {file} not found in threestudio directory") 52 | 53 | def generate_model_from_image(self, image_path: str, method: str = "zero123", 54 | num_iterations: int = 5000, export_format: str = "obj", 55 | config_overrides: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 56 | """ 57 | Generate a 3D model from an image using threestudio. 58 | 59 | Args: 60 | image_path: Path to input image 61 | method: Method to use ("zero123", "sjc", "magic3d", etc.) 62 | num_iterations: Number of training iterations 63 | export_format: Format to export ("obj", "glb", "ply") 64 | config_overrides: Optional configuration overrides 65 | 66 | Returns: 67 | Dictionary containing paths to generated model files 68 | """ 69 | try: 70 | # Create a unique ID for this generation 71 | model_id = Path(image_path).stem 72 | 73 | # Create a temporary config file 74 | config_file = self._create_config_file(image_path, method, num_iterations, config_overrides) 75 | 76 | # Run threestudio 77 | output_dir = os.path.join(self.output_dir, model_id) 78 | os.makedirs(output_dir, exist_ok=True) 79 | 80 | cmd = [ 81 | "python", "launch.py", 82 | "--config", config_file, 83 | "--train", 84 | "--gpu", "0", 85 | "--output_dir", output_dir 86 | ] 87 | 88 | logger.info(f"Running threestudio with command: {' '.join(cmd)}") 89 | 90 | # Execute in threestudio directory 91 | process = subprocess.Popen( 92 | cmd, 93 | cwd=self.threestudio_path, 94 | stdout=subprocess.PIPE, 95 | stderr=subprocess.PIPE, 96 | text=True 97 | ) 98 | 99 | # Wait for process to complete 100 | stdout, stderr = process.communicate() 101 | 102 | if process.returncode != 0: 103 | logger.error(f"Error running threestudio: {stderr}") 104 | raise RuntimeError(f"threestudio failed with exit code {process.returncode}") 105 | 106 | # Export model 107 | exported_files = self._export_model(output_dir, export_format) 108 | 109 | return { 110 | "model_id": model_id, 111 | "output_dir": output_dir, 112 | "exported_files": exported_files, 113 | "preview_images": self._get_preview_images(output_dir) 114 | } 115 | except Exception as e: 116 | logger.error(f"Error generating 3D model with threestudio: {str(e)}") 117 | raise 118 | 119 | def _create_config_file(self, image_path: str, method: str, num_iterations: int, 120 | config_overrides: Optional[Dict[str, Any]] = None) -> str: 121 | """ 122 | Create a configuration file for threestudio. 123 | 124 | Args: 125 | image_path: Path to input image 126 | method: Method to use 127 | num_iterations: Number of training iterations 128 | config_overrides: Optional configuration overrides 129 | 130 | Returns: 131 | Path to the created configuration file 132 | """ 133 | # Base configuration 134 | config = { 135 | "method": method, 136 | "image_path": os.path.abspath(image_path), 137 | "num_iterations": num_iterations, 138 | "save_interval": 1000, 139 | "export_interval": 1000 140 | } 141 | 142 | # Apply overrides 143 | if config_overrides: 144 | config.update(config_overrides) 145 | 146 | # Write to temporary file 147 | fd, config_file = tempfile.mkstemp(suffix=".json") 148 | with os.fdopen(fd, 'w') as f: 149 | json.dump(config, f, indent=2) 150 | 151 | return config_file 152 | 153 | def _export_model(self, output_dir: str, export_format: str) -> List[str]: 154 | """ 155 | Export the model in the specified format. 156 | 157 | Args: 158 | output_dir: Directory containing the model 159 | export_format: Format to export 160 | 161 | Returns: 162 | List of paths to exported files 163 | """ 164 | # Find the latest checkpoint 165 | checkpoints_dir = os.path.join(output_dir, "checkpoints") 166 | if not os.path.exists(checkpoints_dir): 167 | raise FileNotFoundError(f"Checkpoints directory not found: {checkpoints_dir}") 168 | 169 | # Get the latest checkpoint 170 | checkpoints = sorted([f for f in os.listdir(checkpoints_dir) if f.endswith(".ckpt")]) 171 | if not checkpoints: 172 | raise FileNotFoundError("No checkpoints found") 173 | 174 | latest_checkpoint = os.path.join(checkpoints_dir, checkpoints[-1]) 175 | 176 | # Export command 177 | cmd = [ 178 | "python", "launch.py", 179 | "--config", os.path.join(output_dir, "config.yaml"), 180 | "--export", 181 | "--gpu", "0", 182 | "--checkpoint", latest_checkpoint, 183 | "--export_format", export_format 184 | ] 185 | 186 | logger.info(f"Exporting model with command: {' '.join(cmd)}") 187 | 188 | # Execute in threestudio directory 189 | process = subprocess.Popen( 190 | cmd, 191 | cwd=self.threestudio_path, 192 | stdout=subprocess.PIPE, 193 | stderr=subprocess.PIPE, 194 | text=True 195 | ) 196 | 197 | # Wait for process to complete 198 | stdout, stderr = process.communicate() 199 | 200 | if process.returncode != 0: 201 | logger.error(f"Error exporting model: {stderr}") 202 | raise RuntimeError(f"Model export failed with exit code {process.returncode}") 203 | 204 | # Find exported files 205 | exports_dir = os.path.join(output_dir, "exports") 206 | if not os.path.exists(exports_dir): 207 | raise FileNotFoundError(f"Exports directory not found: {exports_dir}") 208 | 209 | exported_files = [os.path.join(exports_dir, f) for f in os.listdir(exports_dir)] 210 | 211 | return exported_files 212 | 213 | def _get_preview_images(self, output_dir: str) -> List[str]: 214 | """ 215 | Get paths to preview images. 216 | 217 | Args: 218 | output_dir: Directory containing the model 219 | 220 | Returns: 221 | List of paths to preview images 222 | """ 223 | # Find preview images 224 | previews_dir = os.path.join(output_dir, "images") 225 | if not os.path.exists(previews_dir): 226 | return [] 227 | 228 | preview_images = [os.path.join(previews_dir, f) for f in os.listdir(previews_dir) 229 | if f.endswith(".png") or f.endswith(".jpg")] 230 | 231 | return sorted(preview_images) 232 | ``` -------------------------------------------------------------------------------- /test_image_approval_workflow.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Test script for the image approval workflow. 3 | """ 4 | 5 | import os 6 | import sys 7 | import json 8 | import logging 9 | import unittest 10 | from unittest.mock import patch, MagicMock 11 | from pathlib import Path 12 | 13 | # Add the src directory to the path 14 | sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) 15 | 16 | from src.workflow.image_approval import ImageApprovalManager 17 | from src.config import IMAGE_APPROVAL 18 | 19 | # Configure logging 20 | logging.basicConfig(level=logging.INFO) 21 | logger = logging.getLogger(__name__) 22 | 23 | class TestImageApprovalWorkflow(unittest.TestCase): 24 | """ 25 | Test cases for the image approval workflow. 26 | """ 27 | 28 | def setUp(self): 29 | """ 30 | Set up test environment. 31 | """ 32 | # Create test output directories 33 | self.test_output_dir = "output/test_image_approval" 34 | self.test_approved_dir = os.path.join(self.test_output_dir, "approved") 35 | os.makedirs(self.test_output_dir, exist_ok=True) 36 | os.makedirs(self.test_approved_dir, exist_ok=True) 37 | 38 | # Create the approval manager 39 | self.approval_manager = ImageApprovalManager( 40 | output_dir=self.test_output_dir, 41 | approved_dir=self.test_approved_dir, 42 | min_approved_images=3, 43 | auto_approve=False 44 | ) 45 | 46 | # Create test images 47 | self.test_images = [] 48 | for i in range(5): 49 | image_path = os.path.join(self.test_output_dir, f"test_image_{i}.png") 50 | with open(image_path, "w") as f: 51 | f.write(f"test image data {i}") 52 | self.test_images.append({ 53 | "id": f"image_{i}", 54 | "local_path": image_path, 55 | "view_index": i, 56 | "view_direction": f"view_{i}" 57 | }) 58 | 59 | def test_add_images(self): 60 | """ 61 | Test adding images to the approval manager. 62 | """ 63 | # Add images 64 | self.approval_manager.add_images(self.test_images) 65 | 66 | # Verify images were added 67 | self.assertEqual(len(self.approval_manager.images), 5) 68 | self.assertEqual(len(self.approval_manager.pending_images), 5) 69 | self.assertEqual(len(self.approval_manager.approved_images), 0) 70 | self.assertEqual(len(self.approval_manager.rejected_images), 0) 71 | 72 | def test_approve_image(self): 73 | """ 74 | Test approving an image. 75 | """ 76 | # Add images 77 | self.approval_manager.add_images(self.test_images) 78 | 79 | # Approve an image 80 | result = self.approval_manager.approve_image("image_0") 81 | 82 | # Verify the result 83 | self.assertTrue(result["success"]) 84 | self.assertEqual(result["image_id"], "image_0") 85 | self.assertEqual(result["status"], "approved") 86 | 87 | # Verify the image was moved to approved 88 | self.assertEqual(len(self.approval_manager.pending_images), 4) 89 | self.assertEqual(len(self.approval_manager.approved_images), 1) 90 | self.assertEqual(len(self.approval_manager.rejected_images), 0) 91 | 92 | # Verify the image was copied to the approved directory 93 | approved_path = os.path.join(self.test_approved_dir, "image_0.png") 94 | self.assertTrue(os.path.exists(approved_path)) 95 | 96 | def test_reject_image(self): 97 | """ 98 | Test rejecting an image. 99 | """ 100 | # Add images 101 | self.approval_manager.add_images(self.test_images) 102 | 103 | # Reject an image 104 | result = self.approval_manager.reject_image("image_1") 105 | 106 | # Verify the result 107 | self.assertTrue(result["success"]) 108 | self.assertEqual(result["image_id"], "image_1") 109 | self.assertEqual(result["status"], "rejected") 110 | 111 | # Verify the image was moved to rejected 112 | self.assertEqual(len(self.approval_manager.pending_images), 4) 113 | self.assertEqual(len(self.approval_manager.approved_images), 0) 114 | self.assertEqual(len(self.approval_manager.rejected_images), 1) 115 | 116 | def test_get_approval_status(self): 117 | """ 118 | Test getting the approval status. 119 | """ 120 | # Add images 121 | self.approval_manager.add_images(self.test_images) 122 | 123 | # Approve some images 124 | self.approval_manager.approve_image("image_0") 125 | self.approval_manager.approve_image("image_1") 126 | self.approval_manager.approve_image("image_2") 127 | 128 | # Reject an image 129 | self.approval_manager.reject_image("image_3") 130 | 131 | # Get the status 132 | status = self.approval_manager.get_status() 133 | 134 | # Verify the status 135 | self.assertEqual(status["total_images"], 5) 136 | self.assertEqual(status["pending_count"], 1) 137 | self.assertEqual(status["approved_count"], 3) 138 | self.assertEqual(status["rejected_count"], 1) 139 | self.assertTrue(status["has_minimum_approved"]) 140 | self.assertEqual(len(status["approved_images"]), 3) 141 | self.assertEqual(len(status["pending_images"]), 1) 142 | self.assertEqual(len(status["rejected_images"]), 1) 143 | 144 | def test_get_approved_images(self): 145 | """ 146 | Test getting approved images. 147 | """ 148 | # Add images 149 | self.approval_manager.add_images(self.test_images) 150 | 151 | # Approve some images 152 | self.approval_manager.approve_image("image_0") 153 | self.approval_manager.approve_image("image_2") 154 | self.approval_manager.approve_image("image_4") 155 | 156 | # Get approved images 157 | approved = self.approval_manager.get_approved_images() 158 | 159 | # Verify approved images 160 | self.assertEqual(len(approved), 3) 161 | self.assertEqual(approved[0]["id"], "image_0") 162 | self.assertEqual(approved[1]["id"], "image_2") 163 | self.assertEqual(approved[2]["id"], "image_4") 164 | 165 | def test_auto_approve(self): 166 | """ 167 | Test auto-approval mode. 168 | """ 169 | # Create an auto-approve manager 170 | auto_manager = ImageApprovalManager( 171 | output_dir=self.test_output_dir, 172 | approved_dir=self.test_approved_dir, 173 | min_approved_images=3, 174 | auto_approve=True 175 | ) 176 | 177 | # Add images 178 | auto_manager.add_images(self.test_images) 179 | 180 | # Verify all images were auto-approved 181 | self.assertEqual(len(auto_manager.pending_images), 0) 182 | self.assertEqual(len(auto_manager.approved_images), 5) 183 | self.assertEqual(len(auto_manager.rejected_images), 0) 184 | 185 | def test_has_minimum_approved(self): 186 | """ 187 | Test checking if minimum approved images are met. 188 | """ 189 | # Add images 190 | self.approval_manager.add_images(self.test_images) 191 | 192 | # Initially should not have minimum 193 | self.assertFalse(self.approval_manager.has_minimum_approved()) 194 | 195 | # Approve two images 196 | self.approval_manager.approve_image("image_0") 197 | self.approval_manager.approve_image("image_1") 198 | 199 | # Still should not have minimum 200 | self.assertFalse(self.approval_manager.has_minimum_approved()) 201 | 202 | # Approve one more image 203 | self.approval_manager.approve_image("image_2") 204 | 205 | # Now should have minimum 206 | self.assertTrue(self.approval_manager.has_minimum_approved()) 207 | 208 | def test_save_and_load_state(self): 209 | """ 210 | Test saving and loading the approval state. 211 | """ 212 | # Add images 213 | self.approval_manager.add_images(self.test_images) 214 | 215 | # Approve and reject some images 216 | self.approval_manager.approve_image("image_0") 217 | self.approval_manager.approve_image("image_2") 218 | self.approval_manager.reject_image("image_3") 219 | 220 | # Save the state 221 | state_file = os.path.join(self.test_output_dir, "approval_state.json") 222 | self.approval_manager.save_state(state_file) 223 | 224 | # Create a new manager 225 | new_manager = ImageApprovalManager( 226 | output_dir=self.test_output_dir, 227 | approved_dir=self.test_approved_dir, 228 | min_approved_images=3, 229 | auto_approve=False 230 | ) 231 | 232 | # Load the state 233 | new_manager.load_state(state_file) 234 | 235 | # Verify the state was loaded correctly 236 | self.assertEqual(len(new_manager.images), 5) 237 | self.assertEqual(len(new_manager.pending_images), 2) 238 | self.assertEqual(len(new_manager.approved_images), 2) 239 | self.assertEqual(len(new_manager.rejected_images), 1) 240 | 241 | def tearDown(self): 242 | """ 243 | Clean up after tests. 244 | """ 245 | # Clean up test output directory 246 | import shutil 247 | if os.path.exists(self.test_output_dir): 248 | shutil.rmtree(self.test_output_dir) 249 | 250 | if __name__ == "__main__": 251 | unittest.main() 252 | ``` -------------------------------------------------------------------------------- /old/src/workflow/image_to_model_pipeline.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Workflow orchestration for the image-to-model pipeline. 3 | """ 4 | 5 | import os 6 | import logging 7 | import uuid 8 | from typing import Dict, Any, List, Optional, Tuple 9 | from pathlib import Path 10 | 11 | logger = logging.getLogger(__name__) 12 | 13 | class ImageToModelPipeline: 14 | """ 15 | Orchestrates the workflow from text prompt to 3D model: 16 | 1. Generate image with Venice.ai 17 | 2. Segment object with SAM2 18 | 3. Create 3D model with threestudio 19 | 4. Convert to OpenSCAD for parametric editing 20 | """ 21 | 22 | def __init__(self, 23 | venice_generator, 24 | sam_segmenter, 25 | threestudio_generator, 26 | openscad_wrapper, 27 | output_dir: str = "output/pipeline"): 28 | """ 29 | Initialize the pipeline. 30 | 31 | Args: 32 | venice_generator: Instance of VeniceImageGenerator 33 | sam_segmenter: Instance of SAMSegmenter 34 | threestudio_generator: Instance of ThreeStudioGenerator 35 | openscad_wrapper: Instance of OpenSCADWrapper 36 | output_dir: Directory to store output files 37 | """ 38 | self.venice_generator = venice_generator 39 | self.sam_segmenter = sam_segmenter 40 | self.threestudio_generator = threestudio_generator 41 | self.openscad_wrapper = openscad_wrapper 42 | self.output_dir = output_dir 43 | 44 | # Create output directories 45 | os.makedirs(os.path.join(output_dir, "images"), exist_ok=True) 46 | os.makedirs(os.path.join(output_dir, "masks"), exist_ok=True) 47 | os.makedirs(os.path.join(output_dir, "models"), exist_ok=True) 48 | os.makedirs(os.path.join(output_dir, "scad"), exist_ok=True) 49 | 50 | def generate_model_from_text(self, prompt: str, 51 | venice_params: Optional[Dict[str, Any]] = None, 52 | sam_params: Optional[Dict[str, Any]] = None, 53 | threestudio_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 54 | """ 55 | Generate a 3D model from a text prompt. 56 | 57 | Args: 58 | prompt: Text description for image generation 59 | venice_params: Optional parameters for Venice.ai 60 | sam_params: Optional parameters for SAM2 61 | threestudio_params: Optional parameters for threestudio 62 | 63 | Returns: 64 | Dictionary containing paths to generated files and metadata 65 | """ 66 | try: 67 | # Generate a unique ID for this pipeline run 68 | pipeline_id = str(uuid.uuid4()) 69 | logger.info(f"Starting pipeline {pipeline_id} for prompt: {prompt}") 70 | 71 | # Step 1: Generate image with Venice.ai 72 | image_path = os.path.join(self.output_dir, "images", f"{pipeline_id}.png") 73 | venice_result = self._generate_image(prompt, image_path, venice_params) 74 | 75 | # Step 2: Segment object with SAM2 76 | masks_dir = os.path.join(self.output_dir, "masks", pipeline_id) 77 | sam_result = self._segment_image(image_path, masks_dir, sam_params) 78 | 79 | # Get the best mask (highest score or first mask if no scores) 80 | if "scores" in sam_result and sam_result["scores"]: 81 | best_mask_idx = sam_result["scores"].index(max(sam_result["scores"])) 82 | best_mask_path = sam_result["mask_paths"][best_mask_idx] 83 | else: 84 | # If no scores available, use the first mask 85 | best_mask_path = sam_result["mask_paths"][0] if sam_result.get("mask_paths") else None 86 | 87 | if not best_mask_path: 88 | raise ValueError("No valid mask generated from segmentation") 89 | 90 | # Step 3: Create 3D model with threestudio 91 | threestudio_result = self._generate_3d_model(best_mask_path, threestudio_params) 92 | 93 | # Step 4: Convert to OpenSCAD for parametric editing 94 | scad_result = self._convert_to_openscad(threestudio_result["exported_files"][0], pipeline_id) 95 | 96 | # Compile results 97 | result = { 98 | "pipeline_id": pipeline_id, 99 | "prompt": prompt, 100 | "image": venice_result, 101 | "segmentation": sam_result, 102 | "model_3d": threestudio_result, 103 | "openscad": scad_result 104 | } 105 | 106 | logger.info(f"Pipeline {pipeline_id} completed successfully") 107 | return result 108 | except Exception as e: 109 | logger.error(f"Error in pipeline: {str(e)}") 110 | raise 111 | 112 | def _generate_image(self, prompt: str, output_path: str, 113 | params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 114 | """ 115 | Generate image with Venice.ai. 116 | 117 | Args: 118 | prompt: Text description for image generation 119 | output_path: Path to save the generated image 120 | params: Optional parameters for Venice.ai 121 | 122 | Returns: 123 | Dictionary containing image data and metadata 124 | """ 125 | logger.info(f"Generating image for prompt: {prompt}") 126 | 127 | # Default parameters 128 | default_params = { 129 | "model": "fluently-xl", # Default to fastest model 130 | "width": 1024, 131 | "height": 1024 132 | } 133 | 134 | # Merge with provided parameters 135 | if params: 136 | default_params.update(params) 137 | 138 | # Generate image 139 | result = self.venice_generator.generate_image( 140 | prompt=prompt, 141 | output_path=output_path, 142 | **default_params 143 | ) 144 | 145 | logger.info(f"Image generated: {output_path}") 146 | return result 147 | 148 | def _segment_image(self, image_path: str, output_dir: str, 149 | params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 150 | """ 151 | Segment object with SAM2. 152 | 153 | Args: 154 | image_path: Path to input image 155 | output_dir: Directory to save segmentation results 156 | params: Optional parameters for SAM2 157 | 158 | Returns: 159 | Dictionary containing segmentation masks and metadata 160 | """ 161 | logger.info(f"Segmenting image: {image_path}") 162 | 163 | # Segment image with SAM2 164 | # Check if points are provided in params 165 | points = params.get("points") if params else None 166 | 167 | if points: 168 | result = self.sam_segmenter.segment_image( 169 | image_path=image_path, 170 | points=points, 171 | output_dir=output_dir 172 | ) 173 | else: 174 | # Use automatic point generation 175 | result = self.sam_segmenter.segment_with_auto_points( 176 | image_path=image_path, 177 | output_dir=output_dir 178 | ) 179 | 180 | logger.info(f"Image segmented, {result.get('num_masks', 0)} masks generated") 181 | return result 182 | 183 | def _generate_3d_model(self, image_path: str, 184 | params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 185 | """ 186 | Generate 3D model with threestudio. 187 | 188 | Args: 189 | image_path: Path to input image 190 | params: Optional parameters for threestudio 191 | 192 | Returns: 193 | Dictionary containing paths to generated model files 194 | """ 195 | logger.info(f"Generating 3D model from image: {image_path}") 196 | 197 | # Default parameters 198 | default_params = { 199 | "method": "zero123", 200 | "num_iterations": 5000, 201 | "export_format": "obj" 202 | } 203 | 204 | # Merge with provided parameters 205 | if params: 206 | default_params.update(params) 207 | 208 | # Generate 3D model 209 | result = self.threestudio_generator.generate_model_from_image( 210 | image_path=image_path, 211 | **default_params 212 | ) 213 | 214 | logger.info(f"3D model generated: {result['exported_files']}") 215 | return result 216 | 217 | def _convert_to_openscad(self, model_path: str, model_id: str) -> Dict[str, Any]: 218 | """ 219 | Convert 3D model to OpenSCAD format. 220 | 221 | Args: 222 | model_path: Path to input model 223 | model_id: Unique identifier for the model 224 | 225 | Returns: 226 | Dictionary containing paths to generated files 227 | """ 228 | logger.info(f"Converting model to OpenSCAD: {model_path}") 229 | 230 | # Generate OpenSCAD code for importing the model 231 | scad_code = f"""// Generated OpenSCAD code for model {model_id} 232 | // Imported from {os.path.basename(model_path)} 233 | 234 | // Parameters 235 | scale_factor = 1.0; 236 | position_x = 0; 237 | position_y = 0; 238 | position_z = 0; 239 | rotation_x = 0; 240 | rotation_y = 0; 241 | rotation_z = 0; 242 | 243 | // Import and transform the model 244 | translate([position_x, position_y, position_z]) 245 | rotate([rotation_x, rotation_y, rotation_z]) 246 | scale(scale_factor) 247 | import("{model_path}"); 248 | """ 249 | 250 | # Save SCAD code to file 251 | scad_file = self.openscad_wrapper.generate_scad(scad_code, model_id) 252 | 253 | # Generate previews 254 | previews = self.openscad_wrapper.generate_multi_angle_previews(scad_file) 255 | 256 | return { 257 | "scad_file": scad_file, 258 | "previews": previews, 259 | "model_path": model_path 260 | } 261 | ``` -------------------------------------------------------------------------------- /test_remote_cuda_mvs.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Test script for remote CUDA Multi-View Stereo processing. 3 | """ 4 | 5 | import os 6 | import sys 7 | import logging 8 | import unittest 9 | from unittest.mock import patch, MagicMock 10 | from pathlib import Path 11 | 12 | # Add the src directory to the path 13 | sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) 14 | 15 | from src.remote.cuda_mvs_client import CUDAMVSClient 16 | from src.remote.connection_manager import CUDAMVSConnectionManager 17 | from src.config import REMOTE_CUDA_MVS 18 | 19 | # Configure logging 20 | logging.basicConfig(level=logging.INFO) 21 | logger = logging.getLogger(__name__) 22 | 23 | class TestRemoteCUDAMVS(unittest.TestCase): 24 | """ 25 | Test cases for remote CUDA Multi-View Stereo processing. 26 | """ 27 | 28 | def setUp(self): 29 | """ 30 | Set up test environment. 31 | """ 32 | # Create test output directories 33 | self.test_output_dir = "output/test_remote_cuda_mvs" 34 | os.makedirs(self.test_output_dir, exist_ok=True) 35 | 36 | # Mock API key 37 | self.api_key = "test_api_key" 38 | 39 | # Create the client with the mock API key 40 | self.client = CUDAMVSClient( 41 | api_key=self.api_key, 42 | output_dir=self.test_output_dir 43 | ) 44 | 45 | # Create the connection manager with the mock API key 46 | self.connection_manager = CUDAMVSConnectionManager( 47 | api_key=self.api_key, 48 | discovery_port=REMOTE_CUDA_MVS["DISCOVERY_PORT"], 49 | use_lan_discovery=True 50 | ) 51 | 52 | @patch('src.remote.cuda_mvs_client.requests.post') 53 | def test_upload_images(self, mock_post): 54 | """ 55 | Test uploading images to a remote CUDA MVS server. 56 | """ 57 | # Mock response 58 | mock_response = MagicMock() 59 | mock_response.status_code = 200 60 | mock_response.json.return_value = { 61 | "job_id": "test_job_123", 62 | "status": "uploaded", 63 | "message": "Images uploaded successfully" 64 | } 65 | mock_post.return_value = mock_response 66 | 67 | # Test parameters 68 | server_url = "http://test-server:8765" 69 | image_paths = [ 70 | os.path.join(self.test_output_dir, "test_image_1.png"), 71 | os.path.join(self.test_output_dir, "test_image_2.png") 72 | ] 73 | 74 | # Create test images 75 | for path in image_paths: 76 | with open(path, "w") as f: 77 | f.write("test image data") 78 | 79 | # Call the method 80 | result = self.client.upload_images(server_url, image_paths) 81 | 82 | # Verify the result 83 | self.assertIsNotNone(result) 84 | self.assertEqual(result["job_id"], "test_job_123") 85 | self.assertEqual(result["status"], "uploaded") 86 | 87 | # Verify the API call 88 | mock_post.assert_called_once() 89 | args, kwargs = mock_post.call_args 90 | self.assertTrue(server_url in args[0]) 91 | self.assertEqual(kwargs["headers"]["Authorization"], f"Bearer {self.api_key}") 92 | 93 | @patch('src.remote.cuda_mvs_client.requests.post') 94 | def test_process_job(self, mock_post): 95 | """ 96 | Test processing a job on a remote CUDA MVS server. 97 | """ 98 | # Mock response 99 | mock_response = MagicMock() 100 | mock_response.status_code = 200 101 | mock_response.json.return_value = { 102 | "job_id": "test_job_123", 103 | "status": "processing", 104 | "message": "Job started processing" 105 | } 106 | mock_post.return_value = mock_response 107 | 108 | # Test parameters 109 | server_url = "http://test-server:8765" 110 | job_id = "test_job_123" 111 | params = { 112 | "quality": "normal", 113 | "output_format": "obj" 114 | } 115 | 116 | # Call the method 117 | result = self.client.process_job(server_url, job_id, params) 118 | 119 | # Verify the result 120 | self.assertIsNotNone(result) 121 | self.assertEqual(result["job_id"], "test_job_123") 122 | self.assertEqual(result["status"], "processing") 123 | 124 | # Verify the API call 125 | mock_post.assert_called_once() 126 | args, kwargs = mock_post.call_args 127 | self.assertTrue(server_url in args[0]) 128 | self.assertEqual(kwargs["headers"]["Authorization"], f"Bearer {self.api_key}") 129 | self.assertEqual(kwargs["json"]["job_id"], job_id) 130 | self.assertEqual(kwargs["json"]["params"]["quality"], "normal") 131 | 132 | @patch('src.remote.cuda_mvs_client.requests.get') 133 | def test_get_job_status(self, mock_get): 134 | """ 135 | Test getting the status of a job on a remote CUDA MVS server. 136 | """ 137 | # Mock response 138 | mock_response = MagicMock() 139 | mock_response.status_code = 200 140 | mock_response.json.return_value = { 141 | "job_id": "test_job_123", 142 | "status": "completed", 143 | "progress": 100, 144 | "message": "Job completed successfully" 145 | } 146 | mock_get.return_value = mock_response 147 | 148 | # Test parameters 149 | server_url = "http://test-server:8765" 150 | job_id = "test_job_123" 151 | 152 | # Call the method 153 | result = self.client.get_job_status(server_url, job_id) 154 | 155 | # Verify the result 156 | self.assertIsNotNone(result) 157 | self.assertEqual(result["job_id"], "test_job_123") 158 | self.assertEqual(result["status"], "completed") 159 | self.assertEqual(result["progress"], 100) 160 | 161 | # Verify the API call 162 | mock_get.assert_called_once() 163 | args, kwargs = mock_get.call_args 164 | self.assertTrue(server_url in args[0]) 165 | self.assertEqual(kwargs["headers"]["Authorization"], f"Bearer {self.api_key}") 166 | 167 | @patch('src.remote.cuda_mvs_client.requests.get') 168 | def test_download_model(self, mock_get): 169 | """ 170 | Test downloading a model from a remote CUDA MVS server. 171 | """ 172 | # Mock response 173 | mock_response = MagicMock() 174 | mock_response.status_code = 200 175 | mock_response.content = b"test model data" 176 | mock_get.return_value = mock_response 177 | 178 | # Test parameters 179 | server_url = "http://test-server:8765" 180 | job_id = "test_job_123" 181 | output_dir = os.path.join(self.test_output_dir, "models") 182 | os.makedirs(output_dir, exist_ok=True) 183 | 184 | # Call the method 185 | result = self.client.download_model(server_url, job_id, output_dir) 186 | 187 | # Verify the result 188 | self.assertIsNotNone(result) 189 | self.assertTrue("model_path" in result) 190 | self.assertTrue(os.path.exists(result["model_path"])) 191 | 192 | # Verify the API call 193 | mock_get.assert_called() 194 | args, kwargs = mock_get.call_args_list[0] 195 | self.assertTrue(server_url in args[0]) 196 | self.assertEqual(kwargs["headers"]["Authorization"], f"Bearer {self.api_key}") 197 | 198 | @patch('src.remote.connection_manager.zeroconf.Zeroconf') 199 | def test_discover_servers(self, mock_zeroconf): 200 | """ 201 | Test discovering CUDA MVS servers on the network. 202 | """ 203 | # Mock Zeroconf 204 | mock_zeroconf_instance = MagicMock() 205 | mock_zeroconf.return_value = mock_zeroconf_instance 206 | 207 | # Mock ServiceBrowser 208 | with patch('src.remote.connection_manager.ServiceBrowser') as mock_browser: 209 | # Set up the connection manager to discover servers 210 | self.connection_manager.discover_servers() 211 | 212 | # Verify Zeroconf was initialized 213 | mock_zeroconf.assert_called_once() 214 | 215 | # Verify ServiceBrowser was initialized 216 | mock_browser.assert_called_once() 217 | args, kwargs = mock_browser.call_args 218 | self.assertEqual(args[0], mock_zeroconf_instance) 219 | self.assertEqual(args[1], "_cudamvs._tcp.local.") 220 | 221 | @patch('src.remote.connection_manager.CUDAMVSClient') 222 | def test_upload_images_with_connection_manager(self, mock_client_class): 223 | """ 224 | Test uploading images using the connection manager. 225 | """ 226 | # Mock client 227 | mock_client = MagicMock() 228 | mock_client_class.return_value = mock_client 229 | 230 | # Mock upload_images method 231 | mock_client.upload_images.return_value = { 232 | "job_id": "test_job_123", 233 | "status": "uploaded", 234 | "message": "Images uploaded successfully" 235 | } 236 | 237 | # Add a mock server 238 | self.connection_manager.servers = { 239 | "test_server": { 240 | "id": "test_server", 241 | "name": "Test Server", 242 | "url": "http://test-server:8765", 243 | "status": "online" 244 | } 245 | } 246 | 247 | # Test parameters 248 | server_id = "test_server" 249 | image_paths = [ 250 | os.path.join(self.test_output_dir, "test_image_1.png"), 251 | os.path.join(self.test_output_dir, "test_image_2.png") 252 | ] 253 | 254 | # Create test images 255 | for path in image_paths: 256 | with open(path, "w") as f: 257 | f.write("test image data") 258 | 259 | # Call the method 260 | result = self.connection_manager.upload_images(server_id, image_paths) 261 | 262 | # Verify the result 263 | self.assertIsNotNone(result) 264 | self.assertEqual(result["job_id"], "test_job_123") 265 | self.assertEqual(result["status"], "uploaded") 266 | 267 | # Verify the client method was called 268 | mock_client.upload_images.assert_called_once() 269 | args, kwargs = mock_client.upload_images.call_args 270 | self.assertEqual(args[0], "http://test-server:8765") 271 | self.assertEqual(args[1], image_paths) 272 | 273 | def tearDown(self): 274 | """ 275 | Clean up after tests. 276 | """ 277 | # Clean up test output directory 278 | import shutil 279 | if os.path.exists(self.test_output_dir): 280 | shutil.rmtree(self.test_output_dir) 281 | 282 | if __name__ == "__main__": 283 | unittest.main() 284 | ``` -------------------------------------------------------------------------------- /src/utils/cad_exporter.py: -------------------------------------------------------------------------------- ```python 1 | import os 2 | import logging 3 | import subprocess 4 | from typing import Dict, Any, Optional, Tuple, List 5 | 6 | logger = logging.getLogger(__name__) 7 | 8 | class CADExporter: 9 | """ 10 | Exports OpenSCAD models to various CAD formats that preserve parametric properties. 11 | """ 12 | 13 | def __init__(self, openscad_path: str = "openscad"): 14 | """ 15 | Initialize the CAD exporter. 16 | 17 | Args: 18 | openscad_path: Path to the OpenSCAD executable 19 | """ 20 | self.openscad_path = openscad_path 21 | 22 | # Supported export formats 23 | self.supported_formats = { 24 | "csg": "OpenSCAD CSG format (preserves all parametric properties)", 25 | "amf": "Additive Manufacturing File Format (preserves some metadata)", 26 | "3mf": "3D Manufacturing Format (modern replacement for STL with metadata)", 27 | "scad": "OpenSCAD source code (fully parametric)", 28 | "dxf": "Drawing Exchange Format (for 2D designs)", 29 | "svg": "Scalable Vector Graphics (for 2D designs)" 30 | } 31 | 32 | def export_model(self, scad_file: str, output_format: str = "csg", 33 | parameters: Optional[Dict[str, Any]] = None, 34 | metadata: Optional[Dict[str, Any]] = None) -> Tuple[bool, str, Optional[str]]: 35 | """ 36 | Export an OpenSCAD model to the specified format. 37 | 38 | Args: 39 | scad_file: Path to the SCAD file 40 | output_format: Format to export to (csg, amf, 3mf, etc.) 41 | parameters: Optional parameters to override in the SCAD file 42 | metadata: Optional metadata to include in the export 43 | 44 | Returns: 45 | Tuple of (success, output_file_path, error_message) 46 | """ 47 | if not os.path.exists(scad_file): 48 | return False, "", f"SCAD file not found: {scad_file}" 49 | 50 | # Create output file path 51 | output_dir = os.path.dirname(scad_file) 52 | model_id = os.path.basename(scad_file).split('.')[0] 53 | 54 | # Special case for SCAD format - just copy the file with parameters embedded 55 | if output_format.lower() == "scad" and parameters: 56 | return self._export_parametric_scad(scad_file, parameters, metadata) 57 | 58 | # For native OpenSCAD formats 59 | output_file = os.path.join(output_dir, f"{model_id}.{output_format.lower()}") 60 | 61 | # Build command 62 | cmd = [self.openscad_path, "-o", output_file] 63 | 64 | # Add parameters if provided 65 | if parameters: 66 | for key, value in parameters.items(): 67 | cmd.extend(["-D", f"{key}={value}"]) 68 | 69 | # Add input file 70 | cmd.append(scad_file) 71 | 72 | try: 73 | # Run OpenSCAD 74 | result = subprocess.run(cmd, check=True, capture_output=True, text=True) 75 | 76 | # Check if file was created 77 | if os.path.exists(output_file) and os.path.getsize(output_file) > 0: 78 | logger.info(f"Exported model to {output_format}: {output_file}") 79 | 80 | # Add metadata if supported and provided 81 | if metadata and output_format.lower() in ["amf", "3mf"]: 82 | self._add_metadata_to_file(output_file, metadata, output_format) 83 | 84 | return True, output_file, None 85 | else: 86 | error_msg = f"Failed to export model to {output_format}" 87 | logger.error(error_msg) 88 | logger.error(f"OpenSCAD output: {result.stdout}") 89 | logger.error(f"OpenSCAD error: {result.stderr}") 90 | return False, "", error_msg 91 | except subprocess.CalledProcessError as e: 92 | error_msg = f"Error exporting model to {output_format}: {e.stderr}" 93 | logger.error(error_msg) 94 | return False, "", error_msg 95 | except Exception as e: 96 | error_msg = f"Error exporting model to {output_format}: {str(e)}" 97 | logger.error(error_msg) 98 | return False, "", error_msg 99 | 100 | def _export_parametric_scad(self, scad_file: str, parameters: Dict[str, Any], 101 | metadata: Optional[Dict[str, Any]] = None) -> Tuple[bool, str, Optional[str]]: 102 | """ 103 | Create a new SCAD file with parameters embedded as variables. 104 | 105 | Args: 106 | scad_file: Path to the original SCAD file 107 | parameters: Parameters to embed in the SCAD file 108 | metadata: Optional metadata to include as comments 109 | 110 | Returns: 111 | Tuple of (success, output_file_path, error_message) 112 | """ 113 | try: 114 | # Read the original SCAD file 115 | with open(scad_file, 'r') as f: 116 | content = f.read() 117 | 118 | # Create output file path 119 | output_dir = os.path.dirname(scad_file) 120 | model_id = os.path.basename(scad_file).split('.')[0] 121 | output_file = os.path.join(output_dir, f"{model_id}_parametric.scad") 122 | 123 | # Create parameter declarations 124 | param_declarations = [] 125 | for key, value in parameters.items(): 126 | if isinstance(value, str): 127 | param_declarations.append(f'{key} = "{value}";') 128 | else: 129 | param_declarations.append(f'{key} = {value};') 130 | 131 | # Create metadata comments 132 | metadata_comments = [] 133 | if metadata: 134 | metadata_comments.append("// Metadata:") 135 | for key, value in metadata.items(): 136 | metadata_comments.append(f"// {key}: {value}") 137 | 138 | # Combine everything 139 | new_content = "// Parametric model generated by OpenSCAD MCP Server\n" 140 | new_content += "\n".join(metadata_comments) + "\n\n" if metadata_comments else "\n" 141 | new_content += "// Parameters:\n" 142 | new_content += "\n".join(param_declarations) + "\n\n" 143 | new_content += content 144 | 145 | # Write to the new file 146 | with open(output_file, 'w') as f: 147 | f.write(new_content) 148 | 149 | logger.info(f"Exported parametric SCAD file: {output_file}") 150 | return True, output_file, None 151 | except Exception as e: 152 | error_msg = f"Error creating parametric SCAD file: {str(e)}" 153 | logger.error(error_msg) 154 | return False, "", error_msg 155 | 156 | def _add_metadata_to_file(self, file_path: str, metadata: Dict[str, Any], format_type: str) -> None: 157 | """ 158 | Add metadata to supported file formats. 159 | 160 | Args: 161 | file_path: Path to the file 162 | metadata: Metadata to add 163 | format_type: File format 164 | """ 165 | if format_type.lower() == "amf": 166 | self._add_metadata_to_amf(file_path, metadata) 167 | elif format_type.lower() == "3mf": 168 | self._add_metadata_to_3mf(file_path, metadata) 169 | 170 | def _add_metadata_to_amf(self, file_path: str, metadata: Dict[str, Any]) -> None: 171 | """Add metadata to AMF file.""" 172 | try: 173 | import xml.etree.ElementTree as ET 174 | 175 | # Parse the AMF file 176 | tree = ET.parse(file_path) 177 | root = tree.getroot() 178 | 179 | # Find or create metadata element 180 | metadata_elem = root.find("metadata") 181 | if metadata_elem is None: 182 | metadata_elem = ET.SubElement(root, "metadata") 183 | 184 | # Add metadata 185 | for key, value in metadata.items(): 186 | meta = ET.SubElement(metadata_elem, "meta", name=key) 187 | meta.text = str(value) 188 | 189 | # Write back to file 190 | tree.write(file_path) 191 | logger.info(f"Added metadata to AMF file: {file_path}") 192 | except Exception as e: 193 | logger.error(f"Error adding metadata to AMF file: {str(e)}") 194 | 195 | def _add_metadata_to_3mf(self, file_path: str, metadata: Dict[str, Any]) -> None: 196 | """Add metadata to 3MF file.""" 197 | try: 198 | import zipfile 199 | import xml.etree.ElementTree as ET 200 | 201 | # 3MF files are ZIP archives 202 | with zipfile.ZipFile(file_path, 'a') as z: 203 | # Check if metadata file exists 204 | metadata_path = "Metadata/model_metadata.xml" 205 | try: 206 | z.getinfo(metadata_path) 207 | # Extract existing metadata 208 | with z.open(metadata_path) as f: 209 | tree = ET.parse(f) 210 | root = tree.getroot() 211 | except KeyError: 212 | # Create new metadata file 213 | root = ET.Element("metadata") 214 | tree = ET.ElementTree(root) 215 | 216 | # Add metadata 217 | for key, value in metadata.items(): 218 | meta = ET.SubElement(root, "meta", name=key) 219 | meta.text = str(value) 220 | 221 | # Write metadata to a temporary file 222 | temp_path = file_path + ".metadata.tmp" 223 | tree.write(temp_path) 224 | 225 | # Add to ZIP 226 | z.write(temp_path, metadata_path) 227 | 228 | # Remove temporary file 229 | os.remove(temp_path) 230 | 231 | logger.info(f"Added metadata to 3MF file: {file_path}") 232 | except Exception as e: 233 | logger.error(f"Error adding metadata to 3MF file: {str(e)}") 234 | 235 | def get_supported_formats(self) -> List[str]: 236 | """Get list of supported export formats.""" 237 | return list(self.supported_formats.keys()) 238 | 239 | def get_format_description(self, format_name: str) -> str: 240 | """Get description of a format.""" 241 | return self.supported_formats.get(format_name.lower(), "Unknown format") 242 | ``` -------------------------------------------------------------------------------- /src/ai/ai_service.py: -------------------------------------------------------------------------------- ```python 1 | import os 2 | import logging 3 | import re 4 | from typing import Dict, Any, Optional 5 | 6 | logger = logging.getLogger(__name__) 7 | 8 | class AIService: 9 | """ 10 | Service for AI-driven OpenSCAD code generation. 11 | Translates natural language descriptions into OpenSCAD code. 12 | """ 13 | 14 | def __init__(self, templates_dir: str, model_config: Optional[Dict[str, Any]] = None): 15 | """ 16 | Initialize the AI service. 17 | 18 | Args: 19 | templates_dir: Directory containing OpenSCAD templates 20 | model_config: Optional configuration for the AI model 21 | """ 22 | self.templates_dir = templates_dir 23 | self.model_config = model_config or {} 24 | 25 | # Load templates 26 | self.templates = self._load_templates() 27 | 28 | logger.info(f"Initialized AI service with {len(self.templates)} templates") 29 | 30 | def generate_openscad_code(self, context: Dict[str, Any]) -> str: 31 | """ 32 | Generate OpenSCAD code from natural language description. 33 | 34 | Args: 35 | context: Dictionary containing: 36 | - description: Natural language description 37 | - parameters: Dictionary of parameters 38 | - templates_dir: Directory containing templates 39 | 40 | Returns: 41 | Generated OpenSCAD code 42 | """ 43 | description = context.get("description", "") 44 | parameters = context.get("parameters", {}) 45 | 46 | logger.info(f"Generating OpenSCAD code for: {description}") 47 | 48 | # Parse the description to identify key components 49 | components = self._parse_description(description) 50 | 51 | # Generate code based on identified components 52 | code = self._generate_code_from_components(components, parameters) 53 | 54 | return code 55 | 56 | def _load_templates(self) -> Dict[str, str]: 57 | """Load OpenSCAD code templates from the templates directory.""" 58 | templates = {} 59 | 60 | # Check if templates directory exists 61 | if not os.path.exists(self.templates_dir): 62 | logger.warning(f"Templates directory not found: {self.templates_dir}") 63 | return templates 64 | 65 | # Load all .scad files in the templates directory 66 | for filename in os.listdir(self.templates_dir): 67 | if filename.endswith(".scad"): 68 | template_name = os.path.splitext(filename)[0] 69 | template_path = os.path.join(self.templates_dir, filename) 70 | 71 | try: 72 | with open(template_path, 'r') as f: 73 | templates[template_name] = f.read() 74 | except Exception as e: 75 | logger.error(f"Error loading template {template_path}: {e}") 76 | 77 | return templates 78 | 79 | def _parse_description(self, description: str) -> Dict[str, Any]: 80 | """ 81 | Parse a natural language description to identify key components. 82 | 83 | Args: 84 | description: Natural language description of the model 85 | 86 | Returns: 87 | Dictionary of identified components 88 | """ 89 | components = { 90 | "primary_shape": None, 91 | "operations": [], 92 | "features": [], 93 | "modifiers": [] 94 | } 95 | 96 | # Identify primary shape 97 | shape_patterns = { 98 | "cube": r'\b(cube|box|rectangular|block)\b', 99 | "sphere": r'\b(sphere|ball|round|circular)\b', 100 | "cylinder": r'\b(cylinder|tube|pipe|rod)\b', 101 | "cone": r'\b(cone|pyramid|tapered)\b', 102 | "torus": r'\b(torus|donut|ring)\b' 103 | } 104 | 105 | for shape, pattern in shape_patterns.items(): 106 | if re.search(pattern, description, re.IGNORECASE): 107 | components["primary_shape"] = shape 108 | break 109 | 110 | # Identify operations 111 | operation_patterns = { 112 | "union": r'\b(combine|join|merge|add)\b', 113 | "difference": r'\b(subtract|remove|cut|hole|hollow)\b', 114 | "intersection": r'\b(intersect|common|shared)\b' 115 | } 116 | 117 | for operation, pattern in operation_patterns.items(): 118 | if re.search(pattern, description, re.IGNORECASE): 119 | components["operations"].append(operation) 120 | 121 | # Identify features 122 | feature_patterns = { 123 | "rounded_corners": r'\b(rounded corners|fillets|chamfer)\b', 124 | "holes": r'\b(holes|perforations|openings)\b', 125 | "text": r'\b(text|label|inscription)\b', 126 | "pattern": r'\b(pattern|array|grid|repeat)\b' 127 | } 128 | 129 | for feature, pattern in feature_patterns.items(): 130 | if re.search(pattern, description, re.IGNORECASE): 131 | components["features"].append(feature) 132 | 133 | # Identify modifiers 134 | modifier_patterns = { 135 | "scale": r'\b(scale|resize|proportion)\b', 136 | "rotate": r'\b(rotate|turn|spin|angle)\b', 137 | "translate": r'\b(move|shift|position|place)\b', 138 | "mirror": r'\b(mirror|reflect|flip)\b' 139 | } 140 | 141 | for modifier, pattern in modifier_patterns.items(): 142 | if re.search(pattern, description, re.IGNORECASE): 143 | components["modifiers"].append(modifier) 144 | 145 | logger.info(f"Parsed components: {components}") 146 | return components 147 | 148 | def _generate_code_from_components(self, components: Dict[str, Any], parameters: Dict[str, Any]) -> str: 149 | """ 150 | Generate OpenSCAD code based on identified components. 151 | 152 | Args: 153 | components: Dictionary of identified components 154 | parameters: Dictionary of parameters 155 | 156 | Returns: 157 | Generated OpenSCAD code 158 | """ 159 | code = [] 160 | 161 | # Add header 162 | code.append("// AI-generated OpenSCAD code") 163 | code.append("// Generated from natural language description") 164 | code.append("") 165 | 166 | # Add parameter declarations 167 | code.append("// Parameters") 168 | for param, value in parameters.items(): 169 | if isinstance(value, str) and not (value.lower() == 'true' or value.lower() == 'false'): 170 | code.append(f'{param} = "{value}";') 171 | else: 172 | code.append(f"{param} = {value};") 173 | code.append("") 174 | 175 | # Generate code for primary shape 176 | primary_shape = components.get("primary_shape") 177 | if not primary_shape: 178 | primary_shape = "cube" # Default to cube if no shape is identified 179 | 180 | # Start with operations if any 181 | operations = components.get("operations", []) 182 | if operations: 183 | for operation in operations: 184 | code.append(f"{operation}() {{") 185 | code.append(" // Primary shape") 186 | 187 | # Add modifiers if any 188 | modifiers = components.get("modifiers", []) 189 | indent = " " if operations else "" 190 | 191 | if modifiers: 192 | for modifier in modifiers: 193 | if modifier == "scale": 194 | scale_value = parameters.get("scale", 1) 195 | code.append(f"{indent}scale([{scale_value}, {scale_value}, {scale_value}])") 196 | elif modifier == "rotate": 197 | angle = parameters.get("angle", 0) 198 | code.append(f"{indent}rotate([0, 0, {angle}])") 199 | elif modifier == "translate": 200 | x = parameters.get("x", 0) 201 | y = parameters.get("y", 0) 202 | z = parameters.get("z", 0) 203 | code.append(f"{indent}translate([{x}, {y}, {z}])") 204 | elif modifier == "mirror": 205 | code.append(f"{indent}mirror([0, 0, 1])") 206 | 207 | # Add the primary shape 208 | if primary_shape == "cube": 209 | width = parameters.get("width", 10) 210 | depth = parameters.get("depth", 10) 211 | height = parameters.get("height", 10) 212 | center = parameters.get("center", "true") 213 | code.append(f"{indent}cube([{width}, {depth}, {height}], center={center});") 214 | elif primary_shape == "sphere": 215 | radius = parameters.get("radius", 10) 216 | segments = parameters.get("segments", 32) 217 | code.append(f"{indent}sphere(r={radius}, $fn={segments});") 218 | elif primary_shape == "cylinder": 219 | radius = parameters.get("radius", 10) 220 | height = parameters.get("height", 20) 221 | center = parameters.get("center", "true") 222 | segments = parameters.get("segments", 32) 223 | code.append(f"{indent}cylinder(h={height}, r={radius}, center={center}, $fn={segments});") 224 | elif primary_shape == "cone": 225 | base_radius = parameters.get("base_radius", 10) 226 | height = parameters.get("height", 20) 227 | center = parameters.get("center", "true") 228 | segments = parameters.get("segments", 32) 229 | code.append(f"{indent}cylinder(h={height}, r1={base_radius}, r2=0, center={center}, $fn={segments});") 230 | elif primary_shape == "torus": 231 | major_radius = parameters.get("major_radius", 20) 232 | minor_radius = parameters.get("minor_radius", 5) 233 | segments = parameters.get("segments", 32) 234 | code.append(f"{indent}rotate_extrude($fn={segments})") 235 | code.append(f"{indent} translate([{major_radius}, 0, 0])") 236 | code.append(f"{indent} circle(r={minor_radius}, $fn={segments});") 237 | 238 | # Add features if any 239 | features = components.get("features", []) 240 | if features and "holes" in features: 241 | code.append("") 242 | code.append(f"{indent}// Add holes") 243 | code.append(f"{indent}difference() {{") 244 | code.append(f"{indent} children(0);") # Reference the primary shape 245 | 246 | # Add a sample hole 247 | hole_radius = parameters.get("hole_radius", 2) 248 | code.append(f"{indent} translate([0, 0, 0])") 249 | code.append(f"{indent} cylinder(h=100, r={hole_radius}, center=true, $fn=32);") 250 | 251 | code.append(f"{indent}}}") 252 | 253 | # Close operations if any 254 | if operations: 255 | code.append("}") 256 | 257 | return "\n".join(code) 258 | ``` -------------------------------------------------------------------------------- /test_multi_view_pipeline.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Test script for multi-view to model pipeline. 3 | """ 4 | 5 | import os 6 | import sys 7 | import logging 8 | import unittest 9 | from unittest.mock import patch, MagicMock 10 | from pathlib import Path 11 | 12 | # Add the src directory to the path 13 | sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) 14 | 15 | from src.workflow.multi_view_to_model_pipeline import MultiViewToModelPipeline 16 | from src.ai.gemini_api import GeminiImageGenerator 17 | from src.models.cuda_mvs import CUDAMultiViewStereo 18 | from src.workflow.image_approval import ImageApprovalTool 19 | 20 | # Configure logging 21 | logging.basicConfig(level=logging.INFO) 22 | logger = logging.getLogger(__name__) 23 | 24 | class TestMultiViewPipeline(unittest.TestCase): 25 | """ 26 | Test cases for multi-view to model pipeline. 27 | """ 28 | 29 | def setUp(self): 30 | """ 31 | Set up test environment. 32 | """ 33 | # Create test directories 34 | self.test_output_dir = "output/test_pipeline" 35 | self.test_images_dir = os.path.join(self.test_output_dir, "images") 36 | self.test_models_dir = os.path.join(self.test_output_dir, "models") 37 | 38 | for directory in [self.test_output_dir, self.test_images_dir, self.test_models_dir]: 39 | os.makedirs(directory, exist_ok=True) 40 | 41 | # Create mock CUDA MVS path 42 | self.cuda_mvs_path = "mock_cuda_mvs" 43 | os.makedirs(os.path.join(self.cuda_mvs_path, "build"), exist_ok=True) 44 | 45 | # Create mock executable 46 | with open(os.path.join(self.cuda_mvs_path, "build", "app_patch_match_mvs"), "w") as f: 47 | f.write("#!/bin/bash\necho 'Mock CUDA MVS'\n") 48 | os.chmod(os.path.join(self.cuda_mvs_path, "build", "app_patch_match_mvs"), 0o755) 49 | 50 | # Create mock components 51 | self.mock_gemini = MagicMock(spec=GeminiImageGenerator) 52 | self.mock_cuda_mvs = MagicMock(spec=CUDAMultiViewStereo) 53 | self.mock_approval = MagicMock(spec=ImageApprovalTool) 54 | 55 | # Configure mock responses 56 | self.configure_mocks() 57 | 58 | # Create the pipeline with mock components 59 | self.pipeline = MultiViewToModelPipeline( 60 | gemini_generator=self.mock_gemini, 61 | cuda_mvs=self.mock_cuda_mvs, 62 | approval_tool=self.mock_approval, 63 | output_dir=self.test_output_dir 64 | ) 65 | 66 | def configure_mocks(self): 67 | """ 68 | Configure mock responses for components. 69 | """ 70 | # Mock Gemini image generation 71 | def mock_generate_image(prompt, **kwargs): 72 | image_path = os.path.join(self.test_images_dir, f"{prompt[:10].replace(' ', '_')}.png") 73 | with open(image_path, "w") as f: 74 | f.write(f"Mock image for {prompt}") 75 | return { 76 | "prompt": prompt, 77 | "local_path": image_path, 78 | "image_data": b"mock_image_data" 79 | } 80 | 81 | def mock_generate_multiple_views(prompt, num_views, **kwargs): 82 | results = [] 83 | for i in range(num_views): 84 | image_path = os.path.join(self.test_images_dir, f"view_{i}.png") 85 | with open(image_path, "w") as f: 86 | f.write(f"Mock image for {prompt} - view {i}") 87 | results.append({ 88 | "prompt": f"{prompt} - view {i}", 89 | "local_path": image_path, 90 | "image_data": b"mock_image_data", 91 | "view_direction": f"view {i}", 92 | "view_index": i + 1 93 | }) 94 | return results 95 | 96 | self.mock_gemini.generate_image.side_effect = mock_generate_image 97 | self.mock_gemini.generate_multiple_views.side_effect = mock_generate_multiple_views 98 | 99 | # Mock CUDA MVS 100 | def mock_generate_model(image_paths, **kwargs): 101 | model_dir = os.path.join(self.test_models_dir, "mock_model") 102 | os.makedirs(model_dir, exist_ok=True) 103 | 104 | point_cloud_file = os.path.join(model_dir, "mock_model.ply") 105 | with open(point_cloud_file, "w") as f: 106 | f.write("Mock point cloud") 107 | 108 | obj_file = os.path.join(model_dir, "mock_model.obj") 109 | with open(obj_file, "w") as f: 110 | f.write("Mock OBJ file") 111 | 112 | return { 113 | "model_id": "mock_model", 114 | "output_dir": model_dir, 115 | "point_cloud_file": point_cloud_file, 116 | "obj_file": obj_file, 117 | "input_images": image_paths 118 | } 119 | 120 | self.mock_cuda_mvs.generate_model_from_images.side_effect = mock_generate_model 121 | self.mock_cuda_mvs.convert_ply_to_obj.return_value = os.path.join(self.test_models_dir, "mock_model", "mock_model.obj") 122 | 123 | # Mock approval tool 124 | def mock_present_image(image_path, metadata): 125 | return { 126 | "approval_id": os.path.basename(image_path).split('.')[0], 127 | "image_path": image_path, 128 | "image_url": f"/images/{os.path.basename(image_path)}", 129 | "metadata": metadata or {} 130 | } 131 | 132 | def mock_process_approval(approval_id, approved, image_path): 133 | if approved: 134 | approved_path = os.path.join(self.test_output_dir, "approved", os.path.basename(image_path)) 135 | os.makedirs(os.path.dirname(approved_path), exist_ok=True) 136 | with open(approved_path, "w") as f: 137 | f.write(f"Approved image {approval_id}") 138 | 139 | return { 140 | "approval_id": approval_id, 141 | "approved": True, 142 | "original_path": image_path, 143 | "approved_path": approved_path 144 | } 145 | else: 146 | return { 147 | "approval_id": approval_id, 148 | "approved": False, 149 | "original_path": image_path 150 | } 151 | 152 | self.mock_approval.present_image_for_approval.side_effect = mock_present_image 153 | self.mock_approval.process_approval.side_effect = mock_process_approval 154 | self.mock_approval.get_approved_images.return_value = [ 155 | os.path.join(self.test_output_dir, "approved", f"view_{i}.png") for i in range(3) 156 | ] 157 | 158 | def test_generate_model_from_text(self): 159 | """ 160 | Test generating a 3D model from text prompt. 161 | """ 162 | # Test parameters 163 | prompt = "A low-poly rabbit" 164 | num_views = 3 165 | 166 | # Mock approvals - approve all images 167 | def mock_get_approval(approval_request): 168 | return True 169 | 170 | # Call the method 171 | result = self.pipeline.generate_model_from_text( 172 | prompt, num_views=num_views, get_approval_callback=mock_get_approval 173 | ) 174 | 175 | # Verify the result 176 | self.assertIsNotNone(result) 177 | self.assertTrue("model_id" in result) 178 | self.assertTrue("obj_file" in result) 179 | self.assertTrue("point_cloud_file" in result) 180 | 181 | # Verify component calls 182 | self.mock_gemini.generate_multiple_views.assert_called_once_with( 183 | prompt, num_views=num_views, output_dir=os.path.join(self.test_output_dir, "multi_view") 184 | ) 185 | 186 | self.assertEqual(self.mock_approval.present_image_for_approval.call_count, num_views) 187 | self.assertEqual(self.mock_approval.process_approval.call_count, num_views) 188 | 189 | self.mock_cuda_mvs.generate_model_from_images.assert_called_once() 190 | self.mock_cuda_mvs.convert_ply_to_obj.assert_called_once() 191 | 192 | def test_generate_model_from_image(self): 193 | """ 194 | Test generating a 3D model from a base image. 195 | """ 196 | # Create a mock base image 197 | base_image_path = os.path.join(self.test_images_dir, "base_image.png") 198 | with open(base_image_path, "w") as f: 199 | f.write("Mock base image") 200 | 201 | # Test parameters 202 | prompt = "A low-poly rabbit based on this image" 203 | num_views = 3 204 | 205 | # Mock approvals - approve all images 206 | def mock_get_approval(approval_request): 207 | return True 208 | 209 | # Call the method 210 | result = self.pipeline.generate_model_from_image( 211 | base_image_path, prompt, num_views=num_views, get_approval_callback=mock_get_approval 212 | ) 213 | 214 | # Verify the result 215 | self.assertIsNotNone(result) 216 | self.assertTrue("model_id" in result) 217 | self.assertTrue("obj_file" in result) 218 | self.assertTrue("point_cloud_file" in result) 219 | 220 | # Verify component calls 221 | self.mock_gemini.generate_multiple_views.assert_called_once_with( 222 | prompt, num_views=num_views, base_image_path=base_image_path, 223 | output_dir=os.path.join(self.test_output_dir, "multi_view") 224 | ) 225 | 226 | self.assertEqual(self.mock_approval.present_image_for_approval.call_count, num_views) 227 | self.assertEqual(self.mock_approval.process_approval.call_count, num_views) 228 | 229 | self.mock_cuda_mvs.generate_model_from_images.assert_called_once() 230 | self.mock_cuda_mvs.convert_ply_to_obj.assert_called_once() 231 | 232 | def test_selective_approval(self): 233 | """ 234 | Test selective approval of generated images. 235 | """ 236 | # Test parameters 237 | prompt = "A low-poly rabbit" 238 | num_views = 4 239 | 240 | # Mock approvals - only approve views 0 and 2 241 | def mock_get_approval(approval_request): 242 | view_index = int(approval_request["approval_id"].split('_')[1]) 243 | return view_index % 2 == 0 # Approve even-indexed views 244 | 245 | # Call the method 246 | result = self.pipeline.generate_model_from_text( 247 | prompt, num_views=num_views, get_approval_callback=mock_get_approval 248 | ) 249 | 250 | # Verify the result 251 | self.assertIsNotNone(result) 252 | 253 | # Verify component calls 254 | self.assertEqual(self.mock_approval.present_image_for_approval.call_count, num_views) 255 | self.assertEqual(self.mock_approval.process_approval.call_count, num_views) 256 | 257 | # Only 2 images should be approved and used for model generation 258 | approved_images = [call[0][0] for call in self.mock_cuda_mvs.generate_model_from_images.call_args_list] 259 | if approved_images: 260 | self.assertEqual(len(approved_images[0]), 2) # Only 2 images approved 261 | 262 | def test_error_handling(self): 263 | """ 264 | Test error handling in the pipeline. 265 | """ 266 | # Test parameters 267 | prompt = "A low-poly rabbit" 268 | 269 | # Mock error in Gemini API 270 | self.mock_gemini.generate_multiple_views.side_effect = Exception("Mock API error") 271 | 272 | # Call the method and expect an exception 273 | with self.assertRaises(Exception): 274 | self.pipeline.generate_model_from_text(prompt) 275 | 276 | def tearDown(self): 277 | """ 278 | Clean up after tests. 279 | """ 280 | # Clean up test output directory 281 | import shutil 282 | if os.path.exists(self.test_output_dir): 283 | shutil.rmtree(self.test_output_dir) 284 | 285 | # Clean up mock CUDA MVS path 286 | if os.path.exists(self.cuda_mvs_path): 287 | shutil.rmtree(self.cuda_mvs_path) 288 | 289 | if __name__ == "__main__": 290 | unittest.main() 291 | ``` -------------------------------------------------------------------------------- /src/main_remote.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Main module for OpenSCAD MCP Server with remote CUDA MVS processing. 3 | 4 | This module adds remote CUDA MVS processing capabilities to the MCP server. 5 | """ 6 | 7 | import os 8 | import logging 9 | from typing import Dict, Any, List, Optional 10 | 11 | # Import remote processing components 12 | from src.remote.cuda_mvs_client import CUDAMVSClient 13 | from src.remote.connection_manager import CUDAMVSConnectionManager 14 | from src.config import REMOTE_CUDA_MVS 15 | 16 | # Configure logging 17 | logging.basicConfig(level=logging.INFO) 18 | logger = logging.getLogger(__name__) 19 | 20 | # Initialize remote processing components if enabled 21 | remote_connection_manager = None 22 | remote_jobs = {} 23 | 24 | def initialize_remote_processing(): 25 | """ 26 | Initialize remote CUDA MVS processing components. 27 | 28 | Returns: 29 | CUDAMVSConnectionManager instance if enabled, None otherwise 30 | """ 31 | global remote_connection_manager 32 | 33 | if REMOTE_CUDA_MVS["ENABLED"]: 34 | logger.info("Initializing remote CUDA MVS connection manager") 35 | remote_connection_manager = CUDAMVSConnectionManager( 36 | api_key=REMOTE_CUDA_MVS["API_KEY"], 37 | discovery_port=REMOTE_CUDA_MVS["DISCOVERY_PORT"], 38 | use_lan_discovery=REMOTE_CUDA_MVS["USE_LAN_DISCOVERY"], 39 | server_url=REMOTE_CUDA_MVS["SERVER_URL"] if REMOTE_CUDA_MVS["SERVER_URL"] else None 40 | ) 41 | return remote_connection_manager 42 | 43 | return None 44 | 45 | def discover_remote_servers(): 46 | """ 47 | Discover remote CUDA MVS servers on the network. 48 | 49 | Returns: 50 | List of discovered servers 51 | """ 52 | if not remote_connection_manager: 53 | logger.warning("Remote CUDA MVS processing is not enabled") 54 | return [] 55 | 56 | return remote_connection_manager.discover_servers() 57 | 58 | def get_server_status(server_id: str): 59 | """ 60 | Get the status of a remote CUDA MVS server. 61 | 62 | Args: 63 | server_id: ID of the server to get status for 64 | 65 | Returns: 66 | Server status information 67 | """ 68 | if not remote_connection_manager: 69 | logger.warning("Remote CUDA MVS processing is not enabled") 70 | return None 71 | 72 | return remote_connection_manager.get_server_status(server_id) 73 | 74 | def upload_images_to_server(server_id: str, image_paths: List[str], job_id: Optional[str] = None): 75 | """ 76 | Upload images to a remote CUDA MVS server. 77 | 78 | Args: 79 | server_id: ID of the server to upload to 80 | image_paths: List of image paths to upload 81 | job_id: Optional job ID to use 82 | 83 | Returns: 84 | Job information 85 | """ 86 | if not remote_connection_manager: 87 | logger.warning("Remote CUDA MVS processing is not enabled") 88 | return None 89 | 90 | return remote_connection_manager.upload_images(server_id, image_paths, job_id) 91 | 92 | def process_images_remotely(server_id: str, job_id: str, params: Dict[str, Any] = None): 93 | """ 94 | Process uploaded images on a remote CUDA MVS server. 95 | 96 | Args: 97 | server_id: ID of the server to process on 98 | job_id: Job ID of the uploaded images 99 | params: Optional processing parameters 100 | 101 | Returns: 102 | Job status information 103 | """ 104 | if not remote_connection_manager: 105 | logger.warning("Remote CUDA MVS processing is not enabled") 106 | return None 107 | 108 | # Set default parameters if not provided 109 | if params is None: 110 | params = { 111 | "quality": REMOTE_CUDA_MVS["DEFAULT_RECONSTRUCTION_QUALITY"], 112 | "output_format": REMOTE_CUDA_MVS["DEFAULT_OUTPUT_FORMAT"] 113 | } 114 | 115 | # Start processing 116 | result = remote_connection_manager.process_job(server_id, job_id, params) 117 | 118 | # Store job information 119 | if result and "job_id" in result: 120 | remote_jobs[result["job_id"]] = { 121 | "server_id": server_id, 122 | "job_id": result["job_id"], 123 | "status": result.get("status", "processing"), 124 | "params": params 125 | } 126 | 127 | return result 128 | 129 | def get_job_status(job_id: str): 130 | """ 131 | Get the status of a remote processing job. 132 | 133 | Args: 134 | job_id: ID of the job to get status for 135 | 136 | Returns: 137 | Job status information 138 | """ 139 | if not remote_connection_manager: 140 | logger.warning("Remote CUDA MVS processing is not enabled") 141 | return None 142 | 143 | # Check if job exists 144 | if job_id not in remote_jobs: 145 | logger.warning(f"Job with ID {job_id} not found") 146 | return None 147 | 148 | # Get job information 149 | job_info = remote_jobs[job_id] 150 | 151 | # Get status from server 152 | status = remote_connection_manager.get_job_status(job_info["server_id"], job_id) 153 | 154 | # Update job information 155 | if status: 156 | job_info["status"] = status.get("status", job_info["status"]) 157 | job_info["progress"] = status.get("progress", 0) 158 | job_info["message"] = status.get("message", "") 159 | 160 | return job_info 161 | 162 | def download_model(job_id: str, output_dir: Optional[str] = None): 163 | """ 164 | Download a processed model from a remote CUDA MVS server. 165 | 166 | Args: 167 | job_id: ID of the job to download model for 168 | output_dir: Optional directory to save the model to 169 | 170 | Returns: 171 | Model information 172 | """ 173 | if not remote_connection_manager: 174 | logger.warning("Remote CUDA MVS processing is not enabled") 175 | return None 176 | 177 | # Check if job exists 178 | if job_id not in remote_jobs: 179 | logger.warning(f"Job with ID {job_id} not found") 180 | return None 181 | 182 | # Get job information 183 | job_info = remote_jobs[job_id] 184 | 185 | # Set default output directory if not provided 186 | if output_dir is None: 187 | output_dir = os.path.join(REMOTE_CUDA_MVS["OUTPUT_DIR"], job_id) 188 | 189 | # Create output directory if it doesn't exist 190 | os.makedirs(output_dir, exist_ok=True) 191 | 192 | # Download model 193 | result = remote_connection_manager.download_model(job_info["server_id"], job_id, output_dir) 194 | 195 | # Update job information 196 | if result: 197 | job_info["model_path"] = result.get("model_path") 198 | job_info["point_cloud_path"] = result.get("point_cloud_path") 199 | job_info["completed"] = True 200 | 201 | return result 202 | 203 | def cancel_job(job_id: str): 204 | """ 205 | Cancel a remote processing job. 206 | 207 | Args: 208 | job_id: ID of the job to cancel 209 | 210 | Returns: 211 | Cancellation result 212 | """ 213 | if not remote_connection_manager: 214 | logger.warning("Remote CUDA MVS processing is not enabled") 215 | return None 216 | 217 | # Check if job exists 218 | if job_id not in remote_jobs: 219 | logger.warning(f"Job with ID {job_id} not found") 220 | return None 221 | 222 | # Get job information 223 | job_info = remote_jobs[job_id] 224 | 225 | # Cancel job 226 | result = remote_connection_manager.cancel_job(job_info["server_id"], job_id) 227 | 228 | # Update job information 229 | if result and result.get("cancelled", False): 230 | job_info["status"] = "cancelled" 231 | job_info["message"] = "Job cancelled by user" 232 | 233 | return result 234 | 235 | # MCP tool functions for remote processing 236 | 237 | def discover_remote_cuda_mvs_servers(): 238 | """ 239 | MCP tool function to discover remote CUDA MVS servers. 240 | 241 | Returns: 242 | Dictionary with discovered servers 243 | """ 244 | servers = discover_remote_servers() 245 | 246 | return { 247 | "servers": servers, 248 | "count": len(servers) 249 | } 250 | 251 | def get_remote_server_status(server_id: str): 252 | """ 253 | MCP tool function to get the status of a remote CUDA MVS server. 254 | 255 | Args: 256 | server_id: ID of the server to get status for 257 | 258 | Returns: 259 | Dictionary with server status 260 | """ 261 | status = get_server_status(server_id) 262 | 263 | if not status: 264 | raise ValueError(f"Failed to get status for server with ID {server_id}") 265 | 266 | return status 267 | 268 | def process_images_with_remote_cuda_mvs( 269 | server_id: str, 270 | image_paths: List[str], 271 | quality: str = REMOTE_CUDA_MVS["DEFAULT_RECONSTRUCTION_QUALITY"], 272 | output_format: str = REMOTE_CUDA_MVS["DEFAULT_OUTPUT_FORMAT"], 273 | wait_for_completion: bool = REMOTE_CUDA_MVS["WAIT_FOR_COMPLETION"] 274 | ): 275 | """ 276 | MCP tool function to process images with remote CUDA MVS. 277 | 278 | Args: 279 | server_id: ID of the server to process on 280 | image_paths: List of image paths to process 281 | quality: Reconstruction quality (low, normal, high) 282 | output_format: Output format (obj, ply) 283 | wait_for_completion: Whether to wait for job completion 284 | 285 | Returns: 286 | Dictionary with job information 287 | """ 288 | # Upload images 289 | upload_result = upload_images_to_server(server_id, image_paths) 290 | 291 | if not upload_result or "job_id" not in upload_result: 292 | raise ValueError("Failed to upload images to server") 293 | 294 | job_id = upload_result["job_id"] 295 | 296 | # Process images 297 | process_result = process_images_remotely( 298 | server_id, 299 | job_id, 300 | { 301 | "quality": quality, 302 | "output_format": output_format 303 | } 304 | ) 305 | 306 | if not process_result: 307 | raise ValueError(f"Failed to process images for job {job_id}") 308 | 309 | # Wait for completion if requested 310 | if wait_for_completion: 311 | import time 312 | 313 | while True: 314 | status = get_job_status(job_id) 315 | 316 | if not status: 317 | raise ValueError(f"Failed to get status for job {job_id}") 318 | 319 | if status["status"] in ["completed", "failed", "cancelled"]: 320 | break 321 | 322 | time.sleep(REMOTE_CUDA_MVS["POLL_INTERVAL"]) 323 | 324 | if status["status"] == "completed": 325 | # Download model 326 | download_result = download_model(job_id) 327 | 328 | if not download_result: 329 | raise ValueError(f"Failed to download model for job {job_id}") 330 | 331 | return { 332 | "job_id": job_id, 333 | "status": "completed", 334 | "model_path": download_result.get("model_path"), 335 | "point_cloud_path": download_result.get("point_cloud_path") 336 | } 337 | else: 338 | return { 339 | "job_id": job_id, 340 | "status": status["status"], 341 | "message": status.get("message", "") 342 | } 343 | 344 | # Return job information without waiting 345 | return { 346 | "job_id": job_id, 347 | "status": "processing", 348 | "server_id": server_id 349 | } 350 | 351 | def get_remote_job_status(job_id: str): 352 | """ 353 | MCP tool function to get the status of a remote processing job. 354 | 355 | Args: 356 | job_id: ID of the job to get status for 357 | 358 | Returns: 359 | Dictionary with job status 360 | """ 361 | status = get_job_status(job_id) 362 | 363 | if not status: 364 | raise ValueError(f"Failed to get status for job with ID {job_id}") 365 | 366 | return status 367 | 368 | def download_remote_model(job_id: str, output_dir: Optional[str] = None): 369 | """ 370 | MCP tool function to download a processed model from a remote CUDA MVS server. 371 | 372 | Args: 373 | job_id: ID of the job to download model for 374 | output_dir: Optional directory to save the model to 375 | 376 | Returns: 377 | Dictionary with model information 378 | """ 379 | result = download_model(job_id, output_dir) 380 | 381 | if not result: 382 | raise ValueError(f"Failed to download model for job with ID {job_id}") 383 | 384 | return result 385 | 386 | def cancel_remote_job(job_id: str): 387 | """ 388 | MCP tool function to cancel a remote processing job. 389 | 390 | Args: 391 | job_id: ID of the job to cancel 392 | 393 | Returns: 394 | Dictionary with cancellation result 395 | """ 396 | result = cancel_job(job_id) 397 | 398 | if not result: 399 | raise ValueError(f"Failed to cancel job with ID {job_id}") 400 | 401 | return result 402 | ``` -------------------------------------------------------------------------------- /src/models/code_generator.py: -------------------------------------------------------------------------------- ```python 1 | import os 2 | import logging 3 | import uuid 4 | from typing import Dict, Any, List, Tuple, Optional 5 | 6 | logger = logging.getLogger(__name__) 7 | 8 | class CodeGenerator: 9 | """ 10 | Generates OpenSCAD code from natural language descriptions and parameters. 11 | Implements translation of requirements to OpenSCAD primitives and modules. 12 | """ 13 | 14 | def __init__(self, scad_templates_dir: str, output_dir: str, ai_service=None): 15 | """ 16 | Initialize the code generator. 17 | 18 | Args: 19 | scad_templates_dir: Directory containing SCAD template files 20 | output_dir: Directory to store generated SCAD files 21 | ai_service: Optional AI service for enhanced code generation 22 | """ 23 | self.scad_templates_dir = scad_templates_dir 24 | self.output_dir = output_dir 25 | self.ai_service = ai_service 26 | 27 | # Create output directory if it doesn't exist 28 | os.makedirs(output_dir, exist_ok=True) 29 | 30 | # Map of shape types to their corresponding module names 31 | self.shape_module_map = { 32 | 'cube': 'parametric_cube', 33 | 'sphere': 'parametric_sphere', 34 | 'cylinder': 'parametric_cylinder', 35 | 'box': 'hollow_box', 36 | 'rounded_box': 'rounded_box', 37 | 'container': 'rounded_container', 38 | 'tube': 'tube', 39 | 'cone': 'cone', 40 | 'wedge': 'wedge', 41 | 'rounded_cylinder': 'rounded_cylinder', 42 | 'torus': 'torus', 43 | 'hexagonal_prism': 'hexagonal_prism', 44 | 'text': 'text_3d', 45 | 'prism': 'triangular_prism', 46 | 'custom': 'custom_shape' 47 | } 48 | 49 | # Parameter mapping from natural language to OpenSCAD parameters 50 | self.parameter_map = { 51 | 'width': 'width', 52 | 'depth': 'depth', 53 | 'height': 'height', 54 | 'radius': 'radius', 55 | 'thickness': 'thickness', 56 | 'segments': 'segments', 57 | 'center': 'center', 58 | 'inner_radius': 'inner_radius', 59 | 'outer_radius': 'outer_radius', 60 | 'corner_radius': 'corner_radius', 61 | 'text': 'text', 62 | 'size': 'size', 63 | 'font': 'font', 64 | 'base_radius': 'base_radius', 65 | 'major_radius': 'major_radius', 66 | 'minor_radius': 'minor_radius', 67 | 'angle': 'angle', 68 | 'scale': 'scale', 69 | 'resolution': 'resolution' 70 | } 71 | 72 | def generate_code(self, model_type: str, parameters: Dict[str, Any], description: Optional[str] = None) -> str: 73 | """ 74 | Generate OpenSCAD code for a given model type and parameters. 75 | 76 | Args: 77 | model_type: Type of model to generate 78 | parameters: Dictionary of parameters for the model 79 | description: Optional natural language description for AI-driven generation 80 | 81 | Returns: 82 | Path to the generated SCAD file 83 | """ 84 | # Generate a unique ID for the model 85 | model_id = str(uuid.uuid4()) 86 | scad_file = os.path.join(self.output_dir, f"{model_id}.scad") 87 | 88 | # Check if we should use AI-driven generation for complex models 89 | if model_type == 'custom' and description and self.ai_service: 90 | scad_code = self._generate_ai_driven_code(description, parameters) 91 | else: 92 | # Get the module name for the model type 93 | module_name = self.shape_module_map.get(model_type) 94 | if not module_name: 95 | raise ValueError(f"Unsupported model type: {model_type}") 96 | 97 | # Map parameters to OpenSCAD parameter names 98 | scad_params = self._map_parameters(parameters) 99 | 100 | # Generate the OpenSCAD code 101 | scad_code = self._generate_scad_code(module_name, scad_params) 102 | 103 | # Write the code to a file 104 | with open(scad_file, 'w') as f: 105 | f.write(scad_code) 106 | 107 | logger.info(f"Generated OpenSCAD code: {scad_file}") 108 | return scad_file 109 | 110 | def update_code(self, scad_file: str, parameters: Dict[str, Any]) -> str: 111 | """ 112 | Update an existing SCAD file with new parameters. 113 | 114 | Args: 115 | scad_file: Path to the SCAD file to update 116 | parameters: New parameters to apply 117 | 118 | Returns: 119 | Path to the updated SCAD file 120 | """ 121 | if not os.path.exists(scad_file): 122 | raise FileNotFoundError(f"SCAD file not found: {scad_file}") 123 | 124 | # Read the existing SCAD file 125 | with open(scad_file, 'r') as f: 126 | scad_code = f.read() 127 | 128 | # Determine the module name from the code 129 | module_name = None 130 | for shape_type, module in self.shape_module_map.items(): 131 | if module in scad_code: 132 | module_name = module 133 | break 134 | 135 | if not module_name: 136 | raise ValueError("Could not determine module name from existing SCAD file") 137 | 138 | # Map parameters to OpenSCAD parameter names 139 | scad_params = self._map_parameters(parameters) 140 | 141 | # Generate the updated OpenSCAD code 142 | updated_code = self._generate_scad_code(module_name, scad_params) 143 | 144 | # Write the updated code to the file 145 | with open(scad_file, 'w') as f: 146 | f.write(updated_code) 147 | 148 | logger.info(f"Updated OpenSCAD code: {scad_file}") 149 | return scad_file 150 | 151 | def combine_models(self, operations: List[Dict[str, Any]]) -> str: 152 | """ 153 | Combine multiple models using CSG operations. 154 | 155 | Args: 156 | operations: List of operations, each containing: 157 | - model_type: Type of model 158 | - parameters: Parameters for the model 159 | - operation: CSG operation (union, difference, intersection) 160 | - transform: Optional transformation to apply 161 | 162 | Returns: 163 | Path to the generated SCAD file 164 | """ 165 | # Generate a unique ID for the combined model 166 | model_id = str(uuid.uuid4()) 167 | scad_file = os.path.join(self.output_dir, f"{model_id}.scad") 168 | 169 | # Include the basic shapes library 170 | scad_code = f"""// Combined model 171 | include <{os.path.join(self.scad_templates_dir, "basic_shapes.scad")}>; 172 | 173 | """ 174 | 175 | # Process each operation 176 | current_op = None 177 | for i, op in enumerate(operations): 178 | model_type = op.get('model_type') 179 | parameters = op.get('parameters', {}) 180 | operation = op.get('operation') 181 | transform = op.get('transform') 182 | 183 | # Get the module name for the model type 184 | if model_type is None: 185 | raise ValueError("Model type cannot be None") 186 | module_name = self.shape_module_map.get(str(model_type)) 187 | if not module_name: 188 | raise ValueError(f"Unsupported model type: {model_type}") 189 | 190 | # Map parameters to OpenSCAD parameter names 191 | scad_params = self._map_parameters(parameters) 192 | 193 | # Format parameters for the module call 194 | params_str = ", ".join([f"{k}={v}" for k, v in scad_params.items()]) 195 | 196 | # Start or continue the CSG operation chain 197 | if i == 0: 198 | # First operation doesn't need an operator 199 | if operation: 200 | current_op = operation 201 | scad_code += f"{operation}() {{\n" 202 | 203 | # Add the module call with optional transformation 204 | if transform: 205 | scad_code += f" {transform} {module_name}({params_str});\n" 206 | else: 207 | scad_code += f" {module_name}({params_str});\n" 208 | else: 209 | # Check if we need to close the previous operation and start a new one 210 | if operation and operation != current_op: 211 | if current_op: 212 | scad_code += "}\n\n" 213 | current_op = operation 214 | scad_code += f"{operation}() {{\n" 215 | 216 | # Add the module call with optional transformation 217 | if transform: 218 | scad_code += f" {transform} {module_name}({params_str});\n" 219 | else: 220 | scad_code += f" {module_name}({params_str});\n" 221 | 222 | # Close the final operation if needed 223 | if current_op: 224 | scad_code += "}\n" 225 | 226 | # Write the code to a file 227 | with open(scad_file, 'w') as f: 228 | f.write(scad_code) 229 | 230 | logger.info(f"Generated combined OpenSCAD code: {scad_file}") 231 | return scad_file 232 | 233 | def _map_parameters(self, parameters: Dict[str, Any]) -> Dict[str, Any]: 234 | """Map natural language parameters to OpenSCAD parameters.""" 235 | scad_params = {} 236 | 237 | for param, value in parameters.items(): 238 | # Map the parameter name if it exists in the mapping 239 | scad_param = self.parameter_map.get(param, param) 240 | 241 | # Format the value appropriately for OpenSCAD 242 | if isinstance(value, bool): 243 | scad_params[scad_param] = str(value).lower() 244 | elif isinstance(value, str): 245 | if value.lower() == 'true' or value.lower() == 'false': 246 | scad_params[scad_param] = value.lower() 247 | else: 248 | # For text parameters, add quotes 249 | scad_params[scad_param] = f'"{value}"' 250 | else: 251 | scad_params[scad_param] = value 252 | 253 | return scad_params 254 | 255 | def _generate_scad_code(self, module_name: str, parameters: Dict[str, Any]) -> str: 256 | """Generate OpenSCAD code for a module with parameters.""" 257 | # Include the basic shapes library 258 | scad_code = f"""// Generated OpenSCAD code 259 | include <{os.path.join(self.scad_templates_dir, "basic_shapes.scad")}>; 260 | 261 | // Parameters 262 | """ 263 | 264 | # Add parameter declarations 265 | for param, value in parameters.items(): 266 | scad_code += f"{param} = {value};\n" 267 | 268 | # Add the module call 269 | scad_code += f"\n// Model\n{module_name}(" 270 | 271 | # Add parameters to the module call 272 | param_list = [f"{param}={param}" for param in parameters.keys()] 273 | scad_code += ", ".join(param_list) 274 | 275 | scad_code += ");\n" 276 | 277 | return scad_code 278 | 279 | def _generate_ai_driven_code(self, description: str, parameters: Dict[str, Any]) -> str: 280 | """ 281 | Generate OpenSCAD code using AI-driven techniques based on natural language description. 282 | 283 | Args: 284 | description: Natural language description of the model 285 | parameters: Dictionary of parameters for the model 286 | 287 | Returns: 288 | Generated OpenSCAD code 289 | """ 290 | if not self.ai_service: 291 | logger.warning("AI service not available, falling back to basic shape generation") 292 | # Fall back to a basic cube if AI service is not available 293 | return self._generate_scad_code('parametric_cube', {'width': 10, 'height': 10, 'depth': 10}) 294 | 295 | try: 296 | # Use the AI service to generate OpenSCAD code 297 | logger.info(f"Generating OpenSCAD code from description: {description}") 298 | 299 | # Prepare context for the AI service 300 | context = { 301 | "description": description, 302 | "parameters": parameters, 303 | "templates_dir": self.scad_templates_dir 304 | } 305 | 306 | # Call the AI service to generate code 307 | scad_code = self.ai_service.generate_openscad_code(context) 308 | 309 | # Ensure the code includes the basic shapes library 310 | if "include <" not in scad_code: 311 | scad_code = f"""// AI-generated OpenSCAD code 312 | include <{os.path.join(self.scad_templates_dir, "basic_shapes.scad")}>; 313 | 314 | {scad_code} 315 | """ 316 | 317 | return scad_code 318 | except Exception as e: 319 | logger.error(f"Error generating AI-driven code: {e}") 320 | # Fall back to a basic shape if there's an error 321 | return self._generate_scad_code('parametric_cube', {'width': 10, 'height': 10, 'depth': 10}) 322 | ``` -------------------------------------------------------------------------------- /test_complete_workflow.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Test script for the complete workflow from text to 3D model. 3 | """ 4 | 5 | import os 6 | import sys 7 | import logging 8 | import unittest 9 | from unittest.mock import patch, MagicMock 10 | from pathlib import Path 11 | 12 | # Add the src directory to the path 13 | sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) 14 | 15 | from src.ai.gemini_api import GeminiImageGenerator 16 | from src.workflow.image_approval import ImageApprovalManager 17 | from src.models.cuda_mvs import CUDAMultiViewStereo 18 | from src.workflow.multi_view_to_model_pipeline import MultiViewToModelPipeline 19 | from src.config import MULTI_VIEW_PIPELINE, IMAGE_APPROVAL, REMOTE_CUDA_MVS 20 | 21 | # Configure logging 22 | logging.basicConfig(level=logging.INFO) 23 | logger = logging.getLogger(__name__) 24 | 25 | class TestCompleteWorkflow(unittest.TestCase): 26 | """ 27 | Test cases for the complete workflow from text to 3D model. 28 | """ 29 | 30 | def setUp(self): 31 | """ 32 | Set up test environment. 33 | """ 34 | # Create test output directories 35 | self.test_output_dir = "output/test_complete_workflow" 36 | self.test_images_dir = os.path.join(self.test_output_dir, "images") 37 | self.test_multi_view_dir = os.path.join(self.test_output_dir, "multi_view") 38 | self.test_approved_dir = os.path.join(self.test_output_dir, "approved") 39 | self.test_models_dir = os.path.join(self.test_output_dir, "models") 40 | 41 | os.makedirs(self.test_output_dir, exist_ok=True) 42 | os.makedirs(self.test_images_dir, exist_ok=True) 43 | os.makedirs(self.test_multi_view_dir, exist_ok=True) 44 | os.makedirs(self.test_approved_dir, exist_ok=True) 45 | os.makedirs(self.test_models_dir, exist_ok=True) 46 | 47 | # Mock API key 48 | self.api_key = "test_api_key" 49 | 50 | # Create the components 51 | self.image_generator = GeminiImageGenerator( 52 | api_key=self.api_key, 53 | output_dir=self.test_images_dir 54 | ) 55 | 56 | self.approval_manager = ImageApprovalManager( 57 | output_dir=self.test_multi_view_dir, 58 | approved_dir=self.test_approved_dir, 59 | min_approved_images=3, 60 | auto_approve=False 61 | ) 62 | 63 | self.cuda_mvs = CUDAMultiViewStereo( 64 | output_dir=self.test_models_dir, 65 | use_gpu=False 66 | ) 67 | 68 | # Create the pipeline 69 | self.pipeline = MultiViewToModelPipeline( 70 | image_generator=self.image_generator, 71 | approval_manager=self.approval_manager, 72 | model_generator=self.cuda_mvs, 73 | output_dir=self.test_output_dir, 74 | config=MULTI_VIEW_PIPELINE 75 | ) 76 | 77 | @patch('src.ai.gemini_api.requests.post') 78 | def test_generate_images_from_text(self, mock_post): 79 | """ 80 | Test generating images from text description. 81 | """ 82 | # Mock response 83 | mock_response = MagicMock() 84 | mock_response.status_code = 200 85 | mock_response.json.return_value = { 86 | "candidates": [ 87 | { 88 | "content": { 89 | "parts": [ 90 | { 91 | "text": "Generated image description" 92 | }, 93 | { 94 | "inlineData": { 95 | "mimeType": "image/png", 96 | "data": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8BQDwAEhQGAhKmMIQAAAABJRU5ErkJggg==" 97 | } 98 | } 99 | ] 100 | } 101 | } 102 | ] 103 | } 104 | mock_post.return_value = mock_response 105 | 106 | # Test parameters 107 | prompt = "A low-poly rabbit" 108 | num_views = 4 109 | 110 | # Call the method 111 | results = self.pipeline.generate_images_from_text(prompt, num_views) 112 | 113 | # Verify the results 114 | self.assertEqual(len(results), num_views) 115 | for i, result in enumerate(results): 116 | self.assertTrue("view_direction" in result) 117 | self.assertEqual(result["view_index"], i + 1) 118 | self.assertTrue("local_path" in result) 119 | self.assertTrue(os.path.exists(result["local_path"])) 120 | 121 | # Verify the API calls 122 | self.assertEqual(mock_post.call_count, num_views) 123 | 124 | def test_approve_images(self): 125 | """ 126 | Test approving images in the workflow. 127 | """ 128 | # Create test images 129 | test_images = [] 130 | for i in range(4): 131 | image_path = os.path.join(self.test_multi_view_dir, f"test_image_{i}.png") 132 | with open(image_path, "w") as f: 133 | f.write(f"test image data {i}") 134 | test_images.append({ 135 | "id": f"image_{i}", 136 | "local_path": image_path, 137 | "view_index": i + 1, 138 | "view_direction": f"view_{i}" 139 | }) 140 | 141 | # Add images to the pipeline 142 | self.pipeline.add_images_for_approval(test_images) 143 | 144 | # Approve some images 145 | self.pipeline.approve_image("image_0") 146 | self.pipeline.approve_image("image_1") 147 | self.pipeline.approve_image("image_2") 148 | 149 | # Reject an image 150 | self.pipeline.reject_image("image_3") 151 | 152 | # Get the status 153 | status = self.pipeline.get_approval_status() 154 | 155 | # Verify the status 156 | self.assertEqual(status["total_images"], 4) 157 | self.assertEqual(status["pending_count"], 0) 158 | self.assertEqual(status["approved_count"], 3) 159 | self.assertEqual(status["rejected_count"], 1) 160 | self.assertTrue(status["has_minimum_approved"]) 161 | 162 | @patch('src.models.cuda_mvs.subprocess.run') 163 | def test_create_model_from_approved_images(self, mock_run): 164 | """ 165 | Test creating a 3D model from approved images. 166 | """ 167 | # Mock subprocess.run 168 | mock_process = MagicMock() 169 | mock_process.returncode = 0 170 | mock_run.return_value = mock_process 171 | 172 | # Create test images 173 | test_images = [] 174 | for i in range(4): 175 | image_path = os.path.join(self.test_approved_dir, f"image_{i}.png") 176 | with open(image_path, "w") as f: 177 | f.write(f"test image data {i}") 178 | test_images.append({ 179 | "id": f"image_{i}", 180 | "local_path": image_path, 181 | "view_index": i + 1, 182 | "view_direction": f"view_{i}" 183 | }) 184 | 185 | # Create a mock model file 186 | model_path = os.path.join(self.test_models_dir, "test_model.obj") 187 | with open(model_path, "w") as f: 188 | f.write("test model data") 189 | 190 | # Call the method 191 | result = self.pipeline.create_model_from_approved_images("test_model") 192 | 193 | # Verify the result 194 | self.assertIsNotNone(result) 195 | self.assertTrue("model_path" in result) 196 | self.assertTrue("model_id" in result) 197 | self.assertTrue("format" in result) 198 | self.assertEqual(result["format"], "obj") 199 | 200 | @patch('src.ai.gemini_api.requests.post') 201 | @patch('src.models.cuda_mvs.subprocess.run') 202 | def test_complete_workflow(self, mock_run, mock_post): 203 | """ 204 | Test the complete workflow from text to 3D model. 205 | """ 206 | # Mock Gemini API response 207 | mock_response = MagicMock() 208 | mock_response.status_code = 200 209 | mock_response.json.return_value = { 210 | "candidates": [ 211 | { 212 | "content": { 213 | "parts": [ 214 | { 215 | "text": "Generated image description" 216 | }, 217 | { 218 | "inlineData": { 219 | "mimeType": "image/png", 220 | "data": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8BQDwAEhQGAhKmMIQAAAABJRU5ErkJggg==" 221 | } 222 | } 223 | ] 224 | } 225 | } 226 | ] 227 | } 228 | mock_post.return_value = mock_response 229 | 230 | # Mock subprocess.run 231 | mock_process = MagicMock() 232 | mock_process.returncode = 0 233 | mock_run.return_value = mock_process 234 | 235 | # Create a mock model file 236 | model_path = os.path.join(self.test_models_dir, "test_model.obj") 237 | with open(model_path, "w") as f: 238 | f.write("test model data") 239 | 240 | # Create a pipeline with auto-approve 241 | auto_pipeline = MultiViewToModelPipeline( 242 | image_generator=self.image_generator, 243 | approval_manager=ImageApprovalManager( 244 | output_dir=self.test_multi_view_dir, 245 | approved_dir=self.test_approved_dir, 246 | min_approved_images=3, 247 | auto_approve=True 248 | ), 249 | model_generator=self.cuda_mvs, 250 | output_dir=self.test_output_dir, 251 | config=MULTI_VIEW_PIPELINE 252 | ) 253 | 254 | # Test parameters 255 | prompt = "A low-poly rabbit" 256 | num_views = 4 257 | 258 | # Call the complete workflow 259 | result = auto_pipeline.complete_workflow(prompt, num_views, "test_model") 260 | 261 | # Verify the result 262 | self.assertIsNotNone(result) 263 | self.assertTrue("model_path" in result) 264 | self.assertTrue("model_id" in result) 265 | self.assertTrue("format" in result) 266 | self.assertEqual(result["format"], "obj") 267 | self.assertTrue("prompt" in result) 268 | self.assertEqual(result["prompt"], prompt) 269 | self.assertTrue("num_views" in result) 270 | self.assertEqual(result["num_views"], num_views) 271 | self.assertTrue("approved_images" in result) 272 | self.assertEqual(len(result["approved_images"]), num_views) 273 | 274 | @patch('src.remote.cuda_mvs_client.requests.post') 275 | @patch('src.remote.cuda_mvs_client.requests.get') 276 | def test_remote_workflow(self, mock_get, mock_post): 277 | """ 278 | Test the workflow with remote CUDA MVS processing. 279 | """ 280 | # Mock upload response 281 | mock_upload_response = MagicMock() 282 | mock_upload_response.status_code = 200 283 | mock_upload_response.json.return_value = { 284 | "job_id": "test_job_123", 285 | "status": "uploaded", 286 | "message": "Images uploaded successfully" 287 | } 288 | 289 | # Mock process response 290 | mock_process_response = MagicMock() 291 | mock_process_response.status_code = 200 292 | mock_process_response.json.return_value = { 293 | "job_id": "test_job_123", 294 | "status": "processing", 295 | "message": "Job started processing" 296 | } 297 | 298 | # Mock status response 299 | mock_status_response = MagicMock() 300 | mock_status_response.status_code = 200 301 | mock_status_response.json.return_value = { 302 | "job_id": "test_job_123", 303 | "status": "completed", 304 | "progress": 100, 305 | "message": "Job completed successfully" 306 | } 307 | 308 | # Mock download response 309 | mock_download_response = MagicMock() 310 | mock_download_response.status_code = 200 311 | mock_download_response.content = b"test model data" 312 | 313 | # Set up the mock responses 314 | mock_post.side_effect = [mock_upload_response, mock_process_response] 315 | mock_get.side_effect = [mock_status_response, mock_download_response] 316 | 317 | # Create test images 318 | test_images = [] 319 | for i in range(4): 320 | image_path = os.path.join(self.test_approved_dir, f"image_{i}.png") 321 | with open(image_path, "w") as f: 322 | f.write(f"test image data {i}") 323 | test_images.append({ 324 | "id": f"image_{i}", 325 | "local_path": image_path, 326 | "view_index": i + 1, 327 | "view_direction": f"view_{i}" 328 | }) 329 | 330 | # Create a remote CUDA MVS client 331 | from src.remote.cuda_mvs_client import CUDAMVSClient 332 | remote_client = CUDAMVSClient( 333 | api_key=self.api_key, 334 | output_dir=self.test_models_dir 335 | ) 336 | 337 | # Create a pipeline with the remote client 338 | remote_pipeline = MultiViewToModelPipeline( 339 | image_generator=self.image_generator, 340 | approval_manager=self.approval_manager, 341 | model_generator=remote_client, 342 | output_dir=self.test_output_dir, 343 | config=MULTI_VIEW_PIPELINE 344 | ) 345 | 346 | # Add the approved images 347 | remote_pipeline.add_images_for_approval(test_images) 348 | for image in test_images: 349 | remote_pipeline.approve_image(image["id"]) 350 | 351 | # Call the method to create a model using the remote client 352 | with patch('src.workflow.multi_view_to_model_pipeline.CUDAMVSClient', return_value=remote_client): 353 | result = remote_pipeline.create_model_from_approved_images("test_model", server_url="http://test-server:8765") 354 | 355 | # Verify the result 356 | self.assertIsNotNone(result) 357 | self.assertTrue("model_path" in result) 358 | self.assertTrue("model_id" in result) 359 | self.assertTrue("format" in result) 360 | self.assertEqual(result["format"], "obj") 361 | self.assertTrue("job_id" in result) 362 | self.assertEqual(result["job_id"], "test_job_123") 363 | 364 | def tearDown(self): 365 | """ 366 | Clean up after tests. 367 | """ 368 | # Clean up test output directory 369 | import shutil 370 | if os.path.exists(self.test_output_dir): 371 | shutil.rmtree(self.test_output_dir) 372 | 373 | if __name__ == "__main__": 374 | unittest.main() 375 | ``` -------------------------------------------------------------------------------- /src/workflow/multi_view_to_model_pipeline.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Workflow orchestration for the multi-view to model pipeline. 3 | """ 4 | 5 | import os 6 | import logging 7 | import uuid 8 | from typing import Dict, Any, List, Optional 9 | from pathlib import Path 10 | 11 | logger = logging.getLogger(__name__) 12 | 13 | class MultiViewToModelPipeline: 14 | """ 15 | Orchestrates the workflow from image or text prompt to 3D model: 16 | 1. Generate image with Venice.ai or Google Gemini (optional) 17 | 2. Generate multiple views with Google Gemini 18 | 3. Process user approval of views 19 | 4. Create 3D model with CUDA Multi-View Stereo 20 | 5. Convert to OpenSCAD for parametric editing (optional) 21 | """ 22 | 23 | def __init__(self, 24 | gemini_generator=None, 25 | venice_generator=None, 26 | cuda_mvs=None, 27 | openscad_wrapper=None, 28 | approval_tool=None, 29 | output_dir: str = "output/pipeline"): 30 | """ 31 | Initialize the pipeline. 32 | 33 | Args: 34 | gemini_generator: Instance of GeminiImageGenerator 35 | venice_generator: Instance of VeniceImageGenerator (optional) 36 | cuda_mvs: Instance of CUDAMultiViewStereo 37 | openscad_wrapper: Instance of OpenSCADWrapper (optional) 38 | approval_tool: Instance of ImageApprovalTool 39 | output_dir: Directory to store output files 40 | """ 41 | self.gemini_generator = gemini_generator 42 | self.venice_generator = venice_generator 43 | self.cuda_mvs = cuda_mvs 44 | self.openscad_wrapper = openscad_wrapper 45 | self.approval_tool = approval_tool 46 | self.output_dir = output_dir 47 | 48 | # Create output directories 49 | os.makedirs(os.path.join(output_dir, "images"), exist_ok=True) 50 | os.makedirs(os.path.join(output_dir, "multi_view"), exist_ok=True) 51 | os.makedirs(os.path.join(output_dir, "approved"), exist_ok=True) 52 | os.makedirs(os.path.join(output_dir, "models"), exist_ok=True) 53 | os.makedirs(os.path.join(output_dir, "scad"), exist_ok=True) 54 | 55 | def generate_model_from_text(self, prompt: str, 56 | use_venice: bool = False, 57 | num_views: int = 4, 58 | gemini_params: Optional[Dict[str, Any]] = None, 59 | venice_params: Optional[Dict[str, Any]] = None, 60 | cuda_mvs_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 61 | """ 62 | Generate a 3D model from a text prompt. 63 | 64 | Args: 65 | prompt: Text description for image generation 66 | use_venice: Whether to use Venice.ai for initial image 67 | num_views: Number of views to generate 68 | gemini_params: Optional parameters for Google Gemini 69 | venice_params: Optional parameters for Venice.ai 70 | cuda_mvs_params: Optional parameters for CUDA MVS 71 | 72 | Returns: 73 | Dictionary containing paths to generated files and metadata 74 | """ 75 | try: 76 | # Generate a unique ID for this pipeline run 77 | pipeline_id = str(uuid.uuid4()) 78 | logger.info(f"Starting pipeline {pipeline_id} for prompt: {prompt}") 79 | 80 | # Step 1: Generate initial image 81 | if use_venice and self.venice_generator: 82 | # Use Venice.ai for initial image 83 | logger.info("Using Venice.ai for initial image generation") 84 | image_path = os.path.join(self.output_dir, "images", f"{pipeline_id}_venice.png") 85 | initial_result = self.venice_generator.generate_image( 86 | prompt=prompt, 87 | output_path=image_path, 88 | **(venice_params or {}) 89 | ) 90 | else: 91 | # Use Google Gemini for initial image 92 | logger.info("Using Google Gemini for initial image generation") 93 | image_path = os.path.join(self.output_dir, "images", f"{pipeline_id}_gemini.png") 94 | initial_result = self.gemini_generator.generate_image( 95 | prompt=prompt, 96 | output_path=image_path, 97 | **(gemini_params or {}) 98 | ) 99 | 100 | # Step 2: Generate multiple views 101 | logger.info(f"Generating {num_views} views with Google Gemini") 102 | multi_view_dir = os.path.join(self.output_dir, "multi_view", pipeline_id) 103 | 104 | multi_views = self.gemini_generator.generate_multiple_views( 105 | prompt=prompt, 106 | num_views=num_views, 107 | base_image_path=image_path, 108 | output_dir=multi_view_dir 109 | ) 110 | 111 | # Step 3: Present images for approval 112 | # In a real implementation, this would be handled by the MCP client 113 | # through the MCP tools interface 114 | logger.info("Preparing images for approval") 115 | approval_requests = [] 116 | for view in multi_views: 117 | approval_request = self.approval_tool.present_image_for_approval( 118 | image_path=view["local_path"], 119 | metadata={ 120 | "prompt": view.get("prompt"), 121 | "view_direction": view.get("view_direction"), 122 | "view_index": view.get("view_index") 123 | } 124 | ) 125 | approval_requests.append(approval_request) 126 | 127 | # For the purpose of this implementation, we'll assume all views are approved 128 | # In a real implementation, this would be handled by the MCP client 129 | approved_images = [] 130 | for req in approval_requests: 131 | approval_result = self.approval_tool.process_approval( 132 | approval_id=req["approval_id"], 133 | approved=True, 134 | image_path=req["image_path"] 135 | ) 136 | if approval_result["approved"]: 137 | approved_images.append(approval_result["approved_path"]) 138 | 139 | # Step 4: Generate 3D model with CUDA MVS 140 | logger.info("Generating 3D model with CUDA MVS") 141 | model_result = self.cuda_mvs.generate_model_from_images( 142 | image_paths=approved_images, 143 | output_name=pipeline_id, 144 | **(cuda_mvs_params or {}) 145 | ) 146 | 147 | # Step 5: Convert to OpenSCAD (if wrapper is available) 148 | scad_result = None 149 | if self.openscad_wrapper and model_result.get("point_cloud_file"): 150 | logger.info("Converting to OpenSCAD") 151 | scad_result = self._convert_to_openscad(model_result["point_cloud_file"], pipeline_id) 152 | 153 | # Compile results 154 | result = { 155 | "pipeline_id": pipeline_id, 156 | "prompt": prompt, 157 | "initial_image": initial_result, 158 | "multi_views": multi_views, 159 | "approved_images": approved_images, 160 | "model_3d": model_result, 161 | } 162 | 163 | if scad_result: 164 | result["openscad"] = scad_result 165 | 166 | logger.info(f"Pipeline {pipeline_id} completed successfully") 167 | return result 168 | 169 | except Exception as e: 170 | logger.error(f"Error in pipeline: {str(e)}") 171 | raise 172 | 173 | def generate_model_from_image(self, image_path: str, 174 | prompt: Optional[str] = None, 175 | num_views: int = 4, 176 | gemini_params: Optional[Dict[str, Any]] = None, 177 | cuda_mvs_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 178 | """ 179 | Generate a 3D model from an existing image. 180 | 181 | Args: 182 | image_path: Path to input image 183 | prompt: Optional text description to guide multi-view generation 184 | num_views: Number of views to generate 185 | gemini_params: Optional parameters for Google Gemini 186 | cuda_mvs_params: Optional parameters for CUDA MVS 187 | 188 | Returns: 189 | Dictionary containing paths to generated files and metadata 190 | """ 191 | try: 192 | # Generate a unique ID for this pipeline run 193 | pipeline_id = str(uuid.uuid4()) 194 | logger.info(f"Starting pipeline {pipeline_id} from image: {image_path}") 195 | 196 | # Use provided prompt or generate one from the image 197 | if not prompt: 198 | # In a real implementation, you might use an image captioning model 199 | # to generate a description of the image 200 | prompt = f"3D object in the image {os.path.basename(image_path)}" 201 | 202 | # Step 1: Generate multiple views 203 | logger.info(f"Generating {num_views} views with Google Gemini") 204 | multi_view_dir = os.path.join(self.output_dir, "multi_view", pipeline_id) 205 | 206 | multi_views = self.gemini_generator.generate_multiple_views( 207 | prompt=prompt, 208 | num_views=num_views, 209 | base_image_path=image_path, 210 | output_dir=multi_view_dir 211 | ) 212 | 213 | # Step 2: Present images for approval 214 | logger.info("Preparing images for approval") 215 | approval_requests = [] 216 | for view in multi_views: 217 | approval_request = self.approval_tool.present_image_for_approval( 218 | image_path=view["local_path"], 219 | metadata={ 220 | "prompt": view.get("prompt"), 221 | "view_direction": view.get("view_direction"), 222 | "view_index": view.get("view_index") 223 | } 224 | ) 225 | approval_requests.append(approval_request) 226 | 227 | # For the purpose of this implementation, we'll assume all views are approved 228 | approved_images = [] 229 | for req in approval_requests: 230 | approval_result = self.approval_tool.process_approval( 231 | approval_id=req["approval_id"], 232 | approved=True, 233 | image_path=req["image_path"] 234 | ) 235 | if approval_result["approved"]: 236 | approved_images.append(approval_result["approved_path"]) 237 | 238 | # Step 3: Generate 3D model with CUDA MVS 239 | logger.info("Generating 3D model with CUDA MVS") 240 | model_result = self.cuda_mvs.generate_model_from_images( 241 | image_paths=approved_images, 242 | output_name=pipeline_id, 243 | **(cuda_mvs_params or {}) 244 | ) 245 | 246 | # Step 4: Convert to OpenSCAD (if wrapper is available) 247 | scad_result = None 248 | if self.openscad_wrapper and model_result.get("point_cloud_file"): 249 | logger.info("Converting to OpenSCAD") 250 | scad_result = self._convert_to_openscad(model_result["point_cloud_file"], pipeline_id) 251 | 252 | # Compile results 253 | result = { 254 | "pipeline_id": pipeline_id, 255 | "prompt": prompt, 256 | "input_image": image_path, 257 | "multi_views": multi_views, 258 | "approved_images": approved_images, 259 | "model_3d": model_result, 260 | } 261 | 262 | if scad_result: 263 | result["openscad"] = scad_result 264 | 265 | logger.info(f"Pipeline {pipeline_id} completed successfully") 266 | return result 267 | 268 | except Exception as e: 269 | logger.error(f"Error in pipeline: {str(e)}") 270 | raise 271 | 272 | def _convert_to_openscad(self, model_path: str, model_id: str) -> Dict[str, Any]: 273 | """ 274 | Convert 3D model to OpenSCAD format. 275 | 276 | Args: 277 | model_path: Path to input model (PLY file) 278 | model_id: Unique identifier for the model 279 | 280 | Returns: 281 | Dictionary containing paths to generated files 282 | """ 283 | logger.info(f"Converting model to OpenSCAD: {model_path}") 284 | 285 | # Convert PLY to OBJ if needed 286 | if model_path.endswith('.ply'): 287 | obj_path = self.cuda_mvs.convert_ply_to_obj(model_path) 288 | model_path = obj_path 289 | 290 | # Generate OpenSCAD code for importing the model 291 | scad_code = f"""// Generated OpenSCAD code for model {model_id} 292 | // Imported from {os.path.basename(model_path)} 293 | 294 | // Parameters 295 | scale_factor = 1.0; 296 | position_x = 0; 297 | position_y = 0; 298 | position_z = 0; 299 | rotation_x = 0; 300 | rotation_y = 0; 301 | rotation_z = 0; 302 | 303 | // Import and transform the model 304 | translate([position_x, position_y, position_z]) 305 | rotate([rotation_x, rotation_y, rotation_z]) 306 | scale(scale_factor) 307 | import("{model_path}"); 308 | """ 309 | 310 | # Save SCAD code to file 311 | scad_file = self.openscad_wrapper.generate_scad(scad_code, model_id) 312 | 313 | # Generate previews 314 | previews = self.openscad_wrapper.generate_multi_angle_previews(scad_file) 315 | 316 | return { 317 | "scad_file": scad_file, 318 | "previews": previews, 319 | "model_path": model_path 320 | } 321 | 322 | def process_approval_results(self, approval_results: List[Dict[str, Any]]) -> List[str]: 323 | """ 324 | Process approval results from the MCP client. 325 | 326 | Args: 327 | approval_results: List of approval results from the client 328 | 329 | Returns: 330 | List of paths to approved images 331 | """ 332 | approved_images = [] 333 | 334 | for result in approval_results: 335 | if result.get("approved", False) and "approved_path" in result: 336 | approved_images.append(result["approved_path"]) 337 | 338 | return approved_images 339 | ``` -------------------------------------------------------------------------------- /src/remote/error_handling.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Error handling and retry mechanisms for remote CUDA MVS processing. 3 | 4 | This module provides utilities for handling network errors, implementing 5 | retry mechanisms, and ensuring robust communication with remote CUDA MVS servers. 6 | """ 7 | 8 | import time 9 | import random 10 | import logging 11 | import functools 12 | from typing import Callable, Any, Type, Union, List, Dict, Optional, Tuple 13 | import requests 14 | 15 | # Configure logging 16 | logging.basicConfig(level=logging.INFO) 17 | logger = logging.getLogger(__name__) 18 | 19 | # Define common exception types for network operations 20 | NETWORK_EXCEPTIONS = ( 21 | requests.exceptions.ConnectionError, 22 | requests.exceptions.Timeout, 23 | requests.exceptions.HTTPError, 24 | requests.exceptions.RequestException, 25 | ConnectionRefusedError, 26 | TimeoutError, 27 | ) 28 | 29 | def retry_with_backoff( 30 | max_retries: int = 3, 31 | base_delay: float = 1.0, 32 | max_delay: float = 60.0, 33 | exception_types: Tuple[Type[Exception], ...] = NETWORK_EXCEPTIONS, 34 | jitter_factor: float = 0.1, 35 | logger_instance: Optional[logging.Logger] = None 36 | ) -> Callable: 37 | """ 38 | Decorator for retrying a function with exponential backoff. 39 | 40 | Args: 41 | max_retries: Maximum number of retries 42 | base_delay: Base delay in seconds 43 | max_delay: Maximum delay in seconds 44 | exception_types: Tuple of exception types to catch and retry 45 | jitter_factor: Factor for random jitter (0.0 to 1.0) 46 | logger_instance: Logger instance to use (uses module logger if None) 47 | 48 | Returns: 49 | Decorator function 50 | """ 51 | log = logger_instance or logger 52 | 53 | def decorator(func: Callable) -> Callable: 54 | @functools.wraps(func) 55 | def wrapper(*args, **kwargs) -> Any: 56 | retries = 0 57 | last_exception = None 58 | 59 | while True: 60 | try: 61 | return func(*args, **kwargs) 62 | except exception_types as e: 63 | retries += 1 64 | last_exception = e 65 | 66 | if retries > max_retries: 67 | log.error(f"Max retries ({max_retries}) exceeded: {str(e)}") 68 | raise 69 | 70 | # Calculate delay with exponential backoff 71 | delay = min(max_delay, base_delay * (2 ** (retries - 1))) 72 | 73 | # Add jitter to avoid thundering herd problem 74 | jitter = random.uniform(0, jitter_factor * delay) 75 | sleep_time = delay + jitter 76 | 77 | log.warning(f"Retry {retries}/{max_retries} after {sleep_time:.2f}s: {str(e)}") 78 | time.sleep(sleep_time) 79 | except Exception as e: 80 | # Don't retry other exceptions 81 | log.error(f"Non-retryable exception: {str(e)}") 82 | raise 83 | 84 | return wrapper 85 | 86 | return decorator 87 | 88 | def timeout_handler( 89 | timeout: float, 90 | default_value: Any = None, 91 | exception_types: Tuple[Type[Exception], ...] = (TimeoutError, requests.exceptions.Timeout), 92 | logger_instance: Optional[logging.Logger] = None 93 | ) -> Callable: 94 | """ 95 | Decorator for handling timeouts in function calls. 96 | 97 | Args: 98 | timeout: Timeout in seconds 99 | default_value: Value to return if timeout occurs 100 | exception_types: Tuple of exception types to catch as timeouts 101 | logger_instance: Logger instance to use (uses module logger if None) 102 | 103 | Returns: 104 | Decorator function 105 | """ 106 | log = logger_instance or logger 107 | 108 | def decorator(func: Callable) -> Callable: 109 | @functools.wraps(func) 110 | def wrapper(*args, **kwargs) -> Any: 111 | import signal 112 | 113 | def timeout_handler(signum, frame): 114 | raise TimeoutError(f"Function {func.__name__} timed out after {timeout} seconds") 115 | 116 | # Set timeout using signal 117 | original_handler = signal.signal(signal.SIGALRM, timeout_handler) 118 | signal.alarm(int(timeout)) 119 | 120 | try: 121 | return func(*args, **kwargs) 122 | except exception_types as e: 123 | log.warning(f"Timeout in {func.__name__}: {str(e)}") 124 | return default_value 125 | finally: 126 | # Reset signal handler and alarm 127 | signal.signal(signal.SIGALRM, original_handler) 128 | signal.alarm(0) 129 | 130 | return wrapper 131 | 132 | return decorator 133 | 134 | class NetworkErrorTracker: 135 | """ 136 | Tracks network errors and provides information about error patterns. 137 | 138 | This class helps identify persistent network issues and can be used 139 | to make decisions about server availability. 140 | """ 141 | 142 | def __init__( 143 | self, 144 | error_window: int = 10, 145 | error_threshold: float = 0.5, 146 | reset_after: int = 100 147 | ): 148 | """ 149 | Initialize the error tracker. 150 | 151 | Args: 152 | error_window: Number of recent requests to consider 153 | error_threshold: Error rate threshold to consider a server problematic 154 | reset_after: Number of successful requests after which to reset error count 155 | """ 156 | self.error_window = error_window 157 | self.error_threshold = error_threshold 158 | self.reset_after = reset_after 159 | 160 | self.requests = [] # List of (timestamp, success) tuples 161 | self.consecutive_successes = 0 162 | self.consecutive_failures = 0 163 | 164 | def record_request(self, success: bool) -> None: 165 | """ 166 | Record the result of a request. 167 | 168 | Args: 169 | success: Whether the request was successful 170 | """ 171 | timestamp = time.time() 172 | self.requests.append((timestamp, success)) 173 | 174 | # Trim old requests outside the window 175 | self._trim_old_requests() 176 | 177 | # Update consecutive counters 178 | if success: 179 | self.consecutive_successes += 1 180 | self.consecutive_failures = 0 181 | 182 | # Reset error count after enough consecutive successes 183 | if self.consecutive_successes >= self.reset_after: 184 | self.requests = [(timestamp, True)] 185 | self.consecutive_successes = 1 186 | else: 187 | self.consecutive_failures += 1 188 | self.consecutive_successes = 0 189 | 190 | def _trim_old_requests(self) -> None: 191 | """ 192 | Remove requests that are outside the current window. 193 | """ 194 | if len(self.requests) > self.error_window: 195 | self.requests = self.requests[-self.error_window:] 196 | 197 | def get_error_rate(self) -> float: 198 | """ 199 | Get the current error rate. 200 | 201 | Returns: 202 | Error rate as a float between 0.0 and 1.0 203 | """ 204 | if not self.requests: 205 | return 0.0 206 | 207 | failures = sum(1 for _, success in self.requests if not success) 208 | return failures / len(self.requests) 209 | 210 | def is_server_problematic(self) -> bool: 211 | """ 212 | Check if the server is experiencing persistent issues. 213 | 214 | Returns: 215 | True if the server is problematic, False otherwise 216 | """ 217 | return self.get_error_rate() >= self.error_threshold 218 | 219 | def get_status(self) -> Dict[str, Any]: 220 | """ 221 | Get the current status of the error tracker. 222 | 223 | Returns: 224 | Dictionary with status information 225 | """ 226 | return { 227 | "error_rate": self.get_error_rate(), 228 | "is_problematic": self.is_server_problematic(), 229 | "consecutive_successes": self.consecutive_successes, 230 | "consecutive_failures": self.consecutive_failures, 231 | "total_requests": len(self.requests), 232 | "recent_failures": sum(1 for _, success in self.requests if not success) 233 | } 234 | 235 | class CircuitBreaker: 236 | """ 237 | Circuit breaker pattern implementation for network requests. 238 | 239 | This class helps prevent cascading failures by stopping requests 240 | to a problematic server until it recovers. 241 | """ 242 | 243 | # Circuit states 244 | CLOSED = "closed" # Normal operation 245 | OPEN = "open" # No requests allowed 246 | HALF_OPEN = "half_open" # Testing if service is back 247 | 248 | def __init__( 249 | self, 250 | failure_threshold: int = 5, 251 | recovery_timeout: float = 30.0, 252 | reset_timeout: float = 60.0, 253 | logger_instance: Optional[logging.Logger] = None 254 | ): 255 | """ 256 | Initialize the circuit breaker. 257 | 258 | Args: 259 | failure_threshold: Number of consecutive failures before opening 260 | recovery_timeout: Time in seconds before testing recovery 261 | reset_timeout: Time in seconds before fully resetting 262 | logger_instance: Logger instance to use (uses module logger if None) 263 | """ 264 | self.failure_threshold = failure_threshold 265 | self.recovery_timeout = recovery_timeout 266 | self.reset_timeout = reset_timeout 267 | self.log = logger_instance or logger 268 | 269 | self.state = self.CLOSED 270 | self.failure_count = 0 271 | self.last_failure_time = 0 272 | self.last_success_time = 0 273 | 274 | def record_success(self) -> None: 275 | """ 276 | Record a successful request. 277 | """ 278 | self.last_success_time = time.time() 279 | 280 | if self.state == self.HALF_OPEN: 281 | self.log.info("Circuit breaker reset to closed state after successful test request") 282 | self.state = self.CLOSED 283 | self.failure_count = 0 284 | elif self.state == self.CLOSED: 285 | # Reset failure count after a successful request 286 | self.failure_count = 0 287 | 288 | def record_failure(self) -> None: 289 | """ 290 | Record a failed request. 291 | """ 292 | self.last_failure_time = time.time() 293 | 294 | if self.state == self.CLOSED: 295 | self.failure_count += 1 296 | 297 | if self.failure_count >= self.failure_threshold: 298 | self.log.warning(f"Circuit breaker opened after {self.failure_count} consecutive failures") 299 | self.state = self.OPEN 300 | elif self.state == self.HALF_OPEN: 301 | self.log.warning("Circuit breaker opened again after failed test request") 302 | self.state = self.OPEN 303 | 304 | def allow_request(self) -> bool: 305 | """ 306 | Check if a request should be allowed. 307 | 308 | Returns: 309 | True if the request should be allowed, False otherwise 310 | """ 311 | if self.state == self.CLOSED: 312 | return True 313 | 314 | if self.state == self.OPEN: 315 | # Check if recovery timeout has elapsed 316 | if time.time() - self.last_failure_time > self.recovery_timeout: 317 | self.log.info("Circuit breaker entering half-open state to test service") 318 | self.state = self.HALF_OPEN 319 | return True 320 | return False 321 | 322 | # In HALF_OPEN state, allow only one request 323 | return True 324 | 325 | def reset(self) -> None: 326 | """ 327 | Reset the circuit breaker to closed state. 328 | """ 329 | self.state = self.CLOSED 330 | self.failure_count = 0 331 | self.log.info("Circuit breaker manually reset to closed state") 332 | 333 | def get_status(self) -> Dict[str, Any]: 334 | """ 335 | Get the current status of the circuit breaker. 336 | 337 | Returns: 338 | Dictionary with status information 339 | """ 340 | now = time.time() 341 | return { 342 | "state": self.state, 343 | "failure_count": self.failure_count, 344 | "time_since_last_failure": now - self.last_failure_time if self.last_failure_time > 0 else None, 345 | "time_since_last_success": now - self.last_success_time if self.last_success_time > 0 else None, 346 | "recovery_timeout": self.recovery_timeout, 347 | "reset_timeout": self.reset_timeout 348 | } 349 | 350 | def safe_request( 351 | url: str, 352 | method: str = "GET", 353 | circuit_breaker: Optional[CircuitBreaker] = None, 354 | error_tracker: Optional[NetworkErrorTracker] = None, 355 | retry_count: int = 3, 356 | timeout: float = 30.0, 357 | **kwargs 358 | ) -> Optional[requests.Response]: 359 | """ 360 | Make a safe HTTP request with circuit breaker and retry logic. 361 | 362 | Args: 363 | url: URL to request 364 | method: HTTP method (GET, POST, etc.) 365 | circuit_breaker: Circuit breaker instance 366 | error_tracker: Error tracker instance 367 | retry_count: Number of retries 368 | timeout: Request timeout in seconds 369 | **kwargs: Additional arguments for requests 370 | 371 | Returns: 372 | Response object or None if request failed 373 | """ 374 | # Check circuit breaker 375 | if circuit_breaker and not circuit_breaker.allow_request(): 376 | logger.warning(f"Circuit breaker prevented request to {url}") 377 | return None 378 | 379 | # Set default timeout 380 | kwargs.setdefault("timeout", timeout) 381 | 382 | # Make request with retry 383 | response = None 384 | success = False 385 | 386 | try: 387 | for attempt in range(retry_count + 1): 388 | try: 389 | response = requests.request(method, url, **kwargs) 390 | response.raise_for_status() 391 | success = True 392 | break 393 | except NETWORK_EXCEPTIONS as e: 394 | if attempt < retry_count: 395 | delay = 2 ** attempt + random.uniform(0, 1) 396 | logger.warning(f"Request to {url} failed (attempt {attempt+1}/{retry_count+1}): {str(e)}. Retrying in {delay:.2f}s") 397 | time.sleep(delay) 398 | else: 399 | logger.error(f"Request to {url} failed after {retry_count+1} attempts: {str(e)}") 400 | raise 401 | except Exception as e: 402 | logger.error(f"Error making request to {url}: {str(e)}") 403 | success = False 404 | 405 | # Update circuit breaker and error tracker 406 | if circuit_breaker: 407 | if success: 408 | circuit_breaker.record_success() 409 | else: 410 | circuit_breaker.record_failure() 411 | 412 | if error_tracker: 413 | error_tracker.record_request(success) 414 | 415 | return response if success else None 416 | ``` -------------------------------------------------------------------------------- /src/openscad_wrapper/wrapper.py: -------------------------------------------------------------------------------- ```python 1 | import os 2 | import subprocess 3 | import uuid 4 | import logging 5 | from typing import Dict, Any, List, Tuple, Optional 6 | 7 | logger = logging.getLogger(__name__) 8 | 9 | class OpenSCADWrapper: 10 | """ 11 | Wrapper for OpenSCAD command-line interface. 12 | Provides methods to generate SCAD code, STL files, and preview images. 13 | """ 14 | 15 | def __init__(self, scad_dir: str, output_dir: str): 16 | """ 17 | Initialize the OpenSCAD wrapper. 18 | 19 | Args: 20 | scad_dir: Directory to store SCAD files 21 | output_dir: Directory to store output files (STL, PNG) 22 | """ 23 | self.scad_dir = scad_dir 24 | self.output_dir = output_dir 25 | self.stl_dir = os.path.join(output_dir, "stl") 26 | self.preview_dir = os.path.join(output_dir, "preview") 27 | 28 | # Create directories if they don't exist 29 | os.makedirs(self.scad_dir, exist_ok=True) 30 | os.makedirs(self.stl_dir, exist_ok=True) 31 | os.makedirs(self.preview_dir, exist_ok=True) 32 | 33 | # Basic shape templates 34 | self.shape_templates = { 35 | "cube": self._cube_template, 36 | "sphere": self._sphere_template, 37 | "cylinder": self._cylinder_template, 38 | "box": self._box_template, 39 | "rounded_box": self._rounded_box_template, 40 | } 41 | 42 | def generate_scad_code(self, model_type: str, parameters: Dict[str, Any]) -> str: 43 | """ 44 | Generate OpenSCAD code for a given model type and parameters. 45 | 46 | Args: 47 | model_type: Type of model to generate (cube, sphere, cylinder, etc.) 48 | parameters: Dictionary of parameters for the model 49 | 50 | Returns: 51 | Path to the generated SCAD file 52 | """ 53 | model_id = str(uuid.uuid4()) 54 | scad_file = os.path.join(self.scad_dir, f"{model_id}.scad") 55 | 56 | # Get the template function for the model type 57 | template_func = self.shape_templates.get(model_type) 58 | if not template_func: 59 | raise ValueError(f"Unsupported model type: {model_type}") 60 | 61 | # Generate SCAD code using the template 62 | scad_code = template_func(parameters) 63 | 64 | # Write SCAD code to file 65 | with open(scad_file, 'w') as f: 66 | f.write(scad_code) 67 | 68 | logger.info(f"Generated SCAD file: {scad_file}") 69 | return scad_file 70 | 71 | def generate_scad(self, scad_code: str, model_id: str) -> str: 72 | """ 73 | Save OpenSCAD code to a file with a specific model ID. 74 | 75 | Args: 76 | scad_code: OpenSCAD code to save 77 | model_id: ID to use for the file name 78 | 79 | Returns: 80 | Path to the saved SCAD file 81 | """ 82 | scad_file = os.path.join(self.scad_dir, f"{model_id}.scad") 83 | 84 | # Write SCAD code to file 85 | with open(scad_file, 'w') as f: 86 | f.write(scad_code) 87 | 88 | logger.info(f"Generated SCAD file: {scad_file}") 89 | return scad_file 90 | 91 | def update_scad_code(self, model_id: str, parameters: Dict[str, Any]) -> str: 92 | """ 93 | Update an existing SCAD file with new parameters. 94 | 95 | Args: 96 | model_id: ID of the model to update 97 | parameters: New parameters for the model 98 | 99 | Returns: 100 | Path to the updated SCAD file 101 | """ 102 | scad_file = os.path.join(self.scad_dir, f"{model_id}.scad") 103 | if not os.path.exists(scad_file): 104 | raise FileNotFoundError(f"SCAD file not found: {scad_file}") 105 | 106 | # Read the existing SCAD file to determine its type 107 | with open(scad_file, 'r') as f: 108 | scad_code = f.read() 109 | 110 | # Determine model type from the code (simplified approach) 111 | model_type = None 112 | for shape_type in self.shape_templates: 113 | if shape_type in scad_code.lower(): 114 | model_type = shape_type 115 | break 116 | 117 | if not model_type: 118 | raise ValueError("Could not determine model type from existing SCAD file") 119 | 120 | # Generate new SCAD code 121 | new_scad_code = self.shape_templates[model_type](parameters) 122 | 123 | # Write updated SCAD code to file 124 | with open(scad_file, 'w') as f: 125 | f.write(new_scad_code) 126 | 127 | logger.info(f"Updated SCAD file: {scad_file}") 128 | return scad_file 129 | 130 | def generate_stl(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None) -> str: 131 | """ 132 | Generate an STL file from a SCAD file. 133 | 134 | Args: 135 | scad_file: Path to the SCAD file 136 | parameters: Optional parameters to override in the SCAD file 137 | 138 | Returns: 139 | Path to the generated STL file 140 | """ 141 | model_id = os.path.basename(scad_file).split('.')[0] 142 | stl_file = os.path.join(self.stl_dir, f"{model_id}.stl") 143 | 144 | # Build command 145 | cmd = ["openscad", "-o", stl_file] 146 | 147 | # Add parameters if provided 148 | if parameters: 149 | for key, value in parameters.items(): 150 | cmd.extend(["-D", f"{key}={value}"]) 151 | 152 | # Add input file 153 | cmd.append(scad_file) 154 | 155 | # Run OpenSCAD 156 | try: 157 | result = subprocess.run(cmd, check=True, capture_output=True, text=True) 158 | logger.info(f"Generated STL file: {stl_file}") 159 | logger.debug(result.stdout) 160 | return stl_file 161 | except subprocess.CalledProcessError as e: 162 | logger.error(f"Error generating STL file: {e.stderr}") 163 | raise RuntimeError(f"Failed to generate STL file: {e.stderr}") 164 | 165 | def generate_preview(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None, 166 | camera_position: str = "0,0,0,0,0,0,50", 167 | image_size: str = "800,600") -> str: 168 | """ 169 | Generate a preview image from a SCAD file. 170 | 171 | Args: 172 | scad_file: Path to the SCAD file 173 | parameters: Optional parameters to override in the SCAD file 174 | camera_position: Camera position in format "tx,ty,tz,rx,ry,rz,dist" 175 | image_size: Image size in format "width,height" 176 | 177 | Returns: 178 | Path to the generated preview image 179 | """ 180 | model_id = os.path.basename(scad_file).split('.')[0] 181 | preview_file = os.path.join(self.preview_dir, f"{model_id}.png") 182 | 183 | # Build command 184 | cmd = ["openscad", "--camera", camera_position, "--imgsize", image_size, "-o", preview_file] 185 | 186 | # Add parameters if provided 187 | if parameters: 188 | for key, value in parameters.items(): 189 | cmd.extend(["-D", f"{key}={value}"]) 190 | 191 | # Add input file 192 | cmd.append(scad_file) 193 | 194 | # Run OpenSCAD 195 | try: 196 | result = subprocess.run(cmd, check=True, capture_output=True, text=True) 197 | logger.info(f"Generated preview image: {preview_file}") 198 | logger.debug(result.stdout) 199 | return preview_file 200 | except subprocess.CalledProcessError as e: 201 | logger.error(f"Error generating preview image: {e.stderr}") 202 | # Since we know there might be issues with headless rendering, we'll create a placeholder 203 | logger.warning("Using placeholder image due to rendering error") 204 | return self._create_placeholder_image(preview_file) 205 | 206 | def _create_placeholder_image(self, output_path: str) -> str: 207 | """Create a simple placeholder image when rendering fails.""" 208 | try: 209 | from PIL import Image, ImageDraw, ImageFont 210 | 211 | # Create a blank image 212 | img = Image.new('RGB', (800, 600), color=(240, 240, 240)) 213 | draw = ImageDraw.Draw(img) 214 | 215 | # Add text 216 | draw.text((400, 300), "Preview not available", fill=(0, 0, 0)) 217 | 218 | # Save the image 219 | img.save(output_path) 220 | return output_path 221 | except Exception as e: 222 | logger.error(f"Error creating placeholder image: {str(e)}") 223 | # If all else fails, return the path anyway 224 | return output_path 225 | 226 | def generate_multi_angle_previews(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, str]: 227 | """ 228 | Generate preview images from multiple angles for a SCAD file. 229 | 230 | Args: 231 | scad_file: Path to the SCAD file 232 | parameters: Optional parameters to override in the SCAD file 233 | 234 | Returns: 235 | Dictionary mapping view names to preview image paths 236 | """ 237 | # Define camera positions for different views 238 | camera_positions = { 239 | "front": "0,0,0,0,0,0,50", 240 | "top": "0,0,0,90,0,0,50", 241 | "right": "0,0,0,0,90,0,50", 242 | "perspective": "40,30,30,55,0,25,100" 243 | } 244 | 245 | # Generate preview for each view 246 | previews = {} 247 | for view, camera_position in camera_positions.items(): 248 | try: 249 | model_id = os.path.basename(scad_file).split('.')[0] 250 | preview_file = os.path.join(self.preview_dir, f"{model_id}_{view}.png") 251 | 252 | # Build command 253 | cmd = ["openscad", "--camera", camera_position, "--imgsize", "800,600", "-o", preview_file] 254 | 255 | # Add parameters if provided 256 | if parameters: 257 | for key, value in parameters.items(): 258 | cmd.extend(["-D", f"{key}={value}"]) 259 | 260 | # Add input file 261 | cmd.append(scad_file) 262 | 263 | # Run OpenSCAD 264 | result = subprocess.run(cmd, check=True, capture_output=True, text=True) 265 | logger.info(f"Generated {view} preview: {preview_file}") 266 | previews[view] = preview_file 267 | except subprocess.CalledProcessError as e: 268 | logger.error(f"Error generating {view} preview: {e.stderr}") 269 | # Create a placeholder image for this view 270 | preview_file = os.path.join(self.preview_dir, f"{model_id}_{view}.png") 271 | previews[view] = self._create_placeholder_image(preview_file) 272 | 273 | return previews 274 | 275 | # Template functions for basic shapes 276 | 277 | def _cube_template(self, params: Dict[str, Any]) -> str: 278 | """Generate SCAD code for a cube.""" 279 | size_x = params.get('width', 10) 280 | size_y = params.get('depth', 10) 281 | size_z = params.get('height', 10) 282 | center = params.get('center', 'false').lower() == 'true' 283 | 284 | return f"""// Cube 285 | // Parameters: 286 | // width = {size_x} 287 | // depth = {size_y} 288 | // height = {size_z} 289 | // center = {str(center).lower()} 290 | 291 | width = {size_x}; 292 | depth = {size_y}; 293 | height = {size_z}; 294 | center = {str(center).lower()}; 295 | 296 | cube([width, depth, height], center=center); 297 | """ 298 | 299 | def _sphere_template(self, params: Dict[str, Any]) -> str: 300 | """Generate SCAD code for a sphere.""" 301 | radius = params.get('radius', 10) 302 | segments = params.get('segments', 32) 303 | 304 | return f"""// Sphere 305 | // Parameters: 306 | // radius = {radius} 307 | // segments = {segments} 308 | 309 | radius = {radius}; 310 | $fn = {segments}; 311 | 312 | sphere(r=radius); 313 | """ 314 | 315 | def _cylinder_template(self, params: Dict[str, Any]) -> str: 316 | """Generate SCAD code for a cylinder.""" 317 | radius = params.get('radius', 10) 318 | height = params.get('height', 20) 319 | center = params.get('center', 'false').lower() == 'true' 320 | segments = params.get('segments', 32) 321 | 322 | return f"""// Cylinder 323 | // Parameters: 324 | // radius = {radius} 325 | // height = {height} 326 | // center = {str(center).lower()} 327 | // segments = {segments} 328 | 329 | radius = {radius}; 330 | height = {height}; 331 | center = {str(center).lower()}; 332 | $fn = {segments}; 333 | 334 | cylinder(h=height, r=radius, center=center); 335 | """ 336 | 337 | def _box_template(self, params: Dict[str, Any]) -> str: 338 | """Generate SCAD code for a hollow box.""" 339 | width = params.get('width', 30) 340 | depth = params.get('depth', 20) 341 | height = params.get('height', 15) 342 | thickness = params.get('thickness', 2) 343 | 344 | return f"""// Hollow Box 345 | // Parameters: 346 | // width = {width} 347 | // depth = {depth} 348 | // height = {height} 349 | // thickness = {thickness} 350 | 351 | width = {width}; 352 | depth = {depth}; 353 | height = {height}; 354 | thickness = {thickness}; 355 | 356 | module box(width, depth, height, thickness) {{ 357 | difference() {{ 358 | cube([width, depth, height]); 359 | translate([thickness, thickness, thickness]) 360 | cube([width - 2*thickness, depth - 2*thickness, height - thickness]); 361 | }} 362 | }} 363 | 364 | box(width, depth, height, thickness); 365 | """ 366 | 367 | def _rounded_box_template(self, params: Dict[str, Any]) -> str: 368 | """Generate SCAD code for a rounded box.""" 369 | width = params.get('width', 30) 370 | depth = params.get('depth', 20) 371 | height = params.get('height', 15) 372 | radius = params.get('radius', 3) 373 | segments = params.get('segments', 32) 374 | 375 | return f"""// Rounded Box 376 | // Parameters: 377 | // width = {width} 378 | // depth = {depth} 379 | // height = {height} 380 | // radius = {radius} 381 | // segments = {segments} 382 | 383 | width = {width}; 384 | depth = {depth}; 385 | height = {height}; 386 | radius = {radius}; 387 | $fn = {segments}; 388 | 389 | module rounded_box(width, depth, height, radius) {{ 390 | hull() {{ 391 | translate([radius, radius, radius]) 392 | sphere(r=radius); 393 | 394 | translate([width-radius, radius, radius]) 395 | sphere(r=radius); 396 | 397 | translate([radius, depth-radius, radius]) 398 | sphere(r=radius); 399 | 400 | translate([width-radius, depth-radius, radius]) 401 | sphere(r=radius); 402 | 403 | translate([radius, radius, height-radius]) 404 | sphere(r=radius); 405 | 406 | translate([width-radius, radius, height-radius]) 407 | sphere(r=radius); 408 | 409 | translate([radius, depth-radius, height-radius]) 410 | sphere(r=radius); 411 | 412 | translate([width-radius, depth-radius, height-radius]) 413 | sphere(r=radius); 414 | }} 415 | }} 416 | 417 | rounded_box(width, depth, height, radius); 418 | """ 419 | ``` -------------------------------------------------------------------------------- /src/remote/cuda_mvs_client.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | Client for remote CUDA Multi-View Stereo processing. 3 | 4 | This module provides a client to connect to a remote CUDA MVS server 5 | within the LAN for processing multi-view images into 3D models. 6 | """ 7 | 8 | import os 9 | import json 10 | import logging 11 | import requests 12 | import base64 13 | from typing import Dict, List, Optional, Any, Union 14 | from pathlib import Path 15 | import uuid 16 | 17 | # Configure logging 18 | logging.basicConfig(level=logging.INFO) 19 | logger = logging.getLogger(__name__) 20 | 21 | class CUDAMVSClient: 22 | """ 23 | Client for connecting to a remote CUDA Multi-View Stereo server. 24 | 25 | This client handles: 26 | 1. Discovering available CUDA MVS servers on the LAN 27 | 2. Uploading images to the server 28 | 3. Requesting 3D reconstruction 29 | 4. Downloading the resulting 3D models 30 | 5. Monitoring job status 31 | """ 32 | 33 | def __init__( 34 | self, 35 | server_url: Optional[str] = None, 36 | api_key: Optional[str] = None, 37 | output_dir: str = "output/models", 38 | discovery_port: int = 8765, 39 | connection_timeout: int = 10, 40 | upload_chunk_size: int = 1024 * 1024, # 1MB chunks 41 | ): 42 | """ 43 | Initialize the CUDA MVS client. 44 | 45 | Args: 46 | server_url: URL of the CUDA MVS server (if known) 47 | api_key: API key for authentication (if required) 48 | output_dir: Directory to save downloaded models 49 | discovery_port: Port used for server discovery 50 | connection_timeout: Timeout for server connections in seconds 51 | upload_chunk_size: Chunk size for file uploads in bytes 52 | """ 53 | self.server_url = server_url 54 | self.api_key = api_key 55 | self.output_dir = output_dir 56 | self.discovery_port = discovery_port 57 | self.connection_timeout = connection_timeout 58 | self.upload_chunk_size = upload_chunk_size 59 | 60 | # Create output directory if it doesn't exist 61 | os.makedirs(output_dir, exist_ok=True) 62 | 63 | # Initialize session for connection pooling 64 | self.session = requests.Session() 65 | if api_key: 66 | self.session.headers.update({"Authorization": f"Bearer {api_key}"}) 67 | 68 | def discover_servers(self) -> List[Dict[str, Any]]: 69 | """ 70 | Discover CUDA MVS servers on the local network. 71 | 72 | Returns: 73 | List of dictionaries containing server information: 74 | [ 75 | { 76 | "server_id": "unique-server-id", 77 | "name": "CUDA MVS Server 1", 78 | "url": "http://192.168.1.100:8765", 79 | "capabilities": { 80 | "max_images": 50, 81 | "max_resolution": 4096, 82 | "supported_formats": ["jpg", "png"], 83 | "gpu_info": "NVIDIA RTX 4090 24GB" 84 | }, 85 | "status": "available" 86 | }, 87 | ... 88 | ] 89 | """ 90 | import socket 91 | import json 92 | from zeroconf import ServiceBrowser, ServiceListener, Zeroconf 93 | 94 | discovered_servers = [] 95 | 96 | class CUDAMVSListener(ServiceListener): 97 | def add_service(self, zc, type_, name): 98 | info = zc.get_service_info(type_, name) 99 | if info: 100 | server_info = { 101 | "server_id": name.split('.')[0], 102 | "name": info.properties.get(b'name', b'Unknown').decode('utf-8'), 103 | "url": f"http://{socket.inet_ntoa(info.addresses[0])}:{info.port}", 104 | "capabilities": json.loads(info.properties.get(b'capabilities', b'{}').decode('utf-8')), 105 | "status": "available" 106 | } 107 | discovered_servers.append(server_info) 108 | logger.info(f"Discovered CUDA MVS server: {server_info['name']} at {server_info['url']}") 109 | 110 | try: 111 | zeroconf = Zeroconf() 112 | listener = CUDAMVSListener() 113 | browser = ServiceBrowser(zeroconf, "_cudamvs._tcp.local.", listener) 114 | 115 | # Wait for discovery (non-blocking in production code) 116 | import time 117 | time.sleep(2) # Give some time for discovery 118 | 119 | zeroconf.close() 120 | return discovered_servers 121 | except Exception as e: 122 | logger.error(f"Error discovering CUDA MVS servers: {e}") 123 | return [] 124 | 125 | def test_connection(self, server_url: Optional[str] = None) -> Dict[str, Any]: 126 | """ 127 | Test connection to a CUDA MVS server. 128 | 129 | Args: 130 | server_url: URL of the server to test (uses self.server_url if None) 131 | 132 | Returns: 133 | Dictionary with connection status and server information 134 | """ 135 | url = server_url or self.server_url 136 | if not url: 137 | return {"status": "error", "message": "No server URL provided"} 138 | 139 | try: 140 | response = self.session.get( 141 | f"{url}/api/status", 142 | timeout=self.connection_timeout 143 | ) 144 | 145 | if response.status_code == 200: 146 | return { 147 | "status": "success", 148 | "server_info": response.json(), 149 | "latency_ms": response.elapsed.total_seconds() * 1000 150 | } 151 | else: 152 | return { 153 | "status": "error", 154 | "message": f"Server returned status code {response.status_code}", 155 | "details": response.text 156 | } 157 | except requests.exceptions.RequestException as e: 158 | return {"status": "error", "message": f"Connection error: {str(e)}"} 159 | 160 | def upload_images(self, image_paths: List[str]) -> Dict[str, Any]: 161 | """ 162 | Upload images to the CUDA MVS server. 163 | 164 | Args: 165 | image_paths: List of paths to images to upload 166 | 167 | Returns: 168 | Dictionary with upload status and job information 169 | """ 170 | if not self.server_url: 171 | return {"status": "error", "message": "No server URL configured"} 172 | 173 | # Create a new job 174 | try: 175 | response = self.session.post( 176 | f"{self.server_url}/api/jobs", 177 | json={"num_images": len(image_paths)}, 178 | timeout=self.connection_timeout 179 | ) 180 | 181 | if response.status_code != 201: 182 | return { 183 | "status": "error", 184 | "message": f"Failed to create job: {response.status_code}", 185 | "details": response.text 186 | } 187 | 188 | job_info = response.json() 189 | job_id = job_info["job_id"] 190 | 191 | # Upload each image 192 | for i, image_path in enumerate(image_paths): 193 | # Check if file exists 194 | if not os.path.exists(image_path): 195 | return { 196 | "status": "error", 197 | "message": f"Image file not found: {image_path}" 198 | } 199 | 200 | # Get file size for progress tracking 201 | file_size = os.path.getsize(image_path) 202 | 203 | # Prepare upload 204 | with open(image_path, "rb") as f: 205 | files = { 206 | "file": (os.path.basename(image_path), f, "image/jpeg" if image_path.endswith(".jpg") else "image/png") 207 | } 208 | 209 | metadata = { 210 | "image_index": i, 211 | "total_images": len(image_paths), 212 | "filename": os.path.basename(image_path) 213 | } 214 | 215 | response = self.session.post( 216 | f"{self.server_url}/api/jobs/{job_id}/images", 217 | files=files, 218 | data={"metadata": json.dumps(metadata)}, 219 | timeout=None # No timeout for uploads 220 | ) 221 | 222 | if response.status_code != 200: 223 | return { 224 | "status": "error", 225 | "message": f"Failed to upload image {i+1}/{len(image_paths)}: {response.status_code}", 226 | "details": response.text 227 | } 228 | 229 | logger.info(f"Uploaded image {i+1}/{len(image_paths)}: {os.path.basename(image_path)}") 230 | 231 | # Start processing 232 | response = self.session.post( 233 | f"{self.server_url}/api/jobs/{job_id}/process", 234 | timeout=self.connection_timeout 235 | ) 236 | 237 | if response.status_code != 202: 238 | return { 239 | "status": "error", 240 | "message": f"Failed to start processing: {response.status_code}", 241 | "details": response.text 242 | } 243 | 244 | return { 245 | "status": "success", 246 | "job_id": job_id, 247 | "message": f"Uploaded {len(image_paths)} images and started processing", 248 | "job_url": f"{self.server_url}/api/jobs/{job_id}" 249 | } 250 | 251 | except requests.exceptions.RequestException as e: 252 | return {"status": "error", "message": f"Upload error: {str(e)}"} 253 | 254 | def get_job_status(self, job_id: str) -> Dict[str, Any]: 255 | """ 256 | Get the status of a CUDA MVS job. 257 | 258 | Args: 259 | job_id: ID of the job to check 260 | 261 | Returns: 262 | Dictionary with job status information 263 | """ 264 | if not self.server_url: 265 | return {"status": "error", "message": "No server URL configured"} 266 | 267 | try: 268 | response = self.session.get( 269 | f"{self.server_url}/api/jobs/{job_id}", 270 | timeout=self.connection_timeout 271 | ) 272 | 273 | if response.status_code == 200: 274 | return { 275 | "status": "success", 276 | "job_info": response.json() 277 | } 278 | else: 279 | return { 280 | "status": "error", 281 | "message": f"Failed to get job status: {response.status_code}", 282 | "details": response.text 283 | } 284 | except requests.exceptions.RequestException as e: 285 | return {"status": "error", "message": f"Connection error: {str(e)}"} 286 | 287 | def download_model(self, job_id: str, output_format: str = "obj") -> Dict[str, Any]: 288 | """ 289 | Download a processed 3D model from the CUDA MVS server. 290 | 291 | Args: 292 | job_id: ID of the job to download 293 | output_format: Format of the model to download (obj, ply, etc.) 294 | 295 | Returns: 296 | Dictionary with download status and local file path 297 | """ 298 | if not self.server_url: 299 | return {"status": "error", "message": "No server URL configured"} 300 | 301 | # Check job status first 302 | status_result = self.get_job_status(job_id) 303 | if status_result["status"] != "success": 304 | return status_result 305 | 306 | job_info = status_result["job_info"] 307 | if job_info["status"] != "completed": 308 | return { 309 | "status": "error", 310 | "message": f"Job is not completed yet. Current status: {job_info['status']}", 311 | "job_info": job_info 312 | } 313 | 314 | # Download the model 315 | try: 316 | response = self.session.get( 317 | f"{self.server_url}/api/jobs/{job_id}/model?format={output_format}", 318 | stream=True, 319 | timeout=None # No timeout for downloads 320 | ) 321 | 322 | if response.status_code != 200: 323 | return { 324 | "status": "error", 325 | "message": f"Failed to download model: {response.status_code}", 326 | "details": response.text 327 | } 328 | 329 | # Create a unique filename 330 | model_id = job_info.get("model_id", str(uuid.uuid4())) 331 | output_path = os.path.join(self.output_dir, f"{model_id}.{output_format}") 332 | 333 | # Save the file 334 | with open(output_path, "wb") as f: 335 | for chunk in response.iter_content(chunk_size=8192): 336 | if chunk: 337 | f.write(chunk) 338 | 339 | logger.info(f"Downloaded model to {output_path}") 340 | 341 | return { 342 | "status": "success", 343 | "model_id": model_id, 344 | "local_path": output_path, 345 | "format": output_format, 346 | "job_id": job_id 347 | } 348 | 349 | except requests.exceptions.RequestException as e: 350 | return {"status": "error", "message": f"Download error: {str(e)}"} 351 | 352 | def cancel_job(self, job_id: str) -> Dict[str, Any]: 353 | """ 354 | Cancel a running CUDA MVS job. 355 | 356 | Args: 357 | job_id: ID of the job to cancel 358 | 359 | Returns: 360 | Dictionary with cancellation status 361 | """ 362 | if not self.server_url: 363 | return {"status": "error", "message": "No server URL configured"} 364 | 365 | try: 366 | response = self.session.delete( 367 | f"{self.server_url}/api/jobs/{job_id}", 368 | timeout=self.connection_timeout 369 | ) 370 | 371 | if response.status_code == 200: 372 | return { 373 | "status": "success", 374 | "message": "Job cancelled successfully" 375 | } 376 | else: 377 | return { 378 | "status": "error", 379 | "message": f"Failed to cancel job: {response.status_code}", 380 | "details": response.text 381 | } 382 | except requests.exceptions.RequestException as e: 383 | return {"status": "error", "message": f"Connection error: {str(e)}"} 384 | 385 | def generate_model_from_images( 386 | self, 387 | image_paths: List[str], 388 | output_format: str = "obj", 389 | wait_for_completion: bool = True, 390 | poll_interval: int = 5 391 | ) -> Dict[str, Any]: 392 | """ 393 | Complete workflow to generate a 3D model from images. 394 | 395 | Args: 396 | image_paths: List of paths to images 397 | output_format: Format of the output model 398 | wait_for_completion: Whether to wait for job completion 399 | poll_interval: Interval in seconds to poll for job status 400 | 401 | Returns: 402 | Dictionary with job status and model information if completed 403 | """ 404 | # Upload images and start processing 405 | upload_result = self.upload_images(image_paths) 406 | if upload_result["status"] != "success": 407 | return upload_result 408 | 409 | job_id = upload_result["job_id"] 410 | 411 | # If not waiting for completion, return the job info 412 | if not wait_for_completion: 413 | return upload_result 414 | 415 | # Poll for job completion 416 | import time 417 | while True: 418 | status_result = self.get_job_status(job_id) 419 | if status_result["status"] != "success": 420 | return status_result 421 | 422 | job_info = status_result["job_info"] 423 | if job_info["status"] == "completed": 424 | # Download the model 425 | return self.download_model(job_id, output_format) 426 | elif job_info["status"] == "failed": 427 | return { 428 | "status": "error", 429 | "message": "Job processing failed", 430 | "job_info": job_info 431 | } 432 | 433 | # Wait before polling again 434 | time.sleep(poll_interval) 435 | logger.info(f"Job {job_id} status: {job_info['status']}, progress: {job_info.get('progress', 0)}%") 436 | ```