#
tokens: 47173/50000 12/263 files (page 5/13)
lines: off (toggle) GitHub
raw markdown copy
This is page 5 of 13. Use http://codebase.md/justinpbarnett/unity-mcp?page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── prompts
│   │   ├── nl-unity-suite-nl.md
│   │   └── nl-unity-suite-t.md
│   └── settings.json
├── .github
│   ├── scripts
│   │   └── mark_skipped.py
│   └── workflows
│       ├── bump-version.yml
│       ├── claude-nl-suite.yml
│       ├── github-repo-stats.yml
│       └── unity-tests.yml
├── .gitignore
├── deploy-dev.bat
├── docs
│   ├── CURSOR_HELP.md
│   ├── CUSTOM_TOOLS.md
│   ├── README-DEV-zh.md
│   ├── README-DEV.md
│   ├── screenshots
│   │   ├── v5_01_uninstall.png
│   │   ├── v5_02_install.png
│   │   ├── v5_03_open_mcp_window.png
│   │   ├── v5_04_rebuild_mcp_server.png
│   │   ├── v5_05_rebuild_success.png
│   │   ├── v6_2_create_python_tools_asset.png
│   │   ├── v6_2_python_tools_asset.png
│   │   ├── v6_new_ui_asset_store_version.png
│   │   ├── v6_new_ui_dark.png
│   │   └── v6_new_ui_light.png
│   ├── TELEMETRY.md
│   ├── v5_MIGRATION.md
│   └── v6_NEW_UI_CHANGES.md
├── LICENSE
├── logo.png
├── mcp_source.py
├── MCPForUnity
│   ├── Editor
│   │   ├── AssemblyInfo.cs
│   │   ├── AssemblyInfo.cs.meta
│   │   ├── Data
│   │   │   ├── DefaultServerConfig.cs
│   │   │   ├── DefaultServerConfig.cs.meta
│   │   │   ├── McpClients.cs
│   │   │   ├── McpClients.cs.meta
│   │   │   ├── PythonToolsAsset.cs
│   │   │   └── PythonToolsAsset.cs.meta
│   │   ├── Data.meta
│   │   ├── Dependencies
│   │   │   ├── DependencyManager.cs
│   │   │   ├── DependencyManager.cs.meta
│   │   │   ├── Models
│   │   │   │   ├── DependencyCheckResult.cs
│   │   │   │   ├── DependencyCheckResult.cs.meta
│   │   │   │   ├── DependencyStatus.cs
│   │   │   │   └── DependencyStatus.cs.meta
│   │   │   ├── Models.meta
│   │   │   ├── PlatformDetectors
│   │   │   │   ├── IPlatformDetector.cs
│   │   │   │   ├── IPlatformDetector.cs.meta
│   │   │   │   ├── LinuxPlatformDetector.cs
│   │   │   │   ├── LinuxPlatformDetector.cs.meta
│   │   │   │   ├── MacOSPlatformDetector.cs
│   │   │   │   ├── MacOSPlatformDetector.cs.meta
│   │   │   │   ├── PlatformDetectorBase.cs
│   │   │   │   ├── PlatformDetectorBase.cs.meta
│   │   │   │   ├── WindowsPlatformDetector.cs
│   │   │   │   └── WindowsPlatformDetector.cs.meta
│   │   │   └── PlatformDetectors.meta
│   │   ├── Dependencies.meta
│   │   ├── External
│   │   │   ├── Tommy.cs
│   │   │   └── Tommy.cs.meta
│   │   ├── External.meta
│   │   ├── Helpers
│   │   │   ├── AssetPathUtility.cs
│   │   │   ├── AssetPathUtility.cs.meta
│   │   │   ├── CodexConfigHelper.cs
│   │   │   ├── CodexConfigHelper.cs.meta
│   │   │   ├── ConfigJsonBuilder.cs
│   │   │   ├── ConfigJsonBuilder.cs.meta
│   │   │   ├── ExecPath.cs
│   │   │   ├── ExecPath.cs.meta
│   │   │   ├── GameObjectSerializer.cs
│   │   │   ├── GameObjectSerializer.cs.meta
│   │   │   ├── McpConfigFileHelper.cs
│   │   │   ├── McpConfigFileHelper.cs.meta
│   │   │   ├── McpConfigurationHelper.cs
│   │   │   ├── McpConfigurationHelper.cs.meta
│   │   │   ├── McpLog.cs
│   │   │   ├── McpLog.cs.meta
│   │   │   ├── McpPathResolver.cs
│   │   │   ├── McpPathResolver.cs.meta
│   │   │   ├── PackageDetector.cs
│   │   │   ├── PackageDetector.cs.meta
│   │   │   ├── PackageInstaller.cs
│   │   │   ├── PackageInstaller.cs.meta
│   │   │   ├── PortManager.cs
│   │   │   ├── PortManager.cs.meta
│   │   │   ├── PythonToolSyncProcessor.cs
│   │   │   ├── PythonToolSyncProcessor.cs.meta
│   │   │   ├── Response.cs
│   │   │   ├── Response.cs.meta
│   │   │   ├── ServerInstaller.cs
│   │   │   ├── ServerInstaller.cs.meta
│   │   │   ├── ServerPathResolver.cs
│   │   │   ├── ServerPathResolver.cs.meta
│   │   │   ├── TelemetryHelper.cs
│   │   │   ├── TelemetryHelper.cs.meta
│   │   │   ├── Vector3Helper.cs
│   │   │   └── Vector3Helper.cs.meta
│   │   ├── Helpers.meta
│   │   ├── Importers
│   │   │   ├── PythonFileImporter.cs
│   │   │   └── PythonFileImporter.cs.meta
│   │   ├── Importers.meta
│   │   ├── MCPForUnity.Editor.asmdef
│   │   ├── MCPForUnity.Editor.asmdef.meta
│   │   ├── MCPForUnityBridge.cs
│   │   ├── MCPForUnityBridge.cs.meta
│   │   ├── Models
│   │   │   ├── Command.cs
│   │   │   ├── Command.cs.meta
│   │   │   ├── McpClient.cs
│   │   │   ├── McpClient.cs.meta
│   │   │   ├── McpConfig.cs
│   │   │   ├── McpConfig.cs.meta
│   │   │   ├── MCPConfigServer.cs
│   │   │   ├── MCPConfigServer.cs.meta
│   │   │   ├── MCPConfigServers.cs
│   │   │   ├── MCPConfigServers.cs.meta
│   │   │   ├── McpStatus.cs
│   │   │   ├── McpStatus.cs.meta
│   │   │   ├── McpTypes.cs
│   │   │   ├── McpTypes.cs.meta
│   │   │   ├── ServerConfig.cs
│   │   │   └── ServerConfig.cs.meta
│   │   ├── Models.meta
│   │   ├── Resources
│   │   │   ├── McpForUnityResourceAttribute.cs
│   │   │   ├── McpForUnityResourceAttribute.cs.meta
│   │   │   ├── MenuItems
│   │   │   │   ├── GetMenuItems.cs
│   │   │   │   └── GetMenuItems.cs.meta
│   │   │   ├── MenuItems.meta
│   │   │   ├── Tests
│   │   │   │   ├── GetTests.cs
│   │   │   │   └── GetTests.cs.meta
│   │   │   └── Tests.meta
│   │   ├── Resources.meta
│   │   ├── Services
│   │   │   ├── BridgeControlService.cs
│   │   │   ├── BridgeControlService.cs.meta
│   │   │   ├── ClientConfigurationService.cs
│   │   │   ├── ClientConfigurationService.cs.meta
│   │   │   ├── IBridgeControlService.cs
│   │   │   ├── IBridgeControlService.cs.meta
│   │   │   ├── IClientConfigurationService.cs
│   │   │   ├── IClientConfigurationService.cs.meta
│   │   │   ├── IPackageUpdateService.cs
│   │   │   ├── IPackageUpdateService.cs.meta
│   │   │   ├── IPathResolverService.cs
│   │   │   ├── IPathResolverService.cs.meta
│   │   │   ├── IPythonToolRegistryService.cs
│   │   │   ├── IPythonToolRegistryService.cs.meta
│   │   │   ├── ITestRunnerService.cs
│   │   │   ├── ITestRunnerService.cs.meta
│   │   │   ├── IToolSyncService.cs
│   │   │   ├── IToolSyncService.cs.meta
│   │   │   ├── MCPServiceLocator.cs
│   │   │   ├── MCPServiceLocator.cs.meta
│   │   │   ├── PackageUpdateService.cs
│   │   │   ├── PackageUpdateService.cs.meta
│   │   │   ├── PathResolverService.cs
│   │   │   ├── PathResolverService.cs.meta
│   │   │   ├── PythonToolRegistryService.cs
│   │   │   ├── PythonToolRegistryService.cs.meta
│   │   │   ├── TestRunnerService.cs
│   │   │   ├── TestRunnerService.cs.meta
│   │   │   ├── ToolSyncService.cs
│   │   │   └── ToolSyncService.cs.meta
│   │   ├── Services.meta
│   │   ├── Setup
│   │   │   ├── SetupWizard.cs
│   │   │   ├── SetupWizard.cs.meta
│   │   │   ├── SetupWizardWindow.cs
│   │   │   └── SetupWizardWindow.cs.meta
│   │   ├── Setup.meta
│   │   ├── Tools
│   │   │   ├── CommandRegistry.cs
│   │   │   ├── CommandRegistry.cs.meta
│   │   │   ├── ExecuteMenuItem.cs
│   │   │   ├── ExecuteMenuItem.cs.meta
│   │   │   ├── ManageAsset.cs
│   │   │   ├── ManageAsset.cs.meta
│   │   │   ├── ManageEditor.cs
│   │   │   ├── ManageEditor.cs.meta
│   │   │   ├── ManageGameObject.cs
│   │   │   ├── ManageGameObject.cs.meta
│   │   │   ├── ManageScene.cs
│   │   │   ├── ManageScene.cs.meta
│   │   │   ├── ManageScript.cs
│   │   │   ├── ManageScript.cs.meta
│   │   │   ├── ManageShader.cs
│   │   │   ├── ManageShader.cs.meta
│   │   │   ├── McpForUnityToolAttribute.cs
│   │   │   ├── McpForUnityToolAttribute.cs.meta
│   │   │   ├── Prefabs
│   │   │   │   ├── ManagePrefabs.cs
│   │   │   │   └── ManagePrefabs.cs.meta
│   │   │   ├── Prefabs.meta
│   │   │   ├── ReadConsole.cs
│   │   │   ├── ReadConsole.cs.meta
│   │   │   ├── RunTests.cs
│   │   │   └── RunTests.cs.meta
│   │   ├── Tools.meta
│   │   ├── Windows
│   │   │   ├── ManualConfigEditorWindow.cs
│   │   │   ├── ManualConfigEditorWindow.cs.meta
│   │   │   ├── MCPForUnityEditorWindow.cs
│   │   │   ├── MCPForUnityEditorWindow.cs.meta
│   │   │   ├── MCPForUnityEditorWindowNew.cs
│   │   │   ├── MCPForUnityEditorWindowNew.cs.meta
│   │   │   ├── MCPForUnityEditorWindowNew.uss
│   │   │   ├── MCPForUnityEditorWindowNew.uss.meta
│   │   │   ├── MCPForUnityEditorWindowNew.uxml
│   │   │   ├── MCPForUnityEditorWindowNew.uxml.meta
│   │   │   ├── VSCodeManualSetupWindow.cs
│   │   │   └── VSCodeManualSetupWindow.cs.meta
│   │   └── Windows.meta
│   ├── Editor.meta
│   ├── package.json
│   ├── package.json.meta
│   ├── README.md
│   ├── README.md.meta
│   ├── Runtime
│   │   ├── MCPForUnity.Runtime.asmdef
│   │   ├── MCPForUnity.Runtime.asmdef.meta
│   │   ├── Serialization
│   │   │   ├── UnityTypeConverters.cs
│   │   │   └── UnityTypeConverters.cs.meta
│   │   └── Serialization.meta
│   ├── Runtime.meta
│   └── UnityMcpServer~
│       └── src
│           ├── __init__.py
│           ├── config.py
│           ├── Dockerfile
│           ├── models.py
│           ├── module_discovery.py
│           ├── port_discovery.py
│           ├── pyproject.toml
│           ├── pyrightconfig.json
│           ├── registry
│           │   ├── __init__.py
│           │   ├── resource_registry.py
│           │   └── tool_registry.py
│           ├── reload_sentinel.py
│           ├── resources
│           │   ├── __init__.py
│           │   ├── menu_items.py
│           │   └── tests.py
│           ├── server_version.txt
│           ├── server.py
│           ├── telemetry_decorator.py
│           ├── telemetry.py
│           ├── test_telemetry.py
│           ├── tools
│           │   ├── __init__.py
│           │   ├── execute_menu_item.py
│           │   ├── manage_asset.py
│           │   ├── manage_editor.py
│           │   ├── manage_gameobject.py
│           │   ├── manage_prefabs.py
│           │   ├── manage_scene.py
│           │   ├── manage_script.py
│           │   ├── manage_shader.py
│           │   ├── read_console.py
│           │   ├── resource_tools.py
│           │   ├── run_tests.py
│           │   └── script_apply_edits.py
│           ├── unity_connection.py
│           └── uv.lock
├── prune_tool_results.py
├── README-zh.md
├── README.md
├── restore-dev.bat
├── scripts
│   └── validate-nlt-coverage.sh
├── test_unity_socket_framing.py
├── TestProjects
│   └── UnityMCPTests
│       ├── .gitignore
│       ├── Assets
│       │   ├── Editor.meta
│       │   ├── Scenes
│       │   │   ├── SampleScene.unity
│       │   │   └── SampleScene.unity.meta
│       │   ├── Scenes.meta
│       │   ├── Scripts
│       │   │   ├── Hello.cs
│       │   │   ├── Hello.cs.meta
│       │   │   ├── LongUnityScriptClaudeTest.cs
│       │   │   ├── LongUnityScriptClaudeTest.cs.meta
│       │   │   ├── TestAsmdef
│       │   │   │   ├── CustomComponent.cs
│       │   │   │   ├── CustomComponent.cs.meta
│       │   │   │   ├── TestAsmdef.asmdef
│       │   │   │   └── TestAsmdef.asmdef.meta
│       │   │   └── TestAsmdef.meta
│       │   ├── Scripts.meta
│       │   ├── Tests
│       │   │   ├── EditMode
│       │   │   │   ├── Data
│       │   │   │   │   ├── PythonToolsAssetTests.cs
│       │   │   │   │   └── PythonToolsAssetTests.cs.meta
│       │   │   │   ├── Data.meta
│       │   │   │   ├── Helpers
│       │   │   │   │   ├── CodexConfigHelperTests.cs
│       │   │   │   │   ├── CodexConfigHelperTests.cs.meta
│       │   │   │   │   ├── WriteToConfigTests.cs
│       │   │   │   │   └── WriteToConfigTests.cs.meta
│       │   │   │   ├── Helpers.meta
│       │   │   │   ├── MCPForUnityTests.Editor.asmdef
│       │   │   │   ├── MCPForUnityTests.Editor.asmdef.meta
│       │   │   │   ├── Resources
│       │   │   │   │   ├── GetMenuItemsTests.cs
│       │   │   │   │   └── GetMenuItemsTests.cs.meta
│       │   │   │   ├── Resources.meta
│       │   │   │   ├── Services
│       │   │   │   │   ├── PackageUpdateServiceTests.cs
│       │   │   │   │   ├── PackageUpdateServiceTests.cs.meta
│       │   │   │   │   ├── PythonToolRegistryServiceTests.cs
│       │   │   │   │   ├── PythonToolRegistryServiceTests.cs.meta
│       │   │   │   │   ├── ToolSyncServiceTests.cs
│       │   │   │   │   └── ToolSyncServiceTests.cs.meta
│       │   │   │   ├── Services.meta
│       │   │   │   ├── Tools
│       │   │   │   │   ├── AIPropertyMatchingTests.cs
│       │   │   │   │   ├── AIPropertyMatchingTests.cs.meta
│       │   │   │   │   ├── CommandRegistryTests.cs
│       │   │   │   │   ├── CommandRegistryTests.cs.meta
│       │   │   │   │   ├── ComponentResolverTests.cs
│       │   │   │   │   ├── ComponentResolverTests.cs.meta
│       │   │   │   │   ├── ExecuteMenuItemTests.cs
│       │   │   │   │   ├── ExecuteMenuItemTests.cs.meta
│       │   │   │   │   ├── ManageGameObjectTests.cs
│       │   │   │   │   ├── ManageGameObjectTests.cs.meta
│       │   │   │   │   ├── ManagePrefabsTests.cs
│       │   │   │   │   ├── ManagePrefabsTests.cs.meta
│       │   │   │   │   ├── ManageScriptValidationTests.cs
│       │   │   │   │   └── ManageScriptValidationTests.cs.meta
│       │   │   │   ├── Tools.meta
│       │   │   │   ├── Windows
│       │   │   │   │   ├── ManualConfigJsonBuilderTests.cs
│       │   │   │   │   └── ManualConfigJsonBuilderTests.cs.meta
│       │   │   │   └── Windows.meta
│       │   │   └── EditMode.meta
│       │   └── Tests.meta
│       ├── Packages
│       │   └── manifest.json
│       └── ProjectSettings
│           ├── Packages
│           │   └── com.unity.testtools.codecoverage
│           │       └── Settings.json
│           └── ProjectVersion.txt
├── tests
│   ├── conftest.py
│   ├── test_edit_normalization_and_noop.py
│   ├── test_edit_strict_and_warnings.py
│   ├── test_find_in_file_minimal.py
│   ├── test_get_sha.py
│   ├── test_improved_anchor_matching.py
│   ├── test_logging_stdout.py
│   ├── test_manage_script_uri.py
│   ├── test_read_console_truncate.py
│   ├── test_read_resource_minimal.py
│   ├── test_resources_api.py
│   ├── test_script_editing.py
│   ├── test_script_tools.py
│   ├── test_telemetry_endpoint_validation.py
│   ├── test_telemetry_queue_worker.py
│   ├── test_telemetry_subaction.py
│   ├── test_transport_framing.py
│   └── test_validate_script_summary.py
├── tools
│   └── stress_mcp.py
└── UnityMcpBridge
    ├── Editor
    │   ├── AssemblyInfo.cs
    │   ├── AssemblyInfo.cs.meta
    │   ├── Data
    │   │   ├── DefaultServerConfig.cs
    │   │   ├── DefaultServerConfig.cs.meta
    │   │   ├── McpClients.cs
    │   │   └── McpClients.cs.meta
    │   ├── Data.meta
    │   ├── Dependencies
    │   │   ├── DependencyManager.cs
    │   │   ├── DependencyManager.cs.meta
    │   │   ├── Models
    │   │   │   ├── DependencyCheckResult.cs
    │   │   │   ├── DependencyCheckResult.cs.meta
    │   │   │   ├── DependencyStatus.cs
    │   │   │   └── DependencyStatus.cs.meta
    │   │   ├── Models.meta
    │   │   ├── PlatformDetectors
    │   │   │   ├── IPlatformDetector.cs
    │   │   │   ├── IPlatformDetector.cs.meta
    │   │   │   ├── LinuxPlatformDetector.cs
    │   │   │   ├── LinuxPlatformDetector.cs.meta
    │   │   │   ├── MacOSPlatformDetector.cs
    │   │   │   ├── MacOSPlatformDetector.cs.meta
    │   │   │   ├── PlatformDetectorBase.cs
    │   │   │   ├── PlatformDetectorBase.cs.meta
    │   │   │   ├── WindowsPlatformDetector.cs
    │   │   │   └── WindowsPlatformDetector.cs.meta
    │   │   └── PlatformDetectors.meta
    │   ├── Dependencies.meta
    │   ├── External
    │   │   ├── Tommy.cs
    │   │   └── Tommy.cs.meta
    │   ├── External.meta
    │   ├── Helpers
    │   │   ├── AssetPathUtility.cs
    │   │   ├── AssetPathUtility.cs.meta
    │   │   ├── CodexConfigHelper.cs
    │   │   ├── CodexConfigHelper.cs.meta
    │   │   ├── ConfigJsonBuilder.cs
    │   │   ├── ConfigJsonBuilder.cs.meta
    │   │   ├── ExecPath.cs
    │   │   ├── ExecPath.cs.meta
    │   │   ├── GameObjectSerializer.cs
    │   │   ├── GameObjectSerializer.cs.meta
    │   │   ├── McpConfigFileHelper.cs
    │   │   ├── McpConfigFileHelper.cs.meta
    │   │   ├── McpConfigurationHelper.cs
    │   │   ├── McpConfigurationHelper.cs.meta
    │   │   ├── McpLog.cs
    │   │   ├── McpLog.cs.meta
    │   │   ├── McpPathResolver.cs
    │   │   ├── McpPathResolver.cs.meta
    │   │   ├── PackageDetector.cs
    │   │   ├── PackageDetector.cs.meta
    │   │   ├── PackageInstaller.cs
    │   │   ├── PackageInstaller.cs.meta
    │   │   ├── PortManager.cs
    │   │   ├── PortManager.cs.meta
    │   │   ├── Response.cs
    │   │   ├── Response.cs.meta
    │   │   ├── ServerInstaller.cs
    │   │   ├── ServerInstaller.cs.meta
    │   │   ├── ServerPathResolver.cs
    │   │   ├── ServerPathResolver.cs.meta
    │   │   ├── TelemetryHelper.cs
    │   │   ├── TelemetryHelper.cs.meta
    │   │   ├── Vector3Helper.cs
    │   │   └── Vector3Helper.cs.meta
    │   ├── Helpers.meta
    │   ├── MCPForUnity.Editor.asmdef
    │   ├── MCPForUnity.Editor.asmdef.meta
    │   ├── MCPForUnityBridge.cs
    │   ├── MCPForUnityBridge.cs.meta
    │   ├── Models
    │   │   ├── Command.cs
    │   │   ├── Command.cs.meta
    │   │   ├── McpClient.cs
    │   │   ├── McpClient.cs.meta
    │   │   ├── McpConfig.cs
    │   │   ├── McpConfig.cs.meta
    │   │   ├── MCPConfigServer.cs
    │   │   ├── MCPConfigServer.cs.meta
    │   │   ├── MCPConfigServers.cs
    │   │   ├── MCPConfigServers.cs.meta
    │   │   ├── McpStatus.cs
    │   │   ├── McpStatus.cs.meta
    │   │   ├── McpTypes.cs
    │   │   ├── McpTypes.cs.meta
    │   │   ├── ServerConfig.cs
    │   │   └── ServerConfig.cs.meta
    │   ├── Models.meta
    │   ├── Setup
    │   │   ├── SetupWizard.cs
    │   │   ├── SetupWizard.cs.meta
    │   │   ├── SetupWizardWindow.cs
    │   │   └── SetupWizardWindow.cs.meta
    │   ├── Setup.meta
    │   ├── Tools
    │   │   ├── CommandRegistry.cs
    │   │   ├── CommandRegistry.cs.meta
    │   │   ├── ManageAsset.cs
    │   │   ├── ManageAsset.cs.meta
    │   │   ├── ManageEditor.cs
    │   │   ├── ManageEditor.cs.meta
    │   │   ├── ManageGameObject.cs
    │   │   ├── ManageGameObject.cs.meta
    │   │   ├── ManageScene.cs
    │   │   ├── ManageScene.cs.meta
    │   │   ├── ManageScript.cs
    │   │   ├── ManageScript.cs.meta
    │   │   ├── ManageShader.cs
    │   │   ├── ManageShader.cs.meta
    │   │   ├── McpForUnityToolAttribute.cs
    │   │   ├── McpForUnityToolAttribute.cs.meta
    │   │   ├── MenuItems
    │   │   │   ├── ManageMenuItem.cs
    │   │   │   ├── ManageMenuItem.cs.meta
    │   │   │   ├── MenuItemExecutor.cs
    │   │   │   ├── MenuItemExecutor.cs.meta
    │   │   │   ├── MenuItemsReader.cs
    │   │   │   └── MenuItemsReader.cs.meta
    │   │   ├── MenuItems.meta
    │   │   ├── Prefabs
    │   │   │   ├── ManagePrefabs.cs
    │   │   │   └── ManagePrefabs.cs.meta
    │   │   ├── Prefabs.meta
    │   │   ├── ReadConsole.cs
    │   │   └── ReadConsole.cs.meta
    │   ├── Tools.meta
    │   ├── Windows
    │   │   ├── ManualConfigEditorWindow.cs
    │   │   ├── ManualConfigEditorWindow.cs.meta
    │   │   ├── MCPForUnityEditorWindow.cs
    │   │   ├── MCPForUnityEditorWindow.cs.meta
    │   │   ├── VSCodeManualSetupWindow.cs
    │   │   └── VSCodeManualSetupWindow.cs.meta
    │   └── Windows.meta
    ├── Editor.meta
    ├── package.json
    ├── package.json.meta
    ├── README.md
    ├── README.md.meta
    ├── Runtime
    │   ├── MCPForUnity.Runtime.asmdef
    │   ├── MCPForUnity.Runtime.asmdef.meta
    │   ├── Serialization
    │   │   ├── UnityTypeConverters.cs
    │   │   └── UnityTypeConverters.cs.meta
    │   └── Serialization.meta
    ├── Runtime.meta
    └── UnityMcpServer~
        └── src
            ├── __init__.py
            ├── config.py
            ├── Dockerfile
            ├── port_discovery.py
            ├── pyproject.toml
            ├── pyrightconfig.json
            ├── registry
            │   ├── __init__.py
            │   └── tool_registry.py
            ├── reload_sentinel.py
            ├── server_version.txt
            ├── server.py
            ├── telemetry_decorator.py
            ├── telemetry.py
            ├── test_telemetry.py
            ├── tools
            │   ├── __init__.py
            │   ├── manage_asset.py
            │   ├── manage_editor.py
            │   ├── manage_gameobject.py
            │   ├── manage_menu_item.py
            │   ├── manage_prefabs.py
            │   ├── manage_scene.py
            │   ├── manage_script.py
            │   ├── manage_shader.py
            │   ├── read_console.py
            │   ├── resource_tools.py
            │   └── script_apply_edits.py
            ├── unity_connection.py
            └── uv.lock
```

# Files

--------------------------------------------------------------------------------
/docs/CUSTOM_TOOLS.md:
--------------------------------------------------------------------------------

```markdown
# Adding Custom Tools to MCP for Unity

MCP for Unity supports auto-discovery of custom tools using decorators (Python) and attributes (C#). This allows you to easily extend the MCP server with your own tools.

Be sure to review the developer README first:

| [English](README-DEV.md) | [简体中文](README-DEV-zh.md) |
|---------------------------|------------------------------|

---

# Part 1: How to Use (Quick Start Guide)

This section shows you how to add custom tools to your Unity project.

## Step 1: Create a PythonToolsAsset

First, create a ScriptableObject to manage your Python tools:

1. In Unity, right-click in the Project window
2. Select **Assets > Create > MCP For Unity > Python Tools**
3. Name it (e.g., `MyPythonTools`)

![Create Python Tools Asset](screenshots/v6_2_create_python_tools_asset.png)

## Step 2: Create Your Python Tool File

Create a Python file **anywhere in your Unity project**. For example, `Assets/Editor/MyTools/my_custom_tool.py`:

```python
from typing import Annotated, Any
from mcp.server.fastmcp import Context
from registry import mcp_for_unity_tool
from unity_connection import send_command_with_retry

@mcp_for_unity_tool(
    description="My custom tool that does something amazing"
)
async def my_custom_tool(
    ctx: Context,
    param1: Annotated[str, "Description of param1"],
    param2: Annotated[int, "Description of param2"] | None = None
) -> dict[str, Any]:
    await ctx.info(f"Processing my_custom_tool: {param1}")

    # Prepare parameters for Unity
    params = {
        "action": "do_something",
        "param1": param1,
        "param2": param2,
    }
    params = {k: v for k, v in params.items() if v is not None}

    # Send to Unity handler
    response = send_command_with_retry("my_custom_tool", params)
    return response if isinstance(response, dict) else {"success": False, "message": str(response)}
```

## Step 3: Add Python File to Asset

1. Select your `PythonToolsAsset` in the Project window
2. In the Inspector, expand **Python Files**
3. Drag your `.py` file into the list (or click **+** and select it)

![Python Tools Asset Inspector](screenshots/v6_2_python_tools_asset.png)

**Note:** If you can't see `.py` files in the object picker, go to **Window > MCP For Unity > Tool Sync > Reimport Python Files** to force Unity to recognize them as text assets.

## Step 4: Create C# Handler

Create a C# file anywhere in your Unity project (typically in `Editor/`):


```csharp
using Newtonsoft.Json.Linq;
using MCPForUnity.Editor.Helpers;

namespace MyProject.Editor.CustomTools
{
    [McpForUnityTool("my_custom_tool")]
    public static class MyCustomTool
    {
        public static object HandleCommand(JObject @params)
        {
            string action = @params["action"]?.ToString();
            string param1 = @params["param1"]?.ToString();
            int? param2 = @params["param2"]?.ToObject<int?>();

            // Your custom logic here
            if (string.IsNullOrEmpty(param1))
            {
                return Response.Error("param1 is required");
            }

            // Do something amazing
            DoSomethingAmazing(param1, param2);

            return Response.Success("Custom tool executed successfully!");
        }

        private static void DoSomethingAmazing(string param1, int? param2)
        {
            // Your implementation
        }
    }
}
```

## Step 5: Rebuild the MCP Server

1. Open the MCP for Unity window in the Unity Editor
2. Click **Rebuild Server** to apply your changes
3. Your tool is now available to MCP clients!

**What happens automatically:**
- ✅ Python files are synced to the MCP server on Unity startup
- ✅ Python files are synced when modified (you would need to rebuild the server)
- ✅ C# handlers are discovered via reflection
- ✅ Tools are registered with the MCP server

## Complete Example: Screenshot Tool

Here's a complete example showing how to create a screenshot capture tool.

### Python File (`Assets/Editor/ScreenShots/Python/screenshot_tool.py`)

```python
from typing import Annotated, Any

from mcp.server.fastmcp import Context

from registry import mcp_for_unity_tool
from unity_connection import send_command_with_retry


@mcp_for_unity_tool(
    description="Capture screenshots in Unity, saving them as PNGs"
)
async def capture_screenshot(
    ctx: Context,
    filename: Annotated[str, "Screenshot filename without extension, e.g., screenshot_01"],
) -> dict[str, Any]:
    await ctx.info(f"Capturing screenshot: {filename}")

    params = {
        "action": "capture",
        "filename": filename,
    }
    params = {k: v for k, v in params.items() if v is not None}

    response = send_command_with_retry("capture_screenshot", params)
    return response if isinstance(response, dict) else {"success": False, "message": str(response)}
```

### Add to PythonToolsAsset

1. Select your `PythonToolsAsset`
2. Add `screenshot_tool.py` to the **Python Files** list
3. The file will automatically sync to the MCP server

### C# Handler (`Assets/Editor/ScreenShots/CaptureScreenshotTool.cs`)

```csharp
using System.IO;
using Newtonsoft.Json.Linq;
using UnityEngine;
using MCPForUnity.Editor.Tools;

namespace MyProject.Editor.Tools
{
    [McpForUnityTool("capture_screenshot")]
    public static class CaptureScreenshotTool
    {
        public static object HandleCommand(JObject @params)
        {
            string filename = @params["filename"]?.ToString();

            if (string.IsNullOrEmpty(filename))
            {
                return MCPForUnity.Editor.Helpers.Response.Error("filename is required");
            }

            try
            {
                string absolutePath = Path.Combine(Application.dataPath, "Screenshots", filename);
                Directory.CreateDirectory(Path.GetDirectoryName(absolutePath));

                // Find the main camera
                Camera camera = Camera.main;
                if (camera == null)
                {
                    camera = Object.FindFirstObjectByType<Camera>();
                }

                if (camera == null)
                {
                    return MCPForUnity.Editor.Helpers.Response.Error("No camera found in the scene");
                }

                // Create a RenderTexture
                RenderTexture rt = new RenderTexture(Screen.width, Screen.height, 24);
                camera.targetTexture = rt;

                // Render the camera's view
                camera.Render();

                // Read pixels from the RenderTexture
                RenderTexture.active = rt;
                Texture2D screenshot = new Texture2D(Screen.width, Screen.height, TextureFormat.RGB24, false);
                screenshot.ReadPixels(new Rect(0, 0, Screen.width, Screen.height), 0, 0);
                screenshot.Apply();

                // Clean up
                camera.targetTexture = null;
                RenderTexture.active = null;
                Object.DestroyImmediate(rt);

                // Save to file
                byte[] bytes = screenshot.EncodeToPNG();
                File.WriteAllBytes(absolutePath, bytes);
                Object.DestroyImmediate(screenshot);

                return MCPForUnity.Editor.Helpers.Response.Success($"Screenshot saved to {absolutePath}", new
                {
                    path = absolutePath,
                });
            }
            catch (System.Exception ex)
            {
                return MCPForUnity.Editor.Helpers.Response.Error($"Failed to capture screenshot: {ex.Message}");
            }
        }
    }
}
```

### Rebuild and Test

1. Open the MCP for Unity window
2. Click **Rebuild Server**
3. Test your tool from your MCP client!

---

# Part 2: How It Works (Technical Details)

This section explains the technical implementation of the custom tools system.

## Python Side: Decorator System

### The `@mcp_for_unity_tool` Decorator

The decorator automatically registers your function as an MCP tool:

```python
@mcp_for_unity_tool(
    name="custom_name",          # Optional: function name used by default
    description="Tool description",  # Required: describe what the tool does
)
```

**How it works:**
- Auto-generates the tool name from the function name (e.g., `my_custom_tool`)
- Registers the tool with FastMCP during module import
- Supports all FastMCP `mcp.tool` decorator options: <https://gofastmcp.com/servers/tools#tools>

**Note:** All tools should have the `description` field. It's not strictly required, however, that parameter is the best place to define a description so that most MCP clients can read it. See [issue #289](https://github.com/CoplayDev/unity-mcp/issues/289).

### Auto-Discovery

Python tools are automatically discovered when:
- The Python file is added to a `PythonToolsAsset`
- The file is synced to `MCPForUnity/UnityMcpServer~/src/tools/custom/`
- The file is imported during server startup
- The decorator `@mcp_for_unity_tool` is used

### Sync System

The `PythonToolsAsset` system automatically syncs your Python files:

**When sync happens:**
- ✅ Unity starts up
- ✅ Python files are modified
- ✅ Python files are added/removed from the asset

**Manual controls:**
- **Sync Now:** Window > MCP For Unity > Tool Sync > Sync Python Tools
- **Toggle Auto-Sync:** Window > MCP For Unity > Tool Sync > Auto-Sync Python Tools
- **Reimport Python Files:** Window > MCP For Unity > Tool Sync > Reimport Python Files

**How it works:**
- Uses content hashing to detect changes (only syncs modified files)
- Files are copied to `MCPForUnity/UnityMcpServer~/src/tools/custom/`
- Stale files are automatically cleaned up

## C# Side: Attribute System

### The `[McpForUnityTool]` Attribute

The attribute marks your class as a tool handler:

```csharp
// Explicit command name
[McpForUnityTool("my_custom_tool")]
public static class MyCustomTool { }

// Auto-generated from class name (MyCustomTool → my_custom_tool)
[McpForUnityTool]
public static class MyCustomTool { }
```

### Auto-Discovery

C# handlers are automatically discovered when:
- The class has the `[McpForUnityTool]` attribute
- The class has a `public static HandleCommand(JObject)` method
- Unity loads the assembly containing the class

**How it works:**
- Unity scans all assemblies on startup
- Finds classes with `[McpForUnityTool]` attribute
- Registers them in the command registry
- Routes MCP commands to the appropriate handler

## Best Practices

### Python
- ✅ Use type hints with `Annotated` for parameter documentation
- ✅ Return `dict[str, Any]` with `{"success": bool, "message": str, "data": Any}`
- ✅ Use `ctx.info()` for logging
- ✅ Handle errors gracefully and return structured error responses
- ✅ Use `send_command_with_retry()` for Unity communication

### C#
- ✅ Use the `Response.Success()` and `Response.Error()` helper methods
- ✅ Validate input parameters before processing
- ✅ Use `@params["key"]?.ToObject<Type>()` for safe type conversion
- ✅ Return structured responses with meaningful data
- ✅ Handle exceptions and return error responses

## Debugging

### Python
- Check server logs: `~/Library/Application Support/UnityMCP/Logs/unity_mcp_server.log`
- Look for: `"Registered X MCP tools"` message on startup
- Use `ctx.info()` for debugging messages

### C#
- Check Unity Console for: `"MCP-FOR-UNITY: Auto-discovered X tools"` message
- Look for warnings about missing `HandleCommand` methods
- Use `Debug.Log()` in your handler for debugging

## Troubleshooting

**Tool not appearing:**
- **Python:** 
  - Ensure the `.py` file is added to a `PythonToolsAsset`
  - Check Unity Console for sync messages: "Python tools synced: X copied"
  - Verify file was synced to `UnityMcpServer~/src/tools/custom/`
  - Try manual sync: Window > MCP For Unity > Tool Sync > Sync Python Tools
  - Rebuild the server in the MCP for Unity window
- **C#:** 
  - Ensure the class has `[McpForUnityTool]` attribute
  - Ensure the class has a `public static HandleCommand(JObject)` method
  - Check Unity Console for: "MCP-FOR-UNITY: Auto-discovered X tools"

**Python files not showing in Inspector:**
- Go to **Window > MCP For Unity > Tool Sync > Reimport Python Files**
- This forces Unity to recognize `.py` files as TextAssets
- Check that `.py.meta` files show `ScriptedImporter` (not `DefaultImporter`)

**Sync not working:**
- Check if auto-sync is enabled: Window > MCP For Unity > Tool Sync > Auto-Sync Python Tools
- Look for errors in Unity Console
- Verify `PythonToolsAsset` has the correct files added

**Name conflicts:**
- Use explicit names in decorators/attributes to avoid conflicts
- Check registered tools: `CommandRegistry.GetAllCommandNames()` in C#

**Tool not being called:**
- Verify the command name matches between Python and C#
- Check that parameters are being passed correctly
- Look for errors in logs

```

--------------------------------------------------------------------------------
/UnityMcpBridge/Editor/Helpers/CodexConfigHelper.cs:
--------------------------------------------------------------------------------

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using MCPForUnity.External.Tommy;
using Newtonsoft.Json;

namespace MCPForUnity.Editor.Helpers
{
    /// <summary>
    /// Codex CLI specific configuration helpers. Handles TOML snippet
    /// generation and lightweight parsing so Codex can join the auto-setup
    /// flow alongside JSON-based clients.
    /// </summary>
    public static class CodexConfigHelper
    {
        public static bool IsCodexConfigured(string pythonDir)
        {
            try
            {
                string basePath = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
                if (string.IsNullOrEmpty(basePath)) return false;

                string configPath = Path.Combine(basePath, ".codex", "config.toml");
                if (!File.Exists(configPath)) return false;

                string toml = File.ReadAllText(configPath);
                if (!TryParseCodexServer(toml, out _, out var args)) return false;

                string dir = McpConfigFileHelper.ExtractDirectoryArg(args);
                if (string.IsNullOrEmpty(dir)) return false;

                return McpConfigFileHelper.PathsEqual(dir, pythonDir);
            }
            catch
            {
                return false;
            }
        }

        public static string BuildCodexServerBlock(string uvPath, string serverSrc)
        {
            string argsArray = FormatTomlStringArray(new[] { "run", "--directory", serverSrc, "server.py" });

            var sb = new StringBuilder();
            sb.AppendLine("[mcp_servers.unityMCP]");
            sb.AppendLine($"command = \"{EscapeTomlString(uvPath)}\"");
            sb.AppendLine($"args = {argsArray}");
            sb.AppendLine($"startup_timeout_sec = 30");

            // Windows-specific environment block to help Codex locate needed paths
            try
            {
                if (Environment.OSVersion.Platform == PlatformID.Win32NT)
                {
                    string userProfile = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile) ?? string.Empty;
                    string appData = Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData) ?? string.Empty; // Roaming
                    string localAppData = Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData) ?? string.Empty;
                    string programData = Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData) ?? string.Empty;
                    string programFiles = Environment.GetFolderPath(Environment.SpecialFolder.ProgramFiles) ?? string.Empty;
                    string systemDrive = Environment.GetEnvironmentVariable("SystemDrive") ?? (Path.GetPathRoot(userProfile)?.TrimEnd('\\', '/') ?? "C:");
                    string systemRoot = Environment.GetEnvironmentVariable("SystemRoot") ?? Path.Combine(systemDrive + "\\", "Windows");
                    string comspec = Environment.GetEnvironmentVariable("COMSPEC") ?? Path.Combine(Environment.SystemDirectory ?? (systemRoot + "\\System32"), "cmd.exe");
                    string homeDrive = Environment.GetEnvironmentVariable("HOMEDRIVE");
                    string homePath = Environment.GetEnvironmentVariable("HOMEPATH");
                    if (string.IsNullOrEmpty(homeDrive))
                    {
                        homeDrive = systemDrive;
                    }
                    if (string.IsNullOrEmpty(homePath) && !string.IsNullOrEmpty(userProfile))
                    {
                        // Derive HOMEPATH from USERPROFILE (e.g., C:\\Users\\name -> \\Users\\name)
                        if (userProfile.StartsWith(homeDrive + "\\", StringComparison.OrdinalIgnoreCase))
                        {
                            homePath = userProfile.Substring(homeDrive.Length);
                        }
                        else
                        {
                            try
                            {
                                var root = Path.GetPathRoot(userProfile) ?? string.Empty; // e.g., C:\\
                                homePath = userProfile.Substring(root.Length - 1); // keep leading backslash
                            }
                            catch { homePath = "\\"; }
                        }
                    }

                    string powershell = Path.Combine(Environment.SystemDirectory ?? (systemRoot + "\\System32"), "WindowsPowerShell\\v1.0\\powershell.exe");
                    string pwsh = Path.Combine(programFiles, "PowerShell\\7\\pwsh.exe");

                    string tempDir = Path.Combine(localAppData, "Temp");

                    sb.AppendLine();
                    sb.AppendLine("[mcp_servers.unityMCP.env]");
                    sb.AppendLine($"SystemRoot = \"{EscapeTomlString(systemRoot)}\"");
                    sb.AppendLine($"APPDATA = \"{EscapeTomlString(appData)}\"");
                    sb.AppendLine($"COMSPEC = \"{EscapeTomlString(comspec)}\"");
                    sb.AppendLine($"HOMEDRIVE = \"{EscapeTomlString(homeDrive?.TrimEnd('\\') ?? string.Empty)}\"");
                    sb.AppendLine($"HOMEPATH = \"{EscapeTomlString(homePath ?? string.Empty)}\"");
                    sb.AppendLine($"LOCALAPPDATA = \"{EscapeTomlString(localAppData)}\"");
                    sb.AppendLine($"POWERSHELL = \"{EscapeTomlString(powershell)}\"");
                    sb.AppendLine($"PROGRAMDATA = \"{EscapeTomlString(programData)}\"");
                    sb.AppendLine($"PROGRAMFILES = \"{EscapeTomlString(programFiles)}\"");
                    sb.AppendLine($"PWSH = \"{EscapeTomlString(pwsh)}\"");
                    sb.AppendLine($"SYSTEMDRIVE = \"{EscapeTomlString(systemDrive)}\"");
                    sb.AppendLine($"SYSTEMROOT = \"{EscapeTomlString(systemRoot)}\"");
                    sb.AppendLine($"TEMP = \"{EscapeTomlString(tempDir)}\"");
                    sb.AppendLine($"TMP = \"{EscapeTomlString(tempDir)}\"");
                    sb.AppendLine($"USERPROFILE = \"{EscapeTomlString(userProfile)}\"");
                }
            }
            catch { /* best effort */ }

            return sb.ToString();
        }

        public static string UpsertCodexServerBlock(string existingToml, string newBlock)
        {
            if (string.IsNullOrWhiteSpace(existingToml))
            {
                // Default to snake_case section when creating new files
                return newBlock.TrimEnd() + Environment.NewLine;
            }

            StringBuilder sb = new StringBuilder();
            using StringReader reader = new StringReader(existingToml);
            string line;
            bool inTarget = false;
            bool replaced = false;

            // Support both TOML section casings and nested subtables (e.g., env)
            // Prefer the casing already present in the user's file; fall back to snake_case
            bool hasCamelSection = existingToml.IndexOf("[mcpServers.unityMCP]", StringComparison.OrdinalIgnoreCase) >= 0
                                   || existingToml.IndexOf("[mcpServers.unityMCP.", StringComparison.OrdinalIgnoreCase) >= 0;
            bool hasSnakeSection = existingToml.IndexOf("[mcp_servers.unityMCP]", StringComparison.OrdinalIgnoreCase) >= 0
                                   || existingToml.IndexOf("[mcp_servers.unityMCP.", StringComparison.OrdinalIgnoreCase) >= 0;
            bool preferCamel = hasCamelSection || (!hasSnakeSection && existingToml.IndexOf("[mcpServers]", StringComparison.OrdinalIgnoreCase) >= 0);

            // Prepare block variants matching the chosen casing, including nested tables
            string newBlockCamel = newBlock
                .Replace("[mcp_servers.unityMCP.env]", "[mcpServers.unityMCP.env]")
                .Replace("[mcp_servers.unityMCP]", "[mcpServers.unityMCP]");
            string newBlockEffective = preferCamel ? newBlockCamel : newBlock;

            static bool IsSection(string s)
            {
                string t = s.Trim();
                return t.StartsWith("[") && t.EndsWith("]") && !t.StartsWith("[[");
            }

            static string SectionName(string header)
            {
                string t = header.Trim();
                if (t.StartsWith("[") && t.EndsWith("]")) t = t.Substring(1, t.Length - 2);
                return t;
            }

            bool TargetOrChild(string section)
            {
                // Compare case-insensitively; accept both snake and camel as the same logical table
                string name = SectionName(section);
                return name.StartsWith("mcp_servers.unityMCP", StringComparison.OrdinalIgnoreCase)
                       || name.StartsWith("mcpServers.unityMCP", StringComparison.OrdinalIgnoreCase);
            }

            while ((line = reader.ReadLine()) != null)
            {
                string trimmed = line.Trim();
                bool isSection = IsSection(trimmed);
                if (isSection)
                {
                    // If we encounter the target section or any of its nested tables, mark/keep in-target
                    if (TargetOrChild(trimmed))
                    {
                        if (!replaced)
                        {
                            if (sb.Length > 0 && sb[^1] != '\n') sb.AppendLine();
                            sb.AppendLine(newBlockEffective.TrimEnd());
                            replaced = true;
                        }
                        inTarget = true;
                        continue;
                    }

                    // A new unrelated section ends the target region
                    if (inTarget)
                    {
                        inTarget = false;
                    }
                }

                if (inTarget)
                {
                    continue;
                }

                sb.AppendLine(line);
            }

            if (!replaced)
            {
                if (sb.Length > 0 && sb[^1] != '\n') sb.AppendLine();
                sb.AppendLine(newBlockEffective.TrimEnd());
            }

            return sb.ToString().TrimEnd() + Environment.NewLine;
        }

        public static bool TryParseCodexServer(string toml, out string command, out string[] args)
        {
            command = null;
            args = null;
            if (string.IsNullOrWhiteSpace(toml)) return false;

            try
            {
                using var reader = new StringReader(toml);
                TomlTable root = TOML.Parse(reader);
                if (root == null) return false;

                if (!TryGetTable(root, "mcp_servers", out var servers)
                    && !TryGetTable(root, "mcpServers", out servers))
                {
                    return false;
                }

                if (!TryGetTable(servers, "unityMCP", out var unity))
                {
                    return false;
                }

                command = GetTomlString(unity, "command");
                args = GetTomlStringArray(unity, "args");

                return !string.IsNullOrEmpty(command) && args != null;
            }
            catch (TomlParseException)
            {
                return false;
            }
            catch (TomlSyntaxException)
            {
                return false;
            }
            catch (FormatException)
            {
                return false;
            }
        }

        private static bool TryGetTable(TomlTable parent, string key, out TomlTable table)
        {
            table = null;
            if (parent == null) return false;

            if (parent.TryGetNode(key, out var node))
            {
                if (node is TomlTable tbl)
                {
                    table = tbl;
                    return true;
                }

                if (node is TomlArray array)
                {
                    var firstTable = array.Children.OfType<TomlTable>().FirstOrDefault();
                    if (firstTable != null)
                    {
                        table = firstTable;
                        return true;
                    }
                }
            }

            return false;
        }

        private static string GetTomlString(TomlTable table, string key)
        {
            if (table != null && table.TryGetNode(key, out var node))
            {
                if (node is TomlString str) return str.Value;
                if (node.HasValue) return node.ToString();
            }
            return null;
        }

        private static string[] GetTomlStringArray(TomlTable table, string key)
        {
            if (table == null) return null;
            if (!table.TryGetNode(key, out var node)) return null;

            if (node is TomlArray array)
            {
                List<string> values = new List<string>();
                foreach (TomlNode element in array.Children)
                {
                    if (element is TomlString str)
                    {
                        values.Add(str.Value);
                    }
                    else if (element.HasValue)
                    {
                        values.Add(element.ToString());
                    }
                }

                return values.Count > 0 ? values.ToArray() : Array.Empty<string>();
            }

            if (node is TomlString single)
            {
                return new[] { single.Value };
            }

            return null;
        }

        private static string FormatTomlStringArray(IEnumerable<string> values)
        {
            if (values == null) return "[]";
            StringBuilder sb = new StringBuilder();
            sb.Append('[');
            bool first = true;
            foreach (string value in values)
            {
                if (!first)
                {
                    sb.Append(", ");
                }
                sb.Append('"').Append(EscapeTomlString(value ?? string.Empty)).Append('"');
                first = false;
            }
            sb.Append(']');
            return sb.ToString();
        }

        private static string EscapeTomlString(string value)
        {
            if (string.IsNullOrEmpty(value)) return string.Empty;
            return value
                .Replace("\\", "\\\\")
                .Replace("\"", "\\\"");
        }

    }
}

```

--------------------------------------------------------------------------------
/MCPForUnity/Editor/Tools/CommandRegistry.cs:
--------------------------------------------------------------------------------

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using MCPForUnity.Editor.Helpers;
using MCPForUnity.Editor.Resources;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace MCPForUnity.Editor.Tools
{
    /// <summary>
    /// Holds information about a registered command handler.
    /// </summary>
    class HandlerInfo
    {
        public string CommandName { get; }
        public Func<JObject, object> SyncHandler { get; }
        public Func<JObject, Task<object>> AsyncHandler { get; }

        public bool IsAsync => AsyncHandler != null;

        public HandlerInfo(string commandName, Func<JObject, object> syncHandler, Func<JObject, Task<object>> asyncHandler)
        {
            CommandName = commandName;
            SyncHandler = syncHandler;
            AsyncHandler = asyncHandler;
        }
    }

    /// <summary>
    /// Registry for all MCP command handlers via reflection.
    /// Handles both MCP tools and resources.
    /// </summary>
    public static class CommandRegistry
    {
        private static readonly Dictionary<string, HandlerInfo> _handlers = new();
        private static bool _initialized = false;

        /// <summary>
        /// Initialize and auto-discover all tools and resources marked with
        /// [McpForUnityTool] or [McpForUnityResource]
        /// </summary>
        public static void Initialize()
        {
            if (_initialized) return;

            AutoDiscoverCommands();
            _initialized = true;
        }

        /// <summary>
        /// Convert PascalCase or camelCase to snake_case
        /// </summary>
        private static string ToSnakeCase(string name)
        {
            if (string.IsNullOrEmpty(name)) return name;

            // Insert underscore before uppercase letters (except first)
            var s1 = Regex.Replace(name, "(.)([A-Z][a-z]+)", "$1_$2");
            var s2 = Regex.Replace(s1, "([a-z0-9])([A-Z])", "$1_$2");
            return s2.ToLower();
        }

        /// <summary>
        /// Auto-discover all types with [McpForUnityTool] or [McpForUnityResource] attributes
        /// </summary>
        private static void AutoDiscoverCommands()
        {
            try
            {
                var allTypes = AppDomain.CurrentDomain.GetAssemblies()
                    .Where(a => !a.IsDynamic)
                    .SelectMany(a =>
                    {
                        try { return a.GetTypes(); }
                        catch { return new Type[0]; }
                    })
                    .ToList();

                // Discover tools
                var toolTypes = allTypes.Where(t => t.GetCustomAttribute<McpForUnityToolAttribute>() != null);
                int toolCount = 0;
                foreach (var type in toolTypes)
                {
                    if (RegisterCommandType(type, isResource: false))
                        toolCount++;
                }

                // Discover resources
                var resourceTypes = allTypes.Where(t => t.GetCustomAttribute<McpForUnityResourceAttribute>() != null);
                int resourceCount = 0;
                foreach (var type in resourceTypes)
                {
                    if (RegisterCommandType(type, isResource: true))
                        resourceCount++;
                }

                McpLog.Info($"Auto-discovered {toolCount} tools and {resourceCount} resources ({_handlers.Count} total handlers)");
            }
            catch (Exception ex)
            {
                McpLog.Error($"Failed to auto-discover MCP commands: {ex.Message}");
            }
        }

        /// <summary>
        /// Register a command type (tool or resource) with the registry.
        /// Returns true if successfully registered, false otherwise.
        /// </summary>
        private static bool RegisterCommandType(Type type, bool isResource)
        {
            string commandName;
            string typeLabel = isResource ? "resource" : "tool";

            // Get command name from appropriate attribute
            if (isResource)
            {
                var resourceAttr = type.GetCustomAttribute<McpForUnityResourceAttribute>();
                commandName = resourceAttr.ResourceName;
            }
            else
            {
                var toolAttr = type.GetCustomAttribute<McpForUnityToolAttribute>();
                commandName = toolAttr.CommandName;
            }

            // Auto-generate command name if not explicitly provided
            if (string.IsNullOrEmpty(commandName))
            {
                commandName = ToSnakeCase(type.Name);
            }

            // Check for duplicate command names
            if (_handlers.ContainsKey(commandName))
            {
                McpLog.Warn(
                    $"Duplicate command name '{commandName}' detected. " +
                    $"{typeLabel} {type.Name} will override previously registered handler."
                );
            }

            // Find HandleCommand method
            var method = type.GetMethod(
                "HandleCommand",
                BindingFlags.Public | BindingFlags.Static,
                null,
                new[] { typeof(JObject) },
                null
            );

            if (method == null)
            {
                McpLog.Warn(
                    $"MCP {typeLabel} {type.Name} is marked with [McpForUnity{(isResource ? "Resource" : "Tool")}] " +
                    $"but has no public static HandleCommand(JObject) method"
                );
                return false;
            }

            try
            {
                HandlerInfo handlerInfo;

                if (typeof(Task).IsAssignableFrom(method.ReturnType))
                {
                    var asyncHandler = CreateAsyncHandlerDelegate(method, commandName);
                    handlerInfo = new HandlerInfo(commandName, null, asyncHandler);
                }
                else
                {
                    var handler = (Func<JObject, object>)Delegate.CreateDelegate(
                        typeof(Func<JObject, object>),
                        method
                    );
                    handlerInfo = new HandlerInfo(commandName, handler, null);
                }

                _handlers[commandName] = handlerInfo;
                return true;
            }
            catch (Exception ex)
            {
                McpLog.Error($"Failed to register {typeLabel} {type.Name}: {ex.Message}");
                return false;
            }
        }

        /// <summary>
        /// Get a command handler by name
        /// </summary>
        private static HandlerInfo GetHandlerInfo(string commandName)
        {
            if (!_handlers.TryGetValue(commandName, out var handler))
            {
                throw new InvalidOperationException(
                    $"Unknown or unsupported command type: {commandName}"
                );
            }
            return handler;
        }

        /// <summary>
        /// Get a synchronous command handler by name.
        /// Throws if the command is asynchronous.
        /// </summary>
        /// <param name="commandName"></param>
        /// <returns></returns>
        /// <exception cref="InvalidOperationException"></exception>
        public static Func<JObject, object> GetHandler(string commandName)
        {
            var handlerInfo = GetHandlerInfo(commandName);
            if (handlerInfo.IsAsync)
            {
                throw new InvalidOperationException(
                    $"Command '{commandName}' is asynchronous and must be executed via ExecuteCommand"
                );
            }

            return handlerInfo.SyncHandler;
        }

        /// <summary>
        /// Execute a command handler, supporting both synchronous and asynchronous (coroutine) handlers.
        /// If the handler returns an IEnumerator, it will be executed as a coroutine.
        /// </summary>
        /// <param name="commandName">The command name to execute</param>
        /// <param name="params">Command parameters</param>
        /// <param name="tcs">TaskCompletionSource to complete when async operation finishes</param>
        /// <returns>The result for synchronous commands, or null for async commands (TCS will be completed later)</returns>
        public static object ExecuteCommand(string commandName, JObject @params, TaskCompletionSource<string> tcs)
        {
            var handlerInfo = GetHandlerInfo(commandName);

            if (handlerInfo.IsAsync)
            {
                ExecuteAsyncHandler(handlerInfo, @params, commandName, tcs);
                return null;
            }

            if (handlerInfo.SyncHandler == null)
            {
                throw new InvalidOperationException($"Handler for '{commandName}' does not provide a synchronous implementation");
            }

            return handlerInfo.SyncHandler(@params);
        }

        /// <summary>
        /// Create a delegate for an async handler method that returns Task or Task<T>.
        /// The delegate will invoke the method and await its completion, returning the result.
        /// </summary>
        /// <param name="method"></param>
        /// <param name="commandName"></param>
        /// <returns></returns>
        /// <exception cref="InvalidOperationException"></exception>
        private static Func<JObject, Task<object>> CreateAsyncHandlerDelegate(MethodInfo method, string commandName)
        {
            return async (JObject parameters) =>
            {
                object rawResult;

                try
                {
                    rawResult = method.Invoke(null, new object[] { parameters });
                }
                catch (TargetInvocationException ex)
                {
                    throw ex.InnerException ?? ex;
                }

                if (rawResult == null)
                {
                    return null;
                }

                if (rawResult is not Task task)
                {
                    throw new InvalidOperationException(
                        $"Async handler '{commandName}' returned an object that is not a Task"
                    );
                }

                await task.ConfigureAwait(true);

                var taskType = task.GetType();
                if (taskType.IsGenericType)
                {
                    var resultProperty = taskType.GetProperty("Result");
                    if (resultProperty != null)
                    {
                        return resultProperty.GetValue(task);
                    }
                }

                return null;
            };
        }

        private static void ExecuteAsyncHandler(
            HandlerInfo handlerInfo,
            JObject parameters,
            string commandName,
            TaskCompletionSource<string> tcs)
        {
            if (handlerInfo.AsyncHandler == null)
            {
                throw new InvalidOperationException($"Async handler for '{commandName}' is not configured correctly");
            }

            Task<object> handlerTask;

            try
            {
                handlerTask = handlerInfo.AsyncHandler(parameters);
            }
            catch (Exception ex)
            {
                ReportAsyncFailure(commandName, tcs, ex);
                return;
            }

            if (handlerTask == null)
            {
                CompleteAsyncCommand(commandName, tcs, null);
                return;
            }

            async void AwaitHandler()
            {
                try
                {
                    var finalResult = await handlerTask.ConfigureAwait(true);
                    CompleteAsyncCommand(commandName, tcs, finalResult);
                }
                catch (Exception ex)
                {
                    ReportAsyncFailure(commandName, tcs, ex);
                }
            }

            AwaitHandler();
        }

        /// <summary>
        /// Complete the TaskCompletionSource for an async command with a success result.
        /// </summary>
        /// <param name="commandName"></param>
        /// <param name="tcs"></param>
        /// <param name="result"></param>
        private static void CompleteAsyncCommand(string commandName, TaskCompletionSource<string> tcs, object result)
        {
            try
            {
                var response = new { status = "success", result };
                string json = JsonConvert.SerializeObject(response);

                if (!tcs.TrySetResult(json))
                {
                    McpLog.Warn($"TCS for async command '{commandName}' was already completed");
                }
            }
            catch (Exception ex)
            {
                McpLog.Error($"Error completing async command '{commandName}': {ex.Message}\n{ex.StackTrace}");
                ReportAsyncFailure(commandName, tcs, ex);
            }
        }

        /// <summary>
        /// Report an error that occurred during async command execution.
        /// Completes the TaskCompletionSource with an error response.
        /// </summary>
        /// <param name="commandName"></param>
        /// <param name="tcs"></param>
        /// <param name="ex"></param>
        private static void ReportAsyncFailure(string commandName, TaskCompletionSource<string> tcs, Exception ex)
        {
            McpLog.Error($"Error in async command '{commandName}': {ex.Message}\n{ex.StackTrace}");

            var errorResponse = new
            {
                status = "error",
                error = ex.Message,
                command = commandName,
                stackTrace = ex.StackTrace
            };

            string json;
            try
            {
                json = JsonConvert.SerializeObject(errorResponse);
            }
            catch (Exception serializationEx)
            {
                McpLog.Error($"Failed to serialize error response for '{commandName}': {serializationEx.Message}");
                json = "{\"status\":\"error\",\"error\":\"Failed to complete command\"}";
            }

            if (!tcs.TrySetResult(json))
            {
                McpLog.Warn($"TCS for async command '{commandName}' was already completed when trying to report error");
            }
        }
    }
}

```

--------------------------------------------------------------------------------
/.claude/prompts/nl-unity-suite-t.md:
--------------------------------------------------------------------------------

```markdown
# Unity T Editing Suite — Additive Test Design
You are running inside CI for the `unity-mcp` repo. Use only the tools allowed by the workflow. Work autonomously; do not prompt the user. Do NOT spawn subagents.

**Print this once, verbatim, early in the run:**
AllowedTools: Write,mcp__unity__manage_editor,mcp__unity__list_resources,mcp__unity__read_resource,mcp__unity__apply_text_edits,mcp__unity__script_apply_edits,mcp__unity__validate_script,mcp__unity__find_in_file,mcp__unity__read_console,mcp__unity__get_sha

---

## Mission
1) Pick target file (prefer):
   - `unity://path/Assets/Scripts/LongUnityScriptClaudeTest.cs`
2) Execute T tests T-A..T-J in order using minimal, precise edits that build on the NL pass state.
3) Validate each edit with `mcp__unity__validate_script(level:"standard")`.
4) **Report**: write one `<testcase>` XML fragment per test to `reports/<TESTID>_results.xml`. Do **not** read or edit `$JUNIT_OUT`.

**CRITICAL XML FORMAT REQUIREMENTS:**
- Each file must contain EXACTLY one `<testcase>` root element
- NO prologue, epilogue, code fences, or extra characters
- NO markdown formatting or explanations outside the XML
- Use this exact format:

```xml
<testcase name="T-D — End-of-Class Helper" classname="UnityMCP.NL-T">
  <system-out><![CDATA[
(evidence of what was accomplished)
  ]]></system-out>
</testcase>
```

- If test fails, include: `<failure message="reason"/>`
- TESTID must be one of: T-A, T-B, T-C, T-D, T-E, T-F, T-G, T-H, T-I, T-J
5) **NO RESTORATION** - tests build additively on previous state.
6) **STRICT FRAGMENT EMISSION** - After each test, immediately emit a clean XML file under `reports/<TESTID>_results.xml` with exactly one `<testcase>` whose `name` begins with the exact test id. No prologue/epilogue or fences. If the test fails, include a `<failure message="..."/>` and still emit.

---

## Environment & Paths (CI)
- Always pass: `project_root: "TestProjects/UnityMCPTests"` and `ctx: {}` on list/read/edit/validate.
- **Canonical URIs only**:
  - Primary: `unity://path/Assets/...` (never embed `project_root` in the URI)
  - Relative (when supported): `Assets/...`

CI provides:
- `$JUNIT_OUT=reports/junit-nl-suite.xml` (pre‑created; leave alone)
- `$MD_OUT=reports/junit-nl-suite.md` (synthesized from JUnit)

---

## Transcript Minimization Rules
- Do not restate tool JSON; summarize in ≤ 2 short lines.
- Never paste full file contents. For matches, include only the matched line and ±1 line.
- Prefer `mcp__unity__find_in_file` for targeting; avoid `mcp__unity__read_resource` unless strictly necessary. If needed, limit to `head_bytes ≤ 256` or `tail_lines ≤ 10`.
- Per‑test `system-out` ≤ 400 chars: brief status only (no SHA).
- Console evidence: fetch the last 10 lines with `include_stacktrace:false` and include ≤ 3 lines in the fragment.
- Avoid quoting multi‑line diffs; reference markers instead.
— Console scans: perform two reads — last 10 `log/info` lines and up to 3 `error` entries (use `include_stacktrace:false`); include ≤ 3 lines total in the fragment; if no errors, state "no errors".
— Final check is folded into T‑J: perform an errors‑only scan (with `include_stacktrace:false`) and include a single "no errors" line or up to 3 error lines within the T‑J fragment.

---

## Tool Mapping
- **Anchors/regex/structured**: `mcp__unity__script_apply_edits`
  - Allowed ops: `anchor_insert`, `replace_method`, `insert_method`, `delete_method`, `regex_replace`
  - For `anchor_insert`, always set `"position": "before"` or `"after"`.
- **Precise ranges / atomic batch**: `mcp__unity__apply_text_edits` (non‑overlapping ranges)
STRICT OP GUARDRAILS
- Do not use `anchor_replace`. Structured edits must be one of: `anchor_insert`, `replace_method`, `insert_method`, `delete_method`, `regex_replace`.
- For multi‑spot textual tweaks in one operation, compute non‑overlapping ranges with `mcp__unity__find_in_file` and use `mcp__unity__apply_text_edits`.

- **Hash-only**: `mcp__unity__get_sha` — returns `{sha256,lengthBytes,lastModifiedUtc}` without file body
- **Validation**: `mcp__unity__validate_script(level:"standard")`
- **Dynamic targeting**: Use `mcp__unity__find_in_file` to locate current positions of methods/markers

---

## Additive Test Design Principles

**Key Changes from Reset-Based:**
1. **Dynamic Targeting**: Use `find_in_file` to locate methods/content, never hardcode line numbers
2. **State Awareness**: Each test expects the file state left by the previous test
3. **Content-Based Operations**: Target methods by signature, classes by name, not coordinates
4. **Cumulative Validation**: Ensure the file remains structurally sound throughout the sequence
5. **Composability**: Tests demonstrate how operations work together in real workflows

**State Tracking:**
- Track file SHA after each test (`mcp__unity__get_sha`) and use it as a precondition
  for `apply_text_edits` in T‑F/T‑G/T‑I to exercise `stale_file` semantics. Do not include SHA values in report fragments.
- Use content signatures (method names, comment markers) to verify expected state
- Validate structural integrity after each major change

---

### T-A. Temporary Helper Lifecycle (Returns to State C)
**Goal**: Test insert → verify → delete cycle for temporary code
**Actions**:
- Find current position of `GetCurrentTarget()` method (may have shifted from NL-2 comment)
- Insert temporary helper: `private int __TempHelper(int a, int b) => a + b;`
- Verify helper method exists and compiles
- Delete helper method via structured delete operation
- **Expected final state**: Return to State C (helper removed, other changes intact)

### Late-Test Editing Rule
- When modifying a method body, use `mcp__unity__script_apply_edits`. If the method is expression-bodied (`=>`), convert it to a block or replace the whole method definition. After the edit, run `mcp__unity__validate_script` and rollback on error. Use `//` comments in inserted code.

### T-B. Method Body Interior Edit (Additive State D)
**Goal**: Edit method interior without affecting structure, on modified file
**Actions**:
- Use `find_in_file` to locate current `HasTarget()` method (modified in NL-1)
- Edit method body interior: change return statement to `return true; /* test modification */`
- Validate with `mcp__unity__validate_script(level:"standard")` for consistency
- Verify edit succeeded and file remains balanced
- **Expected final state**: State C + modified HasTarget() body

### T-C. Different Method Interior Edit (Additive State E)
**Goal**: Edit a different method to show operations don't interfere
**Actions**:
- Locate `ApplyBlend()` method using content search
- Edit interior line to add null check: `if (animator == null) return; // safety check`
- Preserve method signature and structure  
- **Expected final state**: State D + modified ApplyBlend() method

### T-D. End-of-Class Helper (Additive State F)
**Goal**: Add permanent helper method at class end
**Actions**:
- Use smart anchor matching to find current class-ending brace (after NL-3 tail comments)
- Insert permanent helper before class brace: `private void TestHelper() { /* placeholder */ }`
- Validate with `mcp__unity__validate_script(level:"standard")`
- **IMMEDIATELY** write clean XML fragment to `reports/T-D_results.xml` (no extra text). The `<testcase name>` must start with `T-D`. Include brief evidence in `system-out`.
- **Expected final state**: State E + TestHelper() method before class end

### T-E. Method Evolution Lifecycle (Additive State G)
**Goal**: Insert → modify → finalize a field + companion method
**Actions**:
- Insert field: `private int Counter = 0;`
- Update it: find and replace with `private int Counter = 42; // initialized`
- Add companion method: `private void IncrementCounter() { Counter++; }`
- **Expected final state**: State F + Counter field + IncrementCounter() method

### T-F. Atomic Multi-Edit (Additive State H)
**Goal**: Multiple coordinated edits in single atomic operation
**Actions**:
- Read current file state to compute precise ranges
- Atomic edit combining:
  1. Add comment in `HasTarget()`: `// validated access`  
  2. Add comment in `ApplyBlend()`: `// safe animation`
  3. Add final class comment: `// end of test modifications`
- All edits computed from same file snapshot, applied atomically
- **Expected final state**: State G + three coordinated comments
- After applying the atomic edits, run `validate_script(level:"standard")` and emit a clean fragment to `reports/T-F_results.xml` with a short summary.

### T-G. Path Normalization Test (No State Change)
**Goal**: Verify URI forms work equivalently on modified file
**Actions**:
- Make identical edit using `unity://path/Assets/Scripts/LongUnityScriptClaudeTest.cs`
- Then using `Assets/Scripts/LongUnityScriptClaudeTest.cs` 
- Second should return `stale_file`, retry with updated SHA
- Verify both URI forms target same file
- **Expected final state**: State H (no content change, just path testing)
- Emit `reports/T-G_results.xml` showing evidence of stale SHA handling.

### T-H. Validation on Modified File (No State Change)
**Goal**: Ensure validation works correctly on heavily modified file
**Actions**:
- Run `validate_script(level:"standard")` on current state
- Verify no structural errors despite extensive modifications
- **Expected final state**: State H (validation only, no edits)
- Emit `reports/T-H_results.xml` confirming validation OK.

### T-I. Failure Surface Testing (No State Change)
**Goal**: Test error handling on real modified file
**Actions**:
- Attempt overlapping edits (should fail cleanly)
- Attempt edit with stale SHA (should fail cleanly) 
- Verify error responses are informative
- **Expected final state**: State H (failed operations don't modify file)
- Emit `reports/T-I_results.xml` capturing error evidence; file must contain one `<testcase>`.

### T-J. Idempotency on Modified File (Additive State I)
**Goal**: Verify operations behave predictably when repeated
**Actions**:
- **Insert (structured)**: `mcp__unity__script_apply_edits` with:
  `{"op":"anchor_insert","anchor":"// Tail test C","position":"after","text":"\n    // idempotency test marker"}`
- **Insert again** (same op) → expect `no_op: true`.
- **Remove (structured)**: `{"op":"regex_replace","pattern":"(?m)^\\s*// idempotency test marker\\r?\\n?","text":""}`
- **Remove again** (same `regex_replace`) → expect `no_op: true`.
- `mcp__unity__validate_script(level:"standard")`
- Perform a final console scan for errors/exceptions (errors only, up to 3); include "no errors" if none
- **IMMEDIATELY** write clean XML fragment to `reports/T-J_results.xml` with evidence of both `no_op: true` outcomes and the console result. The `<testcase name>` must start with `T-J`.
- **Expected final state**: State H + verified idempotent behavior

---

## Dynamic Targeting Examples

**Instead of hardcoded coordinates:**
```json
{"startLine": 31, "startCol": 26, "endLine": 31, "endCol": 58}
```

**Use content-aware targeting:**
```json
# Find current method location
find_in_file(pattern: "public bool HasTarget\\(\\)")
# Then compute edit ranges from found position
```

**Method targeting by signature:**
```json
{"op": "replace_method", "className": "LongUnityScriptClaudeTest", "methodName": "HasTarget"}
```

**Anchor-based insertions:**
```json  
{"op": "anchor_insert", "anchor": "private void Update\\(\\)", "position": "before", "text": "// comment"}
```

---

## State Verification Patterns

**After each test:**
1. Verify expected content exists: `find_in_file` for key markers
2. Check structural integrity: `validate_script(level:"standard")`  
3. Update SHA tracking for next test's preconditions
4. Emit a per‑test fragment to `reports/<TESTID>_results.xml` immediately. If the test failed, still write a single `<testcase>` with a `<failure message="..."/>` and evidence in `system-out`.
5. Log cumulative changes in test evidence (keep concise per Transcript Minimization Rules; never paste raw tool JSON)

**Error Recovery:**
- If test fails, log current state but continue (don't restore)
- Next test adapts to actual current state, not expected state
- Demonstrates resilience of operations on varied file conditions

---

## Benefits of Additive Design

1. **Realistic Workflows**: Tests mirror actual development patterns
2. **Robust Operations**: Proves edits work on evolving files, not just pristine baselines  
3. **Composability Validation**: Shows operations coordinate well together
4. **Simplified Infrastructure**: No restore scripts or snapshots needed
5. **Better Failure Analysis**: Failures don't cascade - each test adapts to current reality
6. **State Evolution Testing**: Validates SDK handles cumulative file modifications correctly

This additive approach produces a more realistic and maintainable test suite that better represents actual SDK usage patterns.

---

BAN ON EXTRA TOOLS AND DIRS
- Do not use any tools outside `AllowedTools`. Do not create directories; assume `reports/` exists.

---

## XML Fragment Templates (T-F .. T-J)

Use these skeletons verbatim as a starting point. Replace the bracketed placeholders with your evidence. Ensure each file contains exactly one `<testcase>` element and that the `name` begins with the exact test id.

```xml
<testcase name="T-F — Atomic Multi-Edit" classname="UnityMCP.NL-T">
  <system-out><![CDATA[
Applied 3 non-overlapping edits in one atomic call:
- HasTarget(): added "// validated access"
- ApplyBlend(): added "// safe animation"
- End-of-class: added "// end of test modifications"
validate_script: OK
  ]]></system-out>
</testcase>
```

```xml
<testcase name="T-G — Path Normalization Test" classname="UnityMCP.NL-T">
  <system-out><![CDATA[
Edit via unity://path/... succeeded.
Same edit via Assets/... returned stale_file, retried with updated hash: OK.
  ]]></system-out>
</testcase>
```

```xml
<testcase name="T-H — Validation on Modified File" classname="UnityMCP.NL-T">
  <system-out><![CDATA[
validate_script(level:"standard"): OK on the modified file.
  ]]></system-out>
</testcase>
```

```xml
<testcase name="T-I — Failure Surface Testing" classname="UnityMCP.NL-T">
  <system-out><![CDATA[
Overlapping edit: failed cleanly (error captured).
Stale hash edit: failed cleanly (error captured).
File unchanged.
  ]]></system-out>
</testcase>
```

```xml
<testcase name="T-J — Idempotency on Modified File" classname="UnityMCP.NL-T">
  <system-out><![CDATA[
Insert marker after "// Tail test C": OK.
Insert same marker again: no_op: true.
regex_remove marker: OK.
regex_remove again: no_op: true.
validate_script: OK.
  ]]></system-out>
</testcase>

```

--------------------------------------------------------------------------------
/tools/stress_mcp.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import asyncio
import argparse
import json
import os
import struct
import time
from pathlib import Path
import random
import sys


TIMEOUT = float(os.environ.get("MCP_STRESS_TIMEOUT", "2.0"))
DEBUG = os.environ.get("MCP_STRESS_DEBUG", "").lower() in ("1", "true", "yes")


def dlog(*args):
    if DEBUG:
        print(*args, file=sys.stderr)


def find_status_files() -> list[Path]:
    home = Path.home()
    status_dir = Path(os.environ.get(
        "UNITY_MCP_STATUS_DIR", home / ".unity-mcp"))
    if not status_dir.exists():
        return []
    return sorted(status_dir.glob("unity-mcp-status-*.json"), key=lambda p: p.stat().st_mtime, reverse=True)


def discover_port(project_path: str | None) -> int:
    # Default bridge port if nothing found
    default_port = 6400
    files = find_status_files()
    for f in files:
        try:
            data = json.loads(f.read_text())
            port = int(data.get("unity_port", 0) or 0)
            proj = data.get("project_path") or ""
            if project_path:
                # Match status for the given project if possible
                if proj and project_path in proj:
                    if 0 < port < 65536:
                        return port
            else:
                if 0 < port < 65536:
                    return port
        except Exception:
            pass
    return default_port


async def read_exact(reader: asyncio.StreamReader, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = await reader.read(n - len(buf))
        if not chunk:
            raise ConnectionError("Connection closed while reading")
        buf += chunk
    return buf


async def read_frame(reader: asyncio.StreamReader) -> bytes:
    header = await read_exact(reader, 8)
    (length,) = struct.unpack(">Q", header)
    if length <= 0 or length > (64 * 1024 * 1024):
        raise ValueError(f"Invalid frame length: {length}")
    return await read_exact(reader, length)


async def write_frame(writer: asyncio.StreamWriter, payload: bytes) -> None:
    header = struct.pack(">Q", len(payload))
    writer.write(header)
    writer.write(payload)
    await asyncio.wait_for(writer.drain(), timeout=TIMEOUT)


async def do_handshake(reader: asyncio.StreamReader) -> None:
    # Server sends a single line handshake: "WELCOME UNITY-MCP 1 FRAMING=1\n"
    line = await reader.readline()
    if not line or b"WELCOME UNITY-MCP" not in line:
        raise ConnectionError(f"Unexpected handshake from server: {line!r}")


def make_ping_frame() -> bytes:
    return b"ping"


def make_execute_menu_item(menu_path: str) -> bytes:
    # Retained for manual debugging; not used in normal stress runs
    payload = {"type": "execute_menu_item", "params": {
        "action": "execute", "menu_path": menu_path}}
    return json.dumps(payload).encode("utf-8")


async def client_loop(idx: int, host: str, port: int, stop_time: float, stats: dict):
    reconnect_delay = 0.2
    while time.time() < stop_time:
        writer = None
        try:
            # slight stagger to prevent burst synchronization across clients
            await asyncio.sleep(0.003 * (idx % 11))
            reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=TIMEOUT)
            await asyncio.wait_for(do_handshake(reader), timeout=TIMEOUT)
            # Send a quick ping first
            await write_frame(writer, make_ping_frame())
            # ignore content
            _ = await asyncio.wait_for(read_frame(reader), timeout=TIMEOUT)

            # Main activity loop (keep-alive + light load). Edit spam handled by reload_churn_task.
            while time.time() < stop_time:
                # Ping-only; edits are sent via reload_churn_task to avoid console spam
                await write_frame(writer, make_ping_frame())
                _ = await asyncio.wait_for(read_frame(reader), timeout=TIMEOUT)
                stats["pings"] += 1
                await asyncio.sleep(0.02 + random.uniform(-0.003, 0.003))

        except (ConnectionError, OSError, asyncio.IncompleteReadError, asyncio.TimeoutError):
            stats["disconnects"] += 1
            dlog(f"[client {idx}] disconnect/backoff {reconnect_delay}s")
            await asyncio.sleep(reconnect_delay)
            reconnect_delay = min(reconnect_delay * 1.5, 2.0)
            continue
        except Exception:
            stats["errors"] += 1
            dlog(f"[client {idx}] unexpected error")
            await asyncio.sleep(0.2)
            continue
        finally:
            if writer is not None:
                try:
                    writer.close()
                    await writer.wait_closed()
                except Exception:
                    pass


async def reload_churn_task(project_path: str, stop_time: float, unity_file: str | None, host: str, port: int, stats: dict, storm_count: int = 1):
    # Use script edit tool to touch a C# file, which triggers compilation reliably
    path = Path(unity_file) if unity_file else None
    seq = 0
    proj_root = Path(project_path).resolve() if project_path else None
    # Build candidate list for storm mode
    candidates: list[Path] = []
    if proj_root:
        try:
            for p in (proj_root / "Assets").rglob("*.cs"):
                candidates.append(p.resolve())
        except Exception:
            candidates = []
    if path and path.exists():
        rp = path.resolve()
        if rp not in candidates:
            candidates.append(rp)
    while time.time() < stop_time:
        try:
            if path and path.exists():
                # Determine files to touch this cycle
                targets: list[Path]
                if storm_count and storm_count > 1 and candidates:
                    k = min(max(1, storm_count), len(candidates))
                    targets = random.sample(candidates, k)
                else:
                    targets = [path]

                for tpath in targets:
                    # Build a tiny ApplyTextEdits request that toggles a trailing comment
                    relative = None
                    try:
                        # Derive Unity-relative path under Assets/ (cross-platform)
                        resolved = tpath.resolve()
                        parts = list(resolved.parts)
                        if "Assets" in parts:
                            i = parts.index("Assets")
                            relative = Path(*parts[i:]).as_posix()
                        elif proj_root and str(resolved).startswith(str(proj_root)):
                            rel = resolved.relative_to(proj_root)
                            parts2 = list(rel.parts)
                            if "Assets" in parts2:
                                i2 = parts2.index("Assets")
                                relative = Path(*parts2[i2:]).as_posix()
                    except Exception:
                        relative = None

                    if relative:
                        # Derive name and directory for ManageScript and compute precondition SHA + EOF position
                        name_base = Path(relative).stem
                        dir_path = str(
                            Path(relative).parent).replace('\\', '/')

                        # 1) Read current contents via manage_script.read to compute SHA and true EOF location
                        contents = None
                        read_success = False
                        for attempt in range(3):
                            writer = None
                            try:
                                reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=TIMEOUT)
                                await asyncio.wait_for(do_handshake(reader), timeout=TIMEOUT)
                                read_payload = {
                                    "type": "manage_script",
                                    "params": {
                                        "action": "read",
                                        "name": name_base,
                                        "path": dir_path
                                    }
                                }
                                await write_frame(writer, json.dumps(read_payload).encode("utf-8"))
                                resp = await asyncio.wait_for(read_frame(reader), timeout=TIMEOUT)

                                read_obj = json.loads(
                                    resp.decode("utf-8", errors="ignore"))
                                result = read_obj.get("result", read_obj) if isinstance(
                                    read_obj, dict) else {}
                                if result.get("success"):
                                    data_obj = result.get("data", {})
                                    contents = data_obj.get("contents") or ""
                                    read_success = True
                                    break
                            except Exception:
                                # retry with backoff
                                await asyncio.sleep(0.2 * (2 ** attempt) + random.uniform(0.0, 0.1))
                            finally:
                                if 'writer' in locals() and writer is not None:
                                    try:
                                        writer.close()
                                        await writer.wait_closed()
                                    except Exception:
                                        pass

                        if not read_success or contents is None:
                            stats["apply_errors"] = stats.get(
                                "apply_errors", 0) + 1
                            await asyncio.sleep(0.5)
                            continue

                        # Compute SHA and EOF insertion point
                        import hashlib
                        sha = hashlib.sha256(
                            contents.encode("utf-8")).hexdigest()
                        lines = contents.splitlines(keepends=True)
                        # Insert at true EOF (safe against header guards)
                        end_line = len(lines) + 1  # 1-based exclusive end
                        end_col = 1

                        # Build a unique marker append; ensure it begins with a newline if needed
                        marker = f"// MCP_STRESS seq={seq} time={int(time.time())}"
                        seq += 1
                        insert_text = ("\n" if not contents.endswith(
                            "\n") else "") + marker + "\n"

                        # 2) Apply text edits with immediate refresh and precondition
                        apply_payload = {
                            "type": "manage_script",
                            "params": {
                                "action": "apply_text_edits",
                                "name": name_base,
                                "path": dir_path,
                                "edits": [
                                    {
                                        "startLine": end_line,
                                        "startCol": end_col,
                                        "endLine": end_line,
                                        "endCol": end_col,
                                        "newText": insert_text
                                    }
                                ],
                                "precondition_sha256": sha,
                                "options": {"refresh": "immediate", "validate": "standard"}
                            }
                        }

                        apply_success = False
                        for attempt in range(3):
                            writer = None
                            try:
                                reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=TIMEOUT)
                                await asyncio.wait_for(do_handshake(reader), timeout=TIMEOUT)
                                await write_frame(writer, json.dumps(apply_payload).encode("utf-8"))
                                resp = await asyncio.wait_for(read_frame(reader), timeout=TIMEOUT)
                                try:
                                    data = json.loads(resp.decode(
                                        "utf-8", errors="ignore"))
                                    result = data.get("result", data) if isinstance(
                                        data, dict) else {}
                                    ok = bool(result.get("success", False))
                                    if ok:
                                        stats["applies"] = stats.get(
                                            "applies", 0) + 1
                                        apply_success = True
                                        break
                                except Exception:
                                    # fall through to retry
                                    pass
                            except Exception:
                                # retry with backoff
                                await asyncio.sleep(0.2 * (2 ** attempt) + random.uniform(0.0, 0.1))
                            finally:
                                if 'writer' in locals() and writer is not None:
                                    try:
                                        writer.close()
                                        await writer.wait_closed()
                                    except Exception:
                                        pass
                        if not apply_success:
                            stats["apply_errors"] = stats.get(
                                "apply_errors", 0) + 1

        except Exception:
            pass
        await asyncio.sleep(1.0)


async def main():
    ap = argparse.ArgumentParser(
        description="Stress test MCP for Unity with concurrent clients and reload churn")
    ap.add_argument("--host", default="127.0.0.1")
    ap.add_argument("--project", default=str(
        Path(__file__).resolve().parents[1] / "TestProjects" / "UnityMCPTests"))
    ap.add_argument("--unity-file", default=str(Path(__file__).resolve(
    ).parents[1] / "TestProjects" / "UnityMCPTests" / "Assets" / "Scripts" / "LongUnityScriptClaudeTest.cs"))
    ap.add_argument("--clients", type=int, default=10)
    ap.add_argument("--duration", type=int, default=60)
    ap.add_argument("--storm-count", type=int, default=1,
                    help="Number of scripts to touch each cycle")
    args = ap.parse_args()

    port = discover_port(args.project)
    stop_time = time.time() + max(10, args.duration)

    stats = {"pings": 0, "menus": 0, "mods": 0, "disconnects": 0, "errors": 0}
    tasks = []

    # Spawn clients
    for i in range(max(1, args.clients)):
        tasks.append(asyncio.create_task(
            client_loop(i, args.host, port, stop_time, stats)))

    # Spawn reload churn task
    tasks.append(asyncio.create_task(reload_churn_task(args.project, stop_time,
                 args.unity_file, args.host, port, stats, storm_count=args.storm_count)))

    await asyncio.gather(*tasks, return_exceptions=True)
    print(json.dumps({"port": port, "stats": stats}, indent=2))


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

```

--------------------------------------------------------------------------------
/TestProjects/UnityMCPTests/Assets/Tests/EditMode/Tools/ManageGameObjectTests.cs:
--------------------------------------------------------------------------------

```csharp
using System;
using System.Collections.Generic;
using NUnit.Framework;
using UnityEngine;
using UnityEditor;
using UnityEngine.TestTools;
using Newtonsoft.Json.Linq;
using MCPForUnity.Editor.Tools;

namespace MCPForUnityTests.Editor.Tools
{
    public class ManageGameObjectTests
    {
        private GameObject testGameObject;

        [SetUp]
        public void SetUp()
        {
            // Create a test GameObject for each test
            testGameObject = new GameObject("TestObject");
        }

        [TearDown]
        public void TearDown()
        {
            // Clean up test GameObject
            if (testGameObject != null)
            {
                UnityEngine.Object.DestroyImmediate(testGameObject);
            }
        }

        [Test]
        public void HandleCommand_ReturnsError_ForNullParams()
        {
            var result = ManageGameObject.HandleCommand(null);

            Assert.IsNotNull(result, "Should return a result object");
            // Note: Actual error checking would need access to Response structure
        }

        [Test]
        public void HandleCommand_ReturnsError_ForEmptyParams()
        {
            var emptyParams = new JObject();
            var result = ManageGameObject.HandleCommand(emptyParams);

            Assert.IsNotNull(result, "Should return a result object for empty params");
        }

        [Test]
        public void HandleCommand_ProcessesValidCreateAction()
        {
            var createParams = new JObject
            {
                ["action"] = "create",
                ["name"] = "TestCreateObject"
            };

            var result = ManageGameObject.HandleCommand(createParams);

            Assert.IsNotNull(result, "Should return a result for valid create action");

            // Clean up - find and destroy the created object
            var createdObject = GameObject.Find("TestCreateObject");
            if (createdObject != null)
            {
                UnityEngine.Object.DestroyImmediate(createdObject);
            }
        }

        [Test]
        public void ComponentResolver_Integration_WorksWithRealComponents()
        {
            // Test that our ComponentResolver works with actual Unity components
            var transformResult = ComponentResolver.TryResolve("Transform", out Type transformType, out string error);

            Assert.IsTrue(transformResult, "Should resolve Transform component");
            Assert.AreEqual(typeof(Transform), transformType, "Should return correct Transform type");
            Assert.IsEmpty(error, "Should have no error for valid component");
        }

        [Test]
        public void ComponentResolver_Integration_WorksWithBuiltInComponents()
        {
            var components = new[]
            {
                ("Rigidbody", typeof(Rigidbody)),
                ("Collider", typeof(Collider)),
                ("Renderer", typeof(Renderer)),
                ("Camera", typeof(Camera)),
                ("Light", typeof(Light))
            };

            foreach (var (componentName, expectedType) in components)
            {
                var result = ComponentResolver.TryResolve(componentName, out Type actualType, out string error);

                // Some components might not resolve (abstract classes), but the method should handle gracefully
                if (result)
                {
                    Assert.IsTrue(expectedType.IsAssignableFrom(actualType),
                        $"{componentName} should resolve to assignable type");
                }
                else
                {
                    Assert.IsNotEmpty(error, $"Should have error message for {componentName}");
                }
            }
        }

        [Test]
        public void PropertyMatching_Integration_WorksWithRealGameObject()
        {
            // Add a Rigidbody to test real property matching
            var rigidbody = testGameObject.AddComponent<Rigidbody>();

            var properties = ComponentResolver.GetAllComponentProperties(typeof(Rigidbody));

            Assert.IsNotEmpty(properties, "Rigidbody should have properties");
            Assert.Contains("mass", properties, "Rigidbody should have mass property");
            Assert.Contains("useGravity", properties, "Rigidbody should have useGravity property");

            // Test AI suggestions
            var suggestions = ComponentResolver.GetAIPropertySuggestions("Use Gravity", properties);
            Assert.Contains("useGravity", suggestions, "Should suggest useGravity for 'Use Gravity'");
        }

        [Test]
        public void PropertyMatching_HandlesMonoBehaviourProperties()
        {
            var properties = ComponentResolver.GetAllComponentProperties(typeof(MonoBehaviour));

            Assert.IsNotEmpty(properties, "MonoBehaviour should have properties");
            Assert.Contains("enabled", properties, "MonoBehaviour should have enabled property");
            Assert.Contains("name", properties, "MonoBehaviour should have name property");
            Assert.Contains("tag", properties, "MonoBehaviour should have tag property");
        }

        [Test]
        public void PropertyMatching_HandlesCaseVariations()
        {
            var testProperties = new List<string> { "maxReachDistance", "playerHealth", "movementSpeed" };

            var testCases = new[]
            {
                ("max reach distance", "maxReachDistance"),
                ("Max Reach Distance", "maxReachDistance"),
                ("MAX_REACH_DISTANCE", "maxReachDistance"),
                ("player health", "playerHealth"),
                ("movement speed", "movementSpeed")
            };

            foreach (var (input, expected) in testCases)
            {
                var suggestions = ComponentResolver.GetAIPropertySuggestions(input, testProperties);
                Assert.Contains(expected, suggestions, $"Should suggest {expected} for input '{input}'");
            }
        }

        [Test]
        public void ErrorHandling_ReturnsHelpfulMessages()
        {
            // This test verifies that error messages are helpful and contain suggestions
            var testProperties = new List<string> { "mass", "velocity", "drag", "useGravity" };
            var suggestions = ComponentResolver.GetAIPropertySuggestions("weight", testProperties);

            // Even if no perfect match, should return valid list
            Assert.IsNotNull(suggestions, "Should return valid suggestions list");

            // Test with completely invalid input
            var badSuggestions = ComponentResolver.GetAIPropertySuggestions("xyz123invalid", testProperties);
            Assert.IsNotNull(badSuggestions, "Should handle invalid input gracefully");
        }

        [Test]
        public void PerformanceTest_CachingWorks()
        {
            var properties = ComponentResolver.GetAllComponentProperties(typeof(Transform));
            var input = "Test Property Name";

            // First call - populate cache
            var startTime = System.DateTime.UtcNow;
            var suggestions1 = ComponentResolver.GetAIPropertySuggestions(input, properties);
            var firstCallTime = (System.DateTime.UtcNow - startTime).TotalMilliseconds;

            // Second call - should use cache
            startTime = System.DateTime.UtcNow;
            var suggestions2 = ComponentResolver.GetAIPropertySuggestions(input, properties);
            var secondCallTime = (System.DateTime.UtcNow - startTime).TotalMilliseconds;

            Assert.AreEqual(suggestions1.Count, suggestions2.Count, "Cached results should be identical");
            CollectionAssert.AreEqual(suggestions1, suggestions2, "Cached results should match exactly");

            // Second call should be faster (though this test might be flaky)
            Assert.LessOrEqual(secondCallTime, firstCallTime * 2, "Cached call should not be significantly slower");
        }

        [Test]
        public void SetComponentProperties_CollectsAllFailuresAndAppliesValidOnes()
        {
            // Arrange - add Transform and Rigidbody components to test with
            var transform = testGameObject.transform;
            var rigidbody = testGameObject.AddComponent<Rigidbody>();

            // Create a params object with mixed valid and invalid properties
            var setPropertiesParams = new JObject
            {
                ["action"] = "modify",
                ["target"] = testGameObject.name,
                ["search_method"] = "by_name",
                ["componentProperties"] = new JObject
                {
                    ["Transform"] = new JObject
                    {
                        ["localPosition"] = new JObject { ["x"] = 1.0f, ["y"] = 2.0f, ["z"] = 3.0f },  // Valid
                        ["rotatoin"] = new JObject { ["x"] = 0.0f, ["y"] = 90.0f, ["z"] = 0.0f }, // Invalid (typo - should be rotation)
                        ["localScale"] = new JObject { ["x"] = 2.0f, ["y"] = 2.0f, ["z"] = 2.0f }      // Valid
                    },
                    ["Rigidbody"] = new JObject
                    {
                        ["mass"] = 5.0f,            // Valid
                        ["invalidProp"] = "test",   // Invalid - doesn't exist
                        ["useGravity"] = true       // Valid
                    }
                }
            };

            // Store original values to verify changes  
            var originalLocalPosition = transform.localPosition;
            var originalLocalScale = transform.localScale;
            var originalMass = rigidbody.mass;
            var originalUseGravity = rigidbody.useGravity;

            Debug.Log($"BEFORE TEST - Mass: {rigidbody.mass}, UseGravity: {rigidbody.useGravity}");

            // Expect the warning logs from the invalid properties
            LogAssert.Expect(LogType.Warning, new System.Text.RegularExpressions.Regex("Property 'rotatoin' not found"));
            LogAssert.Expect(LogType.Warning, new System.Text.RegularExpressions.Regex("Property 'invalidProp' not found"));

            // Act
            var result = ManageGameObject.HandleCommand(setPropertiesParams);

            Debug.Log($"AFTER TEST - Mass: {rigidbody.mass}, UseGravity: {rigidbody.useGravity}");
            Debug.Log($"AFTER TEST - LocalPosition: {transform.localPosition}");
            Debug.Log($"AFTER TEST - LocalScale: {transform.localScale}");

            // Assert - verify that valid properties were set despite invalid ones
            Assert.AreEqual(new Vector3(1.0f, 2.0f, 3.0f), transform.localPosition,
                "Valid localPosition should be set even with other invalid properties");
            Assert.AreEqual(new Vector3(2.0f, 2.0f, 2.0f), transform.localScale,
                "Valid localScale should be set even with other invalid properties");
            Assert.AreEqual(5.0f, rigidbody.mass, 0.001f,
                "Valid mass should be set even with other invalid properties");
            Assert.AreEqual(true, rigidbody.useGravity,
                "Valid useGravity should be set even with other invalid properties");

            // Verify the result indicates errors (since we had invalid properties)
            Assert.IsNotNull(result, "Should return a result object");

            // The collect-and-continue behavior means we should get an error response 
            // that contains info about the failed properties, but valid ones were still applied
            // This proves the collect-and-continue behavior is working

            // Harden: verify structured error response with failures list contains both invalid fields
            var successProp = result.GetType().GetProperty("success");
            Assert.IsNotNull(successProp, "Result should expose 'success' property");
            Assert.IsFalse((bool)successProp.GetValue(result), "Result.success should be false for partial failure");

            var dataProp = result.GetType().GetProperty("data");
            Assert.IsNotNull(dataProp, "Result should include 'data' with errors");
            var dataVal = dataProp.GetValue(result);
            Assert.IsNotNull(dataVal, "Result.data should not be null");
            var errorsProp = dataVal.GetType().GetProperty("errors");
            Assert.IsNotNull(errorsProp, "Result.data should include 'errors' list");
            var errorsEnum = errorsProp.GetValue(dataVal) as System.Collections.IEnumerable;
            Assert.IsNotNull(errorsEnum, "errors should be enumerable");

            bool foundRotatoin = false;
            bool foundInvalidProp = false;
            foreach (var err in errorsEnum)
            {
                string s = err?.ToString() ?? string.Empty;
                if (s.Contains("rotatoin")) foundRotatoin = true;
                if (s.Contains("invalidProp")) foundInvalidProp = true;
            }
            Assert.IsTrue(foundRotatoin, "errors should mention the misspelled 'rotatoin' property");
            Assert.IsTrue(foundInvalidProp, "errors should mention the 'invalidProp' property");
        }

        [Test]
        public void SetComponentProperties_ContinuesAfterException()
        {
            // Arrange - create scenario that might cause exceptions
            var rigidbody = testGameObject.AddComponent<Rigidbody>();

            // Set initial values that we'll change
            rigidbody.mass = 1.0f;
            rigidbody.useGravity = true;

            var setPropertiesParams = new JObject
            {
                ["action"] = "modify",
                ["target"] = testGameObject.name,
                ["search_method"] = "by_name",
                ["componentProperties"] = new JObject
                {
                    ["Rigidbody"] = new JObject
                    {
                        ["mass"] = 2.5f,                    // Valid - should be set
                        ["velocity"] = "invalid_type",      // Invalid type - will cause exception  
                        ["useGravity"] = false              // Valid - should still be set after exception
                    }
                }
            };

            // Expect the error logs from the invalid property
            LogAssert.Expect(LogType.Error, new System.Text.RegularExpressions.Regex("Unexpected error converting token to UnityEngine.Vector3"));
            LogAssert.Expect(LogType.Error, new System.Text.RegularExpressions.Regex("SetProperty.*Failed to set 'velocity'"));
            LogAssert.Expect(LogType.Warning, new System.Text.RegularExpressions.Regex("Property 'velocity' not found"));

            // Act
            var result = ManageGameObject.HandleCommand(setPropertiesParams);

            // Assert - verify that valid properties before AND after the exception were still set
            Assert.AreEqual(2.5f, rigidbody.mass, 0.001f,
                "Mass should be set even if later property causes exception");
            Assert.AreEqual(false, rigidbody.useGravity,
                "UseGravity should be set even if previous property caused exception");

            Assert.IsNotNull(result, "Should return a result even with exceptions");

            // The key test: processing continued after the exception and set useGravity
            // This proves the collect-and-continue behavior works even with exceptions

            // Harden: verify structured error response contains velocity failure
            var successProp2 = result.GetType().GetProperty("success");
            Assert.IsNotNull(successProp2, "Result should expose 'success' property");
            Assert.IsFalse((bool)successProp2.GetValue(result), "Result.success should be false when an exception occurs for a property");

            var dataProp2 = result.GetType().GetProperty("data");
            Assert.IsNotNull(dataProp2, "Result should include 'data' with errors");
            var dataVal2 = dataProp2.GetValue(result);
            Assert.IsNotNull(dataVal2, "Result.data should not be null");
            var errorsProp2 = dataVal2.GetType().GetProperty("errors");
            Assert.IsNotNull(errorsProp2, "Result.data should include 'errors' list");
            var errorsEnum2 = errorsProp2.GetValue(dataVal2) as System.Collections.IEnumerable;
            Assert.IsNotNull(errorsEnum2, "errors should be enumerable");

            bool foundVelocityError = false;
            foreach (var err in errorsEnum2)
            {
                string s = err?.ToString() ?? string.Empty;
                if (s.Contains("velocity")) { foundVelocityError = true; break; }
            }
            Assert.IsTrue(foundVelocityError, "errors should include a message referencing 'velocity'");
        }
    }
}

```

--------------------------------------------------------------------------------
/UnityMcpBridge/UnityMcpServer~/src/telemetry.py:
--------------------------------------------------------------------------------

```python
"""
Privacy-focused, anonymous telemetry system for Unity MCP
Inspired by Onyx's telemetry implementation with Unity-specific adaptations

Fire-and-forget telemetry sender with a single background worker.
- No context/thread-local propagation to avoid re-entrancy into tool resolution.
- Small network timeouts to prevent stalls.
"""

import contextlib
from dataclasses import dataclass
from enum import Enum
import importlib
import json
import logging
import os
from pathlib import Path
import platform
import queue
import sys
import threading
import time
from typing import Optional, Dict, Any
from urllib.parse import urlparse
import uuid

import tomli

try:
    import httpx
    HAS_HTTPX = True
except ImportError:
    httpx = None  # type: ignore
    HAS_HTTPX = False


def get_package_version() -> str:
    """
    Open pyproject.toml and parse version
    We use the tomli library instead of tomllib to support Python 3.10
    """
    with open("pyproject.toml", "rb") as f:
        data = tomli.load(f)
    return data["project"]["version"]


MCP_VERSION = get_package_version()

logger = logging.getLogger("unity-mcp-telemetry")


class RecordType(str, Enum):
    """Types of telemetry records we collect"""
    VERSION = "version"
    STARTUP = "startup"
    USAGE = "usage"
    LATENCY = "latency"
    FAILURE = "failure"
    TOOL_EXECUTION = "tool_execution"
    UNITY_CONNECTION = "unity_connection"
    CLIENT_CONNECTION = "client_connection"


class MilestoneType(str, Enum):
    """Major user journey milestones"""
    FIRST_STARTUP = "first_startup"
    FIRST_TOOL_USAGE = "first_tool_usage"
    FIRST_SCRIPT_CREATION = "first_script_creation"
    FIRST_SCENE_MODIFICATION = "first_scene_modification"
    MULTIPLE_SESSIONS = "multiple_sessions"
    DAILY_ACTIVE_USER = "daily_active_user"
    WEEKLY_ACTIVE_USER = "weekly_active_user"


@dataclass
class TelemetryRecord:
    """Structure for telemetry data"""
    record_type: RecordType
    timestamp: float
    customer_uuid: str
    session_id: str
    data: Dict[str, Any]
    milestone: Optional[MilestoneType] = None


class TelemetryConfig:
    """Telemetry configuration"""

    def __init__(self):
        # Prefer config file, then allow env overrides
        server_config = None
        for modname in (
            "UnityMcpBridge.UnityMcpServer~.src.config",
            "UnityMcpBridge.UnityMcpServer.src.config",
            "src.config",
            "config",
        ):
            try:
                mod = importlib.import_module(modname)
                server_config = getattr(mod, "config", None)
                if server_config is not None:
                    break
            except Exception:
                continue

        # Determine enabled flag: config -> env DISABLE_* opt-out
        cfg_enabled = True if server_config is None else bool(
            getattr(server_config, "telemetry_enabled", True))
        self.enabled = cfg_enabled and not self._is_disabled()

        # Telemetry endpoint (Cloud Run default; override via env)
        cfg_default = None if server_config is None else getattr(
            server_config, "telemetry_endpoint", None)
        default_ep = cfg_default or "https://api-prod.coplay.dev/telemetry/events"
        self.default_endpoint = default_ep
        self.endpoint = self._validated_endpoint(
            os.environ.get("UNITY_MCP_TELEMETRY_ENDPOINT", default_ep),
            default_ep,
        )
        try:
            logger.info(
                "Telemetry configured: endpoint=%s (default=%s), timeout_env=%s",
                self.endpoint,
                default_ep,
                os.environ.get("UNITY_MCP_TELEMETRY_TIMEOUT") or "<unset>"
            )
        except Exception:
            pass

        # Local storage for UUID and milestones
        self.data_dir = self._get_data_directory()
        self.uuid_file = self.data_dir / "customer_uuid.txt"
        self.milestones_file = self.data_dir / "milestones.json"

        # Request timeout (small, fail fast). Override with UNITY_MCP_TELEMETRY_TIMEOUT
        try:
            self.timeout = float(os.environ.get(
                "UNITY_MCP_TELEMETRY_TIMEOUT", "1.5"))
        except Exception:
            self.timeout = 1.5
        try:
            logger.info("Telemetry timeout=%.2fs", self.timeout)
        except Exception:
            pass

        # Session tracking
        self.session_id = str(uuid.uuid4())

    def _is_disabled(self) -> bool:
        """Check if telemetry is disabled via environment variables"""
        disable_vars = [
            "DISABLE_TELEMETRY",
            "UNITY_MCP_DISABLE_TELEMETRY",
            "MCP_DISABLE_TELEMETRY"
        ]

        for var in disable_vars:
            if os.environ.get(var, "").lower() in ("true", "1", "yes", "on"):
                return True
        return False

    def _get_data_directory(self) -> Path:
        """Get directory for storing telemetry data"""
        if os.name == 'nt':  # Windows
            base_dir = Path(os.environ.get(
                'APPDATA', Path.home() / 'AppData' / 'Roaming'))
        elif os.name == 'posix':  # macOS/Linux
            if 'darwin' in os.uname().sysname.lower():  # macOS
                base_dir = Path.home() / 'Library' / 'Application Support'
            else:  # Linux
                base_dir = Path(os.environ.get('XDG_DATA_HOME',
                                Path.home() / '.local' / 'share'))
        else:
            base_dir = Path.home() / '.unity-mcp'

        data_dir = base_dir / 'UnityMCP'
        data_dir.mkdir(parents=True, exist_ok=True)
        return data_dir

    def _validated_endpoint(self, candidate: str, fallback: str) -> str:
        """Validate telemetry endpoint URL scheme; allow only http/https.
        Falls back to the provided default on error.
        """
        try:
            parsed = urlparse(candidate)
            if parsed.scheme not in ("https", "http"):
                raise ValueError(f"Unsupported scheme: {parsed.scheme}")
            # Basic sanity: require network location and path
            if not parsed.netloc:
                raise ValueError("Missing netloc in endpoint")
            # Reject localhost/loopback endpoints in production to avoid accidental local overrides
            host = parsed.hostname or ""
            if host in ("localhost", "127.0.0.1", "::1"):
                raise ValueError(
                    "Localhost endpoints are not allowed for telemetry")
            return candidate
        except Exception as e:
            logger.debug(
                f"Invalid telemetry endpoint '{candidate}', using default. Error: {e}",
                exc_info=True,
            )
            return fallback


class TelemetryCollector:
    """Main telemetry collection class"""

    def __init__(self):
        self.config = TelemetryConfig()
        self._customer_uuid: Optional[str] = None
        self._milestones: Dict[str, Dict[str, Any]] = {}
        self._lock: threading.Lock = threading.Lock()
        # Bounded queue with single background worker (records only; no context propagation)
        self._queue: "queue.Queue[TelemetryRecord]" = queue.Queue(maxsize=1000)
        # Load persistent data before starting worker so first events have UUID
        self._load_persistent_data()
        self._worker: threading.Thread = threading.Thread(
            target=self._worker_loop, daemon=True)
        self._worker.start()

    def _load_persistent_data(self):
        """Load UUID and milestones from disk"""
        # Load customer UUID
        try:
            if self.config.uuid_file.exists():
                self._customer_uuid = self.config.uuid_file.read_text(
                    encoding="utf-8").strip() or str(uuid.uuid4())
            else:
                self._customer_uuid = str(uuid.uuid4())
                try:
                    self.config.uuid_file.write_text(
                        self._customer_uuid, encoding="utf-8")
                    if os.name == "posix":
                        os.chmod(self.config.uuid_file, 0o600)
                except OSError as e:
                    logger.debug(
                        f"Failed to persist customer UUID: {e}", exc_info=True)
        except OSError as e:
            logger.debug(f"Failed to load customer UUID: {e}", exc_info=True)
            self._customer_uuid = str(uuid.uuid4())

        # Load milestones (failure here must not affect UUID)
        try:
            if self.config.milestones_file.exists():
                content = self.config.milestones_file.read_text(
                    encoding="utf-8")
                self._milestones = json.loads(content) or {}
                if not isinstance(self._milestones, dict):
                    self._milestones = {}
        except (OSError, json.JSONDecodeError, ValueError) as e:
            logger.debug(f"Failed to load milestones: {e}", exc_info=True)
            self._milestones = {}

    def _save_milestones(self):
        """Save milestones to disk. Caller must hold self._lock."""
        try:
            self.config.milestones_file.write_text(
                json.dumps(self._milestones, indent=2),
                encoding="utf-8",
            )
        except OSError as e:
            logger.warning(f"Failed to save milestones: {e}", exc_info=True)

    def record_milestone(self, milestone: MilestoneType, data: Optional[Dict[str, Any]] = None) -> bool:
        """Record a milestone event, returns True if this is the first occurrence"""
        if not self.config.enabled:
            return False
        milestone_key = milestone.value
        with self._lock:
            if milestone_key in self._milestones:
                return False  # Already recorded
            milestone_data = {
                "timestamp": time.time(),
                "data": data or {},
            }
            self._milestones[milestone_key] = milestone_data
            self._save_milestones()

        # Also send as telemetry record
        self.record(
            record_type=RecordType.USAGE,
            data={"milestone": milestone_key, **(data or {})},
            milestone=milestone
        )

        return True

    def record(self,
               record_type: RecordType,
               data: Dict[str, Any],
               milestone: Optional[MilestoneType] = None):
        """Record a telemetry event (async, non-blocking)"""
        if not self.config.enabled:
            return

        # Allow fallback sender when httpx is unavailable (no early return)

        record = TelemetryRecord(
            record_type=record_type,
            timestamp=time.time(),
            customer_uuid=self._customer_uuid or "unknown",
            session_id=self.config.session_id,
            data=data,
            milestone=milestone
        )
        # Enqueue for background worker (non-blocking). Drop on backpressure.
        try:
            self._queue.put_nowait(record)
        except queue.Full:
            logger.debug("Telemetry queue full; dropping %s",
                         record.record_type)

    def _worker_loop(self):
        """Background worker that serializes telemetry sends."""
        while True:
            rec = self._queue.get()
            try:
                # Run sender directly; do not reuse caller context/thread-locals
                self._send_telemetry(rec)
            except Exception:
                logger.debug("Telemetry worker send failed", exc_info=True)
            finally:
                with contextlib.suppress(Exception):
                    self._queue.task_done()

    def _send_telemetry(self, record: TelemetryRecord):
        """Send telemetry data to endpoint"""
        try:
            # System fingerprint (top-level remains concise; details stored in data JSON)
            _platform = platform.system()          # 'Darwin' | 'Linux' | 'Windows'
            _source = sys.platform                 # 'darwin' | 'linux' | 'win32'
            _platform_detail = f"{_platform} {platform.release()} ({platform.machine()})"
            _python_version = platform.python_version()

            # Enrich data JSON so BigQuery stores detailed fields without schema change
            enriched_data = dict(record.data or {})
            enriched_data.setdefault("platform_detail", _platform_detail)
            enriched_data.setdefault("python_version", _python_version)

            payload = {
                "record": record.record_type.value,
                "timestamp": record.timestamp,
                "customer_uuid": record.customer_uuid,
                "session_id": record.session_id,
                "data": enriched_data,
                "version": MCP_VERSION,
                "platform": _platform,
                "source": _source,
            }

            if record.milestone:
                payload["milestone"] = record.milestone.value

            # Prefer httpx when available; otherwise fall back to urllib
            if httpx:
                with httpx.Client(timeout=self.config.timeout) as client:
                    # Re-validate endpoint at send time to handle dynamic changes
                    endpoint = self.config._validated_endpoint(
                        self.config.endpoint, self.config.default_endpoint)
                    response = client.post(endpoint, json=payload)
                    if 200 <= response.status_code < 300:
                        logger.debug(f"Telemetry sent: {record.record_type}")
                    else:
                        logger.warning(
                            f"Telemetry failed: HTTP {response.status_code}")
            else:
                import urllib.request
                import urllib.error
                data_bytes = json.dumps(payload).encode("utf-8")
                endpoint = self.config._validated_endpoint(
                    self.config.endpoint, self.config.default_endpoint)
                req = urllib.request.Request(
                    endpoint,
                    data=data_bytes,
                    headers={"Content-Type": "application/json"},
                    method="POST",
                )
                try:
                    with urllib.request.urlopen(req, timeout=self.config.timeout) as resp:
                        if 200 <= resp.getcode() < 300:
                            logger.debug(
                                f"Telemetry sent (urllib): {record.record_type}")
                        else:
                            logger.warning(
                                f"Telemetry failed (urllib): HTTP {resp.getcode()}")
                except urllib.error.URLError as ue:
                    logger.warning(f"Telemetry send failed (urllib): {ue}")

        except Exception as e:
            # Never let telemetry errors interfere with app functionality
            logger.debug(f"Telemetry send failed: {e}")


# Global telemetry instance
_telemetry_collector: Optional[TelemetryCollector] = None


def get_telemetry() -> TelemetryCollector:
    """Get the global telemetry collector instance"""
    global _telemetry_collector
    if _telemetry_collector is None:
        _telemetry_collector = TelemetryCollector()
    return _telemetry_collector


def record_telemetry(record_type: RecordType,
                     data: Dict[str, Any],
                     milestone: Optional[MilestoneType] = None):
    """Convenience function to record telemetry"""
    get_telemetry().record(record_type, data, milestone)


def record_milestone(milestone: MilestoneType, data: Optional[Dict[str, Any]] = None) -> bool:
    """Convenience function to record a milestone"""
    return get_telemetry().record_milestone(milestone, data)


def record_tool_usage(tool_name: str, success: bool, duration_ms: float, error: Optional[str] = None, sub_action: Optional[str] = None):
    """Record tool usage telemetry

    Args:
        tool_name: Name of the tool invoked (e.g., 'manage_scene').
        success: Whether the tool completed successfully.
        duration_ms: Execution duration in milliseconds.
        error: Optional error message (truncated if present).
        sub_action: Optional sub-action/operation within the tool (e.g., 'get_hierarchy').
    """
    data = {
        "tool_name": tool_name,
        "success": success,
        "duration_ms": round(duration_ms, 2)
    }

    if sub_action is not None:
        try:
            data["sub_action"] = str(sub_action)
        except Exception:
            # Ensure telemetry is never disruptive
            data["sub_action"] = "unknown"

    if error:
        data["error"] = str(error)[:200]  # Limit error message length

    record_telemetry(RecordType.TOOL_EXECUTION, data)


def record_latency(operation: str, duration_ms: float, metadata: Optional[Dict[str, Any]] = None):
    """Record latency telemetry"""
    data = {
        "operation": operation,
        "duration_ms": round(duration_ms, 2)
    }

    if metadata:
        data.update(metadata)

    record_telemetry(RecordType.LATENCY, data)


def record_failure(component: str, error: str, metadata: Optional[Dict[str, Any]] = None):
    """Record failure telemetry"""
    data = {
        "component": component,
        "error": str(error)[:500]  # Limit error message length
    }

    if metadata:
        data.update(metadata)

    record_telemetry(RecordType.FAILURE, data)


def is_telemetry_enabled() -> bool:
    """Check if telemetry is enabled"""
    return get_telemetry().config.enabled

```

--------------------------------------------------------------------------------
/UnityMcpBridge/UnityMcpServer~/src/tools/resource_tools.py:
--------------------------------------------------------------------------------

```python
"""
Resource wrapper tools so clients that do not expose MCP resources primitives
can still list and read files via normal tools. These call into the same
safe path logic (re-implemented here to avoid importing server.py).
"""
import fnmatch
import hashlib
import os
from pathlib import Path
import re
from typing import Annotated, Any
from urllib.parse import urlparse, unquote

from mcp.server.fastmcp import Context

from registry import mcp_for_unity_tool
from unity_connection import send_command_with_retry


def _coerce_int(value: Any, default: int | None = None, minimum: int | None = None) -> int | None:
    """Safely coerce various inputs (str/float/etc.) to an int.
    Returns default on failure; clamps to minimum when provided.
    """
    if value is None:
        return default
    try:
        # Avoid treating booleans as ints implicitly
        if isinstance(value, bool):
            return default
        if isinstance(value, int):
            result = int(value)
        else:
            s = str(value).strip()
            if s.lower() in ("", "none", "null"):
                return default
            # Allow "10.0" or similar inputs
            result = int(float(s))
        if minimum is not None and result < minimum:
            return minimum
        return result
    except Exception:
        return default


def _resolve_project_root(override: str | None) -> Path:
    # 1) Explicit override
    if override:
        pr = Path(override).expanduser().resolve()
        if (pr / "Assets").exists():
            return pr
    # 2) Environment
    env = os.environ.get("UNITY_PROJECT_ROOT")
    if env:
        env_path = Path(env).expanduser()
        # If UNITY_PROJECT_ROOT is relative, resolve against repo root (cwd's repo) instead of src dir
        pr = (Path.cwd(
        ) / env_path).resolve() if not env_path.is_absolute() else env_path.resolve()
        if (pr / "Assets").exists():
            return pr
    # 3) Ask Unity via manage_editor.get_project_root
    try:
        resp = send_command_with_retry(
            "manage_editor", {"action": "get_project_root"})
        if isinstance(resp, dict) and resp.get("success"):
            pr = Path(resp.get("data", {}).get(
                "projectRoot", "")).expanduser().resolve()
            if pr and (pr / "Assets").exists():
                return pr
    except Exception:
        pass

    # 4) Walk up from CWD to find a Unity project (Assets + ProjectSettings)
    cur = Path.cwd().resolve()
    for _ in range(6):
        if (cur / "Assets").exists() and (cur / "ProjectSettings").exists():
            return cur
        if cur.parent == cur:
            break
        cur = cur.parent
    # 5) Search downwards (shallow) from repo root for first folder with Assets + ProjectSettings
    try:
        import os as _os
        root = Path.cwd().resolve()
        max_depth = 3
        for dirpath, dirnames, _ in _os.walk(root):
            rel = Path(dirpath).resolve()
            try:
                depth = len(rel.relative_to(root).parts)
            except Exception:
                # Unrelated mount/permission edge; skip deeper traversal
                dirnames[:] = []
                continue
            if depth > max_depth:
                # Prune deeper traversal
                dirnames[:] = []
                continue
            if (rel / "Assets").exists() and (rel / "ProjectSettings").exists():
                return rel
    except Exception:
        pass
    # 6) Fallback: CWD
    return Path.cwd().resolve()


def _resolve_safe_path_from_uri(uri: str, project: Path) -> Path | None:
    raw: str | None = None
    if uri.startswith("unity://path/"):
        raw = uri[len("unity://path/"):]
    elif uri.startswith("file://"):
        parsed = urlparse(uri)
        raw = unquote(parsed.path or "")
        # On Windows, urlparse('file:///C:/x') -> path='/C:/x'. Strip the leading slash for drive letters.
        try:
            import os as _os
            if _os.name == "nt" and raw.startswith("/") and re.match(r"^/[A-Za-z]:/", raw):
                raw = raw[1:]
            # UNC paths: file://server/share -> netloc='server', path='/share'. Treat as \\\\server/share
            if _os.name == "nt" and parsed.netloc:
                raw = f"//{parsed.netloc}{raw}"
        except Exception:
            pass
    elif uri.startswith("Assets/"):
        raw = uri
    if raw is None:
        return None
    # Normalize separators early
    raw = raw.replace("\\", "/")
    p = (project / raw).resolve()
    try:
        p.relative_to(project)
    except ValueError:
        return None
    return p


@mcp_for_unity_tool(description=("List project URIs (unity://path/...) under a folder (default: Assets). Only .cs files are returned by default; always appends unity://spec/script-edits.\n"))
async def list_resources(
    ctx: Context,
    pattern: Annotated[str, "Glob, default is *.cs"] | None = "*.cs",
    under: Annotated[str,
                     "Folder under project root, default is Assets"] = "Assets",
    limit: Annotated[int, "Page limit"] = 200,
    project_root: Annotated[str, "Project path"] | None = None,
) -> dict[str, Any]:
    ctx.info(f"Processing list_resources: {pattern}")
    try:
        project = _resolve_project_root(project_root)
        base = (project / under).resolve()
        try:
            base.relative_to(project)
        except ValueError:
            return {"success": False, "error": "Base path must be under project root"}
        # Enforce listing only under Assets
        try:
            base.relative_to(project / "Assets")
        except ValueError:
            return {"success": False, "error": "Listing is restricted to Assets/"}

        matches: list[str] = []
        limit_int = _coerce_int(limit, default=200, minimum=1)
        for p in base.rglob("*"):
            if not p.is_file():
                continue
            # Resolve symlinks and ensure the real path stays under project/Assets
            try:
                rp = p.resolve()
                rp.relative_to(project / "Assets")
            except Exception:
                continue
            # Enforce .cs extension regardless of provided pattern
            if p.suffix.lower() != ".cs":
                continue
            if pattern and not fnmatch.fnmatch(p.name, pattern):
                continue
            rel = p.relative_to(project).as_posix()
            matches.append(f"unity://path/{rel}")
            if len(matches) >= max(1, limit_int):
                break

        # Always include the canonical spec resource so NL clients can discover it
        if "unity://spec/script-edits" not in matches:
            matches.append("unity://spec/script-edits")

        return {"success": True, "data": {"uris": matches, "count": len(matches)}}
    except Exception as e:
        return {"success": False, "error": str(e)}


@mcp_for_unity_tool(description=("Reads a resource by unity://path/... URI with optional slicing."))
async def read_resource(
    ctx: Context,
    uri: Annotated[str, "The resource URI to read under Assets/"],
    start_line: Annotated[int,
                          "The starting line number (0-based)"] | None = None,
    line_count: Annotated[int,
                          "The number of lines to read"] | None = None,
    head_bytes: Annotated[int,
                          "The number of bytes to read from the start of the file"] | None = None,
    tail_lines: Annotated[int,
                          "The number of lines to read from the end of the file"] | None = None,
    project_root: Annotated[str,
                            "The project root directory"] | None = None,
    request: Annotated[str, "The request ID"] | None = None,
) -> dict[str, Any]:
    ctx.info(f"Processing read_resource: {uri}")
    try:
        # Serve the canonical spec directly when requested (allow bare or with scheme)
        if uri in ("unity://spec/script-edits", "spec/script-edits", "script-edits"):
            spec_json = (
                '{\n'
                '  "name": "Unity MCP - Script Edits v1",\n'
                '  "target_tool": "script_apply_edits",\n'
                '  "canonical_rules": {\n'
                '    "always_use": ["op","className","methodName","replacement","afterMethodName","beforeMethodName"],\n'
                '    "never_use": ["new_method","anchor_method","content","newText"],\n'
                '    "defaults": {\n'
                '      "className": "\u2190 server will default to \'name\' when omitted",\n'
                '      "position": "end"\n'
                '    }\n'
                '  },\n'
                '  "ops": [\n'
                '    {"op":"replace_method","required":["className","methodName","replacement"],"optional":["returnType","parametersSignature","attributesContains"],"examples":[{"note":"match overload by signature","parametersSignature":"(int a, string b)"},{"note":"ensure attributes retained","attributesContains":"ContextMenu"}]},\n'
                '    {"op":"insert_method","required":["className","replacement"],"position":{"enum":["start","end","after","before"],"after_requires":"afterMethodName","before_requires":"beforeMethodName"}},\n'
                '    {"op":"delete_method","required":["className","methodName"]},\n'
                '    {"op":"anchor_insert","required":["anchor","text"],"notes":"regex; position=before|after"}\n'
                '  ],\n'
                '  "apply_text_edits_recipe": {\n'
                '    "step1_read": { "tool": "resources/read", "args": {"uri": "unity://path/Assets/Scripts/Interaction/SmartReach.cs"} },\n'
                '    "step2_apply": {\n'
                '      "tool": "manage_script",\n'
                '      "args": {\n'
                '        "action": "apply_text_edits",\n'
                '        "name": "SmartReach", "path": "Assets/Scripts/Interaction",\n'
                '        "edits": [{"startLine": 42, "startCol": 1, "endLine": 42, "endCol": 1, "newText": "[MyAttr]\\n"}],\n'
                '        "precondition_sha256": "<sha-from-step1>",\n'
                '        "options": {"refresh": "immediate", "validate": "standard"}\n'
                '      }\n'
                '    },\n'
                '    "note": "newText is for apply_text_edits ranges only; use replacement in script_apply_edits ops."\n'
                '  },\n'
                '  "examples": [\n'
                '    {\n'
                '      "title": "Replace a method",\n'
                '      "args": {\n'
                '        "name": "SmartReach",\n'
                '        "path": "Assets/Scripts/Interaction",\n'
                '        "edits": [\n'
                '          {"op":"replace_method","className":"SmartReach","methodName":"HasTarget","replacement":"public bool HasTarget() { return currentTarget != null; }"}\n'
                '        ],\n'
                '        "options": { "validate": "standard", "refresh": "immediate" }\n'
                '      }\n'
                '    },\n'
                '    {\n'
                '      "title": "Insert a method after another",\n'
                '      "args": {\n'
                '        "name": "SmartReach",\n'
                '        "path": "Assets/Scripts/Interaction",\n'
                '        "edits": [\n'
                '          {"op":"insert_method","className":"SmartReach","replacement":"public void PrintSeries() { Debug.Log(seriesName); }","position":"after","afterMethodName":"GetCurrentTarget"}\n'
                '        ]\n'
                '      }\n'
                '    }\n'
                '  ]\n'
                '}\n'
            )
            sha = hashlib.sha256(spec_json.encode("utf-8")).hexdigest()
            return {"success": True, "data": {"text": spec_json, "metadata": {"sha256": sha}}}

        project = _resolve_project_root(project_root)
        p = _resolve_safe_path_from_uri(uri, project)
        if not p or not p.exists() or not p.is_file():
            return {"success": False, "error": f"Resource not found: {uri}"}
        try:
            p.relative_to(project / "Assets")
        except ValueError:
            return {"success": False, "error": "Read restricted to Assets/"}
        # Natural-language convenience: request like "last 120 lines", "first 200 lines",
        # "show 40 lines around MethodName", etc.
        if request:
            req = request.strip().lower()
            m = re.search(r"last\s+(\d+)\s+lines", req)
            if m:
                tail_lines = int(m.group(1))
            m = re.search(r"first\s+(\d+)\s+lines", req)
            if m:
                start_line = 1
                line_count = int(m.group(1))
            m = re.search(r"first\s+(\d+)\s*bytes", req)
            if m:
                head_bytes = int(m.group(1))
            m = re.search(
                r"show\s+(\d+)\s+lines\s+around\s+([A-Za-z_][A-Za-z0-9_]*)", req)
            if m:
                window = int(m.group(1))
                method = m.group(2)
                # naive search for method header to get a line number
                text_all = p.read_text(encoding="utf-8")
                lines_all = text_all.splitlines()
                pat = re.compile(
                    rf"^\s*(?:\[[^\]]+\]\s*)*(?:public|private|protected|internal|static|virtual|override|sealed|async|extern|unsafe|new|partial).*?\b{re.escape(method)}\s*\(", re.MULTILINE)
                hit_line = None
                for i, line in enumerate(lines_all, start=1):
                    if pat.search(line):
                        hit_line = i
                        break
                if hit_line:
                    half = max(1, window // 2)
                    start_line = max(1, hit_line - half)
                    line_count = window

        # Coerce numeric inputs defensively (string/float -> int)
        start_line = _coerce_int(start_line)
        line_count = _coerce_int(line_count)
        head_bytes = _coerce_int(head_bytes, minimum=1)
        tail_lines = _coerce_int(tail_lines, minimum=1)

        # Compute SHA over full file contents (metadata-only default)
        full_bytes = p.read_bytes()
        full_sha = hashlib.sha256(full_bytes).hexdigest()

        # Selection only when explicitly requested via windowing args or request text hints
        selection_requested = bool(head_bytes or tail_lines or (
            start_line is not None and line_count is not None) or request)
        if selection_requested:
            # Mutually exclusive windowing options precedence:
            # 1) head_bytes, 2) tail_lines, 3) start_line+line_count, else full text
            if head_bytes and head_bytes > 0:
                raw = full_bytes[: head_bytes]
                text = raw.decode("utf-8", errors="replace")
            else:
                text = full_bytes.decode("utf-8", errors="replace")
                if tail_lines is not None and tail_lines > 0:
                    lines = text.splitlines()
                    n = max(0, tail_lines)
                    text = "\n".join(lines[-n:])
                elif start_line is not None and line_count is not None and line_count >= 0:
                    lines = text.splitlines()
                    s = max(0, start_line - 1)
                    e = min(len(lines), s + line_count)
                    text = "\n".join(lines[s:e])
            return {"success": True, "data": {"text": text, "metadata": {"sha256": full_sha, "lengthBytes": len(full_bytes)}}}
        else:
            # Default: metadata only
            return {"success": True, "data": {"metadata": {"sha256": full_sha, "lengthBytes": len(full_bytes)}}}
    except Exception as e:
        return {"success": False, "error": str(e)}


@mcp_for_unity_tool(description="Searches a file with a regex pattern and returns line numbers and excerpts.")
async def find_in_file(
    ctx: Context,
    uri: Annotated[str, "The resource URI to search under Assets/ or file path form supported by read_resource"],
    pattern: Annotated[str, "The regex pattern to search for"],
    ignore_case: Annotated[bool, "Case-insensitive search"] | None = True,
    project_root: Annotated[str,
                            "The project root directory"] | None = None,
    max_results: Annotated[int,
                           "Cap results to avoid huge payloads"] = 200,
) -> dict[str, Any]:
    ctx.info(f"Processing find_in_file: {uri}")
    try:
        project = _resolve_project_root(project_root)
        p = _resolve_safe_path_from_uri(uri, project)
        if not p or not p.exists() or not p.is_file():
            return {"success": False, "error": f"Resource not found: {uri}"}

        text = p.read_text(encoding="utf-8")
        flags = re.MULTILINE
        if ignore_case:
            flags |= re.IGNORECASE
        rx = re.compile(pattern, flags)

        results = []
        max_results_int = _coerce_int(max_results, default=200, minimum=1)
        lines = text.splitlines()
        for i, line in enumerate(lines, start=1):
            m = rx.search(line)
            if m:
                start_col = m.start() + 1  # 1-based
                end_col = m.end() + 1      # 1-based, end exclusive
                results.append({
                    "startLine": i,
                    "startCol": start_col,
                    "endLine": i,
                    "endCol": end_col,
                })
                if max_results_int and len(results) >= max_results_int:
                    break

        return {"success": True, "data": {"matches": results, "count": len(results)}}
    except Exception as e:
        return {"success": False, "error": str(e)}

```

--------------------------------------------------------------------------------
/MCPForUnity/UnityMcpServer~/src/tools/resource_tools.py:
--------------------------------------------------------------------------------

```python
"""
Resource wrapper tools so clients that do not expose MCP resources primitives
can still list and read files via normal tools. These call into the same
safe path logic (re-implemented here to avoid importing server.py).
"""
import fnmatch
import hashlib
import os
from pathlib import Path
import re
from typing import Annotated, Any
from urllib.parse import urlparse, unquote

from mcp.server.fastmcp import Context

from registry import mcp_for_unity_tool
from unity_connection import send_command_with_retry


def _coerce_int(value: Any, default: int | None = None, minimum: int | None = None) -> int | None:
    """Safely coerce various inputs (str/float/etc.) to an int.
    Returns default on failure; clamps to minimum when provided.
    """
    if value is None:
        return default
    try:
        # Avoid treating booleans as ints implicitly
        if isinstance(value, bool):
            return default
        if isinstance(value, int):
            result = int(value)
        else:
            s = str(value).strip()
            if s.lower() in ("", "none", "null"):
                return default
            # Allow "10.0" or similar inputs
            result = int(float(s))
        if minimum is not None and result < minimum:
            return minimum
        return result
    except Exception:
        return default


def _resolve_project_root(override: str | None) -> Path:
    # 1) Explicit override
    if override:
        pr = Path(override).expanduser().resolve()
        if (pr / "Assets").exists():
            return pr
    # 2) Environment
    env = os.environ.get("UNITY_PROJECT_ROOT")
    if env:
        env_path = Path(env).expanduser()
        # If UNITY_PROJECT_ROOT is relative, resolve against repo root (cwd's repo) instead of src dir
        pr = (Path.cwd(
        ) / env_path).resolve() if not env_path.is_absolute() else env_path.resolve()
        if (pr / "Assets").exists():
            return pr
    # 3) Ask Unity via manage_editor.get_project_root
    try:
        resp = send_command_with_retry(
            "manage_editor", {"action": "get_project_root"})
        if isinstance(resp, dict) and resp.get("success"):
            pr = Path(resp.get("data", {}).get(
                "projectRoot", "")).expanduser().resolve()
            if pr and (pr / "Assets").exists():
                return pr
    except Exception:
        pass

    # 4) Walk up from CWD to find a Unity project (Assets + ProjectSettings)
    cur = Path.cwd().resolve()
    for _ in range(6):
        if (cur / "Assets").exists() and (cur / "ProjectSettings").exists():
            return cur
        if cur.parent == cur:
            break
        cur = cur.parent
    # 5) Search downwards (shallow) from repo root for first folder with Assets + ProjectSettings
    try:
        import os as _os
        root = Path.cwd().resolve()
        max_depth = 3
        for dirpath, dirnames, _ in _os.walk(root):
            rel = Path(dirpath).resolve()
            try:
                depth = len(rel.relative_to(root).parts)
            except Exception:
                # Unrelated mount/permission edge; skip deeper traversal
                dirnames[:] = []
                continue
            if depth > max_depth:
                # Prune deeper traversal
                dirnames[:] = []
                continue
            if (rel / "Assets").exists() and (rel / "ProjectSettings").exists():
                return rel
    except Exception:
        pass
    # 6) Fallback: CWD
    return Path.cwd().resolve()


def _resolve_safe_path_from_uri(uri: str, project: Path) -> Path | None:
    raw: str | None = None
    if uri.startswith("unity://path/"):
        raw = uri[len("unity://path/"):]
    elif uri.startswith("file://"):
        parsed = urlparse(uri)
        raw = unquote(parsed.path or "")
        # On Windows, urlparse('file:///C:/x') -> path='/C:/x'. Strip the leading slash for drive letters.
        try:
            import os as _os
            if _os.name == "nt" and raw.startswith("/") and re.match(r"^/[A-Za-z]:/", raw):
                raw = raw[1:]
            # UNC paths: file://server/share -> netloc='server', path='/share'. Treat as \\\\server/share
            if _os.name == "nt" and parsed.netloc:
                raw = f"//{parsed.netloc}{raw}"
        except Exception:
            pass
    elif uri.startswith("Assets/"):
        raw = uri
    if raw is None:
        return None
    # Normalize separators early
    raw = raw.replace("\\", "/")
    p = (project / raw).resolve()
    try:
        p.relative_to(project)
    except ValueError:
        return None
    return p


@mcp_for_unity_tool(description=("List project URIs (unity://path/...) under a folder (default: Assets). Only .cs files are returned by default; always appends unity://spec/script-edits.\n"))
async def list_resources(
    ctx: Context,
    pattern: Annotated[str, "Glob, default is *.cs"] | None = "*.cs",
    under: Annotated[str,
                     "Folder under project root, default is Assets"] = "Assets",
    limit: Annotated[int, "Page limit"] = 200,
    project_root: Annotated[str, "Project path"] | None = None,
) -> dict[str, Any]:
    ctx.info(f"Processing list_resources: {pattern}")
    try:
        project = _resolve_project_root(project_root)
        base = (project / under).resolve()
        try:
            base.relative_to(project)
        except ValueError:
            return {"success": False, "error": "Base path must be under project root"}
        # Enforce listing only under Assets
        try:
            base.relative_to(project / "Assets")
        except ValueError:
            return {"success": False, "error": "Listing is restricted to Assets/"}

        matches: list[str] = []
        limit_int = _coerce_int(limit, default=200, minimum=1)
        for p in base.rglob("*"):
            if not p.is_file():
                continue
            # Resolve symlinks and ensure the real path stays under project/Assets
            try:
                rp = p.resolve()
                rp.relative_to(project / "Assets")
            except Exception:
                continue
            # Enforce .cs extension regardless of provided pattern
            if p.suffix.lower() != ".cs":
                continue
            if pattern and not fnmatch.fnmatch(p.name, pattern):
                continue
            rel = p.relative_to(project).as_posix()
            matches.append(f"unity://path/{rel}")
            if len(matches) >= max(1, limit_int):
                break

        # Always include the canonical spec resource so NL clients can discover it
        if "unity://spec/script-edits" not in matches:
            matches.append("unity://spec/script-edits")

        return {"success": True, "data": {"uris": matches, "count": len(matches)}}
    except Exception as e:
        return {"success": False, "error": str(e)}


@mcp_for_unity_tool(description=("Reads a resource by unity://path/... URI with optional slicing."))
async def read_resource(
    ctx: Context,
    uri: Annotated[str, "The resource URI to read under Assets/"],
    start_line: Annotated[int,
                          "The starting line number (0-based)"] | None = None,
    line_count: Annotated[int,
                          "The number of lines to read"] | None = None,
    head_bytes: Annotated[int,
                          "The number of bytes to read from the start of the file"] | None = None,
    tail_lines: Annotated[int,
                          "The number of lines to read from the end of the file"] | None = None,
    project_root: Annotated[str,
                            "The project root directory"] | None = None,
    request: Annotated[str, "The request ID"] | None = None,
) -> dict[str, Any]:
    ctx.info(f"Processing read_resource: {uri}")
    try:
        # Serve the canonical spec directly when requested (allow bare or with scheme)
        if uri in ("unity://spec/script-edits", "spec/script-edits", "script-edits"):
            spec_json = (
                '{\n'
                '  "name": "MCP for Unity - Script Edits v1",\n'
                '  "target_tool": "script_apply_edits",\n'
                '  "canonical_rules": {\n'
                '    "always_use": ["op","className","methodName","replacement","afterMethodName","beforeMethodName"],\n'
                '    "never_use": ["new_method","anchor_method","content","newText"],\n'
                '    "defaults": {\n'
                '      "className": "\u2190 server will default to \'name\' when omitted",\n'
                '      "position": "end"\n'
                '    }\n'
                '  },\n'
                '  "ops": [\n'
                '    {"op":"replace_method","required":["className","methodName","replacement"],"optional":["returnType","parametersSignature","attributesContains"],"examples":[{"note":"match overload by signature","parametersSignature":"(int a, string b)"},{"note":"ensure attributes retained","attributesContains":"ContextMenu"}]},\n'
                '    {"op":"insert_method","required":["className","replacement"],"position":{"enum":["start","end","after","before"],"after_requires":"afterMethodName","before_requires":"beforeMethodName"}},\n'
                '    {"op":"delete_method","required":["className","methodName"]},\n'
                '    {"op":"anchor_insert","required":["anchor","text"],"notes":"regex; position=before|after"}\n'
                '  ],\n'
                '  "apply_text_edits_recipe": {\n'
                '    "step1_read": { "tool": "resources/read", "args": {"uri": "unity://path/Assets/Scripts/Interaction/SmartReach.cs"} },\n'
                '    "step2_apply": {\n'
                '      "tool": "manage_script",\n'
                '      "args": {\n'
                '        "action": "apply_text_edits",\n'
                '        "name": "SmartReach", "path": "Assets/Scripts/Interaction",\n'
                '        "edits": [{"startLine": 42, "startCol": 1, "endLine": 42, "endCol": 1, "newText": "[MyAttr]\\n"}],\n'
                '        "precondition_sha256": "<sha-from-step1>",\n'
                '        "options": {"refresh": "immediate", "validate": "standard"}\n'
                '      }\n'
                '    },\n'
                '    "note": "newText is for apply_text_edits ranges only; use replacement in script_apply_edits ops."\n'
                '  },\n'
                '  "examples": [\n'
                '    {\n'
                '      "title": "Replace a method",\n'
                '      "args": {\n'
                '        "name": "SmartReach",\n'
                '        "path": "Assets/Scripts/Interaction",\n'
                '        "edits": [\n'
                '          {"op":"replace_method","className":"SmartReach","methodName":"HasTarget","replacement":"public bool HasTarget() { return currentTarget != null; }"}\n'
                '        ],\n'
                '        "options": { "validate": "standard", "refresh": "immediate" }\n'
                '      }\n'
                '    },\n'
                '    {\n'
                '      "title": "Insert a method after another",\n'
                '      "args": {\n'
                '        "name": "SmartReach",\n'
                '        "path": "Assets/Scripts/Interaction",\n'
                '        "edits": [\n'
                '          {"op":"insert_method","className":"SmartReach","replacement":"public void PrintSeries() { Debug.Log(seriesName); }","position":"after","afterMethodName":"GetCurrentTarget"}\n'
                '        ]\n'
                '      }\n'
                '    }\n'
                '  ]\n'
                '}\n'
            )
            sha = hashlib.sha256(spec_json.encode("utf-8")).hexdigest()
            return {"success": True, "data": {"text": spec_json, "metadata": {"sha256": sha}}}

        project = _resolve_project_root(project_root)
        p = _resolve_safe_path_from_uri(uri, project)
        if not p or not p.exists() or not p.is_file():
            return {"success": False, "error": f"Resource not found: {uri}"}
        try:
            p.relative_to(project / "Assets")
        except ValueError:
            return {"success": False, "error": "Read restricted to Assets/"}
        # Natural-language convenience: request like "last 120 lines", "first 200 lines",
        # "show 40 lines around MethodName", etc.
        if request:
            req = request.strip().lower()
            m = re.search(r"last\s+(\d+)\s+lines", req)
            if m:
                tail_lines = int(m.group(1))
            m = re.search(r"first\s+(\d+)\s+lines", req)
            if m:
                start_line = 1
                line_count = int(m.group(1))
            m = re.search(r"first\s+(\d+)\s*bytes", req)
            if m:
                head_bytes = int(m.group(1))
            m = re.search(
                r"show\s+(\d+)\s+lines\s+around\s+([A-Za-z_][A-Za-z0-9_]*)", req)
            if m:
                window = int(m.group(1))
                method = m.group(2)
                # naive search for method header to get a line number
                text_all = p.read_text(encoding="utf-8")
                lines_all = text_all.splitlines()
                pat = re.compile(
                    rf"^\s*(?:\[[^\]]+\]\s*)*(?:public|private|protected|internal|static|virtual|override|sealed|async|extern|unsafe|new|partial).*?\b{re.escape(method)}\s*\(", re.MULTILINE)
                hit_line = None
                for i, line in enumerate(lines_all, start=1):
                    if pat.search(line):
                        hit_line = i
                        break
                if hit_line:
                    half = max(1, window // 2)
                    start_line = max(1, hit_line - half)
                    line_count = window

        # Coerce numeric inputs defensively (string/float -> int)
        start_line = _coerce_int(start_line)
        line_count = _coerce_int(line_count)
        head_bytes = _coerce_int(head_bytes, minimum=1)
        tail_lines = _coerce_int(tail_lines, minimum=1)

        # Compute SHA over full file contents (metadata-only default)
        full_bytes = p.read_bytes()
        full_sha = hashlib.sha256(full_bytes).hexdigest()

        # Selection only when explicitly requested via windowing args or request text hints
        selection_requested = bool(head_bytes or tail_lines or (
            start_line is not None and line_count is not None) or request)
        if selection_requested:
            # Mutually exclusive windowing options precedence:
            # 1) head_bytes, 2) tail_lines, 3) start_line+line_count, else full text
            if head_bytes and head_bytes > 0:
                raw = full_bytes[: head_bytes]
                text = raw.decode("utf-8", errors="replace")
            else:
                text = full_bytes.decode("utf-8", errors="replace")
                if tail_lines is not None and tail_lines > 0:
                    lines = text.splitlines()
                    n = max(0, tail_lines)
                    text = "\n".join(lines[-n:])
                elif start_line is not None and line_count is not None and line_count >= 0:
                    lines = text.splitlines()
                    s = max(0, start_line - 1)
                    e = min(len(lines), s + line_count)
                    text = "\n".join(lines[s:e])
            return {"success": True, "data": {"text": text, "metadata": {"sha256": full_sha, "lengthBytes": len(full_bytes)}}}
        else:
            # Default: metadata only
            return {"success": True, "data": {"metadata": {"sha256": full_sha, "lengthBytes": len(full_bytes)}}}
    except Exception as e:
        return {"success": False, "error": str(e)}


@mcp_for_unity_tool(description="Searches a file with a regex pattern and returns line numbers and excerpts.")
async def find_in_file(
    ctx: Context,
    uri: Annotated[str, "The resource URI to search under Assets/ or file path form supported by read_resource"],
    pattern: Annotated[str, "The regex pattern to search for"],
    ignore_case: Annotated[bool, "Case-insensitive search"] | None = True,
    project_root: Annotated[str,
                            "The project root directory"] | None = None,
    max_results: Annotated[int,
                           "Cap results to avoid huge payloads"] = 200,
) -> dict[str, Any]:
    ctx.info(f"Processing find_in_file: {uri}")
    try:
        project = _resolve_project_root(project_root)
        p = _resolve_safe_path_from_uri(uri, project)
        if not p or not p.exists() or not p.is_file():
            return {"success": False, "error": f"Resource not found: {uri}"}

        text = p.read_text(encoding="utf-8")
        flags = re.MULTILINE
        if ignore_case:
            flags |= re.IGNORECASE
        rx = re.compile(pattern, flags)

        results = []
        max_results_int = _coerce_int(max_results, default=200, minimum=1)
        lines = text.splitlines()
        for i, line in enumerate(lines, start=1):
            m = rx.search(line)
            if m:
                start_col = m.start() + 1  # 1-based
                end_col = m.end() + 1      # 1-based, end exclusive
                results.append({
                    "startLine": i,
                    "startCol": start_col,
                    "endLine": i,
                    "endCol": end_col,
                })
                if max_results_int and len(results) >= max_results_int:
                    break

        return {"success": True, "data": {"matches": results, "count": len(results)}}
    except Exception as e:
        return {"success": False, "error": str(e)}

```

--------------------------------------------------------------------------------
/MCPForUnity/UnityMcpServer~/src/telemetry.py:
--------------------------------------------------------------------------------

```python
"""
Privacy-focused, anonymous telemetry system for MCP for Unity
Inspired by Onyx's telemetry implementation with Unity-specific adaptations

Fire-and-forget telemetry sender with a single background worker.
- No context/thread-local propagation to avoid re-entrancy into tool resolution.
- Small network timeouts to prevent stalls.
"""

import contextlib
from dataclasses import dataclass
from enum import Enum
import importlib
import json
import logging
import os
from pathlib import Path
import platform
import queue
import sys
import threading
import time
from typing import Any
from urllib.parse import urlparse
import uuid

import tomli

try:
    import httpx
    HAS_HTTPX = True
except ImportError:
    httpx = None  # type: ignore
    HAS_HTTPX = False

logger = logging.getLogger("unity-mcp-telemetry")


def get_package_version() -> str:
    """
    Open pyproject.toml and parse version
    We use the tomli library instead of tomllib to support Python 3.10
    """
    with open("pyproject.toml", "rb") as f:
        data = tomli.load(f)
    return data["project"]["version"]


MCP_VERSION = get_package_version()


class RecordType(str, Enum):
    """Types of telemetry records we collect"""
    VERSION = "version"
    STARTUP = "startup"
    USAGE = "usage"
    LATENCY = "latency"
    FAILURE = "failure"
    RESOURCE_RETRIEVAL = "resource_retrieval"
    TOOL_EXECUTION = "tool_execution"
    UNITY_CONNECTION = "unity_connection"
    CLIENT_CONNECTION = "client_connection"


class MilestoneType(str, Enum):
    """Major user journey milestones"""
    FIRST_STARTUP = "first_startup"
    FIRST_TOOL_USAGE = "first_tool_usage"
    FIRST_SCRIPT_CREATION = "first_script_creation"
    FIRST_SCENE_MODIFICATION = "first_scene_modification"
    MULTIPLE_SESSIONS = "multiple_sessions"
    DAILY_ACTIVE_USER = "daily_active_user"
    WEEKLY_ACTIVE_USER = "weekly_active_user"


@dataclass
class TelemetryRecord:
    """Structure for telemetry data"""
    record_type: RecordType
    timestamp: float
    customer_uuid: str
    session_id: str
    data: dict[str, Any]
    milestone: MilestoneType | None = None


class TelemetryConfig:
    """Telemetry configuration"""

    def __init__(self):
        """
        Prefer config file, then allow env overrides
        """
        server_config = None
        for modname in (
            "MCPForUnity.UnityMcpServer~.src.config",
            "MCPForUnity.UnityMcpServer.src.config",
            "src.config",
            "config",
        ):
            try:
                mod = importlib.import_module(modname)
                server_config = getattr(mod, "config", None)
                if server_config is not None:
                    break
            except Exception:
                continue

        # Determine enabled flag: config -> env DISABLE_* opt-out
        cfg_enabled = True if server_config is None else bool(
            getattr(server_config, "telemetry_enabled", True))
        self.enabled = cfg_enabled and not self._is_disabled()

        # Telemetry endpoint (Cloud Run default; override via env)
        cfg_default = None if server_config is None else getattr(
            server_config, "telemetry_endpoint", None)
        default_ep = cfg_default or "https://api-prod.coplay.dev/telemetry/events"
        self.default_endpoint = default_ep
        self.endpoint = self._validated_endpoint(
            os.environ.get("UNITY_MCP_TELEMETRY_ENDPOINT", default_ep),
            default_ep,
        )
        try:
            logger.info(
                "Telemetry configured: endpoint=%s (default=%s), timeout_env=%s",
                self.endpoint,
                default_ep,
                os.environ.get("UNITY_MCP_TELEMETRY_TIMEOUT") or "<unset>"
            )
        except Exception:
            pass

        # Local storage for UUID and milestones
        self.data_dir = self._get_data_directory()
        self.uuid_file = self.data_dir / "customer_uuid.txt"
        self.milestones_file = self.data_dir / "milestones.json"

        # Request timeout (small, fail fast). Override with UNITY_MCP_TELEMETRY_TIMEOUT
        try:
            self.timeout = float(os.environ.get(
                "UNITY_MCP_TELEMETRY_TIMEOUT", "1.5"))
        except Exception:
            self.timeout = 1.5
        try:
            logger.info("Telemetry timeout=%.2fs", self.timeout)
        except Exception:
            pass

        # Session tracking
        self.session_id = str(uuid.uuid4())

    def _is_disabled(self) -> bool:
        """Check if telemetry is disabled via environment variables"""
        disable_vars = [
            "DISABLE_TELEMETRY",
            "UNITY_MCP_DISABLE_TELEMETRY",
            "MCP_DISABLE_TELEMETRY"
        ]

        for var in disable_vars:
            if os.environ.get(var, "").lower() in ("true", "1", "yes", "on"):
                return True
        return False

    def _get_data_directory(self) -> Path:
        """Get directory for storing telemetry data"""
        if os.name == 'nt':  # Windows
            base_dir = Path(os.environ.get(
                'APPDATA', Path.home() / 'AppData' / 'Roaming'))
        elif os.name == 'posix':  # macOS/Linux
            if 'darwin' in os.uname().sysname.lower():  # macOS
                base_dir = Path.home() / 'Library' / 'Application Support'
            else:  # Linux
                base_dir = Path(os.environ.get('XDG_DATA_HOME',
                                Path.home() / '.local' / 'share'))
        else:
            base_dir = Path.home() / '.unity-mcp'

        data_dir = base_dir / 'UnityMCP'
        data_dir.mkdir(parents=True, exist_ok=True)
        return data_dir

    def _validated_endpoint(self, candidate: str, fallback: str) -> str:
        """Validate telemetry endpoint URL scheme; allow only http/https.
        Falls back to the provided default on error.
        """
        try:
            parsed = urlparse(candidate)
            if parsed.scheme not in ("https", "http"):
                raise ValueError(f"Unsupported scheme: {parsed.scheme}")
            # Basic sanity: require network location and path
            if not parsed.netloc:
                raise ValueError("Missing netloc in endpoint")
            # Reject localhost/loopback endpoints in production to avoid accidental local overrides
            host = parsed.hostname or ""
            if host in ("localhost", "127.0.0.1", "::1"):
                raise ValueError(
                    "Localhost endpoints are not allowed for telemetry")
            return candidate
        except Exception as e:
            logger.debug(
                f"Invalid telemetry endpoint '{candidate}', using default. Error: {e}",
                exc_info=True,
            )
            return fallback


class TelemetryCollector:
    """Main telemetry collection class"""

    def __init__(self):
        self.config = TelemetryConfig()
        self._customer_uuid: str | None = None
        self._milestones: dict[str, dict[str, Any]] = {}
        self._lock: threading.Lock = threading.Lock()
        # Bounded queue with single background worker (records only; no context propagation)
        self._queue: "queue.Queue[TelemetryRecord]" = queue.Queue(maxsize=1000)
        # Load persistent data before starting worker so first events have UUID
        self._load_persistent_data()
        self._worker: threading.Thread = threading.Thread(
            target=self._worker_loop, daemon=True)
        self._worker.start()

    def _load_persistent_data(self):
        """Load UUID and milestones from disk"""
        # Load customer UUID
        try:
            if self.config.uuid_file.exists():
                self._customer_uuid = self.config.uuid_file.read_text(
                    encoding="utf-8").strip() or str(uuid.uuid4())
            else:
                self._customer_uuid = str(uuid.uuid4())
                try:
                    self.config.uuid_file.write_text(
                        self._customer_uuid, encoding="utf-8")
                    if os.name == "posix":
                        os.chmod(self.config.uuid_file, 0o600)
                except OSError as e:
                    logger.debug(
                        f"Failed to persist customer UUID: {e}", exc_info=True)
        except OSError as e:
            logger.debug(f"Failed to load customer UUID: {e}", exc_info=True)
            self._customer_uuid = str(uuid.uuid4())

        # Load milestones (failure here must not affect UUID)
        try:
            if self.config.milestones_file.exists():
                content = self.config.milestones_file.read_text(
                    encoding="utf-8")
                self._milestones = json.loads(content) or {}
                if not isinstance(self._milestones, dict):
                    self._milestones = {}
        except (OSError, json.JSONDecodeError, ValueError) as e:
            logger.debug(f"Failed to load milestones: {e}", exc_info=True)
            self._milestones = {}

    def _save_milestones(self):
        """Save milestones to disk. Caller must hold self._lock."""
        try:
            self.config.milestones_file.write_text(
                json.dumps(self._milestones, indent=2),
                encoding="utf-8",
            )
        except OSError as e:
            logger.warning(f"Failed to save milestones: {e}", exc_info=True)

    def record_milestone(self, milestone: MilestoneType, data: dict[str, Any] | None = None) -> bool:
        """Record a milestone event, returns True if this is the first occurrence"""
        if not self.config.enabled:
            return False
        milestone_key = milestone.value
        with self._lock:
            if milestone_key in self._milestones:
                return False  # Already recorded
            milestone_data = {
                "timestamp": time.time(),
                "data": data or {},
            }
            self._milestones[milestone_key] = milestone_data
            self._save_milestones()

        # Also send as telemetry record
        self.record(
            record_type=RecordType.USAGE,
            data={"milestone": milestone_key, **(data or {})},
            milestone=milestone
        )

        return True

    def record(self,
               record_type: RecordType,
               data: dict[str, Any],
               milestone: MilestoneType | None = None):
        """Record a telemetry event (async, non-blocking)"""
        if not self.config.enabled:
            return

        # Allow fallback sender when httpx is unavailable (no early return)

        record = TelemetryRecord(
            record_type=record_type,
            timestamp=time.time(),
            customer_uuid=self._customer_uuid or "unknown",
            session_id=self.config.session_id,
            data=data,
            milestone=milestone
        )
        # Enqueue for background worker (non-blocking). Drop on backpressure.
        try:
            self._queue.put_nowait(record)
        except queue.Full:
            logger.debug("Telemetry queue full; dropping %s",
                         record.record_type)

    def _worker_loop(self):
        """Background worker that serializes telemetry sends."""
        while True:
            rec = self._queue.get()
            try:
                # Run sender directly; do not reuse caller context/thread-locals
                self._send_telemetry(rec)
            except Exception:
                logger.debug("Telemetry worker send failed", exc_info=True)
            finally:
                with contextlib.suppress(Exception):
                    self._queue.task_done()

    def _send_telemetry(self, record: TelemetryRecord):
        """Send telemetry data to endpoint"""
        try:
            # System fingerprint (top-level remains concise; details stored in data JSON)
            _platform = platform.system()          # 'Darwin' | 'Linux' | 'Windows'
            _source = sys.platform                 # 'darwin' | 'linux' | 'win32'
            _platform_detail = f"{_platform} {platform.release()} ({platform.machine()})"
            _python_version = platform.python_version()

            # Enrich data JSON so BigQuery stores detailed fields without schema change
            enriched_data = dict(record.data or {})
            enriched_data.setdefault("platform_detail", _platform_detail)
            enriched_data.setdefault("python_version", _python_version)

            payload = {
                "record": record.record_type.value,
                "timestamp": record.timestamp,
                "customer_uuid": record.customer_uuid,
                "session_id": record.session_id,
                "data": enriched_data,
                "version": MCP_VERSION,
                "platform": _platform,
                "source": _source,
            }

            if record.milestone:
                payload["milestone"] = record.milestone.value

            # Prefer httpx when available; otherwise fall back to urllib
            if httpx:
                with httpx.Client(timeout=self.config.timeout) as client:
                    # Re-validate endpoint at send time to handle dynamic changes
                    endpoint = self.config._validated_endpoint(
                        self.config.endpoint, self.config.default_endpoint)
                    response = client.post(endpoint, json=payload)
                    if 200 <= response.status_code < 300:
                        logger.debug(f"Telemetry sent: {record.record_type}")
                    else:
                        logger.warning(
                            f"Telemetry failed: HTTP {response.status_code}")
            else:
                import urllib.request
                import urllib.error
                data_bytes = json.dumps(payload).encode("utf-8")
                endpoint = self.config._validated_endpoint(
                    self.config.endpoint, self.config.default_endpoint)
                req = urllib.request.Request(
                    endpoint,
                    data=data_bytes,
                    headers={"Content-Type": "application/json"},
                    method="POST",
                )
                try:
                    with urllib.request.urlopen(req, timeout=self.config.timeout) as resp:
                        if 200 <= resp.getcode() < 300:
                            logger.debug(
                                f"Telemetry sent (urllib): {record.record_type}")
                        else:
                            logger.warning(
                                f"Telemetry failed (urllib): HTTP {resp.getcode()}")
                except urllib.error.URLError as ue:
                    logger.warning(f"Telemetry send failed (urllib): {ue}")

        except Exception as e:
            # Never let telemetry errors interfere with app functionality
            logger.debug(f"Telemetry send failed: {e}")


# Global telemetry instance
_telemetry_collector: TelemetryCollector | None = None


def get_telemetry() -> TelemetryCollector:
    """Get the global telemetry collector instance"""
    global _telemetry_collector
    if _telemetry_collector is None:
        _telemetry_collector = TelemetryCollector()
    return _telemetry_collector


def record_telemetry(record_type: RecordType,
                     data: dict[str, Any],
                     milestone: MilestoneType | None = None):
    """Convenience function to record telemetry"""
    get_telemetry().record(record_type, data, milestone)


def record_milestone(milestone: MilestoneType, data: dict[str, Any] | None = None) -> bool:
    """Convenience function to record a milestone"""
    return get_telemetry().record_milestone(milestone, data)


def record_tool_usage(tool_name: str, success: bool, duration_ms: float, error: str | None = None, sub_action: str | None = None):
    """Record tool usage telemetry

    Args:
        tool_name: Name of the tool invoked (e.g., 'manage_scene').
        success: Whether the tool completed successfully.
        duration_ms: Execution duration in milliseconds.
        error: Optional error message (truncated if present).
        sub_action: Optional sub-action/operation within the tool (e.g., 'get_hierarchy').
    """
    data = {
        "tool_name": tool_name,
        "success": success,
        "duration_ms": round(duration_ms, 2)
    }

    if sub_action is not None:
        try:
            data["sub_action"] = str(sub_action)
        except Exception:
            # Ensure telemetry is never disruptive
            data["sub_action"] = "unknown"

    if error:
        data["error"] = str(error)[:200]  # Limit error message length

    record_telemetry(RecordType.TOOL_EXECUTION, data)


def record_resource_usage(resource_name: str, success: bool, duration_ms: float, error: str | None = None):
    """Record resource usage telemetry

    Args:
        resource_name: Name of the resource invoked (e.g., 'get_tests').
        success: Whether the resource completed successfully.
        duration_ms: Execution duration in milliseconds.
        error: Optional error message (truncated if present).
    """
    data = {
        "resource_name": resource_name,
        "success": success,
        "duration_ms": round(duration_ms, 2)
    }

    if error:
        data["error"] = str(error)[:200]  # Limit error message length

    record_telemetry(RecordType.RESOURCE_RETRIEVAL, data)


def record_latency(operation: str, duration_ms: float, metadata: dict[str, Any] | None = None):
    """Record latency telemetry"""
    data = {
        "operation": operation,
        "duration_ms": round(duration_ms, 2)
    }

    if metadata:
        data.update(metadata)

    record_telemetry(RecordType.LATENCY, data)


def record_failure(component: str, error: str, metadata: dict[str, Any] | None = None):
    """Record failure telemetry"""
    data = {
        "component": component,
        "error": str(error)[:500]  # Limit error message length
    }

    if metadata:
        data.update(metadata)

    record_telemetry(RecordType.FAILURE, data)


def is_telemetry_enabled() -> bool:
    """Check if telemetry is enabled"""
    return get_telemetry().config.enabled

```

--------------------------------------------------------------------------------
/MCPForUnity/UnityMcpServer~/src/unity_connection.py:
--------------------------------------------------------------------------------

```python
from config import config
import contextlib
from dataclasses import dataclass
import errno
import json
import logging
from pathlib import Path
from port_discovery import PortDiscovery
import random
import socket
import struct
import threading
import time
from typing import Any, Dict

from models import MCPResponse


# Configure logging using settings from config
logging.basicConfig(
    level=getattr(logging, config.log_level),
    format=config.log_format
)
logger = logging.getLogger("mcp-for-unity-server")

# Module-level lock to guard global connection initialization
_connection_lock = threading.Lock()

# Maximum allowed framed payload size (64 MiB)
FRAMED_MAX = 64 * 1024 * 1024


@dataclass
class UnityConnection:
    """Manages the socket connection to the Unity Editor."""
    host: str = config.unity_host
    port: int = None  # Will be set dynamically
    sock: socket.socket = None  # Socket for Unity communication
    use_framing: bool = False  # Negotiated per-connection

    def __post_init__(self):
        """Set port from discovery if not explicitly provided"""
        if self.port is None:
            self.port = PortDiscovery.discover_unity_port()
        self._io_lock = threading.Lock()
        self._conn_lock = threading.Lock()

    def connect(self) -> bool:
        """Establish a connection to the Unity Editor."""
        if self.sock:
            return True
        with self._conn_lock:
            if self.sock:
                return True
            try:
                # Bounded connect to avoid indefinite blocking
                connect_timeout = float(
                    getattr(config, "connect_timeout", getattr(config, "connection_timeout", 1.0)))
                self.sock = socket.create_connection(
                    (self.host, self.port), connect_timeout)
                # Disable Nagle's algorithm to reduce small RPC latency
                with contextlib.suppress(Exception):
                    self.sock.setsockopt(
                        socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                logger.debug(f"Connected to Unity at {self.host}:{self.port}")

                # Strict handshake: require FRAMING=1
                try:
                    require_framing = getattr(config, "require_framing", True)
                    timeout = float(getattr(config, "handshake_timeout", 1.0))
                    self.sock.settimeout(timeout)
                    buf = bytearray()
                    deadline = time.monotonic() + timeout
                    while time.monotonic() < deadline and len(buf) < 512:
                        try:
                            chunk = self.sock.recv(256)
                            if not chunk:
                                break
                            buf.extend(chunk)
                            if b"\n" in buf:
                                break
                        except socket.timeout:
                            break
                    text = bytes(buf).decode('ascii', errors='ignore').strip()

                    if 'FRAMING=1' in text:
                        self.use_framing = True
                        logger.debug(
                            'MCP for Unity handshake received: FRAMING=1 (strict)')
                    else:
                        if require_framing:
                            # Best-effort plain-text advisory for legacy peers
                            with contextlib.suppress(Exception):
                                self.sock.sendall(
                                    b'MCP for Unity requires FRAMING=1\n')
                            raise ConnectionError(
                                f'MCP for Unity requires FRAMING=1, got: {text!r}')
                        else:
                            self.use_framing = False
                            logger.warning(
                                'MCP for Unity handshake missing FRAMING=1; proceeding in legacy mode by configuration')
                finally:
                    self.sock.settimeout(config.connection_timeout)
                return True
            except Exception as e:
                logger.error(f"Failed to connect to Unity: {str(e)}")
                try:
                    if self.sock:
                        self.sock.close()
                except Exception:
                    pass
                self.sock = None
                return False

    def disconnect(self):
        """Close the connection to the Unity Editor."""
        if self.sock:
            try:
                self.sock.close()
            except Exception as e:
                logger.error(f"Error disconnecting from Unity: {str(e)}")
            finally:
                self.sock = None

    def _read_exact(self, sock: socket.socket, count: int) -> bytes:
        data = bytearray()
        while len(data) < count:
            chunk = sock.recv(count - len(data))
            if not chunk:
                raise ConnectionError(
                    "Connection closed before reading expected bytes")
            data.extend(chunk)
        return bytes(data)

    def receive_full_response(self, sock, buffer_size=config.buffer_size) -> bytes:
        """Receive a complete response from Unity, handling chunked data."""
        if self.use_framing:
            try:
                # Consume heartbeats, but do not hang indefinitely if only zero-length frames arrive
                heartbeat_count = 0
                deadline = time.monotonic() + getattr(config, 'framed_receive_timeout', 2.0)
                while True:
                    header = self._read_exact(sock, 8)
                    payload_len = struct.unpack('>Q', header)[0]
                    if payload_len == 0:
                        # Heartbeat/no-op frame: consume and continue waiting for a data frame
                        logger.debug("Received heartbeat frame (length=0)")
                        heartbeat_count += 1
                        if heartbeat_count >= getattr(config, 'max_heartbeat_frames', 16) or time.monotonic() > deadline:
                            # Treat as empty successful response to match C# server behavior
                            logger.debug(
                                "Heartbeat threshold reached; returning empty response")
                            return b""
                        continue
                    if payload_len > FRAMED_MAX:
                        raise ValueError(
                            f"Invalid framed length: {payload_len}")
                    payload = self._read_exact(sock, payload_len)
                    logger.debug(
                        f"Received framed response ({len(payload)} bytes)")
                    return payload
            except socket.timeout as e:
                logger.warning("Socket timeout during framed receive")
                raise TimeoutError("Timeout receiving Unity response") from e
            except Exception as e:
                logger.error(f"Error during framed receive: {str(e)}")
                raise

        chunks = []
        # Respect the socket's currently configured timeout
        try:
            while True:
                chunk = sock.recv(buffer_size)
                if not chunk:
                    if not chunks:
                        raise Exception(
                            "Connection closed before receiving data")
                    break
                chunks.append(chunk)

                # Process the data received so far
                data = b''.join(chunks)
                decoded_data = data.decode('utf-8')

                # Check if we've received a complete response
                try:
                    # Special case for ping-pong
                    if decoded_data.strip().startswith('{"status":"success","result":{"message":"pong"'):
                        logger.debug("Received ping response")
                        return data

                    # Handle escaped quotes in the content
                    if '"content":' in decoded_data:
                        # Find the content field and its value
                        content_start = decoded_data.find('"content":') + 9
                        content_end = decoded_data.rfind('"', content_start)
                        if content_end > content_start:
                            # Replace escaped quotes in content with regular quotes
                            content = decoded_data[content_start:content_end]
                            content = content.replace('\\"', '"')
                            decoded_data = decoded_data[:content_start] + \
                                content + decoded_data[content_end:]

                    # Validate JSON format
                    json.loads(decoded_data)

                    # If we get here, we have valid JSON
                    logger.info(
                        f"Received complete response ({len(data)} bytes)")
                    return data
                except json.JSONDecodeError:
                    # We haven't received a complete valid JSON response yet
                    continue
                except Exception as e:
                    logger.warning(
                        f"Error processing response chunk: {str(e)}")
                    # Continue reading more chunks as this might not be the complete response
                    continue
        except socket.timeout:
            logger.warning("Socket timeout during receive")
            raise Exception("Timeout receiving Unity response")
        except Exception as e:
            logger.error(f"Error during receive: {str(e)}")
            raise

    def send_command(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Send a command with retry/backoff and port rediscovery. Pings only when requested."""
        # Defensive guard: catch empty/placeholder invocations early
        if not command_type:
            raise ValueError("MCP call missing command_type")
        if params is None:
            return MCPResponse(success=False, error="MCP call received with no parameters (client placeholder?)")
        attempts = max(config.max_retries, 5)
        base_backoff = max(0.5, config.retry_delay)

        def read_status_file() -> dict | None:
            try:
                status_files = sorted(Path.home().joinpath(
                    '.unity-mcp').glob('unity-mcp-status-*.json'), key=lambda p: p.stat().st_mtime, reverse=True)
                if not status_files:
                    return None
                latest = status_files[0]
                with latest.open('r') as f:
                    return json.load(f)
            except Exception:
                return None

        last_short_timeout = None

        # Preflight: if Unity reports reloading, return a structured hint so clients can retry politely
        try:
            status = read_status_file()
            if status and (status.get('reloading') or status.get('reason') == 'reloading'):
                return MCPResponse(
                    success=False,
                    error="Unity domain reload in progress, please try again shortly",
                    data={"state": "reloading", "retry_after_ms": int(
                        config.reload_retry_ms)}
                )
        except Exception:
            pass

        for attempt in range(attempts + 1):
            try:
                # Ensure connected (handshake occurs within connect())
                if not self.sock and not self.connect():
                    raise Exception("Could not connect to Unity")

                # Build payload
                if command_type == 'ping':
                    payload = b'ping'
                else:
                    command = {"type": command_type, "params": params or {}}
                    payload = json.dumps(
                        command, ensure_ascii=False).encode('utf-8')

                # Send/receive are serialized to protect the shared socket
                with self._io_lock:
                    mode = 'framed' if self.use_framing else 'legacy'
                    with contextlib.suppress(Exception):
                        logger.debug(
                            "send %d bytes; mode=%s; head=%s",
                            len(payload),
                            mode,
                            (payload[:32]).decode('utf-8', 'ignore'),
                        )
                    if self.use_framing:
                        header = struct.pack('>Q', len(payload))
                        self.sock.sendall(header)
                        self.sock.sendall(payload)
                    else:
                        self.sock.sendall(payload)

                    # During retry bursts use a short receive timeout and ensure restoration
                    restore_timeout = None
                    if attempt > 0 and last_short_timeout is None:
                        restore_timeout = self.sock.gettimeout()
                        self.sock.settimeout(1.0)
                    try:
                        response_data = self.receive_full_response(self.sock)
                        with contextlib.suppress(Exception):
                            logger.debug("recv %d bytes; mode=%s",
                                         len(response_data), mode)
                    finally:
                        if restore_timeout is not None:
                            self.sock.settimeout(restore_timeout)
                            last_short_timeout = None

                # Parse
                if command_type == 'ping':
                    resp = json.loads(response_data.decode('utf-8'))
                    if resp.get('status') == 'success' and resp.get('result', {}).get('message') == 'pong':
                        return {"message": "pong"}
                    raise Exception("Ping unsuccessful")

                resp = json.loads(response_data.decode('utf-8'))
                if resp.get('status') == 'error':
                    err = resp.get('error') or resp.get(
                        'message', 'Unknown Unity error')
                    raise Exception(err)
                return resp.get('result', {})
            except Exception as e:
                logger.warning(
                    f"Unity communication attempt {attempt+1} failed: {e}")
                try:
                    if self.sock:
                        self.sock.close()
                finally:
                    self.sock = None

                # Re-discover port each time
                try:
                    new_port = PortDiscovery.discover_unity_port()
                    if new_port != self.port:
                        logger.info(
                            f"Unity port changed {self.port} -> {new_port}")
                    self.port = new_port
                except Exception as de:
                    logger.debug(f"Port discovery failed: {de}")

                if attempt < attempts:
                    # Heartbeat-aware, jittered backoff
                    status = read_status_file()
                    # Base exponential backoff
                    backoff = base_backoff * (2 ** attempt)
                    # Decorrelated jitter multiplier
                    jitter = random.uniform(0.1, 0.3)

                    # Fast‑retry for transient socket failures
                    fast_error = isinstance(
                        e, (ConnectionRefusedError, ConnectionResetError, TimeoutError))
                    if not fast_error:
                        try:
                            err_no = getattr(e, 'errno', None)
                            fast_error = err_no in (
                                errno.ECONNREFUSED, errno.ECONNRESET, errno.ETIMEDOUT)
                        except Exception:
                            pass

                    # Cap backoff depending on state
                    if status and status.get('reloading'):
                        cap = 0.8
                    elif fast_error:
                        cap = 0.25
                    else:
                        cap = 3.0

                    sleep_s = min(cap, jitter * (2 ** attempt))
                    time.sleep(sleep_s)
                    continue
                raise


# Global Unity connection
_unity_connection = None


def get_unity_connection() -> UnityConnection:
    """Retrieve or establish a persistent Unity connection.

    Note: Do NOT ping on every retrieval to avoid connection storms. Rely on
    send_command() exceptions to detect broken sockets and reconnect there.
    """
    global _unity_connection
    if _unity_connection is not None:
        return _unity_connection

    # Double-checked locking to avoid concurrent socket creation
    with _connection_lock:
        if _unity_connection is not None:
            return _unity_connection
        logger.info("Creating new Unity connection")
        _unity_connection = UnityConnection()
        if not _unity_connection.connect():
            _unity_connection = None
            raise ConnectionError(
                "Could not connect to Unity. Ensure the Unity Editor and MCP Bridge are running.")
        logger.info("Connected to Unity on startup")
        return _unity_connection


# -----------------------------
# Centralized retry helpers
# -----------------------------

def _is_reloading_response(resp: dict) -> bool:
    """Return True if the Unity response indicates the editor is reloading."""
    if not isinstance(resp, dict):
        return False
    if resp.get("state") == "reloading":
        return True
    message_text = (resp.get("message") or resp.get("error") or "").lower()
    return "reload" in message_text


def send_command_with_retry(command_type: str, params: Dict[str, Any], *, max_retries: int | None = None, retry_ms: int | None = None) -> Dict[str, Any]:
    """Send a command via the shared connection, waiting politely through Unity reloads.

    Uses config.reload_retry_ms and config.reload_max_retries by default. Preserves the
    structured failure if retries are exhausted.
    """
    conn = get_unity_connection()
    if max_retries is None:
        max_retries = getattr(config, "reload_max_retries", 40)
    if retry_ms is None:
        retry_ms = getattr(config, "reload_retry_ms", 250)

    response = conn.send_command(command_type, params)
    retries = 0
    while _is_reloading_response(response) and retries < max_retries:
        delay_ms = int(response.get("retry_after_ms", retry_ms)
                       ) if isinstance(response, dict) else retry_ms
        time.sleep(max(0.0, delay_ms / 1000.0))
        retries += 1
        response = conn.send_command(command_type, params)
    return response


async def async_send_command_with_retry(command_type: str, params: dict[str, Any], *, loop=None, max_retries: int | None = None, retry_ms: int | None = None) -> dict[str, Any] | MCPResponse:
    """Async wrapper that runs the blocking retry helper in a thread pool."""
    try:
        import asyncio  # local import to avoid mandatory asyncio dependency for sync callers
        if loop is None:
            loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            lambda: send_command_with_retry(
                command_type, params, max_retries=max_retries, retry_ms=retry_ms),
        )
    except Exception as e:
        return MCPResponse(success=False, error=str(e))

```

--------------------------------------------------------------------------------
/UnityMcpBridge/UnityMcpServer~/src/unity_connection.py:
--------------------------------------------------------------------------------

```python
from config import config
import contextlib
from dataclasses import dataclass
import errno
import json
import logging
from pathlib import Path
from port_discovery import PortDiscovery
import random
import socket
import struct
import threading
import time
from typing import Any, Dict


# Configure logging using settings from config
logging.basicConfig(
    level=getattr(logging, config.log_level),
    format=config.log_format
)
logger = logging.getLogger("mcp-for-unity-server")

# Module-level lock to guard global connection initialization
_connection_lock = threading.Lock()

# Maximum allowed framed payload size (64 MiB)
FRAMED_MAX = 64 * 1024 * 1024


@dataclass
class UnityConnection:
    """Manages the socket connection to the Unity Editor."""
    host: str = config.unity_host
    port: int = None  # Will be set dynamically
    sock: socket.socket = None  # Socket for Unity communication
    use_framing: bool = False  # Negotiated per-connection

    def __post_init__(self):
        """Set port from discovery if not explicitly provided"""
        if self.port is None:
            self.port = PortDiscovery.discover_unity_port()
        self._io_lock = threading.Lock()
        self._conn_lock = threading.Lock()

    def connect(self) -> bool:
        """Establish a connection to the Unity Editor."""
        if self.sock:
            return True
        with self._conn_lock:
            if self.sock:
                return True
            try:
                # Bounded connect to avoid indefinite blocking
                connect_timeout = float(
                    getattr(config, "connect_timeout", getattr(config, "connection_timeout", 1.0)))
                self.sock = socket.create_connection(
                    (self.host, self.port), connect_timeout)
                # Disable Nagle's algorithm to reduce small RPC latency
                with contextlib.suppress(Exception):
                    self.sock.setsockopt(
                        socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                logger.debug(f"Connected to Unity at {self.host}:{self.port}")

                # Strict handshake: require FRAMING=1
                try:
                    require_framing = getattr(config, "require_framing", True)
                    timeout = float(getattr(config, "handshake_timeout", 1.0))
                    self.sock.settimeout(timeout)
                    buf = bytearray()
                    deadline = time.monotonic() + timeout
                    while time.monotonic() < deadline and len(buf) < 512:
                        try:
                            chunk = self.sock.recv(256)
                            if not chunk:
                                break
                            buf.extend(chunk)
                            if b"\n" in buf:
                                break
                        except socket.timeout:
                            break
                    text = bytes(buf).decode('ascii', errors='ignore').strip()

                    if 'FRAMING=1' in text:
                        self.use_framing = True
                        logger.debug(
                            'Unity MCP handshake received: FRAMING=1 (strict)')
                    else:
                        if require_framing:
                            # Best-effort plain-text advisory for legacy peers
                            with contextlib.suppress(Exception):
                                self.sock.sendall(
                                    b'Unity MCP requires FRAMING=1\n')
                            raise ConnectionError(
                                f'Unity MCP requires FRAMING=1, got: {text!r}')
                        else:
                            self.use_framing = False
                            logger.warning(
                                'Unity MCP handshake missing FRAMING=1; proceeding in legacy mode by configuration')
                finally:
                    self.sock.settimeout(config.connection_timeout)
                return True
            except Exception as e:
                logger.error(f"Failed to connect to Unity: {str(e)}")
                try:
                    if self.sock:
                        self.sock.close()
                except Exception:
                    pass
                self.sock = None
                return False

    def disconnect(self):
        """Close the connection to the Unity Editor."""
        if self.sock:
            try:
                self.sock.close()
            except Exception as e:
                logger.error(f"Error disconnecting from Unity: {str(e)}")
            finally:
                self.sock = None

    def _read_exact(self, sock: socket.socket, count: int) -> bytes:
        data = bytearray()
        while len(data) < count:
            chunk = sock.recv(count - len(data))
            if not chunk:
                raise ConnectionError(
                    "Connection closed before reading expected bytes")
            data.extend(chunk)
        return bytes(data)

    def receive_full_response(self, sock, buffer_size=config.buffer_size) -> bytes:
        """Receive a complete response from Unity, handling chunked data."""
        if self.use_framing:
            try:
                # Consume heartbeats, but do not hang indefinitely if only zero-length frames arrive
                heartbeat_count = 0
                deadline = time.monotonic() + getattr(config, 'framed_receive_timeout', 2.0)
                while True:
                    header = self._read_exact(sock, 8)
                    payload_len = struct.unpack('>Q', header)[0]
                    if payload_len == 0:
                        # Heartbeat/no-op frame: consume and continue waiting for a data frame
                        logger.debug("Received heartbeat frame (length=0)")
                        heartbeat_count += 1
                        if heartbeat_count >= getattr(config, 'max_heartbeat_frames', 16) or time.monotonic() > deadline:
                            # Treat as empty successful response to match C# server behavior
                            logger.debug(
                                "Heartbeat threshold reached; returning empty response")
                            return b""
                        continue
                    if payload_len > FRAMED_MAX:
                        raise ValueError(
                            f"Invalid framed length: {payload_len}")
                    payload = self._read_exact(sock, payload_len)
                    logger.debug(
                        f"Received framed response ({len(payload)} bytes)")
                    return payload
            except socket.timeout as e:
                logger.warning("Socket timeout during framed receive")
                raise TimeoutError("Timeout receiving Unity response") from e
            except Exception as e:
                logger.error(f"Error during framed receive: {str(e)}")
                raise

        chunks = []
        # Respect the socket's currently configured timeout
        try:
            while True:
                chunk = sock.recv(buffer_size)
                if not chunk:
                    if not chunks:
                        raise Exception(
                            "Connection closed before receiving data")
                    break
                chunks.append(chunk)

                # Process the data received so far
                data = b''.join(chunks)
                decoded_data = data.decode('utf-8')

                # Check if we've received a complete response
                try:
                    # Special case for ping-pong
                    if decoded_data.strip().startswith('{"status":"success","result":{"message":"pong"'):
                        logger.debug("Received ping response")
                        return data

                    # Handle escaped quotes in the content
                    if '"content":' in decoded_data:
                        # Find the content field and its value
                        content_start = decoded_data.find('"content":') + 9
                        content_end = decoded_data.rfind('"', content_start)
                        if content_end > content_start:
                            # Replace escaped quotes in content with regular quotes
                            content = decoded_data[content_start:content_end]
                            content = content.replace('\\"', '"')
                            decoded_data = decoded_data[:content_start] + \
                                content + decoded_data[content_end:]

                    # Validate JSON format
                    json.loads(decoded_data)

                    # If we get here, we have valid JSON
                    logger.info(
                        f"Received complete response ({len(data)} bytes)")
                    return data
                except json.JSONDecodeError:
                    # We haven't received a complete valid JSON response yet
                    continue
                except Exception as e:
                    logger.warning(
                        f"Error processing response chunk: {str(e)}")
                    # Continue reading more chunks as this might not be the complete response
                    continue
        except socket.timeout:
            logger.warning("Socket timeout during receive")
            raise Exception("Timeout receiving Unity response")
        except Exception as e:
            logger.error(f"Error during receive: {str(e)}")
            raise

    def send_command(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Send a command with retry/backoff and port rediscovery. Pings only when requested."""
        # Defensive guard: catch empty/placeholder invocations early
        if not command_type:
            raise ValueError("MCP call missing command_type")
        if params is None:
            # Return a fast, structured error that clients can display without hanging
            return {"success": False, "error": "MCP call received with no parameters (client placeholder?)"}
        attempts = max(config.max_retries, 5)
        base_backoff = max(0.5, config.retry_delay)

        def read_status_file() -> dict | None:
            try:
                status_files = sorted(Path.home().joinpath(
                    '.unity-mcp').glob('unity-mcp-status-*.json'), key=lambda p: p.stat().st_mtime, reverse=True)
                if not status_files:
                    return None
                latest = status_files[0]
                with latest.open('r') as f:
                    return json.load(f)
            except Exception:
                return None

        last_short_timeout = None

        # Preflight: if Unity reports reloading, return a structured hint so clients can retry politely
        try:
            status = read_status_file()
            if status and (status.get('reloading') or status.get('reason') == 'reloading'):
                return {
                    "success": False,
                    "state": "reloading",
                    "retry_after_ms": int(config.reload_retry_ms),
                    "error": "Unity domain reload in progress",
                    "message": "Unity is reloading scripts; please retry shortly"
                }
        except Exception:
            pass

        for attempt in range(attempts + 1):
            try:
                # Ensure connected (handshake occurs within connect())
                if not self.sock and not self.connect():
                    raise Exception("Could not connect to Unity")

                # Build payload
                if command_type == 'ping':
                    payload = b'ping'
                else:
                    command = {"type": command_type, "params": params or {}}
                    payload = json.dumps(
                        command, ensure_ascii=False).encode('utf-8')

                # Send/receive are serialized to protect the shared socket
                with self._io_lock:
                    mode = 'framed' if self.use_framing else 'legacy'
                    with contextlib.suppress(Exception):
                        logger.debug(
                            "send %d bytes; mode=%s; head=%s",
                            len(payload),
                            mode,
                            (payload[:32]).decode('utf-8', 'ignore'),
                        )
                    if self.use_framing:
                        header = struct.pack('>Q', len(payload))
                        self.sock.sendall(header)
                        self.sock.sendall(payload)
                    else:
                        self.sock.sendall(payload)

                    # During retry bursts use a short receive timeout and ensure restoration
                    restore_timeout = None
                    if attempt > 0 and last_short_timeout is None:
                        restore_timeout = self.sock.gettimeout()
                        self.sock.settimeout(1.0)
                    try:
                        response_data = self.receive_full_response(self.sock)
                        with contextlib.suppress(Exception):
                            logger.debug("recv %d bytes; mode=%s",
                                         len(response_data), mode)
                    finally:
                        if restore_timeout is not None:
                            self.sock.settimeout(restore_timeout)
                            last_short_timeout = None

                # Parse
                if command_type == 'ping':
                    resp = json.loads(response_data.decode('utf-8'))
                    if resp.get('status') == 'success' and resp.get('result', {}).get('message') == 'pong':
                        return {"message": "pong"}
                    raise Exception("Ping unsuccessful")

                resp = json.loads(response_data.decode('utf-8'))
                if resp.get('status') == 'error':
                    err = resp.get('error') or resp.get(
                        'message', 'Unknown Unity error')
                    raise Exception(err)
                return resp.get('result', {})
            except Exception as e:
                logger.warning(
                    f"Unity communication attempt {attempt+1} failed: {e}")
                try:
                    if self.sock:
                        self.sock.close()
                finally:
                    self.sock = None

                # Re-discover port each time
                try:
                    new_port = PortDiscovery.discover_unity_port()
                    if new_port != self.port:
                        logger.info(
                            f"Unity port changed {self.port} -> {new_port}")
                    self.port = new_port
                except Exception as de:
                    logger.debug(f"Port discovery failed: {de}")

                if attempt < attempts:
                    # Heartbeat-aware, jittered backoff
                    status = read_status_file()
                    # Base exponential backoff
                    backoff = base_backoff * (2 ** attempt)
                    # Decorrelated jitter multiplier
                    jitter = random.uniform(0.1, 0.3)

                    # Fast‑retry for transient socket failures
                    fast_error = isinstance(
                        e, (ConnectionRefusedError, ConnectionResetError, TimeoutError))
                    if not fast_error:
                        try:
                            err_no = getattr(e, 'errno', None)
                            fast_error = err_no in (
                                errno.ECONNREFUSED, errno.ECONNRESET, errno.ETIMEDOUT)
                        except Exception:
                            pass

                    # Cap backoff depending on state
                    if status and status.get('reloading'):
                        cap = 0.8
                    elif fast_error:
                        cap = 0.25
                    else:
                        cap = 3.0

                    sleep_s = min(cap, jitter * (2 ** attempt))
                    time.sleep(sleep_s)
                    continue
                raise


# Global Unity connection
_unity_connection = None


def get_unity_connection() -> UnityConnection:
    """Retrieve or establish a persistent Unity connection.

    Note: Do NOT ping on every retrieval to avoid connection storms. Rely on
    send_command() exceptions to detect broken sockets and reconnect there.
    """
    global _unity_connection
    if _unity_connection is not None:
        return _unity_connection

    # Double-checked locking to avoid concurrent socket creation
    with _connection_lock:
        if _unity_connection is not None:
            return _unity_connection
        logger.info("Creating new Unity connection")
        _unity_connection = UnityConnection()
        if not _unity_connection.connect():
            _unity_connection = None
            raise ConnectionError(
                "Could not connect to Unity. Ensure the Unity Editor and MCP Bridge are running.")
        logger.info("Connected to Unity on startup")
        return _unity_connection


# -----------------------------
# Centralized retry helpers
# -----------------------------

def _is_reloading_response(resp: dict) -> bool:
    """Return True if the Unity response indicates the editor is reloading."""
    if not isinstance(resp, dict):
        return False
    if resp.get("state") == "reloading":
        return True
    message_text = (resp.get("message") or resp.get("error") or "").lower()
    return "reload" in message_text


def send_command_with_retry(command_type: str, params: Dict[str, Any], *, max_retries: int | None = None, retry_ms: int | None = None) -> Dict[str, Any]:
    """Send a command via the shared connection, waiting politely through Unity reloads.

    Uses config.reload_retry_ms and config.reload_max_retries by default. Preserves the
    structured failure if retries are exhausted.
    """
    conn = get_unity_connection()
    if max_retries is None:
        max_retries = getattr(config, "reload_max_retries", 40)
    if retry_ms is None:
        retry_ms = getattr(config, "reload_retry_ms", 250)

    response = conn.send_command(command_type, params)
    retries = 0
    while _is_reloading_response(response) and retries < max_retries:
        delay_ms = int(response.get("retry_after_ms", retry_ms)
                       ) if isinstance(response, dict) else retry_ms
        time.sleep(max(0.0, delay_ms / 1000.0))
        retries += 1
        response = conn.send_command(command_type, params)
    return response


async def async_send_command_with_retry(command_type: str, params: Dict[str, Any], *, loop=None, max_retries: int | None = None, retry_ms: int | None = None) -> Dict[str, Any]:
    """Async wrapper that runs the blocking retry helper in a thread pool."""
    try:
        import asyncio  # local import to avoid mandatory asyncio dependency for sync callers
        if loop is None:
            loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            lambda: send_command_with_retry(
                command_type, params, max_retries=max_retries, retry_ms=retry_ms),
        )
    except Exception as e:
        # Return a structured error dict for consistency with other responses
        return {"success": False, "error": f"Python async retry helper failed: {str(e)}"}

```
Page 5/13FirstPrevNextLast