This is page 16 of 20. Use http://codebase.md/trycua/cua?page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── scripts
│ │ ├── get_pyproject_version.py
│ │ └── tests
│ │ ├── __init__.py
│ │ ├── README.md
│ │ └── test_get_pyproject_version.py
│ └── workflows
│ ├── bump-version.yml
│ ├── ci-lume.yml
│ ├── docker-publish-cua-linux.yml
│ ├── docker-publish-cua-windows.yml
│ ├── docker-publish-kasm.yml
│ ├── docker-publish-xfce.yml
│ ├── docker-reusable-publish.yml
│ ├── link-check.yml
│ ├── lint.yml
│ ├── npm-publish-cli.yml
│ ├── npm-publish-computer.yml
│ ├── npm-publish-core.yml
│ ├── publish-lume.yml
│ ├── pypi-publish-agent.yml
│ ├── pypi-publish-computer-server.yml
│ ├── pypi-publish-computer.yml
│ ├── pypi-publish-core.yml
│ ├── pypi-publish-mcp-server.yml
│ ├── pypi-publish-som.yml
│ ├── pypi-reusable-publish.yml
│ ├── python-tests.yml
│ ├── test-cua-models.yml
│ └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│ ├── docs.code-workspace
│ ├── extensions.json
│ ├── launch.json
│ ├── libs-ts.code-workspace
│ ├── lume.code-workspace
│ ├── lumier.code-workspace
│ ├── py.code-workspace
│ └── settings.json
├── blog
│ ├── app-use.md
│ ├── assets
│ │ ├── composite-agents.png
│ │ ├── docker-ubuntu-support.png
│ │ ├── hack-booth.png
│ │ ├── hack-closing-ceremony.jpg
│ │ ├── hack-cua-ollama-hud.jpeg
│ │ ├── hack-leaderboard.png
│ │ ├── hack-the-north.png
│ │ ├── hack-winners.jpeg
│ │ ├── hack-workshop.jpeg
│ │ ├── hud-agent-evals.png
│ │ └── trajectory-viewer.jpeg
│ ├── bringing-computer-use-to-the-web.md
│ ├── build-your-own-operator-on-macos-1.md
│ ├── build-your-own-operator-on-macos-2.md
│ ├── cloud-windows-ga-macos-preview.md
│ ├── composite-agents.md
│ ├── computer-use-agents-for-growth-hacking.md
│ ├── cua-hackathon.md
│ ├── cua-playground-preview.md
│ ├── cua-vlm-router.md
│ ├── hack-the-north.md
│ ├── hud-agent-evals.md
│ ├── human-in-the-loop.md
│ ├── introducing-cua-cli.md
│ ├── introducing-cua-cloud-containers.md
│ ├── lume-to-containerization.md
│ ├── neurips-2025-cua-papers.md
│ ├── sandboxed-python-execution.md
│ ├── training-computer-use-models-trajectories-1.md
│ ├── trajectory-viewer.md
│ ├── ubuntu-docker-support.md
│ └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│ ├── .env.example
│ ├── .gitignore
│ ├── content
│ │ └── docs
│ │ ├── agent-sdk
│ │ │ ├── agent-loops.mdx
│ │ │ ├── benchmarks
│ │ │ │ ├── index.mdx
│ │ │ │ ├── interactive.mdx
│ │ │ │ ├── introduction.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── osworld-verified.mdx
│ │ │ │ ├── screenspot-pro.mdx
│ │ │ │ └── screenspot-v2.mdx
│ │ │ ├── callbacks
│ │ │ │ ├── agent-lifecycle.mdx
│ │ │ │ ├── cost-saving.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── logging.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── pii-anonymization.mdx
│ │ │ │ └── trajectories.mdx
│ │ │ ├── chat-history.mdx
│ │ │ ├── custom-tools.mdx
│ │ │ ├── customizing-computeragent.mdx
│ │ │ ├── integrations
│ │ │ │ ├── hud.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── observability.mdx
│ │ │ ├── mcp-server
│ │ │ │ ├── client-integrations.mdx
│ │ │ │ ├── configuration.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── llm-integrations.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── tools.mdx
│ │ │ │ └── usage.mdx
│ │ │ ├── message-format.mdx
│ │ │ ├── meta.json
│ │ │ ├── migration-guide.mdx
│ │ │ ├── prompt-caching.mdx
│ │ │ ├── supported-agents
│ │ │ │ ├── composed-agents.mdx
│ │ │ │ ├── computer-use-agents.mdx
│ │ │ │ ├── grounding-models.mdx
│ │ │ │ ├── human-in-the-loop.mdx
│ │ │ │ └── meta.json
│ │ │ ├── supported-model-providers
│ │ │ │ ├── cua-vlm-router.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ └── local-models.mdx
│ │ │ ├── telemetry.mdx
│ │ │ └── usage-tracking.mdx
│ │ ├── cli-playbook
│ │ │ ├── commands.mdx
│ │ │ ├── index.mdx
│ │ │ └── meta.json
│ │ ├── computer-sdk
│ │ │ ├── cloud-vm-management.mdx
│ │ │ ├── commands.mdx
│ │ │ ├── computer-server
│ │ │ │ ├── Commands.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── REST-API.mdx
│ │ │ │ └── WebSocket-API.mdx
│ │ │ ├── computer-ui.mdx
│ │ │ ├── computers.mdx
│ │ │ ├── custom-computer-handlers.mdx
│ │ │ ├── meta.json
│ │ │ ├── sandboxed-python.mdx
│ │ │ └── tracing-api.mdx
│ │ ├── example-usecases
│ │ │ ├── form-filling.mdx
│ │ │ ├── gemini-complex-ui-navigation.mdx
│ │ │ ├── meta.json
│ │ │ ├── post-event-contact-export.mdx
│ │ │ └── windows-app-behind-vpn.mdx
│ │ ├── get-started
│ │ │ ├── meta.json
│ │ │ └── quickstart.mdx
│ │ ├── index.mdx
│ │ ├── macos-vm-cli-playbook
│ │ │ ├── lume
│ │ │ │ ├── cli-reference.mdx
│ │ │ │ ├── faq.md
│ │ │ │ ├── http-api.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── prebuilt-images.mdx
│ │ │ ├── lumier
│ │ │ │ ├── building-lumier.mdx
│ │ │ │ ├── docker-compose.mdx
│ │ │ │ ├── docker.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ └── meta.json
│ │ │ └── meta.json
│ │ └── meta.json
│ ├── next.config.mjs
│ ├── package-lock.json
│ ├── package.json
│ ├── pnpm-lock.yaml
│ ├── postcss.config.mjs
│ ├── public
│ │ └── img
│ │ ├── agent_gradio_ui.png
│ │ ├── agent.png
│ │ ├── bg-dark.jpg
│ │ ├── bg-light.jpg
│ │ ├── cli.png
│ │ ├── computer.png
│ │ ├── grounding-with-gemini3.gif
│ │ ├── hero.png
│ │ ├── laminar_trace_example.png
│ │ ├── som_box_threshold.png
│ │ └── som_iou_threshold.png
│ ├── README.md
│ ├── source.config.ts
│ ├── src
│ │ ├── app
│ │ │ ├── (home)
│ │ │ │ ├── [[...slug]]
│ │ │ │ │ └── page.tsx
│ │ │ │ └── layout.tsx
│ │ │ ├── api
│ │ │ │ ├── posthog
│ │ │ │ │ └── [...path]
│ │ │ │ │ └── route.ts
│ │ │ │ └── search
│ │ │ │ └── route.ts
│ │ │ ├── favicon.ico
│ │ │ ├── global.css
│ │ │ ├── layout.config.tsx
│ │ │ ├── layout.tsx
│ │ │ ├── llms.mdx
│ │ │ │ └── [[...slug]]
│ │ │ │ └── route.ts
│ │ │ ├── llms.txt
│ │ │ │ └── route.ts
│ │ │ ├── robots.ts
│ │ │ └── sitemap.ts
│ │ ├── assets
│ │ │ ├── discord-black.svg
│ │ │ ├── discord-white.svg
│ │ │ ├── logo-black.svg
│ │ │ └── logo-white.svg
│ │ ├── components
│ │ │ ├── analytics-tracker.tsx
│ │ │ ├── cookie-consent.tsx
│ │ │ ├── doc-actions-menu.tsx
│ │ │ ├── editable-code-block.tsx
│ │ │ ├── footer.tsx
│ │ │ ├── hero.tsx
│ │ │ ├── iou.tsx
│ │ │ ├── mermaid.tsx
│ │ │ └── page-feedback.tsx
│ │ ├── lib
│ │ │ ├── llms.ts
│ │ │ └── source.ts
│ │ ├── mdx-components.tsx
│ │ └── providers
│ │ └── posthog-provider.tsx
│ └── tsconfig.json
├── examples
│ ├── agent_examples.py
│ ├── agent_ui_examples.py
│ ├── browser_tool_example.py
│ ├── cloud_api_examples.py
│ ├── computer_examples_windows.py
│ ├── computer_examples.py
│ ├── computer_ui_examples.py
│ ├── computer-example-ts
│ │ ├── .env.example
│ │ ├── .gitignore
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── README.md
│ │ ├── src
│ │ │ ├── helpers.ts
│ │ │ └── index.ts
│ │ └── tsconfig.json
│ ├── docker_examples.py
│ ├── evals
│ │ ├── hud_eval_examples.py
│ │ └── wikipedia_most_linked.txt
│ ├── pylume_examples.py
│ ├── sandboxed_functions_examples.py
│ ├── som_examples.py
│ ├── tracing_examples.py
│ ├── utils.py
│ └── winsandbox_example.py
├── img
│ ├── agent_gradio_ui.png
│ ├── agent.png
│ ├── cli.png
│ ├── computer.png
│ ├── logo_black.png
│ └── logo_white.png
├── libs
│ ├── kasm
│ │ ├── Dockerfile
│ │ ├── LICENSE
│ │ ├── README.md
│ │ └── src
│ │ └── ubuntu
│ │ └── install
│ │ └── firefox
│ │ ├── custom_startup.sh
│ │ ├── firefox.desktop
│ │ └── install_firefox.sh
│ ├── lume
│ │ ├── .cursorignore
│ │ ├── CONTRIBUTING.md
│ │ ├── Development.md
│ │ ├── img
│ │ │ └── cli.png
│ │ ├── Package.resolved
│ │ ├── Package.swift
│ │ ├── README.md
│ │ ├── resources
│ │ │ └── lume.entitlements
│ │ ├── scripts
│ │ │ ├── build
│ │ │ │ ├── build-debug.sh
│ │ │ │ ├── build-release-notarized.sh
│ │ │ │ └── build-release.sh
│ │ │ └── install.sh
│ │ ├── src
│ │ │ ├── Commands
│ │ │ │ ├── Clone.swift
│ │ │ │ ├── Config.swift
│ │ │ │ ├── Create.swift
│ │ │ │ ├── Delete.swift
│ │ │ │ ├── Get.swift
│ │ │ │ ├── Images.swift
│ │ │ │ ├── IPSW.swift
│ │ │ │ ├── List.swift
│ │ │ │ ├── Logs.swift
│ │ │ │ ├── Options
│ │ │ │ │ └── FormatOption.swift
│ │ │ │ ├── Prune.swift
│ │ │ │ ├── Pull.swift
│ │ │ │ ├── Push.swift
│ │ │ │ ├── Run.swift
│ │ │ │ ├── Serve.swift
│ │ │ │ ├── Set.swift
│ │ │ │ └── Stop.swift
│ │ │ ├── ContainerRegistry
│ │ │ │ ├── ImageContainerRegistry.swift
│ │ │ │ ├── ImageList.swift
│ │ │ │ └── ImagesPrinter.swift
│ │ │ ├── Errors
│ │ │ │ └── Errors.swift
│ │ │ ├── FileSystem
│ │ │ │ ├── Home.swift
│ │ │ │ ├── Settings.swift
│ │ │ │ ├── VMConfig.swift
│ │ │ │ ├── VMDirectory.swift
│ │ │ │ └── VMLocation.swift
│ │ │ ├── LumeController.swift
│ │ │ ├── Main.swift
│ │ │ ├── Server
│ │ │ │ ├── Handlers.swift
│ │ │ │ ├── HTTP.swift
│ │ │ │ ├── Requests.swift
│ │ │ │ ├── Responses.swift
│ │ │ │ └── Server.swift
│ │ │ ├── Utils
│ │ │ │ ├── CommandRegistry.swift
│ │ │ │ ├── CommandUtils.swift
│ │ │ │ ├── Logger.swift
│ │ │ │ ├── NetworkUtils.swift
│ │ │ │ ├── Path.swift
│ │ │ │ ├── ProcessRunner.swift
│ │ │ │ ├── ProgressLogger.swift
│ │ │ │ ├── String.swift
│ │ │ │ └── Utils.swift
│ │ │ ├── Virtualization
│ │ │ │ ├── DarwinImageLoader.swift
│ │ │ │ ├── DHCPLeaseParser.swift
│ │ │ │ ├── ImageLoaderFactory.swift
│ │ │ │ └── VMVirtualizationService.swift
│ │ │ ├── VM
│ │ │ │ ├── DarwinVM.swift
│ │ │ │ ├── LinuxVM.swift
│ │ │ │ ├── VM.swift
│ │ │ │ ├── VMDetails.swift
│ │ │ │ ├── VMDetailsPrinter.swift
│ │ │ │ ├── VMDisplayResolution.swift
│ │ │ │ └── VMFactory.swift
│ │ │ └── VNC
│ │ │ ├── PassphraseGenerator.swift
│ │ │ └── VNCService.swift
│ │ └── tests
│ │ ├── Mocks
│ │ │ ├── MockVM.swift
│ │ │ ├── MockVMVirtualizationService.swift
│ │ │ └── MockVNCService.swift
│ │ ├── VM
│ │ │ └── VMDetailsPrinterTests.swift
│ │ ├── VMTests.swift
│ │ ├── VMVirtualizationServiceTests.swift
│ │ └── VNCServiceTests.swift
│ ├── lumier
│ │ ├── .dockerignore
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── bin
│ │ │ └── entry.sh
│ │ ├── config
│ │ │ └── constants.sh
│ │ ├── hooks
│ │ │ └── on-logon.sh
│ │ └── lib
│ │ ├── utils.sh
│ │ └── vm.sh
│ ├── python
│ │ ├── agent
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── agent
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── adapters
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── cua_adapter.py
│ │ │ │ │ ├── huggingfacelocal_adapter.py
│ │ │ │ │ ├── human_adapter.py
│ │ │ │ │ ├── mlxvlm_adapter.py
│ │ │ │ │ └── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ └── qwen2_5_vl.py
│ │ │ │ ├── agent.py
│ │ │ │ ├── callbacks
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── budget_manager.py
│ │ │ │ │ ├── image_retention.py
│ │ │ │ │ ├── logging.py
│ │ │ │ │ ├── operator_validator.py
│ │ │ │ │ ├── pii_anonymization.py
│ │ │ │ │ ├── prompt_instructions.py
│ │ │ │ │ ├── telemetry.py
│ │ │ │ │ └── trajectory_saver.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── computers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cua.py
│ │ │ │ │ └── custom.py
│ │ │ │ ├── decorators.py
│ │ │ │ ├── human_tool
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ ├── server.py
│ │ │ │ │ └── ui.py
│ │ │ │ ├── integrations
│ │ │ │ │ └── hud
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── agent.py
│ │ │ │ │ └── proxy.py
│ │ │ │ ├── loops
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── anthropic.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── composed_grounded.py
│ │ │ │ │ ├── gelato.py
│ │ │ │ │ ├── gemini.py
│ │ │ │ │ ├── generic_vlm.py
│ │ │ │ │ ├── glm45v.py
│ │ │ │ │ ├── gta1.py
│ │ │ │ │ ├── holo.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── model_types.csv
│ │ │ │ │ ├── moondream3.py
│ │ │ │ │ ├── omniparser.py
│ │ │ │ │ ├── openai.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ ├── uiins.py
│ │ │ │ │ ├── uitars.py
│ │ │ │ │ └── uitars2.py
│ │ │ │ ├── proxy
│ │ │ │ │ ├── examples.py
│ │ │ │ │ └── handlers.py
│ │ │ │ ├── responses.py
│ │ │ │ ├── tools
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── browser_tool.py
│ │ │ │ ├── types.py
│ │ │ │ └── ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ └── gradio
│ │ │ │ ├── __init__.py
│ │ │ │ ├── app.py
│ │ │ │ └── ui_components.py
│ │ │ ├── benchmarks
│ │ │ │ ├── .gitignore
│ │ │ │ ├── contrib.md
│ │ │ │ ├── interactive.py
│ │ │ │ ├── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ └── gta1.py
│ │ │ │ ├── README.md
│ │ │ │ ├── ss-pro.py
│ │ │ │ ├── ss-v2.py
│ │ │ │ └── utils.py
│ │ │ ├── example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer_agent.py
│ │ ├── bench-ui
│ │ │ ├── bench_ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api.py
│ │ │ │ └── child.py
│ │ │ ├── examples
│ │ │ │ ├── folder_example.py
│ │ │ │ ├── gui
│ │ │ │ │ ├── index.html
│ │ │ │ │ ├── logo.svg
│ │ │ │ │ └── styles.css
│ │ │ │ ├── output_overlay.png
│ │ │ │ └── simple_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ └── test_port_detection.py
│ │ ├── computer
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer
│ │ │ │ ├── __init__.py
│ │ │ │ ├── computer.py
│ │ │ │ ├── diorama_computer.py
│ │ │ │ ├── helpers.py
│ │ │ │ ├── interface
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ ├── models.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── logger.py
│ │ │ │ ├── models.py
│ │ │ │ ├── providers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cloud
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── docker
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── lume
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── lume_api.py
│ │ │ │ │ ├── lumier
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── types.py
│ │ │ │ │ └── winsandbox
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── provider.py
│ │ │ │ │ └── setup_script.ps1
│ │ │ │ ├── tracing_wrapper.py
│ │ │ │ ├── tracing.py
│ │ │ │ ├── ui
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ └── gradio
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── app.py
│ │ │ │ └── utils.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer.py
│ │ ├── computer-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── browser.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── diorama
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── diorama_computer.py
│ │ │ │ │ ├── diorama.py
│ │ │ │ │ ├── draw.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── safezone.py
│ │ │ │ ├── handlers
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── main.py
│ │ │ │ ├── server.py
│ │ │ │ ├── utils
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── wallpaper.py
│ │ │ │ └── watchdog.py
│ │ │ ├── examples
│ │ │ │ ├── __init__.py
│ │ │ │ └── usage_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ ├── run_server.py
│ │ │ ├── test_connection.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_server.py
│ │ ├── core
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── core
│ │ │ │ ├── __init__.py
│ │ │ │ └── telemetry
│ │ │ │ ├── __init__.py
│ │ │ │ └── posthog.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_telemetry.py
│ │ ├── mcp-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── build-extension.py
│ │ │ ├── CONCURRENT_SESSIONS.md
│ │ │ ├── desktop-extension
│ │ │ │ ├── cua-extension.mcpb
│ │ │ │ ├── desktop_extension.png
│ │ │ │ ├── manifest.json
│ │ │ │ ├── README.md
│ │ │ │ ├── requirements.txt
│ │ │ │ ├── run_server.sh
│ │ │ │ └── setup.py
│ │ │ ├── mcp_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── server.py
│ │ │ │ └── session_manager.py
│ │ │ ├── pdm.lock
│ │ │ ├── pyproject.toml
│ │ │ ├── QUICK_TEST_COMMANDS.sh
│ │ │ ├── quick_test_local_option.py
│ │ │ ├── README.md
│ │ │ ├── scripts
│ │ │ │ ├── install_mcp_server.sh
│ │ │ │ └── start_mcp_server.sh
│ │ │ ├── test_mcp_server_local_option.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_mcp_server.py
│ │ ├── pylume
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_pylume.py
│ │ └── som
│ │ ├── .bumpversion.cfg
│ │ ├── LICENSE
│ │ ├── poetry.toml
│ │ ├── pyproject.toml
│ │ ├── README.md
│ │ ├── som
│ │ │ ├── __init__.py
│ │ │ ├── detect.py
│ │ │ ├── detection.py
│ │ │ ├── models.py
│ │ │ ├── ocr.py
│ │ │ ├── util
│ │ │ │ └── utils.py
│ │ │ └── visualization.py
│ │ └── tests
│ │ ├── conftest.py
│ │ └── test_omniparser.py
│ ├── qemu-docker
│ │ ├── linux
│ │ │ ├── Dockerfile
│ │ │ ├── README.md
│ │ │ └── src
│ │ │ ├── entry.sh
│ │ │ └── vm
│ │ │ ├── image
│ │ │ │ └── README.md
│ │ │ └── setup
│ │ │ ├── install.sh
│ │ │ ├── setup-cua-server.sh
│ │ │ └── setup.sh
│ │ ├── README.md
│ │ └── windows
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── entry.sh
│ │ └── vm
│ │ ├── image
│ │ │ └── README.md
│ │ └── setup
│ │ ├── install.bat
│ │ ├── on-logon.ps1
│ │ ├── setup-cua-server.ps1
│ │ ├── setup-utils.psm1
│ │ └── setup.ps1
│ ├── typescript
│ │ ├── .gitignore
│ │ ├── .nvmrc
│ │ ├── agent
│ │ │ ├── examples
│ │ │ │ ├── playground-example.html
│ │ │ │ └── README.md
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── client.ts
│ │ │ │ ├── index.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ └── client.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── computer
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── computer
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── providers
│ │ │ │ │ │ ├── base.ts
│ │ │ │ │ │ ├── cloud.ts
│ │ │ │ │ │ └── index.ts
│ │ │ │ │ └── types.ts
│ │ │ │ ├── index.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── base.ts
│ │ │ │ │ ├── factory.ts
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── linux.ts
│ │ │ │ │ ├── macos.ts
│ │ │ │ │ └── windows.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ ├── computer
│ │ │ │ │ └── cloud.test.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── factory.test.ts
│ │ │ │ │ ├── index.test.ts
│ │ │ │ │ ├── linux.test.ts
│ │ │ │ │ ├── macos.test.ts
│ │ │ │ │ └── windows.test.ts
│ │ │ │ └── setup.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── core
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── index.ts
│ │ │ │ └── telemetry
│ │ │ │ ├── clients
│ │ │ │ │ ├── index.ts
│ │ │ │ │ └── posthog.ts
│ │ │ │ └── index.ts
│ │ │ ├── tests
│ │ │ │ └── telemetry.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── cua-cli
│ │ │ ├── .gitignore
│ │ │ ├── .prettierrc
│ │ │ ├── bun.lock
│ │ │ ├── CLAUDE.md
│ │ │ ├── index.ts
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── auth.ts
│ │ │ │ ├── cli.ts
│ │ │ │ ├── commands
│ │ │ │ │ ├── auth.ts
│ │ │ │ │ └── sandbox.ts
│ │ │ │ ├── config.ts
│ │ │ │ ├── http.ts
│ │ │ │ ├── storage.ts
│ │ │ │ └── util.ts
│ │ │ └── tsconfig.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── pnpm-workspace.yaml
│ │ └── README.md
│ └── xfce
│ ├── .dockerignore
│ ├── .gitignore
│ ├── Development.md
│ ├── Dockerfile
│ ├── Dockerfile.dev
│ ├── README.md
│ └── src
│ ├── scripts
│ │ ├── resize-display.sh
│ │ ├── start-computer-server.sh
│ │ ├── start-novnc.sh
│ │ ├── start-vnc.sh
│ │ └── xstartup.sh
│ ├── supervisor
│ │ └── supervisord.conf
│ └── xfce-config
│ ├── helpers.rc
│ ├── xfce4-power-manager.xml
│ └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│ ├── agent_nb.ipynb
│ ├── blog
│ │ ├── build-your-own-operator-on-macos-1.ipynb
│ │ └── build-your-own-operator-on-macos-2.ipynb
│ ├── composite_agents_docker_nb.ipynb
│ ├── computer_nb.ipynb
│ ├── computer_server_nb.ipynb
│ ├── customizing_computeragent.ipynb
│ ├── eval_osworld.ipynb
│ ├── ollama_nb.ipynb
│ ├── README.md
│ ├── sota_hackathon_cloud.ipynb
│ └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── install-cli.ps1
│ ├── install-cli.sh
│ ├── playground-docker.sh
│ ├── playground.sh
│ ├── run-docker-dev.sh
│ └── typescript-typecheck.js
├── TESTING.md
├── tests
│ ├── agent_loop_testing
│ │ ├── agent_test.py
│ │ └── README.md
│ ├── pytest.ini
│ ├── shell_cmd.py
│ ├── test_files.py
│ ├── test_mcp_server_session_management.py
│ ├── test_mcp_server_streaming.py
│ ├── test_shell_bash.py
│ ├── test_telemetry.py
│ ├── test_tracing.py
│ ├── test_venv.py
│ └── test_watchdog.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/libs/python/agent/agent/human_tool/ui.py:
--------------------------------------------------------------------------------
```python
import base64
import io
import json
import time
from datetime import datetime
from typing import Any, Dict, List, Optional
import gradio as gr
import requests
from PIL import Image
from .server import completion_queue
class HumanCompletionUI:
def __init__(self, server_url: str = "http://localhost:8002"):
self.server_url = server_url
self.current_call_id: Optional[str] = None
self.refresh_interval = 2.0 # seconds
self.last_image = None # Store the last image for display
# Track current interactive action controls
self.current_action_type: str = "click"
self.current_button: str = "left"
self.current_scroll_x: int = 0
self.current_scroll_y: int = -120
def format_messages_for_chatbot(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Format messages for display in gr.Chatbot with type='messages'."""
formatted = []
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
tool_calls = msg.get("tool_calls", [])
# Handle different content formats
if isinstance(content, list):
# Multi-modal content - can include text and images
formatted_content = []
for item in content:
if item.get("type") == "text":
text = item.get("text", "")
if text.strip(): # Only add non-empty text
formatted_content.append(text)
elif item.get("type") == "image_url":
image_url = item.get("image_url", {}).get("url", "")
if image_url:
# Check if it's a base64 image or URL
if image_url.startswith("data:image"):
# For base64 images, decode and create gr.Image
try:
header, data = image_url.split(",", 1)
image_data = base64.b64decode(data)
image = Image.open(io.BytesIO(image_data))
formatted_content.append(gr.Image(value=image))
except Exception as e:
print(f"Error loading image: {e}")
formatted_content.append(f"[Image loading error: {e}]")
else:
# For URL images, create gr.Image with URL
formatted_content.append(gr.Image(value=image_url))
# Determine final content format
if len(formatted_content) == 1:
content = formatted_content[0]
elif len(formatted_content) > 1:
content = formatted_content
else:
content = "[Empty content]"
# Ensure role is valid for Gradio Chatbot
if role not in ["user", "assistant"]:
role = "assistant" if role == "system" else "user"
# Invert roles for better display in human UI context
# (what the AI says becomes "user", what human should respond becomes "assistant")
if role == "user":
role = "assistant"
else:
role = "user"
# Add the main message if it has content
if content and str(content).strip():
formatted.append({"role": role, "content": content})
# Handle tool calls - create separate messages for each tool call
if tool_calls:
for tool_call in tool_calls:
function_name = tool_call.get("function", {}).get("name", "unknown")
arguments_str = tool_call.get("function", {}).get("arguments", "{}")
try:
# Parse arguments to format them nicely
arguments = json.loads(arguments_str)
formatted_args = json.dumps(arguments, indent=2)
except json.JSONDecodeError:
# If parsing fails, use the raw string
formatted_args = arguments_str
# Create a formatted message for the tool call
tool_call_content = f"```json\n{formatted_args}\n```"
formatted.append(
{
"role": role,
"content": tool_call_content,
"metadata": {"title": f"🛠️ Used {function_name}"},
}
)
return formatted
def get_pending_calls(self) -> List[Dict[str, Any]]:
"""Get pending calls from the server."""
try:
response = requests.get(f"{self.server_url}/pending", timeout=5)
if response.status_code == 200:
return response.json().get("pending_calls", [])
except Exception as e:
print(f"Error fetching pending calls: {e}")
return []
def complete_call_with_response(self, call_id: str, response: str) -> bool:
"""Complete a call with a text response."""
try:
response_data = {"response": response}
response_obj = requests.post(
f"{self.server_url}/complete/{call_id}", json=response_data, timeout=10
)
response_obj.raise_for_status()
return True
except requests.RequestException as e:
print(f"Error completing call: {e}")
return False
def complete_call_with_tool_calls(self, call_id: str, tool_calls: List[Dict[str, Any]]) -> bool:
"""Complete a call with tool calls."""
try:
response_data = {"tool_calls": tool_calls}
response_obj = requests.post(
f"{self.server_url}/complete/{call_id}", json=response_data, timeout=10
)
response_obj.raise_for_status()
return True
except requests.RequestException as e:
print(f"Error completing call: {e}")
return False
def complete_call(
self,
call_id: str,
response: Optional[str] = None,
tool_calls: Optional[List[Dict[str, Any]]] = None,
) -> bool:
"""Complete a call with either a response or tool calls."""
try:
response_data = {}
if response:
response_data["response"] = response
if tool_calls:
response_data["tool_calls"] = tool_calls
response_obj = requests.post(
f"{self.server_url}/complete/{call_id}", json=response_data, timeout=10
)
response_obj.raise_for_status()
return True
except requests.RequestException as e:
print(f"Error completing call: {e}")
return False
def get_last_image_from_messages(self, messages: List[Dict[str, Any]]) -> Optional[Any]:
"""Extract the last image from the messages for display above conversation."""
last_image = None
for msg in reversed(messages): # Start from the last message
content = msg.get("content", "")
if isinstance(content, list):
for item in reversed(content): # Get the last image in the message
if item.get("type") == "image_url":
image_url = item.get("image_url", {}).get("url", "")
if image_url:
if image_url.startswith("data:image"):
# For base64 images, create a gr.Image component
try:
header, data = image_url.split(",", 1)
image_data = base64.b64decode(data)
image = Image.open(io.BytesIO(image_data))
return image
except Exception as e:
print(f"Error loading image: {e}")
continue
else:
# For URL images, return the URL
return image_url
return last_image
def refresh_pending_calls(self):
"""Refresh the list of pending calls."""
pending_calls = self.get_pending_calls()
if not pending_calls:
return (
gr.update(choices=["latest"], value="latest"), # dropdown
gr.update(value=None), # image (no image)
gr.update(value=[]), # chatbot (empty messages)
gr.update(interactive=False), # submit button
gr.update(visible=False), # click_actions_group hidden
gr.update(visible=False), # actions_group hidden
)
# Sort pending calls by created_at to get oldest first
sorted_calls = sorted(pending_calls, key=lambda x: x.get("created_at", ""))
# Create choices for dropdown
choices = [("latest", "latest")] # Add "latest" option first
for call in sorted_calls:
call_id = call["id"]
model = call.get("model", "unknown")
created_at = call.get("created_at", "")
# Format timestamp
try:
dt = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
time_str = dt.strftime("%H:%M:%S")
            except Exception:
time_str = created_at
choice_label = f"{call_id[:8]}... ({model}) - {time_str}"
choices.append((choice_label, call_id))
# Default to "latest" which shows the oldest pending conversation
selected_call_id = "latest"
if selected_call_id == "latest" and sorted_calls:
# Use the oldest call (first in sorted list)
selected_call = sorted_calls[0]
conversation = self.format_messages_for_chatbot(selected_call.get("messages", []))
self.current_call_id = selected_call["id"]
# Get the last image from messages
self.last_image = self.get_last_image_from_messages(selected_call.get("messages", []))
else:
conversation = []
self.current_call_id = None
self.last_image = None
return (
gr.update(choices=choices, value="latest"),
gr.update(value=self.last_image),
gr.update(value=conversation),
gr.update(interactive=bool(choices)),
gr.update(visible=True), # click_actions_group visible when there is a call
gr.update(visible=True), # actions_group visible when there is a call
)
def on_call_selected(self, selected_choice):
"""Handle when a call is selected from the dropdown."""
if not selected_choice:
return (
gr.update(value=None), # no image
gr.update(value=[]), # empty chatbot
gr.update(interactive=False),
gr.update(visible=False), # click_actions_group hidden
gr.update(visible=False), # actions_group hidden
)
pending_calls = self.get_pending_calls()
if not pending_calls:
return (
gr.update(value=None), # no image
gr.update(value=[]), # empty chatbot
gr.update(interactive=False),
gr.update(visible=False), # click_actions_group hidden
gr.update(visible=False), # actions_group hidden
)
# Handle "latest" option
if selected_choice == "latest":
# Sort calls by created_at to get oldest first
sorted_calls = sorted(pending_calls, key=lambda x: x.get("created_at", ""))
selected_call = sorted_calls[0] # Get the oldest call
call_id = selected_call["id"]
else:
# Extract call_id from the choice for specific calls
call_id = None
for call in pending_calls:
call_id_short = call["id"][:8]
if call_id_short in selected_choice:
call_id = call["id"]
break
            if not call_id:
                return (
                    gr.update(value=None),  # no image
                    gr.update(value=[]),  # empty chatbot
                    gr.update(interactive=False),
                    gr.update(visible=False),  # click_actions_group hidden
                    gr.update(visible=False),  # actions_group hidden
                )
# Find the selected call
selected_call = next((c for c in pending_calls if c["id"] == call_id), None)
if not selected_call:
return (
gr.update(value=None), # no image
gr.update(value=[]), # empty chatbot
gr.update(interactive=False),
gr.update(visible=False), # click_actions_group hidden
gr.update(visible=False), # actions_group hidden
)
conversation = self.format_messages_for_chatbot(selected_call.get("messages", []))
self.current_call_id = call_id
# Get the last image from messages
self.last_image = self.get_last_image_from_messages(selected_call.get("messages", []))
return (
gr.update(value=self.last_image),
gr.update(value=conversation),
gr.update(interactive=True),
gr.update(visible=True), # click_actions_group visible
gr.update(visible=True), # actions_group visible
)
def submit_response(self, response_text: str):
"""Submit a text response to the current call."""
if not self.current_call_id:
return (
gr.update(value=response_text), # keep response text
gr.update(value="❌ No call selected"), # status
)
if not response_text.strip():
return (
gr.update(value=response_text), # keep response text
gr.update(value="❌ Response cannot be empty"), # status
)
success = self.complete_call_with_response(self.current_call_id, response_text)
if success:
status_msg = "✅ Response submitted successfully!"
return (
gr.update(value=""), # clear response text
gr.update(value=status_msg), # status
)
else:
return (
gr.update(value=response_text), # keep response text
gr.update(value="❌ Failed to submit response"), # status
)
def submit_action(self, action_type: str, **kwargs) -> str:
"""Submit a computer action as a tool call."""
if not self.current_call_id:
return "❌ No call selected"
import uuid
# Create tool call structure
action_data = {"type": action_type, **kwargs}
tool_call = {
"id": f"call_{uuid.uuid4().hex[:24]}",
"type": "function",
"function": {"name": "computer", "arguments": json.dumps(action_data)},
}
success = self.complete_call_with_tool_calls(self.current_call_id, [tool_call])
if success:
return f"✅ {action_type.capitalize()} action submitted as tool call"
else:
return f"❌ Failed to submit {action_type} action"
def submit_click_action(
self, x: int, y: int, action_type: str = "click", button: str = "left"
) -> str:
"""Submit a coordinate-based action."""
if action_type == "click":
return self.submit_action(action_type, x=x, y=y, button=button)
else:
return self.submit_action(action_type, x=x, y=y)
def submit_type_action(self, text: str) -> str:
"""Submit a type action."""
return self.submit_action("type", text=text)
def submit_hotkey_action(self, keys: str) -> str:
"""Submit a hotkey action."""
return self.submit_action("keypress", keys=keys)
def submit_wait_action(self) -> str:
"""Submit a wait action with no kwargs."""
return self.submit_action("wait")
def submit_description_click(
self, description: str, action_type: str = "click", button: str = "left"
) -> str:
"""Submit a description-based action."""
if action_type == "click":
return self.submit_action(action_type, element_description=description, button=button)
else:
return self.submit_action(action_type, element_description=description)
def wait_for_pending_calls(self, max_seconds: float = 10.0, check_interval: float = 0.2):
"""Wait for pending calls to appear or until max_seconds elapsed.
This method loops and checks for pending calls at regular intervals,
returning as soon as a pending call is found or the maximum wait time is reached.
Args:
max_seconds: Maximum number of seconds to wait
check_interval: How often to check for pending calls (in seconds)
"""
import time
start_time = time.time()
while time.time() - start_time < max_seconds:
# Check if there are any pending calls
pending_calls = self.get_pending_calls()
if pending_calls:
# Found pending calls, return immediately
return self.refresh_pending_calls()
# Wait before checking again
time.sleep(check_interval)
# Max wait time reached, return current state
return self.refresh_pending_calls()
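# Illustrative sketch (not part of the original module): formatting a raw
# message list for display. `HumanCompletionUI.format_messages_for_chatbot`
# is a pure method, so it can be exercised without a running server.
def _example_format_messages():  # hypothetical helper, for illustration only
    handler = HumanCompletionUI()  # default server_url; no request is made here
    raw_messages = [
        {"role": "user", "content": "Open the settings page"},
        {
            "role": "assistant",
            "content": "",
            "tool_calls": [
                {
                    "function": {
                        "name": "computer",
                        "arguments": json.dumps({"type": "screenshot"}),
                    }
                }
            ],
        },
    ]
    # Roles are inverted for the human-in-the-loop view, and each tool call
    # becomes its own chat entry with a "🛠️ Used ..." title.
    return handler.format_messages_for_chatbot(raw_messages)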
def create_ui():
"""Create the Gradio interface."""
ui_handler = HumanCompletionUI()
with gr.Blocks(title="Human-in-the-Loop Agent Tool", fill_width=True) as demo:
gr.Markdown("# 🤖 Human-in-the-Loop Agent Tool")
gr.Markdown("Review AI conversation requests and provide human responses.")
with gr.Row():
with gr.Column(scale=2):
with gr.Group():
screenshot_image = gr.Image(
label="Interactive Screenshot", interactive=False, height=600
)
# Action type selection for image clicks (wrapped for visibility control)
with gr.Group(visible=False) as click_actions_group:
with gr.Row():
action_type_radio = gr.Dropdown(
label="Interactive Action",
choices=[
"click",
"double_click",
"move",
"left_mouse_up",
"left_mouse_down",
"scroll",
],
value="click",
scale=2,
)
action_button_radio = gr.Dropdown(
label="Button",
choices=["left", "right", "wheel", "back", "forward"],
value="left",
visible=True,
scale=1,
)
scroll_x_input = gr.Number(
label="scroll_x", value=0, visible=False, scale=1
)
scroll_y_input = gr.Number(
label="scroll_y", value=-120, visible=False, scale=1
)
conversation_chatbot = gr.Chatbot(
label="Conversation", type="messages", height=500, show_copy_button=True
)
with gr.Column(scale=1):
with gr.Group():
call_dropdown = gr.Dropdown(
label="Select a pending conversation request",
choices=["latest"],
interactive=True,
value="latest",
)
refresh_btn = gr.Button("🔄 Refresh", variant="secondary")
status_display = gr.Textbox(
label="Status", interactive=False, value="Ready to receive requests..."
)
with gr.Group():
response_text = gr.Textbox(
label="Message", lines=3, placeholder="Enter your message here..."
)
submit_btn = gr.Button(
"📤 Submit Message", variant="primary", interactive=False
)
# Action Accordions (wrapped for visibility control)
with gr.Group(visible=False) as actions_group:
with gr.Tabs():
with gr.Tab("🖱️ Click Actions"):
with gr.Group():
description_text = gr.Textbox(
label="Element Description",
placeholder="e.g., 'Privacy and security option in left sidebar'",
)
with gr.Row():
description_action_type = gr.Dropdown(
label="Action",
choices=[
"click",
"double_click",
"move",
"left_mouse_up",
"left_mouse_down",
],
value="click",
)
description_button = gr.Dropdown(
label="Button",
choices=["left", "right", "wheel", "back", "forward"],
value="left",
)
description_submit_btn = gr.Button("Submit Click Action")
with gr.Tab("📝 Type Action"):
with gr.Group():
type_text = gr.Textbox(
label="Text to Type", placeholder="Enter text to type..."
)
type_submit_btn = gr.Button("Submit Type")
with gr.Tab("⌨️ Keypress Action"):
with gr.Group():
keypress_text = gr.Textbox(
label="Keys", placeholder="e.g., ctrl+c, alt+tab"
)
keypress_submit_btn = gr.Button("Submit Keypress")
with gr.Tab("🧰 Misc Actions"):
with gr.Group():
misc_action_dropdown = gr.Dropdown(
label="Action", choices=["wait"], value="wait"
)
misc_submit_btn = gr.Button("Submit Action")
# Event handlers
refresh_btn.click(
fn=ui_handler.refresh_pending_calls,
outputs=[
call_dropdown,
screenshot_image,
conversation_chatbot,
submit_btn,
click_actions_group,
actions_group,
],
)
call_dropdown.change(
fn=ui_handler.on_call_selected,
inputs=[call_dropdown],
outputs=[
screenshot_image,
conversation_chatbot,
submit_btn,
click_actions_group,
actions_group,
],
)
def handle_image_click(evt: gr.SelectData):
if evt.index is not None:
x, y = evt.index
action_type = ui_handler.current_action_type or "click"
button = ui_handler.current_button or "left"
if action_type == "scroll":
sx_i = int(ui_handler.current_scroll_x or 0)
sy_i = int(ui_handler.current_scroll_y or 0)
# Submit a scroll action with x,y position and scroll deltas
result = ui_handler.submit_action(
"scroll", x=x, y=y, scroll_x=sx_i, scroll_y=sy_i
)
else:
result = ui_handler.submit_click_action(x, y, action_type, button)
ui_handler.wait_for_pending_calls()
return result
return "No coordinates selected"
screenshot_image.select(fn=handle_image_click, outputs=[status_display]).then(
fn=ui_handler.wait_for_pending_calls,
outputs=[
call_dropdown,
screenshot_image,
conversation_chatbot,
submit_btn,
click_actions_group,
actions_group,
],
)
# Response submission
submit_btn.click(
fn=ui_handler.submit_response,
inputs=[response_text],
outputs=[response_text, status_display],
).then(
fn=ui_handler.refresh_pending_calls,
outputs=[
call_dropdown,
screenshot_image,
conversation_chatbot,
submit_btn,
click_actions_group,
actions_group,
],
)
# Toggle visibility of controls based on action type
def toggle_action_controls(action_type):
# Button visible only for click
button_vis = gr.update(visible=(action_type == "click"))
# Scroll inputs visible only for scroll
scroll_x_vis = gr.update(visible=(action_type == "scroll"))
scroll_y_vis = gr.update(visible=(action_type == "scroll"))
# Update state
ui_handler.current_action_type = action_type or "click"
return button_vis, scroll_x_vis, scroll_y_vis
action_type_radio.change(
fn=toggle_action_controls,
inputs=[action_type_radio],
outputs=[action_button_radio, scroll_x_input, scroll_y_input],
)
# Keep other control values in ui_handler state
def on_button_change(val):
ui_handler.current_button = val or "left"
action_button_radio.change(fn=on_button_change, inputs=[action_button_radio])
def on_scroll_x_change(val):
try:
ui_handler.current_scroll_x = int(val) if val is not None else 0
except Exception:
ui_handler.current_scroll_x = 0
scroll_x_input.change(fn=on_scroll_x_change, inputs=[scroll_x_input])
def on_scroll_y_change(val):
try:
ui_handler.current_scroll_y = int(val) if val is not None else 0
except Exception:
ui_handler.current_scroll_y = 0
scroll_y_input.change(fn=on_scroll_y_change, inputs=[scroll_y_input])
type_submit_btn.click(
fn=ui_handler.submit_type_action, inputs=[type_text], outputs=[status_display]
).then(
fn=ui_handler.wait_for_pending_calls,
outputs=[
call_dropdown,
screenshot_image,
conversation_chatbot,
submit_btn,
click_actions_group,
actions_group,
],
)
keypress_submit_btn.click(
fn=ui_handler.submit_hotkey_action, inputs=[keypress_text], outputs=[status_display]
).then(
fn=ui_handler.wait_for_pending_calls,
outputs=[
call_dropdown,
screenshot_image,
conversation_chatbot,
submit_btn,
click_actions_group,
actions_group,
],
)
def handle_description_submit(description, action_type, button):
if description:
result = ui_handler.submit_description_click(description, action_type, button)
ui_handler.wait_for_pending_calls()
return result
return "Please enter a description"
description_submit_btn.click(
fn=handle_description_submit,
inputs=[description_text, description_action_type, description_button],
outputs=[status_display],
).then(
fn=ui_handler.wait_for_pending_calls,
outputs=[
call_dropdown,
screenshot_image,
conversation_chatbot,
submit_btn,
click_actions_group,
actions_group,
],
)
# Misc action handler
def handle_misc_submit(selected_action):
if selected_action == "wait":
result = ui_handler.submit_wait_action()
ui_handler.wait_for_pending_calls()
return result
return f"Unsupported misc action: {selected_action}"
misc_submit_btn.click(
fn=handle_misc_submit, inputs=[misc_action_dropdown], outputs=[status_display]
).then(
fn=ui_handler.wait_for_pending_calls,
outputs=[
call_dropdown,
screenshot_image,
conversation_chatbot,
submit_btn,
click_actions_group,
actions_group,
],
)
# Load initial data
demo.load(
fn=ui_handler.refresh_pending_calls,
outputs=[
call_dropdown,
screenshot_image,
conversation_chatbot,
submit_btn,
click_actions_group,
actions_group,
],
)
return demo
if __name__ == "__main__":
demo = create_ui()
demo.queue()
demo.launch(server_name="0.0.0.0", server_port=7860)
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/responses.py:
--------------------------------------------------------------------------------
```python
"""
Functions for making various Responses API items from different types of responses.
Based on the OpenAI spec for Responses API items.
"""
import base64
import json
import uuid
from typing import Any, Dict, List, Literal, Optional, Union
from openai.types.responses.easy_input_message_param import EasyInputMessageParam
from openai.types.responses.response_computer_tool_call_param import (
ActionClick,
ActionDoubleClick,
ActionDrag,
ActionDragPath,
ActionKeypress,
ActionMove,
ActionScreenshot,
ActionScroll,
)
from openai.types.responses.response_computer_tool_call_param import (
ActionType as ActionTypeAction,
)
from openai.types.responses.response_computer_tool_call_param import (
ActionWait,
PendingSafetyCheck,
ResponseComputerToolCallParam,
)
from openai.types.responses.response_function_tool_call_param import (
ResponseFunctionToolCallParam,
)
from openai.types.responses.response_input_image_param import ResponseInputImageParam
from openai.types.responses.response_output_message_param import (
ResponseOutputMessageParam,
)
from openai.types.responses.response_output_text_param import ResponseOutputTextParam
from openai.types.responses.response_reasoning_item_param import (
ResponseReasoningItemParam,
Summary,
)
def random_id():
return str(uuid.uuid4())
# User message items
def make_input_image_item(image_data: Union[str, bytes]) -> EasyInputMessageParam:
return EasyInputMessageParam(
content=[
ResponseInputImageParam(
type="input_image",
image_url=f"data:image/png;base64,{base64.b64encode(image_data).decode('utf-8') if isinstance(image_data, bytes) else image_data}",
) # type: ignore
],
role="user",
type="message",
)
# Text items
def make_reasoning_item(reasoning: str) -> ResponseReasoningItemParam:
return ResponseReasoningItemParam(
id=random_id(), summary=[Summary(text=reasoning, type="summary_text")], type="reasoning"
)
def make_output_text_item(content: str) -> ResponseOutputMessageParam:
return ResponseOutputMessageParam(
id=random_id(),
content=[ResponseOutputTextParam(text=content, type="output_text", annotations=[])],
role="assistant",
status="completed",
type="message",
)
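# Illustrative sketch (not part of the original module): composing a minimal
# Responses-API style exchange from the factories above. The base64 string is
# a placeholder, not real screenshot data.
def _example_text_exchange():  # hypothetical helper, for illustration only
    fake_png_b64 = "iVBORw0KGgo="  # placeholder image payload
    return [
        make_input_image_item(fake_png_b64),
        make_reasoning_item("The settings icon is in the top-right corner."),
        make_output_text_item("I will open the settings page next."),
    ]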
# Function call items
def make_function_call_item(
function_name: str, arguments: Dict[str, Any], call_id: Optional[str] = None
) -> ResponseFunctionToolCallParam:
return ResponseFunctionToolCallParam(
id=random_id(),
call_id=call_id if call_id else random_id(),
name=function_name,
arguments=json.dumps(arguments),
status="completed",
type="function_call",
)
# Computer tool call items
def make_click_item(
x: int,
y: int,
button: Literal["left", "right", "wheel", "back", "forward"] = "left",
call_id: Optional[str] = None,
) -> ResponseComputerToolCallParam:
return ResponseComputerToolCallParam(
id=random_id(),
call_id=call_id if call_id else random_id(),
action=ActionClick(button=button, type="click", x=x, y=y),
pending_safety_checks=[],
status="completed",
type="computer_call",
)
def make_double_click_item(
x: int, y: int, call_id: Optional[str] = None
) -> ResponseComputerToolCallParam:
return ResponseComputerToolCallParam(
id=random_id(),
call_id=call_id if call_id else random_id(),
action=ActionDoubleClick(type="double_click", x=x, y=y),
pending_safety_checks=[],
status="completed",
type="computer_call",
)
def make_drag_item(
path: List[Dict[str, int]], call_id: Optional[str] = None
) -> ResponseComputerToolCallParam:
drag_path = [ActionDragPath(x=point["x"], y=point["y"]) for point in path]
return ResponseComputerToolCallParam(
id=random_id(),
call_id=call_id if call_id else random_id(),
action=ActionDrag(path=drag_path, type="drag"),
pending_safety_checks=[],
status="completed",
type="computer_call",
)
def make_keypress_item(
keys: List[str], call_id: Optional[str] = None
) -> ResponseComputerToolCallParam:
return ResponseComputerToolCallParam(
id=random_id(),
call_id=call_id if call_id else random_id(),
action=ActionKeypress(keys=keys, type="keypress"),
pending_safety_checks=[],
status="completed",
type="computer_call",
)
def make_move_item(x: int, y: int, call_id: Optional[str] = None) -> ResponseComputerToolCallParam:
return ResponseComputerToolCallParam(
id=random_id(),
call_id=call_id if call_id else random_id(),
action=ActionMove(type="move", x=x, y=y),
pending_safety_checks=[],
status="completed",
type="computer_call",
)
def make_screenshot_item(call_id: Optional[str] = None) -> ResponseComputerToolCallParam:
return ResponseComputerToolCallParam(
id=random_id(),
call_id=call_id if call_id else random_id(),
action=ActionScreenshot(type="screenshot"),
pending_safety_checks=[],
status="completed",
type="computer_call",
)
def make_scroll_item(
x: int, y: int, scroll_x: int, scroll_y: int, call_id: Optional[str] = None
) -> ResponseComputerToolCallParam:
return ResponseComputerToolCallParam(
id=random_id(),
call_id=call_id if call_id else random_id(),
action=ActionScroll(scroll_x=scroll_x, scroll_y=scroll_y, type="scroll", x=x, y=y),
pending_safety_checks=[],
status="completed",
type="computer_call",
)
def make_type_item(text: str, call_id: Optional[str] = None) -> ResponseComputerToolCallParam:
return ResponseComputerToolCallParam(
id=random_id(),
call_id=call_id if call_id else random_id(),
action=ActionTypeAction(text=text, type="type"),
pending_safety_checks=[],
status="completed",
type="computer_call",
)
def make_wait_item(call_id: Optional[str] = None) -> ResponseComputerToolCallParam:
return ResponseComputerToolCallParam(
id=random_id(),
call_id=call_id if call_id else random_id(),
action=ActionWait(type="wait"),
pending_safety_checks=[],
status="completed",
type="computer_call",
)
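# Illustrative sketch (not part of the original module): each factory above
# yields a "computer_call" item with a fresh id/call_id. Coordinates are
# arbitrary example values.
def _example_computer_calls():  # hypothetical helper, for illustration only
    return [
        make_click_item(120, 48, button="left"),
        make_scroll_item(x=640, y=400, scroll_x=0, scroll_y=-240),
        make_type_item("hello world"),
        make_wait_item(),
    ]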
# Extra Anthropic computer calls
def make_left_mouse_down_item(
x: Optional[int] = None, y: Optional[int] = None, call_id: Optional[str] = None
) -> Dict[str, Any]:
return {
"id": random_id(),
"call_id": call_id if call_id else random_id(),
"action": {"type": "left_mouse_down", "x": x, "y": y},
"pending_safety_checks": [],
"status": "completed",
"type": "computer_call",
}
def make_left_mouse_up_item(
x: Optional[int] = None, y: Optional[int] = None, call_id: Optional[str] = None
) -> Dict[str, Any]:
return {
"id": random_id(),
"call_id": call_id if call_id else random_id(),
"action": {"type": "left_mouse_up", "x": x, "y": y},
"pending_safety_checks": [],
"status": "completed",
"type": "computer_call",
}
def make_failed_tool_call_items(
tool_name: str, tool_kwargs: Dict[str, Any], error_message: str, call_id: Optional[str] = None
) -> List[Dict[str, Any]]:
call_id = call_id if call_id else random_id()
return [
{
"type": "function_call",
"id": random_id(),
"call_id": call_id,
"name": tool_name,
"arguments": json.dumps(tool_kwargs),
},
{
"type": "function_call_output",
"call_id": call_id,
"output": json.dumps({"error": error_message}),
},
]
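# Illustrative sketch (not part of the original module): recording a failed
# computer action as a paired function_call / function_call_output, so the
# error text can be fed back to the model.
def _example_failed_call():  # hypothetical helper, for illustration only
    return make_failed_tool_call_items(
        tool_name="computer",
        tool_kwargs={"type": "click", "x": 10, "y": 20},
        error_message="element not found",
    )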
def make_tool_error_item(error_message: str, call_id: Optional[str] = None) -> Dict[str, Any]:
call_id = call_id if call_id else random_id()
return {
"type": "function_call_output",
"call_id": call_id,
"output": json.dumps({"error": error_message}),
}
def replace_failed_computer_calls_with_function_calls(
messages: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""
Replace computer_call items with function_call items if they share a call_id with a function_call_output.
This indicates the computer call failed and should be treated as a function call instead.
We do this because the computer_call_output items do not support text output.
Args:
messages: List of message items to process
"""
messages = messages.copy()
# Find all call_ids that have function_call_output items
failed_call_ids = set()
for msg in messages:
if msg.get("type") == "function_call_output":
call_id = msg.get("call_id")
if call_id:
failed_call_ids.add(call_id)
# Replace computer_call items that have matching call_ids
for i, msg in enumerate(messages):
if msg.get("type") == "computer_call" and msg.get("call_id") in failed_call_ids:
# Extract action from computer_call
action = msg.get("action", {})
call_id = msg.get("call_id")
# Create function_call replacement
messages[i] = {
"type": "function_call",
"id": msg.get("id", random_id()),
"call_id": call_id,
"name": "computer",
"arguments": json.dumps(action),
}
return messages
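# Illustrative sketch (not part of the original module): a computer_call whose
# call_id also appears on a function_call_output is rewritten as a plain
# function_call, since only function calls can carry a text error output.
def _example_replace_failed_calls():  # hypothetical helper, for illustration only
    history = [
        make_click_item(10, 20, call_id="call_1"),
        make_tool_error_item("click target vanished", call_id="call_1"),
    ]
    return replace_failed_computer_calls_with_function_calls(history)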
# Conversion functions between element descriptions and coordinates
def convert_computer_calls_desc2xy(
responses_items: List[Dict[str, Any]], desc2xy: Dict[str, tuple]
) -> List[Dict[str, Any]]:
"""
Convert computer calls from element descriptions to x,y coordinates.
Args:
responses_items: List of response items containing computer calls with element_description
desc2xy: Dictionary mapping element descriptions to (x, y) coordinate tuples
Returns:
List of response items with element_description replaced by x,y coordinates
"""
converted_items = []
for item in responses_items:
if item.get("type") == "computer_call" and "action" in item:
action = item["action"].copy()
# Handle single element_description
if "element_description" in action:
desc = action["element_description"]
if desc in desc2xy:
x, y = desc2xy[desc]
action["x"] = x
action["y"] = y
del action["element_description"]
# Handle start_element_description and end_element_description for drag operations
elif "start_element_description" in action and "end_element_description" in action:
start_desc = action["start_element_description"]
end_desc = action["end_element_description"]
if start_desc in desc2xy and end_desc in desc2xy:
start_x, start_y = desc2xy[start_desc]
end_x, end_y = desc2xy[end_desc]
action["path"] = [{"x": start_x, "y": start_y}, {"x": end_x, "y": end_y}]
del action["start_element_description"]
del action["end_element_description"]
converted_item = item.copy()
converted_item["action"] = action
converted_items.append(converted_item)
else:
converted_items.append(item)
return converted_items
def convert_computer_calls_xy2desc(
responses_items: List[Dict[str, Any]], desc2xy: Dict[str, tuple]
) -> List[Dict[str, Any]]:
"""
Convert computer calls from x,y coordinates to element descriptions.
Args:
responses_items: List of response items containing computer calls with x,y coordinates
desc2xy: Dictionary mapping element descriptions to (x, y) coordinate tuples
Returns:
List of response items with x,y coordinates replaced by element_description
"""
# Create reverse mapping from coordinates to descriptions
xy2desc = {coords: desc for desc, coords in desc2xy.items()}
converted_items = []
for item in responses_items:
if item.get("type") == "computer_call" and "action" in item:
action = item["action"].copy()
# Handle single x,y coordinates
if "x" in action and "y" in action:
coords = (action["x"], action["y"])
if coords in xy2desc:
action["element_description"] = xy2desc[coords]
del action["x"]
del action["y"]
# Handle path for drag operations
elif "path" in action and isinstance(action["path"], list) and len(action["path"]) == 2:
start_point = action["path"][0]
end_point = action["path"][1]
if (
"x" in start_point
and "y" in start_point
and "x" in end_point
and "y" in end_point
):
start_coords = (start_point["x"], start_point["y"])
end_coords = (end_point["x"], end_point["y"])
if start_coords in xy2desc and end_coords in xy2desc:
action["start_element_description"] = xy2desc[start_coords]
action["end_element_description"] = xy2desc[end_coords]
del action["path"]
converted_item = item.copy()
converted_item["action"] = action
converted_items.append(converted_item)
else:
converted_items.append(item)
return converted_items
def get_all_element_descriptions(responses_items: List[Dict[str, Any]]) -> List[str]:
"""
Extract all element descriptions from computer calls in responses items.
Args:
responses_items: List of response items containing computer calls
Returns:
List of unique element descriptions found in computer calls
"""
descriptions = set()
for item in responses_items:
if item.get("type") == "computer_call" and "action" in item:
action = item["action"]
# Handle single element_description
if "element_description" in action:
descriptions.add(action["element_description"])
# Handle start_element_description and end_element_description for drag operations
if "start_element_description" in action:
descriptions.add(action["start_element_description"])
if "end_element_description" in action:
descriptions.add(action["end_element_description"])
return list(descriptions)
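# Illustrative sketch (not part of the original module): the grounding
# round-trip. Coordinates are swapped for descriptions, the descriptions are
# collected for a grounding model to resolve, and the (hypothetical) resolved
# mapping is applied back onto the items.
def _example_grounding_round_trip():  # hypothetical helper, for illustration only
    desc2xy = {"blue 'Save' button": (412, 236)}
    items = [make_click_item(412, 236)]
    described = convert_computer_calls_xy2desc(items, desc2xy)
    pending = get_all_element_descriptions(described)  # ["blue 'Save' button"]
    # ... a grounding model would fill in or refresh desc2xy here ...
    return convert_computer_calls_desc2xy(described, desc2xy)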
# Conversion functions between responses_items and completion messages formats
def convert_responses_items_to_completion_messages(
messages: List[Dict[str, Any]],
allow_images_in_tool_results: bool = True,
send_multiple_user_images_per_parallel_tool_results: bool = False,
) -> List[Dict[str, Any]]:
"""Convert responses_items message format to liteLLM completion format.
Args:
messages: List of responses_items format messages
allow_images_in_tool_results: If True, include images in tool role messages.
If False, send tool message + separate user message with image.
        send_multiple_user_images_per_parallel_tool_results: If True, emit a separate user image message for every output in a run of parallel tool results; if False, only the last output's image is sent.
"""
completion_messages = []
for i, message in enumerate(messages):
msg_type = message.get("type")
role = message.get("role")
# Handle user messages (both with and without explicit type)
if role == "user" or msg_type == "user":
content = message.get("content", "")
if isinstance(content, list):
# Handle list content (images, text blocks)
completion_content = []
for item in content:
if item.get("type") == "input_image":
completion_content.append(
{"type": "image_url", "image_url": {"url": item.get("image_url")}}
)
elif item.get("type") == "input_text":
completion_content.append({"type": "text", "text": item.get("text")})
elif item.get("type") == "text":
completion_content.append({"type": "text", "text": item.get("text")})
completion_messages.append({"role": "user", "content": completion_content})
elif isinstance(content, str):
# Handle string content
completion_messages.append({"role": "user", "content": content})
# Handle assistant messages
elif role == "assistant" or msg_type == "message":
content = message.get("content", [])
if isinstance(content, list):
text_parts = []
for item in content:
if item.get("type") == "output_text":
text_parts.append(item.get("text", ""))
elif item.get("type") == "text":
text_parts.append(item.get("text", ""))
if text_parts:
completion_messages.append(
{"role": "assistant", "content": "\n".join(text_parts)}
)
# Handle reasoning items (convert to assistant message)
elif msg_type == "reasoning":
summary = message.get("summary", [])
text_parts = []
for item in summary:
if item.get("type") == "summary_text":
text_parts.append(item.get("text", ""))
if text_parts:
completion_messages.append({"role": "assistant", "content": "\n".join(text_parts)})
# Handle function calls
elif msg_type == "function_call":
# Add tool call to last assistant message or create new one
if not completion_messages or completion_messages[-1]["role"] != "assistant":
completion_messages.append({"role": "assistant", "content": "", "tool_calls": []})
if "tool_calls" not in completion_messages[-1]:
completion_messages[-1]["tool_calls"] = []
completion_messages[-1]["tool_calls"].append(
{
"id": message.get("call_id"),
"type": "function",
"function": {
"name": message.get("name"),
"arguments": message.get("arguments"),
},
}
)
# Handle computer calls
elif msg_type == "computer_call":
# Add tool call to last assistant message or create new one
if not completion_messages or completion_messages[-1]["role"] != "assistant":
completion_messages.append({"role": "assistant", "content": "", "tool_calls": []})
if "tool_calls" not in completion_messages[-1]:
completion_messages[-1]["tool_calls"] = []
action = message.get("action", {})
completion_messages[-1]["tool_calls"].append(
{
"id": message.get("call_id"),
"type": "function",
"function": {"name": "computer", "arguments": json.dumps(action)},
}
)
# Handle function/computer call outputs
elif msg_type in ["function_call_output", "computer_call_output"]:
output = message.get("output")
call_id = message.get("call_id")
if isinstance(output, dict) and output.get("type") == "input_image":
if allow_images_in_tool_results:
# Handle image output as tool response (may not work with all APIs)
completion_messages.append(
{
"role": "tool",
"tool_call_id": call_id,
"content": [
{"type": "image_url", "image_url": {"url": output.get("image_url")}}
],
}
)
else:
# Determine if the next message is also a tool call output
next_type = None
if i + 1 < len(messages):
next_msg = messages[i + 1]
next_type = next_msg.get("type")
is_next_message_image_result = next_type in [
"computer_call_output",
]
# Send tool message + separate user message with image (OpenAI compatible)
completion_messages += (
[
{
"role": "tool",
"tool_call_id": call_id,
"content": "[Execution completed. See screenshot below]",
},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {"url": output.get("image_url")},
}
],
},
]
if send_multiple_user_images_per_parallel_tool_results
or (not is_next_message_image_result)
else [
{
"role": "tool",
"tool_call_id": call_id,
"content": "[Execution completed. See screenshot below]",
},
]
)
else:
# Handle text output as tool response
completion_messages.append(
{"role": "tool", "tool_call_id": call_id, "content": str(output)}
)
return completion_messages
def convert_completion_messages_to_responses_items(
completion_messages: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Convert completion messages format to responses_items message format."""
responses_items = []
skip_next = False
for i, message in enumerate(completion_messages):
if skip_next:
skip_next = False
continue
role = message.get("role")
content = message.get("content")
tool_calls = message.get("tool_calls", [])
# Handle assistant messages with text content
if role == "assistant" and content and isinstance(content, str):
responses_items.append(
{
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": content}],
}
)
# Handle tool calls
if tool_calls:
for tool_call in tool_calls:
if tool_call.get("type") == "function":
function = tool_call.get("function", {})
function_name = function.get("name")
if function_name == "computer":
# Parse computer action
try:
action = json.loads(function.get("arguments", "{}"))
# Change key from "action" -> "type"
if action.get("action"):
action["type"] = action["action"]
del action["action"]
responses_items.append(
{
"type": "computer_call",
"call_id": tool_call.get("id"),
"action": action,
"status": "completed",
}
)
except json.JSONDecodeError:
# Fallback to function call format
responses_items.append(
{
"type": "function_call",
"call_id": tool_call.get("id"),
"name": function_name,
"arguments": function.get("arguments", "{}"),
"status": "completed",
}
)
else:
# Regular function call
responses_items.append(
{
"type": "function_call",
"call_id": tool_call.get("id"),
"name": function_name,
"arguments": function.get("arguments", "{}"),
"status": "completed",
}
)
# Handle tool messages (function/computer call outputs)
elif role == "tool" and content:
tool_call_id = message.get("tool_call_id")
if isinstance(content, str):
# Check if this is the "[Execution completed. See screenshot below]" pattern
if content == "[Execution completed. See screenshot below]":
# Look ahead for the next user message with image
next_idx = i + 1
if (
next_idx < len(completion_messages)
and completion_messages[next_idx].get("role") == "user"
and isinstance(completion_messages[next_idx].get("content"), list)
):
# Found the pattern - extract image from next message
next_content = completion_messages[next_idx]["content"]
for item in next_content:
if item.get("type") == "image_url":
responses_items.append(
{
"type": "computer_call_output",
"call_id": tool_call_id,
"output": {
"type": "input_image",
"image_url": item.get("image_url", {}).get("url"),
},
}
)
# Skip the next user message since we processed it
skip_next = True
break
else:
# No matching user message, treat as regular text
responses_items.append(
{
"type": "computer_call_output",
"call_id": tool_call_id,
"output": content,
}
)
else:
# Determine if this is a computer call or function call output
try:
# Try to parse as structured output
parsed_content = json.loads(content)
if parsed_content.get("type") == "input_image":
responses_items.append(
{
"type": "computer_call_output",
"call_id": tool_call_id,
"output": parsed_content,
}
)
else:
responses_items.append(
{
"type": "computer_call_output",
"call_id": tool_call_id,
"output": content,
}
)
except json.JSONDecodeError:
# Plain text output - could be function or computer call
responses_items.append(
{
"type": "function_call_output",
"call_id": tool_call_id,
"output": content,
}
)
elif isinstance(content, list):
# Handle structured content (e.g., images)
for item in content:
if item.get("type") == "image_url":
responses_items.append(
{
"type": "computer_call_output",
"call_id": tool_call_id,
"output": {
"type": "input_image",
"image_url": item.get("image_url", {}).get("url"),
},
}
)
elif item.get("type") == "text":
responses_items.append(
{
"type": "function_call_output",
"call_id": tool_call_id,
"output": item.get("text"),
}
)
# Handle actual user messages
elif role == "user" and content:
if isinstance(content, list):
# Handle structured user content (e.g., text + images)
user_content = []
for item in content:
if item.get("type") == "image_url":
user_content.append(
{
"type": "input_image",
"image_url": item.get("image_url", {}).get("url"),
}
)
elif item.get("type") == "text":
user_content.append({"type": "input_text", "text": item.get("text")})
if user_content:
responses_items.append(
{"role": "user", "type": "message", "content": user_content}
)
elif isinstance(content, str):
# Handle simple text user message
responses_items.append({"role": "user", "content": content})
return responses_items
```
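
As a quick illustration of the two converters above, the sketch below round-trips a `computer_call` and its screenshot output through the completion format and back. The `agent.responses_conversion` import path is a placeholder for wherever these functions actually live in the package, and the data URL is a dummy value.

```python
import json

# Placeholder import path; point it at the module that defines the converters above.
from agent.responses_conversion import (
    convert_completion_messages_to_responses_items,
    convert_responses_items_to_completion_messages,
)

responses_items = [
    {"role": "user", "content": "Open the settings app"},
    {
        "type": "computer_call",
        "call_id": "call_1",
        "action": {"type": "click", "x": 100, "y": 200, "button": "left"},
        "status": "completed",
    },
    {
        "type": "computer_call_output",
        "call_id": "call_1",
        "output": {"type": "input_image", "image_url": "data:image/png;base64,AAAA"},  # dummy image
    },
]

# Forward pass: a user message, an assistant message carrying a "computer" tool call
# (with JSON-encoded arguments), and a tool message holding the screenshot.
completion_messages = convert_responses_items_to_completion_messages(responses_items)
print(json.dumps(completion_messages, indent=2))

# Reverse pass recovers the computer_call and computer_call_output with the same call_id.
round_tripped = convert_completion_messages_to_responses_items(completion_messages)
print(json.dumps(round_tripped, indent=2))
```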
--------------------------------------------------------------------------------
/libs/python/agent/agent/agent.py:
--------------------------------------------------------------------------------
```python
"""
ComputerAgent - Main agent class that selects and runs agent loops
"""
import asyncio
import inspect
import json
from pathlib import Path
from typing import (
Any,
AsyncGenerator,
Callable,
Dict,
List,
Optional,
Set,
Tuple,
Union,
cast,
)
import litellm
import litellm.utils
from litellm.responses.utils import Usage
from .adapters import CUAAdapter, HuggingFaceLocalAdapter, HumanAdapter, MLXVLMAdapter
from .callbacks import (
BudgetManagerCallback,
ImageRetentionCallback,
LoggingCallback,
OperatorNormalizerCallback,
PromptInstructionsCallback,
TelemetryCallback,
TrajectorySaverCallback,
)
from .computers import AsyncComputerHandler, is_agent_computer, make_computer_handler
from .decorators import find_agent_config
from .responses import (
make_tool_error_item,
replace_failed_computer_calls_with_function_calls,
)
from .types import AgentCapability, IllegalArgumentError, Messages, ToolError
def assert_callable_with(f, *args, **kwargs):
"""Check if function can be called with given arguments."""
try:
inspect.signature(f).bind(*args, **kwargs)
return True
except TypeError as e:
sig = inspect.signature(f)
raise IllegalArgumentError(f"Expected {sig}, got args={args} kwargs={kwargs}") from e
def get_json(obj: Any, max_depth: int = 10) -> Any:
def custom_serializer(o: Any, depth: int = 0, seen: Optional[Set[int]] = None) -> Any:
if seen is None:
seen = set()
# Use model_dump() if available
if hasattr(o, "model_dump"):
return o.model_dump()
# Check depth limit
if depth > max_depth:
return f"<max_depth_exceeded:{max_depth}>"
# Check for circular references using object id
obj_id = id(o)
if obj_id in seen:
return f"<circular_reference:{type(o).__name__}>"
# Handle Computer objects
if hasattr(o, "__class__") and "computer" in o.__class__.__name__.lower():
return f"<computer:{o.__class__.__name__}>"
# Handle objects with __dict__
if hasattr(o, "__dict__"):
seen.add(obj_id)
try:
result = {}
for k, v in o.__dict__.items():
if v is not None:
# Recursively serialize with updated depth and seen set
serialized_value = custom_serializer(v, depth + 1, seen.copy())
result[k] = serialized_value
return result
finally:
seen.discard(obj_id)
# Handle common types that might contain nested objects
elif isinstance(o, dict):
seen.add(obj_id)
try:
return {
k: custom_serializer(v, depth + 1, seen.copy())
for k, v in o.items()
if v is not None
}
finally:
seen.discard(obj_id)
elif isinstance(o, (list, tuple, set)):
seen.add(obj_id)
try:
return [
custom_serializer(item, depth + 1, seen.copy())
for item in o
if item is not None
]
finally:
seen.discard(obj_id)
# For basic types that json.dumps can handle
elif isinstance(o, (str, int, float, bool)) or o is None:
return o
# Fallback to string representation
else:
return str(o)
def remove_nones(obj: Any) -> Any:
if isinstance(obj, dict):
return {k: remove_nones(v) for k, v in obj.items() if v is not None}
elif isinstance(obj, list):
return [remove_nones(item) for item in obj if item is not None]
return obj
# Serialize with circular reference and depth protection
serialized = custom_serializer(obj)
# Convert to JSON string and back to ensure JSON compatibility
json_str = json.dumps(serialized)
parsed = json.loads(json_str)
# Final cleanup of any remaining None values
return remove_nones(parsed)
def sanitize_message(msg: Any) -> Any:
"""Return a copy of the message with image_url omitted for computer_call_output messages."""
if msg.get("type") == "computer_call_output":
output = msg.get("output", {})
if isinstance(output, dict):
sanitized = msg.copy()
sanitized["output"] = {**output, "image_url": "[omitted]"}
return sanitized
return msg
def get_output_call_ids(messages: List[Dict[str, Any]]) -> List[str]:
call_ids = []
for message in messages:
if (
message.get("type") == "computer_call_output"
or message.get("type") == "function_call_output"
):
call_ids.append(message.get("call_id"))
return call_ids
class ComputerAgent:
"""
Main agent class that automatically selects the appropriate agent loop
based on the model and executes tool calls.
"""
def __init__(
self,
model: str,
tools: Optional[List[Any]] = None,
custom_loop: Optional[Callable] = None,
only_n_most_recent_images: Optional[int] = None,
callbacks: Optional[List[Any]] = None,
instructions: Optional[str] = None,
verbosity: Optional[int] = None,
trajectory_dir: Optional[str | Path | dict] = None,
max_retries: Optional[int] = 3,
screenshot_delay: Optional[float | int] = 0.5,
use_prompt_caching: Optional[bool] = False,
max_trajectory_budget: Optional[float | dict] = None,
telemetry_enabled: Optional[bool] = True,
trust_remote_code: Optional[bool] = False,
api_key: Optional[str] = None,
api_base: Optional[str] = None,
**additional_generation_kwargs,
):
"""
Initialize ComputerAgent.
Args:
model: Model name (e.g., "claude-sonnet-4-5-20250929", "computer-use-preview", "omni+vertex_ai/gemini-pro")
tools: List of tools (computer objects, decorated functions, etc.)
custom_loop: Custom agent loop function to use instead of auto-selection
only_n_most_recent_images: If set, only keep the N most recent images in message history. Adds ImageRetentionCallback automatically.
callbacks: List of AsyncCallbackHandler instances for preprocessing/postprocessing
instructions: Optional system instructions to be passed to the model
verbosity: Logging level (logging.DEBUG, logging.INFO, etc.). If set, adds LoggingCallback automatically
trajectory_dir: If set, saves trajectory data (screenshots, responses) to this directory. Adds TrajectorySaverCallback automatically.
max_retries: Maximum number of retries for failed API calls
screenshot_delay: Delay before screenshots in seconds
            use_prompt_caching: If set, use prompt caching to avoid reprocessing the same prompt. Intended for use with Anthropic providers.
max_trajectory_budget: If set, adds BudgetManagerCallback to track usage costs and stop when budget is exceeded
telemetry_enabled: If set, adds TelemetryCallback to track anonymized usage data. Enabled by default.
trust_remote_code: If set, trust remote code when loading local models. Disabled by default.
api_key: Optional API key override for the model provider
api_base: Optional API base URL override for the model provider
**additional_generation_kwargs: Additional arguments passed to the model provider
"""
# If the loop is "human/human", we need to prefix a grounding model fallback
if model in ["human/human", "human"]:
model = "openai/computer-use-preview+human/human"
self.model = model
self.tools = tools or []
self.custom_loop = custom_loop
self.only_n_most_recent_images = only_n_most_recent_images
self.callbacks = callbacks or []
self.instructions = instructions
self.verbosity = verbosity
self.trajectory_dir = trajectory_dir
self.max_retries = max_retries
self.screenshot_delay = screenshot_delay
self.use_prompt_caching = use_prompt_caching
self.telemetry_enabled = telemetry_enabled
self.kwargs = additional_generation_kwargs
self.trust_remote_code = trust_remote_code
self.api_key = api_key
self.api_base = api_base
# == Add built-in callbacks ==
# Prepend operator normalizer callback
self.callbacks.insert(0, OperatorNormalizerCallback())
# Add prompt instructions callback if provided
if self.instructions:
self.callbacks.append(PromptInstructionsCallback(self.instructions))
# Add logging callback if verbosity is set
if self.verbosity is not None:
self.callbacks.append(LoggingCallback(level=self.verbosity))
# Add image retention callback if only_n_most_recent_images is set
if self.only_n_most_recent_images:
self.callbacks.append(ImageRetentionCallback(self.only_n_most_recent_images))
# Add trajectory saver callback if trajectory_dir is set
if self.trajectory_dir:
if isinstance(self.trajectory_dir, dict):
self.callbacks.append(TrajectorySaverCallback(**self.trajectory_dir))
elif isinstance(self.trajectory_dir, (str, Path)):
self.callbacks.append(TrajectorySaverCallback(str(self.trajectory_dir)))
# Add budget manager if max_trajectory_budget is set
if max_trajectory_budget:
if isinstance(max_trajectory_budget, dict):
self.callbacks.append(BudgetManagerCallback(**max_trajectory_budget))
else:
self.callbacks.append(BudgetManagerCallback(max_trajectory_budget))
# == Enable local model providers w/ LiteLLM ==
# Register local model providers
hf_adapter = HuggingFaceLocalAdapter(
device="auto", trust_remote_code=self.trust_remote_code or False
)
human_adapter = HumanAdapter()
mlx_adapter = MLXVLMAdapter()
cua_adapter = CUAAdapter()
litellm.custom_provider_map = [
{"provider": "huggingface-local", "custom_handler": hf_adapter},
{"provider": "human", "custom_handler": human_adapter},
{"provider": "mlx", "custom_handler": mlx_adapter},
{"provider": "cua", "custom_handler": cua_adapter},
]
litellm.suppress_debug_info = True
# == Initialize computer agent ==
# Find the appropriate agent loop
if custom_loop:
self.agent_loop = custom_loop
self.agent_config_info = None
else:
config_info = find_agent_config(model)
if not config_info:
raise ValueError(f"No agent config found for model: {model}")
# Instantiate the agent config class
self.agent_loop = config_info.agent_class()
self.agent_config_info = config_info
# Add telemetry callback AFTER agent_loop is set so it can capture the correct agent_type
if self.telemetry_enabled:
if isinstance(self.telemetry_enabled, bool):
self.callbacks.append(TelemetryCallback(self))
else:
self.callbacks.append(TelemetryCallback(self, **self.telemetry_enabled))
self.tool_schemas = []
self.computer_handler = None
async def _initialize_computers(self):
"""Initialize computer objects"""
if not self.tool_schemas:
# Process tools and create tool schemas
self.tool_schemas = self._process_tools()
# Find computer tool and create interface adapter
computer_handler = None
for schema in self.tool_schemas:
if schema["type"] == "computer":
computer_handler = await make_computer_handler(schema["computer"])
break
self.computer_handler = computer_handler
def _process_input(self, input: Messages) -> List[Dict[str, Any]]:
"""Process input messages and create schemas for the agent loop"""
if isinstance(input, str):
return [{"role": "user", "content": input}]
return [get_json(msg) for msg in input]
def _process_tools(self) -> List[Dict[str, Any]]:
"""Process tools and create schemas for the agent loop"""
schemas = []
for tool in self.tools:
# Check if it's a computer object (has interface attribute)
if is_agent_computer(tool):
# This is a computer tool - will be handled by agent loop
schemas.append({"type": "computer", "computer": tool})
elif callable(tool):
# Use litellm.utils.function_to_dict to extract schema from docstring
try:
function_schema = litellm.utils.function_to_dict(tool)
schemas.append({"type": "function", "function": function_schema})
except Exception as e:
print(f"Warning: Could not process tool {tool}: {e}")
else:
print(f"Warning: Unknown tool type: {tool}")
return schemas
def _get_tool(self, name: str) -> Optional[Callable]:
"""Get a tool by name"""
for tool in self.tools:
if hasattr(tool, "__name__") and tool.__name__ == name:
return tool
elif hasattr(tool, "func") and tool.func.__name__ == name:
return tool
return None
# ============================================================================
# AGENT RUN LOOP LIFECYCLE HOOKS
# ============================================================================
async def _on_run_start(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]]) -> None:
"""Initialize run tracking by calling callbacks."""
for callback in self.callbacks:
if hasattr(callback, "on_run_start"):
await callback.on_run_start(kwargs, old_items)
async def _on_run_end(
self,
kwargs: Dict[str, Any],
old_items: List[Dict[str, Any]],
new_items: List[Dict[str, Any]],
) -> None:
"""Finalize run tracking by calling callbacks."""
for callback in self.callbacks:
if hasattr(callback, "on_run_end"):
await callback.on_run_end(kwargs, old_items, new_items)
async def _on_run_continue(
self,
kwargs: Dict[str, Any],
old_items: List[Dict[str, Any]],
new_items: List[Dict[str, Any]],
) -> bool:
"""Check if run should continue by calling callbacks."""
for callback in self.callbacks:
if hasattr(callback, "on_run_continue"):
should_continue = await callback.on_run_continue(kwargs, old_items, new_items)
if not should_continue:
return False
return True
async def _on_llm_start(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Prepare messages for the LLM call by applying callbacks."""
result = messages
for callback in self.callbacks:
if hasattr(callback, "on_llm_start"):
result = await callback.on_llm_start(result)
return result
async def _on_llm_end(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Postprocess messages after the LLM call by applying callbacks."""
result = messages
for callback in self.callbacks:
if hasattr(callback, "on_llm_end"):
result = await callback.on_llm_end(result)
return result
async def _on_responses(self, kwargs: Dict[str, Any], responses: Dict[str, Any]) -> None:
"""Called when responses are received."""
for callback in self.callbacks:
if hasattr(callback, "on_responses"):
await callback.on_responses(get_json(kwargs), get_json(responses))
async def _on_computer_call_start(self, item: Dict[str, Any]) -> None:
"""Called when a computer call is about to start."""
for callback in self.callbacks:
if hasattr(callback, "on_computer_call_start"):
await callback.on_computer_call_start(get_json(item))
async def _on_computer_call_end(
self, item: Dict[str, Any], result: List[Dict[str, Any]]
) -> None:
"""Called when a computer call has completed."""
for callback in self.callbacks:
if hasattr(callback, "on_computer_call_end"):
await callback.on_computer_call_end(get_json(item), get_json(result))
async def _on_function_call_start(self, item: Dict[str, Any]) -> None:
"""Called when a function call is about to start."""
for callback in self.callbacks:
if hasattr(callback, "on_function_call_start"):
await callback.on_function_call_start(get_json(item))
async def _on_function_call_end(
self, item: Dict[str, Any], result: List[Dict[str, Any]]
) -> None:
"""Called when a function call has completed."""
for callback in self.callbacks:
if hasattr(callback, "on_function_call_end"):
await callback.on_function_call_end(get_json(item), get_json(result))
async def _on_text(self, item: Dict[str, Any]) -> None:
"""Called when a text message is encountered."""
for callback in self.callbacks:
if hasattr(callback, "on_text"):
await callback.on_text(get_json(item))
async def _on_api_start(self, kwargs: Dict[str, Any]) -> None:
"""Called when an LLM API call is about to start."""
for callback in self.callbacks:
if hasattr(callback, "on_api_start"):
await callback.on_api_start(get_json(kwargs))
async def _on_api_end(self, kwargs: Dict[str, Any], result: Any) -> None:
"""Called when an LLM API call has completed."""
for callback in self.callbacks:
if hasattr(callback, "on_api_end"):
await callback.on_api_end(get_json(kwargs), get_json(result))
async def _on_usage(self, usage: Dict[str, Any]) -> None:
"""Called when usage information is received."""
for callback in self.callbacks:
if hasattr(callback, "on_usage"):
await callback.on_usage(get_json(usage))
async def _on_screenshot(self, screenshot: Union[str, bytes], name: str = "screenshot") -> None:
"""Called when a screenshot is taken."""
for callback in self.callbacks:
if hasattr(callback, "on_screenshot"):
await callback.on_screenshot(screenshot, name)
# ============================================================================
# AGENT OUTPUT PROCESSING
# ============================================================================
async def _handle_item(
self,
item: Any,
computer: Optional[AsyncComputerHandler] = None,
ignore_call_ids: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
"""Handle each item; may cause a computer action + screenshot."""
call_id = item.get("call_id")
if ignore_call_ids and call_id and call_id in ignore_call_ids:
return []
item_type = item.get("type", None)
if item_type == "message":
await self._on_text(item)
# # Print messages
# if item.get("content"):
# for content_item in item.get("content"):
# if content_item.get("text"):
# print(content_item.get("text"))
return []
try:
if item_type == "computer_call":
await self._on_computer_call_start(item)
if not computer:
raise ValueError("Computer handler is required for computer calls")
# Perform computer actions
action = item.get("action")
action_type = action.get("type")
if action_type is None:
print(
f"Action type cannot be `None`: action={action}, action_type={action_type}"
)
return []
# Extract action arguments (all fields except 'type')
action_args = {k: v for k, v in action.items() if k != "type"}
# print(f"{action_type}({action_args})")
# Execute the computer action
computer_method = getattr(computer, action_type, None)
if computer_method:
assert_callable_with(computer_method, **action_args)
await computer_method(**action_args)
else:
raise ToolError(f"Unknown computer action: {action_type}")
# Take screenshot after action
if self.screenshot_delay and self.screenshot_delay > 0:
await asyncio.sleep(self.screenshot_delay)
screenshot_base64 = await computer.screenshot()
await self._on_screenshot(screenshot_base64, "screenshot_after")
# Handle safety checks
pending_checks = item.get("pending_safety_checks", [])
acknowledged_checks = []
for check in pending_checks:
check_message = check.get("message", str(check))
acknowledged_checks.append(check)
# TODO: implement a callback for safety checks
# if acknowledge_safety_check_callback(check_message, allow_always=True):
# acknowledged_checks.append(check)
# else:
# raise ValueError(f"Safety check failed: {check_message}")
# Create call output
call_output = {
"type": "computer_call_output",
"call_id": item.get("call_id"),
"acknowledged_safety_checks": acknowledged_checks,
"output": {
"type": "input_image",
"image_url": f"data:image/png;base64,{screenshot_base64}",
},
}
# # Additional URL safety checks for browser environments
# if await computer.get_environment() == "browser":
# current_url = await computer.get_current_url()
# call_output["output"]["current_url"] = current_url
# # TODO: implement a callback for URL safety checks
# # check_blocklisted_url(current_url)
result = [call_output]
await self._on_computer_call_end(item, result)
return result
if item_type == "function_call":
await self._on_function_call_start(item)
# Perform function call
function = self._get_tool(item.get("name"))
if not function:
raise ToolError(f"Function {item.get('name')} not found")
args = json.loads(item.get("arguments"))
# Validate arguments before execution
assert_callable_with(function, **args)
# Execute function - use asyncio.to_thread for non-async functions
if inspect.iscoroutinefunction(function):
result = await function(**args)
else:
result = await asyncio.to_thread(function, **args)
# Create function call output
call_output = {
"type": "function_call_output",
"call_id": item.get("call_id"),
"output": str(result),
}
result = [call_output]
await self._on_function_call_end(item, result)
return result
except ToolError as e:
return [make_tool_error_item(repr(e), call_id)]
return []
# ============================================================================
# MAIN AGENT LOOP
# ============================================================================
async def run(
self,
messages: Messages,
stream: bool = False,
api_key: Optional[str] = None,
api_base: Optional[str] = None,
**additional_generation_kwargs,
) -> AsyncGenerator[Dict[str, Any], None]:
"""
Run the agent with the given messages using Computer protocol handler pattern.
Args:
messages: List of message dictionaries
stream: Whether to stream the response
api_key: Optional API key override for the model provider
api_base: Optional API base URL override for the model provider
**additional_generation_kwargs: Additional arguments passed to the model provider
Returns:
AsyncGenerator that yields response chunks
"""
if not self.agent_config_info:
raise ValueError("Agent configuration not found")
capabilities = self.get_capabilities()
if "step" not in capabilities:
raise ValueError(
f"Agent loop {self.agent_config_info.agent_class.__name__} does not support step predictions"
)
await self._initialize_computers()
# Merge kwargs and thread api credentials (run overrides constructor)
merged_kwargs = {**self.kwargs, **additional_generation_kwargs}
if (api_key is not None) or (self.api_key is not None):
merged_kwargs["api_key"] = api_key if api_key is not None else self.api_key
if (api_base is not None) or (self.api_base is not None):
merged_kwargs["api_base"] = api_base if api_base is not None else self.api_base
old_items = self._process_input(messages)
new_items = []
# Initialize run tracking
run_kwargs = {
"messages": messages,
"stream": stream,
"model": self.model,
"agent_loop": self.agent_config_info.agent_class.__name__,
**merged_kwargs,
}
await self._on_run_start(run_kwargs, old_items)
while new_items[-1].get("role") != "assistant" if new_items else True:
# Lifecycle hook: Check if we should continue based on callbacks (e.g., budget manager)
should_continue = await self._on_run_continue(run_kwargs, old_items, new_items)
if not should_continue:
break
# Lifecycle hook: Prepare messages for the LLM call
# Use cases:
# - PII anonymization
# - Image retention policy
combined_messages = old_items + new_items
combined_messages = replace_failed_computer_calls_with_function_calls(combined_messages)
preprocessed_messages = await self._on_llm_start(combined_messages)
loop_kwargs = {
"messages": preprocessed_messages,
"model": self.model,
"tools": self.tool_schemas,
"stream": False,
"computer_handler": self.computer_handler,
"max_retries": self.max_retries,
"use_prompt_caching": self.use_prompt_caching,
**merged_kwargs,
}
# Run agent loop iteration
result = await self.agent_loop.predict_step(
**loop_kwargs,
_on_api_start=self._on_api_start,
_on_api_end=self._on_api_end,
_on_usage=self._on_usage,
_on_screenshot=self._on_screenshot,
)
result = get_json(result)
# Lifecycle hook: Postprocess messages after the LLM call
# Use cases:
# - PII deanonymization (if you want tool calls to see PII)
result["output"] = await self._on_llm_end(result.get("output", []))
await self._on_responses(loop_kwargs, result)
# Yield agent response
yield result
# Add agent response to new_items
new_items += result.get("output")
# Get output call ids
output_call_ids = get_output_call_ids(result.get("output", []))
# Handle computer actions
for item in result.get("output"):
partial_items = await self._handle_item(
item, self.computer_handler, ignore_call_ids=output_call_ids
)
new_items += partial_items
# Yield partial response
yield {
"output": partial_items,
"usage": Usage(
prompt_tokens=0,
completion_tokens=0,
total_tokens=0,
),
}
await self._on_run_end(loop_kwargs, old_items, new_items)
async def predict_click(
self, instruction: str, image_b64: Optional[str] = None
) -> Optional[Tuple[int, int]]:
"""
Predict click coordinates based on image and instruction.
Args:
instruction: Instruction for where to click
image_b64: Base64 encoded image (optional, will take screenshot if not provided)
Returns:
None or tuple with (x, y) coordinates
"""
if not self.agent_config_info:
raise ValueError("Agent configuration not found")
capabilities = self.get_capabilities()
if "click" not in capabilities:
raise ValueError(
f"Agent loop {self.agent_config_info.agent_class.__name__} does not support click predictions"
)
if hasattr(self.agent_loop, "predict_click"):
if not image_b64:
if not self.computer_handler:
raise ValueError("Computer tool or image_b64 is required for predict_click")
image_b64 = await self.computer_handler.screenshot()
# Pass along api credentials if available
click_kwargs: Dict[str, Any] = {}
if self.api_key is not None:
click_kwargs["api_key"] = self.api_key
if self.api_base is not None:
click_kwargs["api_base"] = self.api_base
return await self.agent_loop.predict_click(
model=self.model, image_b64=image_b64, instruction=instruction, **click_kwargs
)
return None
def get_capabilities(self) -> List[AgentCapability]:
"""
Get list of capabilities supported by the current agent config.
Returns:
List of capability strings (e.g., ["step", "click"])
"""
if not self.agent_config_info:
raise ValueError("Agent configuration not found")
if hasattr(self.agent_loop, "get_capabilities"):
return self.agent_loop.get_capabilities()
return ["step"] # Default capability
```
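
For orientation, here is a minimal usage sketch of `ComputerAgent` based on the constructor and `run()` docstrings above. The top-level import and the toy `get_weather` tool are assumptions for illustration; any model with a registered agent config, and any computer object or callable accepted by `_process_tools`, works the same way.

```python
import asyncio

from agent import ComputerAgent  # assumes ComputerAgent is re-exported at the package root


def get_weather(city: str) -> str:
    """Return a canned weather string for `city` (toy custom tool)."""
    return f"It is sunny in {city}."


async def main() -> None:
    agent = ComputerAgent(
        model="claude-sonnet-4-5-20250929",  # any model with a registered agent config
        tools=[get_weather],                 # computer objects and plain callables both work
        only_n_most_recent_images=3,         # adds ImageRetentionCallback automatically
        trajectory_dir="trajectories",       # adds TrajectorySaverCallback automatically
        max_trajectory_budget=1.0,           # adds BudgetManagerCallback (stops when exceeded)
    )
    # run() is an async generator: each chunk carries "output" items and "usage" info.
    async for chunk in agent.run("What is the weather in Paris?"):
        for item in chunk["output"]:
            print(item.get("type"), item)


asyncio.run(main())
```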
--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/uitars.py:
--------------------------------------------------------------------------------
```python
"""
UITARS agent loop implementation using liteLLM for ByteDance-Seed/UI-TARS-1.5-7B
Paper: https://arxiv.org/abs/2501.12326
Code: https://github.com/bytedance/UI-TARS
"""
import ast
import asyncio
import base64
import json
import math
import re
from io import BytesIO
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union
import litellm
from litellm.responses.litellm_completion_transformation.transformation import (
LiteLLMCompletionResponsesConfig,
)
from litellm.responses.utils import Usage
from litellm.types.utils import ModelResponse
from openai.types.responses.response_computer_tool_call_param import (
ActionType,
ResponseComputerToolCallParam,
)
from openai.types.responses.response_input_param import ComputerCallOutput
from openai.types.responses.response_output_message_param import (
ResponseOutputMessageParam,
)
from openai.types.responses.response_reasoning_item_param import (
ResponseReasoningItemParam,
Summary,
)
from PIL import Image
from ..decorators import register_agent
from ..responses import (
make_click_item,
make_double_click_item,
make_drag_item,
make_input_image_item,
make_keypress_item,
make_output_text_item,
make_reasoning_item,
make_scroll_item,
make_type_item,
make_wait_item,
)
from ..types import AgentCapability, AgentResponse, Messages, Tools
# Constants from reference code
IMAGE_FACTOR = 28
MIN_PIXELS = 100 * 28 * 28
MAX_PIXELS = 16384 * 28 * 28
MAX_RATIO = 200
FINISH_WORD = "finished"
WAIT_WORD = "wait"
ENV_FAIL_WORD = "error_env"
CALL_USER = "call_user"
# Action space prompt for UITARS
UITARS_ACTION_SPACE = """
click(start_box='<|box_start|>(x1,y1)<|box_end|>')
left_double(start_box='<|box_start|>(x1,y1)<|box_end|>')
right_single(start_box='<|box_start|>(x1,y1)<|box_end|>')
drag(start_box='<|box_start|>(x1,y1)<|box_end|>', end_box='<|box_start|>(x3,y3)<|box_end|>')
hotkey(key='')
type(content='') #If you want to submit your input, use "\\n" at the end of `content`.
scroll(start_box='<|box_start|>(x1,y1)<|box_end|>', direction='down or up or right or left')
wait() #Sleep for 5s and take a screenshot to check for any changes.
finished(content='xxx') # Use escape characters \\', \\", and \\n in content part to ensure we can parse the content in normal python string format.
"""
UITARS_PROMPT_TEMPLATE = """You are a GUI agent. You are given a task and your action history, with screenshots. You need to perform the next action to complete the task.
## Output Format
```
Thought: ...
Action: ...
```
## Action Space
{action_space}
## Note
- Use {language} in `Thought` part.
- Write a small plan and finally summarize your next action (with its target element) in one sentence in `Thought` part.
## User Instruction
{instruction}
"""
GROUNDING_UITARS_PROMPT_TEMPLATE = """You are a GUI agent. You are given a task and your action history, with screenshots. You need to perform the next action to complete the task.
## Output Format
Action: ...
## Action Space
click(point='<|box_start|>(x1,y1)<|box_end|>')
## User Instruction
{instruction}"""
def round_by_factor(number: float, factor: int) -> int:
"""Returns the closest integer to 'number' that is divisible by 'factor'."""
return round(number / factor) * factor
def ceil_by_factor(number: float, factor: int) -> int:
"""Returns the smallest integer greater than or equal to 'number' that is divisible by 'factor'."""
return math.ceil(number / factor) * factor
def floor_by_factor(number: float, factor: int) -> int:
"""Returns the largest integer less than or equal to 'number' that is divisible by 'factor'."""
return math.floor(number / factor) * factor
def smart_resize(
height: int,
width: int,
factor: int = IMAGE_FACTOR,
min_pixels: int = MIN_PIXELS,
max_pixels: int = MAX_PIXELS,
) -> tuple[int, int]:
"""
Rescales the image so that the following conditions are met:
1. Both dimensions (height and width) are divisible by 'factor'.
2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
3. The aspect ratio of the image is maintained as closely as possible.
"""
if max(height, width) / min(height, width) > MAX_RATIO:
raise ValueError(
f"absolute aspect ratio must be smaller than {MAX_RATIO}, got {max(height, width) / min(height, width)}"
)
h_bar = max(factor, round_by_factor(height, factor))
w_bar = max(factor, round_by_factor(width, factor))
if h_bar * w_bar > max_pixels:
beta = math.sqrt((height * width) / max_pixels)
h_bar = floor_by_factor(height / beta, factor)
w_bar = floor_by_factor(width / beta, factor)
elif h_bar * w_bar < min_pixels:
beta = math.sqrt(min_pixels / (height * width))
h_bar = ceil_by_factor(height * beta, factor)
w_bar = ceil_by_factor(width * beta, factor)
return h_bar, w_bar
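# Worked example: smart_resize(1080, 1920) with the defaults above.
#   h_bar = max(28, round(1080 / 28) * 28) = 1092
#   w_bar = max(28, round(1920 / 28) * 28) = 1932
#   1092 * 1932 = 2,109,744 pixels, already within [MIN_PIXELS, MAX_PIXELS],
#   so neither rescaling branch runs and the result is (1092, 1932).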
def escape_single_quotes(text):
"""Escape single quotes in text for safe string formatting."""
pattern = r"(?<!\\)'"
return re.sub(pattern, r"\\'", text)
def parse_action(action_str):
"""Parse action string into structured format."""
try:
node = ast.parse(action_str, mode="eval")
if not isinstance(node, ast.Expression):
raise ValueError("Not an expression")
call = node.body
if not isinstance(call, ast.Call):
raise ValueError("Not a function call")
# Get function name
if isinstance(call.func, ast.Name):
func_name = call.func.id
elif isinstance(call.func, ast.Attribute):
func_name = call.func.attr
else:
func_name = None
# Get keyword arguments
kwargs = {}
for kw in call.keywords:
key = kw.arg
if isinstance(kw.value, ast.Constant):
value = kw.value.value
elif isinstance(kw.value, ast.Str): # Compatibility with older Python
value = kw.value.s
else:
value = None
kwargs[key] = value
return {"function": func_name, "args": kwargs}
except Exception as e:
print(f"Failed to parse action '{action_str}': {e}")
return None
def parse_uitars_response(text: str, image_width: int, image_height: int) -> List[Dict[str, Any]]:
"""Parse UITARS model response into structured actions."""
text = text.strip()
# Extract thought
thought = None
if text.startswith("Thought:"):
thought_match = re.search(r"Thought: (.+?)(?=\s*Action:|$)", text, re.DOTALL)
if thought_match:
thought = thought_match.group(1).strip()
# Extract action
if "Action:" not in text:
raise ValueError("No Action found in response")
action_str = text.split("Action:")[-1].strip()
# Handle special case for type actions
if "type(content" in action_str:
def escape_quotes(match):
return match.group(1)
pattern = r"type\(content='(.*?)'\)"
content = re.sub(pattern, escape_quotes, action_str)
action_str = escape_single_quotes(content)
action_str = "type(content='" + action_str + "')"
# Parse the action
parsed_action = parse_action(action_str.replace("\n", "\\n").lstrip())
if parsed_action is None:
raise ValueError(f"Action can't parse: {action_str}")
action_type = parsed_action["function"]
params = parsed_action["args"]
# Process parameters
action_inputs = {}
for param_name, param in params.items():
if param == "":
continue
param = str(param).lstrip()
action_inputs[param_name.strip()] = param
# Handle coordinate parameters
if "start_box" in param_name or "end_box" in param_name:
# Parse coordinates like '<|box_start|>(x,y)<|box_end|>' or '(x,y)'
# First, remove special tokens
clean_param = param.replace("<|box_start|>", "").replace("<|box_end|>", "")
# Then remove parentheses and split
numbers = clean_param.replace("(", "").replace(")", "").split(",")
try:
float_numbers = [
float(num.strip()) / 1000 for num in numbers
] # Normalize to 0-1 range
if len(float_numbers) == 2:
# Single point, duplicate for box format
float_numbers = [
float_numbers[0],
float_numbers[1],
float_numbers[0],
float_numbers[1],
]
action_inputs[param_name.strip()] = str(float_numbers)
except ValueError as e:
# If parsing fails, keep the original parameter value
print(f"Warning: Could not parse coordinates '{param}': {e}")
action_inputs[param_name.strip()] = param
return [
{
"thought": thought,
"action_type": action_type,
"action_inputs": action_inputs,
"text": text,
}
]
def convert_to_computer_actions(
parsed_responses: List[Dict[str, Any]], image_width: int, image_height: int
) -> List[ResponseComputerToolCallParam | ResponseOutputMessageParam]:
"""Convert parsed UITARS responses to computer actions."""
computer_actions = []
for response in parsed_responses:
action_type = response.get("action_type")
action_inputs = response.get("action_inputs", {})
if action_type == "finished":
finished_text = action_inputs.get("content", "Task completed successfully.")
computer_actions.append(make_output_text_item(finished_text))
break
elif action_type == "wait":
computer_actions.append(make_wait_item())
elif action_type == "call_user":
computer_actions.append(
make_output_text_item("I need assistance from the user to proceed with this task.")
)
elif action_type in ["click", "left_single"]:
start_box = action_inputs.get("start_box")
if start_box:
coords = eval(start_box)
x = int((coords[0] + coords[2]) / 2 * image_width)
y = int((coords[1] + coords[3]) / 2 * image_height)
computer_actions.append(make_click_item(x, y, "left"))
elif action_type == "double_click":
start_box = action_inputs.get("start_box")
if start_box:
coords = eval(start_box)
x = int((coords[0] + coords[2]) / 2 * image_width)
y = int((coords[1] + coords[3]) / 2 * image_height)
computer_actions.append(make_double_click_item(x, y))
elif action_type == "right_click":
start_box = action_inputs.get("start_box")
if start_box:
coords = eval(start_box)
x = int((coords[0] + coords[2]) / 2 * image_width)
y = int((coords[1] + coords[3]) / 2 * image_height)
computer_actions.append(make_click_item(x, y, "right"))
elif action_type == "type":
content = action_inputs.get("content", "")
computer_actions.append(make_type_item(content))
elif action_type == "hotkey":
key = action_inputs.get("key", "")
keys = key.split()
computer_actions.append(make_keypress_item(keys))
elif action_type == "press":
key = action_inputs.get("key", "")
computer_actions.append(make_keypress_item([key]))
elif action_type == "scroll":
start_box = action_inputs.get("start_box")
direction = action_inputs.get("direction", "down")
if start_box:
coords = eval(start_box)
x = int((coords[0] + coords[2]) / 2 * image_width)
y = int((coords[1] + coords[3]) / 2 * image_height)
else:
x, y = image_width // 2, image_height // 2
scroll_y = 5 if "up" in direction.lower() else -5
computer_actions.append(make_scroll_item(x, y, 0, scroll_y))
elif action_type == "drag":
start_box = action_inputs.get("start_box")
end_box = action_inputs.get("end_box")
if start_box and end_box:
start_coords = eval(start_box)
end_coords = eval(end_box)
start_x = int((start_coords[0] + start_coords[2]) / 2 * image_width)
start_y = int((start_coords[1] + start_coords[3]) / 2 * image_height)
end_x = int((end_coords[0] + end_coords[2]) / 2 * image_width)
end_y = int((end_coords[1] + end_coords[3]) / 2 * image_height)
path = [{"x": start_x, "y": start_y}, {"x": end_x, "y": end_y}]
computer_actions.append(make_drag_item(path))
return computer_actions
def pil_to_base64(image: Image.Image) -> str:
"""Convert PIL image to base64 string."""
buffer = BytesIO()
image.save(buffer, format="PNG")
return base64.b64encode(buffer.getvalue()).decode("utf-8")
def process_image_for_uitars(
image_data: str, max_pixels: int = MAX_PIXELS, min_pixels: int = MIN_PIXELS
) -> tuple[Image.Image, int, int]:
"""Process image for UITARS model input."""
# Decode base64 image
if image_data.startswith("data:image"):
image_data = image_data.split(",")[1]
image_bytes = base64.b64decode(image_data)
image = Image.open(BytesIO(image_bytes))
original_width, original_height = image.size
# Resize image according to UITARS requirements
if image.width * image.height > max_pixels:
resize_factor = math.sqrt(max_pixels / (image.width * image.height))
width = int(image.width * resize_factor)
height = int(image.height * resize_factor)
image = image.resize((width, height))
if image.width * image.height < min_pixels:
resize_factor = math.sqrt(min_pixels / (image.width * image.height))
width = math.ceil(image.width * resize_factor)
height = math.ceil(image.height * resize_factor)
image = image.resize((width, height))
if image.mode != "RGB":
image = image.convert("RGB")
return image, original_width, original_height
def sanitize_message(msg: Any) -> Any:
    """Return a copy of the message with image_url omitted within content parts."""
if isinstance(msg, dict):
result = {}
for key, value in msg.items():
if key == "content" and isinstance(value, list):
result[key] = [
(
{k: v for k, v in item.items() if k != "image_url"}
if isinstance(item, dict)
else item
)
for item in value
]
else:
result[key] = value
return result
elif isinstance(msg, list):
return [sanitize_message(item) for item in msg]
else:
return msg
def convert_uitars_messages_to_litellm(messages: Messages) -> List[Dict[str, Any]]:
"""
Convert UITARS internal message format back to LiteLLM format.
This function processes reasoning, computer_call, and computer_call_output messages
and converts them to the appropriate LiteLLM assistant message format.
Args:
messages: List of UITARS internal messages
Returns:
List of LiteLLM formatted messages
"""
litellm_messages = []
current_assistant_content = []
for message in messages:
if isinstance(message, dict):
message_type = message.get("type")
if message_type == "reasoning":
# Extract reasoning text from summary
summary = message.get("summary", [])
if summary and isinstance(summary, list):
for summary_item in summary:
if (
isinstance(summary_item, dict)
and summary_item.get("type") == "summary_text"
):
reasoning_text = summary_item.get("text", "")
if reasoning_text:
current_assistant_content.append(f"Thought: {reasoning_text}")
elif message_type == "computer_call":
# Convert computer action to UITARS action format
action = message.get("action", {})
action_type = action.get("type")
if action_type == "click":
x, y = action.get("x", 0), action.get("y", 0)
button = action.get("button", "left")
if button == "left":
action_text = f"Action: click(start_box='({x},{y})')"
elif button == "right":
action_text = f"Action: right_single(start_box='({x},{y})')"
else:
action_text = f"Action: click(start_box='({x},{y})')"
elif action_type == "double_click":
x, y = action.get("x", 0), action.get("y", 0)
action_text = f"Action: left_double(start_box='({x},{y})')"
elif action_type == "drag":
start_x, start_y = action.get("start_x", 0), action.get("start_y", 0)
end_x, end_y = action.get("end_x", 0), action.get("end_y", 0)
action_text = f"Action: drag(start_box='({start_x},{start_y})', end_box='({end_x},{end_y})')"
elif action_type == "key":
key = action.get("key", "")
action_text = f"Action: hotkey(key='{key}')"
elif action_type == "type":
text = action.get("text", "")
# Escape single quotes in the text
escaped_text = escape_single_quotes(text)
action_text = f"Action: type(content='{escaped_text}')"
elif action_type == "scroll":
x, y = action.get("x", 0), action.get("y", 0)
direction = action.get("direction", "down")
action_text = f"Action: scroll(start_box='({x},{y})', direction='{direction}')"
elif action_type == "wait":
action_text = "Action: wait()"
else:
# Fallback for unknown action types
action_text = f"Action: {action_type}({action})"
current_assistant_content.append(action_text)
# When we hit a computer_call_output, finalize the current assistant message
if current_assistant_content:
litellm_messages.append(
{
"role": "assistant",
"content": [
{"type": "text", "text": "\n".join(current_assistant_content)}
],
}
)
current_assistant_content = []
elif message_type == "computer_call_output":
# Add screenshot from computer call output
output = message.get("output", {})
if isinstance(output, dict) and output.get("type") == "input_image":
image_url = output.get("image_url", "")
if image_url:
litellm_messages.append(
{
"role": "user",
"content": [{"type": "image_url", "image_url": {"url": image_url}}],
}
)
elif message.get("role") == "user":
# # Handle user messages
# content = message.get("content", "")
# if isinstance(content, str):
# litellm_messages.append({
# "role": "user",
# "content": content
# })
# elif isinstance(content, list):
# litellm_messages.append({
# "role": "user",
# "content": content
# })
pass
# Add any remaining assistant content
    if current_assistant_content:
        litellm_messages.append(
            {"role": "assistant", "content": [{"type": "text", "text": "\n".join(current_assistant_content)}]}
        )
return litellm_messages
@register_agent(models=r"(?i).*ui-?tars.*", priority=-1)
class UITARSConfig:
"""
UITARS agent configuration using liteLLM for ByteDance-Seed/UI-TARS-1.5-7B model.
Supports UITARS vision-language models for computer control.
"""
async def predict_step(
self,
messages: List[Dict[str, Any]],
model: str,
tools: Optional[List[Dict[str, Any]]] = None,
max_retries: Optional[int] = None,
stream: bool = False,
computer_handler=None,
use_prompt_caching: Optional[bool] = False,
_on_api_start=None,
_on_api_end=None,
_on_usage=None,
_on_screenshot=None,
**kwargs,
) -> Dict[str, Any]:
"""
Predict the next step based on input messages.
Args:
messages: Input messages following Responses format
model: Model name to use
tools: Optional list of tool schemas
max_retries: Maximum number of retries
stream: Whether to stream responses
computer_handler: Computer handler instance
_on_api_start: Callback for API start
_on_api_end: Callback for API end
_on_usage: Callback for usage tracking
_on_screenshot: Callback for screenshot events
**kwargs: Additional arguments
Returns:
Dictionary with "output" (output items) and "usage" array
"""
tools = tools or []
# Create response items
response_items = []
# Find computer tool for screen dimensions
computer_tool = None
for tool_schema in tools:
if tool_schema["type"] == "computer":
computer_tool = tool_schema["computer"]
break
# Get screen dimensions
screen_width, screen_height = 1024, 768
if computer_tool:
try:
screen_width, screen_height = await computer_tool.get_dimensions()
            except Exception:
pass
# Process messages to extract instruction and image
instruction = ""
image_data = None
# Convert messages to list if string
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# Extract instruction and latest screenshot
for message in reversed(messages):
if isinstance(message, dict):
content = message.get("content", "")
# Handle different content formats
if isinstance(content, str):
if not instruction and message.get("role") == "user":
instruction = content
elif isinstance(content, list):
for item in content:
if isinstance(item, dict):
if item.get("type") == "text" and not instruction:
instruction = item.get("text", "")
elif item.get("type") == "image_url" and not image_data:
image_url = item.get("image_url", {})
if isinstance(image_url, dict):
image_data = image_url.get("url", "")
else:
image_data = image_url
# Also check for computer_call_output with screenshots
if message.get("type") == "computer_call_output" and not image_data:
output = message.get("output", {})
if isinstance(output, dict) and output.get("type") == "input_image":
image_data = output.get("image_url", "")
if instruction and image_data:
break
if not instruction:
instruction = (
"Help me complete this task by analyzing the screen and taking appropriate actions."
)
# Create prompt
user_prompt = UITARS_PROMPT_TEMPLATE.format(
instruction=instruction, action_space=UITARS_ACTION_SPACE, language="English"
)
# Convert conversation history to LiteLLM format
history_messages = convert_uitars_messages_to_litellm(messages)
# Prepare messages for liteLLM
litellm_messages = [{"role": "system", "content": "You are a helpful assistant."}]
# Add current user instruction with screenshot
current_user_message = {
"role": "user",
"content": [
{"type": "text", "text": user_prompt},
],
}
litellm_messages.append(current_user_message)
# Process image for UITARS
if not image_data:
# Take screenshot if none found in messages
            if computer_handler:
                image_data = await computer_handler.screenshot()
                if _on_screenshot:
                    await _on_screenshot(image_data, "screenshot_before")
# Add screenshot to output items so it can be retained in history
response_items.append(make_input_image_item(image_data))
else:
raise ValueError("No screenshot found in messages and no computer_handler provided")
processed_image, original_width, original_height = process_image_for_uitars(image_data)
encoded_image = pil_to_base64(processed_image)
# Add conversation history
if history_messages:
litellm_messages.extend(history_messages)
else:
litellm_messages.append(
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{encoded_image}"},
}
],
}
)
# Prepare API call kwargs
api_kwargs = {
"model": model,
"messages": litellm_messages,
"max_tokens": kwargs.get("max_tokens", 500),
"temperature": kwargs.get("temperature", 0.0),
"do_sample": kwargs.get("temperature", 0.0) > 0.0,
"num_retries": max_retries,
**{k: v for k, v in kwargs.items() if k not in ["max_tokens", "temperature"]},
}
# Call API start hook
if _on_api_start:
await _on_api_start(api_kwargs)
# Call liteLLM with UITARS model
response = await litellm.acompletion(**api_kwargs)
# Call API end hook
if _on_api_end:
await _on_api_end(api_kwargs, response)
# Extract response content
response_content = response.choices[0].message.content.strip() # type: ignore
# Parse UITARS response
parsed_responses = parse_uitars_response(response_content, original_width, original_height)
# Convert to computer actions
computer_actions = convert_to_computer_actions(
parsed_responses, original_width, original_height
)
# Add computer actions to response items
thought = parsed_responses[0].get("thought", "")
if thought:
response_items.append(make_reasoning_item(thought))
response_items.extend(computer_actions)
# Extract usage information
response_usage = {
**LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage(
response.usage
).model_dump(),
"response_cost": response._hidden_params.get("response_cost", 0.0),
}
if _on_usage:
await _on_usage(response_usage)
# Create agent response
agent_response = {"output": response_items, "usage": response_usage}
return agent_response
async def predict_click(
self, model: str, image_b64: str, instruction: str, **kwargs
) -> Optional[Tuple[int, int]]:
"""
Predict click coordinates based on image and instruction.
UITARS supports click prediction through its action parsing.
Args:
model: Model name to use
image_b64: Base64 encoded image
instruction: Instruction for where to click
Returns:
Tuple with (x, y) coordinates or None
"""
try:
# Create prompt using grounding template
user_prompt = GROUNDING_UITARS_PROMPT_TEMPLATE.format(instruction=instruction)
# Process image for UITARS
processed_image, original_width, original_height = process_image_for_uitars(image_b64)
encoded_image = pil_to_base64(processed_image)
# Prepare messages for liteLLM
litellm_messages = [
{"role": "system", "content": "You are a helpful assistant."},
{
"role": "user",
"content": [
{"type": "text", "text": user_prompt},
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{encoded_image}"},
},
],
},
]
# Prepare API call kwargs
api_kwargs = {
"model": model,
"messages": litellm_messages,
"max_tokens": 2056,
"temperature": 0.0,
"do_sample": False,
}
api_kwargs.update({k: v for k, v in (kwargs or {}).items()})
# Call liteLLM with UITARS model
response = await litellm.acompletion(**api_kwargs)
# Extract response content
response_content = response.choices[0].message.content.strip() # type: ignore
print(response_content)
# Parse the response to extract click coordinates
# Look for click action with coordinates (with special tokens)
click_pattern = r"click\(point='<\|box_start\|>\((\d+),(\d+)\)<\|box_end\|>'\)"
match = re.search(click_pattern, response_content)
# Fallback: Look for simpler format without special tokens
if not match:
# Pattern for: click(start_box='(x,y)') or click(point='(x,y)')
fallback_pattern = r"click\((?:start_box|point)='\((\d+),(\d+)\)'\)"
match = re.search(fallback_pattern, response_content)
if match:
x, y = int(match.group(1)), int(match.group(2))
# Scale coordinates back to original image dimensions
scale_x = original_width / processed_image.width
scale_y = original_height / processed_image.height
scaled_x = int(x * scale_x)
scaled_y = int(y * scale_y)
return (scaled_x, scaled_y)
return None
except Exception as e:
# Log error and return None
print(f"Error in predict_click: {e}")
return None
def get_capabilities(self) -> List[AgentCapability]:
"""
Get list of capabilities supported by this agent config.
Returns:
List of capability strings
"""
return ["step", "click"]
```
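
To make the parsing pipeline above concrete, here is a small sketch that feeds a typical UI-TARS completion through `parse_uitars_response` and `convert_to_computer_actions`. It assumes the inner `agent` directory is the installed package root (so the module is importable as `agent.loops.uitars`); the screen size and model output below are made up for illustration.

```python
from agent.loops.uitars import (  # assumes the package root is `agent`
    convert_to_computer_actions,
    parse_uitars_response,
)

raw = (
    "Thought: I should open the Settings app by clicking its icon in the dock.\n"
    "Action: click(start_box='<|box_start|>(500,750)<|box_end|>')"
)

# Coordinates in the model output live on a 0-1000 grid; parse_uitars_response
# normalizes them to 0-1, and convert_to_computer_actions scales them back to
# the real screen size (here 1920x1080 -> x=960, y=810).
parsed = parse_uitars_response(raw, image_width=1920, image_height=1080)
actions = convert_to_computer_actions(parsed, 1920, 1080)

print(parsed[0]["thought"])
print(actions)  # expect a single click computer_call item at roughly (960, 810)
```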
--------------------------------------------------------------------------------
/libs/lume/src/VM/VM.swift:
--------------------------------------------------------------------------------
```swift
import Foundation
// MARK: - Support Types
/// Base context for virtual machine directory and configuration
struct VMDirContext {
let dir: VMDirectory
var config: VMConfig
let home: Home
let storage: String?
func saveConfig() throws {
try dir.saveConfig(config)
}
var name: String { dir.name }
var initialized: Bool { dir.initialized() }
var diskPath: Path { dir.diskPath }
var nvramPath: Path { dir.nvramPath }
func setDisk(_ size: UInt64) throws {
try dir.setDisk(size)
}
func finalize(to name: String) throws {
let vmDir = try home.getVMDirectory(name)
try FileManager.default.moveItem(at: dir.dir.url, to: vmDir.dir.url)
}
}
// MARK: - Base VM Class
/// Base class for virtual machine implementations
@MainActor
class VM {
// MARK: - Properties
var vmDirContext: VMDirContext
@MainActor
private var virtualizationService: VMVirtualizationService?
private let vncService: VNCService
internal let virtualizationServiceFactory:
(VMVirtualizationServiceContext) throws -> VMVirtualizationService
private let vncServiceFactory: (VMDirectory) -> VNCService
// MARK: - Initialization
init(
vmDirContext: VMDirContext,
virtualizationServiceFactory: @escaping (VMVirtualizationServiceContext) throws ->
VMVirtualizationService = { try DarwinVirtualizationService(configuration: $0) },
vncServiceFactory: @escaping (VMDirectory) -> VNCService = {
DefaultVNCService(vmDirectory: $0)
}
) {
self.vmDirContext = vmDirContext
self.virtualizationServiceFactory = virtualizationServiceFactory
self.vncServiceFactory = vncServiceFactory
// Initialize VNC service
self.vncService = vncServiceFactory(vmDirContext.dir)
}
// MARK: - VM State Management
private var isRunning: Bool {
// First check if we have a MAC address
guard let macAddress = vmDirContext.config.macAddress else {
Logger.info(
"Cannot check if VM is running: macAddress is nil",
metadata: ["name": vmDirContext.name])
return false
}
// Then check if we have an IP address
guard let ipAddress = DHCPLeaseParser.getIPAddress(forMAC: macAddress) else {
return false
}
// Then check if it's reachable
return NetworkUtils.isReachable(ipAddress: ipAddress)
}
var details: VMDetails {
let isRunning: Bool = self.isRunning
let vncUrl = isRunning ? getVNCUrl() : nil
// Safely get disk size with fallback
let diskSizeValue: DiskSize
do {
diskSizeValue = try getDiskSize()
} catch {
Logger.error(
"Failed to get disk size",
metadata: ["name": vmDirContext.name, "error": "\(error)"])
// Provide a fallback value to avoid crashing
diskSizeValue = DiskSize(allocated: 0, total: vmDirContext.config.diskSize ?? 0)
}
// Safely access MAC address
let macAddress = vmDirContext.config.macAddress
let ipAddress: String? =
isRunning && macAddress != nil ? DHCPLeaseParser.getIPAddress(forMAC: macAddress!) : nil
return VMDetails(
name: vmDirContext.name,
os: getOSType(),
cpuCount: vmDirContext.config.cpuCount ?? 0,
memorySize: vmDirContext.config.memorySize ?? 0,
diskSize: diskSizeValue,
display: vmDirContext.config.display.string,
status: isRunning ? "running" : "stopped",
vncUrl: vncUrl,
ipAddress: ipAddress,
locationName: vmDirContext.storage ?? "default"
)
}
// MARK: - VM Lifecycle Management
func run(
noDisplay: Bool, sharedDirectories: [SharedDirectory], mount: Path?, vncPort: Int = 0,
recoveryMode: Bool = false, usbMassStoragePaths: [Path]? = nil
) async throws {
Logger.info(
"VM.run method called",
metadata: [
"name": vmDirContext.name,
"noDisplay": "\(noDisplay)",
"recoveryMode": "\(recoveryMode)",
])
guard vmDirContext.initialized else {
Logger.error("VM not initialized", metadata: ["name": vmDirContext.name])
throw VMError.notInitialized(vmDirContext.name)
}
guard let cpuCount = vmDirContext.config.cpuCount,
let memorySize = vmDirContext.config.memorySize
else {
Logger.error("VM missing cpuCount or memorySize", metadata: ["name": vmDirContext.name])
throw VMError.notInitialized(vmDirContext.name)
}
// Try to acquire lock on config file
Logger.info(
"Attempting to acquire lock on config file",
metadata: [
"path": vmDirContext.dir.configPath.path,
"name": vmDirContext.name,
])
var fileHandle = try FileHandle(forWritingTo: vmDirContext.dir.configPath.url)
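// LOCK_EX requests an exclusive advisory lock and LOCK_NB makes flock() fail
// immediately (non-zero return) instead of blocking when another process already
// holds the lock; that failure is what is treated below as "VM already running".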
if flock(fileHandle.fileDescriptor, LOCK_EX | LOCK_NB) != 0 {
try? fileHandle.close()
Logger.error(
"VM already running (failed to acquire lock)", metadata: ["name": vmDirContext.name]
)
// Try to forcibly clear the lock before giving up
Logger.info("Attempting emergency lock cleanup", metadata: ["name": vmDirContext.name])
unlockConfigFile()
// Try one more time to acquire the lock
if let retryHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url),
flock(retryHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0
{
Logger.info("Emergency lock cleanup worked", metadata: ["name": vmDirContext.name])
// Continue with a fresh file handle
try? retryHandle.close()
// Get a completely new file handle to be safe
guard let newHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url)
else {
throw VMError.internalError("Failed to open file handle after lock cleanup")
}
// Update our main file handle
fileHandle = newHandle
} else {
// If we still can't get the lock, give up
Logger.error(
"Could not acquire lock even after emergency cleanup",
metadata: ["name": vmDirContext.name])
throw VMError.alreadyRunning(vmDirContext.name)
}
}
Logger.info("Successfully acquired lock", metadata: ["name": vmDirContext.name])
Logger.info(
"Running VM with configuration",
metadata: [
"name": vmDirContext.name,
"cpuCount": "\(cpuCount)",
"memorySize": "\(memorySize)",
"diskSize": "\(vmDirContext.config.diskSize ?? 0)",
"sharedDirectories": sharedDirectories.map { $0.string }.joined(separator: ", "),
"recoveryMode": "\(recoveryMode)",
])
// Create and configure the VM
do {
Logger.info(
"Creating virtualization service context", metadata: ["name": vmDirContext.name])
let config = try createVMVirtualizationServiceContext(
cpuCount: cpuCount,
memorySize: memorySize,
display: vmDirContext.config.display.string,
sharedDirectories: sharedDirectories,
mount: mount,
recoveryMode: recoveryMode,
usbMassStoragePaths: usbMassStoragePaths
)
Logger.info(
"Successfully created virtualization service context",
metadata: ["name": vmDirContext.name])
Logger.info(
"Initializing virtualization service", metadata: ["name": vmDirContext.name])
virtualizationService = try virtualizationServiceFactory(config)
Logger.info(
"Successfully initialized virtualization service",
metadata: ["name": vmDirContext.name])
Logger.info(
"Setting up VNC",
metadata: [
"name": vmDirContext.name,
"noDisplay": "\(noDisplay)",
"port": "\(vncPort)",
])
let vncInfo = try await setupSession(
noDisplay: noDisplay, port: vncPort, sharedDirectories: sharedDirectories)
Logger.info(
"VNC setup successful", metadata: ["name": vmDirContext.name, "vncInfo": vncInfo])
// Start the VM
guard let service = virtualizationService else {
Logger.error("Virtualization service is nil", metadata: ["name": vmDirContext.name])
throw VMError.internalError("Virtualization service not initialized")
}
Logger.info(
"Starting VM via virtualization service", metadata: ["name": vmDirContext.name])
try await service.start()
Logger.info("VM started successfully", metadata: ["name": vmDirContext.name])
while true {
try await Task.sleep(nanoseconds: UInt64(1e9))
}
} catch {
Logger.error(
"Failed in VM.run",
metadata: [
"name": vmDirContext.name,
"error": error.localizedDescription,
"errorType": "\(type(of: error))",
])
virtualizationService = nil
vncService.stop()
// Release lock
Logger.info("Releasing file lock after error", metadata: ["name": vmDirContext.name])
flock(fileHandle.fileDescriptor, LOCK_UN)
try? fileHandle.close()
// Additionally, perform our aggressive unlock to ensure no locks remain
Logger.info(
"Performing additional lock cleanup after error",
metadata: ["name": vmDirContext.name])
unlockConfigFile()
throw error
}
}
@MainActor
func stop() async throws {
guard vmDirContext.initialized else {
throw VMError.notInitialized(vmDirContext.name)
}
Logger.info("Attempting to stop VM", metadata: ["name": vmDirContext.name])
// If we have a virtualization service, try to stop it cleanly first
if let service = virtualizationService {
do {
Logger.info(
"Stopping VM via virtualization service", metadata: ["name": vmDirContext.name])
try await service.stop()
virtualizationService = nil
vncService.stop()
Logger.info(
"VM stopped successfully via virtualization service",
metadata: ["name": vmDirContext.name])
// Try to ensure any existing locks are released
Logger.info(
"Attempting to clear any locks on config file",
metadata: ["name": vmDirContext.name])
unlockConfigFile()
return
} catch let error {
Logger.error(
"Failed to stop VM via virtualization service",
metadata: [
"name": vmDirContext.name,
"error": error.localizedDescription,
])
// Fall through to process termination
}
}
// Try to open config file to get file descriptor
Logger.info(
"Attempting to access config file lock",
metadata: [
"path": vmDirContext.dir.configPath.path,
"name": vmDirContext.name,
])
let fileHandle = try? FileHandle(forReadingFrom: vmDirContext.dir.configPath.url)
guard let fileHandle = fileHandle else {
Logger.info(
"Failed to open config file - VM may not be running",
metadata: ["name": vmDirContext.name])
// Even though we couldn't open the file, try to force unlock anyway
unlockConfigFile()
throw VMError.notRunning(vmDirContext.name)
}
// Get the PID of the process holding the lock using lsof command
Logger.info(
"Finding process holding lock on config file", metadata: ["name": vmDirContext.name])
let task = Process()
task.executableURL = URL(fileURLWithPath: "/usr/sbin/lsof")
task.arguments = ["-F", "p", vmDirContext.dir.configPath.path]
let outputPipe = Pipe()
task.standardOutput = outputPipe
try task.run()
task.waitUntilExit()
let outputData = try outputPipe.fileHandleForReading.readToEnd() ?? Data()
guard let outputString = String(data: outputData, encoding: .utf8),
let pidString = outputString.split(separator: "\n").first?.dropFirst(), // Drop the 'p' prefix
let pid = pid_t(pidString)
else {
try? fileHandle.close()
Logger.info(
"Failed to find process holding lock - VM may not be running",
metadata: ["name": vmDirContext.name])
// Even though we couldn't find the process, try to force unlock
unlockConfigFile()
throw VMError.notRunning(vmDirContext.name)
}
Logger.info(
"Found process \(pid) holding lock on config file",
metadata: ["name": vmDirContext.name])
// First try graceful shutdown with SIGINT
if kill(pid, SIGINT) == 0 {
Logger.info("Sent SIGINT to VM process \(pid)", metadata: ["name": vmDirContext.name])
}
// Wait for process to stop with timeout
var attempts = 0
while attempts < 10 {
Logger.info(
"Waiting for process \(pid) to terminate (attempt \(attempts + 1)/10)",
metadata: ["name": vmDirContext.name])
try await Task.sleep(nanoseconds: 1_000_000_000)
// Check if process still exists
if kill(pid, 0) != 0 {
// Process is gone, do final cleanup
Logger.info("Process \(pid) has terminated", metadata: ["name": vmDirContext.name])
virtualizationService = nil
vncService.stop()
try? fileHandle.close()
// Force unlock the config file
unlockConfigFile()
Logger.info(
"VM stopped successfully via process termination",
metadata: ["name": vmDirContext.name])
return
}
attempts += 1
}
// If graceful shutdown failed, force kill the process
Logger.info(
"Graceful shutdown failed, forcing termination of process \(pid)",
metadata: ["name": vmDirContext.name])
if kill(pid, SIGKILL) == 0 {
Logger.info("Sent SIGKILL to process \(pid)", metadata: ["name": vmDirContext.name])
// Wait a moment for the process to be fully killed
try await Task.sleep(nanoseconds: 2_000_000_000)
// Do final cleanup
virtualizationService = nil
vncService.stop()
try? fileHandle.close()
// Force unlock the config file
unlockConfigFile()
Logger.info("VM forcefully stopped", metadata: ["name": vmDirContext.name])
return
}
// If we get here, something went very wrong
try? fileHandle.close()
Logger.error(
"Failed to stop VM - could not terminate process \(pid)",
metadata: ["name": vmDirContext.name])
// As a last resort, try to force unlock
unlockConfigFile()
throw VMError.internalError("Failed to stop VM process")
}
// Helper method to forcibly clear any locks on the config file
private func unlockConfigFile() {
Logger.info(
"Forcibly clearing locks on config file",
metadata: [
"path": vmDirContext.dir.configPath.path,
"name": vmDirContext.name,
])
// First attempt: standard unlock methods
if let fileHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) {
// Use fcntl(F_SETLK) with F_UNLCK to clear any POSIX record locks
var lockInfo = flock()
lockInfo.l_type = Int16(F_UNLCK)
lockInfo.l_whence = Int16(SEEK_SET)
lockInfo.l_start = 0
lockInfo.l_len = 0
// Try to unlock the file using fcntl
_ = fcntl(fileHandle.fileDescriptor, F_SETLK, &lockInfo)
// Also try the regular flock method
flock(fileHandle.fileDescriptor, LOCK_UN)
try? fileHandle.close()
Logger.info("Standard unlock attempts performed", metadata: ["name": vmDirContext.name])
}
// Second attempt: try to acquire and immediately release a fresh lock
if let tempHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) {
if flock(tempHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0 {
Logger.info(
"Successfully acquired and released lock to reset state",
metadata: ["name": vmDirContext.name])
flock(tempHandle.fileDescriptor, LOCK_UN)
} else {
Logger.info(
"Could not acquire lock for resetting - may still be locked",
metadata: ["name": vmDirContext.name])
}
try? tempHandle.close()
}
// Third attempt (most aggressive): copy the config file, remove the original, and restore
Logger.info(
"Trying aggressive method: backup and restore config file",
metadata: ["name": vmDirContext.name])
// Only proceed if the config file exists
let fileManager = FileManager.default
let configPath = vmDirContext.dir.configPath.path
let backupPath = configPath + ".backup"
if fileManager.fileExists(atPath: configPath) {
// Create a backup of the config file
if let configData = try? Data(contentsOf: URL(fileURLWithPath: configPath)) {
// Make backup
try? configData.write(to: URL(fileURLWithPath: backupPath))
// Remove the original file to clear all locks
try? fileManager.removeItem(atPath: configPath)
Logger.info(
"Removed original config file to clear locks",
metadata: ["name": vmDirContext.name])
// Wait a moment for OS to fully release resources
Thread.sleep(forTimeInterval: 0.1)
// Restore from backup
try? configData.write(to: URL(fileURLWithPath: configPath))
Logger.info(
"Restored config file from backup", metadata: ["name": vmDirContext.name])
} else {
Logger.error(
"Could not read config file content for backup",
metadata: ["name": vmDirContext.name])
}
} else {
Logger.info(
"Config file does not exist, cannot perform aggressive unlock",
metadata: ["name": vmDirContext.name])
}
// Final check
if let finalHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) {
let lockResult = flock(finalHandle.fileDescriptor, LOCK_EX | LOCK_NB)
if lockResult == 0 {
Logger.info(
"Lock successfully cleared - verified by acquiring test lock",
metadata: ["name": vmDirContext.name])
flock(finalHandle.fileDescriptor, LOCK_UN)
} else {
Logger.info(
"Lock still present after all clearing attempts",
metadata: ["name": vmDirContext.name, "severity": "warning"])
}
try? finalHandle.close()
}
}
// MARK: - Resource Management
func updateVMConfig(vmConfig: VMConfig) throws {
vmDirContext.config = vmConfig
try vmDirContext.saveConfig()
}
private func getDiskSize() throws -> DiskSize {
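// totalFileAllocatedSize reflects the blocks actually allocated on disk (roughly,
// what a possibly sparse disk image really consumes), while totalFileSize is the
// image file's logical size; together they yield the "allocated / total" DiskSize pair.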
let resourceValues = try vmDirContext.diskPath.url.resourceValues(forKeys: [
.totalFileAllocatedSizeKey,
.totalFileSizeKey,
])
guard let allocated = resourceValues.totalFileAllocatedSize,
let total = resourceValues.totalFileSize
else {
throw VMConfigError.invalidDiskSize
}
return DiskSize(allocated: UInt64(allocated), total: UInt64(total))
}
func resizeDisk(_ newSize: UInt64) throws {
let currentSize = try getDiskSize()
guard newSize >= currentSize.total else {
throw VMError.resizeTooSmall(current: currentSize.total, requested: newSize)
}
try setDiskSize(newSize)
}
func setCpuCount(_ newCpuCount: Int) throws {
guard !isRunning else {
throw VMError.alreadyRunning(vmDirContext.name)
}
vmDirContext.config.setCpuCount(newCpuCount)
try vmDirContext.saveConfig()
}
func setMemorySize(_ newMemorySize: UInt64) throws {
guard !isRunning else {
throw VMError.alreadyRunning(vmDirContext.name)
}
vmDirContext.config.setMemorySize(newMemorySize)
try vmDirContext.saveConfig()
}
func setDiskSize(_ newDiskSize: UInt64) throws {
try vmDirContext.setDisk(newDiskSize)
vmDirContext.config.setDiskSize(newDiskSize)
try vmDirContext.saveConfig()
}
func setDisplay(_ newDisplay: String) throws {
guard !isRunning else {
throw VMError.alreadyRunning(vmDirContext.name)
}
guard let display: VMDisplayResolution = VMDisplayResolution(string: newDisplay) else {
throw VMError.invalidDisplayResolution(newDisplay)
}
vmDirContext.config.setDisplay(display)
try vmDirContext.saveConfig()
}
func setHardwareModel(_ newHardwareModel: Data) throws {
guard !isRunning else {
throw VMError.alreadyRunning(vmDirContext.name)
}
vmDirContext.config.setHardwareModel(newHardwareModel)
try vmDirContext.saveConfig()
}
func setMachineIdentifier(_ newMachineIdentifier: Data) throws {
guard !isRunning else {
throw VMError.alreadyRunning(vmDirContext.name)
}
vmDirContext.config.setMachineIdentifier(newMachineIdentifier)
try vmDirContext.saveConfig()
}
func setMacAddress(_ newMacAddress: String) throws {
guard !isRunning else {
throw VMError.alreadyRunning(vmDirContext.name)
}
vmDirContext.config.setMacAddress(newMacAddress)
try vmDirContext.saveConfig()
}
// MARK: - VNC Management
func getVNCUrl() -> String? {
return vncService.url
}
/// Sets up the VNC service and returns the VNC URL
private func startVNCService(port: Int = 0) async throws -> String {
guard let service = virtualizationService else {
throw VMError.internalError("Virtualization service not initialized")
}
try await vncService.start(port: port, virtualMachine: service.getVirtualMachine())
guard let url = vncService.url else {
throw VMError.vncNotConfigured
}
return url
}
/// Saves the session information including shared directories to disk
private func saveSessionData(url: String, sharedDirectories: [SharedDirectory]) {
do {
let session = VNCSession(
url: url, sharedDirectories: sharedDirectories.isEmpty ? nil : sharedDirectories)
try vmDirContext.dir.saveSession(session)
Logger.info(
"Saved VNC session with shared directories",
metadata: [
"count": "\(sharedDirectories.count)",
"dirs": "\(sharedDirectories.map { $0.hostPath }.joined(separator: ", "))",
"sessionsPath": "\(vmDirContext.dir.sessionsPath.path)",
])
} catch {
Logger.error("Failed to save VNC session", metadata: ["error": "\(error)"])
}
}
/// Main session setup method that handles VNC and persists session data
private func setupSession(
noDisplay: Bool, port: Int = 0, sharedDirectories: [SharedDirectory] = []
) async throws -> String {
// Start the VNC service and get the URL
let url = try await startVNCService(port: port)
// Save the session data
saveSessionData(url: url, sharedDirectories: sharedDirectories)
// Open the VNC client if needed
if !noDisplay {
Logger.info("Starting VNC session", metadata: ["name": vmDirContext.name])
try await vncService.openClient(url: url)
}
return url
}
// MARK: - Platform-specific Methods
func getOSType() -> String {
fatalError("Must be implemented by subclass")
}
func createVMVirtualizationServiceContext(
cpuCount: Int,
memorySize: UInt64,
display: String,
sharedDirectories: [SharedDirectory] = [],
mount: Path? = nil,
recoveryMode: Bool = false,
usbMassStoragePaths: [Path]? = nil
) throws -> VMVirtualizationServiceContext {
// This is a diagnostic log to track actual file paths on disk for debugging
try validateDiskState()
return VMVirtualizationServiceContext(
cpuCount: cpuCount,
memorySize: memorySize,
display: display,
sharedDirectories: sharedDirectories,
mount: mount,
hardwareModel: vmDirContext.config.hardwareModel,
machineIdentifier: vmDirContext.config.machineIdentifier,
macAddress: vmDirContext.config.macAddress!,
diskPath: vmDirContext.diskPath,
nvramPath: vmDirContext.nvramPath,
recoveryMode: recoveryMode,
usbMassStoragePaths: usbMassStoragePaths
)
}
/// Validates the disk state to help diagnose storage attachment issues
private func validateDiskState() throws {
// Check disk image state
let diskPath = vmDirContext.diskPath.path
let diskExists = FileManager.default.fileExists(atPath: diskPath)
var diskSize: UInt64 = 0
var diskPermissions = ""
if diskExists {
if let attrs = try? FileManager.default.attributesOfItem(atPath: diskPath) {
diskSize = attrs[.size] as? UInt64 ?? 0
let posixPerms = attrs[.posixPermissions] as? Int ?? 0
diskPermissions = String(format: "%o", posixPerms)
}
}
// Check disk container directory permissions
let diskDir = (diskPath as NSString).deletingLastPathComponent
let dirPerms =
try? FileManager.default.attributesOfItem(atPath: diskDir)[.posixPermissions] as? Int
?? 0
let dirPermsString = dirPerms != nil ? String(format: "%o", dirPerms!) : "unknown"
// Log detailed diagnostics
Logger.info(
"Validating VM disk state",
metadata: [
"diskPath": diskPath,
"diskExists": "\(diskExists)",
"diskSize":
"\(ByteCountFormatter.string(fromByteCount: Int64(diskSize), countStyle: .file))",
"diskPermissions": diskPermissions,
"dirPermissions": dirPermsString,
"locationName": vmDirContext.storage ?? "default",
])
if !diskExists {
Logger.error("VM disk image does not exist", metadata: ["diskPath": diskPath])
} else if diskSize == 0 {
Logger.error("VM disk image exists but has zero size", metadata: ["diskPath": diskPath])
}
}
func setup(
ipswPath: String,
cpuCount: Int,
memorySize: UInt64,
diskSize: UInt64,
display: String
) async throws {
fatalError("Must be implemented by subclass")
}
// MARK: - Finalization
/// Post-installation step to move the VM directory to the home directory
func finalize(to name: String, home: Home, storage: String? = nil) throws {
let vmDir = try home.getVMDirectory(name, storage: storage)
try FileManager.default.moveItem(at: vmDirContext.dir.dir.url, to: vmDir.dir.url)
}
// Method to run VM with additional USB mass storage devices
func runWithUSBStorage(
noDisplay: Bool, sharedDirectories: [SharedDirectory], mount: Path?, vncPort: Int = 0,
recoveryMode: Bool = false, usbImagePaths: [Path]
) async throws {
guard vmDirContext.initialized else {
throw VMError.notInitialized(vmDirContext.name)
}
guard let cpuCount = vmDirContext.config.cpuCount,
let memorySize = vmDirContext.config.memorySize
else {
throw VMError.notInitialized(vmDirContext.name)
}
// Try to acquire lock on config file
let fileHandle = try FileHandle(forWritingTo: vmDirContext.dir.configPath.url)
guard flock(fileHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0 else {
try? fileHandle.close()
throw VMError.alreadyRunning(vmDirContext.name)
}
Logger.info(
"Running VM with USB storage devices",
metadata: [
"cpuCount": "\(cpuCount)",
"memorySize": "\(memorySize)",
"diskSize": "\(vmDirContext.config.diskSize ?? 0)",
"usbImageCount": "\(usbImagePaths.count)",
"recoveryMode": "\(recoveryMode)",
])
// Create and configure the VM
do {
let config = try createVMVirtualizationServiceContext(
cpuCount: cpuCount,
memorySize: memorySize,
display: vmDirContext.config.display.string,
sharedDirectories: sharedDirectories,
mount: mount,
recoveryMode: recoveryMode,
usbMassStoragePaths: usbImagePaths
)
virtualizationService = try virtualizationServiceFactory(config)
let vncInfo = try await setupSession(
noDisplay: noDisplay, port: vncPort, sharedDirectories: sharedDirectories)
Logger.info("VNC info", metadata: ["vncInfo": vncInfo])
// Start the VM
guard let service = virtualizationService else {
throw VMError.internalError("Virtualization service not initialized")
}
try await service.start()
while true {
try await Task.sleep(nanoseconds: UInt64(1e9))
}
} catch {
Logger.error(
"Failed to create/start VM with USB storage",
metadata: [
"error": "\(error)",
"errorType": "\(type(of: error))",
])
virtualizationService = nil
vncService.stop()
// Release lock
flock(fileHandle.fileDescriptor, LOCK_UN)
try? fileHandle.close()
throw error
}
}
}
```
--------------------------------------------------------------------------------
/blog/neurips-2025-cua-papers.md:
--------------------------------------------------------------------------------
```markdown
# NeurIPS 2025: 45 Computer-Use Agent Papers You Should Know About
<img alt="neurips" src="https://github.com/user-attachments/assets/bd649067-bb2c-45f4-827b-087021ec3ad7" />
If you're following the computer-use agent space, you already know that NeurIPS is where the most important work gets presented. But with thousands of papers across every area of machine learning, finding the ones relevant to CUAs means hours of filtering through proceedings, skimming abstracts, and hoping you don't miss something important.
We did that work for you. We're excited to announce that **Cua will be at NeurIPS 2025**, and we've compiled a curated list of **45 papers** focused specifically on Computer-Use Agents—covering benchmarks, safety, grounding, visual reasoning, and agent architectures.
## Why This Matters
Computer-use agents are evolving rapidly. This year's NeurIPS showcases several important developments:
**The benchmark landscape is maturing.** We're seeing comprehensive evaluations across macOS (macOSWorld), professional tools (VideoCAD), and real-world websites (REAL, TheAgentCompany). These aren't toy problems anymore—they're measuring what agents can actually do in production environments.
**Safety is becoming a first-class concern.** Multiple papers (OS-Harm, RiOSWorld, WASP, AgentDAM) are systematically documenting how agents fail when confronted with adversarial inputs, privacy requirements, or misuse scenarios. The findings are sobering: even frontier models often comply with harmful requests.
**Grounding remains the bottleneck.** Papers like GUI-Actor, GUI-G1, and SE-GUI are pushing the state of the art on mapping language to UI actions. The best approaches are achieving significant gains with surprisingly small models and datasets.
**Open-source is catching up.** OpenCUA's 72B model hits 45% on OSWorld-Verified, establishing that community-driven development can compete with proprietary systems.
## Highlights Worth Your Attention
A few papers stand out for their immediate relevance to anyone building or deploying computer-use agents:
- **macOSWorld** reveals a dramatic capability gap: proprietary agents achieve 30%+ success on macOS tasks while open-source models struggle below 5%.
- **TheAgentCompany** simulates a software company where agents browse, code, and communicate. The best agent completes 30% of tasks autonomously.
- **WASP** demonstrates that simple prompt injections deceive top-tier models in 86% of cases.
- **GUI-G1** shows that a 3B model can achieve 90.3% on ScreenSpot by fixing issues with chain-of-thought reasoning.
## Summary Statistics
| Category | Count |
| ------------------------------ | ----- |
| Benchmarks & Datasets | 18 |
| Safety & Security | 12 |
| Grounding & Visual Reasoning | 14 |
| Agent Architectures & Training | 11 |
| Adversarial Attacks | 8 |
**Total Papers:** 45 (some papers fall under more than one category)
## Meet Us at NeurIPS
We'll be at NeurIPS in San Diego. If you're working on computer-use agents, building applications on top of CUA infrastructure, or just curious about where this space is heading, we'd love to connect.
- **Book a Meeting**: [cal.com/cua/neurips-slot](https://cal.com/cua/neurips-slot)
- **X/Twitter**: [@trycua](https://x.com/trycua)
- **Discord**: [discord.gg/cua-ai](https://discord.gg/cua-ai)
---
# The Papers
## 1. macOSWorld: A Multilingual Interactive Benchmark for GUI Agents
**Summary:** The first comprehensive benchmark for evaluating GUI agents on macOS. Features 202 multilingual interactive tasks across 30 applications (28 macOS-exclusive), with support for 5 languages (English, Chinese, Arabic, Japanese, Russian). Reveals a dramatic gap: proprietary agents achieve 30%+ success rate while open-source models lag below 5%. Also includes safety benchmarking for deception attacks.
**Key Findings:**
- Proprietary computer-use agents lead at above 30% success rate
- Open-source lightweight models struggle below 5%, highlighting need for macOS domain adaptation
- Multilingual benchmarks expose weaknesses, especially in Arabic (28.8% degradation vs English)
- Deception attacks are a general vulnerability requiring immediate attention
**Poster:** https://neurips.cc/virtual/2025/poster/117427
---
## 2. OS-Harm: A Benchmark for Measuring Safety of Computer Use Agents
**Summary:** A comprehensive safety benchmark built on OSWorld for testing computer-use agents across three harm categories: deliberate user misuse, prompt injection attacks, and model misbehavior. Includes 150 tasks spanning harassment, copyright infringement, disinformation, data exfiltration, and more. Proposes an automated judge achieving high agreement with human annotations (0.76-0.79 F1 score).
**Key Findings:**
- All tested models (o4-mini, Claude 3.7 Sonnet, Gemini 2.5 Pro) tend to directly comply with many deliberate misuse queries
- Models are relatively vulnerable to static prompt injections
- Models occasionally perform unsafe actions without explicit malicious prompts
**Poster:** https://neurips.cc/virtual/2025/loc/san-diego/poster/121772
---
## 3. OpenCUA: Open Foundations for Computer-Use Agents
**Summary:** A comprehensive open-source framework for scaling computer-use agent data and foundation models. Introduces AgentNet, the first large-scale computer-use task dataset spanning 3 operating systems and 200+ applications/websites. OpenCUA-72B achieves 45% success rate on OSWorld-Verified, establishing new state-of-the-art among open-source models.
**Key Contributions:**
- Annotation infrastructure for capturing human computer-use demonstrations
- AgentNet: large-scale dataset across 3 OSes and 200+ apps
- Scalable pipeline transforming demonstrations into state-action pairs with reflective Chain-of-Thought reasoning
- Models generalize well across domains and benefit from increased test-time computation
**Poster:** https://neurips.cc/virtual/2025/poster/119771
---
## 4. Mind2Web 2: Evaluating Agentic Search with Agent-as-a-Judge
**Summary:** A benchmark of 130 realistic, high-quality, long-horizon tasks for agentic search systems (like Deep Research), requiring real-time web browsing and extensive information synthesis. Constructed with 1000+ hours of human labor. Introduces Agent-as-a-Judge framework using tree-structured rubric design for automated evaluation.
**Key Findings:**
- OpenAI Deep Research achieves 50-70% of human performance while spending half the time
- First systematic evaluation of ten frontier agentic search systems vs. human performance
- Addresses the challenge of evaluating time-varying, complex answers
**Poster:** https://neurips.cc/virtual/2025/poster/121798
---
## 5. Scaling Computer-Use Grounding via User Interface Decomposition and Synthesis
**Summary:** Addresses GUI grounding—mapping natural language to specific UI actions—as a critical bottleneck in agent development. Introduces OSWorld-G benchmark (564 annotated samples) and Jedi dataset (4 million synthetic examples), the largest computer-use grounding dataset. Improved grounding directly enhances agentic capabilities, boosting OSWorld performance from 23% to 51%.
**Key Contributions:**
- OSWorld-G: comprehensive benchmark for diverse grounding tasks (text matching, element recognition, layout understanding, precise manipulation)
- Jedi: 4M examples through multi-perspective task decoupling
- Demonstrates compositional generalization to novel interfaces
**Poster:** https://neurips.cc/virtual/2025/poster/121759
---
## 6. RiOSWorld: Benchmarking the Risk of Multimodal Computer-Use Agents
**Summary:** Evaluates potential safety risks of MLLM-based agents during real-world computer manipulation. Features 492 risky tasks spanning web, social media, multimedia, OS, email, and office software. Categorizes risks into user-originated and environmental risks, evaluating both risk goal intention and completion.
**Key Findings:**
- Current computer-use agents face significant safety risks in real-world scenarios
- Safety principles designed for dialogue scenarios don't transfer well to computer-use
- Highlights necessity and urgency of safety alignment for computer-use agents
**Poster:** https://neurips.cc/virtual/2025/poster/117273
---
## 7. REAL: Benchmarking Autonomous Agents on Deterministic Simulations of Real Websites
**Summary:** A benchmark featuring high-fidelity, deterministic replicas of 11 widely-used websites across e-commerce, travel, communication, and professional networking. Contains 112 practical tasks requiring both information retrieval and state-changing actions. Enables reproducible evaluation without safety risks.
**Key Findings:**
- Best frontier language models achieve only 41% success rate
- Highlights critical gaps in autonomous web navigation and task completion
- Supports scalable post-training data generation
**Poster:** https://neurips.cc/virtual/2025/poster/121619
---
## 8. SE-GUI: Enhancing Visual Grounding for GUI Agents via Self-Evolutionary Reinforcement Learning
**Summary:** An RL-based framework for GUI grounding incorporating seed data curation, dense policy gradients, and self-evolutionary reinforcement finetuning using attention maps. With only 3K training samples, the 7B model achieves state-of-the-art on three grounding benchmarks, outperforming UI-TARS-72B by 24.2% on ScreenSpot-Pro.
**Key Results:**
- 47.3% accuracy on ScreenSpot-Pro with 7B model
- Outperforms 72B models with fraction of training data
- Demonstrates effectiveness of RL for high-resolution, complex environments
**Poster:** https://neurips.cc/virtual/2025/poster/118788
---
## 9. TRAP: Targeted Redirecting of Agentic Preferences
**Summary:** A generative adversarial framework that manipulates agent decision-making using diffusion-based semantic injections. Combines negative prompt degradation with positive semantic optimization. Without model access, produces visually natural images that induce consistent decision biases in agents.
**Key Findings:**
- Consistently induces decision-level preference redirection on LLaVA-34B, Gemma3, GPT-4o, and Mistral-3.2
- Outperforms baselines (SPSA, Bandit, standard diffusion)
- Exposes vulnerability: autonomous agents can be misled through visually subtle, semantically-guided manipulations
**Poster:** https://neurips.cc/virtual/2025/poster/117547
---
## 10. TheAgentCompany: Benchmarking LLM Agents on Consequential Real World Tasks
**Summary:** An extensible benchmark simulating a small software company environment where AI agents interact like digital workers: browsing the web, writing code, running programs, and communicating with coworkers. Tests agents on real professional tasks with important implications for industry adoption and labor market effects.
**Key Findings:**
- Best agent achieves 30% autonomous task completion
- Simpler tasks are solvable autonomously
- More difficult long-horizon tasks remain beyond current systems' reach
**Poster:** https://neurips.cc/virtual/2025/poster/121705
---
## 11. VideoGameQA-Bench: Evaluating Vision-Language Models for Video Game Quality Assurance
**Summary:** A comprehensive benchmark for VLMs in video game QA, encompassing visual unit testing, visual regression testing, needle-in-a-haystack challenges, glitch detection, and bug report generation for both images and videos. Addresses the need for standardized benchmarks in this labor-intensive domain.
**Key Focus:**
- First benchmark specifically designed for video game QA with VLMs
- Covers wide range of QA activities across images and videos
- Addresses lack of automation in game development workflows
**Poster:** https://neurips.cc/virtual/2025/poster/121740
---
## 12. WASP: Benchmarking Web Agent Security Against Prompt Injection Attacks
**Summary:** End-to-end benchmark for evaluating web agent security against prompt injection attacks. Tests realistic scenarios where even simple, low-effort human-written injections can deceive top-tier AI models including those with advanced reasoning.
**Key Findings:**
- Attacks partially succeed in up to 86% of cases
- State-of-the-art agents often struggle to fully complete attacker goals
- Reveals "security by incompetence"—agents' limitations sometimes prevent full attack success
**Poster:** https://neurips.cc/virtual/2025/poster/121728
---
## 13. AgentDAM: Privacy Leakage Evaluation for Autonomous Web Agents
**Summary:** Measures whether AI web-navigation agents follow the privacy principle of "data minimization"—using sensitive information only when truly necessary to complete a task. Simulates realistic web interaction scenarios end-to-end.
**Key Findings:**
- Agents built on GPT-4, Llama-3, and Claude are prone to inadvertent use of unnecessary sensitive information
- Proposes prompting-based defense that reduces information leakage
- End-to-end benchmarking provides more realistic measure than probing LLMs about privacy
**Poster:** https://neurips.cc/virtual/2025/poster/121443
---
## 14. Embodied Web Agents: Bridging Physical-Digital Realms for Integrated Agent Intelligence
**Summary:** A novel paradigm for AI agents that fluidly bridge embodiment and web-scale reasoning. Creates unified simulation integrating realistic 3D indoor/outdoor environments with functional web interfaces. Tasks include cooking from online recipes, navigating with dynamic map data, and interpreting landmarks using web knowledge.
**Key Contributions:**
- Unified platform combining 3D environments with web interfaces
- Benchmark spanning cooking, navigation, shopping, tourism, and geolocation
- Reveals significant performance gaps between AI systems and humans
**Poster:** https://neurips.cc/virtual/2025/poster/121809
---
## 15. VideoCAD: A Dataset and Model for Learning Long-Horizon 3D CAD UI Interactions from Video
**Summary:** The first attempt to model UI interactions for precision engineering tasks. Features 41K+ annotated video recordings of CAD operations with time horizons up to 20x longer than existing datasets. Proposes VideoCADFormer for learning CAD interactions directly from video.
**Key Contributions:**
- Large-scale synthetic dataset for CAD UI interactions
- VQA benchmark for evaluating spatial reasoning and video understanding
- Reveals challenges in precise action grounding and long-horizon dependencies
**Poster:** https://neurips.cc/virtual/2025/poster/121820
---
## 16. Look Before You Leap: A GUI-Critic-R1 Model for Pre-Operative Error Diagnosis
**Summary:** Introduces a pre-operative critic mechanism that provides feedback before action execution by reasoning about potential outcomes. Proposes Suggestion-aware Group Relative Policy Optimization (S-GRPO) for building the GUI-Critic-R1 model with fully automated data generation.
**Key Results:**
- Significant advantages in critic accuracy compared to current MLLMs
- Improved success rates and operational efficiency on GUI automation benchmarks
- Works across both mobile and web domains
**Poster:** https://neurips.cc/virtual/2025/poster/115566
---
## 17. Grounded Reinforcement Learning for Visual Reasoning (ViGoRL)
**Summary:** A vision-language model trained with RL to explicitly anchor each reasoning step to specific visual coordinates. Introduces multi-turn RL framework enabling dynamic zooming into predicted coordinates during reasoning.
**Key Results:**
- 86.4% on V\*Bench for visual search
- Outperforms supervised fine-tuning and conventional RL across spatial reasoning, visual search, and web-based grounding
- Grounding amplifies region exploration, subgoal setting, and visual verification
**Poster:** https://neurips.cc/virtual/2025/poster/120218
---
## 18. GUI-Actor: Coordinate-Free Visual Grounding for GUI Agents
**Summary:** A VLM-based method for coordinate-free GUI grounding using an attention-based action head. Enables proposing one or more action regions in a single forward pass with a grounding verifier for selection.
**Key Results:**
- GUI-Actor-7B achieves 44.6 on ScreenSpot-Pro with Qwen2.5-VL, outperforming UI-TARS-72B (38.1)
- Improved generalization to unseen resolutions and layouts
- Fine-tuning only ~100M parameters achieves SOTA performance
**Poster:** https://neurips.cc/virtual/2025/poster/119841
---
## 19. GUI-G1: Understanding R1-Zero-Like Training for Visual Grounding in GUI Agents
**Summary:** Extensive analysis of the R1-Zero paradigm (online RL + chain-of-thought reasoning) for GUI grounding. Identifies issues: longer reasoning chains lead to worse performance, reward hacking via box size exploitation, and overfitting easy examples.
**Solutions Proposed:**
- Fast Thinking Template for direct answer generation
- Box size constraint in reward function
- Difficulty-aware scaling in RL objective
**Key Results:**
- GUI-G1-3B achieves 90.3% on ScreenSpot and 37.1% on ScreenSpot-Pro
- Outperforms larger UI-TARS-7B with only 3B parameters
**Poster:** https://neurips.cc/virtual/2025/poster/120227
---
## 20. GUI-Reflection: Empowering Multimodal GUI Models with Self-Reflection Behavior
**Summary:** Framework integrating self-reflection and error correction into end-to-end multimodal GUI models through GUI-specific pre-training, offline SFT, and online reflection tuning. Enables self-reflection emergence with fully automated data generation.
**Key Contributions:**
- Scalable pipelines for automatic reflection/correction data from successful trajectories
- GUI-Reflection Task Suite for reflection-oriented abilities
- Diverse environment for online training on mobile devices
- Iterative online reflection tuning algorithm
**Poster:** https://neurips.cc/virtual/2025/poster/115826
---
## 21. InfantAgent-Next: A Multimodal Generalist Agent for Automated Computer Interaction
**Summary:** A generalist agent capable of multimodal computer interaction (text, images, audio, video). Integrates tool-based and pure vision agents within highly modular architecture, enabling collaborative step-by-step task solving.
**Key Results:**
- 7.27-point accuracy gain over Claude-Computer-Use on OSWorld
- Evaluated on pure vision benchmarks (OSWorld), general benchmarks (GAIA), and tool-intensive benchmarks (SWE-Bench)
- Demonstrates value of modular, collaborative agent architecture
**Poster:** https://neurips.cc/virtual/2025/poster/118379
---
## 22. AdvEDM: Fine-grained Adversarial Attack against VLM-based Embodied Agents
**Summary:** A fine-grained adversarial attack framework that modifies VLM perception of only key objects while preserving semantics of remaining regions. Unlike broad semantic disruption, this targeted approach reduces conflicts with task context, making VLMs output valid but incorrect decisions that affect agent actions in the physical world.
**Key Contributions:**
- AdvEDM-R: removes semantics of specific objects from images
- AdvEDM-A: adds semantics of new objects into images
- Demonstrates fine-grained control with excellent attack performance in embodied decision-making tasks
**Poster:** https://neurips.cc/virtual/2025/poster/116436
---
## 23. BLINK-Twice: A Reasoning Benchmark on Visual Perception
**Summary:** A vision-centric reasoning benchmark grounded in challenging perceptual tasks. Unlike prior benchmarks, it moves beyond shallow perception ("see") to require fine-grained observation and analytical reasoning ("observe"). Features natural adversarial image pairs and annotated reasoning chains for process evaluation.
**Key Findings:**
- Tests 20 leading MLLMs including 12 foundation models and 8 reasoning-enhanced models
- Existing reasoning strategies (chain-of-thought, self-criticism) result in unstable and redundant reasoning
- Repeated image observation improves performance across models
- Active visual interaction (as in o3) highlights need for new vision reasoning paradigm
**Poster:** https://neurips.cc/virtual/2025/poster/121522
---
## 24. BadVLA: Backdoor Attacks on Vision-Language-Action Models
**Summary:** First systematic investigation of backdoor vulnerabilities in VLA models. Proposes Objective-Decoupled Optimization with two stages: explicit feature-space separation to isolate trigger representations, and conditional control deviations activated only by triggers.
**Key Findings:**
- Consistently achieves near-100% attack success rates with minimal impact on clean task accuracy
- Robust against common input perturbations, task transfers, and model fine-tuning
- Exposes critical security vulnerabilities in current VLA deployments under Training-as-a-Service paradigm
**Poster:** https://neurips.cc/virtual/2025/poster/115803
---
## 25. Benchmarking Egocentric Multimodal Goal Inference for Assistive Wearable Agents
**Summary:** Benchmark for proactively inferring user goals from multimodal contextual observations for wearable assistant agents (smart glasses). Dataset comprises ~30 hours from 363 participants across 3,482 recordings with visual, audio, digital, and longitudinal context.
**Key Findings:**
- Humans achieve 93% MCQ accuracy; best VLM reaches ~84%
- For open-ended generation, best models produce relevant goals only ~57% of the time
- Smaller models (suited for wearables) achieve ~49% accuracy
- Models benefit from relevant modalities but struggle with noisy ones
**Poster:** https://neurips.cc/virtual/2025/poster/121655
---
## 26. GAM-Agent: Game-Theoretic Multi-Agent Framework for Visual Reasoning
**Summary:** A game-theoretic multi-agent framework formulating reasoning as a non-zero-sum game between base agents (visual perception specialists) and a critical agent (logic/fact verification). Features uncertainty-aware controller for dynamic agent collaboration with multi-round debates.
**Key Results:**
- Boosts small-to-mid scale models (Qwen2.5-VL-7B, InternVL3-14B) by 5-6%
- Enhances strong models like GPT-4o by 2-3%
- Modular, scalable, and generalizable framework
**Poster:** https://neurips.cc/virtual/2025/poster/119144
---
## 27. GRIT: Teaching MLLMs to Think with Images
**Summary:** Introduces Grounded Reasoning with Images and Texts—a method for training MLLMs to generate reasoning chains interleaving natural language with explicit bounding box coordinates. Uses GRPO-GR reinforcement learning with rewards focused on answer accuracy and grounding format.
**Key Contributions:**
- Exceptional data efficiency: requires as few as 20 image-question-answer triplets
- Successfully unifies reasoning and grounding abilities
- Eliminates need for reasoning chain annotations or explicit bounding box labels
**Poster:** https://neurips.cc/virtual/2025/poster/118020
---
## 28. Safe RLHF-V: Safe Reinforcement Learning from Multi-modal Human Feedback
**Summary:** First multimodal safety alignment framework. Introduces BeaverTails-V (first dataset with dual preference annotations for helpfulness and safety), and Beaver-Guard-V (multi-level guardrail system defending against unsafe queries and adversarial attacks).
**Key Results:**
- Guard model improves precursor model's safety by average of 40.9% over five filtering rounds
- Safe RLHF-V enhances model safety by 34.2% and helpfulness by 34.3%
- First exploration of multi-modal safety alignment within constrained optimization
**Poster:** https://neurips.cc/virtual/2025/poster/118304
---
## 29. Dropout Decoding: Uncertainty-Guided Token Dropout for LVLM Reliability
**Summary:** An inference-time approach that quantifies visual token uncertainty and selectively masks uncertain tokens. Decomposes uncertainty into aleatoric and epistemic components, focusing on epistemic uncertainty for perception-related errors.
**Key Results:**
- Significantly reduces object hallucinations
- Enhances reliability and quality of LVLM outputs across diverse visual contexts
- Validated on CHAIR, THRONE, and MMBench benchmarks
**Poster:** https://neurips.cc/virtual/2025/poster/118572
---
## 30. FOCUS: Unified Vision-Language Modeling for Interactive Editing
**Summary:** A unified LVLM integrating segmentation-aware perception and controllable object-centric generation. Uses dual-branch visual encoder for global semantic context and fine-grained spatial details, with MoVQGAN-based visual tokenizer for discrete visual tokens.
**Key Contributions:**
- Progressive multi-stage training pipeline
- Segmentation masks jointly optimized as spatial condition prompts
- Bridges segmentation-aware perception with fine-grained visual synthesis
**Poster:** https://neurips.cc/virtual/2025/poster/119062
---
## 31. Fine-Grained Preference Optimization for Spatial Reasoning (SpatialReasoner-R1)
**Summary:** Introduces Multi-Model Monte Carlo Tree Search (M3CTS) for generating diverse Long Chain-of-Thought reasoning trajectories. Proposes fine-grained Direct Preference Optimization (fDPO) with segment-specific preference granularity guided by spatial reward mechanism.
**Key Results:**
- fDPO achieves 4.1% and 9.0% gains over standard DPO on spatial quality and quantity tasks
- SpatialReasoner-R1 sets new SOTA on SpatialRGPT-Bench, outperforming strongest baseline by 9.8%
- Maintains competitive performance on general vision-language tasks
**Poster:** https://neurips.cc/virtual/2025/poster/118573
---
## 32. Reason-RFT: Reinforcement Fine-Tuning for Visual Reasoning
**Summary:** A two-stage reinforcement fine-tuning framework: SFT with curated Chain-of-Thought data activates reasoning potential, followed by RL based on Group Relative Policy Optimization (GRPO) for domain shift adaptability.
**Key Advantages:**
- State-of-the-art results outperforming both open-source and proprietary models
- Robust performance under domain shifts across various tasks
- Excellent data efficiency in few-shot learning scenarios
**Poster:** https://neurips.cc/virtual/2025/poster/118345
---
## 33. Safe + Safe = Unsafe? Exploiting Safe Images to Jailbreak LVLMs
**Summary:** Reveals that safe images can be exploited for jailbreaking when combined with additional safe images and prompts, exploiting LVLMs' universal reasoning capabilities and safety snowball effect. Proposes Safety Snowball Agent (SSA) framework.
**Key Findings:**
- SSA can use nearly any image to induce LVLMs to produce unsafe content
- Achieves high jailbreak success rates against latest LVLMs
- Exploits inherent LVLM properties rather than alignment flaws
**Poster:** https://neurips.cc/virtual/2025/loc/san-diego/poster/116422
---
## 34. MIP against Agent: Malicious Image Patches Hijacking Multimodal OS Agents
**Summary:** Uncovers novel attack vector: Malicious Image Patches (MIPs)—adversarially perturbed screen regions that induce OS agents to perform harmful actions. MIPs can be embedded in wallpapers or shared on social media to exfiltrate sensitive data.
**Key Findings:**
- MIPs generalize across user prompts and screen configurations
- Can hijack multiple OS agents during execution of benign instructions
- Exposes critical security vulnerabilities requiring attention before widespread deployment
**Poster:** https://neurips.cc/virtual/2025/loc/san-diego/poster/117813
---
## 35. CogVLA: Cognition-Aligned Vision-Language-Action Models
**Summary:** A framework leveraging instruction-driven routing and sparsification for VLA efficiency. Features 3-stage progressive architecture inspired by human multimodal coordination: Encoder-FiLM Aggregation Routing, LLM-FiLM Pruning Routing, and V-L-A Coupled Attention.
**Key Results:**
- 97.4% success rate on LIBERO benchmark, 70.0% on real-world robotic tasks
- Reduces training costs by 2.5x and inference latency by 2.8x compared to OpenVLA
- Achieves state-of-the-art performance
**Poster:** https://neurips.cc/virtual/2025/poster/119023
---
## 36. Succeed or Learn Slowly (SoLS): Sample Efficient RL for Mobile App Control
**Summary:** Novel off-policy RL algorithm applying direct policy updates for positive samples and conservative, regularized updates for negative ones. Augmented with Successful Transition Replay (STR) for prioritizing successful interactions.
**Key Results:**
- At least 17% relative increase over existing methods on AndroidWorld benchmark
- Substantially fewer computational resources than GPT-4o-based methods
- 5-60x faster inference
**Poster:** https://neurips.cc/virtual/2025/poster/119910
---
## 37. TAI3: Testing Agent Integrity in Interpreting User Intent
**Summary:** An API-centric stress testing framework that uncovers intent integrity violations in LLM agents. Uses semantic partitioning to organize tasks into meaningful categories, with targeted mutations to expose subtle agent errors while preserving user intent.
**Key Contributions:**
- Datatype-aware strategy memory for retrieving effective mutation patterns
- Lightweight predictor for ranking mutations by error likelihood
- Generalizes to stronger target models using smaller LLMs for test generation
**Poster:** https://neurips.cc/virtual/2025/poster/118952
---
## 38. ThinkAct: Vision-Language-Action Reasoning via Reinforced Visual Latent Planning
**Summary:** A dual-system framework bridging high-level reasoning with low-level action execution. Trains multimodal LLM to generate embodied reasoning plans guided by action-aligned visual rewards, compressed into visual plan latents for downstream action execution.
**Key Capabilities:**
- Few-shot adaptation
- Long-horizon planning
- Self-correction behaviors in complex embodied AI tasks
**Poster:** https://neurips.cc/virtual/2025/poster/119747
---
## 39. Visualization-of-Thought Attack (VoTA) against VLMs
**Summary:** Automated attack framework that constructs chains of images with risky visual thoughts to challenge VLMs. Exploits the conflict between logical processing and safety protocols, leading to unsafe content generation.
**Key Results:**
- Improves average attack success rate by 26.71% (from 63.70% to 90.41%)
- Tested on 9 open-source and 6 commercial VLMs
- Outperforms state-of-the-art methods
**Poster:** https://neurips.cc/virtual/2025/poster/119873
---
## 40. Open CaptchaWorld: Benchmarking MLLM Agents on CAPTCHA Puzzles
**Summary:** First web-based benchmark evaluating MLLM agents on diverse CAPTCHA puzzles. Spans 20 modern CAPTCHA types (225 total) with novel metric: CAPTCHA Reasoning Depth quantifying cognitive and motor steps required.
**Key Findings:**
- Humans achieve 93.3% success rate
- State-of-the-art agents achieve at most 40.0% (Browser-Use OpenAI-o3)
- Highlights significant gap between human and agent capabilities
**Poster:** https://neurips.cc/virtual/2025/poster/121537
---
## 41. Pixel Reasoner: Pixel-Space Reasoning with Curiosity-Driven RL
**Summary:** Introduces pixel-space reasoning framework where VLMs use visual operations (zoom-in, select-frame) to directly inspect and infer from visual evidence. Two-phase training: instruction tuning on synthesized traces, then RL with curiosity-driven rewards.
**Key Results:**
- 84% on V\*Bench, 74% on TallyQA-Complex, 84% on InfographicsVQA
- Highest accuracy achieved by any open-source 7B model
- Enables proactive information gathering from complex visual inputs
**Poster:** https://neurips.cc/virtual/2025/poster/117667
---
## 42. BTL-UI: Blink-Think-Link Reasoning Model for GUI Agent
**Summary:** Brain-inspired framework decomposing interactions into three biologically plausible phases: Blink (rapid detection via saccadic-like attention), Think (higher-level reasoning/planning), and Link (executable command generation for motor control).
**Key Innovations:**
- Automated annotation pipeline for blink data
- BTL Reward: first rule-based reward mechanism driven by both process and outcome
- Competitive performance on static GUI understanding and dynamic interaction tasks
**Poster:** https://neurips.cc/virtual/2025/poster/119419
---
## 43. GUI Exploration Lab: Multi-Turn RL for Screen Navigation
**Summary:** Simulation environment engine enabling flexible definition of screens, icons, and navigation graphs with full environment access for agent training/evaluation. Demonstrates progressive training approach from SFT to multi-turn RL.
**Key Findings:**
- Supervised fine-tuning enables memorization of fundamental knowledge
- Single-turn RL enhances generalization to unseen scenarios
- Multi-turn RL encourages exploration strategies through interactive trial and error
**Poster:** https://neurips.cc/virtual/2025/loc/san-diego/poster/117497
---
## 44. GUI-Rise: Structured Reasoning and History Summarization for GUI Navigation
**Summary:** Reasoning-enhanced framework integrating structured reasoning, action prediction, and history summarization. Uses Chain-of-Thought analyses combining progress estimation and decision reasoning, trained via SFT and GRPO with history-aware rewards.
**Key Results:**
- State-of-the-art under identical training data conditions
- Particularly strong in out-of-domain scenarios
- Robust reasoning and generalization across diverse GUI navigation tasks
**Poster:** https://neurips.cc/virtual/2025/poster/117425
---
## 45. UI-Genie: A Self-Improving Framework for MLLM-based Mobile GUI Agents
**Summary:** Self-improving framework addressing trajectory verification and training data scalability. Features UI-Genie-RM (image-text interleaved reward model) and self-improvement pipeline with reward-guided exploration and outcome verification.
**Key Contributions:**
- UI-Genie-RM-517k: first reward-specific dataset for GUI agents
- UI-Genie-Agent-16k: high-quality synthetic trajectories without manual annotation
- State-of-the-art across multiple GUI agent benchmarks through three generations of self-improvement
**Poster:** https://neurips.cc/virtual/2025/poster/119990
---
## What We're Building
At Cua, we're focused on the infrastructure layer for computer-use agents: cloud sandboxes for safe execution, SDKs for agent development, and tools that make it easier to build and deploy agents in production.
If you're experimenting with any of the approaches in these papers, our [Cloud Sandboxes](https://cua.ai) provide isolated Linux, Windows, and macOS environments where you can test agent behavior without risk to real systems.
---
**Start building:** [cua.ai](https://cua.ai)
**Join the community:** [Discord](https://discord.gg/cua-ai)
```