This is page 16 of 21. Use http://codebase.md/trycua/cua?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── scripts
│ │ ├── get_pyproject_version.py
│ │ └── tests
│ │ ├── __init__.py
│ │ ├── README.md
│ │ └── test_get_pyproject_version.py
│ └── workflows
│ ├── bump-version.yml
│ ├── ci-lume.yml
│ ├── docker-publish-cua-linux.yml
│ ├── docker-publish-cua-windows.yml
│ ├── docker-publish-kasm.yml
│ ├── docker-publish-xfce.yml
│ ├── docker-reusable-publish.yml
│ ├── link-check.yml
│ ├── lint.yml
│ ├── npm-publish-cli.yml
│ ├── npm-publish-computer.yml
│ ├── npm-publish-core.yml
│ ├── publish-lume.yml
│ ├── pypi-publish-agent.yml
│ ├── pypi-publish-computer-server.yml
│ ├── pypi-publish-computer.yml
│ ├── pypi-publish-core.yml
│ ├── pypi-publish-mcp-server.yml
│ ├── pypi-publish-som.yml
│ ├── pypi-reusable-publish.yml
│ ├── python-tests.yml
│ ├── test-cua-models.yml
│ └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│ ├── docs.code-workspace
│ ├── extensions.json
│ ├── launch.json
│ ├── libs-ts.code-workspace
│ ├── lume.code-workspace
│ ├── lumier.code-workspace
│ ├── py.code-workspace
│ └── settings.json
├── blog
│ ├── app-use.md
│ ├── assets
│ │ ├── composite-agents.png
│ │ ├── docker-ubuntu-support.png
│ │ ├── hack-booth.png
│ │ ├── hack-closing-ceremony.jpg
│ │ ├── hack-cua-ollama-hud.jpeg
│ │ ├── hack-leaderboard.png
│ │ ├── hack-the-north.png
│ │ ├── hack-winners.jpeg
│ │ ├── hack-workshop.jpeg
│ │ ├── hud-agent-evals.png
│ │ └── trajectory-viewer.jpeg
│ ├── bringing-computer-use-to-the-web.md
│ ├── build-your-own-operator-on-macos-1.md
│ ├── build-your-own-operator-on-macos-2.md
│ ├── cloud-windows-ga-macos-preview.md
│ ├── composite-agents.md
│ ├── computer-use-agents-for-growth-hacking.md
│ ├── cua-hackathon.md
│ ├── cua-playground-preview.md
│ ├── cua-vlm-router.md
│ ├── hack-the-north.md
│ ├── hud-agent-evals.md
│ ├── human-in-the-loop.md
│ ├── introducing-cua-cli.md
│ ├── introducing-cua-cloud-containers.md
│ ├── lume-to-containerization.md
│ ├── neurips-2025-cua-papers.md
│ ├── sandboxed-python-execution.md
│ ├── training-computer-use-models-trajectories-1.md
│ ├── trajectory-viewer.md
│ ├── ubuntu-docker-support.md
│ └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│ ├── .env.example
│ ├── .gitignore
│ ├── content
│ │ └── docs
│ │ ├── agent-sdk
│ │ │ ├── agent-loops.mdx
│ │ │ ├── benchmarks
│ │ │ │ ├── index.mdx
│ │ │ │ ├── interactive.mdx
│ │ │ │ ├── introduction.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── osworld-verified.mdx
│ │ │ │ ├── screenspot-pro.mdx
│ │ │ │ └── screenspot-v2.mdx
│ │ │ ├── callbacks
│ │ │ │ ├── agent-lifecycle.mdx
│ │ │ │ ├── cost-saving.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── logging.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── pii-anonymization.mdx
│ │ │ │ └── trajectories.mdx
│ │ │ ├── chat-history.mdx
│ │ │ ├── custom-tools.mdx
│ │ │ ├── customizing-computeragent.mdx
│ │ │ ├── integrations
│ │ │ │ ├── hud.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── observability.mdx
│ │ │ ├── mcp-server
│ │ │ │ ├── client-integrations.mdx
│ │ │ │ ├── configuration.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── llm-integrations.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── tools.mdx
│ │ │ │ └── usage.mdx
│ │ │ ├── message-format.mdx
│ │ │ ├── meta.json
│ │ │ ├── migration-guide.mdx
│ │ │ ├── prompt-caching.mdx
│ │ │ ├── supported-agents
│ │ │ │ ├── composed-agents.mdx
│ │ │ │ ├── computer-use-agents.mdx
│ │ │ │ ├── grounding-models.mdx
│ │ │ │ ├── human-in-the-loop.mdx
│ │ │ │ └── meta.json
│ │ │ ├── supported-model-providers
│ │ │ │ ├── cua-vlm-router.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ └── local-models.mdx
│ │ │ ├── telemetry.mdx
│ │ │ └── usage-tracking.mdx
│ │ ├── cli-playbook
│ │ │ ├── commands.mdx
│ │ │ ├── index.mdx
│ │ │ └── meta.json
│ │ ├── computer-sdk
│ │ │ ├── cloud-vm-management.mdx
│ │ │ ├── commands.mdx
│ │ │ ├── computer-server
│ │ │ │ ├── Commands.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── REST-API.mdx
│ │ │ │ └── WebSocket-API.mdx
│ │ │ ├── computer-ui.mdx
│ │ │ ├── computers.mdx
│ │ │ ├── custom-computer-handlers.mdx
│ │ │ ├── meta.json
│ │ │ ├── sandboxed-python.mdx
│ │ │ └── tracing-api.mdx
│ │ ├── example-usecases
│ │ │ ├── form-filling.mdx
│ │ │ ├── gemini-complex-ui-navigation.mdx
│ │ │ ├── meta.json
│ │ │ ├── post-event-contact-export.mdx
│ │ │ └── windows-app-behind-vpn.mdx
│ │ ├── get-started
│ │ │ ├── meta.json
│ │ │ └── quickstart.mdx
│ │ ├── index.mdx
│ │ ├── macos-vm-cli-playbook
│ │ │ ├── lume
│ │ │ │ ├── cli-reference.mdx
│ │ │ │ ├── faq.md
│ │ │ │ ├── http-api.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── prebuilt-images.mdx
│ │ │ ├── lumier
│ │ │ │ ├── building-lumier.mdx
│ │ │ │ ├── docker-compose.mdx
│ │ │ │ ├── docker.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ └── meta.json
│ │ │ └── meta.json
│ │ └── meta.json
│ ├── next.config.mjs
│ ├── package-lock.json
│ ├── package.json
│ ├── pnpm-lock.yaml
│ ├── postcss.config.mjs
│ ├── public
│ │ └── img
│ │ ├── agent_gradio_ui.png
│ │ ├── agent.png
│ │ ├── bg-dark.jpg
│ │ ├── bg-light.jpg
│ │ ├── cli.png
│ │ ├── computer.png
│ │ ├── grounding-with-gemini3.gif
│ │ ├── hero.png
│ │ ├── laminar_trace_example.png
│ │ ├── som_box_threshold.png
│ │ └── som_iou_threshold.png
│ ├── README.md
│ ├── source.config.ts
│ ├── src
│ │ ├── app
│ │ │ ├── (home)
│ │ │ │ ├── [[...slug]]
│ │ │ │ │ └── page.tsx
│ │ │ │ └── layout.tsx
│ │ │ ├── api
│ │ │ │ ├── posthog
│ │ │ │ │ └── [...path]
│ │ │ │ │ └── route.ts
│ │ │ │ └── search
│ │ │ │ └── route.ts
│ │ │ ├── favicon.ico
│ │ │ ├── global.css
│ │ │ ├── layout.config.tsx
│ │ │ ├── layout.tsx
│ │ │ ├── llms.mdx
│ │ │ │ └── [[...slug]]
│ │ │ │ └── route.ts
│ │ │ ├── llms.txt
│ │ │ │ └── route.ts
│ │ │ ├── robots.ts
│ │ │ └── sitemap.ts
│ │ ├── assets
│ │ │ ├── discord-black.svg
│ │ │ ├── discord-white.svg
│ │ │ ├── logo-black.svg
│ │ │ └── logo-white.svg
│ │ ├── components
│ │ │ ├── analytics-tracker.tsx
│ │ │ ├── cookie-consent.tsx
│ │ │ ├── doc-actions-menu.tsx
│ │ │ ├── editable-code-block.tsx
│ │ │ ├── footer.tsx
│ │ │ ├── hero.tsx
│ │ │ ├── iou.tsx
│ │ │ ├── mermaid.tsx
│ │ │ └── page-feedback.tsx
│ │ ├── lib
│ │ │ ├── llms.ts
│ │ │ └── source.ts
│ │ ├── mdx-components.tsx
│ │ └── providers
│ │ └── posthog-provider.tsx
│ └── tsconfig.json
├── examples
│ ├── agent_examples.py
│ ├── agent_ui_examples.py
│ ├── browser_agent_example.py
│ ├── browser_tool_example.py
│ ├── cloud_api_examples.py
│ ├── computer_examples_windows.py
│ ├── computer_examples.py
│ ├── computer_ui_examples.py
│ ├── computer-example-ts
│ │ ├── .env.example
│ │ ├── .gitignore
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── README.md
│ │ ├── src
│ │ │ ├── helpers.ts
│ │ │ └── index.ts
│ │ └── tsconfig.json
│ ├── docker_examples.py
│ ├── evals
│ │ ├── hud_eval_examples.py
│ │ └── wikipedia_most_linked.txt
│ ├── pylume_examples.py
│ ├── sandboxed_functions_examples.py
│ ├── som_examples.py
│ ├── tracing_examples.py
│ ├── utils.py
│ └── winsandbox_example.py
├── img
│ ├── agent_gradio_ui.png
│ ├── agent.png
│ ├── cli.png
│ ├── computer.png
│ ├── logo_black.png
│ └── logo_white.png
├── libs
│ ├── kasm
│ │ ├── Dockerfile
│ │ ├── LICENSE
│ │ ├── README.md
│ │ └── src
│ │ └── ubuntu
│ │ └── install
│ │ └── firefox
│ │ ├── custom_startup.sh
│ │ ├── firefox.desktop
│ │ └── install_firefox.sh
│ ├── lume
│ │ ├── .cursorignore
│ │ ├── CONTRIBUTING.md
│ │ ├── Development.md
│ │ ├── img
│ │ │ └── cli.png
│ │ ├── Package.resolved
│ │ ├── Package.swift
│ │ ├── README.md
│ │ ├── resources
│ │ │ └── lume.entitlements
│ │ ├── scripts
│ │ │ ├── build
│ │ │ │ ├── build-debug.sh
│ │ │ │ ├── build-release-notarized.sh
│ │ │ │ └── build-release.sh
│ │ │ └── install.sh
│ │ ├── src
│ │ │ ├── Commands
│ │ │ │ ├── Clone.swift
│ │ │ │ ├── Config.swift
│ │ │ │ ├── Create.swift
│ │ │ │ ├── Delete.swift
│ │ │ │ ├── Get.swift
│ │ │ │ ├── Images.swift
│ │ │ │ ├── IPSW.swift
│ │ │ │ ├── List.swift
│ │ │ │ ├── Logs.swift
│ │ │ │ ├── Options
│ │ │ │ │ └── FormatOption.swift
│ │ │ │ ├── Prune.swift
│ │ │ │ ├── Pull.swift
│ │ │ │ ├── Push.swift
│ │ │ │ ├── Run.swift
│ │ │ │ ├── Serve.swift
│ │ │ │ ├── Set.swift
│ │ │ │ └── Stop.swift
│ │ │ ├── ContainerRegistry
│ │ │ │ ├── ImageContainerRegistry.swift
│ │ │ │ ├── ImageList.swift
│ │ │ │ └── ImagesPrinter.swift
│ │ │ ├── Errors
│ │ │ │ └── Errors.swift
│ │ │ ├── FileSystem
│ │ │ │ ├── Home.swift
│ │ │ │ ├── Settings.swift
│ │ │ │ ├── VMConfig.swift
│ │ │ │ ├── VMDirectory.swift
│ │ │ │ └── VMLocation.swift
│ │ │ ├── LumeController.swift
│ │ │ ├── Main.swift
│ │ │ ├── Server
│ │ │ │ ├── Handlers.swift
│ │ │ │ ├── HTTP.swift
│ │ │ │ ├── Requests.swift
│ │ │ │ ├── Responses.swift
│ │ │ │ └── Server.swift
│ │ │ ├── Utils
│ │ │ │ ├── CommandRegistry.swift
│ │ │ │ ├── CommandUtils.swift
│ │ │ │ ├── Logger.swift
│ │ │ │ ├── NetworkUtils.swift
│ │ │ │ ├── Path.swift
│ │ │ │ ├── ProcessRunner.swift
│ │ │ │ ├── ProgressLogger.swift
│ │ │ │ ├── String.swift
│ │ │ │ └── Utils.swift
│ │ │ ├── Virtualization
│ │ │ │ ├── DarwinImageLoader.swift
│ │ │ │ ├── DHCPLeaseParser.swift
│ │ │ │ ├── ImageLoaderFactory.swift
│ │ │ │ └── VMVirtualizationService.swift
│ │ │ ├── VM
│ │ │ │ ├── DarwinVM.swift
│ │ │ │ ├── LinuxVM.swift
│ │ │ │ ├── VM.swift
│ │ │ │ ├── VMDetails.swift
│ │ │ │ ├── VMDetailsPrinter.swift
│ │ │ │ ├── VMDisplayResolution.swift
│ │ │ │ └── VMFactory.swift
│ │ │ └── VNC
│ │ │ ├── PassphraseGenerator.swift
│ │ │ └── VNCService.swift
│ │ └── tests
│ │ ├── Mocks
│ │ │ ├── MockVM.swift
│ │ │ ├── MockVMVirtualizationService.swift
│ │ │ └── MockVNCService.swift
│ │ ├── VM
│ │ │ └── VMDetailsPrinterTests.swift
│ │ ├── VMTests.swift
│ │ ├── VMVirtualizationServiceTests.swift
│ │ └── VNCServiceTests.swift
│ ├── lumier
│ │ ├── .dockerignore
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── bin
│ │ │ └── entry.sh
│ │ ├── config
│ │ │ └── constants.sh
│ │ ├── hooks
│ │ │ └── on-logon.sh
│ │ └── lib
│ │ ├── utils.sh
│ │ └── vm.sh
│ ├── python
│ │ ├── agent
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── agent
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── adapters
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── azure_ml_adapter.py
│ │ │ │ │ ├── cua_adapter.py
│ │ │ │ │ ├── huggingfacelocal_adapter.py
│ │ │ │ │ ├── human_adapter.py
│ │ │ │ │ ├── mlxvlm_adapter.py
│ │ │ │ │ └── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ └── qwen2_5_vl.py
│ │ │ │ ├── agent.py
│ │ │ │ ├── callbacks
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── budget_manager.py
│ │ │ │ │ ├── image_retention.py
│ │ │ │ │ ├── logging.py
│ │ │ │ │ ├── operator_validator.py
│ │ │ │ │ ├── pii_anonymization.py
│ │ │ │ │ ├── prompt_instructions.py
│ │ │ │ │ ├── telemetry.py
│ │ │ │ │ └── trajectory_saver.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── computers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cua.py
│ │ │ │ │ └── custom.py
│ │ │ │ ├── decorators.py
│ │ │ │ ├── human_tool
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ ├── server.py
│ │ │ │ │ └── ui.py
│ │ │ │ ├── integrations
│ │ │ │ │ └── hud
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── agent.py
│ │ │ │ │ └── proxy.py
│ │ │ │ ├── loops
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── anthropic.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── composed_grounded.py
│ │ │ │ │ ├── fara.py
│ │ │ │ │ ├── gelato.py
│ │ │ │ │ ├── gemini.py
│ │ │ │ │ ├── generic_vlm.py
│ │ │ │ │ ├── glm45v.py
│ │ │ │ │ ├── gta1.py
│ │ │ │ │ ├── holo.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── model_types.csv
│ │ │ │ │ ├── moondream3.py
│ │ │ │ │ ├── omniparser.py
│ │ │ │ │ ├── openai.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ ├── uiins.py
│ │ │ │ │ ├── uitars.py
│ │ │ │ │ └── uitars2.py
│ │ │ │ ├── playground
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── server.py
│ │ │ │ ├── proxy
│ │ │ │ │ ├── examples.py
│ │ │ │ │ └── handlers.py
│ │ │ │ ├── responses.py
│ │ │ │ ├── tools
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ └── browser_tool.py
│ │ │ │ ├── types.py
│ │ │ │ └── ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ └── gradio
│ │ │ │ ├── __init__.py
│ │ │ │ ├── app.py
│ │ │ │ └── ui_components.py
│ │ │ ├── benchmarks
│ │ │ │ ├── .gitignore
│ │ │ │ ├── contrib.md
│ │ │ │ ├── interactive.py
│ │ │ │ ├── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ └── gta1.py
│ │ │ │ ├── README.md
│ │ │ │ ├── ss-pro.py
│ │ │ │ ├── ss-v2.py
│ │ │ │ └── utils.py
│ │ │ ├── example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer_agent.py
│ │ ├── bench-ui
│ │ │ ├── bench_ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api.py
│ │ │ │ └── child.py
│ │ │ ├── examples
│ │ │ │ ├── folder_example.py
│ │ │ │ ├── gui
│ │ │ │ │ ├── index.html
│ │ │ │ │ ├── logo.svg
│ │ │ │ │ └── styles.css
│ │ │ │ ├── output_overlay.png
│ │ │ │ └── simple_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ └── test_port_detection.py
│ │ ├── computer
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer
│ │ │ │ ├── __init__.py
│ │ │ │ ├── computer.py
│ │ │ │ ├── diorama_computer.py
│ │ │ │ ├── helpers.py
│ │ │ │ ├── interface
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ ├── models.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── logger.py
│ │ │ │ ├── models.py
│ │ │ │ ├── providers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cloud
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── docker
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── lume
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── lume_api.py
│ │ │ │ │ ├── lumier
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── types.py
│ │ │ │ │ └── winsandbox
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── provider.py
│ │ │ │ │ └── setup_script.ps1
│ │ │ │ ├── tracing_wrapper.py
│ │ │ │ ├── tracing.py
│ │ │ │ ├── ui
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ └── gradio
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── app.py
│ │ │ │ └── utils.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ ├── test_computer.py
│ │ │ └── test_helpers.py
│ │ ├── computer-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── browser.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── diorama
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── diorama_computer.py
│ │ │ │ │ ├── diorama.py
│ │ │ │ │ ├── draw.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── safezone.py
│ │ │ │ ├── handlers
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── main.py
│ │ │ │ ├── server.py
│ │ │ │ ├── utils
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── wallpaper.py
│ │ │ │ └── watchdog.py
│ │ │ ├── examples
│ │ │ │ ├── __init__.py
│ │ │ │ └── usage_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ ├── run_server.py
│ │ │ ├── test_connection.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_server.py
│ │ ├── core
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── core
│ │ │ │ ├── __init__.py
│ │ │ │ └── telemetry
│ │ │ │ ├── __init__.py
│ │ │ │ └── posthog.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_telemetry.py
│ │ ├── mcp-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── build-extension.py
│ │ │ ├── CONCURRENT_SESSIONS.md
│ │ │ ├── desktop-extension
│ │ │ │ ├── cua-extension.mcpb
│ │ │ │ ├── desktop_extension.png
│ │ │ │ ├── manifest.json
│ │ │ │ ├── README.md
│ │ │ │ ├── requirements.txt
│ │ │ │ ├── run_server.sh
│ │ │ │ └── setup.py
│ │ │ ├── mcp_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── server.py
│ │ │ │ └── session_manager.py
│ │ │ ├── pdm.lock
│ │ │ ├── pyproject.toml
│ │ │ ├── QUICK_TEST_COMMANDS.sh
│ │ │ ├── quick_test_local_option.py
│ │ │ ├── README.md
│ │ │ ├── scripts
│ │ │ │ ├── install_mcp_server.sh
│ │ │ │ └── start_mcp_server.sh
│ │ │ ├── test_mcp_server_local_option.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_mcp_server.py
│ │ ├── pylume
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_pylume.py
│ │ └── som
│ │ ├── .bumpversion.cfg
│ │ ├── LICENSE
│ │ ├── poetry.toml
│ │ ├── pyproject.toml
│ │ ├── README.md
│ │ ├── som
│ │ │ ├── __init__.py
│ │ │ ├── detect.py
│ │ │ ├── detection.py
│ │ │ ├── models.py
│ │ │ ├── ocr.py
│ │ │ ├── util
│ │ │ │ └── utils.py
│ │ │ └── visualization.py
│ │ └── tests
│ │ ├── conftest.py
│ │ └── test_omniparser.py
│ ├── qemu-docker
│ │ ├── linux
│ │ │ ├── Dockerfile
│ │ │ ├── README.md
│ │ │ └── src
│ │ │ ├── entry.sh
│ │ │ └── vm
│ │ │ ├── image
│ │ │ │ └── README.md
│ │ │ └── setup
│ │ │ ├── install.sh
│ │ │ ├── setup-cua-server.sh
│ │ │ └── setup.sh
│ │ ├── README.md
│ │ └── windows
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── entry.sh
│ │ └── vm
│ │ ├── image
│ │ │ └── README.md
│ │ └── setup
│ │ ├── install.bat
│ │ ├── on-logon.ps1
│ │ ├── setup-cua-server.ps1
│ │ ├── setup-utils.psm1
│ │ └── setup.ps1
│ ├── typescript
│ │ ├── .gitignore
│ │ ├── .nvmrc
│ │ ├── agent
│ │ │ ├── examples
│ │ │ │ ├── playground-example.html
│ │ │ │ └── README.md
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── client.ts
│ │ │ │ ├── index.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ └── client.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── computer
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── computer
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── providers
│ │ │ │ │ │ ├── base.ts
│ │ │ │ │ │ ├── cloud.ts
│ │ │ │ │ │ └── index.ts
│ │ │ │ │ └── types.ts
│ │ │ │ ├── index.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── base.ts
│ │ │ │ │ ├── factory.ts
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── linux.ts
│ │ │ │ │ ├── macos.ts
│ │ │ │ │ └── windows.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ ├── computer
│ │ │ │ │ └── cloud.test.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── factory.test.ts
│ │ │ │ │ ├── index.test.ts
│ │ │ │ │ ├── linux.test.ts
│ │ │ │ │ ├── macos.test.ts
│ │ │ │ │ └── windows.test.ts
│ │ │ │ └── setup.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── core
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── index.ts
│ │ │ │ └── telemetry
│ │ │ │ ├── clients
│ │ │ │ │ ├── index.ts
│ │ │ │ │ └── posthog.ts
│ │ │ │ └── index.ts
│ │ │ ├── tests
│ │ │ │ └── telemetry.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── cua-cli
│ │ │ ├── .gitignore
│ │ │ ├── .prettierrc
│ │ │ ├── bun.lock
│ │ │ ├── CLAUDE.md
│ │ │ ├── index.ts
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── auth.ts
│ │ │ │ ├── cli.ts
│ │ │ │ ├── commands
│ │ │ │ │ ├── auth.ts
│ │ │ │ │ └── sandbox.ts
│ │ │ │ ├── config.ts
│ │ │ │ ├── http.ts
│ │ │ │ ├── storage.ts
│ │ │ │ └── util.ts
│ │ │ └── tsconfig.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── pnpm-workspace.yaml
│ │ └── README.md
│ └── xfce
│ ├── .dockerignore
│ ├── .gitignore
│ ├── Development.md
│ ├── Dockerfile
│ ├── Dockerfile.dev
│ ├── README.md
│ └── src
│ ├── scripts
│ │ ├── resize-display.sh
│ │ ├── start-computer-server.sh
│ │ ├── start-novnc.sh
│ │ ├── start-vnc.sh
│ │ └── xstartup.sh
│ ├── supervisor
│ │ └── supervisord.conf
│ └── xfce-config
│ ├── helpers.rc
│ ├── xfce4-power-manager.xml
│ └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│ ├── agent_nb.ipynb
│ ├── blog
│ │ ├── build-your-own-operator-on-macos-1.ipynb
│ │ └── build-your-own-operator-on-macos-2.ipynb
│ ├── composite_agents_docker_nb.ipynb
│ ├── computer_nb.ipynb
│ ├── computer_server_nb.ipynb
│ ├── customizing_computeragent.ipynb
│ ├── eval_osworld.ipynb
│ ├── ollama_nb.ipynb
│ ├── README.md
│ ├── sota_hackathon_cloud.ipynb
│ └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── install-cli.ps1
│ ├── install-cli.sh
│ ├── playground-docker.sh
│ ├── playground.sh
│ ├── run-docker-dev.sh
│ └── typescript-typecheck.js
├── TESTING.md
├── tests
│ ├── agent_loop_testing
│ │ ├── agent_test.py
│ │ └── README.md
│ ├── pytest.ini
│ ├── shell_cmd.py
│ ├── test_files.py
│ ├── test_mcp_server_session_management.py
│ ├── test_mcp_server_streaming.py
│ ├── test_shell_bash.py
│ ├── test_telemetry.py
│ ├── test_tracing.py
│ ├── test_venv.py
│ └── test_watchdog.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/docs/content/docs/macos-vm-cli-playbook/lume/http-api.mdx:
--------------------------------------------------------------------------------
```markdown
---
title: HTTP Server API
description: Lume exposes a local HTTP API server that listens on localhost for programmatic management of VMs.
---
import { Tabs, Tab } from 'fumadocs-ui/components/tabs';
import { Callout } from 'fumadocs-ui/components/callout';
## Default URL
```
http://localhost:7777
```
<Callout type="info">
The HTTP API service runs on port `7777` by default. If you'd like to use a different port, pass
the `--port` option during installation or when running `lume serve`.
</Callout>
## Endpoints
---
### Create VM
Create a new virtual machine.
`POST: /lume/vms`
#### Parameters
| Name | Type | Required | Description |
| -------- | ------- | -------- | ------------------------------------ |
| name | string | Yes | Name of the VM |
| os | string | Yes | Guest OS (`macOS`, `linux`, etc.) |
| cpu | integer | Yes | Number of CPU cores |
| memory | string | Yes | Memory size (e.g. `4GB`) |
| diskSize | string | Yes | Disk size (e.g. `64GB`) |
| display | string | No | Display resolution (e.g. `1024x768`) |
| ipsw | string | No | IPSW version (e.g. `latest`) |
| storage | string | No | Storage type (`ssd`, etc.) |
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
-X POST \
-H "Content-Type: application/json" \
-d '{
"name": "lume_vm",
"os": "macOS",
"cpu": 2,
"memory": "4GB",
"diskSize": "64GB",
"display": "1024x768",
"ipsw": "latest",
"storage": "ssd"
}' \
http://localhost:7777/lume/vms
```
</Tab>
<Tab value="Python">
```python
import requests
payload = {
"name": "lume_vm",
"os": "macOS",
"cpu": 2,
"memory": "4GB",
"diskSize": "64GB",
"display": "1024x768",
"ipsw": "latest",
"storage": "ssd"
}
r = requests.post("http://localhost:7777/lume/vms", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const payload = {
name: 'lume_vm',
os: 'macOS',
cpu: 2,
memory: '4GB',
diskSize: '64GB',
display: '1024x768',
ipsw: 'latest',
storage: 'ssd',
};
const res = await fetch('http://localhost:7777/lume/vms', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>
---
### Run VM
Run a virtual machine instance.
`POST: /lume/vms/:name/run`
#### Parameters
| Name | Type | Required | Description |
| ----------------- | --------------- | -------- | --------------------------------------------------- |
| noDisplay | boolean | No | If true, do not start VNC client |
| sharedDirectories | array of object | No | List of shared directories (`hostPath`, `readOnly`) |
| recoveryMode | boolean | No | Start in recovery mode |
| storage | string | No | Storage type (`ssd`, etc.) |
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
# Basic run
curl --connect-timeout 6000 \
--max-time 5000 \
-X POST \
http://localhost:7777/lume/vms/my-vm-name/run
# Run with VNC client started and shared directory
curl --connect-timeout 6000 \
--max-time 5000 \
-X POST \
-H "Content-Type: application/json" \
-d '{
"noDisplay": false,
"sharedDirectories": [
{
"hostPath": "~/Projects",
"readOnly": false
}
],
"recoveryMode": false,
"storage": "ssd"
}' \
http://localhost:7777/lume/vms/lume_vm/run
```
</Tab>
<Tab value="Python">
```python
import requests
# Basic run
r = requests.post("http://localhost:7777/lume/vms/my-vm-name/run", timeout=50)
print(r.json())
# With VNC and shared directory
payload = {
"noDisplay": False,
"sharedDirectories": [
{"hostPath": "~/Projects", "readOnly": False}
],
"recoveryMode": False,
"storage": "ssd"
}
r = requests.post("http://localhost:7777/lume/vms/lume_vm/run", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
// Basic run
let res = await fetch('http://localhost:7777/lume/vms/my-vm-name/run', {
method: 'POST',
});
console.log(await res.json());
// With VNC and shared directory
const payload = {
noDisplay: false,
sharedDirectories: [{ hostPath: '~/Projects', readOnly: false }],
recoveryMode: false,
storage: 'ssd',
};
res = await fetch('http://localhost:7777/lume/vms/lume_vm/run', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>
---
### List VMs
List all virtual machines.
`GET: /lume/vms`
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
http://localhost:7777/lume/vms
```
</Tab>
<Tab value="Python">
```python
import requests
r = requests.get("http://localhost:7777/lume/vms", timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/vms');
console.log(await res.json());
```
</Tab>
</Tabs>
#### Example Response
```json
[
{
"name": "my-vm",
"state": "stopped",
"os": "macOS",
"cpu": 2,
"memory": "4GB",
"diskSize": "64GB"
},
{
"name": "my-vm-2",
"state": "stopped",
"os": "linux",
"cpu": 2,
"memory": "4GB",
"diskSize": "64GB"
}
]
```
---
### Get VM Details
Get details for a specific virtual machine.
`GET: /lume/vms/:name`
#### Parameters
| Name | Type | Required | Description |
| ------- | ------ | -------- | -------------------------- |
| storage | string | No | Storage type (`ssd`, etc.) |
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
# Basic get
curl --connect-timeout 6000 \
--max-time 5000 \
http://localhost:7777/lume/vms/lume_vm
# Get with specific storage
curl --connect-timeout 6000 \
--max-time 5000 \
http://localhost:7777/lume/vms/lume_vm?storage=ssd
```
</Tab>
<Tab value="Python">
```python
import requests
# Basic get
details = requests.get("http://localhost:7777/lume/vms/lume_vm", timeout=50)
print(details.json())
# Get with specific storage
details = requests.get("http://localhost:7777/lume/vms/lume_vm", params={"storage": "ssd"}, timeout=50)
print(details.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
// Basic get
let res = await fetch('http://localhost:7777/lume/vms/lume_vm');
console.log(await res.json());
// Get with specific storage
res = await fetch('http://localhost:7777/lume/vms/lume_vm?storage=ssd');
console.log(await res.json());
```
</Tab>
</Tabs>
#### Example Response
```json
{
"name": "lume_vm",
"state": "stopped",
"os": "macOS",
"cpu": 2,
"memory": "4GB",
"diskSize": "64GB",
"display": "1024x768",
"ipAddress": "192.168.65.2",
"vncPort": 5900,
"sharedDirectories": [
{
"hostPath": "~/Projects",
"readOnly": false,
"tag": "com.apple.virtio-fs.automount"
}
]
}
```
---
### Update VM Configuration
Update the configuration of a virtual machine.
`PATCH: /lume/vms/:name`
#### Parameters
| Name | Type | Required | Description |
| -------- | ------- | -------- | ------------------------------------- |
| cpu | integer | No | Number of CPU cores |
| memory | string | No | Memory size (e.g. `8GB`) |
| diskSize | string | No | Disk size (e.g. `100GB`) |
| display | string | No | Display resolution (e.g. `1920x1080`) |
| storage | string | No | Storage type (`ssd`, etc.) |
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
-X PATCH \
-H "Content-Type: application/json" \
-d '{
"cpu": 4,
"memory": "8GB",
"diskSize": "100GB",
"display": "1920x1080",
"storage": "ssd"
}' \
http://localhost:7777/lume/vms/lume_vm
```
</Tab>
<Tab value="Python">
```python
import requests
payload = {
"cpu": 4,
"memory": "8GB",
"diskSize": "100GB",
"display": "1920x1080",
"storage": "ssd"
}
r = requests.patch("http://localhost:7777/lume/vms/lume_vm", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const payload = {
cpu: 4,
memory: '8GB',
diskSize: '100GB',
display: '1920x1080',
storage: 'ssd',
};
const res = await fetch('http://localhost:7777/lume/vms/lume_vm', {
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>
---
### Stop VM
Stop a running virtual machine.
`POST: /lume/vms/:name/stop`
#### Parameters
| Name | Type | Required | Description |
| ------- | ------ | -------- | -------------------------- |
| storage | string | No | Storage type (`ssd`, etc.) |
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
# Basic stop
curl --connect-timeout 6000 \
--max-time 5000 \
-X POST \
http://localhost:7777/lume/vms/lume_vm/stop
# Stop with storage location specified
curl --connect-timeout 6000 \
--max-time 5000 \
-X POST \
http://localhost:7777/lume/vms/lume_vm/stop?storage=ssd
```
</Tab>
<Tab value="Python">
```python
import requests
# Basic stop
r = requests.post("http://localhost:7777/lume/vms/lume_vm/stop", timeout=50)
print(r.json())
# Stop with storage location specified
r = requests.post("http://localhost:7777/lume/vms/lume_vm/stop", params={"storage": "ssd"}, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
// Basic stop
let res = await fetch('http://localhost:7777/lume/vms/lume_vm/stop', {
method: 'POST',
});
console.log(await res.json());
// Stop with storage location specified
res = await fetch('http://localhost:7777/lume/vms/lume_vm/stop?storage=ssd', {
method: 'POST',
});
console.log(await res.json());
```
</Tab>
</Tabs>
---
### Delete VM
Delete a virtual machine instance.
`DELETE: /lume/vms/:name`
#### Parameters
| Name | Type | Required | Description |
| ------- | ------ | -------- | -------------------------- |
| storage | string | No | Storage type (`ssd`, etc.) |
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
# Basic delete
curl --connect-timeout 6000 \
--max-time 5000 \
-X DELETE \
http://localhost:7777/lume/vms/lume_vm
# Delete with specific storage
curl --connect-timeout 6000 \
--max-time 5000 \
-X DELETE \
http://localhost:7777/lume/vms/lume_vm?storage=ssd
```
</Tab>
<Tab value="Python">
```python
import requests
# Basic delete
r = requests.delete("http://localhost:7777/lume/vms/lume_vm", timeout=50)
print(r.status_code)
# Delete with specific storage
r = requests.delete("http://localhost:7777/lume/vms/lume_vm", params={"storage": "ssd"}, timeout=50)
print(r.status_code)
```
</Tab>
<Tab value="TypeScript">
```typescript
// Basic delete
let res = await fetch('http://localhost:7777/lume/vms/lume_vm', {
method: 'DELETE',
});
console.log(res.status);
// Delete with specific storage
res = await fetch('http://localhost:7777/lume/vms/lume_vm?storage=ssd', {
method: 'DELETE',
});
console.log(res.status);
```
</Tab>
</Tabs>
---
### Clone VM
Clone an existing virtual machine.
`POST: /lume/vms/clone`
#### Parameters
| Name | Type | Required | Description |
| -------------- | ------ | -------- | ----------------------------------- |
| name | string | Yes | Source VM name |
| newName | string | Yes | New VM name |
| sourceLocation | string | No | Source storage location (`default`) |
| destLocation | string | No | Destination storage location |
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
-X POST \
-H "Content-Type: application/json" \
-d '{
"name": "source-vm",
"newName": "cloned-vm",
"sourceLocation": "default",
"destLocation": "ssd"
}' \
http://localhost:7777/lume/vms/clone
```
</Tab>
<Tab value="Python">
```python
import requests
payload = {
"name": "source-vm",
"newName": "cloned-vm",
"sourceLocation": "default",
"destLocation": "ssd"
}
r = requests.post("http://localhost:7777/lume/vms/clone", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const payload = {
name: 'source-vm',
newName: 'cloned-vm',
sourceLocation: 'default',
destLocation: 'ssd',
};
const res = await fetch('http://localhost:7777/lume/vms/clone', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>
---
### Pull VM Image
Pull a VM image from a registry.
`POST: /lume/pull`
#### Parameters
| Name | Type | Required | Description |
| ------------ | ------ | -------- | ------------------------------------- |
| image | string | Yes | Image name (e.g. `macos-sequoia-...`) |
| name | string | No | VM name for the pulled image |
| registry | string | No | Registry host (e.g. `ghcr.io`) |
| organization | string | No | Organization name |
| storage | string | No | Storage type (`ssd`, etc.) |
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
-X POST \
-H "Content-Type: application/json" \
-d '{
"image": "macos-sequoia-vanilla:latest",
"name": "my-vm-name",
"registry": "ghcr.io",
"organization": "trycua",
"storage": "ssd"
}' \
http://localhost:7777/lume/pull
```
</Tab>
<Tab value="Python">
```python
import requests
payload = {
"image": "macos-sequoia-vanilla:latest",
"name": "my-vm-name",
"registry": "ghcr.io",
"organization": "trycua",
"storage": "ssd"
}
r = requests.post("http://localhost:7777/lume/pull", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const payload = {
image: 'macos-sequoia-vanilla:latest',
name: 'my-vm-name',
registry: 'ghcr.io',
organization: 'trycua',
storage: 'ssd',
};
const res = await fetch('http://localhost:7777/lume/pull', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>
---
### Push VM Image
Push a VM to a registry as an image (asynchronous operation).
`POST: /lume/vms/push`
#### Parameters
| Name | Type | Required | Description |
| ------------ | ----------- | -------- | ------------------------------------ |
| name | string | Yes | Local VM name to push |
| imageName | string | Yes | Image name in registry |
| tags | array | Yes | Image tags (e.g. `["latest", "v1"]`) |
| organization | string | Yes | Organization name |
| registry | string | No | Registry host (e.g. `ghcr.io`) |
| chunkSizeMb | integer | No | Chunk size in MB for upload |
| storage | string/null | No | Storage type (`ssd`, etc.) |
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
-X POST \
-H "Content-Type: application/json" \
-d '{
"name": "my-local-vm",
"imageName": "my-image",
"tags": ["latest", "v1"],
"organization": "my-org",
"registry": "ghcr.io",
"chunkSizeMb": 512,
"storage": null
}' \
http://localhost:7777/lume/vms/push
```
</Tab>
<Tab value="Python">
```python
import requests
payload = {
"name": "my-local-vm",
"imageName": "my-image",
"tags": ["latest", "v1"],
"organization": "my-org",
"registry": "ghcr.io",
"chunkSizeMb": 512,
"storage": None
}
r = requests.post("http://localhost:7777/lume/vms/push", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const payload = {
name: 'my-local-vm',
imageName: 'my-image',
tags: ['latest', 'v1'],
organization: 'my-org',
registry: 'ghcr.io',
chunkSizeMb: 512,
storage: null,
};
const res = await fetch('http://localhost:7777/lume/vms/push', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>
**Response (202 Accepted):**
```json
{
"message": "Push initiated in background",
"name": "my-local-vm",
"imageName": "my-image",
"tags": ["latest", "v1"]
}
```
---
### List Images
List available VM images.
`GET: /lume/images`
#### Parameters
| Name         | Type   | Required | Description                                  |
| ------------ | ------ | -------- | -------------------------------------------- |
| organization | string | No       | Registry organization (defaults to `trycua`) |
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
http://localhost:7777/lume/images
```
</Tab>
<Tab value="Python">
```python
import requests
r = requests.get("http://localhost:7777/lume/images", timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/images');
console.log(await res.json());
```
</Tab>
</Tabs>
```json
{
"local": ["macos-sequoia-xcode:latest", "macos-sequoia-vanilla:latest"]
}
```
---
### Prune Images
Remove unused VM images to free up disk space.
`POST: /lume/prune`
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
-X POST \
http://localhost:7777/lume/prune
```
</Tab>
<Tab value="Python">
```python
import requests
r = requests.post("http://localhost:7777/lume/prune", timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/prune', {
method: 'POST',
});
console.log(await res.json());
```
</Tab>
</Tabs>
---
### Get Latest IPSW URL
Get the URL for the latest macOS IPSW file.
`GET: /lume/ipsw`
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
http://localhost:7777/lume/ipsw
```
</Tab>
<Tab value="Python">
```python
import requests
r = requests.get("http://localhost:7777/lume/ipsw", timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/ipsw');
console.log(await res.json());
```
</Tab>
</Tabs>
---
## Configuration Management
### Get Configuration
Get current Lume configuration settings.
`GET: /lume/config`
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
http://localhost:7777/lume/config
```
</Tab>
<Tab value="Python">
```python
import requests
r = requests.get("http://localhost:7777/lume/config", timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/config');
console.log(await res.json());
```
</Tab>
</Tabs>
```json
{
"homeDirectory": "~/.lume",
"cacheDirectory": "~/.lume/cache",
"cachingEnabled": true
}
```
### Update Configuration
Update Lume configuration settings.
`POST: /lume/config`
#### Parameters
| Name | Type | Required | Description |
| -------------- | ------- | -------- | ------------------------- |
| homeDirectory | string | No | Lume home directory path |
| cacheDirectory | string | No | Cache directory path |
| cachingEnabled | boolean | No | Enable or disable caching |
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
-X POST \
-H "Content-Type: application/json" \
-d '{
"homeDirectory": "~/custom/lume",
"cacheDirectory": "~/custom/lume/cache",
"cachingEnabled": true
}' \
http://localhost:7777/lume/config
```
</Tab>
<Tab value="Python">
```python
import requests
payload = {
"homeDirectory": "~/custom/lume",
"cacheDirectory": "~/custom/lume/cache",
"cachingEnabled": True
}
r = requests.post("http://localhost:7777/lume/config", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const payload = {
homeDirectory: '~/custom/lume',
cacheDirectory: '~/custom/lume/cache',
cachingEnabled: true,
};
const res = await fetch('http://localhost:7777/lume/config', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>
---
## Storage Location Management
### Get VM Storage Locations
List all configured VM storage locations.
`GET: /lume/config/locations`
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
http://localhost:7777/lume/config/locations
```
</Tab>
<Tab value="Python">
```python
import requests
r = requests.get("http://localhost:7777/lume/config/locations", timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/config/locations');
console.log(await res.json());
```
</Tab>
</Tabs>
```json
[
{
"name": "default",
"path": "~/.lume/vms",
"isDefault": true
},
{
"name": "ssd",
"path": "/Volumes/SSD/lume/vms",
"isDefault": false
}
]
```
### Add VM Storage Location
Add a new VM storage location.
`POST: /lume/config/locations`
#### Parameters
| Name | Type | Required | Description |
| ---- | ------ | -------- | ---------------------------- |
| name | string | Yes | Storage location name |
| path | string | Yes | File system path for storage |
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
-X POST \
-H "Content-Type: application/json" \
-d '{
"name": "ssd",
"path": "/Volumes/SSD/lume/vms"
}' \
http://localhost:7777/lume/config/locations
```
</Tab>
<Tab value="Python">
```python
import requests
payload = {
"name": "ssd",
"path": "/Volumes/SSD/lume/vms"
}
r = requests.post("http://localhost:7777/lume/config/locations", json=payload, timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const payload = {
name: 'ssd',
path: '/Volumes/SSD/lume/vms',
};
const res = await fetch('http://localhost:7777/lume/config/locations', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
console.log(await res.json());
```
</Tab>
</Tabs>
### Remove VM Storage Location
Remove a VM storage location.
`DELETE: /lume/config/locations/:name`
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
-X DELETE \
http://localhost:7777/lume/config/locations/ssd
```
</Tab>
<Tab value="Python">
```python
import requests
r = requests.delete("http://localhost:7777/lume/config/locations/ssd", timeout=50)
print(r.status_code)
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/config/locations/ssd', {
method: 'DELETE',
});
console.log(res.status);
```
</Tab>
</Tabs>
### Set Default VM Storage Location
Set a storage location as the default.
`POST: /lume/config/locations/default/:name`
#### Example Request
<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
<Tab value="Curl">
```bash
curl --connect-timeout 6000 \
--max-time 5000 \
-X POST \
http://localhost:7777/lume/config/locations/default/ssd
```
</Tab>
<Tab value="Python">
```python
import requests
r = requests.post("http://localhost:7777/lume/config/locations/default/ssd", timeout=50)
print(r.json())
```
</Tab>
<Tab value="TypeScript">
```typescript
const res = await fetch('http://localhost:7777/lume/config/locations/default/ssd', {
method: 'POST',
});
console.log(await res.json());
```
</Tab>
</Tabs>
```
--------------------------------------------------------------------------------
/libs/lume/src/Server/Handlers.swift:
--------------------------------------------------------------------------------
```swift
import ArgumentParser
import Foundation
import Virtualization
@MainActor
extension Server {
// MARK: - VM Management Handlers
/// GET /lume/vms — returns the list of VMs, optionally filtered by storage location.
func handleListVMs(storage: String? = nil) async throws -> HTTPResponse {
    let vmController = LumeController()
    do {
        return try .json(vmController.list(storage: storage))
    } catch {
        // Keep the stdout diagnostic, then surface the failure to the client as 400.
        print(
            "ERROR: Failed to list VMs: \(error.localizedDescription), storage=\(String(describing: storage))"
        )
        return .badRequest(message: error.localizedDescription)
    }
}
/// GET /lume/vms/:name — returns the details of a single VM as JSON.
///
/// NOTE(review): the step-by-step `print` tracing below looks like leftover
/// crash debugging around detail serialization; consider demoting to Logger.
func handleGetVM(name: String, storage: String? = nil) async throws -> HTTPResponse {
    print("Getting VM details: name=\(name), storage=\(String(describing: storage))")
    do {
        let vmController = LumeController()
        print("Created VM controller, attempting to get VM")
        let vm = try vmController.get(name: name, storage: storage)
        print("Successfully retrieved VM")
        // Check for nil values that might cause crashes
        if vm.vmDirContext.config.macAddress == nil {
            print("ERROR: VM has nil macAddress")
            return .badRequest(message: "VM configuration is invalid (nil macAddress)")
        }
        print("MacAddress check passed")
        // Log that we're about to access details
        print("Preparing VM details response")
        // Print the full details object for debugging
        let details = vm.details
        print("VM DETAILS: \(details)")
        print(" name: \(details.name)")
        print(" os: \(details.os)")
        print(" cpuCount: \(details.cpuCount)")
        print(" memorySize: \(details.memorySize)")
        print(" diskSize: \(details.diskSize)")
        print(" display: \(details.display)")
        print(" status: \(details.status)")
        print(" vncUrl: \(String(describing: details.vncUrl))")
        print(" ipAddress: \(String(describing: details.ipAddress))")
        print(" locationName: \(details.locationName)")
        // Serialize the VM details
        print("About to serialize VM details")
        let response = try HTTPResponse.json(vm.details)
        print("Successfully serialized VM details")
        return response
    } catch {
        // This will catch errors from both vmController.get and the json serialization
        print("ERROR: Failed to get VM details: \(error.localizedDescription)")
        return .badRequest(message: error.localizedDescription)
    }
}
/// POST /lume/vms — creates a new VM from a JSON `CreateVMRequest` body.
///
/// Returns 400 with an `APIError` payload when the body is missing or
/// undecodable, or when VM creation itself fails.
func handleCreateVM(_ body: Data?) async throws -> HTTPResponse {
    guard let body = body,
        let request = try? JSONDecoder().decode(CreateVMRequest.self, from: body)
    else {
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: "Invalid request body"))
        )
    }
    do {
        // parse() yields the concrete disk/memory sizes used below.
        // NOTE(review): inferred from usage — confirm against CreateVMRequest.parse().
        let sizes = try request.parse()
        let vmController = LumeController()
        try await vmController.create(
            name: request.name,
            os: request.os,
            diskSize: sizes.diskSize,
            cpuCount: request.cpu,
            memorySize: sizes.memory,
            display: request.display,
            ipsw: request.ipsw,
            storage: request.storage
        )
        return HTTPResponse(
            statusCode: .ok,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode([
                "message": "VM created successfully", "name": request.name,
            ])
        )
    } catch {
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
        )
    }
}
/// DELETE /lume/vms/:name — deletes a VM, optionally from a specific storage location.
func handleDeleteVM(name: String, storage: String? = nil) async throws -> HTTPResponse {
    let vmController = LumeController()
    do {
        try await vmController.delete(name: name, storage: storage)
        // Success carries no payload — an empty JSON body is returned.
        return HTTPResponse(
            statusCode: .ok, headers: ["Content-Type": "application/json"], body: Data())
    } catch {
        let errorBody = try JSONEncoder().encode(APIError(message: error.localizedDescription))
        return HTTPResponse(
            statusCode: .badRequest, headers: ["Content-Type": "application/json"],
            body: errorBody)
    }
}
/// POST /lume/vms/clone — clones an existing VM under a new name, optionally
/// across storage locations, from a JSON `CloneRequest` body.
func handleCloneVM(_ body: Data?) async throws -> HTTPResponse {
    guard let body = body,
        let request = try? JSONDecoder().decode(CloneRequest.self, from: body)
    else {
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: "Invalid request body"))
        )
    }
    do {
        let vmController = LumeController()
        // Clone is synchronous — the response is only sent once the copy finished.
        try vmController.clone(
            name: request.name,
            newName: request.newName,
            sourceLocation: request.sourceLocation,
            destLocation: request.destLocation
        )
        return HTTPResponse(
            statusCode: .ok,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode([
                "message": "VM cloned successfully",
                "source": request.name,
                "destination": request.newName,
            ])
        )
    } catch {
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
        )
    }
}
// MARK: - VM Operation Handlers
/// PATCH-style settings update for a VM from a JSON `SetVMRequest` body
/// (cpu, memory, disk size, display, storage). Only fields present in the
/// request are applied by the controller.
func handleSetVM(name: String, body: Data?) async throws -> HTTPResponse {
    guard let body = body,
        let request = try? JSONDecoder().decode(SetVMRequest.self, from: body)
    else {
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: "Invalid request body"))
        )
    }
    do {
        let vmController = LumeController()
        // parse() converts the request's size strings into typed values.
        let sizes = try request.parse()
        try vmController.updateSettings(
            name: name,
            cpu: request.cpu,
            memory: sizes.memory,
            diskSize: sizes.diskSize,
            display: sizes.display?.string,
            storage: request.storage
        )
        return HTTPResponse(
            statusCode: .ok,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(["message": "VM settings updated successfully"])
        )
    } catch {
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
        )
    }
}
/// POST /lume/vms/:name/stop — stops a running VM.
///
/// After the controller reports the stop, the handler deliberately waits five
/// seconds (logged second-by-second) so file locks held by the VM process can
/// clear, then re-reads the VM to verify it no longer reports "running".
func handleStopVM(name: String, storage: String? = nil) async throws -> HTTPResponse {
    Logger.info(
        "Stopping VM", metadata: ["name": name, "storage": String(describing: storage)])
    do {
        Logger.info("Creating VM controller", metadata: ["name": name])
        let vmController = LumeController()
        Logger.info("Calling stopVM on controller", metadata: ["name": name])
        try await vmController.stopVM(name: name, storage: storage)
        Logger.info(
            "VM stopped, waiting 5 seconds for locks to clear", metadata: ["name": name])
        // Add a delay to ensure locks are fully released before returning
        for i in 1...5 {
            try? await Task.sleep(nanoseconds: 1_000_000_000)
            Logger.info("Lock clearing delay", metadata: ["name": name, "seconds": "\(i)/5"])
        }
        // Verify the VM is really in a stopped state
        Logger.info("Verifying VM is stopped", metadata: ["name": name])
        let vm = try? vmController.get(name: name, storage: storage)
        if let vm = vm, vm.details.status == "running" {
            // Stop succeeded at the controller level but the VM still reports
            // running; only warn — the client still receives a 200 below.
            Logger.info(
                "VM still reports as running despite stop operation",
                metadata: ["name": name, "severity": "warning"])
        } else {
            Logger.info(
                "Verification complete: VM is in stopped state", metadata: ["name": name])
        }
        Logger.info("Returning successful response", metadata: ["name": name])
        return HTTPResponse(
            statusCode: .ok,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(["message": "VM stopped successfully"])
        )
    } catch {
        Logger.error(
            "Failed to stop VM",
            metadata: [
                "name": name,
                "error": error.localizedDescription,
                "storage": String(describing: storage),
            ])
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
        )
    }
}
/// POST /lume/vms/:name/run — starts a VM asynchronously.
///
/// The `RunVMRequest` body is optional; a missing or undecodable body falls
/// back to default run options. The actual start happens in a detached
/// background task (see `startVM`), so this handler returns 202 Accepted
/// immediately with status "pending".
func handleRunVM(name: String, body: Data?) async throws -> HTTPResponse {
    Logger.info("Running VM", metadata: ["name": name])
    // Log the raw body data if available
    if let body = body, let bodyString = String(data: body, encoding: .utf8) {
        Logger.info("Run VM raw request body", metadata: ["name": name, "body": bodyString])
    } else {
        Logger.info("No request body or could not decode as string", metadata: ["name": name])
    }
    do {
        Logger.info("Creating VM controller and parsing request", metadata: ["name": name])
        // NOTE(review): decode failures are silently treated as "no options",
        // so a malformed body runs the VM with defaults rather than erroring.
        let request =
            body.flatMap { try? JSONDecoder().decode(RunVMRequest.self, from: $0) }
            ?? RunVMRequest(
                noDisplay: nil, sharedDirectories: nil, recoveryMode: nil, storage: nil)
        Logger.info(
            "Parsed request",
            metadata: [
                "name": name,
                "noDisplay": String(describing: request.noDisplay),
                "sharedDirectories": "\(request.sharedDirectories?.count ?? 0)",
                "storage": String(describing: request.storage),
            ])
        Logger.info("Parsing shared directories", metadata: ["name": name])
        let dirs = try request.parse()
        Logger.info(
            "Successfully parsed shared directories",
            metadata: ["name": name, "count": "\(dirs.count)"])
        // Start VM in background
        Logger.info("Starting VM in background", metadata: ["name": name])
        startVM(
            name: name,
            noDisplay: request.noDisplay ?? false,
            sharedDirectories: dirs,
            recoveryMode: request.recoveryMode ?? false,
            storage: request.storage
        )
        Logger.info("VM start initiated in background", metadata: ["name": name])
        // Return response immediately
        return HTTPResponse(
            statusCode: .accepted,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode([
                "message": "VM start initiated",
                "name": name,
                "status": "pending",
            ])
        )
    } catch {
        Logger.error(
            "Failed to run VM",
            metadata: [
                "name": name,
                "error": error.localizedDescription,
            ])
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
        )
    }
}
// MARK: - Image Management Handlers
/// GET /lume/ipsw — resolves the download URL of the latest macOS restore image.
func handleIPSW() async throws -> HTTPResponse {
    do {
        let url = try await LumeController().getLatestIPSWURL()
        let payload = try JSONEncoder().encode(["url": url.absoluteString])
        return HTTPResponse(
            statusCode: .ok,
            headers: ["Content-Type": "application/json"],
            body: payload
        )
    } catch {
        let payload = try JSONEncoder().encode(APIError(message: error.localizedDescription))
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: payload
        )
    }
}
/// POST /lume/pull — pulls a VM image from a registry, driven by a JSON
/// `PullRequest` body. The pull runs to completion before responding.
func handlePull(_ body: Data?) async throws -> HTTPResponse {
    guard let body = body,
        let request = try? JSONDecoder().decode(PullRequest.self, from: body)
    else {
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: "Invalid request body"))
        )
    }
    do {
        let vmController = LumeController()
        try await vmController.pullImage(
            image: request.image,
            name: request.name,
            registry: request.registry,
            organization: request.organization,
            storage: request.storage
        )
        return HTTPResponse(
            statusCode: .ok,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode([
                "message": "Image pulled successfully",
                "image": request.image,
                // `name` is optional in the request; fall back for the echo only.
                "name": request.name ?? "default",
            ])
        )
    } catch {
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
        )
    }
}
/// POST /lume/prune — removes cached images to reclaim disk space.
func handlePruneImages() async throws -> HTTPResponse {
    let vmController = LumeController()
    do {
        try await vmController.pruneImages()
        let okBody = try JSONEncoder().encode(["message": "Successfully removed cached images"])
        return HTTPResponse(
            statusCode: .ok,
            headers: ["Content-Type": "application/json"],
            body: okBody
        )
    } catch {
        let errBody = try JSONEncoder().encode(APIError(message: error.localizedDescription))
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: errBody
        )
    }
}
/// POST /lume/vms/push — pushes a local VM to a registry as an image.
///
/// The upload runs in a detached fire-and-forget task; the handler returns
/// 202 Accepted immediately. Success or failure of the background push is
/// only reported via server logs, not to the original client.
func handlePush(_ body: Data?) async throws -> HTTPResponse {
    guard let body = body,
        let request = try? JSONDecoder().decode(PushRequest.self, from: body)
    else {
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: "Invalid request body"))
        )
    }
    // Trigger push asynchronously, return Accepted immediately
    Task.detached { @MainActor @Sendable in
        do {
            let vmController = LumeController()
            try await vmController.pushImage(
                name: request.name,
                imageName: request.imageName,
                tags: request.tags,
                registry: request.registry,
                organization: request.organization,
                storage: request.storage,
                chunkSizeMb: request.chunkSizeMb,
                verbose: false,  // Verbose typically handled by server logs
                dryRun: false,  // Default API behavior is likely non-dry-run
                reassemble: false  // Default API behavior is likely non-reassemble
            )
            print(
                "Background push completed successfully for image: \(request.imageName):\(request.tags.joined(separator: ","))"
            )
        } catch {
            print(
                "Background push failed for image: \(request.imageName):\(request.tags.joined(separator: ",")) - Error: \(error.localizedDescription)"
            )
        }
    }
    // AnyEncodable lets the heterogeneous payload (strings + [String]) share one dictionary.
    return HTTPResponse(
        statusCode: .accepted,
        headers: ["Content-Type": "application/json"],
        body: try JSONEncoder().encode([
            "message": AnyEncodable("Push initiated in background"),
            "name": AnyEncodable(request.name),
            "imageName": AnyEncodable(request.imageName),
            "tags": AnyEncodable(request.tags),
        ])
    )
}
/// GET /lume/images?organization=… — lists locally cached images.
///
/// `request.path` still carries the raw "?key=value" query suffix at this
/// point, so the query string is parsed manually. Keys and values are
/// percent-decoded before use; previously the raw encoded text was used,
/// which broke organization names containing URL-encoded characters.
func handleGetImages(_ request: HTTPRequest) async throws -> HTTPResponse {
    let pathAndQuery = request.path.split(separator: "?", maxSplits: 1)
    let queryParams =
        pathAndQuery.count > 1
        ? pathAndQuery[1]
            .split(separator: "&")
            .reduce(into: [String: String]()) { dict, param in
                let parts = param.split(separator: "=", maxSplits: 1)
                if parts.count == 2 {
                    // Decode %XX escapes; fall back to the raw text if decoding fails.
                    let key = String(parts[0]).removingPercentEncoding ?? String(parts[0])
                    let value = String(parts[1]).removingPercentEncoding ?? String(parts[1])
                    dict[key] = value
                }
            } : [:]
    let organization = queryParams["organization"] ?? "trycua"
    do {
        let vmController = LumeController()
        let imageList = try await vmController.getImages(organization: organization)
        // Create a response format that matches the CLI output
        let response = imageList.local.map {
            [
                "repository": $0.repository,
                "imageId": $0.imageId,
            ]
        }
        return HTTPResponse(
            statusCode: .ok,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(response)
        )
    } catch {
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
        )
    }
}
// MARK: - Config Management Handlers
/// GET /lume/config — returns the current Lume settings as JSON.
func handleGetConfig() async throws -> HTTPResponse {
    do {
        return try .json(LumeController().getSettings())
    } catch {
        return .badRequest(message: error.localizedDescription)
    }
}
/// Partial-update payload for POST /lume/config; every field is optional and
/// only the fields present in the request are applied.
struct ConfigRequest: Codable {
    let homeDirectory: String?
    let cacheDirectory: String?
    let cachingEnabled: Bool?
}
/// POST /lume/config — applies any subset of configuration settings.
///
/// Settings are applied in order (home dir, cache dir, caching flag); if a
/// later setter throws, earlier changes are NOT rolled back.
func handleUpdateConfig(_ body: Data?) async throws -> HTTPResponse {
    guard let body = body,
        let request = try? JSONDecoder().decode(ConfigRequest.self, from: body)
    else {
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: "Invalid request body"))
        )
    }
    do {
        let vmController = LumeController()
        if let homeDir = request.homeDirectory {
            try vmController.setHomeDirectory(homeDir)
        }
        if let cacheDir = request.cacheDirectory {
            try vmController.setCacheDirectory(path: cacheDir)
        }
        if let cachingEnabled = request.cachingEnabled {
            try vmController.setCachingEnabled(cachingEnabled)
        }
        return HTTPResponse(
            statusCode: .ok,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(["message": "Configuration updated successfully"])
        )
    } catch {
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
        )
    }
}
/// GET /lume/config/locations — lists all configured VM storage locations.
func handleGetLocations() async throws -> HTTPResponse {
    do {
        return try .json(LumeController().getLocations())
    } catch {
        return .badRequest(message: error.localizedDescription)
    }
}
/// Payload for POST /lume/config/locations: a named storage location and its
/// filesystem path. Both fields are required.
struct LocationRequest: Codable {
    let name: String
    let path: String
}
/// POST /lume/config/locations — registers a new VM storage location and
/// echoes back the name and path on success.
func handleAddLocation(_ body: Data?) async throws -> HTTPResponse {
    guard let body = body,
        let request = try? JSONDecoder().decode(LocationRequest.self, from: body)
    else {
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: "Invalid request body"))
        )
    }
    do {
        let vmController = LumeController()
        try vmController.addLocation(name: request.name, path: request.path)
        return HTTPResponse(
            statusCode: .ok,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode([
                "message": "Location added successfully",
                "name": request.name,
                "path": request.path,
            ])
        )
    } catch {
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
        )
    }
}
/// DELETE /lume/config/locations/:name — removes a VM storage location.
func handleRemoveLocation(_ name: String) async throws -> HTTPResponse {
    do {
        try LumeController().removeLocation(name: name)
        let okBody = try JSONEncoder().encode(["message": "Location removed successfully"])
        return HTTPResponse(
            statusCode: .ok,
            headers: ["Content-Type": "application/json"],
            body: okBody
        )
    } catch {
        let errBody = try JSONEncoder().encode(APIError(message: error.localizedDescription))
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: errBody
        )
    }
}
/// POST /lume/config/locations/default/:name — marks a storage location as the default.
func handleSetDefaultLocation(_ name: String) async throws -> HTTPResponse {
    do {
        try LumeController().setDefaultLocation(name: name)
        let okBody = try JSONEncoder().encode(["message": "Default location set successfully"])
        return HTTPResponse(
            statusCode: .ok,
            headers: ["Content-Type": "application/json"],
            body: okBody
        )
    } catch {
        let errBody = try JSONEncoder().encode(APIError(message: error.localizedDescription))
        return HTTPResponse(
            statusCode: .badRequest,
            headers: ["Content-Type": "application/json"],
            body: errBody
        )
    }
}
// MARK: - Log Handlers
/// GET /lume/logs — returns the daemon's log files.
///
/// - Parameters:
///   - type: "info", "error", or nil/"all" for both (matched case-insensitively).
///   - lines: when provided, only the last `lines` lines of each file are returned.
/// - Returns: JSON with "info" and/or "error" keys; a missing file yields a
///   placeholder string instead of an error.
func handleGetLogs(type: String?, lines: Int?) async throws -> HTTPResponse {
    do {
        let logType = type?.lowercased() ?? "all"
        // Log paths are fixed by the daemon; not configurable through this endpoint.
        let infoPath = "/tmp/lume_daemon.log"
        let errorPath = "/tmp/lume_daemon.error.log"
        let fileManager = FileManager.default
        var response: [String: String] = [:]
        // Function to read log files
        func readLogFile(path: String) -> String? {
            guard fileManager.fileExists(atPath: path) else {
                return nil
            }
            do {
                let content = try String(contentsOfFile: path, encoding: .utf8)
                // If lines parameter is provided, return only the specified number of lines from the end
                if let lineCount = lines {
                    let allLines = content.components(separatedBy: .newlines)
                    let startIndex = max(0, allLines.count - lineCount)
                    let lastLines = Array(allLines[startIndex...])
                    return lastLines.joined(separator: "\n")
                }
                return content
            } catch {
                // Report read failures inline rather than failing the whole request.
                return "Error reading log file: \(error.localizedDescription)"
            }
        }
        // Get logs based on requested type
        if logType == "info" || logType == "all" {
            response["info"] = readLogFile(path: infoPath) ?? "Info log file not found"
        }
        if logType == "error" || logType == "all" {
            response["error"] = readLogFile(path: errorPath) ?? "Error log file not found"
        }
        return try .json(response)
    } catch {
        return .badRequest(message: error.localizedDescription)
    }
}
// MARK: - Private Helper Methods
/// Launches a VM in a detached background task so the HTTP handler can return
/// immediately (see `handleRunVM`).
///
/// Errors are logged, not propagated — the caller has already responded with
/// 202 Accepted by the time the VM actually starts (or fails to start).
nonisolated private func startVM(
    name: String,
    noDisplay: Bool,
    sharedDirectories: [SharedDirectory] = [],
    recoveryMode: Bool = false,
    storage: String? = nil
) {
    Logger.info(
        "Starting VM in detached task",
        metadata: [
            "name": name,
            "noDisplay": "\(noDisplay)",
            "recoveryMode": "\(recoveryMode)",
            "storage": String(describing: storage),
        ])
    // Detached so the run outlives the request; hops back onto the main actor
    // because the controller is used from there.
    Task.detached { @MainActor @Sendable in
        Logger.info("Background task started for VM", metadata: ["name": name])
        do {
            Logger.info("Creating VM controller in background task", metadata: ["name": name])
            let vmController = LumeController()
            Logger.info(
                "Calling runVM on controller",
                metadata: [
                    "name": name,
                    "noDisplay": "\(noDisplay)",
                ])
            try await vmController.runVM(
                name: name,
                noDisplay: noDisplay,
                sharedDirectories: sharedDirectories,
                recoveryMode: recoveryMode,
                storage: storage
            )
            Logger.info("VM started successfully in background task", metadata: ["name": name])
        } catch {
            Logger.error(
                "Failed to start VM in background task",
                metadata: [
                    "name": name,
                    "error": error.localizedDescription,
                ])
        }
    }
    Logger.info("Background task dispatched for VM", metadata: ["name": name])
}
}
```
--------------------------------------------------------------------------------
/blog/build-your-own-operator-on-macos-2.md:
--------------------------------------------------------------------------------
```markdown
# Build Your Own Operator on macOS - Part 2
_Published on April 27, 2025 by Francesco Bonacci_
In our [previous post](build-your-own-operator-on-macos-1.md), we built a basic Computer-Use Operator from scratch using OpenAI's `computer-use-preview` model and our [cua-computer](https://pypi.org/project/cua-computer) package. While educational, implementing the control loop manually can be tedious and error-prone.
In this follow-up, we'll explore our [cua-agent](https://pypi.org/project/cua-agent) framework - a high-level abstraction that handles all the complexity of VM interaction, screenshot processing, model communication, and action execution automatically.
<div align="center">
<video src="https://github.com/user-attachments/assets/0be7e3e3-eead-4646-a4a3-5bb392501ee7" width="600" controls></video>
</div>
## What You'll Learn
By the end of this tutorial, you'll be able to:
- Set up the `cua-agent` framework with various agent loop types and model providers
- Understand the different agent loop types and their capabilities
- Work with local models for cost-effective workflows
- Use a simple UI for your operator
**Prerequisites:**
- Completed setup from Part 1 ([lume CLI installed](https://github.com/trycua/cua?tab=readme-ov-file#option-2-full-computer-use-agent-capabilities), macOS CUA image already pulled)
- Python 3.10+. We recommend using Conda (or Anaconda) to create an ad hoc Python environment.
- API keys for OpenAI and/or Anthropic (optional for local models)
**Estimated Time:** 30-45 minutes
## Introduction to cua-agent
The `cua-agent` framework is designed to simplify building Computer-Use Agents. It abstracts away the complex interaction loop we built manually in Part 1, letting you focus on defining tasks rather than implementing the machinery. Among other features, it includes:
- **Multiple Provider Support**: Works with OpenAI, Anthropic, UI-Tars, local models (via Ollama), or any OpenAI-compatible model (e.g. LM Studio, vLLM, LocalAI, OpenRouter, Groq, etc.)
- **Flexible Loop Types**: Different implementations optimized for various models (e.g. OpenAI vs. Anthropic)
- **Structured Responses**: Clean, consistent output following the OpenAI Agent SDK specification we touched on in Part 1
- **Local Model Support**: Run cost-effectively with locally hosted models (Ollama, LM Studio, vLLM, LocalAI, etc.)
- **Gradio UI**: Optional visual interface for interacting with your agent
## Installation
Let's start by installing the `cua-agent` package. You can install it with all features or selectively install only what you need.
From your python 3.10+ environment, run:
```bash
# For all features
pip install "cua-agent[all]"
# Or selectively install only what you need
pip install "cua-agent[openai]" # OpenAI support
pip install "cua-agent[anthropic]" # Anthropic support
pip install "cua-agent[uitars]" # UI-Tars support
pip install "cua-agent[omni]" # OmniParser + VLMs support
pip install "cua-agent[ui]" # Gradio UI
```
## Setting Up Your Environment
Before running any code examples, let's set up a proper environment:
1. **Create a new directory** for your project:
```bash
mkdir cua-agent-tutorial
cd cua-agent-tutorial
```
2. **Set up a Python environment** using one of these methods:
**Option A: Using conda command line**
```bash
# Using conda
conda create -n cua-agent python=3.10
conda activate cua-agent
```
**Option B: Using Anaconda Navigator UI**
- Open Anaconda Navigator
- Click on "Environments" in the left sidebar
- Click the "Create" button at the bottom
- Name your environment "cua-agent"
- Select Python 3.10
- Click "Create"
- Once created, select the environment and click "Open Terminal" to activate it
**Option C: Using venv**
```bash
python -m venv cua-env
source cua-env/bin/activate # On macOS/Linux
```
3. **Install the cua-agent package**:
```bash
pip install "cua-agent[all]"
```
4. **Set up your API keys as environment variables**:
```bash
# For OpenAI models
export OPENAI_API_KEY=your_openai_key_here
# For Anthropic models (if needed)
export ANTHROPIC_API_KEY=your_anthropic_key_here
```
5. **Create a Python file or notebook**:
**Option A: Create a Python script**
```bash
# For a Python script
touch cua_agent_example.py
```
**Option B: Use VS Code notebooks**
- Open VS Code
- Install the Python extension if you haven't already
- Create a new file with a `.ipynb` extension (e.g., `cua_agent_tutorial.ipynb`)
- Select your Python environment when prompted
- You can now create and run code cells in the notebook interface
Now you're ready to run the code examples!
## Understanding Agent Loops
If you recall from Part 1, we had to implement a custom interaction loop to interact with the computer-use-preview model.
In the `cua-agent` framework, an **Agent Loop** is the core abstraction that implements the continuous interaction cycle between an AI model and the computer environment. It manages the flow of:
1. Capturing screenshots of the computer's state
2. Processing these screenshots (with or without UI element detection)
3. Sending this visual context to an AI model along with the task instructions
4. Receiving the model's decisions on what actions to take
5. Safely executing these actions in the environment
6. Repeating this cycle until the task is complete
The loop handles all the complex error handling, retries, context management, and model-specific interaction patterns so you don't have to implement them yourself.
While the core concept remains the same across all agent loops, different AI models require specialized handling for optimal performance. To address this, the framework provides 4 different agent loop implementations, each designed for different computer-use modalities.
| Agent Loop | Supported Models | Description | Set-Of-Marks |
|:-----------|:-----------------|:------------|:-------------|
| `AgentLoop.OPENAI` | • `computer_use_preview` | Use OpenAI Operator CUA Preview model | Not Required |
| `AgentLoop.ANTHROPIC` | • `claude-sonnet-4-5-20250929`<br>• `claude-3-7-sonnet-20250219` | Use Anthropic Computer-Use Beta Tools | Not Required |
| `AgentLoop.UITARS` | • `ByteDance-Seed/UI-TARS-1.5-7B` | Uses ByteDance's UI-TARS 1.5 model | Not Required |
| `AgentLoop.OMNI` | • `claude-sonnet-4-5-20250929`<br>• `claude-3-7-sonnet-20250219`<br>• `gpt-4.5-preview`<br>• `gpt-4o`<br>• `gpt-4`<br>• `phi4`<br>• `phi4-mini`<br>• `gemma3`<br>• `...`<br>• `Any Ollama or OpenAI-compatible model` | Use OmniParser for element pixel-detection (SoM) and any VLMs for UI Grounding and Reasoning | OmniParser |
Each loop handles the same basic pattern we implemented manually in Part 1:
1. Take a screenshot of the VM
2. Send the screenshot and task to the AI model
3. Receive an action to perform
4. Execute the action
5. Repeat until the task is complete
### Why Different Agent Loops?
The `cua-agent` framework provides multiple agent loop implementations to abstract away the complexity of interacting with different CUA models. Each provider has unique API structures, response formats, conventions and capabilities that require specialized handling:
- **OpenAI Loop**: Uses the Responses API with a specific `computer_call_output` format for sending screenshots after actions. Requires handling safety checks and maintains a chain of requests using `previous_response_id`.
- **Anthropic Loop**: Implements a [multi-agent loop pattern](https://docs.anthropic.com/en/docs/agents-and-tools/computer-use#understanding-the-multi-agent-loop) with a sophisticated message handling system, supporting various API providers (Anthropic, Bedrock, Vertex) with token management and prompt caching capabilities.
- **UI-TARS Loop**: Requires custom message formatting and specialized parsing to extract actions from text responses using a "box token" system for UI element identification.
- **OMNI Loop**: Uses [Microsoft's OmniParser](https://github.com/microsoft/OmniParser) to create a [Set-of-Marks (SoM)](https://arxiv.org/abs/2310.11441) representation of the UI, enabling any vision-language model to interact with interfaces without specialized UI training. This makes it the most flexible loop — it works with virtually any vision-language model, including local and open-source ones.
These abstractions allow you to easily switch between providers without changing your application code. All loop implementations are available in the [cua-agent GitHub repository](https://github.com/trycua/cua/tree/main/libs/python/agent).
Choosing the right agent loop depends not only on your API access and technical requirements but also on the specific tasks you need to accomplish. To make an informed decision, it's helpful to understand how these underlying models perform across different computing environments – from desktop operating systems to web browsers and mobile interfaces.
## Computer-Use Model Capabilities
The performance of different Computer-Use models varies significantly across tasks. These benchmark evaluations measure an agent's ability to follow instructions and complete real-world tasks in different computing environments.
| Benchmark type | Benchmark | UI-TARS-1.5 | OpenAI CUA | Claude 3.7 | Previous SOTA | Human |
| ---------------- | ------------------------------------------------------------------ | ----------- | ---------- | ---------- | --------------- | ----- |
| **Computer Use** | [OSworld](https://arxiv.org/abs/2404.07972) (100 steps) | **42.5** | 36.4 | 28 | 38.1 (200 step) | 72.4 |
| | [Windows Agent Arena](https://arxiv.org/abs/2409.08264) (50 steps) | **42.1** | - | - | 29.8 | - |
| **Browser Use** | [WebVoyager](https://arxiv.org/abs/2401.13919) | 84.8 | **87** | 84.1 | 87 | - |
| | [Online-Mind2web](https://arxiv.org/abs/2504.01382) | **75.8** | 71 | 62.9 | 71 | - |
| **Phone Use** | [Android World](https://arxiv.org/abs/2405.14573) | **64.2** | - | - | 59.5 | - |
### When to Use Each Loop
- **AgentLoop.OPENAI**: Choose when you have OpenAI Tier 3 access and need the most capable computer-use agent for web-based tasks. Uses the same [OpenAI Computer-Use Loop](https://platform.openai.com/docs/guides/tools-computer-use) as Part 1, delivering strong performance on browser-based benchmarks.
- **AgentLoop.ANTHROPIC**: Ideal for users with Anthropic API access who need strong reasoning capabilities with computer-use abilities. Works with `claude-sonnet-4-5-20250929` and `claude-3-7-sonnet-20250219` models following [Anthropic's Computer-Use tools](https://docs.anthropic.com/en/docs/agents-and-tools/computer-use#understanding-the-multi-agent-loop).
- **AgentLoop.UITARS**: Best for scenarios that demand stronger OS/desktop automation or are latency-sensitive, as UI-TARS-1.5 leads the OS-capability benchmarks. Requires running the model locally or accessing it through compatible endpoints (e.g. on Hugging Face).
- **AgentLoop.OMNI**: The most flexible option that works with virtually any vision-language model including local and open-source ones. Perfect for cost-effective development or when you need to use models without native computer-use capabilities.
Now that we understand the capabilities and strengths of different models, let's see how easy it is to implement a Computer-Use Agent using the `cua-agent` framework. Let's look at the implementation details.
## Creating Your First Computer-Use Agent
With the `cua-agent` framework, creating a Computer-Use Agent becomes remarkably straightforward. The framework handles all the complexities of model interaction, screenshot processing, and action execution behind the scenes. Let's look at a simple example of how to build your first agent:
**How to run this example:**
1. Create a new file named `simple_task.py` in your text editor or IDE (like VS Code, PyCharm, or Cursor)
2. Copy and paste the following code:
```python
import asyncio
from computer import Computer
from agent import ComputerAgent
async def run_simple_task():
async with Computer() as macos_computer:
# Create agent with OpenAI loop
agent = ComputerAgent(
model="openai/computer-use-preview",
tools=[macos_computer]
)
# Define a simple task
task = "Open Safari and search for 'Python tutorials'"
# Run the task and process responses
async for result in agent.run(task):
print(f"Action: {result.get('text')}")
# Run the example
if __name__ == "__main__":
asyncio.run(run_simple_task())
```
3. Save the file
4. Open a terminal, navigate to your project directory, and run:
```bash
python simple_task.py
```
5. The code will initialize the macOS virtual machine, create an agent, and execute the task of opening Safari and searching for Python tutorials.
You can also run this in a VS Code notebook:
1. Create a new notebook in VS Code (.ipynb file)
2. Copy the code into a cell (without the `if __name__ == "__main__":` part)
3. Run the cell to execute the code
You can find the full code in our [notebook](https://github.com/trycua/cua/blob/main/notebooks/blog/build-your-own-operator-on-macos-2.ipynb).
Compare this to the manual implementation from Part 1 - we've reduced dozens of lines of code to just a few. The cua-agent framework handles all the complex logic internally, letting you focus on the overarching agentic system.
## Working with Multiple Tasks
Another advantage of the cua-agent framework is easily chaining multiple tasks. Instead of managing complex state between tasks, you can simply provide a sequence of instructions to be executed in order:
**How to run this example:**
1. Create a new file named `multi_task.py` with the following code:
```python
import asyncio
from computer import Computer
from agent import ComputerAgent
async def run_multi_task_workflow():
async with Computer() as macos_computer:
agent = ComputerAgent(
model="anthropic/claude-sonnet-4-5-20250929",
tools=[macos_computer]
)
tasks = [
"Open Safari and go to github.com",
"Search for 'trycua/cua'",
"Open the repository page",
"Click on the 'Issues' tab",
"Read the first open issue"
]
for i, task in enumerate(tasks):
print(f"\nTask {i+1}/{len(tasks)}: {task}")
async for result in agent.run(task):
# Print just the action description for brevity
if result.get("text"):
print(f" → {result.get('text')}")
print(f"✅ Task {i+1} completed")
if __name__ == "__main__":
asyncio.run(run_multi_task_workflow())
```
2. Save the file
3. Make sure you have set your Anthropic API key:
```bash
export ANTHROPIC_API_KEY=your_anthropic_key_here
```
4. Run the script:
```bash
python multi_task.py
```
This pattern is particularly useful for creating workflows that navigate through multiple steps of an application or process. The agent maintains visual context between tasks, making it more likely to successfully complete complex sequences of actions.
## Understanding the Response Format
Each action taken by the agent returns a structured response following the OpenAI Agent SDK specification. This standardized format makes it easy to extract detailed information about what the agent is doing and why:
```python
async for result in agent.run(task):
# Basic information
print(f"Response ID: {result.get('id')}")
print(f"Response Text: {result.get('text')}")
# Detailed token usage statistics
usage = result.get('usage')
if usage:
print(f"Input Tokens: {usage.get('input_tokens')}")
print(f"Output Tokens: {usage.get('output_tokens')}")
# Reasoning and actions
for output in result.get('output', []):
if output.get('type') == 'reasoning':
print(f"Reasoning: {output.get('summary', [{}])[0].get('text')}")
elif output.get('type') == 'computer_call':
action = output.get('action', {})
print(f"Action: {action.get('type')} at ({action.get('x')}, {action.get('y')})")
```
This structured format allows you to:
- Log detailed information about agent actions
- Provide real-time feedback to users
- Track token usage for cost monitoring
- Access the reasoning behind decisions for debugging or user explanation
## Using Local Models with OMNI
One of the most powerful features of the framework is the ability to use local models via the OMNI loop. This approach dramatically reduces costs while maintaining acceptable reliability for many agentic workflows:
**How to run this example:**
1. First, you'll need to install Ollama for running local models:
- Visit [ollama.com](https://ollama.com) and download the installer for your OS
- Follow the installation instructions
- Pull the Gemma 3 model:
```bash
ollama pull gemma3:4b-it-q4_K_M
```
2. Create a file named `local_model.py` with this code:
```python
import asyncio
from computer import Computer
from agent import ComputerAgent
async def run_with_local_model():
async with Computer() as macos_computer:
agent = ComputerAgent(
model="omniparser+ollama_chat/gemma3",
tools=[macos_computer]
)
task = "Open the Calculator app and perform a simple calculation"
async for result in agent.run(task):
print(f"Action: {result.get('text')}")
if __name__ == "__main__":
asyncio.run(run_with_local_model())
```
3. Run the script:
```bash
python local_model.py
```
You can also use other local model servers with the OAICOMPAT provider, which enables compatibility with any API endpoint following the OpenAI API structure:
```python
agent = ComputerAgent(
model=LLM(
provider=LLMProvider.OAICOMPAT,
name="gemma-3-12b-it",
provider_base_url="http://localhost:1234/v1" # LM Studio endpoint
),
tools=[macos_computer]
)
```
Common local endpoints include:
- LM Studio: `http://localhost:1234/v1`
- vLLM: `http://localhost:8000/v1`
- LocalAI: `http://localhost:8080/v1`
- Ollama with OpenAI compat: `http://localhost:11434/v1`
This approach is perfect for:
- Development and testing without incurring API costs
- Offline or air-gapped environments where API access isn't possible
- Privacy-sensitive applications where data can't leave your network
- Experimenting with different models to find the best fit for your use case
## Deploying and Using UI-TARS
UI-TARS is ByteDance's Computer-Use model designed for navigating OS-level interfaces. It shows excellent performance on desktop OS tasks. To use UI-TARS, you'll first need to deploy the model.
### Deployment Options
1. **Local Deployment**: Follow the [UI-TARS deployment guide](https://github.com/bytedance/UI-TARS/blob/main/README_deploy.md) to run the model locally.
2. **Hugging Face Endpoint**: Deploy UI-TARS on Hugging Face Inference Endpoints, which will give you a URL like:
`https://**************.us-east-1.aws.endpoints.huggingface.cloud/v1`
3. **Using with cua-agent**: Once deployed, you can use UI-TARS with the cua-agent framework:
```python
agent = ComputerAgent(
model=LLM(
provider=LLMProvider.OAICOMPAT,
name="tgi",
provider_base_url="https://**************.us-east-1.aws.endpoints.huggingface.cloud/v1"
),
tools=[macos_computer]
)
```
UI-TARS is particularly useful for desktop automation tasks, as it shows the highest performance on OS-level benchmarks like OSworld and Windows Agent Arena.
## Understanding Agent Responses in Detail
The `run()` method of your agent yields structured responses that follow the OpenAI Agent SDK specification. This provides a rich set of information beyond just the basic action text:
```python
async for result in agent.run(task):
# Basic ID and text
print("Response ID:", result.get("id"))
print("Response Text:", result.get("text"))
# Token usage statistics
usage = result.get("usage")
if usage:
print("\nUsage Details:")
print(f" Input Tokens: {usage.get('input_tokens')}")
if "input_tokens_details" in usage:
print(f" Input Tokens Details: {usage.get('input_tokens_details')}")
print(f" Output Tokens: {usage.get('output_tokens')}")
if "output_tokens_details" in usage:
print(f" Output Tokens Details: {usage.get('output_tokens_details')}")
print(f" Total Tokens: {usage.get('total_tokens')}")
# Detailed reasoning and actions
outputs = result.get("output", [])
for output in outputs:
output_type = output.get("type")
if output_type == "reasoning":
print("\nReasoning:")
for summary in output.get("summary", []):
print(f" {summary.get('text')}")
elif output_type == "computer_call":
action = output.get("action", {})
print("\nComputer Action:")
print(f" Type: {action.get('type')}")
print(f" Position: ({action.get('x')}, {action.get('y')})")
if action.get("text"):
print(f" Text: {action.get('text')}")
```
This detailed information is invaluable for debugging, logging, and understanding the agent's decision-making process in an agentic system. More details can be found in the [OpenAI Agent SDK Specification](https://platform.openai.com/docs/guides/responses-vs-chat-completions).
## Building a Gradio UI
For a visual interface to your agent, the package also includes a Gradio UI:
**How to run the Gradio UI:**
1. Create a file named `launch_ui.py` with the following code:
```python
from agent.ui.gradio.app import create_gradio_ui
# Create and launch the UI
if __name__ == "__main__":
app = create_gradio_ui()
app.launch(share=False) # Set share=False for local access only
```
2. Install the UI dependencies if you haven't already:
```bash
pip install "cua-agent[ui]"
```
3. Run the script:
```bash
python launch_ui.py
```
4. Open your browser to the displayed URL (usually http://127.0.0.1:7860)
**Creating a Shareable Link (Optional):**
You can also create a temporary public URL to access your Gradio UI from anywhere:
```python
# In launch_ui.py
if __name__ == "__main__":
app = create_gradio_ui()
app.launch(share=True) # Creates a public link
```
When you run this, Gradio will display both a local URL and a public URL like:
```
Running on local URL: http://127.0.0.1:7860
Running on public URL: https://abcd1234.gradio.live
```
**Security Note:** Be cautious when sharing your Gradio UI publicly:
- The public URL gives anyone with the link full access to your agent
- Consider using basic authentication for additional protection:
```python
app.launch(share=True, auth=("username", "password"))
```
- Only use this feature for personal or team use, not for production environments
- The temporary link expires when you stop the Gradio application
This provides:
- Model provider selection
- Agent loop selection
- Task input field
- Real-time display of VM screenshots
- Action history
### Setting API Keys for the UI
To use the UI with different providers, set your API keys as environment variables:
```bash
# For OpenAI models
export OPENAI_API_KEY=your_openai_key_here
# For Anthropic models
export ANTHROPIC_API_KEY=your_anthropic_key_here
# Launch with both keys set
OPENAI_API_KEY=your_key ANTHROPIC_API_KEY=your_key python launch_ui.py
```
### UI Settings Persistence
The Gradio UI automatically saves your configuration to maintain your preferences between sessions:
- Settings like Agent Loop, Model Choice, Custom Base URL, and configuration options are saved to `.gradio_settings.json` in the project's root directory
- These settings are loaded automatically when you restart the UI
- API keys entered in the custom provider field are **not** saved for security reasons
- It's recommended to add `.gradio_settings.json` to your `.gitignore` file
## Advanced Example: GitHub Repository Workflow
Let's look at a more complex example that automates a GitHub workflow:
**How to run this advanced example:**
1. Create a file named `github_workflow.py` with the following code:
```python
import asyncio
import logging
from computer import Computer
from agent import ComputerAgent
async def github_workflow():
async with Computer(verbosity=logging.INFO) as macos_computer:
agent = ComputerAgent(
model="openai/computer-use-preview",
save_trajectory=True, # Save screenshots for debugging
only_n_most_recent_images=3, # Only keep last 3 images in context
verbosity=logging.INFO,
tools=[macos_computer]
)
tasks = [
"Look for a repository named trycua/cua on GitHub.",
"Check the open issues, open the most recent one and read it.",
"Clone the repository in users/lume/projects if it doesn't exist yet.",
"Open the repository with Cursor (on the dock, black background and white cube icon).",
"From Cursor, open Composer if not already open.",
"Focus on the Composer text area, then write and submit a task to help resolve the GitHub issue.",
]
for i, task in enumerate(tasks):
print(f"\nExecuting task {i+1}/{len(tasks)}: {task}")
async for result in agent.run(task):
print(f"Action: {result.get('text')}")
print(f"✅ Task {i+1}/{len(tasks)} completed")
if __name__ == "__main__":
asyncio.run(github_workflow())
```
2. Make sure your OpenAI API key is set:
```bash
export OPENAI_API_KEY=your_openai_key_here
```
3. Run the script:
```bash
python github_workflow.py
```
4. Watch as the agent completes the entire workflow:
- The agent will navigate to GitHub
- Find and investigate issues in the repository
- Clone the repository to the local machine
- Open it in Cursor
- Use Cursor's AI features to work on a solution
This example:
1. Searches GitHub for a repository
2. Reads an issue
3. Clones the repository
4. Opens it in an IDE
5. Uses AI to write a solution
## Comparing Implementation Approaches
Let's compare our manual implementation from Part 1 with the framework approach:
### Manual Implementation (Part 1)
- Required writing custom code for the interaction loop
- Needed explicit handling of different action types
- Required direct management of the OpenAI API calls
- Around 50-100 lines of code for basic functionality
- Limited to OpenAI's computer-use model
### Framework Implementation (Part 2)
- Abstracts the interaction loop
- Handles all action types automatically
- Manages API calls internally
- Only 10-15 lines of code for the same functionality
- Works with multiple model providers
- Includes UI capabilities
## Conclusion
The `cua-agent` framework transforms what was a complex implementation task into a simple, high-level interface for building Computer-Use Agents. By abstracting away the technical details, it lets you focus on defining the tasks rather than the machinery.
### When to Use Each Approach
- **Manual Implementation (Part 1)**: When you need complete control over the interaction loop or are implementing a custom solution
- **Framework (Part 2)**: For most applications where you want to quickly build and deploy Computer-Use Agents
### Next Steps
With the basics covered, you might want to explore:
- Customizing the agent's behavior with additional parameters
- Building more complex workflows spanning multiple applications
- Integrating your agent into other applications
- Contributing to the open-source project on GitHub
### Resources
- [cua-agent GitHub repository](https://github.com/trycua/cua/tree/main/libs/python/agent)
- [Agent Notebook Examples](https://github.com/trycua/cua/blob/main/notebooks/agent_nb.ipynb)
- [OpenAI Agent SDK Specification](https://platform.openai.com/docs/api-reference/responses)
- [Anthropic API Documentation](https://docs.anthropic.com/en/api/getting-started)
- [UI-TARS GitHub](https://github.com/ByteDance/UI-TARS)
- [OmniParser GitHub](https://github.com/microsoft/OmniParser)
```
--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/main.py:
--------------------------------------------------------------------------------
```python
import asyncio
import hashlib
import inspect
import json
import logging
import os
import platform
import time
import traceback
from contextlib import redirect_stderr, redirect_stdout
from io import StringIO
from typing import Any, Dict, List, Literal, Optional, Union, cast
import aiohttp
import uvicorn
from fastapi import (
FastAPI,
Header,
HTTPException,
Request,
WebSocket,
WebSocketDisconnect,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from .browser import get_browser_manager
from .handlers.factory import HandlerFactory
# Authentication session TTL (in seconds). Override via env var CUA_AUTH_TTL_SECONDS. Default: 60s
AUTH_SESSION_TTL_SECONDS: int = int(os.environ.get("CUA_AUTH_TTL_SECONDS", "60"))
# The agent package is an optional dependency; HAS_AGENT gates any
# agent-backed functionality elsewhere in this module.
try:
    from agent import ComputerAgent

    HAS_AGENT = True
except ImportError:
    HAS_AGENT = False
# Set up logging with more detail
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Configure WebSocket with larger message size (screenshots can be large payloads)
WEBSOCKET_MAX_SIZE = 1024 * 1024 * 10  # 10MB limit
# Configure application with WebSocket settings
app = FastAPI(
    title="Computer API",
    description="API for the Computer project",
    version="0.1.0",
    websocket_max_size=WEBSOCKET_MAX_SIZE,
)
# CORS configuration
# NOTE(review): wildcard origins together with allow_credentials=True is
# rejected by browsers for credentialed requests per the CORS spec — confirm
# this combination is intentional for this deployment.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Wire-protocol version reported by the "version" command.
protocol_version = 1
# Resolve the installed package version; fall back progressively so the server
# still starts when running from a source checkout.
try:
    from importlib.metadata import version

    package_version = version("cua-computer-server")
except Exception:
    # Fallback for cases where package is not installed or importlib.metadata is not available
    try:
        import pkg_resources

        package_version = pkg_resources.get_distribution("cua-computer-server").version
    except Exception:
        package_version = "unknown"
# Platform-appropriate handler instances built by the factory; the tuple order
# here must match HandlerFactory.create_handlers().
(
    accessibility_handler,
    automation_handler,
    diorama_handler,
    file_handler,
    desktop_handler,
    window_handler,
) = HandlerFactory.create_handlers()
# Dispatch table mapping command names (as received over the API) to the
# handler callables that implement them.
handlers = {
    "version": lambda: {"protocol": protocol_version, "package": package_version},
    # App-Use commands
    "diorama_cmd": diorama_handler.diorama_cmd,
    # Accessibility commands
    "get_accessibility_tree": accessibility_handler.get_accessibility_tree,
    "find_element": accessibility_handler.find_element,
    # Shell commands
    "run_command": automation_handler.run_command,
    # File system commands
    "file_exists": file_handler.file_exists,
    "directory_exists": file_handler.directory_exists,
    "list_dir": file_handler.list_dir,
    "read_text": file_handler.read_text,
    "write_text": file_handler.write_text,
    "read_bytes": file_handler.read_bytes,
    "write_bytes": file_handler.write_bytes,
    "get_file_size": file_handler.get_file_size,
    "delete_file": file_handler.delete_file,
    "create_dir": file_handler.create_dir,
    "delete_dir": file_handler.delete_dir,
    # Desktop commands
    "get_desktop_environment": desktop_handler.get_desktop_environment,
    "set_wallpaper": desktop_handler.set_wallpaper,
    # Window management
    "open": window_handler.open,
    "launch": window_handler.launch,
    "get_current_window_id": window_handler.get_current_window_id,
    "get_application_windows": window_handler.get_application_windows,
    "get_window_name": window_handler.get_window_name,
    "get_window_size": window_handler.get_window_size,
    "get_window_position": window_handler.get_window_position,
    "set_window_size": window_handler.set_window_size,
    "set_window_position": window_handler.set_window_position,
    "maximize_window": window_handler.maximize_window,
    "minimize_window": window_handler.minimize_window,
    "activate_window": window_handler.activate_window,
    "close_window": window_handler.close_window,
    # Mouse commands
    "mouse_down": automation_handler.mouse_down,
    "mouse_up": automation_handler.mouse_up,
    "left_click": automation_handler.left_click,
    "right_click": automation_handler.right_click,
    "double_click": automation_handler.double_click,
    "move_cursor": automation_handler.move_cursor,
    "drag_to": automation_handler.drag_to,
    "drag": automation_handler.drag,
    # Keyboard commands
    "key_down": automation_handler.key_down,
    "key_up": automation_handler.key_up,
    "type_text": automation_handler.type_text,
    "press_key": automation_handler.press_key,
    "hotkey": automation_handler.hotkey,
    # Scrolling actions
    "scroll": automation_handler.scroll,
    "scroll_down": automation_handler.scroll_down,
    "scroll_up": automation_handler.scroll_up,
    # Screen actions
    "screenshot": automation_handler.screenshot,
    "get_cursor_position": automation_handler.get_cursor_position,
    "get_screen_size": automation_handler.get_screen_size,
    # Clipboard actions
    "copy_to_clipboard": automation_handler.copy_to_clipboard,
    "set_clipboard": automation_handler.set_clipboard,
}
class AuthenticationManager:
    """Validates (container_name, api_key) pairs for cloud deployments.

    Results are cached in-memory (keyed by a SHA-256 of the credentials) so
    repeated connections do not hit the TryCUA API on every request. When
    CONTAINER_NAME is not set in the environment, authentication is disabled
    and every caller is allowed (local development mode).
    """

    def __init__(self):
        # session-hash -> {"valid": bool, "expires_at": epoch seconds}
        self.sessions: Dict[str, Dict[str, Any]] = {}
        # Expected container identity; None/empty means local development.
        self.container_name = os.environ.get("CONTAINER_NAME")

    def _hash_credentials(self, container_name: str, api_key: str) -> str:
        """Create a hash of container name and API key for session identification"""
        return hashlib.sha256(f"{container_name}:{api_key}".encode()).hexdigest()

    def _is_session_valid(self, session_data: Dict[str, Any]) -> bool:
        """Check if a session is still valid based on expiration time"""
        return bool(session_data.get("valid", False)) and time.time() < session_data.get(
            "expires_at", 0
        )

    async def auth(self, container_name: str, api_key: str) -> bool:
        """Authenticate container name and API key, using cached sessions when possible.

        Returns True on success; failures (including network errors) are cached
        for AUTH_SESSION_TTL_SECONDS to avoid hammering the API.
        """
        # If no CONTAINER_NAME is set, always allow access (local development)
        if not self.container_name:
            logger.info(
                "No CONTAINER_NAME set in environment. Allowing access (local development mode)"
            )
            return True

        # Layer 1: VM identity verification — the client must name this container.
        if container_name != self.container_name:
            logger.warning(
                f"VM name mismatch. Expected: {self.container_name}, Got: {container_name}"
            )
            return False

        # Layer 2: cached session lookup.
        session_hash = self._hash_credentials(container_name, api_key)
        cached = self.sessions.get(session_hash)
        if cached is not None:
            if self._is_session_valid(cached):
                logger.info(f"Using cached authentication for container: {container_name}")
                return cached["valid"]
            # Expired entry — drop it and re-authenticate below.
            del self.sessions[session_hash]

        # Layer 3: live check against the TryCUA API.
        logger.info(f"Authenticating with TryCUA API for container: {container_name}")
        try:
            async with aiohttp.ClientSession() as session:
                request_headers = {"Authorization": f"Bearer {api_key}"}
                async with session.get(
                    f"https://www.cua.ai/api/vm/auth?container_name={container_name}",
                    headers=request_headers,
                ) as resp:
                    # Valid only if the API answered 200 with a non-empty body.
                    is_valid = resp.status == 200 and bool((await resp.text()).strip())
                    self.sessions[session_hash] = {
                        "valid": is_valid,
                        "expires_at": time.time() + AUTH_SESSION_TTL_SECONDS,
                    }
                    if is_valid:
                        logger.info(f"Authentication successful for container: {container_name}")
                    else:
                        logger.warning(
                            f"Authentication failed for container: {container_name}. Status: {resp.status}"
                        )
                    return is_valid
        except aiohttp.ClientError as e:
            logger.error(f"Failed to validate API key with TryCUA API: {str(e)}")
        except Exception as e:
            logger.error(f"Unexpected error during authentication: {str(e)}")
        # Network or unexpected failure: cache the negative result so we do not
        # retry on every message for the TTL window.
        self.sessions[session_hash] = {
            "valid": False,
            "expires_at": time.time() + AUTH_SESSION_TTL_SECONDS,
        }
        return False
class ConnectionManager:
    """Tracks the WebSocket connections currently served by this process."""

    def __init__(self):
        # All sockets that have completed the accept handshake.
        self.active_connections: List["WebSocket"] = []

    async def connect(self, websocket: "WebSocket"):
        """Accept the handshake and start tracking the socket."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: "WebSocket"):
        """Stop tracking the socket.

        Idempotent: the endpoint's error paths can call this for a socket that
        was already removed, so a missing entry is not an error (the previous
        unconditional list.remove raised ValueError in that case).
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
# Module-level singletons shared by every endpoint below.
manager = ConnectionManager()
auth_manager = AuthenticationManager()
@app.get("/status")
async def status():
    """Report server health, the host OS family, and optional features."""
    system_name = platform.system().lower()
    # Normalize platform.system() output to one of: macos / windows / linux.
    if "darwin" in system_name or system_name in ("macos", "mac"):
        os_type = "macos"
    elif "windows" in system_name:
        os_type = "windows"
    else:
        os_type = "linux"
    # Advertise optional capabilities so clients can adapt their requests.
    features = ["agent"] if HAS_AGENT else []
    return {"status": "ok", "os_type": os_type, "features": features}
@app.websocket("/ws", name="websocket_endpoint")
async def websocket_endpoint(websocket: WebSocket):
    """Primary command channel.

    When CONTAINER_NAME is set (cloud deployment) the first client message must
    be an "authenticate" command whose params are validated by
    ``auth_manager``. After that, each JSON message of the form
    ``{"command": ..., "params": {...}}`` is dispatched to the matching entry
    in the module-level ``handlers`` table and answered with
    ``{"success": bool, ...}``.
    """
    global handlers
    # WebSocket message size is configured at the app or endpoint level, not on the instance
    await manager.connect(websocket)
    # Check if CONTAINER_NAME is set (indicating cloud provider)
    server_container_name = os.environ.get("CONTAINER_NAME")
    # If cloud provider, perform authentication handshake
    if server_container_name:
        try:
            logger.info(
                f"Cloud provider detected. CONTAINER_NAME: {server_container_name}. Waiting for authentication..."
            )
            # Wait for authentication message
            auth_data = await websocket.receive_json()
            # Validate auth message format
            if auth_data.get("command") != "authenticate":
                await websocket.send_json(
                    {"success": False, "error": "First message must be authentication"}
                )
                await websocket.close()
                manager.disconnect(websocket)
                return
            # Extract credentials
            client_api_key = auth_data.get("params", {}).get("api_key")
            client_container_name = auth_data.get("params", {}).get("container_name")
            # Validate credentials using AuthenticationManager
            if not client_api_key:
                await websocket.send_json({"success": False, "error": "API key required"})
                await websocket.close()
                manager.disconnect(websocket)
                return
            if not client_container_name:
                await websocket.send_json({"success": False, "error": "Container name required"})
                await websocket.close()
                manager.disconnect(websocket)
                return
            # Use AuthenticationManager for validation
            is_authenticated = await auth_manager.auth(client_container_name, client_api_key)
            if not is_authenticated:
                await websocket.send_json({"success": False, "error": "Authentication failed"})
                await websocket.close()
                manager.disconnect(websocket)
                return
            logger.info(f"Authentication successful for VM: {client_container_name}")
            await websocket.send_json({"success": True, "message": "Authentication successful"})
        except Exception as e:
            # NOTE(review): if the client disconnected mid-handshake, the
            # send_json below will itself raise and the framework drops the
            # connection — confirm whether that path needs its own guard.
            logger.error(f"Error during authentication handshake: {str(e)}")
            await websocket.send_json({"success": False, "error": "Authentication failed"})
            await websocket.close()
            manager.disconnect(websocket)
            return
    try:
        # Main dispatch loop: one JSON request -> one JSON response.
        while True:
            try:
                data = await websocket.receive_json()
                command = data.get("command")
                params = data.get("params", {})
                if command not in handlers:
                    await websocket.send_json(
                        {"success": False, "error": f"Unknown command: {command}"}
                    )
                    continue
                try:
                    # Filter params to only include those accepted by the handler function
                    handler_func = handlers[command]
                    sig = inspect.signature(handler_func)
                    filtered_params = {k: v for k, v in params.items() if k in sig.parameters}
                    # Handle both sync and async functions
                    if asyncio.iscoroutinefunction(handler_func):
                        result = await handler_func(**filtered_params)
                    else:
                        # Run sync functions in thread pool to avoid blocking event loop
                        result = await asyncio.to_thread(handler_func, **filtered_params)
                    # assumes every handler returns a dict (it is splatted here) — TODO confirm
                    await websocket.send_json({"success": True, **result})
                except Exception as cmd_error:
                    # Per-command failures are reported to the client; the loop keeps running.
                    logger.error(f"Error executing command {command}: {str(cmd_error)}")
                    logger.error(traceback.format_exc())
                    await websocket.send_json({"success": False, "error": str(cmd_error)})
            except WebSocketDisconnect:
                # Re-raise so the outer handler performs connection cleanup.
                raise
            except json.JSONDecodeError as json_err:
                logger.error(f"JSON decode error: {str(json_err)}")
                await websocket.send_json(
                    {"success": False, "error": f"Invalid JSON: {str(json_err)}"}
                )
            except Exception as loop_error:
                logger.error(f"Error in message loop: {str(loop_error)}")
                logger.error(traceback.format_exc())
                await websocket.send_json({"success": False, "error": str(loop_error)})
    except WebSocketDisconnect:
        logger.info("Client disconnected")
        manager.disconnect(websocket)
    except Exception as e:
        logger.error(f"Fatal error in websocket connection: {str(e)}")
        logger.error(traceback.format_exc())
        try:
            await websocket.close()
        except:
            pass
        manager.disconnect(websocket)
@app.post("/cmd")
async def cmd_endpoint(
    request: Request,
    container_name: Optional[str] = Header(None, alias="X-Container-Name"),
    api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """
    Backup endpoint for when WebSocket connections fail.
    Accepts commands via HTTP POST with streaming response.
    Headers:
    - X-Container-Name: Container name for cloud authentication
    - X-API-Key: API key for cloud authentication
    Body:
    {
        "command": "command_name",
        "params": {...}
    }
    """
    global handlers
    # Parse request body
    try:
        body = await request.json()
        command = body.get("command")
        params = body.get("params", {})
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON body: {str(e)}")
    if not command:
        raise HTTPException(status_code=400, detail="Command is required")
    # Check if CONTAINER_NAME is set (indicating cloud provider)
    server_container_name = os.environ.get("CONTAINER_NAME")
    # If cloud provider, perform authentication
    if server_container_name:
        logger.info(
            f"Cloud provider detected. CONTAINER_NAME: {server_container_name}. Performing authentication..."
        )
        # Validate required headers
        if not container_name:
            raise HTTPException(status_code=401, detail="Container name required")
        if not api_key:
            raise HTTPException(status_code=401, detail="API key required")
        # Validate with AuthenticationManager
        is_authenticated = await auth_manager.auth(container_name, api_key)
        if not is_authenticated:
            raise HTTPException(status_code=401, detail="Authentication failed")
    if command not in handlers:
        raise HTTPException(status_code=400, detail=f"Unknown command: {command}")

    async def generate_response():
        """Generate streaming response for the command execution"""
        # Emits a single SSE-style "data: {...}" frame. Errors are streamed
        # rather than raised, because response headers are already sent once
        # streaming begins.
        try:
            # Filter params to only include those accepted by the handler function
            handler_func = handlers[command]
            sig = inspect.signature(handler_func)
            filtered_params = {k: v for k, v in params.items() if k in sig.parameters}
            # Handle both sync and async functions
            if asyncio.iscoroutinefunction(handler_func):
                result = await handler_func(**filtered_params)
            else:
                # Run sync functions in thread pool to avoid blocking event loop
                result = await asyncio.to_thread(handler_func, **filtered_params)
            # Stream the successful result
            response_data = {"success": True, **result}
            yield f"data: {json.dumps(response_data)}\n\n"
        except Exception as cmd_error:
            logger.error(f"Error executing command {command}: {str(cmd_error)}")
            logger.error(traceback.format_exc())
            # Stream the error result
            error_data = {"success": False, "error": str(cmd_error)}
            yield f"data: {json.dumps(error_data)}\n\n"

    # NOTE(review): the frames use SSE framing but the media type is
    # "text/plain", not "text/event-stream" — clients appear to parse the raw
    # stream; confirm before changing.
    return StreamingResponse(
        generate_response(),
        media_type="text/plain",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
@app.post("/responses")
async def agent_response_endpoint(
    request: Request,
    api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """
    Minimal proxy to run ComputerAgent until no computer calls remain pending.
    Security:
    - If CONTAINER_NAME is set on the server, require X-API-Key
      and validate using AuthenticationManager unless CUA_ENABLE_PUBLIC_PROXY is true.
    Body JSON:
    {
        "model": "...",               # required
        "input": "... or messages[]", # required
        "agent_kwargs": { ... },      # optional, passed directly to ComputerAgent
        "env": { ... }                # optional env overrides for agent
    }
    """
    if not HAS_AGENT:
        raise HTTPException(status_code=501, detail="ComputerAgent not available")
    # Authenticate via AuthenticationManager if running in cloud (CONTAINER_NAME set)
    container_name = os.environ.get("CONTAINER_NAME")
    if container_name:
        # CUA_ENABLE_PUBLIC_PROXY accepts common truthy spellings.
        is_public = os.environ.get("CUA_ENABLE_PUBLIC_PROXY", "").lower().strip() in [
            "1",
            "true",
            "yes",
            "y",
            "on",
        ]
        if not is_public:
            if not api_key:
                raise HTTPException(status_code=401, detail="Missing AGENT PROXY auth headers")
            ok = await auth_manager.auth(container_name, api_key)
            if not ok:
                raise HTTPException(status_code=401, detail="Unauthorized")
    # Parse request body
    try:
        body = await request.json()
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON body: {str(e)}")
    model = body.get("model")
    input_data = body.get("input")
    if not model or input_data is None:
        raise HTTPException(status_code=400, detail="'model' and 'input' are required")
    agent_kwargs: Dict[str, Any] = body.get("agent_kwargs") or {}
    env_overrides: Dict[str, str] = body.get("env") or {}

    # Simple env override context: temporarily sets os.environ entries for the
    # duration of the agent run, restoring (or removing) them afterwards.
    # NOTE(review): this mutates process-global state, so concurrent requests
    # with different "env" bodies can interleave — confirm acceptable.
    class _EnvOverride:
        def __init__(self, overrides: Dict[str, str]):
            self.overrides = overrides
            # Original values, with None marking "was unset".
            self._original: Dict[str, Optional[str]] = {}

        def __enter__(self):
            for k, v in (self.overrides or {}).items():
                self._original[k] = os.environ.get(k)
                os.environ[k] = str(v)

        def __exit__(self, exc_type, exc, tb):
            for k, old in self._original.items():
                if old is None:
                    os.environ.pop(k, None)
                else:
                    os.environ[k] = old

    # Convert input to messages: a plain string becomes a single user message,
    # a list is assumed to already be a message array.
    def _to_messages(data: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
        if isinstance(data, str):
            return [{"role": "user", "content": data}]
        if isinstance(data, list):
            return data

    messages = _to_messages(input_data)

    # Define a direct computer tool that implements the AsyncComputerHandler protocol
    # and delegates to our existing automation/file/accessibility handlers.
    from agent.computers import AsyncComputerHandler  # runtime-checkable Protocol

    class DirectComputer(AsyncComputerHandler):
        """Adapter exposing this server's local handlers as an agent computer tool."""

        def __init__(self):
            # use module-scope handler singletons created by HandlerFactory
            self._auto = automation_handler
            self._file = file_handler
            self._access = accessibility_handler

        async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]:
            sys = platform.system().lower()
            if "darwin" in sys or sys in ("macos", "mac"):
                return "mac"
            if "windows" in sys:
                return "windows"
            return "linux"

        async def get_dimensions(self) -> tuple[int, int]:
            size = await self._auto.get_screen_size()
            return size["width"], size["height"]

        async def screenshot(self) -> str:
            # Returns the base64-encoded image payload.
            img_b64 = await self._auto.screenshot()
            return img_b64["image_data"]

        async def click(self, x: int, y: int, button: str = "left") -> None:
            # Unrecognized buttons fall back to a left click.
            if button == "left":
                await self._auto.left_click(x, y)
            elif button == "right":
                await self._auto.right_click(x, y)
            else:
                await self._auto.left_click(x, y)

        async def double_click(self, x: int, y: int) -> None:
            await self._auto.double_click(x, y)

        async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
            # Move to the target first so the scroll lands on the right element.
            await self._auto.move_cursor(x, y)
            await self._auto.scroll(scroll_x, scroll_y)

        async def type(self, text: str) -> None:
            await self._auto.type_text(text)

        async def wait(self, ms: int = 1000) -> None:
            await asyncio.sleep(ms / 1000.0)

        async def move(self, x: int, y: int) -> None:
            await self._auto.move_cursor(x, y)

        async def keypress(self, keys: Union[List[str], str]) -> None:
            # A string like "ctrl+c" or "ctrl-c" is split into a combo;
            # a single character is pressed as-is.
            if isinstance(keys, str):
                parts = keys.replace("-", "+").split("+") if len(keys) > 1 else [keys]
            else:
                parts = keys
            if len(parts) == 1:
                await self._auto.press_key(parts[0])
            else:
                await self._auto.hotkey(parts)

        async def drag(self, path: List[Dict[str, int]]) -> None:
            # Press at the first point, trace intermediate points, release at the last.
            if not path:
                return
            start = path[0]
            await self._auto.mouse_down(start["x"], start["y"])
            for pt in path[1:]:
                await self._auto.move_cursor(pt["x"], pt["y"])
            end = path[-1]
            await self._auto.mouse_up(end["x"], end["y"])

        async def get_current_url(self) -> str:
            # Not available in this server context
            return ""

        async def left_mouse_down(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
            await self._auto.mouse_down(x, y, button="left")

        async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
            await self._auto.mouse_up(x, y, button="left")

    # # Inline image URLs to base64
    # import base64, mimetypes, requests
    # # Use a browser-like User-Agent to avoid 403s from some CDNs (e.g., Wikimedia)
    # HEADERS = {
    #     "User-Agent": (
    #         "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    #         "AppleWebKit/537.36 (KHTML, like Gecko) "
    #         "Chrome/124.0.0.0 Safari/537.36"
    #     )
    # }
    # def _to_data_url(content_bytes: bytes, url: str, resp: requests.Response) -> str:
    #     ctype = resp.headers.get("Content-Type") or mimetypes.guess_type(url)[0] or "application/octet-stream"
    #     b64 = base64.b64encode(content_bytes).decode("utf-8")
    #     return f"data:{ctype};base64,{b64}"
    # def inline_image_urls(messages):
    #     # messages: List[{"role": "...","content":[...]}]
    #     out = []
    #     for m in messages:
    #         if not isinstance(m.get("content"), list):
    #             out.append(m)
    #             continue
    #         new_content = []
    #         for part in (m.get("content") or []):
    #             if part.get("type") == "input_image" and (url := part.get("image_url")):
    #                 resp = requests.get(url, headers=HEADERS, timeout=30)
    #                 resp.raise_for_status()
    #                 new_content.append({
    #                     "type": "input_image",
    #                     "image_url": _to_data_url(resp.content, url, resp)
    #                 })
    #             else:
    #                 new_content.append(part)
    #         out.append({**m, "content": new_content})
    #     return out
    # messages = inline_image_urls(messages)

    error = None
    with _EnvOverride(env_overrides):
        # Prepare tools: if caller did not pass tools, inject our DirectComputer
        tools = agent_kwargs.get("tools")
        if not tools:
            tools = [DirectComputer()]
            agent_kwargs = {**agent_kwargs, "tools": tools}
        # Instantiate agent with our tools
        agent = ComputerAgent(model=model, **agent_kwargs)  # type: ignore[arg-type]
        total_output: List[Any] = []
        total_usage: Dict[str, Any] = {}
        # call_ids of computer_call messages awaiting a computer_call_output.
        pending_computer_call_ids = set()
        try:
            async for result in agent.run(messages):
                # assumes each result dict carries an "output" list — TODO confirm
                # (the loop below defensively uses .get for the same key).
                total_output += result["output"]
                # Try to collect usage if present
                if (
                    isinstance(result, dict)
                    and "usage" in result
                    and isinstance(result["usage"], dict)
                ):
                    # Merge usage counters: numeric values accumulate, others overwrite.
                    for k, v in result["usage"].items():
                        if isinstance(v, (int, float)):
                            total_usage[k] = total_usage.get(k, 0) + v
                        else:
                            total_usage[k] = v
                for msg in result.get("output", []):
                    if msg.get("type") == "computer_call":
                        pending_computer_call_ids.add(msg["call_id"])
                    elif msg.get("type") == "computer_call_output":
                        pending_computer_call_ids.discard(msg["call_id"])
                # exit if no pending computer calls
                if not pending_computer_call_ids:
                    break
        except Exception as e:
            logger.error(f"Error running agent: {str(e)}")
            logger.error(traceback.format_exc())
            error = str(e)
    # Build response payload
    payload = {
        "model": model,
        "error": error,
        "output": total_output,
        "usage": total_usage,
        "status": "completed" if not error else "failed",
    }
    # CORS: allow any origin
    headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
    }
    return JSONResponse(content=payload, headers=headers)
@app.post("/playwright_exec")
async def playwright_exec_endpoint(
    request: Request,
    container_name: Optional[str] = Header(None, alias="X-Container-Name"),
    api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """
    Execute Playwright browser commands.
    Headers:
    - X-Container-Name: Container name for cloud authentication
    - X-API-Key: API key for cloud authentication
    Body:
    {
        "command": "visit_url|click|type|scroll|web_search",
        "params": {...}
    }
    """
    # Parse request body
    try:
        body = await request.json()
        command = body.get("command")
        params = body.get("params", {})
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON body: {str(e)}")
    if not command:
        raise HTTPException(status_code=400, detail="Command is required")
    # Check if CONTAINER_NAME is set (indicating cloud provider)
    server_container_name = os.environ.get("CONTAINER_NAME")
    # If cloud provider, perform authentication
    if server_container_name:
        logger.info(
            f"Cloud provider detected. CONTAINER_NAME: {server_container_name}. Performing authentication..."
        )
        # Validate required headers
        if not container_name:
            raise HTTPException(status_code=401, detail="Container name required")
        if not api_key:
            raise HTTPException(status_code=401, detail="API key required")
        # Validate with AuthenticationManager
        is_authenticated = await auth_manager.auth(container_name, api_key)
        if not is_authenticated:
            raise HTTPException(status_code=401, detail="Authentication failed")
    # Get browser manager and execute command
    try:
        browser_manager = get_browser_manager()
        result = await browser_manager.execute_command(command, params)
        if result.get("success"):
            return JSONResponse(content=result)
        else:
            raise HTTPException(status_code=400, detail=result.get("error", "Command failed"))
    except HTTPException:
        # Bug fix: HTTPException subclasses Exception, so the generic handler
        # below used to catch the deliberate 400 raised above and re-wrap it as
        # a 500. Propagate intentional HTTP errors unchanged.
        raise
    except Exception as e:
        logger.error(f"Error executing playwright command: {str(e)}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))
# Run the API server directly: listen on all interfaces, port 8000.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/human_tool/ui.py:
--------------------------------------------------------------------------------
```python
import base64
import io
import json
import time
from datetime import datetime
from typing import Any, Dict, List, Optional
import gradio as gr
import requests
from PIL import Image
from .server import completion_queue
class HumanCompletionUI:
    """Gradio-backed review console that lets a human answer pending agent calls.

    Talks to the companion HTTP server (``/pending``, ``/complete/{call_id}``)
    and formats agent conversations for display in a gr.Chatbot.
    """

    def __init__(self, server_url: str = "http://localhost:8002"):
        # Base URL of the human-tool server.
        self.server_url = server_url
        # Call currently shown/being answered; None when nothing is selected.
        self.current_call_id: Optional[str] = None
        self.refresh_interval = 2.0  # seconds
        self.last_image = None  # Store the last image for display
        # Track current interactive action controls
        self.current_action_type: str = "click"
        self.current_button: str = "left"
        self.current_scroll_x: int = 0
        self.current_scroll_y: int = -120

    def format_messages_for_chatbot(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format messages for display in gr.Chatbot with type='messages'.

        Roles are intentionally inverted: what the AI said is shown as
        "assistant" content for the human reviewer to respond to.
        """
        formatted = []
        for msg in messages:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            tool_calls = msg.get("tool_calls", [])
            # Handle different content formats
            if isinstance(content, list):
                # Multi-modal content - can include text and images
                formatted_content = []
                for item in content:
                    if item.get("type") == "text":
                        text = item.get("text", "")
                        if text.strip():  # Only add non-empty text
                            formatted_content.append(text)
                    elif item.get("type") == "image_url":
                        image_url = item.get("image_url", {}).get("url", "")
                        if image_url:
                            # Check if it's a base64 image or URL
                            if image_url.startswith("data:image"):
                                # For base64 images, decode and create gr.Image
                                try:
                                    header, data = image_url.split(",", 1)
                                    image_data = base64.b64decode(data)
                                    image = Image.open(io.BytesIO(image_data))
                                    formatted_content.append(gr.Image(value=image))
                                except Exception as e:
                                    print(f"Error loading image: {e}")
                                    formatted_content.append(f"[Image loading error: {e}]")
                            else:
                                # For URL images, create gr.Image with URL
                                formatted_content.append(gr.Image(value=image_url))
                # Determine final content format
                if len(formatted_content) == 1:
                    content = formatted_content[0]
                elif len(formatted_content) > 1:
                    content = formatted_content
                else:
                    content = "[Empty content]"
            # Ensure role is valid for Gradio Chatbot
            if role not in ["user", "assistant"]:
                role = "assistant" if role == "system" else "user"
            # Invert roles for better display in human UI context
            # (what the AI says becomes "user", what human should respond becomes "assistant")
            if role == "user":
                role = "assistant"
            else:
                role = "user"
            # Add the main message if it has content
            if content and str(content).strip():
                formatted.append({"role": role, "content": content})
            # Handle tool calls - create separate messages for each tool call
            if tool_calls:
                for tool_call in tool_calls:
                    function_name = tool_call.get("function", {}).get("name", "unknown")
                    arguments_str = tool_call.get("function", {}).get("arguments", "{}")
                    try:
                        # Parse arguments to format them nicely
                        arguments = json.loads(arguments_str)
                        formatted_args = json.dumps(arguments, indent=2)
                    except json.JSONDecodeError:
                        # If parsing fails, use the raw string
                        formatted_args = arguments_str
                    # Create a formatted message for the tool call
                    tool_call_content = f"```json\n{formatted_args}\n```"
                    formatted.append(
                        {
                            "role": role,
                            "content": tool_call_content,
                            "metadata": {"title": f"🛠️ Used {function_name}"},
                        }
                    )
        return formatted

    def get_pending_calls(self) -> List[Dict[str, Any]]:
        """Get pending calls from the server.

        Returns an empty list on any network/HTTP error (best effort).
        """
        try:
            response = requests.get(f"{self.server_url}/pending", timeout=5)
            if response.status_code == 200:
                return response.json().get("pending_calls", [])
        except Exception as e:
            print(f"Error fetching pending calls: {e}")
        return []

    def complete_call_with_response(self, call_id: str, response: str) -> bool:
        """Complete a call with a text response."""
        try:
            response_data = {"response": response}
            response_obj = requests.post(
                f"{self.server_url}/complete/{call_id}", json=response_data, timeout=10
            )
            response_obj.raise_for_status()
            return True
        except requests.RequestException as e:
            print(f"Error completing call: {e}")
            return False

    def complete_call_with_tool_calls(self, call_id: str, tool_calls: List[Dict[str, Any]]) -> bool:
        """Complete a call with tool calls."""
        try:
            response_data = {"tool_calls": tool_calls}
            response_obj = requests.post(
                f"{self.server_url}/complete/{call_id}", json=response_data, timeout=10
            )
            response_obj.raise_for_status()
            return True
        except requests.RequestException as e:
            print(f"Error completing call: {e}")
            return False

    def complete_call(
        self,
        call_id: str,
        response: Optional[str] = None,
        tool_calls: Optional[List[Dict[str, Any]]] = None,
    ) -> bool:
        """Complete a call with either a response or tool calls."""
        try:
            response_data = {}
            if response:
                response_data["response"] = response
            if tool_calls:
                response_data["tool_calls"] = tool_calls
            response_obj = requests.post(
                f"{self.server_url}/complete/{call_id}", json=response_data, timeout=10
            )
            response_obj.raise_for_status()
            return True
        except requests.RequestException as e:
            print(f"Error completing call: {e}")
            return False

    def get_last_image_from_messages(self, messages: List[Dict[str, Any]]) -> Optional[Any]:
        """Extract the last image from the messages for display above conversation.

        Returns a PIL Image for base64 payloads, the raw URL string for URL
        images, or None when no image is present.
        """
        last_image = None
        for msg in reversed(messages):  # Start from the last message
            content = msg.get("content", "")
            if isinstance(content, list):
                for item in reversed(content):  # Get the last image in the message
                    if item.get("type") == "image_url":
                        image_url = item.get("image_url", {}).get("url", "")
                        if image_url:
                            if image_url.startswith("data:image"):
                                # For base64 images, create a gr.Image component
                                try:
                                    header, data = image_url.split(",", 1)
                                    image_data = base64.b64decode(data)
                                    image = Image.open(io.BytesIO(image_data))
                                    return image
                                except Exception as e:
                                    print(f"Error loading image: {e}")
                                    continue
                            else:
                                # For URL images, return the URL
                                return image_url
        return last_image

    def refresh_pending_calls(self):
        """Refresh the list of pending calls.

        Returns a 6-tuple of gr.update objects for: dropdown, image, chatbot,
        submit button, click-actions group, actions group.
        """
        pending_calls = self.get_pending_calls()
        if not pending_calls:
            return (
                gr.update(choices=["latest"], value="latest"),  # dropdown
                gr.update(value=None),  # image (no image)
                gr.update(value=[]),  # chatbot (empty messages)
                gr.update(interactive=False),  # submit button
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )
        # Sort pending calls by created_at to get oldest first
        sorted_calls = sorted(pending_calls, key=lambda x: x.get("created_at", ""))
        # Create choices for dropdown
        choices = [("latest", "latest")]  # Add "latest" option first
        for call in sorted_calls:
            call_id = call["id"]
            model = call.get("model", "unknown")
            created_at = call.get("created_at", "")
            # Format timestamp; fall back to the raw value on parse errors.
            try:
                dt = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
                time_str = dt.strftime("%H:%M:%S")
            except Exception:
                time_str = created_at
            choice_label = f"{call_id[:8]}... ({model}) - {time_str}"
            choices.append((choice_label, call_id))
        # Default to "latest" which shows the oldest pending conversation
        selected_call_id = "latest"
        if selected_call_id == "latest" and sorted_calls:
            # Use the oldest call (first in sorted list)
            selected_call = sorted_calls[0]
            conversation = self.format_messages_for_chatbot(selected_call.get("messages", []))
            self.current_call_id = selected_call["id"]
            # Get the last image from messages
            self.last_image = self.get_last_image_from_messages(selected_call.get("messages", []))
        else:
            conversation = []
            self.current_call_id = None
            self.last_image = None
        return (
            gr.update(choices=choices, value="latest"),
            gr.update(value=self.last_image),
            gr.update(value=conversation),
            gr.update(interactive=bool(choices)),
            gr.update(visible=True),  # click_actions_group visible when there is a call
            gr.update(visible=True),  # actions_group visible when there is a call
        )

    def on_call_selected(self, selected_choice):
        """Handle when a call is selected from the dropdown.

        Returns a 5-tuple of gr.update objects for: image, chatbot, submit
        button, click-actions group, actions group.
        """
        if not selected_choice:
            return (
                gr.update(value=None),  # no image
                gr.update(value=[]),  # empty chatbot
                gr.update(interactive=False),
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )
        pending_calls = self.get_pending_calls()
        if not pending_calls:
            return (
                gr.update(value=None),  # no image
                gr.update(value=[]),  # empty chatbot
                gr.update(interactive=False),
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )
        # Handle "latest" option
        if selected_choice == "latest":
            # Sort calls by created_at to get oldest first
            sorted_calls = sorted(pending_calls, key=lambda x: x.get("created_at", ""))
            selected_call = sorted_calls[0]  # Get the oldest call
            call_id = selected_call["id"]
        else:
            # Extract call_id from the choice for specific calls
            call_id = None
            for call in pending_calls:
                call_id_short = call["id"][:8]
                if call_id_short in selected_choice:
                    call_id = call["id"]
                    break
            if not call_id:
                # Bug fix: this branch used to return only 3 updates while every
                # other path returns 5, breaking Gradio's output-count contract.
                return (
                    gr.update(value=None),  # no image
                    gr.update(value=[]),  # empty chatbot
                    gr.update(interactive=False),
                    gr.update(visible=False),  # click_actions_group hidden
                    gr.update(visible=False),  # actions_group hidden
                )
        # Find the selected call
        selected_call = next((c for c in pending_calls if c["id"] == call_id), None)
        if not selected_call:
            return (
                gr.update(value=None),  # no image
                gr.update(value=[]),  # empty chatbot
                gr.update(interactive=False),
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )
        conversation = self.format_messages_for_chatbot(selected_call.get("messages", []))
        self.current_call_id = call_id
        # Get the last image from messages
        self.last_image = self.get_last_image_from_messages(selected_call.get("messages", []))
        return (
            gr.update(value=self.last_image),
            gr.update(value=conversation),
            gr.update(interactive=True),
            gr.update(visible=True),  # click_actions_group visible
            gr.update(visible=True),  # actions_group visible
        )

    def submit_response(self, response_text: str):
        """Submit a text response to the current call.

        Returns (response-textbox update, status update).
        """
        if not self.current_call_id:
            return (
                gr.update(value=response_text),  # keep response text
                gr.update(value="❌ No call selected"),  # status
            )
        if not response_text.strip():
            return (
                gr.update(value=response_text),  # keep response text
                gr.update(value="❌ Response cannot be empty"),  # status
            )
        success = self.complete_call_with_response(self.current_call_id, response_text)
        if success:
            status_msg = "✅ Response submitted successfully!"
            return (
                gr.update(value=""),  # clear response text
                gr.update(value=status_msg),  # status
            )
        else:
            return (
                gr.update(value=response_text),  # keep response text
                gr.update(value="❌ Failed to submit response"),  # status
            )

    def submit_action(self, action_type: str, **kwargs) -> str:
        """Submit a computer action as a tool call."""
        if not self.current_call_id:
            return "❌ No call selected"
        import uuid

        # Create tool call structure
        action_data = {"type": action_type, **kwargs}
        tool_call = {
            "id": f"call_{uuid.uuid4().hex[:24]}",
            "type": "function",
            "function": {"name": "computer", "arguments": json.dumps(action_data)},
        }
        success = self.complete_call_with_tool_calls(self.current_call_id, [tool_call])
        if success:
            return f"✅ {action_type.capitalize()} action submitted as tool call"
        else:
            return f"❌ Failed to submit {action_type} action"

    def submit_click_action(
        self, x: int, y: int, action_type: str = "click", button: str = "left"
    ) -> str:
        """Submit a coordinate-based action (only clicks carry a button)."""
        if action_type == "click":
            return self.submit_action(action_type, x=x, y=y, button=button)
        else:
            return self.submit_action(action_type, x=x, y=y)

    def submit_type_action(self, text: str) -> str:
        """Submit a type action."""
        return self.submit_action("type", text=text)

    def submit_hotkey_action(self, keys: str) -> str:
        """Submit a hotkey action."""
        return self.submit_action("keypress", keys=keys)

    def submit_wait_action(self) -> str:
        """Submit a wait action with no kwargs."""
        return self.submit_action("wait")

    def submit_description_click(
        self, description: str, action_type: str = "click", button: str = "left"
    ) -> str:
        """Submit a description-based action (only clicks carry a button)."""
        if action_type == "click":
            return self.submit_action(action_type, element_description=description, button=button)
        else:
            return self.submit_action(action_type, element_description=description)

    def wait_for_pending_calls(self, max_seconds: float = 10.0, check_interval: float = 0.2):
        """Wait for pending calls to appear or until max_seconds elapsed.

        Polls the server at `check_interval` intervals and returns the refreshed
        UI state as soon as a pending call appears, or after `max_seconds`.

        Args:
            max_seconds: Maximum number of seconds to wait
            check_interval: How often to check for pending calls (in seconds)
        """
        start_time = time.time()
        while time.time() - start_time < max_seconds:
            # Check if there are any pending calls
            pending_calls = self.get_pending_calls()
            if pending_calls:
                # Found pending calls, return immediately
                return self.refresh_pending_calls()
            # Wait before checking again
            time.sleep(check_interval)
        # Max wait time reached, return current state
        return self.refresh_pending_calls()
def create_ui():
    """Create the Gradio interface.

    Layout: the left column shows the interactive screenshot (click-to-act)
    and the conversation transcript; the right column hosts pending-call
    selection, a free-text response box, and tabs of structured computer
    actions. Every event handler delegates to one shared HumanCompletionUI
    instance, and most handlers chain a refresh via ``.then()`` so the UI
    reflects the next pending call after a submission.
    """
    # Single shared handler instance; all callbacks below close over it.
    ui_handler = HumanCompletionUI()
    with gr.Blocks(title="Human-in-the-Loop Agent Tool", fill_width=True) as demo:
        gr.Markdown("# 🤖 Human-in-the-Loop Agent Tool")
        gr.Markdown("Review AI conversation requests and provide human responses.")
        with gr.Row():
            # Left column: screenshot + conversation view.
            with gr.Column(scale=2):
                with gr.Group():
                    screenshot_image = gr.Image(
                        label="Interactive Screenshot", interactive=False, height=600
                    )
                    # Action type selection for image clicks (wrapped for visibility control)
                    with gr.Group(visible=False) as click_actions_group:
                        with gr.Row():
                            action_type_radio = gr.Dropdown(
                                label="Interactive Action",
                                choices=[
                                    "click",
                                    "double_click",
                                    "move",
                                    "left_mouse_up",
                                    "left_mouse_down",
                                    "scroll",
                                ],
                                value="click",
                                scale=2,
                            )
                            action_button_radio = gr.Dropdown(
                                label="Button",
                                choices=["left", "right", "wheel", "back", "forward"],
                                value="left",
                                visible=True,
                                scale=1,
                            )
                            # Scroll deltas; only shown when action type is "scroll".
                            scroll_x_input = gr.Number(
                                label="scroll_x", value=0, visible=False, scale=1
                            )
                            scroll_y_input = gr.Number(
                                label="scroll_y", value=-120, visible=False, scale=1
                            )
                conversation_chatbot = gr.Chatbot(
                    label="Conversation", type="messages", height=500, show_copy_button=True
                )
            # Right column: call selection, status, free-text reply, action tabs.
            with gr.Column(scale=1):
                with gr.Group():
                    call_dropdown = gr.Dropdown(
                        label="Select a pending conversation request",
                        choices=["latest"],
                        interactive=True,
                        value="latest",
                    )
                    refresh_btn = gr.Button("🔄 Refresh", variant="secondary")
                    status_display = gr.Textbox(
                        label="Status", interactive=False, value="Ready to receive requests..."
                    )
                with gr.Group():
                    response_text = gr.Textbox(
                        label="Message", lines=3, placeholder="Enter your message here..."
                    )
                    submit_btn = gr.Button(
                        "📤 Submit Message", variant="primary", interactive=False
                    )
                # Action Accordions (wrapped for visibility control)
                with gr.Group(visible=False) as actions_group:
                    with gr.Tabs():
                        with gr.Tab("🖱️ Click Actions"):
                            with gr.Group():
                                description_text = gr.Textbox(
                                    label="Element Description",
                                    placeholder="e.g., 'Privacy and security option in left sidebar'",
                                )
                                with gr.Row():
                                    description_action_type = gr.Dropdown(
                                        label="Action",
                                        choices=[
                                            "click",
                                            "double_click",
                                            "move",
                                            "left_mouse_up",
                                            "left_mouse_down",
                                        ],
                                        value="click",
                                    )
                                    description_button = gr.Dropdown(
                                        label="Button",
                                        choices=["left", "right", "wheel", "back", "forward"],
                                        value="left",
                                    )
                                description_submit_btn = gr.Button("Submit Click Action")
                        with gr.Tab("📝 Type Action"):
                            with gr.Group():
                                type_text = gr.Textbox(
                                    label="Text to Type", placeholder="Enter text to type..."
                                )
                                type_submit_btn = gr.Button("Submit Type")
                        with gr.Tab("⌨️ Keypress Action"):
                            with gr.Group():
                                keypress_text = gr.Textbox(
                                    label="Keys", placeholder="e.g., ctrl+c, alt+tab"
                                )
                                keypress_submit_btn = gr.Button("Submit Keypress")
                        with gr.Tab("🧰 Misc Actions"):
                            with gr.Group():
                                misc_action_dropdown = gr.Dropdown(
                                    label="Action", choices=["wait"], value="wait"
                                )
                                misc_submit_btn = gr.Button("Submit Action")
        # Event handlers
        # Manual refresh: re-query pending calls and update all dependent widgets.
        refresh_btn.click(
            fn=ui_handler.refresh_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )
        call_dropdown.change(
            fn=ui_handler.on_call_selected,
            inputs=[call_dropdown],
            outputs=[
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )
        def handle_image_click(evt: gr.SelectData):
            """Translate a click on the screenshot into a computer action.

            The coordinates come from the Gradio select event; the action type,
            button, and scroll deltas come from handler state kept in sync by
            the dropdown/number change callbacks below.
            """
            if evt.index is not None:
                x, y = evt.index
                action_type = ui_handler.current_action_type or "click"
                button = ui_handler.current_button or "left"
                if action_type == "scroll":
                    sx_i = int(ui_handler.current_scroll_x or 0)
                    sy_i = int(ui_handler.current_scroll_y or 0)
                    # Submit a scroll action with x,y position and scroll deltas
                    result = ui_handler.submit_action(
                        "scroll", x=x, y=y, scroll_x=sx_i, scroll_y=sy_i
                    )
                else:
                    result = ui_handler.submit_click_action(x, y, action_type, button)
                ui_handler.wait_for_pending_calls()
                return result
            return "No coordinates selected"
        # After a screenshot click, wait for the next pending call and refresh.
        screenshot_image.select(fn=handle_image_click, outputs=[status_display]).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )
        # Response submission
        submit_btn.click(
            fn=ui_handler.submit_response,
            inputs=[response_text],
            outputs=[response_text, status_display],
        ).then(
            fn=ui_handler.refresh_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )
        # Toggle visibility of controls based on action type
        def toggle_action_controls(action_type):
            # Button visible only for click
            button_vis = gr.update(visible=(action_type == "click"))
            # Scroll inputs visible only for scroll
            scroll_x_vis = gr.update(visible=(action_type == "scroll"))
            scroll_y_vis = gr.update(visible=(action_type == "scroll"))
            # Update state
            ui_handler.current_action_type = action_type or "click"
            return button_vis, scroll_x_vis, scroll_y_vis
        action_type_radio.change(
            fn=toggle_action_controls,
            inputs=[action_type_radio],
            outputs=[action_button_radio, scroll_x_input, scroll_y_input],
        )
        # Keep other control values in ui_handler state
        def on_button_change(val):
            ui_handler.current_button = val or "left"
        action_button_radio.change(fn=on_button_change, inputs=[action_button_radio])
        def on_scroll_x_change(val):
            # Coerce to int; non-numeric input falls back to 0.
            try:
                ui_handler.current_scroll_x = int(val) if val is not None else 0
            except Exception:
                ui_handler.current_scroll_x = 0
        scroll_x_input.change(fn=on_scroll_x_change, inputs=[scroll_x_input])
        def on_scroll_y_change(val):
            # Coerce to int; non-numeric input falls back to 0.
            try:
                ui_handler.current_scroll_y = int(val) if val is not None else 0
            except Exception:
                ui_handler.current_scroll_y = 0
        scroll_y_input.change(fn=on_scroll_y_change, inputs=[scroll_y_input])
        type_submit_btn.click(
            fn=ui_handler.submit_type_action, inputs=[type_text], outputs=[status_display]
        ).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )
        keypress_submit_btn.click(
            fn=ui_handler.submit_hotkey_action, inputs=[keypress_text], outputs=[status_display]
        ).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )
        def handle_description_submit(description, action_type, button):
            """Submit a description-targeted action; reject empty descriptions."""
            if description:
                result = ui_handler.submit_description_click(description, action_type, button)
                ui_handler.wait_for_pending_calls()
                return result
            return "Please enter a description"
        description_submit_btn.click(
            fn=handle_description_submit,
            inputs=[description_text, description_action_type, description_button],
            outputs=[status_display],
        ).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )
        # Misc action handler
        def handle_misc_submit(selected_action):
            """Dispatch the misc-tab action; currently only "wait" is supported."""
            if selected_action == "wait":
                result = ui_handler.submit_wait_action()
                ui_handler.wait_for_pending_calls()
                return result
            return f"Unsupported misc action: {selected_action}"
        misc_submit_btn.click(
            fn=handle_misc_submit, inputs=[misc_action_dropdown], outputs=[status_display]
        ).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )
        # Load initial data
        demo.load(
            fn=ui_handler.refresh_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )
    return demo
if __name__ == "__main__":
    # Script entry point: build the Gradio app, enable request queuing,
    # and serve on all interfaces at port 7860.
    demo = create_ui()
    demo.queue()
    demo.launch(server_name="0.0.0.0", server_port=7860)
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/uitars.py:
--------------------------------------------------------------------------------
```python
"""
UITARS agent loop implementation using liteLLM for ByteDance-Seed/UI-TARS-1.5-7B
Paper: https://arxiv.org/abs/2501.12326
Code: https://github.com/bytedance/UI-TARS
"""
import ast
import asyncio
import base64
import json
import math
import re
from ctypes import cast
from io import BytesIO
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union
import litellm
from litellm.responses.litellm_completion_transformation.transformation import (
LiteLLMCompletionResponsesConfig,
)
from litellm.responses.utils import Usage
from litellm.types.utils import ModelResponse
from openai.types.responses.response_computer_tool_call_param import (
ActionType,
ResponseComputerToolCallParam,
)
from openai.types.responses.response_input_param import ComputerCallOutput
from openai.types.responses.response_output_message_param import (
ResponseOutputMessageParam,
)
from openai.types.responses.response_reasoning_item_param import (
ResponseReasoningItemParam,
Summary,
)
from PIL import Image
from ..decorators import register_agent
from ..responses import (
make_click_item,
make_double_click_item,
make_drag_item,
make_input_image_item,
make_keypress_item,
make_output_text_item,
make_reasoning_item,
make_scroll_item,
make_type_item,
make_wait_item,
)
from ..types import AgentCapability, AgentResponse, Messages, Tools
# Constants from reference code
# Image dimensions are snapped to multiples of IMAGE_FACTOR; the pixel
# bounds below are expressed in 28x28 patch counts (100 and 16384 patches).
IMAGE_FACTOR = 28
MIN_PIXELS = 100 * 28 * 28
MAX_PIXELS = 16384 * 28 * 28
# Reject images whose long side exceeds 200x the short side (see smart_resize).
MAX_RATIO = 200
# Sentinel action words the model may emit (cf. UITARS_ACTION_SPACE below).
FINISH_WORD = "finished"
WAIT_WORD = "wait"
ENV_FAIL_WORD = "error_env"
CALL_USER = "call_user"
# Action space prompt for UITARS
UITARS_ACTION_SPACE = """
click(start_box='<|box_start|>(x1,y1)<|box_end|>')
left_double(start_box='<|box_start|>(x1,y1)<|box_end|>')
right_single(start_box='<|box_start|>(x1,y1)<|box_end|>')
drag(start_box='<|box_start|>(x1,y1)<|box_end|>', end_box='<|box_start|>(x3,y3)<|box_end|>')
hotkey(key='')
type(content='') #If you want to submit your input, use "\\n" at the end of `content`.
scroll(start_box='<|box_start|>(x1,y1)<|box_end|>', direction='down or up or right or left')
wait() #Sleep for 5s and take a screenshot to check for any changes.
finished(content='xxx') # Use escape characters \\', \\", and \\n in content part to ensure we can parse the content in normal python string format.
"""
# Full agent prompt: expects {action_space}, {language}, {instruction}.
UITARS_PROMPT_TEMPLATE = """You are a GUI agent. You are given a task and your action history, with screenshots. You need to perform the next action to complete the task.
## Output Format
```
Thought: ...
Action: ...
```
## Action Space
{action_space}
## Note
- Use {language} in `Thought` part.
- Write a small plan and finally summarize your next action (with its target element) in one sentence in `Thought` part.
## User Instruction
{instruction}
"""
# Grounding-only prompt used by predict_click: click action only, no Thought.
GROUNDING_UITARS_PROMPT_TEMPLATE = """You are a GUI agent. You are given a task and your action history, with screenshots. You need to perform the next action to complete the task.
## Output Format
Action: ...
## Action Space
click(point='<|box_start|>(x1,y1)<|box_end|>')
## User Instruction
{instruction}"""
def round_by_factor(number: float, factor: int) -> int:
    """Nearest multiple of ``factor`` to ``number`` (ties use banker's rounding)."""
    return factor * round(number / factor)
def ceil_by_factor(number: float, factor: int) -> int:
    """Smallest multiple of ``factor`` that is >= ``number``."""
    return factor * math.ceil(number / factor)
def floor_by_factor(number: float, factor: int) -> int:
    """Largest multiple of ``factor`` that is <= ``number``."""
    return factor * math.floor(number / factor)
def smart_resize(
    height: int,
    width: int,
    factor: int = IMAGE_FACTOR,
    min_pixels: int = MIN_PIXELS,
    max_pixels: int = MAX_PIXELS,
) -> tuple[int, int]:
    """
    Rescales the image so that the following conditions are met:
    1. Both dimensions (height and width) are divisible by 'factor'.
    2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
    3. The aspect ratio of the image is maintained as closely as possible.
    """
    # Reject degenerate aspect ratios up front.
    if max(height, width) / min(height, width) > MAX_RATIO:
        raise ValueError(
            f"absolute aspect ratio must be smaller than {MAX_RATIO}, got {max(height, width) / min(height, width)}"
        )
    # Snap both sides to the nearest factor multiple, never below one patch.
    h_bar = max(factor, round_by_factor(height, factor))
    w_bar = max(factor, round_by_factor(width, factor))
    area = h_bar * w_bar
    if area > max_pixels:
        # Over budget: shrink uniformly, rounding down to stay under the cap.
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = floor_by_factor(height / beta, factor)
        w_bar = floor_by_factor(width / beta, factor)
    elif area < min_pixels:
        # Under budget: grow uniformly, rounding up to reach the minimum.
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = ceil_by_factor(height * beta, factor)
        w_bar = ceil_by_factor(width * beta, factor)
    return h_bar, w_bar
def escape_single_quotes(text):
    """Backslash-escape single quotes that are not already escaped."""
    # Negative lookbehind: only match a quote not preceded by a backslash.
    return re.sub(r"(?<!\\)'", r"\\'", text)
def parse_action(action_str):
    """Parse a UITARS action string (e.g. ``click(start_box='(x,y)')``) into a dict.

    Args:
        action_str: Python-call-like action text produced by the model.

    Returns:
        ``{"function": name, "args": {keyword: literal_value}}`` on success,
        or ``None`` when the string is not a well-formed single function call.
    """
    try:
        node = ast.parse(action_str, mode="eval")
        if not isinstance(node, ast.Expression):
            raise ValueError("Not an expression")
        call = node.body
        if not isinstance(call, ast.Call):
            raise ValueError("Not a function call")
        # Get function name (bare name or the attribute of a dotted call)
        if isinstance(call.func, ast.Name):
            func_name = call.func.id
        elif isinstance(call.func, ast.Attribute):
            func_name = call.func.attr
        else:
            func_name = None
        # Get keyword arguments; only literal values are extracted.
        kwargs = {}
        for kw in call.keywords:
            key = kw.arg
            if isinstance(kw.value, ast.Constant):
                value = kw.value.value
            elif hasattr(ast, "Str") and isinstance(kw.value, ast.Str):
                # Compatibility with older Python ASTs. Fix: ast.Str was removed
                # in Python 3.12, so guard the attribute access to avoid an
                # AttributeError on modern interpreters.
                value = kw.value.s
            else:
                value = None
            kwargs[key] = value
        return {"function": func_name, "args": kwargs}
    except Exception as e:
        print(f"Failed to parse action '{action_str}': {e}")
        return None
def parse_uitars_response(text: str, image_width: int, image_height: int) -> List[Dict[str, Any]]:
    """Parse UITARS model response into structured actions.

    The response is expected to look like ``Thought: ...\\nAction: ...``.
    Coordinates in box parameters are normalized to the 0-1 range by dividing
    by 1000 (the model emits a 0-1000 grid); ``image_width``/``image_height``
    are accepted for signature compatibility but not used here — scaling to
    pixels happens in convert_to_computer_actions.

    Raises:
        ValueError: if no ``Action:`` section is present or the action text
            cannot be parsed.
    """
    text = text.strip()
    # Extract thought
    thought = None
    if text.startswith("Thought:"):
        thought_match = re.search(r"Thought: (.+?)(?=\s*Action:|$)", text, re.DOTALL)
        if thought_match:
            thought = thought_match.group(1).strip()
    # Extract action
    if "Action:" not in text:
        raise ValueError("No Action found in response")
    action_str = text.split("Action:")[-1].strip()
    # Handle special case for type actions: unwrap the content, escape any
    # unescaped single quotes, then re-wrap so the string parses as a call.
    if "type(content" in action_str:
        def escape_quotes(match):
            # re.sub replacement: substitute the whole type(...) call with
            # just its captured content.
            return match.group(1)
        pattern = r"type\(content='(.*?)'\)"
        content = re.sub(pattern, escape_quotes, action_str)
        action_str = escape_single_quotes(content)
        action_str = "type(content='" + action_str + "')"
    # Parse the action (newlines are escaped so ast.parse sees one expression)
    parsed_action = parse_action(action_str.replace("\n", "\\n").lstrip())
    if parsed_action is None:
        raise ValueError(f"Action can't parse: {action_str}")
    action_type = parsed_action["function"]
    params = parsed_action["args"]
    # Process parameters
    action_inputs = {}
    for param_name, param in params.items():
        if param == "":
            continue
        param = str(param).lstrip()
        action_inputs[param_name.strip()] = param
        # Handle coordinate parameters
        if "start_box" in param_name or "end_box" in param_name:
            # Parse coordinates like '<|box_start|>(x,y)<|box_end|>' or '(x,y)'
            # First, remove special tokens
            clean_param = param.replace("<|box_start|>", "").replace("<|box_end|>", "")
            # Then remove parentheses and split
            numbers = clean_param.replace("(", "").replace(")", "").split(",")
            try:
                float_numbers = [
                    float(num.strip()) / 1000 for num in numbers
                ]  # Normalize to 0-1 range
                if len(float_numbers) == 2:
                    # Single point, duplicate for box format
                    float_numbers = [
                        float_numbers[0],
                        float_numbers[1],
                        float_numbers[0],
                        float_numbers[1],
                    ]
                action_inputs[param_name.strip()] = str(float_numbers)
            except ValueError as e:
                # If parsing fails, keep the original parameter value
                print(f"Warning: Could not parse coordinates '{param}': {e}")
                action_inputs[param_name.strip()] = param
    return [
        {
            "thought": thought,
            "action_type": action_type,
            "action_inputs": action_inputs,
            "text": text,
        }
    ]
def convert_to_computer_actions(
    parsed_responses: List[Dict[str, Any]], image_width: int, image_height: int
) -> List[ResponseComputerToolCallParam | ResponseOutputMessageParam]:
    """Convert parsed UITARS responses to computer actions.

    Args:
        parsed_responses: Items produced by ``parse_uitars_response``.
        image_width: Original screenshot width in pixels.
        image_height: Original screenshot height in pixels.

    Returns:
        List of response items (clicks, scrolls, text output, ...) scaled to
        pixel coordinates.
    """

    def _box_center(box_str: str) -> tuple:
        # Box strings are "[x1, y1, x2, y2]" with coordinates normalized to
        # 0-1; return the pixel-space center. Fix: use ast.literal_eval
        # instead of eval() — the string originates from model output and
        # must never be executed as arbitrary code.
        coords = ast.literal_eval(box_str)
        x = int((coords[0] + coords[2]) / 2 * image_width)
        y = int((coords[1] + coords[3]) / 2 * image_height)
        return x, y

    computer_actions = []
    for response in parsed_responses:
        action_type = response.get("action_type")
        action_inputs = response.get("action_inputs", {})
        if action_type == "finished":
            finished_text = action_inputs.get("content", "Task completed successfully.")
            computer_actions.append(make_output_text_item(finished_text))
            break
        elif action_type == "wait":
            computer_actions.append(make_wait_item())
        elif action_type == "call_user":
            computer_actions.append(
                make_output_text_item("I need assistance from the user to proceed with this task.")
            )
        elif action_type in ["click", "left_single"]:
            start_box = action_inputs.get("start_box")
            if start_box:
                x, y = _box_center(start_box)
                computer_actions.append(make_click_item(x, y, "left"))
        elif action_type == "double_click":
            start_box = action_inputs.get("start_box")
            if start_box:
                x, y = _box_center(start_box)
                computer_actions.append(make_double_click_item(x, y))
        elif action_type == "right_click":
            start_box = action_inputs.get("start_box")
            if start_box:
                x, y = _box_center(start_box)
                computer_actions.append(make_click_item(x, y, "right"))
        elif action_type == "type":
            content = action_inputs.get("content", "")
            computer_actions.append(make_type_item(content))
        elif action_type == "hotkey":
            # Space-separated key names become a key combination.
            key = action_inputs.get("key", "")
            keys = key.split()
            computer_actions.append(make_keypress_item(keys))
        elif action_type == "press":
            key = action_inputs.get("key", "")
            computer_actions.append(make_keypress_item([key]))
        elif action_type == "scroll":
            start_box = action_inputs.get("start_box")
            direction = action_inputs.get("direction", "down")
            if start_box:
                x, y = _box_center(start_box)
            else:
                # No position given: scroll at the screen center.
                x, y = image_width // 2, image_height // 2
            # Same mapping as the original: "up" → +5, everything else → -5.
            scroll_y = 5 if "up" in direction.lower() else -5
            computer_actions.append(make_scroll_item(x, y, 0, scroll_y))
        elif action_type == "drag":
            start_box = action_inputs.get("start_box")
            end_box = action_inputs.get("end_box")
            if start_box and end_box:
                start_x, start_y = _box_center(start_box)
                end_x, end_y = _box_center(end_box)
                path = [{"x": start_x, "y": start_y}, {"x": end_x, "y": end_y}]
                computer_actions.append(make_drag_item(path))
    return computer_actions
def pil_to_base64(image: Image.Image) -> str:
    """Serialize a PIL image to a base64-encoded PNG string."""
    with BytesIO() as buffer:
        image.save(buffer, format="PNG")
        png_bytes = buffer.getvalue()
    return base64.b64encode(png_bytes).decode("utf-8")
def process_image_for_uitars(
    image_data: str, max_pixels: int = MAX_PIXELS, min_pixels: int = MIN_PIXELS
) -> tuple[Image.Image, int, int]:
    """Decode a base64 screenshot and scale it into the model's pixel budget.

    Args:
        image_data: Base64 PNG data, optionally prefixed with a data URL.
        max_pixels: Upper bound on total pixel count after resizing.
        min_pixels: Lower bound on total pixel count after resizing.

    Returns:
        (processed RGB image, original width, original height)
    """
    # Strip a data-URL prefix such as "data:image/png;base64," if present.
    if image_data.startswith("data:image"):
        image_data = image_data.split(",")[1]
    image = Image.open(BytesIO(base64.b64decode(image_data)))
    original_width, original_height = image.size
    # Shrink when over the pixel budget, preserving the aspect ratio.
    if image.width * image.height > max_pixels:
        scale = math.sqrt(max_pixels / (image.width * image.height))
        image = image.resize((int(image.width * scale), int(image.height * scale)))
    # Grow when under the pixel budget, preserving the aspect ratio.
    if image.width * image.height < min_pixels:
        scale = math.sqrt(min_pixels / (image.width * image.height))
        image = image.resize((math.ceil(image.width * scale), math.ceil(image.height * scale)))
    # The model expects RGB input.
    if image.mode != "RGB":
        image = image.convert("RGB")
    return image, original_width, original_height
def sanitize_message(msg: Any) -> Any:
    """Return a copy of the message with ``image_url`` omitted from content parts.

    Lists are sanitized recursively; non-dict, non-list values pass through
    unchanged.
    """
    if isinstance(msg, list):
        return [sanitize_message(element) for element in msg]
    if not isinstance(msg, dict):
        return msg
    sanitized = {}
    for key, value in msg.items():
        if key == "content" and isinstance(value, list):
            # Drop only the image_url field from dict-shaped content parts.
            sanitized[key] = [
                {k: v for k, v in part.items() if k != "image_url"}
                if isinstance(part, dict)
                else part
                for part in value
            ]
        else:
            sanitized[key] = value
    return sanitized
def convert_uitars_messages_to_litellm(messages: "Messages") -> List[Dict[str, Any]]:
    """
    Convert UITARS internal message format back to LiteLLM format.

    Processes reasoning, computer_call, and computer_call_output messages and
    converts them into alternating assistant ("Thought:"/"Action:" text) and
    user (screenshot image) messages.

    Args:
        messages: List of UITARS internal messages

    Returns:
        List of LiteLLM formatted messages
    """
    litellm_messages = []
    # Thought/Action lines are accumulated here and flushed into a single
    # assistant message when a screenshot (computer_call_output) follows.
    current_assistant_content = []
    for message in messages:
        if isinstance(message, dict):
            message_type = message.get("type")
            if message_type == "reasoning":
                # Extract reasoning text from summary
                summary = message.get("summary", [])
                if summary and isinstance(summary, list):
                    for summary_item in summary:
                        if (
                            isinstance(summary_item, dict)
                            and summary_item.get("type") == "summary_text"
                        ):
                            reasoning_text = summary_item.get("text", "")
                            if reasoning_text:
                                current_assistant_content.append(f"Thought: {reasoning_text}")
            elif message_type == "computer_call":
                # Convert computer action to UITARS action format
                action = message.get("action", {})
                action_type = action.get("type")
                if action_type == "click":
                    x, y = action.get("x", 0), action.get("y", 0)
                    button = action.get("button", "left")
                    if button == "left":
                        action_text = f"Action: click(start_box='({x},{y})')"
                    elif button == "right":
                        action_text = f"Action: right_single(start_box='({x},{y})')"
                    else:
                        action_text = f"Action: click(start_box='({x},{y})')"
                elif action_type == "double_click":
                    x, y = action.get("x", 0), action.get("y", 0)
                    action_text = f"Action: left_double(start_box='({x},{y})')"
                elif action_type == "drag":
                    start_x, start_y = action.get("start_x", 0), action.get("start_y", 0)
                    end_x, end_y = action.get("end_x", 0), action.get("end_y", 0)
                    action_text = f"Action: drag(start_box='({start_x},{start_y})', end_box='({end_x},{end_y})')"
                elif action_type == "key":
                    key = action.get("key", "")
                    action_text = f"Action: hotkey(key='{key}')"
                elif action_type == "type":
                    text = action.get("text", "")
                    # Escape single quotes in the text
                    escaped_text = escape_single_quotes(text)
                    action_text = f"Action: type(content='{escaped_text}')"
                elif action_type == "scroll":
                    x, y = action.get("x", 0), action.get("y", 0)
                    direction = action.get("direction", "down")
                    action_text = f"Action: scroll(start_box='({x},{y})', direction='{direction}')"
                elif action_type == "wait":
                    action_text = "Action: wait()"
                else:
                    # Fallback for unknown action types
                    action_text = f"Action: {action_type}({action})"
                current_assistant_content.append(action_text)
                # A computer_call closes the assistant turn: flush accumulated
                # Thought/Action lines as one text part.
                if current_assistant_content:
                    litellm_messages.append(
                        {
                            "role": "assistant",
                            "content": [
                                {"type": "text", "text": "\n".join(current_assistant_content)}
                            ],
                        }
                    )
                    current_assistant_content = []
            elif message_type == "computer_call_output":
                # Add screenshot from computer call output
                output = message.get("output", {})
                if isinstance(output, dict) and output.get("type") == "input_image":
                    image_url = output.get("image_url", "")
                    if image_url:
                        litellm_messages.append(
                            {
                                "role": "user",
                                "content": [{"type": "image_url", "image_url": {"url": image_url}}],
                            }
                        )
            elif message.get("role") == "user":
                # Plain user messages are intentionally skipped: the caller
                # re-sends the instruction via the prompt template.
                pass
    # Flush any trailing assistant content using the SAME text-part format as
    # the mid-stream flush. Fix: previously this appended the raw list of
    # strings as "content", producing a message shape inconsistent with every
    # other assistant message built above.
    if current_assistant_content:
        litellm_messages.append(
            {
                "role": "assistant",
                "content": [{"type": "text", "text": "\n".join(current_assistant_content)}],
            }
        )
    return litellm_messages
@register_agent(models=r"(?i).*ui-?tars.*", priority=-1)
class UITARSConfig:
    """
    UITARS agent configuration using liteLLM for ByteDance-Seed/UI-TARS-1.5-7B model.

    Supports UITARS vision-language models for computer control. Capabilities:
    full agent steps (``predict_step``) and grounding-only click prediction
    (``predict_click``).
    """

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Predict the next step based on input messages.

        Args:
            messages: Input messages following Responses format
            model: Model name to use
            tools: Optional list of tool schemas
            max_retries: Maximum number of retries
            stream: Whether to stream responses
            computer_handler: Computer handler instance
            _on_api_start: Optional callback for API start
            _on_api_end: Optional callback for API end
            _on_usage: Optional callback for usage tracking
            _on_screenshot: Optional callback for screenshot events
            **kwargs: Additional arguments forwarded to litellm

        Returns:
            Dictionary with "output" (output items) and "usage" array
        """
        tools = tools or []
        # Create response items
        response_items = []
        # Find computer tool for screen dimensions
        computer_tool = None
        for tool_schema in tools:
            if tool_schema["type"] == "computer":
                computer_tool = tool_schema["computer"]
                break
        # Get screen dimensions (best-effort; fall back to a common default).
        # NOTE(review): these values are not used further below — coordinates
        # are derived from the screenshot dimensions instead. Confirm whether
        # they can be removed or should be wired into the prompt.
        screen_width, screen_height = 1024, 768
        if computer_tool:
            try:
                screen_width, screen_height = await computer_tool.get_dimensions()
            except Exception:
                # Fix: was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit. Dimension lookup stays
                # best-effort.
                pass
        # Process messages to extract instruction and image
        instruction = ""
        image_data = None
        # Convert messages to list if string
        if isinstance(messages, str):
            messages = [{"role": "user", "content": messages}]
        # Extract instruction and latest screenshot (scan newest-first).
        for message in reversed(messages):
            if isinstance(message, dict):
                content = message.get("content", "")
                # Handle different content formats
                if isinstance(content, str):
                    if not instruction and message.get("role") == "user":
                        instruction = content
                elif isinstance(content, list):
                    for item in content:
                        if isinstance(item, dict):
                            if item.get("type") == "text" and not instruction:
                                instruction = item.get("text", "")
                            elif item.get("type") == "image_url" and not image_data:
                                image_url = item.get("image_url", {})
                                if isinstance(image_url, dict):
                                    image_data = image_url.get("url", "")
                                else:
                                    image_data = image_url
                # Also check for computer_call_output with screenshots
                if message.get("type") == "computer_call_output" and not image_data:
                    output = message.get("output", {})
                    if isinstance(output, dict) and output.get("type") == "input_image":
                        image_data = output.get("image_url", "")
                if instruction and image_data:
                    break
        if not instruction:
            instruction = (
                "Help me complete this task by analyzing the screen and taking appropriate actions."
            )
        # Create prompt
        user_prompt = UITARS_PROMPT_TEMPLATE.format(
            instruction=instruction, action_space=UITARS_ACTION_SPACE, language="English"
        )
        # Convert conversation history to LiteLLM format
        history_messages = convert_uitars_messages_to_litellm(messages)
        # Prepare messages for liteLLM
        litellm_messages = [{"role": "system", "content": "You are a helpful assistant."}]
        # Add current user instruction with screenshot
        current_user_message = {
            "role": "user",
            "content": [
                {"type": "text", "text": user_prompt},
            ],
        }
        litellm_messages.append(current_user_message)
        # Process image for UITARS
        if not image_data:
            # Take screenshot if none found in messages
            if computer_handler:
                image_data = await computer_handler.screenshot()
                # Fix: the hook defaults to None — only await it when supplied
                # (previously this crashed with a TypeError when no
                # _on_screenshot callback was passed).
                if _on_screenshot:
                    await _on_screenshot(image_data, "screenshot_before")
                # Add screenshot to output items so it can be retained in history
                response_items.append(make_input_image_item(image_data))
            else:
                raise ValueError("No screenshot found in messages and no computer_handler provided")
        processed_image, original_width, original_height = process_image_for_uitars(image_data)
        encoded_image = pil_to_base64(processed_image)
        # Add conversation history
        if history_messages:
            # NOTE(review): when history exists the freshly processed screenshot
            # is not re-attached here — the latest computer_call_output in the
            # history is expected to carry it. Confirm predict_step is never
            # called with history that lacks a screenshot.
            litellm_messages.extend(history_messages)
        else:
            litellm_messages.append(
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{encoded_image}"},
                        }
                    ],
                }
            )
        # Prepare API call kwargs
        api_kwargs = {
            "model": model,
            "messages": litellm_messages,
            "max_tokens": kwargs.get("max_tokens", 500),
            "temperature": kwargs.get("temperature", 0.0),
            # NOTE(review): do_sample is a HF/vLLM-style flag forwarded through
            # litellm; confirm the target provider accepts it.
            "do_sample": kwargs.get("temperature", 0.0) > 0.0,
            "num_retries": max_retries,
            **{k: v for k, v in kwargs.items() if k not in ["max_tokens", "temperature"]},
        }
        # Call API start hook
        if _on_api_start:
            await _on_api_start(api_kwargs)
        # Call liteLLM with UITARS model
        response = await litellm.acompletion(**api_kwargs)
        # Call API end hook
        if _on_api_end:
            await _on_api_end(api_kwargs, response)
        # Extract response content
        response_content = response.choices[0].message.content.strip()  # type: ignore
        # Parse UITARS response
        parsed_responses = parse_uitars_response(response_content, original_width, original_height)
        # Convert to computer actions
        computer_actions = convert_to_computer_actions(
            parsed_responses, original_width, original_height
        )
        # Add computer actions to response items (reasoning item first, if any)
        thought = parsed_responses[0].get("thought", "")
        if thought:
            response_items.append(make_reasoning_item(thought))
        response_items.extend(computer_actions)
        # Extract usage information
        response_usage = {
            **LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage(
                response.usage
            ).model_dump(),
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(response_usage)
        # Create agent response
        agent_response = {"output": response_items, "usage": response_usage}
        return agent_response

    async def predict_click(
        self, model: str, image_b64: str, instruction: str, **kwargs
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates based on image and instruction.

        UITARS supports click prediction through its action parsing.

        Args:
            model: Model name to use
            image_b64: Base64 encoded image
            instruction: Instruction for where to click
            **kwargs: Additional arguments forwarded to litellm

        Returns:
            Tuple with (x, y) coordinates in the ORIGINAL image's pixel space,
            or None if no click could be parsed.
        """
        try:
            # Create prompt using grounding template
            user_prompt = GROUNDING_UITARS_PROMPT_TEMPLATE.format(instruction=instruction)
            # Process image for UITARS
            processed_image, original_width, original_height = process_image_for_uitars(image_b64)
            encoded_image = pil_to_base64(processed_image)
            # Prepare messages for liteLLM
            litellm_messages = [
                {"role": "system", "content": "You are a helpful assistant."},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": user_prompt},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{encoded_image}"},
                        },
                    ],
                },
            ]
            # Prepare API call kwargs (deterministic decoding for grounding)
            api_kwargs = {
                "model": model,
                "messages": litellm_messages,
                "max_tokens": 2056,
                "temperature": 0.0,
                "do_sample": False,
            }
            api_kwargs.update({k: v for k, v in (kwargs or {}).items()})
            # Call liteLLM with UITARS model
            response = await litellm.acompletion(**api_kwargs)
            # Extract response content
            response_content = response.choices[0].message.content.strip()  # type: ignore
            # Debug: log the raw model output
            print(response_content)
            # Parse the response to extract click coordinates
            # Look for click action with coordinates (with special tokens)
            click_pattern = r"click\(point='<\|box_start\|>\((\d+),(\d+)\)<\|box_end\|>'\)"
            match = re.search(click_pattern, response_content)
            # Fallback: Look for simpler format without special tokens
            if not match:
                # Pattern for: click(start_box='(x,y)') or click(point='(x,y)')
                fallback_pattern = r"click\((?:start_box|point)='\((\d+),(\d+)\)'\)"
                match = re.search(fallback_pattern, response_content)
            if match:
                x, y = int(match.group(1)), int(match.group(2))
                # Scale coordinates from the processed image back to the
                # original image dimensions.
                scale_x = original_width / processed_image.width
                scale_y = original_height / processed_image.height
                scaled_x = int(x * scale_x)
                scaled_y = int(y * scale_y)
                return (scaled_x, scaled_y)
            return None
        except Exception as e:
            # Log error and return None
            print(f"Error in predict_click: {e}")
            return None

    def get_capabilities(self) -> List[AgentCapability]:
        """
        Get list of capabilities supported by this agent config.

        Returns:
            List of capability strings
        """
        return ["step", "click"]
```