This is page 8 of 13. Use http://codebase.md/justinpbarnett/unity-mcp?lines=false&page={x} to view the full context. # Directory Structure ``` ├── .claude │ ├── prompts │ │ ├── nl-unity-suite-nl.md │ │ └── nl-unity-suite-t.md │ └── settings.json ├── .github │ ├── scripts │ │ └── mark_skipped.py │ └── workflows │ ├── bump-version.yml │ ├── claude-nl-suite.yml │ ├── github-repo-stats.yml │ └── unity-tests.yml ├── .gitignore ├── deploy-dev.bat ├── docs │ ├── CURSOR_HELP.md │ ├── CUSTOM_TOOLS.md │ ├── README-DEV-zh.md │ ├── README-DEV.md │ ├── screenshots │ │ ├── v5_01_uninstall.png │ │ ├── v5_02_install.png │ │ ├── v5_03_open_mcp_window.png │ │ ├── v5_04_rebuild_mcp_server.png │ │ ├── v5_05_rebuild_success.png │ │ ├── v6_2_create_python_tools_asset.png │ │ ├── v6_2_python_tools_asset.png │ │ ├── v6_new_ui_asset_store_version.png │ │ ├── v6_new_ui_dark.png │ │ └── v6_new_ui_light.png │ ├── TELEMETRY.md │ ├── v5_MIGRATION.md │ └── v6_NEW_UI_CHANGES.md ├── LICENSE ├── logo.png ├── mcp_source.py ├── MCPForUnity │ ├── Editor │ │ ├── AssemblyInfo.cs │ │ ├── AssemblyInfo.cs.meta │ │ ├── Data │ │ │ ├── DefaultServerConfig.cs │ │ │ ├── DefaultServerConfig.cs.meta │ │ │ ├── McpClients.cs │ │ │ ├── McpClients.cs.meta │ │ │ ├── PythonToolsAsset.cs │ │ │ └── PythonToolsAsset.cs.meta │ │ ├── Data.meta │ │ ├── Dependencies │ │ │ ├── DependencyManager.cs │ │ │ ├── DependencyManager.cs.meta │ │ │ ├── Models │ │ │ │ ├── DependencyCheckResult.cs │ │ │ │ ├── DependencyCheckResult.cs.meta │ │ │ │ ├── DependencyStatus.cs │ │ │ │ └── DependencyStatus.cs.meta │ │ │ ├── Models.meta │ │ │ ├── PlatformDetectors │ │ │ │ ├── IPlatformDetector.cs │ │ │ │ ├── IPlatformDetector.cs.meta │ │ │ │ ├── LinuxPlatformDetector.cs │ │ │ │ ├── LinuxPlatformDetector.cs.meta │ │ │ │ ├── MacOSPlatformDetector.cs │ │ │ │ ├── MacOSPlatformDetector.cs.meta │ │ │ │ ├── PlatformDetectorBase.cs │ │ │ │ ├── PlatformDetectorBase.cs.meta │ │ │ │ ├── WindowsPlatformDetector.cs │ │ │ │ └── WindowsPlatformDetector.cs.meta │ │ │ └── PlatformDetectors.meta │ │ ├── Dependencies.meta │ │ ├── External │ │ │ ├── Tommy.cs │ │ │ └── Tommy.cs.meta │ │ ├── External.meta │ │ ├── Helpers │ │ │ ├── AssetPathUtility.cs │ │ │ ├── AssetPathUtility.cs.meta │ │ │ ├── CodexConfigHelper.cs │ │ │ ├── CodexConfigHelper.cs.meta │ │ │ ├── ConfigJsonBuilder.cs │ │ │ ├── ConfigJsonBuilder.cs.meta │ │ │ ├── ExecPath.cs │ │ │ ├── ExecPath.cs.meta │ │ │ ├── GameObjectSerializer.cs │ │ │ ├── GameObjectSerializer.cs.meta │ │ │ ├── McpConfigFileHelper.cs │ │ │ ├── McpConfigFileHelper.cs.meta │ │ │ ├── McpConfigurationHelper.cs │ │ │ ├── McpConfigurationHelper.cs.meta │ │ │ ├── McpLog.cs │ │ │ ├── McpLog.cs.meta │ │ │ ├── McpPathResolver.cs │ │ │ ├── McpPathResolver.cs.meta │ │ │ ├── PackageDetector.cs │ │ │ ├── PackageDetector.cs.meta │ │ │ ├── PackageInstaller.cs │ │ │ ├── PackageInstaller.cs.meta │ │ │ ├── PortManager.cs │ │ │ ├── PortManager.cs.meta │ │ │ ├── PythonToolSyncProcessor.cs │ │ │ ├── PythonToolSyncProcessor.cs.meta │ │ │ ├── Response.cs │ │ │ ├── Response.cs.meta │ │ │ ├── ServerInstaller.cs │ │ │ ├── ServerInstaller.cs.meta │ │ │ ├── ServerPathResolver.cs │ │ │ ├── ServerPathResolver.cs.meta │ │ │ ├── TelemetryHelper.cs │ │ │ ├── TelemetryHelper.cs.meta │ │ │ ├── Vector3Helper.cs │ │ │ └── Vector3Helper.cs.meta │ │ ├── Helpers.meta │ │ ├── Importers │ │ │ ├── PythonFileImporter.cs │ │ │ └── PythonFileImporter.cs.meta │ │ ├── Importers.meta │ │ ├── MCPForUnity.Editor.asmdef │ │ ├── MCPForUnity.Editor.asmdef.meta │ │ ├── MCPForUnityBridge.cs │ │ ├── MCPForUnityBridge.cs.meta │ │ ├── Models │ │ │ ├── Command.cs │ │ │ ├── Command.cs.meta │ │ │ ├── McpClient.cs │ │ │ ├── McpClient.cs.meta │ │ │ ├── McpConfig.cs │ │ │ ├── McpConfig.cs.meta │ │ │ ├── MCPConfigServer.cs │ │ │ ├── MCPConfigServer.cs.meta │ │ │ ├── MCPConfigServers.cs │ │ │ ├── MCPConfigServers.cs.meta │ │ │ ├── McpStatus.cs │ │ │ ├── McpStatus.cs.meta │ │ │ ├── McpTypes.cs │ │ │ ├── McpTypes.cs.meta │ │ │ ├── ServerConfig.cs │ │ │ └── ServerConfig.cs.meta │ │ ├── Models.meta │ │ ├── Resources │ │ │ ├── McpForUnityResourceAttribute.cs │ │ │ ├── McpForUnityResourceAttribute.cs.meta │ │ │ ├── MenuItems │ │ │ │ ├── GetMenuItems.cs │ │ │ │ └── GetMenuItems.cs.meta │ │ │ ├── MenuItems.meta │ │ │ ├── Tests │ │ │ │ ├── GetTests.cs │ │ │ │ └── GetTests.cs.meta │ │ │ └── Tests.meta │ │ ├── Resources.meta │ │ ├── Services │ │ │ ├── BridgeControlService.cs │ │ │ ├── BridgeControlService.cs.meta │ │ │ ├── ClientConfigurationService.cs │ │ │ ├── ClientConfigurationService.cs.meta │ │ │ ├── IBridgeControlService.cs │ │ │ ├── IBridgeControlService.cs.meta │ │ │ ├── IClientConfigurationService.cs │ │ │ ├── IClientConfigurationService.cs.meta │ │ │ ├── IPackageUpdateService.cs │ │ │ ├── IPackageUpdateService.cs.meta │ │ │ ├── IPathResolverService.cs │ │ │ ├── IPathResolverService.cs.meta │ │ │ ├── IPythonToolRegistryService.cs │ │ │ ├── IPythonToolRegistryService.cs.meta │ │ │ ├── ITestRunnerService.cs │ │ │ ├── ITestRunnerService.cs.meta │ │ │ ├── IToolSyncService.cs │ │ │ ├── IToolSyncService.cs.meta │ │ │ ├── MCPServiceLocator.cs │ │ │ ├── MCPServiceLocator.cs.meta │ │ │ ├── PackageUpdateService.cs │ │ │ ├── PackageUpdateService.cs.meta │ │ │ ├── PathResolverService.cs │ │ │ ├── PathResolverService.cs.meta │ │ │ ├── PythonToolRegistryService.cs │ │ │ ├── PythonToolRegistryService.cs.meta │ │ │ ├── TestRunnerService.cs │ │ │ ├── TestRunnerService.cs.meta │ │ │ ├── ToolSyncService.cs │ │ │ └── ToolSyncService.cs.meta │ │ ├── Services.meta │ │ ├── Setup │ │ │ ├── SetupWizard.cs │ │ │ ├── SetupWizard.cs.meta │ │ │ ├── SetupWizardWindow.cs │ │ │ └── SetupWizardWindow.cs.meta │ │ ├── Setup.meta │ │ ├── Tools │ │ │ ├── CommandRegistry.cs │ │ │ ├── CommandRegistry.cs.meta │ │ │ ├── ExecuteMenuItem.cs │ │ │ ├── ExecuteMenuItem.cs.meta │ │ │ ├── ManageAsset.cs │ │ │ ├── ManageAsset.cs.meta │ │ │ ├── ManageEditor.cs │ │ │ ├── ManageEditor.cs.meta │ │ │ ├── ManageGameObject.cs │ │ │ ├── ManageGameObject.cs.meta │ │ │ ├── ManageScene.cs │ │ │ ├── ManageScene.cs.meta │ │ │ ├── ManageScript.cs │ │ │ ├── ManageScript.cs.meta │ │ │ ├── ManageShader.cs │ │ │ ├── ManageShader.cs.meta │ │ │ ├── McpForUnityToolAttribute.cs │ │ │ ├── McpForUnityToolAttribute.cs.meta │ │ │ ├── Prefabs │ │ │ │ ├── ManagePrefabs.cs │ │ │ │ └── ManagePrefabs.cs.meta │ │ │ ├── Prefabs.meta │ │ │ ├── ReadConsole.cs │ │ │ ├── ReadConsole.cs.meta │ │ │ ├── RunTests.cs │ │ │ └── RunTests.cs.meta │ │ ├── Tools.meta │ │ ├── Windows │ │ │ ├── ManualConfigEditorWindow.cs │ │ │ ├── ManualConfigEditorWindow.cs.meta │ │ │ ├── MCPForUnityEditorWindow.cs │ │ │ ├── MCPForUnityEditorWindow.cs.meta │ │ │ ├── MCPForUnityEditorWindowNew.cs │ │ │ ├── MCPForUnityEditorWindowNew.cs.meta │ │ │ ├── MCPForUnityEditorWindowNew.uss │ │ │ ├── MCPForUnityEditorWindowNew.uss.meta │ │ │ ├── MCPForUnityEditorWindowNew.uxml │ │ │ ├── MCPForUnityEditorWindowNew.uxml.meta │ │ │ ├── VSCodeManualSetupWindow.cs │ │ │ └── VSCodeManualSetupWindow.cs.meta │ │ └── Windows.meta │ ├── Editor.meta │ ├── package.json │ ├── package.json.meta │ ├── README.md │ ├── README.md.meta │ ├── Runtime │ │ ├── MCPForUnity.Runtime.asmdef │ │ ├── MCPForUnity.Runtime.asmdef.meta │ │ ├── Serialization │ │ │ ├── UnityTypeConverters.cs │ │ │ └── UnityTypeConverters.cs.meta │ │ └── Serialization.meta │ ├── Runtime.meta │ └── UnityMcpServer~ │ └── src │ ├── __init__.py │ ├── config.py │ ├── Dockerfile │ ├── models.py │ ├── module_discovery.py │ ├── port_discovery.py │ ├── pyproject.toml │ ├── pyrightconfig.json │ ├── registry │ │ ├── __init__.py │ │ ├── resource_registry.py │ │ └── tool_registry.py │ ├── reload_sentinel.py │ ├── resources │ │ ├── __init__.py │ │ ├── menu_items.py │ │ └── tests.py │ ├── server_version.txt │ ├── server.py │ ├── telemetry_decorator.py │ ├── telemetry.py │ ├── test_telemetry.py │ ├── tools │ │ ├── __init__.py │ │ ├── execute_menu_item.py │ │ ├── manage_asset.py │ │ ├── manage_editor.py │ │ ├── manage_gameobject.py │ │ ├── manage_prefabs.py │ │ ├── manage_scene.py │ │ ├── manage_script.py │ │ ├── manage_shader.py │ │ ├── read_console.py │ │ ├── resource_tools.py │ │ ├── run_tests.py │ │ └── script_apply_edits.py │ ├── unity_connection.py │ └── uv.lock ├── prune_tool_results.py ├── README-zh.md ├── README.md ├── restore-dev.bat ├── scripts │ └── validate-nlt-coverage.sh ├── test_unity_socket_framing.py ├── TestProjects │ └── UnityMCPTests │ ├── .gitignore │ ├── Assets │ │ ├── Editor.meta │ │ ├── Scenes │ │ │ ├── SampleScene.unity │ │ │ └── SampleScene.unity.meta │ │ ├── Scenes.meta │ │ ├── Scripts │ │ │ ├── Hello.cs │ │ │ ├── Hello.cs.meta │ │ │ ├── LongUnityScriptClaudeTest.cs │ │ │ ├── LongUnityScriptClaudeTest.cs.meta │ │ │ ├── TestAsmdef │ │ │ │ ├── CustomComponent.cs │ │ │ │ ├── CustomComponent.cs.meta │ │ │ │ ├── TestAsmdef.asmdef │ │ │ │ └── TestAsmdef.asmdef.meta │ │ │ └── TestAsmdef.meta │ │ ├── Scripts.meta │ │ ├── Tests │ │ │ ├── EditMode │ │ │ │ ├── Data │ │ │ │ │ ├── PythonToolsAssetTests.cs │ │ │ │ │ └── PythonToolsAssetTests.cs.meta │ │ │ │ ├── Data.meta │ │ │ │ ├── Helpers │ │ │ │ │ ├── CodexConfigHelperTests.cs │ │ │ │ │ ├── CodexConfigHelperTests.cs.meta │ │ │ │ │ ├── WriteToConfigTests.cs │ │ │ │ │ └── WriteToConfigTests.cs.meta │ │ │ │ ├── Helpers.meta │ │ │ │ ├── MCPForUnityTests.Editor.asmdef │ │ │ │ ├── MCPForUnityTests.Editor.asmdef.meta │ │ │ │ ├── Resources │ │ │ │ │ ├── GetMenuItemsTests.cs │ │ │ │ │ └── GetMenuItemsTests.cs.meta │ │ │ │ ├── Resources.meta │ │ │ │ ├── Services │ │ │ │ │ ├── PackageUpdateServiceTests.cs │ │ │ │ │ ├── PackageUpdateServiceTests.cs.meta │ │ │ │ │ ├── PythonToolRegistryServiceTests.cs │ │ │ │ │ ├── PythonToolRegistryServiceTests.cs.meta │ │ │ │ │ ├── ToolSyncServiceTests.cs │ │ │ │ │ └── ToolSyncServiceTests.cs.meta │ │ │ │ ├── Services.meta │ │ │ │ ├── Tools │ │ │ │ │ ├── AIPropertyMatchingTests.cs │ │ │ │ │ ├── AIPropertyMatchingTests.cs.meta │ │ │ │ │ ├── CommandRegistryTests.cs │ │ │ │ │ ├── CommandRegistryTests.cs.meta │ │ │ │ │ ├── ComponentResolverTests.cs │ │ │ │ │ ├── ComponentResolverTests.cs.meta │ │ │ │ │ ├── ExecuteMenuItemTests.cs │ │ │ │ │ ├── ExecuteMenuItemTests.cs.meta │ │ │ │ │ ├── ManageGameObjectTests.cs │ │ │ │ │ ├── ManageGameObjectTests.cs.meta │ │ │ │ │ ├── ManagePrefabsTests.cs │ │ │ │ │ ├── ManagePrefabsTests.cs.meta │ │ │ │ │ ├── ManageScriptValidationTests.cs │ │ │ │ │ ├── ManageScriptValidationTests.cs.meta │ │ │ │ │ └── MaterialMeshInstantiationTests.cs │ │ │ │ ├── Tools.meta │ │ │ │ ├── Windows │ │ │ │ │ ├── ManualConfigJsonBuilderTests.cs │ │ │ │ │ └── ManualConfigJsonBuilderTests.cs.meta │ │ │ │ └── Windows.meta │ │ │ └── EditMode.meta │ │ └── Tests.meta │ ├── Packages │ │ └── manifest.json │ └── ProjectSettings │ ├── Packages │ │ └── com.unity.testtools.codecoverage │ │ └── Settings.json │ └── ProjectVersion.txt ├── tests │ ├── conftest.py │ ├── test_edit_normalization_and_noop.py │ ├── test_edit_strict_and_warnings.py │ ├── test_find_in_file_minimal.py │ ├── test_get_sha.py │ ├── test_improved_anchor_matching.py │ ├── test_logging_stdout.py │ ├── test_manage_script_uri.py │ ├── test_read_console_truncate.py │ ├── test_read_resource_minimal.py │ ├── test_resources_api.py │ ├── test_script_editing.py │ ├── test_script_tools.py │ ├── test_telemetry_endpoint_validation.py │ ├── test_telemetry_queue_worker.py │ ├── test_telemetry_subaction.py │ ├── test_transport_framing.py │ └── test_validate_script_summary.py ├── tools │ └── stress_mcp.py └── UnityMcpBridge ├── Editor │ ├── AssemblyInfo.cs │ ├── AssemblyInfo.cs.meta │ ├── Data │ │ ├── DefaultServerConfig.cs │ │ ├── DefaultServerConfig.cs.meta │ │ ├── McpClients.cs │ │ └── McpClients.cs.meta │ ├── Data.meta │ ├── Dependencies │ │ ├── DependencyManager.cs │ │ ├── DependencyManager.cs.meta │ │ ├── Models │ │ │ ├── DependencyCheckResult.cs │ │ │ ├── DependencyCheckResult.cs.meta │ │ │ ├── DependencyStatus.cs │ │ │ └── DependencyStatus.cs.meta │ │ ├── Models.meta │ │ ├── PlatformDetectors │ │ │ ├── IPlatformDetector.cs │ │ │ ├── IPlatformDetector.cs.meta │ │ │ ├── LinuxPlatformDetector.cs │ │ │ ├── LinuxPlatformDetector.cs.meta │ │ │ ├── MacOSPlatformDetector.cs │ │ │ ├── MacOSPlatformDetector.cs.meta │ │ │ ├── PlatformDetectorBase.cs │ │ │ ├── PlatformDetectorBase.cs.meta │ │ │ ├── WindowsPlatformDetector.cs │ │ │ └── WindowsPlatformDetector.cs.meta │ │ └── PlatformDetectors.meta │ ├── Dependencies.meta │ ├── External │ │ ├── Tommy.cs │ │ └── Tommy.cs.meta │ ├── External.meta │ ├── Helpers │ │ ├── AssetPathUtility.cs │ │ ├── AssetPathUtility.cs.meta │ │ ├── CodexConfigHelper.cs │ │ ├── CodexConfigHelper.cs.meta │ │ ├── ConfigJsonBuilder.cs │ │ ├── ConfigJsonBuilder.cs.meta │ │ ├── ExecPath.cs │ │ ├── ExecPath.cs.meta │ │ ├── GameObjectSerializer.cs │ │ ├── GameObjectSerializer.cs.meta │ │ ├── McpConfigFileHelper.cs │ │ ├── McpConfigFileHelper.cs.meta │ │ ├── McpConfigurationHelper.cs │ │ ├── McpConfigurationHelper.cs.meta │ │ ├── McpLog.cs │ │ ├── McpLog.cs.meta │ │ ├── McpPathResolver.cs │ │ ├── McpPathResolver.cs.meta │ │ ├── PackageDetector.cs │ │ ├── PackageDetector.cs.meta │ │ ├── PackageInstaller.cs │ │ ├── PackageInstaller.cs.meta │ │ ├── PortManager.cs │ │ ├── PortManager.cs.meta │ │ ├── Response.cs │ │ ├── Response.cs.meta │ │ ├── ServerInstaller.cs │ │ ├── ServerInstaller.cs.meta │ │ ├── ServerPathResolver.cs │ │ ├── ServerPathResolver.cs.meta │ │ ├── TelemetryHelper.cs │ │ ├── TelemetryHelper.cs.meta │ │ ├── Vector3Helper.cs │ │ └── Vector3Helper.cs.meta │ ├── Helpers.meta │ ├── MCPForUnity.Editor.asmdef │ ├── MCPForUnity.Editor.asmdef.meta │ ├── MCPForUnityBridge.cs │ ├── MCPForUnityBridge.cs.meta │ ├── Models │ │ ├── Command.cs │ │ ├── Command.cs.meta │ │ ├── McpClient.cs │ │ ├── McpClient.cs.meta │ │ ├── McpConfig.cs │ │ ├── McpConfig.cs.meta │ │ ├── MCPConfigServer.cs │ │ ├── MCPConfigServer.cs.meta │ │ ├── MCPConfigServers.cs │ │ ├── MCPConfigServers.cs.meta │ │ ├── McpStatus.cs │ │ ├── McpStatus.cs.meta │ │ ├── McpTypes.cs │ │ ├── McpTypes.cs.meta │ │ ├── ServerConfig.cs │ │ └── ServerConfig.cs.meta │ ├── Models.meta │ ├── Setup │ │ ├── SetupWizard.cs │ │ ├── SetupWizard.cs.meta │ │ ├── SetupWizardWindow.cs │ │ └── SetupWizardWindow.cs.meta │ ├── Setup.meta │ ├── Tools │ │ ├── CommandRegistry.cs │ │ ├── CommandRegistry.cs.meta │ │ ├── ManageAsset.cs │ │ ├── ManageAsset.cs.meta │ │ ├── ManageEditor.cs │ │ ├── ManageEditor.cs.meta │ │ ├── ManageGameObject.cs │ │ ├── ManageGameObject.cs.meta │ │ ├── ManageScene.cs │ │ ├── ManageScene.cs.meta │ │ ├── ManageScript.cs │ │ ├── ManageScript.cs.meta │ │ ├── ManageShader.cs │ │ ├── ManageShader.cs.meta │ │ ├── McpForUnityToolAttribute.cs │ │ ├── McpForUnityToolAttribute.cs.meta │ │ ├── MenuItems │ │ │ ├── ManageMenuItem.cs │ │ │ ├── ManageMenuItem.cs.meta │ │ │ ├── MenuItemExecutor.cs │ │ │ ├── MenuItemExecutor.cs.meta │ │ │ ├── MenuItemsReader.cs │ │ │ └── MenuItemsReader.cs.meta │ │ ├── MenuItems.meta │ │ ├── Prefabs │ │ │ ├── ManagePrefabs.cs │ │ │ └── ManagePrefabs.cs.meta │ │ ├── Prefabs.meta │ │ ├── ReadConsole.cs │ │ └── ReadConsole.cs.meta │ ├── Tools.meta │ ├── Windows │ │ ├── ManualConfigEditorWindow.cs │ │ ├── ManualConfigEditorWindow.cs.meta │ │ ├── MCPForUnityEditorWindow.cs │ │ ├── MCPForUnityEditorWindow.cs.meta │ │ ├── VSCodeManualSetupWindow.cs │ │ └── VSCodeManualSetupWindow.cs.meta │ └── Windows.meta ├── Editor.meta ├── package.json ├── package.json.meta ├── README.md ├── README.md.meta ├── Runtime │ ├── MCPForUnity.Runtime.asmdef │ ├── MCPForUnity.Runtime.asmdef.meta │ ├── Serialization │ │ ├── UnityTypeConverters.cs │ │ └── UnityTypeConverters.cs.meta │ └── Serialization.meta ├── Runtime.meta └── UnityMcpServer~ └── src ├── __init__.py ├── config.py ├── Dockerfile ├── port_discovery.py ├── pyproject.toml ├── pyrightconfig.json ├── registry │ ├── __init__.py │ └── tool_registry.py ├── reload_sentinel.py ├── server_version.txt ├── server.py ├── telemetry_decorator.py ├── telemetry.py ├── test_telemetry.py ├── tools │ ├── __init__.py │ ├── manage_asset.py │ ├── manage_editor.py │ ├── manage_gameobject.py │ ├── manage_menu_item.py │ ├── manage_prefabs.py │ ├── manage_scene.py │ ├── manage_script.py │ ├── manage_shader.py │ ├── read_console.py │ ├── resource_tools.py │ └── script_apply_edits.py ├── unity_connection.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /MCPForUnity/Editor/Helpers/ServerInstaller.cs: -------------------------------------------------------------------------------- ```csharp using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.IO.Compression; using System.Linq; using System.Net; using System.Runtime.InteropServices; using UnityEditor; using UnityEngine; namespace MCPForUnity.Editor.Helpers { public static class ServerInstaller { private const string RootFolder = "UnityMCP"; private const string ServerFolder = "UnityMcpServer"; private const string VersionFileName = "server_version.txt"; /// <summary> /// Ensures the mcp-for-unity-server is installed locally by copying from the embedded package source. /// No network calls or Git operations are performed. /// </summary> public static void EnsureServerInstalled() { try { string saveLocation = GetSaveLocation(); TryCreateMacSymlinkForAppSupport(); string destRoot = Path.Combine(saveLocation, ServerFolder); string destSrc = Path.Combine(destRoot, "src"); // Detect legacy installs and version state (logs) DetectAndLogLegacyInstallStates(destRoot); // Resolve embedded source and versions if (!TryGetEmbeddedServerSource(out string embeddedSrc)) { // Asset Store install - no embedded server // Check if server was already downloaded if (File.Exists(Path.Combine(destSrc, "server.py"))) { McpLog.Info("Using previously downloaded MCP server.", always: false); } else { McpLog.Info("MCP server not found. Download via Window > MCP For Unity > Open MCP Window.", always: false); } return; // Graceful exit - no exception } string embeddedVer = ReadVersionFile(Path.Combine(embeddedSrc, VersionFileName)) ?? "unknown"; string installedVer = ReadVersionFile(Path.Combine(destSrc, VersionFileName)); bool destHasServer = File.Exists(Path.Combine(destSrc, "server.py")); bool needOverwrite = !destHasServer || string.IsNullOrEmpty(installedVer) || (!string.IsNullOrEmpty(embeddedVer) && CompareSemverSafe(installedVer, embeddedVer) < 0); // Ensure destination exists Directory.CreateDirectory(destRoot); if (needOverwrite) { // Copy the entire UnityMcpServer folder (parent of src) string embeddedRoot = Path.GetDirectoryName(embeddedSrc) ?? embeddedSrc; // go up from src to UnityMcpServer CopyDirectoryRecursive(embeddedRoot, destRoot); // Write/refresh version file try { File.WriteAllText(Path.Combine(destSrc, VersionFileName), embeddedVer ?? "unknown"); } catch { } McpLog.Info($"Installed/updated server to {destRoot} (version {embeddedVer})."); } // Cleanup legacy installs that are missing version or older than embedded foreach (var legacyRoot in GetLegacyRootsForDetection()) { try { string legacySrc = Path.Combine(legacyRoot, "src"); if (!File.Exists(Path.Combine(legacySrc, "server.py"))) continue; string legacyVer = ReadVersionFile(Path.Combine(legacySrc, VersionFileName)); bool legacyOlder = string.IsNullOrEmpty(legacyVer) || (!string.IsNullOrEmpty(embeddedVer) && CompareSemverSafe(legacyVer, embeddedVer) < 0); if (legacyOlder) { TryKillUvForPath(legacySrc); try { Directory.Delete(legacyRoot, recursive: true); McpLog.Info($"Removed legacy server at '{legacyRoot}'."); } catch (Exception ex) { McpLog.Warn($"Failed to remove legacy server at '{legacyRoot}': {ex.Message}"); } } } catch { } } // Clear overrides that might point at legacy locations try { EditorPrefs.DeleteKey("MCPForUnity.ServerSrc"); EditorPrefs.DeleteKey("MCPForUnity.PythonDirOverride"); } catch { } return; } catch (Exception ex) { // If a usable server is already present (installed or embedded), don't fail hard—just warn. bool hasInstalled = false; try { hasInstalled = File.Exists(Path.Combine(GetServerPath(), "server.py")); } catch { } if (hasInstalled || TryGetEmbeddedServerSource(out _)) { McpLog.Warn($"Using existing server; skipped install. Details: {ex.Message}"); return; } McpLog.Error($"Failed to ensure server installation: {ex.Message}"); } } public static string GetServerPath() { return Path.Combine(GetSaveLocation(), ServerFolder, "src"); } /// <summary> /// Gets the platform-specific save location for the server. /// </summary> private static string GetSaveLocation() { if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { // Use per-user LocalApplicationData for canonical install location var localAppData = Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData) ?? Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.UserProfile) ?? string.Empty, "AppData", "Local"); return Path.Combine(localAppData, RootFolder); } else if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) { var xdg = Environment.GetEnvironmentVariable("XDG_DATA_HOME"); if (string.IsNullOrEmpty(xdg)) { xdg = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.UserProfile) ?? string.Empty, ".local", "share"); } return Path.Combine(xdg, RootFolder); } else if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) { // On macOS, use LocalApplicationData (~/Library/Application Support) var localAppSupport = Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData); // Unity/Mono may map LocalApplicationData to ~/.local/share on macOS; normalize to Application Support bool looksLikeXdg = !string.IsNullOrEmpty(localAppSupport) && localAppSupport.Replace('\\', '/').Contains("/.local/share"); if (string.IsNullOrEmpty(localAppSupport) || looksLikeXdg) { // Fallback: construct from $HOME var home = Environment.GetFolderPath(Environment.SpecialFolder.Personal) ?? string.Empty; localAppSupport = Path.Combine(home, "Library", "Application Support"); } TryCreateMacSymlinkForAppSupport(); return Path.Combine(localAppSupport, RootFolder); } throw new Exception("Unsupported operating system"); } /// <summary> /// On macOS, create a no-spaces symlink ~/Library/AppSupport -> ~/Library/Application Support /// to mitigate arg parsing and quoting issues in some MCP clients. /// Safe to call repeatedly. /// </summary> private static void TryCreateMacSymlinkForAppSupport() { try { if (!RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) return; string home = Environment.GetFolderPath(Environment.SpecialFolder.Personal) ?? string.Empty; if (string.IsNullOrEmpty(home)) return; string canonical = Path.Combine(home, "Library", "Application Support"); string symlink = Path.Combine(home, "Library", "AppSupport"); // If symlink exists already, nothing to do if (Directory.Exists(symlink) || File.Exists(symlink)) return; // Create symlink only if canonical exists if (!Directory.Exists(canonical)) return; // Use 'ln -s' to create a directory symlink (macOS) var psi = new ProcessStartInfo { FileName = "/bin/ln", Arguments = $"-s \"{canonical}\" \"{symlink}\"", UseShellExecute = false, RedirectStandardOutput = true, RedirectStandardError = true, CreateNoWindow = true }; using var p = Process.Start(psi); p?.WaitForExit(2000); } catch { /* best-effort */ } } private static bool IsDirectoryWritable(string path) { try { File.Create(Path.Combine(path, "test.txt")).Dispose(); File.Delete(Path.Combine(path, "test.txt")); return true; } catch { return false; } } /// <summary> /// Checks if the server is installed at the specified location. /// </summary> private static bool IsServerInstalled(string location) { return Directory.Exists(location) && File.Exists(Path.Combine(location, ServerFolder, "src", "server.py")); } /// <summary> /// Detects legacy installs or older versions and logs findings (no deletion yet). /// </summary> private static void DetectAndLogLegacyInstallStates(string canonicalRoot) { try { string canonicalSrc = Path.Combine(canonicalRoot, "src"); // Normalize canonical root for comparisons string normCanonicalRoot = NormalizePathSafe(canonicalRoot); string embeddedSrc = null; TryGetEmbeddedServerSource(out embeddedSrc); string embeddedVer = ReadVersionFile(Path.Combine(embeddedSrc ?? string.Empty, VersionFileName)); string installedVer = ReadVersionFile(Path.Combine(canonicalSrc, VersionFileName)); // Legacy paths (macOS/Linux .config; Windows roaming as example) foreach (var legacyRoot in GetLegacyRootsForDetection()) { // Skip logging for the canonical root itself if (PathsEqualSafe(legacyRoot, normCanonicalRoot)) continue; string legacySrc = Path.Combine(legacyRoot, "src"); bool hasServer = File.Exists(Path.Combine(legacySrc, "server.py")); string legacyVer = ReadVersionFile(Path.Combine(legacySrc, VersionFileName)); if (hasServer) { // Case 1: No version file if (string.IsNullOrEmpty(legacyVer)) { McpLog.Info("Detected legacy install without version file at: " + legacyRoot, always: false); } // Case 2: Lives in legacy path McpLog.Info("Detected legacy install path: " + legacyRoot, always: false); // Case 3: Has version but appears older than embedded if (!string.IsNullOrEmpty(embeddedVer) && !string.IsNullOrEmpty(legacyVer) && CompareSemverSafe(legacyVer, embeddedVer) < 0) { McpLog.Info($"Legacy install version {legacyVer} is older than embedded {embeddedVer}", always: false); } } } // Also log if canonical is missing version (treated as older) if (Directory.Exists(canonicalRoot)) { if (string.IsNullOrEmpty(installedVer)) { McpLog.Info("Canonical install missing version file (treat as older). Path: " + canonicalRoot, always: false); } else if (!string.IsNullOrEmpty(embeddedVer) && CompareSemverSafe(installedVer, embeddedVer) < 0) { McpLog.Info($"Canonical install version {installedVer} is older than embedded {embeddedVer}", always: false); } } } catch (Exception ex) { McpLog.Warn("Detect legacy/version state failed: " + ex.Message); } } private static string NormalizePathSafe(string path) { try { return string.IsNullOrEmpty(path) ? path : Path.GetFullPath(path.Trim()); } catch { return path; } } private static bool PathsEqualSafe(string a, string b) { if (string.IsNullOrEmpty(a) || string.IsNullOrEmpty(b)) return false; string na = NormalizePathSafe(a); string nb = NormalizePathSafe(b); try { if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { return string.Equals(na, nb, StringComparison.OrdinalIgnoreCase); } return string.Equals(na, nb, StringComparison.Ordinal); } catch { return false; } } private static IEnumerable<string> GetLegacyRootsForDetection() { var roots = new List<string>(); string home = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile) ?? string.Empty; // macOS/Linux legacy roots.Add(Path.Combine(home, ".config", "UnityMCP", "UnityMcpServer")); roots.Add(Path.Combine(home, ".local", "share", "UnityMCP", "UnityMcpServer")); // Windows roaming example try { string roaming = Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData) ?? string.Empty; if (!string.IsNullOrEmpty(roaming)) roots.Add(Path.Combine(roaming, "UnityMCP", "UnityMcpServer")); // Windows legacy: early installers/dev scripts used %LOCALAPPDATA%\Programs\UnityMCP\UnityMcpServer // Detect this location so we can clean up older copies during install/update. string localAppData = Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData) ?? string.Empty; if (!string.IsNullOrEmpty(localAppData)) roots.Add(Path.Combine(localAppData, "Programs", "UnityMCP", "UnityMcpServer")); } catch { } return roots; } private static void TryKillUvForPath(string serverSrcPath) { try { if (string.IsNullOrEmpty(serverSrcPath)) return; if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) return; var psi = new ProcessStartInfo { FileName = "/usr/bin/pgrep", Arguments = $"-f \"uv .*--directory {serverSrcPath}\"", UseShellExecute = false, RedirectStandardOutput = true, RedirectStandardError = true, CreateNoWindow = true }; using var p = Process.Start(psi); if (p == null) return; string outp = p.StandardOutput.ReadToEnd(); p.WaitForExit(1500); if (p.ExitCode == 0 && !string.IsNullOrEmpty(outp)) { foreach (var line in outp.Split(new[] { '\n', '\r' }, StringSplitOptions.RemoveEmptyEntries)) { if (int.TryParse(line.Trim(), out int pid)) { try { Process.GetProcessById(pid).Kill(); } catch { } } } } } catch { } } private static string ReadVersionFile(string path) { try { if (string.IsNullOrEmpty(path) || !File.Exists(path)) return null; string v = File.ReadAllText(path).Trim(); return string.IsNullOrEmpty(v) ? null : v; } catch { return null; } } private static int CompareSemverSafe(string a, string b) { try { if (string.IsNullOrEmpty(a) || string.IsNullOrEmpty(b)) return 0; var ap = a.Split('.'); var bp = b.Split('.'); for (int i = 0; i < Math.Max(ap.Length, bp.Length); i++) { int ai = (i < ap.Length && int.TryParse(ap[i], out var t1)) ? t1 : 0; int bi = (i < bp.Length && int.TryParse(bp[i], out var t2)) ? t2 : 0; if (ai != bi) return ai.CompareTo(bi); } return 0; } catch { return 0; } } /// <summary> /// Attempts to locate the embedded UnityMcpServer/src directory inside the installed package /// or common development locations. /// </summary> private static bool TryGetEmbeddedServerSource(out string srcPath) { return ServerPathResolver.TryFindEmbeddedServerSource(out srcPath); } private static readonly string[] _skipDirs = { ".venv", "__pycache__", ".pytest_cache", ".mypy_cache", ".git" }; private static void CopyDirectoryRecursive(string sourceDir, string destinationDir) { Directory.CreateDirectory(destinationDir); foreach (string filePath in Directory.GetFiles(sourceDir)) { string fileName = Path.GetFileName(filePath); string destFile = Path.Combine(destinationDir, fileName); File.Copy(filePath, destFile, overwrite: true); } foreach (string dirPath in Directory.GetDirectories(sourceDir)) { string dirName = Path.GetFileName(dirPath); foreach (var skip in _skipDirs) { if (dirName.Equals(skip, StringComparison.OrdinalIgnoreCase)) goto NextDir; } try { if ((File.GetAttributes(dirPath) & FileAttributes.ReparsePoint) != 0) continue; } catch { } string destSubDir = Path.Combine(destinationDir, dirName); CopyDirectoryRecursive(dirPath, destSubDir); NextDir:; } } public static bool RebuildMcpServer() { try { // Find embedded source if (!TryGetEmbeddedServerSource(out string embeddedSrc)) { McpLog.Error("RebuildMcpServer: Could not find embedded server source."); return false; } string saveLocation = GetSaveLocation(); string destRoot = Path.Combine(saveLocation, ServerFolder); string destSrc = Path.Combine(destRoot, "src"); // Kill any running uv processes for this server TryKillUvForPath(destSrc); // Delete the entire installed server directory if (Directory.Exists(destRoot)) { try { Directory.Delete(destRoot, recursive: true); McpLog.Info($"Deleted existing server at {destRoot}"); } catch (Exception ex) { McpLog.Error($"Failed to delete existing server: {ex.Message}"); return false; } } // Re-copy from embedded source string embeddedRoot = Path.GetDirectoryName(embeddedSrc) ?? embeddedSrc; Directory.CreateDirectory(destRoot); CopyDirectoryRecursive(embeddedRoot, destRoot); // Write version file string embeddedVer = ReadVersionFile(Path.Combine(embeddedSrc, VersionFileName)) ?? "unknown"; try { File.WriteAllText(Path.Combine(destSrc, VersionFileName), embeddedVer); } catch (Exception ex) { McpLog.Warn($"Failed to write version file: {ex.Message}"); } McpLog.Info($"Server rebuilt successfully at {destRoot} (version {embeddedVer})"); return true; } catch (Exception ex) { McpLog.Error($"RebuildMcpServer failed: {ex.Message}"); return false; } } internal static string FindUvPath() { // Allow user override via EditorPrefs try { string overridePath = EditorPrefs.GetString("MCPForUnity.UvPath", string.Empty); if (!string.IsNullOrEmpty(overridePath) && File.Exists(overridePath)) { if (ValidateUvBinary(overridePath)) return overridePath; } } catch { } string home = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile) ?? string.Empty; // Platform-specific candidate lists string[] candidates; if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { string localAppData = Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData) ?? string.Empty; string programFiles = Environment.GetFolderPath(Environment.SpecialFolder.ProgramFiles) ?? string.Empty; string appData = Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData) ?? string.Empty; // Fast path: resolve from PATH first try { var wherePsi = new ProcessStartInfo { FileName = "where", Arguments = "uv.exe", UseShellExecute = false, RedirectStandardOutput = true, RedirectStandardError = true, CreateNoWindow = true }; using var wp = Process.Start(wherePsi); string output = wp.StandardOutput.ReadToEnd().Trim(); wp.WaitForExit(1500); if (wp.ExitCode == 0 && !string.IsNullOrEmpty(output)) { foreach (var line in output.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries)) { string path = line.Trim(); if (File.Exists(path) && ValidateUvBinary(path)) return path; } } } catch { } // Windows Store (PythonSoftwareFoundation) install location probe // Example: %LOCALAPPDATA%\Packages\PythonSoftwareFoundation.Python.3.13_*\LocalCache\local-packages\Python313\Scripts\uv.exe try { string pkgsRoot = Path.Combine(localAppData, "Packages"); if (Directory.Exists(pkgsRoot)) { var pythonPkgs = Directory.GetDirectories(pkgsRoot, "PythonSoftwareFoundation.Python.*", SearchOption.TopDirectoryOnly) .OrderByDescending(p => p, StringComparer.OrdinalIgnoreCase); foreach (var pkg in pythonPkgs) { string localCache = Path.Combine(pkg, "LocalCache", "local-packages"); if (!Directory.Exists(localCache)) continue; var pyRoots = Directory.GetDirectories(localCache, "Python*", SearchOption.TopDirectoryOnly) .OrderByDescending(d => d, StringComparer.OrdinalIgnoreCase); foreach (var pyRoot in pyRoots) { string uvExe = Path.Combine(pyRoot, "Scripts", "uv.exe"); if (File.Exists(uvExe) && ValidateUvBinary(uvExe)) return uvExe; } } } } catch { } candidates = new[] { // Preferred: WinGet Links shims (stable entrypoints) // Per-user shim (LOCALAPPDATA) → machine-wide shim (Program Files\WinGet\Links) Path.Combine(localAppData, "Microsoft", "WinGet", "Links", "uv.exe"), Path.Combine(programFiles, "WinGet", "Links", "uv.exe"), // Common per-user installs Path.Combine(localAppData, @"Programs\Python\Python313\Scripts\uv.exe"), Path.Combine(localAppData, @"Programs\Python\Python312\Scripts\uv.exe"), Path.Combine(localAppData, @"Programs\Python\Python311\Scripts\uv.exe"), Path.Combine(localAppData, @"Programs\Python\Python310\Scripts\uv.exe"), Path.Combine(appData, @"Python\Python313\Scripts\uv.exe"), Path.Combine(appData, @"Python\Python312\Scripts\uv.exe"), Path.Combine(appData, @"Python\Python311\Scripts\uv.exe"), Path.Combine(appData, @"Python\Python310\Scripts\uv.exe"), // Program Files style installs (if a native installer was used) Path.Combine(programFiles, @"uv\uv.exe"), // Try simple name resolution later via PATH "uv.exe", "uv" }; } else { candidates = new[] { "/opt/homebrew/bin/uv", "/usr/local/bin/uv", "/usr/bin/uv", "/opt/local/bin/uv", Path.Combine(home, ".local", "bin", "uv"), "/opt/homebrew/opt/uv/bin/uv", // Framework Python installs "/Library/Frameworks/Python.framework/Versions/3.13/bin/uv", "/Library/Frameworks/Python.framework/Versions/3.12/bin/uv", // Fallback to PATH resolution by name "uv" }; } foreach (string c in candidates) { try { if (File.Exists(c) && ValidateUvBinary(c)) return c; } catch { /* ignore */ } } // Use platform-appropriate which/where to resolve from PATH (non-Windows handled here; Windows tried earlier) try { if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { var whichPsi = new ProcessStartInfo { FileName = "/usr/bin/which", Arguments = "uv", UseShellExecute = false, RedirectStandardOutput = true, RedirectStandardError = true, CreateNoWindow = true }; try { // Prepend common user-local and package manager locations so 'which' can see them in Unity's GUI env string homeDir = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile) ?? string.Empty; string prepend = string.Join(":", new[] { Path.Combine(homeDir, ".local", "bin"), "/opt/homebrew/bin", "/usr/local/bin", "/usr/bin", "/bin" }); string currentPath = Environment.GetEnvironmentVariable("PATH") ?? string.Empty; whichPsi.EnvironmentVariables["PATH"] = string.IsNullOrEmpty(currentPath) ? prepend : (prepend + ":" + currentPath); } catch { } using var wp = Process.Start(whichPsi); string output = wp.StandardOutput.ReadToEnd().Trim(); wp.WaitForExit(3000); if (wp.ExitCode == 0 && !string.IsNullOrEmpty(output) && File.Exists(output)) { if (ValidateUvBinary(output)) return output; } } } catch { } // Manual PATH scan try { string pathEnv = Environment.GetEnvironmentVariable("PATH") ?? string.Empty; string[] parts = pathEnv.Split(Path.PathSeparator); foreach (string part in parts) { try { // Check both uv and uv.exe string candidateUv = Path.Combine(part, "uv"); string candidateUvExe = Path.Combine(part, "uv.exe"); if (File.Exists(candidateUv) && ValidateUvBinary(candidateUv)) return candidateUv; if (File.Exists(candidateUvExe) && ValidateUvBinary(candidateUvExe)) return candidateUvExe; } catch { } } } catch { } return null; } private static bool ValidateUvBinary(string uvPath) { try { var psi = new ProcessStartInfo { FileName = uvPath, Arguments = "--version", UseShellExecute = false, RedirectStandardOutput = true, RedirectStandardError = true, CreateNoWindow = true }; using var p = Process.Start(psi); if (!p.WaitForExit(5000)) { try { p.Kill(); } catch { } return false; } if (p.ExitCode == 0) { string output = p.StandardOutput.ReadToEnd().Trim(); return output.StartsWith("uv "); } } catch { } return false; } /// <summary> /// Download and install server from GitHub release (Asset Store workflow) /// </summary> public static bool DownloadAndInstallServer() { string packageVersion = AssetPathUtility.GetPackageVersion(); if (packageVersion == "unknown") { McpLog.Error("Cannot determine package version for download."); return false; } string downloadUrl = $"https://github.com/CoplayDev/unity-mcp/releases/download/v{packageVersion}/mcp-for-unity-server-v{packageVersion}.zip"; string tempZip = Path.Combine(Path.GetTempPath(), $"mcp-server-v{packageVersion}.zip"); string destRoot = Path.Combine(GetSaveLocation(), ServerFolder); try { EditorUtility.DisplayProgressBar("MCP for Unity", "Downloading server...", 0.3f); // Download using (var client = new WebClient()) { client.DownloadFile(downloadUrl, tempZip); } EditorUtility.DisplayProgressBar("MCP for Unity", "Extracting server...", 0.7f); // Kill any running UV processes string destSrc = Path.Combine(destRoot, "src"); TryKillUvForPath(destSrc); // Delete old installation if (Directory.Exists(destRoot)) { try { Directory.Delete(destRoot, recursive: true); } catch (Exception ex) { McpLog.Warn($"Could not fully delete old server: {ex.Message}"); } } // Extract to temp location first string tempExtractDir = Path.Combine(Path.GetTempPath(), $"mcp-server-extract-{Guid.NewGuid()}"); Directory.CreateDirectory(tempExtractDir); try { ZipFile.ExtractToDirectory(tempZip, tempExtractDir); // The ZIP contains UnityMcpServer~ folder, find it and move its contents string extractedServerFolder = Path.Combine(tempExtractDir, "UnityMcpServer~"); Directory.CreateDirectory(destRoot); CopyDirectoryRecursive(extractedServerFolder, destRoot); } finally { // Cleanup temp extraction directory try { if (Directory.Exists(tempExtractDir)) { Directory.Delete(tempExtractDir, recursive: true); } } catch (Exception ex) { McpLog.Warn($"Could not fully delete temp extraction directory: {ex.Message}"); } } EditorUtility.ClearProgressBar(); McpLog.Info($"Server v{packageVersion} downloaded and installed successfully!"); return true; } catch (Exception ex) { EditorUtility.ClearProgressBar(); McpLog.Error($"Failed to download server: {ex.Message}"); EditorUtility.DisplayDialog( "Download Failed", $"Could not download server from GitHub.\n\n{ex.Message}\n\nPlease check your internet connection or try again later.", "OK" ); return false; } finally { try { if (File.Exists(tempZip)) File.Delete(tempZip); } catch (Exception ex) { McpLog.Warn($"Could not delete temp zip file: {ex.Message}"); } } } /// <summary> /// Check if the package has an embedded server (Git install vs Asset Store) /// </summary> public static bool HasEmbeddedServer() { return TryGetEmbeddedServerSource(out _); } /// <summary> /// Get the installed server version from the local installation /// </summary> public static string GetInstalledServerVersion() { try { string destRoot = Path.Combine(GetSaveLocation(), ServerFolder); string versionPath = Path.Combine(destRoot, "src", VersionFileName); if (File.Exists(versionPath)) { return File.ReadAllText(versionPath)?.Trim() ?? string.Empty; } } catch (Exception ex) { McpLog.Warn($"Could not read version file: {ex.Message}"); } return string.Empty; } } } ``` -------------------------------------------------------------------------------- /.github/workflows/claude-nl-suite.yml: -------------------------------------------------------------------------------- ```yaml name: Claude NL/T Full Suite (Unity live) on: [workflow_dispatch] permissions: contents: read checks: write concurrency: group: ${{ github.workflow }}-${{ github.ref }} cancel-in-progress: true env: UNITY_IMAGE: unityci/editor:ubuntu-2021.3.45f2-linux-il2cpp-3 jobs: nl-suite: runs-on: ubuntu-latest timeout-minutes: 60 env: JUNIT_OUT: reports/junit-nl-suite.xml MD_OUT: reports/junit-nl-suite.md steps: # ---------- Secrets check ---------- - name: Detect secrets (outputs) id: detect env: UNITY_LICENSE: ${{ secrets.UNITY_LICENSE }} UNITY_EMAIL: ${{ secrets.UNITY_EMAIL }} UNITY_PASSWORD: ${{ secrets.UNITY_PASSWORD }} UNITY_SERIAL: ${{ secrets.UNITY_SERIAL }} ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }} run: | set -e if [ -n "$ANTHROPIC_API_KEY" ]; then echo "anthropic_ok=true" >> "$GITHUB_OUTPUT"; else echo "anthropic_ok=false" >> "$GITHUB_OUTPUT"; fi if [ -n "$UNITY_LICENSE" ] || { [ -n "$UNITY_EMAIL" ] && [ -n "$UNITY_PASSWORD" ]; }; then echo "unity_ok=true" >> "$GITHUB_OUTPUT" else echo "unity_ok=false" >> "$GITHUB_OUTPUT" fi - uses: actions/checkout@v4 with: fetch-depth: 0 # ---------- Python env for MCP server (uv) ---------- - uses: astral-sh/setup-uv@v4 with: python-version: "3.11" - name: Install MCP server run: | set -eux uv venv echo "VIRTUAL_ENV=$GITHUB_WORKSPACE/.venv" >> "$GITHUB_ENV" echo "$GITHUB_WORKSPACE/.venv/bin" >> "$GITHUB_PATH" if [ -f MCPForUnity/UnityMcpServer~/src/pyproject.toml ]; then uv pip install -e MCPForUnity/UnityMcpServer~/src elif [ -f MCPForUnity/UnityMcpServer~/src/requirements.txt ]; then uv pip install -r MCPForUnity/UnityMcpServer~/src/requirements.txt elif [ -f MCPForUnity/UnityMcpServer~/pyproject.toml ]; then uv pip install -e MCPForUnity/UnityMcpServer~/ elif [ -f MCPForUnity/UnityMcpServer~/requirements.txt ]; then uv pip install -r MCPForUnity/UnityMcpServer~/requirements.txt else echo "No MCP Python deps found (skipping)" fi # --- Licensing: allow both ULF and EBL when available --- - name: Decide license sources id: lic shell: bash env: UNITY_LICENSE: ${{ secrets.UNITY_LICENSE }} UNITY_EMAIL: ${{ secrets.UNITY_EMAIL }} UNITY_PASSWORD: ${{ secrets.UNITY_PASSWORD }} UNITY_SERIAL: ${{ secrets.UNITY_SERIAL }} run: | set -eu use_ulf=false; use_ebl=false [[ -n "${UNITY_LICENSE:-}" ]] && use_ulf=true [[ -n "${UNITY_EMAIL:-}" && -n "${UNITY_PASSWORD:-}" ]] && use_ebl=true echo "use_ulf=$use_ulf" >> "$GITHUB_OUTPUT" echo "use_ebl=$use_ebl" >> "$GITHUB_OUTPUT" echo "has_serial=$([[ -n "${UNITY_SERIAL:-}" ]] && echo true || echo false)" >> "$GITHUB_OUTPUT" - name: Stage Unity .ulf license (from secret) if: steps.lic.outputs.use_ulf == 'true' id: ulf env: UNITY_LICENSE: ${{ secrets.UNITY_LICENSE }} shell: bash run: | set -eu mkdir -p "$RUNNER_TEMP/unity-license-ulf" "$RUNNER_TEMP/unity-local/Unity" f="$RUNNER_TEMP/unity-license-ulf/Unity_lic.ulf" if printf "%s" "$UNITY_LICENSE" | base64 -d - >/dev/null 2>&1; then printf "%s" "$UNITY_LICENSE" | base64 -d - > "$f" else printf "%s" "$UNITY_LICENSE" > "$f" fi chmod 600 "$f" || true # If someone pasted an entitlement XML into UNITY_LICENSE by mistake, re-home it: if head -c 100 "$f" | grep -qi '<\?xml'; then mkdir -p "$RUNNER_TEMP/unity-config/Unity/licenses" mv "$f" "$RUNNER_TEMP/unity-config/Unity/licenses/UnityEntitlementLicense.xml" echo "ok=false" >> "$GITHUB_OUTPUT" elif grep -qi '<Signature>' "$f"; then # provide it in the standard local-share path too cp -f "$f" "$RUNNER_TEMP/unity-local/Unity/Unity_lic.ulf" echo "ok=true" >> "$GITHUB_OUTPUT" else echo "ok=false" >> "$GITHUB_OUTPUT" fi # --- Activate via EBL inside the same Unity image (writes host-side entitlement) --- - name: Activate Unity (EBL via container - host-mount) if: steps.lic.outputs.use_ebl == 'true' shell: bash env: UNITY_IMAGE: ${{ env.UNITY_IMAGE }} UNITY_EMAIL: ${{ secrets.UNITY_EMAIL }} UNITY_PASSWORD: ${{ secrets.UNITY_PASSWORD }} UNITY_SERIAL: ${{ secrets.UNITY_SERIAL }} run: | set -euxo pipefail # host dirs to receive the full Unity config and local-share mkdir -p "$RUNNER_TEMP/unity-config" "$RUNNER_TEMP/unity-local" # Try Pro first if serial is present, otherwise named-user EBL. docker run --rm --network host \ -e HOME=/root \ -e UNITY_EMAIL -e UNITY_PASSWORD -e UNITY_SERIAL \ -v "$RUNNER_TEMP/unity-config:/root/.config/unity3d" \ -v "$RUNNER_TEMP/unity-local:/root/.local/share/unity3d" \ "$UNITY_IMAGE" bash -lc ' set -euxo pipefail if [[ -n "${UNITY_SERIAL:-}" ]]; then /opt/unity/Editor/Unity -batchmode -nographics -logFile - \ -username "$UNITY_EMAIL" -password "$UNITY_PASSWORD" -serial "$UNITY_SERIAL" -quit || true else /opt/unity/Editor/Unity -batchmode -nographics -logFile - \ -username "$UNITY_EMAIL" -password "$UNITY_PASSWORD" -quit || true fi ls -la /root/.config/unity3d/Unity/licenses || true ' # Verify entitlement written to host mount; allow ULF-only runs to proceed if ! find "$RUNNER_TEMP/unity-config" -type f -iname "*.xml" | grep -q .; then if [[ "${{ steps.ulf.outputs.ok }}" == "true" ]]; then echo "EBL entitlement not found; proceeding with ULF-only (ok=true)." else echo "No entitlement produced and no valid ULF; cannot continue." >&2 exit 1 fi fi # EBL entitlement is already written directly to $RUNNER_TEMP/unity-config by the activation step # ---------- Warm up project (import Library once) ---------- - name: Warm up project (import Library once) if: steps.lic.outputs.use_ulf == 'true' || steps.lic.outputs.use_ebl == 'true' shell: bash env: UNITY_IMAGE: ${{ env.UNITY_IMAGE }} ULF_OK: ${{ steps.ulf.outputs.ok }} run: | set -euxo pipefail manual_args=() if [[ "${ULF_OK:-false}" == "true" ]]; then manual_args=(-manualLicenseFile "/root/.local/share/unity3d/Unity/Unity_lic.ulf") fi docker run --rm --network host \ -e HOME=/root \ -v "${{ github.workspace }}:/workspace" -w /workspace \ -v "$RUNNER_TEMP/unity-config:/root/.config/unity3d" \ -v "$RUNNER_TEMP/unity-local:/root/.local/share/unity3d" \ "$UNITY_IMAGE" /opt/unity/Editor/Unity -batchmode -nographics -logFile - \ -projectPath /workspace/TestProjects/UnityMCPTests \ "${manual_args[@]}" \ -quit # ---------- Clean old MCP status ---------- - name: Clean old MCP status run: | set -eux mkdir -p "$HOME/.unity-mcp" rm -f "$HOME/.unity-mcp"/unity-mcp-status-*.json || true # ---------- Start headless Unity (persistent bridge) ---------- - name: Start Unity (persistent bridge) if: steps.lic.outputs.use_ulf == 'true' || steps.lic.outputs.use_ebl == 'true' shell: bash env: UNITY_IMAGE: ${{ env.UNITY_IMAGE }} ULF_OK: ${{ steps.ulf.outputs.ok }} run: | set -euxo pipefail manual_args=() if [[ "${ULF_OK:-false}" == "true" ]]; then manual_args=(-manualLicenseFile "/root/.local/share/unity3d/Unity/Unity_lic.ulf") fi mkdir -p "$RUNNER_TEMP/unity-status" docker rm -f unity-mcp >/dev/null 2>&1 || true docker run -d --name unity-mcp --network host \ -e HOME=/root \ -e UNITY_MCP_ALLOW_BATCH=1 \ -e UNITY_MCP_STATUS_DIR=/root/.unity-mcp \ -e UNITY_MCP_BIND_HOST=127.0.0.1 \ -v "${{ github.workspace }}:/workspace" -w /workspace \ -v "$RUNNER_TEMP/unity-status:/root/.unity-mcp" \ -v "$RUNNER_TEMP/unity-config:/root/.config/unity3d:ro" \ -v "$RUNNER_TEMP/unity-local:/root/.local/share/unity3d:ro" \ "$UNITY_IMAGE" /opt/unity/Editor/Unity -batchmode -nographics -logFile - \ -stackTraceLogType Full \ -projectPath /workspace/TestProjects/UnityMCPTests \ "${manual_args[@]}" \ -executeMethod MCPForUnity.Editor.MCPForUnityBridge.StartAutoConnect # ---------- Wait for Unity bridge ---------- - name: Wait for Unity bridge (robust) shell: bash run: | set -euo pipefail deadline=$((SECONDS+900)) # 15 min max fatal_after=$((SECONDS+120)) # give licensing 2 min to settle # Fail fast only if container actually died st="$(docker inspect -f '{{.State.Status}} {{.State.ExitCode}}' unity-mcp 2>/dev/null || true)" case "$st" in exited*|dead*) docker logs unity-mcp --tail 200 | sed -E 's/((email|serial|license|password|token)[^[:space:]]*)/[REDACTED]/Ig'; exit 1;; esac # Patterns ok_pat='(Bridge|MCP(For)?Unity|AutoConnect).*(listening|ready|started|port|bound)' # Only truly fatal signals; allow transient "Licensing::..." chatter license_fatal='No valid Unity|License is not active|cannot load ULF|Signature element not found|Token not found|0 entitlement|Entitlement.*(failed|denied)|License (activation|return|renewal).*(failed|expired|denied)' while [ $SECONDS -lt $deadline ]; do logs="$(docker logs unity-mcp 2>&1 || true)" # 1) Primary: status JSON exposes TCP port port="$(jq -r '.unity_port // empty' "$RUNNER_TEMP"/unity-status/unity-mcp-status-*.json 2>/dev/null | head -n1 || true)" if [[ -n "${port:-}" ]] && timeout 1 bash -lc "exec 3<>/dev/tcp/127.0.0.1/$port"; then echo "Bridge ready on port $port" exit 0 fi # 2) Secondary: log markers if echo "$logs" | grep -qiE "$ok_pat"; then echo "Bridge ready (log markers)" exit 0 fi # Only treat license failures as fatal *after* warm-up if [ $SECONDS -ge $fatal_after ] && echo "$logs" | grep -qiE "$license_fatal"; then echo "::error::Fatal licensing signal detected after warm-up" echo "$logs" | tail -n 200 | sed -E 's/((email|serial|license|password|token)[^[:space:]]*)/[REDACTED]/Ig' exit 1 fi # If the container dies mid-wait, bail st="$(docker inspect -f '{{.State.Status}}' unity-mcp 2>/dev/null || true)" if [[ "$st" != "running" ]]; then echo "::error::Unity container exited during wait"; docker logs unity-mcp --tail 200 | sed -E 's/((email|serial|license|password|token)[^[:space:]]*)/[REDACTED]/Ig' exit 1 fi sleep 2 done echo "::error::Bridge not ready before deadline" docker logs unity-mcp --tail 200 | sed -E 's/((email|serial|license|password|token)[^[:space:]]*)/[REDACTED]/Ig' exit 1 # (moved) — return license after Unity is stopped # ---------- MCP client config ---------- - name: Write MCP config (.claude/mcp.json) run: | set -eux mkdir -p .claude cat > .claude/mcp.json <<JSON { "mcpServers": { "unity": { "command": "uv", "args": ["run","--active","--directory","MCPForUnity/UnityMcpServer~/src","python","server.py"], "transport": { "type": "stdio" }, "env": { "PYTHONUNBUFFERED": "1", "MCP_LOG_LEVEL": "debug", "UNITY_PROJECT_ROOT": "$GITHUB_WORKSPACE/TestProjects/UnityMCPTests", "UNITY_MCP_STATUS_DIR": "$RUNNER_TEMP/unity-status", "UNITY_MCP_HOST": "127.0.0.1" } } } } JSON - name: Pin Claude tool permissions (.claude/settings.json) run: | set -eux mkdir -p .claude cat > .claude/settings.json <<'JSON' { "permissions": { "allow": [ "mcp__unity", "Edit(reports/**)" ], "deny": [ "Bash", "MultiEdit", "WebFetch", "WebSearch", "Task", "TodoWrite", "NotebookEdit", "NotebookRead" ] } } JSON # ---------- Reports & helper ---------- - name: Prepare reports and dirs run: | set -eux rm -f reports/*.xml reports/*.md || true mkdir -p reports reports/_snapshots reports/_staging - name: Create report skeletons run: | set -eu cat > "$JUNIT_OUT" <<'XML' <?xml version="1.0" encoding="UTF-8"?> <testsuites><testsuite name="UnityMCP.NL-T" tests="1" failures="1" errors="0" skipped="0" time="0"> <testcase name="NL-Suite.Bootstrap" classname="UnityMCP.NL-T"> <failure message="bootstrap">Bootstrap placeholder; suite will append real tests.</failure> </testcase> </testsuite></testsuites> XML printf '# Unity NL/T Editing Suite Test Results\n\n' > "$MD_OUT" - name: Verify Unity bridge status/port run: | set -euxo pipefail ls -la "$RUNNER_TEMP/unity-status" || true jq -r . "$RUNNER_TEMP"/unity-status/unity-mcp-status-*.json | sed -n '1,80p' || true shopt -s nullglob status_files=("$RUNNER_TEMP"/unity-status/unity-mcp-status-*.json) if ((${#status_files[@]})); then port="$(grep -hEo '"unity_port"[[:space:]]*:[[:space:]]*[0-9]+' "${status_files[@]}" \ | sed -E 's/.*: *([0-9]+).*/\1/' | head -n1 || true)" else port="" fi echo "unity_port=$port" if [[ -n "$port" ]]; then timeout 1 bash -lc "exec 3<>/dev/tcp/127.0.0.1/$port" && echo "TCP OK" fi # (removed) Revert helper and baseline snapshot are no longer used # ---------- Run suite in two passes ---------- - name: Run Claude NL pass uses: anthropics/claude-code-base-action@beta if: steps.detect.outputs.anthropic_ok == 'true' && steps.detect.outputs.unity_ok == 'true' continue-on-error: true with: use_node_cache: false prompt_file: .claude/prompts/nl-unity-suite-nl.md mcp_config: .claude/mcp.json settings: .claude/settings.json allowed_tools: "mcp__unity,Edit(reports/**),MultiEdit(reports/**)" disallowed_tools: "Bash,WebFetch,WebSearch,Task,TodoWrite,NotebookEdit,NotebookRead" model: claude-3-7-sonnet-20250219 append_system_prompt: | You are running the NL pass only. - Emit exactly NL-0, NL-1, NL-2, NL-3, NL-4. - Write each to reports/${ID}_results.xml. - Prefer a single MultiEdit(reports/**) batch. Do not emit any T-* tests. - Stop after NL-4_results.xml is written. timeout_minutes: "30" anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }} - name: Run Claude T pass A-J uses: anthropics/claude-code-base-action@beta if: steps.detect.outputs.anthropic_ok == 'true' && steps.detect.outputs.unity_ok == 'true' continue-on-error: true with: use_node_cache: false prompt_file: .claude/prompts/nl-unity-suite-t.md mcp_config: .claude/mcp.json settings: .claude/settings.json allowed_tools: "mcp__unity,Edit(reports/**),MultiEdit(reports/**)" disallowed_tools: "Bash,WebFetch,WebSearch,Task,TodoWrite,NotebookEdit,NotebookRead" model: claude-3-5-haiku-20241022 append_system_prompt: | You are running the T pass (A–J) only. Output requirements: - Emit exactly 10 test fragments: T-A, T-B, T-C, T-D, T-E, T-F, T-G, T-H, T-I, T-J. - Write each fragment to reports/${ID}_results.xml (e.g., T-A_results.xml). - Prefer a single MultiEdit(reports/**) call that writes all ten files in one batch. - If MultiEdit is not used, emit individual writes for any missing IDs until all ten exist. - Do not emit any NL-* fragments. Stop condition: - After T-J_results.xml is written, stop. timeout_minutes: "30" anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }} # (moved) Assert T coverage after staged fragments are promoted - name: Check T coverage incomplete (pre-retry) id: t_cov if: always() shell: bash run: | set -euo pipefail missing=() for id in T-A T-B T-C T-D T-E T-F T-G T-H T-I T-J; do if [[ ! -s "reports/${id}_results.xml" && ! -s "reports/_staging/${id}_results.xml" ]]; then missing+=("$id") fi done echo "missing=${#missing[@]}" >> "$GITHUB_OUTPUT" if (( ${#missing[@]} )); then echo "list=${missing[*]}" >> "$GITHUB_OUTPUT" fi - name: Retry T pass (Sonnet) if incomplete if: steps.t_cov.outputs.missing != '0' uses: anthropics/claude-code-base-action@beta with: use_node_cache: false prompt_file: .claude/prompts/nl-unity-suite-t.md mcp_config: .claude/mcp.json settings: .claude/settings.json allowed_tools: "mcp__unity,Edit(reports/**),MultiEdit(reports/**)" disallowed_tools: "Bash,MultiEdit(/!(reports/**)),WebFetch,WebSearch,Task,TodoWrite,NotebookEdit,NotebookRead" model: claude-3-7-sonnet-20250219 fallback_model: claude-3-5-haiku-20241022 append_system_prompt: | You are running the T pass only. Output requirements: - Emit exactly 10 test fragments: T-A, T-B, T-C, T-D, T-E, T-F, T-G, T-H, T-I, T-J. - Write each fragment to reports/${ID}_results.xml (e.g., T-A_results.xml). - Prefer a single MultiEdit(reports/**) call that writes all ten files in one batch. - If MultiEdit is not used, emit individual writes for any missing IDs until all ten exist. - Do not emit any NL-* fragments. Stop condition: - After T-J_results.xml is written, stop. timeout_minutes: "30" anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }} - name: Re-assert T coverage (post-retry) if: always() shell: bash run: | set -euo pipefail missing=() for id in T-A T-B T-C T-D T-E T-F T-G T-H T-I T-J; do [[ -s "reports/${id}_results.xml" ]] || missing+=("$id") done if (( ${#missing[@]} )); then echo "::error::Still missing T fragments: ${missing[*]}" exit 1 fi # (kept) Finalize staged report fragments (promote to reports/) # (removed duplicate) Finalize staged report fragments - name: Assert T coverage (after promotion) if: always() shell: bash run: | set -euo pipefail missing=() for id in T-A T-B T-C T-D T-E T-F T-G T-H T-I T-J; do if [[ ! -s "reports/${id}_results.xml" ]]; then # Accept staged fragment as present [[ -s "reports/_staging/${id}_results.xml" ]] || missing+=("$id") fi done if (( ${#missing[@]} )); then echo "::error::Missing T fragments: ${missing[*]}" exit 1 fi - name: Canonicalize testcase names (NL/T prefixes) if: always() shell: bash run: | python3 - <<'PY' from pathlib import Path import xml.etree.ElementTree as ET, re, os RULES = [ ("NL-0", r"\b(NL-0|Baseline|State\s*Capture)\b"), ("NL-1", r"\b(NL-1|Core\s*Method)\b"), ("NL-2", r"\b(NL-2|Anchor|Build\s*marker)\b"), ("NL-3", r"\b(NL-3|End[-\s]*of[-\s]*Class\s*Content|Tail\s*test\s*[ABC])\b"), ("NL-4", r"\b(NL-4|Console|Unity\s*console)\b"), ("T-A", r"\b(T-?A|Temporary\s*Helper)\b"), ("T-B", r"\b(T-?B|Method\s*Body\s*Interior)\b"), ("T-C", r"\b(T-?C|Different\s*Method\s*Interior|ApplyBlend)\b"), ("T-D", r"\b(T-?D|End[-\s]*of[-\s]*Class\s*Helper|TestHelper)\b"), ("T-E", r"\b(T-?E|Method\s*Evolution|Counter|IncrementCounter)\b"), ("T-F", r"\b(T-?F|Atomic\s*Multi[-\s]*Edit)\b"), ("T-G", r"\b(T-?G|Path\s*Normalization)\b"), ("T-H", r"\b(T-?H|Validation\s*on\s*Modified)\b"), ("T-I", r"\b(T-?I|Failure\s*Surface)\b"), ("T-J", r"\b(T-?J|Idempotenc(y|e))\b"), ] def canon_name(name: str) -> str: n = name or "" for tid, pat in RULES: if re.search(pat, n, flags=re.I): # If it already starts with the correct format, leave it alone if re.match(rf'^\s*{re.escape(tid)}\s*[—–-]', n, flags=re.I): return n.strip() # If it has a different separator, extract title and reformat title_match = re.search(rf'{re.escape(tid)}\s*[:.\-–—]\s*(.+)', n, flags=re.I) if title_match: title = title_match.group(1).strip() return f"{tid} — {title}" # Otherwise, just return the canonical ID return tid return n def id_from_filename(p: Path): n = p.name m = re.match(r'NL(\d+)_results\.xml$', n, re.I) if m: return f"NL-{int(m.group(1))}" m = re.match(r'T([A-J])_results\.xml$', n, re.I) if m: return f"T-{m.group(1).upper()}" return None frags = list(sorted(Path("reports").glob("*_results.xml"))) for frag in frags: try: tree = ET.parse(frag); root = tree.getroot() except Exception: continue if root.tag != "testcase": continue file_id = id_from_filename(frag) old = root.get("name") or "" # Prefer filename-derived ID; if name doesn't start with it, override if file_id: # Respect file's ID (prevents T-D being renamed to NL-3 by loose patterns) title = re.sub(r'^\s*(NL-\d+|T-[A-Z])\s*[—–:\-]\s*', '', old).strip() new = f"{file_id} — {title}" if title else file_id else: new = canon_name(old) if new != old and new: root.set("name", new) tree.write(frag, encoding="utf-8", xml_declaration=False) print(f'canon: {frag.name}: "{old}" -> "{new}"') # Note: Do not auto-relable fragments. We rely on per-test strict emission # and the backfill step to surface missing tests explicitly. PY - name: Backfill missing NL/T tests (fail placeholders) if: always() shell: bash run: | python3 - <<'PY' from pathlib import Path import xml.etree.ElementTree as ET import re DESIRED = ["NL-0","NL-1","NL-2","NL-3","NL-4","T-A","T-B","T-C","T-D","T-E","T-F","T-G","T-H","T-I","T-J"] seen = set() def id_from_filename(p: Path): n = p.name m = re.match(r'NL(\d+)_results\.xml$', n, re.I) if m: return f"NL-{int(m.group(1))}" m = re.match(r'T([A-J])_results\.xml$', n, re.I) if m: return f"T-{m.group(1).upper()}" return None for p in Path("reports").glob("*_results.xml"): try: r = ET.parse(p).getroot() except Exception: continue # Count by filename id primarily; fall back to testcase name if needed fid = id_from_filename(p) if fid in DESIRED: seen.add(fid) continue if r.tag == "testcase": name = (r.get("name") or "").strip() for d in DESIRED: if name.startswith(d): seen.add(d) break Path("reports").mkdir(parents=True, exist_ok=True) for d in DESIRED: if d in seen: continue frag = Path(f"reports/{d}_results.xml") tc = ET.Element("testcase", {"classname":"UnityMCP.NL-T", "name": d}) fail = ET.SubElement(tc, "failure", {"message":"not produced"}) fail.text = "The agent did not emit a fragment for this test." ET.ElementTree(tc).write(frag, encoding="utf-8", xml_declaration=False) print(f"backfill: {d}") PY - name: "Debug: list testcase names" if: always() run: | python3 - <<'PY' from pathlib import Path import xml.etree.ElementTree as ET for p in sorted(Path('reports').glob('*_results.xml')): try: r = ET.parse(p).getroot() if r.tag == 'testcase': print(f"{p.name}: {(r.get('name') or '').strip()}") except Exception: pass PY # ---------- Merge testcase fragments into JUnit ---------- - name: Normalize/assemble JUnit in-place (single file) if: always() shell: bash run: | python3 - <<'PY' from pathlib import Path import xml.etree.ElementTree as ET import re, os def localname(tag: str) -> str: return tag.rsplit('}', 1)[-1] if '}' in tag else tag src = Path(os.environ.get('JUNIT_OUT', 'reports/junit-nl-suite.xml')) if not src.exists(): raise SystemExit(0) tree = ET.parse(src) root = tree.getroot() suite = root.find('./*') if localname(root.tag) == 'testsuites' else root if suite is None: raise SystemExit(0) def id_from_filename(p: Path): n = p.name m = re.match(r'NL(\d+)_results\.xml$', n, re.I) if m: return f"NL-{int(m.group(1))}" m = re.match(r'T([A-J])_results\.xml$', n, re.I) if m: return f"T-{m.group(1).upper()}" return None def id_from_system_out(tc): so = tc.find('system-out') if so is not None and so.text: m = re.search(r'\b(NL-\d+|T-[A-Z])\b', so.text) if m: return m.group(1) return None fragments = sorted(Path('reports').glob('*_results.xml')) added = 0 renamed = 0 for frag in fragments: tcs = [] try: froot = ET.parse(frag).getroot() if localname(froot.tag) == 'testcase': tcs = [froot] else: tcs = list(froot.findall('.//testcase')) except Exception: txt = Path(frag).read_text(encoding='utf-8', errors='replace') # Extract all testcase nodes from raw text nodes = re.findall(r'<testcase[\s\S]*?</testcase>', txt, flags=re.DOTALL) for m in nodes: try: tcs.append(ET.fromstring(m)) except Exception: pass # Guard: keep only the first testcase from each fragment if len(tcs) > 1: tcs = tcs[:1] test_id = id_from_filename(frag) for tc in tcs: current_name = tc.get('name') or '' tid = test_id or id_from_system_out(tc) # Enforce filename-derived ID as prefix; repair names if needed if tid and not re.match(r'^\s*(NL-\d+|T-[A-Z])\b', current_name): title = current_name.strip() new_name = f'{tid} — {title}' if title else tid tc.set('name', new_name) elif tid and not re.match(rf'^\s*{re.escape(tid)}\b', current_name): # Replace any wrong leading ID with the correct one title = re.sub(r'^\s*(NL-\d+|T-[A-Z])\s*[—–:\-]\s*', '', current_name).strip() new_name = f'{tid} — {title}' if title else tid tc.set('name', new_name) renamed += 1 suite.append(tc) added += 1 if added: # Drop bootstrap placeholder and recompute counts for tc in list(suite.findall('.//testcase')): if (tc.get('name') or '') == 'NL-Suite.Bootstrap': suite.remove(tc) testcases = suite.findall('.//testcase') failures_cnt = sum(1 for tc in testcases if (tc.find('failure') is not None or tc.find('error') is not None)) suite.set('tests', str(len(testcases))) suite.set('failures', str(failures_cnt)) suite.set('errors', '0') suite.set('skipped', '0') tree.write(src, encoding='utf-8', xml_declaration=True) print(f"Appended {added} testcase(s); renamed {renamed} to canonical NL/T names.") PY # ---------- Markdown summary from JUnit ---------- - name: Build markdown summary from JUnit if: always() shell: bash run: | python3 - <<'PY' import xml.etree.ElementTree as ET from pathlib import Path import os, html, re def localname(tag: str) -> str: return tag.rsplit('}', 1)[-1] if '}' in tag else tag src = Path(os.environ.get('JUNIT_OUT', 'reports/junit-nl-suite.xml')) md_out = Path(os.environ.get('MD_OUT', 'reports/junit-nl-suite.md')) md_out.parent.mkdir(parents=True, exist_ok=True) if not src.exists(): md_out.write_text("# Unity NL/T Editing Suite Test Results\n\n(No JUnit found)\n", encoding='utf-8') raise SystemExit(0) tree = ET.parse(src) root = tree.getroot() suite = root.find('./*') if localname(root.tag) == 'testsuites' else root cases = [] if suite is None else list(suite.findall('.//testcase')) def id_from_case(tc): n = (tc.get('name') or '') m = re.match(r'\s*(NL-\d+|T-[A-Z])\b', n) if m: return m.group(1) so = tc.find('system-out') if so is not None and so.text: m = re.search(r'\b(NL-\d+|T-[A-Z])\b', so.text) if m: return m.group(1) return None id_status = {} name_map = {} for tc in cases: tid = id_from_case(tc) ok = (tc.find('failure') is None and tc.find('error') is None) if tid and tid not in id_status: id_status[tid] = ok name_map[tid] = (tc.get('name') or tid) desired = ['NL-0','NL-1','NL-2','NL-3','NL-4','T-A','T-B','T-C','T-D','T-E','T-F','T-G','T-H','T-I','T-J'] total = len(cases) failures = sum(1 for tc in cases if (tc.find('failure') is not None or tc.find('error') is not None)) passed = total - failures lines = [] lines += [ '# Unity NL/T Editing Suite Test Results', '', f'Totals: {passed} passed, {failures} failed, {total} total', '', '## Test Checklist' ] for p in desired: st = id_status.get(p, None) lines.append(f"- [x] {p}" if st is True else (f"- [ ] {p} (fail)" if st is False else f"- [ ] {p} (not run)")) lines.append('') lines.append('## Test Details') def order_key(n: str): if n.startswith('NL-'): try: return (0, int(n.split('-')[1])) except: return (0, 999) if n.startswith('T-') and len(n) > 2: return (1, ord(n[2])) return (2, n) MAX_CHARS = 2000 seen = set() for tid in sorted(id_status.keys(), key=order_key): seen.add(tid) tc = next((c for c in cases if (id_from_case(c) == tid)), None) if not tc: continue title = name_map.get(tid, tid) status_badge = "PASS" if id_status[tid] else "FAIL" lines.append(f"### {title} — {status_badge}") so = tc.find('system-out') text = '' if so is None or so.text is None else html.unescape(so.text.replace('\r\n','\n')) if text.strip(): t = text.strip() if len(t) > MAX_CHARS: t = t[:MAX_CHARS] + "\n…(truncated)" fence = '```' if '```' not in t else '````' lines += [fence, t, fence] else: lines.append('(no system-out)') node = tc.find('failure') or tc.find('error') if node is not None: msg = (node.get('message') or '').strip() body = (node.text or '').strip() if msg: lines.append(f"- Message: {msg}") if body: lines.append(f"- Detail: {body.splitlines()[0][:500]}") lines.append('') for tc in cases: if id_from_case(tc) in seen: continue title = tc.get('name') or '(unnamed)' status_badge = "PASS" if (tc.find('failure') is None and tc.find('error') is None) else "FAIL" lines.append(f"### {title} — {status_badge}") lines.append('(unmapped test id)') lines.append('') md_out.write_text('\n'.join(lines), encoding='utf-8') PY - name: "Debug: list report files" if: always() shell: bash run: | set -eux ls -la reports || true shopt -s nullglob for f in reports/*.xml; do echo "===== $f =====" head -n 40 "$f" || true done # ---------- Collect execution transcript (if present) ---------- - name: Collect action execution transcript if: always() shell: bash run: | set -eux if [ -f "$RUNNER_TEMP/claude-execution-output.json" ]; then cp "$RUNNER_TEMP/claude-execution-output.json" reports/claude-execution-output.json elif [ -f "/home/runner/work/_temp/claude-execution-output.json" ]; then cp "/home/runner/work/_temp/claude-execution-output.json" reports/claude-execution-output.json fi - name: Sanitize markdown (normalize newlines) if: always() run: | set -eu python3 - <<'PY' from pathlib import Path rp=Path('reports'); rp.mkdir(parents=True, exist_ok=True) for p in rp.glob('*.md'): b=p.read_bytes().replace(b'\x00', b'') s=b.decode('utf-8','replace').replace('\r\n','\n') p.write_text(s, encoding='utf-8', newline='\n') PY - name: NL/T details -> Job Summary if: always() run: | echo "## Unity NL/T Editing Suite — Summary" >> $GITHUB_STEP_SUMMARY python3 - <<'PY' >> $GITHUB_STEP_SUMMARY from pathlib import Path p = Path('reports/junit-nl-suite.md') if p.exists(): text = p.read_bytes().decode('utf-8', 'replace') MAX = 65000 print(text[:MAX]) if len(text) > MAX: print("\n\n_…truncated; full report in artifacts._") else: print("_No markdown report found._") PY - name: Fallback JUnit if missing if: always() run: | set -eu mkdir -p reports if [ ! -f "$JUNIT_OUT" ]; then printf '%s\n' \ '<?xml version="1.0" encoding="UTF-8"?>' \ '<testsuite name="UnityMCP.NL-T" tests="1" failures="1" time="0">' \ ' <testcase classname="UnityMCP.NL-T" name="NL-Suite.Execution" time="0.0">' \ ' <failure><![CDATA[No JUnit was produced by the NL suite step. See the step logs.]]></failure>' \ ' </testcase>' \ '</testsuite>' \ > "$JUNIT_OUT" fi - name: Publish JUnit report if: always() uses: mikepenz/action-junit-report@v5 with: report_paths: "${{ env.JUNIT_OUT }}" include_passed: true detailed_summary: true annotate_notice: true require_tests: false fail_on_parse_error: true - name: Upload artifacts (reports + fragments + transcript) if: always() uses: actions/upload-artifact@v4 with: name: claude-nl-suite-artifacts path: | ${{ env.JUNIT_OUT }} ${{ env.MD_OUT }} reports/*_results.xml reports/claude-execution-output.json retention-days: 7 # ---------- Always stop Unity ---------- - name: Stop Unity if: always() run: | docker logs --tail 400 unity-mcp | sed -E 's/((email|serial|license|password|token)[^[:space:]]*)/[REDACTED]/ig' || true docker rm -f unity-mcp || true - name: Return Pro license (if used) if: always() && steps.lic.outputs.use_ebl == 'true' && steps.lic.outputs.has_serial == 'true' uses: game-ci/unity-return-license@v2 continue-on-error: true env: UNITY_EMAIL: ${{ secrets.UNITY_EMAIL }} UNITY_PASSWORD: ${{ secrets.UNITY_PASSWORD }} UNITY_SERIAL: ${{ secrets.UNITY_SERIAL }} ``` -------------------------------------------------------------------------------- /MCPForUnity/UnityMcpServer~/src/tools/script_apply_edits.py: -------------------------------------------------------------------------------- ```python import base64 import hashlib import re from typing import Annotated, Any from mcp.server.fastmcp import Context from registry import mcp_for_unity_tool from unity_connection import send_command_with_retry def _apply_edits_locally(original_text: str, edits: list[dict[str, Any]]) -> str: text = original_text for edit in edits or []: op = ( (edit.get("op") or edit.get("operation") or edit.get("type") or edit.get("mode") or "") .strip() .lower() ) if not op: allowed = "anchor_insert, prepend, append, replace_range, regex_replace" raise RuntimeError( f"op is required; allowed: {allowed}. Use 'op' (aliases accepted: type/mode/operation)." ) if op == "prepend": prepend_text = edit.get("text", "") text = (prepend_text if prepend_text.endswith( "\n") else prepend_text + "\n") + text elif op == "append": append_text = edit.get("text", "") if not text.endswith("\n"): text += "\n" text += append_text if not text.endswith("\n"): text += "\n" elif op == "anchor_insert": anchor = edit.get("anchor", "") position = (edit.get("position") or "before").lower() insert_text = edit.get("text", "") flags = re.MULTILINE | ( re.IGNORECASE if edit.get("ignore_case") else 0) # Find the best match using improved heuristics match = _find_best_anchor_match( anchor, text, flags, bool(edit.get("prefer_last", True))) if not match: if edit.get("allow_noop", True): continue raise RuntimeError(f"anchor not found: {anchor}") idx = match.start() if position == "before" else match.end() text = text[:idx] + insert_text + text[idx:] elif op == "replace_range": start_line = int(edit.get("startLine", 1)) start_col = int(edit.get("startCol", 1)) end_line = int(edit.get("endLine", start_line)) end_col = int(edit.get("endCol", 1)) replacement = edit.get("text", "") lines = text.splitlines(keepends=True) max_line = len(lines) + 1 # 1-based, exclusive end if (start_line < 1 or end_line < start_line or end_line > max_line or start_col < 1 or end_col < 1): raise RuntimeError("replace_range out of bounds") def index_of(line: int, col: int) -> int: if line <= len(lines): return sum(len(l) for l in lines[: line - 1]) + (col - 1) return sum(len(l) for l in lines) a = index_of(start_line, start_col) b = index_of(end_line, end_col) text = text[:a] + replacement + text[b:] elif op == "regex_replace": pattern = edit.get("pattern", "") repl = edit.get("replacement", "") # Translate $n backrefs (our input) to Python \g<n> repl_py = re.sub(r"\$(\d+)", r"\\g<\1>", repl) count = int(edit.get("count", 0)) # 0 = replace all flags = re.MULTILINE if edit.get("ignore_case"): flags |= re.IGNORECASE text = re.sub(pattern, repl_py, text, count=count, flags=flags) else: allowed = "anchor_insert, prepend, append, replace_range, regex_replace" raise RuntimeError( f"unknown edit op: {op}; allowed: {allowed}. Use 'op' (aliases accepted: type/mode/operation).") return text def _find_best_anchor_match(pattern: str, text: str, flags: int, prefer_last: bool = True): """ Find the best anchor match using improved heuristics. For patterns like \\s*}\\s*$ that are meant to find class-ending braces, this function uses heuristics to choose the most semantically appropriate match: 1. If prefer_last=True, prefer the last match (common for class-end insertions) 2. Use indentation levels to distinguish class vs method braces 3. Consider context to avoid matches inside strings/comments Args: pattern: Regex pattern to search for text: Text to search in flags: Regex flags prefer_last: If True, prefer the last match over the first Returns: Match object of the best match, or None if no match found """ # Find all matches matches = list(re.finditer(pattern, text, flags)) if not matches: return None # If only one match, return it if len(matches) == 1: return matches[0] # For patterns that look like they're trying to match closing braces at end of lines is_closing_brace_pattern = '}' in pattern and ( '$' in pattern or pattern.endswith(r'\s*')) if is_closing_brace_pattern and prefer_last: # Use heuristics to find the best closing brace match return _find_best_closing_brace_match(matches, text) # Default behavior: use last match if prefer_last, otherwise first match return matches[-1] if prefer_last else matches[0] def _find_best_closing_brace_match(matches, text: str): """ Find the best closing brace match using C# structure heuristics. Enhanced heuristics for scope-aware matching: 1. Prefer matches with lower indentation (likely class-level) 2. Prefer matches closer to end of file 3. Avoid matches that seem to be inside method bodies 4. For #endregion patterns, ensure class-level context 5. Validate insertion point is at appropriate scope Args: matches: List of regex match objects text: The full text being searched Returns: The best match object """ if not matches: return None scored_matches = [] lines = text.splitlines() for match in matches: score = 0 start_pos = match.start() # Find which line this match is on lines_before = text[:start_pos].count('\n') line_num = lines_before if line_num < len(lines): line_content = lines[line_num] # Calculate indentation level (lower is better for class braces) indentation = len(line_content) - len(line_content.lstrip()) # Prefer lower indentation (class braces are typically less indented than method braces) # Max 20 points for indentation=0 score += max(0, 20 - indentation) # Prefer matches closer to end of file (class closing braces are typically at the end) distance_from_end = len(lines) - line_num # More points for being closer to end score += max(0, 10 - distance_from_end) # Look at surrounding context to avoid method braces context_start = max(0, line_num - 3) context_end = min(len(lines), line_num + 2) context_lines = lines[context_start:context_end] # Penalize if this looks like it's inside a method (has method-like patterns above) for context_line in context_lines: if re.search(r'\b(void|public|private|protected)\s+\w+\s*\(', context_line): score -= 5 # Penalty for being near method signatures # Bonus if this looks like a class-ending brace (very minimal indentation and near EOF) if indentation <= 4 and distance_from_end <= 3: score += 15 # Bonus for likely class-ending brace scored_matches.append((score, match)) # Return the match with the highest score scored_matches.sort(key=lambda x: x[0], reverse=True) best_match = scored_matches[0][1] return best_match def _infer_class_name(script_name: str) -> str: # Default to script name as class name (common Unity pattern) return (script_name or "").strip() def _extract_code_after(keyword: str, request: str) -> str: # Deprecated with NL removal; retained as no-op for compatibility idx = request.lower().find(keyword) if idx >= 0: return request[idx + len(keyword):].strip() return "" # Removed _is_structurally_balanced - validation now handled by C# side using Unity's compiler services def _normalize_script_locator(name: str, path: str) -> tuple[str, str]: """Best-effort normalization of script "name" and "path". Accepts any of: - name = "SmartReach", path = "Assets/Scripts/Interaction" - name = "SmartReach.cs", path = "Assets/Scripts/Interaction" - name = "Assets/Scripts/Interaction/SmartReach.cs", path = "" - path = "Assets/Scripts/Interaction/SmartReach.cs" (name empty) - name or path using uri prefixes: unity://path/..., file://... - accidental duplicates like "Assets/.../SmartReach.cs/SmartReach.cs" Returns (name_without_extension, directory_path_under_Assets). """ n = (name or "").strip() p = (path or "").strip() def strip_prefix(s: str) -> str: if s.startswith("unity://path/"): return s[len("unity://path/"):] if s.startswith("file://"): return s[len("file://"):] return s def collapse_duplicate_tail(s: str) -> str: # Collapse trailing "/X.cs/X.cs" to "/X.cs" parts = s.split("/") if len(parts) >= 2 and parts[-1] == parts[-2]: parts = parts[:-1] return "/".join(parts) # Prefer a full path if provided in either field candidate = "" for v in (n, p): v2 = strip_prefix(v) if v2.endswith(".cs") or v2.startswith("Assets/"): candidate = v2 break if candidate: candidate = collapse_duplicate_tail(candidate) # If a directory was passed in path and file in name, join them if not candidate.endswith(".cs") and n.endswith(".cs"): v2 = strip_prefix(n) candidate = (candidate.rstrip("/") + "/" + v2.split("/")[-1]) if candidate.endswith(".cs"): parts = candidate.split("/") file_name = parts[-1] dir_path = "/".join(parts[:-1]) if len(parts) > 1 else "Assets" base = file_name[:- 3] if file_name.lower().endswith(".cs") else file_name return base, dir_path # Fall back: remove extension from name if present and return given path base_name = n[:-3] if n.lower().endswith(".cs") else n return base_name, (p or "Assets") def _with_norm(resp: dict[str, Any] | Any, edits: list[dict[str, Any]], routing: str | None = None) -> dict[str, Any] | Any: if not isinstance(resp, dict): return resp data = resp.setdefault("data", {}) data.setdefault("normalizedEdits", edits) if routing: data["routing"] = routing return resp def _err(code: str, message: str, *, expected: dict[str, Any] | None = None, rewrite: dict[str, Any] | None = None, normalized: list[dict[str, Any]] | None = None, routing: str | None = None, extra: dict[str, Any] | None = None) -> dict[str, Any]: payload: dict[str, Any] = {"success": False, "code": code, "message": message} data: dict[str, Any] = {} if expected: data["expected"] = expected if rewrite: data["rewrite_suggestion"] = rewrite if normalized is not None: data["normalizedEdits"] = normalized if routing: data["routing"] = routing if extra: data.update(extra) if data: payload["data"] = data return payload # Natural-language parsing removed; clients should send structured edits. @mcp_for_unity_tool(name="script_apply_edits", description=( """Structured C# edits (methods/classes) with safer boundaries - prefer this over raw text. Best practices: - Prefer anchor_* ops for pattern-based insert/replace near stable markers - Use replace_method/delete_method for whole-method changes (keeps signatures balanced) - Avoid whole-file regex deletes; validators will guard unbalanced braces - For tail insertions, prefer anchor/regex_replace on final brace (class closing) - Pass options.validate='standard' for structural checks; 'relaxed' for interior-only edits Canonical fields (use these exact keys): - op: replace_method | insert_method | delete_method | anchor_insert | anchor_delete | anchor_replace - className: string (defaults to 'name' if omitted on method/class ops) - methodName: string (required for replace_method, delete_method) - replacement: string (required for replace_method, insert_method) - position: start | end | after | before (insert_method only) - afterMethodName / beforeMethodName: string (required when position='after'/'before') - anchor: regex string (for anchor_* ops) - text: string (for anchor_insert/anchor_replace) Examples: 1) Replace a method: { "name": "SmartReach", "path": "Assets/Scripts/Interaction", "edits": [ { "op": "replace_method", "className": "SmartReach", "methodName": "HasTarget", "replacement": "public bool HasTarget(){ return currentTarget!=null; }" } ], "options": {"validate": "standard", "refresh": "immediate"} } "2) Insert a method after another: { "name": "SmartReach", "path": "Assets/Scripts/Interaction", "edits": [ { "op": "insert_method", "className": "SmartReach", "replacement": "public void PrintSeries(){ Debug.Log(seriesName); }", "position": "after", "afterMethodName": "GetCurrentTarget" } ], } ]""" )) def script_apply_edits( ctx: Context, name: Annotated[str, "Name of the script to edit"], path: Annotated[str, "Path to the script to edit under Assets/ directory"], edits: Annotated[list[dict[str, Any]], "List of edits to apply to the script"], options: Annotated[dict[str, Any], "Options for the script edit"] | None = None, script_type: Annotated[str, "Type of the script to edit"] = "MonoBehaviour", namespace: Annotated[str, "Namespace of the script to edit"] | None = None, ) -> dict[str, Any]: ctx.info(f"Processing script_apply_edits: {name}") # Normalize locator first so downstream calls target the correct script file. name, path = _normalize_script_locator(name, path) # Normalize unsupported or aliased ops to known structured/text paths def _unwrap_and_alias(edit: dict[str, Any]) -> dict[str, Any]: # Unwrap single-key wrappers like {"replace_method": {...}} for wrapper_key in ( "replace_method", "insert_method", "delete_method", "replace_class", "delete_class", "anchor_insert", "anchor_replace", "anchor_delete", ): if wrapper_key in edit and isinstance(edit[wrapper_key], dict): inner = dict(edit[wrapper_key]) inner["op"] = wrapper_key edit = inner break e = dict(edit) op = (e.get("op") or e.get("operation") or e.get( "type") or e.get("mode") or "").strip().lower() if op: e["op"] = op # Common field aliases if "class_name" in e and "className" not in e: e["className"] = e.pop("class_name") if "class" in e and "className" not in e: e["className"] = e.pop("class") if "method_name" in e and "methodName" not in e: e["methodName"] = e.pop("method_name") # Some clients use a generic 'target' for method name if "target" in e and "methodName" not in e: e["methodName"] = e.pop("target") if "method" in e and "methodName" not in e: e["methodName"] = e.pop("method") if "new_content" in e and "replacement" not in e: e["replacement"] = e.pop("new_content") if "newMethod" in e and "replacement" not in e: e["replacement"] = e.pop("newMethod") if "new_method" in e and "replacement" not in e: e["replacement"] = e.pop("new_method") if "content" in e and "replacement" not in e: e["replacement"] = e.pop("content") if "after" in e and "afterMethodName" not in e: e["afterMethodName"] = e.pop("after") if "after_method" in e and "afterMethodName" not in e: e["afterMethodName"] = e.pop("after_method") if "before" in e and "beforeMethodName" not in e: e["beforeMethodName"] = e.pop("before") if "before_method" in e and "beforeMethodName" not in e: e["beforeMethodName"] = e.pop("before_method") # anchor_method → before/after based on position (default after) if "anchor_method" in e: anchor = e.pop("anchor_method") pos = (e.get("position") or "after").strip().lower() if pos == "before" and "beforeMethodName" not in e: e["beforeMethodName"] = anchor elif "afterMethodName" not in e: e["afterMethodName"] = anchor if "anchorText" in e and "anchor" not in e: e["anchor"] = e.pop("anchorText") if "pattern" in e and "anchor" not in e and e.get("op") and e["op"].startswith("anchor_"): e["anchor"] = e.pop("pattern") if "newText" in e and "text" not in e: e["text"] = e.pop("newText") # CI compatibility (T‑A/T‑E): # Accept method-anchored anchor_insert and upgrade to insert_method # Example incoming shape: # {"op":"anchor_insert","afterMethodName":"GetCurrentTarget","text":"..."} if ( e.get("op") == "anchor_insert" and not e.get("anchor") and (e.get("afterMethodName") or e.get("beforeMethodName")) ): e["op"] = "insert_method" if "replacement" not in e: e["replacement"] = e.get("text", "") # LSP-like range edit -> replace_range if "range" in e and isinstance(e["range"], dict): rng = e.pop("range") start = rng.get("start", {}) end = rng.get("end", {}) # Convert 0-based to 1-based line/col e["op"] = "replace_range" e["startLine"] = int(start.get("line", 0)) + 1 e["startCol"] = int(start.get("character", 0)) + 1 e["endLine"] = int(end.get("line", 0)) + 1 e["endCol"] = int(end.get("character", 0)) + 1 if "newText" in edit and "text" not in e: e["text"] = edit.get("newText", "") return e normalized_edits: list[dict[str, Any]] = [] for raw in edits or []: e = _unwrap_and_alias(raw) op = (e.get("op") or e.get("operation") or e.get( "type") or e.get("mode") or "").strip().lower() # Default className to script name if missing on structured method/class ops if op in ("replace_class", "delete_class", "replace_method", "delete_method", "insert_method") and not e.get("className"): e["className"] = name # Map common aliases for text ops if op in ("text_replace",): e["op"] = "replace_range" normalized_edits.append(e) continue if op in ("regex_delete",): e["op"] = "regex_replace" e.setdefault("text", "") normalized_edits.append(e) continue if op == "regex_replace" and ("replacement" not in e): if "text" in e: e["replacement"] = e.get("text", "") elif "insert" in e or "content" in e: e["replacement"] = e.get( "insert") or e.get("content") or "" if op == "anchor_insert" and not (e.get("text") or e.get("insert") or e.get("content") or e.get("replacement")): e["op"] = "anchor_delete" normalized_edits.append(e) continue normalized_edits.append(e) edits = normalized_edits normalized_for_echo = edits # Validate required fields and produce machine-parsable hints def error_with_hint(message: str, expected: dict[str, Any], suggestion: dict[str, Any]) -> dict[str, Any]: return _err("missing_field", message, expected=expected, rewrite=suggestion, normalized=normalized_for_echo) for e in edits or []: op = e.get("op", "") if op == "replace_method": if not e.get("methodName"): return error_with_hint( "replace_method requires 'methodName'.", {"op": "replace_method", "required": [ "className", "methodName", "replacement"]}, {"edits[0].methodName": "HasTarget"} ) if not (e.get("replacement") or e.get("text")): return error_with_hint( "replace_method requires 'replacement' (inline or base64).", {"op": "replace_method", "required": [ "className", "methodName", "replacement"]}, {"edits[0].replacement": "public bool X(){ return true; }"} ) elif op == "insert_method": if not (e.get("replacement") or e.get("text")): return error_with_hint( "insert_method requires a non-empty 'replacement'.", {"op": "insert_method", "required": ["className", "replacement"], "position": { "after_requires": "afterMethodName", "before_requires": "beforeMethodName"}}, {"edits[0].replacement": "public void PrintSeries(){ Debug.Log(\"1,2,3\"); }"} ) pos = (e.get("position") or "").lower() if pos == "after" and not e.get("afterMethodName"): return error_with_hint( "insert_method with position='after' requires 'afterMethodName'.", {"op": "insert_method", "position": { "after_requires": "afterMethodName"}}, {"edits[0].afterMethodName": "GetCurrentTarget"} ) if pos == "before" and not e.get("beforeMethodName"): return error_with_hint( "insert_method with position='before' requires 'beforeMethodName'.", {"op": "insert_method", "position": { "before_requires": "beforeMethodName"}}, {"edits[0].beforeMethodName": "GetCurrentTarget"} ) elif op == "delete_method": if not e.get("methodName"): return error_with_hint( "delete_method requires 'methodName'.", {"op": "delete_method", "required": [ "className", "methodName"]}, {"edits[0].methodName": "PrintSeries"} ) elif op in ("anchor_insert", "anchor_replace", "anchor_delete"): if not e.get("anchor"): return error_with_hint( f"{op} requires 'anchor' (regex).", {"op": op, "required": ["anchor"]}, {"edits[0].anchor": "(?m)^\\s*public\\s+bool\\s+HasTarget\\s*\\("} ) if op in ("anchor_insert", "anchor_replace") and not (e.get("text") or e.get("replacement")): return error_with_hint( f"{op} requires 'text'.", {"op": op, "required": ["anchor", "text"]}, {"edits[0].text": "/* comment */\n"} ) # Decide routing: structured vs text vs mixed STRUCT = {"replace_class", "delete_class", "replace_method", "delete_method", "insert_method", "anchor_delete", "anchor_replace", "anchor_insert"} TEXT = {"prepend", "append", "replace_range", "regex_replace"} ops_set = {(e.get("op") or "").lower() for e in edits or []} all_struct = ops_set.issubset(STRUCT) all_text = ops_set.issubset(TEXT) mixed = not (all_struct or all_text) # If everything is structured (method/class/anchor ops), forward directly to Unity's structured editor. if all_struct: opts2 = dict(options or {}) # For structured edits, prefer immediate refresh to avoid missed reloads when Editor is unfocused opts2.setdefault("refresh", "immediate") params_struct: dict[str, Any] = { "action": "edit", "name": name, "path": path, "namespace": namespace, "scriptType": script_type, "edits": edits, "options": opts2, } resp_struct = send_command_with_retry( "manage_script", params_struct) if isinstance(resp_struct, dict) and resp_struct.get("success"): pass # Optional sentinel reload removed (deprecated) return _with_norm(resp_struct if isinstance(resp_struct, dict) else {"success": False, "message": str(resp_struct)}, normalized_for_echo, routing="structured") # 1) read from Unity read_resp = send_command_with_retry("manage_script", { "action": "read", "name": name, "path": path, "namespace": namespace, "scriptType": script_type, }) if not isinstance(read_resp, dict) or not read_resp.get("success"): return read_resp if isinstance(read_resp, dict) else {"success": False, "message": str(read_resp)} data = read_resp.get("data") or read_resp.get( "result", {}).get("data") or {} contents = data.get("contents") if contents is None and data.get("contentsEncoded") and data.get("encodedContents"): contents = base64.b64decode( data["encodedContents"]).decode("utf-8") if contents is None: return {"success": False, "message": "No contents returned from Unity read."} # Optional preview/dry-run: apply locally and return diff without writing preview = bool((options or {}).get("preview")) # If we have a mixed batch (TEXT + STRUCT), apply text first with precondition, then structured if mixed: text_edits = [e for e in edits or [] if ( e.get("op") or "").lower() in TEXT] struct_edits = [e for e in edits or [] if ( e.get("op") or "").lower() in STRUCT] try: base_text = contents def line_col_from_index(idx: int) -> tuple[int, int]: line = base_text.count("\n", 0, idx) + 1 last_nl = base_text.rfind("\n", 0, idx) col = (idx - (last_nl + 1)) + \ 1 if last_nl >= 0 else idx + 1 return line, col at_edits: list[dict[str, Any]] = [] for e in text_edits: opx = (e.get("op") or e.get("operation") or e.get( "type") or e.get("mode") or "").strip().lower() text_field = e.get("text") or e.get("insert") or e.get( "content") or e.get("replacement") or "" if opx == "anchor_insert": anchor = e.get("anchor") or "" position = (e.get("position") or "after").lower() flags = re.MULTILINE | ( re.IGNORECASE if e.get("ignore_case") else 0) try: # Use improved anchor matching logic m = _find_best_anchor_match( anchor, base_text, flags, prefer_last=True) except Exception as ex: return _with_norm(_err("bad_regex", f"Invalid anchor regex: {ex}", normalized=normalized_for_echo, routing="mixed/text-first", extra={"hint": "Escape parentheses/braces or use a simpler anchor."}), normalized_for_echo, routing="mixed/text-first") if not m: return _with_norm({"success": False, "code": "anchor_not_found", "message": f"anchor not found: {anchor}"}, normalized_for_echo, routing="mixed/text-first") idx = m.start() if position == "before" else m.end() # Normalize insertion to avoid jammed methods text_field_norm = text_field if not text_field_norm.startswith("\n"): text_field_norm = "\n" + text_field_norm if not text_field_norm.endswith("\n"): text_field_norm = text_field_norm + "\n" sl, sc = line_col_from_index(idx) at_edits.append( {"startLine": sl, "startCol": sc, "endLine": sl, "endCol": sc, "newText": text_field_norm}) # do not mutate base_text when building atomic spans elif opx == "replace_range": if all(k in e for k in ("startLine", "startCol", "endLine", "endCol")): at_edits.append({ "startLine": int(e.get("startLine", 1)), "startCol": int(e.get("startCol", 1)), "endLine": int(e.get("endLine", 1)), "endCol": int(e.get("endCol", 1)), "newText": text_field }) else: return _with_norm(_err("missing_field", "replace_range requires startLine/startCol/endLine/endCol", normalized=normalized_for_echo, routing="mixed/text-first"), normalized_for_echo, routing="mixed/text-first") elif opx == "regex_replace": pattern = e.get("pattern") or "" try: regex_obj = re.compile(pattern, re.MULTILINE | ( re.IGNORECASE if e.get("ignore_case") else 0)) except Exception as ex: return _with_norm(_err("bad_regex", f"Invalid regex pattern: {ex}", normalized=normalized_for_echo, routing="mixed/text-first", extra={"hint": "Escape special chars or prefer structured delete for methods."}), normalized_for_echo, routing="mixed/text-first") m = regex_obj.search(base_text) if not m: continue # Expand $1, $2... in replacement using this match def _expand_dollars(rep: str, _m=m) -> str: return re.sub(r"\$(\d+)", lambda g: _m.group(int(g.group(1))) or "", rep) repl = _expand_dollars(text_field) sl, sc = line_col_from_index(m.start()) el, ec = line_col_from_index(m.end()) at_edits.append( {"startLine": sl, "startCol": sc, "endLine": el, "endCol": ec, "newText": repl}) # do not mutate base_text when building atomic spans elif opx in ("prepend", "append"): if opx == "prepend": sl, sc = 1, 1 at_edits.append( {"startLine": sl, "startCol": sc, "endLine": sl, "endCol": sc, "newText": text_field}) # prepend can be applied atomically without local mutation else: # Insert at true EOF position (handles both \n and \r\n correctly) eof_idx = len(base_text) sl, sc = line_col_from_index(eof_idx) new_text = ("\n" if not base_text.endswith( "\n") else "") + text_field at_edits.append( {"startLine": sl, "startCol": sc, "endLine": sl, "endCol": sc, "newText": new_text}) # do not mutate base_text when building atomic spans else: return _with_norm(_err("unknown_op", f"Unsupported text edit op: {opx}", normalized=normalized_for_echo, routing="mixed/text-first"), normalized_for_echo, routing="mixed/text-first") sha = hashlib.sha256(base_text.encode("utf-8")).hexdigest() if at_edits: params_text: dict[str, Any] = { "action": "apply_text_edits", "name": name, "path": path, "namespace": namespace, "scriptType": script_type, "edits": at_edits, "precondition_sha256": sha, "options": {"refresh": (options or {}).get("refresh", "debounced"), "validate": (options or {}).get("validate", "standard"), "applyMode": ("atomic" if len(at_edits) > 1 else (options or {}).get("applyMode", "sequential"))} } resp_text = send_command_with_retry( "manage_script", params_text) if not (isinstance(resp_text, dict) and resp_text.get("success")): return _with_norm(resp_text if isinstance(resp_text, dict) else {"success": False, "message": str(resp_text)}, normalized_for_echo, routing="mixed/text-first") # Optional sentinel reload removed (deprecated) except Exception as e: return _with_norm({"success": False, "message": f"Text edit conversion failed: {e}"}, normalized_for_echo, routing="mixed/text-first") if struct_edits: opts2 = dict(options or {}) # Prefer debounced background refresh unless explicitly overridden opts2.setdefault("refresh", "debounced") params_struct: dict[str, Any] = { "action": "edit", "name": name, "path": path, "namespace": namespace, "scriptType": script_type, "edits": struct_edits, "options": opts2 } resp_struct = send_command_with_retry( "manage_script", params_struct) if isinstance(resp_struct, dict) and resp_struct.get("success"): pass # Optional sentinel reload removed (deprecated) return _with_norm(resp_struct if isinstance(resp_struct, dict) else {"success": False, "message": str(resp_struct)}, normalized_for_echo, routing="mixed/text-first") return _with_norm({"success": True, "message": "Applied text edits (no structured ops)"}, normalized_for_echo, routing="mixed/text-first") # If the edits are text-ops, prefer sending them to Unity's apply_text_edits with precondition # so header guards and validation run on the C# side. # Supported conversions: anchor_insert, replace_range, regex_replace (first match only). text_ops = {(e.get("op") or e.get("operation") or e.get("type") or e.get( "mode") or "").strip().lower() for e in (edits or [])} structured_kinds = {"replace_class", "delete_class", "replace_method", "delete_method", "insert_method", "anchor_insert"} if not text_ops.issubset(structured_kinds): # Convert to apply_text_edits payload try: base_text = contents def line_col_from_index(idx: int) -> tuple[int, int]: # 1-based line/col against base buffer line = base_text.count("\n", 0, idx) + 1 last_nl = base_text.rfind("\n", 0, idx) col = (idx - (last_nl + 1)) + \ 1 if last_nl >= 0 else idx + 1 return line, col at_edits: list[dict[str, Any]] = [] import re as _re for e in edits or []: op = (e.get("op") or e.get("operation") or e.get( "type") or e.get("mode") or "").strip().lower() # aliasing for text field text_field = e.get("text") or e.get( "insert") or e.get("content") or "" if op == "anchor_insert": anchor = e.get("anchor") or "" position = (e.get("position") or "after").lower() # Use improved anchor matching logic with helpful errors, honoring ignore_case try: flags = re.MULTILINE | ( re.IGNORECASE if e.get("ignore_case") else 0) m = _find_best_anchor_match( anchor, base_text, flags, prefer_last=True) except Exception as ex: return _with_norm(_err("bad_regex", f"Invalid anchor regex: {ex}", normalized=normalized_for_echo, routing="text", extra={"hint": "Escape parentheses/braces or use a simpler anchor."}), normalized_for_echo, routing="text") if not m: return _with_norm({"success": False, "code": "anchor_not_found", "message": f"anchor not found: {anchor}"}, normalized_for_echo, routing="text") idx = m.start() if position == "before" else m.end() # Normalize insertion newlines if text_field and not text_field.startswith("\n"): text_field = "\n" + text_field if text_field and not text_field.endswith("\n"): text_field = text_field + "\n" sl, sc = line_col_from_index(idx) at_edits.append({ "startLine": sl, "startCol": sc, "endLine": sl, "endCol": sc, "newText": text_field or "" }) # Do not mutate base buffer when building an atomic batch elif op == "replace_range": # Directly forward if already in line/col form if "startLine" in e: at_edits.append({ "startLine": int(e.get("startLine", 1)), "startCol": int(e.get("startCol", 1)), "endLine": int(e.get("endLine", 1)), "endCol": int(e.get("endCol", 1)), "newText": text_field }) else: # If only indices provided, skip (we don't support index-based here) return _with_norm({"success": False, "code": "missing_field", "message": "replace_range requires startLine/startCol/endLine/endCol"}, normalized_for_echo, routing="text") elif op == "regex_replace": pattern = e.get("pattern") or "" repl = text_field flags = re.MULTILINE | ( re.IGNORECASE if e.get("ignore_case") else 0) # Early compile for clearer error messages try: regex_obj = re.compile(pattern, flags) except Exception as ex: return _with_norm(_err("bad_regex", f"Invalid regex pattern: {ex}", normalized=normalized_for_echo, routing="text", extra={"hint": "Escape special chars or prefer structured delete for methods."}), normalized_for_echo, routing="text") # Use smart anchor matching for consistent behavior with anchor_insert m = _find_best_anchor_match( pattern, base_text, flags, prefer_last=True) if not m: continue # Expand $1, $2... backrefs in replacement using the first match (consistent with mixed-path behavior) def _expand_dollars(rep: str, _m=m) -> str: return re.sub(r"\$(\d+)", lambda g: _m.group(int(g.group(1))) or "", rep) repl_expanded = _expand_dollars(repl) # Let C# side handle validation using Unity's built-in compiler services sl, sc = line_col_from_index(m.start()) el, ec = line_col_from_index(m.end()) at_edits.append({ "startLine": sl, "startCol": sc, "endLine": el, "endCol": ec, "newText": repl_expanded }) # Do not mutate base buffer when building an atomic batch else: return _with_norm({"success": False, "code": "unsupported_op", "message": f"Unsupported text edit op for server-side apply_text_edits: {op}"}, normalized_for_echo, routing="text") if not at_edits: return _with_norm({"success": False, "code": "no_spans", "message": "No applicable text edit spans computed (anchor not found or zero-length)."}, normalized_for_echo, routing="text") sha = hashlib.sha256(base_text.encode("utf-8")).hexdigest() params: dict[str, Any] = { "action": "apply_text_edits", "name": name, "path": path, "namespace": namespace, "scriptType": script_type, "edits": at_edits, "precondition_sha256": sha, "options": { "refresh": (options or {}).get("refresh", "debounced"), "validate": (options or {}).get("validate", "standard"), "applyMode": ("atomic" if len(at_edits) > 1 else (options or {}).get("applyMode", "sequential")) } } resp = send_command_with_retry("manage_script", params) if isinstance(resp, dict) and resp.get("success"): pass # Optional sentinel reload removed (deprecated) return _with_norm( resp if isinstance(resp, dict) else { "success": False, "message": str(resp)}, normalized_for_echo, routing="text" ) except Exception as e: return _with_norm({"success": False, "code": "conversion_failed", "message": f"Edit conversion failed: {e}"}, normalized_for_echo, routing="text") # For regex_replace, honor preview consistently: if preview=true, always return diff without writing. # If confirm=false (default) and preview not requested, return diff and instruct confirm=true to apply. if "regex_replace" in text_ops and (preview or not (options or {}).get("confirm")): try: preview_text = _apply_edits_locally(contents, edits) import difflib diff = list(difflib.unified_diff(contents.splitlines( ), preview_text.splitlines(), fromfile="before", tofile="after", n=2)) if len(diff) > 800: diff = diff[:800] + ["... (diff truncated) ..."] if preview: return {"success": True, "message": "Preview only (no write)", "data": {"diff": "\n".join(diff), "normalizedEdits": normalized_for_echo}} return _with_norm({"success": False, "message": "Preview diff; set options.confirm=true to apply.", "data": {"diff": "\n".join(diff)}}, normalized_for_echo, routing="text") except Exception as e: return _with_norm({"success": False, "code": "preview_failed", "message": f"Preview failed: {e}"}, normalized_for_echo, routing="text") # 2) apply edits locally (only if not text-ops) try: new_contents = _apply_edits_locally(contents, edits) except Exception as e: return {"success": False, "message": f"Edit application failed: {e}"} # Short-circuit no-op edits to avoid false "applied" reports downstream if new_contents == contents: return _with_norm({ "success": True, "message": "No-op: contents unchanged", "data": {"no_op": True, "evidence": {"reason": "identical_content"}} }, normalized_for_echo, routing="text") if preview: # Produce a compact unified diff limited to small context import difflib a = contents.splitlines() b = new_contents.splitlines() diff = list(difflib.unified_diff( a, b, fromfile="before", tofile="after", n=3)) # Limit diff size to keep responses small if len(diff) > 2000: diff = diff[:2000] + ["... (diff truncated) ..."] return {"success": True, "message": "Preview only (no write)", "data": {"diff": "\n".join(diff), "normalizedEdits": normalized_for_echo}} # 3) update to Unity # Default refresh/validate for natural usage on text path as well options = dict(options or {}) options.setdefault("validate", "standard") options.setdefault("refresh", "debounced") # Compute the SHA of the current file contents for the precondition old_lines = contents.splitlines(keepends=True) end_line = len(old_lines) + 1 # 1-based exclusive end sha = hashlib.sha256(contents.encode("utf-8")).hexdigest() # Apply a whole-file text edit rather than the deprecated 'update' action params = { "action": "apply_text_edits", "name": name, "path": path, "namespace": namespace, "scriptType": script_type, "edits": [ { "startLine": 1, "startCol": 1, "endLine": end_line, "endCol": 1, "newText": new_contents, } ], "precondition_sha256": sha, "options": options or {"validate": "standard", "refresh": "debounced"}, } write_resp = send_command_with_retry("manage_script", params) if isinstance(write_resp, dict) and write_resp.get("success"): pass # Optional sentinel reload removed (deprecated) return _with_norm( write_resp if isinstance(write_resp, dict) else {"success": False, "message": str(write_resp)}, normalized_for_echo, routing="text", ) ``` -------------------------------------------------------------------------------- /UnityMcpBridge/UnityMcpServer~/src/tools/script_apply_edits.py: -------------------------------------------------------------------------------- ```python import base64 import hashlib import re from typing import Annotated, Any from mcp.server.fastmcp import Context from registry import mcp_for_unity_tool from unity_connection import send_command_with_retry def _apply_edits_locally(original_text: str, edits: list[dict[str, Any]]) -> str: text = original_text for edit in edits or []: op = ( (edit.get("op") or edit.get("operation") or edit.get("type") or edit.get("mode") or "") .strip() .lower() ) if not op: allowed = "anchor_insert, prepend, append, replace_range, regex_replace" raise RuntimeError( f"op is required; allowed: {allowed}. Use 'op' (aliases accepted: type/mode/operation)." ) if op == "prepend": prepend_text = edit.get("text", "") text = (prepend_text if prepend_text.endswith( "\n") else prepend_text + "\n") + text elif op == "append": append_text = edit.get("text", "") if not text.endswith("\n"): text += "\n" text += append_text if not text.endswith("\n"): text += "\n" elif op == "anchor_insert": anchor = edit.get("anchor", "") position = (edit.get("position") or "before").lower() insert_text = edit.get("text", "") flags = re.MULTILINE | ( re.IGNORECASE if edit.get("ignore_case") else 0) # Find the best match using improved heuristics match = _find_best_anchor_match( anchor, text, flags, bool(edit.get("prefer_last", True))) if not match: if edit.get("allow_noop", True): continue raise RuntimeError(f"anchor not found: {anchor}") idx = match.start() if position == "before" else match.end() text = text[:idx] + insert_text + text[idx:] elif op == "replace_range": start_line = int(edit.get("startLine", 1)) start_col = int(edit.get("startCol", 1)) end_line = int(edit.get("endLine", start_line)) end_col = int(edit.get("endCol", 1)) replacement = edit.get("text", "") lines = text.splitlines(keepends=True) max_line = len(lines) + 1 # 1-based, exclusive end if (start_line < 1 or end_line < start_line or end_line > max_line or start_col < 1 or end_col < 1): raise RuntimeError("replace_range out of bounds") def index_of(line: int, col: int) -> int: if line <= len(lines): return sum(len(l) for l in lines[: line - 1]) + (col - 1) return sum(len(l) for l in lines) a = index_of(start_line, start_col) b = index_of(end_line, end_col) text = text[:a] + replacement + text[b:] elif op == "regex_replace": pattern = edit.get("pattern", "") repl = edit.get("replacement", "") # Translate $n backrefs (our input) to Python \g<n> repl_py = re.sub(r"\$(\d+)", r"\\g<\1>", repl) count = int(edit.get("count", 0)) # 0 = replace all flags = re.MULTILINE if edit.get("ignore_case"): flags |= re.IGNORECASE text = re.sub(pattern, repl_py, text, count=count, flags=flags) else: allowed = "anchor_insert, prepend, append, replace_range, regex_replace" raise RuntimeError( f"unknown edit op: {op}; allowed: {allowed}. Use 'op' (aliases accepted: type/mode/operation).") return text def _find_best_anchor_match(pattern: str, text: str, flags: int, prefer_last: bool = True): """ Find the best anchor match using improved heuristics. For patterns like \\s*}\\s*$ that are meant to find class-ending braces, this function uses heuristics to choose the most semantically appropriate match: 1. If prefer_last=True, prefer the last match (common for class-end insertions) 2. Use indentation levels to distinguish class vs method braces 3. Consider context to avoid matches inside strings/comments Args: pattern: Regex pattern to search for text: Text to search in flags: Regex flags prefer_last: If True, prefer the last match over the first Returns: Match object of the best match, or None if no match found """ # Find all matches matches = list(re.finditer(pattern, text, flags)) if not matches: return None # If only one match, return it if len(matches) == 1: return matches[0] # For patterns that look like they're trying to match closing braces at end of lines is_closing_brace_pattern = '}' in pattern and ( '$' in pattern or pattern.endswith(r'\s*')) if is_closing_brace_pattern and prefer_last: # Use heuristics to find the best closing brace match return _find_best_closing_brace_match(matches, text) # Default behavior: use last match if prefer_last, otherwise first match return matches[-1] if prefer_last else matches[0] def _find_best_closing_brace_match(matches, text: str): """ Find the best closing brace match using C# structure heuristics. Enhanced heuristics for scope-aware matching: 1. Prefer matches with lower indentation (likely class-level) 2. Prefer matches closer to end of file 3. Avoid matches that seem to be inside method bodies 4. For #endregion patterns, ensure class-level context 5. Validate insertion point is at appropriate scope Args: matches: List of regex match objects text: The full text being searched Returns: The best match object """ if not matches: return None scored_matches = [] lines = text.splitlines() for match in matches: score = 0 start_pos = match.start() # Find which line this match is on lines_before = text[:start_pos].count('\n') line_num = lines_before if line_num < len(lines): line_content = lines[line_num] # Calculate indentation level (lower is better for class braces) indentation = len(line_content) - len(line_content.lstrip()) # Prefer lower indentation (class braces are typically less indented than method braces) # Max 20 points for indentation=0 score += max(0, 20 - indentation) # Prefer matches closer to end of file (class closing braces are typically at the end) distance_from_end = len(lines) - line_num # More points for being closer to end score += max(0, 10 - distance_from_end) # Look at surrounding context to avoid method braces context_start = max(0, line_num - 3) context_end = min(len(lines), line_num + 2) context_lines = lines[context_start:context_end] # Penalize if this looks like it's inside a method (has method-like patterns above) for context_line in context_lines: if re.search(r'\b(void|public|private|protected)\s+\w+\s*\(', context_line): score -= 5 # Penalty for being near method signatures # Bonus if this looks like a class-ending brace (very minimal indentation and near EOF) if indentation <= 4 and distance_from_end <= 3: score += 15 # Bonus for likely class-ending brace scored_matches.append((score, match)) # Return the match with the highest score scored_matches.sort(key=lambda x: x[0], reverse=True) best_match = scored_matches[0][1] return best_match def _infer_class_name(script_name: str) -> str: # Default to script name as class name (common Unity pattern) return (script_name or "").strip() def _extract_code_after(keyword: str, request: str) -> str: # Deprecated with NL removal; retained as no-op for compatibility idx = request.lower().find(keyword) if idx >= 0: return request[idx + len(keyword):].strip() return "" # Removed _is_structurally_balanced - validation now handled by C# side using Unity's compiler services def _normalize_script_locator(name: str, path: str) -> tuple[str, str]: """Best-effort normalization of script "name" and "path". Accepts any of: - name = "SmartReach", path = "Assets/Scripts/Interaction" - name = "SmartReach.cs", path = "Assets/Scripts/Interaction" - name = "Assets/Scripts/Interaction/SmartReach.cs", path = "" - path = "Assets/Scripts/Interaction/SmartReach.cs" (name empty) - name or path using uri prefixes: unity://path/..., file://... - accidental duplicates like "Assets/.../SmartReach.cs/SmartReach.cs" Returns (name_without_extension, directory_path_under_Assets). """ n = (name or "").strip() p = (path or "").strip() def strip_prefix(s: str) -> str: if s.startswith("unity://path/"): return s[len("unity://path/"):] if s.startswith("file://"): return s[len("file://"):] return s def collapse_duplicate_tail(s: str) -> str: # Collapse trailing "/X.cs/X.cs" to "/X.cs" parts = s.split("/") if len(parts) >= 2 and parts[-1] == parts[-2]: parts = parts[:-1] return "/".join(parts) # Prefer a full path if provided in either field candidate = "" for v in (n, p): v2 = strip_prefix(v) if v2.endswith(".cs") or v2.startswith("Assets/"): candidate = v2 break if candidate: candidate = collapse_duplicate_tail(candidate) # If a directory was passed in path and file in name, join them if not candidate.endswith(".cs") and n.endswith(".cs"): v2 = strip_prefix(n) candidate = (candidate.rstrip("/") + "/" + v2.split("/")[-1]) if candidate.endswith(".cs"): parts = candidate.split("/") file_name = parts[-1] dir_path = "/".join(parts[:-1]) if len(parts) > 1 else "Assets" base = file_name[:- 3] if file_name.lower().endswith(".cs") else file_name return base, dir_path # Fall back: remove extension from name if present and return given path base_name = n[:-3] if n.lower().endswith(".cs") else n return base_name, (p or "Assets") def _with_norm(resp: dict[str, Any] | Any, edits: list[dict[str, Any]], routing: str | None = None) -> dict[str, Any] | Any: if not isinstance(resp, dict): return resp data = resp.setdefault("data", {}) data.setdefault("normalizedEdits", edits) if routing: data["routing"] = routing return resp def _err(code: str, message: str, *, expected: dict[str, Any] | None = None, rewrite: dict[str, Any] | None = None, normalized: list[dict[str, Any]] | None = None, routing: str | None = None, extra: dict[str, Any] | None = None) -> dict[str, Any]: payload: dict[str, Any] = {"success": False, "code": code, "message": message} data: dict[str, Any] = {} if expected: data["expected"] = expected if rewrite: data["rewrite_suggestion"] = rewrite if normalized is not None: data["normalizedEdits"] = normalized if routing: data["routing"] = routing if extra: data.update(extra) if data: payload["data"] = data return payload # Natural-language parsing removed; clients should send structured edits. @mcp_for_unity_tool(name="script_apply_edits", description=( """Structured C# edits (methods/classes) with safer boundaries - prefer this over raw text. Best practices: - Prefer anchor_* ops for pattern-based insert/replace near stable markers - Use replace_method/delete_method for whole-method changes (keeps signatures balanced) - Avoid whole-file regex deletes; validators will guard unbalanced braces - For tail insertions, prefer anchor/regex_replace on final brace (class closing) - Pass options.validate='standard' for structural checks; 'relaxed' for interior-only edits Canonical fields (use these exact keys): - op: replace_method | insert_method | delete_method | anchor_insert | anchor_delete | anchor_replace - className: string (defaults to 'name' if omitted on method/class ops) - methodName: string (required for replace_method, delete_method) - replacement: string (required for replace_method, insert_method) - position: start | end | after | before (insert_method only) - afterMethodName / beforeMethodName: string (required when position='after'/'before') - anchor: regex string (for anchor_* ops) - text: string (for anchor_insert/anchor_replace) Examples: 1) Replace a method: { "name": "SmartReach", "path": "Assets/Scripts/Interaction", "edits": [ { "op": "replace_method", "className": "SmartReach", "methodName": "HasTarget", "replacement": "public bool HasTarget(){ return currentTarget!=null; }" } ], "options": {"validate": "standard", "refresh": "immediate"} } "2) Insert a method after another: { "name": "SmartReach", "path": "Assets/Scripts/Interaction", "edits": [ { "op": "insert_method", "className": "SmartReach", "replacement": "public void PrintSeries(){ Debug.Log(seriesName); }", "position": "after", "afterMethodName": "GetCurrentTarget" } ], } ]""" )) def script_apply_edits( ctx: Context, name: Annotated[str, "Name of the script to edit"], path: Annotated[str, "Path to the script to edit under Assets/ directory"], edits: Annotated[list[dict[str, Any]], "List of edits to apply to the script"], options: Annotated[dict[str, Any], "Options for the script edit"] | None = None, script_type: Annotated[str, "Type of the script to edit"] = "MonoBehaviour", namespace: Annotated[str, "Namespace of the script to edit"] | None = None, ) -> dict[str, Any]: ctx.info(f"Processing script_apply_edits: {name}") # Normalize locator first so downstream calls target the correct script file. name, path = _normalize_script_locator(name, path) # Normalize unsupported or aliased ops to known structured/text paths def _unwrap_and_alias(edit: dict[str, Any]) -> dict[str, Any]: # Unwrap single-key wrappers like {"replace_method": {...}} for wrapper_key in ( "replace_method", "insert_method", "delete_method", "replace_class", "delete_class", "anchor_insert", "anchor_replace", "anchor_delete", ): if wrapper_key in edit and isinstance(edit[wrapper_key], dict): inner = dict(edit[wrapper_key]) inner["op"] = wrapper_key edit = inner break e = dict(edit) op = (e.get("op") or e.get("operation") or e.get( "type") or e.get("mode") or "").strip().lower() if op: e["op"] = op # Common field aliases if "class_name" in e and "className" not in e: e["className"] = e.pop("class_name") if "class" in e and "className" not in e: e["className"] = e.pop("class") if "method_name" in e and "methodName" not in e: e["methodName"] = e.pop("method_name") # Some clients use a generic 'target' for method name if "target" in e and "methodName" not in e: e["methodName"] = e.pop("target") if "method" in e and "methodName" not in e: e["methodName"] = e.pop("method") if "new_content" in e and "replacement" not in e: e["replacement"] = e.pop("new_content") if "newMethod" in e and "replacement" not in e: e["replacement"] = e.pop("newMethod") if "new_method" in e and "replacement" not in e: e["replacement"] = e.pop("new_method") if "content" in e and "replacement" not in e: e["replacement"] = e.pop("content") if "after" in e and "afterMethodName" not in e: e["afterMethodName"] = e.pop("after") if "after_method" in e and "afterMethodName" not in e: e["afterMethodName"] = e.pop("after_method") if "before" in e and "beforeMethodName" not in e: e["beforeMethodName"] = e.pop("before") if "before_method" in e and "beforeMethodName" not in e: e["beforeMethodName"] = e.pop("before_method") # anchor_method → before/after based on position (default after) if "anchor_method" in e: anchor = e.pop("anchor_method") pos = (e.get("position") or "after").strip().lower() if pos == "before" and "beforeMethodName" not in e: e["beforeMethodName"] = anchor elif "afterMethodName" not in e: e["afterMethodName"] = anchor if "anchorText" in e and "anchor" not in e: e["anchor"] = e.pop("anchorText") if "pattern" in e and "anchor" not in e and e.get("op") and e["op"].startswith("anchor_"): e["anchor"] = e.pop("pattern") if "newText" in e and "text" not in e: e["text"] = e.pop("newText") # CI compatibility (T‑A/T‑E): # Accept method-anchored anchor_insert and upgrade to insert_method # Example incoming shape: # {"op":"anchor_insert","afterMethodName":"GetCurrentTarget","text":"..."} if ( e.get("op") == "anchor_insert" and not e.get("anchor") and (e.get("afterMethodName") or e.get("beforeMethodName")) ): e["op"] = "insert_method" if "replacement" not in e: e["replacement"] = e.get("text", "") # LSP-like range edit -> replace_range if "range" in e and isinstance(e["range"], dict): rng = e.pop("range") start = rng.get("start", {}) end = rng.get("end", {}) # Convert 0-based to 1-based line/col e["op"] = "replace_range" e["startLine"] = int(start.get("line", 0)) + 1 e["startCol"] = int(start.get("character", 0)) + 1 e["endLine"] = int(end.get("line", 0)) + 1 e["endCol"] = int(end.get("character", 0)) + 1 if "newText" in edit and "text" not in e: e["text"] = edit.get("newText", "") return e normalized_edits: list[dict[str, Any]] = [] for raw in edits or []: e = _unwrap_and_alias(raw) op = (e.get("op") or e.get("operation") or e.get( "type") or e.get("mode") or "").strip().lower() # Default className to script name if missing on structured method/class ops if op in ("replace_class", "delete_class", "replace_method", "delete_method", "insert_method") and not e.get("className"): e["className"] = name # Map common aliases for text ops if op in ("text_replace",): e["op"] = "replace_range" normalized_edits.append(e) continue if op in ("regex_delete",): e["op"] = "regex_replace" e.setdefault("text", "") normalized_edits.append(e) continue if op == "regex_replace" and ("replacement" not in e): if "text" in e: e["replacement"] = e.get("text", "") elif "insert" in e or "content" in e: e["replacement"] = e.get( "insert") or e.get("content") or "" if op == "anchor_insert" and not (e.get("text") or e.get("insert") or e.get("content") or e.get("replacement")): e["op"] = "anchor_delete" normalized_edits.append(e) continue normalized_edits.append(e) edits = normalized_edits normalized_for_echo = edits # Validate required fields and produce machine-parsable hints def error_with_hint(message: str, expected: dict[str, Any], suggestion: dict[str, Any]) -> dict[str, Any]: return _err("missing_field", message, expected=expected, rewrite=suggestion, normalized=normalized_for_echo) for e in edits or []: op = e.get("op", "") if op == "replace_method": if not e.get("methodName"): return error_with_hint( "replace_method requires 'methodName'.", {"op": "replace_method", "required": [ "className", "methodName", "replacement"]}, {"edits[0].methodName": "HasTarget"} ) if not (e.get("replacement") or e.get("text")): return error_with_hint( "replace_method requires 'replacement' (inline or base64).", {"op": "replace_method", "required": [ "className", "methodName", "replacement"]}, {"edits[0].replacement": "public bool X(){ return true; }"} ) elif op == "insert_method": if not (e.get("replacement") or e.get("text")): return error_with_hint( "insert_method requires a non-empty 'replacement'.", {"op": "insert_method", "required": ["className", "replacement"], "position": { "after_requires": "afterMethodName", "before_requires": "beforeMethodName"}}, {"edits[0].replacement": "public void PrintSeries(){ Debug.Log(\"1,2,3\"); }"} ) pos = (e.get("position") or "").lower() if pos == "after" and not e.get("afterMethodName"): return error_with_hint( "insert_method with position='after' requires 'afterMethodName'.", {"op": "insert_method", "position": { "after_requires": "afterMethodName"}}, {"edits[0].afterMethodName": "GetCurrentTarget"} ) if pos == "before" and not e.get("beforeMethodName"): return error_with_hint( "insert_method with position='before' requires 'beforeMethodName'.", {"op": "insert_method", "position": { "before_requires": "beforeMethodName"}}, {"edits[0].beforeMethodName": "GetCurrentTarget"} ) elif op == "delete_method": if not e.get("methodName"): return error_with_hint( "delete_method requires 'methodName'.", {"op": "delete_method", "required": [ "className", "methodName"]}, {"edits[0].methodName": "PrintSeries"} ) elif op in ("anchor_insert", "anchor_replace", "anchor_delete"): if not e.get("anchor"): return error_with_hint( f"{op} requires 'anchor' (regex).", {"op": op, "required": ["anchor"]}, {"edits[0].anchor": "(?m)^\\s*public\\s+bool\\s+HasTarget\\s*\\("} ) if op in ("anchor_insert", "anchor_replace") and not (e.get("text") or e.get("replacement")): return error_with_hint( f"{op} requires 'text'.", {"op": op, "required": ["anchor", "text"]}, {"edits[0].text": "/* comment */\n"} ) # Decide routing: structured vs text vs mixed STRUCT = {"replace_class", "delete_class", "replace_method", "delete_method", "insert_method", "anchor_delete", "anchor_replace", "anchor_insert"} TEXT = {"prepend", "append", "replace_range", "regex_replace"} ops_set = {(e.get("op") or "").lower() for e in edits or []} all_struct = ops_set.issubset(STRUCT) all_text = ops_set.issubset(TEXT) mixed = not (all_struct or all_text) # If everything is structured (method/class/anchor ops), forward directly to Unity's structured editor. if all_struct: opts2 = dict(options or {}) # For structured edits, prefer immediate refresh to avoid missed reloads when Editor is unfocused opts2.setdefault("refresh", "immediate") params_struct: dict[str, Any] = { "action": "edit", "name": name, "path": path, "namespace": namespace, "scriptType": script_type, "edits": edits, "options": opts2, } resp_struct = send_command_with_retry( "manage_script", params_struct) if isinstance(resp_struct, dict) and resp_struct.get("success"): pass # Optional sentinel reload removed (deprecated) return _with_norm(resp_struct if isinstance(resp_struct, dict) else {"success": False, "message": str(resp_struct)}, normalized_for_echo, routing="structured") # 1) read from Unity read_resp = send_command_with_retry("manage_script", { "action": "read", "name": name, "path": path, "namespace": namespace, "scriptType": script_type, }) if not isinstance(read_resp, dict) or not read_resp.get("success"): return read_resp if isinstance(read_resp, dict) else {"success": False, "message": str(read_resp)} data = read_resp.get("data") or read_resp.get( "result", {}).get("data") or {} contents = data.get("contents") if contents is None and data.get("contentsEncoded") and data.get("encodedContents"): contents = base64.b64decode( data["encodedContents"]).decode("utf-8") if contents is None: return {"success": False, "message": "No contents returned from Unity read."} # Optional preview/dry-run: apply locally and return diff without writing preview = bool((options or {}).get("preview")) # If we have a mixed batch (TEXT + STRUCT), apply text first with precondition, then structured if mixed: text_edits = [e for e in edits or [] if ( e.get("op") or "").lower() in TEXT] struct_edits = [e for e in edits or [] if ( e.get("op") or "").lower() in STRUCT] try: base_text = contents def line_col_from_index(idx: int) -> tuple[int, int]: line = base_text.count("\n", 0, idx) + 1 last_nl = base_text.rfind("\n", 0, idx) col = (idx - (last_nl + 1)) + \ 1 if last_nl >= 0 else idx + 1 return line, col at_edits: list[dict[str, Any]] = [] for e in text_edits: opx = (e.get("op") or e.get("operation") or e.get( "type") or e.get("mode") or "").strip().lower() text_field = e.get("text") or e.get("insert") or e.get( "content") or e.get("replacement") or "" if opx == "anchor_insert": anchor = e.get("anchor") or "" position = (e.get("position") or "after").lower() flags = re.MULTILINE | ( re.IGNORECASE if e.get("ignore_case") else 0) try: # Use improved anchor matching logic m = _find_best_anchor_match( anchor, base_text, flags, prefer_last=True) except Exception as ex: return _with_norm(_err("bad_regex", f"Invalid anchor regex: {ex}", normalized=normalized_for_echo, routing="mixed/text-first", extra={"hint": "Escape parentheses/braces or use a simpler anchor."}), normalized_for_echo, routing="mixed/text-first") if not m: return _with_norm({"success": False, "code": "anchor_not_found", "message": f"anchor not found: {anchor}"}, normalized_for_echo, routing="mixed/text-first") idx = m.start() if position == "before" else m.end() # Normalize insertion to avoid jammed methods text_field_norm = text_field if not text_field_norm.startswith("\n"): text_field_norm = "\n" + text_field_norm if not text_field_norm.endswith("\n"): text_field_norm = text_field_norm + "\n" sl, sc = line_col_from_index(idx) at_edits.append( {"startLine": sl, "startCol": sc, "endLine": sl, "endCol": sc, "newText": text_field_norm}) # do not mutate base_text when building atomic spans elif opx == "replace_range": if all(k in e for k in ("startLine", "startCol", "endLine", "endCol")): at_edits.append({ "startLine": int(e.get("startLine", 1)), "startCol": int(e.get("startCol", 1)), "endLine": int(e.get("endLine", 1)), "endCol": int(e.get("endCol", 1)), "newText": text_field }) else: return _with_norm(_err("missing_field", "replace_range requires startLine/startCol/endLine/endCol", normalized=normalized_for_echo, routing="mixed/text-first"), normalized_for_echo, routing="mixed/text-first") elif opx == "regex_replace": pattern = e.get("pattern") or "" try: regex_obj = re.compile(pattern, re.MULTILINE | ( re.IGNORECASE if e.get("ignore_case") else 0)) except Exception as ex: return _with_norm(_err("bad_regex", f"Invalid regex pattern: {ex}", normalized=normalized_for_echo, routing="mixed/text-first", extra={"hint": "Escape special chars or prefer structured delete for methods."}), normalized_for_echo, routing="mixed/text-first") m = regex_obj.search(base_text) if not m: continue # Expand $1, $2... in replacement using this match def _expand_dollars(rep: str, _m=m) -> str: return re.sub(r"\$(\d+)", lambda g: _m.group(int(g.group(1))) or "", rep) repl = _expand_dollars(text_field) sl, sc = line_col_from_index(m.start()) el, ec = line_col_from_index(m.end()) at_edits.append( {"startLine": sl, "startCol": sc, "endLine": el, "endCol": ec, "newText": repl}) # do not mutate base_text when building atomic spans elif opx in ("prepend", "append"): if opx == "prepend": sl, sc = 1, 1 at_edits.append( {"startLine": sl, "startCol": sc, "endLine": sl, "endCol": sc, "newText": text_field}) # prepend can be applied atomically without local mutation else: # Insert at true EOF position (handles both \n and \r\n correctly) eof_idx = len(base_text) sl, sc = line_col_from_index(eof_idx) new_text = ("\n" if not base_text.endswith( "\n") else "") + text_field at_edits.append( {"startLine": sl, "startCol": sc, "endLine": sl, "endCol": sc, "newText": new_text}) # do not mutate base_text when building atomic spans else: return _with_norm(_err("unknown_op", f"Unsupported text edit op: {opx}", normalized=normalized_for_echo, routing="mixed/text-first"), normalized_for_echo, routing="mixed/text-first") sha = hashlib.sha256(base_text.encode("utf-8")).hexdigest() if at_edits: params_text: dict[str, Any] = { "action": "apply_text_edits", "name": name, "path": path, "namespace": namespace, "scriptType": script_type, "edits": at_edits, "precondition_sha256": sha, "options": {"refresh": (options or {}).get("refresh", "debounced"), "validate": (options or {}).get("validate", "standard"), "applyMode": ("atomic" if len(at_edits) > 1 else (options or {}).get("applyMode", "sequential"))} } resp_text = send_command_with_retry( "manage_script", params_text) if not (isinstance(resp_text, dict) and resp_text.get("success")): return _with_norm(resp_text if isinstance(resp_text, dict) else {"success": False, "message": str(resp_text)}, normalized_for_echo, routing="mixed/text-first") # Optional sentinel reload removed (deprecated) except Exception as e: return _with_norm({"success": False, "message": f"Text edit conversion failed: {e}"}, normalized_for_echo, routing="mixed/text-first") if struct_edits: opts2 = dict(options or {}) # Prefer debounced background refresh unless explicitly overridden opts2.setdefault("refresh", "debounced") params_struct: dict[str, Any] = { "action": "edit", "name": name, "path": path, "namespace": namespace, "scriptType": script_type, "edits": struct_edits, "options": opts2 } resp_struct = send_command_with_retry( "manage_script", params_struct) if isinstance(resp_struct, dict) and resp_struct.get("success"): pass # Optional sentinel reload removed (deprecated) return _with_norm(resp_struct if isinstance(resp_struct, dict) else {"success": False, "message": str(resp_struct)}, normalized_for_echo, routing="mixed/text-first") return _with_norm({"success": True, "message": "Applied text edits (no structured ops)"}, normalized_for_echo, routing="mixed/text-first") # If the edits are text-ops, prefer sending them to Unity's apply_text_edits with precondition # so header guards and validation run on the C# side. # Supported conversions: anchor_insert, replace_range, regex_replace (first match only). text_ops = {(e.get("op") or e.get("operation") or e.get("type") or e.get( "mode") or "").strip().lower() for e in (edits or [])} structured_kinds = {"replace_class", "delete_class", "replace_method", "delete_method", "insert_method", "anchor_insert"} if not text_ops.issubset(structured_kinds): # Convert to apply_text_edits payload try: base_text = contents def line_col_from_index(idx: int) -> tuple[int, int]: # 1-based line/col against base buffer line = base_text.count("\n", 0, idx) + 1 last_nl = base_text.rfind("\n", 0, idx) col = (idx - (last_nl + 1)) + \ 1 if last_nl >= 0 else idx + 1 return line, col at_edits: list[dict[str, Any]] = [] import re as _re for e in edits or []: op = (e.get("op") or e.get("operation") or e.get( "type") or e.get("mode") or "").strip().lower() # aliasing for text field text_field = e.get("text") or e.get( "insert") or e.get("content") or "" if op == "anchor_insert": anchor = e.get("anchor") or "" position = (e.get("position") or "after").lower() # Use improved anchor matching logic with helpful errors, honoring ignore_case try: flags = re.MULTILINE | ( re.IGNORECASE if e.get("ignore_case") else 0) m = _find_best_anchor_match( anchor, base_text, flags, prefer_last=True) except Exception as ex: return _with_norm(_err("bad_regex", f"Invalid anchor regex: {ex}", normalized=normalized_for_echo, routing="text", extra={"hint": "Escape parentheses/braces or use a simpler anchor."}), normalized_for_echo, routing="text") if not m: return _with_norm({"success": False, "code": "anchor_not_found", "message": f"anchor not found: {anchor}"}, normalized_for_echo, routing="text") idx = m.start() if position == "before" else m.end() # Normalize insertion newlines if text_field and not text_field.startswith("\n"): text_field = "\n" + text_field if text_field and not text_field.endswith("\n"): text_field = text_field + "\n" sl, sc = line_col_from_index(idx) at_edits.append({ "startLine": sl, "startCol": sc, "endLine": sl, "endCol": sc, "newText": text_field or "" }) # Do not mutate base buffer when building an atomic batch elif op == "replace_range": # Directly forward if already in line/col form if "startLine" in e: at_edits.append({ "startLine": int(e.get("startLine", 1)), "startCol": int(e.get("startCol", 1)), "endLine": int(e.get("endLine", 1)), "endCol": int(e.get("endCol", 1)), "newText": text_field }) else: # If only indices provided, skip (we don't support index-based here) return _with_norm({"success": False, "code": "missing_field", "message": "replace_range requires startLine/startCol/endLine/endCol"}, normalized_for_echo, routing="text") elif op == "regex_replace": pattern = e.get("pattern") or "" repl = text_field flags = re.MULTILINE | ( re.IGNORECASE if e.get("ignore_case") else 0) # Early compile for clearer error messages try: regex_obj = re.compile(pattern, flags) except Exception as ex: return _with_norm(_err("bad_regex", f"Invalid regex pattern: {ex}", normalized=normalized_for_echo, routing="text", extra={"hint": "Escape special chars or prefer structured delete for methods."}), normalized_for_echo, routing="text") # Use smart anchor matching for consistent behavior with anchor_insert m = _find_best_anchor_match( pattern, base_text, flags, prefer_last=True) if not m: continue # Expand $1, $2... backrefs in replacement using the first match (consistent with mixed-path behavior) def _expand_dollars(rep: str, _m=m) -> str: return re.sub(r"\$(\d+)", lambda g: _m.group(int(g.group(1))) or "", rep) repl_expanded = _expand_dollars(repl) # Let C# side handle validation using Unity's built-in compiler services sl, sc = line_col_from_index(m.start()) el, ec = line_col_from_index(m.end()) at_edits.append({ "startLine": sl, "startCol": sc, "endLine": el, "endCol": ec, "newText": repl_expanded }) # Do not mutate base buffer when building an atomic batch else: return _with_norm({"success": False, "code": "unsupported_op", "message": f"Unsupported text edit op for server-side apply_text_edits: {op}"}, normalized_for_echo, routing="text") if not at_edits: return _with_norm({"success": False, "code": "no_spans", "message": "No applicable text edit spans computed (anchor not found or zero-length)."}, normalized_for_echo, routing="text") sha = hashlib.sha256(base_text.encode("utf-8")).hexdigest() params: dict[str, Any] = { "action": "apply_text_edits", "name": name, "path": path, "namespace": namespace, "scriptType": script_type, "edits": at_edits, "precondition_sha256": sha, "options": { "refresh": (options or {}).get("refresh", "debounced"), "validate": (options or {}).get("validate", "standard"), "applyMode": ("atomic" if len(at_edits) > 1 else (options or {}).get("applyMode", "sequential")) } } resp = send_command_with_retry("manage_script", params) if isinstance(resp, dict) and resp.get("success"): pass # Optional sentinel reload removed (deprecated) return _with_norm( resp if isinstance(resp, dict) else { "success": False, "message": str(resp)}, normalized_for_echo, routing="text" ) except Exception as e: return _with_norm({"success": False, "code": "conversion_failed", "message": f"Edit conversion failed: {e}"}, normalized_for_echo, routing="text") # For regex_replace, honor preview consistently: if preview=true, always return diff without writing. # If confirm=false (default) and preview not requested, return diff and instruct confirm=true to apply. if "regex_replace" in text_ops and (preview or not (options or {}).get("confirm")): try: preview_text = _apply_edits_locally(contents, edits) import difflib diff = list(difflib.unified_diff(contents.splitlines( ), preview_text.splitlines(), fromfile="before", tofile="after", n=2)) if len(diff) > 800: diff = diff[:800] + ["... (diff truncated) ..."] if preview: return {"success": True, "message": "Preview only (no write)", "data": {"diff": "\n".join(diff), "normalizedEdits": normalized_for_echo}} return _with_norm({"success": False, "message": "Preview diff; set options.confirm=true to apply.", "data": {"diff": "\n".join(diff)}}, normalized_for_echo, routing="text") except Exception as e: return _with_norm({"success": False, "code": "preview_failed", "message": f"Preview failed: {e}"}, normalized_for_echo, routing="text") # 2) apply edits locally (only if not text-ops) try: new_contents = _apply_edits_locally(contents, edits) except Exception as e: return {"success": False, "message": f"Edit application failed: {e}"} # Short-circuit no-op edits to avoid false "applied" reports downstream if new_contents == contents: return _with_norm({ "success": True, "message": "No-op: contents unchanged", "data": {"no_op": True, "evidence": {"reason": "identical_content"}} }, normalized_for_echo, routing="text") if preview: # Produce a compact unified diff limited to small context import difflib a = contents.splitlines() b = new_contents.splitlines() diff = list(difflib.unified_diff( a, b, fromfile="before", tofile="after", n=3)) # Limit diff size to keep responses small if len(diff) > 2000: diff = diff[:2000] + ["... (diff truncated) ..."] return {"success": True, "message": "Preview only (no write)", "data": {"diff": "\n".join(diff), "normalizedEdits": normalized_for_echo}} # 3) update to Unity # Default refresh/validate for natural usage on text path as well options = dict(options or {}) options.setdefault("validate", "standard") options.setdefault("refresh", "debounced") # Compute the SHA of the current file contents for the precondition old_lines = contents.splitlines(keepends=True) end_line = len(old_lines) + 1 # 1-based exclusive end sha = hashlib.sha256(contents.encode("utf-8")).hexdigest() # Apply a whole-file text edit rather than the deprecated 'update' action params = { "action": "apply_text_edits", "name": name, "path": path, "namespace": namespace, "scriptType": script_type, "edits": [ { "startLine": 1, "startCol": 1, "endLine": end_line, "endCol": 1, "newText": new_contents, } ], "precondition_sha256": sha, "options": options or {"validate": "standard", "refresh": "debounced"}, } write_resp = send_command_with_retry("manage_script", params) if isinstance(write_resp, dict) and write_resp.get("success"): pass # Optional sentinel reload removed (deprecated) return _with_norm( write_resp if isinstance(write_resp, dict) else {"success": False, "message": str(write_resp)}, normalized_for_echo, routing="text", ) ```