This is page 5 of 16. Use http://codebase.md/trycua/cua?page={x} to view the full context.

# Directory Structure

```
├── .all-contributorsrc
├── .cursorignore
├── .devcontainer
│   ├── devcontainer.json
│   ├── post-install.sh
│   └── README.md
├── .dockerignore
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── ci-lume.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-pylume.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       └── test-validation-script.yml
├── .gitignore
├── .vscode
│   ├── docs.code-workspace
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── composite-agents.md
│   ├── cua-hackathon.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .gitignore
│   ├── .prettierrc
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   └── meta.json
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── meta.json
│   │       │   └── sandboxed-python.mdx
│   │       ├── index.mdx
│   │       ├── libraries
│   │       │   ├── agent
│   │       │   │   └── index.mdx
│   │       │   ├── computer
│   │       │   │   └── index.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── core
│   │       │   │   └── index.mdx
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   └── som
│   │       │       ├── configuration.mdx
│   │       │       └── index.mdx
│   │       ├── meta.json
│   │       ├── quickstart-cli.mdx
│   │       ├── quickstart-devs.mdx
│   │       └── telemetry.mdx
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   └── llms.txt
│   │   │       └── route.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── iou.tsx
│   │   │   └── mermaid.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   └── mdx-components.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── .prettierrc
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   └── uitars.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   └── test_connection.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── scripts
│   │   │       ├── install_mcp_server.sh
│   │   │       └── start_mcp_server.sh
│   │   ├── pylume
│   │   │   ├── __init__.py
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── pylume
│   │   │   │   ├── __init__.py
│   │   │   │   ├── client.py
│   │   │   │   ├── exceptions.py
│   │   │   │   ├── lume
│   │   │   │   ├── models.py
│   │   │   │   ├── pylume.py
│   │   │   │   └── server.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           └── test_omniparser.py
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── biome.json
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Dockerfile
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── pylume_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── pdm.lock
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── samples
│   └── community
│       ├── global-online
│       │   └── README.md
│       └── hack-the-north
│           └── README.md
├── scripts
│   ├── build-uv.sh
│   ├── build.ps1
│   ├── build.sh
│   ├── cleanup.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   └── run-docker-dev.sh
└── tests
    ├── pytest.ini
    ├── shell_cmd.py
    ├── test_files.py
    ├── test_mcp_server_session_management.py
    ├── test_mcp_server_streaming.py
    ├── test_shell_bash.py
    ├── test_telemetry.py
    ├── test_venv.py
    └── test_watchdog.py
```

# Files

--------------------------------------------------------------------------------
/.github/workflows/pypi-publish-mcp-server.yml:
--------------------------------------------------------------------------------

```yaml
name: Publish MCP Server Package

on:
  push:
    tags:
      - "mcp-server-v*"
  workflow_dispatch:
    inputs:
      version:
        description: "Version to publish (without v prefix)"
        required: true
        default: "0.1.0"
  workflow_call:
    inputs:
      version:
        description: "Version to publish"
        required: true
        type: string
    outputs:
      version:
        description: "The version that was published"
        value: ${{ jobs.prepare.outputs.version }}

# Adding permissions at workflow level
permissions:
  contents: write

jobs:
  prepare:
    runs-on: macos-latest
    outputs:
      version: ${{ steps.get-version.outputs.version }}
      agent_version: ${{ steps.update-deps.outputs.agent_version }}
      computer_version: ${{ steps.update-deps.outputs.computer_version }}
    steps:
      - uses: actions/checkout@v4

      - name: Determine version
        id: get-version
        run: |
          if [ "${{ github.event_name }}" == "push" ]; then
            # Extract version from tag (for package-specific tags)
            if [[ "${{ github.ref }}" =~ ^refs/tags/mcp-server-v([0-9]+\.[0-9]+\.[0-9]+) ]]; then
              VERSION=${BASH_REMATCH[1]}
            else
              echo "Invalid tag format for mcp-server"
              exit 1
            fi
          elif [ "${{ github.event_name }}" == "workflow_dispatch" ]; then
            # Use version from workflow dispatch
            VERSION=${{ github.event.inputs.version }}
          else
            # Use version from workflow_call
            VERSION=${{ inputs.version }}
          fi
          echo "VERSION=$VERSION"
          echo "version=$VERSION" >> $GITHUB_OUTPUT

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.11"

      - name: Update dependencies to latest versions
        id: update-deps
        run: |
          cd libs/python/mcp-server

          # Install required package for PyPI API access
          pip install requests

          # Create a Python script for PyPI version checking
          cat > get_latest_versions.py << 'EOF'
          import requests
          import json
          import sys

          def get_package_version(package_name, fallback="0.1.0"):
              try:
                  response = requests.get(f'https://pypi.org/pypi/{package_name}/json')
                  print(f"API Response Status for {package_name}: {response.status_code}", file=sys.stderr)

                  if response.status_code != 200:
                      print(f"API request failed for {package_name}, using fallback version", file=sys.stderr)
                      return fallback

                  data = json.loads(response.text)

                  if 'info' not in data:
                      print(f"Missing 'info' key in API response for {package_name}, using fallback version", file=sys.stderr)
                      return fallback

                  return data['info']['version']
              except Exception as e:
                  print(f"Error fetching version for {package_name}: {str(e)}", file=sys.stderr)
                  return fallback

          # Get latest versions
          print(get_package_version('cua-agent'))
          print(get_package_version('cua-computer'))
          EOF

          # Execute the script to get the versions
          VERSIONS=($(python get_latest_versions.py))
          LATEST_AGENT=${VERSIONS[0]}
          LATEST_COMPUTER=${VERSIONS[1]}

          echo "Latest cua-agent version: $LATEST_AGENT"
          echo "Latest cua-computer version: $LATEST_COMPUTER"

          # Output the versions for the next job
          echo "agent_version=$LATEST_AGENT" >> $GITHUB_OUTPUT
          echo "computer_version=$LATEST_COMPUTER" >> $GITHUB_OUTPUT

          # Determine major version for version constraint
          AGENT_MAJOR=$(echo $LATEST_AGENT | cut -d. -f1)
          COMPUTER_MAJOR=$(echo $LATEST_COMPUTER | cut -d. -f1)
          NEXT_AGENT_MAJOR=$((AGENT_MAJOR + 1))
          NEXT_COMPUTER_MAJOR=$((COMPUTER_MAJOR + 1))

          # Update dependencies in pyproject.toml
          if [[ "$OSTYPE" == "darwin"* ]]; then
            # macOS version of sed needs an empty string for -i
            # Update cua-agent with all extras
            sed -i '' "s/\"cua-agent\[all\]>=.*,<.*\"/\"cua-agent[all]>=$LATEST_AGENT,<$NEXT_AGENT_MAJOR.0.0\"/" pyproject.toml
            sed -i '' "s/\"cua-computer>=.*,<.*\"/\"cua-computer>=$LATEST_COMPUTER,<$NEXT_COMPUTER_MAJOR.0.0\"/" pyproject.toml
          else
            # Linux version
            sed -i "s/\"cua-agent\[all\]>=.*,<.*\"/\"cua-agent[all]>=$LATEST_AGENT,<$NEXT_AGENT_MAJOR.0.0\"/" pyproject.toml
            sed -i "s/\"cua-computer>=.*,<.*\"/\"cua-computer>=$LATEST_COMPUTER,<$NEXT_COMPUTER_MAJOR.0.0\"/" pyproject.toml
          fi

          # Display the updated dependencies
          echo "Updated dependencies in pyproject.toml:"
          grep -E "cua-agent|cua-computer" pyproject.toml

  publish:
    needs: prepare
    uses: ./.github/workflows/pypi-reusable-publish.yml
    with:
      package_name: "mcp-server"
      package_dir: "libs/python/mcp-server"
      version: ${{ needs.prepare.outputs.version }}
      is_lume_package: false
      base_package_name: "cua-mcp-server"
    secrets:
      PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }}

  set-env-variables:
    needs: [prepare, publish]
    runs-on: macos-latest
    steps:
      - name: Set environment variables for use in other jobs
        run: |
          echo "AGENT_VERSION=${{ needs.prepare.outputs.agent_version }}" >> $GITHUB_ENV
          echo "COMPUTER_VERSION=${{ needs.prepare.outputs.computer_version }}" >> $GITHUB_ENV
```

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/gta1.py:
--------------------------------------------------------------------------------

```python
"""
GTA1 agent loop implementation for click prediction using litellm.acompletion

Paper: https://arxiv.org/pdf/2507.05791
Code: https://github.com/Yan98/GTA1
"""

import asyncio
import json
import re
import base64
from typing import Dict, List, Any, AsyncGenerator, Union, Optional, Tuple
from io import BytesIO
import uuid
from PIL import Image
import litellm
import math

from ..decorators import register_agent
from ..types import Messages, AgentResponse, Tools, AgentCapability
from ..loops.base import AsyncAgentConfig

SYSTEM_PROMPT = '''
You are an expert UI element locator. Given a GUI image and a user's element description, provide the coordinates of the specified element as a single (x,y) point. The image resolution is height {height} and width {width}. For elements with area, return the center point.
Output the coordinate pair exactly: (x,y)
'''.strip()


def extract_coordinates(raw_string: str) -> Tuple[float, float]:
    """Extract coordinates from model output."""
    try:
        matches = re.findall(r"\((-?\d*\.?\d+),\s*(-?\d*\.?\d+)\)", raw_string)
        return tuple(map(float, matches[0]))  # type: ignore
    except:
        return (0.0, 0.0)


def smart_resize(height: int, width: int, factor: int = 28, min_pixels: int = 3136, max_pixels: int = 8847360) -> Tuple[int, int]:
    """Smart resize function similar to qwen_vl_utils."""
    # Calculate the total pixels
    total_pixels = height * width

    # If already within bounds, return original dimensions
    if min_pixels <= total_pixels <= max_pixels:
        # Round to nearest factor
        new_height = (height // factor) * factor
        new_width = (width // factor) * factor
        return new_height, new_width

    # Calculate scaling factor
    if total_pixels > max_pixels:
        scale = (max_pixels / total_pixels) ** 0.5
    else:
        scale = (min_pixels / total_pixels) ** 0.5

    # Apply scaling
    new_height = int(height * scale)
    new_width = int(width * scale)

    # Round to nearest factor
    new_height = (new_height // factor) * factor
    new_width = (new_width // factor) * factor

    # Ensure minimum size
    new_height = max(new_height, factor)
    new_width = max(new_width, factor)

    return new_height, new_width


@register_agent(models=r".*GTA1.*")
class GTA1Config(AsyncAgentConfig):
    """GTA1 agent configuration implementing AsyncAgentConfig protocol for click prediction."""

    def __init__(self):
        self.current_model = None
        self.last_screenshot_b64 = None

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs
    ) -> Dict[str, Any]:
        raise NotImplementedError()

    async def predict_click(
        self,
        model: str,
        image_b64: str,
        instruction: str,
        **kwargs
    ) -> Optional[Tuple[float, float]]:
        """
        Predict click coordinates using GTA1 model via litellm.acompletion.

        Args:
            model: The GTA1 model name
            image_b64: Base64 encoded image
            instruction: Instruction for where to click

        Returns:
            Tuple of (x, y) coordinates or None if prediction fails
        """
        # Decode base64 image
        image_data = base64.b64decode(image_b64)
        image = Image.open(BytesIO(image_data))
        width, height = image.width, image.height

        # Smart resize the image (similar to qwen_vl_utils)
        resized_height, resized_width = smart_resize(
            height, width,
            factor=28,  # Default factor for Qwen models
            min_pixels=3136,
            max_pixels=4096 * 2160
        )
        resized_image = image.resize((resized_width, resized_height))
        scale_x, scale_y = width / resized_width, height / resized_height

        # Convert resized image back to base64
        buffered = BytesIO()
        resized_image.save(buffered, format="PNG")
        resized_image_b64 = base64.b64encode(buffered.getvalue()).decode()

        # Prepare system and user messages
        system_message = {
            "role": "system",
            "content": SYSTEM_PROMPT.format(height=resized_height, width=resized_width)
        }

        user_message = {
            "role": "user",
            "content": [
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/png;base64,{resized_image_b64}"
                    }
                },
                {
                    "type": "text",
                    "text": instruction
                }
            ]
        }

        # Prepare API call kwargs
        api_kwargs = {
            "model": model,
            "messages": [system_message, user_message],
            "max_tokens": 2056,
            "temperature": 0.0,
            **kwargs
        }

        # Use liteLLM acompletion
        response = await litellm.acompletion(**api_kwargs)

        # Extract response text
        output_text = response.choices[0].message.content  # type: ignore

        # Extract and rescale coordinates
        pred_x, pred_y = extract_coordinates(output_text)  # type: ignore
        pred_x *= scale_x
        pred_y *= scale_y

        return (math.floor(pred_x), math.floor(pred_y))

    def get_capabilities(self) -> List[AgentCapability]:
        """Return the capabilities supported by this agent."""
        return ["click"]
```

--------------------------------------------------------------------------------
/libs/python/agent/benchmarks/models/gta1.py:
--------------------------------------------------------------------------------

```python
"""
GTA1 model implementation for benchmarking.
"""

from typing import Optional, Tuple
from PIL import Image
import torch
import re
import gc
from qwen_vl_utils import process_vision_info, smart_resize
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor

from .base import ModelProtocol


class GTA1Model:
    """Ground truth GTA1 model implementation."""

    def __init__(self, model_path: str = "HelloKKMe/GTA1-7B"):
        self.model_path = model_path
        self.model = None
        self.processor = None
        self.max_new_tokens = 32
        self.system_prompt = '''
You are an expert UI element locator. Given a GUI image and a user's element description, provide the coordinates of the specified element as a single (x,y) point. The image resolution is height {height} and width {width}. For elements with area, return the center point.
Output the coordinate pair exactly: (x,y)
'''.strip()

    @property
    def model_name(self) -> str:
        """Return the name of the model."""
        return f"GTA1-{self.model_path.split('/')[-1]}"

    async def load_model(self) -> None:
        """Load the model into memory."""
        if self.model is None:
            print(f"Loading GTA1 model: {self.model_path}")
            self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
                self.model_path,
                torch_dtype=torch.bfloat16,
                device_map="auto"
            )
            self.processor = AutoProcessor.from_pretrained(
                self.model_path,
                min_pixels=3136,
                max_pixels=4096 * 2160
            )
            print("GTA1 model loaded successfully")

    async def unload_model(self) -> None:
        """Unload the model from memory."""
        if self.model is not None:
            print("Unloading GTA1 model from GPU...")
            del self.model
            del self.processor
            self.model = None
            self.processor = None
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            print("GTA1 model unloaded")

    def _extract_coordinates(self, raw_string: str) -> Tuple[int, int]:
        """Extract coordinates from model output."""
        try:
            matches = re.findall(r"\((-?\d*\.?\d+),\s*(-?\d*\.?\d+)\)", raw_string)
            return tuple(map(int, map(float, matches[0])))  # type: ignore
        except:
            return (0, 0)

    async def predict_click(self, image: Image.Image, instruction: str) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates for the given image and instruction.

        Args:
            image: PIL Image to analyze
            instruction: Text instruction describing what to click

        Returns:
            Tuple of (x, y) coordinates or None if prediction fails
        """
        if self.model is None or self.processor is None:
            await self.load_model()

        assert self.processor is not None
        assert self.model is not None

        try:
            width, height = image.width, image.height

            # Resize image according to processor requirements
            resized_height, resized_width = smart_resize(
                image.height,
                image.width,
                factor=self.processor.image_processor.patch_size * self.processor.image_processor.merge_size,
                min_pixels=self.processor.image_processor.min_pixels,
                max_pixels=self.processor.image_processor.max_pixels,
            )
            resized_image = image.resize((resized_width, resized_height))
            scale_x, scale_y = width / resized_width, height / resized_height

            # Prepare messages
            system_message = {
                "role": "system",
                "content": self.system_prompt.format(height=resized_height, width=resized_width)
            }
            user_message = {
                "role": "user",
                "content": [
                    {"type": "image", "image": resized_image},
                    {"type": "text", "text": instruction}
                ]
            }

            # Process inputs
            image_inputs, video_inputs = process_vision_info([system_message, user_message])  # type: ignore
            text = self.processor.apply_chat_template(
                [system_message, user_message],
                tokenize=False,
                add_generation_prompt=True
            )
            inputs = self.processor(
                text=[text],
                images=image_inputs,
                videos=video_inputs,
                padding=True,
                return_tensors="pt"
            )
            inputs = inputs.to(self.model.device)

            # Generate prediction
            output_ids = self.model.generate(
                **inputs,
                max_new_tokens=self.max_new_tokens,
                do_sample=False,
                temperature=1.0,
                use_cache=True
            )
            generated_ids = [
                output_ids[len(input_ids):]
                for input_ids, output_ids in zip(inputs.input_ids, output_ids)
            ]
            output_text = self.processor.batch_decode(
                generated_ids,
                skip_special_tokens=True,
                clean_up_tokenization_spaces=True
            )[0]

            # Extract and rescale coordinates
            pred_x, pred_y = self._extract_coordinates(output_text)
            pred_x = int(pred_x * scale_x)
            pred_y = int(pred_y * scale_y)

            return (pred_x, pred_y)

        except Exception as e:
            print(f"Error in GTA1 prediction: {e}")
            return None
```

--------------------------------------------------------------------------------
/libs/python/agent/agent/integrations/hud/__init__.py:
--------------------------------------------------------------------------------

```python
"""HUD integration: dataset runners and MCP-based computer agent export.

This module exposes helpers to evaluate HUD-compatible datasets and exports
the MCP-compatible computer agent implementation.

Exports:
- run_single_task(dataset, ...)
- run_full_dataset(dataset, ...)
- MCPComputerAgent
"""

import time
from typing import Any, Optional

from agent.computers import is_agent_computer
from datasets import load_dataset, Dataset
from hud.datasets import Task, run_dataset
from hud import trace

from .agent import MCPComputerAgent

# ---------------------------------------------------------------------------
# Single-task runner
# ---------------------------------------------------------------------------

async def run_single_task(
    dataset: str | Dataset | list[dict[str, Any]],
    *,
    task_id: int = 0,
    model: str | None = None,
    allowed_tools: list[str] | None = None,
    # === ComputerAgent kwargs ===
    tools: list[Any] | None = None,
    custom_loop: Any | None = None,
    only_n_most_recent_images: int | None = None,
    callbacks: list[Any] | None = None,
    instructions: str | None = None,
    verbosity: int | None = None,
    trajectory_dir: str | dict | None = None,
    max_retries: int | None = 3,
    screenshot_delay: float | int = 0.5,
    use_prompt_caching: bool | None = False,
    max_trajectory_budget: float | dict | None = None,
    telemetry_enabled: bool | None = True,
) -> None:
    """Load one task from the dataset and execute it with MCPComputerAgent."""
    # Load dataset and pick a sample
    if isinstance(dataset, str):
        dataset = load_dataset(dataset, split="train")  # type: ignore[arg-type]
    elif isinstance(dataset, list):
        dataset = dataset
    else:
        dataset = dataset["train"]
    sample_task = dataset[task_id]  # type: ignore[index]
    task_prompt = sample_task.get("prompt", f"Task {sample_task.get('id', 0)}")  # type: ignore[attr-defined]

    # Filter any existing Computer tools
    # The eval framework will add its own Computer tool per task
    if tools:
        tools = [
            tool for tool in tools
            if not is_agent_computer(tool)
        ]

    with trace(name=task_prompt):
        task = Task(**sample_task)  # type: ignore[arg-type]
        agent = MCPComputerAgent(
            model=model or "computer-use-preview",
            allowed_tools=allowed_tools or ["openai_computer"],
            # === ComputerAgent kwargs passthrough ===
            tools=tools,
            custom_loop=custom_loop,
            only_n_most_recent_images=only_n_most_recent_images,
            callbacks=callbacks,
            instructions=instructions,
            verbosity=verbosity,
            trajectory_dir=trajectory_dir,
            max_retries=max_retries,
            screenshot_delay=screenshot_delay,
            use_prompt_caching=use_prompt_caching,
            max_trajectory_budget=max_trajectory_budget,
            telemetry_enabled=telemetry_enabled,
        )
        print(f"Running: {task_prompt}")
        result = await agent.run(task, max_steps=10)
        print(f"✅ Reward: {getattr(result, 'reward')}")

# ---------------------------------------------------------------------------
# Full-dataset runner
# ---------------------------------------------------------------------------

async def run_full_dataset(
    dataset: str | Dataset | list[dict[str, Any]],
    *,
    job_name: Optional[str] = None,
    model: str | None = None,
    allowed_tools: list[str] | None = None,
    max_concurrent: int = 30,
    max_steps: int = 50,
    split: str = "train",
    trajectory_dir: str | dict | None = None,
    # === ComputerAgent kwargs ===
    tools: list[Any] | None = None,
    custom_loop: Any | None = None,
    only_n_most_recent_images: int | None = 5,
    callbacks: list[Any] | None = None,
    instructions: str | None = None,
    verbosity:
with MCPComputerAgent.""" # Load dataset and pick a sample if isinstance(dataset, str): dataset = load_dataset(dataset, split="train") # type: ignore[arg-type] elif isinstance(dataset, list): dataset = dataset else: dataset = dataset["train"] sample_task = dataset[task_id] # type: ignore[index] task_prompt = sample_task.get("prompt", f"Task {sample_task.get('id', 0)}") # type: ignore[attr-defined] # Filter any existing Computer tools # The eval framework will add its own Computer tool per task if tools: tools = [ tool for tool in tools if not is_agent_computer(tool) ] with trace(name=task_prompt): task = Task(**sample_task) # type: ignore[arg-type] agent = MCPComputerAgent( model=model or "computer-use-preview", allowed_tools=allowed_tools or ["openai_computer"], # === ComputerAgent kwargs passthrough === tools=tools, custom_loop=custom_loop, only_n_most_recent_images=only_n_most_recent_images, callbacks=callbacks, instructions=instructions, verbosity=verbosity, trajectory_dir=trajectory_dir, max_retries=max_retries, screenshot_delay=screenshot_delay, use_prompt_caching=use_prompt_caching, max_trajectory_budget=max_trajectory_budget, telemetry_enabled=telemetry_enabled, ) print(f"Running: {task_prompt}") result = await agent.run(task, max_steps=10) print(f"✅ Reward: {getattr(result, 'reward')}") # --------------------------------------------------------------------------- # Full-dataset runner # --------------------------------------------------------------------------- async def run_full_dataset( dataset: str | Dataset | list[dict[str, Any]], *, job_name: Optional[str] = None, model: str | None = None, allowed_tools: list[str] | None = None, max_concurrent: int = 30, max_steps: int = 50, split: str = "train", trajectory_dir: str | dict | None = None, # === ComputerAgent kwargs === tools: list[Any] | None = None, custom_loop: Any | None = None, only_n_most_recent_images: int | None = 5, callbacks: list[Any] | None = None, instructions: str | None = None, verbosity: 
int | None = None, max_retries: int | None = 3, screenshot_delay: float | int = 0.5, use_prompt_caching: bool | None = False, max_trajectory_budget: float | dict | None = None, telemetry_enabled: bool | None = True, ) -> list[Any]: """Run evaluation across the entire dataset using hud.datasets.run_dataset.""" # Run with our MCP-based agent class. if isinstance(dataset, str): dataset_name = dataset.split('/')[-1] job_name = job_name or f"Evaluation {dataset_name}" dataset = load_dataset(dataset, split=split) # type: ignore[arg-type] else: dataset_name = "custom" job_name = job_name or f"Evaluation {time.strftime('%H:%M %Y-%m-%d')}" # Filter any existing Computer tools # The eval framework will add its own Computer tool per task if tools: tools = [ tool for tool in tools if not is_agent_computer(tool) ] # Execute evaluation return await run_dataset( name=job_name, dataset=dataset, agent_class=MCPComputerAgent, agent_config={ "model": model, "allowed_tools": allowed_tools, "trajectory_dir": trajectory_dir, # === ComputerAgent kwargs passthrough === "tools": tools, "custom_loop": custom_loop, "only_n_most_recent_images": only_n_most_recent_images, "callbacks": callbacks, "instructions": instructions, "verbosity": verbosity, "max_retries": max_retries, "screenshot_delay": screenshot_delay, "use_prompt_caching": use_prompt_caching, "max_trajectory_budget": max_trajectory_budget, "telemetry_enabled": telemetry_enabled, }, max_concurrent=max_concurrent, metadata={"dataset": dataset_name}, max_steps=max_steps, auto_respond=True, ) __all__ = [ "run_single_task", "run_full_dataset", "MCPComputerAgent", ] ``` -------------------------------------------------------------------------------- /libs/lume/tests/VMTests.swift: -------------------------------------------------------------------------------- ```swift import Foundation import Testing @testable import lume class MockProcessRunner: ProcessRunner { var runCalls: [(executable: String, arguments: [String])] = [] func 
run(executable: String, arguments: [String]) throws { runCalls.append((executable, arguments)) } } private func setupVMDirectory(_ tempDir: URL) throws -> VMDirectory { let vmDir = VMDirectory(Path(tempDir.path)) // Create disk image file let diskPath = vmDir.diskPath let diskData = Data(repeating: 0, count: 1024 * 1024) // 1MB mock disk try diskData.write(to: diskPath.url) // Create nvram file let nvramPath = vmDir.nvramPath let nvramData = Data(repeating: 0, count: 1024) // 1KB mock nvram try nvramData.write(to: nvramPath.url) // Create initial config file var config = try VMConfig( os: "mock-os", cpuCount: 1, memorySize: 1024, diskSize: 1024, display: "1024x768" ) config.setMacAddress("00:11:22:33:44:55") try vmDir.saveConfig(config) // Create .initialized file to mark VM as initialized let initializedPath = vmDir.dir.file(".initialized") try Data().write(to: initializedPath.url) return vmDir } @MainActor @Test("VM initialization and configuration") func testVMInitialization() async throws { let tempDir = try createTempDirectory() let vmDir = try setupVMDirectory(tempDir) var config = try VMConfig( os: "mock-os", cpuCount: 1, memorySize: 1024, diskSize: 1024, display: "1024x768" ) config.setMacAddress("00:11:22:33:44:55") // Set MAC address to avoid nil let home = Home(fileManager: FileManager.default) let context = VMDirContext(dir: vmDir, config: config, home: home, storage: nil) let vm = MockVM( vmDirContext: context, virtualizationServiceFactory: { _ in MockVMVirtualizationService() }, vncServiceFactory: { MockVNCService(vmDirectory: $0) } ) // Test initial state let details = vm.details #expect(details.name == vmDir.name) #expect(details.os == "mock-os") #expect(details.status == "stopped") #expect(details.vncUrl == nil) } @MainActor @Test("VM run and stop operations") func testVMRunAndStop() async throws { let tempDir = try createTempDirectory() let vmDir = try setupVMDirectory(tempDir) var config = try VMConfig( os: "mock-os", cpuCount: 2, memorySize: 
2048, diskSize: 1024, display: "1024x768" ) config.setMacAddress("00:11:22:33:44:55") let home = Home(fileManager: FileManager.default) let context = VMDirContext(dir: vmDir, config: config, home: home, storage: nil) let vm = MockVM( vmDirContext: context, virtualizationServiceFactory: { _ in MockVMVirtualizationService() }, vncServiceFactory: { MockVNCService(vmDirectory: $0) } ) // Test running VM let runTask = Task { try await vm.run( noDisplay: false, sharedDirectories: [], mount: nil as Path?, vncPort: 0, recoveryMode: false) } // Give the VM time to start try await Task.sleep(nanoseconds: UInt64(1e9)) // Test stopping VM try await vm.stop() runTask.cancel() } @MainActor @Test("VM configuration updates") func testVMConfigurationUpdates() async throws { let tempDir = try createTempDirectory() let vmDir = try setupVMDirectory(tempDir) var config = try VMConfig( os: "mock-os", cpuCount: 1, memorySize: 1024, diskSize: 1024, display: "1024x768" ) config.setMacAddress("00:11:22:33:44:55") let home = Home(fileManager: FileManager.default) let context = VMDirContext(dir: vmDir, config: config, home: home, storage: nil) let vm = MockVM( vmDirContext: context, virtualizationServiceFactory: { _ in MockVMVirtualizationService() }, vncServiceFactory: { MockVNCService(vmDirectory: $0) } ) // Test CPU count update try vm.setCpuCount(4) #expect(vm.vmDirContext.config.cpuCount == 4) // Test memory size update try vm.setMemorySize(4096) #expect(vm.vmDirContext.config.memorySize == 4096) // Test MAC address update try vm.setMacAddress("00:11:22:33:44:66") #expect(vm.vmDirContext.config.macAddress == "00:11:22:33:44:66") } @MainActor @Test("VM setup process") func testVMSetup() async throws { let tempDir = try createTempDirectory() let vmDir = try setupVMDirectory(tempDir) var config = try VMConfig( os: "mock-os", cpuCount: 1, memorySize: 1024, diskSize: 1024, display: "1024x768" ) config.setMacAddress("00:11:22:33:44:55") let home = Home(fileManager: FileManager.default) let 
context = VMDirContext(dir: vmDir, config: config, home: home, storage: nil) let vm = MockVM( vmDirContext: context, virtualizationServiceFactory: { _ in MockVMVirtualizationService() }, vncServiceFactory: { MockVNCService(vmDirectory: $0) } ) let expectedDiskSize: UInt64 = 64 * 1024 * 1024 * 1024 // 64 GB try await vm.setup( ipswPath: "/path/to/mock.ipsw", cpuCount: 2, memorySize: 2048, diskSize: expectedDiskSize, display: "1024x768" ) #expect(vm.vmDirContext.config.cpuCount == 2) #expect(vm.vmDirContext.config.memorySize == 2048) let actualDiskSize = vm.vmDirContext.config.diskSize ?? 0 #expect( actualDiskSize == expectedDiskSize, "Expected disk size \(expectedDiskSize), but got \(actualDiskSize)") #expect(vm.vmDirContext.config.macAddress == "00:11:22:33:44:55") } private func createTempDirectory() throws -> URL { let tempDir = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString) try FileManager.default.createDirectory(at: tempDir, withIntermediateDirectories: true) return tempDir } ``` -------------------------------------------------------------------------------- /docs/content/docs/libraries/lume/cli-reference.mdx: -------------------------------------------------------------------------------- ```markdown --- title: Lume CLI Reference description: Command Line Interface reference for Lume --- import { Callout } from 'fumadocs-ui/components/callout'; Once installed, you can start using Lume with these common workflows: ### Run a Prebuilt VM ```bash # Run a macOS Sequoia VM lume run macos-sequoia-vanilla:latest # Run an Ubuntu VM lume run ubuntu-noble-vanilla:latest ``` <Callout> We provide [prebuilt VM images](../lume/prebuilt-images) in our [ghcr registry](https://github.com/orgs/trycua/packages). 
</Callout>

### Create a Custom VM

```bash
# Create a new macOS VM
lume create my-macos-vm --cpu 4 --memory 8GB --disk-size 50GB

# Create a Linux VM
lume create my-linux-vm --os linux --cpu 2 --memory 4GB
```

<Callout title="Disk Space">
The actual disk space used by sparse images will be much lower than the logical size listed. You can resize VM disks after creation using `lume set <name> --disk-size <size>`.
</Callout>

## VM Management

### lume create <name>

Create a new macOS or Linux virtual machine.

**Options:**

- `--os <os>` - Operating system to install (macOS or linux, default: macOS)
- `--cpu <cores>` - Number of CPU cores (default: 4)
- `--memory <size>` - Memory size, e.g., 8GB (default: 4GB)
- `--disk-size <size>` - Disk size, e.g., 50GB (default: 40GB)
- `--display <res>` - Display resolution (default: 1024x768)
- `--ipsw <path>` - Path to IPSW file or 'latest' for macOS VMs
- `--storage <name>` - VM storage location to use

**Examples:**

```bash
# Create macOS VM with custom specs
lume create my-mac --cpu 6 --memory 16GB --disk-size 100GB

# Create Linux VM
lume create my-ubuntu --os linux --cpu 2 --memory 8GB

# Create macOS VM with latest IPSW
lume create my-sequoia --ipsw latest
```

### lume run <name>

Start and run a virtual machine.
**Options:**

- `--no-display` - Do not start the VNC client app
- `--shared-dir <dir>` - Share directory with VM (format: path[:ro|rw])
- `--mount <path>` - For Linux VMs only, attach a read-only disk image
- `--registry <url>` - Container registry URL (default: ghcr.io)
- `--organization <org>` - Organization to pull from (default: trycua)
- `--vnc-port <port>` - Port to use for the VNC server (default: 0 for auto-assign)
- `--recovery-mode <boolean>` - For macOS VMs only, start VM in recovery mode (default: false)
- `--storage <name>` - VM storage location to use

**Examples:**

```bash
# Run VM with shared directory
lume run my-vm --shared-dir /path/to/share:rw

# Run VM without display (headless)
lume run my-vm --no-display

# Run macOS VM in recovery mode
lume run my-mac --recovery-mode true
```

### lume stop <name>

Stop a running virtual machine.

**Options:**

- `--storage <name>` - VM storage location to use

### lume delete <name>

Delete a virtual machine and its associated files.

**Options:**

- `--force` - Force deletion without confirmation
- `--storage <name>` - VM storage location to use

### lume clone <name> <new-name>

Create a copy of an existing virtual machine.

**Options:**

- `--source-storage <name>` - Source VM storage location
- `--dest-storage <name>` - Destination VM storage location

## VM Information and Configuration

### lume ls

List all virtual machines and their status.

### lume get <name>

Get detailed information about a specific virtual machine.

**Options:**

- `-f, --format <format>` - Output format (json|text)
- `--storage <name>` - VM storage location to use

### lume set <name>

Modify virtual machine configuration.
**Options:** - `--cpu <cores>` - New number of CPU cores (e.g., 4) - `--memory <size>` - New memory size (e.g., 8192MB or 8GB) - `--disk-size <size>` - New disk size (e.g., 40960MB or 40GB) - `--display <res>` - New display resolution in format WIDTHxHEIGHT (e.g., 1024x768) - `--storage <name>` - VM storage location to use **Examples:** ```bash # Increase VM memory lume set my-vm --memory 16GB # Change display resolution lume set my-vm --display 1920x1080 # Add more CPU cores lume set my-vm --cpu 8 ``` ## Image Management ### lume images List available macOS images in local cache. ### lume pull <image> Download a VM image from a container registry. **Options:** - `--registry <url>` - Container registry URL (default: ghcr.io) - `--organization <org>` - Organization to pull from (default: trycua) - `--storage <name>` - VM storage location to use ### lume push <name> <image:tag> Upload a VM image to a container registry. **Options:** - `--additional-tags <tags...>` - Additional tags to push the same image to - `--registry <url>` - Container registry URL (default: ghcr.io) - `--organization <org>` - Organization/user to push to (default: trycua) - `--storage <name>` - VM storage location to use - `--chunk-size-mb <size>` - Chunk size for disk image upload in MB (default: 512) - `--verbose` - Enable verbose logging - `--dry-run` - Prepare files and show plan without uploading - `--reassemble` - Verify integrity by reassembling chunks (requires --dry-run) ### lume ipsw Get the latest macOS restore image URL. ### lume prune Remove cached images to free up disk space. ## Configuration ### lume config Manage Lume configuration settings. 
**Subcommands:** ##### Storage Management - `lume config storage add <name> <path>` - Add a new VM storage location - `lume config storage remove <name>` - Remove a VM storage location - `lume config storage list` - List all VM storage locations - `lume config storage default <name>` - Set the default VM storage location ##### Cache Management - `lume config cache get` - Get current cache directory - `lume config cache set <path>` - Set cache directory ##### Image Caching - `lume config caching get` - Show current caching status - `lume config caching set <boolean>` - Enable or disable image caching ## API Server ### lume serve Start the Lume API server for programmatic access. **Options:** - `--port <port>` - Port to listen on (default: 7777) ## Global Options These options are available for all commands: - `--help` - Show help information - `--version` - Show version number ``` -------------------------------------------------------------------------------- /docs/content/docs/agent-sdk/agent-loops.mdx: -------------------------------------------------------------------------------- ```markdown --- title: Agent Loops description: Supported computer-using agent loops and models --- <Callout>A corresponding <a href="https://github.com/trycua/cua/blob/main/notebooks/agent_nb.ipynb" target="_blank">Jupyter Notebook</a> is available for this documentation.</Callout> An agent can be thought of as a loop - it generates actions, executes them, and repeats until done: 1. **Generate**: Your `model` generates `output_text`, `computer_call`, `function_call` 2. **Execute**: The `computer` safely executes those items 3. **Complete**: If the model has no more calls, it's done! 
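The three steps above can be sketched as a plain-Python mock loop. Note that `fake_model` and `fake_computer` below are hypothetical stand-ins for illustration only, not the actual `ComputerAgent` internals:

```python
def fake_model(history):
    """Generate: ask for a screenshot once, then finish with a message."""
    if not any(item.get("type") == "computer_call" for item in history):
        return [{"type": "computer_call",
                 "action": {"type": "screenshot"}, "call_id": "call_1"}]
    return [{"type": "message", "role": "assistant",
             "content": [{"type": "output_text", "text": "Done."}]}]

def fake_computer(action):
    """Execute: pretend to perform the action and return its output."""
    return {"image_url": "data:image/png;base64,..."}

def run_loop(messages):
    history = list(messages)
    while True:
        items = fake_model(history)          # 1. Generate
        history.extend(items)
        calls = [i for i in items if i["type"] == "computer_call"]
        if not calls:                        # 3. Complete: no more calls
            return history
        for call in calls:                   # 2. Execute
            history.append({"type": "computer_call_output",
                            "call_id": call["call_id"],
                            "output": fake_computer(call["action"])})

history = run_loop([{"role": "user", "content": "Take a screenshot"}])
```

The real agent wraps this same control flow with retries, callbacks, and budget tracking.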
To run an agent loop simply do: ```python from agent import ComputerAgent import asyncio from computer import Computer async def take_screenshot(): async with Computer( os_type="linux", provider_type="cloud", name="your-sandbox-name", api_key="your-api-key" ) as computer: agent = ComputerAgent( model="anthropic/claude-3-5-sonnet-20241022", tools=[computer], max_trajectory_budget=5.0 ) messages = [{"role": "user", "content": "Take a screenshot and tell me what you see"}] async for result in agent.run(messages): for item in result["output"]: if item["type"] == "message": print(item["content"][0]["text"]) if __name__ == "__main__": asyncio.run(take_screenshot()) ``` For a list of supported models and configurations, see the [Supported Agents](./supported-agents/computer-use-agents) page. ### Response Format ```python { "output": [ { "type": "message", "role": "assistant", "content": [{"type": "output_text", "text": "I can see..."}] }, { "type": "computer_call", "action": {"type": "screenshot"}, "call_id": "call_123" }, { "type": "computer_call_output", "call_id": "call_123", "output": {"image_url": "data:image/png;base64,..."} } ], "usage": { "prompt_tokens": 150, "completion_tokens": 75, "total_tokens": 225, "response_cost": 0.01, } } ``` ### Environment Variables Use the following environment variables to configure the agent and its access to cloud computers and LLM providers: ```bash # Computer instance (cloud) export CUA_CONTAINER_NAME="your-container-name" export CUA_API_KEY="your-cua-api-key" # LLM API keys export ANTHROPIC_API_KEY="your-anthropic-key" export OPENAI_API_KEY="your-openai-key" ``` ### Input and output The input prompt passed to `Agent.run` can either be a string or a list of message dictionaries: ```python messages = [ { "role": "user", "content": "Take a screenshot and describe what you see" }, { "role": "assistant", "content": "I'll take a screenshot for you." } ] ``` The output is an AsyncGenerator that yields response chunks. 
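To make the generator's shape concrete, here is a stubbed consumer. `StubAgent` is a hypothetical stand-in that yields chunks shaped like the response format above; it is not part of the SDK:

```python
import asyncio

class StubAgent:
    # Hypothetical stand-in yielding chunks shaped like agent.run() output.
    async def run(self, messages):
        yield {"output": [{"type": "computer_call",
                           "action": {"type": "screenshot"}, "call_id": "c1"}]}
        yield {"output": [{"type": "message", "role": "assistant",
                           "content": [{"type": "output_text", "text": "A desktop."}]}]}

async def collect_text(agent, messages):
    # Consume the AsyncGenerator, keeping only assistant message text.
    texts = []
    async for result in agent.run(messages):
        for item in result["output"]:
            if item["type"] == "message":
                texts.append(item["content"][0]["text"])
    return texts

texts = asyncio.run(
    collect_text(StubAgent(), [{"role": "user", "content": "Describe the screen"}])
)
```

Swapping `StubAgent()` for a real `ComputerAgent` leaves the consumption code unchanged.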
### Parameters The `ComputerAgent` constructor provides a wide range of options for customizing agent behavior, tool integration, callbacks, resource management, and more. - `model` (`str`): Default: **required** The LLM or agent model to use. Determines which agent loop is selected unless `custom_loop` is provided. (e.g., "claude-3-5-sonnet-20241022", "computer-use-preview", "omni+vertex_ai/gemini-pro") - `tools` (`List[Any]`): List of tools the agent can use (e.g., `Computer`, sandboxed Python functions, etc.). - `custom_loop` (`Callable`): Optional custom agent loop function. If provided, overrides automatic loop selection. - `only_n_most_recent_images` (`int`): If set, only the N most recent images are kept in the message history. Useful for limiting memory usage. Automatically adds `ImageRetentionCallback`. - `callbacks` (`List[Any]`): List of callback instances for advanced preprocessing, postprocessing, logging, or custom hooks. See [Callbacks & Extensibility](#callbacks--extensibility). - `verbosity` (`int`): Logging level (e.g., `logging.INFO`). If set, adds a logging callback. - `trajectory_dir` (`str`): Directory path to save full trajectory data, including screenshots and responses. Adds `TrajectorySaverCallback`. - `max_retries` (`int`): Default: `3` Maximum number of retries for failed API calls (default: 3). - `screenshot_delay` (`float` | `int`): Default: `0.5` Delay (in seconds) before taking screenshots (default: 0.5). - `use_prompt_caching` (`bool`): Default: `False` Enables prompt caching for repeated prompts (mainly for Anthropic models). - `max_trajectory_budget` (`float` | `dict`): If set (float or dict), adds a budget manager callback that tracks usage costs and stops execution if the budget is exceeded. Dict allows advanced options (e.g., `{ "max_budget": 5.0, "raise_error": True }`). - `**kwargs` (`any`): Any additional keyword arguments are passed through to the agent loop or model provider. 
**Example with advanced options:**

```python
import logging

from agent import ComputerAgent
from computer import Computer
from agent.callbacks import ImageRetentionCallback

agent = ComputerAgent(
    model="anthropic/claude-3-5-sonnet-20241022",
    tools=[Computer(...)],
    only_n_most_recent_images=3,
    callbacks=[ImageRetentionCallback(only_n_most_recent_images=3)],
    verbosity=logging.INFO,
    trajectory_dir="trajectories",
    max_retries=5,
    screenshot_delay=1.0,
    use_prompt_caching=True,
    max_trajectory_budget={"max_budget": 5.0, "raise_error": True}
)
```

### Streaming Responses

```python
async for result in agent.run(messages, stream=True):
    # Process streaming chunks
    for item in result["output"]:
        if item["type"] == "message":
            print(item["content"][0]["text"], end="", flush=True)
        elif item["type"] == "computer_call":
            action = item["action"]
            print(f"\n[Action: {action['type']}]")
```

### Error Handling

```python
try:
    async for result in agent.run(messages):
        # Process results
        pass
except BudgetExceededException:
    print("Budget limit exceeded")
except Exception as e:
    print(f"Agent error: {e}")
```
```

--------------------------------------------------------------------------------
/libs/python/agent/agent/proxy/examples.py:
--------------------------------------------------------------------------------

```python
"""
Example usage of the proxy server and client requests.
""" import dotenv dotenv.load_dotenv() import asyncio import json import os import aiohttp from typing import Dict, Any async def test_http_endpoint(): """Test the HTTP /responses endpoint.""" anthropic_api_key = os.getenv("ANTHROPIC_API_KEY") assert isinstance(anthropic_api_key, str), "ANTHROPIC_API_KEY environment variable must be set" # Example 1: Simple text request simple_request = { "model": "anthropic/claude-3-5-sonnet-20241022", "input": "Tell me a three sentence bedtime story about a unicorn.", "env": { "ANTHROPIC_API_KEY": anthropic_api_key } } # Example 2: Multi-modal request with image multimodal_request = { "model": "anthropic/claude-3-5-sonnet-20241022", "input": [ { "role": "user", "content": [ {"type": "input_text", "text": "what is in this image?"}, { "type": "input_image", "image_url": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg" } ] } ], "env": { "ANTHROPIC_API_KEY": anthropic_api_key } } # Example 3: Request with custom agent and computer kwargs custom_request = { "model": "anthropic/claude-3-5-sonnet-20241022", "input": "Take a screenshot and tell me what you see", "env": { "ANTHROPIC_API_KEY": anthropic_api_key } } # Test requests base_url = "https://m-linux-96lcxd2c2k.containers.cloud.trycua.com:8443" # base_url = "http://localhost:8000" api_key = os.getenv("CUA_API_KEY") assert isinstance(api_key, str), "CUA_API_KEY environment variable must be set" async with aiohttp.ClientSession() as session: for i, request_data in enumerate([ simple_request, # multimodal_request, custom_request ], 1): print(f"\n--- Test {i} ---") print(f"Request: {json.dumps(request_data, indent=2)}") try: print(f"Sending request to {base_url}/responses") async with session.post( f"{base_url}/responses", json=request_data, headers={"Content-Type": "application/json", "X-API-Key": api_key} ) as response: result = await response.json() print(f"Status: 
{response.status}") print(f"Response: {json.dumps(result, indent=2)}") except Exception as e: print(f"Error: {e}") def curl_examples(): """Print curl command examples.""" print("=== CURL Examples ===\n") print("1. Simple text request:") print("""curl http://localhost:8000/responses \\ -H "Content-Type: application/json" \\ -d '{ "model": "anthropic/claude-3-5-sonnet-20241022", "input": "Tell me a three sentence bedtime story about a unicorn." }'""") print("\n2. Multi-modal request with image:") print("""curl http://localhost:8000/responses \\ -H "Content-Type: application/json" \\ -d '{ "model": "anthropic/claude-3-5-sonnet-20241022", "input": [ { "role": "user", "content": [ {"type": "input_text", "text": "what is in this image?"}, { "type": "input_image", "image_url": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg" } ] } ] }'""") print("\n3. Request with custom configuration:") print("""curl http://localhost:8000/responses \\ -H "Content-Type: application/json" \\ -d '{ "model": "anthropic/claude-3-5-sonnet-20241022", "input": "Take a screenshot and tell me what you see", "agent_kwargs": { "save_trajectory": true, "verbosity": 20 }, "computer_kwargs": { "os_type": "linux", "provider_type": "cloud" } }'""") async def test_p2p_client(): """Example P2P client using peerjs-python.""" try: from peerjs import Peer, PeerOptions, ConnectionEventType from aiortc import RTCConfiguration, RTCIceServer # Set up client peer options = PeerOptions( host="0.peerjs.com", port=443, secure=True, config=RTCConfiguration( iceServers=[RTCIceServer(urls="stun:stun.l.google.com:19302")] ) ) client_peer = Peer(id="test-client", peer_options=options) await client_peer.start() # Connect to proxy server connection = client_peer.connect("computer-agent-proxy") @connection.on(ConnectionEventType.Open) async def connection_open(): print("Connected to proxy server") # Send a test 
request request = { "model": "anthropic/claude-3-5-sonnet-20241022", "input": "Hello from P2P client!" } await connection.send(json.dumps(request)) @connection.on(ConnectionEventType.Data) async def connection_data(data): print(f"Received response: {data}") await client_peer.destroy() # Wait for connection await asyncio.sleep(10) except ImportError: print("P2P dependencies not available. Install peerjs-python for P2P testing.") except Exception as e: print(f"P2P test error: {e}") if __name__ == "__main__": import sys if len(sys.argv) > 1 and sys.argv[1] == "curl": curl_examples() elif len(sys.argv) > 1 and sys.argv[1] == "p2p": asyncio.run(test_p2p_client()) else: asyncio.run(test_http_endpoint()) ``` -------------------------------------------------------------------------------- /libs/python/computer-server/computer_server/diorama/safezone.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ UI Safezone Helper - A utility to get accurate bounds for macOS UI elements This module provides helper functions to get accurate bounds for macOS UI elements like the menubar and dock, which are needed for proper screenshot composition. 
""" import sys import time from typing import Dict, Any, Optional, Tuple # Import Objective-C bridge libraries try: import AppKit from ApplicationServices import ( AXUIElementCreateSystemWide, AXUIElementCreateApplication, AXUIElementCopyAttributeValue, AXUIElementCopyAttributeValues, kAXChildrenAttribute, kAXRoleAttribute, kAXTitleAttribute, kAXPositionAttribute, kAXSizeAttribute, kAXErrorSuccess, AXValueGetType, kAXValueCGSizeType, kAXValueCGPointType, AXUIElementGetTypeID, AXValueGetValue, kAXMenuBarAttribute, ) from AppKit import NSWorkspace, NSRunningApplication import Foundation except ImportError: print("Error: This script requires PyObjC to be installed.") print("Please install it with: pip install pyobjc") sys.exit(1) # Constants for accessibility API kAXErrorSuccess = 0 kAXRoleAttribute = "AXRole" kAXSubroleAttribute = "AXSubrole" kAXTitleAttribute = "AXTitle" kAXPositionAttribute = "AXPosition" kAXSizeAttribute = "AXSize" kAXChildrenAttribute = "AXChildren" kAXMenuBarAttribute = "AXMenuBar" def element_attribute(element, attribute): """Get an attribute from an accessibility element""" if attribute == kAXChildrenAttribute: err, value = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None) if err == kAXErrorSuccess: if isinstance(value, Foundation.NSArray): return list(value) else: return value err, value = AXUIElementCopyAttributeValue(element, attribute, None) if err == kAXErrorSuccess: return value return None def element_value(element, type): """Get a value from an accessibility element""" err, value = AXValueGetValue(element, type, None) if err == True: return value return None def get_element_bounds(element): """Get the bounds of an accessibility element""" bounds = { "x": 0, "y": 0, "width": 0, "height": 0 } # Get position position_value = element_attribute(element, kAXPositionAttribute) if position_value: position_value = element_value(position_value, kAXValueCGPointType) if position_value: bounds["x"] = position_value.x bounds["y"] = 
position_value.y # Get size size_value = element_attribute(element, kAXSizeAttribute) if size_value: size_value = element_value(size_value, kAXValueCGSizeType) if size_value: bounds["width"] = size_value.width bounds["height"] = size_value.height return bounds def find_dock_process(): """Find the Dock process""" running_apps = NSWorkspace.sharedWorkspace().runningApplications() for app in running_apps: if app.localizedName() == "Dock" and app.bundleIdentifier() == "com.apple.dock": return app.processIdentifier() return None def get_menubar_bounds(): """Get the bounds of the macOS menubar Returns: Dictionary with x, y, width, height of the menubar """ # Get the system-wide accessibility element system_element = AXUIElementCreateSystemWide() # Try to find the menubar menubar = element_attribute(system_element, kAXMenuBarAttribute) if menubar is None: # If we can't get it directly, try through the frontmost app frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication() if frontmost_app: app_pid = frontmost_app.processIdentifier() app_element = AXUIElementCreateApplication(app_pid) menubar = element_attribute(app_element, kAXMenuBarAttribute) if menubar is None: print("Error: Could not get menubar") # Return default menubar bounds as fallback return {"x": 0, "y": 0, "width": 1800, "height": 24} # Get menubar bounds return get_element_bounds(menubar) def get_dock_bounds(): """Get the bounds of the macOS Dock Returns: Dictionary with x, y, width, height of the Dock """ dock_pid = find_dock_process() if dock_pid is None: print("Error: Could not find Dock process") # Return empty bounds as fallback return {"x": 0, "y": 0, "width": 0, "height": 0} # Create an accessibility element for the Dock dock_element = AXUIElementCreateApplication(dock_pid) if dock_element is None: print(f"Error: Could not create accessibility element for Dock (PID {dock_pid})") return {"x": 0, "y": 0, "width": 0, "height": 0} # Get the Dock's children children = 
element_attribute(dock_element, kAXChildrenAttribute) if not children or len(children) == 0: print("Error: Could not get Dock children") return {"x": 0, "y": 0, "width": 0, "height": 0} # Find the Dock's list (first child is usually the main dock list) dock_list = None for child in children: role = element_attribute(child, kAXRoleAttribute) if role == "AXList": dock_list = child break if dock_list is None: print("Error: Could not find Dock list") return {"x": 0, "y": 0, "width": 0, "height": 0} # Get the bounds of the dock list return get_element_bounds(dock_list) def get_ui_element_bounds(): """Get the bounds of important UI elements like menubar and dock Returns: Dictionary with menubar and dock bounds """ menubar_bounds = get_menubar_bounds() dock_bounds = get_dock_bounds() return { "menubar": menubar_bounds, "dock": dock_bounds } if __name__ == "__main__": # Example usage bounds = get_ui_element_bounds() print("Menubar bounds:", bounds["menubar"]) print("Dock bounds:", bounds["dock"]) ``` -------------------------------------------------------------------------------- /.github/workflows/pypi-publish-agent.yml: -------------------------------------------------------------------------------- ```yaml name: Publish Agent Package on: push: tags: - "agent-v*" workflow_dispatch: inputs: version: description: "Version to publish (without v prefix)" required: true default: "0.1.0" workflow_call: inputs: version: description: "Version to publish" required: true type: string # Adding permissions at workflow level permissions: contents: write jobs: prepare: runs-on: macos-latest outputs: version: ${{ steps.get-version.outputs.version }} computer_version: ${{ steps.update-deps.outputs.computer_version }} som_version: ${{ steps.update-deps.outputs.som_version }} core_version: ${{ steps.update-deps.outputs.core_version }} steps: - uses: actions/checkout@v4 - name: Determine version id: get-version run: | if [ "${{ github.event_name }}" == "push" ]; then # Extract version from 
tag (for package-specific tags) if [[ "${{ github.ref }}" =~ ^refs/tags/agent-v([0-9]+\.[0-9]+\.[0-9]+) ]]; then VERSION=${BASH_REMATCH[1]} else echo "Invalid tag format for agent" exit 1 fi elif [ "${{ github.event_name }}" == "workflow_dispatch" ]; then # Use version from workflow dispatch VERSION=${{ github.event.inputs.version }} else # Use version from workflow_call VERSION=${{ inputs.version }} fi echo "VERSION=$VERSION" echo "version=$VERSION" >> $GITHUB_OUTPUT - name: Set up Python uses: actions/setup-python@v4 with: python-version: "3.11" - name: Update dependencies to latest versions id: update-deps run: | cd libs/python/agent # Install required package for PyPI API access pip install requests # Create a more robust Python script for PyPI version checking cat > get_latest_versions.py << 'EOF' import requests import json import sys def get_package_version(package_name, fallback="0.1.0"): try: response = requests.get(f'https://pypi.org/pypi/{package_name}/json') print(f"API Response Status for {package_name}: {response.status_code}", file=sys.stderr) if response.status_code != 200: print(f"API request failed for {package_name}, using fallback version", file=sys.stderr) return fallback data = json.loads(response.text) if 'info' not in data: print(f"Missing 'info' key in API response for {package_name}, using fallback version", file=sys.stderr) return fallback return data['info']['version'] except Exception as e: print(f"Error fetching version for {package_name}: {str(e)}", file=sys.stderr) return fallback # Get latest versions print(get_package_version('cua-computer')) print(get_package_version('cua-som')) print(get_package_version('cua-core')) EOF # Execute the script to get the versions VERSIONS=($(python get_latest_versions.py)) LATEST_COMPUTER=${VERSIONS[0]} LATEST_SOM=${VERSIONS[1]} LATEST_CORE=${VERSIONS[2]} echo "Latest cua-computer version: $LATEST_COMPUTER" echo "Latest cua-som version: $LATEST_SOM" echo "Latest cua-core version: $LATEST_CORE" # 
Output the versions for the next job echo "computer_version=$LATEST_COMPUTER" >> $GITHUB_OUTPUT echo "som_version=$LATEST_SOM" >> $GITHUB_OUTPUT echo "core_version=$LATEST_CORE" >> $GITHUB_OUTPUT # Determine major version for version constraint COMPUTER_MAJOR=$(echo $LATEST_COMPUTER | cut -d. -f1) SOM_MAJOR=$(echo $LATEST_SOM | cut -d. -f1) CORE_MAJOR=$(echo $LATEST_CORE | cut -d. -f1) NEXT_COMPUTER_MAJOR=$((COMPUTER_MAJOR + 1)) NEXT_SOM_MAJOR=$((SOM_MAJOR + 1)) NEXT_CORE_MAJOR=$((CORE_MAJOR + 1)) # Update dependencies in pyproject.toml if [[ "$OSTYPE" == "darwin"* ]]; then # macOS version of sed needs an empty string for -i sed -i '' "s/\"cua-computer>=.*,<.*\"/\"cua-computer>=$LATEST_COMPUTER,<$NEXT_COMPUTER_MAJOR.0.0\"/" pyproject.toml sed -i '' "s/\"cua-som>=.*,<.*\"/\"cua-som>=$LATEST_SOM,<$NEXT_SOM_MAJOR.0.0\"/" pyproject.toml sed -i '' "s/\"cua-core>=.*,<.*\"/\"cua-core>=$LATEST_CORE,<$NEXT_CORE_MAJOR.0.0\"/" pyproject.toml else # Linux version sed -i "s/\"cua-computer>=.*,<.*\"/\"cua-computer>=$LATEST_COMPUTER,<$NEXT_COMPUTER_MAJOR.0.0\"/" pyproject.toml sed -i "s/\"cua-som>=.*,<.*\"/\"cua-som>=$LATEST_SOM,<$NEXT_SOM_MAJOR.0.0\"/" pyproject.toml sed -i "s/\"cua-core>=.*,<.*\"/\"cua-core>=$LATEST_CORE,<$NEXT_CORE_MAJOR.0.0\"/" pyproject.toml fi # Display the updated dependencies echo "Updated dependencies in pyproject.toml:" grep -E "cua-computer|cua-som|cua-core" pyproject.toml publish: needs: prepare uses: ./.github/workflows/pypi-reusable-publish.yml with: package_name: "agent" package_dir: "libs/python/agent" version: ${{ needs.prepare.outputs.version }} is_lume_package: false base_package_name: "cua-agent" secrets: PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }} set-env-variables: needs: [prepare, publish] runs-on: macos-latest steps: - name: Set environment variables for use in other jobs run: | echo "COMPUTER_VERSION=${{ needs.prepare.outputs.computer_version }}" >> $GITHUB_ENV echo "SOM_VERSION=${{ needs.prepare.outputs.som_version }}" >> $GITHUB_ENV echo 
"CORE_VERSION=${{ needs.prepare.outputs.core_version }}" >> $GITHUB_ENV ``` -------------------------------------------------------------------------------- /libs/lumier/src/lib/utils.sh: -------------------------------------------------------------------------------- ```bash #!/usr/bin/env bash # Function to wait for SSH to become available wait_for_ssh() { local host_ip=$1 local user=$2 local password=$3 local retry_interval=${4:-5} # Default retry interval is 5 seconds local max_retries=${5:-20} # Default maximum retries is 20 (0 for infinite) # Only show waiting message in debug mode if [ "${LUMIER_DEBUG:-0}" == "1" ]; then echo "Waiting for SSH to become available on $host_ip..." fi local retry_count=0 while true; do # Try to connect via SSH # Add -q for completely silent operation, redirect stderr to /dev/null sshpass -p "$password" ssh -q -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR "$user@$host_ip" "exit" 2>/dev/null # Check the exit status of the SSH command if [ $? -eq 0 ]; then echo "SSH is ready on $host_ip!" return 0 fi # Increment retry count ((retry_count++)) # Exit if maximum retries are reached if [ $max_retries -ne 0 ] && [ $retry_count -ge $max_retries ]; then echo "Maximum retries reached. SSH is not available." return 1 fi # Only show retry messages in debug mode if [ "${LUMIER_DEBUG:-0}" == "1" ]; then echo "SSH not ready. Retrying in $retry_interval seconds... 
(Attempt $retry_count)" fi sleep $retry_interval done } # Function to execute a script on a remote server using sshpass execute_remote_script() { local host="$1" local user="$2" local password="$3" local script_path="$4" local vnc_password="$5" local data_folder="$6" # Check if all required arguments are provided if [ -z "$host" ] || [ -z "$user" ] || [ -z "$password" ] || [ -z "$script_path" ] || [ -z "$vnc_password" ]; then echo "Usage: execute_remote_script <host> <user> <password> <script_path> <vnc_password> [data_folder]" return 1 fi # Only show VNC info in debug mode if [ "${LUMIER_DEBUG:-0}" == "1" ]; then echo "VNC password exported to VM: $vnc_password" fi # Set the shared folder path for the VM if [ -n "$data_folder" ]; then # VM always sees shared folders at this path, regardless of container path shared_folder_path="/Volumes/My Shared Files" # Only show path in debug mode if [ "${LUMIER_DEBUG:-0}" == "1" ]; then echo "Data folder path in VM: $shared_folder_path" fi else shared_folder_path="" fi # Read the script content and prepend the shebang script_content="#!/usr/bin/env bash\n" # Always export VNC_PASSWORD script_content+="export VNC_PASSWORD='$vnc_password'\n" # Export SHARED_FOLDER_PATH only if we have a data folder path if [ -n "$shared_folder_path" ]; then script_content+="export SHARED_FOLDER_PATH='$shared_folder_path'\n" fi # Pass debug setting to the VM script_content+="export VNC_DEBUG='${LUMIER_DEBUG:-0}'\n" # Add debug messages only if debug mode is enabled if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then script_content+="echo \"[DEBUG] Starting on-logon script execution...\"\n" fi # Add the original script content script_content+="$(<"$script_path")" # Add debug messages only if debug mode is enabled if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then script_content+="\necho \"[DEBUG] Finished executing on-logon script.\"\n" fi # Print debug info only when debug mode is enabled if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then echo "[DEBUG] Executing remote 
script with content length: $(echo -n "$script_content" | wc -c) bytes"
        echo "[DEBUG] Script path: $script_path"
    fi

    # Use a here-document to send the script content
    # We'll capture both stdout and stderr when debug is enabled
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        echo "[DEBUG] Connecting to $user@$host to execute script..."
        sshpass -p "$password" ssh -q -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR "$user@$host" "bash -s -- '$vnc_password' '$data_folder'" 2>&1 <<EOF
$script_content
EOF
    else
        # Otherwise run quietly
        sshpass -p "$password" ssh -q -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR "$user@$host" "bash -s -- '$vnc_password' '$data_folder'" 2>/dev/null <<EOF
$script_content
EOF
    fi
    # Capture the sshpass/ssh exit status immediately, before the debug
    # echo below overwrites $?
    local ssh_status=$?

    # Print completion message only in debug mode
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        echo "[DEBUG] Script execution completed."
    fi

    # Check the exit status of the sshpass command
    if [ $ssh_status -ne 0 ]; then
        echo "Failed to execute script on remote host $host."
        return 1
    fi
}

extract_json_field() {
    local field_name=$1
    local input=$2
    local result=""

    # First attempt with jq if available (most reliable JSON parsing)
    if command -v jq &> /dev/null; then
        # Use jq for reliable JSON parsing
        result=$(echo "$input" | jq -r ".$field_name // empty" 2>/dev/null)
        if [[ -n "$result" ]]; then
            echo "$result"
            return 0
        fi
    fi

    # Fallback to grep-based approach with improvements
    # First try for quoted string values
    result=$(echo "$input" | tr -d '\n' | grep -o "\"$field_name\"\s*:\s*\"[^\"]*\"" | sed -E 's/.*":\s*"(.*)"$/\1/')
    if [[ -n "$result" ]]; then
        echo "$result"
        return 0
    fi

    # Try for non-quoted values (numbers, true, false, null)
    result=$(echo "$input" | tr -d '\n' | grep -o "\"$field_name\"\s*:\s*[^,}]*" | sed -E 's/.*":\s*(.*)$/\1/')
    if [[ -n "$result" ]]; then
        echo "$result"
        return 0
    fi

    # Return empty string if field not found
    echo ""
}

extract_json_field_from_file() {
    local field_name=$1
    local json_file=$2
    local json_text
    json_text=$(<"$json_file")
    extract_json_field "$field_name" "$json_text"
}

extract_json_field_from_text() {
    local field_name=$1
    local json_text=$2
    extract_json_field "$field_name" "$json_text"
}
```

--------------------------------------------------------------------------------
/libs/lume/src/FileSystem/VMDirectory.swift:
--------------------------------------------------------------------------------

```swift
import Foundation

// MARK: - VMDirectory

/// Manages a virtual machine's directory structure and files
/// Responsible for:
/// - Managing VM configuration files
/// - Handling disk operations
/// - Managing VM state and locking
/// - Providing access to VM-related paths
struct VMDirectory: Sendable {
    // MARK: - Constants

    private enum FileNames {
        static let nvram = "nvram.bin"
        static let disk = "disk.img"
        static let config = "config.json"
        static let sessions = "sessions.json"
    }

    // MARK: - Properties

    let dir: Path
    let nvramPath: Path
    let diskPath: Path
    let configPath: Path
    let sessionsPath: Path

    /// The name
of the VM directory var name: String { dir.name } // MARK: - Initialization /// Creates a new VMDirectory instance /// - Parameters: /// - dir: The base directory path for the VM init(_ dir: Path) { self.dir = dir self.nvramPath = dir.file(FileNames.nvram) self.diskPath = dir.file(FileNames.disk) self.configPath = dir.file(FileNames.config) self.sessionsPath = dir.file(FileNames.sessions) } } // MARK: - VM State Management extension VMDirectory { /// Checks if the VM directory is fully initialized with all required files func initialized() -> Bool { // Add detailed logging for debugging let configExists = configPath.exists() let diskExists = diskPath.exists() let nvramExists = nvramPath.exists() // Logger.info( // "VM directory initialization check", // metadata: [ // "directory": dir.path, // "config_path": configPath.path, // "config_exists": "\(configExists)", // "disk_path": diskPath.path, // "disk_exists": "\(diskExists)", // "nvram_path": nvramPath.path, // "nvram_exists": "\(nvramExists)" // ] // ) return configExists && diskExists && nvramExists } /// Checks if the VM directory exists func exists() -> Bool { dir.exists() } } // MARK: - Disk Management extension VMDirectory { /// Resizes the VM's disk to the specified size /// - Parameter size: The new size in bytes /// - Throws: VMDirectoryError if the disk operation fails func setDisk(_ size: UInt64) throws { do { if !diskPath.exists() { guard FileManager.default.createFile(atPath: diskPath.path, contents: nil) else { throw VMDirectoryError.fileCreationFailed(diskPath.path) } } let handle = try FileHandle(forWritingTo: diskPath.url) defer { try? 
handle.close() }
            try handle.truncate(atOffset: size)
        } catch {
            // Rethrow instead of silently swallowing the failure, so callers
            // actually see disk errors as the doc comment promises
            throw error
        }
    }
}

// MARK: - Configuration Management

extension VMDirectory {
    /// Saves the VM configuration to disk
    /// - Parameter config: The configuration to save
    /// - Throws: VMDirectoryError if the save operation fails
    func saveConfig(_ config: VMConfig) throws {
        let encoder = JSONEncoder()
        encoder.outputFormatting = .prettyPrinted
        do {
            let data = try encoder.encode(config)
            guard FileManager.default.createFile(atPath: configPath.path, contents: data) else {
                throw VMDirectoryError.fileCreationFailed(configPath.path)
            }
        } catch {
            throw VMDirectoryError.invalidConfigData
        }
    }

    /// Loads the VM configuration from disk
    /// - Returns: The loaded configuration
    /// - Throws: VMDirectoryError if the load operation fails
    func loadConfig() throws -> VMConfig {
        guard let data = FileManager.default.contents(atPath: configPath.path) else {
            throw VMDirectoryError.configNotFound
        }
        do {
            let decoder = JSONDecoder()
            return try decoder.decode(VMConfig.self, from: data)
        } catch {
            throw VMDirectoryError.invalidConfigData
        }
    }
}

// MARK: - VNC Session Management

struct VNCSession: Codable {
    let url: String
    let sharedDirectories: [SharedDirectory]?

    init(url: String, sharedDirectories: [SharedDirectory]?
= nil) { self.url = url self.sharedDirectories = sharedDirectories } } extension VMDirectory { /// Saves VNC session information to disk /// - Parameters: /// - session: The VNC session to save /// - sharedDirectories: Optional array of shared directories to save with the session /// - Throws: VMDirectoryError if the save operation fails func saveSession(_ session: VNCSession) throws { let encoder = JSONEncoder() encoder.outputFormatting = .prettyPrinted do { let data = try encoder.encode(session) guard FileManager.default.createFile(atPath: sessionsPath.path, contents: data) else { throw VMDirectoryError.fileCreationFailed(sessionsPath.path) } } catch { throw VMDirectoryError.invalidSessionData } } /// Loads the VNC session information from disk /// - Returns: The loaded VNC session /// - Throws: VMDirectoryError if the load operation fails func loadSession() throws -> VNCSession { guard let data = FileManager.default.contents(atPath: sessionsPath.path) else { throw VMDirectoryError.sessionNotFound } do { let decoder = JSONDecoder() return try decoder.decode(VNCSession.self, from: data) } catch { throw VMDirectoryError.invalidSessionData } } /// Removes the VNC session information from disk func clearSession() { try? FileManager.default.removeItem(atPath: sessionsPath.path) } } // MARK: - CustomStringConvertible extension VMDirectory: CustomStringConvertible { var description: String { "VMDirectory(path: \(dir.path))" } } extension VMDirectory { func delete() throws { try FileManager.default.removeItem(atPath: dir.path) } } ``` -------------------------------------------------------------------------------- /libs/python/agent/benchmarks/ss-pro.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ ScreenSpot-Pro Benchmark Script Evaluates models on the ScreenSpot-Pro dataset for click prediction accuracy. Supports both ComputerAgent model strings and custom model classes. 
""" import argparse import asyncio import random import statistics import time from typing import Optional from datasets import load_dataset from tqdm import tqdm from utils import ( ModelWrapper, is_click_in_bbox, save_results_to_markdown, save_visualizations, get_available_models, get_gpu_memory ) async def evaluate_model(model_wrapper: ModelWrapper, dataset, max_samples: Optional[int] = None) -> dict: """ Evaluate a model on the ScreenSpot-Pro dataset. Args: model_wrapper: ModelWrapper instance dataset: ScreenSpot-Pro dataset (list of samples) max_samples: Maximum number of samples to evaluate (None for all) Returns: Dictionary with evaluation results """ print(f"\nEvaluating model: {model_wrapper.model_name}") # Load model await model_wrapper.load_model() total_samples = len(dataset) if max_samples is not None: total_samples = min(max_samples, total_samples) correct_predictions = 0 error_predictions = 0 results = [] for i in tqdm(range(total_samples), desc=f"Evaluating {model_wrapper.model_name}"): sample = dataset[i] # Extract sample data image = sample['image'] instruction = sample['instruction'] bbox = sample['bbox'] # [x1, y1, x2, y2] sample_id = sample['img_filename'] # Predict click coordinates with timing start_time = time.time() click_coords = await model_wrapper.predict_click(image, instruction) prediction_time = time.time() - start_time # Check if prediction is correct is_correct = is_click_in_bbox(click_coords, bbox) if is_correct: correct_predictions += 1 results.append({ 'id': sample_id, 'instruction': instruction, 'bbox': bbox, 'predicted_coords': click_coords, 'is_correct': is_correct, 'failed': False, 'prediction_time': prediction_time }) # Unload model await model_wrapper.unload_model() # Calculate metrics accuracy = correct_predictions / total_samples if total_samples > 0 else 0.0 error_rate = error_predictions / total_samples if total_samples > 0 else 0.0 # Calculate timing statistics successful_times = [r['prediction_time'] for r in results 
if not r['failed']]
    avg_prediction_time = sum(successful_times) / len(successful_times) if successful_times else 0.0
    median_prediction_time = statistics.median(successful_times) if successful_times else 0.0
    min_prediction_time = min(successful_times) if successful_times else 0.0
    max_prediction_time = max(successful_times) if successful_times else 0.0

    # Get VRAM statistics
    vram_stats = model_wrapper.get_vram_stats()

    return {
        'model_name': model_wrapper.model_name,
        'total_samples': total_samples,
        'correct_predictions': correct_predictions,
        'failed_predictions': error_predictions,
        'accuracy': accuracy,
        'failure_rate': error_rate,
        'avg_prediction_time': avg_prediction_time,
        'median_prediction_time': median_prediction_time,
        'min_prediction_time': min_prediction_time,
        'max_prediction_time': max_prediction_time,
        'vram_max_mb': vram_stats['max_mb'],
        'vram_avg_mb': vram_stats['avg_mb'],
        'results': results
    }


async def main():
    """
    Main function to run the benchmark.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='ScreenSpot-Pro Benchmark Script')
    parser.add_argument('--samples', type=int, default=300,
                        help='Number of samples to evaluate (default: 300)')
    parser.add_argument('--seed', type=int, default=42,
                        help='Random seed for shuffling (default: 42)')
    args = parser.parse_args()

    # Set random seed
    random.seed(args.seed)

    # Load dataset
    print("Loading ScreenSpot-Pro dataset...")
    ds = load_dataset("lmms-lab/ScreenSpot-Pro")
    dataset = ds['train']  # type: ignore

    # Convert to list to support indexing
    dataset_list = list(dataset)
    print(f"Dataset loaded: {len(dataset_list)} samples")

    # Shuffle dataset with seed
    random.shuffle(dataset_list)
    print(f"Dataset shuffled with seed {args.seed}")

    # Get available models
    models = get_available_models()

    # Evaluation settings
    max_samples = args.samples  # Use command line argument

    # Run evaluations
    all_results = []
    for model in models:
        model_wrapper = ModelWrapper(model)
        result = await evaluate_model(model_wrapper, dataset_list, max_samples)
        all_results.append(result)

        # Print summary
        print(f"\n{result['model_name']} Results:")
        print(f"  Accuracy: {result['accuracy']*100:.2f}%")
        print(f"  Correct: {result['correct_predictions']}/{result['total_samples']}")
        print(f"  Errors: {result['failed_predictions']}")
        print(f"  Error Rate: {result['failure_rate']*100:.2f}%")
        print(f"  Avg Time: {result['avg_prediction_time']:.2f}s")
        print(f"  Median Time: {result['median_prediction_time']:.2f}s")
        print(f"  Time Range: {result['min_prediction_time']:.2f}s - {result['max_prediction_time']:.2f}s")
        print(f"  VRAM Max: {result['vram_max_mb']:.1f}MB")
        print(f"  VRAM Avg: {result['vram_avg_mb']:.1f}MB")

        # Print GPU memory info
        gpu_memory = get_gpu_memory()
        if gpu_memory and gpu_memory[0] > 0:
            print(f"  GPU Free Memory: {gpu_memory[0]:.1f}MB")

    # Save results
    if all_results:
        save_results_to_markdown(all_results)
        save_visualizations(all_results, dataset_list)
        print("\nBenchmark completed successfully!")
    else:
        print("\nNo successful evaluations completed.")


if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------------------------------------------------------
/docs/content/docs/telemetry.mdx:
--------------------------------------------------------------------------------

```markdown
---
title: Telemetry
description: This document explains how telemetry works in CUA libraries and how you can control it.
icon: RadioTower
---

# Telemetry in CUA

CUA tracks anonymized usage and error report statistics; we subscribe to Posthog's approach as detailed [here](https://posthog.com/blog/open-source-telemetry-ethical). If you would like to opt out of sending anonymized info, you can set `telemetry_enabled` to false.

## What telemetry data we collect

CUA libraries collect usage data to help improve our software.
We have two categories of telemetry:

### Opt-Out Telemetry (Enabled by Default)

Basic performance metrics and system information that help us understand usage patterns:

- **System Information**: Operating system, OS version, Python version
- **Module Initialization**: When modules are imported and their versions
- **Performance Metrics**: Agent run durations, step counts, token usage, and API costs
- **Session Tracking**: Anonymous session IDs and run IDs for performance analysis

### Opt-In Telemetry (Disabled by Default)

**Conversation Trajectory Logging**: Full conversation history including:

- User messages and agent responses
- Computer actions and their outputs
- Reasoning traces from the agent

**Important**: Trajectory logging is **opt-in only** and must be explicitly enabled.

### We do NOT collect:

- Personal information or user identifiers
- API keys or credentials
- File contents or application data
- Information about files being accessed
- Actual screenshots or screen contents (unless trajectory logging is enabled)
- Specific text being typed, including user inputs, model outputs, computer outputs, or tool call outputs (unless trajectory logging is enabled)

## Controlling Telemetry

We are committed to transparency and user control over telemetry. There are two ways to control telemetry:

### 1. Environment Variable (Global Control)

Telemetry is enabled by default. To disable telemetry, set the `CUA_TELEMETRY_ENABLED` environment variable to a falsy value (`0`, `false`, `no`, or `off`):

```bash
# Disable telemetry before running your script
export CUA_TELEMETRY_ENABLED=false

# Or as part of the command
CUA_TELEMETRY_ENABLED=false python your_script.py
```

Or from Python:

```python
import os
os.environ["CUA_TELEMETRY_ENABLED"] = "false"
```

### 2. Instance-Level Control

#### Computer SDK

```python
from computer import Computer

# Enable telemetry (default)
computer = Computer(telemetry_enabled=True)

# Disable telemetry
computer = Computer(telemetry_enabled=False)
```

#### Agent SDK

```python
from agent import ComputerAgent
import os

# Basic telemetry - performance metrics only (opt-out, enabled by default)
agent = ComputerAgent(
    model="claude-3-5-sonnet-20241022",
    telemetry_enabled=True  # Default is True
)

# Enable telemetry with full conversation trajectory logging (opt-in)
agent = ComputerAgent(
    model="claude-3-5-sonnet-20241022",
    telemetry_enabled={
        "log_trajectory": True  # Logs full conversation items
    }
)

# Disable telemetry completely
agent = ComputerAgent(
    model="claude-3-5-sonnet-20241022",
    telemetry_enabled=False
)

# Disable telemetry completely using environment variables
os.environ["CUA_TELEMETRY_ENABLED"] = "false"
agent = ComputerAgent(
    model="claude-3-5-sonnet-20241022"
)
```

You can check if telemetry is enabled for an instance:

```python
print(computer.telemetry_enabled)  # Will print True or False
print(agent.telemetry_enabled)  # Will print True, False, or dict
```

Note that telemetry settings must be configured during initialization and cannot be changed after the object is created.
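If you are building similar tooling and want the same kill-switch behavior, a falsy-value check like the one described above can be sketched as follows. This is an illustrative sketch only: the helper name `telemetry_enabled_from_env` is ours and is not part of the CUA SDK.

```python
import os

# Values treated as "off", mirroring the documented falsy values
_FALSY = {"0", "false", "no", "off"}

def telemetry_enabled_from_env(default: bool = True) -> bool:
    """Return False when CUA_TELEMETRY_ENABLED is set to a falsy value."""
    value = os.environ.get("CUA_TELEMETRY_ENABLED")
    if value is None:
        return default  # unset means "use the default", i.e. enabled
    return value.strip().lower() not in _FALSY

os.environ["CUA_TELEMETRY_ENABLED"] = "off"
print(telemetry_enabled_from_env())  # False
```

Note that any non-falsy value (including `1`, `true`, or an empty-but-set string) leaves telemetry enabled; only the listed falsy values disable it.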
## Detailed Telemetry Events

### Computer SDK Events

| Event Name | Data Collected | Trigger Notes |
|------------|----------------|---------------|
| **computer_initialized** | • `os`: Operating system (e.g., 'windows', 'darwin', 'linux')<br />• `os_version`: OS version<br />• `python_version`: Python version | Triggered when a Computer instance is created |
| **module_init** | • `module`: "computer"<br />• `version`: Package version<br />• `python_version`: Full Python version string | Triggered once when the computer package is imported for the first time |

### Agent SDK Events

| Event Name | Data Collected | Trigger Notes |
|------------|----------------|---------------|
| **module_init** | • `module`: "agent"<br />• `version`: Package version<br />• `python_version`: Full Python version string | Triggered once when the agent package is imported for the first time |
| **agent_session_start** | • `session_id`: Unique UUID for this agent instance<br />• `agent_type`: Class name (e.g., "ComputerAgent")<br />• `model`: Model name (e.g., "claude-3-5-sonnet")<br />• `os`: Operating system<br />• `os_version`: OS version<br />• `python_version`: Python version | Triggered when TelemetryCallback is initialized (agent instantiation) |
| **agent_run_start** | • `session_id`: Agent session UUID<br />• `run_id`: Unique UUID for this run<br />• `start_time`: Unix timestamp<br />• `input_context_size`: Character count of input messages<br />• `num_existing_messages`: Count of existing messages<br />• `uploaded_trajectory`: Full conversation items (opt-in) | Triggered at the start of each agent.run() call |
| **agent_run_end** | • `session_id`: Agent session UUID<br />• `run_id`: Run UUID<br />• `end_time`: Unix timestamp<br />• `duration_seconds`: Total run duration<br />• `num_steps`: Total steps taken in this run<br />• `total_usage`: Accumulated token usage and costs<br />• `uploaded_trajectory`: Full conversation items (opt-in) | Triggered at the end of each agent.run() call |
| **agent_step** | • `session_id`: Agent session UUID<br />• `run_id`: Run UUID<br />• `step`: Step number (incremental)<br />• `timestamp`: Unix timestamp<br />• `duration_seconds`: Duration of previous step | Triggered on each agent response/step during a run |
| **agent_usage** | • `session_id`: Agent session UUID<br />• `run_id`: Run UUID<br />• `step`: Current step number<br />• `prompt_tokens`: Tokens in prompt<br />• `completion_tokens`: Tokens in response<br />• `total_tokens`: Total tokens used<br />• `response_cost`: Cost of this API call | Triggered whenever usage information is received from LLM API |

## Transparency

We believe in being transparent about the data we collect. If you have any questions about our telemetry practices, please open an issue on our GitHub repository.
```

--------------------------------------------------------------------------------
/libs/python/agent/agent/callbacks/operator_validator.py:
--------------------------------------------------------------------------------

```python
"""
OperatorNormalizerCallback

Ensures agent output actions conform to expected schemas by fixing common issues:
- click: add default button='left' if missing
- keypress: wrap keys string into a list
- etc.

This runs in on_llm_end, which receives the output array (AgentMessage[] as dicts).
The purpose is to avoid spending another LLM call to fix broken computer call syntax
when possible.
""" from __future__ import annotations from typing import Any, Dict, List from .base import AsyncCallbackHandler class OperatorNormalizerCallback(AsyncCallbackHandler): """Normalizes common computer call hallucinations / errors in computer call syntax.""" async def on_llm_end(self, output: List[Dict[str, Any]]) -> List[Dict[str, Any]]: # Mutate in-place as requested, but still return the list for chaining for item in output or []: if item.get("type") != "computer_call": continue action = item.get("action") if not isinstance(action, dict): continue # rename mouse click actions to "click" for mouse_btn in ["left", "right", "wheel", "back", "forward"]: if action.get("type", "") == f"{mouse_btn}_click": action["type"] = "click" action["button"] = mouse_btn # rename hotkey actions to "keypress" for alias in ["hotkey", "key", "press", "key_press"]: if action.get("type", "") == alias: action["type"] = "keypress" # assume click actions if "button" in action and "type" not in action: action["type"] = "click" if "click" in action and "type" not in action: action["type"] = "click" if ("scroll_x" in action or "scroll_y" in action) and "type" not in action: action["type"] = "scroll" if "text" in action and "type" not in action: action["type"] = "type" action_type = action.get("type") def _keep_keys(action: Dict[str, Any], keys_to_keep: List[str]): """Keep only the provided keys on action; delete everything else. Always ensures required 'type' is present if listed in keys_to_keep. 
""" for key in list(action.keys()): if key not in keys_to_keep: del action[key] # rename "coordinate" to "x", "y" if "coordinate" in action: action["x"] = action["coordinate"][0] action["y"] = action["coordinate"][1] del action["coordinate"] if action_type == "click": # convert "click" to "button" if "button" not in action and "click" in action: action["button"] = action["click"] del action["click"] # default button to "left" action["button"] = action.get("button", "left") # add default scroll x, y if missing if action_type == "scroll": action["scroll_x"] = action.get("scroll_x", 0) action["scroll_y"] = action.get("scroll_y", 0) # ensure keys arg is a list (normalize aliases first) if action_type == "keypress": keys = action.get("keys") for keys_alias in ["keypress", "key", "press", "key_press", "text"]: if keys_alias in action: action["keys"] = action[keys_alias] del action[keys_alias] keys = action.get("keys") if isinstance(keys, str): action["keys"] = keys.replace("-", "+").split("+") if len(keys) > 1 else [keys] required_keys_by_type = { # OpenAI actions "click": ["type", "button", "x", "y"], "double_click": ["type", "x", "y"], "drag": ["type", "path"], "keypress": ["type", "keys"], "move": ["type", "x", "y"], "screenshot": ["type"], "scroll": ["type", "scroll_x", "scroll_y", "x", "y"], "type": ["type", "text"], "wait": ["type"], # Anthropic actions "left_mouse_down": ["type", "x", "y"], "left_mouse_up": ["type", "x", "y"], "triple_click": ["type", "button", "x", "y"], } keep = required_keys_by_type.get(action_type or "") if keep: _keep_keys(action, keep) # # Second pass: if an assistant message is immediately followed by a computer_call, # # replace the assistant message itself with a reasoning message with summary text. 
# if isinstance(output, list): # for i, item in enumerate(output): # # AssistantMessage shape: { type: 'message', role: 'assistant', content: OutputContent[] } # if item.get("type") == "message" and item.get("role") == "assistant": # next_idx = i + 1 # if next_idx >= len(output): # continue # next_item = output[next_idx] # if not isinstance(next_item, dict): # continue # if next_item.get("type") != "computer_call": # continue # contents = item.get("content") or [] # # Extract text from OutputContent[] # text_parts: List[str] = [] # if isinstance(contents, list): # for c in contents: # if isinstance(c, dict) and c.get("type") == "output_text" and isinstance(c.get("text"), str): # text_parts.append(c["text"]) # text_content = "\n".join(text_parts).strip() # # Replace assistant message with reasoning message # output[i] = { # "type": "reasoning", # "summary": [ # { # "type": "summary_text", # "text": text_content, # } # ], # } return output ``` -------------------------------------------------------------------------------- /.github/workflows/docker-reusable-publish.yml: -------------------------------------------------------------------------------- ```yaml name: Reusable Docker Publish Workflow on: workflow_call: inputs: image_name: description: "Name of the Docker image (e.g. cua-ubuntu, cua-xfce)" required: true type: string context_dir: description: "Directory containing the Dockerfile relative to workspace root (e.g. libs/kasm, libs/xfce)" required: true type: string dockerfile_path: description: "Path to Dockerfile relative to context_dir (e.g. Dockerfile)" required: false type: string default: "Dockerfile" tag_prefix: description: "Prefix for semantic version tags (e.g. 
docker-kasm-v, docker-xfce-v)" required: true type: string docker_hub_org: description: "Docker Hub organization name" required: false type: string default: "trycua" secrets: DOCKER_HUB_TOKEN: required: true jobs: build-and-push: runs-on: ubuntu-latest strategy: fail-fast: false matrix: platform: - linux/amd64 - linux/arm64 steps: - name: Checkout repository uses: actions/checkout@v4 - name: Prepare platform tag id: platform run: | # Convert platform (e.g., linux/amd64) to a valid tag suffix (e.g., linux-amd64) PLATFORM_TAG=$(echo "${{ matrix.platform }}" | sed 's/\//-/g') echo "tag=${PLATFORM_TAG}" >> $GITHUB_OUTPUT - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 - name: Log in to Docker Hub uses: docker/login-action@v3 with: username: ${{ inputs.docker_hub_org }} password: ${{ secrets.DOCKER_HUB_TOKEN }} - name: Extract metadata (PR) if: github.event_name == 'pull_request' id: meta-pr uses: docker/metadata-action@v5 with: images: ${{ inputs.docker_hub_org }}/${{ inputs.image_name }} tags: | type=raw,value=${{ github.sha }} - name: Extract metadata (main branch) if: github.event_name != 'pull_request' && github.ref == 'refs/heads/main' id: meta-main uses: docker/metadata-action@v5 with: images: ${{ inputs.docker_hub_org }}/${{ inputs.image_name }} tags: | type=raw,value=latest - name: Extract metadata (semantic version tag) if: startsWith(github.ref, format('refs/tags/{0}', inputs.tag_prefix)) id: meta-semver uses: docker/metadata-action@v5 with: images: ${{ inputs.docker_hub_org }}/${{ inputs.image_name }} tags: | type=semver,pattern={{version}},prefix=${{ inputs.tag_prefix }} type=semver,pattern={{major}}.{{minor}},prefix=${{ inputs.tag_prefix }} type=semver,pattern={{major}},prefix=${{ inputs.tag_prefix }} type=raw,value=latest - name: Build and push Docker image (PR) if: github.event_name == 'pull_request' uses: docker/build-push-action@v5 with: context: ./${{ inputs.context_dir }} file: ./${{ inputs.context_dir }}/${{ inputs.dockerfile_path 
}} push: true tags: ${{ steps.meta-pr.outputs.tags }} labels: ${{ steps.meta-pr.outputs.labels }} platforms: ${{ matrix.platform }} cache-from: | type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:buildcache-${{ steps.platform.outputs.tag }} type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:latest cache-to: type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:buildcache-${{ steps.platform.outputs.tag }},mode=max - name: Build and push Docker image (main branch) if: github.event_name != 'pull_request' && github.ref == 'refs/heads/main' uses: docker/build-push-action@v5 with: context: ./${{ inputs.context_dir }} file: ./${{ inputs.context_dir }}/${{ inputs.dockerfile_path }} push: true tags: ${{ steps.meta-main.outputs.tags }} labels: ${{ steps.meta-main.outputs.labels }} platforms: ${{ matrix.platform }} cache-from: | type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:buildcache-${{ steps.platform.outputs.tag }} type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:latest cache-to: type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:buildcache-${{ steps.platform.outputs.tag }},mode=max - name: Build and push Docker image (semantic version tag) if: startsWith(github.ref, format('refs/tags/{0}', inputs.tag_prefix)) uses: docker/build-push-action@v5 with: context: ./${{ inputs.context_dir }} file: ./${{ inputs.context_dir }}/${{ inputs.dockerfile_path }} push: true tags: ${{ steps.meta-semver.outputs.tags }} labels: ${{ steps.meta-semver.outputs.labels }} platforms: ${{ matrix.platform }} cache-from: | type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:buildcache-${{ steps.platform.outputs.tag }} type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:latest cache-to: type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:buildcache-${{ steps.platform.outputs.tag }},mode=max - name: Image digest 
if: github.event_name == 'pull_request' || github.ref == 'refs/heads/main' || startsWith(github.ref, format('refs/tags/{0}', inputs.tag_prefix)) run: | if [ "${{ github.event_name }}" == "pull_request" ]; then echo "Image pushed with digest ${{ steps.meta-pr.outputs.digest }}" elif [[ "${{ github.ref }}" == refs/tags/${{ inputs.tag_prefix }}* ]]; then echo "Image pushed with digest ${{ steps.meta-semver.outputs.digest }}" else echo "Image pushed with digest ${{ steps.meta-main.outputs.digest }}" fi - name: print image tags run: | if [ "${{ github.event_name }}" == "pull_request" ]; then echo "Image tags: ${{ steps.meta-pr.outputs.tags }}" elif [[ "${{ github.ref }}" == refs/tags/${{ inputs.tag_prefix }}* ]]; then echo "Image tags: ${{ steps.meta-semver.outputs.tags }}" else echo "Image tags: ${{ steps.meta-main.outputs.tags }}" fi ``` -------------------------------------------------------------------------------- /libs/python/agent/agent/loops/internvl.py: -------------------------------------------------------------------------------- ```python """ InternVL agent loop implementation for click prediction using litellm.acompletion. Implements the ScreenSpot InternVL grounding baseline behavior: - Uses the exact grounding prompt format with <image> and <ref> tags - Expects coordinates in 0-1000 normalized range in formats [[x1,y1,x2,y2]] or [[x,y]] - Converts to pixel coordinates relative to the original screenshot size Note: We do NOT manually load the InternVL model; acompletions (via HuggingFaceLocalAdapter) will handle loading based on the provided model name. 
""" from __future__ import annotations import base64 import math import re from io import BytesIO from typing import Any, Dict, List, Optional, Tuple from PIL import Image import litellm from ..decorators import register_agent from .composed_grounded import ComposedGroundedConfig from ..types import AgentCapability # Regex patterns for extracting coordinates # Accept optional whitespace and optional decimal fractions _NUM = r"(\d+(?:\.\d+)?)" _POINT_PATTERN = re.compile(r"\[\[\s*" + _NUM + r"\s*,\s*" + _NUM + r"\s*\]\]") _BBOX_PATTERN = re.compile( r"\[\[\s*" + _NUM + r"\s*,\s*" + _NUM + r"\s*,\s*" + _NUM + r"\s*,\s*" + _NUM + r"\s*\]\]" ) def _extract_first_point(text: str) -> Optional[Tuple[float, float]]: """Extract the first [[x,y]] as normalized (0-1000) floats.""" m = _POINT_PATTERN.search(text) if not m: return None try: x = float(m.group(1)) y = float(m.group(2)) return x, y except Exception: return None def _extract_last_bbox(text: str) -> Optional[Tuple[float, float, float, float]]: """Extract the last [[x1,y1,x2,y2]] as normalized (0-1000) floats.""" matches = list(_BBOX_PATTERN.finditer(text)) if not matches: return None m = matches[-1] try: x1 = float(m.group(1)) y1 = float(m.group(2)) x2 = float(m.group(3)) y2 = float(m.group(4)) return x1, y1, x2, y2 except Exception: return None def _scale_norm_to_pixels(x_norm: float, y_norm: float, width: int, height: int) -> Tuple[int, int]: """Scale 0-1000 normalized coordinates to pixel coordinates for given image size.""" x_px = int(math.floor((x_norm / 1000.0) * width)) y_px = int(math.floor((y_norm / 1000.0) * height)) # Clamp to image bounds just in case x_px = max(0, min(width - 1, x_px)) y_px = max(0, min(height - 1, y_px)) return x_px, y_px @register_agent(models=r"(?i).*InternVL.*") class InternVLConfig(ComposedGroundedConfig): """InternVL agent configuration reusing ComposedGroundedConfig for steps and overriding predict_click to implement ScreenSpot InternVL grounding baseline.""" async def 
predict_step( self, messages: List[Dict[str, Any]], model: str, tools: Optional[List[Dict[str, Any]]] = None, max_retries: Optional[int] = None, stream: bool = False, computer_handler=None, _on_api_start=None, _on_api_end=None, _on_usage=None, _on_screenshot=None, **kwargs ) -> Dict[str, Any]: """Fallback to a self-composed model""" return await super().predict_step( messages=messages, model=f"{model}+{model}", tools=tools, max_retries=max_retries, stream=stream, computer_handler=computer_handler, _on_api_start=_on_api_start, _on_api_end=_on_api_end, _on_usage=_on_usage, _on_screenshot=_on_screenshot, **kwargs ) async def predict_click( self, model: str, image_b64: str, instruction: str, **kwargs ) -> Optional[Tuple[int, int]]: """ Predict click coordinates using InternVL via litellm.acompletion. Behavior mirrors the ScreenSpot InternVL baseline: - Prompt: "<image>\nPlease provide the bounding box coordinate of the UI element this user instruction describes: <ref>{instruction}</ref>. Answer in the format of [[x1, y1, x2, y2]]" - Parse either [[x,y]] point or [[x1,y1,x2,y2]] bbox, using bbox center if point missing - Coordinates are 0-1000 normalized; convert to pixel coordinates for the original screenshot """ try: # Decode image dimensions to scale the normalized outputs img_bytes = base64.b64decode(image_b64) image = Image.open(BytesIO(img_bytes)) width, height = image.size except Exception: # If decoding fails, proceed with a safe default size to avoid crash width, height = 1920, 1080 # Build grounding prompt exactly like the baseline grounding_prompt = ( f"Please provide the bounding box coordinate of the UI element this user instruction describes: <ref>{instruction}</ref>. 
" f"Answer in the format of [[x1, y1, x2, y2]]" ) # Prepare messages for LiteLLM messages = [ { "role": "user", "content": [ { "type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_b64}"}, }, {"type": "text", "text": grounding_prompt}, ], } ] # Call acompletion; HuggingFaceLocalAdapter/model handler will handle InternVL loading api_kwargs = { "model": model, "messages": messages, # Conservative generation params akin to baseline (deterministic) "max_tokens": kwargs.get("max_tokens", 256), "temperature": kwargs.get("temperature", 0.0), } response = await litellm.acompletion(**api_kwargs) output_text = (response.choices[0].message.content or "").strip() # type: ignore # print(f"InternVL output: {output_text}") # Try to parse a point first; if absent, parse bbox and take center point = _extract_first_point(output_text) if point is None: bbox = _extract_last_bbox(output_text) if bbox is None: return None x1, y1, x2, y2 = bbox cx = (x1 + x2) / 2.0 cy = (y1 + y2) / 2.0 point = (cx, cy) x_norm, y_norm = point x_px, y_px = _scale_norm_to_pixels(x_norm, y_norm, width, height) return (x_px, y_px) def get_capabilities(self) -> List[AgentCapability]: return ["click", "step"] ``` -------------------------------------------------------------------------------- /libs/python/computer/computer/providers/factory.py: -------------------------------------------------------------------------------- ```python """Factory for creating VM providers.""" import logging from typing import Dict, Optional, Any, Type, Union from .base import BaseVMProvider, VMProviderType logger = logging.getLogger(__name__) class VMProviderFactory: """Factory for creating VM providers based on provider type.""" @staticmethod def create_provider( provider_type: Union[str, VMProviderType], port: int = 7777, host: str = "localhost", bin_path: Optional[str] = None, storage: Optional[str] = None, shared_path: Optional[str] = None, image: Optional[str] = None, verbose: bool = False, ephemeral: bool 
= False, noVNC_port: Optional[int] = None, **kwargs, ) -> BaseVMProvider: """Create a VM provider of the specified type. Args: provider_type: Type of VM provider to create port: Port for the API server host: Hostname for the API server bin_path: Path to provider binary if needed storage: Path for persistent VM storage shared_path: Path for shared folder between host and VM image: VM image to use (for Lumier provider) verbose: Enable verbose logging ephemeral: Use ephemeral (temporary) storage noVNC_port: Specific port for noVNC interface (for Lumier provider) Returns: An instance of the requested VM provider Raises: ImportError: If the required dependencies for the provider are not installed ValueError: If the provider type is not supported """ # Convert string to enum if needed if isinstance(provider_type, str): try: provider_type = VMProviderType(provider_type.lower()) except ValueError: provider_type = VMProviderType.UNKNOWN if provider_type == VMProviderType.LUME: try: from .lume import LumeProvider, HAS_LUME if not HAS_LUME: raise ImportError( "The pylume package is required for LumeProvider. " "Please install it with 'pip install cua-computer[lume]'" ) return LumeProvider( port=port, host=host, storage=storage, verbose=verbose, ephemeral=ephemeral ) except ImportError as e: logger.error(f"Failed to import LumeProvider: {e}") raise ImportError( "The pylume package is required for LumeProvider. " "Please install it with 'pip install cua-computer[lume]'" ) from e elif provider_type == VMProviderType.LUMIER: try: from .lumier import LumierProvider, HAS_LUMIER if not HAS_LUMIER: raise ImportError( "Docker is required for LumierProvider. " "Please install Docker for Apple Silicon and Lume CLI before using this provider." 
) return LumierProvider( port=port, host=host, storage=storage, shared_path=shared_path, image=image or "macos-sequoia-cua:latest", verbose=verbose, ephemeral=ephemeral, noVNC_port=noVNC_port ) except ImportError as e: logger.error(f"Failed to import LumierProvider: {e}") raise ImportError( "Docker and Lume CLI are required for LumierProvider. " "Please install Docker for Apple Silicon and run the Lume installer script." ) from e elif provider_type == VMProviderType.CLOUD: try: from .cloud import CloudProvider return CloudProvider( verbose=verbose, **kwargs, ) except ImportError as e: logger.error(f"Failed to import CloudProvider: {e}") raise ImportError( "The CloudProvider is not fully implemented yet. " "Please use LUME or LUMIER provider instead." ) from e elif provider_type == VMProviderType.WINSANDBOX: try: from .winsandbox import WinSandboxProvider, HAS_WINSANDBOX if not HAS_WINSANDBOX: raise ImportError( "pywinsandbox is required for WinSandboxProvider. " "Please install it with 'pip install -U git+https://github.com/karkason/pywinsandbox.git'" ) return WinSandboxProvider( port=port, host=host, storage=storage, verbose=verbose, ephemeral=ephemeral, **kwargs ) except ImportError as e: logger.error(f"Failed to import WinSandboxProvider: {e}") raise ImportError( "pywinsandbox is required for WinSandboxProvider. " "Please install it with 'pip install -U git+https://github.com/karkason/pywinsandbox.git'" ) from e elif provider_type == VMProviderType.DOCKER: try: from .docker import DockerProvider, HAS_DOCKER if not HAS_DOCKER: raise ImportError( "Docker is required for DockerProvider. " "Please install Docker and ensure it is running." ) return DockerProvider( port=port, host=host, storage=storage, shared_path=shared_path, image=image or "trycua/cua-ubuntu:latest", verbose=verbose, ephemeral=ephemeral, vnc_port=noVNC_port ) except ImportError as e: logger.error(f"Failed to import DockerProvider: {e}") raise ImportError( "Docker is required for DockerProvider. 
" "Please install Docker and ensure it is running." ) from e else: raise ValueError(f"Unsupported provider type: {provider_type}") ``` -------------------------------------------------------------------------------- /libs/python/agent/benchmarks/interactive.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Interactive Click Prediction Tool Takes screenshots and allows testing multiple models interactively. Models are loaded/unloaded one at a time to avoid memory issues. """ import asyncio import os from datetime import datetime from typing import List, Dict, Any from utils import ( ModelWrapper, take_screenshot, save_prediction_visualization, get_available_models ) async def predict_with_all_models(image, instruction: str, models) -> List[Dict[str, Any]]: """ Predict click coordinates with all models sequentially. Args: image: PIL Image to analyze instruction: Instruction text models: List of model instances Returns: List of prediction results """ predictions = [] for model in models: model_wrapper = ModelWrapper(model) print(f"\n🔄 Loading {model_wrapper.model_name}...") try: # Load model await model_wrapper.load_model() # Predict coords = await model_wrapper.predict_click(image, instruction) predictions.append({ 'model_name': model_wrapper.model_name, 'coords': coords, 'error': None }) if coords: print(f"✅ {model_wrapper.model_name}: ({coords[0]}, {coords[1]})") else: print(f"❌ {model_wrapper.model_name}: No prediction") except Exception as e: print(f"❌ {model_wrapper.model_name}: ERROR - {str(e)}") predictions.append({ 'model_name': model_wrapper.model_name, 'coords': None, 'error': str(e) }) finally: # Always unload model to free memory try: await model_wrapper.unload_model() print(f"🗑️ Unloaded {model_wrapper.model_name}") except Exception as e: print(f"⚠️ Error unloading {model_wrapper.model_name}: {e}") return predictions def print_header(): """Print the interactive tool header.""" 
print("=" * 60) print("🖱️ Interactive Click Prediction Tool") print("=" * 60) print("Commands:") print(" • Type an instruction to test models on last screenshot") print(" • 'screenshot' - Take a new screenshot") print(" • 'models' - List available models") print(" • 'quit' or 'exit' - Exit the tool") print("=" * 60) print("💡 Tip: Take a screenshot first, then send instructions to test models!") def print_models(models): """Print available models.""" print("\n📋 Available Models:") for i, model in enumerate(models, 1): if isinstance(model, str): print(f" {i}. {model}") else: print(f" {i}. models.{model.__class__.__name__}") async def main(): """ Main interactive loop. """ print_header() # Get available models models = get_available_models() print_models(models) # Create output directory for visualizations output_dir = "interactive_output" os.makedirs(output_dir, exist_ok=True) session_count = 0 last_screenshot = None screenshot_timestamp = None while True: try: # Get user input print(f"\n{'='*40}") user_input = input("🎯 Enter instruction (or command): ").strip() if not user_input: continue # Handle commands if user_input.lower() in ['quit', 'exit', 'q']: print("👋 Goodbye!") break elif user_input.lower() == 'models': print_models(models) continue elif user_input.lower() == 'screenshot': print("📸 Taking screenshot...") try: last_screenshot = take_screenshot() screenshot_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") screenshot_path = os.path.join(output_dir, f"screenshot_{screenshot_timestamp}.png") last_screenshot.save(screenshot_path) print(f"✅ Screenshot captured and saved to: {screenshot_path}") print(f"📝 Ready for instructions! Screenshot size: {last_screenshot.size}") except Exception as e: print(f"❌ Error taking screenshot: {e}") continue # Handle instruction input if last_screenshot is None: print("⚠️ No screenshot available! 
Please take a screenshot first using 'screenshot' command.") continue session_count += 1 print(f"\n🎯 Session {session_count}: '{user_input}'") print(f"📷 Using screenshot from: {screenshot_timestamp}") # Predict with all models using last screenshot print(f"\n🤖 Testing {len(models)} models on screenshot...") predictions = await predict_with_all_models(last_screenshot, user_input, models) # Display results summary print(f"\n📊 Results Summary:") print("-" * 50) for pred in predictions: if pred['coords']: print(f"✅ {pred['model_name']}: ({pred['coords'][0]}, {pred['coords'][1]})") elif pred['error']: print(f"❌ {pred['model_name']}: ERROR - {pred['error']}") else: print(f"❌ {pred['model_name']}: No prediction") # Save visualization timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") vis_filename = f"session_{session_count:03d}_{timestamp}.png" vis_path = os.path.join(output_dir, vis_filename) try: save_prediction_visualization(last_screenshot, user_input, predictions, vis_path) print(f"\n💾 Visualization saved to: {vis_path}") except Exception as e: print(f"⚠️ Error saving visualization: {e}") print(f"\n✨ Session {session_count} completed!") except KeyboardInterrupt: print("\n\n👋 Interrupted by user. Goodbye!") break except Exception as e: print(f"\n❌ Unexpected error: {e}") print("Continuing...") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\n👋 Goodbye!") except Exception as e: print(f"❌ Fatal error: {e}") ``` -------------------------------------------------------------------------------- /tests/test_venv.py: -------------------------------------------------------------------------------- ```python """ Virtual Environment Testing Module This module tests the ability to execute python code in a virtual environment within Cua Containers. 
Required environment variables: - CUA_API_KEY: API key for Cua cloud provider - CUA_CONTAINER_NAME: Name of the container to use """ import os import asyncio import pytest from pathlib import Path import sys import traceback # Load environment variables from .env file project_root = Path(__file__).parent.parent env_file = project_root / ".env" print(f"Loading environment from: {env_file}") from dotenv import load_dotenv load_dotenv(env_file) # Add paths to sys.path if needed pythonpath = os.environ.get("PYTHONPATH", "") for path in pythonpath.split(":"): if path and path not in sys.path: sys.path.insert(0, path) # Insert at beginning to prioritize print(f"Added to sys.path: {path}") from computer import Computer, VMProviderType from computer.helpers import sandboxed, set_default_computer @pytest.fixture(scope="session") async def computer(): """Shared Computer instance for all test cases.""" # Create a remote Linux computer with Cua computer = Computer( os_type="linux", api_key=os.getenv("CUA_API_KEY"), name=str(os.getenv("CUA_CONTAINER_NAME")), provider_type=VMProviderType.CLOUD, ) # # Create a local macOS computer with Cua # computer = Computer() try: await computer.run() yield computer finally: await computer.disconnect() # Sample test cases @pytest.mark.asyncio(loop_scope="session") async def test_venv_install(computer): """Test virtual environment creation and package installation.""" # Create a test virtual environment and install requests stdout, _ = await computer.venv_install("test_env", ["requests"]) # Check that installation was successful (no major errors) assert "Successfully installed" in stdout or "Requirement already satisfied" in stdout @pytest.mark.asyncio(loop_scope="session") async def test_venv_cmd(computer): """Test executing shell commands in virtual environment.""" # Test Python version check stdout, _ = await computer.venv_cmd("test_env", "python --version") assert "Python" in stdout @pytest.mark.asyncio(loop_scope="session") async def 
test_venv_exec(computer): """Test executing Python functions in virtual environment.""" def test_function(message="Hello World"): import sys return f"Python {sys.version_info.major}.{sys.version_info.minor}: {message}" result = await computer.venv_exec("test_env", test_function, message="Test successful!") assert "Python" in result assert "Test successful!" in result @pytest.mark.asyncio(loop_scope="session") async def test_venv_exec_with_package(computer): """Test executing Python functions that use installed packages.""" def test_requests(): import requests return f"requests version: {requests.__version__}" result = await computer.venv_exec("test_env", test_requests) assert "requests version:" in result @pytest.mark.asyncio(loop_scope="session") async def test_venv_exec_error_handling(computer): """Test error handling in venv_exec.""" def test_error(): raise ValueError("This is a test error") with pytest.raises(ValueError, match="This is a test error"): await computer.venv_exec("test_env", test_error) @pytest.mark.asyncio(loop_scope="session") async def test_venv_exec_with_args_kwargs(computer): """Test executing Python functions with args and kwargs that return an object.""" def create_data_object(name, age, *hobbies, **metadata): return { "name": name, "age": age, "hobbies": list(hobbies), "metadata": metadata, "status": "active" } args = ["Alice", 25, "reading", "coding"] kwargs = {"location": "New York", "department": "Engineering"} result = await computer.venv_exec( "test_env", create_data_object, *args, **kwargs ) assert result["name"] == "Alice" assert result["age"] == 25 assert result["hobbies"] == ["reading", "coding"] assert result["metadata"]["location"] == "New York" assert result["status"] == "active" @pytest.mark.asyncio(loop_scope="session") async def test_venv_exec_stdout_capture(computer, capfd): """Test capturing stdout from Python functions executed in virtual environment.""" def hello_world_function(): print("Hello World!") return "Function 
completed" # Execute the function in the virtual environment result = await computer.venv_exec("test_env", hello_world_function) # Capture stdout and stderr out, _ = capfd.readouterr() # Assert the stdout contains our expected output assert out == "Hello World!\n\n" assert result == "Function completed" @pytest.mark.asyncio(loop_scope="session") async def test_remote_decorator(computer): """Test the remote decorator from computer.helpers module.""" # Set the computer as default for the remote decorator set_default_computer(computer) # Define a function with the remote decorator @sandboxed("test_env") def get_package_version(): import sys import platform return { "python_version": sys.version, "platform": platform.platform(), "success": True } # Call the decorated function result = await get_package_version() # Verify the function executed in the virtual environment assert "python_version" in result assert "platform" in result assert result["success"] == True @pytest.mark.asyncio(loop_scope="session") async def test_remote_decorator_with_custom_computer(computer): """Test the remote decorator with explicitly specified computer instance.""" # Define a function with the remote decorator that explicitly specifies the computer @sandboxed("test_env", computer=computer) def get_system_info(): import os import sys return { "python_version": sys.version, "environment_vars": dict(os.environ), "working_directory": os.getcwd() } # Call the decorated function result = await get_system_info() # Verify the function executed in the virtual environment assert "python_version" in result assert "environment_vars" in result assert "working_directory" in result # The virtual environment should have a different working directory # than the current test process assert result["working_directory"] != os.getcwd() if __name__ == "__main__": # Run tests directly pytest.main([__file__, "-v"]) ``` -------------------------------------------------------------------------------- 
/libs/python/agent/agent/adapters/huggingfacelocal_adapter.py: -------------------------------------------------------------------------------- ```python import asyncio import functools import warnings from concurrent.futures import ThreadPoolExecutor from typing import Iterator, AsyncIterator, Dict, List, Any, Optional from litellm.types.utils import GenericStreamingChunk, ModelResponse from litellm.llms.custom_llm import CustomLLM from litellm import completion, acompletion # Try to import HuggingFace dependencies try: import torch from transformers import AutoModelForImageTextToText, AutoProcessor HF_AVAILABLE = True except ImportError: HF_AVAILABLE = False from .models import load_model as load_model_handler class HuggingFaceLocalAdapter(CustomLLM): """HuggingFace Local Adapter for running vision-language models locally.""" def __init__(self, device: str = "auto", trust_remote_code: bool = False, **kwargs): """Initialize the adapter. Args: device: Device to load model on ("auto", "cuda", "cpu", etc.) trust_remote_code: Whether to trust remote code **kwargs: Additional arguments """ super().__init__() self.device = device self.trust_remote_code = trust_remote_code # Cache for model handlers keyed by model_name self._handlers: Dict[str, Any] = {} self._executor = ThreadPoolExecutor(max_workers=1) # Single thread pool def _get_handler(self, model_name: str): """Get or create a model handler for the given model name.""" if model_name not in self._handlers: self._handlers[model_name] = load_model_handler(model_name=model_name, device=self.device, trust_remote_code=self.trust_remote_code) return self._handlers[model_name] def _convert_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Convert OpenAI format messages to HuggingFace format. 
Args: messages: Messages in OpenAI format Returns: Messages in HuggingFace format """ converted_messages = [] for message in messages: converted_message = { "role": message["role"], "content": [] } content = message.get("content", []) if isinstance(content, str): # Simple text content converted_message["content"].append({ "type": "text", "text": content }) elif isinstance(content, list): # Multi-modal content for item in content: if item.get("type") == "text": converted_message["content"].append({ "type": "text", "text": item.get("text", "") }) elif item.get("type") == "image_url": # Convert image_url format to image format image_url = item.get("image_url", {}).get("url", "") converted_message["content"].append({ "type": "image", "image": image_url }) converted_messages.append(converted_message) return converted_messages def _generate(self, **kwargs) -> str: """Generate response using the local HuggingFace model. Args: **kwargs: Keyword arguments containing messages and model info Returns: Generated text response """ if not HF_AVAILABLE: raise ImportError( "HuggingFace transformers dependencies not found. " "Please install with: pip install \"cua-agent[uitars-hf]\"" ) # Extract messages and model from kwargs messages = kwargs.get('messages', []) model_name = kwargs.get('model', 'ByteDance-Seed/UI-TARS-1.5-7B') max_new_tokens = kwargs.get('max_tokens', 128) # Warn about ignored kwargs ignored_kwargs = set(kwargs.keys()) - {'messages', 'model', 'max_tokens'} if ignored_kwargs: warnings.warn(f"Ignoring unsupported kwargs: {ignored_kwargs}") # Convert messages to HuggingFace format hf_messages = self._convert_messages(messages) # Delegate to model handler handler = self._get_handler(model_name) generated_text = handler.generate(hf_messages, max_new_tokens=max_new_tokens) return generated_text def completion(self, *args, **kwargs) -> ModelResponse: """Synchronous completion method. 
Returns: ModelResponse with generated text """ generated_text = self._generate(**kwargs) return completion( model=f"huggingface-local/{kwargs['model']}", mock_response=generated_text, ) async def acompletion(self, *args, **kwargs) -> ModelResponse: """Asynchronous completion method. Returns: ModelResponse with generated text """ # Run _generate in thread pool to avoid blocking loop = asyncio.get_event_loop() generated_text = await loop.run_in_executor( self._executor, functools.partial(self._generate, **kwargs) ) return await acompletion( model=f"huggingface-local/{kwargs['model']}", mock_response=generated_text, ) def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]: """Synchronous streaming method. Returns: Iterator of GenericStreamingChunk """ generated_text = self._generate(**kwargs) generic_streaming_chunk: GenericStreamingChunk = { "finish_reason": "stop", "index": 0, "is_finished": True, "text": generated_text, "tool_use": None, "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0}, } yield generic_streaming_chunk async def astreaming(self, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]: """Asynchronous streaming method. 
Returns: AsyncIterator of GenericStreamingChunk """ # Run _generate in thread pool to avoid blocking loop = asyncio.get_event_loop() generated_text = await loop.run_in_executor( self._executor, functools.partial(self._generate, **kwargs) ) generic_streaming_chunk: GenericStreamingChunk = { "finish_reason": "stop", "index": 0, "is_finished": True, "text": generated_text, "tool_use": None, "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0}, } yield generic_streaming_chunk ``` -------------------------------------------------------------------------------- /libs/python/som/som/util/utils.py: -------------------------------------------------------------------------------- ```python import easyocr import cv2 import matplotlib.pyplot as plt import numpy as np from PIL import Image from typing import Union, List, Tuple, Any, Optional, cast, Sequence import time import signal from contextlib import contextmanager import logging logger = logging.getLogger(__name__) class TimeoutException(Exception): pass @contextmanager def timeout(seconds): def timeout_handler(signum, frame): logger.warning(f"OCR process timed out after {seconds} seconds") raise TimeoutException("OCR processing timed out") # Register the signal handler original_handler = signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(seconds) try: yield finally: signal.alarm(0) signal.signal(signal.SIGALRM, original_handler) # Initialize EasyOCR with optimized settings logger.info("Initializing EasyOCR with optimized settings...") reader = easyocr.Reader( ["en"], gpu=True, # Use GPU if available model_storage_directory=None, # Use default directory download_enabled=True, detector=True, # Enable text detection recognizer=True, # Enable text recognition verbose=False, # Disable verbose output quantize=True, # Enable quantization for faster inference cudnn_benchmark=True, # Enable cuDNN benchmarking ) logger.info("EasyOCR initialization complete") def check_ocr_box( image_source: Union[str, 
Image.Image], display_img=True, output_bb_format="xywh", goal_filtering=None, easyocr_args=None, use_paddleocr=False, ) -> Tuple[Tuple[List[str], List[Tuple[float, float, float, float]]], Optional[Any]]: """Check OCR box using EasyOCR with optimized settings. Args: image_source: Either a file path or PIL Image display_img: Whether to display the annotated image output_bb_format: Format for bounding boxes ('xywh' or 'xyxy') goal_filtering: Optional filtering of results easyocr_args: Arguments for EasyOCR use_paddleocr: Ignored (kept for backward compatibility) Returns: Tuple containing: - Tuple of (text_list, bounding_boxes) - goal_filtering value """ logger.info("Starting OCR processing...") start_time = time.time() if isinstance(image_source, str): logger.info(f"Loading image from path: {image_source}") image_source = Image.open(image_source) if image_source.mode == "RGBA": logger.info("Converting RGBA image to RGB") image_source = image_source.convert("RGB") image_np = np.array(image_source) w, h = image_source.size logger.info(f"Image size: {w}x{h}") # Default EasyOCR arguments optimized for speed default_args = { "paragraph": False, # Disable paragraph detection "text_threshold": 0.5, # Confidence threshold "link_threshold": 0.4, # Text link threshold "canvas_size": 2560, # Max image size "mag_ratio": 1.0, # Magnification ratio "slope_ths": 0.1, # Slope threshold "ycenter_ths": 0.5, # Y-center threshold "height_ths": 0.5, # Height threshold "width_ths": 0.5, # Width threshold "add_margin": 0.1, # Margin around text "min_size": 20, # Minimum text size } # Update with user-provided arguments if easyocr_args: logger.info(f"Using custom EasyOCR arguments: {easyocr_args}") default_args.update(easyocr_args) try: # Use EasyOCR with timeout logger.info("Starting EasyOCR detection with 5 second timeout...") with timeout(5): # 5 second timeout # EasyOCR's readtext returns a list of tuples, where each tuple is (bbox, text, confidence) raw_result = 
reader.readtext(image_np, **default_args) result = cast(Sequence[Tuple[List[Tuple[float, float]], str, float]], raw_result) coord = [item[0] for item in result] # item[0] is the bbox coordinates text = [item[1] for item in result] # item[1] is the text content logger.info(f"OCR completed successfully. Found {len(text)} text regions") logger.info(f"Detected text: {text}") except TimeoutException: logger.error("OCR processing timed out after 5 seconds") coord = [] text = [] except Exception as e: logger.error(f"OCR processing failed with error: {str(e)}") coord = [] text = [] processing_time = time.time() - start_time logger.info(f"Total OCR processing time: {processing_time:.2f} seconds") if display_img: logger.info("Creating visualization of OCR results...") opencv_img = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR) bb = [] for item in coord: x, y, a, b = get_xywh(item) bb.append((x, y, a, b)) # Convert float coordinates to integers for cv2.rectangle x_val = cast(float, x) y_val = cast(float, y) a_val = cast(float, a) b_val = cast(float, b) x_int, y_int = int(x_val), int(y_val) a_int, b_int = int(a_val), int(b_val) cv2.rectangle( opencv_img, (x_int, y_int), (x_int + a_int, y_int + b_int), (0, 255, 0), 2 ) plt.imshow(cv2.cvtColor(opencv_img, cv2.COLOR_BGR2RGB)) else: if output_bb_format == "xywh": bb = [get_xywh(item) for item in coord] elif output_bb_format == "xyxy": bb = [get_xyxy(item) for item in coord] # Cast the bounding boxes to the expected type bb = cast(List[Tuple[float, float, float, float]], bb) logger.info("OCR processing complete") return (text, bb), goal_filtering def get_xywh(box): """ Convert a bounding box to xywh format (x, y, width, height). 
Args: box: Bounding box coordinates (various formats supported) Returns: Tuple of (x, y, width, height) """ # Handle different input formats if len(box) == 4: # If already in xywh format or xyxy format if isinstance(box[0], (int, float)) and isinstance(box[2], (int, float)): if box[2] < box[0] or box[3] < box[1]: # Already xyxy format, convert to xywh x1, y1, x2, y2 = box return x1, y1, x2 - x1, y2 - y1 else: # Already in xywh format return box elif len(box) == 2: # Format like [[x1,y1],[x2,y2]] from some OCR engines (x1, y1), (x2, y2) = box return x1, y1, x2 - x1, y2 - y1 # Default case - try to convert assuming it's a list of points x_coords = [p[0] for p in box] y_coords = [p[1] for p in box] x1, y1 = min(x_coords), min(y_coords) width, height = max(x_coords) - x1, max(y_coords) - y1 return x1, y1, width, height def get_xyxy(box): """ Convert a bounding box to xyxy format (x1, y1, x2, y2). Args: box: Bounding box coordinates (various formats supported) Returns: Tuple of (x1, y1, x2, y2) """ # Get xywh first, then convert to xyxy x, y, w, h = get_xywh(box) return x, y, x + w, y + h ``` -------------------------------------------------------------------------------- /libs/python/agent/benchmarks/ss-v2.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ ScreenSpot-v2 Benchmark Script Evaluates models on the ScreenSpot-v2 dataset for click prediction accuracy. Supports both ComputerAgent model strings and custom model classes. """ import argparse import asyncio import random import statistics import time from typing import Optional from datasets import load_dataset from tqdm import tqdm from utils import ( ModelWrapper, is_click_in_bbox, save_results_to_markdown, save_visualizations, get_available_models, get_gpu_memory ) async def evaluate_model(model_wrapper: ModelWrapper, samples, max_samples: Optional[int] = None) -> dict: """ Evaluate a model on any iterable of samples. 
    Args:
        model_wrapper: ModelWrapper instance
        samples: Iterable of dicts with keys: image, bbox, instruction
        max_samples: Maximum number of samples to evaluate (None for all)

    Returns:
        Dictionary with evaluation results
    """
    print(f"\nEvaluating model: {model_wrapper.model_name}")

    # Load model
    await model_wrapper.load_model()

    # Convert to list if needed and limit samples
    if hasattr(samples, '__len__'):
        total_samples = len(samples)
        if max_samples is not None:
            total_samples = min(max_samples, total_samples)
        sample_list = list(samples)[:total_samples]
    else:
        # For iterators, take max_samples or all
        sample_list = list(samples)
        if max_samples is not None:
            sample_list = sample_list[:max_samples]
        total_samples = len(sample_list)

    correct_predictions = 0
    error_predictions = 0
    results = []

    for i, sample in enumerate(tqdm(sample_list, desc=f"Evaluating {model_wrapper.model_name}")):
        # Extract required data (only these 3 keys matter)
        image = sample['image']
        instruction = sample['instruction']
        bbox = sample['bbox']  # [x1, y1, x2, y2]

        # Predict click coordinates with timing; a prediction that raises or
        # returns nothing is recorded as an error instead of crashing the run
        start_time = time.time()
        try:
            click_coords = await model_wrapper.predict_click(image, instruction)
            failed = click_coords is None
        except Exception:
            click_coords = None
            failed = True
        prediction_time = time.time() - start_time
        if failed:
            error_predictions += 1

        # Check if prediction is correct
        is_correct = not failed and is_click_in_bbox(click_coords, bbox)
        if is_correct:
            correct_predictions += 1

        results.append({
            'sample_idx': i,
            'instruction': instruction,
            'bbox': bbox,
            'predicted_coords': click_coords,
            'is_correct': is_correct,
            'failed': failed,
            'prediction_time': prediction_time
        })

    # Unload model
    await model_wrapper.unload_model()

    # Calculate metrics
    accuracy = correct_predictions / total_samples if total_samples > 0 else 0.0
    error_rate = error_predictions / total_samples if total_samples > 0 else 0.0

    # Calculate timing statistics
    successful_times = [r['prediction_time'] for r in results if not r['failed']]
    avg_prediction_time = sum(successful_times) / len(successful_times) if successful_times else 0.0
    median_prediction_time =
statistics.median(successful_times) if successful_times else 0.0 min_prediction_time = min(successful_times) if successful_times else 0.0 max_prediction_time = max(successful_times) if successful_times else 0.0 # Get VRAM statistics vram_stats = model_wrapper.get_vram_stats() return { 'model_name': model_wrapper.model_name, 'total_samples': total_samples, 'correct_predictions': correct_predictions, 'failed_predictions': error_predictions, 'accuracy': accuracy, 'failure_rate': error_rate, 'avg_prediction_time': avg_prediction_time, 'median_prediction_time': median_prediction_time, 'min_prediction_time': min_prediction_time, 'max_prediction_time': max_prediction_time, 'vram_max_mb': vram_stats['max_mb'], 'vram_avg_mb': vram_stats['avg_mb'], 'results': results } async def main(): """ Main function to run the benchmark. """ # Parse command line arguments parser = argparse.ArgumentParser(description='ScreenSpot-v2 Benchmark Script') parser.add_argument('--samples', type=int, default=500, help='Number of samples to evaluate (default: 500)') parser.add_argument('--seed', type=int, default=42, help='Random seed for shuffling (default: 42)') args = parser.parse_args() # Set random seed random.seed(args.seed) # Load dataset print("Loading ScreenSpot-v2 dataset...") ds = load_dataset("lmms-lab/ScreenSpot-v2") dataset = ds['train'] # type: ignore # Convert to simple list of dicts with only required keys samples = [] for item in dataset: # Convert dataset item to dict if needed item_dict = dict(item) if hasattr(item, 'keys') else item # Convert ScreenSpot-v2 bbox format [x, y, w, h] to [x1, y1, x2, y2] bbox_xywh = item_dict['bbox'] # type: ignore x, y, w, h = bbox_xywh bbox_xyxy = [x, y, x + w, y + h] samples.append({ 'image': item_dict['image'], # type: ignore 'instruction': item_dict['instruction'], # type: ignore 'bbox': bbox_xyxy }) print(f"Dataset loaded: {len(samples)} samples") # Shuffle samples with seed random.shuffle(samples) print(f"Samples shuffled with seed 
{args.seed}") # Get available models models = get_available_models() # Evaluation settings max_samples = args.samples # Use command line argument # Run evaluations all_results = [] for model in models: model_wrapper = ModelWrapper(model) result = await evaluate_model(model_wrapper, samples, max_samples) all_results.append(result) # Print summary print(f"\n{result['model_name']} Results:") print(f" Accuracy: {result['accuracy']*100:.2f}%") print(f" Correct: {result['correct_predictions']}/{result['total_samples']}") print(f" Errors: {result['failed_predictions']}") print(f" Error Rate: {result['failure_rate']*100:.2f}%") print(f" Avg Time: {result['avg_prediction_time']:.2f}s") print(f" Median Time: {result['median_prediction_time']:.2f}s") print(f" Time Range: {result['min_prediction_time']:.2f}s - {result['max_prediction_time']:.2f}s") print(f" VRAM Max: {result['vram_max_mb']:.1f}MB") print(f" VRAM Avg: {result['vram_avg_mb']:.1f}MB") # Print GPU memory info gpu_memory = get_gpu_memory() if gpu_memory and gpu_memory[0] > 0: print(f" GPU Free Memory: {gpu_memory[0]:.1f}MB") # Save results if all_results: save_results_to_markdown(all_results, "screenspot_v2_results.md", title="ScreenSpot-v2 Benchmark Results") save_visualizations(all_results, samples) print("\nBenchmark completed successfully!") else: print("\nNo successful evaluations completed.") if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------------------------------------------------------- /blog/ubuntu-docker-support.md: -------------------------------------------------------------------------------- ```markdown # Ubuntu Docker Support in Cua with Kasm *Published Aug 26, 2025 by Francesco Bonacci* Today we’re shipping **Ubuntu Docker support** in Cua. You get a full Linux desktop inside a Docker container, viewable right in your browser—no VM spin-up, no extra clients. It behaves the same on macOS, Windows, and Linux. 
<img src="./assets/docker-ubuntu-support.png" alt="Cua + KasmVNC Ubuntu container desktop">

## Why we did this

If you build automation or RL workflows with Cua, you’ve probably run into the usual platform walls: macOS VMs (via Lume) are Apple-Silicon only; Windows Sandbox needs Pro/Enterprise; giving agents your host desktop is… exciting, but risky; and little OS quirks make “build once, run anywhere” harder than it should be.

We wanted something lightweight, isolated, and identical across machines. So we put a desktop in a container.

## Why we didn’t use QEMU/KVM

Short answer: **portability, startup time, and ops friction.**

* **Runs everywhere, no hypervisor drama.** KVM needs Linux; Hyper-V/Virtualization.Framework setups vary by host and policy. Docker is ubiquitous across macOS/Windows/Linux and allowed in most CI runners—so your GUI env actually runs where your team works.
* **Faster boot & smaller footprints.** Containers cold-start in seconds and images are GB-scale; VMs tend to be minutes and tens of GB. That matters for parallel agents, CI, and local iteration.
* **Lower ops overhead.** No nested virt, kernel modules, or privileged host tweaks that many orgs (and cloud runners) block. Pull → run → browser.
* **Same image, everywhere.** One Docker image gives you an identical desktop on every dev laptop and in CI.
* **Web-first access out of the box.** KasmVNC serves the desktop over HTTP—no extra VNC/RDP clients or SPICE config.

**When we *do* reach for QEMU/KVM:**

* You need **true OS isolation** or to run **non-Linux** guests.
* You want **kernel-level features** or **device/GPU passthrough** (VFIO).
* You’re optimizing for **hardware realism** over startup speed and density.

For this release, the goal was a **cross-platform Linux desktop that feels instant and identical** across local dev and CI. Containers + KasmVNC hit that sweet spot.
## What we built

Under the hood it’s **KasmVNC + Ubuntu 22.04 (Xfce) in Docker**, pre-configured for computer-use automation. You get a proper GUI desktop served over HTTP (no VNC/RDP client), accessible from any modern browser. Cua’s Computer server boots automatically so your agents can connect immediately.

### How it works (at a glance)

```
Your System
└─ Docker Container
   └─ Xfce Desktop + KasmVNC → open in your browser
```

---

## Quick start

1. **Install Docker** — Docker Desktop (macOS/Windows) or Docker Engine (Linux).

2. **Pull or build the image**

   ```bash
   # Pull (recommended)
   docker pull --platform=linux/amd64 trycua/cua-ubuntu:latest

   # Or build locally
   cd libs/kasm
   docker build -t cua-ubuntu:latest .
   ```

3. **Run with Cua’s Computer SDK**

   ```python
   from computer import Computer

   computer = Computer(
       os_type="linux",
       provider_type="docker",
       image="trycua/cua-ubuntu:latest",
       name="my-automation-container"
   )
   await computer.run()
   ```

### Make an agent that drives this desktop

```python
from agent import ComputerAgent

# assumes `computer` is the instance created above
agent = ComputerAgent("openrouter/z-ai/glm-4.5v", tools=[computer])

async for _ in agent.run("Click on the search bar and type 'hello world'"):
    pass
```

> Use any VLM with tool use; just make sure your OpenRouter creds are set.

By default you land on **Ubuntu 22.04 + Xfce** with a browser and desktop basics, the **Computer server** is running, the **web viewer** is available at `http://localhost:8006`, and common automation tools are preinstalled.

---

## What’s inside (in plain English)

A tidy Linux desktop with web access through **KasmVNC**, Python 3.11 and dev tools, plus utilities you’ll actually use for automation—`wmctrl` for windows, `xclip` for clipboard, `ffmpeg` for media, screenshot helpers, and so on.
It starts as a **non-root `kasm-user`**, lives in an **isolated filesystem** (unless you mount volumes), and ships with **SSL off for local dev** so you can terminate TLS upstream when you deploy.

---

## How it compares

| Feature | KasmVNC Docker | Lume (macOS VM) | Windows Sandbox |
| ---------------- | --------------------- | --------------------- | ---------------------- |
| Platform support | macOS, Windows, Linux | macOS (Apple Silicon) | Windows Pro/Enterprise |
| Resource usage | Low (container) | Medium (full VM) | Medium (full VM) |
| Setup time | \~30s | 2–5 min | 1–2 min |
| GUI desktop | Linux | macOS | Windows |
| Web access | Browser (no client) | Typically VNC client | Typically RDP client |
| Consistency | Same everywhere | Hardware-dependent | OS-dependent |

**Use KasmVNC Docker when…** you want the **same GUI env across devs/CI/platforms**, you’re doing **RL or end-to-end GUI tests**, or you need **many isolated desktops on one machine**.

**Use alternatives when…** you need native **macOS** (→ Lume) or native **Windows** (→ Windows Sandbox).
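If you just want to poke at the desktop without going through the SDK, a plain `docker run` works too. This is a sketch, not the official workflow: the container name `manual-desktop` is made up for this example, and the `8006` port mapping assumes the container serves the KasmVNC web viewer on the same port as the local URL mentioned above.

```shell
IMAGE=trycua/cua-ubuntu:latest
PORT=8006   # KasmVNC web viewer port (assumed from the default URL above)

# Guarded so the snippet is copy-paste safe on machines without Docker
if command -v docker >/dev/null 2>&1; then
  docker pull --platform=linux/amd64 "$IMAGE"
  docker run -d --name manual-desktop -p "$PORT:$PORT" "$IMAGE"
fi

echo "Desktop available at http://localhost:$PORT"

# When you're done:
#   docker stop manual-desktop && docker rm manual-desktop
```

The quick-start SDK path is still the recommended route, since `computer.run()` also wires up the Computer server connection that agents rely on.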
---

## Using the Agent Framework (parallel example)

A compact pattern for running multiple desktops and agents side-by-side:

```python
import asyncio
from computer import Computer
from agent import ComputerAgent

# Create multiple computer instances (each gets its own desktop)
computers = []
for i in range(3):
    c = Computer(
        os_type="linux",
        provider_type="docker",
        image="trycua/cua-ubuntu:latest",
        name=f"parallel-desktop-{i}"
    )
    computers.append(c)
    await c.run()

# Pair each desktop with a task
tasks = [
    "open github and search for 'trycua/cua'",
    "open a text editor and write 'hello world'",
    "open the browser and go to google.com",
]

agents = [
    ComputerAgent(model="openrouter/z-ai/glm-4.5v", tools=[c])
    for c in computers
]

async def run_agent(agent, task):
    async for _ in agent.run(task):
        pass

await asyncio.gather(*[run_agent(a, t) for a, t in zip(agents, tasks)])
```

---

## What’s next

We’re polishing a **CLI to push/scale these containers on Cua Cloud**, exploring **GPU acceleration** for in-container inference, and publishing **prebuilt images** for Playwright, Selenium, and friends.

---

## Try it

```python
from computer import Computer

computer = Computer(os_type="linux", provider_type="docker", image="trycua/cua-ubuntu:latest")
await computer.run()
```

---

## Links

* **Docker Provider Docs:** [https://docs.trycua.com/computers/docker](https://docs.trycua.com/computers/docker)
* **KasmVNC:** [https://github.com/kasmtech/KasmVNC](https://github.com/kasmtech/KasmVNC)
* **Container Source:** [https://github.com/trycua/cua/tree/main/libs/kasm](https://github.com/trycua/cua/tree/main/libs/kasm)
* **Computer SDK:** [https://docs.trycua.com/docs/computer-sdk/computers](https://docs.trycua.com/docs/computer-sdk/computers)
* **Discord:** [https://discord.gg/cua-ai](https://discord.gg/cua-ai)

Questions or weird edge cases? Ping us on Discord—we’re curious to see what you build.
``` -------------------------------------------------------------------------------- /libs/python/agent/agent/callbacks/telemetry.py: -------------------------------------------------------------------------------- ```python """ Telemetry callback handler for Computer-Use Agent (cua-agent) """ import time import uuid from typing import List, Dict, Any, Optional, Union from .base import AsyncCallbackHandler from core.telemetry import ( record_event, is_telemetry_enabled, ) import platform SYSTEM_INFO = { "os": platform.system().lower(), "os_version": platform.release(), "python_version": platform.python_version(), } class TelemetryCallback(AsyncCallbackHandler): """ Telemetry callback handler for Computer-Use Agent (cua-agent) Tracks agent usage, performance metrics, and optionally trajectory data. """ def __init__( self, agent, log_trajectory: bool = False ): """ Initialize telemetry callback. Args: agent: The ComputerAgent instance log_trajectory: Whether to log full trajectory items (opt-in) """ self.agent = agent self.log_trajectory = log_trajectory # Generate session/run IDs self.session_id = str(uuid.uuid4()) self.run_id = None # Track timing and metrics self.run_start_time = None self.step_count = 0 self.step_start_time = None self.total_usage = { "prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0, "response_cost": 0.0 } # Record agent initialization if is_telemetry_enabled(): self._record_agent_initialization() def _record_agent_initialization(self) -> None: """Record agent type/model and session initialization.""" agent_info = { "session_id": self.session_id, "agent_type": self.agent.agent_loop.__name__ if hasattr(self.agent, 'agent_loop') else 'unknown', "model": getattr(self.agent, 'model', 'unknown'), **SYSTEM_INFO } record_event("agent_session_start", agent_info) async def on_run_start(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]]) -> None: """Called at the start of an agent run loop.""" if not is_telemetry_enabled(): return 
self.run_id = str(uuid.uuid4()) self.run_start_time = time.time() self.step_count = 0 # Calculate input context size input_context_size = self._calculate_context_size(old_items) run_data = { "session_id": self.session_id, "run_id": self.run_id, "start_time": self.run_start_time, "input_context_size": input_context_size, "num_existing_messages": len(old_items) } # Log trajectory if opted in if self.log_trajectory: trajectory = self._extract_trajectory(old_items) if trajectory: run_data["uploaded_trajectory"] = trajectory record_event("agent_run_start", run_data) async def on_run_end(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]], new_items: List[Dict[str, Any]]) -> None: """Called at the end of an agent run loop.""" if not is_telemetry_enabled() or not self.run_start_time: return run_duration = time.time() - self.run_start_time run_data = { "session_id": self.session_id, "run_id": self.run_id, "end_time": time.time(), "duration_seconds": run_duration, "num_steps": self.step_count, "total_usage": self.total_usage.copy() } # Log trajectory if opted in if self.log_trajectory: trajectory = self._extract_trajectory(new_items) if trajectory: run_data["uploaded_trajectory"] = trajectory record_event("agent_run_end", run_data) async def on_usage(self, usage: Dict[str, Any]) -> None: """Called when usage information is received.""" if not is_telemetry_enabled(): return # Accumulate usage stats self.total_usage["prompt_tokens"] += usage.get("prompt_tokens", 0) self.total_usage["completion_tokens"] += usage.get("completion_tokens", 0) self.total_usage["total_tokens"] += usage.get("total_tokens", 0) self.total_usage["response_cost"] += usage.get("response_cost", 0.0) # Record individual usage event usage_data = { "session_id": self.session_id, "run_id": self.run_id, "step": self.step_count, **usage } record_event("agent_usage", usage_data) async def on_responses(self, kwargs: Dict[str, Any], responses: Dict[str, Any]) -> None: """Called when responses are 
received.""" if not is_telemetry_enabled(): return self.step_count += 1 step_duration = None if self.step_start_time: step_duration = time.time() - self.step_start_time self.step_start_time = time.time() step_data = { "session_id": self.session_id, "run_id": self.run_id, "step": self.step_count, "timestamp": self.step_start_time } if step_duration is not None: step_data["duration_seconds"] = step_duration record_event("agent_step", step_data) def _calculate_context_size(self, items: List[Dict[str, Any]]) -> int: """Calculate approximate context size in tokens/characters.""" total_size = 0 for item in items: if item.get("type") == "message" and "content" in item: content = item["content"] if isinstance(content, str): total_size += len(content) elif isinstance(content, list): for part in content: if isinstance(part, dict) and "text" in part: total_size += len(part["text"]) elif "content" in item and isinstance(item["content"], str): total_size += len(item["content"]) return total_size def _extract_trajectory(self, items: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Extract trajectory items that should be logged.""" trajectory = [] for item in items: # Include user messages, assistant messages, reasoning, computer calls, and computer outputs if ( item.get("role") == "user" or # User inputs (item.get("type") == "message" and item.get("role") == "assistant") or # Model outputs item.get("type") == "reasoning" or # Reasoning traces item.get("type") == "computer_call" or # Computer actions item.get("type") == "computer_call_output" # Computer outputs ): # Create a copy of the item with timestamp trajectory_item = item.copy() trajectory_item["logged_at"] = time.time() trajectory.append(trajectory_item) return trajectory ``` -------------------------------------------------------------------------------- /libs/python/computer-server/computer_server/handlers/base.py: -------------------------------------------------------------------------------- ```python from abc 
import ABC, abstractmethod from typing import Optional, Dict, Any, List, Tuple class BaseAccessibilityHandler(ABC): """Abstract base class for OS-specific accessibility handlers.""" @abstractmethod async def get_accessibility_tree(self) -> Dict[str, Any]: """Get the accessibility tree of the current window.""" pass @abstractmethod async def find_element(self, role: Optional[str] = None, title: Optional[str] = None, value: Optional[str] = None) -> Dict[str, Any]: """Find an element in the accessibility tree by criteria.""" pass class BaseFileHandler(ABC): """Abstract base class for OS-specific file handlers.""" @abstractmethod async def file_exists(self, path: str) -> Dict[str, Any]: """Check if a file exists at the specified path.""" pass @abstractmethod async def directory_exists(self, path: str) -> Dict[str, Any]: """Check if a directory exists at the specified path.""" pass @abstractmethod async def list_dir(self, path: str) -> Dict[str, Any]: """List the contents of a directory.""" pass @abstractmethod async def read_text(self, path: str) -> Dict[str, Any]: """Read the text contents of a file.""" pass @abstractmethod async def write_text(self, path: str, content: str) -> Dict[str, Any]: """Write text content to a file.""" pass @abstractmethod async def write_bytes(self, path: str, content_b64: str) -> Dict[str, Any]: """Write binary content to a file. Sent over the websocket as a base64 string.""" pass @abstractmethod async def delete_file(self, path: str) -> Dict[str, Any]: """Delete a file.""" pass @abstractmethod async def create_dir(self, path: str) -> Dict[str, Any]: """Create a directory.""" pass @abstractmethod async def delete_dir(self, path: str) -> Dict[str, Any]: """Delete a directory.""" pass @abstractmethod async def read_bytes(self, path: str, offset: int = 0, length: Optional[int] = None) -> Dict[str, Any]: """Read the binary contents of a file. Sent over the websocket as a base64 string. 
Args: path: Path to the file offset: Byte offset to start reading from (default: 0) length: Number of bytes to read (default: None for entire file) """ pass @abstractmethod async def get_file_size(self, path: str) -> Dict[str, Any]: """Get the size of a file in bytes.""" pass class BaseAutomationHandler(ABC): """Abstract base class for OS-specific automation handlers. Categories: - Mouse Actions: Methods for mouse control - Keyboard Actions: Methods for keyboard input - Scrolling Actions: Methods for scrolling - Screen Actions: Methods for screen interaction - Clipboard Actions: Methods for clipboard operations """ # Mouse Actions @abstractmethod async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]: """Perform a mouse down at the current or specified position.""" pass @abstractmethod async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]: """Perform a mouse up at the current or specified position.""" pass @abstractmethod async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]: """Perform a left click at the current or specified position.""" pass @abstractmethod async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]: """Perform a right click at the current or specified position.""" pass @abstractmethod async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]: """Perform a double click at the current or specified position.""" pass @abstractmethod async def move_cursor(self, x: int, y: int) -> Dict[str, Any]: """Move the cursor to the specified position.""" pass @abstractmethod async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> Dict[str, Any]: """Drag the cursor from current position to specified coordinates. 
Args: x: The x coordinate to drag to y: The y coordinate to drag to button: The mouse button to use ('left', 'middle', 'right') duration: How long the drag should take in seconds """ pass @abstractmethod async def drag(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> Dict[str, Any]: """Drag the cursor from current position to specified coordinates. Args: path: A list of tuples of x and y coordinates to drag to button: The mouse button to use ('left', 'middle', 'right') duration: How long the drag should take in seconds """ pass # Keyboard Actions @abstractmethod async def key_down(self, key: str) -> Dict[str, Any]: """Press and hold the specified key.""" pass @abstractmethod async def key_up(self, key: str) -> Dict[str, Any]: """Release the specified key.""" pass @abstractmethod async def type_text(self, text: str) -> Dict[str, Any]: """Type the specified text.""" pass @abstractmethod async def press_key(self, key: str) -> Dict[str, Any]: """Press the specified key.""" pass @abstractmethod async def hotkey(self, keys: List[str]) -> Dict[str, Any]: """Press a combination of keys together.""" pass # Scrolling Actions @abstractmethod async def scroll(self, x: int, y: int) -> Dict[str, Any]: """Scroll the specified amount.""" pass @abstractmethod async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]: """Scroll down by the specified number of clicks.""" pass @abstractmethod async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]: """Scroll up by the specified number of clicks.""" pass # Screen Actions @abstractmethod async def screenshot(self) -> Dict[str, Any]: """Take a screenshot and return base64 encoded image data.""" pass @abstractmethod async def get_screen_size(self) -> Dict[str, Any]: """Get the screen size of the VM.""" pass @abstractmethod async def get_cursor_position(self) -> Dict[str, Any]: """Get the current cursor position.""" pass # Clipboard Actions @abstractmethod async def copy_to_clipboard(self) -> 
Dict[str, Any]: """Get the current clipboard content.""" pass @abstractmethod async def set_clipboard(self, text: str) -> Dict[str, Any]: """Set the clipboard content.""" pass @abstractmethod async def run_command(self, command: str) -> Dict[str, Any]: """Run a command and return the output.""" pass ``` -------------------------------------------------------------------------------- /Development.md: -------------------------------------------------------------------------------- ```markdown # Getting Started ## Project Structure The project is organized as a monorepo with these main packages: - `libs/core/` - Base package with telemetry support - `libs/computer/` - Computer-use interface (CUI) library - `libs/agent/` - AI agent library with multi-provider support - `libs/som/` - Set-of-Mark parser - `libs/computer-server/` - Server component for VM - `libs/lume/` - Lume CLI - `libs/pylume/` - Python bindings for Lume Each package has its own virtual environment and dependencies, managed through PDM. ## Local Development Setup 1. Install Lume CLI: ```bash /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/trycua/cua/main/libs/lume/scripts/install.sh)" ``` 2. Clone the repository: ```bash git clone https://github.com/trycua/cua.git cd cua ``` 3. Create a `.env.local` file in the root directory with your API keys: ```bash # Required for Anthropic provider ANTHROPIC_API_KEY=your_anthropic_key_here # Required for OpenAI provider OPENAI_API_KEY=your_openai_key_here ``` 4. 
Open the workspace in VSCode or Cursor:

```bash
# For Cua Python development
code .vscode/py.code-workspace

# For Lume (Swift) development
code .vscode/lume.code-workspace
```

Using the workspace file is strongly recommended as it:

- Sets up correct Python environments for each package
- Configures proper import paths
- Enables debugging configurations
- Maintains consistent settings across packages

## Lume Development

Refer to the [Lume Development guide](./libs/lume/Development.md) for instructions on how to develop the Lume CLI.

## Python Development

There are two ways to set up the Python development environment:

### Run the build script

Run the build script to set up all packages:

```bash
./scripts/build.sh
```

The build script creates a shared virtual environment for all packages. The workspace configuration automatically handles import paths with the correct Python path settings.

This will:

- Create a virtual environment for the project
- Install all packages in development mode
- Set up the correct Python path
- Install development tools

### Install with PDM

If PDM is not already installed, you can follow the installation instructions [here](https://pdm-project.org/en/latest/#installation).

To install with PDM, simply run:

```console
pdm install -G:all
```

This installs all the dependencies for development, testing, and building the docs. If you'd only like development dependencies, you can run:

```console
pdm install -d
```

## Running Examples

The Python workspace includes launch configurations for all packages:

- "Run Computer Examples" - Runs computer examples
- "Run Agent Examples" - Runs agent examples
- "SOM" configurations - Various settings for running SOM

To run examples from VSCode / Cursor:

1. Press F5 or use the Run/Debug view
2.
Select the desired configuration

The workspace also includes compound launch configurations:

- "Run Computer Examples + Server" - Runs both the Computer Examples and Server simultaneously

## Docker Development Environment

As an alternative to installing directly on your host machine, you can use Docker for development. This approach has several advantages, described under "How it Works" below.

### Prerequisites

- Docker installed on your machine
- Lume server running on your host (port 7777): `lume serve`

### Setup and Usage

1. Build the development Docker image:

```bash
./scripts/run-docker-dev.sh build
```

2. Run an example in the container:

```bash
./scripts/run-docker-dev.sh run computer_examples.py
```

3. Get an interactive shell in the container:

```bash
./scripts/run-docker-dev.sh run --interactive
```

4. Stop any running containers:

```bash
./scripts/run-docker-dev.sh stop
```

### How it Works

The Docker development environment:

- Installs all required Python dependencies in the container
- Mounts your source code from the host at runtime
- Automatically configures the connection to use `host.docker.internal:7777` for accessing the Lume server on your host machine
- Preserves your code changes without requiring rebuilds (source code is mounted as a volume)

> **Note**: The Docker container doesn't include the macOS-specific Lume executable. Instead, it connects to the Lume server running on your host machine via `host.docker.internal:7777`. Make sure to start the Lume server on your host before running examples in the container.

## Cleanup and Reset

If you need to clean up the (non-Docker) environment and start fresh:

```bash
./scripts/cleanup.sh
```

This will:

- Remove all virtual environments
- Clean Python cache files and directories
- Remove build artifacts
- Clean PDM-related files
- Reset environment configurations

## Code Formatting Standards

The cua project follows strict code formatting standards to ensure consistency across all packages.
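To make the standards concrete, here is a small snippet (with illustrative names, not taken from the cua codebase) that satisfies the rules enforced by the tooling: lines under 100 characters, sorted imports, and type hints on every definition.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class VMConfig:
    """Configuration for a local VM (illustrative example, not part of the cua API)."""

    name: str
    cpu_count: int
    memory_mb: int

    def summary(self) -> str:
        """Return a one-line description that stays within the 100-character line limit."""
        return f"{self.name}: {self.cpu_count} CPUs, {self.memory_mb} MB RAM"


def describe_all(configs: list[VMConfig]) -> list[str]:
    """Summarize each configuration; strict mypy requires annotations like these."""
    return [config.summary() for config in configs]
```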
### Python Code Formatting

#### Tools

The project uses the following tools for code formatting and linting:

- **[Black](https://black.readthedocs.io/)**: Code formatter
- **[Ruff](https://beta.ruff.rs/docs/)**: Fast linter and formatter
- **[MyPy](https://mypy.readthedocs.io/)**: Static type checker

These tools are automatically installed when you set up the development environment using the `./scripts/build.sh` script.

#### Configuration

The formatting configuration is defined in the root `pyproject.toml` file:

```toml
[tool.black]
line-length = 100
target-version = ["py311"]

[tool.ruff]
line-length = 100
target-version = "py311"
select = ["E", "F", "B", "I"]
fix = true

[tool.ruff.format]
docstring-code-format = true

[tool.mypy]
strict = true
python_version = "3.11"
ignore_missing_imports = true
disallow_untyped_defs = true
check_untyped_defs = true
warn_return_any = true
show_error_codes = true
warn_unused_ignores = false
```

#### Key Formatting Rules

- **Line Length**: Maximum of 100 characters
- **Python Version**: Code should be compatible with Python 3.11+
- **Imports**: Automatically sorted (using Ruff's "I" rule)
- **Type Hints**: Required for all function definitions (strict mypy mode)

#### IDE Integration

The repository includes VSCode workspace configurations that enable automatic formatting. When you open the workspace files (as recommended in the setup instructions), the correct formatting settings are automatically applied.

Python-specific settings in the workspace files:

```json
"[python]": {
  "editor.formatOnSave": true,
  "editor.defaultFormatter": "ms-python.black-formatter",
  "editor.codeActionsOnSave": {
    "source.organizeImports": "explicit"
  }
}
```

Recommended VS Code extensions:

- Black Formatter (ms-python.black-formatter)
- Ruff (charliermarsh.ruff)
- Pylance (ms-python.vscode-pylance)

#### Manual Formatting

To manually format code:

```bash
# Format all Python files using Black
pdm run black .
# Run Ruff linter with auto-fix
pdm run ruff check --fix .

# Run type checking with MyPy
pdm run mypy .
```

#### Pre-commit Validation

Before submitting a pull request, ensure your code passes all formatting checks:

```bash
# Run all checks
pdm run black --check .
pdm run ruff check .
pdm run mypy .
```

### Swift Code (Lume)

For Swift code in the `libs/lume` directory:

- Follow the [Swift API Design Guidelines](https://www.swift.org/documentation/api-design-guidelines/)
- Use SwiftFormat for consistent formatting
- Code will be automatically formatted on save when using the lume workspace
```
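Circling back to the abstract clipboard/command interface excerpted at the top of this page: a minimal concrete handler might look like the sketch below. The class name, the in-memory clipboard, and the return-dictionary keys are assumptions for illustration, not the real cua implementation.

```python
import asyncio
from typing import Any, Dict


class ShellHandler:
    """Illustrative handler; cua's real interface declares these as abstract methods."""

    def __init__(self) -> None:
        self._clipboard = ""  # in-memory stand-in for a real OS clipboard

    async def get_clipboard(self) -> Dict[str, Any]:
        """Get the current clipboard content."""
        return {"content": self._clipboard}

    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Set the clipboard content."""
        self._clipboard = text
        return {"success": True}

    async def run_command(self, command: str) -> Dict[str, Any]:
        """Run a shell command and return its output and exit status."""
        proc = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        return {
            "stdout": stdout.decode(),
            "stderr": stderr.decode(),
            "return_code": proc.returncode,
        }
```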