This is page 17 of 20. Use http://codebase.md/trycua/cua?page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── scripts
│ │ ├── get_pyproject_version.py
│ │ └── tests
│ │ ├── __init__.py
│ │ ├── README.md
│ │ └── test_get_pyproject_version.py
│ └── workflows
│ ├── bump-version.yml
│ ├── ci-lume.yml
│ ├── docker-publish-cua-linux.yml
│ ├── docker-publish-cua-windows.yml
│ ├── docker-publish-kasm.yml
│ ├── docker-publish-xfce.yml
│ ├── docker-reusable-publish.yml
│ ├── link-check.yml
│ ├── lint.yml
│ ├── npm-publish-cli.yml
│ ├── npm-publish-computer.yml
│ ├── npm-publish-core.yml
│ ├── publish-lume.yml
│ ├── pypi-publish-agent.yml
│ ├── pypi-publish-computer-server.yml
│ ├── pypi-publish-computer.yml
│ ├── pypi-publish-core.yml
│ ├── pypi-publish-mcp-server.yml
│ ├── pypi-publish-som.yml
│ ├── pypi-reusable-publish.yml
│ ├── python-tests.yml
│ ├── test-cua-models.yml
│ └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│ ├── docs.code-workspace
│ ├── extensions.json
│ ├── launch.json
│ ├── libs-ts.code-workspace
│ ├── lume.code-workspace
│ ├── lumier.code-workspace
│ ├── py.code-workspace
│ └── settings.json
├── blog
│ ├── app-use.md
│ ├── assets
│ │ ├── composite-agents.png
│ │ ├── docker-ubuntu-support.png
│ │ ├── hack-booth.png
│ │ ├── hack-closing-ceremony.jpg
│ │ ├── hack-cua-ollama-hud.jpeg
│ │ ├── hack-leaderboard.png
│ │ ├── hack-the-north.png
│ │ ├── hack-winners.jpeg
│ │ ├── hack-workshop.jpeg
│ │ ├── hud-agent-evals.png
│ │ └── trajectory-viewer.jpeg
│ ├── bringing-computer-use-to-the-web.md
│ ├── build-your-own-operator-on-macos-1.md
│ ├── build-your-own-operator-on-macos-2.md
│ ├── cloud-windows-ga-macos-preview.md
│ ├── composite-agents.md
│ ├── computer-use-agents-for-growth-hacking.md
│ ├── cua-hackathon.md
│ ├── cua-playground-preview.md
│ ├── cua-vlm-router.md
│ ├── hack-the-north.md
│ ├── hud-agent-evals.md
│ ├── human-in-the-loop.md
│ ├── introducing-cua-cli.md
│ ├── introducing-cua-cloud-containers.md
│ ├── lume-to-containerization.md
│ ├── neurips-2025-cua-papers.md
│ ├── sandboxed-python-execution.md
│ ├── training-computer-use-models-trajectories-1.md
│ ├── trajectory-viewer.md
│ ├── ubuntu-docker-support.md
│ └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│ ├── .env.example
│ ├── .gitignore
│ ├── content
│ │ └── docs
│ │ ├── agent-sdk
│ │ │ ├── agent-loops.mdx
│ │ │ ├── benchmarks
│ │ │ │ ├── index.mdx
│ │ │ │ ├── interactive.mdx
│ │ │ │ ├── introduction.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── osworld-verified.mdx
│ │ │ │ ├── screenspot-pro.mdx
│ │ │ │ └── screenspot-v2.mdx
│ │ │ ├── callbacks
│ │ │ │ ├── agent-lifecycle.mdx
│ │ │ │ ├── cost-saving.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── logging.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── pii-anonymization.mdx
│ │ │ │ └── trajectories.mdx
│ │ │ ├── chat-history.mdx
│ │ │ ├── custom-tools.mdx
│ │ │ ├── customizing-computeragent.mdx
│ │ │ ├── integrations
│ │ │ │ ├── hud.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── observability.mdx
│ │ │ ├── mcp-server
│ │ │ │ ├── client-integrations.mdx
│ │ │ │ ├── configuration.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── llm-integrations.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── tools.mdx
│ │ │ │ └── usage.mdx
│ │ │ ├── message-format.mdx
│ │ │ ├── meta.json
│ │ │ ├── migration-guide.mdx
│ │ │ ├── prompt-caching.mdx
│ │ │ ├── supported-agents
│ │ │ │ ├── composed-agents.mdx
│ │ │ │ ├── computer-use-agents.mdx
│ │ │ │ ├── grounding-models.mdx
│ │ │ │ ├── human-in-the-loop.mdx
│ │ │ │ └── meta.json
│ │ │ ├── supported-model-providers
│ │ │ │ ├── cua-vlm-router.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ └── local-models.mdx
│ │ │ ├── telemetry.mdx
│ │ │ └── usage-tracking.mdx
│ │ ├── cli-playbook
│ │ │ ├── commands.mdx
│ │ │ ├── index.mdx
│ │ │ └── meta.json
│ │ ├── computer-sdk
│ │ │ ├── cloud-vm-management.mdx
│ │ │ ├── commands.mdx
│ │ │ ├── computer-server
│ │ │ │ ├── Commands.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── REST-API.mdx
│ │ │ │ └── WebSocket-API.mdx
│ │ │ ├── computer-ui.mdx
│ │ │ ├── computers.mdx
│ │ │ ├── custom-computer-handlers.mdx
│ │ │ ├── meta.json
│ │ │ ├── sandboxed-python.mdx
│ │ │ └── tracing-api.mdx
│ │ ├── example-usecases
│ │ │ ├── form-filling.mdx
│ │ │ ├── gemini-complex-ui-navigation.mdx
│ │ │ ├── meta.json
│ │ │ ├── post-event-contact-export.mdx
│ │ │ └── windows-app-behind-vpn.mdx
│ │ ├── get-started
│ │ │ ├── meta.json
│ │ │ └── quickstart.mdx
│ │ ├── index.mdx
│ │ ├── macos-vm-cli-playbook
│ │ │ ├── lume
│ │ │ │ ├── cli-reference.mdx
│ │ │ │ ├── faq.md
│ │ │ │ ├── http-api.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── prebuilt-images.mdx
│ │ │ ├── lumier
│ │ │ │ ├── building-lumier.mdx
│ │ │ │ ├── docker-compose.mdx
│ │ │ │ ├── docker.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ └── meta.json
│ │ │ └── meta.json
│ │ └── meta.json
│ ├── next.config.mjs
│ ├── package-lock.json
│ ├── package.json
│ ├── pnpm-lock.yaml
│ ├── postcss.config.mjs
│ ├── public
│ │ └── img
│ │ ├── agent_gradio_ui.png
│ │ ├── agent.png
│ │ ├── bg-dark.jpg
│ │ ├── bg-light.jpg
│ │ ├── cli.png
│ │ ├── computer.png
│ │ ├── grounding-with-gemini3.gif
│ │ ├── hero.png
│ │ ├── laminar_trace_example.png
│ │ ├── som_box_threshold.png
│ │ └── som_iou_threshold.png
│ ├── README.md
│ ├── source.config.ts
│ ├── src
│ │ ├── app
│ │ │ ├── (home)
│ │ │ │ ├── [[...slug]]
│ │ │ │ │ └── page.tsx
│ │ │ │ └── layout.tsx
│ │ │ ├── api
│ │ │ │ ├── posthog
│ │ │ │ │ └── [...path]
│ │ │ │ │ └── route.ts
│ │ │ │ └── search
│ │ │ │ └── route.ts
│ │ │ ├── favicon.ico
│ │ │ ├── global.css
│ │ │ ├── layout.config.tsx
│ │ │ ├── layout.tsx
│ │ │ ├── llms.mdx
│ │ │ │ └── [[...slug]]
│ │ │ │ └── route.ts
│ │ │ ├── llms.txt
│ │ │ │ └── route.ts
│ │ │ ├── robots.ts
│ │ │ └── sitemap.ts
│ │ ├── assets
│ │ │ ├── discord-black.svg
│ │ │ ├── discord-white.svg
│ │ │ ├── logo-black.svg
│ │ │ └── logo-white.svg
│ │ ├── components
│ │ │ ├── analytics-tracker.tsx
│ │ │ ├── cookie-consent.tsx
│ │ │ ├── doc-actions-menu.tsx
│ │ │ ├── editable-code-block.tsx
│ │ │ ├── footer.tsx
│ │ │ ├── hero.tsx
│ │ │ ├── iou.tsx
│ │ │ ├── mermaid.tsx
│ │ │ └── page-feedback.tsx
│ │ ├── lib
│ │ │ ├── llms.ts
│ │ │ └── source.ts
│ │ ├── mdx-components.tsx
│ │ └── providers
│ │ └── posthog-provider.tsx
│ └── tsconfig.json
├── examples
│ ├── agent_examples.py
│ ├── agent_ui_examples.py
│ ├── browser_tool_example.py
│ ├── cloud_api_examples.py
│ ├── computer_examples_windows.py
│ ├── computer_examples.py
│ ├── computer_ui_examples.py
│ ├── computer-example-ts
│ │ ├── .env.example
│ │ ├── .gitignore
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── README.md
│ │ ├── src
│ │ │ ├── helpers.ts
│ │ │ └── index.ts
│ │ └── tsconfig.json
│ ├── docker_examples.py
│ ├── evals
│ │ ├── hud_eval_examples.py
│ │ └── wikipedia_most_linked.txt
│ ├── pylume_examples.py
│ ├── sandboxed_functions_examples.py
│ ├── som_examples.py
│ ├── tracing_examples.py
│ ├── utils.py
│ └── winsandbox_example.py
├── img
│ ├── agent_gradio_ui.png
│ ├── agent.png
│ ├── cli.png
│ ├── computer.png
│ ├── logo_black.png
│ └── logo_white.png
├── libs
│ ├── kasm
│ │ ├── Dockerfile
│ │ ├── LICENSE
│ │ ├── README.md
│ │ └── src
│ │ └── ubuntu
│ │ └── install
│ │ └── firefox
│ │ ├── custom_startup.sh
│ │ ├── firefox.desktop
│ │ └── install_firefox.sh
│ ├── lume
│ │ ├── .cursorignore
│ │ ├── CONTRIBUTING.md
│ │ ├── Development.md
│ │ ├── img
│ │ │ └── cli.png
│ │ ├── Package.resolved
│ │ ├── Package.swift
│ │ ├── README.md
│ │ ├── resources
│ │ │ └── lume.entitlements
│ │ ├── scripts
│ │ │ ├── build
│ │ │ │ ├── build-debug.sh
│ │ │ │ ├── build-release-notarized.sh
│ │ │ │ └── build-release.sh
│ │ │ └── install.sh
│ │ ├── src
│ │ │ ├── Commands
│ │ │ │ ├── Clone.swift
│ │ │ │ ├── Config.swift
│ │ │ │ ├── Create.swift
│ │ │ │ ├── Delete.swift
│ │ │ │ ├── Get.swift
│ │ │ │ ├── Images.swift
│ │ │ │ ├── IPSW.swift
│ │ │ │ ├── List.swift
│ │ │ │ ├── Logs.swift
│ │ │ │ ├── Options
│ │ │ │ │ └── FormatOption.swift
│ │ │ │ ├── Prune.swift
│ │ │ │ ├── Pull.swift
│ │ │ │ ├── Push.swift
│ │ │ │ ├── Run.swift
│ │ │ │ ├── Serve.swift
│ │ │ │ ├── Set.swift
│ │ │ │ └── Stop.swift
│ │ │ ├── ContainerRegistry
│ │ │ │ ├── ImageContainerRegistry.swift
│ │ │ │ ├── ImageList.swift
│ │ │ │ └── ImagesPrinter.swift
│ │ │ ├── Errors
│ │ │ │ └── Errors.swift
│ │ │ ├── FileSystem
│ │ │ │ ├── Home.swift
│ │ │ │ ├── Settings.swift
│ │ │ │ ├── VMConfig.swift
│ │ │ │ ├── VMDirectory.swift
│ │ │ │ └── VMLocation.swift
│ │ │ ├── LumeController.swift
│ │ │ ├── Main.swift
│ │ │ ├── Server
│ │ │ │ ├── Handlers.swift
│ │ │ │ ├── HTTP.swift
│ │ │ │ ├── Requests.swift
│ │ │ │ ├── Responses.swift
│ │ │ │ └── Server.swift
│ │ │ ├── Utils
│ │ │ │ ├── CommandRegistry.swift
│ │ │ │ ├── CommandUtils.swift
│ │ │ │ ├── Logger.swift
│ │ │ │ ├── NetworkUtils.swift
│ │ │ │ ├── Path.swift
│ │ │ │ ├── ProcessRunner.swift
│ │ │ │ ├── ProgressLogger.swift
│ │ │ │ ├── String.swift
│ │ │ │ └── Utils.swift
│ │ │ ├── Virtualization
│ │ │ │ ├── DarwinImageLoader.swift
│ │ │ │ ├── DHCPLeaseParser.swift
│ │ │ │ ├── ImageLoaderFactory.swift
│ │ │ │ └── VMVirtualizationService.swift
│ │ │ ├── VM
│ │ │ │ ├── DarwinVM.swift
│ │ │ │ ├── LinuxVM.swift
│ │ │ │ ├── VM.swift
│ │ │ │ ├── VMDetails.swift
│ │ │ │ ├── VMDetailsPrinter.swift
│ │ │ │ ├── VMDisplayResolution.swift
│ │ │ │ └── VMFactory.swift
│ │ │ └── VNC
│ │ │ ├── PassphraseGenerator.swift
│ │ │ └── VNCService.swift
│ │ └── tests
│ │ ├── Mocks
│ │ │ ├── MockVM.swift
│ │ │ ├── MockVMVirtualizationService.swift
│ │ │ └── MockVNCService.swift
│ │ ├── VM
│ │ │ └── VMDetailsPrinterTests.swift
│ │ ├── VMTests.swift
│ │ ├── VMVirtualizationServiceTests.swift
│ │ └── VNCServiceTests.swift
│ ├── lumier
│ │ ├── .dockerignore
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── bin
│ │ │ └── entry.sh
│ │ ├── config
│ │ │ └── constants.sh
│ │ ├── hooks
│ │ │ └── on-logon.sh
│ │ └── lib
│ │ ├── utils.sh
│ │ └── vm.sh
│ ├── python
│ │ ├── agent
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── agent
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── adapters
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── cua_adapter.py
│ │ │ │ │ ├── huggingfacelocal_adapter.py
│ │ │ │ │ ├── human_adapter.py
│ │ │ │ │ ├── mlxvlm_adapter.py
│ │ │ │ │ └── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ └── qwen2_5_vl.py
│ │ │ │ ├── agent.py
│ │ │ │ ├── callbacks
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── budget_manager.py
│ │ │ │ │ ├── image_retention.py
│ │ │ │ │ ├── logging.py
│ │ │ │ │ ├── operator_validator.py
│ │ │ │ │ ├── pii_anonymization.py
│ │ │ │ │ ├── prompt_instructions.py
│ │ │ │ │ ├── telemetry.py
│ │ │ │ │ └── trajectory_saver.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── computers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cua.py
│ │ │ │ │ └── custom.py
│ │ │ │ ├── decorators.py
│ │ │ │ ├── human_tool
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ ├── server.py
│ │ │ │ │ └── ui.py
│ │ │ │ ├── integrations
│ │ │ │ │ └── hud
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── agent.py
│ │ │ │ │ └── proxy.py
│ │ │ │ ├── loops
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── anthropic.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── composed_grounded.py
│ │ │ │ │ ├── gelato.py
│ │ │ │ │ ├── gemini.py
│ │ │ │ │ ├── generic_vlm.py
│ │ │ │ │ ├── glm45v.py
│ │ │ │ │ ├── gta1.py
│ │ │ │ │ ├── holo.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── model_types.csv
│ │ │ │ │ ├── moondream3.py
│ │ │ │ │ ├── omniparser.py
│ │ │ │ │ ├── openai.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ ├── uiins.py
│ │ │ │ │ ├── uitars.py
│ │ │ │ │ └── uitars2.py
│ │ │ │ ├── proxy
│ │ │ │ │ ├── examples.py
│ │ │ │ │ └── handlers.py
│ │ │ │ ├── responses.py
│ │ │ │ ├── tools
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── browser_tool.py
│ │ │ │ ├── types.py
│ │ │ │ └── ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ └── gradio
│ │ │ │ ├── __init__.py
│ │ │ │ ├── app.py
│ │ │ │ └── ui_components.py
│ │ │ ├── benchmarks
│ │ │ │ ├── .gitignore
│ │ │ │ ├── contrib.md
│ │ │ │ ├── interactive.py
│ │ │ │ ├── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ └── gta1.py
│ │ │ │ ├── README.md
│ │ │ │ ├── ss-pro.py
│ │ │ │ ├── ss-v2.py
│ │ │ │ └── utils.py
│ │ │ ├── example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer_agent.py
│ │ ├── bench-ui
│ │ │ ├── bench_ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api.py
│ │ │ │ └── child.py
│ │ │ ├── examples
│ │ │ │ ├── folder_example.py
│ │ │ │ ├── gui
│ │ │ │ │ ├── index.html
│ │ │ │ │ ├── logo.svg
│ │ │ │ │ └── styles.css
│ │ │ │ ├── output_overlay.png
│ │ │ │ └── simple_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ └── test_port_detection.py
│ │ ├── computer
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer
│ │ │ │ ├── __init__.py
│ │ │ │ ├── computer.py
│ │ │ │ ├── diorama_computer.py
│ │ │ │ ├── helpers.py
│ │ │ │ ├── interface
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ ├── models.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── logger.py
│ │ │ │ ├── models.py
│ │ │ │ ├── providers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cloud
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── docker
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── lume
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── lume_api.py
│ │ │ │ │ ├── lumier
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── types.py
│ │ │ │ │ └── winsandbox
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── provider.py
│ │ │ │ │ └── setup_script.ps1
│ │ │ │ ├── tracing_wrapper.py
│ │ │ │ ├── tracing.py
│ │ │ │ ├── ui
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ └── gradio
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── app.py
│ │ │ │ └── utils.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer.py
│ │ ├── computer-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── browser.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── diorama
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── diorama_computer.py
│ │ │ │ │ ├── diorama.py
│ │ │ │ │ ├── draw.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── safezone.py
│ │ │ │ ├── handlers
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── main.py
│ │ │ │ ├── server.py
│ │ │ │ ├── utils
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── wallpaper.py
│ │ │ │ └── watchdog.py
│ │ │ ├── examples
│ │ │ │ ├── __init__.py
│ │ │ │ └── usage_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ ├── run_server.py
│ │ │ ├── test_connection.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_server.py
│ │ ├── core
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── core
│ │ │ │ ├── __init__.py
│ │ │ │ └── telemetry
│ │ │ │ ├── __init__.py
│ │ │ │ └── posthog.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_telemetry.py
│ │ ├── mcp-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── build-extension.py
│ │ │ ├── CONCURRENT_SESSIONS.md
│ │ │ ├── desktop-extension
│ │ │ │ ├── cua-extension.mcpb
│ │ │ │ ├── desktop_extension.png
│ │ │ │ ├── manifest.json
│ │ │ │ ├── README.md
│ │ │ │ ├── requirements.txt
│ │ │ │ ├── run_server.sh
│ │ │ │ └── setup.py
│ │ │ ├── mcp_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── server.py
│ │ │ │ └── session_manager.py
│ │ │ ├── pdm.lock
│ │ │ ├── pyproject.toml
│ │ │ ├── QUICK_TEST_COMMANDS.sh
│ │ │ ├── quick_test_local_option.py
│ │ │ ├── README.md
│ │ │ ├── scripts
│ │ │ │ ├── install_mcp_server.sh
│ │ │ │ └── start_mcp_server.sh
│ │ │ ├── test_mcp_server_local_option.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_mcp_server.py
│ │ ├── pylume
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_pylume.py
│ │ └── som
│ │ ├── .bumpversion.cfg
│ │ ├── LICENSE
│ │ ├── poetry.toml
│ │ ├── pyproject.toml
│ │ ├── README.md
│ │ ├── som
│ │ │ ├── __init__.py
│ │ │ ├── detect.py
│ │ │ ├── detection.py
│ │ │ ├── models.py
│ │ │ ├── ocr.py
│ │ │ ├── util
│ │ │ │ └── utils.py
│ │ │ └── visualization.py
│ │ └── tests
│ │ ├── conftest.py
│ │ └── test_omniparser.py
│ ├── qemu-docker
│ │ ├── linux
│ │ │ ├── Dockerfile
│ │ │ ├── README.md
│ │ │ └── src
│ │ │ ├── entry.sh
│ │ │ └── vm
│ │ │ ├── image
│ │ │ │ └── README.md
│ │ │ └── setup
│ │ │ ├── install.sh
│ │ │ ├── setup-cua-server.sh
│ │ │ └── setup.sh
│ │ ├── README.md
│ │ └── windows
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── entry.sh
│ │ └── vm
│ │ ├── image
│ │ │ └── README.md
│ │ └── setup
│ │ ├── install.bat
│ │ ├── on-logon.ps1
│ │ ├── setup-cua-server.ps1
│ │ ├── setup-utils.psm1
│ │ └── setup.ps1
│ ├── typescript
│ │ ├── .gitignore
│ │ ├── .nvmrc
│ │ ├── agent
│ │ │ ├── examples
│ │ │ │ ├── playground-example.html
│ │ │ │ └── README.md
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── client.ts
│ │ │ │ ├── index.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ └── client.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── computer
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── computer
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── providers
│ │ │ │ │ │ ├── base.ts
│ │ │ │ │ │ ├── cloud.ts
│ │ │ │ │ │ └── index.ts
│ │ │ │ │ └── types.ts
│ │ │ │ ├── index.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── base.ts
│ │ │ │ │ ├── factory.ts
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── linux.ts
│ │ │ │ │ ├── macos.ts
│ │ │ │ │ └── windows.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ ├── computer
│ │ │ │ │ └── cloud.test.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── factory.test.ts
│ │ │ │ │ ├── index.test.ts
│ │ │ │ │ ├── linux.test.ts
│ │ │ │ │ ├── macos.test.ts
│ │ │ │ │ └── windows.test.ts
│ │ │ │ └── setup.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── core
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── index.ts
│ │ │ │ └── telemetry
│ │ │ │ ├── clients
│ │ │ │ │ ├── index.ts
│ │ │ │ │ └── posthog.ts
│ │ │ │ └── index.ts
│ │ │ ├── tests
│ │ │ │ └── telemetry.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── cua-cli
│ │ │ ├── .gitignore
│ │ │ ├── .prettierrc
│ │ │ ├── bun.lock
│ │ │ ├── CLAUDE.md
│ │ │ ├── index.ts
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── auth.ts
│ │ │ │ ├── cli.ts
│ │ │ │ ├── commands
│ │ │ │ │ ├── auth.ts
│ │ │ │ │ └── sandbox.ts
│ │ │ │ ├── config.ts
│ │ │ │ ├── http.ts
│ │ │ │ ├── storage.ts
│ │ │ │ └── util.ts
│ │ │ └── tsconfig.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── pnpm-workspace.yaml
│ │ └── README.md
│ └── xfce
│ ├── .dockerignore
│ ├── .gitignore
│ ├── Development.md
│ ├── Dockerfile
│ ├── Dockerfile.dev
│ ├── README.md
│ └── src
│ ├── scripts
│ │ ├── resize-display.sh
│ │ ├── start-computer-server.sh
│ │ ├── start-novnc.sh
│ │ ├── start-vnc.sh
│ │ └── xstartup.sh
│ ├── supervisor
│ │ └── supervisord.conf
│ └── xfce-config
│ ├── helpers.rc
│ ├── xfce4-power-manager.xml
│ └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│ ├── agent_nb.ipynb
│ ├── blog
│ │ ├── build-your-own-operator-on-macos-1.ipynb
│ │ └── build-your-own-operator-on-macos-2.ipynb
│ ├── composite_agents_docker_nb.ipynb
│ ├── computer_nb.ipynb
│ ├── computer_server_nb.ipynb
│ ├── customizing_computeragent.ipynb
│ ├── eval_osworld.ipynb
│ ├── ollama_nb.ipynb
│ ├── README.md
│ ├── sota_hackathon_cloud.ipynb
│ └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── install-cli.ps1
│ ├── install-cli.sh
│ ├── playground-docker.sh
│ ├── playground.sh
│ ├── run-docker-dev.sh
│ └── typescript-typecheck.js
├── TESTING.md
├── tests
│ ├── agent_loop_testing
│ │ ├── agent_test.py
│ │ └── README.md
│ ├── pytest.ini
│ ├── shell_cmd.py
│ ├── test_files.py
│ ├── test_mcp_server_session_management.py
│ ├── test_mcp_server_streaming.py
│ ├── test_shell_bash.py
│ ├── test_telemetry.py
│ ├── test_tracing.py
│ ├── test_venv.py
│ └── test_watchdog.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/glm45v.py:
--------------------------------------------------------------------------------
```python
"""
GLM-4.5V agent loop implementation using liteLLM for GLM-4.5V model.
Supports vision-language models for computer control with bounding box parsing.
"""
import asyncio
import base64
import json
import re
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple
import litellm
from litellm.responses.litellm_completion_transformation.transformation import (
LiteLLMCompletionResponsesConfig,
)
from litellm.types.utils import ModelResponse
from PIL import Image
from ..decorators import register_agent
from ..loops.base import AsyncAgentConfig
from ..responses import (
convert_completion_messages_to_responses_items,
convert_responses_items_to_completion_messages,
make_click_item,
make_double_click_item,
make_drag_item,
make_input_image_item,
make_keypress_item,
make_output_text_item,
make_reasoning_item,
make_scroll_item,
make_type_item,
make_wait_item,
)
from ..types import AgentCapability, AgentResponse, Messages, Tools
# GLM-4.5V specific constants.
#
# GLM_ACTION_SPACE is the literal action-space specification that is
# interpolated into the GLM-4.5V system prompt. Each "### name" section
# documents one callable action in a pseudo-JSON-schema form; coordinates
# are expressed in the model's normalized 0-999 range.
# NOTE(review): this text is consumed verbatim by the model at runtime —
# do not reformat, translate, or "fix" its contents (including the odd
# nested quoting inside some descriptions) without re-validating model
# output parsing.
GLM_ACTION_SPACE = """
### {left,right,middle}_click
Call rule: `{left,right,middle}_click(start_box='[x,y]', element_info='')`
{
'name': ['left_click', 'right_click', 'middle_click'],
'description': 'Perform a left/right/middle mouse click at the specified coordinates on the screen.',
'parameters': {
'type': 'object',
'properties': {
'start_box': {
'type': 'array',
'items': {
'type': 'integer'
},
'description': 'Coordinates [x,y] where to perform the click, normalized to 0-999 range.'
},
'element_info': {
'type': 'string',
'description': 'Optional text description of the UI element being clicked.'
}
},
'required': ['start_box']
}
}
### hover
Call rule: `hover(start_box='[x,y]', element_info='')`
{
'name': 'hover',
'description': 'Move the mouse pointer to the specified coordinates without performing any click action.',
'parameters': {
'type': 'object',
'properties': {
'start_box': {
'type': 'array',
'items': {
'type': 'integer'
},
'description': 'Coordinates [x,y] where to move the mouse pointer, normalized to 0-999 range.'
},
'element_info': {
'type': 'string',
'description': 'Optional text description of the UI element being hovered over.'
}
},
'required': ['start_box']
}
}
### left_double_click
Call rule: `left_double_click(start_box='[x,y]', element_info='')`
{
'name': 'left_double_click',
'description': 'Perform a left mouse double-click at the specified coordinates on the screen.',
'parameters': {
'type': 'object',
'properties': {
'start_box': {
'type': 'array',
'items': {
'type': 'integer'
},
'description': 'Coordinates [x,y] where to perform the double-click, normalized to 0-999 range.'
},
'element_info': {
'type': 'string',
'description': 'Optional text description of the UI element being double-clicked.'
}
},
'required': ['start_box']
}
}
### left_drag
Call rule: `left_drag(start_box='[x1,y1]', end_box='[x2,y2]', element_info='')`
{
'name': 'left_drag',
'description': 'Drag the mouse from starting coordinates to ending coordinates while holding the left mouse button.',
'parameters': {
'type': 'object',
'properties': {
'start_box': {
'type': 'array',
'items': {
'type': 'integer'
},
'description': 'Starting coordinates [x1,y1] for the drag operation, normalized to 0-999 range.'
},
'end_box': {
'type': 'array',
'items': {
'type': 'integer'
},
'description': 'Ending coordinates [x2,y2] for the drag operation, normalized to 0-999 range.'
},
'element_info': {
'type': 'string',
'description': 'Optional text description of the UI element being dragged.'
}
},
'required': ['start_box', 'end_box']
}
}
### key
Call rule: `key(keys='')`
{
'name': 'key',
'description': 'Simulate pressing a single key or combination of keys on the keyboard.',
'parameters': {
'type': 'object',
'properties': {
'keys': {
'type': 'string',
'description': 'The key or key combination to press. Use '+' to separate keys in combinations (e.g., 'ctrl+c', 'alt+tab').'
}
},
'required': ['keys']
}
}
### type
Call rule: `type(content='')`
{
'name': 'type',
'description': 'Type text content into the currently focused text input field. This action only performs typing and does not handle field activation or clearing.',
'parameters': {
'type': 'object',
'properties': {
'content': {
'type': 'string',
'description': 'The text content to be typed into the active text field.'
}
},
'required': ['content']
}
}
### scroll
Call rule: `scroll(start_box='[x,y]', direction='', step=5, element_info='')`
{
'name': 'scroll',
'description': 'Scroll an element at the specified coordinates in the specified direction by a given number of wheel steps.',
'parameters': {
'type': 'object',
'properties': {
'start_box': {
'type': 'array',
'items': {
'type': 'integer'
},
'description': 'Coordinates [x,y] of the element or area to scroll, normalized to 0-999 range.'
},
'direction': {
'type': 'string',
'enum': ['down', 'up'],
'description': 'The direction to scroll: 'down' or 'up'.'
},
'step': {
'type': 'integer',
'default': 5,
'description': 'Number of wheel steps to scroll, default is 5.'
},
'element_info': {
'type': 'string',
'description': 'Optional text description of the UI element being scrolled.'
}
},
'required': ['start_box', 'direction']
}
}
### WAIT
Call rule: `WAIT()`
{
'name': 'WAIT',
'description': 'Wait for 5 seconds before proceeding to the next action.',
'parameters': {
'type': 'object',
'properties': {},
'required': []
}
}
### DONE
Call rule: `DONE()`
{
'name': 'DONE',
'description': 'Indicate that the current task has been completed successfully and no further actions are needed.',
'parameters': {
'type': 'object',
'properties': {},
'required': []
}
}
### FAIL
Call rule: `FAIL()`
{
'name': 'FAIL',
'description': 'Indicate that the current task cannot be completed or is impossible to accomplish.',
'parameters': {
'type': 'object',
'properties': {},
'required': []
}
}"""
def encode_image_to_base64(image_path: str) -> str:
    """Load an image file from disk and wrap it in a base64 PNG data URI.

    Args:
        image_path: Filesystem path of the image to read.

    Returns:
        A ``data:image/png;base64,<payload>`` string. Note the MIME type
        is always reported as PNG regardless of the file's actual format.
    """
    with open(image_path, "rb") as handle:
        raw = handle.read()
    payload = base64.b64encode(raw).decode("utf-8")
    return "data:image/png;base64," + payload
def parse_glm_response(response: str) -> Dict[str, Any]:
    """Split a raw GLM-4.5V completion into its action, narration, and memory.

    GLM-4.5V wraps the chosen action in <|begin_of_box|>/<|end_of_box|>
    special tokens and appends a "Memory:" section; coordinates inside the
    action are normalized values in the 0-999 range.

    Args:
        response: Raw model output text.

    Returns:
        Dict with keys:
          - "action": the extracted action call string, or None if none found.
          - "action_text": the explanation preceding "Memory:", with the
            special box tokens stripped out.
          - "memory": the text following "Memory:", or "[]" when absent.
    """
    # Preferred path: the action is delimited by the special box tokens.
    boxed = re.search(r"<\|begin_of_box\|>(.*?)<\|end_of_box\|>", response)
    if boxed:
        action = boxed.group(1).strip()
    else:
        # Fallback: take the first thing that looks like a function call.
        calls = re.findall(r"[\w_]+\([^)]*\)", response)
        action = calls[0] if calls else None

    # Everything after "Memory:" is the serialized memory list.
    memory_match = re.search(r"Memory:(.*?)$", response, re.DOTALL)
    memory = memory_match.group(1).strip() if memory_match else "[]"

    # Everything before "Memory:" is the human-readable narration.
    narration_match = re.search(r"^(.*?)Memory:", response, re.DOTALL)
    action_text = narration_match.group(1).strip() if narration_match else response

    # Drop the special tokens so the narration reads cleanly.
    if action_text:
        for token in ("<|begin_of_box|>", "<|end_of_box|>"):
            action_text = action_text.replace(token, "")

    return {"action": action, "action_text": action_text, "memory": memory}
def get_last_image_from_messages(messages: Messages) -> Optional[str]:
    """Return the base64 payload of the most recent screenshot in *messages*.

    Scans the message list from newest to oldest, looking at two shapes:
      - "computer_call_output" items whose output is an "input_image", and
      - user messages whose content list holds "image_url" parts.
    Only ``data:image/...`` URIs are considered; the part after the first
    comma (the raw base64 payload, without the data-URI prefix) is returned.

    Args:
        messages: Conversation items to search.

    Returns:
        The base64 string of the last image found, or None if there is none.
    """

    def _payload_of(url: Any) -> Optional[str]:
        # Accept only string data URIs; yield the base64 portion after the comma.
        if isinstance(url, str) and url.startswith("data:image/"):
            return url.split(",", 1)[1]
        return None

    for msg in reversed(messages):
        if not isinstance(msg, dict):
            continue
        if msg.get("type") == "computer_call_output":
            output = msg.get("output", {})
            if isinstance(output, dict) and output.get("type") == "input_image":
                found = _payload_of(output.get("image_url", ""))
                if found is not None:
                    return found
        elif msg.get("role") == "user":
            content = msg.get("content", [])
            if isinstance(content, list):
                # Walk the content parts newest-first as well.
                for part in reversed(content):
                    if isinstance(part, dict) and part.get("type") == "image_url":
                        url_obj = part.get("image_url", {})
                        if isinstance(url_obj, dict):
                            found = _payload_of(url_obj.get("url", ""))
                            if found is not None:
                                return found
    return None
def convert_responses_items_to_glm45v_pc_prompt(
    messages: Messages, task: str, memory: str = ""
) -> List[Dict[str, Any]]:
    """Convert responses items to GLM-4.5V PC prompt format with historical actions.

    Args:
        messages: List of message items from the conversation
        task: The task description
        memory: Current memory state

    Returns:
        List of content items for the prompt (text and image_url items)
    """
    action_space = GLM_ACTION_SPACE
    # Template head
    head_text = f"""You are a GUI Agent, and your primary task is to respond accurately to user requests or questions. In addition to directly answering the user's queries, you can also use tools or perform GUI operations directly until you fulfill the user's request or provide a correct answer. You should carefully read and understand the images and questions provided by the user, and engage in thinking and reflection when appropriate. The coordinates involved are all represented in thousandths (0-999).
# Task:
{task}
# Task Platform
Ubuntu
# Action Space
{action_space}
# Historical Actions and Current Memory
History:"""
    # Template tail
    tail_text = f"""
Memory:
{memory}
# Output Format
Plain text explanation with action(param='...')
Memory:
[{{"key": "value"}}, ...]
# Some Additional Notes
- I'll give you the most recent 4 history screenshots(shrunked to 50%*50%) along with the historical action steps.
- You should put the key information you *have to remember* in a seperated memory part and I'll give it to you in the next round. The content in this part should be a dict list. If you no longer need some given information, you should remove it from the memory. Even if you don't need to remember anything, you should also output an empty list.
- My computer's password is "password", feel free to use it when you need sudo rights.
- For the thunderbird account "[email protected]", the password is "gTCI";=@y7|QJ0nDa_kN3Sb&>".
Current Screenshot:
"""
    # Build history from messages: group (reasoning, assistant message,
    # computer_call, computer_call_output) runs into steps. A
    # computer_call_output closes the current step.
    history = []
    history_images = []
    current_step = []
    step_num = 0
    for message in messages:
        msg_type = message.get("type")
        if msg_type == "reasoning":
            current_step.append(message)
        elif msg_type == "message" and message.get("role") == "assistant":
            current_step.append(message)
        elif msg_type == "computer_call":
            current_step.append(message)
        elif msg_type == "computer_call_output":
            current_step.append(message)
            # End of step - process it
            if current_step:
                step_num += 1
                # Extract bot thought from the assistant message content
                bot_thought = ""
                for item in current_step:
                    if item.get("type") == "message" and item.get("role") == "assistant":
                        content = item.get("content", [])
                        for content_item in content:
                            if content_item.get("type") == "output_text":
                                bot_thought = content_item.get("text", "")
                                break
                        break
                # Render the computer_call action in GLM action-space syntax.
                # Coordinates are passed through as-is here; the model prompt
                # declares thousandths (0-999) — TODO confirm callers normalize.
                action_text = ""
                for item in current_step:
                    if item.get("type") == "computer_call":
                        action = item.get("action", {})
                        action_type = action.get("type", "")
                        if action_type == "click":
                            x, y = action.get("x", 0), action.get("y", 0)
                            action_text = f"left_click(start_box='[{x},{y}]')"
                        elif action_type == "double_click":
                            x, y = action.get("x", 0), action.get("y", 0)
                            action_text = f"left_double_click(start_box='[{x},{y}]')"
                        elif action_type == "right_click":
                            x, y = action.get("x", 0), action.get("y", 0)
                            action_text = f"right_click(start_box='[{x},{y}]')"
                        elif action_type == "drag":
                            # Drag is encoded by its first and last path points.
                            path = action.get("path", [])
                            if len(path) >= 2:
                                start = path[0]
                                end = path[-1]
                                action_text = f"left_drag(start_box='[{start.get('x', 0)},{start.get('y', 0)}]', end_box='[{end.get('x', 0)},{end.get('y', 0)}]')"
                        elif action_type == "keypress":
                            key = action.get("key", "")
                            action_text = f"key(keys='{key}')"
                        elif action_type == "type":
                            text = action.get("text", "")
                            action_text = f"type(content='{text}')"
                        elif action_type == "scroll":
                            x, y = action.get("x", 0), action.get("y", 0)
                            direction = action.get("direction", "down")
                            action_text = f"scroll(start_box='[{x},{y}]', direction='{direction}')"
                        elif action_type == "wait":
                            action_text = "WAIT()"
                        break
                # Extract screenshot from computer_call_output
                screenshot_url = None
                for item in current_step:
                    if item.get("type") == "computer_call_output":
                        output = item.get("output", {})
                        if output.get("type") == "input_image":
                            screenshot_url = output.get("image_url", "")
                        break
                # Store step info (screenshot kept per-step so the rendering
                # loop below can attach the right image to the right step)
                step_info = {
                    "step_num": step_num,
                    "bot_thought": bot_thought,
                    "action_text": action_text,
                    "screenshot_url": screenshot_url,
                }
                history.append(step_info)
                if screenshot_url:
                    history_images.append(screenshot_url)
                current_step = []
    # Build content array with head, history, and tail
    content = []
    current_text = head_text
    total_history_steps = len(history)
    history_image_count = min(4, len(history_images))  # Show at most the last 4 images
    for step_idx, step_info in enumerate(history):
        step_num = step_info["step_num"]
        bot_thought = step_info["bot_thought"]
        action_text = step_info["action_text"]
        if step_idx < total_history_steps - history_image_count:
            # For steps beyond the last 4, use text placeholder
            current_text += f"\nstep {step_num}: Screenshot:(Omitted in context.) Thought: {bot_thought}\nAction: {action_text}"
        else:
            # For the last 4 steps, insert the step's own screenshot inline.
            current_text += f"\nstep {step_num}: Screenshot:"
            content.append({"type": "text", "text": current_text})
            # Bug fix: previously this indexed history_images[step_idx - (total -
            # count)], which selects the *oldest* screenshots (indices 0..3)
            # whenever there are more than 4 steps. Use the step's own
            # screenshot so the most recent steps show their own images.
            if step_info["screenshot_url"]:
                content.append(
                    {"type": "image_url", "image_url": {"url": step_info["screenshot_url"]}}
                )
            current_text = f" Thought: {bot_thought}\nAction: {action_text}"
    # Add tail
    current_text += tail_text
    content.append({"type": "text", "text": current_text})
    return content
def model_dump(obj) -> Dict[str, Any]:
    """Recursively convert an object into plain data.

    Dicts are walked value by value, pydantic-style objects are serialized
    via their ``model_dump`` method, and anything else is returned as-is.
    """
    if isinstance(obj, dict):
        return {key: model_dump(value) for key, value in obj.items()}
    if hasattr(obj, "model_dump"):
        return obj.model_dump()
    return obj
def convert_glm_completion_to_responses_items(
    response: ModelResponse, image_width: int, image_height: int
) -> List[Dict[str, Any]]:
    """
    Convert GLM-4.5V completion response to responses items format.

    Args:
        response: LiteLLM ModelResponse from GLM-4.5V
        image_width: Original image width for coordinate scaling
        image_height: Original image height for coordinate scaling

    Returns:
        List of response items in the proper format
    """
    import uuid

    response_items: List[Dict[str, Any]] = []
    if not response.choices or not response.choices[0].message:
        return response_items
    message = response.choices[0].message
    content = message.content or ""
    reasoning_content = getattr(message, "reasoning_content", None)
    # Add reasoning item if present
    if reasoning_content:
        response_items.append(model_dump(make_reasoning_item(reasoning_content)))
    # Parse the content to extract action and text
    parsed_response = parse_glm_response(content)
    action = parsed_response.get("action", "")
    action_text = parsed_response.get("action_text", "")
    # Add message item with text content (excluding action and memory)
    if action_text:
        clean_text = action_text
        if action and action in clean_text:
            clean_text = clean_text.replace(action, "").strip()
        # Strip the trailing "Memory: [...]" section the model appends
        clean_text = re.sub(r"Memory:\s*\[.*?\]\s*$", "", clean_text, flags=re.DOTALL).strip()
        if clean_text:
            response_items.append(model_dump(make_output_text_item(clean_text)))
    if not action:
        return response_items

    # Shared metadata for whatever computer call this action produces.
    call_id = f"call_{uuid.uuid4().hex[:8]}"

    def to_pixels(x: int, y: int) -> Tuple[int, int]:
        # GLM emits coordinates normalized to the 0-999 range; scale to pixels.
        return int((x / 999.0) * image_width), int((y / 999.0) * image_height)

    def emit(item: Any) -> None:
        # Attach the shared call metadata and record the computer call.
        computer_call = model_dump(item)
        computer_call["call_id"] = call_id
        computer_call["status"] = "completed"
        response_items.append(computer_call)

    # All point-taking actions share the same start_box coordinate syntax.
    start_match = re.search(r"start_box='?\[(\d+),\s*(\d+)\]'?", action)

    if action.startswith("left_click"):
        if start_match:
            x, y = to_pixels(int(start_match.group(1)), int(start_match.group(2)))
            emit(make_click_item(x, y))
    elif action.startswith("right_click"):
        if start_match:
            x, y = to_pixels(int(start_match.group(1)), int(start_match.group(2)))
            emit(make_click_item(x, y, button="right"))
    elif action.startswith("left_double_click"):
        if start_match:
            x, y = to_pixels(int(start_match.group(1)), int(start_match.group(2)))
            emit(make_double_click_item(x, y))
    elif action.startswith("left_drag"):
        end_match = re.search(r"end_box='?\[(\d+),\s*(\d+)\]'?", action)
        if start_match and end_match:
            x1, y1 = to_pixels(int(start_match.group(1)), int(start_match.group(2)))
            x2, y2 = to_pixels(int(end_match.group(1)), int(end_match.group(2)))
            emit(make_drag_item([{"x": x1, "y": y1}, {"x": x2, "y": y2}]))
    elif action.startswith("key"):
        key_match = re.search(r"keys='([^']+)'", action)
        if key_match:
            keys = key_match.group(1)
            # Split '+'-joined combinations into a key list.
            emit(make_keypress_item(keys.split("+") if "+" in keys else [keys]))
    elif action.startswith("type"):
        content_match = re.search(r"content='([^']*)'", action)
        if content_match:
            emit(make_type_item(content_match.group(1)))
    elif action.startswith("scroll"):
        direction_match = re.search(r"direction='([^']+)'", action)
        if start_match and direction_match:
            x, y = to_pixels(int(start_match.group(1)), int(start_match.group(2)))
            # Map direction names to fixed scroll deltas (5 units per call).
            deltas = {"up": (0, -5), "down": (0, 5), "left": (-5, 0), "right": (5, 0)}
            scroll_x, scroll_y = deltas.get(direction_match.group(1), (0, 0))
            emit(make_scroll_item(x, y, scroll_x, scroll_y))
    elif action == "WAIT()":
        emit(make_wait_item())
    return response_items
@register_agent(models=r"(?i).*GLM-4\.5V.*")
class Glm4vConfig(AsyncAgentConfig):
    """GLM-4.5V agent configuration using liteLLM."""

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Predict the next step using GLM-4.5V model.

        Args:
            messages: Input messages following Responses format
            model: Model name to use
            tools: Optional list of tool schemas (currently unused by this agent)
            max_retries: Maximum number of retries for API calls (currently unused)
            stream: Whether to stream the response (currently unused)
            computer_handler: Computer handler for taking screenshots
            use_prompt_caching: Whether to use prompt caching (currently unused)
            _on_api_start: Callback for API start
            _on_api_end: Callback for API end
            _on_usage: Callback for usage tracking
            _on_screenshot: Callback for screenshot events

        Returns:
            Dict with "output" and "usage" keys
        """
        # Get the user instruction from the last user message.
        # For list content, the first text item of that message wins.
        user_instruction = ""
        for message in reversed(messages):
            if isinstance(message, dict) and message.get("role") == "user":
                content = message.get("content", "")
                if isinstance(content, str):
                    user_instruction = content
                elif isinstance(content, list):
                    for item in content:
                        if isinstance(item, dict) and item.get("type") == "text":
                            user_instruction = item.get("text", "")
                            break
                break
        # Get the last image for processing; fall back to a fresh screenshot.
        last_image_b64 = get_last_image_from_messages(messages)
        if not last_image_b64 and computer_handler:
            # Take a screenshot if no image available
            screenshot_b64 = await computer_handler.screenshot()
            if screenshot_b64:
                last_image_b64 = screenshot_b64
                if _on_screenshot:
                    await _on_screenshot(screenshot_b64)
        if not last_image_b64:
            raise ValueError("No image available for GLM-4.5V processing")
        # Convert responses items to GLM-4.5V PC prompt format with historical actions
        prompt_content = convert_responses_items_to_glm45v_pc_prompt(
            messages=messages,
            task=user_instruction,
            memory="[]",  # Initialize with empty memory for now
        )
        # Add the current screenshot to the end
        prompt_content.append(
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{last_image_b64}"}}
        )
        # Prepare messages for liteLLM
        litellm_messages = [
            {"role": "system", "content": "You are a helpful GUI agent assistant."},
            {"role": "user", "content": prompt_content},
        ]
        # Prepare API call kwargs; caller kwargs may override any of these.
        api_kwargs = {
            "model": model,
            "messages": litellm_messages,
            # "max_tokens": 2048,
            # "temperature": 0.001,
            # "extra_body": {
            # "skip_special_tokens": False,
            # }
        }
        api_kwargs.update({k: v for k, v in (kwargs or {}).items()})
        # Add API callbacks
        if _on_api_start:
            await _on_api_start(api_kwargs)
        # Call liteLLM
        response = await litellm.acompletion(**api_kwargs)
        if _on_api_end:
            await _on_api_end(api_kwargs, response)
        # Get image dimensions for coordinate scaling
        image_width, image_height = 1920, 1080  # Default dimensions
        # Try to get actual dimensions from the image; keep defaults on failure.
        try:
            image_data = base64.b64decode(last_image_b64)
            image = Image.open(BytesIO(image_data))
            image_width, image_height = image.size
        except Exception:
            pass  # Use default dimensions
        # Convert GLM completion response to responses items
        response_items = convert_glm_completion_to_responses_items(
            response, image_width, image_height
        )
        # Extract usage information (token counts plus liteLLM's cost estimate)
        response_usage = {
            **LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage(
                response.usage
            ).model_dump(),
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(response_usage)
        # Create agent response
        agent_response = {"output": response_items, "usage": response_usage}
        return agent_response

    async def predict_click(
        self, model: str, image_b64: str, instruction: str, **kwargs
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates using GLM-4.5V model.

        Args:
            model: Model name to use
            image_b64: Base64 encoded image
            instruction: Instruction for where to click

        Returns:
            Tuple with (x, y) pixel coordinates or None
        """
        try:
            # Create a simple click instruction prompt
            click_prompt = f"""You are a GUI agent. Look at the screenshot and identify where to click for: {instruction}
Respond with a single click action in this format:
left_click(start_box='[x,y]')
Where x,y are coordinates normalized to 0-999 range."""
            # Prepare messages for liteLLM
            litellm_messages = [
                {"role": "system", "content": "You are a helpful GUI agent assistant."},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": click_prompt},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{image_b64}"},
                        },
                    ],
                },
            ]
            # Prepare API call kwargs; caller kwargs may override any of these.
            api_kwargs = {
                "model": model,
                "messages": litellm_messages,
                "max_tokens": 2056,
                "temperature": 0.001,
                "extra_body": {
                    "skip_special_tokens": False,
                },
            }
            api_kwargs.update({k: v for k, v in (kwargs or {}).items()})
            # Call liteLLM
            response = await litellm.acompletion(**api_kwargs)
            # Extract response content
            response_content = response.choices[0].message.content.strip()
            # NOTE(review): debug print of the raw response left in; consider
            # removing or routing through a logger.
            print(response)
            # Parse response for click coordinates.
            # First try the special-token-wrapped form, then a bare fallback.
            coord_pattern = r"<\|begin_of_box\|>.*?left_click\(start_box='?\[(\d+),(\d+)\]'?\).*?<\|end_of_box\|>"
            match = re.search(coord_pattern, response_content)
            if not match:
                # Fallback: look for coordinates without special tokens
                coord_pattern = r"left_click\(start_box='?\[(\d+),(\d+)\]'?\)"
                match = re.search(coord_pattern, response_content)
            if match:
                x, y = int(match.group(1)), int(match.group(2))
                # Get actual image dimensions for scaling
                try:
                    image_data = base64.b64decode(image_b64)
                    image = Image.open(BytesIO(image_data))
                    image_width, image_height = image.size
                except Exception:
                    # Use default dimensions
                    image_width, image_height = 1920, 1080
                # Convert from 0-999 normalized coordinates to actual pixel coordinates
                actual_x = int((x / 999.0) * image_width)
                actual_y = int((y / 999.0) * image_height)
                return (actual_x, actual_y)
            return None
        except Exception as e:
            # Log error and return None
            print(f"Error in predict_click: {e}")
            return None

    def get_capabilities(self) -> List[AgentCapability]:
        """
        Get list of capabilities supported by this agent config.

        Returns:
            List of capability strings
        """
        return ["step", "click"]
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/uitars2.py:
--------------------------------------------------------------------------------
```python
"""
UITARS-2 agent loop implementation using LiteLLM.
- Prepends a system prompt modeled after the training prompts in examples/seed_16_gui.ipynb
- Converts Responses items -> completion messages
- Calls litellm.acompletion
- Parses <seed:tool_call> ... </seed:tool_call> outputs back into Responses items (computer actions)
"""
from __future__ import annotations
import base64
import io
import json
import re
from typing import Any, Dict, List, Optional, Tuple
import litellm
from litellm.responses.litellm_completion_transformation.transformation import (
LiteLLMCompletionResponsesConfig,
)
from ..decorators import register_agent
from .omniparser import get_last_computer_call_output # type: ignore
try:
from PIL import Image # type: ignore
except Exception: # pragma: no cover
Image = None # type: ignore
from ..responses import (
convert_responses_items_to_completion_messages,
make_click_item,
make_double_click_item,
make_drag_item,
make_function_call_item,
make_keypress_item,
make_move_item,
make_output_text_item,
make_reasoning_item,
make_screenshot_item,
make_scroll_item,
make_type_item,
make_wait_item,
)
from ..types import AgentCapability
# Function-calling schemas for the UI-TARS action space, in the flat
# "responses" tool format: {type, name, parameters, description}.
# Point parameters are exchanged as "<point>x y</point>" strings on the
# normalized 0-1000 grid used by _normalize_xy_to_uitars below.
TOOL_SCHEMAS: List[Dict[str, Any]] = [
    {
        "type": "function",
        "name": "open_computer",
        "parameters": {},
        "description": "Open computer.",
    },
    {
        "type": "function",
        "name": "click",
        "parameters": {
            "type": "object",
            "properties": {
                "point": {
                    "type": "string",
                    "description": "Click coordinates. The format is: <point>x y</point>",
                }
            },
            "required": ["point"],
        },
        "description": "Mouse left single click action.",
    },
    {
        "type": "function",
        "name": "left_double",
        "parameters": {
            "type": "object",
            "properties": {
                "point": {
                    "type": "string",
                    "description": "Click coordinates. The format is: <point>x y</point>",
                }
            },
            "required": ["point"],
        },
        "description": "Mouse left double click action.",
    },
    {
        "type": "function",
        "name": "right_single",
        "parameters": {
            "type": "object",
            "properties": {
                "point": {
                    "type": "string",
                    "description": "Click coordinates. The format is: <point>x y</point>",
                }
            },
            "required": ["point"],
        },
        "description": "Mouse right single click action.",
    },
    {
        "type": "function",
        "name": "scroll",
        "parameters": {
            "type": "object",
            "properties": {
                "point": {
                    "type": "string",
                    "description": "Scroll start position. If not specified, default to execute on the current mouse position. The format is: <point>x y</point>",
                },
                "direction": {
                    "type": "string",
                    "description": "Scroll direction.",
                    "enum": ["up", "down", "left", "right"],
                },
            },
            "required": ["direction"],
        },
        "description": "Scroll action.",
    },
    {
        "type": "function",
        "name": "move_to",
        "parameters": {
            "type": "object",
            "properties": {
                "point": {
                    "type": "string",
                    "description": "Target coordinates. The format is: <point>x y</point>",
                }
            },
            "required": ["point"],
        },
        "description": "Mouse move action.",
    },
    {
        "type": "function",
        "name": "hotkey",
        "parameters": {
            "type": "object",
            "properties": {
                "key": {
                    "type": "string",
                    "description": "Hotkeys you want to press. Split keys with a space and use lowercase.",
                }
            },
            "required": ["key"],
        },
        "description": "Press hotkey.",
    },
    {
        "type": "function",
        "name": "finished",
        "parameters": {
            "type": "object",
            "properties": {
                "content": {
                    "type": "string",
                    "description": "Provide the final answer or response to complete the task.",
                }
            },
            "required": [],
        },
        "description": "This function is used to indicate the completion of a task by providing the final answer or response.",
    },
    {
        "type": "function",
        "name": "press",
        "parameters": {
            "type": "object",
            "properties": {
                "key": {
                    "type": "string",
                    "description": "Key you want to press. Only one key can be pressed at one time.",
                }
            },
            "required": ["key"],
        },
        "description": "Press key.",
    },
    {
        "type": "function",
        "name": "release",
        "parameters": {
            "type": "object",
            "properties": {
                "key": {
                    "type": "string",
                    "description": "Key you want to release. Only one key can be released at one time.",
                }
            },
            "required": ["key"],
        },
        "description": "Release key.",
    },
    {
        "type": "function",
        "name": "mouse_down",
        "parameters": {
            "type": "object",
            "properties": {
                "point": {
                    "type": "string",
                    "description": "Mouse down position. If not specified, default to execute on the current mouse position. The format is: <point>x y</point>",
                },
                "button": {
                    "type": "string",
                    "description": "Down button. Default to left.",
                    "enum": ["left", "right"],
                },
            },
            "required": [],
        },
        "description": "Mouse down action.",
    },
    {
        "type": "function",
        "name": "mouse_up",
        "parameters": {
            "type": "object",
            "properties": {
                "point": {
                    "type": "string",
                    "description": "Mouse up position. If not specified, default to execute on the current mouse position. The format is: <point>x y</point>",
                },
                "button": {
                    "type": "string",
                    "description": "Up button. Default to left.",
                    "enum": ["left", "right"],
                },
            },
            "required": [],
        },
        "description": "Mouse up action.",
    },
    {
        "type": "function",
        "name": "call_user",
        "parameters": {
            "type": "object",
            "properties": {
                "content": {
                    "type": "string",
                    "description": "Message or information displayed to the user to request their input, feedback, or guidance.",
                }
            },
            "required": [],
        },
        "description": "This function is used to interact with the user by displaying a message and requesting their input, feedback, or guidance.",
    },
    {
        "type": "function",
        "name": "wait",
        "parameters": {
            "type": "object",
            "properties": {"time": {"type": "integer", "description": "Wait time in seconds."}},
            "required": [],
        },
        "description": "Wait for a while.",
    },
    {
        "type": "function",
        "name": "drag",
        "parameters": {
            "type": "object",
            "properties": {
                "start_point": {
                    "type": "string",
                    "description": "Drag start point. The format is: <point>x y</point>",
                },
                "end_point": {
                    "type": "string",
                    "description": "Drag end point. The format is: <point>x y</point>",
                },
            },
            "required": ["start_point", "end_point"],
        },
        "description": "Mouse left button drag action.",
    },
    {
        "type": "function",
        "name": "type",
        "parameters": {
            "type": "object",
            "properties": {
                "content": {
                    "type": "string",
                    "description": "Type content. If you want to submit your input, use \\n at the end of content.",
                }
            },
            "required": ["content"],
        },
        "description": "Type content.",
    },
    {
        "type": "function",
        "name": "take_screenshot",
        "parameters": {},
        "description": "Take screenshot.",
    },
]
def _format_tool_schemas_json_lines(schemas: List[Dict[str, Any]]) -> str:
# Nicely formatted: pretty JSON with indentation, separated by blank lines
return "\n\n".join(json.dumps(s, ensure_ascii=False, indent=2) for s in schemas) + "\n\n"
# Head of the UI-TARS system prompt: thinking-mode instructions plus the
# introduction to the function definitions that follow.
_PROMPT_PREFIX = (
    "You should begin by detailing the internal reasoning process, and then present the answer to the user. "
    "The reasoning process should be enclosed within <think_never_used_51bce0c785ca2f68081bfa7d91973934> "
    "</think_never_used_51bce0c785ca2f68081bfa7d91973934> tags, as follows:\n"
    "<think_never_used_51bce0c785ca2f68081bfa7d91973934> reasoning process here "
    "</think_never_used_51bce0c785ca2f68081bfa7d91973934> answer here.\n\n"
    "You have different modes of thinking:\n"
    "Unrestricted think mode: Engage in an internal thinking process with thorough reasoning and reflections. "
    "You have an unlimited budget for thinking tokens and can continue thinking until you fully solve the problem.\n"
    "Efficient think mode: Provide a concise internal thinking process with efficient reasoning and reflections. "
    "You don't have a strict token budget but be less verbose and more direct in your thinking.\n"
    "No think mode: Respond directly to the question without any internal reasoning process or extra thinking tokens. "
    "Still follow the template with the minimum required thinking tokens to justify the answer.\n"
    "Budgeted think mode: Limit your internal reasoning and reflections to stay within the specified token budget\n\n"
    "Based on the complexity of the problem, select the appropriate mode for reasoning among the provided options listed below.\n\n"
    "Provided Mode(s):\nEfficient think.\n\n"
    "You are provided with a task description, a history of previous actions, and corresponding screenshots. "
    "Your goal is to perform the next action to complete the task. "
    "If performing the same action multiple times results in a static screen with no changes, attempt a modified or alternative action.\n\n"
    "## Function Definition\n\n"
    "- You have access to the following functions:\n\n"
)
# Tail of the system prompt: the <seed:tool_call> call syntax that
# _parse_seed_tool_calls expects in model output.
_PROMPT_SUFFIX = (
    "- To call a function, use the following structure without any suffix:\n\n"
    "<gui_think> reasoning process </gui_think>\n"
    "<seed:tool_call><function=example_function_name><parameter=example_parameter_1>value_1</parameter>"
    "<parameter=example_parameter_2>multiline...\n</parameter></function></seed:tool_call>\n\n"
    "## Important Notes\n"
    "- Function calls must begin with <function= and end with </function>.\n"
    "- All required parameters must be explicitly provided.\n"
    "\n## Additional Notes\n"
    "- You can execute multiple actions within a single tool call. For example:\n"
    "<seed:tool_call><function=example_function_1><parameter=example_parameter_1>value_1</parameter><parameter=example_parameter_2>\n"
    "This is the value for the second parameter\nthat can span\nmultiple lines\n"
    "</parameter></function><function=example_function_2><parameter=example_parameter_3>value_4</parameter></function></seed:tool_call>"
)
# Full system prompt: prefix + pretty-printed TOOL_SCHEMAS + call-syntax suffix.
SYSTEM_PROMPT = _PROMPT_PREFIX + _format_tool_schemas_json_lines(TOOL_SCHEMAS) + _PROMPT_SUFFIX
def _extract_function_schemas_from_tools(
tools: Optional[List[Dict[str, Any]]],
) -> List[Dict[str, Any]]:
schemas: List[Dict[str, Any]] = []
if not tools:
return schemas
for t in tools:
if t.get("type") == "function":
fn = t.get("function", {})
name = fn.get("name")
params = fn.get("parameters", {})
desc = fn.get("description", "")
if name:
schemas.append(
{
"type": "function",
"name": name,
"parameters": params if isinstance(params, dict) else {},
"description": desc,
}
)
return schemas
def _parse_seed_tool_calls(text: str) -> List[Dict[str, Any]]:
"""Parse <seed:tool_call> blocks into a list of {function, parameters} dicts.
Also captures optional <gui_think>...</gui_think> as reasoning.
"""
actions: List[Dict[str, Any]] = []
if not text:
return actions
# Extract reasoning if present
reasoning_text = None
think_match = re.search(r"<gui_think>([\s\S]*?)</gui_think>", text)
if think_match:
reasoning_text = think_match.group(1).strip()
# Iterate each seed tool_call block
for block in re.finditer(r"<seed:tool_call>([\s\S]*?)</seed:tool_call>", text):
content = block.group(1)
# One or multiple <function=...>...</function> inside
for fmatch in re.finditer(r"<function=([\w_]+)>([\s\S]*?)</function>", content):
fname = fmatch.group(1)
inner = fmatch.group(2)
params: Dict[str, str] = {}
for pmatch in re.finditer(r"<parameter=([\w_]+)>([\s\S]*?)</parameter>", inner):
pname = pmatch.group(1)
pval = pmatch.group(2).strip()
params[pname] = pval
actions.append({"function": fname, "parameters": params})
# If we have a global reasoning and at least one action, attach it to first
if reasoning_text and actions:
actions[0]["reasoning"] = reasoning_text
elif reasoning_text:
actions.append({"function": "reasoning", "parameters": {"content": reasoning_text}})
return actions
def _normalize_xy_to_uitars(x: int, y: int, width: int, height: int) -> Tuple[int, int]:
width = max(1, int(width))
height = max(1, int(height))
nx = max(0, min(1000, int(round((x / width) * 1000))))
ny = max(0, min(1000, int(round((y / height) * 1000))))
return nx, ny
def _denormalize_xy_from_uitars(nx: float, ny: float, width: int, height: int) -> Tuple[int, int]:
width = max(1, int(width))
height = max(1, int(height))
x = int(round((nx / 1000.0) * width))
y = int(round((ny / 1000.0) * height))
return x, y
def _map_computer_action_to_function(
    action: Dict[str, Any], width: int, height: int
) -> Optional[Dict[str, Any]]:
    """Map a computer action item to a UITARS function + parameters dict of strings.

    Returns dict like {"function": name, "parameters": {..}} or None if unknown.
    """
    kind = action.get("type") or action.get("action")

    def point_param(px: Any, py: Any) -> str:
        # Render a pixel coordinate as the "<point>x y</point>" string form.
        cx, cy = _normalize_xy_to_uitars(int(px), int(py), width, height)
        return f"<point>{cx} {cy}</point>"

    if kind in ("click", "double_click", "move"):
        px, py = action.get("x"), action.get("y")
        if px is None or py is None:
            return None
        point = point_param(px, py)
        if kind == "click":
            if action.get("button", "left") == "right":
                return {"function": "right_single", "parameters": {"point": point}}
            return {"function": "click", "parameters": {"point": point}}
        if kind == "double_click":
            return {"function": "left_double", "parameters": {"point": point}}
        return {"function": "move_to", "parameters": {"point": point}}

    if kind == "keypress":
        keys = action.get("keys", [])
        if not (isinstance(keys, list) and keys):
            return None
        if len(keys) == 1:
            return {"function": "press", "parameters": {"key": keys[0]}}
        return {"function": "hotkey", "parameters": {"key": " ".join(keys)}}

    if kind == "type":
        return {"function": "type", "parameters": {"content": action.get("text", "")}}

    if kind == "scroll":
        point = point_param(action.get("x", 512), action.get("y", 512))
        sx = action.get("scroll_x", 0)
        sy = action.get("scroll_y", 0)
        # Our parser used positive sy for up; default to "down" when no delta.
        if sy and sy > 0:
            direction = "up"
        elif sy and sy < 0:
            direction = "down"
        elif sx and sx > 0:
            direction = "right"
        elif sx and sx < 0:
            direction = "left"
        else:
            direction = "down"
        return {
            "function": "scroll",
            "parameters": {"direction": direction, "point": point},
        }

    if kind == "drag":
        path = action.get("path", [])
        if not (isinstance(path, list) and len(path) >= 2):
            return None
        first, last = path[0], path[-1]
        coords = (first.get("x"), first.get("y"), last.get("x"), last.get("y"))
        if any(value is None for value in coords):
            return None
        return {
            "function": "drag",
            "parameters": {
                "start_point": point_param(coords[0], coords[1]),
                "end_point": point_param(coords[2], coords[3]),
            },
        }

    if kind == "wait":
        return {"function": "wait", "parameters": {}}
    if kind == "screenshot":
        return {"function": "take_screenshot", "parameters": {}}
    # Fallback unknown
    return None
def _to_uitars_messages(
    messages: List[Dict[str, Any]], width: int, height: int
) -> List[Dict[str, Any]]:
    """Convert responses items into completion messages tailored for UI-TARS.

    - User content is passed through similar to convert_responses_items_to_completion_messages
    - Assistant/tool history is rendered as text with <gui_think> and <seed:tool_call> blocks

    Args:
        messages: Responses-style items (user messages, reasoning, computer/function
            calls and their outputs).
        width: Screen width in pixels, used to normalize click/drag coordinates.
        height: Screen height in pixels, used to normalize click/drag coordinates.

    Returns:
        Chat-completion style messages ({"role": ..., "content": ...}) where
        assistant turns encode reasoning and tool calls as UI-TARS text blocks.
    """
    uitars_messages: List[Dict[str, Any]] = []

    def flush_seed_block(pending_think: Optional[str], pending_functions: List[Dict[str, Any]]):
        # Emit one assistant message combining the accumulated reasoning
        # (<gui_think>) and tool calls (<seed:tool_call>). No-op when both are empty.
        if not pending_think and not pending_functions:
            return
        parts: List[str] = []
        if pending_think:
            parts.append(f"<gui_think> {pending_think} </gui_think>")
        if pending_functions:
            inner = []
            for f in pending_functions:
                fname = f["function"]
                params = f.get("parameters", {})
                param_blocks = []
                for k, v in params.items():
                    param_blocks.append(f"<parameter={k}>{v}</parameter>")
                inner.append(f"<function={fname}>{''.join(param_blocks)}</function>")
            parts.append(f"<seed:tool_call>{''.join(inner)}</seed:tool_call>")
        uitars_messages.append({"role": "assistant", "content": "".join(parts)})

    # Accumulators for a single assistant seed block; flushed whenever a
    # user-visible turn boundary is reached (user message, assistant text,
    # or a tool/computer output).
    pending_think: Optional[str] = None
    pending_functions: List[Dict[str, Any]] = []
    for msg in messages:
        mtype = msg.get("type")
        role = msg.get("role")
        # On any user message, flush current assistant block
        if role == "user" or mtype == "user":
            flush_seed_block(pending_think, pending_functions)
            pending_think, pending_functions = None, []
            content = msg.get("content", "")
            if isinstance(content, list):
                # Structured content: keep only images and text parts.
                completion_content = []
                for item in content:
                    if item.get("type") == "input_image":
                        completion_content.append(
                            {"type": "image_url", "image_url": {"url": item.get("image_url")}}
                        )
                    elif item.get("type") in ("input_text", "text"):
                        completion_content.append({"type": "text", "text": item.get("text")})
                uitars_messages.append({"role": "user", "content": completion_content})
            elif isinstance(content, str):
                uitars_messages.append({"role": "user", "content": content})
            continue
        # Reasoning item
        if mtype == "reasoning":
            # Responses reasoning stores summary list
            summary = msg.get("summary", [])
            texts = [
                s.get("text", "")
                for s in summary
                if isinstance(s, dict) and s.get("type") == "summary_text"
            ]
            if texts:
                # NOTE: a later reasoning item overwrites (not appends to) any
                # pending think text for the current seed block.
                pending_think = "\n".join([t for t in texts if t])
            continue
        # Computer/tool calls -> map to functions
        if mtype == "computer_call":
            # Unmappable actions return None and are silently dropped.
            f = _map_computer_action_to_function(msg.get("action", {}), width, height)
            if f:
                pending_functions.append(f)
            continue
        if mtype == "function_call":
            # Include custom tools as-is
            name = msg.get("name")
            try:
                args_obj = json.loads(msg.get("arguments", "{}"))
            except json.JSONDecodeError:
                args_obj = {}
            # Ensure string values
            params = {k: (str(v) if not isinstance(v, str) else v) for k, v in args_obj.items()}
            pending_functions.append({"function": name, "parameters": params})
            continue
        # If assistant message text is given, flush current block and add as plain assistant text
        if role == "assistant" or mtype == "message":
            flush_seed_block(pending_think, pending_functions)
            pending_think, pending_functions = None, []
            content = msg.get("content", [])
            if isinstance(content, list):
                texts = [
                    c.get("text", "")
                    for c in content
                    if isinstance(c, dict) and c.get("type") in ("output_text", "text")
                ]
                if texts:
                    uitars_messages.append(
                        {"role": "assistant", "content": "\n".join([t for t in texts if t])}
                    )
            elif isinstance(content, str) and content:
                uitars_messages.append({"role": "assistant", "content": content})
            continue
        # On outputs, flush pending assistant block and send outputs as user messages
        if mtype in ("function_call_output", "computer_call_output"):
            flush_seed_block(pending_think, pending_functions)
            pending_think, pending_functions = None, []
            output = msg.get("output")
            if isinstance(output, dict) and output.get("type") == "input_image":
                # Screenshot output -> user image message.
                img_url = output.get("image_url")
                if img_url:
                    uitars_messages.append(
                        {
                            "role": "user",
                            "content": [
                                {"type": "image_url", "image_url": {"url": img_url}},
                            ],
                        }
                    )
            elif isinstance(output, str):
                uitars_messages.append({"role": "user", "content": output})
            else:
                # Fallback stringify
                uitars_messages.append({"role": "user", "content": json.dumps(output)})
            continue
    # Flush any remaining pending seed block
    flush_seed_block(pending_think, pending_functions)
    return uitars_messages
def _to_response_items(
    actions: List[Dict[str, Any]],
    tool_names: Optional[set[str]] = None,
    width: Optional[int] = None,
    height: Optional[int] = None,
) -> List[Any]:
    """Map parsed actions into Responses items (computer actions + optional reasoning).

    Args:
        actions: Parsed seed tool calls, each like {"function": str, "parameters": dict}.
        tool_names: Names of caller-provided function tools; actions whose function
            name is in this set are emitted as function_call items.
        width: Screen width in pixels used to denormalize coordinates (default 1024).
        height: Screen height in pixels used to denormalize coordinates (default 768).

    Returns:
        A list of Responses items built via the make_* factory helpers.
    """
    items: List[Any] = []
    tool_names = tool_names or set()

    # Points arrive as "<point>x y</point>" or plain "x y"; precompile once
    # instead of re-building the pattern at each of the call sites below.
    point_re = re.compile(r"([\-\d\.]+)\s+([\-\d\.]+)")

    def parse_point(text: str) -> Optional[Tuple[float, float]]:
        # Extract the first "x y" float pair, or None when absent/malformed.
        m = point_re.search(text or "")
        return (float(m.group(1)), float(m.group(2))) if m else None

    # Optional top-level reasoning attached to first action
    if actions and actions[0].get("reasoning"):
        items.append(make_reasoning_item(actions[0]["reasoning"]))

    # Dimensions default
    w = int(width) if width else 1024
    h = int(height) if height else 768

    for a in actions:
        fn = a.get("function")
        params = a.get("parameters", {})
        if fn == "reasoning":
            items.append(make_reasoning_item(params.get("content", "")))
        elif fn in ("click", "left_double", "right_single"):
            pt = parse_point(params.get("point", ""))
            if pt is None:
                # Malformed point -> drop this action.
                continue
            x, y = _denormalize_xy_from_uitars(pt[0], pt[1], w, h)
            if fn == "left_double":
                items.append(make_double_click_item(x, y))
            elif fn == "right_single":
                items.append(make_click_item(x, y, "right"))
            else:
                items.append(make_click_item(x, y, "left"))
        elif fn == "move_to":
            pt = parse_point(params.get("point", ""))
            if pt is None:
                continue
            x, y = _denormalize_xy_from_uitars(pt[0], pt[1], w, h)
            items.append(make_move_item(x, y))
        elif fn == "drag":
            sp = parse_point(params.get("start_point", ""))
            ep = parse_point(params.get("end_point", ""))
            if sp is None or ep is None:
                continue
            sx, sy = _denormalize_xy_from_uitars(sp[0], sp[1], w, h)
            ex, ey = _denormalize_xy_from_uitars(ep[0], ep[1], w, h)
            items.append(make_drag_item([{"x": sx, "y": sy}, {"x": ex, "y": ey}]))
        elif fn == "hotkey":
            # "key" holds a space-separated combo, e.g. "ctrl c".
            keys = params.get("key", "").split()
            if keys:
                items.append(make_keypress_item(keys))
        elif fn == "press":
            key = params.get("key", "")
            if key:
                items.append(make_keypress_item([key]))
        elif fn == "type":
            items.append(make_type_item(params.get("content", "")))
        elif fn == "scroll":
            # direction: up/down/left/right. Point optional; defaults to the
            # normalized screen center (500, 500).
            direction = params.get("direction", "down").lower()
            pt = parse_point(params.get("point", "")) or (500.0, 500.0)
            x, y = _denormalize_xy_from_uitars(pt[0], pt[1], w, h)
            dy = 5 if direction == "up" else -5
            dx = 5 if direction == "right" else (-5 if direction == "left" else 0)
            items.append(make_scroll_item(x, y, dx, dy))
        elif fn == "wait":
            items.append(make_wait_item())
        elif fn == "finished":
            content = params.get("content", "")
            items.append(make_output_text_item(content or "Task completed."))
            # Terminal action: ignore anything after "finished".
            break
        elif fn in ("take_screenshot", "open_computer"):
            # Both map to a screenshot request.
            items.append(make_screenshot_item())
        elif fn in tool_names:
            # Function name matches a provided tool schema -> emit function_call.
            # Parameters are strings; pass through as-is.
            items.append(make_function_call_item(fn, params))
        else:
            # Unknown function -> surface as assistant text
            items.append(make_output_text_item(f"Unknown action: {fn} {params}"))
    return items
@register_agent(models=r"(?i).*ui-?tars-?2.*")
class UITARS2Config:
    """Agent configuration for UI-TARS-2 style models.

    Renders conversation history as <gui_think>/<seed:tool_call> text blocks
    and parses seed tool calls back out of the model's completion text.
    """

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Run one agent step: build UI-TARS messages, call the model, parse actions.

        Args:
            messages: Responses-style conversation items.
            model: LiteLLM model identifier.
            tools: Optional tool schemas; function tools are appended to the
                system prompt and allowed as function_call outputs.
            max_retries: Forwarded to litellm.acompletion.
            stream: Forwarded to litellm.acompletion.
            computer_handler: Optional handler used to query screen dimensions.
            use_prompt_caching: When truthy, forwarded as an API kwarg.
            _on_api_start/_on_api_end/_on_usage/_on_screenshot: Optional async
                lifecycle callbacks.

        Returns:
            {"output": [response items...], "usage": {...}}.
        """
        # Determine screen dimensions (prefer computer_handler, fallback to last screenshot)
        width: Optional[int] = None
        height: Optional[int] = None
        if computer_handler is not None and hasattr(computer_handler, "get_dimensions"):
            try:
                dims = await computer_handler.get_dimensions()  # type: ignore
                if isinstance(dims, (list, tuple)) and len(dims) == 2:
                    width, height = int(dims[0]), int(dims[1])
            except Exception:
                # Best-effort: fall through to screenshot-based detection.
                pass
        if width is None or height is None:
            try:
                # Decode the most recent screenshot (data URL) and read its size.
                last_out = get_last_computer_call_output(messages)  # type: ignore
                if last_out:
                    image_url = last_out.get("output", {}).get("image_url", "")
                    if image_url:
                        b64 = image_url.split(",")[-1]
                        img_bytes = base64.b64decode(b64)
                        if Image is not None:
                            img = Image.open(io.BytesIO(img_bytes))
                            width, height = img.size
            except Exception:
                pass
        if width is None or height is None:
            # Last-resort default resolution.
            width, height = 1024, 768

        # Convert Responses items to UI-TARS style messages with <seed:tool_call> history
        completion_messages = _to_uitars_messages(messages, width, height)

        # Build dynamic system prompt by concatenating built-in schemas and provided function tools
        provided_fn_schemas = _extract_function_schemas_from_tools(tools)
        combined_schemas = (
            TOOL_SCHEMAS + provided_fn_schemas if provided_fn_schemas else TOOL_SCHEMAS
        )
        dynamic_system_prompt = (
            _PROMPT_PREFIX + _format_tool_schemas_json_lines(combined_schemas) + _PROMPT_SUFFIX
        )

        # Prepend system prompt (based on training prompts + provided tools)
        litellm_messages: List[Dict[str, Any]] = [
            {"role": "system", "content": dynamic_system_prompt},
        ]
        litellm_messages.extend(completion_messages)

        api_kwargs: Dict[str, Any] = {
            "model": model,
            "messages": litellm_messages,
            "max_retries": max_retries,
            "stream": stream,
            **{k: v for k, v in kwargs.items()},
        }
        if use_prompt_caching:
            api_kwargs["use_prompt_caching"] = use_prompt_caching

        if _on_api_start:
            await _on_api_start(api_kwargs)
        response = await litellm.acompletion(**api_kwargs)
        if _on_api_end:
            await _on_api_end(api_kwargs, response)

        # Convert chat-completion usage into Responses-style usage and attach cost.
        usage = {
            **LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage(  # type: ignore
                response.usage
            ).model_dump(),
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(usage)

        # Extract text content (first choice)
        response_dict = response.model_dump()  # type: ignore
        content_text = ""
        choices = response_dict.get("choices", [])
        if choices:
            msg = choices[0].get("message", {})
            # message.content may be string or array; gather text pieces
            mc = msg.get("content")
            if isinstance(mc, str):
                content_text = mc
            elif isinstance(mc, list):
                parts = []
                for part in mc:
                    if isinstance(part, dict) and part.get("type") == "text":
                        parts.append(part.get("text", ""))
                content_text = "\n".join([p for p in parts if p])

        # Parse the seed tool calls and map to response items
        actions = _parse_seed_tool_calls(content_text)
        # Build set of tool names from provided tools to emit function_call items
        tool_names: set[str] = set()
        for s in provided_fn_schemas:
            name = s.get("name")
            if isinstance(name, str):
                tool_names.add(name)
        output_items = _to_response_items(actions, tool_names, width, height)
        return {"output": output_items, "usage": usage}

    def get_capabilities(self) -> List[AgentCapability]:
        """Report supported agent capabilities (step-wise prediction only)."""
        return ["step"]

    async def predict_click(
        self, model: str, image_b64: str, instruction: str, **kwargs
    ) -> Optional[Tuple[int, int]]:
        """Predict a single click coordinate using a minimal prompt with a click tool.

        This sends the current screenshot and instruction, asking the model to
        output a click action in the form:
            Action: click(point='(x,y)')

        Args:
            model: LiteLLM model identifier.
            image_b64: Base64-encoded PNG screenshot.
            instruction: Natural-language description of what to click.
            **kwargs: Optional overrides (e.g. max_tokens, temperature) plus
                extra kwargs forwarded to litellm.acompletion.

        Returns:
            (x, y) as parsed from the model output, or None on failure.
            NOTE(review): coordinates are returned exactly as the model emits
            them — presumably pixel coordinates; confirm no denormalization
            is required by callers.
        """
        # Minimal grounding-style prompt
        system_text = (
            "You are a GUI agent. Given the instruction, return a single action on the current screen.\n\n"
            "## Output Format\n\n"
            "Action: click(point='(x,y)')\n\n"
            "## User Instruction\n"
            f"{instruction}"
        )

        # Build messages with image
        litellm_messages: List[Dict[str, Any]] = [
            {"role": "system", "content": system_text},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Please return a single click action."},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{image_b64}"},
                    },
                ],
            },
        ]

        api_kwargs: Dict[str, Any] = {
            "model": model,
            "messages": litellm_messages,
            "max_tokens": kwargs.get("max_tokens", 512),
            "temperature": kwargs.get("temperature", 0.0),
            # do_sample is enabled only for non-zero temperature.
            "do_sample": kwargs.get("temperature", 0.0) > 0.0,
        }
        # Forward any remaining caller kwargs (max_tokens/temperature already consumed).
        api_kwargs.update(
            {k: v for k, v in (kwargs or {}).items() if k not in ["max_tokens", "temperature"]}
        )

        response = await litellm.acompletion(**api_kwargs)

        # Extract response content
        response_dict = response.model_dump()  # type: ignore
        choices = response_dict.get("choices", [])
        if not choices:
            return None
        msg = choices[0].get("message", {})
        content_text = msg.get("content", "")
        if isinstance(content_text, list):
            # Structured content: concatenate the text parts.
            text_parts = [
                p.get("text", "")
                for p in content_text
                if isinstance(p, dict) and p.get("type") == "text"
            ]
            content_text = "\n".join([t for t in text_parts if t])
        if not isinstance(content_text, str):
            return None

        # Parse coordinates
        # Pattern for click(point='(x,y)') or click(start_box='(x,y)')
        patterns = [
            r"click\(point='\((\d+),(\d+)\)'\)",
            r"click\((?:start_box|point)='\((\d+),(\d+)\)'\)",
        ]
        for pat in patterns:
            m = re.search(pat, content_text)
            if m:
                try:
                    x, y = int(m.group(1)), int(m.group(2))
                    return (x, y)
                except Exception:
                    pass
        return None
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/ui/gradio/ui_components.py:
--------------------------------------------------------------------------------
```python
"""
UI Components for the Gradio interface
"""
import asyncio
import json
import logging
import os
import platform
from pathlib import Path
from typing import Any, Dict, List, Optional, cast
import gradio as gr
from gradio.components.chatbot import MetadataDict
from .app import (
create_agent,
get_model_string,
get_ollama_models,
global_agent,
global_computer,
load_settings,
save_settings,
)
# Global messages array to maintain conversation history
global_messages = []
def create_gradio_ui() -> gr.Blocks:
"""Create a Gradio UI for the Computer-Use Agent."""
# Load settings
saved_settings = load_settings()
# Check for API keys
openai_api_key = os.environ.get("OPENAI_API_KEY", "")
anthropic_api_key = os.environ.get("ANTHROPIC_API_KEY", "")
cua_api_key = os.environ.get("CUA_API_KEY", "")
# Model choices
openai_models = ["OpenAI: Computer-Use Preview"]
anthropic_models = [
"Anthropic: Claude 4 Opus (20250514)",
"Anthropic: Claude 4 Sonnet (20250514)",
"Anthropic: Claude 3.7 Sonnet (20250219)",
]
omni_models = [
"OMNI: OpenAI GPT-4o",
"OMNI: OpenAI GPT-4o mini",
"OMNI: Claude 3.7 Sonnet (20250219)",
]
# Check if API keys are available
has_openai_key = bool(openai_api_key)
has_anthropic_key = bool(anthropic_api_key)
has_cua_key = bool(cua_api_key)
# Get Ollama models for OMNI
ollama_models = get_ollama_models()
if ollama_models:
omni_models += ollama_models
# Detect platform
is_mac = platform.system().lower() == "darwin"
# Format model choices
provider_to_models = {
"OPENAI": openai_models,
"ANTHROPIC": anthropic_models,
"OMNI": omni_models + ["Custom model (OpenAI compatible API)", "Custom model (ollama)"],
"UITARS": (
[
"huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B",
]
if is_mac
else []
)
+ ["Custom model (OpenAI compatible API)"],
}
# Apply saved settings
initial_loop = saved_settings.get("agent_loop", "OMNI")
available_models_for_loop = provider_to_models.get(initial_loop, [])
saved_model_choice = saved_settings.get("model_choice")
if saved_model_choice and saved_model_choice in available_models_for_loop:
initial_model = saved_model_choice
else:
if initial_loop == "OPENAI":
initial_model = openai_models[0] if openai_models else "No models available"
elif initial_loop == "ANTHROPIC":
initial_model = anthropic_models[0] if anthropic_models else "No models available"
else: # OMNI
initial_model = (
omni_models[0] if omni_models else "Custom model (OpenAI compatible API)"
)
initial_custom_model = saved_settings.get("custom_model", "Qwen2.5-VL-7B-Instruct")
initial_provider_base_url = saved_settings.get("provider_base_url", "http://localhost:1234/v1")
initial_save_trajectory = saved_settings.get("save_trajectory", True)
initial_recent_images = saved_settings.get("recent_images", 3)
# Example prompts
example_messages = [
"Create a Python virtual environment, install pandas and matplotlib, then plot stock data",
"Open a PDF in Preview, add annotations, and save it as a compressed version",
"Open Safari, search for 'macOS automation tools', and save the first three results as bookmarks",
"Configure SSH keys and set up a connection to a remote server",
]
def generate_python_code(
agent_loop_choice,
model_name,
tasks,
recent_images=3,
save_trajectory=True,
computer_os="linux",
computer_provider="cloud",
container_name="",
cua_cloud_api_key="",
max_budget=None,
):
"""Generate Python code for the current configuration and tasks."""
tasks_str = ""
for task in tasks:
if task and task.strip():
tasks_str += f' "{task}",\n'
model_string = get_model_string(model_name, agent_loop_choice)
computer_args = []
if computer_os != "macos":
computer_args.append(f'os_type="{computer_os}"')
if computer_provider != "lume":
computer_args.append(f'provider_type="{computer_provider}"')
if container_name:
computer_args.append(f'name="{container_name}"')
if cua_cloud_api_key:
computer_args.append(f'api_key="{cua_cloud_api_key}"')
computer_args_str = ", ".join(computer_args)
if computer_args_str:
computer_args_str = f"({computer_args_str})"
else:
computer_args_str = "()"
code = f"""import asyncio
from computer import Computer
from agent import ComputerAgent
async def main():
async with Computer{computer_args_str} as computer:
agent = ComputerAgent(
model="{model_string}",
tools=[computer],
only_n_most_recent_images={recent_images},"""
if save_trajectory:
code += """
trajectory_dir="trajectories","""
if max_budget:
code += f"""
max_trajectory_budget={{"max_budget": {max_budget}, "raise_error": True}},"""
code += """
)
"""
if tasks_str:
code += f"""
# Prompts for the computer-use agent
tasks = [
{tasks_str.rstrip()}
]
for task in tasks:
print(f"Executing task: {{task}}")
messages = [{{"role": "user", "content": task}}]
async for result in agent.run(messages):
for item in result["output"]:
if item["type"] == "message":
print(item["content"][0]["text"])"""
else:
code += """
# Execute a single task
task = "Search for information about CUA on GitHub"
print(f"Executing task: {task}")
messages = [{"role": "user", "content": task}]
async for result in agent.run(messages):
for item in result["output"]:
if item["type"] == "message":
print(item["content"][0]["text"])"""
code += """
if __name__ == "__main__":
asyncio.run(main())"""
return code
# Create the Gradio interface
with gr.Blocks(title="Computer-Use Agent") as demo:
with gr.Row():
# Left column for settings
with gr.Column(scale=1):
# Logo
gr.HTML(
"""
<div style="display: flex; justify-content: center; margin-bottom: 0.5em">
<img alt="CUA Logo" style="width: 80px;"
src="https://github.com/trycua/cua/blob/main/img/logo_white.png?raw=true" />
</div>
"""
)
# Python code accordion
with gr.Accordion("Python Code", open=False):
code_display = gr.Code(
language="python",
value=generate_python_code(initial_loop, "gpt-4o", []),
interactive=False,
)
with gr.Accordion("Computer Configuration", open=True):
is_windows = platform.system().lower() == "windows"
is_mac = platform.system().lower() == "darwin"
providers = ["cloud", "localhost", "docker"]
if is_mac:
providers += ["lume"]
if is_windows:
providers += ["winsandbox"]
# Remove unavailable options
# MacOS is unavailable if Lume is not available
# Windows is unavailable if Winsandbox is not available
# Linux is always available
# This should be removed once we support macOS and Windows on the cloud provider
computer_choices = ["macos", "linux", "windows"]
if not is_mac or "lume" not in providers:
computer_choices.remove("macos")
if not is_windows or "winsandbox" not in providers:
computer_choices.remove("windows")
computer_os = gr.Radio(
choices=computer_choices,
label="Operating System",
value=computer_choices[0],
info="Select the operating system for the computer",
)
computer_provider = gr.Radio(
choices=providers,
label="Provider",
value="lume" if is_mac else "cloud",
info="Select the computer provider",
)
container_name = gr.Textbox(
label="Container Name",
placeholder="Enter container name (optional)",
value=os.environ.get("CUA_CONTAINER_NAME", ""),
info="Optional name for the container",
)
cua_cloud_api_key = gr.Textbox(
label="CUA Cloud API Key",
placeholder="Enter your CUA Cloud API key",
value=os.environ.get("CUA_API_KEY", ""),
type="password",
info="Required for cloud provider",
visible=(not has_cua_key),
)
with gr.Accordion("Agent Configuration", open=True):
agent_loop = gr.Dropdown(
choices=["OPENAI", "ANTHROPIC", "OMNI", "UITARS"],
label="Agent Loop",
value=initial_loop,
info="Select the agent loop provider",
)
# Model selection dropdowns
with gr.Group() as model_selection_group:
openai_model_choice = gr.Dropdown(
choices=openai_models,
label="OpenAI Model",
value=openai_models[0] if openai_models else "No models available",
info="Select OpenAI model",
interactive=True,
visible=(initial_loop == "OPENAI"),
)
anthropic_model_choice = gr.Dropdown(
choices=anthropic_models,
label="Anthropic Model",
value=(
anthropic_models[0] if anthropic_models else "No models available"
),
info="Select Anthropic model",
interactive=True,
visible=(initial_loop == "ANTHROPIC"),
)
omni_model_choice = gr.Dropdown(
choices=omni_models
+ ["Custom model (OpenAI compatible API)", "Custom model (ollama)"],
label="OMNI Model",
value=(
omni_models[0]
if omni_models
else "Custom model (OpenAI compatible API)"
),
info="Select OMNI model or choose a custom model option",
interactive=True,
visible=(initial_loop == "OMNI"),
)
uitars_model_choice = gr.Dropdown(
choices=provider_to_models.get("UITARS", ["No models available"]),
label="UITARS Model",
value=(
provider_to_models.get("UITARS", ["No models available"])[0]
if provider_to_models.get("UITARS")
else "No models available"
),
info="Select UITARS model",
interactive=True,
visible=(initial_loop == "UITARS"),
)
model_choice = gr.Textbox(visible=False)
# API key inputs
with gr.Group(
visible=not has_openai_key
and (initial_loop == "OPENAI" or initial_loop == "OMNI")
) as openai_key_group:
openai_api_key_input = gr.Textbox(
label="OpenAI API Key",
placeholder="Enter your OpenAI API key",
value=os.environ.get("OPENAI_API_KEY", ""),
interactive=True,
type="password",
info="Required for OpenAI models",
)
with gr.Group(
visible=not has_anthropic_key
and (initial_loop == "ANTHROPIC" or initial_loop == "OMNI")
) as anthropic_key_group:
anthropic_api_key_input = gr.Textbox(
label="Anthropic API Key",
placeholder="Enter your Anthropic API key",
value=os.environ.get("ANTHROPIC_API_KEY", ""),
interactive=True,
type="password",
info="Required for Anthropic models",
)
# API key handlers
def set_openai_api_key(key):
if key and key.strip():
os.environ["OPENAI_API_KEY"] = key.strip()
print("DEBUG - Set OpenAI API key environment variable")
return key
def set_anthropic_api_key(key):
if key and key.strip():
os.environ["ANTHROPIC_API_KEY"] = key.strip()
print("DEBUG - Set Anthropic API key environment variable")
return key
openai_api_key_input.change(
fn=set_openai_api_key,
inputs=[openai_api_key_input],
outputs=[openai_api_key_input],
queue=False,
)
anthropic_api_key_input.change(
fn=set_anthropic_api_key,
inputs=[anthropic_api_key_input],
outputs=[anthropic_api_key_input],
queue=False,
)
# UI update function
def update_ui(
loop=None,
openai_model=None,
anthropic_model=None,
omni_model=None,
uitars_model=None,
):
loop = loop or agent_loop.value
model_value = None
if loop == "OPENAI" and openai_model:
model_value = openai_model
elif loop == "ANTHROPIC" and anthropic_model:
model_value = anthropic_model
elif loop == "OMNI" and omni_model:
model_value = omni_model
elif loop == "UITARS" and uitars_model:
model_value = uitars_model
openai_visible = loop == "OPENAI"
anthropic_visible = loop == "ANTHROPIC"
omni_visible = loop == "OMNI"
uitars_visible = loop == "UITARS"
show_openai_key = not has_openai_key and (
loop == "OPENAI"
or (
loop == "OMNI"
and model_value
and "OpenAI" in model_value
and "Custom" not in model_value
)
)
show_anthropic_key = not has_anthropic_key and (
loop == "ANTHROPIC"
or (
loop == "OMNI"
and model_value
and "Claude" in model_value
and "Custom" not in model_value
)
)
is_custom_openai_api = model_value == "Custom model (OpenAI compatible API)"
is_custom_ollama = model_value == "Custom model (ollama)"
is_any_custom = is_custom_openai_api or is_custom_ollama
model_choice_value = model_value if model_value else ""
return [
gr.update(visible=openai_visible),
gr.update(visible=anthropic_visible),
gr.update(visible=omni_visible),
gr.update(visible=uitars_visible),
gr.update(visible=show_openai_key),
gr.update(visible=show_anthropic_key),
gr.update(visible=is_any_custom),
gr.update(visible=is_custom_openai_api),
gr.update(visible=is_custom_openai_api),
gr.update(value=model_choice_value),
]
# Custom model inputs
custom_model = gr.Textbox(
label="Custom Model Name",
placeholder="Enter custom model name (e.g., Qwen2.5-VL-7B-Instruct or llama3)",
value=initial_custom_model,
visible=(
initial_model == "Custom model (OpenAI compatible API)"
or initial_model == "Custom model (ollama)"
),
interactive=True,
)
provider_base_url = gr.Textbox(
label="Provider Base URL",
placeholder="Enter provider base URL (e.g., http://localhost:1234/v1)",
value=initial_provider_base_url,
visible=(initial_model == "Custom model (OpenAI compatible API)"),
interactive=True,
)
provider_api_key = gr.Textbox(
label="Provider API Key",
placeholder="Enter provider API key (if required)",
value="",
visible=(initial_model == "Custom model (OpenAI compatible API)"),
interactive=True,
type="password",
)
# Provider visibility update function
def update_provider_visibility(provider):
"""Update visibility of container name and API key based on selected provider."""
is_localhost = provider == "localhost"
return [
gr.update(visible=not is_localhost), # container_name
gr.update(
visible=not is_localhost and not has_cua_key
), # cua_cloud_api_key
]
# Connect provider change event
computer_provider.change(
fn=update_provider_visibility,
inputs=[computer_provider],
outputs=[container_name, cua_cloud_api_key],
queue=False,
)
# Connect UI update events
for dropdown in [
agent_loop,
omni_model_choice,
uitars_model_choice,
openai_model_choice,
anthropic_model_choice,
]:
dropdown.change(
fn=update_ui,
inputs=[
agent_loop,
openai_model_choice,
anthropic_model_choice,
omni_model_choice,
uitars_model_choice,
],
outputs=[
openai_model_choice,
anthropic_model_choice,
omni_model_choice,
uitars_model_choice,
openai_key_group,
anthropic_key_group,
custom_model,
provider_base_url,
provider_api_key,
model_choice,
],
queue=False,
)
save_trajectory = gr.Checkbox(
label="Save Trajectory",
value=initial_save_trajectory,
info="Save the agent's trajectory for debugging",
interactive=True,
)
recent_images = gr.Slider(
label="Recent Images",
minimum=1,
maximum=10,
value=initial_recent_images,
step=1,
info="Number of recent images to keep in context",
interactive=True,
)
max_budget = gr.Number(
label="Max Budget ($)",
value=lambda: None,
minimum=-1,
maximum=100.0,
step=0.1,
info="Optional budget limit for trajectory (0 = no limit)",
interactive=True,
)
# Right column for chat interface
with gr.Column(scale=2):
gr.Markdown(
"Ask me to perform tasks in a virtual environment.<br>Built with <a href='https://github.com/trycua/cua' target='_blank'>github.com/trycua/cua</a>."
)
chatbot_history = gr.Chatbot(type="messages")
msg = gr.Textbox(placeholder="Ask me to perform tasks in a virtual environment")
clear = gr.Button("Clear")
cancel_button = gr.Button("Cancel", variant="stop")
# Add examples
example_group = gr.Examples(examples=example_messages, inputs=msg)
# Chat submission function
def chat_submit(message, history):
history.append(gr.ChatMessage(role="user", content=message))
return "", history
# Cancel function
async def cancel_agent_task(history):
global global_agent
if global_agent:
print("DEBUG - Cancelling agent task")
history.append(
gr.ChatMessage(
role="assistant",
content="Task cancelled by user",
metadata={"title": "❌ Cancelled"},
)
)
else:
history.append(
gr.ChatMessage(
role="assistant",
content="No active agent task to cancel",
metadata={"title": "ℹ️ Info"},
)
)
return history
# Process response function
async def process_response(
history,
openai_model_value,
anthropic_model_value,
omni_model_value,
uitars_model_value,
custom_model_value,
agent_loop_choice,
save_traj,
recent_imgs,
custom_url_value=None,
custom_api_key=None,
openai_key_input=None,
anthropic_key_input=None,
computer_os="linux",
computer_provider="cloud",
container_name="",
cua_cloud_api_key="",
max_budget_value=None,
):
if not history:
yield history
return
# Get the last user message
last_user_message = history[-1]["content"]
# Get the appropriate model value based on the agent loop
if agent_loop_choice == "OPENAI":
model_choice_value = openai_model_value
elif agent_loop_choice == "ANTHROPIC":
model_choice_value = anthropic_model_value
elif agent_loop_choice == "OMNI":
model_choice_value = omni_model_value
elif agent_loop_choice == "UITARS":
model_choice_value = uitars_model_value
else:
model_choice_value = "No models available"
# Determine if this is a custom model selection
is_custom_model_selected = model_choice_value in [
"Custom model (OpenAI compatible API)",
"Custom model (ollama)",
]
# Determine the model name string to analyze
if is_custom_model_selected:
model_string_to_analyze = custom_model_value
else:
model_string_to_analyze = model_choice_value
try:
# Get the model string
model_string = get_model_string(model_string_to_analyze, agent_loop_choice)
# Set API keys if provided
if openai_key_input:
os.environ["OPENAI_API_KEY"] = openai_key_input
if anthropic_key_input:
os.environ["ANTHROPIC_API_KEY"] = anthropic_key_input
if cua_cloud_api_key:
os.environ["CUA_API_KEY"] = cua_cloud_api_key
# Save settings
current_settings = {
"agent_loop": agent_loop_choice,
"model_choice": model_choice_value,
"custom_model": custom_model_value,
"provider_base_url": custom_url_value,
"save_trajectory": save_traj,
"recent_images": recent_imgs,
"computer_os": computer_os,
"computer_provider": computer_provider,
"container_name": container_name,
}
save_settings(current_settings)
# Create agent
global_agent = create_agent(
model_string=model_string,
save_trajectory=save_traj,
only_n_most_recent_images=recent_imgs,
custom_model_name=(
custom_model_value if is_custom_model_selected else None
),
computer_os=computer_os,
computer_provider=computer_provider,
computer_name=container_name,
computer_api_key=cua_cloud_api_key,
verbosity=logging.DEBUG,
max_trajectory_budget=(
max_budget_value
if max_budget_value and max_budget_value > 0
else None
),
)
if global_agent is None:
history.append(
gr.ChatMessage(
role="assistant",
content="Failed to create agent. Check API keys and configuration.",
)
)
yield history
return
# Add user message to global history
global global_messages
global_messages.append({"role": "user", "content": last_user_message})
# Stream responses from the agent
async for result in global_agent.run(global_messages):
global_messages += result.get("output", [])
# print(f"DEBUG - Agent response ------- START")
# from pprint import pprint
# pprint(result)
# print(f"DEBUG - Agent response ------- END")
# Process the result output
for item in result.get("output", []):
if item.get("type") == "message":
content = item.get("content", [])
for content_part in content:
if content_part.get("text"):
history.append(
gr.ChatMessage(
role=item.get("role", "assistant"),
content=content_part.get("text", ""),
metadata=content_part.get("metadata", {}),
)
)
elif item.get("type") == "computer_call":
action = item.get("action", {})
action_type = action.get("type", "")
if action_type:
action_title = f"🛠️ Performing {action_type}"
if action.get("x") and action.get("y"):
action_title += f" at ({action['x']}, {action['y']})"
history.append(
gr.ChatMessage(
role="assistant",
content=f"```json\n{json.dumps(action)}\n```",
metadata={"title": action_title},
)
)
elif item.get("type") == "function_call":
function_name = item.get("name", "")
arguments = item.get("arguments", "{}")
history.append(
gr.ChatMessage(
role="assistant",
content=f"🔧 Calling function: {function_name}\n```json\n{arguments}\n```",
metadata={"title": f"Function Call: {function_name}"},
)
)
elif item.get("type") == "function_call_output":
output = item.get("output", "")
history.append(
gr.ChatMessage(
role="assistant",
content=f"📤 Function output:\n```\n{output}\n```",
metadata={"title": "Function Output"},
)
)
elif item.get("type") == "computer_call_output":
output = item.get("output", {}).get("image_url", "")
image_markdown = f""
history.append(
gr.ChatMessage(
role="assistant",
content=image_markdown,
metadata={"title": "🖥️ Computer Output"},
)
)
yield history
except Exception as e:
import traceback
traceback.print_exc()
history.append(gr.ChatMessage(role="assistant", content=f"Error: {str(e)}"))
yield history
# Connect the submit button
submit_event = msg.submit(
fn=chat_submit,
inputs=[msg, chatbot_history],
outputs=[msg, chatbot_history],
queue=False,
).then(
fn=process_response,
inputs=[
chatbot_history,
openai_model_choice,
anthropic_model_choice,
omni_model_choice,
uitars_model_choice,
custom_model,
agent_loop,
save_trajectory,
recent_images,
provider_base_url,
provider_api_key,
openai_api_key_input,
anthropic_api_key_input,
computer_os,
computer_provider,
container_name,
cua_cloud_api_key,
max_budget,
],
outputs=[chatbot_history],
queue=True,
)
# Clear button functionality
def clear_chat():
    """Reset the conversation: wipe the shared message log and clear the chatbot UI.

    Returns None so the bound Gradio output (the chatbot history) is emptied.
    """
    global global_messages
    # Mutate in place via slice deletion so existing references stay valid.
    del global_messages[:]
    return None
clear.click(clear_chat, None, chatbot_history, queue=False)
# Connect cancel button
cancel_button.click(
cancel_agent_task, [chatbot_history], [chatbot_history], queue=False
)
# Code display update function
def update_code_display(
    agent_loop,
    model_choice_val,
    custom_model_val,
    chat_history,
    recent_images_val,
    save_trajectory_val,
    computer_os,
    computer_provider,
    container_name,
    cua_cloud_api_key,
    max_budget_val,
):
    """Regenerate the Python snippet shown in the UI from the current configuration.

    Collects the text of every user turn in ``chat_history`` and delegates to
    ``generate_python_code``. Falls back to ``"gpt-4o"`` when neither a model
    choice nor a custom model name is set.
    """
    user_messages = [
        entry.get("content", "")
        for entry in (chat_history or [])
        if isinstance(entry, dict) and entry.get("role") == "user"
    ]
    selected_model = model_choice_val or custom_model_val or "gpt-4o"
    return generate_python_code(
        agent_loop,
        selected_model,
        user_messages,
        recent_images_val,
        save_trajectory_val,
        computer_os,
        computer_provider,
        container_name,
        cua_cloud_api_key,
        max_budget_val,
    )
# Update code display when configuration changes
for component in [
agent_loop,
model_choice,
custom_model,
chatbot_history,
recent_images,
save_trajectory,
computer_os,
computer_provider,
container_name,
cua_cloud_api_key,
max_budget,
]:
component.change(
update_code_display,
inputs=[
agent_loop,
model_choice,
custom_model,
chatbot_history,
recent_images,
save_trajectory,
computer_os,
computer_provider,
container_name,
cua_cloud_api_key,
max_budget,
],
outputs=[code_display],
)
return demo
```
--------------------------------------------------------------------------------
/libs/lume/src/LumeController.swift:
--------------------------------------------------------------------------------
```swift
import ArgumentParser
import Foundation
import Virtualization
// MARK: - Shared VM Manager

/// Process-wide registry of currently running VMs, keyed by VM name.
/// Confined to the main actor so lookups and mutations never race.
@MainActor
final class SharedVM {
    /// Singleton instance.
    static let shared: SharedVM = SharedVM()

    /// Running VMs indexed by their name.
    private var vmsByName: [String: VM] = [:]

    private init() {}

    /// Returns the tracked running VM for `name`, or nil when none is registered.
    func getVM(name: String) -> VM? {
        vmsByName[name]
    }

    /// Registers (or replaces) the running VM tracked under `name`.
    func setVM(name: String, vm: VM) {
        vmsByName[name] = vm
    }

    /// Forgets the VM tracked under `name`, if any.
    func removeVM(name: String) {
        vmsByName.removeValue(forKey: name)
    }
}
/// Entrypoint for Commands and API server
final class LumeController {
// MARK: - Properties
let home: Home
private let imageLoaderFactory: ImageLoaderFactory
private let vmFactory: VMFactory
// MARK: - Initialization

/// Creates a controller backed by the given home-directory manager and
/// factories for image loaders and VM instances. Defaults are the
/// production implementations; alternatives can be injected for testing.
init(
home: Home = Home(),
imageLoaderFactory: ImageLoaderFactory = DefaultImageLoaderFactory(),
vmFactory: VMFactory = DefaultVMFactory()
) {
self.home = home
self.imageLoaderFactory = imageLoaderFactory
self.vmFactory = vmFactory
}
// MARK: - Public VM Management Methods
/// Lists all virtual machines in the system.
///
/// - Parameter storage: Filter for where to look. A value containing "/" or
///   "\\" is treated as a direct filesystem path whose subdirectories are
///   scanned; any other non-nil value is treated as a named storage
///   location; nil returns VMs from every known location.
/// - Returns: `VMDetails` for each valid, initialized VM found. Under a
///   direct path, non-directories, uninitialized dirs, and dirs that fail to
///   load are skipped silently.
/// - Throws: Rethrows directory-enumeration and VM-loading errors after
///   logging them.
@MainActor
public func list(storage: String? = nil) throws -> [VMDetails] {
do {
if let storage = storage {
// If storage is specified, only return VMs from that location
if storage.contains("/") || storage.contains("\\") {
// Direct path - check if it exists
if !FileManager.default.fileExists(atPath: storage) {
// Return empty array if the path doesn't exist
return []
}
// Try to get all VMs from the specified path
// We need to check which subdirectories are valid VM dirs
let directoryURL = URL(fileURLWithPath: storage)
let contents = try FileManager.default.contentsOfDirectory(
at: directoryURL,
includingPropertiesForKeys: [.isDirectoryKey],
options: .skipsHiddenFiles
)
let statuses = try contents.compactMap { subdir -> VMDetails? in
guard let isDirectory = try subdir.resourceValues(forKeys: [.isDirectoryKey]).isDirectory,
isDirectory else {
return nil
}
let vmName = subdir.lastPathComponent
// Check if it's a valid VM directory
let vmDir = try home.getVMDirectoryFromPath(vmName, storagePath: storage)
if !vmDir.initialized() {
return nil
}
do {
let vm = try self.get(name: vmName, storage: storage)
return vm.details
} catch {
// Skip invalid VM directories
return nil
}
}
return statuses
} else {
// Named storage
let vmsWithLoc = try home.getAllVMDirectories()
let statuses = try vmsWithLoc.compactMap { vmWithLoc -> VMDetails? in
// Only include VMs from the specified location
if vmWithLoc.locationName != storage {
return nil
}
let vm = try self.get(
name: vmWithLoc.directory.name, storage: vmWithLoc.locationName)
return vm.details
}
return statuses
}
} else {
// No storage filter - get all VMs
let vmsWithLoc = try home.getAllVMDirectories()
let statuses = try vmsWithLoc.compactMap { vmWithLoc -> VMDetails? in
let vm = try self.get(
name: vmWithLoc.directory.name, storage: vmWithLoc.locationName)
return vm.details
}
return statuses
}
} catch {
Logger.error("Failed to list VMs", metadata: ["error": error.localizedDescription])
throw error
}
}
/// Clones a stopped VM to a new name, optionally across storage locations,
/// then randomizes the copy's MAC address and machine identifier so the
/// clone cannot collide with the source on the network or in Virtualization.
///
/// - Throws: `VMError.alreadyRunning` when the source reports status
///   "running", `HomeError.directoryAlreadyExists` when the destination
///   already exists, and any copy/lookup error encountered along the way.
@MainActor
public func clone(
name: String, newName: String, sourceLocation: String? = nil, destLocation: String? = nil
) throws {
let normalizedName = normalizeVMName(name: name)
let normalizedNewName = normalizeVMName(name: newName)
Logger.info(
"Cloning VM",
metadata: [
"source": normalizedName,
"destination": normalizedNewName,
"sourceLocation": sourceLocation ?? "default",
"destLocation": destLocation ?? "default",
])
do {
// Validate source VM exists
_ = try self.validateVMExists(normalizedName, storage: sourceLocation)
// Get the source VM and check if it's running
let sourceVM = try get(name: normalizedName, storage: sourceLocation)
if sourceVM.details.status == "running" {
Logger.error("Cannot clone a running VM", metadata: ["source": normalizedName])
throw VMError.alreadyRunning(normalizedName)
}
// Check if destination already exists
do {
let destDir = try home.getVMDirectory(normalizedNewName, storage: destLocation)
if destDir.exists() {
Logger.error(
"Destination VM already exists",
metadata: ["destination": normalizedNewName])
throw HomeError.directoryAlreadyExists(path: destDir.dir.path)
}
} catch VMLocationError.locationNotFound {
// Location not found is okay, we'll create it
} catch VMError.notFound {
// VM not found is okay, we'll create it
}
// Copy the VM directory
try home.copyVMDirectory(
from: normalizedName,
to: normalizedNewName,
sourceLocation: sourceLocation,
destLocation: destLocation
)
// Update MAC address in the cloned VM to ensure uniqueness
let clonedVM = try get(name: normalizedNewName, storage: destLocation)
try clonedVM.setMacAddress(VZMACAddress.randomLocallyAdministered().string)
// Update MAC Identifier in the cloned VM to ensure uniqueness
try clonedVM.setMachineIdentifier(
DarwinVirtualizationService.generateMachineIdentifier())
Logger.info(
"VM cloned successfully",
metadata: ["source": normalizedName, "destination": normalizedNewName])
} catch {
Logger.error("Failed to clone VM", metadata: ["error": error.localizedDescription])
throw error
}
}
/// Looks up a VM by (normalized) name and returns a loaded `VM` instance.
///
/// - Parameter storage: A value containing "/" or "\\" is treated as a
///   direct path; any other non-nil value is a named location; nil searches
///   all known locations.
/// - Throws: `VMError.notFound` when the directory is missing,
///   `VMError.notInitialized` when it exists but is not a valid VM;
///   rethrows loading errors otherwise.
@MainActor
public func get(name: String, storage: String? = nil) throws -> VM {
let normalizedName = normalizeVMName(name: name)
do {
let vm: VM
if let storagePath = storage, storagePath.contains("/") || storagePath.contains("\\") {
// Storage is a direct path
let vmDir = try home.getVMDirectoryFromPath(normalizedName, storagePath: storagePath)
guard vmDir.initialized() else {
// Throw a specific error if the directory exists but isn't a valid VM
if vmDir.exists() {
throw VMError.notInitialized(normalizedName)
} else {
throw VMError.notFound(normalizedName)
}
}
// Pass the path as the storage context
vm = try self.loadVM(vmDir: vmDir, storage: storagePath)
} else {
// Storage is nil or a named location
let actualLocation = try self.validateVMExists(
normalizedName, storage: storage)
let vmDir = try home.getVMDirectory(normalizedName, storage: actualLocation)
// loadVM will re-check initialized, but good practice to keep validateVMExists result.
vm = try self.loadVM(vmDir: vmDir, storage: actualLocation)
}
return vm
} catch {
Logger.error(
"Failed to get VM",
metadata: [
"vmName": normalizedName, "storage": storage ?? "default",
"error": error.localizedDescription,
])
// Re-throw the original error to preserve its type
throw error
}
}
/// Creates a new VM: validates parameters, assembles a temporary VM with the
/// requested hardware, runs OS setup (using the IPSW for macOS), and then
/// finalizes the temp directory under `name` in the chosen storage.
///
/// - Throws: `ValidationError` for bad parameters, `VMError.alreadyExists`
///   when the target directory exists; rethrows setup/finalize errors.
@MainActor
public func create(
name: String,
os: String,
diskSize: UInt64,
cpuCount: Int,
memorySize: UInt64,
display: String,
ipsw: String?,
storage: String? = nil
) async throws {
Logger.info(
"Creating VM",
metadata: [
"name": name,
"os": os,
"location": storage ?? "default",
"disk_size": "\(diskSize / 1024 / 1024)MB",
"cpu_count": "\(cpuCount)",
"memory_size": "\(memorySize / 1024 / 1024)MB",
"display": display,
"ipsw": ipsw ?? "none",
])
do {
try validateCreateParameters(name: name, os: os, ipsw: ipsw, storage: storage)
let vm = try await createTempVMConfig(
os: os,
cpuCount: cpuCount,
memorySize: memorySize,
diskSize: diskSize,
display: display
)
try await vm.setup(
ipswPath: ipsw ?? "none",
cpuCount: cpuCount,
memorySize: memorySize,
diskSize: diskSize,
display: display
)
try vm.finalize(to: name, home: home, storage: storage)
Logger.info("VM created successfully", metadata: ["name": name])
} catch {
Logger.error("Failed to create VM", metadata: ["error": error.localizedDescription])
throw error
}
}
/// Deletes a VM's directory, first stopping the VM when it is currently
/// tracked as running in `SharedVM`. `storage` may be a direct path
/// (contains "/" or "\\") or a named location (nil searches all).
///
/// - Throws: `VMError.notFound` / `VMError.notInitialized` when the
///   directory is missing or invalid; rethrows stop/delete errors.
@MainActor
public func delete(name: String, storage: String? = nil) async throws {
let normalizedName = normalizeVMName(name: name)
Logger.info(
"Deleting VM",
metadata: [
"name": normalizedName,
"location": storage ?? "default",
])
do {
let vmDir: VMDirectory
// Check if storage is a direct path
if let storagePath = storage, storagePath.contains("/") || storagePath.contains("\\") {
// Storage is a direct path
vmDir = try home.getVMDirectoryFromPath(normalizedName, storagePath: storagePath)
guard vmDir.initialized() else {
// Throw a specific error if the directory exists but isn't a valid VM
if vmDir.exists() {
throw VMError.notInitialized(normalizedName)
} else {
throw VMError.notFound(normalizedName)
}
}
} else {
// Storage is nil or a named location
let actualLocation = try self.validateVMExists(normalizedName, storage: storage)
vmDir = try home.getVMDirectory(normalizedName, storage: actualLocation)
}
// Stop VM if it's running
if SharedVM.shared.getVM(name: normalizedName) != nil {
try await stopVM(name: normalizedName)
}
try vmDir.delete()
Logger.info("VM deleted successfully", metadata: ["name": normalizedName])
} catch {
Logger.error("Failed to delete VM", metadata: ["error": error.localizedDescription])
throw error
}
}
// MARK: - VM Operations

/// Applies any provided hardware/display changes to a VM, in the order
/// cpu → memory → disk → display; nil parameters are left unchanged.
///
/// - Throws: `VMError.notFound` when the VM does not exist; rethrows
///   individual setter errors (settings applied before the failure stick).
@MainActor
public func updateSettings(
name: String,
cpu: Int? = nil,
memory: UInt64? = nil,
diskSize: UInt64? = nil,
display: String? = nil,
storage: String? = nil
) throws {
let normalizedName = normalizeVMName(name: name)
Logger.info(
"Updating VM settings",
metadata: [
"name": normalizedName,
"location": storage ?? "default",
"cpu": cpu.map { "\($0)" } ?? "unchanged",
"memory": memory.map { "\($0 / 1024 / 1024)MB" } ?? "unchanged",
"disk_size": diskSize.map { "\($0 / 1024 / 1024)MB" } ?? "unchanged",
"display": display ?? "unchanged",
])
do {
// Find the actual location of the VM
let actualLocation = try self.validateVMExists(
normalizedName, storage: storage)
let vm = try get(name: normalizedName, storage: actualLocation)
// Apply settings in order
if let cpu = cpu {
try vm.setCpuCount(cpu)
}
if let memory = memory {
try vm.setMemorySize(memory)
}
if let diskSize = diskSize {
try vm.setDiskSize(diskSize)
}
if let display = display {
try vm.setDisplay(display)
}
Logger.info("VM settings updated successfully", metadata: ["name": normalizedName])
} catch {
Logger.error(
"Failed to update VM settings", metadata: ["error": error.localizedDescription])
throw error
}
}
/// Stops a running VM, preferring the cached instance from `SharedVM` over a
/// fresh load; the cache entry is removed afterwards even when stopping
/// fails, so a stuck entry cannot linger.
///
/// - Throws: `VMError.notFound` when the VM does not exist; rethrows errors
///   from `vm.stop()`.
@MainActor
public func stopVM(name: String, storage: String? = nil) async throws {
let normalizedName = normalizeVMName(name: name)
Logger.info("Stopping VM", metadata: ["name": normalizedName])
do {
// Find the actual location of the VM
let actualLocation = try self.validateVMExists(
normalizedName, storage: storage)
// Try to get VM from cache first
let vm: VM
if let cachedVM = SharedVM.shared.getVM(name: normalizedName) {
vm = cachedVM
} else {
vm = try get(name: normalizedName, storage: actualLocation)
}
try await vm.stop()
// Remove VM from cache after stopping
SharedVM.shared.removeVM(name: normalizedName)
Logger.info("VM stopped successfully", metadata: ["name": normalizedName])
} catch {
// Clean up cache even if stop fails
SharedVM.shared.removeVM(name: normalizedName)
Logger.error("Failed to stop VM", metadata: ["error": error.localizedDescription])
throw error
}
}
/// Starts a VM, optionally pulling its image first when `name` looks like an
/// "image:tag" reference and no VM with the normalized name exists yet.
///
/// - Parameters:
///   - name: VM name, or an image reference of the form "image:tag".
///   - noDisplay: Suppress the display when true.
///   - sharedDirectories: Host directories to expose to the guest.
///   - mount: Disk image to attach (rejected for macOS guests).
///   - registry: Registry host used for auto-pull.
///   - organization: Registry organization used for auto-pull.
///   - vncPort: VNC port to use (0 lets the VM pick).
///   - recoveryMode: Boot into recovery when true.
///   - storage: Named storage location or a direct path ("/" or "\\").
///   - usbMassStoragePaths: Disk images to expose as USB devices
///     (ignored with a log message before macOS 15).
/// - Throws: Validation and VM lifecycle errors; the VM is evicted from the
///   shared cache when startup fails.
@MainActor
public func runVM(
    name: String,
    noDisplay: Bool = false,
    sharedDirectories: [SharedDirectory] = [],
    mount: Path? = nil,
    registry: String = "ghcr.io",
    organization: String = "trycua",
    vncPort: Int = 0,
    recoveryMode: Bool = false,
    storage: String? = nil,
    usbMassStoragePaths: [Path]? = nil
) async throws {
    let normalizedName = normalizeVMName(name: name)
    Logger.info(
        "Running VM",
        metadata: [
            "name": normalizedName,
            "no_display": "\(noDisplay)",
            "shared_directories":
                "\(sharedDirectories.map( { $0.string } ).joined(separator: ", "))",
            "mount": mount?.path ?? "none",
            "vnc_port": "\(vncPort)",
            "recovery_mode": "\(recoveryMode)",
            "storage_param": storage ?? "default", // Log the original param
            "usb_storage_devices": "\(usbMassStoragePaths?.count ?? 0)",
        ])
    do {
        // Auto-pull: if the caller passed an image reference and no matching
        // VM exists yet, pull the image before running it.
        //
        // BUGFIX: inspect the ORIGINAL `name` here, not `normalizedName`.
        // normalizeVMName has already rewritten "image:tag" to "image_tag",
        // so splitting the normalized name on ":" could never yield two
        // components and the auto-pull branch was unreachable. Likewise,
        // pullImage expects a "name:tag" ref, so it must receive `name`.
        let components = name.split(separator: ":")
        if components.count == 2 {  // Looks like image:tag
            // Attempt to validate if the VM exists first, suppressing the
            // error. This avoids pulling when the VM already exists, even if
            // the name looks like an image ref.
            let vmExists = (try? self.validateVMExists(normalizedName, storage: storage)) != nil
            if !vmExists {
                Logger.info(
                    "VM not found, attempting to pull image based on name",
                    metadata: ["imageRef": name])
                // Pull under the normalized name so the lookup below finds
                // the freshly created VM.
                try await pullImage(
                    image: name,  // Full image ref, still in "image:tag" form
                    name: normalizedName,  // VM name the rest of this method uses
                    registry: registry,
                    organization: organization,
                    storage: storage
                )
            }
        }
        // Determine effective storage path or name AND get the VMDirectory
        let effectiveStorage: String?
        let vmDir: VMDirectory
        if let storagePath = storage, storagePath.contains("/") || storagePath.contains("\\") {
            // Storage is a direct path
            vmDir = try home.getVMDirectoryFromPath(normalizedName, storagePath: storagePath)
            guard vmDir.initialized() else {
                if vmDir.exists() {
                    throw VMError.notInitialized(normalizedName)
                } else {
                    throw VMError.notFound(normalizedName)
                }
            }
            effectiveStorage = storagePath  // Use the path string
            Logger.info("Using direct storage path", metadata: ["path": storagePath])
        } else {
            // Storage is nil or a named location - validate and get the actual name
            let actualLocationName = try validateVMExists(normalizedName, storage: storage)
            vmDir = try home.getVMDirectory(normalizedName, storage: actualLocationName)
            effectiveStorage = actualLocationName  // Use the named location string
            Logger.info(
                "Using named storage location",
                metadata: [
                    "requested": storage ?? "default",
                    "actual": actualLocationName ?? "default",
                ])
        }
        // Validate parameters using the located VMDirectory
        try validateRunParameters(
            vmDir: vmDir,
            sharedDirectories: sharedDirectories,
            mount: mount,
            usbMassStoragePaths: usbMassStoragePaths
        )
        // Load the VM directly using the located VMDirectory and storage context
        let vm = try self.loadVM(vmDir: vmDir, storage: effectiveStorage)
        SharedVM.shared.setVM(name: normalizedName, vm: vm)
        try await vm.run(
            noDisplay: noDisplay,
            sharedDirectories: sharedDirectories,
            mount: mount,
            vncPort: vncPort,
            recoveryMode: recoveryMode,
            usbMassStoragePaths: usbMassStoragePaths)
        Logger.info("VM started successfully", metadata: ["name": normalizedName])
    } catch {
        SharedVM.shared.removeVM(name: normalizedName)
        Logger.error("Failed to run VM", metadata: ["error": error.localizedDescription])
        throw error
    }
}
// MARK: - Image Management

/// Resolves the download URL of the newest supported macOS IPSW.
/// - Returns: The restore-image URL reported by `DarwinImageLoader`.
/// - Throws: Rethrows any lookup failure after logging it.
@MainActor
public func getLatestIPSWURL() async throws -> URL {
    Logger.info("Fetching latest supported IPSW URL")
    do {
        let url = try await DarwinImageLoader().fetchLatestSupportedURL()
        Logger.info("Found latest IPSW URL", metadata: ["url": url.absoluteString])
        return url
    } catch {
        Logger.error(
            "Failed to fetch IPSW URL", metadata: ["error": error.localizedDescription])
        throw error
    }
}
/// Pulls a VM image from a container registry and registers it as a new VM.
///
/// Non-sparse image refs are rewritten to their "-sparse" variants before
/// download. When `name` is nil the VM is named "<base>_<tag>", with any
/// "-sparse" suffix stripped from the base. After the pull, the new VM is
/// given a fresh random MAC address.
///
/// - Throws: `ValidationError` for a malformed "name:tag" ref or invalid
///   parameters, `VMError.alreadyExists` when the target VM exists;
///   rethrows registry/pull errors.
@MainActor
public func pullImage(
image: String,
name: String?,
registry: String,
organization: String,
storage: String? = nil
) async throws {
do {
// Convert non-sparse image to sparse version if needed
var actualImage = image
var actualName = name
// Split the image to get name and tag for both sparse and non-sparse cases
let components = image.split(separator: ":")
guard components.count == 2 else {
throw ValidationError("Invalid image format. Expected format: name:tag")
}
let originalName = String(components[0])
let tag = String(components[1])
// For consistent VM naming, strip "-sparse" suffix if present when no name provided
let normalizedBaseName: String
if originalName.hasSuffix("-sparse") {
normalizedBaseName = String(originalName.dropLast(7)) // drop "-sparse"
} else {
normalizedBaseName = originalName
}
// Set default VM name if not provided
if actualName == nil {
actualName = "\(normalizedBaseName)_\(tag)"
}
// Convert non-sparse image to sparse version if needed
if !image.contains("-sparse") {
// Create sparse version of the image name
actualImage = "\(originalName)-sparse:\(tag)"
Logger.info(
"Converting to sparse image",
metadata: [
"original": image,
"sparse": actualImage,
"vm_name": actualName ?? "default",
]
)
}
// NOTE(review): actualName is always non-nil here (set above when nil),
// so the "default" fallback is effectively dead.
let vmName = actualName ?? "default" // Just use actualName as it's already normalized
Logger.info(
"Pulling image",
metadata: [
"image": actualImage,
"name": vmName,
"registry": registry,
"organization": organization,
"location": storage ?? "default",
])
try self.validatePullParameters(
image: actualImage,
name: vmName,
registry: registry,
organization: organization,
storage: storage
)
let imageContainerRegistry = ImageContainerRegistry(
registry: registry, organization: organization)
let _ = try await imageContainerRegistry.pull(
image: actualImage,
name: vmName,
locationName: storage)
Logger.info(
"Setting new VM mac address",
metadata: [
"vm_name": vmName,
"location": storage ?? "default",
])
// Update MAC address in the cloned VM to ensure uniqueness
let vm = try get(name: vmName, storage: storage)
try vm.setMacAddress(VZMACAddress.randomLocallyAdministered().string)
Logger.info(
"Image pulled successfully",
metadata: [
"image": actualImage,
"name": vmName,
"registry": registry,
"organization": organization,
"location": storage ?? "default",
])
} catch {
Logger.error("Failed to pull image", metadata: ["error": error.localizedDescription])
throw error
}
}
/// Pushes an existing VM's directory to a container registry under the given
/// image name and tags. `chunkSizeMb`, `verbose`, `dryRun`, and `reassemble`
/// are forwarded to the registry client; the VM itself is not modified.
///
/// - Throws: `ValidationError` for bad parameters, `VMError.notFound` when
///   the VM is missing; rethrows push errors.
@MainActor
public func pushImage(
name: String,
imageName: String,
tags: [String],
registry: String,
organization: String,
storage: String? = nil,
chunkSizeMb: Int = 512,
verbose: Bool = false,
dryRun: Bool = false,
reassemble: Bool = false
) async throws {
do {
Logger.info(
"Pushing VM to registry",
metadata: [
"name": name,
"imageName": imageName,
"tags": "\(tags.joined(separator: ", "))",
"registry": registry,
"organization": organization,
"location": storage ?? "default",
"chunk_size": "\(chunkSizeMb)MB",
"dry_run": "\(dryRun)",
"reassemble": "\(reassemble)",
])
try validatePushParameters(
name: name,
imageName: imageName,
tags: tags,
registry: registry,
organization: organization
)
// Find the actual location of the VM
let actualLocation = try self.validateVMExists(name, storage: storage)
// Get the VM directory
let vmDir = try home.getVMDirectory(name, storage: actualLocation)
// Use ImageContainerRegistry to push the VM
let imageContainerRegistry = ImageContainerRegistry(
registry: registry, organization: organization)
try await imageContainerRegistry.push(
vmDirPath: vmDir.dir.path,
imageName: imageName,
tags: tags,
chunkSizeMb: chunkSizeMb,
verbose: verbose,
dryRun: dryRun,
reassemble: reassemble
)
Logger.info(
"VM pushed successfully",
metadata: [
"name": name,
"imageName": imageName,
"tags": "\(tags.joined(separator: ", "))",
"registry": registry,
"organization": organization,
])
} catch {
Logger.error("Failed to push VM", metadata: ["error": error.localizedDescription])
throw error
}
}
/// Deletes every cached image under the configured cache directory's "ghcr"
/// subfolder, then recreates the empty folder. Logs and does nothing when
/// the folder is absent.
/// - Throws: Rethrows filesystem errors after logging them.
@MainActor
public func pruneImages() async throws {
    Logger.info("Pruning cached images")
    do {
        // Use configured cache directory
        let cachePath = (SettingsManager.shared.getCacheDirectory() as NSString)
            .expandingTildeInPath
        let ghcrDir = URL(fileURLWithPath: cachePath).appendingPathComponent("ghcr")
        let fileManager = FileManager.default
        guard fileManager.fileExists(atPath: ghcrDir.path) else {
            Logger.info("No cached images found")
            return
        }
        // Drop the whole subtree, then restore an empty cache folder.
        try fileManager.removeItem(at: ghcrDir)
        try fileManager.createDirectory(
            at: ghcrDir, withIntermediateDirectories: true)
        Logger.info("Successfully removed cached images")
    } catch {
        Logger.error("Failed to prune images", metadata: ["error": error.localizedDescription])
        throw error
    }
}
/// Summary of one locally cached image.
public struct ImageInfo: Codable {
/// Repository identifier the image was cached from.
public let repository: String
public let imageId: String // This will be the shortened manifest ID
}
/// Result of an image listing: locally cached images plus a placeholder
/// list for remote entries (currently always empty).
public struct ImageList: Codable {
/// Images found in the local cache.
public let local: [ImageInfo]
public let remote: [String] // Keep this for future remote registry support
}
/// Lists locally cached images for `organization`, printing a table to
/// stdout via `ImagesPrinter` as a side effect.
///
/// - Returns: An `ImageList` whose `remote` part is always empty for now;
///   image IDs are manifest IDs truncated to 12 characters.
@MainActor
public func getImages(organization: String = "trycua") async throws -> ImageList {
Logger.info("Listing local images", metadata: ["organization": organization])
let imageContainerRegistry = ImageContainerRegistry(
registry: "ghcr.io", organization: organization)
let cachedImages = try await imageContainerRegistry.getImages()
let imageInfos = cachedImages.map { image in
ImageInfo(
repository: image.repository,
imageId: String(image.manifestId.prefix(12))
)
}
ImagesPrinter.print(images: imageInfos.map { "\($0.repository):\($0.imageId)" })
return ImageList(local: imageInfos, remote: [])
}
// MARK: - Settings Management

/// Returns the current global Lume settings snapshot.
public func getSettings() -> LumeSettings {
return SettingsManager.shared.getSettings()
}
/// Persists a new home directory path in settings, then validates the home.
/// - Throws: Settings or validation errors from the underlying managers.
public func setHomeDirectory(_ path: String) throws {
// Try to set the home directory in settings
try SettingsManager.shared.setHomeDirectory(path: path)
// NOTE(review): despite the original intent ("force recreate home"), this
// only validates the existing `home` instance — confirm that is intended.
try home.validateHomeDirectory()
Logger.info("Home directory updated", metadata: ["path": path])
}
// MARK: - VM Location Management

/// Registers a new named VM storage location rooted at `path`.
public func addLocation(name: String, path: String) throws {
Logger.info("Adding VM location", metadata: ["name": name, "path": path])
try home.addLocation(name: name, path: path)
Logger.info("VM location added successfully", metadata: ["name": name])
}
/// Unregisters the named VM storage location.
public func removeLocation(name: String) throws {
Logger.info("Removing VM location", metadata: ["name": name])
try home.removeLocation(name: name)
Logger.info("VM location removed successfully", metadata: ["name": name])
}
/// Marks the named storage location as the default for new VMs.
public func setDefaultLocation(name: String) throws {
Logger.info("Setting default VM location", metadata: ["name": name])
try home.setDefaultLocation(name: name)
Logger.info("Default VM location set successfully", metadata: ["name": name])
}
/// Returns all registered VM storage locations.
public func getLocations() -> [VMLocation] {
return home.getLocations()
}
// MARK: - Cache Directory Management

/// Persists a new image-cache directory path in settings.
public func setCacheDirectory(path: String) throws {
Logger.info("Setting cache directory", metadata: ["path": path])
try SettingsManager.shared.setCacheDirectory(path: path)
Logger.info("Cache directory updated", metadata: ["path": path])
}
/// Returns the configured image-cache directory path (may contain "~").
public func getCacheDirectory() -> String {
return SettingsManager.shared.getCacheDirectory()
}
/// Reports whether image caching is currently enabled in settings.
public func isCachingEnabled() -> Bool {
return SettingsManager.shared.isCachingEnabled()
}
/// Enables or disables image caching in settings.
public func setCachingEnabled(_ enabled: Bool) throws {
Logger.info("Setting caching enabled", metadata: ["enabled": "\(enabled)"])
try SettingsManager.shared.setCachingEnabled(enabled)
Logger.info("Caching setting updated", metadata: ["enabled": "\(enabled)"])
}
// MARK: - Private Helper Methods

/// Normalizes a VM name: an "image:tag" style name becomes "image_tag";
/// anything else (zero or multiple colons) is returned unchanged.
private func normalizeVMName(name: String) -> String {
    let parts = name.split(separator: ":")
    guard parts.count == 2 else { return name }
    return "\(parts[0])_\(parts[1])"
}
/// Builds a throwaway VM in a temporary home directory with the requested
/// hardware configuration and a freshly randomized MAC address. A macOS
/// image loader is attached only for macOS guests.
@MainActor
private func createTempVMConfig(
os: String,
cpuCount: Int,
memorySize: UInt64,
diskSize: UInt64,
display: String
) async throws -> VM {
let config = try VMConfig(
os: os,
cpuCount: cpuCount,
memorySize: memorySize,
diskSize: diskSize,
macAddress: VZMACAddress.randomLocallyAdministered().string,
display: display
)
let vmDirContext = VMDirContext(
dir: try home.createTempVMDirectory(),
config: config,
home: home,
storage: nil
)
let imageLoader = os.lowercased() == "macos" ? imageLoaderFactory.createImageLoader() : nil
return try vmFactory.createVM(vmDirContext: vmDirContext, imageLoader: imageLoader)
}
/// Materializes a `VM` from an already-located directory, failing fast when
/// the directory is not an initialized VM. `storage` may be a named location
/// or a direct path and is threaded through to the VM context. A macOS
/// image loader is attached only when the config's OS is macOS.
@MainActor
private func loadVM(vmDir: VMDirectory, storage: String?) throws -> VM {
// vmDir is now passed directly
guard vmDir.initialized() else {
throw VMError.notInitialized(vmDir.name) // Use name from vmDir
}
let config: VMConfig = try vmDir.loadConfig()
// Pass the provided storage (which could be a path or named location)
let vmDirContext = VMDirContext(
dir: vmDir, config: config, home: home, storage: storage
)
let imageLoader =
config.os.lowercased() == "macos" ? imageLoaderFactory.createImageLoader() : nil
return try vmFactory.createVM(vmDirContext: vmDirContext, imageLoader: imageLoader)
}
// MARK: - Validation Methods

/// Validates inputs for `create`: macOS requires an IPSW path (the literal
/// "latest" skips the file-existence check), Linux forbids one, other OS
/// values are rejected, and the target VM directory must not already exist.
private func validateCreateParameters(
name: String, os: String, ipsw: String?, storage: String?
) throws {
if os.lowercased() == "macos" {
guard let ipsw = ipsw else {
throw ValidationError("IPSW path required for macOS VM")
}
if ipsw != "latest" && !FileManager.default.fileExists(atPath: ipsw) {
throw ValidationError("IPSW file not found")
}
} else if os.lowercased() == "linux" {
if ipsw != nil {
throw ValidationError("IPSW path not supported for Linux VM")
}
} else {
throw ValidationError("Unsupported OS type: \(os)")
}
let vmDir: VMDirectory = try home.getVMDirectory(name, storage: storage)
if vmDir.exists() {
throw VMError.alreadyExists(name)
}
}
/// Ensures every shared-directory host path exists and is a directory.
/// - Throws: `ValidationError` naming the first offending path.
private func validateSharedDirectories(_ directories: [SharedDirectory]) throws {
    for shared in directories {
        var isDir: ObjCBool = false
        let exists = FileManager.default.fileExists(
            atPath: shared.hostPath, isDirectory: &isDir)
        if !exists || !isDir.boolValue {
            throw ValidationError(
                "Host path does not exist or is not a directory: \(shared.hostPath)")
        }
    }
}
/// Confirms a VM named `name` exists and returns the location identifier it
/// was found in: the path itself for direct-path storage, the given named
/// storage, or the discovered location name when `storage` is nil.
/// - Throws: `VMError.notFound` when no initialized VM directory matches.
public func validateVMExists(_ name: String, storage: String? = nil) throws -> String? {
// If location is specified, only check that location
if let storage = storage {
// Check if storage is a path by looking for directory separator
if storage.contains("/") || storage.contains("\\") {
// Treat as direct path
let vmDir = try home.getVMDirectoryFromPath(name, storagePath: storage)
guard vmDir.initialized() else {
throw VMError.notFound(name)
}
return storage // Return the path as the location identifier
} else {
// Treat as named storage
let vmDir = try home.getVMDirectory(name, storage: storage)
guard vmDir.initialized() else {
throw VMError.notFound(name)
}
return storage
}
}
// If no location specified, try to find the VM in any location
let allVMs = try home.getAllVMDirectories()
if let foundVM = allVMs.first(where: { $0.directory.name == name }) {
// VM found, return its location
return foundVM.locationName
}
// VM not found in any location
throw VMError.notFound(name)
}
/// Validates run inputs against an already-located VM directory: shared
/// directories must exist, USB image files must exist (and are merely
/// logged as ignored before macOS 15), and disk-image mounting is rejected
/// for macOS guests / checked for existence on Linux guests.
private func validateRunParameters(
vmDir: VMDirectory, // Changed signature: accept VMDirectory
sharedDirectories: [SharedDirectory]?,
mount: Path?,
usbMassStoragePaths: [Path]? = nil
) throws {
// VM existence is confirmed by having vmDir, no need for validateVMExists
if let dirs = sharedDirectories {
try self.validateSharedDirectories(dirs)
}
// Validate USB mass storage paths
if let usbPaths = usbMassStoragePaths {
for path in usbPaths {
if !FileManager.default.fileExists(atPath: path.path) {
throw ValidationError("USB mass storage image not found: \(path.path)")
}
}
if #available(macOS 15.0, *) {
// USB mass storage is supported
} else {
Logger.info(
"USB mass storage devices require macOS 15.0 or later. They will be ignored.")
}
}
// Load config directly from vmDir
let vmConfig = try vmDir.loadConfig()
switch vmConfig.os.lowercased() {
case "macos":
if mount != nil {
throw ValidationError(
"Mounting disk images is not supported for macOS VMs. If you are looking to mount a IPSW, please use the --ipsw option in the create command."
)
}
case "linux":
if let mount = mount, !FileManager.default.fileExists(atPath: mount.path) {
throw ValidationError("Mount file not found: \(mount.path)")
}
default:
break
}
}
/// Validates inputs for `pullImage`: all identifiers must be non-empty and
/// the target VM directory must not already exist. For direct-path storage
/// the base directory is created on demand before the existence check.
private func validatePullParameters(
image: String,
name: String,
registry: String,
organization: String,
storage: String? = nil
) throws {
guard !image.isEmpty else {
throw ValidationError("Image name cannot be empty")
}
guard !name.isEmpty else {
throw ValidationError("VM name cannot be empty")
}
guard !registry.isEmpty else {
throw ValidationError("Registry cannot be empty")
}
guard !organization.isEmpty else {
throw ValidationError("Organization cannot be empty")
}
// Determine if storage is a path or a named storage location
let vmDir: VMDirectory
if let storage = storage, storage.contains("/") || storage.contains("\\") {
// Create the base directory if it doesn't exist
if !FileManager.default.fileExists(atPath: storage) {
Logger.info("Creating VM storage directory", metadata: ["path": storage])
do {
try FileManager.default.createDirectory(
atPath: storage,
withIntermediateDirectories: true
)
} catch {
throw HomeError.directoryCreationFailed(path: storage)
}
}
// Use getVMDirectoryFromPath for direct paths
vmDir = try home.getVMDirectoryFromPath(name, storagePath: storage)
} else {
// Use getVMDirectory for named storage locations
vmDir = try home.getVMDirectory(name, storage: storage)
}
if vmDir.exists() {
throw VMError.alreadyExists(name)
}
}
/// Validates inputs for `push`; the VM must already exist locally.
/// - Parameters:
///   - name: Local VM name to push.
///   - imageName: Remote image name to publish as.
///   - tags: Image tags; at least one is required.
///   - registry: Container registry host.
///   - organization: Registry organization/namespace.
/// - Throws: `ValidationError` for empty inputs, `VMError.notFound` when the VM is absent.
private func validatePushParameters(
    name: String,
    imageName: String,
    tags: [String],
    registry: String,
    organization: String
) throws {
    if name.isEmpty { throw ValidationError("VM name cannot be empty") }
    if imageName.isEmpty { throw ValidationError("Image name cannot be empty") }
    if tags.isEmpty { throw ValidationError("At least one tag must be provided.") }
    if registry.isEmpty { throw ValidationError("Registry cannot be empty") }
    if organization.isEmpty { throw ValidationError("Organization cannot be empty") }

    // Throws VMError.notFound when the VM is absent; the returned location is unused here.
    _ = try validateVMExists(name)
}
}
```
--------------------------------------------------------------------------------
/libs/python/computer/computer/providers/lumier/provider.py:
--------------------------------------------------------------------------------
```python
"""
Lumier VM provider implementation.
This provider uses Docker containers running the Lumier image to create
macOS and Linux VMs. It handles VM lifecycle operations through Docker
commands and container management.
"""
import asyncio
import json
import logging
import os
import re
import subprocess
import time
from typing import Any, Dict, List, Optional
from ..base import BaseVMProvider, VMProviderType
from ..lume_api import lume_api_get, lume_api_run, lume_api_stop, lume_api_update
# Setup logging
logger = logging.getLogger(__name__)
# Check if Docker is available
# Probe for the Docker CLI once at import time; the Lumier provider cannot
# operate without `docker` on PATH, and callers check HAS_LUMIER before use.
try:
    subprocess.run(["docker", "--version"], capture_output=True, check=True)
    HAS_LUMIER = True
except (subprocess.SubprocessError, FileNotFoundError):
    HAS_LUMIER = False
class LumierProvider(BaseVMProvider):
"""
Lumier VM Provider implementation using Docker containers.
This provider uses Docker to run Lumier containers that can create
macOS and Linux VMs through containerization.
"""
def __init__(
    self,
    provider_port: Optional[int] = 7777,
    host: str = "localhost",
    storage: Optional[str] = None,  # Can be a path or 'ephemeral'
    shared_path: Optional[str] = None,
    image: str = "macos-sequoia-cua:latest",  # VM image to use
    verbose: bool = False,
    ephemeral: bool = False,
    noVNC_port: Optional[int] = 8006,
):
    """Initialize the Lumier VM Provider.

    Args:
        provider_port: Port for the API server (default: 7777)
        host: Hostname for the API server (default: localhost)
        storage: Path for persistent VM storage
        shared_path: Path for shared folder between host and VM
        image: VM image to use (e.g. "macos-sequoia-cua:latest")
        verbose: Enable verbose logging
        ephemeral: Use ephemeral (temporary) storage
        noVNC_port: Specific port for noVNC interface (default: 8006)
    """
    self.host = host
    # Always ensure lume_port has a valid value (7777 is the default)
    self.lume_port = 7777 if provider_port is None else provider_port
    # User-specified noVNC port; run_vm publishes it as the container's 8006 mapping
    self.vnc_port = noVNC_port
    self.ephemeral = ephemeral
    # The literal string "ephemeral" is recognized by run_vm, which then skips
    # mounting a persistent /storage volume.
    if ephemeral:
        self.storage = "ephemeral"
    else:
        self.storage = storage
    self.shared_path = shared_path
    self.image = image  # Store the VM image name to use
    # The container_name attribute is assigned later (run_vm/get_vm/get_ip) from the VM name
    self.verbose = verbose
    self._container_id = None  # Docker container id, set once a container is started
    self._api_url = None  # Will be set after container starts (or defaulted in __aenter__)
@property
def provider_type(self) -> VMProviderType:
    """Return the provider type (always ``VMProviderType.LUMIER``)."""
    return VMProviderType.LUMIER
def _parse_memory(self, memory_str: str) -> int:
    """Parse a memory specification into an integer number of MB.

    Accepts ints (already MB) and strings with an optional unit suffix.
    Generalized to tolerate surrounding whitespace, fractional values, and
    terabyte units, while remaining backward compatible with the previous
    accepted forms.

    Examples:
        "8GB"    -> 8192
        "1024MB" -> 1024
        "512"    -> 512
        "1.5GB"  -> 1536
        "1TB"    -> 1048576

    Args:
        memory_str: Memory amount as an int (MB) or a string like "8GB".

    Returns:
        Memory in MB; falls back to 8192 (8GB) when the value cannot be parsed.
    """
    if isinstance(memory_str, int):
        return memory_str
    if isinstance(memory_str, str):
        # Prefix match (like the original): a number, optional fraction, then a unit.
        match = re.match(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]*)", memory_str)
        if match:
            value_text, unit = match.groups()
            # MB multiplier per unit; bare numbers are treated as MB.
            multipliers = {
                "TB": 1024 * 1024,
                "T": 1024 * 1024,
                "GB": 1024,
                "G": 1024,
                "MB": 1,
                "M": 1,
                "": 1,
            }
            factor = multipliers.get(unit.upper())
            if factor is not None:
                return int(float(value_text) * factor)
    # Default fallback for anything unrecognized (e.g. "KB", garbage input)
    logger.warning(f"Could not parse memory string '{memory_str}', using 8GB default")
    return 8192  # Default to 8GB
# Helper methods for interacting with the Lumier API through curl
# These methods handle the various VM operations via API calls
def _get_curl_error_message(self, return_code: int) -> str:
    """Get a descriptive error message for curl return codes.

    Args:
        return_code: The curl return code

    Returns:
        A descriptive error message
    """
    # Table of curl exit codes commonly seen while the Lumier API is coming up.
    known_errors = {
        7: "Failed to connect - API server is starting up",
        22: "HTTP error returned from API server",
        28: "Operation timeout - API server is slow to respond",
        52: "Empty reply from server - API is starting but not ready",
        56: "Network problem during data transfer",
    }
    return known_errors.get(return_code, f"Unknown curl error code: {return_code}")
async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
    """Get VM information by name.

    Checks the Docker container state first, then (when running) queries the
    Lumier API via ``lume_api_get`` for detailed VM status.

    Args:
        name: Name of the VM to get information for
        storage: Optional storage path override. If provided, this will be used
            instead of the provider's default storage path.

    Returns:
        Dictionary with VM information including status, IP address, etc.
    """
    if not HAS_LUMIER:
        logger.error("Docker is not available. Cannot get VM status.")
        return {"name": name, "status": "unavailable", "error": "Docker is not available"}
    # Store the current name for API requests
    self.container_name = name
    try:
        # Check if the container exists and is running.
        # NOTE(review): `docker ps --filter name=` matches substrings, so "vm1"
        # also matches "vm10" — confirm container names are unique enough.
        check_cmd = [
            "docker",
            "ps",
            "-a",
            "--filter",
            f"name={name}",
            "--format",
            "{{.Status}}",
        ]
        check_result = subprocess.run(check_cmd, capture_output=True, text=True)
        container_status = check_result.stdout.strip()
        if not container_status:
            logger.info(f"Container {name} does not exist. Will create when run_vm is called.")
            return {
                "name": name,
                "status": "not_found",
                "message": "Container doesn't exist yet",
            }
        # Container exists; a running container's status starts with "Up"
        is_running = container_status.startswith("Up")
        if not is_running:
            logger.info(
                f"Container {name} exists but is not running. Status: {container_status}"
            )
            return {
                "name": name,
                "status": "stopped",
                "container_status": container_status,
            }
        # Container is running, get the IP address and API status from Lumier API
        logger.info(f"Container {name} is running. Getting VM status from API.")
        # Use the shared lume_api_get function directly
        vm_info = lume_api_get(
            vm_name=name,
            host=self.host,
            port=self.lume_port,
            storage=storage if storage is not None else self.storage,
            debug=self.verbose,
            verbose=self.verbose,
        )
        # Check for API errors
        if "error" in vm_info:
            # Use debug level instead of warning to reduce log noise during polling
            logger.debug(f"API request error: {vm_info['error']}")
            return {
                "name": name,
                "status": "running",  # Container is running even if API is not responsive
                "api_status": "error",
                "error": vm_info["error"],
                "container_status": container_status,
            }
        # Process the VM status information
        vm_status = vm_info.get("status", "unknown")
        vnc_url = vm_info.get("vncUrl", "")
        ip_address = vm_info.get("ipAddress", "")
        # IMPORTANT: Always ensure we have a valid IP address for connectivity
        # If the API doesn't return an IP address, default to localhost (127.0.0.1)
        # This makes the behavior consistent with LumeProvider
        if not ip_address and vm_status == "running":
            ip_address = "127.0.0.1"
            logger.info(f"No IP address returned from API, defaulting to {ip_address}")
            vm_info["ipAddress"] = ip_address
        logger.info(f"VM {name} status: {vm_status}")
        if ip_address and vnc_url:
            logger.info(f"VM {name} has IP: {ip_address} and VNC URL: {vnc_url}")
        elif not ip_address and not vnc_url and vm_status != "running":
            # Not running is expected in this case
            logger.info(f"VM {name} is not running yet. Status: {vm_status}")
        else:
            # Missing IP or VNC but status is running - this is unusual but handled with default IP
            logger.warning(
                f"VM {name} is running but missing expected fields. API response: {vm_info}"
            )
        # Return the full status information; API fields are spread last, so they
        # override the normalized keys above when both are present.
        return {
            "name": name,
            "status": vm_status,
            "ip_address": ip_address,
            "vnc_url": vnc_url,
            "api_status": "ok",
            "container_status": container_status,
            **vm_info,  # Include all fields from the API response
        }
    except subprocess.SubprocessError as e:
        logger.error(f"Failed to check container status: {e}")
        return {
            "name": name,
            "status": "error",
            "error": f"Failed to check container status: {str(e)}",
        }
async def list_vms(self) -> List[Dict[str, Any]]:
    """List all VMs managed by this provider.

    For Lumier provider, there is only one VM per container.
    """
    try:
        default_vm = await self.get_vm("default")
    except Exception as e:
        logger.error(f"Failed to list VMs: {e}")
        return []
    # Hide the placeholder entry when nothing meaningful was reported.
    if default_vm.get("status") == "unknown":
        return []
    return [default_vm]
async def run_vm(
    self, image: str, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None
) -> Dict[str, Any]:
    """Run a VM with the given options.

    Removes any pre-existing container with the same name, starts a Lumier
    container via `docker run`, streams its logs in a background thread, and
    polls ``get_vm`` until the VM reports a usable IP address.

    Args:
        image: Name/tag of the image to use
        name: Name of the VM to run (used for the container name and Docker image tag)
        run_opts: Options for running the VM, including:
            - cpu: Number of CPU cores
            - memory: Amount of memory (e.g. "8GB")
            - noVNC_port: Specific port for noVNC interface
        storage: Optional storage path override (provider default is used otherwise)

    Returns:
        Dictionary with VM status information

    Raises:
        RuntimeError: If the Lumier container cannot be started.
    """
    # Set the container name using the VM name for consistency
    self.container_name = name
    try:
        # First, check if container already exists and remove it
        try:
            check_cmd = [
                "docker",
                "ps",
                "-a",
                "--filter",
                f"name={self.container_name}",
                "--format",
                "{{.ID}}",
            ]
            check_result = subprocess.run(check_cmd, capture_output=True, text=True)
            existing_container = check_result.stdout.strip()
            if existing_container:
                logger.info(f"Removing existing container: {self.container_name}")
                remove_cmd = ["docker", "rm", "-f", self.container_name]
                subprocess.run(remove_cmd, check=True)
        except subprocess.CalledProcessError as e:
            logger.warning(f"Error removing existing container: {e}")
            # Continue anyway, next steps will fail if there's a real problem
        # Prepare the Docker run command; publish the noVNC port on the host
        cmd = ["docker", "run", "-d", "--name", self.container_name]
        cmd.extend(["-p", f"{self.vnc_port}:8006"])
        logger.debug(f"Using specified noVNC_port: {self.vnc_port}")
        # Set API URL using the API port
        self._api_url = f"http://{self.host}:{self.lume_port}"
        # Parse memory setting
        memory_mb = self._parse_memory(run_opts.get("memory", "8GB"))
        # Add storage volume mount if storage is specified (for persistent VM storage)
        if self.storage and self.storage != "ephemeral":
            # Create storage directory if it doesn't exist
            storage_dir = os.path.abspath(os.path.expanduser(self.storage or ""))
            os.makedirs(storage_dir, exist_ok=True)
            # Add volume mount for storage
            cmd.extend(
                ["-v", f"{storage_dir}:/storage", "-e", f"HOST_STORAGE_PATH={storage_dir}"]
            )
            logger.debug(f"Using persistent storage at: {storage_dir}")
        # Add shared folder volume mount if shared_path is specified
        if self.shared_path:
            # Create shared directory if it doesn't exist
            shared_dir = os.path.abspath(os.path.expanduser(self.shared_path or ""))
            os.makedirs(shared_dir, exist_ok=True)
            # Add volume mount for shared folder
            cmd.extend(["-v", f"{shared_dir}:/shared", "-e", f"HOST_SHARED_PATH={shared_dir}"])
            logger.debug(f"Using shared folder at: {shared_dir}")
        # Add environment variables
        # Always use the container_name as the VM_NAME for consistency
        # Use the VM image passed from the Computer class
        logger.debug(f"Using VM image: {self.image}")
        # If ghcr.io is in the image, use the full image name
        if "ghcr.io" in self.image:
            vm_image = self.image
        else:
            vm_image = f"ghcr.io/trycua/{self.image}"
        cmd.extend(
            [
                "-e",
                f"VM_NAME={self.container_name}",
                "-e",
                f"VERSION={vm_image}",
                "-e",
                f"CPU_CORES={run_opts.get('cpu', '4')}",
                "-e",
                f"RAM_SIZE={memory_mb}",
            ]
        )
        # Specify the Lumier image with the full image name
        lumier_image = "trycua/lumier:latest"
        # First check if the image exists locally
        try:
            logger.debug(f"Checking if Docker image {lumier_image} exists locally...")
            check_image_cmd = ["docker", "image", "inspect", lumier_image]
            subprocess.run(check_image_cmd, capture_output=True, check=True)
            logger.debug(f"Docker image {lumier_image} found locally.")
        except subprocess.CalledProcessError:
            # Image doesn't exist locally
            logger.warning(f"\nWARNING: Docker image {lumier_image} not found locally.")
            logger.warning(
                "The system will attempt to pull it from Docker Hub, which may fail if you have network connectivity issues."
            )
            logger.warning(
                "If the Docker pull fails, you may need to manually pull the image first with:"
            )
            logger.warning(f"  docker pull {lumier_image}\n")
        # Add the image to the command
        cmd.append(lumier_image)
        # Print the Docker command for debugging
        logger.debug(f"DOCKER COMMAND: {' '.join(cmd)}")
        # Run the container with improved error handling
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            if (
                "no route to host" in str(e.stderr).lower()
                or "failed to resolve reference" in str(e.stderr).lower()
            ):
                error_msg = (
                    f"Network error while trying to pull Docker image '{lumier_image}'\n"
                    f"Error: {e.stderr}\n\n"
                    f"SOLUTION: Please try one of the following:\n"
                    f"1. Check your internet connection\n"
                    f"2. Pull the image manually with: docker pull {lumier_image}\n"
                    f"3. Check if Docker is running properly\n"
                )
                logger.error(error_msg)
                raise RuntimeError(error_msg)
            raise
        # Container started, now check VM status with polling
        logger.debug("Container started, checking VM status...")
        logger.debug(
            "NOTE: This may take some time while the VM image is being pulled and initialized"
        )
        # Start a background thread to show container logs in real-time
        import threading

        def show_container_logs():
            # Give the container a moment to start generating logs
            time.sleep(1)
            logger.debug(f"\n---- CONTAINER LOGS FOR '{name}' (LIVE) ----")
            logger.debug(
                "Showing logs as they are generated. Press Ctrl+C to stop viewing logs...\n"
            )
            try:
                # Use docker logs with follow option
                log_cmd = ["docker", "logs", "--tail", "30", "--follow", name]
                process = subprocess.Popen(
                    log_cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    bufsize=1,
                    universal_newlines=True,
                )
                # BUG FIX: logging.Logger.debug() does not accept print()'s `end`
                # keyword; the previous `logger.debug(line, end="")` raised
                # TypeError on the first streamed line. Strip the newline instead.
                for line in process.stdout:
                    logger.debug(line.rstrip("\n"))
                    # Break if process has exited
                    if process.poll() is not None:
                        break
            except Exception as e:
                logger.error(f"\nError showing container logs: {e}")
                if self.verbose:
                    logger.error(f"Error in log streaming thread: {e}")
            finally:
                logger.debug("\n---- LOG STREAMING ENDED ----")
                # Make sure process is terminated
                if "process" in locals() and process.poll() is None:
                    process.terminate()

        # Stream logs in a daemon thread (exits with the main program).
        # NOTE(review): streaming happens regardless of self.verbose even though
        # an earlier comment implied verbose-only — confirm intended behavior.
        log_thread = threading.Thread(target=show_container_logs)
        log_thread.daemon = True
        log_thread.start()
        # Skip waiting for container readiness and just poll get_vm directly
        # Poll the get_vm method indefinitely until the VM is ready with an IP address
        attempt = 0
        consecutive_errors = 0
        vm_running = False
        while True:  # Wait indefinitely
            try:
                # Use longer delays to give the system time to initialize
                if attempt > 0:
                    # Start with 5s delay, then increase gradually up to 30s for later attempts
                    # But use shorter delays while we're getting API errors
                    if consecutive_errors > 0 and consecutive_errors < 5:
                        wait_time = 3  # Use shorter delays when we're getting API errors
                    else:
                        wait_time = min(30, 5 + (attempt * 2))
                    logger.debug(f"Waiting {wait_time}s before retry #{attempt+1}...")
                    await asyncio.sleep(wait_time)
                # Try to get VM status
                logger.debug(f"Checking VM status (attempt {attempt+1})...")
                vm_status = await self.get_vm(name)
                # Check for API errors
                if "error" in vm_status:
                    consecutive_errors += 1
                    error_msg = vm_status.get("error", "Unknown error")
                    # Only print a user-friendly status message, not the raw error
                    # since _lume_api_get already logged the technical details
                    if consecutive_errors == 1 or attempt % 5 == 0:
                        if "Empty reply from server" in error_msg:
                            logger.info(
                                "API server is starting up - container is running, but API isn't fully initialized yet."
                            )
                            logger.info(
                                "This is expected during the initial VM setup - will continue polling..."
                            )
                        else:
                            # Don't repeat the exact same error message each time
                            logger.warning(
                                f"API request error (attempt {attempt+1}): {error_msg}"
                            )
                            # Just log that we're still working on it
                            if attempt > 3:
                                logger.debug(
                                    "Still waiting for the API server to become available..."
                                )
                    # If we're getting errors but container is running, that's normal during startup
                    if vm_status.get("status") == "running":
                        if not vm_running:
                            logger.info(
                                "Container is running, waiting for the VM within it to become fully ready..."
                            )
                            logger.info("This might take a minute while the VM initializes...")
                            vm_running = True
                    # Increase counter and continue
                    attempt += 1
                    continue
                # Reset consecutive error counter when we get a successful response
                consecutive_errors = 0
                # If the VM is running, check if it has an IP address (which means it's fully ready)
                if vm_status.get("status") == "running":
                    vm_running = True
                    # Check if we have an IP address, which means the VM is fully ready
                    if "ip_address" in vm_status and vm_status["ip_address"]:
                        logger.info(
                            f"VM is now fully running with IP: {vm_status.get('ip_address')}"
                        )
                        if "vnc_url" in vm_status and vm_status["vnc_url"]:
                            logger.info(f"VNC URL: {vm_status.get('vnc_url')}")
                        return vm_status
                    else:
                        logger.debug(
                            "VM is running but still initializing network interfaces..."
                        )
                        logger.debug("Waiting for IP address to be assigned...")
                else:
                    # VM exists but might still be starting up
                    status = vm_status.get("status", "unknown")
                    logger.debug(f"VM found but status is: {status}. Continuing to poll...")
                # Increase counter for next iteration's delay calculation
                attempt += 1
                # If we reach a very large number of attempts, give a reassuring message but continue
                if attempt % 10 == 0:
                    logger.debug(
                        f"Still waiting after {attempt} attempts. This might take several minutes for first-time setup."
                    )
                    if not vm_running and attempt >= 20:
                        logger.warning(
                            "\nNOTE: First-time VM initialization can be slow as images are downloaded."
                        )
                        logger.warning(
                            "If this continues for more than 10 minutes, you may want to check:"
                        )
                        logger.warning("  1. Docker logs with: docker logs " + name)
                        logger.warning("  2. If your network can access container registries")
                        logger.warning("Press Ctrl+C to abort if needed.\n")
                # After 150 attempts (likely over 30-40 minutes), return current status
                if attempt >= 150:
                    logger.debug(
                        f"Reached 150 polling attempts. VM status is: {vm_status.get('status', 'unknown')}"
                    )
                    logger.debug(
                        "Returning current VM status, but please check Docker logs if there are issues."
                    )
                    return vm_status
            except Exception as e:
                # Always continue retrying, but with increasing delays
                logger.warning(
                    f"Error checking VM status (attempt {attempt+1}): {e}. Will retry."
                )
                consecutive_errors += 1
                # If we've had too many consecutive errors, might be a deeper problem
                if consecutive_errors >= 10:
                    logger.warning(
                        f"\nWARNING: Encountered {consecutive_errors} consecutive errors while checking VM status."
                    )
                    logger.warning(
                        "You may need to check the Docker container logs or restart the process."
                    )
                    logger.warning(f"Error details: {str(e)}\n")
                # Increase attempt counter for next iteration
                attempt += 1
                # After many consecutive errors, add a delay to avoid hammering the system
                if attempt > 5:
                    error_delay = min(30, 10 + attempt)
                    logger.warning(
                        f"Multiple connection errors, waiting {error_delay}s before next attempt..."
                    )
                    await asyncio.sleep(error_delay)
    except subprocess.CalledProcessError as e:
        error_msg = (
            f"Failed to start Lumier container: {e.stderr if hasattr(e, 'stderr') else str(e)}"
        )
        logger.error(error_msg)
        raise RuntimeError(error_msg)
async def _wait_for_container_ready(self, container_name: str, timeout: int = 90) -> bool:
    """Wait for the Lumier container to be fully ready with a valid API response.

    Polls `docker ps` for the container, then probes the API's /health endpoint
    (falling back to the VM status endpoint) until ready or timed out.

    Args:
        container_name: Name of the Docker container to check
        timeout: Maximum time to wait in seconds (default: 90 seconds)

    Returns:
        True if the container is running, even if API is not fully ready.
        This allows operations to continue with appropriate fallbacks.
    """
    start_time = time.time()
    api_ready = False
    container_running = False
    logger.debug(f"Waiting for container {container_name} to be ready (timeout: {timeout}s)...")
    while time.time() - start_time < timeout:
        # Check if container is running
        try:
            check_cmd = [
                "docker",
                "ps",
                "--filter",
                f"name={container_name}",
                "--format",
                "{{.Status}}",
            ]
            result = subprocess.run(check_cmd, capture_output=True, text=True, check=True)
            container_status = result.stdout.strip()
            if container_status and container_status.startswith("Up"):
                container_running = True
                logger.info(
                    f"Container {container_name} is running with status: {container_status}"
                )
            else:
                logger.warning(
                    f"Container {container_name} not yet running, status: {container_status}"
                )
                # container is not running yet, wait and try again
                await asyncio.sleep(2)  # Longer sleep to give Docker time
                continue
        except subprocess.CalledProcessError as e:
            logger.warning(f"Error checking container status: {e}")
            await asyncio.sleep(2)
            continue
        # Container is running, check if API is responsive
        try:
            # First check the health endpoint
            api_url = f"http://{self.host}:{self.lume_port}/health"
            logger.info(f"Checking API health at: {api_url}")
            # Use longer timeout for API health check since it may still be initializing
            curl_cmd = ["curl", "-s", "--connect-timeout", "5", "--max-time", "10", api_url]
            result = subprocess.run(curl_cmd, capture_output=True, text=True)
            if result.returncode == 0 and "ok" in result.stdout.lower():
                api_ready = True
                logger.info(f"API is ready at {api_url}")
                break
            else:
                # Health endpoint failed; fall back to the VM status endpoint. This
                # covers deployments where /health isn't implemented but the VM API works.
                vm_api_url = f"http://{self.host}:{self.lume_port}/lume/vms/{container_name}"
                if self.storage:
                    import urllib.parse

                    encoded_storage = urllib.parse.quote_plus(self.storage)
                    vm_api_url += f"?storage={encoded_storage}"
                curl_vm_cmd = [
                    "curl",
                    "-s",
                    "--connect-timeout",
                    "5",
                    "--max-time",
                    "10",
                    vm_api_url,
                ]
                vm_result = subprocess.run(curl_vm_cmd, capture_output=True, text=True)
                if vm_result.returncode == 0 and vm_result.stdout.strip():
                    # VM API responded with something - consider the API ready
                    api_ready = True
                    logger.info(f"VM API is ready at {vm_api_url}")
                    break
                else:
                    # Prefer the health-check failure code; fall back to the VM call's code
                    curl_code = result.returncode
                    if curl_code == 0:
                        curl_code = vm_result.returncode
                    # CONSISTENCY FIX: reuse the shared curl-error translation helper
                    # instead of duplicating the error-code mapping inline.
                    curl_error = self._get_curl_error_message(curl_code)
                    logger.info(f"API not ready yet: {curl_error}")
        except subprocess.SubprocessError as e:
            logger.warning(f"Error checking API status: {e}")
        # If the container is running but API is not ready, that's OK - we'll just wait
        # a bit longer before checking again, as the container may still be initializing
        elapsed_seconds = time.time() - start_time
        if (
            int(elapsed_seconds) % 5 == 0
        ):  # Only print status every 5 seconds to reduce verbosity
            logger.debug(
                f"Waiting for API to initialize... ({elapsed_seconds:.1f}s / {timeout}s)"
            )
        await asyncio.sleep(3)  # Longer sleep between API checks
    # Handle timeout - if the container is running but API is not ready, that's not
    # necessarily an error - the API might just need more time to start up
    if not container_running:
        logger.warning(f"Timed out waiting for container {container_name} to start")
        return False
    if not api_ready:
        logger.warning(
            f"Container {container_name} is running, but API is not fully ready yet."
        )
        logger.warning(
            "NOTE: You may see some 'API request failed' messages while the API initializes."
        )
    # Return True if container is running, even if API isn't ready yet
    # This allows VM operations to proceed, with appropriate retries for API calls
    return container_running
async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
    """Stop a running VM by stopping the Lumier container.

    Args:
        name: Name of the VM/container to stop.
        storage: Unused by Lumier; accepted for interface compatibility.

    Returns:
        Dictionary with the VM name, resulting status, and container id when known.

    Raises:
        RuntimeError: If `docker stop` fails.
    """
    # FIX: derive container_name from the argument, matching get_vm/get_ip.
    # Previously this method assumed run_vm had already set the attribute and
    # raised AttributeError when stop_vm was called on a fresh provider instance.
    self.container_name = name
    try:
        # Use Docker commands to stop the container directly
        if self._container_id:
            logger.info(f"Stopping Lumier container: {self.container_name}")
            cmd = ["docker", "stop", self.container_name]
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            logger.info(f"Container stopped: {result.stdout.strip()}")
            # Return minimal status info
            return {
                "name": name,
                "status": "stopped",
                "container_id": self._container_id,
            }
        else:
            # We never recorded a container id; look the container up by name
            check_cmd = [
                "docker",
                "ps",
                "-a",
                "--filter",
                f"name={self.container_name}",
                "--format",
                "{{.ID}}",
            ]
            check_result = subprocess.run(check_cmd, capture_output=True, text=True)
            container_id = check_result.stdout.strip()
            if container_id:
                logger.info(f"Found container ID: {container_id}")
                cmd = ["docker", "stop", self.container_name]
                result = subprocess.run(cmd, capture_output=True, text=True, check=True)
                logger.info(f"Container stopped: {result.stdout.strip()}")
                return {
                    "name": name,
                    "status": "stopped",
                    "container_id": container_id,
                }
            else:
                logger.warning(f"No container found with name {self.container_name}")
                return {
                    "name": name,
                    "status": "unknown",
                }
    except subprocess.CalledProcessError as e:
        error_msg = f"Failed to stop container: {e.stderr if hasattr(e, 'stderr') else str(e)}"
        logger.error(error_msg)
        raise RuntimeError(f"Failed to stop Lumier container: {error_msg}")
# update_vm is not implemented as it's not needed for Lumier
# The BaseVMProvider requires it, so we provide a minimal implementation
async def update_vm(
    self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None
) -> Dict[str, Any]:
    """Not implemented for Lumier provider.

    Required by BaseVMProvider; Lumier containers are reconfigured by
    recreating them, so this logs a warning and reports "unchanged".
    """
    logger.warning("update_vm is not implemented for Lumier provider")
    return {"name": name, "status": "unchanged"}
async def get_logs(
    self, name: str, num_lines: int = 100, follow: bool = False, timeout: Optional[int] = None
) -> str:
    """Get the logs from the Lumier container.

    Args:
        name: Name of the VM/container to get logs for
        num_lines: Number of recent log lines to return (default: 100)
        follow: If True, follow the logs (stream new logs as they are generated)
        timeout: Optional timeout in seconds for follow mode (None means no timeout)

    Returns:
        Container logs as a string

    Note:
        If follow=True, this function will continuously stream logs until timeout
        or until interrupted. The output will be printed to console in real-time.
    """
    if not HAS_LUMIER:
        error_msg = "Docker is not available. Cannot get container logs."
        logger.error(error_msg)
        return error_msg
    # Make sure we have a container name
    container_name = name
    # Check if the container exists and is running
    try:
        # Check if the container exists
        inspect_cmd = ["docker", "container", "inspect", container_name]
        result = subprocess.run(inspect_cmd, capture_output=True, text=True)
        if result.returncode != 0:
            error_msg = f"Container '{container_name}' does not exist or is not accessible"
            logger.error(error_msg)
            return error_msg
    except Exception as e:
        error_msg = f"Error checking container status: {str(e)}"
        logger.error(error_msg)
        return error_msg
    # Base docker logs command
    log_cmd = ["docker", "logs"]
    # Add tail parameter to limit the number of lines
    log_cmd.extend(["--tail", str(num_lines)])
    # Handle follow mode with or without timeout
    if follow:
        log_cmd.append("--follow")
        if timeout is not None:
            # For follow mode with timeout, we'll run the command and handle the timeout
            log_cmd.append(container_name)
            logger.info(
                f"Following logs for container '{container_name}' with timeout {timeout}s"
            )
            logger.info(f"\n---- CONTAINER LOGS FOR '{container_name}' (LIVE) ----")
            logger.info("Press Ctrl+C to stop following logs\n")
            try:
                # Run with timeout; output goes straight to the console (no capture)
                process = subprocess.Popen(log_cmd, text=True)
                # Wait for the specified timeout
                if timeout:
                    try:
                        process.wait(timeout=timeout)
                    except subprocess.TimeoutExpired:
                        process.terminate()  # Stop after timeout
                        logger.info(
                            f"\n---- LOG FOLLOWING STOPPED (timeout {timeout}s reached) ----"
                        )
                else:
                    # NOTE(review): reached only when timeout == 0 (falsy but not
                    # None); waits indefinitely in that case — confirm intent.
                    process.wait()
                return "Logs were displayed to console in follow mode"
            except KeyboardInterrupt:
                process.terminate()
                logger.info("\n---- LOG FOLLOWING STOPPED (user interrupted) ----")
                return "Logs were displayed to console in follow mode (interrupted)"
        else:
            # For follow mode without timeout, we'll print a helpful message
            log_cmd.append(container_name)
            logger.info(f"Following logs for container '{container_name}' indefinitely")
            logger.info(f"\n---- CONTAINER LOGS FOR '{container_name}' (LIVE) ----")
            logger.info("Press Ctrl+C to stop following logs\n")
            try:
                # Run the command and let it run until interrupted
                process = subprocess.Popen(log_cmd, text=True)
                process.wait()  # Wait indefinitely (until user interrupts)
                return "Logs were displayed to console in follow mode"
            except KeyboardInterrupt:
                process.terminate()
                logger.info("\n---- LOG FOLLOWING STOPPED (user interrupted) ----")
                return "Logs were displayed to console in follow mode (interrupted)"
    else:
        # For non-follow mode, capture and return the logs as a string
        log_cmd.append(container_name)
        logger.info(f"Getting {num_lines} log lines for container '{container_name}'")
        try:
            result = subprocess.run(log_cmd, capture_output=True, text=True, check=True)
            logs = result.stdout
            # Only print header and logs if there's content
            if logs.strip():
                logger.info(
                    f"\n---- CONTAINER LOGS FOR '{container_name}' (LAST {num_lines} LINES) ----\n"
                )
                logger.info(logs)
                logger.info("\n---- END OF LOGS ----")
            else:
                logger.info(f"\nNo logs available for container '{container_name}'")
            return logs
        except subprocess.CalledProcessError as e:
            error_msg = f"Error getting logs: {e.stderr}"
            logger.error(error_msg)
            return error_msg
        except Exception as e:
            error_msg = f"Unexpected error getting logs: {str(e)}"
            logger.error(error_msg)
            return error_msg
async def restart_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
    """Not supported for Lumier; stop and re-run the VM instead."""
    raise NotImplementedError("LumierProvider does not support restarting VMs.")
async def get_ip(self, name: str, storage: Optional[str] = None, retry_delay: int = 2) -> str:
    """Get the IP address of a VM, waiting indefinitely until it's available.

    Args:
        name: Name of the VM to get the IP for
        storage: Optional storage path override
        retry_delay: Delay between retries in seconds (default: 2)

    Returns:
        IP address of the VM when it becomes available
    """
    # Use container_name = name for consistency
    self.container_name = name
    # Track total attempts for logging purposes
    total_attempts = 0
    # Loop indefinitely until we get a valid IP
    while True:
        total_attempts += 1
        # Log retry message but not on first attempt
        if total_attempts > 1:
            logger.info(f"Waiting for VM {name} IP address (attempt {total_attempts})...")
        try:
            # Get VM information
            vm_info = await self.get_vm(name, storage=storage)
            # Check if we got a valid IP ("unknown" and 0.0.0.0-prefixed values don't count)
            ip = vm_info.get("ip_address", None)
            if ip and ip != "unknown" and not ip.startswith("0.0.0.0"):
                logger.info(f"Got valid VM IP address: {ip}")
                return ip
            # Check the VM status
            status = vm_info.get("status", "unknown")
            # Special handling for Lumier: it may report "stopped" even when the VM is starting
            # If the VM information contains an IP but status is stopped, it might be a race condition
            if status == "stopped" and "ip_address" in vm_info:
                ip = vm_info.get("ip_address")
                if ip and ip != "unknown" and not ip.startswith("0.0.0.0"):
                    logger.info(f"Found valid IP {ip} despite VM status being {status}")
                    return ip
                logger.info(f"VM status is {status}, but still waiting for IP to be assigned")
            # If VM is not running yet, log and wait
            elif status != "running":
                logger.info(f"VM is not running yet (status: {status}). Waiting...")
            # If VM is running but no IP yet, wait and retry
            else:
                logger.info("VM is running but no valid IP address yet. Waiting...")
        except Exception as e:
            # Keep polling even on transient errors (e.g. API not up yet)
            logger.warning(f"Error getting VM {name} IP: {e}, continuing to wait...")
        # Wait before next retry
        await asyncio.sleep(retry_delay)
        # Add progress log every 10 attempts
        if total_attempts % 10 == 0:
            logger.info(f"Still waiting for VM {name} IP after {total_attempts} attempts...")
async def __aenter__(self):
    """Async context manager entry.

    This method is called when entering an async context manager block.
    Returns self to be used in the context.
    """
    logger.debug("Entering LumierProvider context")
    # Provide a usable API URL even before run_vm assigns one, so get_vm works.
    if not getattr(self, "_api_url", None):
        self._api_url = f"http://{self.host}:{self.lume_port}"
        logger.info(f"Initialized default Lumier API URL: {self._api_url}")
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit.

    Stops any running Lumier container to clean up resources.

    Args:
        exc_type: Exception type raised inside the context, or None.
        exc_val: Exception instance, or None.
        exc_tb: Traceback, or None.

    Returns:
        False, so any exception raised inside the context propagates.
    """
    logger.debug(f"Exiting LumierProvider context, handling exceptions: {exc_type}")
    try:
        # If we have a container ID, we should stop it to clean up resources
        if hasattr(self, "_container_id") and self._container_id:
            logger.info(f"Stopping Lumier container on context exit: {self.container_name}")
            try:
                cmd = ["docker", "stop", self.container_name]
                subprocess.run(cmd, capture_output=True, text=True, check=True)
                logger.info(f"Container stopped during context exit: {self.container_name}")
            except subprocess.CalledProcessError as e:
                logger.warning(f"Failed to stop container during cleanup: {e.stderr}")
                # Don't raise an exception here, we want to continue with cleanup
    except Exception as e:
        logger.error(f"Error during LumierProvider cleanup: {e}")
        # Re-raise the cleanup error only when the context exited cleanly;
        # otherwise swallow it so the original exception is not masked.
        if exc_type is None:
            raise
    # Return False to indicate that any exception should propagate
    return False
```