This is page 1 of 3. Use http://codebase.md/jhacksman/openscad-mcp-server?lines=false&page={x} to view the full context. # Directory Structure ``` ├── implementation_plan.md ├── old │ ├── download_sam2_checkpoint.py │ ├── src │ │ ├── ai │ │ │ └── sam_segmentation.py │ │ ├── models │ │ │ └── threestudio_generator.py │ │ └── workflow │ │ └── image_to_model_pipeline.py │ └── test_sam2_segmentation.py ├── README.md ├── requirements.txt ├── rtfmd │ ├── decisions │ │ ├── ai-driven-code-generation.md │ │ └── export-formats.md │ ├── files │ │ └── src │ │ ├── ai │ │ │ └── ai_service.py.md │ │ ├── main.py.md │ │ ├── models │ │ │ └── code_generator.py.md │ │ └── nlp │ │ └── parameter_extractor.py.md │ ├── knowledge │ │ ├── ai │ │ │ └── natural-language-processing.md │ │ ├── nlp │ │ │ └── parameter-extraction.md │ │ └── openscad │ │ ├── export-formats.md │ │ ├── openscad-basics.md │ │ └── primitive-testing.md │ └── README.md ├── scad │ └── simple_cube.scad ├── src │ ├── __init__.py │ ├── __pycache__ │ │ └── __init__.cpython-312.pyc │ ├── ai │ │ ├── ai_service.py │ │ ├── gemini_api.py │ │ └── venice_api.py │ ├── config.py │ ├── main_remote.py │ ├── main.py │ ├── main.py.new │ ├── models │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ └── code_generator.cpython-312.pyc │ │ ├── code_generator.py │ │ ├── cuda_mvs.py │ │ └── scad_templates │ │ └── basic_shapes.scad │ ├── nlp │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ └── parameter_extractor.cpython-312.pyc │ │ └── parameter_extractor.py │ ├── openscad_wrapper │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ └── wrapper.cpython-312.pyc │ │ └── wrapper.py │ ├── printer_discovery │ │ ├── __init__.py │ │ └── printer_discovery.py │ ├── remote │ │ ├── connection_manager.py │ │ ├── cuda_mvs_client.py │ │ ├── cuda_mvs_server.py │ │ └── error_handling.py │ ├── testing │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ ├── primitive_tester.cpython-312.pyc │ │ │ └── test_primitives.cpython-312.pyc │ │ ├── primitive_tester.py │ │ └── test_primitives.py │ ├── utils │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ ├── stl_exporter.cpython-312.pyc │ │ │ └── stl_validator.cpython-312.pyc │ │ ├── cad_exporter.py │ │ ├── format_validator.py │ │ ├── stl_exporter.py │ │ ├── stl_repair.py │ │ └── stl_validator.py │ ├── visualization │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-312.pyc │ │ │ └── renderer.cpython-312.pyc │ │ ├── headless_renderer.py │ │ ├── renderer.py │ │ └── web_interface.py │ └── workflow │ ├── image_approval.py │ └── multi_view_to_model_pipeline.py ├── test_complete_workflow.py ├── test_cuda_mvs.py ├── test_gemini_api.py ├── test_image_approval_workflow.py ├── test_image_approval.py ├── test_image_to_model_pipeline.py ├── test_model_selection.py ├── test_multi_view_pipeline.py ├── test_primitives.sh ├── test_rabbit_direct.py ├── test_remote_cuda_mvs.py └── test_venice_example.py ``` # Files -------------------------------------------------------------------------------- /rtfmd/README.md: -------------------------------------------------------------------------------- ```markdown # Reasoning Trace Framework for OpenSCAD MCP Server This directory contains the Reasoning Trace Framework (RTF) documentation for the OpenSCAD MCP Server project. The RTF provides insight into the design decisions, mental models, and reasoning processes behind the implementation. 
## Directory Structure - `/rtfmd/files/` - Shadow file system mirroring the actual source code structure - Contains `.md` files with the same names as their corresponding source files - Each file documents the reasoning behind the implementation - `/rtfmd/knowledge/` - Domain knowledge documentation - `/openscad/` - Knowledge about OpenSCAD and 3D modeling - `/ai/` - Knowledge about AI and natural language processing - `/nlp/` - Knowledge about natural language parameter extraction - `/rtfmd/decisions/` - Architectural decision records - Documents major design decisions and their rationales ## How to Use This Documentation 1. Start with the `/rtfmd/files/src/main.py.md` file to understand the overall architecture 2. Explore specific components through their corresponding `.md` files 3. Refer to the knowledge directory for domain-specific information 4. Review the decisions directory for major architectural decisions ## Tags Used - `<metadata>` - File metadata including author, timestamp, version, etc. - `<exploration>` - Documents the exploration process and alternatives considered - `<mental-model>` - Explains the mental model used in the implementation - `<pattern-recognition>` - Identifies design patterns used - `<trade-off>` - Documents trade-offs considered and choices made - `<domain-knowledge>` - References to domain knowledge required - `<technical-debt>` - Acknowledges technical debt and future improvements - `<knowledge-refs>` - References to related knowledge documents ## Contributing When modifying the codebase, please update the corresponding RTF documentation to reflect your reasoning and design decisions. ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # OpenSCAD MCP Server A Model Context Protocol (MCP) server that enables users to generate 3D models from text descriptions or images, with a focus on creating parametric 3D models using multi-view reconstruction and OpenSCAD. 
## Features - **AI Image Generation**: Generate images from text descriptions using Google Gemini or Venice.ai APIs - **Multi-View Image Generation**: Create multiple views of the same 3D object for reconstruction - **Image Approval Workflow**: Review and approve/deny generated images before reconstruction - **3D Reconstruction**: Convert approved multi-view images into 3D models using CUDA Multi-View Stereo - **Remote Processing**: Process computationally intensive tasks on remote servers within your LAN - **OpenSCAD Integration**: Generate parametric 3D models using OpenSCAD - **Parametric Export**: Export models in formats that preserve parametric properties (CSG, AMF, 3MF, SCAD) - **3D Printer Discovery**: Optional network printer discovery and direct printing ## Architecture The server is built using the Python MCP SDK and follows a modular architecture: ``` openscad-mcp-server/ ├── src/ │ ├── main.py # Main application │ ├── main_remote.py # Remote CUDA MVS server │ ├── ai/ # AI integrations │ │ ├── gemini_api.py # Google Gemini API for image generation │ │ └── venice_api.py # Venice.ai API for image generation (optional) │ ├── models/ # 3D model generation │ │ ├── cuda_mvs.py # CUDA Multi-View Stereo integration │ │ └── code_generator.py # OpenSCAD code generation │ ├── workflow/ # Workflow components │ │ ├── image_approval.py # Image approval mechanism │ │ └── multi_view_to_model_pipeline.py # Complete pipeline │ ├── remote/ # Remote processing │ │ ├── cuda_mvs_client.py # Client for remote CUDA MVS processing │ │ ├── cuda_mvs_server.py # Server for remote CUDA MVS processing │ │ ├── connection_manager.py # Remote connection management │ │ └── error_handling.py # Error handling for remote processing │ ├── openscad_wrapper/ # OpenSCAD CLI wrapper │ ├── visualization/ # Preview generation and web interface │ ├── utils/ # Utility functions │ └── printer_discovery/ # 3D printer discovery ├── scad/ # Generated OpenSCAD files ├── output/ # Output files (models, previews) │ ├── images/ # Generated images │ ├── multi_view/ # Multi-view images │ ├── approved_images/ # Approved images for reconstruction │ └── models/ # Generated 3D models ├── templates/ # Web interface templates └── static/ # Static files for web interface ``` ## Installation 1. Clone the repository: ``` git clone https://github.com/jhacksman/OpenSCAD-MCP-Server.git cd OpenSCAD-MCP-Server ``` 2. Create a virtual environment: ``` python -m venv venv source venv/bin/activate # On Windows: venv\Scripts\activate ``` 3. Install dependencies: ``` pip install -r requirements.txt ``` 4. Install OpenSCAD: - Ubuntu/Debian: `sudo apt-get install openscad` - macOS: `brew install openscad` - Windows: Download from [openscad.org](https://openscad.org/downloads.html) 5. Install CUDA Multi-View Stereo: ``` git clone https://github.com/fixstars/cuda-multi-view-stereo.git cd cuda-multi-view-stereo mkdir build && cd build cmake .. make ``` 6. Set up API keys: - Create a `.env` file in the root directory - Add your API keys: ``` GEMINI_API_KEY=your-gemini-api-key VENICE_API_KEY=your-venice-api-key # Optional REMOTE_CUDA_MVS_API_KEY=your-remote-api-key # For remote processing ``` ## Remote Processing Setup The server supports remote processing of computationally intensive tasks, particularly CUDA Multi-View Stereo reconstruction. This allows you to offload processing to more powerful machines within your LAN. ### Server Setup (on the machine with CUDA GPU) 1. 
Install CUDA Multi-View Stereo on the server machine: ``` git clone https://github.com/fixstars/cuda-multi-view-stereo.git cd cuda-multi-view-stereo mkdir build && cd build cmake .. make ``` 2. Start the remote CUDA MVS server: ``` python src/main_remote.py ``` 3. The server will automatically advertise itself on the local network using Zeroconf. ### Client Configuration 1. Configure remote processing in your `.env` file: ``` REMOTE_CUDA_MVS_ENABLED=True REMOTE_CUDA_MVS_USE_LAN_DISCOVERY=True REMOTE_CUDA_MVS_API_KEY=your-shared-secret-key ``` 2. Alternatively, you can specify a server URL directly: ``` REMOTE_CUDA_MVS_ENABLED=True REMOTE_CUDA_MVS_USE_LAN_DISCOVERY=False REMOTE_CUDA_MVS_SERVER_URL=http://server-ip:8765 REMOTE_CUDA_MVS_API_KEY=your-shared-secret-key ``` ### Remote Processing Features - **Automatic Server Discovery**: Find CUDA MVS servers on your local network - **Job Management**: Upload images, track job status, and download results - **Fault Tolerance**: Automatic retries, circuit breaker pattern, and error tracking - **Authentication**: Secure API key authentication for all remote operations - **Health Monitoring**: Continuous server health checks and status reporting ## Usage 1. Start the server: ``` python src/main.py ``` 2. The server will start on http://localhost:8000 3. Use the MCP tools to interact with the server: - **generate_image_gemini**: Generate an image using Google Gemini API ```json { "prompt": "A low-poly rabbit with black background", "model": "gemini-2.0-flash-exp-image-generation" } ``` - **generate_multi_view_images**: Generate multiple views of the same 3D object ```json { "prompt": "A low-poly rabbit", "num_views": 4 } ``` - **create_3d_model_from_images**: Create a 3D model from approved multi-view images ```json { "image_ids": ["view_1", "view_2", "view_3", "view_4"], "output_name": "rabbit_model" } ``` - **create_3d_model_from_text**: Complete pipeline from text to 3D model ```json { "prompt": "A low-poly rabbit", "num_views": 4 } ``` - **export_model**: Export a model to a specific format ```json { "model_id": "your-model-id", "format": "obj" // or "stl", "ply", "scad", etc. } ``` - **discover_remote_cuda_mvs_servers**: Find CUDA MVS servers on your network ```json { "timeout": 5 } ``` - **get_remote_job_status**: Check the status of a remote processing job ```json { "server_id": "server-id", "job_id": "job-id" } ``` - **download_remote_model_result**: Download a completed model from a remote server ```json { "server_id": "server-id", "job_id": "job-id", "output_name": "model-name" } ``` - **discover_printers**: Discover 3D printers on the network ```json {} ``` - **print_model**: Print a model on a connected printer ```json { "model_id": "your-model-id", "printer_id": "your-printer-id" } ``` ## Image Generation Options The server supports multiple image generation options: 1. **Google Gemini API** (Default): Uses the Gemini 2.0 Flash Experimental model for high-quality image generation - Supports multi-view generation with consistent style - Requires a Google Gemini API key 2. **Venice.ai API** (Optional): Alternative image generation service - Supports various models including flux-dev and fluently-xl - Requires a Venice.ai API key 3. **User-Provided Images**: Skip image generation and use your own images - Upload images directly to the server - Useful for working with existing photographs or renders ## Multi-View Workflow The server implements a multi-view workflow for 3D reconstruction: 1. 
**Image Generation**: Generate multiple views of the same 3D object 2. **Image Approval**: Review and approve/deny each generated image 3. **3D Reconstruction**: Convert approved images into a 3D model using CUDA MVS - Can be processed locally or on a remote server within your LAN 4. **Model Refinement**: Optionally refine the model using OpenSCAD ## Remote Processing Workflow The remote processing workflow allows you to offload computationally intensive tasks to more powerful machines: 1. **Server Discovery**: Automatically discover CUDA MVS servers on your network 2. **Image Upload**: Upload approved multi-view images to the remote server 3. **Job Processing**: Process the images on the remote server using CUDA MVS 4. **Status Tracking**: Monitor the job status and progress 5. **Result Download**: Download the completed 3D model when processing is finished ## Supported Export Formats The server supports exporting models in various formats: - **OBJ**: Wavefront OBJ format (standard 3D model format) - **STL**: Standard Triangle Language (for 3D printing) - **PLY**: Polygon File Format (for point clouds and meshes) - **SCAD**: OpenSCAD source code (for parametric models) - **CSG**: OpenSCAD CSG format (preserves all parametric properties) - **AMF**: Additive Manufacturing File Format (preserves some metadata) - **3MF**: 3D Manufacturing Format (modern replacement for STL with metadata) ## Web Interface The server provides a web interface for: - Generating and approving multi-view images - Previewing 3D models from different angles - Downloading models in various formats Access the interface at http://localhost:8000/ui/ ## License MIT ## Contributing Contributions are welcome! Please feel free to submit a Pull Request. ``` -------------------------------------------------------------------------------- /src/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/models/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/nlp/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/openscad_wrapper/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/testing/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/visualization/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/utils/__init__.py: -------------------------------------------------------------------------------- ```python # Utils package ``` -------------------------------------------------------------------------------- /src/printer_discovery/__init__.py: -------------------------------------------------------------------------------- ```python # Printer discovery package ``` -------------------------------------------------------------------------------- /test_primitives.sh: 
-------------------------------------------------------------------------------- ```bash #!/bin/bash # Test OpenSCAD primitives with different export formats PYTHON="python" OUTPUT_DIR="test_output" # Create output directory mkdir -p $OUTPUT_DIR # Run the tests $PYTHON -m src.testing.test_primitives --output-dir $OUTPUT_DIR --validate echo "Tests completed. Results are in $OUTPUT_DIR" ``` -------------------------------------------------------------------------------- /requirements.txt: -------------------------------------------------------------------------------- ``` # Core dependencies fastapi>=0.95.0 uvicorn>=0.21.0 pydantic>=2.0.0 python-multipart>=0.0.6 # MCP SDK git+https://github.com/modelcontextprotocol/python-sdk.git # Image processing pillow>=9.5.0 opencv-python>=4.7.0 # HTTP client requests>=2.28.0 httpx>=0.24.0 # Utilities python-dotenv>=1.0.0 pyyaml>=6.0 jinja2>=3.1.2 numpy>=1.24.0 uuid>=1.30.0 tqdm>=4.65.0 # Image Generation - Venice.ai API (optional) # (using existing requests and python-dotenv) # Image Generation - Google Gemini API google-generativeai>=0.3.0 # Network and Service Discovery zeroconf>=0.39.0 aiohttp>=3.8.4 # 3D Reconstruction - CUDA Multi-View Stereo open3d>=0.17.0 trimesh>=3.21.0 pyrender>=0.1.45 # Remote Processing fastapi-utils>=0.2.1 python-jose>=3.3.0 # For JWT authentication aiofiles>=23.1.0 # For development pytest>=7.3.1 black>=23.3.0 isort>=5.12.0 pytest-asyncio>=0.21.0 # Deprecated dependencies (kept for reference) # segment-anything-2>=1.0 # torch>=2.0.0 # torchvision>=0.15.0 # pytorch3d>=0.7.4 # ninja>=1.11.0 ``` -------------------------------------------------------------------------------- /test_model_selection.py: -------------------------------------------------------------------------------- ```python import os import logging from src.ai.venice_api import VeniceImageGenerator # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Venice.ai API key (replace with your own or use environment variable) VENICE_API_KEY = os.getenv("VENICE_API_KEY", "B9Y68yQgatQw8wmpmnIMYcGip1phCt-43CS0OktZU6") OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output", "images") # Test natural language model selection def test_model_selection(): """Test the natural language model selection functionality.""" # Initialize the Venice API client venice_generator = VeniceImageGenerator(VENICE_API_KEY, OUTPUT_DIR) # Test cases - natural language preferences to expected model mappings test_cases = [ ("default", "fluently-xl"), ("fastest model please", "fluently-xl"), ("I need a high quality image", "flux-dev"), ("create an uncensored image", "flux-dev-uncensored"), ("make it realistic", "pony-realism"), ("I want something artistic", "lustify-sdxl"), ("use stable diffusion", "stable-diffusion-3.5"), ("invalid model name", "fluently-xl"), # Should default to fluently-xl ] # Run tests for preference, expected_model in test_cases: mapped_model = venice_generator.map_model_preference(preference) logger.info(f"Preference: '{preference}' -> Model: '{mapped_model}'") assert mapped_model == expected_model, f"Expected {expected_model}, got {mapped_model}" logger.info("All model preference mappings tests passed!") if __name__ == "__main__": logger.info("Starting Venice.ai model selection mapping tests") test_model_selection() ``` -------------------------------------------------------------------------------- /test_venice_example.py: 
-------------------------------------------------------------------------------- ```python import os import requests import json # Venice.ai API configuration VENICE_API_KEY = os.getenv("VENICE_API_KEY", "") # Set via environment variable OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output", "images") os.makedirs(OUTPUT_DIR, exist_ok=True) # API endpoint url = "https://api.venice.ai/api/v1/image/generate" # Test prompt prompt = "A coffee mug with geometric patterns" # Prepare payload payload = { "model": "fluently-xl", # Try default model instead of flux "prompt": prompt, "height": 1024, "width": 1024, "steps": 20, "return_binary": False, "hide_watermark": False, "format": "png", "embed_exif_metadata": False } # Set up headers headers = { "Authorization": f"Bearer {VENICE_API_KEY}", "Content-Type": "application/json" } print(f"Sending request to {url} with prompt: '{prompt}'") # Make API request try: response = requests.post(url, json=payload, headers=headers) print(f"Response status: {response.status_code}") if response.status_code == 200: result = response.json() print("\nImage generation result:") print(json.dumps(result, indent=2)) # Print image URL if available if "images" in result and len(result["images"]) > 0: image_url = result["images"][0] print(f"\nImage URL: {image_url}") # Download image image_filename = f"{prompt[:20].replace(' ', '_')}_flux.png" image_path = os.path.join(OUTPUT_DIR, image_filename) print(f"Downloading image to {image_path}...") img_response = requests.get(image_url, stream=True) if img_response.status_code == 200: with open(image_path, 'wb') as f: for chunk in img_response.iter_content(chunk_size=8192): f.write(chunk) print(f"Image saved to {image_path}") else: print(f"Failed to download image: {img_response.status_code}") else: print(f"Error: {response.text}") except Exception as e: print(f"Error: {str(e)}") ``` -------------------------------------------------------------------------------- /rtfmd/knowledge/openscad/openscad-basics.md: -------------------------------------------------------------------------------- ```markdown # OpenSCAD Basics <metadata> author: devin-ai-integration timestamp: 2025-03-21T01:30:00Z version: 1.0.0 tags: [openscad, 3d-modeling, csg, parametric-design] </metadata> ## Overview OpenSCAD is a programmer's solid 3D CAD modeler that uses a scripting language to define 3D objects. Unlike traditional CAD software that focuses on interactive modeling, OpenSCAD emphasizes programmatic and parametric design. ## Key Concepts ### Constructive Solid Geometry (CSG) OpenSCAD uses CSG operations to create complex models by combining simpler primitives: - **Union**: Combines multiple objects (`union() { ... }`) - **Difference**: Subtracts one object from another (`difference() { ... }`) - **Intersection**: Creates an object from the overlapping portions of other objects (`intersection() { ... }`) ### Primitive Shapes OpenSCAD provides several built-in primitive shapes: - **Cube**: `cube([width, depth, height], center=true/false)` - **Sphere**: `sphere(r=radius, $fn=segments)` - **Cylinder**: `cylinder(h=height, r=radius, center=true/false, $fn=segments)` - **Polyhedron**: For complex shapes with defined faces ### Transformations Objects can be transformed using: - **Translate**: `translate([x, y, z]) { ... }` - **Rotate**: `rotate([x_deg, y_deg, z_deg]) { ... }` - **Scale**: `scale([x, y, z]) { ... }` - **Mirror**: `mirror([x, y, z]) { ... 
}` ### Parametric Design OpenSCAD excels at parametric design: - Variables can define dimensions and relationships - Modules can create reusable components with parameters - Mathematical expressions can define complex relationships ## Command Line Usage OpenSCAD can be run headless using command-line options: - Generate STL: `openscad -o output.stl input.scad` - Pass parameters: `openscad -D "width=10" -D "height=20" -o output.stl input.scad` - Generate PNG preview: `openscad --camera=0,0,0,0,0,0,50 --imgsize=800,600 -o preview.png input.scad` ## Best Practices - Use modules for reusable components - Parameterize designs for flexibility - Use descriptive variable names - Comment code for clarity - Organize complex designs hierarchically - Use $fn judiciously for performance - Ensure models are manifold (watertight) for 3D printing ``` -------------------------------------------------------------------------------- /src/visualization/headless_renderer.py: -------------------------------------------------------------------------------- ```python import os import logging from PIL import Image, ImageDraw from typing import Dict, Any, Optional logger = logging.getLogger(__name__) class HeadlessRenderer: """Provides rendering capabilities for OpenSCAD in headless environments.""" def __init__(self, openscad_path: str = "openscad"): self.openscad_path = openscad_path self.camera_angles = { "front": "0,0,0,0,0,0,50", "top": "0,0,0,90,0,0,50", "right": "0,0,0,0,0,90,50", "perspective": "70,0,35,25,0,25,250" } def create_placeholder_image(self, output_path: str, model_id: str, view: str = "perspective") -> str: """Create a placeholder image with model information.""" try: # Create a blank image width, height = 800, 600 image = Image.new('RGB', (width, height), color=(240, 240, 240)) draw = ImageDraw.Draw(image) # Add text draw.text((20, 20), f"OpenSCAD Model: {model_id}", fill=(0, 0, 0)) draw.text((20, 60), f"View: {view}", fill=(0, 0, 0)) draw.text((20, 100), "Headless rendering mode", fill=(0, 0, 0)) # Draw a simple 3D shape draw.polygon([(400, 200), (300, 300), (500, 300)], outline=(0, 0, 0), width=2) draw.polygon([(400, 200), (500, 300), (500, 400)], outline=(0, 0, 0), width=2) draw.polygon([(400, 200), (300, 300), (300, 400)], outline=(0, 0, 0), width=2) draw.rectangle((300, 300, 500, 400), outline=(0, 0, 0), width=2) # Add note about headless mode note = "Note: This is a placeholder image. OpenSCAD preview generation" note2 = "requires an X server or a headless rendering solution." 
draw.text((20, 500), note, fill=(150, 0, 0)) draw.text((20, 530), note2, fill=(150, 0, 0)) # Save the image image.save(output_path) logger.info(f"Created placeholder image: {output_path}") return output_path except Exception as e: logger.error(f"Error creating placeholder image: {str(e)}") return output_path ``` -------------------------------------------------------------------------------- /test_rabbit_direct.py: -------------------------------------------------------------------------------- ```python import os import sys import json import requests import base64 import logging # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Venice.ai API configuration VENICE_API_KEY = os.getenv("VENICE_API_KEY", "B9Y68yQgatQw8wmpmnIMYcGip1phCt-43CS0OktZU6") OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output", "images") os.makedirs(OUTPUT_DIR, exist_ok=True) # API configuration url = "https://api.venice.ai/api/v1/image/generate" headers = { "Authorization": f"Bearer {VENICE_API_KEY}", "Content-Type": "application/json" } # Payload for image generation payload = { "height": 1024, "width": 1024, "steps": 20, "return_binary": True, # Request binary data directly "hide_watermark": False, "format": "png", "embed_exif_metadata": False, "model": "flux-dev", "prompt": "A low-poly rabbit with black background. 3d file" } def generate_image(): """Generate image using Venice.ai API with the rabbit prompt.""" try: logger.info(f"Sending request to {url} with prompt: '{payload['prompt']}'") response = requests.post(url, json=payload, headers=headers) logger.info(f"Response status: {response.status_code}") if response.status_code == 200: # Save the raw binary response filename = "rabbit_low_poly_3d.png" output_path = os.path.join(OUTPUT_DIR, filename) with open(output_path, "wb") as f: f.write(response.content) logger.info(f"Image saved to {output_path}") return output_path else: logger.error(f"Error: {response.status_code} - {response.text}") return None except Exception as e: logger.error(f"Error: {str(e)}") return None if __name__ == "__main__": logger.info("Starting Venice.ai image generation test with rabbit prompt") image_path = generate_image() if image_path: logger.info(f"Successfully generated and saved image to {image_path}") print(f"\nImage saved to: {image_path}") else: logger.error("Failed to generate image") ``` -------------------------------------------------------------------------------- /rtfmd/files/src/ai/ai_service.py.md: -------------------------------------------------------------------------------- ```markdown <metadata> author: devin-ai-integration timestamp: 2025-03-21T01:30:00Z version: 1.0.0 related-files: [/src/models/code_generator.py] prompt: "Implement AI-driven code generator for OpenSCAD" </metadata> <exploration> The AI service component was designed to provide natural language processing capabilities for generating OpenSCAD code from user descriptions. Initial approaches considered: 1. Using a template-based approach with predefined patterns 2. Implementing a full NLP pipeline with custom entity extraction 3. Creating a hybrid approach that combines pattern matching with contextual understanding The hybrid approach was selected as it provides flexibility while maintaining performance. 
</exploration> <mental-model> The AI service operates on the concept of "component identification" - breaking down natural language descriptions into geometric primitives, operations, features, and modifiers. This mental model aligns with how OpenSCAD itself works, where complex models are built from primitive shapes and CSG operations. </mental-model> <pattern-recognition> The implementation uses the Strategy pattern for different parsing strategies and the Factory pattern for code generation. These patterns allow for extensibility as new shape types or operations are added. </pattern-recognition> <trade-off> Options considered: 1. Full machine learning approach with embeddings and neural networks 2. Rule-based pattern matching with regular expressions 3. Hybrid approach with pattern matching and contextual rules The hybrid approach was chosen because: - Lower computational requirements than full ML - More flexible than pure rule-based systems - Easier to debug and maintain - Can be extended with more sophisticated ML in the future </trade-off> <domain-knowledge> The implementation required understanding of: - OpenSCAD's modeling paradigm (CSG operations) - Common 3D modeling terminology - Natural language processing techniques - Regular expression pattern matching </domain-knowledge> <knowledge-refs> [OpenSCAD Basics](/rtfmd/knowledge/openscad/openscad-basics.md) - Last updated 2025-03-21 [Natural Language Processing](/rtfmd/knowledge/ai/natural-language-processing.md) - Last updated 2025-03-21 </knowledge-refs> ``` -------------------------------------------------------------------------------- /rtfmd/knowledge/openscad/primitive-testing.md: -------------------------------------------------------------------------------- ```markdown # Testing OpenSCAD Primitives <metadata> author: devin-ai-integration timestamp: 2025-03-21T12:00:00Z version: 1.0.0 tags: [openscad, primitives, testing, 3d-modeling] </metadata> ## Overview Testing OpenSCAD primitives is essential to ensure that the MCP server can reliably generate and export 3D models. This document outlines approaches for programmatically testing primitives and validating their exports. 
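As a quick orientation before the details below, the following sketch shows one way the repository's `PrimitiveTester`, `CADExporter`, and `FormatValidator` classes could be driven from pytest. The fixture wiring, template path, and chosen primitives are illustrative assumptions; the project itself runs these checks through the CLI entry point in `src/testing/test_primitives.py`.

```python
# Illustrative pytest harness around PrimitiveTester (assumptions noted above).
import pytest

from src.models.code_generator import OpenSCADCodeGenerator
from src.testing.primitive_tester import PrimitiveTester
from src.utils.cad_exporter import CADExporter
from src.utils.format_validator import FormatValidator


@pytest.fixture(scope="module")
def tester(tmp_path_factory):
    # Write generated SCAD/export files to a throwaway directory
    output_dir = str(tmp_path_factory.mktemp("primitive_tests"))
    code_generator = OpenSCADCodeGenerator("src/models/scad_templates", "scad")
    return PrimitiveTester(code_generator, CADExporter(), output_dir)


@pytest.mark.parametrize("primitive", ["cube", "sphere", "cylinder", "torus"])
def test_primitive_exports(tester, primitive):
    results = tester.test_primitive(primitive)
    validator = FormatValidator()
    for fmt, fmt_result in results["formats"].items():
        assert fmt_result["success"], f"{primitive} failed to export as {fmt}"
        if fmt == "3mf":
            is_valid, error = validator.validate_3mf(fmt_result["output_file"])
            assert is_valid, error
```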
## Primitive Types The OpenSCAD MCP Server supports these primitive types: - **Basic Shapes**: cube, sphere, cylinder - **Complex Shapes**: cone, torus, hexagonal_prism - **Containers**: hollow_box, rounded_box, tube - **Text**: 3D text with customizable parameters ## Testing Approach ### Parameter Testing Each primitive should be tested with: - **Default Parameters**: Ensure the primitive renders correctly with default values - **Boundary Values**: Test minimum and maximum reasonable values - **Special Cases**: Test cases like zero dimensions, negative values ### Export Testing For each primitive and parameter set: - **Export Formats**: Test export to 3MF, AMF, CSG, and SCAD formats - **Format Validation**: Ensure exported files meet format specifications - **Metadata Preservation**: Verify that parametric properties are preserved ### Integration Testing Test the full pipeline: - **Natural Language → Parameters**: Test parameter extraction - **Parameters → OpenSCAD Code**: Test code generation - **OpenSCAD Code → Export**: Test file export - **Export → Printer**: Test compatibility with printer software ## Validation Criteria Exported models should meet these criteria: - **Manifold**: Models should be watertight (no holes in the mesh) - **Valid Format**: Files should validate against format specifications - **Metadata**: Should contain relevant model metadata - **Render Performance**: Models should render efficiently ## Implementation The `PrimitiveTester` class implements this testing approach: ```python # Example usage tester = PrimitiveTester(code_generator, cad_exporter) results = tester.test_all_primitives() # Test specific primitives cube_results = tester.test_primitive("cube") ``` ## Printer Compatibility Tests Before sending to physical printers: 1. Import exports into PrusaSlicer or Bambu Studio 2. Check for import warnings or errors 3. Verify that models slice correctly 4. Test prints with simple examples ``` -------------------------------------------------------------------------------- /rtfmd/files/src/models/code_generator.py.md: -------------------------------------------------------------------------------- ```markdown <metadata> author: devin-ai-integration timestamp: 2025-03-21T01:30:00Z version: 1.0.0 related-files: [/src/ai/ai_service.py, /src/nlp/parameter_extractor.py] prompt: "Implement AI-driven code generator for OpenSCAD" </metadata> <exploration> The code generator was designed to translate natural language descriptions and extracted parameters into valid OpenSCAD code. Several approaches were considered: 1. Direct string manipulation for code generation 2. Template-based approach with parameter substitution 3. Modular approach with separate modules for different shape types The modular approach was selected for its maintainability and extensibility. </exploration> <mental-model> The code generator operates on a "shape-to-module" mapping paradigm, where each identified shape type corresponds to a specific OpenSCAD module. This mental model allows for clean separation of concerns and makes it easy to add new shape types. </mental-model> <pattern-recognition> The implementation uses the Factory pattern for code generation, where different shape types are mapped to different module generators. This pattern allows for easy extension with new shape types. </pattern-recognition> <trade-off> Options considered: 1. Generating raw OpenSCAD primitives directly 2. Using a library of pre-defined modules 3. 
Hybrid approach with both primitives and modules The library approach was chosen because: - More maintainable and readable code - Easier to implement complex shapes - Better parameter handling - More consistent output </trade-off> <domain-knowledge> The implementation required understanding of: - OpenSCAD syntax and semantics - Constructive Solid Geometry (CSG) operations - Parametric modeling concepts - 3D geometry fundamentals </domain-knowledge> <technical-debt> The current implementation has some limitations: - Limited support for complex nested operations - No support for custom user-defined modules - Basic error handling for invalid parameters Future improvements planned: - Enhanced error handling with meaningful messages - Support for user-defined modules - More sophisticated CSG operation chaining </technical-debt> <knowledge-refs> [OpenSCAD Basics](/rtfmd/knowledge/openscad/openscad-basics.md) - Last updated 2025-03-21 [AI-Driven Code Generation](/rtfmd/decisions/ai-driven-code-generation.md) - Last updated 2025-03-21 </knowledge-refs> ``` -------------------------------------------------------------------------------- /rtfmd/files/src/main.py.md: -------------------------------------------------------------------------------- ```markdown <metadata> author: devin-ai-integration timestamp: 2025-03-21T01:30:00Z version: 1.0.0 related-files: [/src/ai/ai_service.py, /src/models/code_generator.py, /src/nlp/parameter_extractor.py] prompt: "Build an MCP server for OpenSCAD" </metadata> <exploration> The main application was designed to implement a Model Context Protocol (MCP) server for OpenSCAD integration. Several approaches were considered: 1. Using a standalone server with direct OpenSCAD CLI calls 2. Implementing a web service with REST API 3. Creating an MCP-compliant server with FastAPI The MCP-compliant FastAPI approach was selected for its alignment with the project requirements and modern API design. </exploration> <mental-model> The main application operates on a "tool-based MCP service" paradigm, where each capability is exposed as an MCP tool that can be called by AI assistants. This mental model aligns with the MCP specification and provides a clean separation of concerns. </mental-model> <pattern-recognition> The implementation uses the Facade pattern to provide a simple interface to the complex subsystems (parameter extraction, code generation, OpenSCAD wrapper, etc.). This pattern simplifies the client interface and decouples the subsystems from clients. </pattern-recognition> <trade-off> Options considered: 1. Monolithic application with tightly coupled components 2. Microservices architecture with separate services 3. 
Modular monolith with clear component boundaries The modular monolith approach was chosen because: - Simpler deployment and operation - Lower latency for inter-component communication - Easier to develop and debug - Still maintains good separation of concerns </trade-off> <domain-knowledge> The implementation required understanding of: - Model Context Protocol (MCP) specification - FastAPI framework - OpenSCAD command-line interface - 3D modeling and printing workflows </domain-knowledge> <technical-debt> The current implementation has some limitations: - In-memory storage of models (not persistent) - Basic error handling - Limited printer discovery capabilities Future improvements planned: - Persistent storage for models - Enhanced error handling and reporting - More robust printer discovery and management </technical-debt> <knowledge-refs> [OpenSCAD Basics](/rtfmd/knowledge/openscad/openscad-basics.md) - Last updated 2025-03-21 [AI-Driven Code Generation](/rtfmd/decisions/ai-driven-code-generation.md) - Last updated 2025-03-21 </knowledge-refs> ``` -------------------------------------------------------------------------------- /rtfmd/files/src/nlp/parameter_extractor.py.md: -------------------------------------------------------------------------------- ```markdown <metadata> author: devin-ai-integration timestamp: 2025-03-21T01:30:00Z version: 1.0.0 related-files: [/src/models/code_generator.py] prompt: "Enhance parameter extractor with expanded shape recognition" </metadata> <exploration> The parameter extractor was designed to parse natural language descriptions and extract structured parameters for 3D model generation. Several approaches were considered: 1. Using a full NLP pipeline with named entity recognition 2. Implementing regex-based pattern matching 3. Creating a hybrid approach with contextual understanding The regex-based approach with contextual enhancements was selected for its balance of simplicity and effectiveness. </exploration> <mental-model> The parameter extractor operates on a "pattern recognition and extraction" paradigm, where common phrases and patterns in natural language are mapped to specific parameter types. This mental model allows for intuitive parameter extraction from diverse descriptions. </mental-model> <pattern-recognition> The implementation uses the Strategy pattern for different parameter extraction strategies based on shape type. This pattern allows for specialized extraction logic for each shape type while maintaining a consistent interface. </pattern-recognition> <trade-off> Options considered: 1. Machine learning-based approach with trained models 2. Pure regex pattern matching 3. 
Hybrid approach with contextual rules The regex approach with contextual rules was chosen because: - Simpler implementation with good accuracy - No training data required - Easier to debug and maintain - More predictable behavior </trade-off> <domain-knowledge> The implementation required understanding of: - Natural language processing concepts - Regular expression pattern matching - 3D modeling terminology - Parameter types for different geometric shapes </domain-knowledge> <technical-debt> The current implementation has some limitations: - Limited support for complex nested descriptions - Regex patterns may need maintenance as language evolves - Only supports millimeters as per project requirements Future improvements planned: - Enhanced contextual understanding - Support for more complex descriptions - Better handling of ambiguous parameters </technical-debt> <knowledge-refs> [Parameter Extraction](/rtfmd/knowledge/nlp/parameter-extraction.md) - Last updated 2025-03-21 [Natural Language Processing](/rtfmd/knowledge/ai/natural-language-processing.md) - Last updated 2025-03-21 </knowledge-refs> ``` -------------------------------------------------------------------------------- /rtfmd/knowledge/ai/natural-language-processing.md: -------------------------------------------------------------------------------- ```markdown # Natural Language Processing for 3D Modeling <metadata> author: devin-ai-integration timestamp: 2025-03-21T01:30:00Z version: 1.0.0 tags: [nlp, 3d-modeling, parameter-extraction, pattern-matching] </metadata> ## Overview Natural Language Processing (NLP) techniques can be applied to extract 3D modeling parameters and intentions from user descriptions. This knowledge document outlines approaches for translating natural language into structured data for 3D model generation. ## Approaches ### Pattern Matching Regular expression pattern matching is effective for identifying: - Dimensions and measurements - Shape types and primitives - Operations (union, difference, etc.) - Transformations (rotate, scale, etc.) 
- Material properties and colors Example patterns: ```python # Dimension pattern dimension_pattern = r'(\d+(?:\.\d+)?)\s*(mm|cm|m|inch|in)' # Shape pattern shape_pattern = r'\b(cube|box|sphere|ball|cylinder|tube|cone|pyramid)\b' ``` ### Contextual Understanding Beyond simple pattern matching, contextual understanding involves: - Identifying relationships between objects - Understanding relative positioning - Resolving ambiguous references - Maintaining dialog state for multi-turn interactions ### Hybrid Approaches Combining pattern matching with contextual rules provides: - Better accuracy than pure pattern matching - Lower computational requirements than full ML approaches - More maintainable and debuggable systems - Flexibility to handle diverse descriptions ## Parameter Extraction Key parameters to extract include: - **Dimensions**: Width, height, depth, radius, diameter - **Positions**: Coordinates, relative positions - **Operations**: Boolean operations, transformations - **Features**: Holes, fillets, chamfers, text - **Properties**: Color, material, finish ## Implementation Considerations - **Ambiguity Resolution**: Handle cases where measurements could apply to multiple dimensions - **Default Values**: Provide sensible defaults for unspecified parameters - **Unit Conversion**: Convert between different measurement units - **Error Handling**: Gracefully handle unparseable or contradictory descriptions - **Dialog Management**: Maintain state for multi-turn interactions to refine models ## Evaluation Metrics Effective NLP for 3D modeling can be evaluated by: - **Accuracy**: Correctness of extracted parameters - **Completeness**: Percentage of required parameters successfully extracted - **Robustness**: Ability to handle diverse phrasings and descriptions - **User Satisfaction**: Subjective evaluation of the resulting models ``` -------------------------------------------------------------------------------- /rtfmd/decisions/export-formats.md: -------------------------------------------------------------------------------- ```markdown # Decision: Export Format Selection <metadata> author: devin-ai-integration timestamp: 2025-03-21T12:00:00Z version: 1.0.0 tags: [export-formats, 3d-printing, decision, prusa, bambu] </metadata> ## Context The OpenSCAD MCP Server needs to export 3D models in formats that: 1. Preserve parametric properties 2. Support metadata 3. Are compatible with Prusa and Bambu printers 4. Avoid limitations of STL format <exploration> We evaluated multiple export formats: - STL: Traditional format but lacks metadata - CSG: OpenSCAD's native format, fully parametric - SCAD: Source code, fully parametric - 3MF: Modern format with metadata support - AMF: XML-based format with metadata - DXF/SVG: 2D formats for laser cutting </exploration> ## Decision We will use **3MF as the primary export format** with AMF as a secondary option. CSG and SCAD will be supported for users who want to modify the models in OpenSCAD. <mental-model> The ideal export format should: - Maintain all design parameters - Include metadata about the model - Be widely supported by popular slicers - Have a clean, standardized specification - Support multiple objects and materials </mental-model> ## Rationale <pattern-recognition> Modern 3D printing workflows favor formats that preserve more information than just geometry. The industry is shifting from STL to more capable formats like 3MF. 
</pattern-recognition> - **3MF** is supported by both Prusa and Bambu printer software - **3MF** includes support for metadata, colors, and materials - **3MF** has a cleaner specification than STL - **AMF** offers similar advantages but with less widespread adoption - **CSG/SCAD** formats maintain full parametric properties but only within OpenSCAD <trade-off> We considered making STL an option for broader compatibility, but this would compromise our goal of preserving parametric properties. The benefits of 3MF outweigh the minor compatibility issues that might arise. </trade-off> ## Consequences **Positive:** - Better preservation of model information - Improved compatibility with modern printer software - Future-proof approach as 3MF adoption increases **Negative:** - Slightly more complex implementation than STL - May require validation to ensure proper format compliance <technical-debt> We will need to implement validation for 3MF and AMF files to ensure they meet specifications. This adds complexity but is necessary for reliability. </technical-debt> <knowledge-refs> - [OpenSCAD Export Formats](/rtfmd/knowledge/openscad/export-formats.md) - [OpenSCAD Basics](/rtfmd/knowledge/openscad/openscad-basics.md) </knowledge-refs> ``` -------------------------------------------------------------------------------- /rtfmd/knowledge/openscad/export-formats.md: -------------------------------------------------------------------------------- ```markdown # OpenSCAD Export Formats <metadata> author: devin-ai-integration timestamp: 2025-03-21T12:00:00Z version: 1.0.0 tags: [openscad, export-formats, 3d-printing, prusa, bambu] </metadata> ## Overview OpenSCAD supports exporting 3D models in various formats, each with different capabilities for preserving parametric properties and metadata. This document focuses on formats suitable for Prusa and Bambu printers, with an emphasis on alternatives to STL. 
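Before the per-format details below, here is a minimal sketch of driving these exports programmatically. It relies only on the command-line invocations documented in this file and assumes the `openscad` binary is on the PATH; the helper name is illustrative (the server's own exporter lives in `src/utils/cad_exporter.py`).

```python
# Illustrative sketch: export a .scad file via the OpenSCAD CLI.
# Assumes `openscad` is on PATH; the server's real exporter is src/utils/cad_exporter.py.
import subprocess
from pathlib import Path
from typing import Optional


def export_model(scad_file: str, fmt: str = "3mf", parameters: Optional[dict] = None) -> Path:
    """Export a .scad file to 3MF/AMF/CSG by shelling out to the OpenSCAD CLI."""
    output_file = Path(scad_file).with_suffix(f".{fmt}")
    cmd = ["openscad", "-o", str(output_file)]
    # Override parametric values without editing the source, e.g. {"width": 10}
    for name, value in (parameters or {}).items():
        cmd += ["-D", f"{name}={value}"]
    cmd.append(scad_file)
    subprocess.run(cmd, check=True, capture_output=True)
    return output_file


# Example (paths from this repository):
# export_model("scad/simple_cube.scad", "3mf")
```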
## Recommended Formats ### 3MF (3D Manufacturing Format) 3MF is a modern replacement for STL that addresses many of its limitations: - **Metadata Support**: Includes model information, materials, colors - **Compact Size**: More efficient encoding than STL - **Multiple Objects**: Can contain multiple parts in a single file - **Printer Compatibility**: Widely supported by Prusa and Bambu printers - **Implementation**: ZIP archive containing XML files ```openscad // Export to 3MF from command line // openscad -o model.3mf model.scad ``` ### AMF (Additive Manufacturing File Format) AMF is another modern format that supports: - **Material Information**: Material properties and colors - **Curved Surfaces**: Better representation than STL's triangles - **Metadata**: Design information and parameters - **Implementation**: XML-based format ```openscad // Export to AMF from command line // openscad -o model.amf model.scad ``` ### CSG (Constructive Solid Geometry) CSG is OpenSCAD's native format: - **Fully Parametric**: Preserves all construction operations - **Editable**: Can be reopened and modified in OpenSCAD - **Implementation**: OpenSCAD's internal representation ```openscad // Export to CSG from command line // openscad -o model.csg model.scad ``` ### SCAD (OpenSCAD Source Code) The original SCAD file preserves all parametric properties: - **Complete Parameterization**: All variables and relationships - **Code Structure**: Modules, functions, and comments - **Implementation**: Text file with OpenSCAD code ## Printer Compatibility ### Prusa Printers Prusa printers work well with: - **3MF**: Full support in PrusaSlicer - **AMF**: Good support for materials and colors - **STL**: Supported but with limitations ### Bambu Printers Bambu printers work best with: - **3MF**: Preferred format for Bambu Lab software - **AMF**: Well supported - **STL**: Basic support ## Implementation Notes When implementing export functionality: 1. Use OpenSCAD's command-line interface for reliable exports 2. Add metadata to 3MF and AMF files for better organization 3. Test exported files with actual printer software 4. Validate files before sending to printers ``` -------------------------------------------------------------------------------- /implementation_plan.md: -------------------------------------------------------------------------------- ```markdown # Implementation Plan: OpenSCAD-MCP-Server with AI-Driven 3D Modeling ## 1. Project Structure Updates ### 1.1 New Modules ``` src/ ├── ai/ │ ├── venice_api.py # Venice.ai API client │ └── sam_segmentation.py # SAM2 integration ├── models/ │ └── threestudio_generator.py # threestudio integration └── workflow/ └── image_to_model_pipeline.py # Workflow orchestration ``` ### 1.2 Dependencies Add to requirements.txt: ``` # Image Generation - Venice.ai API # (using existing requests and python-dotenv) # Object Segmentation - SAM2 torch>=2.0.0 torchvision>=0.15.0 opencv-python>=4.7.0 segment-anything>=1.0 # 3D Model Creation - threestudio ninja>=1.11.0 pytorch3d>=0.7.4 trimesh>=3.21.0 ``` ## 2. 
Component Implementation ### 2.1 Venice.ai API Integration - Create `VeniceImageGenerator` class in `venice_api.py` - Implement authentication with API key - Add image generation with Flux model - Support image downloading and storage ### 2.2 SAM2 Integration - Create `SAMSegmenter` class in `sam_segmentation.py` - Implement model loading with PyTorch - Add object segmentation from images - Support mask generation and visualization ### 2.3 threestudio Integration - Create `ThreeStudioGenerator` class in `threestudio_generator.py` - Implement 3D model generation from masked images - Support model export in formats compatible with OpenSCAD - Add preview image generation ### 2.4 OpenSCAD Integration - Extend `OpenSCADWrapper` with methods to: - Import 3D models from threestudio - Generate parametric modifications - Create multi-angle previews - Export in various formats ### 2.5 Workflow Orchestration - Create `ImageToModelPipeline` class to coordinate the workflow: 1. Generate image with Venice.ai API 2. Segment object with SAM2 3. Create 3D model with threestudio 4. Import into OpenSCAD for parametric editing ## 3. MCP Tool Integration Add new MCP tools to main.py: - `generate_image_from_text`: Generate images using Venice.ai - `segment_object_from_image`: Segment objects using SAM2 - `generate_3d_model_from_image`: Create 3D models using threestudio - `generate_model_from_text`: End-to-end pipeline from text to 3D model ## 4. Hardware Requirements - SAM2: NVIDIA GPU with 6GB+ VRAM - threestudio: NVIDIA GPU with 6GB+ VRAM - Consider implementing fallback options for environments with limited GPU resources ## 5. Implementation Phases ### Phase 1: Basic Integration - Implement Venice.ai API client - Set up SAM2 with basic segmentation - Create threestudio wrapper with minimal functionality - Extend OpenSCAD wrapper for model import ### Phase 2: Workflow Orchestration - Implement the full pipeline - Add MCP tools for each component - Create end-to-end workflow tool ### Phase 3: Optimization and Refinement - Optimize for performance - Add error handling and recovery - Implement corrective cycle for mesh modification - Add user interface improvements ``` -------------------------------------------------------------------------------- /old/test_sam2_segmentation.py: -------------------------------------------------------------------------------- ```python import os import sys import logging import argparse from pathlib import Path from typing import Dict, Any, List, Optional, Tuple # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Add project root to path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # Import SAM2 segmenter and config from src.ai.sam_segmentation import SAMSegmenter from src.config import SAM2_CHECKPOINT_PATH, SAM2_MODEL_TYPE, SAM2_USE_GPU, MASKS_DIR def test_sam2_segmentation(image_path: str, output_dir: Optional[str] = None, use_auto_points: bool = True): """ Test SAM2 segmentation on an image. 
Args: image_path: Path to the input image output_dir: Directory to save segmentation results (default: config.MASKS_DIR) use_auto_points: Whether to use automatic point generation """ # Validate image path if not os.path.exists(image_path): logger.error(f"Image not found: {image_path}") return # Use default output directory if not provided if not output_dir: output_dir = os.path.join(MASKS_DIR, Path(image_path).stem) # Create output directory os.makedirs(output_dir, exist_ok=True) logger.info(f"Testing SAM2 segmentation on image: {image_path}") logger.info(f"Model type: {SAM2_MODEL_TYPE}") logger.info(f"Checkpoint path: {SAM2_CHECKPOINT_PATH}") logger.info(f"Using GPU: {SAM2_USE_GPU}") try: # Initialize SAM2 segmenter logger.info("Initializing SAM2 segmenter...") sam_segmenter = SAMSegmenter( model_type=SAM2_MODEL_TYPE, checkpoint_path=SAM2_CHECKPOINT_PATH, use_gpu=SAM2_USE_GPU, output_dir=output_dir ) # Perform segmentation if use_auto_points: logger.info("Using automatic point generation") result = sam_segmenter.segment_with_auto_points(image_path) else: # Use center point of the image for manual point logger.info("Using manual center point") import cv2 image = cv2.imread(image_path) h, w = image.shape[:2] center_point = (w // 2, h // 2) result = sam_segmenter.segment_image(image_path, points=[center_point]) # Print results logger.info(f"Segmentation completed with {result.get('num_masks', 0)} masks") if result.get('mask_paths'): logger.info(f"Mask paths: {result.get('mask_paths')}") return result except Exception as e: logger.error(f"Error in SAM2 segmentation: {str(e)}") import traceback traceback.print_exc() return None if __name__ == "__main__": # Parse command line arguments parser = argparse.ArgumentParser(description="Test SAM2 segmentation") parser.add_argument("image_path", help="Path to the input image") parser.add_argument("--output-dir", help="Directory to save segmentation results") parser.add_argument("--manual-points", action="store_true", help="Use manual center point instead of auto points") args = parser.parse_args() # Run test test_sam2_segmentation( args.image_path, args.output_dir, not args.manual_points ) ``` -------------------------------------------------------------------------------- /src/utils/stl_repair.py: -------------------------------------------------------------------------------- ```python import os import logging from typing import Tuple, Optional logger = logging.getLogger(__name__) class STLRepair: """Provides methods to repair non-manifold STL files.""" @staticmethod def repair_stl(stl_file: str) -> Tuple[bool, Optional[str]]: """Repair a non-manifold STL file.""" if not os.path.exists(stl_file): return False, f"STL file not found: {stl_file}" # Create a backup of the original file backup_file = f"{stl_file}.bak" try: with open(stl_file, 'rb') as src, open(backup_file, 'wb') as dst: dst.write(src.read()) except Exception as e: logger.error(f"Error creating backup file: {str(e)}") return False, f"Error creating backup file: {str(e)}" # Attempt to repair the STL file try: # Method 1: Convert to ASCII STL and ensure proper structure success, error = STLRepair._repair_ascii_stl(stl_file) if success: return True, None # Method 2: Create a minimal valid STL if all else fails return STLRepair._create_minimal_valid_stl(stl_file) except Exception as e: logger.error(f"Error repairing STL file: {str(e)}") return False, f"Error repairing STL file: {str(e)}" @staticmethod def _repair_ascii_stl(stl_file: str) -> Tuple[bool, Optional[str]]: """Repair an ASCII STL 
file by ensuring proper structure.""" try: # Read the file with open(stl_file, 'r') as f: content = f.read() # Check if it's an ASCII STL if not content.strip().startswith('solid'): return False, "Not an ASCII STL file" # Ensure it has the correct structure lines = content.strip().split('\n') # Extract the solid name solid_name = lines[0].replace('solid', '').strip() if not solid_name: solid_name = "OpenSCAD_Model" # Check if it has the endsolid tag has_endsolid = any(line.strip().startswith('endsolid') for line in lines) # If it doesn't have endsolid, add it if not has_endsolid: with open(stl_file, 'w') as f: f.write(content.strip()) f.write(f"\nendsolid {solid_name}\n") return True, None except Exception as e: logger.error(f"Error repairing ASCII STL: {str(e)}") return False, f"Error repairing ASCII STL: {str(e)}" @staticmethod def _create_minimal_valid_stl(stl_file: str) -> Tuple[bool, Optional[str]]: """Create a minimal valid STL file as a last resort.""" try: # Create a minimal valid STL file with open(stl_file, 'w') as f: f.write("solid OpenSCAD_Model\n") f.write(" facet normal 0 0 0\n") f.write(" outer loop\n") f.write(" vertex 0 0 0\n") f.write(" vertex 1 0 0\n") f.write(" vertex 0 1 0\n") f.write(" endloop\n") f.write(" endfacet\n") f.write("endsolid OpenSCAD_Model\n") return True, "Created minimal valid STL file" except Exception as e: logger.error(f"Error creating minimal valid STL file: {str(e)}") return False, f"Error creating minimal valid STL file: {str(e)}" ``` -------------------------------------------------------------------------------- /src/testing/test_primitives.py: -------------------------------------------------------------------------------- ```python import os import argparse import logging import json from typing import Dict, Any, List from src.models.code_generator import OpenSCADCodeGenerator from src.utils.cad_exporter import CADExporter from src.utils.format_validator import FormatValidator from src.testing.primitive_tester import PrimitiveTester # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def main(): parser = argparse.ArgumentParser(description='Test OpenSCAD primitives with different export formats') parser.add_argument('--output-dir', default='test_output', help='Directory to store test output') parser.add_argument('--formats', nargs='+', default=['3mf', 'amf', 'csg', 'scad'], help='Formats to test (default: 3mf amf csg scad)') parser.add_argument('--primitives', nargs='+', help='Primitives to test (default: all)') parser.add_argument('--validate', action='store_true', help='Validate exported files') parser.add_argument('--printer-type', choices=['prusa', 'bambu'], default='prusa', help='Printer type to check compatibility with (default: prusa)') args = parser.parse_args() # Create directories os.makedirs("scad", exist_ok=True) os.makedirs(args.output_dir, exist_ok=True) # Initialize components # Use absolute path for templates to avoid path issues templates_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../models/scad_templates")) code_generator = OpenSCADCodeGenerator(templates_dir, "scad") cad_exporter = CADExporter() # Initialize tester tester = PrimitiveTester(code_generator, cad_exporter, args.output_dir) # Override formats if specified if args.formats: tester.formats = args.formats # Test primitives if args.primitives: results = {} for primitive in args.primitives: results[primitive] = 
tester.test_primitive(primitive) else: results = tester.test_all_primitives() # Print results logger.info(f"Test results: {json.dumps(results, indent=2)}") # Validate exported files if requested if args.validate: validator = FormatValidator() validation_results = {} for primitive, primitive_results in results.items(): validation_results[primitive] = {} for format_type, format_results in primitive_results["formats"].items(): if format_results["success"] and format_type in ['3mf', 'amf']: output_file = format_results["output_file"] if format_type == '3mf': is_valid, error = validator.validate_3mf(output_file) elif format_type == 'amf': is_valid, error = validator.validate_amf(output_file) else: is_valid, error = False, "Validation not supported for this format" # Check printer compatibility is_compatible, compat_error = validator.check_printer_compatibility( output_file, args.printer_type ) metadata = validator.extract_metadata(output_file) validation_results[primitive][format_type] = { "is_valid": is_valid, "error": error, "is_compatible_with_printer": is_compatible, "compatibility_error": compat_error, "metadata": metadata } logger.info(f"Validation results: {json.dumps(validation_results, indent=2)}") if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /old/download_sam2_checkpoint.py: -------------------------------------------------------------------------------- ```python import os import sys import logging import requests import argparse from pathlib import Path from tqdm import tqdm # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # SAM2 checkpoint URLs CHECKPOINT_URLS = { "vit_h": "https://dl.fbaipublicfiles.com/segment_anything_2/sam2_vit_h.pth", "vit_l": "https://dl.fbaipublicfiles.com/segment_anything_2/sam2_vit_l.pth", "vit_b": "https://dl.fbaipublicfiles.com/segment_anything_2/sam2_vit_b.pth" } # Checkpoint sizes (approximate, in MB) CHECKPOINT_SIZES = { "vit_h": 2560, # 2.5 GB "vit_l": 1250, # 1.2 GB "vit_b": 380 # 380 MB } def download_checkpoint(model_type="vit_b", output_dir="models"): """ Download SAM2 checkpoint. Args: model_type: Model type to download (vit_h, vit_l, vit_b) output_dir: Directory to save the checkpoint Returns: Path to the downloaded checkpoint """ if model_type not in CHECKPOINT_URLS: raise ValueError(f"Invalid model type: {model_type}. 
Available types: {list(CHECKPOINT_URLS.keys())}") url = CHECKPOINT_URLS[model_type] output_path = os.path.join(output_dir, f"sam2_{model_type}.pth") # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Check if checkpoint already exists if os.path.exists(output_path): logger.info(f"Checkpoint already exists at {output_path}") return output_path # Download checkpoint logger.info(f"Downloading SAM2 checkpoint ({model_type}) from {url}") logger.info(f"Approximate size: {CHECKPOINT_SIZES[model_type]} MB") try: # Stream download with progress bar response = requests.get(url, stream=True) response.raise_for_status() # Get total file size total_size = int(response.headers.get('content-length', 0)) # Create progress bar with open(output_path, 'wb') as f, tqdm( desc=f"Downloading {model_type}", total=total_size, unit='B', unit_scale=True, unit_divisor=1024, ) as pbar: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) pbar.update(len(chunk)) logger.info(f"Checkpoint downloaded to {output_path}") return output_path except requests.exceptions.RequestException as e: logger.error(f"Error downloading checkpoint: {str(e)}") # Remove partial download if it exists if os.path.exists(output_path): os.remove(output_path) raise except KeyboardInterrupt: logger.info("Download interrupted by user") # Remove partial download if it exists if os.path.exists(output_path): os.remove(output_path) sys.exit(1) def main(): """Main function to parse arguments and download checkpoint.""" parser = argparse.ArgumentParser(description="Download SAM2 checkpoint") parser.add_argument("--model_type", type=str, default="vit_b", choices=list(CHECKPOINT_URLS.keys()), help="Model type to download (vit_h, vit_l, vit_b). Default: vit_b (smallest)") parser.add_argument("--output_dir", type=str, default="models", help="Directory to save the checkpoint") args = parser.parse_args() # Print model information logger.info(f"Selected model: {args.model_type}") logger.info(f"Approximate sizes: vit_h: 2.5 GB, vit_l: 1.2 GB, vit_b: 380 MB") try: checkpoint_path = download_checkpoint(args.model_type, args.output_dir) logger.info(f"Checkpoint ready at: {checkpoint_path}") except Exception as e: logger.error(f"Failed to download checkpoint: {str(e)}") sys.exit(1) if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /rtfmd/decisions/ai-driven-code-generation.md: -------------------------------------------------------------------------------- ```markdown # AI-Driven Code Generation for OpenSCAD <metadata> author: devin-ai-integration timestamp: 2025-03-21T01:30:00Z version: 1.0.0 tags: [ai, code-generation, openscad, architecture-decision] </metadata> ## Decision Context The OpenSCAD MCP Server requires a mechanism to translate natural language descriptions into valid OpenSCAD code. This architectural decision record documents the approach chosen for implementing AI-driven code generation. ## Options Considered ### Option 1: Template-Based Approach A simple approach using predefined templates with parameter substitution. **Pros:** - Simple implementation - Predictable output - Low computational requirements **Cons:** - Limited flexibility - Cannot handle complex or novel descriptions - Requires manual creation of templates for each shape type ### Option 2: Full Machine Learning Approach Using embeddings and neural networks to generate OpenSCAD code directly. 
**Pros:** - Highly flexible - Can handle novel descriptions - Potential for more natural interaction **Cons:** - High computational requirements - Requires training data - Less predictable output - Harder to debug and maintain ### Option 3: Hybrid Pattern Matching with Contextual Rules Combining pattern matching for parameter extraction with rule-based code generation. **Pros:** - Good balance of flexibility and predictability - Moderate computational requirements - Easier to debug and maintain - Can be extended with more sophisticated ML in the future **Cons:** - More complex than pure template approach - Less flexible than full ML approach - Requires careful design of rules and patterns ## Decision **Chosen Option: Option 3 - Hybrid Pattern Matching with Contextual Rules** The hybrid approach was selected because it provides a good balance of flexibility, maintainability, and computational efficiency. It allows for handling a wide range of natural language descriptions while maintaining predictable output and being easier to debug than a full ML approach. ## Implementation Details The implementation consists of two main components: 1. **Parameter Extractor**: Uses regex patterns and contextual rules to extract parameters from natural language descriptions. 2. **Code Generator**: Translates extracted parameters into OpenSCAD code using a combination of templates and programmatic generation. The `AIService` class provides the bridge between these components, handling the overall flow from natural language to code. ```python class AIService: def __init__(self, templates_dir, model_config=None): self.templates_dir = templates_dir self.model_config = model_config or {} self.templates = self._load_templates() def generate_openscad_code(self, context): description = context.get("description", "") parameters = context.get("parameters", {}) # Parse the description to identify key components components = self._parse_description(description) # Generate code based on identified components code = self._generate_code_from_components(components, parameters) return code ``` ## Consequences ### Positive - More flexible code generation than a pure template approach - Better maintainability than a full ML approach - Lower computational requirements - Easier to debug and extend - Can handle a wide range of natural language descriptions ### Negative - More complex implementation than a pure template approach - Requires careful design of patterns and rules - May still struggle with very complex or ambiguous descriptions ### Neutral - Will require ongoing maintenance as new shape types and features are added - May need to be extended with more sophisticated ML techniques in the future ## Follow-up Actions - Implement unit tests for the AI service - Create a comprehensive set of test cases for different description types - Document the pattern matching rules and code generation logic - Consider adding a feedback mechanism to improve the system over time ``` -------------------------------------------------------------------------------- /src/workflow/image_approval.py: -------------------------------------------------------------------------------- ```python """ Image approval tool for MCP clients. """ import os import logging import shutil from typing import Dict, Any, List, Optional logger = logging.getLogger(__name__) class ImageApprovalTool: """ Tool for image approval/denial in MCP clients. """ def __init__(self, output_dir: str = "output/approved_images"): """ Initialize the image approval tool. 
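        Typical flow (illustrative paths; the MCP client drives the actual decision):
            tool = ImageApprovalTool()
            request = tool.present_image_for_approval("output/images/view_1.png")
            result = tool.process_approval(request["approval_id"], True,
                                           "output/images/view_1.png")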
Args: output_dir: Directory to store approved images """ self.output_dir = output_dir # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) def present_image_for_approval(self, image_path: str, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Present an image to the user for approval. Args: image_path: Path to the image metadata: Optional metadata about the image Returns: Dictionary with image path and approval request ID """ # For MCP server, we just prepare the response # The actual approval is handled by the client approval_id = os.path.basename(image_path).split('.')[0] return { "approval_id": approval_id, "image_path": image_path, "image_url": f"/images/{os.path.basename(image_path)}", "metadata": metadata or {} } def process_approval(self, approval_id: str, approved: bool, image_path: str) -> Dict[str, Any]: """ Process user's approval or denial of an image. Args: approval_id: ID of the approval request approved: Whether the image was approved image_path: Path to the image Returns: Dictionary with approval status and image path """ if approved: # Copy approved image to output directory approved_path = os.path.join(self.output_dir, os.path.basename(image_path)) os.makedirs(os.path.dirname(approved_path), exist_ok=True) shutil.copy2(image_path, approved_path) return { "approval_id": approval_id, "approved": True, "original_path": image_path, "approved_path": approved_path } else: return { "approval_id": approval_id, "approved": False, "original_path": image_path } def get_approved_images(self, filter_pattern: Optional[str] = None) -> List[str]: """ Get list of approved images. Args: filter_pattern: Optional pattern to filter image names Returns: List of paths to approved images """ import glob if filter_pattern: pattern = os.path.join(self.output_dir, filter_pattern) else: pattern = os.path.join(self.output_dir, "*") return glob.glob(pattern) def get_approval_status(self, approval_id: str) -> Dict[str, Any]: """ Get the approval status for a specific approval ID. Args: approval_id: ID of the approval request Returns: Dictionary with approval status """ # Check if any approved image matches the approval ID approved_images = self.get_approved_images() for image_path in approved_images: if approval_id in os.path.basename(image_path): return { "approval_id": approval_id, "approved": True, "approved_path": image_path } return { "approval_id": approval_id, "approved": False } def batch_process_approvals(self, approvals: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Process multiple approvals at once. Args: approvals: List of dictionaries with approval_id, approved, and image_path Returns: List of dictionaries with approval results """ results = [] for approval in approvals: result = self.process_approval( approval_id=approval["approval_id"], approved=approval["approved"], image_path=approval["image_path"] ) results.append(result) return results ``` -------------------------------------------------------------------------------- /src/utils/stl_validator.py: -------------------------------------------------------------------------------- ```python import os import logging import subprocess import tempfile from typing import Tuple, Optional logger = logging.getLogger(__name__) class STLValidator: """ Validates STL files to ensure they are manifold (watertight) and suitable for 3D printing. 
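    Example (illustrative path):
        is_valid, error = STLValidator.validate_stl("output/stl/model.stl")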
""" @staticmethod def validate_stl(stl_file: str) -> Tuple[bool, Optional[str]]: """ Validate an STL file to ensure it is manifold and suitable for 3D printing. Args: stl_file: Path to the STL file to validate Returns: Tuple of (is_valid, error_message) """ if not os.path.exists(stl_file): return False, f"STL file not found: {stl_file}" # Check file size file_size = os.path.getsize(stl_file) if file_size == 0: return False, "STL file is empty" # Basic validation - check if the file starts with "solid" for ASCII STL # or contains binary header for binary STL try: with open(stl_file, 'rb') as f: header = f.read(5) if header == b'solid': # ASCII STL is_valid, error = STLValidator._validate_ascii_stl(stl_file) else: # Binary STL is_valid, error = STLValidator._validate_binary_stl(stl_file) return is_valid, error except Exception as e: logger.error(f"Error validating STL file: {str(e)}") return False, f"Error validating STL file: {str(e)}" @staticmethod def _validate_ascii_stl(stl_file: str) -> Tuple[bool, Optional[str]]: """Validate an ASCII STL file.""" try: with open(stl_file, 'r') as f: content = f.read() # Check if the file has the correct structure if not content.strip().startswith('solid'): return False, "Invalid ASCII STL: Missing 'solid' header" if not content.strip().endswith('endsolid'): return False, "Invalid ASCII STL: Missing 'endsolid' footer" # Count facets and vertices facet_count = content.count('facet normal') vertex_count = content.count('vertex') if facet_count == 0: return False, "Invalid ASCII STL: No facets found" if vertex_count != facet_count * 3: return False, f"Invalid ASCII STL: Expected {facet_count * 3} vertices, found {vertex_count}" return True, None except Exception as e: logger.error(f"Error validating ASCII STL: {str(e)}") return False, f"Error validating ASCII STL: {str(e)}" @staticmethod def _validate_binary_stl(stl_file: str) -> Tuple[bool, Optional[str]]: """Validate a binary STL file.""" try: with open(stl_file, 'rb') as f: # Skip 80-byte header f.seek(80) # Read number of triangles (4-byte unsigned int) triangle_count_bytes = f.read(4) if len(triangle_count_bytes) != 4: return False, "Invalid binary STL: File too short" # Convert bytes to integer (little-endian) triangle_count = int.from_bytes(triangle_count_bytes, byteorder='little') # Check file size expected_size = 84 + (triangle_count * 50) # Header + count + triangles actual_size = os.path.getsize(stl_file) if actual_size != expected_size: return False, f"Invalid binary STL: Expected size {expected_size}, actual size {actual_size}" return True, None except Exception as e: logger.error(f"Error validating binary STL: {str(e)}") return False, f"Error validating binary STL: {str(e)}" @staticmethod def repair_stl(stl_file: str) -> Tuple[bool, Optional[str]]: """ Attempt to repair a non-manifold STL file. 
Args: stl_file: Path to the STL file to repair Returns: Tuple of (success, error_message) """ # This is a placeholder for STL repair functionality # In a real implementation, you would use a library like admesh or meshlab # to repair the STL file logger.warning(f"STL repair not implemented: {stl_file}") return False, "STL repair not implemented" ``` -------------------------------------------------------------------------------- /rtfmd/knowledge/nlp/parameter-extraction.md: -------------------------------------------------------------------------------- ```markdown # Parameter Extraction for 3D Modeling <metadata> author: devin-ai-integration timestamp: 2025-03-21T01:30:00Z version: 1.0.0 tags: [parameter-extraction, nlp, 3d-modeling, regex] </metadata> ## Overview Parameter extraction is the process of identifying and extracting structured data from natural language descriptions of 3D models. This is a critical component in translating user intentions into actionable modeling parameters. ## Extraction Techniques ### Regular Expression Patterns Regular expressions provide a powerful way to extract parameters: ```python # Extract dimensions with units dimension_pattern = r'(\d+(?:\.\d+)?)\s*(mm|cm|m|inch|in)' # Extract color information color_pattern = r'\b(red|green|blue|yellow|black|white|purple|orange|brown)\b' # Extract shape type shape_pattern = r'\b(cube|box|sphere|ball|cylinder|tube|cone|pyramid)\b' ``` ### Contextual Parameter Association After extracting raw values, they must be associated with the correct parameter: ```python def associate_dimension(value, description): """Associate a dimension value with the correct parameter based on context.""" if "width" in description or "wide" in description: return ("width", value) elif "height" in description or "tall" in description: return ("height", value) elif "depth" in description or "deep" in description: return ("depth", value) elif "radius" in description: return ("radius", value) elif "diameter" in description: return ("radius", value / 2) # Convert diameter to radius else: return ("unknown", value) ``` ### Default Parameters Provide sensible defaults for unspecified parameters: ```python default_parameters = { "cube": { "width": 10, "height": 10, "depth": 10, "center": True }, "sphere": { "radius": 10, "segments": 32 }, "cylinder": { "radius": 5, "height": 10, "center": True, "segments": 32 } } ``` ## Parameter Types Common parameter types to extract include: - **Dimensions**: Width, height, depth, radius, diameter - **Positions**: X, Y, Z coordinates - **Angles**: Rotation angles - **Counts**: Number of sides, segments, iterations - **Booleans**: Center, solid/hollow - **Colors**: RGB values or named colors - **Operations**: Union, difference, intersection - **Transformations**: Translate, rotate, scale, mirror ## Challenges and Solutions ### Ambiguity When parameters are ambiguous, use contextual clues or ask clarifying questions: ```python def resolve_ambiguity(value, possible_parameters, description): """Resolve ambiguity between possible parameters.""" # Try to resolve using context for param in possible_parameters: if param in description: return param # If still ambiguous, return a question to ask return f"Is {value} the {' or '.join(possible_parameters)}?" 
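
# Illustrative call: for the request "a plate 3 mm thick", neither candidate name
# appears in the text, so resolve_ambiguity(3, ["height", "depth"], "a plate 3 mm thick")
# returns the clarifying question "Is 3 the height or depth?".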
``` ### Unit Conversion Convert all measurements to a standard unit (millimeters): ```python def convert_to_mm(value, unit): """Convert a value from the given unit to millimeters.""" if unit in ["mm", "millimeter", "millimeters"]: return value elif unit in ["cm", "centimeter", "centimeters"]: return value * 10 elif unit in ["m", "meter", "meters"]: return value * 1000 elif unit in ["in", "inch", "inches"]: return value * 25.4 else: return value # Assume mm if unit is unknown ``` ### Dialog State Management Maintain state across multiple interactions: ```python class DialogState: def __init__(self): self.shape_type = None self.parameters = {} self.questions = [] self.confirmed = False def add_parameter(self, name, value): self.parameters[name] = value def add_question(self, question): self.questions.append(question) def is_complete(self): """Check if all required parameters are present.""" if not self.shape_type: return False required_params = self.get_required_parameters() return all(param in self.parameters for param in required_params) def get_required_parameters(self): """Get the required parameters for the current shape type.""" if self.shape_type == "cube": return ["width", "height", "depth"] elif self.shape_type == "sphere": return ["radius"] elif self.shape_type == "cylinder": return ["radius", "height"] else: return [] ``` ## Best Practices - Start with simple pattern matching and add complexity as needed - Provide sensible defaults for all parameters - Use contextual clues to resolve ambiguity - Maintain dialog state for multi-turn interactions - Convert all measurements to a standard unit - Validate extracted parameters for reasonableness - Handle errors gracefully with helpful messages ``` -------------------------------------------------------------------------------- /src/utils/stl_exporter.py: -------------------------------------------------------------------------------- ```python import os import logging import uuid import shutil from typing import Dict, Any, Optional, Tuple from src.utils.stl_validator import STLValidator logger = logging.getLogger(__name__) class STLExporter: """ Handles STL file export and validation for 3D printing. """ def __init__(self, openscad_wrapper, output_dir: str): """ Initialize the STL exporter. Args: openscad_wrapper: Instance of OpenSCADWrapper for generating STL files output_dir: Directory to store output files """ self.openscad_wrapper = openscad_wrapper self.output_dir = output_dir self.stl_dir = os.path.join(output_dir, "stl") # Create directory if it doesn't exist os.makedirs(self.stl_dir, exist_ok=True) def export_stl(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None) -> Tuple[str, bool, Optional[str]]: """ Export a SCAD file to STL format. 
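        Example (illustrative; parameter names depend on the SCAD file):
            stl_file, is_valid, error = exporter.export_stl(
                "scad/simple_cube.scad", {"size": 20}
            )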
Args: scad_file: Path to the SCAD file parameters: Optional parameters to override in the SCAD file Returns: Tuple of (stl_file_path, is_valid, error_message) """ try: # Generate STL file stl_file = self.openscad_wrapper.generate_stl(scad_file, parameters) # Validate STL file is_valid, error = STLValidator.validate_stl(stl_file) if not is_valid: logger.warning(f"STL validation failed: {error}") # Attempt to repair if validation fails repair_success, repair_error = STLValidator.repair_stl(stl_file) if repair_success: # Validate again after repair is_valid, error = STLValidator.validate_stl(stl_file) else: logger.error(f"STL repair failed: {repair_error}") return stl_file, is_valid, error except Exception as e: logger.error(f"Error exporting STL: {str(e)}") return "", False, str(e) def export_stl_with_metadata(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Export a SCAD file to STL format and include metadata. Args: scad_file: Path to the SCAD file parameters: Optional parameters to override in the SCAD file metadata: Optional metadata to include with the STL file Returns: Dictionary with STL file information """ # Export STL file stl_file, is_valid, error = self.export_stl(scad_file, parameters) # Create metadata file if metadata is provided metadata_file = None if metadata and stl_file: metadata_file = self._create_metadata_file(stl_file, metadata) # Extract model ID from filename model_id = os.path.basename(scad_file).split('.')[0] if scad_file else str(uuid.uuid4()) return { "model_id": model_id, "stl_file": stl_file, "is_valid": is_valid, "error": error, "metadata_file": metadata_file, "metadata": metadata } def _create_metadata_file(self, stl_file: str, metadata: Dict[str, Any]) -> str: """Create a metadata file for an STL file.""" metadata_file = f"{os.path.splitext(stl_file)[0]}.json" try: import json with open(metadata_file, 'w') as f: json.dump(metadata, f, indent=2) logger.info(f"Created metadata file: {metadata_file}") return metadata_file except Exception as e: logger.error(f"Error creating metadata file: {str(e)}") return "" def copy_stl_to_location(self, stl_file: str, destination: str) -> str: """ Copy an STL file to a specified location. 
Args: stl_file: Path to the STL file destination: Destination path or directory Returns: Path to the copied STL file """ try: if not os.path.exists(stl_file): raise FileNotFoundError(f"STL file not found: {stl_file}") # If destination is a directory, create a filename if os.path.isdir(destination): filename = os.path.basename(stl_file) destination = os.path.join(destination, filename) # Copy the file shutil.copy2(stl_file, destination) logger.info(f"Copied STL file to: {destination}") return destination except Exception as e: logger.error(f"Error copying STL file: {str(e)}") return "" ``` -------------------------------------------------------------------------------- /test_image_to_model_pipeline.py: -------------------------------------------------------------------------------- ```python import os import sys import logging import argparse from pathlib import Path from typing import Dict, Any, List, Optional, Tuple # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Add project root to path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # Import components from src.ai.venice_api import VeniceImageGenerator from src.ai.sam_segmentation import SAMSegmenter from src.models.threestudio_generator import ThreeStudioGenerator from src.openscad_wrapper.wrapper import OpenSCADWrapper from src.workflow.image_to_model_pipeline import ImageToModelPipeline from src.config import ( VENICE_API_KEY, IMAGES_DIR, MASKS_DIR, MODELS_DIR, SCAD_DIR, SAM2_CHECKPOINT_PATH, SAM2_MODEL_TYPE, SAM2_USE_GPU, THREESTUDIO_PATH ) def test_pipeline(prompt: str, output_dir: Optional[str] = None, venice_model: str = "fluently-xl", skip_steps: List[str] = None): """ Test the full image-to-model pipeline. 
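    Example invocation (illustrative prompt):
        python test_image_to_model_pipeline.py "a low-poly rabbit" --venice-model fluently-xl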
Args: prompt: Text prompt for image generation output_dir: Directory to save pipeline results venice_model: Venice.ai model to use for image generation skip_steps: List of steps to skip ('image', 'segment', 'model3d', 'openscad') """ # Use default output directory if not provided if not output_dir: output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output", "pipeline_test") # Create output directory os.makedirs(output_dir, exist_ok=True) # Initialize skip_steps if None skip_steps = skip_steps or [] logger.info(f"Testing image-to-model pipeline with prompt: {prompt}") logger.info(f"Output directory: {output_dir}") logger.info(f"Venice model: {venice_model}") logger.info(f"Skipping steps: {skip_steps}") try: # Initialize components logger.info("Initializing pipeline components...") # Venice.ai image generator venice_generator = VeniceImageGenerator( api_key=VENICE_API_KEY, output_dir=os.path.join(output_dir, "images") ) # SAM2 segmenter sam_segmenter = SAMSegmenter( model_type=SAM2_MODEL_TYPE, checkpoint_path=SAM2_CHECKPOINT_PATH, use_gpu=SAM2_USE_GPU, output_dir=os.path.join(output_dir, "masks") ) # ThreeStudio generator threestudio_generator = ThreeStudioGenerator( threestudio_path=THREESTUDIO_PATH, output_dir=os.path.join(output_dir, "models") ) # OpenSCAD wrapper openscad_wrapper = OpenSCADWrapper( output_dir=os.path.join(output_dir, "scad") ) # Initialize pipeline pipeline = ImageToModelPipeline( venice_generator=venice_generator, sam_segmenter=sam_segmenter, threestudio_generator=threestudio_generator, openscad_wrapper=openscad_wrapper, output_dir=output_dir ) # Run pipeline with custom steps if 'image' in skip_steps: # Skip image generation, use a test image logger.info("Skipping image generation, using test image") image_path = os.path.join(IMAGES_DIR, "test_image.png") if not os.path.exists(image_path): logger.error(f"Test image not found: {image_path}") return # TODO: Implement custom pipeline execution with skipped steps logger.info("Custom pipeline execution not implemented yet") return else: # Run full pipeline logger.info("Running full pipeline...") result = pipeline.generate_model_from_text( prompt=prompt, venice_params={"model": venice_model}, sam_params={}, threestudio_params={} ) # Print results logger.info("Pipeline completed successfully") logger.info(f"Pipeline ID: {result.get('pipeline_id')}") logger.info(f"Image path: {result.get('image', {}).get('local_path')}") logger.info(f"Mask count: {result.get('segmentation', {}).get('num_masks', 0)}") logger.info(f"3D model path: {result.get('model_3d', {}).get('exported_files', [])}") logger.info(f"OpenSCAD file: {result.get('openscad', {}).get('scad_file')}") return result except Exception as e: logger.error(f"Error in pipeline: {str(e)}") import traceback traceback.print_exc() return None if __name__ == "__main__": # Parse command line arguments parser = argparse.ArgumentParser(description="Test image-to-model pipeline") parser.add_argument("prompt", help="Text prompt for image generation") parser.add_argument("--output-dir", help="Directory to save pipeline results") parser.add_argument("--venice-model", default="fluently-xl", help="Venice.ai model to use") parser.add_argument("--skip", nargs="+", choices=["image", "segment", "model3d", "openscad"], help="Steps to skip in the pipeline") args = parser.parse_args() # Run test test_pipeline( args.prompt, args.output_dir, args.venice_model, args.skip ) ``` -------------------------------------------------------------------------------- /src/config.py: 
-------------------------------------------------------------------------------- ```python import os from typing import Dict, Any from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() # Base directories BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) OUTPUT_DIR = os.path.join(BASE_DIR, "output") # Output subdirectories IMAGES_DIR = os.path.join(OUTPUT_DIR, "images") MULTI_VIEW_DIR = os.path.join(OUTPUT_DIR, "multi_view") APPROVED_IMAGES_DIR = os.path.join(OUTPUT_DIR, "approved_images") MODELS_DIR = os.path.join(OUTPUT_DIR, "models") SCAD_DIR = os.path.join(BASE_DIR, "scad") # Google Gemini API configuration GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "") # Set via environment variable GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta" GEMINI_MODEL = "gemini-2.0-flash-exp-image-generation" # Default model for image generation # CUDA Multi-View Stereo configuration (local) CUDA_MVS_PATH = os.getenv("CUDA_MVS_PATH", os.path.join(BASE_DIR, "cuda-mvs")) CUDA_MVS_USE_GPU = os.getenv("CUDA_MVS_USE_GPU", "False").lower() == "true" # Default to CPU for macOS compatibility # Remote CUDA Multi-View Stereo configuration REMOTE_CUDA_MVS = { # General settings "ENABLED": os.getenv("REMOTE_CUDA_MVS_ENABLED", "True").lower() == "true", "USE_LAN_DISCOVERY": os.getenv("REMOTE_CUDA_MVS_USE_LAN_DISCOVERY", "True").lower() == "true", # Server connection "SERVER_URL": os.getenv("REMOTE_CUDA_MVS_SERVER_URL", ""), # Empty means use LAN discovery "API_KEY": os.getenv("REMOTE_CUDA_MVS_API_KEY", ""), "DISCOVERY_PORT": int(os.getenv("REMOTE_CUDA_MVS_DISCOVERY_PORT", "8765")), # Connection parameters "CONNECTION_TIMEOUT": int(os.getenv("REMOTE_CUDA_MVS_CONNECTION_TIMEOUT", "10")), "UPLOAD_CHUNK_SIZE": int(os.getenv("REMOTE_CUDA_MVS_UPLOAD_CHUNK_SIZE", "1048576")), # 1MB "DOWNLOAD_CHUNK_SIZE": int(os.getenv("REMOTE_CUDA_MVS_DOWNLOAD_CHUNK_SIZE", "1048576")), # 1MB # Retry and error handling "MAX_RETRIES": int(os.getenv("REMOTE_CUDA_MVS_MAX_RETRIES", "3")), "BASE_RETRY_DELAY": float(os.getenv("REMOTE_CUDA_MVS_BASE_RETRY_DELAY", "1.0")), "MAX_RETRY_DELAY": float(os.getenv("REMOTE_CUDA_MVS_MAX_RETRY_DELAY", "60.0")), "JITTER_FACTOR": float(os.getenv("REMOTE_CUDA_MVS_JITTER_FACTOR", "0.1")), # Health check "HEALTH_CHECK_INTERVAL": int(os.getenv("REMOTE_CUDA_MVS_HEALTH_CHECK_INTERVAL", "60")), "CIRCUIT_BREAKER_THRESHOLD": int(os.getenv("REMOTE_CUDA_MVS_CIRCUIT_BREAKER_THRESHOLD", "5")), "CIRCUIT_BREAKER_RECOVERY_TIMEOUT": float(os.getenv("REMOTE_CUDA_MVS_CIRCUIT_BREAKER_RECOVERY_TIMEOUT", "30.0")), # Processing parameters "DEFAULT_RECONSTRUCTION_QUALITY": os.getenv("REMOTE_CUDA_MVS_DEFAULT_QUALITY", "normal"), # low, normal, high "DEFAULT_OUTPUT_FORMAT": os.getenv("REMOTE_CUDA_MVS_DEFAULT_FORMAT", "obj"), "WAIT_FOR_COMPLETION": os.getenv("REMOTE_CUDA_MVS_WAIT_FOR_COMPLETION", "True").lower() == "true", "POLL_INTERVAL": int(os.getenv("REMOTE_CUDA_MVS_POLL_INTERVAL", "5")), # Output directories "OUTPUT_DIR": MODELS_DIR, "IMAGES_DIR": IMAGES_DIR, "MULTI_VIEW_DIR": MULTI_VIEW_DIR, "APPROVED_IMAGES_DIR": APPROVED_IMAGES_DIR, } # Venice.ai API configuration (optional) VENICE_API_KEY = os.getenv("VENICE_API_KEY", "") # Set via environment variable VENICE_BASE_URL = "https://api.venice.ai/api/v1" VENICE_MODEL = "fluently-xl" # Default model for fastest image generation (2.30s) # Image approval configuration IMAGE_APPROVAL = { "ENABLED": os.getenv("IMAGE_APPROVAL_ENABLED", "True").lower() == "true", "AUTO_APPROVE": os.getenv("IMAGE_APPROVAL_AUTO_APPROVE", 
"False").lower() == "true", "APPROVAL_TIMEOUT": int(os.getenv("IMAGE_APPROVAL_TIMEOUT", "300")), # 5 minutes "MIN_APPROVED_IMAGES": int(os.getenv("IMAGE_APPROVAL_MIN_IMAGES", "3")), "APPROVED_IMAGES_DIR": APPROVED_IMAGES_DIR, } # Multi-view to model pipeline configuration MULTI_VIEW_PIPELINE = { "DEFAULT_NUM_VIEWS": int(os.getenv("MULTI_VIEW_DEFAULT_NUM_VIEWS", "4")), "MIN_NUM_VIEWS": int(os.getenv("MULTI_VIEW_MIN_NUM_VIEWS", "3")), "MAX_NUM_VIEWS": int(os.getenv("MULTI_VIEW_MAX_NUM_VIEWS", "8")), "VIEW_ANGLES": [0, 90, 180, 270], # Default view angles (degrees) "OUTPUT_DIR": MULTI_VIEW_DIR, } # Natural language processing configuration for MCP NLP = { "ENABLE_INTERACTIVE_PARAMS": os.getenv("NLP_ENABLE_INTERACTIVE_PARAMS", "True").lower() == "true", "PARAM_EXTRACTION_PROMPT_TEMPLATE": """ Extract the following parameters from the user's request for 3D model generation: 1. Object description 2. Number of views requested (default: 4) 3. Reconstruction quality (low, normal, high) 4. Output format (obj, ply, stl, scad) 5. Any specific view angles mentioned If a parameter is not specified, return the default value or leave blank. Format the response as a JSON object. User request: {user_request} """, } # Deprecated configurations (moved to old folder) # These are kept for reference but not used in the new workflow DEPRECATED = { "SAM2_CHECKPOINT_PATH": os.getenv("SAM2_CHECKPOINT_PATH", os.path.join(BASE_DIR, "models", "sam2_vit_b.pth")), "SAM2_MODEL_TYPE": os.getenv("SAM2_MODEL_TYPE", "vit_b"), "SAM2_USE_GPU": os.getenv("SAM2_USE_GPU", "False").lower() == "true", "THREESTUDIO_PATH": os.path.join(BASE_DIR, "threestudio") } # Create necessary directories for directory in [OUTPUT_DIR, IMAGES_DIR, MULTI_VIEW_DIR, APPROVED_IMAGES_DIR, MODELS_DIR, SCAD_DIR]: os.makedirs(directory, exist_ok=True) ``` -------------------------------------------------------------------------------- /test_gemini_api.py: -------------------------------------------------------------------------------- ```python """ Test script for Google Gemini API integration. """ import os import sys import logging import unittest from unittest.mock import patch, MagicMock from pathlib import Path # Add the src directory to the path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.ai.gemini_api import GeminiImageGenerator # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class TestGeminiAPI(unittest.TestCase): """ Test cases for Google Gemini API integration. """ def setUp(self): """ Set up test environment. """ # Create a test output directory self.test_output_dir = "output/test_gemini" os.makedirs(self.test_output_dir, exist_ok=True) # Mock API key self.api_key = "test_api_key" # Create the generator with the mock API key self.gemini_generator = GeminiImageGenerator( api_key=self.api_key, output_dir=self.test_output_dir ) @patch('requests.post') def test_generate_image(self, mock_post): """ Test generating a single image with Gemini API. 
""" # Mock response mock_response = MagicMock() mock_response.status_code = 200 mock_response.json.return_value = { "candidates": [ { "content": { "parts": [ { "text": "Generated image description" }, { "inlineData": { "mimeType": "image/png", "data": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8BQDwAEhQGAhKmMIQAAAABJRU5ErkJggg==" } } ] } } ] } mock_post.return_value = mock_response # Test parameters prompt = "A low-poly rabbit with black background" model = "gemini-2.0-flash-exp-image-generation" # Call the method result = self.gemini_generator.generate_image(prompt, model) # Verify the result self.assertIsNotNone(result) self.assertEqual(result["prompt"], prompt) self.assertEqual(result["model"], model) self.assertTrue("local_path" in result) self.assertTrue(os.path.exists(result["local_path"])) # Verify the API call mock_post.assert_called_once() args, kwargs = mock_post.call_args self.assertTrue("generativelanguage.googleapis.com" in args[0]) self.assertEqual(kwargs["headers"]["x-goog-api-key"], self.api_key) self.assertTrue("prompt" in str(kwargs["json"])) @patch('requests.post') def test_generate_multiple_views(self, mock_post): """ Test generating multiple views of an object with Gemini API. """ # Mock response mock_response = MagicMock() mock_response.status_code = 200 mock_response.json.return_value = { "candidates": [ { "content": { "parts": [ { "text": "Generated image description" }, { "inlineData": { "mimeType": "image/png", "data": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8BQDwAEhQGAhKmMIQAAAABJRU5ErkJggg==" } } ] } } ] } mock_post.return_value = mock_response # Test parameters prompt = "A low-poly rabbit" num_views = 3 # Call the method results = self.gemini_generator.generate_multiple_views(prompt, num_views) # Verify the results self.assertEqual(len(results), num_views) for i, result in enumerate(results): self.assertTrue("view_direction" in result) self.assertEqual(result["view_index"], i + 1) self.assertTrue("local_path" in result) self.assertTrue(os.path.exists(result["local_path"])) # Verify the API calls self.assertEqual(mock_post.call_count, num_views) @patch('requests.post') def test_error_handling(self, mock_post): """ Test error handling in the Gemini API client. """ # Mock error response mock_response = MagicMock() mock_response.status_code = 400 mock_response.raise_for_status.side_effect = Exception("API Error") mock_post.return_value = mock_response # Test parameters prompt = "A low-poly rabbit" # Call the method and expect an exception with self.assertRaises(Exception): self.gemini_generator.generate_image(prompt) def tearDown(self): """ Clean up after tests. """ # Clean up test output directory import shutil if os.path.exists(self.test_output_dir): shutil.rmtree(self.test_output_dir) if __name__ == "__main__": unittest.main() ``` -------------------------------------------------------------------------------- /src/ai/gemini_api.py: -------------------------------------------------------------------------------- ```python """ Google Gemini API integration for image generation. """ import os import logging import base64 from typing import Dict, Any, List, Optional from io import BytesIO from PIL import Image import requests logger = logging.getLogger(__name__) class GeminiImageGenerator: """ Wrapper for Google Gemini API for generating images. """ def __init__(self, api_key: str, output_dir: str = "output/images"): """ Initialize the Gemini image generator. 
Args: api_key: Google Gemini API key output_dir: Directory to store generated images """ self.api_key = api_key self.output_dir = output_dir self.base_url = "https://generativelanguage.googleapis.com/v1beta" # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) def generate_image(self, prompt: str, model: str = "gemini-2.0-flash-exp-image-generation", output_path: Optional[str] = None, **kwargs) -> Dict[str, Any]: """ Generate an image using Google Gemini API. Args: prompt: Text description for image generation model: Gemini model to use output_path: Path to save the generated image **kwargs: Additional parameters for Gemini API Returns: Dictionary containing image data and metadata """ logger.info(f"Generating image with prompt: {prompt}") try: # Prepare the request payload payload = { "contents": [ { "parts": [ {"text": prompt} ] } ], "generationConfig": { "responseModalities": ["Text", "Image"] } } # Add any additional parameters for key, value in kwargs.items(): if key not in payload: payload[key] = value # Make API request response = requests.post( f"{self.base_url}/models/{model}:generateContent", headers={ "Content-Type": "application/json", "x-goog-api-key": self.api_key }, json=payload ) # Check for errors response.raise_for_status() result = response.json() # Extract image data image_data = None for part in result["candidates"][0]["content"]["parts"]: if "inlineData" in part: image_data = base64.b64decode(part["inlineData"]["data"]) break if not image_data: raise ValueError("No image was generated in the response") # Save image if output_path is provided if not output_path: # Generate output path if not provided os.makedirs(self.output_dir, exist_ok=True) output_path = os.path.join(self.output_dir, f"{prompt[:20].replace(' ', '_')}.png") # Save image image = Image.open(BytesIO(image_data)) image.save(output_path) logger.info(f"Image saved to {output_path}") return { "prompt": prompt, "model": model, "local_path": output_path, "image_data": image_data } except Exception as e: logger.error(f"Error generating image: {str(e)}") raise def generate_multiple_views(self, prompt: str, num_views: int = 4, base_image_path: Optional[str] = None, output_dir: Optional[str] = None) -> List[Dict[str, Any]]: """ Generate multiple views of the same 3D object. 
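        Example (illustrative prompt):
            views = generator.generate_multiple_views("A low-poly rabbit", num_views=4)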
Args: prompt: Text description of the object num_views: Number of views to generate base_image_path: Optional path to a base image output_dir: Directory to save the generated images Returns: List of dictionaries containing image data and metadata """ if not output_dir: output_dir = os.path.join(self.output_dir, prompt[:20].replace(' ', '_')) os.makedirs(output_dir, exist_ok=True) # View directions to include in prompts view_directions = [ "front view", "side view from the right", "side view from the left", "back view", "top view", "bottom view", "45-degree angle view" ] results = [] # Generate images for each view direction for i in range(min(num_views, len(view_directions))): view_prompt = f"{prompt} - {view_directions[i]}, same object, consistent style and details" # Generate the image output_path = os.path.join(output_dir, f"view_{i+1}.png") result = self.generate_image(view_prompt, output_path=output_path) # Add view direction to result result["view_direction"] = view_directions[i] result["view_index"] = i + 1 results.append(result) return results ``` -------------------------------------------------------------------------------- /test_image_approval.py: -------------------------------------------------------------------------------- ```python """ Test script for image approval tool. """ import os import sys import logging import unittest from unittest.mock import patch, MagicMock from pathlib import Path # Add the src directory to the path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.workflow.image_approval import ImageApprovalTool # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class TestImageApproval(unittest.TestCase): """ Test cases for image approval tool. """ def setUp(self): """ Set up test environment. """ # Create a test output directory self.test_output_dir = "output/test_approval" os.makedirs(self.test_output_dir, exist_ok=True) # Create test images directory self.test_images_dir = "output/test_approval/images" os.makedirs(self.test_images_dir, exist_ok=True) # Create test images self.test_images = [] for i in range(3): image_path = os.path.join(self.test_images_dir, f"view_{i}.png") with open(image_path, "w") as f: f.write(f"Mock image {i}") self.test_images.append(image_path) # Create the approval tool self.approval_tool = ImageApprovalTool( output_dir=os.path.join(self.test_output_dir, "approved") ) def test_present_image_for_approval(self): """ Test presenting an image for approval. """ # Test parameters image_path = self.test_images[0] metadata = { "prompt": "A test image", "view_direction": "front view", "view_index": 1 } # Call the method result = self.approval_tool.present_image_for_approval(image_path, metadata) # Verify the result self.assertIsNotNone(result) self.assertTrue("approval_id" in result) self.assertEqual(result["image_path"], image_path) self.assertTrue("image_url" in result) self.assertEqual(result["metadata"], metadata) def test_process_approval_approved(self): """ Test processing an approved image. 
""" # Test parameters image_path = self.test_images[0] approval_id = "test_approval_1" # Call the method result = self.approval_tool.process_approval(approval_id, True, image_path) # Verify the result self.assertIsNotNone(result) self.assertEqual(result["approval_id"], approval_id) self.assertTrue(result["approved"]) self.assertEqual(result["original_path"], image_path) self.assertTrue("approved_path" in result) # Verify the file was copied self.assertTrue(os.path.exists(result["approved_path"])) def test_process_approval_denied(self): """ Test processing a denied image. """ # Test parameters image_path = self.test_images[1] approval_id = "test_approval_2" # Call the method result = self.approval_tool.process_approval(approval_id, False, image_path) # Verify the result self.assertIsNotNone(result) self.assertEqual(result["approval_id"], approval_id) self.assertFalse(result["approved"]) self.assertEqual(result["original_path"], image_path) self.assertFalse("approved_path" in result) def test_get_approved_images(self): """ Test getting approved images. """ # Approve some images for i, image_path in enumerate(self.test_images): self.approval_tool.process_approval(f"test_approval_{i}", True, image_path) # Call the method approved_images = self.approval_tool.get_approved_images() # Verify the result self.assertEqual(len(approved_images), len(self.test_images)) def test_get_approval_status(self): """ Test getting approval status. """ # Approve an image approval_id = "test_approval_status" self.approval_tool.process_approval(approval_id, True, self.test_images[0]) # Call the method status = self.approval_tool.get_approval_status(approval_id) # Verify the result self.assertIsNotNone(status) self.assertEqual(status["approval_id"], approval_id) self.assertTrue(status["approved"]) self.assertTrue("approved_path" in status) # Test with non-existent approval ID status = self.approval_tool.get_approval_status("non_existent") self.assertIsNotNone(status) self.assertEqual(status["approval_id"], "non_existent") self.assertFalse(status["approved"]) def test_batch_process_approvals(self): """ Test batch processing of approvals. """ # Test parameters approvals = [ { "approval_id": "batch_1", "approved": True, "image_path": self.test_images[0] }, { "approval_id": "batch_2", "approved": False, "image_path": self.test_images[1] }, { "approval_id": "batch_3", "approved": True, "image_path": self.test_images[2] } ] # Call the method results = self.approval_tool.batch_process_approvals(approvals) # Verify the results self.assertEqual(len(results), len(approvals)) # Check approved images approved_images = self.approval_tool.get_approved_images() self.assertEqual(len(approved_images), 2) # Two images were approved def tearDown(self): """ Clean up after tests. """ # Clean up test output directory import shutil if os.path.exists(self.test_output_dir): shutil.rmtree(self.test_output_dir) if __name__ == "__main__": unittest.main() ``` -------------------------------------------------------------------------------- /test_cuda_mvs.py: -------------------------------------------------------------------------------- ```python """ Test script for CUDA Multi-View Stereo integration. 
""" import os import sys import logging import unittest from unittest.mock import patch, MagicMock from pathlib import Path # Add the src directory to the path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.models.cuda_mvs import CUDAMultiViewStereo # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class TestCUDAMVS(unittest.TestCase): """ Test cases for CUDA Multi-View Stereo integration. """ def setUp(self): """ Set up test environment. """ # Create a test output directory self.test_output_dir = "output/test_cuda_mvs" os.makedirs(self.test_output_dir, exist_ok=True) # Create test image directory self.test_images_dir = "output/test_cuda_mvs/images" os.makedirs(self.test_images_dir, exist_ok=True) # Create mock CUDA MVS path self.cuda_mvs_path = "mock_cuda_mvs" os.makedirs(os.path.join(self.cuda_mvs_path, "build"), exist_ok=True) # Create mock executable with open(os.path.join(self.cuda_mvs_path, "build", "app_patch_match_mvs"), "w") as f: f.write("#!/bin/bash\necho 'Mock CUDA MVS'\n") os.chmod(os.path.join(self.cuda_mvs_path, "build", "app_patch_match_mvs"), 0o755) # Create test images for i in range(3): with open(os.path.join(self.test_images_dir, f"view_{i}.png"), "w") as f: f.write(f"Mock image {i}") # Create the CUDA MVS wrapper with the mock path with patch('os.path.exists', return_value=True): self.cuda_mvs = CUDAMultiViewStereo( cuda_mvs_path=self.cuda_mvs_path, output_dir=self.test_output_dir ) @patch('subprocess.Popen') def test_generate_model_from_images(self, mock_popen): """ Test generating a 3D model from multiple images. """ # Mock subprocess mock_process = MagicMock() mock_process.returncode = 0 mock_process.communicate.return_value = ("Mock stdout", "") mock_popen.return_value = mock_process # Mock file creation def mock_exists(path): if "point_cloud_file" in str(path): # Create the mock point cloud file os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w") as f: f.write("Mock point cloud") return True return os.path.exists(path) # Test parameters image_paths = [os.path.join(self.test_images_dir, f"view_{i}.png") for i in range(3)] output_name = "test_model" # Call the method with patched os.path.exists with patch('os.path.exists', side_effect=mock_exists): result = self.cuda_mvs.generate_model_from_images(image_paths, output_name=output_name) # Verify the result self.assertIsNotNone(result) self.assertEqual(result["model_id"], output_name) self.assertTrue("point_cloud_file" in result) self.assertTrue("camera_params_file" in result) self.assertEqual(len(result["input_images"]), 3) # Verify the subprocess call mock_popen.assert_called_once() args, kwargs = mock_popen.call_args self.assertTrue("app_patch_match_mvs" in args[0][0]) def test_generate_camera_params(self): """ Test generating camera parameters from images. 
""" # Test parameters image_paths = [os.path.join(self.test_images_dir, f"view_{i}.png") for i in range(3)] model_dir = os.path.join(self.test_output_dir, "camera_params_test") os.makedirs(model_dir, exist_ok=True) # Mock PIL.Image.open mock_image = MagicMock() mock_image.size = (800, 600) # Call the method with patched PIL.Image.open with patch('PIL.Image.open', return_value=mock_image): params_file = self.cuda_mvs._generate_camera_params(image_paths, model_dir) # Verify the result self.assertTrue(os.path.exists(params_file)) # Read the params file import json with open(params_file, "r") as f: params = json.load(f) # Verify the params self.assertEqual(len(params), 3) for i, param in enumerate(params): self.assertEqual(param["image_id"], i) self.assertEqual(param["width"], 800) self.assertEqual(param["height"], 600) self.assertTrue("camera" in param) self.assertEqual(param["camera"]["model"], "PINHOLE") def test_convert_ply_to_obj(self): """ Test converting PLY point cloud to OBJ mesh. """ # Create a mock PLY file ply_file = os.path.join(self.test_output_dir, "test.ply") with open(ply_file, "w") as f: f.write("Mock PLY file") # Call the method obj_file = self.cuda_mvs.convert_ply_to_obj(ply_file) # Verify the result self.assertTrue(os.path.exists(obj_file)) self.assertTrue(obj_file.endswith(".obj")) # Read the OBJ file with open(obj_file, "r") as f: content = f.read() # Verify the content self.assertTrue("# Converted from test.ply" in content) self.assertTrue("v " in content) self.assertTrue("f " in content) def test_error_handling(self): """ Test error handling in the CUDA MVS wrapper. """ # Test parameters image_paths = [os.path.join(self.test_images_dir, f"view_{i}.png") for i in range(3)] output_name = "error_test" # Mock subprocess with error mock_process = MagicMock() mock_process.returncode = 1 mock_process.communicate.return_value = ("", "Mock error") # Call the method with patched subprocess.Popen with patch('subprocess.Popen', return_value=mock_process): with self.assertRaises(RuntimeError): self.cuda_mvs.generate_model_from_images(image_paths, output_name=output_name) def tearDown(self): """ Clean up after tests. """ # Clean up test output directory import shutil if os.path.exists(self.test_output_dir): shutil.rmtree(self.test_output_dir) # Clean up mock CUDA MVS path if os.path.exists(self.cuda_mvs_path): shutil.rmtree(self.cuda_mvs_path) if __name__ == "__main__": unittest.main() ``` -------------------------------------------------------------------------------- /src/visualization/renderer.py: -------------------------------------------------------------------------------- ```python import os import subprocess import logging from typing import Dict, Any, Optional from PIL import Image, ImageDraw, ImageFont logger = logging.getLogger(__name__) class Renderer: """ Handles rendering of OpenSCAD models to preview images. Implements multi-angle views and fallback rendering when headless mode fails. """ def __init__(self, openscad_wrapper): """ Initialize the renderer. Args: openscad_wrapper: Instance of OpenSCADWrapper for generating previews """ self.openscad_wrapper = openscad_wrapper # Standard camera angles for multi-view rendering self.camera_angles = { 'front': "0,0,0,0,0,0,50", 'top': "0,0,0,90,0,0,50", 'right': "0,0,0,0,90,0,50", 'perspective': "20,20,20,55,0,25,100" } def generate_preview(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None) -> str: """ Generate a preview image for a SCAD file. 
Args: scad_file: Path to the SCAD file parameters: Optional parameters to override in the SCAD file Returns: Path to the generated preview image """ try: # Try to generate a preview using OpenSCAD preview_file = self.openscad_wrapper.generate_preview( scad_file, parameters, camera_position=self.camera_angles['perspective'], image_size="800,600" ) # Check if the file exists and has content if os.path.exists(preview_file) and os.path.getsize(preview_file) > 0: return preview_file else: # If the file doesn't exist or is empty, create a placeholder return self._create_placeholder_image(preview_file) except Exception as e: logger.error(f"Error generating preview: {str(e)}") # Create a placeholder image model_id = os.path.basename(scad_file).split('.')[0] preview_file = os.path.join(self.openscad_wrapper.preview_dir, f"{model_id}.png") return self._create_placeholder_image(preview_file) def generate_multi_angle_previews(self, scad_file: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, str]: """ Generate preview images from multiple angles. Args: scad_file: Path to the SCAD file parameters: Optional parameters to override in the SCAD file Returns: Dictionary mapping angle names to preview image paths """ previews = {} model_id = os.path.basename(scad_file).split('.')[0] for angle_name, camera_position in self.camera_angles.items(): preview_file = os.path.join( self.openscad_wrapper.preview_dir, f"{model_id}_{angle_name}.png" ) try: # Try to generate a preview using OpenSCAD preview_file = self.openscad_wrapper.generate_preview( scad_file, parameters, camera_position=camera_position, image_size="800,600" ) # Check if the file exists and has content if os.path.exists(preview_file) and os.path.getsize(preview_file) > 0: previews[angle_name] = preview_file else: # If the file doesn't exist or is empty, create a placeholder previews[angle_name] = self._create_placeholder_image(preview_file, angle_name) except Exception as e: logger.error(f"Error generating {angle_name} preview: {str(e)}") # Create a placeholder image previews[angle_name] = self._create_placeholder_image(preview_file, angle_name) return previews def _create_placeholder_image(self, output_path: str, angle_name: str = "perspective") -> str: """ Create a placeholder image when OpenSCAD rendering fails. Args: output_path: Path to save the placeholder image angle_name: Name of the camera angle for the placeholder Returns: Path to the created placeholder image """ try: # Create a blank image img = Image.new('RGB', (800, 600), color=(240, 240, 240)) draw = ImageDraw.Draw(img) # Add text draw.text((400, 280), f"Preview not available", fill=(0, 0, 0)) draw.text((400, 320), f"View: {angle_name}", fill=(0, 0, 0)) # Save the image img.save(output_path) logger.info(f"Created placeholder image: {output_path}") return output_path except Exception as e: logger.error(f"Error creating placeholder image: {str(e)}") # If all else fails, return the path anyway return output_path def create_composite_preview(self, previews: Dict[str, str], output_path: str) -> str: """ Create a composite image from multiple angle previews. 
Args: previews: Dictionary mapping angle names to preview image paths output_path: Path to save the composite image Returns: Path to the created composite image """ try: # Create a blank image img = Image.new('RGB', (1600, 1200), color=(240, 240, 240)) # Load and paste each preview positions = { 'perspective': (0, 0), 'front': (800, 0), 'top': (0, 600), 'right': (800, 600) } for angle_name, preview_path in previews.items(): if angle_name in positions and os.path.exists(preview_path): try: angle_img = Image.open(preview_path) # Resize if needed angle_img = angle_img.resize((800, 600)) # Paste into composite img.paste(angle_img, positions[angle_name]) except Exception as e: logger.error(f"Error processing {angle_name} preview: {str(e)}") # Save the composite image img.save(output_path) logger.info(f"Created composite preview: {output_path}") return output_path except Exception as e: logger.error(f"Error creating composite preview: {str(e)}") # If all else fails, return the path anyway return output_path ``` -------------------------------------------------------------------------------- /src/testing/primitive_tester.py: -------------------------------------------------------------------------------- ```python import os import logging from typing import Dict, Any, List, Optional, Tuple from src.models.code_generator import OpenSCADCodeGenerator from src.utils.cad_exporter import CADExporter logger = logging.getLogger(__name__) class PrimitiveTester: """Tests OpenSCAD primitives with different export formats.""" def __init__(self, code_generator: OpenSCADCodeGenerator, cad_exporter: CADExporter, output_dir: str = "test_output"): """ Initialize the primitive tester. Args: code_generator: CodeGenerator instance for generating OpenSCAD code cad_exporter: CADExporter instance for exporting models output_dir: Directory to store test output """ self.code_generator = code_generator self.cad_exporter = cad_exporter self.output_dir = output_dir # Create output directory os.makedirs(output_dir, exist_ok=True) # Primitive types to test self.primitives = [ "cube", "sphere", "cylinder", "cone", "torus", "rounded_box", "hexagonal_prism", "text" ] # Export formats to test (no STL per requirements) self.formats = ["3mf", "amf", "csg", "scad"] def test_all_primitives(self) -> Dict[str, Dict[str, Any]]: """ Test all primitives with all formats. Returns: Dictionary of test results for each primitive """ results = {} for primitive in self.primitives: results[primitive] = self.test_primitive(primitive) return results def test_primitive(self, primitive_type: str) -> Dict[str, Any]: """ Test a single primitive with all formats. Args: primitive_type: Type of primitive to test Returns: Dictionary of test results for the primitive """ results = { "primitive": primitive_type, "formats": {} } # Generate default parameters for the primitive params = self._get_default_parameters(primitive_type) # Generate the SCAD code scad_file = self.code_generator.generate_code(primitive_type, params) # Test export to each format for format_type in self.formats: success, output_file, error = self.cad_exporter.export_model( scad_file, format_type, params, metadata={"primitive_type": primitive_type} ) results["formats"][format_type] = { "success": success, "output_file": output_file, "error": error } return results def _get_default_parameters(self, primitive_type: str) -> Dict[str, Any]: """ Get default parameters for a primitive type. 
Args: primitive_type: Type of primitive Returns: Dictionary of default parameters """ params = {} if primitive_type == "cube": params = {"width": 20, "depth": 20, "height": 20, "center": True} elif primitive_type == "sphere": params = {"radius": 10, "segments": 32} elif primitive_type == "cylinder": params = {"radius": 10, "height": 20, "center": True, "segments": 32} elif primitive_type == "cone": params = {"bottom_radius": 10, "top_radius": 0, "height": 20, "center": True} elif primitive_type == "torus": params = {"outer_radius": 20, "inner_radius": 5, "segments": 32} elif primitive_type == "rounded_box": params = {"width": 30, "depth": 20, "height": 15, "radius": 3} elif primitive_type == "hexagonal_prism": params = {"radius": 10, "height": 20} elif primitive_type == "text": params = {"text": "OpenSCAD", "size": 10, "height": 3} return params def test_with_parameter_variations(self, primitive_type: str) -> Dict[str, Any]: """ Test a primitive with variations of parameters. Args: primitive_type: Type of primitive to test Returns: Dictionary of test results for different parameter variations """ results = { "primitive": primitive_type, "variations": {} } # Define parameter variations for the primitive variations = self._get_parameter_variations(primitive_type) # Test each variation for variation_name, params in variations.items(): # Generate the SCAD code scad_file = self.code_generator.generate_code(primitive_type, params) # Test export to each format format_results = {} for format_type in self.formats: success, output_file, error = self.cad_exporter.export_model( scad_file, format_type, params, metadata={"primitive_type": primitive_type, "variation": variation_name} ) format_results[format_type] = { "success": success, "output_file": output_file, "error": error } results["variations"][variation_name] = { "parameters": params, "formats": format_results } return results def _get_parameter_variations(self, primitive_type: str) -> Dict[str, Dict[str, Any]]: """ Get parameter variations for a primitive type. Args: primitive_type: Type of primitive Returns: Dictionary of parameter variations """ variations = {} if primitive_type == "cube": variations = { "small": {"width": 5, "depth": 5, "height": 5, "center": True}, "large": {"width": 50, "depth": 50, "height": 50, "center": True}, "flat": {"width": 50, "depth": 50, "height": 2, "center": True}, "tall": {"width": 10, "depth": 10, "height": 100, "center": True} } elif primitive_type == "sphere": variations = { "small": {"radius": 2, "segments": 16}, "large": {"radius": 30, "segments": 64}, "low_res": {"radius": 10, "segments": 8}, "high_res": {"radius": 10, "segments": 128} } elif primitive_type == "cylinder": variations = { "small": {"radius": 2, "height": 5, "center": True, "segments": 16}, "large": {"radius": 30, "height": 50, "center": True, "segments": 64}, "thin": {"radius": 1, "height": 50, "center": True, "segments": 32}, "disc": {"radius": 30, "height": 2, "center": True, "segments": 32} } # Add variations for other primitives as needed return variations ``` -------------------------------------------------------------------------------- /src/ai/venice_api.py: -------------------------------------------------------------------------------- ```python """ Venice.ai API client for image generation using the Flux model. 
""" import os import requests import logging from typing import Dict, Any, Optional, List, Tuple from pathlib import Path logger = logging.getLogger(__name__) # Venice.ai model mapping and descriptions VENICE_MODELS = { # Model name: (aliases, description) "fluently-xl": ( ["fast", "quick", "fastest", "speed", "rapid", "efficient"], "Fastest model (2.30s) with good quality" ), "flux-dev": ( ["high quality", "detailed", "hq", "best quality", "premium"], "High-quality model with detailed results" ), "flux-dev-uncensored": ( ["uncensored", "unfiltered", "unrestricted"], "Uncensored version of the flux-dev model" ), "stable-diffusion-3.5": ( ["stable diffusion", "sd3", "sd3.5", "standard"], "Stable Diffusion 3.5 model" ), "pony-realism": ( ["realistic", "realism", "pony", "photorealistic"], "Specialized model for realistic outputs" ), "lustify-sdxl": ( ["stylized", "artistic", "creative", "lustify"], "Artistic stylization model" ), } class VeniceImageGenerator: """Client for Venice.ai's image generation API.""" def __init__(self, api_key: str, output_dir: str = "output/images"): """ Initialize the Venice.ai API client. Args: api_key: API key for Venice.ai output_dir: Directory to store generated images """ self.api_key = api_key if not self.api_key: logger.warning("No Venice.ai API key provided") # API endpoint from documentation self.base_url = "https://api.venice.ai/api/v1" self.api_endpoint = f"{self.base_url}/image/generate" self.output_dir = output_dir # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) def map_model_preference(self, preference: str) -> str: """ Map a natural language preference to a Venice.ai model name. Args: preference: Natural language description of desired model Returns: Name of the matching Venice.ai model """ if not preference or preference.lower() in ["default", "fluently-xl", "fluently xl"]: return "fluently-xl" preference = preference.lower() # Check for exact matches first for model_name in VENICE_MODELS: if model_name.lower() == preference: return model_name # Check for keyword matches for model_name, (aliases, _) in VENICE_MODELS.items(): for alias in aliases: if alias in preference: return model_name # Default to fluently-xl if no match found return "fluently-xl" def generate_image(self, prompt: str, model: str = "fluently-xl", width: int = 1024, height: int = 1024, output_path: Optional[str] = None) -> Dict[str, Any]: """ Generate an image using Venice.ai's API. Args: prompt: Text description for image generation model: Model to use - can be a specific model name or natural language description: - "fluently-xl" (default): Fastest model (2.30s) with good quality - "flux-dev": High-quality model with detailed results - "flux-dev-uncensored": Uncensored version of the flux-dev model - "stable-diffusion-3.5": Stable Diffusion 3.5 model - "pony-realism": Specialized model for realistic outputs - "lustify-sdxl": Artistic stylization model - Or use natural language like "high quality", "fastest", "realistic", etc. 
width: Image width height: Image height output_path: Optional path to save the generated image Returns: Dictionary containing image data and metadata """ if not self.api_key: raise ValueError("Venice.ai API key is required") # Map the model preference to a specific model name mapped_model = self.map_model_preference(model) # Prepare request payload payload = { "model": mapped_model, "prompt": prompt, "height": height, "width": width, "steps": 20, "return_binary": False, "hide_watermark": True, # Remove watermark as requested "format": "png", "embed_exif_metadata": False } # Set up headers with API key headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } try: # Make API request logger.info(f"Sending request to {self.api_endpoint}") response = requests.post( self.api_endpoint, json=payload, headers=headers ) # Check response status if response.status_code != 200: error_msg = f"Error generating image: {response.status_code} - {response.text}" logger.error(error_msg) return {"error": error_msg} # Process response result = response.json() # Add the mapped model to the result result["model"] = mapped_model # Generate output path if not provided if not output_path: # Create a filename based on the prompt filename = f"{prompt[:20].replace(' ', '_')}_{mapped_model}.png" output_path = os.path.join(self.output_dir, filename) # Save image if images array is in the result if "images" in result and len(result["images"]) > 0: image_url = result["images"][0] self._download_image(image_url, output_path) result["local_path"] = output_path result["image_url"] = image_url return result except requests.exceptions.RequestException as e: logger.error(f"Error generating image with Venice.ai: {str(e)}") raise def _download_image(self, image_url: str, output_path: str) -> None: """ Download image from URL and save to local path. Args: image_url: URL of the image to download output_path: Path to save the downloaded image """ try: response = requests.get(image_url, stream=True) response.raise_for_status() # Ensure directory exists os.makedirs(os.path.dirname(output_path), exist_ok=True) with open(output_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) logger.info(f"Image saved to {output_path}") except Exception as e: logger.error(f"Error downloading image: {str(e)}") raise ``` -------------------------------------------------------------------------------- /old/src/ai/sam_segmentation.py: -------------------------------------------------------------------------------- ```python """ SAM2 (Segment Anything Model 2) integration for object segmentation. """ import os import cv2 import numpy as np import logging from typing import Dict, Any, List, Tuple, Optional from pathlib import Path logger = logging.getLogger(__name__) class SAMSegmenter: """ Wrapper for Segment Anything Model 2 (SAM2) for object segmentation. """ def __init__(self, model_type: str = "vit_h", checkpoint_path: Optional[str] = None, use_gpu: bool = True, output_dir: str = "output/masks"): """ Initialize the SAM2 segmenter. 
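Example (illustrative; the checkpoint path is a placeholder for a downloaded SAM2 checkpoint): segmenter = SAMSegmenter(model_type="vit_h", checkpoint_path="checkpoints/sam2_vit_h.pth", use_gpu=True)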
Args: model_type: SAM2 model type ("vit_h", "vit_l", "vit_b") checkpoint_path: Path to model checkpoint use_gpu: Whether to use GPU for inference output_dir: Directory to store segmentation results """ self.model_type = model_type self.checkpoint_path = checkpoint_path self.use_gpu = use_gpu self.output_dir = output_dir # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Model will be initialized on first use to avoid loading it unnecessarily self.model = None self.predictor = None def _initialize_model(self) -> None: """ Initialize the SAM2 model. Note: This requires PyTorch and the segment-anything-2 package to be installed. """ try: # Import here to avoid dependency issues if SAM2 is not installed import torch from segment_anything_2 import sam_model_registry, SamPredictor if not self.checkpoint_path: raise ValueError("SAM2 checkpoint path is required") # Check if checkpoint exists if not os.path.exists(self.checkpoint_path): raise FileNotFoundError(f"SAM2 checkpoint not found at {self.checkpoint_path}") # Determine device device = "cuda" if self.use_gpu and torch.cuda.is_available() else "cpu" # Load SAM2 model self.model = sam_model_registry[self.model_type](checkpoint=self.checkpoint_path) self.model.to(device=device) self.predictor = SamPredictor(self.model) logger.info(f"Initialized SAM2 model ({self.model_type}) on {device}") except ImportError as e: logger.error(f"Required packages not installed: {str(e)}") raise except Exception as e: logger.error(f"Error initializing SAM2 model: {str(e)}") raise def segment_image(self, image_path: str, points: Optional[List[Tuple[int, int]]] = None, output_dir: Optional[str] = None) -> Dict[str, Any]: """ Segment objects in an image using SAM2. Args: image_path: Path to input image points: Optional list of (x, y) points to guide segmentation output_dir: Optional directory to save segmentation results Returns: Dictionary containing segmentation masks and metadata """ # Initialize model if not already initialized if self.model is None: self._initialize_model() try: # Load image image = cv2.imread(image_path) if image is None: raise ValueError(f"Could not load image from {image_path}") image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # Set image in predictor self.predictor.set_image(image) # Generate masks if points: # Convert points to numpy arrays import numpy as np point_coords = np.array(points) point_labels = np.ones(len(points)) # Generate masks from points masks, scores, logits = self.predictor.predict( point_coords=point_coords, point_labels=point_labels, multimask_output=True ) else: # Automatic segmentation (using center point) h, w = image.shape[:2] center_point = np.array([[w//2, h//2]]) center_label = np.array([1]) masks, scores, logits = self.predictor.predict( point_coords=center_point, point_labels=center_label, multimask_output=True ) # Use provided output directory or default output_dir = output_dir or os.path.join(self.output_dir, Path(image_path).stem) os.makedirs(output_dir, exist_ok=True) # Process results masked_images = [] for i, mask in enumerate(masks): # Apply mask to image masked_image = self._apply_mask_to_image(image, mask) # Save masked image output_path = os.path.join(output_dir, f"mask_{i}.png") cv2.imwrite(output_path, cv2.cvtColor(masked_image, cv2.COLOR_RGB2BGR)) masked_images.append(output_path) # Convert numpy arrays to lists for JSON serialization result = { "image_path": image_path, "masked_images": masked_images, "scores": scores.tolist(), "mask_count": len(masks) } return 
result except Exception as e: logger.error(f"Error segmenting image: {str(e)}") raise def _apply_mask_to_image(self, image: np.ndarray, mask: np.ndarray) -> np.ndarray: """ Apply mask to image, keeping only the masked region. Args: image: Input image as numpy array mask: Binary mask as numpy array Returns: Masked image as numpy array """ # Create a copy of the image masked_image = image.copy() # Apply mask masked_image[~mask] = [0, 0, 0] # Set background to black return masked_image def segment_with_auto_points(self, image_path: str, num_points: int = 5, output_dir: Optional[str] = None) -> Dict[str, Any]: """ Segment image using automatically generated points with SAM2. Args: image_path: Path to input image num_points: Number of points to generate output_dir: Optional directory to save segmentation results Returns: Dictionary containing segmentation masks and metadata """ # Load image image = cv2.imread(image_path) if image is None: raise ValueError(f"Could not load image from {image_path}") h, w = image.shape[:2] # Generate points in a grid pattern points = [] rows = int(np.sqrt(num_points)) cols = num_points // rows for i in range(rows): for j in range(cols): x = int(w * (j + 0.5) / cols) y = int(h * (i + 0.5) / rows) points.append((x, y)) # Segment with generated points return self.segment_image(image_path, points, output_dir) ``` -------------------------------------------------------------------------------- /src/models/cuda_mvs.py: -------------------------------------------------------------------------------- ```python """ CUDA Multi-View Stereo wrapper for 3D reconstruction from multiple images. """ import os import subprocess import logging import json from typing import Dict, Any, List, Optional from pathlib import Path logger = logging.getLogger(__name__) class CUDAMultiViewStereo: """ Wrapper for CUDA Multi-View Stereo for 3D reconstruction from multiple images. """ def __init__(self, cuda_mvs_path: str, output_dir: str = "output/models"): """ Initialize the CUDA MVS wrapper. Args: cuda_mvs_path: Path to CUDA MVS installation output_dir: Directory to store output files """ self.cuda_mvs_path = cuda_mvs_path self.output_dir = output_dir # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Validate installation self._validate_installation() def _validate_installation(self) -> None: """ Validate CUDA MVS installation. Raises: FileNotFoundError: If CUDA MVS installation is not found """ if not os.path.exists(self.cuda_mvs_path): raise FileNotFoundError(f"CUDA MVS not found at {self.cuda_mvs_path}") # Check for required executables required_files = ["app_patch_match_mvs"] for file in required_files: exec_path = os.path.join(self.cuda_mvs_path, "build", file) if not os.path.exists(exec_path): raise FileNotFoundError(f"Required executable {file} not found at {exec_path}") def generate_model_from_images(self, image_paths: List[str], camera_params: Optional[Dict[str, Any]] = None, output_name: str = "model") -> Dict[str, Any]: """ Generate a 3D model from multiple images using CUDA MVS. 
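Example (illustrative; paths are placeholders): mvs = CUDAMultiViewStereo(cuda_mvs_path="/path/to/cuda-mvs"); result = mvs.generate_model_from_images(["view_0.png", "view_1.png", "view_2.png"], output_name="chair"); result["point_cloud_file"] then holds the path to the generated PLY point cloud.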
Args: image_paths: List of paths to input images camera_params: Optional camera parameters output_name: Name for the output files Returns: Dictionary containing paths to generated model files """ try: # Create a unique directory for this reconstruction model_dir = os.path.join(self.output_dir, output_name) os.makedirs(model_dir, exist_ok=True) # Create a camera parameters file if provided params_file = None if camera_params: params_file = os.path.join(model_dir, "camera_params.json") with open(params_file, 'w') as f: json.dump(camera_params, f, indent=2) # Generate camera parameters if not provided if not params_file: params_file = self._generate_camera_params(image_paths, model_dir) # Generate point cloud point_cloud_file = os.path.join(model_dir, f"{output_name}.ply") # Run CUDA MVS cmd = [ os.path.join(self.cuda_mvs_path, "build", "app_patch_match_mvs"), "--image_dir", os.path.dirname(image_paths[0]), "--camera_params", params_file, "--output_file", point_cloud_file ] logger.info(f"Running CUDA MVS with command: {' '.join(cmd)}") process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # Wait for process to complete stdout, stderr = process.communicate() if process.returncode != 0: logger.error(f"Error running CUDA MVS: {stderr}") raise RuntimeError(f"CUDA MVS failed with exit code {process.returncode}") # Check if output file was created if not os.path.exists(point_cloud_file): raise FileNotFoundError(f"Output point cloud file not found at {point_cloud_file}") return { "model_id": output_name, "output_dir": model_dir, "point_cloud_file": point_cloud_file, "camera_params_file": params_file, "input_images": image_paths } except Exception as e: logger.error(f"Error generating 3D model with CUDA MVS: {str(e)}") raise def _generate_camera_params(self, image_paths: List[str], model_dir: str) -> str: """ Generate camera parameters from images. Args: image_paths: List of paths to input images model_dir: Directory to save parameter file Returns: Path to camera parameters file """ # This is a simplified version for demonstration # In a real implementation, this would use SfM or camera estimation params = [] for i, img_path in enumerate(image_paths): # Extract image dimensions from PIL import Image with Image.open(img_path) as img: width, height = img.size # Generate simple camera parameters # In reality, these would be estimated from the images # or provided by the user params.append({ "image_id": i, "image_name": os.path.basename(img_path), "width": width, "height": height, "camera": { "model": "PINHOLE", "focal_length": min(width, height), "principal_point": [width / 2, height / 2], "rotation": [1, 0, 0, 0, 1, 0, 0, 0, 1], "translation": [0, 0, 0] } }) # Write parameters to file params_file = os.path.join(model_dir, "camera_params.json") with open(params_file, 'w') as f: json.dump(params, f, indent=2) return params_file def convert_ply_to_obj(self, ply_file: str, output_dir: Optional[str] = None) -> str: """ Convert PLY point cloud to OBJ mesh. 
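Example (illustrative, continuing from generate_model_from_images): obj_file = mvs.convert_ply_to_obj(result["point_cloud_file"])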
Args: ply_file: Path to input PLY file output_dir: Directory to save output OBJ file Returns: Path to output OBJ file """ # In a real implementation, this would use a mesh reconstruction library # such as Open3D or PyMeshLab to convert the point cloud to a mesh if not output_dir: output_dir = os.path.dirname(ply_file) # Generate output file path obj_file = os.path.join(output_dir, f"{Path(ply_file).stem}.obj") logger.info(f"Converting PLY to OBJ: {ply_file} -> {obj_file}") # This is a placeholder for the actual conversion # In a real implementation, you would use a library like Open3D: # import open3d as o3d # pcd = o3d.io.read_point_cloud(ply_file) # mesh = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson(pcd)[0] # o3d.io.write_triangle_mesh(obj_file, mesh) # For now, we'll just create a dummy OBJ file with open(obj_file, 'w') as f: f.write(f"# Converted from {os.path.basename(ply_file)}\n") f.write("# This is a placeholder OBJ file\n") f.write("v 0 0 0\n") f.write("v 1 0 0\n") f.write("v 0 1 0\n") f.write("f 1 2 3\n") return obj_file ``` -------------------------------------------------------------------------------- /src/utils/format_validator.py: -------------------------------------------------------------------------------- ```python import os import logging import zipfile import xml.etree.ElementTree as ET from typing import Tuple, Optional, Dict, Any logger = logging.getLogger(__name__) class FormatValidator: """Validates 3D model formats for compatibility with printers.""" @staticmethod def validate_3mf(file_path: str) -> Tuple[bool, Optional[str]]: """ Validate a 3MF file for compatibility with Prusa and Bambu printers. Args: file_path: Path to the 3MF file Returns: Tuple of (is_valid, error_message) """ if not os.path.exists(file_path): return False, f"File not found: {file_path}" try: # 3MF files are ZIP archives with XML content with zipfile.ZipFile(file_path, 'r') as zip_ref: # Check for required files required_files = ['3D/3dmodel.model', '[Content_Types].xml'] for req_file in required_files: try: zip_ref.getinfo(req_file) except KeyError: return False, f"Missing required file in 3MF: {req_file}" # Validate 3D model file with zip_ref.open('3D/3dmodel.model') as model_file: tree = ET.parse(model_file) root = tree.getroot() # Check for required elements if root.tag != '{http://schemas.microsoft.com/3dmanufacturing/core/2015/02}model': return False, "Invalid 3MF: Missing model element" # Verify resources section exists resources = root.find('.//{http://schemas.microsoft.com/3dmanufacturing/core/2015/02}resources') if resources is None: return False, "Invalid 3MF: Missing resources element" return True, None except Exception as e: logger.error(f"Error validating 3MF file: {str(e)}") return False, f"Error validating 3MF file: {str(e)}" @staticmethod def validate_amf(file_path: str) -> Tuple[bool, Optional[str]]: """ Validate an AMF file for compatibility with printers. 
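Example (illustrative; the file path is a placeholder): is_valid, error = FormatValidator.validate_amf("output/models/gear.amf")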
Args: file_path: Path to the AMF file Returns: Tuple of (is_valid, error_message) """ if not os.path.exists(file_path): return False, f"File not found: {file_path}" try: # Parse the AMF file (XML format) tree = ET.parse(file_path) root = tree.getroot() # Check for required elements if root.tag != 'amf': return False, "Invalid AMF: Missing amf root element" # Check for at least one object objects = root.findall('./object') if not objects: return False, "Invalid AMF: No objects found" # Check that each object has a mesh for obj in objects: mesh = obj.find('./mesh') if mesh is None: return False, f"Invalid AMF: Object {obj.get('id', 'unknown')} is missing a mesh" # Check for vertices and volumes vertices = mesh.find('./vertices') volumes = mesh.findall('./volume') if vertices is None: return False, f"Invalid AMF: Mesh in object {obj.get('id', 'unknown')} is missing vertices" if not volumes: return False, f"Invalid AMF: Mesh in object {obj.get('id', 'unknown')} has no volumes" return True, None except Exception as e: logger.error(f"Error validating AMF file: {str(e)}") return False, f"Error validating AMF file: {str(e)}" @staticmethod def extract_metadata(file_path: str) -> Dict[str, Any]: """ Extract metadata from a 3MF or AMF file. Args: file_path: Path to the 3D model file Returns: Dictionary of metadata """ metadata = {} # Check file extension ext = os.path.splitext(file_path)[1].lower() try: if ext == '.3mf': with zipfile.ZipFile(file_path, 'r') as zip_ref: metadata_path = "Metadata/model_metadata.xml" try: with zip_ref.open(metadata_path) as f: tree = ET.parse(f) root = tree.getroot() for meta in root.findall('./meta'): name = meta.get('name') if name: metadata[name] = meta.text except KeyError: # Metadata file doesn't exist pass elif ext == '.amf': tree = ET.parse(file_path) root = tree.getroot() for meta in root.findall('./metadata'): name = meta.get('type') if name: metadata[name] = meta.text except Exception as e: logger.error(f"Error extracting metadata: {str(e)}") return metadata @staticmethod def check_printer_compatibility(file_path: str, printer_type: str = "prusa") -> Tuple[bool, Optional[str]]: """ Check if a 3D model file is compatible with a specific printer type. 
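Example (illustrative; the file path is a placeholder): compatible, error = FormatValidator.check_printer_compatibility("output/models/gear.3mf", printer_type="bambu")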
Args: file_path: Path to the 3D model file printer_type: Type of printer ("prusa" or "bambu") Returns: Tuple of (is_compatible, error_message) """ # Check file extension ext = os.path.splitext(file_path)[1].lower() # Validate based on file format if ext == '.3mf': is_valid, error = FormatValidator.validate_3mf(file_path) if not is_valid: return False, error # Additional printer-specific checks if printer_type.lower() == "prusa": # Prusa-specific checks for 3MF # For now, just basic validation is sufficient return True, None elif printer_type.lower() == "bambu": # Bambu-specific checks for 3MF # For now, just basic validation is sufficient return True, None else: return False, f"Unknown printer type: {printer_type}" elif ext == '.amf': is_valid, error = FormatValidator.validate_amf(file_path) if not is_valid: return False, error # Additional printer-specific checks if printer_type.lower() == "prusa": # Prusa-specific checks for AMF # For now, just basic validation is sufficient return True, None elif printer_type.lower() == "bambu": # Bambu-specific checks for AMF # For now, just basic validation is sufficient return True, None else: return False, f"Unknown printer type: {printer_type}" else: return False, f"Unsupported file format for printer compatibility check: {ext}" ``` -------------------------------------------------------------------------------- /old/src/models/threestudio_generator.py: -------------------------------------------------------------------------------- ```python """ threestudio integration for 3D model generation from images. """ import os import subprocess import logging import json import tempfile from typing import Dict, Any, List, Optional from pathlib import Path logger = logging.getLogger(__name__) class ThreeStudioGenerator: """ Wrapper for threestudio for 3D model generation from images. """ def __init__(self, threestudio_path: str, output_dir: str = "output/models"): """ Initialize the threestudio generator. Args: threestudio_path: Path to threestudio installation output_dir: Directory to store output files """ self.threestudio_path = threestudio_path self.output_dir = output_dir # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Validate threestudio installation self._validate_installation() def _validate_installation(self) -> None: """ Validate threestudio installation. Raises: FileNotFoundError: If threestudio installation is not found """ if not os.path.exists(self.threestudio_path): raise FileNotFoundError(f"threestudio not found at {self.threestudio_path}") # Check for required files required_files = ["launch.py", "README.md"] for file in required_files: if not os.path.exists(os.path.join(self.threestudio_path, file)): raise FileNotFoundError(f"Required file {file} not found in threestudio directory") def generate_model_from_image(self, image_path: str, method: str = "zero123", num_iterations: int = 5000, export_format: str = "obj", config_overrides: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Generate a 3D model from an image using threestudio. Args: image_path: Path to input image method: Method to use ("zero123", "sjc", "magic3d", etc.) 
num_iterations: Number of training iterations export_format: Format to export ("obj", "glb", "ply") config_overrides: Optional configuration overrides Returns: Dictionary containing paths to generated model files """ try: # Create a unique ID for this generation model_id = Path(image_path).stem # Create a temporary config file config_file = self._create_config_file(image_path, method, num_iterations, config_overrides) # Run threestudio output_dir = os.path.join(self.output_dir, model_id) os.makedirs(output_dir, exist_ok=True) cmd = [ "python", "launch.py", "--config", config_file, "--train", "--gpu", "0", "--output_dir", output_dir ] logger.info(f"Running threestudio with command: {' '.join(cmd)}") # Execute in threestudio directory process = subprocess.Popen( cmd, cwd=self.threestudio_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # Wait for process to complete stdout, stderr = process.communicate() if process.returncode != 0: logger.error(f"Error running threestudio: {stderr}") raise RuntimeError(f"threestudio failed with exit code {process.returncode}") # Export model exported_files = self._export_model(output_dir, export_format) return { "model_id": model_id, "output_dir": output_dir, "exported_files": exported_files, "preview_images": self._get_preview_images(output_dir) } except Exception as e: logger.error(f"Error generating 3D model with threestudio: {str(e)}") raise def _create_config_file(self, image_path: str, method: str, num_iterations: int, config_overrides: Optional[Dict[str, Any]] = None) -> str: """ Create a configuration file for threestudio. Args: image_path: Path to input image method: Method to use num_iterations: Number of training iterations config_overrides: Optional configuration overrides Returns: Path to the created configuration file """ # Base configuration config = { "method": method, "image_path": os.path.abspath(image_path), "num_iterations": num_iterations, "save_interval": 1000, "export_interval": 1000 } # Apply overrides if config_overrides: config.update(config_overrides) # Write to temporary file fd, config_file = tempfile.mkstemp(suffix=".json") with os.fdopen(fd, 'w') as f: json.dump(config, f, indent=2) return config_file def _export_model(self, output_dir: str, export_format: str) -> List[str]: """ Export the model in the specified format. 
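Example (illustrative; assumes training has already produced a checkpoint under output_dir): exported_files = generator._export_model("output/models/my_image", "obj")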
Args: output_dir: Directory containing the model export_format: Format to export Returns: List of paths to exported files """ # Find the latest checkpoint checkpoints_dir = os.path.join(output_dir, "checkpoints") if not os.path.exists(checkpoints_dir): raise FileNotFoundError(f"Checkpoints directory not found: {checkpoints_dir}") # Get the latest checkpoint checkpoints = sorted([f for f in os.listdir(checkpoints_dir) if f.endswith(".ckpt")]) if not checkpoints: raise FileNotFoundError("No checkpoints found") latest_checkpoint = os.path.join(checkpoints_dir, checkpoints[-1]) # Export command cmd = [ "python", "launch.py", "--config", os.path.join(output_dir, "config.yaml"), "--export", "--gpu", "0", "--checkpoint", latest_checkpoint, "--export_format", export_format ] logger.info(f"Exporting model with command: {' '.join(cmd)}") # Execute in threestudio directory process = subprocess.Popen( cmd, cwd=self.threestudio_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # Wait for process to complete stdout, stderr = process.communicate() if process.returncode != 0: logger.error(f"Error exporting model: {stderr}") raise RuntimeError(f"Model export failed with exit code {process.returncode}") # Find exported files exports_dir = os.path.join(output_dir, "exports") if not os.path.exists(exports_dir): raise FileNotFoundError(f"Exports directory not found: {exports_dir}") exported_files = [os.path.join(exports_dir, f) for f in os.listdir(exports_dir)] return exported_files def _get_preview_images(self, output_dir: str) -> List[str]: """ Get paths to preview images. Args: output_dir: Directory containing the model Returns: List of paths to preview images """ # Find preview images previews_dir = os.path.join(output_dir, "images") if not os.path.exists(previews_dir): return [] preview_images = [os.path.join(previews_dir, f) for f in os.listdir(previews_dir) if f.endswith(".png") or f.endswith(".jpg")] return sorted(preview_images) ``` -------------------------------------------------------------------------------- /test_image_approval_workflow.py: -------------------------------------------------------------------------------- ```python """ Test script for the image approval workflow. """ import os import sys import json import logging import unittest from unittest.mock import patch, MagicMock from pathlib import Path # Add the src directory to the path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.workflow.image_approval import ImageApprovalManager from src.config import IMAGE_APPROVAL # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class TestImageApprovalWorkflow(unittest.TestCase): """ Test cases for the image approval workflow. """ def setUp(self): """ Set up test environment. 
""" # Create test output directories self.test_output_dir = "output/test_image_approval" self.test_approved_dir = os.path.join(self.test_output_dir, "approved") os.makedirs(self.test_output_dir, exist_ok=True) os.makedirs(self.test_approved_dir, exist_ok=True) # Create the approval manager self.approval_manager = ImageApprovalManager( output_dir=self.test_output_dir, approved_dir=self.test_approved_dir, min_approved_images=3, auto_approve=False ) # Create test images self.test_images = [] for i in range(5): image_path = os.path.join(self.test_output_dir, f"test_image_{i}.png") with open(image_path, "w") as f: f.write(f"test image data {i}") self.test_images.append({ "id": f"image_{i}", "local_path": image_path, "view_index": i, "view_direction": f"view_{i}" }) def test_add_images(self): """ Test adding images to the approval manager. """ # Add images self.approval_manager.add_images(self.test_images) # Verify images were added self.assertEqual(len(self.approval_manager.images), 5) self.assertEqual(len(self.approval_manager.pending_images), 5) self.assertEqual(len(self.approval_manager.approved_images), 0) self.assertEqual(len(self.approval_manager.rejected_images), 0) def test_approve_image(self): """ Test approving an image. """ # Add images self.approval_manager.add_images(self.test_images) # Approve an image result = self.approval_manager.approve_image("image_0") # Verify the result self.assertTrue(result["success"]) self.assertEqual(result["image_id"], "image_0") self.assertEqual(result["status"], "approved") # Verify the image was moved to approved self.assertEqual(len(self.approval_manager.pending_images), 4) self.assertEqual(len(self.approval_manager.approved_images), 1) self.assertEqual(len(self.approval_manager.rejected_images), 0) # Verify the image was copied to the approved directory approved_path = os.path.join(self.test_approved_dir, "image_0.png") self.assertTrue(os.path.exists(approved_path)) def test_reject_image(self): """ Test rejecting an image. """ # Add images self.approval_manager.add_images(self.test_images) # Reject an image result = self.approval_manager.reject_image("image_1") # Verify the result self.assertTrue(result["success"]) self.assertEqual(result["image_id"], "image_1") self.assertEqual(result["status"], "rejected") # Verify the image was moved to rejected self.assertEqual(len(self.approval_manager.pending_images), 4) self.assertEqual(len(self.approval_manager.approved_images), 0) self.assertEqual(len(self.approval_manager.rejected_images), 1) def test_get_approval_status(self): """ Test getting the approval status. """ # Add images self.approval_manager.add_images(self.test_images) # Approve some images self.approval_manager.approve_image("image_0") self.approval_manager.approve_image("image_1") self.approval_manager.approve_image("image_2") # Reject an image self.approval_manager.reject_image("image_3") # Get the status status = self.approval_manager.get_status() # Verify the status self.assertEqual(status["total_images"], 5) self.assertEqual(status["pending_count"], 1) self.assertEqual(status["approved_count"], 3) self.assertEqual(status["rejected_count"], 1) self.assertTrue(status["has_minimum_approved"]) self.assertEqual(len(status["approved_images"]), 3) self.assertEqual(len(status["pending_images"]), 1) self.assertEqual(len(status["rejected_images"]), 1) def test_get_approved_images(self): """ Test getting approved images. 
""" # Add images self.approval_manager.add_images(self.test_images) # Approve some images self.approval_manager.approve_image("image_0") self.approval_manager.approve_image("image_2") self.approval_manager.approve_image("image_4") # Get approved images approved = self.approval_manager.get_approved_images() # Verify approved images self.assertEqual(len(approved), 3) self.assertEqual(approved[0]["id"], "image_0") self.assertEqual(approved[1]["id"], "image_2") self.assertEqual(approved[2]["id"], "image_4") def test_auto_approve(self): """ Test auto-approval mode. """ # Create an auto-approve manager auto_manager = ImageApprovalManager( output_dir=self.test_output_dir, approved_dir=self.test_approved_dir, min_approved_images=3, auto_approve=True ) # Add images auto_manager.add_images(self.test_images) # Verify all images were auto-approved self.assertEqual(len(auto_manager.pending_images), 0) self.assertEqual(len(auto_manager.approved_images), 5) self.assertEqual(len(auto_manager.rejected_images), 0) def test_has_minimum_approved(self): """ Test checking if minimum approved images are met. """ # Add images self.approval_manager.add_images(self.test_images) # Initially should not have minimum self.assertFalse(self.approval_manager.has_minimum_approved()) # Approve two images self.approval_manager.approve_image("image_0") self.approval_manager.approve_image("image_1") # Still should not have minimum self.assertFalse(self.approval_manager.has_minimum_approved()) # Approve one more image self.approval_manager.approve_image("image_2") # Now should have minimum self.assertTrue(self.approval_manager.has_minimum_approved()) def test_save_and_load_state(self): """ Test saving and loading the approval state. """ # Add images self.approval_manager.add_images(self.test_images) # Approve and reject some images self.approval_manager.approve_image("image_0") self.approval_manager.approve_image("image_2") self.approval_manager.reject_image("image_3") # Save the state state_file = os.path.join(self.test_output_dir, "approval_state.json") self.approval_manager.save_state(state_file) # Create a new manager new_manager = ImageApprovalManager( output_dir=self.test_output_dir, approved_dir=self.test_approved_dir, min_approved_images=3, auto_approve=False ) # Load the state new_manager.load_state(state_file) # Verify the state was loaded correctly self.assertEqual(len(new_manager.images), 5) self.assertEqual(len(new_manager.pending_images), 2) self.assertEqual(len(new_manager.approved_images), 2) self.assertEqual(len(new_manager.rejected_images), 1) def tearDown(self): """ Clean up after tests. """ # Clean up test output directory import shutil if os.path.exists(self.test_output_dir): shutil.rmtree(self.test_output_dir) if __name__ == "__main__": unittest.main() ``` -------------------------------------------------------------------------------- /old/src/workflow/image_to_model_pipeline.py: -------------------------------------------------------------------------------- ```python """ Workflow orchestration for the image-to-model pipeline. """ import os import logging import uuid from typing import Dict, Any, List, Optional, Tuple from pathlib import Path logger = logging.getLogger(__name__) class ImageToModelPipeline: """ Orchestrates the workflow from text prompt to 3D model: 1. Generate image with Venice.ai 2. Segment object with SAM2 3. Create 3D model with threestudio 4. 
Convert to OpenSCAD for parametric editing """ def __init__(self, venice_generator, sam_segmenter, threestudio_generator, openscad_wrapper, output_dir: str = "output/pipeline"): """ Initialize the pipeline. Args: venice_generator: Instance of VeniceImageGenerator sam_segmenter: Instance of SAMSegmenter threestudio_generator: Instance of ThreeStudioGenerator openscad_wrapper: Instance of OpenSCADWrapper output_dir: Directory to store output files """ self.venice_generator = venice_generator self.sam_segmenter = sam_segmenter self.threestudio_generator = threestudio_generator self.openscad_wrapper = openscad_wrapper self.output_dir = output_dir # Create output directories os.makedirs(os.path.join(output_dir, "images"), exist_ok=True) os.makedirs(os.path.join(output_dir, "masks"), exist_ok=True) os.makedirs(os.path.join(output_dir, "models"), exist_ok=True) os.makedirs(os.path.join(output_dir, "scad"), exist_ok=True) def generate_model_from_text(self, prompt: str, venice_params: Optional[Dict[str, Any]] = None, sam_params: Optional[Dict[str, Any]] = None, threestudio_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Generate a 3D model from a text prompt. Args: prompt: Text description for image generation venice_params: Optional parameters for Venice.ai sam_params: Optional parameters for SAM2 threestudio_params: Optional parameters for threestudio Returns: Dictionary containing paths to generated files and metadata """ try: # Generate a unique ID for this pipeline run pipeline_id = str(uuid.uuid4()) logger.info(f"Starting pipeline {pipeline_id} for prompt: {prompt}") # Step 1: Generate image with Venice.ai image_path = os.path.join(self.output_dir, "images", f"{pipeline_id}.png") venice_result = self._generate_image(prompt, image_path, venice_params) # Step 2: Segment object with SAM2 masks_dir = os.path.join(self.output_dir, "masks", pipeline_id) sam_result = self._segment_image(image_path, masks_dir, sam_params) # Get the best mask (highest score or first mask if no scores) if "scores" in sam_result and sam_result["scores"]: best_mask_idx = sam_result["scores"].index(max(sam_result["scores"])) best_mask_path = sam_result["mask_paths"][best_mask_idx] else: # If no scores available, use the first mask best_mask_path = sam_result["mask_paths"][0] if sam_result.get("mask_paths") else None if not best_mask_path: raise ValueError("No valid mask generated from segmentation") # Step 3: Create 3D model with threestudio threestudio_result = self._generate_3d_model(best_mask_path, threestudio_params) # Step 4: Convert to OpenSCAD for parametric editing scad_result = self._convert_to_openscad(threestudio_result["exported_files"][0], pipeline_id) # Compile results result = { "pipeline_id": pipeline_id, "prompt": prompt, "image": venice_result, "segmentation": sam_result, "model_3d": threestudio_result, "openscad": scad_result } logger.info(f"Pipeline {pipeline_id} completed successfully") return result except Exception as e: logger.error(f"Error in pipeline: {str(e)}") raise def _generate_image(self, prompt: str, output_path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Generate image with Venice.ai. 
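Internal step of generate_model_from_text; unless overridden via params, it requests the "fluently-xl" model at 1024x1024.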
Args: prompt: Text description for image generation output_path: Path to save the generated image params: Optional parameters for Venice.ai Returns: Dictionary containing image data and metadata """ logger.info(f"Generating image for prompt: {prompt}") # Default parameters default_params = { "model": "fluently-xl", # Default to fastest model "width": 1024, "height": 1024 } # Merge with provided parameters if params: default_params.update(params) # Generate image result = self.venice_generator.generate_image( prompt=prompt, output_path=output_path, **default_params ) logger.info(f"Image generated: {output_path}") return result def _segment_image(self, image_path: str, output_dir: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Segment object with SAM2. Args: image_path: Path to input image output_dir: Directory to save segmentation results params: Optional parameters for SAM2 Returns: Dictionary containing segmentation masks and metadata """ logger.info(f"Segmenting image: {image_path}") # Segment image with SAM2 # Check if points are provided in params points = params.get("points") if params else None if points: result = self.sam_segmenter.segment_image( image_path=image_path, points=points, output_dir=output_dir ) else: # Use automatic point generation result = self.sam_segmenter.segment_with_auto_points( image_path=image_path, output_dir=output_dir ) logger.info(f"Image segmented, {result.get('num_masks', 0)} masks generated") return result def _generate_3d_model(self, image_path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Generate 3D model with threestudio. Args: image_path: Path to input image params: Optional parameters for threestudio Returns: Dictionary containing paths to generated model files """ logger.info(f"Generating 3D model from image: {image_path}") # Default parameters default_params = { "method": "zero123", "num_iterations": 5000, "export_format": "obj" } # Merge with provided parameters if params: default_params.update(params) # Generate 3D model result = self.threestudio_generator.generate_model_from_image( image_path=image_path, **default_params ) logger.info(f"3D model generated: {result['exported_files']}") return result def _convert_to_openscad(self, model_path: str, model_id: str) -> Dict[str, Any]: """ Convert 3D model to OpenSCAD format. Args: model_path: Path to input model model_id: Unique identifier for the model Returns: Dictionary containing paths to generated files """ logger.info(f"Converting model to OpenSCAD: {model_path}") # Generate OpenSCAD code for importing the model scad_code = f"""// Generated OpenSCAD code for model {model_id} // Imported from {os.path.basename(model_path)} // Parameters scale_factor = 1.0; position_x = 0; position_y = 0; position_z = 0; rotation_x = 0; rotation_y = 0; rotation_z = 0; // Import and transform the model translate([position_x, position_y, position_z]) rotate([rotation_x, rotation_y, rotation_z]) scale(scale_factor) import("{model_path}"); """ # Save SCAD code to file scad_file = self.openscad_wrapper.generate_scad(scad_code, model_id) # Generate previews previews = self.openscad_wrapper.generate_multi_angle_previews(scad_file) return { "scad_file": scad_file, "previews": previews, "model_path": model_path } ``` -------------------------------------------------------------------------------- /test_remote_cuda_mvs.py: -------------------------------------------------------------------------------- ```python """ Test script for remote CUDA Multi-View Stereo processing. 
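Run with: python test_remote_cuda_mvs.py (all remote calls are mocked, so no CUDA MVS server is required).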
""" import os import sys import logging import unittest from unittest.mock import patch, MagicMock from pathlib import Path # Add the src directory to the path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.remote.cuda_mvs_client import CUDAMVSClient from src.remote.connection_manager import CUDAMVSConnectionManager from src.config import REMOTE_CUDA_MVS # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class TestRemoteCUDAMVS(unittest.TestCase): """ Test cases for remote CUDA Multi-View Stereo processing. """ def setUp(self): """ Set up test environment. """ # Create test output directories self.test_output_dir = "output/test_remote_cuda_mvs" os.makedirs(self.test_output_dir, exist_ok=True) # Mock API key self.api_key = "test_api_key" # Create the client with the mock API key self.client = CUDAMVSClient( api_key=self.api_key, output_dir=self.test_output_dir ) # Create the connection manager with the mock API key self.connection_manager = CUDAMVSConnectionManager( api_key=self.api_key, discovery_port=REMOTE_CUDA_MVS["DISCOVERY_PORT"], use_lan_discovery=True ) @patch('src.remote.cuda_mvs_client.requests.post') def test_upload_images(self, mock_post): """ Test uploading images to a remote CUDA MVS server. """ # Mock response mock_response = MagicMock() mock_response.status_code = 200 mock_response.json.return_value = { "job_id": "test_job_123", "status": "uploaded", "message": "Images uploaded successfully" } mock_post.return_value = mock_response # Test parameters server_url = "http://test-server:8765" image_paths = [ os.path.join(self.test_output_dir, "test_image_1.png"), os.path.join(self.test_output_dir, "test_image_2.png") ] # Create test images for path in image_paths: with open(path, "w") as f: f.write("test image data") # Call the method result = self.client.upload_images(server_url, image_paths) # Verify the result self.assertIsNotNone(result) self.assertEqual(result["job_id"], "test_job_123") self.assertEqual(result["status"], "uploaded") # Verify the API call mock_post.assert_called_once() args, kwargs = mock_post.call_args self.assertTrue(server_url in args[0]) self.assertEqual(kwargs["headers"]["Authorization"], f"Bearer {self.api_key}") @patch('src.remote.cuda_mvs_client.requests.post') def test_process_job(self, mock_post): """ Test processing a job on a remote CUDA MVS server. """ # Mock response mock_response = MagicMock() mock_response.status_code = 200 mock_response.json.return_value = { "job_id": "test_job_123", "status": "processing", "message": "Job started processing" } mock_post.return_value = mock_response # Test parameters server_url = "http://test-server:8765" job_id = "test_job_123" params = { "quality": "normal", "output_format": "obj" } # Call the method result = self.client.process_job(server_url, job_id, params) # Verify the result self.assertIsNotNone(result) self.assertEqual(result["job_id"], "test_job_123") self.assertEqual(result["status"], "processing") # Verify the API call mock_post.assert_called_once() args, kwargs = mock_post.call_args self.assertTrue(server_url in args[0]) self.assertEqual(kwargs["headers"]["Authorization"], f"Bearer {self.api_key}") self.assertEqual(kwargs["json"]["job_id"], job_id) self.assertEqual(kwargs["json"]["params"]["quality"], "normal") @patch('src.remote.cuda_mvs_client.requests.get') def test_get_job_status(self, mock_get): """ Test getting the status of a job on a remote CUDA MVS server. 
""" # Mock response mock_response = MagicMock() mock_response.status_code = 200 mock_response.json.return_value = { "job_id": "test_job_123", "status": "completed", "progress": 100, "message": "Job completed successfully" } mock_get.return_value = mock_response # Test parameters server_url = "http://test-server:8765" job_id = "test_job_123" # Call the method result = self.client.get_job_status(server_url, job_id) # Verify the result self.assertIsNotNone(result) self.assertEqual(result["job_id"], "test_job_123") self.assertEqual(result["status"], "completed") self.assertEqual(result["progress"], 100) # Verify the API call mock_get.assert_called_once() args, kwargs = mock_get.call_args self.assertTrue(server_url in args[0]) self.assertEqual(kwargs["headers"]["Authorization"], f"Bearer {self.api_key}") @patch('src.remote.cuda_mvs_client.requests.get') def test_download_model(self, mock_get): """ Test downloading a model from a remote CUDA MVS server. """ # Mock response mock_response = MagicMock() mock_response.status_code = 200 mock_response.content = b"test model data" mock_get.return_value = mock_response # Test parameters server_url = "http://test-server:8765" job_id = "test_job_123" output_dir = os.path.join(self.test_output_dir, "models") os.makedirs(output_dir, exist_ok=True) # Call the method result = self.client.download_model(server_url, job_id, output_dir) # Verify the result self.assertIsNotNone(result) self.assertTrue("model_path" in result) self.assertTrue(os.path.exists(result["model_path"])) # Verify the API call mock_get.assert_called() args, kwargs = mock_get.call_args_list[0] self.assertTrue(server_url in args[0]) self.assertEqual(kwargs["headers"]["Authorization"], f"Bearer {self.api_key}") @patch('src.remote.connection_manager.zeroconf.Zeroconf') def test_discover_servers(self, mock_zeroconf): """ Test discovering CUDA MVS servers on the network. """ # Mock Zeroconf mock_zeroconf_instance = MagicMock() mock_zeroconf.return_value = mock_zeroconf_instance # Mock ServiceBrowser with patch('src.remote.connection_manager.ServiceBrowser') as mock_browser: # Set up the connection manager to discover servers self.connection_manager.discover_servers() # Verify Zeroconf was initialized mock_zeroconf.assert_called_once() # Verify ServiceBrowser was initialized mock_browser.assert_called_once() args, kwargs = mock_browser.call_args self.assertEqual(args[0], mock_zeroconf_instance) self.assertEqual(args[1], "_cudamvs._tcp.local.") @patch('src.remote.connection_manager.CUDAMVSClient') def test_upload_images_with_connection_manager(self, mock_client_class): """ Test uploading images using the connection manager. 
""" # Mock client mock_client = MagicMock() mock_client_class.return_value = mock_client # Mock upload_images method mock_client.upload_images.return_value = { "job_id": "test_job_123", "status": "uploaded", "message": "Images uploaded successfully" } # Add a mock server self.connection_manager.servers = { "test_server": { "id": "test_server", "name": "Test Server", "url": "http://test-server:8765", "status": "online" } } # Test parameters server_id = "test_server" image_paths = [ os.path.join(self.test_output_dir, "test_image_1.png"), os.path.join(self.test_output_dir, "test_image_2.png") ] # Create test images for path in image_paths: with open(path, "w") as f: f.write("test image data") # Call the method result = self.connection_manager.upload_images(server_id, image_paths) # Verify the result self.assertIsNotNone(result) self.assertEqual(result["job_id"], "test_job_123") self.assertEqual(result["status"], "uploaded") # Verify the client method was called mock_client.upload_images.assert_called_once() args, kwargs = mock_client.upload_images.call_args self.assertEqual(args[0], "http://test-server:8765") self.assertEqual(args[1], image_paths) def tearDown(self): """ Clean up after tests. """ # Clean up test output directory import shutil if os.path.exists(self.test_output_dir): shutil.rmtree(self.test_output_dir) if __name__ == "__main__": unittest.main() ``` -------------------------------------------------------------------------------- /src/utils/cad_exporter.py: -------------------------------------------------------------------------------- ```python import os import logging import subprocess from typing import Dict, Any, Optional, Tuple, List logger = logging.getLogger(__name__) class CADExporter: """ Exports OpenSCAD models to various CAD formats that preserve parametric properties. """ def __init__(self, openscad_path: str = "openscad"): """ Initialize the CAD exporter. Args: openscad_path: Path to the OpenSCAD executable """ self.openscad_path = openscad_path # Supported export formats self.supported_formats = { "csg": "OpenSCAD CSG format (preserves all parametric properties)", "amf": "Additive Manufacturing File Format (preserves some metadata)", "3mf": "3D Manufacturing Format (modern replacement for STL with metadata)", "scad": "OpenSCAD source code (fully parametric)", "dxf": "Drawing Exchange Format (for 2D designs)", "svg": "Scalable Vector Graphics (for 2D designs)" } def export_model(self, scad_file: str, output_format: str = "csg", parameters: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, Any]] = None) -> Tuple[bool, str, Optional[str]]: """ Export an OpenSCAD model to the specified format. Args: scad_file: Path to the SCAD file output_format: Format to export to (csg, amf, 3mf, etc.) 
            parameters: Optional parameters to override in the SCAD file
            metadata: Optional metadata to include in the export

        Returns:
            Tuple of (success, output_file_path, error_message)
        """
        if not os.path.exists(scad_file):
            return False, "", f"SCAD file not found: {scad_file}"

        # Create output file path
        output_dir = os.path.dirname(scad_file)
        model_id = os.path.basename(scad_file).split('.')[0]

        # Special case for SCAD format - just copy the file with parameters embedded
        if output_format.lower() == "scad" and parameters:
            return self._export_parametric_scad(scad_file, parameters, metadata)

        # For native OpenSCAD formats
        output_file = os.path.join(output_dir, f"{model_id}.{output_format.lower()}")

        # Build command
        cmd = [self.openscad_path, "-o", output_file]

        # Add parameters if provided
        if parameters:
            for key, value in parameters.items():
                cmd.extend(["-D", f"{key}={value}"])

        # Add input file
        cmd.append(scad_file)

        try:
            # Run OpenSCAD
            result = subprocess.run(cmd, check=True, capture_output=True, text=True)

            # Check if file was created
            if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
                logger.info(f"Exported model to {output_format}: {output_file}")

                # Add metadata if supported and provided
                if metadata and output_format.lower() in ["amf", "3mf"]:
                    self._add_metadata_to_file(output_file, metadata, output_format)

                return True, output_file, None
            else:
                error_msg = f"Failed to export model to {output_format}"
                logger.error(error_msg)
                logger.error(f"OpenSCAD output: {result.stdout}")
                logger.error(f"OpenSCAD error: {result.stderr}")
                return False, "", error_msg
        except subprocess.CalledProcessError as e:
            error_msg = f"Error exporting model to {output_format}: {e.stderr}"
            logger.error(error_msg)
            return False, "", error_msg
        except Exception as e:
            error_msg = f"Error exporting model to {output_format}: {str(e)}"
            logger.error(error_msg)
            return False, "", error_msg

    def _export_parametric_scad(self, scad_file: str,
                                parameters: Dict[str, Any],
                                metadata: Optional[Dict[str, Any]] = None) -> Tuple[bool, str, Optional[str]]:
        """
        Create a new SCAD file with parameters embedded as variables.

        Args:
            scad_file: Path to the original SCAD file
            parameters: Parameters to embed in the SCAD file
            metadata: Optional metadata to include as comments

        Returns:
            Tuple of (success, output_file_path, error_message)
        """
        try:
            # Read the original SCAD file
            with open(scad_file, 'r') as f:
                content = f.read()

            # Create output file path
            output_dir = os.path.dirname(scad_file)
            model_id = os.path.basename(scad_file).split('.')[0]
            output_file = os.path.join(output_dir, f"{model_id}_parametric.scad")

            # Create parameter declarations
            param_declarations = []
            for key, value in parameters.items():
                if isinstance(value, str):
                    param_declarations.append(f'{key} = "{value}";')
                else:
                    param_declarations.append(f'{key} = {value};')

            # Create metadata comments
            metadata_comments = []
            if metadata:
                metadata_comments.append("// Metadata:")
                for key, value in metadata.items():
                    metadata_comments.append(f"// {key}: {value}")

            # Combine everything
            new_content = "// Parametric model generated by OpenSCAD MCP Server\n"
            new_content += "\n".join(metadata_comments) + "\n\n" if metadata_comments else "\n"
            new_content += "// Parameters:\n"
            new_content += "\n".join(param_declarations) + "\n\n"
            new_content += content

            # Write to the new file
            with open(output_file, 'w') as f:
                f.write(new_content)

            logger.info(f"Exported parametric SCAD file: {output_file}")
            return True, output_file, None
        except Exception as e:
            error_msg = f"Error creating parametric SCAD file: {str(e)}"
            logger.error(error_msg)
            return False, "", error_msg

    def _add_metadata_to_file(self, file_path: str, metadata: Dict[str, Any], format_type: str) -> None:
        """
        Add metadata to supported file formats.

        Args:
            file_path: Path to the file
            metadata: Metadata to add
            format_type: File format
        """
        if format_type.lower() == "amf":
            self._add_metadata_to_amf(file_path, metadata)
        elif format_type.lower() == "3mf":
            self._add_metadata_to_3mf(file_path, metadata)

    def _add_metadata_to_amf(self, file_path: str, metadata: Dict[str, Any]) -> None:
        """Add metadata to AMF file."""
        try:
            import xml.etree.ElementTree as ET

            # Parse the AMF file
            tree = ET.parse(file_path)
            root = tree.getroot()

            # Find or create metadata element
            metadata_elem = root.find("metadata")
            if metadata_elem is None:
                metadata_elem = ET.SubElement(root, "metadata")

            # Add metadata
            for key, value in metadata.items():
                meta = ET.SubElement(metadata_elem, "meta", name=key)
                meta.text = str(value)

            # Write back to file
            tree.write(file_path)

            logger.info(f"Added metadata to AMF file: {file_path}")
        except Exception as e:
            logger.error(f"Error adding metadata to AMF file: {str(e)}")

    def _add_metadata_to_3mf(self, file_path: str, metadata: Dict[str, Any]) -> None:
        """Add metadata to 3MF file."""
        try:
            import zipfile
            import xml.etree.ElementTree as ET

            # 3MF files are ZIP archives
            with zipfile.ZipFile(file_path, 'a') as z:
                # Check if metadata file exists
                metadata_path = "Metadata/model_metadata.xml"
                try:
                    z.getinfo(metadata_path)
                    # Extract existing metadata
                    with z.open(metadata_path) as f:
                        tree = ET.parse(f)
                        root = tree.getroot()
                except KeyError:
                    # Create new metadata file
                    root = ET.Element("metadata")
                    tree = ET.ElementTree(root)

                # Add metadata
                for key, value in metadata.items():
                    meta = ET.SubElement(root, "meta", name=key)
                    meta.text = str(value)

                # Write metadata to a temporary file
                temp_path = file_path + ".metadata.tmp"
                tree.write(temp_path)

                # Add to ZIP
                z.write(temp_path, metadata_path)

                # Remove temporary file
                os.remove(temp_path)

            logger.info(f"Added metadata to 3MF file: {file_path}")
        except Exception as e:
            logger.error(f"Error adding metadata to 3MF file: {str(e)}")

    def get_supported_formats(self) -> List[str]:
        """Get list of supported export formats."""
        return list(self.supported_formats.keys())

    def get_format_description(self, format_name: str) -> str:
        """Get description of a format."""
        return self.supported_formats.get(format_name.lower(), "Unknown format")
```

--------------------------------------------------------------------------------
/src/ai/ai_service.py:
--------------------------------------------------------------------------------

```python
import os
import logging
import re
from typing import Dict, Any, Optional

logger = logging.getLogger(__name__)

class AIService:
    """
    Service for AI-driven OpenSCAD code generation.
    Translates natural language descriptions into OpenSCAD code.
    """

    def __init__(self, templates_dir: str, model_config: Optional[Dict[str, Any]] = None):
        """
        Initialize the AI service.

        Args:
            templates_dir: Directory containing OpenSCAD templates
            model_config: Optional configuration for the AI model
        """
        self.templates_dir = templates_dir
        self.model_config = model_config or {}

        # Load templates
        self.templates = self._load_templates()

        logger.info(f"Initialized AI service with {len(self.templates)} templates")

    def generate_openscad_code(self, context: Dict[str, Any]) -> str:
        """
        Generate OpenSCAD code from natural language description.

        Args:
            context: Dictionary containing:
                - description: Natural language description
                - parameters: Dictionary of parameters
                - templates_dir: Directory containing templates

        Returns:
            Generated OpenSCAD code
        """
        description = context.get("description", "")
        parameters = context.get("parameters", {})

        logger.info(f"Generating OpenSCAD code for: {description}")

        # Parse the description to identify key components
        components = self._parse_description(description)

        # Generate code based on identified components
        code = self._generate_code_from_components(components, parameters)

        return code

    def _load_templates(self) -> Dict[str, str]:
        """Load OpenSCAD code templates from the templates directory."""
        templates = {}

        # Check if templates directory exists
        if not os.path.exists(self.templates_dir):
            logger.warning(f"Templates directory not found: {self.templates_dir}")
            return templates

        # Load all .scad files in the templates directory
        for filename in os.listdir(self.templates_dir):
            if filename.endswith(".scad"):
                template_name = os.path.splitext(filename)[0]
                template_path = os.path.join(self.templates_dir, filename)

                try:
                    with open(template_path, 'r') as f:
                        templates[template_name] = f.read()
                except Exception as e:
                    logger.error(f"Error loading template {template_path}: {e}")

        return templates

    def _parse_description(self, description: str) -> Dict[str, Any]:
        """
        Parse a natural language description to identify key components.

        Args:
            description: Natural language description of the model

        Returns:
            Dictionary of identified components
        """
        components = {
            "primary_shape": None,
            "operations": [],
            "features": [],
            "modifiers": []
        }

        # Identify primary shape
        shape_patterns = {
            "cube": r'\b(cube|box|rectangular|block)\b',
            "sphere": r'\b(sphere|ball|round|circular)\b',
            "cylinder": r'\b(cylinder|tube|pipe|rod)\b',
            "cone": r'\b(cone|pyramid|tapered)\b',
            "torus": r'\b(torus|donut|ring)\b'
        }

        for shape, pattern in shape_patterns.items():
            if re.search(pattern, description, re.IGNORECASE):
                components["primary_shape"] = shape
                break

        # Identify operations
        operation_patterns = {
            "union": r'\b(combine|join|merge|add)\b',
            "difference": r'\b(subtract|remove|cut|hole|hollow)\b',
            "intersection": r'\b(intersect|common|shared)\b'
        }

        for operation, pattern in operation_patterns.items():
            if re.search(pattern, description, re.IGNORECASE):
                components["operations"].append(operation)

        # Identify features
        feature_patterns = {
            "rounded_corners": r'\b(rounded corners|fillets|chamfer)\b',
            "holes": r'\b(holes|perforations|openings)\b',
            "text": r'\b(text|label|inscription)\b',
            "pattern": r'\b(pattern|array|grid|repeat)\b'
        }

        for feature, pattern in feature_patterns.items():
            if re.search(pattern, description, re.IGNORECASE):
                components["features"].append(feature)

        # Identify modifiers
        modifier_patterns = {
            "scale": r'\b(scale|resize|proportion)\b',
            "rotate": r'\b(rotate|turn|spin|angle)\b',
            "translate": r'\b(move|shift|position|place)\b',
            "mirror": r'\b(mirror|reflect|flip)\b'
        }

        for modifier, pattern in modifier_patterns.items():
            if re.search(pattern, description, re.IGNORECASE):
                components["modifiers"].append(modifier)

        logger.info(f"Parsed components: {components}")

        return components

    def _generate_code_from_components(self, components: Dict[str, Any],
                                       parameters: Dict[str, Any]) -> str:
        """
        Generate OpenSCAD code based on identified components.

        Args:
            components: Dictionary of identified components
            parameters: Dictionary of parameters

        Returns:
            Generated OpenSCAD code
        """
        code = []

        # Add header
        code.append("// AI-generated OpenSCAD code")
        code.append("// Generated from natural language description")
        code.append("")

        # Add parameter declarations
        code.append("// Parameters")
        for param, value in parameters.items():
            if isinstance(value, str) and not (value.lower() == 'true' or value.lower() == 'false'):
                code.append(f'{param} = "{value}";')
            else:
                code.append(f"{param} = {value};")
        code.append("")

        # Generate code for primary shape
        primary_shape = components.get("primary_shape")
        if not primary_shape:
            primary_shape = "cube"  # Default to cube if no shape is identified

        # Start with operations if any
        operations = components.get("operations", [])
        if operations:
            for operation in operations:
                code.append(f"{operation}() {{")
                code.append(" // Primary shape")

        # Add modifiers if any
        modifiers = components.get("modifiers", [])
        indent = " " if operations else ""

        if modifiers:
            for modifier in modifiers:
                if modifier == "scale":
                    scale_value = parameters.get("scale", 1)
                    code.append(f"{indent}scale([{scale_value}, {scale_value}, {scale_value}])")
                elif modifier == "rotate":
                    angle = parameters.get("angle", 0)
                    code.append(f"{indent}rotate([0, 0, {angle}])")
                elif modifier == "translate":
                    x = parameters.get("x", 0)
                    y = parameters.get("y", 0)
                    z = parameters.get("z", 0)
                    code.append(f"{indent}translate([{x}, {y}, {z}])")
                elif modifier == "mirror":
                    code.append(f"{indent}mirror([0, 0, 1])")

        # Add the primary shape
        if primary_shape == "cube":
            width = parameters.get("width", 10)
            depth = parameters.get("depth", 10)
            height = parameters.get("height", 10)
            center = parameters.get("center", "true")
            code.append(f"{indent}cube([{width}, {depth}, {height}], center={center});")
        elif primary_shape == "sphere":
            radius = parameters.get("radius", 10)
            segments = parameters.get("segments", 32)
            code.append(f"{indent}sphere(r={radius}, $fn={segments});")
        elif primary_shape == "cylinder":
            radius = parameters.get("radius", 10)
            height = parameters.get("height", 20)
            center = parameters.get("center", "true")
            segments = parameters.get("segments", 32)
            code.append(f"{indent}cylinder(h={height}, r={radius}, center={center}, $fn={segments});")
        elif primary_shape == "cone":
            base_radius = parameters.get("base_radius", 10)
            height = parameters.get("height", 20)
            center = parameters.get("center", "true")
            segments = parameters.get("segments", 32)
            code.append(f"{indent}cylinder(h={height}, r1={base_radius}, r2=0, center={center}, $fn={segments});")
        elif primary_shape == "torus":
            major_radius = parameters.get("major_radius", 20)
            minor_radius = parameters.get("minor_radius", 5)
            segments = parameters.get("segments", 32)
            code.append(f"{indent}rotate_extrude($fn={segments})")
            code.append(f"{indent} translate([{major_radius}, 0, 0])")
            code.append(f"{indent} circle(r={minor_radius}, $fn={segments});")

        # Add features if any
        features = components.get("features", [])
        if features and "holes" in features:
            code.append("")
            code.append(f"{indent}// Add holes")
            code.append(f"{indent}difference() {{")
            code.append(f"{indent} children(0);")  # Reference the primary shape

            # Add a sample hole
            hole_radius = parameters.get("hole_radius", 2)
            code.append(f"{indent} translate([0, 0, 0])")
            code.append(f"{indent} cylinder(h=100, r={hole_radius}, center=true, $fn=32);")

            code.append(f"{indent}}}")

        # Close operations if any
        if operations:
            code.append("}")

        return "\n".join(code)
```
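
The test cases in `test_remote_cuda_mvs.py` above exercise the remote client one call at a time. Sequenced end to end, a caller would drive roughly the flow sketched below. This is a minimal sketch based only on the method names and response shapes the tests assert on; the server URL, API key, polling interval, and image file names are illustrative assumptions, not values taken from the repository.

```python
# Minimal sketch of the client flow exercised by test_remote_cuda_mvs.py.
# Assumptions: server URL, API key, polling interval, and image paths are illustrative.
import os
import time

from src.remote.cuda_mvs_client import CUDAMVSClient

os.makedirs("output/remote_jobs/models", exist_ok=True)
client = CUDAMVSClient(api_key="your-api-key", output_dir="output/remote_jobs")
server_url = "http://cuda-mvs-server:8765"

# 1. Upload the multi-view images, then start processing the resulting job.
upload = client.upload_images(server_url, ["view_front.png", "view_side.png"])
job_id = upload["job_id"]
client.process_job(server_url, job_id, {"quality": "normal", "output_format": "obj"})

# 2. Poll the job status until the server reports a terminal state.
while True:
    status = client.get_job_status(server_url, job_id)
    if status["status"] in ("completed", "failed"):
        break
    time.sleep(5)

# 3. Download the reconstructed model once the job has completed.
if status["status"] == "completed":
    result = client.download_model(server_url, job_id, "output/remote_jobs/models")
    print(f"Model saved to {result['model_path']}")
```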
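
`AIService.generate_openscad_code()` and `CADExporter.export_model()` above are natural companions: the first turns a description into OpenSCAD source, the second hands that source to the OpenSCAD binary. The sketch below shows one way they might be combined; it assumes OpenSCAD is installed, and the description, parameter values, and output paths are illustrative assumptions (only the `src/models/scad_templates` directory comes from the repository tree).

```python
# Minimal sketch combining AIService and CADExporter.
# Assumptions: description, parameter values, and output paths are illustrative;
# exporting requires a working `openscad` executable on the PATH.
import os

from src.ai.ai_service import AIService
from src.utils.cad_exporter import CADExporter

os.makedirs("output", exist_ok=True)

# Generate OpenSCAD source from a natural language description.
service = AIService(templates_dir="src/models/scad_templates")
code = service.generate_openscad_code({
    "description": "a hollow cylinder with holes",
    "parameters": {"radius": 15, "height": 40, "segments": 64},
})

# Persist the generated source, then export it to a parametric CAD format.
scad_path = "output/example_model.scad"
with open(scad_path, "w") as f:
    f.write(code)

exporter = CADExporter(openscad_path="openscad")
success, output_file, error = exporter.export_model(
    scad_path,
    output_format="csg",
    parameters={"radius": 15, "height": 40},
    metadata={"source": "AIService example"},
)
print(output_file if success else error)
```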