This is page 11 of 20. Use http://codebase.md/trycua/cua?page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── bump-version.yml
│       ├── ci-lume.yml
│       ├── docker-publish-cua-linux.yml
│       ├── docker-publish-cua-windows.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── link-check.yml
│       ├── lint.yml
│       ├── npm-publish-cli.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       ├── python-tests.yml
│       ├── test-cua-models.yml
│       └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│   ├── docs.code-workspace
│   ├── extensions.json
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── cloud-windows-ga-macos-preview.md
│   ├── composite-agents.md
│   ├── computer-use-agents-for-growth-hacking.md
│   ├── cua-hackathon.md
│   ├── cua-playground-preview.md
│   ├── cua-vlm-router.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cli.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── neurips-2025-cua-papers.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .env.example
│   ├── .gitignore
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── observability.mdx
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── cua-vlm-router.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   ├── telemetry.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── cli-playbook
│   │       │   ├── commands.mdx
│   │       │   ├── index.mdx
│   │       │   └── meta.json
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── meta.json
│   │       │   ├── sandboxed-python.mdx
│   │       │   └── tracing-api.mdx
│   │       ├── example-usecases
│   │       │   ├── form-filling.mdx
│   │       │   ├── gemini-complex-ui-navigation.mdx
│   │       │   ├── meta.json
│   │       │   ├── post-event-contact-export.mdx
│   │       │   └── windows-app-behind-vpn.mdx
│   │       ├── get-started
│   │       │   ├── meta.json
│   │       │   └── quickstart.mdx
│   │       ├── index.mdx
│   │       ├── macos-vm-cli-playbook
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   └── meta.json
│   │       └── meta.json
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── bg-dark.jpg
│   │       ├── bg-light.jpg
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── grounding-with-gemini3.gif
│   │       ├── hero.png
│   │       ├── laminar_trace_example.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   ├── posthog
│   │   │   │   │   └── [...path]
│   │   │   │   │       └── route.ts
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   ├── llms.txt
│   │   │   │   └── route.ts
│   │   │   ├── robots.ts
│   │   │   └── sitemap.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── analytics-tracker.tsx
│   │   │   ├── cookie-consent.tsx
│   │   │   ├── doc-actions-menu.tsx
│   │   │   ├── editable-code-block.tsx
│   │   │   ├── footer.tsx
│   │   │   ├── hero.tsx
│   │   │   ├── iou.tsx
│   │   │   ├── mermaid.tsx
│   │   │   └── page-feedback.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   ├── mdx-components.tsx
│   │   └── providers
│   │       └── posthog-provider.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── browser_tool_example.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── tracing_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── cua_adapter.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gelato.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── generic_vlm.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   ├── uiins.py
│   │   │   │   │   ├── uitars.py
│   │   │   │   │   └── uitars2.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── tools
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   └── browser_tool.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_computer_agent.py
│   │   ├── bench-ui
│   │   │   ├── bench_ui
│   │   │   │   ├── __init__.py
│   │   │   │   ├── api.py
│   │   │   │   └── child.py
│   │   │   ├── examples
│   │   │   │   ├── folder_example.py
│   │   │   │   ├── gui
│   │   │   │   │   ├── index.html
│   │   │   │   │   ├── logo.svg
│   │   │   │   │   └── styles.css
│   │   │   │   ├── output_overlay.png
│   │   │   │   └── simple_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       └── test_port_detection.py
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── tracing_wrapper.py
│   │   │   │   ├── tracing.py
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_computer.py
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── browser.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   ├── utils
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   └── wallpaper.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   ├── test_connection.py
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_server.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_telemetry.py
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── build-extension.py
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── desktop-extension
│   │   │   │   ├── cua-extension.mcpb
│   │   │   │   ├── desktop_extension.png
│   │   │   │   ├── manifest.json
│   │   │   │   ├── README.md
│   │   │   │   ├── requirements.txt
│   │   │   │   ├── run_server.sh
│   │   │   │   └── setup.py
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── QUICK_TEST_COMMANDS.sh
│   │   │   ├── quick_test_local_option.py
│   │   │   ├── README.md
│   │   │   ├── scripts
│   │   │   │   ├── install_mcp_server.sh
│   │   │   │   └── start_mcp_server.sh
│   │   │   ├── test_mcp_server_local_option.py
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_mcp_server.py
│   │   ├── pylume
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_pylume.py
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           ├── conftest.py
│   │           └── test_omniparser.py
│   ├── qemu-docker
│   │   ├── linux
│   │   │   ├── Dockerfile
│   │   │   ├── README.md
│   │   │   └── src
│   │   │       ├── entry.sh
│   │   │       └── vm
│   │   │           ├── image
│   │   │           │   └── README.md
│   │   │           └── setup
│   │   │               ├── install.sh
│   │   │               ├── setup-cua-server.sh
│   │   │               └── setup.sh
│   │   ├── README.md
│   │   └── windows
│   │       ├── Dockerfile
│   │       ├── README.md
│   │       └── src
│   │           ├── entry.sh
│   │           └── vm
│   │               ├── image
│   │               │   └── README.md
│   │               └── setup
│   │                   ├── install.bat
│   │                   ├── on-logon.ps1
│   │                   ├── setup-cua-server.ps1
│   │                   ├── setup-utils.psm1
│   │                   └── setup.ps1
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── cua-cli
│   │   │   ├── .gitignore
│   │   │   ├── .prettierrc
│   │   │   ├── bun.lock
│   │   │   ├── CLAUDE.md
│   │   │   ├── index.ts
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── auth.ts
│   │   │   │   ├── cli.ts
│   │   │   │   ├── commands
│   │   │   │   │   ├── auth.ts
│   │   │   │   │   └── sandbox.ts
│   │   │   │   ├── config.ts
│   │   │   │   ├── http.ts
│   │   │   │   ├── storage.ts
│   │   │   │   └── util.ts
│   │   │   └── tsconfig.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Development.md
│       ├── Dockerfile
│       ├── Dockerfile.dev
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│   ├── install-cli.ps1
│   ├── install-cli.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   ├── run-docker-dev.sh
│   └── typescript-typecheck.js
├── TESTING.md
├── tests
│   ├── agent_loop_testing
│   │   ├── agent_test.py
│   │   └── README.md
│   ├── pytest.ini
│   ├── shell_cmd.py
│   ├── test_files.py
│   ├── test_mcp_server_session_management.py
│   ├── test_mcp_server_streaming.py
│   ├── test_shell_bash.py
│   ├── test_telemetry.py
│   ├── test_tracing.py
│   ├── test_venv.py
│   └── test_watchdog.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/blog/sandboxed-python-execution.md:
--------------------------------------------------------------------------------

```markdown
# Sandboxed Python Execution: Run Code Safely in Cua Containers

_Published on June 23, 2025 by Dillon DuPont_

We touched on Cua's computer-use capabilities in [Building your own Operator on macOS - Part 2](build-your-own-operator-on-macos-2.md) – your AI agents can click, scroll, type, and interact with any desktop application. But what if your agent needs to do more than just UI automation? What if it needs to process data, make API calls, analyze images, or run complex logic alongside those UI interactions, within the same virtual environment?

That's where Cua's `@sandboxed` decorator comes in. While Cua handles the clicking and typing, sandboxed execution lets you run full Python code inside the same virtual environment. It's like giving your AI agents a programming brain to complement their clicking fingers.

Think of it as the perfect marriage: Cua handles the "what you see" (UI interactions), while sandboxed Python handles the "what you compute" (data processing, logic, API calls) – all happening in the same isolated environment.

## So, what exactly is sandboxed execution?

Cua excels at automating user interfaces – clicking buttons, filling forms, navigating applications. But modern AI agents need to do more than just UI automation. They need to process the data they collect, make intelligent decisions, call external APIs, and run sophisticated algorithms.

Sandboxed execution bridges this gap. You write a Python function, decorate it with `@sandboxed`, and it runs inside your Cua container alongside your UI automation. Your agent can now click a button, extract some data, process it with Python, and then use those results to decide what to click next.

Here's what makes this combination powerful for AI agent development:

- **Unified environment**: Your UI automation and code execution happen in the same container
- **Rich capabilities**: Combine Cua's clicking with Python's data processing, API calls, and libraries
- **Seamless integration**: Pass data between UI interactions and Python functions effortlessly
- **Cross-platform consistency**: Your Python code runs the same way across different Cua environments
- **Complete workflows**: Build agents that can both interact with apps AND process the data they collect

## The architecture behind @sandboxed

Let's jump right into an example that'll make this crystal clear:

```python
from computer.helpers import sandboxed

@sandboxed("demo_venv")
def greet_and_print(name):
    """This function runs inside the container"""
    import PyXA  # macOS-specific library
    safari = PyXA.Application("Safari")
    html = safari.current_document.source()
    print(f"Hello from inside the container, {name}!")
    return {"greeted": name, "safari_html": html}

# When called, this executes in the container
result = await greet_and_print("Cua")
```

What's happening here? When you call `greet_and_print()`, Cua extracts the function's source code, transmits it to the container, and executes it there. The result returns to you seamlessly, while the actual execution remains completely isolated.

## How does sandboxed execution work?

Cua's sandboxed execution system employs several key architectural components:

### 1. Source Code Extraction

Cua uses Python's `inspect.getsource()` to extract your function's source code and reconstruct the function definition in the remote environment.
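
As a rough illustration of that first step (the general idea, not Cua's exact implementation), `inspect.getsource()` hands you the text of a function, which can then be re-executed in a fresh namespace on the other side:

```python
import inspect
import textwrap

def greet(name):
    return f"Hello, {name}!"

# Pull the function's source text, then re-create it in a clean namespace --
# the same trick lets the decorator ship your function into the container.
source = textwrap.dedent(inspect.getsource(greet))
namespace: dict = {}
exec(source, namespace)
print(namespace["greet"]("Cua"))  # -> Hello, Cua!
```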

### 2. Virtual Environment Isolation

Each sandboxed function runs in a named virtual environment within the container. This provides complete dependency isolation between different functions and their respective environments.
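
To make that concrete, here's a hypothetical sketch (the environment names and version pins are illustrative): two functions can rely on different versions of the same package because each named venv is installed and resolved independently inside the container.

```python
# Hypothetical example: each named environment gets its own dependency set.
await computer.venv_install("legacy_env", ["requests==2.25.1"])
await computer.venv_install("modern_env", ["requests>=2.31"])

@sandboxed("legacy_env")
def old_client():
    import requests
    return requests.__version__  # resolved inside legacy_env

@sandboxed("modern_env")
def new_client():
    import requests
    return requests.__version__  # resolved inside modern_env
```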

### 3. Data Serialization and Transport

Arguments and return values are serialized as JSON and transported between the host and container. This ensures compatibility across different Python versions and execution environments.

### 4. Comprehensive Error Handling

The system captures both successful results and exceptions, preserving stack traces and error information for debugging purposes.
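
Put together, the transport and error path look roughly like this simplified sketch (not Cua's actual wire format): arguments go out as JSON, and either a result or a captured exception comes back.

```python
import json
import traceback

# Simplified container-side wrapper: decode JSON arguments, run the
# reconstructed function, and always return a JSON payload, either the
# result or the error message plus its traceback.
def execute_remote(fn, payload: str) -> str:
    call = json.loads(payload)
    try:
        result = fn(*call.get("args", []), **call.get("kwargs", {}))
        return json.dumps({"result": result})
    except Exception as e:
        return json.dumps({"error": str(e), "traceback": traceback.format_exc()})

# A non-JSON-serializable return value would fail here, which is why sandboxed
# functions should stick to plain dicts, lists, strings, and numbers.
print(execute_remote(lambda name: {"greeted": name}, json.dumps({"args": ["Cua"]})))
```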

## Getting your sandbox ready

Setting up sandboxed execution is simple:

```python
import asyncio
from computer.computer import Computer
from computer.helpers import sandboxed, set_default_computer

async def main():
    # Fire up the computer
    computer = Computer()
    await computer.run()

    # Make it the default for all sandboxed functions
    set_default_computer(computer)

    # Install some packages in a virtual environment
    await computer.venv_install("demo_venv", ["requests", "beautifulsoup4"])
```

If you want to get fancy, you can specify which computer instance to use:

```python
@sandboxed("my_venv", computer=my_specific_computer)
def my_function():
    # This runs on your specified computer instance
    pass
```

## Real-world examples that actually work

### Browser automation without the headaches

Ever tried to automate a browser and had it crash your entire system? Yeah, us too. Here's how to do it safely:

```python
@sandboxed("browser_env")
def automate_browser_with_playwright():
    """Automate browser interactions using Playwright"""
    from playwright.sync_api import sync_playwright
    import time
    import base64
    from datetime import datetime

    try:
        with sync_playwright() as p:
            # Launch browser (visible, because why not?)
            browser = p.chromium.launch(
                headless=False,
                args=['--no-sandbox', '--disable-dev-shm-usage']
            )

            page = browser.new_page()
            page.set_viewport_size({"width": 1280, "height": 720})

            actions = []
            screenshots = {}

            # Let's visit example.com and poke around
            page.goto("https://example.com")
            actions.append("Navigated to example.com")

            # Grab a screenshot because screenshots are cool
            screenshot_bytes = page.screenshot(full_page=True)
            screenshots["initial"] = base64.b64encode(screenshot_bytes).decode()

            # Get some basic info
            title = page.title()
            actions.append(f"Page title: {title}")

            # Find links and headings
            try:
                links = page.locator("a").all()
                link_texts = [link.text_content() for link in links[:5]]
                actions.append(f"Found {len(links)} links: {link_texts}")

                headings = page.locator("h1, h2, h3").all()
                heading_texts = [h.text_content() for h in headings[:3]]
                actions.append(f"Found headings: {heading_texts}")

            except Exception as e:
                actions.append(f"Element interaction error: {str(e)}")

            # Let's try a form for good measure
            try:
                page.goto("https://httpbin.org/forms/post")
                actions.append("Navigated to form page")

                # Fill out the form
                page.fill('input[name="custname"]', "Test User from Sandboxed Environment")
                page.fill('input[name="custtel"]', "555-0123")
                page.fill('input[name="custemail"]', "[email protected]")
                page.select_option('select[name="size"]', "large")

                actions.append("Filled out form fields")

                # Submit and see what happens
                page.click('input[type="submit"]')
                page.wait_for_load_state("networkidle")

                actions.append("Submitted form")

            except Exception as e:
                actions.append(f"Form interaction error: {str(e)}")

            browser.close()

            return {
                "actions_performed": actions,
                "screenshots": screenshots,
                "success": True
            }

    except Exception as e:
        return {"error": f"Browser automation failed: {str(e)}"}

# Install Playwright and its browsers
await computer.venv_install("browser_env", ["playwright"])
await computer.venv_cmd("browser_env", "playwright install chromium")

# Run the automation
result = await automate_browser_with_playwright()
print(f"Performed {len(result.get('actions_performed', []))} actions")
```

### Building code analysis agents

Want to build agents that can analyze code safely? Here's a security audit tool that won't accidentally `eval()` your system into oblivion:

```python
@sandboxed("analysis_env")
def security_audit_tool(code_snippet):
    """Analyze code for potential security issues"""
    import ast
    import re

    issues = []

    # Check for the usual suspects
    dangerous_patterns = [
        (r'eval\s*\(', "Use of eval() function"),
        (r'exec\s*\(', "Use of exec() function"),
        (r'__import__\s*\(', "Dynamic import usage"),
        (r'subprocess\.', "Subprocess usage"),
        (r'os\.system\s*\(', "OS system call"),
    ]

    for pattern, description in dangerous_patterns:
        if re.search(pattern, code_snippet):
            issues.append(description)

    # Get fancy with AST analysis
    try:
        tree = ast.parse(code_snippet)
        for node in ast.walk(tree):
            if isinstance(node, ast.Call):
                if hasattr(node.func, 'id'):
                    if node.func.id in ['eval', 'exec', 'compile']:
                        issues.append(f"Dangerous function call: {node.func.id}")
    except SyntaxError:
        issues.append("Syntax error in code")

    return {
        "security_issues": issues,
        "risk_level": "HIGH" if len(issues) > 2 else "MEDIUM" if issues else "LOW"
    }

# Test it on some sketchy code
audit_result = await security_audit_tool("eval(user_input)")
print(f"Security audit: {audit_result}")
```

### Desktop automation in the cloud

Here's where things get really interesting. Cua Cloud Sandbox comes with full desktop environments, so you can automate GUIs:

```python
@sandboxed("desktop_env")
def take_screenshot_and_analyze():
    """Take a screenshot and analyze the desktop"""
    import io
    import base64
    from PIL import ImageGrab
    from datetime import datetime

    try:
        # Grab the screen
        screenshot = ImageGrab.grab()

        # Convert to base64 for easy transport
        buffer = io.BytesIO()
        screenshot.save(buffer, format='PNG')
        screenshot_data = base64.b64encode(buffer.getvalue()).decode()

        # Get some basic info
        screen_info = {
            "size": screenshot.size,
            "mode": screenshot.mode,
            "timestamp": datetime.now().isoformat()
        }

        # Analyze the colors (because why not?)
        colors = screenshot.getcolors(maxcolors=256*256*256)
        dominant_color = max(colors, key=lambda x: x[0])[1] if colors else None

        return {
            "screenshot_base64": screenshot_data,
            "screen_info": screen_info,
            "dominant_color": dominant_color,
            "unique_colors": len(colors) if colors else 0
        }

    except Exception as e:
        return {"error": f"Screenshot failed: {str(e)}"}

# Install the dependencies
await computer.venv_install("desktop_env", ["Pillow"])

# Take and analyze a screenshot
result = await take_screenshot_and_analyze()
print("Desktop analysis complete!")
```

## Pro tips for sandboxed success

### Keep it self-contained

Always put your imports inside the function. Trust us on this one:

```python
@sandboxed("good_env")
def good_function():
    import os  # Import inside the function
    import json

    # Your code here
    return {"result": "success"}
```

### Install dependencies first

Don't forget to install packages before using them:

```python
# Install first
await computer.venv_install("my_env", ["pandas", "numpy", "matplotlib"])

@sandboxed("my_env")
def data_analysis():
    import pandas as pd
    import numpy as np
    # Now you can use them
```

### Use descriptive environment names

Future you will thank you:

```python
@sandboxed("data_processing_env")
def process_data(): pass

@sandboxed("web_scraping_env")
def scrape_site(): pass

@sandboxed("ml_training_env")
def train_model(): pass
```

### Always handle errors gracefully

Things break. Plan for it:

```python
@sandboxed("robust_env")
def robust_function(data):
    try:
        result = process_data(data)
        return {"success": True, "result": result}
    except Exception as e:
        return {"success": False, "error": str(e)}
```

## What about performance?

Let's be honest – there's some overhead here. Code needs to be serialized, sent over the network, and executed remotely. But for most use cases, the benefits far outweigh the costs.

If you're building something performance-critical, consider:

- Batching multiple operations into a single sandboxed function (see the sketch after this list)
- Minimizing data transfer between host and container
- Using persistent virtual environments
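
Here's a hedged sketch of the batching idea (the function below is hypothetical and assumes `batch_env` already has `requests` and `beautifulsoup4` installed): one sandboxed call does the fetching, parsing, and summarizing, so only a single small JSON payload crosses the host/container boundary.

```python
@sandboxed("batch_env")
def fetch_and_summarize(urls):
    # Do all the per-URL work inside the container and return one small result,
    # instead of shuttling raw HTML back and forth for each step.
    import requests
    from bs4 import BeautifulSoup

    summaries = []
    for url in urls:
        html = requests.get(url, timeout=10).text
        soup = BeautifulSoup(html, "html.parser")
        title = soup.title.string if soup.title else ""
        summaries.append({"url": url, "title": title, "length": len(html)})
    return summaries

result = await fetch_and_summarize(["https://example.com"])
```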

## The security angle

This is where sandboxed execution really shines:

1. **Complete process isolation** – code runs in a separate container
2. **File system protection** – limited access to your host files
3. **Network isolation** – controlled network access
4. **Clean environments** – no package conflicts or pollution
5. **Resource limits** – container-level constraints keep things in check

## Ready to get started?

The `@sandboxed` decorator is one of those features that sounds simple but opens up a world of possibilities. Whether you're testing sketchy code, building AI agents, or just want to keep your development environment pristine, it's got you covered.

Give it a try in your next Cua project and see how liberating it feels to run code without fear!

Happy coding (safely)!

---

_Want to dive deeper? Check out our [sandboxed functions examples](https://github.com/trycua/cua/blob/main/examples/sandboxed_functions_examples.py) and [virtual environment tests](https://github.com/trycua/cua/blob/main/tests/test_venv.py) on GitHub. Questions? Come chat with us on Discord!_

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/gemini.py:
--------------------------------------------------------------------------------

```python
"""
Gemini 2.5 Computer Use agent loop

Maps internal Agent SDK message format to Google's Gemini Computer Use API and back.

Key features:
- Lazy import of google.genai
- Configure Computer Use tool with excluded browser-specific predefined functions
- Optional custom function declarations hook for computer-call specific functions
- Convert Gemini function_call parts into internal computer_call actions
"""

from __future__ import annotations

import base64
import io
import uuid
from typing import Any, Dict, List, Optional, Tuple

from PIL import Image

from ..decorators import register_agent
from ..loops.base import AsyncAgentConfig
from ..types import AgentCapability


def _lazy_import_genai():
    """Import google.genai lazily to avoid hard dependency unless used."""
    try:
        from google import genai  # type: ignore
        from google.genai import types  # type: ignore

        return genai, types
    except Exception as e:  # pragma: no cover
        raise RuntimeError(
            "google.genai is required for the Gemini Computer Use loop. Install the Google Gemini SDK."
        ) from e


def _data_url_to_bytes(data_url: str) -> Tuple[bytes, str]:
    """Convert a data URL to raw bytes and mime type."""
    if not data_url.startswith("data:"):
        # Assume it's base64 png payload
        try:
            return base64.b64decode(data_url), "image/png"
        except Exception:
            return b"", "application/octet-stream"
    header, b64 = data_url.split(",", 1)
    mime = "image/png"
    if ";" in header:
        mime = header.split(";")[0].split(":", 1)[1] or "image/png"
    return base64.b64decode(b64), mime


def _bytes_image_size(img_bytes: bytes) -> Tuple[int, int]:
    try:
        img = Image.open(io.BytesIO(img_bytes))
        return img.size
    except Exception:
        return (1024, 768)


def _find_last_user_text(messages: List[Dict[str, Any]]) -> List[str]:
    texts: List[str] = []
    for msg in reversed(messages):
        if msg.get("type") in (None, "message") and msg.get("role") == "user":
            content = msg.get("content")
            if isinstance(content, str):
                return [content]
            elif isinstance(content, list):
                for c in content:
                    if c.get("type") in ("input_text", "output_text") and c.get("text"):
                        texts.append(c["text"])  # newest first
                if texts:
                    return list(reversed(texts))
    return []


def _find_last_screenshot(messages: List[Dict[str, Any]]) -> Optional[bytes]:
    for msg in reversed(messages):
        if msg.get("type") == "computer_call_output":
            out = msg.get("output", {})
            if isinstance(out, dict) and out.get("type") in ("input_image", "computer_screenshot"):
                image_url = out.get("image_url", "")
                if image_url:
                    data, _ = _data_url_to_bytes(image_url)
                    return data
    return None


def _denormalize(v: int, size: int) -> int:
    # Gemini returns 0-999 normalized
    try:
        return max(0, min(size - 1, int(round(v / 1000 * size))))
    except Exception:
        return 0


def _map_gemini_fc_to_computer_call(
    fc: Dict[str, Any],
    screen_w: int,
    screen_h: int,
) -> Optional[Dict[str, Any]]:
    name = fc.get("name")
    args = fc.get("args", {}) or {}

    action: Dict[str, Any] = {}
    if name == "click_at":
        x = _denormalize(int(args.get("x", 0)), screen_w)
        y = _denormalize(int(args.get("y", 0)), screen_h)
        action = {"type": "click", "x": x, "y": y, "button": "left"}
    elif name == "type_text_at":
        x = _denormalize(int(args.get("x", 0)), screen_w)
        y = _denormalize(int(args.get("y", 0)), screen_h)
        text = args.get("text", "")
        if args.get("press_enter") == True:
            text += "\n"
        action = {"type": "type", "x": x, "y": y, "text": text}
    elif name == "hover_at":
        x = _denormalize(int(args.get("x", 0)), screen_w)
        y = _denormalize(int(args.get("y", 0)), screen_h)
        action = {"type": "move", "x": x, "y": y}
    elif name == "key_combination":
        keys = str(args.get("keys", ""))
        action = {"type": "keypress", "keys": keys}
    elif name == "scroll_document":
        direction = args.get("direction", "down")
        magnitude = 800
        dx, dy = 0, 0
        if direction == "down":
            dy = magnitude
        elif direction == "up":
            dy = -magnitude
        elif direction == "right":
            dx = magnitude
        elif direction == "left":
            dx = -magnitude
        action = {
            "type": "scroll",
            "scroll_x": dx,
            "scroll_y": dy,
            "x": int(screen_w / 2),
            "y": int(screen_h / 2),
        }
    elif name == "scroll_at":
        x = _denormalize(int(args.get("x", 500)), screen_w)
        y = _denormalize(int(args.get("y", 500)), screen_h)
        direction = args.get("direction", "down")
        magnitude = int(args.get("magnitude", 800))
        dx, dy = 0, 0
        if direction == "down":
            dy = magnitude
        elif direction == "up":
            dy = -magnitude
        elif direction == "right":
            dx = magnitude
        elif direction == "left":
            dx = -magnitude
        action = {"type": "scroll", "scroll_x": dx, "scroll_y": dy, "x": x, "y": y}
    elif name == "drag_and_drop":
        x = _denormalize(int(args.get("x", 0)), screen_w)
        y = _denormalize(int(args.get("y", 0)), screen_h)
        dx = _denormalize(int(args.get("destination_x", x)), screen_w)
        dy = _denormalize(int(args.get("destination_y", y)), screen_h)
        action = {
            "type": "drag",
            "start_x": x,
            "start_y": y,
            "end_x": dx,
            "end_y": dy,
            "button": "left",
        }
    elif name == "wait_5_seconds":
        action = {"type": "wait"}
    else:
        # Unsupported / excluded browser-specific or custom function; ignore
        return None

    return {
        "type": "computer_call",
        "call_id": uuid.uuid4().hex,
        "status": "completed",
        "action": action,
    }


@register_agent(models=r"^gemini-2\.5-computer-use-preview-10-2025$")
class GeminiComputerUseConfig(AsyncAgentConfig):
    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        genai, types = _lazy_import_genai()

        client = genai.Client()

        # Build excluded predefined functions for browser-specific behavior
        excluded = [
            "open_web_browser",
            "search",
            "navigate",
            "go_forward",
            "go_back",
            "scroll_document",
        ]
        # Optional custom functions: can be extended by host code via `tools` parameter later if desired
        CUSTOM_FUNCTION_DECLARATIONS: List[Any] = []

        # Compose tools config
        generate_content_config = types.GenerateContentConfig(
            tools=[
                types.Tool(
                    computer_use=types.ComputerUse(
                        environment=types.Environment.ENVIRONMENT_BROWSER,
                        excluded_predefined_functions=excluded,
                    )
                ),
                # types.Tool(function_declarations=CUSTOM_FUNCTION_DECLARATIONS),  # enable when custom functions needed
            ]
        )

        # Prepare contents: last user text + latest screenshot
        user_texts = _find_last_user_text(messages)
        screenshot_bytes = _find_last_screenshot(messages)

        parts: List[Any] = []
        for t in user_texts:
            parts.append(types.Part(text=t))

        screen_w, screen_h = 1024, 768
        if screenshot_bytes:
            screen_w, screen_h = _bytes_image_size(screenshot_bytes)
            parts.append(types.Part.from_bytes(data=screenshot_bytes, mime_type="image/png"))

        # If we don't have any content, at least pass an empty user part to prompt reasoning
        if not parts:
            parts = [types.Part(text="Proceed to the next action.")]

        contents = [types.Content(role="user", parts=parts)]

        api_kwargs = {
            "model": model,
            "contents": contents,
            "config": generate_content_config,
        }

        if _on_api_start:
            await _on_api_start(
                {
                    "model": api_kwargs["model"],
                    # "contents": api_kwargs["contents"], # Disabled for now
                    "config": api_kwargs["config"],
                }
            )

        response = client.models.generate_content(**api_kwargs)

        if _on_api_end:
            await _on_api_end(
                {
                    "model": api_kwargs["model"],
                    # "contents": api_kwargs["contents"], # Disabled for now
                    "config": api_kwargs["config"],
                },
                response,
            )

        # Usage (Gemini SDK may not always provide token usage; populate when available)
        usage: Dict[str, Any] = {}
        try:
            # Some SDKs expose response.usage; if available, copy
            if getattr(response, "usage_metadata", None):
                md = response.usage_metadata
                usage = {
                    "prompt_tokens": getattr(md, "prompt_token_count", None) or 0,
                    "completion_tokens": getattr(md, "candidates_token_count", None) or 0,
                    "total_tokens": getattr(md, "total_token_count", None) or 0,
                }
        except Exception:
            pass

        if _on_usage and usage:
            await _on_usage(usage)

        # Parse output into internal items
        output_items: List[Dict[str, Any]] = []

        candidate = response.candidates[0]
        # Text parts from the model (assistant message)
        text_parts: List[str] = []
        function_calls: List[Dict[str, Any]] = []
        for p in candidate.content.parts:
            if getattr(p, "text", None):
                text_parts.append(p.text)
            if getattr(p, "function_call", None):
                # p.function_call has name and args
                fc = {
                    "name": getattr(p.function_call, "name", None),
                    "args": dict(getattr(p.function_call, "args", {}) or {}),
                }
                function_calls.append(fc)

        if text_parts:
            output_items.append(
                {
                    "type": "message",
                    "role": "assistant",
                    "content": [{"type": "output_text", "text": "\n".join(text_parts)}],
                }
            )

        # Map function calls to internal computer_call actions
        for fc in function_calls:
            item = _map_gemini_fc_to_computer_call(fc, screen_w, screen_h)
            if item is not None:
                output_items.append(item)

        return {"output": output_items, "usage": usage}

    async def predict_click(
        self,
        model: str,
        image_b64: str,
        instruction: str,
        **kwargs,
    ) -> Optional[Tuple[float, float]]:
        """Ask Gemini CUA to output a single click action for the given instruction.

        Excludes all predefined tools except `click_at` and sends the screenshot.
        Returns pixel (x, y) if a click is proposed, else None.
        """
        genai, types = _lazy_import_genai()

        client = genai.Client()

        # Exclude all but click_at
        exclude_all_but_click = [
            "open_web_browser",
            "wait_5_seconds",
            "go_back",
            "go_forward",
            "search",
            "navigate",
            "hover_at",
            "type_text_at",
            "key_combination",
            "scroll_document",
            "scroll_at",
            "drag_and_drop",
        ]

        config = types.GenerateContentConfig(
            tools=[
                types.Tool(
                    computer_use=types.ComputerUse(
                        environment=types.Environment.ENVIRONMENT_BROWSER,
                        excluded_predefined_functions=exclude_all_but_click,
                    )
                )
            ]
        )

        # Prepare prompt parts
        try:
            img_bytes = base64.b64decode(image_b64)
        except Exception:
            img_bytes = b""

        w, h = _bytes_image_size(img_bytes) if img_bytes else (1024, 768)

        parts: List[Any] = [types.Part(text=f"Click {instruction}.")]
        if img_bytes:
            parts.append(types.Part.from_bytes(data=img_bytes, mime_type="image/png"))

        contents = [types.Content(role="user", parts=parts)]

        response = client.models.generate_content(
            model=model,
            contents=contents,
            config=config,
        )

        # Parse first click_at
        try:
            candidate = response.candidates[0]
            for p in candidate.content.parts:
                fc = getattr(p, "function_call", None)
                if fc and getattr(fc, "name", None) == "click_at":
                    args = dict(getattr(fc, "args", {}) or {})
                    x = _denormalize(int(args.get("x", 0)), w)
                    y = _denormalize(int(args.get("y", 0)), h)
                    return float(x), float(y)
        except Exception:
            return None

        return None

    def get_capabilities(self) -> List[AgentCapability]:
        return ["click", "step"]

```

--------------------------------------------------------------------------------
/libs/lume/src/FileSystem/Home.swift:
--------------------------------------------------------------------------------

```swift
import Foundation

/// Manages the application's home directory and virtual machine directories.
/// Responsible for creating, accessing, and validating the application's directory structure.
final class Home {
    // MARK: - Constants

    private enum Constants {
        static let defaultDirectoryName = ".lume"
        static let homeDirPath = "~/\(defaultDirectoryName)"
    }

    // MARK: - Properties

    private var _homeDir: Path
    private let settingsManager: SettingsManager
    private let fileManager: FileManager
    private var locations: [String: VMLocation] = [:]

    // Current home directory based on default location
    var homeDir: Path {
        return _homeDir
    }

    // MARK: - Initialization

    init(
        settingsManager: SettingsManager = SettingsManager.shared,
        fileManager: FileManager = .default
    ) {
        self.settingsManager = settingsManager
        self.fileManager = fileManager

        // Get home directory path from settings or use default
        let settings = settingsManager.getSettings()
        guard let defaultLocation = settings.defaultLocation else {
            fatalError("No default VM location found")
        }

        self._homeDir = Path(defaultLocation.path)

        // Cache all locations
        for location in settings.vmLocations {
            locations[location.name] = location
        }
    }

    // MARK: - VM Directory Management

    /// Creates a temporary VM directory with a unique identifier
    /// - Returns: A VMDirectory instance representing the created directory
    /// - Throws: HomeError if directory creation fails
    func createTempVMDirectory() throws -> VMDirectory {
        let uuid = UUID().uuidString
        let tempDir = homeDir.directory(uuid)

        Logger.info("Creating temporary directory", metadata: ["path": tempDir.path])

        do {
            try createDirectory(at: tempDir.url)
            return VMDirectory(tempDir)
        } catch {
            throw HomeError.directoryCreationFailed(path: tempDir.path)
        }
    }

    /// Gets a VM directory for a specific VM name and optional location
    ///
    /// - Parameters:
    ///   - name: Name of the VM directory
    ///   - storage: Optional name of the VM location (default: default location)
    /// - Returns: A VMDirectory instance
    /// - Throws: HomeError if location not found
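    /// Storage resolution order: "ephemeral" maps to the system temporary directory,
    /// a value containing a path separator is treated as a direct path, any other
    /// value is looked up as a named location, and nil falls back to the default location.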
    func getVMDirectory(_ name: String, storage: String? = nil) throws -> VMDirectory {
        // Special case for ephemeral storage using macOS temporary directory
        if let storage = storage, storage == "ephemeral" {
            // Get the current temporary directory
            let tmpDir = ProcessInfo.processInfo.environment["TMPDIR"] ?? "/tmp"
            // Remove trailing slash if present
            let cleanPath = tmpDir.hasSuffix("/") ? String(tmpDir.dropLast()) : tmpDir
            
            // Create the directory if it doesn't exist
            if !fileExists(at: cleanPath) {
                try createVMLocation(at: cleanPath)
            }
            
            let baseDir = Path(cleanPath)
            return VMDirectory(baseDir.directory(name))
        }

        // Check if storage is a direct path
        if let storage = storage, (storage.contains("/") || storage.contains("\\")) {
            let cleanPath = storage.hasSuffix("/") ? String(storage.dropLast()) : storage
            let baseDir = Path(cleanPath)
            return VMDirectory(baseDir.directory(name))
        }

        let location: VMLocation

        if let storage = storage {
            // Get a specific location
            guard let loc = locations[storage] else {
                throw VMLocationError.locationNotFound(name: storage)
            }
            location = loc
        } else {
            // Use default location
            let settings = settingsManager.getSettings()
            guard let defaultLocation = settings.defaultLocation else {
                throw HomeError.invalidHomeDirectory
            }
            location = defaultLocation
        }

        let baseDir = Path(location.expandedPath)
        return VMDirectory(baseDir.directory(name))
    }
    
    /// Gets a VM directory from a direct file path
    ///
    /// - Parameters:
    ///   - name: Name of the VM directory
    ///   - storagePath: Direct file system path where the VM is located
    /// - Returns: A VMDirectory instance
    /// - Throws: HomeError if path is invalid
    func getVMDirectoryFromPath(_ name: String, storagePath: String) throws -> VMDirectory {
        let baseDir = Path(storagePath)
        
        // Create the directory if it doesn't exist
        if !fileExists(at: storagePath) {
            Logger.info("Creating storage directory", metadata: ["path": storagePath])
            try createVMLocation(at: storagePath)
        } else if !isValidDirectory(at: storagePath) {
            // Path exists but isn't a valid directory
            throw HomeError.invalidHomeDirectory
        }
        
        return VMDirectory(baseDir.directory(name))
    }

    /// Returns all initialized VM directories across all locations
    /// - Returns: An array of VMDirectory instances with location info
    /// - Throws: HomeError if directory access is denied
    func getAllVMDirectories() throws -> [VMDirectoryWithLocation] {
        var results: [VMDirectoryWithLocation] = []

        // Loop through all locations
        let settings = settingsManager.getSettings()
        
        // Also check ephemeral directory (macOS temporary directory)
        let tmpDir = ProcessInfo.processInfo.environment["TMPDIR"] ?? "/tmp"
        let cleanPath = tmpDir.hasSuffix("/") ? String(tmpDir.dropLast()) : tmpDir
        
        // If tmp directory exists, check for VMs there
        if fileExists(at: cleanPath) {
            let tmpDirPath = Path(cleanPath)
            do {
                let directoryURL = URL(fileURLWithPath: cleanPath)
                let contents = try FileManager.default.contentsOfDirectory(
                    at: directoryURL,
                    includingPropertiesForKeys: [.isDirectoryKey],
                    options: .skipsHiddenFiles
                )
                
                for subdir in contents {
                    do {
                        guard let isDirectory = try subdir.resourceValues(forKeys: [.isDirectoryKey]).isDirectory,
                              isDirectory else {
                            continue
                        }
                        
                        let vmName = subdir.lastPathComponent
                        let vmDir = VMDirectory(tmpDirPath.directory(vmName))
                        
                        // Only include if it's a valid VM directory
                        if vmDir.initialized() {
                            results.append(VMDirectoryWithLocation(
                                directory: vmDir,
                                locationName: "ephemeral"
                            ))
                        }
                    } catch {
                        // Skip any directories we can't access
                        continue
                    }
                }
            } catch {
                Logger.error(
                    "Failed to access ephemeral directory",
                    metadata: [
                        "path": cleanPath,
                        "error": error.localizedDescription,
                    ]
                )
                // Continue to regular locations rather than failing completely
            }
        }
        for location in settings.vmLocations {
            let locationPath = Path(location.expandedPath)

            // Skip non-existent locations
            if !locationPath.exists() {
                continue
            }

            do {
                let allFolders = try fileManager.contentsOfDirectory(
                    at: locationPath.url,
                    includingPropertiesForKeys: nil
                )

                let folders =
                    allFolders
                    .compactMap { url in
                        let sanitizedName = sanitizeFileName(url.lastPathComponent)
                        let dir = VMDirectory(locationPath.directory(sanitizedName))
                        let dirWithLoc =
                            dir.initialized()
                            ? VMDirectoryWithLocation(directory: dir, locationName: location.name)
                            : nil
                        return dirWithLoc
                    }

                results.append(contentsOf: folders)
            } catch {
                Logger.error(
                    "Failed to access VM location",
                    metadata: [
                        "location": location.name,
                        "error": error.localizedDescription,
                    ])
                // Continue to next location rather than failing completely
            }
        }

        return results
    }

    /// Copies a VM directory to a new location with a new name
    /// - Parameters:
    ///   - sourceName: Name of the source VM
    ///   - destName: Name for the destination VM
    ///   - sourceLocation: Optional name of the source location
    ///   - destLocation: Optional name of the destination location
    /// - Throws: HomeError if the copy operation fails
    func copyVMDirectory(
        from sourceName: String,
        to destName: String,
        sourceLocation: String? = nil,
        destLocation: String? = nil
    ) throws {
        let sourceDir = try getVMDirectory(sourceName, storage: sourceLocation)
        let destDir = try getVMDirectory(destName, storage: destLocation)

        // Check if destination directory exists at all
        if destDir.exists() {
            throw HomeError.directoryAlreadyExists(path: destDir.dir.path)
        }

        do {
            try fileManager.copyItem(atPath: sourceDir.dir.path, toPath: destDir.dir.path)
        } catch {
            throw HomeError.directoryCreationFailed(path: destDir.dir.path)
        }
    }

    // MARK: - Location Management

    /// Adds a new VM location
    /// - Parameters:
    ///   - name: Location name
    ///   - path: Location path
    /// - Throws: Error if location cannot be added
    func addLocation(name: String, path: String) throws {
        let location = VMLocation(name: name, path: path)
        try settingsManager.addLocation(location)

        // Update cache
        locations[name] = location
    }

    /// Removes a VM location
    /// - Parameter name: Location name
    /// - Throws: Error if location cannot be removed
    func removeLocation(name: String) throws {
        try settingsManager.removeLocation(name: name)

        // Update cache
        locations.removeValue(forKey: name)
    }

    /// Sets the default VM location
    /// - Parameter name: Location name
    /// - Throws: Error if location cannot be set as default
    func setDefaultLocation(name: String) throws {
        try settingsManager.setDefaultLocation(name: name)

        // Update home directory
        guard let location = locations[name] else {
            throw VMLocationError.locationNotFound(name: name)
        }

        // Update homeDir to reflect the new default
        self._homeDir = Path(location.path)
    }

    /// Gets all available VM locations
    /// - Returns: Array of VM locations
    func getLocations() -> [VMLocation] {
        return settingsManager.getSettings().sortedLocations
    }

    /// Gets the default VM location
    /// - Returns: Default VM location
    /// - Throws: HomeError if no default location
    func getDefaultLocation() throws -> VMLocation {
        guard let location = settingsManager.getSettings().defaultLocation else {
            throw HomeError.invalidHomeDirectory
        }
        return location
    }

    // MARK: - Directory Validation

    /// Validates and ensures the existence of all VM locations
    /// - Throws: HomeError if validation fails or directory creation fails
    func validateHomeDirectory() throws {
        let settings = settingsManager.getSettings()

        for location in settings.vmLocations {
            let path = location.expandedPath
            if !fileExists(at: path) {
                try createVMLocation(at: path)
            } else if !isValidDirectory(at: path) {
                throw HomeError.invalidHomeDirectory
            }
        }
    }

    // MARK: - Private Helpers

    private func createVMLocation(at path: String) throws {
        do {
            try fileManager.createDirectory(
                atPath: path,
                withIntermediateDirectories: true
            )
        } catch {
            throw HomeError.directoryCreationFailed(path: path)
        }
    }

    private func createDirectory(at url: URL) throws {
        try fileManager.createDirectory(
            at: url,
            withIntermediateDirectories: true
        )
    }

    private func isValidDirectory(at path: String) -> Bool {
        var isDirectory: ObjCBool = false
        return fileManager.fileExists(atPath: path, isDirectory: &isDirectory)
            && isDirectory.boolValue
            && Path(path).writable()
    }

    private func fileExists(at path: String) -> Bool {
        return fileManager.fileExists(atPath: path)
    }

    private func sanitizeFileName(_ name: String) -> String {
        // Only decode percent encoding (e.g., %20 for spaces)
        return name.removingPercentEncoding ?? name
    }
}

// MARK: - VM Directory with Location

/// Represents a VM directory with its location information
struct VMDirectoryWithLocation {
    let directory: VMDirectory
    let locationName: String
}

// MARK: - Home + CustomStringConvertible

extension Home: CustomStringConvertible {
    var description: String {
        "Home(path: \(homeDir.path))"
    }
}

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/adapters/mlxvlm_adapter.py:
--------------------------------------------------------------------------------

```python
import asyncio
import base64
import functools
import io
import math
import re
import warnings
from concurrent.futures import ThreadPoolExecutor
from typing import Any, AsyncIterator, Dict, Iterator, List, Optional, Tuple, cast

from litellm import acompletion, completion
from litellm.llms.custom_llm import CustomLLM
from litellm.types.utils import GenericStreamingChunk, ModelResponse
from PIL import Image

# Try to import MLX dependencies
try:
    import mlx.core as mx
    from mlx_vlm import generate, load
    from mlx_vlm.prompt_utils import apply_chat_template
    from mlx_vlm.utils import load_config
    from transformers.tokenization_utils import PreTrainedTokenizer

    MLX_AVAILABLE = True
except ImportError:
    MLX_AVAILABLE = False

# Constants for smart_resize
IMAGE_FACTOR = 28
MIN_PIXELS = 100 * 28 * 28
MAX_PIXELS = 16384 * 28 * 28
MAX_RATIO = 200


def round_by_factor(number: float, factor: int) -> int:
    """Returns the closest integer to 'number' that is divisible by 'factor'."""
    return round(number / factor) * factor


def ceil_by_factor(number: float, factor: int) -> int:
    """Returns the smallest integer greater than or equal to 'number' that is divisible by 'factor'."""
    return math.ceil(number / factor) * factor


def floor_by_factor(number: float, factor: int) -> int:
    """Returns the largest integer less than or equal to 'number' that is divisible by 'factor'."""
    return math.floor(number / factor) * factor


def smart_resize(
    height: int,
    width: int,
    factor: int = IMAGE_FACTOR,
    min_pixels: int = MIN_PIXELS,
    max_pixels: int = MAX_PIXELS,
) -> tuple[int, int]:
    """
    Rescales the image so that the following conditions are met:

    1. Both dimensions (height and width) are divisible by 'factor'.
    2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
    3. The aspect ratio of the image is maintained as closely as possible.
    """
    if max(height, width) / min(height, width) > MAX_RATIO:
        raise ValueError(
            f"absolute aspect ratio must be smaller than {MAX_RATIO}, got {max(height, width) / min(height, width)}"
        )
    h_bar = max(factor, round_by_factor(height, factor))
    w_bar = max(factor, round_by_factor(width, factor))
    if h_bar * w_bar > max_pixels:
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = floor_by_factor(height / beta, factor)
        w_bar = floor_by_factor(width / beta, factor)
    elif h_bar * w_bar < min_pixels:
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = ceil_by_factor(height * beta, factor)
        w_bar = ceil_by_factor(width * beta, factor)
    return h_bar, w_bar


class MLXVLMAdapter(CustomLLM):
    """MLX VLM Adapter for running vision-language models locally using MLX."""

    def __init__(self, **kwargs):
        """Initialize the adapter.

        Args:
            **kwargs: Additional arguments
        """
        super().__init__()

        self.models = {}  # Cache for loaded models
        self.processors = {}  # Cache for loaded processors
        self.configs = {}  # Cache for loaded configs
        self._executor = ThreadPoolExecutor(max_workers=1)  # Single thread pool

    def _load_model_and_processor(self, model_name: str):
        """Load model and processor if not already cached.

        Args:
            model_name: Name of the model to load

        Returns:
            Tuple of (model, processor, config)
        """
        if not MLX_AVAILABLE:
            raise ImportError("MLX VLM dependencies not available. Please install mlx-vlm.")

        if model_name not in self.models:
            # Load model and processor
            model_obj, processor = load(
                model_name, processor_kwargs={"min_pixels": MIN_PIXELS, "max_pixels": MAX_PIXELS}
            )
            config = load_config(model_name)

            # Cache them
            self.models[model_name] = model_obj
            self.processors[model_name] = processor
            self.configs[model_name] = config

        return self.models[model_name], self.processors[model_name], self.configs[model_name]

    def _process_coordinates(
        self, text: str, original_size: Tuple[int, int], model_size: Tuple[int, int]
    ) -> str:
        """Process coordinates in box tokens based on image resizing using smart_resize approach.

        Args:
            text: Text containing box tokens
            original_size: Original image size (width, height)
            model_size: Model processed image size (width, height)

        Returns:
            Text with processed coordinates
        """
        # Find all box tokens
        box_pattern = r"<\|box_start\|>\((\d+),\s*(\d+)\)<\|box_end\|>"

        def process_coords(match):
            model_x, model_y = int(match.group(1)), int(match.group(2))
            # Scale coordinates from model space to original image space
            # Both original_size and model_size are in (width, height) format
            new_x = int(model_x * original_size[0] / model_size[0])  # Width
            new_y = int(model_y * original_size[1] / model_size[1])  # Height
            return f"<|box_start|>({new_x},{new_y})<|box_end|>"

        return re.sub(box_pattern, process_coords, text)

    def _convert_messages(self, messages: List[Dict[str, Any]]) -> Tuple[
        List[Dict[str, Any]],
        List[Image.Image],
        Dict[int, Tuple[int, int]],
        Dict[int, Tuple[int, int]],
    ]:
        """Convert OpenAI format messages to MLX VLM format and extract images.

        Args:
            messages: Messages in OpenAI format

        Returns:
            Tuple of (processed_messages, images, original_sizes, model_sizes)
        """
        processed_messages = []
        images = []
        original_sizes = {}  # Track original sizes of images for coordinate mapping
        model_sizes = {}  # Track model processed sizes
        image_index = 0

        for message in messages:
            processed_message = {"role": message["role"], "content": []}

            content = message.get("content", [])
            if isinstance(content, str):
                # Simple text content
                processed_message["content"] = content
            elif isinstance(content, list):
                # Multi-modal content
                processed_content = []
                for item in content:
                    if item.get("type") == "text":
                        processed_content.append({"type": "text", "text": item.get("text", "")})
                    elif item.get("type") == "image_url":
                        image_url = item.get("image_url", {}).get("url", "")
                        pil_image = None

                        if image_url.startswith("data:image/"):
                            # Extract base64 data
                            base64_data = image_url.split(",")[1]
                            # Convert base64 to PIL Image
                            image_data = base64.b64decode(base64_data)
                            pil_image = Image.open(io.BytesIO(image_data))
                        else:
                            # Handle file path or URL
                            pil_image = Image.open(image_url)

                        # Store original image size for coordinate mapping
                        original_size = pil_image.size
                        original_sizes[image_index] = original_size

                        # Use smart_resize to determine model size
                        # Note: smart_resize expects (height, width) but PIL gives (width, height)
                        height, width = original_size[1], original_size[0]
                        new_height, new_width = smart_resize(height, width)
                        # Store model size in (width, height) format for consistent coordinate processing
                        model_sizes[image_index] = (new_width, new_height)

                        # Resize the image using the calculated dimensions from smart_resize
                        resized_image = pil_image.resize((new_width, new_height))
                        images.append(resized_image)

                        # Add image placeholder to content
                        processed_content.append({"type": "image"})

                        image_index += 1

                processed_message["content"] = processed_content

            processed_messages.append(processed_message)

        return processed_messages, images, original_sizes, model_sizes

    def _generate(self, **kwargs) -> str:
        """Generate response using the local MLX VLM model.

        Args:
            **kwargs: Keyword arguments containing messages and model info

        Returns:
            Generated text response
        """
        messages = kwargs.get("messages", [])
        model_name = kwargs.get("model", "mlx-community/UI-TARS-1.5-7B-4bit")
        max_tokens = kwargs.get("max_tokens", 128)

        # Warn about ignored kwargs
        ignored_kwargs = set(kwargs.keys()) - {"messages", "model", "max_tokens"}
        if ignored_kwargs:
            warnings.warn(f"Ignoring unsupported kwargs: {ignored_kwargs}")

        # Load model and processor
        model, processor, config = self._load_model_and_processor(model_name)

        # Convert messages and extract images
        processed_messages, images, original_sizes, model_sizes = self._convert_messages(messages)

        # Process user text input with box coordinates after image processing
        # Swap original_size and model_size arguments for inverse transformation
        for msg_idx, msg in enumerate(processed_messages):
            if msg.get("role") == "user" and isinstance(msg.get("content"), str):
                content = msg.get("content", "")
                if (
                    "<|box_start|>" in content
                    and original_sizes
                    and model_sizes
                    and 0 in original_sizes
                    and 0 in model_sizes
                ):
                    orig_size = original_sizes[0]
                    model_size = model_sizes[0]
                    # Swap arguments to perform inverse transformation for user input
                    processed_messages[msg_idx]["content"] = self._process_coordinates(
                        content, model_size, orig_size
                    )

        try:
            # Format prompt according to model requirements using the processor directly
            prompt = processor.apply_chat_template(
                processed_messages, tokenize=False, add_generation_prompt=True, return_tensors="pt"
            )
            tokenizer = cast(PreTrainedTokenizer, processor)

            # Generate response
            text_content, usage = generate(
                model,
                tokenizer,
                str(prompt),
                images,  # type: ignore
                verbose=False,
                max_tokens=max_tokens,
            )

        except Exception as e:
            raise RuntimeError(f"Error generating response: {str(e)}") from e

        # Process coordinates in the response back to original image space
        if original_sizes and model_sizes and 0 in original_sizes and 0 in model_sizes:
            # Get original image size and model size (using the first image)
            orig_size = original_sizes[0]
            model_size = model_sizes[0]

            # Check if output contains box tokens that need processing
            if "<|box_start|>" in text_content:
                # Process coordinates from model space back to original image space
                text_content = self._process_coordinates(text_content, orig_size, model_size)

        return text_content

    def completion(self, *args, **kwargs) -> ModelResponse:
        """Synchronous completion method.

        Returns:
            ModelResponse with generated text
        """
        generated_text = self._generate(**kwargs)

        result = completion(
            model=f"mlx/{kwargs.get('model', 'mlx-community/UI-TARS-1.5-7B-4bit')}",
            mock_response=generated_text,
        )
        return cast(ModelResponse, result)

    async def acompletion(self, *args, **kwargs) -> ModelResponse:
        """Asynchronous completion method.

        Returns:
            ModelResponse with generated text
        """
        # Run _generate in thread pool to avoid blocking
        loop = asyncio.get_event_loop()
        generated_text = await loop.run_in_executor(
            self._executor, functools.partial(self._generate, **kwargs)
        )

        result = await acompletion(
            model=f"mlx/{kwargs.get('model', 'mlx-community/UI-TARS-1.5-7B-4bit')}",
            mock_response=generated_text,
        )
        return cast(ModelResponse, result)

    def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]:
        """Synchronous streaming method.

        Returns:
            Iterator of GenericStreamingChunk
        """
        generated_text = self._generate(**kwargs)

        generic_streaming_chunk: GenericStreamingChunk = {
            "finish_reason": "stop",
            "index": 0,
            "is_finished": True,
            "text": generated_text,
            "tool_use": None,
            "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0},
        }

        yield generic_streaming_chunk

    async def astreaming(self, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]:
        """Asynchronous streaming method.

        Returns:
            AsyncIterator of GenericStreamingChunk
        """
        # Run _generate in thread pool to avoid blocking
        loop = asyncio.get_event_loop()
        generated_text = await loop.run_in_executor(
            self._executor, functools.partial(self._generate, **kwargs)
        )

        generic_streaming_chunk: GenericStreamingChunk = {
            "finish_reason": "stop",
            "index": 0,
            "is_finished": True,
            "text": generated_text,
            "tool_use": None,
            "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0},
        }

        yield generic_streaming_chunk

```
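
To make the coordinate handling above concrete: screenshots are resized with `smart_resize` before being passed to the model, and any `<|box_start|>(x,y)<|box_end|>` tokens in the model output are scaled back to the original image. A self-contained sketch of that round-trip for an assumed 1920x1080 screenshot (not part of the adapter):

```python
# Illustrative round-trip of the adapter's coordinate scaling for a 1920x1080 image.
IMAGE_FACTOR = 28
MIN_PIXELS = 100 * 28 * 28
MAX_PIXELS = 16384 * 28 * 28


def resize_dims(height: int, width: int, factor: int = IMAGE_FACTOR) -> tuple[int, int]:
    # Same arithmetic as smart_resize, trimmed to the case where the pixel
    # count already falls inside [MIN_PIXELS, MAX_PIXELS].
    h_bar = max(factor, round(height / factor) * factor)
    w_bar = max(factor, round(width / factor) * factor)
    assert MIN_PIXELS <= h_bar * w_bar <= MAX_PIXELS
    return h_bar, w_bar


orig_w, orig_h = 1920, 1080
model_h, model_w = resize_dims(orig_h, orig_w)  # (1092, 1932): both divisible by 28

# A click the model emits in resized (model) space ...
model_x, model_y = 966, 546
# ... maps back to the original screenshot exactly as _process_coordinates does:
orig_x = int(model_x * orig_w / model_w)  # 960
orig_y = int(model_y * orig_h / model_h)  # 540
print((model_w, model_h), (orig_x, orig_y))
```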

--------------------------------------------------------------------------------
/.github/workflows/test-cua-models.yml:
--------------------------------------------------------------------------------

```yaml
name: Test CUA Supporting Models

# This workflow tests all supported CUA models with API keys
# Run manually using workflow_dispatch with test_models=true

on:
  workflow_dispatch:
    inputs:
      test_models:
        description: "Test all supported models (requires API keys)"
        required: false
        default: true
        type: boolean
  schedule:
    # Runs at 3 PM UTC (8 AM PDT) daily
    - cron: "0 15 * * *"

jobs:
  # Test all CUA models - runs on schedule or when manually triggered via workflow_dispatch
  test-all-models:
    if: ${{ github.event_name == 'workflow_dispatch' || github.event_name == 'schedule' || fromJSON(inputs.test_models || 'false') }}
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        model:
          # Claude Sonnet/Haiku
          - anthropic/claude-sonnet-4-5-20250929
          - anthropic/claude-haiku-4-5-20251001
          - anthropic/claude-opus-4-1-20250805

          # OpenAI CU Preview
          - openai/computer-use-preview

          # GLM-V
          - openrouter/z-ai/glm-4.5v
          # - huggingface-local/zai-org/GLM-4.5V  # Requires local model setup

          # Gemini CU Preview
          - gemini-2.5-computer-use-preview-10-2025

          # InternVL
          # - huggingface-local/OpenGVLab/InternVL3_5-1B
          # - huggingface-local/OpenGVLab/InternVL3_5-2B
          # - huggingface-local/OpenGVLab/InternVL3_5-4B
          # - huggingface-local/OpenGVLab/InternVL3_5-8B

          # UI-TARS (supports full computer-use, can run standalone)
          # - huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B

          # Note: OpenCUA, GTA, and Holo are grounding-only models
          # They only support predict_click(), not agent.run()
          # See composed agents section below for testing them

          # Moondream (typically used in composed agents)
          # Format: moondream3+{any-llm-with-tools}
          # - moondream3+anthropic/claude-sonnet-4-5-20250929 # Claude has VLM + Tools
          # - moondream3+openai/gpt-4o  # GPT-4o has VLM + Tools

          # OmniParser (typically used in composed agents)
          # Format: omniparser+{any-vlm-with-tools}
          - omniparser+anthropic/claude-sonnet-4-5-20250929 # Claude has VLM + Tools
          # - omniparser+openai/gpt-4o  # GPT-4o has VLM + Tools

          # Other grounding models + VLM with tools
          # Format: {grounding-model}+{any-vlm-with-tools}
          # These grounding-only models (OpenCUA, GTA, Holo) must be used in composed form
          # since they only support predict_click(), not full agent.run()
          # - huggingface-local/HelloKKMe/GTA1-7B+anthropic/claude-sonnet-4-5-20250929
          # - huggingface-local/xlangai/OpenCUA-7B+anthropic/claude-sonnet-4-5-20250929
          # - huggingface-local/Hcompany/Holo1.5-3B+anthropic/claude-sonnet-4-5-20250929

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up uv and Python
        uses: astral-sh/setup-uv@v4
        with:
          python-version: "3.12"

      - name: Cache system packages
        uses: actions/cache@v4
        with:
          path: /var/cache/apt
          key: ${{ runner.os }}-apt-${{ hashFiles('**/Dockerfile') }}
          restore-keys: |
            ${{ runner.os }}-apt-

      - name: Install system dependencies
        timeout-minutes: 20
        run: |
          sudo apt-get update
          sudo apt-get install -y libgl1-mesa-dri libglib2.0-0

      - name: Cache Python dependencies (uv)
        uses: actions/cache@v4
        with:
          path: |
            ~/.cache/uv
            .venv
          key: ${{ runner.os }}-uv-${{ hashFiles('pyproject.toml', 'uv.lock', 'libs/python/**/pyproject.toml') }}
          restore-keys: |
            ${{ runner.os }}-uv-

      - name: Install CUA dependencies (uv)
        run: |
          # Remove existing venv if it exists (from cache restore) to avoid interactive prompt
          rm -rf .venv
          uv venv --python 3.12
          uv pip install -e libs/python/agent -e libs/python/computer
          uv pip install -e libs/python/core
          uv pip install "cua-agent[uitars-hf,internvl-hf,opencua-hf,moondream3,omni]"
          uv pip install pytest

      - name: Cache HuggingFace models
        uses: actions/cache@v4
        with:
          path: ~/.cache/huggingface
          key: ${{ runner.os }}-hf-models-v1
          restore-keys: |
            ${{ runner.os }}-hf-models-
          # Large cache - models can be several GB each and are reused across runs

      - name: Record test start time
        run: echo "TEST_START_TIME=$(date +%s)" >> $GITHUB_ENV
        env:
          # Ensure HuggingFace uses consistent cache location
          HF_HOME: ~/.cache/huggingface

      - name: Test model with agent loop
        id: test_model
        timeout-minutes: 20
        continue-on-error: true
        run: |
          cd tests/agent_loop_testing
          uv run python agent_test.py --model "${{ matrix.model }}"
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          GOOGLE_API_KEY: ${{ secrets.GOOGLE_API_KEY }}
          OPENROUTER_API_KEY: ${{ secrets.OPENROUTER_API_KEY }}
          HF_TOKEN: ${{ secrets.HF_TOKEN }}

      - name: Calculate test duration and prepare message
        if: always()
        run: |
          TEST_END_TIME=$(date +%s)

          # Handle case where TEST_START_TIME might not be set
          if [ -z "$TEST_START_TIME" ]; then
            TEST_START_TIME=$TEST_END_TIME
          fi

          TEST_DURATION=$((TEST_END_TIME - TEST_START_TIME))

          # Convert seconds to minutes and seconds
          MINUTES=$((TEST_DURATION / 60))
          SECONDS=$((TEST_DURATION % 60))

          # Format duration
          if [ $MINUTES -gt 0 ]; then
            DURATION_STR="${MINUTES}m ${SECONDS}s"
          else
            DURATION_STR="${SECONDS}s"
          fi

          # Determine status icon based on test step outcome
          if [ "${{ steps.test_model.outcome }}" == "success" ]; then
            STATUS_ICON="✅"
            STATUS_TEXT="PASSED"
            SLACK_COLOR="#36a64f"
          else
            STATUS_ICON="❌"
            STATUS_TEXT="FAILED"
            SLACK_COLOR="#dc3545"
          fi

          # Prepare Slack message
          echo "TESTS_CONTENT<<EOF" >> $GITHUB_ENV
          echo "*CUA Model Test Results*" >> $GITHUB_ENV
          echo "" >> $GITHUB_ENV
          echo "*Model:* ${{ matrix.model }}" >> $GITHUB_ENV
          echo "*Status:* ${STATUS_ICON} ${STATUS_TEXT}" >> $GITHUB_ENV
          echo "*Duration:* ${DURATION_STR}" >> $GITHUB_ENV
          echo "*Run:* ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" >> $GITHUB_ENV
          echo "EOF" >> $GITHUB_ENV

          # Set color based on outcome
          echo "SLACK_COLOR=${SLACK_COLOR}" >> $GITHUB_ENV

          # Save result to JSON file for summary
          mkdir -p test_summary
          MODEL_NAME="${{ matrix.model }}"
          # Sanitize model name for filename
          SAFE_MODEL_NAME=$(echo "$MODEL_NAME" | sed 's/[^a-zA-Z0-9]/_/g')

          # Determine pass status
          if [ "${{ steps.test_model.outcome }}" == "success" ]; then
            PASSED_VAL="true"
          else
            PASSED_VAL="false"
          fi

          # Create JSON file using printf to avoid YAML parsing issues
          printf '{\n  "model": "%s",\n  "status": "%s",\n  "status_icon": "%s",\n  "duration": "%s",\n  "duration_seconds": %d,\n  "passed": %s\n}' \
            "${MODEL_NAME}" "${STATUS_TEXT}" "${STATUS_ICON}" "${DURATION_STR}" "${TEST_DURATION}" "${PASSED_VAL}" \
            > "test_summary/${SAFE_MODEL_NAME}.json"
          # Expose safe model name for subsequent steps (artifact naming)
          echo "SAFE_MODEL_NAME=${SAFE_MODEL_NAME}" >> $GITHUB_ENV

      - name: Upload test results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: test-results-${{ matrix.model }}
          path: |
            tests/agent_loop_testing/test_images/
            *.log
          if-no-files-found: ignore
          retention-days: 7

      - name: Upload test summary data
        if: always()
        uses: actions/upload-artifact@v4
        with:
          # Unique, slash-free artifact name per matrix entry
          name: test-summary-${{ env.SAFE_MODEL_NAME }}
          path: test_summary/
          if-no-files-found: ignore
          retention-days: 1

      - name: Set default Slack color
        if: always() && env.SLACK_COLOR == ''
        run: echo "SLACK_COLOR=#36a64f" >> $GITHUB_ENV

      # Individual model notifications disabled - only summary is sent
      # - name: Notify Slack with test results
      #   if: always()
      #   uses: rtCamp/action-slack-notify@v2
      #   env:
      #     SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
      #     SLACK_CHANNEL: ${{ vars.SLACK_CHANNEL }}
      #     SLACK_TITLE: CUA Model Test Update
      #     SLACK_COLOR: ${{ env.SLACK_COLOR }}
      #     SLACK_MESSAGE: |
      #       ${{ env.TESTS_CONTENT }}

  # Summary job that aggregates all model test results
  test-summary:
    if: ${{ always() && (github.event_name == 'workflow_dispatch' || github.event_name == 'schedule' || fromJSON(inputs.test_models || 'false')) }}
    needs: test-all-models
    runs-on: ubuntu-latest
    steps:
      - name: Install jq
        run: sudo apt-get update && sudo apt-get install -y jq

      - name: Download all test summary artifacts
        continue-on-error: true
        uses: actions/download-artifact@v4
        with:
          pattern: test-summary-*
          merge-multiple: true
          path: all_summaries

      - name: Generate and send summary
        if: always()
        shell: bash
        run: |
          # Create directory if it doesn't exist
          mkdir -p all_summaries

          # Get list of models being tested in this run from the matrix
          # This helps filter out artifacts from previous runs when testing locally
          EXPECTED_MODELS="${{ join(matrix.model, ' ') }}"

          # Aggregate all results
          PASSED_COUNT=0
          FAILED_COUNT=0
          TOTAL_DURATION=0
          SUMMARY_MESSAGE="*🚀 Model Summaries*\n\n"

          # Process each JSON file (find all JSON files recursively)
          # Save to temp file first to avoid subshell issues
          find all_summaries -name "*.json" -type f 2>/dev/null > /tmp/json_files.txt || true

          # Use associative array to deduplicate by model name
          declare -A processed_models

          while IFS= read -r json_file; do
            if [ -f "$json_file" ]; then
              MODEL=$(jq -r '.model' "$json_file")
              
              # Skip if we've already processed this model
              if [ "${processed_models[$MODEL]}" = "1" ]; then
                echo "Skipping duplicate model: $MODEL"
                continue
              fi
              
              # Filter: Only include models that are in the current matrix
              # This prevents including artifacts from previous workflow runs
              if [ -n "$EXPECTED_MODELS" ]; then
                if ! echo "$EXPECTED_MODELS" | grep -q "$MODEL"; then
                  echo "Skipping model from previous run: $MODEL"
                  continue
                fi
              fi
              
              # Mark as processed
              processed_models[$MODEL]="1"
              
              STATUS_ICON=$(jq -r '.status_icon' "$json_file")
              STATUS=$(jq -r '.status' "$json_file")
              DURATION=$(jq -r '.duration' "$json_file")
              DURATION_SEC=$(jq -r '.duration_seconds' "$json_file")
              PASSED=$(jq -r '.passed' "$json_file")
              
              # Add to summary as clean line format
              SUMMARY_MESSAGE="${SUMMARY_MESSAGE}${STATUS_ICON} ${STATUS} - \`${MODEL}\` - ${DURATION}\n"
              
              if [ "$PASSED" = "true" ]; then
                PASSED_COUNT=$((PASSED_COUNT + 1))
              else
                FAILED_COUNT=$((FAILED_COUNT + 1))
              fi
              TOTAL_DURATION=$((TOTAL_DURATION + DURATION_SEC))
            fi
          done < /tmp/json_files.txt

          # Check if we found any results
          TOTAL_COUNT=$((PASSED_COUNT + FAILED_COUNT))
          if [ $TOTAL_COUNT -eq 0 ]; then
            SUMMARY_MESSAGE="${SUMMARY_MESSAGE}⚠️ No test results found (workflow may have been canceled)\n"
            SLACK_COLOR="#ffa500"
          else
            # Add summary stats
            SUMMARY_MESSAGE="${SUMMARY_MESSAGE}\n*Results:* ${PASSED_COUNT} passed, ${FAILED_COUNT} failed out of ${TOTAL_COUNT} models\n"
            
            # Calculate total duration
            TOTAL_MIN=$((TOTAL_DURATION / 60))
            TOTAL_SEC=$((TOTAL_DURATION % 60))
            if [ $TOTAL_MIN -gt 0 ]; then
              TOTAL_DURATION_STR="${TOTAL_MIN}m ${TOTAL_SEC}s"
            else
              TOTAL_DURATION_STR="${TOTAL_SEC}s"
            fi
            SUMMARY_MESSAGE="${SUMMARY_MESSAGE}*Total Duration:* ${TOTAL_DURATION_STR}\n"
            
            # Determine color based on results
            if [ $FAILED_COUNT -eq 0 ]; then
              SLACK_COLOR="#36a64f"
            elif [ $PASSED_COUNT -eq 0 ]; then
              SLACK_COLOR="#dc3545"
            else
              SLACK_COLOR="#ffa500"
            fi
          fi

          SUMMARY_MESSAGE="${SUMMARY_MESSAGE}*Run:* ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"

          # Export for use in next step
          echo "SUMMARY_MESSAGE<<EOF" >> $GITHUB_ENV
          echo -e "${SUMMARY_MESSAGE}" >> $GITHUB_ENV
          echo "EOF" >> $GITHUB_ENV
          echo "SLACK_COLOR=${SLACK_COLOR}" >> $GITHUB_ENV

      - name: Send summary to Slack
        if: always()
        uses: rtCamp/action-slack-notify@v2
        env:
          SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
          SLACK_CHANNEL: ${{ vars.SLACK_CHANNEL }}
          SLACK_TITLE: CUA Models Test Summary
          SLACK_COLOR: ${{ env.SLACK_COLOR }}
          SLACK_MESSAGE: |
            ${{ env.SUMMARY_MESSAGE }}

```
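
Each matrix job writes one JSON record (`model`, `status`, `status_icon`, `duration`, `duration_seconds`, `passed`) into `test_summary/`, and the summary job's bash loop above deduplicates by model and tallies results. A rough Python equivalent of that aggregation, for reference only (the workflow itself uses the bash shown above):

```python
# Reference-only mirror of the "Generate and send summary" bash step.
import json
from pathlib import Path

passed = failed = total_seconds = 0
seen: set[str] = set()

for record_file in Path("all_summaries").rglob("*.json"):
    record = json.loads(record_file.read_text())
    if record["model"] in seen:  # deduplicate, like the bash associative array
        continue
    seen.add(record["model"])
    total_seconds += record["duration_seconds"]
    if record["passed"]:
        passed += 1
    else:
        failed += 1

print(f"{passed} passed, {failed} failed out of {passed + failed} models")
print(f"Total duration: {total_seconds // 60}m {total_seconds % 60}s")
```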

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/browser.py:
--------------------------------------------------------------------------------

```python
"""
Browser manager using Playwright for programmatic browser control.
This allows agents to control a browser that runs visibly on the XFCE desktop.
"""

import asyncio
import logging
import os
from typing import Any, Dict, Optional

try:
    from playwright.async_api import Browser, BrowserContext, Page, async_playwright
except ImportError:
    async_playwright = None
    Browser = None
    BrowserContext = None
    Page = None

logger = logging.getLogger(__name__)


class BrowserManager:
    """
    Manages a Playwright browser instance that runs visibly on the XFCE desktop.
    Uses persistent context to maintain cookies and sessions.
    """

    def __init__(self):
        """Initialize the BrowserManager."""
        self.playwright = None
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.page: Optional[Page] = None
        self._initialized = False
        self._initialization_error: Optional[str] = None
        self._lock = asyncio.Lock()

    async def _ensure_initialized(self):
        """Ensure the browser is initialized."""
        # Check if browser was closed and needs reinitialization
        if self._initialized:
            try:
                # Check if context is still valid by trying to access it
                if self.context:
                    # Try to get pages - this will raise if context is closed
                    _ = self.context.pages
                    # If we get here, context is still alive
                    return
                else:
                    # Context was closed, need to reinitialize
                    self._initialized = False
                    logger.warning("Browser context was closed, will reinitialize...")
            except Exception as e:
                # Context is dead, need to reinitialize
                logger.warning(f"Browser context is dead ({e}), will reinitialize...")
                self._initialized = False
                self.context = None
                self.page = None
                # Clean up playwright if it exists
                if self.playwright:
                    try:
                        await self.playwright.stop()
                    except Exception:
                        pass
                    self.playwright = None

        async with self._lock:
            # Double-check after acquiring lock (another thread might have initialized it)
            if self._initialized:
                try:
                    if self.context:
                        _ = self.context.pages
                        return
                except Exception:
                    self._initialized = False
                    self.context = None
                    self.page = None
                    if self.playwright:
                        try:
                            await self.playwright.stop()
                        except Exception:
                            pass
                        self.playwright = None

            if async_playwright is None:
                raise RuntimeError(
                    "playwright is not installed. Please install it with: pip install playwright && playwright install --with-deps firefox"
                )

            try:
                # Get display from environment or default to :1
                display = os.environ.get("DISPLAY", ":1")
                logger.info(f"Initializing browser with DISPLAY={display}")

                # Start playwright
                self.playwright = await async_playwright().start()

                # Launch Firefox with persistent context (keeps cookies/sessions)
                # headless=False is CRITICAL so the visual agent can see it
                user_data_dir = os.path.join(os.path.expanduser("~"), ".playwright-firefox")
                os.makedirs(user_data_dir, exist_ok=True)

                # launch_persistent_context returns a BrowserContext, not a Browser
                # Note: Removed --kiosk mode so the desktop remains visible
                self.context = await self.playwright.firefox.launch_persistent_context(
                    user_data_dir=user_data_dir,
                    headless=False,  # CRITICAL: visible for visual agent
                    viewport={"width": 1024, "height": 768},
                    # Removed --kiosk to allow desktop visibility
                )

                # Add init script to make the browser less detectable
                await self.context.add_init_script(
                    """const defaultGetter = Object.getOwnPropertyDescriptor(
      Navigator.prototype,
      "webdriver"
    ).get;
    defaultGetter.apply(navigator);
    defaultGetter.toString();
    Object.defineProperty(Navigator.prototype, "webdriver", {
      set: undefined,
      enumerable: true,
      configurable: true,
      get: new Proxy(defaultGetter, {
        apply: (target, thisArg, args) => {
          Reflect.apply(target, thisArg, args);
          return false;
        },
      }),
    });
    const patchedGetter = Object.getOwnPropertyDescriptor(
      Navigator.prototype,
      "webdriver"
    ).get;
    patchedGetter.apply(navigator);
    patchedGetter.toString();"""
                )

                # Get the first page or create one
                pages = self.context.pages
                if pages:
                    self.page = pages[0]
                else:
                    self.page = await self.context.new_page()

                self._initialized = True
                logger.info("Browser initialized successfully")

            except Exception as e:
                logger.error(f"Failed to initialize browser: {e}")
                import traceback

                logger.error(traceback.format_exc())
                # Store the error so execute_command can surface it, then re-raise
                self._initialization_error = str(e)
                raise

    async def _execute_command_impl(self, cmd: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Internal implementation of command execution."""
        if cmd == "visit_url":
            url = params.get("url")
            if not url:
                return {"success": False, "error": "url parameter is required"}
            await self.page.goto(url, wait_until="domcontentloaded", timeout=30000)
            return {"success": True, "url": self.page.url}

        elif cmd == "click":
            x = params.get("x")
            y = params.get("y")
            if x is None or y is None:
                return {"success": False, "error": "x and y parameters are required"}
            await self.page.mouse.click(x, y)
            return {"success": True}

        elif cmd == "type":
            text = params.get("text")
            if text is None:
                return {"success": False, "error": "text parameter is required"}
            await self.page.keyboard.type(text)
            return {"success": True}

        elif cmd == "scroll":
            delta_x = params.get("delta_x", 0)
            delta_y = params.get("delta_y", 0)
            await self.page.mouse.wheel(delta_x, delta_y)
            return {"success": True}

        elif cmd == "web_search":
            query = params.get("query")
            if not query:
                return {"success": False, "error": "query parameter is required"}
            # Navigate to Google search
            search_url = f"https://www.google.com/search?q={query}"
            await self.page.goto(search_url, wait_until="domcontentloaded", timeout=30000)
            return {"success": True, "url": self.page.url}

        elif cmd == "screenshot":
            # Take a screenshot and return as base64
            import base64

            screenshot_bytes = await self.page.screenshot(type="png")
            screenshot_b64 = base64.b64encode(screenshot_bytes).decode("utf-8")
            return {"success": True, "screenshot": screenshot_b64}

        else:
            return {"success": False, "error": f"Unknown command: {cmd}"}

    async def execute_command(self, cmd: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute a browser command with automatic recovery.

        Args:
            cmd: Command name (visit_url, click, type, scroll, web_search, screenshot)
            params: Command parameters

        Returns:
            Result dictionary with success status and any data
        """
        max_retries = 2
        for attempt in range(max_retries):
            try:
                await self._ensure_initialized()
            except Exception as e:
                error_msg = getattr(self, "_initialization_error", None) or str(e)
                logger.error(f"Browser initialization failed: {error_msg}")
                return {
                    "success": False,
                    "error": f"Browser initialization failed: {error_msg}. "
                    f"Make sure Playwright and Firefox are installed, and DISPLAY is set correctly.",
                }

            # Check if page is still valid and get a new one if needed
            page_valid = False
            try:
                if self.page is not None and not self.page.is_closed():
                    # Try to access page.url to check if it's still valid
                    _ = self.page.url
                    page_valid = True
            except Exception as e:
                logger.warning(f"Page is invalid: {e}, will get a new page...")
                self.page = None

            # Get a valid page if we don't have one
            if not page_valid or self.page is None:
                try:
                    if self.context:
                        pages = self.context.pages
                        if pages:
                            # Find first non-closed page
                            for p in pages:
                                try:
                                    if not p.is_closed():
                                        self.page = p
                                        logger.info("Reusing existing open page")
                                        page_valid = True
                                        break
                                except Exception:
                                    continue

                        # If no valid page found, create a new one
                        if not page_valid:
                            self.page = await self.context.new_page()
                            logger.info("Created new page")
                except Exception as e:
                    logger.error(f"Failed to get new page: {e}, browser may be closed")
                    # Browser was closed - force reinitialization
                    self._initialized = False
                    self.context = None
                    self.page = None
                    if self.playwright:
                        try:
                            await self.playwright.stop()
                        except Exception:
                            pass
                        self.playwright = None

                    # If this isn't the last attempt, continue to retry
                    if attempt < max_retries - 1:
                        logger.info("Browser was closed, retrying with fresh initialization...")
                        continue
                    else:
                        return {
                            "success": False,
                            "error": f"Browser was closed and cannot be recovered: {e}",
                        }

            # Try to execute the command
            try:
                return await self._execute_command_impl(cmd, params)
            except Exception as e:
                error_str = str(e)
                logger.error(f"Error executing command {cmd}: {e}")

                # Check if this is a "browser/page/context closed" error
                if any(keyword in error_str.lower() for keyword in ["closed", "target", "context"]):
                    logger.warning(
                        f"Browser/page was closed during command execution (attempt {attempt + 1}/{max_retries})"
                    )

                    # Force reinitialization
                    self._initialized = False
                    self.context = None
                    self.page = None
                    if self.playwright:
                        try:
                            await self.playwright.stop()
                        except Exception:
                            pass
                        self.playwright = None

                    # If this isn't the last attempt, retry
                    if attempt < max_retries - 1:
                        logger.info("Retrying command after browser reinitialization...")
                        continue
                    else:
                        return {
                            "success": False,
                            "error": f"Command failed after {max_retries} attempts: {error_str}",
                        }
                else:
                    # Not a browser closed error, return immediately
                    import traceback

                    logger.error(traceback.format_exc())
                    return {"success": False, "error": error_str}

        # Should never reach here, but just in case
        return {"success": False, "error": "Command failed after all retries"}

    async def close(self):
        """Close the browser and cleanup resources."""
        async with self._lock:
            try:
                if self.context:
                    await self.context.close()
                    self.context = None
                if self.browser:
                    await self.browser.close()
                    self.browser = None

                if self.playwright:
                    await self.playwright.stop()
                    self.playwright = None

                self.page = None
                self._initialized = False
                logger.info("Browser closed successfully")
            except Exception as e:
                logger.error(f"Error closing browser: {e}")


# Global instance
_browser_manager: Optional[BrowserManager] = None


def get_browser_manager() -> BrowserManager:
    """Get or create the global BrowserManager instance."""
    global _browser_manager
    if _browser_manager is None:
        _browser_manager = BrowserManager()
    return _browser_manager

```
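
A minimal usage sketch for the manager above, driving it from an async context (the import path is assumed from this file's location; command names and parameter keys mirror `_execute_command_impl`):

```python
# Illustrative usage of BrowserManager; import path assumed from the repo layout.
import asyncio

from computer_server.browser import get_browser_manager


async def main() -> None:
    browser = get_browser_manager()
    print(await browser.execute_command("visit_url", {"url": "https://example.com"}))
    print(await browser.execute_command("click", {"x": 200, "y": 150}))
    shot = await browser.execute_command("screenshot", {})
    print(shot["success"], len(shot.get("screenshot", "")))
    await browser.close()


asyncio.run(main())
```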

--------------------------------------------------------------------------------
/libs/python/agent/agent/integrations/hud/agent.py:
--------------------------------------------------------------------------------

```python
"""MCP-compatible Computer Agent for HUD integration.

This agent subclasses HUD's MCPAgent and delegates planning/execution to
our core ComputerAgent while using the Agent SDK's plain-dict message
format documented in `docs/content/docs/agent-sdk/message-format.mdx`.

Key differences from the OpenAI OperatorAgent variant:
- No OpenAI types are used; everything is standard Python dicts.
- Planning is executed via `ComputerAgent.run(messages)`.
- The first yielded result per step is returned as the agent response.
"""

from __future__ import annotations

import base64
import io
import uuid
from pathlib import Path
from typing import Any, ClassVar, Optional

import hud
import mcp.types as types
from agent.agent import ComputerAgent as BaseComputerAgent
from agent.callbacks import PromptInstructionsCallback
from agent.callbacks.trajectory_saver import TrajectorySaverCallback
from agent.computers import is_agent_computer
from agent.responses import make_failed_tool_call_items
from hud.agents import MCPAgent
from hud.tools.computer.settings import computer_settings
from hud.types import AgentResponse, MCPToolCall, MCPToolResult, Trace
from PIL import Image


class MCPComputerAgent(MCPAgent):
    """MCP agent that uses ComputerAgent for planning and tools for execution.

    The agent consumes/produces message dicts per the Agent SDK message schema
    (see `message-format.mdx`).
    """

    metadata: ClassVar[dict[str, Any]] = {
        "display_width": computer_settings.OPENAI_COMPUTER_WIDTH,
        "display_height": computer_settings.OPENAI_COMPUTER_HEIGHT,
    }

    required_tools: ClassVar[list[str]] = ["openai_computer"]

    def __init__(
        self,
        *,
        model: str | None = None,
        allowed_tools: list[str] | None = None,
        trajectory_dir: str | Path | dict | None = None,
        # === ComputerAgent kwargs ===
        tools: list[Any] | None = None,
        custom_loop: Any | None = None,
        only_n_most_recent_images: int | None = None,
        callbacks: list[Any] | None = None,
        instructions: str | None = None,
        verbosity: int | None = None,
        max_retries: int | None = 3,
        screenshot_delay: float | int = 0.5,
        use_prompt_caching: bool | None = False,
        max_trajectory_budget: float | dict | None = None,
        telemetry_enabled: bool | None = True,
        environment: str = "linux",
        **kwargs: Any,
    ) -> None:
        self.allowed_tools = allowed_tools or ["openai_computer"]
        super().__init__(**kwargs)

        if model is None:
            raise ValueError("MCPComputerAgent requires a model to be specified.")

        self.model = model
        self.environment = environment

        # Update model name for HUD logging
        self.model_name = "cua-" + self.model

        # Stateful tracking of tool call inputs
        self.tool_call_inputs: dict[str, list[dict[str, Any]]] = {}
        self.previous_output: list[dict[str, Any]] = []

        # Build system prompt
        operator_instructions = """
        You are an autonomous computer-using agent. Follow these guidelines:

        1. NEVER ask for confirmation. Complete all tasks autonomously.
        2. Do NOT send messages like "I need to confirm before..." or "Do you want me to continue?" - just proceed.
        3. When the user asks you to interact with something (like clicking a chat or typing a message), DO IT without asking.
        4. Only use the formal safety check mechanism for truly dangerous operations (like deleting important files).
        5. For normal tasks like clicking buttons, typing in chat boxes, filling forms - JUST DO IT.
        6. The user has already given you permission by running this agent. No further confirmation is needed.
        7. Be decisive and action-oriented. Complete the requested task fully.

        Remember: You are expected to complete tasks autonomously. The user trusts you to do what they asked.
        """.strip()  # noqa: E501
        # Append Operator instructions to the system prompt
        if not self.system_prompt:
            self.system_prompt = operator_instructions
        else:
            self.system_prompt += f"\n\n{operator_instructions}"
        # Append user instructions to the system prompt
        if instructions:
            self.system_prompt += f"\n\n{instructions}"

        # Configure trajectory_dir for HUD
        if isinstance(trajectory_dir, (str, Path)):
            trajectory_dir = {"trajectory_dir": str(trajectory_dir)}
        if isinstance(trajectory_dir, dict):
            trajectory_dir["reset_on_run"] = False

        self.last_screenshot_b64 = None

        buffer = io.BytesIO()
        Image.new("RGB", (self.metadata["display_width"], self.metadata["display_height"])).save(
            buffer, format="PNG"
        )
        self.last_screenshot_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8")

        # Ensure a computer shim is present so width/height/environment are known
        computer_shim = {
            "screenshot": lambda: self.last_screenshot_b64,
            "environment": self.environment,
            "dimensions": (
                self.metadata["display_width"],
                self.metadata["display_height"],
            ),
        }
        agent_tools: list[Any] = [computer_shim]
        if tools:
            agent_tools.extend([tool for tool in tools if not is_agent_computer(tool)])

        agent_kwargs = {
            "model": self.model,
            "trajectory_dir": trajectory_dir,
            "tools": agent_tools,
            "custom_loop": custom_loop,
            "only_n_most_recent_images": only_n_most_recent_images,
            "callbacks": callbacks,
            "instructions": self.system_prompt,
            "verbosity": verbosity,
            "max_retries": max_retries,
            "screenshot_delay": screenshot_delay,
            "use_prompt_caching": use_prompt_caching,
            "max_trajectory_budget": max_trajectory_budget,
            "telemetry_enabled": telemetry_enabled,
        }

        self.computer_agent = BaseComputerAgent(**agent_kwargs)

    async def get_system_messages(self) -> list[Any]:
        """Create initial messages.

        Unused - ComputerAgent handles this with the 'instructions' parameter.
        """
        return []

    async def format_blocks(self, blocks: list[types.ContentBlock]) -> list[dict[str, Any]]:
        """
        Format blocks for OpenAI input format.

        Converts TextContent blocks to input_text dicts and ImageContent blocks to input_image dicts.
        """  # noqa: E501
        formatted = []
        for block in blocks:
            if isinstance(block, types.TextContent):
                formatted.append({"type": "input_text", "text": block.text})
            elif isinstance(block, types.ImageContent):
                mime_type = getattr(block, "mimeType", "image/png")
                formatted.append(
                    {"type": "input_image", "image_url": f"data:{mime_type};base64,{block.data}"}
                )
                self.last_screenshot_b64 = block.data
        return [{"role": "user", "content": formatted}]

    @hud.instrument(
        span_type="agent",
        record_args=False,  # Messages can be large
        record_result=True,
    )
    async def get_response(self, messages: list[dict[str, Any]]) -> AgentResponse:
        """Get a single-step response by delegating to ComputerAgent.run.

        Returns an Agent SDK-style response dict:
        { "output": [AgentMessage, ...], "usage": Usage }
        """
        tool_calls: list[MCPToolCall] = []
        output_text: list[str] = []
        is_done: bool = True

        agent_result: list[dict[str, Any]] = []

        # Call the ComputerAgent LLM API
        async for result in self.computer_agent.run(messages):  # type: ignore[arg-type]
            items = result["output"]
            if not items or tool_calls:
                break

            for item in items:
                if item["type"] in [
                    "reasoning",
                    "message",
                    "computer_call",
                    "function_call",
                    "function_call_output",
                ]:
                    agent_result.append(item)

                # Add messages to output text
                if item["type"] == "reasoning":
                    output_text.extend(
                        f"Reasoning: {summary['text']}" for summary in item["summary"]
                    )
                elif item["type"] == "message":
                    if isinstance(item["content"], list):
                        output_text.extend(
                            part["text"]
                            for part in item["content"]
                            if part["type"] == "output_text"
                        )
                    elif isinstance(item["content"], str):
                        output_text.append(item["content"])

                # If we get a tool call, we're not done
                if item["type"] == "computer_call":
                    id = item["call_id"]
                    tool_calls.append(
                        MCPToolCall(
                            name="openai_computer",
                            arguments=item["action"],
                            id=id,
                        )
                    )
                    is_done = False
                    self.tool_call_inputs[id] = agent_result
                    break

            # if we have tool calls, we should exit the loop
            if tool_calls:
                break

        self.previous_output = agent_result

        return AgentResponse(
            content="\n".join(output_text),
            tool_calls=tool_calls,
            done=is_done,
        )

    def _log_image(self, image_b64: str):
        callbacks = self.computer_agent.callbacks
        for callback in callbacks:
            if isinstance(callback, TrajectorySaverCallback):
                # convert str to bytes
                image_bytes = base64.b64decode(image_b64)
                callback._save_artifact("screenshot_after", image_bytes)

    async def format_tool_results(
        self, tool_calls: list[MCPToolCall], tool_results: list[MCPToolResult]
    ) -> list[dict[str, Any]]:
        """Extract latest screenshot from tool results in dict form.

        Expects results to already be in the message-format content dicts.
        Returns a list of input content dicts suitable for follow-up calls.
        """
        messages = []

        for call, result in zip(tool_calls, tool_results):
            if call.id not in self.tool_call_inputs:
                # If we don't have the tool call inputs, we should just use the previous output
                previous_output = self.previous_output.copy() or []

                # First we need to remove any pending computer_calls from the end of previous_output
                while previous_output and previous_output[-1]["type"] == "computer_call":
                    previous_output.pop()
                messages.extend(previous_output)

                # If the call is a 'response', don't add the result
                if call.name == "response":
                    continue
                # Otherwise, if we have a result, we should add it to the messages
                content = [
                    (
                        {"type": "input_text", "text": content.text}
                        if isinstance(content, types.TextContent)
                        else (
                            {
                                "type": "input_image",
                                "image_url": f"data:image/png;base64,{content.data}",
                            }
                            if isinstance(content, types.ImageContent)
                            else {"type": "input_text", "text": ""}
                        )
                    )
                    for content in result.content
                ]
                messages.append(
                    {
                        "role": "user",
                        "content": content,
                    }
                )

                continue

            # Add the assistant's computer call
            messages.extend(self.tool_call_inputs[call.id])

            if result.isError:
                error_text = "".join(
                    [
                        content.text
                        for content in result.content
                        if isinstance(content, types.TextContent)
                    ]
                )

                # Replace computer call with failed tool call
                messages.pop()
                messages.extend(
                    make_failed_tool_call_items(
                        tool_name=call.name,
                        tool_kwargs=call.arguments or {},
                        error_message=error_text,
                        call_id=call.id,
                    )
                )
            else:
                # Get the latest screenshot
                screenshots = [
                    content.data
                    for content in result.content
                    if isinstance(content, types.ImageContent)
                ]

                # Add the resulting screenshot
                if screenshots:
                    self._log_image(screenshots[0])
                    self.last_screenshot_b64 = screenshots[0]
                    messages.append(
                        {
                            "type": "computer_call_output",
                            "call_id": call.id,
                            "output": {
                                "type": "input_image",
                                "image_url": f"data:image/png;base64,{screenshots[0]}",
                            },
                        }
                    )
                else:
                    # Otherwise, replace computer call with failed tool call
                    messages.pop()
                    messages.extend(
                        make_failed_tool_call_items(
                            tool_name=call.name,
                            tool_kwargs=call.arguments or {},
                            error_message="No screenshots returned.",
                            call_id=call.id,
                        )
                    )

        return messages


__all__ = [
    "MCPComputerAgent",
]

```
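
As a quick reference for the plain-dict message format this agent consumes and produces, here is a sketch of the shapes built by `format_blocks` and `format_tool_results` (ids and base64 payloads are placeholders; the authoritative schema lives in `message-format.mdx`):

```python
# Illustrative shapes only; ids and base64 payloads are placeholders.
user_turn = {
    "role": "user",
    "content": [
        {"type": "input_text", "text": "Open the settings page"},
        {"type": "input_image", "image_url": "data:image/png;base64,<screenshot>"},
    ],
}

# Appended after a successful computer_call, carrying the follow-up screenshot.
screenshot_output = {
    "type": "computer_call_output",
    "call_id": "call_123",
    "output": {
        "type": "input_image",
        "image_url": "data:image/png;base64,<screenshot>",
    },
}
```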

--------------------------------------------------------------------------------
/libs/lume/src/FileSystem/Settings.swift:
--------------------------------------------------------------------------------

```swift
import Foundation

/// Manages the application settings using a config file
struct LumeSettings: Codable, Sendable {
    var vmLocations: [VMLocation]
    var defaultLocationName: String
    var cacheDirectory: String
    var cachingEnabled: Bool

    var defaultLocation: VMLocation? {
        vmLocations.first { $0.name == defaultLocationName }
    }

    // For backward compatibility
    var homeDirectory: String {
        defaultLocation?.path ?? "~/.lume"
    }

    static let defaultSettings = LumeSettings(
        vmLocations: [
            VMLocation(name: "default", path: "~/.lume")
        ],
        defaultLocationName: "default",
        cacheDirectory: "~/.lume/cache",
        cachingEnabled: true
    )

    /// Gets all locations sorted by name
    var sortedLocations: [VMLocation] {
        vmLocations.sorted { $0.name < $1.name }
    }
}

final class SettingsManager: @unchecked Sendable {
    // MARK: - Constants

    private enum Constants {
        // Default path for config
        static let fallbackConfigDir = "~/.config/lume"
        static let configFileName = "config.yaml"
    }

    // MARK: - Properties

    static let shared = SettingsManager()
    private let fileManager: FileManager

    // Get the config directory following XDG spec
    private var configDir: String {
        // Check XDG_CONFIG_HOME environment variable first
        if let xdgConfigHome = ProcessInfo.processInfo.environment["XDG_CONFIG_HOME"] {
            return "\(xdgConfigHome)/lume"
        }
        // Fall back to default
        return (Constants.fallbackConfigDir as NSString).expandingTildeInPath
    }

    // Path to config file
    private var configFilePath: String {
        return "\(configDir)/\(Constants.configFileName)"
    }

    // MARK: - Initialization

    init(fileManager: FileManager = .default) {
        self.fileManager = fileManager
        ensureConfigDirectoryExists()
    }

    // MARK: - Settings Access

    func getSettings() -> LumeSettings {
        if let settings = readSettingsFromFile() {
            return settings
        }

        // No settings file found, use defaults
        let defaultSettings = LumeSettings(
            vmLocations: [
                VMLocation(name: "default", path: "~/.lume")
            ],
            defaultLocationName: "default",
            cacheDirectory: "~/.lume/cache",
            cachingEnabled: true
        )

        // Try to save default settings
        try? saveSettings(defaultSettings)

        return defaultSettings
    }

    func saveSettings(_ settings: LumeSettings) throws {
        try fileManager.createDirectory(atPath: configDir, withIntermediateDirectories: true)

        // Create a human-readable YAML-like configuration file
        var yamlContent = "# Lume Configuration\n\n"

        // Default location
        yamlContent += "defaultLocationName: \"\(settings.defaultLocationName)\"\n"

        // Cache directory
        yamlContent += "cacheDirectory: \"\(settings.cacheDirectory)\"\n"

        // Caching enabled flag
        yamlContent += "cachingEnabled: \(settings.cachingEnabled)\n"

        // VM locations
        yamlContent += "\n# VM Locations\nvmLocations:\n"
        for location in settings.vmLocations {
            yamlContent += "  - name: \"\(location.name)\"\n"
            yamlContent += "    path: \"\(location.path)\"\n"
        }

        // Write YAML content to file
        try yamlContent.write(
            to: URL(fileURLWithPath: configFilePath), atomically: true, encoding: .utf8)
    }

    // MARK: - VM Location Management

    func addLocation(_ location: VMLocation) throws {
        var settings = getSettings()

        // Validate location name (alphanumeric, dash, underscore)
        let nameRegex = try NSRegularExpression(pattern: "^[a-zA-Z0-9_-]+$")
        let nameRange = NSRange(location.name.startIndex..., in: location.name)
        if nameRegex.firstMatch(in: location.name, range: nameRange) == nil {
            throw VMLocationError.invalidLocationName(name: location.name)
        }

        // Check for duplicate name
        if settings.vmLocations.contains(where: { $0.name == location.name }) {
            throw VMLocationError.duplicateLocationName(name: location.name)
        }

        // Validate location path
        try location.validate()

        // Add location
        settings.vmLocations.append(location)
        try saveSettings(settings)
    }

    func removeLocation(name: String) throws {
        var settings = getSettings()

        // Check location exists
        guard settings.vmLocations.contains(where: { $0.name == name }) else {
            throw VMLocationError.locationNotFound(name: name)
        }

        // Prevent removing default location
        if name == settings.defaultLocationName {
            throw VMLocationError.defaultLocationCannotBeRemoved(name: name)
        }

        // Remove location
        settings.vmLocations.removeAll(where: { $0.name == name })
        try saveSettings(settings)
    }

    func setDefaultLocation(name: String) throws {
        var settings = getSettings()

        // Check location exists
        guard settings.vmLocations.contains(where: { $0.name == name }) else {
            throw VMLocationError.locationNotFound(name: name)
        }

        // Set default
        settings.defaultLocationName = name
        try saveSettings(settings)
    }

    func getLocation(name: String) throws -> VMLocation {
        let settings = getSettings()

        if let location = settings.vmLocations.first(where: { $0.name == name }) {
            return location
        }

        throw VMLocationError.locationNotFound(name: name)
    }

    // MARK: - Legacy Home Directory Compatibility

    func setHomeDirectory(path: String) throws {
        var settings = getSettings()

        let defaultLocation = VMLocation(name: "default", path: path)
        try defaultLocation.validate()

        // Replace default location
        if let index = settings.vmLocations.firstIndex(where: { $0.name == "default" }) {
            settings.vmLocations[index] = defaultLocation
        } else {
            settings.vmLocations.append(defaultLocation)
            settings.defaultLocationName = "default"
        }

        try saveSettings(settings)
    }

    // MARK: - Cache Directory Management

    func setCacheDirectory(path: String) throws {
        var settings = getSettings()

        // Validate path
        let expandedPath = (path as NSString).expandingTildeInPath
        var isDir: ObjCBool = false

        // If directory exists, check if it's writable
        if fileManager.fileExists(atPath: expandedPath, isDirectory: &isDir) {
            if !isDir.boolValue {
                throw SettingsError.notADirectory(path: expandedPath)
            }

            if !fileManager.isWritableFile(atPath: expandedPath) {
                throw SettingsError.directoryNotWritable(path: expandedPath)
            }
        } else {
            // Try to create the directory
            do {
                try fileManager.createDirectory(
                    atPath: expandedPath,
                    withIntermediateDirectories: true
                )
            } catch {
                throw SettingsError.directoryCreationFailed(path: expandedPath, error: error)
            }
        }

        // Update settings
        settings.cacheDirectory = path
        try saveSettings(settings)
    }

    func getCacheDirectory() -> String {
        return getSettings().cacheDirectory
    }

    func setCachingEnabled(_ enabled: Bool) throws {
        var settings = getSettings()
        settings.cachingEnabled = enabled
        try saveSettings(settings)
    }

    func isCachingEnabled() -> Bool {
        return getSettings().cachingEnabled
    }

    // MARK: - Private Helpers

    private func ensureConfigDirectoryExists() {
        try? fileManager.createDirectory(atPath: configDir, withIntermediateDirectories: true)
    }

    private func readSettingsFromFile() -> LumeSettings? {
        // Read from YAML file
        if fileExists(at: configFilePath) {
            do {
                let yamlString = try String(
                    contentsOf: URL(fileURLWithPath: configFilePath), encoding: .utf8)
                return parseYamlSettings(yamlString)
            } catch {
                Logger.error(
                    "Failed to read settings from YAML file",
                    metadata: ["error": error.localizedDescription]
                )
            }
        }
        return nil
    }

    private func parseYamlSettings(_ yamlString: String) -> LumeSettings? {
        // This is a very basic YAML parser for our specific config format
        // A real implementation would use a proper YAML library

        var defaultLocationName = "default"
        var cacheDirectory = "~/.lume/cache"
        var cachingEnabled = true  // default to true for backward compatibility
        var vmLocations: [VMLocation] = []

        var inLocationsSection = false
        var currentLocation: (name: String?, path: String?) = (nil, nil)

        let lines = yamlString.split(separator: "\n")

        for line in lines {
            let trimmedLine = line.trimmingCharacters(in: .whitespaces)

            // Skip comments and empty lines
            if trimmedLine.hasPrefix("#") || trimmedLine.isEmpty {
                continue
            }

            // Check for section marker
            if trimmedLine == "vmLocations:" {
                inLocationsSection = true
                continue
            }

            // In the locations section, handle line indentation more carefully
            if inLocationsSection {
                if trimmedLine.hasPrefix("-") || trimmedLine.contains("- name:") {
                    // Process the previous location before starting a new one
                    if let name = currentLocation.name, let path = currentLocation.path {
                        vmLocations.append(VMLocation(name: name, path: path))
                    }
                    currentLocation = (nil, nil)
                }

                // Process the key-value pairs within a location
                if let colonIndex = trimmedLine.firstIndex(of: ":") {
                    let key = trimmedLine[..<colonIndex].trimmingCharacters(in: .whitespaces)
                    let rawValue = trimmedLine[trimmedLine.index(after: colonIndex)...]
                        .trimmingCharacters(in: .whitespaces)
                    let value = extractValueFromYaml(rawValue)

                    if key.hasSuffix("name") {
                        currentLocation.name = value
                    } else if key.hasSuffix("path") {
                        currentLocation.path = value
                    }
                }
            } else {
                // Process top-level keys outside the locations section
                if let colonIndex = trimmedLine.firstIndex(of: ":") {
                    let key = trimmedLine[..<colonIndex].trimmingCharacters(in: .whitespaces)
                    let rawValue = trimmedLine[trimmedLine.index(after: colonIndex)...]
                        .trimmingCharacters(in: .whitespaces)
                    let value = extractValueFromYaml(rawValue)

                    if key == "defaultLocationName" {
                        defaultLocationName = value
                    } else if key == "cacheDirectory" {
                        cacheDirectory = value
                    } else if key == "cachingEnabled" {
                        cachingEnabled = value.lowercased() == "true"
                    }
                }
            }
        }

        // Don't forget to add the last location
        if let name = currentLocation.name, let path = currentLocation.path {
            vmLocations.append(VMLocation(name: name, path: path))
        }

        // Ensure at least one location exists
        if vmLocations.isEmpty {
            vmLocations.append(VMLocation(name: "default", path: "~/.lume"))
        }

        return LumeSettings(
            vmLocations: vmLocations,
            defaultLocationName: defaultLocationName,
            cacheDirectory: cacheDirectory,
            cachingEnabled: cachingEnabled
        )
    }

    // Helper method to extract a value from YAML, handling quotes
    private func extractValueFromYaml(_ rawValue: String) -> String {
        if rawValue.hasPrefix("\"") && rawValue.hasSuffix("\"") && rawValue.count >= 2 {
            // Remove the surrounding quotes
            let startIndex = rawValue.index(after: rawValue.startIndex)
            let endIndex = rawValue.index(before: rawValue.endIndex)
            return String(rawValue[startIndex..<endIndex])
        }
        return rawValue
    }

    // Helper method to output debug information about the current settings
    func debugSettings() -> String {
        let settings = getSettings()

        var output = "Current Settings:\n"
        output += "- Default VM storage: \(settings.defaultLocationName)\n"
        output += "- Cache directory: \(settings.cacheDirectory)\n"
        output += "- VM Locations (\(settings.vmLocations.count)):\n"

        for (i, location) in settings.vmLocations.enumerated() {
            let isDefault = location.name == settings.defaultLocationName
            let defaultMark = isDefault ? " (default)" : ""
            output += "  \(i+1). \(location.name): \(location.path)\(defaultMark)\n"
        }

        // Also add raw file content
        if fileExists(at: configFilePath) {
            if let content = try? String(contentsOf: URL(fileURLWithPath: configFilePath)) {
                output += "\nRaw YAML file content:\n"
                output += content
            }
        }

        return output
    }

    private func fileExists(at path: String) -> Bool {
        fileManager.fileExists(atPath: path)
    }
}

// MARK: - Errors

enum SettingsError: Error, LocalizedError {
    case notADirectory(path: String)
    case directoryNotWritable(path: String)
    case directoryCreationFailed(path: String, error: Error)

    var errorDescription: String? {
        switch self {
        case .notADirectory(let path):
            return "Path is not a directory: \(path)"
        case .directoryNotWritable(let path):
            return "Directory is not writable: \(path)"
        case .directoryCreationFailed(let path, let error):
            return "Failed to create directory at \(path): \(error.localizedDescription)"
        }
    }
}

```
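
For reference, a `config.yaml` in the shape that `saveSettings` writes and `parseYamlSettings` reads back looks roughly like this (values shown are the defaults from `defaultSettings`):

```yaml
# Lume Configuration

defaultLocationName: "default"
cacheDirectory: "~/.lume/cache"
cachingEnabled: true

# VM Locations
vmLocations:
  - name: "default"
    path: "~/.lume"
```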

--------------------------------------------------------------------------------
/docs/content/docs/example-usecases/form-filling.mdx:
--------------------------------------------------------------------------------

```markdown
---
title: PDF to Form Automation
description: Automate filling a web form with information extracted from a downloaded PDF
---

import { Step, Steps } from 'fumadocs-ui/components/steps';
import { Tab, Tabs } from 'fumadocs-ui/components/tabs';

## Overview

Cua can automate interactions between web forms and the local file system on any operating system. It lets you work with the elements of a web page and with local files, bridging the two.

This example use case combines [Cua Computer](/computer-sdk/computers) to interact with a web page and the local file system with [Agent Loops](/agent-sdk/agent-loops) to run the agent in a loop with message history.

---

<Steps>

<Step>

### Set Up Your Environment

First, install the required dependencies:

Create a `requirements.txt` file:

```text
cua-agent
cua-computer
python-dotenv>=1.0.0
```

Install the dependencies:

```bash
pip install -r requirements.txt
```

Create a `.env` file with your API keys:

```text
ANTHROPIC_API_KEY=your-anthropic-api-key
CUA_API_KEY=sk_cua-api01...
```

</Step>

<Step>

### Create Your Form Filling Script

Create a Python file (e.g., `form_filling.py`) and select your environment:

<Tabs items={['Cloud Sandbox', 'Linux on Docker', 'macOS Sandbox', 'Windows Sandbox']}>
  <Tab value="Cloud Sandbox">

```python
import asyncio
import logging
import os
import signal
import traceback

from agent import ComputerAgent
from computer import Computer, VMProviderType
from dotenv import load_dotenv

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def handle_sigint(sig, frame):
    print("\n\nExecution interrupted by user. Exiting gracefully...")
    exit(0)

async def fill_application():
    try:
        async with Computer(
            os_type="linux",
            provider_type=VMProviderType.CLOUD,
            name="your-sandbox-name",  # Replace with your sandbox name
            api_key=os.environ["CUA_API_KEY"],
            verbosity=logging.INFO,
        ) as computer:

            agent = ComputerAgent(
                model="cua/anthropic/claude-sonnet-4.5",
                tools=[computer],
                only_n_most_recent_images=3,
                verbosity=logging.INFO,
                trajectory_dir="trajectories",
                use_prompt_caching=True,
                max_trajectory_budget=5.0,
            )

            tasks = [
                "Visit https://www.overleaf.com/latex/templates/jakes-resume/syzfjbzwjncs.pdf and download the pdf.",
                "Visit https://form.jotform.com/252881246782264 and fill the form from the information in the pdf."
            ]

            history = []

            for i, task in enumerate(tasks, 1):
                print(f"\n[Task {i}/{len(tasks)}] {task}")

                # Add user message to history
                history.append({"role": "user", "content": task})

                # Run agent with conversation history
                async for result in agent.run(history, stream=False):
                    history += result.get("output", [])

                    # Print output for debugging
                    for item in result.get("output", []):
                        if item.get("type") == "message":
                            content = item.get("content", [])
                            for content_part in content:
                                if content_part.get("text"):
                                    logger.info(f"Agent: {content_part.get('text')}")
                        elif item.get("type") == "computer_call":
                            action = item.get("action", {})
                            action_type = action.get("type", "")
                            logger.debug(f"Computer Action: {action_type}")

                print(f"✅ Task {i}/{len(tasks)} completed")

            print("\n🎉 All tasks completed successfully!")

    except Exception as e:
        logger.error(f"Error in fill_application: {e}")
        traceback.print_exc()
        raise

def main():
    try:
        load_dotenv()

        if "ANTHROPIC_API_KEY" not in os.environ:
            raise RuntimeError(
                "Please set the ANTHROPIC_API_KEY environment variable.\n"
                "You can add it to a .env file in the project root."
            )

        if "CUA_API_KEY" not in os.environ:
            raise RuntimeError(
                "Please set the CUA_API_KEY environment variable.\n"
                "You can add it to a .env file in the project root."
            )

        signal.signal(signal.SIGINT, handle_sigint)

        asyncio.run(fill_application())

    except Exception as e:
        logger.error(f"Error running automation: {e}")
        traceback.print_exc()

if __name__ == "__main__":
    main()
```

  </Tab>
  <Tab value="Linux on Docker">

```python
import asyncio
import logging
import os
import signal
import traceback

from agent import ComputerAgent
from computer import Computer, VMProviderType
from dotenv import load_dotenv

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def handle_sigint(sig, frame):
    print("\n\nExecution interrupted by user. Exiting gracefully...")
    exit(0)

async def fill_application():
    try:
        async with Computer(
            os_type="linux",
            provider_type=VMProviderType.DOCKER,
            image="trycua/cua-xfce:latest",  # or "trycua/cua-ubuntu:latest"
            verbosity=logging.INFO,
        ) as computer:

            agent = ComputerAgent(
                model="cua/anthropic/claude-sonnet-4.5",
                tools=[computer],
                only_n_most_recent_images=3,
                verbosity=logging.INFO,
                trajectory_dir="trajectories",
                use_prompt_caching=True,
                max_trajectory_budget=5.0,
            )

            tasks = [
                "Visit https://www.overleaf.com/latex/templates/jakes-resume/syzfjbzwjncs.pdf and download the pdf.",
                "Visit https://form.jotform.com/252881246782264 and fill the form from the information in the pdf."
            ]

            history = []

            for i, task in enumerate(tasks, 1):
                print(f"\n[Task {i}/{len(tasks)}] {task}")

                # Add user message to history
                history.append({"role": "user", "content": task})

                # Run agent with conversation history
                async for result in agent.run(history, stream=False):
                    history += result.get("output", [])

                    # Print output for debugging
                    for item in result.get("output", []):
                        if item.get("type") == "message":
                            content = item.get("content", [])
                            for content_part in content:
                                if content_part.get("text"):
                                    logger.info(f"Agent: {content_part.get('text')}")
                        elif item.get("type") == "computer_call":
                            action = item.get("action", {})
                            action_type = action.get("type", "")
                            logger.debug(f"Computer Action: {action_type}")

                print(f"✅ Task {i}/{len(tasks)} completed")

            print("\n🎉 All tasks completed successfully!")

    except Exception as e:
        logger.error(f"Error in fill_application: {e}")
        traceback.print_exc()
        raise

def main():
    try:
        load_dotenv()

        if "ANTHROPIC_API_KEY" not in os.environ:
            raise RuntimeError(
                "Please set the ANTHROPIC_API_KEY environment variable.\n"
                "You can add it to a .env file in the project root."
            )

        signal.signal(signal.SIGINT, handle_sigint)

        asyncio.run(fill_application())

    except Exception as e:
        logger.error(f"Error running automation: {e}")
        traceback.print_exc()

if __name__ == "__main__":
    main()
```

  </Tab>
  <Tab value="macOS Sandbox">

```python
import asyncio
import logging
import os
import signal
import traceback

from agent import ComputerAgent
from computer import Computer, VMProviderType
from dotenv import load_dotenv

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def handle_sigint(sig, frame):
    print("\n\nExecution interrupted by user. Exiting gracefully...")
    exit(0)

async def fill_application():
    try:
        async with Computer(
            os_type="macos",
            provider_type=VMProviderType.LUME,
            name="macos-sequoia-cua:latest",
            verbosity=logging.INFO,
        ) as computer:

            agent = ComputerAgent(
                model="cua/anthropic/claude-sonnet-4.5",
                tools=[computer],
                only_n_most_recent_images=3,
                verbosity=logging.INFO,
                trajectory_dir="trajectories",
                use_prompt_caching=True,
                max_trajectory_budget=5.0,
            )

            tasks = [
                "Visit https://www.overleaf.com/latex/templates/jakes-resume/syzfjbzwjncs.pdf and download the pdf.",
                "Visit https://form.jotform.com/252881246782264 and fill the form from the information in the pdf."
            ]

            history = []

            for i, task in enumerate(tasks, 1):
                print(f"\n[Task {i}/{len(tasks)}] {task}")

                # Add user message to history
                history.append({"role": "user", "content": task})

                # Run agent with conversation history
                async for result in agent.run(history, stream=False):
                    history += result.get("output", [])

                    # Print output for debugging
                    for item in result.get("output", []):
                        if item.get("type") == "message":
                            content = item.get("content", [])
                            for content_part in content:
                                if content_part.get("text"):
                                    logger.info(f"Agent: {content_part.get('text')}")
                        elif item.get("type") == "computer_call":
                            action = item.get("action", {})
                            action_type = action.get("type", "")
                            logger.debug(f"Computer Action: {action_type}")

                print(f"✅ Task {i}/{len(tasks)} completed")

            print("\n🎉 All tasks completed successfully!")

    except Exception as e:
        logger.error(f"Error in fill_application: {e}")
        traceback.print_exc()
        raise

def main():
    try:
        load_dotenv()

        if "ANTHROPIC_API_KEY" not in os.environ:
            raise RuntimeError(
                "Please set the ANTHROPIC_API_KEY environment variable.\n"
                "You can add it to a .env file in the project root."
            )

        signal.signal(signal.SIGINT, handle_sigint)

        asyncio.run(fill_application())

    except Exception as e:
        logger.error(f"Error running automation: {e}")
        traceback.print_exc()

if __name__ == "__main__":
    main()
```

  </Tab>
  <Tab value="Windows Sandbox">

```python
import asyncio
import logging
import os
import signal
import traceback

from agent import ComputerAgent
from computer import Computer, VMProviderType
from dotenv import load_dotenv

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def handle_sigint(sig, frame):
    print("\n\nExecution interrupted by user. Exiting gracefully...")
    exit(0)

async def fill_application():
    try:
        async with Computer(
            os_type="windows",
            provider_type=VMProviderType.WINDOWS_SANDBOX,
            verbosity=logging.INFO,
        ) as computer:

            agent = ComputerAgent(
                model="cua/anthropic/claude-sonnet-4.5",
                tools=[computer],
                only_n_most_recent_images=3,
                verbosity=logging.INFO,
                trajectory_dir="trajectories",
                use_prompt_caching=True,
                max_trajectory_budget=5.0,
            )

            tasks = [
                "Visit https://www.overleaf.com/latex/templates/jakes-resume/syzfjbzwjncs.pdf and download the pdf.",
                "Visit https://form.jotform.com/252881246782264 and fill the form from the information in the pdf."
            ]

            history = []

            for i, task in enumerate(tasks, 1):
                print(f"\n[Task {i}/{len(tasks)}] {task}")

                # Add user message to history
                history.append({"role": "user", "content": task})

                # Run agent with conversation history
                async for result in agent.run(history, stream=False):
                    history += result.get("output", [])

                    # Print output for debugging
                    for item in result.get("output", []):
                        if item.get("type") == "message":
                            content = item.get("content", [])
                            for content_part in content:
                                if content_part.get("text"):
                                    logger.info(f"Agent: {content_part.get('text')}")
                        elif item.get("type") == "computer_call":
                            action = item.get("action", {})
                            action_type = action.get("type", "")
                            logger.debug(f"Computer Action: {action_type}")

                print(f"✅ Task {i}/{len(tasks)} completed")

            print("\n🎉 All tasks completed successfully!")

    except Exception as e:
        logger.error(f"Error in fill_application: {e}")
        traceback.print_exc()
        raise

def main():
    try:
        load_dotenv()

        if "ANTHROPIC_API_KEY" not in os.environ:
            raise RuntimeError(
                "Please set the ANTHROPIC_API_KEY environment variable.\n"
                "You can add it to a .env file in the project root."
            )

        signal.signal(signal.SIGINT, handle_sigint)

        asyncio.run(fill_application())

    except Exception as e:
        logger.error(f"Error running automation: {e}")
        traceback.print_exc()

if __name__ == "__main__":
    main()
```

  </Tab>
</Tabs>

</Step>

<Step>

### Run Your Script

Execute your form filling automation:

```bash
python form_filling.py
```

The agent will:

1. Download the PDF resume from Overleaf
2. Extract information from the PDF
3. Fill out the JotForm with the extracted information

Monitor the output to see the agent's progress through each task.

</Step>

</Steps>

---

## Next Steps

- Learn more about [Cua computers](/computer-sdk/computers) and [computer commands](/computer-sdk/commands)
- Read about [Agent loops](/agent-sdk/agent-loops), [tools](/agent-sdk/custom-tools), and [supported model providers](/agent-sdk/supported-model-providers/)
- Experiment with different [Models and Providers](/agent-sdk/supported-model-providers/)
- Join our [Discord community](https://discord.com/invite/mVnXXpdE85) for help

```

--------------------------------------------------------------------------------
/libs/lumier/src/lib/vm.sh:
--------------------------------------------------------------------------------

```bash
#!/usr/bin/env bash

# Initialize global flags
export PULL_IN_PROGRESS=0

start_vm() {
    # Determine storage path for VM
    STORAGE_PATH="$HOST_STORAGE_PATH"
    if [ -z "$STORAGE_PATH" ]; then
        STORAGE_PATH="storage_${VM_NAME}"
    fi

    # Check if VM exists and its status using JSON format - quietly
    VM_INFO=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}")

    # Check if VM not found error
    if [[ $VM_INFO == *"Virtual machine not found"* ]]; then
        IMAGE_NAME="${VERSION##*/}"
        # Parse registry and organization from VERSION
        REGISTRY=$(echo $VERSION | cut -d'/' -f1)
        ORGANIZATION=$(echo $VERSION | cut -d'/' -f2)
        
        echo "Pulling VM image $IMAGE_NAME..."
        lume_pull "$IMAGE_NAME" "$VM_NAME" "$STORAGE_PATH" "$REGISTRY" "$ORGANIZATION"
    else
        # Parse the JSON status - check if it contains "status" : "running"
        if [[ $VM_INFO == *'"status" : "running"'* ]]; then
            lume_stop "$VM_NAME" "$STORAGE_PATH"
        fi
    fi

    # Format memory size for display purposes
    MEMORY_DISPLAY="$RAM_SIZE"
    if [[ ! "$RAM_SIZE" == *"GB"* && ! "$RAM_SIZE" == *"MB"* ]]; then
        MEMORY_DISPLAY="${RAM_SIZE}MB"
    fi
    
    # Set VM parameters using the wrapper function
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Updating VM settings: cpu=$CPU_CORES memory=$MEMORY_DISPLAY display=$DISPLAY"
    fi
    lume_set "$VM_NAME" "$STORAGE_PATH" "$CPU_CORES" "$RAM_SIZE" "$DISPLAY"

    # Fetch VM configuration - quietly (don't display to console)
    CONFIG_JSON=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}")
    
    # Setup shared directory args if necessary
    SHARED_DIR_ARGS=""
    if [ -d "/shared" ]; then
        if [ -n "$HOST_SHARED_PATH" ]; then
            SHARED_DIR_ARGS="--shared-dir=$HOST_SHARED_PATH"
        else
            echo "Warning: /shared volume exists but HOST_SHARED_PATH is not set. Cannot mount volume."
        fi
    fi

    # Run VM with VNC and shared directory using curl
    lume_run $SHARED_DIR_ARGS --storage "$STORAGE_PATH" "$VM_NAME" &
    # lume run "$VM_NAME" --storage "$STORAGE_PATH" --no-display

    # sleep 10000000

    # Wait for VM to be running and VNC URL to be available
    vm_ip=""
    vnc_url=""
    max_attempts=30
    attempt=0
    
    while [ $attempt -lt $max_attempts ]; do
        # Get VM info as JSON using the API function - pass debug flag
        VM_INFO=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}")
        
        # Extract status, IP address, and VNC URL using the helper function
        vm_status=$(extract_json_field "status" "$VM_INFO")
        vm_ip=$(extract_json_field "ipAddress" "$VM_INFO")
        vnc_url=$(extract_json_field "vncUrl" "$VM_INFO")

        # Check if VM status is 'running' and we have IP and VNC URL
        if [ "$vm_status" = "running" ] && [ -n "$vm_ip" ] && [ -n "$vnc_url" ]; then
            break
        fi
        
        sleep 2
        attempt=$((attempt + 1))
    done
    
    if [ -z "$vm_ip" ] || [ -z "$vnc_url" ]; then
        echo "Timed out waiting for VM to start or VNC URL to become available."
        lume_stop "$VM_NAME" "$STORAGE_PATH" > /dev/null 2>&1
        # lume stop "$VM_NAME" --storage "$STORAGE_PATH" > /dev/null 2>&1
        exit 1
    fi

    # Parse VNC URL to extract password and port
    VNC_PASSWORD=$(echo "$vnc_url" | sed -n 's/.*:\(.*\)@.*/\1/p')
    VNC_PORT=$(echo "$vnc_url" | sed -n 's/.*:\([0-9]\+\)$/\1/p')
    
    # Wait for SSH to become available
    wait_for_ssh "$vm_ip" "$HOST_USER" "$HOST_PASSWORD" 5 20

    # Export VNC variables for entry.sh to use
    export VNC_PORT
    export VNC_PASSWORD
    
    # Execute on-logon.sh if present
    on_logon_script="/run/lifecycle/on-logon.sh"
    
    # Only show detailed logs in debug mode
    if [ "${LUMIER_DEBUG:-0}" == "1" ]; then
        echo "Running on-logon.sh hook script on VM..."
    fi
    
    # Check if script exists
    if [ ! -f "$on_logon_script" ]; then
        echo "Warning: on-logon.sh hook script not found at $on_logon_script"
    else
        # Execute the remote script
        execute_remote_script "$vm_ip" "$HOST_USER" "$HOST_PASSWORD" "$on_logon_script" "$VNC_PASSWORD" "$HOST_SHARED_PATH"
    fi
}

# Get VM information using curl
lume_get() {
    local vm_name="$1"
    local storage="$2"
    local format="${3:-json}"
    local debug="${4:-false}"
    
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    
    # URL encode the storage path for the query parameter
    # Replace special characters with their URL encoded equivalents
    local encoded_storage=$(echo "$storage" | sed 's/\//%2F/g' | sed 's/ /%20/g' | sed 's/:/%3A/g')
    
    # Construct API URL with encoded storage parameter
    local api_url="http://${api_host}:${api_port}/lume/vms/${vm_name}?storage=${encoded_storage}"
    
    # Debug logging goes to stderr so it never pollutes the JSON captured by callers
    if [[ "$debug" == "true" || "$debug" == "1" || "${LUMIER_DEBUG:-0}" == "1" ]]; then
        echo "[$(date -u '+%Y-%m-%dT%H:%M:%SZ')] DEBUG: Calling API: $api_url" >&2
    fi
    
    # Make the API call
    local response=$(curl --connect-timeout 6000 \
      --max-time 5000 \
      -s \
      "$api_url")
    
    # Print the response if debugging is enabled (also to stderr)
    if [[ "$debug" == "true" || "$debug" == "1" || "${LUMIER_DEBUG:-0}" == "1" ]]; then
        echo "[DEBUG] API Response:" >&2
        { echo "$response" | jq '.' 2>/dev/null || echo "$response"; } >&2
    fi
    
    # Output the response so callers can capture it
    echo "$response"
}
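
# Example (sketch, not called anywhere in this script): query a VM and branch on its
# status using the helpers above. Assumes extract_json_field is provided by the shared
# utility script sourced before this file.
#
#   VM_INFO=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}")
#   vm_status=$(extract_json_field "status" "$VM_INFO")
#   if [ "$vm_status" = "running" ]; then
#       lume_stop "$VM_NAME" "$STORAGE_PATH"
#   fi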

# Set VM properties using curl
lume_set() {
    local vm_name="$1"
    local storage="$2"
    local cpu="${3:-4}"
    local memory="${4:-8192}"
    local display="${5:-1024x768}"
    
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    
    # Handle memory format for the API
    if [[ "$memory" == *"GB"* ]]; then
        # Already in GB format, keep as is
        :  # No-op
    elif [[ "$memory" =~ ^[0-9]+$ ]]; then
        # If memory is a simple number, assume MB and convert to GB
        memory="$(awk "BEGIN { printf \"%.1f\", $memory/1024 }")GB"
    fi
    
    # Only show memory formatting debug in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "[DEBUG] Formatted memory value: $memory"
    fi
    
    # Store response to conditionally show based on debug mode
    local response=$(curl --connect-timeout 6000 \
      --max-time 5000 \
      -s \
      -X PATCH \
      -H "Content-Type: application/json" \
      -d "{
        \"cpu\": $cpu,
        \"memory\": \"$memory\",
        \"display\": \"$display\",
        \"storage\": \"$storage\"
      }" \
      "http://${api_host}:${api_port}/lume/vms/${vm_name}")
      
    # Only show response in debug mode
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        echo "$response"
    fi
}

stop_vm() {
    local in_cleanup=${1:-false} # Optional first argument to indicate if called from cleanup trap
    echo "Stopping VM '$VM_NAME'..."
    STORAGE_PATH="$HOST_STORAGE_PATH"
    
    # Only show storage path in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "STORAGE_PATH: $STORAGE_PATH"
    fi
    
    VM_INFO=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}")
    vm_status=$(extract_json_field "status" "$VM_INFO")

    if [ "$vm_status" == "running" ]; then
        lume_stop "$VM_NAME" "$STORAGE_PATH"
    elif [ "$vm_status" == "stopped" ]; then
        echo "VM '$VM_NAME' is already stopped."
    elif [ "$in_cleanup" = true ]; then
        # If we are in the cleanup trap and status is unknown or VM not found, 
        # still attempt a stop just in case.
        echo "VM status is unknown ('$vm_status') or VM not found during cleanup. Attempting stop anyway."
        lume_stop "$VM_NAME" "$STORAGE_PATH"
        sleep 5
        echo "VM '$VM_NAME' stop command issued as a precaution."
    else
        echo "VM status is unknown ('$vm_status') or VM not found. Not attempting stop."
    fi
}

is_vm_running() {
    # Check VM status using the API function
    local vm_info
    vm_info=$(lume_get "$VM_NAME" "$HOST_STORAGE_PATH")
    if [[ $vm_info == *'"status" : "running"'* ]]; then
        return 0 # Running
    else
        return 1 # Not running or doesn't exist
    fi
    # lume ls | grep -q "$VM_NAME" # Old CLI check
}

# Stop VM with storage location specified using curl
lume_stop() {
    local vm_name="$1"
    local storage="$2"
    
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    
    # Only log in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Stopping VM $vm_name..."
    fi
    
    # Execute command and capture response
    local response
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        # Show output in debug mode
        response=$(curl --connect-timeout 6000 \
          --max-time 5000 \
          -X POST \
          -H "Content-Type: application/json" \
          -d '{"storage":"'$storage'"}' \
          "http://${api_host}:${api_port}/lume/vms/${vm_name}/stop")
        echo "$response"
    else
        # Run silently in normal mode
        response=$(curl --connect-timeout 6000 \
          --max-time 5000 \
          -s \
          -X POST \
          -H "Content-Type: application/json" \
          -d '{"storage":"'$storage'"}' \
          "http://${api_host}:${api_port}/lume/vms/${vm_name}/stop")
    fi
}

# Pull a VM image using curl
lume_pull() {
    local image="$1"      # Image name with tag
    local vm_name="$2"    # Name for the new VM
    local storage="$3"    # Storage location
    local registry="${4:-ghcr.io}"  # Registry, default is ghcr.io
    local organization="${5:-trycua}" # Organization, default is trycua
    
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    
    # Mark that pull is in progress for interrupt handling
    export PULL_IN_PROGRESS=1
    
    # Only log full details in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Pulling image $image from $registry/$organization..."
    else
        echo "Pulling image $image..."
    fi
    
    # Inform users how to check pull progress
    echo "You can check the pull progress using: lume logs -f"
    
    # Pull image via API and capture response
    local response
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        # Show full response in debug mode - no timeout limits
        response=$(curl \
          -X POST \
          -H "Content-Type: application/json" \
          -d "{
            \"image\": \"$image\",
            \"name\": \"$vm_name\",
            \"registry\": \"$registry\",
            \"organization\": \"$organization\",
            \"storage\": \"$storage\"
          }" \
          "http://${api_host}:${api_port}/lume/pull")
        echo "$response"
    else
        # Run silently in normal mode - no timeout limits
        response=$(curl \
          -s \
          -X POST \
          -H "Content-Type: application/json" \
          -d "{
            \"image\": \"$image\",
            \"name\": \"$vm_name\",
            \"registry\": \"$registry\",
            \"organization\": \"$organization\",
            \"storage\": \"$storage\"
          }" \
          "http://${api_host}:${api_port}/lume/pull")
    fi
    
    # Unset pull in progress flag
    export PULL_IN_PROGRESS=0
}


# Run VM with VNC client started and shared directory using curl
lume_run() {
    # Parse args
    local shared_dir=""
    local storage=""
    local vm_name="lume_vm"
    local no_display=true
    while [[ $# -gt 0 ]]; do
        case $1 in
            --shared-dir=*)
                shared_dir="${1#*=}"
                shift
                ;;
            --storage)
                storage="$2"
                shift 2
                ;;
            --no-display)
                no_display=true
                shift
                ;;
            *)
                # Assume last arg is VM name if not an option
                vm_name="$1"
                shift
                ;;
        esac
    done
    
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"

    # Only log in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Running VM $vm_name..."
    fi
    
    # Build the JSON body dynamically based on what's provided
    local json_body="{\"noDisplay\": $no_display"
    
    # Only include shared directories if shared_dir is provided
    if [[ -n "$shared_dir" ]]; then
        json_body+=", \"sharedDirectories\": [{\"hostPath\": \"$shared_dir\", \"readOnly\": false}]"
    fi
    
    # Only include storage if it's provided
    if [[ -n "$storage" ]]; then
        json_body+=", \"storage\": \"$storage\""
    fi
    
    # Add recovery mode (always false)
    json_body+=", \"recoveryMode\": false}"

    # Execute the command and store the response
    local response
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        # Show response in debug mode
        response=$(curl --connect-timeout 6000 \
          --max-time 5000 \
          -X POST \
          -H 'Content-Type: application/json' \
          -d "$json_body" \
          http://${api_host}:${api_port}/lume/vms/$vm_name/run)
        echo "$response"
    else
        # Run silently in normal mode
        response=$(curl --connect-timeout 6000 \
          --max-time 5000 \
          -s \
          -X POST \
          -H 'Content-Type: application/json' \
          -d "$json_body" \
          http://${api_host}:${api_port}/lume/vms/$vm_name/run)
    fi
}

# Delete a VM using curl
lume_delete() {
    local vm_name="$1"
    local storage="$2"
    
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    
    # URL encode the storage path for the query parameter
    # Replace special characters with their URL encoded equivalents
    local encoded_storage=$(echo "$storage" | sed 's/\//%2F/g' | sed 's/ /%20/g' | sed 's/:/%3A/g')
    
    # Construct API URL with encoded storage parameter
    local api_url="http://${api_host}:${api_port}/lume/vms/${vm_name}?storage=${encoded_storage}"
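    # Illustrative only (not in the original script): a storage path of
    # "/Users/me/VM Storage" and VM name "my-vm" produce:
    #   http://host.docker.internal:7777/lume/vms/my-vm?storage=%2FUsers%2Fme%2FVM%20Storage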
    
    # Only log in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Deleting VM $vm_name from storage $storage..."
    fi
    
    # Execute command and capture response
    local response
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        # Show output in debug mode
        response=$(curl --connect-timeout 6000 \
          --max-time 5000 \
          -X DELETE \
          "$api_url")
        echo "$response"
    else
        # Run silently in normal mode
        response=$(curl --connect-timeout 6000 \
          --max-time 5000 \
          -s \
          -X DELETE \
          "$api_url")
    fi
}
```
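
A minimal sketch of how these helpers compose, assuming the Lume API is reachable at the default `host.docker.internal:7777` and that `VM_NAME` / `HOST_STORAGE_PATH` are exported by the surrounding entrypoint; the image name, paths, and VM name below are placeholders:

```bash
# Hypothetical invocation order; all values are illustrative.
export VM_NAME="my-macos-vm"
export HOST_STORAGE_PATH="/storage"

lume_pull "macos-sequoia-vanilla:latest" "$VM_NAME" "$HOST_STORAGE_PATH"
lume_run --shared-dir=/data/shared --storage "$HOST_STORAGE_PATH" "$VM_NAME"

if is_vm_running; then
    echo "VM '$VM_NAME' is running"
fi

lume_stop "$VM_NAME" "$HOST_STORAGE_PATH"
lume_delete "$VM_NAME" "$HOST_STORAGE_PATH"
```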

--------------------------------------------------------------------------------
/libs/python/agent/benchmarks/utils.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Shared utilities for ScreenSpot-Pro benchmarking and interactive testing.
"""

import dotenv

dotenv.load_dotenv()

import asyncio
import base64
import gc
import os
import statistics
import subprocess as sp
import sys
from datetime import datetime
from io import BytesIO
from typing import List, Optional, Tuple, Union

import torch
from PIL import Image, ImageDraw
from tqdm import tqdm

# Add parent directory to path for imports
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from agent.agent import ComputerAgent
from models.base import ModelProtocol


def get_gpu_memory() -> List[int]:
    """
    Get GPU memory usage using nvidia-smi.

    Returns:
        List of free memory values in MB for each GPU
    """
    try:
        command = "nvidia-smi --query-gpu=memory.free --format=csv"
        memory_free_info = sp.check_output(command.split()).decode("ascii").split("\n")[:-1][1:]
        memory_free_values = [int(x.split()[0]) for x in memory_free_info]
        return memory_free_values
    except (sp.CalledProcessError, FileNotFoundError, IndexError):
        # Fallback to torch if nvidia-smi is not available
        if torch.cuda.is_available():
            device = torch.cuda.current_device()
            total = torch.cuda.get_device_properties(device).total_memory / 1024 / 1024
            reserved = torch.cuda.memory_reserved(device) / 1024 / 1024
            return [int(total - reserved)]
        return [0]


def get_vram_usage() -> dict:
    """
    Get current VRAM usage statistics.

    Returns:
        Dictionary with VRAM usage info (in MB)
    """
    if torch.cuda.is_available():
        device = torch.cuda.current_device()
        allocated = torch.cuda.memory_allocated(device) / 1024 / 1024  # Convert to MB
        reserved = torch.cuda.memory_reserved(device) / 1024 / 1024  # Convert to MB
        total = torch.cuda.get_device_properties(device).total_memory / 1024 / 1024
        return {
            "allocated_mb": allocated,
            "reserved_mb": reserved,
            "total_mb": total,
            "free_mb": total - reserved,
        }
    else:
        return {"allocated_mb": 0.0, "reserved_mb": 0.0, "total_mb": 0.0, "free_mb": 0.0}


def get_available_models() -> List[Union[str, ModelProtocol]]:
    """
    Get list of available models for testing.

    Returns:
        List of model strings and model classes
    """
    local_provider = "huggingface-local/"  # Options: huggingface-local/ or mlx/

    # from models.gta1 import GTA1Model

    models = [
        # === ComputerAgent model strings ===
        "openai/computer-use-preview",
        "anthropic/claude-opus-4-20250514",
        # f"{local_provider}HelloKKMe/GTA1-7B",
        # f"{local_provider}HelloKKMe/GTA1-32B",
        "openai/computer-use-preview+openai/gpt-4o-mini",
        "anthropic/claude-opus-4-20250514+openai/gpt-4o-mini",
        # === Reference model classes ===
        # GTA1Model("HelloKKMe/GTA1-7B"),
        # GTA1Model("HelloKKMe/GTA1-32B"),
    ]

    return models


def is_click_in_bbox(click_coords: Optional[Tuple[int, int]], bbox: List[int]) -> bool:
    """
    Check if click coordinates are within the bounding box.

    Args:
        click_coords: (x, y) coordinates or None
        bbox: [x1, y1, x2, y2] bounding box

    Returns:
        True if click is within bbox, False otherwise
    """
    if click_coords is None:
        return False

    x, y = click_coords
    x1, y1, x2, y2 = bbox

    return x1 <= x <= x2 and y1 <= y <= y2


def image_to_base64(image: Image.Image) -> str:
    """
    Convert PIL Image to base64 string.

    Args:
        image: PIL Image

    Returns:
        Base64 encoded image string
    """
    buffered = BytesIO()
    image.save(buffered, format="PNG")
    return base64.b64encode(buffered.getvalue()).decode()


class ModelWrapper:
    """
    Wrapper to provide unified interface for both ComputerAgent and custom models.
    """

    def __init__(self, model: Union[str, ModelProtocol]):
        self.model = model
        self.is_computer_agent = isinstance(model, str)
        self.agent: Optional[ComputerAgent] = None
        self.vram_usage_history: List[float] = []  # Track VRAM usage over time

        if self.is_computer_agent:
            self.model_name = str(model)
        else:
            self.model_name = (
                f"{model.__class__.__name__}('{getattr(model, 'model_name', 'unknown')}')"
            )

    async def load_model(self) -> None:
        """Load the model."""
        if self.is_computer_agent:
            self.agent = ComputerAgent(model=str(self.model))
        else:
            await self.model.load_model()  # type: ignore

        # Record initial VRAM usage after loading
        vram_info = get_vram_usage()
        self.vram_usage_history.append(vram_info["allocated_mb"])

    async def unload_model(self) -> None:
        """Unload the model."""
        if not self.is_computer_agent:
            await self.model.unload_model()  # type: ignore
        else:
            del self.agent
            self.agent = None
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

        # Record VRAM usage after unloading
        vram_info = get_vram_usage()
        self.vram_usage_history.append(vram_info["allocated_mb"])

    def get_vram_stats(self) -> dict:
        """Get VRAM usage statistics for this model."""
        if not self.vram_usage_history:
            return {"max_mb": 0.0, "avg_mb": 0.0}

        return {
            "max_mb": max(self.vram_usage_history),
            "avg_mb": sum(self.vram_usage_history) / len(self.vram_usage_history),
        }

    async def predict_click(
        self, image: Image.Image, instruction: str
    ) -> Optional[Tuple[int, int]]:
        """Predict click coordinates."""
        # Record VRAM usage before prediction
        vram_info = get_vram_usage()
        self.vram_usage_history.append(vram_info["allocated_mb"])

        if self.is_computer_agent:
            if self.agent is None:
                await self.load_model()

            if self.agent is not None:
                image_b64 = image_to_base64(image)
                result = await self.agent.predict_click(
                    instruction=instruction, image_b64=image_b64
                )

                # Record VRAM usage after prediction
                vram_info = get_vram_usage()
                self.vram_usage_history.append(vram_info["allocated_mb"])

                return result
            return None
        else:
            result = await self.model.predict_click(image, instruction)  # type: ignore

            # Record VRAM usage after prediction
            vram_info = get_vram_usage()
            self.vram_usage_history.append(vram_info["allocated_mb"])

            return result


def save_results_to_markdown(
    all_results: List[dict],
    output_file: str = "screenspot_pro_results.md",
    title: str = "ScreenSpot-Pro Benchmark Results",
) -> None:
    """
    Save evaluation results to a markdown table.

    Args:
        all_results: List of evaluation results for each model
        output_file: Output markdown file path
        title: Title written as the report's top-level heading
    """
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(f"# {title}\n\n")
        f.write(f"**Evaluation Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")

        # Summary table
        f.write("## Summary\n\n")
        f.write(
            "| Model | Total Samples | Correct | Errors | Accuracy | Error Rate | Avg Time (s) | Median Time (s) | Time Range (s) | VRAM Max (GB) | VRAM Avg (GB) |\n"
        )
        f.write(
            "|-------|---------------|---------|--------|----------|------------|--------------|-----------------|----------------|---------------|---------------|\n"
        )

        for result in all_results:
            model_name = result["model_name"]
            total = result["total_samples"]
            correct = result["correct_predictions"]
            errors = result["failed_predictions"]
            accuracy = result["accuracy"] * 100
            error_rate = result["failure_rate"] * 100
            avg_time = result.get("avg_prediction_time", 0.0)
            median_time = result.get("median_prediction_time", 0.0)
            min_time = result.get("min_prediction_time", 0.0)
            max_time = result.get("max_prediction_time", 0.0)
            time_range = f"{min_time:.2f} - {max_time:.2f}"
            vram_max = result.get("vram_max_mb", 0.0) / 1024
            vram_avg = result.get("vram_avg_mb", 0.0) / 1024

            f.write(
                f"| {model_name} | {total} | {correct} | {errors} | {accuracy:.2f}% | {error_rate:.2f}% | {avg_time:.2f} | {median_time:.2f} | {time_range} | {vram_max:.1f} | {vram_avg:.1f} |\n"
            )

        # Detailed results for each model
        for result in all_results:
            f.write(f"\n## {result['model_name']} - Detailed Results\n\n")
            f.write(
                "| Sample Index | Instruction | BBox | Predicted | Correct | Error | Time (s) |\n"
            )
            f.write("|-----------|-------------|------|-----------|---------|-------|----------|\n")

            for sample_result in result["results"][:10]:  # Show first 10 samples
                sample_idx = sample_result["sample_idx"]
                instruction = (
                    sample_result["instruction"][:50] + "..."
                    if len(sample_result["instruction"]) > 50
                    else sample_result["instruction"]
                )
                bbox = str(sample_result["bbox"])
                predicted = (
                    str(sample_result["predicted_coords"])
                    if sample_result["predicted_coords"]
                    else "None"
                )
                correct = "PASS" if sample_result["is_correct"] else "FAIL"
                error = "YES" if sample_result["failed"] else "NO"
                pred_time = sample_result.get("prediction_time", 0.0)

                f.write(
                    f"| {sample_idx} | {instruction} | {bbox} | {predicted} | {correct} | {error} | {pred_time:.2f} |\n"
                )

            if len(result["results"]) > 10:
                f.write(f"\n*Showing first 10 of {len(result['results'])} samples*\n")

    print(f"\nResults saved to: {output_file}")


def save_visualizations(all_results: List[dict], samples, output_dir: str = "output") -> None:
    """
    Save visualizations of predicted coordinates vs bboxes to an output folder.

    Args:
        all_results: List of evaluation results for each model
        samples: List of sample dicts with image, bbox, instruction keys
        output_dir: Output directory path
    """
    os.makedirs(output_dir, exist_ok=True)

    for result in all_results:
        model_name = result["model_name"].replace("/", "_").replace("\\", "_")
        model_dir = os.path.join(output_dir, model_name)
        os.makedirs(model_dir, exist_ok=True)

        print(f"Saving visualizations for {result['model_name']}...")

        # Save first 10 samples for visualization
        for i, sample_result in enumerate(
            tqdm(result["results"][:10], desc=f"Saving {model_name} visualizations")
        ):
            # Get sample data using index
            sample_idx = sample_result["sample_idx"]

            if sample_idx < len(samples):
                sample = samples[sample_idx]
                image = sample["image"].copy()  # Make a copy to avoid modifying original
            else:
                print(f"Warning: Could not find sample at index {sample_idx}")
                continue

            bbox = sample_result["bbox"]
            predicted_coords = sample_result["predicted_coords"]
            is_correct = sample_result["is_correct"]

            # Draw on image
            draw = ImageDraw.Draw(image)

            # Draw bounding box (ground truth) in green
            x1, y1, x2, y2 = bbox
            draw.rectangle([x1, y1, x2, y2], outline="green", width=3)
            draw.text((x1, y1 - 20), "Ground Truth", fill="green")

            # Draw predicted click in red or blue
            if predicted_coords is not None:
                px, py = predicted_coords
                color = "blue" if is_correct else "red"
                # Draw crosshair
                crosshair_size = 15
                draw.line(
                    [(px - crosshair_size, py), (px + crosshair_size, py)], fill=color, width=3
                )
                draw.line(
                    [(px, py - crosshair_size), (px, py + crosshair_size)], fill=color, width=3
                )
                draw.text((px + 10, py - 20), f"Predicted ({px},{py})", fill=color)

            # Add status text
            status = "CORRECT" if is_correct else "INCORRECT"
            status_color = "blue" if is_correct else "red"
            draw.text((10, 10), f"Status: {status}", fill=status_color)
            draw.text(
                (10, 30), f"Instruction: {sample_result['instruction'][:50]}...", fill="black"
            )

            # Save image
            filename = f"sample_{i+1:02d}_idx{sample_idx}_{status.lower()}.png"
            filepath = os.path.join(model_dir, filename)
            image.save(filepath)

        print(f"Visualizations saved to: {model_dir}")


def save_prediction_visualization(
    image: Image.Image,
    instruction: str,
    predictions: List[dict],
    output_file: str = "interactive_prediction.png",
) -> None:
    """
    Save visualization of multiple model predictions on a single image.

    Args:
        image: PIL Image to visualize
        instruction: Instruction text
        predictions: List of prediction dicts with keys: model_name, coords, error
        output_file: Output file path
    """
    # Create a copy of the image
    vis_image = image.copy()
    draw = ImageDraw.Draw(vis_image)

    # Colors for different models
    colors = ["red", "blue", "orange", "purple", "brown", "pink", "gray", "olive"]

    # Draw predictions
    for i, pred in enumerate(predictions):
        color = colors[i % len(colors)]
        model_name = pred["model_name"]
        coords = pred.get("coords")
        error = pred.get("error")

        if coords is not None:
            px, py = coords
            # Draw crosshair
            crosshair_size = 20
            draw.line([(px - crosshair_size, py), (px + crosshair_size, py)], fill=color, width=4)
            draw.line([(px, py - crosshair_size), (px, py + crosshair_size)], fill=color, width=4)
            # Draw model name
            draw.text((px + 15, py + 15), f"{model_name}: ({px},{py})", fill=color)
        else:
            # Draw error text
            draw.text((10, 50 + i * 20), f"{model_name}: ERROR - {error}", fill=color)

    # Add instruction at the top
    draw.text((10, 10), f"Instruction: {instruction}", fill="black")

    # Save image
    vis_image.save(output_file)
    print(f"Prediction visualization saved to: {output_file}")


def take_screenshot() -> Image.Image:
    """
    Take a screenshot of the current screen.

    Returns:
        PIL Image of the screenshot
    """
    try:
        import pyautogui

        screenshot = pyautogui.screenshot()
        return screenshot
    except ImportError:
        print("pyautogui not installed. Please install it with: pip install pyautogui")
        raise
    except Exception as e:
        print(f"Error taking screenshot: {e}")
        raise

```
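
A minimal sketch of driving `ModelWrapper` outside the benchmark harness, assuming the module is importable as `utils` (e.g. when run from the `benchmarks` directory); the model string and the blank stand-in image are illustrative:

```python
import asyncio

from PIL import Image

from utils import ModelWrapper  # assumes utils.py is importable from the CWD


async def main() -> None:
    wrapper = ModelWrapper("anthropic/claude-opus-4-20250514")
    await wrapper.load_model()

    # Stand-in screenshot; a real run would use take_screenshot() or a dataset sample.
    image = Image.new("RGB", (1280, 800), "white")
    coords = await wrapper.predict_click(image, "Click the Save button")
    print("predicted:", coords, "vram:", wrapper.get_vram_stats())

    await wrapper.unload_model()


asyncio.run(main())
```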

--------------------------------------------------------------------------------
/libs/python/agent/agent/callbacks/trajectory_saver.py:
--------------------------------------------------------------------------------

```python
"""
Trajectory saving callback handler for ComputerAgent.
"""

import base64
import io
import json
import os
import uuid
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, override

from PIL import Image, ImageDraw

from .base import AsyncCallbackHandler


def sanitize_image_urls(data: Any) -> Any:
    """
    Recursively search for 'image_url' keys and set their values to '[omitted]'.

    Args:
        data: Any data structure (dict, list, or primitive type)

    Returns:
        A deep copy of the data with all 'image_url' values replaced with '[omitted]'
    """
    if isinstance(data, dict):
        # Create a copy of the dictionary
        sanitized = {}
        for key, value in data.items():
            if key == "image_url":
                sanitized[key] = "[omitted]"
            else:
                # Recursively sanitize the value
                sanitized[key] = sanitize_image_urls(value)
        return sanitized

    elif isinstance(data, list):
        # Recursively sanitize each item in the list
        return [sanitize_image_urls(item) for item in data]

    else:
        # For primitive types (str, int, bool, None, etc.), return as-is
        return data


def extract_computer_call_outputs(
    items: List[Dict[str, Any]], screenshot_dir: Optional[Path]
) -> List[Dict[str, Any]]:
    """
    Save any base64-encoded screenshots from computer_call_output entries to files and
    replace their image_url with the saved file path when a call_id is present.

    Only operates if screenshot_dir is provided and exists; otherwise returns items unchanged.

    Args:
        items: List of message/result dicts potentially containing computer_call_output entries
        screenshot_dir: Directory to write screenshots into

    Returns:
        A new list with updated image_url fields when applicable.
    """
    if not items:
        return items
    if not screenshot_dir or not screenshot_dir.exists():
        return items

    updated: List[Dict[str, Any]] = []
    for item in items:
        # work on a shallow copy; deep copy nested 'output' if we modify it
        msg = dict(item)
        try:
            if msg.get("type") == "computer_call_output":
                call_id = msg.get("call_id")
                output = msg.get("output", {})
                image_url = output.get("image_url")
                if call_id and isinstance(image_url, str) and image_url.startswith("data:"):
                    # derive extension from MIME type e.g. data:image/png;base64,
                    try:
                        ext = image_url.split(";", 1)[0].split("/")[-1]
                        if not ext:
                            ext = "png"
                    except Exception:
                        ext = "png"
                    out_path = screenshot_dir / f"{call_id}.{ext}"
                    # write file if it doesn't exist
                    if not out_path.exists():
                        try:
                            b64_payload = image_url.split(",", 1)[1]
                            img_bytes = base64.b64decode(b64_payload)
                            out_path.parent.mkdir(parents=True, exist_ok=True)
                            with open(out_path, "wb") as f:
                                f.write(img_bytes)
                        except Exception:
                            # if anything fails, skip modifying this message
                            pass
                    # update image_url to file path
                    new_output = dict(output)
                    new_output["image_url"] = str(out_path)
                    msg["output"] = new_output
        except Exception:
            # do not block on malformed entries; keep original
            pass
        updated.append(msg)
    return updated


class TrajectorySaverCallback(AsyncCallbackHandler):
    """
    Callback handler that saves agent trajectories to disk.

    Saves each run as a separate trajectory with unique ID, and each turn
    within the trajectory gets its own folder with screenshots and responses.
    """

    def __init__(
        self, trajectory_dir: str, reset_on_run: bool = True, screenshot_dir: Optional[str] = None
    ):
        """
        Initialize trajectory saver.

        Args:
            trajectory_dir: Base directory to save trajectories
            reset_on_run: If True, reset trajectory_id/turn/artifact on each run.
                         If False, continue using existing trajectory_id if set.
            screenshot_dir: Optional directory where screenshots embedded in
                         computer_call_output messages are extracted and saved.
        """
        self.trajectory_dir = Path(trajectory_dir)
        self.trajectory_id: Optional[str] = None
        self.current_turn: int = 0
        self.current_artifact: int = 0
        self.model: Optional[str] = None
        self.total_usage: Dict[str, Any] = {}
        self.reset_on_run = reset_on_run
        # Optional directory to store extracted screenshots from metadata/new_items
        self.screenshot_dir: Optional[Path] = Path(screenshot_dir) if screenshot_dir else None

        # Ensure trajectory directory exists
        self.trajectory_dir.mkdir(parents=True, exist_ok=True)

    def _get_turn_dir(self) -> Path:
        """Get the directory for the current turn."""
        if not self.trajectory_id:
            raise ValueError("Trajectory not initialized - call _on_run_start first")

        # format: trajectory_id/turn_000
        turn_dir = self.trajectory_dir / self.trajectory_id / f"turn_{self.current_turn:03d}"
        turn_dir.mkdir(parents=True, exist_ok=True)
        return turn_dir

    def _save_artifact(self, name: str, artifact: Union[str, bytes, Dict[str, Any]]) -> None:
        """Save an artifact to the current turn directory."""
        turn_dir = self._get_turn_dir()
        if isinstance(artifact, bytes):
            # format: turn_000/0000_name.png
            artifact_filename = f"{self.current_artifact:04d}_{name}"
            artifact_path = turn_dir / f"{artifact_filename}.png"
            with open(artifact_path, "wb") as f:
                f.write(artifact)
        else:
            # format: turn_000/0000_name.json
            artifact_filename = f"{self.current_artifact:04d}_{name}"
            artifact_path = turn_dir / f"{artifact_filename}.json"
            # add created_at
            if isinstance(artifact, dict):
                artifact = artifact.copy()
                artifact["created_at"] = str(uuid.uuid1().time)
            with open(artifact_path, "w") as f:
                json.dump(sanitize_image_urls(artifact), f, indent=2)
        self.current_artifact += 1

    def _update_usage(self, usage: Dict[str, Any]) -> None:
        """Update total usage statistics."""

        def add_dicts(target: Dict[str, Any], source: Dict[str, Any]) -> None:
            for key, value in source.items():
                if isinstance(value, dict):
                    if key not in target:
                        target[key] = {}
                    add_dicts(target[key], value)
                else:
                    if key not in target:
                        target[key] = 0
                    target[key] += value

        add_dicts(self.total_usage, usage)

    @override
    async def on_run_start(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]]) -> None:
        """Initialize trajectory tracking for a new run."""
        model = kwargs.get("model", "unknown")

        # Only reset trajectory state if reset_on_run is True or no trajectory exists
        if self.reset_on_run or not self.trajectory_id:
            model_name_short = model.split("+")[-1].split("/")[-1].lower()[:16]
            if "+" in model:
                model_name_short = model.split("+")[0].lower()[:4] + "_" + model_name_short
            # strip non-alphanumeric characters from model_name_short
            model_name_short = "".join(c for c in model_name_short if c.isalnum() or c == "_")

            # id format: yyyy-mm-dd_model_hhmmss_uuid[:4]
            now = datetime.now()
            self.trajectory_id = f"{now.strftime('%Y-%m-%d')}_{model_name_short}_{now.strftime('%H%M%S')}_{str(uuid.uuid4())[:4]}"
            self.current_turn = 0
            self.current_artifact = 0
            self.model = model
            self.total_usage = {}

            # Create trajectory directory
            trajectory_path = self.trajectory_dir / self.trajectory_id
            trajectory_path.mkdir(parents=True, exist_ok=True)

            # Save trajectory metadata (optionally extract screenshots to screenshot_dir)
            kwargs_to_save = kwargs.copy()
            try:
                if "messages" in kwargs_to_save:
                    kwargs_to_save["messages"] = extract_computer_call_outputs(
                        kwargs_to_save["messages"], self.screenshot_dir
                    )
            except Exception:
                # If extraction fails, fall back to original messages
                pass
            metadata = {
                "trajectory_id": self.trajectory_id,
                "created_at": str(uuid.uuid1().time),
                "status": "running",
                "kwargs": kwargs_to_save,
            }

            with open(trajectory_path / "metadata.json", "w") as f:
                json.dump(metadata, f, indent=2)
        else:
            # Continue with existing trajectory - just update model if needed
            self.model = model

    @override
    async def on_run_end(
        self,
        kwargs: Dict[str, Any],
        old_items: List[Dict[str, Any]],
        new_items: List[Dict[str, Any]],
    ) -> None:
        """Finalize run tracking by updating metadata with completion status, usage, and new items."""
        if not self.trajectory_id:
            return

        # Update metadata with completion status, total usage, and new items
        trajectory_path = self.trajectory_dir / self.trajectory_id
        metadata_path = trajectory_path / "metadata.json"

        # Read existing metadata
        if metadata_path.exists():
            with open(metadata_path, "r") as f:
                metadata = json.load(f)
        else:
            metadata = {}

        # Update metadata with completion info
        # Optionally extract screenshots from new_items before persisting
        new_items_to_save = new_items
        try:
            new_items_to_save = extract_computer_call_outputs(new_items, self.screenshot_dir)
        except Exception:
            pass

        metadata.update(
            {
                "status": "completed",
                "completed_at": str(uuid.uuid1().time),
                "total_usage": self.total_usage,
                "new_items": new_items_to_save,
                "total_turns": self.current_turn,
            }
        )

        # Save updated metadata
        with open(metadata_path, "w") as f:
            json.dump(metadata, f, indent=2)

    @override
    async def on_api_start(self, kwargs: Dict[str, Any]) -> None:
        if not self.trajectory_id:
            return

        self._save_artifact("api_start", {"kwargs": kwargs})

    @override
    async def on_api_end(self, kwargs: Dict[str, Any], result: Any) -> None:
        """Save API call result."""
        if not self.trajectory_id:
            return

        self._save_artifact("api_result", {"kwargs": kwargs, "result": result})

    @override
    async def on_screenshot(self, screenshot: Union[str, bytes], name: str = "screenshot") -> None:
        """Save a screenshot."""
        if isinstance(screenshot, str):
            screenshot = base64.b64decode(screenshot)
        self._save_artifact(name, screenshot)

    @override
    async def on_usage(self, usage: Dict[str, Any]) -> None:
        """Called when usage information is received."""
        self._update_usage(usage)

    @override
    async def on_responses(self, kwargs: Dict[str, Any], responses: Dict[str, Any]) -> None:
        """Save responses to the current turn directory and update usage statistics."""
        if not self.trajectory_id:
            return

        # Save responses
        response_data = {
            "timestamp": str(uuid.uuid1().time),
            "model": self.model,
            "kwargs": kwargs,
            "response": responses,
        }

        self._save_artifact("agent_response", response_data)

        # Increment turn counter
        self.current_turn += 1

    def _draw_crosshair_on_image(self, image_bytes: bytes, x: int, y: int) -> bytes:
        """
        Draw a red dot and crosshair at the specified coordinates on the image.

        Args:
            image_bytes: The original image as bytes
            x: X coordinate for the crosshair
            y: Y coordinate for the crosshair

        Returns:
            Modified image as bytes with red dot and crosshair
        """
        # Open the image
        image = Image.open(io.BytesIO(image_bytes))
        draw = ImageDraw.Draw(image)

        # Draw crosshair lines (red, 2px thick)
        crosshair_size = 20
        line_width = 2
        color = "red"

        # Horizontal line
        draw.line([(x - crosshair_size, y), (x + crosshair_size, y)], fill=color, width=line_width)
        # Vertical line
        draw.line([(x, y - crosshair_size), (x, y + crosshair_size)], fill=color, width=line_width)

        # Draw center dot (filled circle)
        dot_radius = 3
        draw.ellipse(
            [(x - dot_radius, y - dot_radius), (x + dot_radius, y + dot_radius)], fill=color
        )

        # Convert back to bytes
        output = io.BytesIO()
        image.save(output, format="PNG")
        return output.getvalue()

    @override
    async def on_computer_call_end(
        self, item: Dict[str, Any], result: List[Dict[str, Any]]
    ) -> None:
        """
        Called when a computer call has completed.
        Saves screenshots and computer call output.
        """
        if not self.trajectory_id:
            return

        self._save_artifact("computer_call_result", {"item": item, "result": result})

        # Check if action has x/y coordinates and there's a screenshot in the result
        action = item.get("action", {})
        if "x" in action and "y" in action:
            # Look for screenshot in the result
            for result_item in result:
                if (
                    result_item.get("type") == "computer_call_output"
                    and result_item.get("output", {}).get("type") == "input_image"
                ):

                    image_url = result_item["output"]["image_url"]

                    # Extract base64 image data
                    if image_url.startswith("data:image/"):
                        # Format: data:image/png;base64,<base64_data>
                        base64_data = image_url.split(",", 1)[1]
                    else:
                        # Assume it's just base64 data
                        base64_data = image_url

                    try:
                        # Decode the image
                        image_bytes = base64.b64decode(base64_data)

                        # Draw crosshair at the action coordinates
                        annotated_image = self._draw_crosshair_on_image(
                            image_bytes, int(action["x"]), int(action["y"])
                        )

                        # Save as screenshot_action
                        self._save_artifact("screenshot_action", annotated_image)

                    except Exception as e:
                        # If annotation fails, just log and continue
                        print(f"Failed to annotate screenshot: {e}")

                    break  # Only process the first screenshot found

        # Increment turn counter
        self.current_turn += 1

```
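
A minimal sketch that exercises the callback's hooks directly (rather than registering it with `ComputerAgent`), using only the methods defined above; the import path assumes the `agent` package is installed, and the payload shapes are illustrative:

```python
import asyncio

from agent.callbacks.trajectory_saver import TrajectorySaverCallback


async def main() -> None:
    saver = TrajectorySaverCallback("trajectories", screenshot_dir="screenshots")

    # One run: start, record usage, save a model response, finish.
    await saver.on_run_start({"model": "openai/computer-use-preview", "messages": []}, [])
    await saver.on_usage({"input_tokens": 120, "output_tokens": 45, "total_tokens": 165})
    await saver.on_responses({"model": "openai/computer-use-preview"}, {"output": []})
    await saver.on_run_end({}, [], [])


asyncio.run(main())
```
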
Page 11/20