This is page 2 of 3. To view the full context, open http://codebase.md/jhacksman/openscad-mcp-server?lines=false&page={x}, replacing {x} with the page number.
# Directory Structure
```
├── implementation_plan.md
├── old
│ ├── download_sam2_checkpoint.py
│ ├── src
│ │ ├── ai
│ │ │ └── sam_segmentation.py
│ │ ├── models
│ │ │ └── threestudio_generator.py
│ │ └── workflow
│ │ └── image_to_model_pipeline.py
│ └── test_sam2_segmentation.py
├── README.md
├── requirements.txt
├── rtfmd
│ ├── decisions
│ │ ├── ai-driven-code-generation.md
│ │ └── export-formats.md
│ ├── files
│ │ └── src
│ │ ├── ai
│ │ │ └── ai_service.py.md
│ │ ├── main.py.md
│ │ ├── models
│ │ │ └── code_generator.py.md
│ │ └── nlp
│ │ └── parameter_extractor.py.md
│ ├── knowledge
│ │ ├── ai
│ │ │ └── natural-language-processing.md
│ │ ├── nlp
│ │ │ └── parameter-extraction.md
│ │ └── openscad
│ │ ├── export-formats.md
│ │ ├── openscad-basics.md
│ │ └── primitive-testing.md
│ └── README.md
├── scad
│ └── simple_cube.scad
├── src
│ ├── __init__.py
│ ├── __pycache__
│ │ └── __init__.cpython-312.pyc
│ ├── ai
│ │ ├── ai_service.py
│ │ ├── gemini_api.py
│ │ └── venice_api.py
│ ├── config.py
│ ├── main_remote.py
│ ├── main.py
│ ├── main.py.new
│ ├── models
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-312.pyc
│ │ │ └── code_generator.cpython-312.pyc
│ │ ├── code_generator.py
│ │ ├── cuda_mvs.py
│ │ └── scad_templates
│ │ └── basic_shapes.scad
│ ├── nlp
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-312.pyc
│ │ │ └── parameter_extractor.cpython-312.pyc
│ │ └── parameter_extractor.py
│ ├── openscad_wrapper
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-312.pyc
│ │ │ └── wrapper.cpython-312.pyc
│ │ └── wrapper.py
│ ├── printer_discovery
│ │ ├── __init__.py
│ │ └── printer_discovery.py
│ ├── remote
│ │ ├── connection_manager.py
│ │ ├── cuda_mvs_client.py
│ │ ├── cuda_mvs_server.py
│ │ └── error_handling.py
│ ├── testing
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-312.pyc
│ │ │ ├── primitive_tester.cpython-312.pyc
│ │ │ └── test_primitives.cpython-312.pyc
│ │ ├── primitive_tester.py
│ │ └── test_primitives.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-312.pyc
│ │ │ ├── stl_exporter.cpython-312.pyc
│ │ │ └── stl_validator.cpython-312.pyc
│ │ ├── cad_exporter.py
│ │ ├── format_validator.py
│ │ ├── stl_exporter.py
│ │ ├── stl_repair.py
│ │ └── stl_validator.py
│ ├── visualization
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-312.pyc
│ │ │ └── renderer.cpython-312.pyc
│ │ ├── headless_renderer.py
│ │ ├── renderer.py
│ │ └── web_interface.py
│ └── workflow
│ ├── image_approval.py
│ └── multi_view_to_model_pipeline.py
├── test_complete_workflow.py
├── test_cuda_mvs.py
├── test_gemini_api.py
├── test_image_approval_workflow.py
├── test_image_approval.py
├── test_image_to_model_pipeline.py
├── test_model_selection.py
├── test_multi_view_pipeline.py
├── test_primitives.sh
├── test_rabbit_direct.py
├── test_remote_cuda_mvs.py
└── test_venice_example.py
```
# Files
--------------------------------------------------------------------------------
/test_multi_view_pipeline.py:
--------------------------------------------------------------------------------
```python
"""
Test script for multi-view to model pipeline.
"""
import os
import sys
import logging
import unittest
from unittest.mock import patch, MagicMock
from pathlib import Path
# Make the repository root (the directory containing the `src` package)
# importable. This file lives at the repository root, so the root is this
# file's own directory; going up two levels would add the parent of the
# repository instead, where `src` cannot be found.
_PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
if _PROJECT_ROOT not in sys.path:
    sys.path.append(_PROJECT_ROOT)
from src.workflow.multi_view_to_model_pipeline import MultiViewToModelPipeline
from src.ai.gemini_api import GeminiImageGenerator
from src.models.cuda_mvs import CUDAMultiViewStereo
from src.workflow.image_approval import ImageApprovalTool
# Configure logging: set the root logger to INFO and create a logger named
# after this module for use by the tests below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TestMultiViewPipeline(unittest.TestCase):
    """
    Exercises MultiViewToModelPipeline with mocked image generation,
    CUDA MVS reconstruction and image approval components.
    """

    def setUp(self):
        """
        Create scratch directories, a stub CUDA MVS executable, the mocked
        components and the pipeline under test.
        """
        # Scratch directories for generated images and models
        self.test_output_dir = "output/test_pipeline"
        self.test_images_dir = os.path.join(self.test_output_dir, "images")
        self.test_models_dir = os.path.join(self.test_output_dir, "models")
        for scratch_dir in (self.test_output_dir, self.test_images_dir, self.test_models_dir):
            os.makedirs(scratch_dir, exist_ok=True)

        # Stub CUDA MVS installation containing an executable placeholder
        self.cuda_mvs_path = "mock_cuda_mvs"
        build_dir = os.path.join(self.cuda_mvs_path, "build")
        os.makedirs(build_dir, exist_ok=True)
        executable = os.path.join(build_dir, "app_patch_match_mvs")
        with open(executable, "w") as f:
            f.write("#!/bin/bash\necho 'Mock CUDA MVS'\n")
        os.chmod(executable, 0o755)

        # Mocked collaborators, restricted to the real classes' attributes
        self.mock_gemini = MagicMock(spec=GeminiImageGenerator)
        self.mock_cuda_mvs = MagicMock(spec=CUDAMultiViewStereo)
        self.mock_approval = MagicMock(spec=ImageApprovalTool)
        self.configure_mocks()

        # Pipeline under test, wired to the mocks
        self.pipeline = MultiViewToModelPipeline(
            gemini_generator=self.mock_gemini,
            cuda_mvs=self.mock_cuda_mvs,
            approval_tool=self.mock_approval,
            output_dir=self.test_output_dir
        )

    def configure_mocks(self):
        """
        Install side effects and return values on the mocked components.
        """
        # --- Gemini image generation -------------------------------------
        def mock_generate_image(prompt, **kwargs):
            file_name = f"{prompt[:10].replace(' ', '_')}.png"
            image_path = os.path.join(self.test_images_dir, file_name)
            with open(image_path, "w") as f:
                f.write(f"Mock image for {prompt}")
            return {
                "prompt": prompt,
                "local_path": image_path,
                "image_data": b"mock_image_data"
            }

        def mock_generate_multiple_views(prompt, num_views, **kwargs):
            views = []
            for view_number in range(num_views):
                image_path = os.path.join(self.test_images_dir, f"view_{view_number}.png")
                with open(image_path, "w") as f:
                    f.write(f"Mock image for {prompt} - view {view_number}")
                views.append({
                    "prompt": f"{prompt} - view {view_number}",
                    "local_path": image_path,
                    "image_data": b"mock_image_data",
                    "view_direction": f"view {view_number}",
                    "view_index": view_number + 1
                })
            return views

        self.mock_gemini.generate_image.side_effect = mock_generate_image
        self.mock_gemini.generate_multiple_views.side_effect = mock_generate_multiple_views

        # --- CUDA MVS reconstruction -------------------------------------
        def mock_generate_model(image_paths, **kwargs):
            model_dir = os.path.join(self.test_models_dir, "mock_model")
            os.makedirs(model_dir, exist_ok=True)
            point_cloud_file = os.path.join(model_dir, "mock_model.ply")
            with open(point_cloud_file, "w") as f:
                f.write("Mock point cloud")
            obj_file = os.path.join(model_dir, "mock_model.obj")
            with open(obj_file, "w") as f:
                f.write("Mock OBJ file")
            return {
                "model_id": "mock_model",
                "output_dir": model_dir,
                "point_cloud_file": point_cloud_file,
                "obj_file": obj_file,
                "input_images": image_paths
            }

        self.mock_cuda_mvs.generate_model_from_images.side_effect = mock_generate_model
        self.mock_cuda_mvs.convert_ply_to_obj.return_value = os.path.join(
            self.test_models_dir, "mock_model", "mock_model.obj"
        )

        # --- Approval tool -----------------------------------------------
        def mock_present_image(image_path, metadata):
            file_name = os.path.basename(image_path)
            return {
                "approval_id": file_name.split('.')[0],
                "image_path": image_path,
                "image_url": f"/images/{file_name}",
                "metadata": metadata or {}
            }

        def mock_process_approval(approval_id, approved, image_path):
            # Rejected images are reported without touching the filesystem
            if not approved:
                return {
                    "approval_id": approval_id,
                    "approved": False,
                    "original_path": image_path
                }
            approved_path = os.path.join(
                self.test_output_dir, "approved", os.path.basename(image_path)
            )
            os.makedirs(os.path.dirname(approved_path), exist_ok=True)
            with open(approved_path, "w") as f:
                f.write(f"Approved image {approval_id}")
            return {
                "approval_id": approval_id,
                "approved": True,
                "original_path": image_path,
                "approved_path": approved_path
            }

        self.mock_approval.present_image_for_approval.side_effect = mock_present_image
        self.mock_approval.process_approval.side_effect = mock_process_approval
        self.mock_approval.get_approved_images.return_value = [
            os.path.join(self.test_output_dir, "approved", f"view_{i}.png") for i in range(3)
        ]

    def test_generate_model_from_text(self):
        """
        A text prompt with every view approved yields a model result.
        """
        prompt = "A low-poly rabbit"
        num_views = 3

        # Approve every image that is presented
        def mock_get_approval(approval_request):
            return True

        result = self.pipeline.generate_model_from_text(
            prompt, num_views=num_views, get_approval_callback=mock_get_approval
        )

        # The result carries the identifiers and files of the generated model
        self.assertIsNotNone(result)
        for expected_key in ("model_id", "obj_file", "point_cloud_file"):
            self.assertIn(expected_key, result)

        # Each component was driven as expected
        self.mock_gemini.generate_multiple_views.assert_called_once_with(
            prompt, num_views=num_views, output_dir=os.path.join(self.test_output_dir, "multi_view")
        )
        self.assertEqual(self.mock_approval.present_image_for_approval.call_count, num_views)
        self.assertEqual(self.mock_approval.process_approval.call_count, num_views)
        self.mock_cuda_mvs.generate_model_from_images.assert_called_once()
        self.mock_cuda_mvs.convert_ply_to_obj.assert_called_once()

    def test_generate_model_from_image(self):
        """
        A base image plus prompt with every view approved yields a model result.
        """
        # Write a placeholder base image
        base_image_path = os.path.join(self.test_images_dir, "base_image.png")
        with open(base_image_path, "w") as f:
            f.write("Mock base image")

        prompt = "A low-poly rabbit based on this image"
        num_views = 3

        # Approve every image that is presented
        def mock_get_approval(approval_request):
            return True

        result = self.pipeline.generate_model_from_image(
            base_image_path, prompt, num_views=num_views, get_approval_callback=mock_get_approval
        )

        # The result carries the identifiers and files of the generated model
        self.assertIsNotNone(result)
        for expected_key in ("model_id", "obj_file", "point_cloud_file"):
            self.assertIn(expected_key, result)

        # Each component was driven as expected
        self.mock_gemini.generate_multiple_views.assert_called_once_with(
            prompt, num_views=num_views, base_image_path=base_image_path,
            output_dir=os.path.join(self.test_output_dir, "multi_view")
        )
        self.assertEqual(self.mock_approval.present_image_for_approval.call_count, num_views)
        self.assertEqual(self.mock_approval.process_approval.call_count, num_views)
        self.mock_cuda_mvs.generate_model_from_images.assert_called_once()
        self.mock_cuda_mvs.convert_ply_to_obj.assert_called_once()

    def test_selective_approval(self):
        """
        Only the approved subset of views is passed on to model generation.
        """
        prompt = "A low-poly rabbit"
        num_views = 4

        # Approve even-indexed views only (views 0 and 2)
        def mock_get_approval(approval_request):
            view_index = int(approval_request["approval_id"].split('_')[1])
            return view_index % 2 == 0

        result = self.pipeline.generate_model_from_text(
            prompt, num_views=num_views, get_approval_callback=mock_get_approval
        )

        self.assertIsNotNone(result)

        # Every view is presented and processed, approved or not
        self.assertEqual(self.mock_approval.present_image_for_approval.call_count, num_views)
        self.assertEqual(self.mock_approval.process_approval.call_count, num_views)

        # First positional argument of each model-generation call is the image list;
        # the length check only runs if model generation was invoked at all
        image_lists = [
            call.args[0]
            for call in self.mock_cuda_mvs.generate_model_from_images.call_args_list
        ]
        if image_lists:
            self.assertEqual(len(image_lists[0]), 2)  # Only 2 images approved

    def test_error_handling(self):
        """
        A failure in image generation propagates out of the pipeline.
        """
        prompt = "A low-poly rabbit"

        # Make the Gemini mock fail on every call
        self.mock_gemini.generate_multiple_views.side_effect = Exception("Mock API error")

        with self.assertRaises(Exception):
            self.pipeline.generate_model_from_text(prompt)

    def tearDown(self):
        """
        Remove the scratch output directory and the stub CUDA MVS directory.
        """
        import shutil
        for leftover in (self.test_output_dir, self.cuda_mvs_path):
            if os.path.exists(leftover):
                shutil.rmtree(leftover)
# Run the test cases through unittest's runner when executed as a script.
if __name__ == "__main__":
    unittest.main()
```
--------------------------------------------------------------------------------
/src/main_remote.py:
--------------------------------------------------------------------------------
```python
"""
Main module for OpenSCAD MCP Server with remote CUDA MVS processing.
This module adds remote CUDA MVS processing capabilities to the MCP server.
"""
import os
import logging
from typing import Dict, Any, List, Optional
# Import remote processing components
from src.remote.cuda_mvs_client import CUDAMVSClient
from src.remote.connection_manager import CUDAMVSConnectionManager
from src.config import REMOTE_CUDA_MVS
# Configure logging: set the root logger to INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Module-level state for remote processing.
# remote_connection_manager stays None until initialize_remote_processing()
# assigns a CUDAMVSConnectionManager; the functions below treat a falsy value
# as "remote processing not enabled".
remote_connection_manager = None
# Maps job ID -> job info dict; entries are added by process_images_remotely()
# and updated by get_job_status(), download_model() and cancel_job().
remote_jobs = {}
def initialize_remote_processing():
    """
    Create the module-wide remote CUDA MVS connection manager.

    The manager is only built when REMOTE_CUDA_MVS["ENABLED"] is truthy; in
    that case it is stored in the module-level `remote_connection_manager`.

    Returns:
        CUDAMVSConnectionManager instance if enabled, None otherwise
    """
    global remote_connection_manager

    # Nothing to set up when remote processing is switched off
    if not REMOTE_CUDA_MVS["ENABLED"]:
        return None

    logger.info("Initializing remote CUDA MVS connection manager")
    remote_connection_manager = CUDAMVSConnectionManager(
        api_key=REMOTE_CUDA_MVS["API_KEY"],
        discovery_port=REMOTE_CUDA_MVS["DISCOVERY_PORT"],
        use_lan_discovery=REMOTE_CUDA_MVS["USE_LAN_DISCOVERY"],
        # A falsy configured URL (e.g. empty string) is passed on as None
        server_url=REMOTE_CUDA_MVS["SERVER_URL"] or None
    )
    return remote_connection_manager
def discover_remote_servers():
    """
    Ask the connection manager for remote CUDA MVS servers.

    Returns:
        List of discovered servers; an empty list when remote processing
        has not been initialized
    """
    manager = remote_connection_manager
    if manager:
        return manager.discover_servers()

    logger.warning("Remote CUDA MVS processing is not enabled")
    return []
def get_server_status(server_id: str):
    """
    Query the connection manager for one server's status.

    Args:
        server_id: ID of the server to get status for

    Returns:
        Server status information, or None when remote processing has not
        been initialized
    """
    manager = remote_connection_manager
    if manager:
        return manager.get_server_status(server_id)

    logger.warning("Remote CUDA MVS processing is not enabled")
    return None
def upload_images_to_server(server_id: str, image_paths: List[str], job_id: Optional[str] = None):
    """
    Send a set of images to a remote CUDA MVS server.

    Args:
        server_id: ID of the server to upload to
        image_paths: List of image paths to upload
        job_id: Optional job ID to use

    Returns:
        Job information, or None when remote processing has not been
        initialized
    """
    manager = remote_connection_manager
    if manager:
        return manager.upload_images(server_id, image_paths, job_id)

    logger.warning("Remote CUDA MVS processing is not enabled")
    return None
def process_images_remotely(server_id: str, job_id: str, params: Dict[str, Any] = None):
    """
    Start processing previously uploaded images on a remote CUDA MVS server.

    When the server's reply contains a "job_id", the job is recorded in the
    module-level `remote_jobs` registry under that ID.

    Args:
        server_id: ID of the server to process on
        job_id: Job ID of the uploaded images
        params: Optional processing parameters; defaults to the configured
            reconstruction quality and output format

    Returns:
        Job status information, or None when remote processing has not been
        initialized
    """
    manager = remote_connection_manager
    if not manager:
        logger.warning("Remote CUDA MVS processing is not enabled")
        return None

    # Fall back to the configured defaults when the caller gave no parameters
    if params is None:
        params = dict(
            quality=REMOTE_CUDA_MVS["DEFAULT_RECONSTRUCTION_QUALITY"],
            output_format=REMOTE_CUDA_MVS["DEFAULT_OUTPUT_FORMAT"],
        )

    result = manager.process_job(server_id, job_id, params)

    # Register the job so later status/download/cancel calls can find it
    if result and "job_id" in result:
        registered_id = result["job_id"]
        remote_jobs[registered_id] = {
            "server_id": server_id,
            "job_id": registered_id,
            "status": result.get("status", "processing"),
            "params": params
        }
    return result
def get_job_status(job_id: str):
    """
    Refresh and return the locally tracked record of a remote job.

    Args:
        job_id: ID of the job to get status for

    Returns:
        The job's entry from `remote_jobs`, updated with the server's
        status, progress and message when the server replied; None when
        remote processing is not initialized or the job is unknown
    """
    manager = remote_connection_manager
    if not manager:
        logger.warning("Remote CUDA MVS processing is not enabled")
        return None

    # Only jobs registered by process_images_remotely() can be queried
    try:
        job_info = remote_jobs[job_id]
    except KeyError:
        logger.warning(f"Job with ID {job_id} not found")
        return None

    status = manager.get_job_status(job_info["server_id"], job_id)

    # Merge the server's answer into the tracked record
    if status:
        job_info.update(
            status=status.get("status", job_info["status"]),
            progress=status.get("progress", 0),
            message=status.get("message", ""),
        )
    return job_info
def download_model(job_id: str, output_dir: Optional[str] = None):
    """
    Fetch the processed model of a tracked job from its remote server.

    Args:
        job_id: ID of the job to download model for
        output_dir: Optional directory to save the model to; defaults to a
            per-job directory under REMOTE_CUDA_MVS["OUTPUT_DIR"]

    Returns:
        Model information, or None when remote processing is not
        initialized or the job is unknown
    """
    manager = remote_connection_manager
    if not manager:
        logger.warning("Remote CUDA MVS processing is not enabled")
        return None

    # Only jobs registered by process_images_remotely() can be downloaded
    try:
        job_info = remote_jobs[job_id]
    except KeyError:
        logger.warning(f"Job with ID {job_id} not found")
        return None

    # Resolve the destination and make sure it exists
    target_dir = output_dir
    if target_dir is None:
        target_dir = os.path.join(REMOTE_CUDA_MVS["OUTPUT_DIR"], job_id)
    os.makedirs(target_dir, exist_ok=True)

    result = manager.download_model(job_info["server_id"], job_id, target_dir)

    # Record where the downloaded artifacts live and mark the job as completed
    if result:
        job_info.update(
            model_path=result.get("model_path"),
            point_cloud_path=result.get("point_cloud_path"),
            completed=True,
        )
    return result
def cancel_job(job_id: str):
    """
    Ask the remote server to cancel a tracked job.

    Args:
        job_id: ID of the job to cancel

    Returns:
        Cancellation result, or None when remote processing is not
        initialized or the job is unknown
    """
    manager = remote_connection_manager
    if not manager:
        logger.warning("Remote CUDA MVS processing is not enabled")
        return None

    # Only jobs registered by process_images_remotely() can be cancelled
    try:
        job_info = remote_jobs[job_id]
    except KeyError:
        logger.warning(f"Job with ID {job_id} not found")
        return None

    result = manager.cancel_job(job_info["server_id"], job_id)

    # Reflect a confirmed cancellation in the tracked record
    if result and result.get("cancelled", False):
        job_info.update(status="cancelled", message="Job cancelled by user")
    return result
# MCP tool functions for remote processing
def discover_remote_cuda_mvs_servers():
    """
    MCP tool function that lists remote CUDA MVS servers.

    Returns:
        Dictionary with the discovered servers under "servers" and their
        number under "count"
    """
    discovered = discover_remote_servers()
    return dict(servers=discovered, count=len(discovered))
def get_remote_server_status(server_id: str):
    """
    MCP tool function that reports the status of a remote CUDA MVS server.

    Args:
        server_id: ID of the server to get status for

    Returns:
        Dictionary with server status

    Raises:
        ValueError: If no (or an empty) status was obtained
    """
    status = get_server_status(server_id)
    if status:
        return status
    raise ValueError(f"Failed to get status for server with ID {server_id}")
def process_images_with_remote_cuda_mvs(
    server_id: str,
    image_paths: List[str],
    quality: str = REMOTE_CUDA_MVS["DEFAULT_RECONSTRUCTION_QUALITY"],
    output_format: str = REMOTE_CUDA_MVS["DEFAULT_OUTPUT_FORMAT"],
    wait_for_completion: bool = REMOTE_CUDA_MVS["WAIT_FOR_COMPLETION"]
):
    """
    MCP tool function that uploads images and processes them remotely.

    Args:
        server_id: ID of the server to process on
        image_paths: List of image paths to process
        quality: Reconstruction quality (low, normal, high)
        output_format: Output format (obj, ply)
        wait_for_completion: Whether to block until the job reaches a
            terminal state and, on success, download the model

    Returns:
        Dictionary with job information

    Raises:
        ValueError: If uploading, starting the job, polling its status or
            downloading the model fails
    """
    # Step 1: upload
    upload_result = upload_images_to_server(server_id, image_paths)
    if not upload_result or "job_id" not in upload_result:
        raise ValueError("Failed to upload images to server")
    job_id = upload_result["job_id"]

    # Step 2: start processing
    processing_params = {
        "quality": quality,
        "output_format": output_format
    }
    if not process_images_remotely(server_id, job_id, processing_params):
        raise ValueError(f"Failed to process images for job {job_id}")

    # Fire-and-forget mode: report the job as running and return immediately
    if not wait_for_completion:
        return {
            "job_id": job_id,
            "status": "processing",
            "server_id": server_id
        }

    # Step 3: poll until the job reaches a terminal state
    import time
    terminal_states = ("completed", "failed", "cancelled")
    while True:
        status = get_job_status(job_id)
        if not status:
            raise ValueError(f"Failed to get status for job {job_id}")
        if status["status"] in terminal_states:
            break
        time.sleep(REMOTE_CUDA_MVS["POLL_INTERVAL"])

    # Failed or cancelled jobs are reported without attempting a download
    if status["status"] != "completed":
        return {
            "job_id": job_id,
            "status": status["status"],
            "message": status.get("message", "")
        }

    # Step 4: download the finished model
    download_result = download_model(job_id)
    if not download_result:
        raise ValueError(f"Failed to download model for job {job_id}")
    return {
        "job_id": job_id,
        "status": "completed",
        "model_path": download_result.get("model_path"),
        "point_cloud_path": download_result.get("point_cloud_path")
    }
def get_remote_job_status(job_id: str):
    """
    MCP tool function that reports the status of a remote processing job.

    Args:
        job_id: ID of the job to get status for

    Returns:
        Dictionary with job status

    Raises:
        ValueError: If no (or an empty) status was obtained
    """
    status = get_job_status(job_id)
    if status:
        return status
    raise ValueError(f"Failed to get status for job with ID {job_id}")
def download_remote_model(job_id: str, output_dir: Optional[str] = None):
    """
    MCP tool function that downloads a processed model from a remote server.

    Args:
        job_id: ID of the job to download model for
        output_dir: Optional directory to save the model to

    Returns:
        Dictionary with model information

    Raises:
        ValueError: If the download produced no (or an empty) result
    """
    model_info = download_model(job_id, output_dir)
    if model_info:
        return model_info
    raise ValueError(f"Failed to download model for job with ID {job_id}")
def cancel_remote_job(job_id: str):
    """
    MCP tool function that cancels a remote processing job.

    Args:
        job_id: ID of the job to cancel

    Returns:
        Dictionary with cancellation result

    Raises:
        ValueError: If the cancellation produced no (or an empty) result
    """
    outcome = cancel_job(job_id)
    if outcome:
        return outcome
    raise ValueError(f"Failed to cancel job with ID {job_id}")
```
--------------------------------------------------------------------------------
/src/models/code_generator.py:
--------------------------------------------------------------------------------
```python
import os
import logging
import uuid
from typing import Dict, Any, List, Tuple, Optional
# Module-scoped logger used by CodeGenerator for info/warning/error messages.
logger = logging.getLogger(__name__)
class CodeGenerator:
    """
    Generates OpenSCAD code from natural language descriptions and parameters.
    Implements translation of requirements to OpenSCAD primitives and modules.
    """

    def __init__(self, scad_templates_dir: str, output_dir: str, ai_service=None):
        """
        Initialize the code generator.

        Args:
            scad_templates_dir: Directory containing SCAD template files
            output_dir: Directory to store generated SCAD files
            ai_service: Optional AI service for enhanced code generation
        """
        self.scad_templates_dir = scad_templates_dir
        self.output_dir = output_dir
        self.ai_service = ai_service

        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        # Map of shape types to their corresponding module names
        self.shape_module_map = {
            'cube': 'parametric_cube',
            'sphere': 'parametric_sphere',
            'cylinder': 'parametric_cylinder',
            'box': 'hollow_box',
            'rounded_box': 'rounded_box',
            'container': 'rounded_container',
            'tube': 'tube',
            'cone': 'cone',
            'wedge': 'wedge',
            'rounded_cylinder': 'rounded_cylinder',
            'torus': 'torus',
            'hexagonal_prism': 'hexagonal_prism',
            'text': 'text_3d',
            'prism': 'triangular_prism',
            'custom': 'custom_shape'
        }

        # Parameter mapping from natural language to OpenSCAD parameters
        self.parameter_map = {
            'width': 'width',
            'depth': 'depth',
            'height': 'height',
            'radius': 'radius',
            'thickness': 'thickness',
            'segments': 'segments',
            'center': 'center',
            'inner_radius': 'inner_radius',
            'outer_radius': 'outer_radius',
            'corner_radius': 'corner_radius',
            'text': 'text',
            'size': 'size',
            'font': 'font',
            'base_radius': 'base_radius',
            'major_radius': 'major_radius',
            'minor_radius': 'minor_radius',
            'angle': 'angle',
            'scale': 'scale',
            'resolution': 'resolution'
        }

    def generate_code(self, model_type: str, parameters: Dict[str, Any], description: Optional[str] = None) -> str:
        """
        Generate OpenSCAD code for a given model type and parameters.

        Args:
            model_type: Type of model to generate
            parameters: Dictionary of parameters for the model
            description: Optional natural language description for AI-driven generation

        Returns:
            Path to the generated SCAD file

        Raises:
            ValueError: If model_type has no entry in the shape/module map
        """
        # Generate a unique ID for the model
        model_id = str(uuid.uuid4())
        scad_file = os.path.join(self.output_dir, f"{model_id}.scad")

        # AI-driven generation is only used for 'custom' models that come with
        # a description, and only when an AI service was supplied
        if model_type == 'custom' and description and self.ai_service:
            scad_code = self._generate_ai_driven_code(description, parameters)
        else:
            module_name = self.shape_module_map.get(model_type)
            if not module_name:
                raise ValueError(f"Unsupported model type: {model_type}")
            scad_code = self._generate_scad_code(module_name, self._map_parameters(parameters))

        self._write_scad(scad_file, scad_code)
        logger.info(f"Generated OpenSCAD code: {scad_file}")
        return scad_file

    def update_code(self, scad_file: str, parameters: Dict[str, Any]) -> str:
        """
        Update an existing SCAD file with new parameters.

        The module used by the file is detected from its contents and the
        file is regenerated from `parameters` alone; parameter declarations
        already in the file are not carried over.

        Args:
            scad_file: Path to the SCAD file to update
            parameters: New parameters to apply

        Returns:
            Path to the updated SCAD file

        Raises:
            FileNotFoundError: If scad_file does not exist
            ValueError: If no known module call is found in the file
        """
        import re

        if not os.path.exists(scad_file):
            raise FileNotFoundError(f"SCAD file not found: {scad_file}")

        with open(scad_file, 'r', encoding='utf-8') as f:
            scad_code = f.read()

        # Look for an actual call "<module>(" rather than a bare substring, so
        # that a module name occurring inside the include path or a string
        # value (e.g. text = "tube") does not select the wrong module.
        # Map order decides which module wins when several are called.
        module_name = next(
            (
                module
                for module in self.shape_module_map.values()
                if re.search(rf"(?<!\w){re.escape(module)}\s*\(", scad_code)
            ),
            None,
        )
        if not module_name:
            raise ValueError("Could not determine module name from existing SCAD file")

        updated_code = self._generate_scad_code(module_name, self._map_parameters(parameters))
        self._write_scad(scad_file, updated_code)
        logger.info(f"Updated OpenSCAD code: {scad_file}")
        return scad_file

    def combine_models(self, operations: List[Dict[str, Any]]) -> str:
        """
        Combine multiple models using CSG operations.

        Args:
            operations: List of operations, each containing:
                - model_type: Type of model
                - parameters: Parameters for the model
                - operation: CSG operation (union, difference, intersection)
                - transform: Optional transformation to apply

        Returns:
            Path to the generated SCAD file

        Raises:
            ValueError: If an entry has no model type or an unsupported one
        """
        # Generate a unique ID for the combined model
        model_id = str(uuid.uuid4())
        scad_file = os.path.join(self.output_dir, f"{model_id}.scad")

        # Header: include the basic shapes library
        pieces = [f"// Combined model\n{self._library_include()}\n"]

        current_op = None
        for op in operations:
            model_type = op.get('model_type')
            if model_type is None:
                raise ValueError("Model type cannot be None")
            module_name = self.shape_module_map.get(str(model_type))
            if not module_name:
                raise ValueError(f"Unsupported model type: {model_type}")

            # A missing or None 'parameters' entry means "no parameters"
            scad_params = self._map_parameters(op.get('parameters') or {})
            params_str = ", ".join(f"{k}={v}" for k, v in scad_params.items())

            # Open a new CSG group whenever the requested operation changes,
            # closing the previously open group first (if any)
            operation = op.get('operation')
            if operation and operation != current_op:
                if current_op:
                    pieces.append("}\n\n")
                current_op = operation
                pieces.append(f"{operation}() {{\n")

            # Module call, optionally prefixed by a transformation
            transform = op.get('transform')
            call = f"{module_name}({params_str});"
            pieces.append(f" {transform} {call}\n" if transform else f" {call}\n")

        # Close the final operation if needed
        if current_op:
            pieces.append("}\n")

        self._write_scad(scad_file, "".join(pieces))
        logger.info(f"Generated combined OpenSCAD code: {scad_file}")
        return scad_file

    def _library_include(self) -> str:
        """Return the include statement for the basic shapes library."""
        return f'include <{os.path.join(self.scad_templates_dir, "basic_shapes.scad")}>;'

    @staticmethod
    def _write_scad(scad_file: str, scad_code: str) -> None:
        """Write SCAD source to disk as UTF-8, independent of the platform default encoding."""
        with open(scad_file, 'w', encoding='utf-8') as f:
            f.write(scad_code)

    @staticmethod
    def _format_value(value: Any) -> Any:
        """
        Convert a Python value into its OpenSCAD literal.

        Booleans and the strings "true"/"false" (any case) become lowercase
        boolean literals, None becomes undef, other strings are quoted with
        backslashes and double quotes escaped, and lists/tuples become
        OpenSCAD vectors. Any other value (e.g. numbers) is returned unchanged.
        """
        if value is None:
            return "undef"
        # bool must be tested before anything numeric: bool is an int subclass
        if isinstance(value, bool):
            return str(value).lower()
        if isinstance(value, str):
            lowered = value.lower()
            if lowered in ('true', 'false'):
                return lowered
            # Escape backslashes first so the quote escapes are not doubled
            escaped = value.replace('\\', '\\\\').replace('"', '\\"')
            return f'"{escaped}"'
        if isinstance(value, (list, tuple)):
            items = ", ".join(str(CodeGenerator._format_value(item)) for item in value)
            return f"[{items}]"
        return value

    def _map_parameters(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """
        Map natural language parameters to OpenSCAD parameters.

        Parameter names are translated through `parameter_map` (unknown
        names pass through unchanged) and values are converted to OpenSCAD
        literals. A None `parameters` argument is treated as empty.
        """
        return {
            self.parameter_map.get(param, param): self._format_value(value)
            for param, value in (parameters or {}).items()
        }

    def _generate_scad_code(self, module_name: str, parameters: Dict[str, Any]) -> str:
        """
        Generate OpenSCAD code for a module with parameters.

        The output includes the shapes library, declares each parameter as
        a variable and calls the module with those variables passed by name.
        """
        declarations = "".join(f"{param} = {value};\n" for param, value in parameters.items())
        arguments = ", ".join(f"{param}={param}" for param in parameters)
        return (
            "// Generated OpenSCAD code\n"
            f"{self._library_include()}\n"
            "// Parameters\n"
            f"{declarations}"
            f"\n// Model\n{module_name}({arguments});\n"
        )

    def _fallback_cube_code(self) -> str:
        """Return code for a 10x10x10 cube, used when AI generation is unavailable or fails."""
        return self._generate_scad_code('parametric_cube', {'width': 10, 'height': 10, 'depth': 10})

    def _generate_ai_driven_code(self, description: str, parameters: Dict[str, Any]) -> str:
        """
        Generate OpenSCAD code using AI-driven techniques based on natural language description.

        Args:
            description: Natural language description of the model
            parameters: Dictionary of parameters for the model

        Returns:
            Generated OpenSCAD code; a basic cube when no AI service is
            configured or the service call fails
        """
        if not self.ai_service:
            logger.warning("AI service not available, falling back to basic shape generation")
            return self._fallback_cube_code()

        try:
            logger.info(f"Generating OpenSCAD code from description: {description}")

            # Context handed to the AI service
            context = {
                "description": description,
                "parameters": parameters,
                "templates_dir": self.scad_templates_dir
            }
            scad_code = self.ai_service.generate_openscad_code(context)

            # Ensure the code includes the basic shapes library
            if "include <" not in scad_code:
                scad_code = (
                    "// AI-generated OpenSCAD code\n"
                    f"{self._library_include()}\n"
                    f"{scad_code}\n"
                )
            return scad_code
        except Exception as e:
            # Deliberate best-effort: any failure of the AI service degrades
            # to a basic shape instead of aborting generation
            logger.error(f"Error generating AI-driven code: {e}")
            return self._fallback_cube_code()
```
--------------------------------------------------------------------------------
/test_complete_workflow.py:
--------------------------------------------------------------------------------
```python
"""
Test script for the complete workflow from text to 3D model.
"""
import os
import sys
import logging
import unittest
from unittest.mock import patch, MagicMock
from pathlib import Path
# Make the repository root (the directory containing the `src` package)
# importable. This file lives at the repository root, so the root is this
# file's own directory; going up two levels would add the parent of the
# repository instead, where `src` cannot be found.
_PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
if _PROJECT_ROOT not in sys.path:
    sys.path.append(_PROJECT_ROOT)
from src.ai.gemini_api import GeminiImageGenerator
from src.workflow.image_approval import ImageApprovalManager
from src.models.cuda_mvs import CUDAMultiViewStereo
from src.workflow.multi_view_to_model_pipeline import MultiViewToModelPipeline
from src.config import MULTI_VIEW_PIPELINE, IMAGE_APPROVAL, REMOTE_CUDA_MVS
# Configure logging: set the root logger to INFO and create a logger named
# after this module for use by the tests below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TestCompleteWorkflow(unittest.TestCase):
    """
    Test cases for the complete workflow from text to 3D model.

    NOTE(review): these tests construct MultiViewToModelPipeline with the
    keyword arguments image_generator / approval_manager / model_generator /
    config and call generate_images_from_text, add_images_for_approval,
    approve_image, reject_image, get_approval_status,
    create_model_from_approved_images and complete_workflow. Confirm that the
    pipeline class under test actually exposes this API.
    """

    # Base64 image payload returned by the mocked Gemini API.
    _MOCK_IMAGE_DATA = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8BQDwAEhQGAhKmMIQAAAABJRU5ErkJggg=="

    def setUp(self):
        """
        Set up test environment.
        """
        # Create test output directories
        self.test_output_dir = "output/test_complete_workflow"
        self.test_images_dir = os.path.join(self.test_output_dir, "images")
        self.test_multi_view_dir = os.path.join(self.test_output_dir, "multi_view")
        self.test_approved_dir = os.path.join(self.test_output_dir, "approved")
        self.test_models_dir = os.path.join(self.test_output_dir, "models")
        for directory in (
            self.test_output_dir,
            self.test_images_dir,
            self.test_multi_view_dir,
            self.test_approved_dir,
            self.test_models_dir,
        ):
            os.makedirs(directory, exist_ok=True)

        # Mock API key
        self.api_key = "test_api_key"

        # Create the components
        self.image_generator = GeminiImageGenerator(
            api_key=self.api_key,
            output_dir=self.test_images_dir
        )
        self.approval_manager = ImageApprovalManager(
            output_dir=self.test_multi_view_dir,
            approved_dir=self.test_approved_dir,
            min_approved_images=3,
            auto_approve=False
        )
        self.cuda_mvs = CUDAMultiViewStereo(
            output_dir=self.test_models_dir,
            use_gpu=False
        )

        # Create the pipeline
        self.pipeline = MultiViewToModelPipeline(
            image_generator=self.image_generator,
            approval_manager=self.approval_manager,
            model_generator=self.cuda_mvs,
            output_dir=self.test_output_dir,
            config=MULTI_VIEW_PIPELINE
        )

    def tearDown(self):
        """
        Clean up after tests.
        """
        # Clean up test output directory
        import shutil
        if os.path.exists(self.test_output_dir):
            shutil.rmtree(self.test_output_dir)

    def _mock_gemini_response(self):
        """
        Build the mocked HTTP response used to stand in for the Gemini API.

        Returns:
            MagicMock with status_code 200 whose json() yields one candidate
            holding a text part and an inline PNG part
        """
        response = MagicMock()
        response.status_code = 200
        response.json.return_value = {
            "candidates": [
                {
                    "content": {
                        "parts": [
                            {
                                "text": "Generated image description"
                            },
                            {
                                "inlineData": {
                                    "mimeType": "image/png",
                                    "data": self._MOCK_IMAGE_DATA
                                }
                            }
                        ]
                    }
                }
            ]
        }
        return response

    def _write_test_images(self, directory, filename_prefix):
        """
        Write four placeholder image files and return their descriptors.

        Args:
            directory: Directory in which the files are created
            filename_prefix: File name prefix; files are named
                "<prefix>_<i>.png"

        Returns:
            List of image dictionaries (id, local_path, view_index,
            view_direction), one per written file
        """
        images = []
        for i in range(4):
            image_path = os.path.join(directory, f"{filename_prefix}_{i}.png")
            with open(image_path, "w") as f:
                f.write(f"test image data {i}")
            images.append({
                "id": f"image_{i}",
                "local_path": image_path,
                "view_index": i + 1,
                "view_direction": f"view_{i}"
            })
        return images

    def _write_mock_model_file(self):
        """
        Create the placeholder OBJ file the mocked reconstruction "produces".
        """
        model_path = os.path.join(self.test_models_dir, "test_model.obj")
        with open(model_path, "w") as f:
            f.write("test model data")

    def _assert_obj_model_result(self, result):
        """
        Assert that a pipeline result describes an OBJ model.
        """
        self.assertIsNotNone(result)
        self.assertIn("model_path", result)
        self.assertIn("model_id", result)
        self.assertIn("format", result)
        self.assertEqual(result["format"], "obj")

    @patch('src.ai.gemini_api.requests.post')
    def test_generate_images_from_text(self, mock_post):
        """
        Test generating images from text description.
        """
        mock_post.return_value = self._mock_gemini_response()

        # Test parameters
        prompt = "A low-poly rabbit"
        num_views = 4

        # Call the method
        results = self.pipeline.generate_images_from_text(prompt, num_views)

        # Verify the results
        self.assertEqual(len(results), num_views)
        for i, result in enumerate(results):
            self.assertIn("view_direction", result)
            self.assertEqual(result["view_index"], i + 1)
            self.assertIn("local_path", result)
            self.assertTrue(os.path.exists(result["local_path"]))

        # Verify the API calls
        self.assertEqual(mock_post.call_count, num_views)

    def test_approve_images(self):
        """
        Test approving images in the workflow.
        """
        # Create test images
        test_images = self._write_test_images(self.test_multi_view_dir, "test_image")

        # Add images to the pipeline
        self.pipeline.add_images_for_approval(test_images)

        # Approve some images
        self.pipeline.approve_image("image_0")
        self.pipeline.approve_image("image_1")
        self.pipeline.approve_image("image_2")

        # Reject an image
        self.pipeline.reject_image("image_3")

        # Get the status
        status = self.pipeline.get_approval_status()

        # Verify the status
        self.assertEqual(status["total_images"], 4)
        self.assertEqual(status["pending_count"], 0)
        self.assertEqual(status["approved_count"], 3)
        self.assertEqual(status["rejected_count"], 1)
        self.assertTrue(status["has_minimum_approved"])

    @patch('src.models.cuda_mvs.subprocess.run')
    def test_create_model_from_approved_images(self, mock_run):
        """
        Test creating a 3D model from approved images.
        """
        # Mock subprocess.run
        mock_process = MagicMock()
        mock_process.returncode = 0
        mock_run.return_value = mock_process

        # Only the files on disk matter here; the descriptors are not used.
        self._write_test_images(self.test_approved_dir, "image")

        # Create a mock model file
        self._write_mock_model_file()

        # Call the method
        result = self.pipeline.create_model_from_approved_images("test_model")

        # Verify the result
        self._assert_obj_model_result(result)

    @patch('src.ai.gemini_api.requests.post')
    @patch('src.models.cuda_mvs.subprocess.run')
    def test_complete_workflow(self, mock_run, mock_post):
        """
        Test the complete workflow from text to 3D model.
        """
        # Mock Gemini API response
        mock_post.return_value = self._mock_gemini_response()

        # Mock subprocess.run
        mock_process = MagicMock()
        mock_process.returncode = 0
        mock_run.return_value = mock_process

        # Create a mock model file
        self._write_mock_model_file()

        # Create a pipeline with auto-approve
        auto_pipeline = MultiViewToModelPipeline(
            image_generator=self.image_generator,
            approval_manager=ImageApprovalManager(
                output_dir=self.test_multi_view_dir,
                approved_dir=self.test_approved_dir,
                min_approved_images=3,
                auto_approve=True
            ),
            model_generator=self.cuda_mvs,
            output_dir=self.test_output_dir,
            config=MULTI_VIEW_PIPELINE
        )

        # Test parameters
        prompt = "A low-poly rabbit"
        num_views = 4

        # Call the complete workflow
        result = auto_pipeline.complete_workflow(prompt, num_views, "test_model")

        # Verify the result
        self._assert_obj_model_result(result)
        self.assertIn("prompt", result)
        self.assertEqual(result["prompt"], prompt)
        self.assertIn("num_views", result)
        self.assertEqual(result["num_views"], num_views)
        self.assertIn("approved_images", result)
        self.assertEqual(len(result["approved_images"]), num_views)

    @patch('src.remote.cuda_mvs_client.requests.post')
    @patch('src.remote.cuda_mvs_client.requests.get')
    def test_remote_workflow(self, mock_get, mock_post):
        """
        Test the workflow with remote CUDA MVS processing.
        """
        # Mock upload response
        mock_upload_response = MagicMock()
        mock_upload_response.status_code = 200
        mock_upload_response.json.return_value = {
            "job_id": "test_job_123",
            "status": "uploaded",
            "message": "Images uploaded successfully"
        }

        # Mock process response
        mock_process_response = MagicMock()
        mock_process_response.status_code = 200
        mock_process_response.json.return_value = {
            "job_id": "test_job_123",
            "status": "processing",
            "message": "Job started processing"
        }

        # Mock status response
        mock_status_response = MagicMock()
        mock_status_response.status_code = 200
        mock_status_response.json.return_value = {
            "job_id": "test_job_123",
            "status": "completed",
            "progress": 100,
            "message": "Job completed successfully"
        }

        # Mock download response
        mock_download_response = MagicMock()
        mock_download_response.status_code = 200
        mock_download_response.content = b"test model data"

        # Set up the mock responses
        mock_post.side_effect = [mock_upload_response, mock_process_response]
        mock_get.side_effect = [mock_status_response, mock_download_response]

        # Create test images
        test_images = self._write_test_images(self.test_approved_dir, "image")

        # Create a remote CUDA MVS client
        from src.remote.cuda_mvs_client import CUDAMVSClient
        remote_client = CUDAMVSClient(
            api_key=self.api_key,
            output_dir=self.test_models_dir
        )

        # Create a pipeline with the remote client
        remote_pipeline = MultiViewToModelPipeline(
            image_generator=self.image_generator,
            approval_manager=self.approval_manager,
            model_generator=remote_client,
            output_dir=self.test_output_dir,
            config=MULTI_VIEW_PIPELINE
        )

        # Add the approved images
        remote_pipeline.add_images_for_approval(test_images)
        for image in test_images:
            remote_pipeline.approve_image(image["id"])

        # Call the method to create a model using the remote client
        with patch('src.workflow.multi_view_to_model_pipeline.CUDAMVSClient', return_value=remote_client):
            result = remote_pipeline.create_model_from_approved_images("test_model", server_url="http://test-server:8765")

        # Verify the result
        self._assert_obj_model_result(result)
        self.assertIn("job_id", result)
        self.assertEqual(result["job_id"], "test_job_123")
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/src/workflow/multi_view_to_model_pipeline.py:
--------------------------------------------------------------------------------
```python
"""
Workflow orchestration for the multi-view to model pipeline.
"""
import os
import logging
import uuid
from typing import Dict, Any, List, Optional
from pathlib import Path
logger = logging.getLogger(__name__)
class MultiViewToModelPipeline:
    """
    Orchestrates the workflow from image or text prompt to 3D model:
    1. Generate image with Venice.ai or Google Gemini (optional)
    2. Generate multiple views with Google Gemini
    3. Process user approval of views
    4. Create 3D model with CUDA Multi-View Stereo
    5. Convert to OpenSCAD for parametric editing (optional)
    """

    def __init__(self,
                 gemini_generator=None,
                 venice_generator=None,
                 cuda_mvs=None,
                 openscad_wrapper=None,
                 approval_tool=None,
                 output_dir: str = "output/pipeline"):
        """
        Initialize the pipeline.

        Args:
            gemini_generator: Instance of GeminiImageGenerator
            venice_generator: Instance of VeniceImageGenerator (optional)
            cuda_mvs: Instance of CUDAMultiViewStereo
            openscad_wrapper: Instance of OpenSCADWrapper (optional)
            approval_tool: Instance of ImageApprovalTool
            output_dir: Directory to store output files
        """
        self.gemini_generator = gemini_generator
        self.venice_generator = venice_generator
        self.cuda_mvs = cuda_mvs
        self.openscad_wrapper = openscad_wrapper
        self.approval_tool = approval_tool
        self.output_dir = output_dir

        # Create output directories
        for subdir in ("images", "multi_view", "approved", "models", "scad"):
            os.makedirs(os.path.join(output_dir, subdir), exist_ok=True)

    def generate_model_from_text(self, prompt: str,
                                 use_venice: bool = False,
                                 num_views: int = 4,
                                 gemini_params: Optional[Dict[str, Any]] = None,
                                 venice_params: Optional[Dict[str, Any]] = None,
                                 cuda_mvs_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Generate a 3D model from a text prompt.

        Args:
            prompt: Text description for image generation
            use_venice: Whether to use Venice.ai for initial image
            num_views: Number of views to generate
            gemini_params: Optional parameters for Google Gemini
            venice_params: Optional parameters for Venice.ai
            cuda_mvs_params: Optional parameters for CUDA MVS

        Returns:
            Dictionary containing paths to generated files and metadata

        Raises:
            Exception: Any error raised by a pipeline step is logged and
                re-raised unchanged
        """
        try:
            # Generate a unique ID for this pipeline run
            pipeline_id = str(uuid.uuid4())
            logger.info(f"Starting pipeline {pipeline_id} for prompt: {prompt}")

            # Step 1: Generate initial image
            if use_venice and self.venice_generator:
                # Use Venice.ai for initial image
                logger.info("Using Venice.ai for initial image generation")
                image_path = os.path.join(self.output_dir, "images", f"{pipeline_id}_venice.png")
                initial_result = self.venice_generator.generate_image(
                    prompt=prompt,
                    output_path=image_path,
                    **(venice_params or {})
                )
            else:
                # Use Google Gemini for initial image (also the fallback when
                # Venice was requested but no Venice generator is configured)
                logger.info("Using Google Gemini for initial image generation")
                image_path = os.path.join(self.output_dir, "images", f"{pipeline_id}_gemini.png")
                initial_result = self.gemini_generator.generate_image(
                    prompt=prompt,
                    output_path=image_path,
                    **(gemini_params or {})
                )

            # Steps 2-5: views, approval, reconstruction, optional OpenSCAD
            multi_views = self._generate_views(prompt, num_views, image_path, pipeline_id)
            approved_images = self._approve_all_views(multi_views)
            model_result, scad_result = self._reconstruct(approved_images, pipeline_id, cuda_mvs_params)

            # Compile results
            result = {
                "pipeline_id": pipeline_id,
                "prompt": prompt,
                "initial_image": initial_result,
                "multi_views": multi_views,
                "approved_images": approved_images,
                "model_3d": model_result,
            }
            if scad_result:
                result["openscad"] = scad_result

            logger.info(f"Pipeline {pipeline_id} completed successfully")
            return result
        except Exception as e:
            logger.error(f"Error in pipeline: {str(e)}")
            raise

    def generate_model_from_image(self, image_path: str,
                                  prompt: Optional[str] = None,
                                  num_views: int = 4,
                                  gemini_params: Optional[Dict[str, Any]] = None,
                                  cuda_mvs_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Generate a 3D model from an existing image.

        Args:
            image_path: Path to input image
            prompt: Optional text description to guide multi-view generation
            num_views: Number of views to generate
            gemini_params: Optional parameters for Google Gemini (accepted for
                signature compatibility; not used by this method)
            cuda_mvs_params: Optional parameters for CUDA MVS

        Returns:
            Dictionary containing paths to generated files and metadata

        Raises:
            Exception: Any error raised by a pipeline step is logged and
                re-raised unchanged
        """
        try:
            # Generate a unique ID for this pipeline run
            pipeline_id = str(uuid.uuid4())
            logger.info(f"Starting pipeline {pipeline_id} from image: {image_path}")

            # Use provided prompt or generate one from the image
            if not prompt:
                # In a real implementation, you might use an image captioning model
                # to generate a description of the image
                prompt = f"3D object in the image {os.path.basename(image_path)}"

            # Steps 1-4: views, approval, reconstruction, optional OpenSCAD
            multi_views = self._generate_views(prompt, num_views, image_path, pipeline_id)
            approved_images = self._approve_all_views(multi_views)
            model_result, scad_result = self._reconstruct(approved_images, pipeline_id, cuda_mvs_params)

            # Compile results
            result = {
                "pipeline_id": pipeline_id,
                "prompt": prompt,
                "input_image": image_path,
                "multi_views": multi_views,
                "approved_images": approved_images,
                "model_3d": model_result,
            }
            if scad_result:
                result["openscad"] = scad_result

            logger.info(f"Pipeline {pipeline_id} completed successfully")
            return result
        except Exception as e:
            logger.error(f"Error in pipeline: {str(e)}")
            raise

    def _generate_views(self, prompt: str, num_views: int,
                        base_image_path: str, pipeline_id: str) -> List[Dict[str, Any]]:
        """
        Ask the Gemini generator for multiple views of the base image.

        Args:
            prompt: Text description guiding view generation
            num_views: Number of views to generate
            base_image_path: Path to the image the views are derived from
            pipeline_id: Identifier of the current run; names the output folder

        Returns:
            The list returned by gemini_generator.generate_multiple_views
        """
        logger.info(f"Generating {num_views} views with Google Gemini")
        multi_view_dir = os.path.join(self.output_dir, "multi_view", pipeline_id)
        return self.gemini_generator.generate_multiple_views(
            prompt=prompt,
            num_views=num_views,
            base_image_path=base_image_path,
            output_dir=multi_view_dir
        )

    def _approve_all_views(self, multi_views: List[Dict[str, Any]]) -> List[str]:
        """
        Present every view for approval and approve each one automatically.

        Args:
            multi_views: View dictionaries; each must contain "local_path"

        Returns:
            List of approved image paths reported by the approval tool
        """
        # In a real implementation, this would be handled by the MCP client
        # through the MCP tools interface
        logger.info("Preparing images for approval")
        approval_requests = [
            self.approval_tool.present_image_for_approval(
                image_path=view["local_path"],
                metadata={
                    "prompt": view.get("prompt"),
                    "view_direction": view.get("view_direction"),
                    "view_index": view.get("view_index")
                }
            )
            for view in multi_views
        ]

        # For the purpose of this implementation, we'll assume all views are approved
        approved_images = []
        for req in approval_requests:
            approval_result = self.approval_tool.process_approval(
                approval_id=req["approval_id"],
                approved=True,
                image_path=req["image_path"]
            )
            if approval_result["approved"]:
                approved_images.append(approval_result["approved_path"])
        return approved_images

    def _reconstruct(self, approved_images: List[str], pipeline_id: str,
                     cuda_mvs_params: Optional[Dict[str, Any]]):
        """
        Run CUDA MVS on the approved images and optionally convert to OpenSCAD.

        Args:
            approved_images: Paths of the approved images
            pipeline_id: Identifier of the current run; used as output name
            cuda_mvs_params: Optional extra keyword arguments for CUDA MVS

        Returns:
            Tuple (model_result, scad_result); scad_result is None when no
            OpenSCAD wrapper is configured or the model result has no
            "point_cloud_file" entry
        """
        logger.info("Generating 3D model with CUDA MVS")
        model_result = self.cuda_mvs.generate_model_from_images(
            image_paths=approved_images,
            output_name=pipeline_id,
            **(cuda_mvs_params or {})
        )

        scad_result = None
        if self.openscad_wrapper and model_result.get("point_cloud_file"):
            logger.info("Converting to OpenSCAD")
            scad_result = self._convert_to_openscad(model_result["point_cloud_file"], pipeline_id)
        return model_result, scad_result

    def _convert_to_openscad(self, model_path: str, model_id: str) -> Dict[str, Any]:
        """
        Convert 3D model to OpenSCAD format.

        Args:
            model_path: Path to input model (PLY file)
            model_id: Unique identifier for the model

        Returns:
            Dictionary containing paths to generated files
        """
        logger.info(f"Converting model to OpenSCAD: {model_path}")

        # Convert PLY to OBJ if needed
        if model_path.endswith('.ply'):
            model_path = self.cuda_mvs.convert_ply_to_obj(model_path)

        # OpenSCAD string literals treat backslash as an escape character, so
        # backslashes and double quotes in the path must be escaped before the
        # path is embedded in import("...").
        scad_import_path = model_path.replace("\\", "\\\\").replace('"', '\\"')

        # Generate OpenSCAD code for importing the model
        scad_code = f"""// Generated OpenSCAD code for model {model_id}
// Imported from {os.path.basename(model_path)}
// Parameters
scale_factor = 1.0;
position_x = 0;
position_y = 0;
position_z = 0;
rotation_x = 0;
rotation_y = 0;
rotation_z = 0;
// Import and transform the model
translate([position_x, position_y, position_z])
rotate([rotation_x, rotation_y, rotation_z])
scale(scale_factor)
import("{scad_import_path}");
"""

        # Save SCAD code to file
        scad_file = self.openscad_wrapper.generate_scad(scad_code, model_id)

        # Generate previews
        previews = self.openscad_wrapper.generate_multi_angle_previews(scad_file)

        return {
            "scad_file": scad_file,
            "previews": previews,
            "model_path": model_path
        }

    def process_approval_results(self, approval_results: List[Dict[str, Any]]) -> List[str]:
        """
        Process approval results from the MCP client.

        Args:
            approval_results: List of approval results from the client

        Returns:
            List of paths to approved images
        """
        return [
            result["approved_path"]
            for result in approval_results
            if result.get("approved", False) and "approved_path" in result
        ]
```
--------------------------------------------------------------------------------
/src/remote/error_handling.py:
--------------------------------------------------------------------------------
```python
"""
Error handling and retry mechanisms for remote CUDA MVS processing.
This module provides utilities for handling network errors, implementing
retry mechanisms, and ensuring robust communication with remote CUDA MVS servers.
"""
import time
import random
import logging
import functools
from typing import Callable, Any, Type, Union, List, Dict, Optional, Tuple
import requests
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define common exception types for network operations.
# This tuple is the default set of retryable exceptions for
# retry_with_backoff and the set caught per attempt by safe_request.
NETWORK_EXCEPTIONS = (
    requests.exceptions.ConnectionError,
    requests.exceptions.Timeout,
    requests.exceptions.HTTPError,
    requests.exceptions.RequestException,
    ConnectionRefusedError,
    TimeoutError,
)
def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exception_types: Tuple[Type[Exception], ...] = NETWORK_EXCEPTIONS,
    jitter_factor: float = 0.1,
    logger_instance: Optional[logging.Logger] = None
) -> Callable:
    """
    Build a decorator that re-invokes a function with exponential backoff.

    The wrapped function is called until it returns. Each failure whose type
    is in exception_types triggers a sleep of
    min(max_delay, base_delay * 2 ** (failures - 1)) plus random jitter, and
    once more than max_retries such failures have occurred the last exception
    is re-raised. Any other exception is logged and re-raised immediately.

    Args:
        max_retries: Maximum number of retries
        base_delay: Base delay in seconds
        max_delay: Maximum delay in seconds (applied before jitter is added)
        exception_types: Tuple of exception types to catch and retry
        jitter_factor: Factor for random jitter (0.0 to 1.0)
        logger_instance: Logger instance to use (uses module logger if None)

    Returns:
        Decorator function
    """
    log = logger_instance or logger

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            failures = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except exception_types as exc:
                    failures += 1
                    if failures > max_retries:
                        log.error(f"Max retries ({max_retries}) exceeded: {str(exc)}")
                        raise

                    # Exponential backoff, capped at max_delay
                    backoff = min(max_delay, base_delay * (2 ** (failures - 1)))
                    # Jitter spreads out retries from concurrent callers
                    pause = backoff + random.uniform(0, jitter_factor * backoff)
                    log.warning(f"Retry {failures}/{max_retries} after {pause:.2f}s: {str(exc)}")
                    time.sleep(pause)
                except Exception as exc:
                    # Anything outside exception_types is not retried
                    log.error(f"Non-retryable exception: {str(exc)}")
                    raise

        return wrapper

    return decorator
def timeout_handler(
    timeout: float,
    default_value: Any = None,
    exception_types: Tuple[Type[Exception], ...] = (TimeoutError, requests.exceptions.Timeout),
    logger_instance: Optional[logging.Logger] = None
) -> Callable:
    """
    Decorator for handling timeouts in function calls.

    The wrapped call is interrupted by SIGALRM after `timeout` seconds. The
    timer is armed with signal.setitimer so that fractional timeouts are
    honoured; truncating to whole seconds would turn any timeout below one
    second into "no timeout at all".

    NOTE: this relies on signal.SIGALRM, so it only works on platforms that
    provide that signal and when called from the main thread (signal handlers
    can only be installed there). Any alarm timer that was already pending is
    replaced for the duration of the call.

    Args:
        timeout: Timeout in seconds (fractions of a second are supported)
        default_value: Value to return if timeout occurs
        exception_types: Tuple of exception types to catch as timeouts
        logger_instance: Logger instance to use (uses module logger if None)

    Returns:
        Decorator function
    """
    log = logger_instance or logger

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            import signal

            def _raise_timeout(signum, frame):
                raise TimeoutError(f"Function {func.__name__} timed out after {timeout} seconds")

            # Install our handler, remembering the previous one for restore
            original_handler = signal.signal(signal.SIGALRM, _raise_timeout)
            signal.setitimer(signal.ITIMER_REAL, timeout)
            try:
                return func(*args, **kwargs)
            except exception_types as e:
                log.warning(f"Timeout in {func.__name__}: {str(e)}")
                return default_value
            finally:
                # Disarm the timer BEFORE restoring the old handler so a
                # late alarm can never be delivered to the previous handler.
                signal.setitimer(signal.ITIMER_REAL, 0)
                signal.signal(signal.SIGALRM, original_handler)

        return wrapper

    return decorator
class NetworkErrorTracker:
    """
    Keeps a sliding record of recent request outcomes.

    The record is used to compute an error rate and to decide whether a
    server should be treated as problematic.
    """

    def __init__(
        self,
        error_window: int = 10,
        error_threshold: float = 0.5,
        reset_after: int = 100
    ):
        """
        Create an empty tracker.

        Args:
            error_window: Number of recent requests to consider
            error_threshold: Error rate threshold to consider a server problematic
            reset_after: Number of successful requests after which to reset error count
        """
        self.error_window = error_window
        self.error_threshold = error_threshold
        self.reset_after = reset_after
        # Each entry is a (timestamp, success) tuple, oldest first
        self.requests = []
        self.consecutive_successes = 0
        self.consecutive_failures = 0

    def record_request(self, success: bool) -> None:
        """
        Store the outcome of one request and update the streak counters.

        Args:
            success: Whether the request was successful
        """
        now = time.time()
        self.requests.append((now, success))
        self._trim_old_requests()

        if not success:
            self.consecutive_failures += 1
            self.consecutive_successes = 0
            return

        self.consecutive_successes += 1
        self.consecutive_failures = 0
        # A long enough success streak wipes the history, keeping only
        # this latest successful request
        if self.consecutive_successes >= self.reset_after:
            self.requests = [(now, True)]
            self.consecutive_successes = 1

    def _trim_old_requests(self) -> None:
        """
        Drop the oldest entries so at most error_window remain.
        """
        overflow = len(self.requests) - self.error_window
        if overflow > 0:
            self.requests = self.requests[-self.error_window:]

    def get_error_rate(self) -> float:
        """
        Compute the share of failed requests in the current window.

        Returns:
            Error rate as a float between 0.0 and 1.0 (0.0 when empty)
        """
        total = len(self.requests)
        if total == 0:
            return 0.0
        failed = sum(not ok for _, ok in self.requests)
        return failed / total

    def is_server_problematic(self) -> bool:
        """
        Tell whether the error rate has reached the configured threshold.

        Returns:
            True if the server is problematic, False otherwise
        """
        rate = self.get_error_rate()
        return rate >= self.error_threshold

    def get_status(self) -> Dict[str, Any]:
        """
        Summarise the tracker state.

        Returns:
            Dictionary with status information
        """
        status = {}
        status["error_rate"] = self.get_error_rate()
        status["is_problematic"] = self.is_server_problematic()
        status["consecutive_successes"] = self.consecutive_successes
        status["consecutive_failures"] = self.consecutive_failures
        status["total_requests"] = len(self.requests)
        status["recent_failures"] = sum(not ok for _, ok in self.requests)
        return status
class CircuitBreaker:
    """
    Circuit breaker pattern implementation for network requests.

    This class helps prevent cascading failures by stopping requests
    to a problematic server until it recovers.

    State machine:
    - CLOSED: every request is allowed; failure_threshold consecutive
      failures open the circuit.
    - OPEN: requests are refused until recovery_timeout seconds have passed
      since the last recorded failure, then a single trial is admitted and
      the breaker moves to HALF_OPEN.
    - HALF_OPEN: further requests are refused while the trial is
      outstanding. A recorded success closes the circuit, a recorded failure
      re-opens it. If no outcome is reported within recovery_timeout seconds
      another trial is admitted so the breaker cannot get stuck.
    """

    # Circuit states
    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # No requests allowed
    HALF_OPEN = "half_open"  # Testing if service is back

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        reset_timeout: float = 60.0,
        logger_instance: Optional[logging.Logger] = None
    ):
        """
        Initialize the circuit breaker.

        Args:
            failure_threshold: Number of consecutive failures before opening
            recovery_timeout: Time in seconds before testing recovery
            reset_timeout: Time in seconds before fully resetting (stored and
                reported by get_status; not otherwise used by this class)
            logger_instance: Logger instance to use (uses module logger if None)
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.reset_timeout = reset_timeout
        self.log = logger_instance or logger
        self.state = self.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0
        self.last_success_time = 0
        # Time at which the current half-open trial request was admitted
        self._trial_started_at = 0

    def record_success(self) -> None:
        """
        Record a successful request.
        """
        self.last_success_time = time.time()
        if self.state == self.HALF_OPEN:
            self.log.info("Circuit breaker reset to closed state after successful test request")
            self.state = self.CLOSED
            self.failure_count = 0
        elif self.state == self.CLOSED:
            # Reset failure count after a successful request
            self.failure_count = 0

    def record_failure(self) -> None:
        """
        Record a failed request.
        """
        self.last_failure_time = time.time()
        if self.state == self.CLOSED:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.log.warning(f"Circuit breaker opened after {self.failure_count} consecutive failures")
                self.state = self.OPEN
        elif self.state == self.HALF_OPEN:
            self.log.warning("Circuit breaker opened again after failed test request")
            self.state = self.OPEN

    def allow_request(self) -> bool:
        """
        Check if a request should be allowed.

        Returns:
            True if the request should be allowed, False otherwise
        """
        if self.state == self.CLOSED:
            return True

        now = time.time()
        if self.state == self.OPEN:
            # Check if recovery timeout has elapsed
            if now - self.last_failure_time > self.recovery_timeout:
                self.log.info("Circuit breaker entering half-open state to test service")
                self.state = self.HALF_OPEN
                self._trial_started_at = now
                return True
            return False

        # In HALF_OPEN state, allow only one request at a time. If the
        # outstanding trial never reported an outcome, admit a fresh one
        # after recovery_timeout so the breaker does not stay half-open
        # forever.
        if now - self._trial_started_at > self.recovery_timeout:
            self._trial_started_at = now
            return True
        return False

    def reset(self) -> None:
        """
        Reset the circuit breaker to closed state.
        """
        self.state = self.CLOSED
        self.failure_count = 0
        self._trial_started_at = 0
        self.log.info("Circuit breaker manually reset to closed state")

    def get_status(self) -> Dict[str, Any]:
        """
        Get the current status of the circuit breaker.

        Returns:
            Dictionary with status information; the "time_since_*" entries
            are None until the corresponding event has been recorded
        """
        now = time.time()
        return {
            "state": self.state,
            "failure_count": self.failure_count,
            "time_since_last_failure": now - self.last_failure_time if self.last_failure_time > 0 else None,
            "time_since_last_success": now - self.last_success_time if self.last_success_time > 0 else None,
            "recovery_timeout": self.recovery_timeout,
            "reset_timeout": self.reset_timeout
        }
def safe_request(
    url: str,
    method: str = "GET",
    circuit_breaker: Optional[CircuitBreaker] = None,
    error_tracker: Optional[NetworkErrorTracker] = None,
    retry_count: int = 3,
    timeout: float = 30.0,
    **kwargs
) -> Optional[requests.Response]:
    """
    Issue an HTTP request guarded by a circuit breaker and retried on failure.

    The request is attempted up to retry_count + 1 times. A response whose
    status fails raise_for_status() counts as a failed attempt. Between
    attempts the function sleeps 2 ** attempt seconds plus up to one second
    of random jitter. The final outcome is reported to the circuit breaker
    and error tracker when they are provided.

    Args:
        url: URL to request
        method: HTTP method (GET, POST, etc.)
        circuit_breaker: Circuit breaker instance
        error_tracker: Error tracker instance
        retry_count: Number of retries
        timeout: Request timeout in seconds (used unless kwargs already
            contains "timeout")
        **kwargs: Additional arguments for requests

    Returns:
        Response object or None if request failed or was blocked by the
        circuit breaker
    """
    # Refuse immediately when the breaker is not letting traffic through
    if circuit_breaker and not circuit_breaker.allow_request():
        logger.warning(f"Circuit breaker prevented request to {url}")
        return None

    # Caller-supplied timeout in kwargs takes precedence
    kwargs.setdefault("timeout", timeout)

    response = None
    success = False
    try:
        for attempt in range(retry_count + 1):
            try:
                response = requests.request(method, url, **kwargs)
                response.raise_for_status()
            except NETWORK_EXCEPTIONS as e:
                if attempt >= retry_count:
                    # Out of retries: hand the error to the outer handler
                    logger.error(f"Request to {url} failed after {retry_count+1} attempts: {str(e)}")
                    raise
                delay = 2 ** attempt + random.uniform(0, 1)
                logger.warning(f"Request to {url} failed (attempt {attempt+1}/{retry_count+1}): {str(e)}. Retrying in {delay:.2f}s")
                time.sleep(delay)
            else:
                success = True
                break
    except Exception as e:
        logger.error(f"Error making request to {url}: {str(e)}")
        success = False

    # Report the outcome to the optional collaborators
    if circuit_breaker:
        report = circuit_breaker.record_success if success else circuit_breaker.record_failure
        report()
    if error_tracker:
        error_tracker.record_request(success)

    if not success:
        return None
    return response
```
--------------------------------------------------------------------------------
/src/openscad_wrapper/wrapper.py:
--------------------------------------------------------------------------------
```python
import os
import subprocess
import uuid
import logging
from typing import Dict, Any, List, Tuple, Optional
logger = logging.getLogger(__name__)
class OpenSCADWrapper:
"""
Wrapper for OpenSCAD command-line interface.
Provides methods to generate SCAD code, STL files, and preview images.
"""
def __init__(self, scad_dir: str, output_dir: str):
    """
    Set up directories and the shape-template registry.

    Args:
        scad_dir: Directory to store SCAD files
        output_dir: Directory to store output files (STL, PNG)
    """
    self.scad_dir = scad_dir
    self.output_dir = output_dir
    self.stl_dir = os.path.join(output_dir, "stl")
    self.preview_dir = os.path.join(output_dir, "preview")

    # Make sure every working directory exists
    for directory in (self.scad_dir, self.stl_dir, self.preview_dir):
        os.makedirs(directory, exist_ok=True)

    # Map each supported model type to the method that renders its SCAD code
    self.shape_templates = dict(
        cube=self._cube_template,
        sphere=self._sphere_template,
        cylinder=self._cylinder_template,
        box=self._box_template,
        rounded_box=self._rounded_box_template,
    )
def generate_scad_code(self, model_type: str, parameters: Dict[str, Any]) -> str:
    """
    Render a registered shape template and write it to a new SCAD file.

    The file is named after a freshly generated UUID inside scad_dir.

    Args:
        model_type: Type of model to generate (cube, sphere, cylinder, etc.)
        parameters: Dictionary of parameters for the model

    Returns:
        Path to the generated SCAD file

    Raises:
        ValueError: If model_type has no registered template
    """
    new_id = str(uuid.uuid4())
    target_path = os.path.join(self.scad_dir, f"{new_id}.scad")

    # Look up the renderer for this model type
    render = self.shape_templates.get(model_type)
    if not render:
        raise ValueError(f"Unsupported model type: {model_type}")

    rendered = render(parameters)

    with open(target_path, 'w') as handle:
        handle.write(rendered)

    logger.info(f"Generated SCAD file: {target_path}")
    return target_path
def generate_scad(self, scad_code: str, model_id: str) -> str:
"""
Save OpenSCAD code to a file with a specific model ID.
Args:
scad_code: OpenSCAD code to save
model_id: ID to use for the file name
Returns:
Path to the saved SCAD file
"""
scad_file = os.path.join(self.scad_dir, f"{model_id}.scad")
# Write SCAD code to file
with open(scad_file, 'w') as f:
f.write(scad_code)
logger.info(f"Generated SCAD file: {scad_file}")
return scad_file
def update_scad_code(self, model_id: str, parameters: Dict[str, Any]) -> str:
"""
Update an existing SCAD file with new parameters.
Args:
model_id: ID of the model to update
parameters: New parameters for the model
Returns:
Path to the updated SCAD file
"""
scad_file = os.path.join(self.scad_dir, f"{model_id}.scad")
if not os.path.exists(scad_file):
raise FileNotFoundError(f"SCAD file not found: {scad_file}")
# Read the existing SCAD file to determine its type
with open(scad_file, 'r') as f:
scad_code = f.read()
# Determine model type from the code (simplified approach)
model_type = None
for shape_type in self.shape_templates:
if shape_type in scad_code.lower():
model_type = shape_type
break
if not model_type:
raise ValueError("Could not determine model type from existing SCAD file")
# Generate new SCAD code
new_scad_code = self.shape_templates[model_type](parameters)
# Write updated SCAD code to file
with open(scad_file, 'w') as f:
f.write(new_scad_code)
logger.info(f"Updated SCAD file: {scad_file}")
return scad_file
def generate_stl(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None) -> str:
"""
Generate an STL file from a SCAD file.
Args:
scad_file: Path to the SCAD file
parameters: Optional parameters to override in the SCAD file
Returns:
Path to the generated STL file
"""
model_id = os.path.basename(scad_file).split('.')[0]
stl_file = os.path.join(self.stl_dir, f"{model_id}.stl")
# Build command
cmd = ["openscad", "-o", stl_file]
# Add parameters if provided
if parameters:
for key, value in parameters.items():
cmd.extend(["-D", f"{key}={value}"])
# Add input file
cmd.append(scad_file)
# Run OpenSCAD
try:
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
logger.info(f"Generated STL file: {stl_file}")
logger.debug(result.stdout)
return stl_file
except subprocess.CalledProcessError as e:
logger.error(f"Error generating STL file: {e.stderr}")
raise RuntimeError(f"Failed to generate STL file: {e.stderr}")
def generate_preview(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None,
camera_position: str = "0,0,0,0,0,0,50",
image_size: str = "800,600") -> str:
"""
Generate a preview image from a SCAD file.
Args:
scad_file: Path to the SCAD file
parameters: Optional parameters to override in the SCAD file
camera_position: Camera position in format "tx,ty,tz,rx,ry,rz,dist"
image_size: Image size in format "width,height"
Returns:
Path to the generated preview image
"""
model_id = os.path.basename(scad_file).split('.')[0]
preview_file = os.path.join(self.preview_dir, f"{model_id}.png")
# Build command
cmd = ["openscad", "--camera", camera_position, "--imgsize", image_size, "-o", preview_file]
# Add parameters if provided
if parameters:
for key, value in parameters.items():
cmd.extend(["-D", f"{key}={value}"])
# Add input file
cmd.append(scad_file)
# Run OpenSCAD
try:
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
logger.info(f"Generated preview image: {preview_file}")
logger.debug(result.stdout)
return preview_file
except subprocess.CalledProcessError as e:
logger.error(f"Error generating preview image: {e.stderr}")
# Since we know there might be issues with headless rendering, we'll create a placeholder
logger.warning("Using placeholder image due to rendering error")
return self._create_placeholder_image(preview_file)
def _create_placeholder_image(self, output_path: str) -> str:
"""Create a simple placeholder image when rendering fails."""
try:
from PIL import Image, ImageDraw, ImageFont
# Create a blank image
img = Image.new('RGB', (800, 600), color=(240, 240, 240))
draw = ImageDraw.Draw(img)
# Add text
draw.text((400, 300), "Preview not available", fill=(0, 0, 0))
# Save the image
img.save(output_path)
return output_path
except Exception as e:
logger.error(f"Error creating placeholder image: {str(e)}")
# If all else fails, return the path anyway
return output_path
def generate_multi_angle_previews(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
"""
Generate preview images from multiple angles for a SCAD file.
Args:
scad_file: Path to the SCAD file
parameters: Optional parameters to override in the SCAD file
Returns:
Dictionary mapping view names to preview image paths
"""
# Define camera positions for different views
camera_positions = {
"front": "0,0,0,0,0,0,50",
"top": "0,0,0,90,0,0,50",
"right": "0,0,0,0,90,0,50",
"perspective": "40,30,30,55,0,25,100"
}
# Generate preview for each view
previews = {}
for view, camera_position in camera_positions.items():
try:
model_id = os.path.basename(scad_file).split('.')[0]
preview_file = os.path.join(self.preview_dir, f"{model_id}_{view}.png")
# Build command
cmd = ["openscad", "--camera", camera_position, "--imgsize", "800,600", "-o", preview_file]
# Add parameters if provided
if parameters:
for key, value in parameters.items():
cmd.extend(["-D", f"{key}={value}"])
# Add input file
cmd.append(scad_file)
# Run OpenSCAD
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
logger.info(f"Generated {view} preview: {preview_file}")
previews[view] = preview_file
except subprocess.CalledProcessError as e:
logger.error(f"Error generating {view} preview: {e.stderr}")
# Create a placeholder image for this view
preview_file = os.path.join(self.preview_dir, f"{model_id}_{view}.png")
previews[view] = self._create_placeholder_image(preview_file)
return previews
# Template functions for basic shapes
def _cube_template(self, params: Dict[str, Any]) -> str:
"""Generate SCAD code for a cube."""
size_x = params.get('width', 10)
size_y = params.get('depth', 10)
size_z = params.get('height', 10)
center = params.get('center', 'false').lower() == 'true'
return f"""// Cube
// Parameters:
// width = {size_x}
// depth = {size_y}
// height = {size_z}
// center = {str(center).lower()}
width = {size_x};
depth = {size_y};
height = {size_z};
center = {str(center).lower()};
cube([width, depth, height], center=center);
"""
def _sphere_template(self, params: Dict[str, Any]) -> str:
"""Generate SCAD code for a sphere."""
radius = params.get('radius', 10)
segments = params.get('segments', 32)
return f"""// Sphere
// Parameters:
// radius = {radius}
// segments = {segments}
radius = {radius};
$fn = {segments};
sphere(r=radius);
"""
def _cylinder_template(self, params: Dict[str, Any]) -> str:
"""Generate SCAD code for a cylinder."""
radius = params.get('radius', 10)
height = params.get('height', 20)
center = params.get('center', 'false').lower() == 'true'
segments = params.get('segments', 32)
return f"""// Cylinder
// Parameters:
// radius = {radius}
// height = {height}
// center = {str(center).lower()}
// segments = {segments}
radius = {radius};
height = {height};
center = {str(center).lower()};
$fn = {segments};
cylinder(h=height, r=radius, center=center);
"""
def _box_template(self, params: Dict[str, Any]) -> str:
"""Generate SCAD code for a hollow box."""
width = params.get('width', 30)
depth = params.get('depth', 20)
height = params.get('height', 15)
thickness = params.get('thickness', 2)
return f"""// Hollow Box
// Parameters:
// width = {width}
// depth = {depth}
// height = {height}
// thickness = {thickness}
width = {width};
depth = {depth};
height = {height};
thickness = {thickness};
module box(width, depth, height, thickness) {{
difference() {{
cube([width, depth, height]);
translate([thickness, thickness, thickness])
cube([width - 2*thickness, depth - 2*thickness, height - thickness]);
}}
}}
box(width, depth, height, thickness);
"""
def _rounded_box_template(self, params: Dict[str, Any]) -> str:
"""Generate SCAD code for a rounded box."""
width = params.get('width', 30)
depth = params.get('depth', 20)
height = params.get('height', 15)
radius = params.get('radius', 3)
segments = params.get('segments', 32)
return f"""// Rounded Box
// Parameters:
// width = {width}
// depth = {depth}
// height = {height}
// radius = {radius}
// segments = {segments}
width = {width};
depth = {depth};
height = {height};
radius = {radius};
$fn = {segments};
module rounded_box(width, depth, height, radius) {{
hull() {{
translate([radius, radius, radius])
sphere(r=radius);
translate([width-radius, radius, radius])
sphere(r=radius);
translate([radius, depth-radius, radius])
sphere(r=radius);
translate([width-radius, depth-radius, radius])
sphere(r=radius);
translate([radius, radius, height-radius])
sphere(r=radius);
translate([width-radius, radius, height-radius])
sphere(r=radius);
translate([radius, depth-radius, height-radius])
sphere(r=radius);
translate([width-radius, depth-radius, height-radius])
sphere(r=radius);
}}
}}
rounded_box(width, depth, height, radius);
"""
```
--------------------------------------------------------------------------------
/src/remote/cuda_mvs_client.py:
--------------------------------------------------------------------------------
```python
"""
Client for remote CUDA Multi-View Stereo processing.
This module provides a client to connect to a remote CUDA MVS server
within the LAN for processing multi-view images into 3D models.
"""
import os
import json
import logging
import requests
import base64
from typing import Dict, List, Optional, Any, Union
from pathlib import Path
import uuid
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CUDAMVSClient:
    """
    Client for connecting to a remote CUDA Multi-View Stereo server.

    This client handles:
    1. Discovering available CUDA MVS servers on the LAN
    2. Uploading images to the server
    3. Requesting 3D reconstruction
    4. Downloading the resulting 3D models
    5. Monitoring job status

    Every public method except ``discover_servers`` returns a dictionary
    whose ``"status"`` key is either ``"success"`` or ``"error"``; network
    failures are reported that way instead of being raised.
    """

    def __init__(
        self,
        server_url: Optional[str] = None,
        api_key: Optional[str] = None,
        output_dir: str = "output/models",
        discovery_port: int = 8765,
        connection_timeout: int = 10,
        upload_chunk_size: int = 1024 * 1024,  # 1MB chunks
    ):
        """
        Initialize the CUDA MVS client.

        Args:
            server_url: URL of the CUDA MVS server (if known)
            api_key: API key for authentication (if required)
            output_dir: Directory to save downloaded models
            discovery_port: Port used for server discovery
            connection_timeout: Timeout for server connections in seconds
            upload_chunk_size: Chunk size for file uploads in bytes
        """
        self.server_url = server_url
        self.api_key = api_key
        self.output_dir = output_dir
        self.discovery_port = discovery_port
        self.connection_timeout = connection_timeout
        self.upload_chunk_size = upload_chunk_size

        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        # Initialize session for connection pooling
        self.session = requests.Session()
        if api_key:
            self.session.headers.update({"Authorization": f"Bearer {api_key}"})

    def discover_servers(self) -> List[Dict[str, Any]]:
        """
        Discover CUDA MVS servers on the local network.

        Browses the ``_cudamvs._tcp.local.`` mDNS service type for two
        seconds and returns whatever was announced in that window.

        Returns:
            List of dictionaries containing server information:
            [
                {
                    "server_id": "unique-server-id",
                    "name": "CUDA MVS Server 1",
                    "url": "http://192.168.1.100:8765",
                    "capabilities": {
                        "max_images": 50,
                        "max_resolution": 4096,
                        "supported_formats": ["jpg", "png"],
                        "gpu_info": "NVIDIA RTX 4090 24GB"
                    },
                    "status": "available"
                },
                ...
            ]
            An empty list is returned if discovery fails.
        """
        import socket
        import time
        from zeroconf import ServiceBrowser, ServiceListener, Zeroconf

        discovered_servers: List[Dict[str, Any]] = []

        class CUDAMVSListener(ServiceListener):
            def add_service(self, zc, type_, name):
                info = zc.get_service_info(type_, name)
                # A service may resolve without any address; nothing to
                # connect to in that case.
                if not info or not info.addresses:
                    return
                properties = info.properties or {}
                # Property values may be missing or None; fall back to the
                # same defaults as before.
                raw_name = properties.get(b'name') or b'Unknown'
                raw_capabilities = properties.get(b'capabilities') or b'{}'
                try:
                    capabilities = json.loads(raw_capabilities.decode('utf-8'))
                except ValueError:
                    capabilities = {}
                server_info = {
                    "server_id": name.split('.')[0],
                    "name": raw_name.decode('utf-8', errors='replace'),
                    "url": f"http://{socket.inet_ntoa(info.addresses[0])}:{info.port}",
                    "capabilities": capabilities,
                    "status": "available"
                }
                discovered_servers.append(server_info)
                logger.info(f"Discovered CUDA MVS server: {server_info['name']} at {server_info['url']}")

            # The base listener is expected to provide these callbacks too;
            # updates and removals are simply ignored during a short scan.
            def update_service(self, zc, type_, name):
                pass

            def remove_service(self, zc, type_, name):
                pass

        zeroconf = None
        try:
            zeroconf = Zeroconf()
            # Keep a reference so the browser stays alive while we wait.
            browser = ServiceBrowser(zeroconf, "_cudamvs._tcp.local.", CUDAMVSListener())
            time.sleep(2)  # Give some time for discovery
            return list(discovered_servers)
        except Exception as e:
            logger.error(f"Error discovering CUDA MVS servers: {e}")
            return []
        finally:
            # Always release the mDNS sockets, even when discovery failed.
            if zeroconf is not None:
                try:
                    zeroconf.close()
                except Exception as e:
                    logger.warning(f"Error closing zeroconf: {e}")

    def test_connection(self, server_url: Optional[str] = None) -> Dict[str, Any]:
        """
        Test connection to a CUDA MVS server.

        Args:
            server_url: URL of the server to test (uses self.server_url if None)

        Returns:
            Dictionary with connection status and server information
        """
        url = server_url or self.server_url
        if not url:
            return {"status": "error", "message": "No server URL provided"}
        try:
            response = self.session.get(
                f"{url}/api/status",
                timeout=self.connection_timeout
            )
            if response.status_code != 200:
                return {
                    "status": "error",
                    "message": f"Server returned status code {response.status_code}",
                    "details": response.text
                }
            return {
                "status": "success",
                "server_info": response.json(),
                "latency_ms": response.elapsed.total_seconds() * 1000
            }
        except requests.exceptions.RequestException as e:
            return {"status": "error", "message": f"Connection error: {str(e)}"}

    def upload_images(self, image_paths: List[str]) -> Dict[str, Any]:
        """
        Upload images to the CUDA MVS server.

        All paths are validated locally before a job is created, so a
        missing file never leaves an orphaned job on the server.

        Args:
            image_paths: List of paths to images to upload

        Returns:
            Dictionary with upload status and job information
        """
        import mimetypes

        if not self.server_url:
            return {"status": "error", "message": "No server URL configured"}
        if not image_paths:
            return {"status": "error", "message": "No images provided"}
        for image_path in image_paths:
            if not os.path.exists(image_path):
                return {
                    "status": "error",
                    "message": f"Image file not found: {image_path}"
                }

        total = len(image_paths)
        try:
            # Create a new job
            response = self.session.post(
                f"{self.server_url}/api/jobs",
                json={"num_images": total},
                timeout=self.connection_timeout
            )
            if response.status_code != 201:
                return {
                    "status": "error",
                    "message": f"Failed to create job: {response.status_code}",
                    "details": response.text
                }
            job_id = response.json().get("job_id")
            if not job_id:
                return {
                    "status": "error",
                    "message": "Server response did not include a job_id",
                    "details": response.text
                }

            # Upload each image
            for i, image_path in enumerate(image_paths):
                filename = os.path.basename(image_path)
                # Guess from the extension so .jpeg / upper-case names are
                # labelled correctly.
                content_type = mimetypes.guess_type(image_path)[0] or "application/octet-stream"
                metadata = {
                    "image_index": i,
                    "total_images": total,
                    "filename": filename
                }
                with open(image_path, "rb") as f:
                    response = self.session.post(
                        f"{self.server_url}/api/jobs/{job_id}/images",
                        files={"file": (filename, f, content_type)},
                        data={"metadata": json.dumps(metadata)},
                        # Bounded connect, unbounded transfer time.
                        timeout=(self.connection_timeout, None)
                    )
                if response.status_code != 200:
                    return {
                        "status": "error",
                        "message": f"Failed to upload image {i+1}/{total}: {response.status_code}",
                        "details": response.text
                    }
                logger.info(f"Uploaded image {i+1}/{total}: {filename}")

            # Start processing
            response = self.session.post(
                f"{self.server_url}/api/jobs/{job_id}/process",
                timeout=self.connection_timeout
            )
            if response.status_code != 202:
                return {
                    "status": "error",
                    "message": f"Failed to start processing: {response.status_code}",
                    "details": response.text
                }
            return {
                "status": "success",
                "job_id": job_id,
                "message": f"Uploaded {total} images and started processing",
                "job_url": f"{self.server_url}/api/jobs/{job_id}"
            }
        except requests.exceptions.RequestException as e:
            return {"status": "error", "message": f"Upload error: {str(e)}"}

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """
        Get the status of a CUDA MVS job.

        Args:
            job_id: ID of the job to check

        Returns:
            Dictionary with job status information
        """
        if not self.server_url:
            return {"status": "error", "message": "No server URL configured"}
        try:
            response = self.session.get(
                f"{self.server_url}/api/jobs/{job_id}",
                timeout=self.connection_timeout
            )
            if response.status_code != 200:
                return {
                    "status": "error",
                    "message": f"Failed to get job status: {response.status_code}",
                    "details": response.text
                }
            return {
                "status": "success",
                "job_info": response.json()
            }
        except requests.exceptions.RequestException as e:
            return {"status": "error", "message": f"Connection error: {str(e)}"}

    def download_model(self, job_id: str, output_format: str = "obj") -> Dict[str, Any]:
        """
        Download a processed 3D model from the CUDA MVS server.

        The model is streamed to a temporary ``.part`` file and renamed
        once complete, so a failed download leaves no truncated model.

        Args:
            job_id: ID of the job to download
            output_format: Format of the model to download (obj, ply, etc.)

        Returns:
            Dictionary with download status and local file path
        """
        if not self.server_url:
            return {"status": "error", "message": "No server URL configured"}

        # Check job status first
        status_result = self.get_job_status(job_id)
        if status_result["status"] != "success":
            return status_result
        job_info = status_result["job_info"]
        if job_info["status"] != "completed":
            return {
                "status": "error",
                "message": f"Job is not completed yet. Current status: {job_info['status']}",
                "job_info": job_info
            }

        # Download the model
        try:
            # The context manager releases the streamed connection back to
            # the pool on every exit path.
            with self.session.get(
                f"{self.server_url}/api/jobs/{job_id}/model",
                params={"format": output_format},
                stream=True,
                # Bounded connect, unbounded transfer time.
                timeout=(self.connection_timeout, None)
            ) as response:
                if response.status_code != 200:
                    return {
                        "status": "error",
                        "message": f"Failed to download model: {response.status_code}",
                        "details": response.text
                    }

                # Create a unique filename
                model_id = job_info.get("model_id", str(uuid.uuid4()))
                # The id comes from the server: strip any directory part so
                # the file cannot land outside output_dir.
                safe_name = os.path.basename(str(model_id)) or str(uuid.uuid4())
                output_path = os.path.join(self.output_dir, f"{safe_name}.{output_format}")
                partial_path = f"{output_path}.part"

                # Save the file
                try:
                    with open(partial_path, "wb") as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                    os.replace(partial_path, output_path)
                finally:
                    # Only still present when the download did not finish.
                    if os.path.exists(partial_path):
                        os.remove(partial_path)

            logger.info(f"Downloaded model to {output_path}")
            return {
                "status": "success",
                "model_id": model_id,
                "local_path": output_path,
                "format": output_format,
                "job_id": job_id
            }
        except requests.exceptions.RequestException as e:
            return {"status": "error", "message": f"Download error: {str(e)}"}

    def cancel_job(self, job_id: str) -> Dict[str, Any]:
        """
        Cancel a running CUDA MVS job.

        Args:
            job_id: ID of the job to cancel

        Returns:
            Dictionary with cancellation status
        """
        if not self.server_url:
            return {"status": "error", "message": "No server URL configured"}
        try:
            response = self.session.delete(
                f"{self.server_url}/api/jobs/{job_id}",
                timeout=self.connection_timeout
            )
            if response.status_code != 200:
                return {
                    "status": "error",
                    "message": f"Failed to cancel job: {response.status_code}",
                    "details": response.text
                }
            return {
                "status": "success",
                "message": "Job cancelled successfully"
            }
        except requests.exceptions.RequestException as e:
            return {"status": "error", "message": f"Connection error: {str(e)}"}

    def generate_model_from_images(
        self,
        image_paths: List[str],
        output_format: str = "obj",
        wait_for_completion: bool = True,
        poll_interval: int = 5,
        max_wait: Optional[float] = None
    ) -> Dict[str, Any]:
        """
        Complete workflow to generate a 3D model from images.

        Args:
            image_paths: List of paths to images
            output_format: Format of the output model
            wait_for_completion: Whether to wait for job completion
            poll_interval: Interval in seconds to poll for job status
            max_wait: Maximum number of seconds to wait for completion;
                None (the default) waits indefinitely

        Returns:
            Dictionary with job status and model information if completed
        """
        import time

        # Upload images and start processing
        upload_result = self.upload_images(image_paths)
        # On failure, or when the caller does not want to wait, the upload
        # result is the answer.
        if upload_result["status"] != "success" or not wait_for_completion:
            return upload_result
        job_id = upload_result["job_id"]

        deadline = None if max_wait is None else time.monotonic() + max_wait

        # Poll for job completion
        while True:
            status_result = self.get_job_status(job_id)
            if status_result["status"] != "success":
                return status_result
            job_info = status_result["job_info"]
            job_state = job_info["status"]
            if job_state == "completed":
                # Download the model
                return self.download_model(job_id, output_format)
            # NOTE(review): "cancelled" is assumed to be a terminal state
            # the server may report after cancel_job(); confirm against the
            # server implementation.
            if job_state in ("failed", "cancelled"):
                return {
                    "status": "error",
                    "message": f"Job processing {job_state}",
                    "job_info": job_info
                }
            # Log the status just fetched, before sleeping.
            logger.info(f"Job {job_id} status: {job_state}, progress: {job_info.get('progress', 0)}%")
            if deadline is not None and time.monotonic() >= deadline:
                return {
                    "status": "error",
                    "message": f"Timed out after {max_wait} seconds waiting for job {job_id}",
                    "job_id": job_id,
                    "job_info": job_info
                }
            # Wait before polling again
            time.sleep(poll_interval)
```
--------------------------------------------------------------------------------
/src/printer_discovery/printer_discovery.py:
--------------------------------------------------------------------------------
```python
import os
import logging
import socket
import json
import time
import threading
from typing import Dict, List, Any, Optional, Callable
logger = logging.getLogger(__name__)
class PrinterDiscovery:
    """
    Discovers 3D printers on the network and provides interfaces for direct printing.

    Discovery runs on a daemon thread started by ``start_discovery`` and
    fills ``self.printers``. The per-vendor discovery methods are currently
    simulations that register one fixed printer each.
    """

    # Seconds between discovery cycles, and the shorter pause after a
    # cycle that raised.
    _CYCLE_INTERVAL = 10
    _ERROR_RETRY_INTERVAL = 5

    def __init__(self):
        """Initialize the printer discovery service."""
        self.printers = {}  # Dictionary of discovered printers, keyed by printer id
        self.discovery_thread = None
        self.discovery_stop_event = threading.Event()
        self.discovery_callback = None

    def start_discovery(self, callback: Optional[Callable[[Dict[str, Any]], None]] = None) -> None:
        """
        Start discovering 3D printers on the network.

        Args:
            callback: Optional callback function to call when a printer is discovered
        """
        if self.discovery_thread and self.discovery_thread.is_alive():
            logger.warning("Printer discovery already running")
            return
        self.discovery_callback = callback
        self.discovery_stop_event.clear()
        self.discovery_thread = threading.Thread(target=self._discover_printers)
        self.discovery_thread.daemon = True
        self.discovery_thread.start()
        logger.info("Started printer discovery")

    def stop_discovery(self) -> None:
        """Stop discovering 3D printers."""
        if not (self.discovery_thread and self.discovery_thread.is_alive()):
            logger.warning("Printer discovery not running")
            return
        self.discovery_stop_event.set()
        self.discovery_thread.join(timeout=2.0)
        if self.discovery_thread.is_alive():
            # Only possible while a discovery cycle or callback is still
            # running; the worker exits as soon as it next checks the event.
            logger.warning("Printer discovery thread did not stop within 2 seconds")
        logger.info("Stopped printer discovery")

    def get_printers(self) -> Dict[str, Any]:
        """
        Get the list of discovered printers.

        Returns:
            Dictionary of printer information
        """
        return self.printers

    def _discover_printers(self) -> None:
        """Discover 3D printers on the network using various protocols."""
        # This is a simplified implementation that simulates printer discovery
        # In a real implementation, you would use protocols like mDNS, SNMP, or OctoPrint API
        while not self.discovery_stop_event.is_set():
            try:
                # Simulate network discovery
                self._discover_octoprint_printers()
                self._discover_prusa_printers()
                self._discover_ultimaker_printers()
                pause = self._CYCLE_INTERVAL
            except Exception as e:
                logger.error(f"Error in printer discovery: {str(e)}")
                pause = self._ERROR_RETRY_INTERVAL
            # Wait on the stop event rather than sleeping so that
            # stop_discovery() wakes the worker immediately; a plain sleep
            # outlives the 2 s join and blocks a later restart.
            self.discovery_stop_event.wait(pause)

    def _register_printer(self, printer_info: Dict[str, Any], label: str) -> None:
        """Record a printer the first time it is seen and notify the callback."""
        printer_id = printer_info["id"]
        if printer_id in self.printers:
            return
        self.printers[printer_id] = printer_info
        if self.discovery_callback:
            self.discovery_callback(printer_info)
        logger.info(f"Discovered {label} printer: {printer_info['name']}")

    def _discover_octoprint_printers(self) -> None:
        """Discover OctoPrint servers on the network."""
        # Simulate discovering OctoPrint servers
        # In a real implementation, you would use mDNS to discover OctoPrint instances
        self._register_printer({
            "id": "octoprint_1",
            "name": "OctoPrint Printer",
            "type": "octoprint",
            "address": "192.168.1.100",
            "port": 80,
            "api_key": None,  # Would need to be provided by user
            "status": "online",
            "capabilities": ["print", "status", "cancel"]
        }, "OctoPrint")

    def _discover_prusa_printers(self) -> None:
        """Discover Prusa printers on the network."""
        # Simulate discovering Prusa printers
        self._register_printer({
            "id": "prusa_1",
            "name": "Prusa MK3S",
            "type": "prusa",
            "address": "192.168.1.101",
            "port": 80,
            "status": "online",
            "capabilities": ["print", "status"]
        }, "Prusa")

    def _discover_ultimaker_printers(self) -> None:
        """Discover Ultimaker printers on the network."""
        # Simulate discovering Ultimaker printers
        self._register_printer({
            "id": "ultimaker_1",
            "name": "Ultimaker S5",
            "type": "ultimaker",
            "address": "192.168.1.102",
            "port": 80,
            "status": "online",
            "capabilities": ["print", "status", "cancel"]
        }, "Ultimaker")
class PrinterInterface:
    """
    Interface for communicating with 3D printers.

    Holds one client object per connected printer and forwards print,
    status and cancel requests to it.
    """

    def __init__(self, printer_discovery: PrinterDiscovery):
        """
        Initialize the printer interface.

        Args:
            printer_discovery: Instance of PrinterDiscovery for finding printers
        """
        self.printer_discovery = printer_discovery
        self.connected_printers = {}  # Dictionary of connected printers

    def _is_connected(self, printer_id: str) -> bool:
        """Return whether *printer_id* has a client, logging an error if not."""
        if printer_id in self.connected_printers:
            return True
        logger.error(f"Not connected to printer: {printer_id}")
        return False

    def connect_to_printer(self, printer_id: str, credentials: Optional[Dict[str, Any]] = None) -> bool:
        """
        Connect to a specific printer.

        Args:
            printer_id: ID of the printer to connect to
            credentials: Optional credentials for authentication

        Returns:
            True if connection successful, False otherwise
        """
        discovered = self.printer_discovery.get_printers()
        if printer_id not in discovered:
            logger.error(f"Printer not found: {printer_id}")
            return False
        printer_info = discovered[printer_id]

        # Pick the client class that matches the printer type
        client_classes = {
            "octoprint": OctoPrintClient,
            "prusa": PrusaClient,
            "ultimaker": UltimakerClient,
        }
        client_class = client_classes.get(printer_info["type"])
        if client_class is None:
            logger.error(f"Unsupported printer type: {printer_info['type']}")
            return False

        # Only a client that connected successfully is remembered
        client = client_class(printer_info, credentials)
        if not client.connect():
            return False
        self.connected_printers[printer_id] = client
        return True

    def disconnect_from_printer(self, printer_id: str) -> bool:
        """
        Disconnect from a specific printer.

        Args:
            printer_id: ID of the printer to disconnect from

        Returns:
            True if disconnection successful, False otherwise
        """
        if not self._is_connected(printer_id):
            return False
        if not self.connected_printers[printer_id].disconnect():
            return False
        del self.connected_printers[printer_id]
        return True

    def print_file(self, printer_id: str, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool:
        """
        Send a file to a printer for printing.

        Args:
            printer_id: ID of the printer to print on
            file_path: Path to the STL file to print
            print_settings: Optional print settings

        Returns:
            True if print job started successfully, False otherwise
        """
        if not self._is_connected(printer_id):
            return False
        return self.connected_printers[printer_id].print_file(file_path, print_settings)

    def get_printer_status(self, printer_id: str) -> Dict[str, Any]:
        """
        Get the status of a specific printer.

        Args:
            printer_id: ID of the printer to get status for

        Returns:
            Dictionary with printer status information
        """
        if not self._is_connected(printer_id):
            return {"error": "Not connected to printer"}
        return self.connected_printers[printer_id].get_status()

    def cancel_print(self, printer_id: str) -> bool:
        """
        Cancel a print job on a specific printer.

        Args:
            printer_id: ID of the printer to cancel print on

        Returns:
            True if cancellation successful, False otherwise
        """
        if not self._is_connected(printer_id):
            return False
        return self.connected_printers[printer_id].cancel_print()
class PrinterClient:
    """Base class for printer clients."""

    def __init__(self, printer_info: Dict[str, Any], credentials: Optional[Dict[str, Any]] = None):
        """
        Initialize the printer client.

        Args:
            printer_info: Information about the printer
            credentials: Optional credentials for authentication
        """
        self.printer_info = printer_info
        # A missing (or empty) credentials mapping becomes a fresh dict
        self.credentials = credentials if credentials else {}
        self.connected = False

    def connect(self) -> bool:
        """
        Connect to the printer.

        Returns:
            True if connection successful, False otherwise
        """
        # Default behaviour: just flag the client as connected.
        # Subclasses are expected to override this.
        self.connected = True
        return True

    def disconnect(self) -> bool:
        """
        Disconnect from the printer.

        Returns:
            True if disconnection successful, False otherwise
        """
        # Default behaviour: just clear the connected flag.
        # Subclasses are expected to override this.
        self.connected = False
        return True

    def print_file(self, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool:
        """
        Send a file to the printer for printing.

        Args:
            file_path: Path to the STL file to print
            print_settings: Optional print settings

        Returns:
            True if print job started successfully, False otherwise
        """
        # Default behaviour only logs; subclasses are expected to override.
        if self.connected:
            logger.info(f"Printing file: {file_path}")
            return True
        logger.error("Not connected to printer")
        return False

    def get_status(self) -> Dict[str, Any]:
        """
        Get the status of the printer.

        Returns:
            Dictionary with printer status information
        """
        # Default behaviour reports the connection flag only.
        state = "connected" if self.connected else "disconnected"
        return {"status": state}

    def cancel_print(self) -> bool:
        """
        Cancel the current print job.

        Returns:
            True if cancellation successful, False otherwise
        """
        # Default behaviour only logs; subclasses are expected to override.
        if self.connected:
            logger.info("Cancelling print job")
            return True
        logger.error("Not connected to printer")
        return False
class OctoPrintClient(PrinterClient):
    """
    Client for OctoPrint printers.

    All operations are currently simulated: nothing is sent over the
    network. Connecting requires a non-empty ``api_key`` in the credentials.
    """

    def connect(self) -> bool:
        """
        Connect to an OctoPrint server.

        Returns:
            True if connection successful, False if the API key is missing
            or empty, or if an error occurred
        """
        try:
            # In a real implementation, you would use the OctoPrint API
            # to connect to the printer

            # A key that is present but None or empty is as unusable as a
            # missing one, so test the value rather than the dict key.
            if not self.credentials.get("api_key"):
                logger.error("API key required for OctoPrint")
                return False

            # Simulate connection
            logger.info(f"Connected to OctoPrint server: {self.printer_info['address']}")
            self.connected = True
            return True
        except Exception as e:
            logger.error(f"Error connecting to OctoPrint server: {str(e)}")
            return False

    def print_file(self, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool:
        """
        Send a file to an OctoPrint server for printing.

        Args:
            file_path: Path to the file to print
            print_settings: Optional print settings (unused by the simulation)

        Returns:
            True if print job started successfully, False otherwise
        """
        if not self.connected:
            logger.error("Not connected to OctoPrint server")
            return False
        try:
            # In a real implementation, you would use the OctoPrint API
            # to upload the file and start printing

            # A directory exists but cannot be printed, so require a
            # regular file.
            if not os.path.isfile(file_path):
                logger.error(f"File not found: {file_path}")
                return False

            # Simulate printing
            logger.info(f"Printing file on OctoPrint server: {file_path}")
            return True
        except Exception as e:
            logger.error(f"Error printing file on OctoPrint server: {str(e)}")
            return False

    def get_status(self) -> Dict[str, Any]:
        """
        Get the status of an OctoPrint server.

        Returns:
            ``{"status": "disconnected"}`` when not connected, otherwise a
            fixed simulated status with printer and job details
        """
        if not self.connected:
            return {"status": "disconnected"}
        try:
            # In a real implementation, you would use the OctoPrint API
            # to get the printer status

            # Simulate status
            return {
                "status": "connected",
                "printer": {
                    "state": "operational",
                    "temperature": {
                        "bed": {"actual": 60.0, "target": 60.0},
                        "tool0": {"actual": 210.0, "target": 210.0}
                    }
                },
                "job": {
                    "file": {"name": "example.gcode"},
                    "progress": {"completion": 0.0, "printTime": 0, "printTimeLeft": 0}
                }
            }
        except Exception as e:
            # Cannot trigger for the simulated payload; kept as the error
            # path for a real API call.
            logger.error(f"Error getting status from OctoPrint server: {str(e)}")
            return {"status": "error", "message": str(e)}
class PrusaClient(PrinterClient):
    """Client for Prusa printers."""

    def connect(self) -> bool:
        """
        Connect to a Prusa printer.

        The connection is simulated: the printer address is logged and the
        client is flagged as connected.

        Returns:
            True if connection successful, False if an error occurred
        """
        try:
            # A real implementation would talk to the Prusa API here
            address = self.printer_info['address']
            logger.info(f"Connected to Prusa printer: {address}")
            self.connected = True
        except Exception as e:
            logger.error(f"Error connecting to Prusa printer: {str(e)}")
            return False
        return True
class UltimakerClient(PrinterClient):
    """Simulated client for Ultimaker printers."""

    def connect(self) -> bool:
        """Record a (simulated) connection to the Ultimaker printer.

        Returns:
            True once the connection was logged and recorded, False if
            anything raised along the way (the error is logged).
        """
        succeeded = False
        try:
            # A real implementation would talk to the Ultimaker API here.
            logger.info(f"Connected to Ultimaker printer: {self.printer_info['address']}")
            self.connected = True
            succeeded = True
        except Exception as e:
            logger.error(f"Error connecting to Ultimaker printer: {str(e)}")
        return succeeded
```
--------------------------------------------------------------------------------
/src/nlp/parameter_extractor.py:
--------------------------------------------------------------------------------
```python
import re
import logging
from typing import Dict, Any, Tuple, List, Optional
import json
logger = logging.getLogger(__name__)


class ParameterExtractor:
    """
    Extract parameters from natural language descriptions.

    A shape type and its numeric parameters are recognised with regular
    expressions, and a per-user dialog state is kept so that missing
    parameters can be collected over several turns.  Every number is taken
    to be in millimetres: a unit suffix such as "cm" or "in" is tolerated by
    the patterns but is not converted.
    """

    # A non-negative integer or decimal number (captured).
    _NUMBER = r'(\d+(?:\.\d+)?)'
    # Optional unit suffix between a number and its keyword (not captured).
    _OPTIONAL_UNIT = r'\s*(?:mm|cm|m|in|inch|inches|ft|foot|feet)?\s*'
    # Phrases that turn a "centered" mention into its opposite.
    _CENTER_NEGATION = r"\b(?:not|never|don'?t|non|off)[\s-]*(?:centered|center)\b"

    # Default values applied per model type when a parameter was not given.
    _DEFAULTS = {
        'cube': {'width': 10, 'depth': 10, 'height': 10, 'center': 'false'},
        'sphere': {'radius': 10, 'segments': 32},
        'cylinder': {'radius': 10, 'height': 20, 'center': 'false', 'segments': 32},
        'box': {'width': 30, 'depth': 20, 'height': 15, 'thickness': 2},
        'rounded_box': {'width': 30, 'depth': 20, 'height': 15, 'radius': 3, 'segments': 32},
        'cone': {'base_radius': 10, 'height': 20, 'center': 'false', 'segments': 32},
        'torus': {'major_radius': 20, 'minor_radius': 5, 'segments': 32},
        'prism': {'width': 20, 'height': 15, 'depth': 20, 'center': 'false'},
        'custom': {'width': 20, 'height': 20, 'depth': 20, 'center': 'false'}
    }

    # Parameters that must be known before a model type can be generated.
    _REQUIRED = {
        'cube': ('width', 'depth', 'height'),
        'sphere': ('radius',),
        'cylinder': ('radius', 'height'),
        'box': ('width', 'depth', 'height', 'thickness'),
        'rounded_box': ('width', 'depth', 'height', 'radius'),
        'cone': ('base_radius', 'height'),
        'torus': ('major_radius', 'minor_radius'),
        'prism': ('width', 'height', 'depth'),
        'custom': ('width', 'height', 'depth')
    }

    def __init__(self):
        """Initialize the parameter extractor."""
        # Using only millimeters as per project requirements
        self.unit_conversions = {
            'mm': 1.0
        }
        # Shape recognition patterns with expanded vocabulary
        self.shape_patterns = {
            'cube': r'\b(cube|box|square|rectangular|block|cuboid|brick)\b',
            'sphere': r'\b(sphere|ball|round|circular|globe|orb)\b',
            'cylinder': r'\b(cylinder|tube|pipe|rod|circular column|pillar|column)\b',
            'box': r'\b(hollow box|container|case|enclosure|bin|chest|tray)\b',
            'rounded_box': r'\b(rounded box|rounded container|rounded case|rounded enclosure|smooth box|rounded corners|chamfered box)\b',
            'cone': r'\b(cone|pyramid|tapered cylinder|funnel)\b',
            'torus': r'\b(torus|donut|ring|loop|circular ring)\b',
            'prism': r'\b(prism|triangular prism|wedge|triangular shape)\b',
            'custom': r'\b(custom|complex|special|unique|combined|composite)\b'
        }

        number = self._NUMBER
        unit = self._OPTIONAL_UNIT

        def dimension(keywords: str) -> str:
            # The trailing \b keeps one-letter keywords such as "t" or "d"
            # from matching the first letter of "tall" or "diameter".
            return number + unit + r'(?:' + keywords + r')\b'

        # Parameter recognition patterns with enhanced unit detection
        self.parameter_patterns = {
            'width': dimension('wide|width|across|w'),
            'height': dimension('high|height|tall|h'),
            'depth': dimension('deep|depth|long|d|length'),
            'radius': dimension('radius|r'),
            'diameter': dimension('diameter|dia'),
            'thickness': dimension('thick|thickness|t'),
            'segments': r'(\d+)\s*(?:segments|sides|faces|facets|smoothness)',
            'center': r'\b(centered|center|middle|origin)\b',
            'angle': r'(\d+(?:\.\d+)?)\s*(?:deg|degree|degrees|°)?\s*(?:angle|rotation|rotate|tilt)',
            # \b stops "10x20" from being read as a scale factor of 10.
            'scale': number + r'\s*(?:x|times|scale|scaling|factor)\b',
            'resolution': r'(\d+(?:\.\d+)?)\s*(?:resolution|quality|detail)'
        }
        # Dialog state for multi-turn conversations
        self.dialog_state = {}

    def extract_parameters(self, description: str, model_type: Optional[str] = None,
                           existing_parameters: Optional[Dict[str, Any]] = None) -> Tuple[str, Dict[str, Any]]:
        """
        Extract model type and parameters from a natural language description.

        Args:
            description: Natural language description of the 3D object
            model_type: Optional model type for context (if already known)
            existing_parameters: Optional existing parameters for context (for modifications)

        Returns:
            Tuple of (model_type, parameters).  The parameters are the existing
            ones, overridden by what the description states, completed with
            the defaults of the model type.  The input dictionary is not
            modified.
        """
        if model_type is None:
            model_type = self._determine_shape_type(description)
        # Work on a copy so the caller's dictionary stays untouched.
        parameters = existing_parameters.copy() if existing_parameters else {}
        parameters.update(self._extract_shape_parameters(description, model_type))
        parameters = self._apply_default_parameters(model_type, parameters)
        logger.info(f"Extracted model type: {model_type}, parameters: {parameters}")
        return model_type, parameters

    def extract_parameters_from_modifications(self, modifications: str, model_type: Optional[str] = None,
                                              existing_parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Extract parameters from modification description with contextual understanding.

        Uses the same extraction as extract_parameters (including the
        diameter-to-radius conversion) but applies no defaults.

        Args:
            modifications: Description of modifications to make
            model_type: Optional model type for context
            existing_parameters: Optional existing parameters for context

        Returns:
            Copy of the existing parameters updated with the extracted ones
        """
        parameters = existing_parameters.copy() if existing_parameters else {}
        new_parameters = self._match_parameters(modifications)
        parameters.update(new_parameters)
        if model_type and not new_parameters:
            # Nothing explicit was found; inferring values from context is
            # not implemented, so the existing parameters are kept as they are.
            logger.info(f"No explicit parameters found in '{modifications}', using existing parameters")
        logger.info(f"Extracted modification parameters: {parameters}")
        return parameters

    def get_missing_parameters(self, model_type: str, parameters: Dict[str, Any]) -> List[str]:
        """
        Determine which required parameters are missing for a given model type.

        Args:
            model_type: Type of model
            parameters: Currently extracted parameters

        Returns:
            List of missing parameter names
        """
        return [name for name in self._get_required_parameters(model_type) if name not in parameters]

    def update_dialog_state(self, user_id: str, model_type: Optional[str] = None,
                            parameters: Optional[Dict[str, Any]] = None) -> None:
        """
        Update the dialog state for a user, creating it on first use.

        Args:
            user_id: Unique identifier for the user
            model_type: Optional model type to update
            parameters: Optional parameters to update
        """
        state = self.dialog_state.setdefault(user_id, {
            'model_type': None,
            'parameters': {},
            'missing_parameters': [],
            'current_question': None
        })
        if model_type:
            state['model_type'] = model_type
        if parameters:
            state['parameters'].update(parameters)
        # The missing list can only be computed once the model type is known.
        if state['model_type']:
            state['missing_parameters'] = self.get_missing_parameters(
                state['model_type'], state['parameters']
            )

    def get_next_question(self, user_id: str) -> Optional[str]:
        """
        Get the next question to ask the user based on missing parameters.

        Args:
            user_id: Unique identifier for the user

        Returns:
            Question string or None if all parameters are collected
        """
        if user_id not in self.dialog_state:
            return "What kind of 3D object would you like to create?"
        state = self.dialog_state[user_id]
        if not state['model_type']:
            question = "What kind of 3D object would you like to create?"
        elif state['missing_parameters']:
            # Ask for the missing parameters one at a time, in order.
            question = self._get_parameter_question(state['missing_parameters'][0], state['model_type'])
        else:
            question = None
        state['current_question'] = question
        return question

    def process_answer(self, user_id: str, answer: str) -> Dict[str, Any]:
        """
        Process a user's answer to a question.

        The answer is read as the model type while none is known, otherwise
        as the value of the first missing parameter.

        Args:
            user_id: Unique identifier for the user
            answer: User's answer to the current question

        Returns:
            Updated dialog state
        """
        if user_id not in self.dialog_state:
            self.update_dialog_state(user_id)
        state = self.dialog_state[user_id]
        if not state['model_type']:
            self.update_dialog_state(user_id, model_type=self._determine_shape_type(answer))
        elif state['missing_parameters']:
            param = state['missing_parameters'][0]
            value = self._extract_parameter_value(param, answer)
            if value is not None:
                self.update_dialog_state(user_id, parameters={param: value})
        return self.dialog_state[user_id]

    def _determine_shape_type(self, description: str) -> str:
        """
        Determine the shape type from the description.

        Every shape pattern is tried and the match made of the most words
        wins, so a phrase such as "rounded box" beats the single keyword
        "box" it contains.  Ties go to the shape listed first in
        ``self.shape_patterns``.  Falls back to a few context keywords and
        finally to "cube".
        """
        best_shape, best_pattern, best_words = None, None, 0
        for shape, pattern in self.shape_patterns.items():
            for match in re.finditer(pattern, description, re.IGNORECASE):
                words = len(match.group(0).split())
                if words > best_words:
                    best_shape, best_pattern, best_words = shape, pattern, words
        if best_shape is not None:
            logger.info(f"Detected shape type: {best_shape} from pattern: {best_pattern}")
            return best_shape
        # Try to infer shape from context if no explicit mention
        if re.search(r'\b(round|circular|sphere|ball)\b', description, re.IGNORECASE):
            return "sphere"
        if re.search(r'\b(tall|column|pillar|rod)\b', description, re.IGNORECASE):
            return "cylinder"
        if re.search(r'\b(box|container|case|enclosure)\b', description, re.IGNORECASE):
            if re.search(r'\b(rounded|smooth|chamfered)\b', description, re.IGNORECASE):
                return "rounded_box"
            return "box"
        logger.info("No specific shape detected, defaulting to cube")
        return "cube"

    def _extract_shape_parameters(self, description: str, model_type: str) -> Dict[str, Any]:
        """Extract parameters for a specific shape type.

        ``model_type`` is accepted for interface compatibility; the same
        patterns are applied to every shape.
        """
        return self._match_parameters(description)

    def _match_parameters(self, text: str) -> Dict[str, Any]:
        """Apply every parameter pattern to *text* and collect the values.

        The last match of each pattern wins.  'center' becomes the string
        'true' (or 'false' when the mention is negated); a diameter without
        an explicit radius is converted to a radius.
        """
        found: Dict[str, Any] = {}
        for name, pattern in self.parameter_patterns.items():
            matches = re.findall(pattern, text, re.IGNORECASE)
            if not matches:
                continue
            if name == 'center':
                # The pattern captures a word, not a number: the mention
                # itself is the value.
                negated = re.search(self._CENTER_NEGATION, text, re.IGNORECASE)
                found[name] = 'false' if negated else 'true'
                continue
            value = matches[-1]
            if isinstance(value, tuple):
                value = value[0]  # Extract from capture group
            found[name] = self._convert_to_mm(value, text)
        if 'diameter' in found and 'radius' not in found:
            found['radius'] = found.pop('diameter') / 2
        return found

    def _convert_to_mm(self, value_str: str, context: str) -> float:
        """
        Convert a value to millimeters.

        As per project requirements only millimeters are used, so the number
        is returned as-is; ``context`` is kept for interface compatibility.

        Returns:
            The value as float, or 0.0 if it cannot be parsed (a warning is logged).
        """
        try:
            value = float(value_str)
        except ValueError:
            logger.warning(f"Could not convert value to float: {value_str}")
            return 0.0
        logger.info(f"Using value {value} in millimeters")
        return value

    def _apply_default_parameters(self, model_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Fill in the defaults of the model type for parameters not yet set.

        Modifies and returns ``parameters``; unknown model types add nothing.
        """
        for name, default_value in self._DEFAULTS.get(model_type, {}).items():
            parameters.setdefault(name, default_value)
        return parameters

    def _get_required_parameters(self, model_type: str) -> List[str]:
        """Get the list of required parameters for a model type (empty if unknown)."""
        return list(self._REQUIRED.get(model_type, ()))

    def _get_parameter_question(self, param: str, model_type: str) -> str:
        """Get a question to ask for a specific parameter."""
        questions = {
            'width': f"What should be the width of the {model_type} in mm?",
            'depth': f"What should be the depth of the {model_type} in mm?",
            'height': f"What should be the height of the {model_type} in mm?",
            'radius': f"What should be the radius of the {model_type} in mm?",
            'thickness': f"What should be the wall thickness of the {model_type} in mm?",
            'segments': f"How many segments should the {model_type} have for smoothness?",
            'base_radius': f"What should be the base radius of the {model_type} in mm?",
            'major_radius': f"What should be the major radius of the {model_type} in mm?",
            'minor_radius': f"What should be the minor radius of the {model_type} in mm?",
            'diameter': f"What should be the diameter of the {model_type} in mm?",
            'angle': f"What should be the angle of the {model_type} in degrees?",
            'scale': f"What should be the scale factor for the {model_type}?",
            'resolution': f"What resolution should the {model_type} have (higher means more detailed)?",
            'center': f"Should the {model_type} be centered? (yes/no)"
        }
        return questions.get(param, f"What should be the {param} of the {model_type}?")

    def _extract_parameter_value(self, param: str, answer: str) -> Optional[float]:
        """Extract a parameter value from an answer.

        The parameter's own pattern is tried first, then any bare number;
        in both cases the last match is used.

        Returns:
            The value as float, or None if the answer contains no number.
        """
        for pattern in (self.parameter_patterns.get(param), self._NUMBER):
            if not pattern:
                continue
            matches = re.findall(pattern, answer, re.IGNORECASE)
            if matches:
                value = matches[-1]
                if isinstance(value, tuple):
                    value = value[0]  # Extract from capture group
                return self._convert_to_mm(value, answer)
        return None
```
--------------------------------------------------------------------------------
/src/remote/connection_manager.py:
--------------------------------------------------------------------------------
```python
"""
Connection manager for remote CUDA Multi-View Stereo processing.
This module provides functionality to discover, connect to, and manage
connections with remote CUDA MVS servers within the LAN.
"""
import os
import json
import logging
import time
import threading
from typing import Dict, List, Optional, Any, Union, Callable
import socket
from zeroconf import ServiceBrowser, ServiceListener, Zeroconf
from src.remote.cuda_mvs_client import CUDAMVSClient
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class CUDAMVSConnectionManager:
    """
    Connection manager for remote CUDA MVS servers.

    This class:
    1. Discovers CUDA MVS servers on the LAN (zeroconf browsing or a manual scan)
    2. Keeps one client per known server
    3. Picks a server for new work (currently the first one marked available)
    4. Re-checks every known server periodically in a background thread
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        discovery_port: int = 8765,
        connection_timeout: int = 10,
        health_check_interval: int = 60,
        auto_discover: bool = True
    ):
        """
        Initialize the connection manager and start the health check thread.

        Args:
            api_key: API key for authentication (if required)
            discovery_port: Port used for server discovery
            connection_timeout: Timeout for server connections in seconds
            health_check_interval: Interval for health checks in seconds
            auto_discover: Whether to automatically discover servers on startup
        """
        self.api_key = api_key
        self.discovery_port = discovery_port
        self.connection_timeout = connection_timeout
        self.health_check_interval = health_check_interval
        # Server tracking; both dictionaries are keyed by server id and
        # guarded by server_lock.
        self.servers: Dict[str, Dict[str, Any]] = {}
        self.clients: Dict[str, "CUDAMVSClient"] = {}
        self.server_lock = threading.RLock()
        # Health check thread
        self.health_check_thread = None
        self.health_check_stop_event = threading.Event()
        # Discovery
        self.zeroconf = None
        self.browser = None
        if auto_discover:
            self.start_discovery()
        self.start_health_check()

    def start_discovery(self):
        """
        Start discovering CUDA MVS servers on the LAN.

        Does nothing if discovery is already running; failures are logged.
        """
        if self.zeroconf is not None:
            return
        try:
            self.zeroconf = Zeroconf()
            listener = CUDAMVSServiceListener(self)
            self.browser = ServiceBrowser(self.zeroconf, "_cudamvs._tcp.local.", listener)
            logger.info("Started CUDA MVS server discovery")
        except Exception as e:
            logger.error(f"Error starting discovery: {e}")

    def stop_discovery(self):
        """
        Stop discovering CUDA MVS servers.

        The zeroconf handles are dropped even if closing fails, so that
        discovery can be started again afterwards.
        """
        if self.zeroconf is None:
            return
        try:
            self.zeroconf.close()
            logger.info("Stopped CUDA MVS server discovery")
        except Exception as e:
            logger.error(f"Error stopping discovery: {e}")
        finally:
            self.zeroconf = None
            self.browser = None

    def start_health_check(self):
        """
        Start the health check thread (no-op if it is already running).
        """
        if self.health_check_thread is not None and self.health_check_thread.is_alive():
            return
        self.health_check_stop_event.clear()
        self.health_check_thread = threading.Thread(
            target=self._health_check_loop,
            daemon=True
        )
        self.health_check_thread.start()
        logger.info("Started health check thread")

    def stop_health_check(self):
        """
        Stop the health check thread, waiting up to 5 seconds for it to exit.
        """
        if self.health_check_thread is not None:
            self.health_check_stop_event.set()
            self.health_check_thread.join(timeout=5)
            self.health_check_thread = None
            logger.info("Stopped health check thread")

    def _health_check_loop(self):
        """
        Health check loop that runs in a separate thread.
        """
        while not self.health_check_stop_event.is_set():
            try:
                self.check_all_servers()
            except Exception as e:
                logger.error(f"Error in health check: {e}")
            # Wait for the next check interval or until stopped
            self.health_check_stop_event.wait(self.health_check_interval)

    def _create_client(self, server_info: Dict[str, Any]) -> "CUDAMVSClient":
        """Build a client for the URL given in *server_info*."""
        return CUDAMVSClient(
            server_url=server_info.get("url"),
            api_key=self.api_key,
            connection_timeout=self.connection_timeout
        )

    def _server_name(self, server_id: str) -> Optional[str]:
        """Return the name of a server, or None if it is no longer known."""
        server = self.get_server(server_id)
        return server.get("name") if server else None

    def add_server(self, server_info: Dict[str, Any]):
        """
        Add a server to the manager, or update it if it is already known.

        When a known server is reported again with the same URL and the
        placeholder status "unknown", the status from the last health check
        is kept.  When its URL changed, a new client is created for it.

        Args:
            server_info: Dictionary with server information; must contain "server_id"
        """
        server_id = server_info.get("server_id")
        if not server_id:
            logger.error("Cannot add server without server_id")
            return
        with self.server_lock:
            existing = self.servers.get(server_id)
            if existing is None:
                self.servers[server_id] = server_info
                self.clients[server_id] = self._create_client(server_info)
                logger.info(f"Added server: {server_info.get('name')} at {server_info.get('url')}")
                return
            url_changed = "url" in server_info and server_info["url"] != existing.get("url")
            changes = dict(server_info)
            if not url_changed and changes.get("status") == "unknown" and "status" in existing:
                # A repeated announcement says nothing about health; do not
                # hide an available server until the next health check.
                del changes["status"]
            existing.update(changes)
            if url_changed:
                # The old client still points at the previous address.
                self.clients[server_id] = self._create_client(existing)
            logger.info(f"Updated server: {server_info.get('name')} at {server_info.get('url')}")

    def remove_server(self, server_id: str):
        """
        Remove a server and its client from the manager.

        Args:
            server_id: ID of the server to remove
        """
        with self.server_lock:
            if server_id in self.servers:
                server_info = self.servers.pop(server_id)
                self.clients.pop(server_id, None)
                logger.info(f"Removed server: {server_info.get('name')} at {server_info.get('url')}")

    def get_servers(self) -> List[Dict[str, Any]]:
        """
        Get a list of all servers.

        Returns:
            List of dictionaries with server information
        """
        with self.server_lock:
            return list(self.servers.values())

    def get_server(self, server_id: str) -> Optional[Dict[str, Any]]:
        """
        Get information about a specific server.

        Args:
            server_id: ID of the server

        Returns:
            Dictionary with server information or None if not found
        """
        with self.server_lock:
            return self.servers.get(server_id)

    def get_client(self, server_id: str) -> Optional["CUDAMVSClient"]:
        """
        Get the client for a specific server.

        Args:
            server_id: ID of the server

        Returns:
            CUDAMVSClient instance or None if not found
        """
        with self.server_lock:
            return self.clients.get(server_id)

    def get_best_server(self) -> Optional[str]:
        """
        Get the ID of the server to use for new work.

        Returns:
            ID of the first server whose status is "available", or None if
            there is none
        """
        with self.server_lock:
            # For now the first available server wins; load, capabilities
            # and latency are not taken into account.
            for server_id, server in self.servers.items():
                if server.get("status") == "available":
                    return server_id
            return None

    def check_server(self, server_id: str) -> Dict[str, Any]:
        """
        Check the status of a specific server and record the outcome.

        An exception raised by the client is logged and reported as an error
        result, so one broken server cannot interrupt the check of the others.

        Args:
            server_id: ID of the server to check

        Returns:
            Dictionary with server status information
        """
        client = self.get_client(server_id)
        if not client:
            return {"status": "error", "message": f"Server {server_id} not found"}
        # The connection test runs outside the lock: it may block on the network.
        try:
            result = client.test_connection()
        except Exception as e:
            logger.error(f"Error checking server {server_id}: {e}")
            result = {"status": "error", "message": str(e)}
        with self.server_lock:
            server = self.servers.get(server_id)
            if server is not None:
                server["last_check"] = time.time()
                if result.get("status") == "success":
                    server["status"] = "available"
                    server["latency_ms"] = result.get("latency_ms")
                    # Drop the error of an earlier failed check.
                    server.pop("error", None)
                    info = result.get("server_info")
                    if isinstance(info, dict) and "capabilities" in info:
                        server["capabilities"] = info["capabilities"]
                else:
                    server["status"] = "unavailable"
                    server["error"] = result.get("message")
        return result

    def check_all_servers(self) -> Dict[str, Dict[str, Any]]:
        """
        Check the status of all servers.

        Returns:
            Dictionary mapping server IDs to status information
        """
        with self.server_lock:
            server_ids = list(self.servers.keys())
        return {server_id: self.check_server(server_id) for server_id in server_ids}

    def discover_servers(self) -> List[Dict[str, Any]]:
        """
        Manually discover CUDA MVS servers on the LAN.

        Returns:
            List of dictionaries containing server information
        """
        # A temporary client is only used for its discovery call.
        client = CUDAMVSClient(
            api_key=self.api_key,
            discovery_port=self.discovery_port,
            connection_timeout=self.connection_timeout
        )
        discovered_servers = client.discover_servers()
        for server_info in discovered_servers:
            self.add_server(server_info)
        return discovered_servers

    def upload_images_to_best_server(self, image_paths: List[str]) -> Dict[str, Any]:
        """
        Upload images to the best available server.

        Args:
            image_paths: List of paths to images to upload

        Returns:
            Dictionary with upload status and job information, extended with
            "server_id" and "server_name"
        """
        server_id = self.get_best_server()
        if not server_id:
            return {"status": "error", "message": "No available servers"}
        client = self.get_client(server_id)
        if not client:
            return {"status": "error", "message": f"Client for server {server_id} not found"}
        result = client.upload_images(image_paths)
        result["server_id"] = server_id
        # The server may have been removed while the upload was running.
        result["server_name"] = self._server_name(server_id)
        return result

    def generate_model_from_images(
        self,
        image_paths: List[str],
        output_format: str = "obj",
        wait_for_completion: bool = True,
        poll_interval: int = 5,
        server_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Generate a 3D model from images using the best available server.

        Args:
            image_paths: List of paths to images
            output_format: Format of the output model
            wait_for_completion: Whether to wait for job completion
            poll_interval: Interval in seconds to poll for job status
            server_id: ID of the server to use (uses best server if None)

        Returns:
            Dictionary with job status and model information if completed,
            extended with "server_id" and "server_name"
        """
        if server_id is None:
            server_id = self.get_best_server()
        if not server_id:
            return {"status": "error", "message": "No available servers"}
        client = self.get_client(server_id)
        if not client:
            return {"status": "error", "message": f"Client for server {server_id} not found"}
        result = client.generate_model_from_images(
            image_paths=image_paths,
            output_format=output_format,
            wait_for_completion=wait_for_completion,
            poll_interval=poll_interval
        )
        result["server_id"] = server_id
        # The server may have been removed while the job was running; the
        # result must not be lost because of that.
        result["server_name"] = self._server_name(server_id)
        return result

    def get_job_status(self, job_id: str, server_id: str) -> Dict[str, Any]:
        """
        Get the status of a job on a specific server.

        Args:
            job_id: ID of the job to check
            server_id: ID of the server

        Returns:
            Dictionary with job status information
        """
        client = self.get_client(server_id)
        if not client:
            return {"status": "error", "message": f"Client for server {server_id} not found"}
        return client.get_job_status(job_id)

    def download_model(self, job_id: str, server_id: str, output_format: str = "obj") -> Dict[str, Any]:
        """
        Download a processed 3D model from a specific server.

        Args:
            job_id: ID of the job to download
            server_id: ID of the server
            output_format: Format of the model to download

        Returns:
            Dictionary with download status and local file path
        """
        client = self.get_client(server_id)
        if not client:
            return {"status": "error", "message": f"Client for server {server_id} not found"}
        return client.download_model(job_id, output_format)

    def cancel_job(self, job_id: str, server_id: str) -> Dict[str, Any]:
        """
        Cancel a running job on a specific server.

        Args:
            job_id: ID of the job to cancel
            server_id: ID of the server

        Returns:
            Dictionary with cancellation status
        """
        client = self.get_client(server_id)
        if not client:
            return {"status": "error", "message": f"Client for server {server_id} not found"}
        return client.cancel_job(job_id)

    def cleanup(self):
        """
        Clean up resources: stop the health check thread and the discovery.
        """
        self.stop_health_check()
        self.stop_discovery()
class CUDAMVSServiceListener(ServiceListener):
    """
    Zeroconf service listener for CUDA MVS servers.

    Translates zeroconf add/update/remove notifications into add_server and
    remove_server calls on a connection manager.
    """

    def __init__(self, connection_manager: CUDAMVSConnectionManager):
        """
        Initialize the service listener.

        Args:
            connection_manager: Connection manager to update with discovered servers
        """
        self.connection_manager = connection_manager

    @staticmethod
    def _decode_property(raw: Any, default: str) -> str:
        """Return a TXT record value as text, or *default* if it has no value."""
        if raw is None:
            return default
        if isinstance(raw, bytes):
            # Undecodable bytes must not make the whole server disappear.
            return raw.decode('utf-8', errors='replace')
        return str(raw)

    def add_service(self, zc: Zeroconf, type_: str, name: str):
        """
        Called when a service is discovered; registers it with the manager.

        The server id is the first dot-separated label of the service name.
        Services without a resolvable address are ignored.  Any error is
        logged instead of being raised into the zeroconf thread.

        Args:
            zc: Zeroconf instance
            type_: Service type
            name: Service name
        """
        try:
            info = zc.get_service_info(type_, name)
            if not info:
                return
            addresses = info.parsed_addresses()
            if not addresses:
                return
            host = addresses[0]
            if ":" in host:
                # IPv6 literals must be bracketed inside a URL.
                host = f"[{host}]"
            properties = info.properties or {}
            # Capabilities are optional: a broken value is logged and skipped.
            capabilities = {}
            raw_capabilities = properties.get(b'capabilities')
            if raw_capabilities:
                try:
                    capabilities = json.loads(raw_capabilities)
                except (ValueError, TypeError) as e:
                    logger.warning(f"Ignoring invalid capabilities for {name}: {e}")
            server_info = {
                "server_id": name.split('.')[0],
                "name": self._decode_property(properties.get(b'name'), "Unknown"),
                "url": f"http://{host}:{info.port}",
                "capabilities": capabilities,
                "status": "unknown",
                "discovered_at": time.time()
            }
            self.connection_manager.add_server(server_info)
        except Exception as e:
            logger.error(f"Error processing discovered service: {e}")

    def remove_service(self, zc: Zeroconf, type_: str, name: str):
        """
        Called when a service is removed; unregisters it from the manager.

        Args:
            zc: Zeroconf instance
            type_: Service type
            name: Service name
        """
        try:
            self.connection_manager.remove_server(name.split('.')[0])
        except Exception as e:
            logger.error(f"Error removing service: {e}")

    def update_service(self, zc: Zeroconf, type_: str, name: str):
        """
        Called when a service is updated; handled like a new discovery.

        Args:
            zc: Zeroconf instance
            type_: Service type
            name: Service name
        """
        self.add_service(zc, type_, name)
```
--------------------------------------------------------------------------------
/src/visualization/web_interface.py:
--------------------------------------------------------------------------------
```python
import os
import base64
import logging
from typing import Dict, Any, List, Optional
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
logger = logging.getLogger(__name__)
class WebInterface:
"""
Web interface for displaying model previews and managing 3D models.
"""
def __init__(self, app, static_dir: str, templates_dir: str, output_dir: str):
    """
    Set up the web interface on the given application.

    Creates the static and template directories if needed, mounts the static
    files under "/static", registers the routes and finally writes the
    template and static files.

    Args:
        app: FastAPI application
        static_dir: Directory for static files
        templates_dir: Directory for templates
        output_dir: Directory containing output files (STL, PNG)
    """
    self.app = app
    self.static_dir, self.templates_dir, self.output_dir = static_dir, templates_dir, output_dir
    # Previews and STL files live in fixed sub-directories of the output directory.
    self.preview_dir = os.path.join(output_dir, "preview")
    self.stl_dir = os.path.join(output_dir, "stl")
    # Both directories must exist before they are mounted or used for templates.
    for directory in (self.static_dir, self.templates_dir):
        os.makedirs(directory, exist_ok=True)
    self.router = APIRouter(prefix="/ui", tags=["UI"])
    self.app.mount("/static", StaticFiles(directory=static_dir), name="static")
    self.templates = Jinja2Templates(directory=templates_dir)
    self._register_routes()
    self._create_template_files()
    self._create_static_files()
def _register_routes(self):
    """Register routes for the web interface.

    HTML pages are registered on ``self.router`` (prefix "/ui"), the file
    endpoints directly on the application under "/api".  Identifiers taken
    from the URL are only accepted when they are plain file names, so a
    request cannot address files outside the preview and STL directories.
    """

    def find_file(directory: str, identifier: str, extension: str) -> Optional[str]:
        """Return the path of ``identifier + extension`` in *directory*, or None."""
        # Reject anything carrying a directory part (e.g. "..\\x" where
        # backslash is a path separator) before touching the filesystem.
        if os.path.basename(identifier) != identifier:
            return None
        candidate = os.path.join(directory, f"{identifier}{extension}")
        return candidate if os.path.exists(candidate) else None

    def read_base64(path: str) -> str:
        """Return the content of *path* as base64 text."""
        with open(path, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")

    # Home page
    @self.router.get("/", response_class=HTMLResponse)
    async def home(request: Request):
        return self.templates.TemplateResponse("index.html", {"request": request})

    # Model preview page
    @self.router.get("/preview/{model_id}", response_class=HTMLResponse)
    async def preview(request: Request, model_id: str):
        if find_file(self.preview_dir, model_id, ".png") is None:
            raise HTTPException(status_code=404, detail="Preview not found")
        # Get multi-angle previews if they exist
        previews = {}
        for angle in ("front", "top", "right", "perspective"):
            if find_file(self.preview_dir, f"{model_id}_{angle}", ".png"):
                previews[angle] = f"/api/preview/{model_id}_{angle}"
        # If no multi-angle previews, use the main preview
        if not previews:
            previews["main"] = f"/api/preview/{model_id}"
        stl_url = f"/api/stl/{model_id}" if find_file(self.stl_dir, model_id, ".stl") else None
        return self.templates.TemplateResponse(
            "preview.html",
            {
                "request": request,
                "model_id": model_id,
                "previews": previews,
                "stl_url": stl_url
            }
        )

    # List all models
    @self.router.get("/models", response_class=HTMLResponse)
    async def list_models(request: Request):
        # Every STL file in the STL directory is one model.
        stl_files = []
        if os.path.exists(self.stl_dir):
            stl_files = [f for f in os.listdir(self.stl_dir) if f.endswith(".stl")]
        models = []
        for stl_name in stl_files:
            model_id = os.path.splitext(stl_name)[0]
            preview_file = os.path.join(self.preview_dir, f"{model_id}.png")
            models.append({
                "id": model_id,
                "preview_url": f"/api/preview/{model_id}" if os.path.exists(preview_file) else None,
                "stl_url": f"/api/stl/{model_id}"
            })
        return self.templates.TemplateResponse(
            "models.html",
            {"request": request, "models": models}
        )

    # API endpoints for serving files
    # Serve preview image
    @self.app.get("/api/preview/{preview_id}")
    async def get_preview(preview_id: str):
        preview_file = find_file(self.preview_dir, preview_id, ".png")
        if preview_file is None:
            raise HTTPException(status_code=404, detail="Preview not found")
        return {
            "content": read_base64(preview_file),
            "content_type": "image/png"
        }

    # Serve STL file
    @self.app.get("/api/stl/{model_id}")
    async def get_stl(model_id: str):
        stl_file = find_file(self.stl_dir, model_id, ".stl")
        if stl_file is None:
            raise HTTPException(status_code=404, detail="STL file not found")
        return {
            "content": read_base64(stl_file),
            "content_type": "application/octet-stream",
            "filename": f"{model_id}.stl"
        }

    # Register the router with the app
    self.app.include_router(self.router)
def _create_template_files(self):
    """Write the Jinja2 templates used by the web interface.

    Every call (re)writes ``base.html``, ``index.html``, ``preview.html`` and
    ``models.html`` into ``self.templates_dir``, creating the directory first
    if necessary.
    """
    # Template file name -> template source, in the order they are written
    page_sources = {
        "base.html": """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{% block title %}OpenSCAD MCP Server{% endblock %}</title>
<link rel="stylesheet" href="/static/styles.css">
</head>
<body>
<header>
<h1>OpenSCAD MCP Server</h1>
<nav>
<ul>
<li><a href="/ui/">Home</a></li>
<li><a href="/ui/models">Models</a></li>
</ul>
</nav>
</header>
<main>
{% block content %}{% endblock %}
</main>
<footer>
<p>OpenSCAD MCP Server - Model Context Protocol Implementation</p>
</footer>
<script src="/static/script.js"></script>
</body>
</html>
""",
        "index.html": """{% extends "base.html" %}
{% block title %}OpenSCAD MCP Server - Home{% endblock %}
{% block content %}
<section class="hero">
<h2>Welcome to OpenSCAD MCP Server</h2>
<p>A Model Context Protocol server for generating 3D models with OpenSCAD</p>
</section>
<section class="features">
<div class="feature">
<h3>Natural Language Processing</h3>
<p>Describe 3D objects in natural language and get parametric models</p>
</div>
<div class="feature">
<h3>Preview Generation</h3>
<p>See your models from multiple angles before exporting</p>
</div>
<div class="feature">
<h3>STL Export</h3>
<p>Generate STL files ready for 3D printing</p>
</div>
</section>
{% endblock %}
""",
        "preview.html": """{% extends "base.html" %}
{% block title %}Model Preview - {{ model_id }}{% endblock %}
{% block content %}
<section class="model-preview">
<h2>Model Preview: {{ model_id }}</h2>
<div class="preview-container">
{% for angle, url in previews.items() %}
<div class="preview-angle">
<h3>{{ angle|title }} View</h3>
<img src="{{ url }}" alt="{{ angle }} view of {{ model_id }}" class="preview-image" data-angle="{{ angle }}">
</div>
{% endfor %}
</div>
{% if stl_url %}
<div class="download-container">
<a href="{{ stl_url }}" class="download-button" download="{{ model_id }}.stl">Download STL</a>
</div>
{% endif %}
</section>
{% endblock %}
""",
        "models.html": """{% extends "base.html" %}
{% block title %}All Models{% endblock %}
{% block content %}
<section class="models-list">
<h2>All Models</h2>
{% if models %}
<div class="models-grid">
{% for model in models %}
<div class="model-card">
<h3>{{ model.id }}</h3>
{% if model.preview_url %}
<img src="{{ model.preview_url }}" alt="Preview of {{ model.id }}" class="model-thumbnail">
{% else %}
<div class="no-preview">No preview available</div>
{% endif %}
<div class="model-actions">
<a href="/ui/preview/{{ model.id }}" class="view-button">View</a>
<a href="{{ model.stl_url }}" class="download-button" download="{{ model.id }}.stl">Download</a>
</div>
</div>
{% endfor %}
</div>
{% else %}
<p>No models found.</p>
{% endif %}
</section>
{% endblock %}
""",
    }

    os.makedirs(self.templates_dir, exist_ok=True)
    for file_name, source in page_sources.items():
        target = os.path.join(self.templates_dir, file_name)
        with open(target, "w") as out:
            out.write(source)
def _create_static_files(self):
    """Write the static assets used by the web interface.

    Every call (re)writes ``styles.css``, ``script.js`` and
    ``placeholder.svg`` into ``self.static_dir``, creating the directory
    first if necessary. Files are written as UTF-8 because the script
    contains a non-ASCII character and the pages declare ``charset="utf-8"``.
    """
    # Stylesheet shared by all pages
    css = """/* Base styles */
* {
box-sizing: border-box;
margin: 0;
padding: 0;
}
body {
font-family: Arial, sans-serif;
line-height: 1.6;
color: #333;
background-color: #f4f4f4;
}
header {
background-color: #333;
color: #fff;
padding: 1rem;
}
header h1 {
margin-bottom: 0.5rem;
}
nav ul {
display: flex;
list-style: none;
}
nav ul li {
margin-right: 1rem;
}
nav ul li a {
color: #fff;
text-decoration: none;
}
nav ul li a:hover {
text-decoration: underline;
}
main {
max-width: 1200px;
margin: 0 auto;
padding: 2rem;
}
footer {
background-color: #333;
color: #fff;
text-align: center;
padding: 1rem;
margin-top: 2rem;
}
/* Home page */
.hero {
text-align: center;
margin-bottom: 2rem;
}
.hero h2 {
font-size: 2rem;
margin-bottom: 1rem;
}
.features {
display: flex;
justify-content: space-between;
flex-wrap: wrap;
}
.feature {
flex: 1;
background-color: #fff;
padding: 1.5rem;
margin: 0.5rem;
border-radius: 5px;
box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);
}
.feature h3 {
margin-bottom: 1rem;
}
/* Preview page */
.model-preview h2 {
margin-bottom: 1.5rem;
}
.preview-container {
display: flex;
flex-wrap: wrap;
gap: 1rem;
margin-bottom: 1.5rem;
}
.preview-angle {
flex: 1;
min-width: 300px;
background-color: #fff;
padding: 1rem;
border-radius: 5px;
box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);
}
.preview-angle h3 {
margin-bottom: 0.5rem;
}
.preview-image {
width: 100%;
height: auto;
border: 1px solid #ddd;
}
.download-container {
text-align: center;
margin-top: 1rem;
}
.download-button {
display: inline-block;
background-color: #4CAF50;
color: white;
padding: 0.5rem 1rem;
text-decoration: none;
border-radius: 4px;
}
.download-button:hover {
background-color: #45a049;
}
/* Models page */
.models-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(250px, 1fr));
gap: 1.5rem;
}
.model-card {
background-color: #fff;
border-radius: 5px;
box-shadow: 0 2px 5px rgba(0, 0, 0, 0.1);
overflow: hidden;
}
.model-card h3 {
padding: 1rem;
background-color: #f8f8f8;
border-bottom: 1px solid #eee;
}
.model-thumbnail {
width: 100%;
height: 200px;
object-fit: contain;
background-color: #f4f4f4;
}
.no-preview {
width: 100%;
height: 200px;
display: flex;
align-items: center;
justify-content: center;
background-color: #f4f4f4;
color: #999;
}
.model-actions {
display: flex;
padding: 1rem;
}
.view-button, .download-button {
flex: 1;
text-align: center;
padding: 0.5rem;
text-decoration: none;
border-radius: 4px;
margin: 0 0.25rem;
}
.view-button {
background-color: #2196F3;
color: white;
}
.view-button:hover {
background-color: #0b7dda;
}
"""
    # Client-side behaviour. The image fallback must reference the file that
    # is actually written below (placeholder.svg) and must detach itself so
    # that a failing fallback cannot re-trigger the handler endlessly.
    js = """// JavaScript for OpenSCAD MCP Server web interface
document.addEventListener('DOMContentLoaded', function() {
// Handle image loading errors
const images = document.querySelectorAll('img');
images.forEach(img => {
img.onerror = function() {
// Detach the handler first so a failing placeholder cannot loop forever
this.onerror = null;
this.src = '/static/placeholder.svg';
};
});
// Handle STL download
const downloadButtons = document.querySelectorAll('.download-button');
downloadButtons.forEach(button => {
button.addEventListener('click', async function(e) {
e.preventDefault();
const url = this.getAttribute('href');
const filename = this.hasAttribute('download') ? this.getAttribute('download') : 'model.stl';
try {
const response = await fetch(url);
const data = await response.json();
// Decode base64 content
const content = atob(data.content);
// Convert to Blob
const bytes = new Uint8Array(content.length);
for (let i = 0; i < content.length; i++) {
bytes[i] = content.charCodeAt(i);
}
const blob = new Blob([bytes], { type: data.content_type });
// Create download link
const downloadLink = document.createElement('a');
downloadLink.href = URL.createObjectURL(blob);
downloadLink.download = data.filename || filename;
// Trigger download
document.body.appendChild(downloadLink);
downloadLink.click();
document.body.removeChild(downloadLink);
} catch (error) {
console.error('Error downloading file:', error);
alert('Error downloading file. Please try again.');
}
});
});
// Handle preview images
const previewImages = document.querySelectorAll('.preview-image');
previewImages.forEach(img => {
img.addEventListener('click', function() {
const url = this.getAttribute('src');
const angle = this.getAttribute('data-angle');
// Create modal for larger view
const modal = document.createElement('div');
modal.className = 'preview-modal';
modal.innerHTML = `
<div class="modal-content">
<span class="close-button">×</span>
<h3>${angle ? angle.charAt(0).toUpperCase() + angle.slice(1) + ' View' : 'Preview'}</h3>
<img src="${url}" alt="Preview">
</div>
`;
// Add modal styles
modal.style.position = 'fixed';
modal.style.top = '0';
modal.style.left = '0';
modal.style.width = '100%';
modal.style.height = '100%';
modal.style.backgroundColor = 'rgba(0,0,0,0.7)';
modal.style.display = 'flex';
modal.style.alignItems = 'center';
modal.style.justifyContent = 'center';
modal.style.zIndex = '1000';
const modalContent = modal.querySelector('.modal-content');
modalContent.style.backgroundColor = '#fff';
modalContent.style.padding = '20px';
modalContent.style.borderRadius = '5px';
modalContent.style.maxWidth = '90%';
modalContent.style.maxHeight = '90%';
modalContent.style.overflow = 'auto';
const closeButton = modal.querySelector('.close-button');
closeButton.style.float = 'right';
closeButton.style.fontSize = '1.5rem';
closeButton.style.fontWeight = 'bold';
closeButton.style.cursor = 'pointer';
const modalImg = modal.querySelector('img');
modalImg.style.maxWidth = '100%';
modalImg.style.maxHeight = '70vh';
modalImg.style.display = 'block';
modalImg.style.margin = '0 auto';
// Add modal to body
document.body.appendChild(modal);
// Close modal when clicking close button or outside the modal
closeButton.addEventListener('click', function() {
document.body.removeChild(modal);
});
modal.addEventListener('click', function(e) {
if (e.target === modal) {
document.body.removeChild(modal);
}
});
});
});
});
"""
    # Fallback image shown when a preview cannot be loaded
    placeholder_svg = """<svg xmlns="http://www.w3.org/2000/svg" width="800" height="600" viewBox="0 0 800 600">
<rect width="800" height="600" fill="#f0f0f0"/>
<text x="400" y="300" font-family="Arial" font-size="24" text-anchor="middle" fill="#999">Preview not available</text>
</svg>"""

    os.makedirs(self.static_dir, exist_ok=True)
    assets = (
        ("styles.css", css),
        ("script.js", js),
        ("placeholder.svg", placeholder_svg),
    )
    for file_name, content in assets:
        # Explicit encoding: the platform default may not be UTF-8
        with open(os.path.join(self.static_dir, file_name), "w", encoding="utf-8") as f:
            f.write(content)
```
--------------------------------------------------------------------------------
/src/remote/cuda_mvs_server.py:
--------------------------------------------------------------------------------
```python
"""
Server for remote CUDA Multi-View Stereo processing.
This module provides a server implementation that can be deployed on a machine
with CUDA capabilities to process multi-view images into 3D models remotely.
"""
import argparse
import asyncio
import json
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import time
import uuid
from pathlib import Path
from typing import Dict, List, Optional, Any, Union

import uvicorn
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, BackgroundTasks, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from fastapi.security import APIKeyHeader
from pydantic import BaseModel, Field
from zeroconf import ServiceInfo, Zeroconf
# Logging: INFO-level basic configuration plus a logger named after this module
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Defaults for the network endpoint
DEFAULT_HOST = "0.0.0.0"
DEFAULT_PORT = 8765

# Defaults for the reconstruction tool location and the job output root
DEFAULT_CUDA_MVS_PATH = "/opt/cuda-multi-view-stereo"
DEFAULT_OUTPUT_DIR = "output"

# Default job limits
DEFAULT_MAX_JOBS = 5
DEFAULT_MAX_IMAGES_PER_JOB = 50
DEFAULT_JOB_TIMEOUT = 60 * 60  # seconds (1 hour)
# Models
class JobStatus(BaseModel):
    """State record for one reconstruction job, as stored in ``CUDAMVSServer.jobs``."""
    job_id: str
    # Lifecycle state: created, uploading, processing, completed, failed,
    # or cancelled (set by the job DELETE endpoint)
    status: str = "created"
    # Timestamps from time.time() (seconds since the epoch)
    created_at: float = Field(default_factory=time.time)
    updated_at: float = Field(default_factory=time.time)
    # Number of images announced at job creation / received so far
    num_images: int = 0
    images_uploaded: int = 0
    # Progress value parsed from the tool's "Progress:" output; set to 100.0 on completion
    progress: float = 0.0
    # str() of the exception when the job ends in the "failed" state
    error_message: Optional[str] = None
    # Set when processing starts, derived from the first 8 characters of the job ID
    model_id: Optional[str] = None
    # Result locations, filled in as processing produces them
    output_dir: Optional[str] = None
    point_cloud_file: Optional[str] = None
    obj_file: Optional[str] = None
    # Wall-clock processing duration in seconds
    processing_time: Optional[float] = None
class ServerConfig(BaseModel):
    """Settings that control a ``CUDAMVSServer`` instance."""
    # Installation root; the executable is looked up under <path>/build/
    cuda_mvs_path: str = DEFAULT_CUDA_MVS_PATH
    # Root directory under which one sub-directory per job is created
    output_dir: str = DEFAULT_OUTPUT_DIR
    # Maximum number of jobs in the created/uploading/processing states
    max_jobs: int = DEFAULT_MAX_JOBS
    # Upper bound accepted for a job's announced image count
    max_images_per_job: int = DEFAULT_MAX_IMAGES_PER_JOB
    # Reported in the server capabilities (seconds)
    job_timeout: int = DEFAULT_JOB_TIMEOUT
    # When unset (or empty), the API key check is skipped
    api_key: Optional[str] = None
    server_name: str = "CUDA MVS Server"
    # Whether the server registers itself via Zeroconf on construction
    advertise_service: bool = True
    # Filled in by CUDAMVSServer.detect_gpu_info() when not provided
    gpu_info: Optional[str] = None
class JobRequest(BaseModel):
    """Request body for creating a job (``POST /api/jobs``)."""
    # Number of images the client intends to upload; validated in
    # CUDAMVSServer.create_job against the configured per-job maximum
    num_images: int
class ProcessRequest(BaseModel):
    """Request body for starting processing (``POST /api/jobs/{job_id}/process``)."""
    # low, normal, or high; any other value is processed with the "normal" settings
    reconstruction_quality: str = "normal"
    # Requested result formats; "obj" triggers a PLY-to-OBJ conversion
    # when no OBJ file exists yet
    output_formats: List[str] = ["obj", "ply"]
class ServerInfo(BaseModel):
    """Server information model.

    The field names match the keys of the dictionary built by
    ``CUDAMVSServer.get_server_info``.
    """
    server_id: str
    name: str
    version: str = "1.0.0"
    status: str = "running"
    # Limits, supported formats and GPU description
    capabilities: Dict[str, Any]
    # Job counters (total / active / completed / failed)
    jobs: Dict[str, Any]
    # Seconds since the server object was created
    uptime: float
# Server implementation
class CUDAMVSServer:
"""
Server for remote CUDA Multi-View Stereo processing.
This server:
1. Accepts image uploads
2. Processes images using CUDA MVS
3. Provides 3D model downloads
4. Advertises itself on the local network
"""
def __init__(self, config: ServerConfig):
    """
    Build the FastAPI application and wire up the server.

    Creates the output directory, installs the CORS middleware, detects the
    GPU description, registers the API routes and, if enabled in the
    configuration, advertises the service on the network.

    Args:
        config: Server configuration
    """
    # Identity and bookkeeping
    self.config = config
    self.server_id = str(uuid.uuid4())
    self.start_time = time.time()
    self.jobs: Dict[str, JobStatus] = {}
    self.zeroconf = None  # set by advertise_service() once registered

    self.app = FastAPI(
        title="CUDA MVS Server",
        description="Remote CUDA Multi-View Stereo processing server",
    )

    # Root directory for all job data
    os.makedirs(config.output_dir, exist_ok=True)

    # CORS is configured to accept every origin, method and header
    cors_options = {
        "allow_origins": ["*"],
        "allow_credentials": True,
        "allow_methods": ["*"],
        "allow_headers": ["*"],
    }
    self.app.add_middleware(CORSMiddleware, **cors_options)

    # GPU detection runs before advertising because the advertisement
    # includes the GPU description
    self.detect_gpu_info()
    self.register_routes()
    if config.advertise_service:
        self.advertise_service()
def detect_gpu_info(self):
    """Fill in ``self.config.gpu_info`` unless it is already set.

    ``nvidia-smi`` is queried first. If it is missing or fails, the VGA
    entries reported by ``lspci`` are used instead. When neither tool yields
    anything, the value becomes ``"Unknown GPU"``.
    """
    if self.config.gpu_info:
        return

    try:
        result = subprocess.run(
            ["nvidia-smi", "--query-gpu=name,memory.total", "--format=csv,noheader"],
            capture_output=True,
            text=True,
            check=True,
        )
    except (subprocess.SubprocessError, OSError):
        # nvidia-smi is missing or returned an error: try lspci below
        pass
    else:
        self.config.gpu_info = result.stdout.strip()
        return

    # A pipeline cannot be expressed as an argument list, so the
    # "grep -i vga" filtering is done here in Python instead of in a shell.
    vga_lines = []
    try:
        result = subprocess.run(["lspci"], capture_output=True, text=True)
    except (subprocess.SubprocessError, OSError):
        pass
    else:
        vga_lines = [
            line.strip()
            for line in result.stdout.splitlines()
            if "vga" in line.lower()
        ]
    self.config.gpu_info = "\n".join(vga_lines) or "Unknown GPU"
def register_routes(self):
    """Register API routes on ``self.app``.

    Endpoints:
        GET    /api/status                 server information (no API key check)
        POST   /api/jobs                   create a job
        GET    /api/jobs                   list jobs
        GET    /api/jobs/{job_id}          job status
        DELETE /api/jobs/{job_id}          cancel a job and remove its directory
        POST   /api/jobs/{job_id}/images   upload one image
        POST   /api/jobs/{job_id}/process  start background processing
        GET    /api/jobs/{job_id}/model    download the result (obj or ply)

    All job endpoints depend on ``verify_api_key``, which only enforces a key
    when ``self.config.api_key`` is set.
    """
    import hmac

    # Authentication dependency
    api_key_header = APIKeyHeader(name="Authorization", auto_error=False)

    async def verify_api_key(api_key: str = Depends(api_key_header)):
        if self.config.api_key:
            if not api_key:
                raise HTTPException(status_code=401, detail="API key required")
            # Accept both a bare key and the "Bearer <key>" form
            if api_key.startswith("Bearer "):
                api_key = api_key[7:]
            # Constant-time comparison so response timing does not reveal
            # how much of a guessed key is correct
            if not hmac.compare_digest(
                api_key.encode("utf-8"),
                self.config.api_key.encode("utf-8"),
            ):
                raise HTTPException(status_code=401, detail="Invalid API key")
        return True

    # Status endpoint
    @self.app.get("/api/status")
    async def get_status():
        return self.get_server_info()

    # Job management endpoints
    @self.app.post("/api/jobs", status_code=201, dependencies=[Depends(verify_api_key)])
    async def create_job(job_request: JobRequest):
        return self.create_job(job_request.num_images)

    @self.app.get("/api/jobs", dependencies=[Depends(verify_api_key)])
    async def list_jobs():
        return {"jobs": self.jobs}

    @self.app.get("/api/jobs/{job_id}", dependencies=[Depends(verify_api_key)])
    async def get_job(job_id: str):
        if job_id not in self.jobs:
            raise HTTPException(status_code=404, detail="Job not found")
        return self.jobs[job_id]

    @self.app.delete("/api/jobs/{job_id}", dependencies=[Depends(verify_api_key)])
    async def cancel_job(job_id: str):
        if job_id not in self.jobs:
            raise HTTPException(status_code=404, detail="Job not found")
        job = self.jobs[job_id]
        # Only jobs that have not finished can be cancelled
        if job.status in ["created", "uploading", "processing"]:
            job.status = "cancelled"
            job.updated_at = time.time()
            # Clean up job directory
            job_dir = os.path.join(self.config.output_dir, job_id)
            if os.path.exists(job_dir):
                shutil.rmtree(job_dir)
            return {"status": "success", "message": "Job cancelled"}
        else:
            return {"status": "error", "message": f"Cannot cancel job in {job.status} state"}

    # Image upload endpoint
    @self.app.post("/api/jobs/{job_id}/images", dependencies=[Depends(verify_api_key)])
    async def upload_image(
        job_id: str,
        file: UploadFile = File(...),
        metadata: str = Form(None)
    ):
        if job_id not in self.jobs:
            raise HTTPException(status_code=404, detail="Job not found")
        job = self.jobs[job_id]
        # Check if job is in a valid state for uploads
        if job.status not in ["created", "uploading"]:
            raise HTTPException(
                status_code=400,
                detail=f"Cannot upload images to job in {job.status} state"
            )
        # Check if we've reached the maximum number of images
        if job.images_uploaded >= job.num_images:
            raise HTTPException(
                status_code=400,
                detail=f"Maximum number of images ({job.num_images}) already uploaded"
            )
        # Update job status
        job.status = "uploading"
        job.updated_at = time.time()
        # Create job directory if it doesn't exist
        job_dir = os.path.join(self.config.output_dir, job_id, "images")
        os.makedirs(job_dir, exist_ok=True)
        # Parse metadata (currently only validated; a malformed value is logged)
        image_metadata = {}
        if metadata:
            try:
                image_metadata = json.loads(metadata)
            except json.JSONDecodeError:
                logger.warning(f"Invalid metadata format: {metadata}")
        # Save the file under a server-chosen name; only the extension of the
        # client-supplied name is kept. The upload may carry no file name.
        image_index = job.images_uploaded
        file_extension = os.path.splitext(file.filename or "")[1]
        image_path = os.path.join(job_dir, f"image_{image_index:04d}{file_extension}")
        with open(image_path, "wb") as f:
            shutil.copyfileobj(file.file, f)
        # Update job status
        job.images_uploaded += 1
        job.updated_at = time.time()
        return {
            "status": "success",
            "job_id": job_id,
            "image_index": image_index,
            "image_path": image_path,
            "images_uploaded": job.images_uploaded,
            "total_images": job.num_images
        }

    # Process job endpoint
    @self.app.post("/api/jobs/{job_id}/process", status_code=202, dependencies=[Depends(verify_api_key)])
    async def process_job(
        job_id: str,
        process_request: ProcessRequest,
        background_tasks: BackgroundTasks
    ):
        if job_id not in self.jobs:
            raise HTTPException(status_code=404, detail="Job not found")
        job = self.jobs[job_id]
        # Check if job is in a valid state for processing
        if job.status != "uploading":
            raise HTTPException(
                status_code=400,
                detail=f"Cannot process job in {job.status} state"
            )
        # Check if all images have been uploaded
        if job.images_uploaded < job.num_images:
            raise HTTPException(
                status_code=400,
                detail=f"Not all images uploaded ({job.images_uploaded}/{job.num_images})"
            )
        # Update job status
        job.status = "processing"
        job.updated_at = time.time()
        job.progress = 0.0
        # Start processing in the background
        background_tasks.add_task(
            self.process_job_task,
            job_id,
            process_request.reconstruction_quality,
            process_request.output_formats
        )
        return {
            "status": "success",
            "job_id": job_id,
            "message": "Processing started"
        }

    # Model download endpoint
    @self.app.get("/api/jobs/{job_id}/model", dependencies=[Depends(verify_api_key)])
    async def download_model(job_id: str, format: str = "obj"):
        if job_id not in self.jobs:
            raise HTTPException(status_code=404, detail="Job not found")
        job = self.jobs[job_id]
        # Check if job is completed
        if job.status != "completed":
            raise HTTPException(
                status_code=400,
                detail=f"Job is not completed. Current status: {job.status}"
            )
        # Check if the requested format is available
        if format == "obj" and job.obj_file:
            return FileResponse(job.obj_file, filename=f"{job.model_id}.obj")
        elif format == "ply" and job.point_cloud_file:
            return FileResponse(job.point_cloud_file, filename=f"{job.model_id}.ply")
        else:
            raise HTTPException(
                status_code=404,
                detail=f"Model in {format} format not available"
            )
def create_job(self, num_images: int) -> Dict[str, Any]:
    """
    Register a new job and create its directory.

    Args:
        num_images: Number of images to be uploaded

    Returns:
        Dictionary with job information

    Raises:
        HTTPException: 429 when the active-job limit is reached, 400 when
            ``num_images`` is outside ``1..max_images_per_job``
    """
    # Jobs in these states count against the concurrency limit
    live_states = ("created", "uploading", "processing")
    active_jobs = len([j for j in self.jobs.values() if j.status in live_states])
    if active_jobs >= self.config.max_jobs:
        raise HTTPException(
            status_code=429,
            detail=f"Maximum number of active jobs ({self.config.max_jobs}) reached"
        )

    if not (0 < num_images <= self.config.max_images_per_job):
        raise HTTPException(
            status_code=400,
            detail=f"Number of images must be between 1 and {self.config.max_images_per_job}"
        )

    # Record the job under a fresh random ID
    job_id = str(uuid.uuid4())
    new_job = JobStatus(job_id=job_id, num_images=num_images)
    self.jobs[job_id] = new_job

    # Each job gets its own directory below the output root
    os.makedirs(os.path.join(self.config.output_dir, job_id), exist_ok=True)
    return new_job.dict()
async def process_job_task(
    self,
    job_id: str,
    reconstruction_quality: str,
    output_formats: List[str]
):
    """
    Process a job in the background.

    Runs the reconstruction, converts the point cloud to OBJ when "obj" was
    requested and no OBJ file exists yet, and records the outcome on the job.
    Failures are stored on the job (status "failed" plus ``error_message``)
    rather than raised. A job whose status has become "cancelled" is never
    overwritten with "processing", "completed" or "failed".

    Args:
        job_id: ID of the job to process
        reconstruction_quality: Quality of the reconstruction (low, normal, high)
        output_formats: List of output formats to generate
    """
    if job_id not in self.jobs:
        logger.error(f"Job {job_id} not found")
        return
    job = self.jobs[job_id]

    # The job may have been cancelled between scheduling and execution
    if job.status == "cancelled":
        logger.info(f"Job {job_id} was cancelled before processing started")
        return

    job_dir = os.path.join(self.config.output_dir, job_id)
    images_dir = os.path.join(job_dir, "images")
    output_dir = os.path.join(job_dir, "output")
    os.makedirs(output_dir, exist_ok=True)

    # Generate a model ID from the first 8 characters of the job ID
    model_id = f"model_{job_id[:8]}"
    job.model_id = model_id

    job.status = "processing"
    job.progress = 0.0
    job.updated_at = time.time()
    try:
        start_time = time.time()
        await self.run_cuda_mvs(
            job,
            images_dir,
            output_dir,
            model_id,
            reconstruction_quality
        )
        # Convert PLY to OBJ if an OBJ was requested and is not there yet
        if "obj" in output_formats and not job.obj_file:
            ply_file = job.point_cloud_file
            if ply_file and os.path.exists(ply_file):
                obj_file = os.path.join(output_dir, f"{model_id}.obj")
                await self.convert_ply_to_obj(ply_file, obj_file)
                job.obj_file = obj_file

        # Cancelled while running: keep the cancelled state
        if job.status == "cancelled":
            logger.info(f"Job {job_id} was cancelled during processing")
            return

        job.processing_time = time.time() - start_time
        job.status = "completed"
        job.progress = 100.0
        job.updated_at = time.time()
        job.output_dir = output_dir
        logger.info(f"Job {job_id} completed successfully in {job.processing_time:.2f} seconds")
    except Exception as e:
        # Errors after a cancellation (e.g. the job directory was removed)
        # must not turn the job into a failed one
        if job.status == "cancelled":
            logger.info(f"Job {job_id} stopped after cancellation: {e}")
            return
        job.status = "failed"
        job.error_message = str(e)
        job.updated_at = time.time()
        logger.error(f"Job {job_id} failed: {e}")
async def run_cuda_mvs(
    self,
    job: JobStatus,
    images_dir: str,
    output_dir: str,
    model_id: str,
    reconstruction_quality: str
):
    """
    Run CUDA MVS on the uploaded images.

    The tool is started as an asyncio subprocess so that waiting for its
    output does not block the event loop. Standard output is scanned line by
    line for "Progress:" markers, which update ``job.progress``; standard
    error is drained concurrently and used for the failure message.

    Args:
        job: Job status object
        images_dir: Directory containing the images
        output_dir: Directory to save the output
        model_id: ID of the model
        reconstruction_quality: Quality of the reconstruction

    Raises:
        FileNotFoundError: The executable is missing, or the tool finished
            without producing the expected point cloud file
        ValueError: No .jpg/.jpeg/.png files were found in ``images_dir``
        RuntimeError: The tool exited with a non-zero return code
    """
    # Check if CUDA MVS is installed
    cuda_mvs_executable = os.path.join(self.config.cuda_mvs_path, "build", "app_patch_match_mvs")
    if not os.path.exists(cuda_mvs_executable):
        raise FileNotFoundError(f"CUDA MVS executable not found at {cuda_mvs_executable}")

    # Collect the image paths, sorted to ensure a consistent order
    image_files = sorted(
        os.path.join(images_dir, name)
        for name in os.listdir(images_dir)
        if name.lower().endswith((".jpg", ".jpeg", ".png"))
    )
    if not image_files:
        raise ValueError("No valid image files found")

    # Create a camera parameter file
    camera_params_file = os.path.join(output_dir, "cameras.txt")
    await self.generate_camera_params(image_files, camera_params_file)

    # quality -> (num_iterations, max_resolution); unknown values use "normal"
    quality_presets = {
        "low": (3, 1024),
        "normal": (5, 2048),
        "high": (7, 4096),
    }
    num_iterations, max_resolution = quality_presets.get(
        reconstruction_quality, quality_presets["normal"]
    )

    point_cloud_file = os.path.join(output_dir, f"{model_id}.ply")
    cmd = [
        cuda_mvs_executable,
        "--input_folder", images_dir,
        "--camera_file", camera_params_file,
        "--output_folder", output_dir,
        "--output_file", point_cloud_file,
        "--num_iterations", str(num_iterations),
        "--max_resolution", str(max_resolution)
    ]
    logger.info(f"Running CUDA MVS: {' '.join(cmd)}")

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        limit=1024 * 1024,  # allow long output lines before readline gives up
    )
    # Read stderr in parallel with stdout: if only one pipe were read, the
    # child could block forever once the other pipe's buffer fills up.
    stderr_reader = asyncio.ensure_future(process.stderr.read())
    try:
        async for raw_line in process.stdout:
            output = raw_line.decode("utf-8", errors="replace")
            if "Progress:" not in output:
                continue
            try:
                progress_str = output.split("Progress:")[1].strip().rstrip("%")
                job.progress = float(progress_str)
                job.updated_at = time.time()
            except (ValueError, IndexError):
                # Not a parsable progress line; ignore it
                pass
        stderr = (await stderr_reader).decode("utf-8", errors="replace")
        await process.wait()
    finally:
        if process.returncode is None:
            # Interrupted before the tool finished: do not leave it running
            try:
                process.kill()
            except ProcessLookupError:
                pass
            await process.wait()
        stderr_reader.cancel()  # no-op when the read already completed

    # Check if the process was successful
    if process.returncode != 0:
        raise RuntimeError(f"CUDA MVS failed with error: {stderr}")
    # Check if the output file was created
    if not os.path.exists(point_cloud_file):
        raise FileNotFoundError(f"Output file not created: {point_cloud_file}")
    # Update job with the output file
    job.point_cloud_file = point_cloud_file
async def generate_camera_params(self, image_files: List[str], output_file: str):
    """
    Generate camera parameters for CUDA MVS.

    Writes two comment lines followed by one line per image in the form
    ``image_name width height fx fy cx cy``. The intrinsics are defaults
    derived from the image size only, not estimated from the image content.

    Args:
        image_files: List of image files
        output_file: Output file for camera parameters
    """
    # Imported lazily (once per call) so the module loads without Pillow
    from PIL import Image

    # For now, use a simple camera model with default parameters.
    # In a real implementation, this would use structure from motion
    # to estimate camera parameters from the images.
    with open(output_file, "w") as f:
        f.write(f"# Camera parameters for {len(image_files)} images\n")
        f.write("# Format: image_name width height fx fy cx cy\n")
        for image_file in image_files:
            with Image.open(image_file) as img:
                width, height = img.size
            fx = width * 1.2   # Focal length x
            fy = height * 1.2  # Focal length y
            cx = width / 2     # Principal point x
            cy = height / 2    # Principal point y
            f.write(f"{os.path.basename(image_file)} {width} {height} {fx} {fy} {cx} {cy}\n")
async def convert_ply_to_obj(self, ply_file: str, obj_file: str):
    """
    Convert PLY file to OBJ format.

    Uses open3d when it can be imported; otherwise tries the external tools
    ``meshlabserver`` and then ``assimp``.

    Args:
        ply_file: Input PLY file
        obj_file: Output OBJ file

    Raises:
        RuntimeError: open3d reported that writing the OBJ failed, or open3d
            is unavailable and neither external tool succeeded
    """
    # Keep the try body to the import so only a missing package triggers
    # the external-tool fallback
    try:
        import open3d as o3d
    except ImportError:
        o3d = None

    if o3d is not None:
        mesh = o3d.io.read_triangle_mesh(ply_file)
        # write_triangle_mesh signals failure through its return value
        if not o3d.io.write_triangle_mesh(obj_file, mesh):
            raise RuntimeError(f"open3d failed to write {obj_file}")
        logger.info(f"Converted {ply_file} to {obj_file}")
        return

    # Fallback converters, tried in order
    fallback_commands = (
        ["meshlabserver", "-i", ply_file, "-o", obj_file],
        ["assimp", "export", ply_file, obj_file],
    )
    for command in fallback_commands:
        try:
            subprocess.run(command, check=True, capture_output=True)
        except (subprocess.SubprocessError, FileNotFoundError):
            continue
        logger.info(f"Converted {ply_file} to {obj_file}")
        return
    raise RuntimeError("No suitable tool found to convert PLY to OBJ")
def get_server_info(self) -> Dict[str, Any]:
    """
    Summarize the server's identity, limits and job counters.

    Returns:
        Dictionary with server information
    """
    cfg = self.config
    statuses = [job.status for job in self.jobs.values()]
    # Jobs in these states are reported as active
    active_states = ["created", "uploading", "processing"]

    return {
        "server_id": self.server_id,
        "name": cfg.server_name,
        "version": "1.0.0",
        "status": "running",
        "capabilities": {
            "max_jobs": cfg.max_jobs,
            "max_images_per_job": cfg.max_images_per_job,
            "job_timeout": cfg.job_timeout,
            "supported_formats": ["obj", "ply"],
            "gpu_info": cfg.gpu_info,
        },
        "jobs": {
            "total": len(self.jobs),
            "active": sum(1 for state in statuses if state in active_states),
            "completed": statuses.count("completed"),
            "failed": statuses.count("failed"),
        },
        # Seconds elapsed since the server object was created
        "uptime": time.time() - self.start_time,
    }
def advertise_service(self, port: int = DEFAULT_PORT):
    """Advertise the server on the local network using Zeroconf.

    Any failure is logged and swallowed, so advertising is best-effort and
    never prevents the server from starting.

    Args:
        port: Port number to announce. Defaults to ``DEFAULT_PORT``; pass the
            actual listening port when the server is bound elsewhere.
    """
    try:
        import socket
        from zeroconf import ServiceInfo, Zeroconf

        # Connect a UDP socket to an external address only to learn which
        # local address is used for outbound traffic. The context manager
        # closes the socket even when connect() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            local_ip = probe.getsockname()[0]

        # Create service info
        capabilities = {
            "max_jobs": self.config.max_jobs,
            "max_images_per_job": self.config.max_images_per_job,
            "supported_formats": ["obj", "ply"],
            "gpu_info": self.config.gpu_info
        }
        service_info = ServiceInfo(
            "_cudamvs._tcp.local.",
            f"{self.server_id}._cudamvs._tcp.local.",
            addresses=[socket.inet_aton(local_ip)],
            port=port,
            properties={
                b"name": self.config.server_name.encode("utf-8"),
                b"capabilities": json.dumps(capabilities).encode("utf-8")
            }
        )
        # Register service; the handle is kept so cleanup() can close it
        self.zeroconf = Zeroconf()
        self.zeroconf.register_service(service_info)
        logger.info(f"Advertising CUDA MVS service on {local_ip}:{port}")
    except Exception as e:
        logger.error(f"Failed to advertise service: {e}")
def run(self, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT) -> None:
    """
    Serve ``self.app`` with uvicorn.

    Args:
        host: Host to bind to
        port: Port to bind to
    """
    uvicorn.run(self.app, host=host, port=port)
def cleanup(self):
    """Release the Zeroconf handle, if one was created."""
    zeroconf_handle = self.zeroconf
    if not zeroconf_handle:
        # Nothing was advertised (or advertising failed before setup)
        return
    zeroconf_handle.close()
# Main entry point
def main():
    """Parse the command line, build the server and run it.

    ``server.cleanup()`` is always called when the server stops, whether it
    returns normally or raises.
    """
    # (flag, add_argument keyword arguments), in the order shown by --help
    option_specs = [
        ("--host", {"default": DEFAULT_HOST, "help": "Host to bind to"}),
        ("--port", {"type": int, "default": DEFAULT_PORT, "help": "Port to bind to"}),
        ("--cuda-mvs-path", {"default": DEFAULT_CUDA_MVS_PATH, "help": "Path to CUDA MVS installation"}),
        ("--output-dir", {"default": DEFAULT_OUTPUT_DIR, "help": "Output directory"}),
        ("--max-jobs", {"type": int, "default": DEFAULT_MAX_JOBS, "help": "Maximum number of concurrent jobs"}),
        ("--max-images", {"type": int, "default": DEFAULT_MAX_IMAGES_PER_JOB, "help": "Maximum images per job"}),
        ("--job-timeout", {"type": int, "default": DEFAULT_JOB_TIMEOUT, "help": "Job timeout in seconds"}),
        ("--api-key", {"help": "API key for authentication"}),
        ("--server-name", {"default": "CUDA MVS Server", "help": "Server name"}),
        ("--no-advertise", {"action": "store_true", "help": "Don't advertise service on the network"}),
    ]
    parser = argparse.ArgumentParser(description="CUDA MVS Server")
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
    args = parser.parse_args()

    # Translate the parsed options into the server configuration
    settings = {
        "cuda_mvs_path": args.cuda_mvs_path,
        "output_dir": args.output_dir,
        "max_jobs": args.max_jobs,
        "max_images_per_job": args.max_images,
        "job_timeout": args.job_timeout,
        "api_key": args.api_key,
        "server_name": args.server_name,
        "advertise_service": not args.no_advertise,
    }
    server = CUDAMVSServer(ServerConfig(**settings))
    try:
        server.run(host=args.host, port=args.port)
    finally:
        server.cleanup()
if __name__ == "__main__":
    # NOTE: this import binds ``asyncio`` as a module-level name only when the
    # file is executed as a script; the name is referenced by
    # CUDAMVSServer.run_cuda_mvs.
    import asyncio
    main()
```