This is page 1 of 2. Use http://codebase.md/alexkissijr/unrealmcp?page={x} to view the full context.
# Directory Structure
```
├── .gitignore
├── MCP
│ ├── 0.1.0
│ ├── check_mcp_setup.py
│ ├── check_setup.bat
│ ├── Commands
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-312.pyc
│ │ │ ├── commands_materials.cpython-312.pyc
│ │ │ ├── commands_python.cpython-312.pyc
│ │ │ ├── commands_scene.cpython-312.pyc
│ │ │ ├── materials.cpython-312.pyc
│ │ │ ├── python.cpython-312.pyc
│ │ │ └── scene.cpython-312.pyc
│ │ ├── commands_materials.py
│ │ ├── commands_python.py
│ │ └── commands_scene.py
│ ├── cursor_setup.py
│ ├── example_extension_script.py
│ ├── install_mcp.py
│ ├── README_MCP_SETUP.md
│ ├── requirements.txt
│ ├── run_unreal_mcp.bat
│ ├── setup_cursor_mcp.bat
│ ├── setup_unreal_mcp.bat
│ ├── temp_update_config.py
│ ├── TestScripts
│ │ ├── 1_basic_connection.py
│ │ ├── 2_python_execution.py
│ │ ├── 3_string_test.py
│ │ ├── format_test.py
│ │ ├── README.md
│ │ ├── run_all_tests.py
│ │ ├── simple_test_command.py
│ │ ├── test_commands_basic.py
│ │ ├── test_commands_blueprint.py
│ │ └── test_commands_material.py
│ ├── unreal_mcp_bridge.py
│ ├── UserTools
│ │ ├── __pycache__
│ │ │ └── example_tool.cpython-312.pyc
│ │ ├── example_tool.py
│ │ └── README.md
│ └── utils
│ ├── __init__.py
│ ├── __pycache__
│ │ ├── __init__.cpython-312.pyc
│ │ └── command_utils.cpython-312.pyc
│ └── command_utils.py
├── README.md
├── Resources
│ └── Icon128.png
├── Source
│ └── UnrealMCP
│ ├── Private
│ │ ├── MCPCommandHandlers_Blueprints.cpp
│ │ ├── MCPCommandHandlers_Materials.cpp
│ │ ├── MCPCommandHandlers.cpp
│ │ ├── MCPConstants.cpp
│ │ ├── MCPExtensionExample.cpp
│ │ ├── MCPFileLogger.h
│ │ ├── MCPTCPServer.cpp
│ │ └── UnrealMCP.cpp
│ ├── Public
│ │ ├── MCPCommandHandlers_Blueprints.h
│ │ ├── MCPCommandHandlers_Materials.h
│ │ ├── MCPCommandHandlers.h
│ │ ├── MCPConstants.h
│ │ ├── MCPExtensionHandler.h
│ │ ├── MCPSettings.h
│ │ ├── MCPTCPServer.h
│ │ └── UnrealMCP.h
│ └── UnrealMCP.Build.cs
└── UnrealMCP.uplugin
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Unreal Engine Plugin Ignores
Binaries/
Intermediate/
DerivedDataCache/
Saved/
Logs/
*.sln
*.vcxproj
*.vcxproj.filters
*.vcxproj.user
*.pdb
# Python modules
MCP/python_modules/
MCP/__pycache__/
MCP/Commands/__pycache__/
**/__pycache__/
*.py[cod]
*$py.class
```
--------------------------------------------------------------------------------
/MCP/TestScripts/README.md:
--------------------------------------------------------------------------------
```markdown
# MCP Server Test Scripts
This directory contains test scripts for the Unreal MCP Server.
## Overview
These scripts test various aspects of the MCP Server functionality:
1. **Basic Connection Test** (`1_basic_connection.py`): Tests the basic connection to the MCP Server.
2. **Python Execution Test** (`2_python_execution.py`): Tests executing Python code through the MCP Server.
3. **String Handling Test** (`3_string_test.py`): Tests various string formats and potential problem areas.
## Running the Tests
You can run individual tests:
```bash
python 1_basic_connection.py
python 2_python_execution.py
python 3_string_test.py
```
Or run all tests in sequence:
```bash
python run_all_tests.py
```
## Test Requirements
- The MCP Server must be running in Unreal Engine
- Python 3.6 or higher
- Socket and JSON modules (included in standard library)
## Command Format
The MCP Server expects commands in the following format:
```json
{
"type": "command_name",
"code": "python_code_here" // For execute_python command
}
```
The command should be sent as a JSON string followed by a newline character.
## Troubleshooting
If you encounter issues:
1. Make sure the MCP Server is running in Unreal Engine
2. Check that you're connecting to the correct host and port (default: localhost:13377)
3. Verify the command format is correct
4. Check the Unreal Engine log for any error messages
## Adding New Tests
When adding new tests, follow the pattern of the existing tests:
1. Connect to the server
2. Send a command
3. Receive and process the response
4. Return success/failure
Use the `sys.exit()` code to indicate test success (0) or failure (non-zero).
```
--------------------------------------------------------------------------------
/MCP/UserTools/README.md:
--------------------------------------------------------------------------------
```markdown
### User Guide: Adding Custom MCP Tools
To extend the functionality of the UnrealMCP plugin with your own tools, follow these steps:
1. **Locate the `user_tools` Directory**
- Find the `user_tools` directory in the plugin’s MCP folder (e.g., `Plugins/UnrealMCP/MCP/user_tools`).
- If it doesn’t exist, create it manually.
2. **Create a Python Script**
- Add a new `.py` file in the `user_tools` directory (e.g., `my_tool.py`).
- Define a `register_tools(mcp, utils)` function in your script to register your custom tools.
3. **Define Your Tools**
- Use the `@mcp.tool()` decorator to create tools.
- Access the `send_command` function via `utils['send_command']` to interact with Unreal Engine.
- Example:
```python
def register_tools(mcp, utils):
send_command = utils['send_command']
@mcp.tool()
def create_cube(ctx, location: list) -> str:
"""Create a cube at the specified location."""
code = f"""
import unreal
location = unreal.Vector({location[0]}, {location[1]}, {location[2]})
unreal.EditorLevelLibrary.spawn_actor_from_class(unreal.StaticMeshActor, location)
"""
response = send_command("execute_python", {"code": code})
return "Cube created" if response["status"] == "success" else f"Error: {response['message']}"
```
4. **Run the Bridge**
- Start the MCP bridge as usual with `run_unreal_mcp.bat`. Your tools will be loaded automatically.
**Notes:**
- Tools run in the bridge’s Python process and communicate with Unreal Engine via the MCP server.
- Use `send_command("execute_python", {"code": "..."})` to execute Python code in Unreal Engine’s interpreter, accessing the `unreal` module.
- Ensure any additional Python packages required by your tools are installed in Unreal Engine’s Python environment (not the bridge’s virtual environment) if using `execute_python`.
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# UnrealMCP Plugin
# VERY WIP REPO
I'm working on adding more tools now and cleaning up the codebase,
I plan to allow for easy tool extension outside the main plugin
This is very much a work in progress, and I need to clean up a lot of stuff!!!!!
Also, I only use windows, so I don't know how this would be setup for mac/unix
## Overview
UnrealMCP is an Unofficial Unreal Engine plugin designed to control Unreal Engine with AI tools. It implements a Machine Control Protocol (MCP) within Unreal Engine, allowing external AI systems to interact with and manipulate the Unreal environment programmatically.
I only just learned about MCP a few days ago, so I'm not that familiar with it, I'm still learning so things might be initially pretty rough.
I've implemented this using https://github.com/ahujasid/blender-mcp as a reference, which relies on Claude for Desktop. It now works with both Claude for Desktop and Cursor. If you experiment with other models, please let me know!
## ⚠️ DISCLAIMER
This plugin allows AI agents to directly modify your Unreal Engine project. While it can be a powerful tool, it also comes with risks:
- AI agents may make unexpected changes to your project
- Files could be accidentally deleted or modified
- Project settings could be altered
- Assets could be overwritten
**IMPORTANT SAFETY MEASURES:**
1. Always use source control (like Git or Perforce) with your project
2. Make regular backups of your project
3. Test the plugin in a separate project first
4. Review changes before committing them
By using this plugin, you acknowledge that:
- You are solely responsible for any changes made to your project
- The plugin author is not responsible for any damage, data loss, or issues caused by AI agents
- You use this plugin at your own risk
## Features
- TCP server implementation for remote control of Unreal Engine
- JSON-based command protocol for AI tools integration
- Editor UI integration for easy access to MCP functionality
- Comprehensive scene manipulation capabilities
- Python companion scripts for client-side interaction
## Roadmap
These are what I have in mind for development as of 3/14/2025
I'm not sure what's possible yet, in theory anything, but it depends on how
good the integrated LLM is at utilizing these tools.
- [X] Basic operations working
- [X] Python working
- [X] Materials
- [ ] User Extensions (in progress)
- [ ] Asset tools
- [ ] Blueprints
- [ ] Niagara VFX
- [ ] Metasound
- [ ] Landscape (I might hold off on this because Epic has mentioned they are going to be updating the landscape tools)
- [ ] Modeling Tools
- [ ] PCG
## Requirements
- Unreal Engine 5.5 (I have only tested on this version, may work with earlier, but no official support)
- C++ development environment configured for Unreal Engine
- Python 3.7+ for client-side scripting
- Model to run the commands, in testing I've been using Claude for Desktop https://claude.ai/download
## Prerequisites to run
- Unreal Editor Installation (Tested with 5.3, but should work on 5.0+)
- Python 3.7+ (This can run with your existing python install)
- MCP compatible LLM (Claude for Desktop, Cursor, etc.)
- Setup: run setup_unreal_mcp.bat in MCP folder as per instructions in MCP/README_MCP_SETUP.md
## Quick Start for Cursor Users
If you want to use UnrealMCP with Cursor, follow these simple steps:
1. Clone or download this repository as a zip
2. Create a new Unreal Project, or open an existing one
3. Create a "Plugins" folder in your project directory if it doesn't exist
4. Unzip or copy this repository into the Plugins folder
5. Run `setup_cursor_mcp.bat` in the MCP folder
6. Open your Unreal project and enable the plugin in Edit > Plugins (if not already enabled)
7. Start Cursor and ask it to work with your Unreal project
That's it! The setup script will automatically configure everything needed for Cursor integration.
## Installation
1. Clone or download this repository as a zip
2. Create a new Unreal Project, or open an existing one
3. Create a "Plugins" folder in your project directory if it doesn't exist
4. Unzip or copy this repository into the Plugins folder
5. Setup MCP
- Run the `setup_unreal_mcp.bat` script in the MCP folder (see `MCP/README_MCP_SETUP.md` for details)
- This will configure Python and your AI assistant (Claude for Desktop or Cursor)
6. Open your Unreal project, the plugin should be available in the Plugins menu
7. If not, enable the plugin in Edit > Plugins
8. Choose your preferred AI assistant:
- For Claude for Desktop: follow the instructions in the "With Claude for Desktop" section below
- For Cursor: follow the instructions in the "With Cursor" section below
## With Claude for Desktop
You will need to find your installation directory for Claude for Desktop. Find claude_desktop_config.json and add an entry and make it look like so:
**Windows:** `%APPDATA%\Claude\claude_desktop_config.json`
```json
{
"mcpServers": {
"unreal": {
"command": "C:/path/to/your/project/Plugins/UnrealMCP/MCP/run_unreal_mcp.bat",
"args": []
}
}
}
```
Alternatively the unreal_mcp_setup.bat script should do this for you.
To find the path to your claude for desktop install you can go into settings and click 'Edit Config'
This is usually in
```
C:\Users\USERNAME\AppData\Roaming\Claude
```
## With Cursor
Cursor should be automatically configured if you've run the setup script with the Cursor option. If you need to manually configure it:
**Windows:** `%APPDATA%\Cursor\User\settings.json`
Add or update the settings with:
```json
{
"mcp": {
"enabled": true,
"servers": {
"unreal": {
"command": "C:/path/to/your/project/Plugins/UnrealMCP/MCP/run_unreal_mcp.bat",
"args": []
}
}
}
}
```
## Testing
Once everything is setup you need to launch the unreal editor.
Note: Nothing else has to be started or set up to run the mcp bridge, it will run when needed.
Open Claude for Desktop or Cursor, ensure that the tools have successfully enabled, ask your AI assistant to work in Unreal.
Here are some example prompts to try:
- "What actors are in the current level?"
- "Create a cube at position (0, 0, 100)"
- "List available commands I can use with Unreal Engine"
## Usage
### In Unreal Editor
Once the plugin is enabled, you'll find MCP controls in the editor toolbar button.


The TCP server can be started/stopped from here.
Check the output log under log filter LogMCP for extra information.
Once the server is confirmed up and running from the editor.
Open Claude for Desktop, ensure that the tools have successfully enabled, ask Claude to work in unreal.
Currently only basic operations are supported, creating objects, modfiying their transforms, getting scene info, and running python scripts.
Claude makes a lot of errors with unreal python as I believe there aren't a ton of examples for it, but let it run and it will usually figure things out.
I would really like to improve this aspect of how it works but it's low hanging fruit for adding functionality into unreal.
### Client-Side Integration
Use the provided Python scripts in the `MCP` directory to connect to and control your Unreal Engine instance:
```python
from unreal_mcp_client import UnrealMCPClient
# Connect to the Unreal MCP server
client = UnrealMCPClient("localhost", 13377)
# Example: Create a cube in the scene
client.create_object(
class_name="StaticMeshActor",
asset_path="/Engine/BasicShapes/Cube.Cube",
location=(0, 0, 100),
rotation=(0, 0, 0),
scale=(1, 1, 1),
name="MCP_Cube"
)
```
## Command Reference
The plugin supports various commands for scene manipulation:
- `get_scene_info`: Retrieve information about the current scene
- `create_object`: Spawn a new object in the scene
- `delete_object`: Remove an object from the scene
- `modify_object`: Change properties of an existing object
- `execute_python`: Run Python commands in Unreal's Python environment
- And more to come...
Refer to the documentation in the `Docs` directory for a complete command reference.
## Security Considerations
- The MCP server accepts connections from any client by default
- Limit server exposure to localhost for development
- Validate all incoming commands to prevent injection attacks
## Troubleshooting
- Ensure Unreal Engine is running with the MCP plugin.
- Check logs in Claude for Desktop for stderr output.
- Reach out on the discord, I just made it, but I will check it periodically
Discord (Dreamatron Studios): https://discord.gg/abRftdSe
### Project Structure
- `Source/UnrealMCP/`: Core plugin implementation
- `Private/`: Internal implementation files
- `Public/`: Public header files
- `Content/`: Plugin assets
- `MCP/`: Python client scripts and examples
- `Resources/`: Icons and other resources
## License
MIT License
Copyright (c) 2025 kvick
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
## Credits
- Created by: kvick
- X: [@kvickart](https://x.com/kvickart)
- Discord: https://discord.gg/abRftdSe
### Thank you to testers!!!
- https://github.com/TheMurphinatur
- [@sidahuj](https://x.com/sidahuj) for the inspriation
## Contributing
Contributions are welcome, but I will need some time to wrap my head around things and cleanup first, lol
```
--------------------------------------------------------------------------------
/MCP/example_extension_script.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/MCP/Commands/__init__.py:
--------------------------------------------------------------------------------
```python
"""Command modules for the UnrealMCP bridge."""
```
--------------------------------------------------------------------------------
/MCP/requirements.txt:
--------------------------------------------------------------------------------
```
mcp>=0.1.0
# socket and json are part of Python's standard library and don't need to be listed
```
--------------------------------------------------------------------------------
/Source/UnrealMCP/Public/MCPSettings.h:
--------------------------------------------------------------------------------
```
#pragma once
#include "CoreMinimal.h"
#include "Engine/DeveloperSettings.h"
#include "MCPConstants.h"
#include "MCPSettings.generated.h"
UCLASS(config = Editor, defaultconfig)
class UNREALMCP_API UMCPSettings : public UDeveloperSettings
{
GENERATED_BODY()
public:
UPROPERTY(config, EditAnywhere, Category = "MCP", meta = (ClampMin = "1024", ClampMax = "65535"))
int32 Port = MCPConstants::DEFAULT_PORT;
};
```
--------------------------------------------------------------------------------
/MCP/setup_cursor_mcp.bat:
--------------------------------------------------------------------------------
```
@echo off
echo ========================================================
echo Unreal MCP - Cursor Setup
echo ========================================================
echo.
echo This script will set up the MCP bridge for Cursor.
echo.
REM Get the directory where this script is located
set "SCRIPT_DIR=%~dp0"
set "SCRIPT_DIR=%SCRIPT_DIR:~0,-1%"
REM Run the main setup script with the cursor configuration flag
call "%SCRIPT_DIR%\setup_unreal_mcp.bat" --configure-cursor
echo.
echo Setup complete! You can now use UnrealMCP with Cursor.
echo.
```
--------------------------------------------------------------------------------
/MCP/run_unreal_mcp.bat:
--------------------------------------------------------------------------------
```
@echo off
setlocal
REM Get the directory where this script is located
set "SCRIPT_DIR=%~dp0"
set "SCRIPT_DIR=%SCRIPT_DIR:~0,-1%"
REM Set paths for local environment
set "ENV_DIR=%SCRIPT_DIR%\python_env"
set "PYTHON_PATH=%ENV_DIR%\Scripts\python.exe"
REM Check if Python environment exists
if not exist "%PYTHON_PATH%" (
echo ERROR: Python environment not found. Please run setup_unreal_mcp.bat first. >&2
goto :end
)
REM Activate the virtual environment silently
call "%ENV_DIR%\Scripts\activate.bat" >nul 2>&1
REM Log start message to stderr
echo Starting Unreal MCP bridge... >&2
REM Run the Python bridge script, keeping stdout clean for MCP
python "%SCRIPT_DIR%\unreal_mcp_bridge.py" %*
:end
```
--------------------------------------------------------------------------------
/MCP/check_setup.bat:
--------------------------------------------------------------------------------
```
@echo off
echo ========================================================
echo Unreal MCP - Setup Diagnosis Tool
echo ========================================================
echo.
echo This tool will check your MCP setup and provide diagnostic information.
echo.
REM Get the directory where this script is located
set "SCRIPT_DIR=%~dp0"
set "SCRIPT_DIR=%SCRIPT_DIR:~0,-1%"
REM Try to run with the virtual environment first
if exist "%SCRIPT_DIR%\python_env\Scripts\python.exe" (
echo Using Python from virtual environment...
call "%SCRIPT_DIR%\python_env\Scripts\activate.bat"
python "%SCRIPT_DIR%\check_mcp_setup.py"
) else (
echo Using system Python...
python "%SCRIPT_DIR%\check_mcp_setup.py"
)
echo.
echo Press any key to exit...
pause >nul
```
--------------------------------------------------------------------------------
/Source/UnrealMCP/UnrealMCP.Build.cs:
--------------------------------------------------------------------------------
```csharp
// Copyright Epic Games, Inc. All Rights Reserved.
using UnrealBuildTool;
public class UnrealMCP : ModuleRules
{
public UnrealMCP(ReadOnlyTargetRules Target) : base(Target)
{
PCHUsage = ModuleRules.PCHUsageMode.UseExplicitOrSharedPCHs;
PublicIncludePaths.AddRange(
new string[] {
// ... add public include paths required here ...
}
);
PrivateIncludePaths.AddRange(
new string[] {
// ... add other private include paths required here ...
}
);
PublicDependencyModuleNames.AddRange(
new string[] {
"Core", "CoreUObject", "Engine", "UnrealEd",
"Networking", "Sockets", "Slate", "SlateCore", "EditorStyle",
"DeveloperSettings", "Projects", "ToolMenus",
"BlueprintGraph", "GraphEditor", "KismetCompiler"
}
);
PrivateDependencyModuleNames.AddRange(
new string[] {
"Json", "JsonUtilities", "Settings", "InputCore", "PythonScriptPlugin",
"Kismet", "KismetWidgets"
}
);
DynamicallyLoadedModuleNames.AddRange(
new string[]
{
// ... add any modules that your module loads dynamically here ...
}
);
}
}
```
--------------------------------------------------------------------------------
/Source/UnrealMCP/Public/UnrealMCP.h:
--------------------------------------------------------------------------------
```
// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once
#include "CoreMinimal.h"
#include "Modules/ModuleManager.h"
// Declare custom log category
UNREALMCP_API DECLARE_LOG_CATEGORY_EXTERN(LogMCP, Log, All);
class FMCPTCPServer;
class SWindow;
class FUnrealMCPModule : public IModuleInterface, public TSharedFromThis<FUnrealMCPModule>
{
public:
/** IModuleInterface implementation */
virtual void StartupModule() override;
virtual void ShutdownModule() override;
/**
* Get the MCP server instance
* External modules can use this to register custom handlers
* @return The MCP server instance, or nullptr if not available
*/
UNREALMCP_API FMCPTCPServer* GetServer() const { return Server.Get(); }
private:
void ExtendLevelEditorToolbar();
void AddToolbarButton(FToolBarBuilder& Builder);
void ToggleServer();
void StartServer();
void StopServer();
bool IsServerRunning() const;
// MCP Control Panel functions
void OpenMCPControlPanel();
FReply OpenMCPControlPanel_OnClicked();
void CloseMCPControlPanel();
void OnMCPControlPanelClosed(const TSharedRef<SWindow>& Window);
TSharedRef<class SWidget> CreateMCPControlPanelContent();
FReply OnStartServerClicked();
FReply OnStopServerClicked();
TUniquePtr<FMCPTCPServer> Server;
TSharedPtr<SWindow> MCPControlPanelWindow;
};
```
--------------------------------------------------------------------------------
/MCP/UserTools/example_tool.py:
--------------------------------------------------------------------------------
```python
def register_tools(mcp, utils):
send_command = utils['send_command']
@mcp.tool()
def my_custom_tool(ctx):
return "Hello from custom tool!"
@mcp.tool()
def get_actor_count(ctx) -> str:
"""Get the number of actors in the current Unreal Engine scene."""
try:
response = send_command("get_scene_info")
print(f"Response: {response}")
if response["status"] == "success":
result = response["result"]
total_actor_count = result["actor_count"]
returned_actor_count = result.get("returned_actor_count", len(result["actors"]))
limit_reached = result.get("limit_reached", False)
response_text = f"Total number of actors: {total_actor_count}\n"
if limit_reached:
response_text += f"WARNING: Actor limit reached! Only {returned_actor_count} actors were returned in the response.\n"
response_text += f"The remaining {total_actor_count - returned_actor_count} actors are not included in the response.\n"
return response_text
else:
return f"Error: {response['message']}"
except Exception as e:
return f"Error: {str(e)}"
```
--------------------------------------------------------------------------------
/Source/UnrealMCP/Public/MCPCommandHandlers_Materials.h:
--------------------------------------------------------------------------------
```
#pragma once
#include "CoreMinimal.h"
#include "MCPCommandHandlers.h"
#include "Materials/Material.h"
#include "Materials/MaterialExpressionScalarParameter.h"
#include "Materials/MaterialExpressionVectorParameter.h"
class FMCPCreateMaterialHandler : public FMCPCommandHandlerBase
{
public:
FMCPCreateMaterialHandler() : FMCPCommandHandlerBase(TEXT("create_material")) {}
virtual TSharedPtr<FJsonObject> Execute(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket) override;
private:
TPair<UMaterial*, bool> CreateMaterial(const FString& PackagePath, const FString& MaterialName, const TSharedPtr<FJsonObject>& Properties);
bool ModifyMaterialProperties(UMaterial* Material, const TSharedPtr<FJsonObject>& Properties);
};
class FMCPModifyMaterialHandler : public FMCPCommandHandlerBase
{
public:
FMCPModifyMaterialHandler() : FMCPCommandHandlerBase(TEXT("modify_material")) {}
virtual TSharedPtr<FJsonObject> Execute(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket) override;
private:
bool ModifyMaterialProperties(UMaterial* Material, const TSharedPtr<FJsonObject>& Properties);
};
class FMCPGetMaterialInfoHandler : public FMCPCommandHandlerBase
{
public:
FMCPGetMaterialInfoHandler() : FMCPCommandHandlerBase(TEXT("get_material_info")) {}
virtual TSharedPtr<FJsonObject> Execute(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket) override;
private:
TSharedPtr<FJsonObject> GetMaterialInfo(UMaterial* Material);
};
```
--------------------------------------------------------------------------------
/Source/UnrealMCP/Public/MCPConstants.h:
--------------------------------------------------------------------------------
```
#pragma once
#include "CoreMinimal.h"
/**
* Constants used throughout the MCP plugin
*/
namespace MCPConstants
{
// Network constants
constexpr int32 DEFAULT_PORT = 13377;
constexpr int32 DEFAULT_RECEIVE_BUFFER_SIZE = 65536; // 64KB buffer size
constexpr int32 DEFAULT_SEND_BUFFER_SIZE = DEFAULT_RECEIVE_BUFFER_SIZE;
constexpr float DEFAULT_CLIENT_TIMEOUT_SECONDS = 30.0f;
constexpr float DEFAULT_TICK_INTERVAL_SECONDS = 0.1f;
// Python constants
constexpr const TCHAR* PYTHON_TEMP_DIR_NAME = TEXT("PythonTemp");
constexpr const TCHAR* PYTHON_TEMP_FILE_PREFIX = TEXT("mcp_temp_script_");
// Logging constants
constexpr bool DEFAULT_VERBOSE_LOGGING = false;
// Performance constants
constexpr int32 MAX_ACTORS_IN_SCENE_INFO = 1000;
// Path constants - use these instead of hardcoded paths
// These will be initialized at runtime in the module startup
extern FString ProjectRootPath; // Root path of the project
extern FString PluginRootPath; // Root path of the MCP plugin
extern FString PluginContentPath; // Path to the plugin's content directory
extern FString PluginResourcesPath; // Path to the plugin's resources directory
extern FString PluginLogsPath; // Path to the plugin's logs directory
extern FString PluginMCPScriptsPath; // Path to the plugin's MCP scripts directory
// Function to initialize all path variables at runtime
void InitializePathConstants();
}
```
--------------------------------------------------------------------------------
/MCP/TestScripts/run_all_tests.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
MCP Server Test Runner
This script runs all the MCP Server test scripts in sequence.
"""
import subprocess
import sys
import os
import time
def run_test(test_script):
"""Run a test script and return whether it passed."""
script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), test_script)
print(f"\n{'=' * 60}")
print(f"Running {test_script}...")
print(f"{'=' * 60}")
try:
result = subprocess.run(
[sys.executable, script_path],
capture_output=False, # Show output in real-time
check=False
)
if result.returncode == 0:
print(f"\n✓ {test_script} PASSED")
return True
else:
print(f"\n✗ {test_script} FAILED (exit code: {result.returncode})")
return False
except Exception as e:
print(f"\n✗ Error running {test_script}: {e}")
return False
def main():
"""Run all test scripts."""
# List of test scripts to run
test_scripts = [
"1_basic_connection.py",
"2_python_execution.py",
"3_string_test.py"
]
# Track results
results = {}
# Run each test
for script in test_scripts:
results[script] = run_test(script)
# Add a small delay between tests
time.sleep(1)
# Print summary
print("\n" + "=" * 60)
print("TEST SUMMARY")
print("=" * 60)
all_passed = True
for script, passed in results.items():
status = "✓ PASSED" if passed else "✗ FAILED"
print(f"{script}: {status}")
if not passed:
all_passed = False
print("\n" + "=" * 60)
if all_passed:
print("✓ ALL TESTS PASSED")
return 0
else:
print("✗ SOME TESTS FAILED")
return 1
if __name__ == "__main__":
print("=== MCP Server Test Runner ===")
exit_code = main()
sys.exit(exit_code)
```
--------------------------------------------------------------------------------
/Source/UnrealMCP/Private/MCPConstants.cpp:
--------------------------------------------------------------------------------
```cpp
#include "MCPConstants.h"
#include "Misc/Paths.h"
#include "HAL/PlatformFileManager.h"
#include "Interfaces/IPluginManager.h"
// Initialize the static path variables
FString MCPConstants::ProjectRootPath;
FString MCPConstants::PluginRootPath;
FString MCPConstants::PluginContentPath;
FString MCPConstants::PluginResourcesPath;
FString MCPConstants::PluginLogsPath;
FString MCPConstants::PluginMCPScriptsPath;
void MCPConstants::InitializePathConstants()
{
// Get the project root path
ProjectRootPath = FPaths::ConvertRelativePathToFull(FPaths::ProjectDir());
// Get the plugin root path
TSharedPtr<IPlugin> Plugin = IPluginManager::Get().FindPlugin("UnrealMCP");
if (Plugin.IsValid())
{
PluginRootPath = FPaths::ConvertRelativePathToFull(Plugin->GetBaseDir());
// Derive other paths from the plugin root
PluginContentPath = FPaths::Combine(PluginRootPath, TEXT("Content"));
PluginResourcesPath = FPaths::Combine(PluginRootPath, TEXT("Resources"));
PluginLogsPath = FPaths::Combine(PluginRootPath, TEXT("Logs"));
PluginMCPScriptsPath = FPaths::Combine(PluginRootPath, TEXT("MCP"));
// Ensure directories exist
IPlatformFile& PlatformFile = FPlatformFileManager::Get().GetPlatformFile();
if (!PlatformFile.DirectoryExists(*PluginLogsPath))
{
PlatformFile.CreateDirectory(*PluginLogsPath);
}
if (!PlatformFile.DirectoryExists(*PluginMCPScriptsPath))
{
PlatformFile.CreateDirectory(*PluginMCPScriptsPath);
}
}
else
{
// Fallback to project-relative paths if plugin is not found
PluginRootPath = FPaths::Combine(ProjectRootPath, TEXT("Plugins/UnrealMCP"));
PluginContentPath = FPaths::Combine(PluginRootPath, TEXT("Content"));
PluginResourcesPath = FPaths::Combine(PluginRootPath, TEXT("Resources"));
PluginLogsPath = FPaths::Combine(PluginRootPath, TEXT("Logs"));
PluginMCPScriptsPath = FPaths::Combine(PluginRootPath, TEXT("MCP"));
}
}
```
--------------------------------------------------------------------------------
/MCP/install_mcp.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""
This script installs the MCP package and verifies it's installed correctly.
Run this script with the same Python interpreter that Claude Desktop will use.
"""
import sys
import subprocess
import importlib.util
def check_mcp_installed():
"""Check if the MCP package is installed."""
spec = importlib.util.find_spec("mcp")
return spec is not None
def install_mcp():
"""Install the MCP package."""
print(f"Installing MCP package using Python: {sys.executable}")
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", "mcp>=0.1.0"])
print("MCP package installed successfully!")
return True
except subprocess.CalledProcessError as e:
print(f"Error installing MCP package: {e}")
return False
def main():
"""Main function."""
print(f"Python version: {sys.version}")
print(f"Python executable: {sys.executable}")
if check_mcp_installed():
print("MCP package is already installed.")
try:
import mcp
print(f"MCP version: {mcp.__version__}")
except (ImportError, AttributeError):
print("MCP package is installed but version could not be determined.")
else:
print("MCP package is not installed.")
if install_mcp():
if check_mcp_installed():
print("Verification: MCP package is now installed.")
try:
import mcp
print(f"MCP version: {mcp.__version__}")
except (ImportError, AttributeError):
print("MCP package is installed but version could not be determined.")
else:
print("ERROR: MCP package installation failed verification.")
print("Please try installing manually with:")
print(f"{sys.executable} -m pip install mcp>=0.1.0")
else:
print("Installation failed. Please try installing manually with:")
print(f"{sys.executable} -m pip install mcp>=0.1.0")
print("\nTo verify the MCP package is installed in the correct environment:")
print("1. Make sure you run this script with the same Python interpreter that Claude Desktop will use")
print("2. Check that the Python executable path shown above matches the one in your Claude Desktop configuration")
input("\nPress Enter to exit...")
```
--------------------------------------------------------------------------------
/MCP/TestScripts/test_commands_basic.py:
--------------------------------------------------------------------------------
```python
"""Test script for UnrealMCP basic commands.
This script tests the basic scene and Python execution commands available in the UnrealMCP bridge.
Make sure Unreal Engine is running with the UnrealMCP plugin enabled before running this script.
"""
import sys
import os
import json
from mcp.server.fastmcp import FastMCP, Context
# Add the MCP directory to sys.path so we can import unreal_mcp_bridge
mcp_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if mcp_dir not in sys.path:
sys.path.insert(0, mcp_dir)
from unreal_mcp_bridge import send_command
def test_scene_info():
"""Test getting scene information."""
print("\n1. Testing get_scene_info...")
try:
response = send_command("get_scene_info")
print(f"Get Scene Info Response: {json.dumps(response, indent=2)}")
return response["status"] == "success"
except Exception as e:
print(f"Error testing get_scene_info: {e}")
return False
def test_object_creation():
"""Test creating objects in the scene."""
print("\n2. Testing create_object...")
try:
params = {
"type": "StaticMeshActor",
"location": [0, 0, 100],
"label": "TestCube"
}
response = send_command("create_object", params)
print(f"Create Object Response: {json.dumps(response, indent=2)}")
return response["status"] == "success"
except Exception as e:
print(f"Error testing create_object: {e}")
return False
def test_python_execution():
"""Test Python execution in Unreal Engine."""
print("\n3. Testing execute_python...")
test_code = """
import unreal
print("Python executing in Unreal Engine!")
world = unreal.EditorLevelLibrary.get_editor_world()
print(f"Current level: {world.get_name()}")
actors = unreal.EditorLevelLibrary.get_all_level_actors()
print(f"Number of actors in level: {len(actors)}")
"""
try:
response = send_command("execute_python", {"code": test_code})
print(f"Python Execution Response: {json.dumps(response, indent=2)}")
return response["status"] == "success"
except Exception as e:
print(f"Error testing Python execution: {e}")
return False
def main():
"""Run all basic command tests."""
print("Starting UnrealMCP basic command tests...")
print("Make sure Unreal Engine is running with the UnrealMCP plugin enabled!")
try:
results = {
"get_scene_info": test_scene_info(),
"create_object": test_object_creation(),
"execute_python": test_python_execution()
}
print("\nTest Results:")
print("-" * 40)
for test_name, success in results.items():
status = "✓ PASS" if success else "✗ FAIL"
print(f"{status} - {test_name}")
print("-" * 40)
if all(results.values()):
print("\nAll basic tests passed successfully!")
else:
print("\nSome tests failed. Check the output above for details.")
sys.exit(1)
except Exception as e:
print(f"\nError during testing: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/MCP/utils/command_utils.py:
--------------------------------------------------------------------------------
```python
"""Utility functions for MCP commands."""
import json
import socket
import sys
# Constants (these will be read from MCPConstants.h)
DEFAULT_PORT = 13377
DEFAULT_BUFFER_SIZE = 65536
DEFAULT_TIMEOUT = 10
try:
# Try to read the port from the C++ constants
import os
plugin_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
constants_path = os.path.join(plugin_dir, "Source", "UnrealMCP", "Public", "MCPConstants.h")
if os.path.exists(constants_path):
with open(constants_path, 'r') as f:
constants_content = f.read()
# Extract port from MCPConstants
port_match = constants_content.find("DEFAULT_PORT = ")
if port_match != -1:
port_line = constants_content[port_match:].split(';')[0]
DEFAULT_PORT = int(port_line.split('=')[1].strip())
# Extract buffer size from MCPConstants
buffer_match = constants_content.find("DEFAULT_RECEIVE_BUFFER_SIZE = ")
if buffer_match != -1:
buffer_line = constants_content[buffer_match:].split(';')[0]
DEFAULT_BUFFER_SIZE = int(buffer_line.split('=')[1].strip())
except Exception as e:
print(f"Warning: Could not read constants from MCPConstants.h: {e}", file=sys.stderr)
def send_command(command_type, params=None):
"""Send a command to the C++ MCP server and return the response."""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(DEFAULT_TIMEOUT)
s.connect(("localhost", DEFAULT_PORT))
command = {
"type": command_type,
"params": params or {}
}
s.sendall(json.dumps(command).encode('utf-8'))
chunks = []
response_data = b''
while True:
try:
chunk = s.recv(DEFAULT_BUFFER_SIZE)
if not chunk:
break
chunks.append(chunk)
response_data = b''.join(chunks)
try:
json.loads(response_data.decode('utf-8'))
break
except json.JSONDecodeError:
continue
except socket.timeout:
if response_data:
break
raise
if not response_data:
raise Exception("No data received from server")
return json.loads(response_data.decode('utf-8'))
except ConnectionRefusedError:
print(f"Error: Could not connect to Unreal MCP server on localhost:{DEFAULT_PORT}.", file=sys.stderr)
print("Make sure your Unreal Engine with MCP plugin is running.", file=sys.stderr)
raise Exception("Failed to connect to Unreal MCP server: Connection refused")
except socket.timeout:
print("Error: Connection timed out while communicating with Unreal MCP server.", file=sys.stderr)
raise Exception("Failed to communicate with Unreal MCP server: Connection timed out")
except Exception as e:
print(f"Error communicating with Unreal MCP server: {str(e)}", file=sys.stderr)
raise Exception(f"Failed to communicate with Unreal MCP server: {str(e)}")
```
--------------------------------------------------------------------------------
/MCP/TestScripts/test_commands_material.py:
--------------------------------------------------------------------------------
```python
"""Test script for UnrealMCP material commands.
This script tests the material-related commands available in the UnrealMCP bridge.
Make sure Unreal Engine is running with the UnrealMCP plugin enabled before running this script.
"""
import sys
import os
import json
from mcp.server.fastmcp import FastMCP, Context
# Add the MCP directory to sys.path so we can import unreal_mcp_bridge
mcp_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if mcp_dir not in sys.path:
sys.path.insert(0, mcp_dir)
from unreal_mcp_bridge import send_command
def test_material_creation():
"""Test material creation command."""
print("\n1. Testing create_material...")
try:
params = {
"package_path": "/Game/Materials/Tests",
"name": "TestMaterial",
"properties": {
"shading_model": "DefaultLit",
"base_color": [1.0, 0.0, 0.0, 1.0], # Red material
"metallic": 0.0,
"roughness": 0.5
}
}
response = send_command("create_material", params)
print(f"Create Material Response: {json.dumps(response, indent=2)}")
return response["status"] == "success"
except Exception as e:
print(f"Error testing create_material: {e}")
return False
def test_material_info():
"""Test getting material information."""
print("\n2. Testing get_material_info...")
try:
params = {
"path": "/Game/Materials/Tests/TestMaterial"
}
response = send_command("get_material_info", params)
print(f"Get Material Info Response: {json.dumps(response, indent=2)}")
return response["status"] == "success"
except Exception as e:
print(f"Error testing get_material_info: {e}")
return False
def test_material_modification():
"""Test modifying material properties."""
print("\n3. Testing modify_material...")
try:
params = {
"path": "/Game/Materials/Tests/TestMaterial",
"properties": {
"base_color": [0.0, 1.0, 0.0, 1.0], # Change to green
"metallic": 0.5
}
}
response = send_command("modify_material", params)
print(f"Modify Material Response: {json.dumps(response, indent=2)}")
return response["status"] == "success"
except Exception as e:
print(f"Error testing modify_material: {e}")
return False
def main():
"""Run all material-related tests."""
print("Starting UnrealMCP material command tests...")
print("Make sure Unreal Engine is running with the UnrealMCP plugin enabled!")
try:
results = {
"create_material": test_material_creation(),
"get_material_info": test_material_info(),
"modify_material": test_material_modification()
}
print("\nTest Results:")
print("-" * 40)
for test_name, success in results.items():
status = "✓ PASS" if success else "✗ FAIL"
print(f"{status} - {test_name}")
print("-" * 40)
if all(results.values()):
print("\nAll material tests passed successfully!")
else:
print("\nSome tests failed. Check the output above for details.")
sys.exit(1)
except Exception as e:
print(f"\nError during testing: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/MCP/Commands/commands_scene.py:
--------------------------------------------------------------------------------
```python
"""Scene-related commands for Unreal Engine.
This module contains all scene-related commands for the UnrealMCP bridge,
including getting scene information, creating, modifying, and deleting objects.
"""
import sys
import os
from mcp.server.fastmcp import Context
# Import send_command from the parent module
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from unreal_mcp_bridge import send_command
def register_all(mcp):
"""Register all scene-related commands with the MCP server."""
@mcp.tool()
def get_scene_info(ctx: Context) -> str:
"""Get detailed information about the current Unreal scene."""
try:
response = send_command("get_scene_info")
if response["status"] == "success":
return json.dumps(response["result"], indent=2)
else:
return f"Error: {response['message']}"
except Exception as e:
return f"Error getting scene info: {str(e)}"
@mcp.tool()
def create_object(ctx: Context, type: str, location: list = None, label: str = None) -> str:
"""Create a new object in the Unreal scene.
Args:
type: The type of object to create (e.g., 'StaticMeshActor', 'PointLight', etc.)
location: Optional 3D location as [x, y, z]
label: Optional label for the object
"""
try:
params = {"type": type}
if location:
params["location"] = location
if label:
params["label"] = label
response = send_command("create_object", params)
if response["status"] == "success":
return f"Created object: {response['result']['name']} with label: {response['result']['label']}"
else:
return f"Error: {response['message']}"
except Exception as e:
return f"Error creating object: {str(e)}"
@mcp.tool()
def modify_object(ctx: Context, name: str, location: list = None, rotation: list = None, scale: list = None) -> str:
"""Modify an existing object in the Unreal scene.
Args:
name: The name of the object to modify
location: Optional 3D location as [x, y, z]
rotation: Optional rotation as [pitch, yaw, roll]
scale: Optional scale as [x, y, z]
"""
try:
params = {"name": name}
if location:
params["location"] = location
if rotation:
params["rotation"] = rotation
if scale:
params["scale"] = scale
response = send_command("modify_object", params)
if response["status"] == "success":
return f"Modified object: {response['result']['name']}"
else:
return f"Error: {response['message']}"
except Exception as e:
return f"Error modifying object: {str(e)}"
@mcp.tool()
def delete_object(ctx: Context, name: str) -> str:
"""Delete an object from the Unreal scene.
Args:
name: The name of the object to delete
"""
try:
response = send_command("delete_object", {"name": name})
if response["status"] == "success":
return f"Deleted object: {name}"
else:
return f"Error: {response['message']}"
except Exception as e:
return f"Error deleting object: {str(e)}"
```
--------------------------------------------------------------------------------
/Source/UnrealMCP/Public/MCPCommandHandlers_Blueprints.h:
--------------------------------------------------------------------------------
```
#pragma once
#include "CoreMinimal.h"
#include "MCPCommandHandlers.h"
#include "Engine/Blueprint.h"
#include "Engine/BlueprintGeneratedClass.h"
#include "Kismet/GameplayStatics.h"
#include "Kismet/KismetSystemLibrary.h"
#include "EdGraph/EdGraph.h"
#include "K2Node_Event.h"
#include "K2Node_CallFunction.h"
#include "EdGraphSchema_K2.h"
#include "AssetRegistry/AssetRegistryModule.h"
/**
* Common utilities for blueprint operations
*/
class FMCPBlueprintUtils
{
public:
/**
* Create a new blueprint asset
* @param PackagePath - Path where the blueprint should be created
* @param BlueprintName - Name of the blueprint
* @param ParentClass - Parent class for the blueprint
* @return The created blueprint and success flag
*/
static TPair<UBlueprint*, bool> CreateBlueprintAsset(
const FString& PackagePath,
const FString& BlueprintName,
UClass* ParentClass);
/**
* Add event node to blueprint
* @param Blueprint - Target blueprint
* @param EventName - Name of the event to create
* @param ParentClass - Parent class containing the event
* @return The created event node and success flag
*/
static TPair<UK2Node_Event*, bool> AddEventNode(
UBlueprint* Blueprint,
const FString& EventName,
UClass* ParentClass);
/**
* Add print string node to blueprint
* @param Graph - Target graph
* @param Message - Message to print
* @return The created print node and success flag
*/
static TPair<UK2Node_CallFunction*, bool> AddPrintStringNode(
UEdGraph* Graph,
const FString& Message);
};
/**
* Handler for creating blueprints
*/
class FMCPCreateBlueprintHandler : public FMCPCommandHandlerBase
{
public:
FMCPCreateBlueprintHandler() : FMCPCommandHandlerBase(TEXT("create_blueprint")) {}
virtual TSharedPtr<FJsonObject> Execute(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket) override;
private:
TPair<UBlueprint*, bool> CreateBlueprint(const FString& PackagePath, const FString& BlueprintName, const TSharedPtr<FJsonObject>& Properties);
};
/**
* Handler for modifying blueprints
*/
class FMCPModifyBlueprintHandler : public FMCPCommandHandlerBase
{
public:
FMCPModifyBlueprintHandler() : FMCPCommandHandlerBase(TEXT("modify_blueprint")) {}
virtual TSharedPtr<FJsonObject> Execute(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket) override;
private:
bool ModifyBlueprint(UBlueprint* Blueprint, const TSharedPtr<FJsonObject>& Properties);
};
/**
* Handler for getting blueprint info
*/
class FMCPGetBlueprintInfoHandler : public FMCPCommandHandlerBase
{
public:
FMCPGetBlueprintInfoHandler() : FMCPCommandHandlerBase(TEXT("get_blueprint_info")) {}
virtual TSharedPtr<FJsonObject> Execute(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket) override;
private:
TSharedPtr<FJsonObject> GetBlueprintInfo(UBlueprint* Blueprint);
};
/**
* Handler for creating blueprint events
*/
class FMCPCreateBlueprintEventHandler : public FMCPCommandHandlerBase
{
public:
FMCPCreateBlueprintEventHandler() : FMCPCommandHandlerBase(TEXT("create_blueprint_event")) {}
virtual TSharedPtr<FJsonObject> Execute(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket) override;
private:
TPair<bool, TSharedPtr<FJsonObject>> CreateBlueprintEvent(
UWorld* World,
const FString& EventName,
const FString& BlueprintPath,
const TSharedPtr<FJsonObject>& EventParameters);
};
```
--------------------------------------------------------------------------------
/MCP/TestScripts/2_python_execution.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Python Execution Test for MCP Server
This script tests executing Python code through the MCP Server.
It connects to the server, sends a Python code snippet, and verifies the execution.
"""
import socket
import json
import sys
def main():
"""Connect to the MCP Server and execute Python code."""
try:
# Create socket
print("Connecting to MCP Server on localhost:13377...")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5) # 5 second timeout
# Connect to server
s.connect(("localhost", 13377))
print("✓ Connected successfully")
# Python code to execute
code = """
import unreal
# Get the current level
level = unreal.EditorLevelLibrary.get_editor_world()
level_name = level.get_name()
# Get all actors in the level
actors = unreal.EditorLevelLibrary.get_all_level_actors()
actor_count = len(actors)
# Log some information
unreal.log(f"Current level: {level_name}")
unreal.log(f"Actor count: {actor_count}")
# Return a result
return {
"level_name": level_name,
"actor_count": actor_count
}
"""
# Create command
command = {
"type": "execute_python",
"code": code
}
# Send command
print("Sending execute_python command...")
command_str = json.dumps(command) + "\n" # Add newline
s.sendall(command_str.encode('utf-8'))
# Receive response
print("Waiting for response...")
response = b""
while True:
data = s.recv(4096)
if not data:
break
response += data
if b"\n" in data: # Check for newline which indicates end of response
break
# Close connection
s.close()
print("✓ Connection closed properly")
# Process response
if response:
response_str = response.decode('utf-8').strip()
try:
response_json = json.loads(response_str)
print("\n=== RESPONSE ===")
print(f"Status: {response_json.get('status', 'unknown')}")
if response_json.get('status') == 'success':
print("✓ Python execution successful")
result = response_json.get('result', {})
if isinstance(result, dict) and 'output' in result:
print(f"Output: {result['output']}")
return True
else:
print("✗ Python execution failed")
print(f"Error: {response_json.get('message', 'Unknown error')}")
return False
except json.JSONDecodeError as e:
print(f"✗ Error parsing JSON response: {e}")
print(f"Raw response: {response_str}")
return False
else:
print("✗ No response received from server")
return False
except ConnectionRefusedError:
print("✗ Connection refused. Is the MCP Server running?")
return False
except socket.timeout:
print("✗ Connection timed out. Is the MCP Server running?")
return False
except Exception as e:
print(f"✗ Error: {e}")
return False
if __name__ == "__main__":
print("=== MCP Server Python Execution Test ===")
success = main()
print("\n=== TEST RESULT ===")
if success:
print("✓ Python execution test PASSED")
sys.exit(0)
else:
print("✗ Python execution test FAILED")
sys.exit(1)
```
--------------------------------------------------------------------------------
/MCP/Commands/commands_python.py:
--------------------------------------------------------------------------------
```python
"""Python execution commands for Unreal Engine.
This module contains commands for executing Python code in Unreal Engine.
"""
import sys
import os
from mcp.server.fastmcp import Context
# Import send_command from the parent module
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from unreal_mcp_bridge import send_command
def register_all(mcp):
"""Register all Python execution commands with the MCP server."""
@mcp.tool()
def execute_python(ctx: Context, code: str = None, file: str = None) -> str:
"""Execute Python code or a Python script file in Unreal Engine.
This function allows you to execute arbitrary Python code directly in the Unreal Engine
environment. You can either provide Python code as a string or specify a path to a Python
script file to execute.
The Python code will have access to the full Unreal Engine Python API, including the 'unreal'
module, allowing you to interact with and manipulate the Unreal Engine editor and its assets.
Args:
code: Python code to execute as a string. Can be multiple lines.
file: Path to a Python script file to execute.
Note:
- You must provide either code or file, but not both.
- The output of the Python code will be visible in the Unreal Engine log.
- The Python code runs in the Unreal Engine process, so it has full access to the engine.
- Be careful with destructive operations as they can affect your project.
Examples:
# Execute simple Python code
execute_python(code="print('Hello from Unreal Engine!')")
# Get information about the current level
execute_python(code='''
import unreal
level = unreal.EditorLevelLibrary.get_editor_world()
print(f"Current level: {level.get_name()}")
actors = unreal.EditorLevelLibrary.get_all_level_actors()
print(f"Number of actors: {len(actors)}")
''')
# Execute a Python script file
execute_python(file="D:/my_scripts/create_assets.py")
"""
try:
if not code and not file:
return "Error: You must provide either 'code' or 'file' parameter"
if code and file:
return "Error: You can only provide either 'code' or 'file', not both"
params = {}
if code:
params["code"] = code
if file:
params["file"] = file
response = send_command("execute_python", params)
# Handle the response
if response["status"] == "success":
return f"Python execution successful:\n{response['result']['output']}"
elif response["status"] == "error":
# New format with detailed error information
result = response.get("result", {})
output = result.get("output", "")
error = result.get("error", "")
# Format the response with both output and error information
response_text = "Python execution failed with errors:\n\n"
if output:
response_text += f"--- Output ---\n{output}\n\n"
if error:
response_text += f"--- Error ---\n{error}"
return response_text
else:
return f"Error: {response['message']}"
except Exception as e:
return f"Error executing Python: {str(e)}"
```
--------------------------------------------------------------------------------
/Source/UnrealMCP/Public/MCPExtensionHandler.h:
--------------------------------------------------------------------------------
```
#pragma once
#include "CoreMinimal.h"
#include "MCPTCPServer.h"
#include "Delegates/Delegate.h"
#include "Json.h"
/**
* Delegate for handling MCP command execution
* Used by the extension system to allow easy registration of custom command handlers
*/
DECLARE_DELEGATE_RetVal_TwoParams(
TSharedPtr<FJsonObject>, // Return type: JSON response
FMCPCommandExecuteDelegate, // Delegate name
const TSharedPtr<FJsonObject>&, // Parameter 1: Command parameters
FSocket* // Parameter 2: Client socket
);
/**
* Helper class for creating external command handlers
* Makes it easy for external code to register custom commands with the MCP server
*/
class UNREALMCP_API FMCPExtensionHandler : public IMCPCommandHandler
{
public:
/**
* Constructor
* @param InCommandName - The command name this handler responds to
* @param InExecuteDelegate - The delegate to execute when this command is received
*/
FMCPExtensionHandler(const FString& InCommandName, const FMCPCommandExecuteDelegate& InExecuteDelegate)
: CommandName(InCommandName)
, ExecuteDelegate(InExecuteDelegate)
{
}
/**
* Get the command name this handler responds to
* @return The command name
*/
virtual FString GetCommandName() const override
{
return CommandName;
}
/**
* Handle the command by executing the delegate
* @param Params - The command parameters
* @param ClientSocket - The client socket
* @return JSON response object
*/
virtual TSharedPtr<FJsonObject> Execute(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket)
{
// If the delegate is bound, execute it
if (ExecuteDelegate.IsBound())
{
return ExecuteDelegate.Execute(Params, ClientSocket);
}
// If the delegate is not bound, return an error
TSharedPtr<FJsonObject> Response = MakeShared<FJsonObject>();
Response->SetStringField("status", "error");
Response->SetStringField("message", FString::Printf(TEXT("Command handler for '%s' has no bound execution delegate"), *CommandName));
return Response;
}
private:
/** The command name this handler responds to */
FString CommandName;
/** The delegate to execute when this command is received */
FMCPCommandExecuteDelegate ExecuteDelegate;
};
/**
* Helper utility for working with the MCP extension system
*/
class UNREALMCP_API FMCPExtensionSystem
{
public:
/**
* Register a command handler with the server
* @param Server - The MCP server
* @param CommandName - The name of the command to register
* @param ExecuteDelegate - The delegate to execute when the command is received
* @return True if registration was successful
*/
static bool RegisterCommand(FMCPTCPServer* Server, const FString& CommandName, const FMCPCommandExecuteDelegate& ExecuteDelegate)
{
if (!Server)
{
return false;
}
// Create a handler with the delegate
TSharedPtr<FMCPExtensionHandler> Handler = MakeShared<FMCPExtensionHandler>(CommandName, ExecuteDelegate);
// Register the handler with the server
return Server->RegisterExternalCommandHandler(Handler);
}
/**
* Unregister a command handler with the server
* @param Server - The MCP server
* @param CommandName - The name of the command to unregister
* @return True if unregistration was successful
*/
static bool UnregisterCommand(FMCPTCPServer* Server, const FString& CommandName)
{
if (!Server)
{
return false;
}
// Unregister the handler with the server
return Server->UnregisterExternalCommandHandler(CommandName);
}
};
```
--------------------------------------------------------------------------------
/MCP/utils/__init__.py:
--------------------------------------------------------------------------------
```python
"""Utility functions for the UnrealMCP bridge."""
import json
import socket
import sys
import os
# Try to get the port from MCPConstants
DEFAULT_PORT = 13377
DEFAULT_BUFFER_SIZE = 65536
DEFAULT_TIMEOUT = 10 # 10 second timeout
try:
# Try to read the port from the C++ constants
plugin_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.dirname(__file__)), ".."))
constants_path = os.path.join(plugin_dir, "Source", "UnrealMCP", "Public", "MCPConstants.h")
if os.path.exists(constants_path):
with open(constants_path, 'r') as f:
constants_content = f.read()
# Extract port from MCPConstants
port_match = constants_content.find("DEFAULT_PORT = ")
if port_match != -1:
port_line = constants_content[port_match:].split(';')[0]
DEFAULT_PORT = int(port_line.split('=')[1].strip())
# Extract buffer size from MCPConstants
buffer_match = constants_content.find("DEFAULT_RECEIVE_BUFFER_SIZE = ")
if buffer_match != -1:
buffer_line = constants_content[buffer_match:].split(';')[0]
DEFAULT_BUFFER_SIZE = int(buffer_line.split('=')[1].strip())
except Exception as e:
# If anything goes wrong, use the defaults (which are already defined)
print(f"Warning: Could not read constants from MCPConstants.h: {e}", file=sys.stderr)
def send_command(command_type, params=None):
"""Send a command to the C++ MCP server and return the response."""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(DEFAULT_TIMEOUT) # Set a timeout
s.connect(("localhost", DEFAULT_PORT)) # Connect to Unreal C++ server
command = {
"type": command_type,
"params": params or {}
}
s.sendall(json.dumps(command).encode('utf-8'))
# Read response with a buffer
chunks = []
response_data = b''
# Wait for data with timeout
while True:
try:
chunk = s.recv(DEFAULT_BUFFER_SIZE)
if not chunk: # Connection closed
break
chunks.append(chunk)
# Try to parse what we have so far
response_data = b''.join(chunks)
try:
# If we can parse it as JSON, we have a complete response
json.loads(response_data.decode('utf-8'))
break
except json.JSONDecodeError:
# Incomplete JSON, continue receiving
continue
except socket.timeout:
# If we have some data but timed out, try to use what we have
if response_data:
break
raise
if not response_data:
raise Exception("No data received from server")
return json.loads(response_data.decode('utf-8'))
except ConnectionRefusedError:
print(f"Error: Could not connect to Unreal MCP server on localhost:{DEFAULT_PORT}.", file=sys.stderr)
print("Make sure your Unreal Engine with MCP plugin is running.", file=sys.stderr)
raise Exception("Failed to connect to Unreal MCP server: Connection refused")
except socket.timeout:
print("Error: Connection timed out while communicating with Unreal MCP server.", file=sys.stderr)
raise Exception("Failed to communicate with Unreal MCP server: Connection timed out")
except Exception as e:
print(f"Error communicating with Unreal MCP server: {str(e)}", file=sys.stderr)
raise Exception(f"Failed to communicate with Unreal MCP server: {str(e)}")
__all__ = ['send_command']
```
--------------------------------------------------------------------------------
/MCP/TestScripts/simple_test_command.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Simple Test Command for MCP Server
This script sends a very simple Python command to the MCP Server.
"""
import socket
import json
import sys
def main():
"""Send a simple Python command to the MCP Server."""
try:
# Create socket
print("Connecting to MCP Server on localhost:13377...")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5) # 5 second timeout
# Connect to server
s.connect(("localhost", 13377))
print("Connected successfully")
# Very simple Python code
code = 'import unreal\nreturn "Hello from Python!"'
# Try different command formats
formats = [
{
"name": "Format 1",
"command": {
"type": "execute_python",
"code": code
}
},
{
"name": "Format 2",
"command": {
"command": "execute_python",
"code": code
}
},
{
"name": "Format 3",
"command": {
"type": "execute_python",
"data": {
"code": code
}
}
},
{
"name": "Format 4",
"command": {
"command": "execute_python",
"type": "execute_python",
"code": code
}
},
{
"name": "Format 5",
"command": {
"command": "execute_python",
"type": "execute_python",
"data": {
"code": code
}
}
}
]
# Test each format
for format_info in formats:
format_name = format_info["name"]
command = format_info["command"]
print(f"\n=== Testing {format_name} ===")
print(f"Command: {json.dumps(command)}")
# Create a new socket for each test
test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
test_socket.settimeout(5)
test_socket.connect(("localhost", 13377))
# Send command with newline
command_str = json.dumps(command) + "\n"
test_socket.sendall(command_str.encode('utf-8'))
# Receive response
response = b""
while True:
data = test_socket.recv(4096)
if not data:
break
response += data
if b"\n" in data:
break
# Close socket
test_socket.close()
# Process response
if response:
response_str = response.decode('utf-8').strip()
print(f"Response: {response_str}")
try:
response_json = json.loads(response_str)
status = response_json.get('status', 'unknown')
if status == 'success':
print(f"✓ {format_name} SUCCEEDED")
return True
else:
print(f"✗ {format_name} FAILED: {response_json.get('message', 'Unknown error')}")
except json.JSONDecodeError as e:
print(f"✗ Error parsing JSON response: {e}")
else:
print("✗ No response received")
print("\nAll formats failed.")
return False
except Exception as e:
print(f"Error: {e}")
return False
if __name__ == "__main__":
print("=== Simple Test Command for MCP Server ===")
success = main()
sys.exit(0 if success else 1)
```
--------------------------------------------------------------------------------
/MCP/TestScripts/format_test.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
# format_test.py - Tests different command formats for MCP Server
import socket
import json
import sys
import time
# Configuration
HOST = '127.0.0.1'
PORT = 13377
TIMEOUT = 5 # seconds
# Simple Python code to execute
PYTHON_CODE = """
import unreal
return "Hello from Python!"
"""
def send_command(command_dict):
"""Send a command to the MCP Server and return the response."""
try:
# Create a socket connection
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(TIMEOUT)
s.connect((HOST, PORT))
# Convert command to JSON and send
command_json = json.dumps(command_dict)
print(f"Sending command format: {command_json}")
s.sendall(command_json.encode('utf-8'))
# Receive response
response = b""
while True:
chunk = s.recv(4096)
if not chunk:
break
response += chunk
# Parse and return response
if response:
try:
return json.loads(response.decode('utf-8'))
except json.JSONDecodeError:
return {"status": "error", "message": "Invalid JSON response", "raw": response.decode('utf-8')}
else:
return {"status": "error", "message": "Empty response"}
except socket.timeout:
return {"status": "error", "message": "Connection timed out"}
except ConnectionRefusedError:
return {"status": "error", "message": "Connection refused. Is the MCP Server running?"}
except Exception as e:
return {"status": "error", "message": f"Error: {str(e)}"}
def test_format(format_name, command_dict):
"""Test a specific command format and print the result."""
print(f"\n=== Testing Format: {format_name} ===")
response = send_command(command_dict)
print(f"Response: {json.dumps(response, indent=2)}")
if response.get("status") == "success":
print(f"✅ SUCCESS: Format '{format_name}' works!")
return True
else:
print(f"❌ FAILED: Format '{format_name}' does not work.")
return False
def main():
print("=== MCP Server Command Format Test ===")
print(f"Connecting to {HOST}:{PORT}")
# Test different command formats
formats = [
("Format 1: Basic", {
"command": "execute_python",
"code": PYTHON_CODE
}),
("Format 2: Type field", {
"type": "execute_python",
"code": PYTHON_CODE
}),
("Format 3: Command in data", {
"command": "execute_python",
"data": {
"code": PYTHON_CODE
}
}),
("Format 4: Type in data", {
"type": "execute_python",
"data": {
"code": PYTHON_CODE
}
}),
("Format 5: Command and params", {
"command": "execute_python",
"params": {
"code": PYTHON_CODE
}
}),
("Format 6: Type and params", {
"type": "execute_python",
"params": {
"code": PYTHON_CODE
}
}),
("Format 7: Command and type", {
"command": "execute_python",
"type": "python",
"code": PYTHON_CODE
}),
("Format 8: Command, type, and data", {
"command": "execute_python",
"type": "python",
"data": {
"code": PYTHON_CODE
}
})
]
success_count = 0
for format_name, command_dict in formats:
if test_format(format_name, command_dict):
success_count += 1
time.sleep(1) # Brief pause between tests
print(f"\n=== Test Summary ===")
print(f"Tested {len(formats)} command formats")
print(f"Successful formats: {success_count}")
print(f"Failed formats: {len(formats) - success_count}")
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/Source/UnrealMCP/Private/MCPExtensionExample.cpp:
--------------------------------------------------------------------------------
```cpp
#include "MCPExtensionHandler.h"
#include "MCPFileLogger.h"
/**
* Example showing how to use the MCP extension system
*
* This code demonstrates how external modules can extend the MCP server
* with custom commands without modifying the MCP plugin code.
*/
class FMCPExtensionExample
{
public:
static void RegisterCustomCommands(FMCPTCPServer* Server)
{
if (!Server || !Server->IsRunning())
{
return;
}
// Register a custom "hello_world" command
FMCPExtensionSystem::RegisterCommand(
Server,
"hello_world",
FMCPCommandExecuteDelegate::CreateStatic(&FMCPExtensionExample::HandleHelloWorldCommand)
);
// Register a custom "echo" command
FMCPExtensionSystem::RegisterCommand(
Server,
"echo",
FMCPCommandExecuteDelegate::CreateStatic(&FMCPExtensionExample::HandleEchoCommand)
);
}
static void UnregisterCustomCommands(FMCPTCPServer* Server)
{
if (!Server)
{
return;
}
// Unregister the custom commands
FMCPExtensionSystem::UnregisterCommand(Server, "hello_world");
FMCPExtensionSystem::UnregisterCommand(Server, "echo");
}
private:
/**
* Handle the "hello_world" command
* @param Params - The command parameters
* @param ClientSocket - The client socket
* @return JSON response
*/
static TSharedPtr<FJsonObject> HandleHelloWorldCommand(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket)
{
// Log that we received this command
UE_LOG(LogMCP, Display, TEXT("Received hello_world command"));
// Get the name parameter if provided
FString Name = "World";
Params->TryGetStringField(FStringView(TEXT("name")), Name);
// Create the response
TSharedPtr<FJsonObject> Result = MakeShared<FJsonObject>();
Result->SetStringField("message", FString::Printf(TEXT("Hello, %s!"), *Name));
// Create the success response with the result
TSharedPtr<FJsonObject> Response = MakeShared<FJsonObject>();
Response->SetStringField("status", "success");
Response->SetObjectField("result", Result);
return Response;
}
/**
* Handle the "echo" command
* @param Params - The command parameters
* @param ClientSocket - The client socket
* @return JSON response
*/
static TSharedPtr<FJsonObject> HandleEchoCommand(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket)
{
// Log that we received this command
UE_LOG(LogMCP, Display, TEXT("Received echo command"));
// Create the response
TSharedPtr<FJsonObject> Response = MakeShared<FJsonObject>();
Response->SetStringField("status", "success");
// Echo back all parameters as the result
Response->SetObjectField("result", Params);
return Response;
}
};
// The following code shows how you might register these handlers in your own module
// Uncomment this code and modify as needed for your project
/*
void YourGameModule::StartupModule()
{
// ... your existing code ...
// Get a reference to the MCP server
FUnrealMCPModule& MCPModule = FModuleManager::LoadModuleChecked<FUnrealMCPModule>("UnrealMCP");
FMCPTCPServer* MCPServer = MCPModule.GetServer();
if (MCPServer && MCPServer->IsRunning())
{
// Register custom commands
FMCPExtensionExample::RegisterCustomCommands(MCPServer);
}
else
{
// The MCP server isn't running yet
// You might want to set up a delegate to register when it starts
// or expose a function in the MCP module that lets you register commands
// that will be applied when the server starts
}
}
void YourGameModule::ShutdownModule()
{
// Get a reference to the MCP server
FUnrealMCPModule& MCPModule = FModuleManager::GetModulePtr<FUnrealMCPModule>("UnrealMCP");
if (MCPModule)
{
FMCPTCPServer* MCPServer = MCPModule.GetServer();
if (MCPServer)
{
// Unregister custom commands
FMCPExtensionExample::UnregisterCustomCommands(MCPServer);
}
}
// ... your existing code ...
}
*/
```
--------------------------------------------------------------------------------
/MCP/Commands/commands_materials.py:
--------------------------------------------------------------------------------
```python
"""Material-related commands for Unreal Engine.
This module contains all material-related commands for the UnrealMCP bridge,
including creation, modification, and querying of materials.
"""
import sys
import os
import importlib.util
import importlib
from mcp.server.fastmcp import Context
# Import send_command from the parent module
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from unreal_mcp_bridge import send_command
def register_all(mcp):
"""Register all material-related commands with the MCP server."""
# Create material command
@mcp.tool()
def create_material(ctx: Context, package_path: str, name: str, properties: dict = None) -> str:
"""Create a new material in the Unreal project.
Args:
package_path: The path where the material should be created (e.g., '/Game/Materials')
name: The name of the material
properties: Optional dictionary of material properties to set. Can include:
- shading_model: str (e.g., "DefaultLit", "Unlit", "Subsurface", etc.)
- blend_mode: str (e.g., "Opaque", "Masked", "Translucent", etc.)
- two_sided: bool
- dithered_lod_transition: bool
- cast_contact_shadow: bool
- base_color: list[float] (RGBA values 0-1)
- metallic: float (0-1)
- roughness: float (0-1)
"""
try:
params = {
"package_path": package_path,
"name": name
}
if properties:
params["properties"] = properties
response = send_command("create_material", params)
if response["status"] == "success":
return f"Created material: {response['result']['name']} at path: {response['result']['path']}"
else:
return f"Error: {response['message']}"
except Exception as e:
return f"Error creating material: {str(e)}"
# Modify material command
@mcp.tool()
def modify_material(ctx: Context, path: str, properties: dict) -> str:
"""Modify an existing material's properties.
Args:
path: The full path to the material (e.g., '/Game/Materials/MyMaterial')
properties: Dictionary of material properties to set. Can include:
- shading_model: str (e.g., "DefaultLit", "Unlit", "Subsurface", etc.)
- blend_mode: str (e.g., "Opaque", "Masked", "Translucent", etc.)
- two_sided: bool
- dithered_lod_transition: bool
- cast_contact_shadow: bool
- base_color: list[float] (RGBA values 0-1)
- metallic: float (0-1)
- roughness: float (0-1)
"""
try:
params = {
"path": path,
"properties": properties
}
response = send_command("modify_material", params)
if response["status"] == "success":
return f"Modified material: {response['result']['name']} at path: {response['result']['path']}"
else:
return f"Error: {response['message']}"
except Exception as e:
return f"Error modifying material: {str(e)}"
# Get material info command
@mcp.tool()
def get_material_info(ctx: Context, path: str) -> dict:
"""Get information about a material.
Args:
path: The full path to the material (e.g., '/Game/Materials/MyMaterial')
Returns:
Dictionary containing material information including:
- name: str
- path: str
- shading_model: str
- blend_mode: str
- two_sided: bool
- dithered_lod_transition: bool
- cast_contact_shadow: bool
- base_color: list[float]
- metallic: float
- roughness: float
"""
try:
params = {"path": path}
response = send_command("get_material_info", params)
if response["status"] == "success":
return response["result"]
else:
return {"error": response["message"]}
except Exception as e:
return {"error": str(e)}
```
--------------------------------------------------------------------------------
/MCP/temp_update_config.py:
--------------------------------------------------------------------------------
```python
"""
Script to configure Claude Desktop MCP integration.
This script will update or create the necessary configuration for Claude Desktop
to use the Unreal MCP bridge.
"""
import json
import os
import sys
import shutil
from pathlib import Path
def check_claude_installed():
"""Check if Claude Desktop is installed by looking for common installation paths."""
claude_paths = []
if os.name == 'nt': # Windows
claude_paths = [
os.path.join(os.environ.get('LOCALAPPDATA', ''), 'Programs', 'Claude Desktop', 'Claude Desktop.exe'),
os.path.join(os.environ.get('PROGRAMFILES', ''), 'Claude Desktop', 'Claude Desktop.exe'),
os.path.join(os.environ.get('PROGRAMFILES(X86)', ''), 'Claude Desktop', 'Claude Desktop.exe')
]
elif os.name == 'darwin': # macOS
claude_paths = [
'/Applications/Claude Desktop.app',
os.path.expanduser('~/Applications/Claude Desktop.app')
]
# Check if any of the paths exist
for path in claude_paths:
if os.path.exists(path):
return True
# Check if config directory exists as a fallback
claude_config_dir = os.environ.get('APPDATA', '')
if os.name == 'nt': # Windows
claude_config_dir = os.path.join(claude_config_dir, 'Claude')
elif os.name == 'darwin': # macOS
claude_config_dir = os.path.expanduser('~/Library/Application Support/Claude')
return os.path.exists(claude_config_dir)
def update_claude_config(config_file, run_script):
"""Update the Claude Desktop configuration file."""
# Check if Claude is installed
if not check_claude_installed():
print(f"Claude Desktop doesn't appear to be installed on this system.")
print("You can download Claude Desktop from: https://claude.ai/download")
print("After installing Claude Desktop, run this script again.")
return False
# Make sure the config directory exists
config_dir = os.path.dirname(config_file)
if not os.path.exists(config_dir):
try:
print(f"Creating Claude configuration directory: {config_dir}")
os.makedirs(config_dir, exist_ok=True)
except Exception as e:
print(f"Error creating Claude configuration directory: {str(e)}")
return False
# Load existing config or create new one
config = {}
try:
if os.path.exists(config_file):
with open(config_file, 'r') as f:
config = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
print(f"Creating new Claude Desktop configuration file.")
# Create backup of the original file if it exists
if os.path.exists(config_file):
backup_path = config_file + '.bak'
try:
shutil.copy2(config_file, backup_path)
print(f"Created backup of original configuration at: {backup_path}")
except Exception as e:
print(f"Warning: Couldn't create backup: {str(e)}")
# Update the config
config.setdefault('mcpServers', {})['unreal'] = {'command': run_script, 'args': []}
# Save the updated configuration
try:
with open(config_file, 'w') as f:
json.dump(config, f, indent=4)
print(f"Successfully updated Claude Desktop configuration at: {config_file}")
return True
except Exception as e:
print(f"Error saving Claude Desktop configuration: {str(e)}")
return False
def main():
if len(sys.argv) < 3:
print("Usage: python temp_update_config.py <config_file> <run_script>")
return 1
config_file = sys.argv[1]
run_script = sys.argv[2]
# Get absolute paths
config_file = os.path.abspath(config_file)
run_script = os.path.abspath(run_script)
if not os.path.exists(run_script):
print(f"Error: Run script not found at: {run_script}")
return 1
# Update the configuration
if update_claude_config(config_file, run_script):
print("\nClaude Desktop has been configured to use Unreal MCP!")
print("\nTo use with Claude Desktop:")
print("1. Make sure Unreal Engine with MCP plugin is running")
print("2. Start Claude Desktop and it should automatically use the Unreal MCP tools")
return 0
else:
print("\nFailed to configure Claude Desktop for Unreal MCP.")
return 1
if __name__ == "__main__":
sys.exit(main())
```
--------------------------------------------------------------------------------
/Source/UnrealMCP/Private/MCPFileLogger.h:
--------------------------------------------------------------------------------
```
#pragma once
#include "CoreMinimal.h"
#include "Misc/FileHelper.h"
#include "HAL/PlatformFilemanager.h"
#include "UnrealMCP.h"
// Shorthand for logger
#define MCP_LOG(Verbosity, Format, ...) FMCPFileLogger::Get().Log(ELogVerbosity::Verbosity, FString::Printf(TEXT(Format), ##__VA_ARGS__))
#define MCP_LOG_INFO(Format, ...) FMCPFileLogger::Get().Info(FString::Printf(TEXT(Format), ##__VA_ARGS__))
#define MCP_LOG_ERROR(Format, ...) FMCPFileLogger::Get().Error(FString::Printf(TEXT(Format), ##__VA_ARGS__))
#define MCP_LOG_WARNING(Format, ...) FMCPFileLogger::Get().Warning(FString::Printf(TEXT(Format), ##__VA_ARGS__))
#define MCP_LOG_VERBOSE(Format, ...) FMCPFileLogger::Get().Verbose(FString::Printf(TEXT(Format), ##__VA_ARGS__))
/**
* Simple file logger for MCP operations
* Writes logs to a file in the plugin directory
*/
class FMCPFileLogger
{
public:
static FMCPFileLogger& Get()
{
static FMCPFileLogger Instance;
return Instance;
}
void Initialize(const FString& InLogFilePath)
{
LogFilePath = InLogFilePath;
// Create or clear the log file
FString LogDirectory = FPaths::GetPath(LogFilePath);
IPlatformFile& PlatformFile = FPlatformFileManager::Get().GetPlatformFile();
if (!PlatformFile.DirectoryExists(*LogDirectory))
{
PlatformFile.CreateDirectoryTree(*LogDirectory);
}
// Clear the file and write a header
FString Header = FString::Printf(TEXT("MCP Server Log - Started at %s\n"), *FDateTime::Now().ToString());
FFileHelper::SaveStringToFile(Header, *LogFilePath);
bInitialized = true;
UE_LOG(LogMCP, Log, TEXT("MCP File Logger initialized at %s"), *LogFilePath);
}
// Log with verbosity level
void Log(ELogVerbosity::Type Verbosity, const FString& Message)
{
if (!bInitialized) return;
// Log to Unreal's logging system - need to handle each verbosity level separately
switch (Verbosity)
{
case ELogVerbosity::Fatal:
UE_LOG(LogMCP, Fatal, TEXT("%s"), *Message);
break;
case ELogVerbosity::Error:
UE_LOG(LogMCP, Error, TEXT("%s"), *Message);
break;
case ELogVerbosity::Warning:
UE_LOG(LogMCP, Warning, TEXT("%s"), *Message);
break;
case ELogVerbosity::Display:
UE_LOG(LogMCP, Display, TEXT("%s"), *Message);
break;
case ELogVerbosity::Log:
UE_LOG(LogMCP, Log, TEXT("%s"), *Message);
break;
case ELogVerbosity::Verbose:
UE_LOG(LogMCP, Verbose, TEXT("%s"), *Message);
break;
case ELogVerbosity::VeryVerbose:
UE_LOG(LogMCP, VeryVerbose, TEXT("%s"), *Message);
break;
default:
UE_LOG(LogMCP, Log, TEXT("%s"), *Message);
break;
}
// Also log to file
FString TimeStamp = FDateTime::Now().ToString();
FString VerbosityStr;
switch (Verbosity)
{
case ELogVerbosity::Fatal: VerbosityStr = TEXT("Fatal"); break;
case ELogVerbosity::Error: VerbosityStr = TEXT("Error"); break;
case ELogVerbosity::Warning: VerbosityStr = TEXT("Warning"); break;
case ELogVerbosity::Display: VerbosityStr = TEXT("Display"); break;
case ELogVerbosity::Log: VerbosityStr = TEXT("Log"); break;
case ELogVerbosity::Verbose: VerbosityStr = TEXT("Verbose"); break;
case ELogVerbosity::VeryVerbose: VerbosityStr = TEXT("VeryVerbose"); break;
default: VerbosityStr = TEXT("Unknown"); break;
}
FString LogEntry = FString::Printf(TEXT("[%s][%s] %s\n"), *TimeStamp, *VerbosityStr, *Message);
FFileHelper::SaveStringToFile(LogEntry, *LogFilePath, FFileHelper::EEncodingOptions::AutoDetect, &IFileManager::Get(), EFileWrite::FILEWRITE_Append);
}
// Convenience methods for different verbosity levels
void Error(const FString& Message) { Log(ELogVerbosity::Error, Message); }
void Warning(const FString& Message) { Log(ELogVerbosity::Warning, Message); }
void Info(const FString& Message) { Log(ELogVerbosity::Log, Message); }
void Verbose(const FString& Message) { Log(ELogVerbosity::Verbose, Message); }
// For backward compatibility
void Log(const FString& Message) { Info(Message); }
private:
FMCPFileLogger() : bInitialized(false) {}
~FMCPFileLogger() {}
// Make non-copyable
FMCPFileLogger(const FMCPFileLogger&) = delete;
FMCPFileLogger& operator=(const FMCPFileLogger&) = delete;
bool bInitialized;
FString LogFilePath;
};
```
--------------------------------------------------------------------------------
/MCP/cursor_setup.py:
--------------------------------------------------------------------------------
```python
"""
Script to configure Cursor MCP integration.
This script will update or create the necessary configuration for Cursor
to use the Unreal MCP bridge.
"""
import json
import os
import sys
import shutil
import argparse
from pathlib import Path
def get_cursor_config_dir():
"""Get the Cursor configuration directory based on OS."""
if os.name == 'nt': # Windows
appdata = os.environ.get('APPDATA', '')
return os.path.join(appdata, 'Cursor', 'User')
elif os.name == 'darwin': # macOS
return os.path.expanduser('~/Library/Application Support/Cursor/User')
else: # Linux
return os.path.expanduser('~/.config/Cursor/User')
def check_cursor_installed():
"""Check if Cursor is installed by looking for common installation paths."""
cursor_paths = []
if os.name == 'nt': # Windows
cursor_paths = [
os.path.join(os.environ.get('LOCALAPPDATA', ''), 'Programs', 'Cursor', 'Cursor.exe'),
os.path.join(os.environ.get('PROGRAMFILES', ''), 'Cursor', 'Cursor.exe'),
os.path.join(os.environ.get('PROGRAMFILES(X86)', ''), 'Cursor', 'Cursor.exe')
]
elif os.name == 'darwin': # macOS
cursor_paths = [
'/Applications/Cursor.app',
os.path.expanduser('~/Applications/Cursor.app')
]
else: # Linux
cursor_paths = [
'/usr/bin/cursor',
'/usr/local/bin/cursor',
os.path.expanduser('~/.local/bin/cursor')
]
# Check if any of the paths exist
for path in cursor_paths:
if os.path.exists(path):
return True
# Check if config directory exists as a fallback
return os.path.exists(get_cursor_config_dir())
def configure_cursor_mcp(run_script_path):
"""Configure Cursor to use the Unreal MCP bridge."""
# First check if Cursor is installed
if not check_cursor_installed():
print(f"Cursor doesn't appear to be installed on this system.")
print("You can download Cursor from: https://cursor.sh/")
print("After installing Cursor, run this script again.")
return False
cursor_config_dir = get_cursor_config_dir()
if not os.path.exists(cursor_config_dir):
try:
print(f"Creating Cursor configuration directory: {cursor_config_dir}")
os.makedirs(cursor_config_dir, exist_ok=True)
except Exception as e:
print(f"Error creating Cursor configuration directory: {str(e)}")
return False
# Create settings.json path
settings_path = os.path.join(cursor_config_dir, 'settings.json')
# Load existing settings or create new ones
settings = {}
if os.path.exists(settings_path):
try:
with open(settings_path, 'r') as f:
settings = json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
print(f"Could not read existing settings file, creating new one.")
# Ensure the settings structure exists
settings.setdefault('mcp', {})
# Add UnrealMCP to the MCP servers list
settings['mcp'].setdefault('servers', {})
# Configure the Unreal MCP server
settings['mcp']['servers']['unreal'] = {
'command': run_script_path,
'args': []
}
# Enable MCP in Cursor
settings['mcp']['enabled'] = True
# Save the updated settings file
try:
# Create backup of the original file if it exists
if os.path.exists(settings_path):
backup_path = settings_path + '.bak'
shutil.copy2(settings_path, backup_path)
print(f"Created backup of original settings at: {backup_path}")
# Write the new settings
with open(settings_path, 'w') as f:
json.dump(settings, f, indent=4)
print(f"Successfully updated Cursor settings at: {settings_path}")
print("Please restart Cursor for the changes to take effect.")
return True
except Exception as e:
print(f"Error saving Cursor settings: {str(e)}")
return False
def main():
parser = argparse.ArgumentParser(description='Configure Cursor for Unreal MCP')
parser.add_argument('--script', help='Path to the run_unreal_mcp.bat script',
default=os.path.abspath(os.path.join(os.path.dirname(__file__), 'run_unreal_mcp.bat')))
args = parser.parse_args()
# Get absolute path to the run script
run_script_path = os.path.abspath(args.script)
if not os.path.exists(run_script_path):
print(f"Error: Run script not found at: {run_script_path}")
return 1
# Configure Cursor
if configure_cursor_mcp(run_script_path):
print("\nCursor has been configured to use Unreal MCP!")
print("\nTo use with Cursor:")
print("1. Make sure Unreal Engine with MCP plugin is running")
print("2. Start Cursor and it should automatically use the Unreal MCP tools")
return 0
else:
print("\nFailed to configure Cursor for Unreal MCP.")
return 1
if __name__ == "__main__":
sys.exit(main())
```
--------------------------------------------------------------------------------
/Source/UnrealMCP/Public/MCPCommandHandlers.h:
--------------------------------------------------------------------------------
```
#pragma once
#include "CoreMinimal.h"
#include "MCPTCPServer.h"
#include "Engine/World.h"
#include "Engine/StaticMeshActor.h"
#include "Components/StaticMeshComponent.h"
/**
* Base class for MCP command handlers
*/
class FMCPCommandHandlerBase : public IMCPCommandHandler
{
public:
/**
* Constructor
* @param InCommandName - The command name this handler responds to
*/
explicit FMCPCommandHandlerBase(const FString& InCommandName)
: CommandName(InCommandName)
{
}
/**
* Get the command name this handler responds to
* @return The command name
*/
virtual FString GetCommandName() const override
{
return CommandName;
}
protected:
/**
* Create an error response
* @param Message - The error message
* @return JSON response object
*/
TSharedPtr<FJsonObject> CreateErrorResponse(const FString& Message)
{
TSharedPtr<FJsonObject> Response = MakeShared<FJsonObject>();
Response->SetStringField("status", "error");
Response->SetStringField("message", Message);
return Response;
}
/**
* Create a success response
* @param Result - Optional result object
* @return JSON response object
*/
TSharedPtr<FJsonObject> CreateSuccessResponse(TSharedPtr<FJsonObject> Result = nullptr)
{
TSharedPtr<FJsonObject> Response = MakeShared<FJsonObject>();
Response->SetStringField("status", "success");
if (Result.IsValid())
{
Response->SetObjectField("result", Result);
}
return Response;
}
/** The command name this handler responds to */
FString CommandName;
};
/**
* Handler for the get_scene_info command
*/
class FMCPGetSceneInfoHandler : public FMCPCommandHandlerBase
{
public:
FMCPGetSceneInfoHandler()
: FMCPCommandHandlerBase("get_scene_info")
{
}
/**
* Execute the get_scene_info command
* @param Params - The command parameters
* @param ClientSocket - The client socket
* @return JSON response object
*/
virtual TSharedPtr<FJsonObject> Execute(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket) override;
};
/**
* Handler for the create_object command
*/
class FMCPCreateObjectHandler : public FMCPCommandHandlerBase
{
public:
FMCPCreateObjectHandler()
: FMCPCommandHandlerBase("create_object")
{
}
/**
* Execute the create_object command
* @param Params - The command parameters
* @param ClientSocket - The client socket
* @return JSON response object
*/
virtual TSharedPtr<FJsonObject> Execute(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket) override;
protected:
/**
* Create a static mesh actor
* @param World - The world to create the actor in
* @param Location - The location to create the actor at
* @param MeshPath - Optional path to the mesh to use
* @param Label - Optional custom label for the actor in the outliner
* @return The created actor and a success flag
*/
TPair<AStaticMeshActor*, bool> CreateStaticMeshActor(UWorld* World, const FVector& Location, const FString& MeshPath = "", const FString& Label = "");
/**
* Create a cube actor
* @param World - The world to create the actor in
* @param Location - The location to create the actor at
* @param Label - Optional custom label for the actor in the outliner
* @return The created actor and a success flag
*/
TPair<AStaticMeshActor*, bool> CreateCubeActor(UWorld* World, const FVector& Location, const FString& Label = "");
};
/**
* Handler for the modify_object command
*/
class FMCPModifyObjectHandler : public FMCPCommandHandlerBase
{
public:
FMCPModifyObjectHandler()
: FMCPCommandHandlerBase("modify_object")
{
}
/**
* Execute the modify_object command
* @param Params - The command parameters
* @param ClientSocket - The client socket
* @return JSON response object
*/
virtual TSharedPtr<FJsonObject> Execute(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket) override;
};
/**
* Handler for the delete_object command
*/
class FMCPDeleteObjectHandler : public FMCPCommandHandlerBase
{
public:
FMCPDeleteObjectHandler()
: FMCPCommandHandlerBase("delete_object")
{
}
/**
* Execute the delete_object command
* @param Params - The command parameters
* @param ClientSocket - The client socket
* @return JSON response object
*/
virtual TSharedPtr<FJsonObject> Execute(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket) override;
};
/**
* Handler for the execute_python command
*/
class FMCPExecutePythonHandler : public FMCPCommandHandlerBase
{
public:
FMCPExecutePythonHandler()
: FMCPCommandHandlerBase("execute_python")
{
}
/**
* Execute the execute_python command
* @param Params - The command parameters
* @param ClientSocket - The client socket
* @return JSON response object
*/
virtual TSharedPtr<FJsonObject> Execute(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket) override;
};
```
--------------------------------------------------------------------------------
/MCP/TestScripts/3_string_test.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
String Handling Test for MCP Server
This script tests string handling in the MCP Server.
It connects to the server, sends Python code with various string formats,
and verifies they are handled correctly.
"""
import socket
import json
import sys
def main():
"""Connect to the MCP Server and test string handling."""
try:
# Create socket
print("Connecting to MCP Server on localhost:13377...")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(10) # 10 second timeout
# Connect to server
s.connect(("localhost", 13377))
print("✓ Connected successfully")
# Python code with various string formats
code = """
import unreal
import json
# Test 1: Basic string types
test_string1 = "This is a simple string with double quotes"
test_string2 = 'This is a simple string with single quotes'
test_string3 = \"\"\"This is a
multiline string\"\"\"
test_string4 = f"This is an f-string with {'interpolation'}"
test_string5 = "This string has escape sequences: \\n \\t \\\\ \\' \\""
test_string6 = r"This is a raw string with no escape processing: \n \t \\"
# Test 2: Print statements
print("Print statement 1: Simple string")
print(f"Print statement 2: F-string with {test_string2}")
print("Print statement 3: Multiple", "arguments", 123, test_string3)
print("Print statement 4: With escape sequences: \\n \\t")
# Test 3: Potentially problematic strings
test_string7 = "String with quotes: 'single' and \\"double\\""
test_string8 = 'String with quotes: "double" and \\'single\\''
test_string9 = "String with backslashes: \\ \\\\ \\n"
test_string10 = "String with special characters: 🐍 😊 🚀"
# Test 4: Unterminated strings (these are properly terminated but might be misinterpreted)
test_string11 = "String with a quote at the end: '"
test_string12 = 'String with a quote at the end: "'
test_string13 = "String with a backslash at the end: \\"
test_string14 = "String with multiple backslashes at the end: \\\\"
# Test 5: String concatenation
test_string15 = "Part 1 " + "Part 2"
test_string16 = "Multiple " + "parts " + "concatenated"
test_string17 = "Mixed " + 'quote' + " types"
# Collect results in a dictionary
results = {
"test1": test_string1,
"test2": test_string2,
"test3": test_string3,
"test4": test_string4,
"test5": test_string5,
"test6": test_string6,
"test7": test_string7,
"test8": test_string8,
"test9": test_string9,
"test10": test_string10,
"test11": test_string11,
"test12": test_string12,
"test13": test_string13,
"test14": test_string14,
"test15": test_string15,
"test16": test_string16,
"test17": test_string17
}
# Log the results
unreal.log("===== STRING TEST RESULTS =====")
for key, value in results.items():
unreal.log(f"{key}: {value}")
# Return the results as JSON
return json.dumps(results, indent=2)
"""
# Create command with multiple formats to try to make it work
command = {
"command": "execute_python",
"type": "execute_python",
"code": code,
"data": {
"code": code
}
}
# Send command
print("Sending execute_python command...")
command_str = json.dumps(command) + "\n" # Add newline
s.sendall(command_str.encode('utf-8'))
# Receive response
print("Waiting for response...")
response = b""
while True:
data = s.recv(4096)
if not data:
break
response += data
if b"\n" in data: # Check for newline which indicates end of response
break
# Close connection
s.close()
print("✓ Connection closed properly")
# Process response
if response:
response_str = response.decode('utf-8').strip()
try:
response_json = json.loads(response_str)
print("\n=== RESPONSE ===")
print(f"Status: {response_json.get('status', 'unknown')}")
if response_json.get('status') == 'success':
print("✓ String test successful")
result = response_json.get('result', {})
if isinstance(result, dict) and 'output' in result:
output = result['output']
print(f"Output length: {len(output)} characters")
print("First 200 characters of output:")
print(output[:200] + "..." if len(output) > 200 else output)
return True
else:
print("✗ String test failed")
print(f"Error: {response_json.get('message', 'Unknown error')}")
return False
except json.JSONDecodeError as e:
print(f"✗ Error parsing JSON response: {e}")
print(f"Raw response: {response_str}")
return False
else:
print("✗ No response received from server")
return False
except ConnectionRefusedError:
print("✗ Connection refused. Is the MCP Server running?")
return False
except socket.timeout:
print("✗ Connection timed out. Is the MCP Server running?")
return False
except Exception as e:
print(f"✗ Error: {e}")
return False
if __name__ == "__main__":
print("=== MCP Server String Handling Test ===")
success = main()
print("\n=== TEST RESULT ===")
if success:
print("✓ String handling test PASSED")
sys.exit(0)
else:
print("✗ String handling test FAILED")
sys.exit(1)
```
--------------------------------------------------------------------------------
/MCP/README_MCP_SETUP.md:
--------------------------------------------------------------------------------
```markdown
Here's the README in Markdown format (already used in the previous response, but I'll ensure it's clean and ready for copy-pasting). You can directly copy this into a `README.md` file for your GitHub repository:
```markdown
# Unreal Engine MCP Interface
This project provides a Model Context Protocol (MCP) interface for Unreal Engine, enabling seamless integration with Claude Desktop and Cursor. With this interface, users can interact with Unreal Engine using natural language commands through their preferred AI assistant, simplifying scene management and object manipulation.
## Table of Contents
- [Prerequisites](#prerequisites)
- [Quick Setup](#quick-setup)
- [Manual Configuration](#manual-configuration)
- [Troubleshooting](#troubleshooting)
- [Usage](#usage)
- [Available Commands](#available-commands)
- [Testing the MCP Server Directly](#testing-the-mcp-server-directly)
## Prerequisites
To set up the MCP interface, ensure you have the following:
- **Python 3.7 or newer** installed on your system
- **Claude Desktop** application or **Cursor** application
- **Unreal Engine** with the UnrealMCP plugin enabled
## Quick Setup
The setup process is streamlined with a single script that handles all installation scenarios:
1. Navigate to the `Plugins\UnrealMCP\MCP\` directory.
2. Run the following script:
```
Plugins\UnrealMCP\MCP\setup_unreal_mcp.bat
```
This script will:
- Detect available Python environments (System Python, Miniconda/Anaconda, Claude Desktop environment, Cursor environment)
- Prompt you to choose a Python environment
- Install the required `mcp` package in the selected environment
- Generate a `run_unreal_mcp.bat` script tailored to the chosen Python environment
- Prompt you to configure Claude Desktop, Cursor, both, or skip configuration
- Create or update the configuration files for the selected AI assistants
### Command Line Options
The setup script supports the following command line options:
```
setup_unreal_mcp.bat [OPTIONS]
Options:
--help Show help message
--configure-claude Configure Claude Desktop (default)
--configure-cursor Configure Cursor
--configure-both Configure both Claude and Cursor
--skip-config Skip configuration
```
### Python Environment Options
The setup script supports multiple Python environment options:
1. **System Python**: Uses the Python installation in your system PATH.
2. **Miniconda/Anaconda**: Uses a Python environment from Miniconda/Anaconda (recommended for users integrating with Blender via Claude Desktop).
3. **Claude Desktop Environment**: Uses the Python environment bundled with Claude Desktop (if available).
4. **Cursor Environment**: Uses the Python environment bundled with Cursor (if available).
## Manual Configuration
For manual setup, follow these steps:
### 1. Install Required Python Package
Install the `mcp` package with the following command:
```bash
python -m pip install mcp>=0.1.0
```
### 2. Create a Run Script
Create a batch file named `run_unreal_mcp.bat` with this content:
```batch
@echo off
setlocal
cd /d "%~dp0"
python "%~dp0unreal_mcp_server.py"
```
Save it in the `Plugins\UnrealMCP\MCP\` directory.
### 3. Configure Claude Desktop
Locate or create the Claude Desktop configuration file at:
```
%APPDATA%\Claude\claude_desktop_config.json
```
Add or update it with the following content, replacing the path with the actual location of your `run_unreal_mcp.bat`:
```json
{
"mcpServers": {
"unreal": {
"command": "C:\\Path\\To\\Your\\Plugins\\UnrealMCP\\MCP\\run_unreal_mcp.bat",
"args": []
}
}
}
```
### 4. Configure Cursor
Locate or create the Cursor settings file at:
```
%APPDATA%\Cursor\User\settings.json
```
Add or update it with the following MCP configuration, replacing the path with the actual location of your `run_unreal_mcp.bat`:
```json
{
"mcp": {
"enabled": true,
"servers": {
"unreal": {
"command": "C:\\Path\\To\\Your\\Plugins\\UnrealMCP\\MCP\\run_unreal_mcp.bat",
"args": []
}
}
}
}
```
## Troubleshooting
### Common Issues
1. **"No module named 'mcp'"**
- **Cause**: The `mcp` package isn't installed in the Python environment used by Claude Desktop or Cursor.
- **Solution**: Rerun the `setup_unreal_mcp.bat` script and select the correct Python environment.
2. **Connection refused errors**
- **Cause**: The MCP server isn't running or isn't listening on port 13377.
- **Solution**:
- Ensure Unreal Engine is running with the MCP plugin enabled.
- Confirm the MCP plugin's port setting matches the default (13377).
3. **Claude Desktop or Cursor can't start the MCP server**
- **Cause**: Configuration or file path issues.
- **Solution**:
- For Claude: Check the logs at: `%APPDATA%\Claude\logs\mcp-server-unreal.log`
- Verify the path in the configuration file is correct.
- Ensure `run_unreal_mcp.bat` exists and references the correct Python interpreter.
### Checking Logs
Claude Desktop logs MCP server output to:
```
%APPDATA%\Claude\logs\mcp-server-unreal.log
```
Review this file for detailed error messages.
## Usage
To use the MCP interface:
1. Launch your Unreal Engine project with the MCP plugin enabled.
2. Open Claude Desktop or Cursor.
3. Use natural language commands in your AI assistant, such as:
- "Show me what's in the current Unreal scene"
- "Create a cube at position [0, 0, 100]"
- "Modify the object named 'Cube_1' to have scale [2, 2, 2]"
- "Delete the object named 'Cube_1'"
## Available Commands
The MCP interface supports these commands:
- **`get_scene_info`**: Retrieves details about the current scene.
- **`create_object`**: Spawns a new object in the scene.
- **`modify_object`**: Updates properties of an existing object.
- **`delete_object`**: Removes an object from the scene.
## Testing the MCP Server Directly
To test the MCP server independently of Claude Desktop or Cursor:
1. Run the following script:
```
Plugins\UnrealMCP\MCP\run_unreal_mcp.bat
```
This starts the MCP server using the configured Python interpreter, allowing it to listen for connections.
```
```
--------------------------------------------------------------------------------
/Source/UnrealMCP/Public/MCPTCPServer.h:
--------------------------------------------------------------------------------
```
#pragma once
#include "CoreMinimal.h"
#include "Containers/Ticker.h"
#include "Json.h"
#include "Networking.h"
#include "Common/TcpListener.h"
#include "Sockets.h"
#include "SocketSubsystem.h"
#include "MCPConstants.h"
/**
* Configuration struct for the TCP server
* Allows for easy customization of server parameters
*/
struct FMCPTCPServerConfig
{
/** Port to listen on */
int32 Port = MCPConstants::DEFAULT_PORT;
/** Client timeout in seconds */
float ClientTimeoutSeconds = MCPConstants::DEFAULT_CLIENT_TIMEOUT_SECONDS;
/** Size of the receive buffer in bytes */
int32 ReceiveBufferSize = MCPConstants::DEFAULT_RECEIVE_BUFFER_SIZE;
/** Tick interval in seconds */
float TickIntervalSeconds = MCPConstants::DEFAULT_TICK_INTERVAL_SECONDS;
/** Whether to log verbose messages */
bool bEnableVerboseLogging = MCPConstants::DEFAULT_VERBOSE_LOGGING;
};
/**
* Structure to track client connection information
*/
struct FMCPClientConnection
{
/** Socket for this client */
FSocket* Socket;
/** Endpoint information */
FIPv4Endpoint Endpoint;
/** Time since last activity for timeout tracking */
float TimeSinceLastActivity;
/** Buffer for receiving data */
TArray<uint8> ReceiveBuffer;
/**
* Constructor
* @param InSocket - The client socket
* @param InEndpoint - The client endpoint
* @param BufferSize - Size of the receive buffer
*/
FMCPClientConnection(FSocket* InSocket, const FIPv4Endpoint& InEndpoint, int32 BufferSize = MCPConstants::DEFAULT_RECEIVE_BUFFER_SIZE)
: Socket(InSocket)
, Endpoint(InEndpoint)
, TimeSinceLastActivity(0.0f)
{
ReceiveBuffer.SetNumUninitialized(BufferSize);
}
};
/**
* Interface for command handlers
* Allows for easy addition of new commands without modifying the server
*/
class IMCPCommandHandler
{
public:
virtual ~IMCPCommandHandler() {}
/**
* Get the command name this handler responds to
* @return The command name
*/
virtual FString GetCommandName() const = 0;
/**
* Handle the command
* @param Params - The command parameters
* @param ClientSocket - The client socket
* @return JSON response object
*/
virtual TSharedPtr<FJsonObject> Execute(const TSharedPtr<FJsonObject>& Params, FSocket* ClientSocket) = 0;
};
/**
* MCP TCP Server
* Manages connections and command routing
*/
class UNREALMCP_API FMCPTCPServer
{
public:
/**
* Constructor
* @param InConfig - Configuration for the server
*/
FMCPTCPServer(const FMCPTCPServerConfig& InConfig);
/**
* Destructor
*/
virtual ~FMCPTCPServer();
/**
* Start the server
* @return True if started successfully
*/
bool Start();
/**
* Stop the server
*/
void Stop();
/**
* Check if the server is running
* @return True if running
*/
bool IsRunning() const { return bRunning; }
/**
* Register a command handler
* @param Handler - The handler to register
*/
void RegisterCommandHandler(TSharedPtr<IMCPCommandHandler> Handler);
/**
* Unregister a command handler
* @param CommandName - The command name to unregister
*/
void UnregisterCommandHandler(const FString& CommandName);
/**
* Register an external command handler
* This is a public API that allows external code to extend the MCP plugin with custom functionality
* @param Handler - The handler to register
* @return True if registration was successful
*/
bool RegisterExternalCommandHandler(TSharedPtr<IMCPCommandHandler> Handler);
/**
* Unregister an external command handler
* @param CommandName - The command name to unregister
* @return True if unregistration was successful
*/
bool UnregisterExternalCommandHandler(const FString& CommandName);
/**
* Send a response to a client
* @param Client - The client socket
* @param Response - The response to send
*/
void SendResponse(FSocket* Client, const TSharedPtr<FJsonObject>& Response);
/**
* Get the command handlers map (for testing purposes)
* @return The map of command handlers
*/
const TMap<FString, TSharedPtr<IMCPCommandHandler>>& GetCommandHandlers() const { return CommandHandlers; }
protected:
/**
* Tick function called by the ticker
* @param DeltaTime - Time since last tick
* @return True to continue ticking
*/
bool Tick(float DeltaTime);
/**
* Process pending connections
*/
virtual void ProcessPendingConnections();
/**
* Process client data
*/
virtual void ProcessClientData();
/**
* Process a command
* @param CommandJson - The command JSON
* @param ClientSocket - The client socket
*/
virtual void ProcessCommand(const FString& CommandJson, FSocket* ClientSocket);
/**
* Check for client timeouts
* @param DeltaTime - Time since last tick
*/
virtual void CheckClientTimeouts(float DeltaTime);
/**
* Clean up a client connection
* @param ClientConnection - The client connection to clean up
*/
virtual void CleanupClientConnection(FMCPClientConnection& ClientConnection);
/**
* Clean up a client connection by socket
* @param ClientSocket - The client socket to clean up
*/
virtual void CleanupClientConnection(FSocket* ClientSocket);
/**
* Clean up all client connections
*/
virtual void CleanupAllClientConnections();
/**
* Get a safe description of a socket
* @param Socket - The socket
* @return A safe description string
*/
FString GetSafeSocketDescription(FSocket* Socket);
/**
* Connection handler
* @param InSocket - The new client socket
* @param Endpoint - The client endpoint
* @return True if connection accepted
*/
virtual bool HandleConnectionAccepted(FSocket* InSocket, const FIPv4Endpoint& Endpoint);
/** Server configuration */
FMCPTCPServerConfig Config;
/** TCP listener */
FTcpListener* Listener;
/** Client connections */
TArray<FMCPClientConnection> ClientConnections;
/** Running flag */
bool bRunning;
/** Ticker handle */
FTSTicker::FDelegateHandle TickerHandle;
/** Command handlers map */
TMap<FString, TSharedPtr<IMCPCommandHandler>> CommandHandlers;
private:
// Disable copy and assignment
FMCPTCPServer(const FMCPTCPServer&) = delete;
FMCPTCPServer& operator=(const FMCPTCPServer&) = delete;
};
```
--------------------------------------------------------------------------------
/MCP/setup_unreal_mcp.bat:
--------------------------------------------------------------------------------
```
@echo off
setlocal EnableDelayedExpansion
echo ========================================================
echo Unreal MCP - Python Environment Setup
echo ========================================================
echo.
REM Get the directory where this script is located
set "SCRIPT_DIR=%~dp0"
set "SCRIPT_DIR=%SCRIPT_DIR:~0,-1%"
REM Set paths for local environment
set "ENV_DIR=%SCRIPT_DIR%\python_env"
set "MODULES_DIR=%SCRIPT_DIR%\python_modules"
REM Parse command line arguments
set "CONFIGURE_CLAUDE=0"
set "CONFIGURE_CURSOR=0"
:parse_args
if "%1"=="" goto :done_parsing
if /i "%1"=="--help" (
echo Usage: setup_unreal_mcp.bat [OPTIONS]
echo.
echo Options:
echo --help Show this help message
echo --configure-claude Configure Claude Desktop (default)
echo --configure-cursor Configure Cursor
echo --configure-both Configure both Claude and Cursor
echo --skip-config Skip configuration
echo.
goto :end
)
if /i "%1"=="--configure-claude" set "CONFIGURE_CLAUDE=1" & set "CONFIGURE_CURSOR=0" & shift & goto :parse_args
if /i "%1"=="--configure-cursor" set "CONFIGURE_CLAUDE=0" & set "CONFIGURE_CURSOR=1" & shift & goto :parse_args
if /i "%1"=="--configure-both" set "CONFIGURE_CLAUDE=1" & set "CONFIGURE_CURSOR=1" & shift & goto :parse_args
if /i "%1"=="--skip-config" set "CONFIGURE_CLAUDE=0" & set "CONFIGURE_CURSOR=0" & shift & goto :parse_args
shift
goto :parse_args
:done_parsing
REM If no config option was specified, show the assistant choice menu first
if "%CONFIGURE_CLAUDE%"=="0" if "%CONFIGURE_CURSOR%"=="0" (
echo Which AI assistant would you like to configure?
echo.
echo 1. Claude Desktop
echo 2. Cursor
echo 3. Both Claude Desktop and Cursor
echo 4. Skip AI assistant configuration
echo.
set /p AI_CHOICE="Enter choice (1-4): "
echo.
if "!AI_CHOICE!"=="1" set "CONFIGURE_CLAUDE=1" & set "CONFIGURE_CURSOR=0"
if "!AI_CHOICE!"=="2" set "CONFIGURE_CLAUDE=0" & set "CONFIGURE_CURSOR=1"
if "!AI_CHOICE!"=="3" set "CONFIGURE_CLAUDE=1" & set "CONFIGURE_CURSOR=1"
if "!AI_CHOICE!"=="4" set "CONFIGURE_CLAUDE=0" & set "CONFIGURE_CURSOR=0"
)
echo Setting up Python environment in: %ENV_DIR%
echo.
REM Check if Python is installed
where python >nul 2>&1
if %ERRORLEVEL% neq 0 (
echo ERROR: Python is not installed or not in your PATH.
echo Please install Python and try again.
goto :end
)
REM Get Python version and path
for /f "tokens=*" %%i in ('python --version 2^>^&1') do set PYTHON_VERSION=%%i
for /f "tokens=*" %%i in ('where python') do set SYSTEM_PYTHON=%%i
echo Detected %PYTHON_VERSION% at %SYSTEM_PYTHON%
echo.
REM Create directories if they don't exist
if not exist "%ENV_DIR%" (
echo Creating Python environment directory...
mkdir "%ENV_DIR%"
)
if not exist "%MODULES_DIR%" (
echo Creating Python modules directory...
mkdir "%MODULES_DIR%"
)
REM Check if virtualenv is installed
python -c "import virtualenv" >nul 2>&1
if %ERRORLEVEL% neq 0 (
echo Installing virtualenv...
python -m pip install virtualenv
)
REM Create virtual environment if it doesn't exist
if not exist "%ENV_DIR%\Scripts\python.exe" (
echo Creating virtual environment...
python -m virtualenv "%ENV_DIR%"
) else (
echo Virtual environment already exists.
)
REM Activate the virtual environment and install packages
echo.
echo Activating virtual environment and installing packages...
call "%ENV_DIR%\Scripts\activate.bat"
REM Check if activation was successful
if %ERRORLEVEL% neq 0 (
echo ERROR: Failed to activate virtual environment.
goto :end
)
REM Install MCP package in the virtual environment
echo Installing MCP package...
python -m pip install mcp>=0.1.0
REM Also install to modules directory as a backup
echo Installing MCP package to modules directory as backup...
python -m pip install mcp>=0.1.0 -t "%MODULES_DIR%"
REM Verify installation
echo.
echo Verifying MCP installation...
python -c "import mcp; print(f'MCP package installed successfully. Version: {getattr(mcp, \"__version__\", \"unknown\")}')"
REM Create the run script
echo.
echo Creating run script...
(
echo @echo off
echo setlocal
echo.
echo REM Get the directory where this script is located
echo set "SCRIPT_DIR=%%~dp0"
echo set "SCRIPT_DIR=%%SCRIPT_DIR:~0,-1%%"
echo.
echo REM Set paths for local environment
echo set "ENV_DIR=%%SCRIPT_DIR%%\python_env"
echo set "PYTHON_PATH=%%ENV_DIR%%\Scripts\python.exe"
echo.
echo REM Check if Python environment exists
echo if not exist "%%PYTHON_PATH%%" (
echo echo ERROR: Python environment not found. Please run setup_unreal_mcp.bat first. ^>^&2
echo goto :end
echo )
echo.
echo REM Activate the virtual environment silently
echo call "%%ENV_DIR%%\Scripts\activate.bat" ^>nul 2^>^&1
echo.
echo REM Log start message to stderr
echo echo Starting Unreal MCP bridge... ^>^&2
echo.
echo REM Run the Python bridge script
echo python "%%SCRIPT_DIR%%\unreal_mcp_bridge.py" %%*
echo.
echo :end
) > "%SCRIPT_DIR%\run_unreal_mcp.bat"
REM Configure Claude Desktop if requested
if "%CONFIGURE_CLAUDE%"=="1" (
set "CLAUDE_CONFIG_DIR=%APPDATA%\Claude"
set "CLAUDE_CONFIG_FILE=%CLAUDE_CONFIG_DIR%\claude_desktop_config.json"
REM Check if Claude Desktop is installed
if not exist "%CLAUDE_CONFIG_DIR%" (
echo Creating Claude configuration directory...
mkdir "%CLAUDE_CONFIG_DIR%"
)
REM Update Claude Desktop configuration using Python
echo.
echo Updating Claude Desktop configuration...
python "%SCRIPT_DIR%\temp_update_config.py" "%CLAUDE_CONFIG_FILE%" "%SCRIPT_DIR%\run_unreal_mcp.bat"
if %ERRORLEVEL% neq 0 (
echo WARNING: Failed to update Claude Desktop configuration. Claude Desktop may not be installed.
) else (
echo Claude Desktop configuration updated at: %CLAUDE_CONFIG_FILE%
)
)
REM Configure Cursor if requested
if "%CONFIGURE_CURSOR%"=="1" (
echo.
echo Updating Cursor configuration...
python "%SCRIPT_DIR%\cursor_setup.py" --script "%SCRIPT_DIR%\run_unreal_mcp.bat"
if %ERRORLEVEL% neq 0 (
echo WARNING: Failed to update Cursor configuration. Cursor may not be installed.
) else (
echo Cursor configuration updated successfully!
)
)
echo.
echo ========================================================
echo Setup complete!
echo.
if "%CONFIGURE_CLAUDE%"=="1" if "%CONFIGURE_CURSOR%"=="0" (
echo To use with Claude Desktop:
echo 1. Run run_unreal_mcp.bat to start the MCP bridge
echo 2. Open Claude Desktop and it should automatically use the correct configuration
) else if "%CONFIGURE_CLAUDE%"=="0" if "%CONFIGURE_CURSOR%"=="1" (
echo To use with Cursor:
echo 1. Run run_unreal_mcp.bat to start the MCP bridge
echo 2. Open Cursor and it should automatically use the UnrealMCP tools
) else if "%CONFIGURE_CLAUDE%"=="1" if "%CONFIGURE_CURSOR%"=="1" (
echo To use with Claude Desktop:
echo 1. Run run_unreal_mcp.bat to start the MCP bridge
echo 2. Open Claude Desktop and it should automatically use the correct configuration
echo.
echo To use with Cursor:
echo 1. Run run_unreal_mcp.bat to start the MCP bridge
echo 2. Open Cursor and it should automatically use the UnrealMCP tools
) else (
echo No AI assistant configurations were applied.
echo To configure an assistant, run this script again with one of these options:
echo --configure-claude Configure Claude Desktop
echo --configure-cursor Configure Cursor
echo --configure-both Configure both Claude and Cursor
)
echo ========================================================
echo.
echo Press any key to exit...
pause >nul
:end
```
--------------------------------------------------------------------------------
/MCP/unreal_mcp_bridge.py:
--------------------------------------------------------------------------------
```python
"""
Bridge module connecting Unreal Engine to MCP (Model Context Protocol).
This module serves as a bridge between the Unreal Engine MCP plugin and
the MCP server provided by the 'mcp' Python package. It handles the communication
between Claude for Desktop and Unreal Engine through the MCP protocol.
Requirements:
- Python 3.7+
- MCP package (pip install mcp>=0.1.0)
- Running Unreal Engine with the UnrealMCP plugin enabled
The bridge connects to the Unreal Engine plugin (which acts as the actual MCP server)
and exposes MCP functionality to Claude for Desktop. This allows Claude to interact
with Unreal Engine through natural language commands.
"""
import json
import socket
import sys
import os
import importlib.util
import importlib
# Try to get the port from MCPConstants
DEFAULT_PORT = 13377
DEFAULT_BUFFER_SIZE = 65536
DEFAULT_TIMEOUT = 10 # 10 second timeout
try:
# Try to read the port from the C++ constants
plugin_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
constants_path = os.path.join(plugin_dir, "Source", "UnrealMCP", "Public", "MCPConstants.h")
if os.path.exists(constants_path):
with open(constants_path, 'r') as f:
constants_content = f.read()
# Extract port from MCPConstants
port_match = constants_content.find("DEFAULT_PORT = ")
if port_match != -1:
port_line = constants_content[port_match:].split(';')[0]
DEFAULT_PORT = int(port_line.split('=')[1].strip())
# Extract buffer size from MCPConstants
buffer_match = constants_content.find("DEFAULT_RECEIVE_BUFFER_SIZE = ")
if buffer_match != -1:
buffer_line = constants_content[buffer_match:].split(';')[0]
DEFAULT_BUFFER_SIZE = int(buffer_line.split('=')[1].strip())
except Exception as e:
# If anything goes wrong, use the defaults (which are already defined)
print(f"Warning: Could not read constants from MCPConstants.h: {e}", file=sys.stderr)
# No need to redefine DEFAULT_PORT and DEFAULT_BUFFER_SIZE here
print(f"Using port: {DEFAULT_PORT}", file=sys.stderr)
print(f"Using buffer size: {DEFAULT_BUFFER_SIZE}", file=sys.stderr)
# Check for local python_modules directory first
local_modules_path = os.path.join(os.path.dirname(__file__), "python_modules")
if os.path.exists(local_modules_path):
print(f"Found local python_modules directory: {local_modules_path}", file=sys.stderr)
sys.path.insert(0, local_modules_path)
print(f"Added local python_modules to sys.path", file=sys.stderr)
# Try to import MCP
mcp_spec = importlib.util.find_spec("mcp")
if mcp_spec is None:
print("Error: The 'mcp' package is not installed.", file=sys.stderr)
print("Please install it using one of the following methods:", file=sys.stderr)
print("1. Run setup_unreal_mcp.bat to install it globally", file=sys.stderr)
print("2. Run: pip install mcp", file=sys.stderr)
print("3. Run: pip install mcp -t ./python_modules", file=sys.stderr)
sys.exit(1)
try:
from mcp.server.fastmcp import FastMCP, Context
except ImportError as e:
print(f"Error importing from mcp package: {e}", file=sys.stderr)
print("The mcp package is installed but there was an error importing from it.", file=sys.stderr)
print("This could be due to a version mismatch or incomplete installation.", file=sys.stderr)
print("Please try reinstalling the package using: pip install --upgrade mcp", file=sys.stderr)
sys.exit(1)
# Initialize the MCP server
mcp = FastMCP(
"UnrealMCP",
description="Unreal Engine integration through the Model Context Protocol"
)
def send_command(command_type, params=None, timeout=DEFAULT_TIMEOUT):
"""Send a command to the C++ MCP server and return the response.
Args:
command_type: The type of command to send
params: Optional parameters for the command
timeout: Timeout in seconds (default: DEFAULT_TIMEOUT)
Returns:
The JSON response from the server
"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout) # Set a timeout
s.connect(("localhost", DEFAULT_PORT)) # Connect to Unreal C++ server
command = {
"type": command_type,
"params": params or {}
}
s.sendall(json.dumps(command).encode('utf-8'))
# Read response with a buffer
chunks = []
response_data = b''
# Wait for data with timeout
while True:
try:
chunk = s.recv(DEFAULT_BUFFER_SIZE)
if not chunk: # Connection closed
break
chunks.append(chunk)
# Try to parse what we have so far
response_data = b''.join(chunks)
try:
# If we can parse it as JSON, we have a complete response
json.loads(response_data.decode('utf-8'))
break
except json.JSONDecodeError:
# Incomplete JSON, continue receiving
continue
except socket.timeout:
# If we have some data but timed out, try to use what we have
if response_data:
break
raise
if not response_data:
raise Exception("No data received from server")
return json.loads(response_data.decode('utf-8'))
except ConnectionRefusedError:
print(f"Error: Could not connect to Unreal MCP server on localhost:{DEFAULT_PORT}.", file=sys.stderr)
print("Make sure your Unreal Engine with MCP plugin is running.", file=sys.stderr)
raise Exception("Failed to connect to Unreal MCP server: Connection refused")
except socket.timeout:
print("Error: Connection timed out while communicating with Unreal MCP server.", file=sys.stderr)
raise Exception("Failed to communicate with Unreal MCP server: Connection timed out")
except Exception as e:
print(f"Error communicating with Unreal MCP server: {str(e)}", file=sys.stderr)
raise Exception(f"Failed to communicate with Unreal MCP server: {str(e)}")
# All commands have been moved to separate modules in the Commands directory
def load_commands():
"""Load all commands from the Commands directory structure."""
commands_dir = os.path.join(os.path.dirname(__file__), 'Commands')
if not os.path.exists(commands_dir):
print(f"Commands directory not found at: {commands_dir}", file=sys.stderr)
return
# First, load Python files directly in the Commands directory
for filename in os.listdir(commands_dir):
if filename.endswith('.py') and not filename.startswith('__'):
try:
module_name = f"Commands.{filename[:-3]}" # Remove .py extension
module = importlib.import_module(module_name)
if hasattr(module, 'register_all'):
module.register_all(mcp)
print(f"Registered commands from module: {filename}", file=sys.stderr)
else:
print(f"Warning: {filename} has no register_all function", file=sys.stderr)
except Exception as e:
print(f"Error loading module {filename}: {e}", file=sys.stderr)
# Then, load command categories from subdirectories
for category in os.listdir(commands_dir):
category_path = os.path.join(commands_dir, category)
if os.path.isdir(category_path) and not category.startswith('__'):
try:
# Try to load the category's __init__.py which should have register_all
module_name = f"Commands.{category}"
module = importlib.import_module(module_name)
if hasattr(module, 'register_all'):
module.register_all(mcp)
print(f"Registered commands from category: {category}", file=sys.stderr)
else:
print(f"Warning: {category} has no register_all function", file=sys.stderr)
except Exception as e:
print(f"Error loading category {category}: {e}", file=sys.stderr)
def load_user_tools():
"""Load user-defined tools from the UserTools directory."""
user_tools_dir = os.path.join(os.path.dirname(__file__), 'UserTools')
if not os.path.exists(user_tools_dir):
print(f"User tools directory not found at: {user_tools_dir}", file=sys.stderr)
return
for filename in os.listdir(user_tools_dir):
if filename.endswith('.py') and filename != '__init__.py':
module_name = filename[:-3]
try:
spec = importlib.util.spec_from_file_location(module_name, os.path.join(user_tools_dir, filename))
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if hasattr(module, 'register_tools'):
from utils import send_command
module.register_tools(mcp, {'send_command': send_command})
print(f"Loaded user tool: {module_name}", file=sys.stderr)
else:
print(f"Warning: {filename} has no register_tools function", file=sys.stderr)
except Exception as e:
print(f"Error loading user tool {filename}: {str(e)}", file=sys.stderr)
def main():
"""Main entry point for the Unreal MCP bridge."""
print("Starting Unreal MCP bridge...", file=sys.stderr)
try:
load_commands() # Load built-in commands
load_user_tools() # Load user-defined tools
mcp.run() # Start the MCP bridge
except Exception as e:
print(f"Error starting MCP bridge: {str(e)}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/MCP/TestScripts/1_basic_connection.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Basic Connection Test for MCP Server
This script tests the basic connection to the MCP Server.
It connects to the server, sends a simple ping command, and verifies the response.
"""
import socket
import json
import sys
import os
import platform
import subprocess
import time
import traceback
from datetime import datetime
def check_port_in_use(host, port):
"""Check if the specified port is already in use."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind((host, port))
return False # Port is available
except socket.error:
return True # Port is in use
def check_service_running(port):
"""Check if any service is running on the specified port using system commands."""
try:
if platform.system() == "Windows":
output = subprocess.check_output(f"netstat -ano | findstr :{port}", shell=True).decode()
if output:
return True, output.strip()
else:
output = subprocess.check_output(f"lsof -i :{port}", shell=True).decode()
if output:
return True, output.strip()
except subprocess.CalledProcessError:
pass # Command returned error or no output
except Exception as e:
print(f"Error checking service: {e}")
return False, "No service detected"
def log_system_info():
"""Log system information to help with debugging."""
print("\n=== SYSTEM INFORMATION ===")
print(f"Operating System: {platform.system()} {platform.version()}")
print(f"Python Version: {platform.python_version()}")
print(f"Machine: {platform.machine()}")
print(f"Node: {platform.node()}")
print(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
try:
# Check firewall status on Windows
if platform.system() == "Windows":
firewall_output = subprocess.check_output("netsh advfirewall show currentprofile", shell=True).decode()
if "State ON" in firewall_output:
print("Windows Firewall: ENABLED")
else:
print("Windows Firewall: DISABLED")
except Exception as e:
print(f"Error checking firewall: {e}")
def ping_host(host, timeout=2):
"""Ping the host to check basic connectivity."""
try:
if platform.system() == "Windows":
ping_cmd = f"ping -n 1 -w {int(timeout*1000)} {host}"
else:
ping_cmd = f"ping -c 1 -W {int(timeout)} {host}"
result = subprocess.run(ping_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
return True, "Host is reachable"
else:
return False, f"Host unreachable: {result.stdout.decode().strip()}"
except Exception as e:
return False, f"Error pinging host: {str(e)}"
def trace_socket_errors(error):
"""Get detailed information about a socket error."""
error_info = {
10035: "WSAEWOULDBLOCK: Resource temporarily unavailable, operation would block",
10036: "WSAEINPROGRESS: Operation now in progress",
10037: "WSAEALREADY: Operation already in progress",
10038: "WSAENOTSOCK: Socket operation on non-socket",
10039: "WSAEDESTADDRREQ: Destination address required",
10040: "WSAEMSGSIZE: Message too long",
10041: "WSAEPROTOTYPE: Protocol wrong type for socket",
10042: "WSAENOPROTOOPT: Bad protocol option",
10043: "WSAEPROTONOSUPPORT: Protocol not supported",
10044: "WSAESOCKTNOSUPPORT: Socket type not supported",
10045: "WSAEOPNOTSUPP: Operation not supported",
10046: "WSAEPFNOSUPPORT: Protocol family not supported",
10047: "WSAEAFNOSUPPORT: Address family not supported by protocol",
10048: "WSAEADDRINUSE: Address already in use",
10049: "WSAEADDRNOTAVAIL: Cannot assign requested address",
10050: "WSAENETDOWN: Network is down",
10051: "WSAENETUNREACH: Network is unreachable",
10052: "WSAENETRESET: Network dropped connection on reset",
10053: "WSAECONNABORTED: Software caused connection abort",
10054: "WSAECONNRESET: Connection reset by peer",
10055: "WSAENOBUFS: No buffer space available",
10056: "WSAEISCONN: Socket is already connected",
10057: "WSAENOTCONN: Socket is not connected",
10058: "WSAESHUTDOWN: Cannot send after socket shutdown",
10059: "WSAETOOMANYREFS: Too many references",
10060: "WSAETIMEDOUT: Connection timed out",
10061: "WSAECONNREFUSED: Connection refused",
10062: "WSAELOOP: Cannot translate name",
10063: "WSAENAMETOOLONG: Name too long",
10064: "WSAEHOSTDOWN: Host is down",
10065: "WSAEHOSTUNREACH: No route to host",
}
if hasattr(error, 'errno'):
errno = error.errno
description = error_info.get(errno, f"Unknown error code: {errno}")
return f"Socket error {errno}: {description}"
return f"Unknown socket error: {str(error)}"
def main():
"""Connect to the MCP Server and verify the connection works."""
host = "localhost"
port = 13377
timeout = 5 # 5 second timeout
print("\n=== NETWORK DIAGNOSTICS ===")
# Check if we can ping the host
ping_success, ping_msg = ping_host(host)
print(f"Ping test: {ping_msg}")
# Check if port is already in use locally
port_in_use = check_port_in_use("127.0.0.1", port)
if port_in_use:
print(f"Warning: Port {port} is already in use on this machine")
else:
print(f"Port {port} is not in use on this machine")
# Check if any service is running on the target port
service_running, service_details = check_service_running(port)
if service_running:
print(f"A service is running on port {port}:")
print(service_details)
else:
print(f"No service detected on port {port}")
try:
print("\n=== CONNECTION TEST ===")
print(f"Creating socket to connect to {host}:{port}...")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
# Try to connect
print(f"Attempting connection to {host}:{port}...")
connect_start = time.time()
s.connect((host, port))
connect_time = time.time() - connect_start
print(f"✓ Connected successfully in {connect_time:.2f} seconds")
# Create a simple get_scene_info command
command = {
"type": "get_scene_info"
}
# Send command
print("Sending get_scene_info command...")
command_str = json.dumps(command) + "\n" # Add newline
s.sendall(command_str.encode('utf-8'))
# Receive response with timeout tracking
print("Waiting for response...")
response_start = time.time()
response = b""
while True:
try:
data = s.recv(4096)
if not data:
print("Connection closed by server")
break
response += data
print(f"Received {len(data)} bytes")
if b"\n" in data: # Check for newline which indicates end of response
print("Received newline character, end of response")
break
# Check if we've been waiting too long
if time.time() - response_start > timeout:
print(f"Response timeout after {timeout} seconds")
break
except socket.timeout:
print(f"Socket timeout after {timeout} seconds")
break
response_time = time.time() - response_start
print(f"Response received in {response_time:.2f} seconds")
# Close connection
try:
s.shutdown(socket.SHUT_RDWR)
except Exception as e:
print(f"Warning during socket shutdown: {str(e)}")
s.close()
print("✓ Connection closed properly")
# Process response
if response:
response_str = response.decode('utf-8').strip()
print(f"Raw response ({len(response_str)} bytes): {response_str[:100]}...")
try:
response_json = json.loads(response_str)
print("\n=== RESPONSE ===")
print(f"Status: {response_json.get('status', 'unknown')}")
if response_json.get('status') == 'success':
print("✓ Server responded successfully")
print(f"Level: {response_json.get('result', {}).get('level', 'unknown')}")
print(f"Actor count: {response_json.get('result', {}).get('actor_count', 0)}")
return True
else:
print("✗ Server responded with an error")
print(f"Error: {response_json.get('message', 'Unknown error')}")
return False
except json.JSONDecodeError as e:
print(f"✗ Error parsing JSON response: {e}")
print(f"Raw response: {response_str}")
return False
else:
print("✗ No response received from server")
return False
except ConnectionRefusedError as e:
print(f"✗ Connection refused: {trace_socket_errors(e)}")
print("This typically means:")
print(" 1. The MCP Server is not running")
print(" 2. The server is running on a different port")
print(" 3. A firewall is blocking the connection")
return False
except socket.timeout as e:
print(f"✗ Connection timed out: {trace_socket_errors(e)}")
print("This typically means:")
print(" 1. The MCP Server is running but not responding")
print(" 2. A firewall or security software is intercepting but not blocking the connection")
print(" 3. The network configuration is preventing the connection")
return False
except Exception as e:
print(f"✗ Error: {trace_socket_errors(e)}")
print("Detailed error information:")
traceback.print_exc()
return False
if __name__ == "__main__":
print("=== MCP Server Basic Connection Test ===")
print(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# Log system information
log_system_info()
# Run the main test
success = main()
print("\n=== TEST RESULT ===")
if success:
print("✓ Connection test PASSED")
sys.exit(0)
else:
print("✗ Connection test FAILED")
sys.exit(1)
```
--------------------------------------------------------------------------------
/MCP/TestScripts/test_commands_blueprint.py:
--------------------------------------------------------------------------------
```python
"""Test script for UnrealMCP blueprint commands.
This script tests the blueprint-related commands available in the UnrealMCP bridge.
Make sure Unreal Engine is running with the UnrealMCP plugin enabled before running this script.
"""
import sys
import os
import json
import time
from mcp.server.fastmcp import FastMCP, Context
# Add the MCP directory to sys.path so we can import unreal_mcp_bridge
mcp_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if mcp_dir not in sys.path:
sys.path.insert(0, mcp_dir)
from unreal_mcp_bridge import send_command
# Global variables to store paths
blueprint_path = ""
# Longer timeout for Unreal Engine operations
TIMEOUT = 30
def test_create_blueprint():
"""Test blueprint creation command."""
global blueprint_path
print("\n1. Testing create_blueprint...")
try:
# Define the package path and blueprint name
# Use a subdirectory to ensure proper directory structure
package_path = "/Game/Blueprints/TestDir"
blueprint_name = "TestBlueprint"
# Print the expected file path (this is just an approximation)
project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
# Go up one more level to get to the actual project directory
project_dir = os.path.dirname(project_dir)
project_content_dir = os.path.join(project_dir, "Content")
# Check both possible locations based on how the path is interpreted
expected_path_in_dir = os.path.join(project_content_dir, "Blueprints", "TestDir", f"{blueprint_name}.uasset")
expected_path_as_asset = os.path.join(project_content_dir, "Blueprints", "TestDir.uasset")
print(f"Project directory: {project_dir}")
print(f"Project Content directory: {project_content_dir}")
print(f"Expected file path (in directory): {expected_path_in_dir}")
print(f"Expected file path (as asset): {expected_path_as_asset}")
# Print debug info about the current working directory
print(f"Current working directory: {os.getcwd()}")
# The package_path should be the directory, and name should be the asset name
params = {
"package_path": package_path, # Directory path
"name": blueprint_name, # Asset name
"properties": {
"parent_class": "Actor" # Default parent class
}
}
print(f"Sending create_blueprint command with package_path={params['package_path']} and name={params['name']}")
response = send_command("create_blueprint", params, timeout=TIMEOUT)
print(f"Create Blueprint Response: {json.dumps(response, indent=2)}")
# Store the actual path from the response for later tests
if response["status"] == "success":
blueprint_path = response["result"]["path"]
print(f"Blueprint created at: {blueprint_path}")
# Check if the blueprint file exists in either expected location
if os.path.exists(expected_path_in_dir):
print(f"✓ Blueprint file found at: {expected_path_in_dir}")
elif os.path.exists(expected_path_as_asset):
print(f"✓ Blueprint file found at: {expected_path_as_asset}")
else:
print(f"✗ Blueprint file NOT found at expected locations")
# Try to find the file in other possible locations
possible_locations = [
os.path.join(project_dir, "Saved", "Blueprints"),
os.path.join(project_dir, "Saved", "Autosaves", "Game", "Blueprints"),
os.path.join(project_dir, "Plugins", "UnrealMCP", "Content", "Blueprints")
]
for location in possible_locations:
potential_path = os.path.join(location, f"{blueprint_name}.uasset")
if os.path.exists(potential_path):
print(f"✓ Blueprint file found at alternative location: {potential_path}")
break
else:
print("✗ Blueprint file not found in any expected location")
# Try to find the file using a more extensive search
print("Searching for the blueprint file in the project directory...")
for root, dirs, files in os.walk(project_dir):
for file in files:
if "Blueprint" in file and file.endswith(".uasset"):
found_path = os.path.join(root, file)
print(f"✓ Blueprint file found at: {found_path}")
break
else:
continue
break
return response["status"] == "success"
except Exception as e:
print(f"Error testing create_blueprint: {e}")
return False
def test_get_blueprint_info():
"""Test getting blueprint information."""
global blueprint_path
print("\n2. Testing get_blueprint_info...")
try:
# Use the path from the create_blueprint response
params = {
"blueprint_path": blueprint_path
}
response = send_command("get_blueprint_info", params, timeout=TIMEOUT)
print(f"Get Blueprint Info Response: {json.dumps(response, indent=2)}")
return response["status"] == "success"
except Exception as e:
print(f"Error testing get_blueprint_info: {e}")
return False
def test_create_blueprint_event():
"""Test creating a blueprint event."""
global blueprint_path
print("\n3. Testing create_blueprint_event...")
try:
# Use the path from the create_blueprint response
params = {
"event_name": "TestEvent",
"blueprint_path": blueprint_path
}
# Set a longer timeout for this operation
print("This operation may take some time...")
response = send_command("create_blueprint_event", params, timeout=TIMEOUT)
print(f"Create Blueprint Event Response: {json.dumps(response, indent=2)}")
return response["status"] == "success"
except Exception as e:
print(f"Error testing create_blueprint_event: {e}")
return False
def test_modify_blueprint():
"""Test modifying a blueprint."""
global blueprint_path
print("\n4. Testing modify_blueprint...")
try:
# Use the path from the create_blueprint response
params = {
"blueprint_path": blueprint_path,
"properties": {
"description": "A test blueprint created by MCP",
"category": "Tests",
"options": {
"hide_categories": ["Variables", "Transformation"],
"namespace": "MCP",
"display_name": "MCP Test Blueprint",
"compile_mode": "Development",
"abstract_class": False,
"const_class": False,
"deprecate": False
}
}
}
response = send_command("modify_blueprint", params, timeout=TIMEOUT)
print(f"Modify Blueprint Response: {json.dumps(response, indent=2)}")
# Verify the changes by getting the blueprint info again
if response["status"] == "success":
print("\nVerifying blueprint modifications...")
verify_params = {
"blueprint_path": blueprint_path
}
verify_response = send_command("get_blueprint_info", verify_params, timeout=TIMEOUT)
print(f"Updated Blueprint Info: {json.dumps(verify_response, indent=2)}")
# Check if the events were updated
if verify_response["status"] == "success":
result = verify_response["result"]
# Check for events
if "events" in result and len(result["events"]) > 0:
print(f"✓ Blueprint has {len(result['events'])} events")
# Look for our TestEvent
test_event_found = False
for event in result["events"]:
if "name" in event and "TestEvent" in event["name"]:
test_event_found = True
print(f"✓ Found TestEvent: {event['name']}")
break
if not test_event_found:
print("✗ TestEvent not found in events")
else:
print("✗ No events found in blueprint")
return response["status"] == "success"
except Exception as e:
print(f"Error testing modify_blueprint: {e}")
return False
def main():
"""Run all blueprint-related tests."""
print("Starting UnrealMCP blueprint command tests...")
print("Make sure Unreal Engine is running with the UnrealMCP plugin enabled!")
try:
# Run tests in sequence, with each test depending on the previous one
create_result = test_create_blueprint()
# Only run subsequent tests if the blueprint was created successfully
if create_result:
# Wait a moment for the blueprint to be fully created
time.sleep(1)
get_info_result = test_get_blueprint_info()
# Only run event creation if get_info succeeded
if get_info_result:
create_event_result = test_create_blueprint_event()
else:
create_event_result = False
# Only run modify if previous tests succeeded
if create_event_result:
modify_result = test_modify_blueprint()
else:
modify_result = False
else:
get_info_result = False
create_event_result = False
modify_result = False
results = {
"create_blueprint": create_result,
"get_blueprint_info": get_info_result,
"create_blueprint_event": create_event_result,
"modify_blueprint": modify_result
}
print("\nTest Results:")
print("-" * 40)
for test_name, success in results.items():
status = "✓ PASS" if success else "✗ FAIL"
print(f"{status} - {test_name}")
print("-" * 40)
if all(results.values()):
print("\nAll blueprint tests passed successfully!")
else:
print("\nSome tests failed. Check the output above for details.")
sys.exit(1)
except Exception as e:
print(f"\nError during testing: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/MCP/check_mcp_setup.py:
--------------------------------------------------------------------------------
```python
"""
Check MCP Setup Script
This script verifies the MCP setup for Unreal Engine integration with Cursor and Claude Desktop.
It checks for necessary components and configurations and provides diagnostic information.
"""
import os
import sys
import json
import importlib
import platform
import subprocess
def check_mark():
"""Return a green check mark for success."""
return "\033[92m✓\033[0m" if os.name != 'nt' else "\033[92mOK\033[0m"
def x_mark():
"""Return a red X mark for failure."""
return "\033[91m✗\033[0m" if os.name != 'nt' else "\033[91mFAIL\033[0m"
def info_mark():
"""Return a blue info mark."""
return "\033[94mi\033[0m" if os.name != 'nt' else "\033[94mINFO\033[0m"
def print_status(message, success=None):
"""Print a status message with appropriate formatting."""
if success is None:
print(f" {info_mark()} {message}")
elif success:
print(f" {check_mark()} {message}")
else:
print(f" {x_mark()} {message}")
def check_python():
"""Check Python installation."""
print("\n=== Python Environment ===")
print_status(f"Python version: {platform.python_version()}", True)
# Check virtualenv
try:
import virtualenv
print_status(f"virtualenv is installed (version: {virtualenv.__version__})", True)
except ImportError:
print_status("virtualenv is not installed", False)
# Check MCP package
try:
import mcp
version = getattr(mcp, "__version__", "unknown")
print_status(f"MCP package is installed (version: {version})", True)
except ImportError:
print_status("MCP package is not installed", False)
# Check if python_env exists
script_dir = os.path.dirname(os.path.abspath(__file__))
env_dir = os.path.join(script_dir, "python_env")
if os.path.exists(env_dir):
print_status(f"Python virtual environment exists at: {env_dir}", True)
else:
print_status(f"Python virtual environment not found at: {env_dir}", False)
# Check if run_unreal_mcp.bat exists
run_script = os.path.join(script_dir, "run_unreal_mcp.bat")
if os.path.exists(run_script):
print_status(f"Run script exists at: {run_script}", True)
else:
print_status(f"Run script not found at: {run_script}", False)
def check_claude_setup():
"""Check Claude Desktop setup."""
print("\n=== Claude Desktop Setup ===")
# Check if Claude Desktop is installed
claude_installed = False
if os.name == 'nt': # Windows
claude_paths = [
os.path.join(os.environ.get('LOCALAPPDATA', ''), 'Programs', 'Claude Desktop', 'Claude Desktop.exe'),
os.path.join(os.environ.get('PROGRAMFILES', ''), 'Claude Desktop', 'Claude Desktop.exe'),
os.path.join(os.environ.get('PROGRAMFILES(X86)', ''), 'Claude Desktop', 'Claude Desktop.exe')
]
for path in claude_paths:
if os.path.exists(path):
print_status(f"Claude Desktop is installed at: {path}", True)
claude_installed = True
break
elif os.name == 'darwin': # macOS
claude_paths = [
'/Applications/Claude Desktop.app',
os.path.expanduser('~/Applications/Claude Desktop.app')
]
for path in claude_paths:
if os.path.exists(path):
print_status(f"Claude Desktop is installed at: {path}", True)
claude_installed = True
break
if not claude_installed:
print_status("Claude Desktop installation not found", False)
# Check Claude config
config_file = None
if os.name == 'nt': # Windows
config_file = os.path.join(os.environ.get('APPDATA', ''), 'Claude', 'claude_desktop_config.json')
elif os.name == 'darwin': # macOS
config_file = os.path.expanduser('~/Library/Application Support/Claude/claude_desktop_config.json')
if config_file and os.path.exists(config_file):
print_status(f"Claude Desktop configuration file exists at: {config_file}", True)
try:
with open(config_file, 'r') as f:
config = json.load(f)
if 'mcpServers' in config and 'unreal' in config['mcpServers']:
cmd = config['mcpServers']['unreal'].get('command', '')
print_status(f"Unreal MCP configuration found with command: {cmd}", True)
if not os.path.exists(cmd):
print_status(f"Warning: The configured command path does not exist: {cmd}", False)
else:
print_status("Unreal MCP configuration not found in Claude Desktop config", False)
except (json.JSONDecodeError, FileNotFoundError) as e:
print_status(f"Error reading Claude Desktop config: {str(e)}", False)
else:
print_status(f"Claude Desktop configuration file not found at: {config_file}", False)
# Check Claude logs
log_file = None
if os.name == 'nt': # Windows
log_file = os.path.join(os.environ.get('APPDATA', ''), 'Claude', 'logs', 'mcp-server-unreal.log')
elif os.name == 'darwin': # macOS
log_file = os.path.expanduser('~/Library/Application Support/Claude/logs/mcp-server-unreal.log')
if log_file and os.path.exists(log_file):
print_status(f"Claude Desktop MCP log file exists at: {log_file}", True)
# Optionally show last few lines of log
try:
with open(log_file, 'r') as f:
lines = f.readlines()
if lines:
print("\n Last log entry:")
print(f" {lines[-1].strip()}")
except Exception:
pass
else:
print_status(f"Claude Desktop MCP log file not found at: {log_file}", None)
print_status("This is normal if you haven't run the MCP server with Claude Desktop yet", None)
def check_cursor_setup():
"""Check Cursor setup."""
print("\n=== Cursor Setup ===")
# Check if Cursor is installed
cursor_installed = False
if os.name == 'nt': # Windows
cursor_paths = [
os.path.join(os.environ.get('LOCALAPPDATA', ''), 'Programs', 'Cursor', 'Cursor.exe'),
os.path.join(os.environ.get('PROGRAMFILES', ''), 'Cursor', 'Cursor.exe'),
os.path.join(os.environ.get('PROGRAMFILES(X86)', ''), 'Cursor', 'Cursor.exe')
]
for path in cursor_paths:
if os.path.exists(path):
print_status(f"Cursor is installed at: {path}", True)
cursor_installed = True
break
elif os.name == 'darwin': # macOS
cursor_paths = [
'/Applications/Cursor.app',
os.path.expanduser('~/Applications/Cursor.app')
]
for path in cursor_paths:
if os.path.exists(path):
print_status(f"Cursor is installed at: {path}", True)
cursor_installed = True
break
elif os.name == 'posix': # Linux
cursor_paths = [
'/usr/bin/cursor',
'/usr/local/bin/cursor',
os.path.expanduser('~/.local/bin/cursor')
]
for path in cursor_paths:
if os.path.exists(path):
print_status(f"Cursor is installed at: {path}", True)
cursor_installed = True
break
if not cursor_installed:
print_status("Cursor installation not found", False)
# Check Cursor config
config_file = None
if os.name == 'nt': # Windows
config_file = os.path.join(os.environ.get('APPDATA', ''), 'Cursor', 'User', 'settings.json')
elif os.name == 'darwin': # macOS
config_file = os.path.expanduser('~/Library/Application Support/Cursor/User/settings.json')
elif os.name == 'posix': # Linux
config_file = os.path.expanduser('~/.config/Cursor/User/settings.json')
if config_file and os.path.exists(config_file):
print_status(f"Cursor configuration file exists at: {config_file}", True)
try:
with open(config_file, 'r') as f:
config = json.load(f)
mcp_enabled = config.get('mcp', {}).get('enabled', False)
print_status(f"MCP enabled in Cursor config: {mcp_enabled}", mcp_enabled)
servers = config.get('mcp', {}).get('servers', {})
if 'unreal' in servers:
cmd = servers['unreal'].get('command', '')
print_status(f"Unreal MCP configuration found with command: {cmd}", True)
if not os.path.exists(cmd):
print_status(f"Warning: The configured command path does not exist: {cmd}", False)
else:
print_status("Unreal MCP configuration not found in Cursor config", False)
except (json.JSONDecodeError, FileNotFoundError) as e:
print_status(f"Error reading Cursor config: {str(e)}", False)
else:
print_status(f"Cursor configuration file not found at: {config_file}", False)
def check_unreal_plugin():
"""Check Unreal Engine plugin setup."""
print("\n=== Unreal Engine Plugin ===")
# Get plugin directory
script_dir = os.path.dirname(os.path.abspath(__file__))
plugin_dir = os.path.abspath(os.path.join(script_dir, ".."))
# Check for plugin file
plugin_file = os.path.join(plugin_dir, "UnrealMCP.uplugin")
if os.path.exists(plugin_file):
print_status(f"UnrealMCP plugin file exists at: {plugin_file}", True)
else:
print_status(f"UnrealMCP plugin file not found at: {plugin_file}", False)
# Check for Source directory
source_dir = os.path.join(plugin_dir, "Source")
if os.path.exists(source_dir) and os.path.isdir(source_dir):
print_status(f"Plugin Source directory exists at: {source_dir}", True)
else:
print_status(f"Plugin Source directory not found at: {source_dir}", False)
# Check if the plugin can be loaded by Unreal
print_status("Note: To check if the plugin is loaded in Unreal Engine:", None)
print_status("1. Open your Unreal project", None)
print_status("2. Go to Edit > Plugins", None)
print_status("3. Search for 'UnrealMCP' and ensure it's enabled", None)
def main():
"""Main function."""
print("\n========================================================")
print(" Unreal MCP Setup Diagnosis Tool ")
print("========================================================")
check_python()
check_claude_setup()
check_cursor_setup()
check_unreal_plugin()
print("\n========================================================")
print(" Diagnosis Complete ")
print("========================================================")
print("\nIf you encountered any issues, please try running:")
print("1. setup_unreal_mcp.bat - To set up the Python environment")
print("2. setup_cursor_mcp.bat - For Cursor integration")
print("\nFor more help, see the README.md or open an issue on GitHub.")
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/Source/UnrealMCP/Private/UnrealMCP.cpp:
--------------------------------------------------------------------------------
```cpp
// Copyright Epic Games, Inc. All Rights Reserved.
#include "UnrealMCP.h"
#include "MCPTCPServer.h"
#include "MCPSettings.h"
#include "MCPConstants.h"
#include "LevelEditor.h"
#include "Framework/MultiBox/MultiBoxBuilder.h"
#include "Styling/SlateStyleRegistry.h"
#include "Interfaces/IPluginManager.h"
#include "Styling/SlateStyle.h"
#include "Styling/SlateStyleMacros.h"
#include "ISettingsModule.h"
#include "ToolMenus.h"
#include "ToolMenuSection.h"
#include "MCPFileLogger.h"
#include "Widgets/SWindow.h"
#include "Widgets/Layout/SBox.h"
#include "Widgets/Layout/SBorder.h"
#include "Widgets/Layout/SScrollBox.h"
#include "Widgets/Text/STextBlock.h"
#include "Widgets/Input/SButton.h"
#include "Widgets/Layout/SGridPanel.h"
#include "Widgets/Layout/SUniformGridPanel.h"
#include "Framework/Application/SlateApplication.h"
#include "EditorStyleSet.h"
// Define the log category
DEFINE_LOG_CATEGORY(LogMCP);
#define LOCTEXT_NAMESPACE "FUnrealMCPModule"
// Define a style set for our plugin
class FMCPPluginStyle : public FSlateStyleSet
{
public:
FMCPPluginStyle() : FSlateStyleSet("MCPPluginStyle")
{
const FVector2D Icon16x16(16.0f, 16.0f);
const FVector2D StatusSize(6.0f, 6.0f);
// Use path constants instead of finding the plugin each time
SetContentRoot(MCPConstants::PluginResourcesPath);
// Register icon
FSlateImageBrush* MCPIconBrush = new FSlateImageBrush(
RootToContentDir(TEXT("Icon128.png")),
Icon16x16,
FLinearColor::White, // Tint (white preserves original colors)
ESlateBrushTileType::NoTile // Ensure no tiling, just the image
);
Set("MCPPlugin.ServerIcon", MCPIconBrush);
// Create status indicator brushes
const FLinearColor RunningColor(0.0f, 0.8f, 0.0f); // Green
const FLinearColor StoppedColor(0.8f, 0.0f, 0.0f); // Red
Set("MCPPlugin.StatusRunning", new FSlateRoundedBoxBrush(RunningColor, 3.0f, FVector2f(StatusSize)));
Set("MCPPlugin.StatusStopped", new FSlateRoundedBoxBrush(StoppedColor, 3.0f, FVector2f(StatusSize)));
// Define a custom button style with hover feedback
FButtonStyle ToolbarButtonStyle = FAppStyle::Get().GetWidgetStyle<FButtonStyle>("LevelEditor.ToolBar.Button");
// Normal state: fully transparent background
ToolbarButtonStyle.SetNormal(FSlateColorBrush(FLinearColor(0, 0, 0, 0))); // Transparent
// Hovered state: subtle overlay (e.g., light gray with low opacity)
ToolbarButtonStyle.SetHovered(FSlateColorBrush(FLinearColor(0.2f, 0.2f, 0.2f, 0.3f))); // Semi-transparent gray
// Pressed state: slightly darker overlay
ToolbarButtonStyle.SetPressed(FSlateColorBrush(FLinearColor(0.1f, 0.1f, 0.1f, 0.5f))); // Darker semi-transparent gray
// Register the custom style
Set("MCPPlugin.TransparentToolbarButton", ToolbarButtonStyle);
}
static void Initialize()
{
if (!Instance.IsValid())
{
Instance = MakeShareable(new FMCPPluginStyle());
}
}
static void Shutdown()
{
if (Instance.IsValid())
{
FSlateStyleRegistry::UnRegisterSlateStyle(*Instance);
Instance.Reset();
}
}
static TSharedPtr<FMCPPluginStyle> Get()
{
return Instance;
}
private:
static TSharedPtr<FMCPPluginStyle> Instance;
};
TSharedPtr<FMCPPluginStyle> FMCPPluginStyle::Instance = nullptr;
void FUnrealMCPModule::StartupModule()
{
// Initialize path constants first
MCPConstants::InitializePathConstants();
// Initialize our custom log category
MCP_LOG_INFO("UnrealMCP Plugin is starting up");
// Initialize file logger - now using path constants
FString LogFilePath = FPaths::Combine(MCPConstants::PluginLogsPath, TEXT("MCPServer.log"));
FMCPFileLogger::Get().Initialize(LogFilePath);
// Register style set
FMCPPluginStyle::Initialize();
FSlateStyleRegistry::RegisterSlateStyle(*FMCPPluginStyle::Get());
// More debug logging
MCP_LOG_INFO("UnrealMCP Style registered");
// Register settings
if (ISettingsModule* SettingsModule = FModuleManager::GetModulePtr<ISettingsModule>("Settings"))
{
SettingsModule->RegisterSettings("Editor", "Plugins", "MCP Settings",
LOCTEXT("MCPSettingsName", "MCP Settings"),
LOCTEXT("MCPSettingsDescription", "Configure the MCP plugin settings"),
GetMutableDefault<UMCPSettings>()
);
}
// Register for post engine init to add toolbar button
// First, make sure we're not already registered
FCoreDelegates::OnPostEngineInit.RemoveAll(this);
MCP_LOG_INFO("Registering OnPostEngineInit delegate");
FCoreDelegates::OnPostEngineInit.AddRaw(this, &FUnrealMCPModule::ExtendLevelEditorToolbar);
}
void FUnrealMCPModule::ShutdownModule()
{
// Unregister style set
FMCPPluginStyle::Shutdown();
// Unregister settings
if (ISettingsModule* SettingsModule = FModuleManager::GetModulePtr<ISettingsModule>("Settings"))
{
SettingsModule->UnregisterSettings("Editor", "Plugins", "MCP Settings");
}
// Stop server if running
if (Server)
{
StopServer();
}
// Close control panel if open
CloseMCPControlPanel();
// Clean up delegates
FCoreDelegates::OnPostEngineInit.RemoveAll(this);
}
void FUnrealMCPModule::ExtendLevelEditorToolbar()
{
static bool bToolbarExtended = false;
if (bToolbarExtended)
{
MCP_LOG_WARNING("ExtendLevelEditorToolbar called but toolbar already extended, skipping");
return;
}
MCP_LOG_INFO("ExtendLevelEditorToolbar called - first time");
UToolMenus::Get()->RegisterMenu("LevelEditor.MainMenu", "MainFrame.MainMenu");
UToolMenu* ToolbarMenu = UToolMenus::Get()->ExtendMenu("LevelEditor.LevelEditorToolBar.User");
if (ToolbarMenu)
{
FToolMenuSection& Section = ToolbarMenu->FindOrAddSection("MCP");
// Add a custom widget instead of a static toolbar button
Section.AddEntry(FToolMenuEntry::InitWidget(
"MCPServerControl",
SNew(SButton)
.ButtonStyle(FMCPPluginStyle::Get().ToSharedRef(), "MCPPlugin.TransparentToolbarButton")
//.ButtonStyle(FAppStyle::Get(), "LevelEditor.ToolBar.Button") // Match toolbar style
.OnClicked(FOnClicked::CreateRaw(this, &FUnrealMCPModule::OpenMCPControlPanel_OnClicked))
.ToolTipText(LOCTEXT("MCPButtonTooltip", "Open MCP Server Control Panel"))
.Content()
[
SNew(SOverlay)
+ SOverlay::Slot()
[
SNew(SImage)
.Image(FMCPPluginStyle::Get()->GetBrush("MCPPlugin.ServerIcon"))
.ColorAndOpacity(FLinearColor::White) // Ensure no tint overrides transparency
]
+ SOverlay::Slot()
.HAlign(HAlign_Right)
.VAlign(VAlign_Bottom)
[
SNew(SImage)
.Image_Lambda([this]() -> const FSlateBrush* {
return IsServerRunning()
? FMCPPluginStyle::Get()->GetBrush("MCPPlugin.StatusRunning")
: FMCPPluginStyle::Get()->GetBrush("MCPPlugin.StatusStopped");
})
]
],
FText::GetEmpty(), // No label needed since the icon is visual
true, // bNoIndent
false, // bSearchable
false
));
MCP_LOG_INFO("MCP Server button added to main toolbar with dynamic icon");
}
// Window menu code remains unchanged
UToolMenu* WindowMenu = UToolMenus::Get()->ExtendMenu("LevelEditor.MainMenu.Window");
if (WindowMenu)
{
FToolMenuSection& Section = WindowMenu->FindOrAddSection("WindowLayout");
Section.AddMenuEntry(
"MCPServerControlWindow",
LOCTEXT("MCPWindowMenuLabel", "MCP Server Control Panel"),
LOCTEXT("MCPWindowMenuTooltip", "Open MCP Server Control Panel"),
FSlateIcon(FMCPPluginStyle::Get()->GetStyleSetName(), "MCPPlugin.ServerIcon"),
FUIAction(
FExecuteAction::CreateRaw(this, &FUnrealMCPModule::OpenMCPControlPanel),
FCanExecuteAction()
)
);
MCP_LOG_INFO("MCP Server entry added to Window menu");
}
bToolbarExtended = true;
}
// Legacy toolbar extension method - no longer used
void FUnrealMCPModule::AddToolbarButton(FToolBarBuilder& Builder)
{
Builder.AddToolBarButton(
FUIAction(
FExecuteAction::CreateRaw(this, &FUnrealMCPModule::OpenMCPControlPanel),
FCanExecuteAction()
),
NAME_None,
LOCTEXT("MCPButtonLabel", "MCP Server"),
LOCTEXT("MCPButtonTooltip", "Open MCP Server Control Panel"),
FSlateIcon(FMCPPluginStyle::Get()->GetStyleSetName(), "MCPPlugin.ServerIcon")
);
}
void FUnrealMCPModule::OpenMCPControlPanel()
{
// If the window already exists, just focus it
if (MCPControlPanelWindow.IsValid())
{
MCPControlPanelWindow->BringToFront();
return;
}
// Create a new window
MCPControlPanelWindow = SNew(SWindow)
.Title(LOCTEXT("MCPControlPanelTitle", "MCP Server Control Panel"))
.SizingRule(ESizingRule::Autosized)
.SupportsMaximize(false)
.SupportsMinimize(false)
.HasCloseButton(true)
.CreateTitleBar(true)
.IsTopmostWindow(true)
.MinWidth(300)
.MinHeight(150);
// Set the content of the window
MCPControlPanelWindow->SetContent(CreateMCPControlPanelContent());
// Register a callback for when the window is closed
MCPControlPanelWindow->GetOnWindowClosedEvent().AddRaw(this, &FUnrealMCPModule::OnMCPControlPanelClosed);
// Show the window
FSlateApplication::Get().AddWindow(MCPControlPanelWindow.ToSharedRef());
MCP_LOG_INFO("MCP Control Panel opened");
}
FReply FUnrealMCPModule::OpenMCPControlPanel_OnClicked()
{
OpenMCPControlPanel();
return FReply::Handled();
}
void FUnrealMCPModule::OnMCPControlPanelClosed(const TSharedRef<SWindow>& Window)
{
MCPControlPanelWindow.Reset();
MCP_LOG_INFO("MCP Control Panel closed");
}
void FUnrealMCPModule::CloseMCPControlPanel()
{
if (MCPControlPanelWindow.IsValid())
{
MCPControlPanelWindow->RequestDestroyWindow();
MCPControlPanelWindow.Reset();
MCP_LOG_INFO("MCP Control Panel closed");
}
}
TSharedRef<SWidget> FUnrealMCPModule::CreateMCPControlPanelContent()
{
const UMCPSettings* Settings = GetDefault<UMCPSettings>();
return SNew(SBorder)
.BorderImage(FAppStyle::GetBrush("ToolPanel.GroupBorder"))
.Padding(8.0f)
[
SNew(SVerticalBox)
// Status section
+ SVerticalBox::Slot()
.AutoHeight()
.Padding(0, 0, 0, 8)
[
SNew(SHorizontalBox)
+ SHorizontalBox::Slot()
.AutoWidth()
.VAlign(VAlign_Center)
.Padding(0, 0, 8, 0)
[
SNew(STextBlock)
.Text(LOCTEXT("ServerStatusLabel", "Server Status:"))
.Font(FAppStyle::GetFontStyle("NormalText"))
]
+ SHorizontalBox::Slot()
.FillWidth(1.0f)
.VAlign(VAlign_Center)
[
SNew(STextBlock)
.Text_Lambda([this]() -> FText {
return IsServerRunning()
? LOCTEXT("ServerRunningStatus", "Running")
: LOCTEXT("ServerStoppedStatus", "Stopped");
})
.ColorAndOpacity_Lambda([this]() -> FSlateColor {
return IsServerRunning()
? FSlateColor(FLinearColor(0.0f, 0.8f, 0.0f))
: FSlateColor(FLinearColor(0.8f, 0.0f, 0.0f));
})
.Font(FAppStyle::GetFontStyle("NormalText"))
]
]
// Port information
+ SVerticalBox::Slot()
.AutoHeight()
.Padding(0, 0, 0, 8)
[
SNew(SHorizontalBox)
+ SHorizontalBox::Slot()
.AutoWidth()
.VAlign(VAlign_Center)
.Padding(0, 0, 8, 0)
[
SNew(STextBlock)
.Text(LOCTEXT("ServerPortLabel", "Port:"))
.Font(FAppStyle::GetFontStyle("NormalText"))
]
+ SHorizontalBox::Slot()
.FillWidth(1.0f)
.VAlign(VAlign_Center)
[
SNew(STextBlock)
.Text(FText::FromString(FString::FromInt(Settings->Port)))
.Font(FAppStyle::GetFontStyle("NormalText"))
]
]
// Buttons
+ SVerticalBox::Slot()
.AutoHeight()
.Padding(0, 8, 0, 0)
.HAlign(HAlign_Center)
[
SNew(SUniformGridPanel)
.SlotPadding(FMargin(5.0f))
.MinDesiredSlotWidth(100.0f)
// Start button
+ SUniformGridPanel::Slot(0, 0)
[
SNew(SButton)
.HAlign(HAlign_Center)
.VAlign(VAlign_Center)
.Text(LOCTEXT("StartServerButton", "Start Server"))
.IsEnabled_Lambda([this]() -> bool { return !IsServerRunning(); })
.OnClicked(FOnClicked::CreateRaw(this, &FUnrealMCPModule::OnStartServerClicked))
]
// Stop button
+ SUniformGridPanel::Slot(1, 0)
[
SNew(SButton)
.HAlign(HAlign_Center)
.VAlign(VAlign_Center)
.Text(LOCTEXT("StopServerButton", "Stop Server"))
.IsEnabled_Lambda([this]() -> bool { return IsServerRunning(); })
.OnClicked(FOnClicked::CreateRaw(this, &FUnrealMCPModule::OnStopServerClicked))
]
]
// Settings button
+ SVerticalBox::Slot()
.AutoHeight()
.Padding(0, 16, 0, 0)
.HAlign(HAlign_Center)
[
SNew(SButton)
.HAlign(HAlign_Center)
.VAlign(VAlign_Center)
.Text(LOCTEXT("OpenSettingsButton", "Open Settings"))
.OnClicked_Lambda([this]() -> FReply {
if (ISettingsModule* SettingsModule = FModuleManager::GetModulePtr<ISettingsModule>("Settings"))
{
SettingsModule->ShowViewer("Editor", "Plugins", "MCP Settings");
}
return FReply::Handled();
})
]
];
}
FReply FUnrealMCPModule::OnStartServerClicked()
{
StartServer();
return FReply::Handled();
}
FReply FUnrealMCPModule::OnStopServerClicked()
{
StopServer();
return FReply::Handled();
}
void FUnrealMCPModule::ToggleServer()
{
MCP_LOG_WARNING("ToggleServer called - Server state: %s", (Server && Server->IsRunning()) ? TEXT("Running") : TEXT("Not Running"));
if (Server && Server->IsRunning())
{
MCP_LOG_WARNING("Stopping server...");
StopServer();
}
else
{
MCP_LOG_WARNING("Starting server...");
StartServer();
}
MCP_LOG_WARNING("ToggleServer completed - Server state: %s", (Server && Server->IsRunning()) ? TEXT("Running") : TEXT("Not Running"));
}
void FUnrealMCPModule::StartServer()
{
// Check if server is already running to prevent double-start
if (Server && Server->IsRunning())
{
MCP_LOG_WARNING("Server is already running, ignoring start request");
return;
}
MCP_LOG_WARNING("Creating new server instance");
const UMCPSettings* Settings = GetDefault<UMCPSettings>();
// Create a config object and set the port from settings
FMCPTCPServerConfig Config;
Config.Port = Settings->Port;
// Create the server with the config
Server = MakeUnique<FMCPTCPServer>(Config);
if (Server->Start())
{
// Refresh the toolbar to update the status indicator
if (UToolMenus* ToolMenus = UToolMenus::Get())
{
ToolMenus->RefreshAllWidgets();
}
}
else
{
MCP_LOG_ERROR("Failed to start MCP Server");
}
}
void FUnrealMCPModule::StopServer()
{
if (Server)
{
Server->Stop();
Server.Reset();
MCP_LOG_INFO("MCP Server stopped");
// Refresh the toolbar to update the status indicator
if (UToolMenus* ToolMenus = UToolMenus::Get())
{
ToolMenus->RefreshAllWidgets();
}
}
}
bool FUnrealMCPModule::IsServerRunning() const
{
return Server && Server->IsRunning();
}
#undef LOCTEXT_NAMESPACE
IMPLEMENT_MODULE(FUnrealMCPModule, UnrealMCP)
```
--------------------------------------------------------------------------------
/Source/UnrealMCP/Private/MCPTCPServer.cpp:
--------------------------------------------------------------------------------
```cpp
#include "MCPTCPServer.h"
#include "Engine/World.h"
#include "Editor.h"
#include "LevelEditor.h"
#include "Engine/StaticMeshActor.h"
#include "Components/StaticMeshComponent.h"
#include "JsonObjectConverter.h"
#include "Sockets.h"
#include "SocketSubsystem.h"
#include "IPAddress.h"
#include "Interfaces/IPv4/IPv4Address.h"
#include "Interfaces/IPv4/IPv4Endpoint.h"
#include "ActorEditorUtils.h"
#include "EngineUtils.h"
#include "Containers/Ticker.h"
#include "UnrealMCP.h"
#include "MCPFileLogger.h"
#include "MCPCommandHandlers.h"
#include "MCPCommandHandlers_Blueprints.h"
#include "MCPCommandHandlers_Materials.h"
#include "HAL/PlatformFilemanager.h"
#include "Misc/FileHelper.h"
#include "Misc/Paths.h"
#include "Misc/Guid.h"
#include "MCPConstants.h"
FMCPTCPServer::FMCPTCPServer(const FMCPTCPServerConfig& InConfig)
: Config(InConfig)
, Listener(nullptr)
, bRunning(false)
{
// Register default command handlers
RegisterCommandHandler(MakeShared<FMCPGetSceneInfoHandler>());
RegisterCommandHandler(MakeShared<FMCPCreateObjectHandler>());
RegisterCommandHandler(MakeShared<FMCPModifyObjectHandler>());
RegisterCommandHandler(MakeShared<FMCPDeleteObjectHandler>());
RegisterCommandHandler(MakeShared<FMCPExecutePythonHandler>());
// Material command handlers
RegisterCommandHandler(MakeShared<FMCPCreateMaterialHandler>());
RegisterCommandHandler(MakeShared<FMCPModifyMaterialHandler>());
RegisterCommandHandler(MakeShared<FMCPGetMaterialInfoHandler>());
// Blueprint command handlers
RegisterCommandHandler(MakeShared<FMCPCreateBlueprintHandler>());
RegisterCommandHandler(MakeShared<FMCPModifyBlueprintHandler>());
RegisterCommandHandler(MakeShared<FMCPGetBlueprintInfoHandler>());
RegisterCommandHandler(MakeShared<FMCPCreateBlueprintEventHandler>());
}
FMCPTCPServer::~FMCPTCPServer()
{
Stop();
}
void FMCPTCPServer::RegisterCommandHandler(TSharedPtr<IMCPCommandHandler> Handler)
{
if (!Handler.IsValid())
{
MCP_LOG_ERROR("Attempted to register null command handler");
return;
}
FString CommandName = Handler->GetCommandName();
if (CommandName.IsEmpty())
{
MCP_LOG_ERROR("Attempted to register command handler with empty command name");
return;
}
CommandHandlers.Add(CommandName, Handler);
MCP_LOG_INFO("Registered command handler for '%s'", *CommandName);
}
void FMCPTCPServer::UnregisterCommandHandler(const FString& CommandName)
{
if (CommandHandlers.Remove(CommandName) > 0)
{
MCP_LOG_INFO("Unregistered command handler for '%s'", *CommandName);
}
else
{
MCP_LOG_WARNING("Attempted to unregister non-existent command handler for '%s'", *CommandName);
}
}
bool FMCPTCPServer::RegisterExternalCommandHandler(TSharedPtr<IMCPCommandHandler> Handler)
{
if (!Handler.IsValid())
{
MCP_LOG_ERROR("Attempted to register null external command handler");
return false;
}
FString CommandName = Handler->GetCommandName();
if (CommandName.IsEmpty())
{
MCP_LOG_ERROR("Attempted to register external command handler with empty command name");
return false;
}
// Check if there's a conflict with an existing handler
if (CommandHandlers.Contains(CommandName))
{
MCP_LOG_WARNING("External command handler for '%s' conflicts with an existing handler", *CommandName);
return false;
}
// Register the handler
CommandHandlers.Add(CommandName, Handler);
MCP_LOG_INFO("Registered external command handler for '%s'", *CommandName);
return true;
}
bool FMCPTCPServer::UnregisterExternalCommandHandler(const FString& CommandName)
{
if (CommandName.IsEmpty())
{
MCP_LOG_ERROR("Attempted to unregister external command handler with empty command name");
return false;
}
// Check if the handler exists
if (!CommandHandlers.Contains(CommandName))
{
MCP_LOG_WARNING("Attempted to unregister non-existent external command handler for '%s'", *CommandName);
return false;
}
// Unregister the handler
CommandHandlers.Remove(CommandName);
MCP_LOG_INFO("Unregistered external command handler for '%s'", *CommandName);
return true;
}
bool FMCPTCPServer::Start()
{
if (bRunning)
{
MCP_LOG_WARNING("Start called but server is already running, returning true");
return true;
}
MCP_LOG_WARNING("Starting MCP server on port %d", Config.Port);
// Use a simple ASCII string for the socket description to avoid encoding issues
Listener = new FTcpListener(FIPv4Endpoint(FIPv4Address::Any, Config.Port));
if (!Listener || !Listener->IsActive())
{
MCP_LOG_ERROR("Failed to start MCP server on port %d", Config.Port);
Stop();
return false;
}
// Clear any existing client connections
ClientConnections.Empty();
TickerHandle = FTSTicker::GetCoreTicker().AddTicker(FTickerDelegate::CreateRaw(this, &FMCPTCPServer::Tick), Config.TickIntervalSeconds);
bRunning = true;
MCP_LOG_INFO("MCP Server started on port %d", Config.Port);
return true;
}
void FMCPTCPServer::Stop()
{
// Clean up all client connections
CleanupAllClientConnections();
if (Listener)
{
delete Listener;
Listener = nullptr;
}
if (TickerHandle.IsValid())
{
FTSTicker::GetCoreTicker().RemoveTicker(TickerHandle);
TickerHandle.Reset();
}
bRunning = false;
MCP_LOG_INFO("MCP Server stopped");
}
bool FMCPTCPServer::Tick(float DeltaTime)
{
if (!bRunning) return false;
// Normal processing
ProcessPendingConnections();
ProcessClientData();
CheckClientTimeouts(DeltaTime);
return true;
}
void FMCPTCPServer::ProcessPendingConnections()
{
if (!Listener) return;
// Always accept new connections
if (!Listener->OnConnectionAccepted().IsBound())
{
Listener->OnConnectionAccepted().BindRaw(this, &FMCPTCPServer::HandleConnectionAccepted);
}
}
bool FMCPTCPServer::HandleConnectionAccepted(FSocket* InSocket, const FIPv4Endpoint& Endpoint)
{
if (!InSocket)
{
MCP_LOG_ERROR("HandleConnectionAccepted called with null socket");
return false;
}
MCP_LOG_VERBOSE("Connection attempt from %s", *Endpoint.ToString());
// Accept all connections
InSocket->SetNonBlocking(true);
// Add to our list of client connections
ClientConnections.Add(FMCPClientConnection(InSocket, Endpoint, Config.ReceiveBufferSize));
MCP_LOG_INFO("MCP Client connected from %s (Total clients: %d)", *Endpoint.ToString(), ClientConnections.Num());
return true;
}
void FMCPTCPServer::ProcessClientData()
{
// Make a copy of the array since we might modify it during iteration
TArray<FMCPClientConnection> ConnectionsCopy = ClientConnections;
for (FMCPClientConnection& ClientConnection : ConnectionsCopy)
{
if (!ClientConnection.Socket) continue;
// Check if the client is still connected
uint32 PendingDataSize = 0;
if (!ClientConnection.Socket->HasPendingData(PendingDataSize))
{
// Try to check connection status
uint8 DummyBuffer[1];
int32 BytesRead = 0;
bool bConnectionLost = false;
try
{
if (!ClientConnection.Socket->Recv(DummyBuffer, 1, BytesRead, ESocketReceiveFlags::Peek))
{
// Check if it's a real error or just a non-blocking socket that would block
int32 ErrorCode = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->GetLastErrorCode();
if (ErrorCode != SE_EWOULDBLOCK)
{
// Real connection error
MCP_LOG_INFO("Client connection from %s appears to be closed (error code %d), cleaning up",
*ClientConnection.Endpoint.ToString(), ErrorCode);
bConnectionLost = true;
}
}
}
catch (...)
{
MCP_LOG_ERROR("Exception while checking client connection status for %s",
*ClientConnection.Endpoint.ToString());
bConnectionLost = true;
}
if (bConnectionLost)
{
CleanupClientConnection(ClientConnection);
continue; // Skip to the next client
}
}
// Reset PendingDataSize and check again to ensure we have the latest value
PendingDataSize = 0;
if (ClientConnection.Socket->HasPendingData(PendingDataSize))
{
if (Config.bEnableVerboseLogging)
{
MCP_LOG_VERBOSE("Client from %s has %u bytes of pending data",
*ClientConnection.Endpoint.ToString(), PendingDataSize);
}
// Reset timeout timer since we're receiving data
ClientConnection.TimeSinceLastActivity = 0.0f;
int32 BytesRead = 0;
if (ClientConnection.Socket->Recv(ClientConnection.ReceiveBuffer.GetData(), ClientConnection.ReceiveBuffer.Num(), BytesRead))
{
if (BytesRead > 0)
{
if (Config.bEnableVerboseLogging)
{
MCP_LOG_VERBOSE("Read %d bytes from client %s", BytesRead, *ClientConnection.Endpoint.ToString());
}
// Null-terminate the buffer to ensure it's a valid string
ClientConnection.ReceiveBuffer[BytesRead] = 0;
FString ReceivedData = FString(UTF8_TO_TCHAR(ClientConnection.ReceiveBuffer.GetData()));
ProcessCommand(ReceivedData, ClientConnection.Socket);
}
}
else
{
// Check if it's a real error or just a non-blocking socket that would block
int32 ErrorCode = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->GetLastErrorCode();
if (ErrorCode != SE_EWOULDBLOCK)
{
// Real connection error, close the socket
MCP_LOG_WARNING("Socket error %d for client %s, closing connection",
ErrorCode, *ClientConnection.Endpoint.ToString());
CleanupClientConnection(ClientConnection);
}
}
}
}
}
void FMCPTCPServer::CheckClientTimeouts(float DeltaTime)
{
// Make a copy of the array since we might modify it during iteration
TArray<FMCPClientConnection> ConnectionsCopy = ClientConnections;
for (FMCPClientConnection& ClientConnection : ConnectionsCopy)
{
if (!ClientConnection.Socket) continue;
// Increment time since last activity
ClientConnection.TimeSinceLastActivity += DeltaTime;
// Check if client has timed out
if (ClientConnection.TimeSinceLastActivity > Config.ClientTimeoutSeconds)
{
MCP_LOG_WARNING("Client from %s timed out after %.1f seconds of inactivity, disconnecting",
*ClientConnection.Endpoint.ToString(), ClientConnection.TimeSinceLastActivity);
CleanupClientConnection(ClientConnection);
}
}
}
void FMCPTCPServer::CleanupAllClientConnections()
{
MCP_LOG_INFO("Cleaning up all client connections (%d total)", ClientConnections.Num());
// Make a copy of the array since we'll be modifying it during iteration
TArray<FMCPClientConnection> ConnectionsCopy = ClientConnections;
for (FMCPClientConnection& Connection : ConnectionsCopy)
{
CleanupClientConnection(Connection);
}
// Ensure the array is empty
ClientConnections.Empty();
}
void FMCPTCPServer::CleanupClientConnection(FSocket* ClientSocket)
{
if (!ClientSocket) return;
// Find the client connection with this socket
for (FMCPClientConnection& Connection : ClientConnections)
{
if (Connection.Socket == ClientSocket)
{
CleanupClientConnection(Connection);
break;
}
}
}
void FMCPTCPServer::CleanupClientConnection(FMCPClientConnection& ClientConnection)
{
if (!ClientConnection.Socket) return;
MCP_LOG_INFO("Cleaning up client connection from %s", *ClientConnection.Endpoint.ToString());
try
{
// Get the socket description before closing
FString SocketDesc = GetSafeSocketDescription(ClientConnection.Socket);
MCP_LOG_VERBOSE("Closing client socket with description: %s", *SocketDesc);
// First close the socket
bool bCloseSuccess = ClientConnection.Socket->Close();
if (!bCloseSuccess)
{
MCP_LOG_ERROR("Failed to close client socket");
}
// Then destroy it
ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM);
if (SocketSubsystem)
{
SocketSubsystem->DestroySocket(ClientConnection.Socket);
MCP_LOG_VERBOSE("Successfully destroyed client socket");
}
else
{
MCP_LOG_ERROR("Failed to get socket subsystem when cleaning up client connection");
}
}
catch (const std::exception& Ex)
{
MCP_LOG_ERROR("Exception while cleaning up client connection: %s", UTF8_TO_TCHAR(Ex.what()));
}
catch (...)
{
MCP_LOG_ERROR("Unknown exception while cleaning up client connection");
}
// Remove from our list of connections
ClientConnections.RemoveAll([&ClientConnection](const FMCPClientConnection& Connection) {
return Connection.Socket == ClientConnection.Socket;
});
MCP_LOG_INFO("MCP Client disconnected (Remaining clients: %d)", ClientConnections.Num());
}
void FMCPTCPServer::ProcessCommand(const FString& CommandJson, FSocket* ClientSocket)
{
if (Config.bEnableVerboseLogging)
{
MCP_LOG_VERBOSE("Processing command: %s", *CommandJson);
}
TSharedPtr<FJsonObject> Command;
TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(CommandJson);
if (FJsonSerializer::Deserialize(Reader, Command) && Command.IsValid())
{
FString Type;
if (Command->TryGetStringField(FStringView(TEXT("type")), Type))
{
TSharedPtr<IMCPCommandHandler> Handler = CommandHandlers.FindRef(Type);
if (Handler.IsValid())
{
MCP_LOG_INFO("Processing command: %s", *Type);
const TSharedPtr<FJsonObject>* ParamsPtr = nullptr;
TSharedPtr<FJsonObject> Params = MakeShared<FJsonObject>();
if (Command->TryGetObjectField(FStringView(TEXT("params")), ParamsPtr) && ParamsPtr != nullptr)
{
Params = *ParamsPtr;
}
// Handle the command and get the response
TSharedPtr<FJsonObject> Response = Handler->Execute(Params, ClientSocket);
// Send the response
SendResponse(ClientSocket, Response);
}
else
{
MCP_LOG_WARNING("Unknown command: %s", *Type);
TSharedPtr<FJsonObject> Response = MakeShared<FJsonObject>();
Response->SetStringField("status", "error");
Response->SetStringField("message", FString::Printf(TEXT("Unknown command: %s"), *Type));
SendResponse(ClientSocket, Response);
}
}
else
{
MCP_LOG_WARNING("Missing 'type' field in command");
TSharedPtr<FJsonObject> Response = MakeShared<FJsonObject>();
Response->SetStringField("status", "error");
Response->SetStringField("message", TEXT("Missing 'type' field"));
SendResponse(ClientSocket, Response);
}
}
else
{
MCP_LOG_WARNING("Invalid JSON format: %s", *CommandJson);
TSharedPtr<FJsonObject> Response = MakeShared<FJsonObject>();
Response->SetStringField("status", "error");
Response->SetStringField("message", TEXT("Invalid JSON format"));
SendResponse(ClientSocket, Response);
}
// Keep the connection open for future commands
// Do not close the socket here
}
void FMCPTCPServer::SendResponse(FSocket* Client, const TSharedPtr<FJsonObject>& Response)
{
if (!Client) return;
FString ResponseStr;
TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&ResponseStr);
FJsonSerializer::Serialize(Response.ToSharedRef(), Writer);
if (Config.bEnableVerboseLogging)
{
MCP_LOG_VERBOSE("Preparing to send response: %s", *ResponseStr);
}
FTCHARToUTF8 Converter(*ResponseStr);
int32 BytesSent = 0;
int32 TotalBytes = Converter.Length();
const uint8* Data = (const uint8*)Converter.Get();
// Ensure all data is sent
while (BytesSent < TotalBytes)
{
int32 SentThisTime = 0;
if (!Client->Send(Data + BytesSent, TotalBytes - BytesSent, SentThisTime))
{
MCP_LOG_WARNING("Failed to send response");
break;
}
if (SentThisTime <= 0)
{
// Would block, try again next tick
MCP_LOG_VERBOSE("Socket would block, will try again next tick");
break;
}
BytesSent += SentThisTime;
if (Config.bEnableVerboseLogging)
{
MCP_LOG_VERBOSE("Sent %d/%d bytes", BytesSent, TotalBytes);
}
}
if (BytesSent == TotalBytes)
{
MCP_LOG_INFO("Successfully sent complete response (%d bytes)", TotalBytes);
}
else
{
MCP_LOG_WARNING("Only sent %d/%d bytes of response", BytesSent, TotalBytes);
}
}
FString FMCPTCPServer::GetSafeSocketDescription(FSocket* Socket)
{
if (!Socket)
{
return TEXT("NullSocket");
}
try
{
FString Description = Socket->GetDescription();
// Check if the description contains any non-ASCII characters
bool bHasNonAscii = false;
for (TCHAR Char : Description)
{
if (Char > 127)
{
bHasNonAscii = true;
break;
}
}
if (bHasNonAscii)
{
// Return a safe description instead
return TEXT("Socket_") + FString::FromInt(reinterpret_cast<uint64>(Socket));
}
return Description;
}
catch (...)
{
// If there's any exception, return a safe description
return TEXT("Socket_") + FString::FromInt(reinterpret_cast<uint64>(Socket));
}
}
```