#
tokens: 46342/50000 4/513 files (page 14/16)
lines: off (toggle) GitHub
raw markdown copy
This is page 14 of 16. Use http://codebase.md/trycua/cua?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .all-contributorsrc
├── .cursorignore
├── .devcontainer
│   ├── devcontainer.json
│   ├── post-install.sh
│   └── README.md
├── .dockerignore
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── ci-lume.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-pylume.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       └── test-validation-script.yml
├── .gitignore
├── .vscode
│   ├── docs.code-workspace
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── composite-agents.md
│   ├── cua-hackathon.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .gitignore
│   ├── .prettierrc
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   └── meta.json
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── meta.json
│   │       │   └── sandboxed-python.mdx
│   │       ├── index.mdx
│   │       ├── libraries
│   │       │   ├── agent
│   │       │   │   └── index.mdx
│   │       │   ├── computer
│   │       │   │   └── index.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── core
│   │       │   │   └── index.mdx
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   └── som
│   │       │       ├── configuration.mdx
│   │       │       └── index.mdx
│   │       ├── meta.json
│   │       ├── quickstart-cli.mdx
│   │       ├── quickstart-devs.mdx
│   │       └── telemetry.mdx
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   └── llms.txt
│   │   │       └── route.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── iou.tsx
│   │   │   └── mermaid.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   └── mdx-components.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── .prettierrc
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   └── uitars.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   └── test_connection.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── scripts
│   │   │       ├── install_mcp_server.sh
│   │   │       └── start_mcp_server.sh
│   │   ├── pylume
│   │   │   ├── __init__.py
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── pylume
│   │   │   │   ├── __init__.py
│   │   │   │   ├── client.py
│   │   │   │   ├── exceptions.py
│   │   │   │   ├── lume
│   │   │   │   ├── models.py
│   │   │   │   ├── pylume.py
│   │   │   │   └── server.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           └── test_omniparser.py
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── biome.json
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Dockerfile
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── pylume_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── pdm.lock
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── samples
│   └── community
│       ├── global-online
│       │   └── README.md
│       └── hack-the-north
│           └── README.md
├── scripts
│   ├── build-uv.sh
│   ├── build.ps1
│   ├── build.sh
│   ├── cleanup.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   └── run-docker-dev.sh
└── tests
    ├── pytest.ini
    ├── shell_cmd.py
    ├── test_files.py
    ├── test_mcp_server_session_management.py
    ├── test_mcp_server_streaming.py
    ├── test_shell_bash.py
    ├── test_telemetry.py
    ├── test_venv.py
    └── test_watchdog.py
```

# Files

--------------------------------------------------------------------------------
/libs/python/computer/computer/interface/generic.py:
--------------------------------------------------------------------------------

```python
import asyncio
import json
import time
from typing import Any, Dict, List, Optional, Tuple
from PIL import Image

import websockets
import aiohttp

from ..logger import Logger, LogLevel
from .base import BaseComputerInterface
from ..utils import decode_base64_image, encode_base64_image, bytes_to_image, draw_box, resize_image
from .models import Key, KeyType, MouseButton, CommandResult


class GenericComputerInterface(BaseComputerInterface):
    """Generic interface with common functionality for all supported platforms (Windows, Linux, macOS)."""

    def __init__(
        self,
        ip_address: str,
        username: str = "lume",
        password: str = "lume",
        api_key: Optional[str] = None,
        vm_name: Optional[str] = None,
        logger_name: str = "computer.interface.generic",
    ):
        """Initialise connection bookkeeping; no network connection is opened here.

        Args:
            ip_address: Forwarded to the base interface.
            username: Forwarded to the base interface.
            password: Forwarded to the base interface.
            api_key: Forwarded to the base interface.
            vm_name: Forwarded to the base interface.
            logger_name: Name passed to the ``Logger`` created for this interface.
        """
        super().__init__(ip_address, username, password, api_key, vm_name)

        # --- WebSocket / lifecycle state ---------------------------------
        self._ws = None
        self._reconnect_task = None
        self._closed = False

        # --- Keep-alive settings (seconds) -------------------------------
        self._last_ping = 0
        # A ping is sent every 5 seconds.
        self._ping_interval = 5
        # A pong may take up to 120 seconds to arrive.
        self._ping_timeout = 120

        # --- Reconnection back-off (seconds) -----------------------------
        # First retry waits 1 second ...
        self._reconnect_delay = 1
        # ... and the wait between attempts is capped at 30 seconds.
        self._max_reconnect_delay = 30

        # Whether connection attempts are logged.
        self._log_connection_attempts = True
        # Whether authentication has succeeded.
        self._authenticated = False
        # Guards recv so that only one coroutine receives at a time.
        self._recv_lock = asyncio.Lock()

        # Logger dedicated to this interface instance.
        self.logger = Logger(logger_name, LogLevel.NORMAL)

        # Default pause between commands, in seconds (optional).
        self.delay = 0.0

    async def _handle_delay(self, delay: Optional[float] = None):
        """Handle delay between commands using async sleep.
        
        Args:
            delay: Optional delay in seconds. If None, uses self.delay.
        """
        if delay is not None:
            if isinstance(delay, float) or isinstance(delay, int) and delay > 0:
                await asyncio.sleep(delay)
        elif isinstance(self.delay, float) or isinstance(self.delay, int) and self.delay > 0:
            await asyncio.sleep(self.delay)

    @property
    def ws_uri(self) -> str:
        """Get the WebSocket URI using the current IP address.
        
        Returns:
            WebSocket URI for the Computer API Server
        """
        protocol = "wss" if self.api_key else "ws"
        port = "8443" if self.api_key else "8000"
        return f"{protocol}://{self.ip_address}:{port}/ws"
    
    @property
    def rest_uri(self) -> str:
        """Get the REST URI using the current IP address.
        
        Returns:
            REST URI for the Computer API Server
        """
        protocol = "https" if self.api_key else "http"
        port = "8443" if self.api_key else "8000"
        return f"{protocol}://{self.ip_address}:{port}/cmd"

    # Mouse actions
    async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left", delay: Optional[float] = None) -> None:
        """Send a ``mouse_down`` command, then apply the post-command delay.

        Args:
            x: X coordinate forwarded to the server (None is sent when omitted).
            y: Y coordinate forwarded to the server (None is sent when omitted).
            button: Button name forwarded to the server.
            delay: Optional pause in seconds; None falls back to ``self.delay``.
        """
        params = {"x": x, "y": y, "button": button}
        await self._send_command("mouse_down", params)
        await self._handle_delay(delay)
    
    async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left", delay: Optional[float] = None) -> None:
        """Send a ``mouse_up`` command, then apply the post-command delay.

        Args:
            x: X coordinate forwarded to the server (None is sent when omitted).
            y: Y coordinate forwarded to the server (None is sent when omitted).
            button: Button name forwarded to the server.
            delay: Optional pause in seconds; None falls back to ``self.delay``.
        """
        params = {"x": x, "y": y, "button": button}
        await self._send_command("mouse_up", params)
        await self._handle_delay(delay)
    
    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None:
        """Send a ``left_click`` command, then apply the post-command delay.

        Args:
            x: X coordinate forwarded to the server (None is sent when omitted).
            y: Y coordinate forwarded to the server (None is sent when omitted).
            delay: Optional pause in seconds; None falls back to ``self.delay``.
        """
        params = {"x": x, "y": y}
        await self._send_command("left_click", params)
        await self._handle_delay(delay)

    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None:
        """Send a ``right_click`` command, then apply the post-command delay.

        Args:
            x: X coordinate forwarded to the server (None is sent when omitted).
            y: Y coordinate forwarded to the server (None is sent when omitted).
            delay: Optional pause in seconds; None falls back to ``self.delay``.
        """
        params = {"x": x, "y": y}
        await self._send_command("right_click", params)
        await self._handle_delay(delay)

    async def double_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None:
        """Send a ``double_click`` command, then apply the post-command delay.

        Args:
            x: X coordinate forwarded to the server (None is sent when omitted).
            y: Y coordinate forwarded to the server (None is sent when omitted).
            delay: Optional pause in seconds; None falls back to ``self.delay``.
        """
        params = {"x": x, "y": y}
        await self._send_command("double_click", params)
        await self._handle_delay(delay)

    async def move_cursor(self, x: int, y: int, delay: Optional[float] = None) -> None:
        """Send a ``move_cursor`` command, then apply the post-command delay.

        Args:
            x: X coordinate forwarded to the server.
            y: Y coordinate forwarded to the server.
            delay: Optional pause in seconds; None falls back to ``self.delay``.
        """
        target = {"x": x, "y": y}
        await self._send_command("move_cursor", target)
        await self._handle_delay(delay)

    async def drag_to(self, x: int, y: int, button: "MouseButton" = "left", duration: float = 0.5, delay: Optional[float] = None) -> None:
        """Send a ``drag_to`` command, then apply the post-command delay.

        Args:
            x: Destination X coordinate forwarded to the server.
            y: Destination Y coordinate forwarded to the server.
            button: Mouse button forwarded to the server.
            duration: Forwarded as ``duration`` (presumably seconds; confirm
                against the server implementation).
            delay: Optional pause in seconds; None falls back to ``self.delay``.
        """
        params = {
            "x": x,
            "y": y,
            "button": button,
            "duration": duration,
        }
        await self._send_command("drag_to", params)
        await self._handle_delay(delay)

    async def drag(self, path: List[Tuple[int, int]], button: "MouseButton" = "left", duration: float = 0.5, delay: Optional[float] = None) -> None:
        """Send a ``drag`` command along a path, then apply the post-command delay.

        Args:
            path: List of (x, y) integer pairs forwarded to the server as-is.
            button: Mouse button forwarded to the server.
            duration: Forwarded as ``duration`` (presumably seconds; confirm
                against the server implementation).
            delay: Optional pause in seconds; None falls back to ``self.delay``.
        """
        params = {
            "path": path,
            "button": button,
            "duration": duration,
        }
        await self._send_command("drag", params)
        await self._handle_delay(delay)

    # Keyboard Actions
    async def key_down(self, key: "KeyType", delay: Optional[float] = None) -> None:
        """Send a ``key_down`` command, then apply the post-command delay.

        Args:
            key: Key forwarded to the server unchanged.
            delay: Optional pause in seconds; None falls back to ``self.delay``.
        """
        params = {"key": key}
        await self._send_command("key_down", params)
        await self._handle_delay(delay)
    
    async def key_up(self, key: "KeyType", delay: Optional[float] = None) -> None:
        """Send a ``key_up`` command, then apply the post-command delay.

        Args:
            key: Key forwarded to the server unchanged.
            delay: Optional pause in seconds; None falls back to ``self.delay``.
        """
        params = {"key": key}
        await self._send_command("key_up", params)
        await self._handle_delay(delay)
    
    async def type_text(self, text: str, delay: Optional[float] = None) -> None:
        """Type text, routing non-ASCII input through the clipboard.

        Pure-ASCII text is sent with the ``type_text`` command. Text containing
        any character above code point 127 is placed on the clipboard and
        pasted with a hotkey instead.

        Args:
            text: The text to enter.
            delay: Optional pause in seconds; None falls back to ``self.delay``.
        """
        # Temporary fix for https://github.com/trycua/cua/issues/165
        ascii_only = all(ord(ch) <= 127 for ch in text)
        if ascii_only:
            # Regular typing path.
            await self._send_command("type_text", {"text": text})
        else:
            # Unicode path: copy to clipboard, then paste.
            # NOTE(review): the paste shortcut is hard-coded to Key.COMMAND;
            # confirm how this behaves on non-macOS targets.
            await self.set_clipboard(text)
            await self.hotkey(Key.COMMAND, 'v')
        await self._handle_delay(delay)

    async def press(self, key: "KeyType", delay: Optional[float] = None) -> None:
        """Press one key via the ``press_key`` command.

        Args:
            key: The key to press. Accepted forms:
                - A Key enum member (recommended), e.g. Key.PAGE_DOWN
                - A key value string, e.g. 'pagedown'
                - A single character string, e.g. 'a'
            delay: Optional pause in seconds; None falls back to ``self.delay``.

        Examples:
            ```python
            # Enum members (recommended)
            await interface.press(Key.PAGE_DOWN)
            await interface.press(Key.ENTER)

            # Key value strings
            await interface.press('pagedown')
            await interface.press('enter')

            # Single characters
            await interface.press('a')
            ```

        Raises:
            ValueError: If ``key`` is neither a Key enum member nor a string
        """
        # Reject unsupported types up front.
        if not isinstance(key, (Key, str)):
            raise ValueError(f"Invalid key type: {type(key)}. Must be Key enum or string.")

        if isinstance(key, Key):
            resolved = key.value
        else:
            # Strings are passed through Key.from_string; if that yields an
            # enum member its value is sent, otherwise the result is sent as-is.
            candidate = Key.from_string(key)
            if isinstance(candidate, Key):
                resolved = candidate.value
            else:
                resolved = candidate

        await self._send_command("press_key", {"key": resolved})
        await self._handle_delay(delay)

    async def press_key(self, key: "KeyType", delay: Optional[float] = None) -> None:
        """DEPRECATED alias of press().

        Retained only for backward compatibility and slated for removal in a
        future version; new code should call press() directly.

        Args:
            key: Passed straight through to press().
            delay: Passed straight through to press().
        """
        # Pure delegation: all validation and sending happens in press().
        await self.press(key, delay)

    async def hotkey(self, *keys: "KeyType", delay: Optional[float] = None) -> None:
        """Press several keys together via the ``hotkey`` command.

        Args:
            *keys: The keys to press simultaneously. Each may be:
                - A Key enum member (recommended), e.g. Key.COMMAND
                - A key value string, e.g. 'command'
                - A single character string, e.g. 'a'
            delay: Optional pause in seconds; None falls back to ``self.delay``.

        Examples:
            ```python
            # Enum members (recommended)
            await interface.hotkey(Key.COMMAND, Key.C)  # Copy
            await interface.hotkey(Key.COMMAND, Key.V)  # Paste

            # Mixed forms
            await interface.hotkey(Key.COMMAND, 'a')  # Select all
            ```

        Raises:
            ValueError: If any key is neither a Key enum member nor a string
        """

        def _wire_value(key):
            # Translate one key into the value that is sent to the server.
            if isinstance(key, Key):
                return key.value
            if isinstance(key, str):
                # Strings go through Key.from_string; an enum result is
                # unwrapped to its value, anything else is sent as returned.
                candidate = Key.from_string(key)
                return candidate.value if isinstance(candidate, Key) else candidate
            raise ValueError(f"Invalid key type: {type(key)}. Must be Key enum or string.")

        # Keys are resolved in order, so the first invalid one raises before
        # anything is sent.
        resolved_keys = [_wire_value(key) for key in keys]

        await self._send_command("hotkey", {"keys": resolved_keys})
        await self._handle_delay(delay)

    # Scrolling Actions
    async def scroll(self, x: int, y: int, delay: Optional[float] = None) -> None:
        """Send a ``scroll`` command, then apply the optional delay.

        Args:
            x: Value sent as the ``x`` parameter of the command.
            y: Value sent as the ``y`` parameter of the command.
            delay: Optional value handed to ``_handle_delay`` afterwards.
        """
        scroll_params = {"x": x, "y": y}
        await self._send_command("scroll", scroll_params)
        await self._handle_delay(delay)
    
    async def scroll_down(self, clicks: int = 1, delay: Optional[float] = None) -> None:
        """Send a ``scroll_down`` command, then apply the optional delay.

        Args:
            clicks: Value sent as the ``clicks`` parameter (default: 1).
            delay: Optional value handed to ``_handle_delay`` afterwards.
        """
        scroll_params = {"clicks": clicks}
        await self._send_command("scroll_down", scroll_params)
        await self._handle_delay(delay)
    
    async def scroll_up(self, clicks: int = 1, delay: Optional[float] = None) -> None:
        """Send a ``scroll_up`` command, then apply the optional delay.

        Args:
            clicks: Value sent as the ``clicks`` parameter (default: 1).
            delay: Optional value handed to ``_handle_delay`` afterwards.
        """
        scroll_params = {"clicks": clicks}
        await self._send_command("scroll_up", scroll_params)
        await self._handle_delay(delay)

    # Screen actions
    async def screenshot(
        self,
        boxes: Optional[List[Tuple[int, int, int, int]]] = None,
        box_color: str = "#FF0000",
        box_thickness: int = 2,
        scale_factor: float = 1.0,
    ) -> bytes:
        """Capture the screen, optionally annotating and rescaling the image.

        Args:
            boxes: Optional list of (x, y, width, height) tuples, in screen
                coordinates, to draw on the screenshot
            box_color: Color of the boxes in hex format (default: "#FF0000" red)
            box_thickness: Thickness of the box borders in pixels (default: 2)
            scale_factor: Factor the final image is scaled by (default: 1.0);
                values above 1.0 enlarge, values below 1.0 shrink

        Returns:
            bytes: The screenshot image data, with any boxes drawn and scaling applied

        Raises:
            RuntimeError: If the server response carries no image data
        """
        response = await self._send_command("screenshot")
        encoded_image = response.get("image_data")
        if not encoded_image:
            raise RuntimeError("Failed to take screenshot, no image data received from server")

        image = decode_base64_image(encoded_image)

        if boxes:
            # The screenshot may have a different resolution than the reported
            # screen size, so box coordinates are rescaled per axis.
            screen = await self.get_screen_size()
            shot_width, shot_height = bytes_to_image(image).size
            x_ratio = shot_width / screen["width"]
            y_ratio = shot_height / screen["height"]

            for box in boxes:
                image = draw_box(
                    image,
                    x=int(box[0] * x_ratio),
                    y=int(box[1] * y_ratio),
                    width=int(box[2] * x_ratio),
                    height=int(box[3] * y_ratio),
                    color=box_color,
                    thickness=box_thickness,
                )

        if scale_factor != 1.0:
            image = resize_image(image, scale_factor)

        return image

    async def get_screen_size(self) -> Dict[str, int]:
        """Return the screen size reported by the server.

        Returns:
            Dict[str, int]: The ``size`` entry of the server response.

        Raises:
            RuntimeError: If the command did not succeed or the response has no
                usable ``size`` entry. The server's ``error`` text is appended
                to the message when present.
        """
        result = await self._send_command("get_screen_size")
        # Use .get so an incomplete response raises RuntimeError like the
        # sibling methods do, instead of leaking a KeyError.
        size = result.get("size")
        if result.get("success", False) and size:
            return size

        message = "Failed to get screen size"
        detail = result.get("error")
        if detail:
            message = f"{message}: {detail}"
        raise RuntimeError(message)

    async def get_cursor_position(self) -> Dict[str, int]:
        """Return the cursor position reported by the server.

        Returns:
            Dict[str, int]: The ``position`` entry of the server response.

        Raises:
            RuntimeError: If the command did not succeed or the response has no
                usable ``position`` entry. The server's ``error`` text is
                appended to the message when present.
        """
        result = await self._send_command("get_cursor_position")
        # Use .get so an incomplete response raises RuntimeError like the
        # sibling methods do, instead of leaking a KeyError.
        position = result.get("position")
        if result.get("success", False) and position:
            return position

        message = "Failed to get cursor position"
        detail = result.get("error")
        if detail:
            message = f"{message}: {detail}"
        raise RuntimeError(message)

    # Clipboard Actions
    async def copy_to_clipboard(self) -> str:
        """Return the clipboard content reported by the server.

        An empty clipboard is returned as an empty string rather than being
        reported as a failure.

        Returns:
            str: The ``content`` entry of the server response.

        Raises:
            RuntimeError: If the command did not succeed or the response has no
                ``content`` entry. The server's ``error`` text is appended to
                the message when present.
        """
        result = await self._send_command("copy_to_clipboard")
        content = result.get("content")
        # `is not None` (not truthiness): "" is a legitimate clipboard value.
        if result.get("success", False) and content is not None:
            return content

        message = "Failed to get clipboard content"
        detail = result.get("error")
        if detail:
            message = f"{message}: {detail}"
        raise RuntimeError(message)

    async def set_clipboard(self, text: str) -> None:
        """Send a ``set_clipboard`` command carrying ``text``.

        The server response is not inspected.

        Args:
            text: Value sent as the ``text`` parameter of the command.
        """
        clipboard_params = {"text": text}
        await self._send_command("set_clipboard", clipboard_params)

    # File Operations
    async def _write_bytes_chunked(self, path: str, content: bytes, append: bool = False, chunk_size: int = 1024 * 1024) -> None:
        """Write ``content`` to ``path`` as a series of ``write_bytes`` commands.

        Args:
            path: Destination path sent with every chunk.
            content: Full payload; it is sliced into pieces of ``chunk_size`` bytes.
            append: Append flag used for the first chunk only; every later
                chunk is sent with append=True so it extends the same file.
            chunk_size: Maximum number of bytes per command (default: 1 MiB).

        Raises:
            RuntimeError: If the server reports a failure for any chunk.
        """
        content_length = len(content)
        position = 0

        while position < content_length:
            next_position = min(position + chunk_size, content_length)

            if position == 0:
                # Only the very first chunk honours the caller's append flag.
                append_flag = append
            else:
                append_flag = True

            chunk_request = {
                "path": path,
                "content_b64": encode_base64_image(content[position:next_position]),
                "append": append_flag
            }
            response = await self._send_command("write_bytes", chunk_request)

            if not response.get("success", False):
                raise RuntimeError(response.get("error", "Failed to write file chunk"))

            position = next_position

    async def write_bytes(self, path: str, content: bytes, append: bool = False) -> None:
        """Write ``content`` to ``path`` on the server.

        Payloads larger than 5MB are delegated to ``_write_bytes_chunked``;
        smaller ones are sent in a single ``write_bytes`` command.

        Args:
            path: Destination path.
            content: Bytes to write.
            append: Whether to append instead of overwriting.

        Raises:
            RuntimeError: If the server reports a failure.
        """
        chunking_threshold = 5 * 1024 * 1024  # 5MB
        if len(content) > chunking_threshold:
            await self._write_bytes_chunked(path, content, append)
            return

        request = {"path": path, "content_b64": encode_base64_image(content), "append": append}
        response = await self._send_command("write_bytes", request)
        if not response.get("success", False):
            raise RuntimeError(response.get("error", "Failed to write file"))

    async def _read_bytes_chunked(self, path: str, offset: int, total_length: int, chunk_size: int = 1024 * 1024) -> bytes:
        """Read ``total_length`` bytes from ``path`` through repeated ``read_bytes`` commands.

        Args:
            path: File to read.
            offset: Byte offset of the first request.
            total_length: Total number of bytes to request.
            chunk_size: Maximum number of bytes per request (default: 1 MiB).

        Returns:
            bytes: The decoded chunks concatenated in request order.

        Raises:
            RuntimeError: If the server reports a failure for any chunk.
        """
        pieces = []
        position = offset
        bytes_left = total_length

        while bytes_left > 0:
            request_length = min(chunk_size, bytes_left)
            request = {
                "path": path,
                "offset": position,
                "length": request_length
            }
            response = await self._send_command("read_bytes", request)

            if not response.get("success", False):
                raise RuntimeError(response.get("error", "Failed to read file chunk"))

            pieces.append(decode_base64_image(response.get("content_b64", "")))

            # Bookkeeping advances by the requested size, not by the number of
            # bytes actually returned.
            position += request_length
            bytes_left -= request_length

        return b''.join(pieces)

    async def read_bytes(self, path: str, offset: int = 0, length: Optional[int] = None) -> bytes:
        """Read bytes from ``path`` on the server.

        When ``length`` is omitted the file size is queried first, and files
        larger than 5MB are fetched through ``_read_bytes_chunked``. In every
        other case a single ``read_bytes`` command is sent.

        Args:
            path: File to read.
            offset: Byte offset to start reading from (default: 0).
            length: Number of bytes to read, or None to let the server decide.

        Returns:
            bytes: The decoded file content.

        Raises:
            RuntimeError: If the server reports a failure.
        """
        if length is None:
            # Only reads without an explicit length are considered for chunking.
            size_on_server = await self.get_file_size(path)
            chunking_threshold = 5 * 1024 * 1024  # 5MB
            if size_on_server > chunking_threshold:
                bytes_to_read = size_on_server - offset if offset > 0 else size_on_server
                return await self._read_bytes_chunked(path, offset, bytes_to_read)

        request = {
            "path": path,
            "offset": offset,
            "length": length
        }
        response = await self._send_command("read_bytes", request)
        if not response.get("success", False):
            raise RuntimeError(response.get("error", "Failed to read file"))
        return decode_base64_image(response.get("content_b64", ""))

    async def read_text(self, path: str, encoding: str = 'utf-8') -> str:
        """Read a file through ``read_bytes`` and decode it as text.

        Args:
            path: Path to the file to read
            encoding: Text encoding used for decoding (default: 'utf-8')

        Returns:
            str: The decoded text content of the file
        """
        return (await self.read_bytes(path)).decode(encoding)

    async def write_text(self, path: str, content: str, encoding: str = 'utf-8', append: bool = False) -> None:
        """Encode text and write it to a file through ``write_bytes``.

        Args:
            path: Path to the file to write
            content: Text content to write
            encoding: Text encoding used for encoding (default: 'utf-8')
            append: Whether to append to the file instead of overwriting
        """
        await self.write_bytes(path, content.encode(encoding), append)

    async def get_file_size(self, path: str) -> int:
        """Return the ``size`` the server reports for ``path`` (0 if absent from the response).

        Raises:
            RuntimeError: If the server reports a failure.
        """
        response = await self._send_command("get_file_size", {"path": path})
        if response.get("success", False):
            return response.get("size", 0)
        raise RuntimeError(response.get("error", "Failed to get file size"))

    async def file_exists(self, path: str) -> bool:
        """Return the ``exists`` entry of the server's ``file_exists`` response (False if absent)."""
        response = await self._send_command("file_exists", {"path": path})
        exists = response.get("exists", False)
        return exists

    async def directory_exists(self, path: str) -> bool:
        """Return the ``exists`` entry of the server's ``directory_exists`` response (False if absent)."""
        response = await self._send_command("directory_exists", {"path": path})
        exists = response.get("exists", False)
        return exists

    async def create_dir(self, path: str) -> None:
        """Send a ``create_dir`` command for ``path``.

        Raises:
            RuntimeError: If the server reports a failure.
        """
        response = await self._send_command("create_dir", {"path": path})
        if response.get("success", False):
            return
        raise RuntimeError(response.get("error", "Failed to create directory"))

    async def delete_file(self, path: str) -> None:
        """Send a ``delete_file`` command for ``path``.

        Raises:
            RuntimeError: If the server reports a failure.
        """
        response = await self._send_command("delete_file", {"path": path})
        if response.get("success", False):
            return
        raise RuntimeError(response.get("error", "Failed to delete file"))

    async def delete_dir(self, path: str) -> None:
        """Send a ``delete_dir`` command for ``path``.

        Raises:
            RuntimeError: If the server reports a failure.
        """
        response = await self._send_command("delete_dir", {"path": path})
        if response.get("success", False):
            return
        raise RuntimeError(response.get("error", "Failed to delete directory"))

    async def list_dir(self, path: str) -> list[str]:
        """Return the ``files`` entry of the server's ``list_dir`` response (empty list if absent).

        Raises:
            RuntimeError: If the server reports a failure.
        """
        response = await self._send_command("list_dir", {"path": path})
        if response.get("success", False):
            return response.get("files", [])
        raise RuntimeError(response.get("error", "Failed to list directory"))

    # Command execution
    async def run_command(self, command: str) -> CommandResult:
        """Send a ``run_command`` request and wrap the reply in a CommandResult.

        Missing ``stdout``/``stderr`` entries default to empty strings and a
        missing ``return_code`` defaults to 0.

        Raises:
            RuntimeError: If the server reports a failure.
        """
        response = await self._send_command("run_command", {"command": command})
        if response.get("success", False):
            return CommandResult(
                stdout=response.get("stdout", ""),
                stderr=response.get("stderr", ""),
                returncode=response.get("return_code", 0)
            )
        raise RuntimeError(response.get("error", "Failed to run command"))

    # Accessibility Actions
    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Request the accessibility tree and return the full server response.

        Raises:
            RuntimeError: If the server reports a failure.
        """
        response = await self._send_command("get_accessibility_tree")
        if response.get("success", False):
            return response
        raise RuntimeError(response.get("error", "Failed to get accessibility tree"))
    
    async def get_active_window_bounds(self) -> Dict[str, int]:
        """Get the bounds of the currently active window.

        Returns:
            Dict[str, int]: The ``bounds`` entry of the server response.

        Raises:
            RuntimeError: If the command did not succeed or the response has no
                usable ``bounds`` entry. The server's ``error`` text is appended
                to the message when present.
        """
        result = await self._send_command("get_active_window_bounds")
        # Use .get so an incomplete response raises RuntimeError like the
        # sibling methods do, instead of leaking a KeyError.
        bounds = result.get("bounds")
        if result.get("success", False) and bounds:
            return bounds

        message = "Failed to get active window bounds"
        detail = result.get("error")
        if detail:
            message = f"{message}: {detail}"
        raise RuntimeError(message)

    async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
        """Map a point from screenshot space into screen space.

        A fresh screen size and a fresh screenshot are requested on every call
        to derive the per-axis scaling factors.

        Args:
            x: X coordinate in screenshot space
            y: Y coordinate in screenshot space

        Returns:
            tuple[float, float]: (x, y) coordinates in screen space
        """
        screen = await self.get_screen_size()
        shot_width, shot_height = bytes_to_image(await self.screenshot()).size

        # screen pixels per screenshot pixel, per axis
        x_ratio = screen["width"] / shot_width
        y_ratio = screen["height"] / shot_height

        return x * x_ratio, y * y_ratio

    async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
        """Map a point from screen space into screenshot space.

        A fresh screen size and a fresh screenshot are requested on every call
        to derive the per-axis scaling factors.

        Args:
            x: X coordinate in screen space
            y: Y coordinate in screen space

        Returns:
            tuple[float, float]: (x, y) coordinates in screenshot space
        """
        screen = await self.get_screen_size()
        shot_width, shot_height = bytes_to_image(await self.screenshot()).size

        # screenshot pixels per screen pixel, per axis
        x_ratio = shot_width / screen["width"]
        y_ratio = shot_height / screen["height"]

        return x * x_ratio, y * y_ratio

    # Websocket Methods
    async def _keep_alive(self):
        """Keep the WebSocket connection alive with automatic reconnection.

        Runs until ``self._closed`` is set. Each pass of the loop either
        (re)connects when there is no socket or the socket is closed, or pings
        an open socket once ``self._ping_interval`` seconds have passed since
        the last successful ping. When both ``self.api_key`` and
        ``self.vm_name`` are set, an ``authenticate`` command is sent right
        after connecting and its response is checked.

        Failed connection attempts are retried with exponential backoff
        (``self._reconnect_delay`` doubles up to ``self._max_reconnect_delay``),
        and most retry logging is demoted to DEBUG to avoid flooding the logs.
        """
        retry_count = 0
        # NOTE(review): max_log_attempts is assigned but never read in this method.
        max_log_attempts = 1  # Only log the first attempt at INFO level
        log_interval = 500  # Then log every 500th attempt (significantly increased from 30)
        last_warning_time = 0
        min_warning_interval = 30  # Minimum seconds between connection lost warnings
        min_retry_delay = 0.5  # Minimum delay between connection attempts (500ms)

        while not self._closed:
            try:
                # (Re)connect when there is no socket yet or the current one is closed
                if self._ws is None or (
                    self._ws and self._ws.state == websockets.protocol.State.CLOSED
                ):
                    try:
                        retry_count += 1

                        # Add a minimum delay between connection attempts to avoid flooding
                        if retry_count > 1:
                            await asyncio.sleep(min_retry_delay)

                        # Only log the first attempt at INFO level, then every Nth attempt
                        if retry_count == 1:
                            self.logger.info(f"Attempting WebSocket connection to {self.ws_uri}")
                        elif retry_count % log_interval == 0:
                            self.logger.info(
                                f"Still attempting WebSocket connection (attempt {retry_count})..."
                            )
                        else:
                            # All other attempts are logged at DEBUG level
                            self.logger.debug(
                                f"Attempting WebSocket connection to {self.ws_uri} (attempt {retry_count})"
                            )

                        # The whole connect call is bounded by a 120 second timeout
                        self._ws = await asyncio.wait_for(
                            websockets.connect(
                                self.ws_uri,
                                max_size=1024 * 1024 * 10,  # 10MB limit
                                max_queue=32,
                                ping_interval=self._ping_interval,
                                ping_timeout=self._ping_timeout,
                                close_timeout=5,
                                compression=None,  # Disable compression to reduce overhead
                            ),
                            timeout=120,
                        )
                        self.logger.info("WebSocket connection established")

                        # If api_key and vm_name are provided, perform authentication handshake
                        if self.api_key and self.vm_name:
                            self.logger.info("Performing authentication handshake...")
                            auth_message = {
                                "command": "authenticate",
                                "params": {
                                    "api_key": self.api_key,
                                    "container_name": self.vm_name
                                }
                            }
                            await self._ws.send(json.dumps(auth_message))

                            # Wait for authentication response
                            # (the receive is serialised through the same lock that
                            # _send_command_ws uses for its own receives)
                            async with self._recv_lock:
                                auth_response = await asyncio.wait_for(self._ws.recv(), timeout=10)
                            auth_result = json.loads(auth_response)

                            if not auth_result.get("success"):
                                error_msg = auth_result.get("error", "Authentication failed")
                                self.logger.error(f"Authentication failed: {error_msg}")
                                await self._ws.close()
                                self._ws = None
                                # NOTE(review): ConnectionError is not listed in the inner
                                # except clause below, so this appears to be handled by the
                                # outer `except Exception` as a "connection lost" event,
                                # after which the loop tries again — confirm this is intended.
                                raise ConnectionError(f"Authentication failed: {error_msg}")

                            self.logger.info("Authentication successful")

                        self._reconnect_delay = 1  # Reset reconnect delay on successful connection
                        self._last_ping = time.time()
                        retry_count = 0  # Reset retry count on successful connection
                    except (asyncio.TimeoutError, websockets.exceptions.WebSocketException) as e:
                        # NOTE(review): next_retry is assigned but never read in this method.
                        next_retry = self._reconnect_delay

                        # Only log the first error at WARNING level, then every Nth attempt
                        if retry_count == 1:
                            self.logger.warning(
                                f"Computer API Server not ready yet. Will retry automatically."
                            )
                        elif retry_count % log_interval == 0:
                            self.logger.warning(
                                f"Still waiting for Computer API Server (attempt {retry_count})..."
                            )
                        else:
                            # All other errors are logged at DEBUG level
                            self.logger.debug(f"Connection attempt {retry_count} failed: {e}")

                        # Best-effort close of a half-open socket; any error raised
                        # while closing is ignored by the bare except.
                        if self._ws:
                            try:
                                await self._ws.close()
                            except:
                                pass
                        self._ws = None

                        # Use exponential backoff for connection retries
                        await asyncio.sleep(self._reconnect_delay)
                        self._reconnect_delay = min(
                            self._reconnect_delay * 2, self._max_reconnect_delay
                        )
                        continue

                # Regular ping to check connection
                if self._ws and self._ws.state == websockets.protocol.State.OPEN:
                    try:
                        if time.time() - self._last_ping >= self._ping_interval:
                            pong_waiter = await self._ws.ping()
                            await asyncio.wait_for(pong_waiter, timeout=self._ping_timeout)
                            self._last_ping = time.time()
                    except Exception as e:
                        # A failed ping drops the socket so the next pass reconnects
                        self.logger.debug(f"Ping failed: {e}")
                        if self._ws:
                            try:
                                await self._ws.close()
                            except:
                                pass
                        self._ws = None
                        continue

                # Idle for one second between passes of the loop
                await asyncio.sleep(1)

            except Exception as e:
                current_time = time.time()
                # Only log connection lost warnings at most once every min_warning_interval seconds
                if current_time - last_warning_time >= min_warning_interval:
                    self.logger.warning(
                        f"Computer API Server connection lost. Will retry automatically."
                    )
                    last_warning_time = current_time
                else:
                    # Log at debug level instead
                    self.logger.debug(f"Connection lost: {e}")

                # Drop the socket (best-effort close) so the next pass reconnects
                if self._ws:
                    try:
                        await self._ws.close()
                    except:
                        pass
                self._ws = None
    
    async def _ensure_connection(self):
        """Wait briefly for an open WebSocket, starting the keep-alive task if needed.

        The connection state is checked up to five times, sleeping one second
        after each unsuccessful check.

        Raises:
            ConnectionError: If no open connection is observed within the retries.
        """
        keep_alive_task = self._reconnect_task
        if keep_alive_task is None or keep_alive_task.done():
            self._reconnect_task = asyncio.create_task(self._keep_alive())

        attempts = 0
        attempt_limit = 5

        while attempts < attempt_limit:
            try:
                socket = self._ws
                if socket and socket.state == websockets.protocol.State.OPEN:
                    return
                attempts += 1
                await asyncio.sleep(1)
            except Exception as e:
                # Only log at ERROR level for the last retry attempt
                on_last_attempt = attempts == attempt_limit - 1
                if on_last_attempt:
                    self.logger.error(
                        f"Persistent connection check error after {attempts} attempts: {e}"
                    )
                else:
                    self.logger.debug(f"Connection check error (attempt {attempts}): {e}")
                attempts += 1
                await asyncio.sleep(1)
                continue

        raise ConnectionError("Failed to establish WebSocket connection after multiple retries")

    async def _send_command_ws(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """Send command through WebSocket."""
        max_retries = 3
        retry_count = 0
        last_error = None

        # Acquire lock to ensure only one command is processed at a time
        self.logger.debug(f"Acquired lock for command: {command}")
        while retry_count < max_retries:
            try:
                await self._ensure_connection()
                if not self._ws:
                    raise ConnectionError("WebSocket connection is not established")

                message = {"command": command, "params": params or {}}
                await self._ws.send(json.dumps(message))
                async with self._recv_lock:
                    response = await asyncio.wait_for(self._ws.recv(), timeout=120)
                self.logger.debug(f"Completed command: {command}")
                return json.loads(response)
            except Exception as e:
                last_error = e
                retry_count += 1
                if retry_count < max_retries:
                    # Only log at debug level for intermediate retries
                    self.logger.debug(
                        f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}"
                    )
                    await asyncio.sleep(1)
                    continue
                else:
                    # Only log at error level for the final failure
                    self.logger.error(
                        f"Failed to send command '{command}' after {max_retries} retries"
                    )
                    self.logger.debug(f"Command failure details: {e}")
                    raise

        raise last_error if last_error else RuntimeError("Failed to send command")

    async def _send_command_rest(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """Send command through REST API without retries or connection management.

        The command is POSTed as JSON to ``self.rest_uri``. The response body
        is expected to start with ``"data: "`` followed by a JSON document.
        This method does not raise: every failure is reported as a dict with
        ``success`` set to False.

        Returns:
            Dict[str, Any]: The decoded JSON payload, or an error dict whose
            ``error`` is "Server returned malformed response" (unexpected body)
            or "Request failed" (any exception while sending or reading).
        """

        def _malformed(raw_body):
            # Shared shape for "the server answered, but not in the expected format"
            return {
                "success": False,
                "error": "Server returned malformed response",
                "message": raw_body
            }

        try:
            request_body = {"command": command, "params": params or {}}

            # Credentials are attached only when configured
            request_headers = {"Content-Type": "application/json"}
            if self.api_key:
                request_headers["X-API-Key"] = self.api_key
            if self.vm_name:
                request_headers["X-Container-Name"] = self.vm_name

            async with aiohttp.ClientSession() as session, session.post(
                self.rest_uri,
                json=request_body,
                headers=request_headers
            ) as response:
                body = (await response.text()).strip()

                marker = "data: "
                if not body.startswith(marker):
                    return _malformed(body)

                try:
                    return json.loads(body[len(marker):])
                except json.JSONDecodeError:
                    return _malformed(body)

        except Exception as e:
            return {
                "success": False,
                "error": "Request failed",
                "message": str(e)
            }

    async def _send_command(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """Send command using REST API with WebSocket fallback."""
        # Try REST API first
        result = await self._send_command_rest(command, params)
        
        # If REST failed with "Request failed", try WebSocket as fallback
        if not result.get("success", True) and (result.get("error") == "Request failed" or result.get("error") == "Server returned malformed response"):
            self.logger.warning(f"REST API failed for command '{command}', trying WebSocket fallback")
            try:
                return await self._send_command_ws(command, params)
            except Exception as e:
                self.logger.error(f"WebSocket fallback also failed: {e}")
                # Return the original REST error
                return result
        
        return result

    async def wait_for_ready(self, timeout: int = 60, interval: float = 1.0):
        """Wait for Computer API Server to be ready.

        The REST endpoint is probed once with a ``version`` command. If that
        probe fails, readiness is delegated entirely to ``_wait_for_ready_ws``.
        Otherwise ``get_screen_size`` is polled through ``_send_command`` until
        it succeeds or ``timeout`` seconds elapse.

        Args:
            timeout: Maximum number of seconds to wait.
            interval: Seconds to sleep between polling attempts.

        Raises:
            TimeoutError: If the server does not answer successfully in time.
            RuntimeError: If an unexpected error occurs while waiting.
        """

        # Check if REST API is available.
        # An explicit check is used instead of `assert`: assertions are removed
        # under `python -O`, which would silently disable the WebSocket fallback.
        try:
            result = await self._send_command_rest("version", {})
            if not result.get("success", True):
                raise RuntimeError(result.get("error") or "REST API reported failure")
        except Exception as e:
            self.logger.debug(f"REST API failed for command 'version', trying WebSocket fallback: {e}")
            try:
                await self._wait_for_ready_ws(timeout, interval)
                return
            except Exception as ws_error:
                self.logger.debug(f"WebSocket fallback also failed: {ws_error}")
                raise

        start_time = time.time()
        last_error = None
        attempt_count = 0
        progress_interval = 10  # Log progress every 10 seconds
        last_progress_time = start_time

        try:
            self.logger.info(
                f"Waiting for Computer API Server to be ready (timeout: {timeout}s)..."
            )

            # Wait for the server to respond to get_screen_size command
            while time.time() - start_time < timeout:
                try:
                    attempt_count += 1
                    current_time = time.time()

                    # Log progress periodically without flooding logs
                    if current_time - last_progress_time >= progress_interval:
                        elapsed = current_time - start_time
                        self.logger.info(
                            f"Still waiting for Computer API Server... (elapsed: {elapsed:.1f}s, attempts: {attempt_count})"
                        )
                        last_progress_time = current_time

                    # Test the server with a simple get_screen_size command
                    result = await self._send_command("get_screen_size")
                    if result.get("success", False):
                        elapsed = time.time() - start_time
                        self.logger.info(
                            f"Computer API Server is ready (after {elapsed:.1f}s, {attempt_count} attempts)"
                        )
                        return  # Server is ready
                    else:
                        last_error = result.get("error", "Unknown error")
                        self.logger.debug(f"Initial connection command failed: {last_error}")

                except Exception as e:
                    last_error = e
                    self.logger.debug(f"Connection attempt {attempt_count} failed: {e}")

                # Wait before trying again
                await asyncio.sleep(interval)

            # If we get here, we've timed out
            error_msg = f"Could not connect to {self.ip_address} after {timeout} seconds"
            if last_error:
                error_msg += f": {str(last_error)}"
            self.logger.error(error_msg)
            raise TimeoutError(error_msg)

        except TimeoutError:
            # Timeouts are part of the documented contract; pass them through as-is
            raise
        except Exception as e:
            error_msg = f"Error while waiting for server: {str(e)}"
            self.logger.error(error_msg)
            raise RuntimeError(error_msg) from e

    async def _wait_for_ready_ws(self, timeout: int = 60, interval: float = 1.0):
        """Wait for WebSocket connection to become available."""
        start_time = time.time()
        last_error = None
        attempt_count = 0
        progress_interval = 10  # Log progress every 10 seconds
        last_progress_time = start_time

        # Disable detailed logging for connection attempts
        self._log_connection_attempts = False

        try:
            self.logger.info(
                f"Waiting for Computer API Server to be ready (timeout: {timeout}s)..."
            )

            # Start the keep-alive task if it's not already running
            if self._reconnect_task is None or self._reconnect_task.done():
                self._reconnect_task = asyncio.create_task(self._keep_alive())

            # Wait for the connection to be established
            while time.time() - start_time < timeout:
                try:
                    attempt_count += 1
                    current_time = time.time()

                    # Log progress periodically without flooding logs
                    if current_time - last_progress_time >= progress_interval:
                        elapsed = current_time - start_time
                        self.logger.info(
                            f"Still waiting for Computer API Server... (elapsed: {elapsed:.1f}s, attempts: {attempt_count})"
                        )
                        last_progress_time = current_time

                    # Check if we have a connection
                    if self._ws and self._ws.state == websockets.protocol.State.OPEN:
                        # Test the connection with a simple command
                        try:
                            await self._send_command_ws("get_screen_size")
                            elapsed = time.time() - start_time
                            self.logger.info(
                                f"Computer API Server is ready (after {elapsed:.1f}s, {attempt_count} attempts)"
                            )
                            return  # Connection is fully working
                        except Exception as e:
                            last_error = e
                            self.logger.debug(f"Connection test failed: {e}")

                    # Wait before trying again
                    await asyncio.sleep(interval)

                except Exception as e:
                    last_error = e
                    self.logger.debug(f"Connection attempt {attempt_count} failed: {e}")
                    await asyncio.sleep(interval)

            # If we get here, we've timed out
            error_msg = f"Could not connect to {self.ip_address} after {timeout} seconds"
            if last_error:
                error_msg += f": {str(last_error)}"
            self.logger.error(error_msg)
            raise TimeoutError(error_msg)
        finally:
            # Reset to default logging behavior
            self._log_connection_attempts = False

    def close(self):
        """Stop the background reconnect task without closing the WebSocket.

        Note: In host computer server mode, we leave the connection open
        to allow other clients to connect to the same server. The server
        will handle cleaning up idle connections.
        """
        reconnect_task = self._reconnect_task
        if reconnect_task:
            reconnect_task.cancel()

        # Deliberately nothing else: the closed flag stays unset and the socket
        # stays open so the server remains usable by other clients. Use
        # force_close() for a full shutdown.
    
    def force_close(self):
        """Shut the connection down completely.

        Marks the interface as closed, cancels the reconnect task, and
        schedules the WebSocket (if any) to be closed before dropping the
        reference to it. Use this for a full shutdown rather than routine
        cleanup.
        """
        self._closed = True

        reconnect_task = self._reconnect_task
        if reconnect_task:
            reconnect_task.cancel()

        socket = self._ws
        if socket:
            # close() is a coroutine; it is scheduled rather than awaited
            # because this method is synchronous.
            asyncio.create_task(socket.close())
            self._ws = None


```

--------------------------------------------------------------------------------
/libs/python/computer/computer/providers/lumier/provider.py:
--------------------------------------------------------------------------------

```python
"""
Lumier VM provider implementation.

This provider uses Docker containers running the Lumier image to create
macOS and Linux VMs. It handles VM lifecycle operations through Docker
commands and container management.
"""

import logging
import os
import json
import asyncio
from typing import Dict, List, Optional, Any
import subprocess
import time
import re

from ..base import BaseVMProvider, VMProviderType
from ..lume_api import (
    lume_api_get,
    lume_api_run,
    lume_api_stop,
    lume_api_update
)

# Module-level logger for this provider
logger = logging.getLogger(__name__)

# Probe for a usable Docker CLI once at import time. HAS_LUMIER is True only
# when `docker --version` runs and exits successfully.
try:
    subprocess.run(["docker", "--version"], capture_output=True, check=True)
except (subprocess.SubprocessError, FileNotFoundError):
    HAS_LUMIER = False
else:
    HAS_LUMIER = True


class LumierProvider(BaseVMProvider):
    """
    Lumier VM Provider implementation using Docker containers.
    
    This provider uses Docker to run Lumier containers that can create
    macOS and Linux VMs through containerization.
    """
    
    def __init__(
        self, 
        port: Optional[int] = 7777,
        host: str = "localhost",
        storage: Optional[str] = None,  # Can be a path or 'ephemeral'
        shared_path: Optional[str] = None,
        image: str = "macos-sequoia-cua:latest",  # VM image to use
        verbose: bool = False,
        ephemeral: bool = False,
        noVNC_port: Optional[int] = 8006,
    ):
        """Initialize the Lumier VM Provider.
        
        Args:
            port: Port for the API server (default: 7777)
            host: Hostname for the API server (default: localhost)
            storage: Path for persistent VM storage
            shared_path: Path for shared folder between host and VM
            image: VM image to use (e.g. "macos-sequoia-cua:latest")
            verbose: Enable verbose logging
            ephemeral: Use ephemeral (temporary) storage
            noVNC_port: Specific port for noVNC interface (default: 8006)
        """
        self.host = host
        # Always ensure api_port has a valid value (7777 is the default)
        self.api_port = 7777 if port is None else port
        self.vnc_port = noVNC_port  # User-specified noVNC port, will be set in run_vm if provided
        self.ephemeral = ephemeral
        
        # Handle ephemeral storage (temporary directory)
        if ephemeral:
            self.storage = "ephemeral"
        else:
            self.storage = storage
            
        self.shared_path = shared_path
        self.image = image  # Store the VM image name to use
        # The container_name will be set in run_vm using the VM name
        self.verbose = verbose
        self._container_id = None
        self._api_url = None  # Will be set after container starts
        
    @property
    def provider_type(self) -> VMProviderType:
        """Return the provider type of this provider, ``VMProviderType.LUMIER``."""
        return VMProviderType.LUMIER
    
    def _parse_memory(self, memory_str: str) -> int:
        """Parse memory string to MB integer.
        
        Examples:
            "8GB" -> 8192
            "1024MB" -> 1024
            "512" -> 512
        """
        if isinstance(memory_str, int):
            return memory_str
            
        if isinstance(memory_str, str):
            # Extract number and unit
            match = re.match(r"(\d+)([A-Za-z]*)", memory_str)
            if match:
                value, unit = match.groups()
                value = int(value)
                unit = unit.upper()
                
                if unit == "GB" or unit == "G":
                    return value * 1024
                elif unit == "MB" or unit == "M" or unit == "":
                    return value
                    
        # Default fallback
        logger.warning(f"Could not parse memory string '{memory_str}', using 8GB default")
        return 8192  # Default to 8GB
    
    # Helper methods for interacting with the Lumier API through curl
    # These methods handle the various VM operations via API calls
    
    def _get_curl_error_message(self, return_code: int) -> str:
        """Get a descriptive error message for curl return codes.
        
        Args:
            return_code: The curl return code
            
        Returns:
            A descriptive error message
        """
        # Map common curl error codes to helpful messages
        if return_code == 7:
            return "Failed to connect - API server is starting up"
        elif return_code == 22:
            return "HTTP error returned from API server"
        elif return_code == 28:
            return "Operation timeout - API server is slow to respond"
        elif return_code == 52:
            return "Empty reply from server - API is starting but not ready"
        elif return_code == 56:
            return "Network problem during data transfer"
        else:
            return f"Unknown curl error code: {return_code}"

    
    async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Look up the current state of a VM and of its Lumier container.

        The container is located with ``docker ps``; only when it is running
        are the VM details fetched through ``lume_api_get``.

        Args:
            name: Name of the VM (also used as the Docker container name filter)
            storage: Optional storage path override. When ``None`` the
                provider's own storage setting is sent to the API instead.

        Returns:
            A dictionary that always carries ``name`` and ``status``. The
            status is ``"unavailable"`` when Docker is missing, ``"not_found"``
            or ``"stopped"`` depending on the container, ``"running"`` with
            ``api_status == "error"`` when the API reported an error, and
            ``"error"`` when the Docker check itself failed. Otherwise the
            status reported by the API is returned together with every field
            of the API response.
        """
        if not HAS_LUMIER:
            logger.error("Docker is not available. Cannot get VM status.")
            return {
                "name": name,
                "status": "unavailable",
                "error": "Docker is not available"
            }

        # Remember which container later API requests refer to
        self.container_name = name

        try:
            # Ask Docker for the status line of the (possibly stopped) container
            ps_result = subprocess.run(
                ["docker", "ps", "-a", "--filter", f"name={name}", "--format", "{{.Status}}"],
                capture_output=True,
                text=True,
            )
            docker_state = ps_result.stdout.strip()

            # No output at all: the container has never been created
            if not docker_state:
                logger.info(f"Container {name} does not exist. Will create when run_vm is called.")
                return {
                    "name": name,
                    "status": "not_found",
                    "message": "Container doesn't exist yet"
                }

            # A status that does not start with "Up" means the container is not running
            if not docker_state.startswith("Up"):
                logger.info(f"Container {name} exists but is not running. Status: {docker_state}")
                return {
                    "name": name,
                    "status": "stopped",
                    "container_status": docker_state,
                }

            # Running container: the VM details come from the Lumier API
            logger.info(f"Container {name} is running. Getting VM status from API.")
            vm_info = lume_api_get(
                vm_name=name,
                host=self.host,
                port=self.api_port,
                storage=storage if storage is not None else self.storage,
                debug=self.verbose,
                verbose=self.verbose
            )

            if "error" in vm_info:
                # Debug level on purpose: this path is hit repeatedly while polling
                logger.debug(f"API request error: {vm_info['error']}")
                return {
                    "name": name,
                    "status": "running",  # The container runs even though the API does not answer
                    "api_status": "error",
                    "error": vm_info["error"],
                    "container_status": docker_state
                }

            vm_status = vm_info.get("status", "unknown")
            vnc_url = vm_info.get("vncUrl", "")
            ip_address = vm_info.get("ipAddress", "")

            # A running VM must always come back with an address callers can
            # connect to, so fall back to localhost (consistent with LumeProvider).
            if not ip_address and vm_status == "running":
                ip_address = "127.0.0.1"
                logger.info(f"No IP address returned from API, defaulting to {ip_address}")
                vm_info["ipAddress"] = ip_address

            logger.info(f"VM {name} status: {vm_status}")

            if ip_address and vnc_url:
                logger.info(f"VM {name} has IP: {ip_address} and VNC URL: {vnc_url}")
            elif not (ip_address or vnc_url) and vm_status != "running":
                # Nothing to report yet, which is expected before the VM is up
                logger.info(f"VM {name} is not running yet. Status: {vm_status}")
            else:
                # Some expected field is missing; unusual, but the default IP covers it
                logger.warning(f"VM {name} is running but missing expected fields. API response: {vm_info}")

            # Normalised fields first, then every raw field of the API response
            summary = {
                "name": name,
                "status": vm_status,
                "ip_address": ip_address,
                "vnc_url": vnc_url,
                "api_status": "ok",
                "container_status": docker_state,
            }
            summary.update(vm_info)
            return summary
        except subprocess.SubprocessError as e:
            logger.error(f"Failed to check container status: {e}")
            return {
                "name": name,
                "status": "error",
                "error": f"Failed to check container status: {str(e)}"
            }
    
    async def list_vms(self) -> List[Dict[str, Any]]:
        """List all VMs managed by this provider.
        
        For Lumier provider, there is only one VM per container.
        """
        try:
            status = await self.get_vm("default")
            return [status] if status.get("status") != "unknown" else []
        except Exception as e:
            logger.error(f"Failed to list VMs: {e}")
            return []
    
    async def run_vm(self, image: str, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
        """Start a Lumier container for the VM and poll until the VM is ready.

        Any existing container matching the name is force-removed first. The
        container is then started detached with ``docker run`` and ``get_vm``
        is polled until the VM reports status ``running`` together with an IP
        address. Polling also stops once at least 150 polls have been made and
        the latest poll came back without an API error; the last status is
        returned in that case.

        Args:
            image: Name/tag of the image to use. Not read by this method: the
                VM image configured on the provider (``self.image``) is what
                gets passed to the container.
            name: Name of the VM to run (used as container name and VM name)
            run_opts: Options for running the VM, including:
                - cpu: Number of CPU cores (default: 4)
                - memory: Amount of memory (e.g. "8GB", the default)
            storage: Not read by this method: the provider's own storage
                setting decides whether a storage volume is mounted.

        Returns:
            Dictionary with VM status information as produced by ``get_vm``

        Raises:
            RuntimeError: If the container could not be started, including the
                case where the Lumier image could not be pulled because of a
                network problem.
        """
        # Set the container name using the VM name for consistency
        self.container_name = name
        try:
            # First, check if container already exists and remove it
            try:
                check_cmd = ["docker", "ps", "-a", "--filter", f"name={self.container_name}", "--format", "{{.ID}}"]
                check_result = subprocess.run(check_cmd, capture_output=True, text=True)
                existing_container = check_result.stdout.strip()

                if existing_container:
                    logger.info(f"Removing existing container: {self.container_name}")
                    remove_cmd = ["docker", "rm", "-f", self.container_name]
                    subprocess.run(remove_cmd, check=True)
            except subprocess.CalledProcessError as e:
                logger.warning(f"Error removing existing container: {e}")
                # Continue anyway, next steps will fail if there's a real problem

            # Prepare the Docker run command
            cmd = ["docker", "run", "-d", "--name", self.container_name]

            # Publish the noVNC port only when one is configured. Formatting a
            # None port would hand Docker the invalid mapping "None:8006".
            if self.vnc_port is not None:
                cmd.extend(["-p", f"{self.vnc_port}:8006"])
                logger.debug(f"Using specified noVNC_port: {self.vnc_port}")

            # Set API URL using the API port
            self._api_url = f"http://{self.host}:{self.api_port}"

            # Parse memory setting
            memory_mb = self._parse_memory(run_opts.get("memory", "8GB"))

            # Add storage volume mount if storage is specified (for persistent VM storage)
            if self.storage and self.storage != "ephemeral":
                # Create storage directory if it doesn't exist
                storage_dir = os.path.abspath(os.path.expanduser(self.storage or ""))
                os.makedirs(storage_dir, exist_ok=True)

                # Add volume mount for storage
                cmd.extend([
                    "-v", f"{storage_dir}:/storage",
                    "-e", f"HOST_STORAGE_PATH={storage_dir}"
                ])
                logger.debug(f"Using persistent storage at: {storage_dir}")

            # Add shared folder volume mount if shared_path is specified
            if self.shared_path:
                # Create shared directory if it doesn't exist
                shared_dir = os.path.abspath(os.path.expanduser(self.shared_path or ""))
                os.makedirs(shared_dir, exist_ok=True)

                # Add volume mount for shared folder
                cmd.extend([
                    "-v", f"{shared_dir}:/shared",
                    "-e", f"HOST_SHARED_PATH={shared_dir}"
                ])
                logger.debug(f"Using shared folder at: {shared_dir}")

            # Add environment variables
            # Always use the container_name as the VM_NAME for consistency
            # Use the VM image configured on the provider
            logger.debug(f"Using VM image: {self.image}")

            # If ghcr.io is in the image, use the full image name
            if "ghcr.io" in self.image:
                vm_image = self.image
            else:
                vm_image = f"ghcr.io/trycua/{self.image}"

            cmd.extend([
                "-e", f"VM_NAME={self.container_name}",
                "-e", f"VERSION={vm_image}",
                "-e", f"CPU_CORES={run_opts.get('cpu', '4')}",
                "-e", f"RAM_SIZE={memory_mb}",
            ])

            # Specify the Lumier image with the full image name
            lumier_image = "trycua/lumier:latest"

            # First check if the image exists locally
            try:
                logger.debug(f"Checking if Docker image {lumier_image} exists locally...")
                check_image_cmd = ["docker", "image", "inspect", lumier_image]
                subprocess.run(check_image_cmd, capture_output=True, check=True)
                logger.debug(f"Docker image {lumier_image} found locally.")
            except subprocess.CalledProcessError:
                # Image doesn't exist locally
                logger.warning(f"\nWARNING: Docker image {lumier_image} not found locally.")
                logger.warning("The system will attempt to pull it from Docker Hub, which may fail if you have network connectivity issues.")
                logger.warning("If the Docker pull fails, you may need to manually pull the image first with:")
                logger.warning(f"  docker pull {lumier_image}\n")

            # Add the image to the command
            cmd.append(lumier_image)

            # Log the Docker command for debugging
            logger.debug(f"DOCKER COMMAND: {' '.join(cmd)}")

            # Run the container with improved error handling
            try:
                subprocess.run(cmd, capture_output=True, text=True, check=True)
            except subprocess.CalledProcessError as e:
                stderr_text = str(e.stderr).lower()
                if "no route to host" in stderr_text or "failed to resolve reference" in stderr_text:
                    error_msg = (f"Network error while trying to pull Docker image '{lumier_image}'\n"
                                f"Error: {e.stderr}\n\n"
                                f"SOLUTION: Please try one of the following:\n"
                                f"1. Check your internet connection\n"
                                f"2. Pull the image manually with: docker pull {lumier_image}\n"
                                f"3. Check if Docker is running properly\n")
                    logger.error(error_msg)
                    raise RuntimeError(error_msg) from e
                # Any other failure is converted by the outer handler below
                raise

            # Container started, now check VM status with polling
            logger.debug("Container started, checking VM status...")
            logger.debug("NOTE: This may take some time while the VM image is being pulled and initialized")

            # Start a background thread to show container logs in real-time
            import threading

            def show_container_logs():
                # Give the container a moment to start generating logs
                time.sleep(1)
                logger.debug(f"\n---- CONTAINER LOGS FOR '{name}' (LIVE) ----")
                logger.debug("Showing logs as they are generated. Press Ctrl+C to stop viewing logs...\n")

                process = None
                try:
                    # Use docker logs with follow option
                    log_cmd = ["docker", "logs", "--tail", "30", "--follow", name]
                    process = subprocess.Popen(log_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                              text=True, bufsize=1)

                    # Read and log the container output line by line
                    for line in process.stdout:
                        # Logger methods do not accept print()'s ``end`` keyword
                        # (it raises TypeError once DEBUG is enabled), so the
                        # trailing newline is stripped instead.
                        logger.debug(line.rstrip("\n"))

                        # Break if process has exited
                        if process.poll() is not None:
                            break
                except Exception as e:
                    logger.error(f"\nError showing container logs: {e}")
                finally:
                    logger.debug("\n---- LOG STREAMING ENDED ----")
                    # Make sure process is terminated
                    if process is not None and process.poll() is None:
                        process.terminate()

            # Log streaming always runs in the background; its output is emitted
            # at DEBUG level, so it is only visible when debug logging is on.
            log_thread = threading.Thread(target=show_container_logs)
            log_thread.daemon = True  # Thread will exit when main program exits
            log_thread.start()

            # Skip waiting for container readiness and just poll get_vm directly
            # until the VM is ready with an IP address
            attempt = 0
            consecutive_errors = 0
            vm_running = False

            while True:
                try:
                    # Use longer delays to give the system time to initialize
                    if attempt > 0:
                        # Start with 5s delay, then increase gradually up to 30s for later attempts
                        # But use shorter delays while we're getting API errors
                        if consecutive_errors > 0 and consecutive_errors < 5:
                            wait_time = 3  # Use shorter delays when we're getting API errors
                        else:
                            wait_time = min(30, 5 + (attempt * 2))

                        logger.debug(f"Waiting {wait_time}s before retry #{attempt+1}...")
                        await asyncio.sleep(wait_time)

                    # Try to get VM status
                    logger.debug(f"Checking VM status (attempt {attempt+1})...")
                    vm_status = await self.get_vm(name)

                    # Check for API errors
                    if 'error' in vm_status:
                        consecutive_errors += 1
                        error_msg = vm_status.get('error', 'Unknown error')

                        # Only log a user-friendly status message, not the raw error
                        # since _lume_api_get already logged the technical details
                        if consecutive_errors == 1 or attempt % 5 == 0:
                            if 'Empty reply from server' in error_msg:
                                logger.info("API server is starting up - container is running, but API isn't fully initialized yet.")
                                logger.info("This is expected during the initial VM setup - will continue polling...")
                            else:
                                # Don't repeat the exact same error message each time
                                logger.warning(f"API request error (attempt {attempt+1}): {error_msg}")
                                # Just log that we're still working on it
                                if attempt > 3:
                                    logger.debug("Still waiting for the API server to become available...")

                        # If we're getting errors but container is running, that's normal during startup
                        if vm_status.get('status') == 'running':
                            if not vm_running:
                                logger.info("Container is running, waiting for the VM within it to become fully ready...")
                                logger.info("This might take a minute while the VM initializes...")
                                vm_running = True

                        # Increase counter and continue
                        attempt += 1
                        continue

                    # Reset consecutive error counter when we get a successful response
                    consecutive_errors = 0

                    # If the VM is running, check if it has an IP address (which means it's fully ready)
                    if vm_status.get('status') == 'running':
                        vm_running = True

                        # Check if we have an IP address, which means the VM is fully ready
                        if 'ip_address' in vm_status and vm_status['ip_address']:
                            logger.info(f"VM is now fully running with IP: {vm_status.get('ip_address')}")
                            if 'vnc_url' in vm_status and vm_status['vnc_url']:
                                logger.info(f"VNC URL: {vm_status.get('vnc_url')}")
                            return vm_status
                        else:
                            logger.debug("VM is running but still initializing network interfaces...")
                            logger.debug("Waiting for IP address to be assigned...")
                    else:
                        # VM exists but might still be starting up
                        status = vm_status.get('status', 'unknown')
                        logger.debug(f"VM found but status is: {status}. Continuing to poll...")

                    # Increase counter for next iteration's delay calculation
                    attempt += 1

                    # If we reach a very large number of attempts, give a reassuring message but continue
                    if attempt % 10 == 0:
                        logger.debug(f"Still waiting after {attempt} attempts. This might take several minutes for first-time setup.")
                        if not vm_running and attempt >= 20:
                            logger.warning("\nNOTE: First-time VM initialization can be slow as images are downloaded.")
                            logger.warning("If this continues for more than 10 minutes, you may want to check:")
                            logger.warning("  1. Docker logs with: docker logs " + name)
                            logger.warning("  2. If your network can access container registries")
                            logger.warning("Press Ctrl+C to abort if needed.\n")

                    # After 150 attempts (likely over 30-40 minutes), return current status
                    if attempt >= 150:
                        logger.debug(f"Reached 150 polling attempts. VM status is: {vm_status.get('status', 'unknown')}")
                        logger.debug("Returning current VM status, but please check Docker logs if there are issues.")
                        return vm_status

                except Exception as e:
                    # Always continue retrying, but with increasing delays
                    logger.warning(f"Error checking VM status (attempt {attempt+1}): {e}. Will retry.")
                    consecutive_errors += 1

                    # If we've had too many consecutive errors, might be a deeper problem
                    if consecutive_errors >= 10:
                        logger.warning(f"\nWARNING: Encountered {consecutive_errors} consecutive errors while checking VM status.")
                        logger.warning("You may need to check the Docker container logs or restart the process.")
                        logger.warning(f"Error details: {str(e)}\n")

                    # Increase attempt counter for next iteration
                    attempt += 1

                    # After many consecutive errors, add a delay to avoid hammering the system
                    if attempt > 5:
                        error_delay = min(30, 10 + attempt)
                        logger.warning(f"Multiple connection errors, waiting {error_delay}s before next attempt...")
                        await asyncio.sleep(error_delay)

        except subprocess.CalledProcessError as e:
            # stderr is None/empty when the failing command produced no captured
            # error output; fall back to the exception text in that case.
            error_msg = f"Failed to start Lumier container: {e.stderr or str(e)}"
            logger.error(error_msg)
            raise RuntimeError(error_msg) from e
        
    async def _wait_for_container_ready(self, container_name: str, timeout: int = 90) -> bool:
        """Wait for the Lumier container to be running and, ideally, for its API to answer.

        Each round first checks with ``docker ps`` that the container is up,
        then probes the API with curl: the ``/health`` endpoint first and, if
        that does not report "ok", the VM endpoint for this container.

        Args:
            container_name: Name of the Docker container to check
            timeout: Maximum time to wait in seconds (default: 90 seconds)

        Returns:
            True if the container is running, even if API is not fully ready.
            This allows operations to continue with appropriate fallbacks.
            False if the container was never seen running before the timeout.
        """
        import urllib.parse

        start_time = time.time()
        api_ready = False
        container_running = False

        logger.debug(f"Waiting for container {container_name} to be ready (timeout: {timeout}s)...")

        while time.time() - start_time < timeout:
            # Check if container is running
            try:
                check_cmd = ["docker", "ps", "--filter", f"name={container_name}", "--format", "{{.Status}}"]
                result = subprocess.run(check_cmd, capture_output=True, text=True, check=True)
                container_status = result.stdout.strip()

                if container_status and container_status.startswith("Up"):
                    container_running = True
                    logger.info(f"Container {container_name} is running with status: {container_status}")
                else:
                    logger.warning(f"Container {container_name} not yet running, status: {container_status}")
                    # container is not running yet, wait and try again
                    await asyncio.sleep(2)  # Longer sleep to give Docker time
                    continue
            except subprocess.CalledProcessError as e:
                logger.warning(f"Error checking container status: {e}")
                await asyncio.sleep(2)
                continue

            # Container is running, check if API is responsive
            try:
                # First check the health endpoint
                api_url = f"http://{self.host}:{self.api_port}/health"
                logger.info(f"Checking API health at: {api_url}")

                # Use longer timeout for API health check since it may still be initializing
                curl_cmd = ["curl", "-s", "--connect-timeout", "5", "--max-time", "10", api_url]
                result = subprocess.run(curl_cmd, capture_output=True, text=True)

                if result.returncode == 0 and "ok" in result.stdout.lower():
                    api_ready = True
                    logger.info(f"API is ready at {api_url}")
                    break
                else:
                    # API health check failed, now let's check if the VM status endpoint is responsive
                    # This covers cases where the health endpoint isn't implemented but the VM API is working
                    vm_api_url = f"http://{self.host}:{self.api_port}/lume/vms/{container_name}"
                    if self.storage:
                        encoded_storage = urllib.parse.quote_plus(self.storage)
                        vm_api_url += f"?storage={encoded_storage}"

                    curl_vm_cmd = ["curl", "-s", "--connect-timeout", "5", "--max-time", "10", vm_api_url]
                    vm_result = subprocess.run(curl_vm_cmd, capture_output=True, text=True)

                    if vm_result.returncode == 0 and vm_result.stdout.strip():
                        # VM API responded with something - consider the API ready
                        api_ready = True
                        logger.info(f"VM API is ready at {vm_api_url}")
                        break
                    else:
                        # Report the health check's exit code, or the VM
                        # endpoint's one when the health check itself exited 0
                        curl_code = result.returncode
                        if curl_code == 0:
                            curl_code = vm_result.returncode

                        # Shared helper keeps the code-to-message table in one place
                        curl_error = self._get_curl_error_message(curl_code)

                        logger.info(f"API not ready yet: {curl_error}")
            except subprocess.SubprocessError as e:
                logger.warning(f"Error checking API status: {e}")

            # If the container is running but API is not ready, that's OK - we'll just wait
            # a bit longer before checking again, as the container may still be initializing
            elapsed_seconds = time.time() - start_time
            if int(elapsed_seconds) % 5 == 0:  # Only log status every 5 seconds to reduce verbosity
                logger.debug(f"Waiting for API to initialize... ({elapsed_seconds:.1f}s / {timeout}s)")

            await asyncio.sleep(3)  # Longer sleep between API checks

        # Handle timeout - if the container is running but API is not ready, that's not
        # necessarily an error - the API might just need more time to start up
        if not container_running:
            logger.warning(f"Timed out waiting for container {container_name} to start")
            return False

        if not api_ready:
            logger.warning(f"Container {container_name} is running, but API is not fully ready yet.")
            logger.warning("NOTE: You may see some 'API request failed' messages while the API initializes.")

        # Return True if container is running, even if API isn't ready yet
        # This allows VM operations to proceed, with appropriate retries for API calls
        return container_running

    async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Stop a running VM by stopping the Lumier container.

        The container is always addressed through ``self.container_name``;
        ``name`` is only echoed back in the returned status dictionary.

        Args:
            name: Name of the VM, reported back in the result.
            storage: Not used by this method.

        Returns:
            ``{"name", "status": "stopped", "container_id"}`` once the container
            has been stopped, or ``{"name", "status": "unknown"}`` when no
            container ID is tracked on the instance and ``docker ps`` finds no
            container for ``self.container_name``.

        Raises:
            RuntimeError: If ``docker stop`` exits with a non-zero status. The
                original ``CalledProcessError`` is chained as the cause.
        """
        try:
            container_id = getattr(self, "_container_id", None)
            if container_id:
                logger.info(f"Stopping Lumier container: {self.container_name}")
            else:
                # No tracked ID: look the container up by name. "-a" includes
                # containers that are not currently running.
                check_cmd = ["docker", "ps", "-a", "--filter", f"name={self.container_name}", "--format", "{{.ID}}"]
                check_result = subprocess.run(check_cmd, capture_output=True, text=True)
                container_id = check_result.stdout.strip()

                if not container_id:
                    logger.warning(f"No container found with name {self.container_name}")
                    return {
                        "name": name,
                        "status": "unknown",
                    }
                logger.info(f"Found container ID: {container_id}")

            # Single stop path shared by both cases above.
            cmd = ["docker", "stop", self.container_name]
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            logger.info(f"Container stopped: {result.stdout.strip()}")

            # Return minimal status info
            return {
                "name": name,
                "status": "stopped",
                "container_id": container_id,
            }
        except subprocess.CalledProcessError as e:
            # Fall back to the exception text when docker printed nothing on stderr.
            error_msg = f"Failed to stop container: {e.stderr or e}"
            logger.error(error_msg)
            raise RuntimeError(f"Failed to stop Lumier container: {error_msg}") from e
            
    # update_vm is not implemented as it's not needed for Lumier
    # The BaseVMProvider requires it, so we provide a minimal implementation
    async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
        """Placeholder that leaves the VM untouched.

        Emits a warning that updating is not implemented and reports the VM as
        unchanged. ``update_opts`` and ``storage`` are ignored.

        Returns:
            A dictionary with the given ``name`` and ``status`` set to
            ``"unchanged"``.
        """
        logger.warning("update_vm is not implemented for Lumier provider")
        unchanged_status: Dict[str, Any] = {"name": name}
        unchanged_status["status"] = "unchanged"
        return unchanged_status
        
    async def get_logs(self, name: str, num_lines: int = 100, follow: bool = False, timeout: Optional[int] = None) -> str:
        """Get the logs from the Lumier container.

        Args:
            name: Name of the VM/container to get logs for
            num_lines: Number of recent log lines to return (default: 100)
            follow: If True, follow the logs (stream new logs as they are generated)
            timeout: Optional timeout in seconds for follow mode. ``None`` means
                no timeout; a value of 0 is also treated as "wait indefinitely".

        Returns:
            In non-follow mode, the captured standard output of ``docker logs``.
            In follow mode, a fixed status string (the log lines themselves are
            written straight to the console by the child process). On any
            failure an error message string is returned instead of raising.

        Note:
            If follow=True, this function blocks until the timeout elapses, the
            ``docker logs`` process exits, or the user interrupts it.
        """
        if not HAS_LUMIER:
            error_msg = "Docker is not available. Cannot get container logs."
            logger.error(error_msg)
            return error_msg

        container_name = name

        # Make sure the container exists before asking for its logs.
        try:
            inspect_cmd = ["docker", "container", "inspect", container_name]
            result = subprocess.run(inspect_cmd, capture_output=True, text=True)
        except Exception as e:
            error_msg = f"Error checking container status: {str(e)}"
            logger.error(error_msg)
            return error_msg

        if result.returncode != 0:
            error_msg = f"Container '{container_name}' does not exist or is not accessible"
            logger.error(error_msg)
            return error_msg

        # "--tail" limits the output to the most recent lines.
        log_cmd = ["docker", "logs", "--tail", str(num_lines)]

        if not follow:
            # Non-follow mode: capture the logs and return them as a string.
            log_cmd.append(container_name)
            logger.info(f"Getting {num_lines} log lines for container '{container_name}'")

            try:
                # NOTE(review): only stdout is returned; anything `docker logs`
                # writes to stderr is captured but discarded here - confirm
                # whether container stderr output should be included.
                result = subprocess.run(log_cmd, capture_output=True, text=True, check=True)
                logs = result.stdout

                # Only print header and logs if there's content
                if logs.strip():
                    logger.info(f"\n---- CONTAINER LOGS FOR '{container_name}' (LAST {num_lines} LINES) ----\n")
                    logger.info(logs)
                    logger.info("\n---- END OF LOGS ----")
                else:
                    logger.info(f"\nNo logs available for container '{container_name}'")

                return logs
            except subprocess.CalledProcessError as e:
                error_msg = f"Error getting logs: {e.stderr}"
                logger.error(error_msg)
                return error_msg
            except Exception as e:
                error_msg = f"Unexpected error getting logs: {str(e)}"
                logger.error(error_msg)
                return error_msg

        # Follow mode: the child inherits our stdout/stderr and streams live.
        log_cmd.extend(["--follow", container_name])
        if timeout is not None:
            logger.info(f"Following logs for container '{container_name}' with timeout {timeout}s")
        else:
            logger.info(f"Following logs for container '{container_name}' indefinitely")
        logger.info(f"\n---- CONTAINER LOGS FOR '{container_name}' (LIVE) ----")
        logger.info("Press Ctrl+C to stop following logs\n")

        def _terminate_and_reap(proc):
            """Stop the child and wait for it so it does not linger as a zombie."""
            proc.terminate()
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # The child ignored SIGTERM; force it down and collect it.
                proc.kill()
                proc.wait()

        # Started outside the try block so the interrupt handler below can
        # always rely on `process` being bound.
        process = subprocess.Popen(log_cmd, text=True)
        try:
            if timeout:
                try:
                    process.wait(timeout=timeout)
                except subprocess.TimeoutExpired:
                    _terminate_and_reap(process)  # Stop after timeout
                    logger.info(f"\n---- LOG FOLLOWING STOPPED (timeout {timeout}s reached) ----")
            else:
                # Without a (non-zero) timeout, wait until the child exits or
                # the user interrupts.
                process.wait()

            return "Logs were displayed to console in follow mode"
        except KeyboardInterrupt:
            _terminate_and_reap(process)
            logger.info("\n---- LOG FOLLOWING STOPPED (user interrupted) ----")
            return "Logs were displayed to console in follow mode (interrupted)"
    
    async def restart_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Restarting is not supported by this provider.

        Raises:
            NotImplementedError: Always.
        """
        unsupported_message = "LumierProvider does not support restarting VMs."
        raise NotImplementedError(unsupported_message)

    async def get_ip(self, name: str, storage: Optional[str] = None, retry_delay: int = 2) -> str:
        """Get the IP address of a VM, waiting indefinitely until it's available.

        Polls ``self.get_vm`` in a loop with no upper bound on attempts. Errors
        raised while polling are logged and treated as "not ready yet". As a
        side effect, ``self.container_name`` is set to ``name``.

        Args:
            name: Name of the VM to get the IP for
            storage: Optional storage path override, forwarded to ``get_vm``
            retry_delay: Delay between retries in seconds (default: 2)

        Returns:
            IP address of the VM when it becomes available. An address counts
            as valid when it is non-empty, is not ``"unknown"`` and does not
            start with ``"0.0.0.0"``.
        """
        # Use container_name = name for consistency
        self.container_name = name

        # Track total attempts for logging purposes
        total_attempts = 0

        # Loop indefinitely until we get a valid IP
        while True:
            total_attempts += 1

            # Log retry message but not on first attempt
            if total_attempts > 1:
                logger.info(f"Waiting for VM {name} IP address (attempt {total_attempts})...")

            try:
                vm_info = await self.get_vm(name, storage=storage)

                # A valid IP is returned regardless of the reported status, so a
                # VM that still reports "stopped" while starting is covered too.
                ip = vm_info.get("ip_address", None)
                if ip and ip != "unknown" and not ip.startswith("0.0.0.0"):
                    logger.info(f"Got valid VM IP address: {ip}")
                    return ip

                # No usable IP yet: log why we keep waiting.
                status = vm_info.get("status", "unknown")
                if status == "stopped" and "ip_address" in vm_info:
                    logger.info(f"VM status is {status}, but still waiting for IP to be assigned")
                elif status != "running":
                    logger.info(f"VM is not running yet (status: {status}). Waiting...")
                else:
                    logger.info("VM is running but no valid IP address yet. Waiting...")

            except Exception as e:
                logger.warning(f"Error getting VM {name} IP: {e}, continuing to wait...")

            # Wait before next retry
            await asyncio.sleep(retry_delay)

            # Add progress log every 10 attempts
            if total_attempts % 10 == 0:
                logger.info(f"Still waiting for VM {name} IP after {total_attempts} attempts...")
    
    async def __aenter__(self):
        """Enter the async context and return this provider.

        If no API URL has been stored on the instance yet, a default one is
        derived from ``self.host`` and ``self.api_port`` so that ``get_vm`` can
        work before ``run_vm`` is called.
        """
        logger.debug("Entering LumierProvider context")

        existing_url = getattr(self, "_api_url", None)
        if existing_url:
            return self

        # Nothing configured yet: fall back to the default host/port URL.
        self._api_url = f"http://{self.host}:{self.api_port}"
        logger.info(f"Initialized default Lumier API URL: {self._api_url}")
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context, stopping the tracked container if there is one.

        A failing ``docker stop`` is only logged as a warning. Any other error
        during cleanup is logged and re-raised only when the context body itself
        did not raise. Always returns False, so an exception from the context
        body keeps propagating.
        """
        logger.debug(f"Exiting LumierProvider context, handling exceptions: {exc_type}")
        try:
            # Only a container we hold an ID for is stopped here.
            tracked_id = getattr(self, "_container_id", None)
            if tracked_id:
                logger.info(f"Stopping Lumier container on context exit: {self.container_name}")
                stop_cmd = ["docker", "stop", self.container_name]
                try:
                    subprocess.run(stop_cmd, capture_output=True, text=True, check=True)
                except subprocess.CalledProcessError as stop_error:
                    # Best effort: a failed stop must not abort the cleanup.
                    logger.warning(f"Failed to stop container during cleanup: {stop_error.stderr}")
                else:
                    logger.info(f"Container stopped during context exit: {self.container_name}")
        except Exception as cleanup_error:
            logger.error(f"Error during LumierProvider cleanup: {cleanup_error}")
            # Do not mask an exception that is already propagating from the body.
            if exc_type is None:
                raise
        return False

```

--------------------------------------------------------------------------------
/libs/python/computer/computer/computer.py:
--------------------------------------------------------------------------------

```python
import traceback
from typing import Optional, List, Literal, Dict, Any, Union, TYPE_CHECKING, cast
import asyncio
from .models import Computer as ComputerConfig, Display
from .interface.factory import InterfaceFactory
import time
from PIL import Image
import io
import re
from .logger import Logger, LogLevel
import json
import logging
from core.telemetry import is_telemetry_enabled, record_event
import os
from . import helpers

import platform

SYSTEM_INFO = {
    "os": platform.system().lower(),
    "os_version": platform.release(),
    "python_version": platform.python_version(),
}

# Import provider related modules
from .providers.base import VMProviderType
from .providers.factory import VMProviderFactory

OSType = Literal["macos", "linux", "windows"]

class Computer:
    """Computer is the main class for interacting with the computer."""

    def create_desktop_from_apps(self, apps):
        """
        Build a DioramaComputer for the given applications.

        Requires the "app-use" experiment to be enabled on this Computer.

        Args:
            apps (list[str]): List of application names to include in the desktop.
        Returns:
            DioramaComputer: Object constructed from this Computer and ``apps``.
        Raises:
            AssertionError: If "app-use" is not among ``self.experiments``.
        """
        app_use_enabled = "app-use" in self.experiments
        assert app_use_enabled, "App Usage is an experimental feature. Enable it by passing experiments=['app-use'] to Computer()"

        # Imported lazily, only once the experiment check has passed.
        from .diorama_computer import DioramaComputer as _DioramaComputer

        desktop = _DioramaComputer(self, apps)
        return desktop

    def __init__(
        self,
        display: Union[Display, Dict[str, int], str] = "1024x768",
        memory: str = "8GB",
        cpu: str = "4",
        os_type: OSType = "macos",
        name: str = "",
        image: Optional[str] = None,
        shared_directories: Optional[List[str]] = None,
        use_host_computer_server: bool = False,
        verbosity: Union[int, LogLevel] = logging.INFO,
        telemetry_enabled: bool = True,
        provider_type: Union[str, VMProviderType] = VMProviderType.LUME,
        port: Optional[int] = 7777,
        noVNC_port: Optional[int] = 8006,
        host: str = os.environ.get("PYLUME_HOST", "localhost"),
        storage: Optional[str] = None,
        ephemeral: bool = False,
        api_key: Optional[str] = None,
        experiments: Optional[List[str]] = None
    ):
        """Initialize a new Computer instance.

        No VM is started here; the provider is created later in ``run()``.

        Args:
            display: The display configuration. Can be:
                    - A Display object
                    - A dict with 'width' and 'height'
                    - A string in format "WIDTHxHEIGHT" (e.g. "1920x1080")
                    Defaults to "1024x768"
            memory: The VM memory allocation. Defaults to "8GB"
            cpu: The VM CPU allocation. Defaults to "4"
            os_type: The operating system type ('macos', 'linux' or 'windows')
            name: The VM name. When empty, it is derived from the image name by
                  replacing ":" and "/" with "_"
            image: The VM image name. When omitted, a default is chosen for
                   'macos' and 'linux'
            shared_directories: Optional list of directory paths to share with the VM
            use_host_computer_server: If True, target localhost instead of starting a VM
            verbosity: Logging level (standard Python logging levels: logging.DEBUG, logging.INFO, etc.)
                      LogLevel enum values are still accepted for backward compatibility
            telemetry_enabled: Whether to enable telemetry tracking. Defaults to True.
            provider_type: The VM provider type to use (lume, lumier, cloud,
                           winsandbox or docker)
            port: Optional port to use for the VM provider server
            noVNC_port: Optional port for the noVNC web interface (Lumier provider)
            host: Host to use for VM provider connections (e.g. "localhost", "host.docker.internal")
            storage: Optional path for persistent VM storage (Lumier provider)
            ephemeral: Whether to use ephemeral storage
            api_key: Optional API key for cloud providers
            experiments: Optional list of experimental features to enable (e.g. ["app-use"])

        Raises:
            ValueError: If a storage path is combined with ``ephemeral=True``, or
                if a display string is not in "WIDTHxHEIGHT" format.
            AssertionError: If the "app-use" experiment is requested for an
                ``os_type`` other than "macos".
        """

        # Build the root and component loggers once, up front, so that every
        # later step of initialization can log.
        self.verbosity = verbosity
        self.logger = Logger("computer", verbosity)
        self.vm_logger = Logger("computer.vm", verbosity)
        self.interface_logger = Logger("computer.interface", verbosity)
        self.logger.info("Initializing Computer...")

        if not image:
            if os_type == "macos":
                image = "macos-sequoia-cua:latest"
            elif os_type == "linux":
                image = "trycua/cua-ubuntu:latest"
        # NOTE(review): with os_type="windows" and no image this yields the
        # literal string "None" - confirm whether a Windows default is needed.
        image = str(image)

        # Store original parameters
        self.image = image
        self.port = port
        self.noVNC_port = noVNC_port
        self.host = host
        self.os_type = os_type
        self.provider_type = provider_type
        self.ephemeral = ephemeral

        self.api_key = api_key
        self.experiments = experiments or []

        if "app-use" in self.experiments:
            assert self.os_type == "macos", "App use experiment is only supported on macOS"

        # The default is currently to use non-ephemeral storage
        if storage and ephemeral and storage != "ephemeral":
            raise ValueError("Storage path and ephemeral flag cannot be used together")

        # Windows Sandbox always uses ephemeral storage
        if self.provider_type == VMProviderType.WINSANDBOX:
            if not ephemeral and storage is not None and storage != "ephemeral":
                self.logger.warning("Windows Sandbox storage is always ephemeral. Setting ephemeral=True.")
            self.ephemeral = True
            self.storage = "ephemeral"
        else:
            self.storage = "ephemeral" if ephemeral else storage

        # For Lumier provider, store the first shared directory path to use
        # for VM file sharing
        self.shared_path = None
        if shared_directories:
            self.shared_path = shared_directories[0]
            self.logger.info(f"Using first shared directory for VM file sharing: {self.shared_path}")

        # Store telemetry preference
        self._telemetry_enabled = telemetry_enabled

        # Set initialization flag
        self._initialized = False
        self._running = False

        if not use_host_computer_server:
            # Look for the tag separator only in the last path segment, so a
            # registry port ("host:5000/img") is not mistaken for a tag.
            if ":" not in image.rsplit("/", 1)[-1]:
                image = f"{image}:latest"

            if not name:
                # Normalize the image reference into a usable VM name
                name = image.replace(":", "_").replace("/", "_")

            # Convert display parameter to Display object
            if isinstance(display, str):
                # Parse string format "WIDTHxHEIGHT"
                match = re.match(r"(\d+)x(\d+)", display)
                if not match:
                    raise ValueError(
                        "Display string must be in format 'WIDTHxHEIGHT' (e.g. '1024x768')"
                    )
                width, height = map(int, match.groups())
                display_config = Display(width=width, height=height)
            elif isinstance(display, dict):
                display_config = Display(**display)
            else:
                display_config = display

            # Split on the last ":" so everything before it (including any
            # registry port) stays part of the image name.
            image_name, _, image_tag = image.rpartition(":")
            self.config = ComputerConfig(
                image=image_name,
                tag=image_tag,
                name=name,
                display=display_config,
                memory=memory,
                cpu=cpu,
            )
            # Initialize VM provider but don't start it yet - we'll do that in run()
            self.config.vm_provider = None  # Will be initialized in run()

        # Store shared directories config
        self.shared_directories = shared_directories or []

        # Placeholder for VM provider context manager
        self._provider_context = None

        # Initialize with proper typing - None at first, will be set in run()
        self._interface = None
        self.use_host_computer_server = use_host_computer_server

        # Record initialization in telemetry (if enabled)
        if telemetry_enabled and is_telemetry_enabled():
            record_event("computer_initialized", SYSTEM_INFO)
        else:
            self.logger.debug("Telemetry disabled - skipping initialization tracking")

    async def __aenter__(self):
        """Enter the async context: await ``run()`` and hand back this instance."""
        startup = self.run()
        await startup
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context by awaiting ``disconnect()``.

        The exception details are ignored and nothing truthy is returned, so an
        exception raised inside the context keeps propagating.
        """
        teardown = self.disconnect()
        await teardown
        return None

    def __enter__(self):
        """Start the computer from synchronous code.

        Runs the async ``__aenter__`` to completion on the current thread's
        event loop and returns this instance. If the thread has no current
        event loop, a new one is created and installed for it.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current loop for this thread (e.g. a non-main thread, or a
            # Python version that no longer creates one implicitly).
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        loop.run_until_complete(self.__aenter__())
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the computer from synchronous code.

        Runs the async ``__aexit__`` to completion on the current thread's
        event loop. If the thread has no current event loop, a new one is
        created and installed for it. Returns None, so an exception raised
        inside the ``with`` block keeps propagating.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current loop for this thread (e.g. a non-main thread, or a
            # Python version that no longer creates one implicitly).
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        loop.run_until_complete(self.__aexit__(exc_type, exc_val, exc_tb))

    async def run(self) -> Optional[str]:
        """Initialize the VM and computer interface."""
        if TYPE_CHECKING:
            from .interface.base import BaseComputerInterface

        # If already initialized, just log and return
        if hasattr(self, "_initialized") and self._initialized:
            self.logger.info("Computer already initialized, skipping initialization")
            return

        self.logger.info("Starting computer...")
        start_time = time.time()

        try:
            # If using host computer server
            if self.use_host_computer_server:
                self.logger.info("Using host computer server")
                # Set ip_address for host computer server mode
                ip_address = "localhost"
                # Create the interface with explicit type annotation
                from .interface.base import BaseComputerInterface

                self._interface = cast(
                    BaseComputerInterface,
                    InterfaceFactory.create_interface_for_os(
                        os=self.os_type, ip_address=ip_address  # type: ignore[arg-type]
                    ),
                )

                self.logger.info("Waiting for host computer server to be ready...")
                await self._interface.wait_for_ready()
                self.logger.info("Host computer server ready")
            else:
                # Start or connect to VM
                self.logger.info(f"Starting VM: {self.image}")
                if not self._provider_context:
                    try:
                        provider_type_name = self.provider_type.name if isinstance(self.provider_type, VMProviderType) else self.provider_type
                        self.logger.verbose(f"Initializing {provider_type_name} provider context...")

                        # Explicitly set provider parameters
                        storage = "ephemeral" if self.ephemeral else self.storage
                        verbose = self.verbosity >= LogLevel.DEBUG
                        ephemeral = self.ephemeral
                        port = self.port if self.port is not None else 7777
                        host = self.host if self.host else "localhost"
                        image = self.image
                        shared_path = self.shared_path
                        noVNC_port = self.noVNC_port

                        # Create VM provider instance with explicit parameters
                        try:
                            if self.provider_type == VMProviderType.LUMIER:
                                self.logger.info(f"Using VM image for Lumier provider: {image}")
                                if shared_path:
                                    self.logger.info(f"Using shared path for Lumier provider: {shared_path}")
                                if noVNC_port:
                                    self.logger.info(f"Using noVNC port for Lumier provider: {noVNC_port}")
                                self.config.vm_provider = VMProviderFactory.create_provider(
                                    self.provider_type,
                                    port=port,
                                    host=host,
                                    storage=storage,
                                    shared_path=shared_path,
                                    image=image,
                                    verbose=verbose,
                                    ephemeral=ephemeral,
                                    noVNC_port=noVNC_port,
                                )
                            elif self.provider_type == VMProviderType.LUME:
                                self.config.vm_provider = VMProviderFactory.create_provider(
                                    self.provider_type,
                                    port=port,
                                    host=host,
                                    storage=storage,
                                    verbose=verbose,
                                    ephemeral=ephemeral,
                                )
                            elif self.provider_type == VMProviderType.CLOUD:
                                self.config.vm_provider = VMProviderFactory.create_provider(
                                    self.provider_type,
                                    api_key=self.api_key,
                                    verbose=verbose,
                                )
                            elif self.provider_type == VMProviderType.WINSANDBOX:
                                self.config.vm_provider = VMProviderFactory.create_provider(
                                    self.provider_type,
                                    port=port,
                                    host=host,
                                    storage=storage,
                                    verbose=verbose,
                                    ephemeral=ephemeral,
                                    noVNC_port=noVNC_port,
                                )
                            elif self.provider_type == VMProviderType.DOCKER:
                                self.config.vm_provider = VMProviderFactory.create_provider(
                                    self.provider_type,
                                    port=port,
                                    host=host,
                                    storage=storage,
                                    shared_path=shared_path,
                                    image=image or "trycua/cua-ubuntu:latest",
                                    verbose=verbose,
                                    ephemeral=ephemeral,
                                    noVNC_port=noVNC_port,
                                )
                            else:
                                raise ValueError(f"Unsupported provider type: {self.provider_type}")
                            self._provider_context = await self.config.vm_provider.__aenter__()
                            self.logger.verbose("VM provider context initialized successfully")
                        except ImportError as ie:
                            self.logger.error(f"Failed to import provider dependencies: {ie}")
                            if str(ie).find("lume") >= 0 and str(ie).find("lumier") < 0:
                                self.logger.error("Please install with: pip install cua-computer[lume]")
                            elif str(ie).find("lumier") >= 0 or str(ie).find("docker") >= 0:
                                self.logger.error("Please install with: pip install cua-computer[lumier] and make sure Docker is installed")
                            elif str(ie).find("cloud") >= 0:
                                self.logger.error("Please install with: pip install cua-computer[cloud]")
                            raise
                    except Exception as e:
                        self.logger.error(f"Failed to initialize provider context: {e}")
                        raise RuntimeError(f"Failed to initialize VM provider: {e}")

                # Check if VM exists or create it
                is_running = False
                try:
                    if self.config.vm_provider is None:
                        raise RuntimeError(f"VM provider not initialized for {self.config.name}")
                        
                    vm = await self.config.vm_provider.get_vm(self.config.name)
                    self.logger.verbose(f"Found existing VM: {self.config.name}")
                    is_running = vm.get("status") == "running"
                except Exception as e:
                    self.logger.error(f"VM not found: {self.config.name}")
                    self.logger.error(f"Error: {e}")
                    raise RuntimeError(
                        f"VM {self.config.name} could not be found or created."
                    )

                # Start the VM if it's not running
                if not is_running:
                    self.logger.info(f"VM {self.config.name} is not running, starting it...")

                    # Convert paths to dictionary format for shared directories
                    shared_dirs = []
                    for path in self.shared_directories:
                        self.logger.verbose(f"Adding shared directory: {path}")
                        path = os.path.abspath(os.path.expanduser(path))
                        if os.path.exists(path):
                            # Add path in format expected by Lume API
                            shared_dirs.append({
                                "hostPath": path,
                                "readOnly": False
                            })
                        else:
                            self.logger.warning(f"Shared directory does not exist: {path}")
                            
                    # Prepare run options to pass to the provider
                    run_opts = {}

                    # Add display information if available
                    if self.config.display is not None:
                        display_info = {
                            "width": self.config.display.width,
                            "height": self.config.display.height,
                        }
                        
                        # Check if scale_factor exists before adding it
                        if hasattr(self.config.display, "scale_factor"):
                            display_info["scale_factor"] = self.config.display.scale_factor
                        
                        run_opts["display"] = display_info

                    # Add shared directories if available
                    if self.shared_directories:
                        run_opts["shared_directories"] = shared_dirs.copy()

                    # Run the VM with the provider
                    try:
                        if self.config.vm_provider is None:
                            raise RuntimeError(f"VM provider not initialized for {self.config.name}")
                            
                        # Use the complete run_opts we prepared earlier
                        # Handle ephemeral storage for run_vm method too
                        storage_param = "ephemeral" if self.ephemeral else self.storage
                        
                        # Log the image being used
                        self.logger.info(f"Running VM using image: {self.image}")
                        
                        # Call provider.run_vm with explicit image parameter
                        response = await self.config.vm_provider.run_vm(
                            image=self.image,
                            name=self.config.name,
                            run_opts=run_opts,
                            storage=storage_param
                        )
                        self.logger.info(f"VM run response: {response if response else 'None'}")
                    except Exception as run_error:
                        self.logger.error(f"Failed to run VM: {run_error}")
                        raise RuntimeError(f"Failed to start VM: {run_error}")

                # Wait for VM to be ready with a valid IP address
                self.logger.info("Waiting for VM to be ready with a valid IP address...")
                try:
                    if self.provider_type == VMProviderType.LUMIER:
                        max_retries = 60  # Increased for Lumier VM startup which takes longer
                        retry_delay = 3    # 3 seconds between retries for Lumier
                    else:
                        max_retries = 30  # Default for other providers
                        retry_delay = 2    # 2 seconds between retries
                    
                    self.logger.info(f"Waiting up to {max_retries * retry_delay} seconds for VM to be ready...")
                    ip = await self.get_ip(max_retries=max_retries, retry_delay=retry_delay)
                    
                    # If we get here, we have a valid IP
                    self.logger.info(f"VM is ready with IP: {ip}")
                    ip_address = ip
                except TimeoutError as timeout_error:
                    self.logger.error(str(timeout_error))
                    raise RuntimeError(f"VM startup timed out: {timeout_error}")
                except Exception as wait_error:
                    self.logger.error(f"Error waiting for VM: {wait_error}")
                    raise RuntimeError(f"VM failed to become ready: {wait_error}")
        except Exception as e:
            self.logger.error(f"Failed to initialize computer: {e}")
            self.logger.error(traceback.format_exc())
            raise RuntimeError(f"Failed to initialize computer: {e}")

        try:
            # Verify we have a valid IP before initializing the interface
            if not ip_address or ip_address == "unknown" or ip_address == "0.0.0.0":
                raise RuntimeError(f"Cannot initialize interface - invalid IP address: {ip_address}")
                
            # Initialize the interface using the factory with the specified OS
            self.logger.info(f"Initializing interface for {self.os_type} at {ip_address}")
            from .interface.base import BaseComputerInterface

            # Pass authentication credentials if using cloud provider
            if self.provider_type == VMProviderType.CLOUD and self.api_key and self.config.name:
                self._interface = cast(
                    BaseComputerInterface,
                    InterfaceFactory.create_interface_for_os(
                        os=self.os_type, 
                        ip_address=ip_address,
                        api_key=self.api_key,
                        vm_name=self.config.name
                    ),
                )
            else:
                self._interface = cast(
                    BaseComputerInterface,
                    InterfaceFactory.create_interface_for_os(
                        os=self.os_type, 
                        ip_address=ip_address
                    ),
                )

            # Wait for the WebSocket interface to be ready
            self.logger.info("Connecting to WebSocket interface...")

            try:
                # Use a single timeout for the entire connection process
                # The VM should already be ready at this point, so we're just establishing the connection
                await self._interface.wait_for_ready(timeout=30)
                self.logger.info("WebSocket interface connected successfully")
            except TimeoutError as e:
                self.logger.error(f"Failed to connect to WebSocket interface at {ip_address}")
                raise TimeoutError(
                    f"Could not connect to WebSocket interface at {ip_address}:8000/ws: {str(e)}"
                )
                # self.logger.warning(
                #     f"Could not connect to WebSocket interface at {ip_address}:8000/ws: {str(e)}, expect missing functionality"
                # )

            # Create an event to keep the VM running in background if needed
            if not self.use_host_computer_server:
                self._stop_event = asyncio.Event()
                self._keep_alive_task = asyncio.create_task(self._stop_event.wait())

            self.logger.info("Computer is ready")

            # Set the initialization flag and clear the initializing flag
            self._initialized = True
            
            # Set this instance as the default computer for remote decorators
            helpers.set_default_computer(self)
            
            self.logger.info("Computer successfully initialized")
        except Exception as e:
            raise
        finally:
            # Log initialization time for performance monitoring
            duration_ms = (time.time() - start_time) * 1000
            self.logger.debug(f"Computer initialization took {duration_ms:.2f}ms")
        return
    
    async def disconnect(self) -> None:
        """Close the computer's WebSocket interface if one is currently set."""
        active_interface = self._interface
        if not active_interface:
            # Nothing to close
            return
        active_interface.close()

    async def stop(self) -> None:
        """Stop the managed VM (if any) and disconnect the WebSocket interface.

        In VM mode (not using the host computer server, with an active provider
        context and a configured provider) the VM is stopped through the provider
        and the provider context is exited. The interface is then disconnected.

        Cleanup is best-effort: errors are logged and never propagated to the
        caller. The interface is disconnected and the provider context reference
        is cleared even when exiting the provider context raises, so a failed
        teardown does not leave a stale connection or context behind.
        """
        start_time = time.time()

        try:
            self.logger.info("Stopping Computer...")

            try:
                # In VM mode, first explicitly stop the VM, then exit the provider context
                if (
                    not self.use_host_computer_server
                    and self._provider_context
                    and self.config.vm_provider is not None
                ):
                    try:
                        self.logger.info(f"Stopping VM {self.config.name}...")
                        await self.config.vm_provider.stop_vm(
                            name=self.config.name,
                            storage=self.storage,  # Pass storage explicitly for clarity
                        )
                    except Exception as e:
                        # A failed stop must not prevent the provider context from closing
                        self.logger.error(f"Error stopping VM: {e}")

                    self.logger.verbose("Closing VM provider context...")
                    try:
                        await self.config.vm_provider.__aexit__(None, None, None)
                    finally:
                        # Drop the context reference even if __aexit__ raised
                        self._provider_context = None
            finally:
                # Always release the interface connection, whatever happened above
                await self.disconnect()

            self.logger.info("Computer stopped")
        except Exception as e:
            self.logger.debug(f"Error during cleanup: {e}")  # Log as debug since this might be expected
        finally:
            # Log stop time for performance monitoring
            duration_ms = (time.time() - start_time) * 1000
            self.logger.debug(f"Computer stop process took {duration_ms:.2f}ms")
        return

    async def start(self) -> None:
        """Start the computer; a thin alias that awaits ``run()``."""
        startup = self.run()
        await startup

    async def restart(self) -> None:
        """Restart the computer and reconnect its interface.

        Behaviour by mode:
        - Host computer server: disconnect, then ``run()`` again.
        - No active provider context or provider: plain ``run()``.
        - Otherwise: close the current interface, call the provider's
          ``restart_vm`` when it has one (else ``stop_vm`` followed by
          ``run_vm``), wait for an IP and build a fresh interface.

        If the provider-level restart or the reconnect fails, the method falls
        back to a full ``stop()`` followed by ``run()``.
        """
        # Host computer server: just disconnect and run again
        if self.use_host_computer_server:
            try:
                await self.disconnect()
            finally:
                await self.run()
            return

        # Without a provider context (or provider) there is nothing to restart in place
        active_context = getattr(self, "_provider_context", None)
        if not active_context or self.config.vm_provider is None:
            self.logger.info("No provider context active; performing full restart via run()")
            await self.run()
            return

        # Drop the current connection before the VM goes down; failures here are non-fatal
        if self._interface:
            try:
                self._interface.close()
            except Exception as close_error:
                self.logger.debug(f"Error closing interface prior to restart: {close_error}")

        # Restart through the provider while keeping its context open
        try:
            storage_param = "ephemeral" if self.ephemeral else self.storage
            vm_name = self.config.name
            if hasattr(self.config.vm_provider, "restart_vm"):
                self.logger.info(f"Restarting VM {self.config.name} via provider...")
                await self.config.vm_provider.restart_vm(name=vm_name, storage=storage_param)
            else:
                # Provider cannot restart in place: emulate it with stop + run
                self.logger.info(f"Provider has no restart_vm; performing stop+start for {self.config.name}...")
                await self.config.vm_provider.stop_vm(name=vm_name, storage=storage_param)
                await self.config.vm_provider.run_vm(
                    image=self.image,
                    name=vm_name,
                    run_opts={},
                    storage=storage_param,
                )
        except Exception as restart_error:
            self.logger.error(f"Failed to restart VM via provider: {restart_error}")
            # Last resort: full stop (including provider context exit) and run
            try:
                await self.stop()
            finally:
                await self.run()
            return

        # Wait for the VM to come back, then rebuild the interface
        try:
            self.logger.info("Waiting for VM to be ready after restart...")
            max_retries, retry_delay = (
                (60, 3) if self.provider_type == VMProviderType.LUMIER else (30, 2)
            )
            ip_address = await self.get_ip(max_retries=max_retries, retry_delay=retry_delay)

            self.logger.info(f"Re-initializing interface for {self.os_type} at {ip_address}")
            from .interface.base import BaseComputerInterface

            interface_kwargs = {"os": self.os_type, "ip_address": ip_address}
            # Cloud VMs additionally need credentials and the VM name
            if self.provider_type == VMProviderType.CLOUD and self.api_key and self.config.name:
                interface_kwargs["api_key"] = self.api_key
                interface_kwargs["vm_name"] = self.config.name

            self._interface = cast(
                BaseComputerInterface,
                InterfaceFactory.create_interface_for_os(**interface_kwargs),
            )

            self.logger.info("Connecting to WebSocket interface after restart...")
            await self._interface.wait_for_ready(timeout=30)
            self.logger.info("Computer reconnected and ready after restart")
        except Exception as reconnect_error:
            self.logger.error(f"Failed to reconnect after restart: {reconnect_error}")
            # Try a full reset if reconnection failed
            try:
                await self.stop()
            finally:
                await self.run()

    # @property
    async def get_ip(self, max_retries: int = 15, retry_delay: int = 3) -> str:
        """Return the IP address of the VM, or localhost for the host computer server.

        The lookup is delegated to the configured VM provider's ``get_ip``; any
        waiting or retry behaviour is implemented by the provider.

        Args:
            max_retries: Unused; accepted only for backward compatibility. It is
                not forwarded to the provider.
            retry_delay: Delay between retries in seconds, forwarded to the
                provider (default: 3).

        Returns:
            IP address of the VM, or "127.0.0.1" when using the host computer server.

        Raises:
            RuntimeError: If no VM provider is initialized.
        """
        # For host computer server, always return localhost immediately
        if self.use_host_computer_server:
            return "127.0.0.1"

        # Get IP from the provider - each provider implements its own waiting logic
        if self.config.vm_provider is None:
            raise RuntimeError("VM provider is not initialized")

        self.logger.info(f"Waiting for VM {self.config.name} to get an IP address...")

        # Ephemeral VMs are addressed through the "ephemeral" storage name
        storage_param = "ephemeral" if self.ephemeral else self.storage

        ip = await self.config.vm_provider.get_ip(
            name=self.config.name,
            storage=storage_param,
            retry_delay=retry_delay,
        )

        self.logger.info(f"VM {self.config.name} has IP address: {ip}")
        return ip
        

    async def wait_vm_ready(
        self, timeout: float = 600, interval: float = 2.0
    ) -> Optional[Dict[str, Any]]:
        """Poll the provider until the VM reports a non-empty IP address.

        VM records are accepted both as dictionaries (``vm["ip_address"]``) and
        as objects with attributes (``vm.ip_address``), so the same loop works
        whichever shape the provider returns.

        Args:
            timeout: Maximum time to wait in seconds (default: 600).
            interval: Pause between polls in seconds (default: 2.0).

        Returns:
            The VM record returned by the provider once it has an IP address,
            or None if using host computer server.

        Raises:
            TimeoutError: If no IP address is reported within ``timeout`` seconds.
        """
        if self.use_host_computer_server:
            return None

        def _field(record: Any, key: str) -> Any:
            """Read ``key`` from a dict-style or attribute-style VM record."""
            if record is None:
                return None
            if isinstance(record, dict):
                return record.get(key)
            return getattr(record, key, None)

        start_time = time.time()
        last_status = None
        attempts = 0

        self.logger.info(f"Waiting for VM {self.config.name} to be ready (timeout: {timeout}s)...")

        while time.time() - start_time < timeout:
            attempts += 1
            elapsed = time.time() - start_time

            try:
                # Keep polling for VM info
                if self.config.vm_provider is None:
                    self.logger.error("VM provider is not initialized")
                    vm = None
                else:
                    vm = await self.config.vm_provider.get_vm(self.config.name)

                # Log full VM properties for debugging (every 30 attempts)
                if attempts % 30 == 0:
                    if not vm:
                        properties = "None"
                    elif isinstance(vm, dict):
                        properties = vm
                    else:
                        properties = vars(vm)
                    self.logger.info(f"VM properties at attempt {attempts}: {properties}")

                # Get current status for logging
                current_status = _field(vm, "status") if vm else None
                if current_status != last_status:
                    self.logger.info(
                        f"VM status changed to: {current_status} (after {elapsed:.1f}s)"
                    )
                    last_status = current_status

                # Check for IP address - ensure it is a non-blank string
                ip = _field(vm, "ip_address") if vm else None
                if isinstance(ip, str) and ip.strip():
                    self.logger.info(
                        f"VM {self.config.name} got IP address: {ip} (after {elapsed:.1f}s)"
                    )
                    return vm

                if attempts % 10 == 0:  # Log every 10 attempts to avoid flooding
                    self.logger.info(
                        f"Still waiting for VM IP address... (elapsed: {elapsed:.1f}s)"
                    )
                else:
                    self.logger.debug(
                        f"Waiting for VM IP address... Current IP: {ip}, Status: {current_status}"
                    )

            except Exception as e:
                self.logger.warning(f"Error checking VM status (attempt {attempts}): {str(e)}")
                # If we've been trying for a while and still getting errors, log more details
                if elapsed > 60:  # After 1 minute of errors, log more details
                    self.logger.error(f"Persistent error getting VM status: {str(e)}")
                    self.logger.info("Trying to get VM list for debugging...")
                    try:
                        if self.config.vm_provider is not None:
                            vms = await self.config.vm_provider.list_vms()
                            names = [_field(item, "name") for item in vms]
                            self.logger.info(
                                f"Available VMs: {[name for name in names if name is not None]}"
                            )
                    except Exception as list_error:
                        self.logger.error(f"Failed to list VMs: {str(list_error)}")

            await asyncio.sleep(interval)

        # If we get here, we've timed out
        elapsed = time.time() - start_time
        self.logger.error(f"VM {self.config.name} not ready after {elapsed:.1f} seconds")

        # Try to get final VM status for debugging
        try:
            if self.config.vm_provider is not None:
                vm = await self.config.vm_provider.get_vm(self.config.name)
                status = _field(vm, "status") if vm else None
                if status is None:
                    status = "unknown"
                ip = _field(vm, "ip_address") if vm else None
            else:
                status = "unknown"
                ip = None
            self.logger.error(f"Final VM status: {status}, IP: {ip}")
        except Exception as e:
            self.logger.error(f"Failed to get final VM status: {str(e)}")

        raise TimeoutError(
            f"VM {self.config.name} not ready after {elapsed:.1f} seconds - IP address not assigned"
        )

    async def update(self, cpu: Optional[int] = None, memory: Optional[str] = None):
        """Apply new CPU and/or memory settings to the VM through the provider.

        Any value left unset (or falsy) falls back to the current configuration.

        Args:
            cpu: New CPU count; defaults to ``int(self.config.cpu)``.
            memory: New memory size; defaults to ``self.config.memory``.

        Raises:
            RuntimeError: If no VM provider is initialized.
        """
        self.logger.info(
            f"Updating VM settings: CPU={cpu or self.config.cpu}, Memory={memory or self.config.memory}"
        )
        effective_cpu = cpu if cpu else int(self.config.cpu)
        effective_memory = memory if memory else self.config.memory

        provider = self.config.vm_provider
        if provider is None:
            raise RuntimeError("VM provider not initialized")

        await provider.update_vm(
            name=self.config.name,
            update_opts={"cpu": effective_cpu, "memory": effective_memory},
            storage=self.storage,  # Pass storage explicitly for clarity
        )

    def get_screenshot_size(self, screenshot: bytes) -> Dict[str, int]:
        """Report the pixel dimensions of an encoded screenshot.

        Args:
            screenshot: Encoded image bytes in a format ``Image.open`` can read

        Returns:
            Dict[str, int]: Mapping with the image's 'width' and 'height'
        """
        buffer = io.BytesIO(screenshot)
        pixel_width, pixel_height = Image.open(buffer).size
        return dict(width=pixel_width, height=pixel_height)

    @property
    def interface(self):
        """Return the computer interface used to interact with the VM.

        Returns:
            The computer interface

        Raises:
            RuntimeError: If the interface has not been set up yet (``run()`` not called).
        """
        current = getattr(self, "_interface", None)
        if current is not None:
            return current

        error_msg = "Computer interface not initialized. Call run() first."
        self.logger.error(error_msg)
        self.logger.error(
            "Make sure to call await computer.run() before using any interface methods."
        )
        raise RuntimeError(error_msg)

    @property
    def telemetry_enabled(self) -> bool:
        """Report the telemetry flag stored on this computer instance.

        Returns:
            bool: The value of the instance's ``_telemetry_enabled`` attribute
        """
        enabled = self._telemetry_enabled
        return enabled

    async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
        """Map normalized coordinates to screen coordinates via the interface.

        Args:
            x: X coordinate between 0 and 1
            y: Y coordinate between 0 and 1

        Returns:
            tuple[float, float]: Screen coordinates (x, y), as computed by the interface
        """
        active_interface = self.interface
        screen_point = await active_interface.to_screen_coordinates(x, y)
        return screen_point

    async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
        """Map screen coordinates to screenshot coordinates via the interface.

        Args:
            x: X coordinate in screen space
            y: Y coordinate in screen space

        Returns:
            tuple[float, float]: (x, y) coordinates in screenshot space, as computed
            by the interface
        """
        active_interface = self.interface
        screenshot_point = await active_interface.to_screenshot_coordinates(x, y)
        return screenshot_point


    # Add virtual environment management functions to computer interface
    async def venv_install(self, venv_name: str, requirements: list[str]):
        """Create the virtual environment if needed and install packages into it.

        Each requirement is passed to ``pip install`` as a single, shell-quoted
        argument. Without quoting, a specifier such as ``numpy>=1.0`` is parsed
        by the shell as an output redirection and the wrong package set is
        installed.

        Args:
            venv_name: Name of the virtual environment
            requirements: List of package requirements to install; each entry is
                one pip argument (e.g. "requests" or "numpy>=1.0")

        Returns:
            The result of the final ``run_command`` call (the install command, or
            an ``echo`` when there is nothing to install)
        """
        import shlex

        def _quote_for_cmd(arg: str) -> str:
            """Double-quote ``arg`` for cmd.exe unless it only has plainly safe characters."""
            safe_punctuation = set("._-=+/:@,[]")
            if arg and all(ch.isalnum() or ch in safe_punctuation for ch in arg):
                return arg
            # Inside double quotes cmd.exe treats <, >, | and & literally
            return '"' + arg.replace('"', '\\"') + '"'

        requirements = requirements or []
        # Windows vs POSIX handling
        if self.os_type == "windows":
            # Use %USERPROFILE% for home directory and cmd.exe semantics
            venv_path = f"%USERPROFILE%\\.venvs\\{venv_name}"
            ensure_dir_cmd = "if not exist \"%USERPROFILE%\\.venvs\" mkdir \"%USERPROFILE%\\.venvs\""
            create_cmd = f"if not exist \"{venv_path}\" python -m venv \"{venv_path}\""
            requirements_str = " ".join(_quote_for_cmd(req) for req in requirements)
            # Activate via activate.bat and install
            if requirements_str:
                install_cmd = f"call \"{venv_path}\\Scripts\\activate.bat\" && pip install {requirements_str}"
            else:
                install_cmd = "echo No requirements to install"
            await self.interface.run_command(ensure_dir_cmd)
            await self.interface.run_command(create_cmd)
            return await self.interface.run_command(install_cmd)

        # POSIX (macOS/Linux)
        venv_path = f"$HOME/.venvs/{venv_name}"
        create_cmd = f"mkdir -p \"$HOME/.venvs\" && python3 -m venv \"{venv_path}\""
        # Check if venv exists, if not create it
        check_cmd = f"test -d \"{venv_path}\" || ({create_cmd})"
        await self.interface.run_command(check_cmd)
        # Install packages; shlex.quote leaves plain names untouched
        requirements_str = " ".join(shlex.quote(req) for req in requirements)
        if requirements_str:
            install_cmd = f". \"{venv_path}/bin/activate\" && pip install {requirements_str}"
        else:
            install_cmd = "echo No requirements to install"
        return await self.interface.run_command(install_cmd)
    
    async def venv_cmd(self, venv_name: str, command: str):
        """Execute a shell command inside a virtual environment.

        The environment is created on demand (with no packages) when the
        existence check reports it missing. ``command`` is appended to the
        activation command verbatim, so it is interpreted by the remote shell.

        Args:
            venv_name: Name of the virtual environment
            command: Shell command to execute in the virtual environment

        Returns:
            The result of ``run_command`` for the activated command
        """
        if self.os_type == "windows":
            # Windows (cmd.exe)
            venv_path = f"%USERPROFILE%\\.venvs\\{venv_name}"
            # Check existence and signal if missing
            check_cmd = f"if not exist \"{venv_path}\" (echo VENV_NOT_FOUND) else (echo VENV_FOUND)"
            result = await self.interface.run_command(check_cmd)
            if "VENV_NOT_FOUND" in (getattr(result, "stdout", "") or ""):
                # Auto-create the venv with no requirements
                await self.venv_install(venv_name, [])
            # Activate and run the command
            full_command = f"call \"{venv_path}\\Scripts\\activate.bat\" && {command}"
            return await self.interface.run_command(full_command)

        # POSIX (macOS/Linux)
        venv_path = f"$HOME/.venvs/{venv_name}"
        # `test -d` reports only through its exit status and prints nothing, so
        # echo an explicit marker that can be read back from stdout
        check_cmd = f"test -d \"{venv_path}\" && echo VENV_FOUND || echo VENV_NOT_FOUND"
        result = await self.interface.run_command(check_cmd)
        missing = "VENV_NOT_FOUND" in (getattr(result, "stdout", "") or "")
        # Any stderr output is still treated as "missing", as before; re-running
        # venv_install is harmless because it only creates the venv when absent
        if missing or getattr(result, "stderr", ""):
            # Auto-create the venv with no requirements
            await self.venv_install(venv_name, [])
        # Activate virtual environment and run command
        full_command = f". \"{venv_path}/bin/activate\" && {command}"
        return await self.interface.run_command(full_command)
    
    async def venv_exec(self, venv_name: str, python_func, *args, **kwargs):
        """Execute Python function in a virtual environment using source code extraction.
        
        Args:
            venv_name: Name of the virtual environment
            python_func: A callable function to execute
            *args: Positional arguments to pass to the function
            **kwargs: Keyword arguments to pass to the function
            
        Returns:
            The result of the function execution, or raises any exception that occurred
        """
        import base64
        import inspect
        import json
        import textwrap
        
        try:
            # Get function source code using inspect.getsource
            source = inspect.getsource(python_func)
            # Remove common leading whitespace (dedent)
            func_source = textwrap.dedent(source).strip()
            
            # Remove decorators
            while func_source.lstrip().startswith("@"):
                func_source = func_source.split("\n", 1)[1].strip()
            
            # Get function name for execution
            func_name = python_func.__name__
            
            # Serialize args and kwargs as JSON (safer than dill for cross-version compatibility)
            args_json = json.dumps(args, default=str)
            kwargs_json = json.dumps(kwargs, default=str)
            
        except OSError as e:
            raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}")
        except Exception as e:
            raise Exception(f"Failed to reconstruct function source: {e}")
        
        # Create Python code that will define and execute the function
        python_code = f'''
import json
import traceback

try:
    # Define the function from source
{textwrap.indent(func_source, "    ")}
    
    # Deserialize args and kwargs from JSON
    args_json = """{args_json}"""
    kwargs_json = """{kwargs_json}"""
    args = json.loads(args_json)
    kwargs = json.loads(kwargs_json)
    
    # Execute the function
    result = {func_name}(*args, **kwargs)

    # Create success output payload
    output_payload = {{
        "success": True,
        "result": result,
        "error": None
    }}
    
except Exception as e:
    # Create error output payload
    output_payload = {{
        "success": False,
        "result": None,
        "error": {{
            "type": type(e).__name__,
            "message": str(e),
            "traceback": traceback.format_exc()
        }}
    }}

# Serialize the output payload as JSON
import json
output_json = json.dumps(output_payload, default=str)

# Print the JSON output with markers
print(f"<<<VENV_EXEC_START>>>{{output_json}}<<<VENV_EXEC_END>>>")
'''
        
        # Encode the Python code in base64 to avoid shell escaping issues
        encoded_code = base64.b64encode(python_code.encode('utf-8')).decode('ascii')
        
        # Execute the Python code in the virtual environment
        python_command = f"python -c \"import base64; exec(base64.b64decode('{encoded_code}').decode('utf-8'))\""
        result = await self.venv_cmd(venv_name, python_command)
        
        # Parse the output to extract the payload
        start_marker = "<<<VENV_EXEC_START>>>"
        end_marker = "<<<VENV_EXEC_END>>>"

        # Print original stdout
        print(result.stdout[:result.stdout.find(start_marker)])
        
        if start_marker in result.stdout and end_marker in result.stdout:
            start_idx = result.stdout.find(start_marker) + len(start_marker)
            end_idx = result.stdout.find(end_marker)
            
            if start_idx < end_idx:
                output_json = result.stdout[start_idx:end_idx]

                try:
                    # Decode and deserialize the output payload from JSON
                    output_payload = json.loads(output_json)
                except Exception as e:
                    raise Exception(f"Failed to decode output payload: {e}")
                
                if output_payload["success"]:
                    return output_payload["result"]
                else:
                    # Recreate and raise the original exception
                    error_info = output_payload["error"]
                    error_class = eval(error_info["type"])
                    raise error_class(error_info["message"])
            else:
                raise Exception("Invalid output format: markers found but no content between them")
        else:
            # Fallback: return stdout/stderr if no payload markers found
            raise Exception(f"No output payload found. stdout: {result.stdout}, stderr: {result.stderr}")

```

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/diorama/draw.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""Diorama Renderer - A tool for rendering selective views of macOS desktops

This script renders filtered views of the macOS desktop, preserving only selected applications
while maintaining system UI elements like menubar and dock. Each "diorama" shows a consistent
view of the system while isolating specific applications. 

The image is "smart resized" to remove any empty space around the menubar and dock.

Key features:
- Captures shared window state, z-order and position information
- Filters windows by application based on whitelist
- Preserves system context (menubar, dock) in each view
- Preserves menu-owning / keyboard-focused window in each view
- Supports parallel views of the same desktop for multi-agent systems
"""

import sys
import os
import time
import argparse
from typing import List, Dict, Any, Optional, Tuple
import json
from PIL import Image, ImageDraw
import io
import asyncio
import functools
import logging

# simple, nicely formatted logging
logger = logging.getLogger(__name__)

from computer_server.diorama.safezone import (
    get_menubar_bounds,
    get_dock_bounds,
)

# Timing decorator for profiling
def timing_decorator(func):
    """Wrap ``func`` so that each successful call logs its wall-clock duration.

    The duration is measured with ``time.time()`` and reported through the
    module logger at DEBUG level. If ``func`` raises, nothing is logged and
    the exception propagates unchanged.

    Args:
        func: Callable to instrument.

    Returns:
        A wrapper that forwards all arguments and returns ``func``'s result.
    """
    @functools.wraps(func)
    def timed(*args, **kwargs):
        started_at = time.time()
        outcome = func(*args, **kwargs)
        elapsed_time = time.time() - started_at
        logger.debug(f"Function {func.__name__} took {elapsed_time:.4f} seconds to run")
        return outcome

    return timed

# Import Objective-C bridge libraries
try:
    import Quartz
    import AppKit
    from ApplicationServices import (
        AXUIElementCreateSystemWide,  # type: ignore
        AXUIElementCreateApplication,  # type: ignore
        AXUIElementCopyAttributeValue,  # type: ignore
        AXUIElementCopyAttributeValues,  # type: ignore
        kAXFocusedWindowAttribute,  # type: ignore
        kAXWindowsAttribute,  # type: ignore
        kAXMainWindowAttribute,  # type: ignore
        kAXChildrenAttribute,  # type: ignore
        kAXRoleAttribute,  # type: ignore
        kAXTitleAttribute,  # type: ignore
        kAXValueAttribute,  # type: ignore
        kAXDescriptionAttribute,  # type: ignore
        kAXEnabledAttribute,  # type: ignore
        kAXPositionAttribute,  # type: ignore
        kAXSizeAttribute,  # type: ignore
        kAXErrorSuccess,  # type: ignore
        AXValueGetType,  # type: ignore
        kAXValueCGSizeType,  # type: ignore
        kAXValueCGPointType,  # type: ignore
        kAXValueCFRangeType,  # type: ignore
        AXUIElementGetTypeID,  # type: ignore
        AXValueGetValue,  # type: ignore
        kAXVisibleChildrenAttribute,  # type: ignore
        kAXRoleDescriptionAttribute,  # type: ignore
        kAXFocusedApplicationAttribute,  # type: ignore
        kAXFocusedUIElementAttribute,  # type: ignore
        kAXSelectedTextAttribute,  # type: ignore
        kAXSelectedTextRangeAttribute,  # type: ignore
    )
    from AppKit import NSWorkspace, NSApplication, NSApp, NSRunningApplication
    import Foundation
    from Foundation import NSObject, NSMakeRect
    import objc
except ImportError:
    logger.error("Error: This script requires PyObjC to be installed.")
    logger.error("Please install it with: pip install pyobjc")
    sys.exit(1)

# Constants for accessibility API
# NOTE: kAXErrorSuccess, kAXRoleAttribute, kAXTitleAttribute, kAXValueAttribute,
# kAXWindowsAttribute, kAXPositionAttribute, kAXSizeAttribute and
# kAXChildrenAttribute are also imported from ApplicationServices above; the
# assignments below rebind those module-level names to these literal values.
# kAXFocusedAttribute, kAXMenuBarAttribute and kAXMenuBarItemAttribute are not
# part of that import list and are defined only here.
kAXErrorSuccess = 0
kAXRoleAttribute = "AXRole"
kAXTitleAttribute = "AXTitle"
kAXValueAttribute = "AXValue"
kAXWindowsAttribute = "AXWindows"
kAXFocusedAttribute = "AXFocused"
kAXPositionAttribute = "AXPosition"
kAXSizeAttribute = "AXSize"
kAXChildrenAttribute = "AXChildren"
kAXMenuBarAttribute = "AXMenuBar"
kAXMenuBarItemAttribute = "AXMenuBarItem"

# Constants for window properties
# These are the dictionary keys read from each Quartz window-info entry.
# NOTE(review): the stacking direction stated for kCGWindowLayer below is not
# verifiable from this file - confirm against the Quartz Window Services docs.
kCGWindowLayer = "kCGWindowLayer"  # Z-order information (lower values are higher in the stack)
kCGWindowAlpha = "kCGWindowAlpha"  # Window opacity

# Constants for application activation options
# Bit flags keyed by a readable name; the trailing comments give the AppKit
# option each value is meant to correspond to.
NSApplicationActivationOptions = {
    "regular": 0,  # Default activation
    "bringing_all_windows_forward": 1 << 0,  # NSApplicationActivateAllWindows
    "ignoring_other_apps": 1 << 1  # NSApplicationActivateIgnoringOtherApps
}


def CFAttributeToPyObject(attrValue):
    """Convert a CoreFoundation / Accessibility attribute value to a Python object.

    The value's CF type ID selects the conversion:

    - CFString  -> ``str``
    - CFBoolean -> ``bool``
    - CFArray   -> ``list`` (each item converted recursively)
    - CFNumber  -> ``int`` when an int conversion succeeds, otherwise ``float``
    - AXUIElement -> returned unchanged

    Any other value is treated as an AXValue: the ``{...}`` portion of its
    ``description()`` is parsed as a CGSize, CGPoint or CFRange and returned
    as a tuple.

    Args:
        attrValue: The CoreFoundation / AX object to convert.

    Returns:
        The converted Python object, or ``None`` when the value is of an
        unsupported type or its description contains no ``{...}`` section.
    """
    # Imported locally: the module-level imports of this file do not include
    # ``re``, so the AXValue fallback below used to fail with NameError.
    import re

    def list_helper(list_value):
        return [CFAttributeToPyObject(item) for item in list_value]

    def number_helper(number_value):
        # Try the integer representation first, then fall back to double.
        success, int_value = Foundation.CFNumberGetValue(  # type: ignore
            number_value, Foundation.kCFNumberIntType, None  # type: ignore
        )
        if success:
            return int(int_value)

        success, float_value = Foundation.CFNumberGetValue(  # type: ignore
            number_value, Foundation.kCFNumberDoubleType, None  # type: ignore
        )
        if success:
            return float(float_value)
        return None

    def axuielement_helper(ax_element):
        # AXUIElement references are opaque handles; pass them through as-is.
        return ax_element

    cf_attr_type = Foundation.CFGetTypeID(attrValue)  # type: ignore
    cf_type_mapping = {
        Foundation.CFStringGetTypeID(): str,  # type: ignore
        Foundation.CFBooleanGetTypeID(): bool,  # type: ignore
        Foundation.CFArrayGetTypeID(): list_helper,  # type: ignore
        Foundation.CFNumberGetTypeID(): number_helper,  # type: ignore
        AXUIElementGetTypeID(): axuielement_helper,  # type: ignore
    }
    # Look the handler up explicitly instead of wrapping the call in
    # ``except KeyError`` so that errors raised inside a handler are not
    # mistaken for "unsupported CF type".
    cf_handler = cf_type_mapping.get(cf_attr_type)
    if cf_handler is not None:
        return cf_handler(attrValue)

    # Did not get a supported CF type. Move on to AX type.
    ax_attr_type = AXValueGetType(attrValue)
    ax_type_map = {
        kAXValueCGSizeType: Foundation.NSSizeFromString,  # type: ignore
        kAXValueCGPointType: Foundation.NSPointFromString,  # type: ignore
        kAXValueCFRangeType: Foundation.NSRangeFromString,  # type: ignore
    }
    ax_parser = ax_type_map.get(ax_attr_type)
    if ax_parser is None:
        return None

    search_result = re.search("{.*}", attrValue.description())
    if search_result is None:
        return None
    return tuple(ax_parser(search_result.group()))

def element_attribute(element, attribute):
    """Read one accessibility attribute from ``element``.

    For the children attribute a ranged read (index 0, up to 999 values) is
    attempted first; if that read does not succeed, the generic single-value
    read is used, exactly as for every other attribute.

    Args:
        element: Accessibility element to query.
        attribute: Name of the attribute to read.

    Returns:
        The attribute value, with ``NSArray`` results converted through
        ``CFAttributeToPyObject``; ``None`` when the read fails.
    """
    def _unwrap(raw):
        # Arrays are converted item by item; anything else is returned untouched.
        if isinstance(raw, Foundation.NSArray):  # type: ignore
            return CFAttributeToPyObject(raw)
        return raw

    if attribute == kAXChildrenAttribute:
        status, payload = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None)
        if status == kAXErrorSuccess:
            return _unwrap(payload)

    status, payload = AXUIElementCopyAttributeValue(element, attribute, None)
    if status != kAXErrorSuccess:
        return None
    return _unwrap(payload)

def element_value(element, type):
    """Unpack an AXValue wrapper into its underlying structure.

    Args:
        element: The AXValue object to unpack.
        type: The AXValue type constant describing the expected payload.

    Returns:
        The unpacked value when ``AXValueGetValue`` reports ``True``,
        otherwise ``None``.
    """
    ok, unpacked = AXValueGetValue(element, type, None)
    # Deliberately an equality test against True (not plain truthiness).
    return unpacked if ok == True else None


@timing_decorator
def get_running_apps() -> List[NSRunningApplication]:
    """Return the running applications reported by the shared workspace.

    Returns:
        The result of ``NSWorkspace.sharedWorkspace().runningApplications()``.
    """
    workspace = NSWorkspace.sharedWorkspace()
    return workspace.runningApplications()

# @timing_decorator
def get_app_info(app: NSRunningApplication) -> Dict[str, Any]:
    """Summarise a running application as a plain dictionary.

    Args:
        app: NSRunningApplication object to describe.

    Returns:
        Dictionary with the keys ``name``, ``bundle_id``, ``pid``, ``active``,
        ``hidden`` and ``terminated``, each filled from the matching accessor
        on ``app``.
    """
    return dict(
        name=app.localizedName(),
        bundle_id=app.bundleIdentifier(),
        pid=app.processIdentifier(),
        active=app.isActive(),
        hidden=app.isHidden(),
        terminated=app.isTerminated(),
    )

@timing_decorator
def get_all_windows() -> List[Dict[str, Any]]:
    """Collect every window reported by Quartz, tagged with role and z-order.

    Two Quartz listings are combined: the on-screen listing supplies the
    z-order, and the "all windows" listing supplies the entries themselves.

    Returns:
        List of window dictionaries sorted by ascending ``z_index``. Windows
        missing from the on-screen listing get a ``z_index`` of -1, and
        entries without bounds are left out.
    """
    def _role_for(name, owner):
        # Classify a window as dock, menubar, desktop or app.
        if owner == "Dock" and name == "Dock":
            return "dock"
        if owner == "Window Server" and name == "Menubar":
            return "menubar"
        if owner in ["Window Server", "Dock"]:
            return "desktop"
        return "app"

    # The on-screen-only listing keeps z-order; it is reversed before
    # numbering so that index 0 is the desktop / background.
    visible_entries = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionOnScreenOnly,
        Quartz.kCGNullWindowID
    )
    z_order = {
        entry['kCGWindowNumber']: depth
        for depth, entry in enumerate(visible_entries[::-1])
    }

    # The "all" listing does *not* preserve z-order, hence the lookup table above.
    every_entry = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionAll,
        Quartz.kCGNullWindowID
    )

    windows = []
    for entry in every_entry:
        rect = entry.get('kCGWindowBounds', {})
        if not rect:
            # Only windows with valid bounds are reported.
            continue

        number = entry.get('kCGWindowNumber', 0)
        title = entry.get('kCGWindowName', '')
        owner_name = entry.get('kCGWindowOwnerName', '')

        record = {
            "id": number,
            "name": title or "Unnamed Window",
            "pid": entry.get('kCGWindowOwnerPID', 0),
            "owner": owner_name,
            "role": _role_for(title, owner_name),
            "is_on_screen": entry.get('kCGWindowIsOnscreen', False),
            "bounds": {
                "x": rect.get('X', 0),
                "y": rect.get('Y', 0),
                "width": rect.get('Width', 0),
                "height": rect.get('Height', 0)
            },
            "layer": entry.get(kCGWindowLayer, 0),  # System layer value as reported by Quartz
            "z_index": z_order.get(number, -1),  # Our z-index (0 is the desktop)
            "opacity": entry.get(kCGWindowAlpha, 1.0)
        }
        windows.append(record)

    windows.sort(key=lambda record: record["z_index"])
    return windows

def get_app_windows(app_pid: int, all_windows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Select the windows that belong to one process.

    Args:
        app_pid: Process ID of the application.
        all_windows: Window dictionaries, each carrying a ``"pid"`` key.

    Returns:
        New list holding, in their original order, the entries of
        ``all_windows`` whose ``"pid"`` equals ``app_pid``.
    """
    owned = []
    for candidate in all_windows:
        if candidate["pid"] == app_pid:
            owned.append(candidate)
    return owned

@timing_decorator
def draw_desktop_screenshot(
    app_whitelist: Optional[List[str]] = None,
    all_windows: Optional[List[Dict[str, Any]]] = None,
    dock_bounds: Optional[Dict[str, float]] = None,
    dock_items: Optional[List[Dict[str, Any]]] = None,
    menubar_bounds: Optional[Dict[str, float]] = None,
    menubar_items: Optional[List[Dict[str, Any]]] = None,
) -> Tuple[Optional[Image.Image], List[Dict[str, Any]]]:
    """Capture a screenshot of the desktop using Quartz compositing.

    Without a whitelist, all on-screen windows are captured in a single pass
    at full screen size. With a whitelist, the image is cropped around the
    whitelisted app windows (minimum 256x256, clamped to the screen) and
    composited in three passes: desktop + apps, menubar, then the dock items
    that pass the whitelist filter, packed together and centred.

    Args:
        app_whitelist: Optional list of app (window owner) names to include.
            ``None`` captures everything.
        all_windows: Window dictionaries as produced by ``get_all_windows()``;
            fetched when omitted. Only on-screen windows are used.
        dock_bounds: Dock bounds; fetched via ``get_dock_bounds()`` when omitted.
        dock_items: Dock items; fetched via ``get_dock_items()`` when omitted.
        menubar_bounds: Menubar bounds; fetched via ``get_menubar_bounds()``
            when omitted.
        menubar_items: Menubar items; fetched via ``get_menubar_items()`` when
            omitted.

    Returns:
        Tuple ``(image, hitboxes)``. ``image`` is a PIL Image, or ``None`` when
        there is no main screen or the capture failed. ``hitboxes`` is a list
        of ``{"hitbox": [x1, y1, x2, y2], "target": [x1, y1, x2, y2]}``
        dictionaries mapping screenshot regions to screen regions.
    """
    # Screenshot-to-screen hitboxes
    hitboxes = []

    if dock_bounds is None:
        dock_bounds = get_dock_bounds()
    if dock_items is None:
        dock_items = get_dock_items()
    if menubar_bounds is None:
        menubar_bounds = get_menubar_bounds()
    if menubar_items is None:
        menubar_items = get_menubar_items()
    if all_windows is None:
        all_windows = get_all_windows()
    all_windows = all_windows[::-1]
    all_windows = [window for window in all_windows if window["is_on_screen"]]

    main_screen = AppKit.NSScreen.mainScreen()
    if not main_screen:
        # Both compositing paths below need the screen frame; previously this
        # case left ``frame`` unbound and crashed later on.
        logger.error("Error: Could not determine main screen")
        return None, hitboxes
    frame = main_screen.frame()
    screen_rect = Quartz.CGRectMake(0, 0, frame.size.width, frame.size.height)

    if app_whitelist is None:
        # Single pass: desktop, menubar, app, dock
        window_list = Foundation.CFArrayCreateMutable(None, len(all_windows), None)
        for window in all_windows:
            Foundation.CFArrayAppendValue(window_list, window["id"])
        cg_image = Quartz.CGWindowListCreateImageFromArray(
            screen_rect, window_list, Quartz.kCGWindowImageDefault
        )
        if cg_image is None:
            # Keep the documented (image, hitboxes) shape on failure so that
            # callers can always unpack the result.
            return None, hitboxes

        # Create CGContext for compositing
        width = int(frame.size.width)
        height = int(frame.size.height)
        color_space = Quartz.CGColorSpaceCreateWithName(Quartz.kCGColorSpaceSRGB)
        cg_context = Quartz.CGBitmapContextCreate(
            None, width, height, 8, 0, color_space, Quartz.kCGImageAlphaPremultipliedLast
        )
        Quartz.CGContextDrawImage(cg_context, screen_rect, cg_image)
        hitboxes.append({
            "hitbox": [0, 0, width, height],
            "target": [0, 0, width, height]
        })
    else:
        # Filter out windows that are not in the whitelist
        all_windows = [window for window in all_windows if window["owner"] in app_whitelist or window["role"] != "app"]
        app_windows = [window for window in all_windows if window["role"] == "app"]

        dock_orientation = "side" if dock_bounds["width"] < dock_bounds["height"] else "bottom"

        menubar_length = max(item["bounds"]["x"] + item["bounds"]["width"] for item in menubar_items) if menubar_items else 0

        # Calculate bounds of app windows
        app_bounds = {
            "x": min(window["bounds"]["x"] for window in app_windows) if app_windows else 0,
            "y": min(window["bounds"]["y"] for window in app_windows) if app_windows else 0,
        }
        app_bounds["width"] = max(window["bounds"]["x"] + window["bounds"]["width"] for window in app_windows) - app_bounds["x"] if app_windows else 0
        app_bounds["height"] = max(window["bounds"]["y"] + window["bounds"]["height"] for window in app_windows) - app_bounds["y"] if app_windows else 0

        # Set minimum bounds of 256x256
        app_bounds["width"] = max(app_bounds["width"], 256)
        app_bounds["height"] = max(app_bounds["height"], 256)

        # Add dock bounds to app bounds
        if dock_orientation == "bottom":
            app_bounds["height"] += dock_bounds["height"] + 4
        elif dock_orientation == "side":
            if dock_bounds["x"] > frame.size.width / 2:
                app_bounds["width"] += dock_bounds["width"] + 4
            else:
                app_bounds["x"] -= dock_bounds["width"] + 4
                app_bounds["width"] += dock_bounds["width"] + 4

        # Add menubar bounds to app bounds
        app_bounds["height"] += menubar_bounds["height"]

        # Make sure app bounds contains menubar bounds
        app_bounds["width"] = max(app_bounds["width"], menubar_length)

        # Clamp bounds to screen
        app_bounds["x"] = max(app_bounds["x"], 0)
        app_bounds["y"] = max(app_bounds["y"], 0)
        app_bounds["width"] = min(app_bounds["width"], frame.size.width - app_bounds["x"])
        app_bounds["height"] = min(app_bounds["height"], frame.size.height - app_bounds["y"] + menubar_bounds["height"])

        # Create CGContext for compositing
        width = int(app_bounds["width"])
        height = int(app_bounds["height"])
        color_space = Quartz.CGColorSpaceCreateWithName(Quartz.kCGColorSpaceSRGB)
        cg_context = Quartz.CGBitmapContextCreate(
            None, width, height, 8, 0, color_space, Quartz.kCGImageAlphaPremultipliedLast
        )

        def _draw_layer(cg_context, all_windows, source_rect, target_rect):
            """Draw a layer of windows from source_rect to target_rect on the given context."""
            window_list = Foundation.CFArrayCreateMutable(None, len(all_windows), None)
            for window in all_windows:
                Foundation.CFArrayAppendValue(window_list, window["id"])
            cg_image = Quartz.CGWindowListCreateImageFromArray(
                source_rect, window_list, Quartz.kCGWindowImageDefault
            )
            if cg_image is not None:
                Quartz.CGContextDrawImage(cg_context, target_rect, cg_image)

        # --- FIRST PASS: desktop, apps ---
        source_position = [app_bounds["x"], app_bounds["y"]]
        source_size = [app_bounds["width"], app_bounds["height"]]
        target_position = [
            0,
            min(
                menubar_bounds["y"] + menubar_bounds["height"],
                app_bounds["y"]
            )
        ]
        target_size = [app_bounds["width"], app_bounds["height"]]

        if dock_orientation == "bottom":
            source_size[1] += dock_bounds["height"]
            target_size[1] += dock_bounds["height"]
        elif dock_orientation == "side":
            if dock_bounds["x"] < frame.size.width / 2:
                source_position[0] -= dock_bounds["width"]
                target_position[0] -= dock_bounds["width"]
            source_size[0] += dock_bounds["width"]
            target_size[0] += dock_bounds["width"]

        app_source_rect = Quartz.CGRectMake(
            source_position[0], source_position[1], source_size[0], source_size[1]
        )
        # The y coordinate is flipped relative to the context height when drawing.
        app_target_rect = Quartz.CGRectMake(
            target_position[0], app_bounds["height"] - target_position[1] - target_size[1], target_size[0], target_size[1]
        )
        first_pass_windows = [w for w in all_windows if w["role"] == "app" or w["role"] == "desktop"]
        _draw_layer(cg_context, first_pass_windows, app_source_rect, app_target_rect)

        hitboxes.append({
            "hitbox": [0, menubar_bounds["height"], app_bounds["width"], menubar_bounds["height"] + app_bounds["height"]],
            "target": [
                app_source_rect.origin.x,
                app_source_rect.origin.y,
                app_source_rect.origin.x + app_bounds["width"],
                app_source_rect.origin.y + app_bounds["height"]
            ]
        })

        # --- SECOND PASS: menubar ---
        allowed_roles = {"menubar"}
        menubar_windows = [w for w in all_windows if w["role"] in allowed_roles]
        menubar_source_rect = Quartz.CGRectMake(
            0, 0, app_bounds["width"], menubar_bounds["height"]
        )
        menubar_target_rect = Quartz.CGRectMake(
            0, app_bounds["height"] - menubar_bounds["height"], app_bounds["width"], menubar_bounds["height"]
        )
        _draw_layer(cg_context, menubar_windows, menubar_source_rect, menubar_target_rect)

        hitboxes.append({
            "hitbox": [0, 0, app_bounds["width"], menubar_bounds["height"]],
            "target": [0, 0, app_bounds["width"], menubar_bounds["height"]]
        })

        # --- THIRD PASS: dock, filtered ---
        # Step 1: Collect dock items to draw, with their computed target rects
        dock_draw_items = []
        for index, item in enumerate(dock_items):
            source_position = (item["bounds"]["x"], item["bounds"]["y"])
            source_size = (item["bounds"]["width"], item["bounds"]["height"])

            # apply whitelist to middle items (the first and last dock items are always kept)
            if not (index == 0 or index == len(dock_items) - 1):
                if item["subrole"] == "AXApplicationDockItem":
                    if item["title"] not in app_whitelist:
                        continue
                elif item["subrole"] == "AXMinimizedWindowDockItem":
                    if not any(window["name"] == item["title"] and window["role"] == "app" and window["owner"] in app_whitelist for window in all_windows):
                        continue
                elif item["subrole"] == "AXFolderDockItem":
                    continue

            # Preserve unscaled (original) source position and size before any modification
            hitbox_position = source_position
            hitbox_size = source_size

            screen_position = source_position
            screen_size = source_size

            # stretch to screen size
            padding = 32
            if dock_orientation == "bottom":
                source_position = (source_position[0], 0)
                source_size = (source_size[0], frame.size.height)

                hitbox_position = (source_position[0], app_bounds['height'] - hitbox_size[1])
                hitbox_size = (source_size[0], hitbox_size[1])

                if index == 0:
                    source_size = (padding + source_size[0], source_size[1])
                    source_position = (source_position[0] - padding, 0)
                elif index == len(dock_items) - 1:
                    source_size = (source_size[0] + padding, source_size[1])
                    source_position = (source_position[0], 0)

            elif dock_orientation == "side":
                source_position = (0, source_position[1])
                source_size = (frame.size.width, source_size[1])

                hitbox_position = (
                    source_position[0] if dock_bounds['x'] < frame.size.width / 2 else app_bounds['width'] - hitbox_size[0],
                    source_position[1]
                )
                hitbox_size = (hitbox_size[0], source_size[1])

                if index == 0:
                    source_size = (source_size[0], padding + source_size[1])
                    source_position = (0, source_position[1] - padding)
                elif index == len(dock_items) - 1:
                    source_size = (source_size[0], source_size[1] + padding)
                    source_position = (0, source_position[1])

            # Compute the initial target position
            target_position = source_position
            target_size = source_size

            dock_draw_items.append({
                "item": item,
                "index": index,
                "source_position": source_position,
                "source_size": source_size,
                "target_size": target_size,
                "target_position": target_position,  # Will be updated after packing
                "hitbox_position": hitbox_position,
                "hitbox_size": hitbox_size,
                "screen_position": screen_position,
                "screen_size": screen_size,
            })

        # Step 2: Pack the target rects along the main axis, removing gaps
        packed_positions = []
        if dock_orientation == "bottom":
            # Pack left-to-right
            x_cursor = 0
            for draw_item in dock_draw_items:
                packed_positions.append((x_cursor, draw_item["target_position"][1]))
                x_cursor += draw_item["target_size"][0]
            packed_strip_length = x_cursor
            # Center horizontally
            x_offset = (app_bounds['width'] - packed_strip_length) / 2
            y_offset = (frame.size.height - app_bounds['height'])
            for i, draw_item in enumerate(dock_draw_items):
                px, py = packed_positions[i]
                draw_item["target_position"] = (px + x_offset, py - y_offset)

            # Pack unscaled source rects
            x_cursor = 0
            for draw_item in dock_draw_items:
                draw_item["hitbox_position"] = (x_cursor, draw_item["hitbox_position"][1])
                x_cursor += draw_item["hitbox_size"][0]
            packed_strip_length = x_cursor
            # Center horizontally
            x_offset = (app_bounds['width'] - packed_strip_length) / 2
            for i, draw_item in enumerate(dock_draw_items):
                px, py = draw_item["hitbox_position"]
                draw_item["hitbox_position"] = (px + x_offset, py)
        elif dock_orientation == "side":
            # Pack top-to-bottom
            y_cursor = 0
            for draw_item in dock_draw_items:
                packed_positions.append((draw_item["target_position"][0], y_cursor))
                y_cursor += draw_item["target_size"][1]
            packed_strip_length = y_cursor
            # Center vertically
            y_offset = (app_bounds['height'] - packed_strip_length) / 2
            x_offset = 0 if dock_bounds['x'] < frame.size.width / 2 else frame.size.width - app_bounds['width']
            for i, draw_item in enumerate(dock_draw_items):
                px, py = packed_positions[i]
                draw_item["target_position"] = (px - x_offset, py + y_offset)

            # Pack unscaled source rects
            y_cursor = 0
            for draw_item in dock_draw_items:
                draw_item["hitbox_position"] = (draw_item["hitbox_position"][0], y_cursor)
                y_cursor += draw_item["hitbox_size"][1]
            packed_strip_length = y_cursor
            # Center vertically
            y_offset = (app_bounds['height'] - packed_strip_length) / 2
            for i, draw_item in enumerate(dock_draw_items):
                px, py = draw_item["hitbox_position"]
                draw_item["hitbox_position"] = (px, py + y_offset)

        dock_windows = [window for window in all_windows if window["role"] == "dock"]
        # Step 3: Draw dock items using packed and recentered positions
        for draw_item in dock_draw_items:
            item = draw_item["item"]
            source_position = draw_item["source_position"]
            source_size = draw_item["source_size"]
            target_position = draw_item["target_position"]
            target_size = draw_item["target_size"]

            # flip target position y
            target_position = (target_position[0], app_bounds['height'] - target_position[1] - target_size[1])

            source_rect = Quartz.CGRectMake(*source_position, *source_size)
            target_rect = Quartz.CGRectMake(*target_position, *target_size)

            _draw_layer(cg_context, dock_windows, source_rect, target_rect)

            hitbox_position = draw_item["hitbox_position"]
            hitbox_size = draw_item["hitbox_size"]

            # Debug: Draw true hitbox rect (packed position, unscaled size)
            # # Flip y like target_rect
            # hitbox_position_flipped = (
            #     hitbox_position[0],
            #     app_bounds['height'] - hitbox_position[1] - hitbox_size[1]
            # )
            # hitbox_rect = Quartz.CGRectMake(*hitbox_position_flipped, *hitbox_size)
            # Quartz.CGContextSetStrokeColorWithColor(cg_context, Quartz.CGColorCreateGenericRGB(0, 1, 0, 1))
            # Quartz.CGContextStrokeRect(cg_context, hitbox_rect)

            hitboxes.append({
                "hitbox": [*hitbox_position, hitbox_position[0] + hitbox_size[0], hitbox_position[1] + hitbox_size[1]],
                "target": [*draw_item["screen_position"], draw_item["screen_position"][0] + draw_item["screen_size"][0], draw_item["screen_position"][1] + draw_item["screen_size"][1]]
            })

    # Convert composited context to CGImage, then round-trip through
    # TIFF -> PNG bytes so that PIL can open it
    final_cg_image = Quartz.CGBitmapContextCreateImage(cg_context)
    ns_image = AppKit.NSImage.alloc().initWithCGImage_size_(final_cg_image, Foundation.NSZeroSize)
    ns_data = ns_image.TIFFRepresentation()
    bitmap_rep = AppKit.NSBitmapImageRep.imageRepWithData_(ns_data)
    png_data = bitmap_rep.representationUsingType_properties_(AppKit.NSBitmapImageFileTypePNG, None)
    image_data = io.BytesIO(png_data)
    return Image.open(image_data), hitboxes

@timing_decorator
def get_menubar_items(active_app_pid: Optional[int] = None) -> List[Dict[str, Any]]:
    """Get menubar items from the active application using Accessibility API

    Args:
        active_app_pid: PID of the active application. When ``None``, the PID
            of the frontmost application is used.

    Returns:
        List of dictionaries with the keys ``title``, ``bounds`` (``x``, ``y``,
        ``width``, ``height``), ``index`` and ``app_pid``. The list is empty
        when the frontmost application, its accessibility element, its menubar
        or the menubar's children cannot be obtained; each such failure is
        logged as an error. Bounds that cannot be read stay at 0.
    """
    menubar_items = []

    if active_app_pid is None:
        # Get the frontmost application's PID if none provided
        frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication()
        if frontmost_app:
            active_app_pid = frontmost_app.processIdentifier()
        else:
            logger.error("Error: Could not determine frontmost application")
            return menubar_items

    # Create an accessibility element for the application
    app_element = AXUIElementCreateApplication(active_app_pid)
    if app_element is None:
        logger.error(f"Error: Could not create accessibility element for PID {active_app_pid}")
        return menubar_items

    # Get the menubar
    menubar = element_attribute(app_element, kAXMenuBarAttribute)
    if menubar is None:
        logger.error(f"Error: Could not get menubar for application with PID {active_app_pid}")
        return menubar_items

    # Get the menubar items
    children = element_attribute(menubar, kAXChildrenAttribute)
    if children is None:
        logger.error("Error: Could not get menubar items")
        return menubar_items

    # Process each menubar item
    for i, item in enumerate(children):
        # Get item title
        title = element_attribute(item, kAXTitleAttribute) or "Untitled"

        # Create bounding box
        bounds = {
            "x": 0,
            "y": 0,
            "width": 0,
            "height": 0
        }

        # Get item position
        position_value = element_attribute(item, kAXPositionAttribute)
        if position_value:
            point = element_value(position_value, kAXValueCGPointType)
            # element_value returns None when the AXValue cannot be unpacked;
            # keep the zero defaults instead of failing on attribute access.
            if point is not None:
                bounds["x"] = point.x
                bounds["y"] = point.y

        # Get item size
        size_value = element_attribute(item, kAXSizeAttribute)
        if size_value:
            size = element_value(size_value, kAXValueCGSizeType)
            if size is not None:
                bounds["width"] = size.width
                bounds["height"] = size.height

        # Add to list
        menubar_items.append({
            "title": title,
            "bounds": bounds,
            "index": i,
            "app_pid": active_app_pid
        })

    return menubar_items
    
@timing_decorator
def get_dock_items() -> List[Dict[str, Any]]:
    """Get all items in the macOS Dock

    The Dock process is located among the running applications (name "Dock",
    bundle id "com.apple.dock"), and the children of its first ``AXList``
    child are read through the Accessibility API.

    Returns:
        List of dictionaries with the keys ``title``, ``description``,
        ``bounds`` (``x``, ``y``, ``width``, ``height``), ``index``, ``type``,
        ``role`` and ``subrole``. The list is empty when the Dock process, its
        accessibility element, its item list or the items cannot be obtained;
        each such failure is logged as an error. Bounds that cannot be read
        stay at 0.
    """
    dock_items = []

    # Find the Dock process
    dock_pid = None
    running_apps = get_running_apps()
    for app in running_apps:
        if app.localizedName() == "Dock" and app.bundleIdentifier() == "com.apple.dock":
            dock_pid = app.processIdentifier()
            break

    if dock_pid is None:
        logger.error("Error: Could not find Dock process")
        return dock_items

    # Create an accessibility element for the Dock
    dock_element = AXUIElementCreateApplication(dock_pid)
    if dock_element is None:
        logger.error(f"Error: Could not create accessibility element for Dock (PID {dock_pid})")
        return dock_items

    # Get the Dock's main element
    dock_list = element_attribute(dock_element, kAXChildrenAttribute)
    if dock_list is None or len(dock_list) == 0:
        logger.error("Error: Could not get Dock children")
        return dock_items

    # Find the Dock's application list (usually the first child)
    dock_app_list = None
    for child in dock_list:
        role = element_attribute(child, kAXRoleAttribute)
        if role == "AXList":
            dock_app_list = child
            break

    if dock_app_list is None:
        logger.error("Error: Could not find Dock application list")
        return dock_items

    # Get all items in the Dock
    items = element_attribute(dock_app_list, kAXChildrenAttribute)
    if items is None:
        logger.error("Error: Could not get Dock items")
        return dock_items

    # Process each Dock item
    for i, item in enumerate(items):
        # Get item attributes
        title = element_attribute(item, kAXTitleAttribute) or "Untitled"
        description = element_attribute(item, "AXDescription") or ""
        role = element_attribute(item, kAXRoleAttribute) or ""
        subrole = element_attribute(item, "AXSubrole") or ""

        # Create bounding box
        bounds = {
            "x": 0,
            "y": 0,
            "width": 0,
            "height": 0
        }

        # Get item position
        position_value = element_attribute(item, kAXPositionAttribute)
        if position_value:
            point = element_value(position_value, kAXValueCGPointType)
            # element_value returns None when the AXValue cannot be unpacked;
            # keep the zero defaults instead of failing on attribute access.
            if point is not None:
                bounds["x"] = point.x
                bounds["y"] = point.y

        # Get item size
        size_value = element_attribute(item, kAXSizeAttribute)
        if size_value:
            size = element_value(size_value, kAXValueCGSizeType)
            if size is not None:
                bounds["width"] = size.width
                bounds["height"] = size.height

        # Determine if this is an application, file/folder, or separator
        item_type = "unknown"
        if subrole == "AXApplicationDockItem":
            item_type = "application"
        elif subrole == "AXFolderDockItem":
            item_type = "folder"
        elif subrole == "AXDocumentDockItem":
            item_type = "document"
        elif subrole == "AXSeparatorDockItem" or role == "AXSeparator":
            item_type = "separator"
        elif "trash" in title.lower():
            item_type = "trash"

        # Add to list
        dock_items.append({
            "title": title,
            "description": description,
            "bounds": bounds,
            "index": i,
            "type": item_type,
            "role": role,
            "subrole": subrole
        })

    return dock_items

class AppActivationContext:
    """Context manager that temporarily activates an application by PID.

    On entry, if a PID was supplied, the currently frontmost application is
    remembered and the running application with a matching process
    identifier is activated. On exit, the remembered application (if any) is
    activated again. Each activation is paired with a 0.5 second pause.

    Attributes set by the constructor:
        active_app_pid: PID of the application to activate (falsy = do nothing).
        active_app_to_use: Display name, used only for the debug log message.
        logger: Optional logger used for the debug message.
        frontmost_app: Application that was frontmost on entry, or None.
    """

    def __init__(self, active_app_pid=None, active_app_to_use="", logger=None):
        self.active_app_pid = active_app_pid
        self.active_app_to_use = active_app_to_use
        self.logger = logger
        self.frontmost_app = None

    def __enter__(self):
        """Activate the requested application and return this context object."""
        from AppKit import NSWorkspace

        # Nothing to activate without a PID
        if not self.active_app_pid:
            return self

        if self.logger and self.active_app_to_use:
            self.logger.debug(f"Automatically activating app '{self.active_app_to_use}' for screenshot composition")

        # Remember who had focus so __exit__ can hand it back
        self.frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication()

        candidates = NSWorkspace.sharedWorkspace().runningApplications()
        target = next(
            (candidate for candidate in candidates
             if candidate.processIdentifier() == self.active_app_pid),
            None,
        )
        if target is not None:
            target.activateWithOptions_(0)
            # Pause after activating (presumably so the switch can take effect)
            time.sleep(0.5)

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Re-activate the application that was frontmost on entry, if any."""
        previous = self.frontmost_app
        if not previous:
            return

        # Pause before handing focus back
        time.sleep(0.5)
        previous.activateWithOptions_(0)
            

def get_frontmost_and_active_app(all_windows, running_apps, app_whitelist):
    """Pick the application to activate for the capture.

    Walks ``all_windows`` in reverse order and takes the first window whose
    role is "app", that is on screen, and whose owner passes the whitelist
    (no whitelist means every owner passes). Falls back to Finder when no
    owner name was obtained.

    Args:
        all_windows: Sequence of window dicts; the keys read here are
            "owner", "role", "is_on_screen" and "pid".
        running_apps: Iterable of running application objects, searched for
            Finder via ``localizedName()``.
        app_whitelist: Optional collection of allowed owner names, or None.

    Returns:
        Tuple ``(frontmost_app, active_app_to_use, active_app_pid)`` where
        ``frontmost_app`` is the workspace's current frontmost application.
    """
    from AppKit import NSWorkspace
    frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication()

    def _is_candidate(win):
        # App windows only, visible only, and allowed by the whitelist
        return (
            win.get("role") == "app"
            and win.get("is_on_screen")
            and (app_whitelist is None or win.get("owner") in app_whitelist)
        )

    # Reverse iteration: the original author treats the end of the list as topmost
    chosen = next((win for win in reversed(all_windows) if _is_candidate(win)), None)

    if chosen is None:
        app_name, app_pid = None, None
    else:
        app_name, app_pid = chosen.get("owner"), chosen.get("pid")

    # No usable owner name: fall back to Finder
    if app_name is None:
        app_name = "Finder"
        finder = next(
            (candidate for candidate in running_apps
             if candidate.localizedName() == "Finder"),
            None,
        )
        if finder is not None:
            app_pid = finder.processIdentifier()

    return frontmost_app, app_name, app_pid

def capture_all_apps(save_to_disk: bool = False, app_whitelist: Optional[List[str]] = None, output_dir: Optional[str] = None, take_focus: bool = True) -> Tuple[Dict[str, Any], Optional[Image.Image]]:
    """Capture screenshots of all running applications

    Gathers window, application, menubar and dock metadata, recomposites a
    desktop screenshot, and optionally writes the screenshot plus debug
    hitbox overlays to disk.

    Args:
        save_to_disk: Whether to save screenshots to disk. Nothing is written
                    unless ``output_dir`` is also provided.
        app_whitelist: Optional list of app names to include in the recomposited screenshot
                    (will always include 'Window Server' and 'Dock')
        output_dir: Directory that receives the saved images. It is created
                    if it does not exist yet.
        take_focus: When True, an app chosen by ``get_frontmost_and_active_app``
                    is activated for the duration of the capture and the
                    previously frontmost app is re-activated afterwards.
                    When False, no app is activated.

    Returns:
        Dictionary with application information and screenshots. Keys:
        "timestamp", "applications", "windows", "menubar_items",
        "dock_items", "menubar_bounds", "dock_bounds", "hitboxes", and, when
        images were saved, the paths "desktop_screenshot" and either
        "side_by_side_hitboxes" or "hitbox_screenshot".
        Optional PIL Image of the recomposited screenshot
    """
    from PIL import Image, ImageChops, ImageDraw

    def _draw_hitboxes(img, hitboxes, key="target"):
        """
        Overlay colored rectangles for each hitbox (using hitbox[key])
        with color depending on index, then multiply overlay onto img.
        Args:
            img: PIL.Image (RGBA or RGB)
            hitboxes: list of dicts with 'hitbox' and 'target' keys
            key: 'hitbox' or 'target'
        Returns:
            RGBA PIL.Image of the same size as the input
        """
        # Distinct colors, cycled by hitbox index
        palette = (
            (255, 0, 0, 180),      # Red
            (0, 255, 0, 180),      # Green
            (0, 0, 255, 180),      # Blue
            (255, 255, 0, 180),    # Yellow
            (0, 255, 255, 180),    # Cyan
            (255, 0, 255, 180),    # Magenta
            (255, 128, 0, 180),    # Orange
            (128, 0, 255, 180),    # Purple
            (0, 128, 255, 180),    # Sky blue
            (128, 255, 0, 180),    # Lime
        )

        # Ensure RGBA mode for blending
        base = img.convert("RGBA")
        overlay = Image.new("RGBA", base.size, (0, 0, 0, 0))
        draw = ImageDraw.Draw(overlay)

        for i, h in enumerate(hitboxes):
            rect = h.get(key)
            if rect:
                draw.rectangle(rect, fill=palette[i % len(palette)])

        # Multiply blend overlay onto base
        return ImageChops.multiply(base, overlay)

    result = {
        "timestamp": time.time(),
        "applications": [],
        "windows": [],  # All windows, including those without apps
        "menubar_items": [],
        "dock_items": [],
    }

    # Get all windows with z-order information
    all_windows = get_all_windows()

    # Get all running applications
    running_apps = get_running_apps()

    if take_focus:
        _, active_app_to_use, active_app_pid = get_frontmost_and_active_app(all_windows, running_apps, app_whitelist)
    else:
        active_app_to_use = None
        active_app_pid = None

    # Use AppActivationContext to activate the app and restore focus
    with AppActivationContext(active_app_pid, active_app_to_use, logger):

        # Process applications
        for app in running_apps:
            # Skip system apps without a bundle ID
            if app.bundleIdentifier() is None:
                continue

            app_info = get_app_info(app)
            app_windows = get_app_windows(app.processIdentifier(), all_windows)

            result["applications"].append({
                "info": app_info,
                "windows": [window["id"] for window in app_windows],
            })

        # Add all windows to the result
        result["windows"] = all_windows

        # Get menubar items from the active application
        menubar_items = get_menubar_items(active_app_pid)
        result["menubar_items"] = menubar_items

        # Get dock items
        dock_items = get_dock_items()
        result["dock_items"] = dock_items

        # Get menubar bounds
        menubar_bounds = get_menubar_bounds()
        result["menubar_bounds"] = menubar_bounds

        # Get dock bounds
        dock_bounds = get_dock_bounds()
        result["dock_bounds"] = dock_bounds

        # Capture the entire desktop using Quartz compositing
        desktop_screenshot, hitboxes = draw_desktop_screenshot(app_whitelist, all_windows, dock_bounds, dock_items, menubar_bounds, menubar_items)

        result["hitboxes"] = hitboxes

        # DEBUG: Save screenshot and hitbox overlays to disk
        if desktop_screenshot and save_to_disk and output_dir:
            # Image.save does not create missing parent directories
            os.makedirs(output_dir, exist_ok=True)

            desktop_path = os.path.join(output_dir, "desktop.png")
            desktop_screenshot.save(desktop_path)
            result["desktop_screenshot"] = desktop_path

            logger.info(f"Saved desktop screenshot to {desktop_path}")

            if app_whitelist:
                # Take screenshot without whitelist
                desktop_screenshot_full, hitboxes_full = draw_desktop_screenshot(
                    None, all_windows, dock_bounds, dock_items, menubar_bounds, menubar_items)

                # Draw hitboxes on both images using overlay.
                # NOTE(review): the filtered capture's hitboxes are drawn on
                # the full screenshot via their "target" rects; hitboxes_full
                # only gates whether the comparison is written. Confirm this
                # is intended.
                img1 = _draw_hitboxes(desktop_screenshot.copy(), hitboxes, key="hitbox")
                img2 = _draw_hitboxes(desktop_screenshot_full.copy(), hitboxes, key="target") if desktop_screenshot_full else None

                if img2 and hitboxes_full:
                    # Compose side-by-side
                    width = img1.width + img2.width
                    height = max(img1.height, img2.height)
                    combined = Image.new('RGBA', (width, height), (0, 0, 0, 0))
                    combined.paste(img1, (0, 0))
                    combined.paste(img2, (img1.width, 0))
                    side_by_side_path = os.path.join(output_dir, "side_by_side_hitboxes.png")
                    combined.save(side_by_side_path)
                    result["side_by_side_hitboxes"] = side_by_side_path
            else:
                # No whitelist: a single overlay of the hitboxes is enough
                hitbox_img = _draw_hitboxes(desktop_screenshot.copy(), hitboxes, key="hitbox")
                hitbox_path = os.path.join(output_dir, "hitboxes.png")
                hitbox_img.save(hitbox_path)
                result["hitbox_screenshot"] = hitbox_path

        # Focus restoration is handled by AppActivationContext on exit

    return result, desktop_screenshot

async def run_capture():
    """Run the screenshot capture asynchronously

    Command-line entry point. Parses ``sys.argv`` and runs one of two modes:

    * Demo mode (``--demo`` with app names): captures one filtered screenshot
      per app group ("A/B" puts A and B in the same group), packs them into a
      single image and saves it as ``demo_mosaic.png``.
    * Default mode: captures everything via ``capture_all_apps`` with
      ``save_to_disk=True``, prints a summary (plus menubar/dock listings
      when ``--menubar``/``--dock`` are given) and writes the result
      dictionary to ``metadata.json``.

    The output directory is created if it does not exist.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Capture screenshots of running macOS applications")
    parser.add_argument("--output", "-o", help="Output directory for screenshots", default="app_screenshots")
    parser.add_argument("--filter", "-f", nargs="+", help="Filter recomposited screenshot to only include specified apps")
    parser.add_argument("--menubar", "-m", action="store_true", help="List menubar and status items with their bounding boxes")
    parser.add_argument("--dock", "-d", action="store_true", help="List Dock items with their bounding boxes")
    parser.add_argument("--demo", nargs="*", help="Demo mode: pass app names to capture individual and combinations, create mosaic PNG")
    args = parser.parse_args()

    # Resolve a relative output directory against the current directory
    if os.path.isabs(args.output):
        output_dir = args.output
    else:
        output_dir = os.path.join(os.getcwd(), args.output)

    # DEMO MODE: capture each requested app group, then mosaic
    if args.demo:
        from PIL import Image
        demo_apps = args.demo
        print(f"Running in DEMO mode for apps: {demo_apps}")

        # "A/B" means capture A and B together; a plain name is its own group
        groups = []
        for item in demo_apps:
            if "/" in item:
                group = [x.strip() for x in item.split("/") if x.strip()]
            else:
                group = [item.strip()]
            if group:
                groups.append(group)

        screenshots = []
        for group in groups:
            print(f"Capturing for apps: {group}")
            _, img = capture_all_apps(app_whitelist=group)
            if img:
                screenshots.append((group, img))
        if not screenshots:
            print("No screenshots captured in demo mode.")
            return

        # Mosaic-pack: positions come from rpack's rectangle packing
        def make_mosaic(images, pad=64, bg=(30,30,30)):
            import rpack
            # Each image reserves `pad` extra pixels in both dimensions
            sizes = [(img.width + pad, img.height + pad) for _, img in images]
            positions = rpack.pack(sizes)
            # Find the bounding box for the mosaic
            max_x = max(x + w for (x, y), (w, h) in zip(positions, sizes))
            max_y = max(y + h for (x, y), (w, h) in zip(positions, sizes))
            mosaic = Image.new("RGBA", (max_x, max_y), bg)
            for (_, img), (x, y) in zip(images, positions):
                mosaic.paste(img, (x, y))
            return mosaic

        mosaic_img = make_mosaic(screenshots)
        mosaic_path = os.path.join(output_dir, "demo_mosaic.png")
        os.makedirs(output_dir, exist_ok=True)
        mosaic_img.save(mosaic_path)
        print(f"Demo mosaic saved to: {mosaic_path}")
        return

    # Capture all apps and save to disk, including a recomposited screenshot
    print("Capturing screenshots of all running applications...")
    print(f"Saving screenshots to: {output_dir}")

    # The screenshots and metadata.json are written into this directory, so
    # it has to exist before anything is saved.
    os.makedirs(output_dir, exist_ok=True)

    # If filter is provided, show what we're filtering by
    if args.filter:
        print(f"Filtering recomposited screenshot to only include: {', '.join(args.filter)} (plus Window Server and Dock)")

    result, _ = capture_all_apps(
        save_to_disk=True,
        app_whitelist=args.filter,
        output_dir=output_dir,
        take_focus=True
    )

    # Print summary
    print("\nCapture complete!")
    print(f"Captured {len(result['applications'])} applications")

    total_app_windows = sum(len(app["windows"]) for app in result["applications"])
    print(f"Total application windows captured: {total_app_windows}")
    print(f"Total standalone windows captured: {len(result['windows'])}")

    # Print details of each application
    print("\nApplication details:")
    for app in result["applications"]:
        app_info = app["info"]
        windows = app["windows"]
        print(f"  - {app_info['name']} ({len(windows)} windows)")

    # Print recomposited screenshot path if available
    if "desktop_screenshot" in result:
        print(f"\nRecomposited screenshot saved to: {result['desktop_screenshot']}")

    # Print menubar items if requested
    if args.menubar and "menubar_items" in result:
        print("\nMenubar items:")

        # Map PID -> app name so items can be labelled with their owner
        app_name_by_pid = {
            app["info"]["pid"]: app["info"]["name"]
            for app in result["applications"]
        }

        for item in result["menubar_items"]:
            print(f"  - {item['title']}")
            print(f"    Bounds: x={item['bounds']['x']}, y={item['bounds']['y']}, width={item['bounds']['width']}, height={item['bounds']['height']}")

            if "app_pid" in item:
                app_name = app_name_by_pid.get(item["app_pid"], f"Unknown App (PID: {item['app_pid']})")
                print(f"    App: {app_name} (PID: {item['app_pid']})")

            if "window_id" in item:
                print(f"    Window ID: {item['window_id']}")
            if "owner" in item:
                print(f"    Owner: {item['owner']}")
            if "layer" in item and "z_index" in item:
                print(f"    Layer: {item['layer']}, Z-Index: {item['z_index']}")
            print("")

    # Print dock items if requested
    if args.dock and "dock_items" in result:
        print("\nDock items:")
        for item in result["dock_items"]:
            print(f"  - {item['title']} ({item['type']})")
            print(f"    Description: {item['description']}")
            print(f"    Bounds: x={item['bounds']['x']}, y={item['bounds']['y']}, width={item['bounds']['width']}, height={item['bounds']['height']}")
            print(f"    Role: {item['role']}, Subrole: {item['subrole']}")
            print(f"    Index: {item['index']}")
            print("")

    # Save the metadata to a JSON file
    metadata_path = os.path.join(output_dir, "metadata.json")
    with open(metadata_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)

    print(f"\nMetadata saved to: {metadata_path}")

# Script entry point: when executed directly, run the capture coroutine via asyncio.
if __name__ == "__main__":
    asyncio.run(run_capture())
```
Page 14/16 | First | Prev | Next | Last