This is page 5 of 13. Use http://codebase.md/justinpbarnett/unity-mcp?page={x} to view the full context. # Directory Structure ``` ├── .claude │ ├── prompts │ │ ├── nl-unity-suite-nl.md │ │ └── nl-unity-suite-t.md │ └── settings.json ├── .github │ ├── scripts │ │ └── mark_skipped.py │ └── workflows │ ├── bump-version.yml │ ├── claude-nl-suite.yml │ ├── github-repo-stats.yml │ └── unity-tests.yml ├── .gitignore ├── deploy-dev.bat ├── docs │ ├── CURSOR_HELP.md │ ├── CUSTOM_TOOLS.md │ ├── README-DEV-zh.md │ ├── README-DEV.md │ ├── screenshots │ │ ├── v5_01_uninstall.png │ │ ├── v5_02_install.png │ │ ├── v5_03_open_mcp_window.png │ │ ├── v5_04_rebuild_mcp_server.png │ │ ├── v5_05_rebuild_success.png │ │ ├── v6_2_create_python_tools_asset.png │ │ ├── v6_2_python_tools_asset.png │ │ ├── v6_new_ui_asset_store_version.png │ │ ├── v6_new_ui_dark.png │ │ └── v6_new_ui_light.png │ ├── TELEMETRY.md │ ├── v5_MIGRATION.md │ └── v6_NEW_UI_CHANGES.md ├── LICENSE ├── logo.png ├── mcp_source.py ├── MCPForUnity │ ├── Editor │ │ ├── AssemblyInfo.cs │ │ ├── AssemblyInfo.cs.meta │ │ ├── Data │ │ │ ├── DefaultServerConfig.cs │ │ │ ├── DefaultServerConfig.cs.meta │ │ │ ├── McpClients.cs │ │ │ ├── McpClients.cs.meta │ │ │ ├── PythonToolsAsset.cs │ │ │ └── PythonToolsAsset.cs.meta │ │ ├── Data.meta │ │ ├── Dependencies │ │ │ ├── DependencyManager.cs │ │ │ ├── DependencyManager.cs.meta │ │ │ ├── Models │ │ │ │ ├── DependencyCheckResult.cs │ │ │ │ ├── DependencyCheckResult.cs.meta │ │ │ │ ├── DependencyStatus.cs │ │ │ │ └── DependencyStatus.cs.meta │ │ │ ├── Models.meta │ │ │ ├── PlatformDetectors │ │ │ │ ├── IPlatformDetector.cs │ │ │ │ ├── IPlatformDetector.cs.meta │ │ │ │ ├── LinuxPlatformDetector.cs │ │ │ │ ├── LinuxPlatformDetector.cs.meta │ │ │ │ ├── MacOSPlatformDetector.cs │ │ │ │ ├── MacOSPlatformDetector.cs.meta │ │ │ │ ├── PlatformDetectorBase.cs │ │ │ │ ├── PlatformDetectorBase.cs.meta │ │ │ │ ├── WindowsPlatformDetector.cs │ │ │ │ └── WindowsPlatformDetector.cs.meta │ │ │ └── PlatformDetectors.meta │ │ ├── Dependencies.meta │ │ ├── External │ │ │ ├── Tommy.cs │ │ │ └── Tommy.cs.meta │ │ ├── External.meta │ │ ├── Helpers │ │ │ ├── AssetPathUtility.cs │ │ │ ├── AssetPathUtility.cs.meta │ │ │ ├── CodexConfigHelper.cs │ │ │ ├── CodexConfigHelper.cs.meta │ │ │ ├── ConfigJsonBuilder.cs │ │ │ ├── ConfigJsonBuilder.cs.meta │ │ │ ├── ExecPath.cs │ │ │ ├── ExecPath.cs.meta │ │ │ ├── GameObjectSerializer.cs │ │ │ ├── GameObjectSerializer.cs.meta │ │ │ ├── McpConfigFileHelper.cs │ │ │ ├── McpConfigFileHelper.cs.meta │ │ │ ├── McpConfigurationHelper.cs │ │ │ ├── McpConfigurationHelper.cs.meta │ │ │ ├── McpLog.cs │ │ │ ├── McpLog.cs.meta │ │ │ ├── McpPathResolver.cs │ │ │ ├── McpPathResolver.cs.meta │ │ │ ├── PackageDetector.cs │ │ │ ├── PackageDetector.cs.meta │ │ │ ├── PackageInstaller.cs │ │ │ ├── PackageInstaller.cs.meta │ │ │ ├── PortManager.cs │ │ │ ├── PortManager.cs.meta │ │ │ ├── PythonToolSyncProcessor.cs │ │ │ ├── PythonToolSyncProcessor.cs.meta │ │ │ ├── Response.cs │ │ │ ├── Response.cs.meta │ │ │ ├── ServerInstaller.cs │ │ │ ├── ServerInstaller.cs.meta │ │ │ ├── ServerPathResolver.cs │ │ │ ├── ServerPathResolver.cs.meta │ │ │ ├── TelemetryHelper.cs │ │ │ ├── TelemetryHelper.cs.meta │ │ │ ├── Vector3Helper.cs │ │ │ └── Vector3Helper.cs.meta │ │ ├── Helpers.meta │ │ ├── Importers │ │ │ ├── PythonFileImporter.cs │ │ │ └── PythonFileImporter.cs.meta │ │ ├── Importers.meta │ │ ├── MCPForUnity.Editor.asmdef │ │ ├── MCPForUnity.Editor.asmdef.meta │ │ ├── MCPForUnityBridge.cs │ │ ├── MCPForUnityBridge.cs.meta │ 
│ ├── Models │ │ │ ├── Command.cs │ │ │ ├── Command.cs.meta │ │ │ ├── McpClient.cs │ │ │ ├── McpClient.cs.meta │ │ │ ├── McpConfig.cs │ │ │ ├── McpConfig.cs.meta │ │ │ ├── MCPConfigServer.cs │ │ │ ├── MCPConfigServer.cs.meta │ │ │ ├── MCPConfigServers.cs │ │ │ ├── MCPConfigServers.cs.meta │ │ │ ├── McpStatus.cs │ │ │ ├── McpStatus.cs.meta │ │ │ ├── McpTypes.cs │ │ │ ├── McpTypes.cs.meta │ │ │ ├── ServerConfig.cs │ │ │ └── ServerConfig.cs.meta │ │ ├── Models.meta │ │ ├── Resources │ │ │ ├── McpForUnityResourceAttribute.cs │ │ │ ├── McpForUnityResourceAttribute.cs.meta │ │ │ ├── MenuItems │ │ │ │ ├── GetMenuItems.cs │ │ │ │ └── GetMenuItems.cs.meta │ │ │ ├── MenuItems.meta │ │ │ ├── Tests │ │ │ │ ├── GetTests.cs │ │ │ │ └── GetTests.cs.meta │ │ │ └── Tests.meta │ │ ├── Resources.meta │ │ ├── Services │ │ │ ├── BridgeControlService.cs │ │ │ ├── BridgeControlService.cs.meta │ │ │ ├── ClientConfigurationService.cs │ │ │ ├── ClientConfigurationService.cs.meta │ │ │ ├── IBridgeControlService.cs │ │ │ ├── IBridgeControlService.cs.meta │ │ │ ├── IClientConfigurationService.cs │ │ │ ├── IClientConfigurationService.cs.meta │ │ │ ├── IPackageUpdateService.cs │ │ │ ├── IPackageUpdateService.cs.meta │ │ │ ├── IPathResolverService.cs │ │ │ ├── IPathResolverService.cs.meta │ │ │ ├── IPythonToolRegistryService.cs │ │ │ ├── IPythonToolRegistryService.cs.meta │ │ │ ├── ITestRunnerService.cs │ │ │ ├── ITestRunnerService.cs.meta │ │ │ ├── IToolSyncService.cs │ │ │ ├── IToolSyncService.cs.meta │ │ │ ├── MCPServiceLocator.cs │ │ │ ├── MCPServiceLocator.cs.meta │ │ │ ├── PackageUpdateService.cs │ │ │ ├── PackageUpdateService.cs.meta │ │ │ ├── PathResolverService.cs │ │ │ ├── PathResolverService.cs.meta │ │ │ ├── PythonToolRegistryService.cs │ │ │ ├── PythonToolRegistryService.cs.meta │ │ │ ├── TestRunnerService.cs │ │ │ ├── TestRunnerService.cs.meta │ │ │ ├── ToolSyncService.cs │ │ │ └── ToolSyncService.cs.meta │ │ ├── Services.meta │ │ ├── Setup │ │ │ ├── SetupWizard.cs │ │ │ ├── SetupWizard.cs.meta │ │ │ ├── SetupWizardWindow.cs │ │ │ └── SetupWizardWindow.cs.meta │ │ ├── Setup.meta │ │ ├── Tools │ │ │ ├── CommandRegistry.cs │ │ │ ├── CommandRegistry.cs.meta │ │ │ ├── ExecuteMenuItem.cs │ │ │ ├── ExecuteMenuItem.cs.meta │ │ │ ├── ManageAsset.cs │ │ │ ├── ManageAsset.cs.meta │ │ │ ├── ManageEditor.cs │ │ │ ├── ManageEditor.cs.meta │ │ │ ├── ManageGameObject.cs │ │ │ ├── ManageGameObject.cs.meta │ │ │ ├── ManageScene.cs │ │ │ ├── ManageScene.cs.meta │ │ │ ├── ManageScript.cs │ │ │ ├── ManageScript.cs.meta │ │ │ ├── ManageShader.cs │ │ │ ├── ManageShader.cs.meta │ │ │ ├── McpForUnityToolAttribute.cs │ │ │ ├── McpForUnityToolAttribute.cs.meta │ │ │ ├── Prefabs │ │ │ │ ├── ManagePrefabs.cs │ │ │ │ └── ManagePrefabs.cs.meta │ │ │ ├── Prefabs.meta │ │ │ ├── ReadConsole.cs │ │ │ ├── ReadConsole.cs.meta │ │ │ ├── RunTests.cs │ │ │ └── RunTests.cs.meta │ │ ├── Tools.meta │ │ ├── Windows │ │ │ ├── ManualConfigEditorWindow.cs │ │ │ ├── ManualConfigEditorWindow.cs.meta │ │ │ ├── MCPForUnityEditorWindow.cs │ │ │ ├── MCPForUnityEditorWindow.cs.meta │ │ │ ├── MCPForUnityEditorWindowNew.cs │ │ │ ├── MCPForUnityEditorWindowNew.cs.meta │ │ │ ├── MCPForUnityEditorWindowNew.uss │ │ │ ├── MCPForUnityEditorWindowNew.uss.meta │ │ │ ├── MCPForUnityEditorWindowNew.uxml │ │ │ ├── MCPForUnityEditorWindowNew.uxml.meta │ │ │ ├── VSCodeManualSetupWindow.cs │ │ │ └── VSCodeManualSetupWindow.cs.meta │ │ └── Windows.meta │ ├── Editor.meta │ ├── package.json │ ├── package.json.meta │ ├── README.md │ ├── README.md.meta │ ├── Runtime │ │ ├── 
MCPForUnity.Runtime.asmdef │ │ ├── MCPForUnity.Runtime.asmdef.meta │ │ ├── Serialization │ │ │ ├── UnityTypeConverters.cs │ │ │ └── UnityTypeConverters.cs.meta │ │ └── Serialization.meta │ ├── Runtime.meta │ └── UnityMcpServer~ │ └── src │ ├── __init__.py │ ├── config.py │ ├── Dockerfile │ ├── models.py │ ├── module_discovery.py │ ├── port_discovery.py │ ├── pyproject.toml │ ├── pyrightconfig.json │ ├── registry │ │ ├── __init__.py │ │ ├── resource_registry.py │ │ └── tool_registry.py │ ├── reload_sentinel.py │ ├── resources │ │ ├── __init__.py │ │ ├── menu_items.py │ │ └── tests.py │ ├── server_version.txt │ ├── server.py │ ├── telemetry_decorator.py │ ├── telemetry.py │ ├── test_telemetry.py │ ├── tools │ │ ├── __init__.py │ │ ├── execute_menu_item.py │ │ ├── manage_asset.py │ │ ├── manage_editor.py │ │ ├── manage_gameobject.py │ │ ├── manage_prefabs.py │ │ ├── manage_scene.py │ │ ├── manage_script.py │ │ ├── manage_shader.py │ │ ├── read_console.py │ │ ├── resource_tools.py │ │ ├── run_tests.py │ │ └── script_apply_edits.py │ ├── unity_connection.py │ └── uv.lock ├── prune_tool_results.py ├── README-zh.md ├── README.md ├── restore-dev.bat ├── scripts │ └── validate-nlt-coverage.sh ├── test_unity_socket_framing.py ├── TestProjects │ └── UnityMCPTests │ ├── .gitignore │ ├── Assets │ │ ├── Editor.meta │ │ ├── Scenes │ │ │ ├── SampleScene.unity │ │ │ └── SampleScene.unity.meta │ │ ├── Scenes.meta │ │ ├── Scripts │ │ │ ├── Hello.cs │ │ │ ├── Hello.cs.meta │ │ │ ├── LongUnityScriptClaudeTest.cs │ │ │ ├── LongUnityScriptClaudeTest.cs.meta │ │ │ ├── TestAsmdef │ │ │ │ ├── CustomComponent.cs │ │ │ │ ├── CustomComponent.cs.meta │ │ │ │ ├── TestAsmdef.asmdef │ │ │ │ └── TestAsmdef.asmdef.meta │ │ │ └── TestAsmdef.meta │ │ ├── Scripts.meta │ │ ├── Tests │ │ │ ├── EditMode │ │ │ │ ├── Data │ │ │ │ │ ├── PythonToolsAssetTests.cs │ │ │ │ │ └── PythonToolsAssetTests.cs.meta │ │ │ │ ├── Data.meta │ │ │ │ ├── Helpers │ │ │ │ │ ├── CodexConfigHelperTests.cs │ │ │ │ │ ├── CodexConfigHelperTests.cs.meta │ │ │ │ │ ├── WriteToConfigTests.cs │ │ │ │ │ └── WriteToConfigTests.cs.meta │ │ │ │ ├── Helpers.meta │ │ │ │ ├── MCPForUnityTests.Editor.asmdef │ │ │ │ ├── MCPForUnityTests.Editor.asmdef.meta │ │ │ │ ├── Resources │ │ │ │ │ ├── GetMenuItemsTests.cs │ │ │ │ │ └── GetMenuItemsTests.cs.meta │ │ │ │ ├── Resources.meta │ │ │ │ ├── Services │ │ │ │ │ ├── PackageUpdateServiceTests.cs │ │ │ │ │ ├── PackageUpdateServiceTests.cs.meta │ │ │ │ │ ├── PythonToolRegistryServiceTests.cs │ │ │ │ │ ├── PythonToolRegistryServiceTests.cs.meta │ │ │ │ │ ├── ToolSyncServiceTests.cs │ │ │ │ │ └── ToolSyncServiceTests.cs.meta │ │ │ │ ├── Services.meta │ │ │ │ ├── Tools │ │ │ │ │ ├── AIPropertyMatchingTests.cs │ │ │ │ │ ├── AIPropertyMatchingTests.cs.meta │ │ │ │ │ ├── CommandRegistryTests.cs │ │ │ │ │ ├── CommandRegistryTests.cs.meta │ │ │ │ │ ├── ComponentResolverTests.cs │ │ │ │ │ ├── ComponentResolverTests.cs.meta │ │ │ │ │ ├── ExecuteMenuItemTests.cs │ │ │ │ │ ├── ExecuteMenuItemTests.cs.meta │ │ │ │ │ ├── ManageGameObjectTests.cs │ │ │ │ │ ├── ManageGameObjectTests.cs.meta │ │ │ │ │ ├── ManagePrefabsTests.cs │ │ │ │ │ ├── ManagePrefabsTests.cs.meta │ │ │ │ │ ├── ManageScriptValidationTests.cs │ │ │ │ │ └── ManageScriptValidationTests.cs.meta │ │ │ │ ├── Tools.meta │ │ │ │ ├── Windows │ │ │ │ │ ├── ManualConfigJsonBuilderTests.cs │ │ │ │ │ └── ManualConfigJsonBuilderTests.cs.meta │ │ │ │ └── Windows.meta │ │ │ └── EditMode.meta │ │ └── Tests.meta │ ├── Packages │ │ └── manifest.json │ └── ProjectSettings │ ├── Packages │ │ └── 
com.unity.testtools.codecoverage │ │ └── Settings.json │ └── ProjectVersion.txt ├── tests │ ├── conftest.py │ ├── test_edit_normalization_and_noop.py │ ├── test_edit_strict_and_warnings.py │ ├── test_find_in_file_minimal.py │ ├── test_get_sha.py │ ├── test_improved_anchor_matching.py │ ├── test_logging_stdout.py │ ├── test_manage_script_uri.py │ ├── test_read_console_truncate.py │ ├── test_read_resource_minimal.py │ ├── test_resources_api.py │ ├── test_script_editing.py │ ├── test_script_tools.py │ ├── test_telemetry_endpoint_validation.py │ ├── test_telemetry_queue_worker.py │ ├── test_telemetry_subaction.py │ ├── test_transport_framing.py │ └── test_validate_script_summary.py ├── tools │ └── stress_mcp.py └── UnityMcpBridge ├── Editor │ ├── AssemblyInfo.cs │ ├── AssemblyInfo.cs.meta │ ├── Data │ │ ├── DefaultServerConfig.cs │ │ ├── DefaultServerConfig.cs.meta │ │ ├── McpClients.cs │ │ └── McpClients.cs.meta │ ├── Data.meta │ ├── Dependencies │ │ ├── DependencyManager.cs │ │ ├── DependencyManager.cs.meta │ │ ├── Models │ │ │ ├── DependencyCheckResult.cs │ │ │ ├── DependencyCheckResult.cs.meta │ │ │ ├── DependencyStatus.cs │ │ │ └── DependencyStatus.cs.meta │ │ ├── Models.meta │ │ ├── PlatformDetectors │ │ │ ├── IPlatformDetector.cs │ │ │ ├── IPlatformDetector.cs.meta │ │ │ ├── LinuxPlatformDetector.cs │ │ │ ├── LinuxPlatformDetector.cs.meta │ │ │ ├── MacOSPlatformDetector.cs │ │ │ ├── MacOSPlatformDetector.cs.meta │ │ │ ├── PlatformDetectorBase.cs │ │ │ ├── PlatformDetectorBase.cs.meta │ │ │ ├── WindowsPlatformDetector.cs │ │ │ └── WindowsPlatformDetector.cs.meta │ │ └── PlatformDetectors.meta │ ├── Dependencies.meta │ ├── External │ │ ├── Tommy.cs │ │ └── Tommy.cs.meta │ ├── External.meta │ ├── Helpers │ │ ├── AssetPathUtility.cs │ │ ├── AssetPathUtility.cs.meta │ │ ├── CodexConfigHelper.cs │ │ ├── CodexConfigHelper.cs.meta │ │ ├── ConfigJsonBuilder.cs │ │ ├── ConfigJsonBuilder.cs.meta │ │ ├── ExecPath.cs │ │ ├── ExecPath.cs.meta │ │ ├── GameObjectSerializer.cs │ │ ├── GameObjectSerializer.cs.meta │ │ ├── McpConfigFileHelper.cs │ │ ├── McpConfigFileHelper.cs.meta │ │ ├── McpConfigurationHelper.cs │ │ ├── McpConfigurationHelper.cs.meta │ │ ├── McpLog.cs │ │ ├── McpLog.cs.meta │ │ ├── McpPathResolver.cs │ │ ├── McpPathResolver.cs.meta │ │ ├── PackageDetector.cs │ │ ├── PackageDetector.cs.meta │ │ ├── PackageInstaller.cs │ │ ├── PackageInstaller.cs.meta │ │ ├── PortManager.cs │ │ ├── PortManager.cs.meta │ │ ├── Response.cs │ │ ├── Response.cs.meta │ │ ├── ServerInstaller.cs │ │ ├── ServerInstaller.cs.meta │ │ ├── ServerPathResolver.cs │ │ ├── ServerPathResolver.cs.meta │ │ ├── TelemetryHelper.cs │ │ ├── TelemetryHelper.cs.meta │ │ ├── Vector3Helper.cs │ │ └── Vector3Helper.cs.meta │ ├── Helpers.meta │ ├── MCPForUnity.Editor.asmdef │ ├── MCPForUnity.Editor.asmdef.meta │ ├── MCPForUnityBridge.cs │ ├── MCPForUnityBridge.cs.meta │ ├── Models │ │ ├── Command.cs │ │ ├── Command.cs.meta │ │ ├── McpClient.cs │ │ ├── McpClient.cs.meta │ │ ├── McpConfig.cs │ │ ├── McpConfig.cs.meta │ │ ├── MCPConfigServer.cs │ │ ├── MCPConfigServer.cs.meta │ │ ├── MCPConfigServers.cs │ │ ├── MCPConfigServers.cs.meta │ │ ├── McpStatus.cs │ │ ├── McpStatus.cs.meta │ │ ├── McpTypes.cs │ │ ├── McpTypes.cs.meta │ │ ├── ServerConfig.cs │ │ └── ServerConfig.cs.meta │ ├── Models.meta │ ├── Setup │ │ ├── SetupWizard.cs │ │ ├── SetupWizard.cs.meta │ │ ├── SetupWizardWindow.cs │ │ └── SetupWizardWindow.cs.meta │ ├── Setup.meta │ ├── Tools │ │ ├── CommandRegistry.cs │ │ ├── CommandRegistry.cs.meta │ │ ├── ManageAsset.cs │ │ ├── 
ManageAsset.cs.meta │ │ ├── ManageEditor.cs │ │ ├── ManageEditor.cs.meta │ │ ├── ManageGameObject.cs │ │ ├── ManageGameObject.cs.meta │ │ ├── ManageScene.cs │ │ ├── ManageScene.cs.meta │ │ ├── ManageScript.cs │ │ ├── ManageScript.cs.meta │ │ ├── ManageShader.cs │ │ ├── ManageShader.cs.meta │ │ ├── McpForUnityToolAttribute.cs │ │ ├── McpForUnityToolAttribute.cs.meta │ │ ├── MenuItems │ │ │ ├── ManageMenuItem.cs │ │ │ ├── ManageMenuItem.cs.meta │ │ │ ├── MenuItemExecutor.cs │ │ │ ├── MenuItemExecutor.cs.meta │ │ │ ├── MenuItemsReader.cs │ │ │ └── MenuItemsReader.cs.meta │ │ ├── MenuItems.meta │ │ ├── Prefabs │ │ │ ├── ManagePrefabs.cs │ │ │ └── ManagePrefabs.cs.meta │ │ ├── Prefabs.meta │ │ ├── ReadConsole.cs │ │ └── ReadConsole.cs.meta │ ├── Tools.meta │ ├── Windows │ │ ├── ManualConfigEditorWindow.cs │ │ ├── ManualConfigEditorWindow.cs.meta │ │ ├── MCPForUnityEditorWindow.cs │ │ ├── MCPForUnityEditorWindow.cs.meta │ │ ├── VSCodeManualSetupWindow.cs │ │ └── VSCodeManualSetupWindow.cs.meta │ └── Windows.meta ├── Editor.meta ├── package.json ├── package.json.meta ├── README.md ├── README.md.meta ├── Runtime │ ├── MCPForUnity.Runtime.asmdef │ ├── MCPForUnity.Runtime.asmdef.meta │ ├── Serialization │ │ ├── UnityTypeConverters.cs │ │ └── UnityTypeConverters.cs.meta │ └── Serialization.meta ├── Runtime.meta └── UnityMcpServer~ └── src ├── __init__.py ├── config.py ├── Dockerfile ├── port_discovery.py ├── pyproject.toml ├── pyrightconfig.json ├── registry │ ├── __init__.py │ └── tool_registry.py ├── reload_sentinel.py ├── server_version.txt ├── server.py ├── telemetry_decorator.py ├── telemetry.py ├── test_telemetry.py ├── tools │ ├── __init__.py │ ├── manage_asset.py │ ├── manage_editor.py │ ├── manage_gameobject.py │ ├── manage_menu_item.py │ ├── manage_prefabs.py │ ├── manage_scene.py │ ├── manage_script.py │ ├── manage_shader.py │ ├── read_console.py │ ├── resource_tools.py │ └── script_apply_edits.py ├── unity_connection.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /docs/CUSTOM_TOOLS.md: -------------------------------------------------------------------------------- ```markdown # Adding Custom Tools to MCP for Unity MCP for Unity supports auto-discovery of custom tools using decorators (Python) and attributes (C#). This allows you to easily extend the MCP server with your own tools. Be sure to review the developer README first: | [English](README-DEV.md) | [简体中文](README-DEV-zh.md) | |---------------------------|------------------------------| --- # Part 1: How to Use (Quick Start Guide) This section shows you how to add custom tools to your Unity project. ## Step 1: Create a PythonToolsAsset First, create a ScriptableObject to manage your Python tools: 1. In Unity, right-click in the Project window 2. Select **Assets > Create > MCP For Unity > Python Tools** 3. Name it (e.g., `MyPythonTools`)  ## Step 2: Create Your Python Tool File Create a Python file **anywhere in your Unity project**. 
For example, `Assets/Editor/MyTools/my_custom_tool.py`: ```python from typing import Annotated, Any from mcp.server.fastmcp import Context from registry import mcp_for_unity_tool from unity_connection import send_command_with_retry @mcp_for_unity_tool( description="My custom tool that does something amazing" ) async def my_custom_tool( ctx: Context, param1: Annotated[str, "Description of param1"], param2: Annotated[int, "Description of param2"] | None = None ) -> dict[str, Any]: await ctx.info(f"Processing my_custom_tool: {param1}") # Prepare parameters for Unity params = { "action": "do_something", "param1": param1, "param2": param2, } params = {k: v for k, v in params.items() if v is not None} # Send to Unity handler response = send_command_with_retry("my_custom_tool", params) return response if isinstance(response, dict) else {"success": False, "message": str(response)} ``` ## Step 3: Add Python File to Asset 1. Select your `PythonToolsAsset` in the Project window 2. In the Inspector, expand **Python Files** 3. Drag your `.py` file into the list (or click **+** and select it)  **Note:** If you can't see `.py` files in the object picker, go to **Window > MCP For Unity > Tool Sync > Reimport Python Files** to force Unity to recognize them as text assets. ## Step 4: Create C# Handler Create a C# file anywhere in your Unity project (typically in `Editor/`): ```csharp using Newtonsoft.Json.Linq; using MCPForUnity.Editor.Helpers; namespace MyProject.Editor.CustomTools { [McpForUnityTool("my_custom_tool")] public static class MyCustomTool { public static object HandleCommand(JObject @params) { string action = @params["action"]?.ToString(); string param1 = @params["param1"]?.ToString(); int? param2 = @params["param2"]?.ToObject<int?>(); // Your custom logic here if (string.IsNullOrEmpty(param1)) { return Response.Error("param1 is required"); } // Do something amazing DoSomethingAmazing(param1, param2); return Response.Success("Custom tool executed successfully!"); } private static void DoSomethingAmazing(string param1, int? param2) { // Your implementation } } } ``` ## Step 5: Rebuild the MCP Server 1. Open the MCP for Unity window in the Unity Editor 2. Click **Rebuild Server** to apply your changes 3. Your tool is now available to MCP clients! **What happens automatically:** - ✅ Python files are synced to the MCP server on Unity startup - ✅ Python files are synced when modified (you would need to rebuild the server) - ✅ C# handlers are discovered via reflection - ✅ Tools are registered with the MCP server ## Complete Example: Screenshot Tool Here's a complete example showing how to create a screenshot capture tool. ### Python File (`Assets/Editor/ScreenShots/Python/screenshot_tool.py`) ```python from typing import Annotated, Any from mcp.server.fastmcp import Context from registry import mcp_for_unity_tool from unity_connection import send_command_with_retry @mcp_for_unity_tool( description="Capture screenshots in Unity, saving them as PNGs" ) async def capture_screenshot( ctx: Context, filename: Annotated[str, "Screenshot filename without extension, e.g., screenshot_01"], ) -> dict[str, Any]: await ctx.info(f"Capturing screenshot: {filename}") params = { "action": "capture", "filename": filename, } params = {k: v for k, v in params.items() if v is not None} response = send_command_with_retry("capture_screenshot", params) return response if isinstance(response, dict) else {"success": False, "message": str(response)} ``` ### Add to PythonToolsAsset 1. Select your `PythonToolsAsset` 2. 
Add `screenshot_tool.py` to the **Python Files** list 3. The file will automatically sync to the MCP server ### C# Handler (`Assets/Editor/ScreenShots/CaptureScreenshotTool.cs`) ```csharp using System.IO; using Newtonsoft.Json.Linq; using UnityEngine; using MCPForUnity.Editor.Tools; namespace MyProject.Editor.Tools { [McpForUnityTool("capture_screenshot")] public static class CaptureScreenshotTool { public static object HandleCommand(JObject @params) { string filename = @params["filename"]?.ToString(); if (string.IsNullOrEmpty(filename)) { return MCPForUnity.Editor.Helpers.Response.Error("filename is required"); } try { string absolutePath = Path.Combine(Application.dataPath, "Screenshots", filename); Directory.CreateDirectory(Path.GetDirectoryName(absolutePath)); // Find the main camera Camera camera = Camera.main; if (camera == null) { camera = Object.FindFirstObjectByType<Camera>(); } if (camera == null) { return MCPForUnity.Editor.Helpers.Response.Error("No camera found in the scene"); } // Create a RenderTexture RenderTexture rt = new RenderTexture(Screen.width, Screen.height, 24); camera.targetTexture = rt; // Render the camera's view camera.Render(); // Read pixels from the RenderTexture RenderTexture.active = rt; Texture2D screenshot = new Texture2D(Screen.width, Screen.height, TextureFormat.RGB24, false); screenshot.ReadPixels(new Rect(0, 0, Screen.width, Screen.height), 0, 0); screenshot.Apply(); // Clean up camera.targetTexture = null; RenderTexture.active = null; Object.DestroyImmediate(rt); // Save to file byte[] bytes = screenshot.EncodeToPNG(); File.WriteAllBytes(absolutePath, bytes); Object.DestroyImmediate(screenshot); return MCPForUnity.Editor.Helpers.Response.Success($"Screenshot saved to {absolutePath}", new { path = absolutePath, }); } catch (System.Exception ex) { return MCPForUnity.Editor.Helpers.Response.Error($"Failed to capture screenshot: {ex.Message}"); } } } } ``` ### Rebuild and Test 1. Open the MCP for Unity window 2. Click **Rebuild Server** 3. Test your tool from your MCP client! --- # Part 2: How It Works (Technical Details) This section explains the technical implementation of the custom tools system. ## Python Side: Decorator System ### The `@mcp_for_unity_tool` Decorator The decorator automatically registers your function as an MCP tool: ```python @mcp_for_unity_tool( name="custom_name", # Optional: function name used by default description="Tool description", # Required: describe what the tool does ) ``` **How it works:** - Auto-generates the tool name from the function name (e.g., `my_custom_tool`) - Registers the tool with FastMCP during module import - Supports all FastMCP `mcp.tool` decorator options: <https://gofastmcp.com/servers/tools#tools> **Note:** All tools should have the `description` field. It's not strictly required, however, that parameter is the best place to define a description so that most MCP clients can read it. See [issue #289](https://github.com/CoplayDev/unity-mcp/issues/289). 
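
To make the registration flow concrete, here is a minimal sketch of how a decorator like `@mcp_for_unity_tool` can wrap FastMCP's own `mcp.tool` decorator. This is illustrative only: the real implementation is exported from the server's `registry` package (presumably `registry/tool_registry.py`) and may differ in detail, and the `mcp` instance shown here is a stand-in for the server's actual FastMCP instance.

```python
# Illustrative sketch only — not the actual registry implementation.
from collections.abc import Callable

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("unity-mcp")  # stand-in; the real server owns its own instance


def mcp_for_unity_tool(func: Callable | None = None, *, name: str | None = None,
                       description: str | None = None, **tool_kwargs):
    """Register a function as an MCP tool, deriving the name from the function if omitted."""
    def decorator(fn: Callable):
        tool_name = name or fn.__name__  # e.g. my_custom_tool -> "my_custom_tool"
        # Delegate to FastMCP's own tool decorator so its options pass through
        return mcp.tool(name=tool_name, description=description, **tool_kwargs)(fn)

    # Support both bare @mcp_for_unity_tool and called @mcp_for_unity_tool(...)
    return decorator(func) if callable(func) else decorator
```

The key point is that the tool name defaults to the function name, which is why `my_custom_tool` in the earlier example becomes the command name your C# handler must match.
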
### Auto-Discovery Python tools are automatically discovered when: - The Python file is added to a `PythonToolsAsset` - The file is synced to `MCPForUnity/UnityMcpServer~/src/tools/custom/` - The file is imported during server startup - The decorator `@mcp_for_unity_tool` is used ### Sync System The `PythonToolsAsset` system automatically syncs your Python files: **When sync happens:** - ✅ Unity starts up - ✅ Python files are modified - ✅ Python files are added/removed from the asset **Manual controls:** - **Sync Now:** Window > MCP For Unity > Tool Sync > Sync Python Tools - **Toggle Auto-Sync:** Window > MCP For Unity > Tool Sync > Auto-Sync Python Tools - **Reimport Python Files:** Window > MCP For Unity > Tool Sync > Reimport Python Files **How it works:** - Uses content hashing to detect changes (only syncs modified files) - Files are copied to `MCPForUnity/UnityMcpServer~/src/tools/custom/` - Stale files are automatically cleaned up ## C# Side: Attribute System ### The `[McpForUnityTool]` Attribute The attribute marks your class as a tool handler: ```csharp // Explicit command name [McpForUnityTool("my_custom_tool")] public static class MyCustomTool { } // Auto-generated from class name (MyCustomTool → my_custom_tool) [McpForUnityTool] public static class MyCustomTool { } ``` ### Auto-Discovery C# handlers are automatically discovered when: - The class has the `[McpForUnityTool]` attribute - The class has a `public static HandleCommand(JObject)` method - Unity loads the assembly containing the class **How it works:** - Unity scans all assemblies on startup - Finds classes with `[McpForUnityTool]` attribute - Registers them in the command registry - Routes MCP commands to the appropriate handler ## Best Practices ### Python - ✅ Use type hints with `Annotated` for parameter documentation - ✅ Return `dict[str, Any]` with `{"success": bool, "message": str, "data": Any}` - ✅ Use `ctx.info()` for logging - ✅ Handle errors gracefully and return structured error responses - ✅ Use `send_command_with_retry()` for Unity communication ### C# - ✅ Use the `Response.Success()` and `Response.Error()` helper methods - ✅ Validate input parameters before processing - ✅ Use `@params["key"]?.ToObject<Type>()` for safe type conversion - ✅ Return structured responses with meaningful data - ✅ Handle exceptions and return error responses ## Debugging ### Python - Check server logs: `~/Library/Application Support/UnityMCP/Logs/unity_mcp_server.log` - Look for: `"Registered X MCP tools"` message on startup - Use `ctx.info()` for debugging messages ### C# - Check Unity Console for: `"MCP-FOR-UNITY: Auto-discovered X tools"` message - Look for warnings about missing `HandleCommand` methods - Use `Debug.Log()` in your handler for debugging ## Troubleshooting **Tool not appearing:** - **Python:** - Ensure the `.py` file is added to a `PythonToolsAsset` - Check Unity Console for sync messages: "Python tools synced: X copied" - Verify file was synced to `UnityMcpServer~/src/tools/custom/` - Try manual sync: Window > MCP For Unity > Tool Sync > Sync Python Tools - Rebuild the server in the MCP for Unity window - **C#:** - Ensure the class has `[McpForUnityTool]` attribute - Ensure the class has a `public static HandleCommand(JObject)` method - Check Unity Console for: "MCP-FOR-UNITY: Auto-discovered X tools" **Python files not showing in Inspector:** - Go to **Window > MCP For Unity > Tool Sync > Reimport Python Files** - This forces Unity to recognize `.py` files as TextAssets - Check that `.py.meta` files show 
`ScriptedImporter` (not `DefaultImporter`) **Sync not working:** - Check if auto-sync is enabled: Window > MCP For Unity > Tool Sync > Auto-Sync Python Tools - Look for errors in Unity Console - Verify `PythonToolsAsset` has the correct files added **Name conflicts:** - Use explicit names in decorators/attributes to avoid conflicts - Check registered tools: `CommandRegistry.GetAllCommandNames()` in C# **Tool not being called:** - Verify the command name matches between Python and C# - Check that parameters are being passed correctly - Look for errors in logs ``` -------------------------------------------------------------------------------- /UnityMcpBridge/Editor/Helpers/CodexConfigHelper.cs: -------------------------------------------------------------------------------- ```csharp using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Text.RegularExpressions; using MCPForUnity.External.Tommy; using Newtonsoft.Json; namespace MCPForUnity.Editor.Helpers { /// <summary> /// Codex CLI specific configuration helpers. Handles TOML snippet /// generation and lightweight parsing so Codex can join the auto-setup /// flow alongside JSON-based clients. /// </summary> public static class CodexConfigHelper { public static bool IsCodexConfigured(string pythonDir) { try { string basePath = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile); if (string.IsNullOrEmpty(basePath)) return false; string configPath = Path.Combine(basePath, ".codex", "config.toml"); if (!File.Exists(configPath)) return false; string toml = File.ReadAllText(configPath); if (!TryParseCodexServer(toml, out _, out var args)) return false; string dir = McpConfigFileHelper.ExtractDirectoryArg(args); if (string.IsNullOrEmpty(dir)) return false; return McpConfigFileHelper.PathsEqual(dir, pythonDir); } catch { return false; } } public static string BuildCodexServerBlock(string uvPath, string serverSrc) { string argsArray = FormatTomlStringArray(new[] { "run", "--directory", serverSrc, "server.py" }); var sb = new StringBuilder(); sb.AppendLine("[mcp_servers.unityMCP]"); sb.AppendLine($"command = \"{EscapeTomlString(uvPath)}\""); sb.AppendLine($"args = {argsArray}"); sb.AppendLine($"startup_timeout_sec = 30"); // Windows-specific environment block to help Codex locate needed paths try { if (Environment.OSVersion.Platform == PlatformID.Win32NT) { string userProfile = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile) ?? string.Empty; string appData = Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData) ?? string.Empty; // Roaming string localAppData = Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData) ?? string.Empty; string programData = Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData) ?? string.Empty; string programFiles = Environment.GetFolderPath(Environment.SpecialFolder.ProgramFiles) ?? string.Empty; string systemDrive = Environment.GetEnvironmentVariable("SystemDrive") ?? (Path.GetPathRoot(userProfile)?.TrimEnd('\\', '/') ?? "C:"); string systemRoot = Environment.GetEnvironmentVariable("SystemRoot") ?? Path.Combine(systemDrive + "\\", "Windows"); string comspec = Environment.GetEnvironmentVariable("COMSPEC") ?? Path.Combine(Environment.SystemDirectory ?? 
(systemRoot + "\\System32"), "cmd.exe"); string homeDrive = Environment.GetEnvironmentVariable("HOMEDRIVE"); string homePath = Environment.GetEnvironmentVariable("HOMEPATH"); if (string.IsNullOrEmpty(homeDrive)) { homeDrive = systemDrive; } if (string.IsNullOrEmpty(homePath) && !string.IsNullOrEmpty(userProfile)) { // Derive HOMEPATH from USERPROFILE (e.g., C:\\Users\\name -> \\Users\\name) if (userProfile.StartsWith(homeDrive + "\\", StringComparison.OrdinalIgnoreCase)) { homePath = userProfile.Substring(homeDrive.Length); } else { try { var root = Path.GetPathRoot(userProfile) ?? string.Empty; // e.g., C:\\ homePath = userProfile.Substring(root.Length - 1); // keep leading backslash } catch { homePath = "\\"; } } } string powershell = Path.Combine(Environment.SystemDirectory ?? (systemRoot + "\\System32"), "WindowsPowerShell\\v1.0\\powershell.exe"); string pwsh = Path.Combine(programFiles, "PowerShell\\7\\pwsh.exe"); string tempDir = Path.Combine(localAppData, "Temp"); sb.AppendLine(); sb.AppendLine("[mcp_servers.unityMCP.env]"); sb.AppendLine($"SystemRoot = \"{EscapeTomlString(systemRoot)}\""); sb.AppendLine($"APPDATA = \"{EscapeTomlString(appData)}\""); sb.AppendLine($"COMSPEC = \"{EscapeTomlString(comspec)}\""); sb.AppendLine($"HOMEDRIVE = \"{EscapeTomlString(homeDrive?.TrimEnd('\\') ?? string.Empty)}\""); sb.AppendLine($"HOMEPATH = \"{EscapeTomlString(homePath ?? string.Empty)}\""); sb.AppendLine($"LOCALAPPDATA = \"{EscapeTomlString(localAppData)}\""); sb.AppendLine($"POWERSHELL = \"{EscapeTomlString(powershell)}\""); sb.AppendLine($"PROGRAMDATA = \"{EscapeTomlString(programData)}\""); sb.AppendLine($"PROGRAMFILES = \"{EscapeTomlString(programFiles)}\""); sb.AppendLine($"PWSH = \"{EscapeTomlString(pwsh)}\""); sb.AppendLine($"SYSTEMDRIVE = \"{EscapeTomlString(systemDrive)}\""); sb.AppendLine($"SYSTEMROOT = \"{EscapeTomlString(systemRoot)}\""); sb.AppendLine($"TEMP = \"{EscapeTomlString(tempDir)}\""); sb.AppendLine($"TMP = \"{EscapeTomlString(tempDir)}\""); sb.AppendLine($"USERPROFILE = \"{EscapeTomlString(userProfile)}\""); } } catch { /* best effort */ } return sb.ToString(); } public static string UpsertCodexServerBlock(string existingToml, string newBlock) { if (string.IsNullOrWhiteSpace(existingToml)) { // Default to snake_case section when creating new files return newBlock.TrimEnd() + Environment.NewLine; } StringBuilder sb = new StringBuilder(); using StringReader reader = new StringReader(existingToml); string line; bool inTarget = false; bool replaced = false; // Support both TOML section casings and nested subtables (e.g., env) // Prefer the casing already present in the user's file; fall back to snake_case bool hasCamelSection = existingToml.IndexOf("[mcpServers.unityMCP]", StringComparison.OrdinalIgnoreCase) >= 0 || existingToml.IndexOf("[mcpServers.unityMCP.", StringComparison.OrdinalIgnoreCase) >= 0; bool hasSnakeSection = existingToml.IndexOf("[mcp_servers.unityMCP]", StringComparison.OrdinalIgnoreCase) >= 0 || existingToml.IndexOf("[mcp_servers.unityMCP.", StringComparison.OrdinalIgnoreCase) >= 0; bool preferCamel = hasCamelSection || (!hasSnakeSection && existingToml.IndexOf("[mcpServers]", StringComparison.OrdinalIgnoreCase) >= 0); // Prepare block variants matching the chosen casing, including nested tables string newBlockCamel = newBlock .Replace("[mcp_servers.unityMCP.env]", "[mcpServers.unityMCP.env]") .Replace("[mcp_servers.unityMCP]", "[mcpServers.unityMCP]"); string newBlockEffective = preferCamel ? 
newBlockCamel : newBlock; static bool IsSection(string s) { string t = s.Trim(); return t.StartsWith("[") && t.EndsWith("]") && !t.StartsWith("[["); } static string SectionName(string header) { string t = header.Trim(); if (t.StartsWith("[") && t.EndsWith("]")) t = t.Substring(1, t.Length - 2); return t; } bool TargetOrChild(string section) { // Compare case-insensitively; accept both snake and camel as the same logical table string name = SectionName(section); return name.StartsWith("mcp_servers.unityMCP", StringComparison.OrdinalIgnoreCase) || name.StartsWith("mcpServers.unityMCP", StringComparison.OrdinalIgnoreCase); } while ((line = reader.ReadLine()) != null) { string trimmed = line.Trim(); bool isSection = IsSection(trimmed); if (isSection) { // If we encounter the target section or any of its nested tables, mark/keep in-target if (TargetOrChild(trimmed)) { if (!replaced) { if (sb.Length > 0 && sb[^1] != '\n') sb.AppendLine(); sb.AppendLine(newBlockEffective.TrimEnd()); replaced = true; } inTarget = true; continue; } // A new unrelated section ends the target region if (inTarget) { inTarget = false; } } if (inTarget) { continue; } sb.AppendLine(line); } if (!replaced) { if (sb.Length > 0 && sb[^1] != '\n') sb.AppendLine(); sb.AppendLine(newBlockEffective.TrimEnd()); } return sb.ToString().TrimEnd() + Environment.NewLine; } public static bool TryParseCodexServer(string toml, out string command, out string[] args) { command = null; args = null; if (string.IsNullOrWhiteSpace(toml)) return false; try { using var reader = new StringReader(toml); TomlTable root = TOML.Parse(reader); if (root == null) return false; if (!TryGetTable(root, "mcp_servers", out var servers) && !TryGetTable(root, "mcpServers", out servers)) { return false; } if (!TryGetTable(servers, "unityMCP", out var unity)) { return false; } command = GetTomlString(unity, "command"); args = GetTomlStringArray(unity, "args"); return !string.IsNullOrEmpty(command) && args != null; } catch (TomlParseException) { return false; } catch (TomlSyntaxException) { return false; } catch (FormatException) { return false; } } private static bool TryGetTable(TomlTable parent, string key, out TomlTable table) { table = null; if (parent == null) return false; if (parent.TryGetNode(key, out var node)) { if (node is TomlTable tbl) { table = tbl; return true; } if (node is TomlArray array) { var firstTable = array.Children.OfType<TomlTable>().FirstOrDefault(); if (firstTable != null) { table = firstTable; return true; } } } return false; } private static string GetTomlString(TomlTable table, string key) { if (table != null && table.TryGetNode(key, out var node)) { if (node is TomlString str) return str.Value; if (node.HasValue) return node.ToString(); } return null; } private static string[] GetTomlStringArray(TomlTable table, string key) { if (table == null) return null; if (!table.TryGetNode(key, out var node)) return null; if (node is TomlArray array) { List<string> values = new List<string>(); foreach (TomlNode element in array.Children) { if (element is TomlString str) { values.Add(str.Value); } else if (element.HasValue) { values.Add(element.ToString()); } } return values.Count > 0 ? 
values.ToArray() : Array.Empty<string>(); } if (node is TomlString single) { return new[] { single.Value }; } return null; } private static string FormatTomlStringArray(IEnumerable<string> values) { if (values == null) return "[]"; StringBuilder sb = new StringBuilder(); sb.Append('['); bool first = true; foreach (string value in values) { if (!first) { sb.Append(", "); } sb.Append('"').Append(EscapeTomlString(value ?? string.Empty)).Append('"'); first = false; } sb.Append(']'); return sb.ToString(); } private static string EscapeTomlString(string value) { if (string.IsNullOrEmpty(value)) return string.Empty; return value .Replace("\\", "\\\\") .Replace("\"", "\\\""); } } } ``` -------------------------------------------------------------------------------- /MCPForUnity/Editor/Tools/CommandRegistry.cs: -------------------------------------------------------------------------------- ```csharp using System; using System.Collections.Generic; using System.Linq; using System.Reflection; using System.Text.RegularExpressions; using System.Threading.Tasks; using MCPForUnity.Editor.Helpers; using MCPForUnity.Editor.Resources; using Newtonsoft.Json; using Newtonsoft.Json.Linq; namespace MCPForUnity.Editor.Tools { /// <summary> /// Holds information about a registered command handler. /// </summary> class HandlerInfo { public string CommandName { get; } public Func<JObject, object> SyncHandler { get; } public Func<JObject, Task<object>> AsyncHandler { get; } public bool IsAsync => AsyncHandler != null; public HandlerInfo(string commandName, Func<JObject, object> syncHandler, Func<JObject, Task<object>> asyncHandler) { CommandName = commandName; SyncHandler = syncHandler; AsyncHandler = asyncHandler; } } /// <summary> /// Registry for all MCP command handlers via reflection. /// Handles both MCP tools and resources. 
/// </summary> public static class CommandRegistry { private static readonly Dictionary<string, HandlerInfo> _handlers = new(); private static bool _initialized = false; /// <summary> /// Initialize and auto-discover all tools and resources marked with /// [McpForUnityTool] or [McpForUnityResource] /// </summary> public static void Initialize() { if (_initialized) return; AutoDiscoverCommands(); _initialized = true; } /// <summary> /// Convert PascalCase or camelCase to snake_case /// </summary> private static string ToSnakeCase(string name) { if (string.IsNullOrEmpty(name)) return name; // Insert underscore before uppercase letters (except first) var s1 = Regex.Replace(name, "(.)([A-Z][a-z]+)", "$1_$2"); var s2 = Regex.Replace(s1, "([a-z0-9])([A-Z])", "$1_$2"); return s2.ToLower(); } /// <summary> /// Auto-discover all types with [McpForUnityTool] or [McpForUnityResource] attributes /// </summary> private static void AutoDiscoverCommands() { try { var allTypes = AppDomain.CurrentDomain.GetAssemblies() .Where(a => !a.IsDynamic) .SelectMany(a => { try { return a.GetTypes(); } catch { return new Type[0]; } }) .ToList(); // Discover tools var toolTypes = allTypes.Where(t => t.GetCustomAttribute<McpForUnityToolAttribute>() != null); int toolCount = 0; foreach (var type in toolTypes) { if (RegisterCommandType(type, isResource: false)) toolCount++; } // Discover resources var resourceTypes = allTypes.Where(t => t.GetCustomAttribute<McpForUnityResourceAttribute>() != null); int resourceCount = 0; foreach (var type in resourceTypes) { if (RegisterCommandType(type, isResource: true)) resourceCount++; } McpLog.Info($"Auto-discovered {toolCount} tools and {resourceCount} resources ({_handlers.Count} total handlers)"); } catch (Exception ex) { McpLog.Error($"Failed to auto-discover MCP commands: {ex.Message}"); } } /// <summary> /// Register a command type (tool or resource) with the registry. /// Returns true if successfully registered, false otherwise. /// </summary> private static bool RegisterCommandType(Type type, bool isResource) { string commandName; string typeLabel = isResource ? "resource" : "tool"; // Get command name from appropriate attribute if (isResource) { var resourceAttr = type.GetCustomAttribute<McpForUnityResourceAttribute>(); commandName = resourceAttr.ResourceName; } else { var toolAttr = type.GetCustomAttribute<McpForUnityToolAttribute>(); commandName = toolAttr.CommandName; } // Auto-generate command name if not explicitly provided if (string.IsNullOrEmpty(commandName)) { commandName = ToSnakeCase(type.Name); } // Check for duplicate command names if (_handlers.ContainsKey(commandName)) { McpLog.Warn( $"Duplicate command name '{commandName}' detected. " + $"{typeLabel} {type.Name} will override previously registered handler." ); } // Find HandleCommand method var method = type.GetMethod( "HandleCommand", BindingFlags.Public | BindingFlags.Static, null, new[] { typeof(JObject) }, null ); if (method == null) { McpLog.Warn( $"MCP {typeLabel} {type.Name} is marked with [McpForUnity{(isResource ? 
"Resource" : "Tool")}] " + $"but has no public static HandleCommand(JObject) method" ); return false; } try { HandlerInfo handlerInfo; if (typeof(Task).IsAssignableFrom(method.ReturnType)) { var asyncHandler = CreateAsyncHandlerDelegate(method, commandName); handlerInfo = new HandlerInfo(commandName, null, asyncHandler); } else { var handler = (Func<JObject, object>)Delegate.CreateDelegate( typeof(Func<JObject, object>), method ); handlerInfo = new HandlerInfo(commandName, handler, null); } _handlers[commandName] = handlerInfo; return true; } catch (Exception ex) { McpLog.Error($"Failed to register {typeLabel} {type.Name}: {ex.Message}"); return false; } } /// <summary> /// Get a command handler by name /// </summary> private static HandlerInfo GetHandlerInfo(string commandName) { if (!_handlers.TryGetValue(commandName, out var handler)) { throw new InvalidOperationException( $"Unknown or unsupported command type: {commandName}" ); } return handler; } /// <summary> /// Get a synchronous command handler by name. /// Throws if the command is asynchronous. /// </summary> /// <param name="commandName"></param> /// <returns></returns> /// <exception cref="InvalidOperationException"></exception> public static Func<JObject, object> GetHandler(string commandName) { var handlerInfo = GetHandlerInfo(commandName); if (handlerInfo.IsAsync) { throw new InvalidOperationException( $"Command '{commandName}' is asynchronous and must be executed via ExecuteCommand" ); } return handlerInfo.SyncHandler; } /// <summary> /// Execute a command handler, supporting both synchronous and asynchronous (coroutine) handlers. /// If the handler returns an IEnumerator, it will be executed as a coroutine. /// </summary> /// <param name="commandName">The command name to execute</param> /// <param name="params">Command parameters</param> /// <param name="tcs">TaskCompletionSource to complete when async operation finishes</param> /// <returns>The result for synchronous commands, or null for async commands (TCS will be completed later)</returns> public static object ExecuteCommand(string commandName, JObject @params, TaskCompletionSource<string> tcs) { var handlerInfo = GetHandlerInfo(commandName); if (handlerInfo.IsAsync) { ExecuteAsyncHandler(handlerInfo, @params, commandName, tcs); return null; } if (handlerInfo.SyncHandler == null) { throw new InvalidOperationException($"Handler for '{commandName}' does not provide a synchronous implementation"); } return handlerInfo.SyncHandler(@params); } /// <summary> /// Create a delegate for an async handler method that returns Task or Task<T>. /// The delegate will invoke the method and await its completion, returning the result. /// </summary> /// <param name="method"></param> /// <param name="commandName"></param> /// <returns></returns> /// <exception cref="InvalidOperationException"></exception> private static Func<JObject, Task<object>> CreateAsyncHandlerDelegate(MethodInfo method, string commandName) { return async (JObject parameters) => { object rawResult; try { rawResult = method.Invoke(null, new object[] { parameters }); } catch (TargetInvocationException ex) { throw ex.InnerException ?? 
ex; } if (rawResult == null) { return null; } if (rawResult is not Task task) { throw new InvalidOperationException( $"Async handler '{commandName}' returned an object that is not a Task" ); } await task.ConfigureAwait(true); var taskType = task.GetType(); if (taskType.IsGenericType) { var resultProperty = taskType.GetProperty("Result"); if (resultProperty != null) { return resultProperty.GetValue(task); } } return null; }; } private static void ExecuteAsyncHandler( HandlerInfo handlerInfo, JObject parameters, string commandName, TaskCompletionSource<string> tcs) { if (handlerInfo.AsyncHandler == null) { throw new InvalidOperationException($"Async handler for '{commandName}' is not configured correctly"); } Task<object> handlerTask; try { handlerTask = handlerInfo.AsyncHandler(parameters); } catch (Exception ex) { ReportAsyncFailure(commandName, tcs, ex); return; } if (handlerTask == null) { CompleteAsyncCommand(commandName, tcs, null); return; } async void AwaitHandler() { try { var finalResult = await handlerTask.ConfigureAwait(true); CompleteAsyncCommand(commandName, tcs, finalResult); } catch (Exception ex) { ReportAsyncFailure(commandName, tcs, ex); } } AwaitHandler(); } /// <summary> /// Complete the TaskCompletionSource for an async command with a success result. /// </summary> /// <param name="commandName"></param> /// <param name="tcs"></param> /// <param name="result"></param> private static void CompleteAsyncCommand(string commandName, TaskCompletionSource<string> tcs, object result) { try { var response = new { status = "success", result }; string json = JsonConvert.SerializeObject(response); if (!tcs.TrySetResult(json)) { McpLog.Warn($"TCS for async command '{commandName}' was already completed"); } } catch (Exception ex) { McpLog.Error($"Error completing async command '{commandName}': {ex.Message}\n{ex.StackTrace}"); ReportAsyncFailure(commandName, tcs, ex); } } /// <summary> /// Report an error that occurred during async command execution. /// Completes the TaskCompletionSource with an error response. /// </summary> /// <param name="commandName"></param> /// <param name="tcs"></param> /// <param name="ex"></param> private static void ReportAsyncFailure(string commandName, TaskCompletionSource<string> tcs, Exception ex) { McpLog.Error($"Error in async command '{commandName}': {ex.Message}\n{ex.StackTrace}"); var errorResponse = new { status = "error", error = ex.Message, command = commandName, stackTrace = ex.StackTrace }; string json; try { json = JsonConvert.SerializeObject(errorResponse); } catch (Exception serializationEx) { McpLog.Error($"Failed to serialize error response for '{commandName}': {serializationEx.Message}"); json = "{\"status\":\"error\",\"error\":\"Failed to complete command\"}"; } if (!tcs.TrySetResult(json)) { McpLog.Warn($"TCS for async command '{commandName}' was already completed when trying to report error"); } } } } ``` -------------------------------------------------------------------------------- /.claude/prompts/nl-unity-suite-t.md: -------------------------------------------------------------------------------- ```markdown # Unity T Editing Suite — Additive Test Design You are running inside CI for the `unity-mcp` repo. Use only the tools allowed by the workflow. Work autonomously; do not prompt the user. Do NOT spawn subagents. 
**Print this once, verbatim, early in the run:** AllowedTools: Write,mcp__unity__manage_editor,mcp__unity__list_resources,mcp__unity__read_resource,mcp__unity__apply_text_edits,mcp__unity__script_apply_edits,mcp__unity__validate_script,mcp__unity__find_in_file,mcp__unity__read_console,mcp__unity__get_sha --- ## Mission 1) Pick target file (prefer): - `unity://path/Assets/Scripts/LongUnityScriptClaudeTest.cs` 2) Execute T tests T-A..T-J in order using minimal, precise edits that build on the NL pass state. 3) Validate each edit with `mcp__unity__validate_script(level:"standard")`. 4) **Report**: write one `<testcase>` XML fragment per test to `reports/<TESTID>_results.xml`. Do **not** read or edit `$JUNIT_OUT`. **CRITICAL XML FORMAT REQUIREMENTS:** - Each file must contain EXACTLY one `<testcase>` root element - NO prologue, epilogue, code fences, or extra characters - NO markdown formatting or explanations outside the XML - Use this exact format: ```xml <testcase name="T-D — End-of-Class Helper" classname="UnityMCP.NL-T"> <system-out><![CDATA[ (evidence of what was accomplished) ]]></system-out> </testcase> ``` - If test fails, include: `<failure message="reason"/>` - TESTID must be one of: T-A, T-B, T-C, T-D, T-E, T-F, T-G, T-H, T-I, T-J 5) **NO RESTORATION** - tests build additively on previous state. 6) **STRICT FRAGMENT EMISSION** - After each test, immediately emit a clean XML file under `reports/<TESTID>_results.xml` with exactly one `<testcase>` whose `name` begins with the exact test id. No prologue/epilogue or fences. If the test fails, include a `<failure message="..."/>` and still emit. --- ## Environment & Paths (CI) - Always pass: `project_root: "TestProjects/UnityMCPTests"` and `ctx: {}` on list/read/edit/validate. - **Canonical URIs only**: - Primary: `unity://path/Assets/...` (never embed `project_root` in the URI) - Relative (when supported): `Assets/...` CI provides: - `$JUNIT_OUT=reports/junit-nl-suite.xml` (pre‑created; leave alone) - `$MD_OUT=reports/junit-nl-suite.md` (synthesized from JUnit) --- ## Transcript Minimization Rules - Do not restate tool JSON; summarize in ≤ 2 short lines. - Never paste full file contents. For matches, include only the matched line and ±1 line. - Prefer `mcp__unity__find_in_file` for targeting; avoid `mcp__unity__read_resource` unless strictly necessary. If needed, limit to `head_bytes ≤ 256` or `tail_lines ≤ 10`. - Per‑test `system-out` ≤ 400 chars: brief status only (no SHA). - Console evidence: fetch the last 10 lines with `include_stacktrace:false` and include ≤ 3 lines in the fragment. - Avoid quoting multi‑line diffs; reference markers instead. — Console scans: perform two reads — last 10 `log/info` lines and up to 3 `error` entries (use `include_stacktrace:false`); include ≤ 3 lines total in the fragment; if no errors, state "no errors". — Final check is folded into T‑J: perform an errors‑only scan (with `include_stacktrace:false`) and include a single "no errors" line or up to 3 error lines within the T‑J fragment. --- ## Tool Mapping - **Anchors/regex/structured**: `mcp__unity__script_apply_edits` - Allowed ops: `anchor_insert`, `replace_method`, `insert_method`, `delete_method`, `regex_replace` - For `anchor_insert`, always set `"position": "before"` or `"after"`. - **Precise ranges / atomic batch**: `mcp__unity__apply_text_edits` (non‑overlapping ranges) STRICT OP GUARDRAILS - Do not use `anchor_replace`. Structured edits must be one of: `anchor_insert`, `replace_method`, `insert_method`, `delete_method`, `regex_replace`. 
- For multi‑spot textual tweaks in one operation, compute non‑overlapping ranges with `mcp__unity__find_in_file` and use `mcp__unity__apply_text_edits`. - **Hash-only**: `mcp__unity__get_sha` — returns `{sha256,lengthBytes,lastModifiedUtc}` without file body - **Validation**: `mcp__unity__validate_script(level:"standard")` - **Dynamic targeting**: Use `mcp__unity__find_in_file` to locate current positions of methods/markers --- ## Additive Test Design Principles **Key Changes from Reset-Based:** 1. **Dynamic Targeting**: Use `find_in_file` to locate methods/content, never hardcode line numbers 2. **State Awareness**: Each test expects the file state left by the previous test 3. **Content-Based Operations**: Target methods by signature, classes by name, not coordinates 4. **Cumulative Validation**: Ensure the file remains structurally sound throughout the sequence 5. **Composability**: Tests demonstrate how operations work together in real workflows **State Tracking:** - Track file SHA after each test (`mcp__unity__get_sha`) and use it as a precondition for `apply_text_edits` in T‑F/T‑G/T‑I to exercise `stale_file` semantics. Do not include SHA values in report fragments. - Use content signatures (method names, comment markers) to verify expected state - Validate structural integrity after each major change --- ### T-A. Temporary Helper Lifecycle (Returns to State C) **Goal**: Test insert → verify → delete cycle for temporary code **Actions**: - Find current position of `GetCurrentTarget()` method (may have shifted from NL-2 comment) - Insert temporary helper: `private int __TempHelper(int a, int b) => a + b;` - Verify helper method exists and compiles - Delete helper method via structured delete operation - **Expected final state**: Return to State C (helper removed, other changes intact) ### Late-Test Editing Rule - When modifying a method body, use `mcp__unity__script_apply_edits`. If the method is expression-bodied (`=>`), convert it to a block or replace the whole method definition. After the edit, run `mcp__unity__validate_script` and rollback on error. Use `//` comments in inserted code. ### T-B. Method Body Interior Edit (Additive State D) **Goal**: Edit method interior without affecting structure, on modified file **Actions**: - Use `find_in_file` to locate current `HasTarget()` method (modified in NL-1) - Edit method body interior: change return statement to `return true; /* test modification */` - Validate with `mcp__unity__validate_script(level:"standard")` for consistency - Verify edit succeeded and file remains balanced - **Expected final state**: State C + modified HasTarget() body ### T-C. Different Method Interior Edit (Additive State E) **Goal**: Edit a different method to show operations don't interfere **Actions**: - Locate `ApplyBlend()` method using content search - Edit interior line to add null check: `if (animator == null) return; // safety check` - Preserve method signature and structure - **Expected final state**: State D + modified ApplyBlend() method ### T-D. End-of-Class Helper (Additive State F) **Goal**: Add permanent helper method at class end **Actions**: - Use smart anchor matching to find current class-ending brace (after NL-3 tail comments) - Insert permanent helper before class brace: `private void TestHelper() { /* placeholder */ }` - Validate with `mcp__unity__validate_script(level:"standard")` - **IMMEDIATELY** write clean XML fragment to `reports/T-D_results.xml` (no extra text). The `<testcase name>` must start with `T-D`. 
Include brief evidence in `system-out`. - **Expected final state**: State E + TestHelper() method before class end ### T-E. Method Evolution Lifecycle (Additive State G) **Goal**: Insert → modify → finalize a field + companion method **Actions**: - Insert field: `private int Counter = 0;` - Update it: find and replace with `private int Counter = 42; // initialized` - Add companion method: `private void IncrementCounter() { Counter++; }` - **Expected final state**: State F + Counter field + IncrementCounter() method ### T-F. Atomic Multi-Edit (Additive State H) **Goal**: Multiple coordinated edits in single atomic operation **Actions**: - Read current file state to compute precise ranges - Atomic edit combining: 1. Add comment in `HasTarget()`: `// validated access` 2. Add comment in `ApplyBlend()`: `// safe animation` 3. Add final class comment: `// end of test modifications` - All edits computed from same file snapshot, applied atomically - **Expected final state**: State G + three coordinated comments - After applying the atomic edits, run `validate_script(level:"standard")` and emit a clean fragment to `reports/T-F_results.xml` with a short summary. ### T-G. Path Normalization Test (No State Change) **Goal**: Verify URI forms work equivalently on modified file **Actions**: - Make identical edit using `unity://path/Assets/Scripts/LongUnityScriptClaudeTest.cs` - Then using `Assets/Scripts/LongUnityScriptClaudeTest.cs` - Second should return `stale_file`, retry with updated SHA - Verify both URI forms target same file - **Expected final state**: State H (no content change, just path testing) - Emit `reports/T-G_results.xml` showing evidence of stale SHA handling. ### T-H. Validation on Modified File (No State Change) **Goal**: Ensure validation works correctly on heavily modified file **Actions**: - Run `validate_script(level:"standard")` on current state - Verify no structural errors despite extensive modifications - **Expected final state**: State H (validation only, no edits) - Emit `reports/T-H_results.xml` confirming validation OK. ### T-I. Failure Surface Testing (No State Change) **Goal**: Test error handling on real modified file **Actions**: - Attempt overlapping edits (should fail cleanly) - Attempt edit with stale SHA (should fail cleanly) - Verify error responses are informative - **Expected final state**: State H (failed operations don't modify file) - Emit `reports/T-I_results.xml` capturing error evidence; file must contain one `<testcase>`. ### T-J. Idempotency on Modified File (Additive State I) **Goal**: Verify operations behave predictably when repeated **Actions**: - **Insert (structured)**: `mcp__unity__script_apply_edits` with: `{"op":"anchor_insert","anchor":"// Tail test C","position":"after","text":"\n // idempotency test marker"}` - **Insert again** (same op) → expect `no_op: true`. - **Remove (structured)**: `{"op":"regex_replace","pattern":"(?m)^\\s*// idempotency test marker\\r?\\n?","text":""}` - **Remove again** (same `regex_replace`) → expect `no_op: true`. - `mcp__unity__validate_script(level:"standard")` - Perform a final console scan for errors/exceptions (errors only, up to 3); include "no errors" if none - **IMMEDIATELY** write clean XML fragment to `reports/T-J_results.xml` with evidence of both `no_op: true` outcomes and the console result. The `<testcase name>` must start with `T-J`. 
### T-B. Method Body Interior Edit (Additive State D)
**Goal**: Edit a method interior without affecting structure, on the modified file
**Actions**:
- Use `find_in_file` to locate the current `HasTarget()` method (modified in NL-1)
- Edit the method body interior: change the return statement to `return true; /* test modification */`
- Validate with `mcp__unity__validate_script(level:"standard")` for consistency
- Verify the edit succeeded and the file remains balanced
- **Expected final state**: State C + modified HasTarget() body

### T-C. Different Method Interior Edit (Additive State E)
**Goal**: Edit a different method to show operations don't interfere
**Actions**:
- Locate the `ApplyBlend()` method using content search
- Edit an interior line to add a null check: `if (animator == null) return; // safety check`
- Preserve the method signature and structure
- **Expected final state**: State D + modified ApplyBlend() method

### T-D. End-of-Class Helper (Additive State F)
**Goal**: Add a permanent helper method at the end of the class
**Actions**:
- Use smart anchor matching to find the current class-ending brace (after the NL-3 tail comments)
- Insert a permanent helper before the class brace: `private void TestHelper() { /* placeholder */ }`
- Validate with `mcp__unity__validate_script(level:"standard")`
- **IMMEDIATELY** write a clean XML fragment to `reports/T-D_results.xml` (no extra text). The `<testcase name>` must start with `T-D`. Include brief evidence in `system-out`.
- **Expected final state**: State E + TestHelper() method before the class end

### T-E. Method Evolution Lifecycle (Additive State G)
**Goal**: Insert → modify → finalize a field + companion method
**Actions**:
- Insert field: `private int Counter = 0;`
- Update it: find and replace with `private int Counter = 42; // initialized`
- Add companion method: `private void IncrementCounter() { Counter++; }`
- **Expected final state**: State F + Counter field + IncrementCounter() method

### T-F. Atomic Multi-Edit (Additive State H)
**Goal**: Multiple coordinated edits in a single atomic operation
**Actions**:
- Read the current file state to compute precise ranges
- Atomic edit combining:
  1. Add comment in `HasTarget()`: `// validated access`
  2. Add comment in `ApplyBlend()`: `// safe animation`
  3. Add final class comment: `// end of test modifications`
- All edits computed from the same file snapshot and applied atomically (a sketch of such a payload follows this section)
- **Expected final state**: State G + three coordinated comments
- After applying the atomic edits, run `validate_script(level:"standard")` and emit a clean fragment to `reports/T-F_results.xml` with a short summary.
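A sketch of what the T-F atomic payload could look like. The line numbers are placeholders that would be computed at run time with `find_in_file` against a single snapshot, the `uri` parameter name is assumed, and the hash is whatever `get_sha` returned for that same snapshot:

```json
{
  "tool": "mcp__unity__apply_text_edits",
  "args": {
    "uri": "unity://path/Assets/Scripts/LongUnityScriptClaudeTest.cs",
    "edits": [
      { "startLine": 31, "startCol": 1, "endLine": 31, "endCol": 1, "newText": "            // validated access\n" },
      { "startLine": 58, "startCol": 1, "endLine": 58, "endCol": 1, "newText": "            // safe animation\n" },
      { "startLine": 120, "startCol": 1, "endLine": 120, "endCol": 1, "newText": "        // end of test modifications\n" }
    ],
    "precondition_sha256": "<sha256 of the snapshot the ranges were computed from>",
    "options": { "validate": "standard" }
  }
}
```

Because the ranges are non-overlapping and share one precondition, either all three comments land together or the whole call fails cleanly.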
### T-G. Path Normalization Test (No State Change)
**Goal**: Verify that both URI forms work equivalently on the modified file
**Actions**:
- Make an edit using `unity://path/Assets/Scripts/LongUnityScriptClaudeTest.cs`
- Then make the identical edit using `Assets/Scripts/LongUnityScriptClaudeTest.cs`
- The second should return `stale_file`; retry with the updated SHA
- Verify both URI forms target the same file
- **Expected final state**: State H (no content change, just path testing)
- Emit `reports/T-G_results.xml` showing evidence of the stale-SHA handling.

### T-H. Validation on Modified File (No State Change)
**Goal**: Ensure validation works correctly on a heavily modified file
**Actions**:
- Run `validate_script(level:"standard")` on the current state
- Verify no structural errors despite extensive modifications
- **Expected final state**: State H (validation only, no edits)
- Emit `reports/T-H_results.xml` confirming validation OK.

### T-I. Failure Surface Testing (No State Change)
**Goal**: Test error handling on the real modified file
**Actions**:
- Attempt overlapping edits (should fail cleanly)
- Attempt an edit with a stale SHA (should fail cleanly)
- Verify the error responses are informative
- **Expected final state**: State H (failed operations don't modify the file)
- Emit `reports/T-I_results.xml` capturing the error evidence; the file must contain one `<testcase>`.

### T-J. Idempotency on Modified File (Additive State I)
**Goal**: Verify operations behave predictably when repeated
**Actions**:
- **Insert (structured)**: `mcp__unity__script_apply_edits` with: `{"op":"anchor_insert","anchor":"// Tail test C","position":"after","text":"\n // idempotency test marker"}`
- **Insert again** (same op) → expect `no_op: true`.
- **Remove (structured)**: `{"op":"regex_replace","pattern":"(?m)^\\s*// idempotency test marker\\r?\\n?","text":""}`
- **Remove again** (same `regex_replace`) → expect `no_op: true`.
- `mcp__unity__validate_script(level:"standard")`
- Perform a final console scan for errors/exceptions (errors only, up to 3); include "no errors" if none
- **IMMEDIATELY** write a clean XML fragment to `reports/T-J_results.xml` with evidence of both `no_op: true` outcomes and the console result. The `<testcase name>` must start with `T-J`.
- **Expected final state**: State H + verified idempotent behavior

---

## Dynamic Targeting Examples

**Instead of hardcoded coordinates:**
```json
{"startLine": 31, "startCol": 26, "endLine": 31, "endCol": 58}
```

**Use content-aware targeting:**
```
# Find the current method location
find_in_file(pattern: "public bool HasTarget\\(\\)")
# Then compute edit ranges from the found position
```

**Method targeting by signature:**
```json
{"op": "replace_method", "className": "LongUnityScriptClaudeTest", "methodName": "HasTarget"}
```

**Anchor-based insertions:**
```json
{"op": "anchor_insert", "anchor": "private void Update\\(\\)", "position": "before", "text": "// comment"}
```

---

## State Verification Patterns

**After each test:**
1. Verify expected content exists: `find_in_file` for key markers
2. Check structural integrity: `validate_script(level:"standard")`
3. Update SHA tracking for the next test's preconditions
4. Emit a per‑test fragment to `reports/<TESTID>_results.xml` immediately. If the test failed, still write a single `<testcase>` with a `<failure message="..."/>` and evidence in `system-out`.
5. Log cumulative changes in the test evidence (keep it concise per the Transcript Minimization Rules; never paste raw tool JSON)

**Error Recovery:**
- If a test fails, log the current state but continue (don't restore)
- The next test adapts to the actual current state, not the expected state
- This demonstrates the resilience of the operations on varied file conditions

---

## Benefits of Additive Design

1. **Realistic Workflows**: Tests mirror actual development patterns
2. **Robust Operations**: Proves edits work on evolving files, not just pristine baselines
3. **Composability Validation**: Shows operations coordinate well together
4. **Simplified Infrastructure**: No restore scripts or snapshots needed
5. **Better Failure Analysis**: Failures don't cascade - each test adapts to the current reality
6. **State Evolution Testing**: Validates that the SDK handles cumulative file modifications correctly

This additive approach produces a more realistic and maintainable test suite that better represents actual SDK usage patterns.

---

BAN ON EXTRA TOOLS AND DIRS

- Do not use any tools outside `AllowedTools`. Do not create directories; assume `reports/` exists.

---

## XML Fragment Templates (T-F .. T-J)

Use these skeletons verbatim as a starting point. Replace the bracketed placeholders with your evidence. Ensure each file contains exactly one `<testcase>` element and that the `name` begins with the exact test id.

```xml
<testcase name="T-F — Atomic Multi-Edit" classname="UnityMCP.NL-T">
  <system-out><![CDATA[
Applied 3 non-overlapping edits in one atomic call:
- HasTarget(): added "// validated access"
- ApplyBlend(): added "// safe animation"
- End-of-class: added "// end of test modifications"
validate_script: OK
]]></system-out>
</testcase>
```

```xml
<testcase name="T-G — Path Normalization Test" classname="UnityMCP.NL-T">
  <system-out><![CDATA[
Edit via unity://path/... succeeded.
Same edit via Assets/... returned stale_file, retried with updated hash: OK.
]]></system-out>
</testcase>
```

```xml
<testcase name="T-H — Validation on Modified File" classname="UnityMCP.NL-T">
  <system-out><![CDATA[
validate_script(level:"standard"): OK on the modified file.
]]></system-out>
</testcase>
```

```xml
<testcase name="T-I — Failure Surface Testing" classname="UnityMCP.NL-T">
  <system-out><![CDATA[
Overlapping edit: failed cleanly (error captured).
Stale hash edit: failed cleanly (error captured).
File unchanged.
]]></system-out>
</testcase>
```

```xml
<testcase name="T-J — Idempotency on Modified File" classname="UnityMCP.NL-T">
  <system-out><![CDATA[
Insert marker after "// Tail test C": OK.
Insert same marker again: no_op: true.
regex_remove marker: OK.
regex_remove again: no_op: true.
validate_script: OK.
]]></system-out>
</testcase>
```
--------------------------------------------------------------------------------
/tools/stress_mcp.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import asyncio
import argparse
import json
import os
import struct
import time
from pathlib import Path
import random
import sys

TIMEOUT = float(os.environ.get("MCP_STRESS_TIMEOUT", "2.0"))
DEBUG = os.environ.get("MCP_STRESS_DEBUG", "").lower() in ("1", "true", "yes")


def dlog(*args):
    if DEBUG:
        print(*args, file=sys.stderr)


def find_status_files() -> list[Path]:
    home = Path.home()
    status_dir = Path(os.environ.get(
        "UNITY_MCP_STATUS_DIR", home / ".unity-mcp"))
    if not status_dir.exists():
        return []
    return sorted(status_dir.glob("unity-mcp-status-*.json"),
                  key=lambda p: p.stat().st_mtime, reverse=True)


def discover_port(project_path: str | None) -> int:
    # Default bridge port if nothing found
    default_port = 6400
    files = find_status_files()
    for f in files:
        try:
            data = json.loads(f.read_text())
            port = int(data.get("unity_port", 0) or 0)
            proj = data.get("project_path") or ""
            if project_path:
                # Match status for the given project if possible
                if proj and project_path in proj:
                    if 0 < port < 65536:
                        return port
            else:
                if 0 < port < 65536:
                    return port
        except Exception:
            pass
    return default_port


async def read_exact(reader: asyncio.StreamReader, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = await reader.read(n - len(buf))
        if not chunk:
            raise ConnectionError("Connection closed while reading")
        buf += chunk
    return buf


async def read_frame(reader: asyncio.StreamReader) -> bytes:
    header = await read_exact(reader, 8)
    (length,) = struct.unpack(">Q", header)
    if length <= 0 or length > (64 * 1024 * 1024):
        raise ValueError(f"Invalid frame length: {length}")
    return await read_exact(reader, length)


async def write_frame(writer: asyncio.StreamWriter, payload: bytes) -> None:
    header = struct.pack(">Q", len(payload))
    writer.write(header)
    writer.write(payload)
    await asyncio.wait_for(writer.drain(), timeout=TIMEOUT)


async def do_handshake(reader: asyncio.StreamReader) -> None:
    # Server sends a single line handshake: "WELCOME UNITY-MCP 1 FRAMING=1\n"
    line = await reader.readline()
    if not line or b"WELCOME UNITY-MCP" not in line:
        raise ConnectionError(f"Unexpected handshake from server: {line!r}")


def make_ping_frame() -> bytes:
    return b"ping"


def make_execute_menu_item(menu_path: str) -> bytes:
    # Retained for manual debugging; not used in normal stress runs
    payload = {"type": "execute_menu_item", "params": {
        "action": "execute", "menu_path": menu_path}}
    return json.dumps(payload).encode("utf-8")


async def client_loop(idx: int, host: str, port: int, stop_time: float, stats: dict):
    reconnect_delay = 0.2
    while time.time() < stop_time:
        writer = None
        try:
            # slight stagger to prevent burst synchronization across clients
            await asyncio.sleep(0.003 * (idx % 11))
            reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=TIMEOUT)
            await asyncio.wait_for(do_handshake(reader), timeout=TIMEOUT)
            # Send a quick ping first
            await write_frame(writer, make_ping_frame())
            # ignore content
            _ = await asyncio.wait_for(read_frame(reader), timeout=TIMEOUT)
            # Main activity loop (keep-alive + light load). Edit spam handled by reload_churn_task.
            while time.time() < stop_time:
                # Ping-only; edits are sent via reload_churn_task to avoid console spam
                await write_frame(writer, make_ping_frame())
                _ = await asyncio.wait_for(read_frame(reader), timeout=TIMEOUT)
                stats["pings"] += 1
                await asyncio.sleep(0.02 + random.uniform(-0.003, 0.003))
        except (ConnectionError, OSError, asyncio.IncompleteReadError, asyncio.TimeoutError):
            stats["disconnects"] += 1
            dlog(f"[client {idx}] disconnect/backoff {reconnect_delay}s")
            await asyncio.sleep(reconnect_delay)
            reconnect_delay = min(reconnect_delay * 1.5, 2.0)
            continue
        except Exception:
            stats["errors"] += 1
            dlog(f"[client {idx}] unexpected error")
            await asyncio.sleep(0.2)
            continue
        finally:
            if writer is not None:
                try:
                    writer.close()
                    await writer.wait_closed()
                except Exception:
                    pass


async def reload_churn_task(project_path: str, stop_time: float, unity_file: str | None, host: str, port: int, stats: dict, storm_count: int = 1):
    # Use script edit tool to touch a C# file, which triggers compilation reliably
    path = Path(unity_file) if unity_file else None
    seq = 0
    proj_root = Path(project_path).resolve() if project_path else None
    # Build candidate list for storm mode
    candidates: list[Path] = []
    if proj_root:
        try:
            for p in (proj_root / "Assets").rglob("*.cs"):
                candidates.append(p.resolve())
        except Exception:
            candidates = []
    if path and path.exists():
        rp = path.resolve()
        if rp not in candidates:
            candidates.append(rp)
    while time.time() < stop_time:
        try:
            if path and path.exists():
                # Determine files to touch this cycle
                targets: list[Path]
                if storm_count and storm_count > 1 and candidates:
                    k = min(max(1, storm_count), len(candidates))
                    targets = random.sample(candidates, k)
                else:
                    targets = [path]
                for tpath in targets:
                    # Build a tiny ApplyTextEdits request that toggles a trailing comment
                    relative = None
                    try:
                        # Derive Unity-relative path under Assets/ (cross-platform)
                        resolved = tpath.resolve()
                        parts = list(resolved.parts)
                        if "Assets" in parts:
                            i = parts.index("Assets")
                            relative = Path(*parts[i:]).as_posix()
                        elif proj_root and str(resolved).startswith(str(proj_root)):
                            rel = resolved.relative_to(proj_root)
                            parts2 = list(rel.parts)
                            if "Assets" in parts2:
                                i2 = parts2.index("Assets")
                                relative = Path(*parts2[i2:]).as_posix()
                    except Exception:
                        relative = None
                    if relative:
                        # Derive name and directory for ManageScript and compute precondition SHA + EOF position
                        name_base = Path(relative).stem
                        dir_path = str(
                            Path(relative).parent).replace('\\', '/')
                        # 1) Read current contents via manage_script.read to compute SHA and true EOF location
                        contents = None
                        read_success = False
                        for attempt in range(3):
                            writer = None
                            try:
                                reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=TIMEOUT)
                                await asyncio.wait_for(do_handshake(reader), timeout=TIMEOUT)
                                read_payload = {
                                    "type": "manage_script",
                                    "params": {
                                        "action": "read",
                                        "name": name_base,
                                        "path": dir_path
                                    }
                                }
                                await write_frame(writer, json.dumps(read_payload).encode("utf-8"))
                                resp = await asyncio.wait_for(read_frame(reader), timeout=TIMEOUT)
                                read_obj = json.loads(
                                    resp.decode("utf-8", errors="ignore"))
                                result = read_obj.get("result", read_obj) if isinstance(
                                    read_obj, dict) else {}
                                if result.get("success"):
                                    data_obj = result.get("data", {})
                                    contents = data_obj.get("contents") or ""
                                    read_success = True
                                    break
                            except Exception:
                                # retry with backoff
                                await asyncio.sleep(0.2 * (2 ** attempt) + random.uniform(0.0, 0.1))
                            finally:
                                if 'writer' in locals() and writer is not None:
                                    try:
                                        writer.close()
                                        await writer.wait_closed()
                                    except Exception:
                                        pass
                        if not read_success or contents is None:
                            stats["apply_errors"] = stats.get(
                                "apply_errors", 0) + 1
                            await asyncio.sleep(0.5)
                            continue
                        # Compute SHA and EOF insertion point
                        import hashlib
                        sha = hashlib.sha256(
                            contents.encode("utf-8")).hexdigest()
                        lines = contents.splitlines(keepends=True)
                        # Insert at true EOF (safe against header guards)
                        end_line = len(lines) + 1  # 1-based exclusive end
                        end_col = 1
                        # Build a unique marker append; ensure it begins with a newline if needed
                        marker = f"// MCP_STRESS seq={seq} time={int(time.time())}"
                        seq += 1
                        insert_text = ("\n" if not contents.endswith(
                            "\n") else "") + marker + "\n"
                        # 2) Apply text edits with immediate refresh and precondition
                        apply_payload = {
                            "type": "manage_script",
                            "params": {
                                "action": "apply_text_edits",
                                "name": name_base,
                                "path": dir_path,
                                "edits": [
                                    {
                                        "startLine": end_line,
                                        "startCol": end_col,
                                        "endLine": end_line,
                                        "endCol": end_col,
                                        "newText": insert_text
                                    }
                                ],
                                "precondition_sha256": sha,
                                "options": {"refresh": "immediate", "validate": "standard"}
                            }
                        }
                        apply_success = False
                        for attempt in range(3):
                            writer = None
                            try:
                                reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=TIMEOUT)
                                await asyncio.wait_for(do_handshake(reader), timeout=TIMEOUT)
                                await write_frame(writer, json.dumps(apply_payload).encode("utf-8"))
                                resp = await asyncio.wait_for(read_frame(reader), timeout=TIMEOUT)
                                try:
                                    data = json.loads(resp.decode(
                                        "utf-8", errors="ignore"))
                                    result = data.get("result", data) if isinstance(
                                        data, dict) else {}
                                    ok = bool(result.get("success", False))
                                    if ok:
                                        stats["applies"] = stats.get(
                                            "applies", 0) + 1
                                        apply_success = True
                                        break
                                except Exception:
                                    # fall through to retry
                                    pass
                            except Exception:
                                # retry with backoff
                                await asyncio.sleep(0.2 * (2 ** attempt) + random.uniform(0.0, 0.1))
                            finally:
                                if 'writer' in locals() and writer is not None:
                                    try:
                                        writer.close()
                                        await writer.wait_closed()
                                    except Exception:
                                        pass
                        if not apply_success:
                            stats["apply_errors"] = stats.get(
                                "apply_errors", 0) + 1
        except Exception:
            pass
        await asyncio.sleep(1.0)


async def main():
    ap = argparse.ArgumentParser(
        description="Stress test MCP for Unity with concurrent clients and reload churn")
    ap.add_argument("--host", default="127.0.0.1")
    ap.add_argument("--project", default=str(
        Path(__file__).resolve().parents[1] / "TestProjects" / "UnityMCPTests"))
    ap.add_argument("--unity-file", default=str(Path(__file__).resolve(
    ).parents[1] / "TestProjects" / "UnityMCPTests" / "Assets" / "Scripts" / "LongUnityScriptClaudeTest.cs"))
    ap.add_argument("--clients", type=int, default=10)
    ap.add_argument("--duration", type=int, default=60)
    ap.add_argument("--storm-count", type=int, default=1,
                    help="Number of scripts to touch each cycle")
    args = ap.parse_args()

    port = discover_port(args.project)
    stop_time = time.time() + max(10, args.duration)
    stats = {"pings": 0, "menus": 0, "mods": 0, "disconnects": 0, "errors": 0}
    tasks = []
    # Spawn clients
    for i in range(max(1, args.clients)):
        tasks.append(asyncio.create_task(
            client_loop(i, args.host, port, stop_time, stats)))
    # Spawn reload churn task
    tasks.append(asyncio.create_task(reload_churn_task(args.project, stop_time, args.unity_file,
                                                       args.host, port, stats, storm_count=args.storm_count)))
    await asyncio.gather(*tasks, return_exceptions=True)
    print(json.dumps({"port": port, "stats": stats}, indent=2))


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
```
--------------------------------------------------------------------------------
/TestProjects/UnityMCPTests/Assets/Tests/EditMode/Tools/ManageGameObjectTests.cs: -------------------------------------------------------------------------------- ```csharp using System; using System.Collections.Generic; using NUnit.Framework; using UnityEngine; using UnityEditor; using UnityEngine.TestTools; using Newtonsoft.Json.Linq; using MCPForUnity.Editor.Tools; namespace MCPForUnityTests.Editor.Tools { public class ManageGameObjectTests { private GameObject testGameObject; [SetUp] public void SetUp() { // Create a test GameObject for each test testGameObject = new GameObject("TestObject"); } [TearDown] public void TearDown() { // Clean up test GameObject if (testGameObject != null) { UnityEngine.Object.DestroyImmediate(testGameObject); } } [Test] public void HandleCommand_ReturnsError_ForNullParams() { var result = ManageGameObject.HandleCommand(null); Assert.IsNotNull(result, "Should return a result object"); // Note: Actual error checking would need access to Response structure } [Test] public void HandleCommand_ReturnsError_ForEmptyParams() { var emptyParams = new JObject(); var result = ManageGameObject.HandleCommand(emptyParams); Assert.IsNotNull(result, "Should return a result object for empty params"); } [Test] public void HandleCommand_ProcessesValidCreateAction() { var createParams = new JObject { ["action"] = "create", ["name"] = "TestCreateObject" }; var result = ManageGameObject.HandleCommand(createParams); Assert.IsNotNull(result, "Should return a result for valid create action"); // Clean up - find and destroy the created object var createdObject = GameObject.Find("TestCreateObject"); if (createdObject != null) { UnityEngine.Object.DestroyImmediate(createdObject); } } [Test] public void ComponentResolver_Integration_WorksWithRealComponents() { // Test that our ComponentResolver works with actual Unity components var transformResult = ComponentResolver.TryResolve("Transform", out Type transformType, out string error); Assert.IsTrue(transformResult, "Should resolve Transform component"); Assert.AreEqual(typeof(Transform), transformType, "Should return correct Transform type"); Assert.IsEmpty(error, "Should have no error for valid component"); } [Test] public void ComponentResolver_Integration_WorksWithBuiltInComponents() { var components = new[] { ("Rigidbody", typeof(Rigidbody)), ("Collider", typeof(Collider)), ("Renderer", typeof(Renderer)), ("Camera", typeof(Camera)), ("Light", typeof(Light)) }; foreach (var (componentName, expectedType) in components) { var result = ComponentResolver.TryResolve(componentName, out Type actualType, out string error); // Some components might not resolve (abstract classes), but the method should handle gracefully if (result) { Assert.IsTrue(expectedType.IsAssignableFrom(actualType), $"{componentName} should resolve to assignable type"); } else { Assert.IsNotEmpty(error, $"Should have error message for {componentName}"); } } } [Test] public void PropertyMatching_Integration_WorksWithRealGameObject() { // Add a Rigidbody to test real property matching var rigidbody = testGameObject.AddComponent<Rigidbody>(); var properties = ComponentResolver.GetAllComponentProperties(typeof(Rigidbody)); Assert.IsNotEmpty(properties, "Rigidbody should have properties"); Assert.Contains("mass", properties, "Rigidbody should have mass property"); Assert.Contains("useGravity", properties, "Rigidbody should have useGravity property"); // Test AI suggestions var suggestions = ComponentResolver.GetAIPropertySuggestions("Use Gravity", properties); 
Assert.Contains("useGravity", suggestions, "Should suggest useGravity for 'Use Gravity'"); } [Test] public void PropertyMatching_HandlesMonoBehaviourProperties() { var properties = ComponentResolver.GetAllComponentProperties(typeof(MonoBehaviour)); Assert.IsNotEmpty(properties, "MonoBehaviour should have properties"); Assert.Contains("enabled", properties, "MonoBehaviour should have enabled property"); Assert.Contains("name", properties, "MonoBehaviour should have name property"); Assert.Contains("tag", properties, "MonoBehaviour should have tag property"); } [Test] public void PropertyMatching_HandlesCaseVariations() { var testProperties = new List<string> { "maxReachDistance", "playerHealth", "movementSpeed" }; var testCases = new[] { ("max reach distance", "maxReachDistance"), ("Max Reach Distance", "maxReachDistance"), ("MAX_REACH_DISTANCE", "maxReachDistance"), ("player health", "playerHealth"), ("movement speed", "movementSpeed") }; foreach (var (input, expected) in testCases) { var suggestions = ComponentResolver.GetAIPropertySuggestions(input, testProperties); Assert.Contains(expected, suggestions, $"Should suggest {expected} for input '{input}'"); } } [Test] public void ErrorHandling_ReturnsHelpfulMessages() { // This test verifies that error messages are helpful and contain suggestions var testProperties = new List<string> { "mass", "velocity", "drag", "useGravity" }; var suggestions = ComponentResolver.GetAIPropertySuggestions("weight", testProperties); // Even if no perfect match, should return valid list Assert.IsNotNull(suggestions, "Should return valid suggestions list"); // Test with completely invalid input var badSuggestions = ComponentResolver.GetAIPropertySuggestions("xyz123invalid", testProperties); Assert.IsNotNull(badSuggestions, "Should handle invalid input gracefully"); } [Test] public void PerformanceTest_CachingWorks() { var properties = ComponentResolver.GetAllComponentProperties(typeof(Transform)); var input = "Test Property Name"; // First call - populate cache var startTime = System.DateTime.UtcNow; var suggestions1 = ComponentResolver.GetAIPropertySuggestions(input, properties); var firstCallTime = (System.DateTime.UtcNow - startTime).TotalMilliseconds; // Second call - should use cache startTime = System.DateTime.UtcNow; var suggestions2 = ComponentResolver.GetAIPropertySuggestions(input, properties); var secondCallTime = (System.DateTime.UtcNow - startTime).TotalMilliseconds; Assert.AreEqual(suggestions1.Count, suggestions2.Count, "Cached results should be identical"); CollectionAssert.AreEqual(suggestions1, suggestions2, "Cached results should match exactly"); // Second call should be faster (though this test might be flaky) Assert.LessOrEqual(secondCallTime, firstCallTime * 2, "Cached call should not be significantly slower"); } [Test] public void SetComponentProperties_CollectsAllFailuresAndAppliesValidOnes() { // Arrange - add Transform and Rigidbody components to test with var transform = testGameObject.transform; var rigidbody = testGameObject.AddComponent<Rigidbody>(); // Create a params object with mixed valid and invalid properties var setPropertiesParams = new JObject { ["action"] = "modify", ["target"] = testGameObject.name, ["search_method"] = "by_name", ["componentProperties"] = new JObject { ["Transform"] = new JObject { ["localPosition"] = new JObject { ["x"] = 1.0f, ["y"] = 2.0f, ["z"] = 3.0f }, // Valid ["rotatoin"] = new JObject { ["x"] = 0.0f, ["y"] = 90.0f, ["z"] = 0.0f }, // Invalid (typo - should be rotation) ["localScale"] = new 
JObject { ["x"] = 2.0f, ["y"] = 2.0f, ["z"] = 2.0f } // Valid }, ["Rigidbody"] = new JObject { ["mass"] = 5.0f, // Valid ["invalidProp"] = "test", // Invalid - doesn't exist ["useGravity"] = true // Valid } } }; // Store original values to verify changes var originalLocalPosition = transform.localPosition; var originalLocalScale = transform.localScale; var originalMass = rigidbody.mass; var originalUseGravity = rigidbody.useGravity; Debug.Log($"BEFORE TEST - Mass: {rigidbody.mass}, UseGravity: {rigidbody.useGravity}"); // Expect the warning logs from the invalid properties LogAssert.Expect(LogType.Warning, new System.Text.RegularExpressions.Regex("Property 'rotatoin' not found")); LogAssert.Expect(LogType.Warning, new System.Text.RegularExpressions.Regex("Property 'invalidProp' not found")); // Act var result = ManageGameObject.HandleCommand(setPropertiesParams); Debug.Log($"AFTER TEST - Mass: {rigidbody.mass}, UseGravity: {rigidbody.useGravity}"); Debug.Log($"AFTER TEST - LocalPosition: {transform.localPosition}"); Debug.Log($"AFTER TEST - LocalScale: {transform.localScale}"); // Assert - verify that valid properties were set despite invalid ones Assert.AreEqual(new Vector3(1.0f, 2.0f, 3.0f), transform.localPosition, "Valid localPosition should be set even with other invalid properties"); Assert.AreEqual(new Vector3(2.0f, 2.0f, 2.0f), transform.localScale, "Valid localScale should be set even with other invalid properties"); Assert.AreEqual(5.0f, rigidbody.mass, 0.001f, "Valid mass should be set even with other invalid properties"); Assert.AreEqual(true, rigidbody.useGravity, "Valid useGravity should be set even with other invalid properties"); // Verify the result indicates errors (since we had invalid properties) Assert.IsNotNull(result, "Should return a result object"); // The collect-and-continue behavior means we should get an error response // that contains info about the failed properties, but valid ones were still applied // This proves the collect-and-continue behavior is working // Harden: verify structured error response with failures list contains both invalid fields var successProp = result.GetType().GetProperty("success"); Assert.IsNotNull(successProp, "Result should expose 'success' property"); Assert.IsFalse((bool)successProp.GetValue(result), "Result.success should be false for partial failure"); var dataProp = result.GetType().GetProperty("data"); Assert.IsNotNull(dataProp, "Result should include 'data' with errors"); var dataVal = dataProp.GetValue(result); Assert.IsNotNull(dataVal, "Result.data should not be null"); var errorsProp = dataVal.GetType().GetProperty("errors"); Assert.IsNotNull(errorsProp, "Result.data should include 'errors' list"); var errorsEnum = errorsProp.GetValue(dataVal) as System.Collections.IEnumerable; Assert.IsNotNull(errorsEnum, "errors should be enumerable"); bool foundRotatoin = false; bool foundInvalidProp = false; foreach (var err in errorsEnum) { string s = err?.ToString() ?? 
string.Empty; if (s.Contains("rotatoin")) foundRotatoin = true; if (s.Contains("invalidProp")) foundInvalidProp = true; } Assert.IsTrue(foundRotatoin, "errors should mention the misspelled 'rotatoin' property"); Assert.IsTrue(foundInvalidProp, "errors should mention the 'invalidProp' property"); } [Test] public void SetComponentProperties_ContinuesAfterException() { // Arrange - create scenario that might cause exceptions var rigidbody = testGameObject.AddComponent<Rigidbody>(); // Set initial values that we'll change rigidbody.mass = 1.0f; rigidbody.useGravity = true; var setPropertiesParams = new JObject { ["action"] = "modify", ["target"] = testGameObject.name, ["search_method"] = "by_name", ["componentProperties"] = new JObject { ["Rigidbody"] = new JObject { ["mass"] = 2.5f, // Valid - should be set ["velocity"] = "invalid_type", // Invalid type - will cause exception ["useGravity"] = false // Valid - should still be set after exception } } }; // Expect the error logs from the invalid property LogAssert.Expect(LogType.Error, new System.Text.RegularExpressions.Regex("Unexpected error converting token to UnityEngine.Vector3")); LogAssert.Expect(LogType.Error, new System.Text.RegularExpressions.Regex("SetProperty.*Failed to set 'velocity'")); LogAssert.Expect(LogType.Warning, new System.Text.RegularExpressions.Regex("Property 'velocity' not found")); // Act var result = ManageGameObject.HandleCommand(setPropertiesParams); // Assert - verify that valid properties before AND after the exception were still set Assert.AreEqual(2.5f, rigidbody.mass, 0.001f, "Mass should be set even if later property causes exception"); Assert.AreEqual(false, rigidbody.useGravity, "UseGravity should be set even if previous property caused exception"); Assert.IsNotNull(result, "Should return a result even with exceptions"); // The key test: processing continued after the exception and set useGravity // This proves the collect-and-continue behavior works even with exceptions // Harden: verify structured error response contains velocity failure var successProp2 = result.GetType().GetProperty("success"); Assert.IsNotNull(successProp2, "Result should expose 'success' property"); Assert.IsFalse((bool)successProp2.GetValue(result), "Result.success should be false when an exception occurs for a property"); var dataProp2 = result.GetType().GetProperty("data"); Assert.IsNotNull(dataProp2, "Result should include 'data' with errors"); var dataVal2 = dataProp2.GetValue(result); Assert.IsNotNull(dataVal2, "Result.data should not be null"); var errorsProp2 = dataVal2.GetType().GetProperty("errors"); Assert.IsNotNull(errorsProp2, "Result.data should include 'errors' list"); var errorsEnum2 = errorsProp2.GetValue(dataVal2) as System.Collections.IEnumerable; Assert.IsNotNull(errorsEnum2, "errors should be enumerable"); bool foundVelocityError = false; foreach (var err in errorsEnum2) { string s = err?.ToString() ?? string.Empty; if (s.Contains("velocity")) { foundVelocityError = true; break; } } Assert.IsTrue(foundVelocityError, "errors should include a message referencing 'velocity'"); } } } ``` -------------------------------------------------------------------------------- /UnityMcpBridge/UnityMcpServer~/src/telemetry.py: -------------------------------------------------------------------------------- ```python """ Privacy-focused, anonymous telemetry system for Unity MCP Inspired by Onyx's telemetry implementation with Unity-specific adaptations Fire-and-forget telemetry sender with a single background worker. 
- No context/thread-local propagation to avoid re-entrancy into tool resolution. - Small network timeouts to prevent stalls. """ import contextlib from dataclasses import dataclass from enum import Enum import importlib import json import logging import os from pathlib import Path import platform import queue import sys import threading import time from typing import Optional, Dict, Any from urllib.parse import urlparse import uuid import tomli try: import httpx HAS_HTTPX = True except ImportError: httpx = None # type: ignore HAS_HTTPX = False def get_package_version() -> str: """ Open pyproject.toml and parse version We use the tomli library instead of tomllib to support Python 3.10 """ with open("pyproject.toml", "rb") as f: data = tomli.load(f) return data["project"]["version"] MCP_VERSION = get_package_version() logger = logging.getLogger("unity-mcp-telemetry") class RecordType(str, Enum): """Types of telemetry records we collect""" VERSION = "version" STARTUP = "startup" USAGE = "usage" LATENCY = "latency" FAILURE = "failure" TOOL_EXECUTION = "tool_execution" UNITY_CONNECTION = "unity_connection" CLIENT_CONNECTION = "client_connection" class MilestoneType(str, Enum): """Major user journey milestones""" FIRST_STARTUP = "first_startup" FIRST_TOOL_USAGE = "first_tool_usage" FIRST_SCRIPT_CREATION = "first_script_creation" FIRST_SCENE_MODIFICATION = "first_scene_modification" MULTIPLE_SESSIONS = "multiple_sessions" DAILY_ACTIVE_USER = "daily_active_user" WEEKLY_ACTIVE_USER = "weekly_active_user" @dataclass class TelemetryRecord: """Structure for telemetry data""" record_type: RecordType timestamp: float customer_uuid: str session_id: str data: Dict[str, Any] milestone: Optional[MilestoneType] = None class TelemetryConfig: """Telemetry configuration""" def __init__(self): # Prefer config file, then allow env overrides server_config = None for modname in ( "UnityMcpBridge.UnityMcpServer~.src.config", "UnityMcpBridge.UnityMcpServer.src.config", "src.config", "config", ): try: mod = importlib.import_module(modname) server_config = getattr(mod, "config", None) if server_config is not None: break except Exception: continue # Determine enabled flag: config -> env DISABLE_* opt-out cfg_enabled = True if server_config is None else bool( getattr(server_config, "telemetry_enabled", True)) self.enabled = cfg_enabled and not self._is_disabled() # Telemetry endpoint (Cloud Run default; override via env) cfg_default = None if server_config is None else getattr( server_config, "telemetry_endpoint", None) default_ep = cfg_default or "https://api-prod.coplay.dev/telemetry/events" self.default_endpoint = default_ep self.endpoint = self._validated_endpoint( os.environ.get("UNITY_MCP_TELEMETRY_ENDPOINT", default_ep), default_ep, ) try: logger.info( "Telemetry configured: endpoint=%s (default=%s), timeout_env=%s", self.endpoint, default_ep, os.environ.get("UNITY_MCP_TELEMETRY_TIMEOUT") or "<unset>" ) except Exception: pass # Local storage for UUID and milestones self.data_dir = self._get_data_directory() self.uuid_file = self.data_dir / "customer_uuid.txt" self.milestones_file = self.data_dir / "milestones.json" # Request timeout (small, fail fast). 
Override with UNITY_MCP_TELEMETRY_TIMEOUT try: self.timeout = float(os.environ.get( "UNITY_MCP_TELEMETRY_TIMEOUT", "1.5")) except Exception: self.timeout = 1.5 try: logger.info("Telemetry timeout=%.2fs", self.timeout) except Exception: pass # Session tracking self.session_id = str(uuid.uuid4()) def _is_disabled(self) -> bool: """Check if telemetry is disabled via environment variables""" disable_vars = [ "DISABLE_TELEMETRY", "UNITY_MCP_DISABLE_TELEMETRY", "MCP_DISABLE_TELEMETRY" ] for var in disable_vars: if os.environ.get(var, "").lower() in ("true", "1", "yes", "on"): return True return False def _get_data_directory(self) -> Path: """Get directory for storing telemetry data""" if os.name == 'nt': # Windows base_dir = Path(os.environ.get( 'APPDATA', Path.home() / 'AppData' / 'Roaming')) elif os.name == 'posix': # macOS/Linux if 'darwin' in os.uname().sysname.lower(): # macOS base_dir = Path.home() / 'Library' / 'Application Support' else: # Linux base_dir = Path(os.environ.get('XDG_DATA_HOME', Path.home() / '.local' / 'share')) else: base_dir = Path.home() / '.unity-mcp' data_dir = base_dir / 'UnityMCP' data_dir.mkdir(parents=True, exist_ok=True) return data_dir def _validated_endpoint(self, candidate: str, fallback: str) -> str: """Validate telemetry endpoint URL scheme; allow only http/https. Falls back to the provided default on error. """ try: parsed = urlparse(candidate) if parsed.scheme not in ("https", "http"): raise ValueError(f"Unsupported scheme: {parsed.scheme}") # Basic sanity: require network location and path if not parsed.netloc: raise ValueError("Missing netloc in endpoint") # Reject localhost/loopback endpoints in production to avoid accidental local overrides host = parsed.hostname or "" if host in ("localhost", "127.0.0.1", "::1"): raise ValueError( "Localhost endpoints are not allowed for telemetry") return candidate except Exception as e: logger.debug( f"Invalid telemetry endpoint '{candidate}', using default. 
Error: {e}", exc_info=True, ) return fallback class TelemetryCollector: """Main telemetry collection class""" def __init__(self): self.config = TelemetryConfig() self._customer_uuid: Optional[str] = None self._milestones: Dict[str, Dict[str, Any]] = {} self._lock: threading.Lock = threading.Lock() # Bounded queue with single background worker (records only; no context propagation) self._queue: "queue.Queue[TelemetryRecord]" = queue.Queue(maxsize=1000) # Load persistent data before starting worker so first events have UUID self._load_persistent_data() self._worker: threading.Thread = threading.Thread( target=self._worker_loop, daemon=True) self._worker.start() def _load_persistent_data(self): """Load UUID and milestones from disk""" # Load customer UUID try: if self.config.uuid_file.exists(): self._customer_uuid = self.config.uuid_file.read_text( encoding="utf-8").strip() or str(uuid.uuid4()) else: self._customer_uuid = str(uuid.uuid4()) try: self.config.uuid_file.write_text( self._customer_uuid, encoding="utf-8") if os.name == "posix": os.chmod(self.config.uuid_file, 0o600) except OSError as e: logger.debug( f"Failed to persist customer UUID: {e}", exc_info=True) except OSError as e: logger.debug(f"Failed to load customer UUID: {e}", exc_info=True) self._customer_uuid = str(uuid.uuid4()) # Load milestones (failure here must not affect UUID) try: if self.config.milestones_file.exists(): content = self.config.milestones_file.read_text( encoding="utf-8") self._milestones = json.loads(content) or {} if not isinstance(self._milestones, dict): self._milestones = {} except (OSError, json.JSONDecodeError, ValueError) as e: logger.debug(f"Failed to load milestones: {e}", exc_info=True) self._milestones = {} def _save_milestones(self): """Save milestones to disk. Caller must hold self._lock.""" try: self.config.milestones_file.write_text( json.dumps(self._milestones, indent=2), encoding="utf-8", ) except OSError as e: logger.warning(f"Failed to save milestones: {e}", exc_info=True) def record_milestone(self, milestone: MilestoneType, data: Optional[Dict[str, Any]] = None) -> bool: """Record a milestone event, returns True if this is the first occurrence""" if not self.config.enabled: return False milestone_key = milestone.value with self._lock: if milestone_key in self._milestones: return False # Already recorded milestone_data = { "timestamp": time.time(), "data": data or {}, } self._milestones[milestone_key] = milestone_data self._save_milestones() # Also send as telemetry record self.record( record_type=RecordType.USAGE, data={"milestone": milestone_key, **(data or {})}, milestone=milestone ) return True def record(self, record_type: RecordType, data: Dict[str, Any], milestone: Optional[MilestoneType] = None): """Record a telemetry event (async, non-blocking)""" if not self.config.enabled: return # Allow fallback sender when httpx is unavailable (no early return) record = TelemetryRecord( record_type=record_type, timestamp=time.time(), customer_uuid=self._customer_uuid or "unknown", session_id=self.config.session_id, data=data, milestone=milestone ) # Enqueue for background worker (non-blocking). Drop on backpressure. 
try: self._queue.put_nowait(record) except queue.Full: logger.debug("Telemetry queue full; dropping %s", record.record_type) def _worker_loop(self): """Background worker that serializes telemetry sends.""" while True: rec = self._queue.get() try: # Run sender directly; do not reuse caller context/thread-locals self._send_telemetry(rec) except Exception: logger.debug("Telemetry worker send failed", exc_info=True) finally: with contextlib.suppress(Exception): self._queue.task_done() def _send_telemetry(self, record: TelemetryRecord): """Send telemetry data to endpoint""" try: # System fingerprint (top-level remains concise; details stored in data JSON) _platform = platform.system() # 'Darwin' | 'Linux' | 'Windows' _source = sys.platform # 'darwin' | 'linux' | 'win32' _platform_detail = f"{_platform} {platform.release()} ({platform.machine()})" _python_version = platform.python_version() # Enrich data JSON so BigQuery stores detailed fields without schema change enriched_data = dict(record.data or {}) enriched_data.setdefault("platform_detail", _platform_detail) enriched_data.setdefault("python_version", _python_version) payload = { "record": record.record_type.value, "timestamp": record.timestamp, "customer_uuid": record.customer_uuid, "session_id": record.session_id, "data": enriched_data, "version": MCP_VERSION, "platform": _platform, "source": _source, } if record.milestone: payload["milestone"] = record.milestone.value # Prefer httpx when available; otherwise fall back to urllib if httpx: with httpx.Client(timeout=self.config.timeout) as client: # Re-validate endpoint at send time to handle dynamic changes endpoint = self.config._validated_endpoint( self.config.endpoint, self.config.default_endpoint) response = client.post(endpoint, json=payload) if 200 <= response.status_code < 300: logger.debug(f"Telemetry sent: {record.record_type}") else: logger.warning( f"Telemetry failed: HTTP {response.status_code}") else: import urllib.request import urllib.error data_bytes = json.dumps(payload).encode("utf-8") endpoint = self.config._validated_endpoint( self.config.endpoint, self.config.default_endpoint) req = urllib.request.Request( endpoint, data=data_bytes, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=self.config.timeout) as resp: if 200 <= resp.getcode() < 300: logger.debug( f"Telemetry sent (urllib): {record.record_type}") else: logger.warning( f"Telemetry failed (urllib): HTTP {resp.getcode()}") except urllib.error.URLError as ue: logger.warning(f"Telemetry send failed (urllib): {ue}") except Exception as e: # Never let telemetry errors interfere with app functionality logger.debug(f"Telemetry send failed: {e}") # Global telemetry instance _telemetry_collector: Optional[TelemetryCollector] = None def get_telemetry() -> TelemetryCollector: """Get the global telemetry collector instance""" global _telemetry_collector if _telemetry_collector is None: _telemetry_collector = TelemetryCollector() return _telemetry_collector def record_telemetry(record_type: RecordType, data: Dict[str, Any], milestone: Optional[MilestoneType] = None): """Convenience function to record telemetry""" get_telemetry().record(record_type, data, milestone) def record_milestone(milestone: MilestoneType, data: Optional[Dict[str, Any]] = None) -> bool: """Convenience function to record a milestone""" return get_telemetry().record_milestone(milestone, data) def record_tool_usage(tool_name: str, success: bool, duration_ms: float, error: Optional[str] = None, 
sub_action: Optional[str] = None): """Record tool usage telemetry Args: tool_name: Name of the tool invoked (e.g., 'manage_scene'). success: Whether the tool completed successfully. duration_ms: Execution duration in milliseconds. error: Optional error message (truncated if present). sub_action: Optional sub-action/operation within the tool (e.g., 'get_hierarchy'). """ data = { "tool_name": tool_name, "success": success, "duration_ms": round(duration_ms, 2) } if sub_action is not None: try: data["sub_action"] = str(sub_action) except Exception: # Ensure telemetry is never disruptive data["sub_action"] = "unknown" if error: data["error"] = str(error)[:200] # Limit error message length record_telemetry(RecordType.TOOL_EXECUTION, data) def record_latency(operation: str, duration_ms: float, metadata: Optional[Dict[str, Any]] = None): """Record latency telemetry""" data = { "operation": operation, "duration_ms": round(duration_ms, 2) } if metadata: data.update(metadata) record_telemetry(RecordType.LATENCY, data) def record_failure(component: str, error: str, metadata: Optional[Dict[str, Any]] = None): """Record failure telemetry""" data = { "component": component, "error": str(error)[:500] # Limit error message length } if metadata: data.update(metadata) record_telemetry(RecordType.FAILURE, data) def is_telemetry_enabled() -> bool: """Check if telemetry is enabled""" return get_telemetry().config.enabled ``` -------------------------------------------------------------------------------- /UnityMcpBridge/UnityMcpServer~/src/tools/resource_tools.py: -------------------------------------------------------------------------------- ```python """ Resource wrapper tools so clients that do not expose MCP resources primitives can still list and read files via normal tools. These call into the same safe path logic (re-implemented here to avoid importing server.py). """ import fnmatch import hashlib import os from pathlib import Path import re from typing import Annotated, Any from urllib.parse import urlparse, unquote from mcp.server.fastmcp import Context from registry import mcp_for_unity_tool from unity_connection import send_command_with_retry def _coerce_int(value: Any, default: int | None = None, minimum: int | None = None) -> int | None: """Safely coerce various inputs (str/float/etc.) to an int. Returns default on failure; clamps to minimum when provided. 
""" if value is None: return default try: # Avoid treating booleans as ints implicitly if isinstance(value, bool): return default if isinstance(value, int): result = int(value) else: s = str(value).strip() if s.lower() in ("", "none", "null"): return default # Allow "10.0" or similar inputs result = int(float(s)) if minimum is not None and result < minimum: return minimum return result except Exception: return default def _resolve_project_root(override: str | None) -> Path: # 1) Explicit override if override: pr = Path(override).expanduser().resolve() if (pr / "Assets").exists(): return pr # 2) Environment env = os.environ.get("UNITY_PROJECT_ROOT") if env: env_path = Path(env).expanduser() # If UNITY_PROJECT_ROOT is relative, resolve against repo root (cwd's repo) instead of src dir pr = (Path.cwd( ) / env_path).resolve() if not env_path.is_absolute() else env_path.resolve() if (pr / "Assets").exists(): return pr # 3) Ask Unity via manage_editor.get_project_root try: resp = send_command_with_retry( "manage_editor", {"action": "get_project_root"}) if isinstance(resp, dict) and resp.get("success"): pr = Path(resp.get("data", {}).get( "projectRoot", "")).expanduser().resolve() if pr and (pr / "Assets").exists(): return pr except Exception: pass # 4) Walk up from CWD to find a Unity project (Assets + ProjectSettings) cur = Path.cwd().resolve() for _ in range(6): if (cur / "Assets").exists() and (cur / "ProjectSettings").exists(): return cur if cur.parent == cur: break cur = cur.parent # 5) Search downwards (shallow) from repo root for first folder with Assets + ProjectSettings try: import os as _os root = Path.cwd().resolve() max_depth = 3 for dirpath, dirnames, _ in _os.walk(root): rel = Path(dirpath).resolve() try: depth = len(rel.relative_to(root).parts) except Exception: # Unrelated mount/permission edge; skip deeper traversal dirnames[:] = [] continue if depth > max_depth: # Prune deeper traversal dirnames[:] = [] continue if (rel / "Assets").exists() and (rel / "ProjectSettings").exists(): return rel except Exception: pass # 6) Fallback: CWD return Path.cwd().resolve() def _resolve_safe_path_from_uri(uri: str, project: Path) -> Path | None: raw: str | None = None if uri.startswith("unity://path/"): raw = uri[len("unity://path/"):] elif uri.startswith("file://"): parsed = urlparse(uri) raw = unquote(parsed.path or "") # On Windows, urlparse('file:///C:/x') -> path='/C:/x'. Strip the leading slash for drive letters. try: import os as _os if _os.name == "nt" and raw.startswith("/") and re.match(r"^/[A-Za-z]:/", raw): raw = raw[1:] # UNC paths: file://server/share -> netloc='server', path='/share'. Treat as \\\\server/share if _os.name == "nt" and parsed.netloc: raw = f"//{parsed.netloc}{raw}" except Exception: pass elif uri.startswith("Assets/"): raw = uri if raw is None: return None # Normalize separators early raw = raw.replace("\\", "/") p = (project / raw).resolve() try: p.relative_to(project) except ValueError: return None return p @mcp_for_unity_tool(description=("List project URIs (unity://path/...) under a folder (default: Assets). 
Only .cs files are returned by default; always appends unity://spec/script-edits.\n")) async def list_resources( ctx: Context, pattern: Annotated[str, "Glob, default is *.cs"] | None = "*.cs", under: Annotated[str, "Folder under project root, default is Assets"] = "Assets", limit: Annotated[int, "Page limit"] = 200, project_root: Annotated[str, "Project path"] | None = None, ) -> dict[str, Any]: ctx.info(f"Processing list_resources: {pattern}") try: project = _resolve_project_root(project_root) base = (project / under).resolve() try: base.relative_to(project) except ValueError: return {"success": False, "error": "Base path must be under project root"} # Enforce listing only under Assets try: base.relative_to(project / "Assets") except ValueError: return {"success": False, "error": "Listing is restricted to Assets/"} matches: list[str] = [] limit_int = _coerce_int(limit, default=200, minimum=1) for p in base.rglob("*"): if not p.is_file(): continue # Resolve symlinks and ensure the real path stays under project/Assets try: rp = p.resolve() rp.relative_to(project / "Assets") except Exception: continue # Enforce .cs extension regardless of provided pattern if p.suffix.lower() != ".cs": continue if pattern and not fnmatch.fnmatch(p.name, pattern): continue rel = p.relative_to(project).as_posix() matches.append(f"unity://path/{rel}") if len(matches) >= max(1, limit_int): break # Always include the canonical spec resource so NL clients can discover it if "unity://spec/script-edits" not in matches: matches.append("unity://spec/script-edits") return {"success": True, "data": {"uris": matches, "count": len(matches)}} except Exception as e: return {"success": False, "error": str(e)} @mcp_for_unity_tool(description=("Reads a resource by unity://path/... URI with optional slicing.")) async def read_resource( ctx: Context, uri: Annotated[str, "The resource URI to read under Assets/"], start_line: Annotated[int, "The starting line number (0-based)"] | None = None, line_count: Annotated[int, "The number of lines to read"] | None = None, head_bytes: Annotated[int, "The number of bytes to read from the start of the file"] | None = None, tail_lines: Annotated[int, "The number of lines to read from the end of the file"] | None = None, project_root: Annotated[str, "The project root directory"] | None = None, request: Annotated[str, "The request ID"] | None = None, ) -> dict[str, Any]: ctx.info(f"Processing read_resource: {uri}") try: # Serve the canonical spec directly when requested (allow bare or with scheme) if uri in ("unity://spec/script-edits", "spec/script-edits", "script-edits"): spec_json = ( '{\n' ' "name": "Unity MCP - Script Edits v1",\n' ' "target_tool": "script_apply_edits",\n' ' "canonical_rules": {\n' ' "always_use": ["op","className","methodName","replacement","afterMethodName","beforeMethodName"],\n' ' "never_use": ["new_method","anchor_method","content","newText"],\n' ' "defaults": {\n' ' "className": "\u2190 server will default to \'name\' when omitted",\n' ' "position": "end"\n' ' }\n' ' },\n' ' "ops": [\n' ' {"op":"replace_method","required":["className","methodName","replacement"],"optional":["returnType","parametersSignature","attributesContains"],"examples":[{"note":"match overload by signature","parametersSignature":"(int a, string b)"},{"note":"ensure attributes retained","attributesContains":"ContextMenu"}]},\n' ' 
{"op":"insert_method","required":["className","replacement"],"position":{"enum":["start","end","after","before"],"after_requires":"afterMethodName","before_requires":"beforeMethodName"}},\n' ' {"op":"delete_method","required":["className","methodName"]},\n' ' {"op":"anchor_insert","required":["anchor","text"],"notes":"regex; position=before|after"}\n' ' ],\n' ' "apply_text_edits_recipe": {\n' ' "step1_read": { "tool": "resources/read", "args": {"uri": "unity://path/Assets/Scripts/Interaction/SmartReach.cs"} },\n' ' "step2_apply": {\n' ' "tool": "manage_script",\n' ' "args": {\n' ' "action": "apply_text_edits",\n' ' "name": "SmartReach", "path": "Assets/Scripts/Interaction",\n' ' "edits": [{"startLine": 42, "startCol": 1, "endLine": 42, "endCol": 1, "newText": "[MyAttr]\\n"}],\n' ' "precondition_sha256": "<sha-from-step1>",\n' ' "options": {"refresh": "immediate", "validate": "standard"}\n' ' }\n' ' },\n' ' "note": "newText is for apply_text_edits ranges only; use replacement in script_apply_edits ops."\n' ' },\n' ' "examples": [\n' ' {\n' ' "title": "Replace a method",\n' ' "args": {\n' ' "name": "SmartReach",\n' ' "path": "Assets/Scripts/Interaction",\n' ' "edits": [\n' ' {"op":"replace_method","className":"SmartReach","methodName":"HasTarget","replacement":"public bool HasTarget() { return currentTarget != null; }"}\n' ' ],\n' ' "options": { "validate": "standard", "refresh": "immediate" }\n' ' }\n' ' },\n' ' {\n' ' "title": "Insert a method after another",\n' ' "args": {\n' ' "name": "SmartReach",\n' ' "path": "Assets/Scripts/Interaction",\n' ' "edits": [\n' ' {"op":"insert_method","className":"SmartReach","replacement":"public void PrintSeries() { Debug.Log(seriesName); }","position":"after","afterMethodName":"GetCurrentTarget"}\n' ' ]\n' ' }\n' ' }\n' ' ]\n' '}\n' ) sha = hashlib.sha256(spec_json.encode("utf-8")).hexdigest() return {"success": True, "data": {"text": spec_json, "metadata": {"sha256": sha}}} project = _resolve_project_root(project_root) p = _resolve_safe_path_from_uri(uri, project) if not p or not p.exists() or not p.is_file(): return {"success": False, "error": f"Resource not found: {uri}"} try: p.relative_to(project / "Assets") except ValueError: return {"success": False, "error": "Read restricted to Assets/"} # Natural-language convenience: request like "last 120 lines", "first 200 lines", # "show 40 lines around MethodName", etc. 
if request: req = request.strip().lower() m = re.search(r"last\s+(\d+)\s+lines", req) if m: tail_lines = int(m.group(1)) m = re.search(r"first\s+(\d+)\s+lines", req) if m: start_line = 1 line_count = int(m.group(1)) m = re.search(r"first\s+(\d+)\s*bytes", req) if m: head_bytes = int(m.group(1)) m = re.search( r"show\s+(\d+)\s+lines\s+around\s+([A-Za-z_][A-Za-z0-9_]*)", req) if m: window = int(m.group(1)) method = m.group(2) # naive search for method header to get a line number text_all = p.read_text(encoding="utf-8") lines_all = text_all.splitlines() pat = re.compile( rf"^\s*(?:\[[^\]]+\]\s*)*(?:public|private|protected|internal|static|virtual|override|sealed|async|extern|unsafe|new|partial).*?\b{re.escape(method)}\s*\(", re.MULTILINE) hit_line = None for i, line in enumerate(lines_all, start=1): if pat.search(line): hit_line = i break if hit_line: half = max(1, window // 2) start_line = max(1, hit_line - half) line_count = window # Coerce numeric inputs defensively (string/float -> int) start_line = _coerce_int(start_line) line_count = _coerce_int(line_count) head_bytes = _coerce_int(head_bytes, minimum=1) tail_lines = _coerce_int(tail_lines, minimum=1) # Compute SHA over full file contents (metadata-only default) full_bytes = p.read_bytes() full_sha = hashlib.sha256(full_bytes).hexdigest() # Selection only when explicitly requested via windowing args or request text hints selection_requested = bool(head_bytes or tail_lines or ( start_line is not None and line_count is not None) or request) if selection_requested: # Mutually exclusive windowing options precedence: # 1) head_bytes, 2) tail_lines, 3) start_line+line_count, else full text if head_bytes and head_bytes > 0: raw = full_bytes[: head_bytes] text = raw.decode("utf-8", errors="replace") else: text = full_bytes.decode("utf-8", errors="replace") if tail_lines is not None and tail_lines > 0: lines = text.splitlines() n = max(0, tail_lines) text = "\n".join(lines[-n:]) elif start_line is not None and line_count is not None and line_count >= 0: lines = text.splitlines() s = max(0, start_line - 1) e = min(len(lines), s + line_count) text = "\n".join(lines[s:e]) return {"success": True, "data": {"text": text, "metadata": {"sha256": full_sha, "lengthBytes": len(full_bytes)}}} else: # Default: metadata only return {"success": True, "data": {"metadata": {"sha256": full_sha, "lengthBytes": len(full_bytes)}}} except Exception as e: return {"success": False, "error": str(e)} @mcp_for_unity_tool(description="Searches a file with a regex pattern and returns line numbers and excerpts.") async def find_in_file( ctx: Context, uri: Annotated[str, "The resource URI to search under Assets/ or file path form supported by read_resource"], pattern: Annotated[str, "The regex pattern to search for"], ignore_case: Annotated[bool, "Case-insensitive search"] | None = True, project_root: Annotated[str, "The project root directory"] | None = None, max_results: Annotated[int, "Cap results to avoid huge payloads"] = 200, ) -> dict[str, Any]: ctx.info(f"Processing find_in_file: {uri}") try: project = _resolve_project_root(project_root) p = _resolve_safe_path_from_uri(uri, project) if not p or not p.exists() or not p.is_file(): return {"success": False, "error": f"Resource not found: {uri}"} text = p.read_text(encoding="utf-8") flags = re.MULTILINE if ignore_case: flags |= re.IGNORECASE rx = re.compile(pattern, flags) results = [] max_results_int = _coerce_int(max_results, default=200, minimum=1) lines = text.splitlines() for i, line in enumerate(lines, start=1): m 
= rx.search(line) if m: start_col = m.start() + 1 # 1-based end_col = m.end() + 1 # 1-based, end exclusive results.append({ "startLine": i, "startCol": start_col, "endLine": i, "endCol": end_col, }) if max_results_int and len(results) >= max_results_int: break return {"success": True, "data": {"matches": results, "count": len(results)}} except Exception as e: return {"success": False, "error": str(e)} ``` -------------------------------------------------------------------------------- /MCPForUnity/UnityMcpServer~/src/tools/resource_tools.py: -------------------------------------------------------------------------------- ```python """ Resource wrapper tools so clients that do not expose MCP resources primitives can still list and read files via normal tools. These call into the same safe path logic (re-implemented here to avoid importing server.py). """ import fnmatch import hashlib import os from pathlib import Path import re from typing import Annotated, Any from urllib.parse import urlparse, unquote from mcp.server.fastmcp import Context from registry import mcp_for_unity_tool from unity_connection import send_command_with_retry def _coerce_int(value: Any, default: int | None = None, minimum: int | None = None) -> int | None: """Safely coerce various inputs (str/float/etc.) to an int. Returns default on failure; clamps to minimum when provided. """ if value is None: return default try: # Avoid treating booleans as ints implicitly if isinstance(value, bool): return default if isinstance(value, int): result = int(value) else: s = str(value).strip() if s.lower() in ("", "none", "null"): return default # Allow "10.0" or similar inputs result = int(float(s)) if minimum is not None and result < minimum: return minimum return result except Exception: return default def _resolve_project_root(override: str | None) -> Path: # 1) Explicit override if override: pr = Path(override).expanduser().resolve() if (pr / "Assets").exists(): return pr # 2) Environment env = os.environ.get("UNITY_PROJECT_ROOT") if env: env_path = Path(env).expanduser() # If UNITY_PROJECT_ROOT is relative, resolve against repo root (cwd's repo) instead of src dir pr = (Path.cwd( ) / env_path).resolve() if not env_path.is_absolute() else env_path.resolve() if (pr / "Assets").exists(): return pr # 3) Ask Unity via manage_editor.get_project_root try: resp = send_command_with_retry( "manage_editor", {"action": "get_project_root"}) if isinstance(resp, dict) and resp.get("success"): pr = Path(resp.get("data", {}).get( "projectRoot", "")).expanduser().resolve() if pr and (pr / "Assets").exists(): return pr except Exception: pass # 4) Walk up from CWD to find a Unity project (Assets + ProjectSettings) cur = Path.cwd().resolve() for _ in range(6): if (cur / "Assets").exists() and (cur / "ProjectSettings").exists(): return cur if cur.parent == cur: break cur = cur.parent # 5) Search downwards (shallow) from repo root for first folder with Assets + ProjectSettings try: import os as _os root = Path.cwd().resolve() max_depth = 3 for dirpath, dirnames, _ in _os.walk(root): rel = Path(dirpath).resolve() try: depth = len(rel.relative_to(root).parts) except Exception: # Unrelated mount/permission edge; skip deeper traversal dirnames[:] = [] continue if depth > max_depth: # Prune deeper traversal dirnames[:] = [] continue if (rel / "Assets").exists() and (rel / "ProjectSettings").exists(): return rel except Exception: pass # 6) Fallback: CWD return Path.cwd().resolve() def _resolve_safe_path_from_uri(uri: str, project: Path) -> Path | None: 
raw: str | None = None if uri.startswith("unity://path/"): raw = uri[len("unity://path/"):] elif uri.startswith("file://"): parsed = urlparse(uri) raw = unquote(parsed.path or "") # On Windows, urlparse('file:///C:/x') -> path='/C:/x'. Strip the leading slash for drive letters. try: import os as _os if _os.name == "nt" and raw.startswith("/") and re.match(r"^/[A-Za-z]:/", raw): raw = raw[1:] # UNC paths: file://server/share -> netloc='server', path='/share'. Treat as \\\\server/share if _os.name == "nt" and parsed.netloc: raw = f"//{parsed.netloc}{raw}" except Exception: pass elif uri.startswith("Assets/"): raw = uri if raw is None: return None # Normalize separators early raw = raw.replace("\\", "/") p = (project / raw).resolve() try: p.relative_to(project) except ValueError: return None return p @mcp_for_unity_tool(description=("List project URIs (unity://path/...) under a folder (default: Assets). Only .cs files are returned by default; always appends unity://spec/script-edits.\n")) async def list_resources( ctx: Context, pattern: Annotated[str, "Glob, default is *.cs"] | None = "*.cs", under: Annotated[str, "Folder under project root, default is Assets"] = "Assets", limit: Annotated[int, "Page limit"] = 200, project_root: Annotated[str, "Project path"] | None = None, ) -> dict[str, Any]: ctx.info(f"Processing list_resources: {pattern}") try: project = _resolve_project_root(project_root) base = (project / under).resolve() try: base.relative_to(project) except ValueError: return {"success": False, "error": "Base path must be under project root"} # Enforce listing only under Assets try: base.relative_to(project / "Assets") except ValueError: return {"success": False, "error": "Listing is restricted to Assets/"} matches: list[str] = [] limit_int = _coerce_int(limit, default=200, minimum=1) for p in base.rglob("*"): if not p.is_file(): continue # Resolve symlinks and ensure the real path stays under project/Assets try: rp = p.resolve() rp.relative_to(project / "Assets") except Exception: continue # Enforce .cs extension regardless of provided pattern if p.suffix.lower() != ".cs": continue if pattern and not fnmatch.fnmatch(p.name, pattern): continue rel = p.relative_to(project).as_posix() matches.append(f"unity://path/{rel}") if len(matches) >= max(1, limit_int): break # Always include the canonical spec resource so NL clients can discover it if "unity://spec/script-edits" not in matches: matches.append("unity://spec/script-edits") return {"success": True, "data": {"uris": matches, "count": len(matches)}} except Exception as e: return {"success": False, "error": str(e)} @mcp_for_unity_tool(description=("Reads a resource by unity://path/... 
URI with optional slicing.")) async def read_resource( ctx: Context, uri: Annotated[str, "The resource URI to read under Assets/"], start_line: Annotated[int, "The starting line number (0-based)"] | None = None, line_count: Annotated[int, "The number of lines to read"] | None = None, head_bytes: Annotated[int, "The number of bytes to read from the start of the file"] | None = None, tail_lines: Annotated[int, "The number of lines to read from the end of the file"] | None = None, project_root: Annotated[str, "The project root directory"] | None = None, request: Annotated[str, "The request ID"] | None = None, ) -> dict[str, Any]: ctx.info(f"Processing read_resource: {uri}") try: # Serve the canonical spec directly when requested (allow bare or with scheme) if uri in ("unity://spec/script-edits", "spec/script-edits", "script-edits"): spec_json = ( '{\n' ' "name": "MCP for Unity - Script Edits v1",\n' ' "target_tool": "script_apply_edits",\n' ' "canonical_rules": {\n' ' "always_use": ["op","className","methodName","replacement","afterMethodName","beforeMethodName"],\n' ' "never_use": ["new_method","anchor_method","content","newText"],\n' ' "defaults": {\n' ' "className": "\u2190 server will default to \'name\' when omitted",\n' ' "position": "end"\n' ' }\n' ' },\n' ' "ops": [\n' ' {"op":"replace_method","required":["className","methodName","replacement"],"optional":["returnType","parametersSignature","attributesContains"],"examples":[{"note":"match overload by signature","parametersSignature":"(int a, string b)"},{"note":"ensure attributes retained","attributesContains":"ContextMenu"}]},\n' ' {"op":"insert_method","required":["className","replacement"],"position":{"enum":["start","end","after","before"],"after_requires":"afterMethodName","before_requires":"beforeMethodName"}},\n' ' {"op":"delete_method","required":["className","methodName"]},\n' ' {"op":"anchor_insert","required":["anchor","text"],"notes":"regex; position=before|after"}\n' ' ],\n' ' "apply_text_edits_recipe": {\n' ' "step1_read": { "tool": "resources/read", "args": {"uri": "unity://path/Assets/Scripts/Interaction/SmartReach.cs"} },\n' ' "step2_apply": {\n' ' "tool": "manage_script",\n' ' "args": {\n' ' "action": "apply_text_edits",\n' ' "name": "SmartReach", "path": "Assets/Scripts/Interaction",\n' ' "edits": [{"startLine": 42, "startCol": 1, "endLine": 42, "endCol": 1, "newText": "[MyAttr]\\n"}],\n' ' "precondition_sha256": "<sha-from-step1>",\n' ' "options": {"refresh": "immediate", "validate": "standard"}\n' ' }\n' ' },\n' ' "note": "newText is for apply_text_edits ranges only; use replacement in script_apply_edits ops."\n' ' },\n' ' "examples": [\n' ' {\n' ' "title": "Replace a method",\n' ' "args": {\n' ' "name": "SmartReach",\n' ' "path": "Assets/Scripts/Interaction",\n' ' "edits": [\n' ' {"op":"replace_method","className":"SmartReach","methodName":"HasTarget","replacement":"public bool HasTarget() { return currentTarget != null; }"}\n' ' ],\n' ' "options": { "validate": "standard", "refresh": "immediate" }\n' ' }\n' ' },\n' ' {\n' ' "title": "Insert a method after another",\n' ' "args": {\n' ' "name": "SmartReach",\n' ' "path": "Assets/Scripts/Interaction",\n' ' "edits": [\n' ' {"op":"insert_method","className":"SmartReach","replacement":"public void PrintSeries() { Debug.Log(seriesName); }","position":"after","afterMethodName":"GetCurrentTarget"}\n' ' ]\n' ' }\n' ' }\n' ' ]\n' '}\n' ) sha = hashlib.sha256(spec_json.encode("utf-8")).hexdigest() return {"success": True, "data": {"text": spec_json, "metadata": {"sha256": 
sha}}} project = _resolve_project_root(project_root) p = _resolve_safe_path_from_uri(uri, project) if not p or not p.exists() or not p.is_file(): return {"success": False, "error": f"Resource not found: {uri}"} try: p.relative_to(project / "Assets") except ValueError: return {"success": False, "error": "Read restricted to Assets/"} # Natural-language convenience: request like "last 120 lines", "first 200 lines", # "show 40 lines around MethodName", etc. if request: req = request.strip().lower() m = re.search(r"last\s+(\d+)\s+lines", req) if m: tail_lines = int(m.group(1)) m = re.search(r"first\s+(\d+)\s+lines", req) if m: start_line = 1 line_count = int(m.group(1)) m = re.search(r"first\s+(\d+)\s*bytes", req) if m: head_bytes = int(m.group(1)) m = re.search( r"show\s+(\d+)\s+lines\s+around\s+([A-Za-z_][A-Za-z0-9_]*)", req) if m: window = int(m.group(1)) method = m.group(2) # naive search for method header to get a line number text_all = p.read_text(encoding="utf-8") lines_all = text_all.splitlines() pat = re.compile( rf"^\s*(?:\[[^\]]+\]\s*)*(?:public|private|protected|internal|static|virtual|override|sealed|async|extern|unsafe|new|partial).*?\b{re.escape(method)}\s*\(", re.MULTILINE) hit_line = None for i, line in enumerate(lines_all, start=1): if pat.search(line): hit_line = i break if hit_line: half = max(1, window // 2) start_line = max(1, hit_line - half) line_count = window # Coerce numeric inputs defensively (string/float -> int) start_line = _coerce_int(start_line) line_count = _coerce_int(line_count) head_bytes = _coerce_int(head_bytes, minimum=1) tail_lines = _coerce_int(tail_lines, minimum=1) # Compute SHA over full file contents (metadata-only default) full_bytes = p.read_bytes() full_sha = hashlib.sha256(full_bytes).hexdigest() # Selection only when explicitly requested via windowing args or request text hints selection_requested = bool(head_bytes or tail_lines or ( start_line is not None and line_count is not None) or request) if selection_requested: # Mutually exclusive windowing options precedence: # 1) head_bytes, 2) tail_lines, 3) start_line+line_count, else full text if head_bytes and head_bytes > 0: raw = full_bytes[: head_bytes] text = raw.decode("utf-8", errors="replace") else: text = full_bytes.decode("utf-8", errors="replace") if tail_lines is not None and tail_lines > 0: lines = text.splitlines() n = max(0, tail_lines) text = "\n".join(lines[-n:]) elif start_line is not None and line_count is not None and line_count >= 0: lines = text.splitlines() s = max(0, start_line - 1) e = min(len(lines), s + line_count) text = "\n".join(lines[s:e]) return {"success": True, "data": {"text": text, "metadata": {"sha256": full_sha, "lengthBytes": len(full_bytes)}}} else: # Default: metadata only return {"success": True, "data": {"metadata": {"sha256": full_sha, "lengthBytes": len(full_bytes)}}} except Exception as e: return {"success": False, "error": str(e)} @mcp_for_unity_tool(description="Searches a file with a regex pattern and returns line numbers and excerpts.") async def find_in_file( ctx: Context, uri: Annotated[str, "The resource URI to search under Assets/ or file path form supported by read_resource"], pattern: Annotated[str, "The regex pattern to search for"], ignore_case: Annotated[bool, "Case-insensitive search"] | None = True, project_root: Annotated[str, "The project root directory"] | None = None, max_results: Annotated[int, "Cap results to avoid huge payloads"] = 200, ) -> dict[str, Any]: ctx.info(f"Processing find_in_file: {uri}") try: project = 
_resolve_project_root(project_root) p = _resolve_safe_path_from_uri(uri, project) if not p or not p.exists() or not p.is_file(): return {"success": False, "error": f"Resource not found: {uri}"} text = p.read_text(encoding="utf-8") flags = re.MULTILINE if ignore_case: flags |= re.IGNORECASE rx = re.compile(pattern, flags) results = [] max_results_int = _coerce_int(max_results, default=200, minimum=1) lines = text.splitlines() for i, line in enumerate(lines, start=1): m = rx.search(line) if m: start_col = m.start() + 1 # 1-based end_col = m.end() + 1 # 1-based, end exclusive results.append({ "startLine": i, "startCol": start_col, "endLine": i, "endCol": end_col, }) if max_results_int and len(results) >= max_results_int: break return {"success": True, "data": {"matches": results, "count": len(results)}} except Exception as e: return {"success": False, "error": str(e)} ``` -------------------------------------------------------------------------------- /MCPForUnity/UnityMcpServer~/src/telemetry.py: -------------------------------------------------------------------------------- ```python """ Privacy-focused, anonymous telemetry system for MCP for Unity Inspired by Onyx's telemetry implementation with Unity-specific adaptations Fire-and-forget telemetry sender with a single background worker. - No context/thread-local propagation to avoid re-entrancy into tool resolution. - Small network timeouts to prevent stalls. """ import contextlib from dataclasses import dataclass from enum import Enum import importlib import json import logging import os from pathlib import Path import platform import queue import sys import threading import time from typing import Any from urllib.parse import urlparse import uuid import tomli try: import httpx HAS_HTTPX = True except ImportError: httpx = None # type: ignore HAS_HTTPX = False logger = logging.getLogger("unity-mcp-telemetry") def get_package_version() -> str: """ Open pyproject.toml and parse version We use the tomli library instead of tomllib to support Python 3.10 """ with open("pyproject.toml", "rb") as f: data = tomli.load(f) return data["project"]["version"] MCP_VERSION = get_package_version() class RecordType(str, Enum): """Types of telemetry records we collect""" VERSION = "version" STARTUP = "startup" USAGE = "usage" LATENCY = "latency" FAILURE = "failure" RESOURCE_RETRIEVAL = "resource_retrieval" TOOL_EXECUTION = "tool_execution" UNITY_CONNECTION = "unity_connection" CLIENT_CONNECTION = "client_connection" class MilestoneType(str, Enum): """Major user journey milestones""" FIRST_STARTUP = "first_startup" FIRST_TOOL_USAGE = "first_tool_usage" FIRST_SCRIPT_CREATION = "first_script_creation" FIRST_SCENE_MODIFICATION = "first_scene_modification" MULTIPLE_SESSIONS = "multiple_sessions" DAILY_ACTIVE_USER = "daily_active_user" WEEKLY_ACTIVE_USER = "weekly_active_user" @dataclass class TelemetryRecord: """Structure for telemetry data""" record_type: RecordType timestamp: float customer_uuid: str session_id: str data: dict[str, Any] milestone: MilestoneType | None = None class TelemetryConfig: """Telemetry configuration""" def __init__(self): """ Prefer config file, then allow env overrides """ server_config = None for modname in ( "MCPForUnity.UnityMcpServer~.src.config", "MCPForUnity.UnityMcpServer.src.config", "src.config", "config", ): try: mod = importlib.import_module(modname) server_config = getattr(mod, "config", None) if server_config is not None: break except Exception: continue # Determine enabled flag: config -> env DISABLE_* opt-out 
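        # Example opt-out: any variable checked in _is_disabled() below turns telemetry
        # off when set to a truthy value before the server starts, e.g.
        # DISABLE_TELEMETRY=1 or UNITY_MCP_DISABLE_TELEMETRY=true.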
cfg_enabled = True if server_config is None else bool( getattr(server_config, "telemetry_enabled", True)) self.enabled = cfg_enabled and not self._is_disabled() # Telemetry endpoint (Cloud Run default; override via env) cfg_default = None if server_config is None else getattr( server_config, "telemetry_endpoint", None) default_ep = cfg_default or "https://api-prod.coplay.dev/telemetry/events" self.default_endpoint = default_ep self.endpoint = self._validated_endpoint( os.environ.get("UNITY_MCP_TELEMETRY_ENDPOINT", default_ep), default_ep, ) try: logger.info( "Telemetry configured: endpoint=%s (default=%s), timeout_env=%s", self.endpoint, default_ep, os.environ.get("UNITY_MCP_TELEMETRY_TIMEOUT") or "<unset>" ) except Exception: pass # Local storage for UUID and milestones self.data_dir = self._get_data_directory() self.uuid_file = self.data_dir / "customer_uuid.txt" self.milestones_file = self.data_dir / "milestones.json" # Request timeout (small, fail fast). Override with UNITY_MCP_TELEMETRY_TIMEOUT try: self.timeout = float(os.environ.get( "UNITY_MCP_TELEMETRY_TIMEOUT", "1.5")) except Exception: self.timeout = 1.5 try: logger.info("Telemetry timeout=%.2fs", self.timeout) except Exception: pass # Session tracking self.session_id = str(uuid.uuid4()) def _is_disabled(self) -> bool: """Check if telemetry is disabled via environment variables""" disable_vars = [ "DISABLE_TELEMETRY", "UNITY_MCP_DISABLE_TELEMETRY", "MCP_DISABLE_TELEMETRY" ] for var in disable_vars: if os.environ.get(var, "").lower() in ("true", "1", "yes", "on"): return True return False def _get_data_directory(self) -> Path: """Get directory for storing telemetry data""" if os.name == 'nt': # Windows base_dir = Path(os.environ.get( 'APPDATA', Path.home() / 'AppData' / 'Roaming')) elif os.name == 'posix': # macOS/Linux if 'darwin' in os.uname().sysname.lower(): # macOS base_dir = Path.home() / 'Library' / 'Application Support' else: # Linux base_dir = Path(os.environ.get('XDG_DATA_HOME', Path.home() / '.local' / 'share')) else: base_dir = Path.home() / '.unity-mcp' data_dir = base_dir / 'UnityMCP' data_dir.mkdir(parents=True, exist_ok=True) return data_dir def _validated_endpoint(self, candidate: str, fallback: str) -> str: """Validate telemetry endpoint URL scheme; allow only http/https. Falls back to the provided default on error. """ try: parsed = urlparse(candidate) if parsed.scheme not in ("https", "http"): raise ValueError(f"Unsupported scheme: {parsed.scheme}") # Basic sanity: require network location and path if not parsed.netloc: raise ValueError("Missing netloc in endpoint") # Reject localhost/loopback endpoints in production to avoid accidental local overrides host = parsed.hostname or "" if host in ("localhost", "127.0.0.1", "::1"): raise ValueError( "Localhost endpoints are not allowed for telemetry") return candidate except Exception as e: logger.debug( f"Invalid telemetry endpoint '{candidate}', using default. 
Error: {e}", exc_info=True, ) return fallback class TelemetryCollector: """Main telemetry collection class""" def __init__(self): self.config = TelemetryConfig() self._customer_uuid: str | None = None self._milestones: dict[str, dict[str, Any]] = {} self._lock: threading.Lock = threading.Lock() # Bounded queue with single background worker (records only; no context propagation) self._queue: "queue.Queue[TelemetryRecord]" = queue.Queue(maxsize=1000) # Load persistent data before starting worker so first events have UUID self._load_persistent_data() self._worker: threading.Thread = threading.Thread( target=self._worker_loop, daemon=True) self._worker.start() def _load_persistent_data(self): """Load UUID and milestones from disk""" # Load customer UUID try: if self.config.uuid_file.exists(): self._customer_uuid = self.config.uuid_file.read_text( encoding="utf-8").strip() or str(uuid.uuid4()) else: self._customer_uuid = str(uuid.uuid4()) try: self.config.uuid_file.write_text( self._customer_uuid, encoding="utf-8") if os.name == "posix": os.chmod(self.config.uuid_file, 0o600) except OSError as e: logger.debug( f"Failed to persist customer UUID: {e}", exc_info=True) except OSError as e: logger.debug(f"Failed to load customer UUID: {e}", exc_info=True) self._customer_uuid = str(uuid.uuid4()) # Load milestones (failure here must not affect UUID) try: if self.config.milestones_file.exists(): content = self.config.milestones_file.read_text( encoding="utf-8") self._milestones = json.loads(content) or {} if not isinstance(self._milestones, dict): self._milestones = {} except (OSError, json.JSONDecodeError, ValueError) as e: logger.debug(f"Failed to load milestones: {e}", exc_info=True) self._milestones = {} def _save_milestones(self): """Save milestones to disk. Caller must hold self._lock.""" try: self.config.milestones_file.write_text( json.dumps(self._milestones, indent=2), encoding="utf-8", ) except OSError as e: logger.warning(f"Failed to save milestones: {e}", exc_info=True) def record_milestone(self, milestone: MilestoneType, data: dict[str, Any] | None = None) -> bool: """Record a milestone event, returns True if this is the first occurrence""" if not self.config.enabled: return False milestone_key = milestone.value with self._lock: if milestone_key in self._milestones: return False # Already recorded milestone_data = { "timestamp": time.time(), "data": data or {}, } self._milestones[milestone_key] = milestone_data self._save_milestones() # Also send as telemetry record self.record( record_type=RecordType.USAGE, data={"milestone": milestone_key, **(data or {})}, milestone=milestone ) return True def record(self, record_type: RecordType, data: dict[str, Any], milestone: MilestoneType | None = None): """Record a telemetry event (async, non-blocking)""" if not self.config.enabled: return # Allow fallback sender when httpx is unavailable (no early return) record = TelemetryRecord( record_type=record_type, timestamp=time.time(), customer_uuid=self._customer_uuid or "unknown", session_id=self.config.session_id, data=data, milestone=milestone ) # Enqueue for background worker (non-blocking). Drop on backpressure. 
try: self._queue.put_nowait(record) except queue.Full: logger.debug("Telemetry queue full; dropping %s", record.record_type) def _worker_loop(self): """Background worker that serializes telemetry sends.""" while True: rec = self._queue.get() try: # Run sender directly; do not reuse caller context/thread-locals self._send_telemetry(rec) except Exception: logger.debug("Telemetry worker send failed", exc_info=True) finally: with contextlib.suppress(Exception): self._queue.task_done() def _send_telemetry(self, record: TelemetryRecord): """Send telemetry data to endpoint""" try: # System fingerprint (top-level remains concise; details stored in data JSON) _platform = platform.system() # 'Darwin' | 'Linux' | 'Windows' _source = sys.platform # 'darwin' | 'linux' | 'win32' _platform_detail = f"{_platform} {platform.release()} ({platform.machine()})" _python_version = platform.python_version() # Enrich data JSON so BigQuery stores detailed fields without schema change enriched_data = dict(record.data or {}) enriched_data.setdefault("platform_detail", _platform_detail) enriched_data.setdefault("python_version", _python_version) payload = { "record": record.record_type.value, "timestamp": record.timestamp, "customer_uuid": record.customer_uuid, "session_id": record.session_id, "data": enriched_data, "version": MCP_VERSION, "platform": _platform, "source": _source, } if record.milestone: payload["milestone"] = record.milestone.value # Prefer httpx when available; otherwise fall back to urllib if httpx: with httpx.Client(timeout=self.config.timeout) as client: # Re-validate endpoint at send time to handle dynamic changes endpoint = self.config._validated_endpoint( self.config.endpoint, self.config.default_endpoint) response = client.post(endpoint, json=payload) if 200 <= response.status_code < 300: logger.debug(f"Telemetry sent: {record.record_type}") else: logger.warning( f"Telemetry failed: HTTP {response.status_code}") else: import urllib.request import urllib.error data_bytes = json.dumps(payload).encode("utf-8") endpoint = self.config._validated_endpoint( self.config.endpoint, self.config.default_endpoint) req = urllib.request.Request( endpoint, data=data_bytes, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=self.config.timeout) as resp: if 200 <= resp.getcode() < 300: logger.debug( f"Telemetry sent (urllib): {record.record_type}") else: logger.warning( f"Telemetry failed (urllib): HTTP {resp.getcode()}") except urllib.error.URLError as ue: logger.warning(f"Telemetry send failed (urllib): {ue}") except Exception as e: # Never let telemetry errors interfere with app functionality logger.debug(f"Telemetry send failed: {e}") # Global telemetry instance _telemetry_collector: TelemetryCollector | None = None def get_telemetry() -> TelemetryCollector: """Get the global telemetry collector instance""" global _telemetry_collector if _telemetry_collector is None: _telemetry_collector = TelemetryCollector() return _telemetry_collector def record_telemetry(record_type: RecordType, data: dict[str, Any], milestone: MilestoneType | None = None): """Convenience function to record telemetry""" get_telemetry().record(record_type, data, milestone) def record_milestone(milestone: MilestoneType, data: dict[str, Any] | None = None) -> bool: """Convenience function to record a milestone""" return get_telemetry().record_milestone(milestone, data) def record_tool_usage(tool_name: str, success: bool, duration_ms: float, error: str | None = None, sub_action: str 
| None = None): """Record tool usage telemetry Args: tool_name: Name of the tool invoked (e.g., 'manage_scene'). success: Whether the tool completed successfully. duration_ms: Execution duration in milliseconds. error: Optional error message (truncated if present). sub_action: Optional sub-action/operation within the tool (e.g., 'get_hierarchy'). """ data = { "tool_name": tool_name, "success": success, "duration_ms": round(duration_ms, 2) } if sub_action is not None: try: data["sub_action"] = str(sub_action) except Exception: # Ensure telemetry is never disruptive data["sub_action"] = "unknown" if error: data["error"] = str(error)[:200] # Limit error message length record_telemetry(RecordType.TOOL_EXECUTION, data) def record_resource_usage(resource_name: str, success: bool, duration_ms: float, error: str | None = None): """Record resource usage telemetry Args: resource_name: Name of the resource invoked (e.g., 'get_tests'). success: Whether the resource completed successfully. duration_ms: Execution duration in milliseconds. error: Optional error message (truncated if present). """ data = { "resource_name": resource_name, "success": success, "duration_ms": round(duration_ms, 2) } if error: data["error"] = str(error)[:200] # Limit error message length record_telemetry(RecordType.RESOURCE_RETRIEVAL, data) def record_latency(operation: str, duration_ms: float, metadata: dict[str, Any] | None = None): """Record latency telemetry""" data = { "operation": operation, "duration_ms": round(duration_ms, 2) } if metadata: data.update(metadata) record_telemetry(RecordType.LATENCY, data) def record_failure(component: str, error: str, metadata: dict[str, Any] | None = None): """Record failure telemetry""" data = { "component": component, "error": str(error)[:500] # Limit error message length } if metadata: data.update(metadata) record_telemetry(RecordType.FAILURE, data) def is_telemetry_enabled() -> bool: """Check if telemetry is enabled""" return get_telemetry().config.enabled ``` -------------------------------------------------------------------------------- /MCPForUnity/UnityMcpServer~/src/unity_connection.py: -------------------------------------------------------------------------------- ```python from config import config import contextlib from dataclasses import dataclass import errno import json import logging from pathlib import Path from port_discovery import PortDiscovery import random import socket import struct import threading import time from typing import Any, Dict from models import MCPResponse # Configure logging using settings from config logging.basicConfig( level=getattr(logging, config.log_level), format=config.log_format ) logger = logging.getLogger("mcp-for-unity-server") # Module-level lock to guard global connection initialization _connection_lock = threading.Lock() # Maximum allowed framed payload size (64 MiB) FRAMED_MAX = 64 * 1024 * 1024 @dataclass class UnityConnection: """Manages the socket connection to the Unity Editor.""" host: str = config.unity_host port: int = None # Will be set dynamically sock: socket.socket = None # Socket for Unity communication use_framing: bool = False # Negotiated per-connection def __post_init__(self): """Set port from discovery if not explicitly provided""" if self.port is None: self.port = PortDiscovery.discover_unity_port() self._io_lock = threading.Lock() self._conn_lock = threading.Lock() def connect(self) -> bool: """Establish a connection to the Unity Editor.""" if self.sock: return True with self._conn_lock: if self.sock: return 
True try: # Bounded connect to avoid indefinite blocking connect_timeout = float( getattr(config, "connect_timeout", getattr(config, "connection_timeout", 1.0))) self.sock = socket.create_connection( (self.host, self.port), connect_timeout) # Disable Nagle's algorithm to reduce small RPC latency with contextlib.suppress(Exception): self.sock.setsockopt( socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) logger.debug(f"Connected to Unity at {self.host}:{self.port}") # Strict handshake: require FRAMING=1 try: require_framing = getattr(config, "require_framing", True) timeout = float(getattr(config, "handshake_timeout", 1.0)) self.sock.settimeout(timeout) buf = bytearray() deadline = time.monotonic() + timeout while time.monotonic() < deadline and len(buf) < 512: try: chunk = self.sock.recv(256) if not chunk: break buf.extend(chunk) if b"\n" in buf: break except socket.timeout: break text = bytes(buf).decode('ascii', errors='ignore').strip() if 'FRAMING=1' in text: self.use_framing = True logger.debug( 'MCP for Unity handshake received: FRAMING=1 (strict)') else: if require_framing: # Best-effort plain-text advisory for legacy peers with contextlib.suppress(Exception): self.sock.sendall( b'MCP for Unity requires FRAMING=1\n') raise ConnectionError( f'MCP for Unity requires FRAMING=1, got: {text!r}') else: self.use_framing = False logger.warning( 'MCP for Unity handshake missing FRAMING=1; proceeding in legacy mode by configuration') finally: self.sock.settimeout(config.connection_timeout) return True except Exception as e: logger.error(f"Failed to connect to Unity: {str(e)}") try: if self.sock: self.sock.close() except Exception: pass self.sock = None return False def disconnect(self): """Close the connection to the Unity Editor.""" if self.sock: try: self.sock.close() except Exception as e: logger.error(f"Error disconnecting from Unity: {str(e)}") finally: self.sock = None def _read_exact(self, sock: socket.socket, count: int) -> bytes: data = bytearray() while len(data) < count: chunk = sock.recv(count - len(data)) if not chunk: raise ConnectionError( "Connection closed before reading expected bytes") data.extend(chunk) return bytes(data) def receive_full_response(self, sock, buffer_size=config.buffer_size) -> bytes: """Receive a complete response from Unity, handling chunked data.""" if self.use_framing: try: # Consume heartbeats, but do not hang indefinitely if only zero-length frames arrive heartbeat_count = 0 deadline = time.monotonic() + getattr(config, 'framed_receive_timeout', 2.0) while True: header = self._read_exact(sock, 8) payload_len = struct.unpack('>Q', header)[0] if payload_len == 0: # Heartbeat/no-op frame: consume and continue waiting for a data frame logger.debug("Received heartbeat frame (length=0)") heartbeat_count += 1 if heartbeat_count >= getattr(config, 'max_heartbeat_frames', 16) or time.monotonic() > deadline: # Treat as empty successful response to match C# server behavior logger.debug( "Heartbeat threshold reached; returning empty response") return b"" continue if payload_len > FRAMED_MAX: raise ValueError( f"Invalid framed length: {payload_len}") payload = self._read_exact(sock, payload_len) logger.debug( f"Received framed response ({len(payload)} bytes)") return payload except socket.timeout as e: logger.warning("Socket timeout during framed receive") raise TimeoutError("Timeout receiving Unity response") from e except Exception as e: logger.error(f"Error during framed receive: {str(e)}") raise chunks = [] # Respect the socket's currently configured timeout try: while 
True: chunk = sock.recv(buffer_size) if not chunk: if not chunks: raise Exception( "Connection closed before receiving data") break chunks.append(chunk) # Process the data received so far data = b''.join(chunks) decoded_data = data.decode('utf-8') # Check if we've received a complete response try: # Special case for ping-pong if decoded_data.strip().startswith('{"status":"success","result":{"message":"pong"'): logger.debug("Received ping response") return data # Handle escaped quotes in the content if '"content":' in decoded_data: # Find the content field and its value content_start = decoded_data.find('"content":') + 9 content_end = decoded_data.rfind('"', content_start) if content_end > content_start: # Replace escaped quotes in content with regular quotes content = decoded_data[content_start:content_end] content = content.replace('\\"', '"') decoded_data = decoded_data[:content_start] + \ content + decoded_data[content_end:] # Validate JSON format json.loads(decoded_data) # If we get here, we have valid JSON logger.info( f"Received complete response ({len(data)} bytes)") return data except json.JSONDecodeError: # We haven't received a complete valid JSON response yet continue except Exception as e: logger.warning( f"Error processing response chunk: {str(e)}") # Continue reading more chunks as this might not be the complete response continue except socket.timeout: logger.warning("Socket timeout during receive") raise Exception("Timeout receiving Unity response") except Exception as e: logger.error(f"Error during receive: {str(e)}") raise def send_command(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]: """Send a command with retry/backoff and port rediscovery. Pings only when requested.""" # Defensive guard: catch empty/placeholder invocations early if not command_type: raise ValueError("MCP call missing command_type") if params is None: return MCPResponse(success=False, error="MCP call received with no parameters (client placeholder?)") attempts = max(config.max_retries, 5) base_backoff = max(0.5, config.retry_delay) def read_status_file() -> dict | None: try: status_files = sorted(Path.home().joinpath( '.unity-mcp').glob('unity-mcp-status-*.json'), key=lambda p: p.stat().st_mtime, reverse=True) if not status_files: return None latest = status_files[0] with latest.open('r') as f: return json.load(f) except Exception: return None last_short_timeout = None # Preflight: if Unity reports reloading, return a structured hint so clients can retry politely try: status = read_status_file() if status and (status.get('reloading') or status.get('reason') == 'reloading'): return MCPResponse( success=False, error="Unity domain reload in progress, please try again shortly", data={"state": "reloading", "retry_after_ms": int( config.reload_retry_ms)} ) except Exception: pass for attempt in range(attempts + 1): try: # Ensure connected (handshake occurs within connect()) if not self.sock and not self.connect(): raise Exception("Could not connect to Unity") # Build payload if command_type == 'ping': payload = b'ping' else: command = {"type": command_type, "params": params or {}} payload = json.dumps( command, ensure_ascii=False).encode('utf-8') # Send/receive are serialized to protect the shared socket with self._io_lock: mode = 'framed' if self.use_framing else 'legacy' with contextlib.suppress(Exception): logger.debug( "send %d bytes; mode=%s; head=%s", len(payload), mode, (payload[:32]).decode('utf-8', 'ignore'), ) if self.use_framing: header = struct.pack('>Q', len(payload)) 
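                        # Framed mode: an 8-byte big-endian unsigned length prefix followed by the
                        # UTF-8 payload; receive_full_response() mirrors this framing and rejects
                        # frames larger than FRAMED_MAX (64 MiB) on the receive side.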
self.sock.sendall(header) self.sock.sendall(payload) else: self.sock.sendall(payload) # During retry bursts use a short receive timeout and ensure restoration restore_timeout = None if attempt > 0 and last_short_timeout is None: restore_timeout = self.sock.gettimeout() self.sock.settimeout(1.0) try: response_data = self.receive_full_response(self.sock) with contextlib.suppress(Exception): logger.debug("recv %d bytes; mode=%s", len(response_data), mode) finally: if restore_timeout is not None: self.sock.settimeout(restore_timeout) last_short_timeout = None # Parse if command_type == 'ping': resp = json.loads(response_data.decode('utf-8')) if resp.get('status') == 'success' and resp.get('result', {}).get('message') == 'pong': return {"message": "pong"} raise Exception("Ping unsuccessful") resp = json.loads(response_data.decode('utf-8')) if resp.get('status') == 'error': err = resp.get('error') or resp.get( 'message', 'Unknown Unity error') raise Exception(err) return resp.get('result', {}) except Exception as e: logger.warning( f"Unity communication attempt {attempt+1} failed: {e}") try: if self.sock: self.sock.close() finally: self.sock = None # Re-discover port each time try: new_port = PortDiscovery.discover_unity_port() if new_port != self.port: logger.info( f"Unity port changed {self.port} -> {new_port}") self.port = new_port except Exception as de: logger.debug(f"Port discovery failed: {de}") if attempt < attempts: # Heartbeat-aware, jittered backoff status = read_status_file() # Base exponential backoff backoff = base_backoff * (2 ** attempt) # Decorrelated jitter multiplier jitter = random.uniform(0.1, 0.3) # Fast‑retry for transient socket failures fast_error = isinstance( e, (ConnectionRefusedError, ConnectionResetError, TimeoutError)) if not fast_error: try: err_no = getattr(e, 'errno', None) fast_error = err_no in ( errno.ECONNREFUSED, errno.ECONNRESET, errno.ETIMEDOUT) except Exception: pass # Cap backoff depending on state if status and status.get('reloading'): cap = 0.8 elif fast_error: cap = 0.25 else: cap = 3.0 sleep_s = min(cap, jitter * (2 ** attempt)) time.sleep(sleep_s) continue raise # Global Unity connection _unity_connection = None def get_unity_connection() -> UnityConnection: """Retrieve or establish a persistent Unity connection. Note: Do NOT ping on every retrieval to avoid connection storms. Rely on send_command() exceptions to detect broken sockets and reconnect there. """ global _unity_connection if _unity_connection is not None: return _unity_connection # Double-checked locking to avoid concurrent socket creation with _connection_lock: if _unity_connection is not None: return _unity_connection logger.info("Creating new Unity connection") _unity_connection = UnityConnection() if not _unity_connection.connect(): _unity_connection = None raise ConnectionError( "Could not connect to Unity. 
Ensure the Unity Editor and MCP Bridge are running.") logger.info("Connected to Unity on startup") return _unity_connection # ----------------------------- # Centralized retry helpers # ----------------------------- def _is_reloading_response(resp: dict) -> bool: """Return True if the Unity response indicates the editor is reloading.""" if not isinstance(resp, dict): return False if resp.get("state") == "reloading": return True message_text = (resp.get("message") or resp.get("error") or "").lower() return "reload" in message_text def send_command_with_retry(command_type: str, params: Dict[str, Any], *, max_retries: int | None = None, retry_ms: int | None = None) -> Dict[str, Any]: """Send a command via the shared connection, waiting politely through Unity reloads. Uses config.reload_retry_ms and config.reload_max_retries by default. Preserves the structured failure if retries are exhausted. """ conn = get_unity_connection() if max_retries is None: max_retries = getattr(config, "reload_max_retries", 40) if retry_ms is None: retry_ms = getattr(config, "reload_retry_ms", 250) response = conn.send_command(command_type, params) retries = 0 while _is_reloading_response(response) and retries < max_retries: delay_ms = int(response.get("retry_after_ms", retry_ms) ) if isinstance(response, dict) else retry_ms time.sleep(max(0.0, delay_ms / 1000.0)) retries += 1 response = conn.send_command(command_type, params) return response async def async_send_command_with_retry(command_type: str, params: dict[str, Any], *, loop=None, max_retries: int | None = None, retry_ms: int | None = None) -> dict[str, Any] | MCPResponse: """Async wrapper that runs the blocking retry helper in a thread pool.""" try: import asyncio # local import to avoid mandatory asyncio dependency for sync callers if loop is None: loop = asyncio.get_running_loop() return await loop.run_in_executor( None, lambda: send_command_with_retry( command_type, params, max_retries=max_retries, retry_ms=retry_ms), ) except Exception as e: return MCPResponse(success=False, error=str(e)) ``` -------------------------------------------------------------------------------- /UnityMcpBridge/UnityMcpServer~/src/unity_connection.py: -------------------------------------------------------------------------------- ```python from config import config import contextlib from dataclasses import dataclass import errno import json import logging from pathlib import Path from port_discovery import PortDiscovery import random import socket import struct import threading import time from typing import Any, Dict # Configure logging using settings from config logging.basicConfig( level=getattr(logging, config.log_level), format=config.log_format ) logger = logging.getLogger("mcp-for-unity-server") # Module-level lock to guard global connection initialization _connection_lock = threading.Lock() # Maximum allowed framed payload size (64 MiB) FRAMED_MAX = 64 * 1024 * 1024 @dataclass class UnityConnection: """Manages the socket connection to the Unity Editor.""" host: str = config.unity_host port: int = None # Will be set dynamically sock: socket.socket = None # Socket for Unity communication use_framing: bool = False # Negotiated per-connection def __post_init__(self): """Set port from discovery if not explicitly provided""" if self.port is None: self.port = PortDiscovery.discover_unity_port() self._io_lock = threading.Lock() self._conn_lock = threading.Lock() def connect(self) -> bool: """Establish a connection to the Unity Editor.""" if self.sock: return True with 
self._conn_lock: if self.sock: return True try: # Bounded connect to avoid indefinite blocking connect_timeout = float( getattr(config, "connect_timeout", getattr(config, "connection_timeout", 1.0))) self.sock = socket.create_connection( (self.host, self.port), connect_timeout) # Disable Nagle's algorithm to reduce small RPC latency with contextlib.suppress(Exception): self.sock.setsockopt( socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) logger.debug(f"Connected to Unity at {self.host}:{self.port}") # Strict handshake: require FRAMING=1 try: require_framing = getattr(config, "require_framing", True) timeout = float(getattr(config, "handshake_timeout", 1.0)) self.sock.settimeout(timeout) buf = bytearray() deadline = time.monotonic() + timeout while time.monotonic() < deadline and len(buf) < 512: try: chunk = self.sock.recv(256) if not chunk: break buf.extend(chunk) if b"\n" in buf: break except socket.timeout: break text = bytes(buf).decode('ascii', errors='ignore').strip() if 'FRAMING=1' in text: self.use_framing = True logger.debug( 'Unity MCP handshake received: FRAMING=1 (strict)') else: if require_framing: # Best-effort plain-text advisory for legacy peers with contextlib.suppress(Exception): self.sock.sendall( b'Unity MCP requires FRAMING=1\n') raise ConnectionError( f'Unity MCP requires FRAMING=1, got: {text!r}') else: self.use_framing = False logger.warning( 'Unity MCP handshake missing FRAMING=1; proceeding in legacy mode by configuration') finally: self.sock.settimeout(config.connection_timeout) return True except Exception as e: logger.error(f"Failed to connect to Unity: {str(e)}") try: if self.sock: self.sock.close() except Exception: pass self.sock = None return False def disconnect(self): """Close the connection to the Unity Editor.""" if self.sock: try: self.sock.close() except Exception as e: logger.error(f"Error disconnecting from Unity: {str(e)}") finally: self.sock = None def _read_exact(self, sock: socket.socket, count: int) -> bytes: data = bytearray() while len(data) < count: chunk = sock.recv(count - len(data)) if not chunk: raise ConnectionError( "Connection closed before reading expected bytes") data.extend(chunk) return bytes(data) def receive_full_response(self, sock, buffer_size=config.buffer_size) -> bytes: """Receive a complete response from Unity, handling chunked data.""" if self.use_framing: try: # Consume heartbeats, but do not hang indefinitely if only zero-length frames arrive heartbeat_count = 0 deadline = time.monotonic() + getattr(config, 'framed_receive_timeout', 2.0) while True: header = self._read_exact(sock, 8) payload_len = struct.unpack('>Q', header)[0] if payload_len == 0: # Heartbeat/no-op frame: consume and continue waiting for a data frame logger.debug("Received heartbeat frame (length=0)") heartbeat_count += 1 if heartbeat_count >= getattr(config, 'max_heartbeat_frames', 16) or time.monotonic() > deadline: # Treat as empty successful response to match C# server behavior logger.debug( "Heartbeat threshold reached; returning empty response") return b"" continue if payload_len > FRAMED_MAX: raise ValueError( f"Invalid framed length: {payload_len}") payload = self._read_exact(sock, payload_len) logger.debug( f"Received framed response ({len(payload)} bytes)") return payload except socket.timeout as e: logger.warning("Socket timeout during framed receive") raise TimeoutError("Timeout receiving Unity response") from e except Exception as e: logger.error(f"Error during framed receive: {str(e)}") raise chunks = [] # Respect the socket's currently 
configured timeout try: while True: chunk = sock.recv(buffer_size) if not chunk: if not chunks: raise Exception( "Connection closed before receiving data") break chunks.append(chunk) # Process the data received so far data = b''.join(chunks) decoded_data = data.decode('utf-8') # Check if we've received a complete response try: # Special case for ping-pong if decoded_data.strip().startswith('{"status":"success","result":{"message":"pong"'): logger.debug("Received ping response") return data # Handle escaped quotes in the content if '"content":' in decoded_data: # Find the content field and its value content_start = decoded_data.find('"content":') + 9 content_end = decoded_data.rfind('"', content_start) if content_end > content_start: # Replace escaped quotes in content with regular quotes content = decoded_data[content_start:content_end] content = content.replace('\\"', '"') decoded_data = decoded_data[:content_start] + \ content + decoded_data[content_end:] # Validate JSON format json.loads(decoded_data) # If we get here, we have valid JSON logger.info( f"Received complete response ({len(data)} bytes)") return data except json.JSONDecodeError: # We haven't received a complete valid JSON response yet continue except Exception as e: logger.warning( f"Error processing response chunk: {str(e)}") # Continue reading more chunks as this might not be the complete response continue except socket.timeout: logger.warning("Socket timeout during receive") raise Exception("Timeout receiving Unity response") except Exception as e: logger.error(f"Error during receive: {str(e)}") raise def send_command(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]: """Send a command with retry/backoff and port rediscovery. Pings only when requested.""" # Defensive guard: catch empty/placeholder invocations early if not command_type: raise ValueError("MCP call missing command_type") if params is None: # Return a fast, structured error that clients can display without hanging return {"success": False, "error": "MCP call received with no parameters (client placeholder?)"} attempts = max(config.max_retries, 5) base_backoff = max(0.5, config.retry_delay) def read_status_file() -> dict | None: try: status_files = sorted(Path.home().joinpath( '.unity-mcp').glob('unity-mcp-status-*.json'), key=lambda p: p.stat().st_mtime, reverse=True) if not status_files: return None latest = status_files[0] with latest.open('r') as f: return json.load(f) except Exception: return None last_short_timeout = None # Preflight: if Unity reports reloading, return a structured hint so clients can retry politely try: status = read_status_file() if status and (status.get('reloading') or status.get('reason') == 'reloading'): return { "success": False, "state": "reloading", "retry_after_ms": int(config.reload_retry_ms), "error": "Unity domain reload in progress", "message": "Unity is reloading scripts; please retry shortly" } except Exception: pass for attempt in range(attempts + 1): try: # Ensure connected (handshake occurs within connect()) if not self.sock and not self.connect(): raise Exception("Could not connect to Unity") # Build payload if command_type == 'ping': payload = b'ping' else: command = {"type": command_type, "params": params or {}} payload = json.dumps( command, ensure_ascii=False).encode('utf-8') # Send/receive are serialized to protect the shared socket with self._io_lock: mode = 'framed' if self.use_framing else 'legacy' with contextlib.suppress(Exception): logger.debug( "send %d bytes; mode=%s; head=%s", 
len(payload), mode, (payload[:32]).decode('utf-8', 'ignore'), ) if self.use_framing: header = struct.pack('>Q', len(payload)) self.sock.sendall(header) self.sock.sendall(payload) else: self.sock.sendall(payload) # During retry bursts use a short receive timeout and ensure restoration restore_timeout = None if attempt > 0 and last_short_timeout is None: restore_timeout = self.sock.gettimeout() self.sock.settimeout(1.0) try: response_data = self.receive_full_response(self.sock) with contextlib.suppress(Exception): logger.debug("recv %d bytes; mode=%s", len(response_data), mode) finally: if restore_timeout is not None: self.sock.settimeout(restore_timeout) last_short_timeout = None # Parse if command_type == 'ping': resp = json.loads(response_data.decode('utf-8')) if resp.get('status') == 'success' and resp.get('result', {}).get('message') == 'pong': return {"message": "pong"} raise Exception("Ping unsuccessful") resp = json.loads(response_data.decode('utf-8')) if resp.get('status') == 'error': err = resp.get('error') or resp.get( 'message', 'Unknown Unity error') raise Exception(err) return resp.get('result', {}) except Exception as e: logger.warning( f"Unity communication attempt {attempt+1} failed: {e}") try: if self.sock: self.sock.close() finally: self.sock = None # Re-discover port each time try: new_port = PortDiscovery.discover_unity_port() if new_port != self.port: logger.info( f"Unity port changed {self.port} -> {new_port}") self.port = new_port except Exception as de: logger.debug(f"Port discovery failed: {de}") if attempt < attempts: # Heartbeat-aware, jittered backoff status = read_status_file() # Base exponential backoff backoff = base_backoff * (2 ** attempt) # Decorrelated jitter multiplier jitter = random.uniform(0.1, 0.3) # Fast‑retry for transient socket failures fast_error = isinstance( e, (ConnectionRefusedError, ConnectionResetError, TimeoutError)) if not fast_error: try: err_no = getattr(e, 'errno', None) fast_error = err_no in ( errno.ECONNREFUSED, errno.ECONNRESET, errno.ETIMEDOUT) except Exception: pass # Cap backoff depending on state if status and status.get('reloading'): cap = 0.8 elif fast_error: cap = 0.25 else: cap = 3.0 sleep_s = min(cap, jitter * (2 ** attempt)) time.sleep(sleep_s) continue raise # Global Unity connection _unity_connection = None def get_unity_connection() -> UnityConnection: """Retrieve or establish a persistent Unity connection. Note: Do NOT ping on every retrieval to avoid connection storms. Rely on send_command() exceptions to detect broken sockets and reconnect there. """ global _unity_connection if _unity_connection is not None: return _unity_connection # Double-checked locking to avoid concurrent socket creation with _connection_lock: if _unity_connection is not None: return _unity_connection logger.info("Creating new Unity connection") _unity_connection = UnityConnection() if not _unity_connection.connect(): _unity_connection = None raise ConnectionError( "Could not connect to Unity. 
Ensure the Unity Editor and MCP Bridge are running.")
        logger.info("Connected to Unity on startup")
        return _unity_connection


# -----------------------------
# Centralized retry helpers
# -----------------------------

def _is_reloading_response(resp: dict) -> bool:
    """Return True if the Unity response indicates the editor is reloading."""
    if not isinstance(resp, dict):
        return False
    if resp.get("state") == "reloading":
        return True
    message_text = (resp.get("message") or resp.get("error") or "").lower()
    return "reload" in message_text


def send_command_with_retry(command_type: str, params: Dict[str, Any], *,
                            max_retries: int | None = None,
                            retry_ms: int | None = None) -> Dict[str, Any]:
    """Send a command via the shared connection, waiting politely through Unity reloads.

    Uses config.reload_retry_ms and config.reload_max_retries by default.
    Preserves the structured failure if retries are exhausted.
    """
    conn = get_unity_connection()
    if max_retries is None:
        max_retries = getattr(config, "reload_max_retries", 40)
    if retry_ms is None:
        retry_ms = getattr(config, "reload_retry_ms", 250)

    response = conn.send_command(command_type, params)
    retries = 0
    while _is_reloading_response(response) and retries < max_retries:
        delay_ms = int(response.get("retry_after_ms", retry_ms)) if isinstance(response, dict) else retry_ms
        time.sleep(max(0.0, delay_ms / 1000.0))
        retries += 1
        response = conn.send_command(command_type, params)
    return response


async def async_send_command_with_retry(command_type: str, params: Dict[str, Any], *,
                                        loop=None,
                                        max_retries: int | None = None,
                                        retry_ms: int | None = None) -> Dict[str, Any]:
    """Async wrapper that runs the blocking retry helper in a thread pool."""
    try:
        import asyncio  # local import to avoid mandatory asyncio dependency for sync callers
        if loop is None:
            loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            lambda: send_command_with_retry(
                command_type, params, max_retries=max_retries, retry_ms=retry_ms),
        )
    except Exception as e:
        # Return a structured error dict for consistency with other responses
        return {"success": False, "error": f"Python async retry helper failed: {str(e)}"}
```
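A minimal usage sketch for the retry helpers above (not a file in this repository; the `manage_editor` / `get_project_root` call mirrors `_resolve_project_root` in `resource_tools.py`, and the rest is illustrative):

```python
# Minimal sketch, assuming unity_connection.py is importable from the server's src directory
# and a Unity Editor with the MCP bridge is running.
import asyncio

from unity_connection import async_send_command_with_retry, send_command_with_retry

# Synchronous call: retries politely through Unity domain reloads.
resp = send_command_with_retry("manage_editor", {"action": "get_project_root"})
if isinstance(resp, dict) and resp.get("success"):
    print(resp.get("data", {}).get("projectRoot", ""))


# Async variant, e.g. from inside an async tool handler.
async def main() -> None:
    resp = await async_send_command_with_retry("manage_editor", {"action": "get_project_root"})
    print(resp)


asyncio.run(main())
```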