This is page 19 of 20. Use http://codebase.md/trycua/cua?page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── scripts
│ │ ├── get_pyproject_version.py
│ │ └── tests
│ │ ├── __init__.py
│ │ ├── README.md
│ │ └── test_get_pyproject_version.py
│ └── workflows
│ ├── bump-version.yml
│ ├── ci-lume.yml
│ ├── docker-publish-cua-linux.yml
│ ├── docker-publish-cua-windows.yml
│ ├── docker-publish-kasm.yml
│ ├── docker-publish-xfce.yml
│ ├── docker-reusable-publish.yml
│ ├── link-check.yml
│ ├── lint.yml
│ ├── npm-publish-cli.yml
│ ├── npm-publish-computer.yml
│ ├── npm-publish-core.yml
│ ├── publish-lume.yml
│ ├── pypi-publish-agent.yml
│ ├── pypi-publish-computer-server.yml
│ ├── pypi-publish-computer.yml
│ ├── pypi-publish-core.yml
│ ├── pypi-publish-mcp-server.yml
│ ├── pypi-publish-som.yml
│ ├── pypi-reusable-publish.yml
│ ├── python-tests.yml
│ ├── test-cua-models.yml
│ └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│ ├── docs.code-workspace
│ ├── extensions.json
│ ├── launch.json
│ ├── libs-ts.code-workspace
│ ├── lume.code-workspace
│ ├── lumier.code-workspace
│ ├── py.code-workspace
│ └── settings.json
├── blog
│ ├── app-use.md
│ ├── assets
│ │ ├── composite-agents.png
│ │ ├── docker-ubuntu-support.png
│ │ ├── hack-booth.png
│ │ ├── hack-closing-ceremony.jpg
│ │ ├── hack-cua-ollama-hud.jpeg
│ │ ├── hack-leaderboard.png
│ │ ├── hack-the-north.png
│ │ ├── hack-winners.jpeg
│ │ ├── hack-workshop.jpeg
│ │ ├── hud-agent-evals.png
│ │ └── trajectory-viewer.jpeg
│ ├── bringing-computer-use-to-the-web.md
│ ├── build-your-own-operator-on-macos-1.md
│ ├── build-your-own-operator-on-macos-2.md
│ ├── cloud-windows-ga-macos-preview.md
│ ├── composite-agents.md
│ ├── computer-use-agents-for-growth-hacking.md
│ ├── cua-hackathon.md
│ ├── cua-playground-preview.md
│ ├── cua-vlm-router.md
│ ├── hack-the-north.md
│ ├── hud-agent-evals.md
│ ├── human-in-the-loop.md
│ ├── introducing-cua-cli.md
│ ├── introducing-cua-cloud-containers.md
│ ├── lume-to-containerization.md
│ ├── neurips-2025-cua-papers.md
│ ├── sandboxed-python-execution.md
│ ├── training-computer-use-models-trajectories-1.md
│ ├── trajectory-viewer.md
│ ├── ubuntu-docker-support.md
│ └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│ ├── .env.example
│ ├── .gitignore
│ ├── content
│ │ └── docs
│ │ ├── agent-sdk
│ │ │ ├── agent-loops.mdx
│ │ │ ├── benchmarks
│ │ │ │ ├── index.mdx
│ │ │ │ ├── interactive.mdx
│ │ │ │ ├── introduction.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── osworld-verified.mdx
│ │ │ │ ├── screenspot-pro.mdx
│ │ │ │ └── screenspot-v2.mdx
│ │ │ ├── callbacks
│ │ │ │ ├── agent-lifecycle.mdx
│ │ │ │ ├── cost-saving.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── logging.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── pii-anonymization.mdx
│ │ │ │ └── trajectories.mdx
│ │ │ ├── chat-history.mdx
│ │ │ ├── custom-tools.mdx
│ │ │ ├── customizing-computeragent.mdx
│ │ │ ├── integrations
│ │ │ │ ├── hud.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── observability.mdx
│ │ │ ├── mcp-server
│ │ │ │ ├── client-integrations.mdx
│ │ │ │ ├── configuration.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── llm-integrations.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── tools.mdx
│ │ │ │ └── usage.mdx
│ │ │ ├── message-format.mdx
│ │ │ ├── meta.json
│ │ │ ├── migration-guide.mdx
│ │ │ ├── prompt-caching.mdx
│ │ │ ├── supported-agents
│ │ │ │ ├── composed-agents.mdx
│ │ │ │ ├── computer-use-agents.mdx
│ │ │ │ ├── grounding-models.mdx
│ │ │ │ ├── human-in-the-loop.mdx
│ │ │ │ └── meta.json
│ │ │ ├── supported-model-providers
│ │ │ │ ├── cua-vlm-router.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ └── local-models.mdx
│ │ │ ├── telemetry.mdx
│ │ │ └── usage-tracking.mdx
│ │ ├── cli-playbook
│ │ │ ├── commands.mdx
│ │ │ ├── index.mdx
│ │ │ └── meta.json
│ │ ├── computer-sdk
│ │ │ ├── cloud-vm-management.mdx
│ │ │ ├── commands.mdx
│ │ │ ├── computer-server
│ │ │ │ ├── Commands.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── REST-API.mdx
│ │ │ │ └── WebSocket-API.mdx
│ │ │ ├── computer-ui.mdx
│ │ │ ├── computers.mdx
│ │ │ ├── custom-computer-handlers.mdx
│ │ │ ├── meta.json
│ │ │ ├── sandboxed-python.mdx
│ │ │ └── tracing-api.mdx
│ │ ├── example-usecases
│ │ │ ├── form-filling.mdx
│ │ │ ├── gemini-complex-ui-navigation.mdx
│ │ │ ├── meta.json
│ │ │ ├── post-event-contact-export.mdx
│ │ │ └── windows-app-behind-vpn.mdx
│ │ ├── get-started
│ │ │ ├── meta.json
│ │ │ └── quickstart.mdx
│ │ ├── index.mdx
│ │ ├── macos-vm-cli-playbook
│ │ │ ├── lume
│ │ │ │ ├── cli-reference.mdx
│ │ │ │ ├── faq.md
│ │ │ │ ├── http-api.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── prebuilt-images.mdx
│ │ │ ├── lumier
│ │ │ │ ├── building-lumier.mdx
│ │ │ │ ├── docker-compose.mdx
│ │ │ │ ├── docker.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ └── meta.json
│ │ │ └── meta.json
│ │ └── meta.json
│ ├── next.config.mjs
│ ├── package-lock.json
│ ├── package.json
│ ├── pnpm-lock.yaml
│ ├── postcss.config.mjs
│ ├── public
│ │ └── img
│ │ ├── agent_gradio_ui.png
│ │ ├── agent.png
│ │ ├── bg-dark.jpg
│ │ ├── bg-light.jpg
│ │ ├── cli.png
│ │ ├── computer.png
│ │ ├── grounding-with-gemini3.gif
│ │ ├── hero.png
│ │ ├── laminar_trace_example.png
│ │ ├── som_box_threshold.png
│ │ └── som_iou_threshold.png
│ ├── README.md
│ ├── source.config.ts
│ ├── src
│ │ ├── app
│ │ │ ├── (home)
│ │ │ │ ├── [[...slug]]
│ │ │ │ │ └── page.tsx
│ │ │ │ └── layout.tsx
│ │ │ ├── api
│ │ │ │ ├── posthog
│ │ │ │ │ └── [...path]
│ │ │ │ │ └── route.ts
│ │ │ │ └── search
│ │ │ │ └── route.ts
│ │ │ ├── favicon.ico
│ │ │ ├── global.css
│ │ │ ├── layout.config.tsx
│ │ │ ├── layout.tsx
│ │ │ ├── llms.mdx
│ │ │ │ └── [[...slug]]
│ │ │ │ └── route.ts
│ │ │ ├── llms.txt
│ │ │ │ └── route.ts
│ │ │ ├── robots.ts
│ │ │ └── sitemap.ts
│ │ ├── assets
│ │ │ ├── discord-black.svg
│ │ │ ├── discord-white.svg
│ │ │ ├── logo-black.svg
│ │ │ └── logo-white.svg
│ │ ├── components
│ │ │ ├── analytics-tracker.tsx
│ │ │ ├── cookie-consent.tsx
│ │ │ ├── doc-actions-menu.tsx
│ │ │ ├── editable-code-block.tsx
│ │ │ ├── footer.tsx
│ │ │ ├── hero.tsx
│ │ │ ├── iou.tsx
│ │ │ ├── mermaid.tsx
│ │ │ └── page-feedback.tsx
│ │ ├── lib
│ │ │ ├── llms.ts
│ │ │ └── source.ts
│ │ ├── mdx-components.tsx
│ │ └── providers
│ │ └── posthog-provider.tsx
│ └── tsconfig.json
├── examples
│ ├── agent_examples.py
│ ├── agent_ui_examples.py
│ ├── browser_tool_example.py
│ ├── cloud_api_examples.py
│ ├── computer_examples_windows.py
│ ├── computer_examples.py
│ ├── computer_ui_examples.py
│ ├── computer-example-ts
│ │ ├── .env.example
│ │ ├── .gitignore
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── README.md
│ │ ├── src
│ │ │ ├── helpers.ts
│ │ │ └── index.ts
│ │ └── tsconfig.json
│ ├── docker_examples.py
│ ├── evals
│ │ ├── hud_eval_examples.py
│ │ └── wikipedia_most_linked.txt
│ ├── pylume_examples.py
│ ├── sandboxed_functions_examples.py
│ ├── som_examples.py
│ ├── tracing_examples.py
│ ├── utils.py
│ └── winsandbox_example.py
├── img
│ ├── agent_gradio_ui.png
│ ├── agent.png
│ ├── cli.png
│ ├── computer.png
│ ├── logo_black.png
│ └── logo_white.png
├── libs
│ ├── kasm
│ │ ├── Dockerfile
│ │ ├── LICENSE
│ │ ├── README.md
│ │ └── src
│ │ └── ubuntu
│ │ └── install
│ │ └── firefox
│ │ ├── custom_startup.sh
│ │ ├── firefox.desktop
│ │ └── install_firefox.sh
│ ├── lume
│ │ ├── .cursorignore
│ │ ├── CONTRIBUTING.md
│ │ ├── Development.md
│ │ ├── img
│ │ │ └── cli.png
│ │ ├── Package.resolved
│ │ ├── Package.swift
│ │ ├── README.md
│ │ ├── resources
│ │ │ └── lume.entitlements
│ │ ├── scripts
│ │ │ ├── build
│ │ │ │ ├── build-debug.sh
│ │ │ │ ├── build-release-notarized.sh
│ │ │ │ └── build-release.sh
│ │ │ └── install.sh
│ │ ├── src
│ │ │ ├── Commands
│ │ │ │ ├── Clone.swift
│ │ │ │ ├── Config.swift
│ │ │ │ ├── Create.swift
│ │ │ │ ├── Delete.swift
│ │ │ │ ├── Get.swift
│ │ │ │ ├── Images.swift
│ │ │ │ ├── IPSW.swift
│ │ │ │ ├── List.swift
│ │ │ │ ├── Logs.swift
│ │ │ │ ├── Options
│ │ │ │ │ └── FormatOption.swift
│ │ │ │ ├── Prune.swift
│ │ │ │ ├── Pull.swift
│ │ │ │ ├── Push.swift
│ │ │ │ ├── Run.swift
│ │ │ │ ├── Serve.swift
│ │ │ │ ├── Set.swift
│ │ │ │ └── Stop.swift
│ │ │ ├── ContainerRegistry
│ │ │ │ ├── ImageContainerRegistry.swift
│ │ │ │ ├── ImageList.swift
│ │ │ │ └── ImagesPrinter.swift
│ │ │ ├── Errors
│ │ │ │ └── Errors.swift
│ │ │ ├── FileSystem
│ │ │ │ ├── Home.swift
│ │ │ │ ├── Settings.swift
│ │ │ │ ├── VMConfig.swift
│ │ │ │ ├── VMDirectory.swift
│ │ │ │ └── VMLocation.swift
│ │ │ ├── LumeController.swift
│ │ │ ├── Main.swift
│ │ │ ├── Server
│ │ │ │ ├── Handlers.swift
│ │ │ │ ├── HTTP.swift
│ │ │ │ ├── Requests.swift
│ │ │ │ ├── Responses.swift
│ │ │ │ └── Server.swift
│ │ │ ├── Utils
│ │ │ │ ├── CommandRegistry.swift
│ │ │ │ ├── CommandUtils.swift
│ │ │ │ ├── Logger.swift
│ │ │ │ ├── NetworkUtils.swift
│ │ │ │ ├── Path.swift
│ │ │ │ ├── ProcessRunner.swift
│ │ │ │ ├── ProgressLogger.swift
│ │ │ │ ├── String.swift
│ │ │ │ └── Utils.swift
│ │ │ ├── Virtualization
│ │ │ │ ├── DarwinImageLoader.swift
│ │ │ │ ├── DHCPLeaseParser.swift
│ │ │ │ ├── ImageLoaderFactory.swift
│ │ │ │ └── VMVirtualizationService.swift
│ │ │ ├── VM
│ │ │ │ ├── DarwinVM.swift
│ │ │ │ ├── LinuxVM.swift
│ │ │ │ ├── VM.swift
│ │ │ │ ├── VMDetails.swift
│ │ │ │ ├── VMDetailsPrinter.swift
│ │ │ │ ├── VMDisplayResolution.swift
│ │ │ │ └── VMFactory.swift
│ │ │ └── VNC
│ │ │ ├── PassphraseGenerator.swift
│ │ │ └── VNCService.swift
│ │ └── tests
│ │ ├── Mocks
│ │ │ ├── MockVM.swift
│ │ │ ├── MockVMVirtualizationService.swift
│ │ │ └── MockVNCService.swift
│ │ ├── VM
│ │ │ └── VMDetailsPrinterTests.swift
│ │ ├── VMTests.swift
│ │ ├── VMVirtualizationServiceTests.swift
│ │ └── VNCServiceTests.swift
│ ├── lumier
│ │ ├── .dockerignore
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── bin
│ │ │ └── entry.sh
│ │ ├── config
│ │ │ └── constants.sh
│ │ ├── hooks
│ │ │ └── on-logon.sh
│ │ └── lib
│ │ ├── utils.sh
│ │ └── vm.sh
│ ├── python
│ │ ├── agent
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── agent
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── adapters
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── cua_adapter.py
│ │ │ │ │ ├── huggingfacelocal_adapter.py
│ │ │ │ │ ├── human_adapter.py
│ │ │ │ │ ├── mlxvlm_adapter.py
│ │ │ │ │ └── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ └── qwen2_5_vl.py
│ │ │ │ ├── agent.py
│ │ │ │ ├── callbacks
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── budget_manager.py
│ │ │ │ │ ├── image_retention.py
│ │ │ │ │ ├── logging.py
│ │ │ │ │ ├── operator_validator.py
│ │ │ │ │ ├── pii_anonymization.py
│ │ │ │ │ ├── prompt_instructions.py
│ │ │ │ │ ├── telemetry.py
│ │ │ │ │ └── trajectory_saver.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── computers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cua.py
│ │ │ │ │ └── custom.py
│ │ │ │ ├── decorators.py
│ │ │ │ ├── human_tool
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ ├── server.py
│ │ │ │ │ └── ui.py
│ │ │ │ ├── integrations
│ │ │ │ │ └── hud
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── agent.py
│ │ │ │ │ └── proxy.py
│ │ │ │ ├── loops
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── anthropic.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── composed_grounded.py
│ │ │ │ │ ├── gelato.py
│ │ │ │ │ ├── gemini.py
│ │ │ │ │ ├── generic_vlm.py
│ │ │ │ │ ├── glm45v.py
│ │ │ │ │ ├── gta1.py
│ │ │ │ │ ├── holo.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── model_types.csv
│ │ │ │ │ ├── moondream3.py
│ │ │ │ │ ├── omniparser.py
│ │ │ │ │ ├── openai.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ ├── uiins.py
│ │ │ │ │ ├── uitars.py
│ │ │ │ │ └── uitars2.py
│ │ │ │ ├── proxy
│ │ │ │ │ ├── examples.py
│ │ │ │ │ └── handlers.py
│ │ │ │ ├── responses.py
│ │ │ │ ├── tools
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── browser_tool.py
│ │ │ │ ├── types.py
│ │ │ │ └── ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ └── gradio
│ │ │ │ ├── __init__.py
│ │ │ │ ├── app.py
│ │ │ │ └── ui_components.py
│ │ │ ├── benchmarks
│ │ │ │ ├── .gitignore
│ │ │ │ ├── contrib.md
│ │ │ │ ├── interactive.py
│ │ │ │ ├── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ └── gta1.py
│ │ │ │ ├── README.md
│ │ │ │ ├── ss-pro.py
│ │ │ │ ├── ss-v2.py
│ │ │ │ └── utils.py
│ │ │ ├── example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer_agent.py
│ │ ├── bench-ui
│ │ │ ├── bench_ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api.py
│ │ │ │ └── child.py
│ │ │ ├── examples
│ │ │ │ ├── folder_example.py
│ │ │ │ ├── gui
│ │ │ │ │ ├── index.html
│ │ │ │ │ ├── logo.svg
│ │ │ │ │ └── styles.css
│ │ │ │ ├── output_overlay.png
│ │ │ │ └── simple_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ └── test_port_detection.py
│ │ ├── computer
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer
│ │ │ │ ├── __init__.py
│ │ │ │ ├── computer.py
│ │ │ │ ├── diorama_computer.py
│ │ │ │ ├── helpers.py
│ │ │ │ ├── interface
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ ├── models.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── logger.py
│ │ │ │ ├── models.py
│ │ │ │ ├── providers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cloud
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── docker
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── lume
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── lume_api.py
│ │ │ │ │ ├── lumier
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── types.py
│ │ │ │ │ └── winsandbox
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── provider.py
│ │ │ │ │ └── setup_script.ps1
│ │ │ │ ├── tracing_wrapper.py
│ │ │ │ ├── tracing.py
│ │ │ │ ├── ui
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ └── gradio
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── app.py
│ │ │ │ └── utils.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer.py
│ │ ├── computer-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── browser.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── diorama
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── diorama_computer.py
│ │ │ │ │ ├── diorama.py
│ │ │ │ │ ├── draw.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── safezone.py
│ │ │ │ ├── handlers
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── main.py
│ │ │ │ ├── server.py
│ │ │ │ ├── utils
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── wallpaper.py
│ │ │ │ └── watchdog.py
│ │ │ ├── examples
│ │ │ │ ├── __init__.py
│ │ │ │ └── usage_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ ├── run_server.py
│ │ │ ├── test_connection.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_server.py
│ │ ├── core
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── core
│ │ │ │ ├── __init__.py
│ │ │ │ └── telemetry
│ │ │ │ ├── __init__.py
│ │ │ │ └── posthog.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_telemetry.py
│ │ ├── mcp-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── build-extension.py
│ │ │ ├── CONCURRENT_SESSIONS.md
│ │ │ ├── desktop-extension
│ │ │ │ ├── cua-extension.mcpb
│ │ │ │ ├── desktop_extension.png
│ │ │ │ ├── manifest.json
│ │ │ │ ├── README.md
│ │ │ │ ├── requirements.txt
│ │ │ │ ├── run_server.sh
│ │ │ │ └── setup.py
│ │ │ ├── mcp_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── server.py
│ │ │ │ └── session_manager.py
│ │ │ ├── pdm.lock
│ │ │ ├── pyproject.toml
│ │ │ ├── QUICK_TEST_COMMANDS.sh
│ │ │ ├── quick_test_local_option.py
│ │ │ ├── README.md
│ │ │ ├── scripts
│ │ │ │ ├── install_mcp_server.sh
│ │ │ │ └── start_mcp_server.sh
│ │ │ ├── test_mcp_server_local_option.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_mcp_server.py
│ │ ├── pylume
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_pylume.py
│ │ └── som
│ │ ├── .bumpversion.cfg
│ │ ├── LICENSE
│ │ ├── poetry.toml
│ │ ├── pyproject.toml
│ │ ├── README.md
│ │ ├── som
│ │ │ ├── __init__.py
│ │ │ ├── detect.py
│ │ │ ├── detection.py
│ │ │ ├── models.py
│ │ │ ├── ocr.py
│ │ │ ├── util
│ │ │ │ └── utils.py
│ │ │ └── visualization.py
│ │ └── tests
│ │ ├── conftest.py
│ │ └── test_omniparser.py
│ ├── qemu-docker
│ │ ├── linux
│ │ │ ├── Dockerfile
│ │ │ ├── README.md
│ │ │ └── src
│ │ │ ├── entry.sh
│ │ │ └── vm
│ │ │ ├── image
│ │ │ │ └── README.md
│ │ │ └── setup
│ │ │ ├── install.sh
│ │ │ ├── setup-cua-server.sh
│ │ │ └── setup.sh
│ │ ├── README.md
│ │ └── windows
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── entry.sh
│ │ └── vm
│ │ ├── image
│ │ │ └── README.md
│ │ └── setup
│ │ ├── install.bat
│ │ ├── on-logon.ps1
│ │ ├── setup-cua-server.ps1
│ │ ├── setup-utils.psm1
│ │ └── setup.ps1
│ ├── typescript
│ │ ├── .gitignore
│ │ ├── .nvmrc
│ │ ├── agent
│ │ │ ├── examples
│ │ │ │ ├── playground-example.html
│ │ │ │ └── README.md
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── client.ts
│ │ │ │ ├── index.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ └── client.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── computer
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── computer
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── providers
│ │ │ │ │ │ ├── base.ts
│ │ │ │ │ │ ├── cloud.ts
│ │ │ │ │ │ └── index.ts
│ │ │ │ │ └── types.ts
│ │ │ │ ├── index.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── base.ts
│ │ │ │ │ ├── factory.ts
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── linux.ts
│ │ │ │ │ ├── macos.ts
│ │ │ │ │ └── windows.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ ├── computer
│ │ │ │ │ └── cloud.test.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── factory.test.ts
│ │ │ │ │ ├── index.test.ts
│ │ │ │ │ ├── linux.test.ts
│ │ │ │ │ ├── macos.test.ts
│ │ │ │ │ └── windows.test.ts
│ │ │ │ └── setup.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── core
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── index.ts
│ │ │ │ └── telemetry
│ │ │ │ ├── clients
│ │ │ │ │ ├── index.ts
│ │ │ │ │ └── posthog.ts
│ │ │ │ └── index.ts
│ │ │ ├── tests
│ │ │ │ └── telemetry.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── cua-cli
│ │ │ ├── .gitignore
│ │ │ ├── .prettierrc
│ │ │ ├── bun.lock
│ │ │ ├── CLAUDE.md
│ │ │ ├── index.ts
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── auth.ts
│ │ │ │ ├── cli.ts
│ │ │ │ ├── commands
│ │ │ │ │ ├── auth.ts
│ │ │ │ │ └── sandbox.ts
│ │ │ │ ├── config.ts
│ │ │ │ ├── http.ts
│ │ │ │ ├── storage.ts
│ │ │ │ └── util.ts
│ │ │ └── tsconfig.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── pnpm-workspace.yaml
│ │ └── README.md
│ └── xfce
│ ├── .dockerignore
│ ├── .gitignore
│ ├── Development.md
│ ├── Dockerfile
│ ├── Dockerfile.dev
│ ├── README.md
│ └── src
│ ├── scripts
│ │ ├── resize-display.sh
│ │ ├── start-computer-server.sh
│ │ ├── start-novnc.sh
│ │ ├── start-vnc.sh
│ │ └── xstartup.sh
│ ├── supervisor
│ │ └── supervisord.conf
│ └── xfce-config
│ ├── helpers.rc
│ ├── xfce4-power-manager.xml
│ └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│ ├── agent_nb.ipynb
│ ├── blog
│ │ ├── build-your-own-operator-on-macos-1.ipynb
│ │ └── build-your-own-operator-on-macos-2.ipynb
│ ├── composite_agents_docker_nb.ipynb
│ ├── computer_nb.ipynb
│ ├── computer_server_nb.ipynb
│ ├── customizing_computeragent.ipynb
│ ├── eval_osworld.ipynb
│ ├── ollama_nb.ipynb
│ ├── README.md
│ ├── sota_hackathon_cloud.ipynb
│ └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── install-cli.ps1
│ ├── install-cli.sh
│ ├── playground-docker.sh
│ ├── playground.sh
│ ├── run-docker-dev.sh
│ └── typescript-typecheck.js
├── TESTING.md
├── tests
│ ├── agent_loop_testing
│ │ ├── agent_test.py
│ │ └── README.md
│ ├── pytest.ini
│ ├── shell_cmd.py
│ ├── test_files.py
│ ├── test_mcp_server_session_management.py
│ ├── test_mcp_server_streaming.py
│ ├── test_shell_bash.py
│ ├── test_telemetry.py
│ ├── test_tracing.py
│ ├── test_venv.py
│ └── test_watchdog.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/libs/python/computer/computer/ui/gradio/app.py:
--------------------------------------------------------------------------------
```python
"""
Advanced Gradio UI for Computer Interface
This is a Gradio interface for the Computer Interface
"""
import asyncio
import base64
import glob
import hashlib
import io
import json
import os
import random
import random as rand
import uuid
from datetime import datetime
import datasets
import gradio as gr
import pandas as pd
from computer import Computer, VMProviderType
from datasets import Dataset, Features, Sequence, concatenate_datasets
from gradio.components import ChatMessage
from huggingface_hub import DatasetCard, DatasetCardData
from PIL import Image
# Task examples as dictionaries with task string and setup function
TASK_EXAMPLES = [
    {
        # Task prompt shown in the UI; "setup" is awaited with the Computer
        # instance to prepare the VM before the task starts.
        "task": "Open the shopping list on my desktop and add all the items to a Doordash cart",
        "setup": lambda computer: create_shopping_list_file(computer),
    },
    {
        # No "setup" key: this task needs no VM preparation.
        "task": "Do a random miniwob++ task, output the task name in <task> </task> tags and your reward in <reward> </reward> tags"
    },
]
# Generate random shopping list and save to desktop using computer interface
async def create_shopping_list_file(computer):
items = [
"Milk",
"Eggs",
"Bread",
"Apples",
"Bananas",
"Chicken",
"Rice",
"Cereal",
"Coffee",
"Cheese",
"Pasta",
"Tomatoes",
"Potatoes",
"Onions",
"Carrots",
"Ice Cream",
"Yogurt",
"Cookies",
]
# Select 1-5 random items
num_items = rand.randint(1, 5)
selected_items = rand.sample(items, num_items)
# Create shopping list content
content = "SHOPPING LIST:\n\n"
for item in selected_items:
content += f"- {item}\n"
# Create a temporary file with the content
temp_file_path = "/tmp/shopping_list.txt"
# Use run_command to create the file on the desktop
desktop_path = "~/Desktop"
file_path = f"{desktop_path}/shopping_list.txt"
# Create the file using echo command
cmd = f"echo '{content}' > {file_path}"
stdout, stderr = await computer.interface.run_command(cmd)
print(f"Created shopping list at {file_path} with {num_items} items")
if stderr:
print(f"Error: {stderr}")
return file_path
import typing

# Load valid keys from the Key enum in models.py
from computer.interface.models import Key

# Accept every enum-defined key name plus the single-character keys
# (lowercase letters and digits).
VALID_KEYS = [key.value for key in Key] + list("abcdefghijklmnopqrstuvwxyz0123456789")
# Drop duplicates while keeping first-seen order.
VALID_KEYS = list(dict.fromkeys(VALID_KEYS))
# Pool of simple words used to build human-readable demo names.
RANDOM_WORDS = [
    "apple", "banana", "cherry", "dolphin", "elephant", "forest",
    "giraffe", "harmony", "igloo", "jungle", "kangaroo", "lemon",
    "mountain", "notebook", "ocean", "penguin", "quasar", "rainbow",
    "ohana", "sunflower", "tiger", "umbrella", "volcano", "waterfall",
    "xylophone", "yellow", "zebra",
]


def generate_random_demo_name():
    """Return a demo name composed of three distinct random words."""
    return " ".join(random.sample(RANDOM_WORDS, 3))
# Global session ID for tracking this run
session_id = str(uuid.uuid4())

# Global computer instance, tool call logs, memory, and chatbot messages
computer = None  # active Computer instance; None until initialized
tool_call_logs = []  # ordered list of tool-call log dicts for this session
memory = ""  # free-form scratchpad text carried across turns
last_action = {"name": "", "action": "", "arguments": {}}  # most recent tool call (for the reasoning box)
last_screenshot = None  # Store the most recent screenshot
last_screenshot_before = None  # Store the most [-2]th recent screenshot
screenshot_images = []  # Array to store all screenshot images

# Define a constant for the output directory
OUTPUT_DIR = "examples/output"
# Per-session HuggingFace datasets are saved under this directory.
SESSION_DIR = os.path.join(OUTPUT_DIR, "sessions")
def load_all_sessions(with_images=False):
    """Load and concatenate all session datasets into a single Dataset.

    Scans every folder under SESSION_DIR, loads each as a HuggingFace
    Dataset, adds a "source_folder" column and a flattened "messages"
    transcript derived from the stored tool-call log, then concatenates
    all of them.

    Args:
        with_images: When False (default), the heavy "images" column is
            dropped from each dataset before processing.

    Returns:
        The concatenated Dataset, or None when SESSION_DIR is missing,
        empty, or no dataset could be loaded.
    """
    try:
        # Get all session folders
        if not os.path.exists(SESSION_DIR):
            return None
        session_folders = glob.glob(os.path.join(SESSION_DIR, "*"))
        if not session_folders:
            return None
        # Load each dataset and concatenate
        all_datasets = []
        for folder in session_folders:
            try:
                ds = Dataset.load_from_disk(folder)
                if not with_images:
                    ds = ds.remove_columns("images")
                # Add folder name to identify the source
                folder_name = os.path.basename(folder)

                # Process the messages from tool_call_logs
                def process_messages(example):
                    # Builds a plain-text transcript for one dataset row:
                    # a role header line whenever the speaker changes,
                    # followed by each message's title (if any) or content.
                    messages_text = []
                    current_role = None
                    # Process the logs if they exist in the example
                    if "tool_calls" in example:
                        # Use the existing get_chatbot_messages function with explicit logs parameter
                        formatted_msgs = get_chatbot_messages(
                            logs=json.loads(example["tool_calls"])
                        )
                        # Process each ChatMessage and extract either title or content
                        for msg in formatted_msgs:
                            # Check if role has changed
                            if msg.role != current_role:
                                # Add a line with the new role if it changed
                                if current_role is not None:  # Skip for the first message
                                    messages_text.append(
                                        ""
                                    )  # Add an empty line between role changes
                                messages_text.append(f"{msg.role}")
                                current_role = msg.role
                            # Add the message content
                            if msg.metadata and "title" in msg.metadata:
                                # Use the title if available
                                messages_text.append(msg.metadata["title"])
                            else:
                                # Use just the content without role prefix since we're adding role headers
                                messages_text.append(msg.content)
                    # Join all messages with newlines
                    all_messages = "\n".join(messages_text)
                    return {
                        **example,
                        "source_folder": folder_name,
                        "messages": all_messages,
                    }

                # Apply the processing to each example
                ds = ds.map(process_messages)
                all_datasets.append(ds)
            except Exception as e:
                # Best-effort: a corrupt session folder is skipped, not fatal.
                print(f"Error loading dataset from {folder}: {str(e)}")
        if not all_datasets:
            return None
        # Concatenate all datasets
        return concatenate_datasets(all_datasets)
    except Exception as e:
        print(f"Error loading sessions: {str(e)}")
        return None
def get_existing_tags():
    """Extract all existing tags from saved demonstrations.

    Returns:
        A pair ``(choices, value)`` — the same sorted, de-duplicated tag
        list twice (suitable for seeding a Gradio component's choices and
        value). Both elements are empty lists when there are no sessions
        or no tags.
    """
    all_sessions = load_all_sessions()
    if all_sessions is None:
        return [], []
    # Convert to pandas and extract tags
    df = all_sessions.to_pandas()
    if "tags" not in df.columns:
        # Bug fix: this path previously returned a bare [], while every
        # other path returns a 2-tuple — callers unpacking two values
        # would break.
        return [], []
    # Flatten every row's tag list into a single list.
    all_tags = []
    for tags in df["tags"].dropna():
        all_tags += list(tags)
    # Remove duplicates and sort
    unique_tags = sorted(set(all_tags))
    return unique_tags, unique_tags
def get_sessions_data():
    """Return all saved sessions as a pandas DataFrame for display."""
    combined_ds = load_all_sessions()
    if not combined_ds:
        # Nothing saved yet: hand back a single blank placeholder row.
        return pd.DataFrame({"name": [""], "messages": [""], "source_folder": [""]})
    frame = combined_ds.to_pandas()
    selected = ["name", "messages", "source_folder"]
    if "tags" in frame.columns:
        selected.append("tags")
    return frame[selected]
def upload_to_huggingface(dataset_name, visibility, filter_tags=None):
    """Upload sessions to HuggingFace Datasets Hub, optionally filtered by tags

    Args:
        dataset_name: Name of the dataset on HuggingFace (format: username/dataset-name)
        visibility: 'public' or 'private'
        filter_tags: List of tags to filter by (optional)

    Returns:
        Status message
    """
    try:
        # Check if HF_TOKEN is available
        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            return "Error: HF_TOKEN environment variable not found. Please set it before uploading."
        # Check if dataset name is in the correct format
        if not dataset_name or "/" not in dataset_name:
            return "Dataset name must be in the format 'username/dataset-name'"
        # Load all sessions (images included, since they are part of the upload)
        combined_ds = load_all_sessions(with_images=True)
        if combined_ds is None or len(combined_ds) == 0:
            return "No sessions found to upload."
        # If tag filtering is provided, filter the datasets
        if filter_tags:
            # Convert to pandas to filter
            df = combined_ds.to_pandas()
            if "tags" not in df.columns:
                return "No sessions with tags found to filter."
            # Get list of source folders for sessions that have any of the selected tags
            matching_folders = []
            for _, row in df.iterrows():
                row_tags = row.get("tags")
                # Bug fix: guard against a missing/None tags value before
                # calling len() — the original `len(row.get("tags"))`
                # raised TypeError for rows without tags.
                if row_tags is None or not len(row_tags):
                    continue
                if any(tag in list(row_tags) for tag in filter_tags):
                    matching_folders.append(row["source_folder"])
            if not matching_folders:
                return "No sessions matched the selected tag filters."
            # Load only the matching datasets
            filtered_datasets = []
            for folder in matching_folders:
                folder_path = os.path.join(SESSION_DIR, folder)
                if os.path.exists(folder_path):
                    try:
                        ds = Dataset.load_from_disk(folder_path)
                        filtered_datasets.append(ds)
                    except Exception as e:
                        print(f"Error loading dataset from {folder}: {str(e)}")
            if not filtered_datasets:
                return "Error loading the filtered sessions."
            # Create a new combined dataset with just the filtered sessions
            upload_ds = concatenate_datasets(filtered_datasets)
            session_count = len(upload_ds)
        else:
            # Use all sessions
            upload_ds = combined_ds
            session_count = len(upload_ds)
        # Every upload is tagged "cua"; user filter tags are appended.
        tags = ["cua"]
        if isinstance(filter_tags, list):
            tags += filter_tags
        # Push to HuggingFace
        upload_ds.push_to_hub(
            dataset_name,
            private=visibility == "private",
            token=hf_token,
            commit_message="(Built with github.com/trycua/cua)",
        )
        # Create dataset card
        card_data = DatasetCardData(
            language="en", license="mit", task_categories=["visual-question-answering"], tags=tags
        )
        card = DatasetCard.from_template(
            card_data=card_data,
            template_str="---\n{{ card_data }}\n---\n\n# Uploaded computer interface trajectories\n\nThese trajectories were generated and uploaded using [cua](https://github.com/trycua/cua)",
        )
        card.push_to_hub(dataset_name, commit_message="Cua dataset card")
        return f"Successfully uploaded {session_count} sessions to HuggingFace Datasets Hub at https://huggingface.co/datasets/{dataset_name}"
    except Exception as e:
        return f"Error uploading to HuggingFace: {str(e)}"
def save_demonstration(log_data, demo_name=None, demo_tags=None):
    """Save the current tool call logs as a demonstration using HuggingFace datasets.

    Args:
        log_data: Unused; the function reads the module-level
            ``tool_call_logs`` instead (kept for interface compatibility).
        demo_name: Optional human-readable name; a random three-word name
            is generated when empty.
        demo_tags: Tags as a list, or a comma-separated string.

    Returns:
        A status string describing where the session was saved or why it
        failed.
        NOTE(review): the "No data to save" early exit returns a 2-tuple
        while every other path returns a plain string — preserved as-is
        for backward compatibility; confirm how callers unpack this.
    """
    global tool_call_logs, session_id
    if not tool_call_logs:
        return "No data to save", None
    # Create output directories (exist_ok avoids the check-then-create
    # race of the original exists()/makedirs() pairs).
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    os.makedirs(SESSION_DIR, exist_ok=True)
    # Use default name if none provided
    if not demo_name or demo_name.strip() == "":
        demo_name = generate_random_demo_name()
    # Normalize tags into a list of non-empty strings.
    tags = []
    if demo_tags:
        if isinstance(demo_tags, list):
            tags = demo_tags
        elif isinstance(demo_tags, str):
            # Split by comma if it's a comma-separated string
            tags = [tag.strip() for tag in demo_tags.split(",") if tag.strip()]
    log_time = datetime.now().isoformat()

    def msg_to_dict(msg: ChatMessage):
        # Flatten a ChatMessage into JSON-serializable primitives.
        return {"role": msg.role, "content": str(msg.content), "metadata": dict(msg.metadata)}

    # Single-row dataset describing this session.
    demonstration_dataset = [
        {
            "timestamp": str(log_time),
            "session_id": str(session_id),
            "name": str(demo_name),
            "tool_calls": json.dumps(tool_call_logs),
            "messages": json.dumps(
                [msg_to_dict(msg) for msg in get_chatbot_messages(tool_call_logs)]
            ),
            "tags": list(tags),
            "images": [Image.open(io.BytesIO(img)) for img in screenshot_images],
        }
    ]
    try:
        # Create a new HuggingFace dataset from the current session
        new_session_ds = Dataset.from_list(
            demonstration_dataset,
            features=Features(
                {
                    "timestamp": datasets.Value("string"),
                    "session_id": datasets.Value("string"),
                    "name": datasets.Value("string"),
                    "tool_calls": datasets.Value("string"),
                    "messages": datasets.Value("string"),
                    "tags": Sequence(datasets.Value("string")),
                    "images": Sequence(datasets.Image()),
                }
            ),
        )
        # Unique folder name: sanitized demo name + session ID + timestamp.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_name = demo_name.replace(" ", "_").replace("/", "_").replace("\\", "_")[:50]
        session_folder = os.path.join(SESSION_DIR, f"{safe_name}_{session_id}_{timestamp}")
        os.makedirs(session_folder, exist_ok=True)
        # Save the dataset to the unique folder
        new_session_ds.save_to_disk(session_folder)
        return f"Session saved to {session_folder}"
    except Exception as e:
        return f"Error saving demonstration: {str(e)}"
def log_tool_call(name, action, arguments, result=None):
    """Record a tool invocation in the global tool-call log.

    Binary payloads in ``result`` are replaced with MD5 placeholders so the
    log stays JSON-serializable; screenshots are additionally stashed in
    ``screenshot_images`` and their index is recorded in the placeholder.

    Args:
        name: Tool name (e.g. "computer" or "message").
        action: The action performed by the tool.
        arguments: Mapping of action arguments; merged with the action itself.
        result: Optional mapping of raw results produced by the action.

    Returns:
        The dict that was appended to ``tool_call_logs``.
    """
    global tool_call_logs

    def _sanitize(key, value):
        # Screenshots: keep the raw bytes aside, log an MD5 + index placeholder.
        if key == "screenshot" and isinstance(value, bytes):
            index = len(screenshot_images)
            screenshot_images.append(value)
            digest = hashlib.md5(value).hexdigest()
            return f"<Screenshot: MD5 {digest}:{index}>"
        # Clipboard text is stored verbatim.
        if key == "clipboard" and isinstance(value, str):
            return value
        # Any other binary blob is reduced to a hash placeholder.
        if isinstance(value, bytes):
            digest = hashlib.md5(value).hexdigest()
            return f"<Binary data: MD5 {digest}>"
        return value

    processed_result = {}
    if result:
        processed_result = {key: _sanitize(key, value) for key, value in result.items()}

    log_entry = {
        "type": "function_call",
        "name": name,
        # The action travels inside the serialized arguments payload.
        "arguments": json.dumps({"action": action, **arguments}),
        "result": processed_result if result else None,
    }
    tool_call_logs.append(log_entry)
    # Print immediately so the log is flushed to stdout as it happens.
    print(f"Tool call logged: {json.dumps(log_entry)}")
    return log_entry
async def execute(name, action, arguments):
    """Execute a tool call, log it, and return any results.

    Dispatches on (name, action): ``name == "computer"`` drives the computer
    interface, ``name == "message"`` only records a conversational message.
    Every call is logged via log_tool_call; most computer actions also attach
    a fresh screenshot to the results (converted to a PIL Image on return).

    Args:
        name: Tool name ("computer" or "message").
        action: Action identifier (e.g. "left_click", "type_text", "wait").
        arguments: Dict of action-specific arguments.

    Returns:
        Dict of results; includes "screenshot" (PIL Image) for most computer
        actions. Returns {} immediately when the computer is not initialized.
    """
    global computer, last_action, last_screenshot, last_screenshot_before
    # Keep the previous screenshot so callers can show the "before" state.
    last_screenshot_before = last_screenshot
    # Store last action for reasoning box
    last_action = {"name": name, "action": action, "arguments": arguments}
    results = {}
    # Execute the action based on name and action
    if name == "computer":
        if computer is None:
            return {}
        # Get the method from the computer interface
        if action == "initialize":
            # Already initialized, just log
            pass
        elif action == "wait":
            # Wait for 1 second
            await asyncio.sleep(1)
        elif action == "screenshot":
            # Screenshot is captured below for all non-shutdown actions.
            pass
        elif action == "move_cursor":
            await computer.interface.move_cursor(arguments["x"], arguments["y"])
            await asyncio.sleep(0.2)
        elif action == "left_click":
            # Coordinates are required; the click is silently skipped otherwise.
            if "x" in arguments and "y" in arguments:
                await computer.interface.move_cursor(arguments["x"], arguments["y"])
                await computer.interface.left_click(arguments["x"], arguments["y"])
                await asyncio.sleep(0.5)
        elif action == "right_click":
            if "x" in arguments and "y" in arguments:
                await computer.interface.move_cursor(arguments["x"], arguments["y"])
                await computer.interface.right_click(arguments["x"], arguments["y"])
                await asyncio.sleep(0.5)
        elif action == "double_click":
            if "x" in arguments and "y" in arguments:
                await computer.interface.move_cursor(arguments["x"], arguments["y"])
                await computer.interface.double_click(arguments["x"], arguments["y"])
                await asyncio.sleep(0.5)
        elif action == "type_text":
            await computer.interface.type_text(arguments["text"])
            await asyncio.sleep(0.3)
            # Optionally follow the text with an Enter keypress.
            if "press_enter" in arguments and arguments["press_enter"]:
                await computer.interface.press_key("enter")
        elif action == "press_key":
            await computer.interface.press_key(arguments["key"])
            await asyncio.sleep(0.3)
        elif action == "scroll_up":
            await computer.interface.scroll_up(arguments["clicks"])
            await asyncio.sleep(0.3)
        elif action == "scroll_down":
            await computer.interface.scroll_down(arguments["clicks"])
            await asyncio.sleep(0.3)
        elif action == "send_hotkey":
            # All selected keys are passed as one combination.
            await computer.interface.hotkey(*arguments.get("keys", []))
            await asyncio.sleep(0.3)
        elif action == "copy_to_clipboard":
            results["clipboard"] = await computer.interface.copy_to_clipboard()
        elif action == "set_clipboard":
            await computer.interface.set_clipboard(arguments["text"])
        elif action == "run_command":
            stdout, stderr = await computer.interface.run_command(arguments["command"])
            results["stdout"] = stdout
            results["stderr"] = stderr
        elif action == "shutdown":
            await computer.stop()
        elif action == "done" or action == "fail":
            # Just a marker, doesn't do anything
            pass
        # Add a screenshot to the results for every action (if not already there)
        if action != "shutdown" and "screenshot" not in results:
            results["screenshot"] = await computer.interface.screenshot()
    elif name == "message":
        if action == "submit":
            # No action needed for message submission except logging
            # If requested, take a screenshot after message
            if arguments.get("screenshot_after", False) and computer is not None:
                results["screenshot"] = await computer.interface.screenshot()
    # Log the tool call with results
    log_tool_call(name, action, arguments, results)
    if "screenshot" in results:
        # Convert bytes to PIL Image
        screenshot_img = Image.open(io.BytesIO(results["screenshot"]))
        results["screenshot"] = screenshot_img
        # Update last_screenshot with the new screenshot
        last_screenshot = screenshot_img
    return results
async def handle_init_computer(
    os_choice: str, app_list=None, provider="lume", container_name=None, api_key=None
):
    """Create and start the global Computer instance.

    Args:
        os_choice: Target OS ("macOS", "Ubuntu" or "Windows").
        app_list: Optional list of apps to scope the desktop to (app-use experiment).
        provider: Backend to use ("lume", "self", "cloud" or "winsandbox").
        container_name: Container name, only used with the "cloud" provider.
        api_key: API key override, only used with the "cloud" provider.

    Returns:
        Tuple of (initial screenshot, pretty-printed tool-call log JSON).
    """
    global computer, tool_call_logs, tools

    # App-use experiment is enabled whenever a non-empty app list is given.
    use_app_experiment = bool(app_list)
    experiments = ["app-use"] if use_app_experiment else None
    use_host_computer_server = provider == "self"

    # Map the UI label to the provider-level OS type and default VM image.
    os_images = {
        "Ubuntu": ("linux", "ubuntu-noble-vanilla:latest"),
        "Windows": ("windows", "windows-11-vanilla:latest"),
    }
    os_type_str, image_str = os_images.get(os_choice, ("macos", "macos-sequoia-cua:latest"))

    if use_host_computer_server:
        # "self": drive the host machine's computer server, no VM involved.
        computer = Computer(
            os_type=os_type_str, use_host_computer_server=True, experiments=experiments
        )
    elif provider == "cloud":
        # Environment variable takes precedence over the UI-supplied key.
        cloud_api_key = os.environ.get("CUA_API_KEY") or api_key
        computer = Computer(
            os_type=os_type_str,
            provider_type=VMProviderType.CLOUD,
            name=container_name,
            api_key=cloud_api_key,
            experiments=experiments,
        )
    elif provider == "winsandbox":
        computer = Computer(
            os_type="windows", provider_type=VMProviderType.WINSANDBOX, experiments=experiments
        )
    else:
        # Default: local lume VM with a fixed display/memory/cpu profile.
        computer = Computer(
            image=image_str,
            os_type=os_type_str,
            provider_type=VMProviderType.LUME,
            display="1024x768",
            memory="8GB",
            cpu="4",
            experiments=experiments,
        )

    await computer.run()

    # Narrow the desktop to the requested apps when the experiment is enabled.
    if use_app_experiment:
        computer = computer.create_desktop_from_apps(app_list)

    # Record the initialization itself as a tool call.
    init_params = {"os": os_type_str, "provider": provider}
    if not use_host_computer_server:
        init_params.update({"image": image_str, "display": "1024x768", "memory": "8GB", "cpu": "4"})
    if use_app_experiment:
        init_params["apps"] = app_list
        init_params["experiments"] = ["app-use"]
    if provider == "cloud":
        init_params["container_name"] = container_name

    result = await execute("computer", "initialize", init_params)
    return result["screenshot"], json.dumps(tool_call_logs, indent=2)
async def handle_screenshot():
    """Capture the current screen.

    Returns:
        The screenshot as a PIL Image, or None when no computer is running.
    """
    global computer
    if computer is None:
        return None
    return (await execute("computer", "screenshot", {}))["screenshot"]
async def handle_wait():
    """Pause for one second, then return a fresh screenshot and the log JSON.

    Returns:
        Tuple of (screenshot, log JSON), or None when no computer is running.
    """
    global computer
    if computer is None:
        return None
    outcome = await execute("computer", "wait", {})
    return outcome["screenshot"], json.dumps(tool_call_logs, indent=2)
async def handle_click(evt: gr.SelectData, img, click_type):
    """React to a click on the screenshot image.

    Args:
        evt: Gradio selection event carrying the clicked pixel coordinates.
        img: The currently displayed image; returned unchanged if uninitialized.
        click_type: "left_click", "right_click", "double_click" or "move_cursor".

    Returns:
        Tuple of (updated screenshot, pretty-printed tool-call log JSON).
    """
    global computer
    if computer is None:
        return img, json.dumps(tool_call_logs, indent=2)
    # The event index holds the (x, y) pixel position of the click.
    x, y = evt.index
    outcome = await execute("computer", click_type, {"x": x, "y": y})
    return outcome["screenshot"], json.dumps(tool_call_logs, indent=2)
async def handle_type(text, press_enter=False):
    """Type text into the computer, optionally pressing Enter afterwards.

    Returns:
        Tuple of (screenshot, tool-call log JSON).
    """
    global computer
    if computer is None or not text:
        # Nothing to type: just refresh the screenshot and return the log.
        return await handle_screenshot(), json.dumps(tool_call_logs, indent=2)
    outcome = await execute("computer", "type_text", {"text": text, "press_enter": press_enter})
    return outcome["screenshot"], json.dumps(tool_call_logs, indent=2)
async def handle_copy():
    """Read the computer's clipboard contents.

    Returns:
        Tuple of (clipboard text or status message, tool-call log JSON).
    """
    global computer
    if computer is None:
        return "Computer not initialized", json.dumps(tool_call_logs, indent=2)
    outcome = await execute("computer", "copy_to_clipboard", {})
    return outcome.get("clipboard", "No content copied"), json.dumps(tool_call_logs, indent=2)
async def handle_set_clipboard(text):
    """Write text to the computer's clipboard.

    Returns:
        Tuple of (status message, tool-call log JSON).
    """
    global computer
    if computer is None:
        return "Computer not initialized", json.dumps(tool_call_logs, indent=2)
    await execute("computer", "set_clipboard", {"text": text})
    return f"Clipboard set to: {text}", json.dumps(tool_call_logs, indent=2)
async def handle_run_command(command):
    """Run a shell command on the computer and format its output.

    Args:
        command: The shell command line to execute.

    Returns:
        Tuple of (formatted stdout/stderr text, tool-call log JSON).
    """
    global computer
    if computer is None:
        return "Computer not initialized", json.dumps(tool_call_logs, indent=2)
    # Execute the run_command action (this also logs it).
    outcome = await execute("computer", "run_command", {"command": command})
    stdout = outcome.get("stdout")
    stderr = outcome.get("stderr")
    # Assemble the visible report from whichever streams produced output.
    sections = []
    if stdout:
        sections.append(f"STDOUT:\n{stdout}\n")
    if stderr:
        sections.append(f"STDERR:\n{stderr}\n")
    report = "".join(sections) or "(No output)"
    return report, json.dumps(tool_call_logs, indent=2)
async def handle_shutdown():
    """Stop the computer instance and clear the global reference.

    Returns:
        Tuple of (status message, tool-call log JSON).
    """
    global computer
    status = "Computer not initialized"
    if computer is not None:
        await execute("computer", "shutdown", {})
        computer = None
        status = "Computer shut down"
    return status, json.dumps(tool_call_logs, indent=2)
async def handle_memory(memory_text):
    """Replace the global memory/scratchpad text.

    The update is recorded through execute() so it shows up in the logs
    before the new value is committed.
    """
    global memory
    await execute("memory", "update", {"memory_text": memory_text})
    memory = memory_text
    return "Memory updated"
async def update_reasoning(reasoning_text, is_erroneous=False):
    """Attach reasoning (and an erroneous flag) to the most recent matching action.

    Scans the tool-call log from newest to oldest for the entry matching the
    globally tracked last action, then annotates it in place.

    Args:
        reasoning_text: The operator's explanation for the action.
        is_erroneous: When True, the entry's training weight is set to 0.

    Returns:
        A human-readable status string.
    """
    global last_action, tool_call_logs
    if not last_action["name"]:
        return "No action to update reasoning for"
    # Walk the log backwards so we annotate the most recent occurrence.
    for log_entry in reversed(tool_call_logs):
        if (
            log_entry["name"] == last_action["name"]
            and json.loads(log_entry["arguments"]).get("action") == last_action["action"]
        ):
            log_entry["reasoning"] = reasoning_text
            # Erroneous demonstrations are kept but weighted out of training.
            log_entry["weight"] = 0 if is_erroneous else 1
            break
    else:
        # Bug fix: previously this fell through silently and still reported
        # success even when no log entry matched the last action.
        return "No matching action found in log"
    return "Reasoning updated"
async def clear_log():
    """Reset the tool-call log and the captured screenshot buffer.

    Returns:
        The (now empty) log serialized as pretty-printed JSON.
    """
    global tool_call_logs, screenshot_images
    tool_call_logs, screenshot_images = [], []
    return json.dumps(tool_call_logs, indent=2)
def get_last_action_display():
    """Render the most recent action as a multi-line string for the reasoning panel."""
    global last_action
    if not last_action["name"]:
        return "No actions performed yet"
    parts = [f"Tool: {last_action['name']}\nAction: {last_action['action']}"]
    if last_action["arguments"]:
        # Append one indented "key: value" line per argument.
        parts.append("\nArguments:\n")
        parts.extend(f"  {k}: {v}\n" for k, v in last_action["arguments"].items())
    return "".join(parts)
def get_memory():
    """Return the current contents of the global memory/scratchpad."""
    # Reading a module global needs no `global` declaration.
    return memory
# Display titles for tool calls, keyed by "name.action" or bare action.
# Hoisted to module level: previously this dict was rebuilt for every log
# entry on every refresh (loop-invariant work inside the loop).
_TITLE_MAPPINGS = {
    "wait": "⏳ Waiting...",
    "done": "✅ Task Completed",
    "fail": "❌ Task Failed",
    "memory.update": "🧠 Memory Updated",
    "screenshot": "📸 Taking Screenshot",
    "move_cursor": "🖱️ Moving Cursor",
    "left_click": "🖱️ Left Click",
    "right_click": "🖱️ Right Click",
    "double_click": "🖱️ Double Click",
    "type_text": "⌨️ Typing Text",
    "press_key": "⌨️ Pressing Key",
    "send_hotkey": "⌨️ Sending Hotkey",
    "copy_to_clipboard": "📋 Copying to Clipboard",
    "set_clipboard": "📋 Setting Clipboard",
    "run_command": "🖥️ Running Shell Command",
    "initialize": "🚀 Initializing Computer",
    "shutdown": "🛑 Shutting Down",
}


def get_chatbot_messages(logs=None):
    """Format tool-call logs as chat messages for the gr.Chatbot component.

    Args:
        logs: Optional list of tool-call log dicts. If None, uses the global
            tool_call_logs.

    Returns:
        List of ChatMessage objects: one per reasoning trace, conversational
        message, and tool call found in the logs.
    """
    formatted_messages = []
    logs_to_process = logs if logs is not None else tool_call_logs
    for tool_call in logs_to_process:
        if tool_call["type"] != "function_call":
            continue
        name = tool_call["name"]
        arguments = json.loads(tool_call["arguments"])
        # Role precedence: explicit on the log entry, then inside the
        # serialized arguments, then "assistant".
        role = tool_call.get("role", arguments.get("role", "assistant"))
        if "reasoning" in tool_call:
            formatted_messages.append(
                ChatMessage(
                    role=role, content=tool_call["reasoning"], metadata={"title": "🧠 Reasoning"}
                )
            )
        if name == "message":
            # Plain conversational message: no title metadata.
            formatted_messages.append(ChatMessage(role=role, content=arguments["text"]))
            continue
        action = arguments.get("action", "")
        # Prefer the fully-qualified "name.action" key, then the bare action,
        # then a generic fallback title.
        title = _TITLE_MAPPINGS.get(
            f"{name}.{action}",
            _TITLE_MAPPINGS.get(action, f"🛠️ {name.capitalize()}: {action}"),
        )
        # Build the message body: arguments first, then any results.
        content_parts = []
        if arguments:
            content_parts.append("**Arguments:**")
            for k, v in arguments.items():
                if k != "action":  # Skip action as it's in the title
                    content_parts.append(f"- {k}: {v}")
        if tool_call.get("result"):
            content_parts.append("\n**Results:**")
            content_parts.append(f"```json\n{json.dumps(tool_call['result'], indent=4)}\n```")
        formatted_messages.append(
            ChatMessage(
                role="assistant",
                content="\n".join(content_parts),
                # Tool calls are always rendered as completed.
                metadata={"title": title, "status": "done"},
            )
        )
    return formatted_messages
async def submit_message(message_text, role, screenshot_after=False):
    """Log a conversational message, optionally capturing a screenshot after it.

    Args:
        message_text: The message body.
        role: "user" or "assistant".
        screenshot_after: When True (and a computer is running), a screenshot
            is taken right after the message is logged.

    Returns:
        Tuple of (status string, chatbot messages, log JSON, screenshot).
    """
    global last_screenshot
    outcome = await execute(
        "message",
        "submit",
        {"role": role, "text": message_text, "screenshot_after": screenshot_after},
    )
    # A screenshot only exists when it was requested and a computer is running.
    took_screenshot = screenshot_after and "screenshot" in outcome
    status = (
        f"Message submitted as {role} with screenshot"
        if took_screenshot
        else f"Message submitted as {role}"
    )
    shot = outcome["screenshot"] if took_screenshot else last_screenshot
    return status, get_chatbot_messages(), json.dumps(tool_call_logs, indent=2), shot
def create_gradio_ui():
with gr.Blocks() as app:
gr.Markdown("# Computer Interface Tool")
with gr.Row():
with gr.Column(scale=3):
with gr.Group():
# Main screenshot display
img = gr.Image(
type="pil", label="Current Screenshot", show_label=False, interactive=False
)
# Click type selection
click_type = gr.Radio(
["left_click", "right_click", "double_click", "move_cursor"],
label="Click Type",
value="left_click",
)
with gr.Row():
wait_btn = gr.Button("WAIT")
done_btn = gr.Button("DONE")
fail_btn = gr.Button("FAIL")
# Tabbed logs: Tool logs, Conversational logs, and Demonstrations
with gr.Tabs() as logs_tabs:
with gr.TabItem("Conversational Logs"):
chat_log = gr.Chatbot(
value=get_chatbot_messages,
label="Conversation",
elem_classes="chatbot",
height=400,
type="messages",
sanitize_html=True,
allow_tags=True,
)
with gr.TabItem("Function Logs"):
with gr.Group():
action_log = gr.JSON(label="Function Logs", every=0.2)
clear_log_btn = gr.Button("Clear Log")
with gr.TabItem("Save/Share Demonstrations"):
with gr.Row():
with gr.Column(scale=3):
# Dataset viewer - automatically loads sessions with selection column
dataset_viewer = gr.DataFrame(
label="All Sessions",
value=get_sessions_data,
show_search="filter",
max_height=300,
interactive=True, # Make it interactive for selection
)
# HuggingFace Upload UI
with gr.Group(visible=True):
gr.Markdown("Upload Sessions to HuggingFace")
with gr.Row():
hf_dataset_name = gr.Textbox(
label="HuggingFace Dataset Name",
placeholder="username/dataset-name",
info="Format: username/dataset-name",
)
hf_visibility = gr.Radio(
choices=["public", "private"],
label="Dataset Visibility",
value="private",
)
# Tag filtering with a single multi-select dropdown
filter_tags = gr.Dropdown(
label="Filter by tags (optional)",
choices=get_existing_tags()[0],
multiselect=True,
allow_custom_value=True,
info="When tags are selected, only demonstrations with those tags will be uploaded. Leave empty to upload all sessions.",
)
# Function to update button text based on selected tags
def get_upload_button_text(selected_tags=None):
if not selected_tags:
# Count all sessions
session_folders = glob.glob(
os.path.join(SESSION_DIR, "*")
)
count = len(session_folders) if session_folders else 0
return f"Upload {count} Sessions to HuggingFace"
else:
# Count sessions with matching tags
all_sessions = load_all_sessions()
if all_sessions is None:
return "Upload 0 Sessions to HuggingFace"
df = all_sessions.to_pandas()
if "tags" not in df.columns:
return "Upload 0 Sessions to HuggingFace"
# Filter by selected tags (sessions that have ANY of the selected tags)
matching_count = 0
for _, row in df.iterrows():
tags = row.get("tags", [])
if not len(tags):
continue
# Check if any of the selected tags are in this session's tags
if any(
tag in list(row["tags"])
for tag in selected_tags
):
matching_count += 1
return (
f"Upload {matching_count} Sessions to HuggingFace"
)
# Initial button text with all sessions
hf_upload_btn = gr.Button(get_upload_button_text())
# Update button text when filter changes
def update_button_text(selected_tags):
return get_upload_button_text(selected_tags)
# Connect filter changes to update button text
filter_tags.change(
update_button_text,
inputs=filter_tags,
outputs=hf_upload_btn,
)
hf_upload_status = gr.Textbox(label="Upload Status", value="")
with gr.Column(scale=1):
# Demo name with random name button
with gr.Group():
demo_name = gr.Textbox(
label="Demonstration Name",
value=generate_random_demo_name(),
placeholder="Enter a name for this demonstration",
)
random_name_btn = gr.Button("🎲", scale=1)
# Demo tags dropdown
demo_tags = gr.Dropdown(
label="Demonstration Tags",
choices=get_existing_tags()[0],
multiselect=True,
allow_custom_value=True,
info="Select existing tags or create new ones",
)
save_btn = gr.Button("Save Current Session")
save_status = gr.Textbox(label="Save Status", value="")
# Function to update the demo name with a new random name
def update_random_name():
return generate_random_demo_name()
# Connect random name button
random_name_btn.click(update_random_name, outputs=[demo_name])
with gr.Column(scale=1):
with gr.Accordion("Memory / Scratchpad", open=False):
with gr.Group():
memory_display = gr.Textbox(
label="Current Memory", value=get_memory(), lines=5
)
with gr.Row():
memory_submit_btn = gr.Button("Submit Memory")
memory_refine_btn = gr.Button("Refine")
memory_status = gr.Textbox(label="Status", value="")
with gr.Accordion("Tasks", open=True):
# Add current task display and controls
with gr.Group():
current_task = gr.Textbox(
label="Current Task", value=TASK_EXAMPLES[0]["task"], interactive=True
)
with gr.Row():
randomize_task_btn = gr.Button("🎲 Randomize Task")
run_setup_btn = gr.Button("⚙️ Run Task Setup")
# Setup status textbox
setup_status = gr.Textbox(label="Setup Status", value="")
with gr.Group():
with gr.Accordion("Computer Configuration", open=False):
with gr.Row():
os_choice = gr.Radio(
label="OS",
choices=["macOS", "Ubuntu", "Windows"],
value="macOS",
)
# Provider selection radio
provider_choice = gr.Radio(
label="Provider",
choices=["lume", "self", "cloud", "winsandbox"],
value="lume",
info="'lume' uses a VM, 'self' uses the host computer server, 'cloud' uses a cloud container",
)
# Container name field for cloud provider (initially hidden)
container_name = gr.Textbox(
label="Container Name",
placeholder="Enter your container name",
visible=False,
info="Get your container from [cua.ai](https://cua.ai/)",
)
# Check if CUA_API_KEY is set in environment
has_cua_key = os.environ.get("CUA_API_KEY") is not None
# API key field for cloud provider (visible only if no env key and cloud selected)
api_key_field = gr.Textbox(
label="CUA API Key",
placeholder="Enter your CUA API key",
type="password",
visible=False,
info="Required for cloud provider. Set CUA_API_KEY environment variable to hide this field.",
)
# App filtering dropdown for app-use experiment
app_filter = gr.Dropdown(
label="Filter by apps (App-Use)",
multiselect=True,
allow_custom_value=True,
info="When apps are selected, the computer will focus on those apps using the app-use experiment",
)
# Function to show/hide container name and API key fields based on provider selection
def update_cloud_fields_visibility(provider):
show_container = provider == "cloud"
show_api_key = provider == "cloud" and not has_cua_key
return (
gr.update(visible=show_container),
gr.update(visible=show_api_key),
)
# Connect provider choice to field visibility
provider_choice.change(
update_cloud_fields_visibility,
inputs=provider_choice,
outputs=[container_name, api_key_field],
)
start_btn = gr.Button("Initialize Computer")
with gr.Group():
input_text = gr.Textbox(label="Type Text")
with gr.Row():
press_enter_checkbox = gr.Checkbox(label="Press Enter", value=False)
submit_text_btn = gr.Button("Submit Text")
text_refine_btn = gr.Button("Refine")
with gr.Group():
hotkey_keys = gr.Dropdown(
choices=VALID_KEYS,
label="Select Keys",
multiselect=True,
show_label=False,
allow_custom_value=True,
info="Select one or more keys to send as a hotkey",
)
hotkey_btn = gr.Button("Send Hotkey(s)")
with gr.Accordion("Scrolling", open=False):
with gr.Group():
scroll_clicks = gr.Number(
label="Number of Clicks", value=1, minimum=1, step=1
)
with gr.Row():
scroll_up_btn = gr.Button("Scroll Up")
scroll_down_btn = gr.Button("Scroll Down")
with gr.Accordion("Reasoning for Last Action", open=False):
with gr.Group():
last_action_display = gr.Textbox(
label="Last Action", value=get_last_action_display(), interactive=False
)
reasoning_text = gr.Textbox(
label="What was your thought process behind this action?",
placeholder="Enter your reasoning here...",
lines=3,
)
erroneous_checkbox = gr.Checkbox(
label="Mark this action as erroneous (sets weight to 0)", value=False
)
reasoning_submit_btn = gr.Button("Submit Reasoning")
reasoning_refine_btn = gr.Button("Refine")
reasoning_status = gr.Textbox(label="Status", value="")
with gr.Accordion("Conversation Messages", open=False):
message_role = gr.Radio(
["user", "assistant"], label="Message Role", value="user"
)
message_text = gr.Textbox(
label="Message Content", placeholder="Enter message here...", lines=3
)
screenshot_after_msg = gr.Checkbox(
label="Receive screenshot after message", value=False
)
message_submit_btn = gr.Button("Submit Message")
message_status = gr.Textbox(label="Status")
with gr.Accordion("Clipboard Operations", open=False):
clipboard_content = gr.Textbox(label="Clipboard Content")
get_clipboard_btn = gr.Button("Get Clipboard Content")
set_clipboard_text = gr.Textbox(label="Set Clipboard Text")
set_clipboard_btn = gr.Button("Set Clipboard")
clipboard_status = gr.Textbox(label="Status")
with gr.Accordion("Run Shell Commands", open=False):
command_input = gr.Textbox(label="Command to run", placeholder="ls -la")
run_command_btn = gr.Button("Run Command")
command_output = gr.Textbox(label="Command Output", lines=5)
shutdown_btn = gr.Button("Shutdown Computer")
# Handle save button
save_btn.click(
save_demonstration, inputs=[action_log, demo_name, demo_tags], outputs=[save_status]
)
# Function to refresh the dataset viewer
def refresh_dataset_viewer():
return get_sessions_data()
# Also update the dataset viewer when saving
save_btn.click(refresh_dataset_viewer, outputs=dataset_viewer)
# Also update the tags dropdown when saving
save_btn.click(get_existing_tags, outputs=[demo_tags, filter_tags])
# Handle HuggingFace upload button
hf_upload_btn.click(
upload_to_huggingface,
inputs=[hf_dataset_name, hf_visibility, filter_tags],
outputs=[hf_upload_status],
)
# Function to randomize task
def randomize_task():
task_dict = random.choice(TASK_EXAMPLES)
return task_dict["task"]
# Function to run task setup
async def run_task_setup(task_text):
global computer
# Check if computer is initialized
if computer is None:
return (
"Computer not initialized. Please initialize the computer first.",
img,
action_log,
)
# Find the task dict that matches the current task text
for task_dict in TASK_EXAMPLES:
if task_dict["task"] == task_text:
try:
# Run the setup function with the computer interface and return the result
setup_func = task_dict["setup"]
if setup_func:
await setup_func(computer)
# Send initial user message
_, _, logs_json, screenshot = await submit_message(
task_text, "user", screenshot_after=True
)
return f"Setup complete for: {task_text}", screenshot, logs_json
except Exception as e:
return f"Error during setup: {str(e)}", img, action_log
return "Task not found in examples", img, action_log
# Connect the randomize button to the function
randomize_task_btn.click(randomize_task, outputs=[current_task])
# Connect the setup button
run_setup_btn.click(
run_task_setup, inputs=[current_task], outputs=[setup_status, img, action_log]
)
# Event handlers
action_log.change(get_chatbot_messages, outputs=[chat_log])
img.select(handle_click, inputs=[img, click_type], outputs=[img, action_log])
start_btn.click(
handle_init_computer,
inputs=[os_choice, app_filter, provider_choice, container_name, api_key_field],
outputs=[img, action_log],
)
wait_btn.click(handle_wait, outputs=[img, action_log])
# DONE and FAIL buttons just do a placeholder action
async def handle_done():
output = await execute("computer", "done", {})
return output["screenshot"], json.dumps(tool_call_logs, indent=2)
async def handle_fail():
output = await execute("computer", "fail", {})
return output["screenshot"], json.dumps(tool_call_logs, indent=2)
done_btn.click(handle_done, outputs=[img, action_log])
fail_btn.click(handle_fail, outputs=[img, action_log])
# Handle hotkey button
async def handle_hotkey(selected_keys):
if not selected_keys or len(selected_keys) == 0:
return await handle_screenshot(), json.dumps(tool_call_logs, indent=2)
# When multiple keys are selected, the last one is the main key, the rest are modifiers
if len(selected_keys) > 1:
key = selected_keys[-1]
modifiers = selected_keys[:-1]
else:
# If only one key is selected, no modifiers
key = selected_keys[0]
modifiers = []
output = await execute("computer", "send_hotkey", {"keys": selected_keys})
return output["screenshot"], json.dumps(tool_call_logs, indent=2)
hotkey_btn.click(handle_hotkey, inputs=[hotkey_keys], outputs=[img, action_log])
# Define async handler for scrolling
async def handle_scroll(direction, num_clicks=1):
"""Scroll the page up or down"""
global computer
if computer is None:
return None, json.dumps(tool_call_logs, indent=2)
# Convert num_clicks to integer with validation
try:
num_clicks = int(num_clicks)
if num_clicks < 1:
num_clicks = 1
except (ValueError, TypeError):
num_clicks = 1
# Execute the scroll action
action = "scroll_up" if direction == "up" else "scroll_down"
result = await execute("computer", action, {"clicks": num_clicks})
return result["screenshot"], json.dumps(tool_call_logs, indent=2)
# Connect scroll buttons
scroll_up_btn.click(
handle_scroll, inputs=[gr.State("up"), scroll_clicks], outputs=[img, action_log]
)
scroll_down_btn.click(
handle_scroll, inputs=[gr.State("down"), scroll_clicks], outputs=[img, action_log]
)
submit_text_btn.click(
handle_type, inputs=[input_text, press_enter_checkbox], outputs=[img, action_log]
)
get_clipboard_btn.click(handle_copy, outputs=[clipboard_content, action_log])
set_clipboard_btn.click(
handle_set_clipboard, inputs=set_clipboard_text, outputs=[clipboard_status, action_log]
)
run_command_btn.click(
handle_run_command, inputs=command_input, outputs=[command_output, action_log]
)
shutdown_btn.click(handle_shutdown, outputs=[clipboard_status, action_log])
clear_log_btn.click(clear_log, outputs=action_log)
chat_log.clear(clear_log, outputs=action_log)
# Update last action display after each action
img.select(lambda *args: get_last_action_display(), outputs=last_action_display)
start_btn.click(lambda: get_last_action_display(), outputs=last_action_display)
wait_btn.click(lambda: get_last_action_display(), outputs=last_action_display)
done_btn.click(lambda: get_last_action_display(), outputs=last_action_display)
fail_btn.click(lambda: get_last_action_display(), outputs=last_action_display)
hotkey_btn.click(lambda: get_last_action_display(), outputs=last_action_display)
submit_text_btn.click(lambda: get_last_action_display(), outputs=last_action_display)
message_submit_btn.click(lambda: get_last_action_display(), outputs=last_action_display)
# Handle reasoning submission
async def handle_reasoning_update(reasoning, is_erroneous):
status = await update_reasoning(reasoning, is_erroneous)
return status, json.dumps(tool_call_logs, indent=2)
reasoning_submit_btn.click(
handle_reasoning_update,
inputs=[reasoning_text, erroneous_checkbox],
outputs=[reasoning_status, action_log],
)
# Helper function for text refinement - used for all refine buttons
async def handle_text_refinement(
    text_content, content_type="reasoning", task_text="", use_before=False
):
    """Refine user-entered text (reasoning, memory, or typed text) with an LLM.

    Sends the relevant screenshot plus the last few chat messages to OpenAI
    and asks for a cleaned-up version of ``text_content``.

    Args:
        text_content: The text to refine.
        content_type: One of "reasoning", "memory", or "text"; selects the
            refinement prompt.
        task_text: The current task description, embedded in the prompt.
        use_before: If True, use the screenshot captured *before* the last
            action instead of the most recent one.

    Returns:
        Tuple of ``(status_message, text)``. On any failure the original
        ``text_content`` is returned unchanged alongside an error status.
    """
    global last_screenshot, last_action, tool_call_logs, last_screenshot_before
    screenshot = last_screenshot_before if use_before else last_screenshot
    # Bail out early when there is nothing to refine or no visual context.
    if not text_content.strip():
        return f"No {content_type} text to refine", text_content
    if screenshot is None:
        return "No screenshot available for refinement", text_content
    try:
        # Encode the PIL screenshot as base64 PNG for the vision API.
        screenshot_base64 = None
        if screenshot:
            with io.BytesIO() as buffer:
                screenshot.save(buffer, format="PNG")
                screenshot_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
        # Set up the OpenAI client for refinement.
        # Try different API keys from environment in order of preference.
        api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OMNI_OPENAI_API_KEY")
        if not api_key:
            return "OpenAI API key not found in environment", text_content
        from libs.agent.agent.providers.omni.clients.openai import OpenAIClient

        model = "gpt-4.1-2025-04-14"
        client = OpenAIClient(
            api_key=api_key,
            model=model,
            max_tokens=1024,
            temperature=0.2,  # Low temperature for more focused refinement
        )
        # Take the last 3 chat messages for context. Slicing already copes
        # with shorter histories, so one call is enough (previously the chat
        # log was rebuilt up to three times here).
        recent_messages = get_chatbot_messages(tool_call_logs)[-3:]
        # Format message history with titles when available.
        formatted_messages = []
        for msg in recent_messages:
            if msg.metadata and "title" in msg.metadata:
                formatted_messages.append(
                    f"{msg.role} ({msg.metadata['title']}): {msg.content}"
                )
            else:
                formatted_messages.append(f"{msg.role}: {msg.content}")
        formatted_messages = [f"<message>{msg}</message>" for msg in formatted_messages]
        # Join outside the f-strings: a backslash inside an f-string
        # expression is a SyntaxError before Python 3.12.
        joined_messages = "\n".join(formatted_messages)
        # Create different prompts based on content type.
        if content_type == "reasoning":
            message_prompt = f"""You are helping refine an explanation about why a specific computer UI action is about to be taken.
The screenshot below shows the state of the screen as I prepare to take this action.
TASK: <task_text>{task_text}</task_text>
ACTION I'M ABOUT TO TAKE:
<action_display>{get_last_action_display()}</action_display>
CURRENT EXPLANATION:
<reasoning_content>{text_content}</reasoning_content>
RECENT MESSAGES:
<recent_messages>{joined_messages}</recent_messages>
Make this into a concise reasoning / self-reflection trace, using "I should/need to/let me/it seems/i see". This trace MUST demonstrate planning extensively before each function call, and reflect extensively on the outcomes of the previous function calls. DO NOT do this entire process by making function calls only, as this can impair your ability to solve the problem and think insightfully.
Provide ONLY the refined explanation text, with no additional commentary or markdown."""
        elif content_type == "memory":
            message_prompt = f"""You are helping refine memory/scratchpad content for an AI assistant.
The screenshot below shows the current state of the computer interface.
TASK: <task_text>{task_text}</task_text>
CURRENT MEMORY CONTENT:
<memory_content>{text_content}</memory_content>
RECENT MESSAGES:
<recent_messages>{joined_messages}</recent_messages>
Refine this memory content to be more clear, organized, and useful for the assistant's task.
- Organize information into logical sections
- Prioritize key facts needed for the task
- Remove unnecessary or redundant information
- Make the format more readable with bullet points or other organizational elements if helpful
Provide ONLY the refined memory text, with no additional commentary or markdown."""
        elif content_type == "text":
            message_prompt = f"""You are helping refine text that will be typed into a computer interface.
The screenshot below shows the current state of the computer interface.
TASK: <task_text>{task_text}</task_text>
CURRENT TEXT TO TYPE:
<text_content>{text_content}</text_content>
RECENT MESSAGES:
<recent_messages>{joined_messages}</recent_messages>
Refine this text to be more effective for the current context:
- Fix any spelling or grammar issues
- Improve clarity and conciseness
- Format appropriately for the context
- Optimize the text for the intended use
Provide ONLY the refined text, with no additional commentary or markdown."""
        else:
            # Fallback prompt for any unrecognized content type.
            message_prompt = f"""You are helping refine text content.
The screenshot below shows the current state of the computer interface.
CURRENT TEXT:
{text_content}
RECENT MESSAGES:
<recent_messages>{joined_messages}</recent_messages>
Improve this text to be more clear, concise, and effective.
Provide ONLY the refined text, with no additional commentary or markdown."""
        # Create messages with the screenshot.
        messages = []
        # Add message with image if available.
        if screenshot_base64:
            messages.append(
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": message_prompt},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/png;base64,{screenshot_base64}"
                            },
                        },
                    ],
                }
            )
        else:
            # Fallback if screenshot isn't available.
            messages.append({"role": "user", "content": message_prompt})
        print(message_prompt)
        # Make the API call.
        response = await client.run_interleaved(
            messages=messages,
            system="You are a helpful AI assistant that improves and refines text.",
        )
        # Extract the refined text from the response.
        if "choices" in response and len(response["choices"]) > 0:
            refined_text = response["choices"][0]["message"]["content"]
            return f"{content_type.capitalize()} refined successfully", refined_text
        else:
            return "Error: Unexpected API response format", text_content
    except Exception as e:
        return f"Error refining {content_type}: {str(e)}", text_content
# Thin async adapters that bind each refine button to handle_text_refinement.
async def handle_reasoning_refinement(reasoning, task):
    """Refine reasoning text against the pre-action screenshot."""
    return await handle_text_refinement(reasoning, "reasoning", task, use_before=True)

async def handle_memory_refinement(memory_text, task):
    """Refine the memory/scratchpad contents."""
    return await handle_text_refinement(memory_text, "memory", task)

async def handle_text_input_refinement(text, task):
    """Refine text destined for the type-text action."""
    return await handle_text_refinement(text, "text", task)
# Connect the refine buttons to the appropriate handlers
reasoning_refine_btn.click(
handle_reasoning_refinement,
inputs=[reasoning_text, current_task],
outputs=[reasoning_status, reasoning_text],
)
# Connect memory refine button
memory_refine_btn.click(
handle_memory_refinement,
inputs=[memory_display, current_task],
outputs=[memory_status, memory_display],
)
# Status element for type text section
with gr.Group():
type_text_status = gr.Textbox(label="Text Status", value="", visible=False)
# Connect text refine button
text_refine_btn.click(
handle_text_input_refinement,
inputs=[input_text, current_task],
outputs=[type_text_status, input_text],
)
# Handle memory submission
async def handle_memory_update(memory_text):
    """Store updated memory text and return the refreshed action-log JSON."""
    memory_status_msg = await handle_memory(memory_text)
    logs_json = json.dumps(tool_call_logs, indent=2)
    return memory_status_msg, logs_json
memory_submit_btn.click(
handle_memory_update, inputs=memory_display, outputs=[memory_status, action_log]
)
# Handle message submission
async def handle_message_submit(message_content, role, screenshot_after):
    """Submit a chat message and return the updated UI state.

    Falls back to the previously captured screenshot when submission
    does not yield a new one.
    """
    status, chat_messages, logs_json, screenshot = await submit_message(
        message_content, role, screenshot_after
    )
    displayed_screenshot = screenshot if screenshot else last_screenshot
    return status, chat_messages, logs_json, displayed_screenshot
message_submit_btn.click(
handle_message_submit,
inputs=[message_text, message_role, screenshot_after_msg],
outputs=[message_status, chat_log, action_log, img],
)
return app
# Launch the Gradio app when executed as a script.
if __name__ == "__main__":
    demo = create_gradio_ui()
    demo.launch()
```
--------------------------------------------------------------------------------
/libs/python/computer/computer/computer.py:
--------------------------------------------------------------------------------
```python
import asyncio
import io
import json
import logging
import os
import platform
import re
import time
import traceback
from functools import wraps
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
List,
Literal,
Optional,
TypeVar,
Union,
cast,
)
# Python < 3.10 lacks typing.ParamSpec; fall back to typing_extensions.
try:
    from typing import ParamSpec
except Exception:  # pragma: no cover
    from typing_extensions import ParamSpec  # type: ignore

# Generic parameter/return placeholders for typed decorator helpers.
P = ParamSpec("P")
R = TypeVar("R")

from core.telemetry import is_telemetry_enabled, record_event
from PIL import Image

from . import helpers
from .interface.factory import InterfaceFactory
from .logger import Logger, LogLevel
from .models import Computer as ComputerConfig
from .models import Display
from .tracing import ComputerTracing
from .tracing_wrapper import TracingInterfaceWrapper

# Static host details attached to telemetry events at init time.
SYSTEM_INFO = {
    "os": platform.system().lower(),
    "os_version": platform.release(),
    "python_version": platform.python_version(),
}

# Import provider related modules
from .providers.base import VMProviderType
from .providers.factory import VMProviderFactory

# Guest operating systems accepted by the Computer constructor.
OSType = Literal["macos", "linux", "windows"]
class Computer:
"""Computer is the main class for interacting with the computer."""
def create_desktop_from_apps(self, apps):
    """Build an app-scoped virtual desktop backed by diorama commands.

    Requires the experimental "app-use" feature, enabled via
    ``Computer(experiments=["app-use"])``.

    Args:
        apps (list[str]): Application names to include in the desktop.

    Returns:
        DioramaComputer: Proxy exposing the Diorama interface while routing
        its commands through this computer's interface.
    """
    assert (
        "app-use" in self.experiments
    ), "App Usage is an experimental feature. Enable it by passing experiments=['app-use'] to Computer()"
    from .diorama_computer import DioramaComputer

    return DioramaComputer(self, apps)
def __init__(
    self,
    display: Union[Display, Dict[str, int], str] = "1024x768",
    memory: str = "8GB",
    cpu: str = "4",
    os_type: OSType = "macos",
    name: str = "",
    image: Optional[str] = None,
    shared_directories: Optional[List[str]] = None,
    use_host_computer_server: bool = False,
    verbosity: Union[int, LogLevel] = logging.INFO,
    telemetry_enabled: bool = True,
    provider_type: Union[str, VMProviderType] = VMProviderType.LUME,
    provider_port: Optional[int] = 7777,
    noVNC_port: Optional[int] = 8006,
    api_port: Optional[int] = None,
    host: str = os.environ.get("PYLUME_HOST", "localhost"),
    storage: Optional[str] = None,
    ephemeral: bool = False,
    api_key: Optional[str] = None,
    experiments: Optional[List[str]] = None,
):
    """Initialize a new Computer instance.

    Args:
        display: The display configuration. Can be:
            - A Display object
            - A dict with 'width' and 'height'
            - A string in format "WIDTHxHEIGHT" (e.g. "1920x1080")
            Defaults to "1024x768"
        memory: The VM memory allocation. Defaults to "8GB"
        cpu: The VM CPU allocation. Defaults to "4"
        os_type: The operating system type ('macos', 'linux' or 'windows')
        name: The VM name
        image: The VM image name
        shared_directories: Optional list of directory paths to share with the VM
        use_host_computer_server: If True, target localhost instead of starting a VM
        verbosity: Logging level (standard Python logging levels: logging.DEBUG,
            logging.INFO, etc.). LogLevel enum values are still accepted for
            backward compatibility
        telemetry_enabled: Whether to enable telemetry tracking. Defaults to True.
        provider_type: The VM provider type to use (lume, qemu, cloud)
        provider_port: Optional port to use for the VM provider server
        noVNC_port: Optional port for the noVNC web interface (Lumier provider)
        api_port: Optional port of the computer-server API; defaults to 8443
            for cloud (API-key) connections and 8000 otherwise
        host: Host to use for VM provider connections (e.g. "localhost",
            "host.docker.internal")
        storage: Optional path for persistent VM storage (Lumier provider)
        ephemeral: Whether to use ephemeral storage
        api_key: Optional API key for cloud providers (defaults to CUA_API_KEY
            environment variable)
        experiments: Optional list of experimental features to enable
            (e.g. ["app-use"])

    Raises:
        ValueError: If both a storage path and the ephemeral flag are given,
            or if the display string is malformed.
    """
    # Configure the main computer logger once; it was previously created a
    # second time further down with identical arguments.
    self.logger = Logger("computer", verbosity)
    self.logger.info("Initializing Computer...")
    # Fall back to environment variable for api_key if not provided
    if api_key is None:
        api_key = os.environ.get("CUA_API_KEY")
    # Pick a default image per OS when none was supplied.
    if not image:
        if os_type == "macos":
            image = "macos-sequoia-cua:latest"
        elif os_type == "linux":
            image = "trycua/cua-ubuntu:latest"
    # NOTE(review): when os_type == "windows" and no image is supplied, this
    # yields the literal string "None" — confirm that is intended.
    image = str(image)
    # Store original parameters
    self.image = image
    self.provider_port = provider_port
    self.noVNC_port = noVNC_port
    self.api_port = api_port
    self.host = host
    self.os_type = os_type
    self.provider_type = provider_type
    self.ephemeral = ephemeral
    # The API key is only relevant for the cloud provider.
    self.api_key = api_key if self.provider_type == VMProviderType.CLOUD else None
    # Set default API port if not specified: 8443 for authenticated cloud
    # connections, 8000 otherwise.
    if self.api_port is None:
        self.api_port = 8443 if self.api_key else 8000
    self.experiments = experiments or []
    if "app-use" in self.experiments:
        assert self.os_type == "macos", "App use experiment is only supported on macOS"
    # The default is currently to use non-ephemeral storage
    if storage and ephemeral and storage != "ephemeral":
        raise ValueError("Storage path and ephemeral flag cannot be used together")
    # Windows Sandbox always uses ephemeral storage
    if self.provider_type == VMProviderType.WINSANDBOX:
        if not ephemeral and storage is not None and storage != "ephemeral":
            self.logger.warning(
                "Windows Sandbox storage is always ephemeral. Setting ephemeral=True."
            )
        self.ephemeral = True
        self.storage = "ephemeral"
    else:
        self.storage = "ephemeral" if ephemeral else storage
    # For Lumier provider, store the first shared directory path to use
    # for VM file sharing
    self.shared_path = None
    if shared_directories and len(shared_directories) > 0:
        self.shared_path = shared_directories[0]
        self.logger.info(
            f"Using first shared directory for VM file sharing: {self.shared_path}"
        )
    # Store telemetry preference
    self._telemetry_enabled = telemetry_enabled
    # Set initialization flags; run() flips _initialized on success.
    self._initialized = False
    self._running = False
    # Keep verbosity for later checks (e.g. provider verbose mode). The main
    # logger was already configured at the top of __init__.
    self.verbosity = verbosity
    # Configure component loggers with proper hierarchy
    self.vm_logger = Logger("computer.vm", verbosity)
    self.interface_logger = Logger("computer.interface", verbosity)
    if not use_host_computer_server:
        # Ensure the image has an explicit tag.
        if ":" not in image:
            image = f"{image}:latest"
        if not name:
            # Normalize the name to be used for the VM
            name = image.replace(":", "_")
            # Remove any forward slashes
            name = name.replace("/", "_")
    # Convert display parameter to Display object
    if isinstance(display, str):
        # Parse string format "WIDTHxHEIGHT"
        match = re.match(r"(\d+)x(\d+)", display)
        if not match:
            raise ValueError(
                "Display string must be in format 'WIDTHxHEIGHT' (e.g. '1024x768')"
            )
        width, height = map(int, match.groups())
        display_config = Display(width=width, height=height)
    elif isinstance(display, dict):
        display_config = Display(**display)
    else:
        display_config = display
    # NOTE(review): image.split(":")[1] raises IndexError for an untagged
    # image in host-computer-server mode (the ":latest" append above is
    # skipped there) — confirm whether that path is reachable.
    self.config = ComputerConfig(
        image=image.split(":")[0],
        tag=image.split(":")[1],
        name=name,
        display=display_config,
        memory=memory,
        cpu=cpu,
    )
    # Initialize VM provider but don't start it yet - we'll do that in run()
    self.config.vm_provider = None  # Will be initialized in run()
    # Store shared directories config
    self.shared_directories = shared_directories or []
    # Placeholder for VM provider context manager
    self._provider_context = None
    # Initialize with proper typing - None at first, will be set in run()
    self._interface = None
    self._original_interface = None  # Keep reference to original interface
    self._tracing_wrapper = None  # Tracing wrapper for interface
    self.use_host_computer_server = use_host_computer_server
    # Initialize tracing
    self._tracing = ComputerTracing(self)
    # Record initialization in telemetry (if enabled)
    if telemetry_enabled and is_telemetry_enabled():
        record_event("computer_initialized", SYSTEM_INFO)
    else:
        self.logger.debug("Telemetry disabled - skipping initialization tracking")
async def __aenter__(self):
    """Async context manager entry: start the computer via run()."""
    await self.run()
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit: disconnect the interface."""
    await self.disconnect()

def __enter__(self):
    """Sync context manager entry: drive the async path on the event loop."""
    # NOTE(review): asyncio.get_event_loop() is deprecated outside a running
    # loop since Python 3.10 — consider asyncio.new_event_loop()/asyncio.run.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(self.__aenter__())
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    """Sync context manager exit: drive the async cleanup on the event loop."""
    loop = asyncio.get_event_loop()
    loop.run_until_complete(self.__aexit__(exc_type, exc_val, exc_tb))
async def run(self) -> None:
    """Initialize the VM (when managed) and connect the computer interface.

    Idempotent: returns immediately if this computer is already initialized.
    In VM mode this enters the provider context, starts the VM if it is not
    running, waits for it to obtain an IP address, then connects the
    WebSocket interface. In host-computer-server mode it connects directly
    to localhost.

    Raises:
        RuntimeError: If the provider, VM, or interface fails to come up.
        TimeoutError: If the sandbox interface cannot be reached in time.
    """
    if TYPE_CHECKING:
        from .interface.base import BaseComputerInterface
    # If already initialized, just log and return
    if hasattr(self, "_initialized") and self._initialized:
        self.logger.info("Computer already initialized, skipping initialization")
        return
    self.logger.info("Starting computer...")
    # Used only for the duration log emitted in the finally block below.
    start_time = time.time()
    try:
        # If using host computer server
        if self.use_host_computer_server:
            self.logger.info("Using host computer server")
            # Set ip_address for host computer server mode
            ip_address = "localhost"
            # Create the interface with explicit type annotation
            from .interface.base import BaseComputerInterface

            interface = cast(
                BaseComputerInterface,
                InterfaceFactory.create_interface_for_os(
                    os=self.os_type, ip_address=ip_address, api_port=self.api_port  # type: ignore[arg-type]
                ),
            )
            self._interface = interface
            self._original_interface = interface
            self.logger.info("Waiting for host computer server to be ready...")
            await self._interface.wait_for_ready()
            self.logger.info("Host computer server ready")
        else:
            # Start or connect to VM
            self.logger.info(f"Starting VM: {self.image}")
            if not self._provider_context:
                try:
                    provider_type_name = (
                        self.provider_type.name
                        if isinstance(self.provider_type, VMProviderType)
                        else self.provider_type
                    )
                    self.logger.verbose(
                        f"Initializing {provider_type_name} provider context..."
                    )
                    # Explicitly set provider parameters
                    storage = "ephemeral" if self.ephemeral else self.storage
                    verbose = self.verbosity >= LogLevel.DEBUG
                    ephemeral = self.ephemeral
                    port = self.provider_port if self.provider_port is not None else 7777
                    host = self.host if self.host else "localhost"
                    image = self.image
                    shared_path = self.shared_path
                    noVNC_port = self.noVNC_port
                    # Create VM provider instance with explicit parameters
                    try:
                        if self.provider_type == VMProviderType.LUMIER:
                            self.logger.info(f"Using VM image for Lumier provider: {image}")
                            if shared_path:
                                self.logger.info(
                                    f"Using shared path for Lumier provider: {shared_path}"
                                )
                            if noVNC_port:
                                self.logger.info(
                                    f"Using noVNC port for Lumier provider: {noVNC_port}"
                                )
                            self.config.vm_provider = VMProviderFactory.create_provider(
                                self.provider_type,
                                port=port,
                                host=host,
                                storage=storage,
                                shared_path=shared_path,
                                image=image,
                                verbose=verbose,
                                ephemeral=ephemeral,
                                noVNC_port=noVNC_port,
                            )
                        elif self.provider_type == VMProviderType.LUME:
                            self.config.vm_provider = VMProviderFactory.create_provider(
                                self.provider_type,
                                port=port,
                                host=host,
                                storage=storage,
                                verbose=verbose,
                                ephemeral=ephemeral,
                            )
                        elif self.provider_type == VMProviderType.CLOUD:
                            self.config.vm_provider = VMProviderFactory.create_provider(
                                self.provider_type,
                                api_key=self.api_key,
                                verbose=verbose,
                            )
                        elif self.provider_type == VMProviderType.WINSANDBOX:
                            self.config.vm_provider = VMProviderFactory.create_provider(
                                self.provider_type,
                                port=port,
                                host=host,
                                storage=storage,
                                verbose=verbose,
                                ephemeral=ephemeral,
                                noVNC_port=noVNC_port,
                            )
                        elif self.provider_type == VMProviderType.DOCKER:
                            self.config.vm_provider = VMProviderFactory.create_provider(
                                self.provider_type,
                                port=port,
                                host=host,
                                storage=storage,
                                shared_path=shared_path,
                                image=image or "trycua/cua-ubuntu:latest",
                                verbose=verbose,
                                ephemeral=ephemeral,
                                noVNC_port=noVNC_port,
                                api_port=self.api_port,
                            )
                        else:
                            raise ValueError(f"Unsupported provider type: {self.provider_type}")
                        self._provider_context = await self.config.vm_provider.__aenter__()
                        self.logger.verbose("VM provider context initialized successfully")
                    except ImportError as ie:
                        # Missing optional extra: point the user at the right install.
                        self.logger.error(f"Failed to import provider dependencies: {ie}")
                        if str(ie).find("lume") >= 0 and str(ie).find("lumier") < 0:
                            self.logger.error(
                                "Please install with: pip install cua-computer[lume]"
                            )
                        elif str(ie).find("lumier") >= 0 or str(ie).find("docker") >= 0:
                            self.logger.error(
                                "Please install with: pip install cua-computer[lumier] and make sure Docker is installed"
                            )
                        elif str(ie).find("cloud") >= 0:
                            self.logger.error(
                                "Please install with: pip install cua-computer[cloud]"
                            )
                        raise
                except Exception as e:
                    self.logger.error(f"Failed to initialize provider context: {e}")
                    raise RuntimeError(f"Failed to initialize VM provider: {e}")
            # Check if VM exists or create it
            is_running = False
            try:
                if self.config.vm_provider is None:
                    raise RuntimeError(f"VM provider not initialized for {self.config.name}")
                vm = await self.config.vm_provider.get_vm(self.config.name)
                self.logger.verbose(f"Found existing VM: {self.config.name}")
                is_running = vm.get("status") == "running"
            except Exception as e:
                self.logger.error(f"VM not found: {self.config.name}")
                self.logger.error(f"Error: {e}")
                raise RuntimeError(f"VM {self.config.name} could not be found or created.")
            # Start the VM if it's not running
            if not is_running:
                self.logger.info(f"VM {self.config.name} is not running, starting it...")
                # Convert paths to dictionary format for shared directories
                shared_dirs = []
                for path in self.shared_directories:
                    self.logger.verbose(f"Adding shared directory: {path}")
                    path = os.path.abspath(os.path.expanduser(path))
                    if os.path.exists(path):
                        # Add path in format expected by Lume API
                        shared_dirs.append({"hostPath": path, "readOnly": False})
                    else:
                        self.logger.warning(f"Shared directory does not exist: {path}")
                # Prepare run options to pass to the provider
                run_opts = {}
                # Add display information if available
                if self.config.display is not None:
                    display_info = {
                        "width": self.config.display.width,
                        "height": self.config.display.height,
                    }
                    # Check if scale_factor exists before adding it
                    if hasattr(self.config.display, "scale_factor"):
                        display_info["scale_factor"] = self.config.display.scale_factor
                    run_opts["display"] = display_info
                # Add shared directories if available
                if self.shared_directories:
                    run_opts["shared_directories"] = shared_dirs.copy()
                # Run the VM with the provider
                try:
                    if self.config.vm_provider is None:
                        raise RuntimeError(
                            f"VM provider not initialized for {self.config.name}"
                        )
                    # Use the complete run_opts we prepared earlier
                    # Handle ephemeral storage for run_vm method too
                    storage_param = "ephemeral" if self.ephemeral else self.storage
                    # Log the image being used
                    self.logger.info(f"Running VM using image: {self.image}")
                    # Call provider.run_vm with explicit image parameter
                    response = await self.config.vm_provider.run_vm(
                        image=self.image,
                        name=self.config.name,
                        run_opts=run_opts,
                        storage=storage_param,
                    )
                    self.logger.info(f"VM run response: {response if response else 'None'}")
                except Exception as run_error:
                    self.logger.error(f"Failed to run VM: {run_error}")
                    raise RuntimeError(f"Failed to start VM: {run_error}")
            # Wait for VM to be ready with a valid IP address
            self.logger.info("Waiting for VM to be ready with a valid IP address...")
            try:
                if self.provider_type == VMProviderType.LUMIER:
                    max_retries = 60  # Increased for Lumier VM startup which takes longer
                    retry_delay = 3  # 3 seconds between retries for Lumier
                else:
                    max_retries = 30  # Default for other providers
                    retry_delay = 2  # 2 seconds between retries
                self.logger.info(
                    f"Waiting up to {max_retries * retry_delay} seconds for VM to be ready..."
                )
                ip = await self.get_ip(max_retries=max_retries, retry_delay=retry_delay)
                # If we get here, we have a valid IP
                self.logger.info(f"VM is ready with IP: {ip}")
                ip_address = ip
            except TimeoutError as timeout_error:
                self.logger.error(str(timeout_error))
                raise RuntimeError(f"VM startup timed out: {timeout_error}")
            except Exception as wait_error:
                self.logger.error(f"Error waiting for VM: {wait_error}")
                raise RuntimeError(f"VM failed to become ready: {wait_error}")
    except Exception as e:
        self.logger.error(f"Failed to initialize computer: {e}")
        self.logger.error(traceback.format_exc())
        raise RuntimeError(f"Failed to initialize computer: {e}")
    try:
        # Verify we have a valid IP before initializing the interface
        if not ip_address or ip_address == "unknown" or ip_address == "0.0.0.0":
            raise RuntimeError(
                f"Cannot initialize interface - invalid IP address: {ip_address}"
            )
        # Initialize the interface using the factory with the specified OS
        # NOTE(review): in host-computer-server mode this re-creates the
        # interface that was already connected above — confirm intended.
        self.logger.info(f"Initializing interface for {self.os_type} at {ip_address}")
        from .interface.base import BaseComputerInterface

        # Pass authentication credentials if using cloud provider
        if self.provider_type == VMProviderType.CLOUD and self.api_key and self.config.name:
            interface = cast(
                BaseComputerInterface,
                InterfaceFactory.create_interface_for_os(
                    os=self.os_type,
                    ip_address=ip_address,
                    api_key=self.api_key,
                    vm_name=self.config.name,
                    api_port=self.api_port,
                ),
            )
        else:
            interface = cast(
                BaseComputerInterface,
                InterfaceFactory.create_interface_for_os(
                    os=self.os_type, ip_address=ip_address, api_port=self.api_port
                ),
            )
        self._interface = interface
        self._original_interface = interface
        # Wait for the WebSocket interface to be ready
        self.logger.info("Connecting to WebSocket interface...")
        try:
            # Use a single timeout for the entire connection process
            # The VM should already be ready at this point, so we're just establishing the connection
            await self._interface.wait_for_ready(timeout=30)
            self.logger.info("Sandbox interface connected successfully")
        except TimeoutError as e:
            port = getattr(self._interface, "_api_port", 8000)  # Default to 8000 if not set
            self.logger.error(f"Failed to connect to sandbox interface at {ip_address}:{port}")
            raise TimeoutError(
                f"Could not connect to sandbox interface at {ip_address}:{port}: {str(e)}"
            )
        # Create an event to keep the VM running in background if needed
        if not self.use_host_computer_server:
            self._stop_event = asyncio.Event()
            self._keep_alive_task = asyncio.create_task(self._stop_event.wait())
        self.logger.info("Computer is ready")
        # Set the initialization flag and clear the initializing flag
        self._initialized = True
        # Set this instance as the default computer for remote decorators
        helpers.set_default_computer(self)
        self.logger.info("Computer successfully initialized")
    except Exception as e:
        # Re-raised unchanged; kept so the finally block below still runs.
        raise
    finally:
        # Log initialization time for performance monitoring
        duration_ms = (time.time() - start_time) * 1000
        self.logger.debug(f"Computer initialization took {duration_ms:.2f}ms")
    return
async def disconnect(self) -> None:
    """Tear down the WebSocket connection to the computer, when one exists."""
    interface = self._interface
    if interface:
        interface.close()
async def stop(self) -> None:
    """Disconnect from the computer's WebSocket interface and stop the computer.

    In VM mode the VM is stopped first, then the provider context is exited;
    the interface is disconnected last. Cleanup errors are logged at debug
    level because they are often expected during shutdown.
    """
    # Used only for the duration log emitted in the finally block below.
    start_time = time.time()
    try:
        self.logger.info("Stopping Computer...")
        # In VM mode, first explicitly stop the VM, then exit the provider context
        if (
            not self.use_host_computer_server
            and self._provider_context
            and self.config.vm_provider is not None
        ):
            try:
                self.logger.info(f"Stopping VM {self.config.name}...")
                await self.config.vm_provider.stop_vm(
                    name=self.config.name,
                    storage=self.storage,  # Pass storage explicitly for clarity
                )
            except Exception as e:
                # A failed VM stop should not prevent closing the provider context.
                self.logger.error(f"Error stopping VM: {e}")
            self.logger.verbose("Closing VM provider context...")
            await self.config.vm_provider.__aexit__(None, None, None)
            self._provider_context = None
        await self.disconnect()
        self.logger.info("Computer stopped")
    except Exception as e:
        self.logger.debug(
            f"Error during cleanup: {e}"
        )  # Log as debug since this might be expected
    finally:
        # Log stop time for performance monitoring
        duration_ms = (time.time() - start_time) * 1000
        self.logger.debug(f"Computer stop process took {duration_ms:.2f}ms")
    return
async def start(self) -> None:
    """Alias for run(): boot the VM (if any) and connect the interface."""
    await self.run()
async def restart(self) -> None:
    """Restart the computer.

    If using a VM provider that supports restart, this will issue a restart
    without tearing down the provider context, then reconnect the interface.
    Falls back to stop()+run() when a provider restart is not available.
    """
    # Host computer server: just disconnect and run again
    if self.use_host_computer_server:
        try:
            await self.disconnect()
        finally:
            await self.run()
        return
    # If no VM provider context yet, fall back to full run
    if not getattr(self, "_provider_context", None) or self.config.vm_provider is None:
        self.logger.info("No provider context active; performing full restart via run()")
        await self.run()
        return
    # Gracefully close current interface connection if present
    if self._interface:
        try:
            self._interface.close()
        except Exception as e:
            self.logger.debug(f"Error closing interface prior to restart: {e}")
    # Attempt provider-level restart if implemented
    try:
        # Ephemeral VMs pass the sentinel string instead of a storage path.
        storage_param = "ephemeral" if self.ephemeral else self.storage
        if hasattr(self.config.vm_provider, "restart_vm"):
            self.logger.info(f"Restarting VM {self.config.name} via provider...")
            await self.config.vm_provider.restart_vm(
                name=self.config.name, storage=storage_param
            )
        else:
            # Fallback: stop then start without leaving provider context
            self.logger.info(
                f"Provider has no restart_vm; performing stop+start for {self.config.name}..."
            )
            await self.config.vm_provider.stop_vm(name=self.config.name, storage=storage_param)
            await self.config.vm_provider.run_vm(
                image=self.image, name=self.config.name, run_opts={}, storage=storage_param
            )
    except Exception as e:
        self.logger.error(f"Failed to restart VM via provider: {e}")
        # As a last resort, do a full stop (with provider context exit) and run
        try:
            await self.stop()
        finally:
            await self.run()
        return
    # Wait for VM to be ready and reconnect interface
    try:
        self.logger.info("Waiting for VM to be ready after restart...")
        # Lumier VMs boot more slowly, so poll for longer.
        if self.provider_type == VMProviderType.LUMIER:
            max_retries = 60
            retry_delay = 3
        else:
            max_retries = 30
            retry_delay = 2
        ip_address = await self.get_ip(max_retries=max_retries, retry_delay=retry_delay)
        self.logger.info(f"Re-initializing interface for {self.os_type} at {ip_address}")
        from .interface.base import BaseComputerInterface

        # Cloud connections authenticate with the API key and VM name.
        if self.provider_type == VMProviderType.CLOUD and self.api_key and self.config.name:
            self._interface = cast(
                BaseComputerInterface,
                InterfaceFactory.create_interface_for_os(
                    os=self.os_type,
                    ip_address=ip_address,
                    api_key=self.api_key,
                    vm_name=self.config.name,
                    api_port=self.api_port,
                ),
            )
        else:
            self._interface = cast(
                BaseComputerInterface,
                InterfaceFactory.create_interface_for_os(
                    os=self.os_type,
                    ip_address=ip_address,
                    api_port=self.api_port,
                ),
            )
        self.logger.info("Connecting to WebSocket interface after restart...")
        await self._interface.wait_for_ready(timeout=30)
        self.logger.info("Computer reconnected and ready after restart")
    except Exception as e:
        self.logger.error(f"Failed to reconnect after restart: {e}")
        # Try a full reset if reconnection failed
        try:
            await self.stop()
        finally:
            await self.run()
# @property
async def get_ip(self, max_retries: int = 15, retry_delay: int = 3) -> str:
    """Get the IP address of the VM, or localhost when using the host computer server.

    Delegates to the provider's get_ip method, which waits until the VM
    reports a valid IP address.

    Args:
        max_retries: Unused parameter, kept for backward compatibility.
        retry_delay: Delay between provider retries in seconds (default: 3).

    Returns:
        IP address of the VM, or "127.0.0.1" in host-computer-server mode.

    Raises:
        RuntimeError: If the VM provider has not been initialized.
    """
    # For host computer server, always return localhost immediately
    if self.use_host_computer_server:
        return "127.0.0.1"
    # Get IP from the provider - each provider implements its own waiting logic
    if self.config.vm_provider is None:
        raise RuntimeError("VM provider is not initialized")
    # Log that we're waiting for the IP
    self.logger.info(f"Waiting for VM {self.config.name} to get an IP address...")
    # Ephemeral VMs pass the sentinel string instead of a storage path.
    storage_param = "ephemeral" if self.ephemeral else self.storage
    # Call the provider's get_ip method which will wait indefinitely.
    # (A previous copy-pasted "Running VM using image" log was removed here:
    # this method only queries the IP, it does not start a VM.)
    ip = await self.config.vm_provider.get_ip(
        name=self.config.name, storage=storage_param, retry_delay=retry_delay
    )
    # Log success
    self.logger.info(f"VM {self.config.name} has IP address: {ip}")
    return ip
async def wait_vm_ready(self) -> Optional[Dict[str, Any]]:
    """Wait for VM to be ready with an IP address.

    Polls the provider every ``interval`` seconds until the VM reports a
    non-empty IP address, logging status transitions along the way.

    Returns:
        VM status information or None if using host computer server.

    Raises:
        TimeoutError: If no IP address is assigned within the timeout window.
    """
    if self.use_host_computer_server:
        return None

    timeout = 600  # 10 minutes timeout (increased from 4 minutes)
    interval = 2.0  # 2 seconds between checks (increased to reduce API load)
    start_time = time.time()
    last_status = None
    attempts = 0

    self.logger.info(f"Waiting for VM {self.config.name} to be ready (timeout: {timeout}s)...")

    while time.time() - start_time < timeout:
        attempts += 1
        elapsed = time.time() - start_time
        try:
            # Keep polling for VM info
            if self.config.vm_provider is None:
                self.logger.error("VM provider is not initialized")
                vm = None
            else:
                vm = await self.config.vm_provider.get_vm(self.config.name)

            # Log full VM properties for debugging (every 30 attempts)
            if attempts % 30 == 0:
                self.logger.info(
                    f"VM properties at attempt {attempts}: {vars(vm) if vm else 'None'}"
                )

            # Get current status for logging
            # NOTE(review): this loop reads `vm` via getattr() while the
            # timeout path at the bottom uses vm.get(); confirm whether
            # providers return objects or dicts here.
            current_status = getattr(vm, "status", None) if vm else None
            if current_status != last_status:
                self.logger.info(
                    f"VM status changed to: {current_status} (after {elapsed:.1f}s)"
                )
                last_status = current_status

            # Check for IP address - ensure it's not None or empty
            ip = getattr(vm, "ip_address", None) if vm else None
            if ip and ip.strip():  # Check for non-empty string
                self.logger.info(
                    f"VM {self.config.name} got IP address: {ip} (after {elapsed:.1f}s)"
                )
                return vm

            if attempts % 10 == 0:  # Log every 10 attempts to avoid flooding
                self.logger.info(
                    f"Still waiting for VM IP address... (elapsed: {elapsed:.1f}s)"
                )
            else:
                self.logger.debug(
                    f"Waiting for VM IP address... Current IP: {ip}, Status: {current_status}"
                )
        except Exception as e:
            self.logger.warning(f"Error checking VM status (attempt {attempts}): {str(e)}")
            # If we've been trying for a while and still getting errors, log more details
            if elapsed > 60:  # After 1 minute of errors, log more details
                self.logger.error(f"Persistent error getting VM status: {str(e)}")
                self.logger.info("Trying to get VM list for debugging...")
                try:
                    if self.config.vm_provider is not None:
                        vms = await self.config.vm_provider.list_vms()
                        self.logger.info(
                            f"Available VMs: {[getattr(vm, 'name', None) for vm in vms if hasattr(vm, 'name')]}"
                        )
                except Exception as list_error:
                    self.logger.error(f"Failed to list VMs: {str(list_error)}")

        await asyncio.sleep(interval)

    # If we get here, we've timed out
    elapsed = time.time() - start_time
    self.logger.error(f"VM {self.config.name} not ready after {elapsed:.1f} seconds")

    # Try to get final VM status for debugging
    try:
        if self.config.vm_provider is not None:
            vm = await self.config.vm_provider.get_vm(self.config.name)
            # VM data is returned as a dictionary from the Lumier provider
            status = vm.get("status", "unknown") if vm else "unknown"
            ip = vm.get("ip_address") if vm else None
        else:
            status = "unknown"
            ip = None
        self.logger.error(f"Final VM status: {status}, IP: {ip}")
    except Exception as e:
        self.logger.error(f"Failed to get final VM status: {str(e)}")

    raise TimeoutError(
        f"VM {self.config.name} not ready after {elapsed:.1f} seconds - IP address not assigned"
    )
async def update(self, cpu: Optional[int] = None, memory: Optional[str] = None):
    """Update VM settings.

    Args:
        cpu: New CPU core count; keeps the configured value when None.
        memory: New memory size; keeps the configured value when None.

    Raises:
        RuntimeError: If no VM provider has been initialized.
    """
    self.logger.info(
        f"Updating VM settings: CPU={cpu or self.config.cpu}, Memory={memory or self.config.memory}"
    )
    # Any unspecified setting falls back to the current configuration.
    update_opts = {"cpu": cpu or int(self.config.cpu), "memory": memory or self.config.memory}
    provider = self.config.vm_provider
    if provider is None:
        raise RuntimeError("VM provider not initialized")
    await provider.update_vm(
        name=self.config.name,
        update_opts=update_opts,
        storage=self.storage,  # Pass storage explicitly for clarity
    )
def get_screenshot_size(self, screenshot: bytes) -> Dict[str, int]:
    """Get the dimensions of a screenshot.

    Args:
        screenshot: The screenshot bytes (any format PIL can decode)

    Returns:
        Dict[str, int]: Dictionary containing 'width' and 'height' of the image
    """
    # PIL reads the dimensions from the image header without decoding pixels.
    buffer = io.BytesIO(screenshot)
    dimensions = Image.open(buffer).size
    return {"width": dimensions[0], "height": dimensions[1]}
@property
def interface(self):
    """Get the computer interface for interacting with the VM.

    Returns:
        The computer interface (wrapped with tracing if tracing is active)

    Raises:
        RuntimeError: If run() has not been called yet.
    """
    if getattr(self, "_interface", None) is None:
        error_msg = "Computer interface not initialized. Call run() first."
        self.logger.error(error_msg)
        self.logger.error(
            "Make sure to call await computer.run() before using any interface methods."
        )
        raise RuntimeError(error_msg)

    original = getattr(self, "_original_interface", None)
    # While a trace is being recorded, serve a tracing wrapper instead of
    # the raw interface so every call is captured.
    if self._tracing.is_tracing and original is not None:
        wrapper = getattr(self, "_tracing_wrapper", None)
        # (Re)build the wrapper when it is missing or wraps a stale interface.
        if wrapper is None or wrapper._original_interface != original:
            wrapper = TracingInterfaceWrapper(original, self._tracing)
            self._tracing_wrapper = wrapper
        return wrapper
    return self._interface
@property
def tracing(self) -> ComputerTracing:
    """Get the computer tracing instance for recording sessions.

    Returns:
        ComputerTracing: The tracing instance bound to this computer.
    """
    # Read-only accessor; the instance is created during initialization.
    return self._tracing
@property
def telemetry_enabled(self) -> bool:
    """Check if telemetry is enabled for this computer instance.

    Returns:
        bool: True if telemetry is enabled, False otherwise
    """
    # Read-only view of the flag resolved during initialization.
    return self._telemetry_enabled
async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
    """Convert normalized coordinates to screen coordinates.

    Thin delegate to the active interface; ``run()`` must have been called
    first or the interface property raises RuntimeError.

    Args:
        x: X coordinate between 0 and 1
        y: Y coordinate between 0 and 1

    Returns:
        tuple[float, float]: Screen coordinates (x, y)
    """
    return await self.interface.to_screen_coordinates(x, y)
async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
    """Convert screen coordinates to screenshot coordinates.

    Thin delegate to the active interface; ``run()`` must have been called
    first or the interface property raises RuntimeError.

    Args:
        x: X coordinate in screen space
        y: Y coordinate in screen space

    Returns:
        tuple[float, float]: (x, y) coordinates in screenshot space
    """
    return await self.interface.to_screenshot_coordinates(x, y)
async def playwright_exec(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]:
    """
    Execute a Playwright browser command.

    Thin delegate to the active interface; ``run()`` must have been called
    first or the interface property raises RuntimeError.

    Args:
        command: The browser command to execute (visit_url, click, type, scroll, web_search)
        params: Command parameters

    Returns:
        Dict containing the command result

    Examples:
        # Navigate to a URL
        await computer.playwright_exec("visit_url", {"url": "https://example.com"})

        # Click at coordinates
        await computer.playwright_exec("click", {"x": 100, "y": 200})

        # Type text
        await computer.playwright_exec("type", {"text": "Hello, world!"})

        # Scroll
        await computer.playwright_exec("scroll", {"delta_x": 0, "delta_y": -100})

        # Web search
        await computer.playwright_exec("web_search", {"query": "computer use agent"})
    """
    return await self.interface.playwright_exec(command, params)
# Add virtual environment management functions to computer interface
async def venv_install(self, venv_name: str, requirements: list[str]):
"""Install packages in a virtual environment.
Args:
venv_name: Name of the virtual environment
requirements: List of package requirements to install
Returns:
Tuple of (stdout, stderr) from the installation command
"""
requirements = requirements or []
# Windows vs POSIX handling
if self.os_type == "windows":
# Use %USERPROFILE% for home directory and cmd.exe semantics
venv_path = f"%USERPROFILE%\\.venvs\\{venv_name}"
ensure_dir_cmd = 'if not exist "%USERPROFILE%\\.venvs" mkdir "%USERPROFILE%\\.venvs"'
create_cmd = f'if not exist "{venv_path}" python -m venv "{venv_path}"'
requirements_str = " ".join(requirements)
# Activate via activate.bat and install
install_cmd = (
f'call "{venv_path}\\Scripts\\activate.bat" && pip install {requirements_str}'
if requirements_str
else "echo No requirements to install"
)
await self.interface.run_command(ensure_dir_cmd)
await self.interface.run_command(create_cmd)
return await self.interface.run_command(install_cmd)
else:
# POSIX (macOS/Linux)
venv_path = f"$HOME/.venvs/{venv_name}"
create_cmd = f'mkdir -p "$HOME/.venvs" && python -m venv "{venv_path}"'
# Check if venv exists, if not create it
check_cmd = f'test -d "{venv_path}" || ({create_cmd})'
_ = await self.interface.run_command(check_cmd)
# Install packages
requirements_str = " ".join(requirements)
install_cmd = (
f'. "{venv_path}/bin/activate" && pip install {requirements_str}'
if requirements_str
else "echo No requirements to install"
)
return await self.interface.run_command(install_cmd)
async def pip_install(self, requirements: list[str]):
    """Install packages using the system Python/pip (no venv).

    Args:
        requirements: List of package requirements to install globally/user site.

    Returns:
        Tuple of (stdout, stderr) from the installation command
    """
    pkgs = list(requirements or [])
    if not pkgs:
        # Nothing requested; run a no-op so callers still get (stdout, stderr).
        return await self.interface.run_command("echo No requirements to install")
    # `python -m pip` keeps behavior consistent across platforms.
    return await self.interface.run_command(f"python -m pip install {' '.join(pkgs)}")
async def venv_cmd(self, venv_name: str, command: str):
    """Execute a shell command in a virtual environment.

    The venv is created automatically (with no packages) when it does not
    exist yet, then the command runs with the venv activated.

    Args:
        venv_name: Name of the virtual environment
        command: Shell command to execute in the virtual environment

    Returns:
        Tuple of (stdout, stderr) from the command execution
    """
    if self.os_type == "windows":
        # Windows (cmd.exe)
        venv_path = f"%USERPROFILE%\\.venvs\\{venv_name}"
        # Check existence and signal if missing
        check_cmd = f'if not exist "{venv_path}" (echo VENV_NOT_FOUND) else (echo VENV_FOUND)'
        result = await self.interface.run_command(check_cmd)
        if "VENV_NOT_FOUND" in getattr(result, "stdout", ""):
            # Auto-create the venv with no requirements
            await self.venv_install(venv_name, [])
        # Activate and run the command
        full_command = f'call "{venv_path}\\Scripts\\activate.bat" && {command}'
        return await self.interface.run_command(full_command)
    else:
        # POSIX (macOS/Linux)
        venv_path = f"$HOME/.venvs/{venv_name}"
        # BUGFIX: `test -d` prints nothing either way and signals via its exit
        # status, which run_command does not surface here; the old check on
        # stderr/"test:" never detected a missing venv. Echo explicit markers
        # instead, mirroring the Windows branch.
        check_cmd = f'test -d "{venv_path}" && echo VENV_FOUND || echo VENV_NOT_FOUND'
        result = await self.interface.run_command(check_cmd)
        if "VENV_NOT_FOUND" in getattr(result, "stdout", ""):
            # Auto-create the venv with no requirements
            await self.venv_install(venv_name, [])
        # Activate virtual environment and run command
        full_command = f'. "{venv_path}/bin/activate" && {command}'
        return await self.interface.run_command(full_command)
async def venv_exec(self, venv_name: str, python_func, *args, **kwargs):
    """Execute Python function in a virtual environment using source code extraction.

    The function's source is extracted with ``inspect.getsource``, shipped as
    base64 together with JSON-encoded arguments, executed inside the activated
    venv, and the JSON result is read back from stdout between sentinel markers.

    Args:
        venv_name: Name of the virtual environment
        python_func: A callable function to execute
        *args: Positional arguments to pass to the function
        **kwargs: Keyword arguments to pass to the function

    Returns:
        The result of the function execution, or raises any exception that occurred

    Note:
        Arguments and the return value must survive a JSON round-trip
        (non-serializable values are stringified via ``default=str``).
    """
    import base64
    import inspect
    import json
    import textwrap

    try:
        # Get function source code using inspect.getsource
        source = inspect.getsource(python_func)
        # Remove common leading whitespace (dedent)
        func_source = textwrap.dedent(source).strip()
        # Remove decorators
        while func_source.lstrip().startswith("@"):
            func_source = func_source.split("\n", 1)[1].strip()
        # Get function name for execution
        func_name = python_func.__name__
        # Serialize args and kwargs as JSON (safer than dill for cross-version compatibility)
        args_json = json.dumps(args, default=str)
        kwargs_json = json.dumps(kwargs, default=str)
    except OSError as e:
        raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}")
    except Exception as e:
        raise Exception(f"Failed to reconstruct function source: {e}")

    # Create Python code that will define and execute the function
    args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
    kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")

    python_code = f'''
import json
import traceback
import base64

try:
    # Define the function from source
{textwrap.indent(func_source, "    ")}

    # Deserialize args and kwargs from base64 JSON
    _args_b64 = """{args_b64}"""
    _kwargs_b64 = """{kwargs_b64}"""
    args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
    kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))

    # Execute the function
    result = {func_name}(*args, **kwargs)

    # Create success output payload
    output_payload = {{
        "success": True,
        "result": result,
        "error": None
    }}
except Exception as e:
    # Create error output payload
    output_payload = {{
        "success": False,
        "result": None,
        "error": {{
            "type": type(e).__name__,
            "message": str(e),
            "traceback": traceback.format_exc()
        }}
    }}

# Serialize the output payload as JSON
import json
output_json = json.dumps(output_payload, default=str)

# Print the JSON output with markers
print(f"<<<VENV_EXEC_START>>>{{output_json}}<<<VENV_EXEC_END>>>")
'''

    # Encode the Python code in base64 to avoid shell escaping issues
    encoded_code = base64.b64encode(python_code.encode("utf-8")).decode("ascii")

    # Execute the Python code in the virtual environment
    python_command = (
        f"python -c \"import base64; exec(base64.b64decode('{encoded_code}').decode('utf-8'))\""
    )
    result = await self.venv_cmd(venv_name, python_command)

    # Parse the output to extract the payload
    start_marker = "<<<VENV_EXEC_START>>>"
    end_marker = "<<<VENV_EXEC_END>>>"

    # Print original stdout
    # NOTE(review): when the start marker is absent, find() returns -1 and this
    # prints stdout[:-1]; confirm that truncation is acceptable.
    print(result.stdout[: result.stdout.find(start_marker)])

    if start_marker in result.stdout and end_marker in result.stdout:
        start_idx = result.stdout.find(start_marker) + len(start_marker)
        end_idx = result.stdout.find(end_marker)
        if start_idx < end_idx:
            output_json = result.stdout[start_idx:end_idx]
            try:
                # Decode and deserialize the output payload from JSON
                output_payload = json.loads(output_json)
            except Exception as e:
                raise Exception(f"Failed to decode output payload: {e}")
            if output_payload["success"]:
                return output_payload["result"]
            else:
                import builtins

                # Recreate and raise the original exception
                error_info = output_payload.get("error", {}) or {}
                err_type = error_info.get("type") or "Exception"
                err_msg = error_info.get("message") or ""
                err_tb = error_info.get("traceback") or ""
                exc_cls = getattr(builtins, err_type, None)
                if isinstance(exc_cls, type) and issubclass(exc_cls, BaseException):
                    # Built-in exception: rethrow with remote traceback appended
                    raise exc_cls(f"{err_msg}\n\nRemote traceback:\n{err_tb}")
                else:
                    # Non built-in: raise a safe local error carrying full remote context
                    raise RuntimeError(f"{err_type}: {err_msg}\n\nRemote traceback:\n{err_tb}")
        else:
            raise Exception("Invalid output format: markers found but no content between them")
    else:
        # Fallback: return stdout/stderr if no payload markers found
        raise Exception(
            f"No output payload found. stdout: {result.stdout}, stderr: {result.stderr}"
        )
async def venv_exec_background(
    self, venv_name: str, python_func, *args, requirements: Optional[List[str]] = None, **kwargs
) -> int:
    """Run the Python function in the venv in the background and return the PID.

    Uses a short launcher Python that spawns a detached child and exits immediately.

    Args:
        venv_name: Name of the virtual environment to activate.
        python_func: Callable whose source is shipped and executed remotely.
        *args: Positional arguments (must survive a JSON round-trip).
        requirements: Packages the child pip-installs before running the
            function (non-blocking for the caller).
        **kwargs: Keyword arguments (must survive a JSON round-trip).

    Returns:
        int: PID of the detached child process on the remote machine.
    """
    import base64
    import inspect
    import json
    import textwrap
    import time as _time

    try:
        source = inspect.getsource(python_func)
        func_source = textwrap.dedent(source).strip()
        # Strip decorators so only the bare definition is shipped
        while func_source.lstrip().startswith("@"):
            func_source = func_source.split("\n", 1)[1].strip()
        func_name = python_func.__name__
        args_json = json.dumps(args, default=str)
        kwargs_json = json.dumps(kwargs, default=str)
    except OSError as e:
        raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}")
    except Exception as e:
        raise Exception(f"Failed to reconstruct function source: {e}")

    reqs_list = requirements or []
    reqs_json = json.dumps(reqs_list)

    # Create Python code that will define and execute the function
    args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
    kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")

    # Payload is assembled from three pieces: an f-string prefix, the JSON
    # requirements list embedded via repr(), and a plain-string suffix.
    # NOTE(review): the suffix is NOT an f-string, so {func_name} below is not
    # interpolated as written here — verify against the original source.
    payload_code = (
        f'''
import json
import traceback
import base64

try:
    # Define the function from source
{textwrap.indent(func_source, "    ")}

    # Deserialize args and kwargs from base64 JSON
    _args_b64 = """{args_b64}"""
    _kwargs_b64 = """{kwargs_b64}"""
    args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
    kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))

    # Ensure requirements inside the active venv
    for pkg in json.loads('''
        + repr(reqs_json)
        + """):
        if pkg:
            import subprocess, sys
            subprocess.run([sys.executable, '-m', 'pip', 'install', pkg], check=False)

    _ = {func_name}(*args, **kwargs)
except Exception:
    import sys
    sys.stderr.write(traceback.format_exc())
"""
    )

    payload_b64 = base64.b64encode(payload_code.encode("utf-8")).decode("ascii")

    if self.os_type == "windows":
        # Launcher spawns detached child and prints its PID
        launcher_code = f"""
import base64, subprocess, os, sys
DETACHED_PROCESS = 0x00000008
CREATE_NEW_PROCESS_GROUP = 0x00000200
creationflags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP
code = base64.b64decode("{payload_b64}").decode("utf-8")
p = subprocess.Popen(["python", "-c", code], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=creationflags)
print(p.pid)
"""
        launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii")
        venv_path = f"%USERPROFILE%\\.venvs\\{venv_name}"
        cmd = (
            'cmd /c "'
            f'call "{venv_path}\\Scripts\\activate.bat" && '
            f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\""
            '"'
        )
        result = await self.interface.run_command(cmd)
        # The launcher prints only the child's PID; take the last stdout line.
        pid_str = (result.stdout or "").strip().splitlines()[-1].strip()
        return int(pid_str)
    else:
        # POSIX: child output goes to a throwaway log; setsid detaches the
        # child into its own session so it survives the launcher exiting.
        log = f"/tmp/cua_bg_{int(_time.time())}.log"
        launcher_code = f"""
import base64, subprocess, os, sys
code = base64.b64decode("{payload_b64}").decode("utf-8")
with open("{log}", "ab", buffering=0) as f:
    p = subprocess.Popen(["python", "-c", code], stdout=f, stderr=subprocess.STDOUT, preexec_fn=getattr(os, "setsid", None))
print(p.pid)
"""
        launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii")
        venv_path = f"$HOME/.venvs/{venv_name}"
        shell = (
            f'. "{venv_path}/bin/activate" && '
            f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\""
        )
        result = await self.interface.run_command(shell)
        pid_str = (result.stdout or "").strip().splitlines()[-1].strip()
        return int(pid_str)
async def python_exec(self, python_func, *args, **kwargs):
    """Execute a Python function using the system Python (no venv).

    Uses source extraction and base64 transport, mirroring venv_exec but
    without virtual environment activation.

    Returns the function result or raises a reconstructed exception with
    remote traceback context appended.

    Args:
        python_func: Callable whose source can be retrieved via inspect.
        *args: Positional arguments (must survive a JSON round-trip).
        **kwargs: Keyword arguments (must survive a JSON round-trip).
    """
    import base64
    import inspect
    import json
    import textwrap

    try:
        source = inspect.getsource(python_func)
        func_source = textwrap.dedent(source).strip()
        # Strip decorators so only the bare definition is shipped
        while func_source.lstrip().startswith("@"):
            func_source = func_source.split("\n", 1)[1].strip()
        func_name = python_func.__name__
        args_json = json.dumps(args, default=str)
        kwargs_json = json.dumps(kwargs, default=str)
    except OSError as e:
        raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}")
    except Exception as e:
        raise Exception(f"Failed to reconstruct function source: {e}")

    # Create Python code that will define and execute the function
    args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
    kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")

    python_code = f'''
import json
import traceback
import base64

try:
    # Define the function from source
{textwrap.indent(func_source, "    ")}

    # Deserialize args and kwargs from base64 JSON
    _args_b64 = """{args_b64}"""
    _kwargs_b64 = """{kwargs_b64}"""
    args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
    kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))

    # Execute the function
    result = {func_name}(*args, **kwargs)

    # Create success output payload
    output_payload = {{
        "success": True,
        "result": result,
        "error": None
    }}
except Exception as e:
    # Create error output payload
    output_payload = {{
        "success": False,
        "result": None,
        "error": {{
            "type": type(e).__name__,
            "message": str(e),
            "traceback": traceback.format_exc()
        }}
    }}

# Serialize the output payload as JSON
import json
output_json = json.dumps(output_payload, default=str)

# Print the JSON output with markers
print(f"<<<VENV_EXEC_START>>>{{output_json}}<<<VENV_EXEC_END>>>")
'''

    # Base64-encode the program to avoid shell escaping issues.
    encoded_code = base64.b64encode(python_code.encode("utf-8")).decode("ascii")
    python_command = (
        f"python -c \"import base64; exec(base64.b64decode('{encoded_code}').decode('utf-8'))\""
    )
    result = await self.interface.run_command(python_command)

    start_marker = "<<<VENV_EXEC_START>>>"
    end_marker = "<<<VENV_EXEC_END>>>"
    # Echo any remote stdout preceding the payload marker.
    # NOTE(review): when the marker is absent, find() returns -1 and this
    # prints stdout[:-1]; confirm that truncation is acceptable.
    print(result.stdout[: result.stdout.find(start_marker)])

    if start_marker in result.stdout and end_marker in result.stdout:
        start_idx = result.stdout.find(start_marker) + len(start_marker)
        end_idx = result.stdout.find(end_marker)
        if start_idx < end_idx:
            output_json = result.stdout[start_idx:end_idx]
            try:
                output_payload = json.loads(output_json)
            except Exception as e:
                raise Exception(f"Failed to decode output payload: {e}")
            if output_payload["success"]:
                return output_payload["result"]
            else:
                import builtins

                # Rebuild the remote exception locally with its traceback.
                error_info = output_payload.get("error", {}) or {}
                err_type = error_info.get("type") or "Exception"
                err_msg = error_info.get("message") or ""
                err_tb = error_info.get("traceback") or ""
                exc_cls = getattr(builtins, err_type, None)
                if isinstance(exc_cls, type) and issubclass(exc_cls, BaseException):
                    raise exc_cls(f"{err_msg}\n\nRemote traceback:\n{err_tb}")
                else:
                    raise RuntimeError(f"{err_type}: {err_msg}\n\nRemote traceback:\n{err_tb}")
        else:
            raise Exception("Invalid output format: markers found but no content between them")
    else:
        raise Exception(
            f"No output payload found. stdout: {result.stdout}, stderr: {result.stderr}"
        )
async def python_exec_background(
    self, python_func, *args, requirements: Optional[List[str]] = None, **kwargs
) -> int:
    """Run a Python function with the system interpreter in the background and return PID.

    Uses a short launcher Python that spawns a detached child and exits immediately.

    Args:
        python_func: Callable whose source can be retrieved via inspect.
        *args: Positional arguments (must survive a JSON round-trip).
        requirements: Accepted for signature symmetry with venv_exec_background.
            NOTE(review): not consulted anywhere in this body — confirm whether
            the child was meant to pip-install these like the venv variant does.
        **kwargs: Keyword arguments (must survive a JSON round-trip).

    Returns:
        int: PID of the detached child process on the remote machine.
    """
    import base64
    import inspect
    import json
    import textwrap
    import time as _time

    try:
        source = inspect.getsource(python_func)
        func_source = textwrap.dedent(source).strip()
        # Strip decorators so only the bare definition is shipped
        while func_source.lstrip().startswith("@"):
            func_source = func_source.split("\n", 1)[1].strip()
        func_name = python_func.__name__
        args_json = json.dumps(args, default=str)
        kwargs_json = json.dumps(kwargs, default=str)
    except OSError as e:
        raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}")
    except Exception as e:
        raise Exception(f"Failed to reconstruct function source: {e}")

    # Create Python code that will define and execute the function
    args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
    kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")

    payload_code = f'''
import json
import traceback
import base64

try:
    # Define the function from source
{textwrap.indent(func_source, "    ")}

    # Deserialize args and kwargs from base64 JSON
    _args_b64 = """{args_b64}"""
    _kwargs_b64 = """{kwargs_b64}"""
    args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
    kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))

    _ = {func_name}(*args, **kwargs)
except Exception:
    import sys
    sys.stderr.write(traceback.format_exc())
'''

    payload_b64 = base64.b64encode(payload_code.encode("utf-8")).decode("ascii")

    if self.os_type == "windows":
        # Detach the child with Windows process-creation flags so it outlives
        # the launcher; the launcher prints only the child's PID.
        launcher_code = f"""
import base64, subprocess, os, sys
DETACHED_PROCESS = 0x00000008
CREATE_NEW_PROCESS_GROUP = 0x00000200
creationflags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP
code = base64.b64decode("{payload_b64}").decode("utf-8")
p = subprocess.Popen(["python", "-c", code], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=creationflags)
print(p.pid)
"""
        launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii")
        cmd = f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\""
        result = await self.interface.run_command(cmd)
        # Last stdout line is the PID printed by the launcher.
        pid_str = (result.stdout or "").strip().splitlines()[-1].strip()
        return int(pid_str)
    else:
        # POSIX: child output goes to a throwaway log; setsid detaches the
        # child into its own session so it survives the launcher exiting.
        log = f"/tmp/cua_bg_{int(_time.time())}.log"
        launcher_code = f"""
import base64, subprocess, os, sys
code = base64.b64decode("{payload_b64}").decode("utf-8")
with open("{log}", "ab", buffering=0) as f:
    p = subprocess.Popen(["python", "-c", code], stdout=f, stderr=subprocess.STDOUT, preexec_fn=getattr(os, "setsid", None))
print(p.pid)
"""
        launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii")
        cmd = f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\""
        result = await self.interface.run_command(cmd)
        pid_str = (result.stdout or "").strip().splitlines()[-1].strip()
        return int(pid_str)
def python_command(
    self,
    requirements: Optional[List[str]] = None,
    *,
    venv_name: str = "default",
    use_system_python: bool = False,
    background: bool = False,
) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]:
    """Decorator to execute a Python function remotely in this Computer's venv.

    This mirrors `computer.helpers.sandboxed()` but binds to this instance and
    optionally ensures required packages are installed before execution.

    Args:
        requirements: Packages to install in the virtual environment.
        venv_name: Name of the virtual environment to use.
        use_system_python: If True, use the system Python/pip instead of a venv.
        background: If True, run the function detached and return the child PID immediately.

    Returns:
        A decorator that turns a local function into an async callable which
        runs remotely and returns the function's result (or, when
        ``background=True``, the remote child's PID).
    """
    # Copy so later mutation of the caller's list cannot affect the decorator.
    reqs = list(requirements or [])

    def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            if use_system_python:
                # For background, avoid blocking installs; install inside child process
                if background:
                    return await self.python_exec_background(func, *args, requirements=reqs, **kwargs)  # type: ignore[return-value]
                # Foreground: install first, then execute
                if reqs:
                    await self.pip_install(reqs)
                return await self.python_exec(func, *args, **kwargs)
            else:
                # For background, avoid blocking installs; install inside child process under venv
                if background:
                    return await self.venv_exec_background(venv_name, func, *args, requirements=reqs, **kwargs)  # type: ignore[return-value]
                # Foreground: ensure venv and install, then execute
                await self.venv_install(venv_name, reqs)
                return await self.venv_exec(venv_name, func, *args, **kwargs)

        return wrapper

    return decorator
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/anthropic.py:
--------------------------------------------------------------------------------
```python
"""
Anthropic hosted tools agent loop implementation using liteLLM
"""
import asyncio
import json
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union
import litellm
from litellm.responses.litellm_completion_transformation.transformation import (
LiteLLMCompletionResponsesConfig,
)
from ..decorators import register_agent
from ..loops.base import AsyncAgentConfig
from ..responses import (
make_click_item,
make_double_click_item,
make_drag_item,
make_failed_tool_call_items,
make_input_image_item,
make_keypress_item,
make_left_mouse_down_item,
make_left_mouse_up_item,
make_move_item,
make_output_text_item,
make_reasoning_item,
make_screenshot_item,
make_scroll_item,
make_type_item,
make_wait_item,
)
from ..types import AgentCapability, AgentResponse, Messages, Tools
# Model version mapping to tool version and beta flag.
# Entries are scanned in order by _get_tool_config_for_model; the first
# regex `pattern` that matches the model name (case-insensitively) wins.
MODEL_TOOL_MAPPING = [
    # Claude 4 models
    {
        "pattern": r"claude-4|claude-opus-4|claude-sonnet-4|claude-haiku-4",
        "tool_version": "computer_20250124",
        "beta_flag": "computer-use-2025-01-24",
    },
    # Claude 3.7 models
    {
        "pattern": r"claude-3\.?7|claude-3-7",
        "tool_version": "computer_20250124",
        "beta_flag": "computer-use-2025-01-24",
    },
    # Claude 3.5 models (fallback)
    {
        "pattern": r"claude-3\.?5|claude-3-5",
        "tool_version": "computer_20241022",
        "beta_flag": "computer-use-2024-10-22",
    },
]
def _get_tool_config_for_model(model: str) -> Dict[str, str]:
    """Get tool version and beta flag for the given model.

    Scans MODEL_TOOL_MAPPING in order and returns the first entry whose
    pattern matches the model name (case-insensitively); falls back to the
    Claude 3.5 configuration when nothing matches.
    """
    import re

    matched = next(
        (
            entry
            for entry in MODEL_TOOL_MAPPING
            if re.search(entry["pattern"], model, re.IGNORECASE)
        ),
        None,
    )
    if matched is not None:
        return {"tool_version": matched["tool_version"], "beta_flag": matched["beta_flag"]}
    # Default to Claude 3.5 configuration
    return {"tool_version": "computer_20241022", "beta_flag": "computer-use-2024-10-22"}
async def _map_computer_tool_to_anthropic(computer_tool: Any, tool_version: str) -> Dict[str, Any]:
"""Map a computer tool to Anthropic's hosted tool schema."""
# Get dimensions from the computer handler
try:
width, height = await computer_tool.get_dimensions()
except Exception:
# Fallback to default dimensions if method fails
width, height = 1024, 768
return {
"type": tool_version,
"function": {
"name": "computer",
"parameters": {
"display_height_px": height,
"display_width_px": width,
"display_number": 1,
},
},
}
async def _prepare_tools_for_anthropic(tool_schemas: List[Dict[str, Any]], model: str) -> Tools:
    """Prepare tools for Anthropic API format.

    Computer tools are mapped to the model-appropriate hosted tool schema;
    function tools are converted to Anthropic's name/description/input_schema
    layout. Schemas of any other type are ignored.
    """
    tool_config = _get_tool_config_for_model(model)
    converted: List[Dict[str, Any]] = []
    for schema in tool_schemas:
        kind = schema["type"]
        if kind == "computer":
            # Hosted computer tool, versioned per the model mapping.
            converted.append(
                await _map_computer_tool_to_anthropic(
                    schema["computer"], tool_config["tool_version"]
                )
            )
        elif kind == "function":
            fn = schema["function"]
            converted.append(
                {
                    "name": fn["name"],
                    "description": fn.get("description", ""),
                    "input_schema": fn.get("parameters", {}),
                }
            )
    return converted
def _convert_responses_items_to_completion_messages(messages: Messages) -> List[Dict[str, Any]]:
    """Convert responses_items message format to liteLLM completion format.

    Handles user/assistant text, reasoning summaries, custom function calls and
    their outputs, and computer calls (translated into Anthropic computer-tool
    arguments) plus their screenshot/text outputs.

    Fixes over the previous version:
    - assistant content lists may contain bare strings as well as dicts;
    - computer_call_output whose "output" is a bare string no longer crashes;
    - unknown computer actions no longer emit an assistant message with an
      empty tool_calls list (they are skipped instead).
    """
    completion_messages: List[Dict[str, Any]] = []
    # Remember which function each call_id belonged to so the matching
    # function_call_output can be labelled with the right function name.
    call_id_to_fn_name: Dict[str, str] = {}
    for message in messages:
        msg_type = message.get("type")
        role = message.get("role")
        # --- user messages (with or without an explicit type) ---
        if role == "user" or msg_type == "user":
            content = message.get("content", "")
            if isinstance(content, list):
                # Multi-modal content: convert input_image / input_text blocks
                # to OpenAI-style image_url / text blocks.
                converted_content = []
                for item in content:
                    if isinstance(item, dict) and item.get("type") == "input_image":
                        image_url = item.get("image_url", "")
                        # "[omitted]" marks images that were stripped upstream.
                        if image_url and image_url != "[omitted]":
                            converted_content.append(
                                {"type": "image_url", "image_url": {"url": image_url}}
                            )
                    elif isinstance(item, dict) and item.get("type") == "input_text":
                        converted_content.append({"type": "text", "text": item.get("text", "")})
                    else:
                        # Keep unrecognized content blocks as-is.
                        converted_content.append(item)
                completion_messages.append(
                    {"role": "user", "content": converted_content if converted_content else content}
                )
            else:
                completion_messages.append({"role": "user", "content": content})
        # --- assistant messages: flatten structured content to plain text ---
        elif role == "assistant":
            content = message.get("content", [])
            if isinstance(content, str):
                content = [{"type": "output_text", "text": content}]
            # Items may be dicts ({"text": ...}) or bare strings; handle both.
            content = "\n".join(
                item.get("text", "") if isinstance(item, dict) else str(item) for item in content
            )
            completion_messages.append({"role": "assistant", "content": content})
        # --- reasoning: surface the summary text as an assistant message ---
        elif msg_type == "reasoning":
            summary = message.get("summary", [])
            reasoning_text = ""
            if isinstance(summary, list) and summary:
                # Use the first summary_text entry, if any.
                for item in summary:
                    if isinstance(item, dict) and item.get("type") == "summary_text":
                        reasoning_text = item.get("text", "")
                        break
            else:
                # Fallback to a direct "reasoning" field.
                reasoning_text = message.get("reasoning", "")
            if reasoning_text:
                completion_messages.append({"role": "assistant", "content": reasoning_text})
        # --- custom function calls ---
        elif msg_type == "function_call":
            fn_name = message.get("name")
            fn_args = message.get("arguments", "{}")
            call_id = message.get("call_id", "call_1")
            call_id_to_fn_name[call_id] = fn_name
            _append_assistant_tool_calls(
                completion_messages,
                [
                    {
                        "id": call_id,
                        "type": "function",
                        "function": {"name": fn_name, "arguments": fn_args},
                    }
                ],
            )
        # --- custom function call outputs ---
        elif msg_type == "function_call_output":
            call_id = message.get("call_id", "call_1")
            fn_output = message.get("output", "")
            fn_name = call_id_to_fn_name.get(call_id, "computer")
            completion_messages.append(
                {
                    "role": "function",
                    "name": fn_name,
                    "tool_call_id": call_id,
                    "content": str(fn_output),
                }
            )
        # --- computer calls: translate the action to Anthropic tool arguments ---
        elif msg_type == "computer_call":
            action = message.get("action", {})
            call_id = message.get("call_id", "call_1")
            tool_input = _responses_action_to_anthropic_input(action)
            if tool_input is not None:
                _append_assistant_tool_calls(
                    completion_messages,
                    [
                        {
                            "id": call_id,
                            "type": "function",
                            "function": {"name": "computer", "arguments": json.dumps(tool_input)},
                        }
                    ],
                )
        # --- computer call outputs (screenshots or text) ---
        elif msg_type == "computer_call_output":
            output = message.get("output", {})
            call_id = message.get("call_id", "call_1")
            # Guard: "output" may arrive as a bare string rather than a typed dict.
            if isinstance(output, dict) and output.get("type") == "input_image":
                image_url = output.get("image_url", "")
                completion_messages.append(
                    {
                        "role": "function",
                        "name": "computer",
                        "tool_call_id": call_id,
                        "content": [{"type": "image_url", "image_url": {"url": image_url}}],
                    }
                )
            else:
                completion_messages.append(
                    {
                        "role": "function",
                        "name": "computer",
                        "tool_call_id": call_id,
                        "content": str(output),
                    }
                )
    return completion_messages


def _append_assistant_tool_calls(
    completion_messages: List[Dict[str, Any]], tool_calls: List[Dict[str, Any]]
) -> None:
    """Attach tool calls to the trailing assistant message, creating one if needed."""
    if completion_messages and completion_messages[-1].get("role") == "assistant":
        completion_messages[-1].setdefault("tool_calls", []).extend(tool_calls)
    else:
        completion_messages.append(
            {"role": "assistant", "content": None, "tool_calls": tool_calls}
        )


def _responses_action_to_anthropic_input(action: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Translate one responses_items computer action into Anthropic tool-call input.

    Returns None for unrecognized action types; e.g. {"type": "click", "x": 1,
    "y": 2, "button": "right"} becomes {"action": "right_click",
    "coordinate": [1, 2]}.
    """
    action_type = action.get("type")
    if action_type == "click":
        button = action.get("button", "left")
        # Map the responses button name onto Anthropic's *_click action names.
        action_name = (
            "right_click"
            if button == "right"
            else "middle_click" if button == "wheel" else "left_click"
        )
        return {"action": action_name, "coordinate": [action.get("x", 0), action.get("y", 0)]}
    if action_type == "double_click":
        return {"action": "double_click", "coordinate": [action.get("x", 0), action.get("y", 0)]}
    if action_type == "type":
        return {"action": "type", "text": action.get("text", "")}
    if action_type == "keypress":
        # ["ctrl", "c"] -> "ctrl+c"
        return {"action": "key", "text": "+".join(action.get("keys", []))}
    if action_type in ("mouse_move", "move"):
        return {"action": "mouse_move", "coordinate": [action.get("x", 0), action.get("y", 0)]}
    if action_type == "scroll":
        scroll_x = action.get("scroll_x", 0)
        scroll_y = action.get("scroll_y", 0)
        # Sign convention (mirrored by _convert_completion_to_responses_items):
        # positive scroll_x == "left", positive scroll_y == "up".
        if scroll_x > 0:
            direction, amount = "left", scroll_x
        elif scroll_x < 0:
            direction, amount = "right", -scroll_x
        elif scroll_y > 0:
            direction, amount = "up", scroll_y
        elif scroll_y < 0:
            direction, amount = "down", -scroll_y
        else:
            # No displacement given: default to a small downward scroll.
            direction, amount = "down", 3
        return {
            "action": "scroll",
            "coordinate": [action.get("x", 0), action.get("y", 0)],
            "scroll_direction": direction,
            "scroll_amount": amount,
        }
    if action_type == "drag":
        path = action.get("path", [])
        start_coord = [0, 0]
        end_coord = [0, 0]
        # Only the first and last waypoints are representable in Anthropic's format.
        if isinstance(path, list) and len(path) >= 2:
            start_coord = [path[0].get("x", 0), path[0].get("y", 0)]
            end_coord = [path[-1].get("x", 0), path[-1].get("y", 0)]
        return {
            "action": "left_click_drag",
            "start_coordinate": start_coord,
            "end_coordinate": end_coord,
        }
    if action_type == "wait":
        return {"action": "wait"}
    if action_type == "screenshot":
        return {"action": "screenshot"}
    if action_type == "left_mouse_down":
        return {
            "action": "left_mouse_down",
            "coordinate": [action.get("x", None), action.get("y", None)],
        }
    if action_type == "left_mouse_up":
        return {
            "action": "left_mouse_up",
            "coordinate": [action.get("x", None), action.get("y", None)],
        }
    # Unknown action type — caller skips the message.
    return None
def _convert_completion_to_responses_items(response: Any) -> List[Dict[str, Any]]:
    """Convert a liteLLM completion response to responses_items message format.

    Reads the first choice's message and translates:
    - plain text content into output_text items,
    - Anthropic-style ``tool_use`` content blocks into computer/function call items,
    - OpenAI-style ``tool_calls`` (JSON-string arguments) into the same items.

    Malformed JSON in a computer tool call is logged and skipped; failed or
    unknown computer actions are surfaced as failed-tool-call items.

    Fixes over the previous version:
    - the scroll direction -> sign mapping in the tool_use path was inverted
      relative to both the tool_calls path and the inverse converter;
    - a json.loads failure in the tool_calls path was intercepted by the
      action-level ``except Exception`` and crashed with a NameError on the
      undefined ``args`` variable; JSON is now parsed before dispatching;
    - the action dispatch was duplicated (~350 lines) between the two paths.
    """
    responses_items: List[Dict[str, Any]] = []
    if not response or not hasattr(response, "choices") or not response.choices:
        return responses_items
    message = response.choices[0].message

    # Anthropic-style content: a plain string or a list of typed blocks.
    if hasattr(message, "content") and message.content:
        if isinstance(message.content, str):
            responses_items.append(make_output_text_item(message.content))
        elif isinstance(message.content, list):
            for content_item in message.content:
                if not isinstance(content_item, dict):
                    continue
                item_type = content_item.get("type")
                if item_type == "text":
                    responses_items.append(make_output_text_item(content_item.get("text", "")))
                elif item_type == "tool_use":
                    tool_name = content_item.get("name", "computer")
                    tool_input = content_item.get("input", {})
                    call_id = content_item.get("id")
                    if tool_name != "computer":
                        # Custom (non-computer) function tool.
                        from ..responses import make_function_call_item

                        responses_items.append(
                            make_function_call_item(
                                function_name=tool_name, arguments=tool_input, call_id=call_id
                            )
                        )
                    else:
                        responses_items.extend(
                            _anthropic_computer_input_to_responses_items(tool_input, call_id)
                        )

    # OpenAI-style tool calls: arguments arrive as a JSON string.
    if hasattr(message, "tool_calls") and message.tool_calls:
        for tool_call in message.tool_calls:
            tool_name = tool_call.function.name
            if tool_name != "computer":
                # Custom (non-computer) function tool.
                from ..responses import make_function_call_item

                try:
                    args_dict = json.loads(tool_call.function.arguments)
                except json.JSONDecodeError:
                    args_dict = {}
                responses_items.append(
                    make_function_call_item(
                        function_name=tool_name, arguments=args_dict, call_id=tool_call.id
                    )
                )
                continue
            # Parse JSON BEFORE the action dispatch so a decode error cannot be
            # swallowed by the action-level exception handler.
            try:
                args = json.loads(tool_call.function.arguments)
            except json.JSONDecodeError:
                print("Failed to decode tool call arguments")
                # Skip malformed tool calls.
                continue
            responses_items.extend(
                _anthropic_computer_input_to_responses_items(args, tool_call.id)
            )
    return responses_items


def _anthropic_computer_input_to_responses_items(
    tool_input: Dict[str, Any], call_id: Any
) -> List[Dict[str, Any]]:
    """Translate one Anthropic computer-tool invocation into responses_items.

    Action reference:
    https://docs.anthropic.com/en/docs/agents-and-tools/tool-use/computer-use-tool#available-actions

    Unsupported actions (triple_click, hold_key) and unknown action names are
    reported as failed-tool-call items rather than raised to the caller.
    """

    def _xy(coordinate: Any, default: Any = 0) -> Tuple[Any, Any]:
        # Tolerate short or empty coordinate lists from the model.
        x = coordinate[0] if len(coordinate) > 0 else default
        y = coordinate[1] if len(coordinate) > 1 else default
        return x, y

    action_type = tool_input.get("action")
    try:
        # Basic actions (all tool versions).
        if action_type == "screenshot":
            return [make_screenshot_item(call_id=call_id)]
        if action_type in ("click", "left_click"):
            x, y = _xy(tool_input.get("coordinate", [0, 0]))
            return [make_click_item(x=x, y=y, call_id=call_id)]
        if action_type in ("type", "type_text"):
            return [make_type_item(text=tool_input.get("text", ""), call_id=call_id)]
        if action_type in ("key", "keypress", "hotkey"):
            # "ctrl+c" -> ["ctrl", "c"] (also accepts "-"-separated combos).
            keys = tool_input.get("text", "").replace("+", "-").split("-")
            return [make_keypress_item(keys=keys, call_id=call_id)]
        if action_type in ("mouse_move", "move_cursor", "move"):
            x, y = _xy(tool_input.get("coordinate", [0, 0]))
            return [make_move_item(x=x, y=y, call_id=call_id)]
        # Enhanced actions (computer_20250124): Claude 4 and Claude Sonnet 3.7.
        if action_type == "scroll":
            x, y = _xy(tool_input.get("coordinate", [0, 0]))
            direction = tool_input.get("scroll_direction", "down")
            amount = tool_input.get("scroll_amount", 3)
            # Signs mirror _convert_responses_items_to_completion_messages:
            # positive scroll_x == "left", positive scroll_y == "up".
            scroll_x = amount if direction == "left" else -amount if direction == "right" else 0
            scroll_y = amount if direction == "up" else -amount if direction == "down" else 0
            return [
                make_scroll_item(x=x, y=y, scroll_x=scroll_x, scroll_y=scroll_y, call_id=call_id)
            ]
        if action_type in ("left_click_drag", "drag"):
            sx, sy = _xy(tool_input.get("start_coordinate", [0, 0]))
            ex, ey = _xy(tool_input.get("end_coordinate", [0, 0]))
            return [
                make_drag_item(path=[{"x": sx, "y": sy}, {"x": ex, "y": ey}], call_id=call_id)
            ]
        if action_type == "right_click":
            x, y = _xy(tool_input.get("coordinate", [0, 0]))
            return [make_click_item(x=x, y=y, button="right", call_id=call_id)]
        if action_type == "middle_click":
            x, y = _xy(tool_input.get("coordinate", [0, 0]))
            return [make_click_item(x=x, y=y, button="wheel", call_id=call_id)]
        if action_type == "double_click":
            x, y = _xy(tool_input.get("coordinate", [0, 0]))
            return [make_double_click_item(x=x, y=y, call_id=call_id)]
        if action_type == "left_mouse_down":
            # Coordinates are optional for raw button events.
            x, y = _xy(tool_input.get("coordinate", [None, None]), default=None)
            return [make_left_mouse_down_item(x=x, y=y, call_id=call_id)]
        if action_type == "left_mouse_up":
            x, y = _xy(tool_input.get("coordinate", [None, None]), default=None)
            return [make_left_mouse_up_item(x=x, y=y, call_id=call_id)]
        if action_type == "wait":
            return [make_wait_item(call_id=call_id)]
        if action_type == "triple_click":
            raise NotImplementedError("triple_click")
        if action_type == "hold_key":
            raise NotImplementedError("hold_key")
        raise ValueError(f"Unknown action type: {action_type}")
    except Exception as e:
        return make_failed_tool_call_items(
            tool_name="computer",
            tool_kwargs=tool_input,
            error_message=repr(e),
            call_id=call_id,
        )
def _add_cache_control(completion_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Add cache control to completion messages"""
num_writes = 0
for message in completion_messages:
message["cache_control"] = {"type": "ephemeral"}
num_writes += 1
# Cache control has a maximum of 4 blocks
if num_writes >= 4:
break
return completion_messages
def _combine_completion_messages(completion_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Merge adjacent messages that share a role into single messages.

    Content is normalized to a list of typed blocks, tool_calls lists are
    concatenated, and consecutive text blocks within each merged message are
    joined with newlines. The input list is not modified.
    """
    if not completion_messages:
        return completion_messages
    merged: List[Dict[str, Any]] = []
    for msg in completion_messages:
        blocks = _normalize_content(msg.get("content", ""))
        if merged and merged[-1]["role"] == msg["role"]:
            # Same role as the previous entry: fold this message into it.
            prev = merged[-1]
            prev["content"].extend(blocks)
            if "tool_calls" in msg:
                prev.setdefault("tool_calls", []).extend(msg["tool_calls"])
        else:
            # Role changed (or first message): start a fresh combined entry.
            entry = msg.copy()
            entry["content"] = blocks
            if "tool_calls" in msg:
                entry["tool_calls"] = msg["tool_calls"].copy()
            merged.append(entry)
    # Collapse runs of text blocks created by the merge above.
    for entry in merged:
        entry["content"] = _merge_consecutive_text(entry["content"])
    return merged
def _normalize_content(content) -> List[Dict[str, Any]]:
"""Normalize content to list format"""
if isinstance(content, str):
if content.strip(): # Only add non-empty strings
return [{"type": "text", "text": content}]
else:
return []
elif isinstance(content, list):
return content.copy()
else:
return []
def _merge_consecutive_text(content_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Merge consecutive text blocks with newlines"""
if not content_list:
return content_list
merged = []
for item in content_list:
if item.get("type") == "text" and merged and merged[-1].get("type") == "text":
# Merge with previous text block
merged[-1]["text"] += "\n" + item["text"]
else:
merged.append(item.copy())
return merged
@register_agent(models=r".*claude-.*")
class AnthropicHostedToolsConfig(AsyncAgentConfig):
"""Anthropic hosted tools agent configuration implementing AsyncAgentConfig protocol."""
    async def predict_step(
        self,
        messages: Messages,
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Run one agent step against Anthropic's computer-use models via liteLLM.

        Args:
            messages: Conversation history in responses_items format.
            model: Model name; selects the hosted computer-tool version and
                beta flag via _get_tool_config_for_model.
            tools: Tool schemas ("computer" and "function" types are mapped
                to Anthropic's format).
            max_retries: Forwarded to liteLLM as num_retries.
            stream: Whether to request a streaming completion.
            computer_handler: Not used by this implementation; accepted for
                interface compatibility.
            use_prompt_caching: If True, merge same-role messages and tag up
                to four blocks with Anthropic "cache_control" markers.
            _on_api_start: Optional async hook called with the API kwargs
                before the request.
            _on_api_end: Optional async hook called with (kwargs, response)
                after the request.
            _on_usage: Optional async hook called with the usage dict.
            _on_screenshot: Not used by this implementation.
            **kwargs: Extra keyword args passed through to litellm.acompletion.

        Returns:
            Dict with "output" (responses_items list) and "usage"
            (token counts plus "response_cost").
        """
        tools = tools or []
        # Get tool configuration for this model
        tool_config = _get_tool_config_for_model(model)
        # Prepare tools for Anthropic API
        anthropic_tools = await _prepare_tools_for_anthropic(tools, model)
        # Convert responses_items messages to completion format
        completion_messages = _convert_responses_items_to_completion_messages(messages)
        if use_prompt_caching:
            # First combine messages to reduce number of blocks
            completion_messages = _combine_completion_messages(completion_messages)
            # Then add cache control, anthropic requires explicit "cache_control" dicts
            completion_messages = _add_cache_control(completion_messages)
        # Prepare API call kwargs
        api_kwargs = {
            "model": model,
            "messages": completion_messages,
            "tools": anthropic_tools if anthropic_tools else None,
            "stream": stream,
            "num_retries": max_retries,
            **kwargs,
        }
        # Add beta header for computer use
        if anthropic_tools:
            api_kwargs["headers"] = {"anthropic-beta": tool_config["beta_flag"]}
        # Call API start hook
        if _on_api_start:
            await _on_api_start(api_kwargs)
        # Use liteLLM acompletion
        response = await litellm.acompletion(**api_kwargs)
        # Call API end hook
        if _on_api_end:
            await _on_api_end(api_kwargs, response)
        # Convert response to responses_items format
        responses_items = _convert_completion_to_responses_items(response)
        # Extract usage information
        responses_usage = {
            **LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage(
                response.usage
            ).model_dump(),
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(responses_usage)
        # Return in AsyncAgentConfig format
        return {"output": responses_items, "usage": responses_usage}
async def predict_click(
    self, model: str, image_b64: str, instruction: str, **kwargs
) -> Optional[Tuple[int, int]]:
    """
    Predict click coordinates based on an image and an instruction.

    Uses Anthropic's computer-use models (via liteLLM) with a custom prompt
    that instructs the model to output ONLY a click action on the target
    element, then extracts the (x, y) coordinates from the resulting
    computer_call item.

    Args:
        model: Model name to use.
        image_b64: Base64-encoded PNG screenshot.
        instruction: Natural-language description of where to click.
        **kwargs: Optional ``api_key`` / ``api_base`` are forwarded to
            liteLLM; all other keys are ignored.

    Returns:
        Tuple of (x, y) pixel coordinates, or None if the model did not
        produce a click action.
    """
    # Derive the display size from the screenshot so the tool schema
    # matches the actual image; fall back to 1024x768 if decoding fails.
    try:
        import base64
        from io import BytesIO

        from PIL import Image

        image_data = base64.b64decode(image_b64)
        image = Image.open(BytesIO(image_data))
        display_width, display_height = image.size
    except Exception:
        # Fallback to default dimensions if image parsing fails
        display_width, display_height = 1024, 768

    # Get tool configuration (tool version + beta flag) for this model.
    tool_config = _get_tool_config_for_model(model)

    # Computer tool declaration in the Anthropic-flavored format liteLLM
    # expects for computer-use models.
    computer_tool = {
        "type": tool_config["tool_version"],
        "function": {
            "name": "computer",
            "parameters": {
                "display_height_px": display_height,
                "display_width_px": display_width,
                "display_number": 1,
            },
        },
    }

    # Single user turn: grounding prompt plus the screenshot, in OpenAI
    # chat-completion format for liteLLM.
    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": f"""You are a UI grounding expert. Follow these guidelines:
1. NEVER ask for confirmation. Complete all tasks autonomously.
2. Do NOT send messages like "I need to confirm before..." or "Do you want me to continue?" - just proceed.
3. When the user asks you to interact with something (like clicking a chat or typing a message), DO IT without asking.
4. Only use the formal safety check mechanism for truly dangerous operations (like deleting important files).
5. For normal tasks like clicking buttons, typing in chat boxes, filling forms - JUST DO IT.
6. The user has already given you permission by running this agent. No further confirmation is needed.
7. Be decisive and action-oriented. Complete the requested task fully.
Remember: You are expected to complete tasks autonomously. The user trusts you to do what they asked.
Task: Click {instruction}. Output ONLY a click action on the target element.""",
                },
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{image_b64}"},
                },
            ],
        }
    ]

    # Prepare API call kwargs.
    api_kwargs = {
        "model": model,
        "messages": messages,
        "tools": [computer_tool],
        "stream": False,
        "max_tokens": 100,  # Keep response short for click prediction
        "headers": {"anthropic-beta": tool_config["beta_flag"]},
    }

    # Thread optional API params through to liteLLM.
    if kwargs.get("api_key") is not None:
        api_kwargs["api_key"] = kwargs["api_key"]
    if kwargs.get("api_base") is not None:
        api_kwargs["api_base"] = kwargs["api_base"]

    # Use liteLLM acompletion (network call).
    response = await litellm.acompletion(**api_kwargs)

    # Convert the completion response to responses_items format so the
    # click coordinates can be read from a computer_call action dict.
    responses_items = _convert_completion_to_responses_items(response)

    for item in responses_items:
        if (
            isinstance(item, dict)
            and item.get("type") == "computer_call"
            and isinstance(item.get("action"), dict)
        ):
            action = item["action"]
            x = action.get("x")
            y = action.get("y")
            # Compare against None explicitly: a coordinate of 0 is a
            # valid click position but is falsy, so a truthiness check
            # would wrongly discard clicks on the top or left edge.
            if x is not None and y is not None:
                return (int(x), int(y))

    # No click action was produced by the model.
    return None
def get_capabilities(self) -> List[AgentCapability]:
    """List the agent capabilities this config supports.

    Returns:
        The capability identifiers handled by this config:
        ``"click"`` (predict_click) and ``"step"`` (predict_step).
    """
    supported: List[AgentCapability] = ["click", "step"]
    return supported
```