This is page 2 of 3. Use http://codebase.md/jhacksman/openscad-mcp-server?page={x} to view the full context. # Directory Structure ``` ├── implementation_plan.md ├── old │ ├── download_sam2_checkpoint.py │ ├── src │ │ ├── ai │ │ │ └── sam_segmentation.py │ │ ├── models │ │ │ └── threestudio_generator.py │ │ └── workflow │ │ └── image_to_model_pipeline.py │ └── test_sam2_segmentation.py ├── README.md ├── requirements.txt ├── rtfmd │ ├── decisions │ │ ├── ai-driven-code-generation.md │ │ └── export-formats.md │ ├── files │ │ └── src │ │ ├── ai │ │ │ └── ai_service.py.md │ │ ├── main.py.md │ │ ├── models │ │ │ └── code_generator.py.md │ │ └── nlp │ │ └── parameter_extractor.py.md │ ├── knowledge │ │ ├── ai │ │ │ └── natural-language-processing.md │ │ ├── nlp │ │ │ └── parameter-extraction.md │ │ └── openscad │ │ ├── export-formats.md │ │ ├── openscad-basics.md │ │ └── primitive-testing.md │ └── README.md ├── scad │ └── simple_cube.scad ├── src │ ├── __init__.py │ ├── __pycache__ │ │ └── __init__.cpython-312.pyc │ ├── ai │ │ ├── ai_service.py │ │ ├── gemini_api.py │ │ └── venice_api.py │ ├── config.py │ ├── main_remote.py │ ├── main.py │ ├── main.py.new │ ├── models │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ └── code_generator.cpython-312.pyc │ │ ├── code_generator.py │ │ ├── cuda_mvs.py │ │ └── scad_templates │ │ └── basic_shapes.scad │ ├── nlp │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ └── parameter_extractor.cpython-312.pyc │ │ └── parameter_extractor.py │ ├── openscad_wrapper │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ └── wrapper.cpython-312.pyc │ │ └── wrapper.py │ ├── printer_discovery │ │ ├── __init__.py │ │ └── printer_discovery.py │ ├── remote │ │ ├── connection_manager.py │ │ ├── cuda_mvs_client.py │ │ ├── cuda_mvs_server.py │ │ └── error_handling.py │ ├── testing │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ ├── primitive_tester.cpython-312.pyc │ │ │ └── test_primitives.cpython-312.pyc │ │ ├── primitive_tester.py │ │ └── test_primitives.py │ ├── utils │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ ├── stl_exporter.cpython-312.pyc │ │ │ └── stl_validator.cpython-312.pyc │ │ ├── cad_exporter.py │ │ ├── format_validator.py │ │ ├── stl_exporter.py │ │ ├── stl_repair.py │ │ └── stl_validator.py │ ├── visualization │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ └── renderer.cpython-312.pyc │ │ ├── headless_renderer.py │ │ ├── renderer.py │ │ └── web_interface.py │ └── workflow │ ├── image_approval.py │ └── multi_view_to_model_pipeline.py ├── test_complete_workflow.py ├── test_cuda_mvs.py ├── test_gemini_api.py ├── test_image_approval_workflow.py ├── test_image_approval.py ├── test_image_to_model_pipeline.py ├── test_model_selection.py ├── test_multi_view_pipeline.py ├── test_primitives.sh ├── test_rabbit_direct.py ├── test_remote_cuda_mvs.py └── test_venice_example.py ``` # Files -------------------------------------------------------------------------------- /test_multi_view_pipeline.py: -------------------------------------------------------------------------------- ```python """ Test script for multi-view to model pipeline. """ import os import sys import logging import unittest from unittest.mock import patch, MagicMock from pathlib import Path # Add the src directory to the path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.workflow.multi_view_to_model_pipeline import MultiViewToModelPipeline from src.ai.gemini_api import GeminiImageGenerator from src.models.cuda_mvs import CUDAMultiViewStereo from src.workflow.image_approval import ImageApprovalTool # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class TestMultiViewPipeline(unittest.TestCase): """ Test cases for multi-view to model pipeline. """ def setUp(self): """ Set up test environment. """ # Create test directories self.test_output_dir = "output/test_pipeline" self.test_images_dir = os.path.join(self.test_output_dir, "images") self.test_models_dir = os.path.join(self.test_output_dir, "models") for directory in [self.test_output_dir, self.test_images_dir, self.test_models_dir]: os.makedirs(directory, exist_ok=True) # Create mock CUDA MVS path self.cuda_mvs_path = "mock_cuda_mvs" os.makedirs(os.path.join(self.cuda_mvs_path, "build"), exist_ok=True) # Create mock executable with open(os.path.join(self.cuda_mvs_path, "build", "app_patch_match_mvs"), "w") as f: f.write("#!/bin/bash\necho 'Mock CUDA MVS'\n") os.chmod(os.path.join(self.cuda_mvs_path, "build", "app_patch_match_mvs"), 0o755) # Create mock components self.mock_gemini = MagicMock(spec=GeminiImageGenerator) self.mock_cuda_mvs = MagicMock(spec=CUDAMultiViewStereo) self.mock_approval = MagicMock(spec=ImageApprovalTool) # Configure mock responses self.configure_mocks() # Create the pipeline with mock components self.pipeline = MultiViewToModelPipeline( gemini_generator=self.mock_gemini, cuda_mvs=self.mock_cuda_mvs, approval_tool=self.mock_approval, output_dir=self.test_output_dir ) def configure_mocks(self): """ Configure mock responses for components. """ # Mock Gemini image generation def mock_generate_image(prompt, **kwargs): image_path = os.path.join(self.test_images_dir, f"{prompt[:10].replace(' ', '_')}.png") with open(image_path, "w") as f: f.write(f"Mock image for {prompt}") return { "prompt": prompt, "local_path": image_path, "image_data": b"mock_image_data" } def mock_generate_multiple_views(prompt, num_views, **kwargs): results = [] for i in range(num_views): image_path = os.path.join(self.test_images_dir, f"view_{i}.png") with open(image_path, "w") as f: f.write(f"Mock image for {prompt} - view {i}") results.append({ "prompt": f"{prompt} - view {i}", "local_path": image_path, "image_data": b"mock_image_data", "view_direction": f"view {i}", "view_index": i + 1 }) return results self.mock_gemini.generate_image.side_effect = mock_generate_image self.mock_gemini.generate_multiple_views.side_effect = mock_generate_multiple_views # Mock CUDA MVS def mock_generate_model(image_paths, **kwargs): model_dir = os.path.join(self.test_models_dir, "mock_model") os.makedirs(model_dir, exist_ok=True) point_cloud_file = os.path.join(model_dir, "mock_model.ply") with open(point_cloud_file, "w") as f: f.write("Mock point cloud") obj_file = os.path.join(model_dir, "mock_model.obj") with open(obj_file, "w") as f: f.write("Mock OBJ file") return { "model_id": "mock_model", "output_dir": model_dir, "point_cloud_file": point_cloud_file, "obj_file": obj_file, "input_images": image_paths } self.mock_cuda_mvs.generate_model_from_images.side_effect = mock_generate_model self.mock_cuda_mvs.convert_ply_to_obj.return_value = os.path.join(self.test_models_dir, "mock_model", "mock_model.obj") # Mock approval tool def mock_present_image(image_path, metadata): return { "approval_id": os.path.basename(image_path).split('.')[0], "image_path": image_path, "image_url": f"/images/{os.path.basename(image_path)}", "metadata": metadata or {} } def mock_process_approval(approval_id, approved, image_path): if approved: approved_path = os.path.join(self.test_output_dir, "approved", os.path.basename(image_path)) os.makedirs(os.path.dirname(approved_path), exist_ok=True) with open(approved_path, "w") as f: f.write(f"Approved image {approval_id}") return { "approval_id": approval_id, "approved": True, "original_path": image_path, "approved_path": approved_path } else: return { "approval_id": approval_id, "approved": False, "original_path": image_path } self.mock_approval.present_image_for_approval.side_effect = mock_present_image self.mock_approval.process_approval.side_effect = mock_process_approval self.mock_approval.get_approved_images.return_value = [ os.path.join(self.test_output_dir, "approved", f"view_{i}.png") for i in range(3) ] def test_generate_model_from_text(self): """ Test generating a 3D model from text prompt. """ # Test parameters prompt = "A low-poly rabbit" num_views = 3 # Mock approvals - approve all images def mock_get_approval(approval_request): return True # Call the method result = self.pipeline.generate_model_from_text( prompt, num_views=num_views, get_approval_callback=mock_get_approval ) # Verify the result self.assertIsNotNone(result) self.assertTrue("model_id" in result) self.assertTrue("obj_file" in result) self.assertTrue("point_cloud_file" in result) # Verify component calls self.mock_gemini.generate_multiple_views.assert_called_once_with( prompt, num_views=num_views, output_dir=os.path.join(self.test_output_dir, "multi_view") ) self.assertEqual(self.mock_approval.present_image_for_approval.call_count, num_views) self.assertEqual(self.mock_approval.process_approval.call_count, num_views) self.mock_cuda_mvs.generate_model_from_images.assert_called_once() self.mock_cuda_mvs.convert_ply_to_obj.assert_called_once() def test_generate_model_from_image(self): """ Test generating a 3D model from a base image. """ # Create a mock base image base_image_path = os.path.join(self.test_images_dir, "base_image.png") with open(base_image_path, "w") as f: f.write("Mock base image") # Test parameters prompt = "A low-poly rabbit based on this image" num_views = 3 # Mock approvals - approve all images def mock_get_approval(approval_request): return True # Call the method result = self.pipeline.generate_model_from_image( base_image_path, prompt, num_views=num_views, get_approval_callback=mock_get_approval ) # Verify the result self.assertIsNotNone(result) self.assertTrue("model_id" in result) self.assertTrue("obj_file" in result) self.assertTrue("point_cloud_file" in result) # Verify component calls self.mock_gemini.generate_multiple_views.assert_called_once_with( prompt, num_views=num_views, base_image_path=base_image_path, output_dir=os.path.join(self.test_output_dir, "multi_view") ) self.assertEqual(self.mock_approval.present_image_for_approval.call_count, num_views) self.assertEqual(self.mock_approval.process_approval.call_count, num_views) self.mock_cuda_mvs.generate_model_from_images.assert_called_once() self.mock_cuda_mvs.convert_ply_to_obj.assert_called_once() def test_selective_approval(self): """ Test selective approval of generated images. """ # Test parameters prompt = "A low-poly rabbit" num_views = 4 # Mock approvals - only approve views 0 and 2 def mock_get_approval(approval_request): view_index = int(approval_request["approval_id"].split('_')[1]) return view_index % 2 == 0 # Approve even-indexed views # Call the method result = self.pipeline.generate_model_from_text( prompt, num_views=num_views, get_approval_callback=mock_get_approval ) # Verify the result self.assertIsNotNone(result) # Verify component calls self.assertEqual(self.mock_approval.present_image_for_approval.call_count, num_views) self.assertEqual(self.mock_approval.process_approval.call_count, num_views) # Only 2 images should be approved and used for model generation approved_images = [call[0][0] for call in self.mock_cuda_mvs.generate_model_from_images.call_args_list] if approved_images: self.assertEqual(len(approved_images[0]), 2) # Only 2 images approved def test_error_handling(self): """ Test error handling in the pipeline. """ # Test parameters prompt = "A low-poly rabbit" # Mock error in Gemini API self.mock_gemini.generate_multiple_views.side_effect = Exception("Mock API error") # Call the method and expect an exception with self.assertRaises(Exception): self.pipeline.generate_model_from_text(prompt) def tearDown(self): """ Clean up after tests. """ # Clean up test output directory import shutil if os.path.exists(self.test_output_dir): shutil.rmtree(self.test_output_dir) # Clean up mock CUDA MVS path if os.path.exists(self.cuda_mvs_path): shutil.rmtree(self.cuda_mvs_path) if __name__ == "__main__": unittest.main() ``` -------------------------------------------------------------------------------- /src/main_remote.py: -------------------------------------------------------------------------------- ```python """ Main module for OpenSCAD MCP Server with remote CUDA MVS processing. This module adds remote CUDA MVS processing capabilities to the MCP server. """ import os import logging from typing import Dict, Any, List, Optional # Import remote processing components from src.remote.cuda_mvs_client import CUDAMVSClient from src.remote.connection_manager import CUDAMVSConnectionManager from src.config import REMOTE_CUDA_MVS # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize remote processing components if enabled remote_connection_manager = None remote_jobs = {} def initialize_remote_processing(): """ Initialize remote CUDA MVS processing components. Returns: CUDAMVSConnectionManager instance if enabled, None otherwise """ global remote_connection_manager if REMOTE_CUDA_MVS["ENABLED"]: logger.info("Initializing remote CUDA MVS connection manager") remote_connection_manager = CUDAMVSConnectionManager( api_key=REMOTE_CUDA_MVS["API_KEY"], discovery_port=REMOTE_CUDA_MVS["DISCOVERY_PORT"], use_lan_discovery=REMOTE_CUDA_MVS["USE_LAN_DISCOVERY"], server_url=REMOTE_CUDA_MVS["SERVER_URL"] if REMOTE_CUDA_MVS["SERVER_URL"] else None ) return remote_connection_manager return None def discover_remote_servers(): """ Discover remote CUDA MVS servers on the network. Returns: List of discovered servers """ if not remote_connection_manager: logger.warning("Remote CUDA MVS processing is not enabled") return [] return remote_connection_manager.discover_servers() def get_server_status(server_id: str): """ Get the status of a remote CUDA MVS server. Args: server_id: ID of the server to get status for Returns: Server status information """ if not remote_connection_manager: logger.warning("Remote CUDA MVS processing is not enabled") return None return remote_connection_manager.get_server_status(server_id) def upload_images_to_server(server_id: str, image_paths: List[str], job_id: Optional[str] = None): """ Upload images to a remote CUDA MVS server. Args: server_id: ID of the server to upload to image_paths: List of image paths to upload job_id: Optional job ID to use Returns: Job information """ if not remote_connection_manager: logger.warning("Remote CUDA MVS processing is not enabled") return None return remote_connection_manager.upload_images(server_id, image_paths, job_id) def process_images_remotely(server_id: str, job_id: str, params: Dict[str, Any] = None): """ Process uploaded images on a remote CUDA MVS server. Args: server_id: ID of the server to process on job_id: Job ID of the uploaded images params: Optional processing parameters Returns: Job status information """ if not remote_connection_manager: logger.warning("Remote CUDA MVS processing is not enabled") return None # Set default parameters if not provided if params is None: params = { "quality": REMOTE_CUDA_MVS["DEFAULT_RECONSTRUCTION_QUALITY"], "output_format": REMOTE_CUDA_MVS["DEFAULT_OUTPUT_FORMAT"] } # Start processing result = remote_connection_manager.process_job(server_id, job_id, params) # Store job information if result and "job_id" in result: remote_jobs[result["job_id"]] = { "server_id": server_id, "job_id": result["job_id"], "status": result.get("status", "processing"), "params": params } return result def get_job_status(job_id: str): """ Get the status of a remote processing job. Args: job_id: ID of the job to get status for Returns: Job status information """ if not remote_connection_manager: logger.warning("Remote CUDA MVS processing is not enabled") return None # Check if job exists if job_id not in remote_jobs: logger.warning(f"Job with ID {job_id} not found") return None # Get job information job_info = remote_jobs[job_id] # Get status from server status = remote_connection_manager.get_job_status(job_info["server_id"], job_id) # Update job information if status: job_info["status"] = status.get("status", job_info["status"]) job_info["progress"] = status.get("progress", 0) job_info["message"] = status.get("message", "") return job_info def download_model(job_id: str, output_dir: Optional[str] = None): """ Download a processed model from a remote CUDA MVS server. Args: job_id: ID of the job to download model for output_dir: Optional directory to save the model to Returns: Model information """ if not remote_connection_manager: logger.warning("Remote CUDA MVS processing is not enabled") return None # Check if job exists if job_id not in remote_jobs: logger.warning(f"Job with ID {job_id} not found") return None # Get job information job_info = remote_jobs[job_id] # Set default output directory if not provided if output_dir is None: output_dir = os.path.join(REMOTE_CUDA_MVS["OUTPUT_DIR"], job_id) # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Download model result = remote_connection_manager.download_model(job_info["server_id"], job_id, output_dir) # Update job information if result: job_info["model_path"] = result.get("model_path") job_info["point_cloud_path"] = result.get("point_cloud_path") job_info["completed"] = True return result def cancel_job(job_id: str): """ Cancel a remote processing job. Args: job_id: ID of the job to cancel Returns: Cancellation result """ if not remote_connection_manager: logger.warning("Remote CUDA MVS processing is not enabled") return None # Check if job exists if job_id not in remote_jobs: logger.warning(f"Job with ID {job_id} not found") return None # Get job information job_info = remote_jobs[job_id] # Cancel job result = remote_connection_manager.cancel_job(job_info["server_id"], job_id) # Update job information if result and result.get("cancelled", False): job_info["status"] = "cancelled" job_info["message"] = "Job cancelled by user" return result # MCP tool functions for remote processing def discover_remote_cuda_mvs_servers(): """ MCP tool function to discover remote CUDA MVS servers. Returns: Dictionary with discovered servers """ servers = discover_remote_servers() return { "servers": servers, "count": len(servers) } def get_remote_server_status(server_id: str): """ MCP tool function to get the status of a remote CUDA MVS server. Args: server_id: ID of the server to get status for Returns: Dictionary with server status """ status = get_server_status(server_id) if not status: raise ValueError(f"Failed to get status for server with ID {server_id}") return status def process_images_with_remote_cuda_mvs( server_id: str, image_paths: List[str], quality: str = REMOTE_CUDA_MVS["DEFAULT_RECONSTRUCTION_QUALITY"], output_format: str = REMOTE_CUDA_MVS["DEFAULT_OUTPUT_FORMAT"], wait_for_completion: bool = REMOTE_CUDA_MVS["WAIT_FOR_COMPLETION"] ): """ MCP tool function to process images with remote CUDA MVS. Args: server_id: ID of the server to process on image_paths: List of image paths to process quality: Reconstruction quality (low, normal, high) output_format: Output format (obj, ply) wait_for_completion: Whether to wait for job completion Returns: Dictionary with job information """ # Upload images upload_result = upload_images_to_server(server_id, image_paths) if not upload_result or "job_id" not in upload_result: raise ValueError("Failed to upload images to server") job_id = upload_result["job_id"] # Process images process_result = process_images_remotely( server_id, job_id, { "quality": quality, "output_format": output_format } ) if not process_result: raise ValueError(f"Failed to process images for job {job_id}") # Wait for completion if requested if wait_for_completion: import time while True: status = get_job_status(job_id) if not status: raise ValueError(f"Failed to get status for job {job_id}") if status["status"] in ["completed", "failed", "cancelled"]: break time.sleep(REMOTE_CUDA_MVS["POLL_INTERVAL"]) if status["status"] == "completed": # Download model download_result = download_model(job_id) if not download_result: raise ValueError(f"Failed to download model for job {job_id}") return { "job_id": job_id, "status": "completed", "model_path": download_result.get("model_path"), "point_cloud_path": download_result.get("point_cloud_path") } else: return { "job_id": job_id, "status": status["status"], "message": status.get("message", "") } # Return job information without waiting return { "job_id": job_id, "status": "processing", "server_id": server_id } def get_remote_job_status(job_id: str): """ MCP tool function to get the status of a remote processing job. Args: job_id: ID of the job to get status for Returns: Dictionary with job status """ status = get_job_status(job_id) if not status: raise ValueError(f"Failed to get status for job with ID {job_id}") return status def download_remote_model(job_id: str, output_dir: Optional[str] = None): """ MCP tool function to download a processed model from a remote CUDA MVS server. Args: job_id: ID of the job to download model for output_dir: Optional directory to save the model to Returns: Dictionary with model information """ result = download_model(job_id, output_dir) if not result: raise ValueError(f"Failed to download model for job with ID {job_id}") return result def cancel_remote_job(job_id: str): """ MCP tool function to cancel a remote processing job. Args: job_id: ID of the job to cancel Returns: Dictionary with cancellation result """ result = cancel_job(job_id) if not result: raise ValueError(f"Failed to cancel job with ID {job_id}") return result ``` -------------------------------------------------------------------------------- /src/models/code_generator.py: -------------------------------------------------------------------------------- ```python import os import logging import uuid from typing import Dict, Any, List, Tuple, Optional logger = logging.getLogger(__name__) class CodeGenerator: """ Generates OpenSCAD code from natural language descriptions and parameters. Implements translation of requirements to OpenSCAD primitives and modules. """ def __init__(self, scad_templates_dir: str, output_dir: str, ai_service=None): """ Initialize the code generator. Args: scad_templates_dir: Directory containing SCAD template files output_dir: Directory to store generated SCAD files ai_service: Optional AI service for enhanced code generation """ self.scad_templates_dir = scad_templates_dir self.output_dir = output_dir self.ai_service = ai_service # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Map of shape types to their corresponding module names self.shape_module_map = { 'cube': 'parametric_cube', 'sphere': 'parametric_sphere', 'cylinder': 'parametric_cylinder', 'box': 'hollow_box', 'rounded_box': 'rounded_box', 'container': 'rounded_container', 'tube': 'tube', 'cone': 'cone', 'wedge': 'wedge', 'rounded_cylinder': 'rounded_cylinder', 'torus': 'torus', 'hexagonal_prism': 'hexagonal_prism', 'text': 'text_3d', 'prism': 'triangular_prism', 'custom': 'custom_shape' } # Parameter mapping from natural language to OpenSCAD parameters self.parameter_map = { 'width': 'width', 'depth': 'depth', 'height': 'height', 'radius': 'radius', 'thickness': 'thickness', 'segments': 'segments', 'center': 'center', 'inner_radius': 'inner_radius', 'outer_radius': 'outer_radius', 'corner_radius': 'corner_radius', 'text': 'text', 'size': 'size', 'font': 'font', 'base_radius': 'base_radius', 'major_radius': 'major_radius', 'minor_radius': 'minor_radius', 'angle': 'angle', 'scale': 'scale', 'resolution': 'resolution' } def generate_code(self, model_type: str, parameters: Dict[str, Any], description: Optional[str] = None) -> str: """ Generate OpenSCAD code for a given model type and parameters. Args: model_type: Type of model to generate parameters: Dictionary of parameters for the model description: Optional natural language description for AI-driven generation Returns: Path to the generated SCAD file """ # Generate a unique ID for the model model_id = str(uuid.uuid4()) scad_file = os.path.join(self.output_dir, f"{model_id}.scad") # Check if we should use AI-driven generation for complex models if model_type == 'custom' and description and self.ai_service: scad_code = self._generate_ai_driven_code(description, parameters) else: # Get the module name for the model type module_name = self.shape_module_map.get(model_type) if not module_name: raise ValueError(f"Unsupported model type: {model_type}") # Map parameters to OpenSCAD parameter names scad_params = self._map_parameters(parameters) # Generate the OpenSCAD code scad_code = self._generate_scad_code(module_name, scad_params) # Write the code to a file with open(scad_file, 'w') as f: f.write(scad_code) logger.info(f"Generated OpenSCAD code: {scad_file}") return scad_file def update_code(self, scad_file: str, parameters: Dict[str, Any]) -> str: """ Update an existing SCAD file with new parameters. Args: scad_file: Path to the SCAD file to update parameters: New parameters to apply Returns: Path to the updated SCAD file """ if not os.path.exists(scad_file): raise FileNotFoundError(f"SCAD file not found: {scad_file}") # Read the existing SCAD file with open(scad_file, 'r') as f: scad_code = f.read() # Determine the module name from the code module_name = None for shape_type, module in self.shape_module_map.items(): if module in scad_code: module_name = module break if not module_name: raise ValueError("Could not determine module name from existing SCAD file") # Map parameters to OpenSCAD parameter names scad_params = self._map_parameters(parameters) # Generate the updated OpenSCAD code updated_code = self._generate_scad_code(module_name, scad_params) # Write the updated code to the file with open(scad_file, 'w') as f: f.write(updated_code) logger.info(f"Updated OpenSCAD code: {scad_file}") return scad_file def combine_models(self, operations: List[Dict[str, Any]]) -> str: """ Combine multiple models using CSG operations. Args: operations: List of operations, each containing: - model_type: Type of model - parameters: Parameters for the model - operation: CSG operation (union, difference, intersection) - transform: Optional transformation to apply Returns: Path to the generated SCAD file """ # Generate a unique ID for the combined model model_id = str(uuid.uuid4()) scad_file = os.path.join(self.output_dir, f"{model_id}.scad") # Include the basic shapes library scad_code = f"""// Combined model include <{os.path.join(self.scad_templates_dir, "basic_shapes.scad")}>; """ # Process each operation current_op = None for i, op in enumerate(operations): model_type = op.get('model_type') parameters = op.get('parameters', {}) operation = op.get('operation') transform = op.get('transform') # Get the module name for the model type if model_type is None: raise ValueError("Model type cannot be None") module_name = self.shape_module_map.get(str(model_type)) if not module_name: raise ValueError(f"Unsupported model type: {model_type}") # Map parameters to OpenSCAD parameter names scad_params = self._map_parameters(parameters) # Format parameters for the module call params_str = ", ".join([f"{k}={v}" for k, v in scad_params.items()]) # Start or continue the CSG operation chain if i == 0: # First operation doesn't need an operator if operation: current_op = operation scad_code += f"{operation}() {{\n" # Add the module call with optional transformation if transform: scad_code += f" {transform} {module_name}({params_str});\n" else: scad_code += f" {module_name}({params_str});\n" else: # Check if we need to close the previous operation and start a new one if operation and operation != current_op: if current_op: scad_code += "}\n\n" current_op = operation scad_code += f"{operation}() {{\n" # Add the module call with optional transformation if transform: scad_code += f" {transform} {module_name}({params_str});\n" else: scad_code += f" {module_name}({params_str});\n" # Close the final operation if needed if current_op: scad_code += "}\n" # Write the code to a file with open(scad_file, 'w') as f: f.write(scad_code) logger.info(f"Generated combined OpenSCAD code: {scad_file}") return scad_file def _map_parameters(self, parameters: Dict[str, Any]) -> Dict[str, Any]: """Map natural language parameters to OpenSCAD parameters.""" scad_params = {} for param, value in parameters.items(): # Map the parameter name if it exists in the mapping scad_param = self.parameter_map.get(param, param) # Format the value appropriately for OpenSCAD if isinstance(value, bool): scad_params[scad_param] = str(value).lower() elif isinstance(value, str): if value.lower() == 'true' or value.lower() == 'false': scad_params[scad_param] = value.lower() else: # For text parameters, add quotes scad_params[scad_param] = f'"{value}"' else: scad_params[scad_param] = value return scad_params def _generate_scad_code(self, module_name: str, parameters: Dict[str, Any]) -> str: """Generate OpenSCAD code for a module with parameters.""" # Include the basic shapes library scad_code = f"""// Generated OpenSCAD code include <{os.path.join(self.scad_templates_dir, "basic_shapes.scad")}>; // Parameters """ # Add parameter declarations for param, value in parameters.items(): scad_code += f"{param} = {value};\n" # Add the module call scad_code += f"\n// Model\n{module_name}(" # Add parameters to the module call param_list = [f"{param}={param}" for param in parameters.keys()] scad_code += ", ".join(param_list) scad_code += ");\n" return scad_code def _generate_ai_driven_code(self, description: str, parameters: Dict[str, Any]) -> str: """ Generate OpenSCAD code using AI-driven techniques based on natural language description. Args: description: Natural language description of the model parameters: Dictionary of parameters for the model Returns: Generated OpenSCAD code """ if not self.ai_service: logger.warning("AI service not available, falling back to basic shape generation") # Fall back to a basic cube if AI service is not available return self._generate_scad_code('parametric_cube', {'width': 10, 'height': 10, 'depth': 10}) try: # Use the AI service to generate OpenSCAD code logger.info(f"Generating OpenSCAD code from description: {description}") # Prepare context for the AI service context = { "description": description, "parameters": parameters, "templates_dir": self.scad_templates_dir } # Call the AI service to generate code scad_code = self.ai_service.generate_openscad_code(context) # Ensure the code includes the basic shapes library if "include <" not in scad_code: scad_code = f"""// AI-generated OpenSCAD code include <{os.path.join(self.scad_templates_dir, "basic_shapes.scad")}>; {scad_code} """ return scad_code except Exception as e: logger.error(f"Error generating AI-driven code: {e}") # Fall back to a basic shape if there's an error return self._generate_scad_code('parametric_cube', {'width': 10, 'height': 10, 'depth': 10}) ``` -------------------------------------------------------------------------------- /test_complete_workflow.py: -------------------------------------------------------------------------------- ```python """ Test script for the complete workflow from text to 3D model. """ import os import sys import logging import unittest from unittest.mock import patch, MagicMock from pathlib import Path # Add the src directory to the path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.ai.gemini_api import GeminiImageGenerator from src.workflow.image_approval import ImageApprovalManager from src.models.cuda_mvs import CUDAMultiViewStereo from src.workflow.multi_view_to_model_pipeline import MultiViewToModelPipeline from src.config import MULTI_VIEW_PIPELINE, IMAGE_APPROVAL, REMOTE_CUDA_MVS # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class TestCompleteWorkflow(unittest.TestCase): """ Test cases for the complete workflow from text to 3D model. """ def setUp(self): """ Set up test environment. """ # Create test output directories self.test_output_dir = "output/test_complete_workflow" self.test_images_dir = os.path.join(self.test_output_dir, "images") self.test_multi_view_dir = os.path.join(self.test_output_dir, "multi_view") self.test_approved_dir = os.path.join(self.test_output_dir, "approved") self.test_models_dir = os.path.join(self.test_output_dir, "models") os.makedirs(self.test_output_dir, exist_ok=True) os.makedirs(self.test_images_dir, exist_ok=True) os.makedirs(self.test_multi_view_dir, exist_ok=True) os.makedirs(self.test_approved_dir, exist_ok=True) os.makedirs(self.test_models_dir, exist_ok=True) # Mock API key self.api_key = "test_api_key" # Create the components self.image_generator = GeminiImageGenerator( api_key=self.api_key, output_dir=self.test_images_dir ) self.approval_manager = ImageApprovalManager( output_dir=self.test_multi_view_dir, approved_dir=self.test_approved_dir, min_approved_images=3, auto_approve=False ) self.cuda_mvs = CUDAMultiViewStereo( output_dir=self.test_models_dir, use_gpu=False ) # Create the pipeline self.pipeline = MultiViewToModelPipeline( image_generator=self.image_generator, approval_manager=self.approval_manager, model_generator=self.cuda_mvs, output_dir=self.test_output_dir, config=MULTI_VIEW_PIPELINE ) @patch('src.ai.gemini_api.requests.post') def test_generate_images_from_text(self, mock_post): """ Test generating images from text description. """ # Mock response mock_response = MagicMock() mock_response.status_code = 200 mock_response.json.return_value = { "candidates": [ { "content": { "parts": [ { "text": "Generated image description" }, { "inlineData": { "mimeType": "image/png", "data": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8BQDwAEhQGAhKmMIQAAAABJRU5ErkJggg==" } } ] } } ] } mock_post.return_value = mock_response # Test parameters prompt = "A low-poly rabbit" num_views = 4 # Call the method results = self.pipeline.generate_images_from_text(prompt, num_views) # Verify the results self.assertEqual(len(results), num_views) for i, result in enumerate(results): self.assertTrue("view_direction" in result) self.assertEqual(result["view_index"], i + 1) self.assertTrue("local_path" in result) self.assertTrue(os.path.exists(result["local_path"])) # Verify the API calls self.assertEqual(mock_post.call_count, num_views) def test_approve_images(self): """ Test approving images in the workflow. """ # Create test images test_images = [] for i in range(4): image_path = os.path.join(self.test_multi_view_dir, f"test_image_{i}.png") with open(image_path, "w") as f: f.write(f"test image data {i}") test_images.append({ "id": f"image_{i}", "local_path": image_path, "view_index": i + 1, "view_direction": f"view_{i}" }) # Add images to the pipeline self.pipeline.add_images_for_approval(test_images) # Approve some images self.pipeline.approve_image("image_0") self.pipeline.approve_image("image_1") self.pipeline.approve_image("image_2") # Reject an image self.pipeline.reject_image("image_3") # Get the status status = self.pipeline.get_approval_status() # Verify the status self.assertEqual(status["total_images"], 4) self.assertEqual(status["pending_count"], 0) self.assertEqual(status["approved_count"], 3) self.assertEqual(status["rejected_count"], 1) self.assertTrue(status["has_minimum_approved"]) @patch('src.models.cuda_mvs.subprocess.run') def test_create_model_from_approved_images(self, mock_run): """ Test creating a 3D model from approved images. """ # Mock subprocess.run mock_process = MagicMock() mock_process.returncode = 0 mock_run.return_value = mock_process # Create test images test_images = [] for i in range(4): image_path = os.path.join(self.test_approved_dir, f"image_{i}.png") with open(image_path, "w") as f: f.write(f"test image data {i}") test_images.append({ "id": f"image_{i}", "local_path": image_path, "view_index": i + 1, "view_direction": f"view_{i}" }) # Create a mock model file model_path = os.path.join(self.test_models_dir, "test_model.obj") with open(model_path, "w") as f: f.write("test model data") # Call the method result = self.pipeline.create_model_from_approved_images("test_model") # Verify the result self.assertIsNotNone(result) self.assertTrue("model_path" in result) self.assertTrue("model_id" in result) self.assertTrue("format" in result) self.assertEqual(result["format"], "obj") @patch('src.ai.gemini_api.requests.post') @patch('src.models.cuda_mvs.subprocess.run') def test_complete_workflow(self, mock_run, mock_post): """ Test the complete workflow from text to 3D model. """ # Mock Gemini API response mock_response = MagicMock() mock_response.status_code = 200 mock_response.json.return_value = { "candidates": [ { "content": { "parts": [ { "text": "Generated image description" }, { "inlineData": { "mimeType": "image/png", "data": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8BQDwAEhQGAhKmMIQAAAABJRU5ErkJggg==" } } ] } } ] } mock_post.return_value = mock_response # Mock subprocess.run mock_process = MagicMock() mock_process.returncode = 0 mock_run.return_value = mock_process # Create a mock model file model_path = os.path.join(self.test_models_dir, "test_model.obj") with open(model_path, "w") as f: f.write("test model data") # Create a pipeline with auto-approve auto_pipeline = MultiViewToModelPipeline( image_generator=self.image_generator, approval_manager=ImageApprovalManager( output_dir=self.test_multi_view_dir, approved_dir=self.test_approved_dir, min_approved_images=3, auto_approve=True ), model_generator=self.cuda_mvs, output_dir=self.test_output_dir, config=MULTI_VIEW_PIPELINE ) # Test parameters prompt = "A low-poly rabbit" num_views = 4 # Call the complete workflow result = auto_pipeline.complete_workflow(prompt, num_views, "test_model") # Verify the result self.assertIsNotNone(result) self.assertTrue("model_path" in result) self.assertTrue("model_id" in result) self.assertTrue("format" in result) self.assertEqual(result["format"], "obj") self.assertTrue("prompt" in result) self.assertEqual(result["prompt"], prompt) self.assertTrue("num_views" in result) self.assertEqual(result["num_views"], num_views) self.assertTrue("approved_images" in result) self.assertEqual(len(result["approved_images"]), num_views) @patch('src.remote.cuda_mvs_client.requests.post') @patch('src.remote.cuda_mvs_client.requests.get') def test_remote_workflow(self, mock_get, mock_post): """ Test the workflow with remote CUDA MVS processing. """ # Mock upload response mock_upload_response = MagicMock() mock_upload_response.status_code = 200 mock_upload_response.json.return_value = { "job_id": "test_job_123", "status": "uploaded", "message": "Images uploaded successfully" } # Mock process response mock_process_response = MagicMock() mock_process_response.status_code = 200 mock_process_response.json.return_value = { "job_id": "test_job_123", "status": "processing", "message": "Job started processing" } # Mock status response mock_status_response = MagicMock() mock_status_response.status_code = 200 mock_status_response.json.return_value = { "job_id": "test_job_123", "status": "completed", "progress": 100, "message": "Job completed successfully" } # Mock download response mock_download_response = MagicMock() mock_download_response.status_code = 200 mock_download_response.content = b"test model data" # Set up the mock responses mock_post.side_effect = [mock_upload_response, mock_process_response] mock_get.side_effect = [mock_status_response, mock_download_response] # Create test images test_images = [] for i in range(4): image_path = os.path.join(self.test_approved_dir, f"image_{i}.png") with open(image_path, "w") as f: f.write(f"test image data {i}") test_images.append({ "id": f"image_{i}", "local_path": image_path, "view_index": i + 1, "view_direction": f"view_{i}" }) # Create a remote CUDA MVS client from src.remote.cuda_mvs_client import CUDAMVSClient remote_client = CUDAMVSClient( api_key=self.api_key, output_dir=self.test_models_dir ) # Create a pipeline with the remote client remote_pipeline = MultiViewToModelPipeline( image_generator=self.image_generator, approval_manager=self.approval_manager, model_generator=remote_client, output_dir=self.test_output_dir, config=MULTI_VIEW_PIPELINE ) # Add the approved images remote_pipeline.add_images_for_approval(test_images) for image in test_images: remote_pipeline.approve_image(image["id"]) # Call the method to create a model using the remote client with patch('src.workflow.multi_view_to_model_pipeline.CUDAMVSClient', return_value=remote_client): result = remote_pipeline.create_model_from_approved_images("test_model", server_url="http://test-server:8765") # Verify the result self.assertIsNotNone(result) self.assertTrue("model_path" in result) self.assertTrue("model_id" in result) self.assertTrue("format" in result) self.assertEqual(result["format"], "obj") self.assertTrue("job_id" in result) self.assertEqual(result["job_id"], "test_job_123") def tearDown(self): """ Clean up after tests. """ # Clean up test output directory import shutil if os.path.exists(self.test_output_dir): shutil.rmtree(self.test_output_dir) if __name__ == "__main__": unittest.main() ``` -------------------------------------------------------------------------------- /src/workflow/multi_view_to_model_pipeline.py: -------------------------------------------------------------------------------- ```python """ Workflow orchestration for the multi-view to model pipeline. """ import os import logging import uuid from typing import Dict, Any, List, Optional from pathlib import Path logger = logging.getLogger(__name__) class MultiViewToModelPipeline: """ Orchestrates the workflow from image or text prompt to 3D model: 1. Generate image with Venice.ai or Google Gemini (optional) 2. Generate multiple views with Google Gemini 3. Process user approval of views 4. Create 3D model with CUDA Multi-View Stereo 5. Convert to OpenSCAD for parametric editing (optional) """ def __init__(self, gemini_generator=None, venice_generator=None, cuda_mvs=None, openscad_wrapper=None, approval_tool=None, output_dir: str = "output/pipeline"): """ Initialize the pipeline. Args: gemini_generator: Instance of GeminiImageGenerator venice_generator: Instance of VeniceImageGenerator (optional) cuda_mvs: Instance of CUDAMultiViewStereo openscad_wrapper: Instance of OpenSCADWrapper (optional) approval_tool: Instance of ImageApprovalTool output_dir: Directory to store output files """ self.gemini_generator = gemini_generator self.venice_generator = venice_generator self.cuda_mvs = cuda_mvs self.openscad_wrapper = openscad_wrapper self.approval_tool = approval_tool self.output_dir = output_dir # Create output directories os.makedirs(os.path.join(output_dir, "images"), exist_ok=True) os.makedirs(os.path.join(output_dir, "multi_view"), exist_ok=True) os.makedirs(os.path.join(output_dir, "approved"), exist_ok=True) os.makedirs(os.path.join(output_dir, "models"), exist_ok=True) os.makedirs(os.path.join(output_dir, "scad"), exist_ok=True) def generate_model_from_text(self, prompt: str, use_venice: bool = False, num_views: int = 4, gemini_params: Optional[Dict[str, Any]] = None, venice_params: Optional[Dict[str, Any]] = None, cuda_mvs_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Generate a 3D model from a text prompt. Args: prompt: Text description for image generation use_venice: Whether to use Venice.ai for initial image num_views: Number of views to generate gemini_params: Optional parameters for Google Gemini venice_params: Optional parameters for Venice.ai cuda_mvs_params: Optional parameters for CUDA MVS Returns: Dictionary containing paths to generated files and metadata """ try: # Generate a unique ID for this pipeline run pipeline_id = str(uuid.uuid4()) logger.info(f"Starting pipeline {pipeline_id} for prompt: {prompt}") # Step 1: Generate initial image if use_venice and self.venice_generator: # Use Venice.ai for initial image logger.info("Using Venice.ai for initial image generation") image_path = os.path.join(self.output_dir, "images", f"{pipeline_id}_venice.png") initial_result = self.venice_generator.generate_image( prompt=prompt, output_path=image_path, **(venice_params or {}) ) else: # Use Google Gemini for initial image logger.info("Using Google Gemini for initial image generation") image_path = os.path.join(self.output_dir, "images", f"{pipeline_id}_gemini.png") initial_result = self.gemini_generator.generate_image( prompt=prompt, output_path=image_path, **(gemini_params or {}) ) # Step 2: Generate multiple views logger.info(f"Generating {num_views} views with Google Gemini") multi_view_dir = os.path.join(self.output_dir, "multi_view", pipeline_id) multi_views = self.gemini_generator.generate_multiple_views( prompt=prompt, num_views=num_views, base_image_path=image_path, output_dir=multi_view_dir ) # Step 3: Present images for approval # In a real implementation, this would be handled by the MCP client # through the MCP tools interface logger.info("Preparing images for approval") approval_requests = [] for view in multi_views: approval_request = self.approval_tool.present_image_for_approval( image_path=view["local_path"], metadata={ "prompt": view.get("prompt"), "view_direction": view.get("view_direction"), "view_index": view.get("view_index") } ) approval_requests.append(approval_request) # For the purpose of this implementation, we'll assume all views are approved # In a real implementation, this would be handled by the MCP client approved_images = [] for req in approval_requests: approval_result = self.approval_tool.process_approval( approval_id=req["approval_id"], approved=True, image_path=req["image_path"] ) if approval_result["approved"]: approved_images.append(approval_result["approved_path"]) # Step 4: Generate 3D model with CUDA MVS logger.info("Generating 3D model with CUDA MVS") model_result = self.cuda_mvs.generate_model_from_images( image_paths=approved_images, output_name=pipeline_id, **(cuda_mvs_params or {}) ) # Step 5: Convert to OpenSCAD (if wrapper is available) scad_result = None if self.openscad_wrapper and model_result.get("point_cloud_file"): logger.info("Converting to OpenSCAD") scad_result = self._convert_to_openscad(model_result["point_cloud_file"], pipeline_id) # Compile results result = { "pipeline_id": pipeline_id, "prompt": prompt, "initial_image": initial_result, "multi_views": multi_views, "approved_images": approved_images, "model_3d": model_result, } if scad_result: result["openscad"] = scad_result logger.info(f"Pipeline {pipeline_id} completed successfully") return result except Exception as e: logger.error(f"Error in pipeline: {str(e)}") raise def generate_model_from_image(self, image_path: str, prompt: Optional[str] = None, num_views: int = 4, gemini_params: Optional[Dict[str, Any]] = None, cuda_mvs_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Generate a 3D model from an existing image. Args: image_path: Path to input image prompt: Optional text description to guide multi-view generation num_views: Number of views to generate gemini_params: Optional parameters for Google Gemini cuda_mvs_params: Optional parameters for CUDA MVS Returns: Dictionary containing paths to generated files and metadata """ try: # Generate a unique ID for this pipeline run pipeline_id = str(uuid.uuid4()) logger.info(f"Starting pipeline {pipeline_id} from image: {image_path}") # Use provided prompt or generate one from the image if not prompt: # In a real implementation, you might use an image captioning model # to generate a description of the image prompt = f"3D object in the image {os.path.basename(image_path)}" # Step 1: Generate multiple views logger.info(f"Generating {num_views} views with Google Gemini") multi_view_dir = os.path.join(self.output_dir, "multi_view", pipeline_id) multi_views = self.gemini_generator.generate_multiple_views( prompt=prompt, num_views=num_views, base_image_path=image_path, output_dir=multi_view_dir ) # Step 2: Present images for approval logger.info("Preparing images for approval") approval_requests = [] for view in multi_views: approval_request = self.approval_tool.present_image_for_approval( image_path=view["local_path"], metadata={ "prompt": view.get("prompt"), "view_direction": view.get("view_direction"), "view_index": view.get("view_index") } ) approval_requests.append(approval_request) # For the purpose of this implementation, we'll assume all views are approved approved_images = [] for req in approval_requests: approval_result = self.approval_tool.process_approval( approval_id=req["approval_id"], approved=True, image_path=req["image_path"] ) if approval_result["approved"]: approved_images.append(approval_result["approved_path"]) # Step 3: Generate 3D model with CUDA MVS logger.info("Generating 3D model with CUDA MVS") model_result = self.cuda_mvs.generate_model_from_images( image_paths=approved_images, output_name=pipeline_id, **(cuda_mvs_params or {}) ) # Step 4: Convert to OpenSCAD (if wrapper is available) scad_result = None if self.openscad_wrapper and model_result.get("point_cloud_file"): logger.info("Converting to OpenSCAD") scad_result = self._convert_to_openscad(model_result["point_cloud_file"], pipeline_id) # Compile results result = { "pipeline_id": pipeline_id, "prompt": prompt, "input_image": image_path, "multi_views": multi_views, "approved_images": approved_images, "model_3d": model_result, } if scad_result: result["openscad"] = scad_result logger.info(f"Pipeline {pipeline_id} completed successfully") return result except Exception as e: logger.error(f"Error in pipeline: {str(e)}") raise def _convert_to_openscad(self, model_path: str, model_id: str) -> Dict[str, Any]: """ Convert 3D model to OpenSCAD format. Args: model_path: Path to input model (PLY file) model_id: Unique identifier for the model Returns: Dictionary containing paths to generated files """ logger.info(f"Converting model to OpenSCAD: {model_path}") # Convert PLY to OBJ if needed if model_path.endswith('.ply'): obj_path = self.cuda_mvs.convert_ply_to_obj(model_path) model_path = obj_path # Generate OpenSCAD code for importing the model scad_code = f"""// Generated OpenSCAD code for model {model_id} // Imported from {os.path.basename(model_path)} // Parameters scale_factor = 1.0; position_x = 0; position_y = 0; position_z = 0; rotation_x = 0; rotation_y = 0; rotation_z = 0; // Import and transform the model translate([position_x, position_y, position_z]) rotate([rotation_x, rotation_y, rotation_z]) scale(scale_factor) import("{model_path}"); """ # Save SCAD code to file scad_file = self.openscad_wrapper.generate_scad(scad_code, model_id) # Generate previews previews = self.openscad_wrapper.generate_multi_angle_previews(scad_file) return { "scad_file": scad_file, "previews": previews, "model_path": model_path } def process_approval_results(self, approval_results: List[Dict[str, Any]]) -> List[str]: """ Process approval results from the MCP client. Args: approval_results: List of approval results from the client Returns: List of paths to approved images """ approved_images = [] for result in approval_results: if result.get("approved", False) and "approved_path" in result: approved_images.append(result["approved_path"]) return approved_images ``` -------------------------------------------------------------------------------- /src/remote/error_handling.py: -------------------------------------------------------------------------------- ```python """ Error handling and retry mechanisms for remote CUDA MVS processing. This module provides utilities for handling network errors, implementing retry mechanisms, and ensuring robust communication with remote CUDA MVS servers. """ import time import random import logging import functools from typing import Callable, Any, Type, Union, List, Dict, Optional, Tuple import requests # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Define common exception types for network operations NETWORK_EXCEPTIONS = ( requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.HTTPError, requests.exceptions.RequestException, ConnectionRefusedError, TimeoutError, ) def retry_with_backoff( max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 60.0, exception_types: Tuple[Type[Exception], ...] = NETWORK_EXCEPTIONS, jitter_factor: float = 0.1, logger_instance: Optional[logging.Logger] = None ) -> Callable: """ Decorator for retrying a function with exponential backoff. Args: max_retries: Maximum number of retries base_delay: Base delay in seconds max_delay: Maximum delay in seconds exception_types: Tuple of exception types to catch and retry jitter_factor: Factor for random jitter (0.0 to 1.0) logger_instance: Logger instance to use (uses module logger if None) Returns: Decorator function """ log = logger_instance or logger def decorator(func: Callable) -> Callable: @functools.wraps(func) def wrapper(*args, **kwargs) -> Any: retries = 0 last_exception = None while True: try: return func(*args, **kwargs) except exception_types as e: retries += 1 last_exception = e if retries > max_retries: log.error(f"Max retries ({max_retries}) exceeded: {str(e)}") raise # Calculate delay with exponential backoff delay = min(max_delay, base_delay * (2 ** (retries - 1))) # Add jitter to avoid thundering herd problem jitter = random.uniform(0, jitter_factor * delay) sleep_time = delay + jitter log.warning(f"Retry {retries}/{max_retries} after {sleep_time:.2f}s: {str(e)}") time.sleep(sleep_time) except Exception as e: # Don't retry other exceptions log.error(f"Non-retryable exception: {str(e)}") raise return wrapper return decorator def timeout_handler( timeout: float, default_value: Any = None, exception_types: Tuple[Type[Exception], ...] = (TimeoutError, requests.exceptions.Timeout), logger_instance: Optional[logging.Logger] = None ) -> Callable: """ Decorator for handling timeouts in function calls. Args: timeout: Timeout in seconds default_value: Value to return if timeout occurs exception_types: Tuple of exception types to catch as timeouts logger_instance: Logger instance to use (uses module logger if None) Returns: Decorator function """ log = logger_instance or logger def decorator(func: Callable) -> Callable: @functools.wraps(func) def wrapper(*args, **kwargs) -> Any: import signal def timeout_handler(signum, frame): raise TimeoutError(f"Function {func.__name__} timed out after {timeout} seconds") # Set timeout using signal original_handler = signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(int(timeout)) try: return func(*args, **kwargs) except exception_types as e: log.warning(f"Timeout in {func.__name__}: {str(e)}") return default_value finally: # Reset signal handler and alarm signal.signal(signal.SIGALRM, original_handler) signal.alarm(0) return wrapper return decorator class NetworkErrorTracker: """ Tracks network errors and provides information about error patterns. This class helps identify persistent network issues and can be used to make decisions about server availability. """ def __init__( self, error_window: int = 10, error_threshold: float = 0.5, reset_after: int = 100 ): """ Initialize the error tracker. Args: error_window: Number of recent requests to consider error_threshold: Error rate threshold to consider a server problematic reset_after: Number of successful requests after which to reset error count """ self.error_window = error_window self.error_threshold = error_threshold self.reset_after = reset_after self.requests = [] # List of (timestamp, success) tuples self.consecutive_successes = 0 self.consecutive_failures = 0 def record_request(self, success: bool) -> None: """ Record the result of a request. Args: success: Whether the request was successful """ timestamp = time.time() self.requests.append((timestamp, success)) # Trim old requests outside the window self._trim_old_requests() # Update consecutive counters if success: self.consecutive_successes += 1 self.consecutive_failures = 0 # Reset error count after enough consecutive successes if self.consecutive_successes >= self.reset_after: self.requests = [(timestamp, True)] self.consecutive_successes = 1 else: self.consecutive_failures += 1 self.consecutive_successes = 0 def _trim_old_requests(self) -> None: """ Remove requests that are outside the current window. """ if len(self.requests) > self.error_window: self.requests = self.requests[-self.error_window:] def get_error_rate(self) -> float: """ Get the current error rate. Returns: Error rate as a float between 0.0 and 1.0 """ if not self.requests: return 0.0 failures = sum(1 for _, success in self.requests if not success) return failures / len(self.requests) def is_server_problematic(self) -> bool: """ Check if the server is experiencing persistent issues. Returns: True if the server is problematic, False otherwise """ return self.get_error_rate() >= self.error_threshold def get_status(self) -> Dict[str, Any]: """ Get the current status of the error tracker. Returns: Dictionary with status information """ return { "error_rate": self.get_error_rate(), "is_problematic": self.is_server_problematic(), "consecutive_successes": self.consecutive_successes, "consecutive_failures": self.consecutive_failures, "total_requests": len(self.requests), "recent_failures": sum(1 for _, success in self.requests if not success) } class CircuitBreaker: """ Circuit breaker pattern implementation for network requests. This class helps prevent cascading failures by stopping requests to a problematic server until it recovers. """ # Circuit states CLOSED = "closed" # Normal operation OPEN = "open" # No requests allowed HALF_OPEN = "half_open" # Testing if service is back def __init__( self, failure_threshold: int = 5, recovery_timeout: float = 30.0, reset_timeout: float = 60.0, logger_instance: Optional[logging.Logger] = None ): """ Initialize the circuit breaker. Args: failure_threshold: Number of consecutive failures before opening recovery_timeout: Time in seconds before testing recovery reset_timeout: Time in seconds before fully resetting logger_instance: Logger instance to use (uses module logger if None) """ self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.reset_timeout = reset_timeout self.log = logger_instance or logger self.state = self.CLOSED self.failure_count = 0 self.last_failure_time = 0 self.last_success_time = 0 def record_success(self) -> None: """ Record a successful request. """ self.last_success_time = time.time() if self.state == self.HALF_OPEN: self.log.info("Circuit breaker reset to closed state after successful test request") self.state = self.CLOSED self.failure_count = 0 elif self.state == self.CLOSED: # Reset failure count after a successful request self.failure_count = 0 def record_failure(self) -> None: """ Record a failed request. """ self.last_failure_time = time.time() if self.state == self.CLOSED: self.failure_count += 1 if self.failure_count >= self.failure_threshold: self.log.warning(f"Circuit breaker opened after {self.failure_count} consecutive failures") self.state = self.OPEN elif self.state == self.HALF_OPEN: self.log.warning("Circuit breaker opened again after failed test request") self.state = self.OPEN def allow_request(self) -> bool: """ Check if a request should be allowed. Returns: True if the request should be allowed, False otherwise """ if self.state == self.CLOSED: return True if self.state == self.OPEN: # Check if recovery timeout has elapsed if time.time() - self.last_failure_time > self.recovery_timeout: self.log.info("Circuit breaker entering half-open state to test service") self.state = self.HALF_OPEN return True return False # In HALF_OPEN state, allow only one request return True def reset(self) -> None: """ Reset the circuit breaker to closed state. """ self.state = self.CLOSED self.failure_count = 0 self.log.info("Circuit breaker manually reset to closed state") def get_status(self) -> Dict[str, Any]: """ Get the current status of the circuit breaker. Returns: Dictionary with status information """ now = time.time() return { "state": self.state, "failure_count": self.failure_count, "time_since_last_failure": now - self.last_failure_time if self.last_failure_time > 0 else None, "time_since_last_success": now - self.last_success_time if self.last_success_time > 0 else None, "recovery_timeout": self.recovery_timeout, "reset_timeout": self.reset_timeout } def safe_request( url: str, method: str = "GET", circuit_breaker: Optional[CircuitBreaker] = None, error_tracker: Optional[NetworkErrorTracker] = None, retry_count: int = 3, timeout: float = 30.0, **kwargs ) -> Optional[requests.Response]: """ Make a safe HTTP request with circuit breaker and retry logic. Args: url: URL to request method: HTTP method (GET, POST, etc.) circuit_breaker: Circuit breaker instance error_tracker: Error tracker instance retry_count: Number of retries timeout: Request timeout in seconds **kwargs: Additional arguments for requests Returns: Response object or None if request failed """ # Check circuit breaker if circuit_breaker and not circuit_breaker.allow_request(): logger.warning(f"Circuit breaker prevented request to {url}") return None # Set default timeout kwargs.setdefault("timeout", timeout) # Make request with retry response = None success = False try: for attempt in range(retry_count + 1): try: response = requests.request(method, url, **kwargs) response.raise_for_status() success = True break except NETWORK_EXCEPTIONS as e: if attempt < retry_count: delay = 2 ** attempt + random.uniform(0, 1) logger.warning(f"Request to {url} failed (attempt {attempt+1}/{retry_count+1}): {str(e)}. Retrying in {delay:.2f}s") time.sleep(delay) else: logger.error(f"Request to {url} failed after {retry_count+1} attempts: {str(e)}") raise except Exception as e: logger.error(f"Error making request to {url}: {str(e)}") success = False # Update circuit breaker and error tracker if circuit_breaker: if success: circuit_breaker.record_success() else: circuit_breaker.record_failure() if error_tracker: error_tracker.record_request(success) return response if success else None ``` -------------------------------------------------------------------------------- /src/openscad_wrapper/wrapper.py: -------------------------------------------------------------------------------- ```python import os import subprocess import uuid import logging from typing import Dict, Any, List, Tuple, Optional logger = logging.getLogger(__name__) class OpenSCADWrapper: """ Wrapper for OpenSCAD command-line interface. Provides methods to generate SCAD code, STL files, and preview images. """ def __init__(self, scad_dir: str, output_dir: str): """ Initialize the OpenSCAD wrapper. Args: scad_dir: Directory to store SCAD files output_dir: Directory to store output files (STL, PNG) """ self.scad_dir = scad_dir self.output_dir = output_dir self.stl_dir = os.path.join(output_dir, "stl") self.preview_dir = os.path.join(output_dir, "preview") # Create directories if they don't exist os.makedirs(self.scad_dir, exist_ok=True) os.makedirs(self.stl_dir, exist_ok=True) os.makedirs(self.preview_dir, exist_ok=True) # Basic shape templates self.shape_templates = { "cube": self._cube_template, "sphere": self._sphere_template, "cylinder": self._cylinder_template, "box": self._box_template, "rounded_box": self._rounded_box_template, } def generate_scad_code(self, model_type: str, parameters: Dict[str, Any]) -> str: """ Generate OpenSCAD code for a given model type and parameters. Args: model_type: Type of model to generate (cube, sphere, cylinder, etc.) parameters: Dictionary of parameters for the model Returns: Path to the generated SCAD file """ model_id = str(uuid.uuid4()) scad_file = os.path.join(self.scad_dir, f"{model_id}.scad") # Get the template function for the model type template_func = self.shape_templates.get(model_type) if not template_func: raise ValueError(f"Unsupported model type: {model_type}") # Generate SCAD code using the template scad_code = template_func(parameters) # Write SCAD code to file with open(scad_file, 'w') as f: f.write(scad_code) logger.info(f"Generated SCAD file: {scad_file}") return scad_file def generate_scad(self, scad_code: str, model_id: str) -> str: """ Save OpenSCAD code to a file with a specific model ID. Args: scad_code: OpenSCAD code to save model_id: ID to use for the file name Returns: Path to the saved SCAD file """ scad_file = os.path.join(self.scad_dir, f"{model_id}.scad") # Write SCAD code to file with open(scad_file, 'w') as f: f.write(scad_code) logger.info(f"Generated SCAD file: {scad_file}") return scad_file def update_scad_code(self, model_id: str, parameters: Dict[str, Any]) -> str: """ Update an existing SCAD file with new parameters. Args: model_id: ID of the model to update parameters: New parameters for the model Returns: Path to the updated SCAD file """ scad_file = os.path.join(self.scad_dir, f"{model_id}.scad") if not os.path.exists(scad_file): raise FileNotFoundError(f"SCAD file not found: {scad_file}") # Read the existing SCAD file to determine its type with open(scad_file, 'r') as f: scad_code = f.read() # Determine model type from the code (simplified approach) model_type = None for shape_type in self.shape_templates: if shape_type in scad_code.lower(): model_type = shape_type break if not model_type: raise ValueError("Could not determine model type from existing SCAD file") # Generate new SCAD code new_scad_code = self.shape_templates[model_type](parameters) # Write updated SCAD code to file with open(scad_file, 'w') as f: f.write(new_scad_code) logger.info(f"Updated SCAD file: {scad_file}") return scad_file def generate_stl(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None) -> str: """ Generate an STL file from a SCAD file. Args: scad_file: Path to the SCAD file parameters: Optional parameters to override in the SCAD file Returns: Path to the generated STL file """ model_id = os.path.basename(scad_file).split('.')[0] stl_file = os.path.join(self.stl_dir, f"{model_id}.stl") # Build command cmd = ["openscad", "-o", stl_file] # Add parameters if provided if parameters: for key, value in parameters.items(): cmd.extend(["-D", f"{key}={value}"]) # Add input file cmd.append(scad_file) # Run OpenSCAD try: result = subprocess.run(cmd, check=True, capture_output=True, text=True) logger.info(f"Generated STL file: {stl_file}") logger.debug(result.stdout) return stl_file except subprocess.CalledProcessError as e: logger.error(f"Error generating STL file: {e.stderr}") raise RuntimeError(f"Failed to generate STL file: {e.stderr}") def generate_preview(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None, camera_position: str = "0,0,0,0,0,0,50", image_size: str = "800,600") -> str: """ Generate a preview image from a SCAD file. Args: scad_file: Path to the SCAD file parameters: Optional parameters to override in the SCAD file camera_position: Camera position in format "tx,ty,tz,rx,ry,rz,dist" image_size: Image size in format "width,height" Returns: Path to the generated preview image """ model_id = os.path.basename(scad_file).split('.')[0] preview_file = os.path.join(self.preview_dir, f"{model_id}.png") # Build command cmd = ["openscad", "--camera", camera_position, "--imgsize", image_size, "-o", preview_file] # Add parameters if provided if parameters: for key, value in parameters.items(): cmd.extend(["-D", f"{key}={value}"]) # Add input file cmd.append(scad_file) # Run OpenSCAD try: result = subprocess.run(cmd, check=True, capture_output=True, text=True) logger.info(f"Generated preview image: {preview_file}") logger.debug(result.stdout) return preview_file except subprocess.CalledProcessError as e: logger.error(f"Error generating preview image: {e.stderr}") # Since we know there might be issues with headless rendering, we'll create a placeholder logger.warning("Using placeholder image due to rendering error") return self._create_placeholder_image(preview_file) def _create_placeholder_image(self, output_path: str) -> str: """Create a simple placeholder image when rendering fails.""" try: from PIL import Image, ImageDraw, ImageFont # Create a blank image img = Image.new('RGB', (800, 600), color=(240, 240, 240)) draw = ImageDraw.Draw(img) # Add text draw.text((400, 300), "Preview not available", fill=(0, 0, 0)) # Save the image img.save(output_path) return output_path except Exception as e: logger.error(f"Error creating placeholder image: {str(e)}") # If all else fails, return the path anyway return output_path def generate_multi_angle_previews(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, str]: """ Generate preview images from multiple angles for a SCAD file. Args: scad_file: Path to the SCAD file parameters: Optional parameters to override in the SCAD file Returns: Dictionary mapping view names to preview image paths """ # Define camera positions for different views camera_positions = { "front": "0,0,0,0,0,0,50", "top": "0,0,0,90,0,0,50", "right": "0,0,0,0,90,0,50", "perspective": "40,30,30,55,0,25,100" } # Generate preview for each view previews = {} for view, camera_position in camera_positions.items(): try: model_id = os.path.basename(scad_file).split('.')[0] preview_file = os.path.join(self.preview_dir, f"{model_id}_{view}.png") # Build command cmd = ["openscad", "--camera", camera_position, "--imgsize", "800,600", "-o", preview_file] # Add parameters if provided if parameters: for key, value in parameters.items(): cmd.extend(["-D", f"{key}={value}"]) # Add input file cmd.append(scad_file) # Run OpenSCAD result = subprocess.run(cmd, check=True, capture_output=True, text=True) logger.info(f"Generated {view} preview: {preview_file}") previews[view] = preview_file except subprocess.CalledProcessError as e: logger.error(f"Error generating {view} preview: {e.stderr}") # Create a placeholder image for this view preview_file = os.path.join(self.preview_dir, f"{model_id}_{view}.png") previews[view] = self._create_placeholder_image(preview_file) return previews # Template functions for basic shapes def _cube_template(self, params: Dict[str, Any]) -> str: """Generate SCAD code for a cube.""" size_x = params.get('width', 10) size_y = params.get('depth', 10) size_z = params.get('height', 10) center = params.get('center', 'false').lower() == 'true' return f"""// Cube // Parameters: // width = {size_x} // depth = {size_y} // height = {size_z} // center = {str(center).lower()} width = {size_x}; depth = {size_y}; height = {size_z}; center = {str(center).lower()}; cube([width, depth, height], center=center); """ def _sphere_template(self, params: Dict[str, Any]) -> str: """Generate SCAD code for a sphere.""" radius = params.get('radius', 10) segments = params.get('segments', 32) return f"""// Sphere // Parameters: // radius = {radius} // segments = {segments} radius = {radius}; $fn = {segments}; sphere(r=radius); """ def _cylinder_template(self, params: Dict[str, Any]) -> str: """Generate SCAD code for a cylinder.""" radius = params.get('radius', 10) height = params.get('height', 20) center = params.get('center', 'false').lower() == 'true' segments = params.get('segments', 32) return f"""// Cylinder // Parameters: // radius = {radius} // height = {height} // center = {str(center).lower()} // segments = {segments} radius = {radius}; height = {height}; center = {str(center).lower()}; $fn = {segments}; cylinder(h=height, r=radius, center=center); """ def _box_template(self, params: Dict[str, Any]) -> str: """Generate SCAD code for a hollow box.""" width = params.get('width', 30) depth = params.get('depth', 20) height = params.get('height', 15) thickness = params.get('thickness', 2) return f"""// Hollow Box // Parameters: // width = {width} // depth = {depth} // height = {height} // thickness = {thickness} width = {width}; depth = {depth}; height = {height}; thickness = {thickness}; module box(width, depth, height, thickness) {{ difference() {{ cube([width, depth, height]); translate([thickness, thickness, thickness]) cube([width - 2*thickness, depth - 2*thickness, height - thickness]); }} }} box(width, depth, height, thickness); """ def _rounded_box_template(self, params: Dict[str, Any]) -> str: """Generate SCAD code for a rounded box.""" width = params.get('width', 30) depth = params.get('depth', 20) height = params.get('height', 15) radius = params.get('radius', 3) segments = params.get('segments', 32) return f"""// Rounded Box // Parameters: // width = {width} // depth = {depth} // height = {height} // radius = {radius} // segments = {segments} width = {width}; depth = {depth}; height = {height}; radius = {radius}; $fn = {segments}; module rounded_box(width, depth, height, radius) {{ hull() {{ translate([radius, radius, radius]) sphere(r=radius); translate([width-radius, radius, radius]) sphere(r=radius); translate([radius, depth-radius, radius]) sphere(r=radius); translate([width-radius, depth-radius, radius]) sphere(r=radius); translate([radius, radius, height-radius]) sphere(r=radius); translate([width-radius, radius, height-radius]) sphere(r=radius); translate([radius, depth-radius, height-radius]) sphere(r=radius); translate([width-radius, depth-radius, height-radius]) sphere(r=radius); }} }} rounded_box(width, depth, height, radius); """ ``` -------------------------------------------------------------------------------- /src/remote/cuda_mvs_client.py: -------------------------------------------------------------------------------- ```python """ Client for remote CUDA Multi-View Stereo processing. This module provides a client to connect to a remote CUDA MVS server within the LAN for processing multi-view images into 3D models. """ import os import json import logging import requests import base64 from typing import Dict, List, Optional, Any, Union from pathlib import Path import uuid # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class CUDAMVSClient: """ Client for connecting to a remote CUDA Multi-View Stereo server. This client handles: 1. Discovering available CUDA MVS servers on the LAN 2. Uploading images to the server 3. Requesting 3D reconstruction 4. Downloading the resulting 3D models 5. Monitoring job status """ def __init__( self, server_url: Optional[str] = None, api_key: Optional[str] = None, output_dir: str = "output/models", discovery_port: int = 8765, connection_timeout: int = 10, upload_chunk_size: int = 1024 * 1024, # 1MB chunks ): """ Initialize the CUDA MVS client. Args: server_url: URL of the CUDA MVS server (if known) api_key: API key for authentication (if required) output_dir: Directory to save downloaded models discovery_port: Port used for server discovery connection_timeout: Timeout for server connections in seconds upload_chunk_size: Chunk size for file uploads in bytes """ self.server_url = server_url self.api_key = api_key self.output_dir = output_dir self.discovery_port = discovery_port self.connection_timeout = connection_timeout self.upload_chunk_size = upload_chunk_size # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Initialize session for connection pooling self.session = requests.Session() if api_key: self.session.headers.update({"Authorization": f"Bearer {api_key}"}) def discover_servers(self) -> List[Dict[str, Any]]: """ Discover CUDA MVS servers on the local network. Returns: List of dictionaries containing server information: [ { "server_id": "unique-server-id", "name": "CUDA MVS Server 1", "url": "http://192.168.1.100:8765", "capabilities": { "max_images": 50, "max_resolution": 4096, "supported_formats": ["jpg", "png"], "gpu_info": "NVIDIA RTX 4090 24GB" }, "status": "available" }, ... ] """ import socket import json from zeroconf import ServiceBrowser, ServiceListener, Zeroconf discovered_servers = [] class CUDAMVSListener(ServiceListener): def add_service(self, zc, type_, name): info = zc.get_service_info(type_, name) if info: server_info = { "server_id": name.split('.')[0], "name": info.properties.get(b'name', b'Unknown').decode('utf-8'), "url": f"http://{socket.inet_ntoa(info.addresses[0])}:{info.port}", "capabilities": json.loads(info.properties.get(b'capabilities', b'{}').decode('utf-8')), "status": "available" } discovered_servers.append(server_info) logger.info(f"Discovered CUDA MVS server: {server_info['name']} at {server_info['url']}") try: zeroconf = Zeroconf() listener = CUDAMVSListener() browser = ServiceBrowser(zeroconf, "_cudamvs._tcp.local.", listener) # Wait for discovery (non-blocking in production code) import time time.sleep(2) # Give some time for discovery zeroconf.close() return discovered_servers except Exception as e: logger.error(f"Error discovering CUDA MVS servers: {e}") return [] def test_connection(self, server_url: Optional[str] = None) -> Dict[str, Any]: """ Test connection to a CUDA MVS server. Args: server_url: URL of the server to test (uses self.server_url if None) Returns: Dictionary with connection status and server information """ url = server_url or self.server_url if not url: return {"status": "error", "message": "No server URL provided"} try: response = self.session.get( f"{url}/api/status", timeout=self.connection_timeout ) if response.status_code == 200: return { "status": "success", "server_info": response.json(), "latency_ms": response.elapsed.total_seconds() * 1000 } else: return { "status": "error", "message": f"Server returned status code {response.status_code}", "details": response.text } except requests.exceptions.RequestException as e: return {"status": "error", "message": f"Connection error: {str(e)}"} def upload_images(self, image_paths: List[str]) -> Dict[str, Any]: """ Upload images to the CUDA MVS server. Args: image_paths: List of paths to images to upload Returns: Dictionary with upload status and job information """ if not self.server_url: return {"status": "error", "message": "No server URL configured"} # Create a new job try: response = self.session.post( f"{self.server_url}/api/jobs", json={"num_images": len(image_paths)}, timeout=self.connection_timeout ) if response.status_code != 201: return { "status": "error", "message": f"Failed to create job: {response.status_code}", "details": response.text } job_info = response.json() job_id = job_info["job_id"] # Upload each image for i, image_path in enumerate(image_paths): # Check if file exists if not os.path.exists(image_path): return { "status": "error", "message": f"Image file not found: {image_path}" } # Get file size for progress tracking file_size = os.path.getsize(image_path) # Prepare upload with open(image_path, "rb") as f: files = { "file": (os.path.basename(image_path), f, "image/jpeg" if image_path.endswith(".jpg") else "image/png") } metadata = { "image_index": i, "total_images": len(image_paths), "filename": os.path.basename(image_path) } response = self.session.post( f"{self.server_url}/api/jobs/{job_id}/images", files=files, data={"metadata": json.dumps(metadata)}, timeout=None # No timeout for uploads ) if response.status_code != 200: return { "status": "error", "message": f"Failed to upload image {i+1}/{len(image_paths)}: {response.status_code}", "details": response.text } logger.info(f"Uploaded image {i+1}/{len(image_paths)}: {os.path.basename(image_path)}") # Start processing response = self.session.post( f"{self.server_url}/api/jobs/{job_id}/process", timeout=self.connection_timeout ) if response.status_code != 202: return { "status": "error", "message": f"Failed to start processing: {response.status_code}", "details": response.text } return { "status": "success", "job_id": job_id, "message": f"Uploaded {len(image_paths)} images and started processing", "job_url": f"{self.server_url}/api/jobs/{job_id}" } except requests.exceptions.RequestException as e: return {"status": "error", "message": f"Upload error: {str(e)}"} def get_job_status(self, job_id: str) -> Dict[str, Any]: """ Get the status of a CUDA MVS job. Args: job_id: ID of the job to check Returns: Dictionary with job status information """ if not self.server_url: return {"status": "error", "message": "No server URL configured"} try: response = self.session.get( f"{self.server_url}/api/jobs/{job_id}", timeout=self.connection_timeout ) if response.status_code == 200: return { "status": "success", "job_info": response.json() } else: return { "status": "error", "message": f"Failed to get job status: {response.status_code}", "details": response.text } except requests.exceptions.RequestException as e: return {"status": "error", "message": f"Connection error: {str(e)}"} def download_model(self, job_id: str, output_format: str = "obj") -> Dict[str, Any]: """ Download a processed 3D model from the CUDA MVS server. Args: job_id: ID of the job to download output_format: Format of the model to download (obj, ply, etc.) Returns: Dictionary with download status and local file path """ if not self.server_url: return {"status": "error", "message": "No server URL configured"} # Check job status first status_result = self.get_job_status(job_id) if status_result["status"] != "success": return status_result job_info = status_result["job_info"] if job_info["status"] != "completed": return { "status": "error", "message": f"Job is not completed yet. Current status: {job_info['status']}", "job_info": job_info } # Download the model try: response = self.session.get( f"{self.server_url}/api/jobs/{job_id}/model?format={output_format}", stream=True, timeout=None # No timeout for downloads ) if response.status_code != 200: return { "status": "error", "message": f"Failed to download model: {response.status_code}", "details": response.text } # Create a unique filename model_id = job_info.get("model_id", str(uuid.uuid4())) output_path = os.path.join(self.output_dir, f"{model_id}.{output_format}") # Save the file with open(output_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) logger.info(f"Downloaded model to {output_path}") return { "status": "success", "model_id": model_id, "local_path": output_path, "format": output_format, "job_id": job_id } except requests.exceptions.RequestException as e: return {"status": "error", "message": f"Download error: {str(e)}"} def cancel_job(self, job_id: str) -> Dict[str, Any]: """ Cancel a running CUDA MVS job. Args: job_id: ID of the job to cancel Returns: Dictionary with cancellation status """ if not self.server_url: return {"status": "error", "message": "No server URL configured"} try: response = self.session.delete( f"{self.server_url}/api/jobs/{job_id}", timeout=self.connection_timeout ) if response.status_code == 200: return { "status": "success", "message": "Job cancelled successfully" } else: return { "status": "error", "message": f"Failed to cancel job: {response.status_code}", "details": response.text } except requests.exceptions.RequestException as e: return {"status": "error", "message": f"Connection error: {str(e)}"} def generate_model_from_images( self, image_paths: List[str], output_format: str = "obj", wait_for_completion: bool = True, poll_interval: int = 5 ) -> Dict[str, Any]: """ Complete workflow to generate a 3D model from images. Args: image_paths: List of paths to images output_format: Format of the output model wait_for_completion: Whether to wait for job completion poll_interval: Interval in seconds to poll for job status Returns: Dictionary with job status and model information if completed """ # Upload images and start processing upload_result = self.upload_images(image_paths) if upload_result["status"] != "success": return upload_result job_id = upload_result["job_id"] # If not waiting for completion, return the job info if not wait_for_completion: return upload_result # Poll for job completion import time while True: status_result = self.get_job_status(job_id) if status_result["status"] != "success": return status_result job_info = status_result["job_info"] if job_info["status"] == "completed": # Download the model return self.download_model(job_id, output_format) elif job_info["status"] == "failed": return { "status": "error", "message": "Job processing failed", "job_info": job_info } # Wait before polling again time.sleep(poll_interval) logger.info(f"Job {job_id} status: {job_info['status']}, progress: {job_info.get('progress', 0)}%") ``` -------------------------------------------------------------------------------- /src/printer_discovery/printer_discovery.py: -------------------------------------------------------------------------------- ```python import os import logging import socket import json import time import threading from typing import Dict, List, Any, Optional, Callable logger = logging.getLogger(__name__) class PrinterDiscovery: """ Discovers 3D printers on the network and provides interfaces for direct printing. """ def __init__(self): """Initialize the printer discovery service.""" self.printers = {} # Dictionary of discovered printers self.discovery_thread = None self.discovery_stop_event = threading.Event() self.discovery_callback = None def start_discovery(self, callback: Optional[Callable[[Dict[str, Any]], None]] = None) -> None: """ Start discovering 3D printers on the network. Args: callback: Optional callback function to call when a printer is discovered """ if self.discovery_thread and self.discovery_thread.is_alive(): logger.warning("Printer discovery already running") return self.discovery_callback = callback self.discovery_stop_event.clear() self.discovery_thread = threading.Thread(target=self._discover_printers) self.discovery_thread.daemon = True self.discovery_thread.start() logger.info("Started printer discovery") def stop_discovery(self) -> None: """Stop discovering 3D printers.""" if self.discovery_thread and self.discovery_thread.is_alive(): self.discovery_stop_event.set() self.discovery_thread.join(timeout=2.0) logger.info("Stopped printer discovery") else: logger.warning("Printer discovery not running") def get_printers(self) -> Dict[str, Any]: """ Get the list of discovered printers. Returns: Dictionary of printer information """ return self.printers def _discover_printers(self) -> None: """Discover 3D printers on the network using various protocols.""" # This is a simplified implementation that simulates printer discovery # In a real implementation, you would use protocols like mDNS, SNMP, or OctoPrint API # Simulate discovering printers while not self.discovery_stop_event.is_set(): try: # Simulate network discovery self._discover_octoprint_printers() self._discover_prusa_printers() self._discover_ultimaker_printers() # Wait before next discovery cycle time.sleep(10) except Exception as e: logger.error(f"Error in printer discovery: {str(e)}") time.sleep(5) def _discover_octoprint_printers(self) -> None: """Discover OctoPrint servers on the network.""" # Simulate discovering OctoPrint servers # In a real implementation, you would use mDNS to discover OctoPrint instances # Simulate finding a printer printer_id = "octoprint_1" if printer_id not in self.printers: printer_info = { "id": printer_id, "name": "OctoPrint Printer", "type": "octoprint", "address": "192.168.1.100", "port": 80, "api_key": None, # Would need to be provided by user "status": "online", "capabilities": ["print", "status", "cancel"] } self.printers[printer_id] = printer_info if self.discovery_callback: self.discovery_callback(printer_info) logger.info(f"Discovered OctoPrint printer: {printer_info['name']}") def _discover_prusa_printers(self) -> None: """Discover Prusa printers on the network.""" # Simulate discovering Prusa printers # Simulate finding a printer printer_id = "prusa_1" if printer_id not in self.printers: printer_info = { "id": printer_id, "name": "Prusa MK3S", "type": "prusa", "address": "192.168.1.101", "port": 80, "status": "online", "capabilities": ["print", "status"] } self.printers[printer_id] = printer_info if self.discovery_callback: self.discovery_callback(printer_info) logger.info(f"Discovered Prusa printer: {printer_info['name']}") def _discover_ultimaker_printers(self) -> None: """Discover Ultimaker printers on the network.""" # Simulate discovering Ultimaker printers # Simulate finding a printer printer_id = "ultimaker_1" if printer_id not in self.printers: printer_info = { "id": printer_id, "name": "Ultimaker S5", "type": "ultimaker", "address": "192.168.1.102", "port": 80, "status": "online", "capabilities": ["print", "status", "cancel"] } self.printers[printer_id] = printer_info if self.discovery_callback: self.discovery_callback(printer_info) logger.info(f"Discovered Ultimaker printer: {printer_info['name']}") class PrinterInterface: """ Interface for communicating with 3D printers. """ def __init__(self, printer_discovery: PrinterDiscovery): """ Initialize the printer interface. Args: printer_discovery: Instance of PrinterDiscovery for finding printers """ self.printer_discovery = printer_discovery self.connected_printers = {} # Dictionary of connected printers def connect_to_printer(self, printer_id: str, credentials: Optional[Dict[str, Any]] = None) -> bool: """ Connect to a specific printer. Args: printer_id: ID of the printer to connect to credentials: Optional credentials for authentication Returns: True if connection successful, False otherwise """ printers = self.printer_discovery.get_printers() if printer_id not in printers: logger.error(f"Printer not found: {printer_id}") return False printer_info = printers[printer_id] # Create appropriate printer client based on type if printer_info["type"] == "octoprint": client = OctoPrintClient(printer_info, credentials) elif printer_info["type"] == "prusa": client = PrusaClient(printer_info, credentials) elif printer_info["type"] == "ultimaker": client = UltimakerClient(printer_info, credentials) else: logger.error(f"Unsupported printer type: {printer_info['type']}") return False # Connect to the printer if client.connect(): self.connected_printers[printer_id] = client return True else: return False def disconnect_from_printer(self, printer_id: str) -> bool: """ Disconnect from a specific printer. Args: printer_id: ID of the printer to disconnect from Returns: True if disconnection successful, False otherwise """ if printer_id not in self.connected_printers: logger.error(f"Not connected to printer: {printer_id}") return False client = self.connected_printers[printer_id] if client.disconnect(): del self.connected_printers[printer_id] return True else: return False def print_file(self, printer_id: str, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool: """ Send a file to a printer for printing. Args: printer_id: ID of the printer to print on file_path: Path to the STL file to print print_settings: Optional print settings Returns: True if print job started successfully, False otherwise """ if printer_id not in self.connected_printers: logger.error(f"Not connected to printer: {printer_id}") return False client = self.connected_printers[printer_id] return client.print_file(file_path, print_settings) def get_printer_status(self, printer_id: str) -> Dict[str, Any]: """ Get the status of a specific printer. Args: printer_id: ID of the printer to get status for Returns: Dictionary with printer status information """ if printer_id not in self.connected_printers: logger.error(f"Not connected to printer: {printer_id}") return {"error": "Not connected to printer"} client = self.connected_printers[printer_id] return client.get_status() def cancel_print(self, printer_id: str) -> bool: """ Cancel a print job on a specific printer. Args: printer_id: ID of the printer to cancel print on Returns: True if cancellation successful, False otherwise """ if printer_id not in self.connected_printers: logger.error(f"Not connected to printer: {printer_id}") return False client = self.connected_printers[printer_id] return client.cancel_print() class PrinterClient: """Base class for printer clients.""" def __init__(self, printer_info: Dict[str, Any], credentials: Optional[Dict[str, Any]] = None): """ Initialize the printer client. Args: printer_info: Information about the printer credentials: Optional credentials for authentication """ self.printer_info = printer_info self.credentials = credentials or {} self.connected = False def connect(self) -> bool: """ Connect to the printer. Returns: True if connection successful, False otherwise """ # Base implementation - should be overridden by subclasses self.connected = True return True def disconnect(self) -> bool: """ Disconnect from the printer. Returns: True if disconnection successful, False otherwise """ # Base implementation - should be overridden by subclasses self.connected = False return True def print_file(self, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool: """ Send a file to the printer for printing. Args: file_path: Path to the STL file to print print_settings: Optional print settings Returns: True if print job started successfully, False otherwise """ # Base implementation - should be overridden by subclasses if not self.connected: logger.error("Not connected to printer") return False logger.info(f"Printing file: {file_path}") return True def get_status(self) -> Dict[str, Any]: """ Get the status of the printer. Returns: Dictionary with printer status information """ # Base implementation - should be overridden by subclasses if not self.connected: return {"status": "disconnected"} return {"status": "connected"} def cancel_print(self) -> bool: """ Cancel the current print job. Returns: True if cancellation successful, False otherwise """ # Base implementation - should be overridden by subclasses if not self.connected: logger.error("Not connected to printer") return False logger.info("Cancelling print job") return True class OctoPrintClient(PrinterClient): """Client for OctoPrint printers.""" def connect(self) -> bool: """Connect to an OctoPrint server.""" try: # In a real implementation, you would use the OctoPrint API # to connect to the printer # Check if API key is provided if "api_key" not in self.credentials: logger.error("API key required for OctoPrint") return False # Simulate connection logger.info(f"Connected to OctoPrint server: {self.printer_info['address']}") self.connected = True return True except Exception as e: logger.error(f"Error connecting to OctoPrint server: {str(e)}") return False def print_file(self, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool: """Send a file to an OctoPrint server for printing.""" if not self.connected: logger.error("Not connected to OctoPrint server") return False try: # In a real implementation, you would use the OctoPrint API # to upload the file and start printing # Check if file exists if not os.path.exists(file_path): logger.error(f"File not found: {file_path}") return False # Simulate printing logger.info(f"Printing file on OctoPrint server: {file_path}") return True except Exception as e: logger.error(f"Error printing file on OctoPrint server: {str(e)}") return False def get_status(self) -> Dict[str, Any]: """Get the status of an OctoPrint server.""" if not self.connected: return {"status": "disconnected"} try: # In a real implementation, you would use the OctoPrint API # to get the printer status # Simulate status return { "status": "connected", "printer": { "state": "operational", "temperature": { "bed": {"actual": 60.0, "target": 60.0}, "tool0": {"actual": 210.0, "target": 210.0} } }, "job": { "file": {"name": "example.gcode"}, "progress": {"completion": 0.0, "printTime": 0, "printTimeLeft": 0} } } except Exception as e: logger.error(f"Error getting status from OctoPrint server: {str(e)}") return {"status": "error", "message": str(e)} class PrusaClient(PrinterClient): """Client for Prusa printers.""" def connect(self) -> bool: """Connect to a Prusa printer.""" try: # In a real implementation, you would use the Prusa API # to connect to the printer # Simulate connection logger.info(f"Connected to Prusa printer: {self.printer_info['address']}") self.connected = True return True except Exception as e: logger.error(f"Error connecting to Prusa printer: {str(e)}") return False class UltimakerClient(PrinterClient): """Client for Ultimaker printers.""" def connect(self) -> bool: """Connect to an Ultimaker printer.""" try: # In a real implementation, you would use the Ultimaker API # to connect to the printer # Simulate connection logger.info(f"Connected to Ultimaker printer: {self.printer_info['address']}") self.connected = True return True except Exception as e: logger.error(f"Error connecting to Ultimaker printer: {str(e)}") return False ``` -------------------------------------------------------------------------------- /src/nlp/parameter_extractor.py: -------------------------------------------------------------------------------- ```python import re import logging from typing import Dict, Any, Tuple, List, Optional import json logger = logging.getLogger(__name__) class ParameterExtractor: """ Extract parameters from natural language descriptions. Implements dialog flow for collecting specifications and translating them to OpenSCAD parameters. """ def __init__(self): """Initialize the parameter extractor.""" # Using only millimeters as per project requirements self.unit_conversions = { 'mm': 1.0 } # Shape recognition patterns with expanded vocabulary self.shape_patterns = { 'cube': r'\b(cube|box|square|rectangular|block|cuboid|brick)\b', 'sphere': r'\b(sphere|ball|round|circular|globe|orb)\b', 'cylinder': r'\b(cylinder|tube|pipe|rod|circular column|pillar|column)\b', 'box': r'\b(hollow box|container|case|enclosure|bin|chest|tray)\b', 'rounded_box': r'\b(rounded box|rounded container|rounded case|rounded enclosure|smooth box|rounded corners|chamfered box)\b', 'cone': r'\b(cone|pyramid|tapered cylinder|funnel)\b', 'torus': r'\b(torus|donut|ring|loop|circular ring)\b', 'prism': r'\b(prism|triangular prism|wedge|triangular shape)\b', 'custom': r'\b(custom|complex|special|unique|combined|composite)\b' } # Parameter recognition patterns with enhanced unit detection self.parameter_patterns = { 'width': r'(\d+(?:\.\d+)?)\s*(?:mm|cm|m|in|inch|inches|ft|foot|feet)?\s*(?:wide|width|across|w)', 'height': r'(\d+(?:\.\d+)?)\s*(?:mm|cm|m|in|inch|inches|ft|foot|feet)?\s*(?:high|height|tall|h)', 'depth': r'(\d+(?:\.\d+)?)\s*(?:mm|cm|m|in|inch|inches|ft|foot|feet)?\s*(?:deep|depth|long|d|length)', 'radius': r'(\d+(?:\.\d+)?)\s*(?:mm|cm|m|in|inch|inches|ft|foot|feet)?\s*(?:radius|r)', 'diameter': r'(\d+(?:\.\d+)?)\s*(?:mm|cm|m|in|inch|inches|ft|foot|feet)?\s*(?:diameter|dia)', 'thickness': r'(\d+(?:\.\d+)?)\s*(?:mm|cm|m|in|inch|inches|ft|foot|feet)?\s*(?:thick|thickness|t)', 'segments': r'(\d+)\s*(?:segments|sides|faces|facets|smoothness)', 'center': r'\b(centered|center|middle|origin)\b', 'angle': r'(\d+(?:\.\d+)?)\s*(?:deg|degree|degrees|°)?\s*(?:angle|rotation|rotate|tilt)', 'scale': r'(\d+(?:\.\d+)?)\s*(?:x|times|scale|scaling|factor)', 'resolution': r'(\d+(?:\.\d+)?)\s*(?:resolution|quality|detail)' } # Dialog state for multi-turn conversations self.dialog_state = {} def extract_parameters(self, description: str, model_type: Optional[str] = None, existing_parameters: Optional[Dict[str, Any]] = None) -> Tuple[str, Dict[str, Any]]: """ Extract model type and parameters from a natural language description. Args: description: Natural language description of the 3D object model_type: Optional model type for context (if already known) existing_parameters: Optional existing parameters for context (for modifications) Returns: Tuple of (model_type, parameters) """ # Use provided model_type or determine from description if model_type is None: model_type = self._determine_shape_type(description) # Start with existing parameters if provided parameters = existing_parameters.copy() if existing_parameters else {} # Extract parameters based on the shape type new_parameters = self._extract_shape_parameters(description, model_type) # Update parameters with newly extracted ones parameters.update(new_parameters) # Apply default parameters if needed parameters = self._apply_default_parameters(model_type, parameters) logger.info(f"Extracted model type: {model_type}, parameters: {parameters}") return model_type, parameters def extract_parameters_from_modifications(self, modifications: str, model_type: Optional[str] = None, existing_parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Extract parameters from modification description with contextual understanding. Args: modifications: Description of modifications to make model_type: Optional model type for context existing_parameters: Optional existing parameters for context Returns: Dictionary of parameters to update """ # Start with existing parameters if provided parameters = existing_parameters.copy() if existing_parameters else {} # Extract all possible parameters from the modifications new_parameters = {} for param_name, pattern in self.parameter_patterns.items(): matches = re.findall(pattern, modifications, re.IGNORECASE) if matches: # Take the last match if multiple are found value = matches[-1] if isinstance(value, tuple): value = value[0] # Extract from capture group new_parameters[param_name] = self._convert_to_mm(value, modifications) # Update parameters with newly extracted ones parameters.update(new_parameters) # Apply contextual understanding based on model type if model_type and not new_parameters: # If no explicit parameters were found, try to infer from context # For now, we'll just log this case since inference is complex logger.info(f"No explicit parameters found in '{modifications}', using existing parameters") logger.info(f"Extracted modification parameters: {parameters}") return parameters def get_missing_parameters(self, model_type: str, parameters: Dict[str, Any]) -> List[str]: """ Determine which required parameters are missing for a given model type. Args: model_type: Type of model parameters: Currently extracted parameters Returns: List of missing parameter names """ required_params = self._get_required_parameters(model_type) return [param for param in required_params if param not in parameters] def update_dialog_state(self, user_id: str, model_type: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None) -> None: """ Update the dialog state for a user. Args: user_id: Unique identifier for the user model_type: Optional model type to update parameters: Optional parameters to update """ if user_id not in self.dialog_state: self.dialog_state[user_id] = { 'model_type': None, 'parameters': {}, 'missing_parameters': [], 'current_question': None } if model_type: self.dialog_state[user_id]['model_type'] = model_type if parameters: self.dialog_state[user_id]['parameters'].update(parameters) # Update missing parameters if self.dialog_state[user_id]['model_type']: missing = self.get_missing_parameters( self.dialog_state[user_id]['model_type'], self.dialog_state[user_id]['parameters'] ) self.dialog_state[user_id]['missing_parameters'] = missing def get_next_question(self, user_id: str) -> Optional[str]: """ Get the next question to ask the user based on missing parameters. Args: user_id: Unique identifier for the user Returns: Question string or None if all parameters are collected """ if user_id not in self.dialog_state: return "What kind of 3D object would you like to create?" state = self.dialog_state[user_id] # If we don't have a model type yet, ask for it if not state['model_type']: state['current_question'] = "What kind of 3D object would you like to create?" return state['current_question'] # If we have missing parameters, ask for the first one if state['missing_parameters']: param = state['missing_parameters'][0] question = self._get_parameter_question(param, state['model_type']) state['current_question'] = question return question # All parameters collected state['current_question'] = None return None def process_answer(self, user_id: str, answer: str) -> Dict[str, Any]: """ Process a user's answer to a question. Args: user_id: Unique identifier for the user answer: User's answer to the current question Returns: Updated dialog state """ if user_id not in self.dialog_state: # Initialize with default state self.update_dialog_state(user_id) state = self.dialog_state[user_id] current_question = state['current_question'] # Process based on current question if not state['model_type']: # Trying to determine the model type model_type = self._determine_shape_type(answer) self.update_dialog_state(user_id, model_type=model_type) elif state['missing_parameters']: # Trying to collect a specific parameter param = state['missing_parameters'][0] value = self._extract_parameter_value(param, answer) if value is not None: self.update_dialog_state(user_id, parameters={param: value}) # Return the updated state return self.dialog_state[user_id] def _determine_shape_type(self, description: str) -> str: """ Determine the shape type from the description. Enhanced to support more shape types and better pattern matching. """ # Check for explicit shape mentions for shape, pattern in self.shape_patterns.items(): if re.search(pattern, description, re.IGNORECASE): logger.info(f"Detected shape type: {shape} from pattern: {pattern}") return shape # Try to infer shape from context if no explicit mention if re.search(r'\b(round|circular|sphere|ball)\b', description, re.IGNORECASE): return "sphere" elif re.search(r'\b(tall|column|pillar|rod)\b', description, re.IGNORECASE): return "cylinder" elif re.search(r'\b(box|container|case|enclosure)\b', description, re.IGNORECASE): # Determine if it should be a rounded box if re.search(r'\b(rounded|smooth|chamfered)\b', description, re.IGNORECASE): return "rounded_box" return "box" # Default to cube if no shape is detected logger.info("No specific shape detected, defaulting to cube") return "cube" def _extract_shape_parameters(self, description: str, model_type: str) -> Dict[str, Any]: """Extract parameters for a specific shape type.""" parameters = {} # Extract all possible parameters for param_name, pattern in self.parameter_patterns.items(): matches = re.findall(pattern, description, re.IGNORECASE) if matches: # Take the last match if multiple are found value = matches[-1] if isinstance(value, tuple): value = value[0] # Extract from capture group parameters[param_name] = self._convert_to_mm(value, description) # Special case for diameter -> radius conversion if 'diameter' in parameters and 'radius' not in parameters: parameters['radius'] = parameters['diameter'] / 2 del parameters['diameter'] # Special case for center parameter if 'center' in parameters: center_value = parameters['center'] if isinstance(center_value, (int, float)): # Convert numeric value to boolean string parameters['center'] = 'true' if center_value > 0 else 'false' else: # Convert string value to boolean string center_str = str(center_value).lower() parameters['center'] = 'true' if center_str in ['true', 'yes', 'y', '1'] else 'false' return parameters def _convert_to_mm(self, value_str: str, context: str) -> float: """ Convert a value to millimeters. As per project requirements, we only use millimeters for design. """ try: value = float(value_str) # Since we're only using millimeters, we just return the value directly # This simplifies the conversion logic while maintaining the function interface logger.info(f"Using value {value} in millimeters") return value except ValueError: logger.warning(f"Could not convert value to float: {value_str}") return 0.0 def _apply_default_parameters(self, model_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]: """Apply default parameters based on the model type.""" defaults = { 'cube': {'width': 10, 'depth': 10, 'height': 10, 'center': 'false'}, 'sphere': {'radius': 10, 'segments': 32}, 'cylinder': {'radius': 10, 'height': 20, 'center': 'false', 'segments': 32}, 'box': {'width': 30, 'depth': 20, 'height': 15, 'thickness': 2}, 'rounded_box': {'width': 30, 'depth': 20, 'height': 15, 'radius': 3, 'segments': 32}, 'cone': {'base_radius': 10, 'height': 20, 'center': 'false', 'segments': 32}, 'torus': {'major_radius': 20, 'minor_radius': 5, 'segments': 32}, 'prism': {'width': 20, 'height': 15, 'depth': 20, 'center': 'false'}, 'custom': {'width': 20, 'height': 20, 'depth': 20, 'center': 'false'} } # Get defaults for the model type model_defaults = defaults.get(model_type, {}) # Apply defaults for missing parameters for param, default_value in model_defaults.items(): if param not in parameters: parameters[param] = default_value return parameters def _get_required_parameters(self, model_type: str) -> List[str]: """Get the list of required parameters for a model type.""" required_params = { 'cube': ['width', 'depth', 'height'], 'sphere': ['radius'], 'cylinder': ['radius', 'height'], 'box': ['width', 'depth', 'height', 'thickness'], 'rounded_box': ['width', 'depth', 'height', 'radius'], 'cone': ['base_radius', 'height'], 'torus': ['major_radius', 'minor_radius'], 'prism': ['width', 'height', 'depth'], 'custom': ['width', 'height', 'depth'] } return required_params.get(model_type, []) def _get_parameter_question(self, param: str, model_type: str) -> str: """Get a question to ask for a specific parameter.""" questions = { 'width': f"What should be the width of the {model_type} in mm?", 'depth': f"What should be the depth of the {model_type} in mm?", 'height': f"What should be the height of the {model_type} in mm?", 'radius': f"What should be the radius of the {model_type} in mm?", 'thickness': f"What should be the wall thickness of the {model_type} in mm?", 'segments': f"How many segments should the {model_type} have for smoothness?", 'base_radius': f"What should be the base radius of the {model_type} in mm?", 'major_radius': f"What should be the major radius of the {model_type} in mm?", 'minor_radius': f"What should be the minor radius of the {model_type} in mm?", 'diameter': f"What should be the diameter of the {model_type} in mm?", 'angle': f"What should be the angle of the {model_type} in degrees?", 'scale': f"What should be the scale factor for the {model_type}?", 'resolution': f"What resolution should the {model_type} have (higher means more detailed)?", 'center': f"Should the {model_type} be centered? (yes/no)" } return questions.get(param, f"What should be the {param} of the {model_type}?") def _extract_parameter_value(self, param: str, answer: str) -> Optional[float]: """Extract a parameter value from an answer.""" pattern = self.parameter_patterns.get(param) if not pattern: # For parameters without specific patterns, try to extract any number pattern = r'(\d+(?:\.\d+)?)' matches = re.findall(pattern, answer, re.IGNORECASE) if matches: value = matches[-1] if isinstance(value, tuple): value = value[0] # Extract from capture group return self._convert_to_mm(value, answer) # Try to extract just a number matches = re.findall(r'(\d+(?:\.\d+)?)', answer) if matches: value = matches[-1] return self._convert_to_mm(value, answer) return None ``` -------------------------------------------------------------------------------- /src/remote/connection_manager.py: -------------------------------------------------------------------------------- ```python """ Connection manager for remote CUDA Multi-View Stereo processing. This module provides functionality to discover, connect to, and manage connections with remote CUDA MVS servers within the LAN. """ import os import json import logging import time import threading from typing import Dict, List, Optional, Any, Union, Callable import socket from zeroconf import ServiceBrowser, ServiceListener, Zeroconf from src.remote.cuda_mvs_client import CUDAMVSClient # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class CUDAMVSConnectionManager: """ Connection manager for remote CUDA MVS servers. This class handles: 1. Discovering available CUDA MVS servers on the LAN 2. Managing connections to multiple servers 3. Load balancing across available servers 4. Monitoring server health and status 5. Automatic failover if a server becomes unavailable """ def __init__( self, api_key: Optional[str] = None, discovery_port: int = 8765, connection_timeout: int = 10, health_check_interval: int = 60, auto_discover: bool = True ): """ Initialize the connection manager. Args: api_key: API key for authentication (if required) discovery_port: Port used for server discovery connection_timeout: Timeout for server connections in seconds health_check_interval: Interval for health checks in seconds auto_discover: Whether to automatically discover servers on startup """ self.api_key = api_key self.discovery_port = discovery_port self.connection_timeout = connection_timeout self.health_check_interval = health_check_interval # Server tracking self.servers: Dict[str, Dict[str, Any]] = {} self.clients: Dict[str, CUDAMVSClient] = {} self.server_lock = threading.RLock() # Health check thread self.health_check_thread = None self.health_check_stop_event = threading.Event() # Discovery self.zeroconf = None self.browser = None # Start discovery if enabled if auto_discover: self.start_discovery() # Start health check thread self.start_health_check() def start_discovery(self): """ Start discovering CUDA MVS servers on the LAN. """ if self.zeroconf is not None: return try: self.zeroconf = Zeroconf() listener = CUDAMVSServiceListener(self) self.browser = ServiceBrowser(self.zeroconf, "_cudamvs._tcp.local.", listener) logger.info("Started CUDA MVS server discovery") except Exception as e: logger.error(f"Error starting discovery: {e}") def stop_discovery(self): """ Stop discovering CUDA MVS servers. """ if self.zeroconf is not None: try: self.zeroconf.close() self.zeroconf = None self.browser = None logger.info("Stopped CUDA MVS server discovery") except Exception as e: logger.error(f"Error stopping discovery: {e}") def start_health_check(self): """ Start the health check thread. """ if self.health_check_thread is not None and self.health_check_thread.is_alive(): return self.health_check_stop_event.clear() self.health_check_thread = threading.Thread( target=self._health_check_loop, daemon=True ) self.health_check_thread.start() logger.info("Started health check thread") def stop_health_check(self): """ Stop the health check thread. """ if self.health_check_thread is not None: self.health_check_stop_event.set() self.health_check_thread.join(timeout=5) self.health_check_thread = None logger.info("Stopped health check thread") def _health_check_loop(self): """ Health check loop that runs in a separate thread. """ while not self.health_check_stop_event.is_set(): try: self.check_all_servers() except Exception as e: logger.error(f"Error in health check: {e}") # Wait for the next check interval or until stopped self.health_check_stop_event.wait(self.health_check_interval) def add_server(self, server_info: Dict[str, Any]): """ Add a server to the manager. Args: server_info: Dictionary with server information """ server_id = server_info.get("server_id") if not server_id: logger.error("Cannot add server without server_id") return with self.server_lock: # Check if server already exists if server_id in self.servers: # Update existing server info self.servers[server_id].update(server_info) logger.info(f"Updated server: {server_info.get('name')} at {server_info.get('url')}") else: # Add new server self.servers[server_id] = server_info # Create client for the server self.clients[server_id] = CUDAMVSClient( server_url=server_info.get("url"), api_key=self.api_key, connection_timeout=self.connection_timeout ) logger.info(f"Added server: {server_info.get('name')} at {server_info.get('url')}") def remove_server(self, server_id: str): """ Remove a server from the manager. Args: server_id: ID of the server to remove """ with self.server_lock: if server_id in self.servers: server_info = self.servers.pop(server_id) if server_id in self.clients: del self.clients[server_id] logger.info(f"Removed server: {server_info.get('name')} at {server_info.get('url')}") def get_servers(self) -> List[Dict[str, Any]]: """ Get a list of all servers. Returns: List of dictionaries with server information """ with self.server_lock: return list(self.servers.values()) def get_server(self, server_id: str) -> Optional[Dict[str, Any]]: """ Get information about a specific server. Args: server_id: ID of the server Returns: Dictionary with server information or None if not found """ with self.server_lock: return self.servers.get(server_id) def get_client(self, server_id: str) -> Optional[CUDAMVSClient]: """ Get the client for a specific server. Args: server_id: ID of the server Returns: CUDAMVSClient instance or None if not found """ with self.server_lock: return self.clients.get(server_id) def get_best_server(self) -> Optional[str]: """ Get the ID of the best server to use based on availability and load. Returns: Server ID or None if no servers are available """ with self.server_lock: available_servers = [ server_id for server_id, server in self.servers.items() if server.get("status") == "available" ] if not available_servers: return None # For now, just return the first available server # In a more advanced implementation, this would consider # server load, capabilities, latency, etc. return available_servers[0] def check_server(self, server_id: str) -> Dict[str, Any]: """ Check the status of a specific server. Args: server_id: ID of the server to check Returns: Dictionary with server status information """ client = self.get_client(server_id) if not client: return {"status": "error", "message": f"Server {server_id} not found"} # Test connection result = client.test_connection() with self.server_lock: if server_id in self.servers: # Update server status if result["status"] == "success": self.servers[server_id]["status"] = "available" self.servers[server_id]["last_check"] = time.time() self.servers[server_id]["latency_ms"] = result.get("latency_ms") # Update capabilities if available if "server_info" in result and "capabilities" in result["server_info"]: self.servers[server_id]["capabilities"] = result["server_info"]["capabilities"] else: self.servers[server_id]["status"] = "unavailable" self.servers[server_id]["last_check"] = time.time() self.servers[server_id]["error"] = result.get("message") return result def check_all_servers(self) -> Dict[str, Dict[str, Any]]: """ Check the status of all servers. Returns: Dictionary mapping server IDs to status information """ results = {} with self.server_lock: server_ids = list(self.servers.keys()) for server_id in server_ids: results[server_id] = self.check_server(server_id) return results def discover_servers(self) -> List[Dict[str, Any]]: """ Manually discover CUDA MVS servers on the LAN. Returns: List of dictionaries containing server information """ # Create a temporary client to discover servers client = CUDAMVSClient( api_key=self.api_key, discovery_port=self.discovery_port, connection_timeout=self.connection_timeout ) discovered_servers = client.discover_servers() # Add discovered servers for server_info in discovered_servers: self.add_server(server_info) return discovered_servers def upload_images_to_best_server(self, image_paths: List[str]) -> Dict[str, Any]: """ Upload images to the best available server. Args: image_paths: List of paths to images to upload Returns: Dictionary with upload status and job information """ server_id = self.get_best_server() if not server_id: return {"status": "error", "message": "No available servers"} client = self.get_client(server_id) if not client: return {"status": "error", "message": f"Client for server {server_id} not found"} # Upload images result = client.upload_images(image_paths) # Add server information to result result["server_id"] = server_id result["server_name"] = self.servers[server_id].get("name") return result def generate_model_from_images( self, image_paths: List[str], output_format: str = "obj", wait_for_completion: bool = True, poll_interval: int = 5, server_id: Optional[str] = None ) -> Dict[str, Any]: """ Generate a 3D model from images using the best available server. Args: image_paths: List of paths to images output_format: Format of the output model wait_for_completion: Whether to wait for job completion poll_interval: Interval in seconds to poll for job status server_id: ID of the server to use (uses best server if None) Returns: Dictionary with job status and model information if completed """ # Get server to use if server_id is None: server_id = self.get_best_server() if not server_id: return {"status": "error", "message": "No available servers"} client = self.get_client(server_id) if not client: return {"status": "error", "message": f"Client for server {server_id} not found"} # Generate model result = client.generate_model_from_images( image_paths=image_paths, output_format=output_format, wait_for_completion=wait_for_completion, poll_interval=poll_interval ) # Add server information to result result["server_id"] = server_id result["server_name"] = self.servers[server_id].get("name") return result def get_job_status(self, job_id: str, server_id: str) -> Dict[str, Any]: """ Get the status of a job on a specific server. Args: job_id: ID of the job to check server_id: ID of the server Returns: Dictionary with job status information """ client = self.get_client(server_id) if not client: return {"status": "error", "message": f"Client for server {server_id} not found"} return client.get_job_status(job_id) def download_model(self, job_id: str, server_id: str, output_format: str = "obj") -> Dict[str, Any]: """ Download a processed 3D model from a specific server. Args: job_id: ID of the job to download server_id: ID of the server output_format: Format of the model to download Returns: Dictionary with download status and local file path """ client = self.get_client(server_id) if not client: return {"status": "error", "message": f"Client for server {server_id} not found"} return client.download_model(job_id, output_format) def cancel_job(self, job_id: str, server_id: str) -> Dict[str, Any]: """ Cancel a running job on a specific server. Args: job_id: ID of the job to cancel server_id: ID of the server Returns: Dictionary with cancellation status """ client = self.get_client(server_id) if not client: return {"status": "error", "message": f"Client for server {server_id} not found"} return client.cancel_job(job_id) def cleanup(self): """ Clean up resources. """ self.stop_health_check() self.stop_discovery() class CUDAMVSServiceListener(ServiceListener): """ Zeroconf service listener for CUDA MVS servers. """ def __init__(self, connection_manager: CUDAMVSConnectionManager): """ Initialize the service listener. Args: connection_manager: Connection manager to update with discovered servers """ self.connection_manager = connection_manager def add_service(self, zc: Zeroconf, type_: str, name: str): """ Called when a service is discovered. Args: zc: Zeroconf instance type_: Service type name: Service name """ info = zc.get_service_info(type_, name) if info: try: # Extract server information server_id = name.split('.')[0] server_name = info.properties.get(b'name', b'Unknown').decode('utf-8') # Get server URL addresses = info.parsed_addresses() if not addresses: return server_url = f"http://{addresses[0]}:{info.port}" # Parse capabilities capabilities = {} if b'capabilities' in info.properties: try: capabilities = json.loads(info.properties[b'capabilities'].decode('utf-8')) except json.JSONDecodeError: pass # Create server info server_info = { "server_id": server_id, "name": server_name, "url": server_url, "capabilities": capabilities, "status": "unknown", "discovered_at": time.time() } # Add server to connection manager self.connection_manager.add_server(server_info) except Exception as e: logger.error(f"Error processing discovered service: {e}") def remove_service(self, zc: Zeroconf, type_: str, name: str): """ Called when a service is removed. Args: zc: Zeroconf instance type_: Service type name: Service name """ try: server_id = name.split('.')[0] self.connection_manager.remove_server(server_id) except Exception as e: logger.error(f"Error removing service: {e}") def update_service(self, zc: Zeroconf, type_: str, name: str): """ Called when a service is updated. Args: zc: Zeroconf instance type_: Service type name: Service name """ self.add_service(zc, type_, name) ``` -------------------------------------------------------------------------------- /src/visualization/web_interface.py: -------------------------------------------------------------------------------- ```python import os import base64 import logging from typing import Dict, Any, List, Optional from fastapi import APIRouter, HTTPException, Request from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from fastapi.staticfiles import StaticFiles logger = logging.getLogger(__name__) class WebInterface: """ Web interface for displaying model previews and managing 3D models. """ def __init__(self, app, static_dir: str, templates_dir: str, output_dir: str): """ Initialize the web interface. Args: app: FastAPI application static_dir: Directory for static files templates_dir: Directory for templates output_dir: Directory containing output files (STL, PNG) """ self.app = app self.static_dir = static_dir self.templates_dir = templates_dir self.output_dir = output_dir self.preview_dir = os.path.join(output_dir, "preview") self.stl_dir = os.path.join(output_dir, "stl") # Create directories if they don't exist os.makedirs(self.static_dir, exist_ok=True) os.makedirs(self.templates_dir, exist_ok=True) # Create router self.router = APIRouter(prefix="/ui", tags=["UI"]) # Set up static files self.app.mount("/static", StaticFiles(directory=static_dir), name="static") # Set up templates self.templates = Jinja2Templates(directory=templates_dir) # Register routes self._register_routes() # Create template files self._create_template_files() # Create static files self._create_static_files() def _register_routes(self): """Register routes for the web interface.""" # Home page @self.router.get("/", response_class=HTMLResponse) async def home(request: Request): return self.templates.TemplateResponse("index.html", {"request": request}) # Model preview page @self.router.get("/preview/{model_id}", response_class=HTMLResponse) async def preview(request: Request, model_id: str): # Check if preview exists preview_file = os.path.join(self.preview_dir, f"{model_id}.png") if not os.path.exists(preview_file): raise HTTPException(status_code=404, detail="Preview not found") # Get multi-angle previews if they exist angles = ["front", "top", "right", "perspective"] previews = {} for angle in angles: angle_file = os.path.join(self.preview_dir, f"{model_id}_{angle}.png") if os.path.exists(angle_file): previews[angle] = f"/api/preview/{model_id}_{angle}" # If no multi-angle previews, use the main preview if not previews: previews["main"] = f"/api/preview/{model_id}" # Check if STL exists stl_file = os.path.join(self.stl_dir, f"{model_id}.stl") stl_url = f"/api/stl/{model_id}" if os.path.exists(stl_file) else None return self.templates.TemplateResponse( "preview.html", { "request": request, "model_id": model_id, "previews": previews, "stl_url": stl_url } ) # List all models @self.router.get("/models", response_class=HTMLResponse) async def list_models(request: Request): # Get all STL files stl_files = [] if os.path.exists(self.stl_dir): stl_files = [f for f in os.listdir(self.stl_dir) if f.endswith(".stl")] # Extract model IDs model_ids = [os.path.splitext(f)[0] for f in stl_files] # Get preview URLs models = [] for model_id in model_ids: preview_file = os.path.join(self.preview_dir, f"{model_id}.png") preview_url = f"/api/preview/{model_id}" if os.path.exists(preview_file) else None stl_url = f"/api/stl/{model_id}" models.append({ "id": model_id, "preview_url": preview_url, "stl_url": stl_url }) return self.templates.TemplateResponse( "models.html", {"request": request, "models": models} ) # API endpoints for serving files # Serve preview image @self.app.get("/api/preview/{preview_id}") async def get_preview(preview_id: str): preview_file = os.path.join(self.preview_dir, f"{preview_id}.png") if not os.path.exists(preview_file): raise HTTPException(status_code=404, detail="Preview not found") # Return the file with open(preview_file, "rb") as f: content = f.read() return { "content": base64.b64encode(content).decode("utf-8"), "content_type": "image/png" } # Serve STL file @self.app.get("/api/stl/{model_id}") async def get_stl(model_id: str): stl_file = os.path.join(self.stl_dir, f"{model_id}.stl") if not os.path.exists(stl_file): raise HTTPException(status_code=404, detail="STL file not found") # Return the file with open(stl_file, "rb") as f: content = f.read() return { "content": base64.b64encode(content).decode("utf-8"), "content_type": "application/octet-stream", "filename": f"{model_id}.stl" } # Register the router with the app self.app.include_router(self.router) def _create_template_files(self): """Create template files for the web interface.""" # Create base template base_template = """<!DOCTYPE html> <html> <head> <meta charset="utf-8"> <meta name="viewport" content="width=device-width, initial-scale=1"> <title>{% block title %}OpenSCAD MCP Server{% endblock %}</title> <link rel="stylesheet" href="/static/styles.css"> </head> <body> <header> <h1>OpenSCAD MCP Server</h1> <nav> <ul> <li><a href="/ui/">Home</a></li> <li><a href="/ui/models">Models</a></li> </ul> </nav> </header> <main> {% block content %}{% endblock %} </main> <footer> <p>OpenSCAD MCP Server - Model Context Protocol Implementation</p> </footer> <script src="/static/script.js"></script> </body> </html> """ # Create index template index_template = """{% extends "base.html" %} {% block title %}OpenSCAD MCP Server - Home{% endblock %} {% block content %} <section class="hero"> <h2>Welcome to OpenSCAD MCP Server</h2> <p>A Model Context Protocol server for generating 3D models with OpenSCAD</p> </section> <section class="features"> <div class="feature"> <h3>Natural Language Processing</h3> <p>Describe 3D objects in natural language and get parametric models</p> </div> <div class="feature"> <h3>Preview Generation</h3> <p>See your models from multiple angles before exporting</p> </div> <div class="feature"> <h3>STL Export</h3> <p>Generate STL files ready for 3D printing</p> </div> </section> {% endblock %} """ # Create preview template preview_template = """{% extends "base.html" %} {% block title %}Model Preview - {{ model_id }}{% endblock %} {% block content %} <section class="model-preview"> <h2>Model Preview: {{ model_id }}</h2> <div class="preview-container"> {% for angle, url in previews.items() %} <div class="preview-angle"> <h3>{{ angle|title }} View</h3> <img src="{{ url }}" alt="{{ angle }} view of {{ model_id }}" class="preview-image" data-angle="{{ angle }}"> </div> {% endfor %} </div> {% if stl_url %} <div class="download-container"> <a href="{{ stl_url }}" class="download-button" download="{{ model_id }}.stl">Download STL</a> </div> {% endif %} </section> {% endblock %} """ # Create models template models_template = """{% extends "base.html" %} {% block title %}All Models{% endblock %} {% block content %} <section class="models-list"> <h2>All Models</h2> {% if models %} <div class="models-grid"> {% for model in models %} <div class="model-card"> <h3>{{ model.id }}</h3> {% if model.preview_url %} <img src="{{ model.preview_url }}" alt="Preview of {{ model.id }}" class="model-thumbnail"> {% else %} <div class="no-preview">No preview available</div> {% endif %} <div class="model-actions"> <a href="/ui/preview/{{ model.id }}" class="view-button">View</a> <a href="{{ model.stl_url }}" class="download-button" download="{{ model.id }}.stl">Download</a> </div> </div> {% endfor %} </div> {% else %} <p>No models found.</p> {% endif %} </section> {% endblock %} """ # Write templates to files os.makedirs(self.templates_dir, exist_ok=True) with open(os.path.join(self.templates_dir, "base.html"), "w") as f: f.write(base_template) with open(os.path.join(self.templates_dir, "index.html"), "w") as f: f.write(index_template) with open(os.path.join(self.templates_dir, "preview.html"), "w") as f: f.write(preview_template) with open(os.path.join(self.templates_dir, "models.html"), "w") as f: f.write(models_template) def _create_static_files(self): """Create static files for the web interface.""" # Create CSS file css = """/* Base styles */ * { box-sizing: border-box; margin: 0; padding: 0; } body { font-family: Arial, sans-serif; line-height: 1.6; color: #333; background-color: #f4f4f4; } header { background-color: #333; color: #fff; padding: 1rem; } header h1 { margin-bottom: 0.5rem; } nav ul { display: flex; list-style: none; } nav ul li { margin-right: 1rem; } nav ul li a { color: #fff; text-decoration: none; } nav ul li a:hover { text-decoration: underline; } main { max-width: 1200px; margin: 0 auto; padding: 2rem; } footer { background-color: #333; color: #fff; text-align: center; padding: 1rem; margin-top: 2rem; } /* Home page */ .hero { text-align: center; margin-bottom: 2rem; } .hero h2 { font-size: 2rem; margin-bottom: 1rem; } .features { display: flex; justify-content: space-between; flex-wrap: wrap; } .feature { flex: 1; background-color: #fff; padding: 1.5rem; margin: 0.5rem; border-radius: 5px; box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1); } .feature h3 { margin-bottom: 1rem; } /* Preview page */ .model-preview h2 { margin-bottom: 1.5rem; } .preview-container { display: flex; flex-wrap: wrap; gap: 1rem; margin-bottom: 1.5rem; } .preview-angle { flex: 1; min-width: 300px; background-color: #fff; padding: 1rem; border-radius: 5px; box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1); } .preview-angle h3 { margin-bottom: 0.5rem; } .preview-image { width: 100%; height: auto; border: 1px solid #ddd; } .download-container { text-align: center; margin-top: 1rem; } .download-button { display: inline-block; background-color: #4CAF50; color: white; padding: 0.5rem 1rem; text-decoration: none; border-radius: 4px; } .download-button:hover { background-color: #45a049; } /* Models page */ .models-grid { display: grid; grid-template-columns: repeat(auto-fill, minmax(250px, 1fr)); gap: 1.5rem; } .model-card { background-color: #fff; border-radius: 5px; box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1); overflow: hidden; } .model-card h3 { padding: 1rem; background-color: #f8f8f8; border-bottom: 1px solid #eee; } .model-thumbnail { width: 100%; height: 200px; object-fit: contain; background-color: #f4f4f4; } .no-preview { width: 100%; height: 200px; display: flex; align-items: center; justify-content: center; background-color: #f4f4f4; color: #999; } .model-actions { display: flex; padding: 1rem; } .view-button, .download-button { flex: 1; text-align: center; padding: 0.5rem; text-decoration: none; border-radius: 4px; margin: 0 0.25rem; } .view-button { background-color: #2196F3; color: white; } .view-button:hover { background-color: #0b7dda; } """ # Create JavaScript file js = """// JavaScript for OpenSCAD MCP Server web interface document.addEventListener('DOMContentLoaded', function() { // Handle image loading errors const images = document.querySelectorAll('img'); images.forEach(img => { img.onerror = function() { this.src = '/static/placeholder.png'; }; }); // Handle STL download const downloadButtons = document.querySelectorAll('.download-button'); downloadButtons.forEach(button => { button.addEventListener('click', async function(e) { e.preventDefault(); const url = this.getAttribute('href'); const filename = this.hasAttribute('download') ? this.getAttribute('download') : 'model.stl'; try { const response = await fetch(url); const data = await response.json(); // Decode base64 content const content = atob(data.content); // Convert to Blob const bytes = new Uint8Array(content.length); for (let i = 0; i < content.length; i++) { bytes[i] = content.charCodeAt(i); } const blob = new Blob([bytes], { type: data.content_type }); // Create download link const downloadLink = document.createElement('a'); downloadLink.href = URL.createObjectURL(blob); downloadLink.download = data.filename || filename; // Trigger download document.body.appendChild(downloadLink); downloadLink.click(); document.body.removeChild(downloadLink); } catch (error) { console.error('Error downloading file:', error); alert('Error downloading file. Please try again.'); } }); }); // Handle preview images const previewImages = document.querySelectorAll('.preview-image'); previewImages.forEach(img => { img.addEventListener('click', function() { const url = this.getAttribute('src'); const angle = this.getAttribute('data-angle'); // Create modal for larger view const modal = document.createElement('div'); modal.className = 'preview-modal'; modal.innerHTML = ` <div class="modal-content"> <span class="close-button">×</span> <h3>${angle ? angle.charAt(0).toUpperCase() + angle.slice(1) + ' View' : 'Preview'}</h3> <img src="${url}" alt="Preview"> </div> `; // Add modal styles modal.style.position = 'fixed'; modal.style.top = '0'; modal.style.left = '0'; modal.style.width = '100%'; modal.style.height = '100%'; modal.style.backgroundColor = 'rgba(0,0,0,0.7)'; modal.style.display = 'flex'; modal.style.alignItems = 'center'; modal.style.justifyContent = 'center'; modal.style.zIndex = '1000'; const modalContent = modal.querySelector('.modal-content'); modalContent.style.backgroundColor = '#fff'; modalContent.style.padding = '20px'; modalContent.style.borderRadius = '5px'; modalContent.style.maxWidth = '90%'; modalContent.style.maxHeight = '90%'; modalContent.style.overflow = 'auto'; const closeButton = modal.querySelector('.close-button'); closeButton.style.float = 'right'; closeButton.style.fontSize = '1.5rem'; closeButton.style.fontWeight = 'bold'; closeButton.style.cursor = 'pointer'; const modalImg = modal.querySelector('img'); modalImg.style.maxWidth = '100%'; modalImg.style.maxHeight = '70vh'; modalImg.style.display = 'block'; modalImg.style.margin = '0 auto'; // Add modal to body document.body.appendChild(modal); // Close modal when clicking close button or outside the modal closeButton.addEventListener('click', function() { document.body.removeChild(modal); }); modal.addEventListener('click', function(e) { if (e.target === modal) { document.body.removeChild(modal); } }); }); }); }); """ # Create placeholder image placeholder_svg = """<svg xmlns="http://www.w3.org/2000/svg" width="800" height="600" viewBox="0 0 800 600"> <rect width="800" height="600" fill="#f0f0f0"/> <text x="400" y="300" font-family="Arial" font-size="24" text-anchor="middle" fill="#999">Preview not available</text> </svg>""" # Write static files os.makedirs(self.static_dir, exist_ok=True) with open(os.path.join(self.static_dir, "styles.css"), "w") as f: f.write(css) with open(os.path.join(self.static_dir, "script.js"), "w") as f: f.write(js) with open(os.path.join(self.static_dir, "placeholder.svg"), "w") as f: f.write(placeholder_svg) ``` -------------------------------------------------------------------------------- /src/remote/cuda_mvs_server.py: -------------------------------------------------------------------------------- ```python """ Server for remote CUDA Multi-View Stereo processing. This module provides a server implementation that can be deployed on a machine with CUDA capabilities to process multi-view images into 3D models remotely. """ import os import sys import json import uuid import logging import argparse import tempfile import subprocess from typing import Dict, List, Optional, Any, Union from pathlib import Path import shutil import time from fastapi import FastAPI, File, UploadFile, Form, HTTPException, BackgroundTasks, Depends from fastapi.responses import FileResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.security import APIKeyHeader import uvicorn from pydantic import BaseModel, Field from zeroconf import ServiceInfo, Zeroconf # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Constants DEFAULT_PORT = 8765 DEFAULT_HOST = "0.0.0.0" DEFAULT_CUDA_MVS_PATH = "/opt/cuda-multi-view-stereo" DEFAULT_OUTPUT_DIR = "output" DEFAULT_MAX_JOBS = 5 DEFAULT_MAX_IMAGES_PER_JOB = 50 DEFAULT_JOB_TIMEOUT = 3600 # 1 hour # Models class JobStatus(BaseModel): """Job status model.""" job_id: str status: str = "created" # created, uploading, processing, completed, failed created_at: float = Field(default_factory=time.time) updated_at: float = Field(default_factory=time.time) num_images: int = 0 images_uploaded: int = 0 progress: float = 0.0 error_message: Optional[str] = None model_id: Optional[str] = None output_dir: Optional[str] = None point_cloud_file: Optional[str] = None obj_file: Optional[str] = None processing_time: Optional[float] = None class ServerConfig(BaseModel): """Server configuration model.""" cuda_mvs_path: str = DEFAULT_CUDA_MVS_PATH output_dir: str = DEFAULT_OUTPUT_DIR max_jobs: int = DEFAULT_MAX_JOBS max_images_per_job: int = DEFAULT_MAX_IMAGES_PER_JOB job_timeout: int = DEFAULT_JOB_TIMEOUT api_key: Optional[str] = None server_name: str = "CUDA MVS Server" advertise_service: bool = True gpu_info: Optional[str] = None class JobRequest(BaseModel): """Job creation request model.""" num_images: int class ProcessRequest(BaseModel): """Process job request model.""" reconstruction_quality: str = "normal" # low, normal, high output_formats: List[str] = ["obj", "ply"] class ServerInfo(BaseModel): """Server information model.""" server_id: str name: str version: str = "1.0.0" status: str = "running" capabilities: Dict[str, Any] jobs: Dict[str, Any] uptime: float # Server implementation class CUDAMVSServer: """ Server for remote CUDA Multi-View Stereo processing. This server: 1. Accepts image uploads 2. Processes images using CUDA MVS 3. Provides 3D model downloads 4. Advertises itself on the local network """ def __init__(self, config: ServerConfig): """ Initialize the CUDA MVS server. Args: config: Server configuration """ self.config = config self.app = FastAPI(title="CUDA MVS Server", description="Remote CUDA Multi-View Stereo processing server") self.jobs: Dict[str, JobStatus] = {} self.server_id = str(uuid.uuid4()) self.start_time = time.time() self.zeroconf = None # Create output directory os.makedirs(config.output_dir, exist_ok=True) # Configure CORS self.app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Detect GPU info self.detect_gpu_info() # Register routes self.register_routes() # Advertise service if enabled if config.advertise_service: self.advertise_service() def detect_gpu_info(self): """Detect GPU information.""" if not self.config.gpu_info: try: # Try to get GPU info using nvidia-smi result = subprocess.run( ["nvidia-smi", "--query-gpu=name,memory.total", "--format=csv,noheader"], capture_output=True, text=True, check=True ) self.config.gpu_info = result.stdout.strip() except (subprocess.SubprocessError, FileNotFoundError): # If nvidia-smi fails, try lspci try: result = subprocess.run( ["lspci", "-v", "|", "grep", "-i", "vga"], capture_output=True, text=True, shell=True ) self.config.gpu_info = result.stdout.strip() except subprocess.SubprocessError: self.config.gpu_info = "Unknown GPU" def register_routes(self): """Register API routes.""" # Authentication dependency api_key_header = APIKeyHeader(name="Authorization", auto_error=False) async def verify_api_key(api_key: str = Depends(api_key_header)): if self.config.api_key: if not api_key: raise HTTPException(status_code=401, detail="API key required") # Check if the API key is in the format "Bearer <key>" if api_key.startswith("Bearer "): api_key = api_key[7:] if api_key != self.config.api_key: raise HTTPException(status_code=401, detail="Invalid API key") return True # Status endpoint @self.app.get("/api/status") async def get_status(): return self.get_server_info() # Job management endpoints @self.app.post("/api/jobs", status_code=201, dependencies=[Depends(verify_api_key)]) async def create_job(job_request: JobRequest): return self.create_job(job_request.num_images) @self.app.get("/api/jobs", dependencies=[Depends(verify_api_key)]) async def list_jobs(): return {"jobs": self.jobs} @self.app.get("/api/jobs/{job_id}", dependencies=[Depends(verify_api_key)]) async def get_job(job_id: str): if job_id not in self.jobs: raise HTTPException(status_code=404, detail="Job not found") return self.jobs[job_id] @self.app.delete("/api/jobs/{job_id}", dependencies=[Depends(verify_api_key)]) async def cancel_job(job_id: str): if job_id not in self.jobs: raise HTTPException(status_code=404, detail="Job not found") # Cancel the job job = self.jobs[job_id] if job.status in ["created", "uploading", "processing"]: job.status = "cancelled" job.updated_at = time.time() # Clean up job directory job_dir = os.path.join(self.config.output_dir, job_id) if os.path.exists(job_dir): shutil.rmtree(job_dir) return {"status": "success", "message": "Job cancelled"} else: return {"status": "error", "message": f"Cannot cancel job in {job.status} state"} # Image upload endpoint @self.app.post("/api/jobs/{job_id}/images", dependencies=[Depends(verify_api_key)]) async def upload_image( job_id: str, file: UploadFile = File(...), metadata: str = Form(None) ): if job_id not in self.jobs: raise HTTPException(status_code=404, detail="Job not found") job = self.jobs[job_id] # Check if job is in a valid state for uploads if job.status not in ["created", "uploading"]: raise HTTPException( status_code=400, detail=f"Cannot upload images to job in {job.status} state" ) # Check if we've reached the maximum number of images if job.images_uploaded >= job.num_images: raise HTTPException( status_code=400, detail=f"Maximum number of images ({job.num_images}) already uploaded" ) # Update job status job.status = "uploading" job.updated_at = time.time() # Create job directory if it doesn't exist job_dir = os.path.join(self.config.output_dir, job_id, "images") os.makedirs(job_dir, exist_ok=True) # Parse metadata image_metadata = {} if metadata: try: image_metadata = json.loads(metadata) except json.JSONDecodeError: logger.warning(f"Invalid metadata format: {metadata}") # Save the file image_index = job.images_uploaded file_extension = os.path.splitext(file.filename)[1] image_path = os.path.join(job_dir, f"image_{image_index:04d}{file_extension}") with open(image_path, "wb") as f: shutil.copyfileobj(file.file, f) # Update job status job.images_uploaded += 1 job.updated_at = time.time() return { "status": "success", "job_id": job_id, "image_index": image_index, "image_path": image_path, "images_uploaded": job.images_uploaded, "total_images": job.num_images } # Process job endpoint @self.app.post("/api/jobs/{job_id}/process", status_code=202, dependencies=[Depends(verify_api_key)]) async def process_job( job_id: str, process_request: ProcessRequest, background_tasks: BackgroundTasks ): if job_id not in self.jobs: raise HTTPException(status_code=404, detail="Job not found") job = self.jobs[job_id] # Check if job is in a valid state for processing if job.status != "uploading": raise HTTPException( status_code=400, detail=f"Cannot process job in {job.status} state" ) # Check if all images have been uploaded if job.images_uploaded < job.num_images: raise HTTPException( status_code=400, detail=f"Not all images uploaded ({job.images_uploaded}/{job.num_images})" ) # Update job status job.status = "processing" job.updated_at = time.time() job.progress = 0.0 # Start processing in the background background_tasks.add_task( self.process_job_task, job_id, process_request.reconstruction_quality, process_request.output_formats ) return { "status": "success", "job_id": job_id, "message": "Processing started" } # Model download endpoint @self.app.get("/api/jobs/{job_id}/model", dependencies=[Depends(verify_api_key)]) async def download_model(job_id: str, format: str = "obj"): if job_id not in self.jobs: raise HTTPException(status_code=404, detail="Job not found") job = self.jobs[job_id] # Check if job is completed if job.status != "completed": raise HTTPException( status_code=400, detail=f"Job is not completed. Current status: {job.status}" ) # Check if the requested format is available if format == "obj" and job.obj_file: return FileResponse(job.obj_file, filename=f"{job.model_id}.obj") elif format == "ply" and job.point_cloud_file: return FileResponse(job.point_cloud_file, filename=f"{job.model_id}.ply") else: raise HTTPException( status_code=404, detail=f"Model in {format} format not available" ) def create_job(self, num_images: int) -> Dict[str, Any]: """ Create a new job. Args: num_images: Number of images to be uploaded Returns: Dictionary with job information """ # Check if we've reached the maximum number of jobs active_jobs = sum(1 for job in self.jobs.values() if job.status in ["created", "uploading", "processing"]) if active_jobs >= self.config.max_jobs: raise HTTPException( status_code=429, detail=f"Maximum number of active jobs ({self.config.max_jobs}) reached" ) # Check if the number of images is valid if num_images <= 0 or num_images > self.config.max_images_per_job: raise HTTPException( status_code=400, detail=f"Number of images must be between 1 and {self.config.max_images_per_job}" ) # Create a new job job_id = str(uuid.uuid4()) job = JobStatus( job_id=job_id, num_images=num_images ) # Add job to the list self.jobs[job_id] = job # Create job directory job_dir = os.path.join(self.config.output_dir, job_id) os.makedirs(job_dir, exist_ok=True) return job.dict() async def process_job_task( self, job_id: str, reconstruction_quality: str, output_formats: List[str] ): """ Process a job in the background. Args: job_id: ID of the job to process reconstruction_quality: Quality of the reconstruction (low, normal, high) output_formats: List of output formats to generate """ if job_id not in self.jobs: logger.error(f"Job {job_id} not found") return job = self.jobs[job_id] job_dir = os.path.join(self.config.output_dir, job_id) images_dir = os.path.join(job_dir, "images") output_dir = os.path.join(job_dir, "output") # Create output directory os.makedirs(output_dir, exist_ok=True) # Generate a model ID model_id = f"model_{job_id[:8]}" job.model_id = model_id # Update job status job.status = "processing" job.progress = 0.0 job.updated_at = time.time() try: # Start timing start_time = time.time() # Run CUDA MVS await self.run_cuda_mvs( job, images_dir, output_dir, model_id, reconstruction_quality ) # Convert output formats if needed if "obj" in output_formats and not job.obj_file: # Convert PLY to OBJ if needed ply_file = job.point_cloud_file if ply_file and os.path.exists(ply_file): obj_file = os.path.join(output_dir, f"{model_id}.obj") await self.convert_ply_to_obj(ply_file, obj_file) job.obj_file = obj_file # Calculate processing time job.processing_time = time.time() - start_time # Update job status job.status = "completed" job.progress = 100.0 job.updated_at = time.time() job.output_dir = output_dir logger.info(f"Job {job_id} completed successfully in {job.processing_time:.2f} seconds") except Exception as e: # Update job status job.status = "failed" job.error_message = str(e) job.updated_at = time.time() logger.error(f"Job {job_id} failed: {e}") async def run_cuda_mvs( self, job: JobStatus, images_dir: str, output_dir: str, model_id: str, reconstruction_quality: str ): """ Run CUDA MVS on the uploaded images. Args: job: Job status object images_dir: Directory containing the images output_dir: Directory to save the output model_id: ID of the model reconstruction_quality: Quality of the reconstruction """ # Check if CUDA MVS is installed cuda_mvs_executable = os.path.join(self.config.cuda_mvs_path, "build", "app_patch_match_mvs") if not os.path.exists(cuda_mvs_executable): raise FileNotFoundError(f"CUDA MVS executable not found at {cuda_mvs_executable}") # Create a list of image paths image_files = [] for file in os.listdir(images_dir): if file.lower().endswith((".jpg", ".jpeg", ".png")): image_files.append(os.path.join(images_dir, file)) if not image_files: raise ValueError("No valid image files found") # Sort image files to ensure consistent order image_files.sort() # Create a camera parameter file camera_params_file = os.path.join(output_dir, "cameras.txt") await self.generate_camera_params(image_files, camera_params_file) # Set quality parameters if reconstruction_quality == "low": num_iterations = 3 max_resolution = 1024 elif reconstruction_quality == "normal": num_iterations = 5 max_resolution = 2048 elif reconstruction_quality == "high": num_iterations = 7 max_resolution = 4096 else: num_iterations = 5 max_resolution = 2048 # Prepare output files point_cloud_file = os.path.join(output_dir, f"{model_id}.ply") # Build the command cmd = [ cuda_mvs_executable, "--input_folder", images_dir, "--camera_file", camera_params_file, "--output_folder", output_dir, "--output_file", point_cloud_file, "--num_iterations", str(num_iterations), "--max_resolution", str(max_resolution) ] # Run the command logger.info(f"Running CUDA MVS: {' '.join(cmd)}") process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # Monitor progress while True: if process.poll() is not None: break # Read output line by line output = process.stdout.readline() if output: # Try to parse progress information if "Progress:" in output: try: progress_str = output.split("Progress:")[1].strip().rstrip("%") progress = float(progress_str) job.progress = progress job.updated_at = time.time() except (ValueError, IndexError): pass # Sleep briefly to avoid CPU spinning await asyncio.sleep(0.1) # Get the final output stdout, stderr = process.communicate() # Check if the process was successful if process.returncode != 0: raise RuntimeError(f"CUDA MVS failed with error: {stderr}") # Check if the output file was created if not os.path.exists(point_cloud_file): raise FileNotFoundError(f"Output file not created: {point_cloud_file}") # Update job with the output file job.point_cloud_file = point_cloud_file async def generate_camera_params(self, image_files: List[str], output_file: str): """ Generate camera parameters for CUDA MVS. Args: image_files: List of image files output_file: Output file for camera parameters """ # For now, use a simple camera model with default parameters # In a real implementation, this would use structure from motion # to estimate camera parameters from the images with open(output_file, "w") as f: f.write(f"# Camera parameters for {len(image_files)} images\n") f.write("# Format: image_name width height fx fy cx cy\n") for i, image_file in enumerate(image_files): # Get image dimensions from PIL import Image with Image.open(image_file) as img: width, height = img.size # Use default camera parameters fx = width * 1.2 # Focal length x fy = height * 1.2 # Focal length y cx = width / 2 # Principal point x cy = height / 2 # Principal point y # Write camera parameters f.write(f"{os.path.basename(image_file)} {width} {height} {fx} {fy} {cx} {cy}\n") async def convert_ply_to_obj(self, ply_file: str, obj_file: str): """ Convert PLY file to OBJ format. Args: ply_file: Input PLY file obj_file: Output OBJ file """ try: import open3d as o3d # Load the PLY file mesh = o3d.io.read_triangle_mesh(ply_file) # Save as OBJ o3d.io.write_triangle_mesh(obj_file, mesh) logger.info(f"Converted {ply_file} to {obj_file}") except ImportError: # If open3d is not available, use a subprocess try: # Try using meshlab subprocess.run( ["meshlabserver", "-i", ply_file, "-o", obj_file], check=True, capture_output=True ) except (subprocess.SubprocessError, FileNotFoundError): # If meshlab is not available, try using assimp try: subprocess.run( ["assimp", "export", ply_file, obj_file], check=True, capture_output=True ) except (subprocess.SubprocessError, FileNotFoundError): raise RuntimeError("No suitable tool found to convert PLY to OBJ") def get_server_info(self) -> Dict[str, Any]: """ Get server information. Returns: Dictionary with server information """ # Count active jobs active_jobs = sum(1 for job in self.jobs.values() if job.status in ["created", "uploading", "processing"]) # Get server capabilities capabilities = { "max_jobs": self.config.max_jobs, "max_images_per_job": self.config.max_images_per_job, "job_timeout": self.config.job_timeout, "supported_formats": ["obj", "ply"], "gpu_info": self.config.gpu_info } # Get job summary job_summary = { "total": len(self.jobs), "active": active_jobs, "completed": sum(1 for job in self.jobs.values() if job.status == "completed"), "failed": sum(1 for job in self.jobs.values() if job.status == "failed") } return { "server_id": self.server_id, "name": self.config.server_name, "version": "1.0.0", "status": "running", "capabilities": capabilities, "jobs": job_summary, "uptime": time.time() - self.start_time } def advertise_service(self): """Advertise the server on the local network using Zeroconf.""" try: import socket from zeroconf import ServiceInfo, Zeroconf # Get local IP address s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) local_ip = s.getsockname()[0] s.close() # Create service info capabilities = { "max_jobs": self.config.max_jobs, "max_images_per_job": self.config.max_images_per_job, "supported_formats": ["obj", "ply"], "gpu_info": self.config.gpu_info } service_info = ServiceInfo( "_cudamvs._tcp.local.", f"{self.server_id}._cudamvs._tcp.local.", addresses=[socket.inet_aton(local_ip)], port=DEFAULT_PORT, properties={ b"name": self.config.server_name.encode("utf-8"), b"capabilities": json.dumps(capabilities).encode("utf-8") } ) # Register service self.zeroconf = Zeroconf() self.zeroconf.register_service(service_info) logger.info(f"Advertising CUDA MVS service on {local_ip}:{DEFAULT_PORT}") except Exception as e: logger.error(f"Failed to advertise service: {e}") def run(self, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT): """ Run the server. Args: host: Host to bind to port: Port to bind to """ uvicorn.run(self.app, host=host, port=port) def cleanup(self): """Clean up resources.""" if self.zeroconf: self.zeroconf.close() # Main entry point def main(): """Main entry point.""" parser = argparse.ArgumentParser(description="CUDA MVS Server") parser.add_argument("--host", default=DEFAULT_HOST, help="Host to bind to") parser.add_argument("--port", type=int, default=DEFAULT_PORT, help="Port to bind to") parser.add_argument("--cuda-mvs-path", default=DEFAULT_CUDA_MVS_PATH, help="Path to CUDA MVS installation") parser.add_argument("--output-dir", default=DEFAULT_OUTPUT_DIR, help="Output directory") parser.add_argument("--max-jobs", type=int, default=DEFAULT_MAX_JOBS, help="Maximum number of concurrent jobs") parser.add_argument("--max-images", type=int, default=DEFAULT_MAX_IMAGES_PER_JOB, help="Maximum images per job") parser.add_argument("--job-timeout", type=int, default=DEFAULT_JOB_TIMEOUT, help="Job timeout in seconds") parser.add_argument("--api-key", help="API key for authentication") parser.add_argument("--server-name", default="CUDA MVS Server", help="Server name") parser.add_argument("--no-advertise", action="store_true", help="Don't advertise service on the network") args = parser.parse_args() # Create server configuration config = ServerConfig( cuda_mvs_path=args.cuda_mvs_path, output_dir=args.output_dir, max_jobs=args.max_jobs, max_images_per_job=args.max_images, job_timeout=args.job_timeout, api_key=args.api_key, server_name=args.server_name, advertise_service=not args.no_advertise ) # Create and run server server = CUDAMVSServer(config) try: server.run(host=args.host, port=args.port) finally: server.cleanup() if __name__ == "__main__": # Add asyncio import for async/await support import asyncio main() ```