This is page 13 of 20. Use http://codebase.md/trycua/cua?page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── scripts
│ │ ├── get_pyproject_version.py
│ │ └── tests
│ │ ├── __init__.py
│ │ ├── README.md
│ │ └── test_get_pyproject_version.py
│ └── workflows
│ ├── bump-version.yml
│ ├── ci-lume.yml
│ ├── docker-publish-cua-linux.yml
│ ├── docker-publish-cua-windows.yml
│ ├── docker-publish-kasm.yml
│ ├── docker-publish-xfce.yml
│ ├── docker-reusable-publish.yml
│ ├── link-check.yml
│ ├── lint.yml
│ ├── npm-publish-cli.yml
│ ├── npm-publish-computer.yml
│ ├── npm-publish-core.yml
│ ├── publish-lume.yml
│ ├── pypi-publish-agent.yml
│ ├── pypi-publish-computer-server.yml
│ ├── pypi-publish-computer.yml
│ ├── pypi-publish-core.yml
│ ├── pypi-publish-mcp-server.yml
│ ├── pypi-publish-som.yml
│ ├── pypi-reusable-publish.yml
│ ├── python-tests.yml
│ ├── test-cua-models.yml
│ └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│ ├── docs.code-workspace
│ ├── extensions.json
│ ├── launch.json
│ ├── libs-ts.code-workspace
│ ├── lume.code-workspace
│ ├── lumier.code-workspace
│ ├── py.code-workspace
│ └── settings.json
├── blog
│ ├── app-use.md
│ ├── assets
│ │ ├── composite-agents.png
│ │ ├── docker-ubuntu-support.png
│ │ ├── hack-booth.png
│ │ ├── hack-closing-ceremony.jpg
│ │ ├── hack-cua-ollama-hud.jpeg
│ │ ├── hack-leaderboard.png
│ │ ├── hack-the-north.png
│ │ ├── hack-winners.jpeg
│ │ ├── hack-workshop.jpeg
│ │ ├── hud-agent-evals.png
│ │ └── trajectory-viewer.jpeg
│ ├── bringing-computer-use-to-the-web.md
│ ├── build-your-own-operator-on-macos-1.md
│ ├── build-your-own-operator-on-macos-2.md
│ ├── cloud-windows-ga-macos-preview.md
│ ├── composite-agents.md
│ ├── computer-use-agents-for-growth-hacking.md
│ ├── cua-hackathon.md
│ ├── cua-playground-preview.md
│ ├── cua-vlm-router.md
│ ├── hack-the-north.md
│ ├── hud-agent-evals.md
│ ├── human-in-the-loop.md
│ ├── introducing-cua-cli.md
│ ├── introducing-cua-cloud-containers.md
│ ├── lume-to-containerization.md
│ ├── neurips-2025-cua-papers.md
│ ├── sandboxed-python-execution.md
│ ├── training-computer-use-models-trajectories-1.md
│ ├── trajectory-viewer.md
│ ├── ubuntu-docker-support.md
│ └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│ ├── .env.example
│ ├── .gitignore
│ ├── content
│ │ └── docs
│ │ ├── agent-sdk
│ │ │ ├── agent-loops.mdx
│ │ │ ├── benchmarks
│ │ │ │ ├── index.mdx
│ │ │ │ ├── interactive.mdx
│ │ │ │ ├── introduction.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── osworld-verified.mdx
│ │ │ │ ├── screenspot-pro.mdx
│ │ │ │ └── screenspot-v2.mdx
│ │ │ ├── callbacks
│ │ │ │ ├── agent-lifecycle.mdx
│ │ │ │ ├── cost-saving.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── logging.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── pii-anonymization.mdx
│ │ │ │ └── trajectories.mdx
│ │ │ ├── chat-history.mdx
│ │ │ ├── custom-tools.mdx
│ │ │ ├── customizing-computeragent.mdx
│ │ │ ├── integrations
│ │ │ │ ├── hud.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── observability.mdx
│ │ │ ├── mcp-server
│ │ │ │ ├── client-integrations.mdx
│ │ │ │ ├── configuration.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── llm-integrations.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── tools.mdx
│ │ │ │ └── usage.mdx
│ │ │ ├── message-format.mdx
│ │ │ ├── meta.json
│ │ │ ├── migration-guide.mdx
│ │ │ ├── prompt-caching.mdx
│ │ │ ├── supported-agents
│ │ │ │ ├── composed-agents.mdx
│ │ │ │ ├── computer-use-agents.mdx
│ │ │ │ ├── grounding-models.mdx
│ │ │ │ ├── human-in-the-loop.mdx
│ │ │ │ └── meta.json
│ │ │ ├── supported-model-providers
│ │ │ │ ├── cua-vlm-router.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ └── local-models.mdx
│ │ │ ├── telemetry.mdx
│ │ │ └── usage-tracking.mdx
│ │ ├── cli-playbook
│ │ │ ├── commands.mdx
│ │ │ ├── index.mdx
│ │ │ └── meta.json
│ │ ├── computer-sdk
│ │ │ ├── cloud-vm-management.mdx
│ │ │ ├── commands.mdx
│ │ │ ├── computer-server
│ │ │ │ ├── Commands.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── REST-API.mdx
│ │ │ │ └── WebSocket-API.mdx
│ │ │ ├── computer-ui.mdx
│ │ │ ├── computers.mdx
│ │ │ ├── custom-computer-handlers.mdx
│ │ │ ├── meta.json
│ │ │ ├── sandboxed-python.mdx
│ │ │ └── tracing-api.mdx
│ │ ├── example-usecases
│ │ │ ├── form-filling.mdx
│ │ │ ├── gemini-complex-ui-navigation.mdx
│ │ │ ├── meta.json
│ │ │ ├── post-event-contact-export.mdx
│ │ │ └── windows-app-behind-vpn.mdx
│ │ ├── get-started
│ │ │ ├── meta.json
│ │ │ └── quickstart.mdx
│ │ ├── index.mdx
│ │ ├── macos-vm-cli-playbook
│ │ │ ├── lume
│ │ │ │ ├── cli-reference.mdx
│ │ │ │ ├── faq.md
│ │ │ │ ├── http-api.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── prebuilt-images.mdx
│ │ │ ├── lumier
│ │ │ │ ├── building-lumier.mdx
│ │ │ │ ├── docker-compose.mdx
│ │ │ │ ├── docker.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ └── meta.json
│ │ │ └── meta.json
│ │ └── meta.json
│ ├── next.config.mjs
│ ├── package-lock.json
│ ├── package.json
│ ├── pnpm-lock.yaml
│ ├── postcss.config.mjs
│ ├── public
│ │ └── img
│ │ ├── agent_gradio_ui.png
│ │ ├── agent.png
│ │ ├── bg-dark.jpg
│ │ ├── bg-light.jpg
│ │ ├── cli.png
│ │ ├── computer.png
│ │ ├── grounding-with-gemini3.gif
│ │ ├── hero.png
│ │ ├── laminar_trace_example.png
│ │ ├── som_box_threshold.png
│ │ └── som_iou_threshold.png
│ ├── README.md
│ ├── source.config.ts
│ ├── src
│ │ ├── app
│ │ │ ├── (home)
│ │ │ │ ├── [[...slug]]
│ │ │ │ │ └── page.tsx
│ │ │ │ └── layout.tsx
│ │ │ ├── api
│ │ │ │ ├── posthog
│ │ │ │ │ └── [...path]
│ │ │ │ │ └── route.ts
│ │ │ │ └── search
│ │ │ │ └── route.ts
│ │ │ ├── favicon.ico
│ │ │ ├── global.css
│ │ │ ├── layout.config.tsx
│ │ │ ├── layout.tsx
│ │ │ ├── llms.mdx
│ │ │ │ └── [[...slug]]
│ │ │ │ └── route.ts
│ │ │ ├── llms.txt
│ │ │ │ └── route.ts
│ │ │ ├── robots.ts
│ │ │ └── sitemap.ts
│ │ ├── assets
│ │ │ ├── discord-black.svg
│ │ │ ├── discord-white.svg
│ │ │ ├── logo-black.svg
│ │ │ └── logo-white.svg
│ │ ├── components
│ │ │ ├── analytics-tracker.tsx
│ │ │ ├── cookie-consent.tsx
│ │ │ ├── doc-actions-menu.tsx
│ │ │ ├── editable-code-block.tsx
│ │ │ ├── footer.tsx
│ │ │ ├── hero.tsx
│ │ │ ├── iou.tsx
│ │ │ ├── mermaid.tsx
│ │ │ └── page-feedback.tsx
│ │ ├── lib
│ │ │ ├── llms.ts
│ │ │ └── source.ts
│ │ ├── mdx-components.tsx
│ │ └── providers
│ │ └── posthog-provider.tsx
│ └── tsconfig.json
├── examples
│ ├── agent_examples.py
│ ├── agent_ui_examples.py
│ ├── browser_tool_example.py
│ ├── cloud_api_examples.py
│ ├── computer_examples_windows.py
│ ├── computer_examples.py
│ ├── computer_ui_examples.py
│ ├── computer-example-ts
│ │ ├── .env.example
│ │ ├── .gitignore
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── README.md
│ │ ├── src
│ │ │ ├── helpers.ts
│ │ │ └── index.ts
│ │ └── tsconfig.json
│ ├── docker_examples.py
│ ├── evals
│ │ ├── hud_eval_examples.py
│ │ └── wikipedia_most_linked.txt
│ ├── pylume_examples.py
│ ├── sandboxed_functions_examples.py
│ ├── som_examples.py
│ ├── tracing_examples.py
│ ├── utils.py
│ └── winsandbox_example.py
├── img
│ ├── agent_gradio_ui.png
│ ├── agent.png
│ ├── cli.png
│ ├── computer.png
│ ├── logo_black.png
│ └── logo_white.png
├── libs
│ ├── kasm
│ │ ├── Dockerfile
│ │ ├── LICENSE
│ │ ├── README.md
│ │ └── src
│ │ └── ubuntu
│ │ └── install
│ │ └── firefox
│ │ ├── custom_startup.sh
│ │ ├── firefox.desktop
│ │ └── install_firefox.sh
│ ├── lume
│ │ ├── .cursorignore
│ │ ├── CONTRIBUTING.md
│ │ ├── Development.md
│ │ ├── img
│ │ │ └── cli.png
│ │ ├── Package.resolved
│ │ ├── Package.swift
│ │ ├── README.md
│ │ ├── resources
│ │ │ └── lume.entitlements
│ │ ├── scripts
│ │ │ ├── build
│ │ │ │ ├── build-debug.sh
│ │ │ │ ├── build-release-notarized.sh
│ │ │ │ └── build-release.sh
│ │ │ └── install.sh
│ │ ├── src
│ │ │ ├── Commands
│ │ │ │ ├── Clone.swift
│ │ │ │ ├── Config.swift
│ │ │ │ ├── Create.swift
│ │ │ │ ├── Delete.swift
│ │ │ │ ├── Get.swift
│ │ │ │ ├── Images.swift
│ │ │ │ ├── IPSW.swift
│ │ │ │ ├── List.swift
│ │ │ │ ├── Logs.swift
│ │ │ │ ├── Options
│ │ │ │ │ └── FormatOption.swift
│ │ │ │ ├── Prune.swift
│ │ │ │ ├── Pull.swift
│ │ │ │ ├── Push.swift
│ │ │ │ ├── Run.swift
│ │ │ │ ├── Serve.swift
│ │ │ │ ├── Set.swift
│ │ │ │ └── Stop.swift
│ │ │ ├── ContainerRegistry
│ │ │ │ ├── ImageContainerRegistry.swift
│ │ │ │ ├── ImageList.swift
│ │ │ │ └── ImagesPrinter.swift
│ │ │ ├── Errors
│ │ │ │ └── Errors.swift
│ │ │ ├── FileSystem
│ │ │ │ ├── Home.swift
│ │ │ │ ├── Settings.swift
│ │ │ │ ├── VMConfig.swift
│ │ │ │ ├── VMDirectory.swift
│ │ │ │ └── VMLocation.swift
│ │ │ ├── LumeController.swift
│ │ │ ├── Main.swift
│ │ │ ├── Server
│ │ │ │ ├── Handlers.swift
│ │ │ │ ├── HTTP.swift
│ │ │ │ ├── Requests.swift
│ │ │ │ ├── Responses.swift
│ │ │ │ └── Server.swift
│ │ │ ├── Utils
│ │ │ │ ├── CommandRegistry.swift
│ │ │ │ ├── CommandUtils.swift
│ │ │ │ ├── Logger.swift
│ │ │ │ ├── NetworkUtils.swift
│ │ │ │ ├── Path.swift
│ │ │ │ ├── ProcessRunner.swift
│ │ │ │ ├── ProgressLogger.swift
│ │ │ │ ├── String.swift
│ │ │ │ └── Utils.swift
│ │ │ ├── Virtualization
│ │ │ │ ├── DarwinImageLoader.swift
│ │ │ │ ├── DHCPLeaseParser.swift
│ │ │ │ ├── ImageLoaderFactory.swift
│ │ │ │ └── VMVirtualizationService.swift
│ │ │ ├── VM
│ │ │ │ ├── DarwinVM.swift
│ │ │ │ ├── LinuxVM.swift
│ │ │ │ ├── VM.swift
│ │ │ │ ├── VMDetails.swift
│ │ │ │ ├── VMDetailsPrinter.swift
│ │ │ │ ├── VMDisplayResolution.swift
│ │ │ │ └── VMFactory.swift
│ │ │ └── VNC
│ │ │ ├── PassphraseGenerator.swift
│ │ │ └── VNCService.swift
│ │ └── tests
│ │ ├── Mocks
│ │ │ ├── MockVM.swift
│ │ │ ├── MockVMVirtualizationService.swift
│ │ │ └── MockVNCService.swift
│ │ ├── VM
│ │ │ └── VMDetailsPrinterTests.swift
│ │ ├── VMTests.swift
│ │ ├── VMVirtualizationServiceTests.swift
│ │ └── VNCServiceTests.swift
│ ├── lumier
│ │ ├── .dockerignore
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── bin
│ │ │ └── entry.sh
│ │ ├── config
│ │ │ └── constants.sh
│ │ ├── hooks
│ │ │ └── on-logon.sh
│ │ └── lib
│ │ ├── utils.sh
│ │ └── vm.sh
│ ├── python
│ │ ├── agent
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── agent
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── adapters
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── cua_adapter.py
│ │ │ │ │ ├── huggingfacelocal_adapter.py
│ │ │ │ │ ├── human_adapter.py
│ │ │ │ │ ├── mlxvlm_adapter.py
│ │ │ │ │ └── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ └── qwen2_5_vl.py
│ │ │ │ ├── agent.py
│ │ │ │ ├── callbacks
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── budget_manager.py
│ │ │ │ │ ├── image_retention.py
│ │ │ │ │ ├── logging.py
│ │ │ │ │ ├── operator_validator.py
│ │ │ │ │ ├── pii_anonymization.py
│ │ │ │ │ ├── prompt_instructions.py
│ │ │ │ │ ├── telemetry.py
│ │ │ │ │ └── trajectory_saver.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── computers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cua.py
│ │ │ │ │ └── custom.py
│ │ │ │ ├── decorators.py
│ │ │ │ ├── human_tool
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ ├── server.py
│ │ │ │ │ └── ui.py
│ │ │ │ ├── integrations
│ │ │ │ │ └── hud
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── agent.py
│ │ │ │ │ └── proxy.py
│ │ │ │ ├── loops
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── anthropic.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── composed_grounded.py
│ │ │ │ │ ├── gelato.py
│ │ │ │ │ ├── gemini.py
│ │ │ │ │ ├── generic_vlm.py
│ │ │ │ │ ├── glm45v.py
│ │ │ │ │ ├── gta1.py
│ │ │ │ │ ├── holo.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── model_types.csv
│ │ │ │ │ ├── moondream3.py
│ │ │ │ │ ├── omniparser.py
│ │ │ │ │ ├── openai.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ ├── uiins.py
│ │ │ │ │ ├── uitars.py
│ │ │ │ │ └── uitars2.py
│ │ │ │ ├── proxy
│ │ │ │ │ ├── examples.py
│ │ │ │ │ └── handlers.py
│ │ │ │ ├── responses.py
│ │ │ │ ├── tools
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── browser_tool.py
│ │ │ │ ├── types.py
│ │ │ │ └── ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ └── gradio
│ │ │ │ ├── __init__.py
│ │ │ │ ├── app.py
│ │ │ │ └── ui_components.py
│ │ │ ├── benchmarks
│ │ │ │ ├── .gitignore
│ │ │ │ ├── contrib.md
│ │ │ │ ├── interactive.py
│ │ │ │ ├── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ └── gta1.py
│ │ │ │ ├── README.md
│ │ │ │ ├── ss-pro.py
│ │ │ │ ├── ss-v2.py
│ │ │ │ └── utils.py
│ │ │ ├── example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer_agent.py
│ │ ├── bench-ui
│ │ │ ├── bench_ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api.py
│ │ │ │ └── child.py
│ │ │ ├── examples
│ │ │ │ ├── folder_example.py
│ │ │ │ ├── gui
│ │ │ │ │ ├── index.html
│ │ │ │ │ ├── logo.svg
│ │ │ │ │ └── styles.css
│ │ │ │ ├── output_overlay.png
│ │ │ │ └── simple_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ └── test_port_detection.py
│ │ ├── computer
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer
│ │ │ │ ├── __init__.py
│ │ │ │ ├── computer.py
│ │ │ │ ├── diorama_computer.py
│ │ │ │ ├── helpers.py
│ │ │ │ ├── interface
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ ├── models.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── logger.py
│ │ │ │ ├── models.py
│ │ │ │ ├── providers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cloud
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── docker
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── lume
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── lume_api.py
│ │ │ │ │ ├── lumier
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── types.py
│ │ │ │ │ └── winsandbox
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── provider.py
│ │ │ │ │ └── setup_script.ps1
│ │ │ │ ├── tracing_wrapper.py
│ │ │ │ ├── tracing.py
│ │ │ │ ├── ui
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ └── gradio
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── app.py
│ │ │ │ └── utils.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer.py
│ │ ├── computer-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── browser.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── diorama
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── diorama_computer.py
│ │ │ │ │ ├── diorama.py
│ │ │ │ │ ├── draw.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── safezone.py
│ │ │ │ ├── handlers
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── main.py
│ │ │ │ ├── server.py
│ │ │ │ ├── utils
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── wallpaper.py
│ │ │ │ └── watchdog.py
│ │ │ ├── examples
│ │ │ │ ├── __init__.py
│ │ │ │ └── usage_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ ├── run_server.py
│ │ │ ├── test_connection.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_server.py
│ │ ├── core
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── core
│ │ │ │ ├── __init__.py
│ │ │ │ └── telemetry
│ │ │ │ ├── __init__.py
│ │ │ │ └── posthog.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_telemetry.py
│ │ ├── mcp-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── build-extension.py
│ │ │ ├── CONCURRENT_SESSIONS.md
│ │ │ ├── desktop-extension
│ │ │ │ ├── cua-extension.mcpb
│ │ │ │ ├── desktop_extension.png
│ │ │ │ ├── manifest.json
│ │ │ │ ├── README.md
│ │ │ │ ├── requirements.txt
│ │ │ │ ├── run_server.sh
│ │ │ │ └── setup.py
│ │ │ ├── mcp_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── server.py
│ │ │ │ └── session_manager.py
│ │ │ ├── pdm.lock
│ │ │ ├── pyproject.toml
│ │ │ ├── QUICK_TEST_COMMANDS.sh
│ │ │ ├── quick_test_local_option.py
│ │ │ ├── README.md
│ │ │ ├── scripts
│ │ │ │ ├── install_mcp_server.sh
│ │ │ │ └── start_mcp_server.sh
│ │ │ ├── test_mcp_server_local_option.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_mcp_server.py
│ │ ├── pylume
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_pylume.py
│ │ └── som
│ │ ├── .bumpversion.cfg
│ │ ├── LICENSE
│ │ ├── poetry.toml
│ │ ├── pyproject.toml
│ │ ├── README.md
│ │ ├── som
│ │ │ ├── __init__.py
│ │ │ ├── detect.py
│ │ │ ├── detection.py
│ │ │ ├── models.py
│ │ │ ├── ocr.py
│ │ │ ├── util
│ │ │ │ └── utils.py
│ │ │ └── visualization.py
│ │ └── tests
│ │ ├── conftest.py
│ │ └── test_omniparser.py
│ ├── qemu-docker
│ │ ├── linux
│ │ │ ├── Dockerfile
│ │ │ ├── README.md
│ │ │ └── src
│ │ │ ├── entry.sh
│ │ │ └── vm
│ │ │ ├── image
│ │ │ │ └── README.md
│ │ │ └── setup
│ │ │ ├── install.sh
│ │ │ ├── setup-cua-server.sh
│ │ │ └── setup.sh
│ │ ├── README.md
│ │ └── windows
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── entry.sh
│ │ └── vm
│ │ ├── image
│ │ │ └── README.md
│ │ └── setup
│ │ ├── install.bat
│ │ ├── on-logon.ps1
│ │ ├── setup-cua-server.ps1
│ │ ├── setup-utils.psm1
│ │ └── setup.ps1
│ ├── typescript
│ │ ├── .gitignore
│ │ ├── .nvmrc
│ │ ├── agent
│ │ │ ├── examples
│ │ │ │ ├── playground-example.html
│ │ │ │ └── README.md
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── client.ts
│ │ │ │ ├── index.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ └── client.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── computer
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── computer
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── providers
│ │ │ │ │ │ ├── base.ts
│ │ │ │ │ │ ├── cloud.ts
│ │ │ │ │ │ └── index.ts
│ │ │ │ │ └── types.ts
│ │ │ │ ├── index.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── base.ts
│ │ │ │ │ ├── factory.ts
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── linux.ts
│ │ │ │ │ ├── macos.ts
│ │ │ │ │ └── windows.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ ├── computer
│ │ │ │ │ └── cloud.test.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── factory.test.ts
│ │ │ │ │ ├── index.test.ts
│ │ │ │ │ ├── linux.test.ts
│ │ │ │ │ ├── macos.test.ts
│ │ │ │ │ └── windows.test.ts
│ │ │ │ └── setup.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── core
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── index.ts
│ │ │ │ └── telemetry
│ │ │ │ ├── clients
│ │ │ │ │ ├── index.ts
│ │ │ │ │ └── posthog.ts
│ │ │ │ └── index.ts
│ │ │ ├── tests
│ │ │ │ └── telemetry.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── cua-cli
│ │ │ ├── .gitignore
│ │ │ ├── .prettierrc
│ │ │ ├── bun.lock
│ │ │ ├── CLAUDE.md
│ │ │ ├── index.ts
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── auth.ts
│ │ │ │ ├── cli.ts
│ │ │ │ ├── commands
│ │ │ │ │ ├── auth.ts
│ │ │ │ │ └── sandbox.ts
│ │ │ │ ├── config.ts
│ │ │ │ ├── http.ts
│ │ │ │ ├── storage.ts
│ │ │ │ └── util.ts
│ │ │ └── tsconfig.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── pnpm-workspace.yaml
│ │ └── README.md
│ └── xfce
│ ├── .dockerignore
│ ├── .gitignore
│ ├── Development.md
│ ├── Dockerfile
│ ├── Dockerfile.dev
│ ├── README.md
│ └── src
│ ├── scripts
│ │ ├── resize-display.sh
│ │ ├── start-computer-server.sh
│ │ ├── start-novnc.sh
│ │ ├── start-vnc.sh
│ │ └── xstartup.sh
│ ├── supervisor
│ │ └── supervisord.conf
│ └── xfce-config
│ ├── helpers.rc
│ ├── xfce4-power-manager.xml
│ └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│ ├── agent_nb.ipynb
│ ├── blog
│ │ ├── build-your-own-operator-on-macos-1.ipynb
│ │ └── build-your-own-operator-on-macos-2.ipynb
│ ├── composite_agents_docker_nb.ipynb
│ ├── computer_nb.ipynb
│ ├── computer_server_nb.ipynb
│ ├── customizing_computeragent.ipynb
│ ├── eval_osworld.ipynb
│ ├── ollama_nb.ipynb
│ ├── README.md
│ ├── sota_hackathon_cloud.ipynb
│ └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── install-cli.ps1
│ ├── install-cli.sh
│ ├── playground-docker.sh
│ ├── playground.sh
│ ├── run-docker-dev.sh
│ └── typescript-typecheck.js
├── TESTING.md
├── tests
│ ├── agent_loop_testing
│ │ ├── agent_test.py
│ │ └── README.md
│ ├── pytest.ini
│ ├── shell_cmd.py
│ ├── test_files.py
│ ├── test_mcp_server_session_management.py
│ ├── test_mcp_server_streaming.py
│ ├── test_shell_bash.py
│ ├── test_telemetry.py
│ ├── test_tracing.py
│ ├── test_venv.py
│ └── test_watchdog.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/libs/lume/src/Virtualization/VMVirtualizationService.swift:
--------------------------------------------------------------------------------
```swift
import Foundation
import Virtualization
/// Framework-agnostic VM configuration.
///
/// A plain value bundle consumed by the `createConfiguration(_:)` factories of the
/// platform-specific virtualization services, which translate it into a
/// `VZVirtualMachineConfiguration`.
struct VMVirtualizationServiceContext {
/// Number of virtual CPUs; copied to `VZVirtualMachineConfiguration.cpuCount`.
let cpuCount: Int
/// Guest memory size; copied to `VZVirtualMachineConfiguration.memorySize`.
let memorySize: UInt64
/// Display resolution string, parsed with `VMDisplayResolution(string:)` to obtain width and height.
let display: String
/// Host directories to expose to the guest as virtio file system devices; `nil` means none.
let sharedDirectories: [SharedDirectory]?
/// Optional extra disk image, attached as an additional read-only storage device.
let mount: Path?
/// Serialized Mac hardware model; the macOS configuration throws when this is `nil` or cannot be decoded.
let hardwareModel: Data?
/// Serialized Mac machine identifier; the macOS configuration throws when this is `nil` or cannot be decoded.
let machineIdentifier: Data?
/// MAC address string for the network device, parsed with `VZMACAddress(string:)`.
let macAddress: String
/// Path of the primary disk image (the first storage device).
let diskPath: Path
/// Path of the NVRAM file: Mac auxiliary storage for macOS guests, EFI variable store for Linux guests.
let nvramPath: Path
/// Whether the VM should be started in recovery mode (forwarded to the service initializer).
let recoveryMode: Bool
/// Optional disk images attached read-only as USB mass storage devices (only when running on macOS 15+).
let usbMassStoragePaths: [Path]?
}
/// Protocol defining the interface for virtualization operations.
///
/// All requirements are isolated to the main actor. The lifecycle methods are
/// `async throws`: they suspend until the operation finishes and rethrow its error.
@MainActor
protocol VMVirtualizationService {
/// Current state of the underlying virtual machine.
var state: VZVirtualMachine.State { get }
/// Starts the virtual machine.
func start() async throws
/// Stops the virtual machine.
func stop() async throws
/// Pauses the virtual machine.
func pause() async throws
/// Resumes the virtual machine.
func resume() async throws
/// Returns the underlying virtual machine object, type-erased to `Any`.
func getVirtualMachine() -> Any
}
/// Base implementation of `VMVirtualizationService` backed by a `VZVirtualMachine`.
///
/// Bridges the completion-handler based `VZVirtualMachine` lifecycle API to
/// `async throws` methods, and offers static factory helpers for the device
/// configurations shared by the platform-specific subclasses.
@MainActor
class BaseVirtualizationService: VMVirtualizationService {
    /// The wrapped virtual machine.
    let virtualMachine: VZVirtualMachine
    /// Whether `start()` should boot into macOS recovery. Only honoured on macOS 13+.
    let recoveryMode: Bool

    /// Current state, read directly from the wrapped virtual machine.
    var state: VZVirtualMachine.State {
        virtualMachine.state
    }

    /// - Parameters:
    ///   - virtualMachine: The virtual machine to control.
    ///   - recoveryMode: Start in recovery mode when `true` (defaults to `false`).
    init(virtualMachine: VZVirtualMachine, recoveryMode: Bool = false) {
        self.virtualMachine = virtualMachine
        self.recoveryMode = recoveryMode
    }

    /// Starts the virtual machine and suspends until the start request completes.
    ///
    /// On macOS 13+ the VM is started with `VZMacOSVirtualMachineStartOptions`, with
    /// `startUpFromMacOSRecovery` set from `recoveryMode`. On older systems the plain
    /// `start` call is used and `recoveryMode` is ignored.
    ///
    /// - Throws: The error reported by the virtual machine's completion handler.
    func start() async throws {
        try await withCheckedThrowingContinuation {
            (continuation: CheckedContinuation<Void, Error>) in
            Task { @MainActor in
                if #available(macOS 13, *) {
                    // NOTE(review): macOS-specific start options are used for every guest
                    // type, including subclasses that are not macOS guests — confirm intended.
                    let startOptions = VZMacOSVirtualMachineStartOptions()
                    startOptions.startUpFromMacOSRecovery = recoveryMode
                    if recoveryMode {
                        Logger.info("Starting VM in recovery mode")
                    }
                    virtualMachine.start(options: startOptions) { error in
                        if let error = error {
                            continuation.resume(throwing: error)
                        } else {
                            continuation.resume()
                        }
                    }
                } else {
                    Logger.info("Starting VM in normal mode")
                    virtualMachine.start { result in
                        switch result {
                        case .success:
                            continuation.resume()
                        case .failure(let error):
                            continuation.resume(throwing: error)
                        }
                    }
                }
            }
        }
    }

    /// Stops the virtual machine and suspends until the stop request completes.
    ///
    /// - Throws: The error reported by the virtual machine's completion handler.
    func stop() async throws {
        try await withCheckedThrowingContinuation {
            (continuation: CheckedContinuation<Void, Error>) in
            virtualMachine.stop { error in
                if let error = error {
                    continuation.resume(throwing: error)
                } else {
                    continuation.resume()
                }
            }
        }
    }

    /// Pauses the virtual machine and suspends until the pause request completes.
    ///
    /// - Throws: The error reported by the virtual machine's completion handler.
    func pause() async throws {
        try await withCheckedThrowingContinuation {
            (continuation: CheckedContinuation<Void, Error>) in
            // Must call `pause`, not `start`: issuing `start` here would attempt to
            // (re)start the machine instead of pausing it.
            virtualMachine.pause { result in
                switch result {
                case .success:
                    continuation.resume()
                case .failure(let error):
                    continuation.resume(throwing: error)
                }
            }
        }
    }

    /// Resumes a paused virtual machine and suspends until the resume request completes.
    ///
    /// - Throws: The error reported by the virtual machine's completion handler.
    func resume() async throws {
        try await withCheckedThrowingContinuation {
            (continuation: CheckedContinuation<Void, Error>) in
            // Must call `resume`, not `start`: issuing `start` here would attempt to
            // (re)start the machine instead of resuming it.
            virtualMachine.resume { result in
                switch result {
                case .success:
                    continuation.resume()
                case .failure(let error):
                    continuation.resume(throwing: error)
                }
            }
        }
    }

    /// Returns the wrapped `VZVirtualMachine`, type-erased to `Any` as the protocol requires.
    func getVirtualMachine() -> Any {
        return virtualMachine
    }

    // Helper methods for creating common configurations

    /// Builds a virtio block device backed by the disk image at `diskPath`.
    ///
    /// - Parameters:
    ///   - diskPath: Location of the disk image.
    ///   - readOnly: Attach the image read-only when `true` (defaults to `false`).
    /// - Throws: Any error raised while creating the disk image attachment.
    static func createStorageDeviceConfiguration(diskPath: Path, readOnly: Bool = false) throws
        -> VZStorageDeviceConfiguration
    {
        return VZVirtioBlockDeviceConfiguration(
            attachment: try VZDiskImageStorageDeviceAttachment(
                url: diskPath.url,
                readOnly: readOnly,
                cachingMode: VZDiskImageCachingMode.automatic,
                synchronizationMode: VZDiskImageSynchronizationMode.fsync
            )
        )
    }

    /// Builds a USB mass storage device backed by the disk image at `diskPath`.
    ///
    /// USB mass storage is only used on macOS 15+; on earlier systems this falls
    /// back to the virtio block device from `createStorageDeviceConfiguration`.
    ///
    /// - Parameters:
    ///   - diskPath: Location of the disk image.
    ///   - readOnly: Attach the image read-only when `true` (defaults to `false`).
    /// - Throws: Any error raised while creating the disk image attachment.
    static func createUSBMassStorageDeviceConfiguration(diskPath: Path, readOnly: Bool = false)
        throws
        -> VZStorageDeviceConfiguration
    {
        if #available(macOS 15.0, *) {
            return VZUSBMassStorageDeviceConfiguration(
                attachment: try VZDiskImageStorageDeviceAttachment(
                    url: diskPath.url,
                    readOnly: readOnly,
                    cachingMode: VZDiskImageCachingMode.automatic,
                    synchronizationMode: VZDiskImageSynchronizationMode.fsync
                )
            )
        } else {
            // Fallback to normal storage device if USB mass storage not available
            return try createStorageDeviceConfiguration(diskPath: diskPath, readOnly: readOnly)
        }
    }

    /// Builds a virtio network device with a NAT attachment and the given MAC address.
    ///
    /// - Parameter macAddress: MAC address string, parsed with `VZMACAddress(string:)`.
    /// - Throws: `VMConfigError.invalidMachineIdentifier` when the string cannot be parsed.
    static func createNetworkDeviceConfiguration(macAddress: String) throws
        -> VZNetworkDeviceConfiguration
    {
        let network = VZVirtioNetworkDeviceConfiguration()
        guard let vzMacAddress = VZMACAddress(string: macAddress) else {
            // NOTE(review): an invalid MAC address is reported as an invalid machine
            // identifier; kept as-is because callers may match on this case — consider
            // a dedicated error case.
            throw VMConfigError.invalidMachineIdentifier
        }
        network.attachment = VZNATNetworkDeviceAttachment()
        network.macAddress = vzMacAddress
        return network
    }

    /// Builds one virtio file system device per shared directory.
    ///
    /// - Parameter sharedDirectories: Directories to share; `nil` yields an empty array.
    /// - Returns: A device per entry, each using the entry's `tag`, `hostPath` and `readOnly` flag.
    static func createDirectorySharingDevices(sharedDirectories: [SharedDirectory]?)
        -> [VZDirectorySharingDeviceConfiguration]
    {
        return sharedDirectories?.map { sharedDir in
            let device = VZVirtioFileSystemDeviceConfiguration(tag: sharedDir.tag)
            let url = URL(fileURLWithPath: sharedDir.hostPath)
            device.share = VZSingleDirectoryShare(
                directory: VZSharedDirectory(url: url, readOnly: sharedDir.readOnly))
            return device
        } ?? []
    }
}
/// macOS-specific virtualization service.
///
/// Builds a `VZVirtualMachineConfiguration` for a macOS guest from a
/// `VMVirtualizationServiceContext` and adds macOS-only operations
/// (auxiliary storage creation and OS installation from a restore image).
@MainActor
final class DarwinVirtualizationService: BaseVirtualizationService {
    /// Translates `config` into a validated macOS guest configuration.
    ///
    /// - Parameter config: Framework-agnostic VM settings.
    /// - Returns: A configuration that has passed `validate()`.
    /// - Throws: `VMConfigError.emptyMachineIdentifier` / `.emptyHardwareModel` when the
    ///   corresponding data is `nil`; `.invalidHardwareModel` / `.invalidMachineIdentifier`
    ///   when it cannot be decoded (or when the MAC address cannot be parsed); any error
    ///   from creating disk attachments or from configuration validation.
    static func createConfiguration(_ config: VMVirtualizationServiceContext) throws
        -> VZVirtualMachineConfiguration
    {
        let vzConfig = VZVirtualMachineConfiguration()
        vzConfig.cpuCount = config.cpuCount
        vzConfig.memorySize = config.memorySize

        // Platform configuration
        guard let machineIdentifier = config.machineIdentifier else {
            throw VMConfigError.emptyMachineIdentifier
        }
        guard let hardwareModel = config.hardwareModel else {
            throw VMConfigError.emptyHardwareModel
        }
        let platform = VZMacPlatformConfiguration()
        platform.auxiliaryStorage = VZMacAuxiliaryStorage(url: config.nvramPath.url)
        Logger.info("Pre-VZMacHardwareModel: hardwareModel=\(hardwareModel)")
        guard let vzHardwareModel = VZMacHardwareModel(dataRepresentation: hardwareModel) else {
            throw VMConfigError.invalidHardwareModel
        }
        platform.hardwareModel = vzHardwareModel
        guard
            let vzMachineIdentifier = VZMacMachineIdentifier(dataRepresentation: machineIdentifier)
        else {
            throw VMConfigError.invalidMachineIdentifier
        }
        platform.machineIdentifier = vzMachineIdentifier
        vzConfig.platform = platform
        vzConfig.bootLoader = VZMacOSBootLoader()

        // Graphics configuration
        // NOTE(review): force unwrap traps if `config.display` cannot be parsed;
        // callers are assumed to have validated the string — confirm.
        let display = VMDisplayResolution(string: config.display)!
        let graphics = VZMacGraphicsDeviceConfiguration()
        graphics.displays = [
            VZMacGraphicsDisplayConfiguration(
                widthInPixels: display.width,
                heightInPixels: display.height,
                pixelsPerInch: 220  // Retina display density
            )
        ]
        vzConfig.graphicsDevices = [graphics]

        // Common configurations
        vzConfig.keyboards = [VZUSBKeyboardConfiguration()]
        vzConfig.pointingDevices = [VZUSBScreenCoordinatePointingDeviceConfiguration()]

        // Primary disk first, then the optional read-only mount.
        var storageDevices = [try createStorageDeviceConfiguration(diskPath: config.diskPath)]
        if let mount = config.mount {
            storageDevices.append(
                try createStorageDeviceConfiguration(diskPath: mount, readOnly: true))
        }
        // Add USB mass storage devices if specified
        if #available(macOS 15.0, *), let usbPaths = config.usbMassStoragePaths, !usbPaths.isEmpty {
            for usbPath in usbPaths {
                storageDevices.append(
                    try createUSBMassStorageDeviceConfiguration(diskPath: usbPath, readOnly: true))
            }
        }
        vzConfig.storageDevices = storageDevices

        vzConfig.networkDevices = [
            try createNetworkDeviceConfiguration(macAddress: config.macAddress)
        ]
        vzConfig.memoryBalloonDevices = [VZVirtioTraditionalMemoryBalloonDeviceConfiguration()]
        vzConfig.entropyDevices = [VZVirtioEntropyDeviceConfiguration()]

        // Audio configuration
        let soundDeviceConfiguration = VZVirtioSoundDeviceConfiguration()
        let inputAudioStreamConfiguration = VZVirtioSoundDeviceInputStreamConfiguration()
        let outputAudioStreamConfiguration = VZVirtioSoundDeviceOutputStreamConfiguration()
        inputAudioStreamConfiguration.source = VZHostAudioInputStreamSource()
        outputAudioStreamConfiguration.sink = VZHostAudioOutputStreamSink()
        soundDeviceConfiguration.streams = [
            inputAudioStreamConfiguration, outputAudioStreamConfiguration,
        ]
        vzConfig.audioDevices = [soundDeviceConfiguration]

        // Clipboard sharing via Spice agent
        let spiceAgentConsoleDevice = VZVirtioConsoleDeviceConfiguration()
        let spiceAgentPort = VZVirtioConsolePortConfiguration()
        spiceAgentPort.name = VZSpiceAgentPortAttachment.spiceAgentPortName
        let spiceAgentPortAttachment = VZSpiceAgentPortAttachment()
        spiceAgentPortAttachment.sharesClipboard = true
        spiceAgentPort.attachment = spiceAgentPortAttachment
        spiceAgentConsoleDevice.ports[0] = spiceAgentPort
        vzConfig.consoleDevices.append(spiceAgentConsoleDevice)

        // Directory sharing
        let directorySharingDevices = createDirectorySharingDevices(
            sharedDirectories: config.sharedDirectories)
        if !directorySharingDevices.isEmpty {
            vzConfig.directorySharingDevices = directorySharingDevices
        }

        // USB Controller configuration
        if #available(macOS 15.0, *) {
            let usbControllerConfiguration = VZXHCIControllerConfiguration()
            vzConfig.usbControllers = [usbControllerConfiguration]
        }

        try vzConfig.validate()
        return vzConfig
    }

    /// Returns a random locally administered MAC address as a string.
    static func generateMacAddress() -> String {
        VZMACAddress.randomLocallyAdministered().string
    }

    /// Returns the data representation of a newly created Mac machine identifier.
    static func generateMachineIdentifier() -> Data {
        VZMacMachineIdentifier().dataRepresentation
    }

    /// Creates Mac auxiliary storage at `path` for the given hardware model.
    ///
    /// - Parameters:
    ///   - path: Where the auxiliary storage file is created.
    ///   - hardwareModel: Serialized `VZMacHardwareModel`.
    /// - Throws: `VMConfigError.invalidHardwareModel` when `hardwareModel` cannot be decoded,
    ///   or any error raised while creating the storage.
    func createAuxiliaryStorage(at path: Path, hardwareModel: Data) throws {
        guard let vzHardwareModel = VZMacHardwareModel(dataRepresentation: hardwareModel) else {
            throw VMConfigError.invalidHardwareModel
        }
        _ = try VZMacAuxiliaryStorage(creatingStorageAt: path.url, hardwareModel: vzHardwareModel)
    }

    /// Builds the macOS configuration from `configuration` and wraps a new virtual machine.
    ///
    /// - Throws: Any error from `createConfiguration(_:)`.
    init(configuration: VMVirtualizationServiceContext) throws {
        let vzConfig = try Self.createConfiguration(configuration)
        super.init(
            virtualMachine: VZVirtualMachine(configuration: vzConfig),
            recoveryMode: configuration.recoveryMode)
    }

    /// Installs macOS into the virtual machine from the restore image at `imagePath`.
    ///
    /// - Parameters:
    ///   - imagePath: Location of the restore image.
    ///   - progressHandler: Optional callback receiving the installer's `fractionCompleted`
    ///     value, once initially and then on every change.
    /// - Throws: The error reported by the installer.
    func installMacOS(imagePath: Path, progressHandler: (@Sendable (Double) -> Void)?) async throws
    {
        // Observer references must stay alive for the duration of the installation,
        // otherwise progress reporting stops.
        var observers: [NSKeyValueObservation] = []
        try await withCheckedThrowingContinuation {
            (continuation: CheckedContinuation<Void, Error>) in
            Task {
                let installer = VZMacOSInstaller(
                    virtualMachine: virtualMachine, restoringFromImageAt: imagePath.url)

                Logger.info("Starting macOS installation")

                if let progressHandler = progressHandler {
                    let observer = installer.progress.observe(
                        \.fractionCompleted, options: [.initial, .new]
                    ) { (progress, change) in
                        if let newValue = change.newValue {
                            progressHandler(newValue)
                        }
                    }
                    observers.append(observer)
                }

                installer.install { result in
                    switch result {
                    case .success:
                        continuation.resume()
                    case .failure(let error):
                        Logger.error("Failed to install, error=\(error)")
                        continuation.resume(throwing: error)
                    }
                }
            }
        }
        // Only reached when the installation succeeded; a failure is thrown above.
        Logger.info("macOS installation finished")
    }
}
/// Linux-specific virtualization service
///
/// Builds a generic-platform, EFI-booted `VZVirtualMachineConfiguration` from a
/// `VMVirtualizationServiceContext` and wraps the resulting virtual machine.
@MainActor
final class LinuxVirtualizationService: BaseVirtualizationService {
    /// Builds and validates the Virtualization configuration for a Linux guest.
    ///
    /// - Parameter config: Context supplying CPU/memory sizing, disk, NVRAM, display,
    ///   network, optional mount / USB images and shared directories.
    /// - Returns: A configuration that has passed `validate()`.
    /// - Throws: An error when the display string cannot be parsed, when a device
    ///   configuration helper fails, or when validation fails.
    static func createConfiguration(_ config: VMVirtualizationServiceContext) throws
        -> VZVirtualMachineConfiguration
    {
        let vzConfig = VZVirtualMachineConfiguration()
        vzConfig.cpuCount = config.cpuCount
        vzConfig.memorySize = config.memorySize

        // Platform configuration
        let platform = VZGenericPlatformConfiguration()
        if #available(macOS 15, *) {
            platform.isNestedVirtualizationEnabled =
                VZGenericPlatformConfiguration.isNestedVirtualizationSupported
        }
        vzConfig.platform = platform

        let bootLoader = VZEFIBootLoader()
        bootLoader.variableStore = VZEFIVariableStore(url: config.nvramPath.url)
        vzConfig.bootLoader = bootLoader

        // Graphics configuration
        // A malformed display string is reported as an error instead of crashing
        // the process through a force unwrap.
        guard let display = VMDisplayResolution(string: config.display) else {
            throw NSError(
                domain: "LinuxVirtualizationService",
                code: 1,
                userInfo: [
                    NSLocalizedDescriptionKey:
                        "Invalid display resolution: \(config.display)"
                ])
        }
        let graphics = VZVirtioGraphicsDeviceConfiguration()
        graphics.scanouts = [
            VZVirtioGraphicsScanoutConfiguration(
                widthInPixels: display.width,
                heightInPixels: display.height
            )
        ]
        vzConfig.graphicsDevices = [graphics]

        // Common configurations
        vzConfig.keyboards = [VZUSBKeyboardConfiguration()]
        vzConfig.pointingDevices = [VZUSBScreenCoordinatePointingDeviceConfiguration()]

        // The main disk always comes first; the optional mount is attached read-only.
        var storageDevices = [try createStorageDeviceConfiguration(diskPath: config.diskPath)]
        if let mount = config.mount {
            storageDevices.append(
                try createStorageDeviceConfiguration(diskPath: mount, readOnly: true))
        }
        // Add USB mass storage devices if specified
        if #available(macOS 15.0, *), let usbPaths = config.usbMassStoragePaths, !usbPaths.isEmpty {
            for usbPath in usbPaths {
                storageDevices.append(
                    try createUSBMassStorageDeviceConfiguration(diskPath: usbPath, readOnly: true))
            }
        }
        vzConfig.storageDevices = storageDevices

        vzConfig.networkDevices = [
            try createNetworkDeviceConfiguration(macAddress: config.macAddress)
        ]
        vzConfig.memoryBalloonDevices = [VZVirtioTraditionalMemoryBalloonDeviceConfiguration()]
        vzConfig.entropyDevices = [VZVirtioEntropyDeviceConfiguration()]

        // Audio configuration
        let soundDeviceConfiguration = VZVirtioSoundDeviceConfiguration()
        let inputAudioStreamConfiguration = VZVirtioSoundDeviceInputStreamConfiguration()
        let outputAudioStreamConfiguration = VZVirtioSoundDeviceOutputStreamConfiguration()
        inputAudioStreamConfiguration.source = VZHostAudioInputStreamSource()
        outputAudioStreamConfiguration.sink = VZHostAudioOutputStreamSink()
        soundDeviceConfiguration.streams = [inputAudioStreamConfiguration, outputAudioStreamConfiguration]
        vzConfig.audioDevices = [soundDeviceConfiguration]

        // Clipboard sharing via Spice agent
        let spiceAgentConsoleDevice = VZVirtioConsoleDeviceConfiguration()
        let spiceAgentPort = VZVirtioConsolePortConfiguration()
        spiceAgentPort.name = VZSpiceAgentPortAttachment.spiceAgentPortName
        let spiceAgentPortAttachment = VZSpiceAgentPortAttachment()
        spiceAgentPortAttachment.sharesClipboard = true
        spiceAgentPort.attachment = spiceAgentPortAttachment
        spiceAgentConsoleDevice.ports[0] = spiceAgentPort
        vzConfig.consoleDevices.append(spiceAgentConsoleDevice)

        // Directory sharing
        var directorySharingDevices = createDirectorySharingDevices(
            sharedDirectories: config.sharedDirectories)

        // Add Rosetta support if available
        // Rosetta is best-effort: a failure is logged and the VM is still configured.
        if #available(macOS 13.0, *) {
            if VZLinuxRosettaDirectoryShare.availability == .installed {
                do {
                    let rosettaShare = try VZLinuxRosettaDirectoryShare()
                    let rosettaDevice = VZVirtioFileSystemDeviceConfiguration(tag: "rosetta")
                    rosettaDevice.share = rosettaShare
                    directorySharingDevices.append(rosettaDevice)
                    Logger.info("Added Rosetta support to Linux VM")
                } catch {
                    Logger.info("Failed to add Rosetta support: \(error.localizedDescription)")
                }
            } else {
                Logger.info("Rosetta not installed, skipping Rosetta support")
            }
        }

        if !directorySharingDevices.isEmpty {
            vzConfig.directorySharingDevices = directorySharingDevices
        }

        // USB Controller configuration
        if #available(macOS 15.0, *) {
            let usbControllerConfiguration = VZXHCIControllerConfiguration()
            vzConfig.usbControllers = [usbControllerConfiguration]
        }

        try vzConfig.validate()
        return vzConfig
    }

    /// Returns a random locally administered MAC address in string form.
    func generateMacAddress() -> String {
        VZMACAddress.randomLocallyAdministered().string
    }

    /// Creates a new EFI variable store file at `path`.
    /// - Throws: The error raised by `VZEFIVariableStore` when creation fails.
    func createNVRAM(at path: Path) throws {
        _ = try VZEFIVariableStore(creatingVariableStoreAt: path.url)
    }

    /// Creates the service by building the Linux configuration for `configuration`
    /// and wrapping it in a new `VZVirtualMachine`.
    /// - Throws: Any error raised by `createConfiguration(_:)`.
    init(configuration: VMVirtualizationServiceContext) throws {
        let vzConfig = try Self.createConfiguration(configuration)
        super.init(virtualMachine: VZVirtualMachine(configuration: vzConfig))
    }
}
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/omniparser.py:
--------------------------------------------------------------------------------
```python
"""
OmniParser (Set-of-Mark) agent loop implementation using liteLLM
Paper: https://arxiv.org/abs/2408.00203
Code: https://github.com/microsoft/OmniParser
"""
import asyncio
import base64
import inspect
import json
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union
import litellm
from ..decorators import register_agent
from ..loops.base import AsyncAgentConfig
from ..responses import (
convert_completion_messages_to_responses_items,
convert_responses_items_to_completion_messages,
)
from ..types import AgentCapability, AgentResponse, Messages, Tools
# Function-tool schema exposed to the LLM in place of the pixel-based "computer" tool.
# The model addresses UI elements by the numeric IDs drawn on the annotated screenshot.
SOM_TOOL_SCHEMA = {
    "type": "function",
    "function": {
        "name": "computer",
        "description": "Control a computer by taking screenshots and interacting with UI elements. This tool shows screenshots with numbered elements overlaid on them. Each UI element has been assigned a unique ID number that you can see in the image. Use the element's ID number to interact with any element instead of pixel coordinates.",
        "parameters": {
            "type": "object",
            "properties": {
                "action": {
                    "type": "string",
                    "enum": [
                        "screenshot",
                        "click",
                        "double_click",
                        "drag",
                        "type",
                        "keypress",
                        "scroll",
                        "move",
                        "wait",
                        "get_current_url",
                        "get_dimensions",
                        "get_environment",
                    ],
                    "description": "The action to perform",
                },
                "element_id": {
                    "type": "integer",
                    "description": "The ID of the element to interact with (required for click, double_click, move, scroll actions, and as start/end for drag)",
                },
                "start_element_id": {
                    "type": "integer",
                    "description": "The ID of the element to start dragging from (required for drag action)",
                },
                "end_element_id": {
                    "type": "integer",
                    "description": "The ID of the element to drag to (required for drag action)",
                },
                "text": {
                    "type": "string",
                    "description": "The text to type (required for type action)",
                },
                "keys": {
                    "type": "string",
                    "description": "Key combination to press (required for keypress action). Single key for individual key press, multiple keys for combinations (e.g., 'ctrl+c')",
                },
                "button": {
                    "type": "string",
                    "description": "The mouse button to use for click action (left, right, wheel, back, forward) Default: left",
                },
                "scroll_x": {
                    "type": "integer",
                    "description": "Horizontal scroll amount for scroll action (positive for right, negative for left)",
                },
                "scroll_y": {
                    "type": "integer",
                    "description": "Vertical scroll amount for scroll action (positive for down, negative for up)",
                },
            },
            # Only "action" is unconditionally required. Per the property descriptions,
            # element_id applies to a subset of actions (click, double_click, move,
            # scroll); forcing it on every call makes the model invent an ID for
            # actions such as screenshot, type, keypress, wait or drag.
            "required": ["action"],
        },
    },
}
# Optional dependency: OmniParser comes from the `som` package. The flag records
# whether the import succeeded so callers can check it before using the parser.
OMNIPARSER_AVAILABLE = False
try:
    from som import OmniParser

    OMNIPARSER_AVAILABLE = True
except ImportError:
    # Leave the flag False; the import failure is tolerated at module load time.
    pass
# Shared parser instance, created on first use by get_parser().
OMNIPARSER_SINGLETON = None
def get_parser():
    """Return the module-wide OmniParser instance, constructing it on first call."""
    global OMNIPARSER_SINGLETON
    parser = OMNIPARSER_SINGLETON
    if parser is None:
        # First use: build the parser once and cache it for later calls.
        parser = OMNIPARSER_SINGLETON = OmniParser()
    return parser
def get_last_computer_call_output(messages: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Find the most recent ``computer_call_output`` entry in a message list.

    Args:
        messages: Messages to scan; non-dict entries are ignored.

    Returns:
        The last dict whose ``"type"`` is ``"computer_call_output"`` (the same
        object that is in ``messages``), or None when there is none.
    """
    # Walk from the end so the first hit is the latest output.
    return next(
        (
            entry
            for entry in reversed(messages)
            if isinstance(entry, dict) and entry.get("type") == "computer_call_output"
        ),
        None,
    )
def _prepare_tools_for_omniparser(tool_schemas: List[Dict[str, Any]]) -> Tuple[Tools, dict]:
    """Translate agent tool schemas into the tool list sent to the LLM.

    A ``computer`` schema is replaced by ``SOM_TOOL_SCHEMA``. Its ``id2xy`` mapping
    is reused when present; otherwise the mapping created here is stored on the
    schema dict (the input is mutated). ``function`` schemas are flattened to
    ``{"type": "function", **schema["function"]}``. Other schema types are dropped.

    Returns:
        A tuple ``(tools, id2xy)``.
    """
    prepared = []
    id2xy = dict()
    for tool in tool_schemas:
        kind = tool["type"]
        if kind == "computer":
            prepared.append(SOM_TOOL_SCHEMA)
            # Reuse the schema's mapping if it has one, else attach ours to it.
            id2xy = tool.setdefault("id2xy", id2xy)
        elif kind == "function":
            # Function tools use OpenAI-compatible schema directly (liteLLM expects this format)
            # Schema should be: {type, name, description, parameters}
            prepared.append({"type": "function", **tool["function"]})
    return prepared, id2xy
async def replace_function_with_computer_call(
    item: Dict[str, Any], id2xy: Dict[int, Tuple[float, float]]
):
    """Convert a ``computer`` function_call item into a computer_call item.

    Element IDs in the call arguments are looked up in ``id2xy`` and emitted as
    coordinates; IDs that are missing or unknown produce no coordinate keys.

    Args:
        item: The item to convert.
        id2xy: Mapping from element ID to (x, y) coordinates.

    Returns:
        A one-element list: the converted computer_call, or ``item`` unchanged when
        it is not a function_call named ``computer``.
    """
    if item.get("type") != "function_call":
        return [item]

    # Arguments are parsed before the name check, so malformed JSON raises for any
    # function_call item.
    arguments = json.loads(item.get("arguments", "{}"))
    if item.get("name") != "computer":
        return [item]

    def _coords(element_id: Optional[int]):
        # Unknown or absent IDs resolve to a (None, None) pair.
        if element_id is None:
            return (None, None)
        return id2xy.get(element_id, (None, None))

    x, y = _coords(arguments.get("element_id"))
    start_x, start_y = _coords(arguments.get("start_element_id"))
    end_x, end_y = _coords(arguments.get("end_element_id"))

    fields = (
        ("type", arguments.get("action")),
        ("x", x),
        ("y", y),
        ("start_x", start_x),
        ("start_y", start_y),
        ("end_x", end_x),
        ("end_y", end_y),
        ("text", arguments.get("text")),
        ("keys", arguments.get("keys")),
        ("button", arguments.get("button")),
        ("scroll_x", arguments.get("scroll_x")),
        ("scroll_y", arguments.get("scroll_y")),
    )
    # Drop unset fields so the action only carries what was provided.
    action = {key: value for key, value in fields if value is not None}

    computer_call = {
        "type": "computer_call",
        "action": action,
        "id": item.get("id"),
        "call_id": item.get("call_id"),
        "status": "completed",
    }
    return [computer_call]
async def replace_computer_call_with_function(
    item: Dict[str, Any], xy2id: Dict[Tuple[float, float], int]
):
    """
    Convert computer_call back to function_call format.
    Also handles computer_call_output -> function_call_output conversion.

    Args:
        item: The item to convert
        xy2id: Mapping from (x, y) coordinates to element IDs

    Returns:
        A one-element list holding the converted item, or ``item`` itself when its
        type is neither ``computer_call`` nor ``computer_call_output``.
    """
    item_type = item.get("type")

    def _get_element_id(x: Optional[float], y: Optional[float]) -> Optional[int]:
        """Get element ID from coordinates, return None if coordinates are None"""
        if x is None or y is None:
            return None
        # Exact-match lookup: coordinates absent from the mapping yield None.
        return xy2id.get((x, y))

    if item_type == "computer_call":
        action_data = item.get("action", {})

        # Extract coordinates and convert back to element IDs
        element_id = _get_element_id(action_data.get("x"), action_data.get("y"))
        start_element_id = _get_element_id(action_data.get("start_x"), action_data.get("start_y"))
        end_element_id = _get_element_id(action_data.get("end_x"), action_data.get("end_y"))

        # Build function arguments
        fn_args = {
            "action": action_data.get("type"),
            "element_id": element_id,
            "start_element_id": start_element_id,
            "end_element_id": end_element_id,
            "text": action_data.get("text"),
            "keys": action_data.get("keys"),
            "button": action_data.get("button"),
            "scroll_x": action_data.get("scroll_x"),
            "scroll_y": action_data.get("scroll_y"),
        }

        # Remove None values to keep the JSON clean
        fn_args = {k: v for k, v in fn_args.items() if v is not None}

        return [
            {
                "type": "function_call",
                "name": "computer",
                "arguments": json.dumps(fn_args),
                "id": item.get("id"),
                "call_id": item.get("call_id"),
                "status": "completed",
            }
        ]

    elif item_type == "computer_call_output":
        # The output is forwarded exactly as stored on the item.
        # NOTE(review): an earlier revision wrapped a dict output in a list but never
        # used the wrapped value; that dead code was removed without changing what is
        # emitted. Confirm whether the downstream converter expects a list here.
        return [
            {
                "type": "function_call_output",
                "call_id": item.get("call_id"),
                "output": item.get("output"),
                "id": item.get("id"),
                "status": "completed",
            }
        ]

    return [item]
@register_agent(models=r"omniparser\+.*|omni\+.*", priority=2)
class OmniparserConfig(AsyncAgentConfig):
    """Omniparser agent configuration implementing AsyncAgentConfig protocol.

    Screenshots are annotated by OmniParser with numbered element overlays. The LLM
    named after the last ``+`` in the model string refers to UI elements by ID, and
    IDs are translated to coordinates through an ``id2xy`` mapping.
    """

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Run one OmniParser agent step through a liteLLM chat completion.

        The latest screenshot in ``messages`` is parsed by OmniParser and replaced
        in place by the annotated image; element centers are recorded in ``id2xy``.
        After the completion, element IDs in returned computer_call actions are
        converted to coordinates.

        Args:
            messages: Responses-style items; the last computer_call_output is
                mutated to carry the annotated screenshot.
            model: Composed model string; the part after the last ``+`` is the LLM.
            tools: Tool schemas, passed to ``_prepare_tools_for_omniparser``.
            max_retries: Forwarded to liteLLM as ``num_retries``.
            stream: Forwarded to liteLLM.
            computer_handler: Queried for screen dimensions; 1024x768 is used when
                the query fails.
            use_prompt_caching: Accepted for interface compatibility; not used here.
            _on_api_start, _on_api_end, _on_usage, _on_screenshot: Optional async hooks.
            **kwargs: Extra arguments forwarded to ``litellm.acompletion``.

        Returns:
            ``{"output": [...items...], "usage": {...}}``.

        Raises:
            ValueError: If the ``som`` package is not installed.
        """
        if not OMNIPARSER_AVAILABLE:
            raise ValueError(
                "omniparser loop requires som to be installed. Install it with `pip install cua-som`."
            )

        tools = tools or []

        llm_model = model.split("+")[-1]

        # Get screen dimensions from computer handler
        try:
            width, height = await computer_handler.get_dimensions()
        except Exception:
            # Fallback to default dimensions if method fails
            width, height = 1024, 768

        # Prepare tools for OpenAI API
        openai_tools, id2xy = _prepare_tools_for_omniparser(tools)

        # Find last computer_call_output
        last_computer_call_output = get_last_computer_call_output(messages)  # type: ignore
        if last_computer_call_output:
            image_url = last_computer_call_output.get("output", {}).get("image_url", "")
            # Strip the "data:...;base64," prefix, keeping only the payload.
            image_data = image_url.split(",")[-1]
            if image_data:
                parser = get_parser()
                result = parser.parse(image_data)
                if _on_screenshot:
                    await _on_screenshot(result.annotated_image_base64, "annotated_image")

                # Convert OmniParser normalized coordinates (0-1) to absolute pixels
                for element in result.elements:
                    norm_x = (element.bbox.x1 + element.bbox.x2) / 2
                    norm_y = (element.bbox.y1 + element.bbox.y2) / 2
                    pixel_x = int(norm_x * width)
                    pixel_y = int(norm_y * height)
                    id2xy[element.id] = (pixel_x, pixel_y)

                # Replace the original screenshot with the annotated image
                annotated_image_url = f"data:image/png;base64,{result.annotated_image_base64}"
                last_computer_call_output["output"]["image_url"] = annotated_image_url

        # Reverse mapping so earlier coordinate-based calls can be shown to the LLM
        # as element-ID calls.
        xy2id = {v: k for k, v in id2xy.items()}
        messages_with_element_ids = []
        for message in messages:
            if not isinstance(message, dict):
                message = message.__dict__

            converted = await replace_computer_call_with_function(message, xy2id)  # type: ignore
            messages_with_element_ids += converted

        completion_messages = convert_responses_items_to_completion_messages(
            messages_with_element_ids, allow_images_in_tool_results=False
        )

        # Prepare API call kwargs
        api_kwargs = {
            "model": llm_model,
            "messages": completion_messages,
            "tools": openai_tools if openai_tools else None,
            "stream": stream,
            "num_retries": max_retries,
            **kwargs,
        }

        # Add Vertex AI specific parameters if using vertex_ai models
        if llm_model.startswith("vertex_ai/"):
            import os

            # Pass vertex_project and vertex_location to liteLLM
            if "vertex_project" not in api_kwargs:
                api_kwargs["vertex_project"] = os.getenv("GOOGLE_CLOUD_PROJECT")
            if "vertex_location" not in api_kwargs:
                api_kwargs["vertex_location"] = "global"

            # Pass through Gemini 3-specific parameters if provided
            if "thinking_level" in kwargs:
                api_kwargs["thinking_level"] = kwargs["thinking_level"]
            if "media_resolution" in kwargs:
                api_kwargs["media_resolution"] = kwargs["media_resolution"]

        # Call API start hook. Request details are exposed through this hook rather
        # than being printed to stdout.
        if _on_api_start:
            await _on_api_start(api_kwargs)

        # Use liteLLM completion
        response = await litellm.acompletion(**api_kwargs)

        # Call API end hook
        if _on_api_end:
            await _on_api_end(api_kwargs, response)

        # Extract usage information
        usage = {
            **response.usage.model_dump(),  # type: ignore
            "response_cost": response._hidden_params.get("response_cost", 0.0),  # type: ignore
        }
        if _on_usage:
            await _on_usage(usage)

        response_dict = response.model_dump()  # type: ignore
        choice_messages = [choice["message"] for choice in response_dict["choices"]]
        responses_items = []
        for choice_message in choice_messages:
            responses_items.extend(convert_completion_messages_to_responses_items([choice_message]))

        # Convert element_id → x,y (similar to moondream's convert_computer_calls_desc2xy)
        final_output = []
        for item in responses_items:
            if item.get("type") == "computer_call" and "action" in item:
                # Work on a copy so the converted item does not alias the original action.
                action = item["action"].copy()

                # Handle single element_id
                if "element_id" in action:
                    element_id = action["element_id"]
                    # Unknown IDs are left untouched on the action.
                    if element_id in id2xy:
                        x, y = id2xy[element_id]
                        action["x"] = x
                        action["y"] = y
                        del action["element_id"]

                # Handle start_element_id and end_element_id for drag operations
                elif "start_element_id" in action and "end_element_id" in action:
                    start_id = action["start_element_id"]
                    end_id = action["end_element_id"]
                    if start_id in id2xy and end_id in id2xy:
                        start_x, start_y = id2xy[start_id]
                        end_x, end_y = id2xy[end_id]
                        action["path"] = [{"x": start_x, "y": start_y}, {"x": end_x, "y": end_y}]
                        del action["start_element_id"]
                        del action["end_element_id"]

                converted_item = item.copy()
                converted_item["action"] = action
                final_output.append(converted_item)
            else:
                final_output.append(item)

        return {"output": final_output, "usage": usage}

    async def predict_click(
        self, model: str, image_b64: str, instruction: str, **kwargs
    ) -> Optional[Tuple[float, float]]:
        """
        Predict click coordinates using OmniParser and LLM.

        Uses OmniParser to annotate the image with element IDs, then uses LLM
        to identify the correct element ID based on the instruction.

        Returns:
            The center of the chosen element's bounding box, or None when ``som`` is
            not installed, the reply is not an integer, or no element has that ID.
        """
        if not OMNIPARSER_AVAILABLE:
            return None

        # Parse the image with OmniParser to get annotated image and elements
        parser = get_parser()
        result = parser.parse(image_b64)

        # Extract the LLM model from composed model string
        llm_model = model.split("+")[-1]

        # Create system prompt for element ID prediction
        SYSTEM_PROMPT = (
            "You are an expert UI element locator. Given a GUI image annotated with numerical IDs over each interactable element, along with a user's element description, provide the ID of the specified element.\n"
            "The image shows UI elements with numbered overlays. Each number corresponds to a clickable/interactable element.\n"
            "Output only the element ID as a single integer."
        )

        # Prepare messages for LLM
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{result.annotated_image_base64}"
                        },
                    },
                    {"type": "text", "text": f"Find the element: {instruction}"},
                ],
            },
        ]

        # Call LLM to predict element ID
        response = await litellm.acompletion(
            model=llm_model, messages=messages, max_tokens=10, temperature=0.1
        )

        # Extract element ID from response; a missing content is treated as an
        # unparseable reply instead of raising AttributeError.
        response_text = (response.choices[0].message.content or "").strip()  # type: ignore

        # Try to parse the element ID
        try:
            element_id = int(response_text)
        except ValueError:
            # If we can't parse the ID, return None
            return None

        # Find the element with this ID and return its center coordinates.
        # NOTE(review): predict_step treats bbox values as normalized (0-1) and
        # scales them to pixels; here they are returned unscaled. Confirm which
        # unit callers of predict_click expect.
        for element in result.elements:
            if element.id == element_id:
                center_x = (element.bbox.x1 + element.bbox.x2) / 2
                center_y = (element.bbox.y1 + element.bbox.y2) / 2
                return (center_x, center_y)

        return None

    def get_capabilities(self) -> List[AgentCapability]:
        """Return the capabilities supported by this agent."""
        return ["step"]
```
--------------------------------------------------------------------------------
/docs/content/docs/example-usecases/windows-app-behind-vpn.mdx:
--------------------------------------------------------------------------------
```markdown
---
title: Windows App behind VPN
description: Automate legacy Windows desktop applications behind VPN with Cua
---
import { Step, Steps } from 'fumadocs-ui/components/steps';
import { Tab, Tabs } from 'fumadocs-ui/components/tabs';
## Overview
This guide demonstrates how to automate Windows desktop applications (like eGecko HR/payroll systems) that run behind a corporate VPN. This is a common enterprise scenario where legacy desktop applications require manual data entry, report generation, or workflow execution.
**Use cases:**
- HR/payroll processing (employee onboarding, payroll runs, benefits administration)
- Desktop ERP systems behind corporate networks
- Legacy financial applications requiring VPN access
- Compliance reporting from on-premise systems
**Architecture:**
- Client-side Cua agent (Python SDK or Playground UI)
- Windows VM/Sandbox with VPN client configured
- RDP/remote desktop connection to target environment
- Desktop application automation via computer vision and UI control
<Callout type="info">
**Production Deployment**: For production use, consider workflow mining and custom finetuning to
create vertical-specific actions (e.g., "Run payroll", "Onboard employee") instead of generic UI
automation. This provides better audit trails and higher success rates.
</Callout>
---
## Video Demo
<div className="rounded-lg border bg-card text-card-foreground shadow-sm p-4 mb-6">
<video
src="https://github.com/user-attachments/assets/8ab07646-6018-4128-87ce-53180cfea696"
controls
className="w-full rounded"
>
Your browser does not support the video tag.
</video>
<div className="text-sm text-muted-foreground mt-2">
Demo showing Cua automating an eGecko-like desktop application on Windows behind AWS VPN
</div>
</div>
---
<Steps>
<Step>
### Set Up Your Environment
Install the required dependencies:
Create a `requirements.txt` file:
```text
cua-agent
cua-computer
python-dotenv>=1.0.0
```
Install the dependencies:
```bash
pip install -r requirements.txt
```
Create a `.env` file with your API keys:
```text
ANTHROPIC_API_KEY=your-anthropic-api-key
CUA_API_KEY=sk_cua-api01...
CUA_SANDBOX_NAME=your-windows-sandbox
```
</Step>
<Step>
### Configure Windows Sandbox with VPN
<Tabs items={['Cloud Sandbox (Recommended)', 'Windows Sandbox', 'Self-Hosted VM']}>
<Tab value="Cloud Sandbox (Recommended)">
For enterprise deployments, use Cua Cloud Sandbox with pre-configured VPN:
1. Go to [cua.ai/signin](https://cua.ai/signin)
2. Navigate to **Dashboard > Containers > Create Instance**
3. Create a **Windows** sandbox (Medium or Large for desktop apps)
4. Configure VPN settings:
- Upload your AWS VPN Client configuration (`.ovpn` file)
- Or configure VPN credentials directly in the dashboard
5. Note your sandbox name and API key
Your Windows sandbox will launch with VPN automatically connected.
</Tab>
<Tab value="Windows Sandbox">
For local development on Windows 10 Pro/Enterprise or Windows 11:
1. Enable [Windows Sandbox](https://learn.microsoft.com/en-us/windows/security/application-security/application-isolation/windows-sandbox/windows-sandbox-install)
2. Install the `pywinsandbox` dependency:
```bash
pip install -U git+https://github.com/karkason/pywinsandbox.git
```
3. Create a VPN setup script that runs on sandbox startup
4. Configure your desktop application installation within the sandbox
<Callout type="warn">
**Manual VPN Setup**: Windows Sandbox requires manual VPN configuration each time it starts. For
production use, consider Cloud Sandbox or self-hosted VMs with persistent VPN connections.
</Callout>
</Tab>
<Tab value="Self-Hosted VM">
For self-managed infrastructure:
1. Deploy Windows VM on your preferred cloud (AWS, Azure, GCP)
2. Install and configure VPN client (AWS VPN Client, OpenVPN, etc.)
3. Install target desktop application and any dependencies
4. Install `cua-computer-server`:
```bash
pip install cua-computer-server
python -m computer_server
```
5. Configure firewall rules to allow Cua agent connections
</Tab>
</Tabs>
</Step>
<Step>
### Create Your Automation Script
Create a Python file (e.g., `hr_automation.py`):
<Tabs items={['Cloud Sandbox', 'Windows Sandbox', 'Self-Hosted']}>
<Tab value="Cloud Sandbox">
```python
import asyncio
import logging
import os
from agent import ComputerAgent
from computer import Computer, VMProviderType
from dotenv import load_dotenv
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
load_dotenv()
async def automate_hr_workflow():
"""
Automate HR/payroll desktop application workflow.
This example demonstrates:
- Launching Windows desktop application
- Navigating complex desktop UI
- Data entry and form filling
- Report generation and export
"""
try:
# Connect to Windows Cloud Sandbox with VPN
async with Computer(
os_type="windows",
provider_type=VMProviderType.CLOUD,
name=os.environ["CUA_SANDBOX_NAME"],
api_key=os.environ["CUA_API_KEY"],
verbosity=logging.INFO,
) as computer:
# Configure agent with specialized instructions
agent = ComputerAgent(
model="cua/anthropic/claude-sonnet-4.5",
tools=[computer],
only_n_most_recent_images=3,
verbosity=logging.INFO,
trajectory_dir="trajectories",
use_prompt_caching=True,
max_trajectory_budget=10.0,
instructions="""
You are automating a Windows desktop HR/payroll application.
IMPORTANT GUIDELINES:
- Always wait for windows and dialogs to fully load before interacting
- Look for loading indicators and wait for them to disappear
- Verify each action by checking on-screen confirmation messages
- If a button or field is not visible, try scrolling or navigating tabs
- Desktop apps often have nested menus - explore systematically
- Save work frequently using File > Save or Ctrl+S
- Before closing, always verify changes were saved
COMMON UI PATTERNS:
- Menu bar navigation (File, Edit, View, etc.)
- Ribbon interfaces with tabs
- Modal dialogs that block interaction
- Data grids/tables for viewing records
- Form fields with validation
- Status bars showing operation progress
""".strip()
)
# Define workflow tasks
tasks = [
"Launch the HR application from the desktop or start menu",
"Log in with the credentials shown in credentials.txt on the desktop",
"Navigate to Employee Management section",
"Create a new employee record with information from new_hire.xlsx on desktop",
"Verify the employee was created successfully by searching for their name",
"Generate an onboarding report for the new employee",
"Export the report as PDF to the desktop",
"Log out of the application"
]
history = []
for task in tasks:
logger.info(f"\n{'='*60}")
logger.info(f"Task: {task}")
logger.info(f"{'='*60}\n")
history.append({"role": "user", "content": task})
async for result in agent.run(history):
for item in result.get("output", []):
if item.get("type") == "message":
content = item.get("content", [])
for block in content:
if block.get("type") == "text":
response = block.get("text", "")
logger.info(f"Agent: {response}")
history.append({"role": "assistant", "content": response})
logger.info("\nTask completed. Moving to next task...\n")
logger.info("\n" + "="*60)
logger.info("All tasks completed successfully!")
logger.info("="*60)
except Exception as e:
logger.error(f"Error during automation: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(automate_hr_workflow())
```
</Tab>
<Tab value="Windows Sandbox">
```python
import asyncio
import logging
import os
from agent import ComputerAgent
from computer import Computer, VMProviderType
from dotenv import load_dotenv
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
load_dotenv()
async def automate_hr_workflow():
try:
# Connect to Windows Sandbox
async with Computer(
os_type="windows",
provider_type=VMProviderType.WINDOWS_SANDBOX,
verbosity=logging.INFO,
) as computer:
agent = ComputerAgent(
model="cua/anthropic/claude-sonnet-4.5",
tools=[computer],
only_n_most_recent_images=3,
verbosity=logging.INFO,
trajectory_dir="trajectories",
use_prompt_caching=True,
max_trajectory_budget=10.0,
instructions="""
You are automating a Windows desktop HR/payroll application.
IMPORTANT GUIDELINES:
- Always wait for windows and dialogs to fully load before interacting
- Verify each action by checking on-screen confirmation messages
- Desktop apps often have nested menus - explore systematically
- Save work frequently using File > Save or Ctrl+S
""".strip()
)
tasks = [
"Launch the HR application from the desktop",
"Log in with credentials from credentials.txt on desktop",
"Navigate to Employee Management and create new employee from new_hire.xlsx",
"Generate and export onboarding report as PDF",
"Log out of the application"
]
history = []
for task in tasks:
logger.info(f"\nTask: {task}")
history.append({"role": "user", "content": task})
async for result in agent.run(history):
for item in result.get("output", []):
if item.get("type") == "message":
content = item.get("content", [])
for block in content:
if block.get("type") == "text":
response = block.get("text", "")
logger.info(f"Agent: {response}")
history.append({"role": "assistant", "content": response})
logger.info("\nAll tasks completed!")
except Exception as e:
logger.error(f"Error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(automate_hr_workflow())
```
</Tab>
<Tab value="Self-Hosted">
```python
import asyncio
import logging
import os
from agent import ComputerAgent
from computer import Computer
from dotenv import load_dotenv
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
load_dotenv()
async def automate_hr_workflow():
try:
# Connect to self-hosted Windows VM running computer-server
async with Computer(
use_host_computer_server=True,
base_url="http://your-windows-vm-ip:5757", # Update with your VM IP
verbosity=logging.INFO,
) as computer:
agent = ComputerAgent(
model="cua/anthropic/claude-sonnet-4.5",
tools=[computer],
only_n_most_recent_images=3,
verbosity=logging.INFO,
trajectory_dir="trajectories",
use_prompt_caching=True,
max_trajectory_budget=10.0,
instructions="""
You are automating a Windows desktop HR/payroll application.
IMPORTANT GUIDELINES:
- Always wait for windows and dialogs to fully load before interacting
- Verify each action by checking on-screen confirmation messages
- Save work frequently using File > Save or Ctrl+S
""".strip()
)
tasks = [
"Launch the HR application",
"Log in with provided credentials",
"Complete the required HR workflow",
"Generate and export report",
"Log out"
]
history = []
for task in tasks:
logger.info(f"\nTask: {task}")
history.append({"role": "user", "content": task})
async for result in agent.run(history):
for item in result.get("output", []):
if item.get("type") == "message":
content = item.get("content", [])
for block in content:
if block.get("type") == "text":
response = block.get("text", "")
logger.info(f"Agent: {response}")
history.append({"role": "assistant", "content": response})
logger.info("\nAll tasks completed!")
except Exception as e:
logger.error(f"Error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(automate_hr_workflow())
```
</Tab>
</Tabs>
</Step>
<Step>
### Run Your Automation
Execute the script:
```bash
python hr_automation.py
```
The agent will:
1. Connect to your Windows environment (with VPN if configured)
2. Launch and navigate the desktop application
3. Execute each workflow step sequentially
4. Verify actions and handle errors
5. Save trajectory logs for audit and debugging
Monitor the console output to see the agent's progress through each task.
</Step>
</Steps>
---
## Key Configuration Options
### Agent Instructions
The `instructions` parameter is critical for reliable desktop automation:
```python
instructions="""
You are automating a Windows desktop HR/payroll application.
IMPORTANT GUIDELINES:
- Always wait for windows and dialogs to fully load before interacting
- Look for loading indicators and wait for them to disappear
- Verify each action by checking on-screen confirmation messages
- If a button or field is not visible, try scrolling or navigating tabs
- Desktop apps often have nested menus - explore systematically
- Save work frequently using File > Save or Ctrl+S
- Before closing, always verify changes were saved
COMMON UI PATTERNS:
- Menu bar navigation (File, Edit, View, etc.)
- Ribbon interfaces with tabs
- Modal dialogs that block interaction
- Data grids/tables for viewing records
- Form fields with validation
- Status bars showing operation progress
APPLICATION-SPECIFIC:
- Login is at top-left corner
- Employee records are under "HR Management" > "Employees"
- Reports are generated via "Tools" > "Reports" > "Generate"
- Always click "Save" before navigating away from a form
""".strip()
```
### Budget Management
For long-running workflows, adjust budget limits:
```python
agent = ComputerAgent(
model="cua/anthropic/claude-sonnet-4.5",
tools=[computer],
max_trajectory_budget=20.0, # Increase for complex workflows
# ... other params
)
```
### Image Retention
Balance context and cost by retaining only recent screenshots:
```python
agent = ComputerAgent(
# ...
only_n_most_recent_images=3, # Keep last 3 screenshots
# ...
)
```
---
## Production Considerations
<Callout type="warn" title="Production Deployment">
For enterprise production deployments, consider these additional steps:
</Callout>
### 1. Workflow Mining
Before deploying, analyze your actual workflows:
- Record user interactions with the application
- Identify common patterns and edge cases
- Map out decision trees and validation requirements
- Document application-specific quirks and timing issues
### 2. Custom Finetuning
Create vertical-specific actions instead of generic UI automation:
```python
# Instead of generic steps:
tasks = ["Click login", "Type username", "Type password", "Click submit"]
# Create semantic actions:
tasks = ["onboard_employee", "run_payroll", "generate_compliance_report"]
```
This provides:
- Better audit trails
- Approval gates at business logic level
- Higher success rates
- Easier maintenance and updates
### 3. Human-in-the-Loop
Add approval gates for critical operations:
```python
agent = ComputerAgent(
model="cua/anthropic/claude-sonnet-4.5",
tools=[computer],
# Add human approval callback for sensitive operations
callbacks=[ApprovalCallback(require_approval_for=["payroll", "termination"])]
)
```
### 4. Deployment Options
Choose your deployment model:
**Managed (Recommended)**
- Cua hosts Windows sandboxes, VPN/RDP stack, and agent runtime
- You get UI/API endpoints for triggering workflows
- Automatic scaling, monitoring, and maintenance
- SLA guarantees and enterprise support
**Self-Hosted**
- You manage Windows VMs, VPN infrastructure, and agent deployment
- Full control over data and security
- Custom network configurations
- On-premise or your preferred cloud
---
## Troubleshooting
### VPN Connection Issues
If the agent cannot reach the application:
1. Verify VPN is connected: Check VPN client status in the Windows sandbox
2. Test network connectivity: Try pinging internal resources
3. Check firewall rules: Ensure RDP and application ports are open
4. Review VPN logs: Look for authentication or routing errors
### Application Not Launching
If the desktop application fails to start:
1. Verify installation: Check the application is installed in the sandbox
2. Check dependencies: Ensure all required DLLs and frameworks are present
3. Review permissions: Application may require admin rights
4. Check logs: Look for error messages in Windows Event Viewer
### UI Element Not Found
If the agent cannot find buttons or fields:
1. Increase wait times: Some applications load slowly
2. Check screen resolution: UI elements may be off-screen
3. Verify DPI scaling: High DPI settings can affect element positions
4. Update instructions: Provide more specific navigation guidance
### Cost Management
If costs are higher than expected:
1. Reduce `max_trajectory_budget`
2. Decrease `only_n_most_recent_images`
3. Use prompt caching: Set `use_prompt_caching=True`
4. Optimize task descriptions: Be more specific to reduce retry attempts
---
## Next Steps
- **Explore custom tools**: Learn how to create [custom tools](/agent-sdk/custom-tools) for application-specific actions
- **Implement callbacks**: Add [monitoring and logging](/agent-sdk/callbacks) for production workflows
- **Join community**: Get help in our [Discord](https://discord.com/invite/mVnXXpdE85)
---
## Related Examples
- [Form Filling](/example-usecases/form-filling) - Web form automation
- [Post-Event Contact Export](/example-usecases/post-event-contact-export) - Data extraction workflows
- [Custom Tools](/agent-sdk/custom-tools) - Building application-specific functions
```
--------------------------------------------------------------------------------
/libs/typescript/cua-cli/src/commands/sandbox.ts:
--------------------------------------------------------------------------------
```typescript
import type { Argv } from 'yargs';
import { ensureApiKeyInteractive } from '../auth';
import { http } from '../http';
import { clearApiKey } from '../storage';
import type { SandboxItem } from '../util';
import { openInBrowser, printSandboxList } from '../util';
/**
 * Look up a single sandbox by name and build a plain summary object for it.
 *
 * The sandbox is located by listing `/v1/vms` and matching on `name`. On a 401
 * the stored API key is cleared; on any lookup failure an error is printed and
 * the process exits with code 1.
 *
 * @param name - Exact sandbox name to look up.
 * @param token - API key used for the list request and sent as `X-API-Key`
 *   to the version probe.
 * @param options.showPasswords - Include `password` in the result.
 * @param options.showVncUrl - Include `vnc_url` in the result (the URL embeds
 *   the sandbox password).
 * @param options.probeComputerServer - When the sandbox is `running` and
 *   reports a `host`, probe port 8443 for `os_type` and
 *   `computer_server_version`.
 * @returns An object with `name`, `status` and `host`, plus the optional
 *   fields above. `computer_server_status` is set to `'healthy'` only when
 *   both probes succeed.
 */
async function fetchSandboxDetails(
  name: string,
  token: string,
  options: {
    showPasswords?: boolean;
    showVncUrl?: boolean;
    probeComputerServer?: boolean;
  } = {}
) {
  // Fetch sandbox list
  const listRes = await http('/v1/vms', { token });
  if (listRes.status === 401) {
    clearApiKey();
    console.error("Unauthorized. Try 'cua login' again.");
    process.exit(1);
  }
  if (!listRes.ok) {
    console.error(`Request failed: ${listRes.status}`);
    process.exit(1);
  }

  const sandboxes = (await listRes.json()) as SandboxItem[];
  const sandbox = sandboxes.find((s) => s.name === name);
  if (!sandbox) {
    console.error('Sandbox not found');
    process.exit(1);
  }

  // Fall back to the conventional hostname when the API reports no host.
  const host = sandbox.host || `${sandbox.name}.sandbox.cua.ai`;

  // Build result object
  const result: any = {
    name: sandbox.name,
    status: sandbox.status,
    host,
  };
  if (options.showPasswords) {
    result.password = sandbox.password;
  }
  if (options.showVncUrl) {
    result.vnc_url = `https://${host}/vnc.html?autoconnect=true&password=${encodeURIComponent(sandbox.password)}&show_dot=true`;
  }

  // Probe computer-server only for running sandboxes with a reported host.
  if (
    options.probeComputerServer &&
    sandbox.status === 'running' &&
    sandbox.host
  ) {
    const probeTimeoutMs = 3000;
    let statusProbeSuccess = false;
    let versionProbeSuccess = false;

    // Probe OS type
    const statusController = new AbortController();
    const statusTimeout = setTimeout(
      () => statusController.abort(),
      probeTimeoutMs
    );
    try {
      const statusRes = await fetch(`https://${sandbox.host}:8443/status`, {
        signal: statusController.signal,
      });
      if (statusRes.ok) {
        const statusData = (await statusRes.json()) as {
          status: string;
          os_type: string;
          features?: string[];
        };
        result.os_type = statusData.os_type;
        statusProbeSuccess = true;
      }
    } catch (err) {
      // Timeout or connection error - skip
    } finally {
      // Always cancel the timer: a fetch that fails fast would otherwise
      // leave it pending and keep the process alive until it fires.
      clearTimeout(statusTimeout);
    }

    // Probe computer-server version
    const versionController = new AbortController();
    const versionTimeout = setTimeout(
      () => versionController.abort(),
      probeTimeoutMs
    );
    try {
      const versionRes = await fetch(`https://${sandbox.host}:8443/cmd`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'X-Container-Name': sandbox.name,
          'X-API-Key': token,
        },
        body: JSON.stringify({
          command: 'version',
          params: {},
        }),
        signal: versionController.signal,
      });
      if (versionRes.ok) {
        const versionDataRaw = await versionRes.text();
        // The body is expected to start with a `data: ` prefix followed by JSON.
        if (versionDataRaw.startsWith('data: ')) {
          const jsonStr = versionDataRaw.slice(6);
          const versionData = JSON.parse(jsonStr) as {
            success: boolean;
            protocol: number;
            package: string;
          };
          if (versionData.package) {
            result.computer_server_version = versionData.package;
            versionProbeSuccess = true;
          }
        }
      }
    } catch (err) {
      // Timeout, connection error, or unparsable body - skip
    } finally {
      clearTimeout(versionTimeout);
    }

    // Set computer server status based on probe results
    if (statusProbeSuccess && versionProbeSuccess) {
      result.computer_server_status = 'healthy';
    }
  }

  return result;
}
// Command handlers
/**
 * Handler for `list`: fetch every sandbox from `/v1/vms` and print them.
 * Passwords are shown only when `--show-passwords` is set. Exits with code 1
 * on an unauthorized or failed request.
 */
const listHandler = async (argv: Record<string, unknown>) => {
  const apiKey = await ensureApiKeyInteractive();
  const response = await http('/v1/vms', { token: apiKey });

  if (response.status === 401) {
    // The stored key is no longer valid; drop it so the next login starts clean.
    clearApiKey();
    console.error("Unauthorized. Try 'cua login' again.");
    process.exit(1);
  }
  if (!response.ok) {
    console.error(`Request failed: ${response.status}`);
    process.exit(1);
  }

  const items = (await response.json()) as SandboxItem[];
  const revealPasswords = Boolean(argv['show-passwords']);
  printSandboxList(items, revealPasswords);
};
/**
 * Handler for `create`: POST a new sandbox request to `/v1/vms`.
 *
 * A 200 response means the sandbox is ready and its credentials are printed;
 * a 202 means provisioning has started and the job id is printed. 400, 401,
 * 500 and any other status print an error and exit with code 1.
 */
const createHandler = async (argv: Record<string, unknown>) => {
  const apiKey = await ensureApiKeyInteractive();
  const { os, size, region } = argv as {
    os: string;
    size: string;
    region: string;
  };

  const response = await http('/v1/vms', {
    token: apiKey,
    method: 'POST',
    // The API calls the size option `configuration`.
    body: { os, configuration: size, region },
  });
  const code = response.status;

  // Success paths first: ready immediately, or accepted for provisioning.
  if (code === 200) {
    const ready = (await response.json()) as {
      status: string;
      name: string;
      password: string;
      host: string;
    };
    console.log(`Sandbox created and ready: ${ready.name}`);
    console.log(`Password: ${ready.password}`);
    console.log(`Host: ${ready.host}`);
    return;
  }
  if (code === 202) {
    const pending = (await response.json()) as {
      status: string;
      name: string;
      job_id: string;
    };
    console.log(`Sandbox provisioning started: ${pending.name}`);
    console.log(`Job ID: ${pending.job_id}`);
    console.log("Use 'cua list' to monitor provisioning progress");
    return;
  }

  // Known failure statuses.
  if (code === 401) {
    clearApiKey();
    console.error("Unauthorized. Try 'cua login' again.");
    process.exit(1);
  }
  if (code === 400) {
    console.error('Invalid request or unsupported configuration');
    process.exit(1);
  }
  if (code === 500) {
    console.error('Internal server error');
    process.exit(1);
  }

  console.error(`Unexpected status: ${response.status}`);
  process.exit(1);
};
/**
 * Handler for `delete <name>`: send DELETE to `/v1/vms/<name>`.
 * A 202 reports that deletion was initiated; 404, 401 and any other status
 * print an error and exit with code 1.
 */
const deleteHandler = async (argv: Record<string, unknown>) => {
  const apiKey = await ensureApiKeyInteractive();
  const sandboxName = String(argv['name']);
  const endpoint = `/v1/vms/${encodeURIComponent(sandboxName)}`;
  const response = await http(endpoint, { token: apiKey, method: 'DELETE' });
  const code = response.status;

  if (code === 401) {
    clearApiKey();
    console.error("Unauthorized. Try 'cua login' again.");
    process.exit(1);
  }
  if (code === 404) {
    console.error('Sandbox not found or not owned by you');
    process.exit(1);
  }
  if (code === 202) {
    // The body is optional; fall back to an empty object if it is not JSON.
    const payload = (await response.json().catch(() => ({}))) as {
      status?: string;
    };
    console.log(`Sandbox deletion initiated: ${payload.status ?? 'deleting'}`);
    return;
  }

  console.error(`Unexpected status: ${response.status}`);
  process.exit(1);
};
/**
 * Handler for `start <name>`: POST to `/v1/vms/<name>/start`.
 * A 204 prints "Start accepted"; 404, 401 and any other status print an error
 * and exit with code 1.
 */
const startHandler = async (argv: Record<string, unknown>) => {
  const apiKey = await ensureApiKeyInteractive();
  const sandboxName = String(argv['name']);
  const endpoint = `/v1/vms/${encodeURIComponent(sandboxName)}/start`;
  const response = await http(endpoint, { token: apiKey, method: 'POST' });
  const code = response.status;

  if (code === 401) {
    clearApiKey();
    console.error("Unauthorized. Try 'cua login' again.");
    process.exit(1);
  }
  if (code === 404) {
    console.error('Sandbox not found');
    process.exit(1);
  }
  if (code === 204) {
    console.log('Start accepted');
    return;
  }

  console.error(`Unexpected status: ${response.status}`);
  process.exit(1);
};
/**
 * Handler for `stop <name>`: POST to `/v1/vms/<name>/stop`.
 * A 202 prints the reported status (or "stopping"); 404, 401 and any other
 * status print an error and exit with code 1.
 */
const stopHandler = async (argv: Record<string, unknown>) => {
  const apiKey = await ensureApiKeyInteractive();
  const sandboxName = String(argv['name']);
  const endpoint = `/v1/vms/${encodeURIComponent(sandboxName)}/stop`;
  const response = await http(endpoint, { token: apiKey, method: 'POST' });
  const code = response.status;

  if (code === 401) {
    clearApiKey();
    console.error("Unauthorized. Try 'cua login' again.");
    process.exit(1);
  }
  if (code === 404) {
    console.error('Sandbox not found');
    process.exit(1);
  }
  if (code === 202) {
    // The body is optional; fall back to an empty object if it is not JSON.
    const payload = (await response.json().catch(() => ({}))) as {
      status?: string;
    };
    console.log(payload.status ?? 'stopping');
    return;
  }

  console.error(`Unexpected status: ${response.status}`);
  process.exit(1);
};
/**
 * Handler for `restart <name>`: POST to `/v1/vms/<name>/restart`.
 * A 202 prints the reported status (or "restarting"); 404, 401 and any other
 * status print an error and exit with code 1.
 */
const restartHandler = async (argv: Record<string, unknown>) => {
  const apiKey = await ensureApiKeyInteractive();
  const sandboxName = String(argv['name']);
  const endpoint = `/v1/vms/${encodeURIComponent(sandboxName)}/restart`;
  const response = await http(endpoint, { token: apiKey, method: 'POST' });
  const code = response.status;

  if (code === 401) {
    clearApiKey();
    console.error("Unauthorized. Try 'cua login' again.");
    process.exit(1);
  }
  if (code === 404) {
    console.error('Sandbox not found');
    process.exit(1);
  }
  if (code === 202) {
    // The body is optional; fall back to an empty object if it is not JSON.
    const payload = (await response.json().catch(() => ({}))) as {
      status?: string;
    };
    console.log(payload.status ?? 'restarting');
    return;
  }

  console.error(`Unexpected status: ${response.status}`);
  process.exit(1);
};
/**
 * Handler for `suspend <name>`: POST to `/v1/vms/<name>/suspend`.
 *
 * A 202 prints the reported status (or "suspending"). A 400 or 500 prints the
 * server-provided `error` if present, otherwise a hint to use `cua sb stop`.
 * 404, 401 and any other status print an error. All failures exit with code 1.
 */
const suspendHandler = async (argv: Record<string, unknown>) => {
  const apiKey = await ensureApiKeyInteractive();
  const sandboxName = String(argv['name']);
  const endpoint = `/v1/vms/${encodeURIComponent(sandboxName)}/suspend`;
  const response = await http(endpoint, { token: apiKey, method: 'POST' });
  const code = response.status;

  if (code === 401) {
    clearApiKey();
    console.error("Unauthorized. Try 'cua login' again.");
    process.exit(1);
  }
  if (code === 404) {
    console.error('Sandbox not found');
    process.exit(1);
  }
  if (code === 202) {
    // The body is optional; fall back to an empty object if it is not JSON.
    const payload = (await response.json().catch(() => ({}))) as {
      status?: string;
    };
    console.log(payload.status ?? 'suspending');
    return;
  }
  if (code === 400 || code === 500) {
    const failure = (await response.json().catch(() => ({}))) as {
      error?: string;
    };
    const message =
      failure.error ??
      "Suspend not supported for this VM. Use 'cua sb stop' instead.";
    console.error(message);
    process.exit(1);
  }

  console.error(`Unexpected status: ${response.status}`);
  process.exit(1);
};
/**
 * Handler for `vnc <name>` / `open <name>`: open the sandbox's NoVNC page in
 * the default browser.
 *
 * Lookup, error handling and URL construction are delegated to
 * `fetchSandboxDetails` so that this command and `get --show-vnc-url` always
 * produce the same URL. No computer-server probing is requested here.
 * Note that the printed URL embeds the sandbox password.
 */
const openHandler = async (argv: Record<string, unknown>) => {
  const token = await ensureApiKeyInteractive();
  const name = String((argv as any).name);

  // fetchSandboxDetails exits the process itself on 401, request failure,
  // or when no sandbox with this name exists.
  const details = await fetchSandboxDetails(name, token, { showVncUrl: true });
  const url: string = details.vnc_url;

  console.log(`Opening NoVNC: ${url}`);
  await openInBrowser(url);
};
/**
 * Handler for `get <name>`: print details for one sandbox, including the
 * computer-server probe results when available.
 *
 * With `--json` the details object is printed as indented JSON; otherwise one
 * labelled line is printed per available field. The password and VNC URL
 * lines appear only when `--show-passwords` / `--show-vnc-url` are set.
 */
const getHandler = async (argv: Record<string, unknown>) => {
  const apiKey = await ensureApiKeyInteractive();
  const sandboxName = String(argv['name']);
  const showPasswords = Boolean(argv['show-passwords']);
  const showVncUrl = Boolean(argv['show-vnc-url']);
  const asJson = Boolean(argv.json);

  const details = await fetchSandboxDetails(sandboxName, apiKey, {
    showPasswords,
    showVncUrl,
    probeComputerServer: true,
  });

  if (asJson) {
    console.log(JSON.stringify(details, null, 2));
    return;
  }

  // Collect the human-readable lines in display order, then print them.
  const lines: string[] = [
    `Name: ${details.name}`,
    `Status: ${details.status}`,
    `Host: ${details.host}`,
  ];
  if (showPasswords) {
    lines.push(`Password: ${details.password}`);
  }
  if (details.os_type) {
    lines.push(`OS Type: ${details.os_type}`);
  }
  if (details.computer_server_version) {
    lines.push(`Computer Server Version: ${details.computer_server_version}`);
  }
  if (details.computer_server_status) {
    lines.push(`Computer Server Status: ${details.computer_server_status}`);
  }
  if (showVncUrl) {
    lines.push(`VNC URL: ${details.vnc_url}`);
  }
  for (const line of lines) {
    console.log(line);
  }
};
/** Shape shared by every sandbox sub-command registration. */
type SandboxCommandSpec = {
  command: string | string[];
  describe: string;
  builder: (y: Argv) => Argv<any>;
  handler: (argv: Record<string, unknown>) => Promise<void>;
};

/** Builder for commands whose only input is the positional `<name>`. */
const sandboxNameBuilder = (y: Argv) =>
  y.positional('name', { type: 'string', describe: 'Sandbox name' });

/**
 * Single source of truth for the sandbox commands. The same specs are
 * registered under the `sandbox`/`sb` group and as hidden top-level commands,
 * so the two variants cannot drift apart. Array order is registration order
 * and therefore the order shown in the grouped help output.
 */
const SANDBOX_COMMAND_SPECS: SandboxCommandSpec[] = [
  {
    command: ['list', 'ls', 'ps'],
    describe: 'List all your sandboxes with status and connection details',
    builder: (y) =>
      y.option('show-passwords', {
        type: 'boolean',
        default: false,
        describe: 'Show sandbox passwords in output',
      }),
    handler: listHandler,
  },
  {
    command: 'create',
    describe:
      'Provision a new cloud sandbox in your chosen OS, size, and region',
    builder: (y) =>
      y
        .option('os', {
          type: 'string',
          choices: ['linux', 'windows', 'macos'],
          demandOption: true,
          describe: 'Operating system',
        })
        .option('size', {
          type: 'string',
          choices: ['small', 'medium', 'large'],
          demandOption: true,
          describe: 'Sandbox size',
        })
        .option('region', {
          type: 'string',
          choices: [
            'north-america',
            'europe',
            'asia-pacific',
            'south-america',
          ],
          demandOption: true,
          describe: 'Sandbox region',
        }),
    handler: createHandler,
  },
  {
    command: 'delete <name>',
    describe: 'Permanently delete a sandbox and all its data',
    builder: sandboxNameBuilder,
    handler: deleteHandler,
  },
  {
    command: 'start <name>',
    describe: 'Start a stopped sandbox',
    builder: sandboxNameBuilder,
    handler: startHandler,
  },
  {
    command: 'stop <name>',
    describe: 'Stop a running sandbox (data is preserved)',
    builder: sandboxNameBuilder,
    handler: stopHandler,
  },
  {
    command: 'restart <name>',
    describe: 'Restart a sandbox (reboot the system)',
    builder: sandboxNameBuilder,
    handler: restartHandler,
  },
  {
    command: 'suspend <name>',
    describe:
      'Suspend a sandbox, preserving memory state (use start to resume)',
    builder: sandboxNameBuilder,
    handler: suspendHandler,
  },
  {
    command: ['vnc <name>', 'open <name>'],
    describe: 'Open remote desktop (VNC) connection in your browser',
    builder: sandboxNameBuilder,
    handler: openHandler,
  },
  {
    command: 'get <name>',
    describe: 'Get detailed information about a specific sandbox',
    builder: (y) =>
      y
        .positional('name', { type: 'string', describe: 'Sandbox name' })
        .option('json', {
          type: 'boolean',
          default: false,
          describe: 'Output in JSON format',
        })
        .option('show-passwords', {
          type: 'boolean',
          default: false,
          describe: 'Include password in output',
        })
        .option('show-vnc-url', {
          type: 'boolean',
          default: false,
          describe: 'Include computed NoVNC URL in output',
        }),
    handler: getHandler,
  },
];

/**
 * Register the sandbox commands on the given yargs instance in two forms:
 *
 * 1. Grouped: `cua sandbox <command>` / `cua sb <command>` (registered first
 *    so it appears first in help). At least one sub-command is required.
 * 2. Flat: the same commands at the top level, kept for backwards
 *    compatibility and hidden from help via `describe: false`.
 *
 * @param y - The yargs instance to extend.
 * @returns The same yargs instance, for chaining.
 */
export function registerSandboxCommands(y: Argv) {
  // Grouped structure: cua sandbox <command> or cua sb <command>
  y.command(
    ['sandbox', 'sb'],
    'Create and manage cloud sandboxes (Linux, Windows, or macOS)',
    (group) => {
      for (const spec of SANDBOX_COMMAND_SPECS) {
        group.command(spec.command, spec.describe, spec.builder, spec.handler);
      }
      return group.demandCommand(1, 'You must provide a sandbox command');
    },
    () => {}
  );

  // Flat structure (backwards compatible, hidden from help)
  for (const spec of SANDBOX_COMMAND_SPECS) {
    y.command({
      command: spec.command,
      describe: false as any, // Hide from help
      builder: spec.builder,
      handler: spec.handler,
    } as any);
  }

  return y;
}
```
--------------------------------------------------------------------------------
/libs/python/computer/computer/providers/winsandbox/provider.py:
--------------------------------------------------------------------------------
```python
"""Windows Sandbox VM provider implementation using pywinsandbox."""
import asyncio
import logging
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from ..base import BaseVMProvider, VMProviderType
# Setup logging
logger = logging.getLogger(__name__)
try:
import winsandbox
HAS_WINSANDBOX = True
except ImportError:
HAS_WINSANDBOX = False
class WinSandboxProvider(BaseVMProvider):
"""Windows Sandbox VM provider implementation using pywinsandbox.
This provider uses Windows Sandbox to create isolated Windows environments.
Storage is always ephemeral with Windows Sandbox.
"""
def __init__(
    self,
    host: str = "localhost",
    storage: Optional[str] = None,
    verbose: bool = False,
    ephemeral: bool = True,  # Windows Sandbox is always ephemeral
    memory_mb: int = 4096,
    networking: bool = True,
    **kwargs,
):
    """Create a provider backed by Windows Sandbox.

    Args:
        host: Host to use for connections (default: localhost)
        storage: Ignored apart from a warning; storage is forced to "ephemeral"
        verbose: Enable verbose logging
        ephemeral: Ignored apart from a warning when False; always stored as True
        memory_mb: Default memory allocation in MB (default: 4096)
        networking: Default for enabling networking in the sandbox (default: True)
        **kwargs: Accepted and not used by this constructor

    Raises:
        ImportError: If the pywinsandbox package could not be imported.
    """
    if not HAS_WINSANDBOX:
        raise ImportError(
            "pywinsandbox is required for WinSandboxProvider. "
            "Please install it with 'pip install pywinsandbox'"
        )

    # Warn about options Windows Sandbox cannot honour, then override them.
    if not ephemeral:
        logger.warning("Windows Sandbox storage is always ephemeral. Ignoring ephemeral=False.")
    if storage and storage != "ephemeral":
        logger.warning(
            "Windows Sandbox does not support persistent storage. Using ephemeral storage."
        )

    self.host = host
    self.verbose = verbose
    self.memory_mb = memory_mb
    self.networking = networking
    self.ephemeral = True
    self.storage = "ephemeral"
    self.logger = logging.getLogger(__name__)

    # Sandboxes started through this provider, keyed by name.
    self._active_sandboxes: Dict[str, Any] = {}
@property
def provider_type(self) -> VMProviderType:
    """Identify this provider as the Windows Sandbox provider type."""
    kind = VMProviderType.WINSANDBOX
    return kind
async def __aenter__(self):
    """Enter the async context and return this provider.

    Raises:
        ImportError: If pywinsandbox could not be imported.
    """
    # Guard clause: nothing can work without the pywinsandbox package.
    if HAS_WINSANDBOX:
        return self
    raise ImportError("pywinsandbox is not available")
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit async context manager."""
# Clean up any active sandboxes
for name, sandbox in self._active_sandboxes.items():
try:
sandbox.shutdown()
self.logger.info(f"Terminated sandbox: {name}")
except Exception as e:
self.logger.error(f"Error terminating sandbox {name}: {e}")
self._active_sandboxes.clear()
async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
    """Get VM information by name.

    The status is derived in stages:

    * name not tracked by this provider -> "stopped"
    * sandbox does not answer an RPyC call -> "starting"
    * the server address file is missing, malformed, or the address in it
      does not accept a TCP connection -> "starting"
    * the address accepts a TCP connection -> "running", with ``ip_address``
      set to the IP read from the file
    * an unexpected failure while checking -> "error"

    The RPyC calls and the TCP check (3 second timeout) are synchronous.

    Args:
        name: Name of the VM to get information for
        storage: Ignored for Windows Sandbox (always ephemeral)

    Returns:
        Dictionary with ``name``, ``status``, ``ip_address`` and ``storage``;
        tracked sandboxes also include ``memory_mb`` and ``networking``.
    """
    if name not in self._active_sandboxes:
        return {"name": name, "status": "stopped", "ip_address": None, "storage": "ephemeral"}

    sandbox = self._active_sandboxes[name]

    # Until the computer server is proven reachable the sandbox is "starting".
    status = "starting"
    ip_address = None

    try:
        # Cheap remote call to see whether the sandbox answers RPyC at all.
        try:
            sandbox.rpyc.modules.os.getcwd()
            sandbox_responsive = True
        except Exception:
            sandbox_responsive = False

        if not sandbox_responsive:
            return {
                "name": name,
                "status": "starting",
                "ip_address": None,
                "storage": "ephemeral",
                "memory_mb": self.memory_mb,
                "networking": self.networking,
            }

        # File expected to hold "IP:port" of the computer server.
        server_address_file = (
            r"C:\Users\WDAGUtilityAccount\Desktop\shared_windows_sandbox_dir\server_address"
        )

        try:
            server_address = ""
            if sandbox.rpyc.modules.os.path.exists(server_address_file):
                with sandbox.rpyc.builtin.open(server_address_file, "r") as f:
                    server_address = f.read().strip()

            if server_address and ":" in server_address:
                candidate_ip, port = server_address.split(":", 1)

                # Verify the server is actually responding
                try:
                    import socket

                    # Parse the port first so a malformed value never creates
                    # a socket; the context manager closes the socket even
                    # when connect_ex raises (e.g. unresolvable host).
                    port_number = int(port)
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                        sock.settimeout(3)
                        reachable = sock.connect_ex((candidate_ip, port_number)) == 0

                    if reachable:
                        self.logger.debug(f"Computer server found at {candidate_ip}:{port}")
                        status = "running"
                        ip_address = candidate_ip
                except Exception as e:
                    self.logger.debug(f"Error checking server connectivity: {e}")
                    status = "starting"
                    ip_address = None
        except Exception as e:
            self.logger.debug(f"Error checking server address file: {e}")
            status = "starting"
            ip_address = None

    except Exception as e:
        self.logger.error(f"Error checking sandbox status: {e}")
        status = "error"
        ip_address = None

    return {
        "name": name,
        "status": status,
        "ip_address": ip_address,
        "storage": "ephemeral",
        "memory_mb": self.memory_mb,
        "networking": self.networking,
    }
async def list_vms(self) -> List[Dict[str, Any]]:
    """Return `get_vm` information for every sandbox tracked by this provider."""
    # Sandboxes are queried one at a time, in tracking order.
    return [await self.get_vm(sandbox_name) for sandbox_name in self._active_sandboxes.keys()]
async def run_vm(
    self, image: str, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None
) -> Dict[str, Any]:
    """Run a VM with the given options.

    Creates a Windows Sandbox, records it under ``name`` and starts the
    computer-server setup inside it.

    Args:
        image: Image name (ignored for Windows Sandbox - always uses host Windows)
        name: Name of the VM to run
        run_opts: Dictionary of run options. Keys read here:
            ``memory_mb`` (int, or a string such as "4GB", "512MB" or "2048";
            an unparsable string falls back to the provider default),
            ``networking`` (bool) and ``shared_directories`` (list of host
            paths or dicts with a ``hostPath`` key; ``None`` is treated as empty).
        storage: Ignored for Windows Sandbox (always ephemeral)

    Returns:
        On success: ``success`` True plus ``name``, ``status`` ("starting"),
        ``memory_mb``, ``networking`` and ``storage``. On failure, or when a
        sandbox with this name is already tracked: ``success`` False and ``error``.
    """
    if name in self._active_sandboxes:
        return {"success": False, "error": f"Sandbox {name} is already running"}

    try:
        run_opts = run_opts or {}

        # Extract options from run_opts
        memory_mb = run_opts.get("memory_mb", self.memory_mb)
        if isinstance(memory_mb, str):
            memory_text = memory_mb.strip().upper()
            if memory_text.endswith("GB"):
                memory_mb = int(float(memory_text[:-2]) * 1024)
            elif memory_text.endswith("MB"):
                memory_mb = int(float(memory_text[:-2]))
            else:
                # A bare number under the "memory_mb" key is already in MB;
                # anything else falls back to the provider default.
                try:
                    memory_mb = int(float(memory_text))
                except ValueError:
                    memory_mb = self.memory_mb

        networking = run_opts.get("networking", self.networking)

        # Create folder mappers; always map a persistent cache directory on
        # the host so packages survive between (ephemeral) sandboxes.
        folder_mappers = []

        # Ensure the host-side cache directory exists (Path.home()/.cua/wsb_cache)
        host_wsb_env = Path.home() / ".cua" / "wsb_cache"
        try:
            host_wsb_env.mkdir(parents=True, exist_ok=True)
        except Exception:
            # If cannot create, continue without persistent mapping
            host_wsb_env = None

        # An explicit None must behave like "no shared directories".
        shared_directories = run_opts.get("shared_directories") or []
        for shared_dir in shared_directories:
            if isinstance(shared_dir, dict):
                host_path = shared_dir.get("hostPath", "")
            elif isinstance(shared_dir, str):
                host_path = shared_dir
            else:
                continue

            # Silently skip entries whose host path does not exist.
            if host_path and os.path.exists(host_path):
                folder_mappers.append(winsandbox.FolderMapper(host_path))

        # Map the persistent cache directory read/write
        if host_wsb_env is not None and host_wsb_env.exists():
            try:
                folder_mappers.append(
                    winsandbox.FolderMapper(str(host_wsb_env), read_only=False)
                )
            except Exception as e:
                self.logger.warning(f"Failed to map host winsandbox_venv: {e}")

        self.logger.info(f"Creating Windows Sandbox: {name}")
        self.logger.info(f"Memory: {memory_mb}MB, Networking: {networking}")
        if folder_mappers:
            self.logger.info(f"Shared directories: {len(folder_mappers)}")

        # Create the sandbox without logon script
        try:
            # Try with memory_mb parameter (newer pywinsandbox version)
            sandbox = winsandbox.new_sandbox(
                memory_mb=str(memory_mb), networking=networking, folder_mappers=folder_mappers
            )
        except TypeError as e:
            if "memory_mb" in str(e):
                # Fallback for older pywinsandbox version that doesn't support memory_mb
                self.logger.warning(
                    "Your pywinsandbox version doesn't support memory_mb parameter. "
                    "Using default memory settings. To use custom memory settings, "
                    "please update pywinsandbox: pip install -U git+https://github.com/karkason/pywinsandbox.git"
                )
                sandbox = winsandbox.new_sandbox(
                    networking=networking, folder_mappers=folder_mappers
                )
            else:
                # Re-raise if it's a different TypeError
                raise

        # Store the sandbox
        self._active_sandboxes[name] = sandbox
        self.logger.info(f"Windows Sandbox {name} created successfully")

        # If computer_server is already present in the host cache, the long
        # first-time wait in the setup step is skipped.
        venv_exists = (
            (host_wsb_env / "venv" / "Lib" / "site-packages" / "computer_server").exists()
            if host_wsb_env
            else False
        )

        # Setup the computer server in the sandbox
        await self._setup_computer_server(sandbox, name, wait_for_venv=(not venv_exists))

        return {
            "success": True,
            "name": name,
            "status": "starting",
            "memory_mb": memory_mb,
            "networking": networking,
            "storage": "ephemeral",
        }

    except Exception as e:
        self.logger.error(f"Failed to create Windows Sandbox {name}: {e}")
        # stack trace
        import traceback

        self.logger.error(f"Stack trace: {traceback.format_exc()}")
        return {"success": False, "error": f"Failed to create sandbox: {str(e)}"}
async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
    """Shut down a tracked sandbox and stop tracking it.

    Args:
        name: Name of the VM to stop
        storage: Ignored for Windows Sandbox

    Returns:
        ``success`` True with ``name`` and ``status`` "stopped" when the
        shutdown call succeeds; otherwise ``success`` False with ``error``.
        If the shutdown call raises, the sandbox stays tracked.
    """
    if name not in self._active_sandboxes:
        return {"success": False, "error": f"Sandbox {name} is not running"}

    try:
        instance = self._active_sandboxes[name]
        instance.shutdown()

        # Forget the sandbox only after shutdown succeeded.
        self._active_sandboxes.pop(name)

        self.logger.info(f"Windows Sandbox {name} stopped successfully")
        outcome = {"success": True, "name": name, "status": "stopped"}
        return outcome
    except Exception as err:
        self.logger.error(f"Failed to stop Windows Sandbox {name}: {err}")
        return {"success": False, "error": f"Failed to stop sandbox: {str(err)}"}
async def update_vm(
    self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None
) -> Dict[str, Any]:
    """Report that configuration updates are unsupported.

    Windows Sandbox cannot be reconfigured while running, so this always
    returns a failure result without inspecting its arguments.

    Args:
        name: Name of the VM to update (unused)
        update_opts: Dictionary of update options (unused)
        storage: Ignored for Windows Sandbox

    Returns:
        Dictionary with ``success`` False and an explanatory ``error``.
    """
    reason = (
        "Windows Sandbox does not support runtime configuration updates. "
        "Please stop and restart the sandbox with new configuration."
    )
    return {"success": False, "error": reason}
async def restart_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
    """Restarting is not supported by this provider.

    Raises:
        NotImplementedError: Always.
    """
    message = "WinSandboxProvider does not support restarting VMs."
    raise NotImplementedError(message)
async def get_ip(self, name: str, storage: Optional[str] = None, retry_delay: int = 2) -> str:
    """Poll `get_vm` until it reports a usable IP address, then return it.

    There is no upper bound on the number of attempts: the coroutine keeps
    polling until an address is available, and errors raised by `get_vm`
    are logged and retried.

    Args:
        name: Name of the VM to get the IP for
        storage: Ignored for Windows Sandbox
        retry_delay: Delay between retries in seconds (default: 2)

    Returns:
        The first non-empty IP address that is not "unknown" and does not
        start with "0.0.0.0".
    """
    attempt = 0

    while True:
        attempt += 1

        # The first attempt is silent; later ones announce themselves.
        if attempt > 1:
            self.logger.info(
                f"Waiting for Windows Sandbox {name} IP address (attempt {attempt})..."
            )

        try:
            info = await self.get_vm(name, storage=storage)

            candidate = info.get("ip_address", None)
            if candidate and candidate != "unknown" and not candidate.startswith("0.0.0.0"):
                self.logger.info(f"Got valid Windows Sandbox IP address: {candidate}")
                return candidate

            # No usable address yet: explain why before sleeping.
            state = info.get("status", "unknown")
            if state == "running":
                self.logger.info(
                    "Windows Sandbox is running but no valid IP address yet. Waiting..."
                )
            else:
                self.logger.info(
                    f"Windows Sandbox is not running yet (status: {state}). Waiting..."
                )

        except Exception as err:
            self.logger.warning(
                f"Error getting Windows Sandbox {name} IP: {err}, continuing to wait..."
            )

        await asyncio.sleep(retry_delay)

        # Extra progress line on every tenth attempt.
        if attempt % 10 == 0:
            self.logger.info(
                f"Still waiting for Windows Sandbox {name} IP after {attempt} attempts..."
            )
async def _setup_computer_server(
self, sandbox, name: str, visible: bool = False, wait_for_venv: bool = True
):
"""Setup the computer server in the Windows Sandbox using RPyC.
Args:
sandbox: The Windows Sandbox instance
name: Name of the sandbox
visible: Whether the opened process should be visible (default: False)
"""
try:
self.logger.info(f"Setting up computer server in sandbox {name}...")
# Read the PowerShell setup script
script_path = os.path.join(os.path.dirname(__file__), "setup_script.ps1")
with open(script_path, "r", encoding="utf-8") as f:
setup_script_content = f.read()
# Write the setup script to the sandbox using RPyC
script_dest_path = r"C:\Users\WDAGUtilityAccount\setup_cua.ps1"
self.logger.info(f"Writing setup script to {script_dest_path}")
with sandbox.rpyc.builtin.open(script_dest_path, "w") as f:
f.write(setup_script_content)
# Execute the PowerShell script in the background
self.logger.info("Executing setup script in sandbox...")
# Use subprocess to run PowerShell script
import subprocess
powershell_cmd = [
"powershell.exe",
"-ExecutionPolicy",
"Bypass",
"-NoExit", # Keep window open after script completes
"-File",
script_dest_path,
]
# Set creation flags based on visibility preference
if visible:
# CREATE_NEW_CONSOLE - creates a new console window (visible)
creation_flags = 0x00000010
else:
creation_flags = 0x08000000 # CREATE_NO_WINDOW
# Start the process using RPyC
process = sandbox.rpyc.modules.subprocess.Popen(
powershell_cmd, creationflags=creation_flags, shell=False
)
if wait_for_venv:
print(
"Waiting for venv to be created for the first time setup of Windows Sandbox..."
)
print("This may take a minute...")
await asyncio.sleep(120)
ip = await self.get_ip(name)
self.logger.info(f"Sandbox IP: {ip}")
self.logger.info(
f"Setup script started in background in sandbox {name} with PID: {process.pid}"
)
except Exception as e:
self.logger.error(f"Failed to setup computer server in sandbox {name}: {e}")
import traceback
self.logger.error(f"Stack trace: {traceback.format_exc()}")
```
--------------------------------------------------------------------------------
/libs/python/computer/computer/providers/lume/provider.py:
--------------------------------------------------------------------------------
```python
"""Lume VM provider implementation using curl commands.
This provider uses direct curl commands to interact with the Lume API,
removing the dependency on the pylume Python package.
"""
import asyncio
import json
import logging
import os
import re
import subprocess
import urllib.parse
from typing import Any, Dict, List, Optional, Tuple
from ...logger import Logger, LogLevel
from ..base import BaseVMProvider, VMProviderType
from ..lume_api import (
HAS_CURL,
lume_api_get,
lume_api_pull,
lume_api_run,
lume_api_stop,
lume_api_update,
parse_memory,
)
# Setup logging
logger = logging.getLogger(__name__)
class LumeProvider(BaseVMProvider):
"""Lume VM provider implementation using direct curl commands.
This provider uses curl to interact with the Lume API server,
removing the dependency on the pylume Python package.
"""
def __init__(
    self,
    provider_port: int = 7777,
    host: str = "localhost",
    storage: Optional[str] = None,
    verbose: bool = False,
    ephemeral: bool = False,
):
    """Create a provider that talks to a Lume API server.

    Args:
        provider_port: Port for the Lume API server (default: 7777)
        host: Host to use for API connections (default: localhost)
        storage: Path to store VM data
        verbose: Enable verbose logging
        ephemeral: Stored on the instance; per the original inline comment,
            VMs are deleted after stopping when True (default: False)

    Raises:
        ImportError: If curl is reported as unavailable (``HAS_CURL`` is falsy).
    """
    if not HAS_CURL:
        raise ImportError(
            "curl is required for LumeProvider. "
            "Please ensure it is installed and in your PATH."
        )

    # Connection settings for the Lume API server.
    self.host = host
    self.port = provider_port
    self.api_base_url = "http://{}:{}".format(self.host, self.port)

    # VM behaviour settings.
    self.storage = storage
    self.ephemeral = ephemeral
    self.verbose = verbose

    self.logger = logging.getLogger(__name__)
@property
def provider_type(self) -> VMProviderType:
    """Identify this provider as the Lume backend."""
    lume_kind = VMProviderType.LUME
    return lume_kind
async def __aenter__(self):
"""Enter async context manager."""
# No initialization needed, just return self
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit async context manager."""
# No cleanup needed
pass
def _lume_api_get(
self, vm_name: str = "", storage: Optional[str] = None, debug: bool = False
) -> Dict[str, Any]:
"""Get VM information using shared lume_api function.
Args:
vm_name: Optional name of the VM to get info for.
If empty, lists all VMs.
storage: Optional storage path override. If provided, this will be used instead of self.storage
debug: Whether to show debug output
Returns:
Dictionary with VM status information parsed from JSON response
"""
# Use the shared implementation from lume_api module
return lume_api_get(
vm_name=vm_name,
host=self.host,
port=self.port,
storage=storage if storage is not None else self.storage,
debug=debug,
verbose=self.verbose,
)
def _lume_api_run(
self, vm_name: str, run_opts: Dict[str, Any], debug: bool = False
) -> Dict[str, Any]:
"""Run a VM using shared lume_api function.
Args:
vm_name: Name of the VM to run
run_opts: Dictionary of run options
debug: Whether to show debug output
Returns:
Dictionary with API response or error information
"""
# Use the shared implementation from lume_api module
return lume_api_run(
vm_name=vm_name,
host=self.host,
port=self.port,
run_opts=run_opts,
storage=self.storage,
debug=debug,
verbose=self.verbose,
)
def _lume_api_stop(self, vm_name: str, debug: bool = False) -> Dict[str, Any]:
"""Stop a VM using shared lume_api function.
Args:
vm_name: Name of the VM to stop
debug: Whether to show debug output
Returns:
Dictionary with API response or error information
"""
# Use the shared implementation from lume_api module
return lume_api_stop(
vm_name=vm_name,
host=self.host,
port=self.port,
storage=self.storage,
debug=debug,
verbose=self.verbose,
)
def _lume_api_update(
self, vm_name: str, update_opts: Dict[str, Any], debug: bool = False
) -> Dict[str, Any]:
"""Update VM configuration using shared lume_api function.
Args:
vm_name: Name of the VM to update
update_opts: Dictionary of update options
debug: Whether to show debug output
Returns:
Dictionary with API response or error information
"""
# Use the shared implementation from lume_api module
return lume_api_update(
vm_name=vm_name,
host=self.host,
port=self.port,
update_opts=update_opts,
storage=self.storage,
debug=debug,
verbose=self.verbose,
)
async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
    """Look up one VM and normalise the API answer into a status dictionary.

    Args:
        name: Name of the VM to look up.
        storage: Storage path for this call. ``None`` means the provider's
            default storage path is used.

    Returns:
        A dictionary that carries at least ``name`` and ``status``:

        * curl unavailable: status ``"unavailable"`` plus ``error``;
        * the API answer contains ``error``: status ``"starting"``,
          ``api_status`` ``"error"`` and the error text;
        * the API reports ``"stopped"``: the raw API fields merged over
          ``name``/``status``;
        * otherwise: ``ip_address``, ``vnc_url`` and ``api_status`` ``"ok"``
          plus every API field not already present;
        * any exception during the lookup: status ``"initializing"`` plus ``error``.
    """
    if not HAS_CURL:
        logger.error("curl is not available. Cannot get VM status.")
        return {"name": name, "status": "unavailable", "error": "curl is not available"}

    try:
        effective_storage = self.storage if storage is None else storage
        info = self._lume_api_get(
            vm_name=name,
            storage=effective_storage,
            debug=self.verbose,
        )

        # API-level failure: the VM is reported as still starting, so callers
        # should not attempt to connect yet.
        if "error" in info:
            api_error = info["error"]
            logger.debug(f"API request error: {api_error}")
            return {
                "name": name,
                "status": "starting",
                "api_status": "error",
                "error": api_error,
            }

        state = info.get("status", "unknown")

        # A stopped VM has no IP to wait for; hand back the API fields as-is.
        if state == "stopped":
            logger.info(f"VM {name} is in '{state}' state - not waiting for IP address")
            return {"name": name, "status": state, **info}

        # The API may spell these fields in camelCase or snake_case;
        # camelCase wins when both are present.
        vnc_url = ""
        for field in ("vncUrl", "vnc_url"):
            if field in info:
                vnc_url = info[field]
                break

        ip_address = None
        for field in ("ipAddress", "ip_address"):
            if field in info:
                ip_address = info[field]
                break
        else:
            # Neither spelling present: only logged, the status is left untouched.
            logger.info(
                f"VM {name} is in '{state}' state but no IP address found - reporting as still starting"
            )

        logger.info(f"VM {name} status: {state}")

        result = {
            "name": name,
            "status": state if state else "running",
            "ip_address": ip_address,
            "vnc_url": vnc_url,
            "api_status": "ok",
        }
        # Carry over the remaining API fields without clobbering the ones set above.
        if isinstance(info, dict):
            for field, value in info.items():
                result.setdefault(field, value)
        return result
    except Exception as exc:
        logger.error(f"Failed to get VM status: {exc}")
        # Fallback status: the VM is treated as still initializing.
        return {
            "name": name,
            "status": "initializing",
            "error": f"Failed to get VM status: {str(exc)}",
        }
async def list_vms(self) -> List[Dict[str, Any]]:
    """List all available VMs.

    The collection endpoint is queried through ``self._lume_api_get`` with no
    VM name. Two response shapes are accepted: a bare JSON array of VM
    objects, or a dictionary wrapping the array under a ``"vms"`` key.

    Returns:
        The list of VM dictionaries, or an empty list when the API reported an
        error or returned an unrecognised shape.
    """
    result = self._lume_api_get(debug=self.verbose)

    # A bare array previously fell through both dict checks and was reported
    # as "no VMs"; return it directly.
    if isinstance(result, list):
        return result

    if isinstance(result, dict):
        vms = result.get("vms")
        if isinstance(vms, list):
            return vms
        if "error" in result:
            logger.error(f"Error listing VMs: {result['error']}")

    return []
async def run_vm(
    self, image: str, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None
) -> Dict[str, Any]:
    """Run a VM with the given options, pulling its image first when needed.

    The VM is looked up with ``get_vm``. If that lookup reports an error, or a
    ``"not_found"`` status, the image is pulled with ``pull_vm`` before the run
    request is sent.

    Args:
        image: Image name to pull when the VM does not exist yet.
        name: Name of the VM to run.
        run_opts: Dictionary of run options (memory, cpu, etc.).
        storage: Optional storage path override. ``None`` means the provider's
            default storage path is used.

    Returns:
        The run response from the Lume API, or the error dictionary returned
        by ``pull_vm`` when the pull failed.
    """
    vm_info = await self.get_vm(name, storage=storage)

    # The API helper can report a missing VM either through an "error" entry or
    # through an explicit "not_found" status without one; both need a pull.
    vm_missing = "error" in vm_info or vm_info.get("status") == "not_found"
    if vm_missing:
        self.logger.info(
            f"VM {name} not found, attempting to pull image {image} from registry..."
        )
        pull_result = await self.pull_vm(name=name, image=image, storage=storage)
        if "error" in pull_result:
            self.logger.error(f"Failed to pull VM image: {pull_result['error']}")
            return pull_result  # Surface the pull failure instead of running
        self.logger.info(f"Successfully pulled VM image {image} as {name}")

    self.logger.info(f"Running VM {name} with options: {run_opts}")

    # lume_api_run is imported at module level; it is called directly (rather
    # than via self._lume_api_run) so the per-call storage override is honoured.
    return lume_api_run(
        vm_name=name,
        host=self.host,
        port=self.port,
        run_opts=run_opts,
        storage=storage if storage is not None else self.storage,
        debug=self.verbose,
        verbose=self.verbose,
    )
async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
    """Stop a running VM, deleting it afterwards in ephemeral mode.

    Args:
        name: Name of the VM to stop.
        storage: Optional storage path override. When given, it is used for
            both the stop request and the ephemeral delete. ``None`` means the
            provider's default storage path is used.

    Returns:
        The stop response. In ephemeral mode, after a successful stop, it is
        extended with ``deleted`` (True only when the delete reported no error)
        and either ``delete_result`` or ``delete_error``.
    """
    if storage is not None and storage != self.storage:
        # self._lume_api_stop always sends self.storage, so an explicit override
        # has to go through the shared helper directly.
        stop_result = lume_api_stop(
            vm_name=name,
            host=self.host,
            port=self.port,
            storage=storage,
            debug=self.verbose,
            verbose=self.verbose,
        )
    else:
        stop_result = self._lume_api_stop(name, debug=self.verbose)

    # Log ephemeral status for debugging
    self.logger.info(f"Ephemeral mode status: {self.ephemeral}")

    stop_succeeded = stop_result.get("success", False) or "error" not in stop_result
    if not (self.ephemeral and stop_succeeded):
        return stop_result

    self.logger.info(f"Ephemeral mode enabled - deleting VM {name} after stopping")
    try:
        delete_result = await self.delete_vm(name, storage=storage)
    except Exception as e:
        self.logger.error(f"Failed to delete ephemeral VM {name}: {e}")
        # Include the error but still return the stop result
        return {**stop_result, "deleted": False, "delete_error": str(e)}

    # delete_vm reports failures as an "error" entry instead of raising, so the
    # flag must be derived from the result rather than assumed.
    delete_failed = isinstance(delete_result, dict) and "error" in delete_result
    combined = {**stop_result, "deleted": not delete_failed, "delete_result": delete_result}
    if delete_failed:
        self.logger.error(
            f"Failed to delete ephemeral VM {name}: {delete_result['error']}"
        )
        combined["delete_error"] = str(delete_result["error"])
    return combined
async def pull_vm(
    self,
    name: str,
    image: str,
    storage: Optional[str] = None,
    registry: str = "ghcr.io",
    organization: str = "trycua",
    pull_opts: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Pull a VM image from the registry.

    Args:
        name: Name for the VM after pulling.
        image: The image name to pull (e.g. 'macos-sequoia-cua:latest').
        storage: Optional storage path override. ``None`` means the provider's
            default storage path is used.
        registry: Registry to pull from (default: ghcr.io).
        organization: Organization in registry (default: trycua).
        pull_opts: Accepted for interface compatibility; not used by this
            implementation.

    Returns:
        The pull response from the Lume API. Failures of the pull itself are
        reported as a dictionary with an ``error`` entry rather than raised.

    Raises:
        ValueError: If ``image`` is empty.
    """
    if not image:
        raise ValueError("Image parameter is required for pull_vm")

    effective_storage = storage if storage is not None else self.storage

    self.logger.info(f"Pulling VM image '{image}' as '{name}'")
    self.logger.info("You can check the pull progress using: lume logs -f")
    # Report the storage that is actually sent, including the provider default.
    self.logger.debug(f"Pull storage location: {effective_storage or 'default'}")

    try:
        # lume_api_pull is imported at module level.
        result = lume_api_pull(
            image=image,
            name=name,
            host=self.host,
            port=self.port,
            storage=effective_storage,
            registry=registry,
            organization=organization,
            debug=self.verbose,
            verbose=self.verbose,
        )

        if "error" in result:
            self.logger.error(f"Failed to pull VM image: {result['error']}")
            return result

        self.logger.info(f"Successfully pulled VM image '{image}' as '{name}'")
        return result
    except Exception as e:
        self.logger.error(f"Failed to pull VM image '{image}': {e}")
        return {"error": f"Failed to pull VM: {str(e)}"}
async def delete_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
    """Permanently remove a VM.

    Args:
        name: Name of the VM to delete.
        storage: Storage path for this call. ``None`` means the provider's
            default storage path is used.

    Returns:
        The delete response from the API helper. Any exception raised along
        the way is converted into a dictionary with an ``error`` entry.
    """
    self.logger.info(f"Deleting VM {name}...")

    try:
        # Imported here because the module-level import list does not include it.
        from ..lume_api import lume_api_delete

        effective_storage = self.storage if storage is None else storage
        outcome = lume_api_delete(
            vm_name=name,
            host=self.host,
            port=self.port,
            storage=effective_storage,
            debug=self.verbose,
            verbose=self.verbose,
        )

        if "error" not in outcome:
            self.logger.info(f"Successfully deleted VM '{name}'")
        else:
            self.logger.error(f"Failed to delete VM: {outcome['error']}")
        return outcome
    except Exception as exc:
        self.logger.error(f"Failed to delete VM '{name}': {exc}")
        return {"error": f"Failed to delete VM: {str(exc)}"}
async def update_vm(
    self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None
) -> Dict[str, Any]:
    """Update VM configuration.

    Args:
        name: Name of the VM to update.
        update_opts: Dictionary of update options (e.g. cpu, memory).
        storage: Optional storage path override. ``None`` means the provider's
            default storage path is used.

    Returns:
        Dictionary with the API response or error information.
    """
    if storage is not None and storage != self.storage:
        # self._lume_api_update always sends self.storage, so an explicit
        # override has to go through the shared helper directly.
        return lume_api_update(
            vm_name=name,
            host=self.host,
            port=self.port,
            update_opts=update_opts,
            storage=storage,
            debug=self.verbose,
            verbose=self.verbose,
        )
    return self._lume_api_update(name, update_opts, debug=self.verbose)
async def restart_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
    """Restarting is not implemented for this provider; every call raises.

    Raises:
        NotImplementedError: Always.
    """
    message = "LumeProvider does not support restarting VMs."
    raise NotImplementedError(message)
async def get_ip(self, name: str, storage: Optional[str] = None, retry_delay: int = 2) -> str:
    """Poll ``get_vm`` until the VM reports a usable IP address.

    There is no attempt limit: the coroutine keeps polling until an address
    is available, sleeping ``retry_delay`` seconds between attempts.

    Args:
        name: Name of the VM whose IP is wanted.
        storage: Optional storage path override passed to ``get_vm``.
        retry_delay: Seconds to sleep between attempts (default: 2).

    Returns:
        The first ``ip_address`` value that is non-empty, not ``"unknown"``
        and does not start with ``"0.0.0.0"``.
    """
    attempt = 0
    while True:
        attempt += 1

        # The first attempt is silent; later ones announce the retry.
        if attempt > 1:
            self.logger.info(f"Waiting for VM {name} IP address (attempt {attempt})...")

        try:
            info = await self.get_vm(name, storage=storage)
            candidate = info.get("ip_address", None)
            usable = (
                bool(candidate)
                and candidate != "unknown"
                and not candidate.startswith("0.0.0.0")
            )
            if usable:
                self.logger.info(f"Got valid VM IP address: {candidate}")
                return candidate

            state = info.get("status", "unknown")
            if state == "running":
                self.logger.info("VM is running but no valid IP address yet. Waiting...")
            else:
                self.logger.info(f"VM is not running yet (status: {state}). Waiting...")
        except Exception as exc:
            # Lookup failures are treated as transient; keep polling.
            self.logger.warning(f"Error getting VM {name} IP: {exc}, continuing to wait...")

        await asyncio.sleep(retry_delay)

        # Extra progress line on every tenth attempt.
        if attempt % 10 == 0:
            self.logger.info(
                f"Still waiting for VM {name} IP after {attempt} attempts..."
            )
```
--------------------------------------------------------------------------------
/libs/python/computer/computer/providers/lume_api.py:
--------------------------------------------------------------------------------
```python
"""Shared API utilities for Lume and Lumier providers.
This module contains shared functions for interacting with the Lume API,
used by both the LumeProvider and LumierProvider classes.
"""
import json
import logging
import subprocess
import urllib.parse
from typing import Any, Dict, List, Optional
from computer.utils import safe_join
# Setup logging
logger = logging.getLogger(__name__)
# Check if curl is available.
# OSError is caught as well as SubprocessError: it includes FileNotFoundError
# (curl not installed) and PermissionError (curl present but not executable),
# so an unusable binary disables the feature instead of breaking the import.
try:
    subprocess.run(["curl", "--version"], capture_output=True, check=True)
    HAS_CURL = True
except (subprocess.SubprocessError, OSError):
    HAS_CURL = False
def lume_api_get(
    vm_name: str,
    host: str,
    port: int,
    storage: Optional[str] = None,
    debug: bool = False,
    verbose: bool = False,
) -> Dict[str, Any]:
    """Use curl to get VM information from the Lume API.

    Args:
        vm_name: Name of the VM to get info for. An empty string addresses the
            VM collection instead of a single VM.
        host: API host.
        port: API port.
        storage: Storage path for the VM; sent URL-encoded as the ``storage``
            query parameter when set.
        debug: Log the parsed result at INFO level.
        verbose: Same effect as ``debug``.

    Returns:
        The decoded JSON response on success (whatever JSON value the server
        sent is returned unchanged). On failure, a dictionary with an ``error``
        message and ``status`` set to ``"unknown"``; curl failures additionally
        carry ``curl_code`` and ``vm_name``. A non-JSON body that contains
        "Virtual machine not found" yields ``{"status": "not_found", ...}``.
    """
    storage_param = ""
    if storage:
        storage_param = "?storage=" + urllib.parse.quote(storage, safe="")

    api_url = f"http://{host}:{port}/lume/vms/{vm_name}{storage_param}"

    # --connect-timeout: time allowed to establish the connection (15 seconds)
    # --max-time: time allowed for the whole operation (20 seconds)
    # -s: silent; -f: exit non-zero (code 22) on HTTP error statuses
    cmd = ["curl", "--connect-timeout", "15", "--max-time", "20", "-s", "-f", api_url]
    logger.debug(f"Executing API request: {' '.join(cmd)}")

    try:
        # An argument list (no shell) hands the URL to curl verbatim, so special
        # characters need no quoting and are never interpreted by a shell.
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (subprocess.SubprocessError, OSError) as e:
        # OSError covers a missing or non-executable curl binary.
        logger.error(f"Failed to execute API request: {e}")
        return {"error": f"Failed to execute API request: {str(e)}", "status": "unknown"}

    if result.returncode != 0:
        # Map common curl exit codes to helpful messages
        curl_messages = {
            7: "Failed to connect to the API server - it might still be starting up",
            22: "HTTP error returned from API server",
            28: "Operation timeout - the API server is taking too long to respond",
            52: "Empty reply from server - the API server is starting but not fully ready yet",
            56: "Network problem during data transfer - check container networking",
        }
        curl_error = curl_messages.get(result.returncode, "Unknown error")

        # Only log at debug level to reduce noise during retries
        logger.debug(f"API request failed with code {result.returncode}: {curl_error}")
        return {
            "error": f"API request failed: {curl_error}",
            "curl_code": result.returncode,
            "vm_name": vm_name,
            "status": "unknown",  # The actual status is not known due to the API error
        }

    if not (result.stdout and result.stdout.strip()):
        return {"error": "Empty response from API", "status": "unknown"}

    try:
        vm_status = json.loads(result.stdout)
    except json.JSONDecodeError as e:
        logger.warning(f"Invalid JSON response: {e}")
        if "Virtual machine not found" in result.stdout:
            return {"status": "not_found", "message": "VM not found in Lume API"}
        return {
            "error": f"Invalid JSON response: {result.stdout[:100]}...",
            "status": "unknown",
        }

    if debug or verbose:
        # Only a JSON object has a "status" field; other JSON values (e.g. an
        # array of VMs) must not be queried with .get().
        if isinstance(vm_status, dict):
            logger.info(f"Successfully parsed VM status: {vm_status.get('status', 'unknown')}")
        else:
            logger.info(f"Successfully parsed API response of type {type(vm_status).__name__}")
    return vm_status
def lume_api_run(
    vm_name: str,
    host: str,
    port: int,
    run_opts: Dict[str, Any],
    storage: Optional[str] = None,
    debug: bool = False,
    verbose: bool = False,
) -> Dict[str, Any]:
    """Run a VM using curl.

    Args:
        vm_name: Name of the VM to run.
        host: API host.
        port: API port.
        run_opts: Dictionary of run options. ``cpu``, ``memory``, ``storage``
            and ``shared_directories`` are forwarded when present.
        storage: Storage path for the VM; takes precedence over
            ``run_opts["storage"]``.
        debug: Accepted for signature parity with the other helpers; not used here.
        verbose: Accepted for signature parity with the other helpers; not used here.

    Returns:
        The decoded JSON response, a generic success dictionary when the body
        is empty or not JSON, or a dictionary with an ``error`` entry when curl
        failed or could not be started.
    """
    api_url = f"http://{host}:{port}/lume/vms/{vm_name}/run"

    # Build the JSON payload from the options that were actually supplied.
    payload = {}
    if "cpu" in run_opts:
        payload["cpu"] = run_opts["cpu"]
    if "memory" in run_opts:
        payload["memory"] = run_opts["memory"]
    if storage:
        payload["storage"] = storage
    elif "storage" in run_opts:
        payload["storage"] = run_opts["storage"]
    if "shared_directories" in run_opts and run_opts["shared_directories"]:
        # The API expects the camelCase spelling of this field.
        payload["sharedDirectories"] = run_opts["shared_directories"]

    logger.debug(f"API payload: {json.dumps(payload, indent=2)}")

    cmd = [
        "curl",
        "--connect-timeout",
        "30",
        "--max-time",
        "30",
        "-s",
        "-X",
        "POST",
        "-H",
        "Content-Type: application/json",
        "-d",
        json.dumps(payload),
        api_url,
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (subprocess.SubprocessError, OSError) as e:
        # OSError covers a missing or non-executable curl binary, which would
        # otherwise escape the documented error-dictionary contract.
        logger.error(f"Failed to execute run request: {e}")
        return {"error": f"Failed to execute run request: {str(e)}"}

    if result.returncode != 0:
        logger.warning(f"API request failed with code {result.returncode}: {result.stderr}")
        return {"error": f"API request failed: {result.stderr}"}

    if not (result.stdout and result.stdout.strip()):
        return {"success": True, "message": "VM started successfully"}

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        # Hand back the raw body when it is not valid JSON
        return {
            "success": True,
            "message": "VM started successfully",
            "raw_response": result.stdout,
        }
def lume_api_stop(
    vm_name: str,
    host: str,
    port: int,
    storage: Optional[str] = None,
    debug: bool = False,
    verbose: bool = False,
) -> Dict[str, Any]:
    """Stop a VM using curl.

    Args:
        vm_name: Name of the VM to stop.
        host: API host.
        port: API port.
        storage: Storage path for the VM; included in the JSON payload when set.
        debug: Log the curl command line at INFO level.
        verbose: Same effect as ``debug``.

    Returns:
        The decoded JSON response, a generic success dictionary when the body
        is empty or not JSON, or a dictionary with an ``error`` entry when curl
        failed or could not be started.
    """
    api_url = f"http://{host}:{port}/lume/vms/{vm_name}/stop"

    payload = {}
    if storage:
        payload["storage"] = storage

    cmd = [
        "curl",
        "--connect-timeout",
        "15",
        "--max-time",
        "20",
        "-s",
        "-X",
        "POST",
        "-H",
        "Content-Type: application/json",
        "-d",
        json.dumps(payload),
        api_url,
    ]

    try:
        if debug or verbose:
            logger.info(f"Executing: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (subprocess.SubprocessError, OSError) as e:
        # OSError covers a missing or non-executable curl binary, which would
        # otherwise escape the documented error-dictionary contract.
        logger.error(f"Failed to execute stop request: {e}")
        return {"error": f"Failed to execute stop request: {str(e)}"}

    if result.returncode != 0:
        logger.warning(f"API request failed with code {result.returncode}: {result.stderr}")
        return {"error": f"API request failed: {result.stderr}"}

    if not (result.stdout and result.stdout.strip()):
        return {"success": True, "message": "VM stopped successfully"}

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        # Hand back the raw body when it is not valid JSON
        return {
            "success": True,
            "message": "VM stopped successfully",
            "raw_response": result.stdout,
        }
def lume_api_update(
    vm_name: str,
    host: str,
    port: int,
    update_opts: Dict[str, Any],
    storage: Optional[str] = None,
    debug: bool = False,
    verbose: bool = False,
) -> Dict[str, Any]:
    """Update VM settings using curl.

    Args:
        vm_name: Name of the VM to update.
        host: API host.
        port: API port.
        update_opts: Dictionary of update options. ``cpu`` and ``memory`` are
            forwarded when present.
        storage: Storage path for the VM; included in the JSON payload when set.
        debug: Log the curl command line at INFO level.
        verbose: Accepted for signature parity with the other helpers; not used here.

    Returns:
        The decoded JSON response, a generic success dictionary when the body
        is empty or not JSON, or a dictionary with an ``error`` entry when curl
        failed or could not be started.
    """
    api_url = f"http://{host}:{port}/lume/vms/{vm_name}/update"

    payload = {}
    if "cpu" in update_opts:
        payload["cpu"] = update_opts["cpu"]
    if "memory" in update_opts:
        payload["memory"] = update_opts["memory"]
    if storage:
        payload["storage"] = storage

    cmd = [
        "curl",
        "--connect-timeout",
        "15",
        "--max-time",
        "20",
        "-s",
        "-X",
        "POST",
        "-H",
        "Content-Type: application/json",
        "-d",
        json.dumps(payload),
        api_url,
    ]

    try:
        if debug:
            logger.info(f"Executing: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (subprocess.SubprocessError, OSError) as e:
        # OSError covers a missing or non-executable curl binary, which would
        # otherwise escape the documented error-dictionary contract.
        logger.error(f"Failed to execute update request: {e}")
        return {"error": f"Failed to execute update request: {str(e)}"}

    if result.returncode != 0:
        logger.warning(f"API request failed with code {result.returncode}: {result.stderr}")
        return {"error": f"API request failed: {result.stderr}"}

    if not (result.stdout and result.stdout.strip()):
        return {"success": True, "message": "VM updated successfully"}

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        # Hand back the raw body when it is not valid JSON
        return {
            "success": True,
            "message": "VM updated successfully",
            "raw_response": result.stdout,
        }
def lume_api_pull(
    image: str,
    name: str,
    host: str,
    port: int,
    storage: Optional[str] = None,
    registry: str = "ghcr.io",
    organization: str = "trycua",
    debug: bool = False,
    verbose: bool = False,
) -> Dict[str, Any]:
    """Pull a VM image from a registry using curl.

    Args:
        image: Name/tag of the image to pull.
        name: Name to give the VM after pulling.
        host: API host.
        port: API port.
        storage: Storage path for the VM; included in the JSON payload when set.
        registry: Registry to pull from (default: ghcr.io).
        organization: Organization in registry (default: trycua).
        debug: Accepted for signature parity with the other helpers; not used here.
        verbose: When True, curl is run without ``-s`` (silent mode).

    Returns:
        The decoded JSON response, a generic success dictionary when the body
        is not JSON, or a dictionary with an ``error`` entry when curl failed
        or could not be started.
    """
    pull_payload = {
        "image": image,
        "name": name,  # Always used as the target VM name
        "registry": registry,
        "organization": organization,
    }
    if storage:
        pull_payload["storage"] = storage

    pull_cmd = ["curl"]
    if not verbose:
        pull_cmd.append("-s")
    pull_cmd.extend(
        [
            "-X",
            "POST",
            "-H",
            "Content-Type: application/json",
            "-d",
            json.dumps(pull_payload),
            f"http://{host}:{port}/lume/pull",
        ]
    )

    logger.debug(f"Executing API request: {' '.join(pull_cmd)}")

    try:
        result = subprocess.run(pull_cmd, capture_output=True, text=True)
    except (subprocess.SubprocessError, OSError) as e:
        # OSError covers a missing or non-executable curl binary, which would
        # otherwise escape the documented error-dictionary contract.
        error_msg = f"Failed to execute pull command: {str(e)}"
        logger.error(error_msg)
        return {"error": error_msg}

    if result.returncode != 0:
        error_msg = f"Failed to pull VM {name}: {result.stderr}"
        logger.error(error_msg)
        return {"error": error_msg}

    try:
        response = json.loads(result.stdout)
    except json.JSONDecodeError:
        if result.stdout:
            logger.info(f"Pull response: {result.stdout}")
        return {"success": True, "message": f"Successfully initiated pull for VM {name}"}

    logger.info(f"Successfully initiated pull for VM {name}")
    return response
def lume_api_delete(
    vm_name: str,
    host: str,
    port: int,
    storage: Optional[str] = None,
    debug: bool = False,
    verbose: bool = False,
) -> Dict[str, Any]:
    """Delete a VM using curl.

    Args:
        vm_name: Name of the VM to delete.
        host: API host.
        port: API port.
        storage: Storage path for the VM; sent URL-encoded as the ``storage``
            query parameter when set.
        debug: Accepted for signature parity with the other helpers; not used here.
        verbose: Accepted for signature parity with the other helpers; not used here.

    Returns:
        The decoded JSON response, a generic success dictionary when the body
        is empty or not JSON, or a dictionary with an ``error`` entry when curl
        failed (then also ``curl_code``, ``vm_name`` and ``storage``) or could
        not be started.
    """
    storage_param = ""
    if storage:
        storage_param = "?storage=" + urllib.parse.quote(storage, safe="")

    api_url = f"http://{host}:{port}/lume/vms/{vm_name}{storage_param}"

    # DELETE uses much longer timeouts than the other requests.
    cmd = [
        "curl",
        "--connect-timeout",
        "6000",
        "--max-time",
        "5000",
        "-s",
        "-X",
        "DELETE",
        api_url,
    ]
    logger.debug(f"Executing API request: {' '.join(cmd)}")

    try:
        # An argument list (no shell) hands the URL to curl verbatim, so special
        # characters need no quoting and are never interpreted by a shell.
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (subprocess.SubprocessError, OSError) as e:
        # OSError covers a missing or non-executable curl binary.
        logger.error(f"Failed to execute delete request: {e}")
        return {"error": f"Failed to execute delete request: {str(e)}"}

    if result.returncode != 0:
        # Map common curl exit codes to helpful messages
        curl_messages = {
            7: "Failed to connect to the API server - it might still be starting up",
            22: "HTTP error returned from API server",
            28: "Operation timeout - the API server is taking too long to respond",
            52: "Empty reply from server - the API server is starting but not fully ready yet",
            56: "Network problem during data transfer - check container networking",
        }
        curl_error = curl_messages.get(result.returncode, "Unknown error")

        # Only log at debug level to reduce noise during retries
        logger.debug(f"API request failed with code {result.returncode}: {curl_error}")
        return {
            "error": f"API request failed: {curl_error}",
            "curl_code": result.returncode,
            "vm_name": vm_name,
            "storage": storage,
        }

    if not (result.stdout and result.stdout.strip()):
        return {"success": True, "message": "VM deleted successfully"}

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        # Hand back the raw body when it is not valid JSON
        return {
            "success": True,
            "message": "VM deleted successfully",
            "raw_response": result.stdout,
        }
def parse_memory(memory_str: str) -> int:
    """Parse a memory size into an integer number of megabytes.

    Integers are returned unchanged (already MB). Strings consist of a number,
    optionally with a decimal part, followed by an optional unit. Whitespace
    around the number is ignored and units are case-insensitive.

    Supported units: none / ``M`` / ``MB`` / ``MiB`` (megabytes), ``K`` / ``KB``
    / ``KiB``, ``G`` / ``GB`` / ``GiB`` and ``T`` / ``TB`` / ``TiB``, all in
    steps of 1024.

    Examples:
        "8GB" -> 8192
        "1024MB" -> 1024
        "512" -> 512
        "8 GB" -> 8192
        "1.5GB" -> 1536
        "2TB" -> 2097152

    Returns:
        Memory value in MB (fractions are truncated). Unparseable input logs a
        warning and yields the 8 GB default (8192).
    """
    import re

    if isinstance(memory_str, int):
        return memory_str

    if isinstance(memory_str, str):
        # Number (optionally decimal), optional whitespace, optional unit letters.
        match = re.match(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]*)", memory_str)
        if match:
            number, unit = match.groups()
            # Multipliers that convert each unit to megabytes.
            mb_per_unit = {
                "": 1,
                "M": 1,
                "MB": 1,
                "MIB": 1,
                "K": 1 / 1024,
                "KB": 1 / 1024,
                "KIB": 1 / 1024,
                "G": 1024,
                "GB": 1024,
                "GIB": 1024,
                "T": 1024 * 1024,
                "TB": 1024 * 1024,
                "TIB": 1024 * 1024,
            }
            factor = mb_per_unit.get(unit.upper())
            if factor is not None:
                return int(float(number) * factor)

    # Default fallback
    logger.warning(f"Could not parse memory string '{memory_str}', using 8GB default")
    return 8192  # Default to 8GB
```
--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/handlers/linux.py:
--------------------------------------------------------------------------------
```python
"""
Linux implementation of automation and accessibility handlers.
This implementation attempts to use pyautogui for GUI automation when available.
If running in a headless environment without X11, it will fall back to simulated responses.
To use GUI automation in a headless environment:
1. Install Xvfb: sudo apt-get install xvfb
2. Run with virtual display: xvfb-run python -m computer_server
"""
import asyncio
import base64
import json
import logging
import os
import subprocess
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple
# Configure logger
logger = logging.getLogger(__name__)
# Try to import pyautogui, but don't fail if it's not available
# This allows the server to run in headless environments
try:
import pyautogui
pyautogui.FAILSAFE = False
logger.info("pyautogui successfully imported, GUI automation available")
except Exception as e:
logger.warning(f"pyautogui import failed: {str(e)}. GUI operations will be simulated.")
from pynput.keyboard import Controller as KeyboardController
from pynput.keyboard import Key
from pynput.mouse import Button
from pynput.mouse import Controller as MouseController
from .base import BaseAccessibilityHandler, BaseAutomationHandler
class LinuxAccessibilityHandler(BaseAccessibilityHandler):
"""Linux implementation of accessibility handler.

The accessibility tree is a fixed placeholder and element search reports
failure. Cursor position and screen size are read through pyautogui, with
fixed fallback values when that call fails.
"""
async def get_accessibility_tree(self) -> Dict[str, Any]:
    """Return a placeholder accessibility tree for the current window.

    No real accessibility data is queried; a fixed single-window tree is built instead.

    Returns:
        Dict[str, Any]: ``success`` set to True and ``tree`` holding a
        childless 1920x1080 "Linux Window" positioned at the origin.
    """
    logger.info(
        "Getting accessibility tree (simulated, no accessibility API available on Linux)"
    )
    placeholder_window = {
        "role": "Window",
        "title": "Linux Window",
        "position": {"x": 0, "y": 0},
        "size": {"width": 1920, "height": 1080},
        "children": [],
    }
    return {"success": True, "tree": placeholder_window}
async def find_element(
    self, role: Optional[str] = None, title: Optional[str] = None, value: Optional[str] = None
) -> Dict[str, Any]:
    """Report that element lookup is unavailable on Linux.

    Args:
        role: Role of the element being searched for (only logged).
        title: Title of the element being searched for (only logged).
        value: Value of the element being searched for (only logged).

    Returns:
        Dict[str, Any]: ``success`` set to False with an explanatory ``message``.
    """
    logger.info(
        f"Finding element with role={role}, title={title}, value={value} (not supported on Linux)"
    )
    unsupported = {"success": False, "message": "Element search not supported on Linux"}
    return unsupported
def get_cursor_position(self) -> Tuple[int, int]:
"""Get the current cursor position.
Returns:
Tuple[int, int]: The x and y coordinates of the cursor position.
Returns (0, 0) if pyautogui is not available.
"""
try:
pos = pyautogui.position()
return pos.x, pos.y
except Exception as e:
logger.warning(f"Failed to get cursor position with pyautogui: {e}")
logger.info("Getting cursor position (simulated)")
return 0, 0
def get_screen_size(self) -> Tuple[int, int]:
"""Get the screen size.
Returns:
Tuple[int, int]: The width and height of the screen in pixels.
Returns (1920, 1080) if pyautogui is not available.
"""
try:
size = pyautogui.size()
return size.width, size.height
except Exception as e:
logger.warning(f"Failed to get screen size with pyautogui: {e}")
logger.info("Getting screen size (simulated)")
return 1920, 1080
class LinuxAutomationHandler(BaseAutomationHandler):
    """Linux automation handler built on pyautogui, with pynput for typing and 2-D scrolling.

    Every action returns a dictionary: ``{"success": True, ...}`` when the call
    completes, or ``{"success": False, "error": <message>}`` when the underlying
    library call raises.
    """

    # pynput controllers, created once when the class body is executed and
    # shared by all instances.
    keyboard = KeyboardController()
    mouse = MouseController()

    # Mouse Actions
    async def mouse_down(
        self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left"
    ) -> Dict[str, Any]:
        """Press and hold a mouse button.

        Args:
            x: X coordinate to move to first. The move only happens when both x and y are given.
            y: Y coordinate to move to first. The move only happens when both x and y are given.
            button: The mouse button to press ("left", "right", or "middle").

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.mouseDown(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def mouse_up(
        self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left"
    ) -> Dict[str, Any]:
        """Release a mouse button.

        Args:
            x: X coordinate to move to first. The move only happens when both x and y are given.
            y: Y coordinate to move to first. The move only happens when both x and y are given.
            button: The mouse button to release ("left", "right", or "middle").

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.mouseUp(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the cursor to the given coordinates.

        Args:
            x: The x coordinate to move to.
            y: The y coordinate to move to.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            pyautogui.moveTo(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Left-click, optionally moving to (x, y) first.

        Args:
            x: X coordinate to click at. The move only happens when both x and y are given.
            y: Y coordinate to click at. The move only happens when both x and y are given.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.click()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Right-click, optionally moving to (x, y) first.

        Args:
            x: X coordinate to click at. The move only happens when both x and y are given.
            y: Y coordinate to click at. The move only happens when both x and y are given.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.rightClick()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def double_click(
        self, x: Optional[int] = None, y: Optional[int] = None
    ) -> Dict[str, Any]:
        """Double-click, optionally moving to (x, y) first.

        Args:
            x: X coordinate to click at. The move only happens when both x and y are given.
            y: Y coordinate to click at. The move only happens when both x and y are given.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            # 0.1 s between the two clicks
            pyautogui.doubleClick(interval=0.1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def click(
        self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left"
    ) -> Dict[str, Any]:
        """Click with the given button, optionally moving to (x, y) first.

        Args:
            x: X coordinate to click at. The move only happens when both x and y are given.
            y: Y coordinate to click at. The move only happens when both x and y are given.
            button: The mouse button to click ("left", "right", or "middle").

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.click(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_to(
        self, x: int, y: int, button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag from the current cursor position to (x, y).

        Args:
            x: The x coordinate to drag to.
            y: The y coordinate to drag to.
            button: The mouse button to hold during the drag.
            duration: Time in seconds the drag should take.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            pyautogui.dragTo(x, y, duration=duration, button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag(
        self, start_x: int, start_y: int, end_x: int, end_y: int, button: str = "left"
    ) -> Dict[str, Any]:
        """Drag from a start point to an end point.

        Args:
            start_x: The starting x coordinate.
            start_y: The starting y coordinate.
            end_x: The ending x coordinate.
            end_y: The ending y coordinate.
            button: The mouse button to hold during the drag.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            pyautogui.moveTo(start_x, start_y)
            # Fixed 0.5 s drag duration
            pyautogui.dragTo(end_x, end_y, duration=0.5, button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_path(
        self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag along a path of coordinates as one continuous gesture.

        The cursor is moved to the first point, the button is pressed once, the
        cursor is dragged through the remaining points, and the button is released
        once at the end. A path with a single point only moves the cursor.

        Args:
            path: A list of (x, y) coordinate tuples defining the drag path.
            button: The mouse button to hold during the drag.
            duration: Time in seconds to take for each segment of the drag.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure
                (including when the path is empty).
        """
        try:
            if not path:
                return {"success": False, "error": "Path is empty"}

            pyautogui.moveTo(*path[0])
            waypoints = path[1:]
            if not waypoints:
                # Nothing to drag to; the cursor move above is all that happens.
                return {"success": True}

            # Hold the button for the whole path. Letting dragTo press and release
            # on every segment would split the gesture into separate drags.
            pyautogui.mouseDown(button=button)
            try:
                for x, y in waypoints:
                    pyautogui.dragTo(x, y, duration=duration, button=button, mouseDownUp=False)
            finally:
                # Always release, so a failure mid-path does not leave the button held.
                pyautogui.mouseUp(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Keyboard Actions
    async def key_down(self, key: str) -> Dict[str, Any]:
        """Press and hold a key.

        Args:
            key: The key to press down.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            pyautogui.keyDown(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def key_up(self, key: str) -> Dict[str, Any]:
        """Release a key.

        Args:
            key: The key to release.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            pyautogui.keyUp(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type the given text.

        Args:
            text: The text to type.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            # use pynput for Unicode support
            self.keyboard.type(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press and release a key.

        Args:
            key: The key to press.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            pyautogui.press(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
        """Press a key combination.

        Args:
            keys: The keys to press together, in order.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            pyautogui.hotkey(*keys)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Scrolling Actions
    async def scroll(self, x: int, y: int) -> Dict[str, Any]:
        """Scroll the mouse wheel horizontally and vertically.

        Args:
            x: The horizontal scroll amount.
            y: The vertical scroll amount.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            # Goes through the pynput mouse controller, which takes both axes.
            self.mouse.scroll(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll down by a number of clicks.

        Args:
            clicks: The number of scroll clicks to perform downward.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            # Negative amount scrolls down
            pyautogui.scroll(-clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll up by a number of clicks.

        Args:
            clicks: The number of scroll clicks to perform upward.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            pyautogui.scroll(clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Screen Actions
    async def screenshot(self) -> Dict[str, Any]:
        """Capture the screen as a base64-encoded PNG.

        Returns:
            Dict[str, Any]: Success status and ``image_data`` (base64 text of the PNG),
                or an error message on failure.
        """
        try:
            from PIL import Image

            screenshot = pyautogui.screenshot()
            if not isinstance(screenshot, Image.Image):
                return {"success": False, "error": "Failed to capture screenshot"}

            # Encode the image to PNG in memory, then to base64 text
            buffered = BytesIO()
            screenshot.save(buffered, format="PNG", optimize=True)
            buffered.seek(0)
            image_data = base64.b64encode(buffered.getvalue()).decode()
            return {"success": True, "image_data": image_data}
        except Exception as e:
            return {"success": False, "error": f"Screenshot error: {str(e)}"}

    async def get_screen_size(self) -> Dict[str, Any]:
        """Get the screen size.

        Returns:
            Dict[str, Any]: Success status and ``size`` with ``width`` and ``height``,
                or an error message on failure.
        """
        try:
            size = pyautogui.size()
            return {"success": True, "size": {"width": size.width, "height": size.height}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_cursor_position(self) -> Dict[str, Any]:
        """Get the current cursor position.

        Returns:
            Dict[str, Any]: Success status and ``position`` with ``x`` and ``y``,
                or an error message on failure.
        """
        try:
            pos = pyautogui.position()
            return {"success": True, "position": {"x": pos.x, "y": pos.y}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Clipboard Actions
    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Read the current clipboard content.

        Returns:
            Dict[str, Any]: Success status and ``content`` (the clipboard text),
                or an error message on failure.
        """
        try:
            import pyperclip

            content = pyperclip.paste()
            return {"success": True, "content": content}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Replace the clipboard content with the given text.

        Args:
            text: The text to place on the clipboard.

        Returns:
            Dict[str, Any]: Success status, plus an error message on failure.
        """
        try:
            import pyperclip

            pyperclip.copy(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Command Execution
    async def run_command(self, command: str) -> Dict[str, Any]:
        """Run a shell command and collect its output.

        Args:
            command: The shell command to execute.

        Returns:
            Dict[str, Any]: Success status, ``stdout``, ``stderr`` and ``return_code``,
                or an error message if the process could not be run. ``success``
                reflects that the process ran, not that it exited with status 0.
        """
        try:
            # Create subprocess
            process = await asyncio.create_subprocess_shell(
                command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )
            # Wait for the subprocess to finish
            stdout, stderr = await process.communicate()
            # Decode leniently: output that is not valid UTF-8 must not discard
            # the whole result (including the return code).
            return {
                "success": True,
                "stdout": stdout.decode(errors="replace") if stdout else "",
                "stderr": stderr.decode(errors="replace") if stderr else "",
                "return_code": process.returncode,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
```
--------------------------------------------------------------------------------
/libs/python/computer/computer/interface/base.py:
--------------------------------------------------------------------------------
```python
"""Base interface for computer control."""
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple
from ..logger import Logger, LogLevel
from .models import CommandResult, MouseButton
class BaseComputerInterface(ABC):
    """Abstract contract that every computer-control interface implements."""

    def __init__(
        self,
        ip_address: str,
        username: str = "lume",
        password: str = "lume",
        api_key: Optional[str] = None,
        vm_name: Optional[str] = None,
    ):
        """Store connection settings and set up the interface logger.

        Args:
            ip_address: IP address of the computer to control
            username: Username for authentication
            password: Password for authentication
            api_key: Optional API key for cloud authentication
            vm_name: Optional VM name for cloud authentication
        """
        # Connection target and credentials
        self.ip_address = ip_address
        self.username = username
        self.password = password

        # Cloud authentication details (may be None)
        self.api_key = api_key
        self.vm_name = vm_name

        self.logger = Logger("cua.interface", LogLevel.NORMAL)

        # Default pause between commands, in seconds (0.0 means no pause)
        self.delay: float = 0.0

    @abstractmethod
    async def wait_for_ready(self, timeout: int = 60) -> None:
        """Block until the interface is ready to accept commands.

        Args:
            timeout: Maximum time to wait in seconds

        Raises:
            TimeoutError: If interface is not ready within timeout
        """

    @abstractmethod
    def close(self) -> None:
        """Shut down the interface connection."""

    def force_close(self) -> None:
        """Forcefully shut down the interface connection.

        The base implementation simply delegates to close(); subclasses may
        override it with a more aggressive cleanup.
        """
        self.close()

    # Mouse Actions
    @abstractmethod
    async def mouse_down(
        self,
        x: Optional[int] = None,
        y: Optional[int] = None,
        button: "MouseButton" = "left",
        delay: Optional[float] = None,
    ) -> None:
        """Press a mouse button and keep it held.

        Args:
            x: X coordinate to press at. If None, uses current cursor position.
            y: Y coordinate to press at. If None, uses current cursor position.
            button: Mouse button to press ('left', 'middle', 'right').
            delay: Optional delay in seconds after the action
        """

    @abstractmethod
    async def mouse_up(
        self,
        x: Optional[int] = None,
        y: Optional[int] = None,
        button: "MouseButton" = "left",
        delay: Optional[float] = None,
    ) -> None:
        """Let go of a held mouse button.

        Args:
            x: X coordinate to release at. If None, uses current cursor position.
            y: Y coordinate to release at. If None, uses current cursor position.
            button: Mouse button to release ('left', 'middle', 'right').
            delay: Optional delay in seconds after the action
        """

    @abstractmethod
    async def left_click(
        self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None
    ) -> None:
        """Click once with the left mouse button.

        Args:
            x: X coordinate to click at. If None, uses current cursor position.
            y: Y coordinate to click at. If None, uses current cursor position.
            delay: Optional delay in seconds after the action
        """

    @abstractmethod
    async def right_click(
        self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None
    ) -> None:
        """Click once with the right mouse button.

        Args:
            x: X coordinate to click at. If None, uses current cursor position.
            y: Y coordinate to click at. If None, uses current cursor position.
            delay: Optional delay in seconds after the action
        """

    @abstractmethod
    async def double_click(
        self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None
    ) -> None:
        """Double-click with the left mouse button.

        Args:
            x: X coordinate to double-click at. If None, uses current cursor position.
            y: Y coordinate to double-click at. If None, uses current cursor position.
            delay: Optional delay in seconds after the action
        """

    @abstractmethod
    async def move_cursor(self, x: int, y: int, delay: Optional[float] = None) -> None:
        """Place the cursor at the given screen coordinates.

        Args:
            x: X coordinate to move cursor to.
            y: Y coordinate to move cursor to.
            delay: Optional delay in seconds after the action
        """

    @abstractmethod
    async def drag_to(
        self,
        x: int,
        y: int,
        button: str = "left",
        duration: float = 0.5,
        delay: Optional[float] = None,
    ) -> None:
        """Drag from the current cursor position to the given coordinates.

        Args:
            x: The x coordinate to drag to
            y: The y coordinate to drag to
            button: The mouse button to use ('left', 'middle', 'right')
            duration: How long the drag should take in seconds
            delay: Optional delay in seconds after the action
        """

    @abstractmethod
    async def drag(
        self,
        path: List[Tuple[int, int]],
        button: str = "left",
        duration: float = 0.5,
        delay: Optional[float] = None,
    ) -> None:
        """Drag the cursor through a sequence of coordinates.

        Args:
            path: List of (x, y) coordinate tuples defining the drag path
            button: The mouse button to use ('left', 'middle', 'right')
            duration: Total time in seconds that the drag operation should take
            delay: Optional delay in seconds after the action
        """

    # Keyboard Actions
    @abstractmethod
    async def key_down(self, key: str, delay: Optional[float] = None) -> None:
        """Press a key and keep it held.

        Args:
            key: The key to press and hold (e.g., 'a', 'shift', 'ctrl').
            delay: Optional delay in seconds after the action.
        """

    @abstractmethod
    async def key_up(self, key: str, delay: Optional[float] = None) -> None:
        """Let go of a key that was pressed earlier.

        Args:
            key: The key to release (e.g., 'a', 'shift', 'ctrl').
            delay: Optional delay in seconds after the action.
        """

    @abstractmethod
    async def type_text(self, text: str, delay: Optional[float] = None) -> None:
        """Type a string of text.

        Args:
            text: The text string to type.
            delay: Optional delay in seconds after the action.
        """

    @abstractmethod
    async def press_key(self, key: str, delay: Optional[float] = None) -> None:
        """Tap (press and release) one key.

        Args:
            key: The key to press (e.g., 'a', 'enter', 'escape').
            delay: Optional delay in seconds after the action.
        """

    @abstractmethod
    async def hotkey(self, *keys: str, delay: Optional[float] = None) -> None:
        """Press several keys together as a keyboard shortcut.

        Args:
            *keys: Variable number of keys to press together (e.g., 'ctrl', 'c').
            delay: Optional delay in seconds after the action.
        """

    # Scrolling Actions
    @abstractmethod
    async def scroll(self, x: int, y: int, delay: Optional[float] = None) -> None:
        """Turn the mouse wheel by the given amounts.

        Args:
            x: Horizontal scroll amount (positive = right, negative = left).
            y: Vertical scroll amount (positive = up, negative = down).
            delay: Optional delay in seconds after the action.
        """

    @abstractmethod
    async def scroll_down(self, clicks: int = 1, delay: Optional[float] = None) -> None:
        """Scroll downward by a number of clicks.

        Args:
            clicks: Number of scroll clicks to perform downward.
            delay: Optional delay in seconds after the action.
        """

    @abstractmethod
    async def scroll_up(self, clicks: int = 1, delay: Optional[float] = None) -> None:
        """Scroll upward by a number of clicks.

        Args:
            clicks: Number of scroll clicks to perform upward.
            delay: Optional delay in seconds after the action.
        """

    # Screen Actions
    @abstractmethod
    async def screenshot(self) -> bytes:
        """Capture the screen.

        Returns:
            Raw bytes of the screenshot image
        """

    @abstractmethod
    async def get_screen_size(self) -> Dict[str, int]:
        """Report the dimensions of the screen.

        Returns:
            Dict with 'width' and 'height' keys
        """

    @abstractmethod
    async def get_cursor_position(self) -> Dict[str, int]:
        """Report where the cursor currently is on screen.

        Returns:
            Dict with 'x' and 'y' keys containing cursor coordinates.
        """

    # Clipboard Actions
    @abstractmethod
    async def copy_to_clipboard(self) -> str:
        """Read what is currently on the clipboard.

        Returns:
            The text content currently stored in the clipboard.
        """

    @abstractmethod
    async def set_clipboard(self, text: str) -> None:
        """Replace the clipboard content with the given text.

        Args:
            text: The text to store in the clipboard.
        """

    # File System Actions
    @abstractmethod
    async def file_exists(self, path: str) -> bool:
        """Tell whether a file is present at the given path.

        Args:
            path: The file path to check.

        Returns:
            True if the file exists, False otherwise.
        """

    @abstractmethod
    async def directory_exists(self, path: str) -> bool:
        """Tell whether a directory is present at the given path.

        Args:
            path: The directory path to check.

        Returns:
            True if the directory exists, False otherwise.
        """

    @abstractmethod
    async def list_dir(self, path: str) -> List[str]:
        """Enumerate the entries of a directory.

        Args:
            path: The directory path to list.

        Returns:
            List of file and directory names in the specified directory.
        """

    @abstractmethod
    async def read_text(self, path: str) -> str:
        """Load a file's content as text.

        Args:
            path: The file path to read from.

        Returns:
            The text content of the file.
        """

    @abstractmethod
    async def write_text(self, path: str, content: str) -> None:
        """Store text content in a file.

        Args:
            path: The file path to write to.
            content: The text content to write.
        """

    @abstractmethod
    async def read_bytes(self, path: str, offset: int = 0, length: Optional[int] = None) -> bytes:
        """Load a file's binary content, optionally from an offset and for a limited length.

        Args:
            path: Path to the file
            offset: Byte offset to start reading from (default: 0)
            length: Number of bytes to read (default: None for entire file)
        """

    @abstractmethod
    async def write_bytes(self, path: str, content: bytes) -> None:
        """Store binary content in a file.

        Args:
            path: The file path to write to.
            content: The binary content to write.
        """

    @abstractmethod
    async def delete_file(self, path: str) -> None:
        """Remove the file at the given path.

        Args:
            path: The file path to delete.
        """

    @abstractmethod
    async def create_dir(self, path: str) -> None:
        """Make a directory at the given path.

        Args:
            path: The directory path to create.
        """

    @abstractmethod
    async def delete_dir(self, path: str) -> None:
        """Remove the directory at the given path.

        Args:
            path: The directory path to delete.
        """

    @abstractmethod
    async def get_file_size(self, path: str) -> int:
        """Report how large a file is.

        Args:
            path: The file path to get the size of.

        Returns:
            The size of the file in bytes.
        """

    # Desktop actions
    @abstractmethod
    async def get_desktop_environment(self) -> str:
        """Identify the desktop environment in use.

        Returns:
            The name of the current desktop environment.
        """

    @abstractmethod
    async def set_wallpaper(self, path: str) -> None:
        """Use the image at the given path as the desktop wallpaper.

        Args:
            path: The file path to set as wallpaper
        """

    # Window management
    @abstractmethod
    async def open(self, target: str) -> None:
        """Open a target with the system's default handler.

        Typically this opens a file, folder, or URL in its associated application.

        Args:
            target: The file path, folder path, or URL to open.
        """

    @abstractmethod
    async def launch(self, app: str, args: List[str] | None = None) -> Optional[int]:
        """Start an application, optionally passing it arguments.

        Args:
            app: The application executable or bundle identifier.
            args: Optional list of arguments to pass to the application.

        Returns:
            Optional process ID (PID) of the launched application if available, otherwise None.
        """

    @abstractmethod
    async def get_current_window_id(self) -> int | str:
        """Identify the window that is currently active/focused.

        Returns:
            A window identifier that can be used with other window management methods.
        """

    @abstractmethod
    async def get_application_windows(self, app: str) -> List[int | str]:
        """Collect the identifiers of every window belonging to an application.

        Args:
            app: The application name, executable, or identifier to query.

        Returns:
            A list of window identifiers belonging to the specified application.
        """

    @abstractmethod
    async def get_window_name(self, window_id: int | str) -> str:
        """Look up a window's title/name.

        Args:
            window_id: The window identifier.

        Returns:
            The window's title or name string.
        """

    @abstractmethod
    async def get_window_size(self, window_id: int | str) -> tuple[int, int]:
        """Look up a window's size in pixels.

        Args:
            window_id: The window identifier.

        Returns:
            A tuple of (width, height) representing the window size in pixels.
        """

    @abstractmethod
    async def get_window_position(self, window_id: int | str) -> tuple[int, int]:
        """Look up where a window sits on the screen.

        Args:
            window_id: The window identifier.

        Returns:
            A tuple of (x, y) representing the window's top-left corner in screen coordinates.
        """

    @abstractmethod
    async def set_window_size(self, window_id: int | str, width: int, height: int) -> None:
        """Resize a window to the given pixel dimensions.

        Args:
            window_id: The window identifier.
            width: Desired width in pixels.
            height: Desired height in pixels.
        """

    @abstractmethod
    async def set_window_position(self, window_id: int | str, x: int, y: int) -> None:
        """Relocate a window to a given position on the screen.

        Args:
            window_id: The window identifier.
            x: X coordinate for the window's top-left corner.
            y: Y coordinate for the window's top-left corner.
        """

    @abstractmethod
    async def maximize_window(self, window_id: int | str) -> None:
        """Maximize the given window.

        Args:
            window_id: The window identifier.
        """

    @abstractmethod
    async def minimize_window(self, window_id: int | str) -> None:
        """Minimize the given window.

        Args:
            window_id: The window identifier.
        """

    @abstractmethod
    async def activate_window(self, window_id: int | str) -> None:
        """Raise a window to the foreground and give it focus.

        Args:
            window_id: The window identifier.
        """

    @abstractmethod
    async def close_window(self, window_id: int | str) -> None:
        """Close the given window.

        Args:
            window_id: The window identifier.
        """

    # Convenience aliases
    async def get_window_title(self, window_id: int | str) -> str:
        """Alias that forwards to get_window_name().

        Args:
            window_id: The window identifier.

        Returns:
            The window's title or name string.
        """
        title = await self.get_window_name(window_id)
        return title

    async def window_size(self, window_id: int | str) -> tuple[int, int]:
        """Alias that forwards to get_window_size().

        Args:
            window_id: The window identifier.

        Returns:
            A tuple of (width, height) representing the window size in pixels.
        """
        dimensions = await self.get_window_size(window_id)
        return dimensions

    # Shell actions
    @abstractmethod
    async def run_command(self, command: str) -> CommandResult:
        """Execute a shell command in the target environment and return a structured result.

        Intended contract for implementations: the command runs through a shell,
        a non-zero exit status is reported in the result rather than raised, and
        both stdout and stderr are captured.

        Args:
            command (str): The shell command to execute

        Returns:
            CommandResult: A structured result containing:
                - stdout (str): Standard output from the command
                - stderr (str): Standard error from the command
                - returncode (int): Exit code from the command (0 indicates success)

        Raises:
            RuntimeError: If the command execution fails at the system level

        Example:
            result = await interface.run_command("ls -la")
            if result.returncode == 0:
                print(f"Output: {result.stdout}")
            else:
                print(f"Error: {result.stderr}, Exit code: {result.returncode}")
        """

    # Accessibility Actions
    @abstractmethod
    async def get_accessibility_tree(self) -> Dict:
        """Fetch the accessibility tree for the current screen.

        Returns:
            Dict containing the hierarchical accessibility information of screen elements.
        """

    @abstractmethod
    async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
        """Map a point from screenshot space into screen space.

        Args:
            x: X coordinate in screenshot space
            y: Y coordinate in screenshot space

        Returns:
            tuple[float, float]: (x, y) coordinates in screen space
        """

    @abstractmethod
    async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
        """Map a point from screen space into screenshot space.

        Args:
            x: X coordinate in screen space
            y: Y coordinate in screen space

        Returns:
            tuple[float, float]: (x, y) coordinates in screenshot space
        """
```