This is page 6 of 16. Use http://codebase.md/trycua/cua?page={x} to view the full context.

# Directory Structure

```
├── .all-contributorsrc
├── .cursorignore
├── .devcontainer
│   ├── devcontainer.json
│   ├── post-install.sh
│   └── README.md
├── .dockerignore
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── ci-lume.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-pylume.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       └── test-validation-script.yml
├── .gitignore
├── .vscode
│   ├── docs.code-workspace
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── composite-agents.md
│   ├── cua-hackathon.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .gitignore
│   ├── .prettierrc
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   └── meta.json
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── meta.json
│   │       │   └── sandboxed-python.mdx
│   │       ├── index.mdx
│   │       ├── libraries
│   │       │   ├── agent
│   │       │   │   └── index.mdx
│   │       │   ├── computer
│   │       │   │   └── index.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── core
│   │       │   │   └── index.mdx
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   └── som
│   │       │       ├── configuration.mdx
│   │       │       └── index.mdx
│   │       ├── meta.json
│   │       ├── quickstart-cli.mdx
│   │       ├── quickstart-devs.mdx
│   │       └── telemetry.mdx
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   └── llms.txt
│   │   │       └── route.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── iou.tsx
│   │   │   └── mermaid.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   └── mdx-components.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── .prettierrc
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   └── uitars.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   └── test_connection.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── scripts
│   │   │       ├── install_mcp_server.sh
│   │   │       └── start_mcp_server.sh
│   │   ├── pylume
│   │   │   ├── __init__.py
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── pylume
│   │   │   │   ├── __init__.py
│   │   │   │   ├── client.py
│   │   │   │   ├── exceptions.py
│   │   │   │   ├── lume
│   │   │   │   ├── models.py
│   │   │   │   ├── pylume.py
│   │   │   │   └── server.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           └── test_omniparser.py
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── biome.json
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Dockerfile
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── pylume_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── pdm.lock
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── samples
│   └── community
│       ├── global-online
│       │   └── README.md
│       └── hack-the-north
│           └── README.md
├── scripts
│   ├── build-uv.sh
│   ├── build.ps1
│   ├── build.sh
│   ├── cleanup.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   └── run-docker-dev.sh
└── tests
    ├── pytest.ini
    ├── shell_cmd.py
    ├── test_files.py
    ├── test_mcp_server_session_management.py
    ├── test_mcp_server_streaming.py
    ├── test_shell_bash.py
    ├── test_telemetry.py
    ├── test_venv.py
    └── test_watchdog.py
```

# Files

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/holo.py:
--------------------------------------------------------------------------------

```python
"""
Holo 1.5 agent loop implementation for click prediction using litellm.acompletion.

Implements the Holo1.5 grounding behavior:
- Prompt asks for absolute pixel coordinates in JSON: {"action":"click_absolute","x":int,"y":int}
- Optionally resizes the image using Qwen2-VL smart_resize parameters (via transformers AutoProcessor)
- If resized, maps predicted coordinates back to the original screenshot resolution

Note: We do NOT manually load the model; acompletions (via HuggingFaceLocalAdapter) will handle loading based on the provided model name.
"""

from __future__ import annotations

import base64
import json
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple

import litellm
from PIL import Image

from ..decorators import register_agent
from .base import AsyncAgentConfig
from ..types import AgentCapability


def _strip_hf_prefix(model: str) -> str:
    """Strip provider prefixes like 'huggingface-local/' from model names for HF processor load."""
    if "/" in model and model.lower().startswith("huggingface-local/"):
        return model.split("/", 1)[1]
    return model


def _maybe_smart_resize(image: Image.Image, model: str) -> Tuple[Image.Image, Tuple[int, int]]:
    """
    Try to compute Qwen2-VL smart_resize output size using transformers AutoProcessor.
    Returns (processed_image, (orig_w, orig_h)). If transformers or processor unavailable,
    returns the original image and size without resizing.
""" orig_w, orig_h = image.size try: # Import lazily to avoid hard dependency if not installed from transformers import AutoProcessor # type: ignore from transformers.models.qwen2_vl.image_processing_qwen2_vl import ( # type: ignore smart_resize, ) processor_name = _strip_hf_prefix(model) processor = AutoProcessor.from_pretrained(processor_name) image_processor = getattr(processor, "image_processor", None) if image_processor is None: return image, (orig_w, orig_h) factor = getattr(image_processor, "patch_size", 14) * getattr(image_processor, "merge_size", 1) min_pixels = getattr(image_processor, "min_pixels", 256 * 256) max_pixels = getattr(image_processor, "max_pixels", 1536 * 1536) resized_h, resized_w = smart_resize( orig_h, orig_w, factor=factor, min_pixels=min_pixels, max_pixels=max_pixels, ) if (resized_w, resized_h) == (orig_w, orig_h): return image, (orig_w, orig_h) processed = image.resize((resized_w, resized_h), resample=Image.Resampling.LANCZOS) return processed, (orig_w, orig_h) except Exception: # If any failure (no transformers, processor load error), fall back to original return image, (orig_w, orig_h) def _build_holo_prompt(instruction: str) -> str: """Construct the Holo1.5 grounding prompt.""" # Keep it close to the cookbook while avoiding heavy schema generation schema_hint = '{"action": "click_absolute", "x": <int>, "y": <int>}' return ( "Localize an element on the GUI image according to the provided target and output a click position. " f"You must output a valid JSON following the format: {schema_hint} " f"Your target is: {instruction}" ) def _parse_click_json(output_text: str) -> Optional[Tuple[int, int]]: """ Parse JSON from model output and extract x, y ints. Tries to find the first JSON object substring if extra text is present. 
""" try: # Fast path: direct JSON data = json.loads(output_text) except Exception: # Try to locate a JSON object within the text start = output_text.find("{") end = output_text.rfind("}") if start == -1 or end == -1 or end <= start: return None try: data = json.loads(output_text[start : end + 1]) except Exception: return None try: x = int(data.get("x")) y = int(data.get("y")) return x, y except Exception: return None @register_agent(models=r"(?i).*(Holo1\.5|Hcompany/Holo1\.5).*") class HoloConfig(AsyncAgentConfig): """Holo is a family of UI grounding models from H Company""" async def predict_step( self, messages: List[Dict[str, Any]], model: str, tools: Optional[List[Dict[str, Any]]] = None, max_retries: Optional[int] = None, stream: bool = False, computer_handler=None, _on_api_start=None, _on_api_end=None, _on_usage=None, _on_screenshot=None, **kwargs, ) -> Dict[str, Any]: # Holo models are only trained on UI localization tasks, not all-in-one agent raise NotImplementedError() async def predict_click( self, model: str, image_b64: str, instruction: str, **kwargs, ) -> Optional[Tuple[int, int]]: """ Predict click coordinates using Holo1.5 via litellm.acompletion. 

        - Optionally smart-resizes the image using Qwen2-VL rules if transformers are available
        - Prompts for JSON with absolute pixel coordinates
        - Parses x,y and maps back to original screenshot size if resized
        """
        try:
            img_bytes = base64.b64decode(image_b64)
            original_img = Image.open(BytesIO(img_bytes))
        except Exception:
            return None

        # Optional preprocessing
        processed_img, (orig_w, orig_h) = _maybe_smart_resize(original_img, model)

        # If we resized, send the resized image; otherwise send original
        img_to_send = processed_img
        buf = BytesIO()
        img_to_send.save(buf, format="PNG")
        processed_b64 = base64.b64encode(buf.getvalue()).decode("utf-8")

        prompt = _build_holo_prompt(instruction)
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{processed_b64}"},
                    },
                    {"type": "text", "text": prompt},
                ],
            }
        ]

        api_kwargs = {
            "model": model,
            "messages": messages,
            # Deterministic, small output
            "max_tokens": kwargs.get("max_tokens", 256),
            "temperature": kwargs.get("temperature", 0.0),
        }

        response = await litellm.acompletion(**api_kwargs)
        output_text = (response.choices[0].message.content or "").strip()  # type: ignore

        coords = _parse_click_json(output_text)
        if coords is None:
            return None

        x, y = coords

        # Map back to original size if we resized
        proc_w, proc_h = img_to_send.size
        if (proc_w, proc_h) != (orig_w, orig_h):
            try:
                sx = orig_w / float(proc_w)
                sy = orig_h / float(proc_h)
                x = int(round(x * sx))
                y = int(round(y * sy))
            except Exception:
                # Fallback: clamp within original bounds
                pass

        # Clamp to original image bounds
        x = max(0, min(orig_w - 1, x))
        y = max(0, min(orig_h - 1, y))
        return x, y

    def get_capabilities(self) -> List[AgentCapability]:
        return ["click"]
```

--------------------------------------------------------------------------------
/.vscode/launch.json:
--------------------------------------------------------------------------------

```json
{
  "configurations": [
    {
      "name": "Agent UI",
      "type": "debugpy",
      "request": "launch",
"program": "examples/agent_ui_examples.py", "console": "integratedTerminal", "justMyCode": false, "python": "${workspaceFolder:cua-root}/.venv/bin/python", "cwd": "${workspaceFolder:cua-root}", "env": { "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume" } }, { "name": "Computer UI", "type": "debugpy", "request": "launch", "program": "examples/computer_ui_examples.py", "console": "integratedTerminal", "justMyCode": false, "python": "${workspaceFolder:cua-root}/.venv/bin/python", "cwd": "${workspaceFolder:cua-root}", "env": { "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume" } }, { "name": "Run Computer Examples", "type": "debugpy", "request": "launch", "program": "examples/computer_examples.py", "console": "integratedTerminal", "justMyCode": true, "python": "${workspaceFolder:cua-root}/.venv/bin/python", "cwd": "${workspaceFolder:cua-root}", "env": { "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume" } }, { "name": "Run Agent Examples", "type": "debugpy", "request": "launch", "program": "examples/agent_examples.py", "console": "integratedTerminal", "justMyCode": false, "python": "${workspaceFolder:cua-root}/.venv/bin/python", "cwd": "${workspaceFolder:cua-root}", "env": { "PYTHONPATH": 
"${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume" } }, { "name": "Run PyLume Examples", "type": "debugpy", "request": "launch", "program": "examples/pylume_examples.py", "console": "integratedTerminal", "justMyCode": true, "python": "${workspaceFolder:cua-root}/.venv/bin/python", "cwd": "${workspaceFolder:cua-root}", "env": { "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume" } }, { "name": "SOM: Run Experiments (No OCR)", "type": "debugpy", "request": "launch", "program": "examples/som_examples.py", "args": [ "examples/test_data", "--output-dir", "examples/output", "--ocr", "none", "--mode", "experiment" ], "console": "integratedTerminal", "justMyCode": false, "python": "${workspaceFolder:cua-root}/.venv/bin/python", "cwd": "${workspaceFolder:cua-root}", "env": { "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume" } }, { "name": "SOM: Run Experiments (EasyOCR)", "type": "debugpy", "request": "launch", "program": "examples/som_examples.py", "args": [ "examples/test_data", "--output-dir", "examples/output", "--ocr", "easyocr", "--mode", "experiment" ], "console": "integratedTerminal", "justMyCode": false, "python": "${workspaceFolder:cua-root}/.venv/bin/python", "cwd": "${workspaceFolder:cua-root}", "env": { "PYTHONPATH": 
"${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume" } }, { "name": "Run Computer Server", "type": "debugpy", "request": "launch", "program": "${workspaceFolder}/libs/python/computer-server/run_server.py", "console": "integratedTerminal", "justMyCode": true, "python": "${workspaceFolder:cua-root}/.venv/bin/python", "cwd": "${workspaceFolder:cua-root}", "env": { "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume" } }, { "name": "Run Computer Server with Args", "type": "debugpy", "request": "launch", "program": "${workspaceFolder}/libs/python/computer-server/run_server.py", "args": [ "--host", "0.0.0.0", "--port", "8000", "--log-level", "debug" ], "console": "integratedTerminal", "justMyCode": false, "python": "${workspaceFolder:cua-root}/.venv/bin/python", "cwd": "${workspaceFolder:cua-root}", "env": { "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer-server" } }, { "type": "lldb", "request": "launch", "args": [], "cwd": "${workspaceFolder:cua-root}/libs/lume", "name": "Debug lume (libs/lume)", "program": "${workspaceFolder:cua-root}/libs/lume/.build/debug/lume", "preLaunchTask": "swift: Build Debug lume (libs/lume)" }, { "type": "lldb", "request": "launch", "args": [], "cwd": "${workspaceFolder:cua-root}/libs/lume", "name": "Release lume (libs/lume)", "program": "${workspaceFolder:cua-root}/libs/lume/.build/release/lume", "preLaunchTask": "swift: Build Release lume (libs/lume)" } ] } ``` -------------------------------------------------------------------------------- /libs/lume/src/Commands/Config.swift: 
--------------------------------------------------------------------------------

```swift
import ArgumentParser
import Foundation

struct Config: ParsableCommand {
    static let configuration = CommandConfiguration(
        commandName: "config",
        abstract: "Get or set lume configuration",
        subcommands: [Get.self, Storage.self, Cache.self, Caching.self],
        defaultSubcommand: Get.self
    )

    // MARK: - Basic Configuration Subcommands

    struct Get: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "get",
            abstract: "Get current configuration"
        )

        func run() throws {
            let controller = LumeController()
            let settings = controller.getSettings()

            // Display default location
            print(
                "Default VM storage: \(settings.defaultLocationName) (\(settings.defaultLocation?.path ?? "not set"))"
            )

            // Display cache directory
            print("Cache directory: \(settings.cacheDirectory)")

            // Display caching enabled status
            print("Caching enabled: \(settings.cachingEnabled)")

            // Display all locations
            if !settings.vmLocations.isEmpty {
                print("\nConfigured VM storage locations:")
                for location in settings.sortedLocations {
                    let isDefault = location.name == settings.defaultLocationName
                    let defaultMark = isDefault ? " (default)" : ""
                    print(" - \(location.name): \(location.path)\(defaultMark)")
                }
            }
        }
    }

    // MARK: - Debug Command

    struct Debug: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "debug",
            abstract: "Output detailed debug information about current configuration",
            shouldDisplay: false
        )

        func run() throws {
            let debugInfo = SettingsManager.shared.debugSettings()
            print(debugInfo)
        }
    }

    // MARK: - Caching Management Subcommands

    struct Caching: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "caching",
            abstract: "Manage image caching settings",
            subcommands: [GetCaching.self, SetCaching.self]
        )

        struct GetCaching: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "get",
                abstract: "Show current caching status"
            )

            func run() throws {
                let controller = LumeController()
                let cachingEnabled = controller.isCachingEnabled()
                print("Caching enabled: \(cachingEnabled)")
            }
        }

        struct SetCaching: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "set",
                abstract: "Enable or disable image caching"
            )

            @Argument(help: "Enable or disable caching (true/false)")
            var enabled: Bool

            func run() throws {
                let controller = LumeController()
                try controller.setCachingEnabled(enabled)
                print("Caching \(enabled ? "enabled" : "disabled")")
            }
        }
    }

    // MARK: - Cache Management Subcommands

    struct Cache: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "cache",
            abstract: "Manage cache settings",
            subcommands: [GetCache.self, SetCache.self]
        )

        struct GetCache: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "get",
                abstract: "Get current cache directory"
            )

            func run() throws {
                let controller = LumeController()
                let cacheDir = controller.getCacheDirectory()
                print("Cache directory: \(cacheDir)")
            }
        }

        struct SetCache: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "set",
                abstract: "Set cache directory"
            )

            @Argument(help: "Path to cache directory")
            var path: String

            func run() throws {
                let controller = LumeController()
                try controller.setCacheDirectory(path: path)
                print("Cache directory set to: \(path)")
            }
        }
    }

    // MARK: - Storage Management Subcommands

    struct Storage: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "storage",
            abstract: "Manage VM storage locations",
            subcommands: [Add.self, Remove.self, List.self, Default.self]
        )

        struct Add: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "add",
                abstract: "Add a new VM storage location"
            )

            @Argument(help: "Storage name (alphanumeric with dashes/underscores)")
            var name: String

            @Argument(help: "Path to VM storage directory")
            var path: String

            func run() throws {
                let controller = LumeController()
                try controller.addLocation(name: name, path: path)
                print("Added VM storage location: \(name) at \(path)")
            }
        }

        struct Remove: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "remove",
                abstract: "Remove a VM storage location"
            )

            @Argument(help: "Storage name to remove")
            var name: String

            func run() throws {
                let controller = LumeController()
                try controller.removeLocation(name: name)
                print("Removed VM storage location: \(name)")
            }
        }

        struct List: ParsableCommand {
            static let
configuration = CommandConfiguration(
                commandName: "list",
                abstract: "List all VM storage locations"
            )

            func run() throws {
                let controller = LumeController()
                let settings = controller.getSettings()

                if settings.vmLocations.isEmpty {
                    print("No VM storage locations configured")
                    return
                }

                print("VM Storage Locations:")
                for location in settings.sortedLocations {
                    let isDefault = location.name == settings.defaultLocationName
                    let defaultMark = isDefault ? " (default)" : ""
                    print(" - \(location.name): \(location.path)\(defaultMark)")
                }
            }
        }

        struct Default: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "default",
                abstract: "Set the default VM storage location"
            )

            @Argument(help: "Storage name to set as default")
            var name: String

            func run() throws {
                let controller = LumeController()
                try controller.setDefaultLocation(name: name)
                print("Set default VM storage location to: \(name)")
            }
        }
    }
}
```

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/handlers/generic.py:
--------------------------------------------------------------------------------

```python
"""
Generic handlers for all OSes.

Includes:
- FileHandler
"""

from pathlib import Path
from typing import Dict, Any, Optional

from .base import BaseFileHandler
import base64


def resolve_path(path: str) -> Path:
    """Resolve a path to its absolute path. Expand ~ to the user's home directory.

    Args:
        path: The file or directory path to resolve

    Returns:
        Path: The resolved absolute path
    """
    return Path(path).expanduser().resolve()


class GenericFileHandler(BaseFileHandler):
    """
    Generic file handler that provides file system operations for all operating systems.

    This class implements the BaseFileHandler interface and provides methods for
    file and directory operations including reading, writing, creating, and deleting
    files and directories.
    """

    async def file_exists(self, path: str) -> Dict[str, Any]:
        """
        Check if a file exists at the specified path.

        Args:
            path: The file path to check

        Returns:
            Dict containing 'success' boolean and either 'exists' boolean or 'error' string
        """
        try:
            return {"success": True, "exists": resolve_path(path).is_file()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def directory_exists(self, path: str) -> Dict[str, Any]:
        """
        Check if a directory exists at the specified path.

        Args:
            path: The directory path to check

        Returns:
            Dict containing 'success' boolean and either 'exists' boolean or 'error' string
        """
        try:
            return {"success": True, "exists": resolve_path(path).is_dir()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def list_dir(self, path: str) -> Dict[str, Any]:
        """
        List all files and directories in the specified directory.

        Args:
            path: The directory path to list

        Returns:
            Dict containing 'success' boolean and either 'files' list of names or 'error' string
        """
        try:
            return {
                "success": True,
                "files": [p.name for p in resolve_path(path).iterdir() if p.is_file() or p.is_dir()],
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def read_text(self, path: str) -> Dict[str, Any]:
        """
        Read the contents of a text file.

        Args:
            path: The file path to read from

        Returns:
            Dict containing 'success' boolean and either 'content' string or 'error' string
        """
        try:
            return {"success": True, "content": resolve_path(path).read_text()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def write_text(self, path: str, content: str) -> Dict[str, Any]:
        """
        Write text content to a file.

        Args:
            path: The file path to write to
            content: The text content to write

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            resolve_path(path).write_text(content)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def write_bytes(self, path: str, content_b64: str, append: bool = False) -> Dict[str, Any]:
        """
        Write binary content to a file from base64 encoded string.

        Args:
            path: The file path to write to
            content_b64: Base64 encoded binary content
            append: If True, append to existing file; if False, overwrite

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            mode = 'ab' if append else 'wb'
            with open(resolve_path(path), mode) as f:
                f.write(base64.b64decode(content_b64))
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def read_bytes(self, path: str, offset: int = 0, length: Optional[int] = None) -> Dict[str, Any]:
        """
        Read binary content from a file and return as base64 encoded string.

        Args:
            path: The file path to read from
            offset: Byte offset to start reading from
            length: Number of bytes to read; if None, read entire file from offset

        Returns:
            Dict containing 'success' boolean and either 'content_b64' string or 'error' string
        """
        try:
            file_path = resolve_path(path)
            with open(file_path, 'rb') as f:
                if offset > 0:
                    f.seek(offset)
                if length is not None:
                    content = f.read(length)
                else:
                    content = f.read()
            return {"success": True, "content_b64": base64.b64encode(content).decode('utf-8')}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_file_size(self, path: str) -> Dict[str, Any]:
        """
        Get the size of a file in bytes.

        Args:
            path: The file path to get size for

        Returns:
            Dict containing 'success' boolean and either 'size' integer or 'error' string
        """
        try:
            file_path = resolve_path(path)
            size = file_path.stat().st_size
            return {"success": True, "size": size}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def delete_file(self, path: str) -> Dict[str, Any]:
        """
        Delete a file at the specified path.

        Args:
            path: The file path to delete

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            resolve_path(path).unlink()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def create_dir(self, path: str) -> Dict[str, Any]:
        """
        Create a directory at the specified path.

        Creates parent directories if they don't exist and doesn't raise an error
        if the directory already exists.

        Args:
            path: The directory path to create

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            resolve_path(path).mkdir(parents=True, exist_ok=True)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def delete_dir(self, path: str) -> Dict[str, Any]:
        """
        Delete an empty directory at the specified path.

        Args:
            path: The directory path to delete

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            resolve_path(path).rmdir()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}
```

--------------------------------------------------------------------------------
/libs/python/pylume/pylume/models.py:
--------------------------------------------------------------------------------

```python
from typing import Optional, List, Literal, Dict, Any
import re

from pydantic import BaseModel, Field, computed_field, validator, ConfigDict, RootModel


class DiskInfo(BaseModel):
    """Information about disk storage allocation.
    Attributes:
        total: Total disk space in bytes
        allocated: Currently allocated disk space in bytes
    """

    total: int
    allocated: int


class VMConfig(BaseModel):
    """Configuration for creating a new VM.

    Note: Memory and disk sizes should be specified with units (e.g., "4GB", "64GB")

    Attributes:
        name: Name of the virtual machine
        os: Operating system type, either "macOS" or "linux"
        cpu: Number of CPU cores to allocate
        memory: Amount of memory to allocate with units
        disk_size: Size of the disk to create with units
        display: Display resolution in format "widthxheight"
        ipsw: IPSW path or 'latest' for macOS VMs, None for other OS types
    """

    name: str
    os: Literal["macOS", "linux"] = "macOS"
    cpu: int = Field(default=2, ge=1)
    memory: str = "4GB"
    disk_size: str = Field(default="64GB", alias="diskSize")
    display: str = "1024x768"
    ipsw: Optional[str] = Field(default=None, description="IPSW path or 'latest', for macOS VMs")

    class Config:
        populate_by_name = True


class SharedDirectory(BaseModel):
    """Configuration for a shared directory.

    Attributes:
        host_path: Path to the directory on the host system
        read_only: Whether the directory should be mounted as read-only
    """

    host_path: str = Field(..., alias="hostPath")  # Allow host_path but serialize as hostPath
    read_only: bool = False

    class Config:
        populate_by_name = True  # Allow both alias and original name
        alias_generator = lambda s: ''.join(
            word.capitalize() if i else word for i, word in enumerate(s.split('_'))
        )


class VMRunOpts(BaseModel):
    """Configuration for running a VM.

    Args:
        no_display: Whether to not display the VNC client
        shared_directories: List of directories to share with the VM
    """

    no_display: bool = Field(default=False, alias="noDisplay")
    shared_directories: Optional[list[SharedDirectory]] = Field(
        default=None, alias="sharedDirectories"
    )

    model_config = ConfigDict(
        populate_by_name=True,
        alias_generator=lambda s: ''.join(
            word.capitalize() if i else word for i, word in enumerate(s.split('_'))
        ),
    )

    def model_dump(self, **kwargs):
        """Export model data with proper field name conversion.

        Converts shared directory fields to match API expectations when using aliases.

        Args:
            **kwargs: Keyword arguments passed to parent model_dump method

        Returns:
            dict: Model data with properly formatted field names
        """
        data = super().model_dump(**kwargs)
        # Convert shared directory fields to match API expectations
        if self.shared_directories and "by_alias" in kwargs and kwargs["by_alias"]:
            data["sharedDirectories"] = [
                {
                    "hostPath": d.host_path,
                    "readOnly": d.read_only,
                }
                for d in self.shared_directories
            ]
            # Remove the snake_case version if it exists
            data.pop("shared_directories", None)
        return data


class VMStatus(BaseModel):
    """Status information for a virtual machine.

    Attributes:
        name: Name of the virtual machine
        status: Current status of the VM
        os: Operating system type
        cpu_count: Number of CPU cores allocated
        memory_size: Amount of memory allocated in bytes
        disk_size: Disk storage information
        vnc_url: URL for VNC connection if available
        ip_address: IP address of the VM if available
    """

    name: str
    status: str
    os: Literal["macOS", "linux"]
    cpu_count: int = Field(alias="cpuCount")
    memory_size: int = Field(alias="memorySize")  # API returns memory size in bytes
    disk_size: DiskInfo = Field(alias="diskSize")
    vnc_url: Optional[str] = Field(default=None, alias="vncUrl")
    ip_address: Optional[str] = Field(default=None, alias="ipAddress")

    class Config:
        populate_by_name = True

    @computed_field
    @property
    def state(self) -> str:
        """Get the current state of the VM.

        Returns:
            str: Current VM status
        """
        return self.status

    @computed_field
    @property
    def cpu(self) -> int:
        """Get the number of CPU cores.

        Returns:
            int: Number of CPU cores allocated to the VM
        """
        return self.cpu_count

    @computed_field
    @property
    def memory(self) -> str:
        """Get memory allocation in human-readable format.

        Returns:
            str: Memory size formatted as "{size}GB"
        """
        # Convert bytes to GB
        gb = self.memory_size / (1024 * 1024 * 1024)
        return f"{int(gb)}GB"


class VMUpdateOpts(BaseModel):
    """Options for updating VM configuration.

    Attributes:
        cpu: Number of CPU cores to update to
        memory: Amount of memory to update to with units
        disk_size: Size of disk to update to with units
    """

    cpu: Optional[int] = None
    memory: Optional[str] = None
    disk_size: Optional[str] = None


class ImageRef(BaseModel):
    """Reference to a VM image.

    Attributes:
        image: Name of the image
        tag: Tag version of the image
        registry: Registry hostname where image is stored
        organization: Organization or namespace in the registry
    """

    image: str
    tag: str = "latest"
    registry: Optional[str] = "ghcr.io"
    organization: Optional[str] = "trycua"

    def model_dump(self, **kwargs):
        """Override model_dump to return just the image:tag format.

        Args:
            **kwargs: Keyword arguments (ignored)

        Returns:
            str: Image reference in "image:tag" format
        """
        return f"{self.image}:{self.tag}"


class CloneSpec(BaseModel):
    """Specification for cloning a VM.

    Attributes:
        name: Name of the source VM to clone
        new_name: Name for the new cloned VM
    """

    name: str
    new_name: str = Field(alias="newName")

    class Config:
        populate_by_name = True


class ImageInfo(BaseModel):
    """Model for individual image information.

    Attributes:
        imageId: Unique identifier for the image
    """

    imageId: str


class ImageList(RootModel):
    """Response model for the images endpoint.

    A list-like container for ImageInfo objects that provides iteration and
    indexing capabilities.
    """

    root: List[ImageInfo]

    def __iter__(self):
        """Iterate over the image list.
        Returns:
            Iterator over ImageInfo objects
        """
        return iter(self.root)

    def __getitem__(self, item):
        """Get an item from the image list by index.

        Args:
            item: Index or slice to retrieve

        Returns:
            ImageInfo or list of ImageInfo objects
        """
        return self.root[item]

    def __len__(self):
        """Get the number of images in the list.

        Returns:
            int: Number of images in the list
        """
        return len(self.root)
```

--------------------------------------------------------------------------------
/libs/python/mcp-server/CONCURRENT_SESSIONS.md:
--------------------------------------------------------------------------------

```markdown
# MCP Server Concurrent Session Management

This document describes the improvements made to the MCP Server to address concurrent session management and resource lifecycle issues.

## Problem Statement

The original MCP server implementation had several critical issues:

1. **Global Computer Instance**: Used a single `global_computer` variable shared across all clients
2. **No Resource Isolation**: Multiple clients would interfere with each other
3. **Sequential Task Processing**: Multi-task operations were always sequential
4. **No Graceful Shutdown**: Server couldn't properly clean up resources on shutdown
5. **Hidden Event Loop**: `server.run()` hid the event loop, preventing proper lifecycle management

## Solution Architecture

### 1. Session Manager (`session_manager.py`)

The `SessionManager` class provides:

- **Per-session computer instances**: Each client gets isolated computer resources
- **Computer instance pooling**: Efficient reuse of computer instances with lifecycle management
- **Task registration**: Track active tasks per session for graceful cleanup
- **Automatic cleanup**: Background task cleans up idle sessions
- **Resource limits**: Configurable maximum concurrent sessions

#### Key Components:

```python
class SessionManager:
    def __init__(self, max_concurrent_sessions: int = 10):
        self._sessions: Dict[str, SessionInfo] = {}
        self._computer_pool = ComputerPool()
        # ... lifecycle management
```

#### Session Lifecycle:

1. **Creation**: New session created when client first connects
2. **Task Registration**: Each task is registered with the session
3. **Activity Tracking**: Last activity time updated on each operation
4. **Cleanup**: Sessions cleaned up when idle or on shutdown

### 2. Computer Pool (`ComputerPool`)

Manages computer instances efficiently:

- **Pool Size Limits**: Maximum number of concurrent computer instances
- **Instance Reuse**: Available instances reused across sessions
- **Lifecycle Management**: Proper startup/shutdown of computer instances
- **Resource Cleanup**: All instances properly closed on shutdown

### 3. Enhanced Server Tools

All server tools now support:

- **Session ID Parameter**: Optional `session_id` for multi-client support
- **Resource Isolation**: Each session gets its own computer instance
- **Task Tracking**: Proper registration/unregistration of tasks
- **Error Handling**: Graceful error handling with session cleanup

#### Updated Tool Signatures:

```python
async def screenshot_cua(ctx: Context, session_id: Optional[str] = None) -> Any:
async def run_cua_task(ctx: Context, task: str, session_id: Optional[str] = None) -> Any:
async def run_multi_cua_tasks(ctx: Context, tasks: List[str], session_id: Optional[str] = None, concurrent: bool = False) -> Any:
```

### 4. Concurrent Task Execution

The `run_multi_cua_tasks` tool now supports:

- **Sequential Mode** (default): Tasks run one after another
- **Concurrent Mode**: Tasks run in parallel using `asyncio.gather()`
- **Progress Tracking**: Proper progress reporting for both modes
- **Error Handling**: Individual task failures don't stop other tasks

### 5. Graceful Shutdown

The server now provides:

- **Signal Handlers**: Proper handling of SIGINT and SIGTERM
- **Session Cleanup**: All active sessions properly cleaned up
- **Resource Release**: Computer instances returned to pool and closed
- **Async Lifecycle**: Event loop properly exposed for cleanup

## Usage Examples

### Basic Usage (Backward Compatible)

```python
# These calls work exactly as before
await screenshot_cua(ctx)
await run_cua_task(ctx, "Open browser")
await run_multi_cua_tasks(ctx, ["Task 1", "Task 2"])
```

### Multi-Client Usage

```python
# Client 1
session_id_1 = "client-1-session"
await screenshot_cua(ctx, session_id_1)
await run_cua_task(ctx, "Open browser", session_id_1)

# Client 2 (completely isolated)
session_id_2 = "client-2-session"
await screenshot_cua(ctx, session_id_2)
await run_cua_task(ctx, "Open editor", session_id_2)
```

### Concurrent Task Execution

```python
# Run tasks concurrently instead of sequentially
tasks = ["Open browser", "Open editor", "Open terminal"]
results = await run_multi_cua_tasks(ctx, tasks, concurrent=True)
```

### Session Management

```python
# Get session statistics
stats = await get_session_stats(ctx)
print(f"Active sessions: {stats['total_sessions']}")

# Clean up a specific session
await cleanup_session(ctx, "session-to-cleanup")
```

## Configuration

### Environment Variables

- `CUA_MODEL_NAME`: Model to use (default: `anthropic/claude-3-5-sonnet-20241022`)
- `CUA_MAX_IMAGES`: Maximum images to keep (default: `3`)

### Session Manager Configuration

```python
# In session_manager.py
class SessionManager:
    def __init__(self, max_concurrent_sessions: int = 10):
        # Configurable maximum concurrent sessions

class ComputerPool:
    def __init__(self, max_size: int = 5, idle_timeout: float = 300.0):
        # Configurable pool size and idle timeout
```

## Performance Improvements

### Before (Issues):

- ❌ Single global computer instance
- ❌ Client interference and resource conflicts
- ❌ Sequential task processing only
- ❌ No graceful shutdown
- ❌ 30s timeout issues with long-running tasks

### After (Benefits):

- ✅ Per-session computer instances with proper isolation
- ✅ Computer instance pooling for efficient resource usage
- ✅ Concurrent task execution support
- ✅ Graceful shutdown with proper cleanup
- ✅ Streaming updates prevent timeout issues
- ✅ Configurable resource limits
- ✅ Automatic session cleanup

## Testing

Comprehensive test coverage includes:

- Session creation and reuse
- Concurrent session isolation
- Task registration and cleanup
- Error handling with session management
- Concurrent vs sequential task execution
- Session statistics and cleanup

Run tests with:

```bash
pytest tests/test_mcp_server_session_management.py -v
```

## Migration Guide

### For Existing Clients

No changes required! The new implementation is fully backward compatible:

```python
# This still works exactly as before
await run_cua_task(ctx, "My task")
```

### For New Multi-Client Applications

Use session IDs for proper isolation:

```python
# Create a unique session ID for each client
session_id = str(uuid.uuid4())
await run_cua_task(ctx, "My task", session_id)
```

### For Concurrent Task Execution

Enable concurrent mode for better performance:

```python
tasks = ["Task 1", "Task 2", "Task 3"]
results = await run_multi_cua_tasks(ctx, tasks, concurrent=True)
```

## Monitoring and Debugging

### Session Statistics

```python
stats = await get_session_stats(ctx)
print(f"Total sessions: {stats['total_sessions']}")
print(f"Max concurrent: {stats['max_concurrent']}")
for session_id, session_info in stats['sessions'].items():
    print(f"Session {session_id}: {session_info['active_tasks']} active tasks")
```

### Logging

The server provides detailed logging for:

- Session creation and cleanup
- Task registration and completion
- Resource pool usage
- Error conditions and recovery

### Graceful Shutdown

The server properly handles shutdown signals:

```bash
# Send SIGTERM for graceful shutdown
kill -TERM <server_pid>

# Or use Ctrl+C (SIGINT)
```

## Future Enhancements

Potential future improvements:

1. **Session Persistence**: Save/restore session state across restarts
2. **Load Balancing**: Distribute sessions across multiple server instances
3. **Resource Monitoring**: Real-time monitoring of resource usage
4. **Auto-scaling**: Dynamic adjustment of pool size based on demand
5. **Session Timeouts**: Configurable timeouts for different session types
```

--------------------------------------------------------------------------------
/blog/human-in-the-loop.md:
--------------------------------------------------------------------------------

```markdown
# When Agents Need Human Wisdom - Introducing Human-In-The-Loop Support

*Published on August 29, 2025 by Francesco Bonacci*

Sometimes the best AI agent is a human. Whether you're creating training demonstrations, evaluating complex scenarios, or need to intervene when automation hits a wall, our new Human-In-The-Loop integration puts you directly in control.

With yesterday's [HUD evaluation integration](hud-agent-evals.md), you could benchmark any agent at scale. Today's update lets you *become* the agent when it matters most—seamlessly switching between automated intelligence and human judgment.

<div align="center">
  <video src="https://github.com/user-attachments/assets/9091b50f-26e7-4981-95ce-40e5d42a1260" width="600" controls></video>
</div>

## What you get

- **One-line human takeover** for any agent configuration with `human/human` or `model+human/human`
- **Interactive web UI** to see what your agent sees and control what it does
- **Zero context switching** - step in exactly where automation left off
- **Training data generation** - create perfect demonstrations by doing tasks yourself
- **Ground truth evaluation** - validate agent performance with human expertise

## Why Human-In-The-Loop?

Even the most sophisticated agents encounter edge cases, ambiguous interfaces, or tasks requiring human judgment.
Rather than failing gracefully, they can now fail *intelligently*—by asking for human help.

This approach bridges the gap between fully automated systems and pure manual control, letting you:

- **Demonstrate complex workflows** that agents can learn from
- **Evaluate tricky scenarios** where ground truth requires human assessment
- **Intervene selectively** when automated agents need guidance
- **Test and debug** your tools and environments manually

## Getting Started

Launch the human agent interface:

```bash
python -m agent.human_tool
```

The web UI will show pending completions. Click any completion to take control of the agent and see exactly what it sees.

## Usage Examples

### Direct Human Control

Perfect for creating demonstrations or when you want full manual control:

```python
from agent import ComputerAgent
from agent.computer import computer

agent = ComputerAgent(
    "human/human",
    tools=[computer]
)

# You'll get full control through the web UI
async for _ in agent.run("Take a screenshot, analyze the UI, and click on the most prominent button"):
    pass
```

### Hybrid: AI Planning + Human Execution

Combine model intelligence with human precision—let AI plan, then execute manually:

```python
agent = ComputerAgent(
    "huggingface-local/HelloKKMe/GTA1-7B+human/human",
    tools=[computer]
)

# AI creates the plan, human executes each step
async for _ in agent.run("Navigate to the settings page and enable dark mode"):
    pass
```

### Fallback Pattern

Start automated, escalate to human when needed:

```python
# Primary automated agent
primary_agent = ComputerAgent("openai/computer-use-preview", tools=[computer])

# Human fallback agent
fallback_agent = ComputerAgent("human/human", tools=[computer])

try:
    async for result in primary_agent.run(task):
        if result.confidence < 0.7:  # Low confidence threshold
            # Seamlessly hand off to human
            async for _ in fallback_agent.run(f"Continue this task: {task}"):
                pass
except Exception:
    # Agent failed, human takes over
    async for _ in fallback_agent.run(f"Handle this failed task: {task}"):
        pass
```

## Interactive Features

The human-in-the-loop interface provides a rich, responsive experience:

### **Visual Environment**

- **Screenshot display** with live updates as you work
- **Click handlers** for direct interaction with UI elements
- **Zoom and pan** to see details clearly

### **Action Controls**

- **Click actions** - precise cursor positioning and clicking
- **Keyboard input** - type text naturally or send specific key combinations
- **Action history** - see the sequence of actions taken
- **Undo support** - step back when needed

### **Tool Integration**

- **Full OpenAI compatibility** - standard tool call format
- **Custom tools** - integrate your own tools seamlessly
- **Real-time feedback** - see tool responses immediately

### **Smart Polling**

- **Responsive updates** - UI refreshes when new completions arrive
- **Background processing** - continue working while waiting for tasks
- **Session persistence** - resume interrupted sessions

## Real-World Use Cases

### **Training Data Generation**

Create perfect demonstrations for fine-tuning:

```python
# Generate training examples for spreadsheet tasks
demo_agent = ComputerAgent("human/human", tools=[computer])

tasks = [
    "Create a budget spreadsheet with income and expense categories",
    "Apply conditional formatting to highlight overbudget items",
    "Generate a pie chart showing expense distribution"
]

for task in tasks:
    # Human demonstrates each task perfectly
    async for _ in demo_agent.run(task):
        pass
    # Recorded actions become training data
```

### **Evaluation and Ground Truth**

Validate agent performance on complex scenarios:

```python
# Human evaluates agent performance
evaluator = ComputerAgent("human/human", tools=[computer])

async for _ in evaluator.run("Review this completed form and rate accuracy (1-10)"):
    pass
# Human provides authoritative quality assessment
```

### **Interactive Debugging**

Step through agent behavior manually:

```python
# Test a workflow step by step
debug_agent = ComputerAgent("human/human", tools=[computer])

async for _ in debug_agent.run("Reproduce the agent's failed login sequence"):
    pass
# Human identifies exactly where automation breaks
```

### **Edge Case Handling**

Handle scenarios that break automated agents:

```python
# Complex UI interaction requiring human judgment
edge_case_agent = ComputerAgent("human/human", tools=[computer])

async for _ in edge_case_agent.run("Navigate this CAPTCHA-protected form"):
    pass
# Human handles what automation cannot
```

## Configuration Options

Customize the human agent experience:

- **UI refresh rate**: Adjust polling frequency for your workflow
- **Image quality**: Balance detail vs. performance for screenshots
- **Action logging**: Save detailed traces for analysis and training
- **Session timeout**: Configure idle timeouts for security
- **Tool permissions**: Restrict which tools humans can access

## When to Use Human-In-The-Loop

| **Scenario** | **Why Human Control** |
|--------------|----------------------|
| **Creating training data** | Perfect demonstrations for model fine-tuning |
| **Evaluating complex tasks** | Human judgment for subjective or nuanced assessment |
| **Handling edge cases** | CAPTCHAs, unusual UIs, context-dependent decisions |
| **Debugging workflows** | Step through failures to identify breaking points |
| **High-stakes operations** | Critical tasks requiring human oversight and approval |
| **Testing new environments** | Validate tools and environments work as expected |

## Learn More

- **Interactive examples**: Try human-in-the-loop control with sample tasks
- **Training data pipelines**: Learn how to convert human demonstrations into model training data
- **Evaluation frameworks**: Build human-validated test suites for your agents
- **API documentation**: Full reference for human agent configuration

Ready to put humans back in the loop? The most sophisticated AI system knows when to ask for help.
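
The fallback pattern shown earlier hinges on one small decision: which step of the primary agent's run warrants handing control to a `human/human` agent. A minimal, SDK-independent sketch of that decision logic — the `StepResult` shape, its `confidence` field, and the 0.7 threshold are illustrative assumptions, not part of the cua API:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class StepResult:
    """Hypothetical per-step result; real agents may surface confidence differently."""
    action: str
    confidence: float

def first_handoff_point(results: List[StepResult], threshold: float = 0.7) -> Optional[int]:
    """Return the index of the first step that should escalate to a human, or None."""
    for i, result in enumerate(results):
        if result.confidence < threshold:
            return i
    return None

steps = [
    StepResult("click login button", 0.95),
    StepResult("solve CAPTCHA", 0.31),  # below threshold: escalate here
    StepResult("submit form", 0.88),
]
print(first_handoff_point(steps))  # -> 1
```

In the fallback pattern, the returned index is where you would stop the automated agent and start a `human/human` agent on the remainder of the task.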
---

*Questions about human-in-the-loop agents? Join the conversation in our [Discord community](https://discord.gg/cua-ai) or check out our [documentation](https://docs.trycua.com/docs/agent-sdk/supported-agents/human-in-the-loop).*
```

--------------------------------------------------------------------------------
/docs/content/docs/quickstart-cli.mdx:
--------------------------------------------------------------------------------

```markdown
---
title: Quickstart (CLI)
description: Get started with the cua Agent CLI in 4 steps
icon: Rocket
---

import { Step, Steps } from 'fumadocs-ui/components/steps';
import { Tab, Tabs } from 'fumadocs-ui/components/tabs';
import { Accordion, Accordions } from 'fumadocs-ui/components/accordion';

Get up and running with the cua Agent CLI in 4 simple steps.

<Steps>
<Step>

## Introduction

cua combines Computer (interface) + Agent (AI) for automating desktop apps. The Agent CLI provides a clean terminal interface to control your remote computer using natural language commands.

</Step>
<Step>

## Set Up Your Computer Environment

Choose how you want to run your cua computer. **Cloud Sandbox is recommended** for the easiest setup:

<Tabs items={['☁️ Cloud Sandbox (Recommended)', 'Linux on Docker', 'Windows Sandbox', 'macOS VM']}>
<Tab value="☁️ Cloud Sandbox (Recommended)">

**Easiest & safest way to get started - works on any host OS**

1. Go to [trycua.com/signin](https://www.trycua.com/signin)
2. Navigate to **Dashboard > Containers > Create Instance**
3. Create a **Medium, Ubuntu 22** container
4. Note your container name and API key

Your cloud container will be automatically configured and ready to use.

</Tab>
<Tab value="Linux on Docker">

**Run Linux desktop locally on macOS, Windows, or Linux hosts**

1. Install Docker Desktop or Docker Engine
2. Pull the CUA XFCE container (lightweight desktop)

   ```bash
   docker pull --platform=linux/amd64 trycua/cua-xfce:latest
   ```

   Or use KASM for a full-featured desktop:

   ```bash
   docker pull --platform=linux/amd64 trycua/cua-ubuntu:latest
   ```

</Tab>
<Tab value="Windows Sandbox">

**Windows hosts only - requires Windows 10 Pro/Enterprise or Windows 11**

1. Enable Windows Sandbox
2. Install the pywinsandbox dependency

   ```bash
   pip install -U git+https://github.com/karkason/pywinsandbox.git
   ```

3. Windows Sandbox will be automatically configured when you run the CLI

</Tab>
<Tab value="macOS VM">

**macOS hosts only - requires Lume CLI**

1. Install the lume CLI

   ```bash
   /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/trycua/cua/main/libs/lume/scripts/install.sh)"
   ```

2. Start a local cua macOS VM

   ```bash
   lume run macos-sequoia-cua:latest
   ```

</Tab>
</Tabs>

</Step>
<Step>

## Install cua

<Accordions type="single" defaultValue="uv">
<Accordion title="uv (Recommended)" value="uv">

### Install uv

<Tabs items={['macOS / Linux', 'Windows']} persist>
<Tab value="macOS / Linux">

```bash
# Use curl to download the script and execute it with sh:
curl -LsSf https://astral.sh/uv/install.sh | sh

# If your system doesn't have curl, you can use wget:
# wget -qO- https://astral.sh/uv/install.sh | sh
```

</Tab>
<Tab value="Windows">

```powershell
# Use irm to download the script and execute it with iex:
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
```

</Tab>
</Tabs>

### Install Python 3.12

```bash
uv python install 3.12
# uv will install cua dependencies automatically when you use --with "cua-agent[cli]"
```

</Accordion>
<Accordion title="conda" value="conda">

### Install conda

<Tabs items={['macOS', 'Linux', 'Windows']} persist>
<Tab value="macOS">

```bash
mkdir -p ~/miniconda3
curl https://repo.anaconda.com/miniconda/Miniconda3-latest-MacOSX-arm64.sh -o ~/miniconda3/miniconda.sh
bash ~/miniconda3/miniconda.sh -b -u -p ~/miniconda3
rm ~/miniconda3/miniconda.sh
source ~/miniconda3/bin/activate
```

</Tab>
<Tab value="Linux">

```bash
mkdir -p ~/miniconda3
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda3/miniconda.sh
bash ~/miniconda3/miniconda.sh -b -u -p ~/miniconda3
rm ~/miniconda3/miniconda.sh
source ~/miniconda3/bin/activate
```

</Tab>
<Tab value="Windows">

```powershell
wget "https://repo.anaconda.com/miniconda/Miniconda3-latest-Windows-x86_64.exe" -outfile ".\miniconda.exe"
Start-Process -FilePath ".\miniconda.exe" -ArgumentList "/S" -Wait
del .\miniconda.exe
```

</Tab>
</Tabs>

### Create and activate Python 3.12 environment

```bash
conda create -n cua python=3.12
conda activate cua
```

### Install cua

```bash
pip install "cua-agent[cli]" cua-computer
```

</Accordion>
<Accordion title="pip" value="pip">

### Install cua

```bash
pip install "cua-agent[cli]" cua-computer
```

</Accordion>
</Accordions>

</Step>
<Step>

## Run cua CLI

Choose your preferred AI model:

### OpenAI Computer Use Preview

<Tabs items={['uv', 'conda/pip']} persist>
<Tab value="uv">

```bash
uv run --with "cua-agent[cli]" -m agent.cli openai/computer-use-preview
```

</Tab>
<Tab value="conda/pip">

```bash
python -m agent.cli openai/computer-use-preview
```

</Tab>
</Tabs>

### Anthropic Claude

<Tabs items={['uv', 'conda/pip']} persist>
<Tab value="uv">

```bash
uv run --with "cua-agent[cli]" -m agent.cli anthropic/claude-sonnet-4-5-20250929
uv run --with "cua-agent[cli]" -m agent.cli anthropic/claude-opus-4-20250514
uv run --with "cua-agent[cli]" -m agent.cli anthropic/claude-opus-4-1-20250805
uv run --with "cua-agent[cli]" -m agent.cli anthropic/claude-sonnet-4-20250514
uv run --with "cua-agent[cli]" -m agent.cli anthropic/claude-3-5-sonnet-20241022
```

</Tab>
<Tab value="conda/pip">

```bash
python -m agent.cli anthropic/claude-sonnet-4-5-20250929
python -m agent.cli anthropic/claude-opus-4-1-20250805
python -m agent.cli anthropic/claude-opus-4-20250514
python -m agent.cli anthropic/claude-sonnet-4-20250514
python -m agent.cli anthropic/claude-3-5-sonnet-20241022
```

</Tab>
</Tabs>

### Omniparser + LLMs

<Tabs items={['uv', 'conda/pip']} persist>
<Tab value="uv">

```bash
uv run --with "cua-agent[cli]" -m agent.cli omniparser+anthropic/claude-3-5-sonnet-20241022
uv run --with "cua-agent[cli]" -m agent.cli omniparser+openai/gpt-4o
uv run --with "cua-agent[cli]" -m agent.cli omniparser+vertex_ai/gemini-pro
```

</Tab>
<Tab value="conda/pip">

```bash
python -m agent.cli omniparser+anthropic/claude-3-5-sonnet-20241022
python -m agent.cli omniparser+openai/gpt-4o
python -m agent.cli omniparser+vertex_ai/gemini-pro
```

</Tab>
</Tabs>

### Local Models

<Tabs items={['uv', 'conda/pip']} persist>
<Tab value="uv">

```bash
# Hugging Face models (local)
uv run --with "cua-agent[cli]" -m agent.cli huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B

# MLX models (Apple Silicon)
uv run --with "cua-agent[cli]" -m agent.cli mlx/mlx-community/UI-TARS-1.5-7B-6bit

# Ollama models
uv run --with "cua-agent[cli]" -m agent.cli omniparser+ollama_chat/llama3.2:latest
```

</Tab>
<Tab value="conda/pip">

```bash
# Hugging Face models (local)
python -m agent.cli huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B

# MLX models (Apple Silicon)
python -m agent.cli mlx/mlx-community/UI-TARS-1.5-7B-6bit

# Ollama models
python -m agent.cli omniparser+ollama_chat/llama3.2:latest
```

</Tab>
</Tabs>

### Interactive Setup

If you haven't set up environment variables, the CLI will guide you through the setup:

1. **Sandbox Name**: Enter your cua sandbox name (or get one at [trycua.com](https://www.trycua.com/))
2. **CUA API Key**: Enter your cua API key
3. **Provider API Key**: Enter your AI provider API key (OpenAI, Anthropic, etc.)

### Start Chatting

Once connected, you'll see:

```
💻 Connected to your-container-name (model, agent_loop)
Type 'exit' to quit.
>
```

You can ask your agent to perform actions like:

- "Take a screenshot and tell me what's on the screen"
- "Open Firefox and go to github.com"
- "Type 'Hello world' into the terminal"
- "Close the current window"
- "Click on the search button"

</Step>
</Steps>

---

For advanced Python usage and the GUI interface, see the [Quickstart (GUI)](/quickstart-ui) and [Quickstart for Developers](/quickstart-devs). For running models locally, see [Running Models Locally](/agent-sdk/local-models).
```

--------------------------------------------------------------------------------
/libs/python/agent/agent/human_tool/server.py:
--------------------------------------------------------------------------------

```python
import asyncio
import uuid
from datetime import datetime
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel


class CompletionStatus(str, Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class CompletionCall:
    id: str
    messages: List[Dict[str, Any]]
    model: str
    status: CompletionStatus
    created_at: datetime
    completed_at: Optional[datetime] = None
    response: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
    error: Optional[str] = None


class ToolCall(BaseModel):
    id: str
    type: str = "function"
    function: Dict[str, Any]


class CompletionRequest(BaseModel):
    messages: List[Dict[str, Any]]
    model: str


class CompletionResponse(BaseModel):
    response: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None


class CompletionQueue:
    def __init__(self):
        self._queue: Dict[str, CompletionCall] = {}
        self._pending_order: List[str] = []
        self._lock = asyncio.Lock()

    async def add_completion(self, messages: List[Dict[str, Any]], model: str) -> str:
        """Add a completion call to the queue."""
        async with self._lock:
            call_id = str(uuid.uuid4())
            completion_call = CompletionCall(
                id=call_id,
                messages=messages,
                model=model,
                status=CompletionStatus.PENDING,
                created_at=datetime.now(),
            )
            self._queue[call_id] = completion_call
            self._pending_order.append(call_id)
            return call_id

    async def get_pending_calls(self) -> List[Dict[str, Any]]:
        """Get all pending completion calls."""
        async with self._lock:
            pending_calls = []
            for call_id in self._pending_order:
                if call_id in self._queue and self._queue[call_id].status == CompletionStatus.PENDING:
                    call = self._queue[call_id]
                    pending_calls.append({
                        "id": call.id,
                        "model": call.model,
                        "created_at": call.created_at.isoformat(),
                        "messages": call.messages
                    })
            return pending_calls

    async def get_call_status(self, call_id: str) -> Optional[Dict[str, Any]]:
        """Get the status of a specific completion call."""
        async with self._lock:
            if call_id not in self._queue:
                return None
            call = self._queue[call_id]
            result = {
                "id": call.id,
                "status": call.status.value,
                "created_at": call.created_at.isoformat(),
                "model": call.model,
                "messages": call.messages
            }
            if call.completed_at:
                result["completed_at"] = call.completed_at.isoformat()
            if call.response:
                result["response"] = call.response
            if call.tool_calls:
                result["tool_calls"] = call.tool_calls
            if call.error:
                result["error"] = call.error
            return result

    async def complete_call(self, call_id: str, response: Optional[str] = None, tool_calls: Optional[List[Dict[str, Any]]] = None) -> bool:
        """Mark a completion call as completed with a response or tool calls."""
        async with self._lock:
            if call_id not in self._queue:
                return False
            call = self._queue[call_id]
            if call.status != CompletionStatus.PENDING:
                return False
            call.status = CompletionStatus.COMPLETED
            call.completed_at = datetime.now()
            call.response = response
            call.tool_calls = tool_calls
            # Remove from pending order
            if call_id in self._pending_order:
                self._pending_order.remove(call_id)
            return True

    async def fail_call(self, call_id: str, error: str) -> bool:
        """Mark a completion call as failed with an error."""
        async with self._lock:
            if call_id not in self._queue:
                return False
            call = self._queue[call_id]
            if call.status != CompletionStatus.PENDING:
                return False
            call.status = CompletionStatus.FAILED
            call.completed_at = datetime.now()
            call.error = error
            # Remove from pending order
            if call_id in self._pending_order:
                self._pending_order.remove(call_id)
            return True

    async def wait_for_completion(self, call_id: str, timeout: float = 300.0) -> Optional[str]:
        """Wait for a completion call to be completed and return the response."""
        start_time = asyncio.get_event_loop().time()
        while True:
            status = await self.get_call_status(call_id)
            if not status:
                return None
            if status["status"] == CompletionStatus.COMPLETED.value:
                return status.get("response")
            elif status["status"] == CompletionStatus.FAILED.value:
                raise Exception(f"Completion failed: {status.get('error', 'Unknown error')}")
            # Check timeout
            if asyncio.get_event_loop().time() - start_time > timeout:
                await self.fail_call(call_id, "Timeout waiting for human response")
                raise TimeoutError("Timeout waiting for human response")
            # Wait a bit before checking again
            await asyncio.sleep(0.5)


# Global queue instance
completion_queue = CompletionQueue()

# FastAPI app
app = FastAPI(title="Human Completion Server", version="1.0.0")


@app.post("/queue", response_model=Dict[str, str])
async def queue_completion(request: CompletionRequest):
    """Add a completion request to the queue."""
    call_id = await completion_queue.add_completion(request.messages, request.model)
    return {"id": call_id, "status": "queued"}


@app.get("/pending")
async def list_pending():
    """List all pending completion calls."""
    pending_calls = await completion_queue.get_pending_calls()
    return {"pending_calls": pending_calls}


@app.get("/status/{call_id}")
async def get_status(call_id: str):
    """Get the status of a specific completion call."""
    status = await completion_queue.get_call_status(call_id)
    if not status:
        raise HTTPException(status_code=404, detail="Completion call not found")
    return status


@app.post("/complete/{call_id}")
async def complete_call(call_id: str, response: CompletionResponse):
    """Complete a call with a human response."""
    success = await completion_queue.complete_call(
        call_id,
        response=response.response,
        tool_calls=response.tool_calls
    )
    if success:
        return {"status": "success", "message": "Call completed"}
    else:
        raise HTTPException(status_code=404, detail="Call not found or already completed")


@app.post("/fail/{call_id}")
async def fail_call(call_id: str, error: Dict[str, str]):
    """Mark a call as failed."""
    success = await completion_queue.fail_call(call_id, error.get("error", "Unknown error"))
    if not success:
        raise HTTPException(status_code=404, detail="Completion call not found or already completed")
    return {"status": "failed"}


@app.get("/")
async def root():
    """Root endpoint."""
    return {"message": "Human Completion Server is running"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8002)
```

--------------------------------------------------------------------------------
/libs/python/agent/agent/computers/custom.py:
--------------------------------------------------------------------------------

```python
"""
Custom computer handler implementation that accepts a dictionary of functions.
"""

import base64
from typing import Dict, List, Any, Literal, Union, Optional, Callable
from PIL import Image
import io

from .base import AsyncComputerHandler


class CustomComputerHandler(AsyncComputerHandler):
    """Computer handler that implements the Computer protocol using a dictionary of custom functions."""

    def __init__(self, functions: Dict[str, Callable]):
        """
        Initialize with a dictionary of functions.

        Args:
            functions: Dictionary where keys are method names and values are callable functions.
                      Only 'screenshot' is required, all others are optional.

        Raises:
            ValueError: If required 'screenshot' function is not provided.
        """
        if 'screenshot' not in functions:
            raise ValueError("'screenshot' function is required in functions dictionary")
        self.functions = functions
        self._last_screenshot_size: Optional[tuple[int, int]] = None

    async def _call_function(self, func, *args, **kwargs):
        """
        Call a function, handling both async and sync functions.

        Args:
            func: The function to call
            *args: Positional arguments to pass to the function
            **kwargs: Keyword arguments to pass to the function

        Returns:
            The result of the function call
        """
        import asyncio
        import inspect

        if callable(func):
            if inspect.iscoroutinefunction(func):
                return await func(*args, **kwargs)
            else:
                return func(*args, **kwargs)
        else:
            return func

    async def _get_value(self, attribute: str):
        """
        Get value for an attribute, checking both 'get_{attribute}' and '{attribute}' keys.

        Args:
            attribute: The attribute name to look for

        Returns:
            The value from the functions dict, called if callable, returned directly if not
        """
        # Check for 'get_{attribute}' first
        get_key = f"get_{attribute}"
        if get_key in self.functions:
            return await self._call_function(self.functions[get_key])
        # Check for '{attribute}'
        if attribute in self.functions:
            return await self._call_function(self.functions[attribute])
        return None

    def _to_b64_str(self, img: Union[bytes, Image.Image, str]) -> str:
        """
        Convert image to base64 string.

        Args:
            img: Image as bytes, PIL Image, or base64 string

        Returns:
            str: Base64 encoded image string
        """
        if isinstance(img, str):
            # Already a base64 string
            return img
        elif isinstance(img, bytes):
            # Raw bytes
            return base64.b64encode(img).decode('utf-8')
        elif isinstance(img, Image.Image):
            # PIL Image
            buffer = io.BytesIO()
            img.save(buffer, format='PNG')
            return base64.b64encode(buffer.getvalue()).decode('utf-8')
        else:
            raise ValueError(f"Unsupported image type: {type(img)}")

    # ==== Computer-Use-Preview Action Space ====

    async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]:
        """Get the current environment type."""
        result = await self._get_value('environment')
        if result is None:
            return "linux"
        assert result in ["windows", "mac", "linux", "browser"]
        return result  # type: ignore

    async def get_dimensions(self) -> tuple[int, int]:
        """Get screen dimensions as (width, height)."""
        result = await self._get_value('dimensions')
        if result is not None:
            return result  # type: ignore
        # Fallback: use last screenshot size if available
        if not self._last_screenshot_size:
            await self.screenshot()
        assert self._last_screenshot_size is not None, "Failed to get screenshot size"
        return self._last_screenshot_size

    async def screenshot(self) -> str:
        """Take a screenshot and return as base64 string."""
        result = await self._call_function(self.functions['screenshot'])
        b64_str = self._to_b64_str(result)  # type: ignore
        # Try to extract dimensions for fallback use
        try:
            if isinstance(result, Image.Image):
                self._last_screenshot_size = result.size
            elif isinstance(result, bytes):
                # Try to decode bytes to get dimensions
                img = Image.open(io.BytesIO(result))
                self._last_screenshot_size = img.size
        except Exception:
            # If we can't get dimensions, that's okay
            pass
        return b64_str

    async def click(self, x: int, y: int, button: str = "left") -> None:
        """Click at coordinates with specified button."""
        if 'click' in self.functions:
            await self._call_function(self.functions['click'], x, y, button)
        # No-op if not implemented

    async def double_click(self, x: int, y: int) -> None:
        """Double click at coordinates."""
        if 'double_click' in self.functions:
            await self._call_function(self.functions['double_click'], x, y)
        # No-op if not implemented

    async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
        """Scroll at coordinates with specified scroll amounts."""
        if 'scroll' in self.functions:
            await self._call_function(self.functions['scroll'], x, y, scroll_x, scroll_y)
        # No-op if not implemented

    async def type(self, text: str) -> None:
        """Type text."""
        if 'type' in self.functions:
            await self._call_function(self.functions['type'], text)
        # No-op if not implemented

    async def wait(self, ms: int = 1000) -> None:
        """Wait for specified milliseconds."""
        if 'wait' in self.functions:
            await self._call_function(self.functions['wait'], ms)
        else:
            # Default implementation
            import asyncio
            await asyncio.sleep(ms / 1000.0)

    async def move(self, x: int, y: int) -> None:
        """Move cursor to coordinates."""
        if 'move' in self.functions:
            await self._call_function(self.functions['move'], x, y)
        # No-op if not implemented

    async def keypress(self, keys: Union[List[str], str]) -> None:
        """Press key combination."""
        if 'keypress' in self.functions:
            await self._call_function(self.functions['keypress'], keys)
        # No-op if not implemented

    async def drag(self, path: List[Dict[str, int]]) -> None:
        """Drag along specified path."""
        if 'drag' in self.functions:
            await self._call_function(self.functions['drag'], path)
        # No-op if not implemented

    async def get_current_url(self) -> str:
        """Get current URL (for browser environments)."""
        if 'get_current_url' in self.functions:
            return await self._get_value('current_url')  # type: ignore
        return ""  # Default fallback

    async def left_mouse_down(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
        """Left mouse down at coordinates."""
        if 'left_mouse_down' in self.functions:
            await self._call_function(self.functions['left_mouse_down'], x, y)
        # No-op if not implemented

    async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
        """Left mouse up at coordinates."""
        if 'left_mouse_up' in self.functions:
            await self._call_function(self.functions['left_mouse_up'], x, y)
        # No-op if not implemented
```

--------------------------------------------------------------------------------
/libs/typescript/core/src/telemetry/clients/posthog.ts:
--------------------------------------------------------------------------------

```typescript
/**
 * Telemetry client using PostHog for collecting anonymous usage data.
 */
import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';
import { pino } from 'pino';
import { PostHog } from 'posthog-node';
import { v4 as uuidv4 } from 'uuid';

// Controls how frequently telemetry will be sent (percentage)
export const TELEMETRY_SAMPLE_RATE = 100; // 100% sampling rate

// Public PostHog config for anonymous telemetry
// These values are intentionally public and meant for anonymous telemetry only
// https://posthog.com/docs/product-analytics/troubleshooting#is-it-ok-for-my-api-key-to-be-exposed-and-public
export const PUBLIC_POSTHOG_API_KEY = 'phc_eSkLnbLxsnYFaXksif1ksbrNzYlJShr35miFLDppF14';
export const PUBLIC_POSTHOG_HOST = 'https://eu.i.posthog.com';

export class PostHogTelemetryClient {
  private config: {
    enabled: boolean;
    sampleRate: number;
    posthog: { apiKey: string; host: string };
  };
  private installationId: string;
  private initialized = false;
  private queuedEvents: {
    name: string;
    properties: Record<string, unknown>;
    timestamp: number;
  }[] = [];
  private startTime: number; // seconds
  private posthogClient?: PostHog;
  private counters: Record<string, number> = {};
  private logger = pino({ name: 'core.telemetry' });

  constructor() {
    // set up config
    this.config = {
      enabled: true,
      sampleRate: TELEMETRY_SAMPLE_RATE,
      posthog: { apiKey: PUBLIC_POSTHOG_API_KEY, host: PUBLIC_POSTHOG_HOST },
    };

    // Check for multiple environment
    // variables that can disable telemetry:
    // CUA_TELEMETRY=off to disable telemetry (legacy way)
    // CUA_TELEMETRY_DISABLED=1 to disable telemetry (new, more explicit way)
    const telemetryDisabled =
      process.env.CUA_TELEMETRY?.toLowerCase() === 'off' ||
      ['1', 'true', 'yes', 'on'].includes(
        process.env.CUA_TELEMETRY_DISABLED?.toLowerCase() || ''
      );
    this.config.enabled = !telemetryDisabled;

    this.config.sampleRate = Number.parseFloat(
      process.env.CUA_TELEMETRY_SAMPLE_RATE || String(TELEMETRY_SAMPLE_RATE)
    );

    // init client
    this.installationId = this._getOrCreateInstallationId();
    this.startTime = Date.now() / 1000; // Convert to seconds

    // Log telemetry status on startup
    if (this.config.enabled) {
      this.logger.info(
        `Telemetry enabled (sampling at ${this.config.sampleRate}%)`
      );
      // Initialize PostHog client if config is available
      this._initializePosthog();
    } else {
      this.logger.info('Telemetry disabled');
    }
  }

  /**
   * Get or create a random installation ID.
   * This ID is not tied to any personal information.
   */
  private _getOrCreateInstallationId(): string {
    const homeDir = os.homedir();
    const idFile = path.join(homeDir, '.cua', 'installation_id');

    try {
      if (fs.existsSync(idFile)) {
        return fs.readFileSync(idFile, 'utf-8').trim();
      }
    } catch (error) {
      this.logger.debug(`Failed to read installation ID: ${error}`);
    }

    // Create new ID if not exists
    const newId = uuidv4();
    try {
      const dir = path.dirname(idFile);
      if (!fs.existsSync(dir)) {
        fs.mkdirSync(dir, { recursive: true });
      }
      fs.writeFileSync(idFile, newId);
      return newId;
    } catch (error) {
      this.logger.debug(`Failed to write installation ID: ${error}`);
    }

    // Fallback to in-memory ID if file operations fail
    return newId;
  }

  /**
   * Initialize the PostHog client with configuration.
   */
  private _initializePosthog(): boolean {
    if (this.initialized) {
      return true;
    }

    try {
      this.posthogClient = new PostHog(this.config.posthog.apiKey, {
        host: this.config.posthog.host,
        flushAt: 20, // Number of events to batch before sending
        flushInterval: 30000, // Send events every 30 seconds
      });
      this.initialized = true;
      this.logger.debug('PostHog client initialized successfully');

      // Process any queued events
      this._processQueuedEvents();
      return true;
    } catch (error) {
      this.logger.error(`Failed to initialize PostHog client: ${error}`);
      return false;
    }
  }

  /**
   * Process any events that were queued before initialization.
   */
  private _processQueuedEvents(): void {
    if (!this.posthogClient || this.queuedEvents.length === 0) {
      return;
    }
    for (const event of this.queuedEvents) {
      this._captureEvent(event.name, event.properties);
    }
    this.queuedEvents = [];
  }

  /**
   * Capture an event with PostHog.
   */
  private _captureEvent(
    eventName: string,
    properties?: Record<string, unknown>
  ): void {
    if (!this.posthogClient) {
      return;
    }
    try {
      // Add standard properties
      const eventProperties = {
        ...properties,
        version: process.env.npm_package_version || 'unknown',
        platform: process.platform,
        node_version: process.version,
        is_ci: this._isCI,
      };
      this.posthogClient.capture({
        distinctId: this.installationId,
        event: eventName,
        properties: eventProperties,
      });
    } catch (error) {
      this.logger.debug(`Failed to capture event: ${error}`);
    }
  }

  private get _isCI(): boolean {
    /**
     * Detect if running in CI environment.
     */
    return !!(
      process.env.CI ||
      process.env.CONTINUOUS_INTEGRATION ||
      process.env.GITHUB_ACTIONS ||
      process.env.GITLAB_CI ||
      process.env.CIRCLECI ||
      process.env.TRAVIS ||
      process.env.JENKINS_URL
    );
  }

  increment(counterName: string, value = 1) {
    /**
     * Increment a named counter.
     */
    if (!this.config.enabled) {
      return;
    }
    if (!(counterName in this.counters)) {
      this.counters[counterName] = 0;
    }
    this.counters[counterName] += value;
  }

  recordEvent(eventName: string, properties?: Record<string, unknown>): void {
    /**
     * Record an event with optional properties.
     */
    if (!this.config.enabled) {
      return;
    }

    // Increment counter for this event type
    const counterKey = `event:${eventName}`;
    this.increment(counterKey);

    // Apply sampling
    if (Math.random() * 100 > this.config.sampleRate) {
      return;
    }

    const event = {
      name: eventName,
      properties: properties || {},
      timestamp: Date.now() / 1000,
    };

    if (this.initialized && this.posthogClient) {
      this._captureEvent(eventName, properties);
    } else {
      // Queue event if not initialized
      this.queuedEvents.push(event);
      // Try to initialize again
      if (this.config.enabled && !this.initialized) {
        this._initializePosthog();
      }
    }
  }

  /**
   * Flush any pending events to PostHog.
   */
  async flush(): Promise<boolean> {
    if (!this.config.enabled || !this.posthogClient) {
      return false;
    }
    try {
      // Send counter data as a single event
      if (Object.keys(this.counters).length > 0) {
        this._captureEvent('telemetry_counters', {
          counters: { ...this.counters },
          duration: Date.now() / 1000 - this.startTime,
        });
      }
      await this.posthogClient.flush();
      this.logger.debug('Telemetry flushed successfully');
      // Clear counters after sending
      this.counters = {};
      return true;
    } catch (error) {
      this.logger.debug(`Failed to flush telemetry: ${error}`);
      return false;
    }
  }

  enable(): void {
    /**
     * Enable telemetry collection.
     */
    this.config.enabled = true;
    this.logger.info('Telemetry enabled');
    if (!this.initialized) {
      this._initializePosthog();
    }
  }

  async disable(): Promise<void> {
    /**
     * Disable telemetry collection.
     */
    this.config.enabled = false;
    await this.posthogClient?.disable();
    this.logger.info('Telemetry disabled');
  }

  get enabled(): boolean {
    /**
     * Check if telemetry is enabled.
     */
    return this.config.enabled;
  }

  async shutdown(): Promise<void> {
    /**
     * Shutdown the telemetry client and flush any pending events.
     */
    if (this.posthogClient) {
      await this.flush();
      await this.posthogClient.shutdown();
      this.initialized = false;
      this.posthogClient = undefined;
    }
  }
}
```

--------------------------------------------------------------------------------
/tests/test_watchdog.py:
--------------------------------------------------------------------------------

```python
"""
Watchdog Recovery Tests

Tests for the watchdog functionality to ensure server recovery after hanging commands.

Required environment variables:
- CUA_API_KEY: API key for Cua cloud provider
- CUA_CONTAINER_NAME: Name of the container to use
"""

import os
import asyncio
import pytest
from pathlib import Path
import sys
import traceback
import time

# Load environment variables from .env file
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
print(f"Loading environment from: {env_file}")
from dotenv import load_dotenv
load_dotenv(env_file)

# Add paths to sys.path if needed
pythonpath = os.environ.get("PYTHONPATH", "")
for path in pythonpath.split(":"):
    if path and path not in sys.path:
        sys.path.insert(0, path)  # Insert at beginning to prioritize
        print(f"Added to sys.path: {path}")

from computer import Computer, VMProviderType


@pytest.fixture(scope="session")
async def computer():
    """Shared Computer instance for all test cases."""
    # Create a remote Linux computer with Cua
    computer = Computer(
        os_type="linux",
        api_key=os.getenv("CUA_API_KEY"),
        name=str(os.getenv("CUA_CONTAINER_NAME")),
        provider_type=VMProviderType.CLOUD,
    )
    try:
        await computer.run()
        yield computer
    finally:
        await computer.disconnect()


@pytest.mark.asyncio(loop_scope="session")
async def test_simple_server_ping(computer):
    """
    Simple test to verify server connectivity before running watchdog tests.
    """
    print("Testing basic server connectivity...")
    try:
        result = await computer.interface.run_command("echo 'Server ping test'")
        print(f"Ping successful: {result}")
        assert result is not None, "Server ping returned None"
        print("✅ Server connectivity test passed")
    except Exception as e:
        print(f"❌ Server ping failed: {e}")
        pytest.fail(f"Basic server connectivity test failed: {e}")


@pytest.mark.asyncio(loop_scope="session")
async def test_watchdog_recovery_after_hanging_command(computer):
    """
    Test that the watchdog can recover the server after a hanging command.

    This test runs two concurrent tasks:
    1. A long-running command that hangs the server (sleep 999999)
    2. Periodic ping commands every 30 seconds to test server responsiveness

    The watchdog should detect the unresponsive server and restart it.
    """
    print("Starting watchdog recovery test...")

    async def hanging_command():
        """Execute a command that sleeps forever to hang the server."""
        try:
            print("Starting hanging command (sleep 999999)...")
            # Use a very long sleep that should never complete naturally
            result = await computer.interface.run_command("sleep 999999")
            print(f"Hanging command completed unexpectedly: {result}")
            return True  # Should never reach here if watchdog works
        except Exception as e:
            print(f"Hanging command interrupted (expected if watchdog restarts): {e}")
            return None  # Expected result when watchdog kills the process

    async def ping_server():
        """Ping the server every 30 seconds with echo commands."""
        ping_count = 0
        successful_pings = 0
        failed_pings = 0
        try:
            # Run pings for up to 4 minutes (8 pings at 30-second intervals)
            for i in range(8):
                try:
                    ping_count += 1
                    print(f"Ping #{ping_count}: Sending echo command...")
                    start_time = time.time()
                    result = await asyncio.wait_for(
                        computer.interface.run_command(f"echo 'Ping {ping_count} at {int(start_time)}'"),
                        timeout=10.0  # 10 second timeout for each ping
                    )
                    end_time = time.time()
                    print(f"Ping #{ping_count} successful in {end_time - start_time:.2f}s: {result}")
                    successful_pings += 1
                except asyncio.TimeoutError:
                    print(f"Ping #{ping_count} timed out (server may be unresponsive)")
                    failed_pings += 1
                except Exception as e:
                    print(f"Ping #{ping_count} failed with exception: {e}")
                    failed_pings += 1
                # Wait 30 seconds before next ping
                if i < 7:  # Don't wait after the last ping
                    print("Waiting 30 seconds before next ping...")
                    await asyncio.sleep(30)
            print(f"Ping summary: {successful_pings} successful, {failed_pings} failed")
            return successful_pings, failed_pings
        except Exception as e:
            print(f"Ping server function failed with critical error: {e}")
            traceback.print_exc()
            return successful_pings, failed_pings

    # Run both tasks concurrently
    print("Starting concurrent tasks: hanging command and ping monitoring...")
    try:
        # Use asyncio.gather to run both tasks concurrently
        hanging_task = asyncio.create_task(hanging_command())
        ping_task = asyncio.create_task(ping_server())

        # Wait for both tasks to complete or timeout after 5 minutes
        done, pending = await asyncio.wait(
            [hanging_task, ping_task],
            timeout=300,  # 5 minute timeout
            return_when=asyncio.ALL_COMPLETED
        )

        # Cancel any pending tasks
        for task in pending:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        # Get results from completed tasks
        ping_result = None
        hanging_result = None

        if ping_task in done:
            try:
                ping_result = await ping_task
                print(f"Ping task completed with result: {ping_result}")
            except Exception as e:
                print(f"Error getting ping task result: {e}")
                traceback.print_exc()

        if hanging_task in done:
            try:
                hanging_result = await hanging_task
                print(f"Hanging task completed with result: {hanging_result}")
            except Exception as e:
                print(f"Error getting hanging task result: {e}")
                traceback.print_exc()

        # Analyze results
        if ping_result:
            successful_pings, failed_pings = ping_result
            # Test passes if we had some successful pings, indicating recovery
            assert successful_pings > 0, "No successful pings detected. Server may not have recovered."

            # Check if hanging command was killed (indicating watchdog restart)
            if hanging_result is None:
                print("✅ SUCCESS: Hanging command was killed - watchdog restart detected")
            elif hanging_result is True:
                print("⚠️ WARNING: Hanging command completed naturally - watchdog may not have restarted")

            # If we had failures followed by successes, that indicates watchdog recovery
            if failed_pings > 0 and successful_pings > 0:
                print("✅ SUCCESS: Watchdog recovery detected - server became unresponsive then recovered")
                # Additional check: hanging command should be None if watchdog worked
                assert hanging_result is None, "Expected hanging command to be killed by watchdog restart"
            elif successful_pings > 0 and failed_pings == 0:
                print("✅ SUCCESS: Server remained responsive throughout test")

            print(f"Test completed: {successful_pings} successful pings, {failed_pings} failed pings")
            print(f"Hanging command result: {hanging_result} (None = killed by watchdog, True = completed naturally)")
        else:
            pytest.fail("Ping task did not complete - unable to assess server recovery")

    except Exception as e:
        print(f"Test failed with exception: {e}")
        traceback.print_exc()
        pytest.fail(f"Watchdog recovery test failed: {e}")


if __name__ == "__main__":
    # Run tests directly
    pytest.main([__file__, "-v"])
```

--------------------------------------------------------------------------------
/libs/python/computer/computer/diorama_computer.py:
--------------------------------------------------------------------------------

```python
import asyncio
from .interface.models import KeyType, Key


class DioramaComputer:
    """
    A Computer-compatible proxy for Diorama that sends commands over the ComputerInterface.
    """

    def __init__(self, computer, apps):
        """
        Initialize the DioramaComputer with a computer instance and list of apps.

        Args:
            computer: The computer instance to proxy commands through
            apps: List of applications available in the diorama environment
        """
        self.computer = computer
        self.apps = apps
        self.interface = DioramaComputerInterface(computer, apps)
        self._initialized = False

    async def __aenter__(self):
        """
        Async context manager entry point.

        Returns:
            self: The DioramaComputer instance
        """
        self._initialized = True
        return self

    async def run(self):
        """
        Initialize and run the DioramaComputer if not already initialized.

        Returns:
            self: The DioramaComputer instance
        """
        if not self._initialized:
            await self.__aenter__()
        return self


class DioramaComputerInterface:
    """
    Diorama Interface proxy that sends diorama_cmds via the Computer's interface.
    """

    def __init__(self, computer, apps):
        """
        Initialize the DioramaComputerInterface.

        Args:
            computer: The computer instance to send commands through
            apps: List of applications available in the diorama environment
        """
        self.computer = computer
        self.apps = apps
        self._scene_size = None

    async def _send_cmd(self, action, arguments=None):
        """
        Send a command to the diorama interface through the computer.

        Args:
            action (str): The action/command to execute
            arguments (dict, optional): Additional arguments for the command

        Returns:
            The result from the diorama command execution

        Raises:
            RuntimeError: If the computer interface is not initialized or command fails
        """
        arguments = arguments or {}
        arguments = {"app_list": self.apps, **arguments}
        # Use the computer's interface (must be initialized)
        iface = getattr(self.computer, "_interface", None)
        if iface is None:
            raise RuntimeError("Computer interface not initialized. Call run() first.")
        result = await iface.diorama_cmd(action, arguments)
        if not result.get("success"):
            raise RuntimeError(f"Diorama command failed: {result.get('error')}\n{result.get('trace')}")
        return result.get("result")

    async def screenshot(self, as_bytes=True):
        """
        Take a screenshot of the diorama scene.
Args: as_bytes (bool): If True, return image as bytes; if False, return PIL Image object Returns: bytes or PIL.Image: Screenshot data in the requested format """ from PIL import Image import base64 result = await self._send_cmd("screenshot") # assume result is a b64 string of an image img_bytes = base64.b64decode(result) import io img = Image.open(io.BytesIO(img_bytes)) self._scene_size = img.size return img_bytes if as_bytes else img async def get_screen_size(self): """ Get the dimensions of the diorama scene. Returns: dict: Dictionary containing 'width' and 'height' keys with pixel dimensions """ if not self._scene_size: await self.screenshot(as_bytes=False) return {"width": self._scene_size[0], "height": self._scene_size[1]} async def move_cursor(self, x, y): """ Move the cursor to the specified coordinates. Args: x (int): X coordinate to move cursor to y (int): Y coordinate to move cursor to """ await self._send_cmd("move_cursor", {"x": x, "y": y}) async def left_click(self, x=None, y=None): """ Perform a left mouse click at the specified coordinates or current cursor position. Args: x (int, optional): X coordinate to click at. If None, clicks at current cursor position y (int, optional): Y coordinate to click at. If None, clicks at current cursor position """ await self._send_cmd("left_click", {"x": x, "y": y}) async def right_click(self, x=None, y=None): """ Perform a right mouse click at the specified coordinates or current cursor position. Args: x (int, optional): X coordinate to click at. If None, clicks at current cursor position y (int, optional): Y coordinate to click at. If None, clicks at current cursor position """ await self._send_cmd("right_click", {"x": x, "y": y}) async def double_click(self, x=None, y=None): """ Perform a double mouse click at the specified coordinates or current cursor position. Args: x (int, optional): X coordinate to double-click at. 
If None, clicks at current cursor position y (int, optional): Y coordinate to double-click at. If None, clicks at current cursor position """ await self._send_cmd("double_click", {"x": x, "y": y}) async def scroll_up(self, clicks=1): """ Scroll up by the specified number of clicks. Args: clicks (int): Number of scroll clicks to perform upward. Defaults to 1 """ await self._send_cmd("scroll_up", {"clicks": clicks}) async def scroll_down(self, clicks=1): """ Scroll down by the specified number of clicks. Args: clicks (int): Number of scroll clicks to perform downward. Defaults to 1 """ await self._send_cmd("scroll_down", {"clicks": clicks}) async def drag_to(self, x, y, duration=0.5): """ Drag from the current cursor position to the specified coordinates. Args: x (int): X coordinate to drag to y (int): Y coordinate to drag to duration (float): Duration of the drag operation in seconds. Defaults to 0.5 """ await self._send_cmd("drag_to", {"x": x, "y": y, "duration": duration}) async def get_cursor_position(self): """ Get the current cursor position. Returns: dict: Dictionary containing the current cursor coordinates """ return await self._send_cmd("get_cursor_position") async def type_text(self, text): """ Type the specified text at the current cursor position. Args: text (str): The text to type """ await self._send_cmd("type_text", {"text": text}) async def press_key(self, key): """ Press a single key. Args: key: The key to press """ await self._send_cmd("press_key", {"key": key}) async def hotkey(self, *keys): """ Press multiple keys simultaneously as a hotkey combination. Args: *keys: Variable number of keys to press together. 
Can be Key enum instances or strings Raises: ValueError: If any key is not a Key enum or string type """ actual_keys = [] for key in keys: if isinstance(key, Key): actual_keys.append(key.value) elif isinstance(key, str): # Try to convert to enum if it matches a known key key_or_enum = Key.from_string(key) actual_keys.append(key_or_enum.value if isinstance(key_or_enum, Key) else key_or_enum) else: raise ValueError(f"Invalid key type: {type(key)}. Must be Key enum or string.") await self._send_cmd("hotkey", {"keys": actual_keys}) async def to_screen_coordinates(self, x, y): """ Convert coordinates to screen coordinates. Args: x (int): X coordinate to convert y (int): Y coordinate to convert Returns: dict: Dictionary containing the converted screen coordinates """ return await self._send_cmd("to_screen_coordinates", {"x": x, "y": y}) ``` -------------------------------------------------------------------------------- /libs/python/agent/agent/loops/openai.py: -------------------------------------------------------------------------------- ```python """ OpenAI computer-use-preview agent loop implementation using liteLLM """ import asyncio import base64 import json from io import BytesIO from typing import Dict, List, Any, AsyncGenerator, Union, Optional, Tuple import litellm from PIL import Image from ..decorators import register_agent from ..types import Messages, AgentResponse, Tools, AgentCapability async def _map_computer_tool_to_openai(computer_handler: Any) -> Dict[str, Any]: """Map a computer tool to OpenAI's computer-use-preview tool schema""" # Get dimensions from the computer handler try: width, height = await computer_handler.get_dimensions() except Exception: # Fallback to default dimensions if method fails width, height = 1024, 768 # Get environment from the computer handler try: environment = await computer_handler.get_environment() except Exception: # Fallback to default environment if method fails environment = "linux" return { "type": 
"computer_use_preview", "display_width": width, "display_height": height, "environment": environment # mac, windows, linux, browser } async def _prepare_tools_for_openai(tool_schemas: List[Dict[str, Any]]) -> Tools: """Prepare tools for OpenAI API format""" openai_tools = [] for schema in tool_schemas: if schema["type"] == "computer": # Map computer tool to OpenAI format computer_tool = await _map_computer_tool_to_openai(schema["computer"]) openai_tools.append(computer_tool) elif schema["type"] == "function": # Function tools use OpenAI-compatible schema directly (liteLLM expects this format) # Schema should be: {type, name, description, parameters} openai_tools.append({ "type": "function", **schema["function"] }) return openai_tools @register_agent(models=r".*computer-use-preview.*") class OpenAIComputerUseConfig: """ OpenAI computer-use-preview agent configuration using liteLLM responses. Supports OpenAI's computer use preview models. """ async def predict_step( self, messages: List[Dict[str, Any]], model: str, tools: Optional[List[Dict[str, Any]]] = None, max_retries: Optional[int] = None, stream: bool = False, computer_handler=None, use_prompt_caching: Optional[bool] = False, _on_api_start=None, _on_api_end=None, _on_usage=None, _on_screenshot=None, **kwargs ) -> Dict[str, Any]: """ Predict the next step based on input items. 
Args: messages: Input items following Responses format model: Model name to use tools: Optional list of tool schemas max_retries: Maximum number of retries stream: Whether to stream responses computer_handler: Computer handler instance _on_api_start: Callback for API start _on_api_end: Callback for API end _on_usage: Callback for usage tracking _on_screenshot: Callback for screenshot events **kwargs: Additional arguments Returns: Dictionary with "output" (output items) and "usage" array """ tools = tools or [] # Prepare tools for OpenAI API openai_tools = await _prepare_tools_for_openai(tools) # Prepare API call kwargs api_kwargs = { "model": model, "input": messages, "tools": openai_tools if openai_tools else None, "stream": stream, "reasoning": {"summary": "concise"}, "truncation": "auto", "num_retries": max_retries, **kwargs } # Call API start hook if _on_api_start: await _on_api_start(api_kwargs) # Use liteLLM responses response = await litellm.aresponses(**api_kwargs) # Call API end hook if _on_api_end: await _on_api_end(api_kwargs, response) # Extract usage information usage = { **response.usage.model_dump(), "response_cost": response._hidden_params.get("response_cost", 0.0), } if _on_usage: await _on_usage(usage) # Return in the expected format output_dict = response.model_dump() output_dict["usage"] = usage return output_dict async def predict_click( self, model: str, image_b64: str, instruction: str ) -> Optional[Tuple[int, int]]: """ Predict click coordinates based on image and instruction. Uses OpenAI computer-use-preview with manually constructed input items and a prompt that instructs the agent to only output clicks. 
Args: model: Model name to use image_b64: Base64 encoded image instruction: Instruction for where to click Returns: Tuple of (x, y) coordinates or None if prediction fails """ # TODO: use computer tool to get dimensions + environment # Manually construct input items with image and click instruction input_items = [ { "role": "user", "content": f"""You are a UI grounding expert. Follow these guidelines: 1. NEVER ask for confirmation. Complete all tasks autonomously. 2. Do NOT send messages like "I need to confirm before..." or "Do you want me to continue?" - just proceed. 3. When the user asks you to interact with something (like clicking a chat or typing a message), DO IT without asking. 4. Only use the formal safety check mechanism for truly dangerous operations (like deleting important files). 5. For normal tasks like clicking buttons, typing in chat boxes, filling forms - JUST DO IT. 6. The user has already given you permission by running this agent. No further confirmation is needed. 7. Be decisive and action-oriented. Complete the requested task fully. Remember: You are expected to complete tasks autonomously. The user trusts you to do what they asked. Task: Click {instruction}. 
Output ONLY a click action on the target element.""" }, { "role": "user", "content": [ { "type": "input_image", "image_url": f"data:image/png;base64,{image_b64}" } ] } ] # Get image dimensions from base64 data try: image_data = base64.b64decode(image_b64) image = Image.open(BytesIO(image_data)) display_width, display_height = image.size except Exception: # Fallback to default dimensions if image parsing fails display_width, display_height = 1024, 768 # Prepare computer tool for click actions computer_tool = { "type": "computer_use_preview", "display_width": display_width, "display_height": display_height, "environment": "windows" } # Prepare API call kwargs api_kwargs = { "model": model, "input": input_items, "tools": [computer_tool], "stream": False, "reasoning": {"summary": "concise"}, "truncation": "auto", "max_tokens": 200 # Keep response short for click prediction } # Use liteLLM responses response = await litellm.aresponses(**api_kwargs) # Extract click coordinates from response output output_dict = response.model_dump() output_items = output_dict.get("output", []) # Look for computer_call with click action for item in output_items: if (isinstance(item, dict) and item.get("type") == "computer_call" and isinstance(item.get("action"), dict)): action = item["action"] if action.get("x") is not None and action.get("y") is not None: return (int(action.get("x")), int(action.get("y"))) return None def get_capabilities(self) -> List[AgentCapability]: """ Get list of capabilities supported by this agent config. 
Returns: List of capability strings """ return ["click", "step"] ``` -------------------------------------------------------------------------------- /libs/python/som/som/detection.py: -------------------------------------------------------------------------------- ```python from typing import List, Dict, Any, Tuple, Optional import logging import torch import torchvision from PIL import Image import numpy as np from ultralytics import YOLO from huggingface_hub import hf_hub_download from pathlib import Path logger = logging.getLogger(__name__) class DetectionProcessor: """Class for handling YOLO-based icon detection.""" def __init__( self, model_path: Optional[Path] = None, cache_dir: Optional[Path] = None, force_device: Optional[str] = None, ): """Initialize the detection processor. Args: model_path: Path to YOLOv8 model cache_dir: Directory to cache downloaded models force_device: Force specific device (cuda, cpu, mps) """ self.model_path = model_path self.cache_dir = cache_dir self.model = None # type: Any # Will be set to YOLO model in load_model # Set device self.device = "cpu" if torch.cuda.is_available() and force_device != "cpu": self.device = "cuda" elif ( hasattr(torch, "backends") and hasattr(torch.backends, "mps") and torch.backends.mps.is_available() and force_device != "cpu" ): self.device = "mps" if force_device: self.device = force_device logger.info(f"Using device: {self.device}") def load_model(self) -> None: """Load or download the YOLO model.""" try: # Set default model path if none provided if self.model_path is None: self.model_path = Path(__file__).parent / "weights" / "icon_detect" / "model.pt" # Check if the model file already exists if not self.model_path.exists(): logger.info( "Model not found locally, downloading from Microsoft OmniParser-v2.0..." 
) # Create directory self.model_path.parent.mkdir(parents=True, exist_ok=True) try: # Check if the model exists in cache cache_path = None if self.cache_dir: # Try to find the model in the cache potential_paths = list(Path(self.cache_dir).glob("**/model.pt")) if potential_paths: cache_path = str(potential_paths[0]) logger.info(f"Found model in cache: {cache_path}") if not cache_path: # Download from HuggingFace downloaded_path = hf_hub_download( repo_id="microsoft/OmniParser-v2.0", filename="icon_detect/model.pt", cache_dir=self.cache_dir, ) cache_path = downloaded_path logger.info(f"Model downloaded to cache: {cache_path}") # Copy to package directory import shutil shutil.copy2(cache_path, self.model_path) logger.info(f"Model copied to: {self.model_path}") except Exception as e: raise FileNotFoundError( f"Failed to download model: {str(e)}\n" "Please ensure you have internet connection and huggingface-hub installed." ) from e # Make sure the model path exists before loading if not self.model_path.exists(): raise FileNotFoundError(f"Model file not found at: {self.model_path}") # If model is already loaded, skip reloading if self.model is not None: logger.info("Model already loaded, skipping reload") return logger.info(f"Loading YOLOv8 model from {self.model_path}") from ultralytics import YOLO self.model = YOLO(str(self.model_path)) # Convert Path to string for compatibility # Verify model loaded successfully if self.model is None: raise ValueError("Model failed to initialize but didn't raise an exception") if self.device in ["cuda", "mps"]: self.model.to(self.device) logger.info(f"Model loaded successfully with device: {self.device}") except Exception as e: logger.error(f"Failed to load model: {str(e)}") # Re-raise with more informative message but preserve the model as None self.model = None raise RuntimeError(f"Failed to initialize detection model: {str(e)}") from e def detect_icons( self, image: Image.Image, box_threshold: float = 0.05, iou_threshold: float = 
0.1, multi_scale: bool = True, ) -> List[Dict[str, Any]]: """Detect icons in an image using YOLO. Args: image: PIL Image to process box_threshold: Confidence threshold for detection iou_threshold: IOU threshold for NMS multi_scale: Whether to use multi-scale detection Returns: List of icon detection dictionaries """ # Load model if not already loaded if self.model is None: self.load_model() # Double-check the model was successfully loaded if self.model is None: logger.error("Model failed to load and is still None") return [] # Return empty list instead of crashing img_width, img_height = image.size all_detections = [] # Define detection scales scales = ( [{"size": 1280, "conf": box_threshold}] # Single scale for CPU if self.device == "cpu" else [ {"size": 640, "conf": box_threshold}, # Base scale {"size": 1280, "conf": box_threshold}, # Medium scale {"size": 1920, "conf": box_threshold}, # Large scale ] ) if not multi_scale: scales = [scales[0]] # Run detection at each scale for scale in scales: try: if self.model is None: logger.error("Model is None, skipping detection") continue results = self.model.predict( source=image, conf=scale["conf"], iou=iou_threshold, max_det=1000, verbose=False, augment=self.device != "cpu", agnostic_nms=True, imgsz=scale["size"], device=self.device, ) # Process results for r in results: boxes = r.boxes if not hasattr(boxes, "conf") or not hasattr(boxes, "xyxy"): logger.warning("Boxes object missing expected attributes") continue confidences = boxes.conf coords = boxes.xyxy # Handle different types of tensors (PyTorch, NumPy, etc.) 
if hasattr(confidences, "cpu"): confidences = confidences.cpu() if hasattr(coords, "cpu"): coords = coords.cpu() for conf, bbox in zip(confidences, coords): # Normalize coordinates x1, y1, x2, y2 = bbox.tolist() norm_bbox = [ x1 / img_width, y1 / img_height, x2 / img_width, y2 / img_height, ] all_detections.append( { "type": "icon", "confidence": conf.item(), "bbox": norm_bbox, "scale": scale["size"], "interactivity": True, } ) except Exception as e: logger.warning(f"Detection failed at scale {scale['size']}: {str(e)}") continue # Merge detections using NMS if len(all_detections) > 0: boxes = torch.tensor([d["bbox"] for d in all_detections]) scores = torch.tensor([d["confidence"] for d in all_detections]) keep_indices = torchvision.ops.nms(boxes, scores, iou_threshold) merged_detections = [all_detections[i] for i in keep_indices] else: merged_detections = [] return merged_detections ``` -------------------------------------------------------------------------------- /libs/lume/src/Errors/Errors.swift: -------------------------------------------------------------------------------- ```swift import Foundation enum HomeError: Error, LocalizedError { case directoryCreationFailed(path: String) case directoryAccessDenied(path: String) case invalidHomeDirectory case directoryAlreadyExists(path: String) case homeNotFound case defaultStorageNotDefined case storageLocationNotFound(String) case storageLocationNotADirectory(String) case storageLocationNotWritable(String) case invalidStorageLocation(String) case cannotCreateDirectory(String) case cannotGetVMsDirectory case vmDirectoryNotFound(String) var errorDescription: String? 
{ switch self { case .directoryCreationFailed(let path): return "Failed to create directory at path: \(path)" case .directoryAccessDenied(let path): return "Access denied to directory at path: \(path)" case .invalidHomeDirectory: return "Invalid home directory configuration" case .directoryAlreadyExists(let path): return "Directory already exists at path: \(path)" case .homeNotFound: return "Home directory not found." case .defaultStorageNotDefined: return "Default storage location is not defined." case .storageLocationNotFound(let path): return "Storage location not found: \(path)" case .storageLocationNotADirectory(let path): return "Storage location is not a directory: \(path)" case .storageLocationNotWritable(let path): return "Storage location is not writable: \(path)" case .invalidStorageLocation(let path): return "Invalid storage location specified: \(path)" case .cannotCreateDirectory(let path): return "Cannot create directory: \(path)" case .cannotGetVMsDirectory: return "Cannot determine the VMs directory." case .vmDirectoryNotFound(let path): return "VM directory not found: \(path)" } } } enum PullError: Error, LocalizedError { case invalidImageFormat case tokenFetchFailed case manifestFetchFailed case layerDownloadFailed(String) case missingPart(Int) case decompressionFailed(String) case reassemblyFailed(String) case fileCreationFailed(String) case reassemblySetupFailed(path: String, underlyingError: Error) case missingUncompressedSizeAnnotation case invalidMediaType var errorDescription: String? { switch self { case .invalidImageFormat: return "Invalid image format. Expected format: name:tag" case .tokenFetchFailed: return "Failed to fetch authentication token from registry." case .manifestFetchFailed: return "Failed to fetch image manifest from registry." case .layerDownloadFailed(let digest): return "Failed to download layer: \(digest)" case .missingPart(let partNum): return "Missing required part number \(partNum) for reassembly." 
case .decompressionFailed(let file): return "Failed to decompress file: \(file)" case .reassemblyFailed(let reason): return "Disk image reassembly failed: \(reason)." case .fileCreationFailed(let path): return "Failed to create the necessary file at path: \(path)" case .reassemblySetupFailed(let path, let underlyingError): return "Failed to set up for reassembly at path: \(path). Underlying error: \(underlyingError.localizedDescription)" case .missingUncompressedSizeAnnotation: return "Could not find the required uncompressed disk size annotation in the image config.json." case .invalidMediaType: return "Invalid media type" } } } enum VMConfigError: CustomNSError, LocalizedError { case invalidDisplayResolution(String) case invalidMachineIdentifier case emptyMachineIdentifier case emptyHardwareModel case invalidHardwareModel case invalidDiskSize case malformedSizeInput(String) var errorDescription: String? { switch self { case .invalidDisplayResolution(let resolution): return "Invalid display resolution: \(resolution)" case .emptyMachineIdentifier: return "Empty machine identifier" case .invalidMachineIdentifier: return "Invalid machine identifier" case .emptyHardwareModel: return "Empty hardware model" case .invalidHardwareModel: return "Invalid hardware model: the host does not support the hardware model" case .invalidDiskSize: return "Invalid disk size" case .malformedSizeInput(let input): return "Malformed size input: \(input)" } } static var errorDomain: String { "VMConfigError" } var errorCode: Int { switch self { case .invalidDisplayResolution: return 1 case .emptyMachineIdentifier: return 2 case .invalidMachineIdentifier: return 3 case .emptyHardwareModel: return 4 case .invalidHardwareModel: return 5 case .invalidDiskSize: return 6 case .malformedSizeInput: return 7 } } } enum VMDirectoryError: Error, LocalizedError { case configNotFound case invalidConfigData case diskOperationFailed(String) case fileCreationFailed(String) case sessionNotFound case 
invalidSessionData

    // Must be `String?` to witness LocalizedError's requirement; a non-optional
    // String property would not be used and the custom messages would be lost.
    var errorDescription: String? {
        switch self {
        case .configNotFound:
            return "VM configuration file not found"
        case .invalidConfigData:
            return "Invalid VM configuration data"
        case .diskOperationFailed(let reason):
            return "Disk operation failed: \(reason)"
        case .fileCreationFailed(let path):
            return "Failed to create file at path: \(path)"
        case .sessionNotFound:
            return "VNC session file not found"
        case .invalidSessionData:
            return "Invalid VNC session data"
        }
    }
}

enum VMError: Error, LocalizedError {
    case alreadyExists(String)
    case notFound(String)
    case notInitialized(String)
    case notRunning(String)
    case alreadyRunning(String)
    case installNotStarted(String)
    case stopTimeout(String)
    case resizeTooSmall(current: UInt64, requested: UInt64)
    case vncNotConfigured
    case vncPortBindingFailed(requested: Int, actual: Int)
    case internalError(String)
    case unsupportedOS(String)
    case invalidDisplayResolution(String)

    var errorDescription: String? {
        switch self {
        case .alreadyExists(let name):
            return "Virtual machine already exists with name: \(name)"
        case .notFound(let name):
            return "Virtual machine not found: \(name)"
        case .notInitialized(let name):
            return "Virtual machine not initialized: \(name)"
        case .notRunning(let name):
            return "Virtual machine not running: \(name)"
        case .alreadyRunning(let name):
            return "Virtual machine already running: \(name)"
        case .installNotStarted(let name):
            return "Virtual machine install not started: \(name)"
        case .stopTimeout(let name):
            return "Timeout while stopping virtual machine: \(name)"
        case .resizeTooSmall(let current, let requested):
            return "Cannot resize disk to \(requested) bytes, current size is \(current) bytes"
        case .vncNotConfigured:
            return "VNC is not configured for this virtual machine"
        case .vncPortBindingFailed(let requested, let actual):
            if actual == -1 {
                return "Could not bind to VNC port \(requested) (port already in use). Try a different port or use port 0 for auto-assign."
} return "Could not bind to VNC port \(requested) (port already in use). System assigned port \(actual) instead. Try a different port or use port 0 for auto-assign." case .internalError(let message): return "Internal error: \(message)" case .unsupportedOS(let os): return "Unsupported operating system: \(os)" case .invalidDisplayResolution(let resolution): return "Invalid display resolution: \(resolution)" } } } enum ResticError: Error { case snapshotFailed(String) case restoreFailed(String) case genericError(String) } enum VmrunError: Error, LocalizedError { case commandNotFound case operationFailed(command: String, output: String?) var errorDescription: String? { switch self { case .commandNotFound: return "vmrun command not found. Ensure VMware Fusion is installed and in the system PATH." case .operationFailed(let command, let output): return "vmrun command '\(command)' failed. Output: \(output ?? "No output")" } } } ``` -------------------------------------------------------------------------------- /libs/python/core/core/telemetry/posthog.py: -------------------------------------------------------------------------------- ```python """Telemetry client using PostHog for collecting anonymous usage data.""" from __future__ import annotations import logging import os import uuid import sys from pathlib import Path from typing import Any, Dict, List, Optional import posthog from core import __version__ logger = logging.getLogger("core.telemetry") # Public PostHog config for anonymous telemetry # These values are intentionally public and meant for anonymous telemetry only # https://posthog.com/docs/product-analytics/troubleshooting#is-it-ok-for-my-api-key-to-be-exposed-and-public PUBLIC_POSTHOG_API_KEY = "phc_eSkLnbLxsnYFaXksif1ksbrNzYlJShr35miFLDppF14" PUBLIC_POSTHOG_HOST = "https://eu.i.posthog.com" class PostHogTelemetryClient: """Collects and reports telemetry data via PostHog.""" # Global singleton (class-managed) _singleton: Optional["PostHogTelemetryClient"] = 
None def __init__(self): """Initialize PostHog telemetry client.""" self.installation_id = self._get_or_create_installation_id() self.initialized = False self.queued_events: List[Dict[str, Any]] = [] # Log telemetry status on startup if self.is_telemetry_enabled(): logger.info("Telemetry enabled") # Initialize PostHog client if config is available self._initialize_posthog() else: logger.info("Telemetry disabled") @classmethod def is_telemetry_enabled(cls) -> bool: """True if telemetry is currently active for this process.""" return ( # Legacy opt-out flag os.environ.get("CUA_TELEMETRY", "").lower() != "off" # Opt-in flag (defaults to enabled) and os.environ.get("CUA_TELEMETRY_ENABLED", "true").lower() in { "1", "true", "yes", "on" } ) def _get_or_create_installation_id(self) -> str: """Get or create a unique installation ID that persists across runs. The ID is always stored within the core library directory itself, ensuring it persists regardless of how the library is used. This ID is not tied to any personal information. 
""" # Get the core library directory (where this file is located) try: # Find the core module directory using this file's location core_module_dir = Path( __file__ ).parent.parent # core/telemetry/posthog_client.py -> core/telemetry -> core storage_dir = core_module_dir / ".storage" storage_dir.mkdir(exist_ok=True) id_file = storage_dir / "installation_id" # Try to read existing ID if id_file.exists(): try: stored_id = id_file.read_text().strip() if stored_id: # Make sure it's not empty logger.debug(f"Using existing installation ID: {stored_id}") return stored_id except Exception as e: logger.debug(f"Error reading installation ID file: {e}") # Create new ID new_id = str(uuid.uuid4()) try: id_file.write_text(new_id) logger.debug(f"Created new installation ID: {new_id}") return new_id except Exception as e: logger.warning(f"Could not write installation ID: {e}") except Exception as e: logger.warning(f"Error accessing core module directory: {e}") # Last resort: Create a new in-memory ID logger.warning("Using random installation ID (will not persist across runs)") return str(uuid.uuid4()) def _initialize_posthog(self) -> bool: """Initialize the PostHog client with configuration. 
Returns: bool: True if initialized successfully, False otherwise """ if self.initialized: return True try: # Allow overrides from environment for testing/region control posthog.api_key = PUBLIC_POSTHOG_API_KEY posthog.host = PUBLIC_POSTHOG_HOST # Configure the client posthog.debug = os.environ.get("CUA_TELEMETRY_DEBUG", "").lower() == "on" # Log telemetry status logger.info( f"Initializing PostHog telemetry with installation ID: {self.installation_id}" ) if posthog.debug: logger.debug(f"PostHog API Key: {posthog.api_key}") logger.debug(f"PostHog Host: {posthog.host}") # Identify this installation self._identify() # Process any queued events for event in self.queued_events: posthog.capture( distinct_id=self.installation_id, event=event["event"], properties=event["properties"], ) self.queued_events = [] self.initialized = True return True except Exception as e: logger.warning(f"Failed to initialize PostHog: {e}") return False def _identify(self) -> None: """Set up user properties for the current installation with PostHog.""" try: properties = { "version": __version__, "is_ci": "CI" in os.environ, "os": os.name, "python_version": sys.version.split()[0], } logger.debug( f"Setting up PostHog user properties for: {self.installation_id} with properties: {properties}" ) # In the Python SDK, we capture an identification event instead of calling identify() posthog.capture( distinct_id=self.installation_id, event="$identify", properties={"$set": properties} ) logger.info(f"Set up PostHog user properties for installation: {self.installation_id}") except Exception as e: logger.warning(f"Failed to set up PostHog user properties: {e}") def record_event(self, event_name: str, properties: Optional[Dict[str, Any]] = None) -> None: """Record an event with optional properties. Args: event_name: Name of the event properties: Event properties (must not contain sensitive data) """ # Respect runtime telemetry opt-out. 
        if not self.is_telemetry_enabled():
            logger.debug("Telemetry disabled; event not recorded.")
            return

        event_properties = {"version": __version__, **(properties or {})}
        logger.info(f"Recording event: {event_name} with properties: {event_properties}")

        if self.initialized:
            try:
                posthog.capture(
                    distinct_id=self.installation_id, event=event_name, properties=event_properties
                )
                logger.info(f"Sent event to PostHog: {event_name}")
                # Flush immediately to ensure delivery
                posthog.flush()
            except Exception as e:
                logger.warning(f"Failed to send event to PostHog: {e}")
        else:
            # Queue the event for later
            logger.info(f"PostHog not initialized, queuing event for later: {event_name}")
            self.queued_events.append({"event": event_name, "properties": event_properties})
            # Try to initialize now if not already
            initialize_result = self._initialize_posthog()
            logger.info(f"Attempted to initialize PostHog: {initialize_result}")

    def flush(self) -> bool:
        """Flush any pending events to PostHog.

        Returns:
            bool: True if successful, False otherwise
        """
        if not self.initialized and not self._initialize_posthog():
            return False

        try:
            posthog.flush()
            return True
        except Exception as e:
            logger.debug(f"Failed to flush PostHog events: {e}")
            return False

    @classmethod
    def get_client(cls) -> "PostHogTelemetryClient":
        """Return the global PostHogTelemetryClient instance, creating it if needed."""
        if cls._singleton is None:
            cls._singleton = cls()
        return cls._singleton

    @classmethod
    def destroy_client(cls) -> None:
        """Destroy the global PostHogTelemetryClient instance."""
        cls._singleton = None


def destroy_telemetry_client() -> None:
    """Destroy the global PostHogTelemetryClient instance (class-managed)."""
    PostHogTelemetryClient.destroy_client()


def is_telemetry_enabled() -> bool:
    return PostHogTelemetryClient.is_telemetry_enabled()


def record_event(event_name: str, properties: Optional[Dict[str, Any]] = None) -> None:
    """Record an arbitrary PostHog event."""
PostHogTelemetryClient.get_client().record_event(event_name, properties or {}) ``` -------------------------------------------------------------------------------- /libs/python/agent/agent/ui/gradio/app.py: -------------------------------------------------------------------------------- ```python """ Advanced Gradio UI for Computer-Use Agent (cua-agent) This is a Gradio interface for the Computer-Use Agent v0.4.x (cua-agent) with an advanced UI for model selection and configuration. Supported Agent Models: - OpenAI: openai/computer-use-preview - Anthropic: anthropic/claude-3-5-sonnet-20241022, anthropic/claude-3-7-sonnet-20250219 - UI-TARS: huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B - Omniparser: omniparser+anthropic/claude-3-5-sonnet-20241022, omniparser+ollama_chat/gemma3 Requirements: - Mac with Apple Silicon (M1/M2/M3/M4), Linux, or Windows - macOS 14 (Sonoma) or newer / Ubuntu 20.04+ - Python 3.11+ - Lume CLI installed (https://github.com/trycua/cua) - OpenAI or Anthropic API key """ import os import asyncio import logging import json import platform from pathlib import Path from typing import Dict, List, Optional, AsyncGenerator, Any, Tuple, Union import gradio as gr from gradio.components.chatbot import MetadataDict from typing import cast # Import from agent package from agent import ComputerAgent from agent.types import Messages, AgentResponse from computer import Computer # Global variables global_agent = None global_computer = None SETTINGS_FILE = Path(".gradio_settings.json") logging.basicConfig(level=logging.INFO) import dotenv if dotenv.load_dotenv(): print(f"DEBUG - Loaded environment variables from {dotenv.find_dotenv()}") else: print("DEBUG - No .env file found") # --- Settings Load/Save Functions --- def load_settings() -> Dict[str, Any]: """Loads settings from the JSON file.""" if SETTINGS_FILE.exists(): try: with open(SETTINGS_FILE, "r") as f: settings = json.load(f) if isinstance(settings, dict): print(f"DEBUG - Loaded settings from 
{SETTINGS_FILE}") return settings except (json.JSONDecodeError, IOError) as e: print(f"Warning: Could not load settings from {SETTINGS_FILE}: {e}") return {} def save_settings(settings: Dict[str, Any]): """Saves settings to the JSON file.""" settings.pop("provider_api_key", None) try: with open(SETTINGS_FILE, "w") as f: json.dump(settings, f, indent=4) print(f"DEBUG - Saved settings to {SETTINGS_FILE}") except IOError as e: print(f"Warning: Could not save settings to {SETTINGS_FILE}: {e}") # # Custom Screenshot Handler for Gradio chat # class GradioChatScreenshotHandler: # """Custom handler that adds screenshots to the Gradio chatbot.""" # def __init__(self, chatbot_history: List[gr.ChatMessage]): # self.chatbot_history = chatbot_history # print("GradioChatScreenshotHandler initialized") # async def on_screenshot(self, screenshot_base64: str, action_type: str = "") -> None: # """Add screenshot to chatbot when a screenshot is taken.""" # image_markdown = f"" # if self.chatbot_history is not None: # self.chatbot_history.append( # gr.ChatMessage( # role="assistant", # content=image_markdown, # metadata={"title": f"🖥️ Screenshot - {action_type}", "status": "done"}, # ) # ) # Detect platform capabilities is_mac = platform.system().lower() == "darwin" is_lume_available = is_mac or (os.environ.get("PYLUME_HOST", "localhost") != "localhost") print("PYLUME_HOST: ", os.environ.get("PYLUME_HOST", "localhost")) print("is_mac: ", is_mac) print("Lume available: ", is_lume_available) # Map model names to agent model strings MODEL_MAPPINGS = { "openai": { "default": "openai/computer-use-preview", "OpenAI: Computer-Use Preview": "openai/computer-use-preview", }, "anthropic": { "default": "anthropic/claude-3-7-sonnet-20250219", "Anthropic: Claude 4 Opus (20250514)": "anthropic/claude-opus-4-20250514", "Anthropic: Claude 4 Sonnet (20250514)": "anthropic/claude-sonnet-4-20250514", "Anthropic: Claude 3.7 Sonnet (20250219)": "anthropic/claude-3-7-sonnet-20250219", "Anthropic: Claude 3.5 
Sonnet (20241022)": "anthropic/claude-3-5-sonnet-20241022", }, "omni": { "default": "omniparser+openai/gpt-4o", "OMNI: OpenAI GPT-4o": "omniparser+openai/gpt-4o", "OMNI: OpenAI GPT-4o mini": "omniparser+openai/gpt-4o-mini", "OMNI: Claude 3.7 Sonnet (20250219)": "omniparser+anthropic/claude-3-7-sonnet-20250219", "OMNI: Claude 3.5 Sonnet (20241022)": "omniparser+anthropic/claude-3-5-sonnet-20241022", }, "uitars": { "default": "huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B" if is_mac else "ui-tars", "huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B": "huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B", }, } def get_model_string(model_name: str, loop_provider: str) -> str: """Determine the agent model string based on the input.""" if model_name == "Custom model (OpenAI compatible API)": return "custom_oaicompat" elif model_name == "Custom model (ollama)": return "custom_ollama" elif loop_provider == "OMNI-OLLAMA" or model_name.startswith("OMNI: Ollama "): if model_name.startswith("OMNI: Ollama "): ollama_model = model_name.split("OMNI: Ollama ", 1)[1] return f"omniparser+ollama_chat/{ollama_model}" return "omniparser+ollama_chat/llama3" # Map based on loop provider mapping = MODEL_MAPPINGS.get(loop_provider.lower(), MODEL_MAPPINGS["openai"]) return mapping.get(model_name, mapping["default"]) def get_ollama_models() -> List[str]: """Get available models from Ollama if installed.""" try: import subprocess result = subprocess.run(["ollama", "list"], capture_output=True, text=True) if result.returncode == 0: lines = result.stdout.strip().split("\n") if len(lines) < 2: return [] models = [] for line in lines[1:]: parts = line.split() if parts: model_name = parts[0] models.append(f"OMNI: Ollama {model_name}") return models return [] except Exception as e: logging.error(f"Error getting Ollama models: {e}") return [] def create_computer_instance( verbosity: int = logging.INFO, os_type: str = "macos", provider_type: str = "lume", name: Optional[str] = None, api_key: 
Optional[str] = None ) -> Computer: """Create or get the global Computer instance.""" global global_computer if global_computer is None: if provider_type == "localhost": global_computer = Computer( verbosity=verbosity, os_type=os_type, use_host_computer_server=True ) else: global_computer = Computer( verbosity=verbosity, os_type=os_type, provider_type=provider_type, name=name if name else "", api_key=api_key ) return global_computer def create_agent( model_string: str, save_trajectory: bool = True, only_n_most_recent_images: int = 3, verbosity: int = logging.INFO, custom_model_name: Optional[str] = None, computer_os: str = "macos", computer_provider: str = "lume", computer_name: Optional[str] = None, computer_api_key: Optional[str] = None, max_trajectory_budget: Optional[float] = None, ) -> ComputerAgent: """Create or update the global agent with the specified parameters.""" global global_agent # Create the computer computer = create_computer_instance( verbosity=verbosity, os_type=computer_os, provider_type=computer_provider, name=computer_name, api_key=computer_api_key ) # Handle custom models if model_string == "custom_oaicompat" and custom_model_name: model_string = custom_model_name elif model_string == "custom_ollama" and custom_model_name: model_string = f"omniparser+ollama_chat/{custom_model_name}" # Create agent kwargs agent_kwargs = { "model": model_string, "tools": [computer], "only_n_most_recent_images": only_n_most_recent_images, "verbosity": verbosity, } if save_trajectory: agent_kwargs["trajectory_dir"] = "trajectories" if max_trajectory_budget: agent_kwargs["max_trajectory_budget"] = {"max_budget": max_trajectory_budget, "raise_error": True} global_agent = ComputerAgent(**agent_kwargs) return global_agent def launch_ui(): """Standalone function to launch the Gradio app.""" from agent.ui.gradio.ui_components import create_gradio_ui print(f"Starting Gradio app for CUA Agent...") demo = create_gradio_ui() demo.launch(share=False, inbrowser=True) if 
__name__ == "__main__": launch_ui() ``` -------------------------------------------------------------------------------- /docs/content/docs/computer-sdk/commands.mdx: -------------------------------------------------------------------------------- ```markdown --- title: Commands description: Computer commands and interface methods --- This page describes the set of supported **commands** you can use to control a Cua Computer directly via the Python SDK. These commands map to the same actions available in the [Computer Server API Commands Reference](../libraries/computer-server/Commands), and provide low-level, async access to system operations from your agent or automation code. ## Shell Actions Execute shell commands and get detailed results: <Tabs items={['Python', 'TypeScript']}> <Tab value="Python"> ```python # Run shell command result = await computer.interface.run_command(cmd) # result.stdout, result.stderr, result.returncode ``` </Tab> <Tab value="TypeScript"> ```typescript // Run shell command const result = await computer.interface.runCommand(cmd); // result.stdout, result.stderr, result.returncode ``` </Tab> </Tabs> ## Mouse Actions Precise mouse control and interaction: <Tabs items={['Python', 'TypeScript']}> <Tab value="Python"> ```python # Basic clicks await computer.interface.left_click(x, y) # Left click at coordinates await computer.interface.right_click(x, y) # Right click at coordinates await computer.interface.double_click(x, y) # Double click at coordinates # Cursor movement and dragging await computer.interface.move_cursor(x, y) # Move cursor to coordinates await computer.interface.drag_to(x, y, duration) # Drag to coordinates await computer.interface.get_cursor_position() # Get current cursor position # Advanced mouse control await computer.interface.mouse_down(x, y, button="left") # Press and hold a mouse button await computer.interface.mouse_up(x, y, button="left") # Release a mouse button ``` </Tab> <Tab value="TypeScript"> ```typescript // 
Basic clicks await computer.interface.leftClick(x, y); // Left click at coordinates await computer.interface.rightClick(x, y); // Right click at coordinates await computer.interface.doubleClick(x, y); // Double click at coordinates // Cursor movement and dragging await computer.interface.moveCursor(x, y); // Move cursor to coordinates await computer.interface.dragTo(x, y, duration); // Drag to coordinates await computer.interface.getCursorPosition(); // Get current cursor position // Advanced mouse control await computer.interface.mouseDown(x, y, "left"); // Press and hold a mouse button await computer.interface.mouseUp(x, y, "left"); // Release a mouse button ``` </Tab> </Tabs> ## Keyboard Actions Text input and key combinations: <Tabs items={['Python', 'TypeScript']}> <Tab value="Python"> ```python # Text input await computer.interface.type_text("Hello") # Type text await computer.interface.press_key("enter") # Press a single key # Key combinations and advanced control await computer.interface.hotkey("command", "c") # Press key combination await computer.interface.key_down("command") # Press and hold a key await computer.interface.key_up("command") # Release a key ``` </Tab> <Tab value="TypeScript"> ```typescript // Text input await computer.interface.typeText("Hello"); // Type text await computer.interface.pressKey("enter"); // Press a single key // Key combinations and advanced control await computer.interface.hotkey("command", "c"); // Press key combination await computer.interface.keyDown("command"); // Press and hold a key await computer.interface.keyUp("command"); // Release a key ``` </Tab> </Tabs> ## Scrolling Actions Mouse wheel and scrolling control: <Tabs items={['Python', 'TypeScript']}> <Tab value="Python"> ```python # Scrolling await computer.interface.scroll(x, y) # Scroll the mouse wheel await computer.interface.scroll_down(clicks) # Scroll down await computer.interface.scroll_up(clicks) # Scroll up ``` </Tab> <Tab value="TypeScript"> 
```typescript // Scrolling await computer.interface.scroll(x, y); // Scroll the mouse wheel await computer.interface.scrollDown(clicks); // Scroll down await computer.interface.scrollUp(clicks); // Scroll up ``` </Tab> </Tabs> ## Screen Actions Screen capture and display information: <Tabs items={['Python', 'TypeScript']}> <Tab value="Python"> ```python # Screen operations await computer.interface.screenshot() # Take a screenshot await computer.interface.get_screen_size() # Get screen dimensions ``` </Tab> <Tab value="TypeScript"> ```typescript // Screen operations await computer.interface.screenshot(); // Take a screenshot await computer.interface.getScreenSize(); // Get screen dimensions ``` </Tab> </Tabs> ## Clipboard Actions System clipboard management: <Tabs items={['Python', 'TypeScript']}> <Tab value="Python"> ```python # Clipboard operations await computer.interface.set_clipboard(text) # Set clipboard content await computer.interface.copy_to_clipboard() # Get clipboard content ``` </Tab> <Tab value="TypeScript"> ```typescript // Clipboard operations await computer.interface.setClipboard(text); // Set clipboard content await computer.interface.copyToClipboard(); // Get clipboard content ``` </Tab> </Tabs> ## File System Operations Direct file and directory manipulation: <Tabs items={['Python', 'TypeScript']}> <Tab value="Python"> ```python # File existence checks await computer.interface.file_exists(path) # Check if file exists await computer.interface.directory_exists(path) # Check if directory exists # File content operations await computer.interface.read_text(path, encoding="utf-8") # Read file content await computer.interface.write_text(path, content, encoding="utf-8") # Write file content await computer.interface.read_bytes(path) # Read file content as bytes await computer.interface.write_bytes(path, content) # Write file content as bytes # File and directory management await computer.interface.delete_file(path) # Delete file await 
computer.interface.create_dir(path) # Create directory
await computer.interface.delete_dir(path) # Delete directory
await computer.interface.list_dir(path) # List directory contents
```

</Tab>
<Tab value="TypeScript">

```typescript
// File existence checks
await computer.interface.fileExists(path); // Check if file exists
await computer.interface.directoryExists(path); // Check if directory exists

// File content operations
await computer.interface.readText(path, "utf-8"); // Read file content
await computer.interface.writeText(path, content, "utf-8"); // Write file content
await computer.interface.readBytes(path); // Read file content as bytes
await computer.interface.writeBytes(path, content); // Write file content as bytes

// File and directory management
await computer.interface.deleteFile(path); // Delete file
await computer.interface.createDir(path); // Create directory
await computer.interface.deleteDir(path); // Delete directory
await computer.interface.listDir(path); // List directory contents
```

</Tab>
</Tabs>

## Accessibility

Access system accessibility information:

<Tabs items={['Python', 'TypeScript']}>
<Tab value="Python">

```python
# Get accessibility tree
await computer.interface.get_accessibility_tree()
```

</Tab>
<Tab value="TypeScript">

```typescript
// Get accessibility tree
await computer.interface.getAccessibilityTree();
```

</Tab>
</Tabs>

## Delay Configuration

Control timing between actions:

<Tabs items={['Python']}>
<Tab value="Python">

```python
# Set default delay between all actions (in seconds)
computer.interface.delay = 0.5 # 500ms delay between actions

# Or specify delay for individual actions
await computer.interface.left_click(x, y, delay=1.0) # 1 second delay after click
await computer.interface.type_text("Hello", delay=0.2) # 200ms delay after typing
await computer.interface.press_key("enter", delay=0.5) # 500ms delay after key press
```

</Tab>
</Tabs>

## Python Virtual Environment Operations

Manage Python environments:

<Tabs
items={['Python']}>
<Tab value="Python">

```python
# Virtual environment management
await computer.venv_install("demo_venv", ["requests", "macos-pyxa"]) # Install packages in a virtual environment
await computer.venv_cmd("demo_venv", "python -c \"import requests; print(requests.get('https://httpbin.org/ip').json())\"") # Run a shell command in a virtual environment
await computer.venv_exec("demo_venv", python_function_or_code, *args, **kwargs) # Run a Python function in a virtual environment and return the result / raise an exception
```

</Tab>
</Tabs>
```

--------------------------------------------------------------------------------
/blog/app-use.md:
--------------------------------------------------------------------------------

```markdown
# App-Use: Control Individual Applications with Cua Agents

*Published on May 31, 2025 by The Cua Team*

Today, we are excited to introduce a new experimental feature landing in the [Cua GitHub repository](https://github.com/trycua/cua): **App-Use**. App-Use allows you to create lightweight virtual desktops that limit agent access to specific applications, improving the precision of your agent's trajectory. Perfect for parallel workflows and focused task execution.

> **Note:** App-Use is currently experimental. To use it, you need to enable it by passing the `experiments=["app-use"]` feature flag when creating your Computer instance.

Check out an example of a Cua Agent automating the Cua team's Taco Bell order through the iPhone Mirroring app:

<div align="center">
  <video src="https://github.com/user-attachments/assets/6362572e-f784-4006-aa6e-bce10991fab9" width="600" controls></video>
</div>

## What is App-Use?

App-Use lets you create virtual desktop sessions scoped to specific applications. Instead of giving an agent access to your entire screen, you can say "only work with Safari and Notes" or "just control the iPhone Mirroring app."
```python # Create a macOS VM with App Use experimental feature enabled computer = Computer(experiments=["app-use"]) # Create a desktop limited to specific apps desktop = computer.create_desktop_from_apps(["Safari", "Notes"]) # Your agent can now only see and interact with these apps agent = ComputerAgent( model="anthropic/claude-3-5-sonnet-20241022", tools=[desktop] ) ``` ## Key Benefits ### 1. Lightweight and Fast App-Use creates visual filters, not new processes. Your apps continue running normally - we just control what the agent can see and click on. The virtual desktops are composited views that require no additional compute resources beyond the existing window manager operations. ### 2. Run Multiple Agents in Parallel Deploy a team of specialized agents, each focused on their own apps: ```python # Create a Computer with App Use enabled computer = Computer(experiments=["app-use"]) # Research agent focuses on browser research_desktop = computer.create_desktop_from_apps(["Safari"]) research_agent = ComputerAgent(tools=[research_desktop], ...) # Writing agent focuses on documents writing_desktop = computer.create_desktop_from_apps(["Pages", "Notes"]) writing_agent = ComputerAgent(tools=[writing_desktop], ...) 
async def run_agent(agent, task):
    async for result in agent.run(task):
        print(result.get('text', ''))

# Run both simultaneously
await asyncio.gather(
    run_agent(research_agent, "Research AI trends for 2025"),
    run_agent(writing_agent, "Draft blog post outline")
)
```

## How To: Getting Started with App-Use

### Requirements

To get started with App-Use, you'll need:

- Python 3.11+
- macOS Sequoia (15.0) or later

### Getting Started

```bash
# Install packages and launch UI
pip install -U "cua-computer[all]" "cua-agent[all]"
python -m agent.ui.gradio.app
```

```python
import asyncio
from computer import Computer
from agent import ComputerAgent

async def main():
    # Enable the App-Use experiment when creating the Computer
    computer = Computer(experiments=["app-use"])
    await computer.run()

    # Create app-specific desktop sessions
    desktop = computer.create_desktop_from_apps(["Notes"])

    # Initialize an agent
    agent = ComputerAgent(
        model="anthropic/claude-3-5-sonnet-20241022",
        tools=[desktop]
    )

    # Take a screenshot (returns bytes by default)
    screenshot = await desktop.interface.screenshot()
    with open("app_screenshot.png", "wb") as f:
        f.write(screenshot)

    # Run an agent task
    async for result in agent.run("Create a new note titled 'Meeting Notes' and add today's agenda items"):
        print(f"Agent: {result.get('text', '')}")

if __name__ == "__main__":
    asyncio.run(main())
```

## Use Case: Automating Your iPhone with Cua

### ⚠️ Important Warning

Computer-use agents are powerful tools that can interact with your devices. This guide involves using your own macOS and iPhone instead of a VM. **Proceed at your own risk.** Always:

- Review agent actions before running
- Start with non-critical tasks
- Monitor agent behavior closely

Remember: with Cua, it is still advisable to use a VM for better isolation of your agents.
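The "review agent actions before running" advice above can be made concrete with a simple confirmation gate around each task. A minimal stdlib sketch — the `confirm_action` helper is our own illustration, not part of the Cua SDK:

```python
# Illustrative helper, not part of the Cua SDK: gate each agent task
# behind an explicit operator confirmation when running on real hardware.
def confirm_action(description: str, ask=input) -> bool:
    """Return True only when the operator explicitly approves the task."""
    answer = ask(f"Agent wants to: {description}. Allow? [y/N] ")
    return answer.strip().lower() in {"y", "yes"}
```

Anything other than an explicit yes counts as a refusal, so a stray Enter press keeps the safe default of "No".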
### Setting Up iPhone Automation

### Step 1: Start the cua-computer-server

First, you'll need to start the cua-computer-server locally to enable access to iPhone Mirroring via the Computer interface:

```bash
# Install the server
pip install cua-computer-server

# Start the server
python -m computer_server
```

### Step 2: Connect iPhone Mirroring

Then, you'll need to open the "iPhone Mirroring" app on your Mac and connect it to your iPhone.

### Step 3: Create an iPhone Automation Session

Finally, you can create an iPhone automation session:

```python
import asyncio
from computer import Computer
from agent import ComputerAgent

async def automate_iphone():
    # Connect to your local computer server
    my_mac = Computer(use_host_computer_server=True, os_type="macos", experiments=["app-use"])
    await my_mac.run()

    # Create a desktop focused on iPhone Mirroring
    my_iphone = my_mac.create_desktop_from_apps(["iPhone Mirroring"])

    # Initialize an agent for iPhone automation
    agent = ComputerAgent(
        model="anthropic/claude-3-5-sonnet-20241022",
        tools=[my_iphone]
    )

    # Example: Send a message
    async for result in agent.run("Open Messages and send 'Hello from Cua!' 
to John"): print(f"Agent: {result.get('text', '')}") # Example: Set a reminder async for result in agent.run("Create a reminder to call mom at 5 PM today"): print(f"Agent: {result.get('text', '')}") if __name__ == "__main__": asyncio.run(automate_iphone()) ``` ### iPhone Automation Use Cases With Cua's iPhone automation, you can: - **Automate messaging**: Send texts, respond to messages, manage conversations - **Control apps**: Navigate any iPhone app using natural language - **Manage settings**: Adjust iPhone settings programmatically - **Extract data**: Read information from apps that don't have APIs - **Test iOS apps**: Automate testing workflows for iPhone applications ## Important Notes - **Visual isolation only**: Apps share the same files, OS resources, and user session - **Dynamic resolution**: Desktops automatically scale to fit app windows and menu bars - **macOS only**: Currently requires macOS due to compositing engine dependencies - **Not a security boundary**: This is for agent focus, not security isolation ## When to Use What: App-Use vs Multiple Cua Containers ### Use App-Use within the same macOS Cua Container: - ✅ You need lightweight, fast agent focusing (macOS only) - ✅ You want to run multiple agents on one desktop - ✅ You're automating personal devices like iPhones - ✅ Window layout isolation is sufficient - ✅ You want low computational overhead ### Use Multiple Cua Containers: - ✅ You need maximum isolation between agents - ✅ You require cross-platform support (Mac/Linux/Windows) - ✅ You need guaranteed resource allocation - ✅ Security and complete isolation are critical - ⚠️ Note: Most computationally expensive option ## Pro Tips 1. **Start Small**: Test with one app before creating complex multi-app desktops 2. **Screenshot First**: Take a screenshot to verify your desktop shows the right apps 3. **Name Your Apps Correctly**: Use exact app names as they appear in the system 4. 
**Consider Performance**: While lightweight, too many parallel agents can still impact system performance 5. **Plan Your Workflows**: Design agent tasks to minimize app switching for best results ### How It Works When you create a desktop session with `create_desktop_from_apps()`, App Use: - Filters the visual output to show only specified application windows - Routes input events only to those applications - Maintains window layout isolation between different sessions - Shares the underlying file system and OS resources - **Dynamically adjusts resolution** to fit the window layout and menu bar items The resolution of these virtual desktops is dynamic, automatically scaling to accommodate the applications' window sizes and menu bar requirements. This ensures that agents always have a clear view of the entire interface they need to interact with, regardless of the specific app combination. Currently, App Use is limited to macOS only due to its reliance on Quartz, Apple's powerful compositing engine, for creating these virtual desktops. Quartz provides the low-level window management and rendering capabilities that make it possible to composite multiple application windows into isolated visual environments. ## Conclusion App Use brings a new dimension to computer automation - lightweight, focused, and parallel. Whether you're building a personal iPhone assistant or orchestrating a team of specialized agents, App Use provides the perfect balance of functionality and efficiency. Ready to try it? Update to the latest Cua version and start focusing your agents today! ```bash pip install -U "cua-computer[all]" "cua-agent[all]" ``` Happy automating! 
🎯🤖
```

--------------------------------------------------------------------------------
/blog/introducing-cua-cloud-containers.md:
--------------------------------------------------------------------------------

```markdown
# Introducing Cua Cloud Sandbox: Computer-Use Agents in the Cloud

*Published on May 28, 2025 by Francesco Bonacci*

Welcome to the next chapter in our Computer-Use Agent journey! In [Part 1](./build-your-own-operator-on-macos-1), we showed you how to build your own Operator on macOS. In [Part 2](./build-your-own-operator-on-macos-2), we explored the cua-agent framework. Today, we're excited to introduce **Cua Cloud Sandbox** – the easiest way to deploy Computer-Use Agents at scale.

<div align="center">
  <video src="https://github.com/user-attachments/assets/63a2addf-649f-4468-971d-58d38dd43ee6" width="600" controls></video>
</div>

## What is Cua Cloud?

Think of Cua Cloud as **Docker for Computer-Use Agents**. Instead of managing VMs, installing dependencies, and configuring environments, you can launch pre-configured Cloud Sandbox instances with a single command. Each sandbox comes with a **full desktop environment** accessible in the browser (via noVNC), all CUA-related dependencies pre-configured (with a PyAutoGUI-compatible server), and **pay-per-use pricing** that scales with your needs.

## Why Cua Cloud Sandbox?

Four months ago, we launched [**Lume**](https://github.com/trycua/cua/tree/main/libs/lume) and [**Cua**](https://github.com/trycua/cua) with the goal of bringing sandboxed VMs and Computer-Use Agents to Apple Silicon. The developer community's response was incredible 🎉

Going from prototype to production revealed a problem, though: **local macOS VMs don't scale**, nor are they easily portable. Our Discord community, YC peers, and early pilot customers kept hitting the same issues. Storage constraints meant **20-40GB per VM** filled laptops fast.
Different hardware architectures (Apple Silicon ARM vs Intel x86) prevented portability of local workflows. Every new user lost a day to setup and configuration.

**Cua Cloud** eliminates these constraints while preserving everything developers already know from our Computer and Agent SDKs.

### What We Built

Over the past month, we've been iterating on Cua Cloud with partners and beta users to address these challenges. You use the exact same `Computer` and `ComputerAgent` classes you already know, but with **zero local setup** or storage requirements. VNC access comes with **built-in encryption**, you pay only for compute time (not idle resources), and you can bring your own API keys for any LLM provider. The result? **Instant deployment** in seconds instead of hours, with no infrastructure to manage.

Scale elastically from **1 to 100 agents** in parallel, with consistent behavior across all deployments. Share agent trajectories with your team for better collaboration and debugging.

## Getting Started

### Step 1: Get Your API Key

Sign up at [**trycua.com**](https://trycua.com) to get your API key.
```bash
# Set your API key in environment variables
export CUA_API_KEY=your_api_key_here
export CUA_CONTAINER_NAME=my-agent-container
```

### Step 2: Launch Your First Sandbox

```python
import asyncio
import logging
import os

from computer import Computer, VMProviderType
from agent import ComputerAgent

async def run_cloud_agent():
    # Create a remote Linux computer with Cua Cloud
    computer = Computer(
        os_type="linux",
        api_key=os.getenv("CUA_API_KEY"),
        name=os.getenv("CUA_CONTAINER_NAME"),
        provider_type=VMProviderType.CLOUD,
    )

    # Create an agent with your preferred loop
    agent = ComputerAgent(
        model="openai/gpt-4o",
        save_trajectory=True,
        verbosity=logging.INFO,
        tools=[computer]
    )

    # Run a task
    async for result in agent.run("Open Chrome and search for AI news"):
        print(f"Response: {result.get('text')}")

# Run the agent
asyncio.run(run_cloud_agent())
```

### Available Tiers

We're launching with **three compute tiers** to match your workload needs:

- **Small** (1 vCPU, 4GB RAM) - Perfect for simple automation tasks and testing
- **Medium** (2 vCPU, 8GB RAM) - Ideal for most production workloads
- **Large** (8 vCPU, 32GB RAM) - Built for complex, resource-intensive operations

Each tier includes a **full Linux desktop environment with Xfce** and a pre-configured browser, **secure VNC access** over SSL, persistent storage during your session, and automatic sandbox cleanup on termination.
## How Customers Are Using Cua Cloud Today

### Example 1: Automated GitHub Workflow

Let's automate a complete GitHub workflow:

```python
import asyncio
import logging
import os

from computer import Computer, VMProviderType
from agent import ComputerAgent

async def github_automation():
    """Automate GitHub repository management tasks."""
    computer = Computer(
        os_type="linux",
        api_key=os.getenv("CUA_API_KEY"),
        name="github-automation",
        provider_type=VMProviderType.CLOUD,
    )

    agent = ComputerAgent(
        model="openai/gpt-4o",
        save_trajectory=True,
        verbosity=logging.INFO,
        tools=[computer]
    )

    tasks = [
        "Look for a repository named trycua/cua on GitHub.",
        "Check the open issues, open the most recent one and read it.",
        "Clone the repository if it doesn't exist yet.",
        "Create a new branch for the issue.",
        "Make necessary changes to resolve the issue.",
        "Commit the changes with a descriptive message.",
        "Create a pull request."
    ]

    for i, task in enumerate(tasks):
        print(f"\nExecuting task {i+1}/{len(tasks)}: {task}")

        async for result in agent.run(task):
            print(f"Response: {result.get('text')}")

            # Check if any tools were used
            tools = result.get('tools')
            if tools:
                print(f"Tools used: {tools}")

        print(f"Task {i+1} completed")

# Run the automation
asyncio.run(github_automation())
```

### Example 2: Parallel Web Scraping

Run multiple agents in parallel to scrape different websites:

```python
import asyncio
import os

from computer import Computer, VMProviderType
from agent import ComputerAgent

async def scrape_website(site_name, url):
    """Scrape a website using a cloud agent."""
    computer = Computer(
        os_type="linux",
        api_key=os.getenv("CUA_API_KEY"),
        name=f"scraper-{site_name}",
        provider_type=VMProviderType.CLOUD,
    )

    agent = ComputerAgent(
        model="openai/gpt-4o",
        save_trajectory=True,
        tools=[computer]
    )

    results = []
    tasks = [
        f"Navigate to {url}",
        "Extract the main headlines or article titles",
        "Take a screenshot of the page",
        "Save the extracted data to a file"
    ]

    for task in tasks:
        async for result in agent.run(task):
            results.append({
                'site': site_name,
                'task': task,
                'response': result.get('text')
            })

    return results

async def parallel_scraping():
    """Scrape multiple websites in parallel."""
    sites = [
        ("ArXiv", "https://arxiv.org"),
        ("HackerNews", "https://news.ycombinator.com"),
        ("TechCrunch", "https://techcrunch.com")
    ]

    # Run all scraping tasks in parallel
    tasks = [scrape_website(name, url) for name, url in sites]
    results = await asyncio.gather(*tasks)

    # Process results
    for site_results in results:
        print(f"\nResults from {site_results[0]['site']}:")
        for result in site_results:
            print(f"  - {result['task']}: {result['response'][:100]}...")

# Run parallel scraping
asyncio.run(parallel_scraping())
```

## Cost Optimization Tips

To keep costs down, pick the smallest sandbox tier that fits your workload and set timeouts to stop runaway tasks. Batch related operations together to minimize sandbox spin-up time, and always terminate sandboxes when your work is complete.

## Security Considerations

Cua Cloud runs every sandbox in an isolated environment with encrypted VNC connections, and your API keys are never exposed in trajectories.

## What's Next for Cua Cloud

We're just getting started! Here's what's coming in the next few months:

### Elastic Autoscaled Sandbox Pools

Soon you'll be able to create elastic sandbox pools that automatically scale based on demand. Define minimum and maximum sandbox counts, and let Cua Cloud handle the rest. Perfect for batch processing, scheduled automations, and handling traffic spikes without manual intervention.

### Windows and macOS Cloud Support

While we're launching with Linux sandboxes, Windows and macOS cloud machines are coming soon. Run Windows-specific automations, test cross-platform workflows, or leverage macOS-exclusive applications – all in the cloud with the same simple API.

Stay tuned for updates and join our [**Discord**](https://discord.gg/cua-ai) to vote on which features you'd like to see first!
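One pattern from the cost-optimization tips above is worth sketching: bounding a task with a timeout so a stuck agent doesn't keep consuming (and billing) compute. Below is a minimal sketch using `asyncio.wait_for`; `run_task` is a hypothetical stand-in for your `agent.run` loop, and the time budget is an arbitrary example, not a Cua Cloud default:

```python
import asyncio

async def run_task(task: str) -> str:
    # Hypothetical stand-in for iterating over agent.run(task).
    await asyncio.sleep(0.1)
    return f"done: {task}"

async def run_with_budget(task: str, budget_s: float) -> str:
    """Cancel the task if it exceeds its time budget."""
    try:
        return await asyncio.wait_for(run_task(task), timeout=budget_s)
    except asyncio.TimeoutError:
        # In a real deployment, also terminate the sandbox here.
        return f"timed out: {task}"

print(asyncio.run(run_with_budget("Open Chrome and search for AI news", budget_s=5.0)))
```

The same wrapper composes with `asyncio.gather`, so it slots straight into the parallel-scraping pattern shown earlier.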
## Get Started Today

Ready to deploy your Computer-Use Agents in the cloud? Visit [**trycua.com**](https://trycua.com) to sign up and get your API key. Join our [**Discord community**](https://discord.gg/cua-ai) for support and explore more examples on [**GitHub**](https://github.com/trycua/cua).

Happy RPA 2.0! 🚀
```