tokens: 48547/50000 6/616 files (page 16/20)
This is page 16 of 20. Use http://codebase.md/trycua/cua?page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── bump-version.yml
│       ├── ci-lume.yml
│       ├── docker-publish-cua-linux.yml
│       ├── docker-publish-cua-windows.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── link-check.yml
│       ├── lint.yml
│       ├── npm-publish-cli.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       ├── python-tests.yml
│       ├── test-cua-models.yml
│       └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│   ├── docs.code-workspace
│   ├── extensions.json
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── cloud-windows-ga-macos-preview.md
│   ├── composite-agents.md
│   ├── computer-use-agents-for-growth-hacking.md
│   ├── cua-hackathon.md
│   ├── cua-playground-preview.md
│   ├── cua-vlm-router.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cli.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── neurips-2025-cua-papers.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .env.example
│   ├── .gitignore
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── observability.mdx
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── cua-vlm-router.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   ├── telemetry.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── cli-playbook
│   │       │   ├── commands.mdx
│   │       │   ├── index.mdx
│   │       │   └── meta.json
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── meta.json
│   │       │   ├── sandboxed-python.mdx
│   │       │   └── tracing-api.mdx
│   │       ├── example-usecases
│   │       │   ├── form-filling.mdx
│   │       │   ├── gemini-complex-ui-navigation.mdx
│   │       │   ├── meta.json
│   │       │   ├── post-event-contact-export.mdx
│   │       │   └── windows-app-behind-vpn.mdx
│   │       ├── get-started
│   │       │   ├── meta.json
│   │       │   └── quickstart.mdx
│   │       ├── index.mdx
│   │       ├── macos-vm-cli-playbook
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   └── meta.json
│   │       └── meta.json
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── bg-dark.jpg
│   │       ├── bg-light.jpg
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── grounding-with-gemini3.gif
│   │       ├── hero.png
│   │       ├── laminar_trace_example.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   ├── posthog
│   │   │   │   │   └── [...path]
│   │   │   │   │       └── route.ts
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   ├── llms.txt
│   │   │   │   └── route.ts
│   │   │   ├── robots.ts
│   │   │   └── sitemap.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── analytics-tracker.tsx
│   │   │   ├── cookie-consent.tsx
│   │   │   ├── doc-actions-menu.tsx
│   │   │   ├── editable-code-block.tsx
│   │   │   ├── footer.tsx
│   │   │   ├── hero.tsx
│   │   │   ├── iou.tsx
│   │   │   ├── mermaid.tsx
│   │   │   └── page-feedback.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   ├── mdx-components.tsx
│   │   └── providers
│   │       └── posthog-provider.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── browser_tool_example.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── tracing_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── cua_adapter.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gelato.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── generic_vlm.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   ├── uiins.py
│   │   │   │   │   ├── uitars.py
│   │   │   │   │   └── uitars2.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── tools
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   └── browser_tool.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_computer_agent.py
│   │   ├── bench-ui
│   │   │   ├── bench_ui
│   │   │   │   ├── __init__.py
│   │   │   │   ├── api.py
│   │   │   │   └── child.py
│   │   │   ├── examples
│   │   │   │   ├── folder_example.py
│   │   │   │   ├── gui
│   │   │   │   │   ├── index.html
│   │   │   │   │   ├── logo.svg
│   │   │   │   │   └── styles.css
│   │   │   │   ├── output_overlay.png
│   │   │   │   └── simple_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       └── test_port_detection.py
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── tracing_wrapper.py
│   │   │   │   ├── tracing.py
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_computer.py
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── browser.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   ├── utils
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   └── wallpaper.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   ├── test_connection.py
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_server.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_telemetry.py
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── build-extension.py
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── desktop-extension
│   │   │   │   ├── cua-extension.mcpb
│   │   │   │   ├── desktop_extension.png
│   │   │   │   ├── manifest.json
│   │   │   │   ├── README.md
│   │   │   │   ├── requirements.txt
│   │   │   │   ├── run_server.sh
│   │   │   │   └── setup.py
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── QUICK_TEST_COMMANDS.sh
│   │   │   ├── quick_test_local_option.py
│   │   │   ├── README.md
│   │   │   ├── scripts
│   │   │   │   ├── install_mcp_server.sh
│   │   │   │   └── start_mcp_server.sh
│   │   │   ├── test_mcp_server_local_option.py
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_mcp_server.py
│   │   ├── pylume
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_pylume.py
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           ├── conftest.py
│   │           └── test_omniparser.py
│   ├── qemu-docker
│   │   ├── linux
│   │   │   ├── Dockerfile
│   │   │   ├── README.md
│   │   │   └── src
│   │   │       ├── entry.sh
│   │   │       └── vm
│   │   │           ├── image
│   │   │           │   └── README.md
│   │   │           └── setup
│   │   │               ├── install.sh
│   │   │               ├── setup-cua-server.sh
│   │   │               └── setup.sh
│   │   ├── README.md
│   │   └── windows
│   │       ├── Dockerfile
│   │       ├── README.md
│   │       └── src
│   │           ├── entry.sh
│   │           └── vm
│   │               ├── image
│   │               │   └── README.md
│   │               └── setup
│   │                   ├── install.bat
│   │                   ├── on-logon.ps1
│   │                   ├── setup-cua-server.ps1
│   │                   ├── setup-utils.psm1
│   │                   └── setup.ps1
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── cua-cli
│   │   │   ├── .gitignore
│   │   │   ├── .prettierrc
│   │   │   ├── bun.lock
│   │   │   ├── CLAUDE.md
│   │   │   ├── index.ts
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── auth.ts
│   │   │   │   ├── cli.ts
│   │   │   │   ├── commands
│   │   │   │   │   ├── auth.ts
│   │   │   │   │   └── sandbox.ts
│   │   │   │   ├── config.ts
│   │   │   │   ├── http.ts
│   │   │   │   ├── storage.ts
│   │   │   │   └── util.ts
│   │   │   └── tsconfig.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Development.md
│       ├── Dockerfile
│       ├── Dockerfile.dev
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│   ├── install-cli.ps1
│   ├── install-cli.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   ├── run-docker-dev.sh
│   └── typescript-typecheck.js
├── TESTING.md
├── tests
│   ├── agent_loop_testing
│   │   ├── agent_test.py
│   │   └── README.md
│   ├── pytest.ini
│   ├── shell_cmd.py
│   ├── test_files.py
│   ├── test_mcp_server_session_management.py
│   ├── test_mcp_server_streaming.py
│   ├── test_shell_bash.py
│   ├── test_telemetry.py
│   ├── test_tracing.py
│   ├── test_venv.py
│   └── test_watchdog.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/libs/python/agent/agent/human_tool/ui.py:
--------------------------------------------------------------------------------

```python
import base64
import io
import json
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

import gradio as gr
import requests
from PIL import Image

from .server import completion_queue


class HumanCompletionUI:
    def __init__(self, server_url: str = "http://localhost:8002"):
        self.server_url = server_url
        self.current_call_id: Optional[str] = None
        self.refresh_interval = 2.0  # seconds
        self.last_image = None  # Store the last image for display
        # Track current interactive action controls
        self.current_action_type: str = "click"
        self.current_button: str = "left"
        self.current_scroll_x: int = 0
        self.current_scroll_y: int = -120

    def format_messages_for_chatbot(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format messages for display in gr.Chatbot with type='messages'."""
        formatted = []
        for msg in messages:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            tool_calls = msg.get("tool_calls", [])

            # Handle different content formats
            if isinstance(content, list):
                # Multi-modal content - can include text and images
                formatted_content = []
                for item in content:
                    if item.get("type") == "text":
                        text = item.get("text", "")
                        if text.strip():  # Only add non-empty text
                            formatted_content.append(text)
                    elif item.get("type") == "image_url":
                        image_url = item.get("image_url", {}).get("url", "")
                        if image_url:
                            # Check if it's a base64 image or URL
                            if image_url.startswith("data:image"):
                                # For base64 images, decode and create gr.Image
                                try:
                                    header, data = image_url.split(",", 1)
                                    image_data = base64.b64decode(data)
                                    image = Image.open(io.BytesIO(image_data))
                                    formatted_content.append(gr.Image(value=image))
                                except Exception as e:
                                    print(f"Error loading image: {e}")
                                    formatted_content.append(f"[Image loading error: {e}]")
                            else:
                                # For URL images, create gr.Image with URL
                                formatted_content.append(gr.Image(value=image_url))

                # Determine final content format
                if len(formatted_content) == 1:
                    content = formatted_content[0]
                elif len(formatted_content) > 1:
                    content = formatted_content
                else:
                    content = "[Empty content]"

            # Ensure role is valid for Gradio Chatbot
            if role not in ["user", "assistant"]:
                role = "assistant" if role == "system" else "user"

            # Invert roles for better display in human UI context
            # (what the AI says becomes "user", what human should respond becomes "assistant")
            if role == "user":
                role = "assistant"
            else:
                role = "user"

            # Add the main message if it has content
            if content and str(content).strip():
                formatted.append({"role": role, "content": content})

            # Handle tool calls - create separate messages for each tool call
            if tool_calls:
                for tool_call in tool_calls:
                    function_name = tool_call.get("function", {}).get("name", "unknown")
                    arguments_str = tool_call.get("function", {}).get("arguments", "{}")

                    try:
                        # Parse arguments to format them nicely
                        arguments = json.loads(arguments_str)
                        formatted_args = json.dumps(arguments, indent=2)
                    except json.JSONDecodeError:
                        # If parsing fails, use the raw string
                        formatted_args = arguments_str

                    # Create a formatted message for the tool call
                    tool_call_content = f"```json\n{formatted_args}\n```"

                    formatted.append(
                        {
                            "role": role,
                            "content": tool_call_content,
                            "metadata": {"title": f"🛠️ Used {function_name}"},
                        }
                    )

        return formatted

    def get_pending_calls(self) -> List[Dict[str, Any]]:
        """Get pending calls from the server."""
        try:
            response = requests.get(f"{self.server_url}/pending", timeout=5)
            if response.status_code == 200:
                return response.json().get("pending_calls", [])
        except Exception as e:
            print(f"Error fetching pending calls: {e}")
        return []

    def complete_call_with_response(self, call_id: str, response: str) -> bool:
        """Complete a call with a text response."""
        try:
            response_data = {"response": response}
            response_obj = requests.post(
                f"{self.server_url}/complete/{call_id}", json=response_data, timeout=10
            )
            response_obj.raise_for_status()
            return True
        except requests.RequestException as e:
            print(f"Error completing call: {e}")
            return False

    def complete_call_with_tool_calls(self, call_id: str, tool_calls: List[Dict[str, Any]]) -> bool:
        """Complete a call with tool calls."""
        try:
            response_data = {"tool_calls": tool_calls}
            response_obj = requests.post(
                f"{self.server_url}/complete/{call_id}", json=response_data, timeout=10
            )
            response_obj.raise_for_status()
            return True
        except requests.RequestException as e:
            print(f"Error completing call: {e}")
            return False

    def complete_call(
        self,
        call_id: str,
        response: Optional[str] = None,
        tool_calls: Optional[List[Dict[str, Any]]] = None,
    ) -> bool:
        """Complete a call with either a response or tool calls."""
        try:
            response_data = {}
            if response:
                response_data["response"] = response
            if tool_calls:
                response_data["tool_calls"] = tool_calls

            response_obj = requests.post(
                f"{self.server_url}/complete/{call_id}", json=response_data, timeout=10
            )
            response_obj.raise_for_status()
            return True
        except requests.RequestException as e:
            print(f"Error completing call: {e}")
            return False

    def get_last_image_from_messages(self, messages: List[Dict[str, Any]]) -> Optional[Any]:
        """Extract the last image from the messages for display above conversation."""
        last_image = None

        for msg in reversed(messages):  # Start from the last message
            content = msg.get("content", "")

            if isinstance(content, list):
                for item in reversed(content):  # Get the last image in the message
                    if item.get("type") == "image_url":
                        image_url = item.get("image_url", {}).get("url", "")
                        if image_url:
                            if image_url.startswith("data:image"):
                                # For base64 images, create a gr.Image component
                                try:
                                    header, data = image_url.split(",", 1)
                                    image_data = base64.b64decode(data)
                                    image = Image.open(io.BytesIO(image_data))
                                    return image
                                except Exception as e:
                                    print(f"Error loading image: {e}")
                                    continue
                            else:
                                # For URL images, return the URL
                                return image_url

        return last_image

    def refresh_pending_calls(self):
        """Refresh the list of pending calls."""
        pending_calls = self.get_pending_calls()

        if not pending_calls:
            return (
                gr.update(choices=["latest"], value="latest"),  # dropdown
                gr.update(value=None),  # image (no image)
                gr.update(value=[]),  # chatbot (empty messages)
                gr.update(interactive=False),  # submit button
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )

        # Sort pending calls by created_at to get oldest first
        sorted_calls = sorted(pending_calls, key=lambda x: x.get("created_at", ""))

        # Create choices for dropdown
        choices = [("latest", "latest")]  # Add "latest" option first

        for call in sorted_calls:
            call_id = call["id"]
            model = call.get("model", "unknown")
            created_at = call.get("created_at", "")
            # Format timestamp
            try:
                dt = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
                time_str = dt.strftime("%H:%M:%S")
            except Exception:
                time_str = created_at

            choice_label = f"{call_id[:8]}... ({model}) - {time_str}"
            choices.append((choice_label, call_id))

        # Default to "latest" which shows the oldest pending conversation
        selected_call_id = "latest"
        if selected_call_id == "latest" and sorted_calls:
            # Use the oldest call (first in sorted list)
            selected_call = sorted_calls[0]
            conversation = self.format_messages_for_chatbot(selected_call.get("messages", []))
            self.current_call_id = selected_call["id"]
            # Get the last image from messages
            self.last_image = self.get_last_image_from_messages(selected_call.get("messages", []))
        else:
            conversation = []
            self.current_call_id = None
            self.last_image = None

        return (
            gr.update(choices=choices, value="latest"),
            gr.update(value=self.last_image),
            gr.update(value=conversation),
            gr.update(interactive=bool(choices)),
            gr.update(visible=True),  # click_actions_group visible when there is a call
            gr.update(visible=True),  # actions_group visible when there is a call
        )

    def on_call_selected(self, selected_choice):
        """Handle when a call is selected from the dropdown."""
        if not selected_choice:
            return (
                gr.update(value=None),  # no image
                gr.update(value=[]),  # empty chatbot
                gr.update(interactive=False),
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )

        pending_calls = self.get_pending_calls()
        if not pending_calls:
            return (
                gr.update(value=None),  # no image
                gr.update(value=[]),  # empty chatbot
                gr.update(interactive=False),
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )

        # Handle "latest" option
        if selected_choice == "latest":
            # Sort calls by created_at to get oldest first
            sorted_calls = sorted(pending_calls, key=lambda x: x.get("created_at", ""))
            selected_call = sorted_calls[0]  # Get the oldest call
            call_id = selected_call["id"]
        else:
            # Extract call_id from the choice for specific calls
            call_id = None
            for call in pending_calls:
                call_id_short = call["id"][:8]
                if call_id_short in selected_choice:
                    call_id = call["id"]
                    break

            if not call_id:
                return (
                    gr.update(value=None),  # no image
                    gr.update(value=[]),  # empty chatbot
                    gr.update(interactive=False),
                    gr.update(visible=False),  # click_actions_group hidden
                    gr.update(visible=False),  # actions_group hidden
                )

            # Find the selected call
            selected_call = next((c for c in pending_calls if c["id"] == call_id), None)

        if not selected_call:
            return (
                gr.update(value=None),  # no image
                gr.update(value=[]),  # empty chatbot
                gr.update(interactive=False),
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )

        conversation = self.format_messages_for_chatbot(selected_call.get("messages", []))
        self.current_call_id = call_id
        # Get the last image from messages
        self.last_image = self.get_last_image_from_messages(selected_call.get("messages", []))

        return (
            gr.update(value=self.last_image),
            gr.update(value=conversation),
            gr.update(interactive=True),
            gr.update(visible=True),  # click_actions_group visible
            gr.update(visible=True),  # actions_group visible
        )

    def submit_response(self, response_text: str):
        """Submit a text response to the current call."""
        if not self.current_call_id:
            return (
                gr.update(value=response_text),  # keep response text
                gr.update(value="❌ No call selected"),  # status
            )

        if not response_text.strip():
            return (
                gr.update(value=response_text),  # keep response text
                gr.update(value="❌ Response cannot be empty"),  # status
            )

        success = self.complete_call_with_response(self.current_call_id, response_text)

        if success:
            status_msg = "✅ Response submitted successfully!"
            return (
                gr.update(value=""),  # clear response text
                gr.update(value=status_msg),  # status
            )
        else:
            return (
                gr.update(value=response_text),  # keep response text
                gr.update(value="❌ Failed to submit response"),  # status
            )

    def submit_action(self, action_type: str, **kwargs) -> str:
        """Submit a computer action as a tool call."""
        if not self.current_call_id:
            return "❌ No call selected"

        import uuid

        # Create tool call structure
        action_data = {"type": action_type, **kwargs}
        tool_call = {
            "id": f"call_{uuid.uuid4().hex[:24]}",
            "type": "function",
            "function": {"name": "computer", "arguments": json.dumps(action_data)},
        }

        success = self.complete_call_with_tool_calls(self.current_call_id, [tool_call])

        if success:
            return f"✅ {action_type.capitalize()} action submitted as tool call"
        else:
            return f"❌ Failed to submit {action_type} action"

    def submit_click_action(
        self, x: int, y: int, action_type: str = "click", button: str = "left"
    ) -> str:
        """Submit a coordinate-based action."""
        if action_type == "click":
            return self.submit_action(action_type, x=x, y=y, button=button)
        else:
            return self.submit_action(action_type, x=x, y=y)

    def submit_type_action(self, text: str) -> str:
        """Submit a type action."""
        return self.submit_action("type", text=text)

    def submit_hotkey_action(self, keys: str) -> str:
        """Submit a hotkey action."""
        return self.submit_action("keypress", keys=keys)

    def submit_wait_action(self) -> str:
        """Submit a wait action with no kwargs."""
        return self.submit_action("wait")

    def submit_description_click(
        self, description: str, action_type: str = "click", button: str = "left"
    ) -> str:
        """Submit a description-based action."""
        if action_type == "click":
            return self.submit_action(action_type, element_description=description, button=button)
        else:
            return self.submit_action(action_type, element_description=description)

    def wait_for_pending_calls(self, max_seconds: float = 10.0, check_interval: float = 0.2):
        """Wait for pending calls to appear or until max_seconds elapsed.

        This method loops and checks for pending calls at regular intervals,
        returning as soon as a pending call is found or the maximum wait time is reached.

        Args:
            max_seconds: Maximum number of seconds to wait
            check_interval: How often to check for pending calls (in seconds)
        """

        start_time = time.time()

        while time.time() - start_time < max_seconds:
            # Check if there are any pending calls
            pending_calls = self.get_pending_calls()
            if pending_calls:
                # Found pending calls, return immediately
                return self.refresh_pending_calls()

            # Wait before checking again
            time.sleep(check_interval)

        # Max wait time reached, return current state
        return self.refresh_pending_calls()


def create_ui():
    """Create the Gradio interface."""
    ui_handler = HumanCompletionUI()

    with gr.Blocks(title="Human-in-the-Loop Agent Tool", fill_width=True) as demo:
        gr.Markdown("# 🤖 Human-in-the-Loop Agent Tool")
        gr.Markdown("Review AI conversation requests and provide human responses.")

        with gr.Row():
            with gr.Column(scale=2):
                with gr.Group():
                    screenshot_image = gr.Image(
                        label="Interactive Screenshot", interactive=False, height=600
                    )

                    # Action type selection for image clicks (wrapped for visibility control)
                    with gr.Group(visible=False) as click_actions_group:
                        with gr.Row():
                            action_type_radio = gr.Dropdown(
                                label="Interactive Action",
                                choices=[
                                    "click",
                                    "double_click",
                                    "move",
                                    "left_mouse_up",
                                    "left_mouse_down",
                                    "scroll",
                                ],
                                value="click",
                                scale=2,
                            )
                            action_button_radio = gr.Dropdown(
                                label="Button",
                                choices=["left", "right", "wheel", "back", "forward"],
                                value="left",
                                visible=True,
                                scale=1,
                            )
                            scroll_x_input = gr.Number(
                                label="scroll_x", value=0, visible=False, scale=1
                            )
                            scroll_y_input = gr.Number(
                                label="scroll_y", value=-120, visible=False, scale=1
                            )

                    conversation_chatbot = gr.Chatbot(
                        label="Conversation", type="messages", height=500, show_copy_button=True
                    )

            with gr.Column(scale=1):
                with gr.Group():
                    call_dropdown = gr.Dropdown(
                        label="Select a pending conversation request",
                        choices=["latest"],
                        interactive=True,
                        value="latest",
                    )
                    refresh_btn = gr.Button("🔄 Refresh", variant="secondary")
                    status_display = gr.Textbox(
                        label="Status", interactive=False, value="Ready to receive requests..."
                    )

                with gr.Group():
                    response_text = gr.Textbox(
                        label="Message", lines=3, placeholder="Enter your message here..."
                    )
                    submit_btn = gr.Button(
                        "📤 Submit Message", variant="primary", interactive=False
                    )

                # Action Accordions (wrapped for visibility control)
                with gr.Group(visible=False) as actions_group:
                    with gr.Tabs():
                        with gr.Tab("🖱️ Click Actions"):
                            with gr.Group():
                                description_text = gr.Textbox(
                                    label="Element Description",
                                    placeholder="e.g., 'Privacy and security option in left sidebar'",
                                )
                                with gr.Row():
                                    description_action_type = gr.Dropdown(
                                        label="Action",
                                        choices=[
                                            "click",
                                            "double_click",
                                            "move",
                                            "left_mouse_up",
                                            "left_mouse_down",
                                        ],
                                        value="click",
                                    )
                                    description_button = gr.Dropdown(
                                        label="Button",
                                        choices=["left", "right", "wheel", "back", "forward"],
                                        value="left",
                                    )
                                description_submit_btn = gr.Button("Submit Click Action")

                        with gr.Tab("📝 Type Action"):
                            with gr.Group():
                                type_text = gr.Textbox(
                                    label="Text to Type", placeholder="Enter text to type..."
                                )
                                type_submit_btn = gr.Button("Submit Type")

                        with gr.Tab("⌨️ Keypress Action"):
                            with gr.Group():
                                keypress_text = gr.Textbox(
                                    label="Keys", placeholder="e.g., ctrl+c, alt+tab"
                                )
                                keypress_submit_btn = gr.Button("Submit Keypress")

                        with gr.Tab("🧰 Misc Actions"):
                            with gr.Group():
                                misc_action_dropdown = gr.Dropdown(
                                    label="Action", choices=["wait"], value="wait"
                                )
                                misc_submit_btn = gr.Button("Submit Action")

        # Event handlers
        refresh_btn.click(
            fn=ui_handler.refresh_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )

        call_dropdown.change(
            fn=ui_handler.on_call_selected,
            inputs=[call_dropdown],
            outputs=[
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )

        def handle_image_click(evt: gr.SelectData):
            if evt.index is not None:
                x, y = evt.index
                action_type = ui_handler.current_action_type or "click"
                button = ui_handler.current_button or "left"
                if action_type == "scroll":
                    sx_i = int(ui_handler.current_scroll_x or 0)
                    sy_i = int(ui_handler.current_scroll_y or 0)
                    # Submit a scroll action with x,y position and scroll deltas
                    result = ui_handler.submit_action(
                        "scroll", x=x, y=y, scroll_x=sx_i, scroll_y=sy_i
                    )
                else:
                    result = ui_handler.submit_click_action(x, y, action_type, button)
                ui_handler.wait_for_pending_calls()
                return result
            return "No coordinates selected"

        screenshot_image.select(fn=handle_image_click, outputs=[status_display]).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )

        # Response submission
        submit_btn.click(
            fn=ui_handler.submit_response,
            inputs=[response_text],
            outputs=[response_text, status_display],
        ).then(
            fn=ui_handler.refresh_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )

        # Toggle visibility of controls based on action type
        def toggle_action_controls(action_type):
            # Button visible only for click
            button_vis = gr.update(visible=(action_type == "click"))
            # Scroll inputs visible only for scroll
            scroll_x_vis = gr.update(visible=(action_type == "scroll"))
            scroll_y_vis = gr.update(visible=(action_type == "scroll"))
            # Update state
            ui_handler.current_action_type = action_type or "click"
            return button_vis, scroll_x_vis, scroll_y_vis

        action_type_radio.change(
            fn=toggle_action_controls,
            inputs=[action_type_radio],
            outputs=[action_button_radio, scroll_x_input, scroll_y_input],
        )

        # Keep other control values in ui_handler state
        def on_button_change(val):
            ui_handler.current_button = val or "left"

        action_button_radio.change(fn=on_button_change, inputs=[action_button_radio])

        def on_scroll_x_change(val):
            try:
                ui_handler.current_scroll_x = int(val) if val is not None else 0
            except Exception:
                ui_handler.current_scroll_x = 0

        scroll_x_input.change(fn=on_scroll_x_change, inputs=[scroll_x_input])

        def on_scroll_y_change(val):
            try:
                ui_handler.current_scroll_y = int(val) if val is not None else 0
            except Exception:
                ui_handler.current_scroll_y = 0

        scroll_y_input.change(fn=on_scroll_y_change, inputs=[scroll_y_input])

        type_submit_btn.click(
            fn=ui_handler.submit_type_action, inputs=[type_text], outputs=[status_display]
        ).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )

        keypress_submit_btn.click(
            fn=ui_handler.submit_hotkey_action, inputs=[keypress_text], outputs=[status_display]
        ).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )

        def handle_description_submit(description, action_type, button):
            if description:
                result = ui_handler.submit_description_click(description, action_type, button)
                ui_handler.wait_for_pending_calls()
                return result
            return "Please enter a description"

        description_submit_btn.click(
            fn=handle_description_submit,
            inputs=[description_text, description_action_type, description_button],
            outputs=[status_display],
        ).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )

        # Misc action handler
        def handle_misc_submit(selected_action):
            if selected_action == "wait":
                result = ui_handler.submit_wait_action()
                ui_handler.wait_for_pending_calls()
                return result
            return f"Unsupported misc action: {selected_action}"

        misc_submit_btn.click(
            fn=handle_misc_submit, inputs=[misc_action_dropdown], outputs=[status_display]
        ).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )

        # Load initial data
        demo.load(
            fn=ui_handler.refresh_pending_calls,
            outputs=[
                call_dropdown,
                screenshot_image,
                conversation_chatbot,
                submit_btn,
                click_actions_group,
                actions_group,
            ],
        )

    return demo


if __name__ == "__main__":
    demo = create_ui()
    demo.queue()
    demo.launch(server_name="0.0.0.0", server_port=7860)

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/responses.py:
--------------------------------------------------------------------------------

```python
"""
Functions for making various Responses API items from different types of responses.
Based on the OpenAI spec for Responses API items.
"""

import base64
import json
import uuid
from typing import Any, Dict, List, Literal, Optional, Union

from openai.types.responses.easy_input_message_param import EasyInputMessageParam
from openai.types.responses.response_computer_tool_call_param import (
    ActionClick,
    ActionDoubleClick,
    ActionDrag,
    ActionDragPath,
    ActionKeypress,
    ActionMove,
    ActionScreenshot,
    ActionScroll,
)
from openai.types.responses.response_computer_tool_call_param import (
    ActionType as ActionTypeAction,
)
from openai.types.responses.response_computer_tool_call_param import (
    ActionWait,
    PendingSafetyCheck,
    ResponseComputerToolCallParam,
)
from openai.types.responses.response_function_tool_call_param import (
    ResponseFunctionToolCallParam,
)
from openai.types.responses.response_input_image_param import ResponseInputImageParam
from openai.types.responses.response_output_message_param import (
    ResponseOutputMessageParam,
)
from openai.types.responses.response_output_text_param import ResponseOutputTextParam
from openai.types.responses.response_reasoning_item_param import (
    ResponseReasoningItemParam,
    Summary,
)


def random_id():
    """Generate a random UUID4 string used as an item/call id."""
    return str(uuid.uuid4())


# User message items
def make_input_image_item(image_data: Union[str, bytes]) -> EasyInputMessageParam:
    return EasyInputMessageParam(
        content=[
            ResponseInputImageParam(
                type="input_image",
                image_url=f"data:image/png;base64,{base64.b64encode(image_data).decode('utf-8') if isinstance(image_data, bytes) else image_data}",
            )  # type: ignore
        ],
        role="user",
        type="message",
    )


# Text items
def make_reasoning_item(reasoning: str) -> ResponseReasoningItemParam:
    return ResponseReasoningItemParam(
        id=random_id(), summary=[Summary(text=reasoning, type="summary_text")], type="reasoning"
    )


def make_output_text_item(content: str) -> ResponseOutputMessageParam:
    return ResponseOutputMessageParam(
        id=random_id(),
        content=[ResponseOutputTextParam(text=content, type="output_text", annotations=[])],
        role="assistant",
        status="completed",
        type="message",
    )


# Function call items
def make_function_call_item(
    function_name: str, arguments: Dict[str, Any], call_id: Optional[str] = None
) -> ResponseFunctionToolCallParam:
    return ResponseFunctionToolCallParam(
        id=random_id(),
        call_id=call_id if call_id else random_id(),
        name=function_name,
        arguments=json.dumps(arguments),
        status="completed",
        type="function_call",
    )


# Computer tool call items
def make_click_item(
    x: int,
    y: int,
    button: Literal["left", "right", "wheel", "back", "forward"] = "left",
    call_id: Optional[str] = None,
) -> ResponseComputerToolCallParam:
    return ResponseComputerToolCallParam(
        id=random_id(),
        call_id=call_id if call_id else random_id(),
        action=ActionClick(button=button, type="click", x=x, y=y),
        pending_safety_checks=[],
        status="completed",
        type="computer_call",
    )


def make_double_click_item(
    x: int, y: int, call_id: Optional[str] = None
) -> ResponseComputerToolCallParam:
    return ResponseComputerToolCallParam(
        id=random_id(),
        call_id=call_id if call_id else random_id(),
        action=ActionDoubleClick(type="double_click", x=x, y=y),
        pending_safety_checks=[],
        status="completed",
        type="computer_call",
    )


def make_drag_item(
    path: List[Dict[str, int]], call_id: Optional[str] = None
) -> ResponseComputerToolCallParam:
    drag_path = [ActionDragPath(x=point["x"], y=point["y"]) for point in path]
    return ResponseComputerToolCallParam(
        id=random_id(),
        call_id=call_id if call_id else random_id(),
        action=ActionDrag(path=drag_path, type="drag"),
        pending_safety_checks=[],
        status="completed",
        type="computer_call",
    )


def make_keypress_item(
    keys: List[str], call_id: Optional[str] = None
) -> ResponseComputerToolCallParam:
    return ResponseComputerToolCallParam(
        id=random_id(),
        call_id=call_id if call_id else random_id(),
        action=ActionKeypress(keys=keys, type="keypress"),
        pending_safety_checks=[],
        status="completed",
        type="computer_call",
    )


def make_move_item(x: int, y: int, call_id: Optional[str] = None) -> ResponseComputerToolCallParam:
    return ResponseComputerToolCallParam(
        id=random_id(),
        call_id=call_id if call_id else random_id(),
        action=ActionMove(type="move", x=x, y=y),
        pending_safety_checks=[],
        status="completed",
        type="computer_call",
    )


def make_screenshot_item(call_id: Optional[str] = None) -> ResponseComputerToolCallParam:
    return ResponseComputerToolCallParam(
        id=random_id(),
        call_id=call_id if call_id else random_id(),
        action=ActionScreenshot(type="screenshot"),
        pending_safety_checks=[],
        status="completed",
        type="computer_call",
    )


def make_scroll_item(
    x: int, y: int, scroll_x: int, scroll_y: int, call_id: Optional[str] = None
) -> ResponseComputerToolCallParam:
    return ResponseComputerToolCallParam(
        id=random_id(),
        call_id=call_id if call_id else random_id(),
        action=ActionScroll(scroll_x=scroll_x, scroll_y=scroll_y, type="scroll", x=x, y=y),
        pending_safety_checks=[],
        status="completed",
        type="computer_call",
    )


def make_type_item(text: str, call_id: Optional[str] = None) -> ResponseComputerToolCallParam:
    return ResponseComputerToolCallParam(
        id=random_id(),
        call_id=call_id if call_id else random_id(),
        action=ActionTypeAction(text=text, type="type"),
        pending_safety_checks=[],
        status="completed",
        type="computer_call",
    )


def make_wait_item(call_id: Optional[str] = None) -> ResponseComputerToolCallParam:
    return ResponseComputerToolCallParam(
        id=random_id(),
        call_id=call_id if call_id else random_id(),
        action=ActionWait(type="wait"),
        pending_safety_checks=[],
        status="completed",
        type="computer_call",
    )


# Extra anthropic computer calls
def make_left_mouse_down_item(
    x: Optional[int] = None, y: Optional[int] = None, call_id: Optional[str] = None
) -> Dict[str, Any]:
    return {
        "id": random_id(),
        "call_id": call_id if call_id else random_id(),
        "action": {"type": "left_mouse_down", "x": x, "y": y},
        "pending_safety_checks": [],
        "status": "completed",
        "type": "computer_call",
    }


def make_left_mouse_up_item(
    x: Optional[int] = None, y: Optional[int] = None, call_id: Optional[str] = None
) -> Dict[str, Any]:
    return {
        "id": random_id(),
        "call_id": call_id if call_id else random_id(),
        "action": {"type": "left_mouse_up", "x": x, "y": y},
        "pending_safety_checks": [],
        "status": "completed",
        "type": "computer_call",
    }


def make_failed_tool_call_items(
    tool_name: str, tool_kwargs: Dict[str, Any], error_message: str, call_id: Optional[str] = None
) -> List[Dict[str, Any]]:
    call_id = call_id if call_id else random_id()
    return [
        {
            "type": "function_call",
            "id": random_id(),
            "call_id": call_id,
            "name": tool_name,
            "arguments": json.dumps(tool_kwargs),
        },
        {
            "type": "function_call_output",
            "call_id": call_id,
            "output": json.dumps({"error": error_message}),
        },
    ]


def make_tool_error_item(error_message: str, call_id: Optional[str] = None) -> Dict[str, Any]:
    call_id = call_id if call_id else random_id()
    return {
        "type": "function_call_output",
        "call_id": call_id,
        "output": json.dumps({"error": error_message}),
    }


def replace_failed_computer_calls_with_function_calls(
    messages: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """
    Replace computer_call items with function_call items if they share a call_id with a function_call_output.
    This indicates the computer call failed and should be treated as a function call instead.
    We do this because the computer_call_output items do not support text output.

    Args:
        messages: List of message items to process
    """
    messages = messages.copy()

    # Find all call_ids that have function_call_output items
    failed_call_ids = set()
    for msg in messages:
        if msg.get("type") == "function_call_output":
            call_id = msg.get("call_id")
            if call_id:
                failed_call_ids.add(call_id)

    # Replace computer_call items that have matching call_ids
    for i, msg in enumerate(messages):
        if msg.get("type") == "computer_call" and msg.get("call_id") in failed_call_ids:

            # Extract action from computer_call
            action = msg.get("action", {})
            call_id = msg.get("call_id")

            # Create function_call replacement
            messages[i] = {
                "type": "function_call",
                "id": msg.get("id", random_id()),
                "call_id": call_id,
                "name": "computer",
                "arguments": json.dumps(action),
            }

    return messages
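
# Illustrative sketch of the rewrite above (item shapes taken from the helpers in
# this module). Given a computer_call and an error output sharing call_id "c1":
#     {"type": "computer_call", "call_id": "c1",
#      "action": {"type": "click", "x": 10, "y": 20}, ...}
#     {"type": "function_call_output", "call_id": "c1",
#      "output": '{"error": "element not found"}'}
# the computer_call is replaced in place by:
#     {"type": "function_call", "call_id": "c1", "name": "computer",
#      "arguments": '{"type": "click", "x": 10, "y": 20}'}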


# Conversion functions between element descriptions and coordinates
def convert_computer_calls_desc2xy(
    responses_items: List[Dict[str, Any]], desc2xy: Dict[str, tuple]
) -> List[Dict[str, Any]]:
    """
    Convert computer calls from element descriptions to x,y coordinates.

    Args:
        responses_items: List of response items containing computer calls with element_description
        desc2xy: Dictionary mapping element descriptions to (x, y) coordinate tuples

    Returns:
        List of response items with element_description replaced by x,y coordinates
    """
    converted_items = []

    for item in responses_items:
        if item.get("type") == "computer_call" and "action" in item:
            action = item["action"].copy()

            # Handle single element_description
            if "element_description" in action:
                desc = action["element_description"]
                if desc in desc2xy:
                    x, y = desc2xy[desc]
                    action["x"] = x
                    action["y"] = y
                    del action["element_description"]

            # Handle start_element_description and end_element_description for drag operations
            elif "start_element_description" in action and "end_element_description" in action:
                start_desc = action["start_element_description"]
                end_desc = action["end_element_description"]

                if start_desc in desc2xy and end_desc in desc2xy:
                    start_x, start_y = desc2xy[start_desc]
                    end_x, end_y = desc2xy[end_desc]
                    action["path"] = [{"x": start_x, "y": start_y}, {"x": end_x, "y": end_y}]
                    del action["start_element_description"]
                    del action["end_element_description"]

            converted_item = item.copy()
            converted_item["action"] = action
            converted_items.append(converted_item)
        else:
            converted_items.append(item)

    return converted_items


def convert_computer_calls_xy2desc(
    responses_items: List[Dict[str, Any]], desc2xy: Dict[str, tuple]
) -> List[Dict[str, Any]]:
    """
    Convert computer calls from x,y coordinates to element descriptions.

    Args:
        responses_items: List of response items containing computer calls with x,y coordinates
        desc2xy: Dictionary mapping element descriptions to (x, y) coordinate tuples

    Returns:
        List of response items with x,y coordinates replaced by element_description
    """
    # Create reverse mapping from coordinates to descriptions
    xy2desc = {coords: desc for desc, coords in desc2xy.items()}

    converted_items = []

    for item in responses_items:
        if item.get("type") == "computer_call" and "action" in item:
            action = item["action"].copy()

            # Handle single x,y coordinates
            if "x" in action and "y" in action:
                coords = (action["x"], action["y"])
                if coords in xy2desc:
                    action["element_description"] = xy2desc[coords]
                    del action["x"]
                    del action["y"]

            # Handle path for drag operations
            elif "path" in action and isinstance(action["path"], list) and len(action["path"]) == 2:
                start_point = action["path"][0]
                end_point = action["path"][1]

                if (
                    "x" in start_point
                    and "y" in start_point
                    and "x" in end_point
                    and "y" in end_point
                ):

                    start_coords = (start_point["x"], start_point["y"])
                    end_coords = (end_point["x"], end_point["y"])

                    if start_coords in xy2desc and end_coords in xy2desc:
                        action["start_element_description"] = xy2desc[start_coords]
                        action["end_element_description"] = xy2desc[end_coords]
                        del action["path"]

            converted_item = item.copy()
            converted_item["action"] = action
            converted_items.append(converted_item)
        else:
            converted_items.append(item)

    return converted_items
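
# Illustrative sketch (hypothetical mapping, not part of this module): with
#     desc2xy = {"blue 'Submit' button": (120, 340)}
# convert_computer_calls_xy2desc rewrites
#     {"type": "computer_call", "action": {"type": "click", "x": 120, "y": 340}}
# into an action keyed by "element_description", and convert_computer_calls_desc2xy
# applies the inverse substitution to restore the coordinates.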


def get_all_element_descriptions(responses_items: List[Dict[str, Any]]) -> List[str]:
    """
    Extract all element descriptions from computer calls in responses items.

    Args:
        responses_items: List of response items containing computer calls

    Returns:
        List of unique element descriptions found in computer calls
    """
    descriptions = set()

    for item in responses_items:
        if item.get("type") == "computer_call" and "action" in item:
            action = item["action"]

            # Handle single element_description
            if "element_description" in action:
                descriptions.add(action["element_description"])

            # Handle start_element_description and end_element_description for drag operations
            if "start_element_description" in action:
                descriptions.add(action["start_element_description"])

            if "end_element_description" in action:
                descriptions.add(action["end_element_description"])

    return list(descriptions)


# Conversion functions between responses_items and completion messages formats
def convert_responses_items_to_completion_messages(
    messages: List[Dict[str, Any]],
    allow_images_in_tool_results: bool = True,
    send_multiple_user_images_per_parallel_tool_results: bool = False,
) -> List[Dict[str, Any]]:
    """Convert responses_items message format to liteLLM completion format.

    Args:
        messages: List of responses_items format messages
        allow_images_in_tool_results: If True, include images in tool role messages.
                                    If False, send tool message + separate user message with image.
        send_multiple_user_images_per_parallel_tool_results: Only relevant when allow_images_in_tool_results is False.
                                    If True, emit a separate user image message for every screenshot in a run of
                                    consecutive tool results; if False, only the last screenshot in the run is sent.
    """
    completion_messages = []

    for i, message in enumerate(messages):
        msg_type = message.get("type")
        role = message.get("role")

        # Handle user messages (both with and without explicit type)
        if role == "user" or msg_type == "user":
            content = message.get("content", "")
            if isinstance(content, list):
                # Handle list content (images, text blocks)
                completion_content = []
                for item in content:
                    if item.get("type") == "input_image":
                        completion_content.append(
                            {"type": "image_url", "image_url": {"url": item.get("image_url")}}
                        )
                    elif item.get("type") == "input_text":
                        completion_content.append({"type": "text", "text": item.get("text")})
                    elif item.get("type") == "text":
                        completion_content.append({"type": "text", "text": item.get("text")})

                completion_messages.append({"role": "user", "content": completion_content})
            elif isinstance(content, str):
                # Handle string content
                completion_messages.append({"role": "user", "content": content})

        # Handle assistant messages
        elif role == "assistant" or msg_type == "message":
            content = message.get("content", [])
            if isinstance(content, list):
                text_parts = []
                for item in content:
                    if item.get("type") == "output_text":
                        text_parts.append(item.get("text", ""))
                    elif item.get("type") == "text":
                        text_parts.append(item.get("text", ""))

                if text_parts:
                    completion_messages.append(
                        {"role": "assistant", "content": "\n".join(text_parts)}
                    )

        # Handle reasoning items (convert to assistant message)
        elif msg_type == "reasoning":
            summary = message.get("summary", [])
            text_parts = []
            for item in summary:
                if item.get("type") == "summary_text":
                    text_parts.append(item.get("text", ""))

            if text_parts:
                completion_messages.append({"role": "assistant", "content": "\n".join(text_parts)})

        # Handle function calls
        elif msg_type == "function_call":
            # Add tool call to last assistant message or create new one
            if not completion_messages or completion_messages[-1]["role"] != "assistant":
                completion_messages.append({"role": "assistant", "content": "", "tool_calls": []})

            if "tool_calls" not in completion_messages[-1]:
                completion_messages[-1]["tool_calls"] = []

            completion_messages[-1]["tool_calls"].append(
                {
                    "id": message.get("call_id"),
                    "type": "function",
                    "function": {
                        "name": message.get("name"),
                        "arguments": message.get("arguments"),
                    },
                }
            )

        # Handle computer calls
        elif msg_type == "computer_call":
            # Add tool call to last assistant message or create new one
            if not completion_messages or completion_messages[-1]["role"] != "assistant":
                completion_messages.append({"role": "assistant", "content": "", "tool_calls": []})

            if "tool_calls" not in completion_messages[-1]:
                completion_messages[-1]["tool_calls"] = []

            action = message.get("action", {})
            completion_messages[-1]["tool_calls"].append(
                {
                    "id": message.get("call_id"),
                    "type": "function",
                    "function": {"name": "computer", "arguments": json.dumps(action)},
                }
            )

        # Handle function/computer call outputs
        elif msg_type in ["function_call_output", "computer_call_output"]:
            output = message.get("output")
            call_id = message.get("call_id")

            if isinstance(output, dict) and output.get("type") == "input_image":
                if allow_images_in_tool_results:
                    # Handle image output as tool response (may not work with all APIs)
                    completion_messages.append(
                        {
                            "role": "tool",
                            "tool_call_id": call_id,
                            "content": [
                                {"type": "image_url", "image_url": {"url": output.get("image_url")}}
                            ],
                        }
                    )
                else:
                    # Determine if the next message is also a tool call output
                    next_type = None
                    if i + 1 < len(messages):
                        next_msg = messages[i + 1]
                        next_type = next_msg.get("type")
                    is_next_message_image_result = next_type == "computer_call_output"
                    # Send tool message + separate user message with image (OpenAI compatible)
                    completion_messages += (
                        [
                            {
                                "role": "tool",
                                "tool_call_id": call_id,
                                "content": "[Execution completed. See screenshot below]",
                            },
                            {
                                "role": "user",
                                "content": [
                                    {
                                        "type": "image_url",
                                        "image_url": {"url": output.get("image_url")},
                                    }
                                ],
                            },
                        ]
                        if send_multiple_user_images_per_parallel_tool_results
                        or (not is_next_message_image_result)
                        else [
                            {
                                "role": "tool",
                                "tool_call_id": call_id,
                                "content": "[Execution completed. See screenshot below]",
                            },
                        ]
                    )
            else:
                # Handle text output as tool response
                completion_messages.append(
                    {"role": "tool", "tool_call_id": call_id, "content": str(output)}
                )

    return completion_messages


def convert_completion_messages_to_responses_items(
    completion_messages: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Convert completion messages format to responses_items message format."""
    responses_items = []
    skip_next = False

    for i, message in enumerate(completion_messages):
        if skip_next:
            skip_next = False
            continue

        role = message.get("role")
        content = message.get("content")
        tool_calls = message.get("tool_calls", [])

        # Handle assistant messages with text content
        if role == "assistant" and content and isinstance(content, str):
            responses_items.append(
                {
                    "type": "message",
                    "role": "assistant",
                    "content": [{"type": "output_text", "text": content}],
                }
            )

        # Handle tool calls
        if tool_calls:
            for tool_call in tool_calls:
                if tool_call.get("type") == "function":
                    function = tool_call.get("function", {})
                    function_name = function.get("name")

                    if function_name == "computer":
                        # Parse computer action
                        try:
                            action = json.loads(function.get("arguments", "{}"))
                            # Change key from "action" -> "type"
                            if action.get("action"):
                                action["type"] = action["action"]
                                del action["action"]
                            responses_items.append(
                                {
                                    "type": "computer_call",
                                    "call_id": tool_call.get("id"),
                                    "action": action,
                                    "status": "completed",
                                }
                            )
                        except json.JSONDecodeError:
                            # Fallback to function call format
                            responses_items.append(
                                {
                                    "type": "function_call",
                                    "call_id": tool_call.get("id"),
                                    "name": function_name,
                                    "arguments": function.get("arguments", "{}"),
                                    "status": "completed",
                                }
                            )
                    else:
                        # Regular function call
                        responses_items.append(
                            {
                                "type": "function_call",
                                "call_id": tool_call.get("id"),
                                "name": function_name,
                                "arguments": function.get("arguments", "{}"),
                                "status": "completed",
                            }
                        )

        # Handle tool messages (function/computer call outputs)
        elif role == "tool" and content:
            tool_call_id = message.get("tool_call_id")
            if isinstance(content, str):
                # Check if this is the "[Execution completed. See screenshot below]" pattern
                if content == "[Execution completed. See screenshot below]":
                    # Look ahead for the next user message with image
                    next_idx = i + 1
                    if (
                        next_idx < len(completion_messages)
                        and completion_messages[next_idx].get("role") == "user"
                        and isinstance(completion_messages[next_idx].get("content"), list)
                    ):
                        # Found the pattern - extract image from next message
                        next_content = completion_messages[next_idx]["content"]
                        for item in next_content:
                            if item.get("type") == "image_url":
                                responses_items.append(
                                    {
                                        "type": "computer_call_output",
                                        "call_id": tool_call_id,
                                        "output": {
                                            "type": "input_image",
                                            "image_url": item.get("image_url", {}).get("url"),
                                        },
                                    }
                                )
                                # Skip the next user message since we processed it
                                skip_next = True
                                break
                    else:
                        # No matching user message, treat as regular text
                        responses_items.append(
                            {
                                "type": "computer_call_output",
                                "call_id": tool_call_id,
                                "output": content,
                            }
                        )
                else:
                    # Determine if this is a computer call or function call output
                    try:
                        # Try to parse as structured output
                        parsed_content = json.loads(content)
                        if parsed_content.get("type") == "input_image":
                            responses_items.append(
                                {
                                    "type": "computer_call_output",
                                    "call_id": tool_call_id,
                                    "output": parsed_content,
                                }
                            )
                        else:
                            responses_items.append(
                                {
                                    "type": "computer_call_output",
                                    "call_id": tool_call_id,
                                    "output": content,
                                }
                            )
                    except json.JSONDecodeError:
                        # Plain text output - could be function or computer call
                        responses_items.append(
                            {
                                "type": "function_call_output",
                                "call_id": tool_call_id,
                                "output": content,
                            }
                        )
            elif isinstance(content, list):
                # Handle structured content (e.g., images)
                for item in content:
                    if item.get("type") == "image_url":
                        responses_items.append(
                            {
                                "type": "computer_call_output",
                                "call_id": tool_call_id,
                                "output": {
                                    "type": "input_image",
                                    "image_url": item.get("image_url", {}).get("url"),
                                },
                            }
                        )
                    elif item.get("type") == "text":
                        responses_items.append(
                            {
                                "type": "function_call_output",
                                "call_id": tool_call_id,
                                "output": item.get("text"),
                            }
                        )

        # Handle actual user messages
        elif role == "user" and content:
            if isinstance(content, list):
                # Handle structured user content (e.g., text + images)
                user_content = []
                for item in content:
                    if item.get("type") == "image_url":
                        user_content.append(
                            {
                                "type": "input_image",
                                "image_url": item.get("image_url", {}).get("url"),
                            }
                        )
                    elif item.get("type") == "text":
                        user_content.append({"type": "input_text", "text": item.get("text")})

                if user_content:
                    responses_items.append(
                        {"role": "user", "type": "message", "content": user_content}
                    )
            elif isinstance(content, str):
                # Handle simple text user message
                responses_items.append({"role": "user", "content": content})

    return responses_items

```
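
The conversion helpers above are easiest to follow end to end. The snippet below is a minimal, illustrative sketch: it assumes the module is importable as `agent.responses` (matching the path above) and simply round-trips a tiny trajectory through the liteLLM completion format and back.

```python
from agent.responses import (  # assumed import path for the file above
    convert_completion_messages_to_responses_items,
    convert_responses_items_to_completion_messages,
    make_click_item,
    make_output_text_item,
)

# A tiny trajectory: assistant text, a click, and its screenshot output.
items = [
    make_output_text_item("Clicking the Submit button"),
    make_click_item(x=120, y=340, button="left", call_id="call_1"),
    {
        "type": "computer_call_output",
        "call_id": "call_1",
        "output": {"type": "input_image", "image_url": "data:image/png;base64,..."},
    },
]

# Forward: the screenshot result becomes a tool message plus a separate user image message.
completion = convert_responses_items_to_completion_messages(
    items, allow_images_in_tool_results=False
)

# Back again: the computer_call / computer_call_output pair is reconstructed.
for item in convert_completion_messages_to_responses_items(completion):
    print(item.get("type") or item.get("role"))
```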

--------------------------------------------------------------------------------
/libs/python/agent/agent/agent.py:
--------------------------------------------------------------------------------

```python
"""
ComputerAgent - Main agent class that selects and runs agent loops
"""

import asyncio
import inspect
import json
from pathlib import Path
from typing import (
    Any,
    AsyncGenerator,
    Callable,
    Dict,
    List,
    Optional,
    Set,
    Tuple,
    Union,
    cast,
)

import litellm
import litellm.utils
from litellm.responses.utils import Usage

from .adapters import CUAAdapter, HuggingFaceLocalAdapter, HumanAdapter, MLXVLMAdapter
from .callbacks import (
    BudgetManagerCallback,
    ImageRetentionCallback,
    LoggingCallback,
    OperatorNormalizerCallback,
    PromptInstructionsCallback,
    TelemetryCallback,
    TrajectorySaverCallback,
)
from .computers import AsyncComputerHandler, is_agent_computer, make_computer_handler
from .decorators import find_agent_config
from .responses import (
    make_tool_error_item,
    replace_failed_computer_calls_with_function_calls,
)
from .types import AgentCapability, IllegalArgumentError, Messages, ToolError


def assert_callable_with(f, *args, **kwargs):
    """Check if function can be called with given arguments."""
    try:
        inspect.signature(f).bind(*args, **kwargs)
        return True
    except TypeError as e:
        sig = inspect.signature(f)
        raise IllegalArgumentError(f"Expected {sig}, got args={args} kwargs={kwargs}") from e


def get_json(obj: Any, max_depth: int = 10) -> Any:
    """Serialize ``obj`` into JSON-compatible data with depth and circular-reference protection."""
    def custom_serializer(o: Any, depth: int = 0, seen: Optional[Set[int]] = None) -> Any:
        if seen is None:
            seen = set()

        # Use model_dump() if available
        if hasattr(o, "model_dump"):
            return o.model_dump()

        # Check depth limit
        if depth > max_depth:
            return f"<max_depth_exceeded:{max_depth}>"

        # Check for circular references using object id
        obj_id = id(o)
        if obj_id in seen:
            return f"<circular_reference:{type(o).__name__}>"

        # Handle Computer objects
        if hasattr(o, "__class__") and "computer" in o.__class__.__name__.lower():
            return f"<computer:{o.__class__.__name__}>"

        # Handle objects with __dict__
        if hasattr(o, "__dict__"):
            seen.add(obj_id)
            try:
                result = {}
                for k, v in o.__dict__.items():
                    if v is not None:
                        # Recursively serialize with updated depth and seen set
                        serialized_value = custom_serializer(v, depth + 1, seen.copy())
                        result[k] = serialized_value
                return result
            finally:
                seen.discard(obj_id)

        # Handle common types that might contain nested objects
        elif isinstance(o, dict):
            seen.add(obj_id)
            try:
                return {
                    k: custom_serializer(v, depth + 1, seen.copy())
                    for k, v in o.items()
                    if v is not None
                }
            finally:
                seen.discard(obj_id)

        elif isinstance(o, (list, tuple, set)):
            seen.add(obj_id)
            try:
                return [
                    custom_serializer(item, depth + 1, seen.copy())
                    for item in o
                    if item is not None
                ]
            finally:
                seen.discard(obj_id)

        # For basic types that json.dumps can handle
        elif isinstance(o, (str, int, float, bool)) or o is None:
            return o

        # Fallback to string representation
        else:
            return str(o)

    def remove_nones(obj: Any) -> Any:
        if isinstance(obj, dict):
            return {k: remove_nones(v) for k, v in obj.items() if v is not None}
        elif isinstance(obj, list):
            return [remove_nones(item) for item in obj if item is not None]
        return obj

    # Serialize with circular reference and depth protection
    serialized = custom_serializer(obj)

    # Convert to JSON string and back to ensure JSON compatibility
    json_str = json.dumps(serialized)
    parsed = json.loads(json_str)

    # Final cleanup of any remaining None values
    return remove_nones(parsed)


def sanitize_message(msg: Any) -> Any:
    """Return a copy of the message with image_url omitted for computer_call_output messages."""
    if msg.get("type") == "computer_call_output":
        output = msg.get("output", {})
        if isinstance(output, dict):
            sanitized = msg.copy()
            sanitized["output"] = {**output, "image_url": "[omitted]"}
            return sanitized
    return msg


def get_output_call_ids(messages: List[Dict[str, Any]]) -> List[str]:
    """Collect call_ids that already have a computer_call_output or function_call_output item."""
    call_ids = []
    for message in messages:
        if (
            message.get("type") == "computer_call_output"
            or message.get("type") == "function_call_output"
        ):
            call_ids.append(message.get("call_id"))
    return call_ids


class ComputerAgent:
    """
    Main agent class that automatically selects the appropriate agent loop
    based on the model and executes tool calls.
    """

    def __init__(
        self,
        model: str,
        tools: Optional[List[Any]] = None,
        custom_loop: Optional[Callable] = None,
        only_n_most_recent_images: Optional[int] = None,
        callbacks: Optional[List[Any]] = None,
        instructions: Optional[str] = None,
        verbosity: Optional[int] = None,
        trajectory_dir: Optional[str | Path | dict] = None,
        max_retries: Optional[int] = 3,
        screenshot_delay: Optional[float | int] = 0.5,
        use_prompt_caching: Optional[bool] = False,
        max_trajectory_budget: Optional[float | dict] = None,
        telemetry_enabled: Optional[bool] = True,
        trust_remote_code: Optional[bool] = False,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        **additional_generation_kwargs,
    ):
        """
        Initialize ComputerAgent.

        Args:
            model: Model name (e.g., "claude-sonnet-4-5-20250929", "computer-use-preview", "omni+vertex_ai/gemini-pro")
            tools: List of tools (computer objects, decorated functions, etc.)
            custom_loop: Custom agent loop function to use instead of auto-selection
            only_n_most_recent_images: If set, only keep the N most recent images in message history. Adds ImageRetentionCallback automatically.
            callbacks: List of AsyncCallbackHandler instances for preprocessing/postprocessing
            instructions: Optional system instructions to be passed to the model
            verbosity: Logging level (logging.DEBUG, logging.INFO, etc.). If set, adds LoggingCallback automatically
            trajectory_dir: If set, saves trajectory data (screenshots, responses) to this directory. Adds TrajectorySaverCallback automatically.
            max_retries: Maximum number of retries for failed API calls
            screenshot_delay: Delay before screenshots in seconds
            use_prompt_caching: If set, use prompt caching to avoid reprocessing the same prompt. Intended for use with Anthropic providers.
            max_trajectory_budget: If set, adds BudgetManagerCallback to track usage costs and stop when budget is exceeded
            telemetry_enabled: If set, adds TelemetryCallback to track anonymized usage data. Enabled by default.
            trust_remote_code: If set, trust remote code when loading local models. Disabled by default.
            api_key: Optional API key override for the model provider
            api_base: Optional API base URL override for the model provider
            **additional_generation_kwargs: Additional arguments passed to the model provider
        """
        # If the loop is "human/human", we need to prefix a grounding model fallback
        if model in ["human/human", "human"]:
            model = "openai/computer-use-preview+human/human"

        self.model = model
        self.tools = tools or []
        self.custom_loop = custom_loop
        self.only_n_most_recent_images = only_n_most_recent_images
        self.callbacks = callbacks or []
        self.instructions = instructions
        self.verbosity = verbosity
        self.trajectory_dir = trajectory_dir
        self.max_retries = max_retries
        self.screenshot_delay = screenshot_delay
        self.use_prompt_caching = use_prompt_caching
        self.telemetry_enabled = telemetry_enabled
        self.kwargs = additional_generation_kwargs
        self.trust_remote_code = trust_remote_code
        self.api_key = api_key
        self.api_base = api_base

        # == Add built-in callbacks ==

        # Prepend operator normalizer callback
        self.callbacks.insert(0, OperatorNormalizerCallback())

        # Add prompt instructions callback if provided
        if self.instructions:
            self.callbacks.append(PromptInstructionsCallback(self.instructions))

        # Add logging callback if verbosity is set
        if self.verbosity is not None:
            self.callbacks.append(LoggingCallback(level=self.verbosity))

        # Add image retention callback if only_n_most_recent_images is set
        if self.only_n_most_recent_images:
            self.callbacks.append(ImageRetentionCallback(self.only_n_most_recent_images))

        # Add trajectory saver callback if trajectory_dir is set
        if self.trajectory_dir:
            if isinstance(self.trajectory_dir, dict):
                self.callbacks.append(TrajectorySaverCallback(**self.trajectory_dir))
            elif isinstance(self.trajectory_dir, (str, Path)):
                self.callbacks.append(TrajectorySaverCallback(str(self.trajectory_dir)))

        # Add budget manager if max_trajectory_budget is set
        if max_trajectory_budget:
            if isinstance(max_trajectory_budget, dict):
                self.callbacks.append(BudgetManagerCallback(**max_trajectory_budget))
            else:
                self.callbacks.append(BudgetManagerCallback(max_trajectory_budget))

        # == Enable local model providers w/ LiteLLM ==

        # Register local model providers
        hf_adapter = HuggingFaceLocalAdapter(
            device="auto", trust_remote_code=self.trust_remote_code or False
        )
        human_adapter = HumanAdapter()
        mlx_adapter = MLXVLMAdapter()
        cua_adapter = CUAAdapter()
        litellm.custom_provider_map = [
            {"provider": "huggingface-local", "custom_handler": hf_adapter},
            {"provider": "human", "custom_handler": human_adapter},
            {"provider": "mlx", "custom_handler": mlx_adapter},
            {"provider": "cua", "custom_handler": cua_adapter},
        ]
        litellm.suppress_debug_info = True

        # == Initialize computer agent ==

        # Find the appropriate agent loop
        if custom_loop:
            self.agent_loop = custom_loop
            self.agent_config_info = None
        else:
            config_info = find_agent_config(model)
            if not config_info:
                raise ValueError(f"No agent config found for model: {model}")
            # Instantiate the agent config class
            self.agent_loop = config_info.agent_class()
            self.agent_config_info = config_info

        # Add telemetry callback AFTER agent_loop is set so it can capture the correct agent_type
        if self.telemetry_enabled:
            if isinstance(self.telemetry_enabled, bool):
                self.callbacks.append(TelemetryCallback(self))
            else:
                self.callbacks.append(TelemetryCallback(self, **self.telemetry_enabled))

        self.tool_schemas = []
        self.computer_handler = None

    async def _initialize_computers(self):
        """Initialize computer objects"""
        if not self.tool_schemas:
            # Process tools and create tool schemas
            self.tool_schemas = self._process_tools()

            # Find computer tool and create interface adapter
            computer_handler = None
            for schema in self.tool_schemas:
                if schema["type"] == "computer":
                    computer_handler = await make_computer_handler(schema["computer"])
                    break
            self.computer_handler = computer_handler

    def _process_input(self, input: Messages) -> List[Dict[str, Any]]:
        """Process input messages and create schemas for the agent loop"""
        if isinstance(input, str):
            return [{"role": "user", "content": input}]
        return [get_json(msg) for msg in input]

    def _process_tools(self) -> List[Dict[str, Any]]:
        """Process tools and create schemas for the agent loop"""
        schemas = []

        for tool in self.tools:
            # Check if it's a computer object (has interface attribute)
            if is_agent_computer(tool):
                # This is a computer tool - will be handled by agent loop
                schemas.append({"type": "computer", "computer": tool})
            elif callable(tool):
                # Use litellm.utils.function_to_dict to extract schema from docstring
                try:
                    function_schema = litellm.utils.function_to_dict(tool)
                    schemas.append({"type": "function", "function": function_schema})
                except Exception as e:
                    print(f"Warning: Could not process tool {tool}: {e}")
            else:
                print(f"Warning: Unknown tool type: {tool}")

        return schemas
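
    # Illustrative sketch of a non-computer tool (hypothetical function, not part of
    # this class): any plain callable with a descriptive docstring is turned into a
    # {"type": "function", "function": {...}} schema via litellm.utils.function_to_dict
    # and can then be passed in tools=[...] alongside a computer object.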

    def _get_tool(self, name: str) -> Optional[Callable]:
        """Get a tool by name"""
        for tool in self.tools:
            if hasattr(tool, "__name__") and tool.__name__ == name:
                return tool
            elif hasattr(tool, "func") and tool.func.__name__ == name:
                return tool
        return None

    # ============================================================================
    # AGENT RUN LOOP LIFECYCLE HOOKS
    # ============================================================================

    async def _on_run_start(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]]) -> None:
        """Initialize run tracking by calling callbacks."""
        for callback in self.callbacks:
            if hasattr(callback, "on_run_start"):
                await callback.on_run_start(kwargs, old_items)

    async def _on_run_end(
        self,
        kwargs: Dict[str, Any],
        old_items: List[Dict[str, Any]],
        new_items: List[Dict[str, Any]],
    ) -> None:
        """Finalize run tracking by calling callbacks."""
        for callback in self.callbacks:
            if hasattr(callback, "on_run_end"):
                await callback.on_run_end(kwargs, old_items, new_items)

    async def _on_run_continue(
        self,
        kwargs: Dict[str, Any],
        old_items: List[Dict[str, Any]],
        new_items: List[Dict[str, Any]],
    ) -> bool:
        """Check if run should continue by calling callbacks."""
        for callback in self.callbacks:
            if hasattr(callback, "on_run_continue"):
                should_continue = await callback.on_run_continue(kwargs, old_items, new_items)
                if not should_continue:
                    return False
        return True

    async def _on_llm_start(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Prepare messages for the LLM call by applying callbacks."""
        result = messages
        for callback in self.callbacks:
            if hasattr(callback, "on_llm_start"):
                result = await callback.on_llm_start(result)
        return result

    async def _on_llm_end(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Postprocess messages after the LLM call by applying callbacks."""
        result = messages
        for callback in self.callbacks:
            if hasattr(callback, "on_llm_end"):
                result = await callback.on_llm_end(result)
        return result

    async def _on_responses(self, kwargs: Dict[str, Any], responses: Dict[str, Any]) -> None:
        """Called when responses are received."""
        for callback in self.callbacks:
            if hasattr(callback, "on_responses"):
                await callback.on_responses(get_json(kwargs), get_json(responses))

    async def _on_computer_call_start(self, item: Dict[str, Any]) -> None:
        """Called when a computer call is about to start."""
        for callback in self.callbacks:
            if hasattr(callback, "on_computer_call_start"):
                await callback.on_computer_call_start(get_json(item))

    async def _on_computer_call_end(
        self, item: Dict[str, Any], result: List[Dict[str, Any]]
    ) -> None:
        """Called when a computer call has completed."""
        for callback in self.callbacks:
            if hasattr(callback, "on_computer_call_end"):
                await callback.on_computer_call_end(get_json(item), get_json(result))

    async def _on_function_call_start(self, item: Dict[str, Any]) -> None:
        """Called when a function call is about to start."""
        for callback in self.callbacks:
            if hasattr(callback, "on_function_call_start"):
                await callback.on_function_call_start(get_json(item))

    async def _on_function_call_end(
        self, item: Dict[str, Any], result: List[Dict[str, Any]]
    ) -> None:
        """Called when a function call has completed."""
        for callback in self.callbacks:
            if hasattr(callback, "on_function_call_end"):
                await callback.on_function_call_end(get_json(item), get_json(result))

    async def _on_text(self, item: Dict[str, Any]) -> None:
        """Called when a text message is encountered."""
        for callback in self.callbacks:
            if hasattr(callback, "on_text"):
                await callback.on_text(get_json(item))

    async def _on_api_start(self, kwargs: Dict[str, Any]) -> None:
        """Called when an LLM API call is about to start."""
        for callback in self.callbacks:
            if hasattr(callback, "on_api_start"):
                await callback.on_api_start(get_json(kwargs))

    async def _on_api_end(self, kwargs: Dict[str, Any], result: Any) -> None:
        """Called when an LLM API call has completed."""
        for callback in self.callbacks:
            if hasattr(callback, "on_api_end"):
                await callback.on_api_end(get_json(kwargs), get_json(result))

    async def _on_usage(self, usage: Dict[str, Any]) -> None:
        """Called when usage information is received."""
        for callback in self.callbacks:
            if hasattr(callback, "on_usage"):
                await callback.on_usage(get_json(usage))

    async def _on_screenshot(self, screenshot: Union[str, bytes], name: str = "screenshot") -> None:
        """Called when a screenshot is taken."""
        for callback in self.callbacks:
            if hasattr(callback, "on_screenshot"):
                await callback.on_screenshot(screenshot, name)

    # ============================================================================
    # AGENT OUTPUT PROCESSING
    # ============================================================================

    async def _handle_item(
        self,
        item: Any,
        computer: Optional[AsyncComputerHandler] = None,
        ignore_call_ids: Optional[List[str]] = None,
    ) -> List[Dict[str, Any]]:
        """Handle each item; may cause a computer action + screenshot."""
        call_id = item.get("call_id")
        if ignore_call_ids and call_id and call_id in ignore_call_ids:
            return []

        item_type = item.get("type", None)

        if item_type == "message":
            await self._on_text(item)
            # # Print messages
            # if item.get("content"):
            #     for content_item in item.get("content"):
            #         if content_item.get("text"):
            #             print(content_item.get("text"))
            return []

        try:
            if item_type == "computer_call":
                await self._on_computer_call_start(item)
                if not computer:
                    raise ValueError("Computer handler is required for computer calls")

                # Perform computer actions
                action = item.get("action")
                action_type = action.get("type")
                if action_type is None:
                    print(
                        f"Action type cannot be `None`: action={action}, action_type={action_type}"
                    )
                    return []

                # Extract action arguments (all fields except 'type')
                action_args = {k: v for k, v in action.items() if k != "type"}

                # print(f"{action_type}({action_args})")

                # Execute the computer action
                computer_method = getattr(computer, action_type, None)
                if computer_method:
                    assert_callable_with(computer_method, **action_args)
                    await computer_method(**action_args)
                else:
                    raise ToolError(f"Unknown computer action: {action_type}")

                # Take screenshot after action
                if self.screenshot_delay and self.screenshot_delay > 0:
                    await asyncio.sleep(self.screenshot_delay)
                screenshot_base64 = await computer.screenshot()
                await self._on_screenshot(screenshot_base64, "screenshot_after")

                # Handle safety checks
                pending_checks = item.get("pending_safety_checks", [])
                acknowledged_checks = []
                for check in pending_checks:
                    check_message = check.get("message", str(check))
                    acknowledged_checks.append(check)
                    # TODO: implement a callback for safety checks
                    # if acknowledge_safety_check_callback(check_message, allow_always=True):
                    #     acknowledged_checks.append(check)
                    # else:
                    #     raise ValueError(f"Safety check failed: {check_message}")

                # Create call output
                call_output = {
                    "type": "computer_call_output",
                    "call_id": item.get("call_id"),
                    "acknowledged_safety_checks": acknowledged_checks,
                    "output": {
                        "type": "input_image",
                        "image_url": f"data:image/png;base64,{screenshot_base64}",
                    },
                }

                # # Additional URL safety checks for browser environments
                # if await computer.get_environment() == "browser":
                #     current_url = await computer.get_current_url()
                #     call_output["output"]["current_url"] = current_url
                #     # TODO: implement a callback for URL safety checks
                #     # check_blocklisted_url(current_url)

                result = [call_output]
                await self._on_computer_call_end(item, result)
                return result

            if item_type == "function_call":
                await self._on_function_call_start(item)
                # Perform function call
                function = self._get_tool(item.get("name"))
                if not function:
                    raise ToolError(f"Function {item.get('name')} not found")

                args = json.loads(item.get("arguments"))

                # Validate arguments before execution
                assert_callable_with(function, **args)

                # Execute function - use asyncio.to_thread for non-async functions
                if inspect.iscoroutinefunction(function):
                    result = await function(**args)
                else:
                    result = await asyncio.to_thread(function, **args)

                # Create function call output
                call_output = {
                    "type": "function_call_output",
                    "call_id": item.get("call_id"),
                    "output": str(result),
                }

                result = [call_output]
                await self._on_function_call_end(item, result)
                return result
        except ToolError as e:
            return [make_tool_error_item(repr(e), call_id)]

        return []

    # ============================================================================
    # MAIN AGENT LOOP
    # ============================================================================

    async def run(
        self,
        messages: Messages,
        stream: bool = False,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        **additional_generation_kwargs,
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Run the agent with the given messages using Computer protocol handler pattern.

        Args:
            messages: List of message dictionaries
            stream: Whether to stream the response
            api_key: Optional API key override for the model provider
            api_base: Optional API base URL override for the model provider
            **additional_generation_kwargs: Additional arguments passed to the model provider

        Returns:
            AsyncGenerator that yields response chunks
        """
        if not self.agent_config_info:
            raise ValueError("Agent configuration not found")

        capabilities = self.get_capabilities()
        if "step" not in capabilities:
            raise ValueError(
                f"Agent loop {self.agent_config_info.agent_class.__name__} does not support step predictions"
            )

        await self._initialize_computers()

        # Merge kwargs and thread api credentials (run overrides constructor)
        merged_kwargs = {**self.kwargs, **additional_generation_kwargs}
        if (api_key is not None) or (self.api_key is not None):
            merged_kwargs["api_key"] = api_key if api_key is not None else self.api_key
        if (api_base is not None) or (self.api_base is not None):
            merged_kwargs["api_base"] = api_base if api_base is not None else self.api_base

        old_items = self._process_input(messages)
        new_items = []

        # Initialize run tracking
        run_kwargs = {
            "messages": messages,
            "stream": stream,
            "model": self.model,
            "agent_loop": self.agent_config_info.agent_class.__name__,
            **merged_kwargs,
        }
        await self._on_run_start(run_kwargs, old_items)

        # Default loop context so _on_run_end always has kwargs to report, even if
        # the first _on_run_continue check breaks out before any LLM call is made
        loop_kwargs: Dict[str, Any] = dict(run_kwargs)

        # Keep looping until the most recent output item is an assistant message
        while not new_items or new_items[-1].get("role") != "assistant":
            # Lifecycle hook: Check if we should continue based on callbacks (e.g., budget manager)
            should_continue = await self._on_run_continue(run_kwargs, old_items, new_items)
            if not should_continue:
                break

            # Lifecycle hook: Prepare messages for the LLM call
            # Use cases:
            # - PII anonymization
            # - Image retention policy
            combined_messages = old_items + new_items
            combined_messages = replace_failed_computer_calls_with_function_calls(combined_messages)
            preprocessed_messages = await self._on_llm_start(combined_messages)

            loop_kwargs = {
                "messages": preprocessed_messages,
                "model": self.model,
                "tools": self.tool_schemas,
                "stream": False,
                "computer_handler": self.computer_handler,
                "max_retries": self.max_retries,
                "use_prompt_caching": self.use_prompt_caching,
                **merged_kwargs,
            }

            # Run agent loop iteration
            result = await self.agent_loop.predict_step(
                **loop_kwargs,
                _on_api_start=self._on_api_start,
                _on_api_end=self._on_api_end,
                _on_usage=self._on_usage,
                _on_screenshot=self._on_screenshot,
            )
            result = get_json(result)

            # Lifecycle hook: Postprocess messages after the LLM call
            # Use cases:
            # - PII deanonymization (if you want tool calls to see PII)
            result["output"] = await self._on_llm_end(result.get("output", []))
            await self._on_responses(loop_kwargs, result)

            # Yield agent response
            yield result

            # Add agent response to new_items
            new_items += result.get("output")

            # Get output call ids
            output_call_ids = get_output_call_ids(result.get("output", []))

            # Handle computer actions
            for item in result.get("output"):
                partial_items = await self._handle_item(
                    item, self.computer_handler, ignore_call_ids=output_call_ids
                )
                new_items += partial_items

                # Yield partial response
                yield {
                    "output": partial_items,
                    "usage": Usage(
                        prompt_tokens=0,
                        completion_tokens=0,
                        total_tokens=0,
                    ),
                }

        await self._on_run_end(loop_kwargs, old_items, new_items)

    async def predict_click(
        self, instruction: str, image_b64: Optional[str] = None
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates based on image and instruction.

        Args:
            instruction: Instruction for where to click
            image_b64: Base64 encoded image (optional, will take screenshot if not provided)

        Returns:
            None or tuple with (x, y) coordinates
        """
        if not self.agent_config_info:
            raise ValueError("Agent configuration not found")

        capabilities = self.get_capabilities()
        if "click" not in capabilities:
            raise ValueError(
                f"Agent loop {self.agent_config_info.agent_class.__name__} does not support click predictions"
            )
        if hasattr(self.agent_loop, "predict_click"):
            if not image_b64:
                if not self.computer_handler:
                    raise ValueError("Computer tool or image_b64 is required for predict_click")
                image_b64 = await self.computer_handler.screenshot()
            # Pass along api credentials if available
            click_kwargs: Dict[str, Any] = {}
            if self.api_key is not None:
                click_kwargs["api_key"] = self.api_key
            if self.api_base is not None:
                click_kwargs["api_base"] = self.api_base
            return await self.agent_loop.predict_click(
                model=self.model, image_b64=image_b64, instruction=instruction, **click_kwargs
            )
        return None

    def get_capabilities(self) -> List[AgentCapability]:
        """
        Get list of capabilities supported by the current agent config.

        Returns:
            List of capability strings (e.g., ["step", "click"])
        """
        if not self.agent_config_info:
            raise ValueError("Agent configuration not found")

        if hasattr(self.agent_loop, "get_capabilities"):
            return self.agent_loop.get_capabilities()
        return ["step"]  # Default capability

```
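A minimal sketch (not from the source) of driving the streaming `run()` loop above. Construction of the agent instance is omitted; `agent` is assumed to be an already-configured instance of the class defined in this file with a computer tool attached, and the task string is illustrative.

```python
import asyncio

async def drive(agent) -> None:
    # run() is an async generator; each chunk mirrors the shape yielded above:
    # {"output": [...response items...], "usage": {...}}
    async for chunk in agent.run("Open the browser and search for 'trycua'"):
        for item in chunk.get("output", []):
            print(item.get("type"))

    # Grounding-only prediction (requires a "click"-capable agent loop);
    # returns an (x, y) tuple or None
    coords = await agent.predict_click("Click the Save button")
    print(coords)

# asyncio.run(drive(agent))  # assumes `agent` was constructed beforehand
```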

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/uitars.py:
--------------------------------------------------------------------------------

```python
"""
UITARS agent loop implementation using liteLLM for ByteDance-Seed/UI-TARS-1.5-7B
Paper: https://arxiv.org/abs/2501.12326
Code: https://github.com/bytedance/UI-TARS
"""

import ast
import asyncio
import base64
import json
import math
import re
from io import BytesIO
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union

import litellm
from litellm.responses.litellm_completion_transformation.transformation import (
    LiteLLMCompletionResponsesConfig,
)
from litellm.responses.utils import Usage
from litellm.types.utils import ModelResponse
from openai.types.responses.response_computer_tool_call_param import (
    ActionType,
    ResponseComputerToolCallParam,
)
from openai.types.responses.response_input_param import ComputerCallOutput
from openai.types.responses.response_output_message_param import (
    ResponseOutputMessageParam,
)
from openai.types.responses.response_reasoning_item_param import (
    ResponseReasoningItemParam,
    Summary,
)
from PIL import Image

from ..decorators import register_agent
from ..responses import (
    make_click_item,
    make_double_click_item,
    make_drag_item,
    make_input_image_item,
    make_keypress_item,
    make_output_text_item,
    make_reasoning_item,
    make_scroll_item,
    make_type_item,
    make_wait_item,
)
from ..types import AgentCapability, AgentResponse, Messages, Tools

# Constants from reference code
IMAGE_FACTOR = 28
MIN_PIXELS = 100 * 28 * 28
MAX_PIXELS = 16384 * 28 * 28
MAX_RATIO = 200

FINISH_WORD = "finished"
WAIT_WORD = "wait"
ENV_FAIL_WORD = "error_env"
CALL_USER = "call_user"

# Action space prompt for UITARS
UITARS_ACTION_SPACE = """
click(start_box='<|box_start|>(x1,y1)<|box_end|>')
left_double(start_box='<|box_start|>(x1,y1)<|box_end|>')
right_single(start_box='<|box_start|>(x1,y1)<|box_end|>')
drag(start_box='<|box_start|>(x1,y1)<|box_end|>', end_box='<|box_start|>(x3,y3)<|box_end|>')
hotkey(key='')
type(content='') #If you want to submit your input, use "\\n" at the end of `content`.
scroll(start_box='<|box_start|>(x1,y1)<|box_end|>', direction='down or up or right or left')
wait() #Sleep for 5s and take a screenshot to check for any changes.
finished(content='xxx') # Use escape characters \\', \\", and \\n in content part to ensure we can parse the content in normal python string format.
"""

UITARS_PROMPT_TEMPLATE = """You are a GUI agent. You are given a task and your action history, with screenshots. You need to perform the next action to complete the task. 

## Output Format
```
Thought: ...
Action: ...
```

## Action Space
{action_space}

## Note
- Use {language} in `Thought` part.
- Write a small plan and finally summarize your next action (with its target element) in one sentence in `Thought` part.

## User Instruction
{instruction}
"""

GROUNDING_UITARS_PROMPT_TEMPLATE = """You are a GUI agent. You are given a task and your action history, with screenshots. You need to perform the next action to complete the task. 

## Output Format

Action: ...


## Action Space
click(point='<|box_start|>(x1,y1)<|box_end|>')

## User Instruction
{instruction}"""


def round_by_factor(number: float, factor: int) -> int:
    """Returns the closest integer to 'number' that is divisible by 'factor'."""
    return round(number / factor) * factor


def ceil_by_factor(number: float, factor: int) -> int:
    """Returns the smallest integer greater than or equal to 'number' that is divisible by 'factor'."""
    return math.ceil(number / factor) * factor


def floor_by_factor(number: float, factor: int) -> int:
    """Returns the largest integer less than or equal to 'number' that is divisible by 'factor'."""
    return math.floor(number / factor) * factor


def smart_resize(
    height: int,
    width: int,
    factor: int = IMAGE_FACTOR,
    min_pixels: int = MIN_PIXELS,
    max_pixels: int = MAX_PIXELS,
) -> tuple[int, int]:
    """
    Rescales the image so that the following conditions are met:
    1. Both dimensions (height and width) are divisible by 'factor'.
    2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
    3. The aspect ratio of the image is maintained as closely as possible.
    """
    if max(height, width) / min(height, width) > MAX_RATIO:
        raise ValueError(
            f"absolute aspect ratio must be smaller than {MAX_RATIO}, got {max(height, width) / min(height, width)}"
        )
    h_bar = max(factor, round_by_factor(height, factor))
    w_bar = max(factor, round_by_factor(width, factor))
    if h_bar * w_bar > max_pixels:
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = floor_by_factor(height / beta, factor)
        w_bar = floor_by_factor(width / beta, factor)
    elif h_bar * w_bar < min_pixels:
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = ceil_by_factor(height * beta, factor)
        w_bar = ceil_by_factor(width * beta, factor)
    return h_bar, w_bar
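
# Illustrative note (not from the source): for a 1920x1080 screenshot with the
# defaults above, smart_resize rounds each side to the nearest multiple of 28,
# giving (h_bar, w_bar) = (1092, 1932); the resulting pixel count (~2.1M) already
# lies between MIN_PIXELS and MAX_PIXELS, so no further scaling is applied.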


def escape_single_quotes(text):
    """Escape single quotes in text for safe string formatting."""
    pattern = r"(?<!\\)'"
    return re.sub(pattern, r"\\'", text)


def parse_action(action_str):
    """Parse action string into structured format."""
    try:
        node = ast.parse(action_str, mode="eval")
        if not isinstance(node, ast.Expression):
            raise ValueError("Not an expression")

        call = node.body
        if not isinstance(call, ast.Call):
            raise ValueError("Not a function call")

        # Get function name
        if isinstance(call.func, ast.Name):
            func_name = call.func.id
        elif isinstance(call.func, ast.Attribute):
            func_name = call.func.attr
        else:
            func_name = None

        # Get keyword arguments
        kwargs = {}
        for kw in call.keywords:
            key = kw.arg
            if isinstance(kw.value, ast.Constant):
                value = kw.value.value
            elif isinstance(kw.value, ast.Str):  # Compatibility with older Python
                value = kw.value.s
            else:
                value = None
            kwargs[key] = value

        return {"function": func_name, "args": kwargs}

    except Exception as e:
        print(f"Failed to parse action '{action_str}': {e}")
        return None
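
# Illustrative example (not from the source):
#   parse_action("click(start_box='<|box_start|>(500,300)<|box_end|>')")
#   -> {"function": "click", "args": {"start_box": "<|box_start|>(500,300)<|box_end|>"}}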


def parse_uitars_response(text: str, image_width: int, image_height: int) -> List[Dict[str, Any]]:
    """Parse UITARS model response into structured actions."""
    text = text.strip()

    # Extract thought
    thought = None
    if text.startswith("Thought:"):
        thought_match = re.search(r"Thought: (.+?)(?=\s*Action:|$)", text, re.DOTALL)
        if thought_match:
            thought = thought_match.group(1).strip()

    # Extract action
    if "Action:" not in text:
        raise ValueError("No Action found in response")

    action_str = text.split("Action:")[-1].strip()

    # Handle special case for type actions
    if "type(content" in action_str:

        def escape_quotes(match):
            return match.group(1)

        pattern = r"type\(content='(.*?)'\)"
        content = re.sub(pattern, escape_quotes, action_str)
        action_str = escape_single_quotes(content)
        action_str = "type(content='" + action_str + "')"

    # Parse the action
    parsed_action = parse_action(action_str.replace("\n", "\\n").lstrip())
    if parsed_action is None:
        raise ValueError(f"Action can't parse: {action_str}")

    action_type = parsed_action["function"]
    params = parsed_action["args"]

    # Process parameters
    action_inputs = {}
    for param_name, param in params.items():
        if param == "":
            continue
        param = str(param).lstrip()
        action_inputs[param_name.strip()] = param

        # Handle coordinate parameters
        if "start_box" in param_name or "end_box" in param_name:
            # Parse coordinates like '<|box_start|>(x,y)<|box_end|>' or '(x,y)'
            # First, remove special tokens
            clean_param = param.replace("<|box_start|>", "").replace("<|box_end|>", "")
            # Then remove parentheses and split
            numbers = clean_param.replace("(", "").replace(")", "").split(",")

            try:
                float_numbers = [
                    float(num.strip()) / 1000 for num in numbers
                ]  # Normalize to 0-1 range

                if len(float_numbers) == 2:
                    # Single point, duplicate for box format
                    float_numbers = [
                        float_numbers[0],
                        float_numbers[1],
                        float_numbers[0],
                        float_numbers[1],
                    ]

                action_inputs[param_name.strip()] = str(float_numbers)
            except ValueError as e:
                # If parsing fails, keep the original parameter value
                print(f"Warning: Could not parse coordinates '{param}': {e}")
                action_inputs[param_name.strip()] = param

    return [
        {
            "thought": thought,
            "action_type": action_type,
            "action_inputs": action_inputs,
            "text": text,
        }
    ]
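
# Illustrative example (not from the source): given the reply
#   "Thought: ...\nAction: click(start_box='<|box_start|>(500,500)<|box_end|>')"
# the single parsed entry has action_type "click" and action_inputs
# {"start_box": "[0.5, 0.5, 0.5, 0.5]"}: the model's 0-1000 coordinates are divided
# by 1000 and the single point is duplicated into box form.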


def convert_to_computer_actions(
    parsed_responses: List[Dict[str, Any]], image_width: int, image_height: int
) -> List[ResponseComputerToolCallParam | ResponseOutputMessageParam]:
    """Convert parsed UITARS responses to computer actions."""
    computer_actions = []

    for response in parsed_responses:
        action_type = response.get("action_type")
        action_inputs = response.get("action_inputs", {})

        if action_type == "finished":
            finished_text = action_inputs.get("content", "Task completed successfully.")
            computer_actions.append(make_output_text_item(finished_text))
            break

        elif action_type == "wait":
            computer_actions.append(make_wait_item())

        elif action_type == "call_user":
            computer_actions.append(
                make_output_text_item("I need assistance from the user to proceed with this task.")
            )

        elif action_type in ["click", "left_single"]:
            start_box = action_inputs.get("start_box")
            if start_box:
                # literal_eval: the box string is data produced above, not code
                coords = ast.literal_eval(start_box)
                x = int((coords[0] + coords[2]) / 2 * image_width)
                y = int((coords[1] + coords[3]) / 2 * image_height)

                computer_actions.append(make_click_item(x, y, "left"))

        elif action_type == "double_click":
            start_box = action_inputs.get("start_box")
            if start_box:
                coords = ast.literal_eval(start_box)
                x = int((coords[0] + coords[2]) / 2 * image_width)
                y = int((coords[1] + coords[3]) / 2 * image_height)

                computer_actions.append(make_double_click_item(x, y))

        elif action_type == "right_click":
            start_box = action_inputs.get("start_box")
            if start_box:
                coords = ast.literal_eval(start_box)
                x = int((coords[0] + coords[2]) / 2 * image_width)
                y = int((coords[1] + coords[3]) / 2 * image_height)

                computer_actions.append(make_click_item(x, y, "right"))

        elif action_type == "type":
            content = action_inputs.get("content", "")
            computer_actions.append(make_type_item(content))

        elif action_type == "hotkey":
            key = action_inputs.get("key", "")
            keys = key.split()
            computer_actions.append(make_keypress_item(keys))

        elif action_type == "press":
            key = action_inputs.get("key", "")
            computer_actions.append(make_keypress_item([key]))

        elif action_type == "scroll":
            start_box = action_inputs.get("start_box")
            direction = action_inputs.get("direction", "down")

            if start_box:
                coords = ast.literal_eval(start_box)
                x = int((coords[0] + coords[2]) / 2 * image_width)
                y = int((coords[1] + coords[3]) / 2 * image_height)
            else:
                x, y = image_width // 2, image_height // 2

            scroll_y = 5 if "up" in direction.lower() else -5
            computer_actions.append(make_scroll_item(x, y, 0, scroll_y))

        elif action_type == "drag":
            start_box = action_inputs.get("start_box")
            end_box = action_inputs.get("end_box")

            if start_box and end_box:
                start_coords = ast.literal_eval(start_box)
                end_coords = ast.literal_eval(end_box)

                start_x = int((start_coords[0] + start_coords[2]) / 2 * image_width)
                start_y = int((start_coords[1] + start_coords[3]) / 2 * image_height)
                end_x = int((end_coords[0] + end_coords[2]) / 2 * image_width)
                end_y = int((end_coords[1] + end_coords[3]) / 2 * image_height)

                path = [{"x": start_x, "y": start_y}, {"x": end_x, "y": end_y}]
                computer_actions.append(make_drag_item(path))

    return computer_actions


def pil_to_base64(image: Image.Image) -> str:
    """Convert PIL image to base64 string."""
    buffer = BytesIO()
    image.save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue()).decode("utf-8")


def process_image_for_uitars(
    image_data: str, max_pixels: int = MAX_PIXELS, min_pixels: int = MIN_PIXELS
) -> tuple[Image.Image, int, int]:
    """Process image for UITARS model input."""
    # Decode base64 image
    if image_data.startswith("data:image"):
        image_data = image_data.split(",")[1]

    image_bytes = base64.b64decode(image_data)
    image = Image.open(BytesIO(image_bytes))

    original_width, original_height = image.size

    # Resize image according to UITARS requirements
    if image.width * image.height > max_pixels:
        resize_factor = math.sqrt(max_pixels / (image.width * image.height))
        width = int(image.width * resize_factor)
        height = int(image.height * resize_factor)
        image = image.resize((width, height))

    if image.width * image.height < min_pixels:
        resize_factor = math.sqrt(min_pixels / (image.width * image.height))
        width = math.ceil(image.width * resize_factor)
        height = math.ceil(image.height * resize_factor)
        image = image.resize((width, height))

    if image.mode != "RGB":
        image = image.convert("RGB")

    return image, original_width, original_height


def sanitize_message(msg: Any) -> Any:
    """Return a copy of the message with image_url ommited within content parts"""
    if isinstance(msg, dict):
        result = {}
        for key, value in msg.items():
            if key == "content" and isinstance(value, list):
                result[key] = [
                    (
                        {k: v for k, v in item.items() if k != "image_url"}
                        if isinstance(item, dict)
                        else item
                    )
                    for item in value
                ]
            else:
                result[key] = value
        return result
    elif isinstance(msg, list):
        return [sanitize_message(item) for item in msg]
    else:
        return msg


def convert_uitars_messages_to_litellm(messages: Messages) -> List[Dict[str, Any]]:
    """
    Convert UITARS internal message format back to LiteLLM format.

    This function processes reasoning, computer_call, and computer_call_output messages
    and converts them to the appropriate LiteLLM assistant message format.

    Args:
        messages: List of UITARS internal messages

    Returns:
        List of LiteLLM formatted messages
    """
    litellm_messages = []
    current_assistant_content = []

    for message in messages:
        if isinstance(message, dict):
            message_type = message.get("type")

            if message_type == "reasoning":
                # Extract reasoning text from summary
                summary = message.get("summary", [])
                if summary and isinstance(summary, list):
                    for summary_item in summary:
                        if (
                            isinstance(summary_item, dict)
                            and summary_item.get("type") == "summary_text"
                        ):
                            reasoning_text = summary_item.get("text", "")
                            if reasoning_text:
                                current_assistant_content.append(f"Thought: {reasoning_text}")

            elif message_type == "computer_call":
                # Convert computer action to UITARS action format
                action = message.get("action", {})
                action_type = action.get("type")

                if action_type == "click":
                    x, y = action.get("x", 0), action.get("y", 0)
                    button = action.get("button", "left")
                    if button == "left":
                        action_text = f"Action: click(start_box='({x},{y})')"
                    elif button == "right":
                        action_text = f"Action: right_single(start_box='({x},{y})')"
                    else:
                        action_text = f"Action: click(start_box='({x},{y})')"

                elif action_type == "double_click":
                    x, y = action.get("x", 0), action.get("y", 0)
                    action_text = f"Action: left_double(start_box='({x},{y})')"

                elif action_type == "drag":
                    start_x, start_y = action.get("start_x", 0), action.get("start_y", 0)
                    end_x, end_y = action.get("end_x", 0), action.get("end_y", 0)
                    action_text = f"Action: drag(start_box='({start_x},{start_y})', end_box='({end_x},{end_y})')"

                elif action_type == "key":
                    key = action.get("key", "")
                    action_text = f"Action: hotkey(key='{key}')"

                elif action_type == "type":
                    text = action.get("text", "")
                    # Escape single quotes in the text
                    escaped_text = escape_single_quotes(text)
                    action_text = f"Action: type(content='{escaped_text}')"

                elif action_type == "scroll":
                    x, y = action.get("x", 0), action.get("y", 0)
                    direction = action.get("direction", "down")
                    action_text = f"Action: scroll(start_box='({x},{y})', direction='{direction}')"

                elif action_type == "wait":
                    action_text = "Action: wait()"

                else:
                    # Fallback for unknown action types
                    action_text = f"Action: {action_type}({action})"

                current_assistant_content.append(action_text)

                # Finalize the assistant message after each action so the following
                # computer_call_output screenshot starts a new user turn
                if current_assistant_content:
                    litellm_messages.append(
                        {
                            "role": "assistant",
                            "content": [
                                {"type": "text", "text": "\n".join(current_assistant_content)}
                            ],
                        }
                    )
                    current_assistant_content = []

            elif message_type == "computer_call_output":
                # Add screenshot from computer call output
                output = message.get("output", {})
                if isinstance(output, dict) and output.get("type") == "input_image":
                    image_url = output.get("image_url", "")
                    if image_url:
                        litellm_messages.append(
                            {
                                "role": "user",
                                "content": [{"type": "image_url", "image_url": {"url": image_url}}],
                            }
                        )

            elif message.get("role") == "user":
                # # Handle user messages
                # content = message.get("content", "")
                # if isinstance(content, str):
                #     litellm_messages.append({
                #         "role": "user",
                #         "content": content
                #     })
                # elif isinstance(content, list):
                #     litellm_messages.append({
                #         "role": "user",
                #         "content": content
                #     })
                pass

    # Add any remaining assistant content
    if current_assistant_content:
        litellm_messages.append(
            {
                "role": "assistant",
                "content": [{"type": "text", "text": "\n".join(current_assistant_content)}],
            }
        )

    return litellm_messages
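
# Illustrative shape (not from the source): a reasoning item, a click
# computer_call, and its computer_call_output screenshot are flattened into
#   [{"role": "assistant",
#     "content": [{"type": "text", "text": "Thought: ...\nAction: click(start_box='(x,y)')"}]},
#    {"role": "user",
#     "content": [{"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}}]}]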


@register_agent(models=r"(?i).*ui-?tars.*", priority=-1)
class UITARSConfig:
    """
    UITARS agent configuration using liteLLM for ByteDance-Seed/UI-TARS-1.5-7B model.

    Supports UITARS vision-language models for computer control.
    """

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Predict the next step based on input messages.

        Args:
            messages: Input messages following Responses format
            model: Model name to use
            tools: Optional list of tool schemas
            max_retries: Maximum number of retries
            stream: Whether to stream responses
            computer_handler: Computer handler instance
            _on_api_start: Callback for API start
            _on_api_end: Callback for API end
            _on_usage: Callback for usage tracking
            _on_screenshot: Callback for screenshot events
            **kwargs: Additional arguments

        Returns:
            Dictionary with "output" (a list of output items) and "usage" (token and cost info)
        """
        tools = tools or []

        # Create response items
        response_items = []

        # Find computer tool for screen dimensions
        computer_tool = None
        for tool_schema in tools:
            if tool_schema["type"] == "computer":
                computer_tool = tool_schema["computer"]
                break

        # Get screen dimensions
        screen_width, screen_height = 1024, 768
        if computer_tool:
            try:
                screen_width, screen_height = await computer_tool.get_dimensions()
            except Exception:
                # Keep the 1024x768 default if dimensions cannot be queried
                pass

        # Process messages to extract instruction and image
        instruction = ""
        image_data = None

        # Convert messages to list if string
        if isinstance(messages, str):
            messages = [{"role": "user", "content": messages}]

        # Extract instruction and latest screenshot
        for message in reversed(messages):
            if isinstance(message, dict):
                content = message.get("content", "")

                # Handle different content formats
                if isinstance(content, str):
                    if not instruction and message.get("role") == "user":
                        instruction = content
                elif isinstance(content, list):
                    for item in content:
                        if isinstance(item, dict):
                            if item.get("type") == "text" and not instruction:
                                instruction = item.get("text", "")
                            elif item.get("type") == "image_url" and not image_data:
                                image_url = item.get("image_url", {})
                                if isinstance(image_url, dict):
                                    image_data = image_url.get("url", "")
                                else:
                                    image_data = image_url

            # Also check for computer_call_output with screenshots
            if (
                isinstance(message, dict)
                and message.get("type") == "computer_call_output"
                and not image_data
            ):
                output = message.get("output", {})
                if isinstance(output, dict) and output.get("type") == "input_image":
                    image_data = output.get("image_url", "")

            if instruction and image_data:
                break

        if not instruction:
            instruction = (
                "Help me complete this task by analyzing the screen and taking appropriate actions."
            )

        # Create prompt
        user_prompt = UITARS_PROMPT_TEMPLATE.format(
            instruction=instruction, action_space=UITARS_ACTION_SPACE, language="English"
        )

        # Convert conversation history to LiteLLM format
        history_messages = convert_uitars_messages_to_litellm(messages)

        # Prepare messages for liteLLM
        litellm_messages = [{"role": "system", "content": "You are a helpful assistant."}]

        # Add current user instruction with screenshot
        current_user_message = {
            "role": "user",
            "content": [
                {"type": "text", "text": user_prompt},
            ],
        }
        litellm_messages.append(current_user_message)

        # Process image for UITARS
        if not image_data:
            # Take screenshot if none found in messages
            if computer_handler:
                image_data = await computer_handler.screenshot()
                if _on_screenshot:
                    await _on_screenshot(image_data, "screenshot_before")

                # Add screenshot to output items so it can be retained in history
                response_items.append(make_input_image_item(image_data))
            else:
                raise ValueError("No screenshot found in messages and no computer_handler provided")
        processed_image, original_width, original_height = process_image_for_uitars(image_data)
        encoded_image = pil_to_base64(processed_image)

        # Add conversation history
        if history_messages:
            litellm_messages.extend(history_messages)
        else:
            litellm_messages.append(
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{encoded_image}"},
                        }
                    ],
                }
            )

        # Prepare API call kwargs
        api_kwargs = {
            "model": model,
            "messages": litellm_messages,
            "max_tokens": kwargs.get("max_tokens", 500),
            "temperature": kwargs.get("temperature", 0.0),
            "do_sample": kwargs.get("temperature", 0.0) > 0.0,
            "num_retries": max_retries,
            **{k: v for k, v in kwargs.items() if k not in ["max_tokens", "temperature"]},
        }

        # Call API start hook
        if _on_api_start:
            await _on_api_start(api_kwargs)

        # Call liteLLM with UITARS model
        response = await litellm.acompletion(**api_kwargs)

        # Call API end hook
        if _on_api_end:
            await _on_api_end(api_kwargs, response)

        # Extract response content
        response_content = response.choices[0].message.content.strip()  # type: ignore

        # Parse UITARS response
        parsed_responses = parse_uitars_response(response_content, original_width, original_height)

        # Convert to computer actions
        computer_actions = convert_to_computer_actions(
            parsed_responses, original_width, original_height
        )

        # Add computer actions to response items
        thought = parsed_responses[0].get("thought", "")
        if thought:
            response_items.append(make_reasoning_item(thought))
        response_items.extend(computer_actions)

        # Extract usage information
        response_usage = {
            **LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage(
                response.usage
            ).model_dump(),
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(response_usage)

        # Create agent response
        agent_response = {"output": response_items, "usage": response_usage}

        return agent_response

    async def predict_click(
        self, model: str, image_b64: str, instruction: str, **kwargs
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates based on image and instruction.

        UITARS supports click prediction through its action parsing.

        Args:
            model: Model name to use
            image_b64: Base64 encoded image
            instruction: Instruction for where to click

        Returns:
            Tuple with (x, y) coordinates or None
        """
        try:
            # Create prompt using grounding template
            user_prompt = GROUNDING_UITARS_PROMPT_TEMPLATE.format(instruction=instruction)

            # Process image for UITARS
            processed_image, original_width, original_height = process_image_for_uitars(image_b64)
            encoded_image = pil_to_base64(processed_image)

            # Prepare messages for liteLLM
            litellm_messages = [
                {"role": "system", "content": "You are a helpful assistant."},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": user_prompt},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{encoded_image}"},
                        },
                    ],
                },
            ]

            # Prepare API call kwargs
            api_kwargs = {
                "model": model,
                "messages": litellm_messages,
                "max_tokens": 2056,
                "temperature": 0.0,
                "do_sample": False,
            }
            api_kwargs.update({k: v for k, v in (kwargs or {}).items()})

            # Call liteLLM with UITARS model
            response = await litellm.acompletion(**api_kwargs)

            # Extract response content
            response_content = response.choices[0].message.content.strip()  # type: ignore

            print(response_content)

            # Parse the response to extract click coordinates
            # Look for click action with coordinates (with special tokens)
            click_pattern = r"click\(point='<\|box_start\|>\((\d+),(\d+)\)<\|box_end\|>'\)"
            match = re.search(click_pattern, response_content)

            # Fallback: Look for simpler format without special tokens
            if not match:
                # Pattern for: click(start_box='(x,y)') or click(point='(x,y)')
                fallback_pattern = r"click\((?:start_box|point)='\((\d+),(\d+)\)'\)"
                match = re.search(fallback_pattern, response_content)

            if match:
                x, y = int(match.group(1)), int(match.group(2))
                # Scale coordinates back to original image dimensions
                scale_x = original_width / processed_image.width
                scale_y = original_height / processed_image.height

                scaled_x = int(x * scale_x)
                scaled_y = int(y * scale_y)

                return (scaled_x, scaled_y)

            return None

        except Exception as e:
            # Log error and return None
            print(f"Error in predict_click: {e}")
            return None

    def get_capabilities(self) -> List[AgentCapability]:
        """
        Get list of capabilities supported by this agent config.

        Returns:
            List of capability strings
        """
        return ["step", "click"]

```
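A minimal offline sketch (not from the source) exercising the parsing helpers above on a hand-written UI-TARS-style reply; no model call is involved, and the import path assumes the package layout shown in the file path above.

```python
from agent.loops.uitars import convert_to_computer_actions, parse_uitars_response

reply = (
    "Thought: The Submit button is near the middle of the form.\n"
    "Action: click(start_box='<|box_start|>(500,500)<|box_end|>')"
)

# Normalizes the 0-1000 model coordinates into a 0-1 box string
parsed = parse_uitars_response(reply, image_width=1920, image_height=1080)

# Scales the box midpoint back to pixels: a left-click item at (960, 540)
actions = convert_to_computer_actions(parsed, image_width=1920, image_height=1080)
print(parsed[0]["action_inputs"], actions)
```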

--------------------------------------------------------------------------------
/libs/lume/src/VM/VM.swift:
--------------------------------------------------------------------------------

```swift
import Foundation

// MARK: - Support Types

/// Base context for virtual machine directory and configuration
struct VMDirContext {
    let dir: VMDirectory
    var config: VMConfig
    let home: Home
    let storage: String?

    func saveConfig() throws {
        try dir.saveConfig(config)
    }

    var name: String { dir.name }
    var initialized: Bool { dir.initialized() }
    var diskPath: Path { dir.diskPath }
    var nvramPath: Path { dir.nvramPath }

    func setDisk(_ size: UInt64) throws {
        try dir.setDisk(size)
    }

    func finalize(to name: String) throws {
        let vmDir = try home.getVMDirectory(name)
        try FileManager.default.moveItem(at: dir.dir.url, to: vmDir.dir.url)
    }
}

// MARK: - Base VM Class

/// Base class for virtual machine implementations
@MainActor
class VM {
    // MARK: - Properties

    var vmDirContext: VMDirContext

    @MainActor
    private var virtualizationService: VMVirtualizationService?
    private let vncService: VNCService
    internal let virtualizationServiceFactory:
        (VMVirtualizationServiceContext) throws -> VMVirtualizationService
    private let vncServiceFactory: (VMDirectory) -> VNCService

    // MARK: - Initialization

    init(
        vmDirContext: VMDirContext,
        virtualizationServiceFactory: @escaping (VMVirtualizationServiceContext) throws ->
            VMVirtualizationService = { try DarwinVirtualizationService(configuration: $0) },
        vncServiceFactory: @escaping (VMDirectory) -> VNCService = {
            DefaultVNCService(vmDirectory: $0)
        }
    ) {
        self.vmDirContext = vmDirContext
        self.virtualizationServiceFactory = virtualizationServiceFactory
        self.vncServiceFactory = vncServiceFactory

        // Initialize VNC service
        self.vncService = vncServiceFactory(vmDirContext.dir)
    }

    // MARK: - VM State Management

    private var isRunning: Bool {
        // First check if we have a MAC address
        guard let macAddress = vmDirContext.config.macAddress else {
            Logger.info(
                "Cannot check if VM is running: macAddress is nil",
                metadata: ["name": vmDirContext.name])
            return false
        }

        // Then check if we have an IP address
        guard let ipAddress = DHCPLeaseParser.getIPAddress(forMAC: macAddress) else {
            return false
        }

        // Then check if it's reachable
        return NetworkUtils.isReachable(ipAddress: ipAddress)
    }

    var details: VMDetails {
        let isRunning: Bool = self.isRunning
        let vncUrl = isRunning ? getVNCUrl() : nil

        // Safely get disk size with fallback
        let diskSizeValue: DiskSize
        do {
            diskSizeValue = try getDiskSize()
        } catch {
            Logger.error(
                "Failed to get disk size",
                metadata: ["name": vmDirContext.name, "error": "\(error)"])
            // Provide a fallback value to avoid crashing
            diskSizeValue = DiskSize(allocated: 0, total: vmDirContext.config.diskSize ?? 0)
        }

        // Safely access MAC address
        let macAddress = vmDirContext.config.macAddress
        let ipAddress: String? =
            isRunning && macAddress != nil ? DHCPLeaseParser.getIPAddress(forMAC: macAddress!) : nil

        return VMDetails(
            name: vmDirContext.name,
            os: getOSType(),
            cpuCount: vmDirContext.config.cpuCount ?? 0,
            memorySize: vmDirContext.config.memorySize ?? 0,
            diskSize: diskSizeValue,
            display: vmDirContext.config.display.string,
            status: isRunning ? "running" : "stopped",
            vncUrl: vncUrl,
            ipAddress: ipAddress,
            locationName: vmDirContext.storage ?? "default"
        )
    }

    // MARK: - VM Lifecycle Management

    func run(
        noDisplay: Bool, sharedDirectories: [SharedDirectory], mount: Path?, vncPort: Int = 0,
        recoveryMode: Bool = false, usbMassStoragePaths: [Path]? = nil
    ) async throws {
        Logger.info(
            "VM.run method called",
            metadata: [
                "name": vmDirContext.name,
                "noDisplay": "\(noDisplay)",
                "recoveryMode": "\(recoveryMode)",
            ])

        guard vmDirContext.initialized else {
            Logger.error("VM not initialized", metadata: ["name": vmDirContext.name])
            throw VMError.notInitialized(vmDirContext.name)
        }

        guard let cpuCount = vmDirContext.config.cpuCount,
            let memorySize = vmDirContext.config.memorySize
        else {
            Logger.error("VM missing cpuCount or memorySize", metadata: ["name": vmDirContext.name])
            throw VMError.notInitialized(vmDirContext.name)
        }

        // Try to acquire lock on config file
        Logger.info(
            "Attempting to acquire lock on config file",
            metadata: [
                "path": vmDirContext.dir.configPath.path,
                "name": vmDirContext.name,
            ])
        var fileHandle = try FileHandle(forWritingTo: vmDirContext.dir.configPath.url)

        if flock(fileHandle.fileDescriptor, LOCK_EX | LOCK_NB) != 0 {
            try? fileHandle.close()
            Logger.error(
                "VM already running (failed to acquire lock)", metadata: ["name": vmDirContext.name]
            )

            // Try to forcibly clear the lock before giving up
            Logger.info("Attempting emergency lock cleanup", metadata: ["name": vmDirContext.name])
            unlockConfigFile()

            // Try one more time to acquire the lock
            if let retryHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url),
                flock(retryHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0
            {
                Logger.info("Emergency lock cleanup worked", metadata: ["name": vmDirContext.name])
                // Continue with a fresh file handle; closing retryHandle releases the
                // flock it briefly held, so the lock must be re-acquired below
                try? retryHandle.close()
                // Get a completely new file handle and re-acquire the lock on it
                guard
                    let newHandle = try? FileHandle(
                        forWritingTo: vmDirContext.dir.configPath.url),
                    flock(newHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0
                else {
                    throw VMError.internalError(
                        "Failed to re-acquire config lock after cleanup")
                }
                // Update our main file handle
                fileHandle = newHandle
            } else {
                // If we still can't get the lock, give up
                Logger.error(
                    "Could not acquire lock even after emergency cleanup",
                    metadata: ["name": vmDirContext.name])
                throw VMError.alreadyRunning(vmDirContext.name)
            }
        }
        Logger.info("Successfully acquired lock", metadata: ["name": vmDirContext.name])

        Logger.info(
            "Running VM with configuration",
            metadata: [
                "name": vmDirContext.name,
                "cpuCount": "\(cpuCount)",
                "memorySize": "\(memorySize)",
                "diskSize": "\(vmDirContext.config.diskSize ?? 0)",
                "sharedDirectories": sharedDirectories.map { $0.string }.joined(separator: ", "),
                "recoveryMode": "\(recoveryMode)",
            ])

        // Create and configure the VM
        do {
            Logger.info(
                "Creating virtualization service context", metadata: ["name": vmDirContext.name])
            let config = try createVMVirtualizationServiceContext(
                cpuCount: cpuCount,
                memorySize: memorySize,
                display: vmDirContext.config.display.string,
                sharedDirectories: sharedDirectories,
                mount: mount,
                recoveryMode: recoveryMode,
                usbMassStoragePaths: usbMassStoragePaths
            )
            Logger.info(
                "Successfully created virtualization service context",
                metadata: ["name": vmDirContext.name])

            Logger.info(
                "Initializing virtualization service", metadata: ["name": vmDirContext.name])
            virtualizationService = try virtualizationServiceFactory(config)
            Logger.info(
                "Successfully initialized virtualization service",
                metadata: ["name": vmDirContext.name])

            Logger.info(
                "Setting up VNC",
                metadata: [
                    "name": vmDirContext.name,
                    "noDisplay": "\(noDisplay)",
                    "port": "\(vncPort)",
                ])
            let vncInfo = try await setupSession(
                noDisplay: noDisplay, port: vncPort, sharedDirectories: sharedDirectories)
            Logger.info(
                "VNC setup successful", metadata: ["name": vmDirContext.name, "vncInfo": vncInfo])

            // Start the VM
            guard let service = virtualizationService else {
                Logger.error("Virtualization service is nil", metadata: ["name": vmDirContext.name])
                throw VMError.internalError("Virtualization service not initialized")
            }
            Logger.info(
                "Starting VM via virtualization service", metadata: ["name": vmDirContext.name])
            try await service.start()
            Logger.info("VM started successfully", metadata: ["name": vmDirContext.name])

            while true {
                try await Task.sleep(nanoseconds: UInt64(1e9))
            }
        } catch {
            Logger.error(
                "Failed in VM.run",
                metadata: [
                    "name": vmDirContext.name,
                    "error": error.localizedDescription,
                    "errorType": "\(type(of: error))",
                ])
            virtualizationService = nil
            vncService.stop()

            // Release lock
            Logger.info("Releasing file lock after error", metadata: ["name": vmDirContext.name])
            flock(fileHandle.fileDescriptor, LOCK_UN)
            try? fileHandle.close()

            // Additionally, perform our aggressive unlock to ensure no locks remain
            Logger.info(
                "Performing additional lock cleanup after error",
                metadata: ["name": vmDirContext.name])
            unlockConfigFile()

            throw error
        }
    }
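
    // Shutdown strategy (summary of the method below): try a clean stop through
    // the virtualization service first; otherwise find the PID holding the
    // config-file lock via lsof, send SIGINT, wait up to ~10 seconds, escalate to
    // SIGKILL, and always force-clear the config-file lock before returning or throwing.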

    @MainActor
    func stop() async throws {
        guard vmDirContext.initialized else {
            throw VMError.notInitialized(vmDirContext.name)
        }

        Logger.info("Attempting to stop VM", metadata: ["name": vmDirContext.name])

        // If we have a virtualization service, try to stop it cleanly first
        if let service = virtualizationService {
            do {
                Logger.info(
                    "Stopping VM via virtualization service", metadata: ["name": vmDirContext.name])
                try await service.stop()
                virtualizationService = nil
                vncService.stop()
                Logger.info(
                    "VM stopped successfully via virtualization service",
                    metadata: ["name": vmDirContext.name])

                // Try to ensure any existing locks are released
                Logger.info(
                    "Attempting to clear any locks on config file",
                    metadata: ["name": vmDirContext.name])
                unlockConfigFile()

                return
            } catch let error {
                Logger.error(
                    "Failed to stop VM via virtualization service",
                    metadata: [
                        "name": vmDirContext.name,
                        "error": error.localizedDescription,
                    ])
                // Fall through to process termination
            }
        }

        // Try to open config file to get file descriptor
        Logger.info(
            "Attempting to access config file lock",
            metadata: [
                "path": vmDirContext.dir.configPath.path,
                "name": vmDirContext.name,
            ])
        let fileHandle = try? FileHandle(forReadingFrom: vmDirContext.dir.configPath.url)
        guard let fileHandle = fileHandle else {
            Logger.info(
                "Failed to open config file - VM may not be running",
                metadata: ["name": vmDirContext.name])

            // Even though we couldn't open the file, try to force unlock anyway
            unlockConfigFile()

            throw VMError.notRunning(vmDirContext.name)
        }

        // Get the PID of the process holding the lock using lsof command
        Logger.info(
            "Finding process holding lock on config file", metadata: ["name": vmDirContext.name])
        let task = Process()
        task.executableURL = URL(fileURLWithPath: "/usr/sbin/lsof")
        task.arguments = ["-F", "p", vmDirContext.dir.configPath.path]

        let outputPipe = Pipe()
        task.standardOutput = outputPipe

        try task.run()
        task.waitUntilExit()

        let outputData = try outputPipe.fileHandleForReading.readToEnd() ?? Data()
        guard let outputString = String(data: outputData, encoding: .utf8),
            let pidString = outputString.split(separator: "\n").first?.dropFirst(),  // Drop the 'p' prefix
            let pid = pid_t(pidString)
        else {
            try? fileHandle.close()
            Logger.info(
                "Failed to find process holding lock - VM may not be running",
                metadata: ["name": vmDirContext.name])

            // Even though we couldn't find the process, try to force unlock
            unlockConfigFile()

            throw VMError.notRunning(vmDirContext.name)
        }

        Logger.info(
            "Found process \(pid) holding lock on config file",
            metadata: ["name": vmDirContext.name])

        // First try graceful shutdown with SIGINT
        if kill(pid, SIGINT) == 0 {
            Logger.info("Sent SIGINT to VM process \(pid)", metadata: ["name": vmDirContext.name])
        }

        // Wait for process to stop with timeout
        var attempts = 0
        while attempts < 10 {
            Logger.info(
                "Waiting for process \(pid) to terminate (attempt \(attempts + 1)/10)",
                metadata: ["name": vmDirContext.name])
            try await Task.sleep(nanoseconds: 1_000_000_000)

            // Check if process still exists
            if kill(pid, 0) != 0 {
                // Process is gone, do final cleanup
                Logger.info("Process \(pid) has terminated", metadata: ["name": vmDirContext.name])
                virtualizationService = nil
                vncService.stop()
                try? fileHandle.close()

                // Force unlock the config file
                unlockConfigFile()

                Logger.info(
                    "VM stopped successfully via process termination",
                    metadata: ["name": vmDirContext.name])
                return
            }
            attempts += 1
        }

        // If graceful shutdown failed, force kill the process
        Logger.info(
            "Graceful shutdown failed, forcing termination of process \(pid)",
            metadata: ["name": vmDirContext.name])
        if kill(pid, SIGKILL) == 0 {
            Logger.info("Sent SIGKILL to process \(pid)", metadata: ["name": vmDirContext.name])

            // Wait a moment for the process to be fully killed
            try await Task.sleep(nanoseconds: 2_000_000_000)

            // Do final cleanup
            virtualizationService = nil
            vncService.stop()
            try? fileHandle.close()

            // Force unlock the config file
            unlockConfigFile()

            Logger.info("VM forcefully stopped", metadata: ["name": vmDirContext.name])
            return
        }

        // If we get here, something went very wrong
        try? fileHandle.close()
        Logger.error(
            "Failed to stop VM - could not terminate process \(pid)",
            metadata: ["name": vmDirContext.name])

        // As a last resort, try to force unlock
        unlockConfigFile()

        throw VMError.internalError("Failed to stop VM process")
    }

    // Helper method to forcibly clear any locks on the config file
    private func unlockConfigFile() {
        Logger.info(
            "Forcibly clearing locks on config file",
            metadata: [
                "path": vmDirContext.dir.configPath.path,
                "name": vmDirContext.name,
            ])

        // First attempt: standard unlock methods
        if let fileHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) {
            // Use F_GETLK and F_SETLK to check and clear locks
            var lockInfo = flock()
            lockInfo.l_type = Int16(F_UNLCK)
            lockInfo.l_whence = Int16(SEEK_SET)
            lockInfo.l_start = 0
            lockInfo.l_len = 0

            // Try to unlock the file using fcntl
            _ = fcntl(fileHandle.fileDescriptor, F_SETLK, &lockInfo)

            // Also try the regular flock method
            flock(fileHandle.fileDescriptor, LOCK_UN)

            try? fileHandle.close()
            Logger.info("Standard unlock attempts performed", metadata: ["name": vmDirContext.name])
        }

        // Second attempt: try to acquire and immediately release a fresh lock
        if let tempHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) {
            if flock(tempHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0 {
                Logger.info(
                    "Successfully acquired and released lock to reset state",
                    metadata: ["name": vmDirContext.name])
                flock(tempHandle.fileDescriptor, LOCK_UN)
            } else {
                Logger.info(
                    "Could not acquire lock for resetting - may still be locked",
                    metadata: ["name": vmDirContext.name])
            }
            try? tempHandle.close()
        }

        // Third attempt (most aggressive): copy the config file, remove the original, and restore
        Logger.info(
            "Trying aggressive method: backup and restore config file",
            metadata: ["name": vmDirContext.name])
        // Only proceed if the config file exists
        let fileManager = FileManager.default
        let configPath = vmDirContext.dir.configPath.path
        let backupPath = configPath + ".backup"

        if fileManager.fileExists(atPath: configPath) {
            // Create a backup of the config file
            if let configData = try? Data(contentsOf: URL(fileURLWithPath: configPath)) {
                // Make backup
                try? configData.write(to: URL(fileURLWithPath: backupPath))

                // Remove the original file to clear all locks
                try? fileManager.removeItem(atPath: configPath)
                Logger.info(
                    "Removed original config file to clear locks",
                    metadata: ["name": vmDirContext.name])

                // Wait a moment for OS to fully release resources
                Thread.sleep(forTimeInterval: 0.1)

                // Restore from backup
                try? configData.write(to: URL(fileURLWithPath: configPath))
                Logger.info(
                    "Restored config file from backup", metadata: ["name": vmDirContext.name])
            } else {
                Logger.error(
                    "Could not read config file content for backup",
                    metadata: ["name": vmDirContext.name])
            }
        } else {
            Logger.info(
                "Config file does not exist, cannot perform aggressive unlock",
                metadata: ["name": vmDirContext.name])
        }

        // Final check
        if let finalHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) {
            let lockResult = flock(finalHandle.fileDescriptor, LOCK_EX | LOCK_NB)
            if lockResult == 0 {
                Logger.info(
                    "Lock successfully cleared - verified by acquiring test lock",
                    metadata: ["name": vmDirContext.name])
                flock(finalHandle.fileDescriptor, LOCK_UN)
            } else {
                Logger.info(
                    "Lock still present after all clearing attempts",
                    metadata: ["name": vmDirContext.name, "severity": "warning"])
            }
            try? finalHandle.close()
        }
    }

    // MARK: - Resource Management

    func updateVMConfig(vmConfig: VMConfig) throws {
        vmDirContext.config = vmConfig
        try vmDirContext.saveConfig()
    }

    private func getDiskSize() throws -> DiskSize {
        let resourceValues = try vmDirContext.diskPath.url.resourceValues(forKeys: [
            .totalFileAllocatedSizeKey,
            .totalFileSizeKey,
        ])

        guard let allocated = resourceValues.totalFileAllocatedSize,
            let total = resourceValues.totalFileSize
        else {
            throw VMConfigError.invalidDiskSize
        }

        return DiskSize(allocated: UInt64(allocated), total: UInt64(total))
    }

    func resizeDisk(_ newSize: UInt64) throws {
        let currentSize = try getDiskSize()

        guard newSize >= currentSize.total else {
            throw VMError.resizeTooSmall(current: currentSize.total, requested: newSize)
        }

        try setDiskSize(newSize)
    }

    func setCpuCount(_ newCpuCount: Int) throws {
        guard !isRunning else {
            throw VMError.alreadyRunning(vmDirContext.name)
        }
        vmDirContext.config.setCpuCount(newCpuCount)
        try vmDirContext.saveConfig()
    }

    func setMemorySize(_ newMemorySize: UInt64) throws {
        guard !isRunning else {
            throw VMError.alreadyRunning(vmDirContext.name)
        }
        vmDirContext.config.setMemorySize(newMemorySize)
        try vmDirContext.saveConfig()
    }

    func setDiskSize(_ newDiskSize: UInt64) throws {
        try vmDirContext.setDisk(newDiskSize)
        vmDirContext.config.setDiskSize(newDiskSize)
        try vmDirContext.saveConfig()
    }

    func setDisplay(_ newDisplay: String) throws {
        guard !isRunning else {
            throw VMError.alreadyRunning(vmDirContext.name)
        }
        guard let display: VMDisplayResolution = VMDisplayResolution(string: newDisplay) else {
            throw VMError.invalidDisplayResolution(newDisplay)
        }
        vmDirContext.config.setDisplay(display)
        try vmDirContext.saveConfig()
    }

    func setHardwareModel(_ newHardwareModel: Data) throws {
        guard !isRunning else {
            throw VMError.alreadyRunning(vmDirContext.name)
        }
        vmDirContext.config.setHardwareModel(newHardwareModel)
        try vmDirContext.saveConfig()
    }

    func setMachineIdentifier(_ newMachineIdentifier: Data) throws {
        guard !isRunning else {
            throw VMError.alreadyRunning(vmDirContext.name)
        }
        vmDirContext.config.setMachineIdentifier(newMachineIdentifier)
        try vmDirContext.saveConfig()
    }

    func setMacAddress(_ newMacAddress: String) throws {
        guard !isRunning else {
            throw VMError.alreadyRunning(vmDirContext.name)
        }
        vmDirContext.config.setMacAddress(newMacAddress)
        try vmDirContext.saveConfig()
    }

    // MARK: - VNC Management

    func getVNCUrl() -> String? {
        return vncService.url
    }

    /// Sets up the VNC service and returns the VNC URL
    private func startVNCService(port: Int = 0) async throws -> String {
        guard let service = virtualizationService else {
            throw VMError.internalError("Virtualization service not initialized")
        }

        try await vncService.start(port: port, virtualMachine: service.getVirtualMachine())

        guard let url = vncService.url else {
            throw VMError.vncNotConfigured
        }

        return url
    }

    /// Saves the session information including shared directories to disk
    private func saveSessionData(url: String, sharedDirectories: [SharedDirectory]) {
        do {
            let session = VNCSession(
                url: url, sharedDirectories: sharedDirectories.isEmpty ? nil : sharedDirectories)
            try vmDirContext.dir.saveSession(session)
            Logger.info(
                "Saved VNC session with shared directories",
                metadata: [
                    "count": "\(sharedDirectories.count)",
                    "dirs": "\(sharedDirectories.map { $0.hostPath }.joined(separator: ", "))",
                    "sessionsPath": "\(vmDirContext.dir.sessionsPath.path)",
                ])
        } catch {
            Logger.error("Failed to save VNC session", metadata: ["error": "\(error)"])
        }
    }

    /// Main session setup method that handles VNC and persists session data
    private func setupSession(
        noDisplay: Bool, port: Int = 0, sharedDirectories: [SharedDirectory] = []
    ) async throws -> String {
        // Start the VNC service and get the URL
        let url = try await startVNCService(port: port)

        // Save the session data
        saveSessionData(url: url, sharedDirectories: sharedDirectories)

        // Open the VNC client if needed
        if !noDisplay {
            Logger.info("Starting VNC session", metadata: ["name": vmDirContext.name])
            try await vncService.openClient(url: url)
        }

        return url
    }

    // MARK: - Platform-specific Methods

    func getOSType() -> String {
        fatalError("Must be implemented by subclass")
    }

    func createVMVirtualizationServiceContext(
        cpuCount: Int,
        memorySize: UInt64,
        display: String,
        sharedDirectories: [SharedDirectory] = [],
        mount: Path? = nil,
        recoveryMode: Bool = false,
        usbMassStoragePaths: [Path]? = nil
    ) throws -> VMVirtualizationServiceContext {
        // This is a diagnostic log to track actual file paths on disk for debugging
        try validateDiskState()

        return VMVirtualizationServiceContext(
            cpuCount: cpuCount,
            memorySize: memorySize,
            display: display,
            sharedDirectories: sharedDirectories,
            mount: mount,
            hardwareModel: vmDirContext.config.hardwareModel,
            machineIdentifier: vmDirContext.config.machineIdentifier,
            macAddress: vmDirContext.config.macAddress!,
            diskPath: vmDirContext.diskPath,
            nvramPath: vmDirContext.nvramPath,
            recoveryMode: recoveryMode,
            usbMassStoragePaths: usbMassStoragePaths
        )
    }

    /// Validates the disk state to help diagnose storage attachment issues
    private func validateDiskState() throws {
        // Check disk image state
        let diskPath = vmDirContext.diskPath.path
        let diskExists = FileManager.default.fileExists(atPath: diskPath)
        var diskSize: UInt64 = 0
        var diskPermissions = ""

        if diskExists {
            if let attrs = try? FileManager.default.attributesOfItem(atPath: diskPath) {
                diskSize = attrs[.size] as? UInt64 ?? 0
                let posixPerms = attrs[.posixPermissions] as? Int ?? 0
                diskPermissions = String(format: "%o", posixPerms)
            }
        }

        // Check disk container directory permissions
        let diskDir = (diskPath as NSString).deletingLastPathComponent
        let dirPermsString: String
        if let dirAttrs = try? FileManager.default.attributesOfItem(atPath: diskDir),
            let dirPerms = dirAttrs[.posixPermissions] as? Int
        {
            dirPermsString = String(format: "%o", dirPerms)
        } else {
            dirPermsString = "unknown"
        }

        // Log detailed diagnostics
        Logger.info(
            "Validating VM disk state",
            metadata: [
                "diskPath": diskPath,
                "diskExists": "\(diskExists)",
                "diskSize":
                    "\(ByteCountFormatter.string(fromByteCount: Int64(diskSize), countStyle: .file))",
                "diskPermissions": diskPermissions,
                "dirPermissions": dirPermsString,
                "locationName": vmDirContext.storage ?? "default",
            ])

        if !diskExists {
            Logger.error("VM disk image does not exist", metadata: ["diskPath": diskPath])
        } else if diskSize == 0 {
            Logger.error("VM disk image exists but has zero size", metadata: ["diskPath": diskPath])
        }
    }

    func setup(
        ipswPath: String,
        cpuCount: Int,
        memorySize: UInt64,
        diskSize: UInt64,
        display: String
    ) async throws {
        fatalError("Must be implemented by subclass")
    }

    // MARK: - Finalization

    /// Post-installation step to move the VM directory to the home directory
    func finalize(to name: String, home: Home, storage: String? = nil) throws {
        let vmDir = try home.getVMDirectory(name, storage: storage)
        try FileManager.default.moveItem(at: vmDirContext.dir.dir.url, to: vmDir.dir.url)
    }

    // Method to run VM with additional USB mass storage devices
    func runWithUSBStorage(
        noDisplay: Bool, sharedDirectories: [SharedDirectory], mount: Path?, vncPort: Int = 0,
        recoveryMode: Bool = false, usbImagePaths: [Path]
    ) async throws {
        guard vmDirContext.initialized else {
            throw VMError.notInitialized(vmDirContext.name)
        }

        guard let cpuCount = vmDirContext.config.cpuCount,
            let memorySize = vmDirContext.config.memorySize
        else {
            throw VMError.notInitialized(vmDirContext.name)
        }

        // Try to acquire lock on config file
        let fileHandle = try FileHandle(forWritingTo: vmDirContext.dir.configPath.url)
        guard flock(fileHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0 else {
            try? fileHandle.close()
            throw VMError.alreadyRunning(vmDirContext.name)
        }

        Logger.info(
            "Running VM with USB storage devices",
            metadata: [
                "cpuCount": "\(cpuCount)",
                "memorySize": "\(memorySize)",
                "diskSize": "\(vmDirContext.config.diskSize ?? 0)",
                "usbImageCount": "\(usbImagePaths.count)",
                "recoveryMode": "\(recoveryMode)",
            ])

        // Create and configure the VM
        do {
            let config = try createVMVirtualizationServiceContext(
                cpuCount: cpuCount,
                memorySize: memorySize,
                display: vmDirContext.config.display.string,
                sharedDirectories: sharedDirectories,
                mount: mount,
                recoveryMode: recoveryMode,
                usbMassStoragePaths: usbImagePaths
            )
            virtualizationService = try virtualizationServiceFactory(config)

            let vncInfo = try await setupSession(
                noDisplay: noDisplay, port: vncPort, sharedDirectories: sharedDirectories)
            Logger.info("VNC info", metadata: ["vncInfo": vncInfo])

            // Start the VM
            guard let service = virtualizationService else {
                throw VMError.internalError("Virtualization service not initialized")
            }
            try await service.start()

            while true {
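                // Keep this task alive; the VM continues running until it is stopped externally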
                try await Task.sleep(nanoseconds: UInt64(1e9))
            }
        } catch {
            Logger.error(
                "Failed to create/start VM with USB storage",
                metadata: [
                    "error": "\(error)",
                    "errorType": "\(type(of: error))",
                ])
            virtualizationService = nil
            vncService.stop()
            // Release lock
            flock(fileHandle.fileDescriptor, LOCK_UN)
            try? fileHandle.close()
            throw error
        }
    }
}

```

--------------------------------------------------------------------------------
/blog/neurips-2025-cua-papers.md:
--------------------------------------------------------------------------------

```markdown
# NeurIPS 2025: 45 Computer-Use Agent Papers You Should Know About

<img alt="neurips" src="https://github.com/user-attachments/assets/bd649067-bb2c-45f4-827b-087021ec3ad7" />

If you're following the computer-use agent space, you already know that NeurIPS is where the most important work gets presented. But with thousands of papers across every area of machine learning, finding the ones relevant to CUAs means hours of filtering through proceedings, skimming abstracts, and hoping you don't miss something important.

We did that work for you. We're excited to announce that **Cua will be at NeurIPS 2025**, and we've compiled a curated list of **45 papers** focused specifically on Computer-Use Agents—covering benchmarks, safety, grounding, visual reasoning, and agent architectures.

## Why This Matters

Computer-use agents are evolving rapidly. This year's NeurIPS showcases several important developments:

**The benchmark landscape is maturing.** We're seeing comprehensive evaluations across macOS (macOSWorld), professional tools (VideoCAD), and real-world websites (REAL, TheAgentCompany). These aren't toy problems anymore—they're measuring what agents can actually do in production environments.

**Safety is becoming a first-class concern.** Multiple papers (OS-Harm, RiOSWorld, WASP, AgentDAM) are systematically documenting how agents fail when confronted with adversarial inputs, privacy requirements, or misuse scenarios. The findings are sobering: even frontier models often comply with harmful requests.

**Grounding remains the bottleneck.** Papers like GUI-Actor, GUI-G1, and SE-GUI are pushing the state of the art on mapping language to UI actions. The best approaches are achieving significant gains with surprisingly small models and datasets.

**Open-source is catching up.** OpenCUA's 72B model hits 45% on OSWorld-Verified, establishing that community-driven development can compete with proprietary systems.

## Highlights Worth Your Attention

A few papers stand out for their immediate relevance to anyone building or deploying computer-use agents:

- **macOSWorld** reveals a dramatic capability gap: proprietary agents achieve 30%+ success on macOS tasks while open-source models struggle below 5%.
- **TheAgentCompany** simulates a software company where agents browse, code, and communicate. The best agent completes 30% of tasks autonomously.
- **WASP** demonstrates that simple prompt injections deceive top-tier models in 86% of cases.
- **GUI-G1** shows that a 3B model can achieve 90.3% on ScreenSpot by fixing issues with chain-of-thought reasoning.

## Summary Statistics

| Category                       | Count |
| ------------------------------ | ----- |
| Benchmarks & Datasets          | 18    |
| Safety & Security              | 12    |
| Grounding & Visual Reasoning   | 14    |
| Agent Architectures & Training | 11    |
| Adversarial Attacks            | 8     |

**Total Papers:** 45 (several papers span more than one category, so the counts above sum to more than 45)

## Meet Us at NeurIPS

We'll be at NeurIPS in San Diego. If you're working on computer-use agents, building applications on top of CUA infrastructure, or just curious about where this space is heading, we'd love to connect.

- **Book a Meeting**: [cal.com/cua/neurips-slot](https://cal.com/cua/neurips-slot)
- **X/Twitter**: [@trycua](https://x.com/trycua)
- **Discord**: [discord.gg/cua-ai](https://discord.gg/cua-ai)

---

# The Papers

## 1. macOSWorld: A Multilingual Interactive Benchmark for GUI Agents

**Summary:** The first comprehensive benchmark for evaluating GUI agents on macOS. Features 202 multilingual interactive tasks across 30 applications (28 macOS-exclusive), with support for 5 languages (English, Chinese, Arabic, Japanese, Russian). Reveals a dramatic gap: proprietary agents achieve 30%+ success rate while open-source models lag below 5%. Also includes safety benchmarking for deception attacks.

**Key Findings:**

- Proprietary computer-use agents lead at above 30% success rate
- Open-source lightweight models struggle below 5%, highlighting need for macOS domain adaptation
- Multilingual benchmarks expose weaknesses, especially in Arabic (28.8% degradation vs English)
- Deception attacks are a general vulnerability requiring immediate attention

**Poster:** https://neurips.cc/virtual/2025/poster/117427

---

## 2. OS-Harm: A Benchmark for Measuring Safety of Computer Use Agents

**Summary:** A comprehensive safety benchmark built on OSWorld for testing computer-use agents across three harm categories: deliberate user misuse, prompt injection attacks, and model misbehavior. Includes 150 tasks spanning harassment, copyright infringement, disinformation, data exfiltration, and more. Proposes an automated judge achieving high agreement with human annotations (0.76-0.79 F1 score).

**Key Findings:**

- All tested models (o4-mini, Claude 3.7 Sonnet, Gemini 2.5 Pro) tend to directly comply with many deliberate misuse queries
- Models are relatively vulnerable to static prompt injections
- Models occasionally perform unsafe actions without explicit malicious prompts

**Poster:** https://neurips.cc/virtual/2025/loc/san-diego/poster/121772

---

## 3. OpenCUA: Open Foundations for Computer-Use Agents

**Summary:** A comprehensive open-source framework for scaling computer-use agent data and foundation models. Introduces AgentNet, the first large-scale computer-use task dataset spanning 3 operating systems and 200+ applications/websites. OpenCUA-72B achieves 45% success rate on OSWorld-Verified, establishing new state-of-the-art among open-source models.

**Key Contributions:**

- Annotation infrastructure for capturing human computer-use demonstrations
- AgentNet: large-scale dataset across 3 OSes and 200+ apps
- Scalable pipeline transforming demonstrations into state-action pairs with reflective Chain-of-Thought reasoning
- Models generalize well across domains and benefit from increased test-time computation

**Poster:** https://neurips.cc/virtual/2025/poster/119771

---

## 4. Mind2Web 2: Evaluating Agentic Search with Agent-as-a-Judge

**Summary:** A benchmark of 130 realistic, high-quality, long-horizon tasks for agentic search systems (like Deep Research), requiring real-time web browsing and extensive information synthesis. Constructed with 1000+ hours of human labor. Introduces Agent-as-a-Judge framework using tree-structured rubric design for automated evaluation.

**Key Findings:**

- OpenAI Deep Research achieves 50-70% of human performance while spending half the time
- First systematic evaluation of ten frontier agentic search systems vs. human performance
- Addresses the challenge of evaluating time-varying, complex answers

**Poster:** https://neurips.cc/virtual/2025/poster/121798

---

## 5. Scaling Computer-Use Grounding via User Interface Decomposition and Synthesis

**Summary:** Addresses GUI grounding—mapping natural language to specific UI actions—as a critical bottleneck in agent development. Introduces OSWorld-G benchmark (564 annotated samples) and Jedi dataset (4 million synthetic examples), the largest computer-use grounding dataset. Improved grounding directly enhances agentic capabilities, boosting OSWorld performance from 23% to 51%.

**Key Contributions:**

- OSWorld-G: comprehensive benchmark for diverse grounding tasks (text matching, element recognition, layout understanding, precise manipulation)
- Jedi: 4M examples through multi-perspective task decoupling
- Demonstrates compositional generalization to novel interfaces

**Poster:** https://neurips.cc/virtual/2025/poster/121759

---

## 6. RiOSWorld: Benchmarking the Risk of Multimodal Computer-Use Agents

**Summary:** Evaluates potential safety risks of MLLM-based agents during real-world computer manipulation. Features 492 risky tasks spanning web, social media, multimedia, OS, email, and office software. Categorizes risks into user-originated and environmental risks, evaluating both risk goal intention and completion.

**Key Findings:**

- Current computer-use agents face significant safety risks in real-world scenarios
- Safety principles designed for dialogue scenarios don't transfer well to computer-use
- Highlights necessity and urgency of safety alignment for computer-use agents

**Poster:** https://neurips.cc/virtual/2025/poster/117273

---

## 7. REAL: Benchmarking Autonomous Agents on Deterministic Simulations of Real Websites

**Summary:** A benchmark featuring high-fidelity, deterministic replicas of 11 widely-used websites across e-commerce, travel, communication, and professional networking. Contains 112 practical tasks requiring both information retrieval and state-changing actions. Enables reproducible evaluation without safety risks.

**Key Findings:**

- Best frontier language models achieve only 41% success rate
- Highlights critical gaps in autonomous web navigation and task completion
- Supports scalable post-training data generation

**Poster:** https://neurips.cc/virtual/2025/poster/121619

---

## 8. SE-GUI: Enhancing Visual Grounding for GUI Agents via Self-Evolutionary Reinforcement Learning

**Summary:** An RL-based framework for GUI grounding incorporating seed data curation, dense policy gradients, and self-evolutionary reinforcement finetuning using attention maps. With only 3K training samples, the 7B model achieves state-of-the-art on three grounding benchmarks, outperforming UI-TARS-72B by 24.2% on ScreenSpot-Pro.

**Key Results:**

- 47.3% accuracy on ScreenSpot-Pro with a 7B model
- Outperforms 72B models with a fraction of the training data
- Demonstrates effectiveness of RL for high-resolution, complex environments

**Poster:** https://neurips.cc/virtual/2025/poster/118788

---

## 9. TRAP: Targeted Redirecting of Agentic Preferences

**Summary:** A generative adversarial framework that manipulates agent decision-making using diffusion-based semantic injections. Combines negative prompt degradation with positive semantic optimization. Without model access, produces visually natural images that induce consistent decision biases in agents.

**Key Findings:**

- Consistently induces decision-level preference redirection on LLaVA-34B, Gemma3, GPT-4o, and Mistral-3.2
- Outperforms baselines (SPSA, Bandit, standard diffusion)
- Exposes vulnerability: autonomous agents can be misled through visually subtle, semantically-guided manipulations

**Poster:** https://neurips.cc/virtual/2025/poster/117547

---

## 10. TheAgentCompany: Benchmarking LLM Agents on Consequential Real World Tasks

**Summary:** An extensible benchmark simulating a small software company environment where AI agents interact like digital workers: browsing the web, writing code, running programs, and communicating with coworkers. Tests agents on real professional tasks with important implications for industry adoption and labor market effects.

**Key Findings:**

- Best agent achieves 30% autonomous task completion
- Simpler tasks are solvable autonomously
- More difficult long-horizon tasks remain beyond current systems' reach

**Poster:** https://neurips.cc/virtual/2025/poster/121705

---

## 11. VideoGameQA-Bench: Evaluating Vision-Language Models for Video Game Quality Assurance

**Summary:** A comprehensive benchmark for VLMs in video game QA, encompassing visual unit testing, visual regression testing, needle-in-a-haystack challenges, glitch detection, and bug report generation for both images and videos. Addresses the need for standardized benchmarks in this labor-intensive domain.

**Key Focus:**

- First benchmark specifically designed for video game QA with VLMs
- Covers wide range of QA activities across images and videos
- Addresses lack of automation in game development workflows

**Poster:** https://neurips.cc/virtual/2025/poster/121740

---

## 12. WASP: Benchmarking Web Agent Security Against Prompt Injection Attacks

**Summary:** End-to-end benchmark for evaluating web agent security against prompt injection attacks. Tests realistic scenarios where even simple, low-effort human-written injections can deceive top-tier AI models including those with advanced reasoning.

**Key Findings:**

- Attacks partially succeed in up to 86% of cases
- State-of-the-art agents often struggle to fully complete attacker goals
- Reveals "security by incompetence"—agents' limitations sometimes prevent full attack success

**Poster:** https://neurips.cc/virtual/2025/poster/121728

---

## 13. AgentDAM: Privacy Leakage Evaluation for Autonomous Web Agents

**Summary:** Measures whether AI web-navigation agents follow the privacy principle of "data minimization"—using sensitive information only when truly necessary to complete a task. Simulates realistic web interaction scenarios end-to-end.

**Key Findings:**

- Agents built on GPT-4, Llama-3, and Claude are prone to inadvertent use of unnecessary sensitive information
- Proposes prompting-based defense that reduces information leakage
- End-to-end benchmarking provides more realistic measure than probing LLMs about privacy

**Poster:** https://neurips.cc/virtual/2025/poster/121443

---

## 14. Embodied Web Agents: Bridging Physical-Digital Realms for Integrated Agent Intelligence

**Summary:** A novel paradigm for AI agents that fluidly bridge embodiment and web-scale reasoning. Creates unified simulation integrating realistic 3D indoor/outdoor environments with functional web interfaces. Tasks include cooking from online recipes, navigating with dynamic map data, and interpreting landmarks using web knowledge.

**Key Contributions:**

- Unified platform combining 3D environments with web interfaces
- Benchmark spanning cooking, navigation, shopping, tourism, and geolocation
- Reveals significant performance gaps between AI systems and humans

**Poster:** https://neurips.cc/virtual/2025/poster/121809

---

## 15. VideoCAD: A Dataset and Model for Learning Long-Horizon 3D CAD UI Interactions from Video

**Summary:** The first attempt to model UI interactions for precision engineering tasks. Features 41K+ annotated video recordings of CAD operations with time horizons up to 20x longer than existing datasets. Proposes VideoCADFormer for learning CAD interactions directly from video.

**Key Contributions:**

- Large-scale synthetic dataset for CAD UI interactions
- VQA benchmark for evaluating spatial reasoning and video understanding
- Reveals challenges in precise action grounding and long-horizon dependencies

**Poster:** https://neurips.cc/virtual/2025/poster/121820

---

## 16. Look Before You Leap: A GUI-Critic-R1 Model for Pre-Operative Error Diagnosis

**Summary:** Introduces a pre-operative critic mechanism that provides feedback before action execution by reasoning about potential outcomes. Proposes Suggestion-aware Group Relative Policy Optimization (S-GRPO) for building the GUI-Critic-R1 model with fully automated data generation.

**Key Results:**

- Significant advantages in critic accuracy compared to current MLLMs
- Improved success rates and operational efficiency on GUI automation benchmarks
- Works across both mobile and web domains

**Poster:** https://neurips.cc/virtual/2025/poster/115566

---

## 17. Grounded Reinforcement Learning for Visual Reasoning (ViGoRL)

**Summary:** A vision-language model trained with RL to explicitly anchor each reasoning step to specific visual coordinates. Introduces multi-turn RL framework enabling dynamic zooming into predicted coordinates during reasoning.

**Key Results:**

- 86.4% on V\*Bench for visual search
- Outperforms supervised fine-tuning and conventional RL across spatial reasoning, visual search, and web-based grounding
- Grounding amplifies region exploration, subgoal setting, and visual verification

**Poster:** https://neurips.cc/virtual/2025/poster/120218

---

## 18. GUI-Actor: Coordinate-Free Visual Grounding for GUI Agents

**Summary:** A VLM-based method for coordinate-free GUI grounding using an attention-based action head. Enables proposing one or more action regions in a single forward pass with a grounding verifier for selection.

**Key Results:**

- GUI-Actor-7B achieves 44.6 on ScreenSpot-Pro with Qwen2.5-VL, outperforming UI-TARS-72B (38.1)
- Improved generalization to unseen resolutions and layouts
- Fine-tuning only ~100M parameters achieves SOTA performance

**Poster:** https://neurips.cc/virtual/2025/poster/119841

---

## 19. GUI-G1: Understanding R1-Zero-Like Training for Visual Grounding in GUI Agents

**Summary:** Extensive analysis of the R1-Zero paradigm (online RL + chain-of-thought reasoning) for GUI grounding. Identifies issues: longer reasoning chains lead to worse performance, reward hacking via box size exploitation, and overfitting easy examples.

**Solutions Proposed:**

- Fast Thinking Template for direct answer generation
- Box size constraint in reward function
- Difficulty-aware scaling in RL objective

**Key Results:**

- GUI-G1-3B achieves 90.3% on ScreenSpot and 37.1% on ScreenSpot-Pro
- Outperforms larger UI-TARS-7B with only 3B parameters

**Poster:** https://neurips.cc/virtual/2025/poster/120227

---

## 20. GUI-Reflection: Empowering Multimodal GUI Models with Self-Reflection Behavior

**Summary:** Framework integrating self-reflection and error correction into end-to-end multimodal GUI models through GUI-specific pre-training, offline SFT, and online reflection tuning. Enables self-reflection emergence with fully automated data generation.

**Key Contributions:**

- Scalable pipelines for automatic reflection/correction data from successful trajectories
- GUI-Reflection Task Suite for reflection-oriented abilities
- Diverse environment for online training on mobile devices
- Iterative online reflection tuning algorithm

**Poster:** https://neurips.cc/virtual/2025/poster/115826

---

## 21. InfantAgent-Next: A Multimodal Generalist Agent for Automated Computer Interaction

**Summary:** A generalist agent capable of multimodal computer interaction (text, images, audio, video). Integrates tool-based and pure vision agents within a highly modular architecture, enabling collaborative step-by-step task solving.

**Key Results:**

- 7.27-point accuracy gain over Claude-Computer-Use on OSWorld
- Evaluated on pure vision benchmarks (OSWorld), general benchmarks (GAIA), and tool-intensive benchmarks (SWE-Bench)
- Demonstrates value of modular, collaborative agent architecture

**Poster:** https://neurips.cc/virtual/2025/poster/118379

---

## 22. AdvEDM: Fine-grained Adversarial Attack against VLM-based Embodied Agents

**Summary:** A fine-grained adversarial attack framework that modifies VLM perception of only key objects while preserving semantics of remaining regions. Unlike broad semantic disruption, this targeted approach reduces conflicts with task context, making VLMs output valid but incorrect decisions that affect agent actions in the physical world.

**Key Contributions:**

- AdvEDM-R: removes semantics of specific objects from images
- AdvEDM-A: adds semantics of new objects into images
- Demonstrates fine-grained control with excellent attack performance in embodied decision-making tasks

**Poster:** https://neurips.cc/virtual/2025/poster/116436

---

## 23. BLINK-Twice: A Reasoning Benchmark on Visual Perception

**Summary:** A vision-centric reasoning benchmark grounded in challenging perceptual tasks. Unlike prior benchmarks, it moves beyond shallow perception ("see") to require fine-grained observation and analytical reasoning ("observe"). Features natural adversarial image pairs and annotated reasoning chains for process evaluation.

**Key Findings:**

- Tests 20 leading MLLMs including 12 foundation models and 8 reasoning-enhanced models
- Existing reasoning strategies (chain-of-thought, self-criticism) result in unstable and redundant reasoning
- Repeated image observation improves performance across models
- Active visual interaction (as in o3) highlights need for new vision reasoning paradigm

**Poster:** https://neurips.cc/virtual/2025/poster/121522

---

## 24. BadVLA: Backdoor Attacks on Vision-Language-Action Models

**Summary:** First systematic investigation of backdoor vulnerabilities in VLA models. Proposes Objective-Decoupled Optimization with two stages: explicit feature-space separation to isolate trigger representations, and conditional control deviations activated only by triggers.

**Key Findings:**

- Consistently achieves near-100% attack success rates with minimal impact on clean task accuracy
- Robust against common input perturbations, task transfers, and model fine-tuning
- Exposes critical security vulnerabilities in current VLA deployments under Training-as-a-Service paradigm

**Poster:** https://neurips.cc/virtual/2025/poster/115803

---

## 25. Benchmarking Egocentric Multimodal Goal Inference for Assistive Wearable Agents

**Summary:** Benchmark for proactively inferring user goals from multimodal contextual observations for wearable assistant agents (smart glasses). Dataset comprises ~30 hours from 363 participants across 3,482 recordings with visual, audio, digital, and longitudinal context.

**Key Findings:**

- Humans achieve 93% MCQ accuracy; best VLM reaches ~84%
- For open-ended generation, best models produce relevant goals only ~57% of the time
- Smaller models (suited for wearables) achieve ~49% accuracy
- Models benefit from relevant modalities but struggle with noisy ones

**Poster:** https://neurips.cc/virtual/2025/poster/121655

---

## 26. GAM-Agent: Game-Theoretic Multi-Agent Framework for Visual Reasoning

**Summary:** A game-theoretic multi-agent framework formulating reasoning as a non-zero-sum game between base agents (visual perception specialists) and a critical agent (logic/fact verification). Features uncertainty-aware controller for dynamic agent collaboration with multi-round debates.

**Key Results:**

- Boosts small-to-mid scale models (Qwen2.5-VL-7B, InternVL3-14B) by 5-6%
- Enhances strong models like GPT-4o by 2-3%
- Modular, scalable, and generalizable framework

**Poster:** https://neurips.cc/virtual/2025/poster/119144

---

## 27. GRIT: Teaching MLLMs to Think with Images

**Summary:** Introduces Grounded Reasoning with Images and Texts—a method for training MLLMs to generate reasoning chains interleaving natural language with explicit bounding box coordinates. Uses GRPO-GR reinforcement learning with rewards focused on answer accuracy and grounding format.

**Key Contributions:**

- Exceptional data efficiency: requires as few as 20 image-question-answer triplets
- Successfully unifies reasoning and grounding abilities
- Eliminates need for reasoning chain annotations or explicit bounding box labels

**Poster:** https://neurips.cc/virtual/2025/poster/118020

---

## 28. Safe RLHF-V: Safe Reinforcement Learning from Multi-modal Human Feedback

**Summary:** First multimodal safety alignment framework. Introduces BeaverTails-V (first dataset with dual preference annotations for helpfulness and safety), and Beaver-Guard-V (multi-level guardrail system defending against unsafe queries and adversarial attacks).

**Key Results:**

- The guard model improves the precursor model's safety by an average of 40.9% over five filtering rounds
- Safe RLHF-V enhances model safety by 34.2% and helpfulness by 34.3%
- First exploration of multi-modal safety alignment within constrained optimization

**Poster:** https://neurips.cc/virtual/2025/poster/118304

---

## 29. Dropout Decoding: Uncertainty-Guided Token Dropout for LVLM Reliability

**Summary:** An inference-time approach that quantifies visual token uncertainty and selectively masks uncertain tokens. Decomposes uncertainty into aleatoric and epistemic components, focusing on epistemic uncertainty for perception-related errors.

**Key Results:**

- Significantly reduces object hallucinations
- Enhances reliability and quality of LVLM outputs across diverse visual contexts
- Validated on CHAIR, THRONE, and MMBench benchmarks

**Poster:** https://neurips.cc/virtual/2025/poster/118572

---

## 30. FOCUS: Unified Vision-Language Modeling for Interactive Editing

**Summary:** A unified LVLM integrating segmentation-aware perception and controllable object-centric generation. Uses dual-branch visual encoder for global semantic context and fine-grained spatial details, with MoVQGAN-based visual tokenizer for discrete visual tokens.

**Key Contributions:**

- Progressive multi-stage training pipeline
- Segmentation masks jointly optimized as spatial condition prompts
- Bridges segmentation-aware perception with fine-grained visual synthesis

**Poster:** https://neurips.cc/virtual/2025/poster/119062

---

## 31. Fine-Grained Preference Optimization for Spatial Reasoning (SpatialReasoner-R1)

**Summary:** Introduces Multi-Model Monte Carlo Tree Search (M3CTS) for generating diverse Long Chain-of-Thought reasoning trajectories. Proposes fine-grained Direct Preference Optimization (fDPO) with segment-specific preference granularity guided by spatial reward mechanism.

**Key Results:**

- fDPO achieves 4.1% and 9.0% gains over standard DPO on spatial quality and quantity tasks
- SpatialReasoner-R1 sets new SOTA on SpatialRGPT-Bench, outperforming strongest baseline by 9.8%
- Maintains competitive performance on general vision-language tasks

**Poster:** https://neurips.cc/virtual/2025/poster/118573

---

## 32. Reason-RFT: Reinforcement Fine-Tuning for Visual Reasoning

**Summary:** A two-stage reinforcement fine-tuning framework: SFT with curated Chain-of-Thought data activates reasoning potential, followed by RL based on Group Relative Policy Optimization (GRPO) for domain shift adaptability.

**Key Advantages:**

- State-of-the-art results outperforming both open-source and proprietary models
- Robust performance under domain shifts across various tasks
- Excellent data efficiency in few-shot learning scenarios

**Poster:** https://neurips.cc/virtual/2025/poster/118345

---

## 33. Safe + Safe = Unsafe? Exploiting Safe Images to Jailbreak LVLMs

**Summary:** Reveals that safe images can be exploited for jailbreaking when combined with additional safe images and prompts, exploiting LVLMs' universal reasoning capabilities and safety snowball effect. Proposes Safety Snowball Agent (SSA) framework.

**Key Findings:**

- SSA can use nearly any image to induce LVLMs to produce unsafe content
- Achieves high jailbreak success rates against latest LVLMs
- Exploits inherent LVLM properties rather than alignment flaws

**Poster:** https://neurips.cc/virtual/2025/loc/san-diego/poster/116422

---

## 34. MIP against Agent: Malicious Image Patches Hijacking Multimodal OS Agents

**Summary:** Uncovers novel attack vector: Malicious Image Patches (MIPs)—adversarially perturbed screen regions that induce OS agents to perform harmful actions. MIPs can be embedded in wallpapers or shared on social media to exfiltrate sensitive data.

**Key Findings:**

- MIPs generalize across user prompts and screen configurations
- Can hijack multiple OS agents during execution of benign instructions
- Exposes critical security vulnerabilities requiring attention before widespread deployment

**Poster:** https://neurips.cc/virtual/2025/loc/san-diego/poster/117813

---

## 35. CogVLA: Cognition-Aligned Vision-Language-Action Models

**Summary:** A framework leveraging instruction-driven routing and sparsification for VLA efficiency. Features 3-stage progressive architecture inspired by human multimodal coordination: Encoder-FiLM Aggregation Routing, LLM-FiLM Pruning Routing, and V-L-A Coupled Attention.

**Key Results:**

- 97.4% success rate on LIBERO benchmark, 70.0% on real-world robotic tasks
- Reduces training costs by 2.5x and inference latency by 2.8x compared to OpenVLA
- Achieves state-of-the-art performance

**Poster:** https://neurips.cc/virtual/2025/poster/119023

---

## 36. Succeed or Learn Slowly (SoLS): Sample Efficient RL for Mobile App Control

**Summary:** Novel off-policy RL algorithm applying direct policy updates for positive samples and conservative, regularized updates for negative ones. Augmented with Successful Transition Replay (STR) for prioritizing successful interactions.

**Key Results:**

- At least 17% relative increase over existing methods on AndroidWorld benchmark
- Substantially fewer computational resources than GPT-4o-based methods
- 5-60x faster inference

**Poster:** https://neurips.cc/virtual/2025/poster/119910

---

## 37. TAI3: Testing Agent Integrity in Interpreting User Intent

**Summary:** An API-centric stress testing framework that uncovers intent integrity violations in LLM agents. Uses semantic partitioning to organize tasks into meaningful categories, with targeted mutations to expose subtle agent errors while preserving user intent.

**Key Contributions:**

- Datatype-aware strategy memory for retrieving effective mutation patterns
- Lightweight predictor for ranking mutations by error likelihood
- Generalizes to stronger target models using smaller LLMs for test generation

**Poster:** https://neurips.cc/virtual/2025/poster/118952

---

## 38. ThinkAct: Vision-Language-Action Reasoning via Reinforced Visual Latent Planning

**Summary:** A dual-system framework bridging high-level reasoning with low-level action execution. Trains multimodal LLM to generate embodied reasoning plans guided by action-aligned visual rewards, compressed into visual plan latents for downstream action execution.

**Key Capabilities:**

- Few-shot adaptation
- Long-horizon planning
- Self-correction behaviors in complex embodied AI tasks

**Poster:** https://neurips.cc/virtual/2025/poster/119747

---

## 39. Visualization-of-Thought Attack (VoTA) against VLMs

**Summary:** Automated attack framework that constructs chains of images with risky visual thoughts to challenge VLMs. Exploits the conflict between logical processing and safety protocols, leading to unsafe content generation.

**Key Results:**

- Improves average attack success rate by 26.71 percentage points (from 63.70% to 90.41%)
- Tested on 9 open-source and 6 commercial VLMs
- Outperforms state-of-the-art methods

**Poster:** https://neurips.cc/virtual/2025/poster/119873

---

## 40. Open CaptchaWorld: Benchmarking MLLM Agents on CAPTCHA Puzzles

**Summary:** First web-based benchmark evaluating MLLM agents on diverse CAPTCHA puzzles. Spans 20 modern CAPTCHA types (225 puzzles in total) and introduces a novel metric, CAPTCHA Reasoning Depth, which quantifies the cognitive and motor steps required.

**Key Findings:**

- Humans achieve 93.3% success rate
- State-of-the-art agents achieve at most 40.0% (Browser-Use OpenAI-o3)
- Highlights significant gap between human and agent capabilities

**Poster:** https://neurips.cc/virtual/2025/poster/121537

---

## 41. Pixel Reasoner: Pixel-Space Reasoning with Curiosity-Driven RL

**Summary:** Introduces pixel-space reasoning framework where VLMs use visual operations (zoom-in, select-frame) to directly inspect and infer from visual evidence. Two-phase training: instruction tuning on synthesized traces, then RL with curiosity-driven rewards.

**Key Results:**

- 84% on V\*Bench, 74% on TallyQA-Complex, 84% on InfographicsVQA
- Highest accuracy achieved by any open-source 7B model
- Enables proactive information gathering from complex visual inputs

**Poster:** https://neurips.cc/virtual/2025/poster/117667

---

## 42. BTL-UI: Blink-Think-Link Reasoning Model for GUI Agent

**Summary:** Brain-inspired framework decomposing interactions into three biologically plausible phases: Blink (rapid detection via saccadic-like attention), Think (higher-level reasoning/planning), and Link (executable command generation for motor control).

**Key Innovations:**

- Automated annotation pipeline for blink data
- BTL Reward: first rule-based reward mechanism driven by both process and outcome
- Competitive performance on static GUI understanding and dynamic interaction tasks

**Poster:** https://neurips.cc/virtual/2025/poster/119419

---

## 43. GUI Exploration Lab: Multi-Turn RL for Screen Navigation

**Summary:** Simulation environment engine enabling flexible definition of screens, icons, and navigation graphs with full environment access for agent training/evaluation. Demonstrates progressive training approach from SFT to multi-turn RL.

**Key Findings:**

- Supervised fine-tuning enables memorization of fundamental knowledge
- Single-turn RL enhances generalization to unseen scenarios
- Multi-turn RL encourages exploration strategies through interactive trial and error

**Poster:** https://neurips.cc/virtual/2025/loc/san-diego/poster/117497

---

## 44. GUI-Rise: Structured Reasoning and History Summarization for GUI Navigation

**Summary:** Reasoning-enhanced framework integrating structured reasoning, action prediction, and history summarization. Uses Chain-of-Thought analyses combining progress estimation and decision reasoning, trained via SFT and GRPO with history-aware rewards.

**Key Results:**

- State-of-the-art under identical training data conditions
- Particularly strong in out-of-domain scenarios
- Robust reasoning and generalization across diverse GUI navigation tasks

**Poster:** https://neurips.cc/virtual/2025/poster/117425

---

## 45. UI-Genie: A Self-Improving Framework for MLLM-based Mobile GUI Agents

**Summary:** Self-improving framework addressing trajectory verification and training data scalability. Features UI-Genie-RM (image-text interleaved reward model) and self-improvement pipeline with reward-guided exploration and outcome verification.

**Key Contributions:**

- UI-Genie-RM-517k: first reward-specific dataset for GUI agents
- UI-Genie-Agent-16k: high-quality synthetic trajectories without manual annotation
- State-of-the-art across multiple GUI agent benchmarks through three generations of self-improvement

**Poster:** https://neurips.cc/virtual/2025/poster/119990

---

## What We're Building

At Cua, we're focused on the infrastructure layer for computer-use agents: cloud sandboxes for safe execution, SDKs for agent development, and tools that make it easier to build and deploy agents in production.

If you're experimenting with any of the approaches in these papers, our [Cloud Sandboxes](https://cua.ai) provide isolated Linux, Windows, and macOS environments where you can test agent behavior without risk to real systems.
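
As a rough sketch, here is what that can look like with the Python SDK. Treat the details as illustrative: the `Computer`/`ComputerAgent` usage below assumes the current SDK interfaces, and the sandbox name, API key, model string, and task are placeholders.

```python
import asyncio

from computer import Computer        # Cua computer SDK
from agent import ComputerAgent      # Cua agent SDK


async def main():
    # Provision an isolated cloud sandbox instead of driving your own desktop
    computer = Computer(
        os_type="linux",              # "windows" and "macos" sandboxes work the same way
        provider_type="cloud",
        name="your-sandbox-name",     # placeholder
        api_key="your-cua-api-key",   # placeholder
    )
    await computer.run()

    # Point an agent at the sandbox and give it a task to try out
    agent = ComputerAgent(
        model="anthropic/claude-3-5-sonnet-20241022",  # any supported model string
        tools=[computer],
    )
    async for result in agent.run("Open a browser and look up the OSWorld-Verified leaderboard"):
        print(result)  # inspect each step of the agent's trajectory


asyncio.run(main())
```

Swapping `os_type` moves the same agent between the Linux, Windows, and macOS environments mentioned above.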

---

**Start building:** [cua.ai](https://cua.ai)

**Join the community:** [Discord](https://discord.gg/cua-ai)

```