#
tokens: 47213/50000 10/513 files (page 10/16)
lines: off (toggle) GitHub
raw markdown copy
This is page 10 of 16. Use http://codebase.md/trycua/cua?page={x} to view the full context.

# Directory Structure

```
├── .all-contributorsrc
├── .cursorignore
├── .devcontainer
│   ├── devcontainer.json
│   ├── post-install.sh
│   └── README.md
├── .dockerignore
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── ci-lume.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-pylume.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       └── test-validation-script.yml
├── .gitignore
├── .vscode
│   ├── docs.code-workspace
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── composite-agents.md
│   ├── cua-hackathon.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .gitignore
│   ├── .prettierrc
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   └── meta.json
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── meta.json
│   │       │   └── sandboxed-python.mdx
│   │       ├── index.mdx
│   │       ├── libraries
│   │       │   ├── agent
│   │       │   │   └── index.mdx
│   │       │   ├── computer
│   │       │   │   └── index.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── core
│   │       │   │   └── index.mdx
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   └── som
│   │       │       ├── configuration.mdx
│   │       │       └── index.mdx
│   │       ├── meta.json
│   │       ├── quickstart-cli.mdx
│   │       ├── quickstart-devs.mdx
│   │       └── telemetry.mdx
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   └── llms.txt
│   │   │       └── route.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── iou.tsx
│   │   │   └── mermaid.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   └── mdx-components.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── .prettierrc
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   └── uitars.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   └── test_connection.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── scripts
│   │   │       ├── install_mcp_server.sh
│   │   │       └── start_mcp_server.sh
│   │   ├── pylume
│   │   │   ├── __init__.py
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── pylume
│   │   │   │   ├── __init__.py
│   │   │   │   ├── client.py
│   │   │   │   ├── exceptions.py
│   │   │   │   ├── lume
│   │   │   │   ├── models.py
│   │   │   │   ├── pylume.py
│   │   │   │   └── server.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           └── test_omniparser.py
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── biome.json
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Dockerfile
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── pylume_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── pdm.lock
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── samples
│   └── community
│       ├── global-online
│       │   └── README.md
│       └── hack-the-north
│           └── README.md
├── scripts
│   ├── build-uv.sh
│   ├── build.ps1
│   ├── build.sh
│   ├── cleanup.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   └── run-docker-dev.sh
└── tests
    ├── pytest.ini
    ├── shell_cmd.py
    ├── test_files.py
    ├── test_mcp_server_session_management.py
    ├── test_mcp_server_streaming.py
    ├── test_shell_bash.py
    ├── test_telemetry.py
    ├── test_venv.py
    └── test_watchdog.py
```

# Files

--------------------------------------------------------------------------------
/libs/lume/src/Commands/Logs.swift:
--------------------------------------------------------------------------------

```swift
import ArgumentParser
import Foundation

struct Logs: ParsableCommand {
    // Command metadata for the log viewer: registers the Info, Error and All
    // subcommands, and makes All the one that runs when no subcommand is given.
    static let configuration = CommandConfiguration(
        abstract: "View lume serve logs",
        subcommands: [Info.self, Error.self, All.self],
        defaultSubcommand: All.self
    )
    
    // Common functionality for reading log files
    /// Reads a log file and returns its text, optionally limited to the last `lines` lines.
    ///
    /// - Parameters:
    ///   - path: Filesystem path of the log file.
    ///   - lines: Maximum number of trailing lines to return. `nil` returns the file
    ///     content unchanged; zero or a negative value returns an empty string.
    ///   - follow: Not used by this function; kept so existing call sites keep
    ///     compiling. Use `tailLogFile(path:initialLines:)` to follow a file.
    /// - Returns: The requested text, or a human-readable message when the file is
    ///   missing or cannot be read as UTF-8. This function never throws.
    static func readLogFile(path: String, lines: Int? = nil, follow: Bool = false) -> String {
        guard FileManager.default.fileExists(atPath: path) else {
            return "Log file not found at \(path)"
        }

        do {
            let content = try String(contentsOfFile: path, encoding: .utf8)

            // No limit requested: hand back the file exactly as stored.
            guard let lineCount = lines else {
                return content
            }

            // Split on Characters rather than a CharacterSet so that "\r\n" is one
            // line break instead of producing a phantom empty line per CRLF.
            var allLines = content.split(omittingEmptySubsequences: false) { $0.isNewline }

            // A file that ends with a newline yields one empty trailing element;
            // counting it would make `lines: N` return only N-1 real lines.
            if let last = allLines.last, last.isEmpty {
                allLines.removeLast()
            }

            // suffix(_:) requires a non-negative length, so clamp instead of
            // trapping on a negative request.
            return allLines.suffix(max(0, lineCount)).joined(separator: "\n")
        } catch {
            return "Error reading log file: \(error.localizedDescription)"
        }
    }
    
    // Method for tailing a log file (following new changes)
    /// Prints the tail of a log file, then keeps printing whatever is appended to it.
    ///
    /// The file is polled every 0.5 seconds. The function does not return during
    /// normal operation; it returns only when the file is missing at start or an
    /// I/O error occurs, after printing a message describing the problem.
    ///
    /// - Parameters:
    ///   - path: Filesystem path of the log file to follow.
    ///   - initialLines: Number of trailing lines to print before following.
    ///     `nil` skips the initial dump; zero or a negative value prints no lines.
    static func tailLogFile(path: String, initialLines: Int? = 10) {
        let fileManager = FileManager.default

        guard fileManager.fileExists(atPath: path) else {
            print("Log file not found at \(path)")
            return
        }

        let fileURL = URL(fileURLWithPath: path)

        do {
            var handle = try FileHandle(forReadingFrom: fileURL)
            // Release the descriptor on every exit path, including thrown errors.
            // `handle` is re-read when the defer runs, so a reopened handle is
            // the one that gets closed.
            defer { try? handle.close() }

            // Offset up to which the file has already been printed.
            var lastPosition: UInt64

            if let lines = initialLines {
                // Read through the same handle that is used for following, so
                // nothing appended between "read" and "measure size" is skipped.
                let data = try handle.readToEnd() ?? Data()
                lastPosition = try handle.offset()

                // Lossy decoding: invalid bytes become U+FFFD instead of aborting.
                let text = String(decoding: data, as: UTF8.self)
                var allLines = text.split(omittingEmptySubsequences: false) { $0.isNewline }

                // Ignore the empty element produced by a trailing newline so that
                // `initialLines` counts real lines.
                if let last = allLines.last, last.isEmpty {
                    allLines.removeLast()
                }

                // Clamp to zero: suffix(_:) traps on a negative length.
                print(allLines.suffix(max(0, lines)).joined(separator: "\n"))
            } else {
                lastPosition = try handle.seekToEnd()
            }

            print("\nTailing log file... Press Ctrl+C to stop")

            while true {
                // Brief pause to reduce CPU usage.
                Thread.sleep(forTimeInterval: 0.5)

                let currentSize = try fileManager.attributesOfItem(atPath: path)[.size] as? UInt64 ?? 0

                if currentSize == lastPosition {
                    continue
                }

                if currentSize < lastPosition {
                    // The file shrank: it was truncated or replaced (rotation).
                    // Reopen by path so a replaced file is read from its new
                    // inode, then start again from the beginning.
                    let reopened = try FileHandle(forReadingFrom: fileURL)
                    try? handle.close()
                    handle = reopened
                    lastPosition = 0
                }

                try handle.seek(toOffset: lastPosition)

                guard let newData = try handle.readToEnd(), !newData.isEmpty else {
                    continue
                }

                // Record how far the read actually got rather than the size
                // sampled earlier; otherwise bytes appended in between would be
                // printed a second time on the next pass.
                lastPosition = try handle.offset()

                // NOTE: a multi-byte character split across two reads shows up as
                // replacement characters rather than the whole chunk being dropped.
                let newContent = String(decoding: newData, as: UTF8.self)

                // Avoid a doubled blank line when the chunk already ends in "\n".
                print(newContent, terminator: newContent.hasSuffix("\n") ? "" : "\n")
            }
        } catch {
            print("Error tailing log file: \(error.localizedDescription)")
        }
    }
    
    // MARK: - Info Logs Subcommand
    
    /// `info` subcommand: shows the daemon's info log, either once or continuously.
    struct Info: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "info",
            abstract: "View info logs from the daemon"
        )

        /// Number of trailing lines to show; `nil` means "not specified".
        @Option(name: .shortAndLong, help: "Number of lines to display from the end of the file")
        var lines: Int?

        /// When set, keep printing new log output until interrupted.
        @Flag(name: .shortAndLong, help: "Follow log file continuously (like tail -f)")
        var follow: Bool = false

        /// Rejects a negative `--lines` value before `run()` is reached.
        ///
        /// A negative count would otherwise flow into the tail-slicing code, where
        /// it produces an out-of-range start index. Failing here gives the user a
        /// usage error instead.
        ///
        /// - Throws: `ValidationError` when `lines` is below zero.
        func validate() throws {
            if let requested = lines, requested < 0 {
                throw ValidationError("--lines must be zero or greater")
            }
        }

        /// Prints the info log header, then either follows the file or prints it once.
        ///
        /// In follow mode the initial dump defaults to 10 lines when `--lines`
        /// was not given; in one-shot mode an absent `--lines` prints the whole file.
        func run() throws {
            let logPath = "/tmp/lume_daemon.log"

            print("=== Info Logs ===")

            guard follow else {
                // Regular one-time viewing of logs.
                print(Logs.readLogFile(path: logPath, lines: lines))
                return
            }

            // Continuously monitor the log; does not return until interrupted.
            Logs.tailLogFile(path: logPath, initialLines: lines ?? 10)
        }
    }
    
    // MARK: - Error Logs Subcommand
    
    /// `error` subcommand: shows the daemon's error log, either once or continuously.
    ///
    /// NOTE(review): inside `Logs` this type shadows the unqualified name `Error`;
    /// code in this scope that needs the standard protocol must write `Swift.Error`.
    struct Error: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "error",
            abstract: "View error logs from the daemon"
        )

        /// Number of trailing lines to show; `nil` means "not specified".
        @Option(name: .shortAndLong, help: "Number of lines to display from the end of the file")
        var lines: Int?

        /// When set, keep printing new log output until interrupted.
        @Flag(name: .shortAndLong, help: "Follow log file continuously (like tail -f)")
        var follow: Bool = false

        /// Rejects a negative `--lines` value before `run()` is reached.
        ///
        /// A negative count would otherwise flow into the tail-slicing code, where
        /// it produces an out-of-range start index. Failing here gives the user a
        /// usage error instead.
        ///
        /// - Throws: `ValidationError` when `lines` is below zero.
        func validate() throws {
            if let requested = lines, requested < 0 {
                throw ValidationError("--lines must be zero or greater")
            }
        }

        /// Prints the error log header, then either follows the file or prints it once.
        ///
        /// In follow mode the initial dump defaults to 10 lines when `--lines`
        /// was not given; in one-shot mode an absent `--lines` prints the whole file.
        func run() throws {
            let logPath = "/tmp/lume_daemon.error.log"

            print("=== Error Logs ===")

            guard follow else {
                // Regular one-time viewing of logs.
                print(Logs.readLogFile(path: logPath, lines: lines))
                return
            }

            // Continuously monitor the log; does not return until interrupted.
            Logs.tailLogFile(path: logPath, initialLines: lines ?? 10)
        }
    }
    
    // MARK: - All Logs Subcommand
    
    struct All: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "all",
            abstract: "View both info and error logs from the daemon"
        )
        
        @Option(name: .shortAndLong, help: "Number of lines to display from the end of each file")
        var lines: Int?
        
        @Flag(name: .shortAndLong, help: "Follow log files continuously (like tail -f)")
        var follow: Bool = false
        
        // Custom implementation to tail both logs simultaneously
        /// Prints the tail of both daemon logs, then polls them every 0.5 s for new content.
        ///
        /// Once polling starts the function never returns; the loop only ends when the
        /// process is terminated (e.g. Ctrl+C). It returns early only when neither file exists.
        ///
        /// - Parameters:
        ///   - infoPath: Path of the info log file.
        ///   - errorPath: Path of the error log file.
        ///   - initialLines: Number of trailing lines to print from each existing file before
        ///     polling starts; `nil` prints no initial content.
        private func tailBothLogs(infoPath: String, errorPath: String, initialLines: Int? = 10) {
            let fileManager = FileManager.default
            let infoExists = fileManager.fileExists(atPath: infoPath)
            let errorExists = fileManager.fileExists(atPath: errorPath)

            if !infoExists && !errorExists {
                print("Neither info nor error log files found")
                return
            }

            // Print initial content
            print("=== Info Logs ===")
            printInitialTail(path: infoPath, title: "Info", exists: infoExists, lines: initialLines)

            print("\n=== Error Logs ===")
            printInitialTail(path: errorPath, title: "Error", exists: errorExists, lines: initialLines)

            print("\nTailing both log files... Press Ctrl+C to stop")

            var infoLog = openTailedLog(path: infoPath, title: "Info", exists: infoExists)
            var errorLog = openTailedLog(path: errorPath, title: "Error", exists: errorExists)

            // Each log is polled independently, so an event on one file (removal, rotation,
            // read error) never delays or skips the check of the other one.
            while true {
                Thread.sleep(forTimeInterval: 0.5)
                pollTailedLog(&infoLog, fileManager: fileManager)
                pollTailedLog(&errorLog, fileManager: fileManager)
            }
        }

        /// Follow-state of one log file: where it lives, whether it was last seen on disk,
        /// the open handle (if any) and the byte offset already printed.
        private struct TailedLog {
            let path: String
            /// Capitalised name used in messages ("Info" / "Error").
            let title: String
            var exists: Bool
            var handle: FileHandle?
            var position: UInt64
        }

        /// Prints the last `lines` lines of the file at `path`, or a "not found" notice.
        private func printInitialTail(path: String, title: String, exists: Bool, lines: Int?) {
            guard exists else {
                print("\(title) log file not found")
                return
            }
            guard let lines = lines else { return }

            let content = (try? String(contentsOfFile: path, encoding: .utf8)) ?? ""
            let allLines = content.components(separatedBy: .newlines)
            // A negative count would trap, so it is clamped to zero.
            print(allLines.suffix(max(0, lines)).joined(separator: "\n"))
        }

        /// Opens `path` for reading and positions the state at the current end of file.
        private func openTailedLog(path: String, title: String, exists: Bool) -> TailedLog {
            var log = TailedLog(path: path, title: title, exists: exists, handle: nil, position: 0)
            guard exists else { return log }

            do {
                let handle = try FileHandle(forReadingFrom: URL(fileURLWithPath: path))
                // Only content appended after this point is reported by the polling loop.
                log.position = try handle.seekToEnd()
                log.handle = handle
            } catch {
                print("Error opening \(title.lowercased()) log file: \(error.localizedDescription)")
            }
            return log
        }

        /// Performs one polling step for `log`: prints appended bytes and handles removal,
        /// rotation/truncation and reappearance of the file.
        private func pollTailedLog(_ log: inout TailedLog, fileManager: FileManager) {
            let lowerTitle = log.title.lowercased()

            guard let handle = log.handle else {
                // Not open: pick the file up again once it reappears after a removal.
                if !log.exists && fileManager.fileExists(atPath: log.path) {
                    do {
                        log.handle = try FileHandle(forReadingFrom: URL(fileURLWithPath: log.path))
                        log.position = 0
                        log.exists = true
                        print("\n[\(log.title) log file reappeared]")
                    } catch {
                        print("\nError reopening \(lowerTitle) log: \(error.localizedDescription)")
                    }
                }
                return
            }

            do {
                // Re-check existence in case file was deleted
                log.exists = fileManager.fileExists(atPath: log.path)
                guard log.exists else {
                    print("\n[\(log.title) log file was removed]")
                    try? handle.close()
                    log.handle = nil
                    return
                }

                let attributes = try fileManager.attributesOfItem(atPath: log.path)
                let currentSize = (attributes[.size] as? NSNumber)?.uint64Value ?? 0

                var reader = handle
                if currentSize < log.position {
                    // File shrank: it was truncated or replaced. Reopen by path so a
                    // replaced file is read instead of the stale handle's old contents.
                    print("\n[\(log.title) log was rotated]")
                    reader = try FileHandle(forReadingFrom: URL(fileURLWithPath: log.path))
                    try? handle.close()
                    log.handle = reader
                    log.position = 0
                }

                guard currentSize > log.position else { return }

                try reader.seek(toOffset: log.position)
                guard let newData = try reader.readToEnd(), !newData.isEmpty else { return }

                // Lossy decoding: invalid UTF-8 becomes U+FFFD instead of dropping the chunk.
                let newContent = String(decoding: newData, as: UTF8.self)
                print("\n--- New \(log.title) Log Content ---")
                print(newContent, terminator: newContent.hasSuffix("\n") ? "" : "\n")

                // Advance by the bytes actually read. The file may have grown between the
                // size check and the read; using the stale size would print them twice.
                log.position += UInt64(newData.count)
            } catch {
                print("\nError reading \(lowerTitle) log: \(error.localizedDescription)")
            }
        }
        
        /// Shows the daemon's info and error logs.
        ///
        /// In follow mode this hands off to `tailBothLogs` (defaulting to 10 initial
        /// lines when `lines` is unset). Otherwise both files are read once through
        /// `Logs.readLogFile` and printed under their section headings.
        func run() throws {
            let infoLogPath = "/tmp/lume_daemon.log"
            let errorLogPath = "/tmp/lume_daemon.error.log"

            guard !follow else {
                // Continuous mode: custom tailing of both files at once
                tailBothLogs(infoPath: infoLogPath, errorPath: errorLogPath, initialLines: lines ?? 10)
                return
            }

            // One-shot mode: both files are read before anything is printed
            let sections = [
                ("=== Info Logs ===", Logs.readLogFile(path: infoLogPath, lines: lines)),
                ("\n=== Error Logs ===", Logs.readLogFile(path: errorLogPath, lines: lines)),
            ]
            for (heading, body) in sections {
                print(heading)
                print(body)
            }
        }
    }
}

```

--------------------------------------------------------------------------------
/examples/som_examples.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Example script demonstrating the usage of OmniParser's UI element detection functionality.
This script shows how to:
1. Initialize the OmniParser
2. Load and process images
3. Visualize detection results
4. Compare performance between CPU and MPS (Apple Silicon)
"""

import argparse
import logging
import sys
from pathlib import Path
import time
from PIL import Image
from typing import Dict, Any, List, Optional
import numpy as np
import io
import base64
import glob
import os

# Load environment variables from the .env file one directory above this script's folder
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
print(f"Loading environment from: {env_file}")
from dotenv import load_dotenv

load_dotenv(env_file)

# Add every PYTHONPATH entry to sys.path if it is not already there.
# Entries are separated by os.pathsep (":" on POSIX, ";" on Windows); splitting on a
# literal ":" would cut Windows drive-letter paths such as "C:\\libs" in two.
pythonpath = os.environ.get("PYTHONPATH", "")
for path in pythonpath.split(os.pathsep):
    if path and path not in sys.path:
        sys.path.append(path)
        print(f"Added to sys.path: {path}")

# Add the libs directory to the path to find som
libs_path = project_root / "libs"
if str(libs_path) not in sys.path:
    sys.path.append(str(libs_path))
    print(f"Added to sys.path: {libs_path}")

from som import OmniParser, ParseResult, IconElement, TextElement
from som.models import UIElement, ParserMetadata, BoundingBox

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)


def setup_logging():
    """Apply the script's INFO-level, timestamped layout via ``logging.basicConfig``.

    Per the standard library, ``basicConfig`` leaves the root logger untouched when it
    already has handlers, so calling this after logging is configured has no effect.
    """
    root_settings = {
        "level": logging.INFO,
        "format": "%(asctime)s - %(levelname)s - %(message)s",
        "datefmt": "%Y-%m-%d %H:%M:%S",
    }
    logging.basicConfig(**root_settings)


class Timer:
    """Context manager that times a code block and logs the duration on exit.

    The duration is measured with ``time.perf_counter()``, a monotonic clock, so the
    result cannot go negative or jump when the system wall clock is adjusted.

    Attributes:
        name: Label placed in front of the logged duration.
        logger: Object exposing ``info(message)``; receives ``"<name>: <seconds>s"``.
        start_time: ``perf_counter`` reading taken on entry (not an epoch timestamp).
        elapsed_time: Seconds spent inside the block; set when the block exits.
    """

    def __init__(self, name: str, logger):
        self.name = name
        self.logger = logger
        self.start_time: float = 0.0
        self.elapsed_time: float = 0.0

    def __enter__(self):
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, *args):
        self.elapsed_time = time.perf_counter() - self.start_time
        self.logger.info(f"{self.name}: {self.elapsed_time:.3f}s")
        # Returning False lets any exception raised inside the block propagate.
        return False


def image_to_bytes(image: Image.Image) -> bytes:
    """Encode a PIL image as PNG in memory and return the resulting bytes."""
    with io.BytesIO() as png_buffer:
        image.save(png_buffer, format="PNG")
        return png_buffer.getvalue()


def process_image(
    parser: OmniParser, image_path: str, output_dir: Path, use_ocr: bool = False
) -> None:
    """Parse one image with ``parser`` and write the annotated PNG to ``output_dir``.

    The image is loaded as RGB, re-encoded to PNG bytes and passed to ``parser.parse``.
    The annotated result is saved as ``<stem>_analyzed.png`` and every detected icon
    or text element is logged. Failures are logged with a traceback and never raised.

    Args:
        parser: Parser whose ``parse`` method is called with the PNG bytes.
        image_path: Path of the image to analyze.
        output_dir: Directory that receives the annotated image.
        use_ocr: Forwarded to ``parser.parse``.
    """
    try:
        logger.info(f"Processing image: {image_path}")
        rgb_image = Image.open(image_path).convert("RGB")
        logger.info(f"Image loaded successfully, size: {rgb_image.size}")

        stem = Path(image_path).stem
        annotated_path = output_dir / f"{stem}_analyzed.png"
        png_payload = image_to_bytes(rgb_image)

        # The timer covers parsing as well as saving and reporting.
        with Timer(f"Processing {stem}", logger):
            result = parser.parse(png_payload, use_ocr=use_ocr)
            logger.info(
                f"Found {result.metadata.num_icons} icons and {result.metadata.num_text} text elements"
            )

            logger.info(f"Saving annotated image to: {annotated_path}")
            try:
                # The annotated image is delivered base64-encoded
                decoded = base64.b64decode(result.annotated_image_base64)
                annotated = Image.open(io.BytesIO(decoded))
                annotated.save(annotated_path)

                logger.info("\nDetected Elements:")
                for element in result.elements:
                    if isinstance(element, IconElement):
                        line = f"Icon: confidence={element.confidence:.3f}, bbox={element.bbox.coordinates}"
                    elif isinstance(element, TextElement):
                        line = f"Text: '{element.content}', confidence={element.confidence:.3f}, bbox={element.bbox.coordinates}"
                    else:
                        continue
                    logger.info(line)

                # Confirm the file landed on disk and report its size
                if not annotated_path.exists():
                    logger.error(f"Failed to verify file at {annotated_path}")
                else:
                    logger.info(
                        f"Successfully saved image. File size: {annotated_path.stat().st_size} bytes"
                    )
            except Exception as save_error:
                logger.error(f"Error saving image: {str(save_error)}", exc_info=True)

    except Exception as error:
        logger.error(f"Error processing image {image_path}: {str(error)}", exc_info=True)


def run_detection_benchmark(
    input_path: str,
    output_dir: Path,
    use_ocr: bool = False,
    box_threshold: float = 0.01,
    iou_threshold: float = 0.1,
):
    """Parse one image, or every ``*.png`` in a directory, with fixed thresholds.

    Each image is annotated and saved to ``output_dir`` as ``<stem>_analyzed.png``.
    A failure on one image is logged and the run continues with the next image.
    A failure outside the per-image loop is logged and re-raised.

    Args:
        input_path: Image file, or directory searched for ``*.png`` files.
        output_dir: Directory for annotated images; created if missing.
        use_ocr: Forwarded to ``parser.parse``.
        box_threshold: Forwarded to ``parser.parse``.
        iou_threshold: Forwarded to ``parser.parse``.
    """
    logger.info(
        f"Starting benchmark with OCR enabled: {use_ocr}, box_threshold: {box_threshold}, iou_threshold: {iou_threshold}"
    )

    try:
        logger.info("Initializing OmniParser...")
        omni = OmniParser()

        output_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"Output directory created at: {output_dir}")

        # A directory contributes its PNG files; anything else is treated as one image
        image_files = (
            glob.glob(os.path.join(input_path, "*.png"))
            if os.path.isdir(input_path)
            else [input_path]
        )
        logger.info(f"Found {len(image_files)} images to process")

        for image_path in image_files:
            try:
                logger.info(f"Processing image: {image_path}")
                rgb_image = Image.open(image_path).convert("RGB")
                logger.info(f"Image loaded successfully, size: {rgb_image.size}")

                stem = Path(image_path).stem
                annotated_path = output_dir / f"{stem}_analyzed.png"
                png_payload = image_to_bytes(rgb_image)

                # The timer covers parsing as well as saving and reporting.
                with Timer(f"Processing {stem}", logger):
                    result = omni.parse(
                        png_payload,
                        use_ocr=use_ocr,
                        box_threshold=box_threshold,
                        iou_threshold=iou_threshold,
                    )
                    logger.info(
                        f"Found {result.metadata.num_icons} icons and {result.metadata.num_text} text elements"
                    )

                    logger.info(f"Saving annotated image to: {annotated_path}")
                    try:
                        # The annotated image is delivered base64-encoded
                        decoded = base64.b64decode(result.annotated_image_base64)
                        annotated = Image.open(io.BytesIO(decoded))
                        annotated.save(annotated_path)

                        logger.info("\nDetected Elements:")
                        for element in result.elements:
                            if isinstance(element, IconElement):
                                line = f"Icon: confidence={element.confidence:.3f}, bbox={element.bbox.coordinates}"
                            elif isinstance(element, TextElement):
                                line = f"Text: '{element.content}', confidence={element.confidence:.3f}, bbox={element.bbox.coordinates}"
                            else:
                                continue
                            logger.info(line)

                        # Confirm the file landed on disk and report its size
                        if not annotated_path.exists():
                            logger.error(f"Failed to verify file at {annotated_path}")
                        else:
                            logger.info(
                                f"Successfully saved image. File size: {annotated_path.stat().st_size} bytes"
                            )
                    except Exception as save_error:
                        logger.error(f"Error saving image: {str(save_error)}", exc_info=True)

            except Exception as image_error:
                logger.error(
                    f"Error processing image {image_path}: {str(image_error)}", exc_info=True
                )

    except Exception as fatal:
        logger.error(f"Benchmark failed: {str(fatal)}", exc_info=True)
        raise


def _write_detection_details(detail_file: Path, image_name: str, result, use_ocr: bool) -> None:
    """Write the per-image report: element counts, every icon, and text boxes if OCR is on."""
    with open(detail_file, "w") as detail_f:
        detail_f.write(f"Results for {image_name}\n")
        detail_f.write("-" * 40 + "\n")
        detail_f.write(f"Number of icons: {result.metadata.num_icons}\n")
        detail_f.write(f"Number of text elements: {result.metadata.num_text}\n\n")

        detail_f.write("Icon Detections:\n")
        icons = [elem for elem in result.elements if isinstance(elem, IconElement)]
        for box_number, elem in enumerate(icons, start=1):
            detail_f.write(f"Box #{box_number}: Icon\n")
            detail_f.write(f"  - Confidence: {elem.confidence:.3f}\n")
            detail_f.write(f"  - Coordinates: {elem.bbox.coordinates}\n")

        if use_ocr:
            detail_f.write("\nText Detections:\n")
            texts = [elem for elem in result.elements if isinstance(elem, TextElement)]
            # Text boxes are numbered after the icons, starting at num_icons + 1
            for box_number, elem in enumerate(texts, start=result.metadata.num_icons + 1):
                detail_f.write(f"Box #{box_number}: Text\n")
                detail_f.write(f"  - Content: '{elem.content}'\n")
                detail_f.write(f"  - Confidence: {elem.confidence:.3f}\n")
                detail_f.write(f"  - Coordinates: {elem.bbox.coordinates}\n")


def run_experiments(input_path: str, output_dir: Path, use_ocr: bool = False):
    """Run the parser over a grid of box/IOU thresholds and write a summary table.

    A timestamped ``experiment_<timestamp>_<ocr|no_ocr>`` directory is created under
    ``output_dir``. Each threshold combination gets a sub-directory holding the
    annotated image and a ``*_details.txt`` report per input image. One row per
    combination is appended to ``results_summary.txt``: total icons, total text
    elements and the average per-image time (parse plus saving the outputs).

    A combination that raises is recorded as an ``ERROR`` row and the grid continues.
    If no input images are found, no combination is run and the summary says so.
    The summary footer reports how many combinations failed.

    Args:
        input_path: Image file, or directory searched for ``*.png`` files.
        output_dir: Parent directory for the experiment directory.
        use_ocr: Forwarded to ``parser.parse``; also enables the text section of the
            per-image reports.
    """
    # Define threshold values to test
    box_thresholds = [0.01, 0.05, 0.1, 0.3]
    iou_thresholds = [0.05, 0.1, 0.2, 0.5]

    logger.info("Starting threshold experiments...")
    logger.info("Box thresholds to test: %s", box_thresholds)
    logger.info("IOU thresholds to test: %s", iou_thresholds)

    # Create results directory for this experiment
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    ocr_suffix = "_ocr" if use_ocr else "_no_ocr"
    exp_dir = output_dir / f"experiment_{timestamp}{ocr_suffix}"
    exp_dir.mkdir(parents=True, exist_ok=True)

    # The input set is the same for every combination, so resolve it once.
    # Sorting makes the processing order deterministic across runs.
    if os.path.isdir(input_path):
        image_files = sorted(glob.glob(os.path.join(input_path, "*.png")))
    else:
        image_files = [input_path]

    failed_combinations = 0

    # Create a summary file
    summary_file = exp_dir / "results_summary.txt"
    with open(summary_file, "w") as f:
        f.write("Threshold Experiments Results\n")
        f.write("==========================\n\n")
        f.write(f"Input: {input_path}\n")
        f.write(f"OCR Enabled: {use_ocr}\n")
        f.write(f"Date: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        f.write("Results:\n")
        f.write("-" * 80 + "\n")
        f.write(
            f"{'Box Thresh':^10} | {'IOU Thresh':^10} | {'Num Icons':^10} | {'Num Text':^10} | {'Time (s)':^10}\n"
        )
        f.write("-" * 80 + "\n")

        if not image_files:
            # Without this guard every combination would fail on a division by zero
            # when averaging, filling the table with misleading ERROR rows.
            logger.warning(f"No PNG images found in {input_path}; no experiments were run")
            f.write(f"No PNG images found in {input_path}\n")
        else:
            # Initialize parser once for all experiments
            parser = OmniParser()

            # Run experiments with each combination
            for box_thresh in box_thresholds:
                for iou_thresh in iou_thresholds:
                    logger.info(
                        f"\nTesting box_threshold={box_thresh}, iou_threshold={iou_thresh}"
                    )

                    # Create directory for this combination
                    combo_dir = exp_dir / f"box_{box_thresh}_iou_{iou_thresh}"
                    combo_dir.mkdir(exist_ok=True)

                    try:
                        total_icons = 0
                        total_text = 0
                        total_time = 0.0

                        for image_path in image_files:
                            image_stem = Path(image_path).stem

                            # Load and process image
                            image = Image.open(image_path).convert("RGB")
                            image_bytes = image_to_bytes(image)

                            # The timed span covers parsing plus writing the outputs
                            with Timer(f"Processing {image_stem}", logger) as t:
                                result = parser.parse(
                                    image_bytes,
                                    use_ocr=use_ocr,
                                    box_threshold=box_thresh,
                                    iou_threshold=iou_thresh,
                                )

                                # Save annotated image
                                output_path = combo_dir / f"{image_stem}_analyzed.png"
                                img_data = base64.b64decode(result.annotated_image_base64)
                                img = Image.open(io.BytesIO(img_data))
                                img.save(output_path)

                                # Update totals
                                total_icons += result.metadata.num_icons
                                total_text += result.metadata.num_text

                                # Log detailed results
                                _write_detection_details(
                                    combo_dir / f"{image_stem}_details.txt",
                                    Path(image_path).name,
                                    result,
                                    use_ocr,
                                )

                            # Update timing totals
                            total_time += t.elapsed_time

                        # Write summary for this combination
                        avg_time = total_time / len(image_files)
                        f.write(
                            f"{box_thresh:^10.3f} | {iou_thresh:^10.3f} | {total_icons:^10d} | {total_text:^10d} | {avg_time:^10.3f}\n"
                        )

                    except Exception as e:
                        failed_combinations += 1
                        logger.error(
                            f"Error in experiment box={box_thresh}, iou={iou_thresh}: {str(e)}",
                            exc_info=True,
                        )
                        f.write(
                            f"{box_thresh:^10.3f} | {iou_thresh:^10.3f} | {'ERROR':^10s} | {'ERROR':^10s} | {'ERROR':^10s}\n"
                        )

        # Write summary footer reflecting the real outcome
        f.write("-" * 80 + "\n")
        if not image_files:
            f.write("\nNo experiments were run.\n")
        elif failed_combinations:
            f.write(
                f"\nExperiment completed with {failed_combinations} failed combination(s).\n"
            )
        else:
            f.write("\nExperiment completed successfully!\n")

    logger.info(f"\nExperiment results saved to {exp_dir}")
    logger.info(f"Summary file: {summary_file}")


def main():
    """Parse the command line and dispatch to the selected run mode.

    Returns:
        0 when the selected mode finishes, 1 when it raises (the error is logged
        with a traceback).
    """
    cli = argparse.ArgumentParser(description="Run OmniParser benchmark")
    cli.add_argument("input_path", help="Path to input image or directory containing images")
    cli.add_argument(
        "--output-dir", default="examples/output", help="Output directory for annotated images"
    )
    cli.add_argument(
        "--ocr",
        choices=["none", "easyocr"],
        default="none",
        help="OCR engine to use (default: none)",
    )
    cli.add_argument(
        "--mode",
        choices=["single", "experiment"],
        default="single",
        help="Run mode: single run or threshold experiments (default: single)",
    )
    cli.add_argument(
        "--box-threshold",
        type=float,
        default=0.01,
        help="Confidence threshold for detection (default: 0.01)",
    )
    cli.add_argument(
        "--iou-threshold",
        type=float,
        default=0.1,
        help="IOU threshold for Non-Maximum Suppression (default: 0.1)",
    )
    args = cli.parse_args()

    logger.info(f"Starting OmniParser with arguments: {args}")
    ocr_enabled = args.ocr != "none"
    destination = Path(args.output_dir)

    try:
        if args.mode != "experiment":
            # Single run with the thresholds given on the command line
            run_detection_benchmark(
                args.input_path, destination, ocr_enabled, args.box_threshold, args.iou_threshold
            )
        else:
            run_experiments(args.input_path, destination, ocr_enabled)
    except Exception as failure:
        logger.error(f"Process failed: {str(failure)}", exc_info=True)
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())

```

--------------------------------------------------------------------------------
/libs/python/som/som/detect.py:
--------------------------------------------------------------------------------

```python
from pathlib import Path
from typing import Union, List, Dict, Any, Tuple, Optional, cast
import logging
import torch
import torchvision.ops
import cv2
import numpy as np
import time
import torchvision.transforms as T
from PIL import Image
import io
import base64
import argparse
import signal
from contextlib import contextmanager

from ultralytics import YOLO
from huggingface_hub import hf_hub_download
import supervision as sv
from supervision.detection.core import Detections

from .detection import DetectionProcessor
from .ocr import OCRProcessor
from .visualization import BoxAnnotator
from .models import BoundingBox, UIElement, IconElement, TextElement, ParserMetadata, ParseResult

logger = logging.getLogger(__name__)


class TimeoutException(Exception):
    """Raised by the ``timeout`` context manager's alarm handler when time runs out."""


@contextmanager
def timeout(seconds: int):
    """Abort the enclosed block with ``TimeoutException`` after ``seconds`` seconds.

    The limit is enforced with ``SIGALRM``. Python only allows installing signal
    handlers from the main thread, and ``SIGALRM`` does not exist on every platform.
    Where the alarm cannot be used, the block runs without a time limit and a
    warning is logged, instead of failing before the guarded work starts.

    Args:
        seconds: Whole seconds before the alarm fires; ``0`` arms no alarm.

    Raises:
        TimeoutException: Raised inside the block when the alarm fires.
    """
    import threading

    alarm_supported = (
        hasattr(signal, "SIGALRM") and threading.current_thread() is threading.main_thread()
    )
    if not alarm_supported:
        logging.getLogger(__name__).warning(
            "Signal-based timeout unavailable (non-main thread or no SIGALRM); "
            "running without a timeout"
        )
        yield
        return

    def timeout_handler(signum, frame):
        raise TimeoutException("OCR process timed out")

    # Register the signal handler
    original_handler = signal.signal(signal.SIGALRM, timeout_handler)
    try:
        # Armed inside the try so the handler is restored even if arming fails.
        signal.alarm(seconds)
        yield
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, original_handler)


# Shared EasyOCR reader for process_text_box, created on first use. Building a
# Reader for every text box repeats its (costly) construction for no benefit.
_TEXT_BOX_READER = None


def _get_text_box_reader():
    """Return the shared English EasyOCR reader, creating it on first use."""
    global _TEXT_BOX_READER
    if _TEXT_BOX_READER is None:
        import easyocr

        _TEXT_BOX_READER = easyocr.Reader(["en"])
    return _TEXT_BOX_READER


def process_text_box(box, image) -> Optional[Tuple[str, List[int], float]]:
    """Run OCR on the image region enclosed by ``box``.

    The axis-aligned bounds of ``box`` are padded by 2 pixels, clamped to the image
    and cropped. Only the first EasyOCR result of the crop is considered.

    Args:
        box: Iterable of points, each indexable as ``point[0]`` (x) and ``point[1]`` (y).
        image: Array indexed as ``image[y1:y2, x1:x2]`` that exposes ``shape`` and
            ``size`` (presumably an OpenCV/numpy image — confirm with callers).

    Returns:
        ``(text, [x1, y1, x2, y2], confidence)`` when the first result has a
        confidence above 0.5. ``None`` otherwise, including when the region is
        empty or any step fails (failures are logged at debug level).
    """
    try:
        xs = [point[0] for point in box]
        ys = [point[1] for point in box]

        # Add padding, keeping the region inside the image
        pad = 2
        x1 = max(0, int(min(xs)) - pad)
        y1 = max(0, int(min(ys)) - pad)
        x2 = min(image.shape[1], int(max(xs)) + pad)
        y2 = min(image.shape[0], int(max(ys)) + pad)

        region = image[y1:y2, x1:x2]
        if region.size > 0:
            results = _get_text_box_reader().readtext(region)
            if results:
                # EasyOCR returns a list of tuples (bbox, text, confidence)
                first_result = results[0]
                if isinstance(first_result, (list, tuple)) and len(first_result) >= 3:
                    text = str(first_result[1])
                    confidence = float(first_result[2])
                    if confidence > 0.5:
                        return text, [x1, y1, x2, y2], confidence
    except Exception:
        # Best-effort per box: still returns None, but the failure is now traceable.
        logging.getLogger(__name__).debug("OCR failed for text box", exc_info=True)
    return None


def check_ocr_box(image_path: Union[str, Path]) -> Tuple[List[str], List[List[float]]]:
    """Run EasyOCR over a whole image and return the confident detections.

    Args:
        image_path: Path of the image to read with OpenCV.

    Returns:
        ``(texts, boxes)`` in matching order, where each box is ``[x1, y1, x2, y2]``.
        Only detections with confidence above 0.5 are kept. Two empty lists are
        returned when the image cannot be read, OCR fails, or OCR exceeds the
        5-second limit.
    """
    image_path = Path(image_path)

    # Read image into memory
    image_cv = cv2.imread(str(image_path))
    if image_cv is None:
        logger.error(f"Failed to read image: {image_path}")
        return [], []

    confidence_threshold = 0.5

    # Use EasyOCR
    import ssl
    import easyocr

    # NOTE(security): certificate verification is disabled process-wide while the
    # reader is built and run (presumably to let model downloads succeed in
    # development setups — confirm whether this is still required). The previous
    # factory is saved so it can be put back, not overwritten with a fixed value.
    previous_https_context = ssl._create_default_https_context
    ssl._create_default_https_context = ssl._create_unverified_context
    try:
        reader = easyocr.Reader(["en"])
        with timeout(5):  # 5 second timeout for EasyOCR
            results = reader.readtext(image_cv, paragraph=False, text_threshold=0.5)
    except TimeoutException:
        logger.warning("EasyOCR timed out, returning no results")
        return [], []
    except Exception as e:
        logger.warning(f"EasyOCR failed: {str(e)}")
        return [], []
    finally:
        # Restore whatever HTTPS context factory was active before this call
        ssl._create_default_https_context = previous_https_context

    texts = []
    boxes = []

    for box, text, conf in results:
        # Only keep higher confidence detections
        if float(conf) <= confidence_threshold:
            continue

        # Convert box format to [x1, y1, x2, y2]
        xs = [point[0] for point in box]
        ys = [point[1] for point in box]
        texts.append(text)
        boxes.append([min(xs), min(ys), max(xs), max(ys)])

    return texts, boxes


class OmniParser:
    """Enhanced UI parser using computer vision and OCR for detecting interactive elements.

    The parser owns three collaborators created in ``__init__``: a
    ``DetectionProcessor`` (icon detection), an ``OCRProcessor`` (text
    detection) and a ``BoxAnnotator`` (draws the detections onto a copy of the
    input image).
    """

    def __init__(
        self,
        model_path: Optional[Union[str, Path]] = None,
        cache_dir: Optional[Union[str, Path]] = None,
        force_device: Optional[str] = None,
    ):
        """Initialize the OmniParser.

        Args:
            model_path: Optional path to the YOLO model
            cache_dir: Optional directory to cache model files
            force_device: Force specific device (cpu/cuda/mps)
        """
        self.detector = DetectionProcessor(
            model_path=Path(model_path) if model_path else None,
            cache_dir=Path(cache_dir) if cache_dir else None,
            force_device=force_device,
        )
        self.ocr = OCRProcessor()
        self.visualizer = BoxAnnotator()

    def process_image(
        self,
        image: Image.Image,
        box_threshold: float = 0.3,
        iou_threshold: float = 0.1,
        use_ocr: bool = True,
    ) -> Tuple[Image.Image, List[UIElement]]:
        """Process an image to detect UI elements and optionally text.

        Icon detections come first. When OCR is enabled and both icon and text
        detections exist, any icon whose box contains the center of a text box
        is dropped, and the remaining icons plus all text boxes are merged with
        non-maximum suppression. Otherwise text detections are simply appended.

        Args:
            image: Input PIL Image
            box_threshold: Confidence threshold for detection
            iou_threshold: IOU threshold for NMS
            use_ocr: Whether to enable OCR processing

        Returns:
            Tuple of (annotated image, list of detections)

        Raises:
            Exception: Any error raised while detecting or drawing is logged
                with its traceback and re-raised unchanged.
        """
        try:
            logger.info("Starting UI element detection...")

            # Detect icons
            icon_detections = self.detector.detect_icons(
                image=image, box_threshold=box_threshold, iou_threshold=iou_threshold
            )
            logger.info(f"Found {len(icon_detections)} interactive elements")

            # Convert icon detections to typed objects
            elements: List[UIElement] = cast(
                List[UIElement],
                [
                    IconElement(
                        id=i + 1,
                        bbox=BoundingBox(
                            x1=det["bbox"][0],
                            y1=det["bbox"][1],
                            x2=det["bbox"][2],
                            y2=det["bbox"][3],
                        ),
                        confidence=det["confidence"],
                        scale=det.get("scale"),
                    )
                    for i, det in enumerate(icon_detections)
                ],
            )

            # Run OCR if enabled
            if use_ocr:
                logger.info("Running OCR detection...")
                text_detections = self.ocr.detect_text(image=image, confidence_threshold=0.5)
                if text_detections is None:
                    text_detections = []
                logger.info(f"Found {len(text_detections)} text regions")

                # Convert text detections to typed objects; ids continue after the icon ids
                text_elements = cast(
                    List[UIElement],
                    [
                        TextElement(
                            id=len(elements) + i + 1,
                            bbox=BoundingBox(
                                x1=det["bbox"][0],
                                y1=det["bbox"][1],
                                x2=det["bbox"][2],
                                y2=det["bbox"][3],
                            ),
                            content=det["content"],
                            confidence=det["confidence"],
                        )
                        for i, det in enumerate(text_detections)
                    ],
                )

                if elements and text_elements:
                    # Filter out non-OCR elements that have OCR elements with center points colliding with them
                    filtered_elements = []
                    for elem in elements:  # elements at this point contains only non-OCR elements
                        should_keep = True
                        for text_elem in text_elements:
                            # Calculate center point of the text element
                            center_x = (text_elem.bbox.x1 + text_elem.bbox.x2) / 2
                            center_y = (text_elem.bbox.y1 + text_elem.bbox.y2) / 2

                            # Check if this center point is inside the non-OCR element
                            if (center_x >= elem.bbox.x1 and center_x <= elem.bbox.x2 and
                                center_y >= elem.bbox.y1 and center_y <= elem.bbox.y2):
                                should_keep = False
                                break

                        if should_keep:
                            filtered_elements.append(elem)
                    elements = filtered_elements

                    # Merge detections using NMS. The tensors are built as float32
                    # explicitly so that integer or mixed-precision coordinates and
                    # scores cannot yield mismatched or non-float dtypes.
                    all_elements = elements + text_elements
                    boxes = torch.tensor(
                        [elem.bbox.coordinates for elem in all_elements], dtype=torch.float32
                    )
                    scores = torch.tensor(
                        [elem.confidence for elem in all_elements], dtype=torch.float32
                    )
                    keep_indices = torchvision.ops.nms(boxes, scores, iou_threshold)
                    elements = [all_elements[i] for i in keep_indices]
                else:
                    # Just add text elements to the list if IOU doesn't need to be applied
                    elements.extend(text_elements)

            # Calculate drawing parameters based on image size
            box_overlay_ratio = max(image.size) / 3200
            draw_config = {
                # Clamp to at least 1 like the other parameters: an image whose
                # longest side is below 267 px would otherwise yield font size 0.
                "font_size": max(int(12 * box_overlay_ratio), 1),
                "box_thickness": max(int(2 * box_overlay_ratio), 1),
                "text_padding": max(int(3 * box_overlay_ratio), 1),
            }

            # Convert elements back to dict format for visualization
            detection_dicts = [
                {
                    "type": elem.type,
                    "bbox": elem.bbox.coordinates,
                    "confidence": elem.confidence,
                    "content": elem.content if isinstance(elem, TextElement) else None,
                }
                for elem in elements
            ]

            # Create visualization on a copy so the caller's image is left untouched
            logger.info("Creating visualization...")
            annotated_image = self.visualizer.draw_boxes(
                image=image.copy(), detections=detection_dicts, draw_config=draw_config
            )
            logger.info("Visualization complete")

            return annotated_image, elements

        except Exception as e:
            logger.error(f"Error in process_image: {str(e)}")
            import traceback

            logger.error(traceback.format_exc())
            raise

    def parse(
        self,
        screenshot_data: Union[bytes, str],
        box_threshold: float = 0.3,
        iou_threshold: float = 0.1,
        use_ocr: bool = True,
    ) -> ParseResult:
        """Parse a UI screenshot to detect interactive elements and text.

        Args:
            screenshot_data: Raw bytes or base64 string of the screenshot
            box_threshold: Confidence threshold for detection
            iou_threshold: IOU threshold for NMS
            use_ocr: Whether to enable OCR processing

        Returns:
            ParseResult object containing elements, annotated image, and metadata

        Raises:
            Exception: Any error raised while decoding or processing is logged
                with its traceback and re-raised unchanged.
        """
        try:
            start_time = time.time()

            # Convert input to PIL Image; a str is treated as base64-encoded bytes
            if isinstance(screenshot_data, str):
                screenshot_data = base64.b64decode(screenshot_data)
            image = Image.open(io.BytesIO(screenshot_data)).convert("RGB")

            # Process image
            annotated_image, elements = self.process_image(
                image=image,
                box_threshold=box_threshold,
                iou_threshold=iou_threshold,
                use_ocr=use_ocr,
            )

            # Convert annotated image to base64
            buffered = io.BytesIO()
            annotated_image.save(buffered, format="PNG")
            annotated_image_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")

            # Generate screen info text
            screen_info = []
            parsed_content_list = []

            # Set element IDs and generate human-readable descriptions
            for i, elem in enumerate(elements):
                # Renumber (1-indexed): filtering/NMS in process_image can leave gaps in the ids
                elem.id = i + 1

                if isinstance(elem, IconElement):
                    screen_info.append(
                        f"Box #{i+1}: Icon (confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates})"
                    )
                    parsed_content_list.append(
                        {
                            "id": i + 1,
                            "type": "icon",
                            "bbox": elem.bbox.coordinates,
                            "confidence": elem.confidence,
                            "content": None,
                        }
                    )
                elif isinstance(elem, TextElement):
                    screen_info.append(
                        f"Box #{i+1}: Text '{elem.content}' (confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates})"
                    )
                    parsed_content_list.append(
                        {
                            "id": i + 1,
                            "type": "text",
                            "bbox": elem.bbox.coordinates,
                            "confidence": elem.confidence,
                            "content": elem.content,
                        }
                    )

            # Calculate metadata; latency covers decoding, detection and encoding
            latency = time.time() - start_time
            width, height = image.size

            # Create ParseResult object with enhanced properties
            result = ParseResult(
                elements=elements,
                annotated_image_base64=annotated_image_base64,
                screen_info=screen_info,
                parsed_content_list=parsed_content_list,
                metadata=ParserMetadata(
                    image_size=(width, height),
                    num_icons=len([e for e in elements if isinstance(e, IconElement)]),
                    num_text=len([e for e in elements if isinstance(e, TextElement)]),
                    device=self.detector.device,
                    ocr_enabled=use_ocr,
                    latency=latency,
                ),
            )

            # Return the ParseResult object directly
            return result

        except Exception as e:
            logger.error(f"Error in parse: {str(e)}")
            import traceback

            logger.error(traceback.format_exc())
            raise


def main():
    """Command line interface for UI element detection.

    Parses the command line, runs ``OmniParser.process_image`` on the given
    image, saves the annotated image and logs every detection.

    OCR is enabled by default; pass ``--no-ocr`` to disable it. ``--ocr`` is
    still accepted for backward compatibility.

    Returns:
        0 on success, 1 if any exception was raised while processing.
    """
    parser = argparse.ArgumentParser(description="Detect UI elements and text in images")
    parser.add_argument("image_path", help="Path to the input image")
    parser.add_argument("--model-path", help="Path to YOLO model")
    parser.add_argument(
        "--box-threshold", type=float, default=0.3, help="Box confidence threshold (default: 0.3)"
    )
    parser.add_argument(
        "--iou-threshold", type=float, default=0.1, help="IOU threshold (default: 0.1)"
    )
    parser.add_argument(
        "--ocr",
        dest="ocr",
        action="store_true",
        default=True,
        help="Enable OCR processing (default: True)",
    )
    # `--ocr` alone is store_true with default=True, so it can never switch OCR
    # off; this companion flag writes False into the same destination.
    parser.add_argument(
        "--no-ocr", dest="ocr", action="store_false", help="Disable OCR processing"
    )
    parser.add_argument("--output", help="Output path for annotated image")
    args = parser.parse_args()

    # Setup logging
    logging.basicConfig(level=logging.INFO)

    try:
        # Initialize parser (kept in its own name rather than rebinding `parser`)
        omni_parser = OmniParser(model_path=args.model_path)

        # Load and process image
        logger.info(f"Loading image from: {args.image_path}")
        image = Image.open(args.image_path).convert("RGB")
        logger.info(f"Image loaded successfully, size: {image.size}")

        # Process image
        annotated_image, elements = omni_parser.process_image(
            image=image,
            box_threshold=args.box_threshold,
            iou_threshold=args.iou_threshold,
            use_ocr=args.ocr,
        )

        # Save output image; defaults to "<stem>_analyzed<suffix>" next to the input
        output_path = args.output or str(
            Path(args.image_path).parent
            / f"{Path(args.image_path).stem}_analyzed{Path(args.image_path).suffix}"
        )
        logger.info(f"Saving annotated image to: {output_path}")

        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        annotated_image.save(output_path)
        logger.info(f"Image saved successfully to {output_path}")

        # Print detections
        logger.info("\nDetections:")
        for i, elem in enumerate(elements):
            if isinstance(elem, IconElement):
                logger.info(
                    f"Interactive element {i}: confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
                )
            elif isinstance(elem, TextElement):
                logger.info(f"Text {i}: '{elem.content}', bbox={elem.bbox.coordinates}")

    except Exception as e:
        logger.error(f"Error processing image: {str(e)}")
        import traceback

        logger.error(traceback.format_exc())
        return 1

    return 0


if __name__ == "__main__":
    # Script entry point: the value returned by main() becomes the process exit status.
    import sys

    exit_status = main()
    sys.exit(exit_status)

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/cli.py:
--------------------------------------------------------------------------------

```python
"""
CLI chat interface for agent - Computer Use Agent

Usage:
    python -m agent.cli <model_string>
    
Examples:
    python -m agent.cli openai/computer-use-preview
    python -m agent.cli anthropic/claude-3-5-sonnet-20241022
    python -m agent.cli omniparser+anthropic/claude-3-5-sonnet-20241022
"""

try:
    import asyncio
    import argparse
    import os
    import sys
    import json
    from typing import List, Dict, Any
    import dotenv
    import base64
    import time
    import platform
    from pathlib import Path
    try:
        from PIL import Image, ImageDraw
        PIL_AVAILABLE = True
    except Exception:
        PIL_AVAILABLE = False
    from yaspin import yaspin
except ImportError:
    if __name__ == "__main__":
        raise ImportError(
            "CLI dependencies not found. "
            "Please install with: pip install \"cua-agent[cli]\""
        )

# Load environment variables at import time, before the os.getenv lookups in main()
dotenv.load_dotenv()

# Color codes for terminal output
class Colors:
    """ANSI escape sequences used to style terminal output.

    The values are meant to be concatenated in front of the text to style,
    with ``RESET`` appended afterwards to end the styling.
    """
    # Style modifiers
    RESET = '\033[0m'
    BOLD = '\033[1m'
    DIM = '\033[2m'
    
    # Text (foreground) colors
    RED = '\033[31m'
    GREEN = '\033[32m'
    YELLOW = '\033[33m'
    BLUE = '\033[34m'
    MAGENTA = '\033[35m'
    CYAN = '\033[36m'
    WHITE = '\033[37m'
    GRAY = '\033[90m'
    
    # Background colors
    BG_RED = '\033[41m'
    BG_GREEN = '\033[42m'
    BG_YELLOW = '\033[43m'
    BG_BLUE = '\033[44m'

def print_colored(text: str, color: str = "", bold: bool = False, dim: bool = False, end: str = "\n", right: str = ""):
    """Print colored text to terminal with optional right-aligned text.

    Args:
        text: Text printed at the left.
        color: ANSI color sequence (e.g. ``Colors.RED``) to apply; empty for none.
        bold: Prepend the bold sequence.
        dim: Prepend the dim sequence.
        end: Line terminator handed to ``print``.
        right: Optional text pushed towards the right edge of the terminal.
            When the line is too narrow, it follows ``text`` after one space.
    """
    prefix = ""
    if bold:
        prefix += Colors.BOLD
    if dim:
        prefix += Colors.DIM
    if color:
        prefix += color
    
    if right:
        # Get terminal width (default to 80 if unable to determine).
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt / SystemExit are never swallowed here.
        try:
            import shutil
            terminal_width = shutil.get_terminal_size().columns
        except Exception:
            terminal_width = 80

        # Add right margin
        terminal_width -= 1
        
        # Calculate padding needed. Lengths are taken from the raw strings, so
        # the ANSI prefix/reset sequences are not counted as visible columns.
        visible_left_len = len(text)
        visible_right_len = len(right)
        padding = terminal_width - visible_left_len - visible_right_len
        
        if padding > 0:
            output = f"{prefix}{text}{' ' * padding}{right}{Colors.RESET}"
        else:
            # If not enough space, just put a single space between
            output = f"{prefix}{text} {right}{Colors.RESET}"
    else:
        output = f"{prefix}{text}{Colors.RESET}"
    
    print(output, end=end)


def print_action(action_type: str, details: Dict[str, Any], total_cost: float):
    """Print a one-line, dimmed summary of a computer action.

    Args:
        action_type: Action name (e.g. "click", "type", "key", "scroll").
        details: Action payload; only the keys needed for the given action
            type are read. Typed text longer than 50 characters is shortened.
        total_cost: Running cost; shown right-aligned only when positive.
    """
    if action_type == "click" and "x" in details and "y" in details:
        button = details.get('button', 'left')
        args_str = f"_{button}({details['x']}, {details['y']})"
    elif action_type == "type" and "text" in details:
        typed = details["text"]
        # Keep long inputs on a single line: 47 characters plus an ellipsis
        shown = typed[:47] + "..." if len(typed) > 50 else typed
        args_str = f'("{shown}")'
    elif action_type == "key" and "text" in details:
        args_str = f"('{details['text']}')"
    elif action_type == "scroll" and "x" in details and "y" in details:
        args_str = f"({details['x']}, {details['y']})"
    else:
        args_str = ""

    # An empty right-hand label is the same as not passing `right` at all
    cost_label = f"💸 ${total_cost:.2f}" if total_cost > 0 else ""
    print_colored(f"🛠️  {action_type}{args_str}", dim=True, right=cost_label)

def print_welcome(model: str, agent_loop: str, container_name: str):
    """Show the connection banner, followed by a dimmed hint on how to quit."""
    banner = f"Connected to {container_name} ({model}, {agent_loop})"
    print_colored(banner)
    print_colored("Type 'exit' to quit.", dim=True)

async def ainput(prompt: str = ""):
    """Read a line via the built-in ``input`` without blocking the event loop.

    The blocking call is handed to ``asyncio.to_thread`` and its result is
    returned once available.
    """
    pending_line = asyncio.to_thread(input, prompt)
    return await pending_line

async def chat_loop(agent, model: str, container_name: str, initial_prompt: str = "", show_usage: bool = True):
    """Run the interactive conversation with the agent until the user quits.

    Args:
        agent: Object exposing ``run(history)`` as an async iterator of result
            dicts and ``agent_config_info.agent_class`` for the banner.
        model: Model string shown in the welcome banner.
        container_name: Container name shown in the welcome banner.
        initial_prompt: If non-empty, sent as the first user message without
            prompting for input.
        show_usage: When true, accumulate ``response_cost`` from each result
            and print the running total after every agent run.
    """
    print_welcome(model, agent.agent_config_info.agent_class.__name__, container_name)

    history = [{"role": "user", "content": initial_prompt}] if initial_prompt else []
    total_cost = 0

    while True:
        # Only ask for input when the last message is not already a pending user turn
        needs_user_turn = not history or history[-1].get("role") != "user"
        if needs_user_turn:
            print_colored("> ", end="")
            user_input = await ainput()

            if user_input.lower() in ('exit', 'quit', 'q'):
                print_colored("\n👋 Goodbye!")
                break

            if not user_input:
                continue

            history.append({"role": "user", "content": user_input})

        # Stream responses from the agent; the spinner is only shown while an
        # action or function call is in flight
        with yaspin(text="Thinking...", spinner="line", attrs=["dark"]) as spinner:
            spinner.hide()

            async for result in agent.run(history):
                outputs = result.get("output", [])
                # Agent responses become part of the conversation history
                history.extend(outputs)

                if show_usage:
                    total_cost += result.get("usage", {}).get("response_cost", 0)

                for item in outputs:
                    item_type = item.get("type")

                    if item_type == "message" and item.get("role") == "assistant":
                        # Assistant text: print every non-blank text part
                        for part in item.get("content", []):
                            raw_text = part.get("text")
                            if not raw_text:
                                continue
                            stripped = raw_text.strip()
                            if stripped:
                                spinner.hide()
                                print_colored(stripped)

                    elif item_type == "computer_call":
                        # Computer action: print it, then spin while it executes
                        action = item.get("action", {})
                        action_type = action.get("type", "")
                        if action_type:
                            spinner.hide()
                            print_action(action_type, action, total_cost)
                            spinner.text = f"Performing {action_type}..."
                            spinner.show()

                    elif item_type == "function_call":
                        # Function call: announce it, then spin while it runs
                        function_name = item.get("name", "")
                        spinner.hide()
                        print_colored(f"🔧 Calling function: {function_name}", dim=True)
                        spinner.text = f"Calling {function_name}..."
                        spinner.show()

                    elif item_type == "function_call_output":
                        # Function output (dimmed), skipped when empty or blank
                        output = item.get("output", "")
                        if output and output.strip():
                            spinner.hide()
                            print_colored(f"📤 {output}", dim=True)

            spinner.hide()
            if show_usage and total_cost > 0:
                print_colored(f"Total cost: ${total_cost:.2f}", dim=True)
        

async def main():
    """Main CLI function.

    Flow:
      1. Parse command-line arguments.
      2. Resolve the container name and API keys from the environment,
         prompting on stdin for any that are missing.
      3. Open a ``Computer`` for the chosen provider and build a
         ``ComputerAgent`` on top of it.
      4. Either run a single ``--predict-click`` prediction (saving and opening
         an annotated screenshot) and exit, or start the interactive chat loop.

    Exits the process via ``sys.exit`` with status 1 on configuration or
    runtime errors, 2 when ``--predict-click`` yields no coordinates, and 0
    after a successful ``--predict-click`` run.
    """
    parser = argparse.ArgumentParser(
        description="CUA Agent CLI - Interactive computer use assistant",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python -m agent.cli openai/computer-use-preview
  python -m agent.cli anthropic/claude-3-5-sonnet-20241022
  python -m agent.cli omniparser+anthropic/claude-3-5-sonnet-20241022
  python -m agent.cli huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B
        """
    )
    
    parser.add_argument(
        "model",
        help="Model string (e.g., 'openai/computer-use-preview', 'anthropic/claude-3-5-sonnet-20241022')"
    )
    
    parser.add_argument(
        "--provider",
        choices=["cloud", "lume", "winsandbox", "docker"],
        default="cloud",
        help="Computer provider to use: cloud (default), lume, winsandbox, or docker"
    )
    
    parser.add_argument(
        "--images",
        type=int,
        default=3,
        help="Number of recent images to keep in context (default: 3)"
    )
    
    parser.add_argument(
        "--trajectory",
        action="store_true",
        help="Save trajectory for debugging"
    )
    
    parser.add_argument(
        "--budget",
        type=float,
        help="Maximum budget for the session (in dollars)"
    )
    
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable verbose logging"
    )

    parser.add_argument(
        "-p", "--prompt",
        type=str,
        help="Initial prompt to send to the agent. Leave blank for interactive mode."
    )

    parser.add_argument(
        "--prompt-file",
        type=Path,
        help="Path to a UTF-8 text file whose contents will be used as the initial prompt. If provided, overrides --prompt."
    )

    parser.add_argument(
        "--predict-click",
        dest="predict_click",
        type=str,
        help="Instruction for click prediction. If set, runs predict_click, draws crosshair on a fresh screenshot, saves and opens it."
    )

    parser.add_argument(
        "-c", "--cache",
        action="store_true",
        help="Tell the API to enable caching"
    )

    parser.add_argument(
        "-u", "--usage",
        action="store_true",
        help="Show total cost of the agent runs"
    )

    parser.add_argument(
        "-r", "--max-retries",
        type=int,
        default=3,
        help="Maximum number of retries for the LLM API calls"
    )
    
    args = parser.parse_args()
    
    # Check for required environment variables
    container_name = os.getenv("CUA_CONTAINER_NAME")
    cua_api_key = os.getenv("CUA_API_KEY")
    
    # Prompt for missing environment variables (container name always required)
    if not container_name:
        if args.provider == "cloud":
            print_colored("CUA_CONTAINER_NAME not set.", dim=True)
            print_colored("You can get a CUA container at https://www.trycua.com/", dim=True)
            container_name = input("Enter your CUA container name: ").strip()
            if not container_name:
                print_colored("❌ Container name is required.")
                sys.exit(1)
        else:
            # Non-cloud providers fall back to a fixed default name
            container_name = "cli-sandbox"

    # Only require API key for cloud provider
    if args.provider == "cloud" and not cua_api_key:
        print_colored("CUA_API_KEY not set.", dim=True)
        cua_api_key = input("Enter your CUA API key: ").strip()
        if not cua_api_key:
            print_colored("❌ API key is required for cloud provider.")
            sys.exit(1)
    
    # Check for provider-specific API keys based on model
    provider_api_keys = {
        "openai/": "OPENAI_API_KEY",
        "anthropic/": "ANTHROPIC_API_KEY",
    }
    
    # Find matching provider and check for API key. The prefix is matched as a
    # substring so composed model strings ("omniparser+anthropic/...") are
    # covered too; only the first matching provider is checked.
    for prefix, env_var in provider_api_keys.items():
        if prefix in args.model:
            if not os.getenv(env_var):
                print_colored(f"{env_var} not set.", dim=True)
                api_key = input(f"Enter your {env_var.replace('_', ' ').title()}: ").strip()
                if not api_key:
                    print_colored(f"❌ {env_var.replace('_', ' ').title()} is required.")
                    sys.exit(1)
                # Set the environment variable for the session
                os.environ[env_var] = api_key
            break
    
    # Import here to avoid import errors if dependencies are missing
    try:
        from agent import ComputerAgent
        from computer import Computer
    except ImportError as e:
        print_colored(f"❌ Import error: {e}", Colors.RED, bold=True)
        print_colored("Make sure agent and computer libraries are installed.", Colors.YELLOW)
        sys.exit(1)
    
    # Resolve provider -> os_type, provider_type, api key requirement
    provider_map = {
        "cloud": ("linux", "cloud", True),
        "lume": ("macos", "lume", False),
        "winsandbox": ("windows", "winsandbox", False),
        "docker": ("linux", "docker", False),
    }
    os_type, provider_type, needs_api_key = provider_map[args.provider]

    computer_kwargs = {
        "os_type": os_type,
        "provider_type": provider_type,
        "name": container_name,
    }
    if needs_api_key:
        computer_kwargs["api_key"] = cua_api_key # type: ignore

    # Create computer instance
    async with Computer(**computer_kwargs) as computer: # type: ignore
        
        # Create agent
        agent_kwargs = {
            "model": args.model,
            "tools": [computer],
            "trust_remote_code": True, # needed for some local models (e.g., InternVL, OpenCUA)
            "verbosity": 20 if args.verbose else 30,  # logging levels: INFO (20) vs WARNING (30)
            "max_retries": args.max_retries
        }

        # A non-positive --images leaves the agent's own default in place
        if args.images > 0:
            agent_kwargs["only_n_most_recent_images"] = args.images
        
        if args.trajectory:
            agent_kwargs["trajectory_dir"] = "trajectories"
        
        if args.budget:
            agent_kwargs["max_trajectory_budget"] = {
                "max_budget": args.budget,
                "raise_error": True,
                "reset_after_each_run": False
            }

        if args.cache:
            agent_kwargs["use_prompt_caching"] = True
        
        agent = ComputerAgent(**agent_kwargs)
        
        # If predict-click mode is requested, run once and exit
        if args.predict_click:
            if not PIL_AVAILABLE:
                print_colored("❌ Pillow (PIL) is required for --predict-click visualization. Install with: pip install pillow", Colors.RED, bold=True)
                sys.exit(1)

            instruction = args.predict_click
            print_colored(f"Predicting click for: '{instruction}'", Colors.CYAN)

            # Take a fresh screenshot FIRST
            try:
                img_bytes = await computer.interface.screenshot()
            except Exception as e:
                print_colored(f"❌ Failed to take screenshot: {e}", Colors.RED, bold=True)
                sys.exit(1)

            # Encode screenshot to base64 for predict_click
            try:
                image_b64 = base64.b64encode(img_bytes).decode("utf-8")
            except Exception as e:
                print_colored(f"❌ Failed to encode screenshot: {e}", Colors.RED, bold=True)
                sys.exit(1)

            try:
                coords = await agent.predict_click(instruction, image_b64=image_b64)
            except Exception as e:
                print_colored(f"❌ predict_click failed: {e}", Colors.RED, bold=True)
                sys.exit(1)

            if not coords:
                print_colored("⚠️  No coordinates returned.", Colors.YELLOW)
                sys.exit(2)

            x, y = coords
            print_colored(f"✅ Predicted coordinates: ({x}, {y})", Colors.GREEN)

            try:
                from io import BytesIO
                with Image.open(BytesIO(img_bytes)) as img:
                    img = img.convert("RGB")
                    draw = ImageDraw.Draw(img)
                    # Draw crosshair
                    size = 12
                    color = (255, 0, 0)
                    draw.line([(x - size, y), (x + size, y)], fill=color, width=3)
                    draw.line([(x, y - size), (x, y + size)], fill=color, width=3)
                    # Optional small circle
                    r = 6
                    draw.ellipse([(x - r, y - r), (x + r, y + r)], outline=color, width=2)

                    out_path = Path.cwd() / f"predict_click_{int(time.time())}.png"
                    img.save(out_path)
                    print_colored(f"🖼️  Saved to {out_path}")

                    # Open the image with default viewer (best effort). The
                    # viewer is started with an argument list instead of a shell
                    # string, so quotes or shell metacharacters in the working
                    # directory path cannot be interpreted by a shell.
                    try:
                        import subprocess

                        system = platform.system().lower()
                        if system == "windows":
                            os.startfile(str(out_path))  # type: ignore[attr-defined]
                        elif system == "darwin":
                            subprocess.run(["open", str(out_path)], check=False)
                        else:
                            subprocess.run(["xdg-open", str(out_path)], check=False)
                    except Exception:
                        # Failing to open a viewer must not fail the command
                        pass
            except Exception as e:
                print_colored(f"❌ Failed to render/save screenshot: {e}", Colors.RED, bold=True)
                sys.exit(1)

            # Done
            sys.exit(0)

        # Resolve initial prompt from --prompt-file or --prompt (the file wins)
        initial_prompt = args.prompt or ""
        if args.prompt_file:
            try:
                initial_prompt = args.prompt_file.read_text(encoding="utf-8")
            except Exception as e:
                print_colored(f"❌ Failed to read --prompt-file: {e}", Colors.RED, bold=True)
                sys.exit(1)

        # Start chat loop (default interactive mode)
        await chat_loop(agent, args.model, container_name, initial_prompt, args.usage)



if __name__ == "__main__":
    # Ctrl+C or end-of-input ends the session with a farewell instead of a traceback.
    try:
        asyncio.run(main())
    except (KeyboardInterrupt, EOFError):
        print_colored("\n\n👋 Goodbye!")
```

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/moondream3.py:
--------------------------------------------------------------------------------

```python
"""
Moondream3+ composed-grounded agent loop implementation.
Grounding is handled by a local Moondream3 preview model via Transformers.
Thinking is delegated to the trailing LLM in the composed model string: "moondream3+<thinking_model>".

Differences from composed_grounded:
- Provides a singleton Moondream3 client outside the class.
- predict_click uses model.point(image, instruction, settings={"max_objects": 1}) and returns pixel coordinates.
- If the last image was a screenshot (or we take one), run model.detect(image, "all form ui") to get bboxes, then
  run model.caption on each cropped bbox to label it. Overlay labels on the screenshot and emit via _on_screenshot.
- Add a user message listing all detected form UI names so the thinker can reference them.
- If the thinking model doesn't support vision, filter out image content before calling litellm.
"""

from __future__ import annotations

import uuid
import base64
import io
from typing import Dict, List, Any, Optional, Tuple, Any

from PIL import Image, ImageDraw, ImageFont
import torch
from transformers import AutoModelForCausalLM
import litellm

from ..decorators import register_agent
from ..types import AgentCapability
from ..loops.base import AsyncAgentConfig
from ..responses import (
    convert_computer_calls_xy2desc,
    convert_responses_items_to_completion_messages,
    convert_completion_messages_to_responses_items,
    convert_computer_calls_desc2xy,
    get_all_element_descriptions,
)

# Module-level cache for the loaded model; populated on first use by get_moondream_model().
_MOONDREAM_SINGLETON = None

def get_moondream_model() -> Any:
    """Return the shared Moondream3 preview model, loading it on the first call.

    The model is loaded through ``AutoModelForCausalLM.from_pretrained`` with remote
    code trusted, bfloat16 weights and ``device_map="cuda"``; the result is stored in a
    module-level global and returned as-is on later calls.
    """
    global _MOONDREAM_SINGLETON
    if _MOONDREAM_SINGLETON is not None:
        return _MOONDREAM_SINGLETON

    load_options = {
        "trust_remote_code": True,
        "torch_dtype": torch.bfloat16,
        "device_map": "cuda",
    }
    _MOONDREAM_SINGLETON = AutoModelForCausalLM.from_pretrained(
        "moondream/moondream3-preview", **load_options
    )
    return _MOONDREAM_SINGLETON


def _decode_image_b64(image_b64: str) -> Image.Image:
    data = base64.b64decode(image_b64)
    return Image.open(io.BytesIO(data)).convert("RGB")


def _image_to_b64(img: Image.Image) -> str:
    buf = io.BytesIO()
    img.save(buf, format="PNG")
    return base64.b64encode(buf.getvalue()).decode("utf-8")


def _supports_vision(model: str) -> bool:
    """Heuristic vision support detection for thinking model."""
    m = model.lower()
    vision_markers = [
        "gpt-4o",
        "gpt-4.1",
        "o1",
        "o3",
        "claude-3",
        "claude-3.5",
        "sonnet",
        "haiku",
        "opus",
        "gemini-1.5",
        "llava",
    ]
    return any(v in m for v in vision_markers)


def _filter_images_from_completion_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    filtered: List[Dict[str, Any]] = []
    for msg in messages:
        msg_copy = {**msg}
        content = msg_copy.get("content")
        if isinstance(content, list):
            msg_copy["content"] = [c for c in content if c.get("type") != "image_url"]
        filtered.append(msg_copy)
    return filtered

def _annotate_detect_and_label_ui(base_img: Image.Image, model_md) -> Tuple[str, List[str]]:
    """Detect UI elements with Moondream, caption each, draw labels with backgrounds.

    Args:
        base_img: PIL image of the screenshot (RGB or RGBA). Will be copied/converted internally.
        model_md: Moondream model instance with .detect() and .query() methods.

    Returns:
        A tuple of (annotated_image_base64_png, detected_names)
    """
    # Ensure RGBA for semi-transparent fills
    if base_img.mode != "RGBA":
        base_img = base_img.convert("RGBA")
    W, H = base_img.width, base_img.height

    # Detect objects
    try:
        detect_result = model_md.detect(base_img, "all ui elements")
        objects = detect_result.get("objects", []) if isinstance(detect_result, dict) else []
    except Exception:
        objects = []

    draw = ImageDraw.Draw(base_img)
    try:
        font = ImageFont.load_default()
    except Exception:
        font = None

    detected_names: List[str] = []

    for i, obj in enumerate(objects):
        try:
            # Clamp normalized coords and crop
            x_min = max(0.0, min(1.0, float(obj.get("x_min", 0.0))))
            y_min = max(0.0, min(1.0, float(obj.get("y_min", 0.0))))
            x_max = max(0.0, min(1.0, float(obj.get("x_max", 0.0))))
            y_max = max(0.0, min(1.0, float(obj.get("y_max", 0.0))))
            left, top, right, bottom = int(x_min * W), int(y_min * H), int(x_max * W), int(y_max * H)
            left, top = max(0, left), max(0, top)
            right, bottom = min(W - 1, right), min(H - 1, bottom)
            crop = base_img.crop((left, top, right, bottom))

            # Prompted short caption
            try:
                result = model_md.query(crop, "Caption this UI element in few words.")
                caption_text = (result or {}).get("answer", "")
            except Exception:
                caption_text = ""

            name = (caption_text or "").strip() or f"element_{i+1}"
            detected_names.append(name)

            # Draw bbox
            draw.rectangle([left, top, right, bottom], outline=(255, 215, 0, 255), width=2)

            # Label background with padding and rounded corners
            label = f"{i+1}. {name}"
            padding = 3
            if font:
                text_bbox = draw.textbbox((0, 0), label, font=font)
            else:
                text_bbox = draw.textbbox((0, 0), label)
            text_w = text_bbox[2] - text_bbox[0]
            text_h = text_bbox[3] - text_bbox[1]

            tx = left + 3
            ty = top - (text_h + 2 * padding + 4)
            if ty < 0:
                ty = top + 3

            bg_left = tx - padding
            bg_top = ty - padding
            bg_right = tx + text_w + padding
            bg_bottom = ty + text_h + padding
            try:
                draw.rounded_rectangle(
                    [bg_left, bg_top, bg_right, bg_bottom],
                    radius=4,
                    fill=(0, 0, 0, 160),
                    outline=(255, 215, 0, 200),
                    width=1,
                )
            except Exception:
                draw.rectangle(
                    [bg_left, bg_top, bg_right, bg_bottom],
                    fill=(0, 0, 0, 160),
                    outline=(255, 215, 0, 200),
                    width=1,
                )

            text_fill = (255, 255, 255, 255)
            if font:
                draw.text((tx, ty), label, fill=text_fill, font=font)
            else:
                draw.text((tx, ty), label, fill=text_fill)
        except Exception:
            continue

    # Encode PNG base64
    annotated = base_img
    if annotated.mode not in ("RGBA", "RGB"):
        annotated = annotated.convert("RGBA")
    annotated_b64 = _image_to_b64(annotated)
    return annotated_b64, detected_names

# Action names the thinking model may request through the grounded "computer" tool.
_GROUNDED_COMPUTER_ACTIONS = [
    "screenshot",
    "click",
    "double_click",
    "drag",
    "type",
    "keypress",
    "scroll",
    "move",
    "wait",
    "get_current_url",
    "get_dimensions",
    "get_environment",
]

# Parameter definitions of the tool; elements are addressed by description, not coordinates.
_GROUNDED_COMPUTER_PROPERTIES = {
    "action": {
        "type": "string",
        "enum": _GROUNDED_COMPUTER_ACTIONS,
        "description": "The action to perform (required for all actions)",
    },
    "element_description": {
        "type": "string",
        "description": "Description of the element to interact with (required for click/double_click/move/scroll)",
    },
    "start_element_description": {
        "type": "string",
        "description": "Description of the element to start dragging from (required for drag)",
    },
    "end_element_description": {
        "type": "string",
        "description": "Description of the element to drag to (required for drag)",
    },
    "text": {
        "type": "string",
        "description": "The text to type (required for type)",
    },
    "keys": {
        "type": "array",
        "items": {"type": "string"},
        "description": "Key(s) to press (required for keypress)",
    },
    "button": {
        "type": "string",
        "enum": ["left", "right", "wheel", "back", "forward"],
        "description": "The mouse button to use for click/double_click",
    },
    "scroll_x": {
        "type": "integer",
        "description": "Horizontal scroll amount (required for scroll)",
    },
    "scroll_y": {
        "type": "integer",
        "description": "Vertical scroll amount (required for scroll)",
    },
}

# Function-tool schema substituted for tools of type "computer" before calling the thinking model.
GROUNDED_COMPUTER_TOOL_SCHEMA = {
    "type": "function",
    "function": {
        "name": "computer",
        "description": (
            "Control a computer by taking screenshots and interacting with UI elements. "
            "The screenshot action will include a list of detected form UI element names when available. "
            "Use element descriptions to locate and interact with UI elements on the screen."
        ),
        "parameters": {
            "type": "object",
            "properties": _GROUNDED_COMPUTER_PROPERTIES,
            "required": ["action"],
        },
    },
}

@register_agent(r"moondream3\+.*", priority=2)
class Moondream3PlusConfig(AsyncAgentConfig):
    """Agent loop for composed model strings of the form ``moondream3+<thinking_model>``.

    The thinking model (called through litellm) decides on actions in terms of element
    descriptions; the local Moondream3 model turns those descriptions into pixel
    coordinates. Resolved coordinates are remembered per description in ``desc2xy``.
    """

    def __init__(self):
        """Start with an empty description -> (x, y) lookup table."""
        # Filled by predict_step with the coordinates returned by predict_click.
        self.desc2xy: Dict[str, Tuple[float, float]] = {}

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Run one agent step: annotate the screen, ask the thinking model, ground its actions.

        Args:
            messages: Conversation items so far. Searched backwards for the most recent
                ``computer_call_output`` whose output is an ``input_image`` PNG data URL.
            model: Composed model string; everything after the first ``+`` is used as the
                thinking model name.
            tools: Tool schemas. Entries with ``type == "computer"`` are replaced by
                ``GROUNDED_COMPUTER_TOOL_SCHEMA``; all others are passed through.
            max_retries: Forwarded to ``litellm.acompletion``.
            stream: Forwarded to ``litellm.acompletion``.
            computer_handler: If given and no screenshot is found in ``messages``, its
                awaited ``screenshot()`` result is used as the current screen.
            use_prompt_caching: When truthy, added to the litellm call arguments.
            _on_api_start: Optional async callback awaited with the call arguments before
                the litellm request.
            _on_api_end: Optional async callback awaited with the call arguments and the
                response after the litellm request.
            _on_usage: Optional async callback awaited with the usage dict.
            _on_screenshot: Optional async callback awaited with a freshly taken screenshot
                and with the annotated screenshot (the latter together with the string
                ``"annotated_form_ui"``).
            **kwargs: Extra keyword arguments forwarded to ``litellm.acompletion``.

        Returns:
            A dict with ``"output"`` (items injected by this step followed by the thinking
            model's items with descriptions converted to coordinates) and ``"usage"``.

        Raises:
            ValueError: If ``model`` contains no ``+``.
        """
        # Parse composed model: moondream3+<thinking_model>
        if "+" not in model:
            raise ValueError(f"Composed model must be 'moondream3+<thinking_model>', got: {model}")
        _, thinking_model = model.split("+", 1)

        # Items this step injects ahead of the thinking model's output
        # (synthetic screenshot call, detected-elements message).
        pre_output_items: List[Dict[str, Any]] = []

        # Acquire last screenshot; if missing, take one
        last_image_b64: Optional[str] = None
        for message in reversed(messages):
            if (
                isinstance(message, dict)
                and message.get("type") == "computer_call_output"
                and isinstance(message.get("output"), dict)
                and message["output"].get("type") == "input_image"
            ):
                image_url = message["output"].get("image_url", "")
                # Only PNG data URLs are accepted; other image outputs are skipped and the
                # search continues with older messages.
                if image_url.startswith("data:image/png;base64,"):
                    last_image_b64 = image_url.split(",", 1)[1]
                    break

        if last_image_b64 is None and computer_handler is not None:
            # Take a screenshot
            screenshot_b64 = await computer_handler.screenshot()  # type: ignore
            if screenshot_b64:
                # Record the screenshot as an assistant message plus a completed
                # computer_call / computer_call_output pair sharing one call_id.
                call_id = uuid.uuid4().hex
                pre_output_items += [
                    {
                        "type": "message",
                        "role": "assistant",
                        "content": [
                            {"type": "output_text", "text": "Taking a screenshot to analyze the current screen."}
                        ],
                    },
                    {"type": "computer_call", "call_id": call_id, "status": "completed", "action": {"type": "screenshot"}},
                    {
                        "type": "computer_call_output",
                        "call_id": call_id,
                        "output": {"type": "input_image", "image_url": f"data:image/png;base64,{screenshot_b64}"},
                    },
                ]
                last_image_b64 = screenshot_b64
                if _on_screenshot:
                    await _on_screenshot(screenshot_b64)

        # If we have a last screenshot, run Moondream detection and labeling
        detected_names: List[str] = []
        if last_image_b64 is not None:
            base_img = _decode_image_b64(last_image_b64)
            model_md = get_moondream_model()
            # NOTE: this is a plain (non-awaited) call, so detection and captioning run
            # synchronously inside this coroutine.
            annotated_b64, detected_names = _annotate_detect_and_label_ui(base_img, model_md)
            if _on_screenshot:
                await _on_screenshot(annotated_b64, "annotated_form_ui")

            # Also push a user message listing all detected names
            if detected_names:
                names_text = "\n".join(f"- {n}" for n in detected_names)
                pre_output_items.append(
                    {
                        "type": "message",
                        "role": "user",
                        "content": [
                            {"type": "input_text", "text": "Detected form UI elements on screen:"},
                            {"type": "input_text", "text": names_text},
                            {"type": "input_text", "text": "Please continue with the next action needed to perform your task."}
                        ],
                    }
                )

        # Swap the coordinate-based "computer" tool for the description-based schema.
        tool_schemas = []
        for schema in (tools or []):
            if schema.get("type") == "computer":
                tool_schemas.append(GROUNDED_COMPUTER_TOOL_SCHEMA)
            else:
                tool_schemas.append(schema)

        # Step 1: Convert computer calls from xy to descriptions
        input_messages = messages + pre_output_items
        messages_with_descriptions = convert_computer_calls_xy2desc(input_messages, self.desc2xy)

        # Step 2: Convert responses items to completion messages
        completion_messages = convert_responses_items_to_completion_messages(
            messages_with_descriptions,
            allow_images_in_tool_results=False,
        )

        # Optionally filter images if model lacks vision
        if not _supports_vision(thinking_model):
            completion_messages = _filter_images_from_completion_messages(completion_messages)

        # Step 3: Call thinking model with litellm.acompletion
        api_kwargs = {
            "model": thinking_model,
            "messages": completion_messages,
            "tools": tool_schemas,
            "max_retries": max_retries,
            "stream": stream,
            **kwargs,
        }
        if use_prompt_caching:
            api_kwargs["use_prompt_caching"] = use_prompt_caching

        if _on_api_start:
            await _on_api_start(api_kwargs)

        response = await litellm.acompletion(**api_kwargs)

        if _on_api_end:
            await _on_api_end(api_kwargs, response)

        # Usage as reported by the response, plus the cost litellm stores in its hidden
        # params (0.0 when absent).
        usage = {
            **response.usage.model_dump(),  # type: ignore
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(usage)

        # Step 4: Convert completion messages back to responses items format
        response_dict = response.model_dump()  # type: ignore
        choice_messages = [choice["message"] for choice in response_dict["choices"]]
        thinking_output_items: List[Dict[str, Any]] = []
        for choice_message in choice_messages:
            thinking_output_items.extend(
                convert_completion_messages_to_responses_items([choice_message])
            )

        # Step 5: Use Moondream to get coordinates for each description
        element_descriptions = get_all_element_descriptions(thinking_output_items)
        if element_descriptions and last_image_b64:
            for desc in element_descriptions:
                # Up to three attempts per description; the first non-empty result is
                # stored. If every attempt fails, desc2xy is left untouched for this
                # description.
                for _ in range(3):  # try 3 times
                    coords = await self.predict_click(
                        model=model,
                        image_b64=last_image_b64,
                        instruction=desc,
                    )
                    if coords:
                        self.desc2xy[desc] = coords
                        break

        # Step 6: Convert computer calls from descriptions back to xy coordinates
        final_output_items = convert_computer_calls_desc2xy(thinking_output_items, self.desc2xy)

        # Step 7: Return output and usage
        return {"output": pre_output_items + final_output_items, "usage": usage}

    async def predict_click(
        self,
        model: str,
        image_b64: str,
        instruction: str,
        **kwargs,
    ) -> Optional[Tuple[float, float]]:
        """Predict click coordinates using Moondream3's point API.

        Args:
            model: Accepted as part of the signature; not used by this implementation.
            image_b64: Base64-encoded screenshot to locate the element in.
            instruction: Description of the element to point at.
            **kwargs: Accepted and ignored.

        Returns:
            Pixel coordinates (x, y) as floats, obtained by scaling the first returned
            point by the image size and clamping to the image bounds; ``None`` if the
            model call fails or yields no usable point.
        """
        img = _decode_image_b64(image_b64)
        W, H = img.width, img.height
        model_md = get_moondream_model()
        try:
            result = model_md.point(img, instruction, settings={"max_objects": 1})
        except Exception:
            return None

        try:
            # An empty "points" list raises IndexError here, which maps to None below.
            pt = (result or {}).get("points", [])[0]
            x_norm = float(pt.get("x", 0.0))
            y_norm = float(pt.get("y", 0.0))
            # Scale by the image size and clamp into [0, W-1] x [0, H-1].
            x_px = max(0.0, min(float(W - 1), x_norm * W))
            y_px = max(0.0, min(float(H - 1), y_norm * H))
            return (x_px, y_px)
        except Exception:
            return None

    def get_capabilities(self) -> List[AgentCapability]:
        """Report that this loop supports both ``click`` prediction and full ``step`` prediction."""
        return ["click", "step"]

```

--------------------------------------------------------------------------------
/libs/typescript/computer/src/interface/macos.ts:
--------------------------------------------------------------------------------

```typescript
/**
 * macOS computer interface implementation.
 */

import type { ScreenSize } from '../types';
import { BaseComputerInterface } from './base';
import type { AccessibilityNode, CursorPosition, MouseButton } from './base';

export class MacOSComputerInterface extends BaseComputerInterface {
  // Mouse Actions
  /**
   * Press and hold a mouse button at the specified coordinates.
   * @param {number} [x] - X coordinate for the mouse action
   * @param {number} [y] - Y coordinate for the mouse action
   * @param {MouseButton} [button='left'] - Mouse button to press down
   * @returns {Promise<void>}
   */
  async mouseDown(
    x?: number,
    y?: number,
    button: MouseButton = 'left'
  ): Promise<void> {
    // Position is optional; the button defaults to 'left'.
    const params = { x, y, button };
    await this.sendCommand('mouse_down', params);
  }

  /**
   * Release a mouse button at the specified coordinates.
   * @param {number} [x] - X coordinate for the mouse action
   * @param {number} [y] - Y coordinate for the mouse action
   * @param {MouseButton} [button='left'] - Mouse button to release
   * @returns {Promise<void>}
   */
  async mouseUp(
    x?: number,
    y?: number,
    button: MouseButton = 'left'
  ): Promise<void> {
    // Position is optional; the button defaults to 'left'.
    const params = { x, y, button };
    await this.sendCommand('mouse_up', params);
  }

  /**
   * Perform a left mouse click at the specified coordinates.
   * @param {number} [x] - X coordinate for the click
   * @param {number} [y] - Y coordinate for the click
   * @returns {Promise<void>}
   */
  async leftClick(x?: number, y?: number): Promise<void> {
    // Both coordinates are optional and forwarded as given.
    const position = { x, y };
    await this.sendCommand('left_click', position);
  }

  /**
   * Perform a right mouse click at the specified coordinates.
   * @param {number} [x] - X coordinate for the click
   * @param {number} [y] - Y coordinate for the click
   * @returns {Promise<void>}
   */
  async rightClick(x?: number, y?: number): Promise<void> {
    // Both coordinates are optional and forwarded as given.
    const position = { x, y };
    await this.sendCommand('right_click', position);
  }

  /**
   * Perform a double click at the specified coordinates.
   * @param {number} [x] - X coordinate for the double click
   * @param {number} [y] - Y coordinate for the double click
   * @returns {Promise<void>}
   */
  async doubleClick(x?: number, y?: number): Promise<void> {
    // Both coordinates are optional and forwarded as given.
    const position = { x, y };
    await this.sendCommand('double_click', position);
  }

  /**
   * Move the cursor to the specified coordinates.
   * @param {number} x - X coordinate to move to
   * @param {number} y - Y coordinate to move to
   * @returns {Promise<void>}
   */
  async moveCursor(x: number, y: number): Promise<void> {
    // Target position for the `move_cursor` command.
    const target = { x, y };
    await this.sendCommand('move_cursor', target);
  }

  /**
   * Drag from current position to the specified coordinates.
   * @param {number} x - X coordinate to drag to
   * @param {number} y - Y coordinate to drag to
   * @param {MouseButton} [button='left'] - Mouse button to use for dragging
   * @param {number} [duration=0.5] - Duration of the drag operation in seconds
   * @returns {Promise<void>}
   */
  async dragTo(
    x: number,
    y: number,
    button: MouseButton = 'left',
    duration = 0.5
  ): Promise<void> {
    // Destination, button and duration are sent together as the command parameters.
    const params = { x, y, button, duration };
    await this.sendCommand('drag_to', params);
  }

  /**
   * Drag along a path of coordinates.
   * @param {Array<[number, number]>} path - Array of [x, y] coordinate pairs to drag through
   * @param {MouseButton} [button='left'] - Mouse button to use for dragging
   * @param {number} [duration=0.5] - Duration of the drag operation in seconds
   * @returns {Promise<void>}
   */
  async drag(
    path: Array<[number, number]>,
    button: MouseButton = 'left',
    duration = 0.5
  ): Promise<void> {
    // The whole coordinate path is sent in a single `drag` command.
    const params = { path, button, duration };
    await this.sendCommand('drag', params);
  }

  // Keyboard Actions
  /**
   * Press and hold a key.
   * @param {string} key - Key to press down
   * @returns {Promise<void>}
   */
  async keyDown(key: string): Promise<void> {
    // Sends `key_down` for the given key.
    const params = { key };
    await this.sendCommand('key_down', params);
  }

  /**
   * Release a key.
   * @param {string} key - Key to release
   * @returns {Promise<void>}
   */
  async keyUp(key: string): Promise<void> {
    // Sends `key_up` for the given key.
    const params = { key };
    await this.sendCommand('key_up', params);
  }

  /**
   * Type text as if entered from keyboard.
   * @param {string} text - Text to type
   * @returns {Promise<void>}
   */
  async typeText(text: string): Promise<void> {
    // Sends the text unchanged with the `type_text` command.
    const params = { text };
    await this.sendCommand('type_text', params);
  }

  /**
   * Press and release a key.
   * @param {string} key - Key to press
   * @returns {Promise<void>}
   */
  async pressKey(key: string): Promise<void> {
    // Sends `press_key` for the given key.
    const params = { key };
    await this.sendCommand('press_key', params);
  }

  /**
   * Press multiple keys simultaneously as a hotkey combination.
   * @param {...string} keys - Keys to press together
   * @returns {Promise<void>}
   */
  async hotkey(...keys: string[]): Promise<void> {
    // The rest arguments are forwarded as one `keys` array.
    const params = { keys };
    await this.sendCommand('hotkey', params);
  }

  // Scrolling Actions
  /**
   * Scroll by the specified amount in x and y directions.
   * @param {number} x - Horizontal scroll amount
   * @param {number} y - Vertical scroll amount
   * @returns {Promise<void>}
   */
  async scroll(x: number, y: number): Promise<void> {
    // Horizontal and vertical amounts are sent together.
    const amounts = { x, y };
    await this.sendCommand('scroll', amounts);
  }

  /**
   * Scroll down by the specified number of clicks.
   * @param {number} [clicks=1] - Number of scroll clicks
   * @returns {Promise<void>}
   */
  async scrollDown(clicks = 1): Promise<void> {
    // Defaults to a single scroll click.
    const params = { clicks };
    await this.sendCommand('scroll_down', params);
  }

  /**
   * Scroll up by the specified number of clicks.
   * @param {number} [clicks=1] - Number of scroll clicks
   * @returns {Promise<void>}
   */
  async scrollUp(clicks = 1): Promise<void> {
    // Defaults to a single scroll click.
    const params = { clicks };
    await this.sendCommand('scroll_up', params);
  }

  // Screen Actions
  /**
   * Take a screenshot of the screen.
   * @returns {Promise<Buffer>} Screenshot image data as a Buffer
   * @throws {Error} If screenshot fails
   */
  async screenshot(): Promise<Buffer> {
    const response = await this.sendCommand('screenshot');
    const encodedImage = response.image_data;
    if (encodedImage) {
      // The image arrives base64-encoded; decode it into raw bytes.
      return Buffer.from(encodedImage as string, 'base64');
    }
    throw new Error('Failed to take screenshot');
  }

  /**
   * Get the current screen size.
   * @returns {Promise<ScreenSize>} Screen dimensions
   * @throws {Error} If unable to get screen size
   */
  async getScreenSize(): Promise<ScreenSize> {
    const response = await this.sendCommand('get_screen_size');
    // Both a success flag and a size payload are required.
    if (response.success && response.size) {
      return response.size as ScreenSize;
    }
    throw new Error('Failed to get screen size');
  }

  /**
   * Get the current cursor position.
   * @returns {Promise<CursorPosition>} Current cursor coordinates
   * @throws {Error} If unable to get cursor position
   */
  async getCursorPosition(): Promise<CursorPosition> {
    const response = await this.sendCommand('get_cursor_position');
    // Both a success flag and a position payload are required.
    if (response.success && response.position) {
      return response.position as CursorPosition;
    }
    throw new Error('Failed to get cursor position');
  }

  // Clipboard Actions
  /**
   * Copy current selection to clipboard and return the content.
   * @returns {Promise<string>} Clipboard content
   * @throws {Error} If unable to get clipboard content
   */
  async copyToClipboard(): Promise<string> {
    const response = await this.sendCommand('copy_to_clipboard');
    // An empty string is a legitimate clipboard value, so only a failed command
    // or a missing `content` field is treated as an error.
    const contentMissing =
      response.content === undefined || response.content === null;
    if (!response.success || contentMissing) {
      throw new Error('Failed to get clipboard content');
    }
    return response.content as string;
  }

  /**
   * Set the clipboard content to the specified text.
   * @param {string} text - Text to set in clipboard
   * @returns {Promise<void>}
   */
  async setClipboard(text: string): Promise<void> {
    // Sends the text unchanged with the `set_clipboard` command.
    const params = { text };
    await this.sendCommand('set_clipboard', params);
  }

  // File System Actions
  /**
   * Check if a file exists at the specified path.
   * @param {string} path - Path to the file
   * @returns {Promise<boolean>} True if file exists, false otherwise
   */
  async fileExists(path: string): Promise<boolean> {
    const response = await this.sendCommand('file_exists', { path });
    // A missing or falsy `exists` field counts as "does not exist".
    const exists = response.exists as boolean;
    return exists ? exists : false;
  }

  /**
   * Check if a directory exists at the specified path.
   * @param {string} path - Path to the directory
   * @returns {Promise<boolean>} True if directory exists, false otherwise
   */
  async directoryExists(path: string): Promise<boolean> {
    const response = await this.sendCommand('directory_exists', { path });
    // A missing or falsy `exists` field counts as "does not exist".
    const exists = response.exists as boolean;
    return exists ? exists : false;
  }

  /**
   * List the contents of a directory.
   * @param {string} path - Path to the directory
   * @returns {Promise<string[]>} Array of file and directory names
   * @throws {Error} If unable to list directory
   */
  async listDir(path: string): Promise<string[]> {
    const response = await this.sendCommand('list_dir', { path });
    if (response.success) {
      // A successful response without a file list yields an empty array.
      const files = response.files as string[];
      return files ? files : [];
    }
    const reason = (response.error as string) || 'Failed to list directory';
    throw new Error(reason);
  }

  /**
   * Get the size of a file in bytes.
   * @param {string} path - Path to the file
   * @returns {Promise<number>} File size in bytes
   * @throws {Error} If unable to get file size
   */
  async getFileSize(path: string): Promise<number> {
    const response = await this.sendCommand('get_file_size', { path });
    if (response.success) {
      // A successful response without a size is reported as 0.
      const size = response.size as number;
      return size ? size : 0;
    }
    const reason = (response.error as string) || 'Failed to get file size';
    throw new Error(reason);
  }

  /**
   * Read file content in chunks for large files.
   * @private
   * @param {string} path - Path to the file
   * @param {number} offset - Starting byte offset
   * @param {number} totalLength - Total number of bytes to read
   * @param {number} [chunkSize=1048576] - Size of each chunk in bytes
   * @returns {Promise<Buffer>} File content as Buffer
   * @throws {Error} If unable to read file chunk
   */
  private async readBytesChunked(
    path: string,
    offset: number,
    totalLength: number,
    chunkSize: number = 1024 * 1024
  ): Promise<Buffer> {
    // A non-positive (or NaN) chunk size would request zero bytes forever.
    if (!(chunkSize > 0)) {
      throw new Error('chunkSize must be a positive number');
    }

    const chunks: Buffer[] = [];
    let currentOffset = offset;
    let remaining = totalLength;

    while (remaining > 0) {
      const readSize = Math.min(chunkSize, remaining);
      const response = await this.sendCommand('read_bytes', {
        path,
        offset: currentOffset,
        length: readSize,
      });

      if (!response.success) {
        throw new Error(
          (response.error as string) || 'Failed to read file chunk'
        );
      }

      const chunkData = Buffer.from(response.content_b64 as string, 'base64');

      // No data means there is nothing more to read at this offset (for example
      // the file is shorter than expected); stop instead of issuing further
      // requests that cannot return anything.
      if (chunkData.length === 0) {
        break;
      }
      chunks.push(chunkData);

      // Advance by the number of bytes actually received, not the number
      // requested, so a short read never leaves a gap in the assembled result.
      currentOffset += chunkData.length;
      remaining -= chunkData.length;
    }

    return Buffer.concat(chunks);
  }

  /**
   * Write file content in chunks for large files.
   * @private
   * @param {string} path - Path to the file
   * @param {Buffer} content - Content to write
   * @param {boolean} [append=false] - Whether to append to existing file
   * @param {number} [chunkSize=1048576] - Size of each chunk in bytes
   * @returns {Promise<void>}
   * @throws {Error} If unable to write file chunk
   */
  private async writeBytesChunked(
    path: string,
    content: Buffer,
    append: boolean = false,
    chunkSize: number = 1024 * 1024
  ): Promise<void> {
    const totalSize = content.length;
    let written = 0;

    while (written < totalSize) {
      const nextBoundary = Math.min(written + chunkSize, totalSize);
      // Only the first chunk honours the caller's append flag; every later
      // chunk must append to what has already been written.
      const isFirstChunk = written === 0;

      const response = await this.sendCommand('write_bytes', {
        path,
        content_b64: content.subarray(written, nextBoundary).toString('base64'),
        append: isFirstChunk ? append : true,
      });

      if (!response.success) {
        const reason =
          (response.error as string) || 'Failed to write file chunk';
        throw new Error(reason);
      }

      written = nextBoundary;
    }
  }

  /**
   * Read text from a file with specified encoding.
   * @param {string} path - Path to the file to read
   * @param {BufferEncoding} [encoding='utf8'] - Text encoding to use
   * @returns {Promise<string>} The decoded text content of the file
   */
  async readText(path: string, encoding: BufferEncoding = 'utf8'): Promise<string> {
    // Read the whole file as bytes, then decode with the requested encoding.
    return (await this.readBytes(path)).toString(encoding);
  }

  /**
   * Write text to a file with specified encoding.
   * @param {string} path - Path to the file to write
   * @param {string} content - Text content to write
   * @param {BufferEncoding} [encoding='utf8'] - Text encoding to use
   * @param {boolean} [append=false] - Whether to append to the file instead of overwriting
   * @returns {Promise<void>}
   */
  async writeText(
    path: string,
    content: string,
    encoding: BufferEncoding = 'utf8',
    append: boolean = false
  ): Promise<void> {
    // Encode the text first, then delegate to the byte-level writer.
    await this.writeBytes(path, Buffer.from(content, encoding), append);
  }

  /**
   * Read bytes from a file, with optional offset and length.
   * @param {string} path - Path to the file
   * @param {number} [offset=0] - Starting byte offset
   * @param {number} [length] - Number of bytes to read (reads entire file if not specified)
   * @returns {Promise<Buffer>} File content as Buffer
   * @throws {Error} If unable to read file
   */
  async readBytes(path: string, offset: number = 0, length?: number): Promise<Buffer> {
    // Files above this size are fetched in chunks when no explicit length is given.
    const chunkingThreshold = 5 * 1024 * 1024;

    if (length === undefined) {
      // The file size is needed to decide between a single read and chunking.
      const fileSize = await this.getFileSize(path);
      if (fileSize > chunkingThreshold) {
        const readLength = offset > 0 ? fileSize - offset : fileSize;
        return await this.readBytesChunked(path, offset, readLength);
      }
    }

    const request = { path, offset, length };
    const response = await this.sendCommand('read_bytes', request);
    if (response.success) {
      return Buffer.from(response.content_b64 as string, 'base64');
    }
    const reason = (response.error as string) || 'Failed to read file';
    throw new Error(reason);
  }

  /**
   * Write bytes to a file.
   * @param {string} path - Path to the file
   * @param {Buffer} content - Content to write as Buffer
   * @param {boolean} [append=false] - Whether to append to existing file
   * @returns {Promise<void>}
   * @throws {Error} If unable to write file
   */
  async writeBytes(path: string, content: Buffer, append: boolean = false): Promise<void> {
    // Payloads above 5MB are written in chunks.
    const chunkingThreshold = 5 * 1024 * 1024;
    if (content.length > chunkingThreshold) {
      await this.writeBytesChunked(path, content, append);
      return;
    }

    const request = {
      path,
      content_b64: content.toString('base64'),
      append,
    };
    const response = await this.sendCommand('write_bytes', request);
    if (response.success) {
      return;
    }
    const reason = (response.error as string) || 'Failed to write file';
    throw new Error(reason);
  }

  /**
   * Delete a file at the specified path.
   * @param {string} path - Path to the file to delete
   * @returns {Promise<void>}
   * @throws {Error} If unable to delete file
   */
  async deleteFile(path: string): Promise<void> {
    // Issue delete_file; a falsy `success` flag is surfaced as an Error.
    const result = await this.sendCommand('delete_file', { path });
    if (result.success) {
      return;
    }
    const reason = (result.error as string) || 'Failed to delete file';
    throw new Error(reason);
  }

  /**
   * Create a directory at the specified path.
   * @param {string} path - Path where to create the directory
   * @returns {Promise<void>}
   * @throws {Error} If unable to create directory
   */
  async createDir(path: string): Promise<void> {
    // Issue create_dir; a falsy `success` flag is surfaced as an Error.
    const result = await this.sendCommand('create_dir', { path });
    if (result.success) {
      return;
    }
    const reason = (result.error as string) || 'Failed to create directory';
    throw new Error(reason);
  }

  /**
   * Delete a directory at the specified path.
   * @param {string} path - Path to the directory to delete
   * @returns {Promise<void>}
   * @throws {Error} If unable to delete directory
   */
  async deleteDir(path: string): Promise<void> {
    // Issue delete_dir; a falsy `success` flag is surfaced as an Error.
    const result = await this.sendCommand('delete_dir', { path });
    if (result.success) {
      return;
    }
    const reason = (result.error as string) || 'Failed to delete directory';
    throw new Error(reason);
  }

  /**
   * Execute a shell command and return stdout and stderr.
   * @param {string} command - Command to execute
   * @returns {Promise<[string, string]>} Tuple of [stdout, stderr]
   * @throws {Error} If command execution fails
   */
  async runCommand(command: string): Promise<[string, string]> {
    const result = await this.sendCommand('run_command', { command });
    if (!result.success) {
      const reason = (result.error as string) || 'Failed to run command';
      throw new Error(reason);
    }

    // Missing or empty streams are normalised to empty strings.
    const stdout = (result.stdout as string) || '';
    const stderr = (result.stderr as string) || '';
    return [stdout, stderr];
  }

  // Accessibility Actions
  /**
   * Get the accessibility tree of the current screen.
   * @returns {Promise<AccessibilityNode>} Root accessibility node
   * @throws {Error} If unable to get accessibility tree
   */
  async getAccessibilityTree(): Promise<AccessibilityNode> {
    const result = await this.sendCommand('get_accessibility_tree');
    if (result.success) {
      // The response object itself is returned, reinterpreted as the root node.
      return result as unknown as AccessibilityNode;
    }
    const reason = (result.error as string) || 'Failed to get accessibility tree';
    throw new Error(reason);
  }

  /**
   * Convert coordinates to screen coordinates.
   * @param {number} x - X coordinate to convert
   * @param {number} y - Y coordinate to convert
   * @returns {Promise<[number, number]>} Converted screen coordinates as [x, y]
   * @throws {Error} If coordinate conversion fails
   */
  async toScreenCoordinates(x: number, y: number): Promise<[number, number]> {
    const response = await this.sendCommand('to_screen_coordinates', { x, y });
    // Fail when the command reports failure or returns no coordinates.
    if (!response.success || !response.coordinates) {
      // Prefer the server-reported reason, as the other command wrappers in
      // this class do; fall back to the generic message otherwise.
      throw new Error(
        (response.error as string) || 'Failed to convert to screen coordinates'
      );
    }
    return response.coordinates as [number, number];
  }

  /**
   * Convert coordinates to screenshot coordinates.
   * @param {number} x - X coordinate to convert
   * @param {number} y - Y coordinate to convert
   * @returns {Promise<[number, number]>} Converted screenshot coordinates as [x, y]
   * @throws {Error} If coordinate conversion fails
   */
  async toScreenshotCoordinates(
    x: number,
    y: number
  ): Promise<[number, number]> {
    const response = await this.sendCommand('to_screenshot_coordinates', {
      x,
      y,
    });
    // Fail when the command reports failure or returns no coordinates.
    if (!response.success || !response.coordinates) {
      // Prefer the server-reported reason, as the other command wrappers in
      // this class do; fall back to the generic message otherwise.
      throw new Error(
        (response.error as string) ||
          'Failed to convert to screenshot coordinates'
      );
    }
    return response.coordinates as [number, number];
  }
}

```

--------------------------------------------------------------------------------
/libs/lume/src/Virtualization/VMVirtualizationService.swift:
--------------------------------------------------------------------------------

```swift
import Foundation
import Virtualization

/// Framework-agnostic VM configuration
///
/// Plain value bundle consumed by the `createConfiguration(_:)` functions of
/// `DarwinVirtualizationService` and `LinuxVirtualizationService` in this file.
struct VMVirtualizationServiceContext {
    /// Assigned directly to `VZVirtualMachineConfiguration.cpuCount`.
    let cpuCount: Int
    /// Assigned directly to `VZVirtualMachineConfiguration.memorySize`.
    let memorySize: UInt64
    /// Display resolution string; parsed with `VMDisplayResolution(string:)`.
    let display: String
    /// Host directories to expose to the guest as virtio file-system devices; `nil` means none.
    let sharedDirectories: [SharedDirectory]?
    /// Optional extra disk image, attached as a read-only storage device.
    let mount: Path?
    /// Data representation of the Mac hardware model; required by the Darwin configuration.
    let hardwareModel: Data?
    /// Data representation of the Mac machine identifier; required by the Darwin configuration.
    let machineIdentifier: Data?
    /// MAC address string; parsed with `VZMACAddress(string:)`.
    let macAddress: String
    /// Primary disk image, attached as the first storage device.
    let diskPath: Path
    /// Backing file for the Mac auxiliary storage (Darwin) or the EFI variable store (Linux).
    let nvramPath: Path
    /// Forwarded by `DarwinVirtualizationService` to select macOS Recovery at start.
    let recoveryMode: Bool
    /// Disk images attached read-only as USB mass storage (only when macOS 15+ is available).
    let usbMassStoragePaths: [Path]?
}

/// Protocol defining the interface for virtualization operations
///
/// All requirements are isolated to the main actor.
@MainActor
protocol VMVirtualizationService {
    /// Current state of the underlying virtual machine.
    var state: VZVirtualMachine.State { get }
    /// Starts the virtual machine; throws if the start fails.
    func start() async throws
    /// Stops the virtual machine; throws if the stop fails.
    func stop() async throws
    /// Pauses the virtual machine; throws if the pause fails.
    func pause() async throws
    /// Resumes the virtual machine; throws if the resume fails.
    func resume() async throws
    /// Returns the underlying virtual machine object, type-erased to `Any`.
    func getVirtualMachine() -> Any
}

/// Base implementation of VMVirtualizationService using VZVirtualMachine
///
/// Wraps a `VZVirtualMachine`, bridging its completion-handler lifecycle API to
/// `async throws` methods, and provides static helpers for building the device
/// configurations shared by the platform-specific subclasses.
@MainActor
class BaseVirtualizationService: VMVirtualizationService {
    let virtualMachine: VZVirtualMachine
    let recoveryMode: Bool  // Store whether we should start in recovery mode

    /// Current state of the wrapped virtual machine.
    var state: VZVirtualMachine.State {
        virtualMachine.state
    }

    /// - Parameters:
    ///   - virtualMachine: The virtual machine to control.
    ///   - recoveryMode: Whether `start()` should boot into macOS Recovery (macOS 13+ only).
    init(virtualMachine: VZVirtualMachine, recoveryMode: Bool = false) {
        self.virtualMachine = virtualMachine
        self.recoveryMode = recoveryMode
    }

    /// Starts the virtual machine.
    ///
    /// On macOS 13+ the VM is started with `VZMacOSVirtualMachineStartOptions`,
    /// honouring `recoveryMode`; on older systems it is started without options.
    /// - Throws: The error reported by the virtual machine's start completion handler.
    func start() async throws {
        try await withCheckedThrowingContinuation {
            (continuation: CheckedContinuation<Void, Error>) in
            Task { @MainActor in
                if #available(macOS 13, *) {
                    let startOptions = VZMacOSVirtualMachineStartOptions()
                    startOptions.startUpFromMacOSRecovery = recoveryMode
                    if recoveryMode {
                        Logger.info("Starting VM in recovery mode")
                    }
                    virtualMachine.start(options: startOptions) { error in
                        if let error = error {
                            continuation.resume(throwing: error)
                        } else {
                            continuation.resume()
                        }
                    }
                } else {
                    Logger.info("Starting VM in normal mode")
                    virtualMachine.start { result in
                        switch result {
                        case .success:
                            continuation.resume()
                        case .failure(let error):
                            continuation.resume(throwing: error)
                        }
                    }
                }
            }
        }
    }

    /// Stops the virtual machine.
    /// - Throws: The error reported by the virtual machine's stop completion handler.
    func stop() async throws {
        try await withCheckedThrowingContinuation {
            (continuation: CheckedContinuation<Void, Error>) in
            virtualMachine.stop { error in
                if let error = error {
                    continuation.resume(throwing: error)
                } else {
                    continuation.resume()
                }
            }
        }
    }

    /// Pauses the virtual machine.
    /// - Throws: The error reported by the virtual machine's pause completion handler.
    func pause() async throws {
        try await withCheckedThrowingContinuation {
            (continuation: CheckedContinuation<Void, Error>) in
            // Must call pause here; this previously called start by mistake.
            virtualMachine.pause { result in
                switch result {
                case .success:
                    continuation.resume()
                case .failure(let error):
                    continuation.resume(throwing: error)
                }
            }
        }
    }

    /// Resumes a paused virtual machine.
    /// - Throws: The error reported by the virtual machine's resume completion handler.
    func resume() async throws {
        try await withCheckedThrowingContinuation {
            (continuation: CheckedContinuation<Void, Error>) in
            // Must call resume here; this previously called start by mistake.
            virtualMachine.resume { result in
                switch result {
                case .success:
                    continuation.resume()
                case .failure(let error):
                    continuation.resume(throwing: error)
                }
            }
        }
    }

    /// Returns the wrapped `VZVirtualMachine`, type-erased to `Any`.
    func getVirtualMachine() -> Any {
        return virtualMachine
    }

    // Helper methods for creating common configurations

    /// Builds a virtio block device backed by the disk image at `diskPath`.
    /// - Parameters:
    ///   - diskPath: Location of the disk image.
    ///   - readOnly: Whether the guest may only read from the image.
    /// - Throws: Any error raised while creating the disk image attachment.
    static func createStorageDeviceConfiguration(diskPath: Path, readOnly: Bool = false) throws
        -> VZStorageDeviceConfiguration
    {
        return VZVirtioBlockDeviceConfiguration(
            attachment: try VZDiskImageStorageDeviceAttachment(
                url: diskPath.url,
                readOnly: readOnly,
                cachingMode: VZDiskImageCachingMode.automatic,
                synchronizationMode: VZDiskImageSynchronizationMode.fsync
            )
        )
    }

    /// Builds a USB mass storage device backed by the disk image at `diskPath`.
    ///
    /// USB mass storage is used only when macOS 15+ is available; otherwise
    /// this falls back to a regular virtio block device.
    /// - Parameters:
    ///   - diskPath: Location of the disk image.
    ///   - readOnly: Whether the guest may only read from the image.
    /// - Throws: Any error raised while creating the disk image attachment.
    static func createUSBMassStorageDeviceConfiguration(diskPath: Path, readOnly: Bool = false)
        throws
        -> VZStorageDeviceConfiguration
    {
        if #available(macOS 15.0, *) {
            return VZUSBMassStorageDeviceConfiguration(
                attachment: try VZDiskImageStorageDeviceAttachment(
                    url: diskPath.url,
                    readOnly: readOnly,
                    cachingMode: VZDiskImageCachingMode.automatic,
                    synchronizationMode: VZDiskImageSynchronizationMode.fsync
                )
            )
        } else {
            // Fallback to normal storage device if USB mass storage not available
            return try createStorageDeviceConfiguration(diskPath: diskPath, readOnly: readOnly)
        }
    }

    /// Builds a NAT-attached virtio network device with the given MAC address.
    /// - Parameter macAddress: MAC address string to assign to the device.
    /// - Throws: `VMConfigError.invalidMachineIdentifier` when the string is not a valid MAC address.
    static func createNetworkDeviceConfiguration(macAddress: String) throws
        -> VZNetworkDeviceConfiguration
    {
        let network = VZVirtioNetworkDeviceConfiguration()
        guard let vzMacAddress = VZMACAddress(string: macAddress) else {
            // NOTE(review): this reuses the machine-identifier error for an invalid
            // MAC address; kept as-is because callers may match on this case.
            throw VMConfigError.invalidMachineIdentifier
        }
        network.attachment = VZNATNetworkDeviceAttachment()
        network.macAddress = vzMacAddress
        return network
    }

    /// Builds one virtio file-system device per shared directory.
    /// - Parameter sharedDirectories: Directories to share; `nil` yields an empty array.
    /// - Returns: Devices tagged with each directory's `tag`, sharing its `hostPath`.
    static func createDirectorySharingDevices(sharedDirectories: [SharedDirectory]?)
        -> [VZDirectorySharingDeviceConfiguration]
    {
        return sharedDirectories?.map { sharedDir in
            let device = VZVirtioFileSystemDeviceConfiguration(tag: sharedDir.tag)
            let url = URL(fileURLWithPath: sharedDir.hostPath)
            device.share = VZSingleDirectoryShare(
                directory: VZSharedDirectory(url: url, readOnly: sharedDir.readOnly))
            return device
        } ?? []
    }
}

/// macOS-specific virtualization service
///
/// Builds a `VZVirtualMachineConfiguration` for a macOS guest and adds
/// helpers for creating auxiliary storage and installing macOS from a
/// restore image.
@MainActor
final class DarwinVirtualizationService: BaseVirtualizationService {
    /// Builds and validates the configuration for a macOS guest.
    /// - Parameter config: Framework-agnostic VM settings.
    /// - Returns: A validated `VZVirtualMachineConfiguration`.
    /// - Throws: `VMConfigError` when the machine identifier or hardware model is
    ///   missing or invalid, plus any error from device creation or validation.
    static func createConfiguration(_ config: VMVirtualizationServiceContext) throws
        -> VZVirtualMachineConfiguration
    {
        let vzConfig = VZVirtualMachineConfiguration()
        vzConfig.cpuCount = config.cpuCount
        vzConfig.memorySize = config.memorySize

        // Platform configuration
        guard let machineIdentifier = config.machineIdentifier else {
            throw VMConfigError.emptyMachineIdentifier
        }

        guard let hardwareModel = config.hardwareModel else {
            throw VMConfigError.emptyHardwareModel
        }

        let platform = VZMacPlatformConfiguration()
        platform.auxiliaryStorage = VZMacAuxiliaryStorage(url: config.nvramPath.url)
        Logger.info("Pre-VZMacHardwareModel: hardwareModel=\(hardwareModel)")
        guard let vzHardwareModel = VZMacHardwareModel(dataRepresentation: hardwareModel) else {
            throw VMConfigError.invalidHardwareModel
        }
        platform.hardwareModel = vzHardwareModel
        guard
            let vzMachineIdentifier = VZMacMachineIdentifier(dataRepresentation: machineIdentifier)
        else {
            throw VMConfigError.invalidMachineIdentifier
        }
        platform.machineIdentifier = vzMachineIdentifier
        vzConfig.platform = platform
        vzConfig.bootLoader = VZMacOSBootLoader()

        // Graphics configuration
        // NOTE(review): force-unwrap crashes if `config.display` does not parse;
        // presumably validated by the caller — confirm.
        let display = VMDisplayResolution(string: config.display)!
        let graphics = VZMacGraphicsDeviceConfiguration()
        graphics.displays = [
            VZMacGraphicsDisplayConfiguration(
                widthInPixels: display.width,
                heightInPixels: display.height,
                pixelsPerInch: 220  // Retina display density
            )
        ]
        vzConfig.graphicsDevices = [graphics]

        // Common configurations
        vzConfig.keyboards = [VZUSBKeyboardConfiguration()]
        vzConfig.pointingDevices = [VZUSBScreenCoordinatePointingDeviceConfiguration()]
        var storageDevices = [try createStorageDeviceConfiguration(diskPath: config.diskPath)]
        if let mount = config.mount {
            storageDevices.append(
                try createStorageDeviceConfiguration(diskPath: mount, readOnly: true))
        }
        // Add USB mass storage devices if specified
        if #available(macOS 15.0, *), let usbPaths = config.usbMassStoragePaths, !usbPaths.isEmpty {
            for usbPath in usbPaths {
                storageDevices.append(
                    try createUSBMassStorageDeviceConfiguration(diskPath: usbPath, readOnly: true))
            }
        }
        vzConfig.storageDevices = storageDevices
        vzConfig.networkDevices = [
            try createNetworkDeviceConfiguration(macAddress: config.macAddress)
        ]
        vzConfig.memoryBalloonDevices = [VZVirtioTraditionalMemoryBalloonDeviceConfiguration()]
        vzConfig.entropyDevices = [VZVirtioEntropyDeviceConfiguration()]

        // Audio configuration
        let soundDeviceConfiguration = VZVirtioSoundDeviceConfiguration()
        let inputAudioStreamConfiguration = VZVirtioSoundDeviceInputStreamConfiguration()
        let outputAudioStreamConfiguration = VZVirtioSoundDeviceOutputStreamConfiguration()

        inputAudioStreamConfiguration.source = VZHostAudioInputStreamSource()
        outputAudioStreamConfiguration.sink = VZHostAudioOutputStreamSink()

        soundDeviceConfiguration.streams = [inputAudioStreamConfiguration, outputAudioStreamConfiguration]
        vzConfig.audioDevices = [soundDeviceConfiguration]

        // Clipboard sharing via Spice agent
        let spiceAgentConsoleDevice = VZVirtioConsoleDeviceConfiguration()
        let spiceAgentPort = VZVirtioConsolePortConfiguration()
        spiceAgentPort.name = VZSpiceAgentPortAttachment.spiceAgentPortName
        let spiceAgentPortAttachment = VZSpiceAgentPortAttachment()
        spiceAgentPortAttachment.sharesClipboard = true
        spiceAgentPort.attachment = spiceAgentPortAttachment
        spiceAgentConsoleDevice.ports[0] = spiceAgentPort
        vzConfig.consoleDevices.append(spiceAgentConsoleDevice)

        // Directory sharing
        let directorySharingDevices = createDirectorySharingDevices(
            sharedDirectories: config.sharedDirectories)
        if !directorySharingDevices.isEmpty {
            vzConfig.directorySharingDevices = directorySharingDevices
        }

        // USB Controller configuration
        if #available(macOS 15.0, *) {
            let usbControllerConfiguration = VZXHCIControllerConfiguration()
            vzConfig.usbControllers = [usbControllerConfiguration]
        }

        try vzConfig.validate()
        return vzConfig
    }

    /// Returns a random locally administered MAC address as a string.
    static func generateMacAddress() -> String {
        VZMACAddress.randomLocallyAdministered().string
    }

    /// Returns the data representation of a freshly created machine identifier.
    static func generateMachineIdentifier() -> Data {
        VZMacMachineIdentifier().dataRepresentation
    }

    /// Creates Mac auxiliary storage at `path` for the given hardware model.
    /// - Throws: `VMConfigError.invalidHardwareModel` when `hardwareModel` cannot be
    ///   decoded, or any error raised while creating the storage.
    func createAuxiliaryStorage(at path: Path, hardwareModel: Data) throws {
        guard let vzHardwareModel = VZMacHardwareModel(dataRepresentation: hardwareModel) else {
            throw VMConfigError.invalidHardwareModel
        }
        _ = try VZMacAuxiliaryStorage(creatingStorageAt: path.url, hardwareModel: vzHardwareModel)
    }

    /// Builds the macOS configuration and creates the virtual machine from it.
    /// - Throws: Any error from `createConfiguration(_:)`.
    init(configuration: VMVirtualizationServiceContext) throws {
        let vzConfig = try Self.createConfiguration(configuration)
        super.init(
            virtualMachine: VZVirtualMachine(configuration: vzConfig),
            recoveryMode: configuration.recoveryMode)
    }

    /// Installs macOS into the virtual machine from a restore image.
    /// - Parameters:
    ///   - imagePath: Location of the restore image.
    ///   - progressHandler: Optional callback receiving `fractionCompleted` updates.
    /// - Throws: The error reported by the installer on failure.
    func installMacOS(imagePath: Path, progressHandler: (@Sendable (Double) -> Void)?) async throws
    {
        var observers: [NSKeyValueObservation] = []  // must hold observer references during installation to print process
        try await withCheckedThrowingContinuation {
            (continuation: CheckedContinuation<Void, Error>) in
            Task {
                let installer = VZMacOSInstaller(
                    virtualMachine: virtualMachine, restoringFromImageAt: imagePath.url)
                Logger.info("Starting macOS installation")

                if let progressHandler = progressHandler {
                    let observer = installer.progress.observe(
                        \.fractionCompleted, options: [.initial, .new]
                    ) { (progress, change) in
                        if let newValue = change.newValue {
                            progressHandler(newValue)
                        }
                    }
                    observers.append(observer)
                }

                installer.install { result in
                    switch result {
                    case .success:
                        continuation.resume()
                    case .failure(let error):
                        // Fixed: the message previously ended with a stray ")".
                        Logger.error("Failed to install, error=\(error)")
                        continuation.resume(throwing: error)
                    }
                }
            }
        }
        Logger.info("macOS installation finished")
    }
}

/// Linux-specific virtualization service
///
/// Builds a `VZVirtualMachineConfiguration` for a Linux guest booted through
/// EFI, adding a Rosetta directory share when Rosetta is installed.
@MainActor
final class LinuxVirtualizationService: BaseVirtualizationService {
    /// Builds and validates the configuration for a Linux guest.
    /// - Parameter config: Framework-agnostic VM settings.
    /// - Returns: A validated `VZVirtualMachineConfiguration`.
    /// - Throws: Any error from device creation or configuration validation.
    static func createConfiguration(_ config: VMVirtualizationServiceContext) throws
        -> VZVirtualMachineConfiguration
    {
        let configuration = VZVirtualMachineConfiguration()
        configuration.cpuCount = config.cpuCount
        configuration.memorySize = config.memorySize

        // Generic platform; nested virtualization is enabled when supported (macOS 15+).
        let genericPlatform = VZGenericPlatformConfiguration()
        if #available(macOS 15, *) {
            genericPlatform.isNestedVirtualizationEnabled =
                VZGenericPlatformConfiguration.isNestedVirtualizationSupported
        }
        configuration.platform = genericPlatform

        // EFI boot loader backed by the variable store at the NVRAM path.
        let efiLoader = VZEFIBootLoader()
        efiLoader.variableStore = VZEFIVariableStore(url: config.nvramPath.url)
        configuration.bootLoader = efiLoader

        // Single virtio scanout sized from the display string.
        let resolution = VMDisplayResolution(string: config.display)!
        let scanout = VZVirtioGraphicsScanoutConfiguration(
            widthInPixels: resolution.width,
            heightInPixels: resolution.height
        )
        let gpu = VZVirtioGraphicsDeviceConfiguration()
        gpu.scanouts = [scanout]
        configuration.graphicsDevices = [gpu]

        // Input devices.
        configuration.keyboards = [VZUSBKeyboardConfiguration()]
        configuration.pointingDevices = [VZUSBScreenCoordinatePointingDeviceConfiguration()]

        // Storage: primary disk, optional read-only mount, then read-only USB images.
        var disks = [try createStorageDeviceConfiguration(diskPath: config.diskPath)]
        if let mountedImage = config.mount {
            let mountedDisk = try createStorageDeviceConfiguration(
                diskPath: mountedImage, readOnly: true)
            disks.append(mountedDisk)
        }
        if #available(macOS 15.0, *), let usbImages = config.usbMassStoragePaths,
            !usbImages.isEmpty
        {
            for usbImage in usbImages {
                let usbDisk = try createUSBMassStorageDeviceConfiguration(
                    diskPath: usbImage, readOnly: true)
                disks.append(usbDisk)
            }
        }
        configuration.storageDevices = disks

        // Network, memory balloon and entropy devices.
        let nic = try createNetworkDeviceConfiguration(macAddress: config.macAddress)
        configuration.networkDevices = [nic]
        configuration.memoryBalloonDevices = [
            VZVirtioTraditionalMemoryBalloonDeviceConfiguration()
        ]
        configuration.entropyDevices = [VZVirtioEntropyDeviceConfiguration()]

        // Audio: one input stream from the host and one output stream to the host.
        let micStream = VZVirtioSoundDeviceInputStreamConfiguration()
        micStream.source = VZHostAudioInputStreamSource()
        let speakerStream = VZVirtioSoundDeviceOutputStreamConfiguration()
        speakerStream.sink = VZHostAudioOutputStreamSink()
        let soundCard = VZVirtioSoundDeviceConfiguration()
        soundCard.streams = [micStream, speakerStream]
        configuration.audioDevices = [soundCard]

        // Clipboard sharing through a Spice agent console port.
        let clipboardAttachment = VZSpiceAgentPortAttachment()
        clipboardAttachment.sharesClipboard = true
        let clipboardPort = VZVirtioConsolePortConfiguration()
        clipboardPort.name = VZSpiceAgentPortAttachment.spiceAgentPortName
        clipboardPort.attachment = clipboardAttachment
        let consoleDevice = VZVirtioConsoleDeviceConfiguration()
        consoleDevice.ports[0] = clipboardPort
        configuration.consoleDevices.append(consoleDevice)

        // Directory sharing, optionally extended with a Rosetta share.
        var shares = createDirectorySharingDevices(
            sharedDirectories: config.sharedDirectories)

        if #available(macOS 13.0, *) {
            switch VZLinuxRosettaDirectoryShare.availability {
            case .installed:
                do {
                    let rosettaDevice = VZVirtioFileSystemDeviceConfiguration(tag: "rosetta")
                    rosettaDevice.share = try VZLinuxRosettaDirectoryShare()
                    shares.append(rosettaDevice)
                    Logger.info("Added Rosetta support to Linux VM")
                } catch {
                    // Best effort: a Rosetta failure is logged and the VM is configured without it.
                    Logger.info("Failed to add Rosetta support: \(error.localizedDescription)")
                }
            default:
                Logger.info("Rosetta not installed, skipping Rosetta support")
            }
        }

        if !shares.isEmpty {
            configuration.directorySharingDevices = shares
        }

        // XHCI USB controller (macOS 15+).
        if #available(macOS 15.0, *) {
            configuration.usbControllers = [VZXHCIControllerConfiguration()]
        }

        try configuration.validate()
        return configuration
    }

    /// Returns a random locally administered MAC address as a string.
    func generateMacAddress() -> String {
        return VZMACAddress.randomLocallyAdministered().string
    }

    /// Creates a new EFI variable store at `path`.
    /// - Throws: Any error raised while creating the store.
    func createNVRAM(at path: Path) throws {
        _ = try VZEFIVariableStore(creatingVariableStoreAt: path.url)
    }

    /// Builds the Linux configuration and creates the virtual machine from it.
    /// - Throws: Any error from `createConfiguration(_:)`.
    init(configuration: VMVirtualizationServiceContext) throws {
        let validated = try Self.createConfiguration(configuration)
        super.init(virtualMachine: VZVirtualMachine(configuration: validated))
    }
}

```

--------------------------------------------------------------------------------
/libs/python/computer/computer/providers/lume_api.py:
--------------------------------------------------------------------------------

```python
"""Shared API utilities for Lume and Lumier providers.

This module contains shared functions for interacting with the Lume API,
used by both the LumeProvider and LumierProvider classes.
"""

import logging
import json
import subprocess
import urllib.parse
from typing import Dict, List, Optional, Any

# Setup logging
logger = logging.getLogger(__name__)

# Check if curl is available.
# OSError (which includes FileNotFoundError) is caught so that a curl binary
# that exists but cannot be executed (e.g. PermissionError) marks curl as
# unavailable instead of crashing the module import.
try:
    subprocess.run(["curl", "--version"], capture_output=True, check=True)
    HAS_CURL = True
except (subprocess.SubprocessError, OSError):
    HAS_CURL = False


def lume_api_get(
    vm_name: str,
    host: str,
    port: int,
    storage: Optional[str] = None,
    debug: bool = False,
    verbose: bool = False
) -> Dict[str, Any]:
    """Use curl to get VM information from Lume API.

    The request is executed as an argument list (no shell), so characters in
    ``vm_name``, ``host`` or ``storage`` can never be interpreted as shell
    syntax.

    Args:
        vm_name: Name of the VM to get info for
        host: API host
        port: API port
        storage: Storage path for the VM (sent URL-encoded as a query parameter)
        debug: Whether to log the parsed status at info level
        verbose: Same effect as ``debug``

    Returns:
        The parsed JSON response on success. On failure, a dictionary with an
        "error" message and "status": "unknown" (plus "curl_code" and
        "vm_name" when curl exits non-zero), or
        {"status": "not_found", ...} when a non-JSON body reports that the
        virtual machine was not found.
    """
    # URL encode the storage parameter for the query
    storage_param = ""
    if storage:
        storage_param = f"?storage={urllib.parse.quote(storage, safe='')}"

    api_url = f"http://{host}:{port}/lume/vms/{vm_name}{storage_param}"

    # --connect-timeout: Time to establish connection (15 seconds)
    # --max-time: Maximum time for the whole operation (20 seconds)
    # -s: Silent mode; -f: fail (non-zero exit, no body) on HTTP errors
    cmd = ["curl", "--connect-timeout", "15", "--max-time", "20", "-s", "-f", api_url]
    logger.debug(f"Executing API request: {' '.join(cmd)}")

    try:
        # Passing a list keeps the URL as a single argv entry; no shell is involved.
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (subprocess.SubprocessError, OSError) as e:
        # OSError covers a missing or non-executable curl binary.
        logger.error(f"Failed to execute API request: {e}")
        return {"error": f"Failed to execute API request: {str(e)}", "status": "unknown"}

    # Handle curl exit codes
    if result.returncode != 0:
        # Map common curl error codes to helpful messages
        curl_errors = {
            7: "Failed to connect to the API server - it might still be starting up",
            22: "HTTP error returned from API server",
            28: "Operation timeout - the API server is taking too long to respond",
            52: "Empty reply from server - the API server is starting but not fully ready yet",
            56: "Network problem during data transfer - check container networking",
        }
        curl_error = curl_errors.get(result.returncode, "Unknown error")

        # Only log at debug level to reduce noise during retries
        logger.debug(f"API request failed with code {result.returncode}: {curl_error}")

        return {
            "error": f"API request failed: {curl_error}",
            "curl_code": result.returncode,
            "vm_name": vm_name,
            "status": "unknown"  # We don't know the actual status due to API error
        }

    if not (result.stdout and result.stdout.strip()):
        return {"error": "Empty response from API", "status": "unknown"}

    # Try to parse the response as JSON
    try:
        vm_status = json.loads(result.stdout)
    except json.JSONDecodeError as e:
        logger.warning(f"Invalid JSON response: {e}")
        if "Virtual machine not found" in result.stdout:
            return {"status": "not_found", "message": "VM not found in Lume API"}

        return {"error": f"Invalid JSON response: {result.stdout[:100]}...", "status": "unknown"}

    if debug or verbose:
        # Valid JSON need not be an object; only dicts expose a "status" key.
        status = vm_status.get('status', 'unknown') if isinstance(vm_status, dict) else 'unknown'
        logger.info(f"Successfully parsed VM status: {status}")
    return vm_status


def lume_api_run(
    vm_name: str,
    host: str,
    port: int,
    run_opts: Dict[str, Any],
    storage: Optional[str] = None,
    debug: bool = False,
    verbose: bool = False
) -> Dict[str, Any]:
    """Run a VM using curl.

    Sends a POST to ``/lume/vms/<vm_name>/run`` with a JSON payload built from
    ``run_opts`` and ``storage``.

    Args:
        vm_name: Name of the VM to run
        host: API host
        port: API port
        run_opts: Dictionary of run options; the keys "cpu", "memory",
            "storage" and "shared_directories" are read
        storage: Storage path for the VM (takes precedence over run_opts["storage"])
        debug: Accepted for signature compatibility; not used by this function
        verbose: Accepted for signature compatibility; not used by this function

    Returns:
        The parsed JSON response when the body is valid JSON; a
        {"success": True, ...} dictionary when the body is empty or not JSON;
        or {"error": ...} when curl fails or cannot be executed.
    """
    # Construct API URL
    api_url = f"http://{host}:{port}/lume/vms/{vm_name}/run"

    # Prepare JSON payload; only options that were actually provided are sent
    payload: Dict[str, Any] = {}
    for option in ("cpu", "memory"):
        if option in run_opts:
            payload[option] = run_opts[option]

    # The explicit storage argument wins over the run_opts entry
    if storage:
        payload["storage"] = storage
    elif "storage" in run_opts:
        payload["storage"] = run_opts["storage"]

    # The API expects the camelCase key for shared directories
    if run_opts.get("shared_directories"):
        payload["sharedDirectories"] = run_opts["shared_directories"]

    # Log the payload for debugging
    logger.debug(f"API payload: {json.dumps(payload, indent=2)}")

    # Construct the curl command
    cmd = [
        "curl", "--connect-timeout", "30", "--max-time", "30",
        "-s", "-X", "POST", "-H", "Content-Type: application/json",
        "-d", json.dumps(payload),
        api_url
    ]

    # Execute the command
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (subprocess.SubprocessError, OSError) as e:
        # OSError covers a missing or non-executable curl binary, which would
        # otherwise escape as an exception instead of an error dictionary.
        logger.error(f"Failed to execute run request: {e}")
        return {"error": f"Failed to execute run request: {str(e)}"}

    if result.returncode != 0:
        logger.warning(f"API request failed with code {result.returncode}: {result.stderr}")
        return {"error": f"API request failed: {result.stderr}"}

    if not (result.stdout and result.stdout.strip()):
        return {"success": True, "message": "VM started successfully"}

    # Try to parse the response as JSON
    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        # Return the raw response if it's not valid JSON
        return {"success": True, "message": "VM started successfully", "raw_response": result.stdout}


def lume_api_stop(
    vm_name: str,
    host: str,
    port: int,
    storage: Optional[str] = None,
    debug: bool = False,
    verbose: bool = False
) -> Dict[str, Any]:
    """Stop a VM by POSTing to the Lume HTTP API through the ``curl`` binary.

    Args:
        vm_name: Name of the VM to stop
        host: API host
        port: API port
        storage: Storage path for the VM; sent in the JSON body only when truthy
        debug: Whether to show debug output
        verbose: Enable verbose logging

    Returns:
        The decoded JSON body when the server answers with JSON, a
        ``{"success": True, ...}`` dictionary when curl succeeds with an empty or
        non-JSON body, or an ``{"error": ...}`` dictionary when curl exits with a
        non-zero status or cannot be launched at all.
    """
    api_url = f"http://{host}:{port}/lume/vms/{vm_name}/stop"

    # The request body is an empty JSON object unless a storage path was given.
    payload: Dict[str, Any] = {}
    if storage:
        payload["storage"] = storage

    cmd = [
        "curl", "--connect-timeout", "15", "--max-time", "20",
        "-s", "-X", "POST", "-H", "Content-Type: application/json",
        "-d", json.dumps(payload),
        api_url
    ]

    if debug or verbose:
        logger.info(f"Executing: {' '.join(cmd)}")

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (subprocess.SubprocessError, OSError) as e:
        # OSError (e.g. FileNotFoundError) is what subprocess raises when the curl
        # executable is missing or not runnable; report it like any other failure
        # instead of letting it escape the documented "error dictionary" contract.
        logger.error(f"Failed to execute stop request: {e}")
        return {"error": f"Failed to execute stop request: {str(e)}"}

    if result.returncode != 0:
        logger.warning(f"API request failed with code {result.returncode}: {result.stderr}")
        return {"error": f"API request failed: {result.stderr}"}

    # An empty (or whitespace-only) body still counts as success.
    if not (result.stdout and result.stdout.strip()):
        return {"success": True, "message": "VM stopped successfully"}

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        # Hand back the raw text when the server did not answer with JSON.
        return {"success": True, "message": "VM stopped successfully", "raw_response": result.stdout}


def lume_api_update(
    vm_name: str,
    host: str,
    port: int,
    update_opts: Dict[str, Any],
    storage: Optional[str] = None,
    debug: bool = False,
    verbose: bool = False
) -> Dict[str, Any]:
    """Update VM settings by POSTing to the Lume HTTP API through the ``curl`` binary.

    Args:
        vm_name: Name of the VM to update
        host: API host
        port: API port
        update_opts: Dictionary of update options; only the ``"cpu"`` and
            ``"memory"`` keys are forwarded, all other keys are ignored
        storage: Storage path for the VM; sent in the JSON body only when truthy
        debug: Whether to show debug output
        verbose: Enable verbose logging

    Returns:
        The decoded JSON body when the server answers with JSON, a
        ``{"success": True, ...}`` dictionary when curl succeeds with an empty or
        non-JSON body, or an ``{"error": ...}`` dictionary when curl exits with a
        non-zero status or cannot be launched at all.
    """
    api_url = f"http://{host}:{port}/lume/vms/{vm_name}/update"

    # Forward only the settings this endpoint is given here, and only when present.
    payload: Dict[str, Any] = {
        key: update_opts[key] for key in ("cpu", "memory") if key in update_opts
    }
    if storage:
        payload["storage"] = storage

    cmd = [
        "curl", "--connect-timeout", "15", "--max-time", "20",
        "-s", "-X", "POST", "-H", "Content-Type: application/json",
        "-d", json.dumps(payload),
        api_url
    ]

    # Either flag echoes the command, matching lume_api_stop.
    if debug or verbose:
        logger.info(f"Executing: {' '.join(cmd)}")

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (subprocess.SubprocessError, OSError) as e:
        # OSError (e.g. FileNotFoundError) is raised when curl is missing or not
        # runnable; convert it into the documented error dictionary.
        logger.error(f"Failed to execute update request: {e}")
        return {"error": f"Failed to execute update request: {str(e)}"}

    if result.returncode != 0:
        logger.warning(f"API request failed with code {result.returncode}: {result.stderr}")
        return {"error": f"API request failed: {result.stderr}"}

    # An empty (or whitespace-only) body still counts as success.
    if not (result.stdout and result.stdout.strip()):
        return {"success": True, "message": "VM updated successfully"}

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        # Hand back the raw text when the server did not answer with JSON.
        return {"success": True, "message": "VM updated successfully", "raw_response": result.stdout}


def lume_api_pull(
    image: str,
    name: str,
    host: str,
    port: int,
    storage: Optional[str] = None,
    registry: str = "ghcr.io",
    organization: str = "trycua",
    debug: bool = False,
    verbose: bool = False
) -> Dict[str, Any]:
    """Request a VM image pull by POSTing to the Lume HTTP API through ``curl``.

    Args:
        image: Name/tag of the image to pull
        name: Name to give the VM after pulling
        host: API host
        port: API port
        storage: Storage path for the VM; sent in the JSON body only when truthy
        registry: Registry to pull from (default: ghcr.io)
        organization: Organization in registry (default: trycua)
        debug: Whether to show debug output (not consulted by this function)
        verbose: Enable verbose logging; when set, curl runs without ``-s``

    Returns:
        The decoded JSON body when the server answers with JSON, a
        ``{"success": True, ...}`` dictionary when curl succeeds with an empty or
        non-JSON body, or an ``{"error": ...}`` dictionary when curl exits with a
        non-zero status or cannot be launched at all.
    """
    pull_payload = {
        "image": image,
        "name": name,  # always used as the target VM name
        "registry": registry,
        "organization": organization
    }
    if storage:
        pull_payload["storage"] = storage

    # No curl timeouts are set for this request, unlike the stop/update calls.
    pull_cmd = ["curl"]
    if not verbose:
        pull_cmd.append("-s")
    pull_cmd.extend([
        "-X", "POST",
        "-H", "Content-Type: application/json",
        "-d", json.dumps(pull_payload),
        f"http://{host}:{port}/lume/pull"
    ])

    logger.debug(f"Executing API request: {' '.join(pull_cmd)}")

    try:
        result = subprocess.run(pull_cmd, capture_output=True, text=True)
    except (subprocess.SubprocessError, OSError) as e:
        # OSError (e.g. FileNotFoundError) is raised when curl is missing or not
        # runnable; convert it into the documented error dictionary.
        error_msg = f"Failed to execute pull command: {str(e)}"
        logger.error(error_msg)
        return {"error": error_msg}

    if result.returncode != 0:
        error_msg = f"Failed to pull VM {name}: {result.stderr}"
        logger.error(error_msg)
        return {"error": error_msg}

    try:
        response = json.loads(result.stdout)
    except json.JSONDecodeError:
        # Empty or non-JSON output is still treated as a successful request.
        if result.stdout:
            logger.info(f"Pull response: {result.stdout}")
        return {"success": True, "message": f"Successfully initiated pull for VM {name}"}

    logger.info(f"Successfully initiated pull for VM {name}")
    return response


def lume_api_delete(
    vm_name: str,
    host: str,
    port: int,
    storage: Optional[str] = None,
    debug: bool = False,
    verbose: bool = False
) -> Dict[str, Any]:
    """Delete a VM by sending an HTTP DELETE to the Lume API through ``curl``.

    Args:
        vm_name: Name of the VM to delete
        host: API host
        port: API port
        storage: Storage path for the VM; percent-encoded and sent as the
            ``storage`` query parameter only when truthy
        debug: Whether to show debug output (not consulted by this function)
        verbose: Enable verbose logging (not consulted by this function)

    Returns:
        The decoded JSON body when the server answers with JSON, a
        ``{"success": True, ...}`` dictionary when curl succeeds with an empty or
        non-JSON body, or a dictionary with an ``"error"`` key when the request
        fails. For a non-zero curl exit status the error dictionary also carries
        ``"curl_code"``, ``"vm_name"`` and ``"storage"``.
    """
    # Percent-encode every reserved character of the storage path (including "/")
    # so it survives as a single query-string value.
    storage_param = ""
    if storage:
        encoded_storage = urllib.parse.quote(storage, safe='')
        storage_param = f"?storage={encoded_storage}"

    api_url = f"http://{host}:{port}/lume/vms/{vm_name}{storage_param}"

    # Timeout values are kept as they were ("much longer timeouts matching shell
    # implementation" per the original comment).
    cmd = ["curl", "--connect-timeout", "6000", "--max-time", "5000", "-s", "-X", "DELETE", api_url]

    logger.debug(f"Executing API request: {' '.join(cmd)}")

    try:
        # Pass the argument vector directly instead of a shell string: every
        # element reaches curl verbatim, so quotes or other shell metacharacters in
        # the VM name, host or storage path cannot be interpreted by a shell.
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (subprocess.SubprocessError, OSError) as e:
        # OSError (e.g. FileNotFoundError) is raised when curl is missing or not
        # runnable now that no shell sits in between.
        logger.error(f"Failed to execute delete request: {e}")
        return {"error": f"Failed to execute delete request: {str(e)}"}

    if result.returncode != 0:
        # Translate the curl exit statuses handled here into readable messages.
        curl_messages = {
            7: "Failed to connect to the API server - it might still be starting up",
            22: "HTTP error returned from API server",
            28: "Operation timeout - the API server is taking too long to respond",
            52: "Empty reply from server - the API server is starting but not fully ready yet",
            56: "Network problem during data transfer - check container networking",
        }
        curl_error = curl_messages.get(result.returncode, "Unknown error")

        # Only log at debug level to reduce noise during retries
        logger.debug(f"API request failed with code {result.returncode}: {curl_error}")

        return {
            "error": f"API request failed: {curl_error}",
            "curl_code": result.returncode,
            "vm_name": vm_name,
            "storage": storage
        }

    # An empty (or whitespace-only) body still counts as success.
    if not (result.stdout and result.stdout.strip()):
        return {"success": True, "message": "VM deleted successfully"}

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        # Hand back the raw text when the server did not answer with JSON.
        return {"success": True, "message": "VM deleted successfully", "raw_response": result.stdout}


def parse_memory(memory_str: str) -> int:
    """Parse a memory size into an integer number of megabytes.

    Integers are returned unchanged. Strings must consist of a non-negative number
    (optionally with a decimal fraction), optional whitespace and an optional unit.
    Units are case-insensitive: ``M``/``MB``/``MiB`` (or no unit) mean megabytes,
    ``G``/``GB``/``GiB`` mean 1024 MB and ``T``/``TB``/``TiB`` mean 1024 * 1024 MB.
    Fractional results are truncated towards zero.

    Examples:
        "8GB" -> 8192
        "1024MB" -> 1024
        "512" -> 512
        "8 GB" -> 8192
        "1.5GB" -> 1536
        "2TB" -> 2097152

    Returns:
        Memory value in MB. Anything that cannot be parsed (unknown unit, stray
        characters, unsupported type) logs a warning and yields the 8192 MB default.
    """
    if isinstance(memory_str, int):
        return memory_str

    if isinstance(memory_str, str):
        import re

        # Anchor the whole string so that inputs such as "8 GB" or "8.5GB" are not
        # silently cut off after the leading digits and read as plain megabytes.
        match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]*)\s*", memory_str)
        if match:
            number, unit = match.groups()
            mb_per_unit = {
                "": 1, "M": 1, "MB": 1, "MIB": 1,
                "G": 1024, "GB": 1024, "GIB": 1024,
                "T": 1024 * 1024, "TB": 1024 * 1024, "TIB": 1024 * 1024,
            }.get(unit.upper())
            if mb_per_unit is not None:
                if "." in number:
                    return int(float(number) * mb_per_unit)
                # Pure integer arithmetic for whole numbers avoids any float rounding.
                return int(number) * mb_per_unit

    # Default fallback
    logger.warning(f"Could not parse memory string '{memory_str}', using 8GB default")
    return 8192  # Default to 8GB

```

--------------------------------------------------------------------------------
/libs/python/pylume/pylume/server.py:
--------------------------------------------------------------------------------

```python
import os
import time
import asyncio
import subprocess
import tempfile
import logging
import socket
from typing import Optional
import sys
from .exceptions import LumeConnectionError
import signal
import json
import shlex
import random
from logging import getLogger


class LumeServer:
    """Start, monitor and stop a ``lume serve`` process, or attach to a running one.

    The server is probed over HTTP by running ``curl`` against ``<base_url>/vms``.
    When this object starts the server itself, the process output is redirected
    to a temporary file that is removed again during cleanup.
    """

    def __init__(
        self,
        debug: bool = False,
        server_start_timeout: int = 60,
        port: Optional[int] = None,
        use_existing_server: bool = False,
        host: str = "localhost",
    ):
        """Initialize the LumeServer.

        Args:
            debug: Enable debug logging
            server_start_timeout: Timeout in seconds to wait for server to start
            port: Specific port to use for the server
            use_existing_server: If True, will try to connect to an existing server
                               instead of starting a new one
            host: Host to use for connections (e.g., "localhost", "127.0.0.1", "host.docker.internal")
        """
        self.debug = debug
        self.server_start_timeout = server_start_timeout
        self.server_process = None
        self.output_file = None
        self.requested_port = port
        self.port = None
        self.base_url = None
        self.use_existing_server = use_existing_server
        self.host = host

        # Configure logging (only once per process: the logger is shared by name)
        self.logger = getLogger("pylume.server")
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.DEBUG if debug else logging.INFO)

        self.logger.debug(f"Server initialized with host: {self.host}")

    def _check_port_available(self, port: int) -> bool:
        """Return False if something accepts TCP connections on ``port``, else True.

        127.0.0.1 is always probed; ``self.host`` is probed as well when it is not a
        localhost alias (e.g. "host.docker.internal"). Probing is best-effort: an
        error while probing a host is treated as "nothing is listening there".
        """
        hosts_to_probe = ["127.0.0.1"]
        if self.host not in ["localhost", "127.0.0.1"]:
            hosts_to_probe.append(self.host)

        for probe_host in hosts_to_probe:
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.settimeout(0.5)
                    if s.connect_ex((probe_host, port)) == 0:  # Port is in use
                        return False
            except Exception as e:
                # Deliberately tolerant (resolution errors, out-of-range ports, ...),
                # but no longer swallows KeyboardInterrupt/SystemExit.
                self.logger.debug(f"Port probe on {probe_host}:{port} failed: {e}")

        return True

    def _get_server_port(self) -> int:
        """Return the requested port if it is free, otherwise pick a random free port.

        Raises:
            RuntimeError: If the requested port is in use, or if none of ten random
                ports in the 49152-65535 range is free.
        """
        # Use requested port if specified
        if self.requested_port is not None:
            if not self._check_port_available(self.requested_port):
                raise RuntimeError(f"Requested port {self.requested_port} is not available")
            return self.requested_port

        # Find a free port
        for _ in range(10):  # Try up to 10 times
            port = random.randint(49152, 65535)
            if self._check_port_available(port):
                return port

        raise RuntimeError("Could not find an available port")

    async def _ensure_server_running(self) -> None:
        """Ensure the lume server is running, start it if it's not.

        NOTE(review): nothing in this class calls this method, and it reads
        ``self.base_url`` / ``self.port`` without assigning them - callers
        presumably must set both first; confirm before relying on it.

        Raises:
            RuntimeError: If the binary is missing, the process cannot be started,
                exits early, or does not answer with HTTP 200 in time. Every
                failure is re-wrapped as "Failed to start lume server: ...".
        """
        try:
            self.logger.debug("Checking if lume server is running...")
            # Try to connect to the server with a short timeout
            cmd = ["curl", "-s", "-w", "%{http_code}", "-m", "5", f"{self.base_url}/vms"]
            process = await asyncio.create_subprocess_exec(
                *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            stdout, stderr = await process.communicate()

            if process.returncode == 0:
                response = stdout.decode()
                # "-w %{http_code}" appends the status code after the body
                status_code = int(response[-3:])
                if status_code == 200:
                    self.logger.debug("PyLume server is running")
                    return

            self.logger.debug("PyLume server not running, attempting to start it")
            # Server not running, try to start it
            lume_path = os.path.join(os.path.dirname(__file__), "lume")
            if not os.path.exists(lume_path):
                raise RuntimeError(f"Could not find lume binary at {lume_path}")

            # Make sure the file is executable
            os.chmod(lume_path, 0o755)

            # Create a temporary file for server output
            self.output_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
            self.logger.debug(f"Using temporary file for server output: {self.output_file.name}")

            # Start the server
            self.logger.debug(f"Starting lume server with: {lume_path} serve --port {self.port}")

            # Start server in background using subprocess.Popen
            try:
                self.server_process = subprocess.Popen(
                    [lume_path, "serve", "--port", str(self.port)],
                    stdout=self.output_file,
                    stderr=self.output_file,
                    cwd=os.path.dirname(lume_path),
                    start_new_session=True,  # Run in new session to avoid blocking
                )
            except Exception as e:
                self.output_file.close()
                os.unlink(self.output_file.name)
                raise RuntimeError(f"Failed to start lume server process: {str(e)}") from e

            # Wait for server to start
            self.logger.debug(
                f"Waiting up to {self.server_start_timeout} seconds for server to start..."
            )
            start_time = time.time()
            server_ready = False
            last_size = 0

            while time.time() - start_time < self.server_start_timeout:
                if self.server_process.poll() is not None:
                    # Process has terminated
                    self.output_file.seek(0)
                    output = self.output_file.read()
                    self.output_file.close()
                    os.unlink(self.output_file.name)
                    error_msg = (
                        f"Server process terminated unexpectedly.\n"
                        f"Exit code: {self.server_process.returncode}\n"
                        f"Output: {output}"
                    )
                    raise RuntimeError(error_msg)

                # Check output file for server ready message
                self.output_file.seek(0, os.SEEK_END)
                size = self.output_file.tell()
                if size > last_size:  # Only read if there's new content
                    self.output_file.seek(last_size)
                    new_output = self.output_file.read()
                    if new_output.strip():  # Only log non-empty output
                        self.logger.debug(f"Server output: {new_output.strip()}")
                    last_size = size

                    if "Server started" in new_output:
                        server_ready = True
                        self.logger.debug("Server startup detected")
                        break

                # Try to connect to the server periodically
                try:
                    cmd = ["curl", "-s", "-w", "%{http_code}", "-m", "5", f"{self.base_url}/vms"]
                    process = await asyncio.create_subprocess_exec(
                        *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
                    )
                    stdout, stderr = await process.communicate()

                    if process.returncode == 0:
                        response = stdout.decode()
                        status_code = int(response[-3:])
                        if status_code == 200:
                            server_ready = True
                            self.logger.debug("Server is responding to requests")
                            break
                except Exception:
                    # Server not ready yet. Catching Exception (not everything)
                    # lets task cancellation propagate out of this polling loop.
                    pass

                await asyncio.sleep(1.0)

            if not server_ready:
                # Cleanup if server didn't start
                if self.server_process:
                    self.server_process.terminate()
                    try:
                        self.server_process.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        self.server_process.kill()
                self.output_file.close()
                os.unlink(self.output_file.name)
                raise RuntimeError(
                    f"Failed to start lume server after {self.server_start_timeout} seconds. "
                    "Check the debug output for more details."
                )

            # Give the server a moment to fully initialize
            await asyncio.sleep(2.0)

            # Verify server is responding
            try:
                cmd = ["curl", "-s", "-w", "%{http_code}", "-m", "10", f"{self.base_url}/vms"]
                process = await asyncio.create_subprocess_exec(
                    *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
                )
                stdout, stderr = await process.communicate()

                if process.returncode != 0:
                    raise RuntimeError(f"Curl command failed: {stderr.decode()}")

                response = stdout.decode()
                status_code = int(response[-3:])

                if status_code != 200:
                    raise RuntimeError(f"Server returned status code {status_code}")

                self.logger.debug("PyLume server started successfully")
            except Exception as e:
                self.logger.debug(f"Server verification failed: {str(e)}")
                if self.server_process:
                    self.server_process.terminate()
                    try:
                        self.server_process.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        self.server_process.kill()
                self.output_file.close()
                os.unlink(self.output_file.name)
                raise RuntimeError(f"Server started but is not responding: {str(e)}") from e

            self.logger.debug("Server startup completed successfully")

        except Exception as e:
            raise RuntimeError(f"Failed to start lume server: {str(e)}") from e

    async def _start_server(self) -> None:
        """Start the lume server using the lume executable.

        Chooses a port, launches ``lume serve --port <port>`` with its output
        redirected to a temporary file, and waits until it answers requests.

        Raises:
            RuntimeError: If the binary is missing or the server fails to start;
                resources are cleaned up before the error is raised.
        """
        self.logger.debug("Starting PyLume server")

        # Get absolute path to lume executable in the same directory as this file
        lume_path = os.path.join(os.path.dirname(__file__), "lume")
        if not os.path.exists(lume_path):
            raise RuntimeError(f"Could not find lume binary at {lume_path}")

        try:
            # Make executable
            os.chmod(lume_path, 0o755)

            # Get and validate port
            self.port = self._get_server_port()
            self.base_url = f"http://{self.host}:{self.port}/lume"

            # Set up output handling
            self.output_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)

            # Start the server process with the lume executable
            env = os.environ.copy()
            env["RUST_BACKTRACE"] = "1"  # Enable backtrace for better error reporting

            # Only the port is passed on the command line; no bind host is given.
            self.server_process = subprocess.Popen(
                [lume_path, "serve", "--port", str(self.port)],
                stdout=self.output_file,
                stderr=subprocess.STDOUT,
                cwd=os.path.dirname(lume_path),  # Run from same directory as executable
                env=env,
            )

            # Wait for server to initialize
            await asyncio.sleep(2)
            await self._wait_for_server()

        except Exception as e:
            await self._cleanup()
            raise RuntimeError(f"Failed to start lume server process: {str(e)}") from e

    async def _tail_log(self) -> None:
        """Print server output appended to the log file until the process exits.

        Only output written after this coroutine starts is shown. The coroutine
        returns when the server process has ended, or when the log file or the
        process handle has been released (for example by ``_cleanup``).
        """
        try:
            # Jump to the end once. Doing this before every read (as before) moved
            # the position past any new data, so nothing was ever printed.
            self.output_file.seek(0, os.SEEK_END)  # type: ignore[attr-defined]
        except Exception as e:
            print(f"Error reading log: {e}")

        while True:
            if self.output_file is None or self.server_process is None:
                # Nothing left to follow; avoid spinning forever on errors.
                return
            try:
                line = self.output_file.readline()  # type: ignore[attr-defined]
                if line:
                    line = line.strip()
                    if line:
                        print(f"SERVER: {line}")
                if self.server_process.poll() is not None:  # type: ignore[attr-defined]
                    print("Server process ended")
                    break
                await asyncio.sleep(0.1)
            except Exception as e:
                print(f"Error reading log: {e}")
                await asyncio.sleep(0.1)

    async def _wait_for_server(self) -> None:
        """Wait for server to start and become responsive with increased timeout.

        Polls once per second for up to ``server_start_timeout`` seconds.

        Raises:
            RuntimeError: If the process exits while waiting (the message carries
                its captured output) or the timeout elapses; resources are cleaned
                up in both cases.
        """
        start_time = time.time()
        while time.time() - start_time < self.server_start_timeout:
            if self.server_process.poll() is not None:  # type: ignore[attr-defined]
                error_msg = await self._get_error_output()
                await self._cleanup()
                raise RuntimeError(error_msg)

            try:
                await self._verify_server()
                self.logger.debug("Server is now responsive")
                return
            except Exception as e:
                self.logger.debug(f"Server not ready yet: {str(e)}")
                await asyncio.sleep(1.0)

        await self._cleanup()
        raise RuntimeError(f"Server failed to start after {self.server_start_timeout} seconds")

    async def _verify_server(self) -> None:
        """Verify server is responding to requests.

        Runs ``curl`` against ``http://<host>:<port>/lume/vms`` with a 10 second
        limit and requires an HTTP 200 answer.

        Raises:
            RuntimeError: If curl fails or the status code is not 200.
        """
        try:
            cmd = [
                "curl",
                "-s",
                "-w",
                "%{http_code}",
                "-m",
                "10",
                f"http://{self.host}:{self.port}/lume/vms",
            ]
            process = await asyncio.create_subprocess_exec(
                *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            stdout, stderr = await process.communicate()

            if process.returncode != 0:
                raise RuntimeError(f"Curl command failed: {stderr.decode()}")

            response = stdout.decode()
            # "-w %{http_code}" appends the status code after the body
            status_code = int(response[-3:])

            if status_code != 200:
                raise RuntimeError(f"Server returned status code {status_code}")

            self.logger.debug("PyLume server started successfully")
        except Exception as e:
            raise RuntimeError(f"Server not responding: {str(e)}") from e

    async def _get_error_output(self) -> str:
        """Get error output from the server process.

        Returns the whole captured log together with the process exit code, or
        "No output available" when there is no log file.
        """
        if not self.output_file:
            return "No output available"
        self.output_file.seek(0)
        output = self.output_file.read()
        return (
            f"Server process terminated unexpectedly.\n"
            f"Exit code: {self.server_process.returncode}\n"  # type: ignore[attr-defined]
            f"Output: {output}"
        )

    async def _cleanup(self) -> None:
        """Clean up all server resources.

        Terminates the server process (killing it if it does not exit within five
        seconds) and closes and deletes the log file. Errors are logged, not raised.
        """
        if self.server_process:
            try:
                self.server_process.terminate()
                try:
                    self.server_process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    self.server_process.kill()
            except Exception as e:
                self.logger.debug(f"Error stopping server process: {e}")
            self.server_process = None

        # Clean up output file
        if self.output_file:
            try:
                self.output_file.close()
                os.unlink(self.output_file.name)
            except Exception as e:
                self.logger.debug(f"Error cleaning up output file: {e}")
            self.output_file = None

    def _running_in_docker(self) -> bool:
        """Return True when ``/.dockerenv`` exists or ``/proc/1/cgroup`` mentions docker."""
        if os.path.exists("/.dockerenv"):
            return True
        if not os.path.exists("/proc/1/cgroup"):
            return False
        try:
            # Context manager so the file handle is closed instead of leaked.
            with open("/proc/1/cgroup", "r") as cgroup_file:
                return "docker" in cgroup_file.read()
        except OSError as e:
            self.logger.debug(f"Could not read /proc/1/cgroup: {e}")
            return False

    async def ensure_running(self) -> None:
        """Ensure the server is running.

        If use_existing_server is True, will only try to connect to an existing server.
        Otherwise will:
          1. Try to connect to an existing server on the specified port
          2. If that fails and not in Docker, start a new server
          3. If in Docker and no existing server is found, raise an error

        Raises:
            RuntimeError: If a required port is missing, an existing server cannot
                be reached, or a new server cannot be started.
        """
        # First check if we're in Docker
        in_docker = self._running_in_docker()

        # If using a non-localhost host like host.docker.internal, set up the connection details
        if self.host not in ["localhost", "127.0.0.1"]:
            if self.requested_port is None:
                raise RuntimeError("Port must be specified when using a remote host")

            self.port = self.requested_port
            self.base_url = f"http://{self.host}:{self.port}/lume"
            self.logger.debug(f"Using remote host server at {self.base_url}")

            # Try to verify the server is accessible
            try:
                await self._verify_server()
                self.logger.debug("Successfully connected to remote server")
                return
            except Exception as e:
                if self.use_existing_server or in_docker:
                    # If explicitly requesting an existing server or in Docker, we can't start a new one
                    raise RuntimeError(
                        f"Failed to connect to remote server at {self.base_url}: {str(e)}"
                    ) from e
                else:
                    self.logger.debug(f"Remote server not available at {self.base_url}: {str(e)}")
                    # Fall back to localhost for starting a new server
                    self.host = "localhost"

        # If explicitly using an existing server, verify it's running
        if self.use_existing_server:
            if self.requested_port is None:
                raise RuntimeError("Port must be specified when using an existing server")

            self.port = self.requested_port
            self.base_url = f"http://{self.host}:{self.port}/lume"

            try:
                await self._verify_server()
                self.logger.debug("Successfully connected to existing server")
            except Exception as e:
                raise RuntimeError(
                    f"Failed to connect to existing server at {self.base_url}: {str(e)}"
                ) from e
        else:
            # Try to connect to an existing server first
            if self.requested_port is not None:
                self.port = self.requested_port
                self.base_url = f"http://{self.host}:{self.port}/lume"

                try:
                    await self._verify_server()
                    self.logger.debug("Successfully connected to existing server")
                    return
                except Exception:
                    self.logger.debug(f"No existing server found at {self.base_url}")

                    # If in Docker and can't connect to existing server, raise an error
                    if in_docker:
                        raise RuntimeError(
                            f"Failed to connect to server at {self.base_url} and cannot start a new server in Docker"
                        )

            # Start a new server
            self.logger.debug("Starting a new server instance")
            await self._start_server()

    async def stop(self) -> None:
        """Stop the server if we're managing it.

        Does nothing when this object was configured to use an existing server.
        """
        if not self.use_existing_server:
            self.logger.debug("Stopping lume server...")
            await self._cleanup()

```

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/handlers/linux.py:
--------------------------------------------------------------------------------

```python
"""
Linux implementation of automation and accessibility handlers.

This implementation attempts to use pyautogui for GUI automation when available.
If running in a headless environment without X11, it will fall back to simulated responses.
To use GUI automation in a headless environment:
1. Install Xvfb: sudo apt-get install xvfb
2. Run with virtual display: xvfb-run python -m computer_server
"""
from typing import Dict, Any, List, Tuple, Optional
import logging
import subprocess
import asyncio
import base64
import os
import json
from io import BytesIO

# Configure logger
logger = logging.getLogger(__name__)

# Try to import pyautogui, but don't fail if it's not available
# This allows the server to run in headless environments
#
# NOTE: when the import fails, the name `pyautogui` is left unbound in this module.
# The handlers in this file wrap every pyautogui call in `try/except Exception`, so
# the resulting NameError is converted into a fallback value or an error result
# instead of propagating.
try:
    import pyautogui
    # Turn off pyautogui's fail-safe (per the pyautogui docs, it raises
    # FailSafeException when the pointer reaches a screen corner).
    pyautogui.FAILSAFE = False

    logger.info("pyautogui successfully imported, GUI automation available")
except Exception as e:
    # Broad catch on purpose: any failure while importing is logged, not raised.
    logger.warning(f"pyautogui import failed: {str(e)}. GUI operations will be simulated.")

from pynput.mouse import Button, Controller as MouseController
from pynput.keyboard import Key, Controller as KeyboardController

from .base import BaseAccessibilityHandler, BaseAutomationHandler

class LinuxAccessibilityHandler(BaseAccessibilityHandler):
    """Linux implementation of accessibility handler.

    No accessibility backend is used by this class: the tree it returns is a
    simulated placeholder and element search always reports "not supported".
    Cursor position and screen size are read through pyautogui, with fixed
    fallback values when that call fails.
    """

    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Get a simulated accessibility tree for the current window.

        Returns:
            Dict[str, Any]: ``{"success": True, "tree": {...}}`` where the tree is a
                single childless "Window" node positioned at (0, 0). Its size is the
                value returned by ``get_screen_size()`` (1920x1080 when pyautogui
                cannot be queried).
        """
        # This handler does not query any accessibility API; return a minimal dummy tree.
        logger.info("Getting accessibility tree (simulated, no accessibility API available on Linux)")
        # Report the real screen size when it can be read instead of a hard-coded
        # resolution; get_screen_size() already falls back to 1920x1080.
        width, height = self.get_screen_size()
        return {
            "success": True,
            "tree": {
                "role": "Window",
                "title": "Linux Window",
                "position": {"x": 0, "y": 0},
                "size": {"width": width, "height": height},
                "children": []
            }
        }

    async def find_element(self, role: Optional[str] = None,
                          title: Optional[str] = None,
                          value: Optional[str] = None) -> Dict[str, Any]:
        """Find an element in the accessibility tree by criteria.

        The criteria are only logged; no search is performed.

        Args:
            role: The role of the element to find.
            title: The title of the element to find.
            value: The value of the element to find.

        Returns:
            Dict[str, Any]: Always ``{"success": False, "message": ...}`` stating that
                element search is not supported on Linux.
        """
        logger.info(f"Finding element with role={role}, title={title}, value={value} (not supported on Linux)")
        return {
            "success": False,
            "message": "Element search not supported on Linux"
        }

    def get_cursor_position(self) -> Tuple[int, int]:
        """Get the current cursor position.

        Returns:
            Tuple[int, int]: The x and y coordinates reported by pyautogui, or
                (0, 0) if the pyautogui call raises for any reason.
        """
        try:
            pos = pyautogui.position()
            return pos.x, pos.y
        except Exception as e:
            # Also reached when pyautogui was never imported (NameError).
            logger.warning(f"Failed to get cursor position with pyautogui: {e}")

        logger.info("Getting cursor position (simulated)")
        return 0, 0

    def get_screen_size(self) -> Tuple[int, int]:
        """Get the screen size.

        Returns:
            Tuple[int, int]: The width and height reported by pyautogui, or
                (1920, 1080) if the pyautogui call raises for any reason.
        """
        try:
            size = pyautogui.size()
            return size.width, size.height
        except Exception as e:
            # Also reached when pyautogui was never imported (NameError).
            logger.warning(f"Failed to get screen size with pyautogui: {e}")

        logger.info("Getting screen size (simulated)")
        return 1920, 1080

class LinuxAutomationHandler(BaseAutomationHandler):
    """Linux implementation of automation handler using pyautogui.

    Most actions go through pyautogui; text typing and two-axis scrolling use the
    pynput controllers below. Every action catches ``Exception`` and reports it as
    ``{"success": False, "error": ...}`` instead of raising.
    """
    # pynput controllers, created once in the class body and shared by all instances.
    keyboard = KeyboardController()
    mouse = MouseController()

    # Mouse Actions
    async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Press and hold a mouse button at the specified coordinates.

        Args:
            x: The x coordinate to move to before pressing. If None, uses current position.
            y: The y coordinate to move to before pressing. If None, uses current position.
            button: The mouse button to press ("left", "right", or "middle").

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            # The pointer is only moved when BOTH coordinates are supplied.
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.mouseDown(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Release a mouse button at the specified coordinates.

        Args:
            x: The x coordinate to move to before releasing. If None, uses current position.
            y: The y coordinate to move to before releasing. If None, uses current position.
            button: The mouse button to release ("left", "right", or "middle").

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            # The pointer is only moved when BOTH coordinates are supplied.
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.mouseUp(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the cursor to the specified coordinates.

        Args:
            x: The x coordinate to move to.
            y: The y coordinate to move to.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            pyautogui.moveTo(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Perform a left mouse click at the specified coordinates.

        Args:
            x: The x coordinate to click at. If None, clicks at current position.
            y: The y coordinate to click at. If None, clicks at current position.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.click()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Perform a right mouse click at the specified coordinates.

        Args:
            x: The x coordinate to click at. If None, clicks at current position.
            y: The y coordinate to click at. If None, clicks at current position.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.rightClick()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Perform a double click at the specified coordinates.

        Args:
            x: The x coordinate to double click at. If None, clicks at current position.
            y: The y coordinate to double click at. If None, clicks at current position.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            # 0.1 s between the two clicks.
            pyautogui.doubleClick(interval=0.1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def click(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Perform a mouse click with the specified button at the given coordinates.

        Args:
            x: The x coordinate to click at. If None, clicks at current position.
            y: The y coordinate to click at. If None, clicks at current position.
            button: The mouse button to click ("left", "right", or "middle").

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.click(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
        """Drag from the current position to the specified coordinates.

        Args:
            x: The x coordinate to drag to.
            y: The y coordinate to drag to.
            button: The mouse button to use for dragging.
            duration: The time in seconds to take for the drag operation.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            pyautogui.dragTo(x, y, duration=duration, button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag(self, start_x: int, start_y: int, end_x: int, end_y: int, button: str = "left",
                   duration: float = 0.5) -> Dict[str, Any]:
        """Drag from start coordinates to end coordinates.

        Args:
            start_x: The starting x coordinate.
            start_y: The starting y coordinate.
            end_x: The ending x coordinate.
            end_y: The ending y coordinate.
            button: The mouse button to use for dragging.
            duration: The time in seconds to take for the drag operation
                (defaults to 0.5, the value that used to be hard-coded).

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            pyautogui.moveTo(start_x, start_y)
            pyautogui.dragTo(end_x, end_y, duration=duration, button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_path(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
        """Drag along a path defined by a list of coordinates.

        The pointer is moved to the first point, the button is pressed once, the
        pointer visits every remaining point in order, and the button is released
        at the end. A single-point path only moves the pointer.

        Args:
            path: A list of (x, y) coordinate tuples defining the drag path.
            button: The mouse button to use for dragging.
            duration: The time in seconds to take for each segment of the drag.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
                An empty path yields ``{"success": False, "error": "Path is empty"}``.
        """
        try:
            if not path:
                return {"success": False, "error": "Path is empty"}
            pyautogui.moveTo(*path[0])
            waypoints = path[1:]
            if not waypoints:
                # Nothing to drag to; do not press the button at all.
                return {"success": True}
            # Hold the button across the WHOLE path. Calling dragTo() with its
            # default mouseDownUp=True would press and release at every waypoint,
            # turning one drag into a series of unrelated short drags.
            pyautogui.mouseDown(button=button)
            try:
                for x, y in waypoints:
                    pyautogui.dragTo(x, y, duration=duration, button=button, mouseDownUp=False)
            finally:
                # Never leave the button stuck down, even if a segment fails.
                pyautogui.mouseUp(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Keyboard Actions
    async def key_down(self, key: str) -> Dict[str, Any]:
        """Press and hold a key.

        Args:
            key: The key to press down.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            pyautogui.keyDown(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def key_up(self, key: str) -> Dict[str, Any]:
        """Release a key.

        Args:
            key: The key to release.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            pyautogui.keyUp(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type the specified text using the keyboard.

        Args:
            text: The text to type.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            # use pynput for Unicode support
            self.keyboard.type(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press and release a key.

        Args:
            key: The key to press.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            pyautogui.press(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
        """Press a combination of keys simultaneously.

        Args:
            keys: A list of keys to press together as a hotkey combination.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            # The list is unpacked into positional arguments for pyautogui.hotkey.
            pyautogui.hotkey(*keys)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Scrolling Actions
    async def scroll(self, x: int, y: int) -> Dict[str, Any]:
        """Scroll the mouse wheel.

        Unlike ``scroll_up``/``scroll_down``, this goes through the pynput mouse
        controller, passing ``x`` and ``y`` straight to its ``scroll`` method.

        Args:
            x: The horizontal scroll amount.
            y: The vertical scroll amount.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            self.mouse.scroll(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll down by the specified number of clicks.

        Args:
            clicks: The number of scroll clicks to perform downward.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            # Downward scrolling is requested by negating the click count.
            pyautogui.scroll(-clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll up by the specified number of clicks.

        Args:
            clicks: The number of scroll clicks to perform upward.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            pyautogui.scroll(clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Screen Actions
    async def screenshot(self) -> Dict[str, Any]:
        """Take a screenshot of the current screen.

        Returns:
            Dict[str, Any]: ``{"success": True, "image_data": <base64 PNG>}`` on success,
                or ``{"success": False, "error": ...}`` if capturing or encoding fails.
        """
        try:
            from PIL import Image
            screenshot = pyautogui.screenshot()
            if not isinstance(screenshot, Image.Image):
                return {"success": False, "error": "Failed to capture screenshot"}
            buffered = BytesIO()
            screenshot.save(buffered, format="PNG", optimize=True)
            # getvalue() returns the whole buffer regardless of the stream position,
            # so no seek is needed before encoding.
            image_data = base64.b64encode(buffered.getvalue()).decode()
            return {"success": True, "image_data": image_data}
        except Exception as e:
            return {"success": False, "error": f"Screenshot error: {str(e)}"}

    async def get_screen_size(self) -> Dict[str, Any]:
        """Get the size of the screen.

        Returns:
            Dict[str, Any]: ``{"success": True, "size": {"width": ..., "height": ...}}``,
                or ``{"success": False, "error": ...}`` if failed.
        """
        try:
            size = pyautogui.size()
            return {"success": True, "size": {"width": size.width, "height": size.height}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_cursor_position(self) -> Dict[str, Any]:
        """Get the current position of the cursor.

        Returns:
            Dict[str, Any]: ``{"success": True, "position": {"x": ..., "y": ...}}``,
                or ``{"success": False, "error": ...}`` if failed.
        """
        try:
            pos = pyautogui.position()
            return {"success": True, "position": {"x": pos.x, "y": pos.y}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Clipboard Actions
    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Get the current content of the clipboard.

        Despite the name, this method only READS the clipboard (via
        ``pyperclip.paste()``); it does not modify it.

        Returns:
            Dict[str, Any]: ``{"success": True, "content": <clipboard text>}``,
                or ``{"success": False, "error": ...}`` if failed.
        """
        try:
            # Imported lazily so a missing pyperclip only affects clipboard calls.
            import pyperclip
            content = pyperclip.paste()
            return {"success": True, "content": content}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Set the clipboard content to the specified text.

        Args:
            text: The text to copy to the clipboard.

        Returns:
            Dict[str, Any]: A dictionary with success status and error message if failed.
        """
        try:
            # Imported lazily so a missing pyperclip only affects clipboard calls.
            import pyperclip
            pyperclip.copy(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Command Execution
    async def run_command(self, command: str) -> Dict[str, Any]:
        """Execute a shell command asynchronously.

        The command string is handed to the shell unmodified, so callers are
        responsible for what it contains.

        Args:
            command: The shell command to execute.

        Returns:
            Dict[str, Any]: On completion, ``success`` is True together with ``stdout``,
                ``stderr`` and ``return_code``. ``success`` only means the command ran
                to completion; check ``return_code`` for its exit status. If the
                process could not be created or awaited, ``success`` is False and
                ``error`` holds the message.
        """
        try:
            # Create subprocess
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            # Wait for the subprocess to finish
            stdout, stderr = await process.communicate()
            # Decode leniently: command output is arbitrary bytes, and a strict
            # decode would raise on non-UTF-8 data and throw away the whole result.
            return {
                "success": True,
                "stdout": stdout.decode(errors="replace") if stdout else "",
                "stderr": stderr.decode(errors="replace") if stderr else "",
                "return_code": process.returncode
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

```
Page 10/16FirstPrevNextLast