# Directory Structure
```
├── implementation_plan.md
├── old
│   ├── download_sam2_checkpoint.py
│   ├── src
│   │   ├── ai
│   │   │   └── sam_segmentation.py
│   │   ├── models
│   │   │   └── threestudio_generator.py
│   │   └── workflow
│   │       └── image_to_model_pipeline.py
│   └── test_sam2_segmentation.py
├── README.md
├── requirements.txt
├── rtfmd
│   ├── decisions
│   │   ├── ai-driven-code-generation.md
│   │   └── export-formats.md
│   ├── files
│   │   └── src
│   │       ├── ai
│   │       │   └── ai_service.py.md
│   │       ├── main.py.md
│   │       ├── models
│   │       │   └── code_generator.py.md
│   │       └── nlp
│   │           └── parameter_extractor.py.md
│   ├── knowledge
│   │   ├── ai
│   │   │   └── natural-language-processing.md
│   │   ├── nlp
│   │   │   └── parameter-extraction.md
│   │   └── openscad
│   │       ├── export-formats.md
│   │       ├── openscad-basics.md
│   │       └── primitive-testing.md
│   └── README.md
├── scad
│   └── simple_cube.scad
├── src
│   ├── __init__.py
│   ├── __pycache__
│   │   └── __init__.cpython-312.pyc
│   ├── ai
│   │   ├── ai_service.py
│   │   ├── gemini_api.py
│   │   └── venice_api.py
│   ├── config.py
│   ├── main_remote.py
│   ├── main.py
│   ├── main.py.new
│   ├── models
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-312.pyc
│   │   │   └── code_generator.cpython-312.pyc
│   │   ├── code_generator.py
│   │   ├── cuda_mvs.py
│   │   └── scad_templates
│   │       └── basic_shapes.scad
│   ├── nlp
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-312.pyc
│   │   │   └── parameter_extractor.cpython-312.pyc
│   │   └── parameter_extractor.py
│   ├── openscad_wrapper
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-312.pyc
│   │   │   └── wrapper.cpython-312.pyc
│   │   └── wrapper.py
│   ├── printer_discovery
│   │   ├── __init__.py
│   │   └── printer_discovery.py
│   ├── remote
│   │   ├── connection_manager.py
│   │   ├── cuda_mvs_client.py
│   │   ├── cuda_mvs_server.py
│   │   └── error_handling.py
│   ├── testing
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-312.pyc
│   │   │   ├── primitive_tester.cpython-312.pyc
│   │   │   └── test_primitives.cpython-312.pyc
│   │   ├── primitive_tester.py
│   │   └── test_primitives.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-312.pyc
│   │   │   ├── stl_exporter.cpython-312.pyc
│   │   │   └── stl_validator.cpython-312.pyc
│   │   ├── cad_exporter.py
│   │   ├── format_validator.py
│   │   ├── stl_exporter.py
│   │   ├── stl_repair.py
│   │   └── stl_validator.py
│   ├── visualization
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-312.pyc
│   │   │   └── renderer.cpython-312.pyc
│   │   ├── headless_renderer.py
│   │   ├── renderer.py
│   │   └── web_interface.py
│   └── workflow
│       ├── image_approval.py
│       └── multi_view_to_model_pipeline.py
├── test_complete_workflow.py
├── test_cuda_mvs.py
├── test_gemini_api.py
├── test_image_approval_workflow.py
├── test_image_approval.py
├── test_image_to_model_pipeline.py
├── test_model_selection.py
├── test_multi_view_pipeline.py
├── test_primitives.sh
├── test_rabbit_direct.py
├── test_remote_cuda_mvs.py
└── test_venice_example.py
```
# Files
--------------------------------------------------------------------------------
/rtfmd/README.md:
--------------------------------------------------------------------------------
```markdown
# Reasoning Trace Framework for OpenSCAD MCP Server
This directory contains the Reasoning Trace Framework (RTF) documentation for the OpenSCAD MCP Server project. The RTF provides insight into the design decisions, mental models, and reasoning processes behind the implementation.
## Directory Structure
- `/rtfmd/files/` - Shadow file system mirroring the actual source code structure
- Contains `.md` files with the same names as their corresponding source files
- Each file documents the reasoning behind the implementation
- `/rtfmd/knowledge/` - Domain knowledge documentation
- `/openscad/` - Knowledge about OpenSCAD and 3D modeling
- `/ai/` - Knowledge about AI and natural language processing
- `/nlp/` - Knowledge about natural language parameter extraction
- `/rtfmd/decisions/` - Architectural decision records
- Documents major design decisions and their rationales
## How to Use This Documentation
1. Start with the `/rtfmd/files/src/main.py.md` file to understand the overall architecture
2. Explore specific components through their corresponding `.md` files
3. Refer to the knowledge directory for domain-specific information
4. Review the decisions directory for major architectural decisions
## Tags Used
- `<metadata>` - File metadata including author, timestamp, version, etc.
- `<exploration>` - Documents the exploration process and alternatives considered
- `<mental-model>` - Explains the mental model used in the implementation
- `<pattern-recognition>` - Identifies design patterns used
- `<trade-off>` - Documents trade-offs considered and choices made
- `<domain-knowledge>` - References to domain knowledge required
- `<technical-debt>` - Acknowledges technical debt and future improvements
- `<knowledge-refs>` - References to related knowledge documents
## Contributing
When modifying the codebase, please update the corresponding RTF documentation to reflect your reasoning and design decisions.
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# OpenSCAD MCP Server
A Model Context Protocol (MCP) server that enables users to generate 3D models from text descriptions or images, with a focus on creating parametric 3D models using multi-view reconstruction and OpenSCAD.
## Features
- **AI Image Generation**: Generate images from text descriptions using Google Gemini or Venice.ai APIs
- **Multi-View Image Generation**: Create multiple views of the same 3D object for reconstruction
- **Image Approval Workflow**: Review and approve/deny generated images before reconstruction
- **3D Reconstruction**: Convert approved multi-view images into 3D models using CUDA Multi-View Stereo
- **Remote Processing**: Process computationally intensive tasks on remote servers within your LAN
- **OpenSCAD Integration**: Generate parametric 3D models using OpenSCAD
- **Parametric Export**: Export models in formats that preserve parametric properties (CSG, AMF, 3MF, SCAD)
- **3D Printer Discovery**: Optional network printer discovery and direct printing
## Architecture
The server is built using the Python MCP SDK and follows a modular architecture:
```
openscad-mcp-server/
├── src/
│   ├── main.py                            # Main application
│   ├── main_remote.py                     # Remote CUDA MVS server
│   ├── ai/                                # AI integrations
│   │   ├── gemini_api.py                  # Google Gemini API for image generation
│   │   └── venice_api.py                  # Venice.ai API for image generation (optional)
│   ├── models/                            # 3D model generation
│   │   ├── cuda_mvs.py                    # CUDA Multi-View Stereo integration
│   │   └── code_generator.py              # OpenSCAD code generation
│   ├── workflow/                          # Workflow components
│   │   ├── image_approval.py              # Image approval mechanism
│   │   └── multi_view_to_model_pipeline.py  # Complete pipeline
│   ├── remote/                            # Remote processing
│   │   ├── cuda_mvs_client.py             # Client for remote CUDA MVS processing
│   │   ├── cuda_mvs_server.py             # Server for remote CUDA MVS processing
│   │   ├── connection_manager.py          # Remote connection management
│   │   └── error_handling.py              # Error handling for remote processing
│   ├── openscad_wrapper/                  # OpenSCAD CLI wrapper
│   ├── visualization/                     # Preview generation and web interface
│   ├── utils/                             # Utility functions
│   └── printer_discovery/                 # 3D printer discovery
├── scad/                                  # Generated OpenSCAD files
├── output/                                # Output files (models, previews)
│   ├── images/                            # Generated images
│   ├── multi_view/                        # Multi-view images
│   ├── approved_images/                   # Approved images for reconstruction
│   └── models/                            # Generated 3D models
├── templates/                             # Web interface templates
└── static/                                # Static files for web interface
```
## Installation
1. Clone the repository:
```
git clone https://github.com/jhacksman/OpenSCAD-MCP-Server.git
cd OpenSCAD-MCP-Server
```
2. Create a virtual environment:
```
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
3. Install dependencies:
```
pip install -r requirements.txt
```
4. Install OpenSCAD:
- Ubuntu/Debian: `sudo apt-get install openscad`
- macOS: `brew install openscad`
- Windows: Download from [openscad.org](https://openscad.org/downloads.html)
5. Install CUDA Multi-View Stereo:
```
git clone https://github.com/fixstars/cuda-multi-view-stereo.git
cd cuda-multi-view-stereo
mkdir build && cd build
cmake ..
make
```
6. Set up API keys:
- Create a `.env` file in the root directory
- Add your API keys:
```
GEMINI_API_KEY=your-gemini-api-key
VENICE_API_KEY=your-venice-api-key # Optional
REMOTE_CUDA_MVS_API_KEY=your-remote-api-key # For remote processing
```
## Remote Processing Setup
The server supports remote processing of computationally intensive tasks, particularly CUDA Multi-View Stereo reconstruction. This allows you to offload processing to more powerful machines within your LAN.
### Server Setup (on the machine with CUDA GPU)
1. Install CUDA Multi-View Stereo on the server machine:
```
git clone https://github.com/fixstars/cuda-multi-view-stereo.git
cd cuda-multi-view-stereo
mkdir build && cd build
cmake ..
make
```
2. Start the remote CUDA MVS server:
```
python src/main_remote.py
```
3. The server will automatically advertise itself on the local network using Zeroconf.
### Client Configuration
1. Configure remote processing in your `.env` file:
```
REMOTE_CUDA_MVS_ENABLED=True
REMOTE_CUDA_MVS_USE_LAN_DISCOVERY=True
REMOTE_CUDA_MVS_API_KEY=your-shared-secret-key
```
2. Alternatively, you can specify a server URL directly:
```
REMOTE_CUDA_MVS_ENABLED=True
REMOTE_CUDA_MVS_USE_LAN_DISCOVERY=False
REMOTE_CUDA_MVS_SERVER_URL=http://server-ip:8765
REMOTE_CUDA_MVS_API_KEY=your-shared-secret-key
```
### Remote Processing Features
- **Automatic Server Discovery**: Find CUDA MVS servers on your local network
- **Job Management**: Upload images, track job status, and download results
- **Fault Tolerance**: Automatic retries, circuit breaker pattern, and error tracking
- **Authentication**: Secure API key authentication for all remote operations
- **Health Monitoring**: Continuous server health checks and status reporting
## Usage
1. Start the server:
```
python src/main.py
```
2. The server will start on http://localhost:8000
3. Use the MCP tools to interact with the server:
- **generate_image_gemini**: Generate an image using Google Gemini API
```json
{
"prompt": "A low-poly rabbit with black background",
"model": "gemini-2.0-flash-exp-image-generation"
}
```
- **generate_multi_view_images**: Generate multiple views of the same 3D object
```json
{
"prompt": "A low-poly rabbit",
"num_views": 4
}
```
- **create_3d_model_from_images**: Create a 3D model from approved multi-view images
```json
{
"image_ids": ["view_1", "view_2", "view_3", "view_4"],
"output_name": "rabbit_model"
}
```
- **create_3d_model_from_text**: Complete pipeline from text to 3D model
```json
{
"prompt": "A low-poly rabbit",
"num_views": 4
}
```
- **export_model**: Export a model to a specific format
```json
{
"model_id": "your-model-id",
"format": "obj" // or "stl", "ply", "scad", etc.
}
```
- **discover_remote_cuda_mvs_servers**: Find CUDA MVS servers on your network
```json
{
"timeout": 5
}
```
- **get_remote_job_status**: Check the status of a remote processing job
```json
{
"server_id": "server-id",
"job_id": "job-id"
}
```
- **download_remote_model_result**: Download a completed model from a remote server
```json
{
"server_id": "server-id",
"job_id": "job-id",
"output_name": "model-name"
}
```
- **discover_printers**: Discover 3D printers on the network
```json
{}
```
- **print_model**: Print a model on a connected printer
```json
{
"model_id": "your-model-id",
"printer_id": "your-printer-id"
}
```
## Image Generation Options
The server supports multiple image generation options:
1. **Google Gemini API** (Default): Uses the Gemini 2.0 Flash Experimental model for high-quality image generation
- Supports multi-view generation with consistent style
- Requires a Google Gemini API key
2. **Venice.ai API** (Optional): Alternative image generation service
- Supports various models including flux-dev and fluently-xl
- Requires a Venice.ai API key
3. **User-Provided Images**: Skip image generation and use your own images
- Upload images directly to the server
- Useful for working with existing photographs or renders
## Multi-View Workflow
The server implements a multi-view workflow for 3D reconstruction:
1. **Image Generation**: Generate multiple views of the same 3D object
2. **Image Approval**: Review and approve/deny each generated image
3. **3D Reconstruction**: Convert approved images into a 3D model using CUDA MVS
- Can be processed locally or on a remote server within your LAN
4. **Model Refinement**: Optionally refine the model using OpenSCAD
## Remote Processing Workflow
The remote processing workflow allows you to offload computationally intensive tasks to more powerful machines:
1. **Server Discovery**: Automatically discover CUDA MVS servers on your network
2. **Image Upload**: Upload approved multi-view images to the remote server
3. **Job Processing**: Process the images on the remote server using CUDA MVS
4. **Status Tracking**: Monitor the job status and progress
5. **Result Download**: Download the completed 3D model when processing is finished
## Supported Export Formats
The server supports exporting models in various formats:
- **OBJ**: Wavefront OBJ format (standard 3D model format)
- **STL**: Standard Triangle Language (for 3D printing)
- **PLY**: Polygon File Format (for point clouds and meshes)
- **SCAD**: OpenSCAD source code (for parametric models)
- **CSG**: OpenSCAD CSG format (preserves all parametric properties)
- **AMF**: Additive Manufacturing File Format (preserves some metadata)
- **3MF**: 3D Manufacturing Format (modern replacement for STL with metadata)
## Web Interface
The server provides a web interface for:
- Generating and approving multi-view images
- Previewing 3D models from different angles
- Downloading models in various formats
Access the interface at http://localhost:8000/ui/
## License
MIT
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
```
--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/models/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/nlp/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/openscad_wrapper/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/testing/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/visualization/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/utils/__init__.py:
--------------------------------------------------------------------------------
```python
# Utils package
```
--------------------------------------------------------------------------------
/src/printer_discovery/__init__.py:
--------------------------------------------------------------------------------
```python
# Printer discovery package
```
--------------------------------------------------------------------------------
/test_primitives.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
# Test OpenSCAD primitives with different export formats
PYTHON="python"
OUTPUT_DIR="test_output"
# Create output directory
mkdir -p $OUTPUT_DIR
# Run the tests
$PYTHON -m src.testing.test_primitives --output-dir $OUTPUT_DIR --validate
echo "Tests completed. Results are in $OUTPUT_DIR"
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
# Core dependencies
fastapi>=0.95.0
uvicorn>=0.21.0
pydantic>=2.0.0
python-multipart>=0.0.6
# MCP SDK
git+https://github.com/modelcontextprotocol/python-sdk.git
# Image processing
pillow>=9.5.0
opencv-python>=4.7.0
# HTTP client
requests>=2.28.0
httpx>=0.24.0
# Utilities
python-dotenv>=1.0.0
pyyaml>=6.0
jinja2>=3.1.2
numpy>=1.24.0
uuid>=1.30.0
tqdm>=4.65.0
# Image Generation - Venice.ai API (optional)
# (using existing requests and python-dotenv)
# Image Generation - Google Gemini API
google-generativeai>=0.3.0
# Network and Service Discovery
zeroconf>=0.39.0
aiohttp>=3.8.4
# 3D Reconstruction - CUDA Multi-View Stereo
open3d>=0.17.0
trimesh>=3.21.0
pyrender>=0.1.45
# Remote Processing
fastapi-utils>=0.2.1
python-jose>=3.3.0 # For JWT authentication
aiofiles>=23.1.0
# For development
pytest>=7.3.1
black>=23.3.0
isort>=5.12.0
pytest-asyncio>=0.21.0
# Deprecated dependencies (kept for reference)
# segment-anything-2>=1.0
# torch>=2.0.0
# torchvision>=0.15.0
# pytorch3d>=0.7.4
# ninja>=1.11.0
```
--------------------------------------------------------------------------------
/test_model_selection.py:
--------------------------------------------------------------------------------
```python
import os
import logging
from src.ai.venice_api import VeniceImageGenerator
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Venice.ai API key (replace with your own or use environment variable)
VENICE_API_KEY = os.getenv("VENICE_API_KEY", "B9Y68yQgatQw8wmpmnIMYcGip1phCt-43CS0OktZU6")
OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output", "images")
# Test natural language model selection
def test_model_selection():
"""Test the natural language model selection functionality."""
# Initialize the Venice API client
venice_generator = VeniceImageGenerator(VENICE_API_KEY, OUTPUT_DIR)
# Test cases - natural language preferences to expected model mappings
test_cases = [
("default", "fluently-xl"),
("fastest model please", "fluently-xl"),
("I need a high quality image", "flux-dev"),
("create an uncensored image", "flux-dev-uncensored"),
("make it realistic", "pony-realism"),
("I want something artistic", "lustify-sdxl"),
("use stable diffusion", "stable-diffusion-3.5"),
("invalid model name", "fluently-xl"), # Should default to fluently-xl
]
# Run tests
for preference, expected_model in test_cases:
mapped_model = venice_generator.map_model_preference(preference)
logger.info(f"Preference: '{preference}' -> Model: '{mapped_model}'")
assert mapped_model == expected_model, f"Expected {expected_model}, got {mapped_model}"
logger.info("All model preference mappings tests passed!")
if __name__ == "__main__":
logger.info("Starting Venice.ai model selection mapping tests")
test_model_selection()
```
--------------------------------------------------------------------------------
/test_venice_example.py:
--------------------------------------------------------------------------------
```python
import os
import requests
import json
# Venice.ai API configuration
VENICE_API_KEY = os.getenv("VENICE_API_KEY", "") # Set via environment variable
OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output", "images")
os.makedirs(OUTPUT_DIR, exist_ok=True)
# API endpoint
url = "https://api.venice.ai/api/v1/image/generate"
# Test prompt
prompt = "A coffee mug with geometric patterns"
# Prepare payload
payload = {
"model": "fluently-xl", # Try default model instead of flux
"prompt": prompt,
"height": 1024,
"width": 1024,
"steps": 20,
"return_binary": False,
"hide_watermark": False,
"format": "png",
"embed_exif_metadata": False
}
# Set up headers
headers = {
"Authorization": f"Bearer {VENICE_API_KEY}",
"Content-Type": "application/json"
}
print(f"Sending request to {url} with prompt: '{prompt}'")
# Make API request
try:
response = requests.post(url, json=payload, headers=headers)
print(f"Response status: {response.status_code}")
if response.status_code == 200:
result = response.json()
print("\nImage generation result:")
print(json.dumps(result, indent=2))
# Print image URL if available
if "images" in result and len(result["images"]) > 0:
image_url = result["images"][0]
print(f"\nImage URL: {image_url}")
# Download image
image_filename = f"{prompt[:20].replace(' ', '_')}_flux.png"
image_path = os.path.join(OUTPUT_DIR, image_filename)
print(f"Downloading image to {image_path}...")
img_response = requests.get(image_url, stream=True)
if img_response.status_code == 200:
with open(image_path, 'wb') as f:
for chunk in img_response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"Image saved to {image_path}")
else:
print(f"Failed to download image: {img_response.status_code}")
else:
print(f"Error: {response.text}")
except Exception as e:
print(f"Error: {str(e)}")
```
--------------------------------------------------------------------------------
/rtfmd/knowledge/openscad/openscad-basics.md:
--------------------------------------------------------------------------------
```markdown
# OpenSCAD Basics
<metadata>
author: devin-ai-integration
timestamp: 2025-03-21T01:30:00Z
version: 1.0.0
tags: [openscad, 3d-modeling, csg, parametric-design]
</metadata>
## Overview
OpenSCAD is a programmer's solid 3D CAD modeler that uses a scripting language to define 3D objects. Unlike traditional CAD software that focuses on interactive modeling, OpenSCAD emphasizes programmatic and parametric design.
## Key Concepts
### Constructive Solid Geometry (CSG)
OpenSCAD uses CSG operations to create complex models by combining simpler primitives:
- **Union**: Combines multiple objects (`union() { ... }`)
- **Difference**: Subtracts one object from another (`difference() { ... }`)
- **Intersection**: Creates an object from the overlapping portions of other objects (`intersection() { ... }`)
### Primitive Shapes
OpenSCAD provides several built-in primitive shapes:
- **Cube**: `cube([width, depth, height], center=true/false)`
- **Sphere**: `sphere(r=radius, $fn=segments)`
- **Cylinder**: `cylinder(h=height, r=radius, center=true/false, $fn=segments)`
- **Polyhedron**: For complex shapes with defined faces
### Transformations
Objects can be transformed using:
- **Translate**: `translate([x, y, z]) { ... }`
- **Rotate**: `rotate([x_deg, y_deg, z_deg]) { ... }`
- **Scale**: `scale([x, y, z]) { ... }`
- **Mirror**: `mirror([x, y, z]) { ... }`
### Parametric Design
OpenSCAD excels at parametric design:
- Variables can define dimensions and relationships
- Modules can create reusable components with parameters
- Mathematical expressions can define complex relationships
## Command Line Usage
OpenSCAD can be run headless using command-line options:
- Generate STL: `openscad -o output.stl input.scad`
- Pass parameters: `openscad -D "width=10" -D "height=20" -o output.stl input.scad`
- Generate PNG preview: `openscad --camera=0,0,0,0,0,0,50 --imgsize=800,600 -o preview.png input.scad`
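
For example, a minimal Python sketch of driving these commands headlessly (the project's `openscad_wrapper` is more complete; paths and parameter values here are placeholders):
```python
import subprocess

def render_stl(scad_file: str, stl_file: str, params: dict) -> None:
    """Render a .scad file to STL headlessly, passing parameters via -D."""
    cmd = ["openscad", "-o", stl_file]
    for name, value in params.items():
        cmd += ["-D", f"{name}={value}"]
    cmd.append(scad_file)
    subprocess.run(cmd, check=True)

# Equivalent to: openscad -D "width=10" -D "height=20" -o output.stl input.scad
render_stl("input.scad", "output.stl", {"width": 10, "height": 20})
```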
## Best Practices
- Use modules for reusable components
- Parameterize designs for flexibility
- Use descriptive variable names
- Comment code for clarity
- Organize complex designs hierarchically
- Use $fn judiciously for performance
- Ensure models are manifold (watertight) for 3D printing
```
--------------------------------------------------------------------------------
/src/visualization/headless_renderer.py:
--------------------------------------------------------------------------------
```python
import os
import logging
from PIL import Image, ImageDraw
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
class HeadlessRenderer:
"""Provides rendering capabilities for OpenSCAD in headless environments."""
def __init__(self, openscad_path: str = "openscad"):
self.openscad_path = openscad_path
self.camera_angles = {
"front": "0,0,0,0,0,0,50",
"top": "0,0,0,90,0,0,50",
"right": "0,0,0,0,0,90,50",
"perspective": "70,0,35,25,0,25,250"
}
def create_placeholder_image(self, output_path: str, model_id: str, view: str = "perspective") -> str:
"""Create a placeholder image with model information."""
try:
# Create a blank image
width, height = 800, 600
image = Image.new('RGB', (width, height), color=(240, 240, 240))
draw = ImageDraw.Draw(image)
# Add text
draw.text((20, 20), f"OpenSCAD Model: {model_id}", fill=(0, 0, 0))
draw.text((20, 60), f"View: {view}", fill=(0, 0, 0))
draw.text((20, 100), "Headless rendering mode", fill=(0, 0, 0))
# Draw a simple 3D shape
draw.polygon([(400, 200), (300, 300), (500, 300)], outline=(0, 0, 0), width=2)
draw.polygon([(400, 200), (500, 300), (500, 400)], outline=(0, 0, 0), width=2)
draw.polygon([(400, 200), (300, 300), (300, 400)], outline=(0, 0, 0), width=2)
draw.rectangle((300, 300, 500, 400), outline=(0, 0, 0), width=2)
# Add note about headless mode
note = "Note: This is a placeholder image. OpenSCAD preview generation"
note2 = "requires an X server or a headless rendering solution."
draw.text((20, 500), note, fill=(150, 0, 0))
draw.text((20, 530), note2, fill=(150, 0, 0))
# Save the image
image.save(output_path)
logger.info(f"Created placeholder image: {output_path}")
return output_path
except Exception as e:
logger.error(f"Error creating placeholder image: {str(e)}")
return output_path
```
--------------------------------------------------------------------------------
/test_rabbit_direct.py:
--------------------------------------------------------------------------------
```python
import os
import sys
import json
import requests
import base64
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Venice.ai API configuration
VENICE_API_KEY = os.getenv("VENICE_API_KEY", "B9Y68yQgatQw8wmpmnIMYcGip1phCt-43CS0OktZU6")
OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output", "images")
os.makedirs(OUTPUT_DIR, exist_ok=True)
# API configuration
url = "https://api.venice.ai/api/v1/image/generate"
headers = {
"Authorization": f"Bearer {VENICE_API_KEY}",
"Content-Type": "application/json"
}
# Payload for image generation
payload = {
"height": 1024,
"width": 1024,
"steps": 20,
"return_binary": True, # Request binary data directly
"hide_watermark": False,
"format": "png",
"embed_exif_metadata": False,
"model": "flux-dev",
"prompt": "A low-poly rabbit with black background. 3d file"
}
def generate_image():
"""Generate image using Venice.ai API with the rabbit prompt."""
try:
logger.info(f"Sending request to {url} with prompt: '{payload['prompt']}'")
response = requests.post(url, json=payload, headers=headers)
logger.info(f"Response status: {response.status_code}")
if response.status_code == 200:
# Save the raw binary response
filename = "rabbit_low_poly_3d.png"
output_path = os.path.join(OUTPUT_DIR, filename)
with open(output_path, "wb") as f:
f.write(response.content)
logger.info(f"Image saved to {output_path}")
return output_path
else:
logger.error(f"Error: {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"Error: {str(e)}")
return None
if __name__ == "__main__":
logger.info("Starting Venice.ai image generation test with rabbit prompt")
image_path = generate_image()
if image_path:
logger.info(f"Successfully generated and saved image to {image_path}")
print(f"\nImage saved to: {image_path}")
else:
logger.error("Failed to generate image")
```
--------------------------------------------------------------------------------
/rtfmd/files/src/ai/ai_service.py.md:
--------------------------------------------------------------------------------
```markdown
<metadata>
author: devin-ai-integration
timestamp: 2025-03-21T01:30:00Z
version: 1.0.0
related-files: [/src/models/code_generator.py]
prompt: "Implement AI-driven code generator for OpenSCAD"
</metadata>
<exploration>
The AI service component was designed to provide natural language processing capabilities for generating OpenSCAD code from user descriptions. Initial approaches considered:
1. Using a template-based approach with predefined patterns
2. Implementing a full NLP pipeline with custom entity extraction
3. Creating a hybrid approach that combines pattern matching with contextual understanding
The hybrid approach was selected as it provides flexibility while maintaining performance.
</exploration>
<mental-model>
The AI service operates on the concept of "component identification" - breaking down natural language descriptions into geometric primitives, operations, features, and modifiers. This mental model aligns with how OpenSCAD itself works, where complex models are built from primitive shapes and CSG operations.
</mental-model>
<pattern-recognition>
The implementation uses the Strategy pattern for different parsing strategies and the Factory pattern for code generation. These patterns allow for extensibility as new shape types or operations are added.
</pattern-recognition>
<trade-off>
Options considered:
1. Full machine learning approach with embeddings and neural networks
2. Rule-based pattern matching with regular expressions
3. Hybrid approach with pattern matching and contextual rules
The hybrid approach was chosen because:
- Lower computational requirements than full ML
- More flexible than pure rule-based systems
- Easier to debug and maintain
- Can be extended with more sophisticated ML in the future
</trade-off>
<domain-knowledge>
The implementation required understanding of:
- OpenSCAD's modeling paradigm (CSG operations)
- Common 3D modeling terminology
- Natural language processing techniques
- Regular expression pattern matching
</domain-knowledge>
<knowledge-refs>
[OpenSCAD Basics](/rtfmd/knowledge/openscad/openscad-basics.md) - Last updated 2025-03-21
[Natural Language Processing](/rtfmd/knowledge/ai/natural-language-processing.md) - Last updated 2025-03-21
</knowledge-refs>
```
--------------------------------------------------------------------------------
/rtfmd/knowledge/openscad/primitive-testing.md:
--------------------------------------------------------------------------------
```markdown
# Testing OpenSCAD Primitives
<metadata>
author: devin-ai-integration
timestamp: 2025-03-21T12:00:00Z
version: 1.0.0
tags: [openscad, primitives, testing, 3d-modeling]
</metadata>
## Overview
Testing OpenSCAD primitives is essential to ensure that the MCP server can reliably generate and export 3D models. This document outlines approaches for programmatically testing primitives and validating their exports.
## Primitive Types
The OpenSCAD MCP Server supports these primitive types:
- **Basic Shapes**: cube, sphere, cylinder
- **Complex Shapes**: cone, torus, hexagonal_prism
- **Containers**: hollow_box, rounded_box, tube
- **Text**: 3D text with customizable parameters
## Testing Approach
### Parameter Testing
Each primitive should be tested with:
- **Default Parameters**: Ensure the primitive renders correctly with default values
- **Boundary Values**: Test minimum and maximum reasonable values
- **Special Cases**: Test cases like zero dimensions, negative values
### Export Testing
For each primitive and parameter set:
- **Export Formats**: Test export to 3MF, AMF, CSG, and SCAD formats
- **Format Validation**: Ensure exported files meet format specifications
- **Metadata Preservation**: Verify that parametric properties are preserved
### Integration Testing
Test the full pipeline:
- **Natural Language → Parameters**: Test parameter extraction
- **Parameters → OpenSCAD Code**: Test code generation
- **OpenSCAD Code → Export**: Test file export
- **Export → Printer**: Test compatibility with printer software
## Validation Criteria
Exported models should meet these criteria:
- **Manifold**: Models should be watertight (no holes in the mesh)
- **Valid Format**: Files should validate against format specifications
- **Metadata**: Should contain relevant model metadata
- **Render Performance**: Models should render efficiently
## Implementation
The `PrimitiveTester` class implements this testing approach:
```python
# Example usage
tester = PrimitiveTester(code_generator, cad_exporter)
results = tester.test_all_primitives()
# Test specific primitives
cube_results = tester.test_primitive("cube")
```
## Printer Compatibility Tests
Before sending to physical printers:
1. Import exports into PrusaSlicer or Bambu Studio
2. Check for import warnings or errors
3. Verify that models slice correctly
4. Test prints with simple examples
```
--------------------------------------------------------------------------------
/rtfmd/files/src/models/code_generator.py.md:
--------------------------------------------------------------------------------
```markdown
<metadata>
author: devin-ai-integration
timestamp: 2025-03-21T01:30:00Z
version: 1.0.0
related-files: [/src/ai/ai_service.py, /src/nlp/parameter_extractor.py]
prompt: "Implement AI-driven code generator for OpenSCAD"
</metadata>
<exploration>
The code generator was designed to translate natural language descriptions and extracted parameters into valid OpenSCAD code. Several approaches were considered:
1. Direct string manipulation for code generation
2. Template-based approach with parameter substitution
3. Modular approach with separate modules for different shape types
The modular approach was selected for its maintainability and extensibility.
</exploration>
<mental-model>
The code generator operates on a "shape-to-module" mapping paradigm, where each identified shape type corresponds to a specific OpenSCAD module. This mental model allows for clean separation of concerns and makes it easy to add new shape types.
</mental-model>
<pattern-recognition>
The implementation uses the Factory pattern for code generation, where different shape types are mapped to different module generators. This pattern allows for easy extension with new shape types.
</pattern-recognition>
<trade-off>
Options considered:
1. Generating raw OpenSCAD primitives directly
2. Using a library of pre-defined modules
3. Hybrid approach with both primitives and modules
The library approach was chosen because:
- More maintainable and readable code
- Easier to implement complex shapes
- Better parameter handling
- More consistent output
</trade-off>
<domain-knowledge>
The implementation required understanding of:
- OpenSCAD syntax and semantics
- Constructive Solid Geometry (CSG) operations
- Parametric modeling concepts
- 3D geometry fundamentals
</domain-knowledge>
<technical-debt>
The current implementation has some limitations:
- Limited support for complex nested operations
- No support for custom user-defined modules
- Basic error handling for invalid parameters
Future improvements planned:
- Enhanced error handling with meaningful messages
- Support for user-defined modules
- More sophisticated CSG operation chaining
</technical-debt>
<knowledge-refs>
[OpenSCAD Basics](/rtfmd/knowledge/openscad/openscad-basics.md) - Last updated 2025-03-21
[AI-Driven Code Generation](/rtfmd/decisions/ai-driven-code-generation.md) - Last updated 2025-03-21
</knowledge-refs>
```
--------------------------------------------------------------------------------
/rtfmd/files/src/main.py.md:
--------------------------------------------------------------------------------
```markdown
<metadata>
author: devin-ai-integration
timestamp: 2025-03-21T01:30:00Z
version: 1.0.0
related-files: [/src/ai/ai_service.py, /src/models/code_generator.py, /src/nlp/parameter_extractor.py]
prompt: "Build an MCP server for OpenSCAD"
</metadata>
<exploration>
The main application was designed to implement a Model Context Protocol (MCP) server for OpenSCAD integration. Several approaches were considered:
1. Using a standalone server with direct OpenSCAD CLI calls
2. Implementing a web service with REST API
3. Creating an MCP-compliant server with FastAPI
The MCP-compliant FastAPI approach was selected for its alignment with the project requirements and modern API design.
</exploration>
<mental-model>
The main application operates on a "tool-based MCP service" paradigm, where each capability is exposed as an MCP tool that can be called by AI assistants. This mental model aligns with the MCP specification and provides a clean separation of concerns.
</mental-model>
<pattern-recognition>
The implementation uses the Facade pattern to provide a simple interface to the complex subsystems (parameter extraction, code generation, OpenSCAD wrapper, etc.). This pattern simplifies the client interface and decouples the subsystems from clients.
</pattern-recognition>
<trade-off>
Options considered:
1. Monolithic application with tightly coupled components
2. Microservices architecture with separate services
3. Modular monolith with clear component boundaries
The modular monolith approach was chosen because:
- Simpler deployment and operation
- Lower latency for inter-component communication
- Easier to develop and debug
- Still maintains good separation of concerns
</trade-off>
<domain-knowledge>
The implementation required understanding of:
- Model Context Protocol (MCP) specification
- FastAPI framework
- OpenSCAD command-line interface
- 3D modeling and printing workflows
</domain-knowledge>
<technical-debt>
The current implementation has some limitations:
- In-memory storage of models (not persistent)
- Basic error handling
- Limited printer discovery capabilities
Future improvements planned:
- Persistent storage for models
- Enhanced error handling and reporting
- More robust printer discovery and management
</technical-debt>
<knowledge-refs>
[OpenSCAD Basics](/rtfmd/knowledge/openscad/openscad-basics.md) - Last updated 2025-03-21
[AI-Driven Code Generation](/rtfmd/decisions/ai-driven-code-generation.md) - Last updated 2025-03-21
</knowledge-refs>
```
--------------------------------------------------------------------------------
/rtfmd/files/src/nlp/parameter_extractor.py.md:
--------------------------------------------------------------------------------
```markdown
<metadata>
author: devin-ai-integration
timestamp: 2025-03-21T01:30:00Z
version: 1.0.0
related-files: [/src/models/code_generator.py]
prompt: "Enhance parameter extractor with expanded shape recognition"
</metadata>
<exploration>
The parameter extractor was designed to parse natural language descriptions and extract structured parameters for 3D model generation. Several approaches were considered:
1. Using a full NLP pipeline with named entity recognition
2. Implementing regex-based pattern matching
3. Creating a hybrid approach with contextual understanding
The regex-based approach with contextual enhancements was selected for its balance of simplicity and effectiveness.
</exploration>
<mental-model>
The parameter extractor operates on a "pattern recognition and extraction" paradigm, where common phrases and patterns in natural language are mapped to specific parameter types. This mental model allows for intuitive parameter extraction from diverse descriptions.
</mental-model>
<pattern-recognition>
The implementation uses the Strategy pattern for different parameter extraction strategies based on shape type. This pattern allows for specialized extraction logic for each shape type while maintaining a consistent interface.
</pattern-recognition>
<trade-off>
Options considered:
1. Machine learning-based approach with trained models
2. Pure regex pattern matching
3. Hybrid approach with contextual rules
The regex approach with contextual rules was chosen because:
- Simpler implementation with good accuracy
- No training data required
- Easier to debug and maintain
- More predictable behavior
</trade-off>
<domain-knowledge>
The implementation required understanding of:
- Natural language processing concepts
- Regular expression pattern matching
- 3D modeling terminology
- Parameter types for different geometric shapes
</domain-knowledge>
<technical-debt>
The current implementation has some limitations:
- Limited support for complex nested descriptions
- Regex patterns may need maintenance as language evolves
- Only supports millimeters as per project requirements
Future improvements planned:
- Enhanced contextual understanding
- Support for more complex descriptions
- Better handling of ambiguous parameters
</technical-debt>
<knowledge-refs>
[Parameter Extraction](/rtfmd/knowledge/nlp/parameter-extraction.md) - Last updated 2025-03-21
[Natural Language Processing](/rtfmd/knowledge/ai/natural-language-processing.md) - Last updated 2025-03-21
</knowledge-refs>
```
--------------------------------------------------------------------------------
/rtfmd/knowledge/ai/natural-language-processing.md:
--------------------------------------------------------------------------------
```markdown
# Natural Language Processing for 3D Modeling
<metadata>
author: devin-ai-integration
timestamp: 2025-03-21T01:30:00Z
version: 1.0.0
tags: [nlp, 3d-modeling, parameter-extraction, pattern-matching]
</metadata>
## Overview
Natural Language Processing (NLP) techniques can be applied to extract 3D modeling parameters and intentions from user descriptions. This knowledge document outlines approaches for translating natural language into structured data for 3D model generation.
## Approaches
### Pattern Matching
Regular expression pattern matching is effective for identifying:
- Dimensions and measurements
- Shape types and primitives
- Operations (union, difference, etc.)
- Transformations (rotate, scale, etc.)
- Material properties and colors
Example patterns:
```python
# Dimension pattern
dimension_pattern = r'(\d+(?:\.\d+)?)\s*(mm|cm|m|inch|in)'
# Shape pattern
shape_pattern = r'\b(cube|box|sphere|ball|cylinder|tube|cone|pyramid)\b'
```
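A short, self-contained sketch of applying these patterns (illustrative only, not the project's `parameter_extractor`):
```python
import re

dimension_pattern = r'(\d+(?:\.\d+)?)\s*(mm|cm|m|inch|in)'
shape_pattern = r'\b(cube|box|sphere|ball|cylinder|tube|cone|pyramid)\b'

description = "a cube 20 mm wide and 35.5 mm tall"

# Extract every (value, unit) pair, plus the first shape keyword found
dimensions = [(float(value), unit) for value, unit in re.findall(dimension_pattern, description)]
shape_match = re.search(shape_pattern, description.lower())
shape = shape_match.group(1) if shape_match else None

print(shape, dimensions)  # cube [(20.0, 'mm'), (35.5, 'mm')]
```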
### Contextual Understanding
Beyond simple pattern matching, contextual understanding involves:
- Identifying relationships between objects
- Understanding relative positioning
- Resolving ambiguous references
- Maintaining dialog state for multi-turn interactions
### Hybrid Approaches
Combining pattern matching with contextual rules provides:
- Better accuracy than pure pattern matching
- Lower computational requirements than full ML approaches
- More maintainable and debuggable systems
- Flexibility to handle diverse descriptions
## Parameter Extraction
Key parameters to extract include:
- **Dimensions**: Width, height, depth, radius, diameter
- **Positions**: Coordinates, relative positions
- **Operations**: Boolean operations, transformations
- **Features**: Holes, fillets, chamfers, text
- **Properties**: Color, material, finish
## Implementation Considerations
- **Ambiguity Resolution**: Handle cases where measurements could apply to multiple dimensions
- **Default Values**: Provide sensible defaults for unspecified parameters
- **Unit Conversion**: Convert between different measurement units
- **Error Handling**: Gracefully handle unparseable or contradictory descriptions
- **Dialog Management**: Maintain state for multi-turn interactions to refine models
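As a sketch of the unit-conversion and default-value points above (assuming millimetres as the target unit, which matches the project's convention):
```python
# Conversion factors to millimetres (the unit measurements are normalized to)
UNIT_TO_MM = {"mm": 1.0, "cm": 10.0, "m": 1000.0, "inch": 25.4, "in": 25.4}
DEFAULTS_MM = {"width": 10.0, "depth": 10.0, "height": 10.0}

def to_mm(value: float, unit: str) -> float:
    """Normalize a single measurement to millimetres."""
    return value * UNIT_TO_MM[unit]

def with_defaults(extracted: dict) -> dict:
    """Fill in sensible defaults for any dimension the description omitted."""
    return {**DEFAULTS_MM, **extracted}

print(with_defaults({"width": to_mm(2, "cm")}))  # {'width': 20.0, 'depth': 10.0, 'height': 10.0}
```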
## Evaluation Metrics
Effective NLP for 3D modeling can be evaluated by:
- **Accuracy**: Correctness of extracted parameters
- **Completeness**: Percentage of required parameters successfully extracted
- **Robustness**: Ability to handle diverse phrasings and descriptions
- **User Satisfaction**: Subjective evaluation of the resulting models
```
--------------------------------------------------------------------------------
/rtfmd/decisions/export-formats.md:
--------------------------------------------------------------------------------
```markdown
# Decision: Export Format Selection
<metadata>
author: devin-ai-integration
timestamp: 2025-03-21T12:00:00Z
version: 1.0.0
tags: [export-formats, 3d-printing, decision, prusa, bambu]
</metadata>
## Context
The OpenSCAD MCP Server needs to export 3D models in formats that:
1. Preserve parametric properties
2. Support metadata
3. Are compatible with Prusa and Bambu printers
4. Avoid limitations of STL format
<exploration>
We evaluated multiple export formats:
- STL: Traditional format but lacks metadata
- CSG: OpenSCAD's native format, fully parametric
- SCAD: Source code, fully parametric
- 3MF: Modern format with metadata support
- AMF: XML-based format with metadata
- DXF/SVG: 2D formats for laser cutting
</exploration>
## Decision
We will use **3MF as the primary export format** with AMF as a secondary option.
CSG and SCAD will be supported for users who want to modify the models in OpenSCAD.
<mental-model>
The ideal export format should:
- Maintain all design parameters
- Include metadata about the model
- Be widely supported by popular slicers
- Have a clean, standardized specification
- Support multiple objects and materials
</mental-model>
## Rationale
<pattern-recognition>
Modern 3D printing workflows favor formats that preserve more information than just geometry. The industry is shifting from STL to more capable formats like 3MF.
</pattern-recognition>
- **3MF** is supported by both Prusa and Bambu printer software
- **3MF** includes support for metadata, colors, and materials
- **3MF** has a cleaner specification than STL
- **AMF** offers similar advantages but with less widespread adoption
- **CSG/SCAD** formats maintain full parametric properties but only within OpenSCAD
<trade-off>
We considered making STL an option for broader compatibility, but this would compromise our goal of preserving parametric properties. The benefits of 3MF outweigh the minor compatibility issues that might arise.
</trade-off>
## Consequences
**Positive:**
- Better preservation of model information
- Improved compatibility with modern printer software
- Future-proof approach as 3MF adoption increases
**Negative:**
- Slightly more complex implementation than STL
- May require validation to ensure proper format compliance
<technical-debt>
We will need to implement validation for 3MF and AMF files to ensure they meet specifications. This adds complexity but is necessary for reliability.
</technical-debt>
<knowledge-refs>
- [OpenSCAD Export Formats](/rtfmd/knowledge/openscad/export-formats.md)
- [OpenSCAD Basics](/rtfmd/knowledge/openscad/openscad-basics.md)
</knowledge-refs>
```
--------------------------------------------------------------------------------
/rtfmd/knowledge/openscad/export-formats.md:
--------------------------------------------------------------------------------
```markdown
# OpenSCAD Export Formats
<metadata>
author: devin-ai-integration
timestamp: 2025-03-21T12:00:00Z
version: 1.0.0
tags: [openscad, export-formats, 3d-printing, prusa, bambu]
</metadata>
## Overview
OpenSCAD supports exporting 3D models in various formats, each with different capabilities for preserving parametric properties and metadata. This document focuses on formats suitable for Prusa and Bambu printers, with an emphasis on alternatives to STL.
## Recommended Formats
### 3MF (3D Manufacturing Format)
3MF is a modern replacement for STL that addresses many of its limitations:
- **Metadata Support**: Includes model information, materials, colors
- **Compact Size**: More efficient encoding than STL
- **Multiple Objects**: Can contain multiple parts in a single file
- **Printer Compatibility**: Widely supported by Prusa and Bambu printers
- **Implementation**: ZIP archive containing XML files
```openscad
// Export to 3MF from command line
// openscad -o model.3mf model.scad
```
### AMF (Additive Manufacturing File Format)
AMF is another modern format that supports:
- **Material Information**: Material properties and colors
- **Curved Surfaces**: Better representation than STL's triangles
- **Metadata**: Design information and parameters
- **Implementation**: XML-based format
```openscad
// Export to AMF from command line
// openscad -o model.amf model.scad
```
### CSG (Constructive Solid Geometry)
CSG is OpenSCAD's native format:
- **Fully Parametric**: Preserves all construction operations
- **Editable**: Can be reopened and modified in OpenSCAD
- **Implementation**: OpenSCAD's internal representation
```openscad
// Export to CSG from command line
// openscad -o model.csg model.scad
```
### SCAD (OpenSCAD Source Code)
The original SCAD file preserves all parametric properties:
- **Complete Parameterization**: All variables and relationships
- **Code Structure**: Modules, functions, and comments
- **Implementation**: Text file with OpenSCAD code
## Printer Compatibility
### Prusa Printers
Prusa printers work well with:
- **3MF**: Full support in PrusaSlicer
- **AMF**: Good support for materials and colors
- **STL**: Supported but with limitations
### Bambu Printers
Bambu printers work best with:
- **3MF**: Preferred format for Bambu Lab software
- **AMF**: Well supported
- **STL**: Basic support
## Implementation Notes
When implementing export functionality:
1. Use OpenSCAD's command-line interface for reliable exports
2. Add metadata to 3MF and AMF files for better organization
3. Test exported files with actual printer software
4. Validate files before sending to printers
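
A rough sketch of point 1, exporting through the OpenSCAD CLI from Python (the project's `CADExporter` may differ in its details):
```python
import subprocess
from pathlib import Path

def export_model(scad_file: str, fmt: str = "3mf") -> Path:
    """Export a .scad file via the OpenSCAD CLI; fmt can be 3mf, amf, csg, stl, ..."""
    out_file = Path(scad_file).with_suffix(f".{fmt}")
    # OpenSCAD infers the output format from the file extension
    subprocess.run(["openscad", "-o", str(out_file), scad_file], check=True)
    if not out_file.exists() or out_file.stat().st_size == 0:
        raise RuntimeError(f"Export produced no output: {out_file}")
    return out_file

# export_model("model.scad", "3mf")  # -> model.3mf
```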
```
--------------------------------------------------------------------------------
/implementation_plan.md:
--------------------------------------------------------------------------------
```markdown
# Implementation Plan: OpenSCAD-MCP-Server with AI-Driven 3D Modeling
## 1. Project Structure Updates
### 1.1 New Modules
```
src/
├── ai/
│   ├── venice_api.py                 # Venice.ai API client
│   └── sam_segmentation.py           # SAM2 integration
├── models/
│   └── threestudio_generator.py      # threestudio integration
└── workflow/
    └── image_to_model_pipeline.py    # Workflow orchestration
```
### 1.2 Dependencies
Add to requirements.txt:
```
# Image Generation - Venice.ai API
# (using existing requests and python-dotenv)
# Object Segmentation - SAM2
torch>=2.0.0
torchvision>=0.15.0
opencv-python>=4.7.0
segment-anything>=1.0
# 3D Model Creation - threestudio
ninja>=1.11.0
pytorch3d>=0.7.4
trimesh>=3.21.0
```
## 2. Component Implementation
### 2.1 Venice.ai API Integration
- Create `VeniceImageGenerator` class in `venice_api.py`
- Implement authentication with API key
- Add image generation with Flux model
- Support image downloading and storage
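
A rough sketch of the class shape implied by this section, based on the direct API calls in `test_venice_example.py` and `test_rabbit_direct.py` (the real `venice_api.py` may differ):
```python
import os
import requests

class VeniceImageGenerator:
    """Sketch: authenticate with an API key, generate an image, save it to disk."""

    API_URL = "https://api.venice.ai/api/v1/image/generate"

    def __init__(self, api_key: str, output_dir: str = "output/images"):
        self.api_key = api_key
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

    def generate_image(self, prompt: str, model: str = "flux-dev", filename: str = "image.png") -> str:
        payload = {"model": model, "prompt": prompt, "width": 1024, "height": 1024,
                   "steps": 20, "return_binary": True, "format": "png"}
        headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
        response = requests.post(self.API_URL, json=payload, headers=headers)
        response.raise_for_status()
        output_path = os.path.join(self.output_dir, filename)
        with open(output_path, "wb") as f:
            f.write(response.content)  # return_binary=True yields raw image bytes
        return output_path
```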
### 2.2 SAM2 Integration
- Create `SAMSegmenter` class in `sam_segmentation.py`
- Implement model loading with PyTorch
- Add object segmentation from images
- Support mask generation and visualization
### 2.3 threestudio Integration
- Create `ThreeStudioGenerator` class in `threestudio_generator.py`
- Implement 3D model generation from masked images
- Support model export in formats compatible with OpenSCAD
- Add preview image generation
### 2.4 OpenSCAD Integration
- Extend `OpenSCADWrapper` with methods to:
- Import 3D models from threestudio
- Generate parametric modifications
- Create multi-angle previews
- Export in various formats
### 2.5 Workflow Orchestration
- Create `ImageToModelPipeline` class to coordinate the workflow:
1. Generate image with Venice.ai API
2. Segment object with SAM2
3. Create 3D model with threestudio
4. Import into OpenSCAD for parametric editing
## 3. MCP Tool Integration
Add new MCP tools to main.py:
- `generate_image_from_text`: Generate images using Venice.ai
- `segment_object_from_image`: Segment objects using SAM2
- `generate_3d_model_from_image`: Create 3D models using threestudio
- `generate_model_from_text`: End-to-end pipeline from text to 3D model
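
For illustration, one way such a tool could be registered with the MCP Python SDK's `FastMCP` helper; this is only a sketch, and the actual wiring in `main.py` (and the `generate_image` signature) may differ:
```python
import os

from mcp.server.fastmcp import FastMCP  # MCP Python SDK
from src.ai.venice_api import VeniceImageGenerator

mcp = FastMCP("openscad-mcp-server")
generator = VeniceImageGenerator(os.getenv("VENICE_API_KEY", ""), "output/images")

@mcp.tool()
def generate_image_from_text(prompt: str, model: str = "flux-dev") -> str:
    """Generate an image with Venice.ai and return the path of the saved file."""
    # generate_image(...) as sketched in section 2.1; the real method may differ
    return generator.generate_image(prompt, model=model)

if __name__ == "__main__":
    mcp.run()
```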
## 4. Hardware Requirements
- SAM2: NVIDIA GPU with 6GB+ VRAM
- threestudio: NVIDIA GPU with 6GB+ VRAM
- Consider implementing fallback options for environments with limited GPU resources
## 5. Implementation Phases
### Phase 1: Basic Integration
- Implement Venice.ai API client
- Set up SAM2 with basic segmentation
- Create threestudio wrapper with minimal functionality
- Extend OpenSCAD wrapper for model import
### Phase 2: Workflow Orchestration
- Implement the full pipeline
- Add MCP tools for each component
- Create end-to-end workflow tool
### Phase 3: Optimization and Refinement
- Optimize for performance
- Add error handling and recovery
- Implement corrective cycle for mesh modification
- Add user interface improvements
```
--------------------------------------------------------------------------------
/old/test_sam2_segmentation.py:
--------------------------------------------------------------------------------
```python
import os
import sys
import logging
import argparse
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import SAM2 segmenter and config
from src.ai.sam_segmentation import SAMSegmenter
from src.config import SAM2_CHECKPOINT_PATH, SAM2_MODEL_TYPE, SAM2_USE_GPU, MASKS_DIR
def test_sam2_segmentation(image_path: str, output_dir: Optional[str] = None, use_auto_points: bool = True):
"""
Test SAM2 segmentation on an image.
Args:
image_path: Path to the input image
output_dir: Directory to save segmentation results (default: config.MASKS_DIR)
use_auto_points: Whether to use automatic point generation
"""
# Validate image path
if not os.path.exists(image_path):
logger.error(f"Image not found: {image_path}")
return
# Use default output directory if not provided
if not output_dir:
output_dir = os.path.join(MASKS_DIR, Path(image_path).stem)
# Create output directory
os.makedirs(output_dir, exist_ok=True)
logger.info(f"Testing SAM2 segmentation on image: {image_path}")
logger.info(f"Model type: {SAM2_MODEL_TYPE}")
logger.info(f"Checkpoint path: {SAM2_CHECKPOINT_PATH}")
logger.info(f"Using GPU: {SAM2_USE_GPU}")
try:
# Initialize SAM2 segmenter
logger.info("Initializing SAM2 segmenter...")
sam_segmenter = SAMSegmenter(
model_type=SAM2_MODEL_TYPE,
checkpoint_path=SAM2_CHECKPOINT_PATH,
use_gpu=SAM2_USE_GPU,
output_dir=output_dir
)
# Perform segmentation
if use_auto_points:
logger.info("Using automatic point generation")
result = sam_segmenter.segment_with_auto_points(image_path)
else:
# Use center point of the image for manual point
logger.info("Using manual center point")
import cv2
image = cv2.imread(image_path)
h, w = image.shape[:2]
center_point = (w // 2, h // 2)
result = sam_segmenter.segment_image(image_path, points=[center_point])
# Print results
logger.info(f"Segmentation completed with {result.get('num_masks', 0)} masks")
if result.get('mask_paths'):
logger.info(f"Mask paths: {result.get('mask_paths')}")
return result
except Exception as e:
logger.error(f"Error in SAM2 segmentation: {str(e)}")
import traceback
traceback.print_exc()
return None
if __name__ == "__main__":
# Parse command line arguments
parser = argparse.ArgumentParser(description="Test SAM2 segmentation")
parser.add_argument("image_path", help="Path to the input image")
parser.add_argument("--output-dir", help="Directory to save segmentation results")
parser.add_argument("--manual-points", action="store_true", help="Use manual center point instead of auto points")
args = parser.parse_args()
# Run test
test_sam2_segmentation(
args.image_path,
args.output_dir,
not args.manual_points
)
```
--------------------------------------------------------------------------------
/src/utils/stl_repair.py:
--------------------------------------------------------------------------------
```python
import os
import logging
from typing import Tuple, Optional
logger = logging.getLogger(__name__)
class STLRepair:
"""Provides methods to repair non-manifold STL files."""
@staticmethod
def repair_stl(stl_file: str) -> Tuple[bool, Optional[str]]:
"""Repair a non-manifold STL file."""
if not os.path.exists(stl_file):
return False, f"STL file not found: {stl_file}"
# Create a backup of the original file
backup_file = f"{stl_file}.bak"
try:
with open(stl_file, 'rb') as src, open(backup_file, 'wb') as dst:
dst.write(src.read())
except Exception as e:
logger.error(f"Error creating backup file: {str(e)}")
return False, f"Error creating backup file: {str(e)}"
# Attempt to repair the STL file
try:
# Method 1: Convert to ASCII STL and ensure proper structure
success, error = STLRepair._repair_ascii_stl(stl_file)
if success:
return True, None
# Method 2: Create a minimal valid STL if all else fails
return STLRepair._create_minimal_valid_stl(stl_file)
except Exception as e:
logger.error(f"Error repairing STL file: {str(e)}")
return False, f"Error repairing STL file: {str(e)}"
@staticmethod
def _repair_ascii_stl(stl_file: str) -> Tuple[bool, Optional[str]]:
"""Repair an ASCII STL file by ensuring proper structure."""
try:
# Read the file
with open(stl_file, 'r') as f:
content = f.read()
# Check if it's an ASCII STL
if not content.strip().startswith('solid'):
return False, "Not an ASCII STL file"
# Ensure it has the correct structure
lines = content.strip().split('\n')
# Extract the solid name
solid_name = lines[0].replace('solid', '').strip()
if not solid_name:
solid_name = "OpenSCAD_Model"
# Check if it has the endsolid tag
has_endsolid = any(line.strip().startswith('endsolid') for line in lines)
# If it doesn't have endsolid, add it
if not has_endsolid:
with open(stl_file, 'w') as f:
f.write(content.strip())
f.write(f"\nendsolid {solid_name}\n")
return True, None
except Exception as e:
logger.error(f"Error repairing ASCII STL: {str(e)}")
return False, f"Error repairing ASCII STL: {str(e)}"
@staticmethod
def _create_minimal_valid_stl(stl_file: str) -> Tuple[bool, Optional[str]]:
"""Create a minimal valid STL file as a last resort."""
try:
# Create a minimal valid STL file
with open(stl_file, 'w') as f:
f.write("solid OpenSCAD_Model\n")
f.write(" facet normal 0 0 0\n")
f.write(" outer loop\n")
f.write(" vertex 0 0 0\n")
f.write(" vertex 1 0 0\n")
f.write(" vertex 0 1 0\n")
f.write(" endloop\n")
f.write(" endfacet\n")
f.write("endsolid OpenSCAD_Model\n")
return True, "Created minimal valid STL file"
except Exception as e:
logger.error(f"Error creating minimal valid STL file: {str(e)}")
return False, f"Error creating minimal valid STL file: {str(e)}"
```
--------------------------------------------------------------------------------
/src/testing/test_primitives.py:
--------------------------------------------------------------------------------
```python
import os
import argparse
import logging
import json
from typing import Dict, Any, List
from src.models.code_generator import OpenSCADCodeGenerator
from src.utils.cad_exporter import CADExporter
from src.utils.format_validator import FormatValidator
from src.testing.primitive_tester import PrimitiveTester
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def main():
parser = argparse.ArgumentParser(description='Test OpenSCAD primitives with different export formats')
parser.add_argument('--output-dir', default='test_output', help='Directory to store test output')
parser.add_argument('--formats', nargs='+', default=['3mf', 'amf', 'csg', 'scad'],
help='Formats to test (default: 3mf amf csg scad)')
parser.add_argument('--primitives', nargs='+',
help='Primitives to test (default: all)')
parser.add_argument('--validate', action='store_true',
help='Validate exported files')
parser.add_argument('--printer-type', choices=['prusa', 'bambu'], default='prusa',
help='Printer type to check compatibility with (default: prusa)')
args = parser.parse_args()
# Create directories
os.makedirs("scad", exist_ok=True)
os.makedirs(args.output_dir, exist_ok=True)
# Initialize components
# Use absolute path for templates to avoid path issues
    templates_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../models/scad_templates"))
code_generator = OpenSCADCodeGenerator(templates_dir, "scad")
cad_exporter = CADExporter()
# Initialize tester
tester = PrimitiveTester(code_generator, cad_exporter, args.output_dir)
# Override formats if specified
if args.formats:
tester.formats = args.formats
# Test primitives
if args.primitives:
results = {}
for primitive in args.primitives:
results[primitive] = tester.test_primitive(primitive)
else:
results = tester.test_all_primitives()
# Print results
logger.info(f"Test results: {json.dumps(results, indent=2)}")
# Validate exported files if requested
if args.validate:
validator = FormatValidator()
validation_results = {}
for primitive, primitive_results in results.items():
validation_results[primitive] = {}
for format_type, format_results in primitive_results["formats"].items():
if format_results["success"] and format_type in ['3mf', 'amf']:
output_file = format_results["output_file"]
if format_type == '3mf':
is_valid, error = validator.validate_3mf(output_file)
elif format_type == 'amf':
is_valid, error = validator.validate_amf(output_file)
else:
is_valid, error = False, "Validation not supported for this format"
# Check printer compatibility
is_compatible, compat_error = validator.check_printer_compatibility(
output_file, args.printer_type
)
metadata = validator.extract_metadata(output_file)
validation_results[primitive][format_type] = {
"is_valid": is_valid,
"error": error,
"is_compatible_with_printer": is_compatible,
"compatibility_error": compat_error,
"metadata": metadata
}
logger.info(f"Validation results: {json.dumps(validation_results, indent=2)}")
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/old/download_sam2_checkpoint.py:
--------------------------------------------------------------------------------
```python
import os
import sys
import logging
import requests
import argparse
from pathlib import Path
from tqdm import tqdm
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# SAM2 checkpoint URLs
CHECKPOINT_URLS = {
"vit_h": "https://dl.fbaipublicfiles.com/segment_anything_2/sam2_vit_h.pth",
"vit_l": "https://dl.fbaipublicfiles.com/segment_anything_2/sam2_vit_l.pth",
"vit_b": "https://dl.fbaipublicfiles.com/segment_anything_2/sam2_vit_b.pth"
}
# Checkpoint sizes (approximate, in MB)
CHECKPOINT_SIZES = {
"vit_h": 2560, # 2.5 GB
"vit_l": 1250, # 1.2 GB
"vit_b": 380 # 380 MB
}
def download_checkpoint(model_type="vit_b", output_dir="models"):
"""
Download SAM2 checkpoint.
Args:
model_type: Model type to download (vit_h, vit_l, vit_b)
output_dir: Directory to save the checkpoint
Returns:
Path to the downloaded checkpoint
"""
if model_type not in CHECKPOINT_URLS:
raise ValueError(f"Invalid model type: {model_type}. Available types: {list(CHECKPOINT_URLS.keys())}")
url = CHECKPOINT_URLS[model_type]
output_path = os.path.join(output_dir, f"sam2_{model_type}.pth")
# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
# Check if checkpoint already exists
if os.path.exists(output_path):
logger.info(f"Checkpoint already exists at {output_path}")
return output_path
# Download checkpoint
logger.info(f"Downloading SAM2 checkpoint ({model_type}) from {url}")
logger.info(f"Approximate size: {CHECKPOINT_SIZES[model_type]} MB")
try:
# Stream download with progress bar
response = requests.get(url, stream=True)
response.raise_for_status()
# Get total file size
total_size = int(response.headers.get('content-length', 0))
# Create progress bar
with open(output_path, 'wb') as f, tqdm(
desc=f"Downloading {model_type}",
total=total_size,
unit='B',
unit_scale=True,
unit_divisor=1024,
) as pbar:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
pbar.update(len(chunk))
logger.info(f"Checkpoint downloaded to {output_path}")
return output_path
except requests.exceptions.RequestException as e:
logger.error(f"Error downloading checkpoint: {str(e)}")
# Remove partial download if it exists
if os.path.exists(output_path):
os.remove(output_path)
raise
except KeyboardInterrupt:
logger.info("Download interrupted by user")
# Remove partial download if it exists
if os.path.exists(output_path):
os.remove(output_path)
sys.exit(1)
def main():
"""Main function to parse arguments and download checkpoint."""
parser = argparse.ArgumentParser(description="Download SAM2 checkpoint")
parser.add_argument("--model_type", type=str, default="vit_b", choices=list(CHECKPOINT_URLS.keys()),
help="Model type to download (vit_h, vit_l, vit_b). Default: vit_b (smallest)")
parser.add_argument("--output_dir", type=str, default="models",
help="Directory to save the checkpoint")
args = parser.parse_args()
# Print model information
logger.info(f"Selected model: {args.model_type}")
logger.info(f"Approximate sizes: vit_h: 2.5 GB, vit_l: 1.2 GB, vit_b: 380 MB")
try:
checkpoint_path = download_checkpoint(args.model_type, args.output_dir)
logger.info(f"Checkpoint ready at: {checkpoint_path}")
except Exception as e:
logger.error(f"Failed to download checkpoint: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/rtfmd/decisions/ai-driven-code-generation.md:
--------------------------------------------------------------------------------
```markdown
# AI-Driven Code Generation for OpenSCAD
<metadata>
author: devin-ai-integration
timestamp: 2025-03-21T01:30:00Z
version: 1.0.0
tags: [ai, code-generation, openscad, architecture-decision]
</metadata>
## Decision Context
The OpenSCAD MCP Server requires a mechanism to translate natural language descriptions into valid OpenSCAD code. This architectural decision record documents the approach chosen for implementing AI-driven code generation.
## Options Considered
### Option 1: Template-Based Approach
A simple approach using predefined templates with parameter substitution.
**Pros:**
- Simple implementation
- Predictable output
- Low computational requirements
**Cons:**
- Limited flexibility
- Cannot handle complex or novel descriptions
- Requires manual creation of templates for each shape type
### Option 2: Full Machine Learning Approach
Using embeddings and neural networks to generate OpenSCAD code directly.
**Pros:**
- Highly flexible
- Can handle novel descriptions
- Potential for more natural interaction
**Cons:**
- High computational requirements
- Requires training data
- Less predictable output
- Harder to debug and maintain
### Option 3: Hybrid Pattern Matching with Contextual Rules
Combining pattern matching for parameter extraction with rule-based code generation.
**Pros:**
- Good balance of flexibility and predictability
- Moderate computational requirements
- Easier to debug and maintain
- Can be extended with more sophisticated ML in the future
**Cons:**
- More complex than pure template approach
- Less flexible than full ML approach
- Requires careful design of rules and patterns
## Decision
**Chosen Option: Option 3 - Hybrid Pattern Matching with Contextual Rules**
The hybrid approach was selected because it provides a good balance of flexibility, maintainability, and computational efficiency. It allows for handling a wide range of natural language descriptions while maintaining predictable output and being easier to debug than a full ML approach.
## Implementation Details
The implementation consists of two main components:
1. **Parameter Extractor**: Uses regex patterns and contextual rules to extract parameters from natural language descriptions.
2. **Code Generator**: Translates extracted parameters into OpenSCAD code using a combination of templates and programmatic generation.
The `AIService` class provides the bridge between these components, handling the overall flow from natural language to code.
```python
class AIService:
def __init__(self, templates_dir, model_config=None):
self.templates_dir = templates_dir
self.model_config = model_config or {}
self.templates = self._load_templates()
def generate_openscad_code(self, context):
description = context.get("description", "")
parameters = context.get("parameters", {})
# Parse the description to identify key components
components = self._parse_description(description)
# Generate code based on identified components
code = self._generate_code_from_components(components, parameters)
return code
```
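As an illustrative sketch (not the project's actual implementation), the rule-based generation step might map extracted parameters to OpenSCAD primitives as follows; the function name and defaults here are hypothetical:
```python
def generate_primitive_code(shape_type, params):
    """Illustrative mapping from extracted parameters to an OpenSCAD primitive call."""
    center = str(params.get("center", True)).lower()
    if shape_type == "cube":
        # Treat height as the Z dimension: cube([x, y, z])
        size = [params.get("width", 10), params.get("depth", 10), params.get("height", 10)]
        return f"cube({size}, center={center});"
    if shape_type == "sphere":
        return f"sphere(r={params.get('radius', 10)}, $fn={params.get('segments', 32)});"
    if shape_type == "cylinder":
        return (f"cylinder(h={params.get('height', 10)}, r={params.get('radius', 5)}, "
                f"center={center}, $fn={params.get('segments', 32)});")
    raise ValueError(f"Unsupported shape type: {shape_type}")
```
With this approach, a description such as "a cylinder 20 mm tall with a 5 mm radius" resolves to `cylinder(h=20, r=5, center=true, $fn=32);` once the parameter extractor has filled in `height` and `radius`.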
## Consequences
### Positive
- More flexible code generation than a pure template approach
- Better maintainability than a full ML approach
- Lower computational requirements
- Easier to debug and extend
- Can handle a wide range of natural language descriptions
### Negative
- More complex implementation than a pure template approach
- Requires careful design of patterns and rules
- May still struggle with very complex or ambiguous descriptions
### Neutral
- Will require ongoing maintenance as new shape types and features are added
- May need to be extended with more sophisticated ML techniques in the future
## Follow-up Actions
- Implement unit tests for the AI service
- Create a comprehensive set of test cases for different description types
- Document the pattern matching rules and code generation logic
- Consider adding a feedback mechanism to improve the system over time
```
--------------------------------------------------------------------------------
/src/workflow/image_approval.py:
--------------------------------------------------------------------------------
```python
"""
Image approval tool for MCP clients.
"""
import os
import logging
import shutil
from typing import Dict, Any, List, Optional
logger = logging.getLogger(__name__)
class ImageApprovalTool:
"""
Tool for image approval/denial in MCP clients.
"""
def __init__(self, output_dir: str = "output/approved_images"):
"""
Initialize the image approval tool.
Args:
output_dir: Directory to store approved images
"""
self.output_dir = output_dir
# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
def present_image_for_approval(self, image_path: str, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Present an image to the user for approval.
Args:
image_path: Path to the image
metadata: Optional metadata about the image
Returns:
Dictionary with image path and approval request ID
"""
# For MCP server, we just prepare the response
# The actual approval is handled by the client
approval_id = os.path.basename(image_path).split('.')[0]
return {
"approval_id": approval_id,
"image_path": image_path,
"image_url": f"/images/{os.path.basename(image_path)}",
"metadata": metadata or {}
}
def process_approval(self, approval_id: str, approved: bool, image_path: str) -> Dict[str, Any]:
"""
Process user's approval or denial of an image.
Args:
approval_id: ID of the approval request
approved: Whether the image was approved
image_path: Path to the image
Returns:
Dictionary with approval status and image path
"""
if approved:
# Copy approved image to output directory
approved_path = os.path.join(self.output_dir, os.path.basename(image_path))
os.makedirs(os.path.dirname(approved_path), exist_ok=True)
shutil.copy2(image_path, approved_path)
return {
"approval_id": approval_id,
"approved": True,
"original_path": image_path,
"approved_path": approved_path
}
else:
return {
"approval_id": approval_id,
"approved": False,
"original_path": image_path
}
def get_approved_images(self, filter_pattern: Optional[str] = None) -> List[str]:
"""
Get list of approved images.
Args:
filter_pattern: Optional pattern to filter image names
Returns:
List of paths to approved images
"""
import glob
if filter_pattern:
pattern = os.path.join(self.output_dir, filter_pattern)
else:
pattern = os.path.join(self.output_dir, "*")
return glob.glob(pattern)
def get_approval_status(self, approval_id: str) -> Dict[str, Any]:
"""
Get the approval status for a specific approval ID.
Args:
approval_id: ID of the approval request
Returns:
Dictionary with approval status
"""
# Check if any approved image matches the approval ID
approved_images = self.get_approved_images()
for image_path in approved_images:
if approval_id in os.path.basename(image_path):
return {
"approval_id": approval_id,
"approved": True,
"approved_path": image_path
}
return {
"approval_id": approval_id,
"approved": False
}
def batch_process_approvals(self, approvals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Process multiple approvals at once.
Args:
approvals: List of dictionaries with approval_id, approved, and image_path
Returns:
List of dictionaries with approval results
"""
results = []
for approval in approvals:
result = self.process_approval(
approval_id=approval["approval_id"],
approved=approval["approved"],
image_path=approval["image_path"]
)
results.append(result)
return results
```
--------------------------------------------------------------------------------
/src/utils/stl_validator.py:
--------------------------------------------------------------------------------
```python
import os
import logging
import subprocess
import tempfile
from typing import Tuple, Optional
logger = logging.getLogger(__name__)
class STLValidator:
"""
Validates STL files to ensure they are manifold (watertight) and suitable for 3D printing.
"""
@staticmethod
def validate_stl(stl_file: str) -> Tuple[bool, Optional[str]]:
"""
Validate an STL file to ensure it is manifold and suitable for 3D printing.
Args:
stl_file: Path to the STL file to validate
Returns:
Tuple of (is_valid, error_message)
"""
if not os.path.exists(stl_file):
return False, f"STL file not found: {stl_file}"
# Check file size
file_size = os.path.getsize(stl_file)
if file_size == 0:
return False, "STL file is empty"
# Basic validation - check if the file starts with "solid" for ASCII STL
# or contains binary header for binary STL
try:
with open(stl_file, 'rb') as f:
header = f.read(5)
if header == b'solid':
# ASCII STL
is_valid, error = STLValidator._validate_ascii_stl(stl_file)
else:
# Binary STL
is_valid, error = STLValidator._validate_binary_stl(stl_file)
return is_valid, error
except Exception as e:
logger.error(f"Error validating STL file: {str(e)}")
return False, f"Error validating STL file: {str(e)}"
@staticmethod
def _validate_ascii_stl(stl_file: str) -> Tuple[bool, Optional[str]]:
"""Validate an ASCII STL file."""
try:
with open(stl_file, 'r') as f:
content = f.read()
# Check if the file has the correct structure
if not content.strip().startswith('solid'):
return False, "Invalid ASCII STL: Missing 'solid' header"
            # ASCII STL files end with "endsolid <name>", so check the last line rather than the raw string end
            last_line = content.strip().split('\n')[-1].strip()
            if not last_line.startswith('endsolid'):
                return False, "Invalid ASCII STL: Missing 'endsolid' footer"
# Count facets and vertices
facet_count = content.count('facet normal')
vertex_count = content.count('vertex')
if facet_count == 0:
return False, "Invalid ASCII STL: No facets found"
if vertex_count != facet_count * 3:
return False, f"Invalid ASCII STL: Expected {facet_count * 3} vertices, found {vertex_count}"
return True, None
except Exception as e:
logger.error(f"Error validating ASCII STL: {str(e)}")
return False, f"Error validating ASCII STL: {str(e)}"
@staticmethod
def _validate_binary_stl(stl_file: str) -> Tuple[bool, Optional[str]]:
"""Validate a binary STL file."""
try:
with open(stl_file, 'rb') as f:
# Skip 80-byte header
f.seek(80)
# Read number of triangles (4-byte unsigned int)
triangle_count_bytes = f.read(4)
if len(triangle_count_bytes) != 4:
return False, "Invalid binary STL: File too short"
# Convert bytes to integer (little-endian)
triangle_count = int.from_bytes(triangle_count_bytes, byteorder='little')
# Check file size
expected_size = 84 + (triangle_count * 50) # Header + count + triangles
actual_size = os.path.getsize(stl_file)
if actual_size != expected_size:
return False, f"Invalid binary STL: Expected size {expected_size}, actual size {actual_size}"
return True, None
except Exception as e:
logger.error(f"Error validating binary STL: {str(e)}")
return False, f"Error validating binary STL: {str(e)}"
@staticmethod
def repair_stl(stl_file: str) -> Tuple[bool, Optional[str]]:
"""
Attempt to repair a non-manifold STL file.
Args:
stl_file: Path to the STL file to repair
Returns:
Tuple of (success, error_message)
"""
# This is a placeholder for STL repair functionality
# In a real implementation, you would use a library like admesh or meshlab
# to repair the STL file
logger.warning(f"STL repair not implemented: {stl_file}")
return False, "STL repair not implemented"
```
--------------------------------------------------------------------------------
/rtfmd/knowledge/nlp/parameter-extraction.md:
--------------------------------------------------------------------------------
```markdown
# Parameter Extraction for 3D Modeling
<metadata>
author: devin-ai-integration
timestamp: 2025-03-21T01:30:00Z
version: 1.0.0
tags: [parameter-extraction, nlp, 3d-modeling, regex]
</metadata>
## Overview
Parameter extraction is the process of identifying and extracting structured data from natural language descriptions of 3D models. This is a critical component in translating user intentions into actionable modeling parameters.
## Extraction Techniques
### Regular Expression Patterns
Regular expressions provide a powerful way to extract parameters:
```python
# Extract dimensions with units
dimension_pattern = r'(\d+(?:\.\d+)?)\s*(mm|cm|m|inch|in)'
# Extract color information
color_pattern = r'\b(red|green|blue|yellow|black|white|purple|orange|brown)\b'
# Extract shape type
shape_pattern = r'\b(cube|box|sphere|ball|cylinder|tube|cone|pyramid)\b'
```
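For example, applying these patterns to a sample description with `re.findall` (the description below is illustrative):
```python
import re
description = "a red cylinder 20 mm tall with a 5 mm radius"
dimensions = re.findall(r'(\d+(?:\.\d+)?)\s*(mm|cm|m|inch|in)', description)
colors = re.findall(r'\b(red|green|blue|yellow|black|white|purple|orange|brown)\b', description)
shapes = re.findall(r'\b(cube|box|sphere|ball|cylinder|tube|cone|pyramid)\b', description)
print(dimensions)  # [('20', 'mm'), ('5', 'mm')]
print(colors)      # ['red']
print(shapes)      # ['cylinder']
```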
### Contextual Parameter Association
After raw values are extracted, they must be associated with the correct parameter:
```python
def associate_dimension(value, description):
"""Associate a dimension value with the correct parameter based on context."""
if "width" in description or "wide" in description:
return ("width", value)
elif "height" in description or "tall" in description:
return ("height", value)
elif "depth" in description or "deep" in description:
return ("depth", value)
elif "radius" in description:
return ("radius", value)
elif "diameter" in description:
return ("radius", value / 2) # Convert diameter to radius
else:
return ("unknown", value)
```
### Default Parameters
Provide sensible defaults for unspecified parameters:
```python
default_parameters = {
"cube": {
"width": 10,
"height": 10,
"depth": 10,
"center": True
},
"sphere": {
"radius": 10,
"segments": 32
},
"cylinder": {
"radius": 5,
"height": 10,
"center": True,
"segments": 32
}
}
```
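Extracted values can then be layered over these defaults; a minimal sketch (the `apply_defaults` helper is illustrative, not part of the codebase):
```python
def apply_defaults(shape_type, extracted):
    """Merge extracted parameters over the defaults for the given shape type."""
    params = dict(default_parameters.get(shape_type, {}))
    params.update(extracted)
    return params
print(apply_defaults("cylinder", {"radius": 5, "height": 20}))
# {'radius': 5, 'height': 20, 'center': True, 'segments': 32}
```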
## Parameter Types
Common parameter types to extract include:
- **Dimensions**: Width, height, depth, radius, diameter
- **Positions**: X, Y, Z coordinates
- **Angles**: Rotation angles
- **Counts**: Number of sides, segments, iterations
- **Booleans**: Center, solid/hollow
- **Colors**: RGB values or named colors
- **Operations**: Union, difference, intersection
- **Transformations**: Translate, rotate, scale, mirror
## Challenges and Solutions
### Ambiguity
When parameters are ambiguous, use contextual clues or ask clarifying questions:
```python
def resolve_ambiguity(value, possible_parameters, description):
"""Resolve ambiguity between possible parameters."""
# Try to resolve using context
for param in possible_parameters:
if param in description:
return param
# If still ambiguous, return a question to ask
return f"Is {value} the {' or '.join(possible_parameters)}?"
```
### Unit Conversion
Convert all measurements to a standard unit (millimeters):
```python
def convert_to_mm(value, unit):
"""Convert a value from the given unit to millimeters."""
if unit in ["mm", "millimeter", "millimeters"]:
return value
elif unit in ["cm", "centimeter", "centimeters"]:
return value * 10
elif unit in ["m", "meter", "meters"]:
return value * 1000
elif unit in ["in", "inch", "inches"]:
return value * 25.4
else:
return value # Assume mm if unit is unknown
```
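For example, pairing this with the dimension pattern above turns a captured `('2', 'cm')` into millimeters:
```python
value, unit = '2', 'cm'
print(convert_to_mm(float(value), unit))  # 20.0
```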
### Dialog State Management
Maintain state across multiple interactions:
```python
class DialogState:
def __init__(self):
self.shape_type = None
self.parameters = {}
self.questions = []
self.confirmed = False
def add_parameter(self, name, value):
self.parameters[name] = value
def add_question(self, question):
self.questions.append(question)
def is_complete(self):
"""Check if all required parameters are present."""
if not self.shape_type:
return False
required_params = self.get_required_parameters()
return all(param in self.parameters for param in required_params)
def get_required_parameters(self):
"""Get the required parameters for the current shape type."""
if self.shape_type == "cube":
return ["width", "height", "depth"]
elif self.shape_type == "sphere":
return ["radius"]
elif self.shape_type == "cylinder":
return ["radius", "height"]
else:
return []
```
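A short usage sketch of the dialog state across turns (values are illustrative):
```python
state = DialogState()
state.shape_type = "cylinder"
state.add_parameter("radius", 5)
print(state.is_complete())  # False: height is still missing
state.add_parameter("height", 20)
print(state.is_complete())  # True: all required parameters are present
```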
## Best Practices
- Start with simple pattern matching and add complexity as needed
- Provide sensible defaults for all parameters
- Use contextual clues to resolve ambiguity
- Maintain dialog state for multi-turn interactions
- Convert all measurements to a standard unit
- Validate extracted parameters for reasonableness
- Handle errors gracefully with helpful messages
```
--------------------------------------------------------------------------------
/src/utils/stl_exporter.py:
--------------------------------------------------------------------------------
```python
import os
import logging
import uuid
import shutil
from typing import Dict, Any, Optional, Tuple
from src.utils.stl_validator import STLValidator
logger = logging.getLogger(__name__)
class STLExporter:
"""
Handles STL file export and validation for 3D printing.
"""
def __init__(self, openscad_wrapper, output_dir: str):
"""
Initialize the STL exporter.
Args:
openscad_wrapper: Instance of OpenSCADWrapper for generating STL files
output_dir: Directory to store output files
"""
self.openscad_wrapper = openscad_wrapper
self.output_dir = output_dir
self.stl_dir = os.path.join(output_dir, "stl")
# Create directory if it doesn't exist
os.makedirs(self.stl_dir, exist_ok=True)
def export_stl(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None) -> Tuple[str, bool, Optional[str]]:
"""
Export a SCAD file to STL format.
Args:
scad_file: Path to the SCAD file
parameters: Optional parameters to override in the SCAD file
Returns:
Tuple of (stl_file_path, is_valid, error_message)
"""
try:
# Generate STL file
stl_file = self.openscad_wrapper.generate_stl(scad_file, parameters)
# Validate STL file
is_valid, error = STLValidator.validate_stl(stl_file)
if not is_valid:
logger.warning(f"STL validation failed: {error}")
# Attempt to repair if validation fails
repair_success, repair_error = STLValidator.repair_stl(stl_file)
if repair_success:
# Validate again after repair
is_valid, error = STLValidator.validate_stl(stl_file)
else:
logger.error(f"STL repair failed: {repair_error}")
return stl_file, is_valid, error
except Exception as e:
logger.error(f"Error exporting STL: {str(e)}")
return "", False, str(e)
def export_stl_with_metadata(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Export a SCAD file to STL format and include metadata.
Args:
scad_file: Path to the SCAD file
parameters: Optional parameters to override in the SCAD file
metadata: Optional metadata to include with the STL file
Returns:
Dictionary with STL file information
"""
# Export STL file
stl_file, is_valid, error = self.export_stl(scad_file, parameters)
# Create metadata file if metadata is provided
metadata_file = None
if metadata and stl_file:
metadata_file = self._create_metadata_file(stl_file, metadata)
# Extract model ID from filename
model_id = os.path.basename(scad_file).split('.')[0] if scad_file else str(uuid.uuid4())
return {
"model_id": model_id,
"stl_file": stl_file,
"is_valid": is_valid,
"error": error,
"metadata_file": metadata_file,
"metadata": metadata
}
def _create_metadata_file(self, stl_file: str, metadata: Dict[str, Any]) -> str:
"""Create a metadata file for an STL file."""
metadata_file = f"{os.path.splitext(stl_file)[0]}.json"
try:
import json
with open(metadata_file, 'w') as f:
json.dump(metadata, f, indent=2)
logger.info(f"Created metadata file: {metadata_file}")
return metadata_file
except Exception as e:
logger.error(f"Error creating metadata file: {str(e)}")
return ""
def copy_stl_to_location(self, stl_file: str, destination: str) -> str:
"""
Copy an STL file to a specified location.
Args:
stl_file: Path to the STL file
destination: Destination path or directory
Returns:
Path to the copied STL file
"""
try:
if not os.path.exists(stl_file):
raise FileNotFoundError(f"STL file not found: {stl_file}")
# If destination is a directory, create a filename
if os.path.isdir(destination):
filename = os.path.basename(stl_file)
destination = os.path.join(destination, filename)
# Copy the file
shutil.copy2(stl_file, destination)
logger.info(f"Copied STL file to: {destination}")
return destination
except Exception as e:
logger.error(f"Error copying STL file: {str(e)}")
return ""
```
--------------------------------------------------------------------------------
/test_image_to_model_pipeline.py:
--------------------------------------------------------------------------------
```python
import os
import sys
import logging
import argparse
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import components
from src.ai.venice_api import VeniceImageGenerator
from src.ai.sam_segmentation import SAMSegmenter
from src.models.threestudio_generator import ThreeStudioGenerator
from src.openscad_wrapper.wrapper import OpenSCADWrapper
from src.workflow.image_to_model_pipeline import ImageToModelPipeline
from src.config import (
VENICE_API_KEY, IMAGES_DIR, MASKS_DIR, MODELS_DIR, SCAD_DIR,
SAM2_CHECKPOINT_PATH, SAM2_MODEL_TYPE, SAM2_USE_GPU, THREESTUDIO_PATH
)
def test_pipeline(prompt: str, output_dir: Optional[str] = None,
venice_model: str = "fluently-xl", skip_steps: List[str] = None):
"""
Test the full image-to-model pipeline.
Args:
prompt: Text prompt for image generation
output_dir: Directory to save pipeline results
venice_model: Venice.ai model to use for image generation
skip_steps: List of steps to skip ('image', 'segment', 'model3d', 'openscad')
"""
# Use default output directory if not provided
if not output_dir:
output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output", "pipeline_test")
# Create output directory
os.makedirs(output_dir, exist_ok=True)
# Initialize skip_steps if None
skip_steps = skip_steps or []
logger.info(f"Testing image-to-model pipeline with prompt: {prompt}")
logger.info(f"Output directory: {output_dir}")
logger.info(f"Venice model: {venice_model}")
logger.info(f"Skipping steps: {skip_steps}")
try:
# Initialize components
logger.info("Initializing pipeline components...")
# Venice.ai image generator
venice_generator = VeniceImageGenerator(
api_key=VENICE_API_KEY,
output_dir=os.path.join(output_dir, "images")
)
# SAM2 segmenter
sam_segmenter = SAMSegmenter(
model_type=SAM2_MODEL_TYPE,
checkpoint_path=SAM2_CHECKPOINT_PATH,
use_gpu=SAM2_USE_GPU,
output_dir=os.path.join(output_dir, "masks")
)
# ThreeStudio generator
threestudio_generator = ThreeStudioGenerator(
threestudio_path=THREESTUDIO_PATH,
output_dir=os.path.join(output_dir, "models")
)
# OpenSCAD wrapper
openscad_wrapper = OpenSCADWrapper(
output_dir=os.path.join(output_dir, "scad")
)
# Initialize pipeline
pipeline = ImageToModelPipeline(
venice_generator=venice_generator,
sam_segmenter=sam_segmenter,
threestudio_generator=threestudio_generator,
openscad_wrapper=openscad_wrapper,
output_dir=output_dir
)
# Run pipeline with custom steps
if 'image' in skip_steps:
# Skip image generation, use a test image
logger.info("Skipping image generation, using test image")
image_path = os.path.join(IMAGES_DIR, "test_image.png")
if not os.path.exists(image_path):
logger.error(f"Test image not found: {image_path}")
return
# TODO: Implement custom pipeline execution with skipped steps
logger.info("Custom pipeline execution not implemented yet")
return
else:
# Run full pipeline
logger.info("Running full pipeline...")
result = pipeline.generate_model_from_text(
prompt=prompt,
venice_params={"model": venice_model},
sam_params={},
threestudio_params={}
)
# Print results
logger.info("Pipeline completed successfully")
logger.info(f"Pipeline ID: {result.get('pipeline_id')}")
logger.info(f"Image path: {result.get('image', {}).get('local_path')}")
logger.info(f"Mask count: {result.get('segmentation', {}).get('num_masks', 0)}")
logger.info(f"3D model path: {result.get('model_3d', {}).get('exported_files', [])}")
logger.info(f"OpenSCAD file: {result.get('openscad', {}).get('scad_file')}")
return result
except Exception as e:
logger.error(f"Error in pipeline: {str(e)}")
import traceback
traceback.print_exc()
return None
if __name__ == "__main__":
# Parse command line arguments
parser = argparse.ArgumentParser(description="Test image-to-model pipeline")
parser.add_argument("prompt", help="Text prompt for image generation")
parser.add_argument("--output-dir", help="Directory to save pipeline results")
parser.add_argument("--venice-model", default="fluently-xl", help="Venice.ai model to use")
parser.add_argument("--skip", nargs="+", choices=["image", "segment", "model3d", "openscad"],
help="Steps to skip in the pipeline")
args = parser.parse_args()
# Run test
test_pipeline(
args.prompt,
args.output_dir,
args.venice_model,
args.skip
)
```
--------------------------------------------------------------------------------
/src/config.py:
--------------------------------------------------------------------------------
```python
import os
from typing import Dict, Any
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Base directories
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
OUTPUT_DIR = os.path.join(BASE_DIR, "output")
# Output subdirectories
IMAGES_DIR = os.path.join(OUTPUT_DIR, "images")
MULTI_VIEW_DIR = os.path.join(OUTPUT_DIR, "multi_view")
APPROVED_IMAGES_DIR = os.path.join(OUTPUT_DIR, "approved_images")
MODELS_DIR = os.path.join(OUTPUT_DIR, "models")
SCAD_DIR = os.path.join(BASE_DIR, "scad")
# Google Gemini API configuration
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "") # Set via environment variable
GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta"
GEMINI_MODEL = "gemini-2.0-flash-exp-image-generation" # Default model for image generation
# CUDA Multi-View Stereo configuration (local)
CUDA_MVS_PATH = os.getenv("CUDA_MVS_PATH", os.path.join(BASE_DIR, "cuda-mvs"))
CUDA_MVS_USE_GPU = os.getenv("CUDA_MVS_USE_GPU", "False").lower() == "true" # Default to CPU for macOS compatibility
# Remote CUDA Multi-View Stereo configuration
REMOTE_CUDA_MVS = {
# General settings
"ENABLED": os.getenv("REMOTE_CUDA_MVS_ENABLED", "True").lower() == "true",
"USE_LAN_DISCOVERY": os.getenv("REMOTE_CUDA_MVS_USE_LAN_DISCOVERY", "True").lower() == "true",
# Server connection
"SERVER_URL": os.getenv("REMOTE_CUDA_MVS_SERVER_URL", ""), # Empty means use LAN discovery
"API_KEY": os.getenv("REMOTE_CUDA_MVS_API_KEY", ""),
"DISCOVERY_PORT": int(os.getenv("REMOTE_CUDA_MVS_DISCOVERY_PORT", "8765")),
# Connection parameters
"CONNECTION_TIMEOUT": int(os.getenv("REMOTE_CUDA_MVS_CONNECTION_TIMEOUT", "10")),
"UPLOAD_CHUNK_SIZE": int(os.getenv("REMOTE_CUDA_MVS_UPLOAD_CHUNK_SIZE", "1048576")), # 1MB
"DOWNLOAD_CHUNK_SIZE": int(os.getenv("REMOTE_CUDA_MVS_DOWNLOAD_CHUNK_SIZE", "1048576")), # 1MB
# Retry and error handling
"MAX_RETRIES": int(os.getenv("REMOTE_CUDA_MVS_MAX_RETRIES", "3")),
"BASE_RETRY_DELAY": float(os.getenv("REMOTE_CUDA_MVS_BASE_RETRY_DELAY", "1.0")),
"MAX_RETRY_DELAY": float(os.getenv("REMOTE_CUDA_MVS_MAX_RETRY_DELAY", "60.0")),
"JITTER_FACTOR": float(os.getenv("REMOTE_CUDA_MVS_JITTER_FACTOR", "0.1")),
# Health check
"HEALTH_CHECK_INTERVAL": int(os.getenv("REMOTE_CUDA_MVS_HEALTH_CHECK_INTERVAL", "60")),
"CIRCUIT_BREAKER_THRESHOLD": int(os.getenv("REMOTE_CUDA_MVS_CIRCUIT_BREAKER_THRESHOLD", "5")),
"CIRCUIT_BREAKER_RECOVERY_TIMEOUT": float(os.getenv("REMOTE_CUDA_MVS_CIRCUIT_BREAKER_RECOVERY_TIMEOUT", "30.0")),
# Processing parameters
"DEFAULT_RECONSTRUCTION_QUALITY": os.getenv("REMOTE_CUDA_MVS_DEFAULT_QUALITY", "normal"), # low, normal, high
"DEFAULT_OUTPUT_FORMAT": os.getenv("REMOTE_CUDA_MVS_DEFAULT_FORMAT", "obj"),
"WAIT_FOR_COMPLETION": os.getenv("REMOTE_CUDA_MVS_WAIT_FOR_COMPLETION", "True").lower() == "true",
"POLL_INTERVAL": int(os.getenv("REMOTE_CUDA_MVS_POLL_INTERVAL", "5")),
# Output directories
"OUTPUT_DIR": MODELS_DIR,
"IMAGES_DIR": IMAGES_DIR,
"MULTI_VIEW_DIR": MULTI_VIEW_DIR,
"APPROVED_IMAGES_DIR": APPROVED_IMAGES_DIR,
}
# Venice.ai API configuration (optional)
VENICE_API_KEY = os.getenv("VENICE_API_KEY", "") # Set via environment variable
VENICE_BASE_URL = "https://api.venice.ai/api/v1"
VENICE_MODEL = "fluently-xl" # Default model for fastest image generation (2.30s)
# Image approval configuration
IMAGE_APPROVAL = {
"ENABLED": os.getenv("IMAGE_APPROVAL_ENABLED", "True").lower() == "true",
"AUTO_APPROVE": os.getenv("IMAGE_APPROVAL_AUTO_APPROVE", "False").lower() == "true",
"APPROVAL_TIMEOUT": int(os.getenv("IMAGE_APPROVAL_TIMEOUT", "300")), # 5 minutes
"MIN_APPROVED_IMAGES": int(os.getenv("IMAGE_APPROVAL_MIN_IMAGES", "3")),
"APPROVED_IMAGES_DIR": APPROVED_IMAGES_DIR,
}
# Multi-view to model pipeline configuration
MULTI_VIEW_PIPELINE = {
"DEFAULT_NUM_VIEWS": int(os.getenv("MULTI_VIEW_DEFAULT_NUM_VIEWS", "4")),
"MIN_NUM_VIEWS": int(os.getenv("MULTI_VIEW_MIN_NUM_VIEWS", "3")),
"MAX_NUM_VIEWS": int(os.getenv("MULTI_VIEW_MAX_NUM_VIEWS", "8")),
"VIEW_ANGLES": [0, 90, 180, 270], # Default view angles (degrees)
"OUTPUT_DIR": MULTI_VIEW_DIR,
}
# Natural language processing configuration for MCP
NLP = {
"ENABLE_INTERACTIVE_PARAMS": os.getenv("NLP_ENABLE_INTERACTIVE_PARAMS", "True").lower() == "true",
"PARAM_EXTRACTION_PROMPT_TEMPLATE": """
Extract the following parameters from the user's request for 3D model generation:
1. Object description
2. Number of views requested (default: 4)
3. Reconstruction quality (low, normal, high)
4. Output format (obj, ply, stl, scad)
5. Any specific view angles mentioned
If a parameter is not specified, return the default value or leave blank.
Format the response as a JSON object.
User request: {user_request}
""",
}
# Deprecated configurations (moved to old folder)
# These are kept for reference but not used in the new workflow
DEPRECATED = {
"SAM2_CHECKPOINT_PATH": os.getenv("SAM2_CHECKPOINT_PATH", os.path.join(BASE_DIR, "models", "sam2_vit_b.pth")),
"SAM2_MODEL_TYPE": os.getenv("SAM2_MODEL_TYPE", "vit_b"),
"SAM2_USE_GPU": os.getenv("SAM2_USE_GPU", "False").lower() == "true",
"THREESTUDIO_PATH": os.path.join(BASE_DIR, "threestudio")
}
# Create necessary directories
for directory in [OUTPUT_DIR, IMAGES_DIR, MULTI_VIEW_DIR, APPROVED_IMAGES_DIR, MODELS_DIR, SCAD_DIR]:
os.makedirs(directory, exist_ok=True)
```
--------------------------------------------------------------------------------
/test_gemini_api.py:
--------------------------------------------------------------------------------
```python
"""
Test script for Google Gemini API integration.
"""
import os
import sys
import logging
import unittest
from unittest.mock import patch, MagicMock
from pathlib import Path
# Add the src directory to the path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.ai.gemini_api import GeminiImageGenerator
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TestGeminiAPI(unittest.TestCase):
"""
Test cases for Google Gemini API integration.
"""
def setUp(self):
"""
Set up test environment.
"""
# Create a test output directory
self.test_output_dir = "output/test_gemini"
os.makedirs(self.test_output_dir, exist_ok=True)
# Mock API key
self.api_key = "test_api_key"
# Create the generator with the mock API key
self.gemini_generator = GeminiImageGenerator(
api_key=self.api_key,
output_dir=self.test_output_dir
)
@patch('requests.post')
def test_generate_image(self, mock_post):
"""
Test generating a single image with Gemini API.
"""
# Mock response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"candidates": [
{
"content": {
"parts": [
{
"text": "Generated image description"
},
{
"inlineData": {
"mimeType": "image/png",
"data": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8BQDwAEhQGAhKmMIQAAAABJRU5ErkJggg=="
}
}
]
}
}
]
}
mock_post.return_value = mock_response
# Test parameters
prompt = "A low-poly rabbit with black background"
model = "gemini-2.0-flash-exp-image-generation"
# Call the method
result = self.gemini_generator.generate_image(prompt, model)
# Verify the result
self.assertIsNotNone(result)
self.assertEqual(result["prompt"], prompt)
self.assertEqual(result["model"], model)
self.assertTrue("local_path" in result)
self.assertTrue(os.path.exists(result["local_path"]))
# Verify the API call
mock_post.assert_called_once()
args, kwargs = mock_post.call_args
self.assertTrue("generativelanguage.googleapis.com" in args[0])
self.assertEqual(kwargs["headers"]["x-goog-api-key"], self.api_key)
self.assertTrue("prompt" in str(kwargs["json"]))
@patch('requests.post')
def test_generate_multiple_views(self, mock_post):
"""
Test generating multiple views of an object with Gemini API.
"""
# Mock response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"candidates": [
{
"content": {
"parts": [
{
"text": "Generated image description"
},
{
"inlineData": {
"mimeType": "image/png",
"data": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8BQDwAEhQGAhKmMIQAAAABJRU5ErkJggg=="
}
}
]
}
}
]
}
mock_post.return_value = mock_response
# Test parameters
prompt = "A low-poly rabbit"
num_views = 3
# Call the method
results = self.gemini_generator.generate_multiple_views(prompt, num_views)
# Verify the results
self.assertEqual(len(results), num_views)
for i, result in enumerate(results):
self.assertTrue("view_direction" in result)
self.assertEqual(result["view_index"], i + 1)
self.assertTrue("local_path" in result)
self.assertTrue(os.path.exists(result["local_path"]))
# Verify the API calls
self.assertEqual(mock_post.call_count, num_views)
@patch('requests.post')
def test_error_handling(self, mock_post):
"""
Test error handling in the Gemini API client.
"""
# Mock error response
mock_response = MagicMock()
mock_response.status_code = 400
mock_response.raise_for_status.side_effect = Exception("API Error")
mock_post.return_value = mock_response
# Test parameters
prompt = "A low-poly rabbit"
# Call the method and expect an exception
with self.assertRaises(Exception):
self.gemini_generator.generate_image(prompt)
def tearDown(self):
"""
Clean up after tests.
"""
# Clean up test output directory
import shutil
if os.path.exists(self.test_output_dir):
shutil.rmtree(self.test_output_dir)
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/src/ai/gemini_api.py:
--------------------------------------------------------------------------------
```python
"""
Google Gemini API integration for image generation.
"""
import os
import logging
import base64
from typing import Dict, Any, List, Optional
from io import BytesIO
from PIL import Image
import requests
logger = logging.getLogger(__name__)
class GeminiImageGenerator:
"""
Wrapper for Google Gemini API for generating images.
"""
def __init__(self, api_key: str, output_dir: str = "output/images"):
"""
Initialize the Gemini image generator.
Args:
api_key: Google Gemini API key
output_dir: Directory to store generated images
"""
self.api_key = api_key
self.output_dir = output_dir
self.base_url = "https://generativelanguage.googleapis.com/v1beta"
# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
def generate_image(self, prompt: str, model: str = "gemini-2.0-flash-exp-image-generation",
output_path: Optional[str] = None, **kwargs) -> Dict[str, Any]:
"""
Generate an image using Google Gemini API.
Args:
prompt: Text description for image generation
model: Gemini model to use
output_path: Path to save the generated image
**kwargs: Additional parameters for Gemini API
Returns:
Dictionary containing image data and metadata
"""
logger.info(f"Generating image with prompt: {prompt}")
try:
# Prepare the request payload
payload = {
"contents": [
{
"parts": [
{"text": prompt}
]
}
],
"generationConfig": {
"responseModalities": ["Text", "Image"]
}
}
# Add any additional parameters
for key, value in kwargs.items():
if key not in payload:
payload[key] = value
# Make API request
response = requests.post(
f"{self.base_url}/models/{model}:generateContent",
headers={
"Content-Type": "application/json",
"x-goog-api-key": self.api_key
},
json=payload
)
# Check for errors
response.raise_for_status()
result = response.json()
# Extract image data
image_data = None
for part in result["candidates"][0]["content"]["parts"]:
if "inlineData" in part:
image_data = base64.b64decode(part["inlineData"]["data"])
break
if not image_data:
raise ValueError("No image was generated in the response")
# Save image if output_path is provided
if not output_path:
# Generate output path if not provided
os.makedirs(self.output_dir, exist_ok=True)
output_path = os.path.join(self.output_dir, f"{prompt[:20].replace(' ', '_')}.png")
# Save image
image = Image.open(BytesIO(image_data))
image.save(output_path)
logger.info(f"Image saved to {output_path}")
return {
"prompt": prompt,
"model": model,
"local_path": output_path,
"image_data": image_data
}
except Exception as e:
logger.error(f"Error generating image: {str(e)}")
raise
def generate_multiple_views(self, prompt: str, num_views: int = 4,
base_image_path: Optional[str] = None,
output_dir: Optional[str] = None) -> List[Dict[str, Any]]:
"""
Generate multiple views of the same 3D object.
Args:
prompt: Text description of the object
num_views: Number of views to generate
base_image_path: Optional path to a base image
output_dir: Directory to save the generated images
Returns:
List of dictionaries containing image data and metadata
"""
if not output_dir:
output_dir = os.path.join(self.output_dir, prompt[:20].replace(' ', '_'))
os.makedirs(output_dir, exist_ok=True)
# View directions to include in prompts
view_directions = [
"front view", "side view from the right",
"side view from the left", "back view",
"top view", "bottom view", "45-degree angle view"
]
results = []
# Generate images for each view direction
for i in range(min(num_views, len(view_directions))):
view_prompt = f"{prompt} - {view_directions[i]}, same object, consistent style and details"
# Generate the image
output_path = os.path.join(output_dir, f"view_{i+1}.png")
result = self.generate_image(view_prompt, output_path=output_path)
# Add view direction to result
result["view_direction"] = view_directions[i]
result["view_index"] = i + 1
results.append(result)
return results
```
--------------------------------------------------------------------------------
/test_image_approval.py:
--------------------------------------------------------------------------------
```python
"""
Test script for image approval tool.
"""
import os
import sys
import logging
import unittest
from unittest.mock import patch, MagicMock
from pathlib import Path
# Add the src directory to the path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.workflow.image_approval import ImageApprovalTool
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TestImageApproval(unittest.TestCase):
"""
Test cases for image approval tool.
"""
def setUp(self):
"""
Set up test environment.
"""
# Create a test output directory
self.test_output_dir = "output/test_approval"
os.makedirs(self.test_output_dir, exist_ok=True)
# Create test images directory
self.test_images_dir = "output/test_approval/images"
os.makedirs(self.test_images_dir, exist_ok=True)
# Create test images
self.test_images = []
for i in range(3):
image_path = os.path.join(self.test_images_dir, f"view_{i}.png")
with open(image_path, "w") as f:
f.write(f"Mock image {i}")
self.test_images.append(image_path)
# Create the approval tool
self.approval_tool = ImageApprovalTool(
output_dir=os.path.join(self.test_output_dir, "approved")
)
def test_present_image_for_approval(self):
"""
Test presenting an image for approval.
"""
# Test parameters
image_path = self.test_images[0]
metadata = {
"prompt": "A test image",
"view_direction": "front view",
"view_index": 1
}
# Call the method
result = self.approval_tool.present_image_for_approval(image_path, metadata)
# Verify the result
self.assertIsNotNone(result)
self.assertTrue("approval_id" in result)
self.assertEqual(result["image_path"], image_path)
self.assertTrue("image_url" in result)
self.assertEqual(result["metadata"], metadata)
def test_process_approval_approved(self):
"""
Test processing an approved image.
"""
# Test parameters
image_path = self.test_images[0]
approval_id = "test_approval_1"
# Call the method
result = self.approval_tool.process_approval(approval_id, True, image_path)
# Verify the result
self.assertIsNotNone(result)
self.assertEqual(result["approval_id"], approval_id)
self.assertTrue(result["approved"])
self.assertEqual(result["original_path"], image_path)
self.assertTrue("approved_path" in result)
# Verify the file was copied
self.assertTrue(os.path.exists(result["approved_path"]))
def test_process_approval_denied(self):
"""
Test processing a denied image.
"""
# Test parameters
image_path = self.test_images[1]
approval_id = "test_approval_2"
# Call the method
result = self.approval_tool.process_approval(approval_id, False, image_path)
# Verify the result
self.assertIsNotNone(result)
self.assertEqual(result["approval_id"], approval_id)
self.assertFalse(result["approved"])
self.assertEqual(result["original_path"], image_path)
self.assertFalse("approved_path" in result)
def test_get_approved_images(self):
"""
Test getting approved images.
"""
# Approve some images
for i, image_path in enumerate(self.test_images):
self.approval_tool.process_approval(f"test_approval_{i}", True, image_path)
# Call the method
approved_images = self.approval_tool.get_approved_images()
# Verify the result
self.assertEqual(len(approved_images), len(self.test_images))
def test_get_approval_status(self):
"""
Test getting approval status.
"""
# Approve an image
approval_id = "test_approval_status"
self.approval_tool.process_approval(approval_id, True, self.test_images[0])
# Call the method
status = self.approval_tool.get_approval_status(approval_id)
# Verify the result
self.assertIsNotNone(status)
self.assertEqual(status["approval_id"], approval_id)
self.assertTrue(status["approved"])
self.assertTrue("approved_path" in status)
# Test with non-existent approval ID
status = self.approval_tool.get_approval_status("non_existent")
self.assertIsNotNone(status)
self.assertEqual(status["approval_id"], "non_existent")
self.assertFalse(status["approved"])
def test_batch_process_approvals(self):
"""
Test batch processing of approvals.
"""
# Test parameters
approvals = [
{
"approval_id": "batch_1",
"approved": True,
"image_path": self.test_images[0]
},
{
"approval_id": "batch_2",
"approved": False,
"image_path": self.test_images[1]
},
{
"approval_id": "batch_3",
"approved": True,
"image_path": self.test_images[2]
}
]
# Call the method
results = self.approval_tool.batch_process_approvals(approvals)
# Verify the results
self.assertEqual(len(results), len(approvals))
# Check approved images
approved_images = self.approval_tool.get_approved_images()
self.assertEqual(len(approved_images), 2) # Two images were approved
def tearDown(self):
"""
Clean up after tests.
"""
# Clean up test output directory
import shutil
if os.path.exists(self.test_output_dir):
shutil.rmtree(self.test_output_dir)
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/test_cuda_mvs.py:
--------------------------------------------------------------------------------
```python
"""
Test script for CUDA Multi-View Stereo integration.
"""
import os
import sys
import logging
import unittest
from unittest.mock import patch, MagicMock
from pathlib import Path
# Add the src directory to the path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.models.cuda_mvs import CUDAMultiViewStereo
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TestCUDAMVS(unittest.TestCase):
"""
Test cases for CUDA Multi-View Stereo integration.
"""
def setUp(self):
"""
Set up test environment.
"""
# Create a test output directory
self.test_output_dir = "output/test_cuda_mvs"
os.makedirs(self.test_output_dir, exist_ok=True)
# Create test image directory
self.test_images_dir = "output/test_cuda_mvs/images"
os.makedirs(self.test_images_dir, exist_ok=True)
# Create mock CUDA MVS path
self.cuda_mvs_path = "mock_cuda_mvs"
os.makedirs(os.path.join(self.cuda_mvs_path, "build"), exist_ok=True)
# Create mock executable
with open(os.path.join(self.cuda_mvs_path, "build", "app_patch_match_mvs"), "w") as f:
f.write("#!/bin/bash\necho 'Mock CUDA MVS'\n")
os.chmod(os.path.join(self.cuda_mvs_path, "build", "app_patch_match_mvs"), 0o755)
# Create test images
for i in range(3):
with open(os.path.join(self.test_images_dir, f"view_{i}.png"), "w") as f:
f.write(f"Mock image {i}")
# Create the CUDA MVS wrapper with the mock path
with patch('os.path.exists', return_value=True):
self.cuda_mvs = CUDAMultiViewStereo(
cuda_mvs_path=self.cuda_mvs_path,
output_dir=self.test_output_dir
)
@patch('subprocess.Popen')
def test_generate_model_from_images(self, mock_popen):
"""
Test generating a 3D model from multiple images.
"""
# Mock subprocess
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.communicate.return_value = ("Mock stdout", "")
mock_popen.return_value = mock_process
        # Mock file creation; keep a reference to the real os.path.exists so the
        # fallthrough below does not recurse into the patched mock
        real_exists = os.path.exists
        def mock_exists(path):
            if "point_cloud_file" in str(path):
                # Create the mock point cloud file
                os.makedirs(os.path.dirname(path), exist_ok=True)
                with open(path, "w") as f:
                    f.write("Mock point cloud")
                return True
            return real_exists(path)
# Test parameters
image_paths = [os.path.join(self.test_images_dir, f"view_{i}.png") for i in range(3)]
output_name = "test_model"
# Call the method with patched os.path.exists
with patch('os.path.exists', side_effect=mock_exists):
result = self.cuda_mvs.generate_model_from_images(image_paths, output_name=output_name)
# Verify the result
self.assertIsNotNone(result)
self.assertEqual(result["model_id"], output_name)
self.assertTrue("point_cloud_file" in result)
self.assertTrue("camera_params_file" in result)
self.assertEqual(len(result["input_images"]), 3)
# Verify the subprocess call
mock_popen.assert_called_once()
args, kwargs = mock_popen.call_args
self.assertTrue("app_patch_match_mvs" in args[0][0])
def test_generate_camera_params(self):
"""
Test generating camera parameters from images.
"""
# Test parameters
image_paths = [os.path.join(self.test_images_dir, f"view_{i}.png") for i in range(3)]
model_dir = os.path.join(self.test_output_dir, "camera_params_test")
os.makedirs(model_dir, exist_ok=True)
# Mock PIL.Image.open
mock_image = MagicMock()
mock_image.size = (800, 600)
# Call the method with patched PIL.Image.open
with patch('PIL.Image.open', return_value=mock_image):
params_file = self.cuda_mvs._generate_camera_params(image_paths, model_dir)
# Verify the result
self.assertTrue(os.path.exists(params_file))
# Read the params file
import json
with open(params_file, "r") as f:
params = json.load(f)
# Verify the params
self.assertEqual(len(params), 3)
for i, param in enumerate(params):
self.assertEqual(param["image_id"], i)
self.assertEqual(param["width"], 800)
self.assertEqual(param["height"], 600)
self.assertTrue("camera" in param)
self.assertEqual(param["camera"]["model"], "PINHOLE")
def test_convert_ply_to_obj(self):
"""
Test converting PLY point cloud to OBJ mesh.
"""
# Create a mock PLY file
ply_file = os.path.join(self.test_output_dir, "test.ply")
with open(ply_file, "w") as f:
f.write("Mock PLY file")
# Call the method
obj_file = self.cuda_mvs.convert_ply_to_obj(ply_file)
# Verify the result
self.assertTrue(os.path.exists(obj_file))
self.assertTrue(obj_file.endswith(".obj"))
# Read the OBJ file
with open(obj_file, "r") as f:
content = f.read()
# Verify the content
self.assertTrue("# Converted from test.ply" in content)
self.assertTrue("v " in content)
self.assertTrue("f " in content)
def test_error_handling(self):
"""
Test error handling in the CUDA MVS wrapper.
"""
# Test parameters
image_paths = [os.path.join(self.test_images_dir, f"view_{i}.png") for i in range(3)]
output_name = "error_test"
# Mock subprocess with error
mock_process = MagicMock()
mock_process.returncode = 1
mock_process.communicate.return_value = ("", "Mock error")
# Call the method with patched subprocess.Popen
with patch('subprocess.Popen', return_value=mock_process):
with self.assertRaises(RuntimeError):
self.cuda_mvs.generate_model_from_images(image_paths, output_name=output_name)
def tearDown(self):
"""
Clean up after tests.
"""
# Clean up test output directory
import shutil
if os.path.exists(self.test_output_dir):
shutil.rmtree(self.test_output_dir)
# Clean up mock CUDA MVS path
if os.path.exists(self.cuda_mvs_path):
shutil.rmtree(self.cuda_mvs_path)
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/src/visualization/renderer.py:
--------------------------------------------------------------------------------
```python
import os
import subprocess
import logging
from typing import Dict, Any, Optional
from PIL import Image, ImageDraw, ImageFont
logger = logging.getLogger(__name__)
class Renderer:
"""
Handles rendering of OpenSCAD models to preview images.
Implements multi-angle views and fallback rendering when headless mode fails.
"""
def __init__(self, openscad_wrapper):
"""
Initialize the renderer.
Args:
openscad_wrapper: Instance of OpenSCADWrapper for generating previews
"""
self.openscad_wrapper = openscad_wrapper
# Standard camera angles for multi-view rendering
self.camera_angles = {
'front': "0,0,0,0,0,0,50",
'top': "0,0,0,90,0,0,50",
'right': "0,0,0,0,90,0,50",
'perspective': "20,20,20,55,0,25,100"
}
def generate_preview(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None) -> str:
"""
Generate a preview image for a SCAD file.
Args:
scad_file: Path to the SCAD file
parameters: Optional parameters to override in the SCAD file
Returns:
Path to the generated preview image
"""
try:
# Try to generate a preview using OpenSCAD
preview_file = self.openscad_wrapper.generate_preview(
scad_file,
parameters,
camera_position=self.camera_angles['perspective'],
image_size="800,600"
)
# Check if the file exists and has content
if os.path.exists(preview_file) and os.path.getsize(preview_file) > 0:
return preview_file
else:
# If the file doesn't exist or is empty, create a placeholder
return self._create_placeholder_image(preview_file)
except Exception as e:
logger.error(f"Error generating preview: {str(e)}")
# Create a placeholder image
model_id = os.path.basename(scad_file).split('.')[0]
preview_file = os.path.join(self.openscad_wrapper.preview_dir, f"{model_id}.png")
return self._create_placeholder_image(preview_file)
def generate_multi_angle_previews(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
"""
Generate preview images from multiple angles.
Args:
scad_file: Path to the SCAD file
parameters: Optional parameters to override in the SCAD file
Returns:
Dictionary mapping angle names to preview image paths
"""
previews = {}
model_id = os.path.basename(scad_file).split('.')[0]
for angle_name, camera_position in self.camera_angles.items():
preview_file = os.path.join(
self.openscad_wrapper.preview_dir,
f"{model_id}_{angle_name}.png"
)
try:
# Try to generate a preview using OpenSCAD
preview_file = self.openscad_wrapper.generate_preview(
scad_file,
parameters,
camera_position=camera_position,
image_size="800,600"
)
# Check if the file exists and has content
if os.path.exists(preview_file) and os.path.getsize(preview_file) > 0:
previews[angle_name] = preview_file
else:
# If the file doesn't exist or is empty, create a placeholder
previews[angle_name] = self._create_placeholder_image(preview_file, angle_name)
except Exception as e:
logger.error(f"Error generating {angle_name} preview: {str(e)}")
# Create a placeholder image
previews[angle_name] = self._create_placeholder_image(preview_file, angle_name)
return previews
def _create_placeholder_image(self, output_path: str, angle_name: str = "perspective") -> str:
"""
Create a placeholder image when OpenSCAD rendering fails.
Args:
output_path: Path to save the placeholder image
angle_name: Name of the camera angle for the placeholder
Returns:
Path to the created placeholder image
"""
try:
# Create a blank image
img = Image.new('RGB', (800, 600), color=(240, 240, 240))
draw = ImageDraw.Draw(img)
# Add text
draw.text((400, 280), f"Preview not available", fill=(0, 0, 0))
draw.text((400, 320), f"View: {angle_name}", fill=(0, 0, 0))
# Save the image
img.save(output_path)
logger.info(f"Created placeholder image: {output_path}")
return output_path
except Exception as e:
logger.error(f"Error creating placeholder image: {str(e)}")
# If all else fails, return the path anyway
return output_path
def create_composite_preview(self, previews: Dict[str, str], output_path: str) -> str:
"""
Create a composite image from multiple angle previews.
Args:
previews: Dictionary mapping angle names to preview image paths
output_path: Path to save the composite image
Returns:
Path to the created composite image
"""
try:
# Create a blank image
img = Image.new('RGB', (1600, 1200), color=(240, 240, 240))
# Load and paste each preview
positions = {
'perspective': (0, 0),
'front': (800, 0),
'top': (0, 600),
'right': (800, 600)
}
for angle_name, preview_path in previews.items():
if angle_name in positions and os.path.exists(preview_path):
try:
angle_img = Image.open(preview_path)
# Resize if needed
angle_img = angle_img.resize((800, 600))
# Paste into composite
img.paste(angle_img, positions[angle_name])
except Exception as e:
logger.error(f"Error processing {angle_name} preview: {str(e)}")
# Save the composite image
img.save(output_path)
logger.info(f"Created composite preview: {output_path}")
return output_path
except Exception as e:
logger.error(f"Error creating composite preview: {str(e)}")
# If all else fails, return the path anyway
return output_path
```
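A minimal usage sketch for the renderer's fallback path, assuming Pillow is installed; the stub wrapper below is hypothetical and only exposes the `preview_dir` attribute and `generate_preview` method that `Renderer` relies on:

```python
import os

from src.visualization.renderer import Renderer

class StubOpenSCADWrapper:
    """Hypothetical stand-in for OpenSCADWrapper (src/openscad_wrapper/wrapper.py)."""
    preview_dir = "output/preview"

    def generate_preview(self, scad_file, parameters=None,
                         camera_position=None, image_size="800,600"):
        # Simulate a failed headless render so Renderer falls back to a placeholder image
        raise RuntimeError("headless OpenSCAD not available")

os.makedirs(StubOpenSCADWrapper.preview_dir, exist_ok=True)
renderer = Renderer(StubOpenSCADWrapper())
print(renderer.generate_preview("scad/simple_cube.scad"))  # path to a placeholder PNG
```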
--------------------------------------------------------------------------------
/src/testing/primitive_tester.py:
--------------------------------------------------------------------------------
```python
import os
import logging
from typing import Dict, Any, List, Optional, Tuple
from src.models.code_generator import OpenSCADCodeGenerator
from src.utils.cad_exporter import CADExporter
logger = logging.getLogger(__name__)
class PrimitiveTester:
"""Tests OpenSCAD primitives with different export formats."""
def __init__(self, code_generator: OpenSCADCodeGenerator, cad_exporter: CADExporter,
output_dir: str = "test_output"):
"""
Initialize the primitive tester.
Args:
code_generator: CodeGenerator instance for generating OpenSCAD code
cad_exporter: CADExporter instance for exporting models
output_dir: Directory to store test output
"""
self.code_generator = code_generator
self.cad_exporter = cad_exporter
self.output_dir = output_dir
# Create output directory
os.makedirs(output_dir, exist_ok=True)
# Primitive types to test
self.primitives = [
"cube", "sphere", "cylinder", "cone", "torus",
"rounded_box", "hexagonal_prism", "text"
]
# Export formats to test (no STL per requirements)
self.formats = ["3mf", "amf", "csg", "scad"]
def test_all_primitives(self) -> Dict[str, Dict[str, Any]]:
"""
Test all primitives with all formats.
Returns:
Dictionary of test results for each primitive
"""
results = {}
for primitive in self.primitives:
results[primitive] = self.test_primitive(primitive)
return results
def test_primitive(self, primitive_type: str) -> Dict[str, Any]:
"""
Test a single primitive with all formats.
Args:
primitive_type: Type of primitive to test
Returns:
Dictionary of test results for the primitive
"""
results = {
"primitive": primitive_type,
"formats": {}
}
# Generate default parameters for the primitive
params = self._get_default_parameters(primitive_type)
# Generate the SCAD code
scad_file = self.code_generator.generate_code(primitive_type, params)
# Test export to each format
for format_type in self.formats:
success, output_file, error = self.cad_exporter.export_model(
scad_file,
format_type,
params,
metadata={"primitive_type": primitive_type}
)
results["formats"][format_type] = {
"success": success,
"output_file": output_file,
"error": error
}
return results
def _get_default_parameters(self, primitive_type: str) -> Dict[str, Any]:
"""
Get default parameters for a primitive type.
Args:
primitive_type: Type of primitive
Returns:
Dictionary of default parameters
"""
params = {}
if primitive_type == "cube":
params = {"width": 20, "depth": 20, "height": 20, "center": True}
elif primitive_type == "sphere":
params = {"radius": 10, "segments": 32}
elif primitive_type == "cylinder":
params = {"radius": 10, "height": 20, "center": True, "segments": 32}
elif primitive_type == "cone":
params = {"bottom_radius": 10, "top_radius": 0, "height": 20, "center": True}
elif primitive_type == "torus":
params = {"outer_radius": 20, "inner_radius": 5, "segments": 32}
elif primitive_type == "rounded_box":
params = {"width": 30, "depth": 20, "height": 15, "radius": 3}
elif primitive_type == "hexagonal_prism":
params = {"radius": 10, "height": 20}
elif primitive_type == "text":
params = {"text": "OpenSCAD", "size": 10, "height": 3}
return params
def test_with_parameter_variations(self, primitive_type: str) -> Dict[str, Any]:
"""
Test a primitive with variations of parameters.
Args:
primitive_type: Type of primitive to test
Returns:
Dictionary of test results for different parameter variations
"""
results = {
"primitive": primitive_type,
"variations": {}
}
# Define parameter variations for the primitive
variations = self._get_parameter_variations(primitive_type)
# Test each variation
for variation_name, params in variations.items():
# Generate the SCAD code
scad_file = self.code_generator.generate_code(primitive_type, params)
# Test export to each format
format_results = {}
for format_type in self.formats:
success, output_file, error = self.cad_exporter.export_model(
scad_file,
format_type,
params,
metadata={"primitive_type": primitive_type, "variation": variation_name}
)
format_results[format_type] = {
"success": success,
"output_file": output_file,
"error": error
}
results["variations"][variation_name] = {
"parameters": params,
"formats": format_results
}
return results
def _get_parameter_variations(self, primitive_type: str) -> Dict[str, Dict[str, Any]]:
"""
Get parameter variations for a primitive type.
Args:
primitive_type: Type of primitive
Returns:
Dictionary of parameter variations
"""
variations = {}
if primitive_type == "cube":
variations = {
"small": {"width": 5, "depth": 5, "height": 5, "center": True},
"large": {"width": 50, "depth": 50, "height": 50, "center": True},
"flat": {"width": 50, "depth": 50, "height": 2, "center": True},
"tall": {"width": 10, "depth": 10, "height": 100, "center": True}
}
elif primitive_type == "sphere":
variations = {
"small": {"radius": 2, "segments": 16},
"large": {"radius": 30, "segments": 64},
"low_res": {"radius": 10, "segments": 8},
"high_res": {"radius": 10, "segments": 128}
}
elif primitive_type == "cylinder":
variations = {
"small": {"radius": 2, "height": 5, "center": True, "segments": 16},
"large": {"radius": 30, "height": 50, "center": True, "segments": 64},
"thin": {"radius": 1, "height": 50, "center": True, "segments": 32},
"disc": {"radius": 30, "height": 2, "center": True, "segments": 32}
}
# Add variations for other primitives as needed
return variations
```
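A hedged smoke-test sketch; the real `OpenSCADCodeGenerator` and `CADExporter` constructors are not shown in this file, so the stubs below are hypothetical and only mimic the two calls `PrimitiveTester` makes:

```python
from src.testing.primitive_tester import PrimitiveTester

class StubCodeGenerator:
    def generate_code(self, primitive_type, params):
        # Pretend a SCAD file was written and return its path
        return f"output/scad/{primitive_type}.scad"

class StubExporter:
    def export_model(self, scad_file, format_type, params, metadata=None):
        # Mirror the (success, output_file, error) return shape used above
        return True, scad_file.replace(".scad", f".{format_type}"), None

tester = PrimitiveTester(StubCodeGenerator(), StubExporter(), output_dir="test_output")
report = tester.test_primitive("cube")
print(report["formats"]["3mf"])  # {'success': True, 'output_file': ..., 'error': None}
```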
--------------------------------------------------------------------------------
/src/ai/venice_api.py:
--------------------------------------------------------------------------------
```python
"""
Venice.ai API client for image generation using the Flux model.
"""
import os
import requests
import logging
from typing import Dict, Any, Optional, List, Tuple
from pathlib import Path
logger = logging.getLogger(__name__)
# Venice.ai model mapping and descriptions
VENICE_MODELS = {
# Model name: (aliases, description)
"fluently-xl": (
["fast", "quick", "fastest", "speed", "rapid", "efficient"],
"Fastest model (2.30s) with good quality"
),
"flux-dev": (
["high quality", "detailed", "hq", "best quality", "premium"],
"High-quality model with detailed results"
),
"flux-dev-uncensored": (
["uncensored", "unfiltered", "unrestricted"],
"Uncensored version of the flux-dev model"
),
"stable-diffusion-3.5": (
["stable diffusion", "sd3", "sd3.5", "standard"],
"Stable Diffusion 3.5 model"
),
"pony-realism": (
["realistic", "realism", "pony", "photorealistic"],
"Specialized model for realistic outputs"
),
"lustify-sdxl": (
["stylized", "artistic", "creative", "lustify"],
"Artistic stylization model"
),
}
class VeniceImageGenerator:
"""Client for Venice.ai's image generation API."""
def __init__(self, api_key: str, output_dir: str = "output/images"):
"""
Initialize the Venice.ai API client.
Args:
api_key: API key for Venice.ai
output_dir: Directory to store generated images
"""
self.api_key = api_key
if not self.api_key:
logger.warning("No Venice.ai API key provided")
# API endpoint from documentation
self.base_url = "https://api.venice.ai/api/v1"
self.api_endpoint = f"{self.base_url}/image/generate"
self.output_dir = output_dir
# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
def map_model_preference(self, preference: str) -> str:
"""
Map a natural language preference to a Venice.ai model name.
Args:
preference: Natural language description of desired model
Returns:
Name of the matching Venice.ai model
"""
if not preference or preference.lower() in ["default", "fluently-xl", "fluently xl"]:
return "fluently-xl"
preference = preference.lower()
# Check for exact matches first
for model_name in VENICE_MODELS:
if model_name.lower() == preference:
return model_name
# Check for keyword matches
for model_name, (aliases, _) in VENICE_MODELS.items():
for alias in aliases:
if alias in preference:
return model_name
# Default to fluently-xl if no match found
return "fluently-xl"
def generate_image(self, prompt: str, model: str = "fluently-xl",
width: int = 1024, height: int = 1024,
output_path: Optional[str] = None) -> Dict[str, Any]:
"""
Generate an image using Venice.ai's API.
Args:
prompt: Text description for image generation
model: Model to use - can be a specific model name or natural language description:
- "fluently-xl" (default): Fastest model (2.30s) with good quality
- "flux-dev": High-quality model with detailed results
- "flux-dev-uncensored": Uncensored version of the flux-dev model
- "stable-diffusion-3.5": Stable Diffusion 3.5 model
- "pony-realism": Specialized model for realistic outputs
- "lustify-sdxl": Artistic stylization model
- Or use natural language like "high quality", "fastest", "realistic", etc.
width: Image width
height: Image height
output_path: Optional path to save the generated image
Returns:
Dictionary containing image data and metadata
"""
if not self.api_key:
raise ValueError("Venice.ai API key is required")
# Map the model preference to a specific model name
mapped_model = self.map_model_preference(model)
# Prepare request payload
payload = {
"model": mapped_model,
"prompt": prompt,
"height": height,
"width": width,
"steps": 20,
"return_binary": False,
"hide_watermark": True, # Remove watermark as requested
"format": "png",
"embed_exif_metadata": False
}
# Set up headers with API key
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
try:
# Make API request
logger.info(f"Sending request to {self.api_endpoint}")
response = requests.post(
self.api_endpoint,
json=payload,
headers=headers
)
# Check response status
if response.status_code != 200:
error_msg = f"Error generating image: {response.status_code} - {response.text}"
logger.error(error_msg)
return {"error": error_msg}
# Process response
result = response.json()
# Add the mapped model to the result
result["model"] = mapped_model
# Generate output path if not provided
if not output_path:
# Create a filename based on the prompt
filename = f"{prompt[:20].replace(' ', '_')}_{mapped_model}.png"
output_path = os.path.join(self.output_dir, filename)
# Save image if images array is in the result
if "images" in result and len(result["images"]) > 0:
image_url = result["images"][0]
self._download_image(image_url, output_path)
result["local_path"] = output_path
result["image_url"] = image_url
return result
except requests.exceptions.RequestException as e:
logger.error(f"Error generating image with Venice.ai: {str(e)}")
raise
def _download_image(self, image_url: str, output_path: str) -> None:
"""
Download image from URL and save to local path.
Args:
image_url: URL of the image to download
output_path: Path to save the downloaded image
"""
try:
response = requests.get(image_url, stream=True)
response.raise_for_status()
# Ensure directory exists
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
logger.info(f"Image saved to {output_path}")
except Exception as e:
logger.error(f"Error downloading image: {str(e)}")
raise
```
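A minimal call sketch, assuming a valid key is available in a `VENICE_API_KEY` environment variable; the prompt and sizes are illustrative only:

```python
import os

from src.ai.venice_api import VeniceImageGenerator

generator = VeniceImageGenerator(api_key=os.environ.get("VENICE_API_KEY", ""),
                                 output_dir="output/images")

# "high quality" is mapped to the flux-dev model by map_model_preference()
result = generator.generate_image(
    prompt="A low-poly desk organizer on a plain white background",
    model="high quality",
    width=1024,
    height=1024,
)
print(result.get("model"), result.get("local_path"))
```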
--------------------------------------------------------------------------------
/old/src/ai/sam_segmentation.py:
--------------------------------------------------------------------------------
```python
"""
SAM2 (Segment Anything Model 2) integration for object segmentation.
"""
import os
import cv2
import numpy as np
import logging
from typing import Dict, Any, List, Tuple, Optional
from pathlib import Path
logger = logging.getLogger(__name__)
class SAMSegmenter:
"""
Wrapper for Segment Anything Model 2 (SAM2) for object segmentation.
"""
def __init__(self, model_type: str = "vit_h", checkpoint_path: Optional[str] = None,
use_gpu: bool = True, output_dir: str = "output/masks"):
"""
Initialize the SAM2 segmenter.
Args:
model_type: SAM2 model type ("vit_h", "vit_l", "vit_b")
checkpoint_path: Path to model checkpoint
use_gpu: Whether to use GPU for inference
output_dir: Directory to store segmentation results
"""
self.model_type = model_type
self.checkpoint_path = checkpoint_path
self.use_gpu = use_gpu
self.output_dir = output_dir
# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
# Model will be initialized on first use to avoid loading it unnecessarily
self.model = None
self.predictor = None
def _initialize_model(self) -> None:
"""
Initialize the SAM2 model.
Note: This requires PyTorch and the segment-anything-2 package to be installed.
"""
try:
# Import here to avoid dependency issues if SAM2 is not installed
import torch
from segment_anything_2 import sam_model_registry, SamPredictor
if not self.checkpoint_path:
raise ValueError("SAM2 checkpoint path is required")
# Check if checkpoint exists
if not os.path.exists(self.checkpoint_path):
raise FileNotFoundError(f"SAM2 checkpoint not found at {self.checkpoint_path}")
# Determine device
device = "cuda" if self.use_gpu and torch.cuda.is_available() else "cpu"
# Load SAM2 model
self.model = sam_model_registry[self.model_type](checkpoint=self.checkpoint_path)
self.model.to(device=device)
self.predictor = SamPredictor(self.model)
logger.info(f"Initialized SAM2 model ({self.model_type}) on {device}")
except ImportError as e:
logger.error(f"Required packages not installed: {str(e)}")
raise
except Exception as e:
logger.error(f"Error initializing SAM2 model: {str(e)}")
raise
def segment_image(self, image_path: str, points: Optional[List[Tuple[int, int]]] = None,
output_dir: Optional[str] = None) -> Dict[str, Any]:
"""
Segment objects in an image using SAM2.
Args:
image_path: Path to input image
points: Optional list of (x, y) points to guide segmentation
output_dir: Optional directory to save segmentation results
Returns:
Dictionary containing segmentation masks and metadata
"""
# Initialize model if not already initialized
if self.model is None:
self._initialize_model()
try:
# Load image
image = cv2.imread(image_path)
if image is None:
raise ValueError(f"Could not load image from {image_path}")
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# Set image in predictor
self.predictor.set_image(image)
# Generate masks
if points:
                # Convert points to numpy arrays (numpy is imported at module level)
point_coords = np.array(points)
point_labels = np.ones(len(points))
# Generate masks from points
masks, scores, logits = self.predictor.predict(
point_coords=point_coords,
point_labels=point_labels,
multimask_output=True
)
else:
# Automatic segmentation (using center point)
h, w = image.shape[:2]
center_point = np.array([[w//2, h//2]])
center_label = np.array([1])
masks, scores, logits = self.predictor.predict(
point_coords=center_point,
point_labels=center_label,
multimask_output=True
)
# Use provided output directory or default
output_dir = output_dir or os.path.join(self.output_dir, Path(image_path).stem)
os.makedirs(output_dir, exist_ok=True)
# Process results
masked_images = []
for i, mask in enumerate(masks):
# Apply mask to image
masked_image = self._apply_mask_to_image(image, mask)
# Save masked image
output_path = os.path.join(output_dir, f"mask_{i}.png")
cv2.imwrite(output_path, cv2.cvtColor(masked_image, cv2.COLOR_RGB2BGR))
masked_images.append(output_path)
# Convert numpy arrays to lists for JSON serialization
result = {
"image_path": image_path,
"masked_images": masked_images,
"scores": scores.tolist(),
"mask_count": len(masks)
}
return result
except Exception as e:
logger.error(f"Error segmenting image: {str(e)}")
raise
def _apply_mask_to_image(self, image: np.ndarray, mask: np.ndarray) -> np.ndarray:
"""
Apply mask to image, keeping only the masked region.
Args:
image: Input image as numpy array
mask: Binary mask as numpy array
Returns:
Masked image as numpy array
"""
# Create a copy of the image
masked_image = image.copy()
# Apply mask
masked_image[~mask] = [0, 0, 0] # Set background to black
return masked_image
def segment_with_auto_points(self, image_path: str, num_points: int = 5,
output_dir: Optional[str] = None) -> Dict[str, Any]:
"""
Segment image using automatically generated points with SAM2.
Args:
image_path: Path to input image
num_points: Number of points to generate
output_dir: Optional directory to save segmentation results
Returns:
Dictionary containing segmentation masks and metadata
"""
# Load image
image = cv2.imread(image_path)
if image is None:
raise ValueError(f"Could not load image from {image_path}")
h, w = image.shape[:2]
# Generate points in a grid pattern
points = []
rows = int(np.sqrt(num_points))
cols = num_points // rows
for i in range(rows):
for j in range(cols):
x = int(w * (j + 0.5) / cols)
y = int(h * (i + 0.5) / rows)
points.append((x, y))
# Segment with generated points
return self.segment_image(image_path, points, output_dir)
```
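A hedged usage sketch for this deprecated segmenter, assuming PyTorch, OpenCV, and the `segment_anything_2` package are installed, a checkpoint has been downloaded, and the `old/` tree is importable as a package; the checkpoint and image paths are placeholders:

```python
from old.src.ai.sam_segmentation import SAMSegmenter

segmenter = SAMSegmenter(
    model_type="vit_h",
    checkpoint_path="checkpoints/sam2_vit_h.pth",  # placeholder path
    use_gpu=True,
    output_dir="output/masks",
)

# Grid-based automatic prompting; the model itself is loaded lazily on first use
result = segmenter.segment_with_auto_points("output/images/example.png", num_points=4)
print(result["mask_count"], result["masked_images"])
```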
--------------------------------------------------------------------------------
/src/models/cuda_mvs.py:
--------------------------------------------------------------------------------
```python
"""
CUDA Multi-View Stereo wrapper for 3D reconstruction from multiple images.
"""
import os
import subprocess
import logging
import json
from typing import Dict, Any, List, Optional
from pathlib import Path
logger = logging.getLogger(__name__)
class CUDAMultiViewStereo:
"""
Wrapper for CUDA Multi-View Stereo for 3D reconstruction from multiple images.
"""
def __init__(self, cuda_mvs_path: str, output_dir: str = "output/models"):
"""
Initialize the CUDA MVS wrapper.
Args:
cuda_mvs_path: Path to CUDA MVS installation
output_dir: Directory to store output files
"""
self.cuda_mvs_path = cuda_mvs_path
self.output_dir = output_dir
# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
# Validate installation
self._validate_installation()
def _validate_installation(self) -> None:
"""
Validate CUDA MVS installation.
Raises:
FileNotFoundError: If CUDA MVS installation is not found
"""
if not os.path.exists(self.cuda_mvs_path):
raise FileNotFoundError(f"CUDA MVS not found at {self.cuda_mvs_path}")
# Check for required executables
required_files = ["app_patch_match_mvs"]
for file in required_files:
exec_path = os.path.join(self.cuda_mvs_path, "build", file)
if not os.path.exists(exec_path):
raise FileNotFoundError(f"Required executable {file} not found at {exec_path}")
def generate_model_from_images(self, image_paths: List[str],
camera_params: Optional[Dict[str, Any]] = None,
output_name: str = "model") -> Dict[str, Any]:
"""
Generate a 3D model from multiple images using CUDA MVS.
Args:
image_paths: List of paths to input images
camera_params: Optional camera parameters
output_name: Name for the output files
Returns:
Dictionary containing paths to generated model files
"""
try:
# Create a unique directory for this reconstruction
model_dir = os.path.join(self.output_dir, output_name)
os.makedirs(model_dir, exist_ok=True)
# Create a camera parameters file if provided
params_file = None
if camera_params:
params_file = os.path.join(model_dir, "camera_params.json")
with open(params_file, 'w') as f:
json.dump(camera_params, f, indent=2)
# Generate camera parameters if not provided
if not params_file:
params_file = self._generate_camera_params(image_paths, model_dir)
# Generate point cloud
point_cloud_file = os.path.join(model_dir, f"{output_name}.ply")
# Run CUDA MVS
cmd = [
os.path.join(self.cuda_mvs_path, "build", "app_patch_match_mvs"),
"--image_dir", os.path.dirname(image_paths[0]),
"--camera_params", params_file,
"--output_file", point_cloud_file
]
logger.info(f"Running CUDA MVS with command: {' '.join(cmd)}")
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Wait for process to complete
stdout, stderr = process.communicate()
if process.returncode != 0:
logger.error(f"Error running CUDA MVS: {stderr}")
raise RuntimeError(f"CUDA MVS failed with exit code {process.returncode}")
# Check if output file was created
if not os.path.exists(point_cloud_file):
raise FileNotFoundError(f"Output point cloud file not found at {point_cloud_file}")
return {
"model_id": output_name,
"output_dir": model_dir,
"point_cloud_file": point_cloud_file,
"camera_params_file": params_file,
"input_images": image_paths
}
except Exception as e:
logger.error(f"Error generating 3D model with CUDA MVS: {str(e)}")
raise
def _generate_camera_params(self, image_paths: List[str], model_dir: str) -> str:
"""
Generate camera parameters from images.
Args:
image_paths: List of paths to input images
model_dir: Directory to save parameter file
Returns:
Path to camera parameters file
"""
# This is a simplified version for demonstration
# In a real implementation, this would use SfM or camera estimation
params = []
for i, img_path in enumerate(image_paths):
# Extract image dimensions
from PIL import Image
with Image.open(img_path) as img:
width, height = img.size
# Generate simple camera parameters
# In reality, these would be estimated from the images
# or provided by the user
params.append({
"image_id": i,
"image_name": os.path.basename(img_path),
"width": width,
"height": height,
"camera": {
"model": "PINHOLE",
"focal_length": min(width, height),
"principal_point": [width / 2, height / 2],
"rotation": [1, 0, 0, 0, 1, 0, 0, 0, 1],
"translation": [0, 0, 0]
}
})
# Write parameters to file
params_file = os.path.join(model_dir, "camera_params.json")
with open(params_file, 'w') as f:
json.dump(params, f, indent=2)
return params_file
def convert_ply_to_obj(self, ply_file: str, output_dir: Optional[str] = None) -> str:
"""
Convert PLY point cloud to OBJ mesh.
Args:
ply_file: Path to input PLY file
output_dir: Directory to save output OBJ file
Returns:
Path to output OBJ file
"""
# In a real implementation, this would use a mesh reconstruction library
# such as Open3D or PyMeshLab to convert the point cloud to a mesh
if not output_dir:
output_dir = os.path.dirname(ply_file)
# Generate output file path
obj_file = os.path.join(output_dir, f"{Path(ply_file).stem}.obj")
logger.info(f"Converting PLY to OBJ: {ply_file} -> {obj_file}")
# This is a placeholder for the actual conversion
# In a real implementation, you would use a library like Open3D:
# import open3d as o3d
# pcd = o3d.io.read_point_cloud(ply_file)
# mesh = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson(pcd)[0]
# o3d.io.write_triangle_mesh(obj_file, mesh)
# For now, we'll just create a dummy OBJ file
with open(obj_file, 'w') as f:
f.write(f"# Converted from {os.path.basename(ply_file)}\n")
f.write("# This is a placeholder OBJ file\n")
f.write("v 0 0 0\n")
f.write("v 1 0 0\n")
f.write("v 0 1 0\n")
f.write("f 1 2 3\n")
return obj_file
```
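A minimal invocation sketch, assuming CUDA MVS has been built locally with the `app_patch_match_mvs` binary under `build/` and that the view images already exist; all paths are placeholders:

```python
from src.models.cuda_mvs import CUDAMultiViewStereo

mvs = CUDAMultiViewStereo(cuda_mvs_path="/opt/cuda-mvs",  # placeholder install path
                          output_dir="output/models")

result = mvs.generate_model_from_images(
    image_paths=[f"output/approved/view_{i}.png" for i in range(3)],
    output_name="desk_organizer",
)

# Convert the PLY point cloud to a (placeholder) OBJ mesh
obj_file = mvs.convert_ply_to_obj(result["point_cloud_file"])
print(obj_file)
```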
--------------------------------------------------------------------------------
/src/utils/format_validator.py:
--------------------------------------------------------------------------------
```python
import os
import logging
import zipfile
import xml.etree.ElementTree as ET
from typing import Tuple, Optional, Dict, Any
logger = logging.getLogger(__name__)
class FormatValidator:
"""Validates 3D model formats for compatibility with printers."""
@staticmethod
def validate_3mf(file_path: str) -> Tuple[bool, Optional[str]]:
"""
Validate a 3MF file for compatibility with Prusa and Bambu printers.
Args:
file_path: Path to the 3MF file
Returns:
Tuple of (is_valid, error_message)
"""
if not os.path.exists(file_path):
return False, f"File not found: {file_path}"
try:
# 3MF files are ZIP archives with XML content
with zipfile.ZipFile(file_path, 'r') as zip_ref:
# Check for required files
required_files = ['3D/3dmodel.model', '[Content_Types].xml']
for req_file in required_files:
try:
zip_ref.getinfo(req_file)
except KeyError:
return False, f"Missing required file in 3MF: {req_file}"
# Validate 3D model file
with zip_ref.open('3D/3dmodel.model') as model_file:
tree = ET.parse(model_file)
root = tree.getroot()
# Check for required elements
if root.tag != '{http://schemas.microsoft.com/3dmanufacturing/core/2015/02}model':
return False, "Invalid 3MF: Missing model element"
# Verify resources section exists
resources = root.find('.//{http://schemas.microsoft.com/3dmanufacturing/core/2015/02}resources')
if resources is None:
return False, "Invalid 3MF: Missing resources element"
return True, None
except Exception as e:
logger.error(f"Error validating 3MF file: {str(e)}")
return False, f"Error validating 3MF file: {str(e)}"
@staticmethod
def validate_amf(file_path: str) -> Tuple[bool, Optional[str]]:
"""
Validate an AMF file for compatibility with printers.
Args:
file_path: Path to the AMF file
Returns:
Tuple of (is_valid, error_message)
"""
if not os.path.exists(file_path):
return False, f"File not found: {file_path}"
try:
# Parse the AMF file (XML format)
tree = ET.parse(file_path)
root = tree.getroot()
# Check for required elements
if root.tag != 'amf':
return False, "Invalid AMF: Missing amf root element"
# Check for at least one object
objects = root.findall('./object')
if not objects:
return False, "Invalid AMF: No objects found"
# Check that each object has a mesh
for obj in objects:
mesh = obj.find('./mesh')
if mesh is None:
return False, f"Invalid AMF: Object {obj.get('id', 'unknown')} is missing a mesh"
# Check for vertices and volumes
vertices = mesh.find('./vertices')
volumes = mesh.findall('./volume')
if vertices is None:
return False, f"Invalid AMF: Mesh in object {obj.get('id', 'unknown')} is missing vertices"
if not volumes:
return False, f"Invalid AMF: Mesh in object {obj.get('id', 'unknown')} has no volumes"
return True, None
except Exception as e:
logger.error(f"Error validating AMF file: {str(e)}")
return False, f"Error validating AMF file: {str(e)}"
@staticmethod
def extract_metadata(file_path: str) -> Dict[str, Any]:
"""
Extract metadata from a 3MF or AMF file.
Args:
file_path: Path to the 3D model file
Returns:
Dictionary of metadata
"""
metadata = {}
# Check file extension
ext = os.path.splitext(file_path)[1].lower()
try:
if ext == '.3mf':
with zipfile.ZipFile(file_path, 'r') as zip_ref:
metadata_path = "Metadata/model_metadata.xml"
try:
with zip_ref.open(metadata_path) as f:
tree = ET.parse(f)
root = tree.getroot()
for meta in root.findall('./meta'):
name = meta.get('name')
if name:
metadata[name] = meta.text
except KeyError:
# Metadata file doesn't exist
pass
elif ext == '.amf':
tree = ET.parse(file_path)
root = tree.getroot()
for meta in root.findall('./metadata'):
name = meta.get('type')
if name:
metadata[name] = meta.text
except Exception as e:
logger.error(f"Error extracting metadata: {str(e)}")
return metadata
@staticmethod
def check_printer_compatibility(file_path: str, printer_type: str = "prusa") -> Tuple[bool, Optional[str]]:
"""
Check if a 3D model file is compatible with a specific printer type.
Args:
file_path: Path to the 3D model file
printer_type: Type of printer ("prusa" or "bambu")
Returns:
Tuple of (is_compatible, error_message)
"""
# Check file extension
ext = os.path.splitext(file_path)[1].lower()
# Validate based on file format
if ext == '.3mf':
is_valid, error = FormatValidator.validate_3mf(file_path)
if not is_valid:
return False, error
# Additional printer-specific checks
if printer_type.lower() == "prusa":
# Prusa-specific checks for 3MF
# For now, just basic validation is sufficient
return True, None
elif printer_type.lower() == "bambu":
# Bambu-specific checks for 3MF
# For now, just basic validation is sufficient
return True, None
else:
return False, f"Unknown printer type: {printer_type}"
elif ext == '.amf':
is_valid, error = FormatValidator.validate_amf(file_path)
if not is_valid:
return False, error
# Additional printer-specific checks
if printer_type.lower() == "prusa":
# Prusa-specific checks for AMF
# For now, just basic validation is sufficient
return True, None
elif printer_type.lower() == "bambu":
# Bambu-specific checks for AMF
# For now, just basic validation is sufficient
return True, None
else:
return False, f"Unknown printer type: {printer_type}"
else:
return False, f"Unsupported file format for printer compatibility check: {ext}"
```
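A short validation sketch; the model path is a placeholder:

```python
from src.utils.format_validator import FormatValidator

model_path = "output/models/desk_organizer.3mf"  # placeholder

is_valid, error = FormatValidator.validate_3mf(model_path)
if not is_valid:
    print(f"Invalid 3MF: {error}")
else:
    compatible, reason = FormatValidator.check_printer_compatibility(model_path, "prusa")
    print(f"Prusa compatible: {compatible}", reason or "")
    print(FormatValidator.extract_metadata(model_path))
```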
--------------------------------------------------------------------------------
/old/src/models/threestudio_generator.py:
--------------------------------------------------------------------------------
```python
"""
threestudio integration for 3D model generation from images.
"""
import os
import subprocess
import logging
import json
import tempfile
from typing import Dict, Any, List, Optional
from pathlib import Path
logger = logging.getLogger(__name__)
class ThreeStudioGenerator:
"""
Wrapper for threestudio for 3D model generation from images.
"""
def __init__(self, threestudio_path: str, output_dir: str = "output/models"):
"""
Initialize the threestudio generator.
Args:
threestudio_path: Path to threestudio installation
output_dir: Directory to store output files
"""
self.threestudio_path = threestudio_path
self.output_dir = output_dir
# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
# Validate threestudio installation
self._validate_installation()
def _validate_installation(self) -> None:
"""
Validate threestudio installation.
Raises:
FileNotFoundError: If threestudio installation is not found
"""
if not os.path.exists(self.threestudio_path):
raise FileNotFoundError(f"threestudio not found at {self.threestudio_path}")
# Check for required files
required_files = ["launch.py", "README.md"]
for file in required_files:
if not os.path.exists(os.path.join(self.threestudio_path, file)):
raise FileNotFoundError(f"Required file {file} not found in threestudio directory")
def generate_model_from_image(self, image_path: str, method: str = "zero123",
num_iterations: int = 5000, export_format: str = "obj",
config_overrides: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Generate a 3D model from an image using threestudio.
Args:
image_path: Path to input image
method: Method to use ("zero123", "sjc", "magic3d", etc.)
num_iterations: Number of training iterations
export_format: Format to export ("obj", "glb", "ply")
config_overrides: Optional configuration overrides
Returns:
Dictionary containing paths to generated model files
"""
try:
# Create a unique ID for this generation
model_id = Path(image_path).stem
# Create a temporary config file
config_file = self._create_config_file(image_path, method, num_iterations, config_overrides)
# Run threestudio
output_dir = os.path.join(self.output_dir, model_id)
os.makedirs(output_dir, exist_ok=True)
cmd = [
"python", "launch.py",
"--config", config_file,
"--train",
"--gpu", "0",
"--output_dir", output_dir
]
logger.info(f"Running threestudio with command: {' '.join(cmd)}")
# Execute in threestudio directory
process = subprocess.Popen(
cmd,
cwd=self.threestudio_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Wait for process to complete
stdout, stderr = process.communicate()
if process.returncode != 0:
logger.error(f"Error running threestudio: {stderr}")
raise RuntimeError(f"threestudio failed with exit code {process.returncode}")
# Export model
exported_files = self._export_model(output_dir, export_format)
return {
"model_id": model_id,
"output_dir": output_dir,
"exported_files": exported_files,
"preview_images": self._get_preview_images(output_dir)
}
except Exception as e:
logger.error(f"Error generating 3D model with threestudio: {str(e)}")
raise
def _create_config_file(self, image_path: str, method: str, num_iterations: int,
config_overrides: Optional[Dict[str, Any]] = None) -> str:
"""
Create a configuration file for threestudio.
Args:
image_path: Path to input image
method: Method to use
num_iterations: Number of training iterations
config_overrides: Optional configuration overrides
Returns:
Path to the created configuration file
"""
# Base configuration
config = {
"method": method,
"image_path": os.path.abspath(image_path),
"num_iterations": num_iterations,
"save_interval": 1000,
"export_interval": 1000
}
# Apply overrides
if config_overrides:
config.update(config_overrides)
# Write to temporary file
fd, config_file = tempfile.mkstemp(suffix=".json")
with os.fdopen(fd, 'w') as f:
json.dump(config, f, indent=2)
return config_file
def _export_model(self, output_dir: str, export_format: str) -> List[str]:
"""
Export the model in the specified format.
Args:
output_dir: Directory containing the model
export_format: Format to export
Returns:
List of paths to exported files
"""
# Find the latest checkpoint
checkpoints_dir = os.path.join(output_dir, "checkpoints")
if not os.path.exists(checkpoints_dir):
raise FileNotFoundError(f"Checkpoints directory not found: {checkpoints_dir}")
# Get the latest checkpoint
checkpoints = sorted([f for f in os.listdir(checkpoints_dir) if f.endswith(".ckpt")])
if not checkpoints:
raise FileNotFoundError("No checkpoints found")
latest_checkpoint = os.path.join(checkpoints_dir, checkpoints[-1])
# Export command
cmd = [
"python", "launch.py",
"--config", os.path.join(output_dir, "config.yaml"),
"--export",
"--gpu", "0",
"--checkpoint", latest_checkpoint,
"--export_format", export_format
]
logger.info(f"Exporting model with command: {' '.join(cmd)}")
# Execute in threestudio directory
process = subprocess.Popen(
cmd,
cwd=self.threestudio_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Wait for process to complete
stdout, stderr = process.communicate()
if process.returncode != 0:
logger.error(f"Error exporting model: {stderr}")
raise RuntimeError(f"Model export failed with exit code {process.returncode}")
# Find exported files
exports_dir = os.path.join(output_dir, "exports")
if not os.path.exists(exports_dir):
raise FileNotFoundError(f"Exports directory not found: {exports_dir}")
exported_files = [os.path.join(exports_dir, f) for f in os.listdir(exports_dir)]
return exported_files
def _get_preview_images(self, output_dir: str) -> List[str]:
"""
Get paths to preview images.
Args:
output_dir: Directory containing the model
Returns:
List of paths to preview images
"""
# Find preview images
previews_dir = os.path.join(output_dir, "images")
if not os.path.exists(previews_dir):
return []
preview_images = [os.path.join(previews_dir, f) for f in os.listdir(previews_dir)
if f.endswith(".png") or f.endswith(".jpg")]
return sorted(preview_images)
```
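A hedged run sketch for this deprecated generator, assuming a local threestudio checkout, a CUDA-capable GPU, and that the `old/` tree is importable; the paths are placeholders and the method name follows the docstring above:

```python
from old.src.models.threestudio_generator import ThreeStudioGenerator

generator = ThreeStudioGenerator(threestudio_path="/opt/threestudio",  # placeholder
                                 output_dir="output/models")

result = generator.generate_model_from_image(
    image_path="output/masks/example/mask_0.png",
    method="zero123",
    num_iterations=5000,
    export_format="obj",
)
print(result["exported_files"], result["preview_images"])
```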
--------------------------------------------------------------------------------
/test_image_approval_workflow.py:
--------------------------------------------------------------------------------
```python
"""
Test script for the image approval workflow.
"""
import os
import sys
import json
import logging
import unittest
from unittest.mock import patch, MagicMock
from pathlib import Path
# Add the src directory to the path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.workflow.image_approval import ImageApprovalManager
from src.config import IMAGE_APPROVAL
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TestImageApprovalWorkflow(unittest.TestCase):
"""
Test cases for the image approval workflow.
"""
def setUp(self):
"""
Set up test environment.
"""
# Create test output directories
self.test_output_dir = "output/test_image_approval"
self.test_approved_dir = os.path.join(self.test_output_dir, "approved")
os.makedirs(self.test_output_dir, exist_ok=True)
os.makedirs(self.test_approved_dir, exist_ok=True)
# Create the approval manager
self.approval_manager = ImageApprovalManager(
output_dir=self.test_output_dir,
approved_dir=self.test_approved_dir,
min_approved_images=3,
auto_approve=False
)
# Create test images
self.test_images = []
for i in range(5):
image_path = os.path.join(self.test_output_dir, f"test_image_{i}.png")
with open(image_path, "w") as f:
f.write(f"test image data {i}")
self.test_images.append({
"id": f"image_{i}",
"local_path": image_path,
"view_index": i,
"view_direction": f"view_{i}"
})
def test_add_images(self):
"""
Test adding images to the approval manager.
"""
# Add images
self.approval_manager.add_images(self.test_images)
# Verify images were added
self.assertEqual(len(self.approval_manager.images), 5)
self.assertEqual(len(self.approval_manager.pending_images), 5)
self.assertEqual(len(self.approval_manager.approved_images), 0)
self.assertEqual(len(self.approval_manager.rejected_images), 0)
def test_approve_image(self):
"""
Test approving an image.
"""
# Add images
self.approval_manager.add_images(self.test_images)
# Approve an image
result = self.approval_manager.approve_image("image_0")
# Verify the result
self.assertTrue(result["success"])
self.assertEqual(result["image_id"], "image_0")
self.assertEqual(result["status"], "approved")
# Verify the image was moved to approved
self.assertEqual(len(self.approval_manager.pending_images), 4)
self.assertEqual(len(self.approval_manager.approved_images), 1)
self.assertEqual(len(self.approval_manager.rejected_images), 0)
# Verify the image was copied to the approved directory
approved_path = os.path.join(self.test_approved_dir, "image_0.png")
self.assertTrue(os.path.exists(approved_path))
def test_reject_image(self):
"""
Test rejecting an image.
"""
# Add images
self.approval_manager.add_images(self.test_images)
# Reject an image
result = self.approval_manager.reject_image("image_1")
# Verify the result
self.assertTrue(result["success"])
self.assertEqual(result["image_id"], "image_1")
self.assertEqual(result["status"], "rejected")
# Verify the image was moved to rejected
self.assertEqual(len(self.approval_manager.pending_images), 4)
self.assertEqual(len(self.approval_manager.approved_images), 0)
self.assertEqual(len(self.approval_manager.rejected_images), 1)
def test_get_approval_status(self):
"""
Test getting the approval status.
"""
# Add images
self.approval_manager.add_images(self.test_images)
# Approve some images
self.approval_manager.approve_image("image_0")
self.approval_manager.approve_image("image_1")
self.approval_manager.approve_image("image_2")
# Reject an image
self.approval_manager.reject_image("image_3")
# Get the status
status = self.approval_manager.get_status()
# Verify the status
self.assertEqual(status["total_images"], 5)
self.assertEqual(status["pending_count"], 1)
self.assertEqual(status["approved_count"], 3)
self.assertEqual(status["rejected_count"], 1)
self.assertTrue(status["has_minimum_approved"])
self.assertEqual(len(status["approved_images"]), 3)
self.assertEqual(len(status["pending_images"]), 1)
self.assertEqual(len(status["rejected_images"]), 1)
def test_get_approved_images(self):
"""
Test getting approved images.
"""
# Add images
self.approval_manager.add_images(self.test_images)
# Approve some images
self.approval_manager.approve_image("image_0")
self.approval_manager.approve_image("image_2")
self.approval_manager.approve_image("image_4")
# Get approved images
approved = self.approval_manager.get_approved_images()
# Verify approved images
self.assertEqual(len(approved), 3)
self.assertEqual(approved[0]["id"], "image_0")
self.assertEqual(approved[1]["id"], "image_2")
self.assertEqual(approved[2]["id"], "image_4")
def test_auto_approve(self):
"""
Test auto-approval mode.
"""
# Create an auto-approve manager
auto_manager = ImageApprovalManager(
output_dir=self.test_output_dir,
approved_dir=self.test_approved_dir,
min_approved_images=3,
auto_approve=True
)
# Add images
auto_manager.add_images(self.test_images)
# Verify all images were auto-approved
self.assertEqual(len(auto_manager.pending_images), 0)
self.assertEqual(len(auto_manager.approved_images), 5)
self.assertEqual(len(auto_manager.rejected_images), 0)
def test_has_minimum_approved(self):
"""
Test checking if minimum approved images are met.
"""
# Add images
self.approval_manager.add_images(self.test_images)
# Initially should not have minimum
self.assertFalse(self.approval_manager.has_minimum_approved())
# Approve two images
self.approval_manager.approve_image("image_0")
self.approval_manager.approve_image("image_1")
# Still should not have minimum
self.assertFalse(self.approval_manager.has_minimum_approved())
# Approve one more image
self.approval_manager.approve_image("image_2")
# Now should have minimum
self.assertTrue(self.approval_manager.has_minimum_approved())
def test_save_and_load_state(self):
"""
Test saving and loading the approval state.
"""
# Add images
self.approval_manager.add_images(self.test_images)
# Approve and reject some images
self.approval_manager.approve_image("image_0")
self.approval_manager.approve_image("image_2")
self.approval_manager.reject_image("image_3")
# Save the state
state_file = os.path.join(self.test_output_dir, "approval_state.json")
self.approval_manager.save_state(state_file)
# Create a new manager
new_manager = ImageApprovalManager(
output_dir=self.test_output_dir,
approved_dir=self.test_approved_dir,
min_approved_images=3,
auto_approve=False
)
# Load the state
new_manager.load_state(state_file)
# Verify the state was loaded correctly
self.assertEqual(len(new_manager.images), 5)
self.assertEqual(len(new_manager.pending_images), 2)
self.assertEqual(len(new_manager.approved_images), 2)
self.assertEqual(len(new_manager.rejected_images), 1)
def tearDown(self):
"""
Clean up after tests.
"""
# Clean up test output directory
import shutil
if os.path.exists(self.test_output_dir):
shutil.rmtree(self.test_output_dir)
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/old/src/workflow/image_to_model_pipeline.py:
--------------------------------------------------------------------------------
```python
"""
Workflow orchestration for the image-to-model pipeline.
"""
import os
import logging
import uuid
from typing import Dict, Any, List, Optional, Tuple
from pathlib import Path
logger = logging.getLogger(__name__)
class ImageToModelPipeline:
"""
Orchestrates the workflow from text prompt to 3D model:
1. Generate image with Venice.ai
2. Segment object with SAM2
3. Create 3D model with threestudio
4. Convert to OpenSCAD for parametric editing
"""
def __init__(self,
venice_generator,
sam_segmenter,
threestudio_generator,
openscad_wrapper,
output_dir: str = "output/pipeline"):
"""
Initialize the pipeline.
Args:
venice_generator: Instance of VeniceImageGenerator
sam_segmenter: Instance of SAMSegmenter
threestudio_generator: Instance of ThreeStudioGenerator
openscad_wrapper: Instance of OpenSCADWrapper
output_dir: Directory to store output files
"""
self.venice_generator = venice_generator
self.sam_segmenter = sam_segmenter
self.threestudio_generator = threestudio_generator
self.openscad_wrapper = openscad_wrapper
self.output_dir = output_dir
# Create output directories
os.makedirs(os.path.join(output_dir, "images"), exist_ok=True)
os.makedirs(os.path.join(output_dir, "masks"), exist_ok=True)
os.makedirs(os.path.join(output_dir, "models"), exist_ok=True)
os.makedirs(os.path.join(output_dir, "scad"), exist_ok=True)
def generate_model_from_text(self, prompt: str,
venice_params: Optional[Dict[str, Any]] = None,
sam_params: Optional[Dict[str, Any]] = None,
threestudio_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Generate a 3D model from a text prompt.
Args:
prompt: Text description for image generation
venice_params: Optional parameters for Venice.ai
sam_params: Optional parameters for SAM2
threestudio_params: Optional parameters for threestudio
Returns:
Dictionary containing paths to generated files and metadata
"""
try:
# Generate a unique ID for this pipeline run
pipeline_id = str(uuid.uuid4())
logger.info(f"Starting pipeline {pipeline_id} for prompt: {prompt}")
# Step 1: Generate image with Venice.ai
image_path = os.path.join(self.output_dir, "images", f"{pipeline_id}.png")
venice_result = self._generate_image(prompt, image_path, venice_params)
# Step 2: Segment object with SAM2
masks_dir = os.path.join(self.output_dir, "masks", pipeline_id)
sam_result = self._segment_image(image_path, masks_dir, sam_params)
# Get the best mask (highest score or first mask if no scores)
if "scores" in sam_result and sam_result["scores"]:
best_mask_idx = sam_result["scores"].index(max(sam_result["scores"]))
best_mask_path = sam_result["mask_paths"][best_mask_idx]
else:
# If no scores available, use the first mask
best_mask_path = sam_result["mask_paths"][0] if sam_result.get("mask_paths") else None
if not best_mask_path:
raise ValueError("No valid mask generated from segmentation")
# Step 3: Create 3D model with threestudio
threestudio_result = self._generate_3d_model(best_mask_path, threestudio_params)
# Step 4: Convert to OpenSCAD for parametric editing
scad_result = self._convert_to_openscad(threestudio_result["exported_files"][0], pipeline_id)
# Compile results
result = {
"pipeline_id": pipeline_id,
"prompt": prompt,
"image": venice_result,
"segmentation": sam_result,
"model_3d": threestudio_result,
"openscad": scad_result
}
logger.info(f"Pipeline {pipeline_id} completed successfully")
return result
except Exception as e:
logger.error(f"Error in pipeline: {str(e)}")
raise
def _generate_image(self, prompt: str, output_path: str,
params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Generate image with Venice.ai.
Args:
prompt: Text description for image generation
output_path: Path to save the generated image
params: Optional parameters for Venice.ai
Returns:
Dictionary containing image data and metadata
"""
logger.info(f"Generating image for prompt: {prompt}")
# Default parameters
default_params = {
"model": "fluently-xl", # Default to fastest model
"width": 1024,
"height": 1024
}
# Merge with provided parameters
if params:
default_params.update(params)
# Generate image
result = self.venice_generator.generate_image(
prompt=prompt,
output_path=output_path,
**default_params
)
logger.info(f"Image generated: {output_path}")
return result
def _segment_image(self, image_path: str, output_dir: str,
params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Segment object with SAM2.
Args:
image_path: Path to input image
output_dir: Directory to save segmentation results
params: Optional parameters for SAM2
Returns:
Dictionary containing segmentation masks and metadata
"""
logger.info(f"Segmenting image: {image_path}")
# Segment image with SAM2
# Check if points are provided in params
points = params.get("points") if params else None
if points:
result = self.sam_segmenter.segment_image(
image_path=image_path,
points=points,
output_dir=output_dir
)
else:
# Use automatic point generation
result = self.sam_segmenter.segment_with_auto_points(
image_path=image_path,
output_dir=output_dir
)
logger.info(f"Image segmented, {result.get('num_masks', 0)} masks generated")
return result
def _generate_3d_model(self, image_path: str,
params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Generate 3D model with threestudio.
Args:
image_path: Path to input image
params: Optional parameters for threestudio
Returns:
Dictionary containing paths to generated model files
"""
logger.info(f"Generating 3D model from image: {image_path}")
# Default parameters
default_params = {
"method": "zero123",
"num_iterations": 5000,
"export_format": "obj"
}
# Merge with provided parameters
if params:
default_params.update(params)
# Generate 3D model
result = self.threestudio_generator.generate_model_from_image(
image_path=image_path,
**default_params
)
logger.info(f"3D model generated: {result['exported_files']}")
return result
def _convert_to_openscad(self, model_path: str, model_id: str) -> Dict[str, Any]:
"""
Convert 3D model to OpenSCAD format.
Args:
model_path: Path to input model
model_id: Unique identifier for the model
Returns:
Dictionary containing paths to generated files
"""
logger.info(f"Converting model to OpenSCAD: {model_path}")
# Generate OpenSCAD code for importing the model
scad_code = f"""// Generated OpenSCAD code for model {model_id}
// Imported from {os.path.basename(model_path)}
// Parameters
scale_factor = 1.0;
position_x = 0;
position_y = 0;
position_z = 0;
rotation_x = 0;
rotation_y = 0;
rotation_z = 0;
// Import and transform the model
translate([position_x, position_y, position_z])
rotate([rotation_x, rotation_y, rotation_z])
scale(scale_factor)
import("{model_path}");
"""
# Save SCAD code to file
scad_file = self.openscad_wrapper.generate_scad(scad_code, model_id)
# Generate previews
previews = self.openscad_wrapper.generate_multi_angle_previews(scad_file)
return {
"scad_file": scad_file,
"previews": previews,
"model_path": model_path
}
```
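A hedged wiring sketch for this deprecated pipeline; the collaborator constructors follow the signatures shown earlier in this document, except `OpenSCADWrapper`, whose constructor is not shown here and is assumed to take no arguments. API key, checkpoint, and install paths are placeholders:

```python
import os

from src.ai.venice_api import VeniceImageGenerator
from src.openscad_wrapper.wrapper import OpenSCADWrapper  # constructor assumed no-arg
from old.src.ai.sam_segmentation import SAMSegmenter
from old.src.models.threestudio_generator import ThreeStudioGenerator
from old.src.workflow.image_to_model_pipeline import ImageToModelPipeline

pipeline = ImageToModelPipeline(
    venice_generator=VeniceImageGenerator(os.environ.get("VENICE_API_KEY", "")),
    sam_segmenter=SAMSegmenter(checkpoint_path="checkpoints/sam2_vit_h.pth"),
    threestudio_generator=ThreeStudioGenerator("/opt/threestudio"),
    openscad_wrapper=OpenSCADWrapper(),
    output_dir="output/pipeline",
)

result = pipeline.generate_model_from_text("a simple coffee mug with a curved handle")
print(result["openscad"]["scad_file"])
```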
--------------------------------------------------------------------------------
/test_remote_cuda_mvs.py:
--------------------------------------------------------------------------------
```python
"""
Test script for remote CUDA Multi-View Stereo processing.
"""
import os
import sys
import logging
import unittest
from unittest.mock import patch, MagicMock
from pathlib import Path
# Add the src directory to the path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.remote.cuda_mvs_client import CUDAMVSClient
from src.remote.connection_manager import CUDAMVSConnectionManager
from src.config import REMOTE_CUDA_MVS
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TestRemoteCUDAMVS(unittest.TestCase):
"""
Test cases for remote CUDA Multi-View Stereo processing.
"""
def setUp(self):
"""
Set up test environment.
"""
# Create test output directories
self.test_output_dir = "output/test_remote_cuda_mvs"
os.makedirs(self.test_output_dir, exist_ok=True)
# Mock API key
self.api_key = "test_api_key"
# Create the client with the mock API key
self.client = CUDAMVSClient(
api_key=self.api_key,
output_dir=self.test_output_dir
)
# Create the connection manager with the mock API key
self.connection_manager = CUDAMVSConnectionManager(
api_key=self.api_key,
discovery_port=REMOTE_CUDA_MVS["DISCOVERY_PORT"],
use_lan_discovery=True
)
@patch('src.remote.cuda_mvs_client.requests.post')
def test_upload_images(self, mock_post):
"""
Test uploading images to a remote CUDA MVS server.
"""
# Mock response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"job_id": "test_job_123",
"status": "uploaded",
"message": "Images uploaded successfully"
}
mock_post.return_value = mock_response
# Test parameters
server_url = "http://test-server:8765"
image_paths = [
os.path.join(self.test_output_dir, "test_image_1.png"),
os.path.join(self.test_output_dir, "test_image_2.png")
]
# Create test images
for path in image_paths:
with open(path, "w") as f:
f.write("test image data")
# Call the method
result = self.client.upload_images(server_url, image_paths)
# Verify the result
self.assertIsNotNone(result)
self.assertEqual(result["job_id"], "test_job_123")
self.assertEqual(result["status"], "uploaded")
# Verify the API call
mock_post.assert_called_once()
args, kwargs = mock_post.call_args
self.assertTrue(server_url in args[0])
self.assertEqual(kwargs["headers"]["Authorization"], f"Bearer {self.api_key}")
@patch('src.remote.cuda_mvs_client.requests.post')
def test_process_job(self, mock_post):
"""
Test processing a job on a remote CUDA MVS server.
"""
# Mock response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"job_id": "test_job_123",
"status": "processing",
"message": "Job started processing"
}
mock_post.return_value = mock_response
# Test parameters
server_url = "http://test-server:8765"
job_id = "test_job_123"
params = {
"quality": "normal",
"output_format": "obj"
}
# Call the method
result = self.client.process_job(server_url, job_id, params)
# Verify the result
self.assertIsNotNone(result)
self.assertEqual(result["job_id"], "test_job_123")
self.assertEqual(result["status"], "processing")
# Verify the API call
mock_post.assert_called_once()
args, kwargs = mock_post.call_args
self.assertTrue(server_url in args[0])
self.assertEqual(kwargs["headers"]["Authorization"], f"Bearer {self.api_key}")
self.assertEqual(kwargs["json"]["job_id"], job_id)
self.assertEqual(kwargs["json"]["params"]["quality"], "normal")
@patch('src.remote.cuda_mvs_client.requests.get')
def test_get_job_status(self, mock_get):
"""
Test getting the status of a job on a remote CUDA MVS server.
"""
# Mock response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"job_id": "test_job_123",
"status": "completed",
"progress": 100,
"message": "Job completed successfully"
}
mock_get.return_value = mock_response
# Test parameters
server_url = "http://test-server:8765"
job_id = "test_job_123"
# Call the method
result = self.client.get_job_status(server_url, job_id)
# Verify the result
self.assertIsNotNone(result)
self.assertEqual(result["job_id"], "test_job_123")
self.assertEqual(result["status"], "completed")
self.assertEqual(result["progress"], 100)
# Verify the API call
mock_get.assert_called_once()
args, kwargs = mock_get.call_args
self.assertTrue(server_url in args[0])
self.assertEqual(kwargs["headers"]["Authorization"], f"Bearer {self.api_key}")
@patch('src.remote.cuda_mvs_client.requests.get')
def test_download_model(self, mock_get):
"""
Test downloading a model from a remote CUDA MVS server.
"""
# Mock response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.content = b"test model data"
mock_get.return_value = mock_response
# Test parameters
server_url = "http://test-server:8765"
job_id = "test_job_123"
output_dir = os.path.join(self.test_output_dir, "models")
os.makedirs(output_dir, exist_ok=True)
# Call the method
result = self.client.download_model(server_url, job_id, output_dir)
# Verify the result
self.assertIsNotNone(result)
self.assertTrue("model_path" in result)
self.assertTrue(os.path.exists(result["model_path"]))
# Verify the API call
mock_get.assert_called()
args, kwargs = mock_get.call_args_list[0]
self.assertTrue(server_url in args[0])
self.assertEqual(kwargs["headers"]["Authorization"], f"Bearer {self.api_key}")
@patch('src.remote.connection_manager.zeroconf.Zeroconf')
def test_discover_servers(self, mock_zeroconf):
"""
Test discovering CUDA MVS servers on the network.
"""
# Mock Zeroconf
mock_zeroconf_instance = MagicMock()
mock_zeroconf.return_value = mock_zeroconf_instance
# Mock ServiceBrowser
with patch('src.remote.connection_manager.ServiceBrowser') as mock_browser:
# Set up the connection manager to discover servers
self.connection_manager.discover_servers()
# Verify Zeroconf was initialized
mock_zeroconf.assert_called_once()
# Verify ServiceBrowser was initialized
mock_browser.assert_called_once()
args, kwargs = mock_browser.call_args
self.assertEqual(args[0], mock_zeroconf_instance)
self.assertEqual(args[1], "_cudamvs._tcp.local.")
@patch('src.remote.connection_manager.CUDAMVSClient')
def test_upload_images_with_connection_manager(self, mock_client_class):
"""
Test uploading images using the connection manager.
"""
# Mock client
mock_client = MagicMock()
mock_client_class.return_value = mock_client
# Mock upload_images method
mock_client.upload_images.return_value = {
"job_id": "test_job_123",
"status": "uploaded",
"message": "Images uploaded successfully"
}
# Add a mock server
self.connection_manager.servers = {
"test_server": {
"id": "test_server",
"name": "Test Server",
"url": "http://test-server:8765",
"status": "online"
}
}
# Test parameters
server_id = "test_server"
image_paths = [
os.path.join(self.test_output_dir, "test_image_1.png"),
os.path.join(self.test_output_dir, "test_image_2.png")
]
# Create test images
for path in image_paths:
with open(path, "w") as f:
f.write("test image data")
# Call the method
result = self.connection_manager.upload_images(server_id, image_paths)
# Verify the result
self.assertIsNotNone(result)
self.assertEqual(result["job_id"], "test_job_123")
self.assertEqual(result["status"], "uploaded")
# Verify the client method was called
mock_client.upload_images.assert_called_once()
args, kwargs = mock_client.upload_images.call_args
self.assertEqual(args[0], "http://test-server:8765")
self.assertEqual(args[1], image_paths)
def tearDown(self):
"""
Clean up after tests.
"""
# Clean up test output directory
import shutil
if os.path.exists(self.test_output_dir):
shutil.rmtree(self.test_output_dir)
if __name__ == "__main__":
unittest.main()
```
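The tests above mock every HTTP call, so as a quick reference here is a minimal sketch of the client flow they exercise (upload, process, poll, download) against a live server. The server URL, API key, and image paths are placeholders; the method names and arguments mirror the calls made in the tests.

```python
import time

from src.remote.cuda_mvs_client import CUDAMVSClient

client = CUDAMVSClient(api_key="YOUR_API_KEY", output_dir="output/remote_cuda_mvs")

server_url = "http://cuda-mvs-server:8765"   # placeholder
image_paths = ["output/images/view_1.png", "output/images/view_2.png"]  # placeholders

# 1. Upload the input views
upload = client.upload_images(server_url, image_paths)
job_id = upload["job_id"]

# 2. Start processing with the same parameters used in test_process_job
client.process_job(server_url, job_id, {"quality": "normal", "output_format": "obj"})

# 3. Poll until the job reports completion or failure
while True:
    status = client.get_job_status(server_url, job_id)
    if status["status"] in ("completed", "failed"):
        break
    time.sleep(5)

# 4. Download the reconstructed model
if status["status"] == "completed":
    result = client.download_model(server_url, job_id, "output/remote_cuda_mvs/models")
    print(result["model_path"])
```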
--------------------------------------------------------------------------------
/src/utils/cad_exporter.py:
--------------------------------------------------------------------------------
```python
import os
import logging
import subprocess
from typing import Dict, Any, Optional, Tuple, List
logger = logging.getLogger(__name__)
class CADExporter:
"""
Exports OpenSCAD models to various CAD formats that preserve parametric properties.
"""
def __init__(self, openscad_path: str = "openscad"):
"""
Initialize the CAD exporter.
Args:
openscad_path: Path to the OpenSCAD executable
"""
self.openscad_path = openscad_path
# Supported export formats
self.supported_formats = {
"csg": "OpenSCAD CSG format (preserves all parametric properties)",
"amf": "Additive Manufacturing File Format (preserves some metadata)",
"3mf": "3D Manufacturing Format (modern replacement for STL with metadata)",
"scad": "OpenSCAD source code (fully parametric)",
"dxf": "Drawing Exchange Format (for 2D designs)",
"svg": "Scalable Vector Graphics (for 2D designs)"
}
def export_model(self, scad_file: str, output_format: str = "csg",
parameters: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None) -> Tuple[bool, str, Optional[str]]:
"""
Export an OpenSCAD model to the specified format.
Args:
scad_file: Path to the SCAD file
output_format: Format to export to (csg, amf, 3mf, etc.)
parameters: Optional parameters to override in the SCAD file
metadata: Optional metadata to include in the export
Returns:
Tuple of (success, output_file_path, error_message)
"""
if not os.path.exists(scad_file):
return False, "", f"SCAD file not found: {scad_file}"
# Create output file path
output_dir = os.path.dirname(scad_file)
model_id = os.path.basename(scad_file).split('.')[0]
# Special case for SCAD format - just copy the file with parameters embedded
if output_format.lower() == "scad" and parameters:
return self._export_parametric_scad(scad_file, parameters, metadata)
# For native OpenSCAD formats
output_file = os.path.join(output_dir, f"{model_id}.{output_format.lower()}")
# Build command
cmd = [self.openscad_path, "-o", output_file]
# Add parameters if provided
if parameters:
for key, value in parameters.items():
cmd.extend(["-D", f"{key}={value}"])
# Add input file
cmd.append(scad_file)
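# Illustrative example of the resulting command (not executed here):
# for scad_file="output/model_1.scad", output_format="3mf", parameters={"width": 20},
# cmd == ["openscad", "-o", "output/model_1.3mf", "-D", "width=20", "output/model_1.scad"]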
try:
# Run OpenSCAD
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
# Check if file was created
if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
logger.info(f"Exported model to {output_format}: {output_file}")
# Add metadata if supported and provided
if metadata and output_format.lower() in ["amf", "3mf"]:
self._add_metadata_to_file(output_file, metadata, output_format)
return True, output_file, None
else:
error_msg = f"Failed to export model to {output_format}"
logger.error(error_msg)
logger.error(f"OpenSCAD output: {result.stdout}")
logger.error(f"OpenSCAD error: {result.stderr}")
return False, "", error_msg
except subprocess.CalledProcessError as e:
error_msg = f"Error exporting model to {output_format}: {e.stderr}"
logger.error(error_msg)
return False, "", error_msg
except Exception as e:
error_msg = f"Error exporting model to {output_format}: {str(e)}"
logger.error(error_msg)
return False, "", error_msg
def _export_parametric_scad(self, scad_file: str, parameters: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None) -> Tuple[bool, str, Optional[str]]:
"""
Create a new SCAD file with parameters embedded as variables.
Args:
scad_file: Path to the original SCAD file
parameters: Parameters to embed in the SCAD file
metadata: Optional metadata to include as comments
Returns:
Tuple of (success, output_file_path, error_message)
"""
try:
# Read the original SCAD file
with open(scad_file, 'r') as f:
content = f.read()
# Create output file path
output_dir = os.path.dirname(scad_file)
model_id = os.path.basename(scad_file).split('.')[0]
output_file = os.path.join(output_dir, f"{model_id}_parametric.scad")
# Create parameter declarations
param_declarations = []
for key, value in parameters.items():
if isinstance(value, str):
param_declarations.append(f'{key} = "{value}";')
else:
param_declarations.append(f'{key} = {value};')
# Create metadata comments
metadata_comments = []
if metadata:
metadata_comments.append("// Metadata:")
for key, value in metadata.items():
metadata_comments.append(f"// {key}: {value}")
# Combine everything
new_content = "// Parametric model generated by OpenSCAD MCP Server\n"
new_content += "\n".join(metadata_comments) + "\n\n" if metadata_comments else "\n"
new_content += "// Parameters:\n"
new_content += "\n".join(param_declarations) + "\n\n"
new_content += content
# Write to the new file
with open(output_file, 'w') as f:
f.write(new_content)
logger.info(f"Exported parametric SCAD file: {output_file}")
return True, output_file, None
except Exception as e:
error_msg = f"Error creating parametric SCAD file: {str(e)}"
logger.error(error_msg)
return False, "", error_msg
def _add_metadata_to_file(self, file_path: str, metadata: Dict[str, Any], format_type: str) -> None:
"""
Add metadata to supported file formats.
Args:
file_path: Path to the file
metadata: Metadata to add
format_type: File format
"""
if format_type.lower() == "amf":
self._add_metadata_to_amf(file_path, metadata)
elif format_type.lower() == "3mf":
self._add_metadata_to_3mf(file_path, metadata)
def _add_metadata_to_amf(self, file_path: str, metadata: Dict[str, Any]) -> None:
"""Add metadata to AMF file."""
try:
import xml.etree.ElementTree as ET
# Parse the AMF file
tree = ET.parse(file_path)
root = tree.getroot()
# Find or create metadata element
metadata_elem = root.find("metadata")
if metadata_elem is None:
metadata_elem = ET.SubElement(root, "metadata")
# Add metadata
for key, value in metadata.items():
meta = ET.SubElement(metadata_elem, "meta", name=key)
meta.text = str(value)
# Write back to file
tree.write(file_path)
logger.info(f"Added metadata to AMF file: {file_path}")
except Exception as e:
logger.error(f"Error adding metadata to AMF file: {str(e)}")
def _add_metadata_to_3mf(self, file_path: str, metadata: Dict[str, Any]) -> None:
"""Add metadata to 3MF file."""
try:
import zipfile
import xml.etree.ElementTree as ET
# 3MF files are ZIP archives
with zipfile.ZipFile(file_path, 'a') as z:
# Check if metadata file exists
metadata_path = "Metadata/model_metadata.xml"
try:
z.getinfo(metadata_path)
# Extract existing metadata
with z.open(metadata_path) as f:
tree = ET.parse(f)
root = tree.getroot()
except KeyError:
# Create new metadata file
root = ET.Element("metadata")
tree = ET.ElementTree(root)
# Add metadata
for key, value in metadata.items():
meta = ET.SubElement(root, "meta", name=key)
meta.text = str(value)
# Write metadata to a temporary file
temp_path = file_path + ".metadata.tmp"
tree.write(temp_path)
# Add to ZIP
z.write(temp_path, metadata_path)
# Remove temporary file
os.remove(temp_path)
logger.info(f"Added metadata to 3MF file: {file_path}")
except Exception as e:
logger.error(f"Error adding metadata to 3MF file: {str(e)}")
def get_supported_formats(self) -> List[str]:
"""Get list of supported export formats."""
return list(self.supported_formats.keys())
def get_format_description(self, format_name: str) -> str:
"""Get description of a format."""
return self.supported_formats.get(format_name.lower(), "Unknown format")
```
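As a quick orientation, the sketch below shows how CADExporter is typically driven. The file path and parameter values are placeholders, and OpenSCAD must be installed and on the PATH for the export call to succeed.

```python
from src.utils.cad_exporter import CADExporter

exporter = CADExporter(openscad_path="openscad")

# List the formats the exporter knows about
for fmt in exporter.get_supported_formats():
    print(fmt, "-", exporter.get_format_description(fmt))

# Export an existing SCAD file to 3MF, overriding two parameters and attaching metadata
success, output_file, error = exporter.export_model(
    "output/models/model_1.scad",          # placeholder path
    output_format="3mf",
    parameters={"width": 20, "height": 10},
    metadata={"description": "test export"},
)
if success:
    print(f"Exported to {output_file}")
else:
    print(f"Export failed: {error}")
```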
--------------------------------------------------------------------------------
/src/ai/ai_service.py:
--------------------------------------------------------------------------------
```python
import os
import logging
import re
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
class AIService:
"""
Service for AI-driven OpenSCAD code generation.
Translates natural language descriptions into OpenSCAD code.
"""
def __init__(self, templates_dir: str, model_config: Optional[Dict[str, Any]] = None):
"""
Initialize the AI service.
Args:
templates_dir: Directory containing OpenSCAD templates
model_config: Optional configuration for the AI model
"""
self.templates_dir = templates_dir
self.model_config = model_config or {}
# Load templates
self.templates = self._load_templates()
logger.info(f"Initialized AI service with {len(self.templates)} templates")
def generate_openscad_code(self, context: Dict[str, Any]) -> str:
"""
Generate OpenSCAD code from natural language description.
Args:
context: Dictionary containing:
- description: Natural language description
- parameters: Dictionary of parameters
- templates_dir: Directory containing templates
Returns:
Generated OpenSCAD code
"""
description = context.get("description", "")
parameters = context.get("parameters", {})
logger.info(f"Generating OpenSCAD code for: {description}")
# Parse the description to identify key components
components = self._parse_description(description)
# Generate code based on identified components
code = self._generate_code_from_components(components, parameters)
return code
def _load_templates(self) -> Dict[str, str]:
"""Load OpenSCAD code templates from the templates directory."""
templates = {}
# Check if templates directory exists
if not os.path.exists(self.templates_dir):
logger.warning(f"Templates directory not found: {self.templates_dir}")
return templates
# Load all .scad files in the templates directory
for filename in os.listdir(self.templates_dir):
if filename.endswith(".scad"):
template_name = os.path.splitext(filename)[0]
template_path = os.path.join(self.templates_dir, filename)
try:
with open(template_path, 'r') as f:
templates[template_name] = f.read()
except Exception as e:
logger.error(f"Error loading template {template_path}: {e}")
return templates
def _parse_description(self, description: str) -> Dict[str, Any]:
"""
Parse a natural language description to identify key components.
Args:
description: Natural language description of the model
Returns:
Dictionary of identified components
"""
components = {
"primary_shape": None,
"operations": [],
"features": [],
"modifiers": []
}
# Identify primary shape
shape_patterns = {
"cube": r'\b(cube|box|rectangular|block)\b',
"sphere": r'\b(sphere|ball|round|circular)\b',
"cylinder": r'\b(cylinder|tube|pipe|rod)\b',
"cone": r'\b(cone|pyramid|tapered)\b',
"torus": r'\b(torus|donut|ring)\b'
}
for shape, pattern in shape_patterns.items():
if re.search(pattern, description, re.IGNORECASE):
components["primary_shape"] = shape
break
# Identify operations
operation_patterns = {
"union": r'\b(combine|join|merge|add)\b',
"difference": r'\b(subtract|remove|cut|hole|hollow)\b',
"intersection": r'\b(intersect|common|shared)\b'
}
for operation, pattern in operation_patterns.items():
if re.search(pattern, description, re.IGNORECASE):
components["operations"].append(operation)
# Identify features
feature_patterns = {
"rounded_corners": r'\b(rounded corners|fillets|chamfer)\b',
"holes": r'\b(holes|perforations|openings)\b',
"text": r'\b(text|label|inscription)\b',
"pattern": r'\b(pattern|array|grid|repeat)\b'
}
for feature, pattern in feature_patterns.items():
if re.search(pattern, description, re.IGNORECASE):
components["features"].append(feature)
# Identify modifiers
modifier_patterns = {
"scale": r'\b(scale|resize|proportion)\b',
"rotate": r'\b(rotate|turn|spin|angle)\b',
"translate": r'\b(move|shift|position|place)\b',
"mirror": r'\b(mirror|reflect|flip)\b'
}
for modifier, pattern in modifier_patterns.items():
if re.search(pattern, description, re.IGNORECASE):
components["modifiers"].append(modifier)
logger.info(f"Parsed components: {components}")
return components
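# Illustrative example: for the description "a hollow cylinder with rounded corners",
# the patterns above yield primary_shape="cylinder", operations=["difference"],
# features=["rounded_corners"], and no modifiers.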
def _generate_code_from_components(self, components: Dict[str, Any], parameters: Dict[str, Any]) -> str:
"""
Generate OpenSCAD code based on identified components.
Args:
components: Dictionary of identified components
parameters: Dictionary of parameters
Returns:
Generated OpenSCAD code
"""
code = []
# Add header
code.append("// AI-generated OpenSCAD code")
code.append("// Generated from natural language description")
code.append("")
# Add parameter declarations
code.append("// Parameters")
for param, value in parameters.items():
if isinstance(value, str) and not (value.lower() == 'true' or value.lower() == 'false'):
code.append(f'{param} = "{value}";')
else:
code.append(f"{param} = {value};")
code.append("")
# Generate code for primary shape
primary_shape = components.get("primary_shape")
if not primary_shape:
primary_shape = "cube" # Default to cube if no shape is identified
# Start with operations if any
operations = components.get("operations", [])
if operations:
for operation in operations:
code.append(f"{operation}() {{")
code.append(" // Primary shape")
# Add modifiers if any
modifiers = components.get("modifiers", [])
indent = " " if operations else ""
if modifiers:
for modifier in modifiers:
if modifier == "scale":
scale_value = parameters.get("scale", 1)
code.append(f"{indent}scale([{scale_value}, {scale_value}, {scale_value}])")
elif modifier == "rotate":
angle = parameters.get("angle", 0)
code.append(f"{indent}rotate([0, 0, {angle}])")
elif modifier == "translate":
x = parameters.get("x", 0)
y = parameters.get("y", 0)
z = parameters.get("z", 0)
code.append(f"{indent}translate([{x}, {y}, {z}])")
elif modifier == "mirror":
code.append(f"{indent}mirror([0, 0, 1])")
# Add the primary shape
if primary_shape == "cube":
width = parameters.get("width", 10)
depth = parameters.get("depth", 10)
height = parameters.get("height", 10)
center = parameters.get("center", "true")
code.append(f"{indent}cube([{width}, {depth}, {height}], center={center});")
elif primary_shape == "sphere":
radius = parameters.get("radius", 10)
segments = parameters.get("segments", 32)
code.append(f"{indent}sphere(r={radius}, $fn={segments});")
elif primary_shape == "cylinder":
radius = parameters.get("radius", 10)
height = parameters.get("height", 20)
center = parameters.get("center", "true")
segments = parameters.get("segments", 32)
code.append(f"{indent}cylinder(h={height}, r={radius}, center={center}, $fn={segments});")
elif primary_shape == "cone":
base_radius = parameters.get("base_radius", 10)
height = parameters.get("height", 20)
center = parameters.get("center", "true")
segments = parameters.get("segments", 32)
code.append(f"{indent}cylinder(h={height}, r1={base_radius}, r2=0, center={center}, $fn={segments});")
elif primary_shape == "torus":
major_radius = parameters.get("major_radius", 20)
minor_radius = parameters.get("minor_radius", 5)
segments = parameters.get("segments", 32)
code.append(f"{indent}rotate_extrude($fn={segments})")
code.append(f"{indent} translate([{major_radius}, 0, 0])")
code.append(f"{indent} circle(r={minor_radius}, $fn={segments});")
# Add features if any
features = components.get("features", [])
if features and "holes" in features:
code.append("")
code.append(f"{indent}// Add holes")
code.append(f"{indent}difference() {{")
code.append(f"{indent} children(0);") # Reference the primary shape
# Add a sample hole
hole_radius = parameters.get("hole_radius", 2)
code.append(f"{indent} translate([0, 0, 0])")
code.append(f"{indent} cylinder(h=100, r={hole_radius}, center=true, $fn=32);")
code.append(f"{indent}}}")
# Close operations if any
if operations:
code.append("}")
return "\n".join(code)
```
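A minimal usage sketch for the service above; the templates directory points at the repository's src/models/scad_templates, and the description and parameters are placeholders.

```python
from src.ai.ai_service import AIService

service = AIService(templates_dir="src/models/scad_templates")

code = service.generate_openscad_code({
    "description": "a cylinder with holes",
    "parameters": {"radius": 15, "height": 40, "hole_radius": 3},
})
print(code)
```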