#
tokens: 44230/50000 13/67 files (page 2/3)
lines: off (toggle) GitHub
raw markdown copy
This is page 2 of 3. Use http://codebase.md/jhacksman/openscad-mcp-server?page={x} to view the full context.

# Directory Structure

```
├── implementation_plan.md
├── old
│   ├── download_sam2_checkpoint.py
│   ├── src
│   │   ├── ai
│   │   │   └── sam_segmentation.py
│   │   ├── models
│   │   │   └── threestudio_generator.py
│   │   └── workflow
│   │       └── image_to_model_pipeline.py
│   └── test_sam2_segmentation.py
├── README.md
├── requirements.txt
├── rtfmd
│   ├── decisions
│   │   ├── ai-driven-code-generation.md
│   │   └── export-formats.md
│   ├── files
│   │   └── src
│   │       ├── ai
│   │       │   └── ai_service.py.md
│   │       ├── main.py.md
│   │       ├── models
│   │       │   └── code_generator.py.md
│   │       └── nlp
│   │           └── parameter_extractor.py.md
│   ├── knowledge
│   │   ├── ai
│   │   │   └── natural-language-processing.md
│   │   ├── nlp
│   │   │   └── parameter-extraction.md
│   │   └── openscad
│   │       ├── export-formats.md
│   │       ├── openscad-basics.md
│   │       └── primitive-testing.md
│   └── README.md
├── scad
│   └── simple_cube.scad
├── src
│   ├── __init__.py
│   ├── __pycache__
│   │   └── __init__.cpython-312.pyc
│   ├── ai
│   │   ├── ai_service.py
│   │   ├── gemini_api.py
│   │   └── venice_api.py
│   ├── config.py
│   ├── main_remote.py
│   ├── main.py
│   ├── main.py.new
│   ├── models
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-312.pyc
│   │   │   └── code_generator.cpython-312.pyc
│   │   ├── code_generator.py
│   │   ├── cuda_mvs.py
│   │   └── scad_templates
│   │       └── basic_shapes.scad
│   ├── nlp
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-312.pyc
│   │   │   └── parameter_extractor.cpython-312.pyc
│   │   └── parameter_extractor.py
│   ├── openscad_wrapper
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-312.pyc
│   │   │   └── wrapper.cpython-312.pyc
│   │   └── wrapper.py
│   ├── printer_discovery
│   │   ├── __init__.py
│   │   └── printer_discovery.py
│   ├── remote
│   │   ├── connection_manager.py
│   │   ├── cuda_mvs_client.py
│   │   ├── cuda_mvs_server.py
│   │   └── error_handling.py
│   ├── testing
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-312.pyc
│   │   │   ├── primitive_tester.cpython-312.pyc
│   │   │   └── test_primitives.cpython-312.pyc
│   │   ├── primitive_tester.py
│   │   └── test_primitives.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-312.pyc
│   │   │   ├── stl_exporter.cpython-312.pyc
│   │   │   └── stl_validator.cpython-312.pyc
│   │   ├── cad_exporter.py
│   │   ├── format_validator.py
│   │   ├── stl_exporter.py
│   │   ├── stl_repair.py
│   │   └── stl_validator.py
│   ├── visualization
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-312.pyc
│   │   │   └── renderer.cpython-312.pyc
│   │   ├── headless_renderer.py
│   │   ├── renderer.py
│   │   └── web_interface.py
│   └── workflow
│       ├── image_approval.py
│       └── multi_view_to_model_pipeline.py
├── test_complete_workflow.py
├── test_cuda_mvs.py
├── test_gemini_api.py
├── test_image_approval_workflow.py
├── test_image_approval.py
├── test_image_to_model_pipeline.py
├── test_model_selection.py
├── test_multi_view_pipeline.py
├── test_primitives.sh
├── test_rabbit_direct.py
├── test_remote_cuda_mvs.py
└── test_venice_example.py
```

# Files

--------------------------------------------------------------------------------
/test_multi_view_pipeline.py:
--------------------------------------------------------------------------------

```python
"""
Test script for multi-view to model pipeline.
"""

import os
import sys
import logging
import unittest
from unittest.mock import patch, MagicMock
from pathlib import Path

# Make the ``src`` package importable by adding the directory that contains
# it to the import path. This test script sits in the repository root, next
# to ``src/``, so the script's own directory is the right one; climbing two
# levels (as before) added the parent of the checkout instead.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from src.workflow.multi_view_to_model_pipeline import MultiViewToModelPipeline
from src.ai.gemini_api import GeminiImageGenerator
from src.models.cuda_mvs import CUDAMultiViewStereo
from src.workflow.image_approval import ImageApprovalTool

# Configure root logging at INFO level and create a logger named after this
# module for use by the tests below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TestMultiViewPipeline(unittest.TestCase):
    """
    Test cases for the multi-view to model pipeline.

    The Gemini image generator, the CUDA MVS wrapper and the approval tool
    are all replaced by spec'd ``MagicMock`` objects whose side effects only
    write small placeholder files below ``output/test_pipeline``.
    """

    def setUp(self):
        """
        Create scratch directories, the mock components and the pipeline.
        """
        # Create test directories
        self.test_output_dir = "output/test_pipeline"
        self.test_images_dir = os.path.join(self.test_output_dir, "images")
        self.test_models_dir = os.path.join(self.test_output_dir, "models")

        for directory in [self.test_output_dir, self.test_images_dir, self.test_models_dir]:
            os.makedirs(directory, exist_ok=True)

        # Create mock CUDA MVS path (relative to the current working directory)
        self.cuda_mvs_path = "mock_cuda_mvs"
        os.makedirs(os.path.join(self.cuda_mvs_path, "build"), exist_ok=True)

        # Create a stand-in executable: a tiny shell script marked executable
        with open(os.path.join(self.cuda_mvs_path, "build", "app_patch_match_mvs"), "w") as f:
            f.write("#!/bin/bash\necho 'Mock CUDA MVS'\n")
        os.chmod(os.path.join(self.cuda_mvs_path, "build", "app_patch_match_mvs"), 0o755)

        # Create mock components; spec= restricts them to the real classes' attributes
        self.mock_gemini = MagicMock(spec=GeminiImageGenerator)
        self.mock_cuda_mvs = MagicMock(spec=CUDAMultiViewStereo)
        self.mock_approval = MagicMock(spec=ImageApprovalTool)

        # Configure mock responses
        self.configure_mocks()

        # Create the pipeline with mock components
        self.pipeline = MultiViewToModelPipeline(
            gemini_generator=self.mock_gemini,
            cuda_mvs=self.mock_cuda_mvs,
            approval_tool=self.mock_approval,
            output_dir=self.test_output_dir
        )

    def configure_mocks(self):
        """
        Attach side effects and return values to the mock components.

        Each side effect writes placeholder files into the test directories
        and returns a plain dict describing what it "produced".
        """
        # Mock Gemini image generation
        def mock_generate_image(prompt, **kwargs):
            # File name is derived from the first 10 characters of the prompt
            image_path = os.path.join(self.test_images_dir, f"{prompt[:10].replace(' ', '_')}.png")
            with open(image_path, "w") as f:
                f.write(f"Mock image for {prompt}")
            return {
                "prompt": prompt,
                "local_path": image_path,
                "image_data": b"mock_image_data"
            }

        def mock_generate_multiple_views(prompt, num_views, **kwargs):
            results = []
            for i in range(num_views):
                image_path = os.path.join(self.test_images_dir, f"view_{i}.png")
                with open(image_path, "w") as f:
                    f.write(f"Mock image for {prompt} - view {i}")
                results.append({
                    "prompt": f"{prompt} - view {i}",
                    "local_path": image_path,
                    "image_data": b"mock_image_data",
                    "view_direction": f"view {i}",
                    # File names are 0-based while view_index is 1-based
                    "view_index": i + 1
                })
            return results

        self.mock_gemini.generate_image.side_effect = mock_generate_image
        self.mock_gemini.generate_multiple_views.side_effect = mock_generate_multiple_views

        # Mock CUDA MVS
        def mock_generate_model(image_paths, **kwargs):
            model_dir = os.path.join(self.test_models_dir, "mock_model")
            os.makedirs(model_dir, exist_ok=True)

            point_cloud_file = os.path.join(model_dir, "mock_model.ply")
            with open(point_cloud_file, "w") as f:
                f.write("Mock point cloud")

            obj_file = os.path.join(model_dir, "mock_model.obj")
            with open(obj_file, "w") as f:
                f.write("Mock OBJ file")

            return {
                "model_id": "mock_model",
                "output_dir": model_dir,
                "point_cloud_file": point_cloud_file,
                "obj_file": obj_file,
                "input_images": image_paths
            }

        self.mock_cuda_mvs.generate_model_from_images.side_effect = mock_generate_model
        self.mock_cuda_mvs.convert_ply_to_obj.return_value = os.path.join(self.test_models_dir, "mock_model", "mock_model.obj")

        # Mock approval tool
        def mock_present_image(image_path, metadata):
            return {
                # Approval ID is the file name without extension, e.g. "view_0"
                "approval_id": os.path.basename(image_path).split('.')[0],
                "image_path": image_path,
                "image_url": f"/images/{os.path.basename(image_path)}",
                "metadata": metadata or {}
            }

        def mock_process_approval(approval_id, approved, image_path):
            if approved:
                approved_path = os.path.join(self.test_output_dir, "approved", os.path.basename(image_path))
                os.makedirs(os.path.dirname(approved_path), exist_ok=True)
                with open(approved_path, "w") as f:
                    f.write(f"Approved image {approval_id}")

                return {
                    "approval_id": approval_id,
                    "approved": True,
                    "original_path": image_path,
                    "approved_path": approved_path
                }
            else:
                return {
                    "approval_id": approval_id,
                    "approved": False,
                    "original_path": image_path
                }

        self.mock_approval.present_image_for_approval.side_effect = mock_present_image
        self.mock_approval.process_approval.side_effect = mock_process_approval
        self.mock_approval.get_approved_images.return_value = [
            os.path.join(self.test_output_dir, "approved", f"view_{i}.png") for i in range(3)
        ]

    def test_generate_model_from_text(self):
        """
        Test generating a 3D model from text prompt.
        """
        # Test parameters
        prompt = "A low-poly rabbit"
        num_views = 3

        # Mock approvals - approve all images
        def mock_get_approval(approval_request):
            return True

        # Call the method
        result = self.pipeline.generate_model_from_text(
            prompt, num_views=num_views, get_approval_callback=mock_get_approval
        )

        # Verify the result (assertIn reports the missing key on failure)
        self.assertIsNotNone(result)
        self.assertIn("model_id", result)
        self.assertIn("obj_file", result)
        self.assertIn("point_cloud_file", result)

        # Verify component calls
        self.mock_gemini.generate_multiple_views.assert_called_once_with(
            prompt, num_views=num_views, output_dir=os.path.join(self.test_output_dir, "multi_view")
        )

        self.assertEqual(self.mock_approval.present_image_for_approval.call_count, num_views)
        self.assertEqual(self.mock_approval.process_approval.call_count, num_views)

        self.mock_cuda_mvs.generate_model_from_images.assert_called_once()
        self.mock_cuda_mvs.convert_ply_to_obj.assert_called_once()

    def test_generate_model_from_image(self):
        """
        Test generating a 3D model from a base image.
        """
        # Create a mock base image
        base_image_path = os.path.join(self.test_images_dir, "base_image.png")
        with open(base_image_path, "w") as f:
            f.write("Mock base image")

        # Test parameters
        prompt = "A low-poly rabbit based on this image"
        num_views = 3

        # Mock approvals - approve all images
        def mock_get_approval(approval_request):
            return True

        # Call the method
        result = self.pipeline.generate_model_from_image(
            base_image_path, prompt, num_views=num_views, get_approval_callback=mock_get_approval
        )

        # Verify the result (assertIn reports the missing key on failure)
        self.assertIsNotNone(result)
        self.assertIn("model_id", result)
        self.assertIn("obj_file", result)
        self.assertIn("point_cloud_file", result)

        # Verify component calls
        self.mock_gemini.generate_multiple_views.assert_called_once_with(
            prompt, num_views=num_views, base_image_path=base_image_path,
            output_dir=os.path.join(self.test_output_dir, "multi_view")
        )

        self.assertEqual(self.mock_approval.present_image_for_approval.call_count, num_views)
        self.assertEqual(self.mock_approval.process_approval.call_count, num_views)

        self.mock_cuda_mvs.generate_model_from_images.assert_called_once()
        self.mock_cuda_mvs.convert_ply_to_obj.assert_called_once()

    def test_selective_approval(self):
        """
        Test selective approval of generated images.
        """
        # Test parameters
        prompt = "A low-poly rabbit"
        num_views = 4

        # Mock approvals - only approve views 0 and 2
        def mock_get_approval(approval_request):
            # approval_id looks like "view_<n>" (see mock_present_image)
            view_index = int(approval_request["approval_id"].split('_')[1])
            return view_index % 2 == 0  # Approve even-indexed views

        # Call the method
        result = self.pipeline.generate_model_from_text(
            prompt, num_views=num_views, get_approval_callback=mock_get_approval
        )

        # Verify the result
        self.assertIsNotNone(result)

        # Verify component calls
        self.assertEqual(self.mock_approval.present_image_for_approval.call_count, num_views)
        self.assertEqual(self.mock_approval.process_approval.call_count, num_views)

        # Only 2 images should be approved and used for model generation.
        # call[0][0] is the first positional argument of each recorded call.
        # NOTE(review): this check is skipped entirely when
        # generate_model_from_images was never called - confirm whether the
        # test should require the call instead.
        approved_images = [call[0][0] for call in self.mock_cuda_mvs.generate_model_from_images.call_args_list]
        if approved_images:
            self.assertEqual(len(approved_images[0]), 2)  # Only 2 images approved

    def test_error_handling(self):
        """
        Test error handling in the pipeline.
        """
        # Test parameters
        prompt = "A low-poly rabbit"

        # Mock error in Gemini API (replaces the side effect set in configure_mocks)
        self.mock_gemini.generate_multiple_views.side_effect = Exception("Mock API error")

        # Call the method and expect an exception
        with self.assertRaises(Exception):
            self.pipeline.generate_model_from_text(prompt)

    def tearDown(self):
        """
        Remove the directories created by setUp and by the mock side effects.
        """
        # Clean up test output directory
        import shutil
        if os.path.exists(self.test_output_dir):
            shutil.rmtree(self.test_output_dir)

        # Clean up mock CUDA MVS path
        if os.path.exists(self.cuda_mvs_path):
            shutil.rmtree(self.cuda_mvs_path)

# Run the test cases with unittest's runner when executed as a script.
if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/src/main_remote.py:
--------------------------------------------------------------------------------

```python
"""
Main module for OpenSCAD MCP Server with remote CUDA MVS processing.

This module adds remote CUDA MVS processing capabilities to the MCP server.
"""

import os
import logging
from typing import Dict, Any, List, Optional

# Import remote processing components
from src.remote.cuda_mvs_client import CUDAMVSClient
from src.remote.connection_manager import CUDAMVSConnectionManager
from src.config import REMOTE_CUDA_MVS

# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Module-level state for remote processing.
# remote_connection_manager stays None until initialize_remote_processing()
# creates it, which only happens when REMOTE_CUDA_MVS["ENABLED"] is truthy.
remote_connection_manager: Optional[CUDAMVSConnectionManager] = None
# Jobs registered by process_images_remotely(), keyed by job ID.
remote_jobs: Dict[str, Dict[str, Any]] = {}

def initialize_remote_processing():
    """
    Create the module-wide connection manager for remote CUDA MVS servers.

    Nothing is created (and the module-level manager is left untouched)
    when ``REMOTE_CUDA_MVS["ENABLED"]`` is falsy.

    Returns:
        The new CUDAMVSConnectionManager instance, or None when remote
        processing is disabled
    """
    global remote_connection_manager

    if not REMOTE_CUDA_MVS["ENABLED"]:
        return None

    logger.info("Initializing remote CUDA MVS connection manager")
    manager = CUDAMVSConnectionManager(
        api_key=REMOTE_CUDA_MVS["API_KEY"],
        discovery_port=REMOTE_CUDA_MVS["DISCOVERY_PORT"],
        use_lan_discovery=REMOTE_CUDA_MVS["USE_LAN_DISCOVERY"],
        # A falsy (e.g. empty) server URL is passed on as None
        server_url=REMOTE_CUDA_MVS["SERVER_URL"] or None,
    )
    remote_connection_manager = manager
    return manager

def discover_remote_servers():
    """
    Ask the connection manager for the remote CUDA MVS servers it can find.

    Returns:
        Whatever the manager's discover_servers() returns, or an empty
        list (after logging a warning) when no manager is initialized
    """
    if remote_connection_manager:
        return remote_connection_manager.discover_servers()

    logger.warning("Remote CUDA MVS processing is not enabled")
    return []

def get_server_status(server_id: str):
    """
    Look up the status of one remote CUDA MVS server.

    Args:
        server_id: ID of the server to query

    Returns:
        The status reported by the connection manager, or None (after
        logging a warning) when no manager is initialized
    """
    if remote_connection_manager:
        return remote_connection_manager.get_server_status(server_id)

    logger.warning("Remote CUDA MVS processing is not enabled")
    return None

def upload_images_to_server(server_id: str, image_paths: List[str], job_id: Optional[str] = None):
    """
    Send a set of images to a remote CUDA MVS server.

    Args:
        server_id: ID of the destination server
        image_paths: Paths of the images to upload
        job_id: Job ID to use for the upload, if the caller already has one

    Returns:
        The connection manager's upload result, or None (after logging a
        warning) when no manager is initialized
    """
    if remote_connection_manager:
        return remote_connection_manager.upload_images(server_id, image_paths, job_id)

    logger.warning("Remote CUDA MVS processing is not enabled")
    return None

def process_images_remotely(server_id: str, job_id: str, params: Optional[Dict[str, Any]] = None):
    """
    Process uploaded images on a remote CUDA MVS server.

    On success the job is also recorded in the module-level ``remote_jobs``
    registry so that later status, download and cancel calls can find the
    server it runs on.

    Args:
        server_id: ID of the server to process on
        job_id: Job ID of the uploaded images
        params: Optional processing parameters. When None, the quality and
            output format defaults from REMOTE_CUDA_MVS are used.

    Returns:
        The connection manager's result for the job, or None (after logging
        a warning) when no manager is initialized
    """
    if not remote_connection_manager:
        logger.warning("Remote CUDA MVS processing is not enabled")
        return None

    # Set default parameters if not provided (an explicit empty dict is kept)
    if params is None:
        params = {
            "quality": REMOTE_CUDA_MVS["DEFAULT_RECONSTRUCTION_QUALITY"],
            "output_format": REMOTE_CUDA_MVS["DEFAULT_OUTPUT_FORMAT"]
        }

    # Start processing
    result = remote_connection_manager.process_job(server_id, job_id, params)

    # Store job information, keyed by the job ID found in the result; a
    # result without "job_id" is returned to the caller but not registered
    if result and "job_id" in result:
        remote_jobs[result["job_id"]] = {
            "server_id": server_id,
            "job_id": result["job_id"],
            "status": result.get("status", "processing"),
            "params": params
        }

    return result

def get_job_status(job_id: str):
    """
    Refresh and return the locally tracked record of a remote job.

    Args:
        job_id: ID of a job previously registered in ``remote_jobs``

    Returns:
        The job's record from ``remote_jobs`` (updated in place with the
        server's status, progress and message when the server answered),
        or None when no manager is initialized or the job is unknown
    """
    if not remote_connection_manager:
        logger.warning("Remote CUDA MVS processing is not enabled")
        return None

    # Unknown jobs are reported and answered with None
    try:
        job_info = remote_jobs[job_id]
    except KeyError:
        logger.warning(f"Job with ID {job_id} not found")
        return None

    # Ask the server that owns the job for its current state
    status = remote_connection_manager.get_job_status(job_info["server_id"], job_id)

    # A falsy answer leaves the stored record unchanged
    if status:
        job_info.update(
            status=status.get("status", job_info["status"]),
            progress=status.get("progress", 0),
            message=status.get("message", ""),
        )

    return job_info

def download_model(job_id: str, output_dir: Optional[str] = None):
    """
    Fetch the model produced by a remote job into a local directory.

    Args:
        job_id: ID of a job previously registered in ``remote_jobs``
        output_dir: Destination directory; when None, a sub-directory named
            after the job below ``REMOTE_CUDA_MVS["OUTPUT_DIR"]`` is used

    Returns:
        The connection manager's download result, or None when no manager
        is initialized or the job is unknown
    """
    if not remote_connection_manager:
        logger.warning("Remote CUDA MVS processing is not enabled")
        return None

    # Unknown jobs are reported and answered with None
    try:
        job_info = remote_jobs[job_id]
    except KeyError:
        logger.warning(f"Job with ID {job_id} not found")
        return None

    # Resolve the destination and make sure it exists
    if output_dir is not None:
        target_dir = output_dir
    else:
        target_dir = os.path.join(REMOTE_CUDA_MVS["OUTPUT_DIR"], job_id)
    os.makedirs(target_dir, exist_ok=True)

    # Download from the server that owns the job
    result = remote_connection_manager.download_model(job_info["server_id"], job_id, target_dir)

    # Record the downloaded artefacts on the tracked job
    if result:
        job_info.update(
            model_path=result.get("model_path"),
            point_cloud_path=result.get("point_cloud_path"),
            completed=True,
        )

    return result

def cancel_job(job_id: str):
    """
    Ask the owning server to cancel a remote processing job.

    Args:
        job_id: ID of a job previously registered in ``remote_jobs``

    Returns:
        The connection manager's cancellation result, or None when no
        manager is initialized or the job is unknown
    """
    if not remote_connection_manager:
        logger.warning("Remote CUDA MVS processing is not enabled")
        return None

    # Unknown jobs are reported and answered with None
    try:
        job_info = remote_jobs[job_id]
    except KeyError:
        logger.warning(f"Job with ID {job_id} not found")
        return None

    result = remote_connection_manager.cancel_job(job_info["server_id"], job_id)

    # Only mark the tracked job as cancelled when the server confirmed it
    confirmed = result and result.get("cancelled", False)
    if confirmed:
        job_info.update(status="cancelled", message="Job cancelled by user")

    return result

# MCP tool functions for remote processing

def discover_remote_cuda_mvs_servers():
    """
    MCP tool function that lists the discoverable remote CUDA MVS servers.

    Returns:
        Dictionary with the discovered servers under "servers" and their
        number under "count"
    """
    found = discover_remote_servers()
    total = len(found)

    return {"servers": found, "count": total}

def get_remote_server_status(server_id: str):
    """
    MCP tool function that reports the status of a remote CUDA MVS server.

    Args:
        server_id: ID of the server to query

    Returns:
        Dictionary with server status

    Raises:
        ValueError: If no (truthy) status could be obtained
    """
    status = get_server_status(server_id)

    if status:
        return status

    raise ValueError(f"Failed to get status for server with ID {server_id}")

def process_images_with_remote_cuda_mvs(
    server_id: str,
    image_paths: List[str],
    quality: str = REMOTE_CUDA_MVS["DEFAULT_RECONSTRUCTION_QUALITY"],
    output_format: str = REMOTE_CUDA_MVS["DEFAULT_OUTPUT_FORMAT"],
    wait_for_completion: bool = REMOTE_CUDA_MVS["WAIT_FOR_COMPLETION"]
):
    """
    MCP tool function that uploads images and reconstructs them remotely.

    The images are uploaded, processing is started and, if requested, the
    job is polled until it reaches a final state; a completed job's model
    is then downloaded.

    Args:
        server_id: ID of the server to process on
        image_paths: List of image paths to process
        quality: Reconstruction quality (low, normal, high)
        output_format: Output format (obj, ply)
        wait_for_completion: Whether to block until the job has finished

    Returns:
        Dictionary with job information. Without waiting it holds the job
        ID, the status "processing" and the server ID; after waiting it
        holds the job ID and final status plus either the model paths
        (completed) or the server's message (failed/cancelled).

    Raises:
        ValueError: If uploading, starting the job, polling its status or
            downloading the model fails
    """
    # Step 1: upload - the result must carry the job ID
    upload_result = upload_images_to_server(server_id, image_paths)
    if not (upload_result and "job_id" in upload_result):
        raise ValueError("Failed to upload images to server")
    job_id = upload_result["job_id"]

    # Step 2: start processing with the requested settings
    settings = {
        "quality": quality,
        "output_format": output_format
    }
    if not process_images_remotely(server_id, job_id, settings):
        raise ValueError(f"Failed to process images for job {job_id}")

    # Fire-and-forget callers get the job handle right away
    if not wait_for_completion:
        return {
            "job_id": job_id,
            "status": "processing",
            "server_id": server_id
        }

    # Step 3: poll until the job reaches a final state
    import time

    final_states = ("completed", "failed", "cancelled")
    while True:
        status = get_job_status(job_id)
        if not status:
            raise ValueError(f"Failed to get status for job {job_id}")
        if status["status"] in final_states:
            break
        time.sleep(REMOTE_CUDA_MVS["POLL_INTERVAL"])

    # Failed or cancelled: report the final state and the server's message
    if status["status"] != "completed":
        return {
            "job_id": job_id,
            "status": status["status"],
            "message": status.get("message", "")
        }

    # Step 4: completed - fetch the model
    download_result = download_model(job_id)
    if not download_result:
        raise ValueError(f"Failed to download model for job {job_id}")

    return {
        "job_id": job_id,
        "status": "completed",
        "model_path": download_result.get("model_path"),
        "point_cloud_path": download_result.get("point_cloud_path")
    }

def get_remote_job_status(job_id: str):
    """
    MCP tool function that reports the status of a remote processing job.

    Args:
        job_id: ID of the job to query

    Returns:
        Dictionary with job status

    Raises:
        ValueError: If no (truthy) status could be obtained
    """
    status = get_job_status(job_id)

    if status:
        return status

    raise ValueError(f"Failed to get status for job with ID {job_id}")

def download_remote_model(job_id: str, output_dir: Optional[str] = None):
    """
    MCP tool function that downloads a processed model from a remote server.

    Args:
        job_id: ID of the job whose model should be downloaded
        output_dir: Optional directory to save the model to

    Returns:
        Dictionary with model information

    Raises:
        ValueError: If the download produced no (truthy) result
    """
    model_info = download_model(job_id, output_dir)

    if model_info:
        return model_info

    raise ValueError(f"Failed to download model for job with ID {job_id}")

def cancel_remote_job(job_id: str):
    """
    MCP tool function that cancels a remote processing job.

    Args:
        job_id: ID of the job to cancel

    Returns:
        Dictionary with cancellation result

    Raises:
        ValueError: If the cancellation produced no (truthy) result
    """
    outcome = cancel_job(job_id)

    if outcome:
        return outcome

    raise ValueError(f"Failed to cancel job with ID {job_id}")

```

--------------------------------------------------------------------------------
/src/models/code_generator.py:
--------------------------------------------------------------------------------

```python
import os
import logging
import uuid
from typing import Dict, Any, List, Tuple, Optional

logger = logging.getLogger(__name__)

class CodeGenerator:
    """
    Generates OpenSCAD code from natural language descriptions and parameters.
    Implements translation of requirements to OpenSCAD primitives and modules.

    Every generated file starts with an ``include`` of ``basic_shapes.scad``
    from the templates directory and is written to the output directory as
    UTF-8 text.
    """

    # Template library included at the top of every generated file
    _LIBRARY_FILE = "basic_shapes.scad"

    # Matches string literals and comments so they can be blanked out before
    # looking for module calls (a text value such as "a cone" is not a call).
    _NON_CODE_RE = re.compile(r'"(?:\\.|[^"\\])*"|//[^\n]*|/\*.*?\*/', re.DOTALL)

    # Matches an identifier immediately followed by an opening parenthesis
    _CALL_RE = re.compile(r'\b([A-Za-z_]\w*)\s*\(')

    def __init__(self, scad_templates_dir: str, output_dir: str, ai_service=None):
        """
        Initialize the code generator.

        Args:
            scad_templates_dir: Directory containing SCAD template files
            output_dir: Directory to store generated SCAD files (created if missing)
            ai_service: Optional AI service for enhanced code generation
        """
        self.scad_templates_dir = scad_templates_dir
        self.output_dir = output_dir
        self.ai_service = ai_service

        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        # Map of shape types to their corresponding module names
        self.shape_module_map = {
            'cube': 'parametric_cube',
            'sphere': 'parametric_sphere',
            'cylinder': 'parametric_cylinder',
            'box': 'hollow_box',
            'rounded_box': 'rounded_box',
            'container': 'rounded_container',
            'tube': 'tube',
            'cone': 'cone',
            'wedge': 'wedge',
            'rounded_cylinder': 'rounded_cylinder',
            'torus': 'torus',
            'hexagonal_prism': 'hexagonal_prism',
            'text': 'text_3d',
            'prism': 'triangular_prism',
            'custom': 'custom_shape'
        }

        # Parameter mapping from natural language to OpenSCAD parameters
        self.parameter_map = {
            'width': 'width',
            'depth': 'depth',
            'height': 'height',
            'radius': 'radius',
            'thickness': 'thickness',
            'segments': 'segments',
            'center': 'center',
            'inner_radius': 'inner_radius',
            'outer_radius': 'outer_radius',
            'corner_radius': 'corner_radius',
            'text': 'text',
            'size': 'size',
            'font': 'font',
            'base_radius': 'base_radius',
            'major_radius': 'major_radius',
            'minor_radius': 'minor_radius',
            'angle': 'angle',
            'scale': 'scale',
            'resolution': 'resolution'
        }

    def generate_code(self, model_type: str, parameters: Dict[str, Any], description: Optional[str] = None) -> str:
        """
        Generate OpenSCAD code for a given model type and parameters.

        AI-driven generation is used only when ``model_type`` is ``'custom'``,
        a description is given and an AI service is configured; in every other
        case the model type is looked up in ``shape_module_map``.

        Args:
            model_type: Type of model to generate
            parameters: Dictionary of parameters for the model
            description: Optional natural language description for AI-driven generation

        Returns:
            Path to the generated SCAD file

        Raises:
            ValueError: If the model type has no entry in ``shape_module_map``
        """
        # Each model gets its own uniquely named file in the output directory
        scad_file = os.path.join(self.output_dir, f"{uuid.uuid4()}.scad")

        if model_type == 'custom' and description and self.ai_service:
            scad_code = self._generate_ai_driven_code(description, parameters)
        else:
            module_name = self.shape_module_map.get(model_type)
            if not module_name:
                raise ValueError(f"Unsupported model type: {model_type}")
            scad_code = self._generate_scad_code(module_name, self._map_parameters(parameters))

        self._write_scad(scad_file, scad_code)

        logger.info("Generated OpenSCAD code: %s", scad_file)
        return scad_file

    def update_code(self, scad_file: str, parameters: Dict[str, Any]) -> str:
        """
        Update an existing SCAD file with new parameters.

        The file is regenerated from scratch for the module it currently
        calls, using only the parameters passed here.

        Args:
            scad_file: Path to the SCAD file to update
            parameters: New parameters to apply

        Returns:
            Path to the updated SCAD file

        Raises:
            FileNotFoundError: If ``scad_file`` does not exist
            ValueError: If the file calls none of the known shape modules
        """
        if not os.path.exists(scad_file):
            raise FileNotFoundError(f"SCAD file not found: {scad_file}")

        with open(scad_file, 'r', encoding='utf-8') as f:
            scad_code = f.read()

        module_name = self._detect_module_name(scad_code)
        if not module_name:
            raise ValueError("Could not determine module name from existing SCAD file")

        updated_code = self._generate_scad_code(module_name, self._map_parameters(parameters))
        self._write_scad(scad_file, updated_code)

        logger.info("Updated OpenSCAD code: %s", scad_file)
        return scad_file

    def combine_models(self, operations: List[Dict[str, Any]]) -> str:
        """
        Combine multiple models using CSG operations.

        A new ``operation() { ... }`` block is opened whenever an entry names
        an operation different from the one currently open; entries without
        an operation are appended to the current block (or emitted outside
        any block when none has been opened yet).

        Args:
            operations: List of operations, each containing:
                - model_type: Type of model
                - parameters: Parameters for the model
                - operation: CSG operation (union, difference, intersection)
                - transform: Optional transformation to apply

        Returns:
            Path to the generated SCAD file

        Raises:
            ValueError: If an entry has no model type or an unsupported one
        """
        scad_file = os.path.join(self.output_dir, f"{uuid.uuid4()}.scad")

        parts = [f"// Combined model\n{self._library_include()}\n\n"]

        current_op = None
        for op in operations:
            model_type = op.get('model_type')
            if model_type is None:
                raise ValueError("Model type cannot be None")
            module_name = self.shape_module_map.get(str(model_type))
            if not module_name:
                raise ValueError(f"Unsupported model type: {model_type}")

            # A missing or None 'parameters' entry means "no parameters"
            scad_params = self._map_parameters(op.get('parameters') or {})
            params_str = ", ".join(f"{k}={v}" for k, v in scad_params.items())

            # Close the open block (if any) and start a new one when the
            # requested operation changes.
            operation = op.get('operation')
            if operation and operation != current_op:
                if current_op:
                    parts.append("}\n\n")
                current_op = operation
                parts.append(f"{operation}() {{\n")

            # The transform, when given, is emitted verbatim before the call
            transform = op.get('transform')
            prefix = f"{transform} " if transform else ""
            parts.append(f"    {prefix}{module_name}({params_str});\n")

        # Close the final operation if needed
        if current_op:
            parts.append("}\n")

        self._write_scad(scad_file, "".join(parts))

        logger.info("Generated combined OpenSCAD code: %s", scad_file)
        return scad_file

    def _map_parameters(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """
        Map natural language parameters to OpenSCAD parameters.

        Names are translated through ``parameter_map`` (unknown names are
        kept as-is) and values are converted by ``_format_value``.
        """
        return {
            self.parameter_map.get(name, name): self._format_value(value)
            for name, value in parameters.items()
        }

    @staticmethod
    def _format_value(value: Any) -> Any:
        """
        Convert a Python value into its OpenSCAD source representation.

        Booleans and the strings ``"true"``/``"false"`` (any case) become
        lowercase boolean literals, other strings become quoted and escaped
        string literals, ``None`` becomes ``undef`` and lists/tuples become
        OpenSCAD vectors. Any other value (e.g. a number) is returned unchanged.
        """
        # bool must be tested before anything else: it is a subclass of int
        if isinstance(value, bool):
            return str(value).lower()
        if isinstance(value, str):
            lowered = value.lower()
            if lowered in ('true', 'false'):
                return lowered
            # Escape the backslash first so the escapes added below survive
            escaped = (
                value.replace('\\', '\\\\')
                .replace('"', '\\"')
                .replace('\n', '\\n')
                .replace('\r', '\\r')
                .replace('\t', '\\t')
            )
            return f'"{escaped}"'
        if value is None:
            return 'undef'
        if isinstance(value, (list, tuple)):
            items = ", ".join(str(CodeGenerator._format_value(item)) for item in value)
            return f"[{items}]"
        return value

    def _library_include(self) -> str:
        """Return the ``include`` statement for the basic shapes library."""
        return f"include <{os.path.join(self.scad_templates_dir, self._LIBRARY_FILE)}>;"

    @staticmethod
    def _write_scad(scad_file: str, scad_code: str) -> None:
        """Write SCAD source to ``scad_file`` as UTF-8, replacing any existing content."""
        with open(scad_file, 'w', encoding='utf-8') as f:
            f.write(scad_code)

    def _detect_module_name(self, scad_code: str) -> Optional[str]:
        """
        Return the first known shape module that is called in ``scad_code``.

        String literals and comments are blanked out first, and only whole
        identifiers followed by ``(`` are considered, so module names that
        merely occur inside a text value or the include path are ignored.
        Returns ``None`` when no known module is called.
        """
        known_modules = set(self.shape_module_map.values())
        code_only = self._NON_CODE_RE.sub(" ", scad_code)
        for match in self._CALL_RE.finditer(code_only):
            if match.group(1) in known_modules:
                return match.group(1)
        return None

    def _generate_scad_code(self, module_name: str, parameters: Dict[str, Any]) -> str:
        """
        Generate OpenSCAD code for a module with parameters.

        Each parameter is declared as a top-level variable and then passed to
        the module call by name (``name=name``).
        """
        parts = [f"// Generated OpenSCAD code\n{self._library_include()}\n\n// Parameters\n"]
        parts.extend(f"{param} = {value};\n" for param, value in parameters.items())

        call_args = ", ".join(f"{param}={param}" for param in parameters)
        parts.append(f"\n// Model\n{module_name}({call_args});\n")

        return "".join(parts)

    def _fallback_code(self) -> str:
        """Return the code for the basic 10x10x10 cube used when AI generation is unavailable."""
        return self._generate_scad_code('parametric_cube', {'width': 10, 'height': 10, 'depth': 10})

    def _generate_ai_driven_code(self, description: str, parameters: Dict[str, Any]) -> str:
        """
        Generate OpenSCAD code using AI-driven techniques based on natural language description.

        Falls back to a basic cube when no AI service is configured or when
        the AI service call fails for any reason.

        Args:
            description: Natural language description of the model
            parameters: Dictionary of parameters for the model

        Returns:
            Generated OpenSCAD code
        """
        if not self.ai_service:
            logger.warning("AI service not available, falling back to basic shape generation")
            return self._fallback_code()

        try:
            logger.info("Generating OpenSCAD code from description: %s", description)

            # Context handed to the AI service
            context = {
                "description": description,
                "parameters": parameters,
                "templates_dir": self.scad_templates_dir
            }
            scad_code = self.ai_service.generate_openscad_code(context)

            # Ensure the code includes the basic shapes library
            if "include <" not in scad_code:
                scad_code = f"// AI-generated OpenSCAD code\n{self._library_include()}\n\n{scad_code}\n"

            return scad_code
        except Exception as e:
            # Deliberate best effort: any failure degrades to the basic shape
            logger.exception("Error generating AI-driven code: %s", e)
            return self._fallback_code()

```

--------------------------------------------------------------------------------
/test_complete_workflow.py:
--------------------------------------------------------------------------------

```python
"""
Test script for the complete workflow from text to 3D model.
"""

import os
import sys
import logging
import unittest
from unittest.mock import patch, MagicMock
from pathlib import Path

# Add the src directory to the path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from src.ai.gemini_api import GeminiImageGenerator
from src.workflow.image_approval import ImageApprovalManager
from src.models.cuda_mvs import CUDAMultiViewStereo
from src.workflow.multi_view_to_model_pipeline import MultiViewToModelPipeline
from src.config import MULTI_VIEW_PIPELINE, IMAGE_APPROVAL, REMOTE_CUDA_MVS

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TestCompleteWorkflow(unittest.TestCase):
    """
    Test cases for the complete workflow from text to 3D model.

    External calls (the Gemini HTTP API, the CUDA MVS subprocess and the
    remote CUDA MVS HTTP API) are replaced with mocks; all files are written
    below ``output/test_complete_workflow`` and removed again in ``tearDown``.
    """

    # Base64-encoded PNG payload returned as inline image data by the mocked Gemini API
    _FAKE_IMAGE_B64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8BQDwAEhQGAhKmMIQAAAABJRU5ErkJggg=="

    def setUp(self):
        """
        Set up test environment.
        """
        # Create test output directories
        self.test_output_dir = "output/test_complete_workflow"
        self.test_images_dir = os.path.join(self.test_output_dir, "images")
        self.test_multi_view_dir = os.path.join(self.test_output_dir, "multi_view")
        self.test_approved_dir = os.path.join(self.test_output_dir, "approved")
        self.test_models_dir = os.path.join(self.test_output_dir, "models")

        for directory in (
            self.test_output_dir,
            self.test_images_dir,
            self.test_multi_view_dir,
            self.test_approved_dir,
            self.test_models_dir,
        ):
            os.makedirs(directory, exist_ok=True)

        # Mock API key
        self.api_key = "test_api_key"

        # Create the components
        self.image_generator = GeminiImageGenerator(
            api_key=self.api_key,
            output_dir=self.test_images_dir
        )

        self.approval_manager = ImageApprovalManager(
            output_dir=self.test_multi_view_dir,
            approved_dir=self.test_approved_dir,
            min_approved_images=3,
            auto_approve=False
        )

        self.cuda_mvs = CUDAMultiViewStereo(
            output_dir=self.test_models_dir,
            use_gpu=False
        )

        # Create the pipeline
        # NOTE(review): these keyword arguments (image_generator,
        # approval_manager, model_generator, config) differ from the
        # MultiViewToModelPipeline.__init__ signature in
        # src/workflow/multi_view_to_model_pipeline.py (gemini_generator,
        # venice_generator, cuda_mvs, ...) - confirm which one is current.
        self.pipeline = MultiViewToModelPipeline(
            image_generator=self.image_generator,
            approval_manager=self.approval_manager,
            model_generator=self.cuda_mvs,
            output_dir=self.test_output_dir,
            config=MULTI_VIEW_PIPELINE
        )

    @classmethod
    def _mock_gemini_response(cls):
        """Build a successful mocked Gemini response with one text part and one inline image part."""
        response = MagicMock()
        response.status_code = 200
        response.json.return_value = {
            "candidates": [
                {
                    "content": {
                        "parts": [
                            {
                                "text": "Generated image description"
                            },
                            {
                                "inlineData": {
                                    "mimeType": "image/png",
                                    "data": cls._FAKE_IMAGE_B64
                                }
                            }
                        ]
                    }
                }
            ]
        }
        return response

    @staticmethod
    def _mock_successful_process():
        """Build a mocked ``subprocess.run`` result with return code 0."""
        process = MagicMock()
        process.returncode = 0
        return process

    @staticmethod
    def _write_fake_images(directory, filename_template="image_{index}.png", count=4):
        """
        Write ``count`` placeholder image files into ``directory``.

        Returns the matching list of image descriptors (id, local_path,
        view_index, view_direction) in the shape the tests hand to the pipeline.
        """
        images = []
        for i in range(count):
            image_path = os.path.join(directory, filename_template.format(index=i))
            with open(image_path, "w") as f:
                f.write(f"test image data {i}")
            images.append({
                "id": f"image_{i}",
                "local_path": image_path,
                "view_index": i + 1,
                "view_direction": f"view_{i}"
            })
        return images

    def _write_fake_model_file(self):
        """Create the placeholder ``test_model.obj`` file in the models directory."""
        model_path = os.path.join(self.test_models_dir, "test_model.obj")
        with open(model_path, "w") as f:
            f.write("test model data")
        return model_path

    def _assert_obj_model_result(self, result):
        """Assert that ``result`` describes a model in OBJ format."""
        self.assertIsNotNone(result)
        self.assertIn("model_path", result)
        self.assertIn("model_id", result)
        self.assertIn("format", result)
        self.assertEqual(result["format"], "obj")

    @patch('src.ai.gemini_api.requests.post')
    def test_generate_images_from_text(self, mock_post):
        """
        Test generating images from text description.
        """
        mock_post.return_value = self._mock_gemini_response()

        # Test parameters
        prompt = "A low-poly rabbit"
        num_views = 4

        # Call the method
        results = self.pipeline.generate_images_from_text(prompt, num_views)

        # Verify the results
        self.assertEqual(len(results), num_views)
        for expected_index, result in enumerate(results, start=1):
            self.assertIn("view_direction", result)
            self.assertEqual(result["view_index"], expected_index)
            self.assertIn("local_path", result)
            self.assertTrue(os.path.exists(result["local_path"]))

        # Verify the API calls: one request per requested view
        self.assertEqual(mock_post.call_count, num_views)

    def test_approve_images(self):
        """
        Test approving images in the workflow.
        """
        # Create test images
        test_images = self._write_fake_images(
            self.test_multi_view_dir, filename_template="test_image_{index}.png"
        )

        # Add images to the pipeline
        self.pipeline.add_images_for_approval(test_images)

        # Approve some images
        for image_id in ("image_0", "image_1", "image_2"):
            self.pipeline.approve_image(image_id)

        # Reject an image
        self.pipeline.reject_image("image_3")

        # Get the status
        status = self.pipeline.get_approval_status()

        # Verify the status
        self.assertEqual(status["total_images"], 4)
        self.assertEqual(status["pending_count"], 0)
        self.assertEqual(status["approved_count"], 3)
        self.assertEqual(status["rejected_count"], 1)
        self.assertTrue(status["has_minimum_approved"])

    @patch('src.models.cuda_mvs.subprocess.run')
    def test_create_model_from_approved_images(self, mock_run):
        """
        Test creating a 3D model from approved images.
        """
        # Mock subprocess.run
        mock_run.return_value = self._mock_successful_process()

        # Create test images in the approved directory and a mock model file
        self._write_fake_images(self.test_approved_dir)
        self._write_fake_model_file()

        # Call the method
        result = self.pipeline.create_model_from_approved_images("test_model")

        # Verify the result
        self._assert_obj_model_result(result)

    @patch('src.ai.gemini_api.requests.post')
    @patch('src.models.cuda_mvs.subprocess.run')
    def test_complete_workflow(self, mock_run, mock_post):
        """
        Test the complete workflow from text to 3D model.
        """
        # Mock Gemini API response and subprocess.run
        mock_post.return_value = self._mock_gemini_response()
        mock_run.return_value = self._mock_successful_process()

        # Create a mock model file
        self._write_fake_model_file()

        # Create a pipeline with auto-approve
        auto_pipeline = MultiViewToModelPipeline(
            image_generator=self.image_generator,
            approval_manager=ImageApprovalManager(
                output_dir=self.test_multi_view_dir,
                approved_dir=self.test_approved_dir,
                min_approved_images=3,
                auto_approve=True
            ),
            model_generator=self.cuda_mvs,
            output_dir=self.test_output_dir,
            config=MULTI_VIEW_PIPELINE
        )

        # Test parameters
        prompt = "A low-poly rabbit"
        num_views = 4

        # Call the complete workflow
        result = auto_pipeline.complete_workflow(prompt, num_views, "test_model")

        # Verify the result
        self._assert_obj_model_result(result)
        self.assertIn("prompt", result)
        self.assertEqual(result["prompt"], prompt)
        self.assertIn("num_views", result)
        self.assertEqual(result["num_views"], num_views)
        self.assertIn("approved_images", result)
        self.assertEqual(len(result["approved_images"]), num_views)

    @patch('src.remote.cuda_mvs_client.requests.post')
    @patch('src.remote.cuda_mvs_client.requests.get')
    def test_remote_workflow(self, mock_get, mock_post):
        """
        Test the workflow with remote CUDA MVS processing.
        """
        # Mock upload response
        mock_upload_response = MagicMock()
        mock_upload_response.status_code = 200
        mock_upload_response.json.return_value = {
            "job_id": "test_job_123",
            "status": "uploaded",
            "message": "Images uploaded successfully"
        }

        # Mock process response
        mock_process_response = MagicMock()
        mock_process_response.status_code = 200
        mock_process_response.json.return_value = {
            "job_id": "test_job_123",
            "status": "processing",
            "message": "Job started processing"
        }

        # Mock status response
        mock_status_response = MagicMock()
        mock_status_response.status_code = 200
        mock_status_response.json.return_value = {
            "job_id": "test_job_123",
            "status": "completed",
            "progress": 100,
            "message": "Job completed successfully"
        }

        # Mock download response
        mock_download_response = MagicMock()
        mock_download_response.status_code = 200
        mock_download_response.content = b"test model data"

        # Set up the mock responses; side_effect lists are consumed in call order
        mock_post.side_effect = [mock_upload_response, mock_process_response]
        mock_get.side_effect = [mock_status_response, mock_download_response]

        # Create test images
        test_images = self._write_fake_images(self.test_approved_dir)

        # Create a remote CUDA MVS client
        from src.remote.cuda_mvs_client import CUDAMVSClient
        remote_client = CUDAMVSClient(
            api_key=self.api_key,
            output_dir=self.test_models_dir
        )

        # Create a pipeline with the remote client
        remote_pipeline = MultiViewToModelPipeline(
            image_generator=self.image_generator,
            approval_manager=self.approval_manager,
            model_generator=remote_client,
            output_dir=self.test_output_dir,
            config=MULTI_VIEW_PIPELINE
        )

        # Add the approved images
        remote_pipeline.add_images_for_approval(test_images)
        for image in test_images:
            remote_pipeline.approve_image(image["id"])

        # Call the method to create a model using the remote client
        with patch('src.workflow.multi_view_to_model_pipeline.CUDAMVSClient', return_value=remote_client):
            result = remote_pipeline.create_model_from_approved_images("test_model", server_url="http://test-server:8765")

        # Verify the result
        self._assert_obj_model_result(result)
        self.assertIn("job_id", result)
        self.assertEqual(result["job_id"], "test_job_123")

    def tearDown(self):
        """
        Clean up after tests.
        """
        # Clean up test output directory
        import shutil
        if os.path.exists(self.test_output_dir):
            shutil.rmtree(self.test_output_dir)

# Run the test cases with unittest's runner when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()

```

--------------------------------------------------------------------------------
/src/workflow/multi_view_to_model_pipeline.py:
--------------------------------------------------------------------------------

```python
"""
Workflow orchestration for the multi-view to model pipeline.
"""

import os
import logging
import uuid
from typing import Dict, Any, List, Optional
from pathlib import Path

logger = logging.getLogger(__name__)

class MultiViewToModelPipeline:
    """
    Orchestrates the workflow from image or text prompt to 3D model:
    1. Generate image with Venice.ai or Google Gemini (optional)
    2. Generate multiple views with Google Gemini
    3. Process user approval of views
    4. Create 3D model with CUDA Multi-View Stereo
    5. Convert to OpenSCAD for parametric editing (optional)

    Stages 2-5 are identical for both public entry points and live in
    private helpers so the two entry points cannot drift apart.
    """

    # Sub-directories created beneath ``output_dir`` when the pipeline is built.
    _OUTPUT_SUBDIRS = ("images", "multi_view", "approved", "models", "scad")

    def __init__(self,
                 gemini_generator=None,
                 venice_generator=None,
                 cuda_mvs=None,
                 openscad_wrapper=None,
                 approval_tool=None,
                 output_dir: str = "output/pipeline"):
        """
        Initialize the pipeline.

        Args:
            gemini_generator: Instance of GeminiImageGenerator
            venice_generator: Instance of VeniceImageGenerator (optional)
            cuda_mvs: Instance of CUDAMultiViewStereo
            openscad_wrapper: Instance of OpenSCADWrapper (optional)
            approval_tool: Instance of ImageApprovalTool
            output_dir: Directory to store output files
        """
        self.gemini_generator = gemini_generator
        self.venice_generator = venice_generator
        self.cuda_mvs = cuda_mvs
        self.openscad_wrapper = openscad_wrapper
        self.approval_tool = approval_tool
        self.output_dir = output_dir

        # Create output directories
        for subdir in self._OUTPUT_SUBDIRS:
            os.makedirs(os.path.join(output_dir, subdir), exist_ok=True)

    def generate_model_from_text(self, prompt: str,
                                 use_venice: bool = False,
                                 num_views: int = 4,
                                 gemini_params: Optional[Dict[str, Any]] = None,
                                 venice_params: Optional[Dict[str, Any]] = None,
                                 cuda_mvs_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Generate a 3D model from a text prompt.

        Args:
            prompt: Text description for image generation
            use_venice: Whether to use Venice.ai for the initial image. Venice
                is only used when a ``venice_generator`` was also supplied;
                otherwise the Gemini generator produces the initial image.
            num_views: Number of views to generate
            gemini_params: Extra keyword arguments for the Gemini
                ``generate_image`` call (only used on the Gemini path)
            venice_params: Extra keyword arguments for the Venice
                ``generate_image`` call (only used on the Venice path)
            cuda_mvs_params: Extra keyword arguments for CUDA MVS

        Returns:
            Dictionary with the keys ``pipeline_id``, ``prompt``,
            ``initial_image``, ``multi_views``, ``approved_images``,
            ``model_3d`` and, when an OpenSCAD conversion ran, ``openscad``.

        Raises:
            Exception: Any error raised by a stage is logged and re-raised.
        """
        try:
            # Generate a unique ID for this pipeline run
            pipeline_id = str(uuid.uuid4())
            logger.info(f"Starting pipeline {pipeline_id} for prompt: {prompt}")

            # Step 1: Generate initial image
            if use_venice and self.venice_generator:
                logger.info("Using Venice.ai for initial image generation")
                image_path = os.path.join(self.output_dir, "images", f"{pipeline_id}_venice.png")
                initial_result = self.venice_generator.generate_image(
                    prompt=prompt,
                    output_path=image_path,
                    **(venice_params or {})
                )
            else:
                logger.info("Using Google Gemini for initial image generation")
                image_path = os.path.join(self.output_dir, "images", f"{pipeline_id}_gemini.png")
                initial_result = self.gemini_generator.generate_image(
                    prompt=prompt,
                    output_path=image_path,
                    **(gemini_params or {})
                )

            # Steps 2-5: views, approval, 3D model, optional OpenSCAD
            stage_results = self._run_view_to_model_stages(
                pipeline_id=pipeline_id,
                prompt=prompt,
                base_image_path=image_path,
                num_views=num_views,
                cuda_mvs_params=cuda_mvs_params,
            )

            # Compile results (key order matches the historical layout)
            result = {
                "pipeline_id": pipeline_id,
                "prompt": prompt,
                "initial_image": initial_result,
                **stage_results,
            }

            logger.info(f"Pipeline {pipeline_id} completed successfully")
            return result

        except Exception as e:
            logger.error(f"Error in pipeline: {str(e)}")
            raise

    def generate_model_from_image(self, image_path: str,
                                  prompt: Optional[str] = None,
                                  num_views: int = 4,
                                  gemini_params: Optional[Dict[str, Any]] = None,
                                  cuda_mvs_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Generate a 3D model from an existing image.

        Args:
            image_path: Path to input image
            prompt: Optional text description to guide multi-view generation.
                When empty, a placeholder prompt naming the image file is used.
            num_views: Number of views to generate
            gemini_params: Accepted for interface compatibility; this method
                does not currently forward it to any call
            cuda_mvs_params: Extra keyword arguments for CUDA MVS

        Returns:
            Dictionary with the keys ``pipeline_id``, ``prompt``,
            ``input_image``, ``multi_views``, ``approved_images``,
            ``model_3d`` and, when an OpenSCAD conversion ran, ``openscad``.

        Raises:
            Exception: Any error raised by a stage is logged and re-raised.
        """
        try:
            # Generate a unique ID for this pipeline run
            pipeline_id = str(uuid.uuid4())
            logger.info(f"Starting pipeline {pipeline_id} from image: {image_path}")

            # Use provided prompt or fall back to a placeholder description.
            # In a real implementation, an image captioning model might be
            # used to describe the image instead.
            if not prompt:
                prompt = f"3D object in the image {os.path.basename(image_path)}"

            # Steps 1-4: views, approval, 3D model, optional OpenSCAD
            stage_results = self._run_view_to_model_stages(
                pipeline_id=pipeline_id,
                prompt=prompt,
                base_image_path=image_path,
                num_views=num_views,
                cuda_mvs_params=cuda_mvs_params,
            )

            # Compile results (key order matches the historical layout)
            result = {
                "pipeline_id": pipeline_id,
                "prompt": prompt,
                "input_image": image_path,
                **stage_results,
            }

            logger.info(f"Pipeline {pipeline_id} completed successfully")
            return result

        except Exception as e:
            logger.error(f"Error in pipeline: {str(e)}")
            raise

    def _run_view_to_model_stages(self, pipeline_id: str,
                                  prompt: str,
                                  base_image_path: str,
                                  num_views: int,
                                  cuda_mvs_params: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Run the stages shared by both entry points, starting from a base image.

        Returns:
            Dictionary with ``multi_views``, ``approved_images``, ``model_3d``
            and, when an OpenSCAD conversion ran, ``openscad`` (in that order).
        """
        multi_views = self._generate_views(pipeline_id, prompt, base_image_path, num_views)
        approved_images = self._auto_approve_views(multi_views)

        logger.info("Generating 3D model with CUDA MVS")
        model_result = self.cuda_mvs.generate_model_from_images(
            image_paths=approved_images,
            output_name=pipeline_id,
            **(cuda_mvs_params or {})
        )

        stage_results = {
            "multi_views": multi_views,
            "approved_images": approved_images,
            "model_3d": model_result,
        }

        # Convert to OpenSCAD only when a wrapper is configured and the model
        # stage reported a point cloud file.
        if self.openscad_wrapper and model_result.get("point_cloud_file"):
            logger.info("Converting to OpenSCAD")
            scad_result = self._convert_to_openscad(model_result["point_cloud_file"], pipeline_id)
            if scad_result:
                stage_results["openscad"] = scad_result

        return stage_results

    def _generate_views(self, pipeline_id: str, prompt: str,
                        base_image_path: str, num_views: int) -> List[Dict[str, Any]]:
        """Ask the Gemini generator for ``num_views`` views of the base image."""
        logger.info(f"Generating {num_views} views with Google Gemini")
        multi_view_dir = os.path.join(self.output_dir, "multi_view", pipeline_id)
        return self.gemini_generator.generate_multiple_views(
            prompt=prompt,
            num_views=num_views,
            base_image_path=base_image_path,
            output_dir=multi_view_dir
        )

    def _auto_approve_views(self, multi_views: List[Dict[str, Any]]) -> List[str]:
        """
        Present every view for approval, then approve each request.

        In a real implementation the decision would come from the MCP client
        through the MCP tools interface; here every view is approved.
        All views are presented before any approval is processed.
        """
        logger.info("Preparing images for approval")
        approval_requests = [
            self.approval_tool.present_image_for_approval(
                image_path=view["local_path"],
                metadata={
                    "prompt": view.get("prompt"),
                    "view_direction": view.get("view_direction"),
                    "view_index": view.get("view_index")
                }
            )
            for view in multi_views
        ]

        approved_images = []
        for request in approval_requests:
            approval_result = self.approval_tool.process_approval(
                approval_id=request["approval_id"],
                approved=True,
                image_path=request["image_path"]
            )
            # The tool has the final say; keep only what it reports as approved.
            if approval_result["approved"]:
                approved_images.append(approval_result["approved_path"])
        return approved_images

    @staticmethod
    def _escape_scad_string(text: str) -> str:
        """Escape backslashes and double quotes for an OpenSCAD string literal."""
        # Backslashes first, so the ones added for quotes are not doubled again.
        return text.replace("\\", "\\\\").replace('"', '\\"')

    @classmethod
    def _render_import_scad(cls, model_path: str, model_id: str) -> str:
        """Build the OpenSCAD source that imports and transforms ``model_path``."""
        # Without escaping, a path such as C:\tmp\new.obj would be read by
        # OpenSCAD with "\t" and "\n" as escape sequences.
        scad_path = cls._escape_scad_string(model_path)
        return f"""// Generated OpenSCAD code for model {model_id}
// Imported from {os.path.basename(model_path)}

// Parameters
scale_factor = 1.0;
position_x = 0;
position_y = 0;
position_z = 0;
rotation_x = 0;
rotation_y = 0;
rotation_z = 0;

// Import and transform the model
translate([position_x, position_y, position_z])
rotate([rotation_x, rotation_y, rotation_z])
scale(scale_factor)
import("{scad_path}");
"""

    def _convert_to_openscad(self, model_path: str, model_id: str) -> Dict[str, Any]:
        """
        Convert 3D model to OpenSCAD format.

        Args:
            model_path: Path to input model (PLY file)
            model_id: Unique identifier for the model

        Returns:
            Dictionary with ``scad_file``, ``previews`` and ``model_path``
            (the path actually imported, i.e. the OBJ path after conversion).
        """
        logger.info(f"Converting model to OpenSCAD: {model_path}")

        # Convert PLY to OBJ if needed (extension check ignores case)
        if model_path.lower().endswith('.ply'):
            model_path = self.cuda_mvs.convert_ply_to_obj(model_path)

        # Generate OpenSCAD code for importing the model
        scad_code = self._render_import_scad(model_path, model_id)

        # Save SCAD code to file
        scad_file = self.openscad_wrapper.generate_scad(scad_code, model_id)

        # Generate previews
        previews = self.openscad_wrapper.generate_multi_angle_previews(scad_file)

        return {
            "scad_file": scad_file,
            "previews": previews,
            "model_path": model_path
        }

    def process_approval_results(self, approval_results: List[Dict[str, Any]]) -> List[str]:
        """
        Process approval results from the MCP client.

        Args:
            approval_results: List of approval results from the client

        Returns:
            List of paths to approved images, in input order. Entries that are
            not approved or lack an ``approved_path`` are skipped.
        """
        return [
            result["approved_path"]
            for result in approval_results
            if result.get("approved", False) and "approved_path" in result
        ]

```

--------------------------------------------------------------------------------
/src/remote/error_handling.py:
--------------------------------------------------------------------------------

```python
"""
Error handling and retry mechanisms for remote CUDA MVS processing.

This module provides utilities for handling network errors, implementing
retry mechanisms, and ensuring robust communication with remote CUDA MVS servers.
"""

import time
import random
import logging
import functools
from typing import Callable, Any, Type, Union, List, Dict, Optional, Tuple
import requests

# Configure logging
# NOTE(review): basicConfig() runs when this module is imported, so importing
# it configures logging for the whole process — confirm that is intended for
# a library module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Define common exception types for network operations
# This tuple is the default retryable set for retry_with_backoff and is the
# set safe_request retries on.
# NOTE(review): RequestException is presumably the base class of the other
# requests exceptions listed here, which would make those entries redundant
# and mean every requests error (including HTTPError raised for 4xx
# responses) is retried — confirm against the requests documentation.
NETWORK_EXCEPTIONS = (
    requests.exceptions.ConnectionError,
    requests.exceptions.Timeout,
    requests.exceptions.HTTPError,
    requests.exceptions.RequestException,
    ConnectionRefusedError,
    TimeoutError,
)

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exception_types: Tuple[Type[Exception], ...] = NETWORK_EXCEPTIONS,
    jitter_factor: float = 0.1,
    logger_instance: Optional[logging.Logger] = None
) -> Callable:
    """
    Build a decorator that re-invokes a function with exponential backoff.

    The wrapped function is called once and then up to ``max_retries`` more
    times while it keeps raising one of ``exception_types``. Any other
    exception is logged and propagated immediately.

    Args:
        max_retries: Maximum number of retries after the first call
        base_delay: Delay in seconds before the first retry; doubled on each
            further retry
        max_delay: Upper bound in seconds for the delay before jitter is added
        exception_types: Tuple of exception types that trigger a retry
        jitter_factor: Fraction of the delay used as the upper bound of the
            random extra wait added to each delay
        logger_instance: Logger instance to use (uses module logger if None)

    Returns:
        Decorator function
    """
    log = logger_instance or logger

    def _pause_for(attempt: int) -> float:
        # Exponential backoff capped at max_delay, plus random jitter so that
        # many clients do not retry in lock-step.
        backoff = min(max_delay, base_delay * (2 ** (attempt - 1)))
        return backoff + random.uniform(0, jitter_factor * backoff)

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            failed_attempts = 0

            while True:
                try:
                    return func(*args, **kwargs)
                except exception_types as exc:
                    failed_attempts += 1

                    # Out of retries: report and let the last error propagate.
                    if failed_attempts > max_retries:
                        log.error(f"Max retries ({max_retries}) exceeded: {str(exc)}")
                        raise

                    sleep_time = _pause_for(failed_attempts)
                    log.warning(f"Retry {failed_attempts}/{max_retries} after {sleep_time:.2f}s: {str(exc)}")
                    time.sleep(sleep_time)
                except Exception as exc:
                    # Anything outside exception_types is never retried.
                    log.error(f"Non-retryable exception: {str(exc)}")
                    raise

        return wrapper

    return decorator

def timeout_handler(
    timeout: float,
    default_value: Any = None,
    exception_types: Tuple[Type[Exception], ...] = (TimeoutError, requests.exceptions.Timeout),
    logger_instance: Optional[logging.Logger] = None
) -> Callable:
    """
    Decorator for handling timeouts in function calls.

    A SIGALRM-based interval timer is armed around each call. When it fires,
    ``TimeoutError`` is raised inside the wrapped function; any exception in
    ``exception_types`` is logged and replaced by ``default_value``.

    Args:
        timeout: Timeout in seconds. Fractional values are honoured; a value
            of zero or less disables the timer.
        default_value: Value to return if timeout occurs
        exception_types: Tuple of exception types to catch as timeouts
        logger_instance: Logger instance to use (uses module logger if None)

    Returns:
        Decorator function

    Note:
        Relies on ``signal.SIGALRM``, which Python documents as Unix-only,
        and on ``signal.signal``, which may only be called from the main
        thread. Any alarm that was already pending is replaced, not resumed.
    """
    log = logger_instance or logger

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            import signal

            def _raise_timeout(signum, frame):
                raise TimeoutError(f"Function {func.__name__} timed out after {timeout} seconds")

            # Install our handler, then arm the timer. setitimer() accepts
            # float seconds; alarm(int(timeout)) would truncate 0.5 to 0 and
            # silently disable the timeout.
            original_handler = signal.signal(signal.SIGALRM, _raise_timeout)
            signal.setitimer(signal.ITIMER_REAL, max(float(timeout), 0.0))

            try:
                return func(*args, **kwargs)
            except exception_types as e:
                log.warning(f"Timeout in {func.__name__}: {str(e)}")
                return default_value
            finally:
                # Cancel the timer BEFORE restoring the previous handler, so a
                # late alarm can never be delivered to the old (possibly
                # default, process-terminating) handler.
                signal.setitimer(signal.ITIMER_REAL, 0)
                signal.signal(signal.SIGALRM, original_handler)

        return wrapper

    return decorator

class NetworkErrorTracker:
    """
    Keeps a short history of request outcomes and summarises it.

    The history is used to compute an error rate over the most recent
    requests, which in turn decides whether a server is considered
    problematic.
    """

    def __init__(
        self,
        error_window: int = 10,
        error_threshold: float = 0.5,
        reset_after: int = 100
    ):
        """
        Set up an empty tracker.

        Args:
            error_window: How many of the most recent requests are kept
            error_threshold: Error rate at or above which the server is
                reported as problematic
            reset_after: Number of consecutive successes after which the
                recorded history is discarded
        """
        # History of (timestamp, success) tuples, oldest first.
        self.requests = []
        self.consecutive_successes = 0
        self.consecutive_failures = 0

        self.error_window = error_window
        self.error_threshold = error_threshold
        self.reset_after = reset_after

    def _count_failures(self) -> int:
        """
        Count the failed entries currently held in the history.
        """
        return sum(not ok for _, ok in self.requests)

    def record_request(self, success: bool) -> None:
        """
        Add one request outcome to the history and update the streaks.

        Args:
            success: Whether the request was successful
        """
        now = time.time()
        self.requests.append((now, success))
        self._trim_old_requests()

        if not success:
            self.consecutive_failures += 1
            self.consecutive_successes = 0
            return

        self.consecutive_successes += 1
        self.consecutive_failures = 0

        # A long enough run of successes wipes the history, leaving only the
        # request that was just recorded.
        if self.consecutive_successes >= self.reset_after:
            self.requests = [(now, True)]
            self.consecutive_successes = 1

    def _trim_old_requests(self) -> None:
        """
        Drop the oldest entries once the history exceeds the window.
        """
        if len(self.requests) <= self.error_window:
            return
        self.requests = self.requests[-self.error_window:]

    def get_error_rate(self) -> float:
        """
        Compute the share of failed requests in the history.

        Returns:
            Failures divided by recorded requests, or 0.0 when the history
            is empty
        """
        total = len(self.requests)
        if total == 0:
            return 0.0
        return self._count_failures() / total

    def is_server_problematic(self) -> bool:
        """
        Tell whether the error rate has reached the configured threshold.

        Returns:
            True if the server is problematic, False otherwise
        """
        return not self.get_error_rate() < self.error_threshold

    def get_status(self) -> Dict[str, Any]:
        """
        Summarise the tracker state.

        Returns:
            Dictionary with the error rate, the problematic flag, both
            streak counters, the history length and the failure count
        """
        status = {}
        status["error_rate"] = self.get_error_rate()
        status["is_problematic"] = self.is_server_problematic()
        status["consecutive_successes"] = self.consecutive_successes
        status["consecutive_failures"] = self.consecutive_failures
        status["total_requests"] = len(self.requests)
        status["recent_failures"] = self._count_failures()
        return status

class CircuitBreaker:
    """
    Small state machine implementing the circuit breaker pattern.

    Callers ask ``allow_request`` before contacting a server and report the
    outcome through ``record_success`` / ``record_failure``. After enough
    consecutive failures the breaker opens and rejects requests until the
    recovery timeout has passed.
    """

    # Circuit states
    CLOSED = "closed"        # Requests flow normally
    OPEN = "open"            # Requests are rejected
    HALF_OPEN = "half_open"  # Requests are let through to probe the service

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        reset_timeout: float = 60.0,
        logger_instance: Optional[logging.Logger] = None
    ):
        """
        Create a breaker in the closed state.

        Args:
            failure_threshold: Consecutive failures (while closed) that open
                the breaker
            recovery_timeout: Seconds since the last failure after which an
                open breaker moves to half-open
            reset_timeout: Stored and reported by ``get_status``; not
                otherwise used by this class
            logger_instance: Logger instance to use (uses module logger if None)
        """
        self.log = logger_instance or logger

        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.reset_timeout = reset_timeout

        # A timestamp of 0 means "has not happened yet".
        self.last_failure_time = 0
        self.last_success_time = 0
        self.failure_count = 0
        self.state = self.CLOSED

    def record_success(self) -> None:
        """
        Note a successful request.

        A success while half-open closes the breaker; a success while closed
        clears the failure count. A success while open only updates the
        timestamp.
        """
        self.last_success_time = time.time()

        if self.state == self.HALF_OPEN:
            self.log.info("Circuit breaker reset to closed state after successful test request")
            self.state = self.CLOSED

        # Covers both the plain closed case and the transition just made.
        if self.state == self.CLOSED:
            self.failure_count = 0

    def record_failure(self) -> None:
        """
        Note a failed request.

        A failure while half-open re-opens the breaker at once; failures
        while closed are counted and open it once the threshold is reached.
        """
        self.last_failure_time = time.time()

        if self.state == self.HALF_OPEN:
            self.log.warning("Circuit breaker opened again after failed test request")
            self.state = self.OPEN
            return

        if self.state != self.CLOSED:
            return

        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.log.warning(f"Circuit breaker opened after {self.failure_count} consecutive failures")
            self.state = self.OPEN

    def allow_request(self) -> bool:
        """
        Decide whether a request may be sent right now.

        Returns:
            True if the request should be allowed, False otherwise
        """
        # Closed and half-open both let requests through; note that every
        # request is allowed while half-open, not just a single probe.
        if self.state != self.OPEN:
            return True

        # Open: move to half-open once the recovery timeout has elapsed.
        elapsed = time.time() - self.last_failure_time
        if elapsed > self.recovery_timeout:
            self.log.info("Circuit breaker entering half-open state to test service")
            self.state = self.HALF_OPEN
            return True
        return False

    def reset(self) -> None:
        """
        Force the breaker back to the closed state and clear the count.
        """
        self.state = self.CLOSED
        self.failure_count = 0
        self.log.info("Circuit breaker manually reset to closed state")

    def get_status(self) -> Dict[str, Any]:
        """
        Describe the breaker's current state.

        Returns:
            Dictionary with the state, failure count, seconds since the last
            failure and success (None if none was recorded) and both
            configured timeouts
        """
        now = time.time()
        since_failure = now - self.last_failure_time if self.last_failure_time > 0 else None
        since_success = now - self.last_success_time if self.last_success_time > 0 else None
        return {
            "state": self.state,
            "failure_count": self.failure_count,
            "time_since_last_failure": since_failure,
            "time_since_last_success": since_success,
            "recovery_timeout": self.recovery_timeout,
            "reset_timeout": self.reset_timeout
        }

def safe_request(
    url: str,
    method: str = "GET",
    circuit_breaker: Optional[CircuitBreaker] = None,
    error_tracker: Optional[NetworkErrorTracker] = None,
    retry_count: int = 3,
    timeout: float = 30.0,
    **kwargs
) -> Optional[requests.Response]:
    """
    Send an HTTP request guarded by a circuit breaker and retried on failure.

    The request is attempted up to ``retry_count + 1`` times. A response
    counts as successful only if ``raise_for_status()`` does not raise. The
    final outcome is reported once to the circuit breaker and error tracker,
    when they are supplied.

    Args:
        url: URL to request
        method: HTTP method (GET, POST, etc.)
        circuit_breaker: Circuit breaker consulted before the request and
            told the outcome afterwards
        error_tracker: Error tracker told the outcome afterwards
        retry_count: Number of retries after the first attempt
        timeout: Request timeout in seconds, used unless ``kwargs`` already
            carries a ``timeout``
        **kwargs: Additional arguments passed to ``requests.request``

    Returns:
        Response object, or None if the breaker refused the request or all
        attempts failed
    """
    # A refused request is not recorded anywhere; it simply yields None.
    if circuit_breaker and not circuit_breaker.allow_request():
        logger.warning(f"Circuit breaker prevented request to {url}")
        return None

    # Caller-supplied timeout wins over the default.
    kwargs.setdefault("timeout", timeout)

    response = None
    success = False

    try:
        for attempt in range(retry_count + 1):
            try:
                response = requests.request(method, url, **kwargs)
                response.raise_for_status()
            except NETWORK_EXCEPTIONS as e:
                # Last attempt: give up and let the outer handler record it.
                if attempt >= retry_count:
                    logger.error(f"Request to {url} failed after {retry_count+1} attempts: {str(e)}")
                    raise

                # Exponential backoff with up to one second of jitter.
                delay = 2 ** attempt + random.uniform(0, 1)
                logger.warning(f"Request to {url} failed (attempt {attempt+1}/{retry_count+1}): {str(e)}. Retrying in {delay:.2f}s")
                time.sleep(delay)
            else:
                success = True
                break
    except Exception as e:
        # Covers exhausted retries as well as errors that are never retried.
        logger.error(f"Error making request to {url}: {str(e)}")
        success = False

    # Report the final outcome.
    if circuit_breaker:
        report = circuit_breaker.record_success if success else circuit_breaker.record_failure
        report()

    if error_tracker:
        error_tracker.record_request(success)

    if not success:
        return None
    return response

```

--------------------------------------------------------------------------------
/src/openscad_wrapper/wrapper.py:
--------------------------------------------------------------------------------

```python
import os
import subprocess
import uuid
import logging
from typing import Dict, Any, List, Tuple, Optional

logger = logging.getLogger(__name__)

class OpenSCADWrapper:
    """
    Wrapper for OpenSCAD command-line interface.
    Provides methods to generate SCAD code, STL files, and preview images.
    """
    
    def __init__(self, scad_dir: str, output_dir: str):
        """
        Initialize the OpenSCAD wrapper.
        
        Args:
            scad_dir: Directory to store SCAD files
            output_dir: Directory to store output files (STL, PNG)
        """
        self.scad_dir = scad_dir
        self.output_dir = output_dir
        self.stl_dir = os.path.join(output_dir, "stl")
        self.preview_dir = os.path.join(output_dir, "preview")
        
        # Create directories if they don't exist
        os.makedirs(self.scad_dir, exist_ok=True)
        os.makedirs(self.stl_dir, exist_ok=True)
        os.makedirs(self.preview_dir, exist_ok=True)
        
        # Basic shape templates
        self.shape_templates = {
            "cube": self._cube_template,
            "sphere": self._sphere_template,
            "cylinder": self._cylinder_template,
            "box": self._box_template,
            "rounded_box": self._rounded_box_template,
        }
    
    def generate_scad_code(self, model_type: str, parameters: Dict[str, Any]) -> str:
        """
        Generate OpenSCAD code for a given model type and parameters.
        
        Args:
            model_type: Type of model to generate (cube, sphere, cylinder, etc.)
            parameters: Dictionary of parameters for the model
            
        Returns:
            Path to the generated SCAD file
        """
        model_id = str(uuid.uuid4())
        scad_file = os.path.join(self.scad_dir, f"{model_id}.scad")
        
        # Get the template function for the model type
        template_func = self.shape_templates.get(model_type)
        if not template_func:
            raise ValueError(f"Unsupported model type: {model_type}")
        
        # Generate SCAD code using the template
        scad_code = template_func(parameters)
        
        # Write SCAD code to file
        with open(scad_file, 'w') as f:
            f.write(scad_code)
        
        logger.info(f"Generated SCAD file: {scad_file}")
        return scad_file
        
    def generate_scad(self, scad_code: str, model_id: str) -> str:
        """
        Save OpenSCAD code to a file with a specific model ID.
        
        Args:
            scad_code: OpenSCAD code to save
            model_id: ID to use for the file name
            
        Returns:
            Path to the saved SCAD file
        """
        scad_file = os.path.join(self.scad_dir, f"{model_id}.scad")
        
        # Write SCAD code to file
        with open(scad_file, 'w') as f:
            f.write(scad_code)
        
        logger.info(f"Generated SCAD file: {scad_file}")
        return scad_file
    
    def update_scad_code(self, model_id: str, parameters: Dict[str, Any]) -> str:
        """
        Update an existing SCAD file with new parameters.
        
        Args:
            model_id: ID of the model to update
            parameters: New parameters for the model
            
        Returns:
            Path to the updated SCAD file
        """
        scad_file = os.path.join(self.scad_dir, f"{model_id}.scad")
        if not os.path.exists(scad_file):
            raise FileNotFoundError(f"SCAD file not found: {scad_file}")
        
        # Read the existing SCAD file to determine its type
        with open(scad_file, 'r') as f:
            scad_code = f.read()
        
        # Determine model type from the code (simplified approach)
        model_type = None
        for shape_type in self.shape_templates:
            if shape_type in scad_code.lower():
                model_type = shape_type
                break
        
        if not model_type:
            raise ValueError("Could not determine model type from existing SCAD file")
        
        # Generate new SCAD code
        new_scad_code = self.shape_templates[model_type](parameters)
        
        # Write updated SCAD code to file
        with open(scad_file, 'w') as f:
            f.write(new_scad_code)
        
        logger.info(f"Updated SCAD file: {scad_file}")
        return scad_file
    
    def generate_stl(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None) -> str:
        """
        Render a SCAD file to STL by running the openscad command-line tool.
        
        Args:
            scad_file: Path to the SCAD file
            parameters: Optional variable overrides, each passed to OpenSCAD as
                "-D name=value". Python booleans are written as true/false.
            
        Returns:
            Path to the generated STL file
            
        Raises:
            RuntimeError: If OpenSCAD exits with a non-zero status
        """
        # splitext drops only the final extension, so an ID that itself contains
        # dots ("part.v2.scad") keeps its full name instead of being cut short
        model_id = os.path.splitext(os.path.basename(scad_file))[0]
        stl_file = os.path.join(self.stl_dir, f"{model_id}.stl")

        cmd = ["openscad", "-o", stl_file]
        for key, value in (parameters or {}).items():
            # OpenSCAD's boolean literals are lowercase true/false
            if isinstance(value, bool):
                value = str(value).lower()
            cmd.extend(["-D", f"{key}={value}"])
        cmd.append(scad_file)

        try:
            result = subprocess.run(cmd, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            logger.error(f"Error generating STL file: {e.stderr}")
            # Chain the cause so the exit status stays visible in tracebacks
            raise RuntimeError(f"Failed to generate STL file: {e.stderr}") from e

        logger.info(f"Generated STL file: {stl_file}")
        logger.debug(result.stdout)
        return stl_file
    
    def generate_preview(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None,
                        camera_position: str = "0,0,0,0,0,0,50", 
                        image_size: str = "800,600") -> str:
        """
        Render a PNG preview of a SCAD file by running the openscad command-line tool.
        
        If OpenSCAD exits with a non-zero status, a placeholder image is
        written to the same path instead of raising.
        
        Args:
            scad_file: Path to the SCAD file
            parameters: Optional variable overrides, each passed to OpenSCAD as
                "-D name=value". Python booleans are written as true/false.
            camera_position: Camera position in format "tx,ty,tz,rx,ry,rz,dist"
            image_size: Image size in format "width,height"
            
        Returns:
            Path to the generated preview image (or placeholder)
        """
        # splitext drops only the final extension, so an ID that itself contains
        # dots ("part.v2.scad") keeps its full name instead of being cut short
        model_id = os.path.splitext(os.path.basename(scad_file))[0]
        preview_file = os.path.join(self.preview_dir, f"{model_id}.png")

        cmd = ["openscad", "--camera", camera_position, "--imgsize", image_size, "-o", preview_file]
        for key, value in (parameters or {}).items():
            # OpenSCAD's boolean literals are lowercase true/false
            if isinstance(value, bool):
                value = str(value).lower()
            cmd.extend(["-D", f"{key}={value}"])
        cmd.append(scad_file)

        try:
            result = subprocess.run(cmd, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            logger.error(f"Error generating preview image: {e.stderr}")
            # Since we know there might be issues with headless rendering, we'll create a placeholder
            logger.warning("Using placeholder image due to rendering error")
            return self._create_placeholder_image(preview_file)

        logger.info(f"Generated preview image: {preview_file}")
        logger.debug(result.stdout)
        return preview_file
    
    def _create_placeholder_image(self, output_path: str) -> str:
        """
        Write an 800x600 light-grey "Preview not available" image to output_path.
        
        Best effort: any failure, including Pillow not being importable, is
        logged and output_path is returned regardless.
        """
        try:
            from PIL import Image, ImageDraw, ImageFont

            placeholder = Image.new('RGB', (800, 600), color=(240, 240, 240))
            ImageDraw.Draw(placeholder).text((400, 300), "Preview not available", fill=(0, 0, 0))
            placeholder.save(output_path)
        except Exception as e:
            logger.error(f"Error creating placeholder image: {str(e)}")

        # The path is handed back on both outcomes; the file may not exist
        # if drawing or saving failed
        return output_path
            
    def generate_multi_angle_previews(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
        """
        Render 800x600 PNG previews of a SCAD file from four fixed camera angles.
        
        Views are "front", "top", "right" and "perspective"; each is written to
        "<model_id>_<view>.png" in the preview directory. A view for which
        OpenSCAD exits with a non-zero status gets a placeholder image instead.
        
        Args:
            scad_file: Path to the SCAD file
            parameters: Optional variable overrides, each passed to OpenSCAD as
                "-D name=value". Python booleans are written as true/false.
            
        Returns:
            Dictionary mapping view names to preview image paths
        """
        # Camera strings use the "tx,ty,tz,rx,ry,rz,dist" format
        camera_positions = {
            "front": "0,0,0,0,0,0,50",
            "top": "0,0,0,90,0,0,50",
            "right": "0,0,0,0,90,0,50",
            "perspective": "40,30,30,55,0,25,100"
        }

        # The model ID and the -D arguments are the same for every view, so
        # build them once. splitext drops only the final extension, keeping
        # IDs that themselves contain dots intact.
        model_id = os.path.splitext(os.path.basename(scad_file))[0]
        define_args = []
        for key, value in (parameters or {}).items():
            # OpenSCAD's boolean literals are lowercase true/false
            if isinstance(value, bool):
                value = str(value).lower()
            define_args.extend(["-D", f"{key}={value}"])

        previews = {}
        for view, camera_position in camera_positions.items():
            preview_file = os.path.join(self.preview_dir, f"{model_id}_{view}.png")
            cmd = [
                "openscad", "--camera", camera_position, "--imgsize", "800,600",
                "-o", preview_file, *define_args, scad_file,
            ]
            try:
                subprocess.run(cmd, check=True, capture_output=True, text=True)
            except subprocess.CalledProcessError as e:
                logger.error(f"Error generating {view} preview: {e.stderr}")
                # Create a placeholder image for this view
                previews[view] = self._create_placeholder_image(preview_file)
            else:
                logger.info(f"Generated {view} preview: {preview_file}")
                previews[view] = preview_file

        return previews
    
    # Template functions for basic shapes
    
    def _cube_template(self, params: Dict[str, Any]) -> str:
        """Generate SCAD code for a cube."""
        size_x = params.get('width', 10)
        size_y = params.get('depth', 10)
        size_z = params.get('height', 10)
        center = params.get('center', 'false').lower() == 'true'
        
        return f"""// Cube
// Parameters:
//   width = {size_x}
//   depth = {size_y}
//   height = {size_z}
//   center = {str(center).lower()}

width = {size_x};
depth = {size_y};
height = {size_z};
center = {str(center).lower()};

cube([width, depth, height], center=center);
"""
    
    def _sphere_template(self, params: Dict[str, Any]) -> str:
        """Generate SCAD code for a sphere."""
        radius = params.get('radius', 10)
        segments = params.get('segments', 32)
        
        return f"""// Sphere
// Parameters:
//   radius = {radius}
//   segments = {segments}

radius = {radius};
$fn = {segments};

sphere(r=radius);
"""
    
    def _cylinder_template(self, params: Dict[str, Any]) -> str:
        """Generate SCAD code for a cylinder."""
        radius = params.get('radius', 10)
        height = params.get('height', 20)
        center = params.get('center', 'false').lower() == 'true'
        segments = params.get('segments', 32)
        
        return f"""// Cylinder
// Parameters:
//   radius = {radius}
//   height = {height}
//   center = {str(center).lower()}
//   segments = {segments}

radius = {radius};
height = {height};
center = {str(center).lower()};
$fn = {segments};

cylinder(h=height, r=radius, center=center);
"""
    
    def _box_template(self, params: Dict[str, Any]) -> str:
        """Generate SCAD code for a hollow box."""
        width = params.get('width', 30)
        depth = params.get('depth', 20)
        height = params.get('height', 15)
        thickness = params.get('thickness', 2)
        
        return f"""// Hollow Box
// Parameters:
//   width = {width}
//   depth = {depth}
//   height = {height}
//   thickness = {thickness}

width = {width};
depth = {depth};
height = {height};
thickness = {thickness};

module box(width, depth, height, thickness) {{
    difference() {{
        cube([width, depth, height]);
        translate([thickness, thickness, thickness])
        cube([width - 2*thickness, depth - 2*thickness, height - thickness]);
    }}
}}

box(width, depth, height, thickness);
"""
    
    def _rounded_box_template(self, params: Dict[str, Any]) -> str:
        """Generate SCAD code for a rounded box."""
        width = params.get('width', 30)
        depth = params.get('depth', 20)
        height = params.get('height', 15)
        radius = params.get('radius', 3)
        segments = params.get('segments', 32)
        
        return f"""// Rounded Box
// Parameters:
//   width = {width}
//   depth = {depth}
//   height = {height}
//   radius = {radius}
//   segments = {segments}

width = {width};
depth = {depth};
height = {height};
radius = {radius};
$fn = {segments};

module rounded_box(width, depth, height, radius) {{
    hull() {{
        translate([radius, radius, radius])
        sphere(r=radius);
        
        translate([width-radius, radius, radius])
        sphere(r=radius);
        
        translate([radius, depth-radius, radius])
        sphere(r=radius);
        
        translate([width-radius, depth-radius, radius])
        sphere(r=radius);
        
        translate([radius, radius, height-radius])
        sphere(r=radius);
        
        translate([width-radius, radius, height-radius])
        sphere(r=radius);
        
        translate([radius, depth-radius, height-radius])
        sphere(r=radius);
        
        translate([width-radius, depth-radius, height-radius])
        sphere(r=radius);
    }}
}}

rounded_box(width, depth, height, radius);
"""

```

--------------------------------------------------------------------------------
/src/remote/cuda_mvs_client.py:
--------------------------------------------------------------------------------

```python
"""
Client for remote CUDA Multi-View Stereo processing.

This module provides a client to connect to a remote CUDA MVS server
within the LAN for processing multi-view images into 3D models.
"""

import os
import json
import logging
import requests
import base64
from typing import Dict, List, Optional, Any, Union
from pathlib import Path
import uuid

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CUDAMVSClient:
    """
    Client for connecting to a remote CUDA Multi-View Stereo server.
    
    This client handles:
    1. Discovering available CUDA MVS servers on the LAN
    2. Uploading images to the server
    3. Requesting 3D reconstruction
    4. Downloading the resulting 3D models
    5. Monitoring job status
    """
    
    def __init__(
        self,
        server_url: Optional[str] = None,
        api_key: Optional[str] = None,
        output_dir: str = "output/models",
        discovery_port: int = 8765,
        connection_timeout: int = 10,
        upload_chunk_size: int = 1024 * 1024,  # 1MB chunks
    ):
        """
        Store the client configuration, create the output directory and open an HTTP session.
        
        Args:
            server_url: URL of the CUDA MVS server (if known)
            api_key: API key; when given, it is sent as a Bearer token on
                every request made through the session
            output_dir: Directory to save downloaded models
            discovery_port: Port used for server discovery
            connection_timeout: Timeout for server connections in seconds
            upload_chunk_size: Chunk size for file uploads in bytes
        """
        self.server_url = server_url
        self.api_key = api_key
        self.output_dir = output_dir
        self.discovery_port = discovery_port
        self.connection_timeout = connection_timeout
        self.upload_chunk_size = upload_chunk_size

        os.makedirs(output_dir, exist_ok=True)

        # A single shared session gives connection pooling across calls
        session = requests.Session()
        if api_key:
            session.headers.update({"Authorization": f"Bearer {api_key}"})
        self.session = session
    
    def discover_servers(self) -> List[Dict[str, Any]]:
        """
        Discover CUDA MVS servers on the local network.
        
        Browses for "_cudamvs._tcp.local." services via zeroconf for two
        seconds and returns whatever was announced in that window. Errors
        during browsing are logged and produce an empty list.
        
        Returns:
            List of dictionaries containing server information:
            [
                {
                    "server_id": "unique-server-id",
                    "name": "CUDA MVS Server 1",
                    "url": "http://192.168.1.100:8765",
                    "capabilities": {
                        "max_images": 50,
                        "max_resolution": 4096,
                        "supported_formats": ["jpg", "png"],
                        "gpu_info": "NVIDIA RTX 4090 24GB"
                    },
                    "status": "available"
                },
                ...
            ]
        """
        import socket
        import time
        from zeroconf import ServiceBrowser, ServiceListener, Zeroconf

        discovered_servers = []

        class CUDAMVSListener(ServiceListener):
            def add_service(self, zc, type_, name):
                info = zc.get_service_info(type_, name)
                # Skip announcements that cannot be resolved to an address
                if not info or not info.addresses:
                    return
                server_info = {
                    "server_id": name.split('.')[0],
                    "name": info.properties.get(b'name', b'Unknown').decode('utf-8'),
                    "url": f"http://{socket.inet_ntoa(info.addresses[0])}:{info.port}",
                    "capabilities": json.loads(info.properties.get(b'capabilities', b'{}').decode('utf-8')),
                    "status": "available"
                }
                discovered_servers.append(server_info)
                logger.info(f"Discovered CUDA MVS server: {server_info['name']} at {server_info['url']}")

            # The browser also reports updates and removals; this one-shot
            # scan only collects additions, so both are deliberate no-ops.
            def update_service(self, zc, type_, name):
                pass

            def remove_service(self, zc, type_, name):
                pass

        try:
            zeroconf = Zeroconf()
            try:
                ServiceBrowser(zeroconf, "_cudamvs._tcp.local.", CUDAMVSListener())
                # Give announcements some time to arrive (blocks the caller)
                time.sleep(2)
            finally:
                # Always shut zeroconf down, even if browsing failed
                zeroconf.close()
            return discovered_servers
        except Exception as e:
            logger.error(f"Error discovering CUDA MVS servers: {e}")
            return []
    
    def test_connection(self, server_url: Optional[str] = None) -> Dict[str, Any]:
        """
        Test connection to a CUDA MVS server.
        
        Args:
            server_url: URL of the server to test (uses self.server_url if None)
            
        Returns:
            Dictionary with connection status and server information
        """
        url = server_url or self.server_url
        if not url:
            return {"status": "error", "message": "No server URL provided"}
        
        try:
            response = self.session.get(
                f"{url}/api/status",
                timeout=self.connection_timeout
            )
            
            if response.status_code == 200:
                return {
                    "status": "success",
                    "server_info": response.json(),
                    "latency_ms": response.elapsed.total_seconds() * 1000
                }
            else:
                return {
                    "status": "error",
                    "message": f"Server returned status code {response.status_code}",
                    "details": response.text
                }
        except requests.exceptions.RequestException as e:
            return {"status": "error", "message": f"Connection error: {str(e)}"}
    
    def upload_images(self, image_paths: List[str]) -> Dict[str, Any]:
        """
        Upload images to the CUDA MVS server.
        
        Args:
            image_paths: List of paths to images to upload
            
        Returns:
            Dictionary with upload status and job information
        """
        if not self.server_url:
            return {"status": "error", "message": "No server URL configured"}
        
        # Create a new job
        try:
            response = self.session.post(
                f"{self.server_url}/api/jobs",
                json={"num_images": len(image_paths)},
                timeout=self.connection_timeout
            )
            
            if response.status_code != 201:
                return {
                    "status": "error",
                    "message": f"Failed to create job: {response.status_code}",
                    "details": response.text
                }
            
            job_info = response.json()
            job_id = job_info["job_id"]
            
            # Upload each image
            for i, image_path in enumerate(image_paths):
                # Check if file exists
                if not os.path.exists(image_path):
                    return {
                        "status": "error",
                        "message": f"Image file not found: {image_path}"
                    }
                
                # Get file size for progress tracking
                file_size = os.path.getsize(image_path)
                
                # Prepare upload
                with open(image_path, "rb") as f:
                    files = {
                        "file": (os.path.basename(image_path), f, "image/jpeg" if image_path.endswith(".jpg") else "image/png")
                    }
                    
                    metadata = {
                        "image_index": i,
                        "total_images": len(image_paths),
                        "filename": os.path.basename(image_path)
                    }
                    
                    response = self.session.post(
                        f"{self.server_url}/api/jobs/{job_id}/images",
                        files=files,
                        data={"metadata": json.dumps(metadata)},
                        timeout=None  # No timeout for uploads
                    )
                    
                    if response.status_code != 200:
                        return {
                            "status": "error",
                            "message": f"Failed to upload image {i+1}/{len(image_paths)}: {response.status_code}",
                            "details": response.text
                        }
                
                logger.info(f"Uploaded image {i+1}/{len(image_paths)}: {os.path.basename(image_path)}")
            
            # Start processing
            response = self.session.post(
                f"{self.server_url}/api/jobs/{job_id}/process",
                timeout=self.connection_timeout
            )
            
            if response.status_code != 202:
                return {
                    "status": "error",
                    "message": f"Failed to start processing: {response.status_code}",
                    "details": response.text
                }
            
            return {
                "status": "success",
                "job_id": job_id,
                "message": f"Uploaded {len(image_paths)} images and started processing",
                "job_url": f"{self.server_url}/api/jobs/{job_id}"
            }
            
        except requests.exceptions.RequestException as e:
            return {"status": "error", "message": f"Upload error: {str(e)}"}
    
    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """
        Get the status of a CUDA MVS job.
        
        Args:
            job_id: ID of the job to check
            
        Returns:
            Dictionary with job status information
        """
        if not self.server_url:
            return {"status": "error", "message": "No server URL configured"}
        
        try:
            response = self.session.get(
                f"{self.server_url}/api/jobs/{job_id}",
                timeout=self.connection_timeout
            )
            
            if response.status_code == 200:
                return {
                    "status": "success",
                    "job_info": response.json()
                }
            else:
                return {
                    "status": "error",
                    "message": f"Failed to get job status: {response.status_code}",
                    "details": response.text
                }
        except requests.exceptions.RequestException as e:
            return {"status": "error", "message": f"Connection error: {str(e)}"}
    
    def download_model(self, job_id: str, output_format: str = "obj") -> Dict[str, Any]:
        """
        Download a processed 3D model from the CUDA MVS server.
        
        Args:
            job_id: ID of the job to download
            output_format: Format of the model to download (obj, ply, etc.)
            
        Returns:
            Dictionary with download status and local file path
        """
        if not self.server_url:
            return {"status": "error", "message": "No server URL configured"}
        
        # Check job status first
        status_result = self.get_job_status(job_id)
        if status_result["status"] != "success":
            return status_result
        
        job_info = status_result["job_info"]
        if job_info["status"] != "completed":
            return {
                "status": "error",
                "message": f"Job is not completed yet. Current status: {job_info['status']}",
                "job_info": job_info
            }
        
        # Download the model
        try:
            response = self.session.get(
                f"{self.server_url}/api/jobs/{job_id}/model?format={output_format}",
                stream=True,
                timeout=None  # No timeout for downloads
            )
            
            if response.status_code != 200:
                return {
                    "status": "error",
                    "message": f"Failed to download model: {response.status_code}",
                    "details": response.text
                }
            
            # Create a unique filename
            model_id = job_info.get("model_id", str(uuid.uuid4()))
            output_path = os.path.join(self.output_dir, f"{model_id}.{output_format}")
            
            # Save the file
            with open(output_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            
            logger.info(f"Downloaded model to {output_path}")
            
            return {
                "status": "success",
                "model_id": model_id,
                "local_path": output_path,
                "format": output_format,
                "job_id": job_id
            }
            
        except requests.exceptions.RequestException as e:
            return {"status": "error", "message": f"Download error: {str(e)}"}
    
    def cancel_job(self, job_id: str) -> Dict[str, Any]:
        """
        Cancel a running CUDA MVS job.
        
        Args:
            job_id: ID of the job to cancel
            
        Returns:
            Dictionary with cancellation status
        """
        if not self.server_url:
            return {"status": "error", "message": "No server URL configured"}
        
        try:
            response = self.session.delete(
                f"{self.server_url}/api/jobs/{job_id}",
                timeout=self.connection_timeout
            )
            
            if response.status_code == 200:
                return {
                    "status": "success",
                    "message": "Job cancelled successfully"
                }
            else:
                return {
                    "status": "error",
                    "message": f"Failed to cancel job: {response.status_code}",
                    "details": response.text
                }
        except requests.exceptions.RequestException as e:
            return {"status": "error", "message": f"Connection error: {str(e)}"}
    
    def generate_model_from_images(
        self,
        image_paths: List[str],
        output_format: str = "obj",
        wait_for_completion: bool = True,
        poll_interval: int = 5
    ) -> Dict[str, Any]:
        """
        Complete workflow to generate a 3D model from images.
        
        Args:
            image_paths: List of paths to images
            output_format: Format of the output model
            wait_for_completion: Whether to wait for job completion
            poll_interval: Interval in seconds to poll for job status
            
        Returns:
            Dictionary with job status and model information if completed
        """
        # Upload images and start processing
        upload_result = self.upload_images(image_paths)
        if upload_result["status"] != "success":
            return upload_result
        
        job_id = upload_result["job_id"]
        
        # If not waiting for completion, return the job info
        if not wait_for_completion:
            return upload_result
        
        # Poll for job completion
        import time
        while True:
            status_result = self.get_job_status(job_id)
            if status_result["status"] != "success":
                return status_result
            
            job_info = status_result["job_info"]
            if job_info["status"] == "completed":
                # Download the model
                return self.download_model(job_id, output_format)
            elif job_info["status"] == "failed":
                return {
                    "status": "error",
                    "message": "Job processing failed",
                    "job_info": job_info
                }
            
            # Wait before polling again
            time.sleep(poll_interval)
            logger.info(f"Job {job_id} status: {job_info['status']}, progress: {job_info.get('progress', 0)}%")

```

--------------------------------------------------------------------------------
/src/printer_discovery/printer_discovery.py:
--------------------------------------------------------------------------------

```python
import os
import logging
import socket
import json
import time
import threading
from typing import Dict, List, Any, Optional, Callable

logger = logging.getLogger(__name__)

class PrinterDiscovery:
    """
    Discovers 3D printers on the network and provides interfaces for direct printing.

    Discovery runs on a daemon thread that repeats a sweep until
    stop_discovery() is called. The sweeps in this implementation are
    simulated: each one registers a fixed, hard-coded printer instead of
    probing the network.
    """

    # Seconds to wait after a successful sweep and after a sweep that raised.
    SCAN_INTERVAL_SECONDS = 10
    ERROR_BACKOFF_SECONDS = 5

    def __init__(self):
        """Initialize the printer discovery service."""
        self.printers = {}  # Dictionary of discovered printers, keyed by printer id
        self.discovery_thread = None
        self.discovery_stop_event = threading.Event()
        self.discovery_callback = None

    def start_discovery(self, callback: Optional[Callable[[Dict[str, Any]], None]] = None) -> None:
        """
        Start discovering 3D printers on the network.

        Does nothing (apart from logging a warning) if discovery is already running.

        Args:
            callback: Optional callback function to call when a printer is discovered.
                It is invoked on the discovery thread with the printer's info dictionary.
        """
        if self.discovery_thread and self.discovery_thread.is_alive():
            logger.warning("Printer discovery already running")
            return

        self.discovery_callback = callback
        self.discovery_stop_event.clear()
        self.discovery_thread = threading.Thread(target=self._discover_printers)
        self.discovery_thread.daemon = True
        self.discovery_thread.start()

        logger.info("Started printer discovery")

    def stop_discovery(self) -> None:
        """Stop discovering 3D printers."""
        thread = self.discovery_thread
        if thread and thread.is_alive():
            self.discovery_stop_event.set()
            thread.join(timeout=2.0)
            if thread.is_alive():
                # Only possible when a sweep or callback is itself blocking.
                logger.warning("Printer discovery thread did not stop within 2 seconds")
            else:
                logger.info("Stopped printer discovery")
        else:
            logger.warning("Printer discovery not running")

    def get_printers(self) -> Dict[str, Any]:
        """
        Get the list of discovered printers.

        Returns:
            Dictionary of printer information keyed by printer id. This is the
            live internal dictionary, not a copy.
        """
        return self.printers

    def _discover_printers(self) -> None:
        """Discover 3D printers on the network using various protocols."""
        # This is a simplified implementation that simulates printer discovery
        # In a real implementation, you would use protocols like mDNS, SNMP, or OctoPrint API
        stop_event = self.discovery_stop_event

        while not stop_event.is_set():
            try:
                # Simulate network discovery
                self._discover_octoprint_printers()
                self._discover_prusa_printers()
                self._discover_ultimaker_printers()
                delay = self.SCAN_INTERVAL_SECONDS
            except Exception as e:
                logger.error(f"Error in printer discovery: {str(e)}")
                delay = self.ERROR_BACKOFF_SECONDS

            # Wait on the stop event instead of sleeping so that
            # stop_discovery() wakes the thread immediately rather than
            # leaving it running until the sleep expires.
            stop_event.wait(delay)

    def _register_printer(self, printer_info: Dict[str, Any], label: str) -> None:
        """
        Record a printer the first time it is seen and notify the callback.

        Args:
            printer_info: Printer description; its "id" is the registry key
            label: Human-readable printer family used in the log message
        """
        printer_id = printer_info["id"]
        if printer_id in self.printers:
            return

        # Store before notifying so a failing callback does not cause the same
        # printer to be reported again on the next sweep.
        self.printers[printer_id] = printer_info

        if self.discovery_callback:
            self.discovery_callback(printer_info)

        logger.info(f"Discovered {label} printer: {printer_info['name']}")

    def _discover_octoprint_printers(self) -> None:
        """Discover OctoPrint servers on the network."""
        # Simulate discovering OctoPrint servers
        # In a real implementation, you would use mDNS to discover OctoPrint instances
        self._register_printer(
            {
                "id": "octoprint_1",
                "name": "OctoPrint Printer",
                "type": "octoprint",
                "address": "192.168.1.100",
                "port": 80,
                "api_key": None,  # Would need to be provided by user
                "status": "online",
                "capabilities": ["print", "status", "cancel"]
            },
            "OctoPrint",
        )

    def _discover_prusa_printers(self) -> None:
        """Discover Prusa printers on the network."""
        # Simulate discovering Prusa printers
        self._register_printer(
            {
                "id": "prusa_1",
                "name": "Prusa MK3S",
                "type": "prusa",
                "address": "192.168.1.101",
                "port": 80,
                "status": "online",
                "capabilities": ["print", "status"]
            },
            "Prusa",
        )

    def _discover_ultimaker_printers(self) -> None:
        """Discover Ultimaker printers on the network."""
        # Simulate discovering Ultimaker printers
        self._register_printer(
            {
                "id": "ultimaker_1",
                "name": "Ultimaker S5",
                "type": "ultimaker",
                "address": "192.168.1.102",
                "port": 80,
                "status": "online",
                "capabilities": ["print", "status", "cancel"]
            },
            "Ultimaker",
        )


class PrinterInterface:
    """
    Interface for communicating with 3D printers.

    Keeps one client object per connected printer and forwards print, status
    and cancel requests to it.
    """

    def __init__(self, printer_discovery: PrinterDiscovery):
        """
        Initialize the printer interface.

        Args:
            printer_discovery: Instance of PrinterDiscovery for finding printers
        """
        self.connected_printers = {}  # printer id -> connected client
        self.printer_discovery = printer_discovery

    def connect_to_printer(self, printer_id: str, credentials: Optional[Dict[str, Any]] = None) -> bool:
        """
        Connect to a specific printer.

        Args:
            printer_id: ID of the printer to connect to
            credentials: Optional credentials for authentication

        Returns:
            True if connection successful, False otherwise
        """
        known_printers = self.printer_discovery.get_printers()
        if printer_id not in known_printers:
            logger.error(f"Printer not found: {printer_id}")
            return False

        printer_info = known_printers[printer_id]

        # Map each supported printer type to the client class that handles it
        client_classes = {
            "octoprint": OctoPrintClient,
            "prusa": PrusaClient,
            "ultimaker": UltimakerClient,
        }
        printer_type = printer_info["type"]
        if printer_type not in client_classes:
            logger.error(f"Unsupported printer type: {printer_info['type']}")
            return False

        client = client_classes[printer_type](printer_info, credentials)

        # Only remember the client once it reports a successful connection
        if not client.connect():
            return False
        self.connected_printers[printer_id] = client
        return True

    def disconnect_from_printer(self, printer_id: str) -> bool:
        """
        Disconnect from a specific printer.

        Args:
            printer_id: ID of the printer to disconnect from

        Returns:
            True if disconnection successful, False otherwise
        """
        try:
            client = self.connected_printers[printer_id]
        except KeyError:
            logger.error(f"Not connected to printer: {printer_id}")
            return False

        if not client.disconnect():
            return False
        del self.connected_printers[printer_id]
        return True

    def print_file(self, printer_id: str, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool:
        """
        Send a file to a printer for printing.

        Args:
            printer_id: ID of the printer to print on
            file_path: Path to the STL file to print
            print_settings: Optional print settings

        Returns:
            True if print job started successfully, False otherwise
        """
        try:
            client = self.connected_printers[printer_id]
        except KeyError:
            logger.error(f"Not connected to printer: {printer_id}")
            return False

        return client.print_file(file_path, print_settings)

    def get_printer_status(self, printer_id: str) -> Dict[str, Any]:
        """
        Get the status of a specific printer.

        Args:
            printer_id: ID of the printer to get status for

        Returns:
            Dictionary with printer status information, or a dictionary with
            an "error" entry when the printer is not connected
        """
        try:
            client = self.connected_printers[printer_id]
        except KeyError:
            logger.error(f"Not connected to printer: {printer_id}")
            return {"error": "Not connected to printer"}

        return client.get_status()

    def cancel_print(self, printer_id: str) -> bool:
        """
        Cancel a print job on a specific printer.

        Args:
            printer_id: ID of the printer to cancel print on

        Returns:
            True if cancellation successful, False otherwise
        """
        try:
            client = self.connected_printers[printer_id]
        except KeyError:
            logger.error(f"Not connected to printer: {printer_id}")
            return False

        return client.cancel_print()


class PrinterClient:
    """Base class for printer clients.

    Tracks a connection flag and provides placeholder operations that
    subclasses are expected to override.
    """

    def __init__(self, printer_info: Dict[str, Any], credentials: Optional[Dict[str, Any]] = None):
        """
        Initialize the printer client.

        Args:
            printer_info: Information about the printer
            credentials: Optional credentials for authentication
        """
        self.connected = False
        self.printer_info = printer_info
        # A missing (or empty) credentials argument becomes an empty dictionary
        self.credentials = credentials or {}

    def connect(self) -> bool:
        """
        Connect to the printer.

        Returns:
            True if connection successful, False otherwise
        """
        # Placeholder: just flips the flag; subclasses should override
        self.connected = True
        return True

    def disconnect(self) -> bool:
        """
        Disconnect from the printer.

        Returns:
            True if disconnection successful, False otherwise
        """
        # Placeholder: just flips the flag; subclasses should override
        self.connected = False
        return True

    def print_file(self, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool:
        """
        Send a file to the printer for printing.

        Args:
            file_path: Path to the STL file to print
            print_settings: Optional print settings

        Returns:
            True if print job started successfully, False otherwise
        """
        # Placeholder: only logs; subclasses should override
        if self.connected:
            logger.info(f"Printing file: {file_path}")
            return True

        logger.error("Not connected to printer")
        return False

    def get_status(self) -> Dict[str, Any]:
        """
        Get the status of the printer.

        Returns:
            Dictionary with printer status information
        """
        # Placeholder: reports the connection flag only; subclasses should override
        return {"status": "connected" if self.connected else "disconnected"}

    def cancel_print(self) -> bool:
        """
        Cancel the current print job.

        Returns:
            True if cancellation successful, False otherwise
        """
        # Placeholder: only logs; subclasses should override
        if self.connected:
            logger.info("Cancelling print job")
            return True

        logger.error("Not connected to printer")
        return False


class OctoPrintClient(PrinterClient):
    """Client for OctoPrint printers.

    All operations are simulated: nothing is sent to an OctoPrint server.
    """

    def connect(self) -> bool:
        """
        Connect to an OctoPrint server.

        Returns:
            True if a non-empty API key was supplied in the credentials and the
            simulated connection succeeded, False otherwise
        """
        try:
            # In a real implementation, you would use the OctoPrint API
            # to connect to the printer

            # A key that is present but None or empty cannot authenticate, so
            # it is rejected the same way as a missing key.
            if not self.credentials.get("api_key"):
                logger.error("API key required for OctoPrint")
                return False

            # Simulate connection
            logger.info(f"Connected to OctoPrint server: {self.printer_info['address']}")
            self.connected = True
            return True
        except Exception as e:
            logger.error(f"Error connecting to OctoPrint server: {str(e)}")
            return False

    def print_file(self, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool:
        """
        Send a file to an OctoPrint server for printing.

        Args:
            file_path: Path to the file to print; must be an existing regular file
            print_settings: Optional print settings (unused by the simulation)

        Returns:
            True if the simulated print job started, False if not connected,
            the file is missing, or an error occurred
        """
        if not self.connected:
            logger.error("Not connected to OctoPrint server")
            return False

        try:
            # In a real implementation, you would use the OctoPrint API
            # to upload the file and start printing

            # isfile rather than exists: a directory path is not printable
            if not os.path.isfile(file_path):
                logger.error(f"File not found: {file_path}")
                return False

            # Simulate printing
            logger.info(f"Printing file on OctoPrint server: {file_path}")
            return True
        except Exception as e:
            logger.error(f"Error printing file on OctoPrint server: {str(e)}")
            return False

    def get_status(self) -> Dict[str, Any]:
        """
        Get the status of an OctoPrint server.

        Returns:
            {"status": "disconnected"} when not connected; otherwise a fixed,
            simulated status dictionary with "printer" and "job" sections
        """
        if not self.connected:
            return {"status": "disconnected"}

        try:
            # In a real implementation, you would use the OctoPrint API
            # to get the printer status

            # Simulate status
            return {
                "status": "connected",
                "printer": {
                    "state": "operational",
                    "temperature": {
                        "bed": {"actual": 60.0, "target": 60.0},
                        "tool0": {"actual": 210.0, "target": 210.0}
                    }
                },
                "job": {
                    "file": {"name": "example.gcode"},
                    "progress": {"completion": 0.0, "printTime": 0, "printTimeLeft": 0}
                }
            }
        except Exception as e:
            logger.error(f"Error getting status from OctoPrint server: {str(e)}")
            return {"status": "error", "message": str(e)}


class PrusaClient(PrinterClient):
    """Client for Prusa printers (simulated connection only)."""

    def connect(self) -> bool:
        """
        Connect to a Prusa printer.

        Returns:
            True once the simulated connection is marked as established,
            False if reporting the connection raised an error
        """
        # In a real implementation, you would use the Prusa API
        # to connect to the printer
        try:
            # Simulate connection
            logger.info(f"Connected to Prusa printer: {self.printer_info['address']}")
        except Exception as e:
            logger.error(f"Error connecting to Prusa printer: {str(e)}")
            return False
        else:
            self.connected = True
            return True


class UltimakerClient(PrinterClient):
    """Client for Ultimaker printers (simulated connection only)."""

    def connect(self) -> bool:
        """
        Connect to an Ultimaker printer.

        Returns:
            True once the simulated connection is marked as established,
            False if reporting the connection raised an error
        """
        # In a real implementation, you would use the Ultimaker API
        # to connect to the printer
        try:
            # Simulate connection
            logger.info(f"Connected to Ultimaker printer: {self.printer_info['address']}")
        except Exception as e:
            logger.error(f"Error connecting to Ultimaker printer: {str(e)}")
            return False
        else:
            self.connected = True
            return True

```

--------------------------------------------------------------------------------
/src/nlp/parameter_extractor.py:
--------------------------------------------------------------------------------

```python
import re
import logging
from typing import Dict, Any, Tuple, List, Optional
import json

logger = logging.getLogger(__name__)

class ParameterExtractor:
    """
    Extract parameters from natural language descriptions.
    Implements dialog flow for collecting specifications and translating them to OpenSCAD parameters.

    Numeric parameters are recognised as "<number> [unit] <keyword>" (for
    example "10 mm wide"). Unit words are tolerated but values are never
    rescaled: every number is taken to be in millimeters.
    """

    # Fallback values filled in per model type for parameters the description
    # did not supply.
    _DEFAULT_PARAMETERS = {
        'cube': {'width': 10, 'depth': 10, 'height': 10, 'center': 'false'},
        'sphere': {'radius': 10, 'segments': 32},
        'cylinder': {'radius': 10, 'height': 20, 'center': 'false', 'segments': 32},
        'box': {'width': 30, 'depth': 20, 'height': 15, 'thickness': 2},
        'rounded_box': {'width': 30, 'depth': 20, 'height': 15, 'radius': 3, 'segments': 32},
        'cone': {'base_radius': 10, 'height': 20, 'center': 'false', 'segments': 32},
        'torus': {'major_radius': 20, 'minor_radius': 5, 'segments': 32},
        'prism': {'width': 20, 'height': 15, 'depth': 20, 'center': 'false'},
        'custom': {'width': 20, 'height': 20, 'depth': 20, 'center': 'false'}
    }

    # Parameters the dialog flow asks about until the user has provided them.
    _REQUIRED_PARAMETERS = {
        'cube': ['width', 'depth', 'height'],
        'sphere': ['radius'],
        'cylinder': ['radius', 'height'],
        'box': ['width', 'depth', 'height', 'thickness'],
        'rounded_box': ['width', 'depth', 'height', 'radius'],
        'cone': ['base_radius', 'height'],
        'torus': ['major_radius', 'minor_radius'],
        'prism': ['width', 'height', 'depth'],
        'custom': ['width', 'height', 'depth']
    }

    # Matches a negation sitting directly before a centering keyword, applied
    # to the text that precedes the keyword ("should not be " + "centered").
    _CENTER_NEGATION = r'\b(?:not|non|no|off)[\s-]+(?:be\s+)?$'

    def __init__(self):
        """Initialize the parameter extractor."""
        # Using only millimeters as per project requirements
        self.unit_conversions = {
            'mm': 1.0
        }

        # Shape recognition patterns with expanded vocabulary
        self.shape_patterns = {
            'cube': r'\b(cube|box|square|rectangular|block|cuboid|brick)\b',
            'sphere': r'\b(sphere|ball|round|circular|globe|orb)\b',
            'cylinder': r'\b(cylinder|tube|pipe|rod|circular column|pillar|column)\b',
            'box': r'\b(hollow box|container|case|enclosure|bin|chest|tray)\b',
            'rounded_box': r'\b(rounded box|rounded container|rounded case|rounded enclosure|smooth box|rounded corners|chamfered box)\b',
            'cone': r'\b(cone|pyramid|tapered cylinder|funnel)\b',
            'torus': r'\b(torus|donut|ring|loop|circular ring)\b',
            'prism': r'\b(prism|triangular prism|wedge|triangular shape)\b',
            'custom': r'\b(custom|complex|special|unique|combined|composite)\b'
        }

        # Parameter recognition patterns with enhanced unit detection.
        # Each keyword group ends with \b so that the one-letter abbreviations
        # (w, h, d, r, t, x) only match as whole words; without it "10 mm
        # diameter" would also be read as a depth and "20 mm tall" as a
        # thickness.
        self.parameter_patterns = {
            'width': r'(\d+(?:\.\d+)?)\s*(?:mm|cm|m|in|inch|inches|ft|foot|feet)?\s*(?:wide|width|across|w)\b',
            'height': r'(\d+(?:\.\d+)?)\s*(?:mm|cm|m|in|inch|inches|ft|foot|feet)?\s*(?:high|height|tall|h)\b',
            'depth': r'(\d+(?:\.\d+)?)\s*(?:mm|cm|m|in|inch|inches|ft|foot|feet)?\s*(?:deep|depth|long|d|length)\b',
            'radius': r'(\d+(?:\.\d+)?)\s*(?:mm|cm|m|in|inch|inches|ft|foot|feet)?\s*(?:radius|r)\b',
            'diameter': r'(\d+(?:\.\d+)?)\s*(?:mm|cm|m|in|inch|inches|ft|foot|feet)?\s*(?:diameter|dia)\b',
            'thickness': r'(\d+(?:\.\d+)?)\s*(?:mm|cm|m|in|inch|inches|ft|foot|feet)?\s*(?:thick|thickness|t)\b',
            'segments': r'(\d+)\s*(?:segments|sides|faces|facets|smoothness)\b',
            'center': r'\b(centered|center|middle|origin)\b',
            'angle': r'(\d+(?:\.\d+)?)\s*(?:deg|degree|degrees|°)?\s*(?:angle|rotation|rotate|tilt)\b',
            'scale': r'(\d+(?:\.\d+)?)\s*(?:x|times|scale|scaling|factor)\b',
            'resolution': r'(\d+(?:\.\d+)?)\s*(?:resolution|quality|detail)\b'
        }

        # Dialog state for multi-turn conversations
        self.dialog_state = {}

    def extract_parameters(self, description: str, model_type: Optional[str] = None, 
                              existing_parameters: Optional[Dict[str, Any]] = None) -> Tuple[str, Dict[str, Any]]:
        """
        Extract model type and parameters from a natural language description.

        Args:
            description: Natural language description of the 3D object
            model_type: Optional model type for context (if already known)
            existing_parameters: Optional existing parameters for context (for modifications)

        Returns:
            Tuple of (model_type, parameters). Parameters are the existing ones,
            overridden by values found in the description, with the model
            type's defaults filled in for anything still missing.
        """
        # Use provided model_type or determine from description
        if model_type is None:
            model_type = self._determine_shape_type(description)

        # Start with a copy of the existing parameters so the caller's dict is untouched
        parameters = existing_parameters.copy() if existing_parameters else {}

        # Values found in the description win over existing ones
        parameters.update(self._extract_shape_parameters(description, model_type))

        # Apply default parameters if needed
        parameters = self._apply_default_parameters(model_type, parameters)

        logger.info(f"Extracted model type: {model_type}, parameters: {parameters}")
        return model_type, parameters

    def extract_parameters_from_modifications(self, modifications: str, model_type: Optional[str] = None, 
                                               existing_parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Extract parameters from modification description with contextual understanding.

        Args:
            modifications: Description of modifications to make
            model_type: Optional model type for context
            existing_parameters: Optional existing parameters for context

        Returns:
            Dictionary of parameters to update: the existing parameters
            overridden by whatever the modification text specifies. No
            defaults are applied and a diameter is not converted to a radius.
        """
        # Start with a copy of the existing parameters so the caller's dict is untouched
        parameters = existing_parameters.copy() if existing_parameters else {}

        # Extract all possible parameters from the modifications
        new_parameters = self._scan_parameters(modifications)
        parameters.update(new_parameters)

        # Apply contextual understanding based on model type
        if model_type and not new_parameters:
            # If no explicit parameters were found, try to infer from context
            # For now, we'll just log this case since inference is complex
            logger.info(f"No explicit parameters found in '{modifications}', using existing parameters")

        logger.info(f"Extracted modification parameters: {parameters}")
        return parameters

    def get_missing_parameters(self, model_type: str, parameters: Dict[str, Any]) -> List[str]:
        """
        Determine which required parameters are missing for a given model type.

        Args:
            model_type: Type of model
            parameters: Currently extracted parameters

        Returns:
            List of missing parameter names
        """
        required_params = self._get_required_parameters(model_type)
        return [param for param in required_params if param not in parameters]

    def update_dialog_state(self, user_id: str, model_type: Optional[str] = None, 
                           parameters: Optional[Dict[str, Any]] = None) -> None:
        """
        Update the dialog state for a user.

        Creates an empty state for the user on first use, merges in the given
        model type and parameters, then recomputes the missing parameters.

        Args:
            user_id: Unique identifier for the user
            model_type: Optional model type to update
            parameters: Optional parameters to update
        """
        if user_id not in self.dialog_state:
            self.dialog_state[user_id] = {
                'model_type': None,
                'parameters': {},
                'missing_parameters': [],
                'current_question': None
            }

        state = self.dialog_state[user_id]

        if model_type:
            state['model_type'] = model_type

        if parameters:
            state['parameters'].update(parameters)

        # Update missing parameters
        if state['model_type']:
            state['missing_parameters'] = self.get_missing_parameters(
                state['model_type'],
                state['parameters']
            )

    def get_next_question(self, user_id: str) -> Optional[str]:
        """
        Get the next question to ask the user based on missing parameters.

        Args:
            user_id: Unique identifier for the user

        Returns:
            Question string or None if all parameters are collected
        """
        if user_id not in self.dialog_state:
            return "What kind of 3D object would you like to create?"

        state = self.dialog_state[user_id]

        # If we don't have a model type yet, ask for it
        if not state['model_type']:
            state['current_question'] = "What kind of 3D object would you like to create?"
            return state['current_question']

        # If we have missing parameters, ask for the first one
        if state['missing_parameters']:
            param = state['missing_parameters'][0]
            question = self._get_parameter_question(param, state['model_type'])
            state['current_question'] = question
            return question

        # All parameters collected
        state['current_question'] = None
        return None

    def process_answer(self, user_id: str, answer: str) -> Dict[str, Any]:
        """
        Process a user's answer to a question.

        The answer is read as a shape description while the model type is
        unknown, otherwise as the value of the first missing parameter.

        Args:
            user_id: Unique identifier for the user
            answer: User's answer to the current question

        Returns:
            Updated dialog state
        """
        if user_id not in self.dialog_state:
            # Initialize with default state
            self.update_dialog_state(user_id)

        state = self.dialog_state[user_id]

        if not state['model_type']:
            # Trying to determine the model type
            model_type = self._determine_shape_type(answer)
            self.update_dialog_state(user_id, model_type=model_type)
        elif state['missing_parameters']:
            # Trying to collect a specific parameter
            param = state['missing_parameters'][0]
            value = self._extract_parameter_value(param, answer)
            if value is not None:
                self.update_dialog_state(user_id, parameters={param: value})

        # Return the updated state
        return self.dialog_state[user_id]

    def _determine_shape_type(self, description: str) -> str:
        """
        Determine the shape type from the description.
        Enhanced to support more shape types and better pattern matching.

        Multi-word phrases are checked first because several of them contain a
        word that belongs to a more generic shape ("rounded box" contains the
        cube keyword "box", "circular ring" contains the sphere keyword
        "circular"). Single keywords are then tried in declaration order.
        """
        # Pass 1: a multi-word phrase is the most specific evidence available
        for shape, pattern in self.shape_patterns.items():
            for match in re.finditer(pattern, description, re.IGNORECASE):
                if ' ' in match.group(0):
                    logger.info(f"Detected shape type: {shape} from pattern: {pattern}")
                    return shape

        # Pass 2: any explicit shape mention
        for shape, pattern in self.shape_patterns.items():
            if re.search(pattern, description, re.IGNORECASE):
                logger.info(f"Detected shape type: {shape} from pattern: {pattern}")
                return shape

        # Try to infer shape from context if no explicit mention
        if re.search(r'\b(round|circular|sphere|ball)\b', description, re.IGNORECASE):
            return "sphere"
        elif re.search(r'\b(tall|column|pillar|rod)\b', description, re.IGNORECASE):
            return "cylinder"
        elif re.search(r'\b(box|container|case|enclosure)\b', description, re.IGNORECASE):
            # Determine if it should be a rounded box
            if re.search(r'\b(rounded|smooth|chamfered)\b', description, re.IGNORECASE):
                return "rounded_box"
            return "box"

        # Default to cube if no shape is detected
        logger.info("No specific shape detected, defaulting to cube")
        return "cube"

    def _interpret_center(self, text: str) -> Optional[str]:
        """
        Decide whether the text asks for a centered object.

        Returns:
            'true' or 'false' (OpenSCAD boolean spelling) based on the last
            centering keyword in the text and whether it is directly negated,
            or None when the text does not mention centering at all
        """
        mentions = list(re.finditer(self.parameter_patterns['center'], text, re.IGNORECASE))
        if not mentions:
            return None

        # Only the last mention counts, mirroring the "last match wins" rule
        # used for numeric parameters.
        preceding_text = text[:mentions[-1].start()]
        if re.search(self._CENTER_NEGATION, preceding_text, re.IGNORECASE):
            return 'false'
        return 'true'

    def _scan_parameters(self, text: str) -> Dict[str, Any]:
        """
        Collect every parameter that the patterns recognise in the text.

        Numeric parameters become floats; 'center' becomes 'true' or 'false'.
        """
        found = {}
        for param_name, pattern in self.parameter_patterns.items():
            if param_name == 'center':
                # The center pattern captures a word, not a number, so it
                # cannot go through the numeric conversion below.
                center = self._interpret_center(text)
                if center is not None:
                    found[param_name] = center
                continue

            matches = re.findall(pattern, text, re.IGNORECASE)
            if matches:
                # Take the last match if multiple are found
                value = matches[-1]
                if isinstance(value, tuple):
                    value = value[0]  # Extract from capture group
                found[param_name] = self._convert_to_mm(value, text)

        return found

    def _extract_shape_parameters(self, description: str, model_type: str) -> Dict[str, Any]:
        """Extract parameters for a specific shape type."""
        parameters = self._scan_parameters(description)

        # Special case for diameter -> radius conversion
        if 'diameter' in parameters and 'radius' not in parameters:
            parameters['radius'] = parameters['diameter'] / 2
            del parameters['diameter']

        return parameters

    def _convert_to_mm(self, value_str: str, context: str) -> float:
        """
        Convert a value to millimeters.
        As per project requirements, we only use millimeters for design.

        Args:
            value_str: Numeric text to convert
            context: Text the value came from (currently unused)

        Returns:
            The value as a float, or 0.0 when it is not a valid number
        """
        try:
            value = float(value_str)

            # Since we're only using millimeters, we just return the value directly
            # This simplifies the conversion logic while maintaining the function interface
            logger.info(f"Using value {value} in millimeters")
            return value
        except ValueError:
            logger.warning(f"Could not convert value to float: {value_str}")
            return 0.0

    def _apply_default_parameters(self, model_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Apply default parameters based on the model type.

        Fills in the defaults in place and returns the same dictionary;
        unknown model types get no defaults.
        """
        for param, default_value in self._DEFAULT_PARAMETERS.get(model_type, {}).items():
            if param not in parameters:
                parameters[param] = default_value

        return parameters

    def _get_required_parameters(self, model_type: str) -> List[str]:
        """Get the list of required parameters for a model type."""
        # Return a copy so callers cannot alter the shared table
        return list(self._REQUIRED_PARAMETERS.get(model_type, []))

    def _get_parameter_question(self, param: str, model_type: str) -> str:
        """Get a question to ask for a specific parameter."""
        questions = {
            'width': f"What should be the width of the {model_type} in mm?",
            'depth': f"What should be the depth of the {model_type} in mm?",
            'height': f"What should be the height of the {model_type} in mm?",
            'radius': f"What should be the radius of the {model_type} in mm?",
            'thickness': f"What should be the wall thickness of the {model_type} in mm?",
            'segments': f"How many segments should the {model_type} have for smoothness?",
            'base_radius': f"What should be the base radius of the {model_type} in mm?",
            'major_radius': f"What should be the major radius of the {model_type} in mm?",
            'minor_radius': f"What should be the minor radius of the {model_type} in mm?",
            'diameter': f"What should be the diameter of the {model_type} in mm?",
            'angle': f"What should be the angle of the {model_type} in degrees?",
            'scale': f"What should be the scale factor for the {model_type}?",
            'resolution': f"What resolution should the {model_type} have (higher means more detailed)?",
            'center': f"Should the {model_type} be centered? (yes/no)"
        }

        return questions.get(param, f"What should be the {param} of the {model_type}?")

    def _extract_parameter_value(self, param: str, answer: str) -> Optional[float]:
        """Extract a parameter value from an answer.

        Tries the parameter's own pattern first and falls back to the last
        bare number in the answer; returns None when there is no number.
        """
        pattern = self.parameter_patterns.get(param)
        if not pattern:
            # For parameters without specific patterns, try to extract any number
            pattern = r'(\d+(?:\.\d+)?)'

        matches = re.findall(pattern, answer, re.IGNORECASE)
        if matches:
            value = matches[-1]
            if isinstance(value, tuple):
                value = value[0]  # Extract from capture group
            return self._convert_to_mm(value, answer)

        # Try to extract just a number
        matches = re.findall(r'(\d+(?:\.\d+)?)', answer)
        if matches:
            value = matches[-1]
            return self._convert_to_mm(value, answer)

        return None

```

--------------------------------------------------------------------------------
/src/remote/connection_manager.py:
--------------------------------------------------------------------------------

```python
"""
Connection manager for remote CUDA Multi-View Stereo processing.

This module provides functionality to discover, connect to, and manage
connections with remote CUDA MVS servers within the LAN.
"""

import os
import json
import logging
import time
import threading
from typing import Dict, List, Optional, Any, Union, Callable
import socket
from zeroconf import ServiceBrowser, ServiceListener, Zeroconf

from src.remote.cuda_mvs_client import CUDAMVSClient

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CUDAMVSConnectionManager:
    """
    Connection manager for remote CUDA MVS servers.
    
    This class handles:
    1. Discovering available CUDA MVS servers on the LAN
    2. Managing connections to multiple servers
    3. Load balancing across available servers
    4. Monitoring server health and status
    5. Automatic failover if a server becomes unavailable
    """
    
    def __init__(
        self,
        api_key: Optional[str] = None,
        discovery_port: int = 8765,
        connection_timeout: int = 10,
        health_check_interval: int = 60,
        auto_discover: bool = True
    ):
        """
        Initialize the connection manager.
        
        Args:
            api_key: API key for authentication (if required)
            discovery_port: Port used for server discovery
            connection_timeout: Timeout for server connections in seconds
            health_check_interval: Interval for health checks in seconds
            auto_discover: Whether to automatically discover servers on startup
        """
        self.api_key = api_key
        self.discovery_port = discovery_port
        self.connection_timeout = connection_timeout
        self.health_check_interval = health_check_interval
        
        # Server tracking
        self.servers: Dict[str, Dict[str, Any]] = {}
        self.clients: Dict[str, CUDAMVSClient] = {}
        self.server_lock = threading.RLock()
        
        # Health check thread
        self.health_check_thread = None
        self.health_check_stop_event = threading.Event()
        
        # Discovery
        self.zeroconf = None
        self.browser = None
        
        # Start discovery if enabled
        if auto_discover:
            self.start_discovery()
            
        # Start health check thread
        self.start_health_check()
    
    def start_discovery(self):
        """
        Start discovering CUDA MVS servers on the LAN.
        """
        if self.zeroconf is not None:
            return
        
        try:
            self.zeroconf = Zeroconf()
            listener = CUDAMVSServiceListener(self)
            self.browser = ServiceBrowser(self.zeroconf, "_cudamvs._tcp.local.", listener)
            logger.info("Started CUDA MVS server discovery")
        except Exception as e:
            logger.error(f"Error starting discovery: {e}")
    
    def stop_discovery(self):
        """
        Stop discovering CUDA MVS servers.
        """
        if self.zeroconf is not None:
            try:
                self.zeroconf.close()
                self.zeroconf = None
                self.browser = None
                logger.info("Stopped CUDA MVS server discovery")
            except Exception as e:
                logger.error(f"Error stopping discovery: {e}")
    
    def start_health_check(self):
        """
        Start the health check thread.
        """
        if self.health_check_thread is not None and self.health_check_thread.is_alive():
            return
        
        self.health_check_stop_event.clear()
        self.health_check_thread = threading.Thread(
            target=self._health_check_loop,
            daemon=True
        )
        self.health_check_thread.start()
        logger.info("Started health check thread")
    
    def stop_health_check(self):
        """
        Stop the health check thread.
        """
        if self.health_check_thread is not None:
            self.health_check_stop_event.set()
            self.health_check_thread.join(timeout=5)
            self.health_check_thread = None
            logger.info("Stopped health check thread")
    
    def _health_check_loop(self):
        """
        Health check loop that runs in a separate thread.
        """
        while not self.health_check_stop_event.is_set():
            try:
                self.check_all_servers()
            except Exception as e:
                logger.error(f"Error in health check: {e}")
            
            # Wait for the next check interval or until stopped
            self.health_check_stop_event.wait(self.health_check_interval)
    
    def add_server(self, server_info: Dict[str, Any]):
        """
        Add a server to the manager.
        
        Args:
            server_info: Dictionary with server information
        """
        server_id = server_info.get("server_id")
        if not server_id:
            logger.error("Cannot add server without server_id")
            return
        
        with self.server_lock:
            # Check if server already exists
            if server_id in self.servers:
                # Update existing server info
                self.servers[server_id].update(server_info)
                logger.info(f"Updated server: {server_info.get('name')} at {server_info.get('url')}")
            else:
                # Add new server
                self.servers[server_id] = server_info
                
                # Create client for the server
                self.clients[server_id] = CUDAMVSClient(
                    server_url=server_info.get("url"),
                    api_key=self.api_key,
                    connection_timeout=self.connection_timeout
                )
                
                logger.info(f"Added server: {server_info.get('name')} at {server_info.get('url')}")
    
    def remove_server(self, server_id: str):
        """
        Remove a server from the manager.
        
        Args:
            server_id: ID of the server to remove
        """
        with self.server_lock:
            if server_id in self.servers:
                server_info = self.servers.pop(server_id)
                if server_id in self.clients:
                    del self.clients[server_id]
                logger.info(f"Removed server: {server_info.get('name')} at {server_info.get('url')}")
    
    def get_servers(self) -> List[Dict[str, Any]]:
        """
        Get a list of all servers.
        
        Returns:
            List of dictionaries with server information
        """
        with self.server_lock:
            return list(self.servers.values())
    
    def get_server(self, server_id: str) -> Optional[Dict[str, Any]]:
        """
        Get information about a specific server.
        
        Args:
            server_id: ID of the server
            
        Returns:
            Dictionary with server information or None if not found
        """
        with self.server_lock:
            return self.servers.get(server_id)
    
    def get_client(self, server_id: str) -> Optional[CUDAMVSClient]:
        """
        Get the client for a specific server.
        
        Args:
            server_id: ID of the server
            
        Returns:
            CUDAMVSClient instance or None if not found
        """
        with self.server_lock:
            return self.clients.get(server_id)
    
    def get_best_server(self) -> Optional[str]:
        """
        Get the ID of the best server to use based on availability and load.
        
        Returns:
            Server ID or None if no servers are available
        """
        with self.server_lock:
            available_servers = [
                server_id for server_id, server in self.servers.items()
                if server.get("status") == "available"
            ]
            
            if not available_servers:
                return None
            
            # For now, just return the first available server
            # In a more advanced implementation, this would consider
            # server load, capabilities, latency, etc.
            return available_servers[0]
    
    def check_server(self, server_id: str) -> Dict[str, Any]:
        """
        Check the status of a specific server.
        
        Args:
            server_id: ID of the server to check
            
        Returns:
            Dictionary with server status information
        """
        client = self.get_client(server_id)
        if not client:
            return {"status": "error", "message": f"Server {server_id} not found"}
        
        # Test connection
        result = client.test_connection()
        
        with self.server_lock:
            if server_id in self.servers:
                # Update server status
                if result["status"] == "success":
                    self.servers[server_id]["status"] = "available"
                    self.servers[server_id]["last_check"] = time.time()
                    self.servers[server_id]["latency_ms"] = result.get("latency_ms")
                    
                    # Update capabilities if available
                    if "server_info" in result and "capabilities" in result["server_info"]:
                        self.servers[server_id]["capabilities"] = result["server_info"]["capabilities"]
                else:
                    self.servers[server_id]["status"] = "unavailable"
                    self.servers[server_id]["last_check"] = time.time()
                    self.servers[server_id]["error"] = result.get("message")
        
        return result
    
    def check_all_servers(self) -> Dict[str, Dict[str, Any]]:
        """
        Check the status of all servers.
        
        Returns:
            Dictionary mapping server IDs to status information
        """
        results = {}
        
        with self.server_lock:
            server_ids = list(self.servers.keys())
        
        for server_id in server_ids:
            results[server_id] = self.check_server(server_id)
        
        return results
    
    def discover_servers(self) -> List[Dict[str, Any]]:
        """
        Manually discover CUDA MVS servers on the LAN.
        
        Returns:
            List of dictionaries containing server information
        """
        # Create a temporary client to discover servers
        client = CUDAMVSClient(
            api_key=self.api_key,
            discovery_port=self.discovery_port,
            connection_timeout=self.connection_timeout
        )
        
        discovered_servers = client.discover_servers()
        
        # Add discovered servers
        for server_info in discovered_servers:
            self.add_server(server_info)
        
        return discovered_servers
    
    def upload_images_to_best_server(self, image_paths: List[str]) -> Dict[str, Any]:
        """
        Upload images to the best available server.
        
        Args:
            image_paths: List of paths to images to upload
            
        Returns:
            Dictionary with upload status and job information
        """
        server_id = self.get_best_server()
        if not server_id:
            return {"status": "error", "message": "No available servers"}
        
        client = self.get_client(server_id)
        if not client:
            return {"status": "error", "message": f"Client for server {server_id} not found"}
        
        # Upload images
        result = client.upload_images(image_paths)
        
        # Add server information to result
        result["server_id"] = server_id
        result["server_name"] = self.servers[server_id].get("name")
        
        return result
    
    def generate_model_from_images(
        self,
        image_paths: List[str],
        output_format: str = "obj",
        wait_for_completion: bool = True,
        poll_interval: int = 5,
        server_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Generate a 3D model from images using the best available server.
        
        Args:
            image_paths: List of paths to images
            output_format: Format of the output model
            wait_for_completion: Whether to wait for job completion
            poll_interval: Interval in seconds to poll for job status
            server_id: ID of the server to use (uses best server if None)
            
        Returns:
            Dictionary with job status and model information if completed
        """
        # Get server to use
        if server_id is None:
            server_id = self.get_best_server()
            if not server_id:
                return {"status": "error", "message": "No available servers"}
        
        client = self.get_client(server_id)
        if not client:
            return {"status": "error", "message": f"Client for server {server_id} not found"}
        
        # Generate model
        result = client.generate_model_from_images(
            image_paths=image_paths,
            output_format=output_format,
            wait_for_completion=wait_for_completion,
            poll_interval=poll_interval
        )
        
        # Add server information to result
        result["server_id"] = server_id
        result["server_name"] = self.servers[server_id].get("name")
        
        return result
    
    def get_job_status(self, job_id: str, server_id: str) -> Dict[str, Any]:
        """
        Get the status of a job on a specific server.
        
        Args:
            job_id: ID of the job to check
            server_id: ID of the server
            
        Returns:
            Dictionary with job status information
        """
        client = self.get_client(server_id)
        if not client:
            return {"status": "error", "message": f"Client for server {server_id} not found"}
        
        return client.get_job_status(job_id)
    
    def download_model(self, job_id: str, server_id: str, output_format: str = "obj") -> Dict[str, Any]:
        """
        Download a processed 3D model from a specific server.
        
        Args:
            job_id: ID of the job to download
            server_id: ID of the server
            output_format: Format of the model to download
            
        Returns:
            Dictionary with download status and local file path
        """
        client = self.get_client(server_id)
        if not client:
            return {"status": "error", "message": f"Client for server {server_id} not found"}
        
        return client.download_model(job_id, output_format)
    
    def cancel_job(self, job_id: str, server_id: str) -> Dict[str, Any]:
        """
        Cancel a running job on a specific server.
        
        Args:
            job_id: ID of the job to cancel
            server_id: ID of the server
            
        Returns:
            Dictionary with cancellation status
        """
        client = self.get_client(server_id)
        if not client:
            return {"status": "error", "message": f"Client for server {server_id} not found"}
        
        return client.cancel_job(job_id)
    
    def cleanup(self):
        """
        Clean up resources.
        """
        self.stop_health_check()
        self.stop_discovery()


class CUDAMVSServiceListener(ServiceListener):
    """
    Zeroconf service listener for CUDA MVS servers.

    Translates Zeroconf service events into ``add_server`` /
    ``remove_server`` calls on the connection manager. The part of the
    service name before the first dot is used as the server ID.
    """

    def __init__(self, connection_manager: CUDAMVSConnectionManager):
        """
        Initialize the service listener.

        Args:
            connection_manager: Connection manager to update with discovered servers
        """
        self.connection_manager = connection_manager

    @staticmethod
    def _decode_property(value, default: str = "") -> str:
        """Decode a TXT record value; a missing or None value yields ``default``."""
        if value is None:
            return default
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='replace')
        return str(value)

    def _build_server_info(self, zc: Zeroconf, type_: str, name: str) -> Optional[Dict[str, Any]]:
        """
        Resolve a service and describe it as a server info dictionary.

        Returns:
            Dictionary with ``server_id``, ``name``, ``url``,
            ``capabilities``, ``status`` ("unknown") and ``discovered_at``,
            or None when the service cannot be resolved or has no address
        """
        info = zc.get_service_info(type_, name)
        if not info:
            return None

        addresses = info.parsed_addresses()
        if not addresses:
            return None

        # An IPv6 literal must be bracketed inside a URL
        host = addresses[0]
        if ':' in host:
            host = f"[{host}]"

        properties = info.properties or {}
        server_name = self._decode_property(properties.get(b'name'), 'Unknown')

        # Parse capabilities (JSON in the "capabilities" TXT property)
        capabilities = {}
        raw_capabilities = properties.get(b'capabilities')
        if raw_capabilities:
            try:
                capabilities = json.loads(self._decode_property(raw_capabilities))
            except json.JSONDecodeError as e:
                # Keep the server usable, but do not hide the bad record
                logger.warning(f"Ignoring invalid capabilities for {name}: {e}")

        return {
            "server_id": name.split('.')[0],
            "name": server_name,
            "url": f"http://{host}:{info.port}",
            "capabilities": capabilities,
            "status": "unknown",
            "discovered_at": time.time()
        }

    def add_service(self, zc: Zeroconf, type_: str, name: str):
        """
        Called when a service is discovered.

        Args:
            zc: Zeroconf instance
            type_: Service type
            name: Service name
        """
        try:
            server_info = self._build_server_info(zc, type_, name)
            if server_info is not None:
                # Add server to connection manager
                self.connection_manager.add_server(server_info)
        except Exception as e:
            logger.error(f"Error processing discovered service: {e}")

    def remove_service(self, zc: Zeroconf, type_: str, name: str):
        """
        Called when a service is removed.

        Args:
            zc: Zeroconf instance
            type_: Service type
            name: Service name
        """
        try:
            server_id = name.split('.')[0]
            self.connection_manager.remove_server(server_id)
        except Exception as e:
            logger.error(f"Error removing service: {e}")

    def update_service(self, zc: Zeroconf, type_: str, name: str):
        """
        Called when a service is updated.

        A server that keeps its URL only has its metadata refreshed, so the
        status recorded by health checks is not reset to "unknown". A server
        whose URL changed is removed and added again so that the manager
        creates a client for the new address.

        Args:
            zc: Zeroconf instance
            type_: Service type
            name: Service name
        """
        try:
            server_info = self._build_server_info(zc, type_, name)
            if server_info is None:
                return

            manager = self.connection_manager
            known = manager.get_server(server_info["server_id"])
            if known is not None:
                if known.get("url") == server_info["url"]:
                    # Same endpoint: keep measured status and first-seen time
                    server_info.pop("status", None)
                    server_info.pop("discovered_at", None)
                else:
                    # Endpoint moved: start over with a fresh record and client
                    manager.remove_server(server_info["server_id"])

            manager.add_server(server_info)
        except Exception as e:
            logger.error(f"Error processing discovered service: {e}")

```

--------------------------------------------------------------------------------
/src/visualization/web_interface.py:
--------------------------------------------------------------------------------

```python
import os
import base64
import logging
from typing import Dict, Any, List, Optional
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles

logger = logging.getLogger(__name__)

class WebInterface:
    """
    Web interface for displaying model previews and managing 3D models.
    """
    
    def __init__(self, app, static_dir: str, templates_dir: str, output_dir: str):
        """
        Set up the web interface on top of an existing application.

        Creates the static and template directories when missing, mounts the
        static files under ``/static``, registers the routes and finally
        writes the template and static files to disk.

        Args:
            app: FastAPI application
            static_dir: Directory for static files
            templates_dir: Directory for templates
            output_dir: Directory containing output files (STL, PNG)
        """
        # Application and directory bookkeeping
        self.app, self.static_dir = app, static_dir
        self.templates_dir, self.output_dir = templates_dir, output_dir

        # Previews and STL files live in fixed sub-directories of the output directory
        self.preview_dir = os.path.join(output_dir, "preview")
        self.stl_dir = os.path.join(output_dir, "stl")

        # Both directories must exist before they are mounted / written to
        for required_dir in (self.static_dir, self.templates_dir):
            os.makedirs(required_dir, exist_ok=True)

        # Router for the HTML pages
        self.router = APIRouter(prefix="/ui", tags=["UI"])

        # Static assets
        static_files = StaticFiles(directory=static_dir)
        self.app.mount("/static", static_files, name="static")

        # Template engine
        self.templates = Jinja2Templates(directory=templates_dir)

        # Routes first, then the files those routes rely on
        self._register_routes()
        self._create_template_files()
        self._create_static_files()
    
    def _register_routes(self):
        """
        Register routes for the web interface.

        HTML pages are registered on ``self.router`` (prefix ``/ui``):
        ``/`` (home), ``/preview/{model_id}`` and ``/models``. File endpoints
        are registered directly on the application:
        ``/api/preview/{preview_id}`` and ``/api/stl/{model_id}``.

        Both file endpoints return a JSON envelope with base64 content by
        default. ``/api/preview/{preview_id}?raw=true`` returns the PNG bytes
        instead; the HTML pages link to that form because an ``<img>`` tag
        cannot display the JSON envelope.

        IDs taken from the URL are only accepted when they are plain file
        names (no path separators, not ``.`` or ``..``); anything else is
        answered with 404.
        """
        # Local imports: these names are not part of the module's import block
        from urllib.parse import quote
        from fastapi.responses import Response

        def locate(directory: str, file_id: str, extension: str) -> Optional[str]:
            """Return the path of ``<directory>/<file_id><extension>`` if ``file_id``
            is a plain file name and that file exists, else None."""
            if (
                not file_id
                or file_id in (".", "..")
                or "/" in file_id
                or "\\" in file_id
                or "\x00" in file_id
            ):
                return None
            candidate = os.path.join(directory, f"{file_id}{extension}")
            return candidate if os.path.isfile(candidate) else None

        def make_preview_url(file_id: str) -> str:
            """URL that serves the raw PNG, usable as an ``<img src>``."""
            return f"/api/preview/{quote(file_id, safe='')}?raw=true"

        def make_stl_url(file_id: str) -> str:
            """URL of the JSON/base64 STL endpoint used by the download script."""
            return f"/api/stl/{quote(file_id, safe='')}"

        # Home page
        @self.router.get("/", response_class=HTMLResponse)
        async def home(request: Request):
            return self.templates.TemplateResponse("index.html", {"request": request})

        # Model preview page
        @self.router.get("/preview/{model_id}", response_class=HTMLResponse)
        async def preview(request: Request, model_id: str):
            # Check if preview exists
            if locate(self.preview_dir, model_id, ".png") is None:
                raise HTTPException(status_code=404, detail="Preview not found")

            # Get multi-angle previews if they exist
            previews = {}
            for angle in ("front", "top", "right", "perspective"):
                angle_id = f"{model_id}_{angle}"
                if locate(self.preview_dir, angle_id, ".png") is not None:
                    previews[angle] = make_preview_url(angle_id)

            # If no multi-angle previews, use the main preview
            if not previews:
                previews["main"] = make_preview_url(model_id)

            # Check if STL exists
            has_stl = locate(self.stl_dir, model_id, ".stl") is not None
            stl_url = make_stl_url(model_id) if has_stl else None

            return self.templates.TemplateResponse(
                "preview.html",
                {
                    "request": request,
                    "model_id": model_id,
                    "previews": previews,
                    "stl_url": stl_url
                }
            )

        # List all models
        @self.router.get("/models", response_class=HTMLResponse)
        async def list_models(request: Request):
            # Model IDs are the STL file names without extension; sorted so
            # the page order does not depend on directory listing order
            model_ids = []
            if os.path.isdir(self.stl_dir):
                model_ids = sorted(
                    os.path.splitext(f)[0]
                    for f in os.listdir(self.stl_dir)
                    if f.endswith(".stl")
                )

            models = []
            for model_id in model_ids:
                has_preview = locate(self.preview_dir, model_id, ".png") is not None
                models.append({
                    "id": model_id,
                    "preview_url": make_preview_url(model_id) if has_preview else None,
                    "stl_url": make_stl_url(model_id)
                })

            return self.templates.TemplateResponse(
                "models.html",
                {"request": request, "models": models}
            )

        # API endpoints for serving files

        # Serve preview image
        @self.app.get("/api/preview/{preview_id}")
        async def get_preview(preview_id: str, raw: bool = False):
            preview_file = locate(self.preview_dir, preview_id, ".png")
            if preview_file is None:
                raise HTTPException(status_code=404, detail="Preview not found")

            with open(preview_file, "rb") as f:
                content = f.read()

            if raw:
                # Plain image response for <img> tags
                return Response(content=content, media_type="image/png")

            # Default: JSON envelope, unchanged for existing API clients
            return {
                "content": base64.b64encode(content).decode("utf-8"),
                "content_type": "image/png"
            }

        # Serve STL file
        @self.app.get("/api/stl/{model_id}")
        async def get_stl(model_id: str):
            stl_file = locate(self.stl_dir, model_id, ".stl")
            if stl_file is None:
                raise HTTPException(status_code=404, detail="STL file not found")

            with open(stl_file, "rb") as f:
                content = f.read()

            return {
                "content": base64.b64encode(content).decode("utf-8"),
                "content_type": "application/octet-stream",
                "filename": f"{model_id}.stl"
            }

        # Register the router with the app
        self.app.include_router(self.router)
    
    def _create_template_files(self):
        """Create template files for the web interface."""
        # Create base template
        base_template = """<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>{% block title %}OpenSCAD MCP Server{% endblock %}</title>
    <link rel="stylesheet" href="/static/styles.css">
</head>
<body>
    <header>
        <h1>OpenSCAD MCP Server</h1>
        <nav>
            <ul>
                <li><a href="/ui/">Home</a></li>
                <li><a href="/ui/models">Models</a></li>
            </ul>
        </nav>
    </header>
    
    <main>
        {% block content %}{% endblock %}
    </main>
    
    <footer>
        <p>OpenSCAD MCP Server - Model Context Protocol Implementation</p>
    </footer>
    
    <script src="/static/script.js"></script>
</body>
</html>
"""
        
        # Create index template
        index_template = """{% extends "base.html" %}

{% block title %}OpenSCAD MCP Server - Home{% endblock %}

{% block content %}
<section class="hero">
    <h2>Welcome to OpenSCAD MCP Server</h2>
    <p>A Model Context Protocol server for generating 3D models with OpenSCAD</p>
</section>

<section class="features">
    <div class="feature">
        <h3>Natural Language Processing</h3>
        <p>Describe 3D objects in natural language and get parametric models</p>
    </div>
    
    <div class="feature">
        <h3>Preview Generation</h3>
        <p>See your models from multiple angles before exporting</p>
    </div>
    
    <div class="feature">
        <h3>STL Export</h3>
        <p>Generate STL files ready for 3D printing</p>
    </div>
</section>
{% endblock %}
"""
        
        # Create preview template
        preview_template = """{% extends "base.html" %}

{% block title %}Model Preview - {{ model_id }}{% endblock %}

{% block content %}
<section class="model-preview">
    <h2>Model Preview: {{ model_id }}</h2>
    
    <div class="preview-container">
        {% for angle, url in previews.items() %}
        <div class="preview-angle">
            <h3>{{ angle|title }} View</h3>
            <img src="{{ url }}" alt="{{ angle }} view of {{ model_id }}" class="preview-image" data-angle="{{ angle }}">
        </div>
        {% endfor %}
    </div>
    
    {% if stl_url %}
    <div class="download-container">
        <a href="{{ stl_url }}" class="download-button" download="{{ model_id }}.stl">Download STL</a>
    </div>
    {% endif %}
</section>
{% endblock %}
"""
        
        # Create models template
        models_template = """{% extends "base.html" %}

{% block title %}All Models{% endblock %}

{% block content %}
<section class="models-list">
    <h2>All Models</h2>
    
    {% if models %}
    <div class="models-grid">
        {% for model in models %}
        <div class="model-card">
            <h3>{{ model.id }}</h3>
            {% if model.preview_url %}
            <img src="{{ model.preview_url }}" alt="Preview of {{ model.id }}" class="model-thumbnail">
            {% else %}
            <div class="no-preview">No preview available</div>
            {% endif %}
            <div class="model-actions">
                <a href="/ui/preview/{{ model.id }}" class="view-button">View</a>
                <a href="{{ model.stl_url }}" class="download-button" download="{{ model.id }}.stl">Download</a>
            </div>
        </div>
        {% endfor %}
    </div>
    {% else %}
    <p>No models found.</p>
    {% endif %}
</section>
{% endblock %}
"""
        
        # Write templates to files
        os.makedirs(self.templates_dir, exist_ok=True)
        
        with open(os.path.join(self.templates_dir, "base.html"), "w") as f:
            f.write(base_template)
        
        with open(os.path.join(self.templates_dir, "index.html"), "w") as f:
            f.write(index_template)
        
        with open(os.path.join(self.templates_dir, "preview.html"), "w") as f:
            f.write(preview_template)
        
        with open(os.path.join(self.templates_dir, "models.html"), "w") as f:
            f.write(models_template)
    
    def _create_static_files(self):
        """Create static files for the web interface."""
        # Create CSS file
        css = """/* Base styles */
* {
    box-sizing: border-box;
    margin: 0;
    padding: 0;
}

body {
    font-family: Arial, sans-serif;
    line-height: 1.6;
    color: #333;
    background-color: #f4f4f4;
}

header {
    background-color: #333;
    color: #fff;
    padding: 1rem;
}

header h1 {
    margin-bottom: 0.5rem;
}

nav ul {
    display: flex;
    list-style: none;
}

nav ul li {
    margin-right: 1rem;
}

nav ul li a {
    color: #fff;
    text-decoration: none;
}

nav ul li a:hover {
    text-decoration: underline;
}

main {
    max-width: 1200px;
    margin: 0 auto;
    padding: 2rem;
}

footer {
    background-color: #333;
    color: #fff;
    text-align: center;
    padding: 1rem;
    margin-top: 2rem;
}

/* Home page */
.hero {
    text-align: center;
    margin-bottom: 2rem;
}

.hero h2 {
    font-size: 2rem;
    margin-bottom: 1rem;
}

.features {
    display: flex;
    justify-content: space-between;
    flex-wrap: wrap;
}

.feature {
    flex: 1;
    background-color: #fff;
    padding: 1.5rem;
    margin: 0.5rem;
    border-radius: 5px;
    box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);
}

.feature h3 {
    margin-bottom: 1rem;
}

/* Preview page */
.model-preview h2 {
    margin-bottom: 1.5rem;
}

.preview-container {
    display: flex;
    flex-wrap: wrap;
    gap: 1rem;
    margin-bottom: 1.5rem;
}

.preview-angle {
    flex: 1;
    min-width: 300px;
    background-color: #fff;
    padding: 1rem;
    border-radius: 5px;
    box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);
}

.preview-angle h3 {
    margin-bottom: 0.5rem;
}

.preview-image {
    width: 100%;
    height: auto;
    border: 1px solid #ddd;
}

.download-container {
    text-align: center;
    margin-top: 1rem;
}

.download-button {
    display: inline-block;
    background-color: #4CAF50;
    color: white;
    padding: 0.5rem 1rem;
    text-decoration: none;
    border-radius: 4px;
}

.download-button:hover {
    background-color: #45a049;
}

/* Models page */
.models-grid {
    display: grid;
    grid-template-columns: repeat(auto-fill, minmax(250px, 1fr));
    gap: 1.5rem;
}

.model-card {
    background-color: #fff;
    border-radius: 5px;
    box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);
    overflow: hidden;
}

.model-card h3 {
    padding: 1rem;
    background-color: #f8f8f8;
    border-bottom: 1px solid #eee;
}

.model-thumbnail {
    width: 100%;
    height: 200px;
    object-fit: contain;
    background-color: #f4f4f4;
}

.no-preview {
    width: 100%;
    height: 200px;
    display: flex;
    align-items: center;
    justify-content: center;
    background-color: #f4f4f4;
    color: #999;
}

.model-actions {
    display: flex;
    padding: 1rem;
}

.view-button, .download-button {
    flex: 1;
    text-align: center;
    padding: 0.5rem;
    text-decoration: none;
    border-radius: 4px;
    margin: 0 0.25rem;
}

.view-button {
    background-color: #2196F3;
    color: white;
}

.view-button:hover {
    background-color: #0b7dda;
}
"""
        
        # Create JavaScript file
        js = """// JavaScript for OpenSCAD MCP Server web interface

document.addEventListener('DOMContentLoaded', function() {
    // Handle image loading errors
    const images = document.querySelectorAll('img');
    images.forEach(img => {
        img.onerror = function() {
            this.src = '/static/placeholder.png';
        };
    });
    
    // Handle STL download
    const downloadButtons = document.querySelectorAll('.download-button');
    downloadButtons.forEach(button => {
        button.addEventListener('click', async function(e) {
            e.preventDefault();
            
            const url = this.getAttribute('href');
            const filename = this.hasAttribute('download') ? this.getAttribute('download') : 'model.stl';
            
            try {
                const response = await fetch(url);
                const data = await response.json();
                
                // Decode base64 content
                const content = atob(data.content);
                
                // Convert to Blob
                const bytes = new Uint8Array(content.length);
                for (let i = 0; i < content.length; i++) {
                    bytes[i] = content.charCodeAt(i);
                }
                const blob = new Blob([bytes], { type: data.content_type });
                
                // Create download link
                const downloadLink = document.createElement('a');
                downloadLink.href = URL.createObjectURL(blob);
                downloadLink.download = data.filename || filename;
                
                // Trigger download
                document.body.appendChild(downloadLink);
                downloadLink.click();
                document.body.removeChild(downloadLink);
            } catch (error) {
                console.error('Error downloading file:', error);
                alert('Error downloading file. Please try again.');
            }
        });
    });
    
    // Handle preview images
    const previewImages = document.querySelectorAll('.preview-image');
    previewImages.forEach(img => {
        img.addEventListener('click', function() {
            const url = this.getAttribute('src');
            const angle = this.getAttribute('data-angle');
            
            // Create modal for larger view
            const modal = document.createElement('div');
            modal.className = 'preview-modal';
            modal.innerHTML = `
                <div class="modal-content">
                    <span class="close-button">&times;</span>
                    <h3>${angle ? angle.charAt(0).toUpperCase() + angle.slice(1) + ' View' : 'Preview'}</h3>
                    <img src="${url}" alt="Preview">
                </div>
            `;
            
            // Add modal styles
            modal.style.position = 'fixed';
            modal.style.top = '0';
            modal.style.left = '0';
            modal.style.width = '100%';
            modal.style.height = '100%';
            modal.style.backgroundColor = 'rgba(0,0,0,0.7)';
            modal.style.display = 'flex';
            modal.style.alignItems = 'center';
            modal.style.justifyContent = 'center';
            modal.style.zIndex = '1000';
            
            const modalContent = modal.querySelector('.modal-content');
            modalContent.style.backgroundColor = '#fff';
            modalContent.style.padding = '20px';
            modalContent.style.borderRadius = '5px';
            modalContent.style.maxWidth = '90%';
            modalContent.style.maxHeight = '90%';
            modalContent.style.overflow = 'auto';
            
            const closeButton = modal.querySelector('.close-button');
            closeButton.style.float = 'right';
            closeButton.style.fontSize = '1.5rem';
            closeButton.style.fontWeight = 'bold';
            closeButton.style.cursor = 'pointer';
            
            const modalImg = modal.querySelector('img');
            modalImg.style.maxWidth = '100%';
            modalImg.style.maxHeight = '70vh';
            modalImg.style.display = 'block';
            modalImg.style.margin = '0 auto';
            
            // Add modal to body
            document.body.appendChild(modal);
            
            // Close modal when clicking close button or outside the modal
            closeButton.addEventListener('click', function() {
                document.body.removeChild(modal);
            });
            
            modal.addEventListener('click', function(e) {
                if (e.target === modal) {
                    document.body.removeChild(modal);
                }
            });
        });
    });
});
"""
        
        # Create placeholder image
        placeholder_svg = """<svg xmlns="http://www.w3.org/2000/svg" width="800" height="600" viewBox="0 0 800 600">
    <rect width="800" height="600" fill="#f0f0f0"/>
    <text x="400" y="300" font-family="Arial" font-size="24" text-anchor="middle" fill="#999">Preview not available</text>
</svg>"""
        
        # Write static files
        os.makedirs(self.static_dir, exist_ok=True)
        
        with open(os.path.join(self.static_dir, "styles.css"), "w") as f:
            f.write(css)
        
        with open(os.path.join(self.static_dir, "script.js"), "w") as f:
            f.write(js)
        
        with open(os.path.join(self.static_dir, "placeholder.svg"), "w") as f:
            f.write(placeholder_svg)

```

--------------------------------------------------------------------------------
/src/remote/cuda_mvs_server.py:
--------------------------------------------------------------------------------

```python
"""
Server for remote CUDA Multi-View Stereo processing.

This module provides a server implementation that can be deployed on a machine
with CUDA capabilities to process multi-view images into 3D models remotely.
"""

import os
import sys
import json
import uuid
import logging
import argparse
import tempfile
import subprocess
from typing import Dict, List, Optional, Any, Union
from pathlib import Path
import shutil
import time

from fastapi import FastAPI, File, UploadFile, Form, HTTPException, BackgroundTasks, Depends
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import APIKeyHeader
import uvicorn
from pydantic import BaseModel, Field
from zeroconf import ServiceInfo, Zeroconf

# Configure logging
# Root logging is configured at import time; every message in this module goes
# through the module-level `logger` below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Constants
# Defaults for the command-line options in main() and for ServerConfig fields.
# TCP port the server listens on; also the port published by advertise_service().
DEFAULT_PORT = 8765
# Bind address handed to uvicorn ("0.0.0.0" = all IPv4 interfaces).
DEFAULT_HOST = "0.0.0.0"
# Installation root; the executable is expected at <path>/build/app_patch_match_mvs.
DEFAULT_CUDA_MVS_PATH = "/opt/cuda-multi-view-stereo"
# Directory (relative to the working directory) that receives one sub-directory per job.
DEFAULT_OUTPUT_DIR = "output"
# Maximum number of jobs in the created/uploading/processing states at once.
DEFAULT_MAX_JOBS = 5
# Maximum number of images a single job may declare.
DEFAULT_MAX_IMAGES_PER_JOB = 50
# NOTE(review): reported in the server capabilities, but nothing in this module
# appears to enforce the timeout.
DEFAULT_JOB_TIMEOUT = 3600  # 1 hour

# Models
class JobStatus(BaseModel):
    """
    State record for a single reconstruction job.

    Instances are stored in ``CUDAMVSServer.jobs`` keyed by ``job_id`` and are
    mutated in place by the route handlers and by the background processing task.
    """
    # Unique identifier: a UUID4 string generated in CUDAMVSServer.create_job.
    job_id: str
    # Lifecycle state. The DELETE /api/jobs/{job_id} handler additionally sets "cancelled".
    status: str = "created"  # created, uploading, processing, completed, failed
    # Unix timestamps (time.time()) of creation and of the most recent change.
    created_at: float = Field(default_factory=time.time)
    updated_at: float = Field(default_factory=time.time)
    # Number of images announced at creation, and how many have been stored so far.
    num_images: int = 0
    images_uploaded: int = 0
    # Percentage: parsed from the tool's "Progress:" output lines, set to 100.0 on completion.
    progress: float = 0.0
    # str() of the exception that made the job fail, if any.
    error_message: Optional[str] = None
    # "model_" followed by the first 8 characters of job_id; used for output/download file names.
    model_id: Optional[str] = None
    # Directory holding the reconstruction output; set when the job completes.
    output_dir: Optional[str] = None
    # Paths to the generated PLY point cloud and the OBJ file, when available.
    point_cloud_file: Optional[str] = None
    obj_file: Optional[str] = None
    # Wall-clock duration of the processing step in seconds.
    processing_time: Optional[float] = None

class ServerConfig(BaseModel):
    """
    Configuration for a ``CUDAMVSServer`` instance.

    ``main()`` builds it from the command-line arguments; unset fields fall back
    to the ``DEFAULT_*`` module constants.
    """
    # Installation root; the executable is looked up at <path>/build/app_patch_match_mvs.
    cuda_mvs_path: str = DEFAULT_CUDA_MVS_PATH
    # Directory under which one sub-directory per job is created.
    output_dir: str = DEFAULT_OUTPUT_DIR
    # Maximum number of jobs in the created/uploading/processing states at once.
    max_jobs: int = DEFAULT_MAX_JOBS
    # Upper bound on the image count accepted when a job is created.
    max_images_per_job: int = DEFAULT_MAX_IMAGES_PER_JOB
    # Job timeout in seconds; reported in the server capabilities.
    # NOTE(review): no code in this module appears to enforce it.
    job_timeout: int = DEFAULT_JOB_TIMEOUT
    # Expected value of the Authorization header (optionally prefixed with
    # "Bearer "); authentication is skipped entirely when this is unset.
    api_key: Optional[str] = None
    # Human-readable name returned by /api/status and published via Zeroconf.
    server_name: str = "CUDA MVS Server"
    # Whether the server announces itself on the local network at start-up.
    advertise_service: bool = True
    # GPU description; filled in by CUDAMVSServer.detect_gpu_info when left empty.
    gpu_info: Optional[str] = None

class JobRequest(BaseModel):
    """Request body of ``POST /api/jobs``."""
    # Number of images the client will upload for this job; create_job rejects
    # values outside 1..max_images_per_job.
    num_images: int

class ProcessRequest(BaseModel):
    """Request body of ``POST /api/jobs/{job_id}/process``."""
    # Selects the iteration count and maximum resolution passed to the tool;
    # unrecognized values are treated like "normal".
    reconstruction_quality: str = "normal"  # low, normal, high
    # Requested output formats. The PLY point cloud is always produced; "obj"
    # additionally triggers a PLY -> OBJ conversion.
    output_formats: List[str] = ["obj", "ply"]

class ServerInfo(BaseModel):
    """
    Server information model.

    Its fields mirror the keys of the dictionary returned by
    ``CUDAMVSServer.get_server_info``.
    NOTE(review): nothing in this module instantiates this model; the status
    endpoint returns the plain dictionary.
    """
    # Random UUID generated once per server instance.
    server_id: str
    # Human-readable server name.
    name: str
    version: str = "1.0.0"
    status: str = "running"
    # Limits, supported formats and GPU description.
    capabilities: Dict[str, Any]
    # Job counters (total / active / completed / failed).
    jobs: Dict[str, Any]
    # Seconds since the server instance was created.
    uptime: float

# Server implementation
class CUDAMVSServer:
    """
    Server for remote CUDA Multi-View Stereo processing.
    
    This server:
    1. Accepts image uploads
    2. Processes images using CUDA MVS
    3. Provides 3D model downloads
    4. Advertises itself on the local network
    """
    
    def __init__(self, config: ServerConfig):
        """
        Build the FastAPI application and initialise all server state.

        Besides creating the in-memory job table, this creates the output
        directory, enables permissive CORS, detects the GPU, registers the
        API routes and (when enabled) advertises the service via Zeroconf.

        Args:
            config: Server configuration
        """
        # Identity and bookkeeping
        self.config = config
        self.server_id = str(uuid.uuid4())
        self.start_time = time.time()
        self.jobs: Dict[str, JobStatus] = {}
        self.zeroconf = None  # set by advertise_service() on success

        self.app = FastAPI(
            title="CUDA MVS Server",
            description="Remote CUDA Multi-View Stereo processing server",
        )

        # All job data lives below this directory
        os.makedirs(config.output_dir, exist_ok=True)

        # Allow requests from any origin
        cors_settings = {
            "allow_origins": ["*"],
            "allow_credentials": True,
            "allow_methods": ["*"],
            "allow_headers": ["*"],
        }
        self.app.add_middleware(CORSMiddleware, **cors_settings)

        # GPU detection has to run before advertising: the advertised
        # capabilities include the GPU description.
        self.detect_gpu_info()
        self.register_routes()

        if config.advertise_service:
            self.advertise_service()
    
    def detect_gpu_info(self):
        """
        Detect GPU information and store it in ``self.config.gpu_info``.

        Does nothing when ``gpu_info`` is already set. Otherwise the probes
        are tried in order and the first one that yields output wins:

        1. ``nvidia-smi`` (GPU name and total memory, one line per GPU)
        2. ``lspci -v``, keeping only the lines that mention "vga"

        If neither probe produces output, ``gpu_info`` becomes "Unknown GPU".
        """
        if self.config.gpu_info:
            return

        # (command, keyword) pairs; a keyword of None keeps every output line.
        # The lspci output is filtered here in Python instead of through a
        # shell pipe: passing a list together with shell=True does not build a
        # pipeline, it runs only the first item as the command.
        probes = (
            (["nvidia-smi", "--query-gpu=name,memory.total", "--format=csv,noheader"], None),
            (["lspci", "-v"], "vga"),
        )

        for command, keyword in probes:
            try:
                result = subprocess.run(
                    command,
                    capture_output=True,
                    text=True,
                    check=True
                )
            except (subprocess.SubprocessError, OSError):
                # Tool missing (FileNotFoundError is an OSError) or it exited
                # with an error: try the next probe.
                continue

            lines = [line.strip() for line in result.stdout.splitlines()]
            if keyword is not None:
                lines = [line for line in lines if keyword in line.lower()]

            info = "\n".join(line for line in lines if line)
            if info:
                self.config.gpu_info = info
                return

        self.config.gpu_info = "Unknown GPU"
    
    def register_routes(self):
        """
        Register all HTTP API routes on ``self.app``.

        Routes:
            GET    /api/status                  Server information (no API key check)
            POST   /api/jobs                    Create a job
            GET    /api/jobs                    List all jobs
            GET    /api/jobs/{job_id}           Get one job
            DELETE /api/jobs/{job_id}           Cancel a job and delete its directory
            POST   /api/jobs/{job_id}/images    Upload one image
            POST   /api/jobs/{job_id}/process   Start background processing
            GET    /api/jobs/{job_id}/model     Download the result ("obj" or "ply")

        Every route except ``/api/status`` depends on ``verify_api_key``, which
        accepts any request when ``config.api_key`` is not set.
        """
        # Local import: only needed for the constant-time key comparison below.
        import hmac

        # Authentication dependency
        api_key_header = APIKeyHeader(name="Authorization", auto_error=False)

        async def verify_api_key(api_key: str = Depends(api_key_header)):
            # Authentication is disabled when no key is configured.
            if not self.config.api_key:
                return True

            if not api_key:
                raise HTTPException(status_code=401, detail="API key required")

            # Accept both "<key>" and "Bearer <key>".
            if api_key.startswith("Bearer "):
                api_key = api_key[7:]

            # Constant-time comparison so response timing does not reveal how
            # much of a guessed key was correct. Compared as bytes because
            # compare_digest only accepts ASCII-only str arguments.
            if not hmac.compare_digest(
                api_key.encode("utf-8"),
                self.config.api_key.encode("utf-8")
            ):
                raise HTTPException(status_code=401, detail="Invalid API key")
            return True

        def get_job_or_404(job_id: str) -> JobStatus:
            """Return the job with the given ID or raise a 404 error."""
            job = self.jobs.get(job_id)
            if job is None:
                raise HTTPException(status_code=404, detail="Job not found")
            return job

        # Status endpoint
        @self.app.get("/api/status")
        async def get_status():
            return self.get_server_info()

        # Job management endpoints
        @self.app.post("/api/jobs", status_code=201, dependencies=[Depends(verify_api_key)])
        async def create_job(job_request: JobRequest):
            return self.create_job(job_request.num_images)

        @self.app.get("/api/jobs", dependencies=[Depends(verify_api_key)])
        async def list_jobs():
            return {"jobs": self.jobs}

        @self.app.get("/api/jobs/{job_id}", dependencies=[Depends(verify_api_key)])
        async def get_job(job_id: str):
            return get_job_or_404(job_id)

        @self.app.delete("/api/jobs/{job_id}", dependencies=[Depends(verify_api_key)])
        async def cancel_job(job_id: str):
            job = get_job_or_404(job_id)

            # Only jobs that have not finished yet can be cancelled
            if job.status not in ["created", "uploading", "processing"]:
                return {"status": "error", "message": f"Cannot cancel job in {job.status} state"}

            job.status = "cancelled"
            job.updated_at = time.time()

            # Clean up job directory
            job_dir = os.path.join(self.config.output_dir, job_id)
            if os.path.exists(job_dir):
                shutil.rmtree(job_dir)

            return {"status": "success", "message": "Job cancelled"}

        # Image upload endpoint
        @self.app.post("/api/jobs/{job_id}/images", dependencies=[Depends(verify_api_key)])
        async def upload_image(
            job_id: str,
            file: UploadFile = File(...),
            metadata: str = Form(None)
        ):
            job = get_job_or_404(job_id)

            # Check if job is in a valid state for uploads
            if job.status not in ["created", "uploading"]:
                raise HTTPException(
                    status_code=400,
                    detail=f"Cannot upload images to job in {job.status} state"
                )

            # Check if we've reached the maximum number of images
            if job.images_uploaded >= job.num_images:
                raise HTTPException(
                    status_code=400,
                    detail=f"Maximum number of images ({job.num_images}) already uploaded"
                )

            # Update job status
            job.status = "uploading"
            job.updated_at = time.time()

            # Create job directory if it doesn't exist
            job_dir = os.path.join(self.config.output_dir, job_id, "images")
            os.makedirs(job_dir, exist_ok=True)

            # The metadata is not stored anywhere; it is only checked so that
            # malformed JSON shows up in the log.
            if metadata:
                try:
                    json.loads(metadata)
                except json.JSONDecodeError:
                    logger.warning(f"Invalid metadata format: {metadata}")

            # Save the file under a server-chosen name. Only the extension is
            # taken from the client; the filename may be missing altogether.
            image_index = job.images_uploaded
            file_extension = os.path.splitext(file.filename or "")[1]
            image_path = os.path.join(job_dir, f"image_{image_index:04d}{file_extension}")

            with open(image_path, "wb") as f:
                shutil.copyfileobj(file.file, f)

            # Update job status
            job.images_uploaded += 1
            job.updated_at = time.time()

            return {
                "status": "success",
                "job_id": job_id,
                "image_index": image_index,
                "image_path": image_path,
                "images_uploaded": job.images_uploaded,
                "total_images": job.num_images
            }

        # Process job endpoint
        @self.app.post("/api/jobs/{job_id}/process", status_code=202, dependencies=[Depends(verify_api_key)])
        async def process_job(
            job_id: str,
            process_request: ProcessRequest,
            background_tasks: BackgroundTasks
        ):
            job = get_job_or_404(job_id)

            # Check if job is in a valid state for processing
            if job.status != "uploading":
                raise HTTPException(
                    status_code=400,
                    detail=f"Cannot process job in {job.status} state"
                )

            # Check if all images have been uploaded
            if job.images_uploaded < job.num_images:
                raise HTTPException(
                    status_code=400,
                    detail=f"Not all images uploaded ({job.images_uploaded}/{job.num_images})"
                )

            # Update job status
            job.status = "processing"
            job.updated_at = time.time()
            job.progress = 0.0

            # Start processing in the background
            background_tasks.add_task(
                self.process_job_task,
                job_id,
                process_request.reconstruction_quality,
                process_request.output_formats
            )

            return {
                "status": "success",
                "job_id": job_id,
                "message": "Processing started"
            }

        # Model download endpoint
        @self.app.get("/api/jobs/{job_id}/model", dependencies=[Depends(verify_api_key)])
        async def download_model(job_id: str, format: str = "obj"):
            job = get_job_or_404(job_id)

            # Check if job is completed
            if job.status != "completed":
                raise HTTPException(
                    status_code=400,
                    detail=f"Job is not completed. Current status: {job.status}"
                )

            # Check if the requested format is available
            if format == "obj" and job.obj_file:
                return FileResponse(job.obj_file, filename=f"{job.model_id}.obj")
            if format == "ply" and job.point_cloud_file:
                return FileResponse(job.point_cloud_file, filename=f"{job.model_id}.ply")

            raise HTTPException(
                status_code=404,
                detail=f"Model in {format} format not available"
            )
    
    def create_job(self, num_images: int) -> Dict[str, Any]:
        """
        Register a new job and create its working directory.

        Args:
            num_images: Number of images to be uploaded

        Returns:
            Dictionary with job information

        Raises:
            HTTPException: 429 when the active-job limit is reached, 400 when
                num_images is outside 1..max_images_per_job
        """
        # Jobs in one of these states count against the concurrency limit
        active_states = ["created", "uploading", "processing"]
        active_count = len([
            existing for existing in self.jobs.values()
            if existing.status in active_states
        ])
        job_limit = self.config.max_jobs
        if active_count >= job_limit:
            raise HTTPException(
                status_code=429,
                detail=f"Maximum number of active jobs ({job_limit}) reached"
            )

        image_limit = self.config.max_images_per_job
        if not 0 < num_images <= image_limit:
            raise HTTPException(
                status_code=400,
                detail=f"Number of images must be between 1 and {image_limit}"
            )

        # Register the job before touching the file system
        new_id = str(uuid.uuid4())
        new_job = JobStatus(job_id=new_id, num_images=num_images)
        self.jobs[new_id] = new_job

        os.makedirs(os.path.join(self.config.output_dir, new_id), exist_ok=True)

        return new_job.dict()
    
    async def process_job_task(
        self,
        job_id: str,
        reconstruction_quality: str,
        output_formats: List[str]
    ):
        """
        Process a job in the background.

        Runs the reconstruction, converts the resulting point cloud to OBJ
        when requested, and records the outcome on the job: "completed" on
        success, "failed" (with ``error_message``) on any exception. A job
        that was cancelled in the meantime keeps its "cancelled" status.

        Args:
            job_id: ID of the job to process
            reconstruction_quality: Quality of the reconstruction (low, normal, high)
            output_formats: List of output formats to generate
        """
        job = self.jobs.get(job_id)
        if job is None:
            logger.error(f"Job {job_id} not found")
            return

        # The cancel endpoint deletes the job directory; do not recreate it or
        # resurrect the job if cancellation happened before this task started.
        if job.status == "cancelled":
            logger.info(f"Job {job_id} was cancelled before processing started")
            return

        job_dir = os.path.join(self.config.output_dir, job_id)
        images_dir = os.path.join(job_dir, "images")
        output_dir = os.path.join(job_dir, "output")

        # Generate a model ID
        model_id = f"model_{job_id[:8]}"
        job.model_id = model_id

        # Update job status
        job.status = "processing"
        job.progress = 0.0
        job.updated_at = time.time()

        try:
            # Start timing
            start_time = time.time()

            # Created inside the try block so that a file-system error marks
            # the job as failed instead of leaving it in "processing".
            os.makedirs(output_dir, exist_ok=True)

            # Run CUDA MVS
            await self.run_cuda_mvs(
                job,
                images_dir,
                output_dir,
                model_id,
                reconstruction_quality
            )

            # Convert PLY to OBJ if requested and not produced already
            if "obj" in output_formats and not job.obj_file:
                ply_file = job.point_cloud_file
                if ply_file and os.path.exists(ply_file):
                    obj_file = os.path.join(output_dir, f"{model_id}.obj")
                    await self.convert_ply_to_obj(ply_file, obj_file)
                    job.obj_file = obj_file

            if job.status == "cancelled":
                logger.info(f"Job {job_id} was cancelled during processing; discarding result")
                return

            # Calculate processing time
            job.processing_time = time.time() - start_time

            # Update job status
            job.status = "completed"
            job.progress = 100.0
            job.updated_at = time.time()
            job.output_dir = output_dir

            logger.info(f"Job {job_id} completed successfully in {job.processing_time:.2f} seconds")

        except Exception as e:
            # Errors after a cancellation are expected (the job directory has
            # been removed) and must not overwrite the "cancelled" status.
            if job.status == "cancelled":
                logger.info(f"Job {job_id} stopped after cancellation: {e}")
                return

            # Update job status
            job.status = "failed"
            job.error_message = str(e)
            job.updated_at = time.time()

            # logger.exception keeps the traceback for diagnosis
            logger.exception(f"Job {job_id} failed: {e}")
    
    async def run_cuda_mvs(
        self,
        job: JobStatus,
        images_dir: str,
        output_dir: str,
        model_id: str,
        reconstruction_quality: str
    ):
        """
        Run CUDA MVS on the uploaded images.

        The tool is started as an asyncio subprocess so that the event loop
        stays responsive while it runs. "Progress:" lines on its standard
        output update ``job.progress``; on success ``job.point_cloud_file`` is
        set to the generated PLY file.

        Args:
            job: Job status object
            images_dir: Directory containing the images
            output_dir: Directory to save the output
            model_id: ID of the model
            reconstruction_quality: Quality of the reconstruction
                (low, normal, high; anything else is treated as normal)

        Raises:
            FileNotFoundError: If the executable or the output file is missing
            ValueError: If images_dir contains no .jpg/.jpeg/.png file
            RuntimeError: If the tool exits with a non-zero status or the job
                is cancelled while it runs
        """
        # Local import: at module level asyncio is only imported under the
        # ``__main__`` guard, so it is missing when this module is imported.
        import asyncio

        # Check if CUDA MVS is installed
        cuda_mvs_executable = os.path.join(self.config.cuda_mvs_path, "build", "app_patch_match_mvs")
        if not os.path.exists(cuda_mvs_executable):
            raise FileNotFoundError(f"CUDA MVS executable not found at {cuda_mvs_executable}")

        # Collect the images, sorted to ensure a consistent order
        image_files = sorted(
            os.path.join(images_dir, name)
            for name in os.listdir(images_dir)
            if name.lower().endswith((".jpg", ".jpeg", ".png"))
        )
        if not image_files:
            raise ValueError("No valid image files found")

        # Create a camera parameter file
        camera_params_file = os.path.join(output_dir, "cameras.txt")
        await self.generate_camera_params(image_files, camera_params_file)

        # quality -> (num_iterations, max_resolution)
        quality_presets = {
            "low": (3, 1024),
            "normal": (5, 2048),
            "high": (7, 4096),
        }
        num_iterations, max_resolution = quality_presets.get(
            reconstruction_quality, quality_presets["normal"]
        )

        # Prepare output files
        point_cloud_file = os.path.join(output_dir, f"{model_id}.ply")

        # Build the command
        cmd = [
            cuda_mvs_executable,
            "--input_folder", images_dir,
            "--camera_file", camera_params_file,
            "--output_folder", output_dir,
            "--output_file", point_cloud_file,
            "--num_iterations", str(num_iterations),
            "--max_resolution", str(max_resolution)
        ]

        # Run the command
        logger.info(f"Running CUDA MVS: {' '.join(cmd)}")

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            limit=1024 * 1024  # tolerate long output lines
        )

        # Drain stderr while stdout is being read; otherwise a chatty tool can
        # fill the stderr pipe and block forever.
        stderr_task = asyncio.create_task(process.stderr.read())

        try:
            # Monitor progress line by line until stdout reaches EOF
            while True:
                raw_line = await process.stdout.readline()
                if not raw_line:
                    break

                # Stop working for a job that has been cancelled meanwhile
                if job.status == "cancelled":
                    raise RuntimeError("Job was cancelled")

                output = raw_line.decode("utf-8", errors="replace")
                if "Progress:" in output:
                    try:
                        progress_str = output.split("Progress:")[1].strip().rstrip("%")
                        job.progress = float(progress_str)
                        job.updated_at = time.time()
                    except (ValueError, IndexError):
                        # Not a parseable progress line; ignore it
                        pass

            stderr = (await stderr_task).decode("utf-8", errors="replace")
            await process.wait()
        except BaseException:
            # Never leave the reconstruction running after an error or a
            # task cancellation.
            stderr_task.cancel()
            if process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    pass
            raise

        # Check if the process was successful
        if process.returncode != 0:
            raise RuntimeError(f"CUDA MVS failed with error: {stderr}")

        # Check if the output file was created
        if not os.path.exists(point_cloud_file):
            raise FileNotFoundError(f"Output file not created: {point_cloud_file}")

        # Update job with the output file
        job.point_cloud_file = point_cloud_file
    
    async def generate_camera_params(self, image_files: List[str], output_file: str):
        """
        Write a camera parameter file for CUDA MVS.

        The file starts with two comment lines, followed by one line per image
        in the form ``image_name width height fx fy cx cy``. The intrinsics
        are placeholders derived from the image size only (focal lengths are
        1.2 times the width/height, the principal point is the image centre);
        no camera estimation takes place.

        Args:
            image_files: List of image files
            output_file: Output file for camera parameters
        """
        with open(output_file, "w") as params:
            params.write(f"# Camera parameters for {len(image_files)} images\n")
            params.write("# Format: image_name width height fx fy cx cy\n")

            for image_path in image_files:
                # Pillow is imported on first use, so it is only required
                # when there is at least one image to inspect
                from PIL import Image
                with Image.open(image_path) as picture:
                    width, height = picture.size

                # Default intrinsics: focal lengths and principal point
                fx, fy = width * 1.2, height * 1.2
                cx, cy = width / 2, height / 2

                image_name = os.path.basename(image_path)
                params.write(f"{image_name} {width} {height} {fx} {fy} {cx} {cy}\n")
    
    async def convert_ply_to_obj(self, ply_file: str, obj_file: str):
        """
        Convert a PLY file to OBJ format.

        open3d is used when it can be imported. Otherwise the external tools
        ``meshlabserver`` and ``assimp`` are tried, in that order.

        Args:
            ply_file: Input PLY file
            obj_file: Output OBJ file

        Raises:
            RuntimeError: If open3d is unavailable and neither external tool
                ran successfully
        """
        try:
            import open3d as o3d

            # Read the PLY as a triangle mesh and write it back out as OBJ
            o3d.io.write_triangle_mesh(obj_file, o3d.io.read_triangle_mesh(ply_file))

            logger.info(f"Converted {ply_file} to {obj_file}")

        except ImportError:
            # open3d is not available: fall back to command-line converters
            fallback_commands = (
                ["meshlabserver", "-i", ply_file, "-o", obj_file],
                ["assimp", "export", ply_file, obj_file],
            )
            for command in fallback_commands:
                try:
                    subprocess.run(command, check=True, capture_output=True)
                except (subprocess.SubprocessError, FileNotFoundError):
                    # Tool missing or failed: try the next one
                    continue
                else:
                    return

            raise RuntimeError("No suitable tool found to convert PLY to OBJ")
    
    def get_server_info(self) -> Dict[str, Any]:
        """
        Build the payload served by the status endpoint.

        Returns:
            Dictionary with the server identity, capabilities, per-status job
            counters and uptime in seconds
        """
        # Tally the jobs by status in a single pass
        status_counts: Dict[str, int] = {}
        for job in self.jobs.values():
            status_counts[job.status] = status_counts.get(job.status, 0) + 1

        active_states = ("created", "uploading", "processing")
        job_summary = {
            "total": len(self.jobs),
            "active": sum(status_counts.get(state, 0) for state in active_states),
            "completed": status_counts.get("completed", 0),
            "failed": status_counts.get("failed", 0)
        }

        cfg = self.config
        capabilities = {
            "max_jobs": cfg.max_jobs,
            "max_images_per_job": cfg.max_images_per_job,
            "job_timeout": cfg.job_timeout,
            "supported_formats": ["obj", "ply"],
            "gpu_info": cfg.gpu_info
        }

        info: Dict[str, Any] = {
            "server_id": self.server_id,
            "name": cfg.server_name,
            "version": "1.0.0",
            "status": "running",
            "capabilities": capabilities,
            "jobs": job_summary,
        }
        info["uptime"] = time.time() - self.start_time
        return info
    
    def advertise_service(self):
        """
        Advertise the server on the local network using Zeroconf.

        This is best effort: any failure is logged and the server continues
        without being advertised. On success the Zeroconf instance is stored
        in ``self.zeroconf`` so that ``cleanup`` can close it.
        """
        try:
            import socket

            # Get local IP address. Connecting a UDP socket sends no packet;
            # it only makes the OS choose the outgoing interface, whose
            # address is then read back. The context manager closes the socket
            # even if connect() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(("8.8.8.8", 80))
                local_ip = probe.getsockname()[0]

            # Create service info
            capabilities = {
                "max_jobs": self.config.max_jobs,
                "max_images_per_job": self.config.max_images_per_job,
                "supported_formats": ["obj", "ply"],
                "gpu_info": self.config.gpu_info
            }

            # NOTE(review): the advertised port is always DEFAULT_PORT, even
            # when run() is later called with a different port.
            service_info = ServiceInfo(
                "_cudamvs._tcp.local.",
                f"{self.server_id}._cudamvs._tcp.local.",
                addresses=[socket.inet_aton(local_ip)],
                port=DEFAULT_PORT,
                properties={
                    b"name": self.config.server_name.encode("utf-8"),
                    b"capabilities": json.dumps(capabilities).encode("utf-8")
                }
            )

            # Register service; close the Zeroconf instance again if the
            # registration fails so its resources are not leaked.
            zeroconf = Zeroconf()
            try:
                zeroconf.register_service(service_info)
            except Exception:
                zeroconf.close()
                raise
            self.zeroconf = zeroconf

            logger.info(f"Advertising CUDA MVS service on {local_ip}:{DEFAULT_PORT}")

        except Exception as e:
            logger.error(f"Failed to advertise service: {e}")
    
    def run(self, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT):
        """
        Serve the FastAPI application with uvicorn.

        Args:
            host: Host to bind to
            port: Port to bind to
        """
        bind_options = {"host": host, "port": port}
        uvicorn.run(self.app, **bind_options)
    
    def cleanup(self):
        """Release server resources: closes the Zeroconf instance, if one was created."""
        zeroconf_instance = self.zeroconf
        if not zeroconf_instance:
            # Service was never advertised (or advertising failed)
            return
        zeroconf_instance.close()

# Main entry point
def main():
    """
    Command-line entry point.

    Parses the command-line options, builds a ServerConfig from them, starts
    the server and releases its resources when the server stops.
    """
    parser = argparse.ArgumentParser(description="CUDA MVS Server")

    # (flag, add_argument keyword arguments), in the order shown by --help
    option_specs = [
        ("--host", {"default": DEFAULT_HOST, "help": "Host to bind to"}),
        ("--port", {"type": int, "default": DEFAULT_PORT, "help": "Port to bind to"}),
        ("--cuda-mvs-path", {"default": DEFAULT_CUDA_MVS_PATH, "help": "Path to CUDA MVS installation"}),
        ("--output-dir", {"default": DEFAULT_OUTPUT_DIR, "help": "Output directory"}),
        ("--max-jobs", {"type": int, "default": DEFAULT_MAX_JOBS, "help": "Maximum number of concurrent jobs"}),
        ("--max-images", {"type": int, "default": DEFAULT_MAX_IMAGES_PER_JOB, "help": "Maximum images per job"}),
        ("--job-timeout", {"type": int, "default": DEFAULT_JOB_TIMEOUT, "help": "Job timeout in seconds"}),
        ("--api-key", {"help": "API key for authentication"}),
        ("--server-name", {"default": "CUDA MVS Server", "help": "Server name"}),
        ("--no-advertise", {"action": "store_true", "help": "Don't advertise service on the network"}),
    ]
    for flag, options in option_specs:
        parser.add_argument(flag, **options)

    args = parser.parse_args()

    # Translate the parsed options into the server configuration
    config_values = {
        "cuda_mvs_path": args.cuda_mvs_path,
        "output_dir": args.output_dir,
        "max_jobs": args.max_jobs,
        "max_images_per_job": args.max_images,
        "job_timeout": args.job_timeout,
        "api_key": args.api_key,
        "server_name": args.server_name,
        "advertise_service": not args.no_advertise,
    }
    server = CUDAMVSServer(ServerConfig(**config_values))

    # Always release the Zeroconf registration, even if the server crashes
    try:
        server.run(host=args.host, port=args.port)
    finally:
        server.cleanup()

if __name__ == "__main__":
    # This import runs at module level, so it binds `asyncio` as a module
    # global for any code in this file that refers to `asyncio` without
    # importing it at the top of the file.
    # NOTE(review): it only executes when the file is run as a script; when the
    # module is imported instead, `asyncio` is not bound this way. A regular
    # top-of-file import would be more robust.
    import asyncio
    main()

```
Page 2/3 · First · Prev · Next · Last