This is page 6 of 16. Use http://codebase.md/trycua/cua?page={x} to view the full context.

# Directory Structure

```
├── .all-contributorsrc
├── .cursorignore
├── .devcontainer
│   ├── devcontainer.json
│   ├── post-install.sh
│   └── README.md
├── .dockerignore
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── ci-lume.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-pylume.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       └── test-validation-script.yml
├── .gitignore
├── .vscode
│   ├── docs.code-workspace
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── composite-agents.md
│   ├── cua-hackathon.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .gitignore
│   ├── .prettierrc
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   └── meta.json
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── meta.json
│   │       │   └── sandboxed-python.mdx
│   │       ├── index.mdx
│   │       ├── libraries
│   │       │   ├── agent
│   │       │   │   └── index.mdx
│   │       │   ├── computer
│   │       │   │   └── index.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── core
│   │       │   │   └── index.mdx
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   └── som
│   │       │       ├── configuration.mdx
│   │       │       └── index.mdx
│   │       ├── meta.json
│   │       ├── quickstart-cli.mdx
│   │       ├── quickstart-devs.mdx
│   │       └── telemetry.mdx
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   └── llms.txt
│   │   │       └── route.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── iou.tsx
│   │   │   └── mermaid.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   └── mdx-components.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── .prettierrc
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   └── uitars.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   └── test_connection.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── scripts
│   │   │       ├── install_mcp_server.sh
│   │   │       └── start_mcp_server.sh
│   │   ├── pylume
│   │   │   ├── __init__.py
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── pylume
│   │   │   │   ├── __init__.py
│   │   │   │   ├── client.py
│   │   │   │   ├── exceptions.py
│   │   │   │   ├── lume
│   │   │   │   ├── models.py
│   │   │   │   ├── pylume.py
│   │   │   │   └── server.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           └── test_omniparser.py
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── biome.json
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Dockerfile
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── pylume_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── pdm.lock
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── samples
│   └── community
│       ├── global-online
│       │   └── README.md
│       └── hack-the-north
│           └── README.md
├── scripts
│   ├── build-uv.sh
│   ├── build.ps1
│   ├── build.sh
│   ├── cleanup.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   └── run-docker-dev.sh
└── tests
    ├── pytest.ini
    ├── shell_cmd.py
    ├── test_files.py
    ├── test_mcp_server_session_management.py
    ├── test_mcp_server_streaming.py
    ├── test_shell_bash.py
    ├── test_telemetry.py
    ├── test_venv.py
    └── test_watchdog.py
```

# Files

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/holo.py:
--------------------------------------------------------------------------------

```python
"""
Holo 1.5 agent loop implementation for click prediction using litellm.acompletion.

Implements the Holo1.5 grounding behavior:
- Prompt asks for absolute pixel coordinates in JSON: {"action":"click_absolute","x":int,"y":int}
- Optionally resizes the image using Qwen2-VL smart_resize parameters (via transformers AutoProcessor)
- If resized, maps predicted coordinates back to the original screenshot resolution

Note: We do NOT manually load the model; litellm.acompletion (via HuggingFaceLocalAdapter)
will handle loading based on the provided model name.
"""

from __future__ import annotations

import base64
import json
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple

import litellm
from PIL import Image

from ..decorators import register_agent
from .base import AsyncAgentConfig
from ..types import AgentCapability


def _strip_hf_prefix(model: str) -> str:
    """Strip provider prefixes like 'huggingface-local/' from model names for HF processor load."""
    if "/" in model and model.lower().startswith("huggingface-local/"):
        return model.split("/", 1)[1]
    return model


def _maybe_smart_resize(image: Image.Image, model: str) -> Tuple[Image.Image, Tuple[int, int]]:
    """
    Try to compute Qwen2-VL smart_resize output size using transformers AutoProcessor.

    Returns (processed_image, (orig_w, orig_h)). If transformers or processor unavailable,
    returns the original image and size without resizing.
    """
    orig_w, orig_h = image.size
    try:
        # Import lazily to avoid hard dependency if not installed
        from transformers import AutoProcessor  # type: ignore
        from transformers.models.qwen2_vl.image_processing_qwen2_vl import (  # type: ignore
            smart_resize,
        )

        processor_name = _strip_hf_prefix(model)
        processor = AutoProcessor.from_pretrained(processor_name)
        image_processor = getattr(processor, "image_processor", None)
        if image_processor is None:
            return image, (orig_w, orig_h)

        factor = getattr(image_processor, "patch_size", 14) * getattr(image_processor, "merge_size", 1)
        min_pixels = getattr(image_processor, "min_pixels", 256 * 256)
        max_pixels = getattr(image_processor, "max_pixels", 1536 * 1536)

        resized_h, resized_w = smart_resize(
            orig_h,
            orig_w,
            factor=factor,
            min_pixels=min_pixels,
            max_pixels=max_pixels,
        )

        if (resized_w, resized_h) == (orig_w, orig_h):
            return image, (orig_w, orig_h)

        processed = image.resize((resized_w, resized_h), resample=Image.Resampling.LANCZOS)
        return processed, (orig_w, orig_h)
    except Exception:
        # If any failure (no transformers, processor load error), fall back to original
        return image, (orig_w, orig_h)


def _build_holo_prompt(instruction: str) -> str:
    """Construct the Holo1.5 grounding prompt."""
    # Keep it close to the cookbook while avoiding heavy schema generation
    schema_hint = '{"action": "click_absolute", "x": <int>, "y": <int>}'
    return (
        "Localize an element on the GUI image according to the provided target and output a click position. "
        f"You must output a valid JSON following the format: {schema_hint} "
        f"Your target is: {instruction}"
    )


def _parse_click_json(output_text: str) -> Optional[Tuple[int, int]]:
    """
    Parse JSON from model output and extract x, y ints.
    Tries to find the first JSON object substring if extra text is present.
    """
    try:
        # Fast path: direct JSON
        data = json.loads(output_text)
    except Exception:
        # Try to locate a JSON object within the text
        start = output_text.find("{")
        end = output_text.rfind("}")
        if start == -1 or end == -1 or end <= start:
            return None
        try:
            data = json.loads(output_text[start : end + 1])
        except Exception:
            return None

    try:
        x = int(data.get("x"))
        y = int(data.get("y"))
        return x, y
    except Exception:
        return None


@register_agent(models=r"(?i).*(Holo1\.5|Hcompany/Holo1\.5).*")
class HoloConfig(AsyncAgentConfig):
    """Holo is a family of UI grounding models from H Company"""

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        # Holo models are only trained on UI localization tasks, not all-in-one agent
        raise NotImplementedError()

    async def predict_click(
        self,
        model: str,
        image_b64: str,
        instruction: str,
        **kwargs,
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates using Holo1.5 via litellm.acompletion.

        - Optionally smart-resizes the image using Qwen2-VL rules if transformers are available
        - Prompts for JSON with absolute pixel coordinates
        - Parses x,y and maps back to original screenshot size if resized
        """
        try:
            img_bytes = base64.b64decode(image_b64)
            original_img = Image.open(BytesIO(img_bytes))
        except Exception:
            return None

        # Optional preprocessing
        processed_img, (orig_w, orig_h) = _maybe_smart_resize(original_img, model)

        # If we resized, send the resized image; otherwise send original
        img_to_send = processed_img
        buf = BytesIO()
        img_to_send.save(buf, format="PNG")
        processed_b64 = base64.b64encode(buf.getvalue()).decode("utf-8")

        prompt = _build_holo_prompt(instruction)

        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{processed_b64}"},
                    },
                    {"type": "text", "text": prompt},
                ],
            }
        ]

        api_kwargs = {
            "model": model,
            "messages": messages,
            # Deterministic, small output
            "max_tokens": kwargs.get("max_tokens", 256),
            "temperature": kwargs.get("temperature", 0.0),
        }

        response = await litellm.acompletion(**api_kwargs)
        output_text = (response.choices[0].message.content or "").strip()  # type: ignore

        coords = _parse_click_json(output_text)
        if coords is None:
            return None

        x, y = coords

        # Map back to original size if we resized
        proc_w, proc_h = img_to_send.size
        if (proc_w, proc_h) != (orig_w, orig_h):
            try:
                sx = orig_w / float(proc_w)
                sy = orig_h / float(proc_h)
                x = int(round(x * sx))
                y = int(round(y * sy))
            except Exception:
                # Fallback: clamp within original bounds
                pass

        # Clamp to original image bounds
        x = max(0, min(orig_w - 1, x))
        y = max(0, min(orig_h - 1, y))
        return x, y

    def get_capabilities(self) -> List[AgentCapability]:
        return ["click"]

```
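
A minimal usage sketch, not part of the repository: it calls `HoloConfig.predict_click` directly on a local screenshot. The import path, model identifier, and screenshot filename below are assumptions for illustration.

```python
# Hedged sketch: exercise HoloConfig.predict_click outside the agent loop.
# Assumptions: the `agent` package is importable (see libs/python/agent above),
# "huggingface-local/Hcompany/Holo1.5-7B" is an acceptable model id for the
# local adapter, and screenshot.png exists next to this script.
import asyncio
import base64

from agent.loops.holo import HoloConfig  # import path assumed from the tree above


async def main() -> None:
    with open("screenshot.png", "rb") as f:  # hypothetical input screenshot
        image_b64 = base64.b64encode(f.read()).decode("utf-8")

    grounder = HoloConfig()
    coords = await grounder.predict_click(
        model="huggingface-local/Hcompany/Holo1.5-7B",  # assumed model name
        image_b64=image_b64,
        instruction="Click the Submit button",
    )
    # (x, y) in original-screenshot pixels, or None if the JSON could not be parsed
    print(coords)


if __name__ == "__main__":
    asyncio.run(main())
```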

--------------------------------------------------------------------------------
/.vscode/launch.json:
--------------------------------------------------------------------------------

```json
{
    "configurations": [
        {
            "name": "Agent UI",
            "type": "debugpy",
            "request": "launch",
            "program": "examples/agent_ui_examples.py",
            "console": "integratedTerminal",
            "justMyCode": false,
            "python": "${workspaceFolder:cua-root}/.venv/bin/python",
            "cwd": "${workspaceFolder:cua-root}",
            "env": {
                "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume"
            }
        },
        {
            "name": "Computer UI",
            "type": "debugpy",
            "request": "launch",
            "program": "examples/computer_ui_examples.py",
            "console": "integratedTerminal",
            "justMyCode": false,
            "python": "${workspaceFolder:cua-root}/.venv/bin/python",
            "cwd": "${workspaceFolder:cua-root}",
            "env": {
                "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume"
            }
        },
        {
            "name": "Run Computer Examples",
            "type": "debugpy",
            "request": "launch",
            "program": "examples/computer_examples.py",
            "console": "integratedTerminal",
            "justMyCode": true,
            "python": "${workspaceFolder:cua-root}/.venv/bin/python",
            "cwd": "${workspaceFolder:cua-root}",
            "env": {
                "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume"
            }
        },
        {
            "name": "Run Agent Examples",
            "type": "debugpy",
            "request": "launch",
            "program": "examples/agent_examples.py",
            "console": "integratedTerminal",
            "justMyCode": false,
            "python": "${workspaceFolder:cua-root}/.venv/bin/python",
            "cwd": "${workspaceFolder:cua-root}",
            "env": {
                "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume"
            }
        },
        {
            "name": "Run PyLume Examples",
            "type": "debugpy",
            "request": "launch",
            "program": "examples/pylume_examples.py",
            "console": "integratedTerminal",
            "justMyCode": true,
            "python": "${workspaceFolder:cua-root}/.venv/bin/python",
            "cwd": "${workspaceFolder:cua-root}",
            "env": {
                "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume"
            }
        },
        {
            "name": "SOM: Run Experiments (No OCR)",
            "type": "debugpy",
            "request": "launch",
            "program": "examples/som_examples.py",
            "args": [
                "examples/test_data",
                "--output-dir",
                "examples/output",
                "--ocr",
                "none",
                "--mode",
                "experiment"
            ],
            "console": "integratedTerminal",
            "justMyCode": false,
            "python": "${workspaceFolder:cua-root}/.venv/bin/python",
            "cwd": "${workspaceFolder:cua-root}",
            "env": {
                "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume"
            }
        },
        {
            "name": "SOM: Run Experiments (EasyOCR)",
            "type": "debugpy",
            "request": "launch",
            "program": "examples/som_examples.py",
            "args": [
                "examples/test_data",
                "--output-dir",
                "examples/output",
                "--ocr",
                "easyocr",
                "--mode",
                "experiment"
            ],
            "console": "integratedTerminal",
            "justMyCode": false,
            "python": "${workspaceFolder:cua-root}/.venv/bin/python",
            "cwd": "${workspaceFolder:cua-root}",
            "env": {
                "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume"
            }
        },
        {
            "name": "Run Computer Server",
            "type": "debugpy",
            "request": "launch",
            "program": "${workspaceFolder}/libs/python/computer-server/run_server.py",
            "console": "integratedTerminal",
            "justMyCode": true,
            "python": "${workspaceFolder:cua-root}/.venv/bin/python",
            "cwd": "${workspaceFolder:cua-root}",
            "env": {
                "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer:${workspaceFolder:cua-root}/libs/python/agent:${workspaceFolder:cua-root}/libs/python/som:${workspaceFolder:cua-root}/libs/python/pylume"
            }
        },
        {
            "name": "Run Computer Server with Args",
            "type": "debugpy",
            "request": "launch",
            "program": "${workspaceFolder}/libs/python/computer-server/run_server.py",
            "args": [
                "--host",
                "0.0.0.0",
                "--port",
                "8000",
                "--log-level",
                "debug"
            ],
            "console": "integratedTerminal",
            "justMyCode": false,
            "python": "${workspaceFolder:cua-root}/.venv/bin/python",
            "cwd": "${workspaceFolder:cua-root}",
            "env": {
                "PYTHONPATH": "${workspaceFolder:cua-root}/libs/python/core:${workspaceFolder:cua-root}/libs/python/computer-server"
            }
        },
        {
            "type": "lldb",
            "request": "launch",
            "args": [],
            "cwd": "${workspaceFolder:cua-root}/libs/lume",
            "name": "Debug lume (libs/lume)",
            "program": "${workspaceFolder:cua-root}/libs/lume/.build/debug/lume",
            "preLaunchTask": "swift: Build Debug lume (libs/lume)"
        },
        {
            "type": "lldb",
            "request": "launch",
            "args": [],
            "cwd": "${workspaceFolder:cua-root}/libs/lume",
            "name": "Release lume (libs/lume)",
            "program": "${workspaceFolder:cua-root}/libs/lume/.build/release/lume",
            "preLaunchTask": "swift: Build Release lume (libs/lume)"
        }
    ]
}
```
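
For reference, the "Run Computer Server with Args" configuration above can be reproduced outside VS Code. The sketch below mirrors its program path, arguments, and `PYTHONPATH`; the `.venv/bin/python` interpreter path is taken from the launch configuration and may differ on your machine.

```python
# Rough command-line equivalent of the "Run Computer Server with Args" launch
# configuration, run from the repository root.
import os
import subprocess

env = os.environ.copy()
env["PYTHONPATH"] = os.pathsep.join(
    ["libs/python/core", "libs/python/computer-server"]  # same entries as the JSON above
)

subprocess.run(
    [
        ".venv/bin/python",                               # assumed project virtualenv
        "libs/python/computer-server/run_server.py",
        "--host", "0.0.0.0",
        "--port", "8000",
        "--log-level", "debug",
    ],
    env=env,
    check=True,
)
```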

--------------------------------------------------------------------------------
/libs/lume/src/Commands/Config.swift:
--------------------------------------------------------------------------------

```swift
import ArgumentParser
import Foundation

struct Config: ParsableCommand {
    static let configuration = CommandConfiguration(
        commandName: "config",
        abstract: "Get or set lume configuration",
        subcommands: [Get.self, Storage.self, Cache.self, Caching.self],
        defaultSubcommand: Get.self
    )

    // MARK: - Basic Configuration Subcommands

    struct Get: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "get",
            abstract: "Get current configuration"
        )

        func run() throws {
            let controller = LumeController()
            let settings = controller.getSettings()

            // Display default location
            print(
                "Default VM storage: \(settings.defaultLocationName) (\(settings.defaultLocation?.path ?? "not set"))"
            )

            // Display cache directory
            print("Cache directory: \(settings.cacheDirectory)")

            // Display caching enabled status
            print("Caching enabled: \(settings.cachingEnabled)")

            // Display all locations
            if !settings.vmLocations.isEmpty {
                print("\nConfigured VM storage locations:")
                for location in settings.sortedLocations {
                    let isDefault = location.name == settings.defaultLocationName
                    let defaultMark = isDefault ? " (default)" : ""
                    print("  - \(location.name): \(location.path)\(defaultMark)")
                }
            }
        }
    }

    // MARK: - Debug Command

    struct Debug: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "debug",
            abstract: "Output detailed debug information about current configuration",
            shouldDisplay: false
        )

        func run() throws {
            let debugInfo = SettingsManager.shared.debugSettings()
            print(debugInfo)
        }
    }

    // MARK: - Caching Management Subcommands

    struct Caching: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "caching",
            abstract: "Manage image caching settings",
            subcommands: [GetCaching.self, SetCaching.self]
        )

        struct GetCaching: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "get",
                abstract: "Show current caching status"
            )

            func run() throws {
                let controller = LumeController()
                let cachingEnabled = controller.isCachingEnabled()
                print("Caching enabled: \(cachingEnabled)")
            }
        }

        struct SetCaching: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "set",
                abstract: "Enable or disable image caching"
            )

            @Argument(help: "Enable or disable caching (true/false)")
            var enabled: Bool

            func run() throws {
                let controller = LumeController()
                try controller.setCachingEnabled(enabled)
                print("Caching \(enabled ? "enabled" : "disabled")")
            }
        }
    }

    // MARK: - Cache Management Subcommands

    struct Cache: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "cache",
            abstract: "Manage cache settings",
            subcommands: [GetCache.self, SetCache.self]
        )

        struct GetCache: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "get",
                abstract: "Get current cache directory"
            )

            func run() throws {
                let controller = LumeController()
                let cacheDir = controller.getCacheDirectory()
                print("Cache directory: \(cacheDir)")
            }
        }

        struct SetCache: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "set",
                abstract: "Set cache directory"
            )

            @Argument(help: "Path to cache directory")
            var path: String

            func run() throws {
                let controller = LumeController()
                try controller.setCacheDirectory(path: path)
                print("Cache directory set to: \(path)")
            }
        }
    }

    // MARK: - Storage Management Subcommands

    struct Storage: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "storage",
            abstract: "Manage VM storage locations",
            subcommands: [Add.self, Remove.self, List.self, Default.self]
        )

        struct Add: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "add",
                abstract: "Add a new VM storage location"
            )

            @Argument(help: "Storage name (alphanumeric with dashes/underscores)")
            var name: String

            @Argument(help: "Path to VM storage directory")
            var path: String

            func run() throws {
                let controller = LumeController()
                try controller.addLocation(name: name, path: path)
                print("Added VM storage location: \(name) at \(path)")
            }
        }

        struct Remove: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "remove",
                abstract: "Remove a VM storage location"
            )

            @Argument(help: "Storage name to remove")
            var name: String

            func run() throws {
                let controller = LumeController()
                try controller.removeLocation(name: name)
                print("Removed VM storage location: \(name)")
            }
        }

        struct List: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "list",
                abstract: "List all VM storage locations"
            )

            func run() throws {
                let controller = LumeController()
                let settings = controller.getSettings()

                if settings.vmLocations.isEmpty {
                    print("No VM storage locations configured")
                    return
                }

                print("VM Storage Locations:")
                for location in settings.sortedLocations {
                    let isDefault = location.name == settings.defaultLocationName
                    let defaultMark = isDefault ? " (default)" : ""
                    print("  - \(location.name): \(location.path)\(defaultMark)")
                }
            }
        }

        struct Default: ParsableCommand {
            static let configuration = CommandConfiguration(
                commandName: "default",
                abstract: "Set the default VM storage location"
            )

            @Argument(help: "Storage name to set as default")
            var name: String

            func run() throws {
                let controller = LumeController()
                try controller.setDefaultLocation(name: name)
                print("Set default VM storage location to: \(name)")
            }
        }
    }
}

```
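
The subcommands above map onto the `lume config` CLI surface (`get`, `storage add/remove/list/default`, `cache get/set`, `caching get/set`). A hedged sketch of driving them from Python follows; it assumes a `lume` binary on `PATH`, and the storage name and paths are hypothetical.

```python
# Illustrative only: invoke the lume config subcommands defined above.
import subprocess


def lume(*args: str) -> None:
    subprocess.run(["lume", *args], check=True)


lume("config", "get")                                                   # show current settings
lume("config", "storage", "add", "external", "/Volumes/External/lume")  # hypothetical name/path
lume("config", "storage", "default", "external")
lume("config", "cache", "set", "/Volumes/External/lume-cache")          # hypothetical cache path
lume("config", "caching", "set", "true")
```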

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/handlers/generic.py:
--------------------------------------------------------------------------------

```python
"""
Generic handlers for all OSes.

Includes:
- FileHandler

"""

from pathlib import Path
from typing import Dict, Any, Optional
from .base import BaseFileHandler
import base64

def resolve_path(path: str) -> Path:
    """Resolve a path to its absolute path. Expand ~ to the user's home directory.
    
    Args:
        path: The file or directory path to resolve
        
    Returns:
        Path: The resolved absolute path
    """
    return Path(path).expanduser().resolve()

class GenericFileHandler(BaseFileHandler):
    """
    Generic file handler that provides file system operations for all operating systems.
    
    This class implements the BaseFileHandler interface and provides methods for
    file and directory operations including reading, writing, creating, and deleting
    files and directories.
    """
    
    async def file_exists(self, path: str) -> Dict[str, Any]:
        """
        Check if a file exists at the specified path.
        
        Args:
            path: The file path to check
            
        Returns:
            Dict containing 'success' boolean and either 'exists' boolean or 'error' string
        """
        try:
            return {"success": True, "exists": resolve_path(path).is_file()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def directory_exists(self, path: str) -> Dict[str, Any]:
        """
        Check if a directory exists at the specified path.
        
        Args:
            path: The directory path to check
            
        Returns:
            Dict containing 'success' boolean and either 'exists' boolean or 'error' string
        """
        try:
            return {"success": True, "exists": resolve_path(path).is_dir()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def list_dir(self, path: str) -> Dict[str, Any]:
        """
        List all files and directories in the specified directory.
        
        Args:
            path: The directory path to list
            
        Returns:
            Dict containing 'success' boolean and either 'files' list of names or 'error' string
        """
        try:
            return {"success": True, "files": [p.name for p in resolve_path(path).iterdir() if p.is_file() or p.is_dir()]}
        except Exception as e:
            return {"success": False, "error": str(e)}
        
    async def read_text(self, path: str) -> Dict[str, Any]:
        """
        Read the contents of a text file.
        
        Args:
            path: The file path to read from
            
        Returns:
            Dict containing 'success' boolean and either 'content' string or 'error' string
        """
        try:
            return {"success": True, "content": resolve_path(path).read_text()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def write_text(self, path: str, content: str) -> Dict[str, Any]:
        """
        Write text content to a file.
        
        Args:
            path: The file path to write to
            content: The text content to write
            
        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            resolve_path(path).write_text(content)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def write_bytes(self, path: str, content_b64: str, append: bool = False) -> Dict[str, Any]:
        """
        Write binary content to a file from base64 encoded string.
        
        Args:
            path: The file path to write to
            content_b64: Base64 encoded binary content
            append: If True, append to existing file; if False, overwrite
            
        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            mode = 'ab' if append else 'wb'
            with open(resolve_path(path), mode) as f:
                f.write(base64.b64decode(content_b64))
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}
        
    async def read_bytes(self, path: str, offset: int = 0, length: Optional[int] = None) -> Dict[str, Any]:
        """
        Read binary content from a file and return as base64 encoded string.
        
        Args:
            path: The file path to read from
            offset: Byte offset to start reading from
            length: Number of bytes to read; if None, read entire file from offset
            
        Returns:
            Dict containing 'success' boolean and either 'content_b64' string or 'error' string
        """
        try:
            file_path = resolve_path(path)
            with open(file_path, 'rb') as f:
                if offset > 0:
                    f.seek(offset)
                
                if length is not None:
                    content = f.read(length)
                else:
                    content = f.read()
                
            return {"success": True, "content_b64": base64.b64encode(content).decode('utf-8')}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_file_size(self, path: str) -> Dict[str, Any]:
        """
        Get the size of a file in bytes.
        
        Args:
            path: The file path to get size for
            
        Returns:
            Dict containing 'success' boolean and either 'size' integer or 'error' string
        """
        try:
            file_path = resolve_path(path)
            size = file_path.stat().st_size
            return {"success": True, "size": size}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def delete_file(self, path: str) -> Dict[str, Any]:
        """
        Delete a file at the specified path.
        
        Args:
            path: The file path to delete
            
        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            resolve_path(path).unlink()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def create_dir(self, path: str) -> Dict[str, Any]:
        """
        Create a directory at the specified path.
        
        Creates parent directories if they don't exist and doesn't raise an error
        if the directory already exists.
        
        Args:
            path: The directory path to create
            
        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            resolve_path(path).mkdir(parents=True, exist_ok=True)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def delete_dir(self, path: str) -> Dict[str, Any]:
        """
        Delete an empty directory at the specified path.
        
        Args:
            path: The directory path to delete
            
        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            resolve_path(path).rmdir()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

```
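
A short usage sketch of `GenericFileHandler` follows. It assumes the handler can be instantiated directly (rather than through the handler factory) and that the `computer_server` package is importable; the `~/demo` paths are placeholders.

```python
# Hedged sketch: basic file operations with GenericFileHandler.
import asyncio
import base64

from computer_server.handlers.generic import GenericFileHandler  # path assumed from the tree above


async def main() -> None:
    handler = GenericFileHandler()  # assumes no extra constructor arguments are required

    await handler.create_dir("~/demo")                       # parents created, no error if it exists
    await handler.write_text("~/demo/hello.txt", "hi\n")

    result = await handler.read_bytes("~/demo/hello.txt")
    if result["success"]:
        print(base64.b64decode(result["content_b64"]))       # b'hi\n'

    print(await handler.get_file_size("~/demo/hello.txt"))   # {'success': True, 'size': 3}


if __name__ == "__main__":
    asyncio.run(main())
```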

--------------------------------------------------------------------------------
/libs/python/pylume/pylume/models.py:
--------------------------------------------------------------------------------

```python
from typing import Optional, List, Literal, Dict, Any
import re
from pydantic import BaseModel, Field, computed_field, validator, ConfigDict, RootModel

class DiskInfo(BaseModel):
    """Information about disk storage allocation.
    
    Attributes:
        total: Total disk space in bytes
        allocated: Currently allocated disk space in bytes
    """
    total: int
    allocated: int

class VMConfig(BaseModel):
    """Configuration for creating a new VM.
    
    Note: Memory and disk sizes should be specified with units (e.g., "4GB", "64GB")
    
    Attributes:
        name: Name of the virtual machine
        os: Operating system type, either "macOS" or "linux"
        cpu: Number of CPU cores to allocate
        memory: Amount of memory to allocate with units
        disk_size: Size of the disk to create with units
        display: Display resolution in format "widthxheight"
        ipsw: IPSW path or 'latest' for macOS VMs, None for other OS types
    """
    name: str
    os: Literal["macOS", "linux"] = "macOS"
    cpu: int = Field(default=2, ge=1)
    memory: str = "4GB"
    disk_size: str = Field(default="64GB", alias="diskSize")
    display: str = "1024x768"
    ipsw: Optional[str] = Field(default=None, description="IPSW path or 'latest', for macOS VMs")

    class Config:
        populate_by_alias = True

class SharedDirectory(BaseModel):
    """Configuration for a shared directory.
    
    Attributes:
        host_path: Path to the directory on the host system
        read_only: Whether the directory should be mounted as read-only
    """
    host_path: str = Field(..., alias="hostPath")  # Allow host_path but serialize as hostPath
    read_only: bool = False
    
    class Config:
        populate_by_name = True  # Allow both alias and original name
        alias_generator = lambda s: ''.join(word.capitalize() if i else word for i, word in enumerate(s.split('_')))

class VMRunOpts(BaseModel):
    """Configuration for running a VM.
    
    Args:
        no_display: Whether to not display the VNC client
        shared_directories: List of directories to share with the VM
    """
    no_display: bool = Field(default=False, alias="noDisplay")
    shared_directories: Optional[list[SharedDirectory]] = Field(
        default=None, 
        alias="sharedDirectories"
    )

    model_config = ConfigDict(
        populate_by_name=True,
        alias_generator=lambda s: ''.join(word.capitalize() if i else word for i, word in enumerate(s.split('_')))
    )

    def model_dump(self, **kwargs):
        """Export model data with proper field name conversion.
        
        Converts shared directory fields to match API expectations when using aliases.
        
        Args:
            **kwargs: Keyword arguments passed to parent model_dump method
            
        Returns:
            dict: Model data with properly formatted field names
        """
        data = super().model_dump(**kwargs)
        # Convert shared directory fields to match API expectations
        if self.shared_directories and "by_alias" in kwargs and kwargs["by_alias"]:
            data["sharedDirectories"] = [
                {
                    "hostPath": d.host_path,
                    "readOnly": d.read_only
                }
                for d in self.shared_directories
            ]
            # Remove the snake_case version if it exists
            data.pop("shared_directories", None)
        return data

class VMStatus(BaseModel):
    """Status information for a virtual machine.
    
    Attributes:
        name: Name of the virtual machine
        status: Current status of the VM
        os: Operating system type
        cpu_count: Number of CPU cores allocated
        memory_size: Amount of memory allocated in bytes
        disk_size: Disk storage information
        vnc_url: URL for VNC connection if available
        ip_address: IP address of the VM if available
    """
    name: str
    status: str
    os: Literal["macOS", "linux"]
    cpu_count: int = Field(alias="cpuCount")
    memory_size: int = Field(alias="memorySize")  # API returns memory size in bytes
    disk_size: DiskInfo = Field(alias="diskSize")
    vnc_url: Optional[str] = Field(default=None, alias="vncUrl")
    ip_address: Optional[str] = Field(default=None, alias="ipAddress")

    class Config:
        populate_by_name = True

    @computed_field
    @property
    def state(self) -> str:
        """Get the current state of the VM.
        
        Returns:
            str: Current VM status
        """
        return self.status

    @computed_field
    @property
    def cpu(self) -> int:
        """Get the number of CPU cores.
        
        Returns:
            int: Number of CPU cores allocated to the VM
        """
        return self.cpu_count

    @computed_field
    @property
    def memory(self) -> str:
        """Get memory allocation in human-readable format.
        
        Returns:
            str: Memory size formatted as "{size}GB"
        """
        # Convert bytes to GB
        gb = self.memory_size / (1024 * 1024 * 1024)
        return f"{int(gb)}GB"

class VMUpdateOpts(BaseModel):
    """Options for updating VM configuration.
    
    Attributes:
        cpu: Number of CPU cores to update to
        memory: Amount of memory to update to with units
        disk_size: Size of disk to update to with units
    """
    cpu: Optional[int] = None
    memory: Optional[str] = None
    disk_size: Optional[str] = None

class ImageRef(BaseModel):
    """Reference to a VM image.
    
    Attributes:
        image: Name of the image
        tag: Tag version of the image
        registry: Registry hostname where image is stored
        organization: Organization or namespace in the registry
    """
    image: str
    tag: str = "latest"
    registry: Optional[str] = "ghcr.io"
    organization: Optional[str] = "trycua"

    def model_dump(self, **kwargs):
        """Override model_dump to return just the image:tag format.
        
        Args:
            **kwargs: Keyword arguments (ignored)
            
        Returns:
            str: Image reference in "image:tag" format
        """
        return f"{self.image}:{self.tag}"

class CloneSpec(BaseModel):
    """Specification for cloning a VM.
    
    Attributes:
        name: Name of the source VM to clone
        new_name: Name for the new cloned VM
    """
    name: str
    new_name: str = Field(alias="newName")

    class Config:
        populate_by_name = True

class ImageInfo(BaseModel):
    """Model for individual image information.
    
    Attributes:
        imageId: Unique identifier for the image
    """
    imageId: str

class ImageList(RootModel):
    """Response model for the images endpoint.
    
    A list-like container for ImageInfo objects that provides
    iteration and indexing capabilities.
    """
    root: List[ImageInfo]

    def __iter__(self):
        """Iterate over the image list.
        
        Returns:
            Iterator over ImageInfo objects
        """
        return iter(self.root)

    def __getitem__(self, item):
        """Get an item from the image list by index.
        
        Args:
            item: Index or slice to retrieve
            
        Returns:
            ImageInfo or list of ImageInfo objects
        """
        return self.root[item]

    def __len__(self):
        """Get the number of images in the list.
        
        Returns:
            int: Number of images in the list
        """
        return len(self.root)
```

--------------------------------------------------------------------------------
/libs/python/mcp-server/CONCURRENT_SESSIONS.md:
--------------------------------------------------------------------------------

```markdown
# MCP Server Concurrent Session Management

This document describes the improvements made to the MCP Server to address concurrent session management and resource lifecycle issues.

## Problem Statement

The original MCP server implementation had several critical issues:

1. **Global Computer Instance**: Used a single `global_computer` variable shared across all clients
2. **No Resource Isolation**: Multiple clients would interfere with each other
3. **Sequential Task Processing**: Multi-task operations were always sequential
4. **No Graceful Shutdown**: Server couldn't properly cleanup resources on shutdown
5. **Hidden Event Loop**: `server.run()` hid the event loop, preventing proper lifecycle management

## Solution Architecture

### 1. Session Manager (`session_manager.py`)

The `SessionManager` class provides:

- **Per-session computer instances**: Each client gets isolated computer resources
- **Computer instance pooling**: Efficient reuse of computer instances with lifecycle management
- **Task registration**: Track active tasks per session for graceful cleanup
- **Automatic cleanup**: Background task cleans up idle sessions
- **Resource limits**: Configurable maximum concurrent sessions

#### Key Components:

```python
class SessionManager:
    def __init__(self, max_concurrent_sessions: int = 10):
        self._sessions: Dict[str, SessionInfo] = {}
        self._computer_pool = ComputerPool()
        # ... lifecycle management
```

#### Session Lifecycle:

1. **Creation**: New session created when client first connects
2. **Task Registration**: Each task is registered with the session
3. **Activity Tracking**: Last activity time updated on each operation
4. **Cleanup**: Sessions cleaned up when idle or on shutdown
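
A self-contained sketch of this lifecycle bookkeeping is shown below (illustrative only; the real `SessionManager` in `session_manager.py` differs in detail and also owns the computer pool):

```python
# Illustrative sketch of the session lifecycle above, not the actual implementation.
import asyncio
import time
from dataclasses import dataclass, field

@dataclass
class SessionInfo:
    session_id: str
    last_activity: float = field(default_factory=time.time)
    active_tasks: set = field(default_factory=set)

class MiniSessionManager:
    def __init__(self, idle_timeout: float = 300.0):
        self.sessions: dict[str, SessionInfo] = {}
        self.idle_timeout = idle_timeout

    def touch(self, session_id: str) -> SessionInfo:
        # 1. Creation (on first use) and 3. Activity tracking
        info = self.sessions.setdefault(session_id, SessionInfo(session_id))
        info.last_activity = time.time()
        return info

    def register_task(self, session_id: str, task_id: str) -> None:
        # 2. Task registration
        self.touch(session_id).active_tasks.add(task_id)

    def unregister_task(self, session_id: str, task_id: str) -> None:
        self.touch(session_id).active_tasks.discard(task_id)

    async def reap_idle_forever(self) -> None:
        # 4. Cleanup: drop sessions with no tasks that have been idle too long
        while True:
            now = time.time()
            for sid, info in list(self.sessions.items()):
                if not info.active_tasks and now - info.last_activity > self.idle_timeout:
                    del self.sessions[sid]
            await asyncio.sleep(30)
```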

### 2. Computer Pool (`ComputerPool`)

Manages computer instances efficiently:

- **Pool Size Limits**: Maximum number of concurrent computer instances
- **Instance Reuse**: Available instances reused across sessions
- **Lifecycle Management**: Proper startup/shutdown of computer instances
- **Resource Cleanup**: All instances properly closed on shutdown
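
The pooling behaviour described above can be sketched as follows. This is a simplified illustration that assumes instances expose a `disconnect()` coroutine (as the `Computer` class does); the real `ComputerPool` also enforces an idle timeout:

```python
# Simplified pool illustration; not the actual ComputerPool implementation.
import asyncio

class MiniComputerPool:
    def __init__(self, factory, max_size: int = 5):
        self._factory = factory        # async callable that creates a started computer
        self._available: list = []     # idle instances ready for reuse
        self._lock = asyncio.Lock()
        self._size = 0
        self._max_size = max_size

    async def acquire(self):
        async with self._lock:
            if self._available:
                return self._available.pop()   # reuse an idle instance
            if self._size >= self._max_size:
                raise RuntimeError("pool exhausted")  # or wait, depending on policy
            self._size += 1
        return await self._factory()           # start a fresh instance

    async def release(self, computer) -> None:
        async with self._lock:
            self._available.append(computer)   # keep it warm for the next session

    async def shutdown(self) -> None:
        async with self._lock:
            for computer in self._available:
                await computer.disconnect()    # close everything on shutdown
            self._available.clear()
```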

### 3. Enhanced Server Tools

All server tools now support:

- **Session ID Parameter**: Optional `session_id` for multi-client support
- **Resource Isolation**: Each session gets its own computer instance
- **Task Tracking**: Proper registration/unregistration of tasks
- **Error Handling**: Graceful error handling with session cleanup

#### Updated Tool Signatures:

```python
async def screenshot_cua(ctx: Context, session_id: Optional[str] = None) -> Any:
async def run_cua_task(ctx: Context, task: str, session_id: Optional[str] = None) -> Any:
async def run_multi_cua_tasks(ctx: Context, tasks: List[str], session_id: Optional[str] = None, concurrent: bool = False) -> Any:
```

### 4. Concurrent Task Execution

The `run_multi_cua_tasks` tool now supports:

- **Sequential Mode** (default): Tasks run one after another
- **Concurrent Mode**: Tasks run in parallel using `asyncio.gather()`
- **Progress Tracking**: Proper progress reporting for both modes
- **Error Handling**: Individual task failures don't stop other tasks
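
Conceptually, the two modes differ only in how the per-task coroutines are awaited. The sketch below is illustrative (`run_single_task` is a stand-in, not the actual helper); error isolation in concurrent mode comes from `return_exceptions=True`:

```python
# Sketch of sequential vs. concurrent execution, not the actual run_multi_cua_tasks code.
import asyncio

async def run_single_task(task: str):
    ...  # stand-in for driving the agent through one task

async def run_tasks(tasks: list[str], concurrent: bool = False) -> list:
    if concurrent:
        # Parallel mode: a failure in one task becomes an exception object in
        # the results list instead of cancelling the other tasks.
        return await asyncio.gather(
            *(run_single_task(t) for t in tasks), return_exceptions=True
        )
    results = []
    for task in tasks:
        try:
            results.append(await run_single_task(task))  # one after another
        except Exception as e:
            results.append(e)  # record the failure and keep going
    return results
```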

### 5. Graceful Shutdown

The server now provides:

- **Signal Handlers**: Proper handling of SIGINT and SIGTERM
- **Session Cleanup**: All active sessions properly cleaned up
- **Resource Release**: Computer instances returned to pool and closed
- **Async Lifecycle**: Event loop properly exposed for cleanup
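
The shutdown wiring looks roughly like the sketch below (illustrative; `cleanup_all_sessions` is a placeholder, and on Windows `loop.add_signal_handler` is unavailable, so a `signal.signal` fallback would be needed):

```python
# Illustrative shutdown wiring; the actual server ties this into its SessionManager.
import asyncio
import signal

async def cleanup_all_sessions() -> None:
    ...  # close sessions, return computers to the pool, shut the pool down

async def main() -> None:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)  # request a graceful shutdown
    try:
        await stop.wait()                       # serve requests until a signal arrives
    finally:
        await cleanup_all_sessions()            # release all resources before exiting

if __name__ == "__main__":
    asyncio.run(main())
```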

## Usage Examples

### Basic Usage (Backward Compatible)

```python
# These calls work exactly as before
await screenshot_cua(ctx)
await run_cua_task(ctx, "Open browser")
await run_multi_cua_tasks(ctx, ["Task 1", "Task 2"])
```

### Multi-Client Usage

```python
# Client 1
session_id_1 = "client-1-session"
await screenshot_cua(ctx, session_id_1)
await run_cua_task(ctx, "Open browser", session_id_1)

# Client 2 (completely isolated)
session_id_2 = "client-2-session"
await screenshot_cua(ctx, session_id_2)
await run_cua_task(ctx, "Open editor", session_id_2)
```

### Concurrent Task Execution

```python
# Run tasks concurrently instead of sequentially
tasks = ["Open browser", "Open editor", "Open terminal"]
results = await run_multi_cua_tasks(ctx, tasks, concurrent=True)
```

### Session Management

```python
# Get session statistics
stats = await get_session_stats(ctx)
print(f"Active sessions: {stats['total_sessions']}")

# Cleanup specific session
await cleanup_session(ctx, "session-to-cleanup")
```

## Configuration

### Environment Variables

- `CUA_MODEL_NAME`: Model to use (default: `anthropic/claude-3-5-sonnet-20241022`)
- `CUA_MAX_IMAGES`: Maximum images to keep (default: `3`)
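
Both are read from the process environment with the documented defaults; in Python that is roughly:

```python
import os

# Defaults mirror the documented values above (illustrative snippet).
model_name = os.environ.get("CUA_MODEL_NAME", "anthropic/claude-3-5-sonnet-20241022")
max_images = int(os.environ.get("CUA_MAX_IMAGES", "3"))
```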

### Session Manager Configuration

```python
# In session_manager.py
class SessionManager:
    def __init__(self, max_concurrent_sessions: int = 10):
        # Configurable maximum concurrent sessions
        
class ComputerPool:
    def __init__(self, max_size: int = 5, idle_timeout: float = 300.0):
        # Configurable pool size and idle timeout
```

## Performance Improvements

### Before (Issues):
- ❌ Single global computer instance
- ❌ Client interference and resource conflicts
- ❌ Sequential task processing only
- ❌ No graceful shutdown
- ❌ 30s timeout issues with long-running tasks

### After (Benefits):
- ✅ Per-session computer instances with proper isolation
- ✅ Computer instance pooling for efficient resource usage
- ✅ Concurrent task execution support
- ✅ Graceful shutdown with proper cleanup
- ✅ Streaming updates prevent timeout issues
- ✅ Configurable resource limits
- ✅ Automatic session cleanup

## Testing

Comprehensive test coverage includes:

- Session creation and reuse
- Concurrent session isolation
- Task registration and cleanup
- Error handling with session management
- Concurrent vs sequential task execution
- Session statistics and cleanup

Run tests with:

```bash
pytest tests/test_mcp_server_session_management.py -v
```

## Migration Guide

### For Existing Clients

No changes required! The new implementation is fully backward compatible:

```python
# This still works exactly as before
await run_cua_task(ctx, "My task")
```

### For New Multi-Client Applications

Use session IDs for proper isolation:

```python
# Create a unique session ID for each client
session_id = str(uuid.uuid4())
await run_cua_task(ctx, "My task", session_id)
```

### For Concurrent Task Execution

Enable concurrent mode for better performance:

```python
tasks = ["Task 1", "Task 2", "Task 3"]
results = await run_multi_cua_tasks(ctx, tasks, concurrent=True)
```

## Monitoring and Debugging

### Session Statistics

```python
stats = await get_session_stats(ctx)
print(f"Total sessions: {stats['total_sessions']}")
print(f"Max concurrent: {stats['max_concurrent']}")
for session_id, session_info in stats['sessions'].items():
    print(f"Session {session_id}: {session_info['active_tasks']} active tasks")
```

### Logging

The server provides detailed logging for:

- Session creation and cleanup
- Task registration and completion
- Resource pool usage
- Error conditions and recovery

### Graceful Shutdown

The server properly handles shutdown signals:

```bash
# Send SIGTERM for graceful shutdown
kill -TERM <server_pid>

# Or use Ctrl+C (SIGINT)
```

## Future Enhancements

Potential future improvements:

1. **Session Persistence**: Save/restore session state across restarts
2. **Load Balancing**: Distribute sessions across multiple server instances
3. **Resource Monitoring**: Real-time monitoring of resource usage
4. **Auto-scaling**: Dynamic adjustment of pool size based on demand
5. **Session Timeouts**: Configurable timeouts for different session types

```

--------------------------------------------------------------------------------
/blog/human-in-the-loop.md:
--------------------------------------------------------------------------------

```markdown
# When Agents Need Human Wisdom - Introducing Human-In-The-Loop Support

*Published on August 29, 2025 by Francesco Bonacci*

Sometimes the best AI agent is a human. Whether you're creating training demonstrations, evaluating complex scenarios, or stepping in when automation hits a wall, our new Human-In-The-Loop integration puts you directly in control.

With yesterday's [HUD evaluation integration](hud-agent-evals.md), you could benchmark any agent at scale. Today's update lets you *become* the agent when it matters most—seamlessly switching between automated intelligence and human judgment.

<div align="center">
  <video src="https://github.com/user-attachments/assets/9091b50f-26e7-4981-95ce-40e5d42a1260" width="600" controls></video>
</div>

## What you get

- **One-line human takeover** for any agent configuration with `human/human` or `model+human/human`
- **Interactive web UI** to see what your agent sees and control what it does
- **Zero context switching** - step in exactly where automation left off
- **Training data generation** - create perfect demonstrations by doing tasks yourself
- **Ground truth evaluation** - validate agent performance with human expertise

## Why Human-In-The-Loop?

Even the most sophisticated agents encounter edge cases, ambiguous interfaces, or tasks requiring human judgment. Rather than failing gracefully, they can now fail *intelligently*—by asking for human help.

This approach bridges the gap between fully automated systems and pure manual control, letting you:
- **Demonstrate complex workflows** that agents can learn from
- **Evaluate tricky scenarios** where ground truth requires human assessment  
- **Intervene selectively** when automated agents need guidance
- **Test and debug** your tools and environments manually

## Getting Started

Launch the human agent interface:

```bash
python -m agent.human_tool
```

The web UI will show pending completions. Click any completion to take control of the agent and see exactly what it sees.

## Usage Examples

### Direct Human Control

Perfect for creating demonstrations or when you want full manual control:

```python
from agent import ComputerAgent
from agent.computer import computer

agent = ComputerAgent(
    "human/human",
    tools=[computer]
)

# You'll get full control through the web UI
async for _ in agent.run("Take a screenshot, analyze the UI, and click on the most prominent button"):
    pass
```

### Hybrid: AI Planning + Human Execution

Combine model intelligence with human precision—let AI plan, then execute manually:

```python
agent = ComputerAgent(
    "huggingface-local/HelloKKMe/GTA1-7B+human/human",  
    tools=[computer]
)

# AI creates the plan, human executes each step
async for _ in agent.run("Navigate to the settings page and enable dark mode"):
    pass
```

### Fallback Pattern

Start automated, escalate to human when needed:

```python
# Primary automated agent
primary_agent = ComputerAgent("openai/computer-use-preview", tools=[computer])

# Human fallback agent  
fallback_agent = ComputerAgent("human/human", tools=[computer])

try:
    async for result in primary_agent.run(task):
        if result.confidence < 0.7:  # Low confidence threshold
            # Seamlessly hand off to human
            async for _ in fallback_agent.run(f"Continue this task: {task}"):
                pass
except Exception:
    # Agent failed, human takes over
    async for _ in fallback_agent.run(f"Handle this failed task: {task}"):
        pass
```

## Interactive Features

The human-in-the-loop interface provides a rich, responsive experience:

### **Visual Environment**
- **Screenshot display** with live updates as you work
- **Click handlers** for direct interaction with UI elements  
- **Zoom and pan** to see details clearly

### **Action Controls**
- **Click actions** - precise cursor positioning and clicking
- **Keyboard input** - type text naturally or send specific key combinations
- **Action history** - see the sequence of actions taken
- **Undo support** - step back when needed

### **Tool Integration** 
- **Full OpenAI compatibility** - standard tool call format
- **Custom tools** - integrate your own tools seamlessly
- **Real-time feedback** - see tool responses immediately

### **Smart Polling**
- **Responsive updates** - UI refreshes when new completions arrive
- **Background processing** - continue working while waiting for tasks
- **Session persistence** - resume interrupted sessions

## Real-World Use Cases

### **Training Data Generation**
Create perfect demonstrations for fine-tuning:

```python
# Generate training examples for spreadsheet tasks
demo_agent = ComputerAgent("human/human", tools=[computer])

tasks = [
    "Create a budget spreadsheet with income and expense categories",
    "Apply conditional formatting to highlight overbudget items", 
    "Generate a pie chart showing expense distribution"
]

for task in tasks:
    # Human demonstrates each task perfectly
    async for _ in demo_agent.run(task):
        pass  # Recorded actions become training data
```

### **Evaluation and Ground Truth**
Validate agent performance on complex scenarios:

```python
# Human evaluates agent performance
evaluator = ComputerAgent("human/human", tools=[computer])

async for _ in evaluator.run("Review this completed form and rate accuracy (1-10)"):
    pass  # Human provides authoritative quality assessment
```

### **Interactive Debugging**
Step through agent behavior manually:

```python
# Test a workflow step by step
debug_agent = ComputerAgent("human/human", tools=[computer])

async for _ in debug_agent.run("Reproduce the agent's failed login sequence"):
    pass  # Human identifies exactly where automation breaks
```

### **Edge Case Handling**
Handle scenarios that break automated agents:

```python
# Complex UI interaction requiring human judgment
edge_case_agent = ComputerAgent("human/human", tools=[computer])

async for _ in edge_case_agent.run("Navigate this CAPTCHA-protected form"):
    pass  # Human handles what automation cannot
```

## Configuration Options

Customize the human agent experience:

- **UI refresh rate**: Adjust polling frequency for your workflow
- **Image quality**: Balance detail vs. performance for screenshots  
- **Action logging**: Save detailed traces for analysis and training
- **Session timeout**: Configure idle timeouts for security
- **Tool permissions**: Restrict which tools humans can access

## When to Use Human-In-The-Loop

| **Scenario** | **Why Human Control** |
|--------------|----------------------|
| **Creating training data** | Perfect demonstrations for model fine-tuning |
| **Evaluating complex tasks** | Human judgment for subjective or nuanced assessment |  
| **Handling edge cases** | CAPTCHAs, unusual UIs, context-dependent decisions |
| **Debugging workflows** | Step through failures to identify breaking points |
| **High-stakes operations** | Critical tasks requiring human oversight and approval |
| **Testing new environments** | Validate tools and environments work as expected |

## Learn More

- **Interactive examples**: Try human-in-the-loop control with sample tasks
- **Training data pipelines**: Learn how to convert human demonstrations into model training data  
- **Evaluation frameworks**: Build human-validated test suites for your agents
- **API documentation**: Full reference for human agent configuration

Ready to put humans back in the loop? The most sophisticated AI system knows when to ask for help.

---

*Questions about human-in-the-loop agents? Join the conversation in our [Discord community](https://discord.gg/cua-ai) or check out our [documentation](https://docs.trycua.com/docs/agent-sdk/supported-agents/human-in-the-loop).*

```

--------------------------------------------------------------------------------
/docs/content/docs/quickstart-cli.mdx:
--------------------------------------------------------------------------------

```markdown
---
title: Quickstart (CLI)
description: Get started with the cua Agent CLI in 4 steps
icon: Rocket
---

import { Step, Steps } from 'fumadocs-ui/components/steps';
import { Tab, Tabs } from 'fumadocs-ui/components/tabs';
import { Accordion, Accordions } from 'fumadocs-ui/components/accordion';

Get up and running with the cua Agent CLI in 4 simple steps.

<Steps>
<Step>

## Introduction

cua combines Computer (interface) + Agent (AI) for automating desktop apps. The Agent CLI provides a clean terminal interface to control your remote computer using natural language commands.

</Step>

<Step>

## Set Up Your Computer Environment

Choose how you want to run your cua computer. **Cloud Sandbox is recommended** for the easiest setup:

<Tabs items={['☁️ Cloud Sandbox (Recommended)', 'Linux on Docker', 'Windows Sandbox', 'macOS VM']}>
  <Tab value="☁️ Cloud Sandbox (Recommended)">

    **Easiest & safest way to get started - works on any host OS**

    1. Go to [trycua.com/signin](https://www.trycua.com/signin)
    2. Navigate to **Dashboard > Containers > Create Instance**
    3. Create a **Medium, Ubuntu 22** container
    4. Note your container name and API key

    Your cloud container will be automatically configured and ready to use.

  </Tab>
  <Tab value="Linux on Docker">

    **Run a Linux desktop locally on macOS, Windows, or Linux hosts**

    1. Install Docker Desktop or Docker Engine

    2. Pull the CUA XFCE container (lightweight desktop)

    ```bash
    docker pull --platform=linux/amd64 trycua/cua-xfce:latest
    ```

    Or use KASM for a full-featured desktop:

    ```bash
    docker pull --platform=linux/amd64 trycua/cua-ubuntu:latest
    ```

  </Tab>
  <Tab value="Windows Sandbox">

    **Windows hosts only - requires Windows 10 Pro/Enterprise or Windows 11**

    1. Enable Windows Sandbox
    2. Install pywinsandbox dependency

    ```bash
    pip install -U git+https://github.com/karkason/pywinsandbox.git
    ```

    3. Windows Sandbox will be automatically configured when you run the CLI

  </Tab>
  <Tab value="macOS VM">

    **macOS hosts only - requires Lume CLI**

    1. Install the Lume CLI

    ```bash
    /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/trycua/cua/main/libs/lume/scripts/install.sh)"
    ```

    2. Start a local cua macOS VM

    ```bash
    lume run macos-sequoia-cua:latest
    ```

  </Tab>
</Tabs>

</Step>

<Step>

## Install cua

<Accordions type="single" defaultValue="uv">

<Accordion title="uv (Recommended)" value="uv">

### Install uv

<Tabs items={['macOS / Linux', 'Windows']} persist>
<Tab value="macOS / Linux">

```bash
# Use curl to download the script and execute it with sh:
curl -LsSf https://astral.sh/uv/install.sh | sh

# If your system doesn't have curl, you can use wget:
# wget -qO- https://astral.sh/uv/install.sh | sh
```

</Tab>
<Tab value="Windows">

```powershell
# Use irm to download the script and execute it with iex:
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
```

</Tab>
</Tabs>

### Install Python 3.12

```bash
uv python install 3.12
# uv will install cua dependencies automatically when you use --with "cua-agent[cli]"
```

</Accordion>

<Accordion title="conda" value="conda">

### Install conda

<Tabs items={['macOS', 'Linux', 'Windows']} persist>
<Tab value="macOS">

```bash
mkdir -p ~/miniconda3
curl https://repo.anaconda.com/miniconda/Miniconda3-latest-MacOSX-arm64.sh -o ~/miniconda3/miniconda.sh
bash ~/miniconda3/miniconda.sh -b -u -p ~/miniconda3
rm ~/miniconda3/miniconda.sh
source ~/miniconda3/bin/activate
```

</Tab>
<Tab value="Linux">

```bash
mkdir -p ~/miniconda3
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda3/miniconda.sh
bash ~/miniconda3/miniconda.sh -b -u -p ~/miniconda3
rm ~/miniconda3/miniconda.sh
source ~/miniconda3/bin/activate
```

</Tab>
<Tab value="Windows">

```powershell
wget "https://repo.anaconda.com/miniconda/Miniconda3-latest-Windows-x86_64.exe" -outfile ".\miniconda.exe"
Start-Process -FilePath ".\miniconda.exe" -ArgumentList "/S" -Wait
del .\miniconda.exe
```

</Tab>
</Tabs>

### Create and activate Python 3.12 environment

```bash
conda create -n cua python=3.12
conda activate cua
```

### Install cua

```bash
pip install "cua-agent[cli]" cua-computer
```

</Accordion>

<Accordion title="pip" value="pip">

### Install cua

```bash
pip install "cua-agent[cli]" cua-computer
```

</Accordion>

</Accordions>

</Step>

<Step>

## Run cua CLI

Choose your preferred AI model:

### OpenAI Computer Use Preview

<Tabs items={['uv', 'conda/pip']} persist>
<Tab value="uv">

```bash
uv run --with "cua-agent[cli]" -m agent.cli openai/computer-use-preview
```

</Tab>
<Tab value="conda/pip">

```bash
python -m agent.cli openai/computer-use-preview
```

</Tab>
</Tabs>

### Anthropic Claude

<Tabs items={['uv', 'conda/pip']} persist>
<Tab value="uv">

```bash
uv run --with "cua-agent[cli]" -m agent.cli anthropic/claude-sonnet-4-5-20250929
uv run --with "cua-agent[cli]" -m agent.cli anthropic/claude-opus-4-20250514
uv run --with "cua-agent[cli]" -m agent.cli anthropic/claude-opus-4-1-20250805
uv run --with "cua-agent[cli]" -m agent.cli anthropic/claude-sonnet-4-20250514
uv run --with "cua-agent[cli]" -m agent.cli anthropic/claude-3-5-sonnet-20241022
```

</Tab>
<Tab value="conda/pip">

```bash
python -m agent.cli anthropic/claude-sonnet-4-5-20250929
python -m agent.cli anthropic/claude-opus-4-1-20250805
python -m agent.cli anthropic/claude-opus-4-20250514
python -m agent.cli anthropic/claude-sonnet-4-20250514
python -m agent.cli anthropic/claude-3-5-sonnet-20241022
```

</Tab>
</Tabs>

### Omniparser + LLMs

<Tabs items={['uv', 'conda/pip']} persist>
<Tab value="uv">

```bash
uv run --with "cua-agent[cli]" -m agent.cli omniparser+anthropic/claude-3-5-sonnet-20241022
uv run --with "cua-agent[cli]" -m agent.cli omniparser+openai/gpt-4o
uv run --with "cua-agent[cli]" -m agent.cli omniparser+vertex_ai/gemini-pro
```

</Tab>
<Tab value="conda/pip">

```bash
python -m agent.cli omniparser+anthropic/claude-3-5-sonnet-20241022
python -m agent.cli omniparser+openai/gpt-4o
python -m agent.cli omniparser+vertex_ai/gemini-pro
```

</Tab>
</Tabs>

### Local Models

<Tabs items={['uv', 'conda/pip']} persist>
<Tab value="uv">

```bash
# Hugging Face models (local)
uv run --with "cua-agent[cli]" -m agent.cli huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B

# MLX models (Apple Silicon)
uv run --with "cua-agent[cli]" -m agent.cli mlx/mlx-community/UI-TARS-1.5-7B-6bit

# Ollama models
uv run --with "cua-agent[cli]" -m agent.cli omniparser+ollama_chat/llama3.2:latest
```

</Tab>
<Tab value="conda/pip">

```bash
# Hugging Face models (local)
python -m agent.cli huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B

# MLX models (Apple Silicon)
python -m agent.cli mlx/mlx-community/UI-TARS-1.5-7B-6bit

# Ollama models
python -m agent.cli omniparser+ollama_chat/llama3.2:latest
```

</Tab>
</Tabs>

### Interactive Setup

If you haven't set up environment variables, the CLI will guide you through the setup:

1. **Sandbox Name**: Enter your cua sandbox name (or get one at [trycua.com](https://www.trycua.com/))
2. **CUA API Key**: Enter your cua API key
3. **Provider API Key**: Enter your AI provider API key (OpenAI, Anthropic, etc.)

### Start Chatting

Once connected, you'll see:

```
💻 Connected to your-container-name (model, agent_loop)
Type 'exit' to quit.

>
```

You can ask your agent to perform actions like:

- "Take a screenshot and tell me what's on the screen"
- "Open Firefox and go to github.com"
- "Type 'Hello world' into the terminal"
- "Close the current window"
- "Click on the search button"

</Step>
</Steps>

---

For advanced Python usage and GUI interface, see the [Quickstart (GUI)](/quickstart-ui) and [Quickstart for Developers](/quickstart-devs).

For running models locally, see [Running Models Locally](/agent-sdk/local-models).

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/human_tool/server.py:
--------------------------------------------------------------------------------

```python
import asyncio
import uuid
from datetime import datetime
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel


class CompletionStatus(str, Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class CompletionCall:
    id: str
    messages: List[Dict[str, Any]]
    model: str
    status: CompletionStatus
    created_at: datetime
    completed_at: Optional[datetime] = None
    response: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
    error: Optional[str] = None


class ToolCall(BaseModel):
    id: str
    type: str = "function"
    function: Dict[str, Any]


class CompletionRequest(BaseModel):
    messages: List[Dict[str, Any]]
    model: str


class CompletionResponse(BaseModel):
    response: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None


class CompletionQueue:
    def __init__(self):
        self._queue: Dict[str, CompletionCall] = {}
        self._pending_order: List[str] = []
        self._lock = asyncio.Lock()
    
    async def add_completion(self, messages: List[Dict[str, Any]], model: str) -> str:
        """Add a completion call to the queue."""
        async with self._lock:
            call_id = str(uuid.uuid4())
            completion_call = CompletionCall(
                id=call_id,
                messages=messages,
                model=model,
                status=CompletionStatus.PENDING,
                created_at=datetime.now()
            )
            self._queue[call_id] = completion_call
            self._pending_order.append(call_id)
            return call_id
    
    async def get_pending_calls(self) -> List[Dict[str, Any]]:
        """Get all pending completion calls."""
        async with self._lock:
            pending_calls = []
            for call_id in self._pending_order:
                if call_id in self._queue and self._queue[call_id].status == CompletionStatus.PENDING:
                    call = self._queue[call_id]
                    pending_calls.append({
                        "id": call.id,
                        "model": call.model,
                        "created_at": call.created_at.isoformat(),
                        "messages": call.messages
                    })
            return pending_calls
    
    async def get_call_status(self, call_id: str) -> Optional[Dict[str, Any]]:
        """Get the status of a specific completion call."""
        async with self._lock:
            if call_id not in self._queue:
                return None
            
            call = self._queue[call_id]
            result = {
                "id": call.id,
                "status": call.status.value,
                "created_at": call.created_at.isoformat(),
                "model": call.model,
                "messages": call.messages
            }
            
            if call.completed_at:
                result["completed_at"] = call.completed_at.isoformat()
            if call.response:
                result["response"] = call.response
            if call.tool_calls:
                result["tool_calls"] = call.tool_calls
            if call.error:
                result["error"] = call.error
                
            return result
    
    async def complete_call(self, call_id: str, response: Optional[str] = None, tool_calls: Optional[List[Dict[str, Any]]] = None) -> bool:
        """Mark a completion call as completed with a response or tool calls."""
        async with self._lock:
            if call_id not in self._queue:
                return False
            
            call = self._queue[call_id]
            if call.status != CompletionStatus.PENDING:
                return False
            
            call.status = CompletionStatus.COMPLETED
            call.completed_at = datetime.now()
            call.response = response
            call.tool_calls = tool_calls
            
            # Remove from pending order
            if call_id in self._pending_order:
                self._pending_order.remove(call_id)
            
            return True
    
    async def fail_call(self, call_id: str, error: str) -> bool:
        """Mark a completion call as failed with an error."""
        async with self._lock:
            if call_id not in self._queue:
                return False
            
            call = self._queue[call_id]
            if call.status != CompletionStatus.PENDING:
                return False
            
            call.status = CompletionStatus.FAILED
            call.completed_at = datetime.now()
            call.error = error
            
            # Remove from pending order
            if call_id in self._pending_order:
                self._pending_order.remove(call_id)
            
            return True
    
    async def wait_for_completion(self, call_id: str, timeout: float = 300.0) -> Optional[str]:
        """Wait for a completion call to be completed and return the response."""
        start_time = asyncio.get_running_loop().time()
        
        while True:
            status = await self.get_call_status(call_id)
            if not status:
                return None
            
            if status["status"] == CompletionStatus.COMPLETED.value:
                return status.get("response")
            elif status["status"] == CompletionStatus.FAILED.value:
                raise Exception(f"Completion failed: {status.get('error', 'Unknown error')}")
            
            # Check timeout
            if asyncio.get_running_loop().time() - start_time > timeout:
                await self.fail_call(call_id, "Timeout waiting for human response")
                raise TimeoutError("Timeout waiting for human response")
            
            # Wait a bit before checking again
            await asyncio.sleep(0.5)


# Global queue instance
completion_queue = CompletionQueue()

# FastAPI app
app = FastAPI(title="Human Completion Server", version="1.0.0")


@app.post("/queue", response_model=Dict[str, str])
async def queue_completion(request: CompletionRequest):
    """Add a completion request to the queue."""
    call_id = await completion_queue.add_completion(request.messages, request.model)
    return {"id": call_id, "status": "queued"}


@app.get("/pending")
async def list_pending():
    """List all pending completion calls."""
    pending_calls = await completion_queue.get_pending_calls()
    return {"pending_calls": pending_calls}


@app.get("/status/{call_id}")
async def get_status(call_id: str):
    """Get the status of a specific completion call."""
    status = await completion_queue.get_call_status(call_id)
    if not status:
        raise HTTPException(status_code=404, detail="Completion call not found")
    return status


@app.post("/complete/{call_id}")
async def complete_call(call_id: str, response: CompletionResponse):
    """Complete a call with a human response."""
    success = await completion_queue.complete_call(
        call_id, 
        response=response.response, 
        tool_calls=response.tool_calls
    )
    if success:
        return {"status": "success", "message": "Call completed"}
    else:
        raise HTTPException(status_code=404, detail="Call not found or already completed")


@app.post("/fail/{call_id}")
async def fail_call(call_id: str, error: Dict[str, str]):
    """Mark a call as failed."""
    success = await completion_queue.fail_call(call_id, error.get("error", "Unknown error"))
    if not success:
        raise HTTPException(status_code=404, detail="Completion call not found or already completed")
    return {"status": "failed"}


@app.get("/")
async def root():
    """Root endpoint."""
    return {"message": "Human Completion Server is running"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8002)

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/computers/custom.py:
--------------------------------------------------------------------------------

```python
"""
Custom computer handler implementation that accepts a dictionary of functions.
"""

import base64
from typing import Dict, List, Any, Literal, Union, Optional, Callable
from PIL import Image
import io
from .base import AsyncComputerHandler


class CustomComputerHandler(AsyncComputerHandler):
    """Computer handler that implements the Computer protocol using a dictionary of custom functions."""
    
    def __init__(self, functions: Dict[str, Callable]):
        """
        Initialize with a dictionary of functions.
        
        Args:
            functions: Dictionary where keys are method names and values are callable functions.
                      Only 'screenshot' is required, all others are optional.
        
        Raises:
            ValueError: If required 'screenshot' function is not provided.
        """
        if 'screenshot' not in functions:
            raise ValueError("'screenshot' function is required in functions dictionary")
        
        self.functions = functions
        self._last_screenshot_size: Optional[tuple[int, int]] = None
    
    async def _call_function(self, func, *args, **kwargs):
        """
        Call a function, handling both async and sync functions.
        
        Args:
            func: The function to call
            *args: Positional arguments to pass to the function
            **kwargs: Keyword arguments to pass to the function
            
        Returns:
            The result of the function call
        """
        import asyncio
        import inspect
        
        if callable(func):
            if inspect.iscoroutinefunction(func):
                return await func(*args, **kwargs)
            else:
                return func(*args, **kwargs)
        else:
            return func
    
    async def _get_value(self, attribute: str):
        """
        Get value for an attribute, checking both 'get_{attribute}' and '{attribute}' keys.
        
        Args:
            attribute: The attribute name to look for
            
        Returns:
            The value from the functions dict, called if callable, returned directly if not
        """
        # Check for 'get_{attribute}' first
        get_key = f"get_{attribute}"
        if get_key in self.functions:
            return await self._call_function(self.functions[get_key])
        
        # Check for '{attribute}' 
        if attribute in self.functions:
            return await self._call_function(self.functions[attribute])
        
        return None
    
    def _to_b64_str(self, img: Union[bytes, Image.Image, str]) -> str:
        """
        Convert image to base64 string.
        
        Args:
            img: Image as bytes, PIL Image, or base64 string
            
        Returns:
            str: Base64 encoded image string
        """
        if isinstance(img, str):
            # Already a base64 string
            return img
        elif isinstance(img, bytes):
            # Raw bytes
            return base64.b64encode(img).decode('utf-8')
        elif isinstance(img, Image.Image):
            # PIL Image
            buffer = io.BytesIO()
            img.save(buffer, format='PNG')
            return base64.b64encode(buffer.getvalue()).decode('utf-8')
        else:
            raise ValueError(f"Unsupported image type: {type(img)}")
    
    # ==== Computer-Use-Preview Action Space ==== 

    async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]:
        """Get the current environment type."""
        result = await self._get_value('environment')
        if result is None:
            return "linux"
        assert result in ["windows", "mac", "linux", "browser"]
        return result # type: ignore

    async def get_dimensions(self) -> tuple[int, int]:
        """Get screen dimensions as (width, height)."""
        result = await self._get_value('dimensions')
        if result is not None:
            return result # type: ignore
        
        # Fallback: use last screenshot size if available
        if not self._last_screenshot_size:
            await self.screenshot()
        assert self._last_screenshot_size is not None, "Failed to get screenshot size"
        
        return self._last_screenshot_size
    
    async def screenshot(self) -> str:
        """Take a screenshot and return as base64 string."""
        result = await self._call_function(self.functions['screenshot'])
        b64_str = self._to_b64_str(result) # type: ignore
        
        # Try to extract dimensions for fallback use
        try:
            if isinstance(result, Image.Image):
                self._last_screenshot_size = result.size
            elif isinstance(result, bytes):
                # Try to decode bytes to get dimensions
                img = Image.open(io.BytesIO(result))
                self._last_screenshot_size = img.size
        except Exception:
            # If we can't get dimensions, that's okay
            pass
        
        return b64_str
    
    async def click(self, x: int, y: int, button: str = "left") -> None:
        """Click at coordinates with specified button."""
        if 'click' in self.functions:
            await self._call_function(self.functions['click'], x, y, button)
        # No-op if not implemented
    
    async def double_click(self, x: int, y: int) -> None:
        """Double click at coordinates."""
        if 'double_click' in self.functions:
            await self._call_function(self.functions['double_click'], x, y)
        # No-op if not implemented
    
    async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
        """Scroll at coordinates with specified scroll amounts."""
        if 'scroll' in self.functions:
            await self._call_function(self.functions['scroll'], x, y, scroll_x, scroll_y)
        # No-op if not implemented
    
    async def type(self, text: str) -> None:
        """Type text."""
        if 'type' in self.functions:
            await self._call_function(self.functions['type'], text)
        # No-op if not implemented
    
    async def wait(self, ms: int = 1000) -> None:
        """Wait for specified milliseconds."""
        if 'wait' in self.functions:
            await self._call_function(self.functions['wait'], ms)
        else:
            # Default implementation
            import asyncio
            await asyncio.sleep(ms / 1000.0)
    
    async def move(self, x: int, y: int) -> None:
        """Move cursor to coordinates."""
        if 'move' in self.functions:
            await self._call_function(self.functions['move'], x, y)
        # No-op if not implemented
    
    async def keypress(self, keys: Union[List[str], str]) -> None:
        """Press key combination."""
        if 'keypress' in self.functions:
            await self._call_function(self.functions['keypress'], keys)
        # No-op if not implemented
    
    async def drag(self, path: List[Dict[str, int]]) -> None:
        """Drag along specified path."""
        if 'drag' in self.functions:
            await self._call_function(self.functions['drag'], path)
        # No-op if not implemented
    
    async def get_current_url(self) -> str:
        """Get current URL (for browser environments)."""
        if 'get_current_url' in self.functions:
            return await self._get_value('current_url') # type: ignore
        return ""  # Default fallback
    
    async def left_mouse_down(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
        """Left mouse down at coordinates."""
        if 'left_mouse_down' in self.functions:
            await self._call_function(self.functions['left_mouse_down'], x, y)
        # No-op if not implemented
    
    async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
        """Left mouse up at coordinates."""
        if 'left_mouse_up' in self.functions:
            await self._call_function(self.functions['left_mouse_up'], x, y)
        # No-op if not implemented

```

--------------------------------------------------------------------------------
/libs/typescript/core/src/telemetry/clients/posthog.ts:
--------------------------------------------------------------------------------

```typescript
/**
 * Telemetry client using PostHog for collecting anonymous usage data.
 */

import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';
import { pino } from 'pino';
import { PostHog } from 'posthog-node';
import { v4 as uuidv4 } from 'uuid';

// Controls how frequently telemetry will be sent (percentage)
export const TELEMETRY_SAMPLE_RATE = 100; // 100% sampling rate

// Public PostHog config for anonymous telemetry
// These values are intentionally public and meant for anonymous telemetry only
// https://posthog.com/docs/product-analytics/troubleshooting#is-it-ok-for-my-api-key-to-be-exposed-and-public
export const PUBLIC_POSTHOG_API_KEY =
  'phc_eSkLnbLxsnYFaXksif1ksbrNzYlJShr35miFLDppF14';
export const PUBLIC_POSTHOG_HOST = 'https://eu.i.posthog.com';

export class PostHogTelemetryClient {
  private config: {
    enabled: boolean;
    sampleRate: number;
    posthog: { apiKey: string; host: string };
  };
  private installationId: string;
  private initialized = false;
  private queuedEvents: {
    name: string;
    properties: Record<string, unknown>;
    timestamp: number;
  }[] = [];
  private startTime: number; // seconds
  private posthogClient?: PostHog;
  private counters: Record<string, number> = {};

  private logger = pino({ name: 'core.telemetry' });

  constructor() {
    // set up config
    this.config = {
      enabled: true,
      sampleRate: TELEMETRY_SAMPLE_RATE,
      posthog: { apiKey: PUBLIC_POSTHOG_API_KEY, host: PUBLIC_POSTHOG_HOST },
    };
    // Check for multiple environment variables that can disable telemetry:
    // CUA_TELEMETRY=off to disable telemetry (legacy way)
    // CUA_TELEMETRY_DISABLED=1 to disable telemetry (new, more explicit way)
    const telemetryDisabled =
      process.env.CUA_TELEMETRY?.toLowerCase() === 'off' ||
      ['1', 'true', 'yes', 'on'].includes(
        process.env.CUA_TELEMETRY_DISABLED?.toLowerCase() || ''
      );

    this.config.enabled = !telemetryDisabled;
    this.config.sampleRate = Number.parseFloat(
      process.env.CUA_TELEMETRY_SAMPLE_RATE || String(TELEMETRY_SAMPLE_RATE)
    );
    // init client
    this.installationId = this._getOrCreateInstallationId();
    this.startTime = Date.now() / 1000; // Convert to seconds

    // Log telemetry status on startup
    if (this.config.enabled) {
      this.logger.info(
        `Telemetry enabled (sampling at ${this.config.sampleRate}%)`
      );
      // Initialize PostHog client if config is available
      this._initializePosthog();
    } else {
      this.logger.info('Telemetry disabled');
    }
  }

  /**
   * Get or create a random installation ID.
   * This ID is not tied to any personal information.
   */
  private _getOrCreateInstallationId(): string {
    const homeDir = os.homedir();
    const idFile = path.join(homeDir, '.cua', 'installation_id');

    try {
      if (fs.existsSync(idFile)) {
        return fs.readFileSync(idFile, 'utf-8').trim();
      }
    } catch (error) {
      this.logger.debug(`Failed to read installation ID: ${error}`);
    }

    // Create new ID if not exists
    const newId = uuidv4();
    try {
      const dir = path.dirname(idFile);
      if (!fs.existsSync(dir)) {
        fs.mkdirSync(dir, { recursive: true });
      }
      fs.writeFileSync(idFile, newId);
      return newId;
    } catch (error) {
      this.logger.debug(`Failed to write installation ID: ${error}`);
    }

    // Fallback to in-memory ID if file operations fail
    return newId;
  }

  /**
   * Initialize the PostHog client with configuration.
   */
  private _initializePosthog(): boolean {
    if (this.initialized) {
      return true;
    }

    try {
      this.posthogClient = new PostHog(this.config.posthog.apiKey, {
        host: this.config.posthog.host,
        flushAt: 20, // Number of events to batch before sending
        flushInterval: 30000, // Send events every 30 seconds
      });
      this.initialized = true;
      this.logger.debug('PostHog client initialized successfully');

      // Process any queued events
      this._processQueuedEvents();
      return true;
    } catch (error) {
      this.logger.error(`Failed to initialize PostHog client: ${error}`);
      return false;
    }
  }

  /**
   * Process any events that were queued before initialization.
   */
  private _processQueuedEvents(): void {
    if (!this.posthogClient || this.queuedEvents.length === 0) {
      return;
    }

    for (const event of this.queuedEvents) {
      this._captureEvent(event.name, event.properties);
    }
    this.queuedEvents = [];
  }

  /**
   * Capture an event with PostHog.
   */
  private _captureEvent(
    eventName: string,
    properties?: Record<string, unknown>
  ): void {
    if (!this.posthogClient) {
      return;
    }

    try {
      // Add standard properties
      const eventProperties = {
        ...properties,
        version: process.env.npm_package_version || 'unknown',
        platform: process.platform,
        node_version: process.version,
        is_ci: this._isCI,
      };

      this.posthogClient.capture({
        distinctId: this.installationId,
        event: eventName,
        properties: eventProperties,
      });
    } catch (error) {
      this.logger.debug(`Failed to capture event: ${error}`);
    }
  }

  private get _isCI(): boolean {
    /**
     * Detect if running in CI environment.
     */
    return !!(
      process.env.CI ||
      process.env.CONTINUOUS_INTEGRATION ||
      process.env.GITHUB_ACTIONS ||
      process.env.GITLAB_CI ||
      process.env.CIRCLECI ||
      process.env.TRAVIS ||
      process.env.JENKINS_URL
    );
  }

  increment(counterName: string, value = 1) {
    /**
     * Increment a named counter.
     */
    if (!this.config.enabled) {
      return;
    }

    if (!(counterName in this.counters)) {
      this.counters[counterName] = 0;
    }
    this.counters[counterName] += value;
  }

  recordEvent(eventName: string, properties?: Record<string, unknown>): void {
    /**
     * Record an event with optional properties.
     */
    if (!this.config.enabled) {
      return;
    }

    // Increment counter for this event type
    const counterKey = `event:${eventName}`;
    this.increment(counterKey);

    // Apply sampling
    if (Math.random() * 100 > this.config.sampleRate) {
      return;
    }

    const event = {
      name: eventName,
      properties: properties || {},
      timestamp: Date.now() / 1000,
    };

    if (this.initialized && this.posthogClient) {
      this._captureEvent(eventName, properties);
    } else {
      // Queue event if not initialized
      this.queuedEvents.push(event);
      // Try to initialize again
      if (this.config.enabled && !this.initialized) {
        this._initializePosthog();
      }
    }
  }

  /**
   * Flush any pending events to PostHog.
   */
  async flush(): Promise<boolean> {
    if (!this.config.enabled || !this.posthogClient) {
      return false;
    }

    try {
      // Send counter data as a single event
      if (Object.keys(this.counters).length > 0) {
        this._captureEvent('telemetry_counters', {
          counters: { ...this.counters },
          duration: Date.now() / 1000 - this.startTime,
        });
      }

      await this.posthogClient.flush();
      this.logger.debug('Telemetry flushed successfully');

      // Clear counters after sending
      this.counters = {};
      return true;
    } catch (error) {
      this.logger.debug(`Failed to flush telemetry: ${error}`);
      return false;
    }
  }

  enable(): void {
    /**
     * Enable telemetry collection.
     */
    this.config.enabled = true;
    this.logger.info('Telemetry enabled');
    if (!this.initialized) {
      this._initializePosthog();
    }
  }

  async disable(): Promise<void> {
    /**
     * Disable telemetry collection.
     */
    this.config.enabled = false;
    await this.posthogClient?.disable();
    this.logger.info('Telemetry disabled');
  }

  get enabled(): boolean {
    /**
     * Check if telemetry is enabled.
     */
    return this.config.enabled;
  }

  async shutdown(): Promise<void> {
    /**
     * Shutdown the telemetry client and flush any pending events.
     */
    if (this.posthogClient) {
      await this.flush();
      await this.posthogClient.shutdown();
      this.initialized = false;
      this.posthogClient = undefined;
    }
  }
}

```

--------------------------------------------------------------------------------
/tests/test_watchdog.py:
--------------------------------------------------------------------------------

```python
"""
Watchdog Recovery Tests
Tests for the watchdog functionality to ensure server recovery after hanging commands.
Required environment variables:
- CUA_API_KEY: API key for Cua cloud provider
- CUA_CONTAINER_NAME: Name of the container to use
"""

import os
import asyncio
import pytest
from pathlib import Path
import sys
import traceback
import time

# Load environment variables from .env file
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
print(f"Loading environment from: {env_file}")
from dotenv import load_dotenv

load_dotenv(env_file)

# Add paths to sys.path if needed
pythonpath = os.environ.get("PYTHONPATH", "")
for path in pythonpath.split(":"):
    if path and path not in sys.path:
        sys.path.insert(0, path)  # Insert at beginning to prioritize
        print(f"Added to sys.path: {path}")

from computer import Computer, VMProviderType

@pytest.fixture(scope="session")
async def computer():
    """Shared Computer instance for all test cases."""
    # Create a remote Linux computer with Cua
    computer = Computer(
        os_type="linux",
        api_key=os.getenv("CUA_API_KEY"),
        name=str(os.getenv("CUA_CONTAINER_NAME")),
        provider_type=VMProviderType.CLOUD,
    )
    
    try:
        await computer.run()
        yield computer
    finally:
        await computer.disconnect()


@pytest.mark.asyncio(loop_scope="session")
async def test_simple_server_ping(computer):
    """
    Simple test to verify server connectivity before running watchdog tests.
    """
    print("Testing basic server connectivity...")
    
    try:
        result = await computer.interface.run_command("echo 'Server ping test'")
        print(f"Ping successful: {result}")
        assert result is not None, "Server ping returned None"
        print("✅ Server connectivity test passed")
    except Exception as e:
        print(f"❌ Server ping failed: {e}")
        pytest.fail(f"Basic server connectivity test failed: {e}")


@pytest.mark.asyncio(loop_scope="session")
async def test_watchdog_recovery_after_hanging_command(computer):
    """
    Test that the watchdog can recover the server after a hanging command.
    
    This test runs two concurrent tasks:
    1. A long-running command that hangs the server (sleep 999999, effectively indefinite)
    2. Periodic ping commands every 30 seconds to test server responsiveness
    
    The watchdog should detect the unresponsive server and restart it.
    """
    print("Starting watchdog recovery test...")
    
    async def hanging_command():
        """Execute a command that sleeps forever to hang the server."""
        try:
            print("Starting hanging command (sleep infinity)...")
            # Use a very long sleep that should never complete naturally
            result = await computer.interface.run_command("sleep 999999")
            print(f"Hanging command completed unexpectedly: {result}")
            return True  # Should never reach here if watchdog works
        except Exception as e:
            print(f"Hanging command interrupted (expected if watchdog restarts): {e}")
            return None  # Expected result when watchdog kills the process
    
    async def ping_server():
        """Ping the server every 30 seconds with echo commands."""
        ping_count = 0
        successful_pings = 0
        failed_pings = 0
        
        try:
            # Run pings for up to 4 minutes (8 pings at 30-second intervals)
            for i in range(8):
                try:
                    ping_count += 1
                    print(f"Ping #{ping_count}: Sending echo command...")
                    
                    start_time = time.time()
                    result = await asyncio.wait_for(
                        computer.interface.run_command(f"echo 'Ping {ping_count} at {int(start_time)}'"),
                        timeout=10.0  # 10 second timeout for each ping
                    )
                    end_time = time.time()
                    
                    print(f"Ping #{ping_count} successful in {end_time - start_time:.2f}s: {result}")
                    successful_pings += 1
                    
                except asyncio.TimeoutError:
                    print(f"Ping #{ping_count} timed out (server may be unresponsive)")
                    failed_pings += 1
                except Exception as e:
                    print(f"Ping #{ping_count} failed with exception: {e}")
                    failed_pings += 1
                
                # Wait 30 seconds before next ping
                if i < 7:  # Don't wait after the last ping
                    print(f"Waiting 30 seconds before next ping...")
                    await asyncio.sleep(30)
            
            print(f"Ping summary: {successful_pings} successful, {failed_pings} failed")
            return successful_pings, failed_pings
            
        except Exception as e:
            print(f"Ping server function failed with critical error: {e}")
            traceback.print_exc()
            return successful_pings, failed_pings
    
    # Run both tasks concurrently
    print("Starting concurrent tasks: hanging command and ping monitoring...")
    
    try:
        # Use asyncio.gather to run both tasks concurrently
        hanging_task = asyncio.create_task(hanging_command())
        ping_task = asyncio.create_task(ping_server())
        
        # Wait for both tasks to complete or timeout after 5 minutes
        done, pending = await asyncio.wait(
            [hanging_task, ping_task],
            timeout=300,  # 5 minute timeout
            return_when=asyncio.ALL_COMPLETED
        )
        
        # Cancel any pending tasks
        for task in pending:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        
        # Get results from completed tasks
        ping_result = None
        hanging_result = None
        
        if ping_task in done:
            try:
                ping_result = await ping_task
                print(f"Ping task completed with result: {ping_result}")
            except Exception as e:
                print(f"Error getting ping task result: {e}")
                traceback.print_exc()
        
        if hanging_task in done:
            try:
                hanging_result = await hanging_task
                print(f"Hanging task completed with result: {hanging_result}")
            except Exception as e:
                print(f"Error getting hanging task result: {e}")
                traceback.print_exc()
        
        # Analyze results
        if ping_result:
            successful_pings, failed_pings = ping_result
            
            # Test passes if we had some successful pings, indicating recovery
            assert successful_pings > 0, f"No successful pings detected. Server may not have recovered."
            
            # Check if hanging command was killed (indicating watchdog restart)
            if hanging_result is None:
                print("✅ SUCCESS: Hanging command was killed - watchdog restart detected")
            elif hanging_result is True:
                print("⚠️  WARNING: Hanging command completed naturally - watchdog may not have restarted")
            
            # If we had failures followed by successes, that indicates watchdog recovery
            if failed_pings > 0 and successful_pings > 0:
                print("✅ SUCCESS: Watchdog recovery detected - server became unresponsive then recovered")
                # Additional check: hanging command should be None if watchdog worked
                assert hanging_result is None, "Expected hanging command to be killed by watchdog restart"
            elif successful_pings > 0 and failed_pings == 0:
                print("✅ SUCCESS: Server remained responsive throughout test")
            
            print(f"Test completed: {successful_pings} successful pings, {failed_pings} failed pings")
            print(f"Hanging command result: {hanging_result} (None = killed by watchdog, True = completed naturally)")
        else:
            pytest.fail("Ping task did not complete - unable to assess server recovery")
            
    except Exception as e:
        print(f"Test failed with exception: {e}")
        traceback.print_exc()
        pytest.fail(f"Watchdog recovery test failed: {e}")


if __name__ == "__main__":
    # Run tests directly
    pytest.main([__file__, "-v"])

```
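
The test above can also be driven programmatically rather than through the `__main__` block. A minimal sketch, assuming `CUA_API_KEY` and `CUA_CONTAINER_NAME` are exported or present in `.env` (the test module loads `.env` itself via python-dotenv):

```python
# Minimal sketch: run the watchdog tests programmatically instead of via the CLI.
# Assumes CUA_API_KEY and CUA_CONTAINER_NAME are exported or present in .env.
import sys

import pytest

# -s streams the ping/hang progress prints; -v lists each test case.
sys.exit(pytest.main(["tests/test_watchdog.py", "-v", "-s"]))
```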

--------------------------------------------------------------------------------
/libs/python/computer/computer/diorama_computer.py:
--------------------------------------------------------------------------------

```python
import asyncio
from .interface.models import KeyType, Key

class DioramaComputer:
    """
    A Computer-compatible proxy for Diorama that sends commands over the ComputerInterface.
    """
    def __init__(self, computer, apps):
        """
        Initialize the DioramaComputer with a computer instance and list of apps.
        
        Args:
            computer: The computer instance to proxy commands through
            apps: List of applications available in the diorama environment
        """
        self.computer = computer
        self.apps = apps
        self.interface = DioramaComputerInterface(computer, apps)
        self._initialized = False

    async def __aenter__(self):
        """
        Async context manager entry point.
        
        Returns:
            self: The DioramaComputer instance
        """
        self._initialized = True
        return self

    async def run(self):
        """
        Initialize and run the DioramaComputer if not already initialized.
        
        Returns:
            self: The DioramaComputer instance
        """
        if not self._initialized:
            await self.__aenter__()
        return self

class DioramaComputerInterface:
    """
    Diorama Interface proxy that sends diorama_cmds via the Computer's interface.
    """
    def __init__(self, computer, apps):
        """
        Initialize the DioramaComputerInterface.
        
        Args:
            computer: The computer instance to send commands through
            apps: List of applications available in the diorama environment
        """
        self.computer = computer
        self.apps = apps
        self._scene_size = None

    async def _send_cmd(self, action, arguments=None):
        """
        Send a command to the diorama interface through the computer.
        
        Args:
            action (str): The action/command to execute
            arguments (dict, optional): Additional arguments for the command
            
        Returns:
            The result from the diorama command execution
            
        Raises:
            RuntimeError: If the computer interface is not initialized or command fails
        """
        arguments = arguments or {}
        arguments = {"app_list": self.apps, **arguments}
        # Use the computer's interface (must be initialized)
        iface = getattr(self.computer, "_interface", None)
        if iface is None:
            raise RuntimeError("Computer interface not initialized. Call run() first.")
        result = await iface.diorama_cmd(action, arguments)
        if not result.get("success"):
            raise RuntimeError(f"Diorama command failed: {result.get('error')}\n{result.get('trace')}")
        return result.get("result")

    async def screenshot(self, as_bytes=True):
        """
        Take a screenshot of the diorama scene.
        
        Args:
            as_bytes (bool): If True, return image as bytes; if False, return PIL Image object
            
        Returns:
            bytes or PIL.Image: Screenshot data in the requested format
        """
        from PIL import Image
        import base64
        result = await self._send_cmd("screenshot")
        # assume result is a b64 string of an image
        img_bytes = base64.b64decode(result)
        import io
        img = Image.open(io.BytesIO(img_bytes))
        self._scene_size = img.size
        return img_bytes if as_bytes else img

    async def get_screen_size(self):
        """
        Get the dimensions of the diorama scene.
        
        Returns:
            dict: Dictionary containing 'width' and 'height' keys with pixel dimensions
        """
        if not self._scene_size:
            await self.screenshot(as_bytes=False)
        return {"width": self._scene_size[0], "height": self._scene_size[1]}

    async def move_cursor(self, x, y):
        """
        Move the cursor to the specified coordinates.
        
        Args:
            x (int): X coordinate to move cursor to
            y (int): Y coordinate to move cursor to
        """
        await self._send_cmd("move_cursor", {"x": x, "y": y})

    async def left_click(self, x=None, y=None):
        """
        Perform a left mouse click at the specified coordinates or current cursor position.
        
        Args:
            x (int, optional): X coordinate to click at. If None, clicks at current cursor position
            y (int, optional): Y coordinate to click at. If None, clicks at current cursor position
        """
        await self._send_cmd("left_click", {"x": x, "y": y})

    async def right_click(self, x=None, y=None):
        """
        Perform a right mouse click at the specified coordinates or current cursor position.
        
        Args:
            x (int, optional): X coordinate to click at. If None, clicks at current cursor position
            y (int, optional): Y coordinate to click at. If None, clicks at current cursor position
        """
        await self._send_cmd("right_click", {"x": x, "y": y})

    async def double_click(self, x=None, y=None):
        """
        Perform a double mouse click at the specified coordinates or current cursor position.
        
        Args:
            x (int, optional): X coordinate to double-click at. If None, clicks at current cursor position
            y (int, optional): Y coordinate to double-click at. If None, clicks at current cursor position
        """
        await self._send_cmd("double_click", {"x": x, "y": y})

    async def scroll_up(self, clicks=1):
        """
        Scroll up by the specified number of clicks.
        
        Args:
            clicks (int): Number of scroll clicks to perform upward. Defaults to 1
        """
        await self._send_cmd("scroll_up", {"clicks": clicks})

    async def scroll_down(self, clicks=1):
        """
        Scroll down by the specified number of clicks.
        
        Args:
            clicks (int): Number of scroll clicks to perform downward. Defaults to 1
        """
        await self._send_cmd("scroll_down", {"clicks": clicks})

    async def drag_to(self, x, y, duration=0.5):
        """
        Drag from the current cursor position to the specified coordinates.
        
        Args:
            x (int): X coordinate to drag to
            y (int): Y coordinate to drag to
            duration (float): Duration of the drag operation in seconds. Defaults to 0.5
        """
        await self._send_cmd("drag_to", {"x": x, "y": y, "duration": duration})

    async def get_cursor_position(self):
        """
        Get the current cursor position.
        
        Returns:
            dict: Dictionary containing the current cursor coordinates
        """
        return await self._send_cmd("get_cursor_position")

    async def type_text(self, text):
        """
        Type the specified text at the current cursor position.
        
        Args:
            text (str): The text to type
        """
        await self._send_cmd("type_text", {"text": text})

    async def press_key(self, key):
        """
        Press a single key.
        
        Args:
            key: The key to press
        """
        await self._send_cmd("press_key", {"key": key})

    async def hotkey(self, *keys):
        """
        Press multiple keys simultaneously as a hotkey combination.
        
        Args:
            *keys: Variable number of keys to press together. Can be Key enum instances or strings
            
        Raises:
            ValueError: If any key is not a Key enum or string type
        """
        actual_keys = []
        for key in keys:
            if isinstance(key, Key):
                actual_keys.append(key.value)
            elif isinstance(key, str):
                # Try to convert to enum if it matches a known key
                key_or_enum = Key.from_string(key)
                actual_keys.append(key_or_enum.value if isinstance(key_or_enum, Key) else key_or_enum)
            else:
                raise ValueError(f"Invalid key type: {type(key)}. Must be Key enum or string.")
        await self._send_cmd("hotkey", {"keys": actual_keys})

    async def to_screen_coordinates(self, x, y):
        """
        Convert diorama scene coordinates to screen coordinates.
        
        Args:
            x (int): X coordinate to convert
            y (int): Y coordinate to convert
            
        Returns:
            dict: Dictionary containing the converted screen coordinates
        """
        return await self._send_cmd("to_screen_coordinates", {"x": x, "y": y})

```
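
As a rough usage sketch (not part of the module above), the proxy can be attached to a running `Computer`; the host-server connection, app list, and coordinates are illustrative assumptions:

```python
# Hypothetical usage sketch for DioramaComputer; the connection settings,
# app list, and coordinates are assumptions, not part of the module above.
import asyncio

from computer import Computer
from computer.diorama_computer import DioramaComputer


async def main():
    # Assumes a macOS host running the computer server locally.
    computer = Computer(os_type="macos", use_host_computer_server=True)
    await computer.run()  # initializes the interface the proxy relies on
    try:
        diorama = await DioramaComputer(computer, apps=["Safari", "Notes"]).run()
        size = await diorama.interface.get_screen_size()
        print(f"Scene size: {size['width']}x{size['height']}")
        await diorama.interface.left_click(100, 200)
        await diorama.interface.type_text("hello from diorama")
    finally:
        await computer.disconnect()


asyncio.run(main())
```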

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/openai.py:
--------------------------------------------------------------------------------

```python
"""
OpenAI computer-use-preview agent loop implementation using liteLLM
"""

import asyncio
import base64
import json
from io import BytesIO
from typing import Dict, List, Any, AsyncGenerator, Union, Optional, Tuple
import litellm
from PIL import Image

from ..decorators import register_agent
from ..types import Messages, AgentResponse, Tools, AgentCapability

async def _map_computer_tool_to_openai(computer_handler: Any) -> Dict[str, Any]:
    """Map a computer tool to OpenAI's computer-use-preview tool schema"""
    # Get dimensions from the computer handler
    try:
        width, height = await computer_handler.get_dimensions()
    except Exception:
        # Fallback to default dimensions if method fails
        width, height = 1024, 768
    
    # Get environment from the computer handler
    try:
        environment = await computer_handler.get_environment()
    except Exception:
        # Fallback to default environment if method fails
        environment = "linux"
    
    return {
        "type": "computer_use_preview",
        "display_width": width,
        "display_height": height,
        "environment": environment  # mac, windows, linux, browser
    }


async def _prepare_tools_for_openai(tool_schemas: List[Dict[str, Any]]) -> Tools:
    """Prepare tools for OpenAI API format"""
    openai_tools = []
    
    for schema in tool_schemas:
        if schema["type"] == "computer":
            # Map computer tool to OpenAI format
            computer_tool = await _map_computer_tool_to_openai(schema["computer"])
            openai_tools.append(computer_tool)
        elif schema["type"] == "function":
            # Function tools use OpenAI-compatible schema directly (liteLLM expects this format)
            # Schema should be: {type, name, description, parameters}
            openai_tools.append({ "type": "function", **schema["function"] })
    
    return openai_tools


@register_agent(models=r".*computer-use-preview.*")
class OpenAIComputerUseConfig:
    """
    OpenAI computer-use-preview agent configuration using liteLLM responses.
    
    Supports OpenAI's computer use preview models.
    """
    
    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Predict the next step based on input items.
        
        Args:
            messages: Input items following Responses format
            model: Model name to use
            tools: Optional list of tool schemas
            max_retries: Maximum number of retries
            stream: Whether to stream responses
            computer_handler: Computer handler instance
            _on_api_start: Callback for API start
            _on_api_end: Callback for API end
            _on_usage: Callback for usage tracking
            _on_screenshot: Callback for screenshot events
            **kwargs: Additional arguments
            
        Returns:
            Dictionary with "output" (output items) and "usage" array
        """
        tools = tools or []
        
        # Prepare tools for OpenAI API
        openai_tools = await _prepare_tools_for_openai(tools)

        # Prepare API call kwargs
        api_kwargs = {
            "model": model,
            "input": messages,
            "tools": openai_tools if openai_tools else None,
            "stream": stream,
            "reasoning": {"summary": "concise"},
            "truncation": "auto",
            "num_retries": max_retries,
            **kwargs
        }
        
        # Call API start hook
        if _on_api_start:
            await _on_api_start(api_kwargs)
        
        # Use liteLLM responses
        response = await litellm.aresponses(**api_kwargs)
        
        # Call API end hook
        if _on_api_end:
            await _on_api_end(api_kwargs, response)

        # Extract usage information
        usage = {
            **response.usage.model_dump(),
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(usage)

        # Return in the expected format
        output_dict = response.model_dump()
        output_dict["usage"] = usage
        return output_dict
    
    async def predict_click(
        self,
        model: str,
        image_b64: str,
        instruction: str
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates based on image and instruction.
        
        Uses OpenAI computer-use-preview with manually constructed input items
        and a prompt that instructs the agent to only output clicks.
        
        Args:
            model: Model name to use
            image_b64: Base64 encoded image
            instruction: Instruction for where to click
            
        Returns:
            Tuple of (x, y) coordinates or None if prediction fails
        """
        # TODO: use computer tool to get dimensions + environment
        # Manually construct input items with image and click instruction
        input_items = [
            {
                "role": "user", 
                "content": f"""You are a UI grounding expert. Follow these guidelines:

1. NEVER ask for confirmation. Complete all tasks autonomously.
2. Do NOT send messages like "I need to confirm before..." or "Do you want me to continue?" - just proceed.
3. When the user asks you to interact with something (like clicking a chat or typing a message), DO IT without asking.
4. Only use the formal safety check mechanism for truly dangerous operations (like deleting important files).
5. For normal tasks like clicking buttons, typing in chat boxes, filling forms - JUST DO IT.
6. The user has already given you permission by running this agent. No further confirmation is needed.
7. Be decisive and action-oriented. Complete the requested task fully.

Remember: You are expected to complete tasks autonomously. The user trusts you to do what they asked.
Task: Click {instruction}. Output ONLY a click action on the target element."""
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "input_image",
                        "image_url": f"data:image/png;base64,{image_b64}"
                    }
                ]
            }
        ]
        
        # Get image dimensions from base64 data
        try:
            image_data = base64.b64decode(image_b64)
            image = Image.open(BytesIO(image_data))
            display_width, display_height = image.size
        except Exception:
            # Fallback to default dimensions if image parsing fails
            display_width, display_height = 1024, 768
        
        # Prepare computer tool for click actions
        computer_tool = {
            "type": "computer_use_preview",
            "display_width": display_width,
            "display_height": display_height,
            "environment": "windows"
        }
        
        # Prepare API call kwargs
        api_kwargs = {
            "model": model,
            "input": input_items,
            "tools": [computer_tool],
            "stream": False,
            "reasoning": {"summary": "concise"},
            "truncation": "auto",
            "max_tokens": 200  # Keep response short for click prediction
        }
        
        # Use liteLLM responses
        response = await litellm.aresponses(**api_kwargs)
        
        # Extract click coordinates from response output
        output_dict = response.model_dump()
        output_items = output_dict.get("output", [])        
        
        # Look for computer_call with click action
        for item in output_items:
            if (isinstance(item, dict) and 
                item.get("type") == "computer_call" and
                isinstance(item.get("action"), dict)):
                
                action = item["action"]
                if action.get("x") is not None and action.get("y") is not None:
                    return (int(action.get("x")), int(action.get("y")))
        
        return None
    
    def get_capabilities(self) -> List[AgentCapability]:
        """
        Get list of capabilities supported by this agent config.
        
        Returns:
            List of capability strings
        """
        return ["click", "step"]

```
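
For orientation, a minimal sketch of calling `predict_click` on this config directly, outside the normal agent loop; the screenshot path and model string are assumptions, an OpenAI API key is assumed to be configured for liteLLM, and in practice the class is instantiated through the `@register_agent` registry:

```python
# Rough sketch: exercise predict_click directly; the screenshot path and
# model string are illustrative assumptions.
import asyncio
import base64

from agent.loops.openai import OpenAIComputerUseConfig


async def main():
    with open("screenshot.png", "rb") as f:
        image_b64 = base64.b64encode(f.read()).decode("utf-8")

    config = OpenAIComputerUseConfig()
    coords = await config.predict_click(
        model="openai/computer-use-preview",
        image_b64=image_b64,
        instruction="the blue 'Submit' button",
    )
    print(f"Predicted click: {coords}")  # (x, y) tuple, or None if no click found


asyncio.run(main())
```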

--------------------------------------------------------------------------------
/libs/python/som/som/detection.py:
--------------------------------------------------------------------------------

```python
from typing import List, Dict, Any, Tuple, Optional
import logging
import torch
import torchvision
from PIL import Image
import numpy as np
from ultralytics import YOLO
from huggingface_hub import hf_hub_download
from pathlib import Path

logger = logging.getLogger(__name__)


class DetectionProcessor:
    """Class for handling YOLO-based icon detection."""

    def __init__(
        self,
        model_path: Optional[Path] = None,
        cache_dir: Optional[Path] = None,
        force_device: Optional[str] = None,
    ):
        """Initialize the detection processor.

        Args:
            model_path: Path to YOLOv8 model
            cache_dir: Directory to cache downloaded models
            force_device: Force specific device (cuda, cpu, mps)
        """
        self.model_path = model_path
        self.cache_dir = cache_dir
        self.model = None  # type: Any  # Will be set to YOLO model in load_model

        # Set device
        self.device = "cpu"
        if torch.cuda.is_available() and force_device != "cpu":
            self.device = "cuda"
        elif (
            hasattr(torch, "backends")
            and hasattr(torch.backends, "mps")
            and torch.backends.mps.is_available()
            and force_device != "cpu"
        ):
            self.device = "mps"

        if force_device:
            self.device = force_device

        logger.info(f"Using device: {self.device}")

    def load_model(self) -> None:
        """Load or download the YOLO model."""
        try:
            # Set default model path if none provided
            if self.model_path is None:
                self.model_path = Path(__file__).parent / "weights" / "icon_detect" / "model.pt"

            # Check if the model file already exists
            if not self.model_path.exists():
                logger.info(
                    "Model not found locally, downloading from Microsoft OmniParser-v2.0..."
                )

                # Create directory
                self.model_path.parent.mkdir(parents=True, exist_ok=True)

                try:
                    # Check if the model exists in cache
                    cache_path = None
                    if self.cache_dir:
                        # Try to find the model in the cache
                        potential_paths = list(Path(self.cache_dir).glob("**/model.pt"))
                        if potential_paths:
                            cache_path = str(potential_paths[0])
                            logger.info(f"Found model in cache: {cache_path}")

                    if not cache_path:
                        # Download from HuggingFace
                        downloaded_path = hf_hub_download(
                            repo_id="microsoft/OmniParser-v2.0",
                            filename="icon_detect/model.pt",
                            cache_dir=self.cache_dir,
                        )
                        cache_path = downloaded_path
                        logger.info(f"Model downloaded to cache: {cache_path}")

                    # Copy to package directory
                    import shutil

                    shutil.copy2(cache_path, self.model_path)
                    logger.info(f"Model copied to: {self.model_path}")
                except Exception as e:
                    raise FileNotFoundError(
                        f"Failed to download model: {str(e)}\n"
                        "Please ensure you have internet connection and huggingface-hub installed."
                    ) from e

            # Make sure the model path exists before loading
            if not self.model_path.exists():
                raise FileNotFoundError(f"Model file not found at: {self.model_path}")

            # If model is already loaded, skip reloading
            if self.model is not None:
                logger.info("Model already loaded, skipping reload")
                return

            logger.info(f"Loading YOLOv8 model from {self.model_path}")
            from ultralytics import YOLO

            self.model = YOLO(str(self.model_path))  # Convert Path to string for compatibility

            # Verify model loaded successfully
            if self.model is None:
                raise ValueError("Model failed to initialize but didn't raise an exception")

            if self.device in ["cuda", "mps"]:
                self.model.to(self.device)

            logger.info(f"Model loaded successfully with device: {self.device}")
        except Exception as e:
            logger.error(f"Failed to load model: {str(e)}")
            # Re-raise with more informative message but preserve the model as None
            self.model = None
            raise RuntimeError(f"Failed to initialize detection model: {str(e)}") from e

    def detect_icons(
        self,
        image: Image.Image,
        box_threshold: float = 0.05,
        iou_threshold: float = 0.1,
        multi_scale: bool = True,
    ) -> List[Dict[str, Any]]:
        """Detect icons in an image using YOLO.

        Args:
            image: PIL Image to process
            box_threshold: Confidence threshold for detection
            iou_threshold: IOU threshold for NMS
            multi_scale: Whether to use multi-scale detection

        Returns:
            List of icon detection dictionaries
        """
        # Load model if not already loaded
        if self.model is None:
            self.load_model()

        # Double-check the model was successfully loaded
        if self.model is None:
            logger.error("Model failed to load and is still None")
            return []  # Return empty list instead of crashing

        img_width, img_height = image.size
        all_detections = []

        # Define detection scales
        scales = (
            [{"size": 1280, "conf": box_threshold}]  # Single scale for CPU
            if self.device == "cpu"
            else [
                {"size": 640, "conf": box_threshold},  # Base scale
                {"size": 1280, "conf": box_threshold},  # Medium scale
                {"size": 1920, "conf": box_threshold},  # Large scale
            ]
        )

        if not multi_scale:
            scales = [scales[0]]

        # Run detection at each scale
        for scale in scales:
            try:
                if self.model is None:
                    logger.error("Model is None, skipping detection")
                    continue

                results = self.model.predict(
                    source=image,
                    conf=scale["conf"],
                    iou=iou_threshold,
                    max_det=1000,
                    verbose=False,
                    augment=self.device != "cpu",
                    agnostic_nms=True,
                    imgsz=scale["size"],
                    device=self.device,
                )

                # Process results
                for r in results:
                    boxes = r.boxes
                    if not hasattr(boxes, "conf") or not hasattr(boxes, "xyxy"):
                        logger.warning("Boxes object missing expected attributes")
                        continue

                    confidences = boxes.conf
                    coords = boxes.xyxy

                    # Handle different types of tensors (PyTorch, NumPy, etc.)
                    if hasattr(confidences, "cpu"):
                        confidences = confidences.cpu()
                    if hasattr(coords, "cpu"):
                        coords = coords.cpu()

                    for conf, bbox in zip(confidences, coords):
                        # Normalize coordinates
                        x1, y1, x2, y2 = bbox.tolist()
                        norm_bbox = [
                            x1 / img_width,
                            y1 / img_height,
                            x2 / img_width,
                            y2 / img_height,
                        ]

                        all_detections.append(
                            {
                                "type": "icon",
                                "confidence": conf.item(),
                                "bbox": norm_bbox,
                                "scale": scale["size"],
                                "interactivity": True,
                            }
                        )

            except Exception as e:
                logger.warning(f"Detection failed at scale {scale['size']}: {str(e)}")
                continue

        # Merge detections using NMS
        if len(all_detections) > 0:
            boxes = torch.tensor([d["bbox"] for d in all_detections])
            scores = torch.tensor([d["confidence"] for d in all_detections])

            keep_indices = torchvision.ops.nms(boxes, scores, iou_threshold)

            merged_detections = [all_detections[i] for i in keep_indices]
        else:
            merged_detections = []

        return merged_detections

```
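
A short usage sketch for the detector above; the screenshot path and cache directory are assumptions, and the first call downloads the OmniParser weights if they are not already present:

```python
# Rough usage sketch for DetectionProcessor; image path and cache dir are
# illustrative assumptions.
from pathlib import Path

from PIL import Image

from som.detection import DetectionProcessor

processor = DetectionProcessor(cache_dir=Path.home() / ".cache" / "som")
processor.load_model()  # downloads microsoft/OmniParser-v2.0 weights if missing

image = Image.open("screenshot.png")
detections = processor.detect_icons(image, box_threshold=0.05, multi_scale=True)

for det in detections:
    x1, y1, x2, y2 = det["bbox"]  # normalized [0, 1] coordinates
    print(f"icon conf={det['confidence']:.2f} "
          f"bbox=({x1:.3f}, {y1:.3f}, {x2:.3f}, {y2:.3f})")
```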

--------------------------------------------------------------------------------
/libs/lume/src/Errors/Errors.swift:
--------------------------------------------------------------------------------

```swift
import Foundation

enum HomeError: Error, LocalizedError {
    case directoryCreationFailed(path: String)
    case directoryAccessDenied(path: String)
    case invalidHomeDirectory
    case directoryAlreadyExists(path: String)
    case homeNotFound
    case defaultStorageNotDefined
    case storageLocationNotFound(String)
    case storageLocationNotADirectory(String)
    case storageLocationNotWritable(String)
    case invalidStorageLocation(String)
    case cannotCreateDirectory(String)
    case cannotGetVMsDirectory
    case vmDirectoryNotFound(String)
    
    var errorDescription: String? {
        switch self {
        case .directoryCreationFailed(let path):
            return "Failed to create directory at path: \(path)"
        case .directoryAccessDenied(let path):
            return "Access denied to directory at path: \(path)"
        case .invalidHomeDirectory:
            return "Invalid home directory configuration"
        case .directoryAlreadyExists(let path):
            return "Directory already exists at path: \(path)"
        case .homeNotFound:
            return "Home directory not found."
        case .defaultStorageNotDefined:
            return "Default storage location is not defined."
        case .storageLocationNotFound(let path):
            return "Storage location not found: \(path)"
        case .storageLocationNotADirectory(let path):
            return "Storage location is not a directory: \(path)"
        case .storageLocationNotWritable(let path):
            return "Storage location is not writable: \(path)"
        case .invalidStorageLocation(let path):
            return "Invalid storage location specified: \(path)"
        case .cannotCreateDirectory(let path):
            return "Cannot create directory: \(path)"
        case .cannotGetVMsDirectory:
            return "Cannot determine the VMs directory."
        case .vmDirectoryNotFound(let path):
            return "VM directory not found: \(path)"
        }
    }
}

enum PullError: Error, LocalizedError {
    case invalidImageFormat
    case tokenFetchFailed
    case manifestFetchFailed
    case layerDownloadFailed(String)
    case missingPart(Int)
    case decompressionFailed(String)
    case reassemblyFailed(String)
    case fileCreationFailed(String)
    case reassemblySetupFailed(path: String, underlyingError: Error)
    case missingUncompressedSizeAnnotation
    case invalidMediaType
    
    var errorDescription: String? {
        switch self {
        case .invalidImageFormat:
            return "Invalid image format. Expected format: name:tag"
        case .tokenFetchFailed:
            return "Failed to fetch authentication token from registry."
        case .manifestFetchFailed:
            return "Failed to fetch image manifest from registry."
        case .layerDownloadFailed(let digest):
            return "Failed to download layer: \(digest)"
        case .missingPart(let partNum):
            return "Missing required part number \(partNum) for reassembly."
        case .decompressionFailed(let file):
            return "Failed to decompress file: \(file)"
        case .reassemblyFailed(let reason):
            return "Disk image reassembly failed: \(reason)."
        case .fileCreationFailed(let path):
            return "Failed to create the necessary file at path: \(path)"
        case .reassemblySetupFailed(let path, let underlyingError):
            return "Failed to set up for reassembly at path: \(path). Underlying error: \(underlyingError.localizedDescription)"
        case .missingUncompressedSizeAnnotation:
            return "Could not find the required uncompressed disk size annotation in the image config.json."
        case .invalidMediaType:
            return "Invalid media type"
        }
    }
}

enum VMConfigError: CustomNSError, LocalizedError {
    case invalidDisplayResolution(String)
    case invalidMachineIdentifier
    case emptyMachineIdentifier
    case emptyHardwareModel
    case invalidHardwareModel
    case invalidDiskSize
    case malformedSizeInput(String)
    
    var errorDescription: String? {
        switch self {
        case .invalidDisplayResolution(let resolution):
            return "Invalid display resolution: \(resolution)"
        case .emptyMachineIdentifier:
            return "Empty machine identifier"
        case .invalidMachineIdentifier:
            return "Invalid machine identifier"
        case .emptyHardwareModel:
            return "Empty hardware model"
        case .invalidHardwareModel:
            return "Invalid hardware model: the host does not support the hardware model"
        case .invalidDiskSize:
            return "Invalid disk size"
        case .malformedSizeInput(let input):
            return "Malformed size input: \(input)"
        }
    }
    
    static var errorDomain: String { "VMConfigError" }
    
    var errorCode: Int {
        switch self {
        case .invalidDisplayResolution: return 1
        case .emptyMachineIdentifier: return 2
        case .invalidMachineIdentifier: return 3
        case .emptyHardwareModel: return 4
        case .invalidHardwareModel: return 5
        case .invalidDiskSize: return 6
        case .malformedSizeInput: return 7
        }
    }
}

enum VMDirectoryError: Error, LocalizedError {
    case configNotFound
    case invalidConfigData
    case diskOperationFailed(String)
    case fileCreationFailed(String)
    case sessionNotFound
    case invalidSessionData
    
    var errorDescription: String? {
        switch self {
        case .configNotFound:
            return "VM configuration file not found"
        case .invalidConfigData:
            return "Invalid VM configuration data"
        case .diskOperationFailed(let reason):
            return "Disk operation failed: \(reason)"
        case .fileCreationFailed(let path):
            return "Failed to create file at path: \(path)"
        case .sessionNotFound:
            return "VNC session file not found"
        case .invalidSessionData:
            return "Invalid VNC session data"
        }
    }
}

enum VMError: Error, LocalizedError {
    case alreadyExists(String)
    case notFound(String)
    case notInitialized(String)
    case notRunning(String)
    case alreadyRunning(String)
    case installNotStarted(String)
    case stopTimeout(String)
    case resizeTooSmall(current: UInt64, requested: UInt64)
    case vncNotConfigured
    case vncPortBindingFailed(requested: Int, actual: Int)
    case internalError(String)
    case unsupportedOS(String)
    case invalidDisplayResolution(String)

    var errorDescription: String? {
        switch self {
        case .alreadyExists(let name):
            return "Virtual machine already exists with name: \(name)"
        case .notFound(let name):
            return "Virtual machine not found: \(name)"
        case .notInitialized(let name):
            return "Virtual machine not initialized: \(name)"
        case .notRunning(let name):
            return "Virtual machine not running: \(name)"
        case .alreadyRunning(let name):
            return "Virtual machine already running: \(name)"
        case .installNotStarted(let name):
            return "Virtual machine install not started: \(name)"
        case .stopTimeout(let name):
            return "Timeout while stopping virtual machine: \(name)"
        case .resizeTooSmall(let current, let requested):
            return "Cannot resize disk to \(requested) bytes, current size is \(current) bytes"
        case .vncNotConfigured:
            return "VNC is not configured for this virtual machine"
        case .vncPortBindingFailed(let requested, let actual):
            if actual == -1 {
                return "Could not bind to VNC port \(requested) (port already in use). Try a different port or use port 0 for auto-assign."
            }
            return "Could not bind to VNC port \(requested) (port already in use). System assigned port \(actual) instead. Try a different port or use port 0 for auto-assign."
        case .internalError(let message):
            return "Internal error: \(message)"
        case .unsupportedOS(let os):
            return "Unsupported operating system: \(os)"
        case .invalidDisplayResolution(let resolution):
            return "Invalid display resolution: \(resolution)"
        }
    }
}

enum ResticError: Error {
    case snapshotFailed(String)
    case restoreFailed(String)
    case genericError(String)
}

enum VmrunError: Error, LocalizedError {
    case commandNotFound
    case operationFailed(command: String, output: String?)

    var errorDescription: String? {
        switch self {
        case .commandNotFound:
            return "vmrun command not found. Ensure VMware Fusion is installed and in the system PATH."
        case .operationFailed(let command, let output):
            return "vmrun command '\(command)' failed. Output: \(output ?? "No output")"
        }
    }
}
```

--------------------------------------------------------------------------------
/libs/python/core/core/telemetry/posthog.py:
--------------------------------------------------------------------------------

```python
"""Telemetry client using PostHog for collecting anonymous usage data."""

from __future__ import annotations

import logging
import os
import uuid
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

import posthog
from core import __version__

logger = logging.getLogger("core.telemetry")

# Public PostHog config for anonymous telemetry
# These values are intentionally public and meant for anonymous telemetry only
# https://posthog.com/docs/product-analytics/troubleshooting#is-it-ok-for-my-api-key-to-be-exposed-and-public
PUBLIC_POSTHOG_API_KEY = "phc_eSkLnbLxsnYFaXksif1ksbrNzYlJShr35miFLDppF14"
PUBLIC_POSTHOG_HOST = "https://eu.i.posthog.com"

class PostHogTelemetryClient:
    """Collects and reports telemetry data via PostHog."""

    # Global singleton (class-managed)
    _singleton: Optional["PostHogTelemetryClient"] = None

    def __init__(self):
        """Initialize PostHog telemetry client."""
        self.installation_id = self._get_or_create_installation_id()
        self.initialized = False
        self.queued_events: List[Dict[str, Any]] = []

        # Log telemetry status on startup
        if self.is_telemetry_enabled():
            logger.info("Telemetry enabled")
            # Initialize PostHog client if config is available
            self._initialize_posthog()
        else:
            logger.info("Telemetry disabled")

    @classmethod
    def is_telemetry_enabled(cls) -> bool:
        """True if telemetry is currently active for this process."""
        return (
            # Legacy opt-out flag
            os.environ.get("CUA_TELEMETRY", "").lower() != "off"
            # Opt-in flag (defaults to enabled)
            and os.environ.get("CUA_TELEMETRY_ENABLED", "true").lower() in { "1", "true", "yes", "on" }
        )

    def _get_or_create_installation_id(self) -> str:
        """Get or create a unique installation ID that persists across runs.

        The ID is always stored within the core library directory itself,
        ensuring it persists regardless of how the library is used.

        This ID is not tied to any personal information.
        """
        # Get the core library directory (where this file is located)
        try:
            # Find the core module directory using this file's location
            core_module_dir = Path(
                __file__
            ).parent.parent  # core/telemetry/posthog.py -> core/telemetry -> core
            storage_dir = core_module_dir / ".storage"
            storage_dir.mkdir(exist_ok=True)

            id_file = storage_dir / "installation_id"

            # Try to read existing ID
            if id_file.exists():
                try:
                    stored_id = id_file.read_text().strip()
                    if stored_id:  # Make sure it's not empty
                        logger.debug(f"Using existing installation ID: {stored_id}")
                        return stored_id
                except Exception as e:
                    logger.debug(f"Error reading installation ID file: {e}")

            # Create new ID
            new_id = str(uuid.uuid4())
            try:
                id_file.write_text(new_id)
                logger.debug(f"Created new installation ID: {new_id}")
                return new_id
            except Exception as e:
                logger.warning(f"Could not write installation ID: {e}")
        except Exception as e:
            logger.warning(f"Error accessing core module directory: {e}")

        # Last resort: Create a new in-memory ID
        logger.warning("Using random installation ID (will not persist across runs)")
        return str(uuid.uuid4())

    def _initialize_posthog(self) -> bool:
        """Initialize the PostHog client with configuration.

        Returns:
            bool: True if initialized successfully, False otherwise
        """
        if self.initialized:
            return True

        try:
            # Configure the PostHog SDK with the public project key and EU host defined above
            posthog.api_key = PUBLIC_POSTHOG_API_KEY
            posthog.host = PUBLIC_POSTHOG_HOST

            # Configure the client
            posthog.debug = os.environ.get("CUA_TELEMETRY_DEBUG", "").lower() == "on"

            # Log telemetry status
            logger.info(
                f"Initializing PostHog telemetry with installation ID: {self.installation_id}"
            )
            if posthog.debug:
                logger.debug(f"PostHog API Key: {posthog.api_key}")
                logger.debug(f"PostHog Host: {posthog.host}")

            # Identify this installation
            self._identify()

            # Process any queued events
            for event in self.queued_events:
                posthog.capture(
                    distinct_id=self.installation_id,
                    event=event["event"],
                    properties=event["properties"],
                )
            self.queued_events = []

            self.initialized = True
            return True
        except Exception as e:
            logger.warning(f"Failed to initialize PostHog: {e}")
            return False

    def _identify(self) -> None:
        """Set up user properties for the current installation with PostHog."""
        try:
            properties = {
                "version": __version__,
                "is_ci": "CI" in os.environ,
                "os": os.name,
                "python_version": sys.version.split()[0],
            }

            logger.debug(
                f"Setting up PostHog user properties for: {self.installation_id} with properties: {properties}"
            )
            
            # In the Python SDK, we capture an identification event instead of calling identify()
            posthog.capture(
                distinct_id=self.installation_id,
                event="$identify",
                properties={"$set": properties}
            )
            
            logger.info(f"Set up PostHog user properties for installation: {self.installation_id}")
        except Exception as e:
            logger.warning(f"Failed to set up PostHog user properties: {e}")

    def record_event(self, event_name: str, properties: Optional[Dict[str, Any]] = None) -> None:
        """Record an event with optional properties.

        Args:
            event_name: Name of the event
            properties: Event properties (must not contain sensitive data)
        """
        # Respect runtime telemetry opt-out.
        if not self.is_telemetry_enabled():
            logger.debug("Telemetry disabled; event not recorded.")
            return

        event_properties = {"version": __version__, **(properties or {})}

        logger.info(f"Recording event: {event_name} with properties: {event_properties}")

        if self.initialized:
            try:
                posthog.capture(
                    distinct_id=self.installation_id, event=event_name, properties=event_properties
                )
                logger.info(f"Sent event to PostHog: {event_name}")
                # Flush immediately to ensure delivery
                posthog.flush()
            except Exception as e:
                logger.warning(f"Failed to send event to PostHog: {e}")
        else:
            # Queue the event for later
            logger.info(f"PostHog not initialized, queuing event for later: {event_name}")
            self.queued_events.append({"event": event_name, "properties": event_properties})
            # Try to initialize now if not already
            initialize_result = self._initialize_posthog()
            logger.info(f"Attempted to initialize PostHog: {initialize_result}")

    def flush(self) -> bool:
        """Flush any pending events to PostHog.

        Returns:
            bool: True if successful, False otherwise
        """
        if not self.initialized and not self._initialize_posthog():
            return False

        try:
            posthog.flush()
            return True
        except Exception as e:
            logger.debug(f"Failed to flush PostHog events: {e}")
            return False

    @classmethod
    def get_client(cls) -> "PostHogTelemetryClient":
        """Return the global PostHogTelemetryClient instance, creating it if needed."""
        if cls._singleton is None:
            cls._singleton = cls()
        return cls._singleton

    @classmethod
    def destroy_client(cls) -> None:
        """Destroy the global PostHogTelemetryClient instance."""
        cls._singleton = None

def destroy_telemetry_client() -> None:
    """Destroy the global PostHogTelemetryClient instance (class-managed)."""
    PostHogTelemetryClient.destroy_client()

def is_telemetry_enabled() -> bool:
    return PostHogTelemetryClient.is_telemetry_enabled()

def record_event(event_name: str, properties: Optional[Dict[str, Any]] = None) -> None:
    """Record an arbitrary PostHog event."""
    PostHogTelemetryClient.get_client().record_event(event_name, properties or {})
```
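
A minimal sketch of the module-level helpers above in use; the event name and properties are illustrative, and the import path assumes the in-repo package layout:

```python
# Rough sketch using the module-level telemetry helpers defined above;
# the event name and properties are illustrative only.
from core.telemetry.posthog import is_telemetry_enabled, record_event

# Setting CUA_TELEMETRY=off or CUA_TELEMETRY_ENABLED=false in the environment
# disables reporting, in which case record_event() silently does nothing.
if is_telemetry_enabled():
    record_event("example_module_initialized", {"module": "docs-example"})
else:
    print("Telemetry disabled; nothing sent.")
```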

--------------------------------------------------------------------------------
/libs/python/agent/agent/ui/gradio/app.py:
--------------------------------------------------------------------------------

```python
"""
Advanced Gradio UI for Computer-Use Agent (cua-agent)

This is a Gradio interface for the Computer-Use Agent v0.4.x (cua-agent)
with an advanced UI for model selection and configuration.

Supported Agent Models:
- OpenAI: openai/computer-use-preview
- Anthropic: anthropic/claude-3-5-sonnet-20241022, anthropic/claude-3-7-sonnet-20250219
- UI-TARS: huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B
- Omniparser: omniparser+anthropic/claude-3-5-sonnet-20241022, omniparser+ollama_chat/gemma3

Requirements:
    - Mac with Apple Silicon (M1/M2/M3/M4), Linux, or Windows
    - macOS 14 (Sonoma) or newer / Ubuntu 20.04+
    - Python 3.11+
    - Lume CLI installed (https://github.com/trycua/cua)
    - OpenAI or Anthropic API key
"""

import os
import asyncio
import logging
import json
import platform
from pathlib import Path
from typing import Dict, List, Optional, AsyncGenerator, Any, Tuple, Union
import gradio as gr
from gradio.components.chatbot import MetadataDict
from typing import cast

# Import from agent package
from agent import ComputerAgent
from agent.types import Messages, AgentResponse
from computer import Computer

# Global variables
global_agent = None
global_computer = None
SETTINGS_FILE = Path(".gradio_settings.json")

logging.basicConfig(level=logging.INFO)

import dotenv
if dotenv.load_dotenv():
    print(f"DEBUG - Loaded environment variables from {dotenv.find_dotenv()}")
else:
    print("DEBUG - No .env file found")

# --- Settings Load/Save Functions ---
def load_settings() -> Dict[str, Any]:
    """Loads settings from the JSON file."""
    if SETTINGS_FILE.exists():
        try:
            with open(SETTINGS_FILE, "r") as f:
                settings = json.load(f)
                if isinstance(settings, dict):
                    print(f"DEBUG - Loaded settings from {SETTINGS_FILE}")
                    return settings
        except (json.JSONDecodeError, IOError) as e:
            print(f"Warning: Could not load settings from {SETTINGS_FILE}: {e}")
    return {}


def save_settings(settings: Dict[str, Any]):
    """Saves settings to the JSON file."""
    settings.pop("provider_api_key", None)
    try:
        with open(SETTINGS_FILE, "w") as f:
            json.dump(settings, f, indent=4)
        print(f"DEBUG - Saved settings to {SETTINGS_FILE}")
    except IOError as e:
        print(f"Warning: Could not save settings to {SETTINGS_FILE}: {e}")


# # Custom Screenshot Handler for Gradio chat
# class GradioChatScreenshotHandler:
#     """Custom handler that adds screenshots to the Gradio chatbot."""

#     def __init__(self, chatbot_history: List[gr.ChatMessage]):
#         self.chatbot_history = chatbot_history
#         print("GradioChatScreenshotHandler initialized")

#     async def on_screenshot(self, screenshot_base64: str, action_type: str = "") -> None:
#         """Add screenshot to chatbot when a screenshot is taken."""
#         image_markdown = f"![Screenshot after {action_type}](data:image/png;base64,{screenshot_base64})"
        
#         if self.chatbot_history is not None:
#             self.chatbot_history.append(
#                 gr.ChatMessage(
#                     role="assistant",
#                     content=image_markdown,
#                     metadata={"title": f"🖥️ Screenshot - {action_type}", "status": "done"},
#                 )
#             )


# Detect platform capabilities
is_mac = platform.system().lower() == "darwin"
is_lume_available = is_mac or (os.environ.get("PYLUME_HOST", "localhost") != "localhost")

print("PYLUME_HOST: ", os.environ.get("PYLUME_HOST", "localhost"))
print("is_mac: ", is_mac)
print("Lume available: ", is_lume_available)

# Map model names to agent model strings
MODEL_MAPPINGS = {
    "openai": {
        "default": "openai/computer-use-preview",
        "OpenAI: Computer-Use Preview": "openai/computer-use-preview",
    },
    "anthropic": {
        "default": "anthropic/claude-3-7-sonnet-20250219",
        "Anthropic: Claude 4 Opus (20250514)": "anthropic/claude-opus-4-20250514",
        "Anthropic: Claude 4 Sonnet (20250514)": "anthropic/claude-sonnet-4-20250514",
        "Anthropic: Claude 3.7 Sonnet (20250219)": "anthropic/claude-3-7-sonnet-20250219",
        "Anthropic: Claude 3.5 Sonnet (20241022)": "anthropic/claude-3-5-sonnet-20241022",
    },
    "omni": {
        "default": "omniparser+openai/gpt-4o",
        "OMNI: OpenAI GPT-4o": "omniparser+openai/gpt-4o",
        "OMNI: OpenAI GPT-4o mini": "omniparser+openai/gpt-4o-mini",
        "OMNI: Claude 3.7 Sonnet (20250219)": "omniparser+anthropic/claude-3-7-sonnet-20250219",
        "OMNI: Claude 3.5 Sonnet (20241022)": "omniparser+anthropic/claude-3-5-sonnet-20241022",
    },
    "uitars": {
        "default": "huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B" if is_mac else "ui-tars",
        "huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B": "huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B",
    },
}


def get_model_string(model_name: str, loop_provider: str) -> str:
    """Determine the agent model string based on the input."""
    if model_name == "Custom model (OpenAI compatible API)":
        return "custom_oaicompat"
    elif model_name == "Custom model (ollama)":
        return "custom_ollama"
    elif loop_provider == "OMNI-OLLAMA" or model_name.startswith("OMNI: Ollama "):
        if model_name.startswith("OMNI: Ollama "):
            ollama_model = model_name.split("OMNI: Ollama ", 1)[1]
            return f"omniparser+ollama_chat/{ollama_model}"
        return "omniparser+ollama_chat/llama3"
    
    # Map based on loop provider
    mapping = MODEL_MAPPINGS.get(loop_provider.lower(), MODEL_MAPPINGS["openai"])
    return mapping.get(model_name, mapping["default"])


def get_ollama_models() -> List[str]:
    """Get available models from Ollama if installed."""
    try:
        import subprocess
        result = subprocess.run(["ollama", "list"], capture_output=True, text=True)
        if result.returncode == 0:
            lines = result.stdout.strip().split("\n")
            if len(lines) < 2:
                return []
            models = []
            for line in lines[1:]:
                parts = line.split()
                if parts:
                    model_name = parts[0]
                    models.append(f"OMNI: Ollama {model_name}")
            return models
        return []
    except Exception as e:
        logging.error(f"Error getting Ollama models: {e}")
        return []


def create_computer_instance(
    verbosity: int = logging.INFO,
    os_type: str = "macos",
    provider_type: str = "lume",
    name: Optional[str] = None,
    api_key: Optional[str] = None
) -> Computer:
    """Create or get the global Computer instance."""
    global global_computer
    if global_computer is None:
        if provider_type == "localhost":
            global_computer = Computer(
                verbosity=verbosity,
                os_type=os_type,
                use_host_computer_server=True
            )
        else:
            global_computer = Computer(
                verbosity=verbosity,
                os_type=os_type,
                provider_type=provider_type,
                name=name if name else "",
                api_key=api_key
            )
    return global_computer


def create_agent(
    model_string: str,
    save_trajectory: bool = True,
    only_n_most_recent_images: int = 3,
    verbosity: int = logging.INFO,
    custom_model_name: Optional[str] = None,
    computer_os: str = "macos",
    computer_provider: str = "lume",
    computer_name: Optional[str] = None,
    computer_api_key: Optional[str] = None,
    max_trajectory_budget: Optional[float] = None,
) -> ComputerAgent:
    """Create or update the global agent with the specified parameters."""
    global global_agent

    # Create the computer
    computer = create_computer_instance(
        verbosity=verbosity,
        os_type=computer_os,
        provider_type=computer_provider,
        name=computer_name,
        api_key=computer_api_key
    )

    # Handle custom models
    if model_string == "custom_oaicompat" and custom_model_name:
        model_string = custom_model_name
    elif model_string == "custom_ollama" and custom_model_name:
        model_string = f"omniparser+ollama_chat/{custom_model_name}"

    # Create agent kwargs
    agent_kwargs = {
        "model": model_string,
        "tools": [computer],
        "only_n_most_recent_images": only_n_most_recent_images,
        "verbosity": verbosity,
    }
    
    if save_trajectory:
        agent_kwargs["trajectory_dir"] = "trajectories"
    
    if max_trajectory_budget:
        agent_kwargs["max_trajectory_budget"] = {"max_budget": max_trajectory_budget, "raise_error": True}

    global_agent = ComputerAgent(**agent_kwargs)
    return global_agent


def launch_ui():
    """Standalone function to launch the Gradio app."""
    from agent.ui.gradio.ui_components import create_gradio_ui
    print(f"Starting Gradio app for CUA Agent...")
    demo = create_gradio_ui()
    demo.launch(share=False, inbrowser=True)


if __name__ == "__main__":
    launch_ui()

```

--------------------------------------------------------------------------------
/docs/content/docs/computer-sdk/commands.mdx:
--------------------------------------------------------------------------------

```markdown
---
title: Commands
description: Computer commands and interface methods
---

This page describes the set of supported **commands** you can use to control a Cua Computer directly from the Python or TypeScript SDKs.

These commands map to the same actions available in the [Computer Server API Commands Reference](../libraries/computer-server/Commands), and provide low-level, async access to system operations from your agent or automation code.
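
All of the examples below assume you already have a connected `Computer` instance. A minimal Python setup sketch (assuming a local Cua VM or host computer server is available):

```python
import asyncio
from computer import Computer

async def main():
    # Create and connect a Computer instance
    computer = Computer(os_type="macos")
    await computer.run()

    # Interface commands are now available, for example:
    result = await computer.interface.run_command("echo hello")
    print(result.stdout)

asyncio.run(main())
```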

## Shell Actions

Execute shell commands and get detailed results:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">
    ```python
    # Run shell command
    result = await computer.interface.run_command(cmd)
    # result.stdout, result.stderr, result.returncode
    ```
  </Tab>
  <Tab value="TypeScript">
    ```typescript
    // Run shell command
    const result = await computer.interface.runCommand(cmd);
    // result.stdout, result.stderr, result.returncode
    ```
  </Tab>
</Tabs>

## Mouse Actions

Precise mouse control and interaction:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">
    ```python
    # Basic clicks
    await computer.interface.left_click(x, y)       # Left click at coordinates
    await computer.interface.right_click(x, y)      # Right click at coordinates
    await computer.interface.double_click(x, y)     # Double click at coordinates

    # Cursor movement and dragging
    await computer.interface.move_cursor(x, y)      # Move cursor to coordinates
    await computer.interface.drag_to(x, y, duration)  # Drag to coordinates
    await computer.interface.get_cursor_position()  # Get current cursor position

    # Advanced mouse control
    await computer.interface.mouse_down(x, y, button="left")  # Press and hold a mouse button
    await computer.interface.mouse_up(x, y, button="left")    # Release a mouse button
    ```

  </Tab>
  <Tab value="TypeScript">
    ```typescript
    // Basic clicks
    await computer.interface.leftClick(x, y);       // Left click at coordinates
    await computer.interface.rightClick(x, y);      // Right click at coordinates
    await computer.interface.doubleClick(x, y);     // Double click at coordinates

    // Cursor movement and dragging
    await computer.interface.moveCursor(x, y);      // Move cursor to coordinates
    await computer.interface.dragTo(x, y, duration);  // Drag to coordinates
    await computer.interface.getCursorPosition();  // Get current cursor position

    // Advanced mouse control
    await computer.interface.mouseDown(x, y, "left");  // Press and hold a mouse button
    await computer.interface.mouseUp(x, y, "left");    // Release a mouse button
    ```

  </Tab>
</Tabs>

## Keyboard Actions

Text input and key combinations:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">
    ```python
    # Text input
    await computer.interface.type_text("Hello")     # Type text
    await computer.interface.press_key("enter")     # Press a single key

    # Key combinations and advanced control
    await computer.interface.hotkey("command", "c") # Press key combination
    await computer.interface.key_down("command")    # Press and hold a key
    await computer.interface.key_up("command")      # Release a key
    ```

  </Tab>
  <Tab value="TypeScript">
    ```typescript
    // Text input
    await computer.interface.typeText("Hello");     // Type text
    await computer.interface.pressKey("enter");     // Press a single key

    // Key combinations and advanced control
    await computer.interface.hotkey("command", "c"); // Press key combination
    await computer.interface.keyDown("command");    // Press and hold a key
    await computer.interface.keyUp("command");      // Release a key
    ```

  </Tab>
</Tabs>

## Scrolling Actions

Mouse wheel and scrolling control:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">
    ```python
    # Scrolling
    await computer.interface.scroll(x, y) # Scroll the mouse wheel
    await computer.interface.scroll_down(clicks) # Scroll down
    await computer.interface.scroll_up(clicks)   # Scroll up
    ```
  </Tab>
  <Tab value="TypeScript">
    ```typescript 
    // Scrolling 
    await computer.interface.scroll(x, y); // Scroll the mouse wheel 
    await computer.interface.scrollDown(clicks); // Scroll down
    await computer.interface.scrollUp(clicks); // Scroll up 
    ```
  </Tab>
</Tabs>

## Screen Actions

Screen capture and display information:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">
    ```python 
    # Screen operations 
    await computer.interface.screenshot() # Take a screenshot 
    await computer.interface.get_screen_size() # Get screen dimensions

    ```

  </Tab>
  <Tab value="TypeScript">
    ```typescript 
    // Screen operations 
    await computer.interface.screenshot(); // Take a screenshot 
    await computer.interface.getScreenSize(); // Get screen dimensions 
    
    ```
  </Tab>
</Tabs>

## Clipboard Actions

System clipboard management:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">
    ```python 
    # Clipboard operations
    await computer.interface.set_clipboard(text)  # Set clipboard content
    await computer.interface.copy_to_clipboard()  # Get clipboard content

    ```

  </Tab>
  <Tab value="TypeScript">
    ```typescript 
    // Clipboard operations 
    await computer.interface.setClipboard(text); // Set clipboard content
    await computer.interface.copyToClipboard(); // Get clipboard content

    ```

  </Tab>
</Tabs>

## File System Operations

Direct file and directory manipulation:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">

    ```python
    # File existence checks
    await computer.interface.file_exists(path)      # Check if file exists
    await computer.interface.directory_exists(path) # Check if directory exists

    # File content operations
    await computer.interface.read_text(path, encoding="utf-8")        # Read file content
    await computer.interface.write_text(path, content, encoding="utf-8") # Write file content
    await computer.interface.read_bytes(path)       # Read file content as bytes
    await computer.interface.write_bytes(path, content) # Write file content as bytes

    # File and directory management
    await computer.interface.delete_file(path)      # Delete file
    await computer.interface.create_dir(path)       # Create directory
    await computer.interface.delete_dir(path)       # Delete directory
    await computer.interface.list_dir(path)         # List directory contents
    ```

  </Tab>
  <Tab value="TypeScript">
    ```typescript
    // File existence checks
    await computer.interface.fileExists(path);      // Check if file exists
    await computer.interface.directoryExists(path); // Check if directory exists

    // File content operations
    await computer.interface.readText(path, "utf-8");        // Read file content
    await computer.interface.writeText(path, content, "utf-8"); // Write file content
    await computer.interface.readBytes(path);       // Read file content as bytes
    await computer.interface.writeBytes(path, content); // Write file content as bytes

    // File and directory management
    await computer.interface.deleteFile(path);      // Delete file
    await computer.interface.createDir(path);       // Create directory
    await computer.interface.deleteDir(path);       // Delete directory
    await computer.interface.listDir(path);         // List directory contents
    ```

  </Tab>
</Tabs>

## Accessibility

Access system accessibility information:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">
    ```python 
    # Get accessibility tree 
    await computer.interface.get_accessibility_tree()

    ```

  </Tab>
  <Tab value="TypeScript">
    ```typescript 
    // Get accessibility tree 
    await computer.interface.getAccessibilityTree();

    ```
  </Tab>
</Tabs>

## Delay Configuration

Control timing between actions:

<Tabs items={['Python']}>
  <Tab value="Python">
    ```python
    # Set default delay between all actions (in seconds)
    computer.interface.delay = 0.5  # 500ms delay between actions

    # Or specify delay for individual actions
    await computer.interface.left_click(x, y, delay=1.0)     # 1 second delay after click
    await computer.interface.type_text("Hello", delay=0.2)   # 200ms delay after typing
    await computer.interface.press_key("enter", delay=0.5)   # 500ms delay after key press
    ```

  </Tab>
</Tabs>

## Python Virtual Environment Operations

Manage Python environments:

<Tabs items={['Python']}>
  <Tab value="Python">
    ```python
    # Virtual environment management
    await computer.venv_install("demo_venv", ["requests", "macos-pyxa"]) # Install packages in a virtual environment
    await computer.venv_cmd("demo_venv", "python -c \"import requests; print(requests.get('https://httpbin.org/ip').json())\"") # Run a shell command in a virtual environment
    await computer.venv_exec("demo_venv", python_function_or_code, *args, **kwargs) # Run a Python function in a virtual environment and return the result / raise an exception (see the example below)
    ```

  </Tab>
</Tabs>
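
For example, `venv_exec` accepts a regular Python function and forwards its arguments. A minimal sketch (the `get_platform` helper is purely illustrative):

```python
# Define an ordinary Python function...
def get_platform(prefix: str) -> str:
    import platform
    return f"{prefix}: {platform.platform()}"

# ...and execute it inside the virtual environment; the return value
# comes back to the caller, and exceptions are re-raised locally
result = await computer.venv_exec("demo_venv", get_platform, "VM platform")
print(result)
```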
```

--------------------------------------------------------------------------------
/blog/app-use.md:
--------------------------------------------------------------------------------

```markdown
# App-Use: Control Individual Applications with Cua Agents

*Published on May 31, 2025 by The Cua Team*

Today, we are excited to introduce a new experimental feature landing in the [Cua GitHub repository](https://github.com/trycua/cua): **App-Use**. App-Use allows you to create lightweight virtual desktops that limit agent access to specific applications, improving the precision of your agent's trajectory. It's perfect for parallel workflows and focused task execution.

> **Note:** App-Use is currently experimental. To use it, you need to enable it by passing the `experiments=["app-use"]` feature flag when creating your Computer instance.

Check out an example of a Cua Agent automating Cua's team Taco Bell order through the iPhone Mirroring app:

<div align="center">
  <video src="https://github.com/user-attachments/assets/6362572e-f784-4006-aa6e-bce10991fab9" width="600" controls></video>
</div>

## What is App-Use?

App-Use lets you create virtual desktop sessions scoped to specific applications. Instead of giving an agent access to your entire screen, you can say "only work with Safari and Notes" or "just control the iPhone Mirroring app."

```python
# Create a macOS VM with App Use experimental feature enabled
computer = Computer(experiments=["app-use"])

# Create a desktop limited to specific apps
desktop = computer.create_desktop_from_apps(["Safari", "Notes"])

# Your agent can now only see and interact with these apps
agent = ComputerAgent(
    model="anthropic/claude-3-5-sonnet-20241022",
    tools=[desktop]
)
```

## Key Benefits

### 1. Lightweight and Fast
App-Use creates visual filters, not new processes. Your apps continue running normally; App-Use simply controls what the agent can see and click on. The virtual desktops are composited views that require no additional compute resources beyond the existing window manager operations.

### 2. Run Multiple Agents in Parallel
Deploy a team of specialized agents, each focused on their own apps:

```python
# Create a Computer with App Use enabled
computer = Computer(experiments=["app-use"])

# Research agent focuses on browser
research_desktop = computer.create_desktop_from_apps(["Safari"])
research_agent = ComputerAgent(tools=[research_desktop], ...)

# Writing agent focuses on documents  
writing_desktop = computer.create_desktop_from_apps(["Pages", "Notes"])
writing_agent = ComputerAgent(tools=[writing_desktop], ...)

async def run_agent(agent, task):
    async for result in agent.run(task):
        print(result.get('text', ''))

# Run both simultaneously
await asyncio.gather(
    run_agent(research_agent, "Research AI trends for 2025"),
    run_agent(writing_agent, "Draft blog post outline")
)
```

## How To: Getting Started with App-Use

### Requirements

To get started with App-Use, you'll need:
- Python 3.11+
- macOS Sequoia (15.0) or later

### Getting Started

```bash
# Install packages and launch UI
pip install -U "cua-computer[all]" "cua-agent[all]"
python -m agent.ui.gradio.app
```

```python
import asyncio
from computer import Computer
from agent import ComputerAgent

async def main():
    computer = Computer(experiments=["app-use"])
    await computer.run()
    
    # Create app-specific desktop sessions
    desktop = computer.create_desktop_from_apps(["Notes"])
    
    # Initialize an agent
    agent = ComputerAgent(
        model="anthropic/claude-3-5-sonnet-20241022",
        tools=[desktop]
    )
    
    # Take a screenshot (returns bytes by default)
    screenshot = await desktop.interface.screenshot()
    with open("app_screenshot.png", "wb") as f:
        f.write(screenshot)
    
    # Run an agent task
    async for result in agent.run("Create a new note titled 'Meeting Notes' and add today's agenda items"):
        print(f"Agent: {result.get('text', '')}")

if __name__ == "__main__":
    asyncio.run(main())
```

## Use Case: Automating Your iPhone with Cua

### ⚠️ Important Warning

Computer-use agents are powerful tools that can interact with your devices. This guide involves using your own macOS and iPhone instead of a VM. **Proceed at your own risk.** Always:
- Review agent actions before running
- Start with non-critical tasks
- Monitor agent behavior closely

Remember that with Cua it is still advisable to use a VM for a better level of isolation for your agents.

### Setting Up iPhone Automation

### Step 1: Start the cua-computer-server

First, you'll need to start the cua-computer-server locally to enable access to iPhone Mirroring via the Computer interface:

```bash
# Install the server
pip install cua-computer-server

# Start the server
python -m computer_server
```

### Step 2: Connect iPhone Mirroring

Then, you'll need to open the "iPhone Mirroring" app on your Mac and connect it to your iPhone.

### Step 3: Create an iPhone Automation Session

Finally, you can create an iPhone automation session:

```python
import asyncio
from computer import Computer
from agent import ComputerAgent

async def automate_iphone():
    # Connect to your local computer server
    my_mac = Computer(use_host_computer_server=True, os_type="macos", experiments=["app-use"])
    await my_mac.run()
    
    # Create a desktop focused on iPhone Mirroring
    my_iphone = my_mac.create_desktop_from_apps(["iPhone Mirroring"])
    
    # Initialize an agent for iPhone automation
    agent = ComputerAgent(
        model="anthropic/claude-3-5-sonnet-20241022",
        tools=[my_iphone]
    )
    
    # Example: Send a message
    async for result in agent.run("Open Messages and send 'Hello from Cua!' to John"):
        print(f"Agent: {result.get('text', '')}")
    
    # Example: Set a reminder
    async for result in agent.run("Create a reminder to call mom at 5 PM today"):
        print(f"Agent: {result.get('text', '')}")

if __name__ == "__main__":
    asyncio.run(automate_iphone())
```

### iPhone Automation Use Cases

With Cua's iPhone automation, you can:
- **Automate messaging**: Send texts, respond to messages, manage conversations
- **Control apps**: Navigate any iPhone app using natural language
- **Manage settings**: Adjust iPhone settings programmatically
- **Extract data**: Read information from apps that don't have APIs
- **Test iOS apps**: Automate testing workflows for iPhone applications

## Important Notes

- **Visual isolation only**: Apps share the same files, OS resources, and user session
- **Dynamic resolution**: Desktops automatically scale to fit app windows and menu bars
- **macOS only**: Currently requires macOS due to compositing engine dependencies
- **Not a security boundary**: This is for agent focus, not security isolation

## When to Use What: App-Use vs Multiple Cua Containers

### Use App-Use within the same macOS Cua Container:
- ✅ You need lightweight, fast agent focusing (macOS only)
- ✅ You want to run multiple agents on one desktop
- ✅ You're automating personal devices like iPhones
- ✅ Window layout isolation is sufficient
- ✅ You want low computational overhead

### Use Multiple Cua Containers:
- ✅ You need maximum isolation between agents
- ✅ You require cross-platform support (Mac/Linux/Windows)
- ✅ You need guaranteed resource allocation
- ✅ Security and complete isolation are critical
- ⚠️ Note: Most computationally expensive option

## Pro Tips

1. **Start Small**: Test with one app before creating complex multi-app desktops
2. **Screenshot First**: Take a screenshot to verify your desktop shows the right apps (see the sketch after this list)
3. **Name Your Apps Correctly**: Use exact app names as they appear in the system
4. **Consider Performance**: While lightweight, too many parallel agents can still impact system performance
5. **Plan Your Workflows**: Design agent tasks to minimize app switching for best results
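
A quick sketch of tip 2, reusing the `computer` and `desktop` objects from the Getting Started example above:

```python
# Verify the scoped desktop shows only the expected apps
# before handing it to an agent (screenshot() returns PNG bytes)
screenshot = await desktop.interface.screenshot()
with open("desktop_check.png", "wb") as f:
    f.write(screenshot)
```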

### How It Works

When you create a desktop session with `create_desktop_from_apps()`, App-Use:
- Filters the visual output to show only specified application windows
- Routes input events only to those applications
- Maintains window layout isolation between different sessions
- Shares the underlying file system and OS resources
- **Dynamically adjusts resolution** to fit the window layout and menu bar items

The resolution of these virtual desktops is dynamic, automatically scaling to accommodate the applications' window sizes and menu bar requirements. This ensures that agents always have a clear view of the entire interface they need to interact with, regardless of the specific app combination.
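
You can observe this from code. A minimal sketch, assuming the scoped desktop exposes the same `interface` methods as a full `Computer` (as it does for `screenshot()` above):

```python
# The reported size reflects the composited virtual desktop rather than
# the physical display, and it changes as app windows are resized
size = await desktop.interface.get_screen_size()
print("Virtual desktop size:", size)
```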

Currently, App-Use is limited to macOS due to its reliance on Quartz, Apple's compositing engine, for creating these virtual desktops. Quartz provides the low-level window management and rendering capabilities that make it possible to composite multiple application windows into isolated visual environments.

## Conclusion

App-Use brings a new dimension to computer automation: lightweight, focused, and parallel. Whether you're building a personal iPhone assistant or orchestrating a team of specialized agents, App-Use provides the right balance of functionality and efficiency.

Ready to try it? Update to the latest Cua version and start focusing your agents today!

```bash
pip install -U "cua-computer[all]" "cua-agent[all]"
```

Happy automating! 🎯🤖

```

--------------------------------------------------------------------------------
/blog/introducing-cua-cloud-containers.md:
--------------------------------------------------------------------------------

```markdown
# Introducing Cua Cloud Sandbox: Computer-Use Agents in the Cloud

*Published on May 28, 2025 by Francesco Bonacci*

Welcome to the next chapter in our Computer-Use Agent journey! In [Part 1](./build-your-own-operator-on-macos-1), we showed you how to build your own Operator on macOS. In [Part 2](./build-your-own-operator-on-macos-2), we explored the cua-agent framework. Today, we're excited to introduce **Cua Cloud Sandbox** – the easiest way to deploy Computer-Use Agents at scale.

<div align="center">
  <video src="https://github.com/user-attachments/assets/63a2addf-649f-4468-971d-58d38dd43ee6" width="600" controls></video>
</div>

## What is Cua Cloud?

Think of Cua Cloud as **Docker for Computer-Use Agents**. Instead of managing VMs, installing dependencies, and configuring environments, you can launch pre-configured Cloud Sandbox instances with a single command. Each sandbox comes with a **full desktop environment** accessible in the browser via noVNC, all Cua dependencies pre-configured (including a PyAutoGUI-compatible server), and **pay-per-use pricing** that scales with your needs.

## Why Cua Cloud Sandbox?

Four months ago, we launched [**Lume**](https://github.com/trycua/cua/tree/main/libs/lume) and [**Cua**](https://github.com/trycua/cua) with the goal of bringing sandboxed VMs and Computer-Use Agents to Apple Silicon. The developer community's response was incredible 🎉

Going from prototype to production revealed a problem, though: **local macOS VMs don't scale**, nor are they easily portable.

Our Discord community, YC peers, and early pilot customers kept hitting the same issues. Storage constraints meant **20-40GB per VM** filled laptops fast. Different hardware architectures (Apple Silicon ARM vs Intel x86) prevented portability of local workflows. Every new user lost a day to setup and configuration.

**Cua Cloud** eliminates these constraints while preserving everything developers are familiar with about our Computer and Agent SDK.

### What We Built

Over the past month, we've been iterating on Cua Cloud with partners and beta users to address these challenges. You use the exact same `Computer` and `ComputerAgent` classes you already know, but with **zero local setup** or storage requirements. VNC access comes with **built-in encryption**, you pay only for compute time (not idle resources), and you can bring your own API keys for any LLM provider.

The result? **Instant deployment** in seconds instead of hours, with no infrastructure to manage. Scale elastically from **1 to 100 agents** in parallel, with consistent behavior across all deployments. Share agent trajectories with your team for better collaboration and debugging.

## Getting Started

### Step 1: Get Your API Key

Sign up at [**trycua.com**](https://trycua.com) to get your API key.

```bash
# Set your API key in environment variables
export CUA_API_KEY=your_api_key_here
export CUA_CONTAINER_NAME=my-agent-container
```

### Step 2: Launch Your First Sandbox

```python
import asyncio
import logging
import os

from computer import Computer, VMProviderType
from agent import ComputerAgent

async def run_cloud_agent():
    # Create a remote Linux computer with Cua Cloud
    computer = Computer(
        os_type="linux",
        api_key=os.getenv("CUA_API_KEY"),
        name=os.getenv("CUA_CONTAINER_NAME"),
        provider_type=VMProviderType.CLOUD,
    )
    
    # Create an agent with your preferred loop
    agent = ComputerAgent(
        model="openai/gpt-4o",
        save_trajectory=True,
        verbosity=logging.INFO,
        tools=[computer]
    )
    
    # Run a task
    async for result in agent.run("Open Chrome and search for AI news"):
        print(f"Response: {result.get('text')}")

# Run the agent
asyncio.run(run_cloud_agent())
```

### Available Tiers

We're launching with **three compute tiers** to match your workload needs:

- **Small** (1 vCPU, 4GB RAM) - Perfect for simple automation tasks and testing
- **Medium** (2 vCPU, 8GB RAM) - Ideal for most production workloads
- **Large** (8 vCPU, 32GB RAM) - Built for complex, resource-intensive operations

Each tier includes a **full Linux desktop environment with Xfce** and a pre-configured browser, **secure VNC access** over SSL, persistent storage for the duration of your session, and automatic sandbox cleanup on termination.

## How Some Customers Are Using Cua Cloud Today

### Example 1: Automated GitHub Workflow

Let's automate a complete GitHub workflow:

```python
import asyncio
import logging
import os

from computer import Computer, VMProviderType
from agent import ComputerAgent

async def github_automation():
    """Automate GitHub repository management tasks."""
    computer = Computer(
        os_type="linux",
        api_key=os.getenv("CUA_API_KEY"),
        name="github-automation",
        provider_type=VMProviderType.CLOUD,
    )
    
    agent = ComputerAgent(
        model="openai/gpt-4o",
        save_trajectory=True,
        verbosity=logging.INFO,
        tools=[computer]
    )
    
    tasks = [
        "Look for a repository named trycua/cua on GitHub.",
        "Check the open issues, open the most recent one and read it.",
        "Clone the repository if it doesn't exist yet.",
        "Create a new branch for the issue.",
        "Make necessary changes to resolve the issue.",
        "Commit the changes with a descriptive message.",
        "Create a pull request."
    ]
    
    for i, task in enumerate(tasks):
        print(f"\nExecuting task {i+1}/{len(tasks)}: {task}")
        async for result in agent.run(task):
            print(f"Response: {result.get('text')}")
            
            # Check if any tools were used
            tools = result.get('tools')
            if tools:
                print(f"Tools used: {tools}")
        
        print(f"Task {i+1} completed")

# Run the automation
asyncio.run(github_automation())
```

### Example 2: Parallel Web Scraping

Run multiple agents in parallel to scrape different websites:

```python
import asyncio
import os

from computer import Computer, VMProviderType
from agent import ComputerAgent

async def scrape_website(site_name, url):
    """Scrape a website using a cloud agent."""
    computer = Computer(
        os_type="linux",
        api_key=os.getenv("CUA_API_KEY"),
        name=f"scraper-{site_name}",
        provider_type=VMProviderType.CLOUD,
    )
    
    agent = ComputerAgent(
        model="openai/gpt-4o",
        save_trajectory=True,
        tools=[computer]
    )
    
    results = []
    tasks = [
        f"Navigate to {url}",
        "Extract the main headlines or article titles",
        "Take a screenshot of the page",
        "Save the extracted data to a file"
    ]
    
    for task in tasks:
        async for result in agent.run(task):
            results.append({
                'site': site_name,
                'task': task,
                'response': result.get('text')
            })
    
    return results

async def parallel_scraping():
    """Scrape multiple websites in parallel."""
    sites = [
        ("ArXiv", "https://arxiv.org"),
        ("HackerNews", "https://news.ycombinator.com"),
        ("TechCrunch", "https://techcrunch.com")
    ]
    
    # Run all scraping tasks in parallel
    tasks = [scrape_website(name, url) for name, url in sites]
    results = await asyncio.gather(*tasks)
    
    # Process results
    for site_results in results:
        print(f"\nResults from {site_results[0]['site']}:")
        for result in site_results:
            print(f"  - {result['task']}: {result['response'][:100]}...")

# Run parallel scraping
asyncio.run(parallel_scraping())
```

## Cost Optimization Tips

To optimize your costs, use appropriate sandbox sizes for your workload and implement timeouts to prevent runaway tasks. Batch related operations together to minimize sandbox spin-up time, and always remember to terminate sandboxes when your work is complete.
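
One simple way to enforce a hard timeout on an agent task is to wrap the run loop with `asyncio.timeout` (Python 3.11+). This is a generic asyncio sketch, not a Cua-specific API:

```python
import asyncio

async def run_with_timeout(agent, task: str, seconds: float = 300.0):
    """Stop a runaway agent task after `seconds` have elapsed."""
    try:
        async with asyncio.timeout(seconds):
            async for result in agent.run(task):
                print(f"Response: {result.get('text')}")
    except TimeoutError:
        print(f"Task timed out after {seconds}s: {task}")
```

`ComputerAgent` also accepts a `max_trajectory_budget` option if you prefer to cap spend per trajectory rather than wall-clock time.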

## Security Considerations

Cua Cloud runs all sandboxes in isolated environments with encrypted VNC connections. Your API keys are never exposed in trajectories.

## What's Next for Cua Cloud

We're just getting started! Here's what's coming in the next few months:

### Elastic Autoscaled Sandbox Pools

Soon you'll be able to create elastic sandbox pools that automatically scale based on demand. Define minimum and maximum sandbox counts, and let Cua Cloud handle the rest. Perfect for batch processing, scheduled automations, and handling traffic spikes without manual intervention.

### Windows and macOS Cloud Support

While we're launching with Linux sandboxes, Windows and macOS cloud machines are coming soon. Run Windows-specific automations, test cross-platform workflows, or leverage macOS-exclusive applications – all in the cloud with the same simple API.

Stay tuned for updates and join our [**Discord**](https://discord.gg/cua-ai) to vote on which features you'd like to see first!

## Get Started Today

Ready to deploy your Computer-Use Agents in the cloud?

Visit [**trycua.com**](https://trycua.com) to sign up and get your API key. Join our [**Discord community**](https://discord.gg/cua-ai) for support and explore more examples on [**GitHub**](https://github.com/trycua/cua).

Happy RPA 2.0! 🚀

```