This is page 13 of 16. Use http://codebase.md/trycua/cua?lines=false&page={x} to view the full context. # Directory Structure ``` ├── .all-contributorsrc ├── .cursorignore ├── .devcontainer │ ├── devcontainer.json │ ├── post-install.sh │ └── README.md ├── .dockerignore ├── .gitattributes ├── .github │ ├── FUNDING.yml │ ├── scripts │ │ ├── get_pyproject_version.py │ │ └── tests │ │ ├── __init__.py │ │ ├── README.md │ │ └── test_get_pyproject_version.py │ └── workflows │ ├── ci-lume.yml │ ├── docker-publish-kasm.yml │ ├── docker-publish-xfce.yml │ ├── docker-reusable-publish.yml │ ├── npm-publish-computer.yml │ ├── npm-publish-core.yml │ ├── publish-lume.yml │ ├── pypi-publish-agent.yml │ ├── pypi-publish-computer-server.yml │ ├── pypi-publish-computer.yml │ ├── pypi-publish-core.yml │ ├── pypi-publish-mcp-server.yml │ ├── pypi-publish-pylume.yml │ ├── pypi-publish-som.yml │ ├── pypi-reusable-publish.yml │ └── test-validation-script.yml ├── .gitignore ├── .vscode │ ├── docs.code-workspace │ ├── launch.json │ ├── libs-ts.code-workspace │ ├── lume.code-workspace │ ├── lumier.code-workspace │ ├── py.code-workspace │ └── settings.json ├── blog │ ├── app-use.md │ ├── assets │ │ ├── composite-agents.png │ │ ├── docker-ubuntu-support.png │ │ ├── hack-booth.png │ │ ├── hack-closing-ceremony.jpg │ │ ├── hack-cua-ollama-hud.jpeg │ │ ├── hack-leaderboard.png │ │ ├── hack-the-north.png │ │ ├── hack-winners.jpeg │ │ ├── hack-workshop.jpeg │ │ ├── hud-agent-evals.png │ │ └── trajectory-viewer.jpeg │ ├── bringing-computer-use-to-the-web.md │ ├── build-your-own-operator-on-macos-1.md │ ├── build-your-own-operator-on-macos-2.md │ ├── composite-agents.md │ ├── cua-hackathon.md │ ├── hack-the-north.md │ ├── hud-agent-evals.md │ ├── human-in-the-loop.md │ ├── introducing-cua-cloud-containers.md │ ├── lume-to-containerization.md │ ├── sandboxed-python-execution.md │ ├── training-computer-use-models-trajectories-1.md │ ├── trajectory-viewer.md │ ├── ubuntu-docker-support.md │ └── 
windows-sandbox.md ├── CONTRIBUTING.md ├── Development.md ├── Dockerfile ├── docs │ ├── .gitignore │ ├── .prettierrc │ ├── content │ │ └── docs │ │ ├── agent-sdk │ │ │ ├── agent-loops.mdx │ │ │ ├── benchmarks │ │ │ │ ├── index.mdx │ │ │ │ ├── interactive.mdx │ │ │ │ ├── introduction.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── osworld-verified.mdx │ │ │ │ ├── screenspot-pro.mdx │ │ │ │ └── screenspot-v2.mdx │ │ │ ├── callbacks │ │ │ │ ├── agent-lifecycle.mdx │ │ │ │ ├── cost-saving.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── logging.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── pii-anonymization.mdx │ │ │ │ └── trajectories.mdx │ │ │ ├── chat-history.mdx │ │ │ ├── custom-computer-handlers.mdx │ │ │ ├── custom-tools.mdx │ │ │ ├── customizing-computeragent.mdx │ │ │ ├── integrations │ │ │ │ ├── hud.mdx │ │ │ │ └── meta.json │ │ │ ├── message-format.mdx │ │ │ ├── meta.json │ │ │ ├── migration-guide.mdx │ │ │ ├── prompt-caching.mdx │ │ │ ├── supported-agents │ │ │ │ ├── composed-agents.mdx │ │ │ │ ├── computer-use-agents.mdx │ │ │ │ ├── grounding-models.mdx │ │ │ │ ├── human-in-the-loop.mdx │ │ │ │ └── meta.json │ │ │ ├── supported-model-providers │ │ │ │ ├── index.mdx │ │ │ │ └── local-models.mdx │ │ │ └── usage-tracking.mdx │ │ ├── computer-sdk │ │ │ ├── cloud-vm-management.mdx │ │ │ ├── commands.mdx │ │ │ ├── computer-ui.mdx │ │ │ ├── computers.mdx │ │ │ ├── meta.json │ │ │ └── sandboxed-python.mdx │ │ ├── index.mdx │ │ ├── libraries │ │ │ ├── agent │ │ │ │ └── index.mdx │ │ │ ├── computer │ │ │ │ └── index.mdx │ │ │ ├── computer-server │ │ │ │ ├── Commands.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── REST-API.mdx │ │ │ │ └── WebSocket-API.mdx │ │ │ ├── core │ │ │ │ └── index.mdx │ │ │ ├── lume │ │ │ │ ├── cli-reference.mdx │ │ │ │ ├── faq.md │ │ │ │ ├── http-api.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ ├── meta.json │ │ │ │ └── prebuilt-images.mdx │ │ │ ├── lumier │ │ │ │ ├── building-lumier.mdx │ │ │ │ ├── docker-compose.mdx │ │ │ │ ├── docker.mdx │ │ │ │ ├── index.mdx 
│ │ │ │ ├── installation.mdx │ │ │ │ └── meta.json │ │ │ ├── mcp-server │ │ │ │ ├── client-integrations.mdx │ │ │ │ ├── configuration.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ ├── llm-integrations.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── tools.mdx │ │ │ │ └── usage.mdx │ │ │ └── som │ │ │ ├── configuration.mdx │ │ │ └── index.mdx │ │ ├── meta.json │ │ ├── quickstart-cli.mdx │ │ ├── quickstart-devs.mdx │ │ └── telemetry.mdx │ ├── next.config.mjs │ ├── package-lock.json │ ├── package.json │ ├── pnpm-lock.yaml │ ├── postcss.config.mjs │ ├── public │ │ └── img │ │ ├── agent_gradio_ui.png │ │ ├── agent.png │ │ ├── cli.png │ │ ├── computer.png │ │ ├── som_box_threshold.png │ │ └── som_iou_threshold.png │ ├── README.md │ ├── source.config.ts │ ├── src │ │ ├── app │ │ │ ├── (home) │ │ │ │ ├── [[...slug]] │ │ │ │ │ └── page.tsx │ │ │ │ └── layout.tsx │ │ │ ├── api │ │ │ │ └── search │ │ │ │ └── route.ts │ │ │ ├── favicon.ico │ │ │ ├── global.css │ │ │ ├── layout.config.tsx │ │ │ ├── layout.tsx │ │ │ ├── llms.mdx │ │ │ │ └── [[...slug]] │ │ │ │ └── route.ts │ │ │ └── llms.txt │ │ │ └── route.ts │ │ ├── assets │ │ │ ├── discord-black.svg │ │ │ ├── discord-white.svg │ │ │ ├── logo-black.svg │ │ │ └── logo-white.svg │ │ ├── components │ │ │ ├── iou.tsx │ │ │ └── mermaid.tsx │ │ ├── lib │ │ │ ├── llms.ts │ │ │ └── source.ts │ │ └── mdx-components.tsx │ └── tsconfig.json ├── examples │ ├── agent_examples.py │ ├── agent_ui_examples.py │ ├── cloud_api_examples.py │ ├── computer_examples_windows.py │ ├── computer_examples.py │ ├── computer_ui_examples.py │ ├── computer-example-ts │ │ ├── .env.example │ │ ├── .gitignore │ │ ├── .prettierrc │ │ ├── package-lock.json │ │ ├── package.json │ │ ├── pnpm-lock.yaml │ │ ├── README.md │ │ ├── src │ │ │ ├── helpers.ts │ │ │ └── index.ts │ │ └── tsconfig.json │ ├── docker_examples.py │ ├── evals │ │ ├── hud_eval_examples.py │ │ └── wikipedia_most_linked.txt │ ├── pylume_examples.py │ ├── sandboxed_functions_examples.py │ ├── 
som_examples.py │ ├── utils.py │ └── winsandbox_example.py ├── img │ ├── agent_gradio_ui.png │ ├── agent.png │ ├── cli.png │ ├── computer.png │ ├── logo_black.png │ └── logo_white.png ├── libs │ ├── kasm │ │ ├── Dockerfile │ │ ├── LICENSE │ │ ├── README.md │ │ └── src │ │ └── ubuntu │ │ └── install │ │ └── firefox │ │ ├── custom_startup.sh │ │ ├── firefox.desktop │ │ └── install_firefox.sh │ ├── lume │ │ ├── .cursorignore │ │ ├── CONTRIBUTING.md │ │ ├── Development.md │ │ ├── img │ │ │ └── cli.png │ │ ├── Package.resolved │ │ ├── Package.swift │ │ ├── README.md │ │ ├── resources │ │ │ └── lume.entitlements │ │ ├── scripts │ │ │ ├── build │ │ │ │ ├── build-debug.sh │ │ │ │ ├── build-release-notarized.sh │ │ │ │ └── build-release.sh │ │ │ └── install.sh │ │ ├── src │ │ │ ├── Commands │ │ │ │ ├── Clone.swift │ │ │ │ ├── Config.swift │ │ │ │ ├── Create.swift │ │ │ │ ├── Delete.swift │ │ │ │ ├── Get.swift │ │ │ │ ├── Images.swift │ │ │ │ ├── IPSW.swift │ │ │ │ ├── List.swift │ │ │ │ ├── Logs.swift │ │ │ │ ├── Options │ │ │ │ │ └── FormatOption.swift │ │ │ │ ├── Prune.swift │ │ │ │ ├── Pull.swift │ │ │ │ ├── Push.swift │ │ │ │ ├── Run.swift │ │ │ │ ├── Serve.swift │ │ │ │ ├── Set.swift │ │ │ │ └── Stop.swift │ │ │ ├── ContainerRegistry │ │ │ │ ├── ImageContainerRegistry.swift │ │ │ │ ├── ImageList.swift │ │ │ │ └── ImagesPrinter.swift │ │ │ ├── Errors │ │ │ │ └── Errors.swift │ │ │ ├── FileSystem │ │ │ │ ├── Home.swift │ │ │ │ ├── Settings.swift │ │ │ │ ├── VMConfig.swift │ │ │ │ ├── VMDirectory.swift │ │ │ │ └── VMLocation.swift │ │ │ ├── LumeController.swift │ │ │ ├── Main.swift │ │ │ ├── Server │ │ │ │ ├── Handlers.swift │ │ │ │ ├── HTTP.swift │ │ │ │ ├── Requests.swift │ │ │ │ ├── Responses.swift │ │ │ │ └── Server.swift │ │ │ ├── Utils │ │ │ │ ├── CommandRegistry.swift │ │ │ │ ├── CommandUtils.swift │ │ │ │ ├── Logger.swift │ │ │ │ ├── NetworkUtils.swift │ │ │ │ ├── Path.swift │ │ │ │ ├── ProcessRunner.swift │ │ │ │ ├── ProgressLogger.swift │ │ │ │ ├── String.swift 
│ │ │ │ └── Utils.swift │ │ │ ├── Virtualization │ │ │ │ ├── DarwinImageLoader.swift │ │ │ │ ├── DHCPLeaseParser.swift │ │ │ │ ├── ImageLoaderFactory.swift │ │ │ │ └── VMVirtualizationService.swift │ │ │ ├── VM │ │ │ │ ├── DarwinVM.swift │ │ │ │ ├── LinuxVM.swift │ │ │ │ ├── VM.swift │ │ │ │ ├── VMDetails.swift │ │ │ │ ├── VMDetailsPrinter.swift │ │ │ │ ├── VMDisplayResolution.swift │ │ │ │ └── VMFactory.swift │ │ │ └── VNC │ │ │ ├── PassphraseGenerator.swift │ │ │ └── VNCService.swift │ │ └── tests │ │ ├── Mocks │ │ │ ├── MockVM.swift │ │ │ ├── MockVMVirtualizationService.swift │ │ │ └── MockVNCService.swift │ │ ├── VM │ │ │ └── VMDetailsPrinterTests.swift │ │ ├── VMTests.swift │ │ ├── VMVirtualizationServiceTests.swift │ │ └── VNCServiceTests.swift │ ├── lumier │ │ ├── .dockerignore │ │ ├── Dockerfile │ │ ├── README.md │ │ └── src │ │ ├── bin │ │ │ └── entry.sh │ │ ├── config │ │ │ └── constants.sh │ │ ├── hooks │ │ │ └── on-logon.sh │ │ └── lib │ │ ├── utils.sh │ │ └── vm.sh │ ├── python │ │ ├── agent │ │ │ ├── .bumpversion.cfg │ │ │ ├── agent │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── adapters │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── huggingfacelocal_adapter.py │ │ │ │ │ ├── human_adapter.py │ │ │ │ │ ├── mlxvlm_adapter.py │ │ │ │ │ └── models │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── internvl.py │ │ │ │ │ ├── opencua.py │ │ │ │ │ └── qwen2_5_vl.py │ │ │ │ ├── agent.py │ │ │ │ ├── callbacks │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── budget_manager.py │ │ │ │ │ ├── image_retention.py │ │ │ │ │ ├── logging.py │ │ │ │ │ ├── operator_validator.py │ │ │ │ │ ├── pii_anonymization.py │ │ │ │ │ ├── prompt_instructions.py │ │ │ │ │ ├── telemetry.py │ │ │ │ │ └── trajectory_saver.py │ │ │ │ ├── cli.py │ │ │ │ ├── computers │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── cua.py │ │ │ │ │ └── custom.py │ │ │ │ ├── decorators.py │ │ │ │ ├── human_tool │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── __main__.py 
│ │ │ │ │ ├── server.py │ │ │ │ │ └── ui.py │ │ │ │ ├── integrations │ │ │ │ │ └── hud │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── agent.py │ │ │ │ │ └── proxy.py │ │ │ │ ├── loops │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── anthropic.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── composed_grounded.py │ │ │ │ │ ├── gemini.py │ │ │ │ │ ├── glm45v.py │ │ │ │ │ ├── gta1.py │ │ │ │ │ ├── holo.py │ │ │ │ │ ├── internvl.py │ │ │ │ │ ├── model_types.csv │ │ │ │ │ ├── moondream3.py │ │ │ │ │ ├── omniparser.py │ │ │ │ │ ├── openai.py │ │ │ │ │ ├── opencua.py │ │ │ │ │ └── uitars.py │ │ │ │ ├── proxy │ │ │ │ │ ├── examples.py │ │ │ │ │ └── handlers.py │ │ │ │ ├── responses.py │ │ │ │ ├── types.py │ │ │ │ └── ui │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ └── gradio │ │ │ │ ├── __init__.py │ │ │ │ ├── app.py │ │ │ │ └── ui_components.py │ │ │ ├── benchmarks │ │ │ │ ├── .gitignore │ │ │ │ ├── contrib.md │ │ │ │ ├── interactive.py │ │ │ │ ├── models │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ └── gta1.py │ │ │ │ ├── README.md │ │ │ │ ├── ss-pro.py │ │ │ │ ├── ss-v2.py │ │ │ │ └── utils.py │ │ │ ├── example.py │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── computer │ │ │ ├── .bumpversion.cfg │ │ │ ├── computer │ │ │ │ ├── __init__.py │ │ │ │ ├── computer.py │ │ │ │ ├── diorama_computer.py │ │ │ │ ├── helpers.py │ │ │ │ ├── interface │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── linux.py │ │ │ │ │ ├── macos.py │ │ │ │ │ ├── models.py │ │ │ │ │ └── windows.py │ │ │ │ ├── logger.py │ │ │ │ ├── models.py │ │ │ │ ├── providers │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── cloud │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── docker │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── lume │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── lume_api.py │ │ │ │ │ ├── lumier │ │ │ │ │ │ ├── __init__.py │ │ │ │ 
│ │ └── provider.py │ │ │ │ │ ├── types.py │ │ │ │ │ └── winsandbox │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── provider.py │ │ │ │ │ └── setup_script.ps1 │ │ │ │ ├── ui │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── __main__.py │ │ │ │ │ └── gradio │ │ │ │ │ ├── __init__.py │ │ │ │ │ └── app.py │ │ │ │ └── utils.py │ │ │ ├── poetry.toml │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── computer-server │ │ │ ├── .bumpversion.cfg │ │ │ ├── computer_server │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── cli.py │ │ │ │ ├── diorama │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── diorama_computer.py │ │ │ │ │ ├── diorama.py │ │ │ │ │ ├── draw.py │ │ │ │ │ ├── macos.py │ │ │ │ │ └── safezone.py │ │ │ │ ├── handlers │ │ │ │ │ ├── base.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── linux.py │ │ │ │ │ ├── macos.py │ │ │ │ │ └── windows.py │ │ │ │ ├── main.py │ │ │ │ ├── server.py │ │ │ │ └── watchdog.py │ │ │ ├── examples │ │ │ │ ├── __init__.py │ │ │ │ └── usage_example.py │ │ │ ├── pyproject.toml │ │ │ ├── README.md │ │ │ ├── run_server.py │ │ │ └── test_connection.py │ │ ├── core │ │ │ ├── .bumpversion.cfg │ │ │ ├── core │ │ │ │ ├── __init__.py │ │ │ │ └── telemetry │ │ │ │ ├── __init__.py │ │ │ │ └── posthog.py │ │ │ ├── poetry.toml │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── mcp-server │ │ │ ├── .bumpversion.cfg │ │ │ ├── CONCURRENT_SESSIONS.md │ │ │ ├── mcp_server │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── server.py │ │ │ │ └── session_manager.py │ │ │ ├── pdm.lock │ │ │ ├── pyproject.toml │ │ │ ├── README.md │ │ │ └── scripts │ │ │ ├── install_mcp_server.sh │ │ │ └── start_mcp_server.sh │ │ ├── pylume │ │ │ ├── __init__.py │ │ │ ├── .bumpversion.cfg │ │ │ ├── pylume │ │ │ │ ├── __init__.py │ │ │ │ ├── client.py │ │ │ │ ├── exceptions.py │ │ │ │ ├── lume │ │ │ │ ├── models.py │ │ │ │ ├── pylume.py │ │ │ │ └── server.py │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ └── som │ │ ├── .bumpversion.cfg │ │ 
├── LICENSE │ │ ├── poetry.toml │ │ ├── pyproject.toml │ │ ├── README.md │ │ ├── som │ │ │ ├── __init__.py │ │ │ ├── detect.py │ │ │ ├── detection.py │ │ │ ├── models.py │ │ │ ├── ocr.py │ │ │ ├── util │ │ │ │ └── utils.py │ │ │ └── visualization.py │ │ └── tests │ │ └── test_omniparser.py │ ├── typescript │ │ ├── .gitignore │ │ ├── .nvmrc │ │ ├── agent │ │ │ ├── examples │ │ │ │ ├── playground-example.html │ │ │ │ └── README.md │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── client.ts │ │ │ │ ├── index.ts │ │ │ │ └── types.ts │ │ │ ├── tests │ │ │ │ └── client.test.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── biome.json │ │ ├── computer │ │ │ ├── .editorconfig │ │ │ ├── .gitattributes │ │ │ ├── .gitignore │ │ │ ├── LICENSE │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── computer │ │ │ │ │ ├── index.ts │ │ │ │ │ ├── providers │ │ │ │ │ │ ├── base.ts │ │ │ │ │ │ ├── cloud.ts │ │ │ │ │ │ └── index.ts │ │ │ │ │ └── types.ts │ │ │ │ ├── index.ts │ │ │ │ ├── interface │ │ │ │ │ ├── base.ts │ │ │ │ │ ├── factory.ts │ │ │ │ │ ├── index.ts │ │ │ │ │ ├── linux.ts │ │ │ │ │ ├── macos.ts │ │ │ │ │ └── windows.ts │ │ │ │ └── types.ts │ │ │ ├── tests │ │ │ │ ├── computer │ │ │ │ │ └── cloud.test.ts │ │ │ │ ├── interface │ │ │ │ │ ├── factory.test.ts │ │ │ │ │ ├── index.test.ts │ │ │ │ │ ├── linux.test.ts │ │ │ │ │ ├── macos.test.ts │ │ │ │ │ └── windows.test.ts │ │ │ │ └── setup.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── core │ │ │ ├── .editorconfig │ │ │ ├── .gitattributes │ │ │ ├── .gitignore │ │ │ ├── LICENSE │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── index.ts │ │ │ │ └── telemetry │ │ │ │ ├── clients │ │ │ │ │ ├── index.ts │ │ │ │ │ └── posthog.ts │ │ │ │ └── index.ts │ │ │ ├── tests │ │ │ │ └── telemetry.test.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── package.json │ │ ├── 
pnpm-lock.yaml │ │ ├── pnpm-workspace.yaml │ │ └── README.md │ └── xfce │ ├── .dockerignore │ ├── .gitignore │ ├── Dockerfile │ ├── README.md │ └── src │ ├── scripts │ │ ├── resize-display.sh │ │ ├── start-computer-server.sh │ │ ├── start-novnc.sh │ │ ├── start-vnc.sh │ │ └── xstartup.sh │ ├── supervisor │ │ └── supervisord.conf │ └── xfce-config │ ├── helpers.rc │ ├── xfce4-power-manager.xml │ └── xfce4-session.xml ├── LICENSE.md ├── Makefile ├── notebooks │ ├── agent_nb.ipynb │ ├── blog │ │ ├── build-your-own-operator-on-macos-1.ipynb │ │ └── build-your-own-operator-on-macos-2.ipynb │ ├── composite_agents_docker_nb.ipynb │ ├── computer_nb.ipynb │ ├── computer_server_nb.ipynb │ ├── customizing_computeragent.ipynb │ ├── eval_osworld.ipynb │ ├── ollama_nb.ipynb │ ├── pylume_nb.ipynb │ ├── README.md │ ├── sota_hackathon_cloud.ipynb │ └── sota_hackathon.ipynb ├── pdm.lock ├── pyproject.toml ├── pyrightconfig.json ├── README.md ├── samples │ └── community │ ├── global-online │ │ └── README.md │ └── hack-the-north │ └── README.md ├── scripts │ ├── build-uv.sh │ ├── build.ps1 │ ├── build.sh │ ├── cleanup.sh │ ├── playground-docker.sh │ ├── playground.sh │ └── run-docker-dev.sh └── tests ├── pytest.ini ├── shell_cmd.py ├── test_files.py ├── test_mcp_server_session_management.py ├── test_mcp_server_streaming.py ├── test_shell_bash.py ├── test_telemetry.py ├── test_venv.py └── test_watchdog.py ``` # Files -------------------------------------------------------------------------------- /libs/python/agent/agent/responses.py: -------------------------------------------------------------------------------- ```python """ Functions for making various Responses API items from different types of responses. Based on the OpenAI spec for Responses API items. 
""" import base64 import json import uuid from typing import List, Dict, Any, Literal, Union, Optional from openai.types.responses.response_computer_tool_call_param import ( ResponseComputerToolCallParam, ActionClick, ActionDoubleClick, ActionDrag, ActionDragPath, ActionKeypress, ActionMove, ActionScreenshot, ActionScroll, ActionType as ActionTypeAction, ActionWait, PendingSafetyCheck ) from openai.types.responses.response_function_tool_call_param import ResponseFunctionToolCallParam from openai.types.responses.response_output_text_param import ResponseOutputTextParam from openai.types.responses.response_reasoning_item_param import ResponseReasoningItemParam, Summary from openai.types.responses.response_output_message_param import ResponseOutputMessageParam from openai.types.responses.easy_input_message_param import EasyInputMessageParam from openai.types.responses.response_input_image_param import ResponseInputImageParam def random_id(): return str(uuid.uuid4()) # User message items def make_input_image_item(image_data: Union[str, bytes]) -> EasyInputMessageParam: return EasyInputMessageParam( content=[ ResponseInputImageParam( type="input_image", image_url=f"data:image/png;base64,{base64.b64encode(image_data).decode('utf-8') if isinstance(image_data, bytes) else image_data}" ) # type: ignore ], role="user", type="message" ) # Text items def make_reasoning_item(reasoning: str) -> ResponseReasoningItemParam: return ResponseReasoningItemParam( id=random_id(), summary=[ Summary(text=reasoning, type="summary_text") ], type="reasoning" ) def make_output_text_item(content: str) -> ResponseOutputMessageParam: return ResponseOutputMessageParam( id=random_id(), content=[ ResponseOutputTextParam( text=content, type="output_text", annotations=[] ) ], role="assistant", status="completed", type="message" ) # Function call items def make_function_call_item(function_name: str, arguments: Dict[str, Any], call_id: Optional[str] = None) -> ResponseFunctionToolCallParam: return 
ResponseFunctionToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), name=function_name, arguments=json.dumps(arguments), status="completed", type="function_call" ) # Computer tool call items def make_click_item(x: int, y: int, button: Literal["left", "right", "wheel", "back", "forward"] = "left", call_id: Optional[str] = None) -> ResponseComputerToolCallParam: return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), action=ActionClick( button=button, type="click", x=x, y=y ), pending_safety_checks=[], status="completed", type="computer_call" ) def make_double_click_item(x: int, y: int, call_id: Optional[str] = None) -> ResponseComputerToolCallParam: return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), action=ActionDoubleClick( type="double_click", x=x, y=y ), pending_safety_checks=[], status="completed", type="computer_call" ) def make_drag_item(path: List[Dict[str, int]], call_id: Optional[str] = None) -> ResponseComputerToolCallParam: drag_path = [ActionDragPath(x=point["x"], y=point["y"]) for point in path] return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), action=ActionDrag( path=drag_path, type="drag" ), pending_safety_checks=[], status="completed", type="computer_call" ) def make_keypress_item(keys: List[str], call_id: Optional[str] = None) -> ResponseComputerToolCallParam: return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), action=ActionKeypress( keys=keys, type="keypress" ), pending_safety_checks=[], status="completed", type="computer_call" ) def make_move_item(x: int, y: int, call_id: Optional[str] = None) -> ResponseComputerToolCallParam: return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), action=ActionMove( type="move", x=x, y=y ), pending_safety_checks=[], status="completed", type="computer_call" ) def 
make_screenshot_item(call_id: Optional[str] = None) -> ResponseComputerToolCallParam: return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), action=ActionScreenshot( type="screenshot" ), pending_safety_checks=[], status="completed", type="computer_call" ) def make_scroll_item(x: int, y: int, scroll_x: int, scroll_y: int, call_id: Optional[str] = None) -> ResponseComputerToolCallParam: return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), action=ActionScroll( scroll_x=scroll_x, scroll_y=scroll_y, type="scroll", x=x, y=y ), pending_safety_checks=[], status="completed", type="computer_call" ) def make_type_item(text: str, call_id: Optional[str] = None) -> ResponseComputerToolCallParam: return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), action=ActionTypeAction( text=text, type="type" ), pending_safety_checks=[], status="completed", type="computer_call" ) def make_wait_item(call_id: Optional[str] = None) -> ResponseComputerToolCallParam: return ResponseComputerToolCallParam( id=random_id(), call_id=call_id if call_id else random_id(), action=ActionWait( type="wait" ), pending_safety_checks=[], status="completed", type="computer_call" ) # Extra anthropic computer calls def make_left_mouse_down_item(x: Optional[int] = None, y: Optional[int] = None, call_id: Optional[str] = None) -> Dict[str, Any]: return { "id": random_id(), "call_id": call_id if call_id else random_id(), "action": { "type": "left_mouse_down", "x": x, "y": y }, "pending_safety_checks": [], "status": "completed", "type": "computer_call" } def make_left_mouse_up_item(x: Optional[int] = None, y: Optional[int] = None, call_id: Optional[str] = None) -> Dict[str, Any]: return { "id": random_id(), "call_id": call_id if call_id else random_id(), "action": { "type": "left_mouse_up", "x": x, "y": y }, "pending_safety_checks": [], "status": "completed", "type": "computer_call" } 
def make_failed_tool_call_items(tool_name: str, tool_kwargs: Dict[str, Any], error_message: str, call_id: Optional[str] = None) -> List[Dict[str, Any]]: call_id = call_id if call_id else random_id() return [ { "type": "function_call", "id": random_id(), "call_id": call_id, "name": tool_name, "arguments": json.dumps(tool_kwargs), }, { "type": "function_call_output", "call_id": call_id, "output": json.dumps({"error": error_message}), } ] def make_tool_error_item(error_message: str, call_id: Optional[str] = None) -> Dict[str, Any]: call_id = call_id if call_id else random_id() return { "type": "function_call_output", "call_id": call_id, "output": json.dumps({"error": error_message}), } def replace_failed_computer_calls_with_function_calls(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Replace computer_call items with function_call items if they share a call_id with a function_call_output. This indicates the computer call failed and should be treated as a function call instead. We do this because the computer_call_output items do not support text output. 
Args: messages: List of message items to process """ messages = messages.copy() # Find all call_ids that have function_call_output items failed_call_ids = set() for msg in messages: if msg.get("type") == "function_call_output": call_id = msg.get("call_id") if call_id: failed_call_ids.add(call_id) # Replace computer_call items that have matching call_ids for i, msg in enumerate(messages): if (msg.get("type") == "computer_call" and msg.get("call_id") in failed_call_ids): # Extract action from computer_call action = msg.get("action", {}) call_id = msg.get("call_id") # Create function_call replacement messages[i] = { "type": "function_call", "id": msg.get("id", random_id()), "call_id": call_id, "name": "computer", "arguments": json.dumps(action), } return messages # Conversion functions between element descriptions and coordinates def convert_computer_calls_desc2xy(responses_items: List[Dict[str, Any]], desc2xy: Dict[str, tuple]) -> List[Dict[str, Any]]: """ Convert computer calls from element descriptions to x,y coordinates. 
Args: responses_items: List of response items containing computer calls with element_description desc2xy: Dictionary mapping element descriptions to (x, y) coordinate tuples Returns: List of response items with element_description replaced by x,y coordinates """ converted_items = [] for item in responses_items: if item.get("type") == "computer_call" and "action" in item: action = item["action"].copy() # Handle single element_description if "element_description" in action: desc = action["element_description"] if desc in desc2xy: x, y = desc2xy[desc] action["x"] = x action["y"] = y del action["element_description"] # Handle start_element_description and end_element_description for drag operations elif "start_element_description" in action and "end_element_description" in action: start_desc = action["start_element_description"] end_desc = action["end_element_description"] if start_desc in desc2xy and end_desc in desc2xy: start_x, start_y = desc2xy[start_desc] end_x, end_y = desc2xy[end_desc] action["path"] = [{"x": start_x, "y": start_y}, {"x": end_x, "y": end_y}] del action["start_element_description"] del action["end_element_description"] converted_item = item.copy() converted_item["action"] = action converted_items.append(converted_item) else: converted_items.append(item) return converted_items def convert_computer_calls_xy2desc(responses_items: List[Dict[str, Any]], desc2xy: Dict[str, tuple]) -> List[Dict[str, Any]]: """ Convert computer calls from x,y coordinates to element descriptions. 
Args: responses_items: List of response items containing computer calls with x,y coordinates desc2xy: Dictionary mapping element descriptions to (x, y) coordinate tuples Returns: List of response items with x,y coordinates replaced by element_description """ # Create reverse mapping from coordinates to descriptions xy2desc = {coords: desc for desc, coords in desc2xy.items()} converted_items = [] for item in responses_items: if item.get("type") == "computer_call" and "action" in item: action = item["action"].copy() # Handle single x,y coordinates if "x" in action and "y" in action: coords = (action["x"], action["y"]) if coords in xy2desc: action["element_description"] = xy2desc[coords] del action["x"] del action["y"] # Handle path for drag operations elif "path" in action and isinstance(action["path"], list) and len(action["path"]) == 2: start_point = action["path"][0] end_point = action["path"][1] if ("x" in start_point and "y" in start_point and "x" in end_point and "y" in end_point): start_coords = (start_point["x"], start_point["y"]) end_coords = (end_point["x"], end_point["y"]) if start_coords in xy2desc and end_coords in xy2desc: action["start_element_description"] = xy2desc[start_coords] action["end_element_description"] = xy2desc[end_coords] del action["path"] converted_item = item.copy() converted_item["action"] = action converted_items.append(converted_item) else: converted_items.append(item) return converted_items def get_all_element_descriptions(responses_items: List[Dict[str, Any]]) -> List[str]: """ Extract all element descriptions from computer calls in responses items. 
Args: responses_items: List of response items containing computer calls Returns: List of unique element descriptions found in computer calls """ descriptions = set() for item in responses_items: if item.get("type") == "computer_call" and "action" in item: action = item["action"] # Handle single element_description if "element_description" in action: descriptions.add(action["element_description"]) # Handle start_element_description and end_element_description for drag operations if "start_element_description" in action: descriptions.add(action["start_element_description"]) if "end_element_description" in action: descriptions.add(action["end_element_description"]) return list(descriptions) # Conversion functions between responses_items and completion messages formats def convert_responses_items_to_completion_messages(messages: List[Dict[str, Any]], allow_images_in_tool_results: bool = True) -> List[Dict[str, Any]]: """Convert responses_items message format to liteLLM completion format. Args: messages: List of responses_items format messages allow_images_in_tool_results: If True, include images in tool role messages. If False, send tool message + separate user message with image. 
"""
    completion_messages = []
    for message in messages:
        msg_type = message.get("type")
        role = message.get("role")

        # Handle user messages (both with and without explicit type)
        if role == "user" or msg_type == "user":
            content = message.get("content", "")
            if isinstance(content, list):
                # Handle list content (images, text blocks)
                completion_content = []
                for item in content:
                    if item.get("type") == "input_image":
                        completion_content.append({
                            "type": "image_url",
                            "image_url": {
                                "url": item.get("image_url")
                            }
                        })
                    elif item.get("type") == "input_text":
                        completion_content.append({
                            "type": "text",
                            "text": item.get("text")
                        })
                    elif item.get("type") == "text":
                        completion_content.append({
                            "type": "text",
                            "text": item.get("text")
                        })
                completion_messages.append({
                    "role": "user",
                    "content": completion_content
                })
            elif isinstance(content, str):
                # Handle string content
                completion_messages.append({
                    "role": "user",
                    "content": content
                })

        # Handle assistant messages
        elif role == "assistant" or msg_type == "message":
            content = message.get("content", [])
            if isinstance(content, list):
                text_parts = []
                for item in content:
                    if item.get("type") == "output_text":
                        text_parts.append(item.get("text", ""))
                    elif item.get("type") == "text":
                        text_parts.append(item.get("text", ""))
                # Text blocks are joined with newlines into a single assistant turn;
                # an empty list emits no message at all.
                if text_parts:
                    completion_messages.append({
                        "role": "assistant",
                        "content": "\n".join(text_parts)
                    })

        # Handle reasoning items (convert to assistant message)
        elif msg_type == "reasoning":
            summary = message.get("summary", [])
            text_parts = []
            for item in summary:
                if item.get("type") == "summary_text":
                    text_parts.append(item.get("text", ""))
            if text_parts:
                completion_messages.append({
                    "role": "assistant",
                    "content": "\n".join(text_parts)
                })

        # Handle function calls
        elif msg_type == "function_call":
            # Add tool call to last assistant message or create new one
            if not completion_messages or completion_messages[-1]["role"] != "assistant":
                completion_messages.append({
                    "role": "assistant",
                    "content": "",
                    "tool_calls": []
                })
            if "tool_calls" not in completion_messages[-1]:
                completion_messages[-1]["tool_calls"] = []
            completion_messages[-1]["tool_calls"].append({
                "id": message.get("call_id"),
                "type": "function",
                "function": {
                    "name": message.get("name"),
                    "arguments": message.get("arguments")
                }
            })

        # Handle computer calls
        elif msg_type == "computer_call":
            # Add tool call to last assistant message or create new one
            if not completion_messages or completion_messages[-1]["role"] != "assistant":
                completion_messages.append({
                    "role": "assistant",
                    "content": "",
                    "tool_calls": []
                })
            if "tool_calls" not in completion_messages[-1]:
                completion_messages[-1]["tool_calls"] = []
            action = message.get("action", {})
            # Computer actions become a generic "computer" function call with the
            # action dict JSON-encoded as the arguments.
            completion_messages[-1]["tool_calls"].append({
                "id": message.get("call_id"),
                "type": "function",
                "function": {
                    "name": "computer",
                    "arguments": json.dumps(action)
                }
            })

        # Handle function/computer call outputs
        elif msg_type in ["function_call_output", "computer_call_output"]:
            output = message.get("output")
            call_id = message.get("call_id")
            if isinstance(output, dict) and output.get("type") == "input_image":
                if allow_images_in_tool_results:
                    # Handle image output as tool response (may not work with all APIs)
                    completion_messages.append({
                        "role": "tool",
                        "tool_call_id": call_id,
                        "content": [{
                            "type": "image_url",
                            "image_url": {
                                "url": output.get("image_url")
                            }
                        }]
                    })
                else:
                    # Send tool message + separate user message with image (OpenAI compatible)
                    completion_messages += [{
                        "role": "tool",
                        "tool_call_id": call_id,
                        "content": "[Execution completed. See screenshot below]"
                    }, {
                        "role": "user",
                        "content": [{
                            "type": "image_url",
                            "image_url": {
                                "url": output.get("image_url")
                            }
                        }]
                    }]
            else:
                # Handle text output as tool response
                completion_messages.append({
                    "role": "tool",
                    "tool_call_id": call_id,
                    "content": str(output)
                })

    return completion_messages


def convert_completion_messages_to_responses_items(completion_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Convert completion messages format to responses_items message format."""
    responses_items = []
    # skip_next lets a later branch consume the following message as well
    # (used by handling further down, beyond this view).
    skip_next = False
    for i, message in enumerate(completion_messages):
        if skip_next:
            skip_next = False
            continue

        role = message.get("role")
        content = message.get("content")
        tool_calls = message.get("tool_calls", [])

        # Handle assistant messages with text content
        if role == "assistant" and content and isinstance(content, str):
            responses_items.append({
                "type": "message",
                "role": "assistant",
                "content": [{
                    "type": "output_text",
                    "text": content
                }]
            })

        # Handle tool calls
        if tool_calls:
            for tool_call in tool_calls:
                if tool_call.get("type") == "function":
                    function = tool_call.get("function", {})
                    function_name = function.get("name")

                    if function_name == "computer":
                        # Parse computer action
                        try:
                            action = json.loads(function.get("arguments", "{}"))
                            # Change key from "action" -> "type"
                            if action.get("action"):
                                action["type"] = action["action"]
                                del action["action"]
                            responses_items.append({
                                "type": "computer_call",
                                "call_id": tool_call.get("id"),
                                "action": action,
                                "status": "completed"
                            })
                        except json.JSONDecodeError:
                            # Fallback to function call format
                            responses_items.append({
                                "type": "function_call",
                                "call_id": tool_call.get("id"),
                                "name": function_name,
                                "arguments": function.get("arguments", "{}"),
                                "status": "completed"
                            })
                    else:
                        # Regular function call
                        responses_items.append({
                            "type": "function_call",
                            "call_id": tool_call.get("id"),
                            "name": function_name,
                            "arguments": function.get("arguments", "{}"),
                            "status": "completed"
                        })

        # Handle tool messages (function/computer call
outputs) elif role == "tool" and content: tool_call_id = message.get("tool_call_id") if isinstance(content, str): # Check if this is the "[Execution completed. See screenshot below]" pattern if content == "[Execution completed. See screenshot below]": # Look ahead for the next user message with image next_idx = i + 1 if (next_idx < len(completion_messages) and completion_messages[next_idx].get("role") == "user" and isinstance(completion_messages[next_idx].get("content"), list)): # Found the pattern - extract image from next message next_content = completion_messages[next_idx]["content"] for item in next_content: if item.get("type") == "image_url": responses_items.append({ "type": "computer_call_output", "call_id": tool_call_id, "output": { "type": "input_image", "image_url": item.get("image_url", {}).get("url") } }) # Skip the next user message since we processed it skip_next = True break else: # No matching user message, treat as regular text responses_items.append({ "type": "computer_call_output", "call_id": tool_call_id, "output": content }) else: # Determine if this is a computer call or function call output try: # Try to parse as structured output parsed_content = json.loads(content) if parsed_content.get("type") == "input_image": responses_items.append({ "type": "computer_call_output", "call_id": tool_call_id, "output": parsed_content }) else: responses_items.append({ "type": "computer_call_output", "call_id": tool_call_id, "output": content }) except json.JSONDecodeError: # Plain text output - could be function or computer call responses_items.append({ "type": "function_call_output", "call_id": tool_call_id, "output": content }) elif isinstance(content, list): # Handle structured content (e.g., images) for item in content: if item.get("type") == "image_url": responses_items.append({ "type": "computer_call_output", "call_id": tool_call_id, "output": { "type": "input_image", "image_url": item.get("image_url", {}).get("url") } }) elif item.get("type") == 
"text": responses_items.append({ "type": "function_call_output", "call_id": tool_call_id, "output": item.get("text") }) # Handle actual user messages elif role == "user" and content: if isinstance(content, list): # Handle structured user content (e.g., text + images) user_content = [] for item in content: if item.get("type") == "image_url": user_content.append({ "type": "input_image", "image_url": item.get("image_url", {}).get("url") }) elif item.get("type") == "text": user_content.append({ "type": "input_text", "text": item.get("text") }) if user_content: responses_items.append({ "role": "user", "type": "message", "content": user_content }) elif isinstance(content, str): # Handle simple text user message responses_items.append({ "role": "user", "content": content }) return responses_items ``` -------------------------------------------------------------------------------- /libs/lume/src/VM/VM.swift: -------------------------------------------------------------------------------- ```swift import Foundation // MARK: - Support Types /// Base context for virtual machine directory and configuration struct VMDirContext { let dir: VMDirectory var config: VMConfig let home: Home let storage: String? func saveConfig() throws { try dir.saveConfig(config) } var name: String { dir.name } var initialized: Bool { dir.initialized() } var diskPath: Path { dir.diskPath } var nvramPath: Path { dir.nvramPath } func setDisk(_ size: UInt64) throws { try dir.setDisk(size) } func finalize(to name: String) throws { let vmDir = try home.getVMDirectory(name) try FileManager.default.moveItem(at: dir.dir.url, to: vmDir.dir.url) } } // MARK: - Base VM Class /// Base class for virtual machine implementations @MainActor class VM { // MARK: - Properties var vmDirContext: VMDirContext @MainActor private var virtualizationService: VMVirtualizationService? 
private let vncService: VNCService internal let virtualizationServiceFactory: (VMVirtualizationServiceContext) throws -> VMVirtualizationService private let vncServiceFactory: (VMDirectory) -> VNCService // MARK: - Initialization init( vmDirContext: VMDirContext, virtualizationServiceFactory: @escaping (VMVirtualizationServiceContext) throws -> VMVirtualizationService = { try DarwinVirtualizationService(configuration: $0) }, vncServiceFactory: @escaping (VMDirectory) -> VNCService = { DefaultVNCService(vmDirectory: $0) } ) { self.vmDirContext = vmDirContext self.virtualizationServiceFactory = virtualizationServiceFactory self.vncServiceFactory = vncServiceFactory // Initialize VNC service self.vncService = vncServiceFactory(vmDirContext.dir) } // MARK: - VM State Management private var isRunning: Bool { // First check if we have a MAC address guard let macAddress = vmDirContext.config.macAddress else { Logger.info( "Cannot check if VM is running: macAddress is nil", metadata: ["name": vmDirContext.name]) return false } // Then check if we have an IP address guard let ipAddress = DHCPLeaseParser.getIPAddress(forMAC: macAddress) else { return false } // Then check if it's reachable return NetworkUtils.isReachable(ipAddress: ipAddress) } var details: VMDetails { let isRunning: Bool = self.isRunning let vncUrl = isRunning ? getVNCUrl() : nil // Safely get disk size with fallback let diskSizeValue: DiskSize do { diskSizeValue = try getDiskSize() } catch { Logger.error( "Failed to get disk size", metadata: ["name": vmDirContext.name, "error": "\(error)"]) // Provide a fallback value to avoid crashing diskSizeValue = DiskSize(allocated: 0, total: vmDirContext.config.diskSize ?? 0) } // Safely access MAC address let macAddress = vmDirContext.config.macAddress let ipAddress: String? = isRunning && macAddress != nil ? DHCPLeaseParser.getIPAddress(forMAC: macAddress!) : nil return VMDetails( name: vmDirContext.name, os: getOSType(), cpuCount: vmDirContext.config.cpuCount ?? 
0, memorySize: vmDirContext.config.memorySize ?? 0, diskSize: diskSizeValue, display: vmDirContext.config.display.string, status: isRunning ? "running" : "stopped", vncUrl: vncUrl, ipAddress: ipAddress, locationName: vmDirContext.storage ?? "default" ) } // MARK: - VM Lifecycle Management func run( noDisplay: Bool, sharedDirectories: [SharedDirectory], mount: Path?, vncPort: Int = 0, recoveryMode: Bool = false, usbMassStoragePaths: [Path]? = nil ) async throws { Logger.info( "VM.run method called", metadata: [ "name": vmDirContext.name, "noDisplay": "\(noDisplay)", "recoveryMode": "\(recoveryMode)", ]) guard vmDirContext.initialized else { Logger.error("VM not initialized", metadata: ["name": vmDirContext.name]) throw VMError.notInitialized(vmDirContext.name) } guard let cpuCount = vmDirContext.config.cpuCount, let memorySize = vmDirContext.config.memorySize else { Logger.error("VM missing cpuCount or memorySize", metadata: ["name": vmDirContext.name]) throw VMError.notInitialized(vmDirContext.name) } // Try to acquire lock on config file Logger.info( "Attempting to acquire lock on config file", metadata: [ "path": vmDirContext.dir.configPath.path, "name": vmDirContext.name, ]) var fileHandle = try FileHandle(forWritingTo: vmDirContext.dir.configPath.url) if flock(fileHandle.fileDescriptor, LOCK_EX | LOCK_NB) != 0 { try? fileHandle.close() Logger.error( "VM already running (failed to acquire lock)", metadata: ["name": vmDirContext.name] ) // Try to forcibly clear the lock before giving up Logger.info("Attempting emergency lock cleanup", metadata: ["name": vmDirContext.name]) unlockConfigFile() // Try one more time to acquire the lock if let retryHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url), flock(retryHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0 { Logger.info("Emergency lock cleanup worked", metadata: ["name": vmDirContext.name]) // Continue with a fresh file handle try? 
retryHandle.close() // Get a completely new file handle to be safe guard let newHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) else { throw VMError.internalError("Failed to open file handle after lock cleanup") } // Update our main file handle fileHandle = newHandle } else { // If we still can't get the lock, give up Logger.error( "Could not acquire lock even after emergency cleanup", metadata: ["name": vmDirContext.name]) throw VMError.alreadyRunning(vmDirContext.name) } } Logger.info("Successfully acquired lock", metadata: ["name": vmDirContext.name]) Logger.info( "Running VM with configuration", metadata: [ "name": vmDirContext.name, "cpuCount": "\(cpuCount)", "memorySize": "\(memorySize)", "diskSize": "\(vmDirContext.config.diskSize ?? 0)", "sharedDirectories": sharedDirectories.map { $0.string }.joined(separator: ", "), "recoveryMode": "\(recoveryMode)", ]) // Create and configure the VM do { Logger.info( "Creating virtualization service context", metadata: ["name": vmDirContext.name]) let config = try createVMVirtualizationServiceContext( cpuCount: cpuCount, memorySize: memorySize, display: vmDirContext.config.display.string, sharedDirectories: sharedDirectories, mount: mount, recoveryMode: recoveryMode, usbMassStoragePaths: usbMassStoragePaths ) Logger.info( "Successfully created virtualization service context", metadata: ["name": vmDirContext.name]) Logger.info( "Initializing virtualization service", metadata: ["name": vmDirContext.name]) virtualizationService = try virtualizationServiceFactory(config) Logger.info( "Successfully initialized virtualization service", metadata: ["name": vmDirContext.name]) Logger.info( "Setting up VNC", metadata: [ "name": vmDirContext.name, "noDisplay": "\(noDisplay)", "port": "\(vncPort)", ]) let vncInfo = try await setupSession( noDisplay: noDisplay, port: vncPort, sharedDirectories: sharedDirectories) Logger.info( "VNC setup successful", metadata: ["name": vmDirContext.name, "vncInfo": vncInfo]) // 
Start the VM guard let service = virtualizationService else { Logger.error("Virtualization service is nil", metadata: ["name": vmDirContext.name]) throw VMError.internalError("Virtualization service not initialized") } Logger.info( "Starting VM via virtualization service", metadata: ["name": vmDirContext.name]) try await service.start() Logger.info("VM started successfully", metadata: ["name": vmDirContext.name]) while true { try await Task.sleep(nanoseconds: UInt64(1e9)) } } catch { Logger.error( "Failed in VM.run", metadata: [ "name": vmDirContext.name, "error": error.localizedDescription, "errorType": "\(type(of: error))", ]) virtualizationService = nil vncService.stop() // Release lock Logger.info("Releasing file lock after error", metadata: ["name": vmDirContext.name]) flock(fileHandle.fileDescriptor, LOCK_UN) try? fileHandle.close() // Additionally, perform our aggressive unlock to ensure no locks remain Logger.info( "Performing additional lock cleanup after error", metadata: ["name": vmDirContext.name]) unlockConfigFile() throw error } } @MainActor func stop() async throws { guard vmDirContext.initialized else { throw VMError.notInitialized(vmDirContext.name) } Logger.info("Attempting to stop VM", metadata: ["name": vmDirContext.name]) // If we have a virtualization service, try to stop it cleanly first if let service = virtualizationService { do { Logger.info( "Stopping VM via virtualization service", metadata: ["name": vmDirContext.name]) try await service.stop() virtualizationService = nil vncService.stop() Logger.info( "VM stopped successfully via virtualization service", metadata: ["name": vmDirContext.name]) // Try to ensure any existing locks are released Logger.info( "Attempting to clear any locks on config file", metadata: ["name": vmDirContext.name]) unlockConfigFile() return } catch let error { Logger.error( "Failed to stop VM via virtualization service", metadata: [ "name": vmDirContext.name, "error": error.localizedDescription, ]) // Fall 
through to process termination } } // Try to open config file to get file descriptor Logger.info( "Attempting to access config file lock", metadata: [ "path": vmDirContext.dir.configPath.path, "name": vmDirContext.name, ]) let fileHandle = try? FileHandle(forReadingFrom: vmDirContext.dir.configPath.url) guard let fileHandle = fileHandle else { Logger.info( "Failed to open config file - VM may not be running", metadata: ["name": vmDirContext.name]) // Even though we couldn't open the file, try to force unlock anyway unlockConfigFile() throw VMError.notRunning(vmDirContext.name) } // Get the PID of the process holding the lock using lsof command Logger.info( "Finding process holding lock on config file", metadata: ["name": vmDirContext.name]) let task = Process() task.executableURL = URL(fileURLWithPath: "/usr/sbin/lsof") task.arguments = ["-F", "p", vmDirContext.dir.configPath.path] let outputPipe = Pipe() task.standardOutput = outputPipe try task.run() task.waitUntilExit() let outputData = try outputPipe.fileHandleForReading.readToEnd() ?? Data() guard let outputString = String(data: outputData, encoding: .utf8), let pidString = outputString.split(separator: "\n").first?.dropFirst(), // Drop the 'p' prefix let pid = pid_t(pidString) else { try? 
fileHandle.close() Logger.info( "Failed to find process holding lock - VM may not be running", metadata: ["name": vmDirContext.name]) // Even though we couldn't find the process, try to force unlock unlockConfigFile() throw VMError.notRunning(vmDirContext.name) } Logger.info( "Found process \(pid) holding lock on config file", metadata: ["name": vmDirContext.name]) // First try graceful shutdown with SIGINT if kill(pid, SIGINT) == 0 { Logger.info("Sent SIGINT to VM process \(pid)", metadata: ["name": vmDirContext.name]) } // Wait for process to stop with timeout var attempts = 0 while attempts < 10 { Logger.info( "Waiting for process \(pid) to terminate (attempt \(attempts + 1)/10)", metadata: ["name": vmDirContext.name]) try await Task.sleep(nanoseconds: 1_000_000_000) // Check if process still exists if kill(pid, 0) != 0 { // Process is gone, do final cleanup Logger.info("Process \(pid) has terminated", metadata: ["name": vmDirContext.name]) virtualizationService = nil vncService.stop() try? fileHandle.close() // Force unlock the config file unlockConfigFile() Logger.info( "VM stopped successfully via process termination", metadata: ["name": vmDirContext.name]) return } attempts += 1 } // If graceful shutdown failed, force kill the process Logger.info( "Graceful shutdown failed, forcing termination of process \(pid)", metadata: ["name": vmDirContext.name]) if kill(pid, SIGKILL) == 0 { Logger.info("Sent SIGKILL to process \(pid)", metadata: ["name": vmDirContext.name]) // Wait a moment for the process to be fully killed try await Task.sleep(nanoseconds: 2_000_000_000) // Do final cleanup virtualizationService = nil vncService.stop() try? fileHandle.close() // Force unlock the config file unlockConfigFile() Logger.info("VM forcefully stopped", metadata: ["name": vmDirContext.name]) return } // If we get here, something went very wrong try? 
fileHandle.close() Logger.error( "Failed to stop VM - could not terminate process \(pid)", metadata: ["name": vmDirContext.name]) // As a last resort, try to force unlock unlockConfigFile() throw VMError.internalError("Failed to stop VM process") } // Helper method to forcibly clear any locks on the config file private func unlockConfigFile() { Logger.info( "Forcibly clearing locks on config file", metadata: [ "path": vmDirContext.dir.configPath.path, "name": vmDirContext.name, ]) // First attempt: standard unlock methods if let fileHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) { // Use F_GETLK and F_SETLK to check and clear locks var lockInfo = flock() lockInfo.l_type = Int16(F_UNLCK) lockInfo.l_whence = Int16(SEEK_SET) lockInfo.l_start = 0 lockInfo.l_len = 0 // Try to unlock the file using fcntl _ = fcntl(fileHandle.fileDescriptor, F_SETLK, &lockInfo) // Also try the regular flock method flock(fileHandle.fileDescriptor, LOCK_UN) try? fileHandle.close() Logger.info("Standard unlock attempts performed", metadata: ["name": vmDirContext.name]) } // Second attempt: try to acquire and immediately release a fresh lock if let tempHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) { if flock(tempHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0 { Logger.info( "Successfully acquired and released lock to reset state", metadata: ["name": vmDirContext.name]) flock(tempHandle.fileDescriptor, LOCK_UN) } else { Logger.info( "Could not acquire lock for resetting - may still be locked", metadata: ["name": vmDirContext.name]) } try? 
tempHandle.close() } // Third attempt (most aggressive): copy the config file, remove the original, and restore Logger.info( "Trying aggressive method: backup and restore config file", metadata: ["name": vmDirContext.name]) // Only proceed if the config file exists let fileManager = FileManager.default let configPath = vmDirContext.dir.configPath.path let backupPath = configPath + ".backup" if fileManager.fileExists(atPath: configPath) { // Create a backup of the config file if let configData = try? Data(contentsOf: URL(fileURLWithPath: configPath)) { // Make backup try? configData.write(to: URL(fileURLWithPath: backupPath)) // Remove the original file to clear all locks try? fileManager.removeItem(atPath: configPath) Logger.info( "Removed original config file to clear locks", metadata: ["name": vmDirContext.name]) // Wait a moment for OS to fully release resources Thread.sleep(forTimeInterval: 0.1) // Restore from backup try? configData.write(to: URL(fileURLWithPath: configPath)) Logger.info( "Restored config file from backup", metadata: ["name": vmDirContext.name]) } else { Logger.error( "Could not read config file content for backup", metadata: ["name": vmDirContext.name]) } } else { Logger.info( "Config file does not exist, cannot perform aggressive unlock", metadata: ["name": vmDirContext.name]) } // Final check if let finalHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) { let lockResult = flock(finalHandle.fileDescriptor, LOCK_EX | LOCK_NB) if lockResult == 0 { Logger.info( "Lock successfully cleared - verified by acquiring test lock", metadata: ["name": vmDirContext.name]) flock(finalHandle.fileDescriptor, LOCK_UN) } else { Logger.info( "Lock still present after all clearing attempts", metadata: ["name": vmDirContext.name, "severity": "warning"]) } try? 
finalHandle.close() } } // MARK: - Resource Management func updateVMConfig(vmConfig: VMConfig) throws { vmDirContext.config = vmConfig try vmDirContext.saveConfig() } private func getDiskSize() throws -> DiskSize { let resourceValues = try vmDirContext.diskPath.url.resourceValues(forKeys: [ .totalFileAllocatedSizeKey, .totalFileSizeKey, ]) guard let allocated = resourceValues.totalFileAllocatedSize, let total = resourceValues.totalFileSize else { throw VMConfigError.invalidDiskSize } return DiskSize(allocated: UInt64(allocated), total: UInt64(total)) } func resizeDisk(_ newSize: UInt64) throws { let currentSize = try getDiskSize() guard newSize >= currentSize.total else { throw VMError.resizeTooSmall(current: currentSize.total, requested: newSize) } try setDiskSize(newSize) } func setCpuCount(_ newCpuCount: Int) throws { guard !isRunning else { throw VMError.alreadyRunning(vmDirContext.name) } vmDirContext.config.setCpuCount(newCpuCount) try vmDirContext.saveConfig() } func setMemorySize(_ newMemorySize: UInt64) throws { guard !isRunning else { throw VMError.alreadyRunning(vmDirContext.name) } vmDirContext.config.setMemorySize(newMemorySize) try vmDirContext.saveConfig() } func setDiskSize(_ newDiskSize: UInt64) throws { try vmDirContext.setDisk(newDiskSize) vmDirContext.config.setDiskSize(newDiskSize) try vmDirContext.saveConfig() } func setDisplay(_ newDisplay: String) throws { guard !isRunning else { throw VMError.alreadyRunning(vmDirContext.name) } guard let display: VMDisplayResolution = VMDisplayResolution(string: newDisplay) else { throw VMError.invalidDisplayResolution(newDisplay) } vmDirContext.config.setDisplay(display) try vmDirContext.saveConfig() } func setHardwareModel(_ newHardwareModel: Data) throws { guard !isRunning else { throw VMError.alreadyRunning(vmDirContext.name) } vmDirContext.config.setHardwareModel(newHardwareModel) try vmDirContext.saveConfig() } func setMachineIdentifier(_ newMachineIdentifier: Data) throws { guard !isRunning else { 
throw VMError.alreadyRunning(vmDirContext.name) } vmDirContext.config.setMachineIdentifier(newMachineIdentifier) try vmDirContext.saveConfig() } func setMacAddress(_ newMacAddress: String) throws { guard !isRunning else { throw VMError.alreadyRunning(vmDirContext.name) } vmDirContext.config.setMacAddress(newMacAddress) try vmDirContext.saveConfig() } // MARK: - VNC Management func getVNCUrl() -> String? { return vncService.url } /// Sets up the VNC service and returns the VNC URL private func startVNCService(port: Int = 0) async throws -> String { guard let service = virtualizationService else { throw VMError.internalError("Virtualization service not initialized") } try await vncService.start(port: port, virtualMachine: service.getVirtualMachine()) guard let url = vncService.url else { throw VMError.vncNotConfigured } return url } /// Saves the session information including shared directories to disk private func saveSessionData(url: String, sharedDirectories: [SharedDirectory]) { do { let session = VNCSession( url: url, sharedDirectories: sharedDirectories.isEmpty ? 
nil : sharedDirectories) try vmDirContext.dir.saveSession(session) Logger.info( "Saved VNC session with shared directories", metadata: [ "count": "\(sharedDirectories.count)", "dirs": "\(sharedDirectories.map { $0.hostPath }.joined(separator: ", "))", "sessionsPath": "\(vmDirContext.dir.sessionsPath.path)", ]) } catch { Logger.error("Failed to save VNC session", metadata: ["error": "\(error)"]) } } /// Main session setup method that handles VNC and persists session data private func setupSession( noDisplay: Bool, port: Int = 0, sharedDirectories: [SharedDirectory] = [] ) async throws -> String { // Start the VNC service and get the URL let url = try await startVNCService(port: port) // Save the session data saveSessionData(url: url, sharedDirectories: sharedDirectories) // Open the VNC client if needed if !noDisplay { Logger.info("Starting VNC session", metadata: ["name": vmDirContext.name]) try await vncService.openClient(url: url) } return url } // MARK: - Platform-specific Methods func getOSType() -> String { fatalError("Must be implemented by subclass") } func createVMVirtualizationServiceContext( cpuCount: Int, memorySize: UInt64, display: String, sharedDirectories: [SharedDirectory] = [], mount: Path? = nil, recoveryMode: Bool = false, usbMassStoragePaths: [Path]? 
= nil ) throws -> VMVirtualizationServiceContext { // This is a diagnostic log to track actual file paths on disk for debugging try validateDiskState() return VMVirtualizationServiceContext( cpuCount: cpuCount, memorySize: memorySize, display: display, sharedDirectories: sharedDirectories, mount: mount, hardwareModel: vmDirContext.config.hardwareModel, machineIdentifier: vmDirContext.config.machineIdentifier, macAddress: vmDirContext.config.macAddress!, diskPath: vmDirContext.diskPath, nvramPath: vmDirContext.nvramPath, recoveryMode: recoveryMode, usbMassStoragePaths: usbMassStoragePaths ) } /// Validates the disk state to help diagnose storage attachment issues private func validateDiskState() throws { // Check disk image state let diskPath = vmDirContext.diskPath.path let diskExists = FileManager.default.fileExists(atPath: diskPath) var diskSize: UInt64 = 0 var diskPermissions = "" if diskExists { if let attrs = try? FileManager.default.attributesOfItem(atPath: diskPath) { diskSize = attrs[.size] as? UInt64 ?? 0 let posixPerms = attrs[.posixPermissions] as? Int ?? 0 diskPermissions = String(format: "%o", posixPerms) } } // Check disk container directory permissions let diskDir = (diskPath as NSString).deletingLastPathComponent let dirPerms = try? FileManager.default.attributesOfItem(atPath: diskDir)[.posixPermissions] as? Int ?? 0 let dirPermsString = dirPerms != nil ? String(format: "%o", dirPerms!) : "unknown" // Log detailed diagnostics Logger.info( "Validating VM disk state", metadata: [ "diskPath": diskPath, "diskExists": "\(diskExists)", "diskSize": "\(ByteCountFormatter.string(fromByteCount: Int64(diskSize), countStyle: .file))", "diskPermissions": diskPermissions, "dirPermissions": dirPermsString, "locationName": vmDirContext.storage ?? 
"default", ]) if !diskExists { Logger.error("VM disk image does not exist", metadata: ["diskPath": diskPath]) } else if diskSize == 0 { Logger.error("VM disk image exists but has zero size", metadata: ["diskPath": diskPath]) } } func setup( ipswPath: String, cpuCount: Int, memorySize: UInt64, diskSize: UInt64, display: String ) async throws { fatalError("Must be implemented by subclass") } // MARK: - Finalization /// Post-installation step to move the VM directory to the home directory func finalize(to name: String, home: Home, storage: String? = nil) throws { let vmDir = try home.getVMDirectory(name, storage: storage) try FileManager.default.moveItem(at: vmDirContext.dir.dir.url, to: vmDir.dir.url) } // Method to run VM with additional USB mass storage devices func runWithUSBStorage( noDisplay: Bool, sharedDirectories: [SharedDirectory], mount: Path?, vncPort: Int = 0, recoveryMode: Bool = false, usbImagePaths: [Path] ) async throws { guard vmDirContext.initialized else { throw VMError.notInitialized(vmDirContext.name) } guard let cpuCount = vmDirContext.config.cpuCount, let memorySize = vmDirContext.config.memorySize else { throw VMError.notInitialized(vmDirContext.name) } // Try to acquire lock on config file let fileHandle = try FileHandle(forWritingTo: vmDirContext.dir.configPath.url) guard flock(fileHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0 else { try? fileHandle.close() throw VMError.alreadyRunning(vmDirContext.name) } Logger.info( "Running VM with USB storage devices", metadata: [ "cpuCount": "\(cpuCount)", "memorySize": "\(memorySize)", "diskSize": "\(vmDirContext.config.diskSize ?? 
0)", "usbImageCount": "\(usbImagePaths.count)", "recoveryMode": "\(recoveryMode)", ]) // Create and configure the VM do { let config = try createVMVirtualizationServiceContext( cpuCount: cpuCount, memorySize: memorySize, display: vmDirContext.config.display.string, sharedDirectories: sharedDirectories, mount: mount, recoveryMode: recoveryMode, usbMassStoragePaths: usbImagePaths ) virtualizationService = try virtualizationServiceFactory(config) let vncInfo = try await setupSession( noDisplay: noDisplay, port: vncPort, sharedDirectories: sharedDirectories) Logger.info("VNC info", metadata: ["vncInfo": vncInfo]) // Start the VM guard let service = virtualizationService else { throw VMError.internalError("Virtualization service not initialized") } try await service.start() while true { try await Task.sleep(nanoseconds: UInt64(1e9)) } } catch { Logger.error( "Failed to create/start VM with USB storage", metadata: [ "error": "\(error)", "errorType": "\(type(of: error))", ]) virtualizationService = nil vncService.stop() // Release lock flock(fileHandle.fileDescriptor, LOCK_UN) try? 
fileHandle.close() throw error } } } ``` -------------------------------------------------------------------------------- /libs/python/agent/agent/loops/uitars.py: -------------------------------------------------------------------------------- ```python """ UITARS agent loop implementation using liteLLM for ByteDance-Seed/UI-TARS-1.5-7B Paper: https://arxiv.org/abs/2501.12326 Code: https://github.com/bytedance/UI-TARS """ import asyncio from ctypes import cast import json import base64 import math import re import ast from typing import Dict, List, Any, AsyncGenerator, Union, Optional, Tuple from io import BytesIO from PIL import Image import litellm from litellm.types.utils import ModelResponse from litellm.responses.litellm_completion_transformation.transformation import LiteLLMCompletionResponsesConfig from litellm.responses.utils import Usage from openai.types.responses.response_computer_tool_call_param import ActionType, ResponseComputerToolCallParam from openai.types.responses.response_input_param import ComputerCallOutput from openai.types.responses.response_output_message_param import ResponseOutputMessageParam from openai.types.responses.response_reasoning_item_param import ResponseReasoningItemParam, Summary from ..decorators import register_agent from ..types import Messages, AgentResponse, Tools, AgentCapability from ..responses import ( make_reasoning_item, make_output_text_item, make_click_item, make_double_click_item, make_drag_item, make_keypress_item, make_scroll_item, make_type_item, make_wait_item, make_input_image_item ) # Constants from reference code IMAGE_FACTOR = 28 MIN_PIXELS = 100 * 28 * 28 MAX_PIXELS = 16384 * 28 * 28 MAX_RATIO = 200 FINISH_WORD = "finished" WAIT_WORD = "wait" ENV_FAIL_WORD = "error_env" CALL_USER = "call_user" # Action space prompt for UITARS UITARS_ACTION_SPACE = """ click(start_box='<|box_start|>(x1,y1)<|box_end|>') left_double(start_box='<|box_start|>(x1,y1)<|box_end|>') 
right_single(start_box='<|box_start|>(x1,y1)<|box_end|>') drag(start_box='<|box_start|>(x1,y1)<|box_end|>', end_box='<|box_start|>(x3,y3)<|box_end|>') hotkey(key='') type(content='') #If you want to submit your input, use "\\n" at the end of `content`. scroll(start_box='<|box_start|>(x1,y1)<|box_end|>', direction='down or up or right or left') wait() #Sleep for 5s and take a screenshot to check for any changes. finished(content='xxx') # Use escape characters \\', \\", and \\n in content part to ensure we can parse the content in normal python string format. """ UITARS_PROMPT_TEMPLATE = """You are a GUI agent. You are given a task and your action history, with screenshots. You need to perform the next action to complete the task. ## Output Format ``` Thought: ... Action: ... ``` ## Action Space {action_space} ## Note - Use {language} in `Thought` part. - Write a small plan and finally summarize your next action (with its target element) in one sentence in `Thought` part. ## User Instruction {instruction} """ GROUNDING_UITARS_PROMPT_TEMPLATE = """You are a GUI agent. You are given a task and your action history, with screenshots. You need to perform the next action to complete the task. ## Output Format Action: ... 
## Action Space
click(point='<|box_start|>(x1,y1)<|box_end|>')

## User Instruction
{instruction}"""


def round_by_factor(number: float, factor: int) -> int:
    """Returns the closest integer to 'number' that is divisible by 'factor'."""
    return round(number / factor) * factor


def ceil_by_factor(number: float, factor: int) -> int:
    """Returns the smallest integer greater than or equal to 'number' that is divisible by 'factor'."""
    return math.ceil(number / factor) * factor


def floor_by_factor(number: float, factor: int) -> int:
    """Returns the largest integer less than or equal to 'number' that is divisible by 'factor'."""
    return math.floor(number / factor) * factor


def smart_resize(
    height: int, width: int, factor: int = IMAGE_FACTOR, min_pixels: int = MIN_PIXELS, max_pixels: int = MAX_PIXELS
) -> tuple[int, int]:
    """
    Rescales the image so that the following conditions are met:

    1. Both dimensions (height and width) are divisible by 'factor'.

    2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].

    3. The aspect ratio of the image is maintained as closely as possible.
    """
    # Degenerate aspect ratios are rejected outright: model coordinates are
    # expressed relative to the resized image, so extreme ratios would make
    # grounding unreliable.
    if max(height, width) / min(height, width) > MAX_RATIO:
        raise ValueError(
            f"absolute aspect ratio must be smaller than {MAX_RATIO}, got {max(height, width) / min(height, width)}"
        )
    # Snap both dimensions to the nearest multiple of `factor`, never below
    # one full factor.
    h_bar = max(factor, round_by_factor(height, factor))
    w_bar = max(factor, round_by_factor(width, factor))
    if h_bar * w_bar > max_pixels:
        # Too many pixels: shrink uniformly by sqrt of the overshoot, rounding
        # down so the result stays under the cap.
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = floor_by_factor(height / beta, factor)
        w_bar = floor_by_factor(width / beta, factor)
    elif h_bar * w_bar < min_pixels:
        # Too few pixels: grow uniformly, rounding up so the floor is reached.
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = ceil_by_factor(height * beta, factor)
        w_bar = ceil_by_factor(width * beta, factor)
    return h_bar, w_bar


def escape_single_quotes(text):
    """Escape single quotes in text for safe string formatting."""
    # Negative lookbehind: only escape quotes that are not already escaped.
    pattern = r"(?<!\\)'"
    return re.sub(pattern, r"\\'", text)


def parse_action(action_str):
    """Parse action string into structured format.

    Parses a single function-call expression (e.g. "click(start_box='(1,2)')")
    via the `ast` module and returns {'function': name, 'args': {kw: value}},
    or None if the string is not a well-formed call.
    """
    try:
        node = ast.parse(action_str, mode='eval')

        if not isinstance(node, ast.Expression):
            raise ValueError("Not an expression")

        call = node.body
        if not isinstance(call, ast.Call):
            raise ValueError("Not a function call")

        # Get function name (bare name or attribute access, e.g. a.b()).
        if isinstance(call.func, ast.Name):
            func_name = call.func.id
        elif isinstance(call.func, ast.Attribute):
            func_name = call.func.attr
        else:
            func_name = None

        # Get keyword arguments; only literal constants are extracted, any
        # other expression maps to None.
        kwargs = {}
        for kw in call.keywords:
            key = kw.arg
            if isinstance(kw.value, ast.Constant):
                value = kw.value.value
            elif isinstance(kw.value, ast.Str):  # Compatibility with older Python
                value = kw.value.s
            else:
                value = None
            kwargs[key] = value

        return {
            'function': func_name,
            'args': kwargs
        }

    except Exception as e:
        print(f"Failed to parse action '{action_str}': {e}")
        return None


def parse_uitars_response(text: str, image_width: int, image_height: int) -> List[Dict[str, Any]]:
    """Parse UITARS model response into structured actions."""
    text = text.strip()

    # Extract thought
    thought = None
    if text.startswith("Thought:"):
        thought_match = re.search(r"Thought: (.+?)(?=\s*Action:|$)", text,
re.DOTALL)
        if thought_match:
            thought = thought_match.group(1).strip()

    # Extract action
    if "Action:" not in text:
        raise ValueError("No Action found in response")

    action_str = text.split("Action:")[-1].strip()

    # Handle special case for type actions: the free-form `content` payload may
    # contain unescaped quotes, so it is re-escaped before AST parsing.
    if "type(content" in action_str:
        def escape_quotes(match):
            # Strip the surrounding type(content='...') wrapper, keeping only
            # the raw payload captured by the pattern below.
            return match.group(1)

        pattern = r"type\(content='(.*?)'\)"
        content = re.sub(pattern, escape_quotes, action_str)
        action_str = escape_single_quotes(content)
        action_str = "type(content='" + action_str + "')"

    # Parse the action (newlines are escaped so the string stays a single
    # parseable expression).
    parsed_action = parse_action(action_str.replace("\n", "\\n").lstrip())
    if parsed_action is None:
        raise ValueError(f"Action can't parse: {action_str}")

    action_type = parsed_action["function"]
    params = parsed_action["args"]

    # Process parameters
    action_inputs = {}
    for param_name, param in params.items():
        if param == "":
            continue
        param = str(param).lstrip()
        action_inputs[param_name.strip()] = param

        # Handle coordinate parameters
        if "start_box" in param_name or "end_box" in param_name:
            # Parse coordinates like '<|box_start|>(x,y)<|box_end|>' or '(x,y)'
            # First, remove special tokens
            clean_param = param.replace("<|box_start|>", "").replace("<|box_end|>", "")
            # Then remove parentheses and split
            numbers = clean_param.replace("(", "").replace(")", "").split(",")
            try:
                # Model coordinates are in thousandths; divide by 1000 to
                # normalize to the 0-1 range.
                float_numbers = [float(num.strip()) / 1000 for num in numbers]  # Normalize to 0-1 range
                if len(float_numbers) == 2:
                    # Single point, duplicate for box format
                    float_numbers = [float_numbers[0], float_numbers[1], float_numbers[0], float_numbers[1]]
                action_inputs[param_name.strip()] = str(float_numbers)
            except ValueError as e:
                # If parsing fails, keep the original parameter value
                print(f"Warning: Could not parse coordinates '{param}': {e}")
                action_inputs[param_name.strip()] = param

    return [{
        "thought": thought,
        "action_type": action_type,
        "action_inputs": action_inputs,
        "text": text
    }]


def convert_to_computer_actions(parsed_responses: List[Dict[str, Any]], image_width: int, image_height: int) ->
List[ResponseComputerToolCallParam | ResponseOutputMessageParam]: """Convert parsed UITARS responses to computer actions.""" computer_actions = [] for response in parsed_responses: action_type = response.get("action_type") action_inputs = response.get("action_inputs", {}) if action_type == "finished": finished_text = action_inputs.get("content", "Task completed successfully.") computer_actions.append(make_output_text_item(finished_text)) break elif action_type == "wait": computer_actions.append(make_wait_item()) elif action_type == "call_user": computer_actions.append(make_output_text_item("I need assistance from the user to proceed with this task.")) elif action_type in ["click", "left_single"]: start_box = action_inputs.get("start_box") if start_box: coords = eval(start_box) x = int((coords[0] + coords[2]) / 2 * image_width) y = int((coords[1] + coords[3]) / 2 * image_height) computer_actions.append(make_click_item(x, y, "left")) elif action_type == "double_click": start_box = action_inputs.get("start_box") if start_box: coords = eval(start_box) x = int((coords[0] + coords[2]) / 2 * image_width) y = int((coords[1] + coords[3]) / 2 * image_height) computer_actions.append(make_double_click_item(x, y)) elif action_type == "right_click": start_box = action_inputs.get("start_box") if start_box: coords = eval(start_box) x = int((coords[0] + coords[2]) / 2 * image_width) y = int((coords[1] + coords[3]) / 2 * image_height) computer_actions.append(make_click_item(x, y, "right")) elif action_type == "type": content = action_inputs.get("content", "") computer_actions.append(make_type_item(content)) elif action_type == "hotkey": key = action_inputs.get("key", "") keys = key.split() computer_actions.append(make_keypress_item(keys)) elif action_type == "press": key = action_inputs.get("key", "") computer_actions.append(make_keypress_item([key])) elif action_type == "scroll": start_box = action_inputs.get("start_box") direction = action_inputs.get("direction", "down") if 
start_box: coords = eval(start_box) x = int((coords[0] + coords[2]) / 2 * image_width) y = int((coords[1] + coords[3]) / 2 * image_height) else: x, y = image_width // 2, image_height // 2 scroll_y = 5 if "up" in direction.lower() else -5 computer_actions.append(make_scroll_item(x, y, 0, scroll_y)) elif action_type == "drag": start_box = action_inputs.get("start_box") end_box = action_inputs.get("end_box") if start_box and end_box: start_coords = eval(start_box) end_coords = eval(end_box) start_x = int((start_coords[0] + start_coords[2]) / 2 * image_width) start_y = int((start_coords[1] + start_coords[3]) / 2 * image_height) end_x = int((end_coords[0] + end_coords[2]) / 2 * image_width) end_y = int((end_coords[1] + end_coords[3]) / 2 * image_height) path = [{"x": start_x, "y": start_y}, {"x": end_x, "y": end_y}] computer_actions.append(make_drag_item(path)) return computer_actions def pil_to_base64(image: Image.Image) -> str: """Convert PIL image to base64 string.""" buffer = BytesIO() image.save(buffer, format="PNG") return base64.b64encode(buffer.getvalue()).decode("utf-8") def process_image_for_uitars(image_data: str, max_pixels: int = MAX_PIXELS, min_pixels: int = MIN_PIXELS) -> tuple[Image.Image, int, int]: """Process image for UITARS model input.""" # Decode base64 image if image_data.startswith('data:image'): image_data = image_data.split(',')[1] image_bytes = base64.b64decode(image_data) image = Image.open(BytesIO(image_bytes)) original_width, original_height = image.size # Resize image according to UITARS requirements if image.width * image.height > max_pixels: resize_factor = math.sqrt(max_pixels / (image.width * image.height)) width = int(image.width * resize_factor) height = int(image.height * resize_factor) image = image.resize((width, height)) if image.width * image.height < min_pixels: resize_factor = math.sqrt(min_pixels / (image.width * image.height)) width = math.ceil(image.width * resize_factor) height = math.ceil(image.height * resize_factor) 
    image = image.resize((width, height))

    if image.mode != "RGB":
        image = image.convert("RGB")

    return image, original_width, original_height


def sanitize_message(msg: Any) -> Any:
    """Return a copy of the message with image_url omitted within content parts"""
    if isinstance(msg, dict):
        result = {}
        for key, value in msg.items():
            if key == "content" and isinstance(value, list):
                # Drop only the "image_url" key of each content part; all
                # other keys (e.g. "type", "text") are kept as-is.
                result[key] = [
                    {k: v for k, v in item.items() if k != "image_url"}
                    if isinstance(item, dict) else item
                    for item in value
                ]
            else:
                result[key] = value
        return result
    elif isinstance(msg, list):
        # Recurse into lists of messages.
        return [sanitize_message(item) for item in msg]
    else:
        return msg


def convert_uitars_messages_to_litellm(messages: Messages) -> List[Dict[str, Any]]:
    """
    Convert UITARS internal message format back to LiteLLM format.

    This function processes reasoning, computer_call, and computer_call_output messages
    and converts them to the appropriate LiteLLM assistant message format.

    Args:
        messages: List of UITARS internal messages

    Returns:
        List of LiteLLM formatted messages
    """
    litellm_messages = []
    # Thought/action lines accumulated until the next screenshot arrives.
    current_assistant_content = []

    for message in messages:
        if isinstance(message, dict):
            message_type = message.get("type")

            if message_type == "reasoning":
                # Extract reasoning text from summary
                summary = message.get("summary", [])
                if summary and isinstance(summary, list):
                    for summary_item in summary:
                        if isinstance(summary_item, dict) and summary_item.get("type") == "summary_text":
                            reasoning_text = summary_item.get("text", "")
                            if reasoning_text:
                                current_assistant_content.append(f"Thought: {reasoning_text}")

            elif message_type == "computer_call":
                # Convert computer action to UITARS action format
                action = message.get("action", {})
                action_type = action.get("type")

                if action_type == "click":
                    x, y = action.get("x", 0), action.get("y", 0)
                    button = action.get("button", "left")
                    if button == "left":
                        action_text = f"Action: click(start_box='({x},{y})')"
                    elif button == "right":
                        action_text = f"Action: right_single(start_box='({x},{y})')"
                    else:
                        # Middle/other buttons fall back to a plain left click string.
                        action_text = f"Action: click(start_box='({x},{y})')"
                elif action_type == "double_click":
                    x, y = action.get("x", 0), action.get("y", 0)
                    action_text = f"Action: left_double(start_box='({x},{y})')"
                elif action_type == "drag":
                    start_x, start_y = action.get("start_x", 0), action.get("start_y", 0)
                    end_x, end_y = action.get("end_x", 0), action.get("end_y", 0)
                    action_text = f"Action: drag(start_box='({start_x},{start_y})', end_box='({end_x},{end_y})')"
                elif action_type == "key":
                    key = action.get("key", "")
                    action_text = f"Action: hotkey(key='{key}')"
                elif action_type == "type":
                    text = action.get("text", "")
                    # Escape single quotes in the text
                    escaped_text = escape_single_quotes(text)
                    action_text = f"Action: type(content='{escaped_text}')"
                elif action_type == "scroll":
                    x, y = action.get("x", 0), action.get("y", 0)
                    direction = action.get("direction", "down")
                    action_text = f"Action: scroll(start_box='({x},{y})', direction='{direction}')"
                elif action_type == "wait":
                    action_text = "Action: wait()"
                else:
                    # Fallback for unknown action types
                    action_text = f"Action: {action_type}({action})"

                current_assistant_content.append(action_text)

                # When we hit a computer_call_output, finalize the current assistant message
                if current_assistant_content:
                    litellm_messages.append({
                        "role": "assistant",
                        "content": [{"type": "text", "text": "\n".join(current_assistant_content)}]
                    })
                    current_assistant_content = []

            elif message_type == "computer_call_output":
                # Add screenshot from computer call output
                output = message.get("output", {})
                if isinstance(output, dict) and output.get("type") == "input_image":
                    image_url = output.get("image_url", "")
                    if image_url:
                        litellm_messages.append({
                            "role": "user",
                            "content": [{"type": "image_url", "image_url": {"url": image_url}}]
                        })

            elif message.get("role") == "user":
                # # Handle user messages
                # content = message.get("content", "")
                # if isinstance(content, str):
                #     litellm_messages.append({
                #         "role": "user",
                #         "content": content
                #     })
                # elif isinstance(content, list):
                #
litellm_messages.append({ # "role": "user", # "content": content # }) pass # Add any remaining assistant content if current_assistant_content: litellm_messages.append({ "role": "assistant", "content": current_assistant_content }) return litellm_messages @register_agent(models=r"(?i).*ui-?tars.*") class UITARSConfig: """ UITARS agent configuration using liteLLM for ByteDance-Seed/UI-TARS-1.5-7B model. Supports UITARS vision-language models for computer control. """ async def predict_step( self, messages: List[Dict[str, Any]], model: str, tools: Optional[List[Dict[str, Any]]] = None, max_retries: Optional[int] = None, stream: bool = False, computer_handler=None, use_prompt_caching: Optional[bool] = False, _on_api_start=None, _on_api_end=None, _on_usage=None, _on_screenshot=None, **kwargs ) -> Dict[str, Any]: """ Predict the next step based on input messages. Args: messages: Input messages following Responses format model: Model name to use tools: Optional list of tool schemas max_retries: Maximum number of retries stream: Whether to stream responses computer_handler: Computer handler instance _on_api_start: Callback for API start _on_api_end: Callback for API end _on_usage: Callback for usage tracking _on_screenshot: Callback for screenshot events **kwargs: Additional arguments Returns: Dictionary with "output" (output items) and "usage" array """ tools = tools or [] # Create response items response_items = [] # Find computer tool for screen dimensions computer_tool = None for tool_schema in tools: if tool_schema["type"] == "computer": computer_tool = tool_schema["computer"] break # Get screen dimensions screen_width, screen_height = 1024, 768 if computer_tool: try: screen_width, screen_height = await computer_tool.get_dimensions() except: pass # Process messages to extract instruction and image instruction = "" image_data = None # Convert messages to list if string if isinstance(messages, str): messages = [{"role": "user", "content": messages}] # Extract instruction 
and latest screenshot for message in reversed(messages): if isinstance(message, dict): content = message.get("content", "") # Handle different content formats if isinstance(content, str): if not instruction and message.get("role") == "user": instruction = content elif isinstance(content, list): for item in content: if isinstance(item, dict): if item.get("type") == "text" and not instruction: instruction = item.get("text", "") elif item.get("type") == "image_url" and not image_data: image_url = item.get("image_url", {}) if isinstance(image_url, dict): image_data = image_url.get("url", "") else: image_data = image_url # Also check for computer_call_output with screenshots if message.get("type") == "computer_call_output" and not image_data: output = message.get("output", {}) if isinstance(output, dict) and output.get("type") == "input_image": image_data = output.get("image_url", "") if instruction and image_data: break if not instruction: instruction = "Help me complete this task by analyzing the screen and taking appropriate actions." # Create prompt user_prompt = UITARS_PROMPT_TEMPLATE.format( instruction=instruction, action_space=UITARS_ACTION_SPACE, language="English" ) # Convert conversation history to LiteLLM format history_messages = convert_uitars_messages_to_litellm(messages) # Prepare messages for liteLLM litellm_messages = [ { "role": "system", "content": "You are a helpful assistant." 
} ] # Add current user instruction with screenshot current_user_message = { "role": "user", "content": [ {"type": "text", "text": user_prompt}, ] } litellm_messages.append(current_user_message) # Process image for UITARS if not image_data: # Take screenshot if none found in messages if computer_handler: image_data = await computer_handler.screenshot() await _on_screenshot(image_data, "screenshot_before") # Add screenshot to output items so it can be retained in history response_items.append(make_input_image_item(image_data)) else: raise ValueError("No screenshot found in messages and no computer_handler provided") processed_image, original_width, original_height = process_image_for_uitars(image_data) encoded_image = pil_to_base64(processed_image) # Add conversation history if history_messages: litellm_messages.extend(history_messages) else: litellm_messages.append({ "role": "user", "content": [ {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{encoded_image}"}} ] }) # Prepare API call kwargs api_kwargs = { "model": model, "messages": litellm_messages, "max_tokens": kwargs.get("max_tokens", 500), "temperature": kwargs.get("temperature", 0.0), "do_sample": kwargs.get("temperature", 0.0) > 0.0, "num_retries": max_retries, **{k: v for k, v in kwargs.items() if k not in ["max_tokens", "temperature"]} } # Call API start hook if _on_api_start: await _on_api_start(api_kwargs) # Call liteLLM with UITARS model response = await litellm.acompletion(**api_kwargs) # Call API end hook if _on_api_end: await _on_api_end(api_kwargs, response) # Extract response content response_content = response.choices[0].message.content.strip() # type: ignore # Parse UITARS response parsed_responses = parse_uitars_response(response_content, original_width, original_height) # Convert to computer actions computer_actions = convert_to_computer_actions(parsed_responses, original_width, original_height) # Add computer actions to response items thought = 
parsed_responses[0].get("thought", "")
        if thought:
            response_items.append(make_reasoning_item(thought))
        response_items.extend(computer_actions)

        # Extract usage information
        # NOTE(review): relies on LiteLLM's private Responses-API transform and
        # the hidden `response_cost` param — confirm these survive LiteLLM upgrades.
        response_usage = {
            **LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage(response.usage).model_dump(),
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(response_usage)

        # Create agent response
        agent_response = {
            "output": response_items,
            "usage": response_usage
        }

        return agent_response

    async def predict_click(
        self,
        model: str,
        image_b64: str,
        instruction: str
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates based on image and instruction.

        UITARS supports click prediction through its action parsing.

        Args:
            model: Model name to use
            image_b64: Base64 encoded image
            instruction: Instruction for where to click

        Returns:
            Tuple with (x, y) coordinates or None
        """
        try:
            # Create prompt using grounding template
            user_prompt = GROUNDING_UITARS_PROMPT_TEMPLATE.format(
                instruction=instruction
            )

            # Process image for UITARS
            processed_image, original_width, original_height = process_image_for_uitars(image_b64)
            encoded_image = pil_to_base64(processed_image)

            # Prepare messages for liteLLM
            litellm_messages = [
                {
                    "role": "system",
                    "content": "You are a helpful assistant."
},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": user_prompt},
                        {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{encoded_image}"}}
                    ]
                }
            ]

            # Prepare API call kwargs
            api_kwargs = {
                "model": model,
                "messages": litellm_messages,
                "max_tokens": 2056,
                "temperature": 0.0,
                "do_sample": False
            }

            # Call liteLLM with UITARS model
            response = await litellm.acompletion(**api_kwargs)

            # Extract response content
            response_content = response.choices[0].message.content.strip()  # type: ignore

            print(response_content)

            # Parse the response to extract click coordinates
            # Look for click action with coordinates (with special tokens)
            click_pattern = r"click\(point='<\|box_start\|>\((\d+),(\d+)\)<\|box_end\|>'\)"
            match = re.search(click_pattern, response_content)

            # Fallback: Look for simpler format without special tokens
            if not match:
                # Pattern for: click(start_box='(x,y)') or click(point='(x,y)')
                fallback_pattern = r"click\((?:start_box|point)='\((\d+),(\d+)\)'\)"
                match = re.search(fallback_pattern, response_content)

            if match:
                x, y = int(match.group(1)), int(match.group(2))
                # Scale coordinates back to original image dimensions: the
                # model answers in processed-image pixel space.
                scale_x = original_width / processed_image.width
                scale_y = original_height / processed_image.height
                scaled_x = int(x * scale_x)
                scaled_y = int(y * scale_y)
                return (scaled_x, scaled_y)

            return None

        except Exception as e:
            # Log error and return None
            print(f"Error in predict_click: {e}")
            return None

    def get_capabilities(self) -> List[AgentCapability]:
        """
        Get list of capabilities supported by this agent config.

        Returns:
            List of capability strings
        """
        return ["step", "click"]
```

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/glm45v.py:
--------------------------------------------------------------------------------

```python
"""
GLM-4.5V agent loop implementation using liteLLM for GLM-4.5V model.
Supports vision-language models for computer control with bounding box parsing.
""" import asyncio import json import base64 import re from typing import Dict, List, Any, Optional, Tuple from io import BytesIO from PIL import Image import litellm from litellm.types.utils import ModelResponse from litellm.responses.litellm_completion_transformation.transformation import LiteLLMCompletionResponsesConfig from ..decorators import register_agent from ..types import Messages, AgentResponse, Tools, AgentCapability from ..loops.base import AsyncAgentConfig from ..responses import ( convert_responses_items_to_completion_messages, convert_completion_messages_to_responses_items, make_reasoning_item, make_output_text_item, make_click_item, make_double_click_item, make_drag_item, make_keypress_item, make_scroll_item, make_type_item, make_wait_item, make_input_image_item ) # GLM-4.5V specific constants GLM_ACTION_SPACE = """ ### {left,right,middle}_click Call rule: `{left,right,middle}_click(start_box='[x,y]', element_info='')` { 'name': ['left_click', 'right_click', 'middle_click'], 'description': 'Perform a left/right/middle mouse click at the specified coordinates on the screen.', 'parameters': { 'type': 'object', 'properties': { 'start_box': { 'type': 'array', 'items': { 'type': 'integer' }, 'description': 'Coordinates [x,y] where to perform the click, normalized to 0-999 range.' }, 'element_info': { 'type': 'string', 'description': 'Optional text description of the UI element being clicked.' } }, 'required': ['start_box'] } } ### hover Call rule: `hover(start_box='[x,y]', element_info='')` { 'name': 'hover', 'description': 'Move the mouse pointer to the specified coordinates without performing any click action.', 'parameters': { 'type': 'object', 'properties': { 'start_box': { 'type': 'array', 'items': { 'type': 'integer' }, 'description': 'Coordinates [x,y] where to move the mouse pointer, normalized to 0-999 range.' }, 'element_info': { 'type': 'string', 'description': 'Optional text description of the UI element being hovered over.' 
} }, 'required': ['start_box'] } } ### left_double_click Call rule: `left_double_click(start_box='[x,y]', element_info='')` { 'name': 'left_double_click', 'description': 'Perform a left mouse double-click at the specified coordinates on the screen.', 'parameters': { 'type': 'object', 'properties': { 'start_box': { 'type': 'array', 'items': { 'type': 'integer' }, 'description': 'Coordinates [x,y] where to perform the double-click, normalized to 0-999 range.' }, 'element_info': { 'type': 'string', 'description': 'Optional text description of the UI element being double-clicked.' } }, 'required': ['start_box'] } } ### left_drag Call rule: `left_drag(start_box='[x1,y1]', end_box='[x2,y2]', element_info='')` { 'name': 'left_drag', 'description': 'Drag the mouse from starting coordinates to ending coordinates while holding the left mouse button.', 'parameters': { 'type': 'object', 'properties': { 'start_box': { 'type': 'array', 'items': { 'type': 'integer' }, 'description': 'Starting coordinates [x1,y1] for the drag operation, normalized to 0-999 range.' }, 'end_box': { 'type': 'array', 'items': { 'type': 'integer' }, 'description': 'Ending coordinates [x2,y2] for the drag operation, normalized to 0-999 range.' }, 'element_info': { 'type': 'string', 'description': 'Optional text description of the UI element being dragged.' } }, 'required': ['start_box', 'end_box'] } } ### key Call rule: `key(keys='')` { 'name': 'key', 'description': 'Simulate pressing a single key or combination of keys on the keyboard.', 'parameters': { 'type': 'object', 'properties': { 'keys': { 'type': 'string', 'description': 'The key or key combination to press. Use '+' to separate keys in combinations (e.g., 'ctrl+c', 'alt+tab').' } }, 'required': ['keys'] } } ### type Call rule: `type(content='')` { 'name': 'type', 'description': 'Type text content into the currently focused text input field. 
This action only performs typing and does not handle field activation or clearing.', 'parameters': { 'type': 'object', 'properties': { 'content': { 'type': 'string', 'description': 'The text content to be typed into the active text field.' } }, 'required': ['content'] } } ### scroll Call rule: `scroll(start_box='[x,y]', direction='', step=5, element_info='')` { 'name': 'scroll', 'description': 'Scroll an element at the specified coordinates in the specified direction by a given number of wheel steps.', 'parameters': { 'type': 'object', 'properties': { 'start_box': { 'type': 'array', 'items': { 'type': 'integer' }, 'description': 'Coordinates [x,y] of the element or area to scroll, normalized to 0-999 range.' }, 'direction': { 'type': 'string', 'enum': ['down', 'up'], 'description': 'The direction to scroll: 'down' or 'up'.' }, 'step': { 'type': 'integer', 'default': 5, 'description': 'Number of wheel steps to scroll, default is 5.' }, 'element_info': { 'type': 'string', 'description': 'Optional text description of the UI element being scrolled.' 
} }, 'required': ['start_box', 'direction'] } } ### WAIT Call rule: `WAIT()` { 'name': 'WAIT', 'description': 'Wait for 5 seconds before proceeding to the next action.', 'parameters': { 'type': 'object', 'properties': {}, 'required': [] } } ### DONE Call rule: `DONE()` { 'name': 'DONE', 'description': 'Indicate that the current task has been completed successfully and no further actions are needed.', 'parameters': { 'type': 'object', 'properties': {}, 'required': [] } } ### FAIL Call rule: `FAIL()` { 'name': 'FAIL', 'description': 'Indicate that the current task cannot be completed or is impossible to accomplish.', 'parameters': { 'type': 'object', 'properties': {}, 'required': [] } }""" def encode_image_to_base64(image_path: str) -> str: """Encode image file to base64 string with data URI.""" with open(image_path, "rb") as image_file: encoded_string = base64.b64encode(image_file.read()).decode("utf-8") return f"data:image/png;base64,{encoded_string}" def parse_glm_response(response: str) -> Dict[str, Any]: """ Parse GLM-4.5V response to extract action and memory. The special tokens <|begin_of_box|> and <|end_of_box|> mark bounding boxes. Coordinates are normalized values between 0 and 1000. 
""" # Extract action from between special tokens pattern = r"<\|begin_of_box\|>(.*?)<\|end_of_box\|>" match = re.search(pattern, response) if match: action = match.group(1).strip() else: # Fallback: look for function call patterns action_pattern = r"[\w_]+\([^)]*\)" matches = re.findall(action_pattern, response) action = matches[0] if matches else None # Extract memory section memory_pattern = r"Memory:(.*?)$" memory_match = re.search(memory_pattern, response, re.DOTALL) memory = memory_match.group(1).strip() if memory_match else "[]" # Extract action text (everything before Memory:) action_text_pattern = r'^(.*?)Memory:' action_text_match = re.search(action_text_pattern, response, re.DOTALL) action_text = action_text_match.group(1).strip() if action_text_match else response # Clean up action text by removing special tokens if action_text: action_text = action_text.replace("<|begin_of_box|>", "").replace("<|end_of_box|>", "") return { "action": action, "action_text": action_text, "memory": memory } def get_last_image_from_messages(messages: Messages) -> Optional[str]: """Extract the last image from messages for processing.""" for message in reversed(messages): if isinstance(message, dict): if message.get("type") == "computer_call_output": output = message.get("output", {}) if isinstance(output, dict) and output.get("type") == "input_image": image_url = output.get("image_url", "") if isinstance(image_url, str) and image_url.startswith("data:image/"): # Extract base64 part return image_url.split(",", 1)[1] elif message.get("role") == "user": content = message.get("content", []) if isinstance(content, list): for item in reversed(content): if isinstance(item, dict) and item.get("type") == "image_url": image_url_obj = item.get("image_url", {}) if isinstance(image_url_obj, dict): image_url = image_url_obj.get("url", "") if isinstance(image_url, str) and image_url.startswith("data:image/"): return image_url.split(",", 1)[1] return None def 
convert_responses_items_to_glm45v_pc_prompt(messages: Messages, task: str, memory: str = "") -> List[Dict[str, Any]]: """Convert responses items to GLM-4.5V PC prompt format with historical actions. Args: messages: List of message items from the conversation task: The task description memory: Current memory state Returns: List of content items for the prompt (text and image_url items) """ action_space = GLM_ACTION_SPACE # Template head head_text = f"""You are a GUI Agent, and your primary task is to respond accurately to user requests or questions. In addition to directly answering the user's queries, you can also use tools or perform GUI operations directly until you fulfill the user's request or provide a correct answer. You should carefully read and understand the images and questions provided by the user, and engage in thinking and reflection when appropriate. The coordinates involved are all represented in thousandths (0-999). # Task: {task} # Task Platform Ubuntu # Action Space {action_space} # Historical Actions and Current Memory History:""" # Template tail tail_text = f""" Memory: {memory} # Output Format Plain text explanation with action(param='...') Memory: [{{"key": "value"}}, ...] # Some Additional Notes - I'll give you the most recent 4 history screenshots(shrunked to 50%*50%) along with the historical action steps. - You should put the key information you *have to remember* in a seperated memory part and I'll give it to you in the next round. The content in this part should be a dict list. If you no longer need some given information, you should remove it from the memory. Even if you don't need to remember anything, you should also output an empty list. - My computer's password is "password", feel free to use it when you need sudo rights. - For the thunderbird account "[email protected]", the password is "gTCI";=@y7|QJ0nDa_kN3Sb&>". 
Current Screenshot: """ # Build history from messages history = [] history_images = [] # Group messages into steps current_step = [] step_num = 0 for message in messages: msg_type = message.get("type") if msg_type == "reasoning": current_step.append(message) elif msg_type == "message" and message.get("role") == "assistant": current_step.append(message) elif msg_type == "computer_call": current_step.append(message) elif msg_type == "computer_call_output": current_step.append(message) # End of step - process it if current_step: step_num += 1 # Extract bot thought from message content bot_thought = "" for item in current_step: if item.get("type") == "message" and item.get("role") == "assistant": content = item.get("content", []) for content_item in content: if content_item.get("type") == "output_text": bot_thought = content_item.get("text", "") break break # Extract action from computer_call action_text = "" for item in current_step: if item.get("type") == "computer_call": action = item.get("action", {}) action_type = action.get("type", "") if action_type == "click": x, y = action.get("x", 0), action.get("y", 0) # Convert to 0-999 range (assuming screen dimensions) # For now, use direct coordinates - this may need adjustment action_text = f"left_click(start_box='[{x},{y}]')" elif action_type == "double_click": x, y = action.get("x", 0), action.get("y", 0) action_text = f"left_double_click(start_box='[{x},{y}]')" elif action_type == "right_click": x, y = action.get("x", 0), action.get("y", 0) action_text = f"right_click(start_box='[{x},{y}]')" elif action_type == "drag": # Handle drag with path path = action.get("path", []) if len(path) >= 2: start = path[0] end = path[-1] action_text = f"left_drag(start_box='[{start.get('x', 0)},{start.get('y', 0)}]', end_box='[{end.get('x', 0)},{end.get('y', 0)}]')" elif action_type == "keypress": key = action.get("key", "") action_text = f"key(keys='{key}')" elif action_type == "type": text = action.get("text", "") action_text = 
f"type(content='{text}')" elif action_type == "scroll": x, y = action.get("x", 0), action.get("y", 0) direction = action.get("direction", "down") action_text = f"scroll(start_box='[{x},{y}]', direction='{direction}')" elif action_type == "wait": action_text = "WAIT()" break # Extract screenshot from computer_call_output screenshot_url = None for item in current_step: if item.get("type") == "computer_call_output": output = item.get("output", {}) if output.get("type") == "input_image": screenshot_url = output.get("image_url", "") break # Store step info step_info = { "step_num": step_num, "bot_thought": bot_thought, "action_text": action_text, "screenshot_url": screenshot_url } history.append(step_info) # Store screenshot for last 4 steps if screenshot_url: history_images.append(screenshot_url) current_step = [] # Build content array with head, history, and tail content = [] current_text = head_text total_history_steps = len(history) history_image_count = min(4, len(history_images)) # Last 4 images for step_idx, step_info in enumerate(history): step_num = step_info["step_num"] bot_thought = step_info["bot_thought"] action_text = step_info["action_text"] if step_idx < total_history_steps - history_image_count: # For steps beyond the last 4, use text placeholder current_text += f"\nstep {step_num}: Screenshot:(Omitted in context.) 
Thought: {bot_thought}\nAction: {action_text}" else: # For the last 4 steps, insert images current_text += f"\nstep {step_num}: Screenshot:" content.append({"type": "text", "text": current_text}) # Add image img_idx = step_idx - (total_history_steps - history_image_count) if img_idx < len(history_images): content.append({"type": "image_url", "image_url": {"url": history_images[img_idx]}}) current_text = f" Thought: {bot_thought}\nAction: {action_text}" # Add tail current_text += tail_text content.append({"type": "text", "text": current_text}) return content def model_dump(obj) -> Dict[str, Any]: if isinstance(obj, dict): return {k: model_dump(v) for k, v in obj.items()} elif hasattr(obj, "model_dump"): return obj.model_dump() else: return obj def convert_glm_completion_to_responses_items(response: ModelResponse, image_width: int, image_height: int) -> List[Dict[str, Any]]: """ Convert GLM-4.5V completion response to responses items format. Args: response: LiteLLM ModelResponse from GLM-4.5V image_width: Original image width for coordinate scaling image_height: Original image height for coordinate scaling Returns: List of response items in the proper format """ import uuid response_items = [] if not response.choices or not response.choices[0].message: return response_items message = response.choices[0].message content = message.content or "" reasoning_content = getattr(message, 'reasoning_content', None) # Add reasoning item if present if reasoning_content: reasoning_item = model_dump(make_reasoning_item(reasoning_content)) response_items.append(reasoning_item) # Parse the content to extract action and text parsed_response = parse_glm_response(content) action = parsed_response.get("action", "") action_text = parsed_response.get("action_text", "") # Add message item with text content (excluding action and memory) if action_text: # Remove action from action_text if it's there clean_text = action_text if action and action in clean_text: clean_text = 
clean_text.replace(action, "").strip() # Remove memory section memory_pattern = r"Memory:\s*\[.*?\]\s*$" clean_text = re.sub(memory_pattern, "", clean_text, flags=re.DOTALL).strip() if clean_text: message_item = model_dump(make_output_text_item(clean_text)) response_items.append(message_item) # Convert action to computer call if present if action: call_id = f"call_{uuid.uuid4().hex[:8]}" # Parse different action types and create appropriate computer calls if action.startswith("left_click"): coord_match = re.search(r"start_box='?\[(\d+),\s*(\d+)\]'?", action) if coord_match: x, y = int(coord_match.group(1)), int(coord_match.group(2)) # Convert from 0-999 to actual pixel coordinates actual_x = int((x / 999.0) * image_width) actual_y = int((y / 999.0) * image_height) computer_call = model_dump(make_click_item(actual_x, actual_y)) computer_call["call_id"] = call_id computer_call["status"] = "completed" response_items.append(computer_call) elif action.startswith("right_click"): coord_match = re.search(r"start_box='?\[(\d+),\s*(\d+)\]'?", action) if coord_match: x, y = int(coord_match.group(1)), int(coord_match.group(2)) actual_x = int((x / 999.0) * image_width) actual_y = int((y / 999.0) * image_height) computer_call = model_dump(make_click_item(actual_x, actual_y, button="right")) computer_call["call_id"] = call_id computer_call["status"] = "completed" response_items.append(computer_call) elif action.startswith("left_double_click"): coord_match = re.search(r"start_box='?\[(\d+),\s*(\d+)\]'?", action) if coord_match: x, y = int(coord_match.group(1)), int(coord_match.group(2)) actual_x = int((x / 999.0) * image_width) actual_y = int((y / 999.0) * image_height) computer_call = model_dump(make_double_click_item(actual_x, actual_y)) computer_call["call_id"] = call_id computer_call["status"] = "completed" response_items.append(computer_call) elif action.startswith("left_drag"): start_match = re.search(r"start_box='?\[(\d+),\s*(\d+)\]'?", action) end_match = 
re.search(r"end_box='?\[(\d+),\s*(\d+)\]'?", action) if start_match and end_match: x1, y1 = int(start_match.group(1)), int(start_match.group(2)) x2, y2 = int(end_match.group(1)), int(end_match.group(2)) actual_x1 = int((x1 / 999.0) * image_width) actual_y1 = int((y1 / 999.0) * image_height) actual_x2 = int((x2 / 999.0) * image_width) actual_y2 = int((y2 / 999.0) * image_height) # Create path for drag operation drag_path = [{"x": actual_x1, "y": actual_y1}, {"x": actual_x2, "y": actual_y2}] computer_call = model_dump(make_drag_item(drag_path)) computer_call["call_id"] = call_id computer_call["status"] = "completed" response_items.append(computer_call) elif action.startswith("key"): key_match = re.search(r"keys='([^']+)'", action) if key_match: keys = key_match.group(1) # Split keys by '+' for key combinations, or use as single key key_list = keys.split('+') if '+' in keys else [keys] computer_call = model_dump(make_keypress_item(key_list)) computer_call["call_id"] = call_id computer_call["status"] = "completed" response_items.append(computer_call) elif action.startswith("type"): content_match = re.search(r"content='([^']*)'", action) if content_match: content = content_match.group(1) computer_call = model_dump(make_type_item(content)) computer_call["call_id"] = call_id computer_call["status"] = "completed" response_items.append(computer_call) elif action.startswith("scroll"): coord_match = re.search(r"start_box='?\[(\d+),\s*(\d+)\]'?", action) direction_match = re.search(r"direction='([^']+)'", action) if coord_match and direction_match: x, y = int(coord_match.group(1)), int(coord_match.group(2)) direction = direction_match.group(1) actual_x = int((x / 999.0) * image_width) actual_y = int((y / 999.0) * image_height) # Convert direction to scroll amounts scroll_x, scroll_y = 0, 0 if direction == "up": scroll_y = -5 elif direction == "down": scroll_y = 5 elif direction == "left": scroll_x = -5 elif direction == "right": scroll_x = 5 computer_call = 
model_dump(make_scroll_item(actual_x, actual_y, scroll_x, scroll_y)) computer_call["call_id"] = call_id computer_call["status"] = "completed" response_items.append(computer_call) elif action == "WAIT()": computer_call = model_dump(make_wait_item()) computer_call["call_id"] = call_id computer_call["status"] = "completed" response_items.append(computer_call) return response_items @register_agent(models=r"(?i).*GLM-4\.5V.*") class Glm4vConfig(AsyncAgentConfig): """GLM-4.5V agent configuration using liteLLM.""" async def predict_step( self, messages: List[Dict[str, Any]], model: str, tools: Optional[List[Dict[str, Any]]] = None, max_retries: Optional[int] = None, stream: bool = False, computer_handler=None, use_prompt_caching: Optional[bool] = False, _on_api_start=None, _on_api_end=None, _on_usage=None, _on_screenshot=None, **kwargs ) -> Dict[str, Any]: """ Predict the next step using GLM-4.5V model. Args: messages: Input messages following Responses format model: Model name to use tools: Optional list of tool schemas max_retries: Maximum number of retries for API calls stream: Whether to stream the response computer_handler: Computer handler for taking screenshots use_prompt_caching: Whether to use prompt caching _on_api_start: Callback for API start _on_api_end: Callback for API end _on_usage: Callback for usage tracking _on_screenshot: Callback for screenshot events Returns: Dict with "output" and "usage" keys """ # Get the user instruction from the last user message user_instruction = "" for message in reversed(messages): if isinstance(message, dict) and message.get("role") == "user": content = message.get("content", "") if isinstance(content, str): user_instruction = content elif isinstance(content, list): for item in content: if isinstance(item, dict) and item.get("type") == "text": user_instruction = item.get("text", "") break break # Get the last image for processing last_image_b64 = get_last_image_from_messages(messages) if not last_image_b64 and 
computer_handler: # Take a screenshot if no image available screenshot_b64 = await computer_handler.screenshot() if screenshot_b64: last_image_b64 = screenshot_b64 if _on_screenshot: await _on_screenshot(screenshot_b64) if not last_image_b64: raise ValueError("No image available for GLM-4.5V processing") # Convert responses items to GLM-4.5V PC prompt format with historical actions prompt_content = convert_responses_items_to_glm45v_pc_prompt( messages=messages, task=user_instruction, memory="[]" # Initialize with empty memory for now ) # Add the current screenshot to the end prompt_content.append({ "type": "image_url", "image_url": {"url": f"data:image/png;base64,{last_image_b64}"} }) # Prepare messages for liteLLM litellm_messages = [ { "role": "system", "content": "You are a helpful GUI agent assistant." }, { "role": "user", "content": prompt_content } ] # Prepare API call kwargs api_kwargs = { "model": model, "messages": litellm_messages, # "max_tokens": 2048, # "temperature": 0.001, # "extra_body": { # "skip_special_tokens": False, # } } # Add API callbacks if _on_api_start: await _on_api_start(api_kwargs) # Call liteLLM response = await litellm.acompletion(**api_kwargs) if _on_api_end: await _on_api_end(api_kwargs, response) # Get image dimensions for coordinate scaling image_width, image_height = 1920, 1080 # Default dimensions # Try to get actual dimensions from the image try: image_data = base64.b64decode(last_image_b64) image = Image.open(BytesIO(image_data)) image_width, image_height = image.size except Exception: pass # Use default dimensions # Convert GLM completion response to responses items response_items = convert_glm_completion_to_responses_items(response, image_width, image_height) # Extract usage information response_usage = { **LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage(response.usage).model_dump(), "response_cost": response._hidden_params.get("response_cost", 0.0), } if _on_usage: await 
_on_usage(response_usage) # Create agent response agent_response = { "output": response_items, "usage": response_usage } return agent_response async def predict_click( self, model: str, image_b64: str, instruction: str, **kwargs ) -> Optional[Tuple[int, int]]: """ Predict click coordinates using GLM-4.5V model. Args: model: Model name to use image_b64: Base64 encoded image instruction: Instruction for where to click Returns: Tuple with (x, y) coordinates or None """ try: # Create a simple click instruction prompt click_prompt = f"""You are a GUI agent. Look at the screenshot and identify where to click for: {instruction} Respond with a single click action in this format: left_click(start_box='[x,y]') Where x,y are coordinates normalized to 0-999 range.""" # Prepare messages for liteLLM litellm_messages = [ { "role": "system", "content": "You are a helpful GUI agent assistant." }, { "role": "user", "content": [ {"type": "text", "text": click_prompt}, {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_b64}"}} ] } ] # Prepare API call kwargs api_kwargs = { "model": model, "messages": litellm_messages, "max_tokens": 2056, "temperature": 0.001, "extra_body": { "skip_special_tokens": False, } } # Call liteLLM response = await litellm.acompletion(**api_kwargs) # Extract response content response_content = response.choices[0].message.content.strip() print(response) # Parse response for click coordinates # Look for coordinates in the response, handling special tokens coord_pattern = r"<\|begin_of_box\|>.*?left_click\(start_box='?\[(\d+),(\d+)\]'?\).*?<\|end_of_box\|>" match = re.search(coord_pattern, response_content) if not match: # Fallback: look for coordinates without special tokens coord_pattern = r"left_click\(start_box='?\[(\d+),(\d+)\]'?\)" match = re.search(coord_pattern, response_content) if match: x, y = int(match.group(1)), int(match.group(2)) # Get actual image dimensions for scaling try: image_data = base64.b64decode(image_b64) image = 
Image.open(BytesIO(image_data)) image_width, image_height = image.size except Exception: # Use default dimensions image_width, image_height = 1920, 1080 # Convert from 0-999 normalized coordinates to actual pixel coordinates actual_x = int((x / 999.0) * image_width) actual_y = int((y / 999.0) * image_height) return (actual_x, actual_y) return None except Exception as e: # Log error and return None print(f"Error in predict_click: {e}") return None def get_capabilities(self) -> List[AgentCapability]: """ Get list of capabilities supported by this agent config. Returns: List of capability strings """ return ["step", "click"] ``` -------------------------------------------------------------------------------- /libs/python/agent/agent/ui/gradio/ui_components.py: -------------------------------------------------------------------------------- ```python """ UI Components for the Gradio interface """ import os import asyncio import logging import json import platform from pathlib import Path from typing import Dict, List, Optional, Any, cast import gradio as gr from gradio.components.chatbot import MetadataDict from .app import ( load_settings, save_settings, create_agent, get_model_string, get_ollama_models, global_agent, global_computer ) # Global messages array to maintain conversation history global_messages = [] def create_gradio_ui() -> gr.Blocks: """Create a Gradio UI for the Computer-Use Agent.""" # Load settings saved_settings = load_settings() # Check for API keys openai_api_key = os.environ.get("OPENAI_API_KEY", "") anthropic_api_key = os.environ.get("ANTHROPIC_API_KEY", "") cua_api_key = os.environ.get("CUA_API_KEY", "") # Model choices openai_models = ["OpenAI: Computer-Use Preview"] anthropic_models = [ "Anthropic: Claude 4 Opus (20250514)", "Anthropic: Claude 4 Sonnet (20250514)", "Anthropic: Claude 3.7 Sonnet (20250219)", "Anthropic: Claude 3.5 Sonnet (20241022)", ] omni_models = [ "OMNI: OpenAI GPT-4o", "OMNI: OpenAI GPT-4o mini", "OMNI: Claude 3.7 Sonnet 
(20250219)", "OMNI: Claude 3.5 Sonnet (20241022)" ] # Check if API keys are available has_openai_key = bool(openai_api_key) has_anthropic_key = bool(anthropic_api_key) has_cua_key = bool(cua_api_key) # Get Ollama models for OMNI ollama_models = get_ollama_models() if ollama_models: omni_models += ollama_models # Detect platform is_mac = platform.system().lower() == "darwin" # Format model choices provider_to_models = { "OPENAI": openai_models, "ANTHROPIC": anthropic_models, "OMNI": omni_models + ["Custom model (OpenAI compatible API)", "Custom model (ollama)"], "UITARS": ([ "huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B", ] if is_mac else []) + ["Custom model (OpenAI compatible API)"], } # Apply saved settings initial_loop = saved_settings.get("agent_loop", "OMNI") available_models_for_loop = provider_to_models.get(initial_loop, []) saved_model_choice = saved_settings.get("model_choice") if saved_model_choice and saved_model_choice in available_models_for_loop: initial_model = saved_model_choice else: if initial_loop == "OPENAI": initial_model = openai_models[0] if openai_models else "No models available" elif initial_loop == "ANTHROPIC": initial_model = anthropic_models[0] if anthropic_models else "No models available" else: # OMNI initial_model = omni_models[0] if omni_models else "Custom model (OpenAI compatible API)" initial_custom_model = saved_settings.get("custom_model", "Qwen2.5-VL-7B-Instruct") initial_provider_base_url = saved_settings.get("provider_base_url", "http://localhost:1234/v1") initial_save_trajectory = saved_settings.get("save_trajectory", True) initial_recent_images = saved_settings.get("recent_images", 3) # Example prompts example_messages = [ "Create a Python virtual environment, install pandas and matplotlib, then plot stock data", "Open a PDF in Preview, add annotations, and save it as a compressed version", "Open Safari, search for 'macOS automation tools', and save the first three results as bookmarks", "Configure SSH keys and set up a 
connection to a remote server", ] def generate_python_code(agent_loop_choice, model_name, tasks, recent_images=3, save_trajectory=True, computer_os="linux", computer_provider="cloud", container_name="", cua_cloud_api_key="", max_budget=None): """Generate Python code for the current configuration and tasks.""" tasks_str = "" for task in tasks: if task and task.strip(): tasks_str += f' "{task}",\n' model_string = get_model_string(model_name, agent_loop_choice) computer_args = [] if computer_os != "macos": computer_args.append(f'os_type="{computer_os}"') if computer_provider != "lume": computer_args.append(f'provider_type="{computer_provider}"') if container_name: computer_args.append(f'name="{container_name}"') if cua_cloud_api_key: computer_args.append(f'api_key="{cua_cloud_api_key}"') computer_args_str = ", ".join(computer_args) if computer_args_str: computer_args_str = f"({computer_args_str})" else: computer_args_str = "()" code = f'''import asyncio from computer import Computer from agent import ComputerAgent async def main(): async with Computer{computer_args_str} as computer: agent = ComputerAgent( model="{model_string}", tools=[computer], only_n_most_recent_images={recent_images},''' if save_trajectory: code += ''' trajectory_dir="trajectories",''' if max_budget: code += f''' max_trajectory_budget={{"max_budget": {max_budget}, "raise_error": True}},''' code += ''' ) ''' if tasks_str: code += f''' # Prompts for the computer-use agent tasks = [ {tasks_str.rstrip()} ] for task in tasks: print(f"Executing task: {{task}}") messages = [{{"role": "user", "content": task}}] async for result in agent.run(messages): for item in result["output"]: if item["type"] == "message": print(item["content"][0]["text"])''' else: code += f''' # Execute a single task task = "Search for information about CUA on GitHub" print(f"Executing task: {{task}}") messages = [{{"role": "user", "content": task}}] async for result in agent.run(messages): for item in result["output"]: if 
item["type"] == "message": print(item["content"][0]["text"])''' code += ''' if __name__ == "__main__": asyncio.run(main())''' return code # Create the Gradio interface with gr.Blocks(title="Computer-Use Agent") as demo: with gr.Row(): # Left column for settings with gr.Column(scale=1): # Logo gr.HTML( """ <div style="display: flex; justify-content: center; margin-bottom: 0.5em"> <img alt="CUA Logo" style="width: 80px;" src="https://github.com/trycua/cua/blob/main/img/logo_white.png?raw=true" /> </div> """ ) # Python code accordion with gr.Accordion("Python Code", open=False): code_display = gr.Code( language="python", value=generate_python_code(initial_loop, "gpt-4o", []), interactive=False, ) with gr.Accordion("Computer Configuration", open=True): is_windows = platform.system().lower() == "windows" is_mac = platform.system().lower() == "darwin" providers = ["cloud", "localhost", "docker"] if is_mac: providers += ["lume"] if is_windows: providers += ["winsandbox"] # Remove unavailable options # MacOS is unavailable if Lume is not available # Windows is unavailable if Winsandbox is not available # Linux is always available # This should be removed once we support macOS and Windows on the cloud provider computer_choices = ["macos", "linux", "windows"] if not is_mac or "lume" not in providers: computer_choices.remove("macos") if not is_windows or "winsandbox" not in providers: computer_choices.remove("windows") computer_os = gr.Radio( choices=computer_choices, label="Operating System", value=computer_choices[0], info="Select the operating system for the computer", ) computer_provider = gr.Radio( choices=providers, label="Provider", value="lume" if is_mac else "cloud", info="Select the computer provider", ) container_name = gr.Textbox( label="Container Name", placeholder="Enter container name (optional)", value=os.environ.get("CUA_CONTAINER_NAME", ""), info="Optional name for the container", ) cua_cloud_api_key = gr.Textbox( label="CUA Cloud API Key", 
placeholder="Enter your CUA Cloud API key", value=os.environ.get("CUA_API_KEY", ""), type="password", info="Required for cloud provider", visible=(not has_cua_key) ) with gr.Accordion("Agent Configuration", open=True): agent_loop = gr.Dropdown( choices=["OPENAI", "ANTHROPIC", "OMNI", "UITARS"], label="Agent Loop", value=initial_loop, info="Select the agent loop provider", ) # Model selection dropdowns with gr.Group() as model_selection_group: openai_model_choice = gr.Dropdown( choices=openai_models, label="OpenAI Model", value=openai_models[0] if openai_models else "No models available", info="Select OpenAI model", interactive=True, visible=(initial_loop == "OPENAI") ) anthropic_model_choice = gr.Dropdown( choices=anthropic_models, label="Anthropic Model", value=anthropic_models[0] if anthropic_models else "No models available", info="Select Anthropic model", interactive=True, visible=(initial_loop == "ANTHROPIC") ) omni_model_choice = gr.Dropdown( choices=omni_models + ["Custom model (OpenAI compatible API)", "Custom model (ollama)"], label="OMNI Model", value=omni_models[0] if omni_models else "Custom model (OpenAI compatible API)", info="Select OMNI model or choose a custom model option", interactive=True, visible=(initial_loop == "OMNI") ) uitars_model_choice = gr.Dropdown( choices=provider_to_models.get("UITARS", ["No models available"]), label="UITARS Model", value=provider_to_models.get("UITARS", ["No models available"])[0] if provider_to_models.get("UITARS") else "No models available", info="Select UITARS model", interactive=True, visible=(initial_loop == "UITARS") ) model_choice = gr.Textbox(visible=False) # API key inputs with gr.Group(visible=not has_openai_key and (initial_loop == "OPENAI" or initial_loop == "OMNI")) as openai_key_group: openai_api_key_input = gr.Textbox( label="OpenAI API Key", placeholder="Enter your OpenAI API key", value=os.environ.get("OPENAI_API_KEY", ""), interactive=True, type="password", info="Required for OpenAI models" ) with 
gr.Group(visible=not has_anthropic_key and (initial_loop == "ANTHROPIC" or initial_loop == "OMNI")) as anthropic_key_group: anthropic_api_key_input = gr.Textbox( label="Anthropic API Key", placeholder="Enter your Anthropic API key", value=os.environ.get("ANTHROPIC_API_KEY", ""), interactive=True, type="password", info="Required for Anthropic models" ) # API key handlers def set_openai_api_key(key): if key and key.strip(): os.environ["OPENAI_API_KEY"] = key.strip() print(f"DEBUG - Set OpenAI API key environment variable") return key def set_anthropic_api_key(key): if key and key.strip(): os.environ["ANTHROPIC_API_KEY"] = key.strip() print(f"DEBUG - Set Anthropic API key environment variable") return key openai_api_key_input.change( fn=set_openai_api_key, inputs=[openai_api_key_input], outputs=[openai_api_key_input], queue=False ) anthropic_api_key_input.change( fn=set_anthropic_api_key, inputs=[anthropic_api_key_input], outputs=[anthropic_api_key_input], queue=False ) # UI update function def update_ui(loop=None, openai_model=None, anthropic_model=None, omni_model=None, uitars_model=None): loop = loop or agent_loop.value model_value = None if loop == "OPENAI" and openai_model: model_value = openai_model elif loop == "ANTHROPIC" and anthropic_model: model_value = anthropic_model elif loop == "OMNI" and omni_model: model_value = omni_model elif loop == "UITARS" and uitars_model: model_value = uitars_model openai_visible = (loop == "OPENAI") anthropic_visible = (loop == "ANTHROPIC") omni_visible = (loop == "OMNI") uitars_visible = (loop == "UITARS") show_openai_key = not has_openai_key and (loop == "OPENAI" or (loop == "OMNI" and model_value and "OpenAI" in model_value and "Custom" not in model_value)) show_anthropic_key = not has_anthropic_key and (loop == "ANTHROPIC" or (loop == "OMNI" and model_value and "Claude" in model_value and "Custom" not in model_value)) is_custom_openai_api = model_value == "Custom model (OpenAI compatible API)" is_custom_ollama = 
model_value == "Custom model (ollama)" is_any_custom = is_custom_openai_api or is_custom_ollama model_choice_value = model_value if model_value else "" return [ gr.update(visible=openai_visible), gr.update(visible=anthropic_visible), gr.update(visible=omni_visible), gr.update(visible=uitars_visible), gr.update(visible=show_openai_key), gr.update(visible=show_anthropic_key), gr.update(visible=is_any_custom), gr.update(visible=is_custom_openai_api), gr.update(visible=is_custom_openai_api), gr.update(value=model_choice_value) ] # Custom model inputs custom_model = gr.Textbox( label="Custom Model Name", placeholder="Enter custom model name (e.g., Qwen2.5-VL-7B-Instruct or llama3)", value=initial_custom_model, visible=(initial_model == "Custom model (OpenAI compatible API)" or initial_model == "Custom model (ollama)"), interactive=True, ) provider_base_url = gr.Textbox( label="Provider Base URL", placeholder="Enter provider base URL (e.g., http://localhost:1234/v1)", value=initial_provider_base_url, visible=(initial_model == "Custom model (OpenAI compatible API)"), interactive=True, ) provider_api_key = gr.Textbox( label="Provider API Key", placeholder="Enter provider API key (if required)", value="", visible=(initial_model == "Custom model (OpenAI compatible API)"), interactive=True, type="password", ) # Provider visibility update function def update_provider_visibility(provider): """Update visibility of container name and API key based on selected provider.""" is_localhost = provider == "localhost" return [ gr.update(visible=not is_localhost), # container_name gr.update(visible=not is_localhost and not has_cua_key) # cua_cloud_api_key ] # Connect provider change event computer_provider.change( fn=update_provider_visibility, inputs=[computer_provider], outputs=[container_name, cua_cloud_api_key], queue=False ) # Connect UI update events for dropdown in [agent_loop, omni_model_choice, uitars_model_choice, openai_model_choice, anthropic_model_choice]: dropdown.change( 
fn=update_ui, inputs=[agent_loop, openai_model_choice, anthropic_model_choice, omni_model_choice, uitars_model_choice], outputs=[ openai_model_choice, anthropic_model_choice, omni_model_choice, uitars_model_choice, openai_key_group, anthropic_key_group, custom_model, provider_base_url, provider_api_key, model_choice ], queue=False ) save_trajectory = gr.Checkbox( label="Save Trajectory", value=initial_save_trajectory, info="Save the agent's trajectory for debugging", interactive=True, ) recent_images = gr.Slider( label="Recent Images", minimum=1, maximum=10, value=initial_recent_images, step=1, info="Number of recent images to keep in context", interactive=True, ) max_budget = gr.Number( label="Max Budget ($)", value=lambda: None, minimum=-1, maximum=100.0, step=0.1, info="Optional budget limit for trajectory (0 = no limit)", interactive=True, ) # Right column for chat interface with gr.Column(scale=2): gr.Markdown( "Ask me to perform tasks in a virtual environment.<br>Built with <a href='https://github.com/trycua/cua' target='_blank'>github.com/trycua/cua</a>." 
) chatbot_history = gr.Chatbot(type="messages") msg = gr.Textbox( placeholder="Ask me to perform tasks in a virtual environment" ) clear = gr.Button("Clear") cancel_button = gr.Button("Cancel", variant="stop") # Add examples example_group = gr.Examples(examples=example_messages, inputs=msg) # Chat submission function def chat_submit(message, history): history.append(gr.ChatMessage(role="user", content=message)) return "", history # Cancel function async def cancel_agent_task(history): global global_agent if global_agent: print("DEBUG - Cancelling agent task") history.append(gr.ChatMessage(role="assistant", content="Task cancelled by user", metadata={"title": "❌ Cancelled"})) else: history.append(gr.ChatMessage(role="assistant", content="No active agent task to cancel", metadata={"title": "ℹ️ Info"})) return history # Process response function async def process_response( history, openai_model_value, anthropic_model_value, omni_model_value, uitars_model_value, custom_model_value, agent_loop_choice, save_traj, recent_imgs, custom_url_value=None, custom_api_key=None, openai_key_input=None, anthropic_key_input=None, computer_os="linux", computer_provider="cloud", container_name="", cua_cloud_api_key="", max_budget_value=None, ): if not history: yield history return # Get the last user message last_user_message = history[-1]["content"] # Get the appropriate model value based on the agent loop if agent_loop_choice == "OPENAI": model_choice_value = openai_model_value elif agent_loop_choice == "ANTHROPIC": model_choice_value = anthropic_model_value elif agent_loop_choice == "OMNI": model_choice_value = omni_model_value elif agent_loop_choice == "UITARS": model_choice_value = uitars_model_value else: model_choice_value = "No models available" # Determine if this is a custom model selection is_custom_model_selected = model_choice_value in ["Custom model (OpenAI compatible API)", "Custom model (ollama)"] # Determine the model name string to analyze if is_custom_model_selected: 
model_string_to_analyze = custom_model_value else: model_string_to_analyze = model_choice_value try: # Get the model string model_string = get_model_string(model_string_to_analyze, agent_loop_choice) # Set API keys if provided if openai_key_input: os.environ["OPENAI_API_KEY"] = openai_key_input if anthropic_key_input: os.environ["ANTHROPIC_API_KEY"] = anthropic_key_input if cua_cloud_api_key: os.environ["CUA_API_KEY"] = cua_cloud_api_key # Save settings current_settings = { "agent_loop": agent_loop_choice, "model_choice": model_choice_value, "custom_model": custom_model_value, "provider_base_url": custom_url_value, "save_trajectory": save_traj, "recent_images": recent_imgs, "computer_os": computer_os, "computer_provider": computer_provider, "container_name": container_name, } save_settings(current_settings) # Create agent global_agent = create_agent( model_string=model_string, save_trajectory=save_traj, only_n_most_recent_images=recent_imgs, custom_model_name=custom_model_value if is_custom_model_selected else None, computer_os=computer_os, computer_provider=computer_provider, computer_name=container_name, computer_api_key=cua_cloud_api_key, verbosity=logging.DEBUG, max_trajectory_budget=max_budget_value if max_budget_value and max_budget_value > 0 else None, ) if global_agent is None: history.append( gr.ChatMessage( role="assistant", content="Failed to create agent. 
Check API keys and configuration.", ) ) yield history return # Add user message to global history global global_messages global_messages.append({"role": "user", "content": last_user_message}) # Stream responses from the agent async for result in global_agent.run(global_messages): global_messages += result.get("output", []) # print(f"DEBUG - Agent response ------- START") # from pprint import pprint # pprint(result) # print(f"DEBUG - Agent response ------- END") # Process the result output for item in result.get("output", []): if item.get("type") == "message": content = item.get("content", []) for content_part in content: if content_part.get("text"): history.append(gr.ChatMessage( role=item.get("role", "assistant"), content=content_part.get("text", ""), metadata=content_part.get("metadata", {}) )) elif item.get("type") == "computer_call": action = item.get("action", {}) action_type = action.get("type", "") if action_type: action_title = f"🛠️ Performing {action_type}" if action.get("x") and action.get("y"): action_title += f" at ({action['x']}, {action['y']})" history.append(gr.ChatMessage( role="assistant", content=f"```json\n{json.dumps(action)}\n```", metadata={"title": action_title} )) elif item.get("type") == "function_call": function_name = item.get("name", "") arguments = item.get("arguments", "{}") history.append(gr.ChatMessage( role="assistant", content=f"🔧 Calling function: {function_name}\n```json\n{arguments}\n```", metadata={"title": f"Function Call: {function_name}"} )) elif item.get("type") == "function_call_output": output = item.get("output", "") history.append(gr.ChatMessage( role="assistant", content=f"📤 Function output:\n```\n{output}\n```", metadata={"title": "Function Output"} )) elif item.get("type") == "computer_call_output": output = item.get("output", {}).get("image_url", "") image_markdown = f"" history.append(gr.ChatMessage( role="assistant", content=image_markdown, metadata={"title": "🖥️ Computer Output"} )) yield history except Exception 
as e: import traceback traceback.print_exc() history.append(gr.ChatMessage(role="assistant", content=f"Error: {str(e)}")) yield history # Connect the submit button submit_event = msg.submit( fn=chat_submit, inputs=[msg, chatbot_history], outputs=[msg, chatbot_history], queue=False, ).then( fn=process_response, inputs=[ chatbot_history, openai_model_choice, anthropic_model_choice, omni_model_choice, uitars_model_choice, custom_model, agent_loop, save_trajectory, recent_images, provider_base_url, provider_api_key, openai_api_key_input, anthropic_api_key_input, computer_os, computer_provider, container_name, cua_cloud_api_key, max_budget, ], outputs=[chatbot_history], queue=True, ) # Clear button functionality def clear_chat(): global global_messages global_messages.clear() return None clear.click(clear_chat, None, chatbot_history, queue=False) # Connect cancel button cancel_button.click( cancel_agent_task, [chatbot_history], [chatbot_history], queue=False ) # Code display update function def update_code_display(agent_loop, model_choice_val, custom_model_val, chat_history, recent_images_val, save_trajectory_val, computer_os, computer_provider, container_name, cua_cloud_api_key, max_budget_val): messages = [] if chat_history: for msg in chat_history: if isinstance(msg, dict) and msg.get("role") == "user": messages.append(msg.get("content", "")) return generate_python_code( agent_loop, model_choice_val or custom_model_val or "gpt-4o", messages, recent_images_val, save_trajectory_val, computer_os, computer_provider, container_name, cua_cloud_api_key, max_budget_val ) # Update code display when configuration changes for component in [agent_loop, model_choice, custom_model, chatbot_history, recent_images, save_trajectory, computer_os, computer_provider, container_name, cua_cloud_api_key, max_budget]: component.change( update_code_display, inputs=[agent_loop, model_choice, custom_model, chatbot_history, recent_images, save_trajectory, computer_os, computer_provider, 
container_name, cua_cloud_api_key, max_budget], outputs=[code_display] ) return demo ``` -------------------------------------------------------------------------------- /libs/lume/src/LumeController.swift: -------------------------------------------------------------------------------- ```swift import ArgumentParser import Foundation import Virtualization // MARK: - Shared VM Manager @MainActor final class SharedVM { static let shared: SharedVM = SharedVM() private var runningVMs: [String: VM] = [:] private init() {} func getVM(name: String) -> VM? { return runningVMs[name] } func setVM(name: String, vm: VM) { runningVMs[name] = vm } func removeVM(name: String) { runningVMs.removeValue(forKey: name) } } /// Entrypoint for Commands and API server final class LumeController { // MARK: - Properties let home: Home private let imageLoaderFactory: ImageLoaderFactory private let vmFactory: VMFactory // MARK: - Initialization init( home: Home = Home(), imageLoaderFactory: ImageLoaderFactory = DefaultImageLoaderFactory(), vmFactory: VMFactory = DefaultVMFactory() ) { self.home = home self.imageLoaderFactory = imageLoaderFactory self.vmFactory = vmFactory } // MARK: - Public VM Management Methods /// Lists all virtual machines in the system @MainActor public func list(storage: String? = nil) throws -> [VMDetails] { do { if let storage = storage { // If storage is specified, only return VMs from that location if storage.contains("/") || storage.contains("\\") { // Direct path - check if it exists if !FileManager.default.fileExists(atPath: storage) { // Return empty array if the path doesn't exist return [] } // Try to get all VMs from the specified path // We need to check which subdirectories are valid VM dirs let directoryURL = URL(fileURLWithPath: storage) let contents = try FileManager.default.contentsOfDirectory( at: directoryURL, includingPropertiesForKeys: [.isDirectoryKey], options: .skipsHiddenFiles ) let statuses = try contents.compactMap { subdir -> VMDetails? 
in guard let isDirectory = try subdir.resourceValues(forKeys: [.isDirectoryKey]).isDirectory, isDirectory else { return nil } let vmName = subdir.lastPathComponent // Check if it's a valid VM directory let vmDir = try home.getVMDirectoryFromPath(vmName, storagePath: storage) if !vmDir.initialized() { return nil } do { let vm = try self.get(name: vmName, storage: storage) return vm.details } catch { // Skip invalid VM directories return nil } } return statuses } else { // Named storage let vmsWithLoc = try home.getAllVMDirectories() let statuses = try vmsWithLoc.compactMap { vmWithLoc -> VMDetails? in // Only include VMs from the specified location if vmWithLoc.locationName != storage { return nil } let vm = try self.get( name: vmWithLoc.directory.name, storage: vmWithLoc.locationName) return vm.details } return statuses } } else { // No storage filter - get all VMs let vmsWithLoc = try home.getAllVMDirectories() let statuses = try vmsWithLoc.compactMap { vmWithLoc -> VMDetails? in let vm = try self.get( name: vmWithLoc.directory.name, storage: vmWithLoc.locationName) return vm.details } return statuses } } catch { Logger.error("Failed to list VMs", metadata: ["error": error.localizedDescription]) throw error } } @MainActor public func clone( name: String, newName: String, sourceLocation: String? = nil, destLocation: String? = nil ) throws { let normalizedName = normalizeVMName(name: name) let normalizedNewName = normalizeVMName(name: newName) Logger.info( "Cloning VM", metadata: [ "source": normalizedName, "destination": normalizedNewName, "sourceLocation": sourceLocation ?? "default", "destLocation": destLocation ?? 
"default", ]) do { // Validate source VM exists _ = try self.validateVMExists(normalizedName, storage: sourceLocation) // Get the source VM and check if it's running let sourceVM = try get(name: normalizedName, storage: sourceLocation) if sourceVM.details.status == "running" { Logger.error("Cannot clone a running VM", metadata: ["source": normalizedName]) throw VMError.alreadyRunning(normalizedName) } // Check if destination already exists do { let destDir = try home.getVMDirectory(normalizedNewName, storage: destLocation) if destDir.exists() { Logger.error( "Destination VM already exists", metadata: ["destination": normalizedNewName]) throw HomeError.directoryAlreadyExists(path: destDir.dir.path) } } catch VMLocationError.locationNotFound { // Location not found is okay, we'll create it } catch VMError.notFound { // VM not found is okay, we'll create it } // Copy the VM directory try home.copyVMDirectory( from: normalizedName, to: normalizedNewName, sourceLocation: sourceLocation, destLocation: destLocation ) // Update MAC address in the cloned VM to ensure uniqueness let clonedVM = try get(name: normalizedNewName, storage: destLocation) try clonedVM.setMacAddress(VZMACAddress.randomLocallyAdministered().string) // Update MAC Identifier in the cloned VM to ensure uniqueness try clonedVM.setMachineIdentifier( DarwinVirtualizationService.generateMachineIdentifier()) Logger.info( "VM cloned successfully", metadata: ["source": normalizedName, "destination": normalizedNewName]) } catch { Logger.error("Failed to clone VM", metadata: ["error": error.localizedDescription]) throw error } } @MainActor public func get(name: String, storage: String? 
= nil) throws -> VM { let normalizedName = normalizeVMName(name: name) do { let vm: VM if let storagePath = storage, storagePath.contains("/") || storagePath.contains("\\") { // Storage is a direct path let vmDir = try home.getVMDirectoryFromPath(normalizedName, storagePath: storagePath) guard vmDir.initialized() else { // Throw a specific error if the directory exists but isn't a valid VM if vmDir.exists() { throw VMError.notInitialized(normalizedName) } else { throw VMError.notFound(normalizedName) } } // Pass the path as the storage context vm = try self.loadVM(vmDir: vmDir, storage: storagePath) } else { // Storage is nil or a named location let actualLocation = try self.validateVMExists( normalizedName, storage: storage) let vmDir = try home.getVMDirectory(normalizedName, storage: actualLocation) // loadVM will re-check initialized, but good practice to keep validateVMExists result. vm = try self.loadVM(vmDir: vmDir, storage: actualLocation) } return vm } catch { Logger.error( "Failed to get VM", metadata: [ "vmName": normalizedName, "storage": storage ?? "default", "error": error.localizedDescription, ]) // Re-throw the original error to preserve its type throw error } } @MainActor public func create( name: String, os: String, diskSize: UInt64, cpuCount: Int, memorySize: UInt64, display: String, ipsw: String?, storage: String? = nil ) async throws { Logger.info( "Creating VM", metadata: [ "name": name, "os": os, "location": storage ?? "default", "disk_size": "\(diskSize / 1024 / 1024)MB", "cpu_count": "\(cpuCount)", "memory_size": "\(memorySize / 1024 / 1024)MB", "display": display, "ipsw": ipsw ?? "none", ]) do { try validateCreateParameters(name: name, os: os, ipsw: ipsw, storage: storage) let vm = try await createTempVMConfig( os: os, cpuCount: cpuCount, memorySize: memorySize, diskSize: diskSize, display: display ) try await vm.setup( ipswPath: ipsw ?? 
"none", cpuCount: cpuCount, memorySize: memorySize, diskSize: diskSize, display: display ) try vm.finalize(to: name, home: home, storage: storage) Logger.info("VM created successfully", metadata: ["name": name]) } catch { Logger.error("Failed to create VM", metadata: ["error": error.localizedDescription]) throw error } } @MainActor public func delete(name: String, storage: String? = nil) async throws { let normalizedName = normalizeVMName(name: name) Logger.info( "Deleting VM", metadata: [ "name": normalizedName, "location": storage ?? "default", ]) do { let vmDir: VMDirectory // Check if storage is a direct path if let storagePath = storage, storagePath.contains("/") || storagePath.contains("\\") { // Storage is a direct path vmDir = try home.getVMDirectoryFromPath(normalizedName, storagePath: storagePath) guard vmDir.initialized() else { // Throw a specific error if the directory exists but isn't a valid VM if vmDir.exists() { throw VMError.notInitialized(normalizedName) } else { throw VMError.notFound(normalizedName) } } } else { // Storage is nil or a named location let actualLocation = try self.validateVMExists(normalizedName, storage: storage) vmDir = try home.getVMDirectory(normalizedName, storage: actualLocation) } // Stop VM if it's running if SharedVM.shared.getVM(name: normalizedName) != nil { try await stopVM(name: normalizedName) } try vmDir.delete() Logger.info("VM deleted successfully", metadata: ["name": normalizedName]) } catch { Logger.error("Failed to delete VM", metadata: ["error": error.localizedDescription]) throw error } } // MARK: - VM Operations @MainActor public func updateSettings( name: String, cpu: Int? = nil, memory: UInt64? = nil, diskSize: UInt64? = nil, display: String? = nil, storage: String? = nil ) throws { let normalizedName = normalizeVMName(name: name) Logger.info( "Updating VM settings", metadata: [ "name": normalizedName, "location": storage ?? "default", "cpu": cpu.map { "\($0)" } ?? 
"unchanged", "memory": memory.map { "\($0 / 1024 / 1024)MB" } ?? "unchanged", "disk_size": diskSize.map { "\($0 / 1024 / 1024)MB" } ?? "unchanged", "display": display ?? "unchanged", ]) do { // Find the actual location of the VM let actualLocation = try self.validateVMExists( normalizedName, storage: storage) let vm = try get(name: normalizedName, storage: actualLocation) // Apply settings in order if let cpu = cpu { try vm.setCpuCount(cpu) } if let memory = memory { try vm.setMemorySize(memory) } if let diskSize = diskSize { try vm.setDiskSize(diskSize) } if let display = display { try vm.setDisplay(display) } Logger.info("VM settings updated successfully", metadata: ["name": normalizedName]) } catch { Logger.error( "Failed to update VM settings", metadata: ["error": error.localizedDescription]) throw error } } @MainActor public func stopVM(name: String, storage: String? = nil) async throws { let normalizedName = normalizeVMName(name: name) Logger.info("Stopping VM", metadata: ["name": normalizedName]) do { // Find the actual location of the VM let actualLocation = try self.validateVMExists( normalizedName, storage: storage) // Try to get VM from cache first let vm: VM if let cachedVM = SharedVM.shared.getVM(name: normalizedName) { vm = cachedVM } else { vm = try get(name: normalizedName, storage: actualLocation) } try await vm.stop() // Remove VM from cache after stopping SharedVM.shared.removeVM(name: normalizedName) Logger.info("VM stopped successfully", metadata: ["name": normalizedName]) } catch { // Clean up cache even if stop fails SharedVM.shared.removeVM(name: normalizedName) Logger.error("Failed to stop VM", metadata: ["error": error.localizedDescription]) throw error } } @MainActor public func runVM( name: String, noDisplay: Bool = false, sharedDirectories: [SharedDirectory] = [], mount: Path? = nil, registry: String = "ghcr.io", organization: String = "trycua", vncPort: Int = 0, recoveryMode: Bool = false, storage: String? 
= nil, usbMassStoragePaths: [Path]? = nil ) async throws { let normalizedName = normalizeVMName(name: name) Logger.info( "Running VM", metadata: [ "name": normalizedName, "no_display": "\(noDisplay)", "shared_directories": "\(sharedDirectories.map( { $0.string } ).joined(separator: ", "))", "mount": mount?.path ?? "none", "vnc_port": "\(vncPort)", "recovery_mode": "\(recoveryMode)", "storage_param": storage ?? "default", // Log the original param "usb_storage_devices": "\(usbMassStoragePaths?.count ?? 0)", ]) do { // Check if name is an image ref to auto-pull let components = normalizedName.split(separator: ":") if components.count == 2 { // Check if it looks like image:tag // Attempt to validate if VM exists first, suppressing the error // This avoids pulling if the VM already exists, even if name looks like an image ref let vmExists = (try? self.validateVMExists(normalizedName, storage: storage)) != nil if !vmExists { Logger.info( "VM not found, attempting to pull image based on name", metadata: ["imageRef": normalizedName]) // Use the potentially new VM name derived from the image ref let potentialVMName = String(components[0]) try await pullImage( image: normalizedName, // Full image ref name: potentialVMName, // Name derived from image registry: registry, organization: organization, storage: storage ) // Important: After pull, the effective name might have changed // We proceed assuming the user wants to run the VM derived from image name // normalizedName = potentialVMName // Re-assign normalizedName if pull logic creates it // Note: Current pullImage doesn't return the final VM name, // so we assume it matches the name derived from the image. // This might need refinement if pullImage behaviour changes. } } // Determine effective storage path or name AND get the VMDirectory let effectiveStorage: String? 
let vmDir: VMDirectory if let storagePath = storage, storagePath.contains("/") || storagePath.contains("\\") { // Storage is a direct path vmDir = try home.getVMDirectoryFromPath(normalizedName, storagePath: storagePath) guard vmDir.initialized() else { if vmDir.exists() { throw VMError.notInitialized(normalizedName) } else { throw VMError.notFound(normalizedName) } } effectiveStorage = storagePath // Use the path string Logger.info("Using direct storage path", metadata: ["path": storagePath]) } else { // Storage is nil or a named location - validate and get the actual name let actualLocationName = try validateVMExists(normalizedName, storage: storage) vmDir = try home.getVMDirectory(normalizedName, storage: actualLocationName) // Get VMDir for named location effectiveStorage = actualLocationName // Use the named location string Logger.info( "Using named storage location", metadata: [ "requested": storage ?? "default", "actual": actualLocationName ?? "default", ]) } // Validate parameters using the located VMDirectory try validateRunParameters( vmDir: vmDir, // Pass vmDir sharedDirectories: sharedDirectories, mount: mount, usbMassStoragePaths: usbMassStoragePaths ) // Load the VM directly using the located VMDirectory and storage context let vm = try self.loadVM(vmDir: vmDir, storage: effectiveStorage) SharedVM.shared.setVM(name: normalizedName, vm: vm) try await vm.run( noDisplay: noDisplay, sharedDirectories: sharedDirectories, mount: mount, vncPort: vncPort, recoveryMode: recoveryMode, usbMassStoragePaths: usbMassStoragePaths) Logger.info("VM started successfully", metadata: ["name": normalizedName]) } catch { SharedVM.shared.removeVM(name: normalizedName) Logger.error("Failed to run VM", metadata: ["error": error.localizedDescription]) throw error } } // MARK: - Image Management @MainActor public func getLatestIPSWURL() async throws -> URL { Logger.info("Fetching latest supported IPSW URL") do { let imageLoader = DarwinImageLoader() let url = try await 
imageLoader.fetchLatestSupportedURL() Logger.info("Found latest IPSW URL", metadata: ["url": url.absoluteString]) return url } catch { Logger.error( "Failed to fetch IPSW URL", metadata: ["error": error.localizedDescription]) throw error } } @MainActor public func pullImage( image: String, name: String?, registry: String, organization: String, storage: String? = nil ) async throws { do { // Convert non-sparse image to sparse version if needed var actualImage = image var actualName = name // Split the image to get name and tag for both sparse and non-sparse cases let components = image.split(separator: ":") guard components.count == 2 else { throw ValidationError("Invalid image format. Expected format: name:tag") } let originalName = String(components[0]) let tag = String(components[1]) // For consistent VM naming, strip "-sparse" suffix if present when no name provided let normalizedBaseName: String if originalName.hasSuffix("-sparse") { normalizedBaseName = String(originalName.dropLast(7)) // drop "-sparse" } else { normalizedBaseName = originalName } // Set default VM name if not provided if actualName == nil { actualName = "\(normalizedBaseName)_\(tag)" } // Convert non-sparse image to sparse version if needed if !image.contains("-sparse") { // Create sparse version of the image name actualImage = "\(originalName)-sparse:\(tag)" Logger.info( "Converting to sparse image", metadata: [ "original": image, "sparse": actualImage, "vm_name": actualName ?? "default", ] ) } let vmName = actualName ?? "default" // Just use actualName as it's already normalized Logger.info( "Pulling image", metadata: [ "image": actualImage, "name": vmName, "registry": registry, "organization": organization, "location": storage ?? 
"default", ]) try self.validatePullParameters( image: actualImage, name: vmName, registry: registry, organization: organization, storage: storage ) let imageContainerRegistry = ImageContainerRegistry( registry: registry, organization: organization) let _ = try await imageContainerRegistry.pull( image: actualImage, name: vmName, locationName: storage) Logger.info( "Setting new VM mac address", metadata: [ "vm_name": vmName, "location": storage ?? "default", ]) // Update MAC address in the cloned VM to ensure uniqueness let vm = try get(name: vmName, storage: storage) try vm.setMacAddress(VZMACAddress.randomLocallyAdministered().string) Logger.info( "Image pulled successfully", metadata: [ "image": actualImage, "name": vmName, "registry": registry, "organization": organization, "location": storage ?? "default", ]) } catch { Logger.error("Failed to pull image", metadata: ["error": error.localizedDescription]) throw error } } @MainActor public func pushImage( name: String, imageName: String, tags: [String], registry: String, organization: String, storage: String? = nil, chunkSizeMb: Int = 512, verbose: Bool = false, dryRun: Bool = false, reassemble: Bool = false ) async throws { do { Logger.info( "Pushing VM to registry", metadata: [ "name": name, "imageName": imageName, "tags": "\(tags.joined(separator: ", "))", "registry": registry, "organization": organization, "location": storage ?? 
"default", "chunk_size": "\(chunkSizeMb)MB", "dry_run": "\(dryRun)", "reassemble": "\(reassemble)", ]) try validatePushParameters( name: name, imageName: imageName, tags: tags, registry: registry, organization: organization ) // Find the actual location of the VM let actualLocation = try self.validateVMExists(name, storage: storage) // Get the VM directory let vmDir = try home.getVMDirectory(name, storage: actualLocation) // Use ImageContainerRegistry to push the VM let imageContainerRegistry = ImageContainerRegistry( registry: registry, organization: organization) try await imageContainerRegistry.push( vmDirPath: vmDir.dir.path, imageName: imageName, tags: tags, chunkSizeMb: chunkSizeMb, verbose: verbose, dryRun: dryRun, reassemble: reassemble ) Logger.info( "VM pushed successfully", metadata: [ "name": name, "imageName": imageName, "tags": "\(tags.joined(separator: ", "))", "registry": registry, "organization": organization, ]) } catch { Logger.error("Failed to push VM", metadata: ["error": error.localizedDescription]) throw error } } @MainActor public func pruneImages() async throws { Logger.info("Pruning cached images") do { // Use configured cache directory let cacheDir = (SettingsManager.shared.getCacheDirectory() as NSString) .expandingTildeInPath let ghcrDir = URL(fileURLWithPath: cacheDir).appendingPathComponent("ghcr") if FileManager.default.fileExists(atPath: ghcrDir.path) { try FileManager.default.removeItem(at: ghcrDir) try FileManager.default.createDirectory( at: ghcrDir, withIntermediateDirectories: true) Logger.info("Successfully removed cached images") } else { Logger.info("No cached images found") } } catch { Logger.error("Failed to prune images", metadata: ["error": error.localizedDescription]) throw error } } public struct ImageInfo: Codable { public let repository: String public let imageId: String // This will be the shortened manifest ID } public struct ImageList: Codable { public let local: [ImageInfo] public let remote: [String] // Keep 
this for future remote registry support } @MainActor public func getImages(organization: String = "trycua") async throws -> ImageList { Logger.info("Listing local images", metadata: ["organization": organization]) let imageContainerRegistry = ImageContainerRegistry( registry: "ghcr.io", organization: organization) let cachedImages = try await imageContainerRegistry.getImages() let imageInfos = cachedImages.map { image in ImageInfo( repository: image.repository, imageId: String(image.manifestId.prefix(12)) ) } ImagesPrinter.print(images: imageInfos.map { "\($0.repository):\($0.imageId)" }) return ImageList(local: imageInfos, remote: []) } // MARK: - Settings Management public func getSettings() -> LumeSettings { return SettingsManager.shared.getSettings() } public func setHomeDirectory(_ path: String) throws { // Try to set the home directory in settings try SettingsManager.shared.setHomeDirectory(path: path) // Force recreate home instance to use the new path try home.validateHomeDirectory() Logger.info("Home directory updated", metadata: ["path": path]) } // MARK: - VM Location Management public func addLocation(name: String, path: String) throws { Logger.info("Adding VM location", metadata: ["name": name, "path": path]) try home.addLocation(name: name, path: path) Logger.info("VM location added successfully", metadata: ["name": name]) } public func removeLocation(name: String) throws { Logger.info("Removing VM location", metadata: ["name": name]) try home.removeLocation(name: name) Logger.info("VM location removed successfully", metadata: ["name": name]) } public func setDefaultLocation(name: String) throws { Logger.info("Setting default VM location", metadata: ["name": name]) try home.setDefaultLocation(name: name) Logger.info("Default VM location set successfully", metadata: ["name": name]) } public func getLocations() -> [VMLocation] { return home.getLocations() } // MARK: - Cache Directory Management public func setCacheDirectory(path: String) throws { 
Logger.info("Setting cache directory", metadata: ["path": path]) try SettingsManager.shared.setCacheDirectory(path: path) Logger.info("Cache directory updated", metadata: ["path": path]) } public func getCacheDirectory() -> String { return SettingsManager.shared.getCacheDirectory() } public func isCachingEnabled() -> Bool { return SettingsManager.shared.isCachingEnabled() } public func setCachingEnabled(_ enabled: Bool) throws { Logger.info("Setting caching enabled", metadata: ["enabled": "\(enabled)"]) try SettingsManager.shared.setCachingEnabled(enabled) Logger.info("Caching setting updated", metadata: ["enabled": "\(enabled)"]) } // MARK: - Private Helper Methods /// Normalizes a VM name by replacing colons with underscores private func normalizeVMName(name: String) -> String { let components = name.split(separator: ":") return components.count == 2 ? "\(components[0])_\(components[1])" : name } @MainActor private func createTempVMConfig( os: String, cpuCount: Int, memorySize: UInt64, diskSize: UInt64, display: String ) async throws -> VM { let config = try VMConfig( os: os, cpuCount: cpuCount, memorySize: memorySize, diskSize: diskSize, macAddress: VZMACAddress.randomLocallyAdministered().string, display: display ) let vmDirContext = VMDirContext( dir: try home.createTempVMDirectory(), config: config, home: home, storage: nil ) let imageLoader = os.lowercased() == "macos" ? imageLoaderFactory.createImageLoader() : nil return try vmFactory.createVM(vmDirContext: vmDirContext, imageLoader: imageLoader) } @MainActor private func loadVM(vmDir: VMDirectory, storage: String?) 
throws -> VM {  // vmDir is now passed directly
    guard vmDir.initialized() else {
        throw VMError.notInitialized(vmDir.name)  // Use name from vmDir
    }

    let config: VMConfig = try vmDir.loadConfig()

    // Pass the provided storage (which could be a path or named location)
    let vmDirContext = VMDirContext(
        dir: vmDir,
        config: config,
        home: home,
        storage: storage
    )

    // Only macOS guests get an image loader; other OS types boot without one.
    let imageLoader =
        config.os.lowercased() == "macos" ? imageLoaderFactory.createImageLoader() : nil
    return try vmFactory.createVM(vmDirContext: vmDirContext, imageLoader: imageLoader)
}

// MARK: - Validation Methods

/// Validates the parameters for creating a new VM.
///
/// - macOS VMs require an IPSW path, which must either be the sentinel
///   string "latest" or point to an existing file.
/// - Linux VMs must not supply an IPSW path.
/// - Any other OS string is rejected.
/// - Throws `VMError.alreadyExists` if a VM directory with this name
///   already exists in the target storage location.
private func validateCreateParameters(
    name: String, os: String, ipsw: String?, storage: String?
) throws {
    // Normalize once instead of calling lowercased() per branch.
    switch os.lowercased() {
    case "macos":
        guard let ipsw = ipsw else {
            throw ValidationError("IPSW path required for macOS VM")
        }
        // "latest" is a sentinel resolved later; any other value must be a real file.
        if ipsw != "latest" && !FileManager.default.fileExists(atPath: ipsw) {
            throw ValidationError("IPSW file not found")
        }
    case "linux":
        if ipsw != nil {
            throw ValidationError("IPSW path not supported for Linux VM")
        }
    default:
        throw ValidationError("Unsupported OS type: \(os)")
    }

    let vmDir: VMDirectory = try home.getVMDirectory(name, storage: storage)
    if vmDir.exists() {
        throw VMError.alreadyExists(name)
    }
}

/// Ensures every shared directory's host path exists and is a directory.
/// Throws `ValidationError` on the first path that is missing or not a directory.
private func validateSharedDirectories(_ directories: [SharedDirectory]) throws {
    for dir in directories {
        var isDirectory: ObjCBool = false
        guard
            FileManager.default.fileExists(atPath: dir.hostPath, isDirectory: &isDirectory),
            isDirectory.boolValue
        else {
            throw ValidationError(
                "Host path does not exist or is not a directory: \(dir.hostPath)")
        }
    }
}

/// Verifies that a VM named `name` exists and is initialized.
///
/// - If `storage` is given, only that location is checked. A value containing
///   a path separator is treated as a direct storage path; otherwise it is
///   treated as a named storage location.
/// - If `storage` is nil, all known locations are searched.
/// - Returns: the location identifier the VM was found in (the path or the
///   storage name), or nil per the declared optional return type.
/// - Throws: `VMError.notFound` if the VM does not exist anywhere checked.
public func validateVMExists(_ name: String, storage: String? = nil) throws -> String? {
    // If a location is specified, only check that location.
    if let storage = storage {
        // Both branches previously duplicated the initialized-guard and the
        // return; resolve the directory first, then validate once.
        let vmDir: VMDirectory
        if storage.contains("/") || storage.contains("\\") {
            // Treat as a direct path.
            vmDir = try home.getVMDirectoryFromPath(name, storagePath: storage)
        } else {
            // Treat as a named storage location.
            vmDir = try home.getVMDirectory(name, storage: storage)
        }
        guard vmDir.initialized() else {
            throw VMError.notFound(name)
        }
        // Return the path or storage name as the location identifier.
        return storage
    }

    // No location specified: search every known location for the VM.
    let allVMs = try home.getAllVMDirectories()
    if let foundVM = allVMs.first(where: { $0.directory.name == name }) {
        return foundVM.locationName
    }

    // VM not found in any location.
    throw VMError.notFound(name)
}

/// Validates the parameters for running a VM.
///
/// VM existence is already confirmed by the caller having `vmDir`.
/// Checks shared directories, USB mass-storage image paths (warning when the
/// host OS is too old to attach them), and OS-specific mount constraints.
private func validateRunParameters(
    vmDir: VMDirectory,  // Changed signature: accept VMDirectory
    sharedDirectories: [SharedDirectory]?,
    mount: Path?,
    usbMassStoragePaths: [Path]? = nil
) throws {
    if let dirs = sharedDirectories {
        try self.validateSharedDirectories(dirs)
    }

    // Validate USB mass storage paths.
    if let usbPaths = usbMassStoragePaths {
        for path in usbPaths {
            if !FileManager.default.fileExists(atPath: path.path) {
                throw ValidationError("USB mass storage image not found: \(path.path)")
            }
        }
        if #available(macOS 15.0, *) {
            // USB mass storage is supported on this OS version; nothing to do.
        } else {
            // Older hosts silently drop USB devices later, so warn up front.
            Logger.info(
                "USB mass storage devices require macOS 15.0 or later. They will be ignored.")
        }
    }

    // Load config directly from vmDir to apply OS-specific mount rules.
    let vmConfig = try vmDir.loadConfig()
    switch vmConfig.os.lowercased() {
    case "macos":
        if mount != nil {
            throw ValidationError(
                "Mounting disk images is not supported for macOS VMs. If you are looking to mount a IPSW, please use the --ipsw option in the create command."
            )
        }
    case "linux":
        if let mount = mount, !FileManager.default.fileExists(atPath: mount.path) {
            throw ValidationError("Mount file not found: \(mount.path)")
        }
    default:
        break
    }
}

/// Validates the parameters for pulling a VM image from a registry.
///
/// Rejects empty image/name/registry/organization values, creates the storage
/// directory when a direct path is supplied and missing, and throws
/// `VMError.alreadyExists` if the destination VM directory already exists.
private func validatePullParameters(
    image: String,
    name: String,
    registry: String,
    organization: String,
    storage: String? = nil
) throws {
    guard !image.isEmpty else {
        throw ValidationError("Image name cannot be empty")
    }
    guard !name.isEmpty else {
        throw ValidationError("VM name cannot be empty")
    }
    guard !registry.isEmpty else {
        throw ValidationError("Registry cannot be empty")
    }
    guard !organization.isEmpty else {
        throw ValidationError("Organization cannot be empty")
    }

    // Determine if storage is a direct path or a named storage location.
    let vmDir: VMDirectory
    if let storage = storage, storage.contains("/") || storage.contains("\\") {
        // Create the base directory if it doesn't exist.
        if !FileManager.default.fileExists(atPath: storage) {
            Logger.info("Creating VM storage directory", metadata: ["path": storage])
            do {
                try FileManager.default.createDirectory(
                    atPath: storage,
                    withIntermediateDirectories: true
                )
            } catch {
                throw HomeError.directoryCreationFailed(path: storage)
            }
        }
        // Use getVMDirectoryFromPath for direct paths.
        vmDir = try home.getVMDirectoryFromPath(name, storagePath: storage)
    } else {
        // Use getVMDirectory for named storage locations.
        vmDir = try home.getVMDirectory(name, storage: storage)
    }

    if vmDir.exists() {
        throw VMError.alreadyExists(name)
    }
}

/// Validates the parameters for pushing a VM image to a registry.
///
/// Rejects empty name/image/registry/organization values and an empty tag
/// list, then confirms the VM exists (throws `VMError.notFound` otherwise).
private func validatePushParameters(
    name: String,
    imageName: String,
    tags: [String],
    registry: String,
    organization: String
) throws {
    guard !name.isEmpty else {
        throw ValidationError("VM name cannot be empty")
    }
    guard !imageName.isEmpty else {
        throw ValidationError("Image name cannot be empty")
    }
    guard !tags.isEmpty else {
        throw ValidationError("At least one tag must be provided.")
    }
    guard !registry.isEmpty else {
        throw ValidationError("Registry cannot be empty")
    }
    guard !organization.isEmpty else {
        throw ValidationError("Organization cannot be empty")
    }

    // Verify VM exists (this will throw if not found).
    _ = try self.validateVMExists(name)
}
}
```