This is page 12 of 16. Use http://codebase.md/trycua/cua?page={x} to view the full context.

# Directory Structure

```
├── .all-contributorsrc
├── .cursorignore
├── .devcontainer
│   ├── devcontainer.json
│   ├── post-install.sh
│   └── README.md
├── .dockerignore
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── ci-lume.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-pylume.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       └── test-validation-script.yml
├── .gitignore
├── .vscode
│   ├── docs.code-workspace
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── composite-agents.md
│   ├── cua-hackathon.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .gitignore
│   ├── .prettierrc
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   └── meta.json
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── meta.json
│   │       │   └── sandboxed-python.mdx
│   │       ├── index.mdx
│   │       ├── libraries
│   │       │   ├── agent
│   │       │   │   └── index.mdx
│   │       │   ├── computer
│   │       │   │   └── index.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── core
│   │       │   │   └── index.mdx
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   └── som
│   │       │       ├── configuration.mdx
│   │       │       └── index.mdx
│   │       ├── meta.json
│   │       ├── quickstart-cli.mdx
│   │       ├── quickstart-devs.mdx
│   │       └── telemetry.mdx
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   └── llms.txt
│   │   │       └── route.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── iou.tsx
│   │   │   └── mermaid.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   └── mdx-components.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── .prettierrc
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   └── uitars.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   └── test_connection.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── scripts
│   │   │       ├── install_mcp_server.sh
│   │   │       └── start_mcp_server.sh
│   │   ├── pylume
│   │   │   ├── __init__.py
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── pylume
│   │   │   │   ├── __init__.py
│   │   │   │   ├── client.py
│   │   │   │   ├── exceptions.py
│   │   │   │   ├── lume
│   │   │   │   ├── models.py
│   │   │   │   ├── pylume.py
│   │   │   │   └── server.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           └── test_omniparser.py
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── biome.json
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Dockerfile
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── pylume_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── pdm.lock
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── samples
│   └── community
│       ├── global-online
│       │   └── README.md
│       └── hack-the-north
│           └── README.md
├── scripts
│   ├── build-uv.sh
│   ├── build.ps1
│   ├── build.sh
│   ├── cleanup.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   └── run-docker-dev.sh
└── tests
    ├── pytest.ini
    ├── shell_cmd.py
    ├── test_files.py
    ├── test_mcp_server_session_management.py
    ├── test_mcp_server_streaming.py
    ├── test_shell_bash.py
    ├── test_telemetry.py
    ├── test_venv.py
    └── test_watchdog.py
```

# Files

--------------------------------------------------------------------------------
/docs/content/docs/libraries/lume/http-api.mdx:
--------------------------------------------------------------------------------

```markdown
---
title: HTTP Server API
description: Lume exposes a local HTTP API server that listens at localhost for programmatic management of VMs.
---

import { Tabs, Tab } from 'fumadocs-ui/components/tabs';
import { Callout } from 'fumadocs-ui/components/callout';

## Default URL

```
http://localhost:7777
```

<Callout type="info">
  The HTTP API service runs on port `7777` by default. If you'd like to use a
  different port, pass the `--port` option during installation or when running
  `lume serve`.
</Callout>
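All of the endpoints below accept and return JSON, so they are easy to script against. A quick reachability check from Python (a minimal sketch that calls the List VMs endpoint documented below; adjust the base URL if you changed the port):

```python
import requests

BASE_URL = "http://localhost:7777"  # change if you passed --port to lume serve

# List VMs (GET /lume/vms) has no side effects, so it doubles as a health probe.
r = requests.get(f"{BASE_URL}/lume/vms", timeout=50)
r.raise_for_status()
print(r.json())
```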
## Endpoints

---

### Create VM

Create a new virtual machine.

`POST: /lume/vms`

#### Parameters

| Name     | Type    | Required | Description                          |
| -------- | ------- | -------- | ------------------------------------ |
| name     | string  | Yes      | Name of the VM                       |
| os       | string  | Yes      | Guest OS (`macOS`, `linux`, etc.)    |
| cpu      | integer | Yes      | Number of CPU cores                  |
| memory   | string  | Yes      | Memory size (e.g. `4GB`)             |
| diskSize | string  | Yes      | Disk size (e.g. `64GB`)              |
| display  | string  | No       | Display resolution (e.g. `1024x768`) |
| ipsw     | string  | No       | IPSW version (e.g. `latest`)         |
| storage  | string  | No       | Storage type (`ssd`, etc.)           |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "lume_vm",
    "os": "macOS",
    "cpu": 2,
    "memory": "4GB",
    "diskSize": "64GB",
    "display": "1024x768",
    "ipsw": "latest",
    "storage": "ssd"
  }' \
  http://localhost:7777/lume/vms
```
</Tab>
<Tab value="Python">
```python
import requests

payload = {
    "name": "lume_vm",
    "os": "macOS",
    "cpu": 2,
    "memory": "4GB",
    "diskSize": "64GB",
    "display": "1024x768",
    "ipsw": "latest",
    "storage": "ssd"
}
r = requests.post("http://localhost:7777/lume/vms", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const payload = {
  name: 'lume_vm',
  os: 'macOS',
  cpu: 2,
  memory: '4GB',
  diskSize: '64GB',
  display: '1024x768',
  ipsw: 'latest',
  storage: 'ssd',
};

const res = await fetch('http://localhost:7777/lume/vms', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>

---

### Run VM

Run a virtual machine instance.

`POST: /lume/vms/:name/run`

#### Parameters

| Name              | Type            | Required | Description                                         |
| ----------------- | --------------- | -------- | --------------------------------------------------- |
| noDisplay         | boolean         | No       | If true, do not start VNC client                    |
| sharedDirectories | array of object | No       | List of shared directories (`hostPath`, `readOnly`) |
| recoveryMode      | boolean         | No       | Start in recovery mode                              |
| storage           | string          | No       | Storage type (`ssd`, etc.)                          |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
# Basic run
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  http://localhost:7777/lume/vms/my-vm-name/run

# Run with VNC client started and shared directory
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "noDisplay": false,
    "sharedDirectories": [
      {
        "hostPath": "~/Projects",
        "readOnly": false
      }
    ],
    "recoveryMode": false,
    "storage": "ssd"
  }' \
  http://localhost:7777/lume/vms/lume_vm/run
```
</Tab>
<Tab value="Python">
```python
import requests

# Basic run
r = requests.post("http://localhost:7777/lume/vms/my-vm-name/run", timeout=50)
print(r.json())

# With VNC and shared directory
payload = {
    "noDisplay": False,
    "sharedDirectories": [
        {"hostPath": "~/Projects", "readOnly": False}
    ],
    "recoveryMode": False,
    "storage": "ssd"
}
r = requests.post("http://localhost:7777/lume/vms/lume_vm/run", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
// Basic run
let res = await fetch('http://localhost:7777/lume/vms/my-vm-name/run', {
  method: 'POST',
});
console.log(await res.json());

// With VNC and shared directory
const payload = {
  noDisplay: false,
  sharedDirectories: [{ hostPath: '~/Projects', readOnly: false }],
  recoveryMode: false,
  storage: 'ssd',
};
res = await fetch('http://localhost:7777/lume/vms/lume_vm/run', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>

---

### List VMs

List all virtual machines.

`GET: /lume/vms`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  http://localhost:7777/lume/vms
```
</Tab>
<Tab value="Python">
```python
import requests

r = requests.get("http://localhost:7777/lume/vms", timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/vms');
console.log(await res.json());
```
</Tab>
</Tabs>

```json
[
  {
    "name": "my-vm",
    "state": "stopped",
    "os": "macOS",
    "cpu": 2,
    "memory": "4GB",
    "diskSize": "64GB"
  },
  {
    "name": "my-vm-2",
    "state": "stopped",
    "os": "linux",
    "cpu": 2,
    "memory": "4GB",
    "diskSize": "64GB"
  }
]
```
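Because the response is a plain JSON array, it composes naturally with the other endpoints. For example, a small sketch that stops every VM not already in the `stopped` state (using the Stop VM endpoint documented below):

```python
import requests

BASE_URL = "http://localhost:7777"

for vm in requests.get(f"{BASE_URL}/lume/vms", timeout=50).json():
    if vm["state"] != "stopped":
        # Stop VM: POST /lume/vms/:name/stop
        requests.post(f"{BASE_URL}/lume/vms/{vm['name']}/stop", timeout=50)
        print(f"Stopped {vm['name']}")
```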
---

### Get VM Details

Get details for a specific virtual machine.

`GET: /lume/vms/:name`

#### Parameters

| Name    | Type   | Required | Description                |
| ------- | ------ | -------- | -------------------------- |
| storage | string | No       | Storage type (`ssd`, etc.) |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
# Basic get
curl --connect-timeout 6000 \
  --max-time 5000 \
  http://localhost:7777/lume/vms/lume_vm

# Get with specific storage
curl --connect-timeout 6000 \
  --max-time 5000 \
  http://localhost:7777/lume/vms/lume_vm?storage=ssd
```
</Tab>
<Tab value="Python">
```python
import requests

# Basic get
details = requests.get("http://localhost:7777/lume/vms/lume_vm", timeout=50)
print(details.json())

# Get with specific storage
details = requests.get("http://localhost:7777/lume/vms/lume_vm", params={"storage": "ssd"}, timeout=50)
print(details.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
// Basic get
let res = await fetch('http://localhost:7777/lume/vms/lume_vm');
console.log(await res.json());

// Get with specific storage
res = await fetch('http://localhost:7777/lume/vms/lume_vm?storage=ssd');
console.log(await res.json());
```
</Tab>
</Tabs>

```json
{
  "name": "lume_vm",
  "state": "stopped",
  "os": "macOS",
  "cpu": 2,
  "memory": "4GB",
  "diskSize": "64GB",
  "display": "1024x768",
  "ipAddress": "192.168.65.2",
  "vncPort": 5900,
  "sharedDirectories": [
    {
      "hostPath": "~/Projects",
      "readOnly": false,
      "tag": "com.apple.virtio-fs.automount"
    }
  ]
}
```

---

### Update VM Configuration

Update the configuration of a virtual machine.

`PATCH: /lume/vms/:name`

#### Parameters

| Name     | Type    | Required | Description                           |
| -------- | ------- | -------- | ------------------------------------- |
| cpu      | integer | No       | Number of CPU cores                   |
| memory   | string  | No       | Memory size (e.g. `8GB`)              |
| diskSize | string  | No       | Disk size (e.g. `100GB`)              |
| display  | string  | No       | Display resolution (e.g. `1920x1080`) |
| storage  | string  | No       | Storage type (`ssd`, etc.)            |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X PATCH \
  -H "Content-Type: application/json" \
  -d '{
    "cpu": 4,
    "memory": "8GB",
    "diskSize": "100GB",
    "display": "1920x1080",
    "storage": "ssd"
  }' \
  http://localhost:7777/lume/vms/lume_vm
```
</Tab>
<Tab value="Python">
```python
import requests

payload = {
    "cpu": 4,
    "memory": "8GB",
    "diskSize": "100GB",
    "display": "1920x1080",
    "storage": "ssd"
}
r = requests.patch("http://localhost:7777/lume/vms/lume_vm", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const payload = {
  cpu: 4,
  memory: '8GB',
  diskSize: '100GB',
  display: '1920x1080',
  storage: 'ssd',
};

const res = await fetch('http://localhost:7777/lume/vms/lume_vm', {
  method: 'PATCH',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>

---

### Stop VM

Stop a running virtual machine.

`POST: /lume/vms/:name/stop`

#### Parameters

| Name    | Type   | Required | Description                |
| ------- | ------ | -------- | -------------------------- |
| storage | string | No       | Storage type (`ssd`, etc.) |
#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
# Basic stop
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  http://localhost:7777/lume/vms/lume_vm/stop

# Stop with storage location specified
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  http://localhost:7777/lume/vms/lume_vm/stop?storage=ssd
```
</Tab>
<Tab value="Python">
```python
import requests

# Basic stop
r = requests.post("http://localhost:7777/lume/vms/lume_vm/stop", timeout=50)
print(r.json())

# Stop with storage location specified
r = requests.post("http://localhost:7777/lume/vms/lume_vm/stop", params={"storage": "ssd"}, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
// Basic stop
let res = await fetch('http://localhost:7777/lume/vms/lume_vm/stop', {
  method: 'POST',
});
console.log(await res.json());

// Stop with storage location specified
res = await fetch('http://localhost:7777/lume/vms/lume_vm/stop?storage=ssd', {
  method: 'POST',
});
console.log(await res.json());
```
</Tab>
</Tabs>

---

### Delete VM

Delete a virtual machine instance.

`DELETE: /lume/vms/:name`

#### Parameters

| Name    | Type   | Required | Description                |
| ------- | ------ | -------- | -------------------------- |
| storage | string | No       | Storage type (`ssd`, etc.) |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
# Basic delete
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X DELETE \
  http://localhost:7777/lume/vms/lume_vm

# Delete with specific storage
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X DELETE \
  http://localhost:7777/lume/vms/lume_vm?storage=ssd
```
</Tab>
<Tab value="Python">
```python
import requests

# Basic delete
r = requests.delete("http://localhost:7777/lume/vms/lume_vm", timeout=50)
print(r.status_code)

# Delete with specific storage
r = requests.delete("http://localhost:7777/lume/vms/lume_vm", params={"storage": "ssd"}, timeout=50)
print(r.status_code)
```
</Tab>
<Tab value="TypeScript">
```typescript
// Basic delete
let res = await fetch('http://localhost:7777/lume/vms/lume_vm', {
  method: 'DELETE',
});
console.log(res.status);

// Delete with specific storage
res = await fetch('http://localhost:7777/lume/vms/lume_vm?storage=ssd', {
  method: 'DELETE',
});
console.log(res.status);
```
</Tab>
</Tabs>

---

### Clone VM

Clone an existing virtual machine.
`POST: /lume/vms/clone`

#### Parameters

| Name           | Type   | Required | Description                         |
| -------------- | ------ | -------- | ----------------------------------- |
| name           | string | Yes      | Source VM name                      |
| newName        | string | Yes      | New VM name                         |
| sourceLocation | string | No       | Source storage location (`default`) |
| destLocation   | string | No       | Destination storage location        |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "source-vm",
    "newName": "cloned-vm",
    "sourceLocation": "default",
    "destLocation": "ssd"
  }' \
  http://localhost:7777/lume/vms/clone
```
</Tab>
<Tab value="Python">
```python
import requests

payload = {
    "name": "source-vm",
    "newName": "cloned-vm",
    "sourceLocation": "default",
    "destLocation": "ssd"
}
r = requests.post("http://localhost:7777/lume/vms/clone", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const payload = {
  name: 'source-vm',
  newName: 'cloned-vm',
  sourceLocation: 'default',
  destLocation: 'ssd',
};

const res = await fetch('http://localhost:7777/lume/vms/clone', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>

---

### Pull VM Image

Pull a VM image from a registry.

`POST: /lume/pull`

#### Parameters

| Name         | Type   | Required | Description                           |
| ------------ | ------ | -------- | ------------------------------------- |
| image        | string | Yes      | Image name (e.g. `macos-sequoia-...`) |
| name         | string | No       | VM name for the pulled image          |
| registry     | string | No       | Registry host (e.g. `ghcr.io`)        |
| organization | string | No       | Organization name                     |
| storage      | string | No       | Storage type (`ssd`, etc.)            |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "image": "macos-sequoia-vanilla:latest",
    "name": "my-vm-name",
    "registry": "ghcr.io",
    "organization": "trycua",
    "storage": "ssd"
  }' \
  http://localhost:7777/lume/pull
```
</Tab>
<Tab value="Python">
```python
import requests

payload = {
    "image": "macos-sequoia-vanilla:latest",
    "name": "my-vm-name",
    "registry": "ghcr.io",
    "organization": "trycua",
    "storage": "ssd"
}
r = requests.post("http://localhost:7777/lume/pull", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const payload = {
  image: 'macos-sequoia-vanilla:latest',
  name: 'my-vm-name',
  registry: 'ghcr.io',
  organization: 'trycua',
  storage: 'ssd',
};

const res = await fetch('http://localhost:7777/lume/pull', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>

---

### Push VM Image

Push a VM to a registry as an image (asynchronous operation).

`POST: /lume/vms/push`

#### Parameters

| Name         | Type        | Required | Description                          |
| ------------ | ----------- | -------- | ------------------------------------ |
| name         | string      | Yes      | Local VM name to push                |
| imageName    | string      | Yes      | Image name in registry               |
| tags         | array       | Yes      | Image tags (e.g. `["latest", "v1"]`) |
| organization | string      | Yes      | Organization name                    |
| registry     | string      | No       | Registry host (e.g. `ghcr.io`)       |
| chunkSizeMb  | integer     | No       | Chunk size in MB for upload          |
| storage      | string/null | No       | Storage type (`ssd`, etc.)           |
#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-local-vm",
    "imageName": "my-image",
    "tags": ["latest", "v1"],
    "organization": "my-org",
    "registry": "ghcr.io",
    "chunkSizeMb": 512,
    "storage": null
  }' \
  http://localhost:7777/lume/vms/push
```
</Tab>
<Tab value="Python">
```python
import requests

payload = {
    "name": "my-local-vm",
    "imageName": "my-image",
    "tags": ["latest", "v1"],
    "organization": "my-org",
    "registry": "ghcr.io",
    "chunkSizeMb": 512,
    "storage": None
}
r = requests.post("http://localhost:7777/lume/vms/push", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const payload = {
  name: 'my-local-vm',
  imageName: 'my-image',
  tags: ['latest', 'v1'],
  organization: 'my-org',
  registry: 'ghcr.io',
  chunkSizeMb: 512,
  storage: null,
};

const res = await fetch('http://localhost:7777/lume/vms/push', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>

**Response (202 Accepted):**

```json
{
  "message": "Push initiated in background",
  "name": "my-local-vm",
  "imageName": "my-image",
  "tags": [
    "latest",
    "v1"
  ]
}
```

---

### List Images

List available VM images.

`GET: /lume/images`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  http://localhost:7777/lume/images
```
</Tab>
<Tab value="Python">
```python
import requests

r = requests.get("http://localhost:7777/lume/images", timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/images');
console.log(await res.json());
```
</Tab>
</Tabs>

```json
{
  "local": [
    "macos-sequoia-xcode:latest",
    "macos-sequoia-vanilla:latest"
  ]
}
```

---

### Prune Images

Remove unused VM images to free up disk space.

`POST: /lume/prune`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  http://localhost:7777/lume/prune
```
</Tab>
<Tab value="Python">
```python
import requests

r = requests.post("http://localhost:7777/lume/prune", timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/prune', {
  method: 'POST',
});
console.log(await res.json());
```
</Tab>
</Tabs>

---

### Get Latest IPSW URL

Get the URL for the latest macOS IPSW file.

`GET: /lume/ipsw`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  http://localhost:7777/lume/ipsw
```
</Tab>
<Tab value="Python">
```python
import requests

r = requests.get("http://localhost:7777/lume/ipsw", timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/ipsw');
console.log(await res.json());
```
</Tab>
</Tabs>

---

## Configuration Management

### Get Configuration

Get current Lume configuration settings.
`GET: /lume/config`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  http://localhost:7777/lume/config
```
</Tab>
<Tab value="Python">
```python
import requests

r = requests.get("http://localhost:7777/lume/config", timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/config');
console.log(await res.json());
```
</Tab>
</Tabs>

```json
{
  "homeDirectory": "~/.lume",
  "cacheDirectory": "~/.lume/cache",
  "cachingEnabled": true
}
```

### Update Configuration

Update Lume configuration settings.

`POST: /lume/config`

#### Parameters

| Name           | Type    | Required | Description               |
| -------------- | ------- | -------- | ------------------------- |
| homeDirectory  | string  | No       | Lume home directory path  |
| cacheDirectory | string  | No       | Cache directory path      |
| cachingEnabled | boolean | No       | Enable or disable caching |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "homeDirectory": "~/custom/lume",
    "cacheDirectory": "~/custom/lume/cache",
    "cachingEnabled": true
  }' \
  http://localhost:7777/lume/config
```
</Tab>
<Tab value="Python">
```python
import requests

payload = {
    "homeDirectory": "~/custom/lume",
    "cacheDirectory": "~/custom/lume/cache",
    "cachingEnabled": True
}
r = requests.post("http://localhost:7777/lume/config", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const payload = {
  homeDirectory: '~/custom/lume',
  cacheDirectory: '~/custom/lume/cache',
  cachingEnabled: true,
};

const res = await fetch('http://localhost:7777/lume/config', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>

---

## Storage Location Management

### Get VM Storage Locations

List all configured VM storage locations.

`GET: /lume/config/locations`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  http://localhost:7777/lume/config/locations
```
</Tab>
<Tab value="Python">
```python
import requests

r = requests.get("http://localhost:7777/lume/config/locations", timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/config/locations');
console.log(await res.json());
```
</Tab>
</Tabs>

```json
[
  {
    "name": "default",
    "path": "~/.lume/vms",
    "isDefault": true
  },
  {
    "name": "ssd",
    "path": "/Volumes/SSD/lume/vms",
    "isDefault": false
  }
]
```

### Add VM Storage Location

Add a new VM storage location.
`POST: /lume/config/locations`

#### Parameters

| Name | Type   | Required | Description                  |
| ---- | ------ | -------- | ---------------------------- |
| name | string | Yes      | Storage location name        |
| path | string | Yes      | File system path for storage |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "ssd",
    "path": "/Volumes/SSD/lume/vms"
  }' \
  http://localhost:7777/lume/config/locations
```
</Tab>
<Tab value="Python">
```python
import requests

payload = {
    "name": "ssd",
    "path": "/Volumes/SSD/lume/vms"
}
r = requests.post("http://localhost:7777/lume/config/locations", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const payload = {
  name: 'ssd',
  path: '/Volumes/SSD/lume/vms',
};

const res = await fetch('http://localhost:7777/lume/config/locations', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>

### Remove VM Storage Location

Remove a VM storage location.

`DELETE: /lume/config/locations/:name`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X DELETE \
  http://localhost:7777/lume/config/locations/ssd
```
</Tab>
<Tab value="Python">
```python
import requests

r = requests.delete("http://localhost:7777/lume/config/locations/ssd", timeout=50)
print(r.status_code)
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/config/locations/ssd', {
  method: 'DELETE',
});
console.log(res.status);
```
</Tab>
</Tabs>

### Set Default VM Storage Location

Set a storage location as the default.

`POST: /lume/config/locations/default/:name`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  http://localhost:7777/lume/config/locations/default/ssd
```
</Tab>
<Tab value="Python">
```python
import requests

r = requests.post("http://localhost:7777/lume/config/locations/default/ssd", timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/config/locations/default/ssd', {
  method: 'POST',
});
console.log(await res.json());
```
</Tab>
</Tabs>
```

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/main.py:
--------------------------------------------------------------------------------

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, HTTPException, Header
from fastapi.responses import StreamingResponse, JSONResponse
from typing import List, Dict, Any, Optional, Union, Literal, cast
import uvicorn
import logging
import asyncio
import json
import traceback
import inspect
from contextlib import redirect_stdout, redirect_stderr
from io import StringIO
from .handlers.factory import HandlerFactory
import os
import aiohttp
import hashlib
import time
import platform
from fastapi.middleware.cors import CORSMiddleware

# Authentication session TTL (in seconds). Override via env var CUA_AUTH_TTL_SECONDS. Default: 60s
Default: 60s AUTH_SESSION_TTL_SECONDS: int = int(os.environ.get("CUA_AUTH_TTL_SECONDS", "60")) try: from agent import ComputerAgent HAS_AGENT = True except ImportError: HAS_AGENT = False # Set up logging with more detail logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # Configure WebSocket with larger message size WEBSOCKET_MAX_SIZE = 1024 * 1024 * 10 # 10MB limit # Configure application with WebSocket settings app = FastAPI( title="Computer API", description="API for the Computer project", version="0.1.0", websocket_max_size=WEBSOCKET_MAX_SIZE, ) # CORS configuration origins = ["*"] app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) protocol_version = 1 try: from importlib.metadata import version package_version = version("cua-computer-server") except Exception: # Fallback for cases where package is not installed or importlib.metadata is not available try: import pkg_resources package_version = pkg_resources.get_distribution("cua-computer-server").version except Exception: package_version = "unknown" accessibility_handler, automation_handler, diorama_handler, file_handler = HandlerFactory.create_handlers() handlers = { "version": lambda: {"protocol": protocol_version, "package": package_version}, # App-Use commands "diorama_cmd": diorama_handler.diorama_cmd, # Accessibility commands "get_accessibility_tree": accessibility_handler.get_accessibility_tree, "find_element": accessibility_handler.find_element, # Shell commands "run_command": automation_handler.run_command, # File system commands "file_exists": file_handler.file_exists, "directory_exists": file_handler.directory_exists, "list_dir": file_handler.list_dir, "read_text": file_handler.read_text, "write_text": file_handler.write_text, "read_bytes": file_handler.read_bytes, "write_bytes": file_handler.write_bytes, "get_file_size": file_handler.get_file_size, "delete_file": file_handler.delete_file, "create_dir": file_handler.create_dir, "delete_dir": file_handler.delete_dir, # Mouse commands "mouse_down": automation_handler.mouse_down, "mouse_up": automation_handler.mouse_up, "left_click": automation_handler.left_click, "right_click": automation_handler.right_click, "double_click": automation_handler.double_click, "move_cursor": automation_handler.move_cursor, "drag_to": automation_handler.drag_to, "drag": automation_handler.drag, # Keyboard commands "key_down": automation_handler.key_down, "key_up": automation_handler.key_up, "type_text": automation_handler.type_text, "press_key": automation_handler.press_key, "hotkey": automation_handler.hotkey, # Scrolling actions "scroll": automation_handler.scroll, "scroll_down": automation_handler.scroll_down, "scroll_up": automation_handler.scroll_up, # Screen actions "screenshot": automation_handler.screenshot, "get_cursor_position": automation_handler.get_cursor_position, "get_screen_size": automation_handler.get_screen_size, # Clipboard actions "copy_to_clipboard": automation_handler.copy_to_clipboard, "set_clipboard": automation_handler.set_clipboard, } class AuthenticationManager: def __init__(self): self.sessions: Dict[str, Dict[str, Any]] = {} self.container_name = os.environ.get("CONTAINER_NAME") def _hash_credentials(self, container_name: str, api_key: str) -> str: """Create a hash of container name and API key for session identification""" combined = f"{container_name}:{api_key}" return hashlib.sha256(combined.encode()).hexdigest() def _is_session_valid(self, session_data: Dict[str, Any]) -> 
class AuthenticationManager:
    def __init__(self):
        self.sessions: Dict[str, Dict[str, Any]] = {}
        self.container_name = os.environ.get("CONTAINER_NAME")

    def _hash_credentials(self, container_name: str, api_key: str) -> str:
        """Create a hash of container name and API key for session identification"""
        combined = f"{container_name}:{api_key}"
        return hashlib.sha256(combined.encode()).hexdigest()

    def _is_session_valid(self, session_data: Dict[str, Any]) -> bool:
        """Check if a session is still valid based on expiration time"""
        if not session_data.get('valid', False):
            return False
        expires_at = session_data.get('expires_at', 0)
        return time.time() < expires_at

    async def auth(self, container_name: str, api_key: str) -> bool:
        """Authenticate container name and API key, using cached sessions when possible"""
        # If no CONTAINER_NAME is set, always allow access (local development)
        if not self.container_name:
            logger.info("No CONTAINER_NAME set in environment. Allowing access (local development mode)")
            return True

        # Layer 1: VM Identity Verification
        if container_name != self.container_name:
            logger.warning(f"VM name mismatch. Expected: {self.container_name}, Got: {container_name}")
            return False

        # Create hash for session lookup
        session_hash = self._hash_credentials(container_name, api_key)

        # Check if we have a valid cached session
        if session_hash in self.sessions:
            session_data = self.sessions[session_hash]
            if self._is_session_valid(session_data):
                logger.info(f"Using cached authentication for container: {container_name}")
                return session_data['valid']
            else:
                # Remove expired session
                del self.sessions[session_hash]

        # No valid cached session, authenticate with API
        logger.info(f"Authenticating with TryCUA API for container: {container_name}")
        try:
            async with aiohttp.ClientSession() as session:
                headers = {
                    "Authorization": f"Bearer {api_key}"
                }
                async with session.get(
                    f"https://www.trycua.com/api/vm/auth?container_name={container_name}",
                    headers=headers,
                ) as resp:
                    is_valid = resp.status == 200 and bool((await resp.text()).strip())

                    # Cache the result with configurable expiration
                    self.sessions[session_hash] = {
                        'valid': is_valid,
                        'expires_at': time.time() + AUTH_SESSION_TTL_SECONDS
                    }

                    if is_valid:
                        logger.info(f"Authentication successful for container: {container_name}")
                    else:
                        logger.warning(f"Authentication failed for container: {container_name}. Status: {resp.status}")

                    return is_valid
        except aiohttp.ClientError as e:
            logger.error(f"Failed to validate API key with TryCUA API: {str(e)}")
            # Cache failed result to avoid repeated requests
            self.sessions[session_hash] = {
                'valid': False,
                'expires_at': time.time() + AUTH_SESSION_TTL_SECONDS
            }
            return False
        except Exception as e:
            logger.error(f"Unexpected error during authentication: {str(e)}")
            # Cache failed result to avoid repeated requests
            self.sessions[session_hash] = {
                'valid': False,
                'expires_at': time.time() + AUTH_SESSION_TTL_SECONDS
            }
            return False
class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)


manager = ConnectionManager()
auth_manager = AuthenticationManager()


@app.get("/status")
async def status():
    sys = platform.system().lower()

    # get os type
    if "darwin" in sys or sys == "macos" or sys == "mac":
        os_type = "macos"
    elif "windows" in sys:
        os_type = "windows"
    else:
        os_type = "linux"

    # get computer-server features
    features = []
    if HAS_AGENT:
        features.append("agent")

    return {"status": "ok", "os_type": os_type, "features": features}
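# Illustrative exchange (values depend on the host; the port depends on how
# the server was launched, so treat localhost:8000 as an assumption):
#
#   GET http://localhost:8000/status
#   -> {"status": "ok", "os_type": "macos", "features": ["agent"]}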
@app.websocket("/ws", name="websocket_endpoint")
async def websocket_endpoint(websocket: WebSocket):
    global handlers

    # WebSocket message size is configured at the app or endpoint level, not on the instance
    await manager.connect(websocket)

    # Check if CONTAINER_NAME is set (indicating cloud provider)
    server_container_name = os.environ.get("CONTAINER_NAME")

    # If cloud provider, perform authentication handshake
    if server_container_name:
        try:
            logger.info(f"Cloud provider detected. CONTAINER_NAME: {server_container_name}. Waiting for authentication...")

            # Wait for authentication message
            auth_data = await websocket.receive_json()

            # Validate auth message format
            if auth_data.get("command") != "authenticate":
                await websocket.send_json({
                    "success": False,
                    "error": "First message must be authentication"
                })
                await websocket.close()
                manager.disconnect(websocket)
                return

            # Extract credentials
            client_api_key = auth_data.get("params", {}).get("api_key")
            client_container_name = auth_data.get("params", {}).get("container_name")

            # Validate credentials using AuthenticationManager
            if not client_api_key:
                await websocket.send_json({
                    "success": False,
                    "error": "API key required"
                })
                await websocket.close()
                manager.disconnect(websocket)
                return

            if not client_container_name:
                await websocket.send_json({
                    "success": False,
                    "error": "Container name required"
                })
                await websocket.close()
                manager.disconnect(websocket)
                return

            # Use AuthenticationManager for validation
            is_authenticated = await auth_manager.auth(client_container_name, client_api_key)
            if not is_authenticated:
                await websocket.send_json({
                    "success": False,
                    "error": "Authentication failed"
                })
                await websocket.close()
                manager.disconnect(websocket)
                return

            logger.info(f"Authentication successful for VM: {client_container_name}")
            await websocket.send_json({
                "success": True,
                "message": "Authentication successful"
            })

        except Exception as e:
            logger.error(f"Error during authentication handshake: {str(e)}")
            await websocket.send_json({
                "success": False,
                "error": "Authentication failed"
            })
            await websocket.close()
            manager.disconnect(websocket)
            return

    try:
        while True:
            try:
                data = await websocket.receive_json()
                command = data.get("command")
                params = data.get("params", {})

                if command not in handlers:
                    await websocket.send_json(
                        {"success": False, "error": f"Unknown command: {command}"}
                    )
                    continue

                try:
                    # Filter params to only include those accepted by the handler function
                    handler_func = handlers[command]
                    sig = inspect.signature(handler_func)
                    filtered_params = {k: v for k, v in params.items() if k in sig.parameters}

                    # Handle both sync and async functions
                    if asyncio.iscoroutinefunction(handler_func):
                        result = await handler_func(**filtered_params)
                    else:
                        # Run sync functions in thread pool to avoid blocking event loop
                        result = await asyncio.to_thread(handler_func, **filtered_params)
                    await websocket.send_json({"success": True, **result})
                except Exception as cmd_error:
                    logger.error(f"Error executing command {command}: {str(cmd_error)}")
                    logger.error(traceback.format_exc())
                    await websocket.send_json({"success": False, "error": str(cmd_error)})
            except WebSocketDisconnect:
                raise
            except json.JSONDecodeError as json_err:
                logger.error(f"JSON decode error: {str(json_err)}")
                await websocket.send_json(
                    {"success": False, "error": f"Invalid JSON: {str(json_err)}"}
                )
            except Exception as loop_error:
                logger.error(f"Error in message loop: {str(loop_error)}")
                logger.error(traceback.format_exc())
                await websocket.send_json({"success": False, "error": str(loop_error)})
    except WebSocketDisconnect:
        logger.info("Client disconnected")
        manager.disconnect(websocket)
    except Exception as e:
        logger.error(f"Fatal error in websocket connection: {str(e)}")
        logger.error(traceback.format_exc())
        try:
            await websocket.close()
        except:
            pass
        manager.disconnect(websocket)
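# A minimal client sketch for the protocol above (hypothetical; uses the
# third-party `websockets` package and assumes a local server with no
# CONTAINER_NAME set, so no authenticate handshake is required):
#
#   import asyncio, json, websockets
#
#   async def demo():
#       async with websockets.connect("ws://localhost:8000/ws") as ws:
#           await ws.send(json.dumps({"command": "version", "params": {}}))
#           print(json.loads(await ws.recv()))
#
#   asyncio.run(demo())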
@app.post("/cmd")
async def cmd_endpoint(
    request: Request,
    container_name: Optional[str] = Header(None, alias="X-Container-Name"),
    api_key: Optional[str] = Header(None, alias="X-API-Key")
):
    """
    Backup endpoint for when WebSocket connections fail.
    Accepts commands via HTTP POST with streaming response.

    Headers:
    - X-Container-Name: Container name for cloud authentication
    - X-API-Key: API key for cloud authentication

    Body:
    {
        "command": "command_name",
        "params": {...}
    }
    """
    global handlers

    # Parse request body
    try:
        body = await request.json()
        command = body.get("command")
        params = body.get("params", {})
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON body: {str(e)}")

    if not command:
        raise HTTPException(status_code=400, detail="Command is required")

    # Check if CONTAINER_NAME is set (indicating cloud provider)
    server_container_name = os.environ.get("CONTAINER_NAME")

    # If cloud provider, perform authentication
    if server_container_name:
        logger.info(f"Cloud provider detected. CONTAINER_NAME: {server_container_name}. Performing authentication...")

        # Validate required headers
        if not container_name:
            raise HTTPException(status_code=401, detail="Container name required")
        if not api_key:
            raise HTTPException(status_code=401, detail="API key required")

        # Validate with AuthenticationManager
        is_authenticated = await auth_manager.auth(container_name, api_key)
        if not is_authenticated:
            raise HTTPException(status_code=401, detail="Authentication failed")

    if command not in handlers:
        raise HTTPException(status_code=400, detail=f"Unknown command: {command}")

    async def generate_response():
        """Generate streaming response for the command execution"""
        try:
            # Filter params to only include those accepted by the handler function
            handler_func = handlers[command]
            sig = inspect.signature(handler_func)
            filtered_params = {k: v for k, v in params.items() if k in sig.parameters}

            # Handle both sync and async functions
            if asyncio.iscoroutinefunction(handler_func):
                result = await handler_func(**filtered_params)
            else:
                # Run sync functions in thread pool to avoid blocking event loop
                result = await asyncio.to_thread(handler_func, **filtered_params)

            # Stream the successful result
            response_data = {"success": True, **result}
            yield f"data: {json.dumps(response_data)}\n\n"

        except Exception as cmd_error:
            logger.error(f"Error executing command {command}: {str(cmd_error)}")
            logger.error(traceback.format_exc())

            # Stream the error result
            error_data = {"success": False, "error": str(cmd_error)}
            yield f"data: {json.dumps(error_data)}\n\n"

    return StreamingResponse(
        generate_response(),
        media_type="text/plain",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )
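# /cmd emits each result as a "data: {...}" line. A hypothetical client sketch
# (the headers are only needed against a cloud deployment, and the port is an
# assumption about how the server was launched):
#
#   import json, requests
#
#   resp = requests.post(
#       "http://localhost:8000/cmd",
#       json={"command": "screenshot", "params": {}},
#       headers={"X-Container-Name": "...", "X-API-Key": "..."},
#   )
#   for line in resp.iter_lines():
#       if line.startswith(b"data: "):
#           print(json.loads(line[len(b"data: "):]))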
@app.post("/responses")
async def agent_response_endpoint(
    request: Request,
    api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """
    Minimal proxy to run ComputerAgent for up to 2 turns.

    Security:
    - If CONTAINER_NAME is set on the server, require X-API-Key and validate
      using AuthenticationManager unless CUA_ENABLE_PUBLIC_PROXY is true.

    Body JSON:
    {
        "model": "...",                # required
        "input": "... or messages[]",  # required
        "agent_kwargs": { ... },       # optional, passed directly to ComputerAgent
        "env": { ... }                 # optional env overrides for agent
    }
    """
    if not HAS_AGENT:
        raise HTTPException(status_code=501, detail="ComputerAgent not available")

    # Authenticate via AuthenticationManager if running in cloud (CONTAINER_NAME set)
    container_name = os.environ.get("CONTAINER_NAME")
    if container_name:
        is_public = os.environ.get("CUA_ENABLE_PUBLIC_PROXY", "").lower().strip() in ["1", "true", "yes", "y", "on"]
        if not is_public:
            if not api_key:
                raise HTTPException(status_code=401, detail="Missing AGENT PROXY auth headers")
            ok = await auth_manager.auth(container_name, api_key)
            if not ok:
                raise HTTPException(status_code=401, detail="Unauthorized")

    # Parse request body
    try:
        body = await request.json()
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON body: {str(e)}")

    model = body.get("model")
    input_data = body.get("input")
    if not model or input_data is None:
        raise HTTPException(status_code=400, detail="'model' and 'input' are required")

    agent_kwargs: Dict[str, Any] = body.get("agent_kwargs") or {}
    env_overrides: Dict[str, str] = body.get("env") or {}

    # Simple env override context
    class _EnvOverride:
        def __init__(self, overrides: Dict[str, str]):
            self.overrides = overrides
            self._original: Dict[str, Optional[str]] = {}

        def __enter__(self):
            for k, v in (self.overrides or {}).items():
                self._original[k] = os.environ.get(k)
                os.environ[k] = str(v)

        def __exit__(self, exc_type, exc, tb):
            for k, old in self._original.items():
                if old is None:
                    os.environ.pop(k, None)
                else:
                    os.environ[k] = old

    # Convert input to messages
    def _to_messages(data: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
        if isinstance(data, str):
            return [{"role": "user", "content": data}]
        if isinstance(data, list):
            return data

    messages = _to_messages(input_data)

    # Define a direct computer tool that implements the AsyncComputerHandler protocol
    # and delegates to our existing automation/file/accessibility handlers.
from agent.computers import AsyncComputerHandler # runtime-checkable Protocol class DirectComputer(AsyncComputerHandler): def __init__(self): # use module-scope handler singletons created by HandlerFactory self._auto = automation_handler self._file = file_handler self._access = accessibility_handler async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]: sys = platform.system().lower() if "darwin" in sys or sys in ("macos", "mac"): return "mac" if "windows" in sys: return "windows" return "linux" async def get_dimensions(self) -> tuple[int, int]: size = await self._auto.get_screen_size() return size["width"], size["height"] async def screenshot(self) -> str: img_b64 = await self._auto.screenshot() return img_b64["image_data"] async def click(self, x: int, y: int, button: str = "left") -> None: if button == "left": await self._auto.left_click(x, y) elif button == "right": await self._auto.right_click(x, y) else: await self._auto.left_click(x, y) async def double_click(self, x: int, y: int) -> None: await self._auto.double_click(x, y) async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None: await self._auto.move_cursor(x, y) await self._auto.scroll(scroll_x, scroll_y) async def type(self, text: str) -> None: await self._auto.type_text(text) async def wait(self, ms: int = 1000) -> None: await asyncio.sleep(ms / 1000.0) async def move(self, x: int, y: int) -> None: await self._auto.move_cursor(x, y) async def keypress(self, keys: Union[List[str], str]) -> None: if isinstance(keys, str): parts = keys.replace("-", "+").split("+") if len(keys) > 1 else [keys] else: parts = keys if len(parts) == 1: await self._auto.press_key(parts[0]) else: await self._auto.hotkey(parts) async def drag(self, path: List[Dict[str, int]]) -> None: if not path: return start = path[0] await self._auto.mouse_down(start["x"], start["y"]) for pt in path[1:]: await self._auto.move_cursor(pt["x"], pt["y"]) end = path[-1] await self._auto.mouse_up(end["x"], end["y"]) async def get_current_url(self) -> str: # Not available in this server context return "" async def left_mouse_down(self, x: Optional[int] = None, y: Optional[int] = None) -> None: await self._auto.mouse_down(x, y, button="left") async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) -> None: await self._auto.mouse_up(x, y, button="left") # # Inline image URLs to base64 # import base64, mimetypes, requests # # Use a browser-like User-Agent to avoid 403s from some CDNs (e.g., Wikimedia) # HEADERS = { # "User-Agent": ( # "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " # "AppleWebKit/537.36 (KHTML, like Gecko) " # "Chrome/124.0.0.0 Safari/537.36" # ) # } # def _to_data_url(content_bytes: bytes, url: str, resp: requests.Response) -> str: # ctype = resp.headers.get("Content-Type") or mimetypes.guess_type(url)[0] or "application/octet-stream" # b64 = base64.b64encode(content_bytes).decode("utf-8") # return f"data:{ctype};base64,{b64}" # def inline_image_urls(messages): # # messages: List[{"role": "...","content":[...]}] # out = [] # for m in messages: # if not isinstance(m.get("content"), list): # out.append(m) # continue # new_content = [] # for part in (m.get("content") or []): # if part.get("type") == "input_image" and (url := part.get("image_url")): # resp = requests.get(url, headers=HEADERS, timeout=30) # resp.raise_for_status() # new_content.append({ # "type": "input_image", # "image_url": _to_data_url(resp.content, url, resp) # }) # else: # new_content.append(part) # out.append({**m, "content": 
new_content}) # return out # messages = inline_image_urls(messages) error = None with _EnvOverride(env_overrides): # Prepare tools: if caller did not pass tools, inject our DirectComputer tools = agent_kwargs.get("tools") if not tools: tools = [DirectComputer()] agent_kwargs = {**agent_kwargs, "tools": tools} # Instantiate agent with our tools agent = ComputerAgent(model=model, **agent_kwargs) # type: ignore[arg-type] total_output: List[Any] = [] total_usage: Dict[str, Any] = {} pending_computer_call_ids = set() try: async for result in agent.run(messages): total_output += result["output"] # Try to collect usage if present if isinstance(result, dict) and "usage" in result and isinstance(result["usage"], dict): # Merge usage counters for k, v in result["usage"].items(): if isinstance(v, (int, float)): total_usage[k] = total_usage.get(k, 0) + v else: total_usage[k] = v for msg in result.get("output", []): if msg.get("type") == "computer_call": pending_computer_call_ids.add(msg["call_id"]) elif msg.get("type") == "computer_call_output": pending_computer_call_ids.discard(msg["call_id"]) # exit if no pending computer calls if not pending_computer_call_ids: break except Exception as e: logger.error(f"Error running agent: {str(e)}") logger.error(traceback.format_exc()) error = str(e) # Build response payload payload = { "model": model, "error": error, "output": total_output, "usage": total_usage, "status": "completed" if not error else "failed" } # CORS: allow any origin headers = { "Cache-Control": "no-cache", "Connection": "keep-alive", } return JSONResponse(content=payload, headers=headers) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000) ``` -------------------------------------------------------------------------------- /libs/lume/src/Server/Handlers.swift: -------------------------------------------------------------------------------- ```swift import ArgumentParser import Foundation import Virtualization @MainActor extension Server { // MARK: - VM Management Handlers func handleListVMs(storage: String? = nil) async throws -> HTTPResponse { do { let vmController = LumeController() let vms = try vmController.list(storage: storage) return try .json(vms) } catch { print( "ERROR: Failed to list VMs: \(error.localizedDescription), storage=\(String(describing: storage))" ) return .badRequest(message: error.localizedDescription) } } func handleGetVM(name: String, storage: String? 
= nil) async throws -> HTTPResponse { print("Getting VM details: name=\(name), storage=\(String(describing: storage))") do { let vmController = LumeController() print("Created VM controller, attempting to get VM") let vm = try vmController.get(name: name, storage: storage) print("Successfully retrieved VM") // Check for nil values that might cause crashes if vm.vmDirContext.config.macAddress == nil { print("ERROR: VM has nil macAddress") return .badRequest(message: "VM configuration is invalid (nil macAddress)") } print("MacAddress check passed") // Log that we're about to access details print("Preparing VM details response") // Print the full details object for debugging let details = vm.details print("VM DETAILS: \(details)") print(" name: \(details.name)") print(" os: \(details.os)") print(" cpuCount: \(details.cpuCount)") print(" memorySize: \(details.memorySize)") print(" diskSize: \(details.diskSize)") print(" display: \(details.display)") print(" status: \(details.status)") print(" vncUrl: \(String(describing: details.vncUrl))") print(" ipAddress: \(String(describing: details.ipAddress))") print(" locationName: \(details.locationName)") // Serialize the VM details print("About to serialize VM details") let response = try HTTPResponse.json(vm.details) print("Successfully serialized VM details") return response } catch { // This will catch errors from both vmController.get and the json serialization print("ERROR: Failed to get VM details: \(error.localizedDescription)") return .badRequest(message: error.localizedDescription) } } func handleCreateVM(_ body: Data?) async throws -> HTTPResponse { guard let body = body, let request = try? JSONDecoder().decode(CreateVMRequest.self, from: body) else { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: "Invalid request body")) ) } do { let sizes = try request.parse() let vmController = LumeController() try await vmController.create( name: request.name, os: request.os, diskSize: sizes.diskSize, cpuCount: request.cpu, memorySize: sizes.memory, display: request.display, ipsw: request.ipsw, storage: request.storage ) return HTTPResponse( statusCode: .ok, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode([ "message": "VM created successfully", "name": request.name, ]) ) } catch { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: error.localizedDescription)) ) } } func handleDeleteVM(name: String, storage: String? = nil) async throws -> HTTPResponse { do { let vmController = LumeController() try await vmController.delete(name: name, storage: storage) return HTTPResponse( statusCode: .ok, headers: ["Content-Type": "application/json"], body: Data()) } catch { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: error.localizedDescription))) } } func handleCloneVM(_ body: Data?) async throws -> HTTPResponse { guard let body = body, let request = try? 
JSONDecoder().decode(CloneRequest.self, from: body) else { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: "Invalid request body")) ) } do { let vmController = LumeController() try vmController.clone( name: request.name, newName: request.newName, sourceLocation: request.sourceLocation, destLocation: request.destLocation ) return HTTPResponse( statusCode: .ok, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode([ "message": "VM cloned successfully", "source": request.name, "destination": request.newName, ]) ) } catch { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: error.localizedDescription)) ) } } // MARK: - VM Operation Handlers func handleSetVM(name: String, body: Data?) async throws -> HTTPResponse { guard let body = body, let request = try? JSONDecoder().decode(SetVMRequest.self, from: body) else { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: "Invalid request body")) ) } do { let vmController = LumeController() let sizes = try request.parse() try vmController.updateSettings( name: name, cpu: request.cpu, memory: sizes.memory, diskSize: sizes.diskSize, display: sizes.display?.string, storage: request.storage ) return HTTPResponse( statusCode: .ok, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(["message": "VM settings updated successfully"]) ) } catch { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: error.localizedDescription)) ) } } func handleStopVM(name: String, storage: String? = nil) async throws -> HTTPResponse { Logger.info( "Stopping VM", metadata: ["name": name, "storage": String(describing: storage)]) do { Logger.info("Creating VM controller", metadata: ["name": name]) let vmController = LumeController() Logger.info("Calling stopVM on controller", metadata: ["name": name]) try await vmController.stopVM(name: name, storage: storage) Logger.info( "VM stopped, waiting 5 seconds for locks to clear", metadata: ["name": name]) // Add a delay to ensure locks are fully released before returning for i in 1...5 { try? await Task.sleep(nanoseconds: 1_000_000_000) Logger.info("Lock clearing delay", metadata: ["name": name, "seconds": "\(i)/5"]) } // Verify the VM is really in a stopped state Logger.info("Verifying VM is stopped", metadata: ["name": name]) let vm = try? 
vmController.get(name: name, storage: storage) if let vm = vm, vm.details.status == "running" { Logger.info( "VM still reports as running despite stop operation", metadata: ["name": name, "severity": "warning"]) } else { Logger.info( "Verification complete: VM is in stopped state", metadata: ["name": name]) } Logger.info("Returning successful response", metadata: ["name": name]) return HTTPResponse( statusCode: .ok, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(["message": "VM stopped successfully"]) ) } catch { Logger.error( "Failed to stop VM", metadata: [ "name": name, "error": error.localizedDescription, "storage": String(describing: storage), ]) return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: error.localizedDescription)) ) } } func handleRunVM(name: String, body: Data?) async throws -> HTTPResponse { Logger.info("Running VM", metadata: ["name": name]) // Log the raw body data if available if let body = body, let bodyString = String(data: body, encoding: .utf8) { Logger.info("Run VM raw request body", metadata: ["name": name, "body": bodyString]) } else { Logger.info("No request body or could not decode as string", metadata: ["name": name]) } do { Logger.info("Creating VM controller and parsing request", metadata: ["name": name]) let request = body.flatMap { try? JSONDecoder().decode(RunVMRequest.self, from: $0) } ?? RunVMRequest( noDisplay: nil, sharedDirectories: nil, recoveryMode: nil, storage: nil) Logger.info( "Parsed request", metadata: [ "name": name, "noDisplay": String(describing: request.noDisplay), "sharedDirectories": "\(request.sharedDirectories?.count ?? 0)", "storage": String(describing: request.storage), ]) Logger.info("Parsing shared directories", metadata: ["name": name]) let dirs = try request.parse() Logger.info( "Successfully parsed shared directories", metadata: ["name": name, "count": "\(dirs.count)"]) // Start VM in background Logger.info("Starting VM in background", metadata: ["name": name]) startVM( name: name, noDisplay: request.noDisplay ?? false, sharedDirectories: dirs, recoveryMode: request.recoveryMode ?? false, storage: request.storage ) Logger.info("VM start initiated in background", metadata: ["name": name]) // Return response immediately return HTTPResponse( statusCode: .accepted, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode([ "message": "VM start initiated", "name": name, "status": "pending", ]) ) } catch { Logger.error( "Failed to run VM", metadata: [ "name": name, "error": error.localizedDescription, ]) return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: error.localizedDescription)) ) } } // MARK: - Image Management Handlers func handleIPSW() async throws -> HTTPResponse { do { let vmController = LumeController() let url = try await vmController.getLatestIPSWURL() return HTTPResponse( statusCode: .ok, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(["url": url.absoluteString]) ) } catch { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: error.localizedDescription)) ) } } func handlePull(_ body: Data?) async throws -> HTTPResponse { guard let body = body, let request = try? 
JSONDecoder().decode(PullRequest.self, from: body) else { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: "Invalid request body")) ) } do { let vmController = LumeController() try await vmController.pullImage( image: request.image, name: request.name, registry: request.registry, organization: request.organization, storage: request.storage ) return HTTPResponse( statusCode: .ok, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode([ "message": "Image pulled successfully", "image": request.image, "name": request.name ?? "default", ]) ) } catch { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: error.localizedDescription)) ) } } func handlePruneImages() async throws -> HTTPResponse { do { let vmController = LumeController() try await vmController.pruneImages() return HTTPResponse( statusCode: .ok, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(["message": "Successfully removed cached images"]) ) } catch { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: error.localizedDescription)) ) } } func handlePush(_ body: Data?) async throws -> HTTPResponse { guard let body = body, let request = try? JSONDecoder().decode(PushRequest.self, from: body) else { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: "Invalid request body")) ) } // Trigger push asynchronously, return Accepted immediately Task.detached { @MainActor @Sendable in do { let vmController = LumeController() try await vmController.pushImage( name: request.name, imageName: request.imageName, tags: request.tags, registry: request.registry, organization: request.organization, storage: request.storage, chunkSizeMb: request.chunkSizeMb, verbose: false, // Verbose typically handled by server logs dryRun: false, // Default API behavior is likely non-dry-run reassemble: false // Default API behavior is likely non-reassemble ) print( "Background push completed successfully for image: \(request.imageName):\(request.tags.joined(separator: ","))" ) } catch { print( "Background push failed for image: \(request.imageName):\(request.tags.joined(separator: ",")) - Error: \(error.localizedDescription)" ) } } return HTTPResponse( statusCode: .accepted, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode([ "message": AnyEncodable("Push initiated in background"), "name": AnyEncodable(request.name), "imageName": AnyEncodable(request.imageName), "tags": AnyEncodable(request.tags), ]) ) } func handleGetImages(_ request: HTTPRequest) async throws -> HTTPResponse { let pathAndQuery = request.path.split(separator: "?", maxSplits: 1) let queryParams = pathAndQuery.count > 1 ? pathAndQuery[1] .split(separator: "&") .reduce(into: [String: String]()) { dict, param in let parts = param.split(separator: "=", maxSplits: 1) if parts.count == 2 { dict[String(parts[0])] = String(parts[1]) } } : [:] let organization = queryParams["organization"] ?? 
"trycua" do { let vmController = LumeController() let imageList = try await vmController.getImages(organization: organization) // Create a response format that matches the CLI output let response = imageList.local.map { [ "repository": $0.repository, "imageId": $0.imageId, ] } return HTTPResponse( statusCode: .ok, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(response) ) } catch { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: error.localizedDescription)) ) } } // MARK: - Config Management Handlers func handleGetConfig() async throws -> HTTPResponse { do { let vmController = LumeController() let settings = vmController.getSettings() return try .json(settings) } catch { return .badRequest(message: error.localizedDescription) } } struct ConfigRequest: Codable { let homeDirectory: String? let cacheDirectory: String? let cachingEnabled: Bool? } func handleUpdateConfig(_ body: Data?) async throws -> HTTPResponse { guard let body = body, let request = try? JSONDecoder().decode(ConfigRequest.self, from: body) else { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: "Invalid request body")) ) } do { let vmController = LumeController() if let homeDir = request.homeDirectory { try vmController.setHomeDirectory(homeDir) } if let cacheDir = request.cacheDirectory { try vmController.setCacheDirectory(path: cacheDir) } if let cachingEnabled = request.cachingEnabled { try vmController.setCachingEnabled(cachingEnabled) } return HTTPResponse( statusCode: .ok, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(["message": "Configuration updated successfully"]) ) } catch { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: error.localizedDescription)) ) } } func handleGetLocations() async throws -> HTTPResponse { do { let vmController = LumeController() let locations = vmController.getLocations() return try .json(locations) } catch { return .badRequest(message: error.localizedDescription) } } struct LocationRequest: Codable { let name: String let path: String } func handleAddLocation(_ body: Data?) async throws -> HTTPResponse { guard let body = body, let request = try? 
JSONDecoder().decode(LocationRequest.self, from: body) else { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: "Invalid request body")) ) } do { let vmController = LumeController() try vmController.addLocation(name: request.name, path: request.path) return HTTPResponse( statusCode: .ok, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode([ "message": "Location added successfully", "name": request.name, "path": request.path, ]) ) } catch { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: error.localizedDescription)) ) } } func handleRemoveLocation(_ name: String) async throws -> HTTPResponse { do { let vmController = LumeController() try vmController.removeLocation(name: name) return HTTPResponse( statusCode: .ok, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(["message": "Location removed successfully"]) ) } catch { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: error.localizedDescription)) ) } } func handleSetDefaultLocation(_ name: String) async throws -> HTTPResponse { do { let vmController = LumeController() try vmController.setDefaultLocation(name: name) return HTTPResponse( statusCode: .ok, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(["message": "Default location set successfully"]) ) } catch { return HTTPResponse( statusCode: .badRequest, headers: ["Content-Type": "application/json"], body: try JSONEncoder().encode(APIError(message: error.localizedDescription)) ) } } // MARK: - Log Handlers func handleGetLogs(type: String?, lines: Int?) async throws -> HTTPResponse { do { let logType = type?.lowercased() ?? "all" let infoPath = "/tmp/lume_daemon.log" let errorPath = "/tmp/lume_daemon.error.log" let fileManager = FileManager.default var response: [String: String] = [:] // Function to read log files func readLogFile(path: String) -> String? { guard fileManager.fileExists(atPath: path) else { return nil } do { let content = try String(contentsOfFile: path, encoding: .utf8) // If lines parameter is provided, return only the specified number of lines from the end if let lineCount = lines { let allLines = content.components(separatedBy: .newlines) let startIndex = max(0, allLines.count - lineCount) let lastLines = Array(allLines[startIndex...]) return lastLines.joined(separator: "\n") } return content } catch { return "Error reading log file: \(error.localizedDescription)" } } // Get logs based on requested type if logType == "info" || logType == "all" { response["info"] = readLogFile(path: infoPath) ?? "Info log file not found" } if logType == "error" || logType == "all" { response["error"] = readLogFile(path: errorPath) ?? "Error log file not found" } return try .json(response) } catch { return .badRequest(message: error.localizedDescription) } } // MARK: - Private Helper Methods nonisolated private func startVM( name: String, noDisplay: Bool, sharedDirectories: [SharedDirectory] = [], recoveryMode: Bool = false, storage: String? 
= nil ) { Logger.info( "Starting VM in detached task", metadata: [ "name": name, "noDisplay": "\(noDisplay)", "recoveryMode": "\(recoveryMode)", "storage": String(describing: storage), ]) Task.detached { @MainActor @Sendable in Logger.info("Background task started for VM", metadata: ["name": name]) do { Logger.info("Creating VM controller in background task", metadata: ["name": name]) let vmController = LumeController() Logger.info( "Calling runVM on controller", metadata: [ "name": name, "noDisplay": "\(noDisplay)", ]) try await vmController.runVM( name: name, noDisplay: noDisplay, sharedDirectories: sharedDirectories, recoveryMode: recoveryMode, storage: storage ) Logger.info("VM started successfully in background task", metadata: ["name": name]) } catch { Logger.error( "Failed to start VM in background task", metadata: [ "name": name, "error": error.localizedDescription, ]) } } Logger.info("Background task dispatched for VM", metadata: ["name": name]) } } ``` -------------------------------------------------------------------------------- /blog/build-your-own-operator-on-macos-2.md: -------------------------------------------------------------------------------- ```markdown # Build Your Own Operator on macOS - Part 2 *Published on April 27, 2025 by Francesco Bonacci* In our [previous post](build-your-own-operator-on-macos-1.md), we built a basic Computer-Use Operator from scratch using OpenAI's `computer-use-preview` model and our [cua-computer](https://pypi.org/project/cua-computer) package. While educational, implementing the control loop manually can be tedious and error-prone. In this follow-up, we'll explore our [cua-agent](https://pypi.org/project/cua-agent) framework - a high-level abstraction that handles all the complexity of VM interaction, screenshot processing, model communication, and action execution automatically. <div align="center"> <video src="https://github.com/user-attachments/assets/0be7e3e3-eead-4646-a4a3-5bb392501ee7" width="600" controls></video> </div> ## What You'll Learn By the end of this tutorial, you'll be able to: - Set up the `cua-agent` framework with various agent loop types and model providers - Understand the different agent loop types and their capabilities - Work with local models for cost-effective workflows - Use a simple UI for your operator **Prerequisites:** - Completed setup from Part 1 ([lume CLI installed](https://github.com/trycua/cua?tab=readme-ov-file#option-2-full-computer-use-agent-capabilities), macOS CUA image already pulled) - Python 3.10+. We recommend using Conda (or Anaconda) to create an ad hoc Python environment. - API keys for OpenAI and/or Anthropic (optional for local models) **Estimated Time:** 30-45 minutes ## Introduction to cua-agent The `cua-agent` framework is designed to simplify building Computer-Use Agents. It abstracts away the complex interaction loop we built manually in Part 1, letting you focus on defining tasks rather than implementing the machinery. Among other features, it includes: - **Multiple Provider Support**: Works with OpenAI, Anthropic, UI-Tars, local models (via Ollama), or any OpenAI-compatible model (e.g. LM Studio, vLLM, LocalAI, OpenRouter, Groq, etc.) - **Flexible Loop Types**: Different implementations optimized for various models (e.g. OpenAI vs. Anthropic) - **Structured Responses**: Clean, consistent output following the OpenAI Agent SDK specification we touched on in Part 1 - **Local Model Support**: Run cost-effectively with locally hosted models (Ollama, LM Studio, vLLM, LocalAI, etc.) 
- **Gradio UI**: Optional visual interface for interacting with your agent

## Installation

Let's start by installing the `cua-agent` package. You can install it with all features or selectively install only what you need. From your Python 3.10+ environment, run:

```bash
# For all features
pip install "cua-agent[all]"

# Or selectively install only what you need
pip install "cua-agent[openai]"    # OpenAI support
pip install "cua-agent[anthropic]" # Anthropic support
pip install "cua-agent[uitars]"    # UI-Tars support
pip install "cua-agent[omni]"      # OmniParser + VLMs support
pip install "cua-agent[ui]"        # Gradio UI
```

## Setting Up Your Environment

Before running any code examples, let's set up a proper environment:

1. **Create a new directory** for your project:

```bash
mkdir cua-agent-tutorial
cd cua-agent-tutorial
```

2. **Set up a Python environment** using one of these methods:

**Option A: Using conda command line**

```bash
# Using conda
conda create -n cua-agent python=3.10
conda activate cua-agent
```

**Option B: Using Anaconda Navigator UI**
- Open Anaconda Navigator
- Click on "Environments" in the left sidebar
- Click the "Create" button at the bottom
- Name your environment "cua-agent"
- Select Python 3.10
- Click "Create"
- Once created, select the environment and click "Open Terminal" to activate it

**Option C: Using venv**

```bash
python -m venv cua-env
source cua-env/bin/activate  # On macOS/Linux
```

3. **Install the cua-agent package**:

```bash
pip install "cua-agent[all]"
```

4. **Set up your API keys as environment variables**:

```bash
# For OpenAI models
export OPENAI_API_KEY=your_openai_key_here

# For Anthropic models (if needed)
export ANTHROPIC_API_KEY=your_anthropic_key_here
```

5. **Create a Python file or notebook**:

**Option A: Create a Python script**

```bash
# For a Python script
touch cua_agent_example.py
```

**Option B: Use VS Code notebooks**
- Open VS Code
- Install the Python extension if you haven't already
- Create a new file with a `.ipynb` extension (e.g., `cua_agent_tutorial.ipynb`)
- Select your Python environment when prompted
- You can now create and run code cells in the notebook interface

Now you're ready to run the code examples!

## Understanding Agent Loops

If you recall from Part 1, we had to implement a custom interaction loop to interact with the computer-use-preview model. In the `cua-agent` framework, an **Agent Loop** is the core abstraction that implements the continuous interaction cycle between an AI model and the computer environment. It manages the flow of:

1. Capturing screenshots of the computer's state
2. Processing these screenshots (with or without UI element detection)
3. Sending this visual context to an AI model along with the task instructions
4. Receiving the model's decisions on what actions to take
5. Safely executing these actions in the environment
6. Repeating this cycle until the task is complete

The loop handles all the complex error handling, retries, context management, and model-specific interaction patterns so you don't have to implement them yourself. While the core concept remains the same across all agent loops, different AI models require specialized handling for optimal performance. To address this, the framework provides four different agent loop implementations, each designed for different computer-use modalities.
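
In practice, you rarely pick a loop by hand: `ComputerAgent` selects the appropriate loop from the model string you pass it. As a rough sketch (these model strings all appear later in this post; `macos_computer` is the computer object from the other examples):

```python
from agent import ComputerAgent

# Each model string implies a different agent loop:
agent = ComputerAgent(model="openai/computer-use-preview", tools=[macos_computer])           # OpenAI loop
agent = ComputerAgent(model="anthropic/claude-3-5-sonnet-20241022", tools=[macos_computer])  # Anthropic loop
agent = ComputerAgent(model="omniparser+ollama_chat/gemma3", tools=[macos_computer])         # OMNI loop, local model
```

The table below summarizes the four loops and the models they support.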
| Agent Loop | Supported Models | Description | Set-Of-Marks |
|:-----------|:-----------------|:------------|:-------------|
| `AgentLoop.OPENAI` | • `computer_use_preview` | Use OpenAI Operator CUA Preview model | Not Required |
| `AgentLoop.ANTHROPIC` | • `claude-3-5-sonnet-20240620`<br>• `claude-3-7-sonnet-20250219` | Use Anthropic Computer-Use Beta Tools | Not Required |
| `AgentLoop.UITARS` | • `ByteDance-Seed/UI-TARS-1.5-7B` | Uses ByteDance's UI-TARS 1.5 model | Not Required |
| `AgentLoop.OMNI` | • `claude-3-5-sonnet-20240620`<br>• `claude-3-7-sonnet-20250219`<br>• `gpt-4.5-preview`<br>• `gpt-4o`<br>• `gpt-4`<br>• `phi4`<br>• `phi4-mini`<br>• `gemma3`<br>• `...`<br>• `Any Ollama or OpenAI-compatible model` | Use OmniParser for element pixel-detection (SoM) and any VLMs for UI Grounding and Reasoning | OmniParser |

Each loop handles the same basic pattern we implemented manually in Part 1:

1. Take a screenshot of the VM
2. Send the screenshot and task to the AI model
3. Receive an action to perform
4. Execute the action
5. Repeat until the task is complete

### Why Different Agent Loops?

The `cua-agent` framework provides multiple agent loop implementations to abstract away the complexity of interacting with different CUA models. Each provider has unique API structures, response formats, conventions and capabilities that require specialized handling:

- **OpenAI Loop**: Uses the Responses API with a specific `computer_call_output` format for sending screenshots after actions. Requires handling safety checks and maintains a chain of requests using `previous_response_id`.

- **Anthropic Loop**: Implements a [multi-agent loop pattern](https://docs.anthropic.com/en/docs/agents-and-tools/computer-use#understanding-the-multi-agent-loop) with a sophisticated message handling system, supporting various API providers (Anthropic, Bedrock, Vertex) with token management and prompt caching capabilities.

- **UI-TARS Loop**: Requires custom message formatting and specialized parsing to extract actions from text responses using a "box token" system for UI element identification.

- **OMNI Loop**: Uses [Microsoft's OmniParser](https://github.com/microsoft/OmniParser) to create a [Set-of-Marks (SoM)](https://arxiv.org/abs/2310.11441) representation of the UI, enabling any vision-language model to interact with interfaces without specialized UI training.

These abstractions allow you to easily switch between providers without changing your application code. All loop implementations are available in the [cua-agent GitHub repository](https://github.com/trycua/cua/tree/main/libs/agent/agent/providers).

Choosing the right agent loop depends not only on your API access and technical requirements but also on the specific tasks you need to accomplish. To make an informed decision, it's helpful to understand how these underlying models perform across different computing environments – from desktop operating systems to web browsers and mobile interfaces.

## Computer-Use Model Capabilities

The performance of different Computer-Use models varies significantly across tasks. These benchmark evaluations measure an agent's ability to follow instructions and complete real-world tasks in different computing environments.
| Benchmark type | Benchmark | UI-TARS-1.5 | OpenAI CUA | Claude 3.7 | Previous SOTA | Human | |----------------|--------------------------------------------------------------------------------------------------------------------------------------------------|-------------|-------------|-------------|----------------------|-------------| | **Computer Use** | [OSworld](https://arxiv.org/abs/2404.07972) (100 steps) | **42.5** | 36.4 | 28 | 38.1 (200 step) | 72.4 | | | [Windows Agent Arena](https://arxiv.org/abs/2409.08264) (50 steps) | **42.1** | - | - | 29.8 | - | | **Browser Use** | [WebVoyager](https://arxiv.org/abs/2401.13919) | 84.8 | **87** | 84.1 | 87 | - | | | [Online-Mind2web](https://arxiv.org/abs/2504.01382) | **75.8** | 71 | 62.9 | 71 | - | | **Phone Use** | [Android World](https://arxiv.org/abs/2405.14573) | **64.2** | - | - | 59.5 | - | ### When to Use Each Loop - **AgentLoop.OPENAI**: Choose when you have OpenAI Tier 3 access and need the most capable computer-use agent for web-based tasks. Uses the same [OpenAI Computer-Use Loop](https://platform.openai.com/docs/guides/tools-computer-use) as Part 1, delivering strong performance on browser-based benchmarks. - **AgentLoop.ANTHROPIC**: Ideal for users with Anthropic API access who need strong reasoning capabilities with computer-use abilities. Works with `claude-3-5-sonnet-20240620` and `claude-3-7-sonnet-20250219` models following [Anthropic's Computer-Use tools](https://docs.anthropic.com/en/docs/agents-and-tools/computer-use#understanding-the-multi-agent-loop). - **AgentLoop.UITARS**: Best for scenarios requiring more powerful OS/desktop, and latency-sensitive automation, as UI-TARS-1.5 leads in OS capabilities benchmarks. Requires running the model locally or accessing it through compatible endpoints (e.g. on Hugging Face). - **AgentLoop.OMNI**: The most flexible option that works with virtually any vision-language model including local and open-source ones. Perfect for cost-effective development or when you need to use models without native computer-use capabilities. Now that we understand the capabilities and strengths of different models, let's see how easy it is to implement a Computer-Use Agent using the `cua-agent` framework. Let's look at the implementation details. ## Creating Your First Computer-Use Agent With the `cua-agent` framework, creating a Computer-Use Agent becomes remarkably straightforward. The framework handles all the complexities of model interaction, screenshot processing, and action execution behind the scenes. Let's look at a simple example of how to build your first agent: **How to run this example:** 1. Create a new file named `simple_task.py` in your text editor or IDE (like VS Code, PyCharm, or Cursor) 2. Copy and paste the following code: ```python import asyncio from computer import Computer from agent import ComputerAgent async def run_simple_task(): async with Computer() as macos_computer: # Create agent with OpenAI loop agent = ComputerAgent( model="openai/computer-use-preview", tools=[macos_computer] ) # Define a simple task task = "Open Safari and search for 'Python tutorials'" # Run the task and process responses async for result in agent.run(task): print(f"Action: {result.get('text')}") # Run the example if __name__ == "__main__": asyncio.run(run_simple_task()) ``` 3. Save the file 4. Open a terminal, navigate to your project directory, and run: ```bash python simple_task.py ``` 5. 
The code will initialize the macOS virtual machine, create an agent, and execute the task of opening Safari and searching for Python tutorials. You can also run this in a VS Code notebook: 1. Create a new notebook in VS Code (.ipynb file) 2. Copy the code into a cell (without the `if __name__ == "__main__":` part) 3. Run the cell to execute the code You can find the full code in our [notebook](https://github.com/trycua/cua/blob/main/notebooks/blog/build-your-own-operator-on-macos-2.ipynb). Compare this to the manual implementation from Part 1 - we've reduced dozens of lines of code to just a few. The cua-agent framework handles all the complex logic internally, letting you focus on the overarching agentic system. ## Working with Multiple Tasks Another advantage of the cua-agent framework is easily chaining multiple tasks. Instead of managing complex state between tasks, you can simply provide a sequence of instructions to be executed in order: **How to run this example:** 1. Create a new file named `multi_task.py` with the following code: ```python import asyncio from computer import Computer from agent import ComputerAgent async def run_multi_task_workflow(): async with Computer() as macos_computer: agent = ComputerAgent( model="anthropic/claude-3-5-sonnet-20241022", tools=[macos_computer] ) tasks = [ "Open Safari and go to github.com", "Search for 'trycua/cua'", "Open the repository page", "Click on the 'Issues' tab", "Read the first open issue" ] for i, task in enumerate(tasks): print(f"\nTask {i+1}/{len(tasks)}: {task}") async for result in agent.run(task): # Print just the action description for brevity if result.get("text"): print(f" → {result.get('text')}") print(f"✅ Task {i+1} completed") if __name__ == "__main__": asyncio.run(run_multi_task_workflow()) ``` 2. Save the file 3. Make sure you have set your Anthropic API key: ```bash export ANTHROPIC_API_KEY=your_anthropic_key_here ``` 4. Run the script: ```bash python multi_task.py ``` This pattern is particularly useful for creating workflows that navigate through multiple steps of an application or process. The agent maintains visual context between tasks, making it more likely to successfully complete complex sequences of actions. ## Understanding the Response Format Each action taken by the agent returns a structured response following the OpenAI Agent SDK specification. This standardized format makes it easy to extract detailed information about what the agent is doing and why: ```python async for result in agent.run(task): # Basic information print(f"Response ID: {result.get('id')}") print(f"Response Text: {result.get('text')}") # Detailed token usage statistics usage = result.get('usage') if usage: print(f"Input Tokens: {usage.get('input_tokens')}") print(f"Output Tokens: {usage.get('output_tokens')}") # Reasoning and actions for output in result.get('output', []): if output.get('type') == 'reasoning': print(f"Reasoning: {output.get('summary', [{}])[0].get('text')}") elif output.get('type') == 'computer_call': action = output.get('action', {}) print(f"Action: {action.get('type')} at ({action.get('x')}, {action.get('y')})") ``` This structured format allows you to: - Log detailed information about agent actions - Provide real-time feedback to users - Track token usage for cost monitoring - Access the reasoning behind decisions for debugging or user explanation ## Using Local Models with OMNI One of the most powerful features of the framework is the ability to use local models via the OMNI loop. 
This approach dramatically reduces costs while maintaining acceptable reliability for many agentic workflows: **How to run this example:** 1. First, you'll need to install Ollama for running local models: - Visit [ollama.com](https://ollama.com) and download the installer for your OS - Follow the installation instructions - Pull the Gemma 3 model: ```bash ollama pull gemma3:4b-it-q4_K_M ``` 2. Create a file named `local_model.py` with this code: ```python import asyncio from computer import Computer from agent import ComputerAgent async def run_with_local_model(): async with Computer() as macos_computer: agent = ComputerAgent( model="omniparser+ollama_chat/gemma3", tools=[macos_computer] ) task = "Open the Calculator app and perform a simple calculation" async for result in agent.run(task): print(f"Action: {result.get('text')}") if __name__ == "__main__": asyncio.run(run_with_local_model()) ``` 3. Run the script: ```bash python local_model.py ``` You can also use other local model servers with the OAICOMPAT provider, which enables compatibility with any API endpoint following the OpenAI API structure: ```python agent = ComputerAgent( model=LLM( provider=LLMProvider.OAICOMPAT, name="gemma-3-12b-it", provider_base_url="http://localhost:1234/v1" # LM Studio endpoint ), tools=[macos_computer] ) ``` Common local endpoints include: - LM Studio: `http://localhost:1234/v1` - vLLM: `http://localhost:8000/v1` - LocalAI: `http://localhost:8080/v1` - Ollama with OpenAI compat: `http://localhost:11434/v1` This approach is perfect for: - Development and testing without incurring API costs - Offline or air-gapped environments where API access isn't possible - Privacy-sensitive applications where data can't leave your network - Experimenting with different models to find the best fit for your use case ## Deploying and Using UI-TARS UI-TARS is ByteDance's Computer-Use model designed for navigating OS-level interfaces. It shows excellent performance on desktop OS tasks. To use UI-TARS, you'll first need to deploy the model. ### Deployment Options 1. **Local Deployment**: Follow the [UI-TARS deployment guide](https://github.com/bytedance/UI-TARS/blob/main/README_deploy.md) to run the model locally. 2. **Hugging Face Endpoint**: Deploy UI-TARS on Hugging Face Inference Endpoints, which will give you a URL like: `https://**************.us-east-1.aws.endpoints.huggingface.cloud/v1` 3. **Using with cua-agent**: Once deployed, you can use UI-TARS with the cua-agent framework: ```python agent = ComputerAgent( model=LLM( provider=LLMProvider.OAICOMPAT, name="tgi", provider_base_url="https://**************.us-east-1.aws.endpoints.huggingface.cloud/v1" ), tools=[macos_computer] ) ``` UI-TARS is particularly useful for desktop automation tasks, as it shows the highest performance on OS-level benchmarks like OSworld and Windows Agent Arena. ## Understanding Agent Responses in Detail The `run()` method of your agent yields structured responses that follow the OpenAI Agent SDK specification. 
This provides a rich set of information beyond just the basic action text:

```python
async for result in agent.run(task):
    # Basic ID and text
    print("Response ID:", result.get("id"))
    print("Response Text:", result.get("text"))

    # Token usage statistics
    usage = result.get("usage")
    if usage:
        print("\nUsage Details:")
        print(f"  Input Tokens: {usage.get('input_tokens')}")
        if "input_tokens_details" in usage:
            print(f"  Input Tokens Details: {usage.get('input_tokens_details')}")
        print(f"  Output Tokens: {usage.get('output_tokens')}")
        if "output_tokens_details" in usage:
            print(f"  Output Tokens Details: {usage.get('output_tokens_details')}")
        print(f"  Total Tokens: {usage.get('total_tokens')}")

    # Detailed reasoning and actions
    outputs = result.get("output", [])
    for output in outputs:
        output_type = output.get("type")
        if output_type == "reasoning":
            print("\nReasoning:")
            for summary in output.get("summary", []):
                print(f"  {summary.get('text')}")
        elif output_type == "computer_call":
            action = output.get("action", {})
            print("\nComputer Action:")
            print(f"  Type: {action.get('type')}")
            print(f"  Position: ({action.get('x')}, {action.get('y')})")
            if action.get("text"):
                print(f"  Text: {action.get('text')}")
```

This detailed information is invaluable for debugging, logging, and understanding the agent's decision-making process in an agentic system. More details can be found in the [OpenAI Agent SDK Specification](https://platform.openai.com/docs/guides/responses-vs-chat-completions).

## Building a Gradio UI

For a visual interface to your agent, the package also includes a Gradio UI:

**How to run the Gradio UI:**

1. Create a file named `launch_ui.py` with the following code:

```python
from agent.ui.gradio.app import create_gradio_ui

# Create and launch the UI
if __name__ == "__main__":
    app = create_gradio_ui()
    app.launch(share=False)  # Set share=False for local access only
```

2. Install the UI dependencies if you haven't already:

```bash
pip install "cua-agent[ui]"
```

3. Run the script:

```bash
python launch_ui.py
```

4. Open your browser to the displayed URL (usually http://127.0.0.1:7860)

**Creating a Shareable Link (Optional):**

You can also create a temporary public URL to access your Gradio UI from anywhere:

```python
# In launch_ui.py
if __name__ == "__main__":
    app = create_gradio_ui()
    app.launch(share=True)  # Creates a public link
```

When you run this, Gradio will display both a local URL and a public URL like:

```
Running on local URL: http://127.0.0.1:7860
Running on public URL: https://abcd1234.gradio.live
```

**Security Note:** Be cautious when sharing your Gradio UI publicly:

- The public URL gives anyone with the link full access to your agent
- Consider using basic authentication for additional protection:
  ```python
  app.launch(share=True, auth=("username", "password"))
  ```
- Only use this feature for personal or team use, not for production environments
- The temporary link expires when you stop the Gradio application

The Gradio UI provides:

- Model provider selection
- Agent loop selection
- Task input field
- Real-time display of VM screenshots
- Action history

### Setting API Keys for the UI

To use the UI with different providers, set your API keys as environment variables:

```bash
# For OpenAI models
export OPENAI_API_KEY=your_openai_key_here

# For Anthropic models
export ANTHROPIC_API_KEY=your_anthropic_key_here

# Launch with both keys set
OPENAI_API_KEY=your_key ANTHROPIC_API_KEY=your_key python launch_ui.py
```

### UI Settings Persistence

The Gradio UI automatically saves your configuration to maintain your preferences between sessions:

- Settings like Agent Loop, Model Choice, Custom Base URL, and configuration options are saved to `.gradio_settings.json` in the project's root directory
- These settings are loaded automatically when you restart the UI
- API keys entered in the custom provider field are **not** saved for security reasons
- It's recommended to add `.gradio_settings.json` to your `.gitignore` file
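
To make this concrete, here is what a saved settings file might look like. This is a hypothetical sketch, not a documented schema; the field names and values below are illustrative only:

```json
{
  "agent_loop": "OMNI",
  "model_choice": "gemma3",
  "custom_base_url": "http://localhost:1234/v1"
}
```

Since the file lives in the project root, keeping it out of version control (as recommended above) avoids committing machine-specific settings.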
## Advanced Example: GitHub Repository Workflow

Let's look at a more complex example that automates a GitHub workflow:

**How to run this advanced example:**

1. Create a file named `github_workflow.py` with the following code:

```python
import asyncio
import logging
from computer import Computer
from agent import ComputerAgent

async def github_workflow():
    async with Computer(verbosity=logging.INFO) as macos_computer:
        agent = ComputerAgent(
            model="openai/computer-use-preview",
            save_trajectory=True,  # Save screenshots for debugging
            only_n_most_recent_images=3,  # Only keep last 3 images in context
            verbosity=logging.INFO,
            tools=[macos_computer]
        )

        tasks = [
            "Look for a repository named trycua/cua on GitHub.",
            "Check the open issues, open the most recent one and read it.",
            "Clone the repository in users/lume/projects if it doesn't exist yet.",
            "Open the repository with Cursor (on the dock, black background and white cube icon).",
            "From Cursor, open Composer if not already open.",
            "Focus on the Composer text area, then write and submit a task to help resolve the GitHub issue.",
        ]

        for i, task in enumerate(tasks):
            print(f"\nExecuting task {i+1}/{len(tasks)}: {task}")
            async for result in agent.run(task):
                print(f"Action: {result.get('text')}")
            print(f"✅ Task {i+1}/{len(tasks)} completed")

if __name__ == "__main__":
    asyncio.run(github_workflow())
```

2. Make sure your OpenAI API key is set:

```bash
export OPENAI_API_KEY=your_openai_key_here
```

3. Run the script:

```bash
python github_workflow.py
```

4.
Watch as the agent completes the entire workflow: - The agent will navigate to GitHub - Find and investigate issues in the repository - Clone the repository to the local machine - Open it in Cursor - Use Cursor's AI features to work on a solution This example: 1. Searches GitHub for a repository 2. Reads an issue 3. Clones the repository 4. Opens it in an IDE 5. Uses AI to write a solution ## Comparing Implementation Approaches Let's compare our manual implementation from Part 1 with the framework approach: ### Manual Implementation (Part 1) - Required writing custom code for the interaction loop - Needed explicit handling of different action types - Required direct management of the OpenAI API calls - Around 50-100 lines of code for basic functionality - Limited to OpenAI's computer-use model ### Framework Implementation (Part 2) - Abstracts the interaction loop - Handles all action types automatically - Manages API calls internally - Only 10-15 lines of code for the same functionality - Works with multiple model providers - Includes UI capabilities ## Conclusion The `cua-agent` framework transforms what was a complex implementation task into a simple, high-level interface for building Computer-Use Agents. By abstracting away the technical details, it lets you focus on defining the tasks rather than the machinery. ### When to Use Each Approach - **Manual Implementation (Part 1)**: When you need complete control over the interaction loop or are implementing a custom solution - **Framework (Part 2)**: For most applications where you want to quickly build and deploy Computer-Use Agents ### Next Steps With the basics covered, you might want to explore: - Customizing the agent's behavior with additional parameters - Building more complex workflows spanning multiple applications - Integrating your agent into other applications - Contributing to the open-source project on GitHub ### Resources - [cua-agent GitHub repository](https://github.com/trycua/cua/tree/main/libs/agent) - [Agent Notebook Examples](https://github.com/trycua/cua/blob/main/notebooks/agent_nb.ipynb) - [OpenAI Agent SDK Specification](https://platform.openai.com/docs/api-reference/responses) - [Anthropic API Documentation](https://docs.anthropic.com/en/api/getting-started) - [UI-TARS GitHub](https://github.com/ByteDance/UI-TARS) - [OmniParser GitHub](https://github.com/microsoft/OmniParser) ``` -------------------------------------------------------------------------------- /libs/python/agent/agent/agent.py: -------------------------------------------------------------------------------- ```python """ ComputerAgent - Main agent class that selects and runs agent loops """ import asyncio from pathlib import Path from typing import Dict, List, Any, Optional, AsyncGenerator, Union, cast, Callable, Set, Tuple from litellm.responses.utils import Usage from .types import ( Messages, AgentCapability, ToolError, IllegalArgumentError ) from .responses import make_tool_error_item, replace_failed_computer_calls_with_function_calls from .decorators import find_agent_config import json import litellm import litellm.utils import inspect from .adapters import ( HuggingFaceLocalAdapter, HumanAdapter, MLXVLMAdapter, ) from .callbacks import ( ImageRetentionCallback, LoggingCallback, TrajectorySaverCallback, BudgetManagerCallback, TelemetryCallback, OperatorNormalizerCallback, PromptInstructionsCallback, ) from .computers import ( AsyncComputerHandler, is_agent_computer, make_computer_handler ) def assert_callable_with(f, *args, **kwargs): 
"""Check if function can be called with given arguments.""" try: inspect.signature(f).bind(*args, **kwargs) return True except TypeError as e: sig = inspect.signature(f) raise IllegalArgumentError(f"Expected {sig}, got args={args} kwargs={kwargs}") from e def get_json(obj: Any, max_depth: int = 10) -> Any: def custom_serializer(o: Any, depth: int = 0, seen: Optional[Set[int]] = None) -> Any: if seen is None: seen = set() # Use model_dump() if available if hasattr(o, 'model_dump'): return o.model_dump() # Check depth limit if depth > max_depth: return f"<max_depth_exceeded:{max_depth}>" # Check for circular references using object id obj_id = id(o) if obj_id in seen: return f"<circular_reference:{type(o).__name__}>" # Handle Computer objects if hasattr(o, '__class__') and 'computer' in getattr(o, '__class__').__name__.lower(): return f"<computer:{o.__class__.__name__}>" # Handle objects with __dict__ if hasattr(o, '__dict__'): seen.add(obj_id) try: result = {} for k, v in o.__dict__.items(): if v is not None: # Recursively serialize with updated depth and seen set serialized_value = custom_serializer(v, depth + 1, seen.copy()) result[k] = serialized_value return result finally: seen.discard(obj_id) # Handle common types that might contain nested objects elif isinstance(o, dict): seen.add(obj_id) try: return { k: custom_serializer(v, depth + 1, seen.copy()) for k, v in o.items() if v is not None } finally: seen.discard(obj_id) elif isinstance(o, (list, tuple, set)): seen.add(obj_id) try: return [ custom_serializer(item, depth + 1, seen.copy()) for item in o if item is not None ] finally: seen.discard(obj_id) # For basic types that json.dumps can handle elif isinstance(o, (str, int, float, bool)) or o is None: return o # Fallback to string representation else: return str(o) def remove_nones(obj: Any) -> Any: if isinstance(obj, dict): return {k: remove_nones(v) for k, v in obj.items() if v is not None} elif isinstance(obj, list): return [remove_nones(item) for item in obj if item is not None] return obj # Serialize with circular reference and depth protection serialized = custom_serializer(obj) # Convert to JSON string and back to ensure JSON compatibility json_str = json.dumps(serialized) parsed = json.loads(json_str) # Final cleanup of any remaining None values return remove_nones(parsed) def sanitize_message(msg: Any) -> Any: """Return a copy of the message with image_url omitted for computer_call_output messages.""" if msg.get("type") == "computer_call_output": output = msg.get("output", {}) if isinstance(output, dict): sanitized = msg.copy() sanitized["output"] = {**output, "image_url": "[omitted]"} return sanitized return msg def get_output_call_ids(messages: List[Dict[str, Any]]) -> List[str]: call_ids = [] for message in messages: if message.get("type") == "computer_call_output" or message.get("type") == "function_call_output": call_ids.append(message.get("call_id")) return call_ids class ComputerAgent: """ Main agent class that automatically selects the appropriate agent loop based on the model and executes tool calls. 
""" def __init__( self, model: str, tools: Optional[List[Any]] = None, custom_loop: Optional[Callable] = None, only_n_most_recent_images: Optional[int] = None, callbacks: Optional[List[Any]] = None, instructions: Optional[str] = None, verbosity: Optional[int] = None, trajectory_dir: Optional[str | Path | dict] = None, max_retries: Optional[int] = 3, screenshot_delay: Optional[float | int] = 0.5, use_prompt_caching: Optional[bool] = False, max_trajectory_budget: Optional[float | dict] = None, telemetry_enabled: Optional[bool] = True, trust_remote_code: Optional[bool] = False, **kwargs ): """ Initialize ComputerAgent. Args: model: Model name (e.g., "claude-3-5-sonnet-20241022", "computer-use-preview", "omni+vertex_ai/gemini-pro") tools: List of tools (computer objects, decorated functions, etc.) custom_loop: Custom agent loop function to use instead of auto-selection only_n_most_recent_images: If set, only keep the N most recent images in message history. Adds ImageRetentionCallback automatically. callbacks: List of AsyncCallbackHandler instances for preprocessing/postprocessing instructions: Optional system instructions to be passed to the model verbosity: Logging level (logging.DEBUG, logging.INFO, etc.). If set, adds LoggingCallback automatically trajectory_dir: If set, saves trajectory data (screenshots, responses) to this directory. Adds TrajectorySaverCallback automatically. max_retries: Maximum number of retries for failed API calls screenshot_delay: Delay before screenshots in seconds use_prompt_caching: If set, use prompt caching to avoid reprocessing the same prompt. Intended for use with anthropic providers. max_trajectory_budget: If set, adds BudgetManagerCallback to track usage costs and stop when budget is exceeded telemetry_enabled: If set, adds TelemetryCallback to track anonymized usage data. Enabled by default. trust_remote_code: If set, trust remote code when loading local models. Disabled by default. 
**kwargs: Additional arguments passed to the agent loop """ # If the loop is "human/human", we need to prefix a grounding model fallback if model in ["human/human", "human"]: model = "openai/computer-use-preview+human/human" self.model = model self.tools = tools or [] self.custom_loop = custom_loop self.only_n_most_recent_images = only_n_most_recent_images self.callbacks = callbacks or [] self.instructions = instructions self.verbosity = verbosity self.trajectory_dir = trajectory_dir self.max_retries = max_retries self.screenshot_delay = screenshot_delay self.use_prompt_caching = use_prompt_caching self.telemetry_enabled = telemetry_enabled self.kwargs = kwargs self.trust_remote_code = trust_remote_code # == Add built-in callbacks == # Prepend operator normalizer callback self.callbacks.insert(0, OperatorNormalizerCallback()) # Add prompt instructions callback if provided if self.instructions: self.callbacks.append(PromptInstructionsCallback(self.instructions)) # Add telemetry callback if telemetry_enabled is set if self.telemetry_enabled: if isinstance(self.telemetry_enabled, bool): self.callbacks.append(TelemetryCallback(self)) else: self.callbacks.append(TelemetryCallback(self, **self.telemetry_enabled)) # Add logging callback if verbosity is set if self.verbosity is not None: self.callbacks.append(LoggingCallback(level=self.verbosity)) # Add image retention callback if only_n_most_recent_images is set if self.only_n_most_recent_images: self.callbacks.append(ImageRetentionCallback(self.only_n_most_recent_images)) # Add trajectory saver callback if trajectory_dir is set if self.trajectory_dir: if isinstance(self.trajectory_dir, dict): self.callbacks.append(TrajectorySaverCallback(**self.trajectory_dir)) elif isinstance(self.trajectory_dir, (str, Path)): self.callbacks.append(TrajectorySaverCallback(str(self.trajectory_dir))) # Add budget manager if max_trajectory_budget is set if max_trajectory_budget: if isinstance(max_trajectory_budget, dict): self.callbacks.append(BudgetManagerCallback(**max_trajectory_budget)) else: self.callbacks.append(BudgetManagerCallback(max_trajectory_budget)) # == Enable local model providers w/ LiteLLM == # Register local model providers hf_adapter = HuggingFaceLocalAdapter( device="auto", trust_remote_code=self.trust_remote_code or False ) human_adapter = HumanAdapter() mlx_adapter = MLXVLMAdapter() litellm.custom_provider_map = [ {"provider": "huggingface-local", "custom_handler": hf_adapter}, {"provider": "human", "custom_handler": human_adapter}, {"provider": "mlx", "custom_handler": mlx_adapter} ] litellm.suppress_debug_info = True # == Initialize computer agent == # Find the appropriate agent loop if custom_loop: self.agent_loop = custom_loop self.agent_config_info = None else: config_info = find_agent_config(model) if not config_info: raise ValueError(f"No agent config found for model: {model}") # Instantiate the agent config class self.agent_loop = config_info.agent_class() self.agent_config_info = config_info self.tool_schemas = [] self.computer_handler = None async def _initialize_computers(self): """Initialize computer objects""" if not self.tool_schemas: # Process tools and create tool schemas self.tool_schemas = self._process_tools() # Find computer tool and create interface adapter computer_handler = None for schema in self.tool_schemas: if schema["type"] == "computer": computer_handler = await make_computer_handler(schema["computer"]) break self.computer_handler = computer_handler def _process_input(self, input: Messages) -> List[Dict[str, 
Any]]: """Process input messages and create schemas for the agent loop""" if isinstance(input, str): return [{"role": "user", "content": input}] return [get_json(msg) for msg in input] def _process_tools(self) -> List[Dict[str, Any]]: """Process tools and create schemas for the agent loop""" schemas = [] for tool in self.tools: # Check if it's a computer object (has interface attribute) if is_agent_computer(tool): # This is a computer tool - will be handled by agent loop schemas.append({ "type": "computer", "computer": tool }) elif callable(tool): # Use litellm.utils.function_to_dict to extract schema from docstring try: function_schema = litellm.utils.function_to_dict(tool) schemas.append({ "type": "function", "function": function_schema }) except Exception as e: print(f"Warning: Could not process tool {tool}: {e}") else: print(f"Warning: Unknown tool type: {tool}") return schemas def _get_tool(self, name: str) -> Optional[Callable]: """Get a tool by name""" for tool in self.tools: if hasattr(tool, '__name__') and tool.__name__ == name: return tool elif hasattr(tool, 'func') and tool.func.__name__ == name: return tool return None # ============================================================================ # AGENT RUN LOOP LIFECYCLE HOOKS # ============================================================================ async def _on_run_start(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]]) -> None: """Initialize run tracking by calling callbacks.""" for callback in self.callbacks: if hasattr(callback, 'on_run_start'): await callback.on_run_start(kwargs, old_items) async def _on_run_end(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]], new_items: List[Dict[str, Any]]) -> None: """Finalize run tracking by calling callbacks.""" for callback in self.callbacks: if hasattr(callback, 'on_run_end'): await callback.on_run_end(kwargs, old_items, new_items) async def _on_run_continue(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]], new_items: List[Dict[str, Any]]) -> bool: """Check if run should continue by calling callbacks.""" for callback in self.callbacks: if hasattr(callback, 'on_run_continue'): should_continue = await callback.on_run_continue(kwargs, old_items, new_items) if not should_continue: return False return True async def _on_llm_start(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Prepare messages for the LLM call by applying callbacks.""" result = messages for callback in self.callbacks: if hasattr(callback, 'on_llm_start'): result = await callback.on_llm_start(result) return result async def _on_llm_end(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Postprocess messages after the LLM call by applying callbacks.""" result = messages for callback in self.callbacks: if hasattr(callback, 'on_llm_end'): result = await callback.on_llm_end(result) return result async def _on_responses(self, kwargs: Dict[str, Any], responses: Dict[str, Any]) -> None: """Called when responses are received.""" for callback in self.callbacks: if hasattr(callback, 'on_responses'): await callback.on_responses(get_json(kwargs), get_json(responses)) async def _on_computer_call_start(self, item: Dict[str, Any]) -> None: """Called when a computer call is about to start.""" for callback in self.callbacks: if hasattr(callback, 'on_computer_call_start'): await callback.on_computer_call_start(get_json(item)) async def _on_computer_call_end(self, item: Dict[str, Any], result: List[Dict[str, Any]]) -> None: """Called when a computer call has 
completed.""" for callback in self.callbacks: if hasattr(callback, 'on_computer_call_end'): await callback.on_computer_call_end(get_json(item), get_json(result)) async def _on_function_call_start(self, item: Dict[str, Any]) -> None: """Called when a function call is about to start.""" for callback in self.callbacks: if hasattr(callback, 'on_function_call_start'): await callback.on_function_call_start(get_json(item)) async def _on_function_call_end(self, item: Dict[str, Any], result: List[Dict[str, Any]]) -> None: """Called when a function call has completed.""" for callback in self.callbacks: if hasattr(callback, 'on_function_call_end'): await callback.on_function_call_end(get_json(item), get_json(result)) async def _on_text(self, item: Dict[str, Any]) -> None: """Called when a text message is encountered.""" for callback in self.callbacks: if hasattr(callback, 'on_text'): await callback.on_text(get_json(item)) async def _on_api_start(self, kwargs: Dict[str, Any]) -> None: """Called when an LLM API call is about to start.""" for callback in self.callbacks: if hasattr(callback, 'on_api_start'): await callback.on_api_start(get_json(kwargs)) async def _on_api_end(self, kwargs: Dict[str, Any], result: Any) -> None: """Called when an LLM API call has completed.""" for callback in self.callbacks: if hasattr(callback, 'on_api_end'): await callback.on_api_end(get_json(kwargs), get_json(result)) async def _on_usage(self, usage: Dict[str, Any]) -> None: """Called when usage information is received.""" for callback in self.callbacks: if hasattr(callback, 'on_usage'): await callback.on_usage(get_json(usage)) async def _on_screenshot(self, screenshot: Union[str, bytes], name: str = "screenshot") -> None: """Called when a screenshot is taken.""" for callback in self.callbacks: if hasattr(callback, 'on_screenshot'): await callback.on_screenshot(screenshot, name) # ============================================================================ # AGENT OUTPUT PROCESSING # ============================================================================ async def _handle_item(self, item: Any, computer: Optional[AsyncComputerHandler] = None, ignore_call_ids: Optional[List[str]] = None) -> List[Dict[str, Any]]: """Handle each item; may cause a computer action + screenshot.""" call_id = item.get("call_id") if ignore_call_ids and call_id and call_id in ignore_call_ids: return [] item_type = item.get("type", None) if item_type == "message": await self._on_text(item) # # Print messages # if item.get("content"): # for content_item in item.get("content"): # if content_item.get("text"): # print(content_item.get("text")) return [] try: if item_type == "computer_call": await self._on_computer_call_start(item) if not computer: raise ValueError("Computer handler is required for computer calls") # Perform computer actions action = item.get("action") action_type = action.get("type") if action_type is None: print(f"Action type cannot be `None`: action={action}, action_type={action_type}") return [] # Extract action arguments (all fields except 'type') action_args = {k: v for k, v in action.items() if k != "type"} # print(f"{action_type}({action_args})") # Execute the computer action computer_method = getattr(computer, action_type, None) if computer_method: assert_callable_with(computer_method, **action_args) await computer_method(**action_args) else: raise ToolError(f"Unknown computer action: {action_type}") # Take screenshot after action if self.screenshot_delay and self.screenshot_delay > 0: await 
asyncio.sleep(self.screenshot_delay)
                screenshot_base64 = await computer.screenshot()
                await self._on_screenshot(screenshot_base64, "screenshot_after")

                # Handle safety checks
                pending_checks = item.get("pending_safety_checks", [])
                acknowledged_checks = []
                for check in pending_checks:
                    check_message = check.get("message", str(check))
                    acknowledged_checks.append(check)
                    # TODO: implement a callback for safety checks
                    # if acknowledge_safety_check_callback(check_message, allow_always=True):
                    #     acknowledged_checks.append(check)
                    # else:
                    #     raise ValueError(f"Safety check failed: {check_message}")

                # Create call output
                call_output = {
                    "type": "computer_call_output",
                    "call_id": item.get("call_id"),
                    "acknowledged_safety_checks": acknowledged_checks,
                    "output": {
                        "type": "input_image",
                        "image_url": f"data:image/png;base64,{screenshot_base64}",
                    },
                }

                # # Additional URL safety checks for browser environments
                # if await computer.get_environment() == "browser":
                #     current_url = await computer.get_current_url()
                #     call_output["output"]["current_url"] = current_url
                #     # TODO: implement a callback for URL safety checks
                #     # check_blocklisted_url(current_url)

                result = [call_output]
                await self._on_computer_call_end(item, result)
                return result

            if item_type == "function_call":
                await self._on_function_call_start(item)

                # Perform function call
                function = self._get_tool(item.get("name"))
                if not function:
                    raise ToolError(f"Function {item.get('name')} not found")
                args = json.loads(item.get("arguments"))

                # Validate arguments before execution
                assert_callable_with(function, **args)

                # Execute function - use asyncio.to_thread for non-async functions
                if inspect.iscoroutinefunction(function):
                    result = await function(**args)
                else:
                    result = await asyncio.to_thread(function, **args)

                # Create function call output
                call_output = {
                    "type": "function_call_output",
                    "call_id": item.get("call_id"),
                    "output": str(result),
                }
                result = [call_output]
                await self._on_function_call_end(item, result)
                return result
        except ToolError as e:
            return [make_tool_error_item(repr(e), call_id)]

        return []

    # ============================================================================
    # MAIN AGENT LOOP
    # ============================================================================

    async def run(
        self,
        messages: Messages,
        stream: bool = False,
        **kwargs
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Run the agent with the given messages using Computer protocol handler pattern.
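        Example (illustrative):
            async for chunk in agent.run([{"role": "user", "content": "Take a screenshot"}]):
                for item in chunk.get("output", []):
                    if item.get("type") == "message":
                        print(item.get("content"))
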
        Args:
            messages: List of message dictionaries
            stream: Whether to stream the response
            **kwargs: Additional arguments

        Returns:
            AsyncGenerator that yields response chunks
        """
        if not self.agent_config_info:
            raise ValueError("Agent configuration not found")
        capabilities = self.get_capabilities()
        if "step" not in capabilities:
            raise ValueError(f"Agent loop {self.agent_config_info.agent_class.__name__} does not support step predictions")

        await self._initialize_computers()

        # Merge kwargs
        merged_kwargs = {**self.kwargs, **kwargs}

        old_items = self._process_input(messages)
        new_items = []

        # Initialize run tracking
        run_kwargs = {
            "messages": messages,
            "stream": stream,
            "model": self.model,
            "agent_loop": self.agent_config_info.agent_class.__name__,
            **merged_kwargs
        }
        await self._on_run_start(run_kwargs, old_items)

        while new_items[-1].get("role") != "assistant" if new_items else True:
            # Lifecycle hook: Check if we should continue based on callbacks (e.g., budget manager)
            should_continue = await self._on_run_continue(run_kwargs, old_items, new_items)
            if not should_continue:
                break

            # Lifecycle hook: Prepare messages for the LLM call
            # Use cases:
            # - PII anonymization
            # - Image retention policy
            combined_messages = old_items + new_items
            combined_messages = replace_failed_computer_calls_with_function_calls(combined_messages)
            preprocessed_messages = await self._on_llm_start(combined_messages)

            loop_kwargs = {
                "messages": preprocessed_messages,
                "model": self.model,
                "tools": self.tool_schemas,
                "stream": False,
                "computer_handler": self.computer_handler,
                "max_retries": self.max_retries,
                "use_prompt_caching": self.use_prompt_caching,
                **merged_kwargs
            }

            # Run agent loop iteration
            result = await self.agent_loop.predict_step(
                **loop_kwargs,
                _on_api_start=self._on_api_start,
                _on_api_end=self._on_api_end,
                _on_usage=self._on_usage,
                _on_screenshot=self._on_screenshot,
            )
            result = get_json(result)

            # Lifecycle hook: Postprocess messages after the LLM call
            # Use cases:
            # - PII deanonymization (if you want tool calls to see PII)
            result["output"] = await self._on_llm_end(result.get("output", []))
            await self._on_responses(loop_kwargs, result)

            # Yield agent response
            yield result

            # Add agent response to new_items
            new_items += result.get("output")

            # Get output call ids
            output_call_ids = get_output_call_ids(result.get("output", []))

            # Handle computer actions
            for item in result.get("output"):
                partial_items = await self._handle_item(item, self.computer_handler, ignore_call_ids=output_call_ids)
                new_items += partial_items

                # Yield partial response
                yield {
                    "output": partial_items,
                    "usage": Usage(
                        prompt_tokens=0,
                        completion_tokens=0,
                        total_tokens=0,
                    )
                }

        await self._on_run_end(run_kwargs, old_items, new_items)

    async def predict_click(
        self,
        instruction: str,
        image_b64: Optional[str] = None
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates based on image and instruction.
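
        Example (illustrative; requires an agent config with the "click"
        capability and either an explicit image_b64 or a computer tool):

            coords = await agent.predict_click("the Submit button")
            if coords is not None:
                x, y = coords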
Args: instruction: Instruction for where to click image_b64: Base64 encoded image (optional, will take screenshot if not provided) Returns: None or tuple with (x, y) coordinates """ if not self.agent_config_info: raise ValueError("Agent configuration not found") capabilities = self.get_capabilities() if "click" not in capabilities: raise ValueError(f"Agent loop {self.agent_config_info.agent_class.__name__} does not support click predictions") if hasattr(self.agent_loop, 'predict_click'): if not image_b64: if not self.computer_handler: raise ValueError("Computer tool or image_b64 is required for predict_click") image_b64 = await self.computer_handler.screenshot() return await self.agent_loop.predict_click( model=self.model, image_b64=image_b64, instruction=instruction ) return None def get_capabilities(self) -> List[AgentCapability]: """ Get list of capabilities supported by the current agent config. Returns: List of capability strings (e.g., ["step", "click"]) """ if not self.agent_config_info: raise ValueError("Agent configuration not found") if hasattr(self.agent_loop, 'get_capabilities'): return self.agent_loop.get_capabilities() return ["step"] # Default capability ``` -------------------------------------------------------------------------------- /libs/python/agent/agent/human_tool/ui.py: -------------------------------------------------------------------------------- ```python import gradio as gr import json import time from typing import List, Dict, Any, Optional from datetime import datetime import requests from .server import completion_queue import base64 import io from PIL import Image class HumanCompletionUI: def __init__(self, server_url: str = "http://localhost:8002"): self.server_url = server_url self.current_call_id: Optional[str] = None self.refresh_interval = 2.0 # seconds self.last_image = None # Store the last image for display # Track current interactive action controls self.current_action_type: str = "click" self.current_button: str = "left" self.current_scroll_x: int = 0 self.current_scroll_y: int = -120 def format_messages_for_chatbot(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Format messages for display in gr.Chatbot with type='messages'.""" formatted = [] for msg in messages: role = msg.get("role", "user") content = msg.get("content", "") tool_calls = msg.get("tool_calls", []) # Handle different content formats if isinstance(content, list): # Multi-modal content - can include text and images formatted_content = [] for item in content: if item.get("type") == "text": text = item.get("text", "") if text.strip(): # Only add non-empty text formatted_content.append(text) elif item.get("type") == "image_url": image_url = item.get("image_url", {}).get("url", "") if image_url: # Check if it's a base64 image or URL if image_url.startswith("data:image"): # For base64 images, decode and create gr.Image try: header, data = image_url.split(",", 1) image_data = base64.b64decode(data) image = Image.open(io.BytesIO(image_data)) formatted_content.append(gr.Image(value=image)) except Exception as e: print(f"Error loading image: {e}") formatted_content.append(f"[Image loading error: {e}]") else: # For URL images, create gr.Image with URL formatted_content.append(gr.Image(value=image_url)) # Determine final content format if len(formatted_content) == 1: content = formatted_content[0] elif len(formatted_content) > 1: content = formatted_content else: content = "[Empty content]" # Ensure role is valid for Gradio Chatbot if role not in ["user", "assistant"]: role 
= "assistant" if role == "system" else "user" # Invert roles for better display in human UI context # (what the AI says becomes "user", what human should respond becomes "assistant") if role == "user": role = "assistant" else: role = "user" # Add the main message if it has content if content and str(content).strip(): formatted.append({"role": role, "content": content}) # Handle tool calls - create separate messages for each tool call if tool_calls: for tool_call in tool_calls: function_name = tool_call.get("function", {}).get("name", "unknown") arguments_str = tool_call.get("function", {}).get("arguments", "{}") try: # Parse arguments to format them nicely arguments = json.loads(arguments_str) formatted_args = json.dumps(arguments, indent=2) except json.JSONDecodeError: # If parsing fails, use the raw string formatted_args = arguments_str # Create a formatted message for the tool call tool_call_content = f"```json\n{formatted_args}\n```" formatted.append({ "role": role, "content": tool_call_content, "metadata": {"title": f"🛠️ Used {function_name}"} }) return formatted def get_pending_calls(self) -> List[Dict[str, Any]]: """Get pending calls from the server.""" try: response = requests.get(f"{self.server_url}/pending", timeout=5) if response.status_code == 200: return response.json().get("pending_calls", []) except Exception as e: print(f"Error fetching pending calls: {e}") return [] def complete_call_with_response(self, call_id: str, response: str) -> bool: """Complete a call with a text response.""" try: response_data = {"response": response} response_obj = requests.post( f"{self.server_url}/complete/{call_id}", json=response_data, timeout=10 ) response_obj.raise_for_status() return True except requests.RequestException as e: print(f"Error completing call: {e}") return False def complete_call_with_tool_calls(self, call_id: str, tool_calls: List[Dict[str, Any]]) -> bool: """Complete a call with tool calls.""" try: response_data = {"tool_calls": tool_calls} response_obj = requests.post( f"{self.server_url}/complete/{call_id}", json=response_data, timeout=10 ) response_obj.raise_for_status() return True except requests.RequestException as e: print(f"Error completing call: {e}") return False def complete_call(self, call_id: str, response: Optional[str] = None, tool_calls: Optional[List[Dict[str, Any]]] = None) -> bool: """Complete a call with either a response or tool calls.""" try: response_data = {} if response: response_data["response"] = response if tool_calls: response_data["tool_calls"] = tool_calls response_obj = requests.post( f"{self.server_url}/complete/{call_id}", json=response_data, timeout=10 ) response_obj.raise_for_status() return True except requests.RequestException as e: print(f"Error completing call: {e}") return False def get_last_image_from_messages(self, messages: List[Dict[str, Any]]) -> Optional[Any]: """Extract the last image from the messages for display above conversation.""" last_image = None for msg in reversed(messages): # Start from the last message content = msg.get("content", "") if isinstance(content, list): for item in reversed(content): # Get the last image in the message if item.get("type") == "image_url": image_url = item.get("image_url", {}).get("url", "") if image_url: if image_url.startswith("data:image"): # For base64 images, create a gr.Image component try: header, data = image_url.split(",", 1) image_data = base64.b64decode(data) image = Image.open(io.BytesIO(image_data)) return image except Exception as e: print(f"Error loading image: {e}") 
                                    continue
                            else:
                                # For URL images, return the URL
                                return image_url
        return last_image

    def refresh_pending_calls(self):
        """Refresh the list of pending calls."""
        pending_calls = self.get_pending_calls()

        if not pending_calls:
            return (
                gr.update(choices=["latest"], value="latest"),  # dropdown
                gr.update(value=None),  # image (no image)
                gr.update(value=[]),  # chatbot (empty messages)
                gr.update(interactive=False),  # submit button
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )

        # Sort pending calls by created_at to get oldest first
        sorted_calls = sorted(pending_calls, key=lambda x: x.get("created_at", ""))

        # Create choices for dropdown
        choices = [("latest", "latest")]  # Add "latest" option first
        for call in sorted_calls:
            call_id = call["id"]
            model = call.get("model", "unknown")
            created_at = call.get("created_at", "")

            # Format timestamp
            try:
                dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
                time_str = dt.strftime("%H:%M:%S")
            except Exception:
                time_str = created_at

            choice_label = f"{call_id[:8]}... ({model}) - {time_str}"
            choices.append((choice_label, call_id))

        # Default to "latest" which shows the oldest pending conversation
        selected_call_id = "latest"
        if selected_call_id == "latest" and sorted_calls:
            # Use the oldest call (first in sorted list)
            selected_call = sorted_calls[0]
            conversation = self.format_messages_for_chatbot(selected_call.get("messages", []))
            self.current_call_id = selected_call["id"]
            # Get the last image from messages
            self.last_image = self.get_last_image_from_messages(selected_call.get("messages", []))
        else:
            conversation = []
            self.current_call_id = None
            self.last_image = None

        return (
            gr.update(choices=choices, value="latest"),
            gr.update(value=self.last_image),
            gr.update(value=conversation),
            gr.update(interactive=bool(choices)),
            gr.update(visible=True),  # click_actions_group visible when there is a call
            gr.update(visible=True),  # actions_group visible when there is a call
        )

    def on_call_selected(self, selected_choice):
        """Handle when a call is selected from the dropdown."""
        if not selected_choice:
            return (
                gr.update(value=None),  # no image
                gr.update(value=[]),  # empty chatbot
                gr.update(interactive=False),
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )

        pending_calls = self.get_pending_calls()
        if not pending_calls:
            return (
                gr.update(value=None),  # no image
                gr.update(value=[]),  # empty chatbot
                gr.update(interactive=False),
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )

        # Handle "latest" option
        if selected_choice == "latest":
            # Sort calls by created_at to get oldest first
            sorted_calls = sorted(pending_calls, key=lambda x: x.get("created_at", ""))
            selected_call = sorted_calls[0]  # Get the oldest call
            call_id = selected_call["id"]
        else:
            # Extract call_id from the choice for specific calls
            call_id = None
            for call in pending_calls:
                call_id_short = call["id"][:8]
                if call_id_short in selected_choice:
                    call_id = call["id"]
                    break

        if not call_id:
            return (
                gr.update(value=None),  # no image
                gr.update(value=[]),  # empty chatbot
                gr.update(interactive=False),
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )

        # Find the selected call
        selected_call = next((c for c in pending_calls if c["id"] == call_id), None)
        if not selected_call:
            return (
                gr.update(value=None),  # no image
                gr.update(value=[]),  # empty chatbot
                gr.update(interactive=False),
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )

        conversation =
self.format_messages_for_chatbot(selected_call.get("messages", [])) self.current_call_id = call_id # Get the last image from messages self.last_image = self.get_last_image_from_messages(selected_call.get("messages", [])) return ( gr.update(value=self.last_image), gr.update(value=conversation), gr.update(interactive=True), gr.update(visible=True), # click_actions_group visible gr.update(visible=True), # actions_group visible ) def submit_response(self, response_text: str): """Submit a text response to the current call.""" if not self.current_call_id: return ( gr.update(value=response_text), # keep response text gr.update(value="❌ No call selected") # status ) if not response_text.strip(): return ( gr.update(value=response_text), # keep response text gr.update(value="❌ Response cannot be empty") # status ) success = self.complete_call_with_response(self.current_call_id, response_text) if success: status_msg = "✅ Response submitted successfully!" return ( gr.update(value=""), # clear response text gr.update(value=status_msg) # status ) else: return ( gr.update(value=response_text), # keep response text gr.update(value="❌ Failed to submit response") # status ) def submit_action(self, action_type: str, **kwargs) -> str: """Submit a computer action as a tool call.""" if not self.current_call_id: return "❌ No call selected" import uuid # Create tool call structure action_data = {"type": action_type, **kwargs} tool_call = { "id": f"call_{uuid.uuid4().hex[:24]}", "type": "function", "function": { "name": "computer", "arguments": json.dumps(action_data) } } success = self.complete_call_with_tool_calls(self.current_call_id, [tool_call]) if success: return f"✅ {action_type.capitalize()} action submitted as tool call" else: return f"❌ Failed to submit {action_type} action" def submit_click_action(self, x: int, y: int, action_type: str = "click", button: str = "left") -> str: """Submit a coordinate-based action.""" if action_type == "click": return self.submit_action(action_type, x=x, y=y, button=button) else: return self.submit_action(action_type, x=x, y=y) def submit_type_action(self, text: str) -> str: """Submit a type action.""" return self.submit_action("type", text=text) def submit_hotkey_action(self, keys: str) -> str: """Submit a hotkey action.""" return self.submit_action("keypress", keys=keys) def submit_wait_action(self) -> str: """Submit a wait action with no kwargs.""" return self.submit_action("wait") def submit_description_click(self, description: str, action_type: str = "click", button: str = "left") -> str: """Submit a description-based action.""" if action_type == "click": return self.submit_action(action_type, element_description=description, button=button) else: return self.submit_action(action_type, element_description=description) def wait_for_pending_calls(self, max_seconds: float = 10.0, check_interval: float = 0.2): """Wait for pending calls to appear or until max_seconds elapsed. This method loops and checks for pending calls at regular intervals, returning as soon as a pending call is found or the maximum wait time is reached. 
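
        Returns the same tuple of Gradio component updates as
        refresh_pending_calls(), so it can be wired to the same outputs.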
Args: max_seconds: Maximum number of seconds to wait check_interval: How often to check for pending calls (in seconds) """ import time start_time = time.time() while time.time() - start_time < max_seconds: # Check if there are any pending calls pending_calls = self.get_pending_calls() if pending_calls: # Found pending calls, return immediately return self.refresh_pending_calls() # Wait before checking again time.sleep(check_interval) # Max wait time reached, return current state return self.refresh_pending_calls() def create_ui(): """Create the Gradio interface.""" ui_handler = HumanCompletionUI() with gr.Blocks(title="Human-in-the-Loop Agent Tool", fill_width=True) as demo: gr.Markdown("# 🤖 Human-in-the-Loop Agent Tool") gr.Markdown("Review AI conversation requests and provide human responses.") with gr.Row(): with gr.Column(scale=2): with gr.Group(): screenshot_image = gr.Image( label="Interactive Screenshot", interactive=False, height=600 ) # Action type selection for image clicks (wrapped for visibility control) with gr.Group(visible=False) as click_actions_group: with gr.Row(): action_type_radio = gr.Dropdown( label="Interactive Action", choices=["click", "double_click", "move", "left_mouse_up", "left_mouse_down", "scroll"], value="click", scale=2 ) action_button_radio = gr.Dropdown( label="Button", choices=["left", "right", "wheel", "back", "forward"], value="left", visible=True, scale=1 ) scroll_x_input = gr.Number( label="scroll_x", value=0, visible=False, scale=1 ) scroll_y_input = gr.Number( label="scroll_y", value=-120, visible=False, scale=1 ) conversation_chatbot = gr.Chatbot( label="Conversation", type="messages", height=500, show_copy_button=True ) with gr.Column(scale=1): with gr.Group(): call_dropdown = gr.Dropdown( label="Select a pending conversation request", choices=["latest"], interactive=True, value="latest" ) refresh_btn = gr.Button("🔄 Refresh", variant="secondary") status_display = gr.Textbox( label="Status", interactive=False, value="Ready to receive requests..." ) with gr.Group(): response_text = gr.Textbox( label="Message", lines=3, placeholder="Enter your message here..." ) submit_btn = gr.Button("📤 Submit Message", variant="primary", interactive=False) # Action Accordions (wrapped for visibility control) with gr.Group(visible=False) as actions_group: with gr.Tabs(): with gr.Tab("🖱️ Click Actions"): with gr.Group(): description_text = gr.Textbox( label="Element Description", placeholder="e.g., 'Privacy and security option in left sidebar'" ) with gr.Row(): description_action_type = gr.Dropdown( label="Action", choices=["click", "double_click", "move", "left_mouse_up", "left_mouse_down"], value="click" ) description_button = gr.Dropdown( label="Button", choices=["left", "right", "wheel", "back", "forward"], value="left" ) description_submit_btn = gr.Button("Submit Click Action") with gr.Tab("📝 Type Action"): with gr.Group(): type_text = gr.Textbox( label="Text to Type", placeholder="Enter text to type..." 
) type_submit_btn = gr.Button("Submit Type") with gr.Tab("⌨️ Keypress Action"): with gr.Group(): keypress_text = gr.Textbox( label="Keys", placeholder="e.g., ctrl+c, alt+tab" ) keypress_submit_btn = gr.Button("Submit Keypress") with gr.Tab("🧰 Misc Actions"): with gr.Group(): misc_action_dropdown = gr.Dropdown( label="Action", choices=["wait"], value="wait" ) misc_submit_btn = gr.Button("Submit Action") # Event handlers refresh_btn.click( fn=ui_handler.refresh_pending_calls, outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group] ) call_dropdown.change( fn=ui_handler.on_call_selected, inputs=[call_dropdown], outputs=[screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group] ) def handle_image_click(evt: gr.SelectData): if evt.index is not None: x, y = evt.index action_type = ui_handler.current_action_type or "click" button = ui_handler.current_button or "left" if action_type == "scroll": sx_i = int(ui_handler.current_scroll_x or 0) sy_i = int(ui_handler.current_scroll_y or 0) # Submit a scroll action with x,y position and scroll deltas result = ui_handler.submit_action("scroll", x=x, y=y, scroll_x=sx_i, scroll_y=sy_i) else: result = ui_handler.submit_click_action(x, y, action_type, button) ui_handler.wait_for_pending_calls() return result return "No coordinates selected" screenshot_image.select( fn=handle_image_click, outputs=[status_display] ).then( fn=ui_handler.wait_for_pending_calls, outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group] ) # Response submission submit_btn.click( fn=ui_handler.submit_response, inputs=[response_text], outputs=[response_text, status_display] ).then( fn=ui_handler.refresh_pending_calls, outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group] ) # Toggle visibility of controls based on action type def toggle_action_controls(action_type): # Button visible only for click button_vis = gr.update(visible=(action_type == "click")) # Scroll inputs visible only for scroll scroll_x_vis = gr.update(visible=(action_type == "scroll")) scroll_y_vis = gr.update(visible=(action_type == "scroll")) # Update state ui_handler.current_action_type = action_type or "click" return button_vis, scroll_x_vis, scroll_y_vis action_type_radio.change( fn=toggle_action_controls, inputs=[action_type_radio], outputs=[action_button_radio, scroll_x_input, scroll_y_input] ) # Keep other control values in ui_handler state def on_button_change(val): ui_handler.current_button = (val or "left") action_button_radio.change( fn=on_button_change, inputs=[action_button_radio] ) def on_scroll_x_change(val): try: ui_handler.current_scroll_x = int(val) if val is not None else 0 except Exception: ui_handler.current_scroll_x = 0 scroll_x_input.change( fn=on_scroll_x_change, inputs=[scroll_x_input] ) def on_scroll_y_change(val): try: ui_handler.current_scroll_y = int(val) if val is not None else 0 except Exception: ui_handler.current_scroll_y = 0 scroll_y_input.change( fn=on_scroll_y_change, inputs=[scroll_y_input] ) type_submit_btn.click( fn=ui_handler.submit_type_action, inputs=[type_text], outputs=[status_display] ).then( fn=ui_handler.wait_for_pending_calls, outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group] ) keypress_submit_btn.click( fn=ui_handler.submit_hotkey_action, inputs=[keypress_text], outputs=[status_display] ).then( 
fn=ui_handler.wait_for_pending_calls, outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group] ) def handle_description_submit(description, action_type, button): if description: result = ui_handler.submit_description_click(description, action_type, button) ui_handler.wait_for_pending_calls() return result return "Please enter a description" description_submit_btn.click( fn=handle_description_submit, inputs=[description_text, description_action_type, description_button], outputs=[status_display] ).then( fn=ui_handler.wait_for_pending_calls, outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group] ) # Misc action handler def handle_misc_submit(selected_action): if selected_action == "wait": result = ui_handler.submit_wait_action() ui_handler.wait_for_pending_calls() return result return f"Unsupported misc action: {selected_action}" misc_submit_btn.click( fn=handle_misc_submit, inputs=[misc_action_dropdown], outputs=[status_display] ).then( fn=ui_handler.wait_for_pending_calls, outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group] ) # Load initial data demo.load( fn=ui_handler.refresh_pending_calls, outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group] ) return demo if __name__ == "__main__": demo = create_ui() demo.queue() demo.launch(server_name="0.0.0.0", server_port=7860) ```
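
The UI above talks to the human-tool server purely over HTTP: it polls `GET /pending` for queued completion requests and resolves one via `POST /complete/{call_id}` with either a `response` string or a `tool_calls` list. For scripted tests it can be useful to replace the Gradio front end with a headless client. Below is a minimal sketch under those assumptions; the server address matches `HumanCompletionUI`'s default `server_url`, and the click coordinates are arbitrary placeholders.

```python
# Illustrative headless stand-in for the Gradio UI above. Assumes the
# human_tool server is running on http://localhost:8002 and uses the same
# endpoint shapes as HumanCompletionUI:
#   GET /pending -> {"pending_calls": [...]}
#   POST /complete/{call_id} with {"tool_calls": [...]} or {"response": "..."}
import json
import time
import uuid

import requests

SERVER_URL = "http://localhost:8002"


def complete_oldest_pending_call(poll_interval: float = 1.0) -> None:
    """Poll for pending calls, then resolve the oldest one with a click tool call."""
    while True:
        resp = requests.get(f"{SERVER_URL}/pending", timeout=5)
        resp.raise_for_status()
        pending = resp.json().get("pending_calls", [])
        if pending:
            break
        time.sleep(poll_interval)

    oldest = sorted(pending, key=lambda c: c.get("created_at", ""))[0]

    # Same tool-call payload that HumanCompletionUI.submit_action() builds.
    tool_call = {
        "id": f"call_{uuid.uuid4().hex[:24]}",
        "type": "function",
        "function": {
            "name": "computer",
            "arguments": json.dumps({"type": "click", "x": 100, "y": 200, "button": "left"}),
        },
    }
    requests.post(
        f"{SERVER_URL}/complete/{oldest['id']}",
        json={"tool_calls": [tool_call]},
        timeout=10,
    ).raise_for_status()


if __name__ == "__main__":
    complete_oldest_pending_call()
```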