This is page 11 of 20. Use http://codebase.md/trycua/cua?page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── scripts
│ │ ├── get_pyproject_version.py
│ │ └── tests
│ │ ├── __init__.py
│ │ ├── README.md
│ │ └── test_get_pyproject_version.py
│ └── workflows
│ ├── bump-version.yml
│ ├── ci-lume.yml
│ ├── docker-publish-cua-linux.yml
│ ├── docker-publish-cua-windows.yml
│ ├── docker-publish-kasm.yml
│ ├── docker-publish-xfce.yml
│ ├── docker-reusable-publish.yml
│ ├── link-check.yml
│ ├── lint.yml
│ ├── npm-publish-cli.yml
│ ├── npm-publish-computer.yml
│ ├── npm-publish-core.yml
│ ├── publish-lume.yml
│ ├── pypi-publish-agent.yml
│ ├── pypi-publish-computer-server.yml
│ ├── pypi-publish-computer.yml
│ ├── pypi-publish-core.yml
│ ├── pypi-publish-mcp-server.yml
│ ├── pypi-publish-som.yml
│ ├── pypi-reusable-publish.yml
│ ├── python-tests.yml
│ ├── test-cua-models.yml
│ └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│ ├── docs.code-workspace
│ ├── extensions.json
│ ├── launch.json
│ ├── libs-ts.code-workspace
│ ├── lume.code-workspace
│ ├── lumier.code-workspace
│ ├── py.code-workspace
│ └── settings.json
├── blog
│ ├── app-use.md
│ ├── assets
│ │ ├── composite-agents.png
│ │ ├── docker-ubuntu-support.png
│ │ ├── hack-booth.png
│ │ ├── hack-closing-ceremony.jpg
│ │ ├── hack-cua-ollama-hud.jpeg
│ │ ├── hack-leaderboard.png
│ │ ├── hack-the-north.png
│ │ ├── hack-winners.jpeg
│ │ ├── hack-workshop.jpeg
│ │ ├── hud-agent-evals.png
│ │ └── trajectory-viewer.jpeg
│ ├── bringing-computer-use-to-the-web.md
│ ├── build-your-own-operator-on-macos-1.md
│ ├── build-your-own-operator-on-macos-2.md
│ ├── cloud-windows-ga-macos-preview.md
│ ├── composite-agents.md
│ ├── computer-use-agents-for-growth-hacking.md
│ ├── cua-hackathon.md
│ ├── cua-playground-preview.md
│ ├── cua-vlm-router.md
│ ├── hack-the-north.md
│ ├── hud-agent-evals.md
│ ├── human-in-the-loop.md
│ ├── introducing-cua-cli.md
│ ├── introducing-cua-cloud-containers.md
│ ├── lume-to-containerization.md
│ ├── neurips-2025-cua-papers.md
│ ├── sandboxed-python-execution.md
│ ├── training-computer-use-models-trajectories-1.md
│ ├── trajectory-viewer.md
│ ├── ubuntu-docker-support.md
│ └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│ ├── .env.example
│ ├── .gitignore
│ ├── content
│ │ └── docs
│ │ ├── agent-sdk
│ │ │ ├── agent-loops.mdx
│ │ │ ├── benchmarks
│ │ │ │ ├── index.mdx
│ │ │ │ ├── interactive.mdx
│ │ │ │ ├── introduction.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── osworld-verified.mdx
│ │ │ │ ├── screenspot-pro.mdx
│ │ │ │ └── screenspot-v2.mdx
│ │ │ ├── callbacks
│ │ │ │ ├── agent-lifecycle.mdx
│ │ │ │ ├── cost-saving.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── logging.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── pii-anonymization.mdx
│ │ │ │ └── trajectories.mdx
│ │ │ ├── chat-history.mdx
│ │ │ ├── custom-tools.mdx
│ │ │ ├── customizing-computeragent.mdx
│ │ │ ├── integrations
│ │ │ │ ├── hud.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── observability.mdx
│ │ │ ├── mcp-server
│ │ │ │ ├── client-integrations.mdx
│ │ │ │ ├── configuration.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── llm-integrations.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── tools.mdx
│ │ │ │ └── usage.mdx
│ │ │ ├── message-format.mdx
│ │ │ ├── meta.json
│ │ │ ├── migration-guide.mdx
│ │ │ ├── prompt-caching.mdx
│ │ │ ├── supported-agents
│ │ │ │ ├── composed-agents.mdx
│ │ │ │ ├── computer-use-agents.mdx
│ │ │ │ ├── grounding-models.mdx
│ │ │ │ ├── human-in-the-loop.mdx
│ │ │ │ └── meta.json
│ │ │ ├── supported-model-providers
│ │ │ │ ├── cua-vlm-router.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ └── local-models.mdx
│ │ │ ├── telemetry.mdx
│ │ │ └── usage-tracking.mdx
│ │ ├── cli-playbook
│ │ │ ├── commands.mdx
│ │ │ ├── index.mdx
│ │ │ └── meta.json
│ │ ├── computer-sdk
│ │ │ ├── cloud-vm-management.mdx
│ │ │ ├── commands.mdx
│ │ │ ├── computer-server
│ │ │ │ ├── Commands.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── REST-API.mdx
│ │ │ │ └── WebSocket-API.mdx
│ │ │ ├── computer-ui.mdx
│ │ │ ├── computers.mdx
│ │ │ ├── custom-computer-handlers.mdx
│ │ │ ├── meta.json
│ │ │ ├── sandboxed-python.mdx
│ │ │ └── tracing-api.mdx
│ │ ├── example-usecases
│ │ │ ├── form-filling.mdx
│ │ │ ├── gemini-complex-ui-navigation.mdx
│ │ │ ├── meta.json
│ │ │ ├── post-event-contact-export.mdx
│ │ │ └── windows-app-behind-vpn.mdx
│ │ ├── get-started
│ │ │ ├── meta.json
│ │ │ └── quickstart.mdx
│ │ ├── index.mdx
│ │ ├── macos-vm-cli-playbook
│ │ │ ├── lume
│ │ │ │ ├── cli-reference.mdx
│ │ │ │ ├── faq.md
│ │ │ │ ├── http-api.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── prebuilt-images.mdx
│ │ │ ├── lumier
│ │ │ │ ├── building-lumier.mdx
│ │ │ │ ├── docker-compose.mdx
│ │ │ │ ├── docker.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ └── meta.json
│ │ │ └── meta.json
│ │ └── meta.json
│ ├── next.config.mjs
│ ├── package-lock.json
│ ├── package.json
│ ├── pnpm-lock.yaml
│ ├── postcss.config.mjs
│ ├── public
│ │ └── img
│ │ ├── agent_gradio_ui.png
│ │ ├── agent.png
│ │ ├── bg-dark.jpg
│ │ ├── bg-light.jpg
│ │ ├── cli.png
│ │ ├── computer.png
│ │ ├── grounding-with-gemini3.gif
│ │ ├── hero.png
│ │ ├── laminar_trace_example.png
│ │ ├── som_box_threshold.png
│ │ └── som_iou_threshold.png
│ ├── README.md
│ ├── source.config.ts
│ ├── src
│ │ ├── app
│ │ │ ├── (home)
│ │ │ │ ├── [[...slug]]
│ │ │ │ │ └── page.tsx
│ │ │ │ └── layout.tsx
│ │ │ ├── api
│ │ │ │ ├── posthog
│ │ │ │ │ └── [...path]
│ │ │ │ │ └── route.ts
│ │ │ │ └── search
│ │ │ │ └── route.ts
│ │ │ ├── favicon.ico
│ │ │ ├── global.css
│ │ │ ├── layout.config.tsx
│ │ │ ├── layout.tsx
│ │ │ ├── llms.mdx
│ │ │ │ └── [[...slug]]
│ │ │ │ └── route.ts
│ │ │ ├── llms.txt
│ │ │ │ └── route.ts
│ │ │ ├── robots.ts
│ │ │ └── sitemap.ts
│ │ ├── assets
│ │ │ ├── discord-black.svg
│ │ │ ├── discord-white.svg
│ │ │ ├── logo-black.svg
│ │ │ └── logo-white.svg
│ │ ├── components
│ │ │ ├── analytics-tracker.tsx
│ │ │ ├── cookie-consent.tsx
│ │ │ ├── doc-actions-menu.tsx
│ │ │ ├── editable-code-block.tsx
│ │ │ ├── footer.tsx
│ │ │ ├── hero.tsx
│ │ │ ├── iou.tsx
│ │ │ ├── mermaid.tsx
│ │ │ └── page-feedback.tsx
│ │ ├── lib
│ │ │ ├── llms.ts
│ │ │ └── source.ts
│ │ ├── mdx-components.tsx
│ │ └── providers
│ │ └── posthog-provider.tsx
│ └── tsconfig.json
├── examples
│ ├── agent_examples.py
│ ├── agent_ui_examples.py
│ ├── browser_tool_example.py
│ ├── cloud_api_examples.py
│ ├── computer_examples_windows.py
│ ├── computer_examples.py
│ ├── computer_ui_examples.py
│ ├── computer-example-ts
│ │ ├── .env.example
│ │ ├── .gitignore
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── README.md
│ │ ├── src
│ │ │ ├── helpers.ts
│ │ │ └── index.ts
│ │ └── tsconfig.json
│ ├── docker_examples.py
│ ├── evals
│ │ ├── hud_eval_examples.py
│ │ └── wikipedia_most_linked.txt
│ ├── pylume_examples.py
│ ├── sandboxed_functions_examples.py
│ ├── som_examples.py
│ ├── tracing_examples.py
│ ├── utils.py
│ └── winsandbox_example.py
├── img
│ ├── agent_gradio_ui.png
│ ├── agent.png
│ ├── cli.png
│ ├── computer.png
│ ├── logo_black.png
│ └── logo_white.png
├── libs
│ ├── kasm
│ │ ├── Dockerfile
│ │ ├── LICENSE
│ │ ├── README.md
│ │ └── src
│ │ └── ubuntu
│ │ └── install
│ │ └── firefox
│ │ ├── custom_startup.sh
│ │ ├── firefox.desktop
│ │ └── install_firefox.sh
│ ├── lume
│ │ ├── .cursorignore
│ │ ├── CONTRIBUTING.md
│ │ ├── Development.md
│ │ ├── img
│ │ │ └── cli.png
│ │ ├── Package.resolved
│ │ ├── Package.swift
│ │ ├── README.md
│ │ ├── resources
│ │ │ └── lume.entitlements
│ │ ├── scripts
│ │ │ ├── build
│ │ │ │ ├── build-debug.sh
│ │ │ │ ├── build-release-notarized.sh
│ │ │ │ └── build-release.sh
│ │ │ └── install.sh
│ │ ├── src
│ │ │ ├── Commands
│ │ │ │ ├── Clone.swift
│ │ │ │ ├── Config.swift
│ │ │ │ ├── Create.swift
│ │ │ │ ├── Delete.swift
│ │ │ │ ├── Get.swift
│ │ │ │ ├── Images.swift
│ │ │ │ ├── IPSW.swift
│ │ │ │ ├── List.swift
│ │ │ │ ├── Logs.swift
│ │ │ │ ├── Options
│ │ │ │ │ └── FormatOption.swift
│ │ │ │ ├── Prune.swift
│ │ │ │ ├── Pull.swift
│ │ │ │ ├── Push.swift
│ │ │ │ ├── Run.swift
│ │ │ │ ├── Serve.swift
│ │ │ │ ├── Set.swift
│ │ │ │ └── Stop.swift
│ │ │ ├── ContainerRegistry
│ │ │ │ ├── ImageContainerRegistry.swift
│ │ │ │ ├── ImageList.swift
│ │ │ │ └── ImagesPrinter.swift
│ │ │ ├── Errors
│ │ │ │ └── Errors.swift
│ │ │ ├── FileSystem
│ │ │ │ ├── Home.swift
│ │ │ │ ├── Settings.swift
│ │ │ │ ├── VMConfig.swift
│ │ │ │ ├── VMDirectory.swift
│ │ │ │ └── VMLocation.swift
│ │ │ ├── LumeController.swift
│ │ │ ├── Main.swift
│ │ │ ├── Server
│ │ │ │ ├── Handlers.swift
│ │ │ │ ├── HTTP.swift
│ │ │ │ ├── Requests.swift
│ │ │ │ ├── Responses.swift
│ │ │ │ └── Server.swift
│ │ │ ├── Utils
│ │ │ │ ├── CommandRegistry.swift
│ │ │ │ ├── CommandUtils.swift
│ │ │ │ ├── Logger.swift
│ │ │ │ ├── NetworkUtils.swift
│ │ │ │ ├── Path.swift
│ │ │ │ ├── ProcessRunner.swift
│ │ │ │ ├── ProgressLogger.swift
│ │ │ │ ├── String.swift
│ │ │ │ └── Utils.swift
│ │ │ ├── Virtualization
│ │ │ │ ├── DarwinImageLoader.swift
│ │ │ │ ├── DHCPLeaseParser.swift
│ │ │ │ ├── ImageLoaderFactory.swift
│ │ │ │ └── VMVirtualizationService.swift
│ │ │ ├── VM
│ │ │ │ ├── DarwinVM.swift
│ │ │ │ ├── LinuxVM.swift
│ │ │ │ ├── VM.swift
│ │ │ │ ├── VMDetails.swift
│ │ │ │ ├── VMDetailsPrinter.swift
│ │ │ │ ├── VMDisplayResolution.swift
│ │ │ │ └── VMFactory.swift
│ │ │ └── VNC
│ │ │ ├── PassphraseGenerator.swift
│ │ │ └── VNCService.swift
│ │ └── tests
│ │ ├── Mocks
│ │ │ ├── MockVM.swift
│ │ │ ├── MockVMVirtualizationService.swift
│ │ │ └── MockVNCService.swift
│ │ ├── VM
│ │ │ └── VMDetailsPrinterTests.swift
│ │ ├── VMTests.swift
│ │ ├── VMVirtualizationServiceTests.swift
│ │ └── VNCServiceTests.swift
│ ├── lumier
│ │ ├── .dockerignore
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── bin
│ │ │ └── entry.sh
│ │ ├── config
│ │ │ └── constants.sh
│ │ ├── hooks
│ │ │ └── on-logon.sh
│ │ └── lib
│ │ ├── utils.sh
│ │ └── vm.sh
│ ├── python
│ │ ├── agent
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── agent
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── adapters
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── cua_adapter.py
│ │ │ │ │ ├── huggingfacelocal_adapter.py
│ │ │ │ │ ├── human_adapter.py
│ │ │ │ │ ├── mlxvlm_adapter.py
│ │ │ │ │ └── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ └── qwen2_5_vl.py
│ │ │ │ ├── agent.py
│ │ │ │ ├── callbacks
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── budget_manager.py
│ │ │ │ │ ├── image_retention.py
│ │ │ │ │ ├── logging.py
│ │ │ │ │ ├── operator_validator.py
│ │ │ │ │ ├── pii_anonymization.py
│ │ │ │ │ ├── prompt_instructions.py
│ │ │ │ │ ├── telemetry.py
│ │ │ │ │ └── trajectory_saver.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── computers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cua.py
│ │ │ │ │ └── custom.py
│ │ │ │ ├── decorators.py
│ │ │ │ ├── human_tool
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ ├── server.py
│ │ │ │ │ └── ui.py
│ │ │ │ ├── integrations
│ │ │ │ │ └── hud
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── agent.py
│ │ │ │ │ └── proxy.py
│ │ │ │ ├── loops
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── anthropic.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── composed_grounded.py
│ │ │ │ │ ├── gelato.py
│ │ │ │ │ ├── gemini.py
│ │ │ │ │ ├── generic_vlm.py
│ │ │ │ │ ├── glm45v.py
│ │ │ │ │ ├── gta1.py
│ │ │ │ │ ├── holo.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── model_types.csv
│ │ │ │ │ ├── moondream3.py
│ │ │ │ │ ├── omniparser.py
│ │ │ │ │ ├── openai.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ ├── uiins.py
│ │ │ │ │ ├── uitars.py
│ │ │ │ │ └── uitars2.py
│ │ │ │ ├── proxy
│ │ │ │ │ ├── examples.py
│ │ │ │ │ └── handlers.py
│ │ │ │ ├── responses.py
│ │ │ │ ├── tools
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── browser_tool.py
│ │ │ │ ├── types.py
│ │ │ │ └── ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ └── gradio
│ │ │ │ ├── __init__.py
│ │ │ │ ├── app.py
│ │ │ │ └── ui_components.py
│ │ │ ├── benchmarks
│ │ │ │ ├── .gitignore
│ │ │ │ ├── contrib.md
│ │ │ │ ├── interactive.py
│ │ │ │ ├── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ └── gta1.py
│ │ │ │ ├── README.md
│ │ │ │ ├── ss-pro.py
│ │ │ │ ├── ss-v2.py
│ │ │ │ └── utils.py
│ │ │ ├── example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer_agent.py
│ │ ├── bench-ui
│ │ │ ├── bench_ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api.py
│ │ │ │ └── child.py
│ │ │ ├── examples
│ │ │ │ ├── folder_example.py
│ │ │ │ ├── gui
│ │ │ │ │ ├── index.html
│ │ │ │ │ ├── logo.svg
│ │ │ │ │ └── styles.css
│ │ │ │ ├── output_overlay.png
│ │ │ │ └── simple_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ └── test_port_detection.py
│ │ ├── computer
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer
│ │ │ │ ├── __init__.py
│ │ │ │ ├── computer.py
│ │ │ │ ├── diorama_computer.py
│ │ │ │ ├── helpers.py
│ │ │ │ ├── interface
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ ├── models.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── logger.py
│ │ │ │ ├── models.py
│ │ │ │ ├── providers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cloud
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── docker
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── lume
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── lume_api.py
│ │ │ │ │ ├── lumier
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── types.py
│ │ │ │ │ └── winsandbox
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── provider.py
│ │ │ │ │ └── setup_script.ps1
│ │ │ │ ├── tracing_wrapper.py
│ │ │ │ ├── tracing.py
│ │ │ │ ├── ui
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ └── gradio
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── app.py
│ │ │ │ └── utils.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer.py
│ │ ├── computer-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── browser.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── diorama
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── diorama_computer.py
│ │ │ │ │ ├── diorama.py
│ │ │ │ │ ├── draw.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── safezone.py
│ │ │ │ ├── handlers
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── main.py
│ │ │ │ ├── server.py
│ │ │ │ ├── utils
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── wallpaper.py
│ │ │ │ └── watchdog.py
│ │ │ ├── examples
│ │ │ │ ├── __init__.py
│ │ │ │ └── usage_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ ├── run_server.py
│ │ │ ├── test_connection.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_server.py
│ │ ├── core
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── core
│ │ │ │ ├── __init__.py
│ │ │ │ └── telemetry
│ │ │ │ ├── __init__.py
│ │ │ │ └── posthog.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_telemetry.py
│ │ ├── mcp-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── build-extension.py
│ │ │ ├── CONCURRENT_SESSIONS.md
│ │ │ ├── desktop-extension
│ │ │ │ ├── cua-extension.mcpb
│ │ │ │ ├── desktop_extension.png
│ │ │ │ ├── manifest.json
│ │ │ │ ├── README.md
│ │ │ │ ├── requirements.txt
│ │ │ │ ├── run_server.sh
│ │ │ │ └── setup.py
│ │ │ ├── mcp_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── server.py
│ │ │ │ └── session_manager.py
│ │ │ ├── pdm.lock
│ │ │ ├── pyproject.toml
│ │ │ ├── QUICK_TEST_COMMANDS.sh
│ │ │ ├── quick_test_local_option.py
│ │ │ ├── README.md
│ │ │ ├── scripts
│ │ │ │ ├── install_mcp_server.sh
│ │ │ │ └── start_mcp_server.sh
│ │ │ ├── test_mcp_server_local_option.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_mcp_server.py
│ │ ├── pylume
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_pylume.py
│ │ └── som
│ │ ├── .bumpversion.cfg
│ │ ├── LICENSE
│ │ ├── poetry.toml
│ │ ├── pyproject.toml
│ │ ├── README.md
│ │ ├── som
│ │ │ ├── __init__.py
│ │ │ ├── detect.py
│ │ │ ├── detection.py
│ │ │ ├── models.py
│ │ │ ├── ocr.py
│ │ │ ├── util
│ │ │ │ └── utils.py
│ │ │ └── visualization.py
│ │ └── tests
│ │ ├── conftest.py
│ │ └── test_omniparser.py
│ ├── qemu-docker
│ │ ├── linux
│ │ │ ├── Dockerfile
│ │ │ ├── README.md
│ │ │ └── src
│ │ │ ├── entry.sh
│ │ │ └── vm
│ │ │ ├── image
│ │ │ │ └── README.md
│ │ │ └── setup
│ │ │ ├── install.sh
│ │ │ ├── setup-cua-server.sh
│ │ │ └── setup.sh
│ │ ├── README.md
│ │ └── windows
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── entry.sh
│ │ └── vm
│ │ ├── image
│ │ │ └── README.md
│ │ └── setup
│ │ ├── install.bat
│ │ ├── on-logon.ps1
│ │ ├── setup-cua-server.ps1
│ │ ├── setup-utils.psm1
│ │ └── setup.ps1
│ ├── typescript
│ │ ├── .gitignore
│ │ ├── .nvmrc
│ │ ├── agent
│ │ │ ├── examples
│ │ │ │ ├── playground-example.html
│ │ │ │ └── README.md
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── client.ts
│ │ │ │ ├── index.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ └── client.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── computer
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── computer
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── providers
│ │ │ │ │ │ ├── base.ts
│ │ │ │ │ │ ├── cloud.ts
│ │ │ │ │ │ └── index.ts
│ │ │ │ │ └── types.ts
│ │ │ │ ├── index.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── base.ts
│ │ │ │ │ ├── factory.ts
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── linux.ts
│ │ │ │ │ ├── macos.ts
│ │ │ │ │ └── windows.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ ├── computer
│ │ │ │ │ └── cloud.test.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── factory.test.ts
│ │ │ │ │ ├── index.test.ts
│ │ │ │ │ ├── linux.test.ts
│ │ │ │ │ ├── macos.test.ts
│ │ │ │ │ └── windows.test.ts
│ │ │ │ └── setup.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── core
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── index.ts
│ │ │ │ └── telemetry
│ │ │ │ ├── clients
│ │ │ │ │ ├── index.ts
│ │ │ │ │ └── posthog.ts
│ │ │ │ └── index.ts
│ │ │ ├── tests
│ │ │ │ └── telemetry.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── cua-cli
│ │ │ ├── .gitignore
│ │ │ ├── .prettierrc
│ │ │ ├── bun.lock
│ │ │ ├── CLAUDE.md
│ │ │ ├── index.ts
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── auth.ts
│ │ │ │ ├── cli.ts
│ │ │ │ ├── commands
│ │ │ │ │ ├── auth.ts
│ │ │ │ │ └── sandbox.ts
│ │ │ │ ├── config.ts
│ │ │ │ ├── http.ts
│ │ │ │ ├── storage.ts
│ │ │ │ └── util.ts
│ │ │ └── tsconfig.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── pnpm-workspace.yaml
│ │ └── README.md
│ └── xfce
│ ├── .dockerignore
│ ├── .gitignore
│ ├── Development.md
│ ├── Dockerfile
│ ├── Dockerfile.dev
│ ├── README.md
│ └── src
│ ├── scripts
│ │ ├── resize-display.sh
│ │ ├── start-computer-server.sh
│ │ ├── start-novnc.sh
│ │ ├── start-vnc.sh
│ │ └── xstartup.sh
│ ├── supervisor
│ │ └── supervisord.conf
│ └── xfce-config
│ ├── helpers.rc
│ ├── xfce4-power-manager.xml
│ └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│ ├── agent_nb.ipynb
│ ├── blog
│ │ ├── build-your-own-operator-on-macos-1.ipynb
│ │ └── build-your-own-operator-on-macos-2.ipynb
│ ├── composite_agents_docker_nb.ipynb
│ ├── computer_nb.ipynb
│ ├── computer_server_nb.ipynb
│ ├── customizing_computeragent.ipynb
│ ├── eval_osworld.ipynb
│ ├── ollama_nb.ipynb
│ ├── README.md
│ ├── sota_hackathon_cloud.ipynb
│ └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── install-cli.ps1
│ ├── install-cli.sh
│ ├── playground-docker.sh
│ ├── playground.sh
│ ├── run-docker-dev.sh
│ └── typescript-typecheck.js
├── TESTING.md
├── tests
│ ├── agent_loop_testing
│ │ ├── agent_test.py
│ │ └── README.md
│ ├── pytest.ini
│ ├── shell_cmd.py
│ ├── test_files.py
│ ├── test_mcp_server_session_management.py
│ ├── test_mcp_server_streaming.py
│ ├── test_shell_bash.py
│ ├── test_telemetry.py
│ ├── test_tracing.py
│ ├── test_venv.py
│ └── test_watchdog.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/blog/sandboxed-python-execution.md:
--------------------------------------------------------------------------------
```markdown
# Sandboxed Python Execution: Run Code Safely in Cua Containers
_Published on June 23, 2025 by Dillon DuPont_
We touched on Cua's computer-use capabilities in [Building your own Operator on macOS - Part 2](build-your-own-operator-on-macos-2.md) – your AI agents can click, scroll, type, and interact with any desktop application. But what if your agent needs to do more than just UI automation? What if it needs to process data, make API calls, analyze images, or run complex logic alongside those UI interactions, within the same virtual environment?
That's where Cua's `@sandboxed` decorator comes in. While Cua handles the clicking and typing, sandboxed execution lets you run full Python code inside the same virtual environment. It's like giving your AI agents a programming brain to complement their clicking fingers.
Think of it as the perfect marriage: Cua handles the "what you see" (UI interactions), while sandboxed Python handles the "what you compute" (data processing, logic, API calls) – all happening in the same isolated environment.
## So, what exactly is sandboxed execution?
Cua excels at automating user interfaces – clicking buttons, filling forms, navigating applications. But modern AI agents need to do more than just UI automation. They need to process the data they collect, make intelligent decisions, call external APIs, and run sophisticated algorithms.
Sandboxed execution bridges this gap. You write a Python function, decorate it with `@sandboxed`, and it runs inside your Cua container alongside your UI automation. Your agent can now click a button, extract some data, process it with Python, and then use those results to decide what to click next.
Here's what makes this combination powerful for AI agent development:
- **Unified environment**: Your UI automation and code execution happen in the same container
- **Rich capabilities**: Combine Cua's clicking with Python's data processing, API calls, and libraries
- **Seamless integration**: Pass data between UI interactions and Python functions effortlessly
- **Cross-platform consistency**: Your Python code runs the same way across different Cua environments
- **Complete workflows**: Build agents that can both interact with apps AND process the data they collect
## The architecture behind @sandboxed
Let's jump right into an example that'll make this crystal clear:
```python
from computer.helpers import sandboxed

@sandboxed("demo_venv")
def greet_and_print(name):
    """This function runs inside the container"""
    import PyXA  # macOS-specific library
    safari = PyXA.Application("Safari")
    html = safari.current_document.source()
    print(f"Hello from inside the container, {name}!")
    return {"greeted": name, "safari_html": html}

# When called, this executes in the container
result = await greet_and_print("Cua")
```
What's happening here? When you call `greet_and_print()`, Cua extracts the function's source code, transmits it to the container, and executes it there. The result returns to you seamlessly, while the actual execution remains completely isolated.
## How does sandboxed execution work?
Cua's sandboxed execution system employs several key architectural components:
### 1. Source Code Extraction
Cua uses Python's `inspect.getsource()` to extract your function's source code and reconstruct the function definition in the remote environment.
### 2. Virtual Environment Isolation
Each sandboxed function runs in a named virtual environment within the container. This provides complete dependency isolation between different functions and their respective environments.
### 3. Data Serialization and Transport
Arguments and return values are serialized as JSON and transported between the host and container. This ensures compatibility across different Python versions and execution environments.
### 4. Comprehensive Error Handling
The system captures both successful results and exceptions, preserving stack traces and error information for debugging purposes.
## Getting your sandbox ready
Setting up sandboxed execution is simple:
```python
import asyncio
from computer.computer import Computer
from computer.helpers import sandboxed, set_default_computer

async def main():
    # Fire up the computer
    computer = Computer()
    await computer.run()

    # Make it the default for all sandboxed functions
    set_default_computer(computer)

    # Install some packages in a virtual environment
    await computer.venv_install("demo_venv", ["requests", "beautifulsoup4"])
```
If you want to get fancy, you can specify which computer instance to use:
```python
@sandboxed("my_venv", computer=my_specific_computer)
def my_function():
# This runs on your specified computer instance
pass
```
## Real-world examples that actually work
### Browser automation without the headaches
Ever tried to automate a browser and had it crash your entire system? Yeah, us too. Here's how to do it safely:
```python
@sandboxed("browser_env")
def automate_browser_with_playwright():
    """Automate browser interactions using Playwright"""
    from playwright.sync_api import sync_playwright
    import time
    import base64
    from datetime import datetime

    try:
        with sync_playwright() as p:
            # Launch browser (visible, because why not?)
            browser = p.chromium.launch(
                headless=False,
                args=['--no-sandbox', '--disable-dev-shm-usage']
            )
            page = browser.new_page()
            page.set_viewport_size({"width": 1280, "height": 720})

            actions = []
            screenshots = {}

            # Let's visit example.com and poke around
            page.goto("https://example.com")
            actions.append("Navigated to example.com")

            # Grab a screenshot because screenshots are cool
            screenshot_bytes = page.screenshot(full_page=True)
            screenshots["initial"] = base64.b64encode(screenshot_bytes).decode()

            # Get some basic info
            title = page.title()
            actions.append(f"Page title: {title}")

            # Find links and headings
            try:
                links = page.locator("a").all()
                link_texts = [link.text_content() for link in links[:5]]
                actions.append(f"Found {len(links)} links: {link_texts}")

                headings = page.locator("h1, h2, h3").all()
                heading_texts = [h.text_content() for h in headings[:3]]
                actions.append(f"Found headings: {heading_texts}")
            except Exception as e:
                actions.append(f"Element interaction error: {str(e)}")

            # Let's try a form for good measure
            try:
                page.goto("https://httpbin.org/forms/post")
                actions.append("Navigated to form page")

                # Fill out the form
                page.fill('input[name="custname"]', "Test User from Sandboxed Environment")
                page.fill('input[name="custtel"]', "555-0123")
                page.fill('input[name="custemail"]', "test@example.com")
                page.select_option('select[name="size"]', "large")
                actions.append("Filled out form fields")

                # Submit and see what happens
                page.click('input[type="submit"]')
                page.wait_for_load_state("networkidle")
                actions.append("Submitted form")
            except Exception as e:
                actions.append(f"Form interaction error: {str(e)}")

            browser.close()

            return {
                "actions_performed": actions,
                "screenshots": screenshots,
                "success": True
            }
    except Exception as e:
        return {"error": f"Browser automation failed: {str(e)}"}

# Install Playwright and its browsers
await computer.venv_install("browser_env", ["playwright"])
await computer.venv_cmd("browser_env", "playwright install chromium")

# Run the automation
result = await automate_browser_with_playwright()
print(f"Performed {len(result.get('actions_performed', []))} actions")
```
### Building code analysis agents
Want to build agents that can analyze code safely? Here's a security audit tool that won't accidentally `eval()` your system into oblivion:
```python
@sandboxed("analysis_env")
def security_audit_tool(code_snippet):
    """Analyze code for potential security issues"""
    import ast
    import re

    issues = []

    # Check for the usual suspects
    dangerous_patterns = [
        (r'eval\s*\(', "Use of eval() function"),
        (r'exec\s*\(', "Use of exec() function"),
        (r'__import__\s*\(', "Dynamic import usage"),
        (r'subprocess\.', "Subprocess usage"),
        (r'os\.system\s*\(', "OS system call"),
    ]

    for pattern, description in dangerous_patterns:
        if re.search(pattern, code_snippet):
            issues.append(description)

    # Get fancy with AST analysis
    try:
        tree = ast.parse(code_snippet)
        for node in ast.walk(tree):
            if isinstance(node, ast.Call):
                if hasattr(node.func, 'id'):
                    if node.func.id in ['eval', 'exec', 'compile']:
                        issues.append(f"Dangerous function call: {node.func.id}")
    except SyntaxError:
        issues.append("Syntax error in code")

    return {
        "security_issues": issues,
        "risk_level": "HIGH" if len(issues) > 2 else "MEDIUM" if issues else "LOW"
    }

# Test it on some sketchy code
audit_result = await security_audit_tool("eval(user_input)")
print(f"Security audit: {audit_result}")
```
### Desktop automation in the cloud
Here's where things get really interesting. Cua Cloud Sandbox comes with full desktop environments, so you can automate GUIs:
```python
@sandboxed("desktop_env")
def take_screenshot_and_analyze():
    """Take a screenshot and analyze the desktop"""
    import io
    import base64
    from PIL import ImageGrab
    from datetime import datetime

    try:
        # Grab the screen
        screenshot = ImageGrab.grab()

        # Convert to base64 for easy transport
        buffer = io.BytesIO()
        screenshot.save(buffer, format='PNG')
        screenshot_data = base64.b64encode(buffer.getvalue()).decode()

        # Get some basic info
        screen_info = {
            "size": screenshot.size,
            "mode": screenshot.mode,
            "timestamp": datetime.now().isoformat()
        }

        # Analyze the colors (because why not?)
        colors = screenshot.getcolors(maxcolors=256*256*256)
        dominant_color = max(colors, key=lambda x: x[0])[1] if colors else None

        return {
            "screenshot_base64": screenshot_data,
            "screen_info": screen_info,
            "dominant_color": dominant_color,
            "unique_colors": len(colors) if colors else 0
        }
    except Exception as e:
        return {"error": f"Screenshot failed: {str(e)}"}

# Install the dependencies
await computer.venv_install("desktop_env", ["Pillow"])

# Take and analyze a screenshot
result = await take_screenshot_and_analyze()
print("Desktop analysis complete!")
```
## Pro tips for sandboxed success
### Keep it self-contained
Always put your imports inside the function. Trust us on this one:
```python
@sandboxed("good_env")
def good_function():
    import os  # Import inside the function
    import json
    # Your code here
    return {"result": "success"}
```
### Install dependencies first
Don't forget to install packages before using them:
```python
# Install first
await computer.venv_install("my_env", ["pandas", "numpy", "matplotlib"])

@sandboxed("my_env")
def data_analysis():
    import pandas as pd
    import numpy as np
    # Now you can use them
```
### Use descriptive environment names
Future you will thank you:
```python
@sandboxed("data_processing_env")
def process_data(): pass
@sandboxed("web_scraping_env")
def scrape_site(): pass
@sandboxed("ml_training_env")
def train_model(): pass
```
### Always handle errors gracefully
Things break. Plan for it:
```python
@sandboxed("robust_env")
def robust_function(data):
    try:
        result = process_data(data)
        return {"success": True, "result": result}
    except Exception as e:
        return {"success": False, "error": str(e)}
```
## What about performance?
Let's be honest – there's some overhead here. Code needs to be serialized, sent over the network, and executed remotely. But for most use cases, the benefits far outweigh the costs.
If you're building something performance-critical, consider:
- Batching multiple operations into a single sandboxed function
- Minimizing data transfer between host and container
- Using persistent virtual environments
## The security angle
This is where sandboxed execution really shines:
1. **Complete process isolation** – code runs in a separate container
2. **File system protection** – limited access to your host files
3. **Network isolation** – controlled network access
4. **Clean environments** – no package conflicts or pollution
5. **Resource limits** – container-level constraints keep things in check
## Ready to get started?
The `@sandboxed` decorator is one of those features that sounds simple but opens up a world of possibilities. Whether you're testing sketchy code, building AI agents, or just want to keep your development environment pristine, it's got you covered.
Give it a try in your next Cua project and see how liberating it feels to run code without fear!
Happy coding (safely)!
---
_Want to dive deeper? Check out our [sandboxed functions examples](https://github.com/trycua/cua/blob/main/examples/sandboxed_functions_examples.py) and [virtual environment tests](https://github.com/trycua/cua/blob/main/tests/test_venv.py) on GitHub. Questions? Come chat with us on Discord!_
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/gemini.py:
--------------------------------------------------------------------------------
```python
"""
Gemini 2.5 Computer Use agent loop
Maps internal Agent SDK message format to Google's Gemini Computer Use API and back.
Key features:
- Lazy import of google.genai
- Configure Computer Use tool with excluded browser-specific predefined functions
- Optional custom function declarations hook for computer-call specific functions
- Convert Gemini function_call parts into internal computer_call actions
"""
from __future__ import annotations
import base64
import io
import uuid
from typing import Any, Dict, List, Optional, Tuple
from PIL import Image
from ..decorators import register_agent
from ..loops.base import AsyncAgentConfig
from ..types import AgentCapability
def _lazy_import_genai():
"""Import google.genai lazily to avoid hard dependency unless used."""
try:
from google import genai # type: ignore
from google.genai import types # type: ignore
return genai, types
except Exception as e: # pragma: no cover
raise RuntimeError(
"google.genai is required for the Gemini Computer Use loop. Install the Google Gemini SDK."
) from e
def _data_url_to_bytes(data_url: str) -> Tuple[bytes, str]:
"""Convert a data URL to raw bytes and mime type."""
if not data_url.startswith("data:"):
# Assume it's base64 png payload
try:
return base64.b64decode(data_url), "image/png"
except Exception:
return b"", "application/octet-stream"
header, b64 = data_url.split(",", 1)
mime = "image/png"
if ";" in header:
mime = header.split(";")[0].split(":", 1)[1] or "image/png"
return base64.b64decode(b64), mime
def _bytes_image_size(img_bytes: bytes) -> Tuple[int, int]:
try:
img = Image.open(io.BytesIO(img_bytes))
return img.size
except Exception:
return (1024, 768)
def _find_last_user_text(messages: List[Dict[str, Any]]) -> List[str]:
texts: List[str] = []
for msg in reversed(messages):
if msg.get("type") in (None, "message") and msg.get("role") == "user":
content = msg.get("content")
if isinstance(content, str):
return [content]
elif isinstance(content, list):
for c in content:
if c.get("type") in ("input_text", "output_text") and c.get("text"):
texts.append(c["text"]) # newest first
if texts:
return list(reversed(texts))
return []
def _find_last_screenshot(messages: List[Dict[str, Any]]) -> Optional[bytes]:
for msg in reversed(messages):
if msg.get("type") == "computer_call_output":
out = msg.get("output", {})
if isinstance(out, dict) and out.get("type") in ("input_image", "computer_screenshot"):
image_url = out.get("image_url", "")
if image_url:
data, _ = _data_url_to_bytes(image_url)
return data
return None
def _denormalize(v: int, size: int) -> int:
# Gemini returns 0-999 normalized
try:
return max(0, min(size - 1, int(round(v / 1000 * size))))
except Exception:
return 0
def _map_gemini_fc_to_computer_call(
fc: Dict[str, Any],
screen_w: int,
screen_h: int,
) -> Optional[Dict[str, Any]]:
name = fc.get("name")
args = fc.get("args", {}) or {}
action: Dict[str, Any] = {}
if name == "click_at":
x = _denormalize(int(args.get("x", 0)), screen_w)
y = _denormalize(int(args.get("y", 0)), screen_h)
action = {"type": "click", "x": x, "y": y, "button": "left"}
elif name == "type_text_at":
x = _denormalize(int(args.get("x", 0)), screen_w)
y = _denormalize(int(args.get("y", 0)), screen_h)
text = args.get("text", "")
if args.get("press_enter") == True:
text += "\n"
action = {"type": "type", "x": x, "y": y, "text": text}
elif name == "hover_at":
x = _denormalize(int(args.get("x", 0)), screen_w)
y = _denormalize(int(args.get("y", 0)), screen_h)
action = {"type": "move", "x": x, "y": y}
elif name == "key_combination":
keys = str(args.get("keys", ""))
action = {"type": "keypress", "keys": keys}
elif name == "scroll_document":
direction = args.get("direction", "down")
magnitude = 800
dx, dy = 0, 0
if direction == "down":
dy = magnitude
elif direction == "up":
dy = -magnitude
elif direction == "right":
dx = magnitude
elif direction == "left":
dx = -magnitude
action = {
"type": "scroll",
"scroll_x": dx,
"scroll_y": dy,
"x": int(screen_w / 2),
"y": int(screen_h / 2),
}
elif name == "scroll_at":
x = _denormalize(int(args.get("x", 500)), screen_w)
y = _denormalize(int(args.get("y", 500)), screen_h)
direction = args.get("direction", "down")
magnitude = int(args.get("magnitude", 800))
dx, dy = 0, 0
if direction == "down":
dy = magnitude
elif direction == "up":
dy = -magnitude
elif direction == "right":
dx = magnitude
elif direction == "left":
dx = -magnitude
action = {"type": "scroll", "scroll_x": dx, "scroll_y": dy, "x": x, "y": y}
elif name == "drag_and_drop":
x = _denormalize(int(args.get("x", 0)), screen_w)
y = _denormalize(int(args.get("y", 0)), screen_h)
dx = _denormalize(int(args.get("destination_x", x)), screen_w)
dy = _denormalize(int(args.get("destination_y", y)), screen_h)
action = {
"type": "drag",
"start_x": x,
"start_y": y,
"end_x": dx,
"end_y": dy,
"button": "left",
}
elif name == "wait_5_seconds":
action = {"type": "wait"}
else:
# Unsupported / excluded browser-specific or custom function; ignore
return None
return {
"type": "computer_call",
"call_id": uuid.uuid4().hex,
"status": "completed",
"action": action,
}
@register_agent(models=r"^gemini-2\.5-computer-use-preview-10-2025$")
class GeminiComputerUseConfig(AsyncAgentConfig):
    """Agent loop for the Gemini 2.5 Computer Use preview model.

    Translates the internal message history into a Gemini generate_content
    request (latest user text + latest screenshot) and maps the returned
    function_call parts back into internal ``computer_call`` actions.
    """

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Run one Gemini Computer Use step.

        Sends only the latest user text plus the latest screenshot (not the
        full history) and returns ``{"output": [...], "usage": {...}}`` where
        output contains an optional assistant message followed by zero or more
        computer_call items.

        NOTE(review): ``tools``, ``max_retries``, ``stream``,
        ``computer_handler`` and ``use_prompt_caching`` are currently ignored
        by this implementation.
        """
        genai, types = _lazy_import_genai()
        # NOTE(review): Client() picks up credentials from the environment —
        # confirm deployment provides GOOGLE_API_KEY or equivalent.
        client = genai.Client()

        # Build excluded predefined functions for browser-specific behavior
        excluded = [
            "open_web_browser",
            "search",
            "navigate",
            "go_forward",
            "go_back",
            "scroll_document",
        ]

        # Optional custom functions: can be extended by host code via `tools` parameter later if desired
        CUSTOM_FUNCTION_DECLARATIONS: List[Any] = []

        # Compose tools config
        generate_content_config = types.GenerateContentConfig(
            tools=[
                types.Tool(
                    computer_use=types.ComputerUse(
                        environment=types.Environment.ENVIRONMENT_BROWSER,
                        excluded_predefined_functions=excluded,
                    )
                ),
                # types.Tool(function_declarations=CUSTOM_FUNCTION_DECLARATIONS), # enable when custom functions needed
            ]
        )

        # Prepare contents: last user text + latest screenshot
        user_texts = _find_last_user_text(messages)
        screenshot_bytes = _find_last_screenshot(messages)

        parts: List[Any] = []
        for t in user_texts:
            parts.append(types.Part(text=t))

        # Default screen size used when no screenshot is available; the real
        # size is read from the screenshot when present.
        screen_w, screen_h = 1024, 768
        if screenshot_bytes:
            screen_w, screen_h = _bytes_image_size(screenshot_bytes)
            parts.append(types.Part.from_bytes(data=screenshot_bytes, mime_type="image/png"))

        # If we don't have any content, at least pass an empty user part to prompt reasoning
        if not parts:
            parts = [types.Part(text="Proceed to the next action.")]

        contents = [types.Content(role="user", parts=parts)]

        api_kwargs = {
            "model": model,
            "contents": contents,
            "config": generate_content_config,
        }
        if _on_api_start:
            await _on_api_start(
                {
                    "model": api_kwargs["model"],
                    # "contents": api_kwargs["contents"], # Disabled for now
                    "config": api_kwargs["config"],
                }
            )

        # Synchronous SDK call; no retry logic is applied here.
        response = client.models.generate_content(**api_kwargs)

        if _on_api_end:
            await _on_api_end(
                {
                    "model": api_kwargs["model"],
                    # "contents": api_kwargs["contents"], # Disabled for now
                    "config": api_kwargs["config"],
                },
                response,
            )

        # Usage (Gemini SDK may not always provide token usage; populate when available)
        usage: Dict[str, Any] = {}
        try:
            # Some SDKs expose response.usage; if available, copy
            if getattr(response, "usage_metadata", None):
                md = response.usage_metadata
                usage = {
                    "prompt_tokens": getattr(md, "prompt_token_count", None) or 0,
                    "completion_tokens": getattr(md, "candidates_token_count", None) or 0,
                    "total_tokens": getattr(md, "total_token_count", None) or 0,
                }
        except Exception:
            pass
        if _on_usage and usage:
            await _on_usage(usage)

        # Parse output into internal items
        output_items: List[Dict[str, Any]] = []
        # Only the first candidate is considered.
        candidate = response.candidates[0]

        # Text parts from the model (assistant message)
        text_parts: List[str] = []
        function_calls: List[Dict[str, Any]] = []
        for p in candidate.content.parts:
            if getattr(p, "text", None):
                text_parts.append(p.text)
            if getattr(p, "function_call", None):
                # p.function_call has name and args
                fc = {
                    "name": getattr(p.function_call, "name", None),
                    "args": dict(getattr(p.function_call, "args", {}) or {}),
                }
                function_calls.append(fc)

        if text_parts:
            output_items.append(
                {
                    "type": "message",
                    "role": "assistant",
                    "content": [{"type": "output_text", "text": "\n".join(text_parts)}],
                }
            )

        # Map function calls to internal computer_call actions
        for fc in function_calls:
            item = _map_gemini_fc_to_computer_call(fc, screen_w, screen_h)
            if item is not None:
                output_items.append(item)

        return {"output": output_items, "usage": usage}

    async def predict_click(
        self,
        model: str,
        image_b64: str,
        instruction: str,
        **kwargs,
    ) -> Optional[Tuple[float, float]]:
        """Ask Gemini CUA to output a single click action for the given instruction.

        Excludes all predefined tools except `click_at` and sends the screenshot.
        Returns pixel (x, y) if a click is proposed, else None.
        """
        genai, types = _lazy_import_genai()
        client = genai.Client()

        # Exclude all but click_at
        exclude_all_but_click = [
            "open_web_browser",
            "wait_5_seconds",
            "go_back",
            "go_forward",
            "search",
            "navigate",
            "hover_at",
            "type_text_at",
            "key_combination",
            "scroll_document",
            "scroll_at",
            "drag_and_drop",
        ]
        config = types.GenerateContentConfig(
            tools=[
                types.Tool(
                    computer_use=types.ComputerUse(
                        environment=types.Environment.ENVIRONMENT_BROWSER,
                        excluded_predefined_functions=exclude_all_but_click,
                    )
                )
            ]
        )

        # Prepare prompt parts; a bad base64 payload degrades to text-only.
        try:
            img_bytes = base64.b64decode(image_b64)
        except Exception:
            img_bytes = b""
        w, h = _bytes_image_size(img_bytes) if img_bytes else (1024, 768)

        parts: List[Any] = [types.Part(text=f"Click {instruction}.")]
        if img_bytes:
            parts.append(types.Part.from_bytes(data=img_bytes, mime_type="image/png"))
        contents = [types.Content(role="user", parts=parts)]

        response = client.models.generate_content(
            model=model,
            contents=contents,
            config=config,
        )

        # Parse first click_at; coordinates come back 0-999 normalized.
        try:
            candidate = response.candidates[0]
            for p in candidate.content.parts:
                fc = getattr(p, "function_call", None)
                if fc and getattr(fc, "name", None) == "click_at":
                    args = dict(getattr(fc, "args", {}) or {})
                    x = _denormalize(int(args.get("x", 0)), w)
                    y = _denormalize(int(args.get("y", 0)), h)
                    return float(x), float(y)
        except Exception:
            return None
        return None

    def get_capabilities(self) -> List[AgentCapability]:
        """This loop supports single-click prediction and full agent steps."""
        return ["click", "step"]
```
--------------------------------------------------------------------------------
/libs/lume/src/FileSystem/Home.swift:
--------------------------------------------------------------------------------
```swift
import Foundation
/// Manages the application's home directory and virtual machine directories.
/// Responsible for creating, accessing, and validating the application's directory structure.
final class Home {
    // MARK: - Constants

    private enum Constants {
        static let defaultDirectoryName = ".lume"
        static let homeDirPath = "~/\(defaultDirectoryName)"
    }

    // MARK: - Properties

    // Backing storage for the current home directory path.
    private var _homeDir: Path
    private let settingsManager: SettingsManager
    private let fileManager: FileManager
    // Cache of VM locations keyed by location name, populated from settings at init.
    private var locations: [String: VMLocation] = [:]

    // Current home directory based on default location
    var homeDir: Path {
        return _homeDir
    }

    // MARK: - Initialization

    /// Creates a Home rooted at the default VM location from settings.
    /// Traps via fatalError if settings contain no default location.
    init(
        settingsManager: SettingsManager = SettingsManager.shared,
        fileManager: FileManager = .default
    ) {
        self.settingsManager = settingsManager
        self.fileManager = fileManager

        // Get home directory path from settings or use default
        let settings = settingsManager.getSettings()
        guard let defaultLocation = settings.defaultLocation else {
            fatalError("No default VM location found")
        }
        self._homeDir = Path(defaultLocation.path)

        // Cache all locations
        for location in settings.vmLocations {
            locations[location.name] = location
        }
    }

    // MARK: - VM Directory Management

    /// Creates a temporary VM directory with a unique identifier
    /// - Returns: A VMDirectory instance representing the created directory
    /// - Throws: HomeError if directory creation fails
    func createTempVMDirectory() throws -> VMDirectory {
        let uuid = UUID().uuidString
        let tempDir = homeDir.directory(uuid)

        Logger.info("Creating temporary directory", metadata: ["path": tempDir.path])

        do {
            try createDirectory(at: tempDir.url)
            return VMDirectory(tempDir)
        } catch {
            throw HomeError.directoryCreationFailed(path: tempDir.path)
        }
    }

    /// Gets a VM directory for a specific VM name and optional location
    ///
    /// - Parameters:
    ///   - name: Name of the VM directory
    ///   - storage: Optional name of the VM location (default: default location).
    ///     May also be "ephemeral" (system temp dir) or a direct filesystem path.
    /// - Returns: A VMDirectory instance
    /// - Throws: HomeError if location not found
    func getVMDirectory(_ name: String, storage: String? = nil) throws -> VMDirectory {
        // Special case for ephemeral storage using macOS temporary directory
        if let storage = storage, storage == "ephemeral" {
            // Get the current temporary directory
            let tmpDir = ProcessInfo.processInfo.environment["TMPDIR"] ?? "/tmp"

            // Remove trailing slash if present
            let cleanPath = tmpDir.hasSuffix("/") ? String(tmpDir.dropLast()) : tmpDir

            // Create the directory if it doesn't exist
            if !fileExists(at: cleanPath) {
                try createVMLocation(at: cleanPath)
            }

            let baseDir = Path(cleanPath)
            return VMDirectory(baseDir.directory(name))
        }

        // Check if storage is a direct path (detected by path separators)
        if let storage = storage, (storage.contains("/") || storage.contains("\\")) {
            let cleanPath = storage.hasSuffix("/") ? String(storage.dropLast()) : storage
            let baseDir = Path(cleanPath)
            return VMDirectory(baseDir.directory(name))
        }

        let location: VMLocation
        if let storage = storage {
            // Get a specific location
            guard let loc = locations[storage] else {
                throw VMLocationError.locationNotFound(name: storage)
            }
            location = loc
        } else {
            // Use default location
            let settings = settingsManager.getSettings()
            guard let defaultLocation = settings.defaultLocation else {
                throw HomeError.invalidHomeDirectory
            }
            location = defaultLocation
        }

        let baseDir = Path(location.expandedPath)
        return VMDirectory(baseDir.directory(name))
    }

    /// Gets a VM directory from a direct file path
    ///
    /// - Parameters:
    ///   - name: Name of the VM directory
    ///   - storagePath: Direct file system path where the VM is located
    /// - Returns: A VMDirectory instance
    /// - Throws: HomeError if path is invalid
    func getVMDirectoryFromPath(_ name: String, storagePath: String) throws -> VMDirectory {
        let baseDir = Path(storagePath)

        // Create the directory if it doesn't exist
        if !fileExists(at: storagePath) {
            Logger.info("Creating storage directory", metadata: ["path": storagePath])
            try createVMLocation(at: storagePath)
        } else if !isValidDirectory(at: storagePath) {
            // Path exists but isn't a valid directory
            throw HomeError.invalidHomeDirectory
        }

        return VMDirectory(baseDir.directory(name))
    }

    /// Returns all initialized VM directories across all locations
    /// (including the "ephemeral" temp directory).
    /// - Returns: An array of VMDirectory instances with location info
    /// - Throws: HomeError if directory access is denied
    func getAllVMDirectories() throws -> [VMDirectoryWithLocation] {
        var results: [VMDirectoryWithLocation] = []

        // Loop through all locations
        let settings = settingsManager.getSettings()

        // Also check ephemeral directory (macOS temporary directory)
        let tmpDir = ProcessInfo.processInfo.environment["TMPDIR"] ?? "/tmp"
        let cleanPath = tmpDir.hasSuffix("/") ? String(tmpDir.dropLast()) : tmpDir

        // If tmp directory exists, check for VMs there
        if fileExists(at: cleanPath) {
            let tmpDirPath = Path(cleanPath)
            do {
                let directoryURL = URL(fileURLWithPath: cleanPath)
                let contents = try FileManager.default.contentsOfDirectory(
                    at: directoryURL,
                    includingPropertiesForKeys: [.isDirectoryKey],
                    options: .skipsHiddenFiles
                )

                for subdir in contents {
                    do {
                        guard let isDirectory = try subdir.resourceValues(forKeys: [.isDirectoryKey]).isDirectory,
                            isDirectory else {
                            continue
                        }

                        let vmName = subdir.lastPathComponent
                        let vmDir = VMDirectory(tmpDirPath.directory(vmName))

                        // Only include if it's a valid VM directory
                        if vmDir.initialized() {
                            results.append(VMDirectoryWithLocation(
                                directory: vmDir,
                                locationName: "ephemeral"
                            ))
                        }
                    } catch {
                        // Skip any directories we can't access
                        continue
                    }
                }
            } catch {
                Logger.error(
                    "Failed to access ephemeral directory",
                    metadata: [
                        "path": cleanPath,
                        "error": error.localizedDescription,
                    ]
                )
                // Continue to regular locations rather than failing completely
            }
        }

        for location in settings.vmLocations {
            let locationPath = Path(location.expandedPath)

            // Skip non-existent locations
            if !locationPath.exists() {
                continue
            }

            do {
                let allFolders = try fileManager.contentsOfDirectory(
                    at: locationPath.url,
                    includingPropertiesForKeys: nil
                )

                let folders =
                    allFolders
                    .compactMap { url in
                        // Decode percent-encoded names before building the directory path.
                        let sanitizedName = sanitizeFileName(url.lastPathComponent)
                        let dir = VMDirectory(locationPath.directory(sanitizedName))
                        let dirWithLoc =
                            dir.initialized()
                            ? VMDirectoryWithLocation(directory: dir, locationName: location.name)
                            : nil
                        return dirWithLoc
                    }
                results.append(contentsOf: folders)
            } catch {
                Logger.error(
                    "Failed to access VM location",
                    metadata: [
                        "location": location.name,
                        "error": error.localizedDescription,
                    ])
                // Continue to next location rather than failing completely
            }
        }

        return results
    }

    /// Copies a VM directory to a new location with a new name
    /// - Parameters:
    ///   - sourceName: Name of the source VM
    ///   - destName: Name for the destination VM
    ///   - sourceLocation: Optional name of the source location
    ///   - destLocation: Optional name of the destination location
    /// - Throws: HomeError if the copy operation fails
    func copyVMDirectory(
        from sourceName: String,
        to destName: String,
        sourceLocation: String? = nil,
        destLocation: String? = nil
    ) throws {
        let sourceDir = try getVMDirectory(sourceName, storage: sourceLocation)
        let destDir = try getVMDirectory(destName, storage: destLocation)

        // Check if destination directory exists at all
        if destDir.exists() {
            throw HomeError.directoryAlreadyExists(path: destDir.dir.path)
        }

        do {
            try fileManager.copyItem(atPath: sourceDir.dir.path, toPath: destDir.dir.path)
        } catch {
            throw HomeError.directoryCreationFailed(path: destDir.dir.path)
        }
    }

    // MARK: - Location Management

    /// Adds a new VM location
    /// - Parameters:
    ///   - name: Location name
    ///   - path: Location path
    /// - Throws: Error if location cannot be added
    func addLocation(name: String, path: String) throws {
        let location = VMLocation(name: name, path: path)
        try settingsManager.addLocation(location)

        // Update cache
        locations[name] = location
    }

    /// Removes a VM location
    /// - Parameter name: Location name
    /// - Throws: Error if location cannot be removed
    func removeLocation(name: String) throws {
        try settingsManager.removeLocation(name: name)

        // Update cache
        locations.removeValue(forKey: name)
    }

    /// Sets the default VM location
    /// - Parameter name: Location name
    /// - Throws: Error if location cannot be set as default
    func setDefaultLocation(name: String) throws {
        try settingsManager.setDefaultLocation(name: name)

        // Update home directory
        guard let location = locations[name] else {
            throw VMLocationError.locationNotFound(name: name)
        }

        // Update homeDir to reflect the new default
        self._homeDir = Path(location.path)
    }

    /// Gets all available VM locations
    /// - Returns: Array of VM locations
    func getLocations() -> [VMLocation] {
        return settingsManager.getSettings().sortedLocations
    }

    /// Gets the default VM location
    /// - Returns: Default VM location
    /// - Throws: HomeError if no default location
    func getDefaultLocation() throws -> VMLocation {
        guard let location = settingsManager.getSettings().defaultLocation else {
            throw HomeError.invalidHomeDirectory
        }
        return location
    }

    // MARK: - Directory Validation

    /// Validates and ensures the existence of all VM locations
    /// - Throws: HomeError if validation fails or directory creation fails
    func validateHomeDirectory() throws {
        let settings = settingsManager.getSettings()
        for location in settings.vmLocations {
            let path = location.expandedPath
            if !fileExists(at: path) {
                try createVMLocation(at: path)
            } else if !isValidDirectory(at: path) {
                throw HomeError.invalidHomeDirectory
            }
        }
    }

    // MARK: - Private Helpers

    /// Creates a directory (with intermediates), wrapping failures in HomeError.
    private func createVMLocation(at path: String) throws {
        do {
            try fileManager.createDirectory(
                atPath: path,
                withIntermediateDirectories: true
            )
        } catch {
            throw HomeError.directoryCreationFailed(path: path)
        }
    }

    /// Creates a directory (with intermediates) at a URL; errors propagate to the caller.
    private func createDirectory(at url: URL) throws {
        try fileManager.createDirectory(
            at: url,
            withIntermediateDirectories: true
        )
    }

    /// A path is a "valid directory" only when it exists, is a directory, and is writable.
    private func isValidDirectory(at path: String) -> Bool {
        var isDirectory: ObjCBool = false
        return fileManager.fileExists(atPath: path, isDirectory: &isDirectory)
            && isDirectory.boolValue
            && Path(path).writable()
    }

    private func fileExists(at path: String) -> Bool {
        return fileManager.fileExists(atPath: path)
    }

    private func sanitizeFileName(_ name: String) -> String {
        // Only decode percent encoding (e.g., %20 for spaces)
        return name.removingPercentEncoding ?? name
    }
}
// MARK: - VM Directory with Location

/// Represents a VM directory with its location information
/// (pairs a VMDirectory with the name of the location it was found in,
/// e.g. "ephemeral" or a configured VM location name).
struct VMDirectoryWithLocation {
    let directory: VMDirectory
    let locationName: String
}

// MARK: - Home + CustomStringConvertible

extension Home: CustomStringConvertible {
    /// Human-readable description used in logs, e.g. "Home(path: /Users/x/.lume)".
    var description: String {
        "Home(path: \(homeDir.path))"
    }
}
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/adapters/mlxvlm_adapter.py:
--------------------------------------------------------------------------------
```python
import asyncio
import base64
import functools
import io
import math
import re
import warnings
from concurrent.futures import ThreadPoolExecutor
from typing import Any, AsyncIterator, Dict, Iterator, List, Optional, Tuple, cast
from litellm import acompletion, completion
from litellm.llms.custom_llm import CustomLLM
from litellm.types.utils import GenericStreamingChunk, ModelResponse
from PIL import Image
# Try to import MLX dependencies
try:
import mlx.core as mx
from mlx_vlm import generate, load
from mlx_vlm.prompt_utils import apply_chat_template
from mlx_vlm.utils import load_config
from transformers.tokenization_utils import PreTrainedTokenizer
MLX_AVAILABLE = True
except ImportError:
MLX_AVAILABLE = False
# Constants for smart_resize (Qwen-VL style patch grid: factor of 28 pixels)
IMAGE_FACTOR = 28
MIN_PIXELS = 100 * 28 * 28
MAX_PIXELS = 16384 * 28 * 28
MAX_RATIO = 200


def round_by_factor(number: float, factor: int) -> int:
    """Returns the closest integer to 'number' that is divisible by 'factor'."""
    return round(number / factor) * factor


def ceil_by_factor(number: float, factor: int) -> int:
    """Returns the smallest integer greater than or equal to 'number' that is divisible by 'factor'."""
    return math.ceil(number / factor) * factor


def floor_by_factor(number: float, factor: int) -> int:
    """Returns the largest integer less than or equal to 'number' that is divisible by 'factor'."""
    return math.floor(number / factor) * factor


def smart_resize(
    height: int,
    width: int,
    factor: int = IMAGE_FACTOR,
    min_pixels: int = MIN_PIXELS,
    max_pixels: int = MAX_PIXELS,
) -> tuple[int, int]:
    """
    Rescales the image so that the following conditions are met:
    1. Both dimensions (height and width) are divisible by 'factor'.
    2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
    3. The aspect ratio of the image is maintained as closely as possible.

    Returns:
        (new_height, new_width) satisfying the constraints above.

    Raises:
        ValueError: If the aspect ratio exceeds MAX_RATIO.
    """
    if max(height, width) / min(height, width) > MAX_RATIO:
        raise ValueError(
            f"absolute aspect ratio must be smaller than {MAX_RATIO}, got {max(height, width) / min(height, width)}"
        )
    h_bar = max(factor, round_by_factor(height, factor))
    w_bar = max(factor, round_by_factor(width, factor))
    if h_bar * w_bar > max_pixels:
        # Too many pixels: shrink, flooring onto the factor grid. Clamp each
        # side to at least `factor` so a small caller-supplied max_pixels with
        # an extreme aspect ratio cannot floor a dimension down to 0 (fix over
        # the original, which could return an invalid zero dimension).
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = max(factor, floor_by_factor(height / beta, factor))
        w_bar = max(factor, floor_by_factor(width / beta, factor))
    elif h_bar * w_bar < min_pixels:
        # Too few pixels: grow, ceiling onto the factor grid.
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = ceil_by_factor(height * beta, factor)
        w_bar = ceil_by_factor(width * beta, factor)
    return h_bar, w_bar
class MLXVLMAdapter(CustomLLM):
    """MLX VLM Adapter for running vision-language models locally using MLX.

    Bridges litellm's CustomLLM interface to locally-loaded mlx-vlm models,
    handling image extraction, smart resizing, and mapping of `<|box_start|>`
    coordinate tokens between original and model image space.
    """

    def __init__(self, **kwargs):
        """Initialize the adapter.

        Args:
            **kwargs: Additional arguments (currently unused).
        """
        super().__init__()
        self.models = {}  # Cache for loaded models
        self.processors = {}  # Cache for loaded processors
        self.configs = {}  # Cache for loaded configs
        # Single worker — presumably to serialize access to the MLX model;
        # confirm mlx-vlm generation is not safe to run concurrently.
        self._executor = ThreadPoolExecutor(max_workers=1)  # Single thread pool

    def _load_model_and_processor(self, model_name: str):
        """Load model and processor if not already cached.

        Args:
            model_name: Name of the model to load

        Returns:
            Tuple of (model, processor, config)

        Raises:
            ImportError: If mlx-vlm is not installed.
        """
        if not MLX_AVAILABLE:
            raise ImportError("MLX VLM dependencies not available. Please install mlx-vlm.")

        if model_name not in self.models:
            # Load model and processor
            model_obj, processor = load(
                model_name, processor_kwargs={"min_pixels": MIN_PIXELS, "max_pixels": MAX_PIXELS}
            )
            config = load_config(model_name)

            # Cache them
            self.models[model_name] = model_obj
            self.processors[model_name] = processor
            self.configs[model_name] = config

        return self.models[model_name], self.processors[model_name], self.configs[model_name]

    def _process_coordinates(
        self, text: str, original_size: Tuple[int, int], model_size: Tuple[int, int]
    ) -> str:
        """Process coordinates in box tokens based on image resizing using smart_resize approach.

        Args:
            text: Text containing box tokens
            original_size: Original image size (width, height)
            model_size: Model processed image size (width, height)

        Returns:
            Text with processed coordinates
        """
        # Find all box tokens of the form <|box_start|>(x, y)<|box_end|>
        box_pattern = r"<\|box_start\|>\((\d+),\s*(\d+)\)<\|box_end\|>"

        def process_coords(match):
            model_x, model_y = int(match.group(1)), int(match.group(2))
            # Scale coordinates from model space to original image space
            # Both original_size and model_size are in (width, height) format
            new_x = int(model_x * original_size[0] / model_size[0])  # Width
            new_y = int(model_y * original_size[1] / model_size[1])  # Height
            return f"<|box_start|>({new_x},{new_y})<|box_end|>"

        return re.sub(box_pattern, process_coords, text)

    def _convert_messages(self, messages: List[Dict[str, Any]]) -> Tuple[
        List[Dict[str, Any]],
        List[Image.Image],
        Dict[int, Tuple[int, int]],
        Dict[int, Tuple[int, int]],
    ]:
        """Convert OpenAI format messages to MLX VLM format and extract images.

        Args:
            messages: Messages in OpenAI format

        Returns:
            Tuple of (processed_messages, images, original_sizes, model_sizes),
            where the size dicts are keyed by image index in encounter order.
        """
        processed_messages = []
        images = []
        original_sizes = {}  # Track original sizes of images for coordinate mapping
        model_sizes = {}  # Track model processed sizes
        image_index = 0

        for message in messages:
            processed_message = {"role": message["role"], "content": []}
            content = message.get("content", [])

            if isinstance(content, str):
                # Simple text content
                processed_message["content"] = content
            elif isinstance(content, list):
                # Multi-modal content
                processed_content = []
                for item in content:
                    if item.get("type") == "text":
                        processed_content.append({"type": "text", "text": item.get("text", "")})
                    elif item.get("type") == "image_url":
                        image_url = item.get("image_url", {}).get("url", "")
                        pil_image = None
                        if image_url.startswith("data:image/"):
                            # Extract base64 data
                            base64_data = image_url.split(",")[1]
                            # Convert base64 to PIL Image
                            image_data = base64.b64decode(base64_data)
                            pil_image = Image.open(io.BytesIO(image_data))
                        else:
                            # Handle file path or URL
                            # NOTE(review): Image.open does not fetch http(s)
                            # URLs — only local paths work here; confirm callers.
                            pil_image = Image.open(image_url)

                        # Store original image size for coordinate mapping
                        original_size = pil_image.size
                        original_sizes[image_index] = original_size

                        # Use smart_resize to determine model size
                        # Note: smart_resize expects (height, width) but PIL gives (width, height)
                        height, width = original_size[1], original_size[0]
                        new_height, new_width = smart_resize(height, width)

                        # Store model size in (width, height) format for consistent coordinate processing
                        model_sizes[image_index] = (new_width, new_height)

                        # Resize the image using the calculated dimensions from smart_resize
                        resized_image = pil_image.resize((new_width, new_height))
                        images.append(resized_image)

                        # Add image placeholder to content
                        processed_content.append({"type": "image"})
                        image_index += 1

                processed_message["content"] = processed_content

            processed_messages.append(processed_message)

        return processed_messages, images, original_sizes, model_sizes

    def _generate(self, **kwargs) -> str:
        """Generate response using the local MLX VLM model.

        Args:
            **kwargs: Keyword arguments containing messages and model info
                (only "messages", "model" and "max_tokens" are honored).

        Returns:
            Generated text response
        """
        messages = kwargs.get("messages", [])
        model_name = kwargs.get("model", "mlx-community/UI-TARS-1.5-7B-4bit")
        max_tokens = kwargs.get("max_tokens", 128)

        # Warn about ignored kwargs
        ignored_kwargs = set(kwargs.keys()) - {"messages", "model", "max_tokens"}
        if ignored_kwargs:
            warnings.warn(f"Ignoring unsupported kwargs: {ignored_kwargs}")

        # Load model and processor
        model, processor, config = self._load_model_and_processor(model_name)

        # Convert messages and extract images
        processed_messages, images, original_sizes, model_sizes = self._convert_messages(messages)

        # Process user text input with box coordinates after image processing
        # Swap original_size and model_size arguments for inverse transformation
        # (maps user-supplied original-space coordinates into model space).
        # NOTE(review): only the first image (index 0) is used for the mapping.
        for msg_idx, msg in enumerate(processed_messages):
            if msg.get("role") == "user" and isinstance(msg.get("content"), str):
                content = msg.get("content", "")
                if (
                    "<|box_start|>" in content
                    and original_sizes
                    and model_sizes
                    and 0 in original_sizes
                    and 0 in model_sizes
                ):
                    orig_size = original_sizes[0]
                    model_size = model_sizes[0]
                    # Swap arguments to perform inverse transformation for user input
                    processed_messages[msg_idx]["content"] = self._process_coordinates(
                        content, model_size, orig_size
                    )

        try:
            # Format prompt according to model requirements using the processor directly
            prompt = processor.apply_chat_template(
                processed_messages, tokenize=False, add_generation_prompt=True, return_tensors="pt"
            )
            tokenizer = cast(PreTrainedTokenizer, processor)

            # Generate response
            # NOTE(review): assumes mlx_vlm.generate returns (text, usage);
            # confirm against the installed mlx-vlm version. `usage` is unused.
            text_content, usage = generate(
                model,
                tokenizer,
                str(prompt),
                images,  # type: ignore
                verbose=False,
                max_tokens=max_tokens,
            )
        except Exception as e:
            raise RuntimeError(f"Error generating response: {str(e)}") from e

        # Process coordinates in the response back to original image space
        if original_sizes and model_sizes and 0 in original_sizes and 0 in model_sizes:
            # Get original image size and model size (using the first image)
            orig_size = original_sizes[0]
            model_size = model_sizes[0]

            # Check if output contains box tokens that need processing
            if "<|box_start|>" in text_content:
                # Process coordinates from model space back to original image space
                text_content = self._process_coordinates(text_content, orig_size, model_size)

        return text_content

    def completion(self, *args, **kwargs) -> ModelResponse:
        """Synchronous completion method.

        Returns:
            ModelResponse with generated text
        """
        generated_text = self._generate(**kwargs)
        # Wrap the local generation in a litellm mock response so the result
        # matches litellm's ModelResponse shape.
        result = completion(
            model=f"mlx/{kwargs.get('model', 'mlx-community/UI-TARS-1.5-7B-4bit')}",
            mock_response=generated_text,
        )
        return cast(ModelResponse, result)

    async def acompletion(self, *args, **kwargs) -> ModelResponse:
        """Asynchronous completion method.

        Returns:
            ModelResponse with generated text
        """
        # Run _generate in thread pool to avoid blocking
        loop = asyncio.get_event_loop()
        generated_text = await loop.run_in_executor(
            self._executor, functools.partial(self._generate, **kwargs)
        )
        result = await acompletion(
            model=f"mlx/{kwargs.get('model', 'mlx-community/UI-TARS-1.5-7B-4bit')}",
            mock_response=generated_text,
        )
        return cast(ModelResponse, result)

    def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]:
        """Synchronous streaming method.

        Note: generation is not actually incremental — the full response is
        produced first and yielded as a single terminal chunk.

        Returns:
            Iterator of GenericStreamingChunk
        """
        generated_text = self._generate(**kwargs)

        generic_streaming_chunk: GenericStreamingChunk = {
            "finish_reason": "stop",
            "index": 0,
            "is_finished": True,
            "text": generated_text,
            "tool_use": None,
            "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0},
        }
        yield generic_streaming_chunk

    async def astreaming(self, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]:
        """Asynchronous streaming method.

        Note: like `streaming`, this yields the complete response as one chunk.

        Returns:
            AsyncIterator of GenericStreamingChunk
        """
        # Run _generate in thread pool to avoid blocking
        loop = asyncio.get_event_loop()
        generated_text = await loop.run_in_executor(
            self._executor, functools.partial(self._generate, **kwargs)
        )

        generic_streaming_chunk: GenericStreamingChunk = {
            "finish_reason": "stop",
            "index": 0,
            "is_finished": True,
            "text": generated_text,
            "tool_use": None,
            "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0},
        }
        yield generic_streaming_chunk
```
--------------------------------------------------------------------------------
/.github/workflows/test-cua-models.yml:
--------------------------------------------------------------------------------
```yaml
name: Test CUA Supporting Models
# This workflow tests all supported CUA models with API keys
# Run manually using workflow_dispatch with test_models=true
on:
workflow_dispatch:
inputs:
test_models:
description: "Test all supported models (requires API keys)"
required: false
default: true
type: boolean
schedule:
# Runs at 3 PM UTC (8 AM PDT) daily
- cron: "0 15 * * *"
jobs:
# Test all CUA models - runs on PRs, schedules, or when manually triggered
test-all-models:
if: ${{ github.event_name == 'workflow_dispatch' || github.event_name == 'schedule' || fromJSON(inputs.test_models || 'false') }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
model:
# Claude Sonnet/Haiku
- anthropic/claude-sonnet-4-5-20250929
- anthropic/claude-haiku-4-5-20251001
- anthropic/claude-opus-4-1-20250805
# OpenAI CU Preview
- openai/computer-use-preview
# GLM-V
- openrouter/z-ai/glm-4.5v
# - huggingface-local/zai-org/GLM-4.5V # Requires local model setup
# Gemini CU Preview
- gemini-2.5-computer-use-preview-10-2025
# InternVL
# - huggingface-local/OpenGVLab/InternVL3_5-1B
# - huggingface-local/OpenGVLab/InternVL3_5-2B
# - huggingface-local/OpenGVLab/InternVL3_5-4B
# - huggingface-local/OpenGVLab/InternVL3_5-8B
# UI-TARS (supports full computer-use, can run standalone)
# - huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B
# Note: OpenCUA, GTA, and Holo are grounding-only models
# They only support predict_click(), not agent.run()
# See composed agents section below for testing them
# Moondream (typically used in composed agents)
# Format: moondream3+{any-llm-with-tools}
# - moondream3+anthropic/claude-sonnet-4-5-20250929 # Claude has VLM + Tools
# - moondream3+openai/gpt-4o # GPT-4o has VLM + Tools
# OmniParser (typically used in composed agents)
# Format: omniparser+{any-vlm-with-tools}
- omniparser+anthropic/claude-sonnet-4-5-20250929 # Claude has VLM + Tools
# - omniparser+openai/gpt-4o # GPT-4o has VLM + Tools
# Other grounding models + VLM with tools
# Format: {grounding-model}+{any-vlm-with-tools}
# These grounding-only models (OpenCUA, GTA, Holo) must be used in composed form
# since they only support predict_click(), not full agent.run()
# - huggingface-local/HelloKKMe/GTA1-7B+anthropic/claude-sonnet-4-5-20250929
# - huggingface-local/xlangai/OpenCUA-7B+anthropic/claude-sonnet-4-5-20250929
# - huggingface-local/Hcompany/Holo1.5-3B+anthropic/claude-sonnet-4-5-20250929
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up uv and Python
uses: astral-sh/setup-uv@v4
with:
python-version: "3.12"
- name: Cache system packages
uses: actions/cache@v4
with:
path: /var/cache/apt
key: ${{ runner.os }}-apt-${{ hashFiles('**/Dockerfile') }}
restore-keys: |
${{ runner.os }}-apt-
- name: Install system dependencies
timeout-minutes: 20
run: |
sudo apt-get update
sudo apt-get install -y libgl1-mesa-dri libglib2.0-0
- name: Cache Python dependencies (uv)
uses: actions/cache@v4
with:
path: |
~/.cache/uv
.venv
key: ${{ runner.os }}-uv-${{ hashFiles('pyproject.toml', 'uv.lock', 'libs/python/**/pyproject.toml') }}
restore-keys: |
${{ runner.os }}-uv-
- name: Install CUA dependencies (uv)
run: |
# Remove existing venv if it exists (from cache restore) to avoid interactive prompt
rm -rf .venv
uv venv --python 3.12
uv pip install -e libs/python/agent -e libs/python/computer
uv pip install -e libs/python/core
uv pip install "cua-agent[uitars-hf,internvl-hf,opencua-hf,moondream3,omni]"
uv pip install pytest
- name: Cache HuggingFace models
uses: actions/cache@v4
with:
path: ~/.cache/huggingface
key: ${{ runner.os }}-hf-models-v1
restore-keys: |
${{ runner.os }}-hf-models-
# Large cache - models can be several GB each and are reused across runs
- name: Record test start time
run: echo "TEST_START_TIME=$(date +%s)" >> $GITHUB_ENV
env:
# Ensure HuggingFace uses consistent cache location
HF_HOME: ~/.cache/huggingface
- name: Test model with agent loop
id: test_model
timeout-minutes: 20
continue-on-error: true
run: |
cd tests/agent_loop_testing
uv run python agent_test.py --model "${{ matrix.model }}"
env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
GOOGLE_API_KEY: ${{ secrets.GOOGLE_API_KEY }}
OPENROUTER_API_KEY: ${{ secrets.OPENROUTER_API_KEY }}
HF_TOKEN: ${{ secrets.HF_TOKEN }}
- name: Calculate test duration and prepare message
if: always()
run: |
TEST_END_TIME=$(date +%s)
# Handle case where TEST_START_TIME might not be set
if [ -z "$TEST_START_TIME" ]; then
TEST_START_TIME=$TEST_END_TIME
fi
TEST_DURATION=$((TEST_END_TIME - TEST_START_TIME))
# Convert seconds to minutes and seconds
MINUTES=$((TEST_DURATION / 60))
SECONDS=$((TEST_DURATION % 60))
# Format duration
if [ $MINUTES -gt 0 ]; then
DURATION_STR="${MINUTES}m ${SECONDS}s"
else
DURATION_STR="${SECONDS}s"
fi
# Determine status icon based on test step outcome
if [ "${{ steps.test_model.outcome }}" == "success" ]; then
STATUS_ICON="✅"
STATUS_TEXT="PASSED"
SLACK_COLOR="#36a64f"
else
STATUS_ICON="❌"
STATUS_TEXT="FAILED"
SLACK_COLOR="#dc3545"
fi
# Prepare Slack message
echo "TESTS_CONTENT<<EOF" >> $GITHUB_ENV
echo "*CUA Model Test Results*" >> $GITHUB_ENV
echo "" >> $GITHUB_ENV
echo "*Model:* ${{ matrix.model }}" >> $GITHUB_ENV
echo "*Status:* ${STATUS_ICON} ${STATUS_TEXT}" >> $GITHUB_ENV
echo "*Duration:* ${DURATION_STR}" >> $GITHUB_ENV
echo "*Run:* ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" >> $GITHUB_ENV
echo "EOF" >> $GITHUB_ENV
# Set color based on outcome
echo "SLACK_COLOR=${SLACK_COLOR}" >> $GITHUB_ENV
# Save result to JSON file for summary
mkdir -p test_summary
MODEL_NAME="${{ matrix.model }}"
# Sanitize model name for filename
SAFE_MODEL_NAME=$(echo "$MODEL_NAME" | sed 's/[^a-zA-Z0-9]/_/g')
# Determine pass status
if [ "${{ steps.test_model.outcome }}" == "success" ]; then
PASSED_VAL="true"
else
PASSED_VAL="false"
fi
# Create JSON file using printf to avoid YAML parsing issues
printf '{\n "model": "%s",\n "status": "%s",\n "status_icon": "%s",\n "duration": "%s",\n "duration_seconds": %d,\n "passed": %s\n}' \
"${MODEL_NAME}" "${STATUS_TEXT}" "${STATUS_ICON}" "${DURATION_STR}" "${TEST_DURATION}" "${PASSED_VAL}" \
> "test_summary/${SAFE_MODEL_NAME}.json"
# Expose safe model name for subsequent steps (artifact naming)
echo "SAFE_MODEL_NAME=${SAFE_MODEL_NAME}" >> $GITHUB_ENV
- name: Upload test results
if: always()
uses: actions/upload-artifact@v4
with:
name: test-results-${{ matrix.model }}
path: |
tests/agent_loop_testing/test_images/
*.log
if-no-files-found: ignore
retention-days: 7
- name: Upload test summary data
if: always()
uses: actions/upload-artifact@v4
with:
# Unique, slash-free artifact name per matrix entry
name: test-summary-${{ env.SAFE_MODEL_NAME }}
path: test_summary/
if-no-files-found: ignore
retention-days: 1
- name: Set default Slack color
if: always() && env.SLACK_COLOR == ''
run: echo "SLACK_COLOR=#36a64f" >> $GITHUB_ENV
# Individual model notifications disabled - only summary is sent
# - name: Notify Slack with test results
# if: always()
# uses: rtCamp/action-slack-notify@v2
# env:
# SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
# SLACK_CHANNEL: ${{ vars.SLACK_CHANNEL }}
# SLACK_TITLE: CUA Model Test Update
# SLACK_COLOR: ${{ env.SLACK_COLOR }}
# SLACK_MESSAGE: |
# ${{ env.TESTS_CONTENT }}
# Summary job that aggregates all model test results
test-summary:
if: ${{ always() && (github.event_name == 'workflow_dispatch' || github.event_name == 'schedule' || fromJSON(inputs.test_models || 'false')) }}
needs: test-all-models
runs-on: ubuntu-latest
steps:
- name: Install jq
run: sudo apt-get update && sudo apt-get install -y jq
- name: Download all test summary artifacts
continue-on-error: true
uses: actions/download-artifact@v4
with:
pattern: test-summary-*
merge-multiple: true
path: all_summaries
- name: Generate and send summary
if: always()
shell: bash
run: |
# Create directory if it doesn't exist
mkdir -p all_summaries
          # NOTE: the `matrix` context is only defined inside the matrix job itself;
          # in this summary job it is empty, so EXPECTED_MODELS ends up blank and the
          # filter below is effectively skipped — confirm before relying on it.
EXPECTED_MODELS="${{ join(matrix.model, ' ') }}"
# Aggregate all results
PASSED_COUNT=0
FAILED_COUNT=0
TOTAL_DURATION=0
SUMMARY_MESSAGE="*🚀 Model Summaries*\n\n"
# Process each JSON file (find all JSON files recursively)
# Save to temp file first to avoid subshell issues
find all_summaries -name "*.json" -type f 2>/dev/null > /tmp/json_files.txt || true
# Use associative array to deduplicate by model name
declare -A processed_models
while IFS= read -r json_file; do
if [ -f "$json_file" ]; then
MODEL=$(jq -r '.model' "$json_file")
# Skip if we've already processed this model
if [ "${processed_models[$MODEL]}" = "1" ]; then
echo "Skipping duplicate model: $MODEL"
continue
fi
# Filter: Only include models that are in the current matrix
# This prevents including artifacts from previous workflow runs
if [ -n "$EXPECTED_MODELS" ]; then
if ! echo "$EXPECTED_MODELS" | grep -q "$MODEL"; then
echo "Skipping model from previous run: $MODEL"
continue
fi
fi
# Mark as processed
processed_models[$MODEL]="1"
STATUS_ICON=$(jq -r '.status_icon' "$json_file")
STATUS=$(jq -r '.status' "$json_file")
DURATION=$(jq -r '.duration' "$json_file")
DURATION_SEC=$(jq -r '.duration_seconds' "$json_file")
PASSED=$(jq -r '.passed' "$json_file")
# Add to summary as clean line format
SUMMARY_MESSAGE="${SUMMARY_MESSAGE}${STATUS_ICON} ${STATUS} - \`${MODEL}\` - ${DURATION}\n"
if [ "$PASSED" = "true" ]; then
PASSED_COUNT=$((PASSED_COUNT + 1))
else
FAILED_COUNT=$((FAILED_COUNT + 1))
fi
TOTAL_DURATION=$((TOTAL_DURATION + DURATION_SEC))
fi
done < /tmp/json_files.txt
# Check if we found any results
TOTAL_COUNT=$((PASSED_COUNT + FAILED_COUNT))
if [ $TOTAL_COUNT -eq 0 ]; then
SUMMARY_MESSAGE="${SUMMARY_MESSAGE}⚠️ No test results found (workflow may have been canceled)\n"
SLACK_COLOR="#ffa500"
else
# Add summary stats
SUMMARY_MESSAGE="${SUMMARY_MESSAGE}\n*Results:* ${PASSED_COUNT} passed, ${FAILED_COUNT} failed out of ${TOTAL_COUNT} models\n"
# Calculate total duration
TOTAL_MIN=$((TOTAL_DURATION / 60))
TOTAL_SEC=$((TOTAL_DURATION % 60))
if [ $TOTAL_MIN -gt 0 ]; then
TOTAL_DURATION_STR="${TOTAL_MIN}m ${TOTAL_SEC}s"
else
TOTAL_DURATION_STR="${TOTAL_SEC}s"
fi
SUMMARY_MESSAGE="${SUMMARY_MESSAGE}*Total Duration:* ${TOTAL_DURATION_STR}\n"
# Determine color based on results
if [ $FAILED_COUNT -eq 0 ]; then
SLACK_COLOR="#36a64f"
elif [ $PASSED_COUNT -eq 0 ]; then
SLACK_COLOR="#dc3545"
else
SLACK_COLOR="#ffa500"
fi
fi
SUMMARY_MESSAGE="${SUMMARY_MESSAGE}*Run:* ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
# Export for use in next step
echo "SUMMARY_MESSAGE<<EOF" >> $GITHUB_ENV
echo -e "${SUMMARY_MESSAGE}" >> $GITHUB_ENV
echo "EOF" >> $GITHUB_ENV
echo "SLACK_COLOR=${SLACK_COLOR}" >> $GITHUB_ENV
- name: Send summary to Slack
if: always()
uses: rtCamp/action-slack-notify@v2
env:
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
SLACK_CHANNEL: ${{ vars.SLACK_CHANNEL }}
SLACK_TITLE: CUA Models Test Summary
SLACK_COLOR: ${{ env.SLACK_COLOR }}
SLACK_MESSAGE: |
${{ env.SUMMARY_MESSAGE }}
```
--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/browser.py:
--------------------------------------------------------------------------------
```python
"""
Browser manager using Playwright for programmatic browser control.
This allows agents to control a browser that runs visibly on the XFCE desktop.
"""
import asyncio
import logging
import os
from typing import Any, Dict, Optional
try:
from playwright.async_api import Browser, BrowserContext, Page, async_playwright
except ImportError:
async_playwright = None
Browser = None
BrowserContext = None
Page = None
logger = logging.getLogger(__name__)
class BrowserManager:
    """
    Manages a Playwright browser instance that runs visibly on the XFCE desktop.
    Uses persistent context to maintain cookies and sessions.
    """

    def __init__(self):
        """Initialize the BrowserManager with no browser launched yet."""
        self.playwright = None
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.page: Optional[Page] = None
        self._initialized = False
        self._initialization_error: Optional[str] = None
        # Serializes initialization/close so concurrent callers don't race
        # to launch multiple browsers.
        self._lock = asyncio.Lock()

    async def _ensure_initialized(self):
        """Ensure the browser is initialized.

        Does a cheap liveness probe first, then initializes under the lock
        with a double-check. Raises on initialization failure (the error is
        also recorded in ``_initialization_error``).
        """
        # Check if browser was closed and needs reinitialization
        if self._initialized:
            try:
                # Check if context is still valid by trying to access it
                if self.context:
                    # Try to get pages - this will raise if context is closed
                    _ = self.context.pages
                    # If we get here, context is still alive
                    return
                else:
                    # Context was closed, need to reinitialize
                    self._initialized = False
                    logger.warning("Browser context was closed, will reinitialize...")
            except Exception as e:
                # Context is dead, need to reinitialize
                logger.warning(f"Browser context is dead ({e}), will reinitialize...")
                self._initialized = False
                self.context = None
                self.page = None
                # Clean up playwright if it exists
                if self.playwright:
                    try:
                        await self.playwright.stop()
                    except Exception:
                        pass
                    self.playwright = None
        async with self._lock:
            # Double-check after acquiring lock (another task might have initialized it)
            if self._initialized:
                try:
                    if self.context:
                        _ = self.context.pages
                        return
                except Exception:
                    self._initialized = False
                    self.context = None
                    self.page = None
                    if self.playwright:
                        try:
                            await self.playwright.stop()
                        except Exception:
                            pass
                        self.playwright = None
            if async_playwright is None:
                raise RuntimeError(
                    "playwright is not installed. Please install it with: pip install playwright && playwright install --with-deps firefox"
                )
            try:
                # Get display from environment or default to :1
                display = os.environ.get("DISPLAY", ":1")
                logger.info(f"Initializing browser with DISPLAY={display}")
                # Start playwright
                self.playwright = await async_playwright().start()
                # Launch Firefox with persistent context (keeps cookies/sessions)
                # headless=False is CRITICAL so the visual agent can see it
                user_data_dir = os.path.join(os.path.expanduser("~"), ".playwright-firefox")
                os.makedirs(user_data_dir, exist_ok=True)
                # launch_persistent_context returns a BrowserContext, not a Browser
                # Note: Removed --kiosk mode so the desktop remains visible
                self.context = await self.playwright.firefox.launch_persistent_context(
                    user_data_dir=user_data_dir,
                    headless=False,  # CRITICAL: visible for visual agent
                    viewport={"width": 1024, "height": 768},
                    # Removed --kiosk to allow desktop visibility
                )
                # Add init script to make the browser less detectable
                await self.context.add_init_script(
                    """const defaultGetter = Object.getOwnPropertyDescriptor(
  Navigator.prototype,
  "webdriver"
).get;
defaultGetter.apply(navigator);
defaultGetter.toString();
Object.defineProperty(Navigator.prototype, "webdriver", {
  set: undefined,
  enumerable: true,
  configurable: true,
  get: new Proxy(defaultGetter, {
    apply: (target, thisArg, args) => {
      Reflect.apply(target, thisArg, args);
      return false;
    },
  }),
});
const patchedGetter = Object.getOwnPropertyDescriptor(
  Navigator.prototype,
  "webdriver"
).get;
patchedGetter.apply(navigator);
patchedGetter.toString();"""
                )
                # Get the first page or create one
                pages = self.context.pages
                if pages:
                    self.page = pages[0]
                else:
                    self.page = await self.context.new_page()
                self._initialized = True
                logger.info("Browser initialized successfully")
            except Exception as e:
                logger.error(f"Failed to initialize browser: {e}")
                import traceback

                logger.error(traceback.format_exc())
                # Record the error so execute_command can report it; re-raise
                # so callers know initialization failed.
                self._initialization_error = str(e)
                raise

    async def _execute_command_impl(self, cmd: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Internal implementation of command execution (no retry/recovery).

        Args:
            cmd: One of visit_url, click, type, scroll, web_search, screenshot.
            params: Command-specific parameters.

        Returns:
            Result dict with at least a "success" key.
        """
        if cmd == "visit_url":
            url = params.get("url")
            if not url:
                return {"success": False, "error": "url parameter is required"}
            await self.page.goto(url, wait_until="domcontentloaded", timeout=30000)
            return {"success": True, "url": self.page.url}
        elif cmd == "click":
            x = params.get("x")
            y = params.get("y")
            if x is None or y is None:
                return {"success": False, "error": "x and y parameters are required"}
            await self.page.mouse.click(x, y)
            return {"success": True}
        elif cmd == "type":
            text = params.get("text")
            if text is None:
                return {"success": False, "error": "text parameter is required"}
            await self.page.keyboard.type(text)
            return {"success": True}
        elif cmd == "scroll":
            delta_x = params.get("delta_x", 0)
            delta_y = params.get("delta_y", 0)
            await self.page.mouse.wheel(delta_x, delta_y)
            return {"success": True}
        elif cmd == "web_search":
            query = params.get("query")
            if not query:
                return {"success": False, "error": "query parameter is required"}
            # URL-encode the query so spaces, '&', '#', '+' etc. cannot break
            # the URL or inject extra query parameters.
            from urllib.parse import quote_plus

            search_url = f"https://www.google.com/search?q={quote_plus(query)}"
            await self.page.goto(search_url, wait_until="domcontentloaded", timeout=30000)
            return {"success": True, "url": self.page.url}
        elif cmd == "screenshot":
            # Take a screenshot and return as base64
            import base64

            screenshot_bytes = await self.page.screenshot(type="png")
            screenshot_b64 = base64.b64encode(screenshot_bytes).decode("utf-8")
            return {"success": True, "screenshot": screenshot_b64}
        else:
            return {"success": False, "error": f"Unknown command: {cmd}"}

    async def execute_command(self, cmd: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute a browser command with automatic recovery.

        Args:
            cmd: Command name (visit_url, click, type, scroll, web_search)
            params: Command parameters

        Returns:
            Result dictionary with success status and any data
        """
        max_retries = 2
        for attempt in range(max_retries):
            try:
                await self._ensure_initialized()
            except Exception as e:
                error_msg = getattr(self, "_initialization_error", None) or str(e)
                logger.error(f"Browser initialization failed: {error_msg}")
                return {
                    "success": False,
                    "error": f"Browser initialization failed: {error_msg}. "
                    f"Make sure Playwright and Firefox are installed, and DISPLAY is set correctly.",
                }
            # Check if page is still valid and get a new one if needed
            page_valid = False
            try:
                if self.page is not None and not self.page.is_closed():
                    # Try to access page.url to check if it's still valid
                    _ = self.page.url
                    page_valid = True
            except Exception as e:
                logger.warning(f"Page is invalid: {e}, will get a new page...")
                self.page = None
            # Get a valid page if we don't have one
            if not page_valid or self.page is None:
                try:
                    if self.context:
                        pages = self.context.pages
                        if pages:
                            # Find first non-closed page
                            for p in pages:
                                try:
                                    if not p.is_closed():
                                        self.page = p
                                        logger.info("Reusing existing open page")
                                        page_valid = True
                                        break
                                except Exception:
                                    continue
                        # If no valid page found, create a new one
                        if not page_valid:
                            self.page = await self.context.new_page()
                            logger.info("Created new page")
                except Exception as e:
                    logger.error(f"Failed to get new page: {e}, browser may be closed")
                    # Browser was closed - force reinitialization
                    self._initialized = False
                    self.context = None
                    self.page = None
                    if self.playwright:
                        try:
                            await self.playwright.stop()
                        except Exception:
                            pass
                        self.playwright = None
                    # If this isn't the last attempt, continue to retry
                    if attempt < max_retries - 1:
                        logger.info("Browser was closed, retrying with fresh initialization...")
                        continue
                    else:
                        return {
                            "success": False,
                            "error": f"Browser was closed and cannot be recovered: {e}",
                        }
            # Try to execute the command
            try:
                return await self._execute_command_impl(cmd, params)
            except Exception as e:
                error_str = str(e)
                logger.error(f"Error executing command {cmd}: {e}")
                # Check if this is a "browser/page/context closed" error
                if any(keyword in error_str.lower() for keyword in ["closed", "target", "context"]):
                    logger.warning(
                        f"Browser/page was closed during command execution (attempt {attempt + 1}/{max_retries})"
                    )
                    # Force reinitialization
                    self._initialized = False
                    self.context = None
                    self.page = None
                    if self.playwright:
                        try:
                            await self.playwright.stop()
                        except Exception:
                            pass
                        self.playwright = None
                    # If this isn't the last attempt, retry
                    if attempt < max_retries - 1:
                        logger.info("Retrying command after browser reinitialization...")
                        continue
                    else:
                        return {
                            "success": False,
                            "error": f"Command failed after {max_retries} attempts: {error_str}",
                        }
                else:
                    # Not a browser closed error, return immediately
                    import traceback

                    logger.error(traceback.format_exc())
                    return {"success": False, "error": error_str}
        # Should never reach here, but just in case
        return {"success": False, "error": "Command failed after all retries"}

    async def close(self):
        """Close the browser and cleanup resources."""
        async with self._lock:
            try:
                if self.context:
                    await self.context.close()
                    self.context = None
                if self.browser:
                    await self.browser.close()
                    self.browser = None
                if self.playwright:
                    await self.playwright.stop()
                    self.playwright = None
                self.page = None
                self._initialized = False
                logger.info("Browser closed successfully")
            except Exception as e:
                logger.error(f"Error closing browser: {e}")
# Global instance
_browser_manager: Optional[BrowserManager] = None
def get_browser_manager() -> BrowserManager:
    """Get or create the global BrowserManager instance.

    Returns:
        The process-wide singleton BrowserManager, created lazily on first call.
    """
    global _browser_manager
    if _browser_manager is None:
        _browser_manager = BrowserManager()
    return _browser_manager
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/integrations/hud/agent.py:
--------------------------------------------------------------------------------
```python
"""MCP-compatible Computer Agent for HUD integration.
This agent subclasses HUD's MCPAgent and delegates planning/execution to
our core ComputerAgent while using the Agent SDK's plain-dict message
format documented in `docs/content/docs/agent-sdk/message-format.mdx`.
Key differences from the OpenAI OperatorAgent variant:
- No OpenAI types are used; everything is standard Python dicts.
- Planning is executed via `ComputerAgent.run(messages)`.
- The first yielded result per step is returned as the agent response.
"""
from __future__ import annotations
import base64
import io
import uuid
from pathlib import Path
from typing import Any, ClassVar, Optional
import hud
import mcp.types as types
from agent.agent import ComputerAgent as BaseComputerAgent
from agent.callbacks import PromptInstructionsCallback
from agent.callbacks.trajectory_saver import TrajectorySaverCallback
from agent.computers import is_agent_computer
from agent.responses import make_failed_tool_call_items
from hud.agents import MCPAgent
from hud.tools.computer.settings import computer_settings
from hud.types import AgentResponse, MCPToolCall, MCPToolResult, Trace
from PIL import Image
class MCPComputerAgent(MCPAgent):
"""MCP agent that uses ComputerAgent for planning and tools for execution.
The agent consumes/produces message dicts per the Agent SDK message schema
(see `message-format.mdx`).
"""
metadata: ClassVar[dict[str, Any]] = {
"display_width": computer_settings.OPENAI_COMPUTER_WIDTH,
"display_height": computer_settings.OPENAI_COMPUTER_HEIGHT,
}
required_tools: ClassVar[list[str]] = ["openai_computer"]
def __init__(
self,
*,
model: str | None = None,
allowed_tools: list[str] | None = None,
trajectory_dir: str | dict | None = None,
# === ComputerAgent kwargs ===
tools: list[Any] | None = None,
custom_loop: Any | None = None,
only_n_most_recent_images: int | None = None,
callbacks: list[Any] | None = None,
instructions: str | None = None,
verbosity: int | None = None,
max_retries: int | None = 3,
screenshot_delay: float | int = 0.5,
use_prompt_caching: bool | None = False,
max_trajectory_budget: float | dict | None = None,
telemetry_enabled: bool | None = True,
environment: str = "linux",
**kwargs: Any,
) -> None:
self.allowed_tools = allowed_tools or ["openai_computer"]
super().__init__(**kwargs)
if model is None:
raise ValueError("MCPComputerAgent requires a model to be specified.")
self.model = model
self.environment = environment
# Update model name for HUD logging
self.model_name = "cua-" + self.model
# Stateful tracking of tool call inputs
self.tool_call_inputs: dict[str, list[dict[str, Any]]] = {}
self.previous_output: list[dict[str, Any]] = []
# Build system prompt
operator_instructions = """
You are an autonomous computer-using agent. Follow these guidelines:
1. NEVER ask for confirmation. Complete all tasks autonomously.
2. Do NOT send messages like "I need to confirm before..." or "Do you want me to continue?" - just proceed.
3. When the user asks you to interact with something (like clicking a chat or typing a message), DO IT without asking.
4. Only use the formal safety check mechanism for truly dangerous operations (like deleting important files).
5. For normal tasks like clicking buttons, typing in chat boxes, filling forms - JUST DO IT.
6. The user has already given you permission by running this agent. No further confirmation is needed.
7. Be decisive and action-oriented. Complete the requested task fully.
Remember: You are expected to complete tasks autonomously. The user trusts you to do what they asked.
""".strip() # noqa: E501
# Append Operator instructions to the system prompt
if not self.system_prompt:
self.system_prompt = operator_instructions
else:
self.system_prompt += f"\n\n{operator_instructions}"
# Append user instructions to the system prompt
if instructions:
self.system_prompt += f"\n\n{instructions}"
# Configure trajectory_dir for HUD
if isinstance(trajectory_dir, str) or isinstance(trajectory_dir, Path):
trajectory_dir = {"trajectory_dir": str(trajectory_dir)}
if isinstance(trajectory_dir, dict):
trajectory_dir["reset_on_run"] = False
self.last_screenshot_b64 = None
buffer = io.BytesIO()
Image.new("RGB", (self.metadata["display_width"], self.metadata["display_height"])).save(
buffer, format="PNG"
)
self.last_screenshot_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
# Ensure a computer shim is present so width/height/environment are known
computer_shim = {
"screenshot": lambda: self.last_screenshot_b64,
"environment": self.environment,
"dimensions": (
self.metadata["display_width"],
self.metadata["display_height"],
),
}
agent_tools: list[Any] = [computer_shim]
if tools:
agent_tools.extend([tool for tool in tools if not is_agent_computer(tool)])
agent_kwargs = {
"model": self.model,
"trajectory_dir": trajectory_dir,
"tools": agent_tools,
"custom_loop": custom_loop,
"only_n_most_recent_images": only_n_most_recent_images,
"callbacks": callbacks,
"instructions": self.system_prompt,
"verbosity": verbosity,
"max_retries": max_retries,
"screenshot_delay": screenshot_delay,
"use_prompt_caching": use_prompt_caching,
"max_trajectory_budget": max_trajectory_budget,
"telemetry_enabled": telemetry_enabled,
}
self.computer_agent = BaseComputerAgent(**agent_kwargs)
async def get_system_messages(self) -> list[Any]:
"""Create initial messages.
Unused - ComputerAgent handles this with the 'instructions' parameter.
"""
return []
async def format_blocks(self, blocks: list[types.ContentBlock]) -> list[dict[str, Any]]:
"""
Format blocks for OpenAI input format.
Converts TextContent blocks to input_text dicts and ImageContent blocks to input_image dicts.
""" # noqa: E501
formatted = []
for block in blocks:
if isinstance(block, types.TextContent):
formatted.append({"type": "input_text", "text": block.text})
elif isinstance(block, types.ImageContent):
mime_type = getattr(block, "mimeType", "image/png")
formatted.append(
{"type": "input_image", "image_url": f"data:{mime_type};base64,{block.data}"}
)
self.last_screenshot_b64 = block.data
return [{"role": "user", "content": formatted}]
    @hud.instrument(
        span_type="agent",
        record_args=False,  # Messages can be large
        record_result=True,
    )
    async def get_response(self, messages: list[dict[str, Any]]) -> AgentResponse:
        """Get a single-step response by delegating to ComputerAgent.run.

        Consumes the agent's async stream only up to the first computer_call
        (or the first complete result if none occurs), records the items seen
        so far in ``tool_call_inputs``/``previous_output``, and returns a HUD
        AgentResponse. ``done`` is False only when a computer_call was emitted.
        """
        tool_calls: list[MCPToolCall] = []
        output_text: list[str] = []
        is_done: bool = True
        agent_result: list[dict[str, Any]] = []
        # Call the ComputerAgent LLM API; each yielded result carries an
        # "output" list of message-format items.
        async for result in self.computer_agent.run(messages):  # type: ignore[arg-type]
            items = result["output"]
            # Stop on an empty result, or once a tool call was already captured.
            if not items or tool_calls:
                break
            for item in items:
                if item["type"] in [
                    "reasoning",
                    "message",
                    "computer_call",
                    "function_call",
                    "function_call_output",
                ]:
                    agent_result.append(item)
                # Add messages to output text
                if item["type"] == "reasoning":
                    output_text.extend(
                        f"Reasoning: {summary['text']}" for summary in item["summary"]
                    )
                elif item["type"] == "message":
                    if isinstance(item["content"], list):
                        # NOTE: the inner `item` below is local to the genexp
                        # scope and does not clobber the outer loop variable.
                        output_text.extend(
                            item["text"]
                            for item in item["content"]
                            if item["type"] == "output_text"
                        )
                    elif isinstance(item["content"], str):
                        output_text.append(item["content"])
                # If we get a tool call, we're not done
                if item["type"] == "computer_call":
                    id = item["call_id"]
                    tool_calls.append(
                        MCPToolCall(
                            name="openai_computer",
                            arguments=item["action"],
                            id=id,
                        )
                    )
                    is_done = False
                    # Remember everything produced before this call so
                    # format_tool_results can replay it later, keyed by call id.
                    self.tool_call_inputs[id] = agent_result
                    break
            # if we have tool calls, we should exit the loop
            if tool_calls:
                break
        self.previous_output = agent_result
        return AgentResponse(
            content="\n".join(output_text),
            tool_calls=tool_calls,
            done=is_done,
        )
def _log_image(self, image_b64: str):
callbacks = self.computer_agent.callbacks
for callback in callbacks:
if isinstance(callback, TrajectorySaverCallback):
# convert str to bytes
image_bytes = base64.b64decode(image_b64)
callback._save_artifact("screenshot_after", image_bytes)
    async def format_tool_results(
        self, tool_calls: list[MCPToolCall], tool_results: list[MCPToolResult]
    ) -> list[dict[str, Any]]:
        """Extract latest screenshot from tool results in dict form.

        Expects results to already be in the message-format content dicts.
        Pairs each call with its result and rebuilds the conversation items
        to feed back to the model:

        - Calls recorded in ``self.tool_call_inputs`` are replayed from the
          saved assistant items, followed by a ``computer_call_output``
          screenshot, or a synthesized failed-tool-call record when the
          result is an error or contains no screenshot.
        - Unknown calls fall back to ``self.previous_output`` (with any
          trailing pending computer_calls stripped) plus the raw result
          content as a user message.

        Returns a list of input content dicts suitable for follow-up calls.
        """
        messages = []
        for call, result in zip(tool_calls, tool_results):
            if call.id not in self.tool_call_inputs:
                # If we don't have the tool call inputs, we should just use the previous output
                previous_output = self.previous_output.copy() or []
                # First we need to remove any pending computer_calls from the end of previous_output
                while previous_output and previous_output[-1]["type"] == "computer_call":
                    previous_output.pop()
                messages.extend(previous_output)
                # If the call is a 'response', don't add the result
                if call.name == "response":
                    continue
                # Otherwise, if we have a result, we should add it to the messages.
                # Text parts become input_text, images become input_image;
                # anything else degrades to an empty input_text.
                content = [
                    (
                        {"type": "input_text", "text": content.text}
                        if isinstance(content, types.TextContent)
                        else (
                            {
                                "type": "input_image",
                                "image_url": f"data:image/png;base64,{content.data}",
                            }
                            if isinstance(content, types.ImageContent)
                            else {"type": "input_text", "text": ""}
                        )
                    )
                    for content in result.content
                ]
                messages.append(
                    {
                        "role": "user",
                        "content": content,
                    }
                )
                continue
            # Add the assistant's computer call
            messages.extend(self.tool_call_inputs[call.id])
            if result.isError:
                # Concatenate every text part of the error result.
                error_text = "".join(
                    [
                        content.text
                        for content in result.content
                        if isinstance(content, types.TextContent)
                    ]
                )
                # Replace computer call with failed tool call
                messages.pop()
                messages.extend(
                    make_failed_tool_call_items(
                        tool_name=call.name,
                        tool_kwargs=call.arguments or {},
                        error_message=error_text,
                        call_id=call.id,
                    )
                )
            else:
                # Get the latest screenshot
                screenshots = [
                    content.data
                    for content in result.content
                    if isinstance(content, types.ImageContent)
                ]
                # Add the resulting screenshot
                if screenshots:
                    self._log_image(screenshots[0])
                    self.last_screenshot_b64 = screenshots[0]
                    messages.append(
                        {
                            "type": "computer_call_output",
                            "call_id": call.id,
                            "output": {
                                "type": "input_image",
                                "image_url": f"data:image/png;base64,{screenshots[0]}",
                            },
                        }
                    )
                else:
                    # Otherwise, replace computer call with failed tool call
                    messages.pop()
                    messages.extend(
                        make_failed_tool_call_items(
                            tool_name=call.name,
                            tool_kwargs=call.arguments or {},
                            error_message="No screenshots returned.",
                            call_id=call.id,
                        )
                    )
        return messages
# Public API of this module.
__all__ = [
    "MCPComputerAgent",
]
```
--------------------------------------------------------------------------------
/libs/lume/src/FileSystem/Settings.swift:
--------------------------------------------------------------------------------
```swift
import Foundation
/// Manages the application settings using a config file
/// Application-wide settings persisted via the Lume config file.
struct LumeSettings: Codable, Sendable {
    var vmLocations: [VMLocation]
    var defaultLocationName: String
    var cacheDirectory: String
    var cachingEnabled: Bool

    /// The location whose name matches `defaultLocationName`, if present.
    var defaultLocation: VMLocation? {
        vmLocations.first(where: { $0.name == defaultLocationName })
    }

    // For backward compatibility
    var homeDirectory: String {
        guard let location = defaultLocation else { return "~/.lume" }
        return location.path
    }

    /// Built-in defaults used when no config file exists yet.
    static let defaultSettings = LumeSettings(
        vmLocations: [
            VMLocation(name: "default", path: "~/.lume")
        ],
        defaultLocationName: "default",
        cacheDirectory: "~/.lume/cache",
        cachingEnabled: true
    )

    /// Gets all locations sorted by name
    var sortedLocations: [VMLocation] {
        vmLocations.sorted(by: { $0.name < $1.name })
    }
}
/// Loads and persists `LumeSettings` as a YAML config file, resolving the
/// config directory via the XDG base-directory convention.
final class SettingsManager: @unchecked Sendable {
    // MARK: - Constants
    private enum Constants {
        // Default path for config when XDG_CONFIG_HOME is not set
        static let fallbackConfigDir = "~/.config/lume"
        static let configFileName = "config.yaml"
    }

    // MARK: - Properties
    static let shared = SettingsManager()
    private let fileManager: FileManager

    // Get the config directory following XDG spec
    private var configDir: String {
        // Check XDG_CONFIG_HOME environment variable first
        if let xdgConfigHome = ProcessInfo.processInfo.environment["XDG_CONFIG_HOME"] {
            return "\(xdgConfigHome)/lume"
        }
        // Fall back to default
        return (Constants.fallbackConfigDir as NSString).expandingTildeInPath
    }

    // Path to config file
    private var configFilePath: String {
        return "\(configDir)/\(Constants.configFileName)"
    }

    // MARK: - Initialization
    init(fileManager: FileManager = .default) {
        self.fileManager = fileManager
        ensureConfigDirectoryExists()
    }

    // MARK: - Settings Access

    /// Returns the persisted settings, falling back to (and best-effort
    /// saving) the shared defaults when no config file can be read.
    func getSettings() -> LumeSettings {
        if let settings = readSettingsFromFile() {
            return settings
        }
        // No settings file found; reuse LumeSettings.defaultSettings rather
        // than re-declaring the same values here.
        let defaultSettings = LumeSettings.defaultSettings
        // Try to save default settings
        try? saveSettings(defaultSettings)
        return defaultSettings
    }

    /// Serializes `settings` to the YAML config file, creating the config
    /// directory first if needed.
    func saveSettings(_ settings: LumeSettings) throws {
        try fileManager.createDirectory(atPath: configDir, withIntermediateDirectories: true)
        // Create a human-readable YAML-like configuration file
        var yamlContent = "# Lume Configuration\n\n"
        // Default location
        yamlContent += "defaultLocationName: \"\(settings.defaultLocationName)\"\n"
        // Cache directory
        yamlContent += "cacheDirectory: \"\(settings.cacheDirectory)\"\n"
        // Caching enabled flag
        yamlContent += "cachingEnabled: \(settings.cachingEnabled)\n"
        // VM locations
        yamlContent += "\n# VM Locations\nvmLocations:\n"
        for location in settings.vmLocations {
            yamlContent += " - name: \"\(location.name)\"\n"
            yamlContent += " path: \"\(location.path)\"\n"
        }
        // Write YAML content to file
        try yamlContent.write(
            to: URL(fileURLWithPath: configFilePath), atomically: true, encoding: .utf8)
    }

    // MARK: - VM Location Management

    /// Adds a new VM location after validating its name and path.
    func addLocation(_ location: VMLocation) throws {
        var settings = getSettings()
        // Validate location name (alphanumeric, dash, underscore)
        let nameRegex = try NSRegularExpression(pattern: "^[a-zA-Z0-9_-]+$")
        let nameRange = NSRange(location.name.startIndex..., in: location.name)
        if nameRegex.firstMatch(in: location.name, range: nameRange) == nil {
            throw VMLocationError.invalidLocationName(name: location.name)
        }
        // Check for duplicate name
        if settings.vmLocations.contains(where: { $0.name == location.name }) {
            throw VMLocationError.duplicateLocationName(name: location.name)
        }
        // Validate location path
        try location.validate()
        // Add location
        settings.vmLocations.append(location)
        try saveSettings(settings)
    }

    /// Removes a VM location by name; the default location cannot be removed.
    func removeLocation(name: String) throws {
        var settings = getSettings()
        // Check location exists
        guard settings.vmLocations.contains(where: { $0.name == name }) else {
            throw VMLocationError.locationNotFound(name: name)
        }
        // Prevent removing default location
        if name == settings.defaultLocationName {
            throw VMLocationError.defaultLocationCannotBeRemoved(name: name)
        }
        // Remove location
        settings.vmLocations.removeAll(where: { $0.name == name })
        try saveSettings(settings)
    }

    /// Marks an existing location as the default.
    func setDefaultLocation(name: String) throws {
        var settings = getSettings()
        // Check location exists
        guard settings.vmLocations.contains(where: { $0.name == name }) else {
            throw VMLocationError.locationNotFound(name: name)
        }
        // Set default
        settings.defaultLocationName = name
        try saveSettings(settings)
    }

    /// Looks up a location by name, throwing when it does not exist.
    func getLocation(name: String) throws -> VMLocation {
        let settings = getSettings()
        if let location = settings.vmLocations.first(where: { $0.name == name }) {
            return location
        }
        throw VMLocationError.locationNotFound(name: name)
    }

    // MARK: - Legacy Home Directory Compatibility

    /// Points the "default" location at `path`, creating the entry if missing.
    func setHomeDirectory(path: String) throws {
        var settings = getSettings()
        let defaultLocation = VMLocation(name: "default", path: path)
        try defaultLocation.validate()
        // Replace default location
        if let index = settings.vmLocations.firstIndex(where: { $0.name == "default" }) {
            settings.vmLocations[index] = defaultLocation
        } else {
            settings.vmLocations.append(defaultLocation)
            settings.defaultLocationName = "default"
        }
        try saveSettings(settings)
    }

    // MARK: - Cache Directory Management

    /// Sets the cache directory, validating that the path is (or can become)
    /// a writable directory. The path is stored as given (un-expanded).
    func setCacheDirectory(path: String) throws {
        var settings = getSettings()
        // Validate path
        let expandedPath = (path as NSString).expandingTildeInPath
        var isDir: ObjCBool = false
        // If directory exists, check if it's writable
        if fileManager.fileExists(atPath: expandedPath, isDirectory: &isDir) {
            if !isDir.boolValue {
                throw SettingsError.notADirectory(path: expandedPath)
            }
            if !fileManager.isWritableFile(atPath: expandedPath) {
                throw SettingsError.directoryNotWritable(path: expandedPath)
            }
        } else {
            // Try to create the directory
            do {
                try fileManager.createDirectory(
                    atPath: expandedPath,
                    withIntermediateDirectories: true
                )
            } catch {
                throw SettingsError.directoryCreationFailed(path: expandedPath, error: error)
            }
        }
        // Update settings
        settings.cacheDirectory = path
        try saveSettings(settings)
    }

    func getCacheDirectory() -> String {
        return getSettings().cacheDirectory
    }

    func setCachingEnabled(_ enabled: Bool) throws {
        var settings = getSettings()
        settings.cachingEnabled = enabled
        try saveSettings(settings)
    }

    func isCachingEnabled() -> Bool {
        return getSettings().cachingEnabled
    }

    // MARK: - Private Helpers

    private func ensureConfigDirectoryExists() {
        try? fileManager.createDirectory(atPath: configDir, withIntermediateDirectories: true)
    }

    /// Reads and parses the YAML config file, returning nil when the file is
    /// missing or unreadable.
    private func readSettingsFromFile() -> LumeSettings? {
        // Read from YAML file
        if fileExists(at: configFilePath) {
            do {
                let yamlString = try String(
                    contentsOf: URL(fileURLWithPath: configFilePath), encoding: .utf8)
                return parseYamlSettings(yamlString)
            } catch {
                Logger.error(
                    "Failed to read settings from YAML file",
                    metadata: ["error": error.localizedDescription]
                )
            }
        }
        return nil
    }

    private func parseYamlSettings(_ yamlString: String) -> LumeSettings? {
        // This is a very basic YAML parser for our specific config format
        // A real implementation would use a proper YAML library
        var defaultLocationName = "default"
        var cacheDirectory = "~/.lume/cache"
        var cachingEnabled = true  // default to true for backward compatibility
        var vmLocations: [VMLocation] = []
        var inLocationsSection = false
        var currentLocation: (name: String?, path: String?) = (nil, nil)
        let lines = yamlString.split(separator: "\n")
        // Plain iteration: the index was never used.
        for line in lines {
            let trimmedLine = line.trimmingCharacters(in: .whitespaces)
            // Skip comments and empty lines
            if trimmedLine.hasPrefix("#") || trimmedLine.isEmpty {
                continue
            }
            // Check for section marker
            if trimmedLine == "vmLocations:" {
                inLocationsSection = true
                continue
            }
            // In the locations section, handle line indentation more carefully
            if inLocationsSection {
                if trimmedLine.hasPrefix("-") || trimmedLine.contains("- name:") {
                    // Process the previous location before starting a new one
                    if let name = currentLocation.name, let path = currentLocation.path {
                        vmLocations.append(VMLocation(name: name, path: path))
                    }
                    currentLocation = (nil, nil)
                }
                // Process the key-value pairs within a location
                if let colonIndex = trimmedLine.firstIndex(of: ":") {
                    let key = trimmedLine[..<colonIndex].trimmingCharacters(in: .whitespaces)
                    let rawValue = trimmedLine[trimmedLine.index(after: colonIndex)...]
                        .trimmingCharacters(in: .whitespaces)
                    let value = extractValueFromYaml(rawValue)
                    // hasSuffix tolerates the leading "- " on the first key
                    if key.hasSuffix("name") {
                        currentLocation.name = value
                    } else if key.hasSuffix("path") {
                        currentLocation.path = value
                    }
                }
            } else {
                // Process top-level keys outside the locations section
                if let colonIndex = trimmedLine.firstIndex(of: ":") {
                    let key = trimmedLine[..<colonIndex].trimmingCharacters(in: .whitespaces)
                    let rawValue = trimmedLine[trimmedLine.index(after: colonIndex)...]
                        .trimmingCharacters(in: .whitespaces)
                    let value = extractValueFromYaml(rawValue)
                    if key == "defaultLocationName" {
                        defaultLocationName = value
                    } else if key == "cacheDirectory" {
                        cacheDirectory = value
                    } else if key == "cachingEnabled" {
                        cachingEnabled = value.lowercased() == "true"
                    }
                }
            }
        }
        // Don't forget to add the last location
        if let name = currentLocation.name, let path = currentLocation.path {
            vmLocations.append(VMLocation(name: name, path: path))
        }
        // Ensure at least one location exists
        if vmLocations.isEmpty {
            vmLocations.append(VMLocation(name: "default", path: "~/.lume"))
        }
        return LumeSettings(
            vmLocations: vmLocations,
            defaultLocationName: defaultLocationName,
            cacheDirectory: cacheDirectory,
            cachingEnabled: cachingEnabled
        )
    }

    // Helper method to extract a value from YAML, handling quotes
    private func extractValueFromYaml(_ rawValue: String) -> String {
        if rawValue.hasPrefix("\"") && rawValue.hasSuffix("\"") && rawValue.count >= 2 {
            // Remove the surrounding quotes
            let startIndex = rawValue.index(after: rawValue.startIndex)
            let endIndex = rawValue.index(before: rawValue.endIndex)
            return String(rawValue[startIndex..<endIndex])
        }
        return rawValue
    }

    // Helper method to output debug information about the current settings
    func debugSettings() -> String {
        let settings = getSettings()
        var output = "Current Settings:\n"
        output += "- Default VM storage: \(settings.defaultLocationName)\n"
        output += "- Cache directory: \(settings.cacheDirectory)\n"
        output += "- VM Locations (\(settings.vmLocations.count)):\n"
        for (i, location) in settings.vmLocations.enumerated() {
            let isDefault = location.name == settings.defaultLocationName
            let defaultMark = isDefault ? " (default)" : ""
            output += " \(i+1). \(location.name): \(location.path)\(defaultMark)\n"
        }
        // Also add raw file content
        if fileExists(at: configFilePath) {
            // Specify the encoding explicitly; the encoding-less initializer
            // is deprecated.
            if let content = try? String(
                contentsOf: URL(fileURLWithPath: configFilePath), encoding: .utf8)
            {
                output += "\nRaw YAML file content:\n"
                output += content
            }
        }
        return output
    }

    private func fileExists(at path: String) -> Bool {
        fileManager.fileExists(atPath: path)
    }
}
// MARK: - Errors
/// Errors thrown by cache-directory configuration in `SettingsManager`.
enum SettingsError: Error, LocalizedError {
    case notADirectory(path: String)
    case directoryNotWritable(path: String)
    case directoryCreationFailed(path: String, error: Error)

    /// Human-readable description for each failure case.
    var errorDescription: String? {
        switch self {
        case let .notADirectory(path):
            return "Path is not a directory: \(path)"
        case let .directoryNotWritable(path):
            return "Directory is not writable: \(path)"
        case let .directoryCreationFailed(path, error):
            return "Failed to create directory at \(path): \(error.localizedDescription)"
        }
    }
}
```
--------------------------------------------------------------------------------
/docs/content/docs/example-usecases/form-filling.mdx:
--------------------------------------------------------------------------------
```markdown
---
title: PDF to Form Automation
description: Enhance and Automate Interactions Between Form Filling and Local File Systems
---
import { Step, Steps } from 'fumadocs-ui/components/steps';
import { Tab, Tabs } from 'fumadocs-ui/components/tabs';
## Overview
Cua can be used to automate interactions between form filling and local file systems on any operating system. Cua lets you interact with all the elements of a web page and with local file systems, integrating between the two.
This preset use case uses [Cua Computer](/computer-sdk/computers) to interact with a web page and local file systems, along with [Agent Loops](/agent-sdk/agent-loops) to run the agent in a loop with message history.
---
<Steps>
<Step>
### Set Up Your Environment
First, install the required dependencies:
Create a `requirements.txt` file:
```text
cua-agent
cua-computer
python-dotenv>=1.0.0
```
Install the dependencies:
```bash
pip install -r requirements.txt
```
Create a `.env` file with your API keys:
```text
ANTHROPIC_API_KEY=your-anthropic-api-key
CUA_API_KEY=sk_cua-api01...
```
</Step>
<Step>
### Create Your Form Filling Script
Create a Python file (e.g., `form_filling.py`) and select your environment:
<Tabs items={['Cloud Sandbox', 'Linux on Docker', 'macOS Sandbox', 'Windows Sandbox']}>
<Tab value="Cloud Sandbox">
```python
import asyncio
import logging
import os
import signal
import traceback
from agent import ComputerAgent
from computer import Computer, VMProviderType
from dotenv import load_dotenv
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def handle_sigint(sig, frame):
print("\n\nExecution interrupted by user. Exiting gracefully...")
exit(0)
async def fill_application():
try:
async with Computer(
os_type="linux",
provider_type=VMProviderType.CLOUD,
name="your-sandbox-name", # Replace with your sandbox name
api_key=os.environ["CUA_API_KEY"],
verbosity=logging.INFO,
) as computer:
agent = ComputerAgent(
model="cua/anthropic/claude-sonnet-4.5",
tools=[computer],
only_n_most_recent_images=3,
verbosity=logging.INFO,
trajectory_dir="trajectories",
use_prompt_caching=True,
max_trajectory_budget=5.0,
)
tasks = [
"Visit https://www.overleaf.com/latex/templates/jakes-resume/syzfjbzwjncs.pdf and download the pdf.",
"Visit https://form.jotform.com/252881246782264 and fill the form from the information in the pdf."
]
history = []
for i, task in enumerate(tasks, 1):
print(f"\n[Task {i}/{len(tasks)}] {task}")
# Add user message to history
history.append({"role": "user", "content": task})
# Run agent with conversation history
async for result in agent.run(history, stream=False):
history += result.get("output", [])
# Print output for debugging
for item in result.get("output", []):
if item.get("type") == "message":
content = item.get("content", [])
for content_part in content:
if content_part.get("text"):
logger.info(f"Agent: {content_part.get('text')}")
elif item.get("type") == "computer_call":
action = item.get("action", {})
action_type = action.get("type", "")
logger.debug(f"Computer Action: {action_type}")
print(f"✅ Task {i}/{len(tasks)} completed")
print("\n🎉 All tasks completed successfully!")
except Exception as e:
logger.error(f"Error in fill_application: {e}")
traceback.print_exc()
raise
def main():
try:
load_dotenv()
if "ANTHROPIC_API_KEY" not in os.environ:
raise RuntimeError(
"Please set the ANTHROPIC_API_KEY environment variable.\n"
"You can add it to a .env file in the project root."
)
if "CUA_API_KEY" not in os.environ:
raise RuntimeError(
"Please set the CUA_API_KEY environment variable.\n"
"You can add it to a .env file in the project root."
)
signal.signal(signal.SIGINT, handle_sigint)
asyncio.run(fill_application())
except Exception as e:
logger.error(f"Error running automation: {e}")
traceback.print_exc()
if __name__ == "__main__":
main()
```
</Tab>
<Tab value="Linux on Docker">
```python
import asyncio
import logging
import os
import signal
import traceback
from agent import ComputerAgent
from computer import Computer, VMProviderType
from dotenv import load_dotenv
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def handle_sigint(sig, frame):
print("\n\nExecution interrupted by user. Exiting gracefully...")
exit(0)
async def fill_application():
try:
async with Computer(
os_type="linux",
provider_type=VMProviderType.DOCKER,
image="trycua/cua-xfce:latest", # or "trycua/cua-ubuntu:latest"
verbosity=logging.INFO,
) as computer:
agent = ComputerAgent(
model="cua/anthropic/claude-sonnet-4.5",
tools=[computer],
only_n_most_recent_images=3,
verbosity=logging.INFO,
trajectory_dir="trajectories",
use_prompt_caching=True,
max_trajectory_budget=5.0,
)
tasks = [
"Visit https://www.overleaf.com/latex/templates/jakes-resume/syzfjbzwjncs.pdf and download the pdf.",
"Visit https://form.jotform.com/252881246782264 and fill the form from the information in the pdf."
]
history = []
for i, task in enumerate(tasks, 1):
print(f"\n[Task {i}/{len(tasks)}] {task}")
# Add user message to history
history.append({"role": "user", "content": task})
# Run agent with conversation history
async for result in agent.run(history, stream=False):
history += result.get("output", [])
# Print output for debugging
for item in result.get("output", []):
if item.get("type") == "message":
content = item.get("content", [])
for content_part in content:
if content_part.get("text"):
logger.info(f"Agent: {content_part.get('text')}")
elif item.get("type") == "computer_call":
action = item.get("action", {})
action_type = action.get("type", "")
logger.debug(f"Computer Action: {action_type}")
print(f"✅ Task {i}/{len(tasks)} completed")
print("\n🎉 All tasks completed successfully!")
except Exception as e:
logger.error(f"Error in fill_application: {e}")
traceback.print_exc()
raise
def main():
try:
load_dotenv()
if "ANTHROPIC_API_KEY" not in os.environ:
raise RuntimeError(
"Please set the ANTHROPIC_API_KEY environment variable.\n"
"You can add it to a .env file in the project root."
)
signal.signal(signal.SIGINT, handle_sigint)
asyncio.run(fill_application())
except Exception as e:
logger.error(f"Error running automation: {e}")
traceback.print_exc()
if __name__ == "__main__":
main()
```
</Tab>
<Tab value="macOS Sandbox">
```python
import asyncio
import logging
import os
import signal
import traceback
from agent import ComputerAgent
from computer import Computer, VMProviderType
from dotenv import load_dotenv
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def handle_sigint(sig, frame):
print("\n\nExecution interrupted by user. Exiting gracefully...")
exit(0)
async def fill_application():
try:
async with Computer(
os_type="macos",
provider_type=VMProviderType.LUME,
name="macos-sequoia-cua:latest",
verbosity=logging.INFO,
) as computer:
agent = ComputerAgent(
model="cua/anthropic/claude-sonnet-4.5",
tools=[computer],
only_n_most_recent_images=3,
verbosity=logging.INFO,
trajectory_dir="trajectories",
use_prompt_caching=True,
max_trajectory_budget=5.0,
)
tasks = [
"Visit https://www.overleaf.com/latex/templates/jakes-resume/syzfjbzwjncs.pdf and download the pdf.",
"Visit https://form.jotform.com/252881246782264 and fill the form from the information in the pdf."
]
history = []
for i, task in enumerate(tasks, 1):
print(f"\n[Task {i}/{len(tasks)}] {task}")
# Add user message to history
history.append({"role": "user", "content": task})
# Run agent with conversation history
async for result in agent.run(history, stream=False):
history += result.get("output", [])
# Print output for debugging
for item in result.get("output", []):
if item.get("type") == "message":
content = item.get("content", [])
for content_part in content:
if content_part.get("text"):
logger.info(f"Agent: {content_part.get('text')}")
elif item.get("type") == "computer_call":
action = item.get("action", {})
action_type = action.get("type", "")
logger.debug(f"Computer Action: {action_type}")
print(f"✅ Task {i}/{len(tasks)} completed")
print("\n🎉 All tasks completed successfully!")
except Exception as e:
logger.error(f"Error in fill_application: {e}")
traceback.print_exc()
raise
def main():
try:
load_dotenv()
if "ANTHROPIC_API_KEY" not in os.environ:
raise RuntimeError(
"Please set the ANTHROPIC_API_KEY environment variable.\n"
"You can add it to a .env file in the project root."
)
signal.signal(signal.SIGINT, handle_sigint)
asyncio.run(fill_application())
except Exception as e:
logger.error(f"Error running automation: {e}")
traceback.print_exc()
if __name__ == "__main__":
main()
```
</Tab>
<Tab value="Windows Sandbox">
```python
import asyncio
import logging
import os
import signal
import traceback
from agent import ComputerAgent
from computer import Computer, VMProviderType
from dotenv import load_dotenv
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def handle_sigint(sig, frame):
print("\n\nExecution interrupted by user. Exiting gracefully...")
exit(0)
async def fill_application():
try:
async with Computer(
os_type="windows",
provider_type=VMProviderType.WINDOWS_SANDBOX,
verbosity=logging.INFO,
) as computer:
agent = ComputerAgent(
model="cua/anthropic/claude-sonnet-4.5",
tools=[computer],
only_n_most_recent_images=3,
verbosity=logging.INFO,
trajectory_dir="trajectories",
use_prompt_caching=True,
max_trajectory_budget=5.0,
)
tasks = [
"Visit https://www.overleaf.com/latex/templates/jakes-resume/syzfjbzwjncs.pdf and download the pdf.",
"Visit https://form.jotform.com/252881246782264 and fill the form from the information in the pdf."
]
history = []
for i, task in enumerate(tasks, 1):
print(f"\n[Task {i}/{len(tasks)}] {task}")
# Add user message to history
history.append({"role": "user", "content": task})
# Run agent with conversation history
async for result in agent.run(history, stream=False):
history += result.get("output", [])
# Print output for debugging
for item in result.get("output", []):
if item.get("type") == "message":
content = item.get("content", [])
for content_part in content:
if content_part.get("text"):
logger.info(f"Agent: {content_part.get('text')}")
elif item.get("type") == "computer_call":
action = item.get("action", {})
action_type = action.get("type", "")
logger.debug(f"Computer Action: {action_type}")
print(f"✅ Task {i}/{len(tasks)} completed")
print("\n🎉 All tasks completed successfully!")
except Exception as e:
logger.error(f"Error in fill_application: {e}")
traceback.print_exc()
raise
def main():
try:
load_dotenv()
if "ANTHROPIC_API_KEY" not in os.environ:
raise RuntimeError(
"Please set the ANTHROPIC_API_KEY environment variable.\n"
"You can add it to a .env file in the project root."
)
signal.signal(signal.SIGINT, handle_sigint)
asyncio.run(fill_application())
except Exception as e:
logger.error(f"Error running automation: {e}")
traceback.print_exc()
if __name__ == "__main__":
main()
```
</Tab>
</Tabs>
</Step>
<Step>
### Run Your Script
Execute your form filling automation:
```bash
python form_filling.py
```
The agent will:
1. Download the PDF resume from Overleaf
2. Extract information from the PDF
3. Fill out the JotForm with the extracted information
Monitor the output to see the agent's progress through each task.
</Step>
</Steps>
---
## Next Steps
- Learn more about [Cua computers](/computer-sdk/computers) and [computer commands](/computer-sdk/commands)
- Read about [Agent loops](/agent-sdk/agent-loops), [tools](/agent-sdk/custom-tools), and [supported model providers](/agent-sdk/supported-model-providers/)
- Experiment with different [Models and Providers](/agent-sdk/supported-model-providers/)
- Join our [Discord community](https://discord.com/invite/mVnXXpdE85) for help
```
--------------------------------------------------------------------------------
/libs/lumier/src/lib/vm.sh:
--------------------------------------------------------------------------------
```bash
#!/usr/bin/env bash
# Initialize global flags
export PULL_IN_PROGRESS=0
# Start (or pull, configure, and boot) the VM, then wait until it is fully
# reachable: "running" status plus IP address and VNC URL from the lume API,
# followed by SSH availability. Exports VNC_PORT/VNC_PASSWORD for entry.sh
# and runs the optional on-logon.sh hook inside the VM.
#
# Reads environment: VM_NAME, VERSION, HOST_STORAGE_PATH, CPU_CORES,
# RAM_SIZE, DISPLAY, HOST_USER, HOST_PASSWORD, HOST_SHARED_PATH, LUMIER_DEBUG.
start_vm() {
    # Determine storage path for VM
    STORAGE_PATH="$HOST_STORAGE_PATH"
    if [ -z "$STORAGE_PATH" ]; then
        STORAGE_PATH="storage_${VM_NAME}"
    fi

    # Check if VM exists and its status using JSON format - quietly
    VM_INFO=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}")

    # Check if VM not found error
    if [[ $VM_INFO == *"Virtual machine not found"* ]]; then
        # VERSION is registry/organization/image; last component is the image
        IMAGE_NAME="${VERSION##*/}"
        # Parse registry and organization from VERSION
        REGISTRY=$(echo $VERSION | cut -d'/' -f1)
        ORGANIZATION=$(echo $VERSION | cut -d'/' -f2)
        echo "Pulling VM image $IMAGE_NAME..."
        lume_pull "$IMAGE_NAME" "$VM_NAME" "$STORAGE_PATH" "$REGISTRY" "$ORGANIZATION"
    else
        # Parse the JSON status - check if it contains "status" : "running".
        # Stop a running VM so settings can be applied before (re)start.
        if [[ $VM_INFO == *'"status" : "running"'* ]]; then
            lume_stop "$VM_NAME" "$STORAGE_PATH"
        fi
    fi

    # Format memory size for display purposes
    MEMORY_DISPLAY="$RAM_SIZE"
    if [[ ! "$RAM_SIZE" == *"GB"* && ! "$RAM_SIZE" == *"MB"* ]]; then
        MEMORY_DISPLAY="${RAM_SIZE}MB"
    fi

    # Set VM parameters using the wrapper function
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Updating VM settings: cpu=$CPU_CORES memory=$MEMORY_DISPLAY display=$DISPLAY"
    fi
    lume_set "$VM_NAME" "$STORAGE_PATH" "$CPU_CORES" "$RAM_SIZE" "$DISPLAY"

    # Fetch VM configuration - quietly (don't display to console)
    CONFIG_JSON=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}")

    # Setup shared directory args if necessary
    SHARED_DIR_ARGS=""
    if [ -d "/shared" ]; then
        if [ -n "$HOST_SHARED_PATH" ]; then
            SHARED_DIR_ARGS="--shared-dir=$HOST_SHARED_PATH"
        else
            echo "Warning: /shared volume exists but HOST_SHARED_PATH is not set. Cannot mount volume."
        fi
    fi

    # Run VM with VNC and shared directory using curl
    # (backgrounded; we poll the API below for readiness)
    lume_run $SHARED_DIR_ARGS --storage "$STORAGE_PATH" "$VM_NAME" &
    # lume run "$VM_NAME" --storage "$STORAGE_PATH" --no-display
    # sleep 10000000

    # Wait for VM to be running and VNC URL to be available
    # (up to max_attempts * 2s)
    vm_ip=""
    vnc_url=""
    max_attempts=30
    attempt=0
    while [ $attempt -lt $max_attempts ]; do
        # Get VM info as JSON using the API function - pass debug flag
        VM_INFO=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}")
        # Extract status, IP address, and VNC URL using the helper function
        vm_status=$(extract_json_field "status" "$VM_INFO")
        vm_ip=$(extract_json_field "ipAddress" "$VM_INFO")
        vnc_url=$(extract_json_field "vncUrl" "$VM_INFO")
        # Check if VM status is 'running' and we have IP and VNC URL
        if [ "$vm_status" = "running" ] && [ -n "$vm_ip" ] && [ -n "$vnc_url" ]; then
            break
        fi
        sleep 2
        attempt=$((attempt + 1))
    done

    if [ -z "$vm_ip" ] || [ -z "$vnc_url" ]; then
        echo "Timed out waiting for VM to start or VNC URL to become available."
        lume_stop "$VM_NAME" "$STORAGE_PATH" > /dev/null 2>&1
        # lume stop "$VM_NAME" --storage "$STORAGE_PATH" > /dev/null 2>&1
        exit 1
    fi

    # Parse VNC URL to extract password and port
    # (URL shape assumed: vnc://:password@host:port)
    VNC_PASSWORD=$(echo "$vnc_url" | sed -n 's/.*:\(.*\)@.*/\1/p')
    VNC_PORT=$(echo "$vnc_url" | sed -n 's/.*:\([0-9]\+\)$/\1/p')

    # Wait for SSH to become available (5s interval, 20 retries)
    wait_for_ssh "$vm_ip" "$HOST_USER" "$HOST_PASSWORD" 5 20

    # Export VNC variables for entry.sh to use
    export VNC_PORT
    export VNC_PASSWORD

    # Execute on-logon.sh if present
    on_logon_script="/run/lifecycle/on-logon.sh"

    # Only show detailed logs in debug mode
    if [ "${LUMIER_DEBUG:-0}" == "1" ]; then
        echo "Running on-logon.sh hook script on VM..."
    fi

    # Check if script exists
    if [ ! -f "$on_logon_script" ]; then
        echo "Warning: on-logon.sh hook script not found at $on_logon_script"
    else
        # Execute the remote script
        execute_remote_script "$vm_ip" "$HOST_USER" "$HOST_PASSWORD" "$on_logon_script" "$VNC_PASSWORD" "$HOST_SHARED_PATH"
    fi
}
# Get VM information using curl
lume_get() {
    # Fetch a VM's info as JSON via GET /lume/vms/<name>?storage=<encoded-path>.
    # IMPORTANT: callers capture stdout with $(lume_get ...) and parse it as JSON,
    # so every diagnostic line MUST go to stderr — only the raw response may hit stdout.
    local vm_name="$1"
    local storage="$2"
    local format="${3:-json}"   # reserved; the API currently always returns JSON
    local debug="${4:-false}"
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    # URL encode the storage path for the query parameter
    local encoded_storage
    encoded_storage=$(echo "$storage" | sed 's/\//%2F/g; s/ /%20/g; s/:/%3A/g')
    # Construct API URL with encoded storage parameter
    local api_url="http://${api_host}:${api_port}/lume/vms/${vm_name}?storage=${encoded_storage}"
    # Accept both "true" and "1" for the debug flag (callers pass "${LUMIER_DEBUG:-0}")
    local debug_on=0
    if [[ "$debug" == "true" || "$debug" == "1" || "${LUMIER_DEBUG:-0}" == "1" ]]; then
        debug_on=1
    fi
    if [[ "$debug_on" == "1" ]]; then
        echo "[$(date -u '+%Y-%m-%dT%H:%M:%SZ')] DEBUG: Executing curl request: $api_url" >&2
    fi
    # Make the API call
    local response
    response=$(curl --connect-timeout 6000 \
        --max-time 5000 \
        -s \
        "$api_url")
    # Debug dump of the response goes to stderr so it never pollutes the captured JSON
    if [[ "$debug_on" == "1" ]]; then
        {
            echo "[DEBUG] API Response:"
            echo "$response" | jq '.' 2>/dev/null || echo "$response"
        } >&2
    fi
    # stdout carries only the raw response for command substitution by callers
    echo "$response"
}
# Set VM properties using curl
lume_set() {
    # Update a VM's cpu / memory / display settings via PATCH /lume/vms/<name>.
    local vm_name="$1"
    local storage="$2"
    local cpu="${3:-4}"
    local memory="${4:-8192}"
    local display="${5:-1024x768}"
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    # Normalize memory: values already containing "GB" pass through untouched;
    # bare integers are treated as MB and converted to a one-decimal GB string.
    if [[ "$memory" != *"GB"* && "$memory" =~ ^[0-9]+$ ]]; then
        memory="$(awk "BEGIN { printf \"%.1f\", $memory/1024 }")GB"
    fi
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "[DEBUG] Formatted memory value: $memory"
    fi
    # Issue the PATCH and keep the response for optional debug display
    local response
    response=$(curl --connect-timeout 6000 \
        --max-time 5000 \
        -s \
        -X PATCH \
        -H "Content-Type: application/json" \
        -d "{
        \"cpu\": $cpu,
        \"memory\": \"$memory\",
        \"display\": \"$display\",
        \"storage\": \"$storage\"
    }" \
        "http://${api_host}:${api_port}/lume/vms/${vm_name}")
    # Only surface the API response when debugging
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        echo "$response"
    fi
}
stop_vm() {
    # Stop the current VM. Pass "true" as $1 when invoked from the cleanup trap,
    # which makes an unknown status trigger a best-effort stop anyway.
    local in_cleanup=${1:-false}
    echo "Stopping VM '$VM_NAME'..."
    STORAGE_PATH="$HOST_STORAGE_PATH"
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "STORAGE_PATH: $STORAGE_PATH"
    fi
    VM_INFO=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}")
    vm_status=$(extract_json_field "status" "$VM_INFO")
    case "$vm_status" in
        running)
            lume_stop "$VM_NAME" "$STORAGE_PATH"
            ;;
        stopped)
            echo "VM '$VM_NAME' is already stopped."
            ;;
        *)
            if [ "$in_cleanup" = true ]; then
                # During cleanup, status may be unreadable; stop defensively.
                echo "VM status is unknown ('$vm_status') or VM not found during cleanup. Attempting stop anyway."
                lume_stop "$VM_NAME" "$STORAGE_PATH"
                sleep 5
                echo "VM '$VM_NAME' stop command issued as a precaution."
            else
                echo "VM status is unknown ('$vm_status') or VM not found. Not attempting stop."
            fi
            ;;
    esac
}
is_vm_running() {
    # Exit status reports whether $VM_NAME is currently running (0 = running).
    local vm_info
    vm_info=$(lume_get "$VM_NAME" "$HOST_STORAGE_PATH")
    # The API pretty-prints JSON with spaces around the colon, hence this pattern.
    case "$vm_info" in
        *'"status" : "running"'*) return 0 ;;  # Running
        *) return 1 ;;                         # Not running or doesn't exist
    esac
    # lume ls | grep -q "$VM_NAME" # Old CLI check
}
# Stop VM with storage location specified using curl
lume_stop() {
    # Stop a VM via POST /lume/vms/<name>/stop; the storage path travels in the JSON body.
    local vm_name="$1"
    local storage="$2"
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    # Only log in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Stopping VM $vm_name..."
    fi
    # Single request path (previously duplicated for debug/non-debug modes).
    # "$storage" is double-quoted inside the JSON so paths containing spaces
    # are not word-split by the shell.
    local response
    response=$(curl --connect-timeout 6000 \
        --max-time 5000 \
        -s \
        -X POST \
        -H "Content-Type: application/json" \
        -d '{"storage":"'"$storage"'"}' \
        "http://${api_host}:${api_port}/lume/vms/${vm_name}/stop")
    # Surface the response only when debugging
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        echo "$response"
    fi
}
# Pull a VM image using curl
lume_pull() {
    # Pull a VM image via POST /lume/pull. No curl timeouts here: image pulls
    # can legitimately take a long time.
    local image="$1"                  # Image name with tag
    local vm_name="$2"                # Name for the new VM
    local storage="$3"                # Storage location
    local registry="${4:-ghcr.io}"    # Registry, default is ghcr.io
    local organization="${5:-trycua}" # Organization, default is trycua
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    # Mark that pull is in progress for interrupt handling
    export PULL_IN_PROGRESS=1
    # Only log full details in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Pulling image $image from $registry/$organization..."
    else
        echo "Pulling image $image..."
    fi
    # Inform users how to check pull progress
    echo "You can check the pull progress using: lume logs -f"
    # Single request path (the debug and non-debug branches previously
    # duplicated the entire request body).
    local response
    response=$(curl \
        -s \
        -X POST \
        -H "Content-Type: application/json" \
        -d "{
            \"image\": \"$image\",
            \"name\": \"$vm_name\",
            \"registry\": \"$registry\",
            \"organization\": \"$organization\",
            \"storage\": \"$storage\"
        }" \
        "http://${api_host}:${api_port}/lume/pull")
    # Surface the response only when debugging
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        echo "$response"
    fi
    # Unset pull in progress flag
    export PULL_IN_PROGRESS=0
}
# Run VM with VNC client started and shared directory using curl
lume_run() {
    # Start a VM via POST /lume/vms/<name>/run. The VM always runs headless
    # (noDisplay: true); VNC access is provided by the API side.
    local shared_dir=""
    local storage=""
    local vm_name="lume_vm"
    while [[ $# -gt 0 ]]; do
        case $1 in
            --shared-dir=*)
                shared_dir="${1#*=}"
                shift
                ;;
            --storage)
                storage="$2"
                shift 2
                ;;
            --no-display)
                # Accepted for CLI compatibility; noDisplay is always sent as true.
                shift
                ;;
            *)
                # Any bare argument is taken as the VM name
                vm_name="$1"
                shift
                ;;
        esac
    done
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    # Only log in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Running VM $vm_name..."
    fi
    # Build the JSON body dynamically based on what's provided
    local json_body="{\"noDisplay\": true"
    if [[ -n "$shared_dir" ]]; then
        json_body+=", \"sharedDirectories\": [{\"hostPath\": \"$shared_dir\", \"readOnly\": false}]"
    fi
    if [[ -n "$storage" ]]; then
        json_body+=", \"storage\": \"$storage\""
    fi
    # Add recovery mode (always false)
    json_body+=", \"recoveryMode\": false}"
    # Single request path (previously duplicated for debug/non-debug modes);
    # the URL is quoted so unusual VM names can't be word-split.
    local response
    response=$(curl --connect-timeout 6000 \
        --max-time 5000 \
        -s \
        -X POST \
        -H 'Content-Type: application/json' \
        -d "$json_body" \
        "http://${api_host}:${api_port}/lume/vms/${vm_name}/run")
    # Surface the response only when debugging
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        echo "$response"
    fi
}
# Delete a VM using curl
lume_delete() {
    # Delete a VM via DELETE /lume/vms/<name>?storage=<encoded-path>.
    local vm_name="$1"
    local storage="$2"
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    # URL encode the storage path for the query parameter
    local encoded_storage
    encoded_storage=$(echo "$storage" | sed 's/\//%2F/g; s/ /%20/g; s/:/%3A/g')
    # Construct API URL with encoded storage parameter
    local api_url="http://${api_host}:${api_port}/lume/vms/${vm_name}?storage=${encoded_storage}"
    # Only log in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Deleting VM $vm_name from storage $storage..."
    fi
    # Single request path (previously duplicated for debug/non-debug modes)
    local response
    response=$(curl --connect-timeout 6000 \
        --max-time 5000 \
        -s \
        -X DELETE \
        "$api_url")
    # Surface the response only when debugging
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        echo "$response"
    fi
}
```
--------------------------------------------------------------------------------
/libs/python/agent/benchmarks/utils.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Shared utilities for ScreenSpot-Pro benchmarking and interactive testing.
"""
import dotenv
dotenv.load_dotenv()
import asyncio
import base64
import gc
import os
import statistics
import subprocess as sp
import sys
from datetime import datetime
from io import BytesIO
from typing import List, Optional, Tuple, Union
import torch
from PIL import Image, ImageDraw
from tqdm import tqdm
# Add parent directory to path for imports
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from agent.agent import ComputerAgent
from models.base import ModelProtocol
def get_gpu_memory() -> List[int]:
    """
    Get GPU free-memory values using nvidia-smi, falling back to torch.

    Returns:
        List of free memory values in MB for each GPU ([0] when no GPU is available)
    """
    try:
        command = "nvidia-smi --query-gpu=memory.free --format=csv"
        # Drop the CSV header row ([1:]) and the trailing empty line ([:-1]).
        memory_free_info = sp.check_output(command.split()).decode("ascii").split("\n")[:-1][1:]
        # Each remaining line looks like "12345 MiB"; the first token is the number.
        return [int(line.split()[0]) for line in memory_free_info]
    except (sp.CalledProcessError, FileNotFoundError, IndexError, ValueError):
        # ValueError covers malformed nvidia-smi output (non-numeric first token).
        # Fallback to torch if nvidia-smi is not available
        if torch.cuda.is_available():
            device = torch.cuda.current_device()
            total = torch.cuda.get_device_properties(device).total_memory / 1024 / 1024
            reserved = torch.cuda.memory_reserved(device) / 1024 / 1024
            return [int(total - reserved)]
        return [0]
def get_vram_usage() -> dict:
    """
    Get current VRAM usage statistics.

    Returns:
        Dictionary with allocated/reserved/total/free VRAM in MB
        (all zeros when CUDA is unavailable)
    """
    if not torch.cuda.is_available():
        return {"allocated_mb": 0.0, "reserved_mb": 0.0, "total_mb": 0.0, "free_mb": 0.0}
    device = torch.cuda.current_device()
    # Convert byte counts to MB
    allocated = torch.cuda.memory_allocated(device) / 1024 / 1024
    reserved = torch.cuda.memory_reserved(device) / 1024 / 1024
    total = torch.cuda.get_device_properties(device).total_memory / 1024 / 1024
    return {
        "allocated_mb": allocated,
        "reserved_mb": reserved,
        "total_mb": total,
        "free_mb": total - reserved,
    }
def get_available_models() -> List[Union[str, ModelProtocol]]:
    """
    Get list of available models for testing.

    Returns:
        List of model strings and model classes
    """
    # Prefix for locally hosted checkpoints (options: huggingface-local/ or mlx/)
    local_provider = "huggingface-local/"
    # from models.gta1 import GTA1Model
    return [
        # === ComputerAgent model strings ===
        "openai/computer-use-preview",
        "anthropic/claude-opus-4-20250514",
        # f"{local_provider}HelloKKMe/GTA1-7B",
        # f"{local_provider}HelloKKMe/GTA1-32B",
        "openai/computer-use-preview+openai/gpt-4o-mini",
        "anthropic/claude-opus-4-20250514+openai/gpt-4o-mini",
        # === Reference model classes ===
        # GTA1Model("HelloKKMe/GTA1-7B"),
        # GTA1Model("HelloKKMe/GTA1-32B"),
    ]
def is_click_in_bbox(click_coords: Optional[Tuple[int, int]], bbox: List[int]) -> bool:
    """
    Check whether click coordinates fall inside a bounding box (inclusive edges).

    Args:
        click_coords: (x, y) coordinates or None
        bbox: [x1, y1, x2, y2] bounding box

    Returns:
        True if click is within bbox, False otherwise
    """
    if click_coords is None:
        return False
    cx, cy = click_coords
    x1, y1, x2, y2 = bbox
    return (x1 <= cx <= x2) and (y1 <= cy <= y2)
def image_to_base64(image: Image.Image) -> str:
    """
    Encode a PIL Image as a base64 PNG string.

    Args:
        image: PIL Image

    Returns:
        Base64 encoded image string
    """
    buffer = BytesIO()
    image.save(buffer, format="PNG")
    encoded = base64.b64encode(buffer.getvalue())
    return encoded.decode()
class ModelWrapper:
    """
    Wrapper to provide unified interface for both ComputerAgent and custom models.

    A plain string model identifier is routed through ComputerAgent; any other
    object is assumed to implement ModelProtocol (load_model / unload_model /
    predict_click). Allocated VRAM (MB) is sampled around load, unload, and
    every prediction, and accumulated in ``vram_usage_history``.
    """

    def __init__(self, model: Union[str, ModelProtocol]):
        """
        Args:
            model: Either a ComputerAgent model string (e.g. "openai/...") or
                an object implementing ModelProtocol.
        """
        self.model = model
        # String identifiers go through ComputerAgent; objects are used directly.
        self.is_computer_agent = isinstance(model, str)
        self.agent: Optional[ComputerAgent] = None
        self.vram_usage_history: List[float] = []  # Track VRAM usage over time
        if self.is_computer_agent:
            self.model_name = str(model)
        else:
            self.model_name = (
                f"{model.__class__.__name__}('{getattr(model, 'model_name', 'unknown')}')"
            )

    async def load_model(self) -> None:
        """Load the model (creates a ComputerAgent for string models)."""
        if self.is_computer_agent:
            self.agent = ComputerAgent(model=str(self.model))
        else:
            await self.model.load_model()  # type: ignore
        # Record initial VRAM usage after loading
        vram_info = get_vram_usage()
        self.vram_usage_history.append(vram_info["allocated_mb"])

    async def unload_model(self) -> None:
        """Unload the model and release GPU memory where possible."""
        if not self.is_computer_agent:
            await self.model.unload_model()  # type: ignore
        else:
            del self.agent
            self.agent = None
        # Free Python-side references, then ask CUDA to release cached blocks.
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        # Record VRAM usage after unloading
        vram_info = get_vram_usage()
        self.vram_usage_history.append(vram_info["allocated_mb"])

    def get_vram_stats(self) -> dict:
        """Return max/avg allocated VRAM (MB) observed so far for this model."""
        if not self.vram_usage_history:
            return {"max_mb": 0.0, "avg_mb": 0.0}
        return {
            "max_mb": max(self.vram_usage_history),
            "avg_mb": sum(self.vram_usage_history) / len(self.vram_usage_history),
        }

    async def predict_click(
        self, image: Image.Image, instruction: str
    ) -> Optional[Tuple[int, int]]:
        """Predict click coordinates for *instruction* on *image*.

        Returns (x, y) coordinates, or None when prediction is not possible.
        Lazily loads a ComputerAgent for string models on first use.
        """
        # Record VRAM usage before prediction
        vram_info = get_vram_usage()
        self.vram_usage_history.append(vram_info["allocated_mb"])
        if self.is_computer_agent:
            if self.agent is None:
                await self.load_model()
            if self.agent is not None:
                image_b64 = image_to_base64(image)
                result = await self.agent.predict_click(
                    instruction=instruction, image_b64=image_b64
                )
                # Record VRAM usage after prediction
                vram_info = get_vram_usage()
                self.vram_usage_history.append(vram_info["allocated_mb"])
                return result
            return None
        else:
            result = await self.model.predict_click(image, instruction)  # type: ignore
            # Record VRAM usage after prediction
            vram_info = get_vram_usage()
            self.vram_usage_history.append(vram_info["allocated_mb"])
            return result
def save_results_to_markdown(
    all_results: List[dict],
    output_file: str = "screenspot_pro_results.md",
    title: str = "ScreenSpot-Pro Benchmark Results",
) -> None:
    """
    Write evaluation results as a markdown report.

    The report has one summary table across all models, followed by a
    per-model table covering the first 10 sample results.

    Args:
        all_results: List of evaluation results for each model
        output_file: Output markdown file path
        title: Heading placed at the top of the report
    """
    with open(output_file, "w", encoding="utf-8") as out:
        out.write(f"# {title}\n\n")
        out.write(f"**Evaluation Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        # Summary table
        out.write("## Summary\n\n")
        out.write(
            "| Model | Total Samples | Correct | Errors | Accuracy | Error Rate | Avg Time (s) | Median Time (s) | Time Range (s) | VRAM Max (GB) | VRAM Avg (GB) |\n"
        )
        out.write(
            "|-------|---------------|---------|--------|----------|------------|--------------|-----------------|----------------|---------------|---------------|\n"
        )
        for model_result in all_results:
            lo = model_result.get("min_prediction_time", 0.0)
            hi = model_result.get("max_prediction_time", 0.0)
            out.write(
                "| {} | {} | {} | {} | {:.2f}% | {:.2f}% | {:.2f} | {:.2f} | {:.2f} - {:.2f} | {:.1f} | {:.1f} |\n".format(
                    model_result["model_name"],
                    model_result["total_samples"],
                    model_result["correct_predictions"],
                    model_result["failed_predictions"],
                    model_result["accuracy"] * 100,
                    model_result["failure_rate"] * 100,
                    model_result.get("avg_prediction_time", 0.0),
                    model_result.get("median_prediction_time", 0.0),
                    lo,
                    hi,
                    model_result.get("vram_max_mb", 0.0) / 1024,
                    model_result.get("vram_avg_mb", 0.0) / 1024,
                )
            )
        # Detailed per-model sections
        for model_result in all_results:
            out.write(f"\n## {model_result['model_name']} - Detailed Results\n\n")
            out.write(
                "| Sample Index | Instruction | BBox | Predicted | Correct | Error | Time (s) |\n"
            )
            out.write("|-----------|-------------|------|-----------|---------|-------|----------|\n")
            for sample in model_result["results"][:10]:  # first 10 samples only
                text = sample["instruction"]
                if len(text) > 50:
                    text = text[:50] + "..."
                coords = str(sample["predicted_coords"]) if sample["predicted_coords"] else "None"
                out.write(
                    "| {} | {} | {} | {} | {} | {} | {:.2f} |\n".format(
                        sample["sample_idx"],
                        text,
                        str(sample["bbox"]),
                        coords,
                        "PASS" if sample["is_correct"] else "FAIL",
                        "YES" if sample["failed"] else "NO",
                        sample.get("prediction_time", 0.0),
                    )
                )
            if len(model_result["results"]) > 10:
                out.write(f"\n*Showing first 10 of {len(model_result['results'])} samples*\n")
    print(f"\nResults saved to: {output_file}")
def save_visualizations(all_results: List[dict], samples, output_dir: str = "output") -> None:
    """
    Render predicted click locations against ground-truth bboxes as PNG images.

    One sub-directory per model is created under *output_dir*; the first 10
    sample results of each model are drawn and saved.

    Args:
        all_results: List of evaluation results for each model
        samples: List of sample dicts with image, bbox, instruction keys
        output_dir: Output directory path
    """
    os.makedirs(output_dir, exist_ok=True)
    for result in all_results:
        safe_name = result["model_name"].replace("/", "_").replace("\\", "_")
        model_dir = os.path.join(output_dir, safe_name)
        os.makedirs(model_dir, exist_ok=True)
        print(f"Saving visualizations for {result['model_name']}...")
        # Only the first 10 sample results are visualized
        for i, sample_result in enumerate(
            tqdm(result["results"][:10], desc=f"Saving {safe_name} visualizations")
        ):
            sample_idx = sample_result["sample_idx"]
            if sample_idx >= len(samples):
                print(f"Warning: Could not find sample at index {sample_idx}")
                continue
            # Copy so drawing never mutates the caller's image
            image = samples[sample_idx]["image"].copy()
            bbox = sample_result["bbox"]
            predicted_coords = sample_result["predicted_coords"]
            is_correct = sample_result["is_correct"]
            draw = ImageDraw.Draw(image)
            # Ground-truth box in green
            x1, y1, x2, y2 = bbox
            draw.rectangle([x1, y1, x2, y2], outline="green", width=3)
            draw.text((x1, y1 - 20), "Ground Truth", fill="green")
            # Predicted click: blue crosshair when correct, red otherwise
            if predicted_coords is not None:
                px, py = predicted_coords
                color = "blue" if is_correct else "red"
                crosshair_size = 15
                draw.line(
                    [(px - crosshair_size, py), (px + crosshair_size, py)], fill=color, width=3
                )
                draw.line(
                    [(px, py - crosshair_size), (px, py + crosshair_size)], fill=color, width=3
                )
                draw.text((px + 10, py - 20), f"Predicted ({px},{py})", fill=color)
            # Status banner and (truncated) instruction text
            status = "CORRECT" if is_correct else "INCORRECT"
            draw.text((10, 10), f"Status: {status}", fill="blue" if is_correct else "red")
            draw.text(
                (10, 30), f"Instruction: {sample_result['instruction'][:50]}...", fill="black"
            )
            filename = f"sample_{i+1:02d}_idx{sample_idx}_{status.lower()}.png"
            image.save(os.path.join(model_dir, filename))
        print(f"Visualizations saved to: {model_dir}")
def save_prediction_visualization(
    image: Image.Image,
    instruction: str,
    predictions: List[dict],
    output_file: str = "interactive_prediction.png",
) -> None:
    """
    Draw several models' predicted click points onto one copy of *image*.

    Args:
        image: PIL Image to visualize
        instruction: Instruction text
        predictions: List of prediction dicts with keys: model_name, coords, error
        output_file: Output file path
    """
    annotated = image.copy()
    draw = ImageDraw.Draw(annotated)
    # One distinct color per model, cycling when there are more models than colors
    palette = ["red", "blue", "orange", "purple", "brown", "pink", "gray", "olive"]
    for idx, pred in enumerate(predictions):
        color = palette[idx % len(palette)]
        model_name = pred["model_name"]
        coords = pred.get("coords")
        error = pred.get("error")
        if coords is None:
            # No prediction: stack error lines down the left edge
            draw.text((10, 50 + idx * 20), f"{model_name}: ERROR - {error}", fill=color)
            continue
        px, py = coords
        size = 20
        # Crosshair plus a label with the model name and coordinates
        draw.line([(px - size, py), (px + size, py)], fill=color, width=4)
        draw.line([(px, py - size), (px, py + size)], fill=color, width=4)
        draw.text((px + 15, py + 15), f"{model_name}: ({px},{py})", fill=color)
    # Instruction banner at the top
    draw.text((10, 10), f"Instruction: {instruction}", fill="black")
    annotated.save(output_file)
    print(f"Prediction visualization saved to: {output_file}")
def take_screenshot() -> Image.Image:
    """
    Capture the current screen via pyautogui.

    Returns:
        PIL Image of the screenshot

    Raises:
        ImportError: if pyautogui is not installed
        Exception: re-raised from pyautogui if the capture fails
    """
    try:
        import pyautogui

        return pyautogui.screenshot()
    except ImportError:
        print("pyautogui not installed. Please install it with: pip install pyautogui")
        raise
    except Exception as e:
        print(f"Error taking screenshot: {e}")
        raise
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/callbacks/trajectory_saver.py:
--------------------------------------------------------------------------------
```python
"""
Trajectory saving callback handler for ComputerAgent.
"""
import base64
import io
import json
import os
import uuid
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, override
from PIL import Image, ImageDraw
from .base import AsyncCallbackHandler
def sanitize_image_urls(data: Any) -> Any:
    """
    Recursively replace every 'image_url' value with '[omitted]'.

    Args:
        data: Any data structure (dict, list, or primitive type)

    Returns:
        A deep copy of the data with all 'image_url' values replaced with '[omitted]'
    """
    if isinstance(data, dict):
        # Rebuild the dict, redacting image_url values and recursing elsewhere
        return {
            key: "[omitted]" if key == "image_url" else sanitize_image_urls(value)
            for key, value in data.items()
        }
    if isinstance(data, list):
        return [sanitize_image_urls(item) for item in data]
    # Primitive types (str, int, bool, None, ...) pass through unchanged
    return data
def extract_computer_call_outputs(
    items: List[Dict[str, Any]], screenshot_dir: Optional[Path]
) -> List[Dict[str, Any]]:
    """
    Save any base64-encoded screenshots from computer_call_output entries to files and
    replace their image_url with the saved file path when a call_id is present.

    Only operates if screenshot_dir is provided and exists; otherwise returns items unchanged.

    Args:
        items: List of message/result dicts potentially containing computer_call_output entries
        screenshot_dir: Directory to write screenshots into

    Returns:
        A new list with updated image_url fields when applicable.
    """
    if not items:
        return items
    if not screenshot_dir or not screenshot_dir.exists():
        return items
    updated: List[Dict[str, Any]] = []
    for item in items:
        # work on a shallow copy; nested 'output' is copied only if we modify it
        msg = dict(item)
        try:
            if msg.get("type") == "computer_call_output":
                call_id = msg.get("call_id")
                output = msg.get("output", {})
                image_url = output.get("image_url")
                if call_id and isinstance(image_url, str) and image_url.startswith("data:"):
                    # derive extension from MIME type e.g. data:image/png;base64,
                    ext = image_url.split(";", 1)[0].split("/")[-1] or "png"
                    out_path = screenshot_dir / f"{call_id}.{ext}"
                    # write file if it doesn't exist
                    if not out_path.exists():
                        try:
                            b64_payload = image_url.split(",", 1)[1]
                            img_bytes = base64.b64decode(b64_payload)
                            out_path.parent.mkdir(parents=True, exist_ok=True)
                            with open(out_path, "wb") as f:
                                f.write(img_bytes)
                        except Exception:
                            # decode/write failed; leave this message unmodified
                            pass
                    # Only rewrite image_url when the file actually exists on disk.
                    # (Previously the URL was rewritten even when the write above
                    # failed, leaving it pointing at a nonexistent file.)
                    if out_path.exists():
                        new_output = dict(output)
                        new_output["image_url"] = str(out_path)
                        msg["output"] = new_output
        except Exception:
            # do not block on malformed entries; keep original
            pass
        updated.append(msg)
    return updated
class TrajectorySaverCallback(AsyncCallbackHandler):
"""
Callback handler that saves agent trajectories to disk.
Saves each run as a separate trajectory with unique ID, and each turn
within the trajectory gets its own folder with screenshots and responses.
"""
    def __init__(
        self, trajectory_dir: str, reset_on_run: bool = True, screenshot_dir: Optional[str] = None
    ):
        """
        Initialize trajectory saver.

        Args:
            trajectory_dir: Base directory to save trajectories
            reset_on_run: If True, reset trajectory_id/turn/artifact on each run.
                          If False, continue using existing trajectory_id if set.
            screenshot_dir: Optional directory into which base64 screenshots embedded
                in messages/new_items are extracted as files.
        """
        self.trajectory_dir = Path(trajectory_dir)
        # Set by on_run_start; None means "no active trajectory".
        self.trajectory_id: Optional[str] = None
        self.current_turn: int = 0      # index of the current turn folder
        self.current_artifact: int = 0  # monotonically increasing artifact counter
        self.model: Optional[str] = None
        self.total_usage: Dict[str, Any] = {}  # usage accumulated across API calls
        self.reset_on_run = reset_on_run
        # Optional directory to store extracted screenshots from metadata/new_items
        self.screenshot_dir: Optional[Path] = Path(screenshot_dir) if screenshot_dir else None
        # Ensure trajectory directory exists
        self.trajectory_dir.mkdir(parents=True, exist_ok=True)
def _get_turn_dir(self) -> Path:
"""Get the directory for the current turn."""
if not self.trajectory_id:
raise ValueError("Trajectory not initialized - call _on_run_start first")
# format: trajectory_id/turn_000
turn_dir = self.trajectory_dir / self.trajectory_id / f"turn_{self.current_turn:03d}"
turn_dir.mkdir(parents=True, exist_ok=True)
return turn_dir
def _save_artifact(self, name: str, artifact: Union[str, bytes, Dict[str, Any]]) -> None:
"""Save an artifact to the current turn directory."""
turn_dir = self._get_turn_dir()
if isinstance(artifact, bytes):
# format: turn_000/0000_name.png
artifact_filename = f"{self.current_artifact:04d}_{name}"
artifact_path = turn_dir / f"{artifact_filename}.png"
with open(artifact_path, "wb") as f:
f.write(artifact)
else:
# format: turn_000/0000_name.json
artifact_filename = f"{self.current_artifact:04d}_{name}"
artifact_path = turn_dir / f"{artifact_filename}.json"
# add created_at
if isinstance(artifact, dict):
artifact = artifact.copy()
artifact["created_at"] = str(uuid.uuid1().time)
with open(artifact_path, "w") as f:
json.dump(sanitize_image_urls(artifact), f, indent=2)
self.current_artifact += 1
def _update_usage(self, usage: Dict[str, Any]) -> None:
"""Update total usage statistics."""
def add_dicts(target: Dict[str, Any], source: Dict[str, Any]) -> None:
for key, value in source.items():
if isinstance(value, dict):
if key not in target:
target[key] = {}
add_dicts(target[key], value)
else:
if key not in target:
target[key] = 0
target[key] += value
add_dicts(self.total_usage, usage)
    @override
    async def on_run_start(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]]) -> None:
        """Initialize trajectory tracking for a new run.

        When reset_on_run is True (or no trajectory exists yet), a fresh
        trajectory directory and metadata.json are created; otherwise the
        existing trajectory continues and only the model name is refreshed.
        """
        model = kwargs.get("model", "unknown")
        # Only reset trajectory state if reset_on_run is True or no trajectory exists
        if self.reset_on_run or not self.trajectory_id:
            # Build a short, filesystem-safe tag from the model identifier.
            model_name_short = model.split("+")[-1].split("/")[-1].lower()[:16]
            if "+" in model:
                # Composite models ("a+b") get a 4-char prefix from the first model.
                model_name_short = model.split("+")[0].lower()[:4] + "_" + model_name_short
            # strip non-alphanumeric characters from model_name_short
            model_name_short = "".join(c for c in model_name_short if c.isalnum() or c == "_")
            # id format: yyyy-mm-dd_model_hhmmss_uuid[:4]
            now = datetime.now()
            self.trajectory_id = f"{now.strftime('%Y-%m-%d')}_{model_name_short}_{now.strftime('%H%M%S')}_{str(uuid.uuid4())[:4]}"
            self.current_turn = 0
            self.current_artifact = 0
            self.model = model
            self.total_usage = {}
            # Create trajectory directory
            trajectory_path = self.trajectory_dir / self.trajectory_id
            trajectory_path.mkdir(parents=True, exist_ok=True)
            # Save trajectory metadata (optionally extract screenshots to screenshot_dir)
            kwargs_to_save = kwargs.copy()
            try:
                if "messages" in kwargs_to_save:
                    kwargs_to_save["messages"] = extract_computer_call_outputs(
                        kwargs_to_save["messages"], self.screenshot_dir
                    )
            except Exception:
                # If extraction fails, fall back to original messages
                pass
            metadata = {
                "trajectory_id": self.trajectory_id,
                # NOTE(review): uuid1().time is a 60-bit UUID timestamp, not a
                # human-readable time — confirm this is the intended format.
                "created_at": str(uuid.uuid1().time),
                "status": "running",
                "kwargs": kwargs_to_save,
            }
            with open(trajectory_path / "metadata.json", "w") as f:
                json.dump(metadata, f, indent=2)
        else:
            # Continue with existing trajectory - just update model if needed
            self.model = model
    @override
    async def on_run_end(
        self,
        kwargs: Dict[str, Any],
        old_items: List[Dict[str, Any]],
        new_items: List[Dict[str, Any]],
    ) -> None:
        """Finalize run tracking by updating metadata with completion status, usage, and new items."""
        if not self.trajectory_id:
            return
        # Update metadata with completion status, total usage, and new items
        trajectory_path = self.trajectory_dir / self.trajectory_id
        metadata_path = trajectory_path / "metadata.json"
        # Read the metadata written by on_run_start; start fresh if it is missing.
        if metadata_path.exists():
            with open(metadata_path, "r") as f:
                metadata = json.load(f)
        else:
            metadata = {}
        # Optionally extract screenshots from new_items before persisting
        new_items_to_save = new_items
        try:
            new_items_to_save = extract_computer_call_outputs(new_items, self.screenshot_dir)
        except Exception:
            # Best-effort: persist the original items if extraction fails.
            pass
        metadata.update(
            {
                "status": "completed",
                # NOTE(review): uuid1().time is a 60-bit UUID timestamp, not a
                # human-readable time — confirm this is the intended format.
                "completed_at": str(uuid.uuid1().time),
                "total_usage": self.total_usage,
                "new_items": new_items_to_save,
                "total_turns": self.current_turn,
            }
        )
        # Save updated metadata
        with open(metadata_path, "w") as f:
            json.dump(metadata, f, indent=2)
@override
async def on_api_start(self, kwargs: Dict[str, Any]) -> None:
if not self.trajectory_id:
return
self._save_artifact("api_start", {"kwargs": kwargs})
@override
async def on_api_end(self, kwargs: Dict[str, Any], result: Any) -> None:
"""Save API call result."""
if not self.trajectory_id:
return
self._save_artifact("api_result", {"kwargs": kwargs, "result": result})
@override
async def on_screenshot(self, screenshot: Union[str, bytes], name: str = "screenshot") -> None:
"""Save a screenshot."""
if isinstance(screenshot, str):
screenshot = base64.b64decode(screenshot)
self._save_artifact(name, screenshot)
@override
async def on_usage(self, usage: Dict[str, Any]) -> None:
"""Called when usage information is received."""
self._update_usage(usage)
@override
async def on_responses(self, kwargs: Dict[str, Any], responses: Dict[str, Any]) -> None:
"""Save responses to the current turn directory and update usage statistics."""
if not self.trajectory_id:
return
# Save responses
turn_dir = self._get_turn_dir()
response_data = {
"timestamp": str(uuid.uuid1().time),
"model": self.model,
"kwargs": kwargs,
"response": responses,
}
self._save_artifact("agent_response", response_data)
# Increment turn counter
self.current_turn += 1
def _draw_crosshair_on_image(self, image_bytes: bytes, x: int, y: int) -> bytes:
    """
    Annotate an image with a red crosshair and center dot at (x, y).

    Args:
        image_bytes: Source image as encoded bytes.
        x: Horizontal pixel coordinate of the marker.
        y: Vertical pixel coordinate of the marker.

    Returns:
        The annotated image re-encoded as PNG bytes.
    """
    arm = 20          # half-length of each crosshair arm, in pixels
    thickness = 2     # stroke width of the arms
    radius = 3        # radius of the filled center dot
    mark_color = "red"

    img = Image.open(io.BytesIO(image_bytes))
    pen = ImageDraw.Draw(img)

    # Two perpendicular strokes through the target point.
    pen.line([(x - arm, y), (x + arm, y)], fill=mark_color, width=thickness)
    pen.line([(x, y - arm), (x, y + arm)], fill=mark_color, width=thickness)
    # Filled circle marking the exact coordinate.
    pen.ellipse([(x - radius, y - radius), (x + radius, y + radius)], fill=mark_color)

    buffer = io.BytesIO()
    img.save(buffer, format="PNG")
    return buffer.getvalue()
@override
async def on_computer_call_end(
    self, item: Dict[str, Any], result: List[Dict[str, Any]]
) -> None:
    """
    Called when a computer call has completed.

    Saves the raw call result and — when the action carries x/y
    coordinates — also saves a copy of the first returned screenshot
    annotated with a crosshair at the action location. Finally advances
    the turn counter.

    Args:
        item: The computer call item; its "action" dict may contain "x"/"y".
        result: Output items from the call; screenshots appear as
            "computer_call_output" entries whose output type is "input_image".
    """
    if not self.trajectory_id:
        return
    self._save_artifact("computer_call_result", {"item": item, "result": result})
    # Only annotate when the action targets a specific coordinate.
    action = item.get("action", {})
    if "x" in action and "y" in action:
        # Look for a screenshot in the result items.
        for result_item in result:
            if (
                result_item.get("type") == "computer_call_output"
                and result_item.get("output", {}).get("type") == "input_image"
            ):
                # Robustness fix: tolerate a malformed output that lacks
                # "image_url" instead of raising KeyError.
                image_url = result_item["output"].get("image_url")
                if not image_url:
                    break
                if image_url.startswith("data:image/"):
                    # Format: data:image/png;base64,<base64_data>
                    base64_data = image_url.split(",", 1)[1]
                else:
                    # Assume it's just base64 data
                    base64_data = image_url
                try:
                    image_bytes = base64.b64decode(base64_data)
                    # Draw a crosshair at the action coordinates.
                    annotated_image = self._draw_crosshair_on_image(
                        image_bytes, int(action["x"]), int(action["y"])
                    )
                    self._save_artifact("screenshot_action", annotated_image)
                except Exception as e:
                    # Annotation is best-effort; never fail the callback over it.
                    print(f"Failed to annotate screenshot: {e}")
                break  # Only process the first screenshot found
    # Increment turn counter
    self.current_turn += 1
```