#
tokens: 46174/50000 8/513 files (page 11/16)
lines: off (toggle) GitHub
raw markdown copy
This is page 11 of 16. Use http://codebase.md/trycua/cua?page={x} to view the full context.

# Directory Structure

```
├── .all-contributorsrc
├── .cursorignore
├── .devcontainer
│   ├── devcontainer.json
│   ├── post-install.sh
│   └── README.md
├── .dockerignore
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── ci-lume.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-pylume.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       └── test-validation-script.yml
├── .gitignore
├── .vscode
│   ├── docs.code-workspace
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── composite-agents.md
│   ├── cua-hackathon.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .gitignore
│   ├── .prettierrc
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   └── meta.json
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── meta.json
│   │       │   └── sandboxed-python.mdx
│   │       ├── index.mdx
│   │       ├── libraries
│   │       │   ├── agent
│   │       │   │   └── index.mdx
│   │       │   ├── computer
│   │       │   │   └── index.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── core
│   │       │   │   └── index.mdx
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   └── som
│   │       │       ├── configuration.mdx
│   │       │       └── index.mdx
│   │       ├── meta.json
│   │       ├── quickstart-cli.mdx
│   │       ├── quickstart-devs.mdx
│   │       └── telemetry.mdx
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   └── llms.txt
│   │   │       └── route.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── iou.tsx
│   │   │   └── mermaid.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   └── mdx-components.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── .prettierrc
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   └── uitars.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   └── test_connection.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── scripts
│   │   │       ├── install_mcp_server.sh
│   │   │       └── start_mcp_server.sh
│   │   ├── pylume
│   │   │   ├── __init__.py
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── pylume
│   │   │   │   ├── __init__.py
│   │   │   │   ├── client.py
│   │   │   │   ├── exceptions.py
│   │   │   │   ├── lume
│   │   │   │   ├── models.py
│   │   │   │   ├── pylume.py
│   │   │   │   └── server.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           └── test_omniparser.py
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── biome.json
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Dockerfile
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── pylume_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── pdm.lock
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── samples
│   └── community
│       ├── global-online
│       │   └── README.md
│       └── hack-the-north
│           └── README.md
├── scripts
│   ├── build-uv.sh
│   ├── build.ps1
│   ├── build.sh
│   ├── cleanup.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   └── run-docker-dev.sh
└── tests
    ├── pytest.ini
    ├── shell_cmd.py
    ├── test_files.py
    ├── test_mcp_server_session_management.py
    ├── test_mcp_server_streaming.py
    ├── test_shell_bash.py
    ├── test_telemetry.py
    ├── test_venv.py
    └── test_watchdog.py
```

# Files

--------------------------------------------------------------------------------
/libs/python/computer/computer/providers/winsandbox/provider.py:
--------------------------------------------------------------------------------

```python
"""Windows Sandbox VM provider implementation using pywinsandbox."""

import os
import asyncio
import logging
import time
from typing import Dict, Any, Optional, List
from pathlib import Path

from ..base import BaseVMProvider, VMProviderType

# Setup logging
logger = logging.getLogger(__name__)

try:
    import winsandbox
    HAS_WINSANDBOX = True
except ImportError:
    HAS_WINSANDBOX = False


class WinSandboxProvider(BaseVMProvider):
    """Windows Sandbox VM provider implementation using pywinsandbox.
    
    This provider uses Windows Sandbox to create isolated Windows environments.
    Storage is always ephemeral with Windows Sandbox.
    """
    
    def __init__(
        self, 
        port: int = 7777,
        host: str = "localhost",
        storage: Optional[str] = None,
        verbose: bool = False,
        ephemeral: bool = True,  # Windows Sandbox is always ephemeral
        memory_mb: int = 4096,
        networking: bool = True,
        **kwargs
    ):
        """Initialize the Windows Sandbox provider.
        
        Args:
            port: Port for the computer server (default: 7777)
            host: Host to use for connections (default: localhost)
            storage: Storage path (ignored - Windows Sandbox is always ephemeral)
            verbose: Enable verbose logging
            ephemeral: Always True for Windows Sandbox
            memory_mb: Memory allocation in MB (default: 4096)
            networking: Enable networking in sandbox (default: True)
        """
        if not HAS_WINSANDBOX:
            raise ImportError(
                "pywinsandbox is required for WinSandboxProvider. "
                "Please install it with 'pip install pywinsandbox'"
            )
            
        self.host = host
        self.port = port
        self.verbose = verbose
        self.memory_mb = memory_mb
        self.networking = networking
        
        # Windows Sandbox is always ephemeral
        if not ephemeral:
            logger.warning("Windows Sandbox storage is always ephemeral. Ignoring ephemeral=False.")
        self.ephemeral = True
        
        # Storage is always ephemeral for Windows Sandbox
        if storage and storage != "ephemeral":
            logger.warning("Windows Sandbox does not support persistent storage. Using ephemeral storage.")
        self.storage = "ephemeral"
        
        self.logger = logging.getLogger(__name__)
        
        # Track active sandboxes
        self._active_sandboxes: Dict[str, Any] = {}
        
    @property
    def provider_type(self) -> VMProviderType:
        """Get the provider type."""
        return VMProviderType.WINSANDBOX
        
    async def __aenter__(self):
        """Enter async context manager."""
        # Verify Windows Sandbox is available
        if not HAS_WINSANDBOX:
            raise ImportError("pywinsandbox is not available")
        
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit async context manager."""
        # Clean up any active sandboxes
        for name, sandbox in self._active_sandboxes.items():
            try:
                sandbox.shutdown()
                self.logger.info(f"Terminated sandbox: {name}")
            except Exception as e:
                self.logger.error(f"Error terminating sandbox {name}: {e}")
        
        self._active_sandboxes.clear()
        
    async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Get VM information by name.
        
        Args:
            name: Name of the VM to get information for
            storage: Ignored for Windows Sandbox (always ephemeral)
            
        Returns:
            Dictionary with VM information including status, IP address, etc.
        """
        if name not in self._active_sandboxes:
            return {
                "name": name,
                "status": "stopped",
                "ip_address": None,
                "storage": "ephemeral"
            }
        
        sandbox = self._active_sandboxes[name]
        
        # Check if sandbox is still running
        try:
            # Try to ping the sandbox to see if it's responsive
            try:
                sandbox.rpyc.modules.os.getcwd()
                sandbox_responsive = True
            except Exception:
                sandbox_responsive = False
            
            if not sandbox_responsive:
                return {
                    "name": name,
                    "status": "starting",
                    "ip_address": None,
                    "storage": "ephemeral",
                    "memory_mb": self.memory_mb,
                    "networking": self.networking
                }
            
            # Check for computer server address file
            server_address_file = r"C:\Users\WDAGUtilityAccount\Desktop\shared_windows_sandbox_dir\server_address"
            
            try:
                # Check if the server address file exists
                file_exists = sandbox.rpyc.modules.os.path.exists(server_address_file)
                
                if file_exists:
                    # Read the server address file
                    with sandbox.rpyc.builtin.open(server_address_file, 'r') as f:
                        server_address = f.read().strip()
                    
                    if server_address and ':' in server_address:
                        # Parse IP:port from the file
                        ip_address, port = server_address.split(':', 1)
                        
                        # Verify the server is actually responding
                        try:
                            import socket
                            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                            sock.settimeout(3)
                            result = sock.connect_ex((ip_address, int(port)))
                            sock.close()
                            
                            if result == 0:
                                # Server is responding
                                status = "running"
                                self.logger.debug(f"Computer server found at {ip_address}:{port}")
                            else:
                                # Server file exists but not responding
                                status = "starting"
                                ip_address = None
                        except Exception as e:
                            self.logger.debug(f"Error checking server connectivity: {e}")
                            status = "starting"
                            ip_address = None
                    else:
                        # File exists but doesn't contain valid address
                        status = "starting"
                        ip_address = None
                else:
                    # Server address file doesn't exist yet
                    status = "starting"
                    ip_address = None
                    
            except Exception as e:
                self.logger.debug(f"Error checking server address file: {e}")
                status = "starting"
                ip_address = None
                
        except Exception as e:
            self.logger.error(f"Error checking sandbox status: {e}")
            status = "error"
            ip_address = None
        
        return {
            "name": name,
            "status": status,
            "ip_address": ip_address,
            "storage": "ephemeral",
            "memory_mb": self.memory_mb,
            "networking": self.networking
        }
        
    async def list_vms(self) -> List[Dict[str, Any]]:
        """List all available VMs."""
        vms = []
        for name in self._active_sandboxes.keys():
            vm_info = await self.get_vm(name)
            vms.append(vm_info)
        return vms
        
    async def run_vm(self, image: str, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
        """Run a VM with the given options.
        
        Args:
            image: Image name (ignored for Windows Sandbox - always uses host Windows)
            name: Name of the VM to run
            run_opts: Dictionary of run options (memory, cpu, etc.)
            storage: Ignored for Windows Sandbox (always ephemeral)
        
        Returns:
            Dictionary with VM run status and information
        """
        if name in self._active_sandboxes:
            return {
                "success": False,
                "error": f"Sandbox {name} is already running"
            }
        
        try:
            # Extract options from run_opts
            memory_mb = run_opts.get("memory_mb", self.memory_mb)
            if isinstance(memory_mb, str):
                # Convert memory string like "4GB" to MB
                if memory_mb.upper().endswith("GB"):
                    memory_mb = int(float(memory_mb[:-2]) * 1024)
                elif memory_mb.upper().endswith("MB"):
                    memory_mb = int(memory_mb[:-2])
                else:
                    memory_mb = self.memory_mb
            
            networking = run_opts.get("networking", self.networking)
            
            # Create folder mappers; always map a persistent venv directory on host for caching packages
            folder_mappers = []
            # Ensure host side persistent venv directory exists (Path.home()/wsb_venv)
            host_wsb_env = Path.home() / ".cua" / "wsb_cache"
            try:
                host_wsb_env.mkdir(parents=True, exist_ok=True)
            except Exception:
                # If cannot create, continue without persistent mapping
                host_wsb_env = None
            shared_directories = run_opts.get("shared_directories", [])
            for shared_dir in shared_directories:
                if isinstance(shared_dir, dict):
                    host_path = shared_dir.get("hostPath", "")
                elif isinstance(shared_dir, str):
                    host_path = shared_dir
                else:
                    continue
                    
                if host_path and os.path.exists(host_path):
                    folder_mappers.append(winsandbox.FolderMapper(host_path))

            # Add mapping for the persistent venv directory (read/write) so it appears in Sandbox Desktop
            if host_wsb_env is not None and host_wsb_env.exists():
                try:
                    folder_mappers.append(
                        winsandbox.FolderMapper(str(host_wsb_env), read_only=False)
                    )
                except Exception as e:
                    self.logger.warning(f"Failed to map host winsandbox_venv: {e}")
            
            self.logger.info(f"Creating Windows Sandbox: {name}")
            self.logger.info(f"Memory: {memory_mb}MB, Networking: {networking}")
            if folder_mappers:
                self.logger.info(f"Shared directories: {len(folder_mappers)}")
            
            # Create the sandbox without logon script
            try:
                # Try with memory_mb parameter (newer pywinsandbox version)
                sandbox = winsandbox.new_sandbox(
                    memory_mb=str(memory_mb),
                    networking=networking,
                    folder_mappers=folder_mappers
                )
            except TypeError as e:
                if "memory_mb" in str(e):
                    # Fallback for older pywinsandbox version that doesn't support memory_mb
                    self.logger.warning(
                        f"Your pywinsandbox version doesn't support memory_mb parameter. "
                        f"Using default memory settings. To use custom memory settings, "
                        f"please update pywinsandbox: pip install -U git+https://github.com/karkason/pywinsandbox.git"
                    )
                    sandbox = winsandbox.new_sandbox(
                        networking=networking,
                        folder_mappers=folder_mappers
                    )
                else:
                    # Re-raise if it's a different TypeError
                    raise
            
            # Store the sandbox
            self._active_sandboxes[name] = sandbox
            
            self.logger.info(f"Windows Sandbox {name} created successfully")
            
            venv_exists = (host_wsb_env / "venv" / "Lib" / "site-packages" / "computer_server").exists() if host_wsb_env else False

            # Setup the computer server in the sandbox
            await self._setup_computer_server(sandbox, name, wait_for_venv=(not venv_exists))
            
            return {
                "success": True,
                "name": name,
                "status": "starting",
                "memory_mb": memory_mb,
                "networking": networking,
                "storage": "ephemeral"
            }
            
        except Exception as e:
            self.logger.error(f"Failed to create Windows Sandbox {name}: {e}")
            # stack trace
            import traceback
            self.logger.error(f"Stack trace: {traceback.format_exc()}")
            return {
                "success": False,
                "error": f"Failed to create sandbox: {str(e)}"
            }
        
    async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Stop a running VM.
        
        Args:
            name: Name of the VM to stop
            storage: Ignored for Windows Sandbox
            
        Returns:
            Dictionary with stop status and information
        """
        if name not in self._active_sandboxes:
            return {
                "success": False,
                "error": f"Sandbox {name} is not running"
            }
        
        try:
            sandbox = self._active_sandboxes[name]
            
            # Terminate the sandbox
            sandbox.shutdown()
            
            # Remove from active sandboxes
            del self._active_sandboxes[name]
            
            self.logger.info(f"Windows Sandbox {name} stopped successfully")
            
            return {
                "success": True,
                "name": name,
                "status": "stopped"
            }
            
        except Exception as e:
            self.logger.error(f"Failed to stop Windows Sandbox {name}: {e}")
            return {
                "success": False,
                "error": f"Failed to stop sandbox: {str(e)}"
            }
        
    async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
        """Update VM configuration.
        
        Note: Windows Sandbox does not support runtime configuration updates.
        The sandbox must be stopped and restarted with new configuration.
        
        Args:
            name: Name of the VM to update
            update_opts: Dictionary of update options
            storage: Ignored for Windows Sandbox
            
        Returns:
            Dictionary with update status and information
        """
        return {
            "success": False,
            "error": "Windows Sandbox does not support runtime configuration updates. "
                    "Please stop and restart the sandbox with new configuration."
        }

    async def restart_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        raise NotImplementedError("WinSandboxProvider does not support restarting VMs.")
        
    async def get_ip(self, name: str, storage: Optional[str] = None, retry_delay: int = 2) -> str:
        """Get the IP address of a VM, waiting indefinitely until it's available.
        
        Args:
            name: Name of the VM to get the IP for
            storage: Ignored for Windows Sandbox
            retry_delay: Delay between retries in seconds (default: 2)
            
        Returns:
            IP address of the VM when it becomes available
        """
        total_attempts = 0
        
        # Loop indefinitely until we get a valid IP
        while True:
            total_attempts += 1
            
            # Log retry message but not on first attempt
            if total_attempts > 1:
                self.logger.info(f"Waiting for Windows Sandbox {name} IP address (attempt {total_attempts})...")
            
            try:
                # Get VM information
                vm_info = await self.get_vm(name, storage=storage)
                
                # Check if we got a valid IP
                ip = vm_info.get("ip_address", None)
                if ip and ip != "unknown" and not ip.startswith("0.0.0.0"):
                    self.logger.info(f"Got valid Windows Sandbox IP address: {ip}")
                    return ip
                    
                # Check the VM status
                status = vm_info.get("status", "unknown")
                
                # If VM is not running yet, log and wait
                if status != "running":
                    self.logger.info(f"Windows Sandbox is not running yet (status: {status}). Waiting...")
                # If VM is running but no IP yet, wait and retry
                else:
                    self.logger.info("Windows Sandbox is running but no valid IP address yet. Waiting...")
                
            except Exception as e:
                self.logger.warning(f"Error getting Windows Sandbox {name} IP: {e}, continuing to wait...")
                
            # Wait before next retry
            await asyncio.sleep(retry_delay)
            
            # Add progress log every 10 attempts
            if total_attempts % 10 == 0:
                self.logger.info(f"Still waiting for Windows Sandbox {name} IP after {total_attempts} attempts...")
    
    async def _setup_computer_server(self, sandbox, name: str, visible: bool = False, wait_for_venv: bool = True):
        """Setup the computer server in the Windows Sandbox using RPyC.
        
        Args:
            sandbox: The Windows Sandbox instance
            name: Name of the sandbox
            visible: Whether the opened process should be visible (default: False)
        """
        try:
            self.logger.info(f"Setting up computer server in sandbox {name}...")

            # Read the PowerShell setup script
            script_path = os.path.join(os.path.dirname(__file__), "setup_script.ps1")
            with open(script_path, 'r', encoding='utf-8') as f:
                setup_script_content = f.read()
            
            # Write the setup script to the sandbox using RPyC
            script_dest_path = r"C:\Users\WDAGUtilityAccount\setup_cua.ps1"
            
            self.logger.info(f"Writing setup script to {script_dest_path}")
            with sandbox.rpyc.builtin.open(script_dest_path, 'w') as f:
                f.write(setup_script_content)
            
            # Execute the PowerShell script in the background
            self.logger.info("Executing setup script in sandbox...")
            
            # Use subprocess to run PowerShell script
            import subprocess
            powershell_cmd = [
                "powershell.exe", 
                "-ExecutionPolicy", "Bypass",
                "-NoExit",  # Keep window open after script completes
                "-File", script_dest_path
            ]
            
            # Set creation flags based on visibility preference
            if visible:
                # CREATE_NEW_CONSOLE - creates a new console window (visible)
                creation_flags = 0x00000010
            else:
                creation_flags = 0x08000000 # CREATE_NO_WINDOW
            
            # Start the process using RPyC
            process = sandbox.rpyc.modules.subprocess.Popen(
                powershell_cmd,
                creationflags=creation_flags,
                shell=False
            )

            if wait_for_venv:
                print("Waiting for venv to be created for the first time setup of Windows Sandbox...")
                print("This may take a minute...")
                await asyncio.sleep(120)
            
            ip = await self.get_ip(name)
            self.logger.info(f"Sandbox IP: {ip}")
            self.logger.info(f"Setup script started in background in sandbox {name} with PID: {process.pid}")
            
        except Exception as e:
            self.logger.error(f"Failed to setup computer server in sandbox {name}: {e}")
            import traceback
            self.logger.error(f"Stack trace: {traceback.format_exc()}")

```

--------------------------------------------------------------------------------
/libs/python/computer/computer/providers/lume/provider.py:
--------------------------------------------------------------------------------

```python
"""Lume VM provider implementation using curl commands.

This provider uses direct curl commands to interact with the Lume API,
removing the dependency on the pylume Python package.
"""

import os
import re
import asyncio
import json
import logging
import subprocess
import urllib.parse
from typing import Dict, Any, Optional, List, Tuple

from ..base import BaseVMProvider, VMProviderType
from ...logger import Logger, LogLevel
from ..lume_api import (
    lume_api_get,
    lume_api_run,
    lume_api_stop,
    lume_api_update,
    lume_api_pull,
    HAS_CURL,
    parse_memory
)

# Setup logging
logger = logging.getLogger(__name__)

class LumeProvider(BaseVMProvider):
    """Lume VM provider implementation using direct curl commands.
    
    This provider uses curl to interact with the Lume API server,
    removing the dependency on the pylume Python package.
    """
    
    def __init__(
        self, 
        port: int = 7777,
        host: str = "localhost",
        storage: Optional[str] = None,
        verbose: bool = False,
        ephemeral: bool = False,
    ):
        """Initialize the Lume provider.
        
        Args:
            port: Port for the Lume API server (default: 7777)
            host: Host to use for API connections (default: localhost)
            storage: Path to store VM data
            verbose: Enable verbose logging
        """
        if not HAS_CURL:
            raise ImportError(
                "curl is required for LumeProvider. "
                "Please ensure it is installed and in your PATH."
            )
            
        self.host = host
        self.port = port  # Default port for Lume API
        self.storage = storage
        self.verbose = verbose
        self.ephemeral = ephemeral  # If True, VMs will be deleted after stopping
        
        # Base API URL for Lume API calls
        self.api_base_url = f"http://{self.host}:{self.port}"
        
        self.logger = logging.getLogger(__name__)
        
    @property
    def provider_type(self) -> VMProviderType:
        """Get the provider type."""
        return VMProviderType.LUME
        
    async def __aenter__(self):
        """Enter async context manager."""
        # No initialization needed, just return self
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit async context manager."""
        # No cleanup needed
        pass
            
    def _lume_api_get(self, vm_name: str = "", storage: Optional[str] = None, debug: bool = False) -> Dict[str, Any]:
        """Get VM information using shared lume_api function.
        
        Args:
            vm_name: Optional name of the VM to get info for.
                     If empty, lists all VMs.
            storage: Optional storage path override. If provided, this will be used instead of self.storage
            debug: Whether to show debug output
            
        Returns:
            Dictionary with VM status information parsed from JSON response
        """
        # Use the shared implementation from lume_api module
        return lume_api_get(
            vm_name=vm_name,
            host=self.host,
            port=self.port,
            storage=storage if storage is not None else self.storage,
            debug=debug,
            verbose=self.verbose
        )
    
    def _lume_api_run(self, vm_name: str, run_opts: Dict[str, Any], debug: bool = False) -> Dict[str, Any]:
        """Run a VM using shared lume_api function.
        
        Args:
            vm_name: Name of the VM to run
            run_opts: Dictionary of run options
            debug: Whether to show debug output
            
        Returns:
            Dictionary with API response or error information
        """
        # Use the shared implementation from lume_api module
        return lume_api_run(
            vm_name=vm_name, 
            host=self.host,
            port=self.port,
            run_opts=run_opts,
            storage=self.storage,
            debug=debug,
            verbose=self.verbose
        )
    
    def _lume_api_stop(self, vm_name: str, debug: bool = False) -> Dict[str, Any]:
        """Stop a VM using shared lume_api function.
        
        Args:
            vm_name: Name of the VM to stop
            debug: Whether to show debug output
            
        Returns:
            Dictionary with API response or error information
        """
        # Use the shared implementation from lume_api module
        return lume_api_stop(
            vm_name=vm_name, 
            host=self.host,
            port=self.port,
            storage=self.storage,
            debug=debug,
            verbose=self.verbose
        )
    
    def _lume_api_update(self, vm_name: str, update_opts: Dict[str, Any], debug: bool = False) -> Dict[str, Any]:
        """Update VM configuration using shared lume_api function.
        
        Args:
            vm_name: Name of the VM to update
            update_opts: Dictionary of update options
            debug: Whether to show debug output
            
        Returns:
            Dictionary with API response or error information
        """
        # Use the shared implementation from lume_api module
        return lume_api_update(
            vm_name=vm_name, 
            host=self.host,
            port=self.port,
            update_opts=update_opts,
            storage=self.storage,
            debug=debug,
            verbose=self.verbose
        )
    
    async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Get VM information by name.
        
        Args:
            name: Name of the VM to get information for
            storage: Optional storage path override. If provided, this will be used
                    instead of the provider's default storage path.
            
        Returns:
            Dictionary with VM information including status, IP address, etc.
            
        Note:
            If storage is not provided, the provider's default storage path will be used.
            The storage parameter allows overriding the storage location for this specific call.
        """
        if not HAS_CURL:
            logger.error("curl is not available. Cannot get VM status.")
            return {
                "name": name,
                "status": "unavailable",
                "error": "curl is not available"
            }
        
        # First try to get detailed VM info from the API
        try:
            # Query the Lume API for VM status using the provider's storage_path
            vm_info = self._lume_api_get(
                vm_name=name, 
                storage=storage if storage is not None else self.storage,
                debug=self.verbose
            )
            
            # Check for API errors
            if "error" in vm_info:
                logger.debug(f"API request error: {vm_info['error']}")
                # If we got an error from the API, report the VM as not ready yet
                return {
                    "name": name,
                    "status": "starting",  # VM is still starting - do not attempt to connect yet
                    "api_status": "error",
                    "error": vm_info["error"]
                }
            
            # Process the VM status information
            vm_status = vm_info.get("status", "unknown")
            
            # Check if VM is stopped or not running - don't wait for IP in this case
            if vm_status == "stopped":
                logger.info(f"VM {name} is in '{vm_status}' state - not waiting for IP address")
                # Return the status as-is without waiting for an IP
                result = {
                    "name": name,
                    "status": vm_status,
                    **vm_info  # Include all original fields from the API response
                }
                return result
            
            # Handle field name differences between APIs
            # Some APIs use camelCase, others use snake_case
            if "vncUrl" in vm_info:
                vnc_url = vm_info["vncUrl"]
            elif "vnc_url" in vm_info:
                vnc_url = vm_info["vnc_url"]
            else:
                vnc_url = ""
                
            if "ipAddress" in vm_info:
                ip_address = vm_info["ipAddress"]
            elif "ip_address" in vm_info:
                ip_address = vm_info["ip_address"]
            else:
                # If no IP address is provided and VM is supposed to be running,
                # report it as still starting
                ip_address = None
                logger.info(f"VM {name} is in '{vm_status}' state but no IP address found - reporting as still starting")
                
            logger.info(f"VM {name} status: {vm_status}")
            
            # Return the complete status information
            result = {
                "name": name,
                "status": vm_status if vm_status else "running",
                "ip_address": ip_address,
                "vnc_url": vnc_url,
                "api_status": "ok"
            }
            
            # Include all original fields from the API response
            if isinstance(vm_info, dict):
                for key, value in vm_info.items():
                    if key not in result:  # Don't override our carefully processed fields
                        result[key] = value
                        
            return result
            
        except Exception as e:
            logger.error(f"Failed to get VM status: {e}")
            # Return a fallback status that indicates the VM is not ready yet
            return {
                "name": name,
                "status": "initializing",  # VM is still initializing
                "error": f"Failed to get VM status: {str(e)}"
            }
        
    async def list_vms(self) -> List[Dict[str, Any]]:
        """List all available VMs."""
        result = self._lume_api_get(debug=self.verbose)
        
        # Extract the VMs list from the response
        if "vms" in result and isinstance(result["vms"], list):
            return result["vms"]
        elif "error" in result:
            logger.error(f"Error listing VMs: {result['error']}")
            return []
        else:
            return []
        
    async def run_vm(self, image: str, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
        """Run a VM with the given options.
        
        If the VM does not exist in the storage location, this will attempt to pull it
        from the Lume registry first.
        
        Args:
            image: Image name to use when pulling the VM if it doesn't exist
            name: Name of the VM to run
            run_opts: Dictionary of run options (memory, cpu, etc.)
            storage: Optional storage path override. If provided, this will be used
                    instead of the provider's default storage path.
        
        Returns:
            Dictionary with VM run status and information
        """
        # First check if VM exists by trying to get its info
        vm_info = await self.get_vm(name, storage=storage)
        
        if "error" in vm_info:
            # VM doesn't exist, try to pull it
            self.logger.info(f"VM {name} not found, attempting to pull image {image} from registry...")
            
            # Call pull_vm with the image parameter
            pull_result = await self.pull_vm(
                name=name, 
                image=image, 
                storage=storage
            )
            
            # Check if pull was successful
            if "error" in pull_result:
                self.logger.error(f"Failed to pull VM image: {pull_result['error']}")
                return pull_result  # Return the error from pull
                
            self.logger.info(f"Successfully pulled VM image {image} as {name}")
        
        # Now run the VM with the given options
        self.logger.info(f"Running VM {name} with options: {run_opts}")
        
        from ..lume_api import lume_api_run
        return lume_api_run(
            vm_name=name,
            host=self.host,
            port=self.port,
            run_opts=run_opts,
            storage=storage if storage is not None else self.storage,
            debug=self.verbose,
            verbose=self.verbose
        )
        
    async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Stop a running VM.
        
        If this provider was initialized with ephemeral=True, the VM will also
        be deleted after it is stopped.
        
        Args:
            name: Name of the VM to stop
            storage: Optional storage path override
            
        Returns:
            Dictionary with stop status and information
        """
        # Stop the VM first
        stop_result = self._lume_api_stop(name, debug=self.verbose)
        
        # Log ephemeral status for debugging
        self.logger.info(f"Ephemeral mode status: {self.ephemeral}")
        
        # If ephemeral mode is enabled, delete the VM after stopping
        if self.ephemeral and (stop_result.get("success", False) or "error" not in stop_result):
            self.logger.info(f"Ephemeral mode enabled - deleting VM {name} after stopping")
            try:
                delete_result = await self.delete_vm(name, storage=storage)
                
                # Return combined result
                return {
                    **stop_result,  # Include all stop result info
                    "deleted": True,
                    "delete_result": delete_result
                }
            except Exception as e:
                self.logger.error(f"Failed to delete ephemeral VM {name}: {e}")
                # Include the error but still return stop result
                return {
                    **stop_result,
                    "deleted": False,
                    "delete_error": str(e)
                }
        
        # Just return the stop result if not ephemeral
        return stop_result
        
    async def pull_vm(
        self,
        name: str,
        image: str,
        storage: Optional[str] = None,
        registry: str = "ghcr.io",
        organization: str = "trycua",
        pull_opts: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Pull a VM image from the registry.
        
        Args:
            name: Name for the VM after pulling
            image: The image name to pull (e.g. 'macos-sequoia-cua:latest')
            storage: Optional storage path to use
            registry: Registry to pull from (default: ghcr.io)
            organization: Organization in registry (default: trycua)
            pull_opts: Additional options for pulling the VM (optional)
            
        Returns:
            Dictionary with information about the pulled VM
            
        Raises:
            RuntimeError: If pull operation fails or image is not provided
        """
        # Validate image parameter
        if not image:
            raise ValueError("Image parameter is required for pull_vm")
            
        self.logger.info(f"Pulling VM image '{image}' as '{name}'")
        self.logger.info("You can check the pull progress using: lume logs -f")
        
        # Set default pull_opts if not provided
        if pull_opts is None:
            pull_opts = {}
            
        # Log information about the operation
        self.logger.debug(f"Pull storage location: {storage or 'default'}")
        
        try:
            # Call the lume_api_pull function from lume_api.py
            from ..lume_api import lume_api_pull
            
            result = lume_api_pull(
                image=image,
                name=name,
                host=self.host,
                port=self.port,
                storage=storage if storage is not None else self.storage,
                registry=registry,
                organization=organization,
                debug=self.verbose,
                verbose=self.verbose
            )
            
            # Check for errors in the result
            if "error" in result:
                self.logger.error(f"Failed to pull VM image: {result['error']}")
                return result
                
            self.logger.info(f"Successfully pulled VM image '{image}' as '{name}'")
            return result
        except Exception as e:
            self.logger.error(f"Failed to pull VM image '{image}': {e}")
            return {"error": f"Failed to pull VM: {str(e)}"}
        
    async def delete_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Delete a VM permanently.
        
        Args:
            name: Name of the VM to delete
            storage: Optional storage path override
            
        Returns:
            Dictionary with delete status and information
        """
        self.logger.info(f"Deleting VM {name}...")
        
        try:
            # Call the lume_api_delete function we created
            from ..lume_api import lume_api_delete
            
            result = lume_api_delete(
                vm_name=name,
                host=self.host,
                port=self.port,
                storage=storage if storage is not None else self.storage,
                debug=self.verbose,
                verbose=self.verbose
            )
            
            # Check for errors in the result
            if "error" in result:
                self.logger.error(f"Failed to delete VM: {result['error']}")
                return result
                
            self.logger.info(f"Successfully deleted VM '{name}'")
            return result
        except Exception as e:
            self.logger.error(f"Failed to delete VM '{name}': {e}")
            return {"error": f"Failed to delete VM: {str(e)}"}
    
    async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
        """Update VM configuration."""
        return self._lume_api_update(name, update_opts, debug=self.verbose)
        
    async def restart_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        raise NotImplementedError("LumeProvider does not support restarting VMs.")
        
    async def get_ip(self, name: str, storage: Optional[str] = None, retry_delay: int = 2) -> str:
        """Get the IP address of a VM, waiting indefinitely until it's available.
        
        Args:
            name: Name of the VM to get the IP for
            storage: Optional storage path override
            retry_delay: Delay between retries in seconds (default: 2)
            
        Returns:
            IP address of the VM when it becomes available
        """
        # Track total attempts for logging purposes
        total_attempts = 0
        
        # Loop indefinitely until we get a valid IP
        while True:
            total_attempts += 1
            
            # Log retry message but not on first attempt
            if total_attempts > 1:
                self.logger.info(f"Waiting for VM {name} IP address (attempt {total_attempts})...")
            
            try:
                # Get VM information
                vm_info = await self.get_vm(name, storage=storage)
                
                # Check if we got a valid IP
                ip = vm_info.get("ip_address", None)
                if ip and ip != "unknown" and not ip.startswith("0.0.0.0"):
                    self.logger.info(f"Got valid VM IP address: {ip}")
                    return ip
                    
                # Check the VM status
                status = vm_info.get("status", "unknown")
                
                # If VM is not running yet, log and wait
                if status != "running":
                    self.logger.info(f"VM is not running yet (status: {status}). Waiting...")
                # If VM is running but no IP yet, wait and retry
                else:
                    self.logger.info("VM is running but no valid IP address yet. Waiting...")
                
            except Exception as e:
                self.logger.warning(f"Error getting VM {name} IP: {e}, continuing to wait...")
                
            # Wait before next retry
            await asyncio.sleep(retry_delay)
            
            # Add progress log every 10 attempts
            if total_attempts % 10 == 0:
                self.logger.info(f"Still waiting for VM {name} IP after {total_attempts} attempts...")
        


```

--------------------------------------------------------------------------------
/libs/python/computer/computer/providers/docker/provider.py:
--------------------------------------------------------------------------------

```python
"""
Docker VM provider implementation.

This provider uses Docker containers running the CUA Ubuntu image to create
Linux VMs with computer-server. It handles VM lifecycle operations through Docker
commands and container management.
"""

import logging
import json
import asyncio
from typing import Dict, List, Optional, Any
import subprocess
import time
import re

from ..base import BaseVMProvider, VMProviderType

# Setup logging
logger = logging.getLogger(__name__)

# Check if Docker is available
try:
    subprocess.run(["docker", "--version"], capture_output=True, check=True)
    HAS_DOCKER = True
except (subprocess.SubprocessError, FileNotFoundError):
    HAS_DOCKER = False


class DockerProvider(BaseVMProvider):
    """
    Docker VM Provider implementation using Docker containers.
    
    This provider uses Docker to run containers with the CUA Ubuntu image
    that includes computer-server for remote computer use.
    """
    
    def __init__(
        self,
        port: Optional[int] = 8000,
        host: str = "localhost",
        storage: Optional[str] = None,
        shared_path: Optional[str] = None,
        image: str = "trycua/cua-ubuntu:latest",
        verbose: bool = False,
        ephemeral: bool = False,
        vnc_port: Optional[int] = 6901,
    ):
        """Initialize the Docker VM Provider.

        Args:
            port: Currently unused (VM provider port)
            host: Hostname for the API server (default: localhost)
            storage: Path for persistent VM storage
            shared_path: Path for shared folder between host and container
            image: Docker image to use (default: "trycua/cua-ubuntu:latest")
                   Supported images:
                   - "trycua/cua-ubuntu:latest" (Kasm-based)
                   - "trycua/cua-docker-xfce:latest" (vanilla XFCE)
            verbose: Enable verbose logging
            ephemeral: Use ephemeral (temporary) storage
            vnc_port: Port for VNC interface (default: 6901)
        """
        self.host = host
        self.api_port = 8000
        self.vnc_port = vnc_port
        self.ephemeral = ephemeral

        # Handle ephemeral storage (temporary directory)
        if ephemeral:
            self.storage = "ephemeral"
        else:
            self.storage = storage

        self.shared_path = shared_path
        self.image = image
        self.verbose = verbose
        self._container_id = None
        self._running_containers = {}  # Track running containers by name

        # Detect image type and configure user directory accordingly
        self._detect_image_config()
        
    def _detect_image_config(self):
        """Detect image type and configure paths accordingly."""
        # Detect if this is a docker-xfce image or Kasm image
        if "docker-xfce" in self.image.lower() or "xfce" in self.image.lower():
            self._home_dir = "/home/cua"
            self._image_type = "docker-xfce"
            logger.info(f"Detected docker-xfce image: using {self._home_dir}")
        else:
            # Default to Kasm configuration
            self._home_dir = "/home/kasm-user"
            self._image_type = "kasm"
            logger.info(f"Detected Kasm image: using {self._home_dir}")

    @property
    def provider_type(self) -> VMProviderType:
        """Return the provider type."""
        return VMProviderType.DOCKER
    
    def _parse_memory(self, memory_str: str) -> str:
        """Parse memory string to Docker format.
        
        Examples:
            "8GB" -> "8g"
            "1024MB" -> "1024m"
            "512" -> "512m"
        """
        if isinstance(memory_str, int):
            return f"{memory_str}m"
            
        if isinstance(memory_str, str):
            # Extract number and unit
            match = re.match(r"(\d+)([A-Za-z]*)", memory_str)
            if match:
                value, unit = match.groups()
                unit = unit.upper()
                
                if unit == "GB" or unit == "G":
                    return f"{value}g"
                elif unit == "MB" or unit == "M" or unit == "":
                    return f"{value}m"
                    
        # Default fallback
        logger.warning(f"Could not parse memory string '{memory_str}', using 4g default")
        return "4g"  # Default to 4GB
    
    async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Get VM information by name.
        
        Args:
            name: Name of the VM to get information for
            storage: Optional storage path override. If provided, this will be used
                    instead of the provider's default storage path.
        
        Returns:
            Dictionary with VM information including status, IP address, etc.
        """
        try:
            # Check if container exists and get its status
            cmd = ["docker", "inspect", name]
            result = subprocess.run(cmd, capture_output=True, text=True)
            
            if result.returncode != 0:
                # Container doesn't exist
                return {
                    "name": name,
                    "status": "not_found",
                    "ip_address": None,
                    "ports": {},
                    "image": self.image,
                    "provider": "docker"
                }
            
            # Parse container info
            container_info = json.loads(result.stdout)[0]
            state = container_info["State"]
            network_settings = container_info["NetworkSettings"]
            
            # Determine status
            if state["Running"]:
                status = "running"
            elif state["Paused"]:
                status = "paused"
            else:
                status = "stopped"
            
            # Get IP address
            ip_address = network_settings.get("IPAddress", "")
            if not ip_address and "Networks" in network_settings:
                # Try to get IP from bridge network
                for network_name, network_info in network_settings["Networks"].items():
                    if network_info.get("IPAddress"):
                        ip_address = network_info["IPAddress"]
                        break
            
            # Get port mappings
            ports = {}
            if "Ports" in network_settings and network_settings["Ports"]:
                # network_settings["Ports"] is a dict like:
                # {'6901/tcp': [{'HostIp': '0.0.0.0', 'HostPort': '6901'}, ...], ...}
                for container_port, port_mappings in network_settings["Ports"].items():
                    if port_mappings:  # Check if there are any port mappings
                        # Take the first mapping (usually the IPv4 one)
                        for mapping in port_mappings:
                            if mapping.get("HostPort"):
                                ports[container_port] = mapping["HostPort"]
                                break  # Use the first valid mapping
            
            return {
                "name": name,
                "status": status,
                "ip_address": ip_address or "127.0.0.1",  # Use localhost if no IP
                "ports": ports,
                "image": container_info["Config"]["Image"],
                "provider": "docker",
                "container_id": container_info["Id"][:12],  # Short ID
                "created": container_info["Created"],
                "started": state.get("StartedAt", ""),
            }
            
        except Exception as e:
            logger.error(f"Error getting VM info for {name}: {e}")
            import traceback
            traceback.print_exc()
            return {
                "name": name,
                "status": "error",
                "error": str(e),
                "provider": "docker"
            }
    
    async def list_vms(self) -> List[Dict[str, Any]]:
        """List all Docker containers managed by this provider."""
        try:
            # List all containers (running and stopped) with the CUA image
            cmd = ["docker", "ps", "-a", "--filter", f"ancestor={self.image}", "--format", "json"]
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            
            containers = []
            if result.stdout.strip():
                for line in result.stdout.strip().split('\n'):
                    if line.strip():
                        container_data = json.loads(line)
                        vm_info = await self.get_vm(container_data["Names"])
                        containers.append(vm_info)
            
            return containers
            
        except subprocess.CalledProcessError as e:
            logger.error(f"Error listing containers: {e.stderr}")
            return []
        except Exception as e:
            logger.error(f"Error listing VMs: {e}")
            import traceback
            traceback.print_exc()
            return []
    
    async def run_vm(self, image: str, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
        """Run a VM with the given options.
        
        Args:
            image: Name/tag of the Docker image to use
            name: Name of the container to run
            run_opts: Options for running the VM, including:
                - memory: Memory limit (e.g., "4GB", "2048MB")
                - cpu: CPU limit (e.g., 2 for 2 cores)
                - vnc_port: Specific port for VNC interface
                - api_port: Specific port for computer-server API
        
        Returns:
            Dictionary with VM status information
        """
        try:
            # Check if container already exists
            existing_vm = await self.get_vm(name, storage)
            if existing_vm["status"] == "running":
                logger.info(f"Container {name} is already running")
                return existing_vm
            elif existing_vm["status"] in ["stopped", "paused"]:
                # Start existing container
                logger.info(f"Starting existing container {name}")
                start_cmd = ["docker", "start", name]
                result = subprocess.run(start_cmd, capture_output=True, text=True, check=True)
                
                # Wait for container to be ready
                await self._wait_for_container_ready(name)
                return await self.get_vm(name, storage)
            
            # Use provided image or default
            docker_image = image if image != "default" else self.image
            
            # Build docker run command
            cmd = ["docker", "run", "-d", "--name", name]
            
            # Add memory limit if specified
            if "memory" in run_opts:
                memory_limit = self._parse_memory(run_opts["memory"])
                cmd.extend(["--memory", memory_limit])
            
            # Add CPU limit if specified
            if "cpu" in run_opts:
                cpu_count = str(run_opts["cpu"])
                cmd.extend(["--cpus", cpu_count])
            
            # Add port mappings
            vnc_port = run_opts.get("vnc_port", self.vnc_port)
            api_port = run_opts.get("api_port", self.api_port)
            
            if vnc_port:
                cmd.extend(["-p", f"{vnc_port}:6901"])  # VNC port
            if api_port:
                cmd.extend(["-p", f"{api_port}:8000"])  # computer-server API port
            
            # Add volume mounts if storage is specified
            storage_path = storage or self.storage
            if storage_path and storage_path != "ephemeral":
                # Mount storage directory using detected home directory
                cmd.extend(["-v", f"{storage_path}:{self._home_dir}/storage"])

            # Add shared path if specified
            if self.shared_path:
                # Mount shared directory using detected home directory
                cmd.extend(["-v", f"{self.shared_path}:{self._home_dir}/shared"])
            
            # Add environment variables
            cmd.extend(["-e", "VNC_PW=password"])  # Set VNC password
            cmd.extend(["-e", "VNCOPTIONS=-disableBasicAuth"])  # Disable VNC basic auth
            
            # Add the image
            cmd.append(docker_image)
            
            logger.info(f"Running Docker container with command: {' '.join(cmd)}")
            
            # Run the container
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            container_id = result.stdout.strip()
            
            logger.info(f"Container {name} started with ID: {container_id[:12]}")
            
            # Store container info
            self._container_id = container_id
            self._running_containers[name] = container_id
            
            # Wait for container to be ready
            await self._wait_for_container_ready(name)
            
            # Return VM info
            vm_info = await self.get_vm(name, storage)
            vm_info["container_id"] = container_id[:12]
            
            return vm_info
            
        except subprocess.CalledProcessError as e:
            error_msg = f"Failed to run container {name}: {e.stderr}"
            logger.error(error_msg)
            return {
                "name": name,
                "status": "error",
                "error": error_msg,
                "provider": "docker"
            }
        except Exception as e:
            error_msg = f"Error running VM {name}: {e}"
            logger.error(error_msg)
            return {
                "name": name,
                "status": "error",
                "error": error_msg,
                "provider": "docker"
            }
    
    async def _wait_for_container_ready(self, container_name: str, timeout: int = 60) -> bool:
        """Wait for the Docker container to be fully ready.
        
        Args:
            container_name: Name of the Docker container to check
            timeout: Maximum time to wait in seconds (default: 60 seconds)
            
        Returns:
            True if the container is running and ready
        """
        logger.info(f"Waiting for container {container_name} to be ready...")
        
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                # Check if container is running
                vm_info = await self.get_vm(container_name)
                if vm_info["status"] == "running":
                    logger.info(f"Container {container_name} is running")
                    
                    # Additional check: try to connect to computer-server API
                    # This is optional - we'll just wait a bit more for services to start
                    await asyncio.sleep(5)
                    return True
                    
            except Exception as e:
                logger.debug(f"Container {container_name} not ready yet: {e}")
            
            await asyncio.sleep(2)
        
        logger.warning(f"Container {container_name} did not become ready within {timeout} seconds")
        return False
    
    async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Stop a running VM by stopping the Docker container."""
        try:
            logger.info(f"Stopping container {name}")
            
            # Stop the container
            cmd = ["docker", "stop", name]
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            
            # Remove from running containers tracking
            if name in self._running_containers:
                del self._running_containers[name]
            
            logger.info(f"Container {name} stopped successfully")
            
            return {
                "name": name,
                "status": "stopped",
                "message": "Container stopped successfully",
                "provider": "docker"
            }
            
        except subprocess.CalledProcessError as e:
            error_msg = f"Failed to stop container {name}: {e.stderr}"
            logger.error(error_msg)
            return {
                "name": name,
                "status": "error",
                "error": error_msg,
                "provider": "docker"
            }
        except Exception as e:
            error_msg = f"Error stopping VM {name}: {e}"
            logger.error(error_msg)
            return {
                "name": name,
                "status": "error",
                "error": error_msg,
                "provider": "docker"
            }
    
    async def restart_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        raise NotImplementedError("DockerProvider does not support restarting VMs.")

    async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
        """Update VM configuration.
        
        Note: Docker containers cannot be updated while running. 
        This method will return an error suggesting to recreate the container.
        """
        return {
            "name": name,
            "status": "error",
            "error": "Docker containers cannot be updated while running. Please stop and recreate the container with new options.",
            "provider": "docker"
        }
    
    async def get_ip(self, name: str, storage: Optional[str] = None, retry_delay: int = 2) -> str:
        """Get the IP address of a VM, waiting indefinitely until it's available.
        
        Args:
            name: Name of the VM to get the IP for
            storage: Optional storage path override
            retry_delay: Delay between retries in seconds (default: 2)
            
        Returns:
            IP address of the VM when it becomes available
        """
        logger.info(f"Getting IP address for container {name}")
        
        total_attempts = 0
        while True:
            total_attempts += 1
            
            try:
                vm_info = await self.get_vm(name, storage)
                
                if vm_info["status"] == "error":
                    raise Exception(f"VM is in error state: {vm_info.get('error', 'Unknown error')}")
                
                # TODO: for now, return localhost
                # it seems the docker container is not accessible from the host
                # on WSL2, unless you port forward? not sure
                if True:
                    logger.warning("Overriding container IP with localhost")
                    return "localhost"

                # Check if we got a valid IP
                ip = vm_info.get("ip_address", None)
                if ip and ip != "unknown" and not ip.startswith("0.0.0.0"):
                    logger.info(f"Got valid container IP address: {ip}")
                    return ip
                    
                # For Docker containers, we can also use localhost if ports are mapped
                if vm_info["status"] == "running" and vm_info.get("ports"):
                    logger.info(f"Container is running with port mappings, using localhost")
                    return "127.0.0.1"
                
                # Check the container status
                status = vm_info.get("status", "unknown")
                
                if status == "stopped":
                    logger.info(f"Container status is {status}, but still waiting for it to start")
                elif status != "running":
                    logger.info(f"Container is not running yet (status: {status}). Waiting...")
                else:
                    logger.info("Container is running but no valid IP address yet. Waiting...")
                
            except Exception as e:
                logger.warning(f"Error getting container {name} IP: {e}, continuing to wait...")
                
            # Wait before next retry
            await asyncio.sleep(retry_delay)
            
            # Add progress log every 10 attempts
            if total_attempts % 10 == 0:
                logger.info(f"Still waiting for container {name} IP after {total_attempts} attempts...")
    
    async def __aenter__(self):
        """Async context manager entry."""
        logger.debug("Entering DockerProvider context")
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit.
        
        This method handles cleanup of running containers if needed.
        """
        logger.debug(f"Exiting DockerProvider context, handling exceptions: {exc_type}")
        try:
            # Optionally stop running containers on context exit
            # For now, we'll leave containers running as they might be needed
            # Users can manually stop them if needed
            pass
        except Exception as e:
            logger.error(f"Error during DockerProvider cleanup: {e}")
            if exc_type is None:
                raise
        return False

```

--------------------------------------------------------------------------------
/blog/build-your-own-operator-on-macos-1.md:
--------------------------------------------------------------------------------

```markdown
# Build Your Own Operator on macOS - Part 1

*Published on March 31, 2025 by Francesco Bonacci*

In this first blogpost, we'll learn how to build our own Computer-Use Operator using OpenAI's `computer-use-preview` model. But first, let's understand what some common terms mean:

- A **Virtual Machine (VM)** is like a computer within your computer - a safe, isolated environment where the AI can work without affecting your main system.
- **computer-use-preview** is OpenAI's specialized language model trained to understand and interact with computer interfaces through screenshots.
- A **Computer-Use Agent** is an AI agent that can control a computer just like a human would - clicking buttons, typing text, and interacting with applications.

Our Operator will run in an isolated macOS VM, by making use of our [cua-computer](https://github.com/trycua/cua/tree/main/libs/computer) package and [lume virtualization CLI](https://github.com/trycua/cua/tree/main/libs/lume).

Check out what it looks like to use your own Operator from a Gradio app:

<div align="center">
  <video src="https://github.com/user-attachments/assets/a2cf69ad-2ab2-4eb9-8e1a-45606dd7eec6" width="600" controls></video>
</div>

## What You'll Learn

By the end of this tutorial, you'll be able to:
- Set up a macOS virtual machine for AI automation
- Connect OpenAI's computer-use model to your VM
- Create a basic loop for the AI to interact with your VM
- Handle different types of computer actions (clicking, typing, etc.)
- Implement safety checks and error handling

**Prerequisites:**
- macOS Sonoma (14.0) or later
- 8GB RAM minimum (16GB recommended)
- OpenAI API access (Tier 3+)
- Basic Python knowledge
- Familiarity with terminal commands

**Estimated Time:** 45-60 minutes

## Introduction to Computer-Use Agents

Last March OpenAI released a fine-tuned version of GPT-4o, namely [CUA](https://openai.com/index/computer-using-agent/), introducing pixel-level vision capabilities with advanced reasoning through reinforcement learning. This fine-tuning enables the computer-use model to interpret screenshots and interact with graphical user interfaces on a pixel-level such as buttons, menus, and text fields - mimicking human interactions on a computer screen. It scores a remarkable 38.1% success rate on [OSWorld](https://os-world.github.io) - a benchmark for Computer-Use agents on Linux and Windows. This is the 2nd available model after Anthropic's [Claude 3.5 Sonnet](https://www.anthropic.com/news/3-5-models-and-computer-use) to support computer-use capabilities natively with no external models (e.g. accessory [SoM (Set-of-Mark)](https://arxiv.org/abs/2310.11441) and OCR runs).

Professor Ethan Mollick provides an excellent explanation of computer-use agents in this article: [When you give a Claude a mouse](https://www.oneusefulthing.org/p/when-you-give-a-claude-a-mouse).

### ChatGPT Operator
OpenAI's computer-use model powers [ChatGPT Operator](https://openai.com/index/introducing-operator), a Chromium-based interface exclusively available to ChatGPT Pro subscribers. Users leverage this functionality to automate web-based tasks such as online shopping, expense report submission, and booking reservations by interacting with websites in a human-like manner.

## Benefits of Custom Operators

### Why Build Your Own?
While OpenAI's Operator uses a controlled Chromium VM instance, there are scenarios where you may want to use your own VM with full desktop capabilities. Here are some examples:

- Automating native macOS apps like Finder, Xcode
- Managing files, changing settings, and running terminal commands 
- Testing desktop software and applications
- Creating workflows that combine web and desktop tasks
- Automating media editing in apps like Final Cut Pro and Blender

This gives you more control and flexibility to automate tasks beyond just web browsing, with full access to interact with native applications and system-level operations. Additionally, running your own VM locally provides better privacy for sensitive user files and delivers superior performance by leveraging your own hardware instead of renting expensive Cloud VMs.

## Access Requirements

### Model Availability
As we speak, the **computer-use-preview** model has limited availability:
- Only accessible to OpenAI tier 3+ users
- Additional application process may be required even for eligible users
- Cannot be used in the OpenAI Playground
- Outside of ChatGPT Operator, usage is restricted to the new **Responses API**

## Understanding the OpenAI API

### Responses API Overview
Let's start with the basics. In our case, we'll use OpenAI's Responses API to communicate with their computer-use model.

Think of it like this:
1. We send the model a screenshot of our VM and tell it what we want it to do
2. The model looks at the screenshot and decides what actions to take
3. It sends back instructions (like "click here" or "type this")
4. We execute those instructions in our VM

The [Responses API](https://platform.openai.com/docs/guides/responses) is OpenAI's newest way to interact with their AI models. It comes with several built-in tools:
- **Web search**: Let the AI search the internet
- **File search**: Help the AI find documents
- **Computer use**: Allow the AI to control a computer (what we'll be using)

As we speak, the computer-use model is only available through the Responses API.

### Responses API Examples
Let's look at some simple examples. We'll start with the traditional way of using OpenAI's API with Chat Completions, then show the new Responses API primitive.

Chat Completions:
```python
# The old way required managing conversation history manually
messages = [{"role": "user", "content": "Hello"}]
response = client.chat.completions.create(
    model="gpt-4",
    messages=messages  # We had to track all messages ourselves
)
messages.append(response.choices[0].message)  # Manual message tracking
```

Responses API:
```python
# Example 1: Simple web search
# The API handles all the complexity for us
response = client.responses.create(
    model="gpt-4",
    input=[{
        "role": "user", 
        "content": "What's the latest news about AI?"
    }],
    tools=[{
        "type": "web_search",  # Tell the API to use web search
        "search_query": "latest AI news"
    }]
)

# Example 2: File search
# Looking for specific documents becomes easy
response = client.responses.create(
    model="gpt-4",
    input=[{
        "role": "user", 
        "content": "Find documents about project X"
    }],
    tools=[{
        "type": "file_search",
        "query": "project X",
        "file_types": ["pdf", "docx"]  # Specify which file types to look for
    }]
)
```

### Computer-Use Model Setup
For our operator, we'll use the computer-use model. Here's how we set it up:

```python
# Set up the computer-use model to control our VM
response = client.responses.create(
    model="computer-use-preview",  # Special model for computer control
    tools=[{
        "type": "computer_use_preview",
        "display_width": 1024,     # Size of our VM screen
        "display_height": 768,
        "environment": "mac"       # Tell it we're using macOS.
    }],
    input=[
        {
            "role": "user", 
            "content": [
                # What we want the AI to do
                {"type": "input_text", "text": "Open Safari and go to google.com"},
                # Current screenshot of our VM
                {"type": "input_image", "image_url": f"data:image/png;base64,{screenshot_base64}"}
            ]
        }
    ],
    truncation="auto"  # Let OpenAI handle message length
)
```

### Understanding the Response
When we send a request, the API sends back a response that looks like this:

```json
"output": [
    {
        "type": "reasoning",           # The AI explains what it's thinking
        "id": "rs_67cc...",
        "summary": [
            {
                "type": "summary_text",
                "text": "Clicking on the browser address bar."
            }
        ]
    },
    {
        "type": "computer_call",       # The actual action to perform
        "id": "cu_67cc...",
        "call_id": "call_zw3...",
        "action": {
            "type": "click",           # What kind of action (click, type, etc.)
            "button": "left",          # Which mouse button to use
            "x": 156,                  # Where to click (coordinates)
            "y": 50
        },
        "pending_safety_checks": [],   # Any safety warnings to consider
        "status": "completed"          # Whether the action was successful
    }
]
```

Each response contains:
1. **Reasoning**: The AI's explanation of what it's doing
2. **Action**: The specific computer action to perform
3. **Safety Checks**: Any potential risks to review
4. **Status**: Whether everything worked as planned

## CUA-Computer Interface

### Architecture Overview
Let's break down the main components of our system and how they work together:

1. **The Virtual Machine (VM)**
   - Think of this as a safe playground for our AI
   - It's a complete macOS system running inside your computer
   - Anything the AI does stays inside this VM, keeping your main system safe
   - We use `lume` to create and manage this VM

2. **The Computer Interface (CUI)**
   - This is how we control the VM
   - It can move the mouse, type text, and take screenshots
   - Works like a remote control for the VM
   - Built using our `cua-computer` package

3. **The OpenAI Model**
   - This is the brain of our operator
   - It looks at screenshots of the VM
   - Decides what actions to take
   - Sends back instructions like "click here" or "type this"

Here's how they all work together:

```mermaid
sequenceDiagram
    participant User as You
    participant CUI as Computer Interface
    participant VM as Virtual Machine
    participant AI as OpenAI API

    Note over User,AI: The Main Loop
    User->>CUI: Start the operator
    CUI->>VM: Create macOS sandbox
    activate VM
    VM-->>CUI: VM is ready

    loop Action Loop
        Note over CUI,AI: Each iteration
        CUI->>VM: Take a screenshot
        VM-->>CUI: Return current screen
        CUI->>AI: Send screenshot + instructions
        AI-->>CUI: Return next action
        
        Note over CUI,VM: Execute the action
        alt Mouse Click
            CUI->>VM: Move and click mouse
        else Type Text
            CUI->>VM: Type characters
        else Scroll Screen
            CUI->>VM: Scroll window
        else Press Keys
            CUI->>VM: Press keyboard keys
        else Wait
            CUI->>VM: Pause for a moment
        end
    end

    VM-->>CUI: Task finished
    deactivate VM
    CUI-->>User: All done!
```

The diagram above shows how information flows through our system:
1. You start the operator
2. The Computer Interface creates a virtual macOS
3. Then it enters a loop:
   - Take a picture of the VM screen
   - Send it to OpenAI with instructions
   - Get back an action to perform
   - Execute that action in the VM
   - Repeat until the task is done

This design keeps everything organized and safe. The AI can only interact with the VM through our controlled interface, and the VM keeps the AI's actions isolated from your main system.

---

## Implementation Guide

### Prerequisites

1. **Lume CLI Setup**
   For installing the standalone lume binary, run the following command from a terminal, or download the [latest pkg](https://github.com/trycua/cua/releases/latest/download/lume.pkg.tar.gz).

   ```bash
   sudo /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/trycua/cua/main/libs/lume/scripts/install.sh)"
   ```

   **Important Storage Notes:**
   - Initial download requires 80GB of free space
   - After first run, space usage reduces to ~30GB due to macOS's sparse file system
   - VMs are stored in `~/.lume`
   - Cached images are stored in `~/.lume/cache`

   You can check your downloaded VM images anytime:
   ```bash
   lume ls
   ```

   Example output:

   | name                     | os      | cpu   | memory  | disk           | display   | status    | ip             | vnc                                               |
   |--------------------------|---------|-------|---------|----------------|-----------|-----------|----------------|---------------------------------------------------|
   | macos-sequoia-cua:latest | macOS   | 12    | 16.00G  | 64.5GB/80.0GB  | 1024x768  | running   | 192.168.64.78  | vnc://:[email protected]:56085    |

   After checking your available images, you can run the VM to ensure everything is working correctly:
   ```bash
   lume run macos-sequoia-cua:latest
   ```

2. **Python Environment Setup**
   **Note**: The `cua-computer` package requires Python 3.10 or later. We recommend creating a dedicated Python environment:

   **Using venv:**
   ```bash
   python -m venv cua-env
   source cua-env/bin/activate
   ```

   **Using conda:**
   ```bash
   conda create -n cua-env python=3.10
   conda activate cua-env
   ```

   Then install the required packages:

   ```bash
   pip install openai
   pip install cua-computer
   ```

   Ensure you have an OpenAI API key (set as an environment variable or in your OpenAI configuration).

### Building the Operator

#### Importing Required Modules
With the prerequisites installed and configured, we're ready to build our first operator.
The following example uses asynchronous Python (async/await). You can run it either in a VS Code Notebook or as a standalone Python script.

```python
import asyncio
import base64
import openai

from computer import Computer
```

#### Mapping API Actions to CUA Methods
The following helper function converts a `computer_call` action from the OpenAI Responses API into corresponding commands on the CUI interface. For example, if the API instructs a `click` action, we move the cursor and perform a left click on the lume VM Sandbox. We will use the computer interface to execute the actions.

```python
async def execute_action(computer, action):
    action_type = action.type
    
    if action_type == "click":
        x = action.x
        y = action.y
        button = action.button
        print(f"Executing click at ({x}, {y}) with button '{button}'")
        await computer.interface.move_cursor(x, y)
        if button == "right":
            await computer.interface.right_click()
        else:
            await computer.interface.left_click()
    
    elif action_type == "type":
        text = action.text
        print(f"Typing text: {text}")
        await computer.interface.type_text(text)
    
    elif action_type == "scroll":
        x = action.x
        y = action.y
        scroll_x = action.scroll_x
        scroll_y = action.scroll_y
        print(f"Scrolling at ({x}, {y}) with offsets (scroll_x={scroll_x}, scroll_y={scroll_y})")
        await computer.interface.move_cursor(x, y)
        await computer.interface.scroll(scroll_y)  # Using vertical scroll only
    
    elif action_type == "keypress":
        keys = action.keys
        for key in keys:
            print(f"Pressing key: {key}")
            # Map common key names to CUA equivalents
            if key.lower() == "enter":
                await computer.interface.press_key("return")
            elif key.lower() == "space":
                await computer.interface.press_key("space")
            else:
                await computer.interface.press_key(key)
    
    elif action_type == "wait":
        wait_time = action.time
        print(f"Waiting for {wait_time} seconds")
        await asyncio.sleep(wait_time)
    
    elif action_type == "screenshot":
        print("Taking screenshot")
        # This is handled automatically in the main loop, but we can take an extra one if requested
        screenshot = await computer.interface.screenshot()
        return screenshot
    
    else:
        print(f"Unrecognized action: {action_type}")
```

#### Implementing the Computer-Use Loop
This section defines a loop that:

1. Initializes the cua-computer instance (connecting to a macOS sandbox).
2. Captures a screenshot of the current state.
3. Sends the screenshot (with a user prompt) to the OpenAI Responses API using the `computer-use-preview` model.
4. Processes the returned `computer_call` action and executes it using our helper function.
5. Captures an updated screenshot after the action (this example runs one iteration, but you can wrap it in a loop).

For a full loop, you would repeat these steps until no further actions are returned.

```python
async def cua_openai_loop():
    # Initialize the lume computer instance (macOS sandbox)
    async with Computer(
        display="1024x768",
        memory="4GB",
        cpu="2",
        os_type="macos"
    ) as computer:
        await computer.run() # Start the lume VM
        
        # Capture the initial screenshot
        screenshot = await computer.interface.screenshot()
        screenshot_base64 = base64.b64encode(screenshot).decode('utf-8')

        # Initial request to start the loop
        response = openai.responses.create(
            model="computer-use-preview",
            tools=[{
                "type": "computer_use_preview",
                "display_width": 1024,
                "display_height": 768,
                "environment": "mac"
            }],
            input=[
                {  
                    "role": "user", 
                    "content": [
                        {"type": "input_text", "text": "Open Safari, download and install Cursor."},
                        {"type": "input_image", "image_url": f"data:image/png;base64,{screenshot_base64}"}
                    ]
                }
            ],
            truncation="auto"
        )

        # Continue the loop until no more computer_call actions
        while True:
            # Check for computer_call actions
            computer_calls = [item for item in response.output if item and item.type == "computer_call"]
            if not computer_calls:
                print("No more computer calls. Loop complete.")
                break

            # Get the first computer call
            call = computer_calls[0]
            last_call_id = call.call_id
            action = call.action
            print("Received action from OpenAI Responses API:", action)

            # Handle any pending safety checks
            if call.pending_safety_checks:
                print("Safety checks pending:", call.pending_safety_checks)
                # In a real implementation, you would want to get user confirmation here
                acknowledged_checks = call.pending_safety_checks
            else:
                acknowledged_checks = []

            # Execute the action
            await execute_action(computer, action)
            await asyncio.sleep(1)  # Allow time for changes to take effect

            # Capture new screenshot after action
            new_screenshot = await computer.interface.screenshot()
            new_screenshot_base64 = base64.b64encode(new_screenshot).decode('utf-8')

            # Send the screenshot back as computer_call_output
            response = openai.responses.create(
                model="computer-use-preview",
                tools=[{
                    "type": "computer_use_preview",
                    "display_width": 1024,
                    "display_height": 768,
                    "environment": "mac"
                }],
                input=[{  
                    "type": "computer_call_output",
                    "call_id": last_call_id,
                    "acknowledged_safety_checks": acknowledged_checks,
                    "output": {
                        "type": "input_image",
                        "image_url": f"data:image/png;base64,{new_screenshot_base64}"
                    }
                }],
                truncation="auto"
            )

        # End the session
        await computer.stop()

# Run the loop
if __name__ == "__main__":
    asyncio.run(cua_openai_loop())
```

You can find the full code in our [notebook](https://github.com/trycua/cua/blob/main/notebooks/blog/build-your-own-operator-on-macos-1.ipynb).

#### Request Handling Differences
The first request to the OpenAI Responses API is special in that it includes the initial screenshot and prompt. Subsequent requests are handled differently, using the `computer_call_output` type to provide feedback on the executed action.

##### Initial Request Format
- We use `role: "user"` with `content` that contains both `input_text` (the prompt) and `input_image` (the screenshot)

##### Subsequent Request Format
- We use `type: "computer_call_output"` instead of the user role
- We include the `call_id` to link the output to the specific previous action that was executed
- We provide any `acknowledged_safety_checks` that were approved
- We include the new screenshot in the `output` field

This structured approach allows the API to maintain context and continuity throughout the interaction session.

**Note**: For multi-turn conversations, you should include the `previous_response_id` in your initial requests when starting a new conversation with prior context. However, when using `computer_call_output` for action feedback, you don't need to explicitly manage the conversation history - OpenAI's API automatically tracks the context using the `call_id`. The `previous_response_id` is primarily important when the user provides additional instructions or when starting a new request that should continue from a previous session.

## Conclusion

### Summary
This blogpost demonstrates a single iteration of a OpenAI Computer-Use loop where:

- A macOS sandbox is controlled using the CUA interface.
- A screenshot and prompt are sent to the OpenAI Responses API.
- The returned action (e.g. a click or type command) is executed via the CUI interface.

In a production setting, you would wrap the action-response cycle in a loop, handling multiple actions and safety checks as needed.

### Next Steps
In the next blogpost, we'll introduce our Agent framework which abstracts away all these tedious implementation steps. This framework provides a higher-level API that handles the interaction loop between OpenAI's computer-use model and the macOS sandbox, allowing you to focus on building sophisticated applications rather than managing the low-level details we've explored here. Can't wait? Check out the [cua-agent](https://github.com/trycua/cua/tree/main/libs/agent) package!

### Resources
- [OpenAI Computer-Use docs](https://platform.openai.com/docs/guides/tools-computer-use)
- [cua-computer](https://github.com/trycua/cua/tree/main/libs/computer)
- [lume](https://github.com/trycua/cua/tree/main/libs/lume)

```

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/diorama/diorama.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""Diorama: A virtual desktop manager for macOS"""

import os
import asyncio
import logging
import sys
import io
from typing import Union
from PIL import Image, ImageDraw

from computer_server.diorama.draw import capture_all_apps, AppActivationContext, get_frontmost_and_active_app, get_all_windows, get_running_apps

from computer_server.diorama.diorama_computer import DioramaComputer
from computer_server.handlers.macos import *

# simple, nicely formatted logging
logger = logging.getLogger(__name__)

automation_handler = MacOSAutomationHandler()

class Diorama:
    """Virtual desktop manager that provides automation capabilities for macOS applications.
    
    Manages application windows and provides an interface for taking screenshots,
    mouse interactions, keyboard input, and coordinate transformations between
    screenshot space and screen space.
    """
    _scheduler_queue = None
    _scheduler_task = None
    _loop = None
    _scheduler_started = False

    @classmethod
    def create_from_apps(cls, *args) -> DioramaComputer:
        """Create a DioramaComputer instance from a list of application names.
        
        Args:
            *args: Variable number of application names to include in the desktop
            
        Returns:
            DioramaComputer: A computer interface for the specified applications
        """
        cls._ensure_scheduler()
        return cls(args).computer

    # Dictionary to store cursor positions for each unique app_list hash
    _cursor_positions = {}
    
    def __init__(self, app_list):
        """Initialize a Diorama instance for the specified applications.
        
        Args:
            app_list: List of application names to manage
        """
        self.app_list = app_list
        self.interface = self.Interface(self)
        self.computer = DioramaComputer(self)
        self.focus_context = None
        
        # Create a hash for this app_list to use as a key
        self.app_list_hash = hash(tuple(sorted(app_list)))
        
        # Initialize cursor position for this app_list if it doesn't exist
        if self.app_list_hash not in Diorama._cursor_positions:
            Diorama._cursor_positions[self.app_list_hash] = (0, 0)

    @classmethod
    def _ensure_scheduler(cls):
        """Ensure the async scheduler loop is running.
        
        Creates and starts the scheduler task if it hasn't been started yet.
        """
        if not cls._scheduler_started:
            logger.info("Starting Diorama scheduler loop…")
            cls._scheduler_queue = asyncio.Queue()
            cls._loop = asyncio.get_event_loop()
            cls._scheduler_task = cls._loop.create_task(cls._scheduler_loop())
            cls._scheduler_started = True

    @classmethod
    async def _scheduler_loop(cls):
        """Main scheduler loop that processes automation commands.
        
        Continuously processes commands from the scheduler queue, handling
        screenshots, mouse actions, keyboard input, and scrolling operations.
        """
        while True:
            cmd = await cls._scheduler_queue.get()
            action = cmd.get("action")
            args = cmd.get("arguments", {})
            future = cmd.get("future")
            logger.info(f"Processing command: {action} | args={args}")
            
            app_whitelist = args.get("app_list", [])
            
            all_windows = get_all_windows()
            running_apps = get_running_apps()
            frontmost_app, active_app_to_use, active_app_pid = get_frontmost_and_active_app(all_windows, running_apps, app_whitelist)
            focus_context = AppActivationContext(active_app_pid, active_app_to_use, logger)
            
            with focus_context:
                try:
                    if action == "screenshot":
                        logger.info(f"Taking screenshot for apps: {app_whitelist}")
                        result, img = capture_all_apps(
                            app_whitelist=app_whitelist,
                            save_to_disk=False,
                            take_focus=False
                        )
                        logger.info("Screenshot complete.")
                        if future:
                            future.set_result((result, img))
                    # Mouse actions
                    elif action in ["left_click", "right_click", "double_click", "move_cursor", "drag_to"]:
                        x = args.get("x")
                        y = args.get("y")
                        
                        duration = args.get("duration", 0.5)
                        if action == "left_click":
                            await automation_handler.left_click(x, y)
                        elif action == "right_click":
                            await automation_handler.right_click(x, y)
                        elif action == "double_click":
                            await automation_handler.double_click(x, y)
                        elif action == "move_cursor":
                            await automation_handler.move_cursor(x, y)
                        elif action == "drag_to":
                            await automation_handler.drag_to(x, y, duration=duration)
                        if future:
                            future.set_result(None)
                    elif action in ["scroll_up", "scroll_down"]:
                        x = args.get("x")
                        y = args.get("y")
                        if x is not None and y is not None:
                            await automation_handler.move_cursor(x, y)
                        
                        clicks = args.get("clicks", 1)
                        if action == "scroll_up":
                            await automation_handler.scroll_up(clicks)
                        else:
                            await automation_handler.scroll_down(clicks)
                        if future:
                            future.set_result(None)
                    # Keyboard actions
                    elif action == "type_text":
                        text = args.get("text")
                        await automation_handler.type_text(text)
                        if future:
                            future.set_result(None)
                    elif action == "press_key":
                        key = args.get("key")
                        await automation_handler.press_key(key)
                        if future:
                            future.set_result(None)
                    elif action == "hotkey":
                        keys = args.get("keys", [])
                        await automation_handler.hotkey(keys)
                        if future:
                            future.set_result(None)
                    elif action == "get_cursor_position":
                        pos = await automation_handler.get_cursor_position()
                        if future:
                            future.set_result(pos)
                    else:
                        logger.warning(f"Unknown action: {action}")
                        if future:
                            future.set_exception(ValueError(f"Unknown action: {action}"))
                except Exception as e:
                    logger.error(f"Exception during {action}: {e}", exc_info=True)
                    if future:
                        future.set_exception(e)

    class Interface():
        """Interface for interacting with the virtual desktop.
        
        Provides methods for taking screenshots, mouse interactions, keyboard input,
        and coordinate transformations between screenshot and screen coordinates.
        """
        
        def __init__(self, diorama):
            """Initialize the interface with a reference to the parent Diorama instance.
            
            Args:
                diorama: The parent Diorama instance
            """
            self._diorama = diorama
            
            self._scene_hitboxes = []
            self._scene_size = None

        async def _send_cmd(self, action, arguments=None):
            """Send a command to the scheduler queue.
            
            Args:
                action (str): The action to perform
                arguments (dict, optional): Arguments for the action
                
            Returns:
                The result of the command execution
            """
            Diorama._ensure_scheduler()
            loop = asyncio.get_event_loop()
            future = loop.create_future()
            logger.info(f"Enqueuing {action} command for apps: {self._diorama.app_list}")
            await Diorama._scheduler_queue.put({
                "action": action,
                "arguments": {"app_list": self._diorama.app_list, **(arguments or {})},
                "future": future
            })
            try:
                return await future
            except asyncio.CancelledError:
                logger.warning(f"Command was cancelled: {action}")
                return None

        async def screenshot(self, as_bytes: bool = True) -> Union[str, Image.Image]:
            """Take a screenshot of the managed applications.
            
            Args:
                as_bytes (bool): If True, return base64-encoded bytes; if False, return PIL Image
                
            Returns:
                Union[str, Image.Image]: Base64-encoded PNG bytes or PIL Image object
            """
            import base64
            result, img = await self._send_cmd("screenshot")
            self._scene_hitboxes = result.get("hitboxes", [])
            self._scene_size = img.size
            
            if as_bytes:
                # PIL Image to bytes, then base64 encode for JSON
                import io
                img_byte_arr = io.BytesIO()
                img.save(img_byte_arr, format="PNG")
                img_bytes = img_byte_arr.getvalue()
                img_b64 = base64.b64encode(img_bytes).decode("ascii")
                return img_b64
            else:
                return img

        async def left_click(self, x, y):
            """Perform a left mouse click at the specified coordinates.
            
            Args:
                x (int): X coordinate in screenshot space (or None to use last position)
                y (int): Y coordinate in screenshot space (or None to use last position)
            """
            # Get last cursor position for this app_list hash
            app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
            last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
            x, y = x or last_pos[0], y or last_pos[1]
            # Update cursor position for this app_list hash
            Diorama._cursor_positions[app_list_hash] = (x, y)

            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("left_click", {"x": sx, "y": sy})

        async def right_click(self, x, y):
            """Perform a right mouse click at the specified coordinates.
            
            Args:
                x (int): X coordinate in screenshot space (or None to use last position)
                y (int): Y coordinate in screenshot space (or None to use last position)
            """
            # Get last cursor position for this app_list hash
            app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
            last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
            x, y = x or last_pos[0], y or last_pos[1]
            # Update cursor position for this app_list hash
            Diorama._cursor_positions[app_list_hash] = (x, y)
            
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("right_click", {"x": sx, "y": sy})

        async def double_click(self, x, y):
            """Perform a double mouse click at the specified coordinates.
            
            Args:
                x (int): X coordinate in screenshot space (or None to use last position)
                y (int): Y coordinate in screenshot space (or None to use last position)
            """
            # Get last cursor position for this app_list hash
            app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
            last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
            x, y = x or last_pos[0], y or last_pos[1]
            # Update cursor position for this app_list hash
            Diorama._cursor_positions[app_list_hash] = (x, y)
            
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("double_click", {"x": sx, "y": sy})

        async def move_cursor(self, x, y):
            """Move the mouse cursor to the specified coordinates.
            
            Args:
                x (int): X coordinate in screenshot space (or None to use last position)
                y (int): Y coordinate in screenshot space (or None to use last position)
            """
            # Get last cursor position for this app_list hash
            app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
            last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
            x, y = x or last_pos[0], y or last_pos[1]
            # Update cursor position for this app_list hash
            Diorama._cursor_positions[app_list_hash] = (x, y)
            
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("move_cursor", {"x": sx, "y": sy})

        async def drag_to(self, x, y, duration=0.5):
            """Drag the mouse from current position to the specified coordinates.
            
            Args:
                x (int): X coordinate in screenshot space (or None to use last position)
                y (int): Y coordinate in screenshot space (or None to use last position)
                duration (float): Duration of the drag operation in seconds
            """
            # Get last cursor position for this app_list hash
            app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
            last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
            x, y = x or last_pos[0], y or last_pos[1]
            # Update cursor position for this app_list hash
            Diorama._cursor_positions[app_list_hash] = (x, y)
            
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("drag_to", {"x": sx, "y": sy, "duration": duration})

        async def get_cursor_position(self):
            """Get the current cursor position in screen coordinates.
            
            Returns:
                tuple: (x, y) coordinates of the cursor in screen space
            """
            return await self._send_cmd("get_cursor_position")

        async def type_text(self, text):
            """Type the specified text using the keyboard.
            
            Args:
                text (str): The text to type
            """
            await self._send_cmd("type_text", {"text": text})

        async def press_key(self, key):
            """Press a single key on the keyboard.
            
            Args:
                key (str): The key to press
            """
            await self._send_cmd("press_key", {"key": key})

        async def hotkey(self, keys):
            """Press a combination of keys simultaneously.
            
            Args:
                keys (list): List of keys to press together
            """
            await self._send_cmd("hotkey", {"keys": list(keys)})

        async def scroll_up(self, clicks: int = 1):
            """Scroll up at the current cursor position.
            
            Args:
                clicks (int): Number of scroll clicks to perform
            """
            # Get last cursor position for this app_list hash
            app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
            last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
            x, y = last_pos[0], last_pos[1]
            
            await self._send_cmd("scroll_up", {"clicks": clicks, "x": x, "y": y})

        async def scroll_down(self, clicks: int = 1):
            """Scroll down at the current cursor position.
            
            Args:
                clicks (int): Number of scroll clicks to perform
            """
            # Get last cursor position for this app_list hash
            app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
            last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
            x, y = last_pos[0], last_pos[1]
            
            await self._send_cmd("scroll_down", {"clicks": clicks, "x": x, "y": y})

        async def get_screen_size(self) -> dict[str, int]:
            """Get the size of the screenshot area.
            
            Returns:
                dict[str, int]: Dictionary with 'width' and 'height' keys
            """
            if not self._scene_size:
                await self.screenshot()
            return { "width": self._scene_size[0], "height": self._scene_size[1] }

        async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
            """Convert screenshot coordinates to screen coordinates.

            Args:
                x: X absolute coordinate in screenshot space
                y: Y absolute coordinate in screenshot space

            Returns:
                tuple[float, float]: (x, y) absolute coordinates in screen space
            """
            if not self._scene_hitboxes:
                await self.screenshot() # get hitboxes
            # Try all hitboxes
            for h in self._scene_hitboxes[::-1]:
                rect_from = h.get("hitbox")
                rect_to = h.get("target")
                if not rect_from or len(rect_from) != 4:
                    continue
                
                # check if (x, y) is inside rect_from
                x0, y0, x1, y1 = rect_from
                if x0 <= x <= x1 and y0 <= y <= y1:
                    logger.info(f"Found hitbox: {h}")
                    # remap (x, y) to rect_to
                    tx0, ty0, tx1, ty1 = rect_to
                    
                    # calculate offset from x0, y0
                    offset_x = x - x0
                    offset_y = y - y0
                    
                    # remap offset to rect_to
                    tx = tx0 + offset_x
                    ty = ty0 + offset_y
                    
                    return tx, ty
            return x, y

        async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
            """Convert screen coordinates to screenshot coordinates.

            Args:
                x: X absolute coordinate in screen space
                y: Y absolute coordinate in screen space

            Returns:
                tuple[float, float]: (x, y) absolute coordinates in screenshot space
            """
            if not self._scene_hitboxes:
                await self.screenshot() # get hitboxes
            # Try all hitboxes
            for h in self._scene_hitboxes[::-1]:
                rect_from = h.get("target")
                rect_to = h.get("hitbox")
                if not rect_from or len(rect_from) != 4:
                    continue
                
                # check if (x, y) is inside rect_from
                x0, y0, x1, y1 = rect_from
                if x0 <= x <= x1 and y0 <= y <= y1:
                    # remap (x, y) to rect_to
                    tx0, ty0, tx1, ty1 = rect_to
                    
                    # calculate offset from x0, y0
                    offset_x = x - x0
                    offset_y = y - y0
                    
                    # remap offset to rect_to
                    tx = tx0 + offset_x
                    ty = ty0 + offset_y
                    
                    return tx, ty
            return x, y

import pyautogui
import time

async def main():
    """Main function demonstrating Diorama usage with multiple desktops and mouse tracking."""
    desktop1 = Diorama.create_from_apps(["Discord", "Notes"])
    desktop2 = Diorama.create_from_apps(["Terminal"])

    img1 = await desktop1.interface.screenshot(as_bytes=False)
    img2 = await desktop2.interface.screenshot(as_bytes=False)

    img1.save("app_screenshots/desktop1.png")
    img2.save("app_screenshots/desktop2.png")
    # Initialize Diorama desktop
    desktop3 = Diorama.create_from_apps("Safari")
    screen_size = await desktop3.interface.get_screen_size()
    print(screen_size)

    # Take initial screenshot
    img = await desktop3.interface.screenshot(as_bytes=False)
    img.save("app_screenshots/desktop3.png")

    # Prepare hitboxes and draw on the single screenshot
    hitboxes = desktop3.interface._scene_hitboxes[::-1]
    base_img = img.copy()
    draw = ImageDraw.Draw(base_img)
    for h in hitboxes:
        rect = h.get("hitbox")
        if not rect or len(rect) != 4:
            continue
        draw.rectangle(rect, outline="red", width=2)

    # Track and draw mouse position in real time (single screenshot size)
    last_mouse_pos = None
    print("Tracking mouse... Press Ctrl+C to stop.")
    try:
        while True:
            mouse_x, mouse_y = pyautogui.position()
            if last_mouse_pos != (mouse_x, mouse_y):
                last_mouse_pos = (mouse_x, mouse_y)
                # Map to screenshot coordinates
                sx, sy = await desktop3.interface.to_screenshot_coordinates(mouse_x, mouse_y)
                # Draw on a copy of the screenshot
                frame = base_img.copy()
                frame_draw = ImageDraw.Draw(frame)
                frame_draw.ellipse((sx-5, sy-5, sx+5, sy+5), fill="blue", outline="blue")
                # Save the frame
                frame.save("app_screenshots/desktop3_mouse.png")
                print(f"Mouse at screen ({mouse_x}, {mouse_y}) -> screenshot ({sx:.1f}, {sy:.1f})")
            time.sleep(0.05)  # Throttle updates to ~20 FPS
    except KeyboardInterrupt:
        print("Stopped tracking.")

        draw.text((rect[0], rect[1]), str(idx), fill="red")
    
    canvas.save("app_screenshots/desktop3_hitboxes.png")
    
    

    # move mouse in a square spiral around the screen
    import math
    import random
    
    step = 20  # pixels per move
    dot_radius = 10
    width = screen_size["width"]
    height = screen_size["height"]
    x, y = 0, 10

    while x < width and y < height:
        await desktop3.interface.move_cursor(x, y)
        img = await desktop3.interface.screenshot(as_bytes=False)
        draw = ImageDraw.Draw(img)
        draw.ellipse((x-dot_radius, y-dot_radius, x+dot_radius, y+dot_radius), fill="red")
        img.save("current.png")
        await asyncio.sleep(0.03)
        x += step
        y = math.sin(x / width * math.pi * 2) * 50 + 25

if __name__ == "__main__":
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/libs/lume/src/Server/Server.swift:
--------------------------------------------------------------------------------

```swift
import Darwin
import Foundation
import Network

// MARK: - Error Types
enum PortError: Error, LocalizedError {
    case alreadyInUse(port: UInt16)

    var errorDescription: String? {
        switch self {
        case .alreadyInUse(let port):
            return "Port \(port) is already in use by another process"
        }
    }
}

// MARK: - Server Class
@MainActor
final class Server {

    // MARK: - Route Type
    private struct Route {
        let method: String
        let path: String
        let handler: (HTTPRequest) async throws -> HTTPResponse

        func matches(_ request: HTTPRequest) -> Bool {
            if method != request.method { return false }

            // Handle path parameters
            let routeParts = path.split(separator: "/")
            let requestParts = request.path.split(separator: "/")

            if routeParts.count != requestParts.count { return false }

            for (routePart, requestPart) in zip(routeParts, requestParts) {
                if routePart.hasPrefix(":") { continue }  // Path parameter
                if routePart != requestPart { return false }
            }

            return true
        }

        func extractParams(_ request: HTTPRequest) -> [String: String] {
            var params: [String: String] = [:]
            let routeParts = path.split(separator: "/")
            
            // Split request path to remove query parameters
            let requestPathOnly = request.path.split(separator: "?", maxSplits: 1)[0]
            let requestParts = requestPathOnly.split(separator: "/")

            for (routePart, requestPart) in zip(routeParts, requestParts) {
                if routePart.hasPrefix(":") {
                    let paramName = String(routePart.dropFirst())
                    params[paramName] = String(requestPart)
                }
            }

            return params
        }
    }

    // MARK: - Properties
    private let port: NWEndpoint.Port
    private let controller: LumeController
    private var isRunning = false
    private var listener: NWListener?
    private var routes: [Route]

    // MARK: - Initialization
    init(port: UInt16 = 7777) {
        self.port = NWEndpoint.Port(rawValue: port)!
        self.controller = LumeController()
        self.routes = []

        // Define API routes after self is fully initialized
        self.setupRoutes()
    }

    // MARK: - Route Setup
    private func setupRoutes() {
        routes = [
            Route(
                method: "GET", path: "/lume/vms",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    // Extract storage from query params if present
                    let storage = self.extractQueryParam(request: request, name: "storage")
                    return try await self.handleListVMs(storage: storage)
                }),
            Route(
                method: "GET", path: "/lume/vms/:name",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    let params = Route(
                        method: "GET", path: "/lume/vms/:name",
                        handler: { _ in
                            HTTPResponse(statusCode: .ok, body: "")
                        }
                    ).extractParams(request)
                    guard let name = params["name"] else {
                        return HTTPResponse(statusCode: .badRequest, body: "Missing VM name")
                    }

                    // Extract storage from query params if present
                    let storage = self.extractQueryParam(request: request, name: "storage")

                    return try await self.handleGetVM(name: name, storage: storage)
                }),
            Route(
                method: "DELETE", path: "/lume/vms/:name",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    let params = Route(
                        method: "DELETE", path: "/lume/vms/:name",
                        handler: { _ in
                            HTTPResponse(statusCode: .ok, body: "")
                        }
                    ).extractParams(request)
                    guard let name = params["name"] else {
                        return HTTPResponse(statusCode: .badRequest, body: "Missing VM name")
                    }

                    // Extract storage from query params if present
                    let storage = self.extractQueryParam(request: request, name: "storage")

                    return try await self.handleDeleteVM(name: name, storage: storage)
                }),
            Route(
                method: "POST", path: "/lume/vms",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    return try await self.handleCreateVM(request.body)
                }),
            Route(
                method: "POST", path: "/lume/vms/clone",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    return try await self.handleCloneVM(request.body)
                }),
            Route(
                method: "PATCH", path: "/lume/vms/:name",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    let params = Route(
                        method: "PATCH", path: "/lume/vms/:name",
                        handler: { _ in
                            HTTPResponse(statusCode: .ok, body: "")
                        }
                    ).extractParams(request)
                    guard let name = params["name"] else {
                        return HTTPResponse(statusCode: .badRequest, body: "Missing VM name")
                    }
                    return try await self.handleSetVM(name: name, body: request.body)
                }),
            Route(
                method: "POST", path: "/lume/vms/:name/run",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    let params = Route(
                        method: "POST", path: "/lume/vms/:name/run",
                        handler: { _ in
                            HTTPResponse(statusCode: .ok, body: "")
                        }
                    ).extractParams(request)
                    guard let name = params["name"] else {
                        return HTTPResponse(statusCode: .badRequest, body: "Missing VM name")
                    }
                    return try await self.handleRunVM(name: name, body: request.body)
                }),
            Route(
                method: "POST", path: "/lume/vms/:name/stop",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    let params = Route(
                        method: "POST", path: "/lume/vms/:name/stop",
                        handler: { _ in
                            HTTPResponse(statusCode: .ok, body: "")
                        }
                    ).extractParams(request)
                    guard let name = params["name"] else {
                        return HTTPResponse(statusCode: .badRequest, body: "Missing VM name")
                    }

                    Logger.info("Processing stop VM request", metadata: ["method": request.method, "path": request.path])

                    // Extract storage from the request body
                    var storage: String? = nil
                    if let bodyData = request.body, !bodyData.isEmpty {
                        do {
                            if let json = try JSONSerialization.jsonObject(with: bodyData) as? [String: Any],
                               let bodyStorage = json["storage"] as? String {
                                storage = bodyStorage
                                Logger.info("Extracted storage from request body", metadata: ["storage": bodyStorage])
                            }
                        } catch {
                            Logger.error("Failed to parse request body JSON", metadata: ["error": error.localizedDescription])
                        }
                    }

                    return try await self.handleStopVM(name: name, storage: storage)
                }),
            Route(
                method: "GET", path: "/lume/ipsw",
                handler: { [weak self] _ in
                    guard let self else { throw HTTPError.internalError }
                    return try await self.handleIPSW()
                }),
            Route(
                method: "POST", path: "/lume/pull",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    return try await self.handlePull(request.body)
                }),
            Route(
                method: "POST", path: "/lume/prune",
                handler: { [weak self] _ in
                    guard let self else { throw HTTPError.internalError }
                    return try await self.handlePruneImages()
                }),
            Route(
                method: "GET", path: "/lume/images",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    return try await self.handleGetImages(request)
                }),
            // New config endpoint
            Route(
                method: "GET", path: "/lume/config",
                handler: { [weak self] _ in
                    guard let self else { throw HTTPError.internalError }
                    return try await self.handleGetConfig()
                }),
            Route(
                method: "POST", path: "/lume/config",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    return try await self.handleUpdateConfig(request.body)
                }),
            Route(
                method: "GET", path: "/lume/config/locations",
                handler: { [weak self] _ in
                    guard let self else { throw HTTPError.internalError }
                    return try await self.handleGetLocations()
                }),
            Route(
                method: "POST", path: "/lume/config/locations",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    return try await self.handleAddLocation(request.body)
                }),
            Route(
                method: "DELETE", path: "/lume/config/locations/:name",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    let params = Route(
                        method: "DELETE", path: "/lume/config/locations/:name",
                        handler: { _ in
                            HTTPResponse(statusCode: .ok, body: "")
                        }
                    ).extractParams(request)
                    guard let name = params["name"] else {
                        return HTTPResponse(statusCode: .badRequest, body: "Missing location name")
                    }
                    return try await self.handleRemoveLocation(name)
                }),
            
            // Logs retrieval route
            Route(
                method: "GET", path: "/lume/logs",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    
                    // Extract query parameters
                    let type = self.extractQueryParam(request: request, name: "type") // "info", "error", or "all"
                    let linesParam = self.extractQueryParam(request: request, name: "lines")
                    let lines = linesParam.flatMap { Int($0) } // Convert to Int if present
                    
                    return try await self.handleGetLogs(type: type, lines: lines)
                }),
            Route(
                method: "POST", path: "/lume/config/locations/default/:name",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    let params = Route(
                        method: "POST", path: "/lume/config/locations/default/:name",
                        handler: { _ in
                            HTTPResponse(statusCode: .ok, body: "")
                        }
                    ).extractParams(request)
                    guard let name = params["name"] else {
                        return HTTPResponse(statusCode: .badRequest, body: "Missing location name")
                    }
                    return try await self.handleSetDefaultLocation(name)
                }),
            Route(
                method: "POST", path: "/lume/vms/push",
                handler: { [weak self] request in
                    guard let self else { throw HTTPError.internalError }
                    return try await self.handlePush(request.body)
                }),
        ]
    }

    // Helper to extract query parameters from the URL
    private func extractQueryParam(request: HTTPRequest, name: String) -> String? {
        // Extract only the query part by splitting on '?'
        let parts = request.path.split(separator: "?", maxSplits: 1)
        guard parts.count > 1 else { return nil } // No query parameters
        
        let queryString = String(parts[1])
        // Create a placeholder URL with the query string
        if let urlComponents = URLComponents(string: "http://placeholder.com?"+queryString),
           let queryItems = urlComponents.queryItems
        {
            return queryItems.first(where: { $0.name == name })?.value?.removingPercentEncoding
        }
        return nil
    }

    // MARK: - Port Utilities
    private func isPortAvailable(port: Int) async -> Bool {
        // Create a socket
        let socketFD = socket(AF_INET, SOCK_STREAM, 0)
        if socketFD == -1 {
            return false
        }

        // Set socket options to allow reuse
        var value: Int32 = 1
        if setsockopt(
            socketFD, SOL_SOCKET, SO_REUSEADDR, &value, socklen_t(MemoryLayout<Int32>.size)) == -1
        {
            close(socketFD)
            return false
        }

        // Set up the address structure
        var addr = sockaddr_in()
        addr.sin_family = sa_family_t(AF_INET)
        addr.sin_port = UInt16(port).bigEndian
        addr.sin_addr.s_addr = INADDR_ANY.bigEndian

        // Bind to the port
        let bindResult = withUnsafePointer(to: &addr) { addrPtr in
            addrPtr.withMemoryRebound(to: sockaddr.self, capacity: 1) { addrPtr in
                Darwin.bind(socketFD, addrPtr, socklen_t(MemoryLayout<sockaddr_in>.size))
            }
        }

        // Clean up
        close(socketFD)

        // If bind failed, the port is in use
        return bindResult == 0
    }

    // MARK: - Server Lifecycle
    func start() async throws {
        // First check if the port is already in use
        if !(await isPortAvailable(port: Int(port.rawValue))) {
            // Don't log anything here, just throw the error
            throw PortError.alreadyInUse(port: port.rawValue)
        }

        let parameters = NWParameters.tcp
        listener = try NWListener(using: parameters, on: port)

        // Create an actor to safely manage state transitions
        actor StartupState {
            var error: Error?
            var isComplete = false

            func setError(_ error: Error) {
                self.error = error
                self.isComplete = true
            }

            func setComplete() {
                self.isComplete = true
            }

            func checkStatus() -> (isComplete: Bool, error: Error?) {
                return (isComplete, error)
            }
        }

        let startupState = StartupState()

        // Set up a state update handler to detect port binding errors
        listener?.stateUpdateHandler = { state in
            Task {
                switch state {
                case .setup:
                    // Initial state, no action needed
                    Logger.info("Listener setup", metadata: ["port": "\(self.port.rawValue)"])
                    break
                case .waiting(let error):
                    // Log the full error details to see what we're getting
                    Logger.error(
                        "Listener waiting",
                        metadata: [
                            "error": error.localizedDescription,
                            "debugDescription": error.debugDescription,
                            "localizedDescription": error.localizedDescription,
                            "port": "\(self.port.rawValue)",
                        ])

                    // Check for different port in use error messages
                    if error.debugDescription.contains("Address already in use")
                        || error.localizedDescription.contains("in use")
                        || error.localizedDescription.contains("address already in use")
                    {
                        Logger.error(
                            "Port conflict detected", metadata: ["port": "\(self.port.rawValue)"])
                        await startupState.setError(
                            PortError.alreadyInUse(port: self.port.rawValue))
                    } else {
                        // Wait for a short period to see if the listener recovers
                        // Some network errors are transient
                        try? await Task.sleep(nanoseconds: 1_000_000_000)  // 1 second

                        // If we're still waiting after delay, consider it an error
                        if case .waiting = await self.listener?.state {
                            await startupState.setError(error)
                        }
                    }
                case .failed(let error):
                    // Log the full error details
                    Logger.error(
                        "Listener failed",
                        metadata: [
                            "error": error.localizedDescription,
                            "debugDescription": error.debugDescription,
                            "port": "\(self.port.rawValue)",
                        ])
                    await startupState.setError(error)
                case .ready:
                    // Listener successfully bound to port
                    Logger.info("Listener ready", metadata: ["port": "\(self.port.rawValue)"])
                    await startupState.setComplete()
                case .cancelled:
                    // Listener was cancelled
                    Logger.info("Listener cancelled", metadata: ["port": "\(self.port.rawValue)"])
                    break
                @unknown default:
                    Logger.info(
                        "Unknown listener state",
                        metadata: ["state": "\(state)", "port": "\(self.port.rawValue)"])
                    break
                }
            }
        }

        listener?.newConnectionHandler = { [weak self] connection in
            Task { @MainActor [weak self] in
                guard let self else { return }
                self.handleConnection(connection)
            }
        }

        listener?.start(queue: .main)

        // Wait for either successful startup or an error
        var status: (isComplete: Bool, error: Error?) = (false, nil)
        repeat {
            try await Task.sleep(nanoseconds: 100_000_000)  // 100ms
            status = await startupState.checkStatus()
        } while !status.isComplete

        // If there was a startup error, throw it
        if let error = status.error {
            self.stop()
            throw error
        }

        isRunning = true

        Logger.info("Server started", metadata: ["port": "\(port.rawValue)"])

        // Keep the server running
        while isRunning {
            try await Task.sleep(nanoseconds: 1_000_000_000)
        }
    }

    func stop() {
        isRunning = false
        listener?.cancel()
    }

    // MARK: - Connection Handling
    private func handleConnection(_ connection: NWConnection) {
        connection.stateUpdateHandler = { [weak self] state in
            switch state {
            case .ready:
                Task { @MainActor [weak self] in
                    guard let self else { return }
                    self.receiveData(connection)
                }
            case .failed(let error):
                Logger.error("Connection failed", metadata: ["error": error.localizedDescription])
                connection.cancel()
            case .cancelled:
                // Connection is already cancelled, no need to cancel again
                break
            default:
                break
            }
        }
        connection.start(queue: .main)
    }

    private func receiveData(_ connection: NWConnection) {
        connection.receive(minimumIncompleteLength: 1, maximumLength: 65536) {
            [weak self] content, _, isComplete, error in
            if let error = error {
                Logger.error("Receive error", metadata: ["error": error.localizedDescription])
                connection.cancel()
                return
            }

            guard let data = content, !data.isEmpty else {
                if isComplete {
                    connection.cancel()
                }
                return
            }

            Task { @MainActor [weak self] in
                guard let self else { return }
                do {
                    let response = try await self.handleRequest(data)
                    self.send(response, on: connection)
                } catch {
                    let errorResponse = self.errorResponse(error)
                    self.send(errorResponse, on: connection)
                }
            }
        }
    }

    private func send(_ response: HTTPResponse, on connection: NWConnection) {
        let data = response.serialize()
        Logger.info(
            "Serialized response", metadata: ["data": String(data: data, encoding: .utf8) ?? ""])
        connection.send(
            content: data,
            completion: .contentProcessed { [weak connection] error in
                if let error = error {
                    Logger.error(
                        "Failed to send response", metadata: ["error": error.localizedDescription])
                } else {
                    Logger.info("Response sent successfully")
                }
                if connection?.state != .cancelled {
                    connection?.cancel()
                }
            })
    }

    // MARK: - Request Handling
    private func handleRequest(_ data: Data) async throws -> HTTPResponse {
        Logger.info(
            "Received request data", metadata: ["data": String(data: data, encoding: .utf8) ?? ""])

        guard let request = HTTPRequest(data: data) else {
            Logger.error("Failed to parse request")
            return HTTPResponse(statusCode: .badRequest, body: "Invalid request")
        }

        Logger.info(
            "Parsed request",
            metadata: [
                "method": request.method,
                "path": request.path,
                "headers": "\(request.headers)",
                "body": String(data: request.body ?? Data(), encoding: .utf8) ?? "",
            ])

        // Find matching route
        guard let route = routes.first(where: { $0.matches(request) }) else {
            return HTTPResponse(statusCode: .notFound, body: "Not found")
        }

        // Handle the request
        let response = try await route.handler(request)

        Logger.info(
            "Sending response",
            metadata: [
                "statusCode": "\(response.statusCode.rawValue)",
                "headers": "\(response.headers)",
                "body": String(data: response.body ?? Data(), encoding: .utf8) ?? "",
            ])

        return response
    }

    private func errorResponse(_ error: Error) -> HTTPResponse {
        HTTPResponse(
            statusCode: .internalServerError,
            headers: ["Content-Type": "application/json"],
            body: try! JSONEncoder().encode(APIError(message: error.localizedDescription))
        )
    }
}

```

--------------------------------------------------------------------------------
/libs/typescript/computer/tests/interface/macos.test.ts:
--------------------------------------------------------------------------------

```typescript
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { WebSocket, WebSocketServer } from 'ws';
import { MacOSComputerInterface } from '../../src/interface/macos.ts';

describe('MacOSComputerInterface', () => {
  // Define test parameters
  const testParams = {
    ipAddress: 'localhost',
    username: 'testuser',
    password: 'testpass',
    // apiKey: "test-api-key", No API Key for local testing
    vmName: 'test-vm',
  };

  // WebSocket server mock
  let wss: WebSocketServer;
  let serverPort: number;
  let connectedClients: WebSocket[] = [];

  // Track received messages for verification
  interface ReceivedMessage {
    action: string;
    [key: string]: unknown;
  }
  let receivedMessages: ReceivedMessage[] = [];

  // Set up WebSocket server before all tests
  beforeEach(async () => {
    receivedMessages = [];
    connectedClients = [];

    // Create WebSocket server on a random available port
    wss = new WebSocketServer({ port: 0 });
    serverPort = (wss.address() as { port: number }).port;

    // Update test params with the actual server address
    testParams.ipAddress = `localhost:${serverPort}`;

    // Handle WebSocket connections
    wss.on('connection', (ws) => {
      connectedClients.push(ws);

      // Handle incoming messages
      ws.on('message', (data) => {
        try {
          const message = JSON.parse(data.toString());
          receivedMessages.push(message);

          // Send appropriate responses based on action
          switch (message.command) {
            case 'screenshot':
              ws.send(
                JSON.stringify({
                  image_data: Buffer.from('fake-screenshot-data').toString(
                    'base64'
                  ),
                  success: true,
                })
              );
              break;
            case 'get_screen_size':
              ws.send(
                JSON.stringify({
                  size: { width: 1920, height: 1080 },
                  success: true,
                })
              );
              break;
            case 'get_cursor_position':
              ws.send(
                JSON.stringify({
                  position: { x: 100, y: 200 },
                  success: true,
                })
              );
              break;
            case 'copy_to_clipboard':
              ws.send(
                JSON.stringify({
                  content: 'clipboard content',
                  success: true,
                })
              );
              break;
            case 'file_exists':
              ws.send(
                JSON.stringify({
                  exists: true,
                  success: true,
                })
              );
              break;
            case 'directory_exists':
              ws.send(
                JSON.stringify({
                  exists: true,
                  success: true,
                })
              );
              break;
            case 'list_dir':
              ws.send(
                JSON.stringify({
                  files: ['file1.txt', 'file2.txt'],
                  success: true,
                })
              );
              break;
            case 'read_text':
              ws.send(
                JSON.stringify({
                  content: 'file content',
                  success: true,
                })
              );
              break;
            case 'read_bytes':
              ws.send(
                JSON.stringify({
                  content_b64: Buffer.from('binary content').toString('base64'),
                  success: true,
                })
              );
              break;
            case 'run_command':
              ws.send(
                JSON.stringify({
                  stdout: 'command output',
                  stderr: '',
                  success: true,
                })
              );
              break;
            case 'get_accessibility_tree':
              ws.send(
                JSON.stringify({
                  role: 'window',
                  title: 'Test Window',
                  bounds: { x: 0, y: 0, width: 1920, height: 1080 },
                  children: [],
                  success: true,
                })
              );
              break;
            case 'to_screen_coordinates':
            case 'to_screenshot_coordinates':
              ws.send(
                JSON.stringify({
                  coordinates: [message.params?.x || 0, message.params?.y || 0],
                  success: true,
                })
              );
              break;
            default:
              // For all other actions, just send success
              ws.send(JSON.stringify({ success: true }));
              break;
          }
        } catch (error) {
          ws.send(JSON.stringify({ error: (error as Error).message }));
        }
      });

      ws.on('error', (error) => {
        console.error('WebSocket error:', error);
      });
    });
  });

  // Clean up WebSocket server after each test
  afterEach(async () => {
    // Close all connected clients
    for (const client of connectedClients) {
      if (client.readyState === WebSocket.OPEN) {
        client.close();
      }
    }

    // Close the server
    await new Promise<void>((resolve) => {
      wss.close(() => resolve());
    });
  });

  describe('Connection Management', () => {
    it('should connect with proper authentication headers', async () => {
      const macosInterface = new MacOSComputerInterface(
        testParams.ipAddress,
        testParams.username,
        testParams.password,
        undefined,
        testParams.vmName
      );

      await macosInterface.connect();

      // Verify the interface is connected
      expect(macosInterface.isConnected()).toBe(true);
      expect(connectedClients.length).toBe(1);

      await macosInterface.disconnect();
    });

    it('should handle connection without API key', async () => {
      // Create a separate server that doesn't check auth
      const noAuthWss = new WebSocketServer({ port: 0 });
      const noAuthPort = (noAuthWss.address() as { port: number }).port;

      noAuthWss.on('connection', (ws) => {
        ws.on('message', () => {
          ws.send(JSON.stringify({ success: true }));
        });
      });

      const macosInterface = new MacOSComputerInterface(
        `localhost:${noAuthPort}`,
        testParams.username,
        testParams.password,
        undefined,
        undefined
      );

      await macosInterface.connect();
      expect(macosInterface.isConnected()).toBe(true);

      await macosInterface.disconnect();
      await new Promise<void>((resolve) => {
        noAuthWss.close(() => resolve());
      });
    });
  });

  describe('Mouse Actions', () => {
    let macosInterface: MacOSComputerInterface;

    beforeEach(async () => {
      macosInterface = new MacOSComputerInterface(
        testParams.ipAddress,
        testParams.username,
        testParams.password,
        undefined,
        testParams.vmName
      );
      await macosInterface.connect();
    });

    afterEach(async () => {
      if (macosInterface) {
        await macosInterface.disconnect();
      }
    });

    it('should send mouse_down command', async () => {
      await macosInterface.mouseDown(100, 200, 'left');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'mouse_down',
        params: {
          x: 100,
          y: 200,
          button: 'left',
        },
      });
    });

    it('should send mouse_up command', async () => {
      await macosInterface.mouseUp(100, 200, 'right');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'mouse_up',
        params: {
          x: 100,
          y: 200,
          button: 'right',
        },
      });
    });

    it('should send left_click command', async () => {
      await macosInterface.leftClick(150, 250);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'left_click',
        params: {
          x: 150,
          y: 250,
        },
      });
    });

    it('should send right_click command', async () => {
      await macosInterface.rightClick(200, 300);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'right_click',
        params: {
          x: 200,
          y: 300,
        },
      });
    });

    it('should send double_click command', async () => {
      await macosInterface.doubleClick(250, 350);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'double_click',
        params: {
          x: 250,
          y: 350,
        },
      });
    });

    it('should send move_cursor command', async () => {
      await macosInterface.moveCursor(300, 400);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'move_cursor',
        params: {
          x: 300,
          y: 400,
        },
      });
    });

    it('should send drag_to command', async () => {
      await macosInterface.dragTo(400, 500, 'left', 1.5);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'drag_to',
        params: {
          x: 400,
          y: 500,
          button: 'left',
          duration: 1.5,
        },
      });
    });

    it('should send drag command with path', async () => {
      const path: Array<[number, number]> = [
        [100, 100],
        [200, 200],
        [300, 300],
      ];
      await macosInterface.drag(path, 'middle', 2.0);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'drag',
        params: {
          path: path,
          button: 'middle',
          duration: 2.0,
        },
      });
    });
  });

  describe('Keyboard Actions', () => {
    let macosInterface: MacOSComputerInterface;

    beforeEach(async () => {
      macosInterface = new MacOSComputerInterface(
        testParams.ipAddress,
        testParams.username,
        testParams.password,
        undefined,
        testParams.vmName
      );
      await macosInterface.connect();
    });

    afterEach(async () => {
      if (macosInterface) {
        await macosInterface.disconnect();
      }
    });

    it('should send key_down command', async () => {
      await macosInterface.keyDown('a');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'key_down',
        params: {
          key: 'a',
        },
      });
    });

    it('should send key_up command', async () => {
      await macosInterface.keyUp('b');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'key_up',
        params: {
          key: 'b',
        },
      });
    });

    it('should send type_text command', async () => {
      await macosInterface.typeText('Hello, World!');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'type_text',
        params: {
          text: 'Hello, World!',
        },
      });
    });

    it('should send press_key command', async () => {
      await macosInterface.pressKey('enter');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'press_key',
        params: {
          key: 'enter',
        },
      });
    });

    it('should send hotkey command', async () => {
      await macosInterface.hotkey('cmd', 'c');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'hotkey',
        params: {
          keys: ['cmd', 'c'],
        },
      });
    });
  });

  describe('Scrolling Actions', () => {
    let macosInterface: MacOSComputerInterface;

    beforeEach(async () => {
      macosInterface = new MacOSComputerInterface(
        testParams.ipAddress,
        testParams.username,
        testParams.password,
        undefined,
        testParams.vmName
      );
      await macosInterface.connect();
    });

    afterEach(async () => {
      if (macosInterface) {
        await macosInterface.disconnect();
      }
    });

    it('should send scroll command', async () => {
      await macosInterface.scroll(10, -5);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'scroll',
        params: {
          x: 10,
          y: -5,
        },
      });
    });

    it('should send scroll_down command', async () => {
      await macosInterface.scrollDown(3);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'scroll_down',
        params: {
          clicks: 3,
        },
      });
    });

    it('should send scroll_up command', async () => {
      await macosInterface.scrollUp(2);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'scroll_up',
        params: {
          clicks: 2,
        },
      });
    });
  });

  describe('Screen Actions', () => {
    let macosInterface: MacOSComputerInterface;

    beforeEach(async () => {
      macosInterface = new MacOSComputerInterface(
        testParams.ipAddress,
        testParams.username,
        testParams.password,
        undefined,
        testParams.vmName
      );
      await macosInterface.connect();
    });

    afterEach(async () => {
      if (macosInterface) {
        await macosInterface.disconnect();
      }
    });

    it('should get screenshot', async () => {
      const screenshot = await macosInterface.screenshot();

      expect(screenshot).toBeInstanceOf(Buffer);
      expect(screenshot.toString()).toBe('fake-screenshot-data');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'screenshot',
        params: {},
      });
    });

    it('should get screen size', async () => {
      const size = await macosInterface.getScreenSize();

      expect(size).toEqual({ width: 1920, height: 1080 });

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'get_screen_size',
        params: {},
      });
    });

    it('should get cursor position', async () => {
      const position = await macosInterface.getCursorPosition();

      expect(position).toEqual({ x: 100, y: 200 });

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'get_cursor_position',
        params: {},
      });
    });
  });

  describe('Clipboard Actions', () => {
    let macosInterface: MacOSComputerInterface;

    beforeEach(async () => {
      macosInterface = new MacOSComputerInterface(
        testParams.ipAddress,
        testParams.username,
        testParams.password,
        undefined,
        testParams.vmName
      );
      await macosInterface.connect();
    });

    afterEach(async () => {
      if (macosInterface) {
        await macosInterface.disconnect();
      }
    });

    it('should copy to clipboard', async () => {
      const text = await macosInterface.copyToClipboard();

      expect(text).toBe('clipboard content');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'copy_to_clipboard',
        params: {},
      });
    });

    it('should set clipboard', async () => {
      await macosInterface.setClipboard('new clipboard text');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'set_clipboard',
        params: {
          text: 'new clipboard text',
        },
      });
    });
  });

  describe('File System Actions', () => {
    let macosInterface: MacOSComputerInterface;

    beforeEach(async () => {
      macosInterface = new MacOSComputerInterface(
        testParams.ipAddress,
        testParams.username,
        testParams.password,
        undefined,
        testParams.vmName
      );
      await macosInterface.connect();
    });

    afterEach(async () => {
      if (macosInterface) {
        await macosInterface.disconnect();
      }
    });

    it('should check file exists', async () => {
      const exists = await macosInterface.fileExists('/path/to/file');

      expect(exists).toBe(true);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'file_exists',
        params: {
          path: '/path/to/file',
        },
      });
    });

    it('should check directory exists', async () => {
      const exists = await macosInterface.directoryExists('/path/to/dir');

      expect(exists).toBe(true);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'directory_exists',
        params: {
          path: '/path/to/dir',
        },
      });
    });

    it('should list directory', async () => {
      const files = await macosInterface.listDir('/path/to/dir');

      expect(files).toEqual(['file1.txt', 'file2.txt']);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'list_dir',
        params: {
          path: '/path/to/dir',
        },
      });
    });

    it('should read text file', async () => {
      const content = await macosInterface.readText('/path/to/file.txt');

      expect(content).toBe('file content');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'read_text',
        params: {
          path: '/path/to/file.txt',
        },
      });
    });

    it('should write text file', async () => {
      await macosInterface.writeText('/path/to/file.txt', 'new content');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'write_text',
        params: {
          path: '/path/to/file.txt',
          content: 'new content',
        },
      });
    });

    it('should read binary file', async () => {
      const content = await macosInterface.readBytes('/path/to/file.bin');

      expect(content).toBeInstanceOf(Buffer);
      expect(content.toString()).toBe('binary content');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'read_bytes',
        params: {
          path: '/path/to/file.bin',
        },
      });
    });

    it('should write binary file', async () => {
      const buffer = Buffer.from('binary data');
      await macosInterface.writeBytes('/path/to/file.bin', buffer);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'write_bytes',
        params: {
          path: '/path/to/file.bin',
          content_b64: buffer.toString('base64'),
        },
      });
    });

    it('should delete file', async () => {
      await macosInterface.deleteFile('/path/to/file');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'delete_file',
        params: {
          path: '/path/to/file',
        },
      });
    });

    it('should create directory', async () => {
      await macosInterface.createDir('/path/to/new/dir');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'create_dir',
        params: {
          path: '/path/to/new/dir',
        },
      });
    });

    it('should delete directory', async () => {
      await macosInterface.deleteDir('/path/to/dir');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'delete_dir',
        params: {
          path: '/path/to/dir',
        },
      });
    });

    it('should run command', async () => {
      const [stdout, stderr] = await macosInterface.runCommand('ls -la');

      expect(stdout).toBe('command output');
      expect(stderr).toBe('');

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'run_command',
        params: {
          command: 'ls -la',
        },
      });
    });
  });

  describe('Accessibility Actions', () => {
    let macosInterface: MacOSComputerInterface;

    beforeEach(async () => {
      macosInterface = new MacOSComputerInterface(
        testParams.ipAddress,
        testParams.username,
        testParams.password,
        undefined,
        testParams.vmName
      );
      await macosInterface.connect();
    });

    afterEach(async () => {
      if (macosInterface) {
        await macosInterface.disconnect();
      }
    });

    it('should get accessibility tree', async () => {
      const tree = await macosInterface.getAccessibilityTree();

      expect(tree).toEqual({
        role: 'window',
        title: 'Test Window',
        bounds: { x: 0, y: 0, width: 1920, height: 1080 },
        children: [],
        success: true,
      });

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'get_accessibility_tree',
        params: {},
      });
    });

    it('should convert to screen coordinates', async () => {
      const [x, y] = await macosInterface.toScreenCoordinates(100, 200);

      expect(x).toBe(100);
      expect(y).toBe(200);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'to_screen_coordinates',
        params: {
          x: 100,
          y: 200,
        },
      });
    });

    it('should convert to screenshot coordinates', async () => {
      const [x, y] = await macosInterface.toScreenshotCoordinates(300, 400);

      expect(x).toBe(300);
      expect(y).toBe(400);

      const lastMessage = receivedMessages[receivedMessages.length - 1];
      expect(lastMessage).toEqual({
        command: 'to_screenshot_coordinates',
        params: {
          x: 300,
          y: 400,
        },
      });
    });
  });

  describe('Error Handling', () => {
    it('should handle WebSocket connection errors', async () => {
      // Use a valid but unreachable IP to avoid DNS errors
      const macosInterface = new MacOSComputerInterface(
        'localhost:9999',
        testParams.username,
        testParams.password,
        undefined,
        testParams.vmName
      );

      // Connection should fail
      await expect(macosInterface.connect()).rejects.toThrow();
    });

    it('should handle command errors', async () => {
      // Create a server that returns errors
      const errorWss = new WebSocketServer({ port: 0 });
      const errorPort = (errorWss.address() as { port: number }).port;

      errorWss.on('connection', (ws) => {
        ws.on('message', () => {
          ws.send(JSON.stringify({ error: 'Command failed', success: false }));
        });
      });

      const macosInterface = new MacOSComputerInterface(
        `localhost:${errorPort}`,
        testParams.username,
        testParams.password,
        undefined,
        testParams.vmName
      );

      await macosInterface.connect();

      // Command should throw error
      await expect(macosInterface.leftClick(100, 100)).rejects.toThrow(
        'Command failed'
      );

      await macosInterface.disconnect();
      await new Promise<void>((resolve) => {
        errorWss.close(() => resolve());
      });
    });

    it('should handle disconnection gracefully', async () => {
      const macosInterface = new MacOSComputerInterface(
        testParams.ipAddress,
        testParams.username,
        testParams.password,
        undefined,
        testParams.vmName
      );

      await macosInterface.connect();
      expect(macosInterface.isConnected()).toBe(true);

      // Disconnect
      macosInterface.disconnect();
      expect(macosInterface.isConnected()).toBe(false);

      // Should reconnect automatically on next command
      await macosInterface.leftClick(100, 100);
      expect(macosInterface.isConnected()).toBe(true);

      await macosInterface.disconnect();
    });

    it('should handle force close', async () => {
      const macosInterface = new MacOSComputerInterface(
        testParams.ipAddress,
        testParams.username,
        testParams.password,
        undefined,
        testParams.vmName
      );

      await macosInterface.connect();
      expect(macosInterface.isConnected()).toBe(true);

      // Force close
      macosInterface.forceClose();
      expect(macosInterface.isConnected()).toBe(false);
    });
  });
});

```

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/handlers/windows.py:
--------------------------------------------------------------------------------

```python
"""
Windows implementation of automation and accessibility handlers.

This implementation uses pyautogui for GUI automation and Windows-specific APIs
for accessibility and system operations.
"""
from typing import Dict, Any, List, Tuple, Optional
import logging
import subprocess
import asyncio
import base64
import os
from io import BytesIO
from pynput.mouse import Controller as MouseController
from pynput.keyboard import Controller as KeyboardController

# Configure logger
logger = logging.getLogger(__name__)

# Try to import pyautogui
try:
    import pyautogui
    pyautogui.FAILSAFE = False
    logger.info("pyautogui successfully imported, GUI automation available")
except Exception as e:
    logger.error(f"pyautogui import failed: {str(e)}. GUI operations will not work.")
    pyautogui = None

# Try to import Windows-specific modules
try:
    import win32gui
    import win32con
    import win32api
    logger.info("Windows API modules successfully imported")
    WINDOWS_API_AVAILABLE = True
except Exception as e:
    logger.error(f"Windows API modules import failed: {str(e)}. Some Windows-specific features will be unavailable.")
    WINDOWS_API_AVAILABLE = False

from .base import BaseAccessibilityHandler, BaseAutomationHandler

class WindowsAccessibilityHandler(BaseAccessibilityHandler):
    """Windows implementation of accessibility handler."""
    
    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Get the accessibility tree of the current window.
        
        Returns:
            Dict[str, Any]: A dictionary containing the success status and either
                           the accessibility tree or an error message.
                           Structure: {"success": bool, "tree": dict} or 
                                    {"success": bool, "error": str}
        """
        if not WINDOWS_API_AVAILABLE:
            return {"success": False, "error": "Windows API not available"}
        
        try:
            # Get the foreground window
            hwnd = win32gui.GetForegroundWindow()
            if not hwnd:
                return {"success": False, "error": "No foreground window found"}
            
            # Get window information
            window_text = win32gui.GetWindowText(hwnd)
            rect = win32gui.GetWindowRect(hwnd)
            
            tree = {
                "role": "Window",
                "title": window_text,
                "position": {"x": rect[0], "y": rect[1]},
                "size": {"width": rect[2] - rect[0], "height": rect[3] - rect[1]},
                "children": []
            }
            
            # Enumerate child windows
            def enum_child_proc(hwnd_child, children_list):
                """Callback function to enumerate child windows and collect their information.
                
                Args:
                    hwnd_child: Handle to the child window being enumerated.
                    children_list: List to append child window information to.
                    
                Returns:
                    bool: True to continue enumeration, False to stop.
                """
                try:
                    child_text = win32gui.GetWindowText(hwnd_child)
                    child_rect = win32gui.GetWindowRect(hwnd_child)
                    child_class = win32gui.GetClassName(hwnd_child)
                    
                    child_info = {
                        "role": child_class,
                        "title": child_text,
                        "position": {"x": child_rect[0], "y": child_rect[1]},
                        "size": {"width": child_rect[2] - child_rect[0], "height": child_rect[3] - child_rect[1]},
                        "children": []
                    }
                    children_list.append(child_info)
                except Exception as e:
                    logger.debug(f"Error getting child window info: {e}")
                return True
            
            win32gui.EnumChildWindows(hwnd, enum_child_proc, tree["children"])
            
            return {"success": True, "tree": tree}
            
        except Exception as e:
            logger.error(f"Error getting accessibility tree: {e}")
            return {"success": False, "error": str(e)}
    
    async def find_element(self, role: Optional[str] = None,
                          title: Optional[str] = None,
                          value: Optional[str] = None) -> Dict[str, Any]:
        """Find an element in the accessibility tree by criteria.
        
        Args:
            role (Optional[str]): The role or class name of the element to find.
            title (Optional[str]): The title or text of the element to find.
            value (Optional[str]): The value of the element (not used in Windows implementation).
            
        Returns:
            Dict[str, Any]: A dictionary containing the success status and either
                           the found element or an error message.
                           Structure: {"success": bool, "element": dict} or 
                                    {"success": bool, "error": str}
        """
        if not WINDOWS_API_AVAILABLE:
            return {"success": False, "error": "Windows API not available"}
        
        try:
            # Find window by title if specified
            if title:
                hwnd = win32gui.FindWindow(None, title)
                if hwnd:
                    rect = win32gui.GetWindowRect(hwnd)
                    return {
                        "success": True,
                        "element": {
                            "role": "Window",
                            "title": title,
                            "position": {"x": rect[0], "y": rect[1]},
                            "size": {"width": rect[2] - rect[0], "height": rect[3] - rect[1]}
                        }
                    }
            
            # Find window by class name if role is specified
            if role:
                hwnd = win32gui.FindWindow(role, None)
                if hwnd:
                    window_text = win32gui.GetWindowText(hwnd)
                    rect = win32gui.GetWindowRect(hwnd)
                    return {
                        "success": True,
                        "element": {
                            "role": role,
                            "title": window_text,
                            "position": {"x": rect[0], "y": rect[1]},
                            "size": {"width": rect[2] - rect[0], "height": rect[3] - rect[1]}
                        }
                    }
            
            return {"success": False, "error": "Element not found"}
            
        except Exception as e:
            logger.error(f"Error finding element: {e}")
            return {"success": False, "error": str(e)}

class WindowsAutomationHandler(BaseAutomationHandler):
    """Windows implementation of automation handler using pyautogui and Windows APIs."""
    
    mouse = MouseController()
    keyboard = KeyboardController()

    # Mouse Actions
    async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Press and hold a mouse button at the specified coordinates.
        
        Args:
            x (Optional[int]): The x-coordinate to move to before pressing. If None, uses current position.
            y (Optional[int]): The y-coordinate to move to before pressing. If None, uses current position.
            button (str): The mouse button to press ("left", "right", or "middle").
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.mouseDown(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Release a mouse button at the specified coordinates.
        
        Args:
            x (Optional[int]): The x-coordinate to move to before releasing. If None, uses current position.
            y (Optional[int]): The y-coordinate to move to before releasing. If None, uses current position.
            button (str): The mouse button to release ("left", "right", or "middle").
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.mouseUp(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the mouse cursor to the specified coordinates.
        
        Args:
            x (int): The x-coordinate to move to.
            y (int): The y-coordinate to move to.
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            pyautogui.moveTo(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Perform a left mouse click at the specified coordinates.
        
        Args:
            x (Optional[int]): The x-coordinate to click at. If None, clicks at current position.
            y (Optional[int]): The y-coordinate to click at. If None, clicks at current position.
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.click()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Perform a right mouse click at the specified coordinates.
        
        Args:
            x (Optional[int]): The x-coordinate to click at. If None, clicks at current position.
            y (Optional[int]): The y-coordinate to click at. If None, clicks at current position.
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.rightClick()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Perform a double left mouse click at the specified coordinates.
        
        Args:
            x (Optional[int]): The x-coordinate to double-click at. If None, clicks at current position.
            y (Optional[int]): The y-coordinate to double-click at. If None, clicks at current position.
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.doubleClick(interval=0.1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
        """Drag from the current position to the specified coordinates.
        
        Args:
            x (int): The x-coordinate to drag to.
            y (int): The y-coordinate to drag to.
            button (str): The mouse button to use for dragging ("left", "right", or "middle").
            duration (float): The time in seconds to take for the drag operation.
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            pyautogui.dragTo(x, y, duration=duration, button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
        """Drag the mouse through a series of coordinates.
        
        Args:
            path (List[Tuple[int, int]]): A list of (x, y) coordinate tuples to drag through.
            button (str): The mouse button to use for dragging ("left", "right", or "middle").
            duration (float): The total time in seconds for the entire drag operation.
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            if not path:
                return {"success": False, "error": "Path is empty"}
            
            # Move to first position
            pyautogui.moveTo(*path[0])
            
            # Drag through all positions
            for x, y in path[1:]:
                pyautogui.dragTo(x, y, duration=duration/len(path), button=button)
            
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Keyboard Actions
    async def key_down(self, key: str) -> Dict[str, Any]:
        """Press and hold a keyboard key.
        
        Args:
            key (str): The key to press down (e.g., 'ctrl', 'shift', 'a').
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            pyautogui.keyDown(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}
        
    async def key_up(self, key: str) -> Dict[str, Any]:
        """Release a keyboard key.
        
        Args:
            key (str): The key to release (e.g., 'ctrl', 'shift', 'a').
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            pyautogui.keyUp(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type the specified text.
        
        Args:
            text (str): The text to type.
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        try:
            # use pynput for Unicode support
            self.keyboard.type(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press and release a keyboard key.
        
        Args:
            key (str): The key to press (e.g., 'enter', 'space', 'tab').
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            pyautogui.press(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
        """Press a combination of keys simultaneously.
        
        Args:
            keys (List[str]): The keys to press together (e.g., ['ctrl', 'c'], ['alt', 'tab']).
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            pyautogui.hotkey(*keys)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Scrolling Actions
    async def scroll(self, x: int, y: int) -> Dict[str, Any]:
        """Scroll vertically at the current cursor position.
        
        Args:
            x (int): Horizontal scroll amount (not used in pyautogui implementation).
            y (int): Vertical scroll amount. Positive values scroll up, negative values scroll down.
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            self.mouse.scroll(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll down by the specified number of clicks.
        
        Args:
            clicks (int): The number of scroll clicks to perform downward.
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            pyautogui.scroll(-clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll up by the specified number of clicks.
        
        Args:
            clicks (int): The number of scroll clicks to perform upward.
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            pyautogui.scroll(clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Screen Actions
    async def screenshot(self) -> Dict[str, Any]:
        """Capture a screenshot of the entire screen.
        
        Returns:
            Dict[str, Any]: A dictionary containing the success status and either
                           base64-encoded image data or an error message.
                           Structure: {"success": bool, "image_data": str} or 
                                    {"success": bool, "error": str}
        """
        if not pyautogui:
            return {"success": False, "error": "pyautogui not available"}
        
        try:
            from PIL import Image
            screenshot = pyautogui.screenshot()
            if not isinstance(screenshot, Image.Image):
                return {"success": False, "error": "Failed to capture screenshot"}
            
            buffered = BytesIO()
            screenshot.save(buffered, format="PNG", optimize=True)
            buffered.seek(0)
            image_data = base64.b64encode(buffered.getvalue()).decode()
            return {"success": True, "image_data": image_data}
        except Exception as e:
            return {"success": False, "error": f"Screenshot error: {str(e)}"}

    async def get_screen_size(self) -> Dict[str, Any]:
        """Get the size of the screen in pixels.
        
        Returns:
            Dict[str, Any]: A dictionary containing the success status and either
                           screen size information or an error message.
                           Structure: {"success": bool, "size": {"width": int, "height": int}} or 
                                    {"success": bool, "error": str}
        """
        try:
            if pyautogui:
                size = pyautogui.size()
                return {"success": True, "size": {"width": size.width, "height": size.height}}
            elif WINDOWS_API_AVAILABLE:
                # Fallback to Windows API
                width = win32api.GetSystemMetrics(win32con.SM_CXSCREEN)
                height = win32api.GetSystemMetrics(win32con.SM_CYSCREEN)
                return {"success": True, "size": {"width": width, "height": height}}
            else:
                return {"success": False, "error": "No screen size detection method available"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_cursor_position(self) -> Dict[str, Any]:
        """Get the current position of the mouse cursor.
        
        Returns:
            Dict[str, Any]: A dictionary containing the success status and either
                           cursor position or an error message.
                           Structure: {"success": bool, "position": {"x": int, "y": int}} or 
                                    {"success": bool, "error": str}
        """
        try:
            if pyautogui:
                pos = pyautogui.position()
                return {"success": True, "position": {"x": pos.x, "y": pos.y}}
            elif WINDOWS_API_AVAILABLE:
                # Fallback to Windows API
                pos = win32gui.GetCursorPos()
                return {"success": True, "position": {"x": pos[0], "y": pos[1]}}
            else:
                return {"success": False, "error": "No cursor position detection method available"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Clipboard Actions
    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Get the current content of the clipboard.
        
        Returns:
            Dict[str, Any]: A dictionary containing the success status and either
                           clipboard content or an error message.
                           Structure: {"success": bool, "content": str} or 
                                    {"success": bool, "error": str}
        """
        try:
            import pyperclip
            content = pyperclip.paste()
            return {"success": True, "content": content}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Set the clipboard content to the specified text.
        
        Args:
            text (str): The text to copy to the clipboard.
            
        Returns:
            Dict[str, Any]: A dictionary with success status and optional error message.
        """
        try:
            import pyperclip
            pyperclip.copy(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Command Execution
    async def run_command(self, command: str) -> Dict[str, Any]:
        """Execute a shell command asynchronously.
        
        Args:
            command (str): The shell command to execute.
            
        Returns:
            Dict[str, Any]: A dictionary containing the success status and either
                           command output or an error message.
                           Structure: {"success": bool, "stdout": str, "stderr": str, "return_code": int} or 
                                    {"success": bool, "error": str}
        """
        try:
            # Create subprocess
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            # Wait for the subprocess to finish
            stdout, stderr = await process.communicate()
            # Return decoded output
            return {
                "success": True, 
                "stdout": stdout.decode() if stdout else "", 
                "stderr": stderr.decode() if stderr else "", 
                "return_code": process.returncode
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

```
Page 11/16FirstPrevNextLast