# Directory Structure ``` ├── .gitignore ├── addon.py ├── bc3_writer.py ├── dockerfile ├── LICENSE.md ├── pyproject.toml ├── README.md ├── resources │ ├── bc3_helper_files │ │ ├── element_categories.json │ │ ├── precios_unitarios.json │ │ ├── spatial_labels_en.json │ │ ├── spatial_labels_es.json │ │ └── unit_prices.json │ └── table_of_contents.json ├── tools.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` *.md __pycache__ exports ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # Bonsai-mcp - Model Context Protocol Integration for IFC through IfcOpenShell and Blender Bonsai-mcp is a fork of [BlenderMCP](https://github.com/ahujasid/blender-mcp) that extends the original functionality with dedicated support for IFC (Industry Foundation Classes) models through Bonsai. This integration is a platform to let LLMs read and modify IFC files. ## Features - **IFC-specific functionality**: Query IFC models, analyze spatial structures, examine building elements and extract quantities - **Fifteen IFC tools included**: Inspect project info, list entities, examine properties, explore spatial structure, analyze relationships and more - **Sequential Thinking**: Includes the sequential thinking tool from [modelcontextprotocol/servers](https://github.com/modelcontextprotocol/servers/tree/main/src/sequentialthinking) for structured problem solving - **Execute Code tool from the original BlenderMCP implementation**: Create and modify objects, apply materials, and execute Python code in Blender ## Components The system consists of two main components: 1. **Blender Addon (`addon.py`)**: A Blender addon that creates a socket server within Blender to receive and execute commands, including IFC-specific operations 2. **MCP Server (`tools.py`)**: A Python server that implements the Model Context Protocol and connects to the Blender addon ## Installation - Through MCP Client Settings ### Prerequisites - Blender 4.0 or newer - Python 3.12 or newer - uv package manager - Bonsai BIM addon for Blender (for IFC functionality) **Installing uv:** **Mac:** ```bash brew install uv ``` **Windows:** ```bash powershell -c "irm https://astral.sh/uv/install.ps1 | iex" set Path=C:\Users\[username]\.local\bin;%Path% ``` For other platforms, see the [uv installation guide](https://docs.astral.sh/uv/getting-started/installation/). ### Clone the repository ```bash git clone https://github.com/JotaDeRodriguez/Bonsai_mcp ``` ### Claude for Desktop Integration Edit your `claude_desktop_config.json` file (Claude > Settings > Developer > Edit Config) to include: ```json { "mcpServers": { "Bonsai-mcp": { "command": "uv", "args": [ "--directory", "\\your\\path\\to\\Bonsai_mcp", "run", "tools.py" ] } } } ``` ## Installation via Docker The repository comes with a Dockerfile that makes deployment simple and consistent across different environments. ## Quick Start ```bash # Clone the repository git clone https://github.com/JotaDeRodriguez/Bonsai_mcp cd Bonsai_mcp # Build the Docker image docker build -t bonsai_mcp . # Run the container docker run -p 8000:8000 --name bonsai_mcp bonsai_mcp ``` Once running, the container will expose the MCP tools as REST/OpenAPI endpoints at `http://localhost:8000`. 
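Once the container is up, each MCP tool is served as its own HTTP endpoint. As a minimal sketch (assuming mcpo's default convention of mapping each tool to a POST route named after it — the generated `/docs` page lists the exact paths), you could call one of the tools from Python with `httpx`:

```python
# Hypothetical client call against the Dockerized API (default port 8000).
# The route name assumes mcpo exposes each MCP tool as POST /<tool_name>;
# check http://localhost:8000/docs for the routes actually generated.
import httpx

BASE_URL = "http://localhost:8000"

response = httpx.post(f"{BASE_URL}/get_ifc_project_info", json={}, timeout=60.0)
response.raise_for_status()
print(response.json())
```
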
- To verify your installation, open your browser and navigate to - `http://localhost:8000/docs` - You'll see the Swagger UI with all available endpoints - Test an endpoint by clicking on it, then click "Try it out" and "Execute" ### Connecting to Open WebUI or Other API Clients To connect this API to Open WebUI: 1. In Open WebUI, go to Settings > Manage Tool Servers 2. Add a new connection with: - URL: `http://localhost:8000` - Path to OpenAPI spec: `/openapi.json` - Authentication: None (unless configured otherwise) ### Environment Variables The Docker container accepts several environment variables to customize its behavior: ```bash # Example with custom settings docker run -p 8000:8000 \ -e BLENDER_HOST=host.docker.internal \ -e BLENDER_PORT=9876 \ -e MCP_HOST=0.0.0.0 \ -e MCP_PORT=8000 \ --name bonsai_mcp bonsai_mcp ``` ## Installing the Blender Addon 1. Download the `addon.py` file from this repo 2. Open Blender 3. Go to Edit > Preferences > Add-ons 4. Click "Install..." and select the `addon.py` file 5. Enable the addon by checking the box next to "Interface: Blender MCP - IFC" ## Usage ### Starting the Connection 1. In Blender, go to the 3D View sidebar (press N if not visible) 2. Find the "Blender MCP - IFC" tab 3. Click "Connect to Claude" 4. Make sure the MCP server is running ### Using with Claude Once connected, you'll see a hammer icon in Claude's interface with tools for the Blender MCP IFC integration. ## IFC Tools This repo includes multiple IFC-specific tools that enable comprehensive querying and manipulation of IFC models: **get_ifc_project_info**: Retrieves basic information about the IFC project, including name, description, and counts of different entity types. Example: "What is the basic information about this IFC project?" **list_ifc_entities**: Lists IFC entities of a specific type (walls, doors, spaces, etc.) with options to limit results and filter by selection. Example: "List all the walls in this IFC model" or "Show me the windows in this building" **get_ifc_properties**: Retrieves all properties of a specific IFC entity by its GlobalId or from currently selected objects. Example: "What are the properties of this wall with ID 1Dvrgv7Tf5IfTEapMkwDQY?" **get_ifc_spatial_structure**: Gets the spatial hierarchy of the IFC model (site, building, storeys, spaces). Example: "Show me the spatial structure of this building" **get_ifc_relationships**: Retrieves all relationships for a specific IFC entity. Example: "What are the relationships of the entrance door?" **get_selected_ifc_entities**: Gets information about IFC entities corresponding to objects currently selected in the Blender UI. Example: "Tell me about the elements I've selected in Blender" **get_user_view**: Captures the current Blender viewport as an image, allowing visualization of the model from the user's perspective. Example: "Show me what the user is currently seeing in Blender" **export_ifc_data**: Exports IFC data to a structured JSON or CSV file, with options to filter by entity type or building level. Example: "Export all wall data to a CSV file" **place_ifc_object**: Creates and positions an IFC element in the model at specified coordinates with optional rotation. Example: "Place a door at coordinates X:10, Y:5, Z:0 with 90 degrees rotation" **get_ifc_quantities**: Calculates and returns quantities (m2, m3, etc.) for IFC elements, with options to filter by entity type or by the current selection. 
Example: "Give me the area of all the walls in the building using the tool get_ifc_quantities" **get_ifc_total_structure**: Retrieves the complete hierarchical structure of the IFC model including spatial elements (Project, Site, Building, Storeys) and all building elements within each spatial container. This comprehensive view combines spatial hierarchy with building elements, essential for generating complete reports and budgets. Example: "Show me the complete structure of this IFC model including all building elements organized by floor" **export_drawing_png**: Exports 2D and 3D drawings as high-resolution PNG images with customizable resolution and view parameters. Creates orthographic plan views from above at specified height offsets. Example: "Generate a floor plan PNG for the ground floor at 1920x1080 resolution" **get_ifc_georeferencing_info**: Retrieves comprehensive georeferencing information from IFC files including coordinate reference systems (CRS), map conversions, world coordinate systems, true north direction, and site geographic coordinates. Example: "What georeferencing information is available in this IFC model?" **georeference_ifc_model**: Creates or updates georeferencing information in IFC models, allowing you to set coordinate reference systems using EPSG codes or custom CRS definitions, establish map conversions with eastings/northings coordinates, and configure site geographic positioning. Example: "Georeference this IFC model using EPSG:4326 with coordinates at latitude 40.7589, longitude -73.9851" **export_bc3_budget**: Exports a BC3 budget file (FIEBDC-3/2016 format) based on the IFC model loaded in Blender. This tool creates a complete construction budget by extracting the IFC spatial structure, grouping building elements by type and category (structure, masonry, slabs, carpentry, installations, furniture), assigning unit prices from a comprehensive database, and generating detailed measurements. Supports multi-language output (Spanish/English) with proper encoding for international characters. The BC3 format is the Spanish standard for construction budgets and cost estimation. 
Example: "Generate a BC3 budget file in Spanish for this building model" ### Features - **Automatic element categorization**: Building elements are automatically classified into categories: - **ESTR**: Structural elements (beams, columns, footings, piles, ramps, stairs) - **ALB**: Masonry (walls) - **FORG**: Slabs and roofs - **CARP**: Carpentry (doors, windows) - **INST**: Installations (pipes, fittings, terminals, railings) - **MOB**: Furniture - **Accurate measurements**: - Walls measured by NetSideArea (accounts for openings like doors and windows) - Slabs and roofs measured by GrossVolume - Beams, columns, and piles measured by length (meters) - Doors, windows, and furniture counted as units - **Multi-language support**: Generate budgets in Spanish or English with proper character encoding (windows-1252) - **Hierarchical structure**: Budget chapters follow the IFC spatial hierarchy (Project → Site → Building → Storey) - **Unit price database**: Includes comprehensive unit prices for common construction elements, fully customizable via JSON files - **Sorted measurements**: Elements within each category are sorted alphabetically for easier review ### Configuration Files The BC3 export uses external JSON configuration files located in `resources/bc3_helper_files/`: - `precios_unitarios.json` / `unit_prices.json`: Unit prices per IFC element type - `spatial_labels_es.json` / `spatial_labels_en.json`: Spatial element translations - `element_categories.json`: IFC type to budget category mappings These files can be customized to adapt the budget generation to specific project needs or regional pricing standards. ### Output BC3 files are exported to the `exports/` folder with proper FIEBDC-3/2016 format, including: - Complete hierarchical chapter structure - Detailed measurements for each element - Unit prices and totals - Full compliance with Spanish construction budget standard bc3 ## MCP Resources This integration provides access to structured documentation through MCP resources: **file://table_of_contents.md**: Contains the complete technical report structure template for generating comprehensive building reports. This resource provides a standardized table of contents that can be used as a reference when creating technical documentation from IFC models. ## MCP Prompts The server includes specialized MCP Prompts for automated report generation: **Technical_building_report**: Generates comprehensive technical building reports based on IFC models loaded in Blender. This prompt provides a structured workflow for creating professional architectural documentation in multiple languages (English, Spanish, French, German, Italian, Portuguese). The prompt guides the analysis through systematic data extraction from the IFC model, including spatial structure, quantities, materials, and building systems, culminating in a complete technical report with drawings and 3D visualizations. ## Execute Blender Code Legacy feature from the original MCP implementation. Allows Claude to execute arbitrary Python code in Blender. Use with caution. ## Sequential Thinking Tool This integration includes the Sequential Thinking tool for structured problem-solving and analysis. It facilitates a step-by-step thinking process that can branch, revise, and adapt as understanding deepens - perfect for complex IFC model analysis or planning tasks. 
Example: "Use sequential thinking to analyze this building's energy efficiency based on the IFC model" ## Example Commands Here are some examples of what you can ask Claude to do with IFC models: - "Analyze this IFC model and tell me how many walls, doors and windows it has" - "Show me the spatial structure of this building model" - "List all spaces in this IFC model and their properties" - "Identify all structural elements in this building" - "What are the relationships between this wall and other elements?" - "Generate a report of the measurements from the IFC model opened in Blender" - "Use sequential thinking to create a maintenance plan for this building based on the IFC model" - "Generate a BC3 budget file in Spanish for the current IFC model" - "Export a construction cost estimate to BC3 format with English descriptions" ## Troubleshooting - **Connection issues**: Make sure the Blender addon server is running, and the MCP server is configured in Claude - **IFC model not loading**: Verify that you have the Bonsai BIM addon installed and that an IFC file is loaded - **Timeout errors**: Try simplifying your requests or breaking them into smaller steps **Docker:** - **"Connection refused" errors**: Make sure Blender is running and the addon is enabled with the server started - **CORS issues**: The API has CORS enabled by default for all origins. If you encounter issues, check your client's CORS settings - **Performance concerns**: For large IFC models, the API responses might be slower. Consider adjusting timeouts in your client ## Technical Details The IFC integration uses the Bonsai BIM module to access ifcopenshell functionality within Blender. The communication follows the same JSON-based protocol over TCP sockets as the original BlenderMCP. ## Limitations & Security Considerations - The `execute_blender_code` tool from the original project is still available, allowing running arbitrary Python code in Blender. Use with caution and always save your work. - Complex IFC models may require breaking down operations into smaller steps. - IFC query performance depends on model size and complexity. - Get User View tool returns a base64 encoded image. Please ensure the client supports it. ## Contributions This MIT licensed repo is open to be forked, modified and used in any way. I'm open to ideas and collaborations, so don't hesitate to get in contact with me for contributions. 
## Credits - Original BlenderMCP by [Siddharth Ahuja](https://github.com/ahujasid/blender-mcp) - Sequential Thinking tool from [modelcontextprotocol/servers](https://github.com/modelcontextprotocol/servers/tree/main/src/sequentialthinking) - IFC integration built upon the Bonsai BIM addon for Blender ## TO DO Integration and testing with more MCP Clients ``` -------------------------------------------------------------------------------- /LICENSE.md: -------------------------------------------------------------------------------- ```markdown MIT License Copyright (c) 2025 Juan Rodriguez Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ``` -------------------------------------------------------------------------------- /resources/bc3_helper_files/spatial_labels_es.json: -------------------------------------------------------------------------------- ```json { "spatial_labels": { "IfcProject": "Proyecto", "IfcSite": "Parcela", "IfcBuilding": "Edificio", "IfcBuildingStorey": "Planta", "IfcBridge": "Puente", "IfcBridgePart": "Subestructura" } } ``` -------------------------------------------------------------------------------- /resources/bc3_helper_files/spatial_labels_en.json: -------------------------------------------------------------------------------- ```json { "spatial_labels": { "IfcProject": "Project", "IfcSite": "Site", "IfcBuilding": "Building", "IfcBuildingStorey": "Building Storey", "IfcBridge": "Bridge", "IfcBridgePart": "Substructure" } } ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "Bonsai_mcp" version = "0.1.0" description = "A minimal MCP server to illustrate the interaction between Bonsai BIM (Blender) and Claude." 
readme = "README.md" requires-python = ">=3.10" dependencies = [ "httpx>=0.28.1", "mcp[cli]>=1.4.1", "pillow>=11.3.0", ] authors = [ {name = "JotaDeRodriguez"} ] license = {text = "MIT"} [project.urls] "Homepage" = "https://github.com/JotaDeRodriguez/Bonsai_mcp" "Bug Tracker" = "https://github.com/JotaDeRodriguez/Bonsai_mcp/issues" ``` -------------------------------------------------------------------------------- /resources/bc3_helper_files/element_categories.json: -------------------------------------------------------------------------------- ```json { "element_categories": { "ESTR": [ "IfcBeam", "IfcColumn", "IfcFooting", "IfcPile", "IfcRamp", "IfcStair" ], "ALB": [ "IfcWall", "IfcWallStandardCase" ], "FORG": [ "IfcSlab", "IfcRoof", "IfcCovering" ], "CARP": [ "IfcDoor", "IfcWindow" ], "INST": [ "IfcFlowSegment", "IfcFlowFitting", "IfcFlowTerminal", "IfcDistributionElement", "IfcRailing" ], "MOB": [ "IfcFurnishingElement", "IfcFurniture" ] } } ``` -------------------------------------------------------------------------------- /dockerfile: -------------------------------------------------------------------------------- ```dockerfile FROM python:3.11-slim WORKDIR /app # Copy your application files COPY tools.py /app/ # Install dependencies RUN pip install mcpo uv # Set environment variables with defaults ENV MCP_HOST="0.0.0.0" ENV MCP_PORT=8000 ENV BLENDER_HOST="host.docker.internal" ENV BLENDER_PORT=9876 # Expose the port EXPOSE ${MCP_PORT} # Create a startup script that will modify the tools.py file before running RUN echo '#!/bin/bash\n\ # Replace localhost with the BLENDER_HOST environment variable in tools.py\n\ sed -i "s/host=\"localhost\"/host=\"$BLENDER_HOST\"/g" tools.py\n\ sed -i "s/host='\''localhost'\''/host='\''$BLENDER_HOST'\''/g" tools.py\n\ # Print the modification for debugging\n\ echo "Modified Blender host to: $BLENDER_HOST"\n\ # Run the MCPO server\n\ uvx mcpo --host $MCP_HOST --port $MCP_PORT -- python tools.py\n\ ' > /app/start.sh && chmod +x /app/start.sh # Run the startup script CMD ["/app/start.sh"] ``` -------------------------------------------------------------------------------- /resources/table_of_contents.json: -------------------------------------------------------------------------------- ```json { "title": "Technical Report – Basic Building Project", "children": { "1": { "title": "Introduction", "children": { "1.1": "Project objective", "1.2": "Documentation scope", "1.3": "Background and justification" } }, "2": { "title": "General Building Data", "children": { "2.1": "Location and site", "2.2": "Building intended use", "2.3": "Built and usable area (per floor)", "2.4": "Capacity (users, dwellings, premises, etc.)", "2.5": "Urban planning and technical regulations applicable" } }, "3": { "title": "Environmental Conditions", "children": { "3.1": "Site description", "3.2": "Urban environment and access", "3.3": "Topographic and climatic conditions" } }, "4": { "title": "Architectural Solution", "children": { "4.1": "General design criteria", "4.2": "Functional distribution and space organization", "4.3": "Facade and volumetric description", "4.4": "Accessibility and evacuation" } }, "5": { "title": "Planned Construction Systems", "children": { "5.1": "Structure (type and main material)", "5.2": "Foundation (adopted criteria)", "5.3": "External enclosures", "5.4": "Roofs and carpentry", "5.5": "Main interior finishes" } }, "6": { "title": "General Installations", "children": { "6.1": "Water supply and drainage", "6.2": "Electricity and 
telecommunications", "6.3": "Air conditioning and ventilation (if applicable)", "6.4": "Fire protection (basic criteria)", "6.5": "Renewable energy (if applicable)" } }, "7": { "title": "Safety and Health", "children": { "7.1": "Main risk identification", "7.2": "Safety criteria adopted" } }, "8": { "title": "Regulatory Compliance", "children": { "8.1": "Urban planning regulations", "8.2": "Habitability and accessibility regulations", "8.3": "Fire safety regulations", "8.4": "Energy efficiency regulations" } }, "9": { "title": "Areas and Area Schedule", "children": { "9.1": "Summary table of built and usable areas", "9.2": "Occupancy and buildability ratios" } }, "10": { "title": "Conclusions", "children": { "10.1": "Justification of the adopted solution", "10.2": "Adaptation to environment and regulations" } }, "11": { "title": "Annexes", "children": { "11.1": "Basic project plans", "11.2": "Complementary documentation (if applicable)" } } } } ``` -------------------------------------------------------------------------------- /resources/bc3_helper_files/unit_prices.json: -------------------------------------------------------------------------------- ```json { "prices": [ { "code": "ESTR001", "ifc_class": "IfcBeam", "description": "Beam structural element", "long_description": "Structural beam element in concrete, steel or wood. Includes formwork, reinforcement and concrete pouring.", "unit": "m", "unit_price": 150.00 }, { "code": "ESTR002", "ifc_class": "IfcColumn", "description": "Column structural element", "long_description": "Structural column element in concrete, steel or composite. Includes formwork, reinforcement, concrete and finishes.", "unit": "m", "unit_price": 180.00 }, { "code": "ESTR003", "ifc_class": "IfcFooting", "description": "Footing structural element", "long_description": "Foundation footing element. Includes excavation, lean concrete, reinforcement, concrete and waterproofing.", "unit": "m3", "unit_price": 120.00 }, { "code": "ESTR004", "ifc_class": "IfcPile", "description": "Pile structural element", "long_description": "Deep foundation pile element. Includes drilling, reinforcement cage and concrete filling.", "unit": "m", "unit_price": 200.00 }, { "code": "ESTR005", "ifc_class": "IfcRamp", "description": "Ramp structural element", "long_description": "Accessibility ramp structure. Includes concrete base, reinforcement, surface finishing and handrails.", "unit": "m2", "unit_price": 140.00 }, { "code": "ESTR006", "ifc_class": "IfcStair", "description": "Stair structural element", "long_description": "Staircase unit including structure, steps, risers, handrails and surface finishes.", "unit": "ud", "unit_price": 160.00 }, { "code": "ALB001", "ifc_class": "IfcWall", "description": "Wall masonry element", "long_description": "Masonry wall construction. Includes brickwork or blockwork, mortar, insulation and plaster finishes on both sides.", "unit": "m2", "unit_price": 85.00 }, { "code": "ALB002", "ifc_class": "IfcWallStandardCase", "description": "WallStandardCase masonry element", "long_description": "Standard masonry wall with standard layers. Includes structural blocks, thermal insulation and interior/exterior finishes.", "unit": "m2", "unit_price": 90.00 }, { "code": "FORG001", "ifc_class": "IfcSlab", "description": "Slab forging element", "long_description": "Concrete slab construction. 
Includes formwork, reinforcement mesh, concrete pouring and surface finishing.", "unit": "m3", "unit_price": 110.00 }, { "code": "FORG002", "ifc_class": "IfcRoof", "description": "Roof forging element", "long_description": "Roof structure and covering system. Includes structural layer, thermal insulation, waterproofing membrane and protective layer.", "unit": "m2", "unit_price": 130.00 }, { "code": "FORG003", "ifc_class": "IfcCovering", "description": "Covering forging element", "long_description": "Floor or ceiling covering installation. Includes base preparation, adhesive, covering material and finishing joints.", "unit": "m2", "unit_price": 95.00 }, { "code": "CARP001", "ifc_class": "IfcDoor", "description": "Door carpentry element", "long_description": "Door unit including frame, door leaf, hinges, lock, handles and finishes. Installation and adjustment included.", "unit": "ud", "unit_price": 250.00 }, { "code": "CARP002", "ifc_class": "IfcWindow", "description": "Window carpentry element", "long_description": "Window unit with frame, glazing, opening mechanism, weatherstripping and hardware. Installation and sealing included.", "unit": "ud", "unit_price": 300.00 }, { "code": "INST001", "ifc_class": "IfcFlowSegment", "description": "FlowSegment installation element", "long_description": "Pipe or duct segment for fluid distribution systems. Includes material, fittings, supports and installation.", "unit": "m", "unit_price": 75.00 }, { "code": "INST002", "ifc_class": "IfcFlowFitting", "description": "FlowFitting installation element", "long_description": "Pipe or duct fitting element (elbow, tee, reducer, etc.). Includes material, gaskets and installation.", "unit": "ud", "unit_price": 60.00 }, { "code": "INST003", "ifc_class": "IfcFlowTerminal", "description": "FlowTerminal installation element", "long_description": "Terminal device for HVAC or plumbing systems (diffuser, outlet, tap, etc.). Includes device, connections and commissioning.", "unit": "ud", "unit_price": 80.00 }, { "code": "INST004", "ifc_class": "IfcDistributionElement", "description": "DistributionElement installation element", "long_description": "General distribution system element. Includes component, accessories, supports and installation work.", "unit": "ud", "unit_price": 70.00 }, { "code": "INST005", "ifc_class": "IfcRailing", "description": "Railing installation element", "long_description": "Railing or guardrail system. Includes posts, handrail, infill panels, anchors and surface treatment.", "unit": "m", "unit_price": 100.00 }, { "code": "MOB001", "ifc_class": "IfcFurnishingElement", "description": "FurnishingElement furniture element", "long_description": "Built-in furnishing element. Includes manufacturing, finishing, hardware, delivery and installation.", "unit": "ud", "unit_price": 120.00 }, { "code": "MOB002", "ifc_class": "IfcFurniture", "description": "Furniture element", "long_description": "Furniture unit or piece. Includes product, assembly, adjustment and placement in final position.", "unit": "ud", "unit_price": 150.00 } ] } ``` -------------------------------------------------------------------------------- /resources/bc3_helper_files/precios_unitarios.json: -------------------------------------------------------------------------------- ```json { "prices": [ { "code": "ESTR001", "ifc_class": "IfcBeam", "description": "Viga estructural", "long_description": "Elemento de viga estructural en hormigón, acero o madera. 
Incluye encofrado, armadura y vertido de hormigón.", "unit": "m", "unit_price": 150.00 }, { "code": "ESTR002", "ifc_class": "IfcColumn", "description": "Pilar estructural", "long_description": "Elemento de pilar estructural en hormigón, acero o mixto. Incluye encofrado, armadura, hormigón y acabados.", "unit": "m", "unit_price": 180.00 }, { "code": "ESTR003", "ifc_class": "IfcFooting", "description": "Zapata de cimentación", "long_description": "Elemento de zapata de cimentación. Incluye excavación, hormigón de limpieza, armadura, hormigón e impermeabilización.", "unit": "m3", "unit_price": 120.00 }, { "code": "ESTR004", "ifc_class": "IfcPile", "description": "Pilote de cimentación", "long_description": "Elemento de pilote de cimentación profunda. Incluye perforación, jaula de armado y relleno de hormigón.", "unit": "m", "unit_price": 200.00 }, { "code": "ESTR005", "ifc_class": "IfcRamp", "description": "Rampa de accesibilidad", "long_description": "Estructura de rampa de accesibilidad. Incluye base de hormigón, armadura, acabado superficial y barandillas.", "unit": "m2", "unit_price": 140.00 }, { "code": "ESTR006", "ifc_class": "IfcStair", "description": "Escalera estructural", "long_description": "Unidad de escalera incluyendo estructura, peldaños, tabicas, barandillas y acabados superficiales.", "unit": "ud", "unit_price": 160.00 }, { "code": "ALB001", "ifc_class": "IfcWall", "description": "Muro de albañilería", "long_description": "Construcción de muro de albañilería. Incluye fábrica de ladrillo o bloque, mortero, aislamiento y enfoscado a ambas caras.", "unit": "m2", "unit_price": 85.00 }, { "code": "ALB002", "ifc_class": "IfcWallStandardCase", "description": "Muro estándar de albañilería", "long_description": "Muro de albañilería con capas estándar. Incluye bloques estructurales, aislamiento térmico y acabados interior/exterior.", "unit": "m2", "unit_price": 90.00 }, { "code": "FORG001", "ifc_class": "IfcSlab", "description": "Forjado de hormigón", "long_description": "Construcción de forjado de hormigón. Incluye encofrado, malla de armado, vertido de hormigón y acabado superficial.", "unit": "m3", "unit_price": 110.00 }, { "code": "FORG002", "ifc_class": "IfcRoof", "description": "Cubierta", "long_description": "Sistema de estructura y cobertura de cubierta. Incluye capa estructural, aislamiento térmico, membrana impermeabilizante y capa de protección.", "unit": "m2", "unit_price": 130.00 }, { "code": "FORG003", "ifc_class": "IfcCovering", "description": "Revestimiento", "long_description": "Instalación de revestimiento de suelo o techo. Incluye preparación de base, adhesivo, material de revestimiento y juntas de acabado.", "unit": "m2", "unit_price": 95.00 }, { "code": "CARP001", "ifc_class": "IfcDoor", "description": "Puerta de carpintería", "long_description": "Unidad de puerta incluyendo marco, hoja, bisagras, cerradura, manillas y acabados. Instalación y ajuste incluidos.", "unit": "ud", "unit_price": 250.00 }, { "code": "CARP002", "ifc_class": "IfcWindow", "description": "Ventana de carpintería", "long_description": "Unidad de ventana con marco, acristalamiento, mecanismo de apertura, burletes y herrajes. Instalación y sellado incluidos.", "unit": "ud", "unit_price": 300.00 }, { "code": "INST001", "ifc_class": "IfcFlowSegment", "description": "Tramo de instalación", "long_description": "Segmento de tubería o conducto para sistemas de distribución de fluidos. 
Incluye material, accesorios, soportes e instalación.", "unit": "m", "unit_price": 75.00 }, { "code": "INST002", "ifc_class": "IfcFlowFitting", "description": "Accesorio de instalación", "long_description": "Elemento de accesorio de tubería o conducto (codo, te, reducción, etc.). Incluye material, juntas e instalación.", "unit": "ud", "unit_price": 60.00 }, { "code": "INST003", "ifc_class": "IfcFlowTerminal", "description": "Terminal de instalación", "long_description": "Dispositivo terminal para sistemas de climatización o fontanería (difusor, salida, grifo, etc.). Incluye dispositivo, conexiones y puesta en marcha.", "unit": "ud", "unit_price": 80.00 }, { "code": "INST004", "ifc_class": "IfcDistributionElement", "description": "Elemento de distribución", "long_description": "Elemento general de sistema de distribución. Incluye componente, accesorios, soportes y trabajos de instalación.", "unit": "ud", "unit_price": 70.00 }, { "code": "INST005", "ifc_class": "IfcRailing", "description": "Barandilla", "long_description": "Sistema de barandilla o guarda. Incluye postes, pasamanos, paneles de relleno, anclajes y tratamiento superficial.", "unit": "m", "unit_price": 100.00 }, { "code": "MOB001", "ifc_class": "IfcFurnishingElement", "description": "Elemento de mobiliario", "long_description": "Elemento de mobiliario fijo. Incluye fabricación, acabados, herrajes, entrega e instalación.", "unit": "ud", "unit_price": 120.00 }, { "code": "MOB002", "ifc_class": "IfcFurniture", "description": "Mueble", "long_description": "Unidad o pieza de mobiliario. Incluye producto, montaje, ajuste y colocación en posición final.", "unit": "ud", "unit_price": 150.00 } ] } ``` -------------------------------------------------------------------------------- /bc3_writer.py: -------------------------------------------------------------------------------- ```python import json from pathlib import Path from typing import Dict, List, Any, Union from datetime import datetime from collections import defaultdict # Base path for BC3 helper files BASE_PATH = Path(__file__).parent / 'resources' / 'bc3_helper_files' class IFC2BC3Converter: """ Converts IFC structures (JSON) to BC3 format for construction budgets. Architecture: - Loads and validates input data (IFC structure, quantities, prices) - Generates chapter hierarchy from IFC spatial structure - Groups building elements into budget items by type - Exports to FIEBDC-3 (BC3) format with windows-1252 encoding Method Groups: 1. Initialization & Configuration Loading 2. Data Parsing & Indexing 3. Code Generation & Formatting 4. IFC Element Classification 5. Quantity & Measurement Extraction 6. BC3 Record Building 7. Core Processing & Conversion Logic 8. Public API Methods """ # Class constants SPATIAL_TYPES = {'IfcProject', 'IfcSite', 'IfcBuilding', 'IfcBuildingStorey', 'IfcBridge', 'IfcBridgePart'} IGNORED_TYPES = {'IfcSpace', 'IfcAnnotation', 'IfcGrid', 'IfcAxis'} def __init__(self, structure_data: Union[str, Dict], quantities_data: Union[str, Dict], language: str = 'es'): """ Initializes the converter with input data. Args: structure_data: JSON string or dict with IFC structure quantities_data: JSON string or dict with IFC quantities language: Language for the budget ('es' or 'en'). 
Default 'es' """ # Parse input data self.structure_data = self._parse_json_input(structure_data) self.quantities_data = self._parse_json_input(quantities_data) self.quantities_by_id = self._index_quantities() # Configuration self.language = language self.unit_prices = self._load_unit_prices() self.spatial_labels = self._load_spatial_labels() self.element_categories = self._load_element_categories() # Counters using defaultdict for simplification self.chapter_counters = defaultdict(int) self.item_counters = defaultdict(int) # Registry of items and positions self.items_per_chapter = defaultdict(set) self.item_positions = defaultdict(dict) # Global registry of created concepts (to avoid duplicates) self.created_concepts = set() # Invert mapping for O(1) lookup self._ifc_to_category = self._build_ifc_category_map() # Cache for code-to-position conversions self._position_cache = {} # ============================================================================ # 1. INITIALIZATION & CONFIGURATION LOADING # ============================================================================ # Methods that load external configuration from JSON files and build # internal data structures during initialization. def _load_unit_prices(self) -> Dict[str, Dict]: """ Loads unit prices from JSON file based on language. Optimized: Loads all prices at once (more efficient than lazy loading since typically most types are used in an IFC model). Returns: Dict with ifc_class as key and dict {code, description, long_description, unit, price} as value """ filename = 'precios_unitarios.json' if self.language == 'es' else 'unit_prices.json' prices_path = BASE_PATH / filename if not prices_path.exists(): print(f"Warning: Unit prices file not found at {prices_path}") return {} try: with open(prices_path, 'r', encoding='utf-8') as f: data = json.load(f) # Dict comprehension is faster than loop + assignment return { item['ifc_class']: { 'code': item['code'], 'description': item['description'], 'long_description': item['long_description'], 'unit': item['unit'], 'price': item['unit_price'] } for item in data.get('prices', []) } except Exception as e: print(f"Error loading unit prices: {e}") return {} def _load_spatial_labels(self) -> Dict[str, str]: """ Loads spatial element labels from JSON file according to language. Returns: Dict with IFC type as key and translated label as value """ filename = f'spatial_labels_{self.language}.json' labels_path = BASE_PATH / filename if not Path(labels_path).exists(): print(f"Warning: Spatial labels file not found at {labels_path}") return {} try: with open(labels_path, 'r', encoding='utf-8') as f: data = json.load(f) return data.get('spatial_labels', {}) except Exception as e: print(f"Error loading spatial labels: {e}") return {} def _load_element_categories(self) -> Dict[str, set]: """ Loads element categories from JSON file. 
Returns: Dict with category code as key and set of IFC types as value """ categories_path = BASE_PATH / 'element_categories.json' if not Path(categories_path).exists(): print(f"Warning: Element categories file not found at {categories_path}") return {} try: with open(categories_path, 'r', encoding='utf-8') as f: data = json.load(f) # Convert lists to sets for O(1) membership testing return { category: set(ifc_types) for category, ifc_types in data.get('element_categories', {}).items() } except Exception as e: print(f"Error loading element categories: {e}") return {} def _build_ifc_category_map(self) -> Dict[str, str]: """Builds reverse mapping of IFC type -> category for O(1) lookup.""" return { ifc_type: category for category, types in self.element_categories.items() for ifc_type in types } # ============================================================================ # 2. DATA PARSING & INDEXING # ============================================================================ # Methods that parse and index input data for efficient access during # conversion process. @staticmethod def _parse_json_input(data: Union[str, Dict]) -> Dict: """Parses input that can be JSON string or dict.""" return json.loads(data) if isinstance(data, str) else data def _index_quantities(self) -> Dict[str, Dict]: """Indexes quantities by element ID for O(1) access.""" elements = self.quantities_data.get('elements', []) return {elem['id']: elem for elem in elements} # ============================================================================ # 3. CODE GENERATION & FORMATTING # ============================================================================ # Methods that generate hierarchical codes, format positions, and escape # text for BC3 format compliance. def _generate_chapter_code(self, parent_code: str = '') -> str: """Generates a hierarchical chapter code.""" # Root level uses sequential numbering: 01#, 02#, 03#... if parent_code == 'R_A_I_Z##': self.chapter_counters['root'] += 1 return f'{self.chapter_counters["root"]:02d}#' # Sub-levels use hierarchical notation: 01.01#, 01.01.01#... base_code = parent_code.rstrip('#') self.chapter_counters[base_code] += 1 return f'{base_code}.{self.chapter_counters[base_code]:02d}#' def _generate_item_code(self, category: str, chapter_code: str = None) -> str: """Generates a unique code for a budget item globally (not per chapter).""" # Use only category as key to ensure global uniqueness self.item_counters[category] += 1 return f"{category}{self.item_counters[category]:03d}" def _chapter_code_to_position(self, chapter_code: str) -> str: """ Converts chapter code to position format with caching. Example: '01.02.03#' -> '1\\2\\3' """ if chapter_code in self._position_cache: return self._position_cache[chapter_code] clean_code = chapter_code.rstrip('#') parts = clean_code.split('.') position_parts = [str(int(part)) for part in parts] result = '\\'.join(position_parts) self._position_cache[chapter_code] = result return result @staticmethod def _escape_bc3_text(text: str) -> str: """Escapes special characters for BC3 format.""" if not text: return '' # Normalize and clean whitespace text = str(text).strip().replace('\r', ' ').replace('\n', ' ').replace('\t', ' ') # Escape BC3 special characters return text.replace('|', ' ').replace('~', '-') # ============================================================================ # 4. 
IFC ELEMENT CLASSIFICATION # ============================================================================ # Methods that classify, categorize and filter IFC elements based on their # type and properties. def _get_category_code(self, ifc_type: str) -> str: """Gets category code for an IFC type (O(1) lookup).""" return self._ifc_to_category.get(ifc_type, 'OTROS') def _get_spatial_element_label(self, ifc_type: str) -> str: """Gets translated label for spatial elements from loaded JSON.""" return self.spatial_labels.get(ifc_type, ifc_type) @classmethod def _is_spatial_element(cls, ifc_type: str) -> bool: """Determines if an IFC element is spatial (container).""" return ifc_type in cls.SPATIAL_TYPES @classmethod def _is_ignored_element(cls, ifc_type: str) -> bool: """Determines if an element should be ignored.""" return ifc_type in cls.IGNORED_TYPES def _group_elements_by_type(self, elements: List[Dict]) -> Dict[str, List[Dict]]: """Groups elements by IFC type, ignoring invalid types.""" groups = defaultdict(list) for elem in elements: if not self._is_ignored_element(elem['type']): groups[elem['type']].append(elem) return groups def _is_unit_based_element(self, ifc_type: str) -> bool: """ Determines if an element is measured by unit (no dimensions needed). Unit-based elements: doors, windows, furniture, stairs, railings, fittings, terminals. """ unit_based_types = { 'IfcDoor', 'IfcWindow', # CARP - Carpentry 'IfcFurnishingElement', 'IfcFurniture', # MOB - Furniture 'IfcStair', # ESTR - Stairs (counted as units) 'IfcFlowFitting', 'IfcFlowTerminal', 'IfcDistributionElement', 'IfcRailing' # INST - Installations } return ifc_type in unit_based_types def _is_linear_element(self, ifc_type: str) -> bool: """ Determines if an element is measured by length (meters). Linear elements: beams, columns, piles. """ linear_types = { 'IfcBeam', # ESTR - Beams 'IfcColumn', # ESTR - Columns 'IfcPile' # ESTR - Piles } return ifc_type in linear_types # ============================================================================ # 5. QUANTITY & MEASUREMENT EXTRACTION # ============================================================================ # Methods that extract quantities, dimensions, and measurements from IFC # elements and format them for BC3 records. def _get_quantities_for_element(self, element_id: str) -> Dict[str, float]: """Gets quantities for an element.""" return self.quantities_by_id.get(element_id, {}).get('quantities', {}) @staticmethod def _get_measurement_dimensions(quantities: Dict[str, float], ifc_type: str = None) -> tuple: """ Extracts dimensions from quantities based on element type. 
- Walls (IfcWall*): Use NetSideArea (accounts for doors/windows) - Slabs/Roofs (IfcSlab, IfcRoof): Use GrossVolume - Other elements: Use NetVolume or fallback values Returns (units, length, width, height) """ if not quantities: return (1.0, 0.0, 0.0, 0.0) # Walls: ONLY use NetSideArea (lateral area without openings) if ifc_type and ifc_type.startswith('IfcWall'): net_side_area = quantities.get('NetSideArea', 0.0) # Force return NetSideArea for walls, even if 0 return (1.0, net_side_area, 0.0, 0.0) # Slabs and Roofs: ONLY use GrossVolume if ifc_type in ('IfcSlab', 'IfcRoof'): gross_volume = quantities.get('GrossVolume', 0.0) # Force return GrossVolume for slabs/roofs, even if 0 return (1.0, gross_volume, 0.0, 0.0) # Priority 1: Use NetVolume (accounts for openings and voids) net_volume = quantities.get('NetVolume', 0.0) if net_volume > 0: return (1.0, net_volume, 0.0, 0.0) # Priority 2: Use NetSideArea as fallback net_side_area = quantities.get('NetSideArea', 0.0) if net_side_area > 0: return (1.0, net_side_area, 0.0, 0.0) # Priority 3: Use GrossVolume or GrossSideArea as fallback gross_volume = quantities.get('GrossVolume', 0.0) gross_side_area = quantities.get('GrossSideArea', 0.0) if gross_volume > 0: return (1.0, gross_volume, 0.0, 0.0) elif gross_side_area > 0: return (1.0, gross_side_area, 0.0, 0.0) # Priority 4: Use basic dimensions (for linear elements) length = quantities.get('Length', 0.0) width = quantities.get('Width', 0.0) height = quantities.get('Height', 0.0) return (1.0, length, width, height) def _get_item_data(self, ifc_type: str, category: str, chapter_code: str) -> Dict[str, Any]: """Gets all necessary data to create a budget item.""" price_data = self.unit_prices.get(ifc_type, {}) return { 'code': price_data.get('code', self._generate_item_code(category, chapter_code)), 'description': price_data.get('description', ifc_type.replace('Ifc', '')), 'long_description': price_data.get('long_description', f"Item for {ifc_type}"), 'unit': price_data.get('unit', 'ud'), 'price': price_data.get('price', 100.0) } def _create_measurement_lines(self, elements: List[Dict], ifc_type: str) -> List[str]: """Creates measurement lines for a list of elements, sorted alphabetically by name.""" # Sort elements by name before processing (handle None values) sorted_elements = sorted(elements, key=lambda e: e.get('name') or '') measurement_lines = [] for idx, elem in enumerate(sorted_elements, 1): elem_name = self._escape_bc3_text(elem.get('name', f'Element {idx}')) # Elements measured by unit (doors, windows, furniture) don't need dimensions if self._is_unit_based_element(ifc_type): line_parts = [elem_name, "1.000", "", "", ""] # Linear elements (beams, columns, piles) measured by length elif self._is_linear_element(ifc_type): quantities = self._get_quantities_for_element(elem['id']) length = quantities.get('Length', 0.0) line_parts = [ elem_name, "1.000", f"{length:.2f}" if length > 0 else "", "", "" ] else: quantities = self._get_quantities_for_element(elem['id']) units, length, width, height = self._get_measurement_dimensions(quantities, ifc_type) line_parts = [ elem_name, f"{units:.3f}", f"{length:.2f}" if length > 0 else "", f"{width:.2f}" if width > 0 else "", f"{height:.2f}" if height > 0 else "" ] measurement_lines.append('\\'.join(line_parts)) return measurement_lines # ============================================================================ # 6. 
BC3 RECORD BUILDING # ============================================================================ # Methods that construct individual BC3 format records (~V, ~K, ~C, ~D, ~T, ~M). # These are the low-level builders for BC3 file structure. @staticmethod def _create_bc3_header() -> List[str]: """Creates BC3 file header lines.""" date_code = datetime.now().strftime('%d%m%Y') return [ f'~V||FIEBDC-3/2016\\{date_code}|IFC2BC3 Converter|\\|ANSI||', '~K|3\\3\\3\\2\\2\\2\\2\\2\\|0\\0\\0\\0\\0\\|3\\2\\\\2\\2\\\\2\\2\\2\\3\\3\\3\\3\\2\\EUR\\|' ] def _build_chapter_record(self, code: str, name: str) -> str: """Builds ~C record for a chapter.""" return f"~C|{code}\\||{name}|0\\||||||" def _build_decomposition_record(self, code: str, child_codes: List[str]) -> str: """Builds ~D decomposition record.""" children_str = '\\'.join([f"{c}\\\\1.000" for c in child_codes]) return f"~D|{code}|{children_str}|" def _build_item_record(self, code: str, unit: str, name: str, price: float, date: str) -> str: """Builds ~C record for a budget item.""" return f"~C|{code}|{unit}|{name}|{price:.2f}||{date}|" def _build_text_record(self, code: str, description: str) -> str: """Builds ~T descriptive text record.""" return f"~T|{code}|{description}|" def _build_measurement_record(self, chapter_code: str, item_code: str, position: str, measurement_content: str) -> str: """Builds ~M measurements record.""" return f"~M|{chapter_code}\\{item_code}|{position}|0|\\{measurement_content}\\|" # ============================================================================ # 7. CORE PROCESSING & CONVERSION LOGIC # ============================================================================ # Methods that orchestrate the conversion process by processing spatial # structure and building elements recursively. def _process_spatial_node(self, node: Dict, parent_code: str, lines: List[str], depth: int = 0) -> str: """Recursively processes a spatial node (chapter) from IFC structure.""" if self._is_ignored_element(node['type']): return None # Generate chapter code and name code = 'R_A_I_Z##' if depth == 0 else self._generate_chapter_code(parent_code) label = self._get_spatial_element_label(node['type']) node_name = node.get('name', '') full_name = f"{label} - {node_name}" if node_name else label name = self._escape_bc3_text(full_name) # Add chapter record lines.append(self._build_chapter_record(code, name)) decomposition_codes = [] # Process building elements building_elements = node.get('building_elements', []) if building_elements: item_codes = self._process_building_elements(building_elements, code, lines) decomposition_codes.extend(item_codes) # Process spatial children recursively for child in node.get('children', []): if self._is_spatial_element(child['type']): child_code = self._process_spatial_node(child, code, lines, depth + 1) if child_code: decomposition_codes.append(child_code) # Add decomposition record if decomposition_codes: lines.append(self._build_decomposition_record(code, decomposition_codes)) return code def _process_building_elements(self, elements: List[Dict], chapter_code: str, lines: List[str]) -> List[str]: """ Processes building elements and groups them by category. Optimized: Batch operations to reduce concatenation overhead. 
""" created_items = [] chapter_key = chapter_code.rstrip('#') # Group elements by type elements_by_type = self._group_elements_by_type(elements) # Pre-calculate common values outside loop chapter_position = self._chapter_code_to_position(chapter_code) date_str = datetime.now().strftime("%d%m%Y") # Process each group for ifc_type, type_elements in elements_by_type.items(): category = self._get_category_code(ifc_type) item_key = f"{ifc_type}_{chapter_key}" # Check if this item already exists in this chapter if item_key in self.items_per_chapter[chapter_key]: continue self.items_per_chapter[chapter_key].add(item_key) # Get item data item_data = self._get_item_data(ifc_type, category, chapter_code) item_code = item_data['code'] # Register position position = len(self.item_positions[chapter_key]) + 1 self.item_positions[chapter_key][item_code] = position # Escape texts (batch) name = self._escape_bc3_text(item_data['description']) long_desc = self._escape_bc3_text(item_data['long_description']) batch_records = [] # Only create ~C and ~T records if this concept hasn't been created globally if item_code not in self.created_concepts: self.created_concepts.add(item_code) batch_records.extend([ self._build_item_record(item_code, item_data['unit'], name, item_data['price'], date_str), self._build_text_record(item_code, long_desc) ]) # Always create measurements for this chapter measurement_lines = self._create_measurement_lines(type_elements, ifc_type) full_position = f"{chapter_position}\\{position}" measurement_content = '\\\\'.join(measurement_lines) batch_records.append( self._build_measurement_record(chapter_code, item_code, full_position, measurement_content) ) # Add batch at once (more efficient than 3 individual appends) lines.extend(batch_records) created_items.append(item_code) return created_items # ============================================================================ # 8. PUBLIC API METHODS # ============================================================================ # Public methods that provide the main interface for converting and # exporting BC3 files. def convert(self) -> str: """Performs complete conversion and returns BC3 file content.""" lines = self._create_bc3_header() # Process structure from root # Try different possible root keys root = self.structure_data.get('structure') if not root and 'type' in self.structure_data: # If structure_data itself is the root node root = self.structure_data if root: self._process_spatial_node(root, '', lines, depth=0) else: print(f"Warning: No structure found. 
Keys available: {list(self.structure_data.keys())}") return '\n'.join(lines) def export(self, output_filename: str = 'ifc2bc3.bc3'): """Exports BC3 file to exports folder.""" script_dir = Path(__file__).parent exports_dir = script_dir / 'exports' exports_dir.mkdir(exist_ok=True) bc3_content = self.convert() output_path = exports_dir / output_filename with open(output_path, 'w', encoding='windows-1252', newline='\r\n', errors='strict') as f: f.write(bc3_content) print(f"BC3 file successfully exported: {output_path}") return output_path ``` -------------------------------------------------------------------------------- /tools.py: -------------------------------------------------------------------------------- ```python # blender_mcp_server.py from mcp.server.fastmcp import FastMCP, Context, Image import socket import json import asyncio import logging from dataclasses import dataclass from contextlib import asynccontextmanager from typing import AsyncIterator, Dict, Any, List, TypedDict import os from pathlib import Path import base64 from urllib.parse import urlparse from typing import Optional import sys from bc3_writer import IFC2BC3Converter # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("BlenderMCPServer") @dataclass class BlenderConnection: host: str port: int sock: socket.socket = None # Changed from 'socket' to 'sock' to avoid naming conflict def connect(self) -> bool: """Connect to the Blender addon socket server""" if self.sock: return True try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) logger.info(f"Connected to Blender at {self.host}:{self.port}") return True except Exception as e: logger.error(f"Failed to connect to Blender: {str(e)}") self.sock = None return False def disconnect(self): """Disconnect from the Blender addon""" if self.sock: try: self.sock.close() except Exception as e: logger.error(f"Error disconnecting from Blender: {str(e)}") finally: self.sock = None def receive_full_response(self, sock, buffer_size=8192): """Receive the complete response, potentially in multiple chunks""" chunks = [] # Use a consistent timeout value that matches the addon's timeout sock.settimeout(15.0) # Match the addon's timeout try: while True: try: chunk = sock.recv(buffer_size) if not chunk: # If we get an empty chunk, the connection might be closed if not chunks: # If we haven't received anything yet, this is an error raise Exception("Connection closed before receiving any data") break chunks.append(chunk) # Check if we've received a complete JSON object try: data = b''.join(chunks) json.loads(data.decode('utf-8')) # If we get here, it parsed successfully logger.info(f"Received complete response ({len(data)} bytes)") return data except json.JSONDecodeError: # Incomplete JSON, continue receiving continue except socket.timeout: # If we hit a timeout during receiving, break the loop and try to use what we have logger.warning("Socket timeout during chunked receive") break except (ConnectionError, BrokenPipeError, ConnectionResetError) as e: logger.error(f"Socket connection error during receive: {str(e)}") raise # Re-raise to be handled by the caller except socket.timeout: logger.warning("Socket timeout during chunked receive") except Exception as e: logger.error(f"Error during receive: {str(e)}") raise # If we get here, we either timed out or broke out of the loop # Try to use what we have if chunks: data = b''.join(chunks) 
logger.info(f"Returning data after receive completion ({len(data)} bytes)") try: # Try to parse what we have json.loads(data.decode('utf-8')) return data except json.JSONDecodeError: # If we can't parse it, it's incomplete raise Exception("Incomplete JSON response received") else: raise Exception("No data received") def send_command(self, command_type: str, params: Dict[str, Any] | None = None) -> Dict[str, Any]: """Send a command to Blender and return the response""" if not self.sock and not self.connect(): raise ConnectionError("Not connected to Blender") command = { "type": command_type, "params": params or {} } try: # Log the command being sent logger.info(f"Sending command: {command_type} with params: {params}") # Send the command self.sock.sendall(json.dumps(command).encode('utf-8')) logger.info(f"Command sent, waiting for response...") # Set a timeout for receiving - use the same timeout as in receive_full_response self.sock.settimeout(15.0) # Match the addon's timeout # Receive the response using the improved receive_full_response method response_data = self.receive_full_response(self.sock) logger.info(f"Received {len(response_data)} bytes of data") response = json.loads(response_data.decode('utf-8')) logger.info(f"Response parsed, status: {response.get('status', 'unknown')}") if response.get("status") == "error": logger.error(f"Blender error: {response.get('message')}") raise Exception(response.get("message", "Unknown error from Blender")) return response.get("result", {}) except socket.timeout: logger.error("Socket timeout while waiting for response from Blender") # Don't try to reconnect here - let the get_blender_connection handle reconnection # Just invalidate the current socket so it will be recreated next time self.sock = None raise Exception("Timeout waiting for Blender response - try simplifying your request") except (ConnectionError, BrokenPipeError, ConnectionResetError) as e: logger.error(f"Socket connection error: {str(e)}") self.sock = None raise Exception(f"Connection to Blender lost: {str(e)}") except json.JSONDecodeError as e: logger.error(f"Invalid JSON response from Blender: {str(e)}") # Try to log what was received if 'response_data' in locals() and response_data: logger.error(f"Raw response (first 200 bytes): {response_data[:200]}") raise Exception(f"Invalid response from Blender: {str(e)}") except Exception as e: logger.error(f"Error communicating with Blender: {str(e)}") # Don't try to reconnect here - let the get_blender_connection handle reconnection self.sock = None raise Exception(f"Communication error with Blender: {str(e)}") @asynccontextmanager async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]: """Manage server startup and shutdown lifecycle""" # We don't need to create a connection here since we're using the global connection # for resources and tools try: # Just log that we're starting up logger.info("BlenderMCP server starting up") # Try to connect to Blender on startup to verify it's available try: # This will initialize the global connection if needed blender = get_blender_connection() logger.info("Successfully connected to Blender on startup") except Exception as e: logger.warning(f"Could not connect to Blender on startup: {str(e)}") logger.warning("Make sure the Blender addon is running before using Blender resources or tools") # Return an empty context - we're using the global connection yield {} finally: # Clean up the global connection on shutdown global _blender_connection if _blender_connection: 
logger.info("Disconnecting from Blender on shutdown") _blender_connection.disconnect() _blender_connection = None logger.info("BlenderMCP server shut down") # Create the MCP server with lifespan support mcp = FastMCP( "Bonsai MCP", description="IFC manipulation through Blender and MCP", lifespan=server_lifespan ) # Resource endpoints # Global connection for resources (since resources can't access context) _blender_connection = None def get_blender_connection(): """Get or create a persistent Blender connection""" global _blender_connection # If we have an existing connection, check if it's still valid if _blender_connection is not None: try: # Simple ping to check if connection is still alive _blender_connection.send_command("get_ifc_project_info") return _blender_connection except Exception as e: # Connection is dead, close it and create a new one logger.warning(f"Existing connection is no longer valid: {str(e)}") try: _blender_connection.disconnect() except: pass _blender_connection = None # Create a new connection if needed if _blender_connection is None: _blender_connection = BlenderConnection(host="localhost", port=9876) if not _blender_connection.connect(): logger.error("Failed to connect to Blender") _blender_connection = None raise Exception("Could not connect to Blender. Make sure the Blender addon is running.") logger.info("Created new persistent connection to Blender") return _blender_connection # ------------------------------- # MCP TOOLS # ------------------------------- @mcp.tool() def execute_blender_code(ctx: Context, code: str) -> str: """ Execute arbitrary Python code in Blender. Parameters: - code: The Python code to execute """ try: # Get the global connection blender = get_blender_connection() result = blender.send_command("execute_code", {"code": code}) return f"Code executed successfully: {result.get('result', '')}" except Exception as e: logger.error(f"Error executing code: {str(e)}") return f"Error executing code: {str(e)}" ### IFC Tools @mcp.tool() def get_ifc_project_info() -> str: """ Get basic information about the IFC project, including name, description, and counts of different entity types. Returns: A JSON-formatted string with project information """ try: blender = get_blender_connection() result = blender.send_command("get_ifc_project_info") # Return the formatted JSON of the results return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error getting IFC project info: {str(e)}") return f"Error getting IFC project info: {str(e)}" @mcp.tool() def get_selected_ifc_entities() -> str: """ Get IFC entities corresponding to the currently selected objects in Blender. This allows working specifically with objects the user has manually selected in the Blender UI. Returns: A JSON-formatted string with information about the selected IFC entities """ try: blender = get_blender_connection() result = blender.send_command("get_selected_ifc_entities") # Return the formatted JSON of the results return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error getting selected IFC entities: {str(e)}") return f"Error getting selected IFC entities: {str(e)}" # Modify the existing list_ifc_entities function to accept a selected_only parameter @mcp.tool() def list_ifc_entities(entity_type: str | None = None, limit: int = 50, selected_only: bool = False) -> str: """ List IFC entities of a specific type. Can be filtered to only include objects currently selected in the Blender UI. 
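    A minimal usage sketch (entity names, counts and GlobalIds below are illustrative, not taken from a real model; the JSON shape mirrors what the addon handler returns):

        list_ifc_entities(entity_type="IfcWall", limit=2)
        -> '{"type": "IfcWall", "total_count": 24,
             "entities": [{"id": "2O2Fr$t4X7Zf8NOew3FNr2", "name": "Basic Wall:Exterior - 200mm"},
                          {"id": "1hOSvn6df7F8w7GcBWlR72", "name": "Basic Wall:Exterior - 200mm"}]}'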
Args: entity_type: Type of IFC entity to list (e.g., "IfcWall") limit: Maximum number of entities to return selected_only: If True, only return information about selected objects Returns: A JSON-formatted string listing the specified entities """ try: blender = get_blender_connection() result = blender.send_command("list_ifc_entities", { "entity_type": entity_type, "limit": limit, "selected_only": selected_only }) # Return the formatted JSON of the results return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error listing IFC entities: {str(e)}") return f"Error listing IFC entities: {str(e)}" # Modify the existing get_ifc_properties function to accept a selected_only parameter @mcp.tool() def get_ifc_properties(global_id: str | None = None, selected_only: bool = False) -> str: """ Get properties of IFC entities. Can be used to get properties of a specific entity by GlobalId, or to get properties of all currently selected objects in Blender. Args: global_id: GlobalId of a specific IFC entity (optional if selected_only is True) selected_only: If True, return properties for all selected objects instead of a specific entity Returns: A JSON-formatted string with entity information and properties """ try: blender = get_blender_connection() # Validate parameters if not global_id and not selected_only: return json.dumps({"error": "Either global_id or selected_only must be specified"}, indent=2) result = blender.send_command("get_ifc_properties", { "global_id": global_id, "selected_only": selected_only }) # Return the formatted JSON of the results return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error getting IFC properties: {str(e)}") return f"Error getting IFC properties: {str(e)}" @mcp.tool() def get_ifc_spatial_structure() -> str: """ Get the spatial structure of the IFC model (site, building, storey, space hierarchy). Returns: A JSON-formatted string representing the hierarchical structure of the IFC model """ try: blender = get_blender_connection() result = blender.send_command("get_ifc_spatial_structure") # Return the formatted JSON of the results return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error getting IFC spatial structure: {str(e)}") return f"Error getting IFC spatial structure: {str(e)}" @mcp.tool() def get_ifc_total_structure() -> str: """ Get the complete IFC structure including spatial hierarchy and all building elements. This function extends the basic spatial structure to include building elements like walls, doors, windows, columns, beams, etc. that are contained within each spatial element. It provides a comprehensive view of how the building is organized both spatially and in terms of its physical components. Returns: A JSON-formatted string representing the complete hierarchical structure of the IFC model including spatial elements and their contained building elements, plus summary statistics """ try: blender = get_blender_connection() result = blender.send_command("get_ifc_total_structure") # Return the formatted JSON of the results return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error getting IFC total structure: {str(e)}") return f"Error getting IFC total structure: {str(e)}" @mcp.tool() def get_ifc_relationships(global_id: str) -> str: """ Get all relationships for a specific IFC entity. 
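    A hedged sketch of the response shape (the relationship keys mirror the addon handler; the entity and its neighbours are invented for illustration):

        get_ifc_relationships("2O2Fr$t4X7Zf8NOew3FNr2")
        -> '{"id": "2O2Fr$t4X7Zf8NOew3FNr2", "type": "IfcDoor", "name": "Entrance Door",
             "relationships": {"contains": [],
                               "contained_in": [{"id": "...", "type": "IfcBuildingStorey", "name": "Level 1"}],
                               "connects": [], "connected_by": [], "defines": [], "defined_by": []}}'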
Args: global_id: GlobalId of the IFC entity Returns: A JSON-formatted string with all relationships the entity participates in """ try: blender = get_blender_connection() result = blender.send_command("get_ifc_relationships", { "global_id": global_id }) # Return the formatted JSON of the results return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error getting IFC relationships: {str(e)}") return f"Error getting IFC relationships: {str(e)}" @mcp.tool() def export_ifc_data( entity_type: str | None = None, level_name: str | None = None, output_format: str = "csv", ctx: Context | None = None ) -> str: """ Export IFC data to a file in JSON or CSV format. This tool extracts IFC data and creates a structured export file. You can filter by entity type and/or building level, and choose the output format. Args: entity_type: Type of IFC entity to export (e.g., "IfcWall") - leave empty for all entities level_name: Name of the building level to filter by (e.g., "Level 1") - leave empty for all levels output_format: "json" or "csv" format for the output file Returns: Confirmation message with the export file path or an error message """ try: # Get Blender connection blender = get_blender_connection() # Validate output format if output_format not in ["json", "csv"]: return "Error: output_format must be 'json' or 'csv'" # Execute the export code in Blender result = blender.send_command("export_ifc_data", { "entity_type": entity_type, "level_name": level_name, "output_format": output_format }) # Check for errors from Blender if isinstance(result, dict) and "error" in result: return f"Error: {result['error']}" # Return the result with export summary # return result return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error exporting IFC data: {str(e)}") return f"Error exporting IFC data: {str(e)}" @mcp.tool() def place_ifc_object( type_name: str, x: float, y: float, z: float, rotation: float = 0.0, ctx: Context| None = None ) -> str: """ Place an IFC object at a specified location with optional rotation. This tool allows you to create and position IFC elements in the model. The object is placed using the specified IFC type and positioned at the given coordinates with optional rotation around the Z axis. Args: type_name: Name of the IFC element type to place (must exist in the model) x: X-coordinate in model space y: Y-coordinate in model space z: Z-coordinate in model space rotation: Rotation angle in degrees around the Z axis (default: 0) Returns: A message with the result of the placement operation """ try: # Get Blender connection blender = get_blender_connection() # Send command to place the object result = blender.send_command("place_ifc_object", { "type_name": type_name, "location": [x, y, z], "rotation": rotation }) # Check for errors if isinstance(result, dict) and "error" in result: return f"Error placing object: {result['error']}" # Format success message if isinstance(result, dict) and result.get("success"): return (f"Successfully placed '{type_name}' object at ({x}, {y}, {z}) " f"with {rotation}° rotation.\nObject name: {result.get('blender_name')}, " f"Global ID: {result.get('global_id')}") # Return the raw result as string if it's not a success or error dict return f"Placement result: {json.dumps(result, indent=2)}" except Exception as e: logger.error(f"Error placing IFC object: {str(e)}") return f"Error placing IFC object: {str(e)}" @mcp.tool() def get_user_view() -> Image: """ Capture and return the current Blender viewport as an image. 
Shows what the user is currently seeing in Blender. Focus mostly on the 3D viewport. Use the UI to assist in your understanding of the scene but only refer to it if specifically prompted. Args: max_dimension: Maximum dimension (width or height) in pixels for the returned image compression_quality: Image compression quality (1-100, higher is better quality but larger) Returns: An image of the current Blender viewport """ max_dimension = 800 compression_quality = 85 # Use PIL to compress the image from PIL import Image as PILImage import io try: # Get the global connection blender = get_blender_connection() # Request current view result = blender.send_command("get_current_view") if "error" in result: # logger.error(f"Error getting view from Blender: {result.get('error')}") raise Exception(f"Error getting current view: {result.get('error')}") # Extract image information if "data" not in result or "width" not in result or "height" not in result: # logger.error("Incomplete image data returned from Blender") raise Exception("Incomplete image data returned from Blender") # Decode the base64 image data image_data = base64.b64decode(result["data"]) original_width = result["width"] original_height = result["height"] original_format = result.get("format", "png") # Compression is only needed if the image is large if original_width > 800 or original_height > 800 or len(image_data) > 1000000: # logger.info(f"Compressing image (original size: {len(image_data)} bytes)") # Open image from binary data img = PILImage.open(io.BytesIO(image_data)) # Resize if needed if original_width > max_dimension or original_height > max_dimension: # Calculate new dimensions maintaining aspect ratio if original_width > original_height: new_width = max_dimension new_height = int(original_height * (max_dimension / original_width)) else: new_height = max_dimension new_width = int(original_width * (max_dimension / original_height)) # Resize using high-quality resampling img = img.resize((new_width, new_height), PILImage.Resampling.LANCZOS) # Convert to RGB if needed if img.mode == 'RGBA': img = img.convert('RGB') # Save as JPEG with compression output = io.BytesIO() img.save(output, format='JPEG', quality=compression_quality, optimize=True) compressed_data = output.getvalue() # logger.info(f"Image compressed from {len(image_data)} to {len(compressed_data)} bytes") # Return compressed image return Image(data=compressed_data, format="jpeg") else: # Image is small enough, return as-is return Image(data=image_data, format=original_format) except Exception as e: # logger.error(f"Error processing viewport image: {str(e)}") raise Exception(f"Error processing viewport image: {str(e)}") @mcp.tool() def get_ifc_quantities() -> str: """ Extract and get basic qtos about the IFC project. Returns: A JSON-formatted string with project quantities information """ try: blender = get_blender_connection() result = blender.send_command("get_ifc_quantities") # Return the formatted JSON of the results return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error getting IFC project quantities: {str(e)}") return f"Error getting IFC project quantities: {str(e)}" @mcp.tool() def export_bc3_budget(language: str = 'es') -> str: """ Export a BC3 budget file (FIEBDC-3/2016) based on the IFC model loaded in Blender. This tool creates a complete construction budget in BC3 format by: 1. Extracting the complete IFC spatial structure (Project → Site → Building → Storey) 2. Extracting IFC quantities and measurements for all building elements 3. 
Converting to BC3 hierarchical format with IFC2BC3Converter: - Generates budget chapters from IFC spatial hierarchy - Groups building elements by type and categories defined in external JSON - Assigns unit prices from language-specific JSON database - Creates detailed measurements sorted alphabetically 4. Exports to BC3 file with windows-1252 encoding Features: - Multi-language support (Spanish/English) for descriptions and labels - Automatic element categorization using external JSON configuration - Optimized conversion with O(1) lookups and batch operations - Detailed measurements with dimensions (units, length, width, height) - Full FIEBDC-3/2016 format compliance Configuration files (in resources/bc3_helper_files/): - precios_unitarios.json / unit_prices.json: Unit prices per IFC type - spatial_labels_es.json / spatial_labels_en.json: Spatial element translations - element_categories.json: IFC type to category mappings Args: language: Language for the budget file ('es' for Spanish, 'en' for English). Default is 'es'. Returns: A confirmation message with the path to the generated BC3 file in the exports/ folder. """ try: # Get IFC data logger.info("Getting IFC data...") ifc_total_structure = get_ifc_total_structure() ifc_quantities = get_ifc_quantities() # Validate that we got valid JSON responses # If there's an error, these functions return error strings starting with "Error" if isinstance(ifc_total_structure, str) and ifc_total_structure.startswith("Error"): return f"Failed to get IFC structure: {ifc_total_structure}" if isinstance(ifc_quantities, str) and ifc_quantities.startswith("Error"): return f"Failed to get IFC quantities: {ifc_quantities}" # Try to parse the JSON to ensure it's valid try: structure_data = json.loads(ifc_total_structure) if isinstance(ifc_total_structure, str) else ifc_total_structure quantities_data = json.loads(ifc_quantities) if isinstance(ifc_quantities, str) else ifc_quantities except json.JSONDecodeError as e: return f"Invalid JSON data received from Blender. Structure error: {str(e)}" converter = IFC2BC3Converter(structure_data, quantities_data, language=language) output_path = converter.export() return f"BC3 file successfully created at: {output_path}" except Exception as e: logger.error(f"Error creating BC3 budget: {str(e)}") return f"Error creating BC3 budget: {str(e)}" # WIP, not ready to be implemented: # @mcp.tool() # def create_plan_view(height_offset: float = 0.5, view_type: str = "top", # resolution_x: int = 400, resolution_y: int = 400, # output_path: str = None) -> Image: # """ # Create a plan view (top-down view) at the specified height above the first building story. 
# Args: # height_offset: Height in meters above the building story (default 0.5m) # view_type: Type of view - "top", "front", "right", "left" (note: only "top" is fully implemented) # resolution_x: Horizontal resolution of the render in pixels - Keep it small, max 800 x 800, recomended 400 x 400 # resolution_y: Vertical resolution of the render in pixels # output_path: Optional path to save the rendered image # Returns: # A rendered image showing the plan view of the model # """ # try: # # Get the global connection # blender = get_blender_connection() # # Request an orthographic render # result = blender.send_command("create_orthographic_render", { # "view_type": view_type, # "height_offset": height_offset, # "resolution_x": resolution_x, # "resolution_y": resolution_y, # "output_path": output_path # Can be None to use a temporary file # }) # if "error" in result: # raise Exception(f"Error creating plan view: {result.get('error', 'Unknown error')}") # if "data" not in result: # raise Exception("No image data returned from Blender") # # Decode the base64 image data # image_data = base64.b64decode(result["data"]) # # Return as an Image object # return Image(data=image_data, format="png") # except Exception as e: # logger.error(f"Error creating plan view: {str(e)}") # raise Exception(f"Error creating plan view: {str(e)}") @mcp.tool() def export_drawing_png( height_offset: float = 0.5, view_type: str = "top", resolution_x: int = 1920, resolution_y: int = 1080, storey_name: str | None = None, output_path: str | None = None ) -> dict: """Export drawings as PNG images with custom resolution. Creates a drawing, with the view type specified, of the IFC building at the specified height above the floor level. Supports custom resolution for high-quality architectural drawings. Args: height_offset: Height in meters above the storey level for the camera position (default 0.5m) view_type: Type of view - "top" for plan view, "front", "right" and "left" for elevation views, and "isometric" for 3D view resolution_x: Horizontal resolution in pixels (default 1920, max recommended 4096) resolution_y: Vertical resolution in pixels (default 1080, max recommended 4096) storey_name: Specific storey name to add to the file name (if None, prints default in the file name) output_path: Optional file path to save the PNG (if None, returns as base64 image) Returns: metadata and the path of the file image of the drawing at the specified resolution """ try: # Validate resolution limits for performance if resolution_x > 4096 or resolution_y > 4096: raise Exception("Resolution too high. Maximum recommended: 4096x4096 pixels") if resolution_x < 100 or resolution_y < 100: raise Exception("Resolution too low. 
Minimum: 100x100 pixels") # Get the global connection blender = get_blender_connection() # Request drawing render result = blender.send_command("export_drawing_png", { "view_type": view_type, "height_offset": height_offset, "resolution_x": resolution_x, "resolution_y": resolution_y, "storey_name": storey_name, "output_path": output_path }) if "error" in result: raise Exception(f"Error creating {view_type} drawing: {result.get('error', 'Unknown error')}") if "data" not in result: raise Exception("No image data returned from Blender") # Decode the base64 image data image_data = base64.b64decode(result["data"]) # Ensure output path exists if not output_path: os.makedirs("./exports/drawings", exist_ok=True) # Generate filename based on view type view_name = { "top": "plan_view", "front": "front_elevation", "right": "right_elevation", "left": "left_elevation", "isometric": "isometric_view" }.get(view_type, view_type) filename = f"{view_name}_{storey_name or 'default'}.png" output_path = os.path.join("./exports/drawings", filename) # Save to file with open(output_path, "wb") as f: f.write(image_data) # Return only metadata return { "status": "success", "file_path": os.path.abspath(output_path), # Optional: if you serve the exports folder over HTTP, you could also return a URL here # "url": f"http://localhost:8000/files/{filename}" } except Exception as e: logger.error(f"Error exporting drawing: {str(e)}") return { "status": "error", "message": str(e) } @mcp.tool() def get_ifc_georeferencing_info(include_contexts: bool = False) -> str: """ Checks whether the IFC currently opened in Bonsai/BlenderBIM is georeferenced and returns the key georeferencing information. Parameters ---------- include_contexts : bool If True, adds a breakdown of the RepresentationContexts and operations. Returns -------- str (JSON pretty-printed) { "georeferenced": true|false, "crs": { "name": str|null, "geodetic_datum": str|null, "vertical_datum": str|null, "map_unit": str|null }, "map_conversion": { "eastings": float|null, "northings": float|null, "orthogonal_height": float|null, "scale": float|null, "x_axis_abscissa": float|null, "x_axis_ordinate": float|null }, "world_coordinate_system": { "origin": [x, y, z]|null }, "true_north": { "direction_ratios": [x, y]|null }, "site": { "local_placement_origin": [x, y, z]|null, "ref_latitude": [deg, min, sec, millionth]|null, "ref_longitude": [deg, min, sec, millionth]|null, "ref_elevation": float|null }, "contexts": [...], # only if include_contexts = true "warnings": [ ... ] # Informational messages } Notes ----- - This tool acts as a wrapper: it sends the "get_ifc_georeferencing_info" command to the Blender add-on. The add-on must implement that logic (reading IfcProject/IfcGeometricRepresentationContext, IfcMapConversion, TargetCRS, IfcSite.RefLatitude/RefLongitude/RefElevation, etc.). - It always returns a JSON string with indentation for easier reading.
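    Example
    -------
    A hedged sketch of a possible response for a georeferenced model (the CRS name and coordinate values are invented for illustration):

        get_ifc_georeferencing_info()
        -> {
             "georeferenced": true,
             "crs": {"name": "EPSG:25830", "geodetic_datum": "ETRS89", "vertical_datum": null, "map_unit": "METRE"},
             "map_conversion": {"eastings": 440100.0, "northings": 4470250.0, "orthogonal_height": 650.0, "scale": 1.0, ...},
             ...
           }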
""" blender = get_blender_connection() params = { "include_contexts": bool(include_contexts) } try: result = blender.send_command("get_ifc_georeferencing_info", params) # Ensures that the result is serializable and easy to read return json.dumps(result, ensure_ascii=False, indent=2) except Exception as e: logger.exception("get_ifc_georeferencing_info error") return json.dumps( { "georeferenced": False, "error": "Unable to retrieve georeferencing information from the IFC model.", "details": str(e) }, ensure_ascii=False, indent=2 ) @mcp.tool() def georeference_ifc_model( crs_mode: str, epsg: int = None, crs_name: str = None, geodetic_datum: str = None, map_projection: str = None, map_zone: str = None, eastings: float = None, northings: float = None, orthogonal_height: float = 0.0, scale: float = 1.0, x_axis_abscissa: float = None, x_axis_ordinate: float = None, true_north_azimuth_deg: float = None, context_filter: str = "Model", context_index: int = None, site_ref_latitude: list = None, # [deg, min, sec, millionth] site_ref_longitude: list = None, # [deg, min, sec, millionth] site_ref_elevation: float = None, site_ref_latitude_dd: float = None, # Decimal degrees (optional) site_ref_longitude_dd: float = None, # Decimal degrees (optional) overwrite: bool = False, dry_run: bool = False, write_path: str = None, ) -> str: """ Georeferences the IFC currently opened in Bonsai/BlenderBIM by creating or updating IfcProjectedCRS and IfcMapConversion. Optionally updates IfcSite and writes the file to disk. """ import json blender = get_blender_connection() # Build params excluding None values to keep the payload clean params = { "crs_mode": crs_mode, "epsg": epsg, "crs_name": crs_name, "geodetic_datum": geodetic_datum, "map_projection": map_projection, "map_zone": map_zone, "eastings": eastings, "northings": northings, "orthogonal_height": orthogonal_height, "scale": scale, "x_axis_abscissa": x_axis_abscissa, "x_axis_ordinate": x_axis_ordinate, "true_north_azimuth_deg": true_north_azimuth_deg, "context_filter": context_filter, "context_index": context_index, "site_ref_latitude": site_ref_latitude, "site_ref_longitude": site_ref_longitude, "site_ref_elevation": site_ref_elevation, "site_ref_latitude_dd": site_ref_latitude_dd, "site_ref_longitude_dd": site_ref_longitude_dd, "overwrite": overwrite, "dry_run": dry_run, "write_path": write_path, } params = {k: v for k, v in params.items() if v is not None} try: result = blender.send_command("georeference_ifc_model", params) return json.dumps(result, ensure_ascii=False, indent=2) except Exception as e: logger.exception("georeference_ifc_model error") return json.dumps( {"success": False, "error": "Could not georeference the model.", "details": str(e)}, ensure_ascii=False, indent=2, ) @mcp.tool() def generate_ids( title: str, specs: Union[List[dict], str], # accepts a list of dicts or a JSON string description: str = "", author: str = "", ids_version: Union[str, float] = "", # IDS version (Not IFC version) purpose: str = "", milestone: str = "", date_iso: str = None, output_path: str = None, ) -> str: """ Creates an .ids file in Blender/Bonsai by calling the add-on handler 'generate_ids'. Parameters: - title (str): Title of the IDS. - specs (list | JSON str): List of 'specs' containing 'applicability' and 'requirements'. Each facet is a dict with at least a 'type' field ("Entity", "Attribute", "Property", "Material", "Classification", "PartOf") and its corresponding attributes. 
- description, author, ids_version, date_iso, purpose, milestone: IDS metadata fields. - output_path (str): Full path to the .ids file to be created. If omitted, the add-on will generate a default name. Returns: - JSON (str) with the handler result: {"ok": bool, "output_path": "...", "message": "..."} or {"ok": False, "error": "..."} """ blender = get_blender_connection() # Allow 'specs' to be received as JSON text (convenient when the client builds it as a string) if isinstance(specs, str): try: specs = json.loads(specs) except Exception as e: return json.dumps( {"ok": False, "error": "Argument 'specs' is not valid JSON", "details": str(e)}, ensure_ascii=False, indent=2 ) # Basic validations to avoid sending garbage to the add-on if not isinstance(title, str) or not title.strip(): return json.dumps({"ok": False, "error": "Empty or invalid 'title' parameter."}, ensure_ascii=False, indent=2) if not isinstance(specs, list) or not specs: return json.dumps({"ok": False, "error": "You must provide at least one 'spec' in 'specs'."}, ensure_ascii=False, indent=2) # Safe coercion of ids_version to str if ids_version is not None and not isinstance(ids_version, str): ids_version = str(ids_version) params: dict[str, Any] = { "title": title, "specs": specs, "description": description, "author": author, "ids_version": ids_version, # ← the handler maps it to the 'version' field of the IDS "date_iso": date_iso, "output_path": output_path, "purpose": purpose, "milestone": milestone, } # Cleanup: remove keys with None values to keep the payload clean params = {k: v for k, v in params.items() if v is not None} try: # The command name must match EXACTLY the handler name registered in addon.py result = blender.send_command("generate_ids", params) # Returns JSON return json.dumps(result, ensure_ascii=False, indent=2) except Exception as e: return json.dumps({"ok": False, "error": "Failed to create IDS", "details": str(e)}, ensure_ascii=False, indent=2) # ------------------------------- # MCP RESOURCES # ------------------------------- # Base path of the resource files BASE_PATH = Path("./resources") @mcp.resource("file://table_of_contents.json") def formulas_rp() -> str: """Read the content of table_of_contents.json file""" file_path = BASE_PATH / "table_of_contents.json" try: with open(file_path, 'r', encoding='utf-8') as f: return f.read() except FileNotFoundError: return f"Error: File not found {file_path}" except Exception as e: return f"Error reading file: {str(e)}" # ------------------------------- # MCP PROMPTS # ------------------------------- @mcp.prompt("Technical_building_report") def technical_building_report(project_name: str, project_location: str, language: str = "english") -> str: """ Generate a comprehensive technical building report based on an IFC model loaded in Blender. Args: project_name: Name of the project/building project_location: Building location (city, address) language: Report language - "english", "spanish", "french", "german", "italian", "portuguese" Returns: Structured technical report following basic project standards in the selected language.
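    Example (hypothetical arguments, shown only to illustrate the call signature):
        technical_building_report("Residential Block A", "Valencia, Spain", language="spanish")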
""" # Language-specific instructions language_instructions = { "english": { "role": "You are a technical architect specialized in creating technical reports for basic building projects.", "objective": f"Your objective is to generate a comprehensive technical report for the building \"{project_name}\" located in \"{project_location}\", using data from the IFC model loaded in Blender.", "workflow_title": "## MANDATORY WORKFLOW:", "report_language": "Write the entire report in English." }, "spanish": { "role": "Eres un arquitecto técnico especializado en la creación de memorias técnicas de proyectos básicos de edificación.", "objective": f"Tu objetivo es generar una memoria técnica completa del edificio \"{project_name}\" localizado en \"{project_location}\", utilizando los datos del modelo IFC cargado en Blender.", "workflow_title": "## FLUJO DE TRABAJO OBLIGATORIO:", "report_language": "Redacta todo el informe en español." }, "french": { "role": "Vous êtes un architecte technique spécialisé dans la création de rapports techniques pour les projets de bâtiment de base.", "objective": f"Votre objectif est de générer un rapport technique complet pour le bâtiment \"{project_name}\" situé à \"{project_location}\", en utilisant les données du modèle IFC chargé dans Blender.", "workflow_title": "## FLUX DE TRAVAIL OBLIGATOIRE:", "report_language": "Rédigez tout le rapport en français." }, "german": { "role": "Sie sind ein technischer Architekt, der sich auf die Erstellung technischer Berichte für grundlegende Bauprojekte spezialisiert hat.", "objective": f"Ihr Ziel ist es, einen umfassenden technischen Bericht für das Gebäude \"{project_name}\" in \"{project_location}\" zu erstellen, unter Verwendung der Daten aus dem in Blender geladenen IFC-Modell.", "workflow_title": "## OBLIGATORISCHER ARBEITSABLAUF:", "report_language": "Verfassen Sie den gesamten Bericht auf Deutsch." }, "italian": { "role": "Sei un architetto tecnico specializzato nella creazione di relazioni tecniche per progetti edilizi di base.", "objective": f"Il tuo obiettivo è generare una relazione tecnica completa per l'edificio \"{project_name}\" situato a \"{project_location}\", utilizzando i dati del modello IFC caricato in Blender.", "workflow_title": "## FLUSSO DI LAVORO OBBLIGATORIO:", "report_language": "Scrivi tutto il rapporto in italiano." }, "portuguese": { "role": "Você é um arquiteto técnico especializado na criação de relatórios técnicos para projetos básicos de construção.", "objective": f"Seu objetivo é gerar um relatório técnico abrangente para o edifício \"{project_name}\" localizado em \"{project_location}\", usando dados do modelo IFC carregado no Blender.", "workflow_title": "## FLUXO DE TRABALHO OBRIGATÓRIO:", "report_language": "Escreva todo o relatório em português." } } # Get language instructions (default to English if language not supported) lang_config = language_instructions.get(language.lower(), language_instructions["english"]) return f""" {lang_config["role"]} {lang_config["objective"]} **LANGUAGE REQUIREMENT:** {lang_config["report_language"]} {lang_config["workflow_title"]} ### 1. INITIAL IFC MODEL ANALYSIS - **Use MCP tool:** `get_ifc_project_info` to get basic project information - **Use MCP tool:** `get_ifc_spatial_structure` to understand the building's spatial structure - **Use MCP tool:** `get_user_view` to capture a general view of the model ### 2. OBTAIN TABLE OF CONTENTS - **Access MCP resource:** `file://table_of_contents.json` to get the complete technical report structure ### 3. 
DETAILED ANALYSIS BY SECTIONS #### 3.1 For "General Building Data" Section: - **Use:** `get_ifc_quantities` to obtain areas and volumes - **Use:** `list_ifc_entities` with entity_type="IfcSpace" for spaces - **Use:** `list_ifc_entities` with entity_type="IfcBuildingStorey" for floors #### 3.2 For "Architectural Solution" Section: - **Use:** `list_ifc_entities` with entity_type="IfcWall" for walls - **Use:** `list_ifc_entities` with entity_type="IfcDoor" for doors - **Use:** `list_ifc_entities` with entity_type="IfcWindow" for windows - **Use:** `get_user_view` to capture representative views #### 3.3 For "Construction Systems" Section: - **Use:** `list_ifc_entities` with entity_type="IfcBeam" for beams - **Use:** `list_ifc_entities` with entity_type="IfcColumn" for columns - **Use:** `list_ifc_entities` with entity_type="IfcSlab" for slabs - **Use:** `list_ifc_entities` with entity_type="IfcRoof" for roofs - **Use:** `get_ifc_properties` to obtain material properties #### 3.4 For Building Services: - **Use:** `list_ifc_entities` with entity_type="IfcPipeSegment" for plumbing - **Use:** `list_ifc_entities` with entity_type="IfcCableSegment" for electrical - **Use:** `list_ifc_entities` with entity_type="IfcDuctSegment" for HVAC #### 3.5 For drawings and Graphic Documentation: - **Use:** `export_drawing_png` 5 times, using as parameter each time "top", "front", "right", "left" and "isometric", to generate architectural drawings. - **Configure:** resolution_x=1920, resolution_y=1080 for adequate quality - **Use:** `get_user_view` for complementary 3D views ### 4. TECHNICAL REPORT STRUCTURE Organize the document following exactly the structure from the `table_of_contents.json` resource: **TECHNICAL REPORT – BASIC PROJECT: {project_name}** **Location:** {project_location} #### 1. INTRODUCTION - Define object and scope based on IFC model data - Justify the adopted architectural solution #### 2. GENERAL BUILDING DATA - **Location:** {project_location} - **Areas:** Extract from quantities and spaces analysis - **Distribution:** Based on IFC spatial structure - **Regulations:** Identify applicable regulations according to use and location #### 3-11. DEVELOPMENT OF ALL SECTIONS - Complete each section according to the index, using data extracted from the IFC model - Include summary tables of areas, materials and construction elements - Generate technical conclusions based on evidence ### 5. MANDATORY GRAPHIC DOCUMENTATION - **2D drawings:**Include the 4 2D drawings generated before in the 3.5 section with the Tool `export_drawing_png` ("top", "front", "right", "left") - **3D views:** Include the isometric 3D view generated before in the 3.5 section with the Tool `export_drawing_png` - **Organize:** All images in section 11. Annexes ### 6. TECHNICAL TABLES AND CHARTS - **Areas summary table:** Extracted from quantities - **Elements listing:** By typologies (walls, columns, beams, etc.) 
- **Material properties:** From IFC properties ## RESPONSE FORMAT: ### MARKDOWN STRUCTURE: ```markdown # TECHNICAL REPORT – BASIC PROJECT ## {project_name} ### Project Data: - **Location:** {project_location} - **Date:** [current date] - **IFC Model:** [model information] [Complete development of all index sections] ``` ### QUALITY CRITERIA: - **Technical precision:** All numerical data extracted directly from IFC model - **Completeness:** Cover all index sections mandatory - **Professional format:** Markdown tables, structured text, integrated images - **Consistency:** Verify data consistency between sections ## CRITICAL VALIDATIONS: 1. **Verify Blender connection:** Confirm IFC model is loaded 2. **Complete all sections:** Do not omit any index section 3. **Include graphic documentation:** drawings and 3D views mandatory 4. **Quantitative data:** Areas, volumes and quantities verified 5. **Regulatory consistency:** Applicable regulations according to use and location **IMPORTANT:** If any MCP tool fails or doesn't return data, document the limitation and indicate that section requires manual completion in executive project phase. Proceed to generate the technical report following this detailed workflow. """ # Main execution def main(): """Run the MCP server""" mcp.run() if __name__ == "__main__": main() ``` -------------------------------------------------------------------------------- /addon.py: -------------------------------------------------------------------------------- ```python import bpy import mathutils import json import threading import socket import time import requests import tempfile import traceback import os import shutil from bpy.props import StringProperty, IntProperty, BoolProperty, EnumProperty import base64 import bpy import ifcopenshell from bonsai.bim.ifc import IfcStore bl_info = { "name": "Bonsai MCP", "author": "JotaDeRodriguez", "version": (0, 2), "blender": (3, 0, 0), "location": "View3D > Sidebar > Bonsai MCP", "description": "Connect Claude to Blender via MCP. 
Aimed at IFC projects", "category": "Interface", } class BlenderMCPServer: def __init__(self, host='localhost', port=9876): self.host = host self.port = port self.running = False self.socket = None self.server_thread = None def start(self): if self.running: print("Server is already running") return self.running = True try: # Create socket self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind((self.host, self.port)) self.socket.listen(1) # Start server thread self.server_thread = threading.Thread(target=self._server_loop) self.server_thread.daemon = True self.server_thread.start() print(f"BlenderMCP server started on {self.host}:{self.port}") except Exception as e: print(f"Failed to start server: {str(e)}") self.stop() def stop(self): self.running = False # Close socket if self.socket: try: self.socket.close() except: pass self.socket = None # Wait for thread to finish if self.server_thread: try: if self.server_thread.is_alive(): self.server_thread.join(timeout=1.0) except: pass self.server_thread = None print("BlenderMCP server stopped") def _server_loop(self): """Main server loop in a separate thread""" print("Server thread started") self.socket.settimeout(1.0) # Timeout to allow for stopping while self.running: try: # Accept new connection try: client, address = self.socket.accept() print(f"Connected to client: {address}") # Handle client in a separate thread client_thread = threading.Thread( target=self._handle_client, args=(client,) ) client_thread.daemon = True client_thread.start() except socket.timeout: # Just check running condition continue except Exception as e: print(f"Error accepting connection: {str(e)}") time.sleep(0.5) except Exception as e: print(f"Error in server loop: {str(e)}") if not self.running: break time.sleep(0.5) print("Server thread stopped") def _handle_client(self, client): """Handle connected client""" print("Client handler started") client.settimeout(None) # No timeout buffer = b'' try: while self.running: # Receive data try: data = client.recv(8192) if not data: print("Client disconnected") break buffer += data try: # Try to parse command command = json.loads(buffer.decode('utf-8')) buffer = b'' # Execute command in Blender's main thread def execute_wrapper(): try: response = self.execute_command(command) response_json = json.dumps(response) try: client.sendall(response_json.encode('utf-8')) except: print("Failed to send response - client disconnected") except Exception as e: print(f"Error executing command: {str(e)}") traceback.print_exc() try: error_response = { "status": "error", "message": str(e) } client.sendall(json.dumps(error_response).encode('utf-8')) except: pass return None # Schedule execution in main thread bpy.app.timers.register(execute_wrapper, first_interval=0.0) except json.JSONDecodeError: # Incomplete data, wait for more pass except Exception as e: print(f"Error receiving data: {str(e)}") break except Exception as e: print(f"Error in client handler: {str(e)}") finally: try: client.close() except: pass print("Client handler stopped") def execute_command(self, command): """Execute a command in the main Blender thread""" try: cmd_type = command.get("type") params = command.get("params", {}) # Ensure we're in the right context if cmd_type in ["create_object", "modify_object", "delete_object"]: override = bpy.context.copy() override['area'] = [area for area in bpy.context.screen.areas if area.type == 'VIEW_3D'][0] with 
bpy.context.temp_override(**override): return self._execute_command_internal(command) else: return self._execute_command_internal(command) except Exception as e: print(f"Error executing command: {str(e)}") traceback.print_exc() return {"status": "error", "message": str(e)} def _execute_command_internal(self, command): """Internal command execution with proper context""" cmd_type = command.get("type") params = command.get("params", {}) # Base handlers that are always available handlers = { "execute_code": self.execute_code, "get_ifc_project_info": self.get_ifc_project_info, "list_ifc_entities": self.list_ifc_entities, "get_ifc_properties": self.get_ifc_properties, "get_ifc_spatial_structure": self.get_ifc_spatial_structure, "get_ifc_total_structure": self.get_ifc_total_structure, "get_ifc_relationships": self.get_ifc_relationships, "get_selected_ifc_entities": self.get_selected_ifc_entities, "get_current_view": self.get_current_view, "export_ifc_data": self.export_ifc_data, "place_ifc_object": self.place_ifc_object, "get_ifc_quantities": self.get_ifc_quantities, "export_drawing_png": self.export_drawing_png, "get_ifc_georeferencing_info": self.get_ifc_georeferencing_info, "georeference_ifc_model": self.georeference_ifc_model, "generate_ids": self.generate_ids, } handler = handlers.get(cmd_type) if handler: try: print(f"Executing handler for {cmd_type}") result = handler(**params) print(f"Handler execution complete") return {"status": "success", "result": result} except Exception as e: print(f"Error in handler: {str(e)}") traceback.print_exc() return {"status": "error", "message": str(e)} else: return {"status": "error", "message": f"Unknown command type: {cmd_type}"} def execute_code(self, code): """Execute arbitrary Blender Python code""" # This is powerful but potentially dangerous - use with caution try: # Create a local namespace for execution namespace = {"bpy": bpy} exec(code, namespace) return {"executed": True} except Exception as e: raise Exception(f"Code execution error: {str(e)}") @staticmethod def get_selected_ifc_entities(): """ Get the IFC entities corresponding to the currently selected Blender objects. Returns: List of IFC entities for the selected objects """ try: file = IfcStore.get_file() if file is None: return {"error": "No IFC file is currently loaded"} # Get currently selected objects selected_objects = bpy.context.selected_objects if not selected_objects: return {"selected_count": 0, "message": "No objects selected in Blender"} # Collect IFC entities from selected objects selected_entities = [] for obj in selected_objects: if hasattr(obj, "BIMObjectProperties") and obj.BIMObjectProperties.ifc_definition_id: entity_id = obj.BIMObjectProperties.ifc_definition_id entity = file.by_id(entity_id) if entity: entity_info = { "id": entity.GlobalId if hasattr(entity, "GlobalId") else f"Entity_{entity.id()}", "ifc_id": entity.id(), "type": entity.is_a(), "name": entity.Name if hasattr(entity, "Name") else None, "blender_name": obj.name } selected_entities.append(entity_info) return { "selected_count": len(selected_entities), "selected_entities": selected_entities } except Exception as e: import traceback return {"error": str(e), "traceback": traceback.format_exc()} ### SPECIFIC IFC METHODS ### @staticmethod def get_ifc_project_info(): """ Get basic information about the IFC project. 
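        Illustrative result (identifiers and counts are invented; the keys match the dictionary built below):
            {
              "id": "0YvctVUKr0kugbFTf53O9L",
              "name": "Sample Project",
              "description": None,
              "entity_counts": {"IfcWall": 24, "IfcDoor": 8, "IfcWindow": 12, "IfcBuildingStorey": 3}
            }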
Returns: Dictionary with project name, description, and basic metrics """ try: file = IfcStore.get_file() if file is None: return {"error": "No IFC file is currently loaded"} # Get project information projects = file.by_type("IfcProject") if not projects: return {"error": "No IfcProject found in the model"} project = projects[0] # Basic project info info = { "id": project.GlobalId, "name": project.Name if hasattr(project, "Name") else "Unnamed Project", "description": project.Description if hasattr(project, "Description") else None, "entity_counts": {} } # Count entities by type entity_types = ["IfcWall", "IfcDoor", "IfcWindow", "IfcSlab", "IfcBeam", "IfcColumn", "IfcSpace", "IfcBuildingStorey"] for entity_type in entity_types: entities = file.by_type(entity_type) info["entity_counts"][entity_type] = len(entities) return info except Exception as e: import traceback return {"error": str(e), "traceback": traceback.format_exc()} @staticmethod def list_ifc_entities(entity_type=None, limit=50, selected_only=False): """ List IFC entities of a specific type. Parameters: entity_type: Type of IFC entity to list (e.g., "IfcWall") limit: Maximum number of entities to return Returns: List of entities with basic properties """ try: file = IfcStore.get_file() if file is None: return {"error": "No IFC file is currently loaded"} # If we're only looking at selected objects if selected_only: selected_result = BlenderMCPServer.get_selected_ifc_entities() # Check for errors if "error" in selected_result: return selected_result # If no objects are selected, return early if selected_result["selected_count"] == 0: return selected_result # If entity_type is specified, filter the selected entities if entity_type: filtered_entities = [ entity for entity in selected_result["selected_entities"] if entity["type"] == entity_type ] return { "type": entity_type, "selected_count": len(filtered_entities), "entities": filtered_entities[:limit] } else: # Group selected entities by type entity_types = {} for entity in selected_result["selected_entities"]: entity_type = entity["type"] if entity_type in entity_types: entity_types[entity_type].append(entity) else: entity_types[entity_type] = [entity] return { "selected_count": selected_result["selected_count"], "entity_types": [ {"type": t, "count": len(entities), "entities": entities[:limit]} for t, entities in entity_types.items() ] } # Original functionality for non-selected mode if not entity_type: # If no type specified, list available entity types entity_types = {} for entity in file.wrapped_data.entities: entity_type = entity.is_a() if entity_type in entity_types: entity_types[entity_type] += 1 else: entity_types[entity_type] = 1 return { "available_types": [{"type": k, "count": v} for k, v in entity_types.items()] } # Get entities of the specified type entities = file.by_type(entity_type) # Prepare the result result = { "type": entity_type, "total_count": len(entities), "entities": [] } # Add entity data (limited) for i, entity in enumerate(entities): if i >= limit: break entity_data = { "id": entity.GlobalId if hasattr(entity, "GlobalId") else f"Entity_{entity.id()}", "name": entity.Name if hasattr(entity, "Name") else None } result["entities"].append(entity_data) return result except Exception as e: import traceback return {"error": str(e), "traceback": traceback.format_exc()} @staticmethod def get_ifc_properties(global_id=None, selected_only=False): """ Get all properties of a specific IFC entity. 
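        Sketch of a single-entity result (property set contents come from ifcopenshell.util.element.get_psets; the entity and values shown are illustrative):
            {
              "id": "3vB2YO9MX4xv5uCqZZG05x",
              "type": "IfcWall",
              "name": "Basic Wall:Interior - 100mm",
              "description": None,
              "property_sets": {"Pset_WallCommon": {"IsExternal": False, "LoadBearing": True}}
            }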
Parameters: global_id: GlobalId of the IFC entity Returns: Dictionary with entity information and properties """ try: file = IfcStore.get_file() if file is None: return {"error": "No IFC file is currently loaded"} # If we're only looking at selected objects if selected_only: selected_result = BlenderMCPServer.get_selected_ifc_entities() # Check for errors if "error" in selected_result: return selected_result # If no objects are selected, return early if selected_result["selected_count"] == 0: return selected_result # Process each selected entity result = { "selected_count": selected_result["selected_count"], "entities": [] } for entity_info in selected_result["selected_entities"]: # Find entity by GlobalId entity = file.by_guid(entity_info["id"]) if not entity: continue # Get basic entity info entity_data = { "id": entity.GlobalId, "type": entity.is_a(), "name": entity.Name if hasattr(entity, "Name") else None, "description": entity.Description if hasattr(entity, "Description") else None, "blender_name": entity_info["blender_name"], "property_sets": {} } # Get all property sets psets = ifcopenshell.util.element.get_psets(entity) for pset_name, pset_data in psets.items(): entity_data["property_sets"][pset_name] = pset_data result["entities"].append(entity_data) return result # If we're looking at a specific entity elif global_id: # Find entity by GlobalId entity = file.by_guid(global_id) if not entity: return {"error": f"No entity found with GlobalId: {global_id}"} # Get basic entity info entity_info = { "id": entity.GlobalId, "type": entity.is_a(), "name": entity.Name if hasattr(entity, "Name") else None, "description": entity.Description if hasattr(entity, "Description") else None, "property_sets": {} } # Get all property sets psets = ifcopenshell.util.element.get_psets(entity) for pset_name, pset_data in psets.items(): entity_info["property_sets"][pset_name] = pset_data return entity_info else: return {"error": "Either global_id or selected_only must be specified"} except Exception as e: import traceback return {"error": str(e), "traceback": traceback.format_exc()} @staticmethod def get_ifc_spatial_structure(): """ Get the spatial structure of the IFC model (site, building, storey, space hierarchy). Returns: Hierarchical structure of the IFC model's spatial elements """ try: file = IfcStore.get_file() if file is None: return {"error": "No IFC file is currently loaded"} # Start with projects projects = file.by_type("IfcProject") if not projects: return {"error": "No IfcProject found in the model"} def get_children(parent): """Get immediate children of the given element""" if hasattr(parent, "IsDecomposedBy"): rel_aggregates = parent.IsDecomposedBy children = [] for rel in rel_aggregates: children.extend(rel.RelatedObjects) return children return [] def create_structure(element): """Recursively create the structure for an element""" result = { "id": element.GlobalId, "type": element.is_a(), "name": element.Name if hasattr(element, "Name") else None, "children": [] } for child in get_children(element): result["children"].append(create_structure(child)) return result # Create the structure starting from the project structure = create_structure(projects[0]) return structure except Exception as e: import traceback return {"error": str(e), "traceback": traceback.format_exc()} @staticmethod def get_ifc_total_structure(): """ Get the complete IFC structure including spatial hierarchy and building elements. 
This function extends the spatial structure to include building elements like walls, doors, windows, etc. that are contained in each spatial element. Returns: Complete hierarchical structure with spatial elements and their contained building elements """ try: file = IfcStore.get_file() if file is None: return {"error": "No IFC file is currently loaded"} # Start with projects projects = file.by_type("IfcProject") if not projects: return {"error": "No IfcProject found in the model"} def get_spatial_children(parent): """Get immediate spatial children of the given element""" if hasattr(parent, "IsDecomposedBy"): rel_aggregates = parent.IsDecomposedBy children = [] for rel in rel_aggregates: children.extend(rel.RelatedObjects) return children return [] def get_contained_elements(spatial_element): """Get building elements contained in this spatial element""" contained_elements = [] # Check for IfcRelContainedInSpatialStructure relationships if hasattr(spatial_element, "ContainsElements"): for rel in spatial_element.ContainsElements: for element in rel.RelatedElements: element_info = { "id": element.GlobalId, "type": element.is_a(), "name": element.Name if hasattr(element, "Name") else None, "description": element.Description if hasattr(element, "Description") else None } contained_elements.append(element_info) return contained_elements def create_total_structure(element): """Recursively create the complete structure for an element""" result = { "id": element.GlobalId, "type": element.is_a(), "name": element.Name if hasattr(element, "Name") else None, "description": element.Description if hasattr(element, "Description") else None, "children": [], "building_elements": [] } # Add spatial children (other spatial elements) for child in get_spatial_children(element): result["children"].append(create_total_structure(child)) # Add contained building elements (walls, doors, windows, etc.) result["building_elements"] = get_contained_elements(element) return result # Create the complete structure starting from the project total_structure = create_total_structure(projects[0]) return total_structure except Exception as e: import traceback return {"error": str(e), "traceback": traceback.format_exc()} @staticmethod def get_ifc_relationships(global_id): """ Get all relationships for a specific IFC entity. 
Parameters: global_id: GlobalId of the IFC entity Returns: Dictionary with all relationships the entity participates in """ try: file = IfcStore.get_file() if file is None: return {"error": "No IFC file is currently loaded"} # Find entity by GlobalId entity = file.by_guid(global_id) if not entity: return {"error": f"No entity found with GlobalId: {global_id}"} # Basic entity info entity_info = { "id": entity.GlobalId, "type": entity.is_a(), "name": entity.Name if hasattr(entity, "Name") else None, "relationships": { "contains": [], "contained_in": [], "connects": [], "connected_by": [], "defines": [], "defined_by": [] } } # Check if entity contains other elements if hasattr(entity, "IsDecomposedBy"): for rel in entity.IsDecomposedBy: for obj in rel.RelatedObjects: entity_info["relationships"]["contains"].append({ "id": obj.GlobalId, "type": obj.is_a(), "name": obj.Name if hasattr(obj, "Name") else None }) # Check if entity is contained in other elements if hasattr(entity, "Decomposes"): for rel in entity.Decomposes: rel_obj = rel.RelatingObject entity_info["relationships"]["contained_in"].append({ "id": rel_obj.GlobalId, "type": rel_obj.is_a(), "name": rel_obj.Name if hasattr(rel_obj, "Name") else None }) # For physical connections (depends on entity type) if hasattr(entity, "ConnectedTo"): for rel in entity.ConnectedTo: for obj in rel.RelatedElement: entity_info["relationships"]["connects"].append({ "id": obj.GlobalId, "type": obj.is_a(), "name": obj.Name if hasattr(obj, "Name") else None, "connection_type": rel.ConnectionType if hasattr(rel, "ConnectionType") else None }) if hasattr(entity, "ConnectedFrom"): for rel in entity.ConnectedFrom: obj = rel.RelatingElement entity_info["relationships"]["connected_by"].append({ "id": obj.GlobalId, "type": obj.is_a(), "name": obj.Name if hasattr(obj, "Name") else None, "connection_type": rel.ConnectionType if hasattr(rel, "ConnectionType") else None }) return entity_info except Exception as e: import traceback return {"error": str(e), "traceback": traceback.format_exc()} @staticmethod def export_ifc_data(entity_type=None, level_name=None, output_format="csv"): """Export IFC data to a structured file""" try: file = IfcStore.get_file() if file is None: return {"error": "No IFC file is currently loaded"} data_list = [] # Filter objects based on type if entity_type: objects = file.by_type(entity_type) else: objects = file.by_type("IfcElement") # Create a data dictionary for each object for obj in objects: obj_data = {} # Get level/storey information container_level = None try: containing_structure = ifcopenshell.util.element.get_container(obj) if containing_structure and containing_structure.is_a("IfcBuildingStorey"): container_level = containing_structure.Name except Exception as e: pass # Skip if we're filtering by level and this doesn't match if level_name and container_level != level_name: continue # Basic information obj_data['ExpressId'] = obj.id() obj_data['GlobalId'] = obj.GlobalId if hasattr(obj, "GlobalId") else None obj_data['IfcClass'] = obj.is_a() obj_data['Name'] = obj.Name if hasattr(obj, "Name") else None obj_data['Description'] = obj.Description if hasattr(obj, "Description") else None obj_data['LevelName'] = container_level # Get predefined type if available try: obj_data['PredefinedType'] = ifcopenshell.util.element.get_predefined_type(obj) except: obj_data['PredefinedType'] = None # Get type information try: type_obj = ifcopenshell.util.element.get_type(obj) obj_data['TypeName'] = type_obj.Name if type_obj and hasattr(type_obj, 
"Name") else None obj_data['TypeClass'] = type_obj.is_a() if type_obj else None except: obj_data['TypeName'] = None obj_data['TypeClass'] = None # Get property sets (simplify structure for export) try: property_sets = ifcopenshell.util.element.get_psets(obj) # Flatten property sets for better export compatibility for pset_name, pset_data in property_sets.items(): for prop_name, prop_value in pset_data.items(): obj_data[f"{pset_name}.{prop_name}"] = prop_value except Exception as e: pass data_list.append(obj_data) if not data_list: return "No data found matching the specified criteria" # Determine output directory - try multiple options to ensure it works in various environments output_dirs = [ "C:\\Users\\Public\\Documents" if os.name == "nt" else None, # Public Documents "/usr/share" if os.name != "nt" else None, # Unix share directory "/tmp", # Unix temp directory "C:\\Temp" if os.name == "nt" else None, # Windows temp directory ] output_dir = None for dir_path in output_dirs: if dir_path and os.path.exists(dir_path) and os.access(dir_path, os.W_OK): output_dir = dir_path break if not output_dir: return {"error": "Could not find a writable directory for output"} # Create filename based on filters filters = [] if entity_type: filters.append(entity_type) if level_name: filters.append(level_name) filter_str = "_".join(filters) if filters else "all" timestamp = time.strftime("%Y%m%d_%H%M%S") filename = f"ifc_export_{filter_str}_{timestamp}.{output_format}" filepath = os.path.join(output_dir, filename) # Export based on format if output_format == "json": with open(filepath, 'w') as f: json.dump(data_list, f, indent=2) elif output_format == "csv": import pandas as pd df = pd.DataFrame(data_list) df.to_csv(filepath, index=False) # Summary info for the response entity_count = len(data_list) entity_types = set(item['IfcClass'] for item in data_list) levels = set(item['LevelName'] for item in data_list if item['LevelName']) return { "success": True, "message": f"Data exported successfully to {filepath}", "filepath": filepath, "format": output_format, "summary": { "entity_count": entity_count, "entity_types": list(entity_types), "levels": list(levels) } } except Exception as e: import traceback return {"error": str(e), "traceback": traceback.format_exc()} @staticmethod def place_ifc_object(type_name, location, rotation=None): """ Place an IFC object at specified location with optional rotation Args: type_name: Name of the IFC element type location: [x, y, z] list or tuple for position rotation: Value in degrees for rotation around Z axis (optional) Returns: Dictionary with information about the created object """ try: import ifcopenshell from bonsai.bim.ifc import IfcStore import math # Convert location to tuple if it's not already if isinstance(location, list): location = tuple(location) def find_type_by_name(name): file = IfcStore.get_file() for element in file.by_type("IfcElementType"): if element.Name == name: return element.id() return None # Find the type ID type_id = find_type_by_name(type_name) if not type_id: return {"error": f"Type '{type_name}' not found. 
Please check if this type exists in the model."} # Store original context original_context = bpy.context.copy() # Ensure we're in 3D View context override = bpy.context.copy() for area in bpy.context.screen.areas: if area.type == 'VIEW_3D': override["area"] = area override["region"] = area.regions[-1] break # Set cursor location bpy.context.scene.cursor.location = location # Get properties to set up parameters props = bpy.context.scene.BIMModelProperties # Store original rl_mode and set to CURSOR to use cursor's Z position original_rl_mode = props.rl_mode props.rl_mode = 'CURSOR' # Create the object using the override context with bpy.context.temp_override(**override): bpy.ops.bim.add_occurrence(relating_type_id=type_id) # Get the newly created object obj = bpy.context.active_object if not obj: props.rl_mode = original_rl_mode return {"error": "Failed to create object"} # Force the Z position explicitly obj.location.z = location[2] # Apply rotation if provided if rotation is not None: # Convert degrees to radians for Blender's rotation_euler full_rotation = (0, 0, math.radians(float(rotation))) obj.rotation_euler = full_rotation # Sync the changes back to IFC # Use the appropriate method depending on what's available if hasattr(bpy.ops.bim, "update_representation"): bpy.ops.bim.update_representation(obj=obj.name) # Restore original rl_mode props.rl_mode = original_rl_mode # Get the IFC entity for the new object entity_id = obj.BIMObjectProperties.ifc_definition_id if entity_id: file = IfcStore.get_file() entity = file.by_id(entity_id) global_id = entity.GlobalId if hasattr(entity, "GlobalId") else None else: global_id = None # Return information about the created object return { "success": True, "blender_name": obj.name, "global_id": global_id, "location": list(obj.location), "rotation": list(obj.rotation_euler), "type_name": type_name } except Exception as e: import traceback return {"error": str(e), "traceback": traceback.format_exc()} ### Ability to see @staticmethod def get_current_view(): """Capture and return the current viewport as an image""" try: # Find a 3D View for area in bpy.context.screen.areas: if area.type == 'VIEW_3D': break else: return {"error": "No 3D View available"} # Create temporary file to save the viewport screenshot temp_file = tempfile.NamedTemporaryFile(suffix='.png', delete=False) temp_path = temp_file.name temp_file.close() # Find appropriate region for region in area.regions: if region.type == 'WINDOW': break else: return {"error": "No appropriate region found in 3D View"} # Use temp_override instead of the old override dictionary with bpy.context.temp_override(area=area, region=region): # Save screenshot bpy.ops.screen.screenshot(filepath=temp_path) # Read the image data and encode as base64 with open(temp_path, 'rb') as f: image_data = f.read() # Clean up os.unlink(temp_path) # Return base64 encoded image return { "width": area.width, "height": area.height, "format": "png", "data": base64.b64encode(image_data).decode('utf-8') } except Exception as e: import traceback return {"error": str(e), "traceback": traceback.format_exc()} @staticmethod def get_ifc_quantities(entity_type=None, selected_only=False): """ Calculate and get quantities (m2, m3, etc.) for IFC elements. 
Parameters: entity_type: Type of IFC entity to get quantities for (e.g., "IfcWall", "IfcSlab") selected_only: If True, only get quantities for selected objects Returns: Dictionary with quantities for the specified elements """ try: file = IfcStore.get_file() if file is None: return {"error": "No IFC file is currently loaded"} # Check if BaseQuantities already exist to avoid re-calculating quantities_exist = False sample_elements = file.by_type("IfcElement")[:10] if file.by_type("IfcElement") else [] for elem in sample_elements: psets = ifcopenshell.util.element.get_psets(elem) if any(qset in psets for qset in ["BaseQuantities", "Qto_WallBaseQuantities", "Qto_SlabBaseQuantities", "Qto_BeamBaseQuantities"]): quantities_exist = True break # Only calculate quantities if they don't exist yet if not quantities_exist: try: bpy.ops.bim.perform_quantity_take_off() except Exception as e: return {"error": f"Failed to calculate quantities: {str(e)}"} elements_data = [] # If we're only looking at selected objects if selected_only: selected_result = BlenderMCPServer.get_selected_ifc_entities() # Check for errors if "error" in selected_result: return selected_result # If no objects are selected, return early if selected_result["selected_count"] == 0: return selected_result # Process each selected entity for entity_info in selected_result["selected_entities"]: # Find entity by GlobalId entity = file.by_guid(entity_info["id"]) if not entity: continue # Filter by type if specified if entity_type and entity.is_a() != entity_type: continue # Extract quantities element_data = extract_quantities(entity, entity_info["blender_name"]) if element_data: elements_data.append(element_data) else: # Get entities based on type or default to common element types if entity_type: entities = file.by_type(entity_type) else: # Get common element types that have quantities entity_types = ["IfcWall", "IfcSlab", "IfcBeam", "IfcColumn", "IfcDoor", "IfcWindow"] entities = [] for etype in entity_types: entities.extend(file.by_type(etype)) # Process each entity for entity in entities: element_data = extract_quantities(entity) if element_data: elements_data.append(element_data) # Summary statistics summary = { "total_elements": len(elements_data), "element_types": {} } # Group by element type for summary for element in elements_data: etype = element["type"] if etype not in summary["element_types"]: summary["element_types"][etype] = {"count": 0, "total_area": 0, "total_volume": 0} summary["element_types"][etype]["count"] += 1 if element["quantities"].get("area"): summary["element_types"][etype]["total_area"] += element["quantities"]["area"] if element["quantities"].get("volume"): summary["element_types"][etype]["total_volume"] += element["quantities"]["volume"] return { "success": True, "elements": elements_data, "summary": summary } except Exception as e: import traceback return {"error": str(e), "traceback": traceback.format_exc()} @staticmethod def export_drawing_png(view_type="top", height_offset=0.5, resolution_x=1920, resolution_y=1080, storey_name=None, output_path=None): """ Export drawings as PNG images with custom resolution. Creates 2D and 3D views of IFC building, particularly useful for architectural drawings. 
Args: view_type: "top" for plan view, "front", "right", "left" for elevations, "isometric" for 3D view height_offset: Height in meters above storey level for camera position resolution_x: Horizontal resolution in pixels resolution_y: Vertical resolution in pixels storey_name: Specific storey name to render (None for all/ground floor) output_path: File path to save PNG (None for temp file) Returns: Dict with base64 encoded image data and metadata """ try: import tempfile import os # Validate parameters if resolution_x > 4096 or resolution_y > 4096: return {"error": "Resolution too high. Maximum: 4096x4096"} if resolution_x < 100 or resolution_y < 100: return {"error": "Resolution too low. Minimum: 100x100"} # Check if IFC file is loaded file = IfcStore.get_file() if file is None: return {"error": "No IFC file is currently loaded"} # Store original render settings scene = bpy.context.scene original_engine = scene.render.engine original_res_x = scene.render.resolution_x original_res_y = scene.render.resolution_y original_filepath = scene.render.filepath # Set up render settings for drawing scene.render.engine = 'BLENDER_WORKBENCH' # Fast, good for architectural drawings scene.render.resolution_x = resolution_x scene.render.resolution_y = resolution_y scene.render.resolution_percentage = 100 # Store original camera if exists original_camera = bpy.context.scene.camera # Create temporary camera for orthographic rendering bpy.ops.object.camera_add() camera = bpy.context.object camera.name = "TempDrawingCamera" bpy.context.scene.camera = camera # Set camera to orthographic camera.data.type = 'ORTHO' camera.data.ortho_scale = 50 # Adjust based on building size # Position camera based on view type and storey if view_type == "top": # Find building bounds to position camera appropriately all_objects = [obj for obj in bpy.context.scene.objects if obj.type == 'MESH' and obj.visible_get()] if all_objects: # Calculate bounding box of all visible objects min_x = min_y = min_z = float('inf') max_x = max_y = max_z = float('-inf') for obj in all_objects: bbox = [obj.matrix_world @ mathutils.Vector(corner) for corner in obj.bound_box] for corner in bbox: min_x = min(min_x, corner.x) max_x = max(max_x, corner.x) min_y = min(min_y, corner.y) max_y = max(max_y, corner.y) min_z = min(min_z, corner.z) max_z = max(max_z, corner.z) # Position camera above the building center_x = (min_x + max_x) / 2 center_y = (min_y + max_y) / 2 # For plan view, position camera above camera_height = max_z + height_offset camera.location = (center_x, center_y, camera_height) camera.rotation_euler = (0, 0, 0) # Look down # Adjust orthographic scale based on building size building_width = max(max_x - min_x, max_y - min_y) * 1.2 # Add 20% margin camera.data.ortho_scale = building_width else: # Default position if no objects found camera.location = (0, 0, 10) camera.rotation_euler = (0, 0, 0) elif view_type in ["front", "right", "left"]: # For elevations, position camera accordingly # This is a simplified implementation - could be enhanced all_objects = [obj for obj in bpy.context.scene.objects if obj.type == 'MESH' and obj.visible_get()] if all_objects: # Calculate bounds min_x = min_y = min_z = float('inf') max_x = max_y = max_z = float('-inf') for obj in all_objects: bbox = [obj.matrix_world @ mathutils.Vector(corner) for corner in obj.bound_box] for corner in bbox: min_x = min(min_x, corner.x) max_x = max(max_x, corner.x) min_y = min(min_y, corner.y) max_y = max(max_y, corner.y) min_z = min(min_z, corner.z) max_z = max(max_z, 
corner.z) center_x = (min_x + max_x) / 2 center_y = (min_y + max_y) / 2 center_z = (min_z + max_z) / 2 building_depth = max(max_x - min_x, max_y - min_y) * 2 if view_type == "front": camera.location = (center_x, center_y - building_depth, center_z) camera.rotation_euler = (1.5708, 0, 0) # 90 degrees X rotation elif view_type == "right": camera.location = (center_x + building_depth, center_y, center_z) camera.rotation_euler = (1.5708, 0, 1.5708) # Look from right elif view_type == "left": camera.location = (center_x - building_depth, center_y, center_z) camera.rotation_euler = (1.5708, 0, -1.5708) # Look from left # Adjust scale for elevations building_height = max_z - min_z building_width = max(max_x - min_x, max_y - min_y) camera.data.ortho_scale = max(building_height, building_width) * 1.2 elif view_type == "isometric": # For isometric view, use perspective camera positioned diagonally camera.data.type = 'PERSP' camera.data.lens = 35 # 35mm lens for nice perspective all_objects = [obj for obj in bpy.context.scene.objects if obj.type == 'MESH' and obj.visible_get()] if all_objects: # Calculate bounds min_x = min_y = min_z = float('inf') max_x = max_y = max_z = float('-inf') for obj in all_objects: bbox = [obj.matrix_world @ mathutils.Vector(corner) for corner in obj.bound_box] for corner in bbox: min_x = min(min_x, corner.x) max_x = max(max_x, corner.x) min_y = min(min_y, corner.y) max_y = max(max_y, corner.y) min_z = min(min_z, corner.z) max_z = max(max_z, corner.z) center_x = (min_x + max_x) / 2 center_y = (min_y + max_y) / 2 center_z = (min_z + max_z) / 2 # Calculate distance to frame the building nicely building_size = max(max_x - min_x, max_y - min_y, max_z - min_z) distance = building_size * 1.2 # Distance multiplier for good framing # Position camera for isometric view (45° angles) # Classic isometric position: up and back, looking down at 30° import math angle_rad = math.radians(45) camera_x = center_x + distance * math.cos(angle_rad) camera_y = center_y - distance * math.sin(angle_rad) camera_z = center_z + distance * 0.3 # Lower elevation for better facade view camera.location = (camera_x, camera_y, camera_z) # Point camera at building center direction = mathutils.Vector((center_x - camera_x, center_y - camera_y, center_z - camera_z)) camera.rotation_euler = direction.to_track_quat('-Z', 'Y').to_euler() else: # Default isometric position camera.location = (15, -15, 10) camera.rotation_euler = (1.1, 0, 0.785) # ~63°, 0°, ~45° # Set up output file path if output_path: render_path = output_path else: temp_dir = tempfile.gettempdir() render_path = os.path.join(temp_dir, f"drawing_{view_type}_{int(time.time())}.png") scene.render.filepath = render_path scene.render.image_settings.file_format = 'PNG' # Render the image bpy.ops.render.render(write_still=True) # Read the rendered image and encode as base64 if os.path.exists(render_path): with open(render_path, 'rb') as f: image_data = f.read() # Clean up temporary file if we created it if not output_path: os.remove(render_path) # Restore original settings scene.render.engine = original_engine scene.render.resolution_x = original_res_x scene.render.resolution_y = original_res_y scene.render.filepath = original_filepath bpy.context.scene.camera = original_camera # Delete temporary camera bpy.data.objects.remove(camera, do_unlink=True) # Return base64 encoded image import base64 return { "success": True, "data": base64.b64encode(image_data).decode('utf-8'), "format": "png", "resolution": f"{resolution_x}x{resolution_y}", "view_type": 
view_type, "output_path": render_path if output_path else None } else: return {"error": "Failed to create render file"} except Exception as e: # Restore settings on error try: scene = bpy.context.scene scene.render.engine = original_engine scene.render.resolution_x = original_res_x scene.render.resolution_y = original_res_y scene.render.filepath = original_filepath bpy.context.scene.camera = original_camera # Clean up camera if it exists if 'camera' in locals() and camera: bpy.data.objects.remove(camera, do_unlink=True) except: pass import traceback return {"error": f"Error creating drawing: {str(e)}", "traceback": traceback.format_exc()} @staticmethod def get_ifc_georeferencing_info(include_contexts: bool = False): """ Retrieves georeferencing information from the currently opened IFC file (CRS, MapConversion, WCS, TrueNorth, IfcSite). Args: include_contexts (bool): If True, adds the breakdown of RepresentationContexts and operations Returns: dict: Structure with: { "georeferenced": bool, "crs": { "name": str|None, "geodetic_datum": str|None, "vertical_datum": str|None, "map_unit": str|None }, "map_conversion": { "eastings": float|None, "northings": float|None, "orthogonal_height": float|None, "scale": float|None, "x_axis_abscissa": float|None, "x_axis_ordinate": float|None }, "world_coordinate_system": {"origin": [x,y,z]|None}, "true_north": {"direction_ratios": [x,y]|None}, "site": { "local_placement_origin": [x,y,z]|None, "ref_latitude": [deg,min,sec,millionth]|None, "ref_longitude": [deg,min,sec,millionth]|None, "ref_elevation": float|None }, "contexts": [...], # only if include_contexts=True "warnings": [...] } """ try: file = IfcStore.get_file() debug = {"entered": True, "has_ifc": file is not None, "projects": 0, "sites": 0, "contexts": 0} if file is None: return {"error": "No IFC file is currently loaded", "debug": debug} warnings = [] result = { "georeferenced": False, "crs": { "name": None, "geodetic_datum": None, "vertical_datum": None, "map_unit": None }, "map_conversion": { "eastings": None, "northings": None, "orthogonal_height": None, "scale": None, "x_axis_abscissa": None, "x_axis_ordinate": None }, "world_coordinate_system": {"origin": None}, "true_north": {"direction_ratios": None}, "site": { "local_placement_origin": None, "ref_latitude": None, "ref_longitude": None, "ref_elevation": None }, "contexts": [], "warnings": warnings, "debug":debug, } # --- IfcProject & RepresentationContexts --- projects = file.by_type("IfcProject") debug["projects"] = len(projects) if projects: project = projects[0] contexts = getattr(project, "RepresentationContexts", None) or [] debug["contexts"] = len(contexts) for ctx in contexts: ctx_entry = { "context_identifier": getattr(ctx, "ContextIdentifier", None), "context_type": getattr(ctx, "ContextType", None), "world_origin": None, "true_north": None, "has_coordinate_operation": [] } # WorldCoordinateSystem → Local origin try: wcs = getattr(ctx, "WorldCoordinateSystem", None) if wcs and getattr(wcs, "Location", None): loc = wcs.Location if getattr(loc, "Coordinates", None): coords = list(loc.Coordinates) result["world_coordinate_system"]["origin"] = coords ctx_entry["world_origin"] = coords except Exception as e: warnings.append(f"WorldCoordinateSystem read error: {str(e)}") # TrueNorth try: if hasattr(ctx, "TrueNorth") and ctx.TrueNorth: tn = ctx.TrueNorth ratios = list(getattr(tn, "DirectionRatios", []) or []) result["true_north"]["direction_ratios"] = ratios ctx_entry["true_north"] = ratios except Exception as e: 
warnings.append(f"TrueNorth read error: {str(e)}") # HasCoordinateOperation → IfcMapConversion / TargetCRS try: if hasattr(ctx, "HasCoordinateOperation") and ctx.HasCoordinateOperation: for op in ctx.HasCoordinateOperation: op_entry = {"type": op.is_a(), "target_crs": None, "map_conversion": None} # TargetCRS crs = getattr(op, "TargetCRS", None) if crs: result["crs"]["name"] = getattr(crs, "Name", None) result["crs"]["geodetic_datum"] = getattr(crs, "GeodeticDatum", None) result["crs"]["vertical_datum"] = getattr(crs, "VerticalDatum", None) try: map_unit = getattr(crs, "MapUnit", None) result["crs"]["map_unit"] = map_unit.Name if map_unit else None except Exception: result["crs"]["map_unit"] = None op_entry["target_crs"] = { "name": result["crs"]["name"], "geodetic_datum": result["crs"]["geodetic_datum"], "vertical_datum": result["crs"]["vertical_datum"], "map_unit": result["crs"]["map_unit"] } # IfcMapConversion if op.is_a("IfcMapConversion"): mc = { "eastings": getattr(op, "Eastings", None), "northings": getattr(op, "Northings", None), "orthogonal_height": getattr(op, "OrthogonalHeight", None), "scale": getattr(op, "Scale", None), "x_axis_abscissa": getattr(op, "XAxisAbscissa", None), "x_axis_ordinate": getattr(op, "XAxisOrdinate", None) } result["map_conversion"].update(mc) op_entry["map_conversion"] = mc ctx_entry["has_coordinate_operation"].append(op_entry) except Exception as e: warnings.append(f"HasCoordinateOperation read error: {str(e)}") if include_contexts: result["contexts"].append(ctx_entry) else: warnings.append("IfcProject entity was not found.") # --- IfcSite (lat/long/alt local origin of placement) --- try: sites = file.by_type("IfcSite") debug["sites"] = len(sites) if sites: site = sites[0] # LocalPlacement try: if getattr(site, "ObjectPlacement", None): placement = site.ObjectPlacement axisPlacement = getattr(placement, "RelativePlacement", None) if axisPlacement and getattr(axisPlacement, "Location", None): loc = axisPlacement.Location if getattr(loc, "Coordinates", None): result["site"]["local_placement_origin"] = list(loc.Coordinates) except Exception as e: warnings.append(f"IfcSite.ObjectPlacement read error: {str(e)}") # Lat/Long/Alt try: lat = getattr(site, "RefLatitude", None) lon = getattr(site, "RefLongitude", None) ele = getattr(site, "RefElevation", None) result["site"]["ref_latitude"] = list(lat) if lat else None result["site"]["ref_longitude"] = list(lon) if lon else None result["site"]["ref_elevation"] = ele except Exception as e: warnings.append(f"IfcSite (lat/long/elev) read error: {str(e)}") else: warnings.append("IfcSite was not found.") except Exception as e: warnings.append(f"Error while querying IfcSite: {str(e)}") # --- Heuristic to determine georeferencing --- geo_flags = [ any(result["crs"].values()), any(v is not None for v in result["map_conversion"].values()) ] result["georeferenced"] = all(geo_flags) return result except Exception as e: import traceback return {"error": str(e), "traceback": traceback.format_exc()} @staticmethod def georeference_ifc_model( crs_mode: str, epsg: int = None, crs_name: str = None, geodetic_datum: str = None, map_projection: str = None, map_zone: str = None, eastings: float = None, northings: float = None, orthogonal_height: float = 0.0, scale: float = 1.0, x_axis_abscissa: float = None, x_axis_ordinate: float = None, true_north_azimuth_deg: float = None, context_filter: str = "Model", context_index: int = None, site_ref_latitude: list = None, # IFC format [deg, min, sec, millionth] site_ref_longitude: list = None, 
# IFC format [deg, min, sec, millionth] site_ref_elevation: float = None, site_ref_latitude_dd: float = None, # Decimal degrees (optional) site_ref_longitude_dd: float = None, # Decimal degrees (optional) overwrite: bool = False, dry_run: bool = False, write_path: str = None, ): """ Usage: Creates/updates IfcProjectedCRS + IfcMapConversion in the opened IFC. Optionally updates IfcSite.RefLatitude/RefLongitude/RefElevation. If `pyproj` is available, it can convert Lat/Long (degrees) ⇄ E/N (meters) according to the given EPSG. Requirements: CRS declaration is ALWAYS required: - crs_mode="epsg" + epsg=XXXX OR - crs_mode="custom" + (crs_name, geodetic_datum, map_projection [, map_zone]) Minimum MapConversion information: - eastings + northings (if missing but lat/long + EPSG + pyproj are available, they are computed) """ import math from bonsai.bim.ifc import IfcStore file = IfcStore.get_file() if file is None: return {"success": False, "error": "No IFC file is currently loaded"} warnings = [] actions = {"created_crs": False, "created_map_conversion": False, "updated_map_conversion": False, "updated_site": False, "overwrote": False, "wrote_file": False} debug = {} # ---------- helpers ---------- def dd_to_ifc_dms(dd: float): """Converts decimal degrees to [deg, min, sec, millionth] (sign carried by degrees).""" if dd is None: return None sign = -1 if dd < 0 else 1 v = abs(dd) deg = int(v) rem = (v - deg) * 60 minutes = int(rem) sec_float = (rem - minutes) * 60 seconds = int(sec_float) millionth = int(round((sec_float - seconds) * 1_000_000)) # Normalizes rounding (e.g. 59.999999 → 60) if millionth == 1_000_000: seconds += 1 millionth = 0 if seconds == 60: minutes += 1 seconds = 0 if minutes == 60: deg += 1 minutes = 0 return [sign * deg, minutes, seconds, millionth] def select_context(): ctxs = file.by_type("IfcGeometricRepresentationContext") or [] if not ctxs: return None, "No IfcGeometricRepresentationContext found" if context_index is not None and 0 <= context_index < len(ctxs): return ctxs[context_index], None # By filter (default "Model", case-insensitive) if context_filter: for c in ctxs: if (getattr(c, "ContextType", None) or "").lower() == context_filter.lower(): return c, None # Fallback to the first one return ctxs[0], None # ---------- 1) CRS Validation ---------- if crs_mode not in ("epsg", "custom"): return {"success": False, "error": "crs_mode must be 'epsg' or 'custom'"} if crs_mode == "epsg": if not epsg: return {"success": False, "error": "epsg code required when crs_mode='epsg'"} crs_name_final = f"EPSG:{epsg}" geodetic_datum = geodetic_datum or "WGS84" map_projection = map_projection or "TransverseMercator" # usual UTM # map_zone is optional else: # custom missing = [k for k in ("crs_name", "geodetic_datum", "map_projection") if locals().get(k) in (None, "")] if missing: return {"success": False, "error": f"Missing fields for custom CRS: {', '.join(missing)}"} crs_name_final = crs_name # ---------- 2) Complete E/N from Lat/Long (if missing and pyproj is available) ---------- proj_used = None try: if (eastings is None or northings is None) and (site_ref_latitude_dd is not None and site_ref_longitude_dd is not None) and crs_mode == "epsg": try: from pyproj import Transformer # Assume lat/long in WGS84; if the EPSG is not WGS84-derived, pyproj handles the conversion transformer = Transformer.from_crs("EPSG:4326", f"EPSG:{epsg}", always_xy=True) e, n = transformer.transform(site_ref_longitude_dd, site_ref_latitude_dd) eastings = e if eastings is None else eastings northings = n 
if northings is None else northings proj_used = f"EPSG:4326->EPSG:{epsg}" except Exception as _e: warnings.append(f"Could not convert Lat/Long to E/N: {_e}. Provide eastings/northings manually.") except Exception as _e: warnings.append(f"pyproj not available to compute E/N: {_e}. Provide eastings/northings manually.") # ---------- E/N Validation ---------- if eastings is None or northings is None: return {"success": False, "error": "eastings and northings are required (or provide lat/long + EPSG with pyproj installed)"} # ---------- 3) Select context ---------- context, ctx_err = select_context() if not context: return {"success": False, "error": ctx_err or "No context found"} # ---------- 4) Detect existing ones and handle overwrite ---------- # Inverse: context.HasCoordinateOperation is already handled by ifcopenshell as an attribute existing_ops = list(getattr(context, "HasCoordinateOperation", []) or []) existing_map = None existing_crs = None for op in existing_ops: if op.is_a("IfcMapConversion"): existing_map = op existing_crs = getattr(op, "TargetCRS", None) break if existing_map and not overwrite: return { "success": True, "georeferenced": True, "message": "MapConversion already exists. Use overwrite=True to replace it.", "context_used": {"identifier": getattr(context, "ContextIdentifier", None), "type": getattr(context, "ContextType", None)}, "map_conversion": { "eastings": getattr(existing_map, "Eastings", None), "northings": getattr(existing_map, "Northings", None), "orthogonal_height": getattr(existing_map, "OrthogonalHeight", None), "scale": getattr(existing_map, "Scale", None), "x_axis_abscissa": getattr(existing_map, "XAxisAbscissa", None), "x_axis_ordinate": getattr(existing_map, "XAxisOrdinate", None), }, "crs": { "name": getattr(existing_crs, "Name", None) if existing_crs else None, "geodetic_datum": getattr(existing_crs, "GeodeticDatum", None) if existing_crs else None, "map_projection": getattr(existing_crs, "MapProjection", None) if existing_crs else None, "map_zone": getattr(existing_crs, "MapZone", None) if existing_crs else None, }, "warnings": warnings, "actions": actions, } # ---------- 5) Build/Update CRS ---------- if existing_crs and overwrite: actions["overwrote"] = True try: file.remove(existing_crs) except Exception: warnings.append("Could not remove the existing CRS; a new one will be created anyway.") # If custom, use the provided values; if EPSG, build the name and defaults crs_kwargs = { "Name": crs_name_final, "GeodeticDatum": geodetic_datum, "MapProjection": map_projection, } if map_zone: crs_kwargs["MapZone"] = map_zone crs_entity = file.create_entity("IfcProjectedCRS", **crs_kwargs) actions["created_crs"] = True # ---------- 6) Calculate orientation (optional) ---------- # If true_north_azimuth_deg is given as the azimuth from North (model +Y axis) towards East (clockwise), # We can derive an approximate X vector: X = (cos(az+90°), sin(az+90°)). 
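# Worked example (illustrative, not part of the original code): with true_north_azimuth_deg = 30,
# az = radians(30) ≈ 0.5236, so XAxisAbscissa = cos(az + 90°) = cos(120°) ≈ -0.5 and
# XAxisOrdinate = sin(az + 90°) = sin(120°) ≈ 0.866; these become the X-axis components
# written into IfcMapConversion below.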
if (x_axis_abscissa is None or x_axis_ordinate is None) and (true_north_azimuth_deg is not None): az = math.radians(true_north_azimuth_deg) # Estimated X vector rotated 90° from North: x_axis_abscissa = math.cos(az + math.pi / 2.0) x_axis_ordinate = math.sin(az + math.pi / 2.0) # Defaults if still missing x_axis_abscissa = 1.0 if x_axis_abscissa is None else float(x_axis_abscissa) x_axis_ordinate = 0.0 if x_axis_ordinate is None else float(x_axis_ordinate) scale = 1.0 if scale is None else float(scale) orthogonal_height = 0.0 if orthogonal_height is None else float(orthogonal_height) # ---------- 7) Build/Update IfcMapConversion ---------- if existing_map and overwrite: try: file.remove(existing_map) except Exception: warnings.append("Could not remove the existing MapConversion; another one will be created anyway.") map_kwargs = { "SourceCRS": context, "TargetCRS": crs_entity, "Eastings": float(eastings), "Northings": float(northings), "OrthogonalHeight": float(orthogonal_height), "XAxisAbscissa": float(x_axis_abscissa), "XAxisOrdinate": float(x_axis_ordinate), "Scale": float(scale), } map_entity = file.create_entity("IfcMapConversion", **map_kwargs) actions["created_map_conversion"] = True # ---------- 8) (Optional) Update IfcSite ---------- try: sites = file.by_type("IfcSite") or [] if sites: site = sites[0] # If no IFC lists are provided but decimal degrees are, convert them if site_ref_latitude is None and site_ref_latitude_dd is not None: site_ref_latitude = dd_to_ifc_dms(site_ref_latitude_dd) if site_ref_longitude is None and site_ref_longitude_dd is not None: site_ref_longitude = dd_to_ifc_dms(site_ref_longitude_dd) changed = False if site_ref_latitude is not None: site.RefLatitude = site_ref_latitude changed = True if site_ref_longitude is not None: site.RefLongitude = site_ref_longitude changed = True if site_ref_elevation is not None: site.RefElevation = float(site_ref_elevation) changed = True if changed: actions["updated_site"] = True else: warnings.append("No IfcSite found; lat/long/elevation were not updated.") except Exception as e: warnings.append(f"Could not update IfcSite: {e}") # ---------- 9) (Optional) Save ---------- if write_path and not dry_run: try: file.write(write_path) actions["wrote_file"] = True except Exception as e: warnings.append(f"Could not write IFC to'{write_path}': {e}") # ---------- 10) Response ---------- return { "success": True, "georeferenced": True, "crs": { "name": getattr(crs_entity, "Name", None), "geodetic_datum": getattr(crs_entity, "GeodeticDatum", None), "map_projection": getattr(crs_entity, "MapProjection", None), "map_zone": getattr(crs_entity, "MapZone", None), }, "map_conversion": { "eastings": float(eastings), "northings": float(northings), "orthogonal_height": float(orthogonal_height), "scale": float(scale), "x_axis_abscissa": float(x_axis_abscissa), "x_axis_ordinate": float(x_axis_ordinate), }, "context_used": { "identifier": getattr(context, "ContextIdentifier", None), "type": getattr(context, "ContextType", None), }, "site": { "ref_latitude": site_ref_latitude, "ref_longitude": site_ref_longitude, "ref_elevation": site_ref_elevation, }, "proj_used": proj_used, "warnings": warnings, "actions": actions, } @staticmethod def generate_ids( title: str, specs: list, description: str = "", author: str = "", ids_version: str = "", purpose: str = "", milestone: str = "", output_path: str = None, date_iso: str = None, ): """ Generates an .ids file with robust handling of: - Synonyms: 'name' → 'baseName', 'minValue/maxValue' + inclusivity, 
'minOccurs/maxOccurs' → cardinality. - Operators inside 'value' ("> 30", "≤0.45"), in keys (op/target/threshold/limit), and extracted from 'description' (ONLY within requirements; never in applicability). - Correct restriction mapping: * Numeric → ids.Restriction(base="double" | "integer", options={...}) * Textual (IFCLABEL/TEXT) → ids.Restriction(base="string", options={"pattern": [anchored regexes]}) - Automatic dataType inference with hints (ThermalTransmittance → IFCTHERMALTRANSMITTANCEMEASURE, IsExternal → IFCBOOLEAN, etc.). - PredefinedType remains as an Attribute within APPLICABILITY (NOT absorbed into Entity.predefinedType). """ #Libraries/Dependencies # ----------------------------------------------------------------------------------------------------------- try: from ifctester import ids except Exception as e: return {"ok": False, "error": "Could not import ifctester.ids", "details": str(e)} import os, datetime, re from numbers import Number #Validations # ----------------------------------------------------------------------------------------------------------- if not isinstance(title, str) or not title.strip(): return {"ok": False, "error": "Invalid or empty 'title' parameter."} if not isinstance(specs, list) or len(specs) == 0: return {"ok": False, "error": "You must provide at least one specification in 'specs'."} # Utils # ----------------------------------------------------------------------------------------------------------- def _norm_card(c): """ Usage: Normalizes the given cardinality value, ensuring it matches one of the valid terms. Inputs: c (str | None): Cardinality value to normalize. Can be 'required', 'optional', or 'prohibited'. Output: str | None: Normalized lowercase value if valid, or None if not provided. Exceptions: ValueError: Raised if the input value does not correspond to a valid cardinality. """ if c is None: return None c = str(c).strip().lower() if c in ("required", "optional", "prohibited"): return c raise ValueError("Invalid cardinality: use 'required', 'optional', or 'prohibited'.") def _card_from_occurs(minOccurs, maxOccurs): """ Usage: Derives the cardinality ('required' or 'optional') based on the values of minOccurs and maxOccurs. Inputs: minOccurs (int | str | None): Minimum number of occurrences. If greater than 0, the field is considered 'required'. maxOccurs (int | str | None): Maximum number of occurrences. Not used directly, included for completeness. Output: str | None: Returns 'required' if minOccurs > 0, 'optional' if minOccurs == 0, or None if conversion fails. """ try: if minOccurs is None: return None m = int(minOccurs) return "required" if m > 0 else "optional" except Exception: return None def _is_bool_like(v): """ Usage: Checks whether a given value can be interpreted as a boolean. Inputs: v (any): Value to evaluate. Can be of any type (bool, str, int, etc.). Output: bool: Returns True if the value represents a boolean-like token (e.g., True, False, "yes", "no", "1", "0", "y", "n", "t", "f"), otherwise returns False. """ if isinstance(v, bool): return True if v is None: return False s = str(v).strip().lower() return s in ("true", "false", "1", "0", "yes", "no", "y", "n", "t", "f") def _to_bool_token(v): """ Usage: Converts a boolean-like value into a standardized string token ("TRUE" or "FALSE"). Inputs: v (any): Value to convert. Can be a boolean, string, or numeric value representing truthiness. 
Output: str | None: Returns "TRUE" or "FALSE" if the value matches a recognized boolean pattern, or None if it cannot be interpreted as boolean. """ if isinstance(v, bool): return "TRUE" if v else "FALSE" s = str(v).strip().lower() if s in ("true", "1", "yes", "y", "t"): return "TRUE" if s in ("false", "0", "no", "n", "f"): return "FALSE" return None # Hints for *MEASURE* types and by property name MEASURE_HINTS = { "THERMALTRANSMITTANCE": "IFCTHERMALTRANSMITTANCEMEASURE", "UVALUE": "IFCTHERMALTRANSMITTANCEMEASURE", "RATIOMEASURE": "IFCRATIOMEASURE", "AREAMEASURE": "IFCAREAMEASURE", "LENGTHMEASURE": "IFCLENGTHMEASURE", "SOUNDPRESSURELEVELMEASURE": "IFCSOUNDPRESSURELEVELMEASURE", } PROPERTY_DATATYPE_HINTS = { "THERMALTRANSMITTANCE": "IFCTHERMALTRANSMITTANCEMEASURE", "ISEXTERNAL": "IFCBOOLEAN", "ACOUSTICRATING": "IFCLABEL", } def _norm_ifc_version(v: str | None) -> str | None: """ Usage: Normalizes the given IFC schema version string to a standardized format. Inputs: v (str | None): Input version value (e.g., "4", "IFC 4", "2x3", "IFC4.3"). Output: str | None: Returns the normalized IFC version (e.g., "IFC4", "IFC2X3", "IFC4X3"), or None if the input is empty or invalid. """ if not v: return None s = str(v).strip().upper() m = {"4": "IFC4", "IFC 4": "IFC4", "2X3": "IFC2X3", "IFC 2X3": "IFC2X3", "IFC4.3": "IFC4X3"} return m.get(s, s) def _strip_ifc_prefix(dt: str | None) -> str | None: """ Usage: Removes leading and trailing spaces from the given string and converts it to uppercase. Typically used to normalize IFC data type names. Inputs: dt (str | None): Data type string to normalize (e.g., " ifcreal "). Output: str | None: Uppercase, trimmed string (e.g., "IFCREAL"), or None if the input is empty or None. """ return dt.strip().upper() if dt else None def _is_number_like(v) -> bool: """ Usage: Checks whether the given value can be interpreted as a numeric value. Inputs: v (any): Value to evaluate. Can be of any type (int, float, str, etc.). Output: bool: Returns True if the value represents a number (including numeric strings like "3.5" or "2,7"), otherwise returns False. """ if isinstance(v, Number): return True if v is None: return False try: float(str(v).strip().replace(",", ".")) return True except Exception: return False def _guess_numeric_base_from_ifc(dt_upper: str | None) -> str: """ Usage: Determines the numeric base type ('integer' or 'double') from an IFC data type string. Inputs: dt_upper (str | None): Uppercase IFC data type name (e.g., "IFCINTEGER", "IFCREAL"). Output: str: Returns "integer" if the type contains "INTEGER"; otherwise returns "double". Defaults to "double" when no input is provided. """ if not dt_upper: return "double" if "INTEGER" in dt_upper: return "integer" return "double" # comparators in string ("> 30", "<=0.45", "≥3", "≤ 3") _cmp_regex = re.compile(r"^\s*(>=|=>|≤|<=|≥|>|<)\s*([0-9]+(?:[.,][0-9]+)?)\s*$") _normalize_op = {">=":">=", "=>":">=", "≥":">=", "<=":"<=", "≤":"<="} def _extract_op_target_from_string(s: str): """ Usage: Extracts a comparison operator and its numeric target value from a string expression. Inputs: s (str): String containing a comparison, e.g., "> 30", "<=0.45", "≥3", or "≤ 3". Output: tuple(str | None, float | None): Returns a tuple (operator, target_value), where operator is one of ">", ">=", "<", or "<=". Returns (None, None) if the string does not match a valid pattern. 
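Examples (illustrative):
    "> 30"     -> (">", 30.0)
    "≤ 0,45"   -> ("<=", 0.45)
    "about 30" -> (None, None)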
""" m = _cmp_regex.match(s) if not m: return None, None op, num = m.group(1), m.group(2) op = _normalize_op.get(op, op) try: tgt = float(num.replace(",", ".")) except Exception: return None, None return op, tgt # English descriptions (>= before >) _desc_ops = [ (r"(greater\s+than\s+or\s+equal\s+to|greater\s+or\s+equal\s+to|equal\s+or\s+greater\s+than|≥)", ">="), (r"(less\s+than\s+or\s+equal\s+to|not\s+greater\s+than|≤|at\s+most|maximum)", "<="), (r"(greater\s+than|more\s+than|>)", ">"), (r"(less\s+than|fewer\s+than|<)", "<"), ] _num_regex = re.compile(r"([0-9]+(?:[.,][0-9]+)?)") def _extract_from_description(desc: str): """ Usage: Extracts a comparison operator and numeric target value from a descriptive text. Designed to interpret expressions such as "greater than 30" or "less than or equal to 0.45". Inputs: desc (str): Description text potentially containing a numeric comparison. Output: tuple(str | None, float | None): Returns a tuple (operator, target_value), where operator is one of ">", ">=", "<", or "<=", and target_value is the numeric value extracted. Returns (None, None) if no valid pattern is found. """ if not desc: return None, None text = desc.strip().lower() for pat, op in _desc_ops: if re.search(pat, text): m = _num_regex.search(text) if m: try: tgt = float(m.group(1).replace(",", ".")) return op, tgt except Exception: pass return None, None # anchored regexes for integers (numeric fallback for decimals) def _regex_for_threshold(threshold: float, op: str) -> list[str]: """ Usage: Builds one or more anchored regular expressions to validate integer values against a numeric threshold and comparison operator. For non-integer thresholds, returns a generic numeric pattern as fallback. Inputs: threshold (float): Numeric limit used for the comparison (e.g., 30, 10.5). op (str): Comparison operator, one of ">", ">=", "<", or "<=". Output: list[str]: A list containing one or more anchored regex patterns that match integer strings satisfying the given condition. Returns a generic numeric regex pattern as fallback for decimals. 
""" if abs(threshold - round(threshold)) < 1e-9: t = int(round(threshold)) def gt_int(n): if n <= 8: return rf"^([{n+1}-9]|[1-9]\d|[1-9]\d{{2,}})$" if n <= 98: tens, units = divmod(n + 1, 10) p1 = rf"{tens}[{units}-9]" if units > 0 else rf"{tens}\d" p2 = rf"[{tens+1}-9]\d" if tens < 9 else "" parts = [p1, p2, r"[1-9]\d{2,}"] return "^(" + "|".join([p for p in parts if p]) + ")$" return r"^[1-9]\d{2,}$" def ge_int(n): if n <= 9: return rf"^([{n}-9]|[1-9]\d|[1-9]\d{{2,}})$" if n <= 99: tens, units = divmod(n, 10) p1 = rf"{tens}[{units}-9]" p2 = rf"[{tens+1}-9]\d" if tens < 9 else "" parts = [p1, p2, r"[1-9]\d{2,}"] return "^(" + "|".join([p for p in parts if p]) + ")$" return r"^[1-9]\d{2,}$" def lt_int(n): if n <= 0: return r"^(?!)$" if n <= 10: return rf"^[0-9]$" if n == 10 else rf"^[0-{n-1}]$" tens, units = divmod(n - 1, 10) if tens == 1: return r"^([0-9]|1[0-9])$" return rf"^([0-9]|[1-{tens-1}]\d|{tens}[0-{units}])$" def le_int(n): if n < 10: return rf"^[0-{n}]$" tens, units = divmod(n, 10) if tens == 1: return r"^([0-9]|1[0-9])$" if units == 9 else rf"^([0-9]|1[0-{units}])$" parts = [r"[0-9]"] if tens > 1: parts.append(rf"[1-{tens-1}]\d") parts.append(rf"{tens}[0-{units}]") return "^(" + "|".join(parts) + ")$" if op == ">": return [gt_int(t)] elif op == ">=": return [ge_int(t)] elif op == "<": return [lt_int(t)] elif op == "<=": return [le_int(t)] return [r"^\d+(?:[.,]\d+)?$"] # fallback for decimals (plain numeric string) def _build_restriction_for_text(op: str | None, target, bounds: dict): """ Usage: Builds a text-based IDS restriction (ids.Restriction) using regex patterns derived from numeric thresholds and comparison operators. Used when a property has textual dataType (e.g., IFCLABEL) but represents numeric conditions. Inputs: op (str | None): Comparison operator (">", ">=", "<", "<=") if explicitly provided. target (any): Target value for the comparison. Can be numeric or string. bounds (dict): Dictionary of limit values such as {"minInclusive": ..., "maxExclusive": ..., "maxInclusive": ...}. Output: ids.Restriction | None: Returns an ids.Restriction object with regex patterns for matching the specified numeric range in string form, or None if no valid pattern can be built. """ if op and target is not None and _is_number_like(target): return ids.Restriction(base="string", options={"pattern": _regex_for_threshold(float(target), op)}) patterns = [] if bounds.get("minExclusive") is not None: patterns += _regex_for_threshold(float(bounds["minExclusive"]), ">") if bounds.get("minInclusive") is not None: patterns += _regex_for_threshold(float(bounds["minInclusive"]), ">=") if bounds.get("maxExclusive") is not None: patterns += _regex_for_threshold(float(bounds["maxExclusive"]), "<") if bounds.get("maxInclusive") is not None: patterns += _regex_for_threshold(float(bounds["maxInclusive"]), "<=") return ids.Restriction(base="string", options={"pattern": patterns}) if patterns else None def _build_numeric_restriction(dt_upper: str | None, op: str | None, target, bounds: dict): """ Usage: Builds a numeric IDS restriction (ids.Restriction) from a data type, comparison operator, target value, and optional numeric bounds. Inputs: dt_upper (str | None): Uppercase IFC data type name (e.g., "IFCREAL", "IFCINTEGER"). op (str | None): Comparison operator (">", ">=", "<", "<=") if provided. target (any): Target value for the comparison. Converted to float when applicable. 
bounds (dict): Dictionary containing optional boundary values such as {"minInclusive": ..., "maxExclusive": ..., "maxInclusive": ...}. Output: ids.Restriction | None: Returns an ids.Restriction object with the appropriate numeric limits, or None if no valid restriction can be created. """ if not (op or any(v is not None for v in bounds.values())): return None base_num = _guess_numeric_base_from_ifc(dt_upper) opts = {} if op and target is not None: v = float(str(target).replace(",", ".")) if op == ">": opts["minExclusive"] = v elif op == ">=": opts["minInclusive"] = v elif op == "<": opts["maxExclusive"] = v elif op == "<=": opts["maxInclusive"] = v for k in ("minInclusive","maxInclusive","minExclusive","maxExclusive"): if bounds.get(k) is not None: opts[k] = float(str(bounds[k]).replace(",", ".")) if not opts: return None return ids.Restriction(base=base_num, options=opts) def _infer_ids_datatype(pset: str | None, baseName: str | None, provided_dt: str | None, value, op: str | None, bounds: dict) -> str: """ Usage: Infers the appropriate IFC data type (e.g., IFCREAL, IFCINTEGER, IFCBOOLEAN, IFCLABEL) for a given property based on its name, provided data type, value, and restrictions. Inputs: pset (str | None): Name of the property set to which the property belongs. baseName (str | None): Base name of the property (e.g., "ThermalTransmittance", "IsExternal"). provided_dt (str | None): Data type explicitly provided in the input, if any. value (any): Property value or an ids.Restriction object. op (str | None): Comparison operator (">", ">=", "<", "<=") if defined. bounds (dict): Dictionary containing limit values such as {"minInclusive": ..., "maxExclusive": ..., "maxInclusive": ...}. Output: str: Returns the inferred IFC data type string, such as "IFCREAL", "IFCINTEGER", "IFCBOOLEAN", or "IFCLABEL". """ # if a dataType is provided, normalize and promote it if applicable if provided_dt: dtU = _strip_ifc_prefix(provided_dt) if baseName and dtU in ("IFCREAL", "IFCNUMBER", "NUMBER", "REAL"): hint = PROPERTY_DATATYPE_HINTS.get(str(baseName).strip().upper()) if hint: return hint if dtU in MEASURE_HINTS: return MEASURE_HINTS[dtU] return dtU # hints by name if baseName: hint = PROPERTY_DATATYPE_HINTS.get(str(baseName).strip().upper()) if hint: return hint # value = Restriction if isinstance(value, ids.Restriction): base = getattr(value, "base", "").lower() if base in ("integer",): return "IFCINTEGER" if base in ("double","number","real","float"): return "IFCREAL" return "IFCLABEL" # if op/bounds -> numeric if op or any(v is not None for v in bounds.values()): return "IFCREAL" # booleans if _is_bool_like(value): return "IFCBOOLEAN" # literal numbers if _is_number_like(value): try: iv = int(str(value)) if float(str(value)) == float(iv): return "IFCINTEGER" except Exception: pass return "IFCREAL" # text return "IFCLABEL" # (optional) Absorption of PredefinedType into Entity.predefinedType — DISABLED def _absorb_predefined_type(applicability_list: list): """ Usage: Transfers the value of a PREDEFINEDTYPE attribute into the corresponding Entity's predefinedType field within the applicability list. This operation effectively absorbs the PREDEFINEDTYPE entry into the Entity definition. Inputs: applicability_list (list): List of facet dictionaries containing 'Entity' and 'Attribute' definitions. Output: list: The updated applicability list where the PREDEFINEDTYPE value has been moved to the Entity's 'predefinedType' field, if applicable. 
Returns the original list if no valid Entity or PREDEFINEDTYPE attribute is found. """ if not isinstance(applicability_list, list): return applicability_list idx = next((i for i,f in enumerate(applicability_list) if (f.get("type") == "Entity")), None) if idx is None: return applicability_list for i,f in enumerate(list(applicability_list)): if f.get("type") == "Attribute" and str(f.get("name","")).strip().upper() == "PREDEFINEDTYPE": val = f.get("value") if val not in (None, ""): applicability_list[idx]["predefinedType"] = val applicability_list.pop(i) break return applicability_list # IDS Root # ----------------------------------------------------------------------------------------------------------- try: ids_root = ids.Ids( title=(title or "Untitled"), description=(description or None), author=(author or None), version=(str(ids_version) if ids_version else None), purpose=(purpose or None), milestone=(milestone or None), date=(date_iso or datetime.date.today().isoformat()), ) try: ids_root.title = (title or "Untitled") except Exception: pass try: ids_root.info.title = (title or "Untitled") except Exception: pass except Exception as e: return {"ok": False, "error": "Could not initialize the IDS", "details": str(e)} # Facets (with context) # ----------------------------------------------------------------------------------------------------------- def _facet_from_dict(f, spec_desc: str | None, context: str): """ Usage: Builds an IDS facet object (e.g., Entity, Attribute, Property, Material, Classification, or PartOf) from a dictionary definition. Handles data normalization, type inference, comparison extraction, and restriction creation for both applicability and requirements contexts. Inputs: f (dict): Dictionary describing a facet, including its type and relevant attributes. spec_desc (str | None): Optional specification description used to infer operators or targets when not explicitly provided. context (str): Indicates the facet context, either 'applicability' or 'requirements'. Only in 'requirements' can operator/target be extracted from the description. Output: ids.Entity | ids.Attribute | ids.Property | ids.Material | ids.Classification | ids.PartOf: Returns the corresponding ids.* object based on the facet type. Exceptions: ValueError: Raised if the facet type is unsupported or required fields are missing (e.g., Property without propertySet or baseName, Attribute without name). 
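Example (illustrative, hypothetical input):
    {"type": "Property", "propertySet": "Pset_WallCommon", "baseName": "ThermalTransmittance",
     "value": "<= 0.45", "dataType": "IFCREAL"}
    produces an ids.Property whose dataType is promoted to IFCTHERMALTRANSMITTANCEMEASURE
    via the property-name hint, with a numeric maxInclusive=0.45 restriction and
    cardinality "required".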
""" t = (f.get("type") or "").strip() if t == "Entity": ent_name = f.get("name", "") or f.get("entity", "") or f.get("Name", "") ent_name = ent_name.strip() if ent_name.lower().startswith("ifc") and not ent_name.isupper(): ent_name = ent_name.upper() # 'IfcWall' -> 'IFCWALL' return ids.Entity( name=ent_name, predefinedType=f.get("predefinedType", ""), # we keep it separate (not absorbed) instructions=f.get("instructions", ""), ) elif t == "Attribute": name = f.get("name") or f.get("Name") if not name: raise ValueError("Attribute requires 'name'.") kwargs = dict(name=name) if f.get("value") not in (None, ""): val = f["value"] if _is_bool_like(val): tok = _to_bool_token(val) kwargs["value"] = tok if tok else val else: kwargs["value"] = val # Cardinality from occurs card = _card_from_occurs(f.get("minOccurs"), f.get("maxOccurs")) if card: kwargs["cardinality"] = card if f.get("cardinality"): kwargs["cardinality"] = _norm_card(f.get("cardinality")) if f.get("instructions"): kwargs["instructions"] = f["instructions"] return ids.Attribute(**kwargs) elif t == "Property": pset = f.get("propertySet") or f.get("pset") or f.get("psetName") base = f.get("baseName") or f.get("name") or f.get("Name") if not pset or not base: raise ValueError("Property requires 'propertySet' and 'baseName'.") val_in = f.get("value", None) bounds = { "minInclusive": f.get("minInclusive"), "maxInclusive": f.get("maxInclusive"), "minExclusive": f.get("minExclusive"), "maxExclusive": f.get("maxExclusive"), } # minValue/maxValue + inclusivity if f.get("minValue") is not None: if bool(f.get("minInclusive")): bounds["minInclusive"] = f.get("minValue") else: bounds["minExclusive"] = f.get("minValue") if f.get("maxValue") is not None: if bool(f.get("maxInclusive")): bounds["maxInclusive"] = f.get("maxValue") else: bounds["maxExclusive"] = f.get("maxValue") if isinstance(val_in, dict): for k in ("minInclusive","maxInclusive","minExclusive","maxExclusive"): if k in val_in and bounds.get(k) is None: bounds[k] = val_in[k] # explicit operator op = f.get("op") or f.get("operator") or f.get("comparison") or f.get("cmp") or f.get("relation") target = f.get("target") or f.get("threshold") or f.get("limit") # operator in 'value' string ("> 30") if target is None and isinstance(val_in, str): _op2, _tg2 = _extract_op_target_from_string(val_in) if _op2 and _tg2 is not None: op, target, val_in = _op2, _tg2, None # ONLY IN REQUIREMENTS: extract from description if context == "requirements" and (not op and all(v is None for v in bounds.values()) and target is None and spec_desc): _op3, _tg3 = _extract_from_description(spec_desc) if _op3 and _tg3 is not None: op, target = _op3, _tg3 # cardinality from occurs card = _card_from_occurs(f.get("minOccurs"), f.get("maxOccurs")) dt = _infer_ids_datatype(pset, base, f.get("dataType"), val_in, op, bounds) # boolean normalization if _is_bool_like(val_in): tok = _to_bool_token(val_in) if tok is not None: val_in = tok if not dt: dt = "IFCBOOLEAN" # Restriction when applicable restriction_obj = None if op or any(v is not None for v in bounds.values()): if dt in ("IFCLABEL","IFCTEXT"): restriction_obj = _build_restriction_for_text(op, target if target is not None else val_in, bounds) else: restriction_obj = _build_numeric_restriction(dt, op, target if target is not None else val_in, bounds) if isinstance(val_in, ids.Restriction): restriction_obj = val_in kwargs = dict(propertySet=pset, baseName=base) if restriction_obj is not None: kwargs["value"] = restriction_obj if dt: kwargs["dataType"] = dt else: if 
val_in not in (None, ""): kwargs["value"] = val_in if dt: kwargs["dataType"] = dt if f.get("uri"): kwargs["uri"] = f["uri"] if f.get("instructions"): kwargs["instructions"] = f["instructions"] if card: kwargs["cardinality"] = card if f.get("cardinality"): kwargs["cardinality"] = _norm_card(f.get("cardinality")) if (op or any(v is not None for v in bounds.values())) and "cardinality" not in kwargs: kwargs["cardinality"] = "required" return ids.Property(**kwargs) elif t == "Material": kwargs = {} if f.get("value"): kwargs["value"] = f["value"] if f.get("uri"): kwargs["uri"] = f["uri"] if f.get("cardinality"): kwargs["cardinality"] = _norm_card(f["cardinality"]) if f.get("instructions"): kwargs["instructions"] = f["instructions"] return ids.Material(**kwargs) elif t == "Classification": return ids.Classification( value=f.get("value", ""), system=f.get("system", ""), uri=f.get("uri", ""), cardinality=_norm_card(f.get("cardinality")), instructions=f.get("instructions", ""), ) elif t == "PartOf": return ids.PartOf( name=f.get("name", ""), predefinedType=f.get("predefinedType", ""), relation=f.get("relation", ""), cardinality=_norm_card(f.get("cardinality")), instructions=f.get("instructions", ""), ) else: raise ValueError(f"Unsupported or empty facet type: '{t}'.") # Construction # ----------------------------------------------------------------------------------------------------------- total_specs = total_app = total_req = 0 try: for s in specs: if not isinstance(s, dict): raise ValueError("Each 'spec' must be a dict.") applicability = s.get("applicability", []) requirements = s.get("requirements", []) if not isinstance(applicability, list) or not isinstance(requirements, list): raise ValueError("'applicability' and 'requirements' must be lists.") # Do NOT absorb PredefinedType (it remains as an Attribute in applicability) # applicability = _absorb_predefined_type(applicability) spec_obj = ids.Specification() if s.get("name"): try: spec_obj.name = s["name"] except Exception: pass if s.get("description"): try: spec_obj.description = s["description"] except Exception: pass # ifcVersion: use the provided one; if not, default to IFC4 canon = _norm_ifc_version(s.get("ifcVersion") or "IFC4") try: spec_obj.ifcVersion = canon except Exception: pass for f in applicability: facet = _facet_from_dict(f, s.get("description"), context="applicability") spec_obj.applicability.append(facet); total_app += 1 for f in requirements: facet = _facet_from_dict(f, s.get("description"), context="requirements") spec_obj.requirements.append(facet); total_req += 1 ids_root.specifications.append(spec_obj); total_specs += 1 except Exception as e: return {"ok": False, "error": "Error while building the IDS specifications", "details": str(e)} if total_specs == 0: return {"ok": False, "error": "No Specification was created. Check 'specs'."} # Saved # ----------------------------------------------------------------------------------------------------------- try: if not output_path: safe_title = "".join(c for c in title if c.isalnum() or c in (" ","-","_")).rstrip() or "ids" today = (date_iso if date_iso else datetime.date.today().isoformat()) output_path = os.path.abspath(f"{safe_title}_{today}.ids") os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) ids_root.to_xml(output_path) except Exception as e: return {"ok": False, "error": "Could not save the IDS file", "details": str(e)} return { "ok": True, "output_path": output_path, "message": f"IDS '{title}' generated. Specs: {total_specs}, facets: {total_app} appl. 
#endregion


def extract_quantities(entity, blender_name=None):
    """
    Extract quantity information from an IFC entity.

    Parameters:
        entity: IFC entity object
        blender_name: Optional Blender object name

    Returns:
        Dictionary with element info and quantities
    """
    try:
        # Get all property sets
        psets = ifcopenshell.util.element.get_psets(entity)

        # Basic element info
        element_data = {
            "id": entity.GlobalId if hasattr(entity, "GlobalId") else f"Entity_{entity.id()}",
            "name": entity.Name if hasattr(entity, "Name") else None,
            "type": entity.is_a(),
            "blender_name": blender_name,
            "quantities": {}
        }

        # Look for quantity information in different property sets
        quantity_sources = ["BaseQuantities", "ArchiCADQuantities", "Qto_WallBaseQuantities",
                            "Qto_SlabBaseQuantities", "Qto_BeamBaseQuantities", "Qto_ColumnBaseQuantities"]

        # Extract quantities from property sets - keep original names
        for pset_name in quantity_sources:
            if pset_name in psets:
                pset_data = psets[pset_name]
                for prop_name, prop_value in pset_data.items():
                    # Only include numeric values and skip the 'id' field
                    if isinstance(prop_value, (int, float)) and prop_name != 'id':
                        element_data["quantities"][prop_name] = prop_value

        return element_data if element_data["quantities"] else None

    except Exception as e:
        return None


# Blender UI Panel
class BLENDERMCP_PT_Panel(bpy.types.Panel):
    bl_label = "Bonsai MCP"
    bl_idname = "BLENDERMCP_PT_Panel"
    bl_space_type = 'VIEW_3D'
    bl_region_type = 'UI'
    bl_category = 'Bonsai MCP'

    def draw(self, context):
        layout = self.layout
        scene = context.scene

        layout.prop(scene, "blendermcp_port")

        if not scene.blendermcp_server_running:
            layout.operator("blendermcp.start_server", text="Start MCP Server")
        else:
            layout.operator("blendermcp.stop_server", text="Stop MCP Server")
            layout.label(text=f"Running on port {scene.blendermcp_port}")


# Operator to start the server
class BLENDERMCP_OT_StartServer(bpy.types.Operator):
    bl_idname = "blendermcp.start_server"
    bl_label = "Connect to Claude"
    bl_description = "Start the BlenderMCP server to connect with Claude"

    def execute(self, context):
        scene = context.scene

        # Create a new server instance
        if not hasattr(bpy.types, "blendermcp_server") or not bpy.types.blendermcp_server:
            bpy.types.blendermcp_server = BlenderMCPServer(port=scene.blendermcp_port)

        # Start the server
        bpy.types.blendermcp_server.start()
        scene.blendermcp_server_running = True

        return {'FINISHED'}


# Operator to stop the server
class BLENDERMCP_OT_StopServer(bpy.types.Operator):
    bl_idname = "blendermcp.stop_server"
    bl_label = "Stop the connection to Claude"
    bl_description = "Stop the connection to Claude"

    def execute(self, context):
        scene = context.scene

        # Stop the server if it exists
        if hasattr(bpy.types, "blendermcp_server") and bpy.types.blendermcp_server:
            bpy.types.blendermcp_server.stop()
            del bpy.types.blendermcp_server
        scene.blendermcp_server_running = False

        return {'FINISHED'}


# Registration functions
def register():
    bpy.types.Scene.blendermcp_port = IntProperty(
        name="Port",
        description="Port for the BlenderMCP server",
        default=9876,
        min=1024,
        max=65535
    )
    bpy.types.Scene.blendermcp_server_running = bpy.props.BoolProperty(
        name="Server Running",
        default=False
    )

    bpy.utils.register_class(BLENDERMCP_PT_Panel)
    bpy.utils.register_class(BLENDERMCP_OT_StartServer)
    bpy.utils.register_class(BLENDERMCP_OT_StopServer)

    print("BlenderMCP addon registered")


def unregister():
    # Stop the server if it's running
    if hasattr(bpy.types, "blendermcp_server") and bpy.types.blendermcp_server:
        bpy.types.blendermcp_server.stop()
        del bpy.types.blendermcp_server

    bpy.utils.unregister_class(BLENDERMCP_PT_Panel)
    bpy.utils.unregister_class(BLENDERMCP_OT_StartServer)
    bpy.utils.unregister_class(BLENDERMCP_OT_StopServer)

    del bpy.types.Scene.blendermcp_port
    del bpy.types.Scene.blendermcp_server_running

    print("BlenderMCP addon unregistered")


if __name__ == "__main__":
    register()
```