#
tokens: 49025/50000 5/616 files (page 17/20)
lines: off (toggle) GitHub
raw markdown copy
This is page 17 of 20. Use http://codebase.md/trycua/cua?page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── bump-version.yml
│       ├── ci-lume.yml
│       ├── docker-publish-cua-linux.yml
│       ├── docker-publish-cua-windows.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── link-check.yml
│       ├── lint.yml
│       ├── npm-publish-cli.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       ├── python-tests.yml
│       ├── test-cua-models.yml
│       └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│   ├── docs.code-workspace
│   ├── extensions.json
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── cloud-windows-ga-macos-preview.md
│   ├── composite-agents.md
│   ├── computer-use-agents-for-growth-hacking.md
│   ├── cua-hackathon.md
│   ├── cua-playground-preview.md
│   ├── cua-vlm-router.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cli.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── neurips-2025-cua-papers.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .env.example
│   ├── .gitignore
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── observability.mdx
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── cua-vlm-router.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   ├── telemetry.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── cli-playbook
│   │       │   ├── commands.mdx
│   │       │   ├── index.mdx
│   │       │   └── meta.json
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── meta.json
│   │       │   ├── sandboxed-python.mdx
│   │       │   └── tracing-api.mdx
│   │       ├── example-usecases
│   │       │   ├── form-filling.mdx
│   │       │   ├── gemini-complex-ui-navigation.mdx
│   │       │   ├── meta.json
│   │       │   ├── post-event-contact-export.mdx
│   │       │   └── windows-app-behind-vpn.mdx
│   │       ├── get-started
│   │       │   ├── meta.json
│   │       │   └── quickstart.mdx
│   │       ├── index.mdx
│   │       ├── macos-vm-cli-playbook
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   └── meta.json
│   │       └── meta.json
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── bg-dark.jpg
│   │       ├── bg-light.jpg
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── grounding-with-gemini3.gif
│   │       ├── hero.png
│   │       ├── laminar_trace_example.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   ├── posthog
│   │   │   │   │   └── [...path]
│   │   │   │   │       └── route.ts
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   ├── llms.txt
│   │   │   │   └── route.ts
│   │   │   ├── robots.ts
│   │   │   └── sitemap.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── analytics-tracker.tsx
│   │   │   ├── cookie-consent.tsx
│   │   │   ├── doc-actions-menu.tsx
│   │   │   ├── editable-code-block.tsx
│   │   │   ├── footer.tsx
│   │   │   ├── hero.tsx
│   │   │   ├── iou.tsx
│   │   │   ├── mermaid.tsx
│   │   │   └── page-feedback.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   ├── mdx-components.tsx
│   │   └── providers
│   │       └── posthog-provider.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── browser_tool_example.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── tracing_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── cua_adapter.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gelato.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── generic_vlm.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   ├── uiins.py
│   │   │   │   │   ├── uitars.py
│   │   │   │   │   └── uitars2.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── tools
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   └── browser_tool.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_computer_agent.py
│   │   ├── bench-ui
│   │   │   ├── bench_ui
│   │   │   │   ├── __init__.py
│   │   │   │   ├── api.py
│   │   │   │   └── child.py
│   │   │   ├── examples
│   │   │   │   ├── folder_example.py
│   │   │   │   ├── gui
│   │   │   │   │   ├── index.html
│   │   │   │   │   ├── logo.svg
│   │   │   │   │   └── styles.css
│   │   │   │   ├── output_overlay.png
│   │   │   │   └── simple_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       └── test_port_detection.py
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── tracing_wrapper.py
│   │   │   │   ├── tracing.py
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_computer.py
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── browser.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   ├── utils
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   └── wallpaper.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   ├── test_connection.py
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_server.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_telemetry.py
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── build-extension.py
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── desktop-extension
│   │   │   │   ├── cua-extension.mcpb
│   │   │   │   ├── desktop_extension.png
│   │   │   │   ├── manifest.json
│   │   │   │   ├── README.md
│   │   │   │   ├── requirements.txt
│   │   │   │   ├── run_server.sh
│   │   │   │   └── setup.py
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── QUICK_TEST_COMMANDS.sh
│   │   │   ├── quick_test_local_option.py
│   │   │   ├── README.md
│   │   │   ├── scripts
│   │   │   │   ├── install_mcp_server.sh
│   │   │   │   └── start_mcp_server.sh
│   │   │   ├── test_mcp_server_local_option.py
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_mcp_server.py
│   │   ├── pylume
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_pylume.py
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           ├── conftest.py
│   │           └── test_omniparser.py
│   ├── qemu-docker
│   │   ├── linux
│   │   │   ├── Dockerfile
│   │   │   ├── README.md
│   │   │   └── src
│   │   │       ├── entry.sh
│   │   │       └── vm
│   │   │           ├── image
│   │   │           │   └── README.md
│   │   │           └── setup
│   │   │               ├── install.sh
│   │   │               ├── setup-cua-server.sh
│   │   │               └── setup.sh
│   │   ├── README.md
│   │   └── windows
│   │       ├── Dockerfile
│   │       ├── README.md
│   │       └── src
│   │           ├── entry.sh
│   │           └── vm
│   │               ├── image
│   │               │   └── README.md
│   │               └── setup
│   │                   ├── install.bat
│   │                   ├── on-logon.ps1
│   │                   ├── setup-cua-server.ps1
│   │                   ├── setup-utils.psm1
│   │                   └── setup.ps1
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── cua-cli
│   │   │   ├── .gitignore
│   │   │   ├── .prettierrc
│   │   │   ├── bun.lock
│   │   │   ├── CLAUDE.md
│   │   │   ├── index.ts
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── auth.ts
│   │   │   │   ├── cli.ts
│   │   │   │   ├── commands
│   │   │   │   │   ├── auth.ts
│   │   │   │   │   └── sandbox.ts
│   │   │   │   ├── config.ts
│   │   │   │   ├── http.ts
│   │   │   │   ├── storage.ts
│   │   │   │   └── util.ts
│   │   │   └── tsconfig.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Development.md
│       ├── Dockerfile
│       ├── Dockerfile.dev
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│   ├── install-cli.ps1
│   ├── install-cli.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   ├── run-docker-dev.sh
│   └── typescript-typecheck.js
├── TESTING.md
├── tests
│   ├── agent_loop_testing
│   │   ├── agent_test.py
│   │   └── README.md
│   ├── pytest.ini
│   ├── shell_cmd.py
│   ├── test_files.py
│   ├── test_mcp_server_session_management.py
│   ├── test_mcp_server_streaming.py
│   ├── test_shell_bash.py
│   ├── test_telemetry.py
│   ├── test_tracing.py
│   ├── test_venv.py
│   └── test_watchdog.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/glm45v.py:
--------------------------------------------------------------------------------

```python
"""
GLM-4.5V agent loop implementation using liteLLM for GLM-4.5V model.
Supports vision-language models for computer control with bounding box parsing.
"""

import asyncio
import base64
import json
import re
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple

import litellm
from litellm.responses.litellm_completion_transformation.transformation import (
    LiteLLMCompletionResponsesConfig,
)
from litellm.types.utils import ModelResponse
from PIL import Image

from ..decorators import register_agent
from ..loops.base import AsyncAgentConfig
from ..responses import (
    convert_completion_messages_to_responses_items,
    convert_responses_items_to_completion_messages,
    make_click_item,
    make_double_click_item,
    make_drag_item,
    make_input_image_item,
    make_keypress_item,
    make_output_text_item,
    make_reasoning_item,
    make_scroll_item,
    make_type_item,
    make_wait_item,
)
from ..types import AgentCapability, AgentResponse, Messages, Tools

# GLM-4.5V specific constants
#
# Prompt fragment describing the action space exposed to GLM-4.5V. Each entry
# pairs a call-rule line with a pseudo-JSON schema the model is trained on.
# All coordinates the model emits are normalized to the 0-999 range; they are
# rescaled to actual pixels after parsing (see
# convert_glm_completion_to_responses_items). Do not reformat this string:
# the exact text is part of the model-facing prompt.
GLM_ACTION_SPACE = """
### {left,right,middle}_click

Call rule: `{left,right,middle}_click(start_box='[x,y]', element_info='')`
{
    'name': ['left_click', 'right_click', 'middle_click'],
    'description': 'Perform a left/right/middle mouse click at the specified coordinates on the screen.',
    'parameters': {
        'type': 'object',
        'properties': {
            'start_box': {
                'type': 'array',
                'items': {
                    'type': 'integer'
                },
                'description': 'Coordinates [x,y] where to perform the click, normalized to 0-999 range.'
            },
            'element_info': {
                'type': 'string',
                'description': 'Optional text description of the UI element being clicked.'
            }
        },
        'required': ['start_box']
    }
}

### hover

Call rule: `hover(start_box='[x,y]', element_info='')`
{
    'name': 'hover',
    'description': 'Move the mouse pointer to the specified coordinates without performing any click action.',
    'parameters': {
        'type': 'object',
        'properties': {
            'start_box': {
                'type': 'array',
                'items': {
                    'type': 'integer'
                },
                'description': 'Coordinates [x,y] where to move the mouse pointer, normalized to 0-999 range.'
            },
            'element_info': {
                'type': 'string',
                'description': 'Optional text description of the UI element being hovered over.'
            }
        },
        'required': ['start_box']
    }
}

### left_double_click

Call rule: `left_double_click(start_box='[x,y]', element_info='')`
{
    'name': 'left_double_click',
    'description': 'Perform a left mouse double-click at the specified coordinates on the screen.',
    'parameters': {
        'type': 'object',
        'properties': {
            'start_box': {
                'type': 'array',
                'items': {
                    'type': 'integer'
                },
                'description': 'Coordinates [x,y] where to perform the double-click, normalized to 0-999 range.'
            },
            'element_info': {
                'type': 'string',
                'description': 'Optional text description of the UI element being double-clicked.'
            }
        },
        'required': ['start_box']
    }
}

### left_drag

Call rule: `left_drag(start_box='[x1,y1]', end_box='[x2,y2]', element_info='')`
{
    'name': 'left_drag',
    'description': 'Drag the mouse from starting coordinates to ending coordinates while holding the left mouse button.',
    'parameters': {
        'type': 'object',
        'properties': {
            'start_box': {
                'type': 'array',
                'items': {
                    'type': 'integer'
                },
                'description': 'Starting coordinates [x1,y1] for the drag operation, normalized to 0-999 range.'
            },
            'end_box': {
                'type': 'array',
                'items': {
                    'type': 'integer'
                },
                'description': 'Ending coordinates [x2,y2] for the drag operation, normalized to 0-999 range.'
            },
            'element_info': {
                'type': 'string',
                'description': 'Optional text description of the UI element being dragged.'
            }
        },
        'required': ['start_box', 'end_box']
    }
}

### key

Call rule: `key(keys='')`
{
    'name': 'key',
    'description': 'Simulate pressing a single key or combination of keys on the keyboard.',
    'parameters': {
        'type': 'object',
        'properties': {
            'keys': {
                'type': 'string',
                'description': 'The key or key combination to press. Use '+' to separate keys in combinations (e.g., 'ctrl+c', 'alt+tab').'
            }
        },
        'required': ['keys']
    }
}

### type

Call rule: `type(content='')`
{
    'name': 'type',
    'description': 'Type text content into the currently focused text input field. This action only performs typing and does not handle field activation or clearing.',
    'parameters': {
        'type': 'object',
        'properties': {
            'content': {
                'type': 'string',
                'description': 'The text content to be typed into the active text field.'
            }
        },
        'required': ['content']
    }
}

### scroll

Call rule: `scroll(start_box='[x,y]', direction='', step=5, element_info='')`
{
    'name': 'scroll',
    'description': 'Scroll an element at the specified coordinates in the specified direction by a given number of wheel steps.',
    'parameters': {
        'type': 'object',
        'properties': {
            'start_box': {
                'type': 'array',
                'items': {
                    'type': 'integer'
                },
                'description': 'Coordinates [x,y] of the element or area to scroll, normalized to 0-999 range.'
            },
            'direction': {
                'type': 'string',
                'enum': ['down', 'up'],
                'description': 'The direction to scroll: 'down' or 'up'.'
            },
            'step': {
                'type': 'integer',
                'default': 5,
                'description': 'Number of wheel steps to scroll, default is 5.'
            },
            'element_info': {
                'type': 'string',
                'description': 'Optional text description of the UI element being scrolled.'
            }
        },
        'required': ['start_box', 'direction']
    }
}

### WAIT

Call rule: `WAIT()`
{
    'name': 'WAIT',
    'description': 'Wait for 5 seconds before proceeding to the next action.',
    'parameters': {
        'type': 'object',
        'properties': {},
        'required': []
    }
}

### DONE

Call rule: `DONE()`
{
    'name': 'DONE',
    'description': 'Indicate that the current task has been completed successfully and no further actions are needed.',
    'parameters': {
        'type': 'object',
        'properties': {},
        'required': []
    }
}

### FAIL

Call rule: `FAIL()`
{
    'name': 'FAIL',
    'description': 'Indicate that the current task cannot be completed or is impossible to accomplish.',
    'parameters': {
        'type': 'object',
        'properties': {},
        'required': []
    }
}"""


def encode_image_to_base64(image_path: str) -> str:
    """Read an image file and return it as a base64 ``data:`` URI.

    Note: the payload is always labeled ``image/png`` regardless of the
    file's actual format.
    """
    with open(image_path, "rb") as fh:
        payload = fh.read()
    encoded = base64.b64encode(payload).decode("utf-8")
    return f"data:image/png;base64,{encoded}"


def parse_glm_response(response: str) -> Dict[str, Any]:
    """
    Parse a raw GLM-4.5V completion into its action, explanation, and memory.

    The model marks the chosen action between the special tokens
    <|begin_of_box|> and <|end_of_box|>; coordinates inside it are normalized
    to the 0-999 range. Returns a dict with keys "action" (str or None),
    "action_text" (explanation before the Memory section, tokens stripped),
    and "memory" (text after "Memory:", defaulting to "[]").
    """
    # Preferred path: the action is delimited by the special box tokens.
    boxed = re.search(r"<\|begin_of_box\|>(.*?)<\|end_of_box\|>", response)
    if boxed:
        action = boxed.group(1).strip()
    else:
        # Fallback: grab the first thing that looks like a function call.
        calls = re.findall(r"[\w_]+\([^)]*\)", response)
        action = calls[0] if calls else None

    # Memory is everything after the "Memory:" marker; default to empty list.
    mem_match = re.search(r"Memory:(.*?)$", response, re.DOTALL)
    memory = mem_match.group(1).strip() if mem_match else "[]"

    # The explanation is everything before "Memory:" (or the whole response).
    text_match = re.search(r"^(.*?)Memory:", response, re.DOTALL)
    action_text = text_match.group(1).strip() if text_match else response

    # Strip any leftover special tokens from the explanation text.
    if action_text:
        for token in ("<|begin_of_box|>", "<|end_of_box|>"):
            action_text = action_text.replace(token, "")

    return {"action": action, "action_text": action_text, "memory": memory}


def get_last_image_from_messages(messages: Messages) -> Optional[str]:
    """Return the base64 payload of the most recent screenshot, if any.

    Scans messages newest-first, checking computer_call_output items and
    user-message image_url parts; returns the part after the data-URI comma,
    or None when no data:image/ URL is found.
    """
    for entry in reversed(messages):
        if not isinstance(entry, dict):
            continue
        if entry.get("type") == "computer_call_output":
            out = entry.get("output", {})
            if isinstance(out, dict) and out.get("type") == "input_image":
                url = out.get("image_url", "")
                if isinstance(url, str) and url.startswith("data:image/"):
                    # Strip the "data:image/...;base64," prefix.
                    return url.split(",", 1)[1]
        elif entry.get("role") == "user":
            body = entry.get("content", [])
            if not isinstance(body, list):
                continue
            for part in reversed(body):
                if not (isinstance(part, dict) and part.get("type") == "image_url"):
                    continue
                url_obj = part.get("image_url", {})
                if isinstance(url_obj, dict):
                    url = url_obj.get("url", "")
                    if isinstance(url, str) and url.startswith("data:image/"):
                        return url.split(",", 1)[1]
    return None


def convert_responses_items_to_glm45v_pc_prompt(
    messages: Messages, task: str, memory: str = ""
) -> List[Dict[str, Any]]:
    """Convert responses items to GLM-4.5V PC prompt format with historical actions.

    Args:
        messages: List of message items from the conversation
        task: The task description
        memory: Current memory state

    Returns:
        List of content items for the prompt (text and image_url items)
    """
    action_space = GLM_ACTION_SPACE

    # Template head
    head_text = f"""You are a GUI Agent, and your primary task is to respond accurately to user requests or questions. In addition to directly answering the user's queries, you can also use tools or perform GUI operations directly until you fulfill the user's request or provide a correct answer. You should carefully read and understand the images and questions provided by the user, and engage in thinking and reflection when appropriate. The coordinates involved are all represented in thousandths (0-999).

# Task:
{task}

# Task Platform
Ubuntu

# Action Space
{action_space}

# Historical Actions and Current Memory
History:"""

    # Template tail
    tail_text = f"""
Memory:
{memory}
# Output Format
Plain text explanation with action(param='...')
Memory:
[{{"key": "value"}}, ...]

# Some Additional Notes
- I'll give you the most recent 4 history screenshots(shrunked to 50%*50%) along with the historical action steps.
- You should put the key information you *have to remember* in a seperated memory part and I'll give it to you in the next round. The content in this part should be a dict list. If you no longer need some given information, you should remove it from the memory. Even if you don't need to remember anything, you should also output an empty list.
- My computer's password is "password", feel free to use it when you need sudo rights.
- For the thunderbird account "[email protected]", the password is "gTCI";=@y7|QJ0nDa_kN3Sb&>".

Current Screenshot:
"""

    # Build history from messages
    history = []
    history_images = []

    # Group messages into steps: a step is everything up to and including a
    # computer_call_output (the screenshot that ends the step).
    current_step = []
    step_num = 0

    for message in messages:
        msg_type = message.get("type")

        if msg_type == "reasoning":
            current_step.append(message)
        elif msg_type == "message" and message.get("role") == "assistant":
            current_step.append(message)
        elif msg_type == "computer_call":
            current_step.append(message)
        elif msg_type == "computer_call_output":
            current_step.append(message)
            # End of step - process it
            if current_step:
                step_num += 1

                # Extract bot thought from the assistant message content
                bot_thought = ""
                for item in current_step:
                    if item.get("type") == "message" and item.get("role") == "assistant":
                        content = item.get("content", [])
                        for content_item in content:
                            if content_item.get("type") == "output_text":
                                bot_thought = content_item.get("text", "")
                                break
                        break

                # Render the computer_call back into GLM action-call syntax
                action_text = ""
                for item in current_step:
                    if item.get("type") == "computer_call":
                        action = item.get("action", {})
                        action_type = action.get("type", "")

                        if action_type == "click":
                            x, y = action.get("x", 0), action.get("y", 0)
                            # NOTE(review): these are raw pixel coordinates, not
                            # rescaled to the 0-999 range the prompt describes —
                            # confirm whether callers pre-normalize them.
                            action_text = f"left_click(start_box='[{x},{y}]')"
                        elif action_type == "double_click":
                            x, y = action.get("x", 0), action.get("y", 0)
                            action_text = f"left_double_click(start_box='[{x},{y}]')"
                        elif action_type == "right_click":
                            x, y = action.get("x", 0), action.get("y", 0)
                            action_text = f"right_click(start_box='[{x},{y}]')"
                        elif action_type == "drag":
                            # Drag path: only the endpoints are reported.
                            path = action.get("path", [])
                            if len(path) >= 2:
                                start = path[0]
                                end = path[-1]
                                action_text = f"left_drag(start_box='[{start.get('x', 0)},{start.get('y', 0)}]', end_box='[{end.get('x', 0)},{end.get('y', 0)}]')"
                        elif action_type == "keypress":
                            key = action.get("key", "")
                            action_text = f"key(keys='{key}')"
                        elif action_type == "type":
                            text = action.get("text", "")
                            action_text = f"type(content='{text}')"
                        elif action_type == "scroll":
                            x, y = action.get("x", 0), action.get("y", 0)
                            direction = action.get("direction", "down")
                            action_text = f"scroll(start_box='[{x},{y}]', direction='{direction}')"
                        elif action_type == "wait":
                            action_text = "WAIT()"
                        break

                # Extract screenshot from computer_call_output
                screenshot_url = None
                for item in current_step:
                    if item.get("type") == "computer_call_output":
                        output = item.get("output", {})
                        if output.get("type") == "input_image":
                            screenshot_url = output.get("image_url", "")
                            break

                # Store step info
                history.append(
                    {
                        "step_num": step_num,
                        "bot_thought": bot_thought,
                        "action_text": action_text,
                        "screenshot_url": screenshot_url,
                    }
                )

                if screenshot_url:
                    history_images.append(screenshot_url)

                current_step = []

    # Build content array with head, history, and tail
    content = []
    current_text = head_text

    total_history_steps = len(history)
    history_image_count = min(4, len(history_images))  # Last 4 images
    # BUGFIX: only the most recent screenshots are inlined, so we must index
    # into the *tail* of history_images. The previous code used the raw step
    # offset as an index, which selected the oldest screenshots instead.
    recent_images = history_images[-history_image_count:] if history_image_count else []

    for step_idx, step_info in enumerate(history):
        step_num = step_info["step_num"]
        bot_thought = step_info["bot_thought"]
        action_text = step_info["action_text"]

        if step_idx < total_history_steps - history_image_count:
            # For steps beyond the last 4, use a text placeholder
            current_text += f"\nstep {step_num}: Screenshot:(Omitted in context.) Thought: {bot_thought}\nAction: {action_text}"
        else:
            # For the last 4 steps, insert the actual screenshots
            current_text += f"\nstep {step_num}: Screenshot:"
            content.append({"type": "text", "text": current_text})

            img_idx = step_idx - (total_history_steps - history_image_count)
            if img_idx < len(recent_images):
                content.append({"type": "image_url", "image_url": {"url": recent_images[img_idx]}})

            current_text = f" Thought: {bot_thought}\nAction: {action_text}"

    # Add tail
    current_text += tail_text
    content.append({"type": "text", "text": current_text})

    return content


def model_dump(obj) -> Dict[str, Any]:
    """Recursively convert pydantic-style objects into plain Python values.

    Dicts are walked recursively; objects exposing a ``model_dump`` method
    (pydantic models) are converted via that method; lists are now walked
    recursively as well so models nested in list values get converted
    (previously lists passed through untouched). Any other value is returned
    as-is.
    """
    if isinstance(obj, dict):
        return {k: model_dump(v) for k, v in obj.items()}
    if isinstance(obj, list):
        # Generalization: recurse into lists so nested models are converted.
        return [model_dump(v) for v in obj]
    if hasattr(obj, "model_dump"):
        return obj.model_dump()
    return obj


# Shared regexes for GLM action-call arguments (compiled once, used per action).
_GLM_START_BOX_RE = re.compile(r"start_box='?\[(\d+),\s*(\d+)\]'?")
_GLM_END_BOX_RE = re.compile(r"end_box='?\[(\d+),\s*(\d+)\]'?")


def _glm_norm_to_pixels(
    x: int, y: int, image_width: int, image_height: int
) -> Tuple[int, int]:
    """Map GLM-4.5V 0-999 normalized coordinates onto actual pixel coordinates."""
    return int((x / 999.0) * image_width), int((y / 999.0) * image_height)


def convert_glm_completion_to_responses_items(
    response: ModelResponse, image_width: int, image_height: int
) -> List[Dict[str, Any]]:
    """
    Convert GLM-4.5V completion response to responses items format.

    Args:
        response: LiteLLM ModelResponse from GLM-4.5V
        image_width: Original image width for coordinate scaling
        image_height: Original image height for coordinate scaling

    Returns:
        List of response items in the proper format
    """
    import uuid

    response_items: List[Dict[str, Any]] = []

    if not response.choices or not response.choices[0].message:
        return response_items

    message = response.choices[0].message
    content = message.content or ""
    reasoning_content = getattr(message, "reasoning_content", None)

    # Add reasoning item if present
    if reasoning_content:
        response_items.append(model_dump(make_reasoning_item(reasoning_content)))

    # Parse the content to extract action and text
    parsed_response = parse_glm_response(content)
    action = parsed_response.get("action", "")
    action_text = parsed_response.get("action_text", "")

    # Add message item with text content (excluding action and memory)
    if action_text:
        clean_text = action_text
        if action and action in clean_text:
            clean_text = clean_text.replace(action, "").strip()
        # Remove the trailing memory section
        clean_text = re.sub(r"Memory:\s*\[.*?\]\s*$", "", clean_text, flags=re.DOTALL).strip()
        if clean_text:
            response_items.append(model_dump(make_output_text_item(clean_text)))

    if not action:
        return response_items

    # Convert the action call into a computer_call item. Each branch builds
    # the item; call_id/status are attached uniformly at the end.
    computer_call: Optional[Dict[str, Any]] = None

    # left_double_click is checked before left_click so the two prefixes
    # cannot be confused (they parse the same start_box argument).
    if action.startswith("left_double_click"):
        m = _GLM_START_BOX_RE.search(action)
        if m:
            px, py = _glm_norm_to_pixels(
                int(m.group(1)), int(m.group(2)), image_width, image_height
            )
            computer_call = model_dump(make_double_click_item(px, py))

    elif action.startswith("left_click") or action.startswith("right_click"):
        m = _GLM_START_BOX_RE.search(action)
        if m:
            px, py = _glm_norm_to_pixels(
                int(m.group(1)), int(m.group(2)), image_width, image_height
            )
            if action.startswith("right_click"):
                computer_call = model_dump(make_click_item(px, py, button="right"))
            else:
                computer_call = model_dump(make_click_item(px, py))

    elif action.startswith("left_drag"):
        start_match = _GLM_START_BOX_RE.search(action)
        end_match = _GLM_END_BOX_RE.search(action)
        if start_match and end_match:
            sx, sy = _glm_norm_to_pixels(
                int(start_match.group(1)), int(start_match.group(2)), image_width, image_height
            )
            ex, ey = _glm_norm_to_pixels(
                int(end_match.group(1)), int(end_match.group(2)), image_width, image_height
            )
            # Drag path is represented by its two endpoints.
            computer_call = model_dump(make_drag_item([{"x": sx, "y": sy}, {"x": ex, "y": ey}]))

    elif action.startswith("key"):
        key_match = re.search(r"keys='([^']+)'", action)
        if key_match:
            keys = key_match.group(1)
            # Split '+'-joined combinations into a key list.
            key_list = keys.split("+") if "+" in keys else [keys]
            computer_call = model_dump(make_keypress_item(key_list))

    elif action.startswith("type"):
        content_match = re.search(r"content='([^']*)'", action)
        if content_match:
            computer_call = model_dump(make_type_item(content_match.group(1)))

    elif action.startswith("scroll"):
        m = _GLM_START_BOX_RE.search(action)
        direction_match = re.search(r"direction='([^']+)'", action)
        if m and direction_match:
            px, py = _glm_norm_to_pixels(
                int(m.group(1)), int(m.group(2)), image_width, image_height
            )
            direction = direction_match.group(1)
            # Convert direction to signed wheel-step amounts (5 steps).
            scroll_x, scroll_y = 0, 0
            if direction == "up":
                scroll_y = -5
            elif direction == "down":
                scroll_y = 5
            elif direction == "left":
                scroll_x = -5
            elif direction == "right":
                scroll_x = 5
            computer_call = model_dump(make_scroll_item(px, py, scroll_x, scroll_y))

    elif action == "WAIT()":
        computer_call = model_dump(make_wait_item())

    if computer_call is not None:
        computer_call["call_id"] = f"call_{uuid.uuid4().hex[:8]}"
        computer_call["status"] = "completed"
        response_items.append(computer_call)

    return response_items


@register_agent(models=r"(?i).*GLM-4\.5V.*")
class Glm4vConfig(AsyncAgentConfig):
    """GLM-4.5V agent configuration using liteLLM."""

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Predict the next step using GLM-4.5V model.

        Args:
            messages: Input messages following Responses format
            model: Model name to use
            tools: Optional list of tool schemas
            max_retries: Maximum number of retries for API calls
            stream: Whether to stream the response
            computer_handler: Computer handler for taking screenshots
            use_prompt_caching: Whether to use prompt caching
            _on_api_start: Callback for API start
            _on_api_end: Callback for API end
            _on_usage: Callback for usage tracking
            _on_screenshot: Callback for screenshot events

        Returns:
            Dict with "output" and "usage" keys
        """
        # Get the user instruction from the last user message
        user_instruction = ""
        for message in reversed(messages):
            if isinstance(message, dict) and message.get("role") == "user":
                content = message.get("content", "")
                if isinstance(content, str):
                    user_instruction = content
                elif isinstance(content, list):
                    for item in content:
                        if isinstance(item, dict) and item.get("type") == "text":
                            user_instruction = item.get("text", "")
                            break
                break

        # Get the last image for processing; fall back to a fresh screenshot.
        last_image_b64 = get_last_image_from_messages(messages)
        if not last_image_b64 and computer_handler:
            screenshot_b64 = await computer_handler.screenshot()
            if screenshot_b64:
                last_image_b64 = screenshot_b64
                if _on_screenshot:
                    await _on_screenshot(screenshot_b64)

        if not last_image_b64:
            raise ValueError("No image available for GLM-4.5V processing")

        # Convert responses items to GLM-4.5V PC prompt format with historical actions
        prompt_content = convert_responses_items_to_glm45v_pc_prompt(
            messages=messages,
            task=user_instruction,
            memory="[]",  # Initialize with empty memory for now
        )

        # Add the current screenshot to the end
        prompt_content.append(
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{last_image_b64}"}}
        )

        # Prepare messages for liteLLM
        litellm_messages = [
            {"role": "system", "content": "You are a helpful GUI agent assistant."},
            {"role": "user", "content": prompt_content},
        ]

        # Prepare API call kwargs; caller kwargs may override defaults.
        api_kwargs = {
            "model": model,
            "messages": litellm_messages,
            # "max_tokens": 2048,
            # "temperature": 0.001,
            # "extra_body": {
            #     "skip_special_tokens": False,
            # }
        }
        api_kwargs.update({k: v for k, v in (kwargs or {}).items()})

        # Add API callbacks
        if _on_api_start:
            await _on_api_start(api_kwargs)

        # Call liteLLM
        response = await litellm.acompletion(**api_kwargs)

        if _on_api_end:
            await _on_api_end(api_kwargs, response)

        # Get image dimensions for coordinate scaling
        image_width, image_height = 1920, 1080  # Default dimensions

        # Try to get actual dimensions from the image
        try:
            image_data = base64.b64decode(last_image_b64)
            image = Image.open(BytesIO(image_data))
            image_width, image_height = image.size
        except Exception:
            pass  # Best-effort: fall back to the default dimensions

        # Convert GLM completion response to responses items
        response_items = convert_glm_completion_to_responses_items(
            response, image_width, image_height
        )

        # Extract usage information (token counts plus estimated cost)
        response_usage = {
            **LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage(
                response.usage
            ).model_dump(),
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(response_usage)

        # Create agent response
        return {"output": response_items, "usage": response_usage}

    async def predict_click(
        self, model: str, image_b64: str, instruction: str, **kwargs
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates using GLM-4.5V model.

        Args:
            model: Model name to use
            image_b64: Base64 encoded image
            instruction: Instruction for where to click

        Returns:
            Tuple with (x, y) pixel coordinates, or None when no coordinates
            could be parsed or the API call failed.
        """
        try:
            # Create a simple click instruction prompt
            click_prompt = f"""You are a GUI agent. Look at the screenshot and identify where to click for: {instruction}

Respond with a single click action in this format:
left_click(start_box='[x,y]')

Where x,y are coordinates normalized to 0-999 range."""

            # Prepare messages for liteLLM
            litellm_messages = [
                {"role": "system", "content": "You are a helpful GUI agent assistant."},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": click_prompt},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{image_b64}"},
                        },
                    ],
                },
            ]

            # Prepare API call kwargs; caller kwargs may override defaults.
            api_kwargs = {
                "model": model,
                "messages": litellm_messages,
                "max_tokens": 2056,
                "temperature": 0.001,
                "extra_body": {
                    # Keep the <|begin_of_box|> markers in the output so the
                    # coordinate parser below can find them.
                    "skip_special_tokens": False,
                },
            }
            api_kwargs.update({k: v for k, v in (kwargs or {}).items()})

            # Call liteLLM
            response = await litellm.acompletion(**api_kwargs)

            # Extract response content
            # (BUGFIX: removed a leftover debug print of the full response.)
            response_content = response.choices[0].message.content.strip()

            # Parse response for click coordinates, preferring the special-token form
            coord_pattern = r"<\|begin_of_box\|>.*?left_click\(start_box='?\[(\d+),(\d+)\]'?\).*?<\|end_of_box\|>"
            match = re.search(coord_pattern, response_content)

            if not match:
                # Fallback: look for coordinates without special tokens
                coord_pattern = r"left_click\(start_box='?\[(\d+),(\d+)\]'?\)"
                match = re.search(coord_pattern, response_content)

            if match:
                x, y = int(match.group(1)), int(match.group(2))

                # Get actual image dimensions for scaling
                try:
                    image_data = base64.b64decode(image_b64)
                    image = Image.open(BytesIO(image_data))
                    image_width, image_height = image.size
                except Exception:
                    # Use default dimensions
                    image_width, image_height = 1920, 1080

                # Convert from 0-999 normalized coordinates to actual pixel coordinates
                actual_x = int((x / 999.0) * image_width)
                actual_y = int((y / 999.0) * image_height)

                return (actual_x, actual_y)

            return None

        except Exception as e:
            # Log error and return None
            print(f"Error in predict_click: {e}")
            return None

    def get_capabilities(self) -> List[AgentCapability]:
        """
        Get list of capabilities supported by this agent config.

        Returns:
            List of capability strings
        """
        return ["step", "click"]

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/uitars2.py:
--------------------------------------------------------------------------------

```python
"""
UITARS-2 agent loop implementation using LiteLLM.
- Prepends a system prompt modeled after the training prompts in examples/seed_16_gui.ipynb
- Converts Responses items -> completion messages
- Calls litellm.acompletion
- Parses <seed:tool_call> ... </seed:tool_call> outputs back into Responses items (computer actions)
"""

from __future__ import annotations

import base64
import io
import json
import re
from typing import Any, Dict, List, Optional, Tuple

import litellm
from litellm.responses.litellm_completion_transformation.transformation import (
    LiteLLMCompletionResponsesConfig,
)

from ..decorators import register_agent
from .omniparser import get_last_computer_call_output  # type: ignore

try:
    from PIL import Image  # type: ignore
except Exception:  # pragma: no cover
    Image = None  # type: ignore
from ..responses import (
    convert_responses_items_to_completion_messages,
    make_click_item,
    make_double_click_item,
    make_drag_item,
    make_function_call_item,
    make_keypress_item,
    make_move_item,
    make_output_text_item,
    make_reasoning_item,
    make_screenshot_item,
    make_scroll_item,
    make_type_item,
    make_wait_item,
)
from ..types import AgentCapability

# Built-in UI-TARS tool schemas, rendered verbatim (as pretty-printed JSON)
# into the system prompt. Point-valued parameters use the "<point>x y</point>"
# string format; coordinates are handled in the model's 0-1000 normalized
# space (see _normalize_xy_to_uitars / _denormalize_xy_from_uitars below).
TOOL_SCHEMAS: List[Dict[str, Any]] = [
    {
        "type": "function",
        "name": "open_computer",
        "parameters": {},
        "description": "Open computer.",
    },
    # --- pointer actions --------------------------------------------------
    {
        "type": "function",
        "name": "click",
        "parameters": {
            "type": "object",
            "properties": {
                "point": {
                    "type": "string",
                    "description": "Click coordinates. The format is: <point>x y</point>",
                }
            },
            "required": ["point"],
        },
        "description": "Mouse left single click action.",
    },
    {
        "type": "function",
        "name": "left_double",
        "parameters": {
            "type": "object",
            "properties": {
                "point": {
                    "type": "string",
                    "description": "Click coordinates. The format is: <point>x y</point>",
                }
            },
            "required": ["point"],
        },
        "description": "Mouse left double click action.",
    },
    {
        "type": "function",
        "name": "right_single",
        "parameters": {
            "type": "object",
            "properties": {
                "point": {
                    "type": "string",
                    "description": "Click coordinates. The format is: <point>x y</point>",
                }
            },
            "required": ["point"],
        },
        "description": "Mouse right single click action.",
    },
    {
        "type": "function",
        "name": "scroll",
        "parameters": {
            "type": "object",
            "properties": {
                "point": {
                    "type": "string",
                    "description": "Scroll start position. If not specified, default to execute on the current mouse position. The format is: <point>x y</point>",
                },
                "direction": {
                    "type": "string",
                    "description": "Scroll direction.",
                    "enum": ["up", "down", "left", "right"],
                },
            },
            "required": ["direction"],
        },
        "description": "Scroll action.",
    },
    {
        "type": "function",
        "name": "move_to",
        "parameters": {
            "type": "object",
            "properties": {
                "point": {
                    "type": "string",
                    "description": "Target coordinates. The format is: <point>x y</point>",
                }
            },
            "required": ["point"],
        },
        "description": "Mouse move action.",
    },
    # --- keyboard actions -------------------------------------------------
    {
        "type": "function",
        "name": "hotkey",
        "parameters": {
            "type": "object",
            "properties": {
                "key": {
                    "type": "string",
                    "description": "Hotkeys you want to press. Split keys with a space and use lowercase.",
                }
            },
            "required": ["key"],
        },
        "description": "Press hotkey.",
    },
    # --- task control -----------------------------------------------------
    {
        "type": "function",
        "name": "finished",
        "parameters": {
            "type": "object",
            "properties": {
                "content": {
                    "type": "string",
                    "description": "Provide the final answer or response to complete the task.",
                }
            },
            "required": [],
        },
        "description": "This function is used to indicate the completion of a task by providing the final answer or response.",
    },
    {
        "type": "function",
        "name": "press",
        "parameters": {
            "type": "object",
            "properties": {
                "key": {
                    "type": "string",
                    "description": "Key you want to press. Only one key can be pressed at one time.",
                }
            },
            "required": ["key"],
        },
        "description": "Press key.",
    },
    {
        "type": "function",
        "name": "release",
        "parameters": {
            "type": "object",
            "properties": {
                "key": {
                    "type": "string",
                    "description": "Key you want to release. Only one key can be released at one time.",
                }
            },
            "required": ["key"],
        },
        "description": "Release key.",
    },
    # --- raw mouse button state (for manual drag/hold sequences) ----------
    {
        "type": "function",
        "name": "mouse_down",
        "parameters": {
            "type": "object",
            "properties": {
                "point": {
                    "type": "string",
                    "description": "Mouse down position. If not specified, default to execute on the current mouse position. The format is: <point>x y</point>",
                },
                "button": {
                    "type": "string",
                    "description": "Down button. Default to left.",
                    "enum": ["left", "right"],
                },
            },
            "required": [],
        },
        "description": "Mouse down action.",
    },
    {
        "type": "function",
        "name": "mouse_up",
        "parameters": {
            "type": "object",
            "properties": {
                "point": {
                    "type": "string",
                    "description": "Mouse up position. If not specified, default to execute on the current mouse position. The format is: <point>x y</point>",
                },
                "button": {
                    "type": "string",
                    "description": "Up button. Default to left.",
                    "enum": ["left", "right"],
                },
            },
            "required": [],
        },
        "description": "Mouse up action.",
    },
    {
        "type": "function",
        "name": "call_user",
        "parameters": {
            "type": "object",
            "properties": {
                "content": {
                    "type": "string",
                    "description": "Message or information displayed to the user to request their input, feedback, or guidance.",
                }
            },
            "required": [],
        },
        "description": "This function is used to interact with the user by displaying a message and requesting their input, feedback, or guidance.",
    },
    {
        "type": "function",
        "name": "wait",
        "parameters": {
            "type": "object",
            "properties": {"time": {"type": "integer", "description": "Wait time in seconds."}},
            "required": [],
        },
        "description": "Wait for a while.",
    },
    {
        "type": "function",
        "name": "drag",
        "parameters": {
            "type": "object",
            "properties": {
                "start_point": {
                    "type": "string",
                    "description": "Drag start point. The format is: <point>x y</point>",
                },
                "end_point": {
                    "type": "string",
                    "description": "Drag end point. The format is: <point>x y</point>",
                },
            },
            "required": ["start_point", "end_point"],
        },
        "description": "Mouse left button drag action.",
    },
    {
        "type": "function",
        "name": "type",
        "parameters": {
            "type": "object",
            "properties": {
                "content": {
                    "type": "string",
                    "description": "Type content. If you want to submit your input, use \\n at the end of content.",
                }
            },
            "required": ["content"],
        },
        "description": "Type content.",
    },
    {
        "type": "function",
        "name": "take_screenshot",
        "parameters": {},
        "description": "Take screenshot.",
    },
]


def _format_tool_schemas_json_lines(schemas: List[Dict[str, Any]]) -> str:
    # Nicely formatted: pretty JSON with indentation, separated by blank lines
    return "\n\n".join(json.dumps(s, ensure_ascii=False, indent=2) for s in schemas) + "\n\n"


# System-prompt prefix modeled after the UITARS-2 training prompts: instructs
# the model to reason inside the <think_never_used_...> sentinel tags,
# enumerates the available "think modes", and opens the "## Function
# Definition" section that the JSON tool schemas are appended to.
_PROMPT_PREFIX = (
    "You should begin by detailing the internal reasoning process, and then present the answer to the user. "
    "The reasoning process should be enclosed within <think_never_used_51bce0c785ca2f68081bfa7d91973934> "
    "</think_never_used_51bce0c785ca2f68081bfa7d91973934> tags, as follows:\n"
    "<think_never_used_51bce0c785ca2f68081bfa7d91973934> reasoning process here "
    "</think_never_used_51bce0c785ca2f68081bfa7d91973934> answer here.\n\n"
    "You have different modes of thinking:\n"
    "Unrestricted think mode: Engage in an internal thinking process with thorough reasoning and reflections. "
    "You have an unlimited budget for thinking tokens and can continue thinking until you fully solve the problem.\n"
    "Efficient think mode: Provide a concise internal thinking process with efficient reasoning and reflections. "
    "You don't have a strict token budget but be less verbose and more direct in your thinking.\n"
    "No think mode: Respond directly to the question without any internal reasoning process or extra thinking tokens. "
    "Still follow the template with the minimum required thinking tokens to justify the answer.\n"
    "Budgeted think mode: Limit your internal reasoning and reflections to stay within the specified token budget\n\n"
    "Based on the complexity of the problem, select the appropriate mode for reasoning among the provided options listed below.\n\n"
    "Provided Mode(s):\nEfficient think.\n\n"
    "You are provided with a task description, a history of previous actions, and corresponding screenshots. "
    "Your goal is to perform the next action to complete the task. "
    "If performing the same action multiple times results in a static screen with no changes, attempt a modified or alternative action.\n\n"
    "## Function Definition\n\n"
    "- You have access to the following functions:\n\n"
)

# System-prompt suffix: documents the <gui_think> / <seed:tool_call> calling
# convention that _parse_seed_tool_calls() later parses back out of model
# completions, including multi-function calls within a single tool-call block.
_PROMPT_SUFFIX = (
    "- To call a function, use the following structure without any suffix:\n\n"
    "<gui_think> reasoning process </gui_think>\n"
    "<seed:tool_call><function=example_function_name><parameter=example_parameter_1>value_1</parameter>"
    "<parameter=example_parameter_2>multiline...\n</parameter></function></seed:tool_call>\n\n"
    "## Important Notes\n"
    "- Function calls must begin with <function= and end with </function>.\n"
    "- All required parameters must be explicitly provided.\n"
    "\n## Additional Notes\n"
    "- You can execute multiple actions within a single tool call. For example:\n"
    "<seed:tool_call><function=example_function_1><parameter=example_parameter_1>value_1</parameter><parameter=example_parameter_2>\n"
    "This is the value for the second parameter\nthat can span\nmultiple lines\n"
    "</parameter></function><function=example_function_2><parameter=example_parameter_3>value_4</parameter></function></seed:tool_call>"
)


# Full static system prompt: prefix + pretty-printed built-in schemas + suffix.
SYSTEM_PROMPT = _PROMPT_PREFIX + _format_tool_schemas_json_lines(TOOL_SCHEMAS) + _PROMPT_SUFFIX


def _extract_function_schemas_from_tools(
    tools: Optional[List[Dict[str, Any]]],
) -> List[Dict[str, Any]]:
    schemas: List[Dict[str, Any]] = []
    if not tools:
        return schemas
    for t in tools:
        if t.get("type") == "function":
            fn = t.get("function", {})
            name = fn.get("name")
            params = fn.get("parameters", {})
            desc = fn.get("description", "")
            if name:
                schemas.append(
                    {
                        "type": "function",
                        "name": name,
                        "parameters": params if isinstance(params, dict) else {},
                        "description": desc,
                    }
                )
    return schemas


def _parse_seed_tool_calls(text: str) -> List[Dict[str, Any]]:
    """Parse <seed:tool_call> blocks into a list of {function, parameters} dicts.
    Also captures optional <gui_think>...</gui_think> as reasoning.
    """
    actions: List[Dict[str, Any]] = []
    if not text:
        return actions

    # Extract reasoning if present
    reasoning_text = None
    think_match = re.search(r"<gui_think>([\s\S]*?)</gui_think>", text)
    if think_match:
        reasoning_text = think_match.group(1).strip()

    # Iterate each seed tool_call block
    for block in re.finditer(r"<seed:tool_call>([\s\S]*?)</seed:tool_call>", text):
        content = block.group(1)
        # One or multiple <function=...>...</function> inside
        for fmatch in re.finditer(r"<function=([\w_]+)>([\s\S]*?)</function>", content):
            fname = fmatch.group(1)
            inner = fmatch.group(2)
            params: Dict[str, str] = {}
            for pmatch in re.finditer(r"<parameter=([\w_]+)>([\s\S]*?)</parameter>", inner):
                pname = pmatch.group(1)
                pval = pmatch.group(2).strip()
                params[pname] = pval
            actions.append({"function": fname, "parameters": params})

    # If we have a global reasoning and at least one action, attach it to first
    if reasoning_text and actions:
        actions[0]["reasoning"] = reasoning_text
    elif reasoning_text:
        actions.append({"function": "reasoning", "parameters": {"content": reasoning_text}})

    return actions


def _normalize_xy_to_uitars(x: int, y: int, width: int, height: int) -> Tuple[int, int]:
    width = max(1, int(width))
    height = max(1, int(height))
    nx = max(0, min(1000, int(round((x / width) * 1000))))
    ny = max(0, min(1000, int(round((y / height) * 1000))))
    return nx, ny


def _denormalize_xy_from_uitars(nx: float, ny: float, width: int, height: int) -> Tuple[int, int]:
    width = max(1, int(width))
    height = max(1, int(height))
    x = int(round((nx / 1000.0) * width))
    y = int(round((ny / 1000.0) * height))
    return x, y


def _map_computer_action_to_function(
    action: Dict[str, Any], width: int, height: int
) -> Optional[Dict[str, Any]]:
    """Map a computer action item to a UITARS function + parameters dict of strings.
    Returns dict like {"function": name, "parameters": {..}} or None if unknown.
    """
    atype = action.get("type") or action.get("action")
    if atype == "click":
        x, y = action.get("x"), action.get("y")
        btn = action.get("button", "left")
        if x is None or y is None:
            return None
        nx, ny = _normalize_xy_to_uitars(int(x), int(y), width, height)
        if btn == "right":
            return {
                "function": "right_single",
                "parameters": {"point": f"<point>{nx} {ny}</point>"},
            }
        return {"function": "click", "parameters": {"point": f"<point>{nx} {ny}</point>"}}
    if atype == "double_click":
        x, y = action.get("x"), action.get("y")
        if x is None or y is None:
            return None
        nx, ny = _normalize_xy_to_uitars(int(x), int(y), width, height)
        return {"function": "left_double", "parameters": {"point": f"<point>{nx} {ny}</point>"}}
    if atype == "move":
        x, y = action.get("x"), action.get("y")
        if x is None or y is None:
            return None
        nx, ny = _normalize_xy_to_uitars(int(x), int(y), width, height)
        return {"function": "move_to", "parameters": {"point": f"<point>{nx} {ny}</point>"}}
    if atype == "keypress":
        keys = action.get("keys", [])
        if isinstance(keys, list) and keys:
            if len(keys) == 1:
                return {"function": "press", "parameters": {"key": keys[0]}}
            else:
                return {"function": "hotkey", "parameters": {"key": " ".join(keys)}}
        return None
    if atype == "type":
        text = action.get("text", "")
        return {"function": "type", "parameters": {"content": text}}
    if atype == "scroll":
        x, y = action.get("x", 512), action.get("y", 512)
        nx, ny = _normalize_xy_to_uitars(int(x), int(y), width, height)
        sx, sy = action.get("scroll_x", 0), action.get("scroll_y", 0)
        # Our parser used positive sy for up
        direction = (
            "up"
            if sy and sy > 0
            else (
                "down"
                if sy and sy < 0
                else ("right" if sx and sx > 0 else ("left" if sx and sx < 0 else "down"))
            )
        )
        return {
            "function": "scroll",
            "parameters": {"direction": direction, "point": f"<point>{nx} {ny}</point>"},
        }
    if atype == "drag":
        path = action.get("path", [])
        if isinstance(path, list) and len(path) >= 2:
            sx, sy = path[0].get("x"), path[0].get("y")
            ex, ey = path[-1].get("x"), path[-1].get("y")
            if sx is None or sy is None or ex is None or ey is None:
                return None
            nsx, nsy = _normalize_xy_to_uitars(int(sx), int(sy), width, height)
            nex, ney = _normalize_xy_to_uitars(int(ex), int(ey), width, height)
            return {
                "function": "drag",
                "parameters": {
                    "start_point": f"<point>{nsx} {nsy}</point>",
                    "end_point": f"<point>{nex} {ney}</point>",
                },
            }
        return None
    if atype == "wait":
        return {"function": "wait", "parameters": {}}
    if atype == "screenshot":
        return {"function": "take_screenshot", "parameters": {}}
    # Fallback unknown
    return None


def _to_uitars_messages(
    messages: List[Dict[str, Any]], width: int, height: int
) -> List[Dict[str, Any]]:
    """Convert responses items into completion messages tailored for UI-TARS.

    - User content is passed through similar to convert_responses_items_to_completion_messages
    - Assistant/tool history is rendered as text with <gui_think> and <seed:tool_call> blocks

    Reasoning items and computer/function calls are accumulated into a single
    pending assistant "seed block" that is flushed whenever a user message,
    plain assistant text, or a tool output is encountered.

    Args:
        messages: Responses-style items (user/assistant messages, reasoning,
            computer_call / function_call items and their outputs).
        width: Screen width in pixels, used to normalize coordinates into the
            model's 0-1000 point space.
        height: Screen height in pixels (same purpose as width).

    Returns:
        Chat-completion style {"role", "content"} messages.
    """
    uitars_messages: List[Dict[str, Any]] = []

    # Render the accumulated reasoning + tool calls as one assistant message
    # in the <gui_think> ... <seed:tool_call> ... format; no-op when empty.
    def flush_seed_block(pending_think: Optional[str], pending_functions: List[Dict[str, Any]]):
        if not pending_think and not pending_functions:
            return
        parts: List[str] = []
        if pending_think:
            parts.append(f"<gui_think> {pending_think} </gui_think>")
        if pending_functions:
            inner = []
            for f in pending_functions:
                fname = f["function"]
                params = f.get("parameters", {})
                param_blocks = []
                for k, v in params.items():
                    param_blocks.append(f"<parameter={k}>{v}</parameter>")
                inner.append(f"<function={fname}>{''.join(param_blocks)}</function>")
            parts.append(f"<seed:tool_call>{''.join(inner)}</seed:tool_call>")
        uitars_messages.append({"role": "assistant", "content": "".join(parts)})

    # Accumulators for a single assistant seed block
    pending_think: Optional[str] = None
    pending_functions: List[Dict[str, Any]] = []

    for msg in messages:
        mtype = msg.get("type")
        role = msg.get("role")

        # On any user message, flush current assistant block
        if role == "user" or mtype == "user":
            flush_seed_block(pending_think, pending_functions)
            pending_think, pending_functions = None, []

            content = msg.get("content", "")
            if isinstance(content, list):
                # Translate Responses content parts into completion parts.
                completion_content = []
                for item in content:
                    if item.get("type") == "input_image":
                        completion_content.append(
                            {"type": "image_url", "image_url": {"url": item.get("image_url")}}
                        )
                    elif item.get("type") in ("input_text", "text"):
                        completion_content.append({"type": "text", "text": item.get("text")})
                uitars_messages.append({"role": "user", "content": completion_content})
            elif isinstance(content, str):
                uitars_messages.append({"role": "user", "content": content})
            continue

        # Reasoning item
        if mtype == "reasoning":
            # Responses reasoning stores summary list
            summary = msg.get("summary", [])
            texts = [
                s.get("text", "")
                for s in summary
                if isinstance(s, dict) and s.get("type") == "summary_text"
            ]
            if texts:
                # NOTE: a later reasoning item replaces (not appends to) the
                # currently pending think text.
                pending_think = "\n".join([t for t in texts if t])
            continue

        # Computer/tool calls -> map to functions
        if mtype == "computer_call":
            f = _map_computer_action_to_function(msg.get("action", {}), width, height)
            if f:
                pending_functions.append(f)
            continue
        if mtype == "function_call":
            # Include custom tools as-is
            name = msg.get("name")
            try:
                args_obj = json.loads(msg.get("arguments", "{}"))
            except json.JSONDecodeError:
                args_obj = {}
            # Ensure string values
            params = {k: (str(v) if not isinstance(v, str) else v) for k, v in args_obj.items()}
            pending_functions.append({"function": name, "parameters": params})
            continue

        # If assistant message text is given, flush current block and add as plain assistant text
        if role == "assistant" or mtype == "message":
            flush_seed_block(pending_think, pending_functions)
            pending_think, pending_functions = None, []
            content = msg.get("content", [])
            if isinstance(content, list):
                texts = [
                    c.get("text", "")
                    for c in content
                    if isinstance(c, dict) and c.get("type") in ("output_text", "text")
                ]
                if texts:
                    uitars_messages.append(
                        {"role": "assistant", "content": "\n".join([t for t in texts if t])}
                    )
            elif isinstance(content, str) and content:
                uitars_messages.append({"role": "assistant", "content": content})
            continue

        # On outputs, flush pending assistant block and send outputs as user messages
        if mtype in ("function_call_output", "computer_call_output"):
            flush_seed_block(pending_think, pending_functions)
            pending_think, pending_functions = None, []
            output = msg.get("output")
            if isinstance(output, dict) and output.get("type") == "input_image":
                # Screenshot outputs become user image messages.
                img_url = output.get("image_url")
                if img_url:
                    uitars_messages.append(
                        {
                            "role": "user",
                            "content": [
                                {"type": "image_url", "image_url": {"url": img_url}},
                            ],
                        }
                    )
            elif isinstance(output, str):
                uitars_messages.append({"role": "user", "content": output})
            else:
                # Fallback stringify
                uitars_messages.append({"role": "user", "content": json.dumps(output)})
            continue

    # Flush any remaining pending seed block
    flush_seed_block(pending_think, pending_functions)

    return uitars_messages


def _to_response_items(
    actions: List[Dict[str, Any]],
    tool_names: Optional[set[str]] = None,
    width: Optional[int] = None,
    height: Optional[int] = None,
) -> List[Any]:
    """Map parsed actions into Responses items (computer actions + optional reasoning)."""
    items: List[Any] = []
    tool_names = tool_names or set()

    # Optional top-level reasoning attached to first
    if actions and actions[0].get("reasoning"):
        items.append(make_reasoning_item(actions[0]["reasoning"]))

    # Dimensions default
    w = int(width) if width else 1024
    h = int(height) if height else 768

    for a in actions:
        fn = a.get("function")
        params = a.get("parameters", {})
        if fn == "reasoning":
            items.append(make_reasoning_item(params.get("content", "")))
        elif fn in ("click", "left_double", "right_single"):
            # params.point is like: <point>x y</point> or plain "x y"
            point = params.get("point", "").strip()
            m = re.search(r"([\-\d\.]+)\s+([\-\d\.]+)", point)
            if not m:
                continue
            nx = float(m.group(1))
            ny = float(m.group(2))
            x, y = _denormalize_xy_from_uitars(nx, ny, w, h)
            if fn == "left_double":
                items.append(make_double_click_item(x, y))
            elif fn == "right_single":
                items.append(make_click_item(x, y, "right"))
            else:
                items.append(make_click_item(x, y, "left"))
        elif fn == "move_to":
            point = params.get("point", "").strip()
            m = re.search(r"([\-\d\.]+)\s+([\-\d\.]+)", point)
            if not m:
                continue
            nx = float(m.group(1))
            ny = float(m.group(2))
            x, y = _denormalize_xy_from_uitars(nx, ny, w, h)
            items.append(make_move_item(x, y))
        elif fn == "drag":
            sp = params.get("start_point", "").strip()
            ep = params.get("end_point", "").strip()
            ms = re.search(r"([\-\d\.]+)\s+([\-\d\.]+)", sp)
            me = re.search(r"([\-\d\.]+)\s+([\-\d\.]+)", ep)
            if not (ms and me):
                continue
            nsx, nsy = float(ms.group(1)), float(ms.group(2))
            nex, ney = float(me.group(1)), float(me.group(2))
            sx, sy = _denormalize_xy_from_uitars(nsx, nsy, w, h)
            ex, ey = _denormalize_xy_from_uitars(nex, ney, w, h)
            items.append(make_drag_item([{"x": sx, "y": sy}, {"x": ex, "y": ey}]))
        elif fn == "hotkey":
            key = params.get("key", "")
            keys = key.split()
            if keys:
                items.append(make_keypress_item(keys))
        elif fn == "press":
            key = params.get("key", "")
            if key:
                items.append(make_keypress_item([key]))
        elif fn == "type":
            content = params.get("content", "")
            items.append(make_type_item(content))
        elif fn == "scroll":
            # direction: up/down/left/right. Point optional
            direction = params.get("direction", "down").lower()
            point = params.get("point", "")
            m = re.search(r"([\-\d\.]+)\s+([\-\d\.]+)", point)
            if m:
                nx = float(m.group(1))
                ny = float(m.group(2))
                x, y = _denormalize_xy_from_uitars(nx, ny, w, h)
            else:
                x, y = _denormalize_xy_from_uitars(500.0, 500.0, w, h)
            dy = 5 if direction == "up" else -5
            dx = 5 if direction == "right" else (-5 if direction == "left" else 0)
            items.append(make_scroll_item(x, y, dx, dy))
        elif fn == "wait":
            items.append(make_wait_item())
        elif fn == "finished":
            content = params.get("content", "")
            items.append(make_output_text_item(content or "Task completed."))
            break
        elif fn == "take_screenshot":
            items.append(make_screenshot_item())
        elif fn == "open_computer":
            items.append(make_screenshot_item())
        else:
            # If this function name is present in provided tool schemas, emit function_call
            if fn in tool_names:
                # Convert simple string params into an arguments object
                # Parameters are strings; pass through as-is
                items.append(make_function_call_item(fn, params))
            else:
                # Unknown function -> surface as assistant text
                items.append(make_output_text_item(f"Unknown action: {fn} {params}"))

    return items


@register_agent(models=r"(?i).*ui-?tars-?2.*")
class UITARS2Config:
    """Agent config for UI-TARS 2 models (matched case-insensitively on the model name).

    Bridges the Responses-style agent protocol and UI-TARS: history items are
    converted to UI-TARS chat messages, the completion is requested through
    litellm, and the model's ``<seed:tool_call>`` actions are parsed back into
    Responses output items.
    """

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Run a single model step and return ``{"output": [...], "usage": {...}}``.

        ``output`` is a list of Responses items (actions, text, screenshots);
        ``usage`` is the token/cost accounting for this call. The optional
        ``_on_api_start`` / ``_on_api_end`` / ``_on_usage`` callbacks, when
        provided, are awaited around the API call and after usage is computed.
        Screen dimensions are needed to denormalize UI-TARS coordinates and are
        resolved from (in order): the computer handler, the last screenshot in
        the history, or a 1024x768 fallback.
        """
        # Determine screen dimensions (prefer computer_handler, fallback to last screenshot)
        width: Optional[int] = None
        height: Optional[int] = None
        if computer_handler is not None and hasattr(computer_handler, "get_dimensions"):
            try:
                dims = await computer_handler.get_dimensions()  # type: ignore
                if isinstance(dims, (list, tuple)) and len(dims) == 2:
                    width, height = int(dims[0]), int(dims[1])
            except Exception:
                # Best effort only; fall through to the screenshot probe below.
                pass

        if width is None or height is None:
            # Probe the most recent screenshot in the history for its pixel size.
            try:
                last_out = get_last_computer_call_output(messages)  # type: ignore
                if last_out:
                    image_url = last_out.get("output", {}).get("image_url", "")
                    if image_url:
                        # image_url is a data URL; the base64 payload follows the last comma.
                        b64 = image_url.split(",")[-1]
                        img_bytes = base64.b64decode(b64)
                        if Image is not None:
                            img = Image.open(io.BytesIO(img_bytes))
                            width, height = img.size
            except Exception:
                pass

        if width is None or height is None:
            # Last-resort default when no dimension source is available.
            width, height = 1024, 768

        # Convert Responses items to UI-TARS style messages with <seed:tool_call> history
        completion_messages = _to_uitars_messages(messages, width, height)

        # Build dynamic system prompt by concatenating built-in schemas and provided function tools
        provided_fn_schemas = _extract_function_schemas_from_tools(tools)
        combined_schemas = (
            TOOL_SCHEMAS + provided_fn_schemas if provided_fn_schemas else TOOL_SCHEMAS
        )
        dynamic_system_prompt = (
            _PROMPT_PREFIX + _format_tool_schemas_json_lines(combined_schemas) + _PROMPT_SUFFIX
        )

        # Prepend system prompt (based on training prompts + provided tools)
        litellm_messages: List[Dict[str, Any]] = [
            {"role": "system", "content": dynamic_system_prompt},
        ]
        litellm_messages.extend(completion_messages)

        # Caller kwargs are forwarded verbatim and may override the defaults above.
        api_kwargs: Dict[str, Any] = {
            "model": model,
            "messages": litellm_messages,
            "max_retries": max_retries,
            "stream": stream,
            **{k: v for k, v in kwargs.items()},
        }
        if use_prompt_caching:
            api_kwargs["use_prompt_caching"] = use_prompt_caching

        if _on_api_start:
            await _on_api_start(api_kwargs)

        response = await litellm.acompletion(**api_kwargs)

        if _on_api_end:
            await _on_api_end(api_kwargs, response)

        # Normalize chat-completion usage into the Responses usage shape, and
        # attach litellm's computed cost for this call.
        usage = {
            **LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage(  # type: ignore
                response.usage
            ).model_dump(),
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(usage)

        # Extract text content (first choice)
        response_dict = response.model_dump()  # type: ignore
        content_text = ""
        choices = response_dict.get("choices", [])
        if choices:
            msg = choices[0].get("message", {})
            # message.content may be string or array; gather text pieces
            mc = msg.get("content")
            if isinstance(mc, str):
                content_text = mc
            elif isinstance(mc, list):
                parts = []
                for part in mc:
                    if isinstance(part, dict) and part.get("type") == "text":
                        parts.append(part.get("text", ""))
                content_text = "\n".join([p for p in parts if p])

        # Parse the seed tool calls and map to response items
        actions = _parse_seed_tool_calls(content_text)
        # Build set of tool names from provided tools to emit function_call items
        tool_names: set[str] = set()
        for s in provided_fn_schemas:
            name = s.get("name")
            if isinstance(name, str):
                tool_names.add(name)
        output_items = _to_response_items(actions, tool_names, width, height)

        return {"output": output_items, "usage": usage}

    def get_capabilities(self) -> List[AgentCapability]:
        """This adapter only supports step-wise prediction."""
        return ["step"]

    async def predict_click(
        self, model: str, image_b64: str, instruction: str, **kwargs
    ) -> Optional[Tuple[int, int]]:
        """Predict a single click coordinate using a minimal prompt with a click tool.

        This sends the current screenshot and instruction, asking the model to
        output a click action in the form:
            Action: click(point='(x,y)')

        Returns the parsed ``(x, y)`` pixel tuple, or ``None`` when the model
        response contains no parseable click action.
        """
        # Minimal grounding-style prompt
        system_text = (
            "You are a GUI agent. Given the instruction, return a single action on the current screen.\n\n"
            "## Output Format\n\n"
            "Action: click(point='(x,y)')\n\n"
            "## User Instruction\n"
            f"{instruction}"
        )

        # Build messages with image
        litellm_messages: List[Dict[str, Any]] = [
            {"role": "system", "content": system_text},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Please return a single click action."},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{image_b64}"},
                    },
                ],
            },
        ]

        # NOTE(review): `do_sample` is not a standard OpenAI chat-completion
        # parameter — presumably consumed by a specific provider/backend; confirm.
        api_kwargs: Dict[str, Any] = {
            "model": model,
            "messages": litellm_messages,
            "max_tokens": kwargs.get("max_tokens", 512),
            "temperature": kwargs.get("temperature", 0.0),
            "do_sample": kwargs.get("temperature", 0.0) > 0.0,
        }
        # Forward remaining caller kwargs (may override do_sample, not max_tokens/temperature).
        api_kwargs.update(
            {k: v for k, v in (kwargs or {}).items() if k not in ["max_tokens", "temperature"]}
        )

        response = await litellm.acompletion(**api_kwargs)
        # Extract response content
        response_dict = response.model_dump()  # type: ignore
        choices = response_dict.get("choices", [])
        if not choices:
            return None
        msg = choices[0].get("message", {})
        content_text = msg.get("content", "")
        if isinstance(content_text, list):
            # Content may be a list of parts; keep only the text pieces.
            text_parts = [
                p.get("text", "")
                for p in content_text
                if isinstance(p, dict) and p.get("type") == "text"
            ]
            content_text = "\n".join([t for t in text_parts if t])
        if not isinstance(content_text, str):
            return None

        # Parse coordinates
        # Pattern for click(point='(x,y)') or click(start_box='(x,y)')
        # NOTE(review): the first pattern is subsumed by the second (which also
        # accepts start_box); keeping both is harmless but redundant.
        patterns = [
            r"click\(point='\((\d+),(\d+)\)'\)",
            r"click\((?:start_box|point)='\((\d+),(\d+)\)'\)",
        ]
        for pat in patterns:
            m = re.search(pat, content_text)
            if m:
                try:
                    x, y = int(m.group(1)), int(m.group(2))
                    return (x, y)
                except Exception:
                    pass
        return None

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/ui/gradio/ui_components.py:
--------------------------------------------------------------------------------

```python
"""
UI Components for the Gradio interface
"""

import asyncio
import json
import logging
import os
import platform
from pathlib import Path
from typing import Any, Dict, List, Optional, cast

import gradio as gr
from gradio.components.chatbot import MetadataDict

from .app import (
    create_agent,
    get_model_string,
    get_ollama_models,
    global_agent,
    global_computer,
    load_settings,
    save_settings,
)

# Global messages array to maintain conversation history
global_messages = []


def create_gradio_ui() -> gr.Blocks:
    """Create a Gradio UI for the Computer-Use Agent."""

    # Load settings
    saved_settings = load_settings()

    # Check for API keys
    openai_api_key = os.environ.get("OPENAI_API_KEY", "")
    anthropic_api_key = os.environ.get("ANTHROPIC_API_KEY", "")
    cua_api_key = os.environ.get("CUA_API_KEY", "")

    # Model choices
    openai_models = ["OpenAI: Computer-Use Preview"]
    anthropic_models = [
        "Anthropic: Claude 4 Opus (20250514)",
        "Anthropic: Claude 4 Sonnet (20250514)",
        "Anthropic: Claude 3.7 Sonnet (20250219)",
    ]
    omni_models = [
        "OMNI: OpenAI GPT-4o",
        "OMNI: OpenAI GPT-4o mini",
        "OMNI: Claude 3.7 Sonnet (20250219)",
    ]

    # Check if API keys are available
    has_openai_key = bool(openai_api_key)
    has_anthropic_key = bool(anthropic_api_key)
    has_cua_key = bool(cua_api_key)

    # Get Ollama models for OMNI
    ollama_models = get_ollama_models()
    if ollama_models:
        omni_models += ollama_models

    # Detect platform
    is_mac = platform.system().lower() == "darwin"

    # Format model choices
    provider_to_models = {
        "OPENAI": openai_models,
        "ANTHROPIC": anthropic_models,
        "OMNI": omni_models + ["Custom model (OpenAI compatible API)", "Custom model (ollama)"],
        "UITARS": (
            [
                "huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B",
            ]
            if is_mac
            else []
        )
        + ["Custom model (OpenAI compatible API)"],
    }

    # Apply saved settings
    initial_loop = saved_settings.get("agent_loop", "OMNI")
    available_models_for_loop = provider_to_models.get(initial_loop, [])
    saved_model_choice = saved_settings.get("model_choice")
    if saved_model_choice and saved_model_choice in available_models_for_loop:
        initial_model = saved_model_choice
    else:
        if initial_loop == "OPENAI":
            initial_model = openai_models[0] if openai_models else "No models available"
        elif initial_loop == "ANTHROPIC":
            initial_model = anthropic_models[0] if anthropic_models else "No models available"
        else:  # OMNI
            initial_model = (
                omni_models[0] if omni_models else "Custom model (OpenAI compatible API)"
            )

    initial_custom_model = saved_settings.get("custom_model", "Qwen2.5-VL-7B-Instruct")
    initial_provider_base_url = saved_settings.get("provider_base_url", "http://localhost:1234/v1")
    initial_save_trajectory = saved_settings.get("save_trajectory", True)
    initial_recent_images = saved_settings.get("recent_images", 3)

    # Example prompts
    example_messages = [
        "Create a Python virtual environment, install pandas and matplotlib, then plot stock data",
        "Open a PDF in Preview, add annotations, and save it as a compressed version",
        "Open Safari, search for 'macOS automation tools', and save the first three results as bookmarks",
        "Configure SSH keys and set up a connection to a remote server",
    ]

    def generate_python_code(
        agent_loop_choice,
        model_name,
        tasks,
        recent_images=3,
        save_trajectory=True,
        computer_os="linux",
        computer_provider="cloud",
        container_name="",
        cua_cloud_api_key="",
        max_budget=None,
    ):
        """Generate Python code for the current configuration and tasks."""
        tasks_str = ""
        for task in tasks:
            if task and task.strip():
                tasks_str += f'            "{task}",\n'

        model_string = get_model_string(model_name, agent_loop_choice)

        computer_args = []
        if computer_os != "macos":
            computer_args.append(f'os_type="{computer_os}"')
        if computer_provider != "lume":
            computer_args.append(f'provider_type="{computer_provider}"')
        if container_name:
            computer_args.append(f'name="{container_name}"')
        if cua_cloud_api_key:
            computer_args.append(f'api_key="{cua_cloud_api_key}"')

        computer_args_str = ", ".join(computer_args)
        if computer_args_str:
            computer_args_str = f"({computer_args_str})"
        else:
            computer_args_str = "()"

        code = f"""import asyncio
from computer import Computer
from agent import ComputerAgent

async def main():
    async with Computer{computer_args_str} as computer:
        agent = ComputerAgent(
            model="{model_string}",
            tools=[computer],
            only_n_most_recent_images={recent_images},"""

        if save_trajectory:
            code += """
            trajectory_dir="trajectories","""

        if max_budget:
            code += f"""
            max_trajectory_budget={{"max_budget": {max_budget}, "raise_error": True}},"""

        code += """
        )
        """

        if tasks_str:
            code += f"""
        # Prompts for the computer-use agent
        tasks = [
{tasks_str.rstrip()}
        ]

        for task in tasks:
            print(f"Executing task: {{task}}")
            messages = [{{"role": "user", "content": task}}]
            async for result in agent.run(messages):
                for item in result["output"]:
                    if item["type"] == "message":
                        print(item["content"][0]["text"])"""
        else:
            code += """
        # Execute a single task
        task = "Search for information about CUA on GitHub"
        print(f"Executing task: {task}")
        messages = [{"role": "user", "content": task}]
        async for result in agent.run(messages):
            for item in result["output"]:
                if item["type"] == "message":
                    print(item["content"][0]["text"])"""

        code += """

if __name__ == "__main__":
    asyncio.run(main())"""

        return code

    # Create the Gradio interface
    with gr.Blocks(title="Computer-Use Agent") as demo:
        with gr.Row():
            # Left column for settings
            with gr.Column(scale=1):
                # Logo
                gr.HTML(
                    """
                    <div style="display: flex; justify-content: center; margin-bottom: 0.5em">
                        <img alt="CUA Logo" style="width: 80px;"
                             src="https://github.com/trycua/cua/blob/main/img/logo_white.png?raw=true" />
                    </div>
                    """
                )

                # Python code accordion
                with gr.Accordion("Python Code", open=False):
                    code_display = gr.Code(
                        language="python",
                        value=generate_python_code(initial_loop, "gpt-4o", []),
                        interactive=False,
                    )

                with gr.Accordion("Computer Configuration", open=True):
                    is_windows = platform.system().lower() == "windows"
                    is_mac = platform.system().lower() == "darwin"

                    providers = ["cloud", "localhost", "docker"]
                    if is_mac:
                        providers += ["lume"]
                    if is_windows:
                        providers += ["winsandbox"]

                    # Remove unavailable options
                    # MacOS is unavailable if Lume is not available
                    # Windows is unavailable if Winsandbox is not available
                    # Linux is always available
                    # This should be removed once we support macOS and Windows on the cloud provider
                    computer_choices = ["macos", "linux", "windows"]
                    if not is_mac or "lume" not in providers:
                        computer_choices.remove("macos")
                    if not is_windows or "winsandbox" not in providers:
                        computer_choices.remove("windows")

                    computer_os = gr.Radio(
                        choices=computer_choices,
                        label="Operating System",
                        value=computer_choices[0],
                        info="Select the operating system for the computer",
                    )

                    computer_provider = gr.Radio(
                        choices=providers,
                        label="Provider",
                        value="lume" if is_mac else "cloud",
                        info="Select the computer provider",
                    )

                    container_name = gr.Textbox(
                        label="Container Name",
                        placeholder="Enter container name (optional)",
                        value=os.environ.get("CUA_CONTAINER_NAME", ""),
                        info="Optional name for the container",
                    )

                    cua_cloud_api_key = gr.Textbox(
                        label="CUA Cloud API Key",
                        placeholder="Enter your CUA Cloud API key",
                        value=os.environ.get("CUA_API_KEY", ""),
                        type="password",
                        info="Required for cloud provider",
                        visible=(not has_cua_key),
                    )

                with gr.Accordion("Agent Configuration", open=True):
                    agent_loop = gr.Dropdown(
                        choices=["OPENAI", "ANTHROPIC", "OMNI", "UITARS"],
                        label="Agent Loop",
                        value=initial_loop,
                        info="Select the agent loop provider",
                    )

                    # Model selection dropdowns
                    with gr.Group() as model_selection_group:
                        openai_model_choice = gr.Dropdown(
                            choices=openai_models,
                            label="OpenAI Model",
                            value=openai_models[0] if openai_models else "No models available",
                            info="Select OpenAI model",
                            interactive=True,
                            visible=(initial_loop == "OPENAI"),
                        )

                        anthropic_model_choice = gr.Dropdown(
                            choices=anthropic_models,
                            label="Anthropic Model",
                            value=(
                                anthropic_models[0] if anthropic_models else "No models available"
                            ),
                            info="Select Anthropic model",
                            interactive=True,
                            visible=(initial_loop == "ANTHROPIC"),
                        )

                        omni_model_choice = gr.Dropdown(
                            choices=omni_models
                            + ["Custom model (OpenAI compatible API)", "Custom model (ollama)"],
                            label="OMNI Model",
                            value=(
                                omni_models[0]
                                if omni_models
                                else "Custom model (OpenAI compatible API)"
                            ),
                            info="Select OMNI model or choose a custom model option",
                            interactive=True,
                            visible=(initial_loop == "OMNI"),
                        )

                        uitars_model_choice = gr.Dropdown(
                            choices=provider_to_models.get("UITARS", ["No models available"]),
                            label="UITARS Model",
                            value=(
                                provider_to_models.get("UITARS", ["No models available"])[0]
                                if provider_to_models.get("UITARS")
                                else "No models available"
                            ),
                            info="Select UITARS model",
                            interactive=True,
                            visible=(initial_loop == "UITARS"),
                        )

                        model_choice = gr.Textbox(visible=False)

                    # API key inputs
                    with gr.Group(
                        visible=not has_openai_key
                        and (initial_loop == "OPENAI" or initial_loop == "OMNI")
                    ) as openai_key_group:
                        openai_api_key_input = gr.Textbox(
                            label="OpenAI API Key",
                            placeholder="Enter your OpenAI API key",
                            value=os.environ.get("OPENAI_API_KEY", ""),
                            interactive=True,
                            type="password",
                            info="Required for OpenAI models",
                        )

                    with gr.Group(
                        visible=not has_anthropic_key
                        and (initial_loop == "ANTHROPIC" or initial_loop == "OMNI")
                    ) as anthropic_key_group:
                        anthropic_api_key_input = gr.Textbox(
                            label="Anthropic API Key",
                            placeholder="Enter your Anthropic API key",
                            value=os.environ.get("ANTHROPIC_API_KEY", ""),
                            interactive=True,
                            type="password",
                            info="Required for Anthropic models",
                        )

                    # API key handlers
                    def set_openai_api_key(key):
                        """Export a non-blank OpenAI key to the environment; echo the input back."""
                        stripped = (key or "").strip()
                        if stripped:
                            os.environ["OPENAI_API_KEY"] = stripped
                            print("DEBUG - Set OpenAI API key environment variable")
                        return key

                    def set_anthropic_api_key(key):
                        """Export a non-blank Anthropic key to the environment; echo the input back."""
                        stripped = (key or "").strip()
                        if stripped:
                            os.environ["ANTHROPIC_API_KEY"] = stripped
                            print("DEBUG - Set Anthropic API key environment variable")
                        return key

                    openai_api_key_input.change(
                        fn=set_openai_api_key,
                        inputs=[openai_api_key_input],
                        outputs=[openai_api_key_input],
                        queue=False,
                    )

                    anthropic_api_key_input.change(
                        fn=set_anthropic_api_key,
                        inputs=[anthropic_api_key_input],
                        outputs=[anthropic_api_key_input],
                        queue=False,
                    )

                    # UI update function
                    def update_ui(
                        loop=None,
                        openai_model=None,
                        anthropic_model=None,
                        omni_model=None,
                        uitars_model=None,
                    ):
                        """Recompute widget visibility for the selected agent loop/model.

                        Returns ten gr.update objects, in the same order as the
                        ``outputs`` wiring: the four model dropdowns, the two
                        API-key groups, the custom-model / base-URL / API-key
                        inputs, and the hidden model_choice textbox value.
                        """
                        # Fall back to the dropdown's current value when no loop is passed.
                        loop = loop or agent_loop.value

                        # Pick the model value from whichever dropdown matches the loop.
                        model_value = None
                        if loop == "OPENAI" and openai_model:
                            model_value = openai_model
                        elif loop == "ANTHROPIC" and anthropic_model:
                            model_value = anthropic_model
                        elif loop == "OMNI" and omni_model:
                            model_value = omni_model
                        elif loop == "UITARS" and uitars_model:
                            model_value = uitars_model

                        openai_visible = loop == "OPENAI"
                        anthropic_visible = loop == "ANTHROPIC"
                        omni_visible = loop == "OMNI"
                        uitars_visible = loop == "UITARS"

                        # Key inputs are shown only when no key is already in the
                        # environment AND the loop/model actually needs that provider.
                        # NOTE(review): when model_value is None these expressions can
                        # evaluate to None rather than False; gr.update(visible=None)
                        # presumably leaves visibility unchanged — confirm intended.
                        show_openai_key = not has_openai_key and (
                            loop == "OPENAI"
                            or (
                                loop == "OMNI"
                                and model_value
                                and "OpenAI" in model_value
                                and "Custom" not in model_value
                            )
                        )
                        show_anthropic_key = not has_anthropic_key and (
                            loop == "ANTHROPIC"
                            or (
                                loop == "OMNI"
                                and model_value
                                and "Claude" in model_value
                                and "Custom" not in model_value
                            )
                        )

                        is_custom_openai_api = model_value == "Custom model (OpenAI compatible API)"
                        is_custom_ollama = model_value == "Custom model (ollama)"
                        is_any_custom = is_custom_openai_api or is_custom_ollama

                        model_choice_value = model_value if model_value else ""

                        return [
                            gr.update(visible=openai_visible),
                            gr.update(visible=anthropic_visible),
                            gr.update(visible=omni_visible),
                            gr.update(visible=uitars_visible),
                            gr.update(visible=show_openai_key),
                            gr.update(visible=show_anthropic_key),
                            gr.update(visible=is_any_custom),  # custom_model textbox
                            gr.update(visible=is_custom_openai_api),  # provider_base_url
                            gr.update(visible=is_custom_openai_api),  # provider_api_key
                            gr.update(value=model_choice_value),  # hidden model_choice
                        ]

                    # Custom model inputs
                    custom_model = gr.Textbox(
                        label="Custom Model Name",
                        placeholder="Enter custom model name (e.g., Qwen2.5-VL-7B-Instruct or llama3)",
                        value=initial_custom_model,
                        visible=(
                            initial_model == "Custom model (OpenAI compatible API)"
                            or initial_model == "Custom model (ollama)"
                        ),
                        interactive=True,
                    )

                    provider_base_url = gr.Textbox(
                        label="Provider Base URL",
                        placeholder="Enter provider base URL (e.g., http://localhost:1234/v1)",
                        value=initial_provider_base_url,
                        visible=(initial_model == "Custom model (OpenAI compatible API)"),
                        interactive=True,
                    )

                    provider_api_key = gr.Textbox(
                        label="Provider API Key",
                        placeholder="Enter provider API key (if required)",
                        value="",
                        visible=(initial_model == "Custom model (OpenAI compatible API)"),
                        interactive=True,
                        type="password",
                    )

                    # Provider visibility update function
                    def update_provider_visibility(provider):
                        """Toggle container-name/API-key fields: hidden for localhost, shown otherwise."""
                        needs_remote_fields = provider != "localhost"
                        return [
                            # container_name
                            gr.update(visible=needs_remote_fields),
                            # cua_cloud_api_key (also hidden when a key is already configured)
                            gr.update(visible=needs_remote_fields and not has_cua_key),
                        ]

                    # Connect provider change event
                    computer_provider.change(
                        fn=update_provider_visibility,
                        inputs=[computer_provider],
                        outputs=[container_name, cua_cloud_api_key],
                        queue=False,
                    )

                    # Connect UI update events
                    for dropdown in [
                        agent_loop,
                        omni_model_choice,
                        uitars_model_choice,
                        openai_model_choice,
                        anthropic_model_choice,
                    ]:
                        dropdown.change(
                            fn=update_ui,
                            inputs=[
                                agent_loop,
                                openai_model_choice,
                                anthropic_model_choice,
                                omni_model_choice,
                                uitars_model_choice,
                            ],
                            outputs=[
                                openai_model_choice,
                                anthropic_model_choice,
                                omni_model_choice,
                                uitars_model_choice,
                                openai_key_group,
                                anthropic_key_group,
                                custom_model,
                                provider_base_url,
                                provider_api_key,
                                model_choice,
                            ],
                            queue=False,
                        )

                    save_trajectory = gr.Checkbox(
                        label="Save Trajectory",
                        value=initial_save_trajectory,
                        info="Save the agent's trajectory for debugging",
                        interactive=True,
                    )

                    recent_images = gr.Slider(
                        label="Recent Images",
                        minimum=1,
                        maximum=10,
                        value=initial_recent_images,
                        step=1,
                        info="Number of recent images to keep in context",
                        interactive=True,
                    )

                    max_budget = gr.Number(
                        label="Max Budget ($)",
                        value=lambda: None,
                        minimum=-1,
                        maximum=100.0,
                        step=0.1,
                        info="Optional budget limit for trajectory (0 = no limit)",
                        interactive=True,
                    )

            # Right column for chat interface
            with gr.Column(scale=2):
                gr.Markdown(
                    "Ask me to perform tasks in a virtual environment.<br>Built with <a href='https://github.com/trycua/cua' target='_blank'>github.com/trycua/cua</a>."
                )

                chatbot_history = gr.Chatbot(type="messages")
                msg = gr.Textbox(placeholder="Ask me to perform tasks in a virtual environment")
                clear = gr.Button("Clear")
                cancel_button = gr.Button("Cancel", variant="stop")

                # Add examples
                example_group = gr.Examples(examples=example_messages, inputs=msg)

                # Chat submission function
                def chat_submit(message, history):
                    """Record the submitted user message in the chat history and clear the input box."""
                    user_entry = gr.ChatMessage(role="user", content=message)
                    history += [user_entry]  # in-place extend, same list object
                    return "", history

                # Cancel function
                async def cancel_agent_task(history):
                    """Append a cancellation notice (or an info note when idle) to the chat history."""
                    global global_agent
                    if global_agent:
                        # An agent exists: surface the cancellation to the user.
                        print("DEBUG - Cancelling agent task")
                        notice = gr.ChatMessage(
                            role="assistant",
                            content="Task cancelled by user",
                            metadata={"title": "❌ Cancelled"},
                        )
                    else:
                        notice = gr.ChatMessage(
                            role="assistant",
                            content="No active agent task to cancel",
                            metadata={"title": "ℹ️ Info"},
                        )
                    history.append(notice)
                    return history

                # Process response function
                # Process response function
                async def process_response(
                    history,
                    openai_model_value,
                    anthropic_model_value,
                    omni_model_value,
                    uitars_model_value,
                    custom_model_value,
                    agent_loop_choice,
                    save_traj,
                    recent_imgs,
                    custom_url_value=None,
                    custom_api_key=None,
                    openai_key_input=None,
                    anthropic_key_input=None,
                    computer_os="linux",
                    computer_provider="cloud",
                    container_name="",
                    cua_cloud_api_key="",
                    max_budget_value=None,
                ):
                    """Run the agent on the latest user message, streaming chat updates.

                    Builds (or rebuilds) the global agent from the current UI settings,
                    appends the last user message to the shared conversation log, then
                    yields ``history`` after each agent result so Gradio re-renders the
                    chatbot incrementally. Errors are caught and surfaced in the chat.
                    """
                    # Bug fix: declare BOTH globals up front. Previously only
                    # global_messages was declared (and only mid-function), so the
                    # assignment below created a function-local global_agent and
                    # cancel_agent_task() could never see the active agent.
                    global global_agent, global_messages

                    if not history:
                        yield history
                        return

                    # The last history entry is the user turn appended by chat_submit.
                    last_user_message = history[-1]["content"]

                    # Pick the model dropdown value matching the selected agent loop.
                    if agent_loop_choice == "OPENAI":
                        model_choice_value = openai_model_value
                    elif agent_loop_choice == "ANTHROPIC":
                        model_choice_value = anthropic_model_value
                    elif agent_loop_choice == "OMNI":
                        model_choice_value = omni_model_value
                    elif agent_loop_choice == "UITARS":
                        model_choice_value = uitars_model_value
                    else:
                        model_choice_value = "No models available"

                    # The "Custom model" dropdown choices defer to the free-form textbox.
                    is_custom_model_selected = model_choice_value in [
                        "Custom model (OpenAI compatible API)",
                        "Custom model (ollama)",
                    ]
                    if is_custom_model_selected:
                        model_string_to_analyze = custom_model_value
                    else:
                        model_string_to_analyze = model_choice_value

                    try:
                        # Resolve the provider-qualified model string.
                        model_string = get_model_string(model_string_to_analyze, agent_loop_choice)

                        # Export user-supplied keys so downstream SDKs pick them up.
                        if openai_key_input:
                            os.environ["OPENAI_API_KEY"] = openai_key_input
                        if anthropic_key_input:
                            os.environ["ANTHROPIC_API_KEY"] = anthropic_key_input
                        if cua_cloud_api_key:
                            os.environ["CUA_API_KEY"] = cua_cloud_api_key

                        # Persist the current UI configuration for the next session.
                        # NOTE(review): max_budget_value is not persisted — confirm
                        # whether save_settings should include it.
                        current_settings = {
                            "agent_loop": agent_loop_choice,
                            "model_choice": model_choice_value,
                            "custom_model": custom_model_value,
                            "provider_base_url": custom_url_value,
                            "save_trajectory": save_traj,
                            "recent_images": recent_imgs,
                            "computer_os": computer_os,
                            "computer_provider": computer_provider,
                            "container_name": container_name,
                        }
                        save_settings(current_settings)

                        # (Re)create the agent with the current configuration.
                        global_agent = create_agent(
                            model_string=model_string,
                            save_trajectory=save_traj,
                            only_n_most_recent_images=recent_imgs,
                            custom_model_name=(
                                custom_model_value if is_custom_model_selected else None
                            ),
                            computer_os=computer_os,
                            computer_provider=computer_provider,
                            computer_name=container_name,
                            computer_api_key=cua_cloud_api_key,
                            verbosity=logging.DEBUG,
                            max_trajectory_budget=(
                                max_budget_value
                                if max_budget_value and max_budget_value > 0
                                else None
                            ),
                        )

                        if global_agent is None:
                            history.append(
                                gr.ChatMessage(
                                    role="assistant",
                                    content="Failed to create agent. Check API keys and configuration.",
                                )
                            )
                            yield history
                            return

                        # Record the user turn in the shared conversation log.
                        global_messages.append({"role": "user", "content": last_user_message})

                        # Stream results; each result carries a list of output items
                        # which are rendered as chat messages by type.
                        async for result in global_agent.run(global_messages):
                            global_messages += result.get("output", [])

                            for item in result.get("output", []):
                                item_type = item.get("type")
                                if item_type == "message":
                                    # Plain text message parts from the agent.
                                    for content_part in item.get("content", []):
                                        if content_part.get("text"):
                                            history.append(
                                                gr.ChatMessage(
                                                    role=item.get("role", "assistant"),
                                                    content=content_part.get("text", ""),
                                                    metadata=content_part.get("metadata", {}),
                                                )
                                            )
                                elif item_type == "computer_call":
                                    action = item.get("action", {})
                                    action_type = action.get("type", "")
                                    if action_type:
                                        action_title = f"🛠️ Performing {action_type}"
                                        # Bug fix: compare against None so a valid
                                        # (0, 0) coordinate still shows in the title.
                                        if action.get("x") is not None and action.get("y") is not None:
                                            action_title += f" at ({action['x']}, {action['y']})"
                                        history.append(
                                            gr.ChatMessage(
                                                role="assistant",
                                                content=f"```json\n{json.dumps(action)}\n```",
                                                metadata={"title": action_title},
                                            )
                                        )
                                elif item_type == "function_call":
                                    function_name = item.get("name", "")
                                    arguments = item.get("arguments", "{}")
                                    history.append(
                                        gr.ChatMessage(
                                            role="assistant",
                                            content=f"🔧 Calling function: {function_name}\n```json\n{arguments}\n```",
                                            metadata={"title": f"Function Call: {function_name}"},
                                        )
                                    )
                                elif item_type == "function_call_output":
                                    output = item.get("output", "")
                                    history.append(
                                        gr.ChatMessage(
                                            role="assistant",
                                            content=f"📤 Function output:\n```\n{output}\n```",
                                            metadata={"title": "Function Output"},
                                        )
                                    )
                                elif item_type == "computer_call_output":
                                    # Screenshot (data URL) returned by the computer tool.
                                    output = item.get("output", {}).get("image_url", "")
                                    history.append(
                                        gr.ChatMessage(
                                            role="assistant",
                                            content=f"![Computer output]({output})",
                                            metadata={"title": "🖥️ Computer Output"},
                                        )
                                    )

                            yield history

                    except Exception as e:
                        import traceback

                        traceback.print_exc()
                        history.append(gr.ChatMessage(role="assistant", content=f"Error: {str(e)}"))
                        yield history

                # Connect the submit button
                submit_event = msg.submit(
                    fn=chat_submit,
                    inputs=[msg, chatbot_history],
                    outputs=[msg, chatbot_history],
                    queue=False,
                ).then(
                    fn=process_response,
                    inputs=[
                        chatbot_history,
                        openai_model_choice,
                        anthropic_model_choice,
                        omni_model_choice,
                        uitars_model_choice,
                        custom_model,
                        agent_loop,
                        save_trajectory,
                        recent_images,
                        provider_base_url,
                        provider_api_key,
                        openai_api_key_input,
                        anthropic_api_key_input,
                        computer_os,
                        computer_provider,
                        container_name,
                        cua_cloud_api_key,
                        max_budget,
                    ],
                    outputs=[chatbot_history],
                    queue=True,
                )

                # Clear button functionality
                # Clear button functionality
                def clear_chat():
                    """Wipe the shared conversation log and empty the chatbot widget."""
                    global global_messages
                    del global_messages[:]
                    return None

                clear.click(clear_chat, None, chatbot_history, queue=False)

                # Connect cancel button
                cancel_button.click(
                    cancel_agent_task, [chatbot_history], [chatbot_history], queue=False
                )

                # Code display update function
                # Code display update function
                def update_code_display(
                    agent_loop,
                    model_choice_val,
                    custom_model_val,
                    chat_history,
                    recent_images_val,
                    save_trajectory_val,
                    computer_os,
                    computer_provider,
                    container_name,
                    cua_cloud_api_key,
                    max_budget_val,
                ):
                    """Regenerate the Python snippet shown in the code panel.

                    Collects the text of every user turn from *chat_history* and
                    feeds it, with the current configuration, to generate_python_code.
                    """
                    user_texts = [
                        entry.get("content", "")
                        for entry in (chat_history or [])
                        if isinstance(entry, dict) and entry.get("role") == "user"
                    ]

                    return generate_python_code(
                        agent_loop,
                        model_choice_val or custom_model_val or "gpt-4o",
                        user_texts,
                        recent_images_val,
                        save_trajectory_val,
                        computer_os,
                        computer_provider,
                        container_name,
                        cua_cloud_api_key,
                        max_budget_val,
                    )

                # Update code display when configuration changes
                for component in [
                    agent_loop,
                    model_choice,
                    custom_model,
                    chatbot_history,
                    recent_images,
                    save_trajectory,
                    computer_os,
                    computer_provider,
                    container_name,
                    cua_cloud_api_key,
                    max_budget,
                ]:
                    component.change(
                        update_code_display,
                        inputs=[
                            agent_loop,
                            model_choice,
                            custom_model,
                            chatbot_history,
                            recent_images,
                            save_trajectory,
                            computer_os,
                            computer_provider,
                            container_name,
                            cua_cloud_api_key,
                            max_budget,
                        ],
                        outputs=[code_display],
                    )

    return demo

```

--------------------------------------------------------------------------------
/libs/lume/src/LumeController.swift:
--------------------------------------------------------------------------------

```swift
import ArgumentParser
import Foundation
import Virtualization

// MARK: - Shared VM Manager

@MainActor
final class SharedVM {
    /// Process-wide registry of currently running VMs, keyed by VM name.
    /// Confined to the main actor so lookups and mutations never race.
    static let shared: SharedVM = SharedVM()

    /// Backing store: VM name -> running VM instance.
    private var activeVMs: [String: VM] = [:]

    private init() {}

    /// Returns the running VM registered under `name`, if any.
    func getVM(name: String) -> VM? {
        activeVMs[name]
    }

    /// Registers (or replaces) the running VM under `name`.
    func setVM(name: String, vm: VM) {
        activeVMs[name] = vm
    }

    /// Drops the VM registered under `name`, if present.
    func removeVM(name: String) {
        activeVMs[name] = nil
    }
}

/// Entrypoint for Commands and API server
final class LumeController {
    // MARK: - Properties

    let home: Home
    private let imageLoaderFactory: ImageLoaderFactory
    private let vmFactory: VMFactory

    // MARK: - Initialization

    /// Creates a controller.
    ///
    /// - Parameters:
    ///   - home: Filesystem abstraction over VM storage locations.
    ///   - imageLoaderFactory: Factory producing image loaders for pulls.
    ///   - vmFactory: Factory producing concrete `VM` instances.
    init(
        home: Home = Home(),
        imageLoaderFactory: ImageLoaderFactory = DefaultImageLoaderFactory(),
        vmFactory: VMFactory = DefaultVMFactory()
    ) {
        self.home = home
        self.imageLoaderFactory = imageLoaderFactory
        self.vmFactory = vmFactory
    }

    // MARK: - Public VM Management Methods

    /// Lists all virtual machines in the system
    ///
    /// - Parameter storage: Optional filter. A value containing "/" or "\\" is
    ///   treated as a direct filesystem path to scan; any other non-nil value is
    ///   treated as a named storage location. `nil` lists VMs from every location.
    /// - Returns: `VMDetails` for each valid, initialized VM matching the filter.
    /// - Throws: Errors from directory enumeration or VM loading; all are logged
    ///   before rethrowing.
    @MainActor
    public func list(storage: String? = nil) throws -> [VMDetails] {
        do {
            if let storage = storage {
                // If storage is specified, only return VMs from that location
                if storage.contains("/") || storage.contains("\\") {
                    // Direct path - check if it exists
                    if !FileManager.default.fileExists(atPath: storage) {
                        // Return empty array if the path doesn't exist
                        return []
                    }
                    
                    // Try to get all VMs from the specified path
                    // We need to check which subdirectories are valid VM dirs
                    let directoryURL = URL(fileURLWithPath: storage)
                    let contents = try FileManager.default.contentsOfDirectory(
                        at: directoryURL,
                        includingPropertiesForKeys: [.isDirectoryKey],
                        options: .skipsHiddenFiles
                    )
                    
                    let statuses = try contents.compactMap { subdir -> VMDetails? in
                        // Skip plain files; only directories can be VM bundles.
                        guard let isDirectory = try subdir.resourceValues(forKeys: [.isDirectoryKey]).isDirectory,
                              isDirectory else {
                            return nil
                        }
                        
                        let vmName = subdir.lastPathComponent
                        // Check if it's a valid VM directory
                        // NOTE(review): errors thrown by this call propagate out of
                        // list(), unlike the inner do/catch below which skips bad
                        // directories — confirm this asymmetry is intended.
                        let vmDir = try home.getVMDirectoryFromPath(vmName, storagePath: storage)
                        if !vmDir.initialized() {
                            return nil
                        }
                        
                        do {
                            let vm = try self.get(name: vmName, storage: storage)
                            return vm.details
                        } catch {
                            // Skip invalid VM directories
                            return nil
                        }
                    }
                    return statuses
                } else {
                    // Named storage
                    let vmsWithLoc = try home.getAllVMDirectories()
                    let statuses = try vmsWithLoc.compactMap { vmWithLoc -> VMDetails? in
                        // Only include VMs from the specified location
                        if vmWithLoc.locationName != storage {
                            return nil
                        }
                        let vm = try self.get(
                            name: vmWithLoc.directory.name, storage: vmWithLoc.locationName)
                        return vm.details
                    }
                    return statuses
                }
            } else {
                // No storage filter - get all VMs
                let vmsWithLoc = try home.getAllVMDirectories()
                let statuses = try vmsWithLoc.compactMap { vmWithLoc -> VMDetails? in
                    let vm = try self.get(
                        name: vmWithLoc.directory.name, storage: vmWithLoc.locationName)
                    return vm.details
                }
                return statuses
            }
        } catch {
            Logger.error("Failed to list VMs", metadata: ["error": error.localizedDescription])
            throw error
        }
    }

    /// Clones an existing, stopped VM into a new name/location.
    ///
    /// The clone receives a fresh MAC address and machine identifier so it can
    /// run alongside the source VM without conflicts.
    ///
    /// - Parameters:
    ///   - name: Source VM name (normalized before lookup).
    ///   - newName: Destination VM name (normalized before creation).
    ///   - sourceLocation: Named storage/path of the source; `nil` = default.
    ///   - destLocation: Named storage/path of the destination; `nil` = default.
    /// - Throws: `VMError.alreadyRunning` if the source is running,
    ///   `HomeError.directoryAlreadyExists` if the destination exists, or any
    ///   copy/configuration error (all logged before rethrowing).
    @MainActor
    public func clone(
        name: String, newName: String, sourceLocation: String? = nil, destLocation: String? = nil
    ) throws {
        let normalizedName = normalizeVMName(name: name)
        let normalizedNewName = normalizeVMName(name: newName)
        Logger.info(
            "Cloning VM",
            metadata: [
                "source": normalizedName,
                "destination": normalizedNewName,
                "sourceLocation": sourceLocation ?? "default",
                "destLocation": destLocation ?? "default",
            ])

        do {
            // Validate source VM exists
            _ = try self.validateVMExists(normalizedName, storage: sourceLocation)

            // Get the source VM and check if it's running
            let sourceVM = try get(name: normalizedName, storage: sourceLocation)
            if sourceVM.details.status == "running" {
                Logger.error("Cannot clone a running VM", metadata: ["source": normalizedName])
                throw VMError.alreadyRunning(normalizedName)
            }

            // Check if destination already exists
            do {
                let destDir = try home.getVMDirectory(normalizedNewName, storage: destLocation)
                if destDir.exists() {
                    Logger.error(
                        "Destination VM already exists",
                        metadata: ["destination": normalizedNewName])
                    throw HomeError.directoryAlreadyExists(path: destDir.dir.path)
                }
            } catch VMLocationError.locationNotFound {
                // Location not found is okay, we'll create it
            } catch VMError.notFound {
                // VM not found is okay, we'll create it
            }

            // Copy the VM directory
            try home.copyVMDirectory(
                from: normalizedName,
                to: normalizedNewName,
                sourceLocation: sourceLocation,
                destLocation: destLocation
            )

            // Update MAC address in the cloned VM to ensure uniqueness
            let clonedVM = try get(name: normalizedNewName, storage: destLocation)
            try clonedVM.setMacAddress(VZMACAddress.randomLocallyAdministered().string)

            // Update MAC Identifier in the cloned VM to ensure uniqueness
            try clonedVM.setMachineIdentifier(
                DarwinVirtualizationService.generateMachineIdentifier())

            Logger.info(
                "VM cloned successfully",
                metadata: ["source": normalizedName, "destination": normalizedNewName])
        } catch {
            Logger.error("Failed to clone VM", metadata: ["error": error.localizedDescription])
            throw error
        }
    }

    /// Loads a VM by name from a named storage location or a direct path.
    ///
    /// - Parameters:
    ///   - name: VM name (normalized before lookup).
    ///   - storage: `nil` or a named location; treated as a direct filesystem
    ///     path when it contains "/" or "\\".
    /// - Returns: The loaded `VM`.
    /// - Throws: `VMError.notFound` / `VMError.notInitialized` for a direct
    ///   path, or whatever `validateVMExists`/`loadVM` throw for named locations
    ///   (original error type preserved; logged before rethrowing).
    @MainActor
    public func get(name: String, storage: String? = nil) throws -> VM {
        let normalizedName = normalizeVMName(name: name)
        do {
            let vm: VM
            if let storagePath = storage, storagePath.contains("/") || storagePath.contains("\\") {
                // Storage is a direct path
                let vmDir = try home.getVMDirectoryFromPath(normalizedName, storagePath: storagePath)
                guard vmDir.initialized() else {
                    // Throw a specific error if the directory exists but isn't a valid VM
                    if vmDir.exists() {
                        throw VMError.notInitialized(normalizedName)
                    } else {
                        throw VMError.notFound(normalizedName)
                    }
                }
                // Pass the path as the storage context
                vm = try self.loadVM(vmDir: vmDir, storage: storagePath)
            } else {
                // Storage is nil or a named location
                let actualLocation = try self.validateVMExists(
                    normalizedName, storage: storage)

                let vmDir = try home.getVMDirectory(normalizedName, storage: actualLocation)
                // loadVM will re-check initialized, but good practice to keep validateVMExists result.
                vm = try self.loadVM(vmDir: vmDir, storage: actualLocation)
            }
            return vm
        } catch {
            Logger.error(
                "Failed to get VM",
                metadata: [
                    "vmName": normalizedName, "storage": storage ?? "default",
                    "error": error.localizedDescription,
                ])
            // Re-throw the original error to preserve its type
            throw error
        }
    }

    /// Creates a new VM: validates parameters, builds a temporary configuration,
    /// runs guest setup, then finalizes the VM into the target storage.
    ///
    /// - Parameters:
    ///   - name: Name for the new VM.
    ///   - os: Guest OS identifier (validated by `validateCreateParameters`).
    ///   - diskSize: Disk size in bytes.
    ///   - cpuCount: Number of virtual CPUs.
    ///   - memorySize: Memory size in bytes.
    ///   - display: Display resolution string.
    ///   - ipsw: Optional restore-image path.
    ///   - storage: Target named storage or path; `nil` = default.
    /// - Throws: Validation or setup errors; logged before rethrowing.
    @MainActor
    public func create(
        name: String,
        os: String,
        diskSize: UInt64,
        cpuCount: Int,
        memorySize: UInt64,
        display: String,
        ipsw: String?,
        storage: String? = nil
    ) async throws {
        Logger.info(
            "Creating VM",
            metadata: [
                "name": name,
                "os": os,
                "location": storage ?? "default",
                "disk_size": "\(diskSize / 1024 / 1024)MB",
                "cpu_count": "\(cpuCount)",
                "memory_size": "\(memorySize / 1024 / 1024)MB",
                "display": display,
                "ipsw": ipsw ?? "none",
            ])

        do {
            try validateCreateParameters(name: name, os: os, ipsw: ipsw, storage: storage)

            let vm = try await createTempVMConfig(
                os: os,
                cpuCount: cpuCount,
                memorySize: memorySize,
                diskSize: diskSize,
                display: display
            )

            // NOTE(review): the literal "none" stands in for a missing ipsw path —
            // confirm vm.setup treats "none" as "no restore image".
            try await vm.setup(
                ipswPath: ipsw ?? "none",
                cpuCount: cpuCount,
                memorySize: memorySize,
                diskSize: diskSize,
                display: display
            )

            // Move the fully configured VM into its permanent storage location.
            try vm.finalize(to: name, home: home, storage: storage)

            Logger.info("VM created successfully", metadata: ["name": name])
        } catch {
            Logger.error("Failed to create VM", metadata: ["error": error.localizedDescription])
            throw error
        }
    }

    /// Deletes a VM's directory, stopping the VM first if it is cached as running.
    ///
    /// - Parameters:
    ///   - name: VM name (normalized before lookup).
    ///   - storage: `nil`, a named location, or a direct path (contains "/" or "\\").
    /// - Throws: `VMError.notFound` / `VMError.notInitialized` for an invalid
    ///   direct path, or any stop/delete error (logged before rethrowing).
    @MainActor
    public func delete(name: String, storage: String? = nil) async throws {
        let normalizedName = normalizeVMName(name: name)
        Logger.info(
            "Deleting VM",
            metadata: [
                "name": normalizedName,
                "location": storage ?? "default",
            ])

        do {
            let vmDir: VMDirectory
            
            // Check if storage is a direct path
            if let storagePath = storage, storagePath.contains("/") || storagePath.contains("\\") {
                // Storage is a direct path
                vmDir = try home.getVMDirectoryFromPath(normalizedName, storagePath: storagePath)
                guard vmDir.initialized() else {
                    // Throw a specific error if the directory exists but isn't a valid VM
                    if vmDir.exists() {
                        throw VMError.notInitialized(normalizedName)
                    } else {
                        throw VMError.notFound(normalizedName)
                    }
                }
            } else {
                // Storage is nil or a named location
                let actualLocation = try self.validateVMExists(normalizedName, storage: storage)
                vmDir = try home.getVMDirectory(normalizedName, storage: actualLocation)
            }
            
            // Stop VM if it's running
            // NOTE(review): `storage` is not forwarded to stopVM here, so stopping
            // a VM kept at a direct path may resolve differently — confirm against
            // stopVM's location handling.
            if SharedVM.shared.getVM(name: normalizedName) != nil {
                try await stopVM(name: normalizedName)
            }
            
            try vmDir.delete()
            
            Logger.info("VM deleted successfully", metadata: ["name": normalizedName])
            
        } catch {
            Logger.error("Failed to delete VM", metadata: ["error": error.localizedDescription])
            throw error
        }
    }

    // MARK: - VM Operations

    /// Applies any provided hardware/display settings to a VM.
    ///
    /// Each parameter is optional; `nil` means "leave unchanged". Setters are
    /// applied in a fixed order: CPU, memory, disk, display.
    ///
    /// - Throws: If the VM cannot be found or loaded, or if a setter fails
    ///   (logged before rethrowing).
    @MainActor
    public func updateSettings(
        name: String,
        cpu: Int? = nil,
        memory: UInt64? = nil,
        diskSize: UInt64? = nil,
        display: String? = nil,
        storage: String? = nil
    ) throws {
        let normalizedName = normalizeVMName(name: name)
        Logger.info(
            "Updating VM settings",
            metadata: [
                "name": normalizedName,
                "location": storage ?? "default",
                "cpu": cpu.map { "\($0)" } ?? "unchanged",
                "memory": memory.map { "\($0 / 1024 / 1024)MB" } ?? "unchanged",
                "disk_size": diskSize.map { "\($0 / 1024 / 1024)MB" } ?? "unchanged",
                "display": display ?? "unchanged",
            ])
        do {
            // Resolve where the VM actually lives, then load it.
            let resolvedLocation = try self.validateVMExists(normalizedName, storage: storage)
            let vm = try get(name: normalizedName, storage: resolvedLocation)

            // Apply each requested change in the documented order.
            if let newCpuCount = cpu {
                try vm.setCpuCount(newCpuCount)
            }
            if let newMemorySize = memory {
                try vm.setMemorySize(newMemorySize)
            }
            if let newDiskSize = diskSize {
                try vm.setDiskSize(newDiskSize)
            }
            if let newDisplay = display {
                try vm.setDisplay(newDisplay)
            }

            Logger.info("VM settings updated successfully", metadata: ["name": normalizedName])
        } catch {
            Logger.error(
                "Failed to update VM settings", metadata: ["error": error.localizedDescription])
            throw error
        }
    }

    /// Stops a VM, preferring the cached running instance when available.
    ///
    /// The shared cache entry is removed whether or not the stop succeeds, so a
    /// failed stop never leaves a stale "running" record behind.
    @MainActor
    public func stopVM(name: String, storage: String? = nil) async throws {
        let normalizedName = normalizeVMName(name: name)
        Logger.info("Stopping VM", metadata: ["name": normalizedName])

        do {
            // Resolve where the VM actually lives.
            let actualLocation = try self.validateVMExists(
                normalizedName, storage: storage)

            // Prefer the cached running instance; fall back to loading from disk.
            let vm = try SharedVM.shared.getVM(name: normalizedName)
                ?? get(name: normalizedName, storage: actualLocation)

            try await vm.stop()
            // Remove VM from cache after stopping
            SharedVM.shared.removeVM(name: normalizedName)
            Logger.info("VM stopped successfully", metadata: ["name": normalizedName])
        } catch {
            // Evict from cache even on failure so state doesn't go stale.
            SharedVM.shared.removeVM(name: normalizedName)
            Logger.error("Failed to stop VM", metadata: ["error": error.localizedDescription])
            throw error
        }
    }

    /// Boots a VM and keeps it registered in the shared VM cache while it runs.
    ///
    /// If `name` looks like an image reference ("image:tag") and no VM with the
    /// normalized name exists yet, the image is pulled automatically before the
    /// run. `storage` may be either a named storage location or a direct
    /// filesystem path (detected by the presence of a path separator).
    ///
    /// - Parameters:
    ///   - name: VM name or image reference; colons are normalized to underscores.
    ///   - noDisplay: When true, run headless without opening a VNC client.
    ///   - sharedDirectories: Host directories to expose inside the guest.
    ///   - mount: Optional disk image to attach (Linux guests only).
    ///   - registry: Registry host used if an auto-pull is required.
    ///   - organization: Registry organization used if an auto-pull is required.
    ///   - vncPort: Requested VNC port; 0 lets the system pick one.
    ///   - recoveryMode: Boot macOS guests into recovery mode.
    ///   - storage: Named storage location or direct path; nil uses the default.
    ///   - usbMassStoragePaths: Disk images exposed as USB devices (macOS 15+).
    /// - Throws: `VMError.notFound`/`VMError.notInitialized` when the VM is
    ///   missing or incomplete, `ValidationError` for bad parameters, plus any
    ///   error raised while pulling the image or running the VM. On failure the
    ///   VM is removed from the shared cache before rethrowing.
    @MainActor
    public func runVM(
        name: String,
        noDisplay: Bool = false,
        sharedDirectories: [SharedDirectory] = [],
        mount: Path? = nil,
        registry: String = "ghcr.io",
        organization: String = "trycua",
        vncPort: Int = 0,
        recoveryMode: Bool = false,
        storage: String? = nil,
        usbMassStoragePaths: [Path]? = nil
    ) async throws {
        let normalizedName = normalizeVMName(name: name)
        Logger.info(
            "Running VM",
            metadata: [
                "name": normalizedName,
                "no_display": "\(noDisplay)",
                "shared_directories":
                    "\(sharedDirectories.map( { $0.string } ).joined(separator: ", "))",
                "mount": mount?.path ?? "none",
                "vnc_port": "\(vncPort)",
                "recovery_mode": "\(recoveryMode)",
                "storage_param": storage ?? "default", // Log the original param
                "usb_storage_devices": "\(usbMassStoragePaths?.count ?? 0)",
            ])

        do {
            // Check if name is an image ref to auto-pull
            let components = normalizedName.split(separator: ":")
            if components.count == 2 { // Check if it looks like image:tag
                // Attempt to validate if VM exists first, suppressing the error
                // This avoids pulling if the VM already exists, even if name looks like an image ref
                let vmExists = (try? self.validateVMExists(normalizedName, storage: storage)) != nil
                if !vmExists {
                    Logger.info(
                        "VM not found, attempting to pull image based on name",
                        metadata: ["imageRef": normalizedName])
                    // Use the potentially new VM name derived from the image ref
                    let potentialVMName = String(components[0])
                    try await pullImage(
                        image: normalizedName, // Full image ref
                        name: potentialVMName, // Name derived from image
                        registry: registry,
                        organization: organization,
                        storage: storage
                    )
                    // Important: After pull, the effective name might have changed
                    // We proceed assuming the user wants to run the VM derived from image name
                    // normalizedName = potentialVMName // Re-assign normalizedName if pull logic creates it
                    // Note: Current pullImage doesn't return the final VM name,
                    // so we assume it matches the name derived from the image.
                    // This might need refinement if pullImage behaviour changes.
                }
            }

            // Determine effective storage path or name AND get the VMDirectory
            let effectiveStorage: String?
            let vmDir: VMDirectory

            if let storagePath = storage, storagePath.contains("/") || storagePath.contains("\\") {
                // Storage is a direct path
                vmDir = try home.getVMDirectoryFromPath(normalizedName, storagePath: storagePath)
                guard vmDir.initialized() else {
                    if vmDir.exists() {
                        throw VMError.notInitialized(normalizedName)
                    } else {
                        throw VMError.notFound(normalizedName)
                    }
                }
                effectiveStorage = storagePath // Use the path string
                Logger.info("Using direct storage path", metadata: ["path": storagePath])
            } else {
                // Storage is nil or a named location - validate and get the actual name
                let actualLocationName = try validateVMExists(normalizedName, storage: storage)
                vmDir = try home.getVMDirectory(normalizedName, storage: actualLocationName) // Get VMDir for named location
                effectiveStorage = actualLocationName // Use the named location string
                Logger.info(
                    "Using named storage location",
                    metadata: [
                        "requested": storage ?? "default",
                        "actual": actualLocationName ?? "default",
                    ])
            }

            // Validate parameters using the located VMDirectory
            try validateRunParameters(
                vmDir: vmDir, // Pass vmDir
                sharedDirectories: sharedDirectories,
                mount: mount,
                usbMassStoragePaths: usbMassStoragePaths
            )

            // Load the VM directly using the located VMDirectory and storage context
            let vm = try self.loadVM(vmDir: vmDir, storage: effectiveStorage)

            SharedVM.shared.setVM(name: normalizedName, vm: vm)
            try await vm.run(
                noDisplay: noDisplay,
                sharedDirectories: sharedDirectories,
                mount: mount,
                vncPort: vncPort,
                recoveryMode: recoveryMode,
                usbMassStoragePaths: usbMassStoragePaths)
            Logger.info("VM started successfully", metadata: ["name": normalizedName])
        } catch {
            SharedVM.shared.removeVM(name: normalizedName)
            Logger.error("Failed to run VM", metadata: ["error": error.localizedDescription])
            throw error
        }
    }

    // MARK: - Image Management

    /// Resolves the download URL of the newest IPSW restore image that this
    /// host supports, logging the outcome either way.
    ///
    /// - Returns: The URL of the latest supported IPSW.
    /// - Throws: Any error raised while querying Apple's restore-image catalog.
    @MainActor
    public func getLatestIPSWURL() async throws -> URL {
        Logger.info("Fetching latest supported IPSW URL")

        do {
            let resolvedURL = try await DarwinImageLoader().fetchLatestSupportedURL()
            Logger.info("Found latest IPSW URL", metadata: ["url": resolvedURL.absoluteString])
            return resolvedURL
        } catch {
            Logger.error(
                "Failed to fetch IPSW URL", metadata: ["error": error.localizedDescription])
            throw error
        }
    }

    /// Pulls a VM image from a container registry and registers it locally.
    ///
    /// Non-sparse image names are transparently rewritten to their "-sparse"
    /// variant before pulling. When `name` is nil the VM name defaults to
    /// "<image-base>_<tag>" with any "-sparse" suffix stripped from the base.
    /// After the pull, the new VM receives a fresh random MAC address so
    /// multiple VMs created from the same image do not collide on the network.
    ///
    /// - Parameters:
    ///   - image: Image reference in "name:tag" form.
    ///   - name: Optional local VM name; derived from the image when nil.
    ///   - registry: Registry host (e.g. "ghcr.io").
    ///   - organization: Registry organization/namespace.
    ///   - storage: Named storage location or direct path; nil uses the default.
    /// - Throws: `ValidationError` when the image reference is malformed or the
    ///   VM already exists, plus any error from the registry pull itself.
    @MainActor
    public func pullImage(
        image: String,
        name: String?,
        registry: String,
        organization: String,
        storage: String? = nil
    ) async throws {
        do {
            // Convert non-sparse image to sparse version if needed
            var actualImage = image
            var actualName = name

            // Split the image to get name and tag for both sparse and non-sparse cases
            let components = image.split(separator: ":")
            guard components.count == 2 else {
                throw ValidationError("Invalid image format. Expected format: name:tag")
            }

            let originalName = String(components[0])
            let tag = String(components[1])

            // For consistent VM naming, strip "-sparse" suffix if present when no name provided
            let normalizedBaseName: String
            if originalName.hasSuffix("-sparse") {
                normalizedBaseName = String(originalName.dropLast(7))  // drop "-sparse"
            } else {
                normalizedBaseName = originalName
            }

            // Set default VM name if not provided
            if actualName == nil {
                actualName = "\(normalizedBaseName)_\(tag)"
            }

            // Convert non-sparse image to sparse version if needed
            if !image.contains("-sparse") {
                // Create sparse version of the image name
                actualImage = "\(originalName)-sparse:\(tag)"

                Logger.info(
                    "Converting to sparse image",
                    metadata: [
                        "original": image,
                        "sparse": actualImage,
                        "vm_name": actualName ?? "default",
                    ]
                )
            }

            let vmName = actualName ?? "default"  // Just use actualName as it's already normalized

            Logger.info(
                "Pulling image",
                metadata: [
                    "image": actualImage,
                    "name": vmName,
                    "registry": registry,
                    "organization": organization,
                    "location": storage ?? "default",
                ])

            try self.validatePullParameters(
                image: actualImage,
                name: vmName,
                registry: registry,
                organization: organization,
                storage: storage
            )

            let imageContainerRegistry = ImageContainerRegistry(
                registry: registry, organization: organization)
            let _ = try await imageContainerRegistry.pull(
                image: actualImage,
                name: vmName,
                locationName: storage)

            Logger.info(
                "Setting new VM mac address",
                metadata: [
                    "vm_name": vmName,
                    "location": storage ?? "default",
                ])

            // Update MAC address in the cloned VM to ensure uniqueness
            let vm = try get(name: vmName, storage: storage)
            try vm.setMacAddress(VZMACAddress.randomLocallyAdministered().string)

            Logger.info(
                "Image pulled successfully",
                metadata: [
                    "image": actualImage,
                    "name": vmName,
                    "registry": registry,
                    "organization": organization,
                    "location": storage ?? "default",
                ])
        } catch {
            Logger.error("Failed to pull image", metadata: ["error": error.localizedDescription])
            throw error
        }
    }

    /// Uploads a local VM's contents to a container registry.
    ///
    /// - Parameters:
    ///   - name: Name of the local VM to upload.
    ///   - imageName: Image name to publish under.
    ///   - tags: One or more tags to apply to the pushed image.
    ///   - registry: Registry host (e.g. "ghcr.io").
    ///   - organization: Registry organization/namespace.
    ///   - storage: Optional storage location or path where the VM lives.
    ///   - chunkSizeMb: Upload chunk size in megabytes.
    ///   - verbose: Enables extra progress output.
    ///   - dryRun: When true, performs all steps except the actual upload.
    ///   - reassemble: When true, re-assembles chunks to verify the upload.
    /// - Throws: `ValidationError` for bad parameters, `VMError.notFound` when
    ///   the VM does not exist, or any error from the registry client.
    @MainActor
    public func pushImage(
        name: String,
        imageName: String,
        tags: [String],
        registry: String,
        organization: String,
        storage: String? = nil,
        chunkSizeMb: Int = 512,
        verbose: Bool = false,
        dryRun: Bool = false,
        reassemble: Bool = false
    ) async throws {
        do {
            let tagSummary = tags.joined(separator: ", ")
            Logger.info(
                "Pushing VM to registry",
                metadata: [
                    "name": name,
                    "imageName": imageName,
                    "tags": "\(tagSummary)",
                    "registry": registry,
                    "organization": organization,
                    "location": storage ?? "default",
                    "chunk_size": "\(chunkSizeMb)MB",
                    "dry_run": "\(dryRun)",
                    "reassemble": "\(reassemble)",
                ])

            try validatePushParameters(
                name: name,
                imageName: imageName,
                tags: tags,
                registry: registry,
                organization: organization
            )

            // Resolve which storage location actually holds this VM.
            let resolvedLocation = try self.validateVMExists(name, storage: storage)
            let directory = try home.getVMDirectory(name, storage: resolvedLocation)

            // Delegate the chunked upload to the registry client.
            let registryClient = ImageContainerRegistry(
                registry: registry, organization: organization)
            try await registryClient.push(
                vmDirPath: directory.dir.path,
                imageName: imageName,
                tags: tags,
                chunkSizeMb: chunkSizeMb,
                verbose: verbose,
                dryRun: dryRun,
                reassemble: reassemble
            )

            Logger.info(
                "VM pushed successfully",
                metadata: [
                    "name": name,
                    "imageName": imageName,
                    "tags": "\(tagSummary)",
                    "registry": registry,
                    "organization": organization,
                ])
        } catch {
            Logger.error("Failed to push VM", metadata: ["error": error.localizedDescription])
            throw error
        }
    }

    /// Deletes every cached image layer under the "ghcr" subdirectory of the
    /// configured cache directory, then recreates the empty directory.
    ///
    /// - Throws: Any filesystem error raised while removing or recreating the
    ///   cache directory.
    @MainActor
    public func pruneImages() async throws {
        Logger.info("Pruning cached images")

        do {
            // Resolve the configured cache dir (may contain a leading "~").
            let cachePath = (SettingsManager.shared.getCacheDirectory() as NSString)
                .expandingTildeInPath
            let ghcrCache = URL(fileURLWithPath: cachePath).appendingPathComponent("ghcr")

            guard FileManager.default.fileExists(atPath: ghcrCache.path) else {
                Logger.info("No cached images found")
                return
            }

            // Remove everything, then recreate an empty cache directory.
            try FileManager.default.removeItem(at: ghcrCache)
            try FileManager.default.createDirectory(
                at: ghcrCache, withIntermediateDirectories: true)
            Logger.info("Successfully removed cached images")
        } catch {
            Logger.error("Failed to prune images", metadata: ["error": error.localizedDescription])
            throw error
        }
    }

    /// Summary of one locally cached image: its repository name plus a
    /// shortened (12-character) manifest digest used as the image ID.
    public struct ImageInfo: Codable {
        public let repository: String
        public let imageId: String  // This will be the shortened manifest ID
    }

    /// Result of an image listing: locally cached images, plus a placeholder
    /// list of remote images reserved for future registry support.
    public struct ImageList: Codable {
        public let local: [ImageInfo]
        public let remote: [String]  // Keep this for future remote registry support
    }

    /// Lists locally cached images for an organization, printing them and
    /// returning them as an `ImageList` (the remote list is always empty).
    ///
    /// - Parameter organization: Registry organization to list images for.
    /// - Returns: An `ImageList` with one entry per cached image.
    /// - Throws: Any error raised while enumerating the local image cache.
    @MainActor
    public func getImages(organization: String = "trycua") async throws -> ImageList {
        Logger.info("Listing local images", metadata: ["organization": organization])

        let registryClient = ImageContainerRegistry(
            registry: "ghcr.io", organization: organization)
        let cached = try await registryClient.getImages()

        // Shorten each manifest digest to the familiar 12-character image ID.
        var infos: [ImageInfo] = []
        for image in cached {
            infos.append(
                ImageInfo(
                    repository: image.repository,
                    imageId: String(image.manifestId.prefix(12))))
        }

        ImagesPrinter.print(images: infos.map { "\($0.repository):\($0.imageId)" })
        return ImageList(local: infos, remote: [])
    }

    // MARK: - Settings Management

    /// Returns the current Lume settings snapshot.
    public func getSettings() -> LumeSettings {
        SettingsManager.shared.getSettings()
    }

    /// Changes the home directory used for VM storage and re-validates it.
    ///
    /// - Parameter path: New home directory path.
    /// - Throws: Any error from persisting the setting or validating the path.
    public func setHomeDirectory(_ path: String) throws {
        try SettingsManager.shared.setHomeDirectory(path: path)
        // Re-check the home instance so it picks up the new location.
        try home.validateHomeDirectory()
        Logger.info("Home directory updated", metadata: ["path": path])
    }

    // MARK: - VM Location Management

    /// Registers a new named VM storage location.
    ///
    /// - Parameters:
    ///   - name: Identifier for the new location.
    ///   - path: Filesystem path the location points at.
    /// - Throws: Any error from the home directory manager.
    public func addLocation(name: String, path: String) throws {
        Logger.info("Adding VM location", metadata: ["name": name, "path": path])
        try home.addLocation(name: name, path: path)
        Logger.info("VM location added successfully", metadata: ["name": name])
    }

    /// Unregisters a named VM storage location.
    ///
    /// - Parameter name: Identifier of the location to remove.
    /// - Throws: Any error from the home directory manager.
    public func removeLocation(name: String) throws {
        Logger.info("Removing VM location", metadata: ["name": name])
        try home.removeLocation(name: name)
        Logger.info("VM location removed successfully", metadata: ["name": name])
    }

    /// Marks an existing storage location as the default for new VMs.
    ///
    /// - Parameter name: Identifier of the location to promote.
    /// - Throws: Any error from the home directory manager.
    public func setDefaultLocation(name: String) throws {
        Logger.info("Setting default VM location", metadata: ["name": name])
        try home.setDefaultLocation(name: name)
        Logger.info("Default VM location set successfully", metadata: ["name": name])
    }

    /// Lists every VM storage location currently configured.
    public func getLocations() -> [VMLocation] {
        home.getLocations()
    }

    // MARK: - Cache Directory Management

    /// Points the image cache at a new directory.
    ///
    /// - Parameter path: Filesystem path to use for cached image layers.
    /// - Throws: Any error from persisting the setting.
    public func setCacheDirectory(path: String) throws {
        Logger.info("Setting cache directory", metadata: ["path": path])
        try SettingsManager.shared.setCacheDirectory(path: path)
        Logger.info("Cache directory updated", metadata: ["path": path])
    }

    /// Returns the configured image cache directory path.
    public func getCacheDirectory() -> String {
        SettingsManager.shared.getCacheDirectory()
    }

    /// Reports whether image caching is currently enabled.
    public func isCachingEnabled() -> Bool {
        SettingsManager.shared.isCachingEnabled()
    }

    /// Enables or disables image caching.
    ///
    /// - Parameter enabled: New caching state to persist.
    /// - Throws: Any error from persisting the setting.
    public func setCachingEnabled(_ enabled: Bool) throws {
        Logger.info("Setting caching enabled", metadata: ["enabled": "\(enabled)"])
        try SettingsManager.shared.setCachingEnabled(enabled)
        Logger.info("Caching setting updated", metadata: ["enabled": "\(enabled)"])
    }

    // MARK: - Private Helper Methods

    /// Normalizes a VM name by replacing the colon in an "image:tag" style
    /// name with an underscore; any other shape passes through unchanged.
    private func normalizeVMName(name: String) -> String {
        let parts = name.split(separator: ":")
        guard parts.count == 2 else { return name }
        return "\(parts[0])_\(parts[1])"
    }

    /// Builds a VM object backed by a throwaway temporary directory, used
    /// during creation before the VM gets a permanent home.
    ///
    /// - Parameters:
    ///   - os: Guest OS type ("macos" or "linux").
    ///   - cpuCount: Number of virtual CPUs.
    ///   - memorySize: Memory size in bytes.
    ///   - diskSize: Disk size in bytes.
    ///   - display: Display resolution string.
    /// - Returns: A VM constructed over the temporary directory.
    /// - Throws: Any error from config construction or directory creation.
    @MainActor
    private func createTempVMConfig(
        os: String,
        cpuCount: Int,
        memorySize: UInt64,
        diskSize: UInt64,
        display: String
    ) async throws -> VM {
        // Assemble a throwaway config with a fresh random MAC address.
        let tempConfig = try VMConfig(
            os: os,
            cpuCount: cpuCount,
            memorySize: memorySize,
            diskSize: diskSize,
            macAddress: VZMACAddress.randomLocallyAdministered().string,
            display: display
        )

        let context = VMDirContext(
            dir: try home.createTempVMDirectory(),
            config: tempConfig,
            home: home,
            storage: nil
        )

        // Only macOS guests need an image loader for restore images.
        let loader = os.lowercased() == "macos" ? imageLoaderFactory.createImageLoader() : nil
        return try vmFactory.createVM(vmDirContext: context, imageLoader: loader)
    }

    /// Constructs a VM object from an already-located VM directory.
    ///
    /// - Parameters:
    ///   - vmDir: Directory holding the VM's config and disks.
    ///   - storage: Storage context (named location or direct path) to carry
    ///     through to the VM.
    /// - Returns: A fully constructed VM.
    /// - Throws: `VMError.notInitialized` when the directory was never fully
    ///   created, or any error from loading the config.
    @MainActor
    private func loadVM(vmDir: VMDirectory, storage: String?) throws -> VM {
        // Refuse to load a directory that was never fully created.
        guard vmDir.initialized() else {
            throw VMError.notInitialized(vmDir.name)
        }

        let loadedConfig: VMConfig = try vmDir.loadConfig()
        let context = VMDirContext(
            dir: vmDir, config: loadedConfig, home: home, storage: storage
        )

        // Only macOS guests need an image loader for restore images.
        let loader =
            loadedConfig.os.lowercased() == "macos" ? imageLoaderFactory.createImageLoader() : nil
        return try vmFactory.createVM(vmDirContext: context, imageLoader: loader)
    }

    // MARK: - Validation Methods

    /// Checks the parameters for creating a new VM: OS-specific IPSW rules and
    /// that no VM with the same name already exists.
    ///
    /// - Throws: `ValidationError` for unsupported OS types or bad IPSW input,
    ///   `VMError.alreadyExists` when the target directory is taken.
    private func validateCreateParameters(
        name: String, os: String, ipsw: String?, storage: String?
    ) throws {
        switch os.lowercased() {
        case "macos":
            // macOS guests must be restored from an IPSW ("latest" is resolved later).
            guard let ipswPath = ipsw else {
                throw ValidationError("IPSW path required for macOS VM")
            }
            if ipswPath != "latest" && !FileManager.default.fileExists(atPath: ipswPath) {
                throw ValidationError("IPSW file not found")
            }
        case "linux":
            guard ipsw == nil else {
                throw ValidationError("IPSW path not supported for Linux VM")
            }
        default:
            throw ValidationError("Unsupported OS type: \(os)")
        }

        // Refuse to overwrite an existing VM directory.
        let vmDir = try home.getVMDirectory(name, storage: storage)
        if vmDir.exists() {
            throw VMError.alreadyExists(name)
        }
    }

    /// Ensures every shared-directory host path exists and is a directory.
    ///
    /// - Parameter directories: Shared directories requested for the VM.
    /// - Throws: `ValidationError` naming the first bad host path.
    private func validateSharedDirectories(_ directories: [SharedDirectory]) throws {
        for shared in directories {
            var isDir: ObjCBool = false
            let exists = FileManager.default.fileExists(
                atPath: shared.hostPath, isDirectory: &isDir)
            if !exists || !isDir.boolValue {
                throw ValidationError(
                    "Host path does not exist or is not a directory: \(shared.hostPath)")
            }
        }
    }

    /// Confirms that a VM exists and returns the storage identifier it lives in.
    ///
    /// When `storage` is given it is checked directly (a value containing a
    /// path separator is treated as a direct path, otherwise as a named
    /// location). When nil, every known location is searched.
    ///
    /// - Returns: The storage identifier (path or location name) holding the
    ///   VM, or nil when it lives in the default location.
    /// - Throws: `VMError.notFound` when the VM cannot be located.
    public func validateVMExists(_ name: String, storage: String? = nil) throws -> String? {
        if let storage = storage {
            let isDirectPath = storage.contains("/") || storage.contains("\\")
            let vmDir: VMDirectory
            if isDirectPath {
                vmDir = try home.getVMDirectoryFromPath(name, storagePath: storage)
            } else {
                vmDir = try home.getVMDirectory(name, storage: storage)
            }
            guard vmDir.initialized() else {
                throw VMError.notFound(name)
            }
            // The caller's storage value doubles as the location identifier.
            return storage
        }

        // No storage hint: search every known location for a matching name.
        guard
            let match = try home.getAllVMDirectories()
                .first(where: { $0.directory.name == name })
        else {
            throw VMError.notFound(name)
        }
        return match.locationName
    }

    /// Validates run-time parameters against an already-located VM directory:
    /// shared folders, USB images, and OS-specific mount rules.
    ///
    /// - Throws: `ValidationError` for missing paths or unsupported mounts.
    private func validateRunParameters(
        vmDir: VMDirectory, // VM existence already proven by the caller
        sharedDirectories: [SharedDirectory]?,
        mount: Path?,
        usbMassStoragePaths: [Path]? = nil
    ) throws {
        // Shared folders must exist on the host.
        if let shared = sharedDirectories {
            try self.validateSharedDirectories(shared)
        }

        // USB images must exist; the feature itself needs macOS 15 or later.
        if let usbPaths = usbMassStoragePaths {
            for usbPath in usbPaths {
                guard FileManager.default.fileExists(atPath: usbPath.path) else {
                    throw ValidationError("USB mass storage image not found: \(usbPath.path)")
                }
            }

            if #available(macOS 15.0, *) {
                // USB mass storage is supported
            } else {
                Logger.info(
                    "USB mass storage devices require macOS 15.0 or later. They will be ignored.")
            }
        }

        // OS-specific mount rules, read from the VM's own config.
        let vmConfig = try vmDir.loadConfig()
        switch vmConfig.os.lowercased() {
        case "macos":
            guard mount == nil else {
                throw ValidationError(
                    "Mounting disk images is not supported for macOS VMs. If you are looking to mount a IPSW, please use the --ipsw option in the create command."
                )
            }
        case "linux":
            if let mountPath = mount, !FileManager.default.fileExists(atPath: mountPath.path) {
                throw ValidationError("Mount file not found: \(mountPath.path)")
            }
        default:
            break
        }
    }

    /// Validates parameters for pulling an image: non-empty identifiers, a
    /// usable storage target, and that the destination VM does not exist yet.
    ///
    /// - Throws: `ValidationError` for empty inputs, `HomeError` when a direct
    ///   storage path cannot be created, `VMError.alreadyExists` when the VM
    ///   name is already taken.
    private func validatePullParameters(
        image: String,
        name: String,
        registry: String,
        organization: String,
        storage: String? = nil
    ) throws {
        // Reject empty identifiers up front, in the same order as before.
        let requirements: [(String, String)] = [
            (image, "Image name cannot be empty"),
            (name, "VM name cannot be empty"),
            (registry, "Registry cannot be empty"),
            (organization, "Organization cannot be empty"),
        ]
        for (value, message) in requirements where value.isEmpty {
            throw ValidationError(message)
        }

        // A storage value containing a path separator is a direct filesystem
        // path; otherwise it names a configured storage location.
        let vmDir: VMDirectory
        if let storage = storage, storage.contains("/") || storage.contains("\\") {
            // Make sure the target directory exists before resolving into it.
            if !FileManager.default.fileExists(atPath: storage) {
                Logger.info("Creating VM storage directory", metadata: ["path": storage])
                do {
                    try FileManager.default.createDirectory(
                        atPath: storage,
                        withIntermediateDirectories: true
                    )
                } catch {
                    throw HomeError.directoryCreationFailed(path: storage)
                }
            }
            vmDir = try home.getVMDirectoryFromPath(name, storagePath: storage)
        } else {
            vmDir = try home.getVMDirectory(name, storage: storage)
        }

        // Never overwrite an existing VM.
        guard !vmDir.exists() else {
            throw VMError.alreadyExists(name)
        }
    }

    /// Validates parameters for pushing a VM: non-empty identifiers, at least
    /// one tag, and that the named VM actually exists locally.
    ///
    /// - Throws: `ValidationError` for bad inputs, `VMError.notFound` when the
    ///   VM is missing.
    private func validatePushParameters(
        name: String,
        imageName: String,
        tags: [String],
        registry: String,
        organization: String
    ) throws {
        // Empty-string checks, preserving the original error precedence.
        for (value, message) in [
            (name, "VM name cannot be empty"),
            (imageName, "Image name cannot be empty"),
        ] where value.isEmpty {
            throw ValidationError(message)
        }
        guard !tags.isEmpty else {
            throw ValidationError("At least one tag must be provided.")
        }
        for (value, message) in [
            (registry, "Registry cannot be empty"),
            (organization, "Organization cannot be empty"),
        ] where value.isEmpty {
            throw ValidationError(message)
        }

        // Verify VM exists (this will throw if not found).
        _ = try self.validateVMExists(name)
    }
}

```

--------------------------------------------------------------------------------
/libs/python/computer/computer/providers/lumier/provider.py:
--------------------------------------------------------------------------------

```python
"""
Lumier VM provider implementation.

This provider uses Docker containers running the Lumier image to create
macOS and Linux VMs. It handles VM lifecycle operations through Docker
commands and container management.
"""

import asyncio
import json
import logging
import os
import re
import subprocess
import time
from typing import Any, Dict, List, Optional

from ..base import BaseVMProvider, VMProviderType
from ..lume_api import lume_api_get, lume_api_run, lume_api_stop, lume_api_update

# Setup logging
logger = logging.getLogger(__name__)

# Check if Docker is available
# NOTE: this probe runs once at module import time. HAS_LUMIER gates every
# provider operation below, so a missing/broken Docker CLI degrades to error
# responses from the provider methods instead of raising at import.
try:
    subprocess.run(["docker", "--version"], capture_output=True, check=True)
    HAS_LUMIER = True
except (subprocess.SubprocessError, FileNotFoundError):
    HAS_LUMIER = False


class LumierProvider(BaseVMProvider):
    """
    Lumier VM Provider implementation using Docker containers.

    This provider uses Docker to run Lumier containers that can create
    macOS and Linux VMs through containerization.
    """

    def __init__(
        self,
        provider_port: Optional[int] = 7777,
        host: str = "localhost",
        storage: Optional[str] = None,  # Can be a path or 'ephemeral'
        shared_path: Optional[str] = None,
        image: str = "macos-sequoia-cua:latest",  # VM image to use
        verbose: bool = False,
        ephemeral: bool = False,
        noVNC_port: Optional[int] = 8006,
    ):
        """Initialize the Lumier VM Provider.

        Args:
            provider_port: Port for the API server (default: 7777)
            host: Hostname for the API server (default: localhost)
            storage: Path for persistent VM storage
            shared_path: Path for shared folder between host and VM
            image: VM image to use (e.g. "macos-sequoia-cua:latest")
            verbose: Enable verbose logging
            ephemeral: Use ephemeral (temporary) storage
            noVNC_port: Specific port for noVNC interface (default: 8006)
        """
        self.host = host
        # Always ensure lume_port has a valid value (7777 is the default)
        self.lume_port = 7777 if provider_port is None else provider_port
        self.vnc_port = noVNC_port  # User-specified noVNC port, will be set in run_vm if provided
        self.ephemeral = ephemeral

        # Handle ephemeral storage (temporary directory)
        if ephemeral:
            self.storage = "ephemeral"
        else:
            self.storage = storage

        self.shared_path = shared_path
        self.image = image  # Store the VM image name to use
        # The container_name will be set in run_vm using the VM name
        self.verbose = verbose
        self._container_id = None
        self._api_url = None  # Will be set after container starts

    @property
    def provider_type(self) -> VMProviderType:
        """Identify this provider as the Lumier implementation."""
        return VMProviderType.LUMIER

    def _parse_memory(self, memory_str: str) -> int:
        """Parse memory string to MB integer.

        Examples:
            "8GB" -> 8192
            "1024MB" -> 1024
            "512" -> 512
        """
        if isinstance(memory_str, int):
            return memory_str

        if isinstance(memory_str, str):
            # Extract number and unit
            match = re.match(r"(\d+)([A-Za-z]*)", memory_str)
            if match:
                value, unit = match.groups()
                value = int(value)
                unit = unit.upper()

                if unit == "GB" or unit == "G":
                    return value * 1024
                elif unit == "MB" or unit == "M" or unit == "":
                    return value

        # Default fallback
        logger.warning(f"Could not parse memory string '{memory_str}', using 8GB default")
        return 8192  # Default to 8GB

    # Helper methods for interacting with the Lumier API through curl
    # These methods handle the various VM operations via API calls

    def _get_curl_error_message(self, return_code: int) -> str:
        """Get a descriptive error message for curl return codes.

        Args:
            return_code: The curl return code

        Returns:
            A descriptive error message
        """
        # Map common curl error codes to helpful messages
        if return_code == 7:
            return "Failed to connect - API server is starting up"
        elif return_code == 22:
            return "HTTP error returned from API server"
        elif return_code == 28:
            return "Operation timeout - API server is slow to respond"
        elif return_code == 52:
            return "Empty reply from server - API is starting but not ready"
        elif return_code == 56:
            return "Network problem during data transfer"
        else:
            return f"Unknown curl error code: {return_code}"

    async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Get VM information by name.

        Checks the Docker container state first; only when the container is
        running does it query the Lumier API for VM-level details (status,
        IP address, VNC URL).

        Args:
            name: Name of the VM to get information for
            storage: Optional storage path override. If provided, this will be used
                    instead of the provider's default storage path.

        Returns:
            Dictionary with VM information including status, IP address, etc.
            Always contains "name" and "status"; never raises for a missing
            container (returns a "not_found" / "stopped" / "error" status
            instead).
        """
        if not HAS_LUMIER:
            logger.error("Docker is not available. Cannot get VM status.")
            return {"name": name, "status": "unavailable", "error": "Docker is not available"}

        # Store the current name for API requests
        self.container_name = name

        try:
            # Check if the container exists and is running
            # "docker ps -a" prints a status string like "Up 2 minutes" or
            # "Exited (0) ..."; an empty result means no such container.
            check_cmd = [
                "docker",
                "ps",
                "-a",
                "--filter",
                f"name={name}",
                "--format",
                "{{.Status}}",
            ]
            check_result = subprocess.run(check_cmd, capture_output=True, text=True)
            container_status = check_result.stdout.strip()

            if not container_status:
                logger.info(f"Container {name} does not exist. Will create when run_vm is called.")
                return {
                    "name": name,
                    "status": "not_found",
                    "message": "Container doesn't exist yet",
                }

            # Container exists, check if it's running
            is_running = container_status.startswith("Up")

            if not is_running:
                logger.info(
                    f"Container {name} exists but is not running. Status: {container_status}"
                )
                return {
                    "name": name,
                    "status": "stopped",
                    "container_status": container_status,
                }

            # Container is running, get the IP address and API status from Lumier API
            logger.info(f"Container {name} is running. Getting VM status from API.")

            # Use the shared lume_api_get function directly
            vm_info = lume_api_get(
                vm_name=name,
                host=self.host,
                port=self.lume_port,
                storage=storage if storage is not None else self.storage,
                debug=self.verbose,
                verbose=self.verbose,
            )

            # Check for API errors
            if "error" in vm_info:
                # Use debug level instead of warning to reduce log noise during polling
                logger.debug(f"API request error: {vm_info['error']}")
                return {
                    "name": name,
                    "status": "running",  # Container is running even if API is not responsive
                    "api_status": "error",
                    "error": vm_info["error"],
                    "container_status": container_status,
                }

            # Process the VM status information
            vm_status = vm_info.get("status", "unknown")
            vnc_url = vm_info.get("vncUrl", "")
            ip_address = vm_info.get("ipAddress", "")

            # IMPORTANT: Always ensure we have a valid IP address for connectivity
            # If the API doesn't return an IP address, default to localhost (127.0.0.1)
            # This makes the behavior consistent with LumeProvider
            if not ip_address and vm_status == "running":
                ip_address = "127.0.0.1"
                logger.info(f"No IP address returned from API, defaulting to {ip_address}")
                vm_info["ipAddress"] = ip_address

            logger.info(f"VM {name} status: {vm_status}")

            if ip_address and vnc_url:
                logger.info(f"VM {name} has IP: {ip_address} and VNC URL: {vnc_url}")
            elif not ip_address and not vnc_url and vm_status != "running":
                # Not running is expected in this case
                logger.info(f"VM {name} is not running yet. Status: {vm_status}")
            else:
                # Missing IP or VNC but status is running - this is unusual but handled with default IP
                logger.warning(
                    f"VM {name} is running but missing expected fields. API response: {vm_info}"
                )

            # Return the full status information
            # NOTE: **vm_info is spread last, so raw API fields win on key
            # collisions (e.g. "status" from the API overrides the one above).
            return {
                "name": name,
                "status": vm_status,
                "ip_address": ip_address,
                "vnc_url": vnc_url,
                "api_status": "ok",
                "container_status": container_status,
                **vm_info,  # Include all fields from the API response
            }
        except subprocess.SubprocessError as e:
            logger.error(f"Failed to check container status: {e}")
            return {
                "name": name,
                "status": "error",
                "error": f"Failed to check container status: {str(e)}",
            }

    async def list_vms(self) -> List[Dict[str, Any]]:
        """List all VMs managed by this provider.

        The Lumier provider hosts exactly one VM per container, so the
        returned list contains at most one status dictionary (the "default"
        VM), or is empty when the VM status cannot be determined.
        """
        try:
            vm_info = await self.get_vm("default")
        except Exception as e:
            logger.error(f"Failed to list VMs: {e}")
            return []
        # An "unknown" status means there is nothing meaningful to report.
        if vm_info.get("status") == "unknown":
            return []
        return [vm_info]

    async def run_vm(
        self, image: str, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run a VM with the given options.

        Removes any pre-existing container with the same name, starts a fresh
        Lumier Docker container configured from ``run_opts`` and the provider's
        settings, then polls :meth:`get_vm` until the VM inside the container
        reports a running status with an IP address.

        Args:
            image: Name/tag of the image to use.
                NOTE(review): the container is actually configured from
                ``self.image`` set on the provider; this argument is not
                consulted here — confirm intended behavior.
            name: Name of the VM to run (used for the container name and Docker image tag)
            run_opts: Options for running the VM, including:
                - cpu: Number of CPU cores
                - memory: Amount of memory (e.g. "8GB")
                - noVNC_port: Specific port for noVNC interface
                  (NOTE(review): the port mapping below uses ``self.vnc_port``,
                  not this key — confirm they are kept in sync by the caller)
            storage: Optional storage path override; the persistent-storage
                mount below reads ``self.storage`` instead.

        Returns:
            Dictionary with VM status information (as produced by ``get_vm``).

        Raises:
            RuntimeError: If the Lumier container fails to start (including
                network errors while pulling the Docker image).
        """
        # Set the container name using the VM name for consistency
        self.container_name = name
        try:
            # First, check if container already exists and remove it
            try:
                check_cmd = [
                    "docker",
                    "ps",
                    "-a",
                    "--filter",
                    f"name={self.container_name}",
                    "--format",
                    "{{.ID}}",
                ]
                check_result = subprocess.run(check_cmd, capture_output=True, text=True)
                existing_container = check_result.stdout.strip()

                if existing_container:
                    logger.info(f"Removing existing container: {self.container_name}")
                    remove_cmd = ["docker", "rm", "-f", self.container_name]
                    subprocess.run(remove_cmd, check=True)
            except subprocess.CalledProcessError as e:
                logger.warning(f"Error removing existing container: {e}")
                # Continue anyway, next steps will fail if there's a real problem

            # Prepare the Docker run command
            cmd = ["docker", "run", "-d", "--name", self.container_name]

            # Map the host noVNC port to the container's fixed port 8006.
            cmd.extend(["-p", f"{self.vnc_port}:8006"])
            logger.debug(f"Using specified noVNC_port: {self.vnc_port}")

            # Set API URL using the API port
            self._api_url = f"http://{self.host}:{self.lume_port}"

            # Parse memory setting (e.g. "8GB" -> megabytes)
            memory_mb = self._parse_memory(run_opts.get("memory", "8GB"))

            # Add storage volume mount if storage is specified (for persistent VM storage)
            if self.storage and self.storage != "ephemeral":
                # Create storage directory if it doesn't exist
                storage_dir = os.path.abspath(os.path.expanduser(self.storage or ""))
                os.makedirs(storage_dir, exist_ok=True)

                # Add volume mount for storage
                cmd.extend(
                    ["-v", f"{storage_dir}:/storage", "-e", f"HOST_STORAGE_PATH={storage_dir}"]
                )
                logger.debug(f"Using persistent storage at: {storage_dir}")

            # Add shared folder volume mount if shared_path is specified
            if self.shared_path:
                # Create shared directory if it doesn't exist
                shared_dir = os.path.abspath(os.path.expanduser(self.shared_path or ""))
                os.makedirs(shared_dir, exist_ok=True)

                # Add volume mount for shared folder
                cmd.extend(["-v", f"{shared_dir}:/shared", "-e", f"HOST_SHARED_PATH={shared_dir}"])
                logger.debug(f"Using shared folder at: {shared_dir}")

            # Add environment variables
            # Always use the container_name as the VM_NAME for consistency
            # Use the VM image passed from the Computer class
            logger.debug(f"Using VM image: {self.image}")

            # If ghcr.io is in the image, use the full image name;
            # otherwise qualify it with the trycua registry prefix.
            if "ghcr.io" in self.image:
                vm_image = self.image
            else:
                vm_image = f"ghcr.io/trycua/{self.image}"

            cmd.extend(
                [
                    "-e",
                    f"VM_NAME={self.container_name}",
                    "-e",
                    f"VERSION={vm_image}",
                    "-e",
                    f"CPU_CORES={run_opts.get('cpu', '4')}",
                    "-e",
                    f"RAM_SIZE={memory_mb}",
                ]
            )

            # Specify the Lumier image with the full image name
            lumier_image = "trycua/lumier:latest"

            # First check if the image exists locally
            try:
                logger.debug(f"Checking if Docker image {lumier_image} exists locally...")
                check_image_cmd = ["docker", "image", "inspect", lumier_image]
                subprocess.run(check_image_cmd, capture_output=True, check=True)
                logger.debug(f"Docker image {lumier_image} found locally.")
            except subprocess.CalledProcessError:
                # Image doesn't exist locally
                logger.warning(f"\nWARNING: Docker image {lumier_image} not found locally.")
                logger.warning(
                    "The system will attempt to pull it from Docker Hub, which may fail if you have network connectivity issues."
                )
                logger.warning(
                    "If the Docker pull fails, you may need to manually pull the image first with:"
                )
                logger.warning(f"  docker pull {lumier_image}\n")

            # Add the image to the command
            cmd.append(lumier_image)

            # Print the Docker command for debugging
            logger.debug(f"DOCKER COMMAND: {' '.join(cmd)}")

            # Run the container with improved error handling
            try:
                subprocess.run(cmd, capture_output=True, text=True, check=True)
            except subprocess.CalledProcessError as e:
                if (
                    "no route to host" in str(e.stderr).lower()
                    or "failed to resolve reference" in str(e.stderr).lower()
                ):
                    error_msg = (
                        f"Network error while trying to pull Docker image '{lumier_image}'\n"
                        f"Error: {e.stderr}\n\n"
                        f"SOLUTION: Please try one of the following:\n"
                        f"1. Check your internet connection\n"
                        f"2. Pull the image manually with: docker pull {lumier_image}\n"
                        f"3. Check if Docker is running properly\n"
                    )
                    logger.error(error_msg)
                    raise RuntimeError(error_msg)
                raise

            # Container started, now check VM status with polling
            logger.debug("Container started, checking VM status...")
            logger.debug(
                "NOTE: This may take some time while the VM image is being pulled and initialized"
            )

            # Start a background thread to show container logs in real-time
            import threading

            def show_container_logs():
                # Stream `docker logs --follow` output to the debug log until
                # the process exits or the daemon thread is torn down.
                # Give the container a moment to start generating logs
                time.sleep(1)
                logger.debug(f"\n---- CONTAINER LOGS FOR '{name}' (LIVE) ----")
                logger.debug(
                    "Showing logs as they are generated. Press Ctrl+C to stop viewing logs...\n"
                )

                try:
                    # Use docker logs with follow option
                    log_cmd = ["docker", "logs", "--tail", "30", "--follow", name]
                    process = subprocess.Popen(
                        log_cmd,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                        text=True,
                        bufsize=1,
                        universal_newlines=True,
                    )

                    # Read and log each line as it arrives.
                    # BUGFIX: logger.debug() does not accept print()'s `end`
                    # kwarg — passing it raised TypeError and killed this
                    # thread. Strip the trailing newline instead, since the
                    # logging handler appends its own.
                    for line in process.stdout:
                        logger.debug(line.rstrip("\n"))

                        # Break if process has exited
                        if process.poll() is not None:
                            break
                except Exception as e:
                    logger.error(f"\nError showing container logs: {e}")
                    if self.verbose:
                        logger.error(f"Error in log streaming thread: {e}")
                finally:
                    logger.debug("\n---- LOG STREAMING ENDED ----")
                    # Make sure process is terminated
                    if "process" in locals() and process.poll() is None:
                        process.terminate()

            # Start log streaming in a background daemon thread. It always
            # runs; its output only shows when debug logging is enabled.
            log_thread = threading.Thread(target=show_container_logs)
            log_thread.daemon = True  # Thread will exit when main program exits
            log_thread.start()

            # Skip waiting for container readiness and just poll get_vm directly
            # Poll the get_vm method indefinitely until the VM is ready with an IP address
            attempt = 0
            consecutive_errors = 0
            vm_running = False

            while True:  # Wait indefinitely
                try:
                    # Use longer delays to give the system time to initialize
                    if attempt > 0:
                        # Start with 5s delay, then increase gradually up to 30s for later attempts
                        # But use shorter delays while we're getting API errors
                        if consecutive_errors > 0 and consecutive_errors < 5:
                            wait_time = 3  # Use shorter delays when we're getting API errors
                        else:
                            wait_time = min(30, 5 + (attempt * 2))

                        logger.debug(f"Waiting {wait_time}s before retry #{attempt+1}...")
                        await asyncio.sleep(wait_time)

                    # Try to get VM status
                    logger.debug(f"Checking VM status (attempt {attempt+1})...")
                    vm_status = await self.get_vm(name)

                    # Check for API errors
                    if "error" in vm_status:
                        consecutive_errors += 1
                        error_msg = vm_status.get("error", "Unknown error")

                        # Only print a user-friendly status message, not the raw error
                        # since _lume_api_get already logged the technical details
                        if consecutive_errors == 1 or attempt % 5 == 0:
                            if "Empty reply from server" in error_msg:
                                logger.info(
                                    "API server is starting up - container is running, but API isn't fully initialized yet."
                                )
                                logger.info(
                                    "This is expected during the initial VM setup - will continue polling..."
                                )
                            else:
                                # Don't repeat the exact same error message each time
                                logger.warning(
                                    f"API request error (attempt {attempt+1}): {error_msg}"
                                )
                                # Just log that we're still working on it
                                if attempt > 3:
                                    logger.debug(
                                        "Still waiting for the API server to become available..."
                                    )

                        # If we're getting errors but container is running, that's normal during startup
                        if vm_status.get("status") == "running":
                            if not vm_running:
                                logger.info(
                                    "Container is running, waiting for the VM within it to become fully ready..."
                                )
                                logger.info("This might take a minute while the VM initializes...")
                                vm_running = True

                        # Increase counter and continue
                        attempt += 1
                        continue

                    # Reset consecutive error counter when we get a successful response
                    consecutive_errors = 0

                    # If the VM is running, check if it has an IP address (which means it's fully ready)
                    if vm_status.get("status") == "running":
                        vm_running = True

                        # Check if we have an IP address, which means the VM is fully ready
                        if "ip_address" in vm_status and vm_status["ip_address"]:
                            logger.info(
                                f"VM is now fully running with IP: {vm_status.get('ip_address')}"
                            )
                            if "vnc_url" in vm_status and vm_status["vnc_url"]:
                                logger.info(f"VNC URL: {vm_status.get('vnc_url')}")
                            return vm_status
                        else:
                            logger.debug(
                                "VM is running but still initializing network interfaces..."
                            )
                            logger.debug("Waiting for IP address to be assigned...")
                    else:
                        # VM exists but might still be starting up
                        status = vm_status.get("status", "unknown")
                        logger.debug(f"VM found but status is: {status}. Continuing to poll...")

                    # Increase counter for next iteration's delay calculation
                    attempt += 1

                    # If we reach a very large number of attempts, give a reassuring message but continue
                    if attempt % 10 == 0:
                        logger.debug(
                            f"Still waiting after {attempt} attempts. This might take several minutes for first-time setup."
                        )
                        if not vm_running and attempt >= 20:
                            logger.warning(
                                "\nNOTE: First-time VM initialization can be slow as images are downloaded."
                            )
                            logger.warning(
                                "If this continues for more than 10 minutes, you may want to check:"
                            )
                            logger.warning("  1. Docker logs with: docker logs " + name)
                            logger.warning("  2. If your network can access container registries")
                            logger.warning("Press Ctrl+C to abort if needed.\n")

                    # After 150 attempts (likely over 30-40 minutes), return current status
                    if attempt >= 150:
                        logger.debug(
                            f"Reached 150 polling attempts. VM status is: {vm_status.get('status', 'unknown')}"
                        )
                        logger.debug(
                            "Returning current VM status, but please check Docker logs if there are issues."
                        )
                        return vm_status

                except Exception as e:
                    # Always continue retrying, but with increasing delays
                    logger.warning(
                        f"Error checking VM status (attempt {attempt+1}): {e}. Will retry."
                    )
                    consecutive_errors += 1

                    # If we've had too many consecutive errors, might be a deeper problem
                    if consecutive_errors >= 10:
                        logger.warning(
                            f"\nWARNING: Encountered {consecutive_errors} consecutive errors while checking VM status."
                        )
                        logger.warning(
                            "You may need to check the Docker container logs or restart the process."
                        )
                        logger.warning(f"Error details: {str(e)}\n")

                    # Increase attempt counter for next iteration
                    attempt += 1

                    # After many consecutive errors, add a delay to avoid hammering the system
                    if attempt > 5:
                        error_delay = min(30, 10 + attempt)
                        logger.warning(
                            f"Multiple connection errors, waiting {error_delay}s before next attempt..."
                        )
                        await asyncio.sleep(error_delay)

        except subprocess.CalledProcessError as e:
            error_msg = (
                f"Failed to start Lumier container: {e.stderr if hasattr(e, 'stderr') else str(e)}"
            )
            logger.error(error_msg)
            raise RuntimeError(error_msg)

    async def _wait_for_container_ready(self, container_name: str, timeout: int = 90) -> bool:
        """Wait for the Lumier container to be fully ready with a valid API response.

        Polls `docker ps` until the container reports an "Up" status, then
        probes the Lume API (first the /health endpoint, then the per-VM
        endpoint as a fallback) until one of them responds or the timeout
        elapses.

        Args:
            container_name: Name of the Docker container to check
            timeout: Maximum time to wait in seconds (default: 90 seconds)

        Returns:
            True if the container is running, even if API is not fully ready.
            This allows operations to continue with appropriate fallbacks.
            False only if the container itself never reached "Up" status
            within the timeout.
        """
        start_time = time.time()
        api_ready = False
        container_running = False

        logger.debug(f"Waiting for container {container_name} to be ready (timeout: {timeout}s)...")

        while time.time() - start_time < timeout:
            # Check if container is running
            try:
                check_cmd = [
                    "docker",
                    "ps",
                    "--filter",
                    f"name={container_name}",
                    "--format",
                    "{{.Status}}",
                ]
                result = subprocess.run(check_cmd, capture_output=True, text=True, check=True)
                container_status = result.stdout.strip()

                if container_status and container_status.startswith("Up"):
                    container_running = True
                    logger.info(
                        f"Container {container_name} is running with status: {container_status}"
                    )
                else:
                    logger.warning(
                        f"Container {container_name} not yet running, status: {container_status}"
                    )
                    # container is not running yet, wait and try again
                    await asyncio.sleep(2)  # Longer sleep to give Docker time
                    continue
            except subprocess.CalledProcessError as e:
                logger.warning(f"Error checking container status: {e}")
                await asyncio.sleep(2)
                continue

            # Container is running, check if API is responsive
            try:
                # First check the health endpoint
                api_url = f"http://{self.host}:{self.lume_port}/health"
                logger.info(f"Checking API health at: {api_url}")

                # Use longer timeout for API health check since it may still be initializing
                curl_cmd = ["curl", "-s", "--connect-timeout", "5", "--max-time", "10", api_url]
                result = subprocess.run(curl_cmd, capture_output=True, text=True)

                # Health endpoint is considered ready if curl succeeded and the
                # body contains "ok" (case-insensitive).
                if result.returncode == 0 and "ok" in result.stdout.lower():
                    api_ready = True
                    logger.info(f"API is ready at {api_url}")
                    break
                else:
                    # API health check failed, now let's check if the VM status endpoint is responsive
                    # This covers cases where the health endpoint isn't implemented but the VM API is working
                    vm_api_url = f"http://{self.host}:{self.lume_port}/lume/vms/{container_name}"
                    if self.storage:
                        import urllib.parse

                        # Storage path goes in the query string; quote_plus so
                        # spaces and slashes survive the round trip.
                        encoded_storage = urllib.parse.quote_plus(self.storage)
                        vm_api_url += f"?storage={encoded_storage}"

                    curl_vm_cmd = [
                        "curl",
                        "-s",
                        "--connect-timeout",
                        "5",
                        "--max-time",
                        "10",
                        vm_api_url,
                    ]
                    vm_result = subprocess.run(curl_vm_cmd, capture_output=True, text=True)

                    if vm_result.returncode == 0 and vm_result.stdout.strip():
                        # VM API responded with something - consider the API ready
                        api_ready = True
                        logger.info(f"VM API is ready at {vm_api_url}")
                        break
                    else:
                        # Prefer reporting the health check's curl exit code;
                        # fall back to the VM endpoint's code when the health
                        # check "succeeded" transport-wise but wasn't "ok".
                        curl_code = result.returncode
                        if curl_code == 0:
                            curl_code = vm_result.returncode

                        # Map common curl error codes to helpful messages
                        if curl_code == 7:
                            curl_error = "Failed to connect - API server is starting up"
                        elif curl_code == 22:
                            curl_error = "HTTP error returned from API server"
                        elif curl_code == 28:
                            curl_error = "Operation timeout - API server is slow to respond"
                        elif curl_code == 52:
                            curl_error = "Empty reply from server - API is starting but not ready"
                        elif curl_code == 56:
                            curl_error = "Network problem during data transfer"
                        else:
                            curl_error = f"Unknown curl error code: {curl_code}"

                        logger.info(f"API not ready yet: {curl_error}")
            except subprocess.SubprocessError as e:
                logger.warning(f"Error checking API status: {e}")

            # If the container is running but API is not ready, that's OK - we'll just wait
            # a bit longer before checking again, as the container may still be initializing
            elapsed_seconds = time.time() - start_time
            # NOTE(review): with a 3s sleep below, int(elapsed) % 5 == 0 may
            # skip or repeat this status line on some iterations — cosmetic only.
            if (
                int(elapsed_seconds) % 5 == 0
            ):  # Only print status every 5 seconds to reduce verbosity
                logger.debug(
                    f"Waiting for API to initialize... ({elapsed_seconds:.1f}s / {timeout}s)"
                )

            await asyncio.sleep(3)  # Longer sleep between API checks

        # Handle timeout - if the container is running but API is not ready, that's not
        # necessarily an error - the API might just need more time to start up
        if not container_running:
            logger.warning(f"Timed out waiting for container {container_name} to start")
            return False

        if not api_ready:
            logger.warning(
                f"Container {container_name} is running, but API is not fully ready yet."
            )
            logger.warning(
                "NOTE: You may see some 'API request failed' messages while the API initializes."
            )

        # Return True if container is running, even if API isn't ready yet
        # This allows VM operations to proceed, with appropriate retries for API calls
        return container_running

    async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Stop a running VM by stopping the Lumier container.

        Uses the cached container ID when available; otherwise looks the
        container up by name via `docker ps -a` before stopping it.

        Raises:
            RuntimeError: If `docker stop` fails.
        """
        try:
            cached_id = getattr(self, "_container_id", None)
            if cached_id:
                # Fast path: we already know the container ID from a prior run.
                logger.info(f"Stopping Lumier container: {self.container_name}")
                stop_result = subprocess.run(
                    ["docker", "stop", self.container_name],
                    capture_output=True,
                    text=True,
                    check=True,
                )
                logger.info(f"Container stopped: {stop_result.stdout.strip()}")

                # Return minimal status info
                return {
                    "name": name,
                    "status": "stopped",
                    "container_id": cached_id,
                }

            # Slow path: try to find the container by name.
            lookup = subprocess.run(
                [
                    "docker",
                    "ps",
                    "-a",
                    "--filter",
                    f"name={self.container_name}",
                    "--format",
                    "{{.ID}}",
                ],
                capture_output=True,
                text=True,
            )
            found_id = lookup.stdout.strip()

            if not found_id:
                logger.warning(f"No container found with name {self.container_name}")
                return {
                    "name": name,
                    "status": "unknown",
                }

            logger.info(f"Found container ID: {found_id}")
            stop_result = subprocess.run(
                ["docker", "stop", self.container_name],
                capture_output=True,
                text=True,
                check=True,
            )
            logger.info(f"Container stopped: {stop_result.stdout.strip()}")

            return {
                "name": name,
                "status": "stopped",
                "container_id": found_id,
            }
        except subprocess.CalledProcessError as e:
            error_msg = f"Failed to stop container: {e.stderr if hasattr(e, 'stderr') else str(e)}"
            logger.error(error_msg)
            raise RuntimeError(f"Failed to stop Lumier container: {error_msg}")

    # update_vm is not needed for Lumier, but BaseVMProvider requires it,
    # so this is a minimal no-op implementation.
    async def update_vm(
        self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None
    ) -> Dict[str, Any]:
        """No-op stub: updating VM settings is not supported by this provider."""
        logger.warning("update_vm is not implemented for Lumier provider")
        return {"name": name, "status": "unchanged"}

    async def get_logs(
        self, name: str, num_lines: int = 100, follow: bool = False, timeout: Optional[int] = None
    ) -> str:
        """Get the logs from the Lumier container.

        Args:
            name: Name of the VM/container to get logs for
            num_lines: Number of recent log lines to return (default: 100)
            follow: If True, follow the logs (stream new logs as they are generated)
            timeout: Optional timeout in seconds for follow mode (None means no timeout)

        Returns:
            Container logs as a string

        Note:
            If follow=True, this function will continuously stream logs until timeout
            or until interrupted. The output will be printed to console in real-time.
        """
        if not HAS_LUMIER:
            error_msg = "Docker is not available. Cannot get container logs."
            logger.error(error_msg)
            return error_msg

        # Make sure we have a container name
        container_name = name

        # Check if the container exists and is running
        try:
            # Check if the container exists
            inspect_cmd = ["docker", "container", "inspect", container_name]
            result = subprocess.run(inspect_cmd, capture_output=True, text=True)

            if result.returncode != 0:
                error_msg = f"Container '{container_name}' does not exist or is not accessible"
                logger.error(error_msg)
                return error_msg
        except Exception as e:
            error_msg = f"Error checking container status: {str(e)}"
            logger.error(error_msg)
            return error_msg

        # Base docker logs command
        log_cmd = ["docker", "logs"]

        # Add tail parameter to limit the number of lines
        log_cmd.extend(["--tail", str(num_lines)])

        # Handle follow mode with or without timeout
        if follow:
            log_cmd.append("--follow")

            if timeout is not None:
                # For follow mode with timeout, we'll run the command and handle the timeout
                log_cmd.append(container_name)
                logger.info(
                    f"Following logs for container '{container_name}' with timeout {timeout}s"
                )
                logger.info(f"\n---- CONTAINER LOGS FOR '{container_name}' (LIVE) ----")
                logger.info("Press Ctrl+C to stop following logs\n")

                try:
                    # Run with timeout
                    process = subprocess.Popen(log_cmd, text=True)

                    # Wait for the specified timeout
                    if timeout:
                        try:
                            process.wait(timeout=timeout)
                        except subprocess.TimeoutExpired:
                            process.terminate()  # Stop after timeout
                            logger.info(
                                f"\n---- LOG FOLLOWING STOPPED (timeout {timeout}s reached) ----"
                            )
                    else:
                        # Without timeout, wait for user interruption
                        process.wait()

                    return "Logs were displayed to console in follow mode"
                except KeyboardInterrupt:
                    process.terminate()
                    logger.info("\n---- LOG FOLLOWING STOPPED (user interrupted) ----")
                    return "Logs were displayed to console in follow mode (interrupted)"
            else:
                # For follow mode without timeout, we'll print a helpful message
                log_cmd.append(container_name)
                logger.info(f"Following logs for container '{container_name}' indefinitely")
                logger.info(f"\n---- CONTAINER LOGS FOR '{container_name}' (LIVE) ----")
                logger.info("Press Ctrl+C to stop following logs\n")

                try:
                    # Run the command and let it run until interrupted
                    process = subprocess.Popen(log_cmd, text=True)
                    process.wait()  # Wait indefinitely (until user interrupts)
                    return "Logs were displayed to console in follow mode"
                except KeyboardInterrupt:
                    process.terminate()
                    logger.info("\n---- LOG FOLLOWING STOPPED (user interrupted) ----")
                    return "Logs were displayed to console in follow mode (interrupted)"
        else:
            # For non-follow mode, capture and return the logs as a string
            log_cmd.append(container_name)
            logger.info(f"Getting {num_lines} log lines for container '{container_name}'")

            try:
                result = subprocess.run(log_cmd, capture_output=True, text=True, check=True)
                logs = result.stdout

                # Only print header and logs if there's content
                if logs.strip():
                    logger.info(
                        f"\n---- CONTAINER LOGS FOR '{container_name}' (LAST {num_lines} LINES) ----\n"
                    )
                    logger.info(logs)
                    logger.info("\n---- END OF LOGS ----")
                else:
                    logger.info(f"\nNo logs available for container '{container_name}'")

                return logs
            except subprocess.CalledProcessError as e:
                error_msg = f"Error getting logs: {e.stderr}"
                logger.error(error_msg)
                return error_msg
            except Exception as e:
                error_msg = f"Unexpected error getting logs: {str(e)}"
                logger.error(error_msg)
                return error_msg

    async def restart_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Restart a VM.

        Not supported by this provider; Lumier manages the VM lifecycle
        through its Docker container, so an in-place restart is unavailable.

        Args:
            name: Name of the VM to restart.
            storage: Optional storage path override (unused).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("LumierProvider does not support restarting VMs.")

    async def get_ip(self, name: str, storage: Optional[str] = None, retry_delay: int = 2) -> str:
        """Get the IP address of a VM, waiting indefinitely until it's available.

        Polls ``get_vm`` every ``retry_delay`` seconds and returns as soon as a
        usable IP address is reported. Errors from ``get_vm`` are logged and
        treated as "not ready yet" rather than raised.

        Args:
            name: Name of the VM to get the IP for
            storage: Optional storage path override
            retry_delay: Delay between retries in seconds (default: 2)

        Returns:
            IP address of the VM when it becomes available
        """
        # Use container_name = name for consistency
        self.container_name = name

        # Track total attempts for logging purposes
        total_attempts = 0

        # Loop indefinitely until we get a valid IP
        while True:
            total_attempts += 1

            # Log retry message but not on first attempt
            if total_attempts > 1:
                logger.info(f"Waiting for VM {name} IP address (attempt {total_attempts})...")

            try:
                # Get VM information
                vm_info = await self.get_vm(name, storage=storage)

                # Check if we got a valid IP ("unknown" and 0.0.0.0-prefixed
                # addresses are placeholders, not reachable endpoints)
                ip = vm_info.get("ip_address", None)
                if ip and ip != "unknown" and not ip.startswith("0.0.0.0"):
                    logger.info(f"Got valid VM IP address: {ip}")
                    return ip

                # Check the VM status
                status = vm_info.get("status", "unknown")

                # Special handling for Lumier: it may report "stopped" even when
                # the VM is starting. The IP was already validated above, so if
                # we reach here it is not usable yet — just keep waiting.
                if status == "stopped" and "ip_address" in vm_info:
                    logger.info(f"VM status is {status}, but still waiting for IP to be assigned")
                # If VM is not running yet, log and wait
                elif status != "running":
                    logger.info(f"VM is not running yet (status: {status}). Waiting...")
                # If VM is running but no IP yet, wait and retry
                else:
                    logger.info("VM is running but no valid IP address yet. Waiting...")

            except Exception as e:
                # Transient lookup failures are expected while the VM boots
                logger.warning(f"Error getting VM {name} IP: {e}, continuing to wait...")

            # Wait before next retry
            await asyncio.sleep(retry_delay)

            # Add progress log every 10 attempts
            if total_attempts % 10 == 0:
                logger.info(f"Still waiting for VM {name} IP after {total_attempts} attempts...")

    async def __aenter__(self):
        """Enter the async context manager.

        Seeds a default Lumier API URL (derived from host and port) when one
        has not been set yet, so that get_vm can operate before run_vm is
        called, and returns this provider instance.
        """
        logger.debug("Entering LumierProvider context")

        # Lazily derive the API URL from host/port if it is missing or empty.
        if not getattr(self, "_api_url", None):
            self._api_url = f"http://{self.host}:{self.lume_port}"
            logger.info(f"Initialized default Lumier API URL: {self._api_url}")

        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit the async context manager.

        Performs resource cleanup by stopping the Lumier container if one was
        started. Cleanup failures are only raised when no exception is already
        in flight; returning False lets any original exception propagate.
        """
        logger.debug(f"Exiting LumierProvider context, handling exceptions: {exc_type}")
        try:
            container_id = getattr(self, "_container_id", None)
            if container_id:
                # A container was started by this provider — stop it now.
                logger.info(f"Stopping Lumier container on context exit: {self.container_name}")
                try:
                    subprocess.run(
                        ["docker", "stop", self.container_name],
                        capture_output=True,
                        text=True,
                        check=True,
                    )
                    logger.info(f"Container stopped during context exit: {self.container_name}")
                except subprocess.CalledProcessError as e:
                    # Best-effort cleanup: log the failure and continue exiting.
                    logger.warning(f"Failed to stop container during cleanup: {e.stderr}")
        except Exception as e:
            logger.error(f"Error during LumierProvider cleanup: {e}")
            # Surface cleanup errors only when they would not mask an
            # exception raised inside the with-block.
            if exc_type is None:
                raise
        # False: never suppress the caller's exception.
        return False

```
Page 17/20FirstPrevNextLast