This is page 8 of 20. Use http://codebase.md/trycua/cua?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── scripts
│ │ ├── get_pyproject_version.py
│ │ └── tests
│ │ ├── __init__.py
│ │ ├── README.md
│ │ └── test_get_pyproject_version.py
│ └── workflows
│ ├── bump-version.yml
│ ├── ci-lume.yml
│ ├── docker-publish-cua-linux.yml
│ ├── docker-publish-cua-windows.yml
│ ├── docker-publish-kasm.yml
│ ├── docker-publish-xfce.yml
│ ├── docker-reusable-publish.yml
│ ├── link-check.yml
│ ├── lint.yml
│ ├── npm-publish-cli.yml
│ ├── npm-publish-computer.yml
│ ├── npm-publish-core.yml
│ ├── publish-lume.yml
│ ├── pypi-publish-agent.yml
│ ├── pypi-publish-computer-server.yml
│ ├── pypi-publish-computer.yml
│ ├── pypi-publish-core.yml
│ ├── pypi-publish-mcp-server.yml
│ ├── pypi-publish-som.yml
│ ├── pypi-reusable-publish.yml
│ ├── python-tests.yml
│ ├── test-cua-models.yml
│ └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│ ├── docs.code-workspace
│ ├── extensions.json
│ ├── launch.json
│ ├── libs-ts.code-workspace
│ ├── lume.code-workspace
│ ├── lumier.code-workspace
│ ├── py.code-workspace
│ └── settings.json
├── blog
│ ├── app-use.md
│ ├── assets
│ │ ├── composite-agents.png
│ │ ├── docker-ubuntu-support.png
│ │ ├── hack-booth.png
│ │ ├── hack-closing-ceremony.jpg
│ │ ├── hack-cua-ollama-hud.jpeg
│ │ ├── hack-leaderboard.png
│ │ ├── hack-the-north.png
│ │ ├── hack-winners.jpeg
│ │ ├── hack-workshop.jpeg
│ │ ├── hud-agent-evals.png
│ │ └── trajectory-viewer.jpeg
│ ├── bringing-computer-use-to-the-web.md
│ ├── build-your-own-operator-on-macos-1.md
│ ├── build-your-own-operator-on-macos-2.md
│ ├── cloud-windows-ga-macos-preview.md
│ ├── composite-agents.md
│ ├── computer-use-agents-for-growth-hacking.md
│ ├── cua-hackathon.md
│ ├── cua-playground-preview.md
│ ├── cua-vlm-router.md
│ ├── hack-the-north.md
│ ├── hud-agent-evals.md
│ ├── human-in-the-loop.md
│ ├── introducing-cua-cli.md
│ ├── introducing-cua-cloud-containers.md
│ ├── lume-to-containerization.md
│ ├── neurips-2025-cua-papers.md
│ ├── sandboxed-python-execution.md
│ ├── training-computer-use-models-trajectories-1.md
│ ├── trajectory-viewer.md
│ ├── ubuntu-docker-support.md
│ └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│ ├── .env.example
│ ├── .gitignore
│ ├── content
│ │ └── docs
│ │ ├── agent-sdk
│ │ │ ├── agent-loops.mdx
│ │ │ ├── benchmarks
│ │ │ │ ├── index.mdx
│ │ │ │ ├── interactive.mdx
│ │ │ │ ├── introduction.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── osworld-verified.mdx
│ │ │ │ ├── screenspot-pro.mdx
│ │ │ │ └── screenspot-v2.mdx
│ │ │ ├── callbacks
│ │ │ │ ├── agent-lifecycle.mdx
│ │ │ │ ├── cost-saving.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── logging.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── pii-anonymization.mdx
│ │ │ │ └── trajectories.mdx
│ │ │ ├── chat-history.mdx
│ │ │ ├── custom-tools.mdx
│ │ │ ├── customizing-computeragent.mdx
│ │ │ ├── integrations
│ │ │ │ ├── hud.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── observability.mdx
│ │ │ ├── mcp-server
│ │ │ │ ├── client-integrations.mdx
│ │ │ │ ├── configuration.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── llm-integrations.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── tools.mdx
│ │ │ │ └── usage.mdx
│ │ │ ├── message-format.mdx
│ │ │ ├── meta.json
│ │ │ ├── migration-guide.mdx
│ │ │ ├── prompt-caching.mdx
│ │ │ ├── supported-agents
│ │ │ │ ├── composed-agents.mdx
│ │ │ │ ├── computer-use-agents.mdx
│ │ │ │ ├── grounding-models.mdx
│ │ │ │ ├── human-in-the-loop.mdx
│ │ │ │ └── meta.json
│ │ │ ├── supported-model-providers
│ │ │ │ ├── cua-vlm-router.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ └── local-models.mdx
│ │ │ ├── telemetry.mdx
│ │ │ └── usage-tracking.mdx
│ │ ├── cli-playbook
│ │ │ ├── commands.mdx
│ │ │ ├── index.mdx
│ │ │ └── meta.json
│ │ ├── computer-sdk
│ │ │ ├── cloud-vm-management.mdx
│ │ │ ├── commands.mdx
│ │ │ ├── computer-server
│ │ │ │ ├── Commands.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── REST-API.mdx
│ │ │ │ └── WebSocket-API.mdx
│ │ │ ├── computer-ui.mdx
│ │ │ ├── computers.mdx
│ │ │ ├── custom-computer-handlers.mdx
│ │ │ ├── meta.json
│ │ │ ├── sandboxed-python.mdx
│ │ │ └── tracing-api.mdx
│ │ ├── example-usecases
│ │ │ ├── form-filling.mdx
│ │ │ ├── gemini-complex-ui-navigation.mdx
│ │ │ ├── meta.json
│ │ │ ├── post-event-contact-export.mdx
│ │ │ └── windows-app-behind-vpn.mdx
│ │ ├── get-started
│ │ │ ├── meta.json
│ │ │ └── quickstart.mdx
│ │ ├── index.mdx
│ │ ├── macos-vm-cli-playbook
│ │ │ ├── lume
│ │ │ │ ├── cli-reference.mdx
│ │ │ │ ├── faq.md
│ │ │ │ ├── http-api.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── prebuilt-images.mdx
│ │ │ ├── lumier
│ │ │ │ ├── building-lumier.mdx
│ │ │ │ ├── docker-compose.mdx
│ │ │ │ ├── docker.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ └── meta.json
│ │ │ └── meta.json
│ │ └── meta.json
│ ├── next.config.mjs
│ ├── package-lock.json
│ ├── package.json
│ ├── pnpm-lock.yaml
│ ├── postcss.config.mjs
│ ├── public
│ │ └── img
│ │ ├── agent_gradio_ui.png
│ │ ├── agent.png
│ │ ├── bg-dark.jpg
│ │ ├── bg-light.jpg
│ │ ├── cli.png
│ │ ├── computer.png
│ │ ├── grounding-with-gemini3.gif
│ │ ├── hero.png
│ │ ├── laminar_trace_example.png
│ │ ├── som_box_threshold.png
│ │ └── som_iou_threshold.png
│ ├── README.md
│ ├── source.config.ts
│ ├── src
│ │ ├── app
│ │ │ ├── (home)
│ │ │ │ ├── [[...slug]]
│ │ │ │ │ └── page.tsx
│ │ │ │ └── layout.tsx
│ │ │ ├── api
│ │ │ │ ├── posthog
│ │ │ │ │ └── [...path]
│ │ │ │ │ └── route.ts
│ │ │ │ └── search
│ │ │ │ └── route.ts
│ │ │ ├── favicon.ico
│ │ │ ├── global.css
│ │ │ ├── layout.config.tsx
│ │ │ ├── layout.tsx
│ │ │ ├── llms.mdx
│ │ │ │ └── [[...slug]]
│ │ │ │ └── route.ts
│ │ │ ├── llms.txt
│ │ │ │ └── route.ts
│ │ │ ├── robots.ts
│ │ │ └── sitemap.ts
│ │ ├── assets
│ │ │ ├── discord-black.svg
│ │ │ ├── discord-white.svg
│ │ │ ├── logo-black.svg
│ │ │ └── logo-white.svg
│ │ ├── components
│ │ │ ├── analytics-tracker.tsx
│ │ │ ├── cookie-consent.tsx
│ │ │ ├── doc-actions-menu.tsx
│ │ │ ├── editable-code-block.tsx
│ │ │ ├── footer.tsx
│ │ │ ├── hero.tsx
│ │ │ ├── iou.tsx
│ │ │ ├── mermaid.tsx
│ │ │ └── page-feedback.tsx
│ │ ├── lib
│ │ │ ├── llms.ts
│ │ │ └── source.ts
│ │ ├── mdx-components.tsx
│ │ └── providers
│ │ └── posthog-provider.tsx
│ └── tsconfig.json
├── examples
│ ├── agent_examples.py
│ ├── agent_ui_examples.py
│ ├── browser_tool_example.py
│ ├── cloud_api_examples.py
│ ├── computer_examples_windows.py
│ ├── computer_examples.py
│ ├── computer_ui_examples.py
│ ├── computer-example-ts
│ │ ├── .env.example
│ │ ├── .gitignore
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── README.md
│ │ ├── src
│ │ │ ├── helpers.ts
│ │ │ └── index.ts
│ │ └── tsconfig.json
│ ├── docker_examples.py
│ ├── evals
│ │ ├── hud_eval_examples.py
│ │ └── wikipedia_most_linked.txt
│ ├── pylume_examples.py
│ ├── sandboxed_functions_examples.py
│ ├── som_examples.py
│ ├── tracing_examples.py
│ ├── utils.py
│ └── winsandbox_example.py
├── img
│ ├── agent_gradio_ui.png
│ ├── agent.png
│ ├── cli.png
│ ├── computer.png
│ ├── logo_black.png
│ └── logo_white.png
├── libs
│ ├── kasm
│ │ ├── Dockerfile
│ │ ├── LICENSE
│ │ ├── README.md
│ │ └── src
│ │ └── ubuntu
│ │ └── install
│ │ └── firefox
│ │ ├── custom_startup.sh
│ │ ├── firefox.desktop
│ │ └── install_firefox.sh
│ ├── lume
│ │ ├── .cursorignore
│ │ ├── CONTRIBUTING.md
│ │ ├── Development.md
│ │ ├── img
│ │ │ └── cli.png
│ │ ├── Package.resolved
│ │ ├── Package.swift
│ │ ├── README.md
│ │ ├── resources
│ │ │ └── lume.entitlements
│ │ ├── scripts
│ │ │ ├── build
│ │ │ │ ├── build-debug.sh
│ │ │ │ ├── build-release-notarized.sh
│ │ │ │ └── build-release.sh
│ │ │ └── install.sh
│ │ ├── src
│ │ │ ├── Commands
│ │ │ │ ├── Clone.swift
│ │ │ │ ├── Config.swift
│ │ │ │ ├── Create.swift
│ │ │ │ ├── Delete.swift
│ │ │ │ ├── Get.swift
│ │ │ │ ├── Images.swift
│ │ │ │ ├── IPSW.swift
│ │ │ │ ├── List.swift
│ │ │ │ ├── Logs.swift
│ │ │ │ ├── Options
│ │ │ │ │ └── FormatOption.swift
│ │ │ │ ├── Prune.swift
│ │ │ │ ├── Pull.swift
│ │ │ │ ├── Push.swift
│ │ │ │ ├── Run.swift
│ │ │ │ ├── Serve.swift
│ │ │ │ ├── Set.swift
│ │ │ │ └── Stop.swift
│ │ │ ├── ContainerRegistry
│ │ │ │ ├── ImageContainerRegistry.swift
│ │ │ │ ├── ImageList.swift
│ │ │ │ └── ImagesPrinter.swift
│ │ │ ├── Errors
│ │ │ │ └── Errors.swift
│ │ │ ├── FileSystem
│ │ │ │ ├── Home.swift
│ │ │ │ ├── Settings.swift
│ │ │ │ ├── VMConfig.swift
│ │ │ │ ├── VMDirectory.swift
│ │ │ │ └── VMLocation.swift
│ │ │ ├── LumeController.swift
│ │ │ ├── Main.swift
│ │ │ ├── Server
│ │ │ │ ├── Handlers.swift
│ │ │ │ ├── HTTP.swift
│ │ │ │ ├── Requests.swift
│ │ │ │ ├── Responses.swift
│ │ │ │ └── Server.swift
│ │ │ ├── Utils
│ │ │ │ ├── CommandRegistry.swift
│ │ │ │ ├── CommandUtils.swift
│ │ │ │ ├── Logger.swift
│ │ │ │ ├── NetworkUtils.swift
│ │ │ │ ├── Path.swift
│ │ │ │ ├── ProcessRunner.swift
│ │ │ │ ├── ProgressLogger.swift
│ │ │ │ ├── String.swift
│ │ │ │ └── Utils.swift
│ │ │ ├── Virtualization
│ │ │ │ ├── DarwinImageLoader.swift
│ │ │ │ ├── DHCPLeaseParser.swift
│ │ │ │ ├── ImageLoaderFactory.swift
│ │ │ │ └── VMVirtualizationService.swift
│ │ │ ├── VM
│ │ │ │ ├── DarwinVM.swift
│ │ │ │ ├── LinuxVM.swift
│ │ │ │ ├── VM.swift
│ │ │ │ ├── VMDetails.swift
│ │ │ │ ├── VMDetailsPrinter.swift
│ │ │ │ ├── VMDisplayResolution.swift
│ │ │ │ └── VMFactory.swift
│ │ │ └── VNC
│ │ │ ├── PassphraseGenerator.swift
│ │ │ └── VNCService.swift
│ │ └── tests
│ │ ├── Mocks
│ │ │ ├── MockVM.swift
│ │ │ ├── MockVMVirtualizationService.swift
│ │ │ └── MockVNCService.swift
│ │ ├── VM
│ │ │ └── VMDetailsPrinterTests.swift
│ │ ├── VMTests.swift
│ │ ├── VMVirtualizationServiceTests.swift
│ │ └── VNCServiceTests.swift
│ ├── lumier
│ │ ├── .dockerignore
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── bin
│ │ │ └── entry.sh
│ │ ├── config
│ │ │ └── constants.sh
│ │ ├── hooks
│ │ │ └── on-logon.sh
│ │ └── lib
│ │ ├── utils.sh
│ │ └── vm.sh
│ ├── python
│ │ ├── agent
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── agent
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── adapters
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── cua_adapter.py
│ │ │ │ │ ├── huggingfacelocal_adapter.py
│ │ │ │ │ ├── human_adapter.py
│ │ │ │ │ ├── mlxvlm_adapter.py
│ │ │ │ │ └── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ └── qwen2_5_vl.py
│ │ │ │ ├── agent.py
│ │ │ │ ├── callbacks
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── budget_manager.py
│ │ │ │ │ ├── image_retention.py
│ │ │ │ │ ├── logging.py
│ │ │ │ │ ├── operator_validator.py
│ │ │ │ │ ├── pii_anonymization.py
│ │ │ │ │ ├── prompt_instructions.py
│ │ │ │ │ ├── telemetry.py
│ │ │ │ │ └── trajectory_saver.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── computers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cua.py
│ │ │ │ │ └── custom.py
│ │ │ │ ├── decorators.py
│ │ │ │ ├── human_tool
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ ├── server.py
│ │ │ │ │ └── ui.py
│ │ │ │ ├── integrations
│ │ │ │ │ └── hud
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── agent.py
│ │ │ │ │ └── proxy.py
│ │ │ │ ├── loops
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── anthropic.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── composed_grounded.py
│ │ │ │ │ ├── gelato.py
│ │ │ │ │ ├── gemini.py
│ │ │ │ │ ├── generic_vlm.py
│ │ │ │ │ ├── glm45v.py
│ │ │ │ │ ├── gta1.py
│ │ │ │ │ ├── holo.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── model_types.csv
│ │ │ │ │ ├── moondream3.py
│ │ │ │ │ ├── omniparser.py
│ │ │ │ │ ├── openai.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ ├── uiins.py
│ │ │ │ │ ├── uitars.py
│ │ │ │ │ └── uitars2.py
│ │ │ │ ├── proxy
│ │ │ │ │ ├── examples.py
│ │ │ │ │ └── handlers.py
│ │ │ │ ├── responses.py
│ │ │ │ ├── tools
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── browser_tool.py
│ │ │ │ ├── types.py
│ │ │ │ └── ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ └── gradio
│ │ │ │ ├── __init__.py
│ │ │ │ ├── app.py
│ │ │ │ └── ui_components.py
│ │ │ ├── benchmarks
│ │ │ │ ├── .gitignore
│ │ │ │ ├── contrib.md
│ │ │ │ ├── interactive.py
│ │ │ │ ├── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ └── gta1.py
│ │ │ │ ├── README.md
│ │ │ │ ├── ss-pro.py
│ │ │ │ ├── ss-v2.py
│ │ │ │ └── utils.py
│ │ │ ├── example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer_agent.py
│ │ ├── bench-ui
│ │ │ ├── bench_ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api.py
│ │ │ │ └── child.py
│ │ │ ├── examples
│ │ │ │ ├── folder_example.py
│ │ │ │ ├── gui
│ │ │ │ │ ├── index.html
│ │ │ │ │ ├── logo.svg
│ │ │ │ │ └── styles.css
│ │ │ │ ├── output_overlay.png
│ │ │ │ └── simple_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ └── test_port_detection.py
│ │ ├── computer
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer
│ │ │ │ ├── __init__.py
│ │ │ │ ├── computer.py
│ │ │ │ ├── diorama_computer.py
│ │ │ │ ├── helpers.py
│ │ │ │ ├── interface
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ ├── models.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── logger.py
│ │ │ │ ├── models.py
│ │ │ │ ├── providers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cloud
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── docker
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── lume
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── lume_api.py
│ │ │ │ │ ├── lumier
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── types.py
│ │ │ │ │ └── winsandbox
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── provider.py
│ │ │ │ │ └── setup_script.ps1
│ │ │ │ ├── tracing_wrapper.py
│ │ │ │ ├── tracing.py
│ │ │ │ ├── ui
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ └── gradio
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── app.py
│ │ │ │ └── utils.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer.py
│ │ ├── computer-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── browser.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── diorama
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── diorama_computer.py
│ │ │ │ │ ├── diorama.py
│ │ │ │ │ ├── draw.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── safezone.py
│ │ │ │ ├── handlers
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── main.py
│ │ │ │ ├── server.py
│ │ │ │ ├── utils
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── wallpaper.py
│ │ │ │ └── watchdog.py
│ │ │ ├── examples
│ │ │ │ ├── __init__.py
│ │ │ │ └── usage_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ ├── run_server.py
│ │ │ ├── test_connection.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_server.py
│ │ ├── core
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── core
│ │ │ │ ├── __init__.py
│ │ │ │ └── telemetry
│ │ │ │ ├── __init__.py
│ │ │ │ └── posthog.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_telemetry.py
│ │ ├── mcp-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── build-extension.py
│ │ │ ├── CONCURRENT_SESSIONS.md
│ │ │ ├── desktop-extension
│ │ │ │ ├── cua-extension.mcpb
│ │ │ │ ├── desktop_extension.png
│ │ │ │ ├── manifest.json
│ │ │ │ ├── README.md
│ │ │ │ ├── requirements.txt
│ │ │ │ ├── run_server.sh
│ │ │ │ └── setup.py
│ │ │ ├── mcp_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── server.py
│ │ │ │ └── session_manager.py
│ │ │ ├── pdm.lock
│ │ │ ├── pyproject.toml
│ │ │ ├── QUICK_TEST_COMMANDS.sh
│ │ │ ├── quick_test_local_option.py
│ │ │ ├── README.md
│ │ │ ├── scripts
│ │ │ │ ├── install_mcp_server.sh
│ │ │ │ └── start_mcp_server.sh
│ │ │ ├── test_mcp_server_local_option.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_mcp_server.py
│ │ ├── pylume
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_pylume.py
│ │ └── som
│ │ ├── .bumpversion.cfg
│ │ ├── LICENSE
│ │ ├── poetry.toml
│ │ ├── pyproject.toml
│ │ ├── README.md
│ │ ├── som
│ │ │ ├── __init__.py
│ │ │ ├── detect.py
│ │ │ ├── detection.py
│ │ │ ├── models.py
│ │ │ ├── ocr.py
│ │ │ ├── util
│ │ │ │ └── utils.py
│ │ │ └── visualization.py
│ │ └── tests
│ │ ├── conftest.py
│ │ └── test_omniparser.py
│ ├── qemu-docker
│ │ ├── linux
│ │ │ ├── Dockerfile
│ │ │ ├── README.md
│ │ │ └── src
│ │ │ ├── entry.sh
│ │ │ └── vm
│ │ │ ├── image
│ │ │ │ └── README.md
│ │ │ └── setup
│ │ │ ├── install.sh
│ │ │ ├── setup-cua-server.sh
│ │ │ └── setup.sh
│ │ ├── README.md
│ │ └── windows
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── entry.sh
│ │ └── vm
│ │ ├── image
│ │ │ └── README.md
│ │ └── setup
│ │ ├── install.bat
│ │ ├── on-logon.ps1
│ │ ├── setup-cua-server.ps1
│ │ ├── setup-utils.psm1
│ │ └── setup.ps1
│ ├── typescript
│ │ ├── .gitignore
│ │ ├── .nvmrc
│ │ ├── agent
│ │ │ ├── examples
│ │ │ │ ├── playground-example.html
│ │ │ │ └── README.md
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── client.ts
│ │ │ │ ├── index.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ └── client.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── computer
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── computer
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── providers
│ │ │ │ │ │ ├── base.ts
│ │ │ │ │ │ ├── cloud.ts
│ │ │ │ │ │ └── index.ts
│ │ │ │ │ └── types.ts
│ │ │ │ ├── index.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── base.ts
│ │ │ │ │ ├── factory.ts
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── linux.ts
│ │ │ │ │ ├── macos.ts
│ │ │ │ │ └── windows.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ ├── computer
│ │ │ │ │ └── cloud.test.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── factory.test.ts
│ │ │ │ │ ├── index.test.ts
│ │ │ │ │ ├── linux.test.ts
│ │ │ │ │ ├── macos.test.ts
│ │ │ │ │ └── windows.test.ts
│ │ │ │ └── setup.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── core
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── index.ts
│ │ │ │ └── telemetry
│ │ │ │ ├── clients
│ │ │ │ │ ├── index.ts
│ │ │ │ │ └── posthog.ts
│ │ │ │ └── index.ts
│ │ │ ├── tests
│ │ │ │ └── telemetry.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── cua-cli
│ │ │ ├── .gitignore
│ │ │ ├── .prettierrc
│ │ │ ├── bun.lock
│ │ │ ├── CLAUDE.md
│ │ │ ├── index.ts
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── auth.ts
│ │ │ │ ├── cli.ts
│ │ │ │ ├── commands
│ │ │ │ │ ├── auth.ts
│ │ │ │ │ └── sandbox.ts
│ │ │ │ ├── config.ts
│ │ │ │ ├── http.ts
│ │ │ │ ├── storage.ts
│ │ │ │ └── util.ts
│ │ │ └── tsconfig.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── pnpm-workspace.yaml
│ │ └── README.md
│ └── xfce
│ ├── .dockerignore
│ ├── .gitignore
│ ├── Development.md
│ ├── Dockerfile
│ ├── Dockerfile.dev
│ ├── README.md
│ └── src
│ ├── scripts
│ │ ├── resize-display.sh
│ │ ├── start-computer-server.sh
│ │ ├── start-novnc.sh
│ │ ├── start-vnc.sh
│ │ └── xstartup.sh
│ ├── supervisor
│ │ └── supervisord.conf
│ └── xfce-config
│ ├── helpers.rc
│ ├── xfce4-power-manager.xml
│ └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│ ├── agent_nb.ipynb
│ ├── blog
│ │ ├── build-your-own-operator-on-macos-1.ipynb
│ │ └── build-your-own-operator-on-macos-2.ipynb
│ ├── composite_agents_docker_nb.ipynb
│ ├── computer_nb.ipynb
│ ├── computer_server_nb.ipynb
│ ├── customizing_computeragent.ipynb
│ ├── eval_osworld.ipynb
│ ├── ollama_nb.ipynb
│ ├── README.md
│ ├── sota_hackathon_cloud.ipynb
│ └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── install-cli.ps1
│ ├── install-cli.sh
│ ├── playground-docker.sh
│ ├── playground.sh
│ ├── run-docker-dev.sh
│ └── typescript-typecheck.js
├── TESTING.md
├── tests
│ ├── agent_loop_testing
│ │ ├── agent_test.py
│ │ └── README.md
│ ├── pytest.ini
│ ├── shell_cmd.py
│ ├── test_files.py
│ ├── test_mcp_server_session_management.py
│ ├── test_mcp_server_streaming.py
│ ├── test_shell_bash.py
│ ├── test_telemetry.py
│ ├── test_tracing.py
│ ├── test_venv.py
│ └── test_watchdog.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/test_tracing.py:
--------------------------------------------------------------------------------
```python
"""
Tests for Computer.tracing functionality.
"""
import asyncio
import json
import tempfile
from pathlib import Path
import pytest
from computer.tracing import ComputerTracing
class MockComputer:
    """Minimal stand-in for a Computer instance, carrying only what the tracer reads."""

    def __init__(self):
        # Collaborators the tracer calls into during a session.
        self.logger = MockLogger()
        self.interface = MockInterface()
        # Identity fields the tracer records in its metadata.
        self.image = "test-image"
        self.provider_type = "lume"
        self.os_type = "macos"
class MockInterface:
    """Fake computer interface exposing the async calls the tracer invokes."""

    async def screenshot(self):
        """Return a fixed byte payload standing in for screenshot image data."""
        return b"mock_screenshot_data"

    async def get_accessibility_tree(self):
        """Return a minimal accessibility tree: a single window with no children."""
        return {"type": "window", "children": []}
class MockLogger:
    """Fake logger that echoes warnings to stdout instead of the logging system."""

    def warning(self, message):
        """Print the warning so it shows up in test output."""
        print("Warning: " + str(message))
@pytest.mark.asyncio
async def test_tracing_start_stop():
    """Start a trace, stop it, and verify the on-disk metadata it leaves behind."""
    tracer = ComputerTracing(MockComputer())

    # Nothing should be recording before start() is called.
    assert not tracer.is_tracing

    with tempfile.TemporaryDirectory() as workdir:
        await tracer.start({"screenshots": True, "api_calls": True, "path": workdir})
        assert tracer.is_tracing

        trace_path = await tracer.stop({"format": "dir"})
        assert not tracer.is_tracing

        # The trace directory and its metadata file must both exist.
        out_dir = Path(trace_path)
        assert out_dir.exists()
        metadata_file = out_dir / "trace_metadata.json"
        assert metadata_file.exists()

        # The metadata must carry the session's identifying fields.
        metadata = json.loads(metadata_file.read_text())
        for key in ("trace_id", "config", "start_time", "end_time"):
            assert key in metadata
@pytest.mark.asyncio
async def test_tracing_api_call_recording():
    """Recorded API calls should each land in their own event file on disk."""
    tracer = ComputerTracing(MockComputer())

    with tempfile.TemporaryDirectory() as workdir:
        await tracer.start({"api_calls": True, "screenshots": False, "path": workdir})

        # One successful call, and one call carrying an error.
        await tracer.record_api_call("left_click", {"x": 100, "y": 200}, result=None, error=None)
        await tracer.record_api_call(
            "type_text", {"text": "test"}, result=None, error=Exception("Test error")
        )

        trace_path = await tracer.stop({"format": "dir"})

        # Both calls should have produced api_call event files.
        event_files = list(Path(trace_path).glob("event_*_api_call.json"))
        assert len(event_files) >= 2

        # The first event should describe the successful left_click.
        event = json.loads(event_files[0].read_text())
        assert event["type"] == "api_call"
        assert event["data"]["method"] == "left_click"
        assert event["data"]["success"] is True
@pytest.mark.asyncio
async def test_tracing_metadata():
    """Each add_metadata() call should yield its own metadata event file."""
    tracer = ComputerTracing(MockComputer())

    with tempfile.TemporaryDirectory() as workdir:
        await tracer.start({"metadata": True, "path": workdir})

        # Strings, numbers, and nested structures are all accepted as values.
        await tracer.add_metadata("test_key", "test_value")
        await tracer.add_metadata("numeric_key", 42)
        await tracer.add_metadata("complex_key", {"nested": "data"})

        trace_path = await tracer.stop({"format": "dir"})

        # One event file per add_metadata() call, at minimum.
        metadata_files = list(Path(trace_path).glob("event_*_metadata.json"))
        assert len(metadata_files) >= 3
@pytest.mark.asyncio
async def test_tracing_screenshots():
    """Screenshots captured during a session should be written out as PNG files."""
    tracer = ComputerTracing(MockComputer())

    with tempfile.TemporaryDirectory() as workdir:
        await tracer.start({"screenshots": True, "path": workdir})

        # Trigger one screenshot explicitly, on top of any automatic ones.
        await tracer._take_screenshot("manual_test")

        trace_path = await tracer.stop({"format": "dir"})

        # Expect at least the manual shot plus one automatic session shot.
        screenshot_files = list(Path(trace_path).glob("*.png"))
        assert len(screenshot_files) >= 2
@pytest.mark.asyncio
async def test_tracing_config_options():
    """With every capture option disabled, only start/end events are written."""
    tracer = ComputerTracing(MockComputer())

    with tempfile.TemporaryDirectory() as workdir:
        await tracer.start(
            {"screenshots": False, "api_calls": False, "metadata": False, "path": workdir}
        )

        # These should be no-ops since their capture flags are off.
        await tracer.record_api_call("test_call", {})
        await tracer.add_metadata("test", "value")

        trace_path = await tracer.stop({"format": "dir"})

        # Only the trace_start and trace_end bookkeeping events remain.
        event_files = list(Path(trace_path).glob("event_*.json"))
        assert len(event_files) == 2
@pytest.mark.asyncio
async def test_tracing_zip_output():
    """Stopping with format="zip" should produce a zip archive on disk."""
    tracer = ComputerTracing(MockComputer())

    with tempfile.TemporaryDirectory() as workdir:
        await tracer.start({"screenshots": True, "api_calls": True, "path": workdir})
        await tracer.record_api_call("test_call", {"arg": "value"})

        trace_path = await tracer.stop({"format": "zip"})

        # The returned path must exist and carry the .zip extension.
        assert Path(trace_path).exists()
        assert trace_path.endswith(".zip")
@pytest.mark.asyncio
async def test_tracing_accessibility_tree():
    """Recording the accessibility tree should produce a tree event file."""
    tracer = ComputerTracing(MockComputer())

    with tempfile.TemporaryDirectory() as workdir:
        await tracer.start({"accessibility_tree": True, "path": workdir})
        await tracer.record_accessibility_tree()
        trace_path = await tracer.stop({"format": "dir"})

        # At least one accessibility_tree event file must have been written.
        tree_files = list(Path(trace_path).glob("event_*_accessibility_tree.json"))
        assert len(tree_files) >= 1

        # The event payload must identify itself and embed the captured tree.
        event = json.loads(tree_files[0].read_text())
        assert event["type"] == "accessibility_tree"
        assert "tree" in event["data"]
def test_tracing_errors():
    """Misuse of the start/stop lifecycle should raise RuntimeError."""
    tracer = ComputerTracing(MockComputer())

    # Stopping before any start is a lifecycle violation.
    with pytest.raises(RuntimeError, match="Tracing is not active"):
        asyncio.run(tracer.stop())

    async def _double_start():
        # A second start() while already active must be rejected.
        await tracer.start()
        with pytest.raises(RuntimeError, match="Tracing is already active"):
            await tracer.start()
        await tracer.stop()

    asyncio.run(_double_start())
if __name__ == "__main__":
    # Allow running this module directly, without pytest, for a quick check.
    # (The previously present `import sys` was unused and has been removed.)

    async def run_tests():
        """Run every async test in sequence, reporting pass/fail per test."""
        async_tests = [
            test_tracing_start_stop,
            test_tracing_api_call_recording,
            test_tracing_metadata,
            test_tracing_screenshots,
            test_tracing_config_options,
            test_tracing_zip_output,
            test_tracing_accessibility_tree,
        ]
        print("Running Computer.tracing tests...")
        for test in async_tests:
            try:
                await test()
                print(f"✓ {test.__name__}")
            except Exception as e:
                # Keep going: report the failure and continue with the rest.
                print(f"✗ {test.__name__}: {e}")

        # The lifecycle-error test is synchronous and runs its own event loops.
        try:
            test_tracing_errors()
            print("✓ test_tracing_errors")
        except Exception as e:
            print(f"✗ test_tracing_errors: {e}")

        print("Tests completed!")

    asyncio.run(run_tests())
```
--------------------------------------------------------------------------------
/docs/content/docs/cli-playbook/commands.mdx:
--------------------------------------------------------------------------------
```markdown
---
title: Command Reference
description: Complete reference for all CUA CLI commands
---
import { Tabs, Tab } from 'fumadocs-ui/components/tabs';
import { Callout } from 'fumadocs-ui/components/callout';
## Overview
The CUA CLI provides commands for authentication and sandbox management.
### Command Styles
The CLI supports **two command styles** for flexibility:
**Flat style** (quick & concise):
```bash
cua list
cua create --os linux --size small --region north-america
cua start my-sandbox
```
**Grouped style** (explicit & clear):
```bash
cua sb list # or: cua sandbox list
cua sb create # or: cua sandbox create
cua sb start # or: cua sandbox start
```
Both styles work identically - use whichever you prefer!
### Available Commands
- **Authentication** - `cua auth login`, `cua auth env`, `cua auth logout` (also available as flat commands: `cua login`, `cua env`, `cua logout`)
- **Sandbox Management** - `cua list`, `cua create`, `cua get`, `cua start`, `cua stop`, `cua restart`, `cua delete`, `cua vnc`
## Authentication Commands
### `cua auth login`
Authenticate with your CUA account using browser-based OAuth flow.
```bash
# Interactive browser login
cua auth login
# Direct API key login
cua auth login --api-key sk-your-api-key-here
# Alternative flat style
cua login
cua login --api-key sk-your-api-key-here
```
**Options:**
- `--api-key <key>` - Provide API key directly instead of browser flow
**Example:**
```bash
$ cua auth login
Opening browser for CLI auth...
API key saved
```
### `cua auth env`
Create or update a `.env` file in the current directory with your CUA API key.
```bash
cua auth env
# Alternative flat style
cua env
```
**Example:**
```bash
$ cua auth env
Wrote /path/to/your/project/.env
```
The generated `.env` file will contain:
```
CUA_API_KEY=sk-your-api-key-here
```
### `cua auth logout`
Remove the stored API key from your system.
```bash
cua auth logout
# Alternative flat style
cua logout
```
**Example:**
```bash
$ cua auth logout
Logged out
```
## Sandbox Commands
### `cua list`
List all your sandboxes with their current status. Passwords are hidden by default for security.
```bash
# List sandboxes (passwords hidden)
cua list
# Show passwords explicitly
cua list --show-passwords
# Alternative aliases
cua ls
cua ps
```
**Example Output (default, passwords hidden):**
```
NAME STATUS HOST
my-dev-sandbox running my-dev-sandbox.sandbox.cua.ai
test-windows stopped test-windows.sandbox.cua.ai
```
**Example Output (with --show-passwords):**
```
NAME STATUS PASSWORD HOST
my-dev-sandbox running secure-pass-123 my-dev-sandbox.sandbox.cua.ai
test-windows stopped another-pass-456 test-windows.sandbox.cua.ai
```
### `cua create`
Create a new sandbox.
```bash
cua create --os <OS> --size <SIZE> --region <REGION>
```
**Required Options:**
- `--os` - Operating system: `linux`, `windows`, `macos`
- `--size` - Sandbox size: `small`, `medium`, `large`
- `--region` - Region: `north-america`, `europe`, `asia-pacific`, `south-america`
**Examples:**
```bash
# Create a small Linux sandbox in North America
cua create --os linux --size small --region north-america
# Create a medium Windows sandbox in Europe
cua create --os windows --size medium --region europe
# Create a large macOS sandbox in Asia Pacific
cua create --os macos --size large --region asia-pacific
```
**Response Types:**
**Immediate (Status 200):**
```bash
Sandbox created and ready: my-new-sandbox-abc123
Password: secure-password-here
Host: my-new-sandbox-abc123.sandbox.cua.ai
```
**Provisioning (Status 202):**
```bash
Sandbox provisioning started: my-new-sandbox-abc123
Job ID: job-xyz789
Use 'cua list' to monitor provisioning progress
```
### `cua get`
Get detailed information about a specific sandbox, including computer-server health status.
```bash
cua get <name>
# With additional options
cua get <name> --json
cua get <name> --show-passwords
cua get <name> --show-vnc-url
```
**Options:**
- `--json` - Output all details in JSON format
- `--show-passwords` - Include password in output
- `--show-vnc-url` - Include computed NoVNC URL
**Example Output (default):**
```bash
$ cua get my-dev-sandbox
Name: my-dev-sandbox
Status: running
Host: my-dev-sandbox.containers.cloud.trycua.com
OS Type: linux
Computer Server Version: 0.1.30
Computer Server Status: healthy
```
**Example Output (with --show-passwords and --show-vnc-url):**
```bash
$ cua get my-dev-sandbox --show-passwords --show-vnc-url
Name: my-dev-sandbox
Status: running
Host: my-dev-sandbox.containers.cloud.trycua.com
Password: secure-pass-123
OS Type: linux
Computer Server Version: 0.1.30
Computer Server Status: healthy
VNC URL: https://my-dev-sandbox.containers.cloud.trycua.com/vnc.html?autoconnect=true&password=secure-pass-123
```
**Example Output (JSON format):**
```bash
$ cua get my-dev-sandbox --json
{
"name": "my-dev-sandbox",
"status": "running",
"host": "my-dev-sandbox.containers.cloud.trycua.com",
"os_type": "linux",
"computer_server_version": "0.1.30",
"computer_server_status": "healthy"
}
```
**Computer Server Health Check:**
The `cua get` command automatically probes the computer-server when the sandbox is running:
- Checks OS type via `https://{host}:8443/status`
- Checks version via `https://{host}:8443/cmd`
- Shows "Computer Server Status: healthy" when both probes succeed
- Uses a 3-second timeout for each probe
<Callout type="info">
The computer server status is only checked for running sandboxes. Stopped or suspended sandboxes
will not show computer server information.
</Callout>
### `cua start`
Start a stopped sandbox.
```bash
cua start <name>
```
**Example:**
```bash
$ cua start my-dev-sandbox
Start accepted
```
### `cua stop`
Stop a running sandbox.
```bash
cua stop <name>
```
**Example:**
```bash
$ cua stop my-dev-sandbox
stopping
```
### `cua restart`
Restart a sandbox.
```bash
cua restart <name>
```
**Example:**
```bash
$ cua restart my-dev-sandbox
restarting
```
### `cua delete`
Delete a sandbox permanently.
```bash
cua delete <name>
```
**Example:**
```bash
$ cua delete old-test-sandbox
Sandbox deletion initiated: deleting
```
<Callout type="warn">
This action is irreversible. All data on the sandbox will be permanently lost.
</Callout>
### `cua vnc`
Open the VNC interface for a sandbox in your browser.
```bash
cua vnc <name>
# Alternative alias
cua open <name>
```
**Example:**
```bash
$ cua vnc my-dev-sandbox
Opening NoVNC: https://my-dev-sandbox.sandbox.cua.ai/vnc.html?autoconnect=true&password=...
```
This command automatically opens your default browser to the VNC interface with the correct password pre-filled.
## Global Options
### Help
Get help for any command:
```bash
cua --help
cua auth login --help
cua create --help
cua list --help
```
## Error Handling
The CLI provides clear error messages for common issues:
### Authentication Errors
```bash
$ cua list
Unauthorized. Try 'cua auth login' again.
```
### Sandbox Not Found
```bash
$ cua start nonexistent-sandbox
Sandbox not found
```
### Invalid Configuration
```bash
$ cua create --os invalid --size small --region north-america
Invalid request or unsupported configuration
```
## Tips and Best Practices
### 1. Use Descriptive Sandbox Names
```bash
# Good
cua create --os linux --size small --region north-america
# Then rename or use meaningful names in the dashboard
# Better workflow
cua list # Check the generated name
# Use that name consistently
```
### 2. Environment Management
```bash
# Set up your project with API key
cd my-project
cua auth env
# Now your project has CUA_API_KEY in .env
```
### 3. Quick Sandbox Access
```bash
# Create aliases for frequently used sandboxes
alias dev-sandbox="cua vnc my-development-sandbox"
alias prod-sandbox="cua vnc my-production-sandbox"
```
### 4. Monitoring Provisioning
```bash
# For sandboxes that need provisioning time
cua create --os windows --size large --region europe
# Sandbox provisioning started: my-sandbox-abc123
# Job ID: job-xyz789
# Check status periodically
watch -n 5 cua list
```
## Next Steps
- [Get started with the quickstart guide](/get-started/quickstart#cli-quickstart)
- [Learn about CUA computers](/computer-sdk/computers)
- [Explore agent automation](/agent-sdk/agent-loops)
```
--------------------------------------------------------------------------------
/libs/python/computer/computer/diorama_computer.py:
--------------------------------------------------------------------------------
```python
import asyncio
from .interface.models import Key, KeyType
class DioramaComputer:
    """
    A Computer-compatible proxy for Diorama that sends commands over the ComputerInterface.
    """

    def __init__(self, computer, apps):
        """Wrap ``computer`` with a diorama-scoped interface limited to ``apps``.

        Args:
            computer: The computer instance to proxy commands through.
            apps: List of applications available in the diorama environment.
        """
        self.apps = apps
        self.computer = computer
        self.interface = DioramaComputerInterface(computer, apps)
        self._initialized = False  # set on first __aenter__/run()

    async def __aenter__(self):
        """Enter the async context, marking the proxy as initialized.

        Returns:
            self: The DioramaComputer instance.
        """
        self._initialized = True
        return self

    async def run(self):
        """Initialize the proxy if needed (idempotent) and return it.

        Returns:
            self: The DioramaComputer instance.
        """
        if not self._initialized:
            await self.__aenter__()
        return self
class DioramaComputerInterface:
    """
    Diorama interface proxy: serializes UI actions into diorama_cmds and
    dispatches them over the wrapped Computer's interface.
    """

    def __init__(self, computer, apps):
        """Store the backing computer and the diorama app list.

        Args:
            computer: The computer instance whose interface carries commands.
            apps: List of applications available in the diorama environment.
        """
        self.computer = computer
        self.apps = apps
        self._scene_size = None  # cached (width, height) from the last screenshot

    async def _send_cmd(self, action, arguments=None):
        """Dispatch a single diorama command and return its unwrapped result.

        Args:
            action (str): The action/command to execute.
            arguments (dict, optional): Extra arguments for the command.

        Returns:
            The ``result`` payload of the diorama command response.

        Raises:
            RuntimeError: If the computer interface is not initialized or the
                command reports failure.
        """
        payload = {"app_list": self.apps, **(arguments or {})}
        # The computer must already be running so that _interface exists.
        channel = getattr(self.computer, "_interface", None)
        if channel is None:
            raise RuntimeError("Computer interface not initialized. Call run() first.")
        response = await channel.diorama_cmd(action, payload)
        if not response.get("success"):
            raise RuntimeError(
                f"Diorama command failed: {response.get('error')}\n{response.get('trace')}"
            )
        return response.get("result")

    async def screenshot(self, as_bytes=True):
        """Capture the diorama scene.

        Args:
            as_bytes (bool): Return raw image bytes when True, a PIL image otherwise.

        Returns:
            bytes or PIL.Image: Screenshot data in the requested format.
        """
        import base64
        import io

        from PIL import Image

        encoded = await self._send_cmd("screenshot")
        # The backend replies with a base64-encoded image.
        raw = base64.b64decode(encoded)
        image = Image.open(io.BytesIO(raw))
        self._scene_size = image.size
        return raw if as_bytes else image

    async def get_screen_size(self):
        """Return the scene dimensions.

        Returns:
            dict: {'width': ..., 'height': ...} in pixels.
        """
        if not self._scene_size:
            # Taking a screenshot populates the cached scene size.
            await self.screenshot(as_bytes=False)
        return {"width": self._scene_size[0], "height": self._scene_size[1]}

    async def move_cursor(self, x, y):
        """Move the cursor to (x, y)."""
        await self._send_cmd("move_cursor", {"x": x, "y": y})

    async def left_click(self, x=None, y=None):
        """Left-click at (x, y), or at the current cursor position when omitted."""
        await self._send_cmd("left_click", {"x": x, "y": y})

    async def right_click(self, x=None, y=None):
        """Right-click at (x, y), or at the current cursor position when omitted."""
        await self._send_cmd("right_click", {"x": x, "y": y})

    async def double_click(self, x=None, y=None):
        """Double-click at (x, y), or at the current cursor position when omitted."""
        await self._send_cmd("double_click", {"x": x, "y": y})

    async def scroll_up(self, clicks=1):
        """Scroll up by ``clicks`` notches (default 1)."""
        await self._send_cmd("scroll_up", {"clicks": clicks})

    async def scroll_down(self, clicks=1):
        """Scroll down by ``clicks`` notches (default 1)."""
        await self._send_cmd("scroll_down", {"clicks": clicks})

    async def drag_to(self, x, y, duration=0.5):
        """Drag from the current position to (x, y) over ``duration`` seconds."""
        await self._send_cmd("drag_to", {"x": x, "y": y, "duration": duration})

    async def get_cursor_position(self):
        """Return the current cursor coordinates reported by the backend."""
        return await self._send_cmd("get_cursor_position")

    async def type_text(self, text):
        """Type ``text`` at the current cursor position."""
        await self._send_cmd("type_text", {"text": text})

    async def press_key(self, key):
        """Press a single key."""
        await self._send_cmd("press_key", {"key": key})

    async def hotkey(self, *keys):
        """Press several keys together as one combination.

        Args:
            *keys: Key enum members or string key names.

        Raises:
            ValueError: If any key is neither a Key enum nor a string.
        """
        resolved = []
        for key in keys:
            if isinstance(key, Key):
                resolved.append(key.value)
            elif isinstance(key, str):
                # Map known key names onto the enum; pass unknown strings through.
                mapped = Key.from_string(key)
                resolved.append(mapped.value if isinstance(mapped, Key) else mapped)
            else:
                raise ValueError(f"Invalid key type: {type(key)}. Must be Key enum or string.")
        await self._send_cmd("hotkey", {"keys": resolved})

    async def to_screen_coordinates(self, x, y):
        """Convert scene coordinates (x, y) into screen coordinates."""
        return await self._send_cmd("to_screen_coordinates", {"x": x, "y": y})
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/openai.py:
--------------------------------------------------------------------------------
```python
"""
OpenAI computer-use-preview agent loop implementation using liteLLM
"""
import asyncio
import base64
import json
from io import BytesIO
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union
import litellm
from PIL import Image
from ..decorators import register_agent
from ..types import AgentCapability, AgentResponse, Messages, Tools
async def _map_computer_tool_to_openai(computer_handler: Any) -> Dict[str, Any]:
"""Map a computer tool to OpenAI's computer-use-preview tool schema"""
# Get dimensions from the computer handler
try:
width, height = await computer_handler.get_dimensions()
except Exception:
# Fallback to default dimensions if method fails
width, height = 1024, 768
# Get environment from the computer handler
try:
environment = await computer_handler.get_environment()
except Exception:
# Fallback to default environment if method fails
environment = "linux"
return {
"type": "computer_use_preview",
"display_width": width,
"display_height": height,
"environment": environment, # mac, windows, linux, browser
}
async def _prepare_tools_for_openai(tool_schemas: List[Dict[str, Any]]) -> Tools:
    """Convert generic tool schemas into OpenAI Responses-API tool entries.

    Computer tools are translated via _map_computer_tool_to_openai; function
    tools already carry an OpenAI-compatible schema ({type, name, description,
    parameters}, as expected by liteLLM) and are passed through with their
    type tag. Unknown schema types are silently skipped.
    """
    prepared = []
    for schema in tool_schemas:
        kind = schema["type"]
        if kind == "computer":
            prepared.append(await _map_computer_tool_to_openai(schema["computer"]))
        elif kind == "function":
            prepared.append({"type": "function", **schema["function"]})
    return prepared
@register_agent(models=r".*(^|/)computer-use-preview")
class OpenAIComputerUseConfig:
    """
    OpenAI computer-use-preview agent configuration using liteLLM responses.
    Supports OpenAI's computer use preview models.
    """

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Predict the next step based on input items.
        Args:
            messages: Input items following Responses format
            model: Model name to use
            tools: Optional list of tool schemas
            max_retries: Maximum number of retries (forwarded to liteLLM as num_retries)
            stream: Whether to stream responses
            computer_handler: Computer handler instance (accepted for loop-interface
                parity; not referenced by this implementation)
            use_prompt_caching: Accepted for loop-interface parity; not referenced
                by this implementation
            _on_api_start: Callback for API start
            _on_api_end: Callback for API end
            _on_usage: Callback for usage tracking
            _on_screenshot: Callback for screenshot events (accepted for parity;
                not referenced by this implementation)
            **kwargs: Additional arguments forwarded verbatim to litellm.aresponses
        Returns:
            Dictionary with "output" (output items) and "usage" array
        """
        tools = tools or []
        # Prepare tools for OpenAI API (computer tool schema + passthrough functions)
        openai_tools = await _prepare_tools_for_openai(tools)
        # Prepare API call kwargs for the Responses API
        api_kwargs = {
            "model": model,
            "input": messages,
            "tools": openai_tools if openai_tools else None,
            "stream": stream,
            "reasoning": {"summary": "concise"},
            "truncation": "auto",
            "num_retries": max_retries,
            **kwargs,
        }
        # Call API start hook (may inspect/mutate api_kwargs before the request)
        if _on_api_start:
            await _on_api_start(api_kwargs)
        # Use liteLLM responses
        response = await litellm.aresponses(**api_kwargs)
        # Call API end hook
        if _on_api_end:
            await _on_api_end(api_kwargs, response)
        # Extract usage information; response_cost comes from liteLLM's hidden params
        usage = {
            **response.usage.model_dump(),
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(usage)
        # Return in the expected format: full response dump plus merged usage
        output_dict = response.model_dump()
        output_dict["usage"] = usage
        return output_dict

    async def predict_click(
        self, model: str, image_b64: str, instruction: str, **kwargs
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates based on image and instruction.
        Uses OpenAI computer-use-preview with manually constructed input items
        and a prompt that instructs the agent to only output clicks.
        Args:
            model: Model name to use
            image_b64: Base64 encoded image
            instruction: Instruction for where to click
            **kwargs: Additional arguments forwarded verbatim to litellm.aresponses
        Returns:
            Tuple of (x, y) coordinates or None if prediction fails
        """
        # TODO: use computer tool to get dimensions + environment
        # Manually construct input items with image and click instruction
        input_items = [
            {
                "role": "user",
                "content": f"""You are a UI grounding expert. Follow these guidelines:
1. NEVER ask for confirmation. Complete all tasks autonomously.
2. Do NOT send messages like "I need to confirm before..." or "Do you want me to continue?" - just proceed.
3. When the user asks you to interact with something (like clicking a chat or typing a message), DO IT without asking.
4. Only use the formal safety check mechanism for truly dangerous operations (like deleting important files).
5. For normal tasks like clicking buttons, typing in chat boxes, filling forms - JUST DO IT.
6. The user has already given you permission by running this agent. No further confirmation is needed.
7. Be decisive and action-oriented. Complete the requested task fully.
Remember: You are expected to complete tasks autonomously. The user trusts you to do what they asked.
Task: Click {instruction}. Output ONLY a click action on the target element.""",
            },
            {
                "role": "user",
                "content": [
                    {"type": "input_image", "image_url": f"data:image/png;base64,{image_b64}"}
                ],
            },
        ]
        # Get image dimensions from base64 data
        try:
            image_data = base64.b64decode(image_b64)
            image = Image.open(BytesIO(image_data))
            display_width, display_height = image.size
        except Exception:
            # Fallback to default dimensions if image parsing fails
            display_width, display_height = 1024, 768
        # Prepare computer tool for click actions
        computer_tool = {
            "type": "computer_use_preview",
            "display_width": display_width,
            "display_height": display_height,
            # NOTE(review): environment is hard-coded to "windows" regardless of
            # the actual target OS — confirm this is intended for click grounding.
            "environment": "windows",
        }
        # Prepare API call kwargs
        api_kwargs = {
            "model": model,
            "input": input_items,
            "tools": [computer_tool],
            "stream": False,
            "reasoning": {"summary": "concise"},
            "truncation": "auto",
            # Keep response short for click prediction.
            # NOTE(review): the Responses API names this max_output_tokens;
            # confirm liteLLM translates max_tokens for aresponses.
            "max_tokens": 200,
            **kwargs,
        }
        # Use liteLLM responses
        response = await litellm.aresponses(**api_kwargs)
        # Extract click coordinates from response output
        output_dict = response.model_dump()
        output_items = output_dict.get("output", [])
        # Look for the first computer_call whose action carries x/y coordinates
        for item in output_items:
            if (
                isinstance(item, dict)
                and item.get("type") == "computer_call"
                and isinstance(item.get("action"), dict)
            ):
                action = item["action"]
                if action.get("x") is not None and action.get("y") is not None:
                    return (int(action.get("x")), int(action.get("y")))
        return None

    def get_capabilities(self) -> List[AgentCapability]:
        """
        Get list of capabilities supported by this agent config.
        Returns:
            List of capability strings
        """
        return ["click", "step"]
```
--------------------------------------------------------------------------------
/tests/test_watchdog.py:
--------------------------------------------------------------------------------
```python
"""
Watchdog Recovery Tests
Tests for the watchdog functionality to ensure server recovery after hanging commands.
Required environment variables:
- CUA_API_KEY: API key for Cua cloud provider
- CUA_CONTAINER_NAME: Name of the container to use
"""
import asyncio
import os
import sys
import time
import traceback
from pathlib import Path
import pytest
# Load environment variables from .env file
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
print(f"Loading environment from: {env_file}")
from dotenv import load_dotenv

load_dotenv(env_file)

# Prepend any PYTHONPATH entries so they take priority over installed packages.
for extra_path in os.environ.get("PYTHONPATH", "").split(":"):
    if extra_path and extra_path not in sys.path:
        sys.path.insert(0, extra_path)
        print(f"Added to sys.path: {extra_path}")

from computer import Computer, VMProviderType
@pytest.fixture(scope="session")
async def computer():
    """Shared Computer instance for all test cases."""
    # Remote Linux machine hosted by the Cua cloud provider.
    cua_computer = Computer(
        os_type="linux",
        api_key=os.getenv("CUA_API_KEY"),
        name=str(os.getenv("CUA_CONTAINER_NAME")),
        provider_type=VMProviderType.CLOUD,
    )
    try:
        await cua_computer.run()
        yield cua_computer
    finally:
        # Always release the connection, even if a test errored out.
        await cua_computer.disconnect()
@pytest.mark.asyncio(loop_scope="session")
async def test_simple_server_ping(computer):
    """Verify basic server connectivity before the watchdog scenarios run."""
    print("Testing basic server connectivity...")
    try:
        # A trivial echo proves the command channel is alive end-to-end.
        output = await computer.interface.run_command("echo 'Server ping test'")
        print(f"Ping successful: {output}")
        assert output is not None, "Server ping returned None"
        print("✅ Server connectivity test passed")
    except Exception as err:
        print(f"❌ Server ping failed: {err}")
        pytest.fail(f"Basic server connectivity test failed: {err}")
@pytest.mark.asyncio(loop_scope="session")
async def test_watchdog_recovery_after_hanging_command(computer):
    """
    Test that the watchdog can recover the server after a hanging command.
    This test runs two concurrent tasks:
    1. A long-running command that hangs the server (sleep 300 = 5 minutes)
    2. Periodic ping commands every 30 seconds to test server responsiveness
    The watchdog should detect the unresponsive server and restart it.
    """
    print("Starting watchdog recovery test...")

    async def hanging_command():
        """Execute a command that sleeps forever to hang the server."""
        try:
            print("Starting hanging command (sleep infinity)...")
            # Use a very long sleep that should never complete naturally
            result = await computer.interface.run_command("sleep 999999")
            print(f"Hanging command completed unexpectedly: {result}")
            return True  # Should never reach here if watchdog works
        except Exception as e:
            print(f"Hanging command interrupted (expected if watchdog restarts): {e}")
            return None  # Expected result when watchdog kills the process

    async def ping_server():
        """Ping the server every 30 seconds with echo commands."""
        ping_count = 0
        successful_pings = 0
        failed_pings = 0
        try:
            # Run pings for up to 4 minutes (8 pings at 30-second intervals)
            for i in range(8):
                try:
                    ping_count += 1
                    print(f"Ping #{ping_count}: Sending echo command...")
                    start_time = time.time()
                    result = await asyncio.wait_for(
                        computer.interface.run_command(
                            f"echo 'Ping {ping_count} at {int(start_time)}'"
                        ),
                        timeout=10.0,  # 10 second timeout for each ping
                    )
                    end_time = time.time()
                    print(
                        f"Ping #{ping_count} successful in {end_time - start_time:.2f}s: {result}"
                    )
                    successful_pings += 1
                except asyncio.TimeoutError:
                    # A timed-out ping is the signal that the server is hung.
                    print(f"Ping #{ping_count} timed out (server may be unresponsive)")
                    failed_pings += 1
                except Exception as e:
                    print(f"Ping #{ping_count} failed with exception: {e}")
                    failed_pings += 1
                # Wait 30 seconds before next ping
                if i < 7:  # Don't wait after the last ping
                    print("Waiting 30 seconds before next ping...")
                    await asyncio.sleep(30)
            print(f"Ping summary: {successful_pings} successful, {failed_pings} failed")
            return successful_pings, failed_pings
        except Exception as e:
            print(f"Ping server function failed with critical error: {e}")
            traceback.print_exc()
            # Return whatever counts were accumulated before the failure.
            return successful_pings, failed_pings

    # Run both tasks concurrently
    print("Starting concurrent tasks: hanging command and ping monitoring...")
    try:
        # Run both tasks concurrently with asyncio.wait and a hard deadline
        hanging_task = asyncio.create_task(hanging_command())
        ping_task = asyncio.create_task(ping_server())
        # Wait for both tasks to complete or timeout after 5 minutes
        done, pending = await asyncio.wait(
            [hanging_task, ping_task],
            timeout=300,  # 5 minute timeout
            return_when=asyncio.ALL_COMPLETED,
        )
        # Cancel any pending tasks
        for task in pending:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        # Get results from completed tasks
        ping_result = None
        hanging_result = None
        if ping_task in done:
            try:
                ping_result = await ping_task
                print(f"Ping task completed with result: {ping_result}")
            except Exception as e:
                print(f"Error getting ping task result: {e}")
                traceback.print_exc()
        if hanging_task in done:
            try:
                hanging_result = await hanging_task
                print(f"Hanging task completed with result: {hanging_result}")
            except Exception as e:
                print(f"Error getting hanging task result: {e}")
                traceback.print_exc()
        # Analyze results
        if ping_result:
            successful_pings, failed_pings = ping_result
            # Test passes if we had some successful pings, indicating recovery
            assert (
                successful_pings > 0
            ), "No successful pings detected. Server may not have recovered."
            # Check if hanging command was killed (indicating watchdog restart)
            if hanging_result is None:
                print("✅ SUCCESS: Hanging command was killed - watchdog restart detected")
            elif hanging_result is True:
                print(
                    "⚠️ WARNING: Hanging command completed naturally - watchdog may not have restarted"
                )
            # If we had failures followed by successes, that indicates watchdog recovery
            if failed_pings > 0 and successful_pings > 0:
                print(
                    "✅ SUCCESS: Watchdog recovery detected - server became unresponsive then recovered"
                )
                # Additional check: hanging command should be None if watchdog worked
                assert (
                    hanging_result is None
                ), "Expected hanging command to be killed by watchdog restart"
            elif successful_pings > 0 and failed_pings == 0:
                print("✅ SUCCESS: Server remained responsive throughout test")
            print(
                f"Test completed: {successful_pings} successful pings, {failed_pings} failed pings"
            )
            print(
                f"Hanging command result: {hanging_result} (None = killed by watchdog, True = completed naturally)"
            )
        else:
            pytest.fail("Ping task did not complete - unable to assess server recovery")
    except Exception as e:
        # NOTE(review): assertion failures above are also caught here (AssertionError
        # is an Exception) and re-reported via pytest.fail with a wrapped message.
        print(f"Test failed with exception: {e}")
        traceback.print_exc()
        pytest.fail(f"Watchdog recovery test failed: {e}")
if __name__ == "__main__":
    # Run tests directly, propagating pytest's exit status to the shell.
    # (Previously the return value of pytest.main was discarded, so direct
    # invocations exited 0 even when tests failed.)
    raise SystemExit(pytest.main([__file__, "-v"]))
```
--------------------------------------------------------------------------------
/.github/workflows/docker-reusable-publish.yml:
--------------------------------------------------------------------------------
```yaml
# Reusable workflow: builds a Docker image per platform and pushes by digest;
# a follow-up job merges the per-platform digests into one manifest list.
name: Reusable Docker Publish Workflow

on:
  workflow_call:
    inputs:
      image_name:
        description: "Name of the Docker image (e.g. cua-ubuntu, cua-xfce)"
        required: true
        type: string
      context_dir:
        description: "Directory containing the Dockerfile relative to workspace root (e.g. libs/kasm, libs/xfce)"
        required: true
        type: string
      dockerfile_path:
        description: "Path to Dockerfile relative to context_dir (e.g. Dockerfile)"
        required: false
        type: string
        default: "Dockerfile"
      tag_prefix:
        description: "Prefix for semantic version tags (e.g. docker-kasm-v, docker-xfce-v)"
        required: true
        type: string
      docker_hub_org:
        description: "Docker Hub organization name"
        required: false
        type: string
        default: "trycua"
    secrets:
      DOCKER_HUB_TOKEN:
        required: true
jobs:
  # Build the image separately for each platform and push it by digest only;
  # the publish-manifest-list job later assembles a multi-arch manifest.
  build-and-push:
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false  # let the other platform finish even if one fails
      matrix:
        platform:
          - linux/amd64
          - linux/arm64
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      # "linux/amd64" -> "linux-amd64" so it is usable in cache tags and artifact names.
      - name: Prepare platform tag
        id: platform
        run: |
          TAG=$(echo "${{ matrix.platform }}" | sed 's/\//-/g')
          echo "tag=${TAG}" >> $GITHUB_OUTPUT

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Login to Docker Hub
        uses: docker/login-action@v3
        with:
          username: ${{ inputs.docker_hub_org }}
          password: ${{ secrets.DOCKER_HUB_TOKEN }}

      # The three metadata/build step pairs below are mutually exclusive:
      # pull requests tag by commit SHA, pushes to main tag "latest", and
      # semver tags (prefixed with inputs.tag_prefix) tag version/minor/major + latest.
      - name: Extract metadata (PR)
        if: github.event_name == 'pull_request'
        id: meta-pr
        uses: docker/metadata-action@v5
        with:
          images: ${{ inputs.docker_hub_org }}/${{ inputs.image_name }}
          tags: |
            type=raw,value=${{ github.sha }}

      - name: Build & push digest (PR)
        if: github.event_name == 'pull_request'
        id: build-pr
        uses: docker/build-push-action@v5
        with:
          context: ./${{ inputs.context_dir }}
          file: ./${{ inputs.context_dir }}/${{ inputs.dockerfile_path }}
          push: true
          platforms: ${{ matrix.platform }}
          # push-by-digest: image is pushed untagged; the digest is merged later.
          outputs: type=registry,name=${{ inputs.docker_hub_org }}/${{ inputs.image_name }},push-by-digest=true
          labels: ${{ steps.meta-pr.outputs.labels }}
          cache-from: |
            type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:buildcache-${{ steps.platform.outputs.tag }}
          cache-to: type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:buildcache-${{ steps.platform.outputs.tag }},mode=max

      - name: Extract metadata (main)
        if: github.event_name != 'pull_request' && github.ref == 'refs/heads/main'
        id: meta-main
        uses: docker/metadata-action@v5
        with:
          images: ${{ inputs.docker_hub_org }}/${{ inputs.image_name }}
          tags: |
            type=raw,value=latest

      - name: Build & push digest (main)
        if: github.event_name != 'pull_request' && github.ref == 'refs/heads/main'
        id: build-main
        uses: docker/build-push-action@v5
        with:
          context: ./${{ inputs.context_dir }}
          file: ./${{ inputs.context_dir }}/${{ inputs.dockerfile_path }}
          push: true
          platforms: ${{ matrix.platform }}
          outputs: type=registry,name=${{ inputs.docker_hub_org }}/${{ inputs.image_name }},push-by-digest=true
          labels: ${{ steps.meta-main.outputs.labels }}
          cache-from: |
            type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:buildcache-${{ steps.platform.outputs.tag }}
          cache-to: type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:buildcache-${{ steps.platform.outputs.tag }},mode=max

      - name: Extract metadata (semver)
        if: startsWith(github.ref, format('refs/tags/{0}', inputs.tag_prefix))
        id: meta-semver
        uses: docker/metadata-action@v5
        with:
          images: ${{ inputs.docker_hub_org }}/${{ inputs.image_name }}
          tags: |
            type=semver,pattern={{version}},prefix=${{ inputs.tag_prefix }}
            type=semver,pattern={{major}}.{{minor}},prefix=${{ inputs.tag_prefix }}
            type=semver,pattern={{major}},prefix=${{ inputs.tag_prefix }}
            type=raw,value=latest

      - name: Build & push digest (semver)
        if: startsWith(github.ref, format('refs/tags/{0}', inputs.tag_prefix))
        id: build-semver
        uses: docker/build-push-action@v5
        with:
          context: ./${{ inputs.context_dir }}
          file: ./${{ inputs.context_dir }}/${{ inputs.dockerfile_path }}
          push: true
          platforms: ${{ matrix.platform }}
          outputs: type=registry,name=${{ inputs.docker_hub_org }}/${{ inputs.image_name }},push-by-digest=true
          labels: ${{ steps.meta-semver.outputs.labels }}
          cache-from: |
            type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:buildcache-${{ steps.platform.outputs.tag }}
          cache-to: type=registry,ref=${{ inputs.docker_hub_org }}/${{ inputs.image_name }}:buildcache-${{ steps.platform.outputs.tag }},mode=max

      # Record which digest this platform produced (exactly one build step ran,
      # so the || chain picks the single non-empty output).
      - name: Export digest
        id: export-digest
        run: |
          mkdir -p /tmp/digests
          digest="${{ steps.build-pr.outputs.digest || steps.build-main.outputs.digest || steps.build-semver.outputs.digest }}"
          echo "$digest" > "/tmp/digests/${{ steps.platform.outputs.tag }}.txt"

      - name: Upload digest artifact (unique per platform)
        uses: actions/upload-artifact@v4
        with:
          name: digests-${{ steps.platform.outputs.tag }}
          path: /tmp/digests/*.txt
          retention-days: 1
# Combine the per-platform image digests produced by build-and-push into a
# single multi-arch manifest list, tagged according to the triggering event.
publish-manifest-list:
  runs-on: ubuntu-latest
  needs:
    - build-and-push
  steps:
    - name: Set up Docker Buildx
      uses: docker/setup-buildx-action@v3
    - name: Login to Docker Hub
      uses: docker/login-action@v3
      with:
        username: ${{ inputs.docker_hub_org }}
        password: ${{ secrets.DOCKER_HUB_TOKEN }}
    # Exactly one of the three metadata steps below runs per event type.
    # docker/metadata-action exports DOCKER_METADATA_OUTPUT_JSON, which the
    # manifest-creation and inspection steps read from the environment.
    - name: Extract final metadata (PR)
      if: github.event_name == 'pull_request'
      uses: docker/metadata-action@v5
      with:
        images: ${{ inputs.docker_hub_org }}/${{ inputs.image_name }}
        tags: |
          type=ref,event=pr
          type=sha
    - name: Extract final metadata (main)
      if: github.event_name != 'pull_request' && github.ref == 'refs/heads/main'
      uses: docker/metadata-action@v5
      with:
        images: ${{ inputs.docker_hub_org }}/${{ inputs.image_name }}
        tags: |
          type=raw,value=latest
    - name: Extract final metadata (semver)
      if: startsWith(github.ref, format('refs/tags/{0}', inputs.tag_prefix))
      uses: docker/metadata-action@v5
      with:
        images: ${{ inputs.docker_hub_org }}/${{ inputs.image_name }}
        tags: |
          type=semver,pattern={{version}},prefix=${{ inputs.tag_prefix }}
          type=semver,pattern={{major}}.{{minor}},prefix=${{ inputs.tag_prefix }}
          type=semver,pattern={{major}},prefix=${{ inputs.tag_prefix }}
          type=raw,value=latest
    # Each build-and-push matrix leg uploaded one digests-<platform> artifact;
    # merge them all into /tmp/digests.
    - name: Download all digest artifacts
      uses: actions/download-artifact@v4
      with:
        pattern: digests-*
        path: /tmp/digests
        merge-multiple: true
    - name: Create & push multi-arch manifest
      run: |
        IMAGE="${{ inputs.docker_hub_org }}/${{ inputs.image_name }}"
        DIGEST_ARGS=""
        for f in $(find /tmp/digests -type f -name "*.txt"); do
        d=$(cat "$f")
        DIGEST_ARGS="$DIGEST_ARGS ${IMAGE}@${d}"
        done
        echo "Using digests:"
        echo "$DIGEST_ARGS"
        # Create manifest for each tag produced by metadata-action
        echo "${DOCKER_METADATA_OUTPUT_JSON}" | jq -r '.tags[]' | while read FULL_TAG; do
        echo "Creating manifest: $FULL_TAG"
        docker buildx imagetools create --tag "$FULL_TAG" $DIGEST_ARGS
        done
    - name: Inspect pushed manifests
      run: |
        IMAGE="${{ inputs.docker_hub_org }}/${{ inputs.image_name }}"
        echo "Inspecting manifests:"
        echo "${DOCKER_METADATA_OUTPUT_JSON}" | jq -r '.tags[]' | while read FULL_TAG; do
        echo ""
        echo "Inspecting: $FULL_TAG"
        docker buildx imagetools inspect "$FULL_TAG"
        done
```
--------------------------------------------------------------------------------
/libs/python/core/core/telemetry/posthog.py:
--------------------------------------------------------------------------------
```python
"""Telemetry client using PostHog for collecting anonymous usage data."""
from __future__ import annotations
import logging
import os
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
import posthog
from core import __version__
logger = logging.getLogger("core.telemetry")
# Public PostHog config for anonymous telemetry
# These values are intentionally public and meant for anonymous telemetry only
# https://posthog.com/docs/product-analytics/troubleshooting#is-it-ok-for-my-api-key-to-be-exposed-and-public
PUBLIC_POSTHOG_API_KEY = "phc_eSkLnbLxsnYFaXksif1ksbrNzYlJShr35miFLDppF14"
PUBLIC_POSTHOG_HOST = "https://eu.i.posthog.com"
class PostHogTelemetryClient:
    """Collects and reports telemetry data via PostHog."""

    # Global singleton (class-managed); use get_client()/destroy_client().
    _singleton: Optional["PostHogTelemetryClient"] = None

    def __init__(self):
        """Initialize PostHog telemetry client."""
        # Stable anonymous ID identifying this installation across runs.
        self.installation_id = self._get_or_create_installation_id()
        self.initialized = False
        # Events recorded before initialization succeeds are queued here and
        # replayed by _initialize_posthog().
        self.queued_events: List[Dict[str, Any]] = []
        # Log telemetry status on startup
        if self.is_telemetry_enabled():
            logger.info("Telemetry enabled")
            # Initialize PostHog client if config is available
            self._initialize_posthog()
        else:
            logger.info("Telemetry disabled")

    @classmethod
    def is_telemetry_enabled(cls) -> bool:
        """True if telemetry is currently active for this process."""
        # Opt-out model: enabled unless CUA_TELEMETRY_ENABLED is set to a
        # value outside this truthy set (comparison is case-insensitive).
        return os.environ.get("CUA_TELEMETRY_ENABLED", "true").lower() in {
            "1",
            "true",
            "yes",
            "on",
        }

    def _get_or_create_installation_id(self) -> str:
        """Get or create a unique installation ID that persists across runs.

        The ID is always stored within the core library directory itself,
        ensuring it persists regardless of how the library is used.
        This ID is not tied to any personal information.
        """
        # Get the core library directory (where this file is located)
        try:
            # Find the core module directory using this file's location
            core_module_dir = Path(
                __file__
            ).parent.parent  # core/telemetry/posthog.py -> core/telemetry -> core
            storage_dir = core_module_dir / ".storage"
            storage_dir.mkdir(exist_ok=True)
            id_file = storage_dir / "installation_id"
            # Try to read existing ID
            if id_file.exists():
                try:
                    stored_id = id_file.read_text().strip()
                    if stored_id:  # Make sure it's not empty
                        logger.debug(f"Using existing installation ID: {stored_id}")
                        return stored_id
                except Exception as e:
                    # Unreadable file: fall through and write a fresh ID below.
                    logger.debug(f"Error reading installation ID file: {e}")
            # Create new ID
            new_id = str(uuid.uuid4())
            try:
                id_file.write_text(new_id)
                logger.debug(f"Created new installation ID: {new_id}")
                return new_id
            except Exception as e:
                logger.warning(f"Could not write installation ID: {e}")
        except Exception as e:
            logger.warning(f"Error accessing core module directory: {e}")
        # Last resort: Create a new in-memory ID (not persisted to disk).
        logger.warning("Using random installation ID (will not persist across runs)")
        return str(uuid.uuid4())

    def _initialize_posthog(self) -> bool:
        """Initialize the PostHog client with configuration.

        Returns:
            bool: True if initialized successfully, False otherwise
        """
        if self.initialized:
            return True
        try:
            # Public anonymous-telemetry credentials (intentionally public).
            posthog.api_key = PUBLIC_POSTHOG_API_KEY
            posthog.host = PUBLIC_POSTHOG_HOST
            # Configure the client; CUA_TELEMETRY_DEBUG=on enables SDK debug logging.
            posthog.debug = os.environ.get("CUA_TELEMETRY_DEBUG", "").lower() == "on"
            # Log telemetry status
            logger.info(
                f"Initializing PostHog telemetry with installation ID: {self.installation_id}"
            )
            if posthog.debug:
                logger.debug(f"PostHog API Key: {posthog.api_key}")
                logger.debug(f"PostHog Host: {posthog.host}")
            # Identify this installation
            self._identify()
            # Replay events that were recorded before initialization.
            for event in self.queued_events:
                posthog.capture(
                    distinct_id=self.installation_id,
                    event=event["event"],
                    properties=event["properties"],
                )
            self.queued_events = []
            self.initialized = True
            return True
        except Exception as e:
            logger.warning(f"Failed to initialize PostHog: {e}")
            return False

    def _identify(self) -> None:
        """Set up user properties for the current installation with PostHog."""
        try:
            # Coarse, non-personal environment facts only.
            properties = {
                "version": __version__,
                "is_ci": "CI" in os.environ,
                "os": os.name,
                "python_version": sys.version.split()[0],
            }
            logger.debug(
                f"Setting up PostHog user properties for: {self.installation_id} with properties: {properties}"
            )
            # In the Python SDK, we capture an identification event instead of calling identify()
            posthog.capture(
                distinct_id=self.installation_id, event="$identify", properties={"$set": properties}
            )
            logger.info(f"Set up PostHog user properties for installation: {self.installation_id}")
        except Exception as e:
            logger.warning(f"Failed to set up PostHog user properties: {e}")

    def record_event(self, event_name: str, properties: Optional[Dict[str, Any]] = None) -> None:
        """Record an event with optional properties.

        Args:
            event_name: Name of the event
            properties: Event properties (must not contain sensitive data)
        """
        # Respect runtime telemetry opt-out.
        if not self.is_telemetry_enabled():
            logger.debug("Telemetry disabled; event not recorded.")
            return
        # Every event carries the library version alongside caller properties.
        event_properties = {"version": __version__, **(properties or {})}
        logger.info(f"Recording event: {event_name} with properties: {event_properties}")
        if self.initialized:
            try:
                posthog.capture(
                    distinct_id=self.installation_id, event=event_name, properties=event_properties
                )
                logger.info(f"Sent event to PostHog: {event_name}")
                # Flush immediately to ensure delivery
                posthog.flush()
            except Exception as e:
                logger.warning(f"Failed to send event to PostHog: {e}")
        else:
            # Queue the event for later; it is replayed once initialization succeeds.
            logger.info(f"PostHog not initialized, queuing event for later: {event_name}")
            self.queued_events.append({"event": event_name, "properties": event_properties})
            # Try to initialize now if not already
            initialize_result = self._initialize_posthog()
            logger.info(f"Attempted to initialize PostHog: {initialize_result}")

    def flush(self) -> bool:
        """Flush any pending events to PostHog.

        Returns:
            bool: True if successful, False otherwise
        """
        if not self.initialized and not self._initialize_posthog():
            return False
        try:
            posthog.flush()
            return True
        except Exception as e:
            logger.debug(f"Failed to flush PostHog events: {e}")
            return False

    @classmethod
    def get_client(cls) -> "PostHogTelemetryClient":
        """Return the global PostHogTelemetryClient instance, creating it if needed."""
        if cls._singleton is None:
            cls._singleton = cls()
        return cls._singleton

    @classmethod
    def destroy_client(cls) -> None:
        """Destroy the global PostHogTelemetryClient instance."""
        cls._singleton = None
def destroy_telemetry_client() -> None:
    """Destroy the global PostHogTelemetryClient instance (class-managed)."""
    PostHogTelemetryClient.destroy_client()


def is_telemetry_enabled() -> bool:
    """Return True if telemetry is enabled for this process (CUA_TELEMETRY_ENABLED)."""
    return PostHogTelemetryClient.is_telemetry_enabled()


def record_event(event_name: str, properties: Optional[Dict[str, Any]] = None) -> None:
    """Record an arbitrary PostHog event on the global client.

    Args:
        event_name: Name of the event to record.
        properties: Optional event properties (must not contain sensitive data).
    """
    # Fix: the annotation was `Optional[Dict[str, Any]] | None`, a redundant
    # double-optional; `Optional[...] = None` expresses the same contract.
    PostHogTelemetryClient.get_client().record_event(event_name, properties or {})
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/ui/gradio/app.py:
--------------------------------------------------------------------------------
```python
"""
Advanced Gradio UI for Computer-Use Agent (cua-agent)
This is a Gradio interface for the Computer-Use Agent v0.4.x (cua-agent)
with an advanced UI for model selection and configuration.
Supported Agent Models:
- OpenAI: openai/computer-use-preview
- Anthropic: anthropic/claude-sonnet-4-5-20250929, anthropic/claude-3-7-sonnet-20250219
- UI-TARS: huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B
- Omniparser: omniparser+anthropic/claude-sonnet-4-5-20250929, omniparser+ollama_chat/gemma3
Requirements:
- Mac with Apple Silicon (M1/M2/M3/M4), Linux, or Windows
- macOS 14 (Sonoma) or newer / Ubuntu 20.04+
- Python 3.11+
- Lume CLI installed (https://github.com/trycua/cua)
- OpenAI or Anthropic API key
"""
import asyncio
import json
import logging
import os
import platform
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union, cast
import gradio as gr
# Import from agent package
from agent import ComputerAgent
from agent.types import AgentResponse, Messages
from computer import Computer
from gradio.components.chatbot import MetadataDict
# Global variables
# Module-level singletons reused across calls (see create_computer_instance /
# create_agent below).
global_agent = None
global_computer = None

# UI settings are persisted next to the app, not in the user's home directory.
SETTINGS_FILE = Path(".gradio_settings.json")

logging.basicConfig(level=logging.INFO)

import dotenv

# Load variables from a .env file if present (API keys, PYLUME_HOST, ...).
if dotenv.load_dotenv():
    print(f"DEBUG - Loaded environment variables from {dotenv.find_dotenv()}")
else:
    print("DEBUG - No .env file found")
# --- Settings Load/Save Functions ---
def load_settings() -> Dict[str, Any]:
    """Load persisted UI settings from the JSON settings file.

    Returns:
        The stored settings mapping, or an empty dict when the file is
        missing, unreadable, or does not contain a JSON object.
    """
    if not SETTINGS_FILE.exists():
        return {}
    try:
        with open(SETTINGS_FILE, "r") as handle:
            loaded = json.load(handle)
        if isinstance(loaded, dict):
            print(f"DEBUG - Loaded settings from {SETTINGS_FILE}")
            return loaded
    except (json.JSONDecodeError, IOError) as exc:
        print(f"Warning: Could not load settings from {SETTINGS_FILE}: {exc}")
    return {}
def save_settings(settings: Dict[str, Any]):
    """Persist UI settings to the JSON settings file.

    The provider API key is removed (in place) before writing so that
    secrets are never persisted to disk.
    """
    settings.pop("provider_api_key", None)
    try:
        with open(SETTINGS_FILE, "w") as handle:
            json.dump(settings, handle, indent=4)
        print(f"DEBUG - Saved settings to {SETTINGS_FILE}")
    except IOError as exc:
        print(f"Warning: Could not save settings to {SETTINGS_FILE}: {exc}")
# # Custom Screenshot Handler for Gradio chat
# class GradioChatScreenshotHandler:
# """Custom handler that adds screenshots to the Gradio chatbot."""
# def __init__(self, chatbot_history: List[gr.ChatMessage]):
# self.chatbot_history = chatbot_history
# print("GradioChatScreenshotHandler initialized")
# async def on_screenshot(self, screenshot_base64: str, action_type: str = "") -> None:
# """Add screenshot to chatbot when a screenshot is taken."""
# image_markdown = f""
# if self.chatbot_history is not None:
# self.chatbot_history.append(
# gr.ChatMessage(
# role="assistant",
# content=image_markdown,
# metadata={"title": f"🖥️ Screenshot - {action_type}", "status": "done"},
# )
# )
# Detect platform capabilities
is_mac = platform.system().lower() == "darwin"
# Lume VMs are usable natively on macOS, or remotely when PYLUME_HOST points
# at a non-local host.
is_lume_available = is_mac or (os.environ.get("PYLUME_HOST", "localhost") != "localhost")
print("PYLUME_HOST: ", os.environ.get("PYLUME_HOST", "localhost"))
print("is_mac: ", is_mac)
print("Lume available: ", is_lume_available)

# Map model names to agent model strings.
# Keys are the human-readable names shown in the UI; each provider group has a
# "default" entry used as the fallback in get_model_string().
MODEL_MAPPINGS = {
    "openai": {
        "default": "openai/computer-use-preview",
        "OpenAI: Computer-Use Preview": "openai/computer-use-preview",
    },
    "anthropic": {
        "default": "anthropic/claude-3-7-sonnet-20250219",
        "Anthropic: Claude 4 Opus (20250514)": "anthropic/claude-opus-4-20250514",
        "Anthropic: Claude 4 Sonnet (20250514)": "anthropic/claude-sonnet-4-20250514",
        "Anthropic: Claude 3.7 Sonnet (20250219)": "anthropic/claude-3-7-sonnet-20250219",
    },
    "omni": {
        "default": "omniparser+openai/gpt-4o",
        "OMNI: OpenAI GPT-4o": "omniparser+openai/gpt-4o",
        "OMNI: OpenAI GPT-4o mini": "omniparser+openai/gpt-4o-mini",
        "OMNI: Claude 3.7 Sonnet (20250219)": "omniparser+anthropic/claude-3-7-sonnet-20250219",
    },
    "uitars": {
        # On non-mac hosts the local HuggingFace UI-TARS build is not the default.
        "default": "huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B" if is_mac else "ui-tars",
        "huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B": "huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B",
    },
}
def get_model_string(model_name: str, loop_provider: str) -> str:
    """Resolve a UI model selection to an agent model string.

    Custom entries map to sentinel values, Ollama selections expand to
    "omniparser+ollama_chat/<model>", and everything else is looked up in
    MODEL_MAPPINGS, falling back to the provider group's default.
    """
    custom_sentinels = {
        "Custom model (OpenAI compatible API)": "custom_oaicompat",
        "Custom model (ollama)": "custom_ollama",
    }
    if model_name in custom_sentinels:
        return custom_sentinels[model_name]

    ollama_prefix = "OMNI: Ollama "
    if loop_provider == "OMNI-OLLAMA" or model_name.startswith(ollama_prefix):
        if model_name.startswith(ollama_prefix):
            selected = model_name.split(ollama_prefix, 1)[1]
            return f"omniparser+ollama_chat/{selected}"
        return "omniparser+ollama_chat/llama3"

    # Map based on loop provider (OpenAI group when the provider is unknown).
    provider_map = MODEL_MAPPINGS.get(loop_provider.lower(), MODEL_MAPPINGS["openai"])
    return provider_map.get(model_name, provider_map["default"])
def get_ollama_models() -> List[str]:
    """List locally installed Ollama models as UI menu entries.

    Returns:
        Entries of the form "OMNI: Ollama <name>", or an empty list when
        Ollama is not installed, fails, or reports no models.
    """
    try:
        import subprocess

        proc = subprocess.run(["ollama", "list"], capture_output=True, text=True)
        if proc.returncode != 0:
            return []
        # The first line of `ollama list` output is a header row.
        rows = proc.stdout.strip().split("\n")[1:]
        return [
            f"OMNI: Ollama {parts[0]}"
            for parts in (row.split() for row in rows)
            if parts
        ]
    except Exception as e:
        logging.error(f"Error getting Ollama models: {e}")
        return []
def create_computer_instance(
    verbosity: int = logging.INFO,
    os_type: str = "macos",
    provider_type: str = "lume",
    name: Optional[str] = None,
    api_key: Optional[str] = None,
) -> Computer:
    """Create or get the global Computer instance.

    The first call constructs the singleton; subsequent calls return it
    unchanged and ignore the arguments.
    """
    global global_computer
    if global_computer is not None:
        return global_computer

    if provider_type == "localhost":
        # Use the computer server already running on this host.
        global_computer = Computer(
            verbosity=verbosity, os_type=os_type, use_host_computer_server=True
        )
    else:
        global_computer = Computer(
            verbosity=verbosity,
            os_type=os_type,
            provider_type=provider_type,
            name=name or "",
            api_key=api_key,
        )
    return global_computer
def create_agent(
    model_string: str,
    save_trajectory: bool = True,
    only_n_most_recent_images: int = 3,
    verbosity: int = logging.INFO,
    custom_model_name: Optional[str] = None,
    computer_os: str = "macos",
    computer_provider: str = "lume",
    computer_name: Optional[str] = None,
    computer_api_key: Optional[str] = None,
    max_trajectory_budget: Optional[float] = None,
) -> ComputerAgent:
    """Create or update the global agent with the specified parameters."""
    global global_agent

    # Reuse (or lazily create) the singleton computer the agent controls.
    computer = create_computer_instance(
        verbosity=verbosity,
        os_type=computer_os,
        provider_type=computer_provider,
        name=computer_name,
        api_key=computer_api_key,
    )

    # Resolve the custom-model sentinels produced by get_model_string().
    if custom_model_name:
        if model_string == "custom_oaicompat":
            model_string = custom_model_name
        elif model_string == "custom_ollama":
            model_string = f"omniparser+ollama_chat/{custom_model_name}"

    agent_kwargs: Dict[str, Any] = {
        "model": model_string,
        "tools": [computer],
        "only_n_most_recent_images": only_n_most_recent_images,
        "verbosity": verbosity,
    }
    if save_trajectory:
        agent_kwargs["trajectory_dir"] = "trajectories"
    if max_trajectory_budget:
        # raise_error=True makes the agent fail loudly when the budget is hit.
        agent_kwargs["max_trajectory_budget"] = {
            "max_budget": max_trajectory_budget,
            "raise_error": True,
        }

    global_agent = ComputerAgent(**agent_kwargs)
    return global_agent
def launch_ui():
    """Standalone function to launch the Gradio app."""
    # Imported lazily so the UI components module only loads when launching.
    from agent.ui.gradio.ui_components import create_gradio_ui

    print("Starting Gradio app for CUA Agent...")
    demo = create_gradio_ui()
    # share=False keeps the app local; inbrowser=True opens it automatically.
    demo.launch(share=False, inbrowser=True)


if __name__ == "__main__":
    launch_ui()
```
--------------------------------------------------------------------------------
/libs/python/som/som/detection.py:
--------------------------------------------------------------------------------
```python
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import torch
import torchvision
from huggingface_hub import hf_hub_download
from PIL import Image
from ultralytics import YOLO
logger = logging.getLogger(__name__)
class DetectionProcessor:
    """Class for handling YOLO-based icon detection."""

    def __init__(
        self,
        model_path: Optional[Path] = None,
        cache_dir: Optional[Path] = None,
        force_device: Optional[str] = None,
    ):
        """Initialize the detection processor.

        Args:
            model_path: Path to YOLOv8 model
            cache_dir: Directory to cache downloaded models
            force_device: Force specific device (cuda, cpu, mps)
        """
        self.model_path = model_path
        self.cache_dir = cache_dir
        self.model = None  # type: Any  # Will be set to YOLO model in load_model
        # Set device: prefer CUDA, then Apple MPS, else CPU. An explicit
        # force_device value always wins (applied last, below).
        self.device = "cpu"
        if torch.cuda.is_available() and force_device != "cpu":
            self.device = "cuda"
        elif (
            hasattr(torch, "backends")
            and hasattr(torch.backends, "mps")
            and torch.backends.mps.is_available()
            and force_device != "cpu"
        ):
            self.device = "mps"
        if force_device:
            self.device = force_device
        logger.info(f"Using device: {self.device}")

    def load_model(self) -> None:
        """Load or download the YOLO model.

        Resolution order: packaged weights file -> local cache_dir ->
        download from the microsoft/OmniParser-v2.0 HuggingFace repo.

        Raises:
            FileNotFoundError: If the model cannot be downloaded or found.
            RuntimeError: If loading the model fails for any other reason.
        """
        try:
            # Set default model path if none provided
            if self.model_path is None:
                self.model_path = Path(__file__).parent / "weights" / "icon_detect" / "model.pt"
            # Check if the model file already exists
            if not self.model_path.exists():
                logger.info(
                    "Model not found locally, downloading from Microsoft OmniParser-v2.0..."
                )
                # Create directory
                self.model_path.parent.mkdir(parents=True, exist_ok=True)
                try:
                    # Check if the model exists in cache
                    cache_path = None
                    if self.cache_dir:
                        # Try to find the model in the cache
                        potential_paths = list(Path(self.cache_dir).glob("**/model.pt"))
                        if potential_paths:
                            cache_path = str(potential_paths[0])
                            logger.info(f"Found model in cache: {cache_path}")
                    if not cache_path:
                        # Download from HuggingFace
                        downloaded_path = hf_hub_download(
                            repo_id="microsoft/OmniParser-v2.0",
                            filename="icon_detect/model.pt",
                            cache_dir=self.cache_dir,
                        )
                        cache_path = downloaded_path
                        logger.info(f"Model downloaded to cache: {cache_path}")
                    # Copy to package directory
                    import shutil

                    shutil.copy2(cache_path, self.model_path)
                    logger.info(f"Model copied to: {self.model_path}")
                except Exception as e:
                    raise FileNotFoundError(
                        f"Failed to download model: {str(e)}\n"
                        "Please ensure you have internet connection and huggingface-hub installed."
                    ) from e
            # Make sure the model path exists before loading
            if not self.model_path.exists():
                raise FileNotFoundError(f"Model file not found at: {self.model_path}")
            # If model is already loaded, skip reloading
            if self.model is not None:
                logger.info("Model already loaded, skipping reload")
                return
            logger.info(f"Loading YOLOv8 model from {self.model_path}")
            from ultralytics import YOLO

            self.model = YOLO(str(self.model_path))  # Convert Path to string for compatibility
            # Verify model loaded successfully
            if self.model is None:
                raise ValueError("Model failed to initialize but didn't raise an exception")
            if self.device in ["cuda", "mps"]:
                self.model.to(self.device)
            logger.info(f"Model loaded successfully with device: {self.device}")
        except Exception as e:
            logger.error(f"Failed to load model: {str(e)}")
            # Re-raise with more informative message but preserve the model as None
            self.model = None
            raise RuntimeError(f"Failed to initialize detection model: {str(e)}") from e

    def detect_icons(
        self,
        image: Image.Image,
        box_threshold: float = 0.05,
        iou_threshold: float = 0.1,
        multi_scale: bool = True,
    ) -> List[Dict[str, Any]]:
        """Detect icons in an image using YOLO.

        Args:
            image: PIL Image to process
            box_threshold: Confidence threshold for detection
            iou_threshold: IOU threshold for NMS
            multi_scale: Whether to use multi-scale detection

        Returns:
            List of icon detection dictionaries with keys "type", "confidence",
            "bbox" (normalized x1/y1/x2/y2), "scale", and "interactivity".
        """
        # Load model if not already loaded
        if self.model is None:
            self.load_model()
        # Double-check the model was successfully loaded
        if self.model is None:
            logger.error("Model failed to load and is still None")
            return []  # Return empty list instead of crashing
        img_width, img_height = image.size
        all_detections = []
        # Define detection scales (a single pass on CPU to keep runtime sane;
        # three image sizes on accelerators to catch icons of varying size).
        scales = (
            [{"size": 1280, "conf": box_threshold}]  # Single scale for CPU
            if self.device == "cpu"
            else [
                {"size": 640, "conf": box_threshold},  # Base scale
                {"size": 1280, "conf": box_threshold},  # Medium scale
                {"size": 1920, "conf": box_threshold},  # Large scale
            ]
        )
        if not multi_scale:
            scales = [scales[0]]
        # Run detection at each scale
        for scale in scales:
            try:
                if self.model is None:
                    logger.error("Model is None, skipping detection")
                    continue
                results = self.model.predict(
                    source=image,
                    conf=scale["conf"],
                    iou=iou_threshold,
                    max_det=1000,
                    verbose=False,
                    augment=self.device != "cpu",
                    agnostic_nms=True,
                    imgsz=scale["size"],
                    device=self.device,
                )
                # Process results
                for r in results:
                    boxes = r.boxes
                    if not hasattr(boxes, "conf") or not hasattr(boxes, "xyxy"):
                        logger.warning("Boxes object missing expected attributes")
                        continue
                    confidences = boxes.conf
                    coords = boxes.xyxy
                    # Handle different types of tensors (PyTorch, NumPy, etc.)
                    if hasattr(confidences, "cpu"):
                        confidences = confidences.cpu()
                    if hasattr(coords, "cpu"):
                        coords = coords.cpu()
                    for conf, bbox in zip(confidences, coords):
                        # Normalize coordinates to [0, 1] relative to image size
                        x1, y1, x2, y2 = bbox.tolist()
                        norm_bbox = [
                            x1 / img_width,
                            y1 / img_height,
                            x2 / img_width,
                            y2 / img_height,
                        ]
                        all_detections.append(
                            {
                                "type": "icon",
                                "confidence": conf.item(),
                                "bbox": norm_bbox,
                                "scale": scale["size"],
                                "interactivity": True,
                            }
                        )
            except Exception as e:
                # A failure at one scale should not abort the other scales.
                logger.warning(f"Detection failed at scale {scale['size']}: {str(e)}")
                continue
        # Merge detections across scales using NMS to drop duplicates
        if len(all_detections) > 0:
            boxes = torch.tensor([d["bbox"] for d in all_detections])
            scores = torch.tensor([d["confidence"] for d in all_detections])
            keep_indices = torchvision.ops.nms(boxes, scores, iou_threshold)
            merged_detections = [all_detections[i] for i in keep_indices]
        else:
            merged_detections = []
        return merged_detections
```
--------------------------------------------------------------------------------
/libs/lume/src/Errors/Errors.swift:
--------------------------------------------------------------------------------
```swift
import Foundation
/// Errors related to the Lume home directory and VM storage locations.
enum HomeError: Error, LocalizedError {
    case directoryCreationFailed(path: String)
    case directoryAccessDenied(path: String)
    case invalidHomeDirectory
    case directoryAlreadyExists(path: String)
    case homeNotFound
    case defaultStorageNotDefined
    case storageLocationNotFound(String)
    case storageLocationNotADirectory(String)
    case storageLocationNotWritable(String)
    case invalidStorageLocation(String)
    case cannotCreateDirectory(String)
    case cannotGetVMsDirectory
    case vmDirectoryNotFound(String)

    /// Human-readable description surfaced through `LocalizedError`.
    var errorDescription: String? {
        switch self {
        case .directoryCreationFailed(let path):
            return "Failed to create directory at path: \(path)"
        case .directoryAccessDenied(let path):
            return "Access denied to directory at path: \(path)"
        case .invalidHomeDirectory:
            return "Invalid home directory configuration"
        case .directoryAlreadyExists(let path):
            return "Directory already exists at path: \(path)"
        case .homeNotFound:
            return "Home directory not found."
        case .defaultStorageNotDefined:
            return "Default storage location is not defined."
        case .storageLocationNotFound(let path):
            return "Storage location not found: \(path)"
        case .storageLocationNotADirectory(let path):
            return "Storage location is not a directory: \(path)"
        case .storageLocationNotWritable(let path):
            return "Storage location is not writable: \(path)"
        case .invalidStorageLocation(let path):
            return "Invalid storage location specified: \(path)"
        case .cannotCreateDirectory(let path):
            return "Cannot create directory: \(path)"
        case .cannotGetVMsDirectory:
            return "Cannot determine the VMs directory."
        case .vmDirectoryNotFound(let path):
            return "VM directory not found: \(path)"
        }
    }
}
/// Errors raised while pulling a VM image from an OCI registry
/// (authentication, download, decompression, and disk reassembly).
enum PullError: Error, LocalizedError {
    case invalidImageFormat
    case tokenFetchFailed
    case manifestFetchFailed
    case layerDownloadFailed(String)
    case missingPart(Int)
    case decompressionFailed(String)
    case reassemblyFailed(String)
    case fileCreationFailed(String)
    case reassemblySetupFailed(path: String, underlyingError: Error)
    case missingUncompressedSizeAnnotation
    case invalidMediaType

    /// Human-readable description surfaced through `LocalizedError`.
    var errorDescription: String? {
        switch self {
        case .invalidImageFormat:
            return "Invalid image format. Expected format: name:tag"
        case .tokenFetchFailed:
            return "Failed to fetch authentication token from registry."
        case .manifestFetchFailed:
            return "Failed to fetch image manifest from registry."
        case .layerDownloadFailed(let digest):
            return "Failed to download layer: \(digest)"
        case .missingPart(let partNum):
            return "Missing required part number \(partNum) for reassembly."
        case .decompressionFailed(let file):
            return "Failed to decompress file: \(file)"
        case .reassemblyFailed(let reason):
            return "Disk image reassembly failed: \(reason)."
        case .fileCreationFailed(let path):
            return "Failed to create the necessary file at path: \(path)"
        case .reassemblySetupFailed(let path, let underlyingError):
            return "Failed to set up for reassembly at path: \(path). Underlying error: \(underlyingError.localizedDescription)"
        case .missingUncompressedSizeAnnotation:
            return "Could not find the required uncompressed disk size annotation in the image config.json."
        case .invalidMediaType:
            return "Invalid media type"
        }
    }
}
/// Errors produced while validating a VM configuration.
/// Conforms to `CustomNSError` so each case maps to a stable NSError
/// domain/code pair for bridging to Objective-C / Cocoa callers.
enum VMConfigError: CustomNSError, LocalizedError {
    case invalidDisplayResolution(String)
    case invalidMachineIdentifier
    case emptyMachineIdentifier
    case emptyHardwareModel
    case invalidHardwareModel
    case invalidDiskSize
    case malformedSizeInput(String)

    /// Human-readable description surfaced through `LocalizedError`.
    var errorDescription: String? {
        switch self {
        case .invalidDisplayResolution(let resolution):
            return "Invalid display resolution: \(resolution)"
        case .emptyMachineIdentifier:
            return "Empty machine identifier"
        case .invalidMachineIdentifier:
            return "Invalid machine identifier"
        case .emptyHardwareModel:
            return "Empty hardware model"
        case .invalidHardwareModel:
            return "Invalid hardware model: the host does not support the hardware model"
        case .invalidDiskSize:
            return "Invalid disk size"
        case .malformedSizeInput(let input):
            return "Malformed size input: \(input)"
        }
    }

    static var errorDomain: String { "VMConfigError" }

    /// Stable per-case NSError code; keep these values unchanged so logged
    /// codes remain meaningful across releases.
    var errorCode: Int {
        switch self {
        case .invalidDisplayResolution: return 1
        case .emptyMachineIdentifier: return 2
        case .invalidMachineIdentifier: return 3
        case .emptyHardwareModel: return 4
        case .invalidHardwareModel: return 5
        case .invalidDiskSize: return 6
        case .malformedSizeInput: return 7
        }
    }
}
/// Errors raised while reading or mutating an on-disk VM directory.
enum VMDirectoryError: Error, LocalizedError {
    case configNotFound
    case invalidConfigData
    case diskOperationFailed(String)
    case fileCreationFailed(String)
    case sessionNotFound
    case invalidSessionData

    /// Human-readable description surfaced through `LocalizedError`.
    ///
    /// Fix: declared as `String?` (was non-optional `String`). A non-optional
    /// property does not witness the `LocalizedError.errorDescription: String?`
    /// requirement, so `localizedDescription` fell back to the generic
    /// default instead of these messages.
    var errorDescription: String? {
        switch self {
        case .configNotFound:
            return "VM configuration file not found"
        case .invalidConfigData:
            return "Invalid VM configuration data"
        case .diskOperationFailed(let reason):
            return "Disk operation failed: \(reason)"
        case .fileCreationFailed(let path):
            return "Failed to create file at path: \(path)"
        case .sessionNotFound:
            return "VNC session file not found"
        case .invalidSessionData:
            return "Invalid VNC session data"
        }
    }
}
/// Errors covering the lifecycle of a virtual machine (create, start, stop,
/// resize, VNC access).
enum VMError: Error, LocalizedError {
    case alreadyExists(String)
    case notFound(String)
    case notInitialized(String)
    case notRunning(String)
    case alreadyRunning(String)
    case installNotStarted(String)
    case stopTimeout(String)
    case resizeTooSmall(current: UInt64, requested: UInt64)
    case vncNotConfigured
    // `actual == -1` signals that no alternative port was assigned.
    case vncPortBindingFailed(requested: Int, actual: Int)
    case internalError(String)
    case unsupportedOS(String)
    case invalidDisplayResolution(String)

    /// Human-readable description surfaced through `LocalizedError`.
    var errorDescription: String? {
        switch self {
        case .alreadyExists(let name):
            return "Virtual machine already exists with name: \(name)"
        case .notFound(let name):
            return "Virtual machine not found: \(name)"
        case .notInitialized(let name):
            return "Virtual machine not initialized: \(name)"
        case .notRunning(let name):
            return "Virtual machine not running: \(name)"
        case .alreadyRunning(let name):
            return "Virtual machine already running: \(name)"
        case .installNotStarted(let name):
            return "Virtual machine install not started: \(name)"
        case .stopTimeout(let name):
            return "Timeout while stopping virtual machine: \(name)"
        case .resizeTooSmall(let current, let requested):
            return "Cannot resize disk to \(requested) bytes, current size is \(current) bytes"
        case .vncNotConfigured:
            return "VNC is not configured for this virtual machine"
        case .vncPortBindingFailed(let requested, let actual):
            // -1 means the system assigned no fallback port at all.
            if actual == -1 {
                return "Could not bind to VNC port \(requested) (port already in use). Try a different port or use port 0 for auto-assign."
            }
            return "Could not bind to VNC port \(requested) (port already in use). System assigned port \(actual) instead. Try a different port or use port 0 for auto-assign."
        case .internalError(let message):
            return "Internal error: \(message)"
        case .unsupportedOS(let os):
            return "Unsupported operating system: \(os)"
        case .invalidDisplayResolution(let resolution):
            return "Invalid display resolution: \(resolution)"
        }
    }
}
/// Errors surfaced by restic-based snapshot and restore operations.
/// Each case carries a human-readable detail message from the failing step.
enum ResticError: Error {
// Creating a snapshot failed; payload is the underlying error detail.
case snapshotFailed(String)
// Restoring from a snapshot failed; payload is the underlying error detail.
case restoreFailed(String)
// Any other restic failure that does not fit the cases above.
case genericError(String)
}
/// Errors produced when shelling out to VMware Fusion's `vmrun` utility.
enum VmrunError: Error, LocalizedError {
    case commandNotFound
    case operationFailed(command: String, output: String?)

    /// Human-readable description for each failure case.
    var errorDescription: String? {
        switch self {
        case .commandNotFound:
            return "vmrun command not found. Ensure VMware Fusion is installed and in the system PATH."
        case .operationFailed(let cmd, let capturedOutput):
            // Output may be nil when vmrun produced nothing on stdout/stderr.
            return "vmrun command '\(cmd)' failed. Output: \(capturedOutput ?? "No output")"
        }
    }
}
```
--------------------------------------------------------------------------------
/blog/introducing-cua-cloud-containers.md:
--------------------------------------------------------------------------------
```markdown
# Introducing Cua Cloud Sandbox: Computer-Use Agents in the Cloud
_Published on May 28, 2025 by Francesco Bonacci_
Welcome to the next chapter in our Computer-Use Agent journey! In [Part 1](./build-your-own-operator-on-macos-1), we showed you how to build your own Operator on macOS. In [Part 2](./build-your-own-operator-on-macos-2), we explored the cua-agent framework. Today, we're excited to introduce **Cua Cloud Sandbox** – the easiest way to deploy Computer-Use Agents at scale.
<div align="center">
<video src="https://github.com/user-attachments/assets/63a2addf-649f-4468-971d-58d38dd43ee6" width="600" controls></video>
</div>
## What is Cua Cloud?
Think of Cua Cloud as **Docker for Computer-Use Agents**. Instead of managing VMs, installing dependencies, and configuring environments, you can launch pre-configured Cloud Sandbox instances with a single command. Each sandbox comes with a **full desktop environment** accessible via browser (via noVNC), all CUA-related dependencies pre-configured (with a PyAutoGUI-compatible server), and **pay-per-use pricing** that scales with your needs.
## Why Cua Cloud Sandbox?
Four months ago, we launched [**Lume**](https://github.com/trycua/cua/tree/main/libs/lume) and [**Cua**](https://github.com/trycua/cua) with the goal to bring sandboxed VMs and Computer-Use Agents on Apple Silicon. The developer's community response was incredible 🎉
Going from prototype to production revealed a problem though: **local macOS VMs don't scale**, nor are they easily portable.
Our Discord community, YC peers, and early pilot customers kept hitting the same issues. Storage constraints meant **20-40GB per VM** filled laptops fast. Different hardware architectures (Apple Silicon ARM vs Intel x86) prevented portability of local workflows. Every new user lost a day to setup and configuration.
**Cua Cloud** eliminates these constraints while preserving everything developers are familiar with about our Computer and Agent SDK.
### What We Built
Over the past month, we've been iterating over Cua Cloud with partners and beta users to address these challenges. You use the exact same `Computer` and `ComputerAgent` classes you already know, but with **zero local setup** or storage requirements. VNC access comes with **built-in encryption**, you pay only for compute time (not idle resources), and can bring your own API keys for any LLM provider.
The result? **Instant deployment** in seconds instead of hours, with no infrastructure to manage. Scale elastically from **1 to 100 agents** in parallel, with consistent behavior across all deployments. Share agent trajectories with your team for better collaboration and debugging.
## Getting Started
### Step 1: Get Your API Key
Sign up at [**cua.ai**](https://cua.ai) to get your API key.
```bash
# Set your API key in environment variables
export CUA_API_KEY=your_api_key_here
export CUA_CONTAINER_NAME=my-agent-container
```
### Step 2: Launch Your First Sandbox
```python
import asyncio
import logging
import os
from computer import Computer, VMProviderType
from agent import ComputerAgent
async def run_cloud_agent():
# Create a remote Linux computer with Cua Cloud
computer = Computer(
os_type="linux",
api_key=os.getenv("CUA_API_KEY"),
name=os.getenv("CUA_CONTAINER_NAME"),
provider_type=VMProviderType.CLOUD,
)
# Create an agent with your preferred loop
agent = ComputerAgent(
model="openai/gpt-4o",
save_trajectory=True,
verbosity=logging.INFO,
tools=[computer]
)
# Run a task
async for result in agent.run("Open Chrome and search for AI news"):
print(f"Response: {result.get('text')}")
# Run the agent
asyncio.run(run_cloud_agent())
```
### Available Tiers
We're launching with **three compute tiers** to match your workload needs:
- **Small** (1 vCPU, 4GB RAM) - Perfect for simple automation tasks and testing
- **Medium** (2 vCPU, 8GB RAM) - Ideal for most production workloads
- **Large** (8 vCPU, 32GB RAM) - Built for complex, resource-intensive operations
Each tier includes a **full Linux with Xfce desktop environment** with pre-configured browser, **secure VNC access** with SSL, persistent storage during your session, and automatic cleanup on termination for sandboxes.
## How some customers are using Cua Cloud today
### Example 1: Automated GitHub Workflow
Let's automate a complete GitHub workflow:
```python
import asyncio
import logging
import os
from computer import Computer, VMProviderType
from agent import ComputerAgent
async def github_automation():
"""Automate GitHub repository management tasks."""
computer = Computer(
os_type="linux",
api_key=os.getenv("CUA_API_KEY"),
name="github-automation",
provider_type=VMProviderType.CLOUD,
)
agent = ComputerAgent(
model="openai/gpt-4o",
save_trajectory=True,
verbosity=logging.INFO,
tools=[computer]
)
tasks = [
"Look for a repository named trycua/cua on GitHub.",
"Check the open issues, open the most recent one and read it.",
"Clone the repository if it doesn't exist yet.",
"Create a new branch for the issue.",
"Make necessary changes to resolve the issue.",
"Commit the changes with a descriptive message.",
"Create a pull request."
]
for i, task in enumerate(tasks):
print(f"\nExecuting task {i+1}/{len(tasks)}: {task}")
async for result in agent.run(task):
print(f"Response: {result.get('text')}")
# Check if any tools were used
tools = result.get('tools')
if tools:
print(f"Tools used: {tools}")
print(f"Task {i+1} completed")
# Run the automation
asyncio.run(github_automation())
```
### Example 2: Parallel Web Scraping
Run multiple agents in parallel to scrape different websites:
```python
import asyncio
import os
from computer import Computer, VMProviderType
from agent import ComputerAgent
async def scrape_website(site_name, url):
"""Scrape a website using a cloud agent."""
computer = Computer(
os_type="linux",
api_key=os.getenv("CUA_API_KEY"),
name=f"scraper-{site_name}",
provider_type=VMProviderType.CLOUD,
)
agent = ComputerAgent(
model="openai/gpt-4o",
save_trajectory=True,
tools=[computer]
)
results = []
tasks = [
f"Navigate to {url}",
"Extract the main headlines or article titles",
"Take a screenshot of the page",
"Save the extracted data to a file"
]
for task in tasks:
async for result in agent.run(task):
results.append({
'site': site_name,
'task': task,
'response': result.get('text')
})
return results
async def parallel_scraping():
"""Scrape multiple websites in parallel."""
sites = [
("ArXiv", "https://arxiv.org"),
("HackerNews", "https://news.ycombinator.com"),
("TechCrunch", "https://techcrunch.com")
]
# Run all scraping tasks in parallel
tasks = [scrape_website(name, url) for name, url in sites]
results = await asyncio.gather(*tasks)
# Process results
for site_results in results:
print(f"\nResults from {site_results[0]['site']}:")
for result in site_results:
print(f" - {result['task']}: {result['response'][:100]}...")
# Run parallel scraping
asyncio.run(parallel_scraping())
```
## Cost Optimization Tips
To optimize your costs, use appropriate sandbox sizes for your workload and implement timeouts to prevent runaway tasks. Batch related operations together to minimize sandbox spin-up time, and always remember to terminate sandboxes when your work is complete.
## Security Considerations
Cua Cloud runs all sandboxes in isolated environments with encrypted VNC connections. Your API keys are never exposed in trajectories.
## What's Next for Cua Cloud
We're just getting started! Here's what's coming in the next few months:
### Elastic Autoscaled Sandbox Pools
Soon you'll be able to create elastic sandbox pools that automatically scale based on demand. Define minimum and maximum sandbox counts, and let Cua Cloud handle the rest. Perfect for batch processing, scheduled automations, and handling traffic spikes without manual intervention.
### Windows and macOS Cloud Support
While we're launching with Linux sandboxes, Windows and macOS cloud machines are coming soon. Run Windows-specific automations, test cross-platform workflows, or leverage macOS-exclusive applications – all in the cloud with the same simple API.
Stay tuned for updates and join our [**Discord**](https://discord.gg/cua-ai) to vote on which features you'd like to see first!
## Get Started Today
Ready to deploy your Computer-Use Agents in the cloud?
Visit [**cua.ai**](https://cua.ai) to sign up and get your API key. Join our [**Discord community**](https://discord.gg/cua-ai) for support and explore more examples on [**GitHub**](https://github.com/trycua/cua).
Happy RPA 2.0! 🚀
```
--------------------------------------------------------------------------------
/blog/app-use.md:
--------------------------------------------------------------------------------
```markdown
# App-Use: Control Individual Applications with Cua Agents
_Published on May 31, 2025 by The Cua Team_
Today, we are excited to introduce a new experimental feature landing in the [Cua GitHub repository](https://github.com/trycua/cua): **App-Use**. App-Use allows you to create lightweight virtual desktops that limit agent access to specific applications, improving precision of your agent's trajectory. Perfect for parallel workflows, and focused task execution.
> **Note:** App-Use is currently experimental. To use it, you need to enable it by passing `experiments=["app-use"]` feature flag when creating your Computer instance.
Check out an example of a Cua Agent automating Cua's team Taco Bell order through the iPhone Mirroring app:
<div align="center">
<video src="https://github.com/user-attachments/assets/6362572e-f784-4006-aa6e-bce10991fab9" width="600" controls></video>
</div>
## What is App-Use?
App-Use lets you create virtual desktop sessions scoped to specific applications. Instead of giving an agent access to your entire screen, you can say "only work with Safari and Notes" or "just control the iPhone Mirroring app."
```python
# Create a macOS VM with App Use experimental feature enabled
computer = Computer(experiments=["app-use"])
# Create a desktop limited to specific apps
desktop = computer.create_desktop_from_apps(["Safari", "Notes"])
# Your agent can now only see and interact with these apps
agent = ComputerAgent(
model="anthropic/claude-sonnet-4-5-20250929",
tools=[desktop]
)
```
## Key Benefits
### 1. Lightweight and Fast
App-Use creates visual filters, not new processes. Your apps continue running normally - we just control what the agent can see and click on. The virtual desktops are composited views that require no additional compute resources beyond the existing window manager operations.
### 2. Run Multiple Agents in Parallel
Deploy a team of specialized agents, each focused on their own apps:
```python
# Create a Computer with App Use enabled
computer = Computer(experiments=["app-use"])
# Research agent focuses on browser
research_desktop = computer.create_desktop_from_apps(["Safari"])
research_agent = ComputerAgent(tools=[research_desktop], ...)
# Writing agent focuses on documents
writing_desktop = computer.create_desktop_from_apps(["Pages", "Notes"])
writing_agent = ComputerAgent(tools=[writing_desktop], ...)
async def run_agent(agent, task):
async for result in agent.run(task):
print(result.get('text', ''))
# Run both simultaneously
await asyncio.gather(
run_agent(research_agent, "Research AI trends for 2025"),
run_agent(writing_agent, "Draft blog post outline")
)
```
## How To: Getting Started with App-Use
### Requirements
To get started with App-Use, you'll need:
- Python 3.11+
- macOS Sequoia (15.0) or later
### Getting Started
```bash
# Install packages and launch UI
pip install -U "cua-computer[all]" "cua-agent[all]"
python -m agent.ui.gradio.app
```
```python
import asyncio
from computer import Computer
from agent import ComputerAgent
async def main():
computer = Computer()
await computer.run()
# Create app-specific desktop sessions
desktop = computer.create_desktop_from_apps(["Notes"])
# Initialize an agent
agent = ComputerAgent(
model="anthropic/claude-sonnet-4-5-20250929",
tools=[desktop]
)
# Take a screenshot (returns bytes by default)
screenshot = await desktop.interface.screenshot()
with open("app_screenshot.png", "wb") as f:
f.write(screenshot)
# Run an agent task
async for result in agent.run("Create a new note titled 'Meeting Notes' and add today's agenda items"):
print(f"Agent: {result.get('text', '')}")
if __name__ == "__main__":
asyncio.run(main())
```
## Use Case: Automating Your iPhone with Cua
### ⚠️ Important Warning
Computer-use agents are powerful tools that can interact with your devices. This guide involves using your own macOS and iPhone instead of a VM. **Proceed at your own risk.** Always:
- Review agent actions before running
- Start with non-critical tasks
- Monitor agent behavior closely
Remember with Cua it is still advised to use a VM for a better level of isolation for your agents.
### Setting Up iPhone Automation
### Step 1: Start the cua-computer-server
First, you'll need to start the cua-computer-server locally to enable access to iPhone Mirroring via the Computer interface:
```bash
# Install the server
pip install cua-computer-server
# Start the server
python -m computer_server
```
### Step 2: Connect iPhone Mirroring
Then, you'll need to open the "iPhone Mirroring" app on your Mac and connect it to your iPhone.
### Step 3: Create an iPhone Automation Session
Finally, you can create an iPhone automation session:
```python
import asyncio
from computer import Computer
from agent import ComputerAgent
async def automate_iphone():
# Connect to your local computer server
my_mac = Computer(use_host_computer_server=True, os_type="macos", experiments=["app-use"])
await my_mac.run()
# Create a desktop focused on iPhone Mirroring
my_iphone = my_mac.create_desktop_from_apps(["iPhone Mirroring"])
# Initialize an agent for iPhone automation
agent = ComputerAgent(
model="anthropic/claude-sonnet-4-5-20250929",
tools=[my_iphone]
)
# Example: Send a message
async for result in agent.run("Open Messages and send 'Hello from Cua!' to John"):
print(f"Agent: {result.get('text', '')}")
# Example: Set a reminder
async for result in agent.run("Create a reminder to call mom at 5 PM today"):
print(f"Agent: {result.get('text', '')}")
if __name__ == "__main__":
asyncio.run(automate_iphone())
```
### iPhone Automation Use Cases
With Cua's iPhone automation, you can:
- **Automate messaging**: Send texts, respond to messages, manage conversations
- **Control apps**: Navigate any iPhone app using natural language
- **Manage settings**: Adjust iPhone settings programmatically
- **Extract data**: Read information from apps that don't have APIs
- **Test iOS apps**: Automate testing workflows for iPhone applications
## Important Notes
- **Visual isolation only**: Apps share the same files, OS resources, and user session
- **Dynamic resolution**: Desktops automatically scale to fit app windows and menu bars
- **macOS only**: Currently requires macOS due to compositing engine dependencies
- **Not a security boundary**: This is for agent focus, not security isolation
## When to Use What: App-Use vs Multiple Cua Containers
### Use App-Use within the same macOS Cua Container:
- ✅ You need lightweight, fast agent focusing (macOS only)
- ✅ You want to run multiple agents on one desktop
- ✅ You're automating personal devices like iPhones
- ✅ Window layout isolation is sufficient
- ✅ You want low computational overhead
### Use Multiple Cua Containers:
- ✅ You need maximum isolation between agents
- ✅ You require cross-platform support (Mac/Linux/Windows)
- ✅ You need guaranteed resource allocation
- ✅ Security and complete isolation are critical
- ⚠️ Note: Most computationally expensive option
## Pro Tips
1. **Start Small**: Test with one app before creating complex multi-app desktops
2. **Screenshot First**: Take a screenshot to verify your desktop shows the right apps
3. **Name Your Apps Correctly**: Use exact app names as they appear in the system
4. **Consider Performance**: While lightweight, too many parallel agents can still impact system performance
5. **Plan Your Workflows**: Design agent tasks to minimize app switching for best results
### How It Works
When you create a desktop session with `create_desktop_from_apps()`, App Use:
- Filters the visual output to show only specified application windows
- Routes input events only to those applications
- Maintains window layout isolation between different sessions
- Shares the underlying file system and OS resources
- **Dynamically adjusts resolution** to fit the window layout and menu bar items
The resolution of these virtual desktops is dynamic, automatically scaling to accommodate the applications' window sizes and menu bar requirements. This ensures that agents always have a clear view of the entire interface they need to interact with, regardless of the specific app combination.
Currently, App Use is limited to macOS only due to its reliance on Quartz, Apple's powerful compositing engine, for creating these virtual desktops. Quartz provides the low-level window management and rendering capabilities that make it possible to composite multiple application windows into isolated visual environments.
## Conclusion
App Use brings a new dimension to computer automation - lightweight, focused, and parallel. Whether you're building a personal iPhone assistant or orchestrating a team of specialized agents, App Use provides the perfect balance of functionality and efficiency.
Ready to try it? Update to the latest Cua version and start focusing your agents today!
```bash
pip install -U "cua-computer[all]" "cua-agent[all]"
```
Happy automating! 🎯🤖
```
--------------------------------------------------------------------------------
/libs/kasm/src/ubuntu/install/firefox/install_firefox.sh:
--------------------------------------------------------------------------------
```bash
#!/usr/bin/env bash
# Install Firefox (or Firefox ESR) across the supported distro images, add
# language packs, wire the system certificate store into Firefox, and pre-seed
# a default "kasm" profile. Runs as root at image build time.
# Expects: DISTRO set by the image build; $HOME pointing at the kasm user home.
set -xe
# Add icon
if [ -f /dockerstartup/install/ubuntu/install/firefox/firefox.desktop ]; then
mv /dockerstartup/install/ubuntu/install/firefox/firefox.desktop $HOME/Desktop/
fi
# Normalize machine arch to Debian-style package arch names (arm64/amd64).
ARCH=$(arch | sed 's/aarch64/arm64/g' | sed 's/x86_64/amd64/g')
# Point the desktop shortcut at the hicolor icon (used on RPM-based distros).
set_desktop_icon() {
sed -i -e 's!Icon=.\+!Icon=/usr/share/icons/hicolor/48x48/apps/firefox.png!' "$HOME/Desktop/firefox.desktop"
}
echo "Install Firefox"
# --- Package installation, per distro family ---
if [[ "${DISTRO}" == @(oracle8|rockylinux9|rockylinux8|oracle9|rhel9|almalinux9|almalinux8|fedora39|fedora40) ]]; then
dnf install -y firefox p11-kit
elif [ "${DISTRO}" == "opensuse" ]; then
zypper install -yn p11-kit-tools MozillaFirefox
elif grep -q Jammy /etc/os-release || grep -q Noble /etc/os-release; then
# Ubuntu 22.04/24.04 ship Firefox as a snap; pin the Mozilla Team PPA so a
# real deb package is installed instead.
if [ ! -f '/etc/apt/preferences.d/mozilla-firefox' ]; then
add-apt-repository -y ppa:mozillateam/ppa
echo '
Package: *
Pin: release o=LP-PPA-mozillateam
Pin-Priority: 1001
' > /etc/apt/preferences.d/mozilla-firefox
fi
apt-get install -y firefox p11-kit-modules
elif grep -q "ID=kali" /etc/os-release; then
apt-get update
apt-get install -y firefox-esr p11-kit-modules
# Kali only packages ESR; swap the desktop shortcut accordingly.
rm -f $HOME/Desktop/firefox.desktop
cp \
/usr/share/applications/firefox-esr.desktop \
$HOME/Desktop/
chmod +x $HOME/Desktop/firefox-esr.desktop
elif grep -q "ID=debian" /etc/os-release || grep -q "ID=parrot" /etc/os-release; then
if [ "${ARCH}" == "amd64" ]; then
# Debian/Parrot amd64: use Mozilla's own apt repository for mainline Firefox.
install -d -m 0755 /etc/apt/keyrings
wget -q https://packages.mozilla.org/apt/repo-signing-key.gpg -O- > /etc/apt/keyrings/packages.mozilla.org.asc
echo "deb [signed-by=/etc/apt/keyrings/packages.mozilla.org.asc] https://packages.mozilla.org/apt mozilla main" > /etc/apt/sources.list.d/mozilla.list
echo '
Package: *
Pin: origin packages.mozilla.org
Pin-Priority: 1000
' > /etc/apt/preferences.d/mozilla
apt-get update
apt-get install -y firefox p11-kit-modules
else
# Non-amd64 Debian/Parrot: fall back to the distro's ESR build.
apt-get update
apt-get install -y firefox-esr p11-kit-modules
rm -f $HOME/Desktop/firefox.desktop
cp \
/usr/share/applications/firefox-esr.desktop \
$HOME/Desktop/
chmod +x $HOME/Desktop/firefox-esr.desktop
fi
else
# Remaining Debian-family images: drop any held firefox, then reinstall.
apt-mark unhold firefox || :
apt-get remove firefox
apt-get update
apt-get install -y firefox p11-kit-modules
fi
# Add Langpacks
# Resolve the current release number from the download redirect, then pull
# every language-pack XPI for that release into the global extensions dir.
FIREFOX_VERSION=$(curl -sI https://download.mozilla.org/?product=firefox-latest | awk -F '(releases/|/win32)' '/Location/ {print $2}')
RELEASE_URL="https://releases.mozilla.org/pub/firefox/releases/${FIREFOX_VERSION}/win64/xpi/"
LANGS=$(curl -Ls ${RELEASE_URL} | awk -F '(xpi">|</a>)' '/href.*xpi/ {print $2}' | tr '\n' ' ')
EXTENSION_DIR=/usr/lib/firefox-addons/distribution/extensions/
mkdir -p ${EXTENSION_DIR}
# NOTE(review): the loop variable shadows the LANG locale variable for the
# duration of the loop — confirm nothing locale-sensitive runs inside it.
for LANG in ${LANGS}; do
LANGCODE=$(echo ${LANG} | sed 's/\.xpi//g')
echo "Downloading ${LANG} Language pack"
curl -o \
${EXTENSION_DIR}langpack-${LANGCODE}@firefox.mozilla.org.xpi -Ls \
${RELEASE_URL}${LANG}
done
# Cleanup and install flash if supported
if [[ "${DISTRO}" == @(oracle8|rockylinux9|rockylinux8|oracle9|rhel9|almalinux9|almalinux8|fedora39|fedora40) ]]; then
if [ -z ${SKIP_CLEAN+x} ]; then
dnf clean all
fi
elif [ "${DISTRO}" == "opensuse" ]; then
if [ -z ${SKIP_CLEAN+x} ]; then
zypper clean --all
fi
else
if [ "$ARCH" == "arm64" ] && [ "$(lsb_release -cs)" == "focal" ] ; then
echo "Firefox flash player not supported on arm64 Ubuntu Focal Skipping"
elif grep -q "ID=debian" /etc/os-release || grep -q "ID=kali" /etc/os-release || grep -q "ID=parrot" /etc/os-release; then
echo "Firefox flash player not supported on Debian"
elif grep -q Focal /etc/os-release; then
# Plugin to support running flash videos for sites like vimeo
apt-get update
apt-get install -y browser-plugin-freshplayer-pepperflash
apt-mark hold firefox
if [ -z ${SKIP_CLEAN+x} ]; then
apt-get autoclean
rm -rf \
/var/lib/apt/lists/* \
/var/tmp/*
fi
fi
fi
if [[ "${DISTRO}" != @(oracle8|rockylinux9|rockylinux8|oracle9|rhel9|almalinux9|almalinux8|opensuse|fedora39|fedora40) ]]; then
# Update firefox to utilize the system certificate store instead of the one that ships with firefox
# (libnssckbi.so is replaced by a hard link to p11-kit-trust.so).
if grep -q "ID=debian" /etc/os-release || grep -q "ID=kali" /etc/os-release || grep -q "ID=parrot" /etc/os-release && [ "${ARCH}" == "arm64" ]; then
rm -f /usr/lib/firefox-esr/libnssckbi.so
ln /usr/lib/$(arch)-linux-gnu/pkcs11/p11-kit-trust.so /usr/lib/firefox-esr/libnssckbi.so
elif grep -q "ID=kali" /etc/os-release && [ "${ARCH}" == "amd64" ]; then
rm -f /usr/lib/firefox-esr/libnssckbi.so
ln /usr/lib/$(arch)-linux-gnu/pkcs11/p11-kit-trust.so /usr/lib/firefox-esr/libnssckbi.so
else
rm -f /usr/lib/firefox/libnssckbi.so
ln /usr/lib/$(arch)-linux-gnu/pkcs11/p11-kit-trust.so /usr/lib/firefox/libnssckbi.so
fi
fi
# --- Locate the distro-specific default-preferences file ---
if [[ "${DISTRO}" == @(oracle8|rockylinux9|rockylinux8|oracle9|rhel9|almalinux9|almalinux8|fedora39|fedora40) ]]; then
if [[ "${DISTRO}" == @(fedora39|fedora40) ]]; then
preferences_file=/usr/lib64/firefox/browser/defaults/preferences/firefox-redhat-default-prefs.js
else
preferences_file=/usr/lib64/firefox/browser/defaults/preferences/all-redhat.js
fi
# Drop the vendor homepage override on RPM-based distros.
sed -i -e '/homepage/d' "$preferences_file"
elif [ "${DISTRO}" == "opensuse" ]; then
preferences_file=/usr/lib64/firefox/browser/defaults/preferences/firefox.js
elif grep -q "ID=kali" /etc/os-release; then
preferences_file=/usr/lib/firefox-esr/defaults/pref/firefox.js
elif grep -q "ID=debian" /etc/os-release || grep -q "ID=parrot" /etc/os-release; then
if [ "${ARCH}" == "amd64" ]; then
preferences_file=/usr/lib/firefox/defaults/pref/firefox.js
else
preferences_file=/usr/lib/firefox-esr/defaults/pref/firefox.js
fi
else
preferences_file=/usr/lib/firefox/browser/defaults/preferences/firefox.js
fi
# Disabling default first run URL for Debian based images
if [[ "${DISTRO}" != @(oracle8|rockylinux9|rockylinux8|oracle9|rhel9|almalinux9|almalinux8|opensuse|fedora39|fedora40) ]]; then
cat >"$preferences_file" <<EOF
pref("datareporting.policy.firstRunURL", "");
pref("datareporting.policy.dataSubmissionEnabled", false);
pref("datareporting.healthreport.service.enabled", false);
pref("datareporting.healthreport.uploadEnabled", false);
pref("trailhead.firstrun.branches", "nofirstrun-empty");
pref("browser.aboutwelcome.enabled", false);
EOF
fi
if [[ "${DISTRO}" == @(oracle8|rockylinux9|rockylinux8|oracle9|rhel9|almalinux9|almalinux8|opensuse|fedora39|fedora40) ]]; then
# Creating a default profile
chown -R root:root $HOME
firefox -headless -CreateProfile "kasm $HOME/.mozilla/firefox/kasm"
# Generate a certdb to be detected on squid start
HOME=/root firefox --headless &
mkdir -p /root/.mozilla
CERTDB=$(find /root/.mozilla* -name "cert9.db")
# Poll until the background Firefox has created its certificate database.
while [ -z "${CERTDB}" ] ; do
sleep 1
echo "waiting for certdb"
CERTDB=$(find /root/.mozilla* -name "cert9.db")
done
sleep 2
kill $(pgrep firefox)
CERTDIR=$(dirname ${CERTDB})
# Move the generated cert database into the kasm profile, then drop root's.
mv ${CERTDB} $HOME/.mozilla/firefox/kasm/
rm -Rf /root/.mozilla
else
# Creating Default Profile
chown -R 0:0 $HOME
firefox -headless -CreateProfile "kasm $HOME/.mozilla/firefox/kasm"
fi
# Silence Firefox security nag "Some of Firefox's features may offer less protection on your current operating system".
echo 'user_pref("security.sandbox.warn_unprivileged_namespaces", false);' > $HOME/.mozilla/firefox/kasm/user.js
chown 1000:1000 $HOME/.mozilla/firefox/kasm/user.js
if [[ "${DISTRO}" == @(oracle8|rockylinux9|rockylinux8|oracle9|rhel9|almalinux9|almalinux8|opensuse|fedora39|fedora40) ]]; then
set_desktop_icon
fi
# Starting with version 67, Firefox creates a unique profile mapping per installation which is hash generated
# based off the installation path. Because that path will be static for our deployments we can assume the hash
# and thus assign our profile to the default for the installation
if grep -q "ID=kali" /etc/os-release; then
cat >>$HOME/.mozilla/firefox/profiles.ini <<EOL
[Install3B6073811A6ABF12]
Default=kasm
Locked=1
EOL
elif grep -q "ID=debian" /etc/os-release || grep -q "ID=parrot" /etc/os-release; then
if [ "${ARCH}" != "amd64" ]; then
cat >>$HOME/.mozilla/firefox/profiles.ini <<EOL
[Install3B6073811A6ABF12]
Default=kasm
Locked=1
EOL
else
cat >>$HOME/.mozilla/firefox/profiles.ini <<EOL
[Install4F96D1932A9F858E]
Default=kasm
Locked=1
EOL
fi
elif [[ "${DISTRO}" != @(oracle8|rockylinux9|rockylinux8|oracle9|rhel9|almalinux9|almalinux8|opensuse|fedora39|fedora40) ]]; then
cat >>$HOME/.mozilla/firefox/profiles.ini <<EOL
[Install4F96D1932A9F858E]
Default=kasm
Locked=1
EOL
elif [[ "${DISTRO}" == @(oracle8|rockylinux9|rockylinux8|oracle9|rhel9|almalinux9|almalinux8|opensuse|fedora39|fedora40) ]]; then
cat >>$HOME/.mozilla/firefox/profiles.ini <<EOL
[Install11457493C5A56847]
Default=kasm
Locked=1
EOL
fi
# Desktop Icon Fixes
if [[ "${DISTRO}" == @(rockylinux9|oracle9|rhel9|almalinux9|fedora39|fedora40) ]]; then
sed -i 's#Icon=/usr/lib/firefox#Icon=/usr/lib64/firefox#g' $HOME/Desktop/firefox.desktop
fi
# Cleanup for app layer
chown -R 1000:0 $HOME
find /usr/share/ -name "icon-theme.cache" -exec rm -f {} \;
if [ -f $HOME/Desktop/firefox.desktop ]; then
chmod +x $HOME/Desktop/firefox.desktop
fi
chown -R 1000:1000 $HOME/.mozilla
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/proxy/handlers.py:
--------------------------------------------------------------------------------
```python
"""
Request handlers for the proxy endpoints.
"""
import asyncio
import json
import logging
import os
from contextlib import contextmanager
from typing import Any, Dict, List, Optional, Union
from computer import Computer
from ..agent import ComputerAgent
logger = logging.getLogger(__name__)
class ResponsesHandler:
"""Handler for /responses endpoint that processes agent requests."""
def __init__(self):
self.computer = None
self.agent = None
# Simple in-memory caches
self._computer_cache: Dict[str, Any] = {}
self._agent_cache: Dict[str, Any] = {}
async def setup_computer_agent(
    self,
    model: str,
    agent_kwargs: Optional[Dict[str, Any]] = None,
    computer_kwargs: Optional[Dict[str, Any]] = None,
):
    """Set up (and cache) computer and agent instances.

    Caching keys:
    - Computer cache key: computer_kwargs
    - Agent cache key: {"model": model, **agent_kwargs}

    Args:
        model: Model identifier passed through to ComputerAgent.
        agent_kwargs: Extra ComputerAgent constructor kwargs. If it contains
            a truthy "tools" entry, computer setup is skipped entirely.
        computer_kwargs: Extra Computer constructor kwargs.

    Note:
        The caller's dicts are copied before use. The previous version
        mutated agent_kwargs via setdefault("tools", ...), leaking a
        non-serializable Computer object back into the request payload.
    """
    # Copy so caller-supplied dicts are never mutated (bug fix).
    agent_kwargs = dict(agent_kwargs or {})
    computer_kwargs = dict(computer_kwargs or {})

    def _stable_key(obj: Dict[str, Any]) -> str:
        """Deterministic JSON cache key; stringifies unserializable values."""
        try:
            return json.dumps(obj, sort_keys=True, separators=(",", ":"))
        except Exception:
            # Fallback: stringify non-serializable values
            safe_obj = {}
            for k, v in obj.items():
                try:
                    json.dumps(v)
                    safe_obj[k] = v
                except Exception:
                    safe_obj[k] = str(v)
            return json.dumps(safe_obj, sort_keys=True, separators=(",", ":"))

    # Determine if custom tools are supplied; if so, skip computer setup entirely
    has_custom_tools = bool(agent_kwargs.get("tools"))
    computer = None
    if not has_custom_tools:
        # ---------- Computer setup (with cache) ----------
        comp_key = _stable_key(computer_kwargs)
        computer = self._computer_cache.get(comp_key)
        if computer is None:
            # Default computer configuration
            default_c_config = {
                "os_type": "linux",
                "provider_type": "cloud",
                "name": os.getenv("CUA_CONTAINER_NAME"),
                "api_key": os.getenv("CUA_API_KEY"),
            }
            default_c_config.update(computer_kwargs)
            computer = Computer(**default_c_config)
            await computer.__aenter__()
            self._computer_cache[comp_key] = computer
            logger.info(
                f"Computer created and cached with key={comp_key} config={default_c_config}"
            )
        else:
            logger.info(f"Reusing cached computer for key={comp_key}")
    # Bind current computer reference (None if custom tools supplied)
    self.computer = computer

    # ---------- Agent setup (with cache) ----------
    # Key derives from {model} + caller-provided kwargs only, before any
    # default tools are injected below.
    agent_key = _stable_key({"model": model, **agent_kwargs})
    agent = self._agent_cache.get(agent_key)
    if agent is None:
        # Default agent configuration
        default_a_config: Dict[str, Any] = {"model": model}
        if not has_custom_tools:
            default_a_config["tools"] = [computer]
        # Apply user overrides on top of defaults; defaults already carry
        # tools=[computer] when no custom tools were supplied, so no
        # setdefault into the caller's dict is needed.
        default_a_config.update(agent_kwargs)
        # JSON-derived kwargs may have loose types; ignore static arg typing here
        agent = ComputerAgent(**default_a_config)  # type: ignore[arg-type]
        self._agent_cache[agent_key] = agent
        logger.info(f"Agent created and cached with key={agent_key} model={model}")
    else:
        # Ensure cached agent uses the current computer tool (in case object differs)
        # Only update if tools not explicitly provided in agent_kwargs
        if not has_custom_tools:
            try:
                agent.tools = [computer]
            except Exception:
                pass
        logger.info(f"Reusing cached agent for key={agent_key}")
    # Bind current agent reference
    self.agent = agent
async def process_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
    """Handle one /responses request end to end.

    Args:
        request_data: Parsed request body containing "model", "input" and
            the optional "agent_kwargs" / "computer_kwargs" / "env" keys.

    Returns:
        {"success": True, "result": ..., "model": ...} for the first result
        the agent yields, or {"success": False, "error": ..., "model": ...}
        on validation failure, agent failure, or an empty run.
    """
    try:
        model = request_data.get("model")
        input_data = request_data.get("input")
        if not model:
            raise ValueError("Model is required")
        if not input_data:
            raise ValueError("Input is required")

        agent_kwargs = request_data.get("agent_kwargs", {})
        computer_kwargs = request_data.get("computer_kwargs", {})
        env_overrides = request_data.get("env", {}) or {}

        # Env var overrides apply only while this request is being served.
        with self._env_overrides(env_overrides):
            # Set up (and possibly reuse) computer and agent via caches.
            await self.setup_computer_agent(model, agent_kwargs, computer_kwargs)
            agent = self.agent
            if agent is None:
                raise RuntimeError("Agent failed to initialize")

            messages = self._convert_input_to_messages(input_data)
            # Only the first yielded result is returned to the caller.
            async for result in agent.run(messages):
                return {"success": True, "result": result, "model": model}
            return {"success": False, "error": "No results from agent", "model": model}
    except Exception as e:
        logger.error(f"Error processing request: {e}")
        return {
            "success": False,
            "error": str(e),
            "model": request_data.get("model", "unknown"),
        }
def _convert_input_to_messages(
self, input_data: Union[str, List[Dict[str, Any]]]
) -> List[Dict[str, Any]]:
"""Convert input data to messages format."""
if isinstance(input_data, str):
# Simple string input
return [{"role": "user", "content": input_data}]
elif isinstance(input_data, list):
# Already in messages format
messages = []
for msg in input_data:
# Convert content array format if needed
if isinstance(msg.get("content"), list):
content_parts = []
for part in msg["content"]:
if part.get("type") == "input_text":
content_parts.append({"type": "text", "text": part["text"]})
elif part.get("type") == "input_image":
content_parts.append(
{"type": "image_url", "image_url": {"url": part["image_url"]}}
)
else:
content_parts.append(part)
messages.append({"role": msg["role"], "content": content_parts})
else:
messages.append(msg)
return messages
else:
raise ValueError("Input must be string or list of messages")
async def cleanup(self):
    """Release the computer connection and drop cached references."""
    computer = self.computer
    if computer:
        try:
            # Exit the computer's async context with no active exception.
            await computer.__aexit__(None, None, None)
        except Exception as e:
            logger.error(f"Error cleaning up computer: {e}")
        finally:
            self.computer = None
    self.agent = None
@staticmethod
@contextmanager
def _env_overrides(env: Dict[str, str]):
"""Temporarily apply environment variable overrides for the current process.
Restores previous values after the context exits.
Args:
env: Mapping of env var names to override for this request.
"""
if not env:
# No-op context
yield
return
original: Dict[str, Optional[str]] = {}
try:
for k, v in env.items():
original[k] = os.environ.get(k)
os.environ[k] = str(v)
yield
finally:
for k, old in original.items():
if old is None:
# Was not set before
os.environ.pop(k, None)
else:
os.environ[k] = old
```
--------------------------------------------------------------------------------
/libs/python/computer-server/test_connection.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""
Connection test script for Computer Server.
This script tests both WebSocket (/ws) and REST (/cmd) connections to the Computer Server
and keeps it alive, allowing you to verify the server is running correctly.
"""
import argparse
import asyncio
import json
import os
import sys
import aiohttp
import dotenv
import websockets
dotenv.load_dotenv()
async def test_websocket_connection(
    host="localhost", port=8000, keep_alive=False, container_name=None, api_key=None
):
    """Test WebSocket connection to the Computer Server."""
    if container_name:
        # Cloud containers are reached over WSS on port 8443.
        uri = f"wss://{container_name}.containers.cloud.trycua.com:8443/ws"
        print(f"Connecting to container {container_name} at {uri}...")
    else:
        uri = f"ws://{host}:{port}/ws"
        print(f"Connecting to local server at {uri}...")

    try:
        async with websockets.connect(uri) as websocket:
            print("WebSocket connection established!")

            async def send_command(command, params):
                """Send one JSON command frame and return the raw response."""
                await websocket.send(json.dumps({"command": command, "params": params}))
                return await websocket.recv()

            # Container connections must authenticate before any command.
            if container_name:
                if not api_key:
                    print("Error: API key required for container connections")
                    return False
                print("Sending authentication...")
                auth_response = await send_command(
                    "authenticate",
                    {"api_key": api_key, "container_name": container_name},
                )
                print(f"Authentication response: {auth_response}")
                if not json.loads(auth_response).get("success", False):
                    print("Authentication failed!")
                    return False
                print("Authentication successful!")

            response = await send_command("version", {})
            print(f"Version response: {response}")

            response = await send_command("get_screen_size", {})
            print(f"Screen size response: {response}")

            if keep_alive:
                print("\nKeeping WebSocket connection alive. Press Ctrl+C to exit...")
                while True:
                    # Poll every 5 seconds so the connection stays open.
                    await asyncio.sleep(5)
                    response = await send_command("get_cursor_position", {})
                    print(f"Cursor position: {response}")
    except websockets.exceptions.ConnectionClosed as e:
        print(f"WebSocket connection closed: {e}")
        return False
    except ConnectionRefusedError:
        print(f"Connection refused. Is the server running at {host}:{port}?")
        return False
    except Exception as e:
        print(f"WebSocket error: {e}")
        return False
    return True
async def test_rest_connection(
    host="localhost", port=8000, keep_alive=False, container_name=None, api_key=None
):
    """Test REST connection to the Computer Server."""
    if container_name:
        # Cloud containers are reached over HTTPS on port 8443.
        base_url = f"https://{container_name}.containers.cloud.trycua.com:8443"
        print(f"Connecting to container {container_name} at {base_url}...")
    else:
        base_url = f"http://{host}:{port}"
        print(f"Connecting to local server at {base_url}...")

    try:
        async with aiohttp.ClientSession() as session:
            print("REST connection established!")

            # Container connections carry identity via request headers.
            headers = {}
            if container_name:
                if not api_key:
                    print("Error: API key required for container connections")
                    return False
                headers["X-Container-Name"] = container_name
                headers["X-API-Key"] = api_key
                print("Using container authentication headers")

            async def post_command(command):
                """POST one command to /cmd; return (status, body text)."""
                async with session.post(
                    f"{base_url}/cmd",
                    json={"command": command, "params": {}},
                    headers=headers,
                ) as response:
                    return response.status, await response.text()

            status, text = await post_command("screenshot")
            if status != 200:
                print(f"Screenshot request failed with status: {status}")
                print(text)
                return False
            print(f"Screenshot response: {text}")

            status, text = await post_command("get_screen_size")
            if status != 200:
                print(f"Screen size request failed with status: {status}")
                print(text)
                return False
            print(f"Screen size response: {text}")

            if keep_alive:
                print("\nKeeping REST connection alive. Press Ctrl+C to exit...")
                while True:
                    # Re-poll every 5 seconds to keep exercising the server.
                    await asyncio.sleep(5)
                    status, text = await post_command("get_cursor_position")
                    if status != 200:
                        print(f"Cursor position request failed with status: {status}")
                        print(text)
                        return False
                    print(f"Cursor position: {text}")
    except aiohttp.ClientError as e:
        print(f"REST connection error: {e}")
        return False
    except Exception as e:
        print(f"REST error: {e}")
        return False
    return True
async def test_connection(
    host="localhost", port=8000, keep_alive=False, container_name=None, use_rest=False, api_key=None
):
    """Test connection to the Computer Server using WebSocket or REST."""
    tester = test_rest_connection if use_rest else test_websocket_connection
    return await tester(host, port, keep_alive, container_name, api_key)
def parse_args():
    """Build the CLI parser and parse this script's command-line options."""
    arg_parser = argparse.ArgumentParser(description="Test connection to Computer Server")
    add = arg_parser.add_argument
    add("--host", default="localhost", help="Host address (default: localhost)")
    add("-p", "--port", type=int, default=8000, help="Port number (default: 8000)")
    add(
        "-c",
        "--container-name",
        help="Container name for cloud connection (uses WSS/HTTPS and port 8443)",
    )
    add(
        "--api-key",
        help="API key for container authentication (can also use CUA_API_KEY env var)",
    )
    add("--keep-alive", action="store_true", help="Keep connection alive")
    add(
        "--rest",
        action="store_true",
        help="Use REST endpoint (/cmd) instead of WebSocket (/ws)",
    )
    return arg_parser.parse_args()
async def main():
    """Entry point: validate arguments, then run the selected connection test."""
    args = parse_args()
    container_name = getattr(args, "container_name", None)
    # CLI flag wins; fall back to the CUA_API_KEY environment variable.
    api_key = getattr(args, "api_key", None) or os.environ.get("CUA_API_KEY")

    # A container connection is unusable without credentials — bail early.
    if container_name and not api_key:
        print("Warning: Container name provided but no API key found.")
        print("Please provide --api-key argument or set CUA_API_KEY environment variable.")
        return 1

    print(f"Testing {'REST' if args.rest else 'WebSocket'} connection...")
    if container_name:
        print(f"Container: {container_name}")
        # Only the key's last 4 characters are echoed, never the full key.
        print(
            f"API Key: {'***' + api_key[-4:] if api_key and len(api_key) > 4 else 'Not provided'}"
        )

    ok = await test_connection(
        host=args.host,
        port=args.port,
        keep_alive=args.keep_alive,
        container_name=container_name,
        use_rest=args.rest,
        api_key=api_key,
    )
    return 0 if ok else 1
if __name__ == "__main__":
    try:
        # asyncio.run drives main(); its int result becomes the process exit code.
        sys.exit(asyncio.run(main()))
    except KeyboardInterrupt:
        # Ctrl+C (e.g. during --keep-alive polling) exits cleanly with status 0.
        print("\nExiting...")
        sys.exit(0)
```
--------------------------------------------------------------------------------
/libs/python/core/tests/test_telemetry.py:
--------------------------------------------------------------------------------
```python
"""Unit tests for core telemetry functionality.
This file tests ONLY telemetry logic, following SRP.
All external dependencies (PostHog, file system) are mocked.
"""
import os
from pathlib import Path
from unittest.mock import MagicMock, Mock, mock_open, patch
import pytest
class TestTelemetryEnabled:
    """Test telemetry enable/disable logic (SRP: Only tests enable/disable)."""

    def test_telemetry_enabled_by_default(self, monkeypatch):
        """With no telemetry env vars set, telemetry defaults to enabled."""
        for var in ("CUA_TELEMETRY", "CUA_TELEMETRY_ENABLED"):
            monkeypatch.delenv(var, raising=False)

        from core.telemetry import is_telemetry_enabled

        assert is_telemetry_enabled() is True

    def test_telemetry_disabled_with_flag(self, monkeypatch):
        """Setting CUA_TELEMETRY_ENABLED=false turns telemetry off."""
        monkeypatch.setenv("CUA_TELEMETRY_ENABLED", "false")

        from core.telemetry import is_telemetry_enabled

        assert is_telemetry_enabled() is False

    @pytest.mark.parametrize("value", ["0", "false", "no", "off"])
    def test_telemetry_disabled_with_various_values(self, monkeypatch, value):
        """Every recognized falsy value disables telemetry."""
        monkeypatch.setenv("CUA_TELEMETRY_ENABLED", value)

        from core.telemetry import is_telemetry_enabled

        assert is_telemetry_enabled() is False

    @pytest.mark.parametrize("value", ["1", "true", "yes", "on"])
    def test_telemetry_enabled_with_various_values(self, monkeypatch, value):
        """Every recognized truthy value keeps telemetry on."""
        monkeypatch.setenv("CUA_TELEMETRY_ENABLED", value)

        from core.telemetry import is_telemetry_enabled

        assert is_telemetry_enabled() is True
class TestPostHogTelemetryClient:
    """Test PostHogTelemetryClient class (SRP: Only tests client logic).

    The original tests repeated the same mock-filesystem wiring six times;
    that boilerplate now lives in two static helpers so each test states
    only its intent.
    """

    @staticmethod
    def _mock_missing_storage(mock_path):
        """Wire the patched Path so the derived storage dir reports not-existing."""
        mock_storage_dir = MagicMock()
        mock_storage_dir.exists.return_value = False
        mock_path.return_value.parent.parent = MagicMock()
        mock_path.return_value.parent.parent.__truediv__.return_value = mock_storage_dir

    @staticmethod
    def _mock_id_file(mock_path, exists, content=None):
        """Wire the patched Path so storage_dir / <id file> yields a mock file.

        Args:
            mock_path: The patched Path class.
            exists: Whether the mock id file reports that it exists.
            content: Optional text returned by the file's read_text().

        Returns:
            The mock id-file object.
        """
        mock_id_file = MagicMock()
        mock_id_file.exists.return_value = exists
        if content is not None:
            mock_id_file.read_text.return_value = content
        mock_storage_dir = MagicMock()
        mock_storage_dir.__truediv__.return_value = mock_id_file
        mock_core_dir = MagicMock()
        mock_core_dir.__truediv__.return_value = mock_storage_dir
        mock_path.return_value.parent.parent = mock_core_dir
        return mock_id_file

    @patch("core.telemetry.posthog.posthog")
    @patch("core.telemetry.posthog.Path")
    def test_client_initialization(self, mock_path, mock_posthog, disable_telemetry):
        """Test that client initializes correctly."""
        from core.telemetry.posthog import PostHogTelemetryClient

        self._mock_missing_storage(mock_path)

        # Reset singleton so construction runs from scratch.
        PostHogTelemetryClient.destroy_client()
        client = PostHogTelemetryClient()

        assert client is not None
        assert hasattr(client, "installation_id")
        assert hasattr(client, "initialized")
        assert hasattr(client, "queued_events")

    @patch("core.telemetry.posthog.posthog")
    @patch("core.telemetry.posthog.Path")
    def test_installation_id_generation(self, mock_path, mock_posthog, disable_telemetry):
        """Test that installation ID is generated if not exists."""
        from core.telemetry.posthog import PostHogTelemetryClient

        self._mock_id_file(mock_path, exists=False)

        PostHogTelemetryClient.destroy_client()
        client = PostHogTelemetryClient()

        # A fresh UUID string should have been generated.
        assert client.installation_id is not None
        assert len(client.installation_id) == 36  # UUID format

    @patch("core.telemetry.posthog.posthog")
    @patch("core.telemetry.posthog.Path")
    def test_installation_id_persistence(self, mock_path, mock_posthog, disable_telemetry):
        """Test that installation ID is read from file if exists."""
        from core.telemetry.posthog import PostHogTelemetryClient

        existing_id = "test-installation-id-123"
        self._mock_id_file(mock_path, exists=True, content=existing_id)

        PostHogTelemetryClient.destroy_client()
        client = PostHogTelemetryClient()

        assert client.installation_id == existing_id

    @patch("core.telemetry.posthog.posthog")
    @patch("core.telemetry.posthog.Path")
    def test_record_event_when_disabled(self, mock_path, mock_posthog, monkeypatch):
        """Test that events are not recorded when telemetry is disabled."""
        from core.telemetry.posthog import PostHogTelemetryClient

        # Disable telemetry explicitly using the correct environment variable.
        monkeypatch.setenv("CUA_TELEMETRY_ENABLED", "false")
        self._mock_missing_storage(mock_path)

        PostHogTelemetryClient.destroy_client()
        client = PostHogTelemetryClient()
        client.record_event("test_event", {"key": "value"})

        # PostHog capture should not be called at all when telemetry is disabled.
        mock_posthog.capture.assert_not_called()

    @patch("core.telemetry.posthog.posthog")
    @patch("core.telemetry.posthog.Path")
    def test_record_event_when_enabled(self, mock_path, mock_posthog, monkeypatch):
        """Test that events are recorded when telemetry is enabled."""
        from core.telemetry.posthog import PostHogTelemetryClient

        monkeypatch.setenv("CUA_TELEMETRY_ENABLED", "true")
        self._mock_missing_storage(mock_path)

        PostHogTelemetryClient.destroy_client()
        client = PostHogTelemetryClient()
        client.initialized = True  # Pretend it's initialized

        client.record_event("test_event", {"key": "value"})

        # PostHog capture should be called at least once.
        assert mock_posthog.capture.call_count >= 1

    @patch("core.telemetry.posthog.posthog")
    @patch("core.telemetry.posthog.Path")
    def test_singleton_pattern(self, mock_path, mock_posthog, disable_telemetry):
        """Test that get_client returns the same instance."""
        from core.telemetry.posthog import PostHogTelemetryClient

        self._mock_missing_storage(mock_path)

        PostHogTelemetryClient.destroy_client()
        client1 = PostHogTelemetryClient.get_client()
        client2 = PostHogTelemetryClient.get_client()

        assert client1 is client2
class TestRecordEvent:
    """Test the public record_event function (SRP: Only tests public API)."""

    @patch("core.telemetry.posthog.PostHogTelemetryClient")
    def test_record_event_calls_client(self, mock_client_class, disable_telemetry):
        """record_event forwards the name and properties to the singleton client."""
        from core.telemetry import record_event

        client = Mock()
        mock_client_class.get_client.return_value = client

        record_event("test_event", {"key": "value"})

        client.record_event.assert_called_once_with("test_event", {"key": "value"})

    @patch("core.telemetry.posthog.PostHogTelemetryClient")
    def test_record_event_without_properties(self, mock_client_class, disable_telemetry):
        """record_event defaults missing properties to an empty dict."""
        from core.telemetry import record_event

        client = Mock()
        mock_client_class.get_client.return_value = client

        record_event("test_event")

        client.record_event.assert_called_once_with("test_event", {})
class TestDestroyTelemetryClient:
    """Test client destruction (SRP: Only tests cleanup)."""

    @patch("core.telemetry.posthog.PostHogTelemetryClient")
    def test_destroy_client_calls_class_method(self, mock_client_class):
        """Test that destroy_telemetry_client delegates correctly."""
        from core.telemetry import destroy_telemetry_client

        destroy_telemetry_client()

        # The module-level helper must call the class-level destroy_client().
        mock_client_class.destroy_client.assert_called_once()
```
--------------------------------------------------------------------------------
/tests/test_mcp_server_streaming.py:
--------------------------------------------------------------------------------
```python
import asyncio
import importlib.util
import sys
import types
from pathlib import Path
import pytest
def _install_stub_module(
name: str, module: types.ModuleType, registry: dict[str, types.ModuleType | None]
) -> None:
registry[name] = sys.modules.get(name)
sys.modules[name] = module
@pytest.fixture
def server_module():
    """Import mcp_server.server with its heavy dependencies stubbed out.

    Installs stub modules for ``mcp``, ``computer`` and ``agent`` into
    sys.modules, loads the server module directly from its file path,
    re-exports any registered FastMCP tools as module attributes, and
    restores sys.modules on teardown.
    """
    stubbed_modules: dict[str, types.ModuleType | None] = {}

    # Stub MCP Context primitives
    mcp_module = types.ModuleType("mcp")
    mcp_module.__path__ = []  # mark as package
    mcp_server_module = types.ModuleType("mcp.server")
    mcp_server_module.__path__ = []
    fastmcp_module = types.ModuleType("mcp.server.fastmcp")

    class _StubContext:
        # No-op callbacks matching the FastMCP Context surface used by the server.
        async def yield_message(self, *args, **kwargs):
            return None

        async def yield_tool_call(self, *args, **kwargs):
            return None

        async def yield_tool_output(self, *args, **kwargs):
            return None

        def report_progress(self, *_args, **_kwargs):
            return None

        def info(self, *_args, **_kwargs):
            return None

        def error(self, *_args, **_kwargs):
            return None

    class _StubImage:
        # Mirrors mcp.server.fastmcp.Image: just stores format and raw bytes.
        def __init__(self, format: str, data: bytes):
            self.format = format
            self.data = data

    class _StubFastMCP:
        # Collects functions registered via @server.tool() so the fixture
        # can re-export them after import.
        def __init__(self, name: str):
            self.name = name
            self._tools: dict[str, types.FunctionType] = {}

        def tool(self, *args, **kwargs):
            def decorator(func):
                self._tools[func.__name__] = func
                return func

            return decorator

        def run(self):
            return None

    fastmcp_module.Context = _StubContext
    fastmcp_module.FastMCP = _StubFastMCP
    fastmcp_module.Image = _StubImage

    _install_stub_module("mcp", mcp_module, stubbed_modules)
    _install_stub_module("mcp.server", mcp_server_module, stubbed_modules)
    _install_stub_module("mcp.server.fastmcp", fastmcp_module, stubbed_modules)

    # Stub Computer module to avoid heavy dependencies
    computer_module = types.ModuleType("computer")

    class _StubInterface:
        async def screenshot(self) -> bytes:  # pragma: no cover - default stub
            return b""

    class _StubComputer:
        def __init__(self, *args, **kwargs):
            self.interface = _StubInterface()

        async def run(self):  # pragma: no cover - default stub
            return None

    class _StubVMProviderType:
        CLOUD = "cloud"
        LOCAL = "local"

    computer_module.Computer = _StubComputer
    computer_module.VMProviderType = _StubVMProviderType
    _install_stub_module("computer", computer_module, stubbed_modules)

    # Stub agent module so server can import ComputerAgent
    agent_module = types.ModuleType("agent")

    class _StubComputerAgent:
        def __init__(self, *args, **kwargs):
            pass

        async def run(self, *_args, **_kwargs):  # pragma: no cover - default stub
            # Async generator that yields nothing by default.
            if False:  # pragma: no cover
                yield {}
            return

    agent_module.ComputerAgent = _StubComputerAgent
    _install_stub_module("agent", agent_module, stubbed_modules)

    # Load the server module under a throwaway name so the real package
    # (if installed) is never shadowed.
    module_name = "mcp_server_server_under_test"
    module_path = Path("libs/python/mcp-server/mcp_server/server.py").resolve()
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    server_module = importlib.util.module_from_spec(spec)
    assert spec and spec.loader
    spec.loader.exec_module(server_module)

    # Re-export registered tools as plain module attributes for the tests.
    server_instance = getattr(server_module, "server", None)
    if server_instance is not None and hasattr(server_instance, "_tools"):
        for name, func in server_instance._tools.items():
            setattr(server_module, name, func)

    try:
        yield server_module
    finally:
        # Drop the module under test and restore any modules we shadowed.
        sys.modules.pop(module_name, None)
        for name, original in stubbed_modules.items():
            if original is None:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = original
class FakeContext:
def __init__(self) -> None:
self.events: list[tuple] = []
self.progress_updates: list[float] = []
def info(self, message: str) -> None:
self.events.append(("info", message))
def error(self, message: str) -> None:
self.events.append(("error", message))
def report_progress(self, value: float) -> None:
self.progress_updates.append(value)
async def yield_message(self, *, role: str, content):
timestamp = asyncio.get_running_loop().time()
self.events.append(("message", role, content, timestamp))
async def yield_tool_call(self, *, name: str | None, call_id: str, input):
timestamp = asyncio.get_running_loop().time()
self.events.append(("tool_call", name, call_id, input, timestamp))
async def yield_tool_output(self, *, call_id: str, output, is_error: bool = False):
timestamp = asyncio.get_running_loop().time()
self.events.append(("tool_output", call_id, output, is_error, timestamp))
def test_run_cua_task_streams_partial_results(server_module):
    """run_cua_task should stream partial events before the task finishes.

    Drives the tool with a scripted FakeAgent that first yields a text
    chunk, then (after a delay) a tool call/output pair, and asserts that
    message events are already visible while the task is still running.
    """

    async def _run_test():
        class FakeAgent:
            # Each entry is (factory(messages) -> result dict, post-yield delay).
            script = []

            def __init__(self, *args, **kwargs):
                pass

            async def run(self, messages):  # type: ignore[override]
                for factory, delay in type(self).script:
                    yield factory(messages)
                    if delay:
                        await asyncio.sleep(delay)

        FakeAgent.script = [
            (
                lambda _messages: {
                    "output": [
                        {
                            "type": "message",
                            "role": "assistant",
                            "content": [{"type": "output_text", "text": "First chunk"}],
                        }
                    ]
                },
                0.0,
            ),
            (
                lambda _messages: {
                    "output": [
                        {
                            "type": "tool_use",
                            "id": "call_1",
                            "name": "computer",
                            "input": {"action": "click"},
                        },
                        {
                            "type": "computer_call_output",
                            "call_id": "call_1",
                            "output": [{"type": "text", "text": "Tool completed"}],
                        },
                    ]
                },
                0.05,
            ),
        ]

        class FakeInterface:
            def __init__(self) -> None:
                self.calls = 0

            async def screenshot(self) -> bytes:
                self.calls += 1
                return b"final-image"

        fake_interface = FakeInterface()
        # Swap in fakes for the module-level computer and agent class.
        server_module.global_computer = types.SimpleNamespace(interface=fake_interface)
        server_module.ComputerAgent = FakeAgent  # type: ignore[assignment]

        ctx = FakeContext()
        task = asyncio.create_task(server_module.run_cua_task(ctx, "open settings"))

        # Give the task a moment to start streaming without letting it finish.
        await asyncio.sleep(0.01)
        assert not task.done(), "Task should still be running to simulate long operation"
        message_events = [event for event in ctx.events if event[0] == "message"]
        assert message_events, "Expected message event before task completion"

        text_result, image = await task
        assert "First chunk" in text_result
        assert "Tool completed" in text_result
        assert image.data == b"final-image"
        # Exactly one final screenshot should have been taken.
        assert fake_interface.calls == 1

        tool_call_events = [event for event in ctx.events if event[0] == "tool_call"]
        tool_output_events = [event for event in ctx.events if event[0] == "tool_output"]
        assert tool_call_events and tool_output_events
        assert tool_call_events[0][2] == "call_1"
        assert tool_output_events[0][1] == "call_1"

    asyncio.run(_run_test())
def test_run_multi_cua_tasks_reports_progress(server_module, monkeypatch):
    """run_multi_cua_tasks should run every task and report 0.0 -> 1.0 progress.

    NOTE(review): the ``monkeypatch`` fixture is currently unused here.
    """

    async def _run_test():
        class FakeAgent:
            # Each entry is (factory(messages) -> result dict, post-yield delay).
            script = []

            def __init__(self, *args, **kwargs):
                pass

            async def run(self, messages):  # type: ignore[override]
                for factory, delay in type(self).script:
                    yield factory(messages)
                    if delay:
                        await asyncio.sleep(delay)

        FakeAgent.script = [
            (
                lambda messages: {
                    "output": [
                        {
                            "type": "message",
                            "role": "assistant",
                            "content": [
                                {
                                    "type": "output_text",
                                    "text": f"Result for {messages[0].get('content')}",
                                }
                            ],
                        }
                    ]
                },
                0.0,
            )
        ]
        server_module.ComputerAgent = FakeAgent  # type: ignore[assignment]

        class FakeInterface:
            async def screenshot(self) -> bytes:
                return b"progress-image"

        server_module.global_computer = types.SimpleNamespace(interface=FakeInterface())

        ctx = FakeContext()
        results = await server_module.run_multi_cua_tasks(ctx, ["a", "b", "c"])

        assert len(results) == 3
        assert results[0][0] == "Result for a"
        assert ctx.progress_updates[0] == pytest.approx(0.0)
        assert ctx.progress_updates[-1] == pytest.approx(1.0)
        # 6 updates for 3 tasks — presumably one at the start and one at the
        # end of each task; confirm against run_multi_cua_tasks if it changes.
        assert len(ctx.progress_updates) == 6

    asyncio.run(_run_test())
```
--------------------------------------------------------------------------------
/.github/workflows/publish-lume.yml:
--------------------------------------------------------------------------------
```yaml
name: Publish Notarized Lume
on:
push:
tags:
- "lume-v*"
workflow_dispatch:
inputs:
version:
description: "Version to notarize (without v prefix)"
required: true
default: "0.1.0"
workflow_call:
inputs:
version:
description: "Version to notarize"
required: true
type: string
secrets:
APPLICATION_CERT_BASE64:
required: true
INSTALLER_CERT_BASE64:
required: true
CERT_PASSWORD:
required: true
APPLE_ID:
required: true
TEAM_ID:
required: true
APP_SPECIFIC_PASSWORD:
required: true
DEVELOPER_NAME:
required: true
permissions:
contents: write
env:
APPLICATION_CERT_BASE64: ${{ secrets.APPLICATION_CERT_BASE64 }}
INSTALLER_CERT_BASE64: ${{ secrets.INSTALLER_CERT_BASE64 }}
CERT_PASSWORD: ${{ secrets.CERT_PASSWORD }}
APPLE_ID: ${{ secrets.APPLE_ID }}
TEAM_ID: ${{ secrets.TEAM_ID }}
APP_SPECIFIC_PASSWORD: ${{ secrets.APP_SPECIFIC_PASSWORD }}
DEVELOPER_NAME: ${{ secrets.DEVELOPER_NAME }}
jobs:
notarize:
runs-on: macos-15
outputs:
sha256_checksums: ${{ steps.generate_checksums.outputs.checksums }}
version: ${{ steps.set_version.outputs.version }}
steps:
- uses: actions/checkout@v4
- name: Select Xcode 16
run: |
sudo xcode-select -s /Applications/Xcode_16.app
xcodebuild -version
- name: Install dependencies
run: |
brew install cpio
- name: Create .release directory
run: mkdir -p .release
- name: Set version
id: set_version
run: |
# Determine version from tag or input
if [[ "$GITHUB_REF" == refs/tags/lume-v* ]]; then
VERSION="${GITHUB_REF#refs/tags/lume-v}"
echo "Using version from tag: $VERSION"
elif [[ -n "${{ inputs.version }}" ]]; then
VERSION="${{ inputs.version }}"
echo "Using version from input: $VERSION"
elif [[ -n "${{ inputs.version }}" ]]; then
VERSION="${{ inputs.version }}"
echo "Using version from workflow_call input: $VERSION"
else
echo "Error: No version found in tag or input"
exit 1
fi
# Update version in Main.swift
echo "Updating version in Main.swift to $VERSION"
sed -i '' "s/static let current: String = \".*\"/static let current: String = \"$VERSION\"/" libs/lume/src/Main.swift
# Set output for later steps
echo "version=$VERSION" >> $GITHUB_OUTPUT
- name: Import Certificates
env:
APPLICATION_CERT_BASE64: ${{ secrets.APPLICATION_CERT_BASE64 }}
INSTALLER_CERT_BASE64: ${{ secrets.INSTALLER_CERT_BASE64 }}
CERT_PASSWORD: ${{ secrets.CERT_PASSWORD }}
KEYCHAIN_PASSWORD: "temp_password"
run: |
# Create a temporary keychain
security create-keychain -p "$KEYCHAIN_PASSWORD" build.keychain
security default-keychain -s build.keychain
security unlock-keychain -p "$KEYCHAIN_PASSWORD" build.keychain
security set-keychain-settings -t 3600 -l build.keychain
# Import certificates
echo $APPLICATION_CERT_BASE64 | base64 --decode > application.p12
echo $INSTALLER_CERT_BASE64 | base64 --decode > installer.p12
# Import certificates silently (minimize output)
security import application.p12 -k build.keychain -P "$CERT_PASSWORD" -T /usr/bin/codesign -T /usr/bin/pkgbuild > /dev/null 2>&1
security import installer.p12 -k build.keychain -P "$CERT_PASSWORD" -T /usr/bin/codesign -T /usr/bin/pkgbuild > /dev/null 2>&1
# Allow codesign to access the certificates (minimal output)
security set-key-partition-list -S apple-tool:,apple:,codesign: -s -k "$KEYCHAIN_PASSWORD" build.keychain > /dev/null 2>&1
# Verify certificates were imported
echo "Verifying signing identities..."
CERT_COUNT=$(security find-identity -v -p codesigning build.keychain | grep -c "Developer ID Application" || echo "0")
INSTALLER_COUNT=$(security find-identity -v build.keychain | grep -c "Developer ID Installer" || echo "0")
if [ "$CERT_COUNT" -eq 0 ]; then
echo "Error: No Developer ID Application certificate found"
security find-identity -v -p codesigning build.keychain
exit 1
fi
if [ "$INSTALLER_COUNT" -eq 0 ]; then
echo "Error: No Developer ID Installer certificate found"
security find-identity -v build.keychain
exit 1
fi
echo "Found $CERT_COUNT Developer ID Application certificate(s) and $INSTALLER_COUNT Developer ID Installer certificate(s)"
echo "All required certificates verified successfully"
# Clean up certificate files
rm application.p12 installer.p12
- name: Build and Notarize
id: build_notarize
env:
APPLE_ID: ${{ secrets.APPLE_ID }}
TEAM_ID: ${{ secrets.TEAM_ID }}
APP_SPECIFIC_PASSWORD: ${{ secrets.APP_SPECIFIC_PASSWORD }}
# These will now reference the imported certificates
CERT_APPLICATION_NAME: "Developer ID Application: ${{ secrets.DEVELOPER_NAME }} (${{ secrets.TEAM_ID }})"
CERT_INSTALLER_NAME: "Developer ID Installer: ${{ secrets.DEVELOPER_NAME }} (${{ secrets.TEAM_ID }})"
VERSION: ${{ steps.set_version.outputs.version }}
working-directory: ./libs/lume
run: |
# Minimal debug information
echo "Starting build process..."
echo "Swift version: $(swift --version | head -n 1)"
echo "Building version: $VERSION"
# Ensure .release directory exists
mkdir -p .release
chmod 755 .release
# Build the project first (redirect verbose output)
echo "Building project..."
swift build --configuration release > build.log 2>&1
echo "Build completed."
# Run the notarization script with LOG_LEVEL env var
chmod +x scripts/build/build-release-notarized.sh
cd scripts/build
LOG_LEVEL=minimal ./build-release-notarized.sh
# Return to the lume directory
cd ../..
# Debug: List what files were actually created
echo "Files in .release directory:"
find .release -type f -name "*.tar.gz" -o -name "*.pkg.tar.gz"
# Get architecture for output filename
ARCH=$(uname -m)
OS_IDENTIFIER="darwin-${ARCH}"
# Output paths for later use
echo "tarball_path=.release/lume-${VERSION}-${OS_IDENTIFIER}.tar.gz" >> $GITHUB_OUTPUT
echo "pkg_path=.release/lume-${VERSION}-${OS_IDENTIFIER}.pkg.tar.gz" >> $GITHUB_OUTPUT
- name: Generate SHA256 Checksums
id: generate_checksums
working-directory: ./libs/lume/.release
run: |
# Use existing checksums file if it exists, otherwise generate one
if [ -f "checksums.txt" ]; then
echo "Using existing checksums file"
cat checksums.txt
else
echo "## SHA256 Checksums" > checksums.txt
echo '```' >> checksums.txt
shasum -a 256 lume-*.tar.gz >> checksums.txt
echo '```' >> checksums.txt
fi
checksums=$(cat checksums.txt)
echo "checksums<<EOF" >> $GITHUB_OUTPUT
echo "$checksums" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
# Debug: Show all files in the release directory
echo "All files in release directory:"
ls -la
# Create stable, unversioned names (symlinks) alongside the versioned
# tarball/installer so download URLs stay constant across releases.
- name: Create Standard Version Releases
working-directory: ./libs/lume/.release
run: |
VERSION=${{ steps.set_version.outputs.version }}
ARCH=$(uname -m)
OS_IDENTIFIER="darwin-${ARCH}"
# Create OS-tagged symlinks
ln -sf "lume-${VERSION}-${OS_IDENTIFIER}.tar.gz" "lume-darwin.tar.gz"
ln -sf "lume-${VERSION}-${OS_IDENTIFIER}.pkg.tar.gz" "lume-darwin.pkg.tar.gz"
# Create simple symlinks
ln -sf "lume-${VERSION}-${OS_IDENTIFIER}.tar.gz" "lume.tar.gz"
ln -sf "lume-${VERSION}-${OS_IDENTIFIER}.pkg.tar.gz" "lume.pkg.tar.gz"
# List all files (including symlinks)
echo "Files with symlinks in release directory:"
ls -la
# Persist the notarized artifacts as workflow artifacts; if-no-files-found
# makes the job fail loudly when the build step did not produce the paths.
- name: Upload Notarized Package (Tarball)
uses: actions/upload-artifact@v4
with:
name: lume-notarized-tarball
path: ./libs/lume/${{ steps.build_notarize.outputs.tarball_path }}
if-no-files-found: error
- name: Upload Notarized Package (Installer)
uses: actions/upload-artifact@v4
with:
name: lume-notarized-installer
path: ./libs/lume/${{ steps.build_notarize.outputs.pkg_path }}
if-no-files-found: error
# Publish a GitHub release (only on tag pushes matching lume-v*) with both
# the versioned and the stable-named artifacts; the body embeds the
# checksums captured by the generate_checksums step.
# NOTE(review): the release body below contains a closing ``` fence after
# the install command with no visible opening fence — verify the body
# renders as intended on GitHub.
- name: Create Release
if: startsWith(github.ref, 'refs/tags/lume-v')
uses: softprops/action-gh-release@v1
with:
files: |
./libs/lume/${{ steps.build_notarize.outputs.tarball_path }}
./libs/lume/${{ steps.build_notarize.outputs.pkg_path }}
./libs/lume/.release/lume-darwin.tar.gz
./libs/lume/.release/lume-darwin.pkg.tar.gz
./libs/lume/.release/lume.tar.gz
./libs/lume/.release/lume.pkg.tar.gz
body: |
${{ steps.generate_checksums.outputs.checksums }}
### Installation with script
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/trycua/cua/main/libs/lume/scripts/install.sh)"
```
generate_release_notes: true
make_latest: true
```
--------------------------------------------------------------------------------
/scripts/playground-docker.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
set -e

# ANSI color codes used by the print_* helpers below.
GREEN='\033[0;32m'
BLUE='\033[0;34m'
RED='\033[0;31m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# cecho COLOR MESSAGE — emit a colorized "==> MESSAGE" line, resetting the
# terminal color afterwards.
cecho() {
    echo -e "${1}==> ${2}${NC}"
}

# Severity-specific wrappers around cecho.
print_info()    { cecho "$BLUE" "$1"; }
print_success() { cecho "$GREEN" "$1"; }
print_error()   { cecho "$RED" "$1"; }
print_warning() { cecho "$YELLOW" "$1"; }
echo "🚀 Launching Cua Computer-Use Agent UI..."

# Preflight check 1: the Docker CLI must be on PATH.
if ! command -v docker >/dev/null 2>&1; then
    print_error "Docker is not installed!"
    cat <<'INSTALL_HELP'

To use Cua with Docker containers, you need to install Docker first:

📦 Install Docker:
 • macOS: Download Docker Desktop from https://docker.com/products/docker-desktop
 • Windows: Download Docker Desktop from https://docker.com/products/docker-desktop
 • Linux: Follow instructions at https://docs.docker.com/engine/install/

After installing Docker, run this script again.
INSTALL_HELP
    exit 1
fi

# Preflight check 2: the Docker daemon must be reachable.
if ! docker info >/dev/null 2>&1; then
    print_error "Docker is installed but not running!"
    cat <<'DAEMON_HELP'

Please start Docker Desktop and try again.
DAEMON_HELP
    exit 1
fi

print_success "Docker is installed and running!"
# Remember where we started so cleanup() can restore it when the script exits.
ORIGINAL_DIR="$PWD"
DEMO_DIR="$HOME/.cua"
mkdir -p "$DEMO_DIR"

# Detect whether we are already inside the Cua repository by looking for the
# trycua marker in pyproject.toml; otherwise fall back to a clone under
# $DEMO_DIR (created later).
# NOTE(review): the grep pattern "[email protected]" looks like an
# email-obfuscation artifact (the brackets form a regex character class), so
# it may not match what was originally intended — verify against the repo's
# pyproject.toml.
USE_EXISTING_REPO=false
REPO_DIR="$DEMO_DIR/cua"
if [[ -f "pyproject.toml" ]] && grep -q "[email protected]" "pyproject.toml"; then
    print_success "Already in Cua repository - using current directory"
    REPO_DIR="$ORIGINAL_DIR"
    USE_EXISTING_REPO=true
fi

# Always return to the starting directory on exit, even on failure.
cleanup() {
    cd "$ORIGINAL_DIR" 2>/dev/null || true
}
trap cleanup EXIT
# read_cua_key FILE — print the CUA_API_KEY value from FILE if it holds a
# real (non-empty, non-placeholder) key; print nothing otherwise.
read_cua_key() {
    local f="$1" key
    if [[ -f "$f" ]] && grep -q "CUA_API_KEY=" "$f"; then
        key=$(grep "CUA_API_KEY=" "$f" | cut -d'=' -f2- | tr -d '"' | tr -d "'" | xargs)
        if [[ -n "$key" && "$key" != "your_cua_api_key_here" ]]; then
            echo "$key"
        fi
    fi
}

echo ""
echo "Choose your Cua setup:"
echo "1) ☁️ Cua Cloud Sandbox (works on any system)"
echo "2) 🖥️ Local macOS VMs (requires Apple Silicon Mac + macOS 15+)"
echo "3) 🖥️ Local Windows VMs (requires Windows 10 / 11)"
echo ""
read -p "Enter your choice (1, 2, or 3): " CHOICE

case "$CHOICE" in
1)
    # Cua Cloud Sandbox setup
    echo ""
    print_info "Setting up Cua Cloud Sandbox..."
    echo ""
    REPO_ENV_FILE="$REPO_DIR/.env.local"
    CURRENT_ENV_FILE="$ORIGINAL_DIR/.env.local"
    # Prefer a key from the current directory's .env.local, then fall back
    # to the repo checkout's copy.
    CUA_API_KEY="$(read_cua_key "$CURRENT_ENV_FILE")"
    if [[ -z "$CUA_API_KEY" ]]; then
        CUA_API_KEY="$(read_cua_key "$REPO_ENV_FILE")"
    fi
    # No usable key on disk — ask the user for one.
    if [[ -z "$CUA_API_KEY" ]]; then
        echo "To use Cua Cloud Sandbox, you need to:"
        echo "1. Sign up at https://cua.ai"
        echo "2. Create a Cloud Sandbox"
        echo "3. Generate an Api Key"
        echo ""
        read -p "Enter your Cua Api Key: " CUA_API_KEY
        if [[ -z "$CUA_API_KEY" ]]; then
            print_error "Cua Api Key is required for Cloud Sandbox."
            exit 1
        fi
    else
        print_success "Found existing CUA API key"
    fi
    USE_CLOUD=true
    COMPUTER_TYPE="cloud"
    ;;
2)
    # Local macOS VM setup
    echo ""
    print_info "Setting up local macOS VMs..."
    # Requires an Apple Silicon Mac.
    if [[ $(uname -s) != "Darwin" || $(uname -m) != "arm64" ]]; then
        print_error "Local macOS VMs require an Apple Silicon Mac (M1/M2/M3/M4)."
        echo "💡 Consider using Cua Cloud Sandbox instead (option 1)."
        exit 1
    fi
    # Requires macOS 15 (Sequoia) or newer: sort -V picks the lower of the
    # two versions, which must be 15.0 for the check to pass.
    OSVERSION=$(sw_vers -productVersion)
    if [[ $(echo "$OSVERSION 15.0" | tr " " "\n" | sort -V | head -n 1) != "15.0" ]]; then
        print_error "Local macOS VMs require macOS 15 (Sequoia) or newer. You have $OSVERSION."
        echo "💡 Consider using Cua Cloud Sandbox instead (option 1)."
        exit 1
    fi
    USE_CLOUD=false
    COMPUTER_TYPE="macos"
    ;;
3)
    # Local Windows VM setup
    echo ""
    print_info "Setting up local Windows VMs..."
    # Requires a native Windows shell (MSYS/MinGW/Cygwin) — not WSL.
    if [[ $(uname -s) != MINGW* && $(uname -s) != CYGWIN* && $(uname -s) != MSYS* ]]; then
        print_error "Local Windows VMs require Windows 10 or 11."
        echo "💡 Consider using Cua Cloud Sandbox instead (option 1)."
        echo ""
        echo "🔗 If you are using WSL, refer to the blog post to get started: https://cua.ai/blog/windows-sandbox"
        exit 1
    fi
    USE_CLOUD=false
    COMPUTER_TYPE="windows"
    ;;
*)
    print_error "Invalid choice. Please run the script again and choose 1, 2, or 3."
    exit 1
    ;;
esac
print_success "All checks passed! 🎉"
# Create demo directory and handle repository
# Ensure we are inside a Cua checkout: reuse the current directory when the
# script was launched from the repo, otherwise clone (or update) a copy
# under $DEMO_DIR/cua.
if [[ "$USE_EXISTING_REPO" == "true" ]]; then
print_info "Using existing repository in current directory"
cd "$REPO_DIR"
else
# Clone or update the repository
if [[ ! -d "$REPO_DIR" ]]; then
print_info "Cloning Cua repository..."
cd "$DEMO_DIR"
git clone https://github.com/trycua/cua.git
else
print_info "Updating Cua repository..."
cd "$REPO_DIR"
git pull origin main
fi
cd "$REPO_DIR"
fi
# Create .env.local file with API keys
# Seed a .env.local with placeholder keys on first run; an existing file is
# left untouched so user edits survive re-runs.
ENV_FILE="$REPO_DIR/.env.local"
if [[ ! -f "$ENV_FILE" ]]; then
cat > "$ENV_FILE" << EOF
# Uncomment and add your API keys here
# OPENAI_API_KEY=your_openai_api_key_here
# ANTHROPIC_API_KEY=your_anthropic_api_key_here
CUA_API_KEY=your_cua_api_key_here
EOF
print_success "Created .env.local file with API key placeholders"
else
print_success "Found existing .env.local file - keeping your current settings"
fi
if [[ "$USE_CLOUD" == "true" ]]; then
# Persist the cloud API key: append it when the line is missing entirely,
# or replace the placeholder written above. The -i.bak suffix keeps the
# in-place sed portable between BSD/macOS and GNU (leaves a .bak backup).
if ! grep -q "CUA_API_KEY" "$ENV_FILE"; then
echo "CUA_API_KEY=$CUA_API_KEY" >> "$ENV_FILE"
print_success "Added CUA_API_KEY to .env.local"
elif grep -q "CUA_API_KEY=your_cua_api_key_here" "$ENV_FILE"; then
# Update placeholder with actual key
sed -i.bak "s/CUA_API_KEY=your_cua_api_key_here/CUA_API_KEY=$CUA_API_KEY/" "$ENV_FILE"
print_success "Updated CUA_API_KEY in .env.local"
fi
fi
# Make sure the cua-dev-image Docker image exists, building it on first run.
print_info "Checking Docker image..."
if docker image inspect cua-dev-image >/dev/null 2>&1; then
    print_success "Docker image already exists"
else
    print_info "Building Docker image (this may take a while)..."
    ./scripts/run-docker-dev.sh build
fi
# Install Lume if needed for local VMs.
# For the local-macOS path: make sure the Lume CLI is installed, the
# macos-sequoia-cua image is pulled (with a disk-space confirmation first),
# and the VM is running.
if [[ "$USE_CLOUD" == "false" && "$COMPUTER_TYPE" == "macos" ]]; then
    if ! command -v lume &> /dev/null; then
        print_info "Installing Lume CLI..."
        curl -fsSL https://raw.githubusercontent.com/trycua/cua/main/libs/lume/scripts/install.sh | bash
        # Add lume to PATH for this session if it's not already there
        if ! command -v lume &> /dev/null; then
            export PATH="$PATH:$HOME/.local/bin"
        fi
    fi
    # Pull the macOS CUA image if not already present
    if ! lume ls | grep -q "macos-sequoia-cua"; then
        # Check available disk space before the ~30 GB download.
        IMAGE_SIZE_GB=30
        # BUGFIX: quote "$HOME" so the path survives word splitting (e.g. a
        # home directory whose path contains spaces).
        AVAILABLE_SPACE_KB=$(df -k "$HOME" | tail -1 | awk '{print $4}')
        AVAILABLE_SPACE_GB=$((AVAILABLE_SPACE_KB / 1024 / 1024))
        echo "📊 The macOS CUA image will use approximately ${IMAGE_SIZE_GB}GB of disk space."
        echo " You currently have ${AVAILABLE_SPACE_GB}GB available on your system."
        # Prompt for confirmation; an empty answer defaults to yes.
        read -p " Continue? [y]/n: " CONTINUE
        CONTINUE=${CONTINUE:-y}
        if [[ $CONTINUE =~ ^[Yy]$ ]]; then
            print_info "Pulling macOS CUA image (this may take a while)..."
            # Use caffeinate on macOS to prevent system sleep during the pull
            if command -v caffeinate &> /dev/null; then
                print_info "Using caffeinate to prevent system sleep during download..."
                caffeinate -i lume pull macos-sequoia-cua:latest
            else
                lume pull macos-sequoia-cua:latest
            fi
        else
            print_error "Installation cancelled."
            exit 1
        fi
    fi
    # Check if the VM is running
    print_info "Checking if the macOS CUA VM is running..."
    VM_RUNNING=$(lume ls | grep "macos-sequoia-cua" | grep "running" || echo "")
    if [ -z "$VM_RUNNING" ]; then
        print_info "Starting the macOS CUA VM in the background..."
        lume run macos-sequoia-cua:latest &
        # Give the backgrounded VM a moment to initialize before continuing.
        sleep 5
        print_success "VM started successfully."
    else
        print_success "macOS CUA VM is already running."
    fi
fi
# Create a convenience script to run the demo
# NOTE: the heredoc delimiter is intentionally unquoted — $REPO_DIR is
# expanded NOW, so the generated start_ui.sh hard-codes the repository path
# chosen above.
cat > "$DEMO_DIR/start_ui.sh" << EOF
#!/bin/bash
cd "$REPO_DIR"
./scripts/run-docker-dev.sh run agent_ui_examples.py
EOF
chmod +x "$DEMO_DIR/start_ui.sh"
print_success "Setup complete!"
if [[ "$USE_CLOUD" == "true" ]]; then
echo "☁️ Cua Cloud Sandbox setup complete!"
else
echo "🖥️ Cua Local VM setup complete!"
fi
echo "📝 Edit $ENV_FILE to update your API keys"
echo "🖥️ Start the playground by running: $DEMO_DIR/start_ui.sh"
# Start the demo automatically
echo
print_info "Starting the Cua Computer-Use Agent UI..."
echo ""
# NOTE(review): this banner prints before the UI process below has actually
# started — the URL only becomes reachable once start_ui.sh is up.
print_success "Cua Computer-Use Agent UI is now running at http://localhost:7860/"
echo
echo "🌐 Open your browser and go to: http://localhost:7860/"
echo
# Runs in the foreground; the script blocks here until the UI exits, then
# the EXIT trap restores the original working directory.
"$DEMO_DIR/start_ui.sh"
```