This is page 14 of 16. Use http://codebase.md/trycua/cua?page={x} to view the full context. # Directory Structure ``` ├── .all-contributorsrc ├── .cursorignore ├── .devcontainer │ ├── devcontainer.json │ ├── post-install.sh │ └── README.md ├── .dockerignore ├── .gitattributes ├── .github │ ├── FUNDING.yml │ ├── scripts │ │ ├── get_pyproject_version.py │ │ └── tests │ │ ├── __init__.py │ │ ├── README.md │ │ └── test_get_pyproject_version.py │ └── workflows │ ├── ci-lume.yml │ ├── docker-publish-kasm.yml │ ├── docker-publish-xfce.yml │ ├── docker-reusable-publish.yml │ ├── npm-publish-computer.yml │ ├── npm-publish-core.yml │ ├── publish-lume.yml │ ├── pypi-publish-agent.yml │ ├── pypi-publish-computer-server.yml │ ├── pypi-publish-computer.yml │ ├── pypi-publish-core.yml │ ├── pypi-publish-mcp-server.yml │ ├── pypi-publish-pylume.yml │ ├── pypi-publish-som.yml │ ├── pypi-reusable-publish.yml │ └── test-validation-script.yml ├── .gitignore ├── .vscode │ ├── docs.code-workspace │ ├── launch.json │ ├── libs-ts.code-workspace │ ├── lume.code-workspace │ ├── lumier.code-workspace │ ├── py.code-workspace │ └── settings.json ├── blog │ ├── app-use.md │ ├── assets │ │ ├── composite-agents.png │ │ ├── docker-ubuntu-support.png │ │ ├── hack-booth.png │ │ ├── hack-closing-ceremony.jpg │ │ ├── hack-cua-ollama-hud.jpeg │ │ ├── hack-leaderboard.png │ │ ├── hack-the-north.png │ │ ├── hack-winners.jpeg │ │ ├── hack-workshop.jpeg │ │ ├── hud-agent-evals.png │ │ └── trajectory-viewer.jpeg │ ├── bringing-computer-use-to-the-web.md │ ├── build-your-own-operator-on-macos-1.md │ ├── build-your-own-operator-on-macos-2.md │ ├── composite-agents.md │ ├── cua-hackathon.md │ ├── hack-the-north.md │ ├── hud-agent-evals.md │ ├── human-in-the-loop.md │ ├── introducing-cua-cloud-containers.md │ ├── lume-to-containerization.md │ ├── sandboxed-python-execution.md │ ├── training-computer-use-models-trajectories-1.md │ ├── trajectory-viewer.md │ ├── ubuntu-docker-support.md │ └── windows-sandbox.md ├── CONTRIBUTING.md ├── Development.md ├── Dockerfile ├── docs │ ├── .gitignore │ ├── .prettierrc │ ├── content │ │ └── docs │ │ ├── agent-sdk │ │ │ ├── agent-loops.mdx │ │ │ ├── benchmarks │ │ │ │ ├── index.mdx │ │ │ │ ├── interactive.mdx │ │ │ │ ├── introduction.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── osworld-verified.mdx │ │ │ │ ├── screenspot-pro.mdx │ │ │ │ └── screenspot-v2.mdx │ │ │ ├── callbacks │ │ │ │ ├── agent-lifecycle.mdx │ │ │ │ ├── cost-saving.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── logging.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── pii-anonymization.mdx │ │ │ │ └── trajectories.mdx │ │ │ ├── chat-history.mdx │ │ │ ├── custom-computer-handlers.mdx │ │ │ ├── custom-tools.mdx │ │ │ ├── customizing-computeragent.mdx │ │ │ ├── integrations │ │ │ │ ├── hud.mdx │ │ │ │ └── meta.json │ │ │ ├── message-format.mdx │ │ │ ├── meta.json │ │ │ ├── migration-guide.mdx │ │ │ ├── prompt-caching.mdx │ │ │ ├── supported-agents │ │ │ │ ├── composed-agents.mdx │ │ │ │ ├── computer-use-agents.mdx │ │ │ │ ├── grounding-models.mdx │ │ │ │ ├── human-in-the-loop.mdx │ │ │ │ └── meta.json │ │ │ ├── supported-model-providers │ │ │ │ ├── index.mdx │ │ │ │ └── local-models.mdx │ │ │ └── usage-tracking.mdx │ │ ├── computer-sdk │ │ │ ├── cloud-vm-management.mdx │ │ │ ├── commands.mdx │ │ │ ├── computer-ui.mdx │ │ │ ├── computers.mdx │ │ │ ├── meta.json │ │ │ └── sandboxed-python.mdx │ │ ├── index.mdx │ │ ├── libraries │ │ │ ├── agent │ │ │ │ └── index.mdx │ │ │ ├── computer │ │ │ │ └── index.mdx │ │ │ ├── computer-server │ │ │ │ ├── Commands.mdx │ │ │ │ 
├── index.mdx │ │ │ │ ├── REST-API.mdx │ │ │ │ └── WebSocket-API.mdx │ │ │ ├── core │ │ │ │ └── index.mdx │ │ │ ├── lume │ │ │ │ ├── cli-reference.mdx │ │ │ │ ├── faq.md │ │ │ │ ├── http-api.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ ├── meta.json │ │ │ │ └── prebuilt-images.mdx │ │ │ ├── lumier │ │ │ │ ├── building-lumier.mdx │ │ │ │ ├── docker-compose.mdx │ │ │ │ ├── docker.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ └── meta.json │ │ │ ├── mcp-server │ │ │ │ ├── client-integrations.mdx │ │ │ │ ├── configuration.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ ├── llm-integrations.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── tools.mdx │ │ │ │ └── usage.mdx │ │ │ └── som │ │ │ ├── configuration.mdx │ │ │ └── index.mdx │ │ ├── meta.json │ │ ├── quickstart-cli.mdx │ │ ├── quickstart-devs.mdx │ │ └── telemetry.mdx │ ├── next.config.mjs │ ├── package-lock.json │ ├── package.json │ ├── pnpm-lock.yaml │ ├── postcss.config.mjs │ ├── public │ │ └── img │ │ ├── agent_gradio_ui.png │ │ ├── agent.png │ │ ├── cli.png │ │ ├── computer.png │ │ ├── som_box_threshold.png │ │ └── som_iou_threshold.png │ ├── README.md │ ├── source.config.ts │ ├── src │ │ ├── app │ │ │ ├── (home) │ │ │ │ ├── [[...slug]] │ │ │ │ │ └── page.tsx │ │ │ │ └── layout.tsx │ │ │ ├── api │ │ │ │ └── search │ │ │ │ └── route.ts │ │ │ ├── favicon.ico │ │ │ ├── global.css │ │ │ ├── layout.config.tsx │ │ │ ├── layout.tsx │ │ │ ├── llms.mdx │ │ │ │ └── [[...slug]] │ │ │ │ └── route.ts │ │ │ └── llms.txt │ │ │ └── route.ts │ │ ├── assets │ │ │ ├── discord-black.svg │ │ │ ├── discord-white.svg │ │ │ ├── logo-black.svg │ │ │ └── logo-white.svg │ │ ├── components │ │ │ ├── iou.tsx │ │ │ └── mermaid.tsx │ │ ├── lib │ │ │ ├── llms.ts │ │ │ └── source.ts │ │ └── mdx-components.tsx │ └── tsconfig.json ├── examples │ ├── agent_examples.py │ ├── agent_ui_examples.py │ ├── cloud_api_examples.py │ ├── computer_examples_windows.py │ ├── computer_examples.py │ ├── computer_ui_examples.py │ ├── computer-example-ts │ │ ├── .env.example │ │ ├── .gitignore │ │ ├── .prettierrc │ │ ├── package-lock.json │ │ ├── package.json │ │ ├── pnpm-lock.yaml │ │ ├── README.md │ │ ├── src │ │ │ ├── helpers.ts │ │ │ └── index.ts │ │ └── tsconfig.json │ ├── docker_examples.py │ ├── evals │ │ ├── hud_eval_examples.py │ │ └── wikipedia_most_linked.txt │ ├── pylume_examples.py │ ├── sandboxed_functions_examples.py │ ├── som_examples.py │ ├── utils.py │ └── winsandbox_example.py ├── img │ ├── agent_gradio_ui.png │ ├── agent.png │ ├── cli.png │ ├── computer.png │ ├── logo_black.png │ └── logo_white.png ├── libs │ ├── kasm │ │ ├── Dockerfile │ │ ├── LICENSE │ │ ├── README.md │ │ └── src │ │ └── ubuntu │ │ └── install │ │ └── firefox │ │ ├── custom_startup.sh │ │ ├── firefox.desktop │ │ └── install_firefox.sh │ ├── lume │ │ ├── .cursorignore │ │ ├── CONTRIBUTING.md │ │ ├── Development.md │ │ ├── img │ │ │ └── cli.png │ │ ├── Package.resolved │ │ ├── Package.swift │ │ ├── README.md │ │ ├── resources │ │ │ └── lume.entitlements │ │ ├── scripts │ │ │ ├── build │ │ │ │ ├── build-debug.sh │ │ │ │ ├── build-release-notarized.sh │ │ │ │ └── build-release.sh │ │ │ └── install.sh │ │ ├── src │ │ │ ├── Commands │ │ │ │ ├── Clone.swift │ │ │ │ ├── Config.swift │ │ │ │ ├── Create.swift │ │ │ │ ├── Delete.swift │ │ │ │ ├── Get.swift │ │ │ │ ├── Images.swift │ │ │ │ ├── IPSW.swift │ │ │ │ ├── List.swift │ │ │ │ ├── Logs.swift │ │ │ │ ├── Options │ │ │ │ │ └── FormatOption.swift │ │ │ │ ├── Prune.swift │ │ │ │ ├── Pull.swift │ │ │ │ ├── Push.swift 
│ │ │ │ ├── Run.swift │ │ │ │ ├── Serve.swift │ │ │ │ ├── Set.swift │ │ │ │ └── Stop.swift │ │ │ ├── ContainerRegistry │ │ │ │ ├── ImageContainerRegistry.swift │ │ │ │ ├── ImageList.swift │ │ │ │ └── ImagesPrinter.swift │ │ │ ├── Errors │ │ │ │ └── Errors.swift │ │ │ ├── FileSystem │ │ │ │ ├── Home.swift │ │ │ │ ├── Settings.swift │ │ │ │ ├── VMConfig.swift │ │ │ │ ├── VMDirectory.swift │ │ │ │ └── VMLocation.swift │ │ │ ├── LumeController.swift │ │ │ ├── Main.swift │ │ │ ├── Server │ │ │ │ ├── Handlers.swift │ │ │ │ ├── HTTP.swift │ │ │ │ ├── Requests.swift │ │ │ │ ├── Responses.swift │ │ │ │ └── Server.swift │ │ │ ├── Utils │ │ │ │ ├── CommandRegistry.swift │ │ │ │ ├── CommandUtils.swift │ │ │ │ ├── Logger.swift │ │ │ │ ├── NetworkUtils.swift │ │ │ │ ├── Path.swift │ │ │ │ ├── ProcessRunner.swift │ │ │ │ ├── ProgressLogger.swift │ │ │ │ ├── String.swift │ │ │ │ └── Utils.swift │ │ │ ├── Virtualization │ │ │ │ ├── DarwinImageLoader.swift │ │ │ │ ├── DHCPLeaseParser.swift │ │ │ │ ├── ImageLoaderFactory.swift │ │ │ │ └── VMVirtualizationService.swift │ │ │ ├── VM │ │ │ │ ├── DarwinVM.swift │ │ │ │ ├── LinuxVM.swift │ │ │ │ ├── VM.swift │ │ │ │ ├── VMDetails.swift │ │ │ │ ├── VMDetailsPrinter.swift │ │ │ │ ├── VMDisplayResolution.swift │ │ │ │ └── VMFactory.swift │ │ │ └── VNC │ │ │ ├── PassphraseGenerator.swift │ │ │ └── VNCService.swift │ │ └── tests │ │ ├── Mocks │ │ │ ├── MockVM.swift │ │ │ ├── MockVMVirtualizationService.swift │ │ │ └── MockVNCService.swift │ │ ├── VM │ │ │ └── VMDetailsPrinterTests.swift │ │ ├── VMTests.swift │ │ ├── VMVirtualizationServiceTests.swift │ │ └── VNCServiceTests.swift │ ├── lumier │ │ ├── .dockerignore │ │ ├── Dockerfile │ │ ├── README.md │ │ └── src │ │ ├── bin │ │ │ └── entry.sh │ │ ├── config │ │ │ └── constants.sh │ │ ├── hooks │ │ │ └── on-logon.sh │ │ └── lib │ │ ├── utils.sh │ │ └── vm.sh │ ├── python │ │ ├── agent │ │ │ ├── .bumpversion.cfg │ │ │ ├── agent │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── adapters │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── huggingfacelocal_adapter.py │ │ │ │ │ ├── human_adapter.py │ │ │ │ │ ├── mlxvlm_adapter.py │ │ │ │ │ └── models │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── internvl.py │ │ │ │ │ ├── opencua.py │ │ │ │ │ └── qwen2_5_vl.py │ │ │ │ ├── agent.py │ │ │ │ ├── callbacks │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── budget_manager.py │ │ │ │ │ ├── image_retention.py │ │ │ │ │ ├── logging.py │ │ │ │ │ ├── operator_validator.py │ │ │ │ │ ├── pii_anonymization.py │ │ │ │ │ ├── prompt_instructions.py │ │ │ │ │ ├── telemetry.py │ │ │ │ │ └── trajectory_saver.py │ │ │ │ ├── cli.py │ │ │ │ ├── computers │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── cua.py │ │ │ │ │ └── custom.py │ │ │ │ ├── decorators.py │ │ │ │ ├── human_tool │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── __main__.py │ │ │ │ │ ├── server.py │ │ │ │ │ └── ui.py │ │ │ │ ├── integrations │ │ │ │ │ └── hud │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── agent.py │ │ │ │ │ └── proxy.py │ │ │ │ ├── loops │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── anthropic.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── composed_grounded.py │ │ │ │ │ ├── gemini.py │ │ │ │ │ ├── glm45v.py │ │ │ │ │ ├── gta1.py │ │ │ │ │ ├── holo.py │ │ │ │ │ ├── internvl.py │ │ │ │ │ ├── model_types.csv │ │ │ │ │ ├── moondream3.py │ │ │ │ │ ├── omniparser.py │ │ │ │ │ ├── openai.py │ │ │ │ │ ├── opencua.py │ │ │ │ │ └── uitars.py │ │ │ │ ├── proxy │ │ │ │ │ ├── examples.py │ │ │ │ │ └── handlers.py │ │ │ │ ├── responses.py │ │ │ │ ├── types.py │ │ │ │ 
└── ui │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ └── gradio │ │ │ │ ├── __init__.py │ │ │ │ ├── app.py │ │ │ │ └── ui_components.py │ │ │ ├── benchmarks │ │ │ │ ├── .gitignore │ │ │ │ ├── contrib.md │ │ │ │ ├── interactive.py │ │ │ │ ├── models │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ └── gta1.py │ │ │ │ ├── README.md │ │ │ │ ├── ss-pro.py │ │ │ │ ├── ss-v2.py │ │ │ │ └── utils.py │ │ │ ├── example.py │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── computer │ │ │ ├── .bumpversion.cfg │ │ │ ├── computer │ │ │ │ ├── __init__.py │ │ │ │ ├── computer.py │ │ │ │ ├── diorama_computer.py │ │ │ │ ├── helpers.py │ │ │ │ ├── interface │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── linux.py │ │ │ │ │ ├── macos.py │ │ │ │ │ ├── models.py │ │ │ │ │ └── windows.py │ │ │ │ ├── logger.py │ │ │ │ ├── models.py │ │ │ │ ├── providers │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── cloud │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── docker │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── lume │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── lume_api.py │ │ │ │ │ ├── lumier │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── types.py │ │ │ │ │ └── winsandbox │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── provider.py │ │ │ │ │ └── setup_script.ps1 │ │ │ │ ├── ui │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── __main__.py │ │ │ │ │ └── gradio │ │ │ │ │ ├── __init__.py │ │ │ │ │ └── app.py │ │ │ │ └── utils.py │ │ │ ├── poetry.toml │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── computer-server │ │ │ ├── .bumpversion.cfg │ │ │ ├── computer_server │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── cli.py │ │ │ │ ├── diorama │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── diorama_computer.py │ │ │ │ │ ├── diorama.py │ │ │ │ │ ├── draw.py │ │ │ │ │ ├── macos.py │ │ │ │ │ └── safezone.py │ │ │ │ ├── handlers │ │ │ │ │ ├── base.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── linux.py │ │ │ │ │ ├── macos.py │ │ │ │ │ └── windows.py │ │ │ │ ├── main.py │ │ │ │ ├── server.py │ │ │ │ └── watchdog.py │ │ │ ├── examples │ │ │ │ ├── __init__.py │ │ │ │ └── usage_example.py │ │ │ ├── pyproject.toml │ │ │ ├── README.md │ │ │ ├── run_server.py │ │ │ └── test_connection.py │ │ ├── core │ │ │ ├── .bumpversion.cfg │ │ │ ├── core │ │ │ │ ├── __init__.py │ │ │ │ └── telemetry │ │ │ │ ├── __init__.py │ │ │ │ └── posthog.py │ │ │ ├── poetry.toml │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── mcp-server │ │ │ ├── .bumpversion.cfg │ │ │ ├── CONCURRENT_SESSIONS.md │ │ │ ├── mcp_server │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── server.py │ │ │ │ └── session_manager.py │ │ │ ├── pdm.lock │ │ │ ├── pyproject.toml │ │ │ ├── README.md │ │ │ └── scripts │ │ │ ├── install_mcp_server.sh │ │ │ └── start_mcp_server.sh │ │ ├── pylume │ │ │ ├── __init__.py │ │ │ ├── .bumpversion.cfg │ │ │ ├── pylume │ │ │ │ ├── __init__.py │ │ │ │ ├── client.py │ │ │ │ ├── exceptions.py │ │ │ │ ├── lume │ │ │ │ ├── models.py │ │ │ │ ├── pylume.py │ │ │ │ └── server.py │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ └── som │ │ ├── .bumpversion.cfg │ │ ├── LICENSE │ │ ├── poetry.toml │ │ ├── pyproject.toml │ │ ├── README.md │ │ ├── som │ │ │ ├── __init__.py │ │ │ ├── detect.py │ │ │ ├── detection.py │ │ │ ├── models.py │ │ │ ├── ocr.py │ │ │ ├── util │ │ │ │ └── utils.py │ │ │ └── 
visualization.py │ │ └── tests │ │ └── test_omniparser.py │ ├── typescript │ │ ├── .gitignore │ │ ├── .nvmrc │ │ ├── agent │ │ │ ├── examples │ │ │ │ ├── playground-example.html │ │ │ │ └── README.md │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── client.ts │ │ │ │ ├── index.ts │ │ │ │ └── types.ts │ │ │ ├── tests │ │ │ │ └── client.test.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── biome.json │ │ ├── computer │ │ │ ├── .editorconfig │ │ │ ├── .gitattributes │ │ │ ├── .gitignore │ │ │ ├── LICENSE │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── computer │ │ │ │ │ ├── index.ts │ │ │ │ │ ├── providers │ │ │ │ │ │ ├── base.ts │ │ │ │ │ │ ├── cloud.ts │ │ │ │ │ │ └── index.ts │ │ │ │ │ └── types.ts │ │ │ │ ├── index.ts │ │ │ │ ├── interface │ │ │ │ │ ├── base.ts │ │ │ │ │ ├── factory.ts │ │ │ │ │ ├── index.ts │ │ │ │ │ ├── linux.ts │ │ │ │ │ ├── macos.ts │ │ │ │ │ └── windows.ts │ │ │ │ └── types.ts │ │ │ ├── tests │ │ │ │ ├── computer │ │ │ │ │ └── cloud.test.ts │ │ │ │ ├── interface │ │ │ │ │ ├── factory.test.ts │ │ │ │ │ ├── index.test.ts │ │ │ │ │ ├── linux.test.ts │ │ │ │ │ ├── macos.test.ts │ │ │ │ │ └── windows.test.ts │ │ │ │ └── setup.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── core │ │ │ ├── .editorconfig │ │ │ ├── .gitattributes │ │ │ ├── .gitignore │ │ │ ├── LICENSE │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── index.ts │ │ │ │ └── telemetry │ │ │ │ ├── clients │ │ │ │ │ ├── index.ts │ │ │ │ │ └── posthog.ts │ │ │ │ └── index.ts │ │ │ ├── tests │ │ │ │ └── telemetry.test.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── package.json │ │ ├── pnpm-lock.yaml │ │ ├── pnpm-workspace.yaml │ │ └── README.md │ └── xfce │ ├── .dockerignore │ ├── .gitignore │ ├── Dockerfile │ ├── README.md │ └── src │ ├── scripts │ │ ├── resize-display.sh │ │ ├── start-computer-server.sh │ │ ├── start-novnc.sh │ │ ├── start-vnc.sh │ │ └── xstartup.sh │ ├── supervisor │ │ └── supervisord.conf │ └── xfce-config │ ├── helpers.rc │ ├── xfce4-power-manager.xml │ └── xfce4-session.xml ├── LICENSE.md ├── Makefile ├── notebooks │ ├── agent_nb.ipynb │ ├── blog │ │ ├── build-your-own-operator-on-macos-1.ipynb │ │ └── build-your-own-operator-on-macos-2.ipynb │ ├── composite_agents_docker_nb.ipynb │ ├── computer_nb.ipynb │ ├── computer_server_nb.ipynb │ ├── customizing_computeragent.ipynb │ ├── eval_osworld.ipynb │ ├── ollama_nb.ipynb │ ├── pylume_nb.ipynb │ ├── README.md │ ├── sota_hackathon_cloud.ipynb │ └── sota_hackathon.ipynb ├── pdm.lock ├── pyproject.toml ├── pyrightconfig.json ├── README.md ├── samples │ └── community │ ├── global-online │ │ └── README.md │ └── hack-the-north │ └── README.md ├── scripts │ ├── build-uv.sh │ ├── build.ps1 │ ├── build.sh │ ├── cleanup.sh │ ├── playground-docker.sh │ ├── playground.sh │ └── run-docker-dev.sh └── tests ├── pytest.ini ├── shell_cmd.py ├── test_files.py ├── test_mcp_server_session_management.py ├── test_mcp_server_streaming.py ├── test_shell_bash.py ├── test_telemetry.py ├── test_venv.py └── test_watchdog.py ``` # Files -------------------------------------------------------------------------------- /libs/python/computer/computer/interface/generic.py: -------------------------------------------------------------------------------- ```python import asyncio import json import time from typing import Any, Dict, List, Optional, Tuple from PIL import Image import websockets import 
aiohttp

from ..logger import Logger, LogLevel
from .base import BaseComputerInterface
from ..utils import decode_base64_image, encode_base64_image, bytes_to_image, draw_box, resize_image
from .models import Key, KeyType, MouseButton, CommandResult


class GenericComputerInterface(BaseComputerInterface):
    """Generic interface with common functionality for all supported platforms (Windows, Linux, macOS)."""

    def __init__(
        self,
        ip_address: str,
        username: str = "lume",
        password: str = "lume",
        api_key: Optional[str] = None,
        vm_name: Optional[str] = None,
        logger_name: str = "computer.interface.generic",
    ):
        super().__init__(ip_address, username, password, api_key, vm_name)
        self._ws = None
        self._reconnect_task = None
        self._closed = False
        self._last_ping = 0
        self._ping_interval = 5  # Send ping every 5 seconds
        self._ping_timeout = 120  # Wait 120 seconds for pong response
        self._reconnect_delay = 1  # Start with 1 second delay
        self._max_reconnect_delay = 30  # Maximum delay between reconnection attempts
        self._log_connection_attempts = True  # Flag to control connection attempt logging
        self._authenticated = False  # Track authentication status
        self._recv_lock = asyncio.Lock()  # Lock to ensure only one recv at a time

        # Set logger name for the interface
        self.logger = Logger(logger_name, LogLevel.NORMAL)

        # Optional default delay time between commands (in seconds)
        self.delay = 0.0

    async def _handle_delay(self, delay: Optional[float] = None):
        """Handle delay between commands using async sleep.

        Args:
            delay: Optional delay in seconds. If None, uses self.delay.
        """
        if delay is not None:
            if isinstance(delay, (float, int)) and delay > 0:
                await asyncio.sleep(delay)
        elif isinstance(self.delay, (float, int)) and self.delay > 0:
            await asyncio.sleep(self.delay)

    @property
    def ws_uri(self) -> str:
        """Get the WebSocket URI using the current IP address.

        Returns:
            WebSocket URI for the Computer API Server
        """
        protocol = "wss" if self.api_key else "ws"
        port = "8443" if self.api_key else "8000"
        return f"{protocol}://{self.ip_address}:{port}/ws"

    @property
    def rest_uri(self) -> str:
        """Get the REST URI using the current IP address.
Returns: REST URI for the Computer API Server """ protocol = "https" if self.api_key else "http" port = "8443" if self.api_key else "8000" return f"{protocol}://{self.ip_address}:{port}/cmd" # Mouse actions async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left", delay: Optional[float] = None) -> None: await self._send_command("mouse_down", {"x": x, "y": y, "button": button}) await self._handle_delay(delay) async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left", delay: Optional[float] = None) -> None: await self._send_command("mouse_up", {"x": x, "y": y, "button": button}) await self._handle_delay(delay) async def left_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None: await self._send_command("left_click", {"x": x, "y": y}) await self._handle_delay(delay) async def right_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None: await self._send_command("right_click", {"x": x, "y": y}) await self._handle_delay(delay) async def double_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None: await self._send_command("double_click", {"x": x, "y": y}) await self._handle_delay(delay) async def move_cursor(self, x: int, y: int, delay: Optional[float] = None) -> None: await self._send_command("move_cursor", {"x": x, "y": y}) await self._handle_delay(delay) async def drag_to(self, x: int, y: int, button: "MouseButton" = "left", duration: float = 0.5, delay: Optional[float] = None) -> None: await self._send_command( "drag_to", {"x": x, "y": y, "button": button, "duration": duration} ) await self._handle_delay(delay) async def drag(self, path: List[Tuple[int, int]], button: "MouseButton" = "left", duration: float = 0.5, delay: Optional[float] = None) -> None: await self._send_command( "drag", {"path": path, "button": button, "duration": duration} ) await self._handle_delay(delay) # Keyboard Actions async def key_down(self, key: "KeyType", delay: Optional[float] = None) -> None: await self._send_command("key_down", {"key": key}) await self._handle_delay(delay) async def key_up(self, key: "KeyType", delay: Optional[float] = None) -> None: await self._send_command("key_up", {"key": key}) await self._handle_delay(delay) async def type_text(self, text: str, delay: Optional[float] = None) -> None: # Temporary fix for https://github.com/trycua/cua/issues/165 # Check if text contains Unicode characters if any(ord(char) > 127 for char in text): # For Unicode text, use clipboard and paste await self.set_clipboard(text) await self.hotkey(Key.COMMAND, 'v') else: # For ASCII text, use the regular typing method await self._send_command("type_text", {"text": text}) await self._handle_delay(delay) async def press(self, key: "KeyType", delay: Optional[float] = None) -> None: """Press a single key. Args: key: The key to press. Can be any of: - A Key enum value (recommended), e.g. Key.PAGE_DOWN - A direct key value string, e.g. 'pagedown' - A single character string, e.g. 
'a' Examples: ```python # Using enum (recommended) await interface.press(Key.PAGE_DOWN) await interface.press(Key.ENTER) # Using direct values await interface.press('pagedown') await interface.press('enter') # Using single characters await interface.press('a') ``` Raises: ValueError: If the key type is invalid or the key is not recognized """ if isinstance(key, Key): actual_key = key.value elif isinstance(key, str): # Try to convert to enum if it matches a known key key_or_enum = Key.from_string(key) actual_key = key_or_enum.value if isinstance(key_or_enum, Key) else key_or_enum else: raise ValueError(f"Invalid key type: {type(key)}. Must be Key enum or string.") await self._send_command("press_key", {"key": actual_key}) await self._handle_delay(delay) async def press_key(self, key: "KeyType", delay: Optional[float] = None) -> None: """DEPRECATED: Use press() instead. This method is kept for backward compatibility but will be removed in a future version. Please use the press() method instead. """ await self.press(key, delay) async def hotkey(self, *keys: "KeyType", delay: Optional[float] = None) -> None: """Press multiple keys simultaneously. Args: *keys: Multiple keys to press simultaneously. Each key can be any of: - A Key enum value (recommended), e.g. Key.COMMAND - A direct key value string, e.g. 'command' - A single character string, e.g. 'a' Examples: ```python # Using enums (recommended) await interface.hotkey(Key.COMMAND, Key.C) # Copy await interface.hotkey(Key.COMMAND, Key.V) # Paste # Using mixed formats await interface.hotkey(Key.COMMAND, 'a') # Select all ``` Raises: ValueError: If any key type is invalid or not recognized """ actual_keys = [] for key in keys: if isinstance(key, Key): actual_keys.append(key.value) elif isinstance(key, str): # Try to convert to enum if it matches a known key key_or_enum = Key.from_string(key) actual_keys.append(key_or_enum.value if isinstance(key_or_enum, Key) else key_or_enum) else: raise ValueError(f"Invalid key type: {type(key)}. Must be Key enum or string.") await self._send_command("hotkey", {"keys": actual_keys}) await self._handle_delay(delay) # Scrolling Actions async def scroll(self, x: int, y: int, delay: Optional[float] = None) -> None: await self._send_command("scroll", {"x": x, "y": y}) await self._handle_delay(delay) async def scroll_down(self, clicks: int = 1, delay: Optional[float] = None) -> None: await self._send_command("scroll_down", {"clicks": clicks}) await self._handle_delay(delay) async def scroll_up(self, clicks: int = 1, delay: Optional[float] = None) -> None: await self._send_command("scroll_up", {"clicks": clicks}) await self._handle_delay(delay) # Screen actions async def screenshot( self, boxes: Optional[List[Tuple[int, int, int, int]]] = None, box_color: str = "#FF0000", box_thickness: int = 2, scale_factor: float = 1.0, ) -> bytes: """Take a screenshot with optional box drawing and scaling. 
Args: boxes: Optional list of (x, y, width, height) tuples defining boxes to draw in screen coordinates box_color: Color of the boxes in hex format (default: "#FF0000" red) box_thickness: Thickness of the box borders in pixels (default: 2) scale_factor: Factor to scale the final image by (default: 1.0) Use > 1.0 to enlarge, < 1.0 to shrink (e.g., 0.5 for half size, 2.0 for double) Returns: bytes: The screenshot image data, optionally with boxes drawn on it and scaled """ result = await self._send_command("screenshot") if not result.get("image_data"): raise RuntimeError("Failed to take screenshot, no image data received from server") screenshot = decode_base64_image(result["image_data"]) if boxes: # Get the natural scaling between screen and screenshot screen_size = await self.get_screen_size() screenshot_width, screenshot_height = bytes_to_image(screenshot).size width_scale = screenshot_width / screen_size["width"] height_scale = screenshot_height / screen_size["height"] # Scale box coordinates from screen space to screenshot space for box in boxes: scaled_box = ( int(box[0] * width_scale), # x int(box[1] * height_scale), # y int(box[2] * width_scale), # width int(box[3] * height_scale), # height ) screenshot = draw_box( screenshot, x=scaled_box[0], y=scaled_box[1], width=scaled_box[2], height=scaled_box[3], color=box_color, thickness=box_thickness, ) if scale_factor != 1.0: screenshot = resize_image(screenshot, scale_factor) return screenshot async def get_screen_size(self) -> Dict[str, int]: result = await self._send_command("get_screen_size") if result["success"] and result["size"]: return result["size"] raise RuntimeError("Failed to get screen size") async def get_cursor_position(self) -> Dict[str, int]: result = await self._send_command("get_cursor_position") if result["success"] and result["position"]: return result["position"] raise RuntimeError("Failed to get cursor position") # Clipboard Actions async def copy_to_clipboard(self) -> str: result = await self._send_command("copy_to_clipboard") if result["success"] and result["content"]: return result["content"] raise RuntimeError("Failed to get clipboard content") async def set_clipboard(self, text: str) -> None: await self._send_command("set_clipboard", {"text": text}) # File Operations async def _write_bytes_chunked(self, path: str, content: bytes, append: bool = False, chunk_size: int = 1024 * 1024) -> None: """Write large files in chunks to avoid memory issues.""" total_size = len(content) current_offset = 0 while current_offset < total_size: chunk_end = min(current_offset + chunk_size, total_size) chunk_data = content[current_offset:chunk_end] # First chunk uses the original append flag, subsequent chunks always append chunk_append = append if current_offset == 0 else True result = await self._send_command("write_bytes", { "path": path, "content_b64": encode_base64_image(chunk_data), "append": chunk_append }) if not result.get("success", False): raise RuntimeError(result.get("error", "Failed to write file chunk")) current_offset = chunk_end async def write_bytes(self, path: str, content: bytes, append: bool = False) -> None: # For large files, use chunked writing if len(content) > 5 * 1024 * 1024: # 5MB threshold await self._write_bytes_chunked(path, content, append) return result = await self._send_command("write_bytes", {"path": path, "content_b64": encode_base64_image(content), "append": append}) if not result.get("success", False): raise RuntimeError(result.get("error", "Failed to write file")) async def 
_read_bytes_chunked(self, path: str, offset: int, total_length: int, chunk_size: int = 1024 * 1024) -> bytes: """Read large files in chunks to avoid memory issues.""" chunks = [] current_offset = offset remaining = total_length while remaining > 0: read_size = min(chunk_size, remaining) result = await self._send_command("read_bytes", { "path": path, "offset": current_offset, "length": read_size }) if not result.get("success", False): raise RuntimeError(result.get("error", "Failed to read file chunk")) content_b64 = result.get("content_b64", "") chunk_data = decode_base64_image(content_b64) chunks.append(chunk_data) current_offset += read_size remaining -= read_size return b''.join(chunks) async def read_bytes(self, path: str, offset: int = 0, length: Optional[int] = None) -> bytes: # For large files, use chunked reading if length is None: # Get file size first to determine if we need chunking file_size = await self.get_file_size(path) # If file is larger than 5MB, read in chunks if file_size > 5 * 1024 * 1024: # 5MB threshold return await self._read_bytes_chunked(path, offset, file_size - offset if offset > 0 else file_size) result = await self._send_command("read_bytes", { "path": path, "offset": offset, "length": length }) if not result.get("success", False): raise RuntimeError(result.get("error", "Failed to read file")) content_b64 = result.get("content_b64", "") return decode_base64_image(content_b64) async def read_text(self, path: str, encoding: str = 'utf-8') -> str: """Read text from a file with specified encoding. Args: path: Path to the file to read encoding: Text encoding to use (default: 'utf-8') Returns: str: The decoded text content of the file """ content_bytes = await self.read_bytes(path) return content_bytes.decode(encoding) async def write_text(self, path: str, content: str, encoding: str = 'utf-8', append: bool = False) -> None: """Write text to a file with specified encoding. 
Args: path: Path to the file to write content: Text content to write encoding: Text encoding to use (default: 'utf-8') append: Whether to append to the file instead of overwriting """ content_bytes = content.encode(encoding) await self.write_bytes(path, content_bytes, append) async def get_file_size(self, path: str) -> int: result = await self._send_command("get_file_size", {"path": path}) if not result.get("success", False): raise RuntimeError(result.get("error", "Failed to get file size")) return result.get("size", 0) async def file_exists(self, path: str) -> bool: result = await self._send_command("file_exists", {"path": path}) return result.get("exists", False) async def directory_exists(self, path: str) -> bool: result = await self._send_command("directory_exists", {"path": path}) return result.get("exists", False) async def create_dir(self, path: str) -> None: result = await self._send_command("create_dir", {"path": path}) if not result.get("success", False): raise RuntimeError(result.get("error", "Failed to create directory")) async def delete_file(self, path: str) -> None: result = await self._send_command("delete_file", {"path": path}) if not result.get("success", False): raise RuntimeError(result.get("error", "Failed to delete file")) async def delete_dir(self, path: str) -> None: result = await self._send_command("delete_dir", {"path": path}) if not result.get("success", False): raise RuntimeError(result.get("error", "Failed to delete directory")) async def list_dir(self, path: str) -> list[str]: result = await self._send_command("list_dir", {"path": path}) if not result.get("success", False): raise RuntimeError(result.get("error", "Failed to list directory")) return result.get("files", []) # Command execution async def run_command(self, command: str) -> CommandResult: result = await self._send_command("run_command", {"command": command}) if not result.get("success", False): raise RuntimeError(result.get("error", "Failed to run command")) return CommandResult( stdout=result.get("stdout", ""), stderr=result.get("stderr", ""), returncode=result.get("return_code", 0) ) # Accessibility Actions async def get_accessibility_tree(self) -> Dict[str, Any]: """Get the accessibility tree of the current screen.""" result = await self._send_command("get_accessibility_tree") if not result.get("success", False): raise RuntimeError(result.get("error", "Failed to get accessibility tree")) return result async def get_active_window_bounds(self) -> Dict[str, int]: """Get the bounds of the currently active window.""" result = await self._send_command("get_active_window_bounds") if result["success"] and result["bounds"]: return result["bounds"] raise RuntimeError("Failed to get active window bounds") async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]: """Convert screenshot coordinates to screen coordinates. 
Args: x: X coordinate in screenshot space y: Y coordinate in screenshot space Returns: tuple[float, float]: (x, y) coordinates in screen space """ screen_size = await self.get_screen_size() screenshot = await self.screenshot() screenshot_img = bytes_to_image(screenshot) screenshot_width, screenshot_height = screenshot_img.size # Calculate scaling factors width_scale = screen_size["width"] / screenshot_width height_scale = screen_size["height"] / screenshot_height # Convert coordinates screen_x = x * width_scale screen_y = y * height_scale return screen_x, screen_y async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]: """Convert screen coordinates to screenshot coordinates. Args: x: X coordinate in screen space y: Y coordinate in screen space Returns: tuple[float, float]: (x, y) coordinates in screenshot space """ screen_size = await self.get_screen_size() screenshot = await self.screenshot() screenshot_img = bytes_to_image(screenshot) screenshot_width, screenshot_height = screenshot_img.size # Calculate scaling factors width_scale = screenshot_width / screen_size["width"] height_scale = screenshot_height / screen_size["height"] # Convert coordinates screenshot_x = x * width_scale screenshot_y = y * height_scale return screenshot_x, screenshot_y # Websocket Methods async def _keep_alive(self): """Keep the WebSocket connection alive with automatic reconnection.""" retry_count = 0 max_log_attempts = 1 # Only log the first attempt at INFO level log_interval = 500 # Then log every 500th attempt (significantly increased from 30) last_warning_time = 0 min_warning_interval = 30 # Minimum seconds between connection lost warnings min_retry_delay = 0.5 # Minimum delay between connection attempts (500ms) while not self._closed: try: if self._ws is None or ( self._ws and self._ws.state == websockets.protocol.State.CLOSED ): try: retry_count += 1 # Add a minimum delay between connection attempts to avoid flooding if retry_count > 1: await asyncio.sleep(min_retry_delay) # Only log the first attempt at INFO level, then every Nth attempt if retry_count == 1: self.logger.info(f"Attempting WebSocket connection to {self.ws_uri}") elif retry_count % log_interval == 0: self.logger.info( f"Still attempting WebSocket connection (attempt {retry_count})..." 
) else: # All other attempts are logged at DEBUG level self.logger.debug( f"Attempting WebSocket connection to {self.ws_uri} (attempt {retry_count})" ) self._ws = await asyncio.wait_for( websockets.connect( self.ws_uri, max_size=1024 * 1024 * 10, # 10MB limit max_queue=32, ping_interval=self._ping_interval, ping_timeout=self._ping_timeout, close_timeout=5, compression=None, # Disable compression to reduce overhead ), timeout=120, ) self.logger.info("WebSocket connection established") # If api_key and vm_name are provided, perform authentication handshake if self.api_key and self.vm_name: self.logger.info("Performing authentication handshake...") auth_message = { "command": "authenticate", "params": { "api_key": self.api_key, "container_name": self.vm_name } } await self._ws.send(json.dumps(auth_message)) # Wait for authentication response async with self._recv_lock: auth_response = await asyncio.wait_for(self._ws.recv(), timeout=10) auth_result = json.loads(auth_response) if not auth_result.get("success"): error_msg = auth_result.get("error", "Authentication failed") self.logger.error(f"Authentication failed: {error_msg}") await self._ws.close() self._ws = None raise ConnectionError(f"Authentication failed: {error_msg}") self.logger.info("Authentication successful") self._reconnect_delay = 1 # Reset reconnect delay on successful connection self._last_ping = time.time() retry_count = 0 # Reset retry count on successful connection except (asyncio.TimeoutError, websockets.exceptions.WebSocketException) as e: next_retry = self._reconnect_delay # Only log the first error at WARNING level, then every Nth attempt if retry_count == 1: self.logger.warning( f"Computer API Server not ready yet. Will retry automatically." ) elif retry_count % log_interval == 0: self.logger.warning( f"Still waiting for Computer API Server (attempt {retry_count})..." ) else: # All other errors are logged at DEBUG level self.logger.debug(f"Connection attempt {retry_count} failed: {e}") if self._ws: try: await self._ws.close() except: pass self._ws = None # Use exponential backoff for connection retries await asyncio.sleep(self._reconnect_delay) self._reconnect_delay = min( self._reconnect_delay * 2, self._max_reconnect_delay ) continue # Regular ping to check connection if self._ws and self._ws.state == websockets.protocol.State.OPEN: try: if time.time() - self._last_ping >= self._ping_interval: pong_waiter = await self._ws.ping() await asyncio.wait_for(pong_waiter, timeout=self._ping_timeout) self._last_ping = time.time() except Exception as e: self.logger.debug(f"Ping failed: {e}") if self._ws: try: await self._ws.close() except: pass self._ws = None continue await asyncio.sleep(1) except Exception as e: current_time = time.time() # Only log connection lost warnings at most once every min_warning_interval seconds if current_time - last_warning_time >= min_warning_interval: self.logger.warning( f"Computer API Server connection lost. Will retry automatically." 
) last_warning_time = current_time else: # Log at debug level instead self.logger.debug(f"Connection lost: {e}") if self._ws: try: await self._ws.close() except: pass self._ws = None async def _ensure_connection(self): """Ensure WebSocket connection is established.""" if self._reconnect_task is None or self._reconnect_task.done(): self._reconnect_task = asyncio.create_task(self._keep_alive()) retry_count = 0 max_retries = 5 while retry_count < max_retries: try: if self._ws and self._ws.state == websockets.protocol.State.OPEN: return retry_count += 1 await asyncio.sleep(1) except Exception as e: # Only log at ERROR level for the last retry attempt if retry_count == max_retries - 1: self.logger.error( f"Persistent connection check error after {retry_count} attempts: {e}" ) else: self.logger.debug(f"Connection check error (attempt {retry_count}): {e}") retry_count += 1 await asyncio.sleep(1) continue raise ConnectionError("Failed to establish WebSocket connection after multiple retries") async def _send_command_ws(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]: """Send command through WebSocket.""" max_retries = 3 retry_count = 0 last_error = None # Acquire lock to ensure only one command is processed at a time self.logger.debug(f"Acquired lock for command: {command}") while retry_count < max_retries: try: await self._ensure_connection() if not self._ws: raise ConnectionError("WebSocket connection is not established") message = {"command": command, "params": params or {}} await self._ws.send(json.dumps(message)) async with self._recv_lock: response = await asyncio.wait_for(self._ws.recv(), timeout=120) self.logger.debug(f"Completed command: {command}") return json.loads(response) except Exception as e: last_error = e retry_count += 1 if retry_count < max_retries: # Only log at debug level for intermediate retries self.logger.debug( f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}" ) await asyncio.sleep(1) continue else: # Only log at error level for the final failure self.logger.error( f"Failed to send command '{command}' after {max_retries} retries" ) self.logger.debug(f"Command failure details: {e}") raise raise last_error if last_error else RuntimeError("Failed to send command") async def _send_command_rest(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]: """Send command through REST API without retries or connection management.""" try: # Prepare the request payload payload = {"command": command, "params": params or {}} # Prepare headers headers = {"Content-Type": "application/json"} if self.api_key: headers["X-API-Key"] = self.api_key if self.vm_name: headers["X-Container-Name"] = self.vm_name # Send the request async with aiohttp.ClientSession() as session: async with session.post( self.rest_uri, json=payload, headers=headers ) as response: # Get the response text response_text = await response.text() # Trim whitespace response_text = response_text.strip() # Check if it starts with "data: " if response_text.startswith("data: "): # Extract everything after "data: " json_str = response_text[6:] # Remove "data: " prefix try: return json.loads(json_str) except json.JSONDecodeError: return { "success": False, "error": "Server returned malformed response", "message": response_text } else: # Return error response return { "success": False, "error": "Server returned malformed response", "message": response_text } except Exception as e: return { "success": False, "error": "Request failed", "message": str(e) } async def 
_send_command(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]: """Send command using REST API with WebSocket fallback.""" # Try REST API first result = await self._send_command_rest(command, params) # If REST failed with "Request failed", try WebSocket as fallback if not result.get("success", True) and (result.get("error") == "Request failed" or result.get("error") == "Server returned malformed response"): self.logger.warning(f"REST API failed for command '{command}', trying WebSocket fallback") try: return await self._send_command_ws(command, params) except Exception as e: self.logger.error(f"WebSocket fallback also failed: {e}") # Return the original REST error return result return result async def wait_for_ready(self, timeout: int = 60, interval: float = 1.0): """Wait for Computer API Server to be ready by testing version command.""" # Check if REST API is available try: result = await self._send_command_rest("version", {}) assert result.get("success", True) except Exception as e: self.logger.debug(f"REST API failed for command 'version', trying WebSocket fallback: {e}") try: await self._wait_for_ready_ws(timeout, interval) return except Exception as e: self.logger.debug(f"WebSocket fallback also failed: {e}") raise e start_time = time.time() last_error = None attempt_count = 0 progress_interval = 10 # Log progress every 10 seconds last_progress_time = start_time try: self.logger.info( f"Waiting for Computer API Server to be ready (timeout: {timeout}s)..." ) # Wait for the server to respond to get_screen_size command while time.time() - start_time < timeout: try: attempt_count += 1 current_time = time.time() # Log progress periodically without flooding logs if current_time - last_progress_time >= progress_interval: elapsed = current_time - start_time self.logger.info( f"Still waiting for Computer API Server... (elapsed: {elapsed:.1f}s, attempts: {attempt_count})" ) last_progress_time = current_time # Test the server with a simple get_screen_size command result = await self._send_command("get_screen_size") if result.get("success", False): elapsed = time.time() - start_time self.logger.info( f"Computer API Server is ready (after {elapsed:.1f}s, {attempt_count} attempts)" ) return # Server is ready else: last_error = result.get("error", "Unknown error") self.logger.debug(f"Initial connection command failed: {last_error}") except Exception as e: last_error = e self.logger.debug(f"Connection attempt {attempt_count} failed: {e}") # Wait before trying again await asyncio.sleep(interval) # If we get here, we've timed out error_msg = f"Could not connect to {self.ip_address} after {timeout} seconds" if last_error: error_msg += f": {str(last_error)}" self.logger.error(error_msg) raise TimeoutError(error_msg) except Exception as e: if isinstance(e, TimeoutError): raise error_msg = f"Error while waiting for server: {str(e)}" self.logger.error(error_msg) raise RuntimeError(error_msg) async def _wait_for_ready_ws(self, timeout: int = 60, interval: float = 1.0): """Wait for WebSocket connection to become available.""" start_time = time.time() last_error = None attempt_count = 0 progress_interval = 10 # Log progress every 10 seconds last_progress_time = start_time # Disable detailed logging for connection attempts self._log_connection_attempts = False try: self.logger.info( f"Waiting for Computer API Server to be ready (timeout: {timeout}s)..." 
            )

            # Start the keep-alive task if it's not already running
            if self._reconnect_task is None or self._reconnect_task.done():
                self._reconnect_task = asyncio.create_task(self._keep_alive())

            # Wait for the connection to be established
            while time.time() - start_time < timeout:
                try:
                    attempt_count += 1
                    current_time = time.time()

                    # Log progress periodically without flooding logs
                    if current_time - last_progress_time >= progress_interval:
                        elapsed = current_time - start_time
                        self.logger.info(
                            f"Still waiting for Computer API Server... (elapsed: {elapsed:.1f}s, attempts: {attempt_count})"
                        )
                        last_progress_time = current_time

                    # Check if we have a connection
                    if self._ws and self._ws.state == websockets.protocol.State.OPEN:
                        # Test the connection with a simple command
                        try:
                            await self._send_command_ws("get_screen_size")
                            elapsed = time.time() - start_time
                            self.logger.info(
                                f"Computer API Server is ready (after {elapsed:.1f}s, {attempt_count} attempts)"
                            )
                            return  # Connection is fully working
                        except Exception as e:
                            last_error = e
                            self.logger.debug(f"Connection test failed: {e}")

                    # Wait before trying again
                    await asyncio.sleep(interval)
                except Exception as e:
                    last_error = e
                    self.logger.debug(f"Connection attempt {attempt_count} failed: {e}")
                    await asyncio.sleep(interval)

            # If we get here, we've timed out
            error_msg = f"Could not connect to {self.ip_address} after {timeout} seconds"
            if last_error:
                error_msg += f": {str(last_error)}"
            self.logger.error(error_msg)
            raise TimeoutError(error_msg)
        finally:
            # Reset to default logging behavior
            self._log_connection_attempts = False

    def close(self):
        """Close WebSocket connection.

        Note: In host computer server mode, we leave the connection open
        to allow other clients to connect to the same server.
        The server will handle cleaning up idle connections.
        """
        # Only cancel the reconnect task
        if self._reconnect_task:
            self._reconnect_task.cancel()
        # Don't set closed flag or close websocket by default
        # This allows the server to stay connected for other clients
        # self._closed = True
        # if self._ws:
        #     asyncio.create_task(self._ws.close())
        #     self._ws = None

    def force_close(self):
        """Force close the WebSocket connection.

        This method should be called when you want to completely shut down
        the connection, not just for regular cleanup.
        """
        self._closed = True
        if self._reconnect_task:
            self._reconnect_task.cancel()
        if self._ws:
            asyncio.create_task(self._ws.close())
            self._ws = None
```
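For orientation, here is a minimal usage sketch of the interface defined above. It is an illustration only: the IP address is a placeholder, the import paths (`computer.interface.generic`, `computer.interface.models`) assume the package layout shown in this repository, and in practice the SDK normally hands out a platform-specific subclass of this interface rather than having you construct `GenericComputerInterface` directly.

```python
import asyncio

# Assumed import paths based on this repository's layout.
from computer.interface.generic import GenericComputerInterface
from computer.interface.models import Key


async def main():
    # Placeholder address; in real use the Computer SDK provides this
    # once the VM or container is up.
    interface = GenericComputerInterface("192.168.64.2")

    # Block until the Computer API Server answers commands.
    await interface.wait_for_ready(timeout=60)

    # Basic input round-trip.
    await interface.left_click(100, 200)
    await interface.type_text("hello world", delay=0.5)
    await interface.press(Key.ENTER)

    # Screenshot comes back as raw image bytes.
    png_bytes = await interface.screenshot()
    with open("screenshot.png", "wb") as f:
        f.write(png_bytes)

    # Run a shell command on the guest.
    result = await interface.run_command("echo hi")
    print(result.stdout, result.returncode)

    interface.close()


asyncio.run(main())
```

Note that `close()` intentionally leaves the underlying connection available for other clients, while `force_close()` tears it down completely.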
--------------------------------------------------------------------------------
/libs/python/computer/computer/providers/lumier/provider.py:
--------------------------------------------------------------------------------

```python
"""
Lumier VM provider implementation.

This provider uses Docker containers running the Lumier image to create
macOS and Linux VMs. It handles VM lifecycle operations through Docker
commands and container management.
"""

import logging
import os
import json
import asyncio
from typing import Dict, List, Optional, Any
import subprocess
import time
import re

from ..base import BaseVMProvider, VMProviderType
from ..lume_api import (
    lume_api_get,
    lume_api_run,
    lume_api_stop,
    lume_api_update
)

# Setup logging
logger = logging.getLogger(__name__)

# Check if Docker is available
try:
    subprocess.run(["docker", "--version"], capture_output=True, check=True)
    HAS_LUMIER = True
except (subprocess.SubprocessError, FileNotFoundError):
    HAS_LUMIER = False


class LumierProvider(BaseVMProvider):
    """
    Lumier VM Provider implementation using Docker containers.

    This provider uses Docker to run Lumier containers that can create
    macOS and Linux VMs through containerization.
    """

    def __init__(
        self,
        port: Optional[int] = 7777,
        host: str = "localhost",
        storage: Optional[str] = None,  # Can be a path or 'ephemeral'
        shared_path: Optional[str] = None,
        image: str = "macos-sequoia-cua:latest",  # VM image to use
        verbose: bool = False,
        ephemeral: bool = False,
        noVNC_port: Optional[int] = 8006,
    ):
        """Initialize the Lumier VM Provider.

        Args:
            port: Port for the API server (default: 7777)
            host: Hostname for the API server (default: localhost)
            storage: Path for persistent VM storage
            shared_path: Path for shared folder between host and VM
            image: VM image to use (e.g. "macos-sequoia-cua:latest")
            verbose: Enable verbose logging
            ephemeral: Use ephemeral (temporary) storage
            noVNC_port: Specific port for noVNC interface (default: 8006)
        """
        self.host = host
        # Always ensure api_port has a valid value (7777 is the default)
        self.api_port = 7777 if port is None else port
        self.vnc_port = noVNC_port  # User-specified noVNC port, will be set in run_vm if provided
        self.ephemeral = ephemeral

        # Handle ephemeral storage (temporary directory)
        if ephemeral:
            self.storage = "ephemeral"
        else:
            self.storage = storage

        self.shared_path = shared_path
        self.image = image  # Store the VM image name to use
        # The container_name will be set in run_vm using the VM name
        self.verbose = verbose
        self._container_id = None
        self._api_url = None  # Will be set after container starts

    @property
    def provider_type(self) -> VMProviderType:
        """Return the provider type."""
        return VMProviderType.LUMIER

    def _parse_memory(self, memory_str: str) -> int:
        """Parse memory string to MB integer.

        Examples:
            "8GB" -> 8192
            "1024MB" -> 1024
            "512" -> 512
        """
        if isinstance(memory_str, int):
            return memory_str

        if isinstance(memory_str, str):
            # Extract number and unit
            match = re.match(r"(\d+)([A-Za-z]*)", memory_str)
            if match:
                value, unit = match.groups()
                value = int(value)
                unit = unit.upper()

                if unit == "GB" or unit == "G":
                    return value * 1024
                elif unit == "MB" or unit == "M" or unit == "":
                    return value

        # Default fallback
        logger.warning(f"Could not parse memory string '{memory_str}', using 8GB default")
        return 8192  # Default to 8GB

    # Helper methods for interacting with the Lumier API through curl
    # These methods handle the various VM operations via API calls

    def _get_curl_error_message(self, return_code: int) -> str:
        """Get a descriptive error message for curl return codes.

        Args:
            return_code: The curl return code

        Returns:
            A descriptive error message
        """
        # Map common curl error codes to helpful messages
        if return_code == 7:
            return "Failed to connect - API server is starting up"
        elif return_code == 22:
            return "HTTP error returned from API server"
        elif return_code == 28:
            return "Operation timeout - API server is slow to respond"
        elif return_code == 52:
            return "Empty reply from server - API is starting but not ready"
        elif return_code == 56:
            return "Network problem during data transfer"
        else:
            return f"Unknown curl error code: {return_code}"

    async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Get VM information by name.

        Args:
            name: Name of the VM to get information for
            storage: Optional storage path override. If provided, this will be used
                     instead of the provider's default storage path.

        Returns:
            Dictionary with VM information including status, IP address, etc.
        """
        if not HAS_LUMIER:
            logger.error("Docker is not available.
Cannot get VM status.") return { "name": name, "status": "unavailable", "error": "Docker is not available" } # Store the current name for API requests self.container_name = name try: # Check if the container exists and is running check_cmd = ["docker", "ps", "-a", "--filter", f"name={name}", "--format", "{{.Status}}"] check_result = subprocess.run(check_cmd, capture_output=True, text=True) container_status = check_result.stdout.strip() if not container_status: logger.info(f"Container {name} does not exist. Will create when run_vm is called.") return { "name": name, "status": "not_found", "message": "Container doesn't exist yet" } # Container exists, check if it's running is_running = container_status.startswith("Up") if not is_running: logger.info(f"Container {name} exists but is not running. Status: {container_status}") return { "name": name, "status": "stopped", "container_status": container_status, } # Container is running, get the IP address and API status from Lumier API logger.info(f"Container {name} is running. Getting VM status from API.") # Use the shared lume_api_get function directly vm_info = lume_api_get( vm_name=name, host=self.host, port=self.api_port, storage=storage if storage is not None else self.storage, debug=self.verbose, verbose=self.verbose ) # Check for API errors if "error" in vm_info: # Use debug level instead of warning to reduce log noise during polling logger.debug(f"API request error: {vm_info['error']}") return { "name": name, "status": "running", # Container is running even if API is not responsive "api_status": "error", "error": vm_info["error"], "container_status": container_status } # Process the VM status information vm_status = vm_info.get("status", "unknown") vnc_url = vm_info.get("vncUrl", "") ip_address = vm_info.get("ipAddress", "") # IMPORTANT: Always ensure we have a valid IP address for connectivity # If the API doesn't return an IP address, default to localhost (127.0.0.1) # This makes the behavior consistent with LumeProvider if not ip_address and vm_status == "running": ip_address = "127.0.0.1" logger.info(f"No IP address returned from API, defaulting to {ip_address}") vm_info["ipAddress"] = ip_address logger.info(f"VM {name} status: {vm_status}") if ip_address and vnc_url: logger.info(f"VM {name} has IP: {ip_address} and VNC URL: {vnc_url}") elif not ip_address and not vnc_url and vm_status != "running": # Not running is expected in this case logger.info(f"VM {name} is not running yet. Status: {vm_status}") else: # Missing IP or VNC but status is running - this is unusual but handled with default IP logger.warning(f"VM {name} is running but missing expected fields. API response: {vm_info}") # Return the full status information return { "name": name, "status": vm_status, "ip_address": ip_address, "vnc_url": vnc_url, "api_status": "ok", "container_status": container_status, **vm_info # Include all fields from the API response } except subprocess.SubprocessError as e: logger.error(f"Failed to check container status: {e}") return { "name": name, "status": "error", "error": f"Failed to check container status: {str(e)}" } async def list_vms(self) -> List[Dict[str, Any]]: """List all VMs managed by this provider. For Lumier provider, there is only one VM per container. 
""" try: status = await self.get_vm("default") return [status] if status.get("status") != "unknown" else [] except Exception as e: logger.error(f"Failed to list VMs: {e}") return [] async def run_vm(self, image: str, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: """Run a VM with the given options. Args: image: Name/tag of the image to use name: Name of the VM to run (used for the container name and Docker image tag) run_opts: Options for running the VM, including: - cpu: Number of CPU cores - memory: Amount of memory (e.g. "8GB") - noVNC_port: Specific port for noVNC interface Returns: Dictionary with VM status information """ # Set the container name using the VM name for consistency self.container_name = name try: # First, check if container already exists and remove it try: check_cmd = ["docker", "ps", "-a", "--filter", f"name={self.container_name}", "--format", "{{.ID}}"] check_result = subprocess.run(check_cmd, capture_output=True, text=True) existing_container = check_result.stdout.strip() if existing_container: logger.info(f"Removing existing container: {self.container_name}") remove_cmd = ["docker", "rm", "-f", self.container_name] subprocess.run(remove_cmd, check=True) except subprocess.CalledProcessError as e: logger.warning(f"Error removing existing container: {e}") # Continue anyway, next steps will fail if there's a real problem # Prepare the Docker run command cmd = ["docker", "run", "-d", "--name", self.container_name] cmd.extend(["-p", f"{self.vnc_port}:8006"]) logger.debug(f"Using specified noVNC_port: {self.vnc_port}") # Set API URL using the API port self._api_url = f"http://{self.host}:{self.api_port}" # Parse memory setting memory_mb = self._parse_memory(run_opts.get("memory", "8GB")) # Add storage volume mount if storage is specified (for persistent VM storage) if self.storage and self.storage != "ephemeral": # Create storage directory if it doesn't exist storage_dir = os.path.abspath(os.path.expanduser(self.storage or "")) os.makedirs(storage_dir, exist_ok=True) # Add volume mount for storage cmd.extend([ "-v", f"{storage_dir}:/storage", "-e", f"HOST_STORAGE_PATH={storage_dir}" ]) logger.debug(f"Using persistent storage at: {storage_dir}") # Add shared folder volume mount if shared_path is specified if self.shared_path: # Create shared directory if it doesn't exist shared_dir = os.path.abspath(os.path.expanduser(self.shared_path or "")) os.makedirs(shared_dir, exist_ok=True) # Add volume mount for shared folder cmd.extend([ "-v", f"{shared_dir}:/shared", "-e", f"HOST_SHARED_PATH={shared_dir}" ]) logger.debug(f"Using shared folder at: {shared_dir}") # Add environment variables # Always use the container_name as the VM_NAME for consistency # Use the VM image passed from the Computer class logger.debug(f"Using VM image: {self.image}") # If ghcr.io is in the image, use the full image name if "ghcr.io" in self.image: vm_image = self.image else: vm_image = f"ghcr.io/trycua/{self.image}" cmd.extend([ "-e", f"VM_NAME={self.container_name}", "-e", f"VERSION={vm_image}", "-e", f"CPU_CORES={run_opts.get('cpu', '4')}", "-e", f"RAM_SIZE={memory_mb}", ]) # Specify the Lumier image with the full image name lumier_image = "trycua/lumier:latest" # First check if the image exists locally try: logger.debug(f"Checking if Docker image {lumier_image} exists locally...") check_image_cmd = ["docker", "image", "inspect", lumier_image] subprocess.run(check_image_cmd, capture_output=True, check=True) logger.debug(f"Docker image {lumier_image} found 
locally.") except subprocess.CalledProcessError: # Image doesn't exist locally logger.warning(f"\nWARNING: Docker image {lumier_image} not found locally.") logger.warning("The system will attempt to pull it from Docker Hub, which may fail if you have network connectivity issues.") logger.warning("If the Docker pull fails, you may need to manually pull the image first with:") logger.warning(f" docker pull {lumier_image}\n") # Add the image to the command cmd.append(lumier_image) # Print the Docker command for debugging logger.debug(f"DOCKER COMMAND: {' '.join(cmd)}") # Run the container with improved error handling try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) except subprocess.CalledProcessError as e: if "no route to host" in str(e.stderr).lower() or "failed to resolve reference" in str(e.stderr).lower(): error_msg = (f"Network error while trying to pull Docker image '{lumier_image}'\n" f"Error: {e.stderr}\n\n" f"SOLUTION: Please try one of the following:\n" f"1. Check your internet connection\n" f"2. Pull the image manually with: docker pull {lumier_image}\n" f"3. Check if Docker is running properly\n") logger.error(error_msg) raise RuntimeError(error_msg) raise # Container started, now check VM status with polling logger.debug("Container started, checking VM status...") logger.debug("NOTE: This may take some time while the VM image is being pulled and initialized") # Start a background thread to show container logs in real-time import threading def show_container_logs(): # Give the container a moment to start generating logs time.sleep(1) logger.debug(f"\n---- CONTAINER LOGS FOR '{name}' (LIVE) ----") logger.debug("Showing logs as they are generated. Press Ctrl+C to stop viewing logs...\n") try: # Use docker logs with follow option log_cmd = ["docker", "logs", "--tail", "30", "--follow", name] process = subprocess.Popen(log_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True) # Read and print logs line by line for line in process.stdout: logger.debug(line, end='') # Break if process has exited if process.poll() is not None: break except Exception as e: logger.error(f"\nError showing container logs: {e}") if self.verbose: logger.error(f"Error in log streaming thread: {e}") finally: logger.debug("\n---- LOG STREAMING ENDED ----") # Make sure process is terminated if 'process' in locals() and process.poll() is None: process.terminate() # Start log streaming in a background thread if verbose mode is enabled log_thread = threading.Thread(target=show_container_logs) log_thread.daemon = True # Thread will exit when main program exits log_thread.start() # Skip waiting for container readiness and just poll get_vm directly # Poll the get_vm method indefinitely until the VM is ready with an IP address attempt = 0 consecutive_errors = 0 vm_running = False while True: # Wait indefinitely try: # Use longer delays to give the system time to initialize if attempt > 0: # Start with 5s delay, then increase gradually up to 30s for later attempts # But use shorter delays while we're getting API errors if consecutive_errors > 0 and consecutive_errors < 5: wait_time = 3 # Use shorter delays when we're getting API errors else: wait_time = min(30, 5 + (attempt * 2)) logger.debug(f"Waiting {wait_time}s before retry #{attempt+1}...") await asyncio.sleep(wait_time) # Try to get VM status logger.debug(f"Checking VM status (attempt {attempt+1})...") vm_status = await self.get_vm(name) # Check for API errors if 'error' in vm_status: 
consecutive_errors += 1 error_msg = vm_status.get('error', 'Unknown error') # Only print a user-friendly status message, not the raw error # since _lume_api_get already logged the technical details if consecutive_errors == 1 or attempt % 5 == 0: if 'Empty reply from server' in error_msg: logger.info("API server is starting up - container is running, but API isn't fully initialized yet.") logger.info("This is expected during the initial VM setup - will continue polling...") else: # Don't repeat the exact same error message each time logger.warning(f"API request error (attempt {attempt+1}): {error_msg}") # Just log that we're still working on it if attempt > 3: logger.debug("Still waiting for the API server to become available...") # If we're getting errors but container is running, that's normal during startup if vm_status.get('status') == 'running': if not vm_running: logger.info("Container is running, waiting for the VM within it to become fully ready...") logger.info("This might take a minute while the VM initializes...") vm_running = True # Increase counter and continue attempt += 1 continue # Reset consecutive error counter when we get a successful response consecutive_errors = 0 # If the VM is running, check if it has an IP address (which means it's fully ready) if vm_status.get('status') == 'running': vm_running = True # Check if we have an IP address, which means the VM is fully ready if 'ip_address' in vm_status and vm_status['ip_address']: logger.info(f"VM is now fully running with IP: {vm_status.get('ip_address')}") if 'vnc_url' in vm_status and vm_status['vnc_url']: logger.info(f"VNC URL: {vm_status.get('vnc_url')}") return vm_status else: logger.debug("VM is running but still initializing network interfaces...") logger.debug("Waiting for IP address to be assigned...") else: # VM exists but might still be starting up status = vm_status.get('status', 'unknown') logger.debug(f"VM found but status is: {status}. Continuing to poll...") # Increase counter for next iteration's delay calculation attempt += 1 # If we reach a very large number of attempts, give a reassuring message but continue if attempt % 10 == 0: logger.debug(f"Still waiting after {attempt} attempts. This might take several minutes for first-time setup.") if not vm_running and attempt >= 20: logger.warning("\nNOTE: First-time VM initialization can be slow as images are downloaded.") logger.warning("If this continues for more than 10 minutes, you may want to check:") logger.warning(" 1. Docker logs with: docker logs " + name) logger.warning(" 2. If your network can access container registries") logger.warning("Press Ctrl+C to abort if needed.\n") # After 150 attempts (likely over 30-40 minutes), return current status if attempt >= 150: logger.debug(f"Reached 150 polling attempts. VM status is: {vm_status.get('status', 'unknown')}") logger.debug("Returning current VM status, but please check Docker logs if there are issues.") return vm_status except Exception as e: # Always continue retrying, but with increasing delays logger.warning(f"Error checking VM status (attempt {attempt+1}): {e}. 
Will retry.") consecutive_errors += 1 # If we've had too many consecutive errors, might be a deeper problem if consecutive_errors >= 10: logger.warning(f"\nWARNING: Encountered {consecutive_errors} consecutive errors while checking VM status.") logger.warning("You may need to check the Docker container logs or restart the process.") logger.warning(f"Error details: {str(e)}\n") # Increase attempt counter for next iteration attempt += 1 # After many consecutive errors, add a delay to avoid hammering the system if attempt > 5: error_delay = min(30, 10 + attempt) logger.warning(f"Multiple connection errors, waiting {error_delay}s before next attempt...") await asyncio.sleep(error_delay) except subprocess.CalledProcessError as e: error_msg = f"Failed to start Lumier container: {e.stderr if hasattr(e, 'stderr') else str(e)}" logger.error(error_msg) raise RuntimeError(error_msg) async def _wait_for_container_ready(self, container_name: str, timeout: int = 90) -> bool: """Wait for the Lumier container to be fully ready with a valid API response. Args: container_name: Name of the Docker container to check timeout: Maximum time to wait in seconds (default: 90 seconds) Returns: True if the container is running, even if API is not fully ready. This allows operations to continue with appropriate fallbacks. """ start_time = time.time() api_ready = False container_running = False logger.debug(f"Waiting for container {container_name} to be ready (timeout: {timeout}s)...") while time.time() - start_time < timeout: # Check if container is running try: check_cmd = ["docker", "ps", "--filter", f"name={container_name}", "--format", "{{.Status}}"] result = subprocess.run(check_cmd, capture_output=True, text=True, check=True) container_status = result.stdout.strip() if container_status and container_status.startswith("Up"): container_running = True logger.info(f"Container {container_name} is running with status: {container_status}") else: logger.warning(f"Container {container_name} not yet running, status: {container_status}") # container is not running yet, wait and try again await asyncio.sleep(2) # Longer sleep to give Docker time continue except subprocess.CalledProcessError as e: logger.warning(f"Error checking container status: {e}") await asyncio.sleep(2) continue # Container is running, check if API is responsive try: # First check the health endpoint api_url = f"http://{self.host}:{self.api_port}/health" logger.info(f"Checking API health at: {api_url}") # Use longer timeout for API health check since it may still be initializing curl_cmd = ["curl", "-s", "--connect-timeout", "5", "--max-time", "10", api_url] result = subprocess.run(curl_cmd, capture_output=True, text=True) if result.returncode == 0 and "ok" in result.stdout.lower(): api_ready = True logger.info(f"API is ready at {api_url}") break else: # API health check failed, now let's check if the VM status endpoint is responsive # This covers cases where the health endpoint isn't implemented but the VM API is working vm_api_url = f"http://{self.host}:{self.api_port}/lume/vms/{container_name}" if self.storage: import urllib.parse encoded_storage = urllib.parse.quote_plus(self.storage) vm_api_url += f"?storage={encoded_storage}" curl_vm_cmd = ["curl", "-s", "--connect-timeout", "5", "--max-time", "10", vm_api_url] vm_result = subprocess.run(curl_vm_cmd, capture_output=True, text=True) if vm_result.returncode == 0 and vm_result.stdout.strip(): # VM API responded with something - consider the API ready api_ready = True logger.info(f"VM API is ready at 
{vm_api_url}") break else: curl_code = result.returncode if curl_code == 0: curl_code = vm_result.returncode # Map common curl error codes to helpful messages if curl_code == 7: curl_error = "Failed to connect - API server is starting up" elif curl_code == 22: curl_error = "HTTP error returned from API server" elif curl_code == 28: curl_error = "Operation timeout - API server is slow to respond" elif curl_code == 52: curl_error = "Empty reply from server - API is starting but not ready" elif curl_code == 56: curl_error = "Network problem during data transfer" else: curl_error = f"Unknown curl error code: {curl_code}" logger.info(f"API not ready yet: {curl_error}") except subprocess.SubprocessError as e: logger.warning(f"Error checking API status: {e}") # If the container is running but API is not ready, that's OK - we'll just wait # a bit longer before checking again, as the container may still be initializing elapsed_seconds = time.time() - start_time if int(elapsed_seconds) % 5 == 0: # Only print status every 5 seconds to reduce verbosity logger.debug(f"Waiting for API to initialize... ({elapsed_seconds:.1f}s / {timeout}s)") await asyncio.sleep(3) # Longer sleep between API checks # Handle timeout - if the container is running but API is not ready, that's not # necessarily an error - the API might just need more time to start up if not container_running: logger.warning(f"Timed out waiting for container {container_name} to start") return False if not api_ready: logger.warning(f"Container {container_name} is running, but API is not fully ready yet.") logger.warning(f"NOTE: You may see some 'API request failed' messages while the API initializes.") # Return True if container is running, even if API isn't ready yet # This allows VM operations to proceed, with appropriate retries for API calls return container_running async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: """Stop a running VM by stopping the Lumier container.""" try: # Use Docker commands to stop the container directly if hasattr(self, '_container_id') and self._container_id: logger.info(f"Stopping Lumier container: {self.container_name}") cmd = ["docker", "stop", self.container_name] result = subprocess.run(cmd, capture_output=True, text=True, check=True) logger.info(f"Container stopped: {result.stdout.strip()}") # Return minimal status info return { "name": name, "status": "stopped", "container_id": self._container_id, } else: # Try to find the container by name check_cmd = ["docker", "ps", "-a", "--filter", f"name={self.container_name}", "--format", "{{.ID}}"] check_result = subprocess.run(check_cmd, capture_output=True, text=True) container_id = check_result.stdout.strip() if container_id: logger.info(f"Found container ID: {container_id}") cmd = ["docker", "stop", self.container_name] result = subprocess.run(cmd, capture_output=True, text=True, check=True) logger.info(f"Container stopped: {result.stdout.strip()}") return { "name": name, "status": "stopped", "container_id": container_id, } else: logger.warning(f"No container found with name {self.container_name}") return { "name": name, "status": "unknown", } except subprocess.CalledProcessError as e: error_msg = f"Failed to stop container: {e.stderr if hasattr(e, 'stderr') else str(e)}" logger.error(error_msg) raise RuntimeError(f"Failed to stop Lumier container: {error_msg}") # update_vm is not implemented as it's not needed for Lumier # The BaseVMProvider requires it, so we provide a minimal implementation async def update_vm(self, name: 
str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: """Not implemented for Lumier provider.""" logger.warning("update_vm is not implemented for Lumier provider") return {"name": name, "status": "unchanged"} async def get_logs(self, name: str, num_lines: int = 100, follow: bool = False, timeout: Optional[int] = None) -> str: """Get the logs from the Lumier container. Args: name: Name of the VM/container to get logs for num_lines: Number of recent log lines to return (default: 100) follow: If True, follow the logs (stream new logs as they are generated) timeout: Optional timeout in seconds for follow mode (None means no timeout) Returns: Container logs as a string Note: If follow=True, this function will continuously stream logs until timeout or until interrupted. The output will be printed to console in real-time. """ if not HAS_LUMIER: error_msg = "Docker is not available. Cannot get container logs." logger.error(error_msg) return error_msg # Make sure we have a container name container_name = name # Check if the container exists and is running try: # Check if the container exists inspect_cmd = ["docker", "container", "inspect", container_name] result = subprocess.run(inspect_cmd, capture_output=True, text=True) if result.returncode != 0: error_msg = f"Container '{container_name}' does not exist or is not accessible" logger.error(error_msg) return error_msg except Exception as e: error_msg = f"Error checking container status: {str(e)}" logger.error(error_msg) return error_msg # Base docker logs command log_cmd = ["docker", "logs"] # Add tail parameter to limit the number of lines log_cmd.extend(["--tail", str(num_lines)]) # Handle follow mode with or without timeout if follow: log_cmd.append("--follow") if timeout is not None: # For follow mode with timeout, we'll run the command and handle the timeout log_cmd.append(container_name) logger.info(f"Following logs for container '{container_name}' with timeout {timeout}s") logger.info(f"\n---- CONTAINER LOGS FOR '{container_name}' (LIVE) ----") logger.info(f"Press Ctrl+C to stop following logs\n") try: # Run with timeout process = subprocess.Popen(log_cmd, text=True) # Wait for the specified timeout if timeout: try: process.wait(timeout=timeout) except subprocess.TimeoutExpired: process.terminate() # Stop after timeout logger.info(f"\n---- LOG FOLLOWING STOPPED (timeout {timeout}s reached) ----") else: # Without timeout, wait for user interruption process.wait() return "Logs were displayed to console in follow mode" except KeyboardInterrupt: process.terminate() logger.info("\n---- LOG FOLLOWING STOPPED (user interrupted) ----") return "Logs were displayed to console in follow mode (interrupted)" else: # For follow mode without timeout, we'll print a helpful message log_cmd.append(container_name) logger.info(f"Following logs for container '{container_name}' indefinitely") logger.info(f"\n---- CONTAINER LOGS FOR '{container_name}' (LIVE) ----") logger.info(f"Press Ctrl+C to stop following logs\n") try: # Run the command and let it run until interrupted process = subprocess.Popen(log_cmd, text=True) process.wait() # Wait indefinitely (until user interrupts) return "Logs were displayed to console in follow mode" except KeyboardInterrupt: process.terminate() logger.info("\n---- LOG FOLLOWING STOPPED (user interrupted) ----") return "Logs were displayed to console in follow mode (interrupted)" else: # For non-follow mode, capture and return the logs as a string log_cmd.append(container_name) logger.info(f"Getting 
{num_lines} log lines for container '{container_name}'") try: result = subprocess.run(log_cmd, capture_output=True, text=True, check=True) logs = result.stdout # Only print header and logs if there's content if logs.strip(): logger.info(f"\n---- CONTAINER LOGS FOR '{container_name}' (LAST {num_lines} LINES) ----\n") logger.info(logs) logger.info(f"\n---- END OF LOGS ----") else: logger.info(f"\nNo logs available for container '{container_name}'") return logs except subprocess.CalledProcessError as e: error_msg = f"Error getting logs: {e.stderr}" logger.error(error_msg) return error_msg except Exception as e: error_msg = f"Unexpected error getting logs: {str(e)}" logger.error(error_msg) return error_msg async def restart_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: raise NotImplementedError("LumierProvider does not support restarting VMs.") async def get_ip(self, name: str, storage: Optional[str] = None, retry_delay: int = 2) -> str: """Get the IP address of a VM, waiting indefinitely until it's available. Args: name: Name of the VM to get the IP for storage: Optional storage path override retry_delay: Delay between retries in seconds (default: 2) Returns: IP address of the VM when it becomes available """ # Use container_name = name for consistency self.container_name = name # Track total attempts for logging purposes total_attempts = 0 # Loop indefinitely until we get a valid IP while True: total_attempts += 1 # Log retry message but not on first attempt if total_attempts > 1: logger.info(f"Waiting for VM {name} IP address (attempt {total_attempts})...") try: # Get VM information vm_info = await self.get_vm(name, storage=storage) # Check if we got a valid IP ip = vm_info.get("ip_address", None) if ip and ip != "unknown" and not ip.startswith("0.0.0.0"): logger.info(f"Got valid VM IP address: {ip}") return ip # Check the VM status status = vm_info.get("status", "unknown") # Special handling for Lumier: it may report "stopped" even when the VM is starting # If the VM information contains an IP but status is stopped, it might be a race condition if status == "stopped" and "ip_address" in vm_info: ip = vm_info.get("ip_address") if ip and ip != "unknown" and not ip.startswith("0.0.0.0"): logger.info(f"Found valid IP {ip} despite VM status being {status}") return ip logger.info(f"VM status is {status}, but still waiting for IP to be assigned") # If VM is not running yet, log and wait elif status != "running": logger.info(f"VM is not running yet (status: {status}). Waiting...") # If VM is running but no IP yet, wait and retry else: logger.info("VM is running but no valid IP address yet. Waiting...") except Exception as e: logger.warning(f"Error getting VM {name} IP: {e}, continuing to wait...") # Wait before next retry await asyncio.sleep(retry_delay) # Add progress log every 10 attempts if total_attempts % 10 == 0: logger.info(f"Still waiting for VM {name} IP after {total_attempts} attempts...") async def __aenter__(self): """Async context manager entry. This method is called when entering an async context manager block. Returns self to be used in the context. 
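        Example (illustrative sketch; ``provider`` is assumed to be an
        already-constructed LumierProvider):

            async with provider:
                # The default API URL is initialized on entry, so get_vm
                # can be called before run_vm.
                status = await provider.get_vm("my-vm")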
""" logger.debug("Entering LumierProvider context") # Initialize the API URL with the default value if not already set # This ensures get_vm can work before run_vm is called if not hasattr(self, '_api_url') or not self._api_url: self._api_url = f"http://{self.host}:{self.api_port}" logger.info(f"Initialized default Lumier API URL: {self._api_url}") return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit. This method is called when exiting an async context manager block. It handles proper cleanup of resources, including stopping any running containers. """ logger.debug(f"Exiting LumierProvider context, handling exceptions: {exc_type}") try: # If we have a container ID, we should stop it to clean up resources if hasattr(self, '_container_id') and self._container_id: logger.info(f"Stopping Lumier container on context exit: {self.container_name}") try: cmd = ["docker", "stop", self.container_name] subprocess.run(cmd, capture_output=True, text=True, check=True) logger.info(f"Container stopped during context exit: {self.container_name}") except subprocess.CalledProcessError as e: logger.warning(f"Failed to stop container during cleanup: {e.stderr}") # Don't raise an exception here, we want to continue with cleanup except Exception as e: logger.error(f"Error during LumierProvider cleanup: {e}") # We don't want to suppress the original exception if there was one if exc_type is None: raise # Return False to indicate that any exception should propagate return False ``` -------------------------------------------------------------------------------- /libs/python/computer/computer/computer.py: -------------------------------------------------------------------------------- ```python import traceback from typing import Optional, List, Literal, Dict, Any, Union, TYPE_CHECKING, cast import asyncio from .models import Computer as ComputerConfig, Display from .interface.factory import InterfaceFactory import time from PIL import Image import io import re from .logger import Logger, LogLevel import json import logging from core.telemetry import is_telemetry_enabled, record_event import os from . import helpers import platform SYSTEM_INFO = { "os": platform.system().lower(), "os_version": platform.release(), "python_version": platform.python_version(), } # Import provider related modules from .providers.base import VMProviderType from .providers.factory import VMProviderFactory OSType = Literal["macos", "linux", "windows"] class Computer: """Computer is the main class for interacting with the computer.""" def create_desktop_from_apps(self, apps): """ Create a virtual desktop from a list of app names, returning a DioramaComputer that proxies Diorama.Interface but uses diorama_cmds via the computer interface. Args: apps (list[str]): List of application names to include in the desktop. Returns: DioramaComputer: A proxy object with the Diorama interface, but using diorama_cmds. """ assert "app-use" in self.experiments, "App Usage is an experimental feature. 
Enable it by passing experiments=['app-use'] to Computer()" from .diorama_computer import DioramaComputer return DioramaComputer(self, apps) def __init__( self, display: Union[Display, Dict[str, int], str] = "1024x768", memory: str = "8GB", cpu: str = "4", os_type: OSType = "macos", name: str = "", image: Optional[str] = None, shared_directories: Optional[List[str]] = None, use_host_computer_server: bool = False, verbosity: Union[int, LogLevel] = logging.INFO, telemetry_enabled: bool = True, provider_type: Union[str, VMProviderType] = VMProviderType.LUME, port: Optional[int] = 7777, noVNC_port: Optional[int] = 8006, host: str = os.environ.get("PYLUME_HOST", "localhost"), storage: Optional[str] = None, ephemeral: bool = False, api_key: Optional[str] = None, experiments: Optional[List[str]] = None ): """Initialize a new Computer instance. Args: display: The display configuration. Can be: - A Display object - A dict with 'width' and 'height' - A string in format "WIDTHxHEIGHT" (e.g. "1920x1080") Defaults to "1024x768" memory: The VM memory allocation. Defaults to "8GB" cpu: The VM CPU allocation. Defaults to "4" os_type: The operating system type ('macos' or 'linux') name: The VM name image: The VM image name shared_directories: Optional list of directory paths to share with the VM use_host_computer_server: If True, target localhost instead of starting a VM verbosity: Logging level (standard Python logging levels: logging.DEBUG, logging.INFO, etc.) LogLevel enum values are still accepted for backward compatibility telemetry_enabled: Whether to enable telemetry tracking. Defaults to True. provider_type: The VM provider type to use (lume, qemu, cloud) port: Optional port to use for the VM provider server noVNC_port: Optional port for the noVNC web interface (Lumier provider) host: Host to use for VM provider connections (e.g. "localhost", "host.docker.internal") storage: Optional path for persistent VM storage (Lumier provider) ephemeral: Whether to use ephemeral storage api_key: Optional API key for cloud providers experiments: Optional list of experimental features to enable (e.g. ["app-use"]) """ self.logger = Logger("computer", verbosity) self.logger.info("Initializing Computer...") if not image: if os_type == "macos": image = "macos-sequoia-cua:latest" elif os_type == "linux": image = "trycua/cua-ubuntu:latest" image = str(image) # Store original parameters self.image = image self.port = port self.noVNC_port = noVNC_port self.host = host self.os_type = os_type self.provider_type = provider_type self.ephemeral = ephemeral self.api_key = api_key self.experiments = experiments or [] if "app-use" in self.experiments: assert self.os_type == "macos", "App use experiment is only supported on macOS" # The default is currently to use non-ephemeral storage if storage and ephemeral and storage != "ephemeral": raise ValueError("Storage path and ephemeral flag cannot be used together") # Windows Sandbox always uses ephemeral storage if self.provider_type == VMProviderType.WINSANDBOX: if not ephemeral and storage != None and storage != "ephemeral": self.logger.warning("Windows Sandbox storage is always ephemeral. 
Setting ephemeral=True.") self.ephemeral = True self.storage = "ephemeral" else: self.storage = "ephemeral" if ephemeral else storage # For Lumier provider, store the first shared directory path to use # for VM file sharing self.shared_path = None if shared_directories and len(shared_directories) > 0: self.shared_path = shared_directories[0] self.logger.info(f"Using first shared directory for VM file sharing: {self.shared_path}") # Store telemetry preference self._telemetry_enabled = telemetry_enabled # Set initialization flag self._initialized = False self._running = False # Configure root logger self.verbosity = verbosity self.logger = Logger("computer", verbosity) # Configure component loggers with proper hierarchy self.vm_logger = Logger("computer.vm", verbosity) self.interface_logger = Logger("computer.interface", verbosity) if not use_host_computer_server: if ":" not in image: image = f"{image}:latest" if not name: # Normalize the name to be used for the VM name = image.replace(":", "_") # Remove any forward slashes name = name.replace("/", "_") # Convert display parameter to Display object if isinstance(display, str): # Parse string format "WIDTHxHEIGHT" match = re.match(r"(\d+)x(\d+)", display) if not match: raise ValueError( "Display string must be in format 'WIDTHxHEIGHT' (e.g. '1024x768')" ) width, height = map(int, match.groups()) display_config = Display(width=width, height=height) elif isinstance(display, dict): display_config = Display(**display) else: display_config = display self.config = ComputerConfig( image=image.split(":")[0], tag=image.split(":")[1], name=name, display=display_config, memory=memory, cpu=cpu, ) # Initialize VM provider but don't start it yet - we'll do that in run() self.config.vm_provider = None # Will be initialized in run() # Store shared directories config self.shared_directories = shared_directories or [] # Placeholder for VM provider context manager self._provider_context = None # Initialize with proper typing - None at first, will be set in run() self._interface = None self.use_host_computer_server = use_host_computer_server # Record initialization in telemetry (if enabled) if telemetry_enabled and is_telemetry_enabled(): record_event("computer_initialized", SYSTEM_INFO) else: self.logger.debug("Telemetry disabled - skipping initialization tracking") async def __aenter__(self): """Start the computer.""" await self.run() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Stop the computer.""" await self.disconnect() def __enter__(self): """Start the computer.""" # Run the event loop to call the async enter method loop = asyncio.get_event_loop() loop.run_until_complete(self.__aenter__()) return self def __exit__(self, exc_type, exc_val, exc_tb): """Stop the computer.""" loop = asyncio.get_event_loop() loop.run_until_complete(self.__aexit__(exc_type, exc_val, exc_tb)) async def run(self) -> Optional[str]: """Initialize the VM and computer interface.""" if TYPE_CHECKING: from .interface.base import BaseComputerInterface # If already initialized, just log and return if hasattr(self, "_initialized") and self._initialized: self.logger.info("Computer already initialized, skipping initialization") return self.logger.info("Starting computer...") start_time = time.time() try: # If using host computer server if self.use_host_computer_server: self.logger.info("Using host computer server") # Set ip_address for host computer server mode ip_address = "localhost" # Create the interface with explicit type annotation from .interface.base import 
BaseComputerInterface self._interface = cast( BaseComputerInterface, InterfaceFactory.create_interface_for_os( os=self.os_type, ip_address=ip_address # type: ignore[arg-type] ), ) self.logger.info("Waiting for host computer server to be ready...") await self._interface.wait_for_ready() self.logger.info("Host computer server ready") else: # Start or connect to VM self.logger.info(f"Starting VM: {self.image}") if not self._provider_context: try: provider_type_name = self.provider_type.name if isinstance(self.provider_type, VMProviderType) else self.provider_type self.logger.verbose(f"Initializing {provider_type_name} provider context...") # Explicitly set provider parameters storage = "ephemeral" if self.ephemeral else self.storage verbose = self.verbosity >= LogLevel.DEBUG ephemeral = self.ephemeral port = self.port if self.port is not None else 7777 host = self.host if self.host else "localhost" image = self.image shared_path = self.shared_path noVNC_port = self.noVNC_port # Create VM provider instance with explicit parameters try: if self.provider_type == VMProviderType.LUMIER: self.logger.info(f"Using VM image for Lumier provider: {image}") if shared_path: self.logger.info(f"Using shared path for Lumier provider: {shared_path}") if noVNC_port: self.logger.info(f"Using noVNC port for Lumier provider: {noVNC_port}") self.config.vm_provider = VMProviderFactory.create_provider( self.provider_type, port=port, host=host, storage=storage, shared_path=shared_path, image=image, verbose=verbose, ephemeral=ephemeral, noVNC_port=noVNC_port, ) elif self.provider_type == VMProviderType.LUME: self.config.vm_provider = VMProviderFactory.create_provider( self.provider_type, port=port, host=host, storage=storage, verbose=verbose, ephemeral=ephemeral, ) elif self.provider_type == VMProviderType.CLOUD: self.config.vm_provider = VMProviderFactory.create_provider( self.provider_type, api_key=self.api_key, verbose=verbose, ) elif self.provider_type == VMProviderType.WINSANDBOX: self.config.vm_provider = VMProviderFactory.create_provider( self.provider_type, port=port, host=host, storage=storage, verbose=verbose, ephemeral=ephemeral, noVNC_port=noVNC_port, ) elif self.provider_type == VMProviderType.DOCKER: self.config.vm_provider = VMProviderFactory.create_provider( self.provider_type, port=port, host=host, storage=storage, shared_path=shared_path, image=image or "trycua/cua-ubuntu:latest", verbose=verbose, ephemeral=ephemeral, noVNC_port=noVNC_port, ) else: raise ValueError(f"Unsupported provider type: {self.provider_type}") self._provider_context = await self.config.vm_provider.__aenter__() self.logger.verbose("VM provider context initialized successfully") except ImportError as ie: self.logger.error(f"Failed to import provider dependencies: {ie}") if str(ie).find("lume") >= 0 and str(ie).find("lumier") < 0: self.logger.error("Please install with: pip install cua-computer[lume]") elif str(ie).find("lumier") >= 0 or str(ie).find("docker") >= 0: self.logger.error("Please install with: pip install cua-computer[lumier] and make sure Docker is installed") elif str(ie).find("cloud") >= 0: self.logger.error("Please install with: pip install cua-computer[cloud]") raise except Exception as e: self.logger.error(f"Failed to initialize provider context: {e}") raise RuntimeError(f"Failed to initialize VM provider: {e}") # Check if VM exists or create it is_running = False try: if self.config.vm_provider is None: raise RuntimeError(f"VM provider not initialized for {self.config.name}") vm = await 
self.config.vm_provider.get_vm(self.config.name) self.logger.verbose(f"Found existing VM: {self.config.name}") is_running = vm.get("status") == "running" except Exception as e: self.logger.error(f"VM not found: {self.config.name}") self.logger.error(f"Error: {e}") raise RuntimeError( f"VM {self.config.name} could not be found or created." ) # Start the VM if it's not running if not is_running: self.logger.info(f"VM {self.config.name} is not running, starting it...") # Convert paths to dictionary format for shared directories shared_dirs = [] for path in self.shared_directories: self.logger.verbose(f"Adding shared directory: {path}") path = os.path.abspath(os.path.expanduser(path)) if os.path.exists(path): # Add path in format expected by Lume API shared_dirs.append({ "hostPath": path, "readOnly": False }) else: self.logger.warning(f"Shared directory does not exist: {path}") # Prepare run options to pass to the provider run_opts = {} # Add display information if available if self.config.display is not None: display_info = { "width": self.config.display.width, "height": self.config.display.height, } # Check if scale_factor exists before adding it if hasattr(self.config.display, "scale_factor"): display_info["scale_factor"] = self.config.display.scale_factor run_opts["display"] = display_info # Add shared directories if available if self.shared_directories: run_opts["shared_directories"] = shared_dirs.copy() # Run the VM with the provider try: if self.config.vm_provider is None: raise RuntimeError(f"VM provider not initialized for {self.config.name}") # Use the complete run_opts we prepared earlier # Handle ephemeral storage for run_vm method too storage_param = "ephemeral" if self.ephemeral else self.storage # Log the image being used self.logger.info(f"Running VM using image: {self.image}") # Call provider.run_vm with explicit image parameter response = await self.config.vm_provider.run_vm( image=self.image, name=self.config.name, run_opts=run_opts, storage=storage_param ) self.logger.info(f"VM run response: {response if response else 'None'}") except Exception as run_error: self.logger.error(f"Failed to run VM: {run_error}") raise RuntimeError(f"Failed to start VM: {run_error}") # Wait for VM to be ready with a valid IP address self.logger.info("Waiting for VM to be ready with a valid IP address...") try: if self.provider_type == VMProviderType.LUMIER: max_retries = 60 # Increased for Lumier VM startup which takes longer retry_delay = 3 # 3 seconds between retries for Lumier else: max_retries = 30 # Default for other providers retry_delay = 2 # 2 seconds between retries self.logger.info(f"Waiting up to {max_retries * retry_delay} seconds for VM to be ready...") ip = await self.get_ip(max_retries=max_retries, retry_delay=retry_delay) # If we get here, we have a valid IP self.logger.info(f"VM is ready with IP: {ip}") ip_address = ip except TimeoutError as timeout_error: self.logger.error(str(timeout_error)) raise RuntimeError(f"VM startup timed out: {timeout_error}") except Exception as wait_error: self.logger.error(f"Error waiting for VM: {wait_error}") raise RuntimeError(f"VM failed to become ready: {wait_error}") except Exception as e: self.logger.error(f"Failed to initialize computer: {e}") self.logger.error(traceback.format_exc()) raise RuntimeError(f"Failed to initialize computer: {e}") try: # Verify we have a valid IP before initializing the interface if not ip_address or ip_address == "unknown" or ip_address == "0.0.0.0": raise RuntimeError(f"Cannot initialize interface - invalid IP 
address: {ip_address}") # Initialize the interface using the factory with the specified OS self.logger.info(f"Initializing interface for {self.os_type} at {ip_address}") from .interface.base import BaseComputerInterface # Pass authentication credentials if using cloud provider if self.provider_type == VMProviderType.CLOUD and self.api_key and self.config.name: self._interface = cast( BaseComputerInterface, InterfaceFactory.create_interface_for_os( os=self.os_type, ip_address=ip_address, api_key=self.api_key, vm_name=self.config.name ), ) else: self._interface = cast( BaseComputerInterface, InterfaceFactory.create_interface_for_os( os=self.os_type, ip_address=ip_address ), ) # Wait for the WebSocket interface to be ready self.logger.info("Connecting to WebSocket interface...") try: # Use a single timeout for the entire connection process # The VM should already be ready at this point, so we're just establishing the connection await self._interface.wait_for_ready(timeout=30) self.logger.info("WebSocket interface connected successfully") except TimeoutError as e: self.logger.error(f"Failed to connect to WebSocket interface at {ip_address}") raise TimeoutError( f"Could not connect to WebSocket interface at {ip_address}:8000/ws: {str(e)}" ) # self.logger.warning( # f"Could not connect to WebSocket interface at {ip_address}:8000/ws: {str(e)}, expect missing functionality" # ) # Create an event to keep the VM running in background if needed if not self.use_host_computer_server: self._stop_event = asyncio.Event() self._keep_alive_task = asyncio.create_task(self._stop_event.wait()) self.logger.info("Computer is ready") # Set the initialization flag and clear the initializing flag self._initialized = True # Set this instance as the default computer for remote decorators helpers.set_default_computer(self) self.logger.info("Computer successfully initialized") except Exception as e: raise finally: # Log initialization time for performance monitoring duration_ms = (time.time() - start_time) * 1000 self.logger.debug(f"Computer initialization took {duration_ms:.2f}ms") return async def disconnect(self) -> None: """Disconnect from the computer's WebSocket interface.""" if self._interface: self._interface.close() async def stop(self) -> None: """Disconnect from the computer's WebSocket interface and stop the computer.""" start_time = time.time() try: self.logger.info("Stopping Computer...") # In VM mode, first explicitly stop the VM, then exit the provider context if not self.use_host_computer_server and self._provider_context and self.config.vm_provider is not None: try: self.logger.info(f"Stopping VM {self.config.name}...") await self.config.vm_provider.stop_vm( name=self.config.name, storage=self.storage # Pass storage explicitly for clarity ) except Exception as e: self.logger.error(f"Error stopping VM: {e}") self.logger.verbose("Closing VM provider context...") await self.config.vm_provider.__aexit__(None, None, None) self._provider_context = None await self.disconnect() self.logger.info("Computer stopped") except Exception as e: self.logger.debug(f"Error during cleanup: {e}") # Log as debug since this might be expected finally: # Log stop time for performance monitoring duration_ms = (time.time() - start_time) * 1000 self.logger.debug(f"Computer stop process took {duration_ms:.2f}ms") return async def start(self) -> None: """Start the computer.""" await self.run() async def restart(self) -> None: """Restart the computer. 
If using a VM provider that supports restart, this will issue a restart without tearing down the provider context, then reconnect the interface. Falls back to stop()+run() when a provider restart is not available. """ # Host computer server: just disconnect and run again if self.use_host_computer_server: try: await self.disconnect() finally: await self.run() return # If no VM provider context yet, fall back to full run if not getattr(self, "_provider_context", None) or self.config.vm_provider is None: self.logger.info("No provider context active; performing full restart via run()") await self.run() return # Gracefully close current interface connection if present if self._interface: try: self._interface.close() except Exception as e: self.logger.debug(f"Error closing interface prior to restart: {e}") # Attempt provider-level restart if implemented try: storage_param = "ephemeral" if self.ephemeral else self.storage if hasattr(self.config.vm_provider, "restart_vm"): self.logger.info(f"Restarting VM {self.config.name} via provider...") await self.config.vm_provider.restart_vm(name=self.config.name, storage=storage_param) else: # Fallback: stop then start without leaving provider context self.logger.info(f"Provider has no restart_vm; performing stop+start for {self.config.name}...") await self.config.vm_provider.stop_vm(name=self.config.name, storage=storage_param) await self.config.vm_provider.run_vm(image=self.image, name=self.config.name, run_opts={}, storage=storage_param) except Exception as e: self.logger.error(f"Failed to restart VM via provider: {e}") # As a last resort, do a full stop (with provider context exit) and run try: await self.stop() finally: await self.run() return # Wait for VM to be ready and reconnect interface try: self.logger.info("Waiting for VM to be ready after restart...") if self.provider_type == VMProviderType.LUMIER: max_retries = 60 retry_delay = 3 else: max_retries = 30 retry_delay = 2 ip_address = await self.get_ip(max_retries=max_retries, retry_delay=retry_delay) self.logger.info(f"Re-initializing interface for {self.os_type} at {ip_address}") from .interface.base import BaseComputerInterface if self.provider_type == VMProviderType.CLOUD and self.api_key and self.config.name: self._interface = cast( BaseComputerInterface, InterfaceFactory.create_interface_for_os( os=self.os_type, ip_address=ip_address, api_key=self.api_key, vm_name=self.config.name, ), ) else: self._interface = cast( BaseComputerInterface, InterfaceFactory.create_interface_for_os( os=self.os_type, ip_address=ip_address, ), ) self.logger.info("Connecting to WebSocket interface after restart...") await self._interface.wait_for_ready(timeout=30) self.logger.info("Computer reconnected and ready after restart") except Exception as e: self.logger.error(f"Failed to reconnect after restart: {e}") # Try a full reset if reconnection failed try: await self.stop() finally: await self.run() # @property async def get_ip(self, max_retries: int = 15, retry_delay: int = 3) -> str: """Get the IP address of the VM or localhost if using host computer server. This method delegates to the provider's get_ip method, which waits indefinitely until the VM has a valid IP address. 
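        Example (illustrative; assumes ``computer`` has already been started
        via ``await computer.run()``):

            ip = await computer.get_ip()
            # Returns e.g. "192.168.64.5" for a VM, or "127.0.0.1" when
            # use_host_computer_server=True.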
Args: max_retries: Unused parameter, kept for backward compatibility retry_delay: Delay between retries in seconds (default: 2) Returns: IP address of the VM or localhost if using host computer server """ # For host computer server, always return localhost immediately if self.use_host_computer_server: return "127.0.0.1" # Get IP from the provider - each provider implements its own waiting logic if self.config.vm_provider is None: raise RuntimeError("VM provider is not initialized") # Log that we're waiting for the IP self.logger.info(f"Waiting for VM {self.config.name} to get an IP address...") # Call the provider's get_ip method which will wait indefinitely storage_param = "ephemeral" if self.ephemeral else self.storage # Log the image being used self.logger.info(f"Running VM using image: {self.image}") # Call provider.get_ip with explicit image parameter ip = await self.config.vm_provider.get_ip( name=self.config.name, storage=storage_param, retry_delay=retry_delay ) # Log success self.logger.info(f"VM {self.config.name} has IP address: {ip}") return ip async def wait_vm_ready(self) -> Optional[Dict[str, Any]]: """Wait for VM to be ready with an IP address. Returns: VM status information or None if using host computer server. """ if self.use_host_computer_server: return None timeout = 600 # 10 minutes timeout (increased from 4 minutes) interval = 2.0 # 2 seconds between checks (increased to reduce API load) start_time = time.time() last_status = None attempts = 0 self.logger.info(f"Waiting for VM {self.config.name} to be ready (timeout: {timeout}s)...") while time.time() - start_time < timeout: attempts += 1 elapsed = time.time() - start_time try: # Keep polling for VM info if self.config.vm_provider is None: self.logger.error("VM provider is not initialized") vm = None else: vm = await self.config.vm_provider.get_vm(self.config.name) # Log full VM properties for debugging (every 30 attempts) if attempts % 30 == 0: self.logger.info( f"VM properties at attempt {attempts}: {vars(vm) if vm else 'None'}" ) # Get current status for logging current_status = getattr(vm, "status", None) if vm else None if current_status != last_status: self.logger.info( f"VM status changed to: {current_status} (after {elapsed:.1f}s)" ) last_status = current_status # Check for IP address - ensure it's not None or empty ip = getattr(vm, "ip_address", None) if vm else None if ip and ip.strip(): # Check for non-empty string self.logger.info( f"VM {self.config.name} got IP address: {ip} (after {elapsed:.1f}s)" ) return vm if attempts % 10 == 0: # Log every 10 attempts to avoid flooding self.logger.info( f"Still waiting for VM IP address... (elapsed: {elapsed:.1f}s)" ) else: self.logger.debug( f"Waiting for VM IP address... 
Current IP: {ip}, Status: {current_status}" ) except Exception as e: self.logger.warning(f"Error checking VM status (attempt {attempts}): {str(e)}") # If we've been trying for a while and still getting errors, log more details if elapsed > 60: # After 1 minute of errors, log more details self.logger.error(f"Persistent error getting VM status: {str(e)}") self.logger.info("Trying to get VM list for debugging...") try: if self.config.vm_provider is not None: vms = await self.config.vm_provider.list_vms() self.logger.info( f"Available VMs: {[getattr(vm, 'name', None) for vm in vms if hasattr(vm, 'name')]}" ) except Exception as list_error: self.logger.error(f"Failed to list VMs: {str(list_error)}") await asyncio.sleep(interval) # If we get here, we've timed out elapsed = time.time() - start_time self.logger.error(f"VM {self.config.name} not ready after {elapsed:.1f} seconds") # Try to get final VM status for debugging try: if self.config.vm_provider is not None: vm = await self.config.vm_provider.get_vm(self.config.name) # VM data is returned as a dictionary from the Lumier provider status = vm.get('status', 'unknown') if vm else "unknown" ip = vm.get('ip_address') if vm else None else: status = "unknown" ip = None self.logger.error(f"Final VM status: {status}, IP: {ip}") except Exception as e: self.logger.error(f"Failed to get final VM status: {str(e)}") raise TimeoutError( f"VM {self.config.name} not ready after {elapsed:.1f} seconds - IP address not assigned" ) async def update(self, cpu: Optional[int] = None, memory: Optional[str] = None): """Update VM settings.""" self.logger.info( f"Updating VM settings: CPU={cpu or self.config.cpu}, Memory={memory or self.config.memory}" ) update_opts = { "cpu": cpu or int(self.config.cpu), "memory": memory or self.config.memory } if self.config.vm_provider is not None: await self.config.vm_provider.update_vm( name=self.config.name, update_opts=update_opts, storage=self.storage # Pass storage explicitly for clarity ) else: raise RuntimeError("VM provider not initialized") def get_screenshot_size(self, screenshot: bytes) -> Dict[str, int]: """Get the dimensions of a screenshot. Args: screenshot: The screenshot bytes Returns: Dict[str, int]: Dictionary containing 'width' and 'height' of the image """ image = Image.open(io.BytesIO(screenshot)) width, height = image.size return {"width": width, "height": height} @property def interface(self): """Get the computer interface for interacting with the VM. Returns: The computer interface """ if not hasattr(self, "_interface") or self._interface is None: error_msg = "Computer interface not initialized. Call run() first." self.logger.error(error_msg) self.logger.error( "Make sure to call await computer.run() before using any interface methods." ) raise RuntimeError(error_msg) return self._interface @property def telemetry_enabled(self) -> bool: """Check if telemetry is enabled for this computer instance. Returns: bool: True if telemetry is enabled, False otherwise """ return self._telemetry_enabled async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]: """Convert normalized coordinates to screen coordinates. Args: x: X coordinate between 0 and 1 y: Y coordinate between 0 and 1 Returns: tuple[float, float]: Screen coordinates (x, y) """ return await self.interface.to_screen_coordinates(x, y) async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]: """Convert screen coordinates to screenshot coordinates. 
Args: x: X coordinate in screen space y: Y coordinate in screen space Returns: tuple[float, float]: (x, y) coordinates in screenshot space """ return await self.interface.to_screenshot_coordinates(x, y) # Add virtual environment management functions to computer interface async def venv_install(self, venv_name: str, requirements: list[str]): """Install packages in a virtual environment. Args: venv_name: Name of the virtual environment requirements: List of package requirements to install Returns: Tuple of (stdout, stderr) from the installation command """ requirements = requirements or [] # Windows vs POSIX handling if self.os_type == "windows": # Use %USERPROFILE% for home directory and cmd.exe semantics venv_path = f"%USERPROFILE%\\.venvs\\{venv_name}" ensure_dir_cmd = "if not exist \"%USERPROFILE%\\.venvs\" mkdir \"%USERPROFILE%\\.venvs\"" create_cmd = f"if not exist \"{venv_path}\" python -m venv \"{venv_path}\"" requirements_str = " ".join(requirements) # Activate via activate.bat and install install_cmd = f"call \"{venv_path}\\Scripts\\activate.bat\" && pip install {requirements_str}" if requirements_str else f"echo No requirements to install" await self.interface.run_command(ensure_dir_cmd) await self.interface.run_command(create_cmd) return await self.interface.run_command(install_cmd) else: # POSIX (macOS/Linux) venv_path = f"$HOME/.venvs/{venv_name}" create_cmd = f"mkdir -p \"$HOME/.venvs\" && python3 -m venv \"{venv_path}\"" # Check if venv exists, if not create it check_cmd = f"test -d \"{venv_path}\" || ({create_cmd})" _ = await self.interface.run_command(check_cmd) # Install packages requirements_str = " ".join(requirements) install_cmd = ( f". \"{venv_path}/bin/activate\" && pip install {requirements_str}" if requirements_str else "echo No requirements to install" ) return await self.interface.run_command(install_cmd) async def venv_cmd(self, venv_name: str, command: str): """Execute a shell command in a virtual environment. Args: venv_name: Name of the virtual environment command: Shell command to execute in the virtual environment Returns: Tuple of (stdout, stderr) from the command execution """ if self.os_type == "windows": # Windows (cmd.exe) venv_path = f"%USERPROFILE%\\.venvs\\{venv_name}" # Check existence and signal if missing check_cmd = f"if not exist \"{venv_path}\" (echo VENV_NOT_FOUND) else (echo VENV_FOUND)" result = await self.interface.run_command(check_cmd) if "VENV_NOT_FOUND" in getattr(result, "stdout", ""): # Auto-create the venv with no requirements await self.venv_install(venv_name, []) # Activate and run the command full_command = f"call \"{venv_path}\\Scripts\\activate.bat\" && {command}" return await self.interface.run_command(full_command) else: # POSIX (macOS/Linux) venv_path = f"$HOME/.venvs/{venv_name}" # Check if virtual environment exists check_cmd = f"test -d \"{venv_path}\"" result = await self.interface.run_command(check_cmd) if result.stderr or "test:" in result.stdout: # venv doesn't exist # Auto-create the venv with no requirements await self.venv_install(venv_name, []) # Activate virtual environment and run command full_command = f". \"{venv_path}/bin/activate\" && {command}" return await self.interface.run_command(full_command) async def venv_exec(self, venv_name: str, python_func, *args, **kwargs): """Execute Python function in a virtual environment using source code extraction. 
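        Example (a minimal illustrative sketch; ``computer`` and the venv name
        are placeholders):

            def add(a, b):
                return a + b

            result = await computer.venv_exec("demo_venv", add, 2, 3)  # -> 5

        Note: the function must be defined at module level (its source is
        extracted with inspect.getsource) and all arguments must be
        JSON-serializable.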
Args: venv_name: Name of the virtual environment python_func: A callable function to execute *args: Positional arguments to pass to the function **kwargs: Keyword arguments to pass to the function Returns: The result of the function execution, or raises any exception that occurred """ import base64 import inspect import json import textwrap try: # Get function source code using inspect.getsource source = inspect.getsource(python_func) # Remove common leading whitespace (dedent) func_source = textwrap.dedent(source).strip() # Remove decorators while func_source.lstrip().startswith("@"): func_source = func_source.split("\n", 1)[1].strip() # Get function name for execution func_name = python_func.__name__ # Serialize args and kwargs as JSON (safer than dill for cross-version compatibility) args_json = json.dumps(args, default=str) kwargs_json = json.dumps(kwargs, default=str) except OSError as e: raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}") except Exception as e: raise Exception(f"Failed to reconstruct function source: {e}") # Create Python code that will define and execute the function python_code = f''' import json import traceback try: # Define the function from source {textwrap.indent(func_source, " ")} # Deserialize args and kwargs from JSON args_json = """{args_json}""" kwargs_json = """{kwargs_json}""" args = json.loads(args_json) kwargs = json.loads(kwargs_json) # Execute the function result = {func_name}(*args, **kwargs) # Create success output payload output_payload = {{ "success": True, "result": result, "error": None }} except Exception as e: # Create error output payload output_payload = {{ "success": False, "result": None, "error": {{ "type": type(e).__name__, "message": str(e), "traceback": traceback.format_exc() }} }} # Serialize the output payload as JSON import json output_json = json.dumps(output_payload, default=str) # Print the JSON output with markers print(f"<<<VENV_EXEC_START>>>{{output_json}}<<<VENV_EXEC_END>>>") ''' # Encode the Python code in base64 to avoid shell escaping issues encoded_code = base64.b64encode(python_code.encode('utf-8')).decode('ascii') # Execute the Python code in the virtual environment python_command = f"python -c \"import base64; exec(base64.b64decode('{encoded_code}').decode('utf-8'))\"" result = await self.venv_cmd(venv_name, python_command) # Parse the output to extract the payload start_marker = "<<<VENV_EXEC_START>>>" end_marker = "<<<VENV_EXEC_END>>>" # Print original stdout print(result.stdout[:result.stdout.find(start_marker)]) if start_marker in result.stdout and end_marker in result.stdout: start_idx = result.stdout.find(start_marker) + len(start_marker) end_idx = result.stdout.find(end_marker) if start_idx < end_idx: output_json = result.stdout[start_idx:end_idx] try: # Decode and deserialize the output payload from JSON output_payload = json.loads(output_json) except Exception as e: raise Exception(f"Failed to decode output payload: {e}") if output_payload["success"]: return output_payload["result"] else: # Recreate and raise the original exception error_info = output_payload["error"] error_class = eval(error_info["type"]) raise error_class(error_info["message"]) else: raise Exception("Invalid output format: markers found but no content between them") else: # Fallback: return stdout/stderr if no payload markers found raise Exception(f"No output payload found. 
                            f"stdout: {result.stdout}, stderr: {result.stderr}")
```

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/diorama/draw.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""Diorama Renderer - A tool for rendering selective views of macOS desktops

This script renders filtered views of the macOS desktop, preserving only selected
applications while maintaining system UI elements like menubar and dock. Each
"diorama" shows a consistent view of the system while isolating specific
applications. The image is "smart resized" to remove any empty space around the
menubar and dock.

Key features:
- Captures shared window state, z-order and position information
- Filters windows by application based on whitelist
- Preserves system context (menubar, dock) in each view
- Preserves menu-owning / keyboard-focused window in each view
- Supports parallel views of the same desktop for multi-agent systems
"""
import sys
import os
import re  # used below for parsing AXValue descriptions in CFAttributeToPyObject
import time
import argparse
from typing import List, Dict, Any, Optional, Tuple
import json
from PIL import Image, ImageDraw
import io
import asyncio
import functools
import logging

# simple, nicely formatted logging
logger = logging.getLogger(__name__)

from computer_server.diorama.safezone import (
    get_menubar_bounds,
    get_dock_bounds,
)

# Timing decorator for profiling
def timing_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        elapsed_time = end_time - start_time
        logger.debug(f"Function {func.__name__} took {elapsed_time:.4f} seconds to run")
        return result
    return wrapper

# Import Objective-C bridge libraries
try:
    import Quartz
    import AppKit
    from ApplicationServices import (
        AXUIElementCreateSystemWide,  # type: ignore
        AXUIElementCreateApplication,  # type: ignore
        AXUIElementCopyAttributeValue,  # type: ignore
        AXUIElementCopyAttributeValues,  # type: ignore
        kAXFocusedWindowAttribute,  # type: ignore
        kAXWindowsAttribute,  # type: ignore
        kAXMainWindowAttribute,  # type: ignore
        kAXChildrenAttribute,  # type: ignore
        kAXRoleAttribute,  # type: ignore
        kAXTitleAttribute,  # type: ignore
        kAXValueAttribute,  # type: ignore
        kAXDescriptionAttribute,  # type: ignore
        kAXEnabledAttribute,  # type: ignore
        kAXPositionAttribute,  # type: ignore
        kAXSizeAttribute,  # type: ignore
        kAXErrorSuccess,  # type: ignore
        AXValueGetType,  # type: ignore
        kAXValueCGSizeType,  # type: ignore
        kAXValueCGPointType,  # type: ignore
        kAXValueCFRangeType,  # type: ignore
        AXUIElementGetTypeID,  # type: ignore
        AXValueGetValue,  # type: ignore
        kAXVisibleChildrenAttribute,  # type: ignore
        kAXRoleDescriptionAttribute,  # type: ignore
        kAXFocusedApplicationAttribute,  # type: ignore
        kAXFocusedUIElementAttribute,  # type: ignore
        kAXSelectedTextAttribute,  # type: ignore
        kAXSelectedTextRangeAttribute,  # type: ignore
    )
    from AppKit import NSWorkspace, NSApplication, NSApp, NSRunningApplication
    import Foundation
    from Foundation import NSObject, NSMakeRect
    import objc
except ImportError:
    logger.error("Error: This script requires PyObjC to be installed.")
    logger.error("Please install it with: pip install pyobjc")
    sys.exit(1)

# Constants for accessibility API
kAXErrorSuccess = 0
kAXRoleAttribute = "AXRole"
kAXTitleAttribute = "AXTitle"
kAXValueAttribute = "AXValue"
kAXWindowsAttribute = "AXWindows"
kAXFocusedAttribute = "AXFocused"
kAXPositionAttribute = "AXPosition"
kAXSizeAttribute = "AXSize"
kAXChildrenAttribute =
"AXChildren" kAXMenuBarAttribute = "AXMenuBar" kAXMenuBarItemAttribute = "AXMenuBarItem" # Constants for window properties kCGWindowLayer = "kCGWindowLayer" # Z-order information (lower values are higher in the stack) kCGWindowAlpha = "kCGWindowAlpha" # Window opacity # Constants for application activation options NSApplicationActivationOptions = { "regular": 0, # Default activation "bringing_all_windows_forward": 1 << 0, # NSApplicationActivateAllWindows "ignoring_other_apps": 1 << 1 # NSApplicationActivateIgnoringOtherApps } def CFAttributeToPyObject(attrValue): def list_helper(list_value): list_builder = [] for item in list_value: list_builder.append(CFAttributeToPyObject(item)) return list_builder def number_helper(number_value): success, int_value = Foundation.CFNumberGetValue( # type: ignore number_value, Foundation.kCFNumberIntType, None # type: ignore ) if success: return int(int_value) success, float_value = Foundation.CFNumberGetValue( # type: ignore number_value, Foundation.kCFNumberDoubleType, None # type: ignore ) if success: return float(float_value) return None def axuielement_helper(element_value): return element_value cf_attr_type = Foundation.CFGetTypeID(attrValue) # type: ignore cf_type_mapping = { Foundation.CFStringGetTypeID(): str, # type: ignore Foundation.CFBooleanGetTypeID(): bool, # type: ignore Foundation.CFArrayGetTypeID(): list_helper, # type: ignore Foundation.CFNumberGetTypeID(): number_helper, # type: ignore AXUIElementGetTypeID(): axuielement_helper, # type: ignore } try: return cf_type_mapping[cf_attr_type](attrValue) except KeyError: # did not get a supported CF type. Move on to AX type pass ax_attr_type = AXValueGetType(attrValue) ax_type_map = { kAXValueCGSizeType: Foundation.NSSizeFromString, # type: ignore kAXValueCGPointType: Foundation.NSPointFromString, # type: ignore kAXValueCFRangeType: Foundation.NSRangeFromString, # type: ignore } try: search_result = re.search("{.*}", attrValue.description()) if search_result: extracted_str = search_result.group() return tuple(ax_type_map[ax_attr_type](extracted_str)) return None except KeyError: return None def element_attribute(element, attribute): if attribute == kAXChildrenAttribute: err, value = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None) if err == kAXErrorSuccess: if isinstance(value, Foundation.NSArray): # type: ignore return CFAttributeToPyObject(value) else: return value err, value = AXUIElementCopyAttributeValue(element, attribute, None) if err == kAXErrorSuccess: if isinstance(value, Foundation.NSArray): # type: ignore return CFAttributeToPyObject(value) else: return value return None def element_value(element, type): err, value = AXValueGetValue(element, type, None) if err == True: return value return None @timing_decorator def get_running_apps() -> List[NSRunningApplication]: """Get list of all running applications Returns: List of NSRunningApplication objects """ return NSWorkspace.sharedWorkspace().runningApplications() # @timing_decorator def get_app_info(app: NSRunningApplication) -> Dict[str, Any]: """Get information about an application Args: app: NSRunningApplication object Returns: Dictionary with application information """ return { "name": app.localizedName(), "bundle_id": app.bundleIdentifier(), "pid": app.processIdentifier(), "active": app.isActive(), "hidden": app.isHidden(), "terminated": app.isTerminated(), } @timing_decorator def get_all_windows() -> List[Dict[str, Any]]: """Get all windows from all applications with z-order information Returns: List of window 
dictionaries with z-order information """ # Get all windows from Quartz # The kCGWindowListOptionOnScreenOnly flag gets only visible windows with preserved z-order window_list = Quartz.CGWindowListCopyWindowInfo( Quartz.kCGWindowListOptionOnScreenOnly, Quartz.kCGNullWindowID ) # Create a dictionary of window z-order z_order = {window['kCGWindowNumber']: z_index for z_index, window in enumerate(window_list[::-1])} # The kCGWindowListOptionAll flag gets all windows *without* z-order preserved window_list_all = Quartz.CGWindowListCopyWindowInfo( Quartz.kCGWindowListOptionAll, Quartz.kCGNullWindowID ) # Process all windows windows = [] for window in window_list_all: # We track z_index which is the index in the window list (0 is the desktop / background) # Get window properties window_id = window.get('kCGWindowNumber', 0) window_name = window.get('kCGWindowName', '') window_pid = window.get('kCGWindowOwnerPID', 0) window_bounds = window.get('kCGWindowBounds', {}) window_owner = window.get('kCGWindowOwnerName', '') window_is_on_screen = window.get('kCGWindowIsOnscreen', False) # Get z-order information # Note: kCGWindowLayer provides the system's layer value (lower values are higher in the stack) layer = window.get(kCGWindowLayer, 0) opacity = window.get(kCGWindowAlpha, 1.0) z_index = z_order.get(window_id, -1) # Determine window role (desktop, dock, menubar, app) if window_name == "Dock" and window_owner == "Dock": role = "dock" elif window_name == "Menubar" and window_owner == "Window Server": role = "menubar" elif window_owner in ["Window Server", "Dock"]: role = "desktop" else: role = "app" # Only include windows with valid bounds if window_bounds: windows.append({ "id": window_id, "name": window_name or "Unnamed Window", "pid": window_pid, "owner": window_owner, "role": role, "is_on_screen": window_is_on_screen, "bounds": { "x": window_bounds.get('X', 0), "y": window_bounds.get('Y', 0), "width": window_bounds.get('Width', 0), "height": window_bounds.get('Height', 0) }, "layer": layer, # System layer (lower values are higher in stack) "z_index": z_index, # Our z-index (0 is the desktop) "opacity": opacity }) windows = sorted(windows, key=lambda x: x["z_index"]) return windows def get_app_windows(app_pid: int, all_windows: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Get all windows for a specific application Args: app_pid: Process ID of the application all_windows: List of all windows with z-order information Returns: List of window dictionaries for the app """ # Filter windows by PID return [window for window in all_windows if window["pid"] == app_pid] @timing_decorator def draw_desktop_screenshot(app_whitelist: List[str] = None, all_windows: List[Dict[str, Any]] = None, dock_bounds: Dict[str, float] = None, dock_items: List[Dict[str, Any]] = None, menubar_bounds: Dict[str, float] = None, menubar_items: List[Dict[str, Any]] = None) -> Tuple[Optional[Image.Image], List[Dict[str, Any]]]: """Capture a screenshot of the entire desktop using Quartz compositing, including dock as a second pass. 
Args: app_whitelist: Optional list of app names to include in the screenshot Returns: PIL Image of the desktop or None if capture failed """ import ctypes if dock_bounds is None: dock_bounds = get_dock_bounds() if dock_items is None: dock_items = get_dock_items() if menubar_bounds is None: menubar_bounds = get_menubar_bounds() if menubar_items is None: menubar_items = get_menubar_items() if all_windows is None: all_windows = get_all_windows() all_windows = all_windows[::-1] all_windows = [window for window in all_windows if window["is_on_screen"]] main_screen = AppKit.NSScreen.mainScreen() if main_screen: frame = main_screen.frame() screen_rect = Quartz.CGRectMake(0, 0, frame.size.width, frame.size.height) else: screen_rect = Quartz.CGRectNull # Screenshot-to-screen hitboxes hitboxes = [] if app_whitelist is None: # Single pass: desktop, menubar, app, dock window_list = Foundation.CFArrayCreateMutable(None, len(all_windows), None) for window in all_windows: Foundation.CFArrayAppendValue(window_list, window["id"]) cg_image = Quartz.CGWindowListCreateImageFromArray( screen_rect, window_list, Quartz.kCGWindowImageDefault ) if cg_image is None: return None # Create CGContext for compositing width = int(frame.size.width) height = int(frame.size.height) color_space = Quartz.CGColorSpaceCreateWithName(Quartz.kCGColorSpaceSRGB) cg_context = Quartz.CGBitmapContextCreate( None, width, height, 8, 0, color_space, Quartz.kCGImageAlphaPremultipliedLast ) Quartz.CGContextDrawImage(cg_context, screen_rect, cg_image) hitboxes.append({ "hitbox": [0, 0, width, height], "target": [0, 0, width, height] }) else: # Filter out windows that are not in the whitelist all_windows = [window for window in all_windows if window["owner"] in app_whitelist or window["role"] != "app"] app_windows = [window for window in all_windows if window["role"] == "app"] dock_orientation = "side" if dock_bounds["width"] < dock_bounds["height"] else "bottom" menubar_length = max(item["bounds"]["x"] + item["bounds"]["width"] for item in menubar_items) if menubar_items else 0 # Calculate bounds of app windows app_bounds = { "x": min(window["bounds"]["x"] for window in app_windows) if app_windows else 0, "y": min(window["bounds"]["y"] for window in app_windows) if app_windows else 0, } app_bounds["width"] = max(window["bounds"]["x"] + window["bounds"]["width"] for window in app_windows) - app_bounds["x"] if app_windows else 0 app_bounds["height"] = max(window["bounds"]["y"] + window["bounds"]["height"] for window in app_windows) - app_bounds["y"] if app_windows else 0 # Set minimum bounds of 256x256 app_bounds["width"] = max(app_bounds["width"], 256) app_bounds["height"] = max(app_bounds["height"], 256) # Add dock bounds to app bounds if dock_orientation == "bottom": app_bounds["height"] += dock_bounds["height"] + 4 elif dock_orientation == "side": if dock_bounds["x"] > frame.size.width / 2: app_bounds["width"] += dock_bounds["width"] + 4 else: app_bounds["x"] -= dock_bounds["width"] + 4 app_bounds["width"] += dock_bounds["width"] + 4 # Add menubar bounds to app bounds app_bounds["height"] += menubar_bounds["height"] # Make sure app bounds contains menubar bounds app_bounds["width"] = max(app_bounds["width"], menubar_length) # Clamp bounds to screen app_bounds["x"] = max(app_bounds["x"], 0) app_bounds["y"] = max(app_bounds["y"], 0) app_bounds["width"] = min(app_bounds["width"], frame.size.width - app_bounds["x"]) app_bounds["height"] = min(app_bounds["height"], frame.size.height - app_bounds["y"] + menubar_bounds["height"]) # Create 
CGContext for compositing width = int(app_bounds["width"]) height = int(app_bounds["height"]) color_space = Quartz.CGColorSpaceCreateWithName(Quartz.kCGColorSpaceSRGB) cg_context = Quartz.CGBitmapContextCreate( None, width, height, 8, 0, color_space, Quartz.kCGImageAlphaPremultipliedLast ) def _draw_layer(cg_context, all_windows, source_rect, target_rect): """Draw a layer of windows from source_rect to target_rect on the given context.""" window_list = Foundation.CFArrayCreateMutable(None, len(all_windows), None) for window in all_windows: Foundation.CFArrayAppendValue(window_list, window["id"]) cg_image = Quartz.CGWindowListCreateImageFromArray( source_rect, window_list, Quartz.kCGWindowImageDefault ) if cg_image is not None: Quartz.CGContextDrawImage(cg_context, target_rect, cg_image) # --- FIRST PASS: desktop, apps --- source_position = [app_bounds["x"], app_bounds["y"]] source_size = [app_bounds["width"], app_bounds["height"]] target_position = [ 0, min( menubar_bounds["y"] + menubar_bounds["height"], app_bounds["y"] ) ] target_size = [app_bounds["width"], app_bounds["height"]] if dock_orientation == "bottom": source_size[1] += dock_bounds["height"] target_size[1] += dock_bounds["height"] elif dock_orientation == "side": if dock_bounds["x"] < frame.size.width / 2: source_position[0] -= dock_bounds["width"] target_position[0] -= dock_bounds["width"] source_size[0] += dock_bounds["width"] target_size[0] += dock_bounds["width"] app_source_rect = Quartz.CGRectMake( source_position[0], source_position[1], source_size[0], source_size[1] ) app_target_rect = Quartz.CGRectMake( target_position[0], app_bounds["height"] - target_position[1] - target_size[1], target_size[0], target_size[1] ) first_pass_windows = [w for w in all_windows if w["role"] == "app" or w["role"] == "desktop"] _draw_layer(cg_context, first_pass_windows, app_source_rect, app_target_rect) hitboxes.append({ "hitbox": [0, menubar_bounds["height"], app_bounds["width"], menubar_bounds["height"] + app_bounds["height"]], "target": [ app_source_rect.origin.x, app_source_rect.origin.y, app_source_rect.origin.x + app_bounds["width"], app_source_rect.origin.y + app_bounds["height"] ] }) # --- SECOND PASS: menubar --- allowed_roles = {"menubar"} menubar_windows = [w for w in all_windows if w["role"] in allowed_roles] menubar_source_rect = Quartz.CGRectMake( 0, 0, app_bounds["width"], menubar_bounds["height"] ) menubar_target_rect = Quartz.CGRectMake( 0, app_bounds["height"] - menubar_bounds["height"], app_bounds["width"], menubar_bounds["height"] ) _draw_layer(cg_context, menubar_windows, menubar_source_rect, menubar_target_rect) hitboxes.append({ "hitbox": [0, 0, app_bounds["width"], menubar_bounds["height"]], "target": [0, 0, app_bounds["width"], menubar_bounds["height"]] }) # --- THIRD PASS: dock, filtered --- # Step 1: Collect dock items to draw, with their computed target rects dock_draw_items = [] for index, item in enumerate(dock_items): source_position = (item["bounds"]["x"], item["bounds"]["y"]) source_size = (item["bounds"]["width"], item["bounds"]["height"]) # apply whitelist to middle items if not (index == 0 or index == len(dock_items) - 1): if item["subrole"] == "AXApplicationDockItem": if item["title"] not in app_whitelist: continue elif item["subrole"] == "AXMinimizedWindowDockItem": if not any(window["name"] == item["title"] and window["role"] == "app" and window["owner"] in app_whitelist for window in all_windows): continue elif item["subrole"] == "AXFolderDockItem": continue # Preserve unscaled (original) source 
position and size before any modification hitbox_position = source_position hitbox_size = source_size screen_position = source_position screen_size = source_size # stretch to screen size padding = 32 if dock_orientation == "bottom": source_position = (source_position[0], 0) source_size = (source_size[0], frame.size.height) hitbox_position = (source_position[0], app_bounds['height'] - hitbox_size[1]) hitbox_size = (source_size[0], hitbox_size[1]) if index == 0: source_size = (padding + source_size[0], source_size[1]) source_position = (source_position[0] - padding, 0) elif index == len(dock_items) - 1: source_size = (source_size[0] + padding, source_size[1]) source_position = (source_position[0], 0) elif dock_orientation == "side": source_position = (0, source_position[1]) source_size = (frame.size.width, source_size[1]) hitbox_position = ( source_position[0] if dock_bounds['x'] < frame.size.width / 2 else app_bounds['width'] - hitbox_size[0], source_position[1] ) hitbox_size = (hitbox_size[0], source_size[1]) if index == 0: source_size = (source_size[0], padding + source_size[1]) source_position = (0, source_position[1] - padding) elif index == len(dock_items) - 1: source_size = (source_size[0], source_size[1] + padding) source_position = (0, source_position[1]) # Compute the initial target position target_position = source_position target_size = source_size dock_draw_items.append({ "item": item, "index": index, "source_position": source_position, "source_size": source_size, "target_size": target_size, "target_position": target_position, # Will be updated after packing "hitbox_position": hitbox_position, "hitbox_size": hitbox_size, "screen_position": screen_position, "screen_size": screen_size, }) # Step 2: Pack the target rects along the main axis, removing gaps packed_positions = [] if dock_orientation == "bottom": # Pack left-to-right x_cursor = 0 for draw_item in dock_draw_items: packed_positions.append((x_cursor, draw_item["target_position"][1])) x_cursor += draw_item["target_size"][0] packed_strip_length = x_cursor # Center horizontally x_offset = (app_bounds['width'] - packed_strip_length) / 2 y_offset = (frame.size.height - app_bounds['height']) for i, draw_item in enumerate(dock_draw_items): px, py = packed_positions[i] draw_item["target_position"] = (px + x_offset, py - y_offset) # Pack unscaled source rects x_cursor = 0 for draw_item in dock_draw_items: draw_item["hitbox_position"] = (x_cursor, draw_item["hitbox_position"][1]) x_cursor += draw_item["hitbox_size"][0] packed_strip_length = x_cursor # Center horizontally x_offset = (app_bounds['width'] - packed_strip_length) / 2 for i, draw_item in enumerate(dock_draw_items): px, py = draw_item["hitbox_position"] draw_item["hitbox_position"] = (px + x_offset, py) elif dock_orientation == "side": # Pack top-to-bottom y_cursor = 0 for draw_item in dock_draw_items: packed_positions.append((draw_item["target_position"][0], y_cursor)) y_cursor += draw_item["target_size"][1] packed_strip_length = y_cursor # Center vertically y_offset = (app_bounds['height'] - packed_strip_length) / 2 x_offset = 0 if dock_bounds['x'] < frame.size.width / 2 else frame.size.width - app_bounds['width'] for i, draw_item in enumerate(dock_draw_items): px, py = packed_positions[i] draw_item["target_position"] = (px - x_offset, py + y_offset) # Pack unscaled source rects y_cursor = 0 for draw_item in dock_draw_items: draw_item["hitbox_position"] = (draw_item["hitbox_position"][0], y_cursor) y_cursor += draw_item["hitbox_size"][1] packed_strip_length = y_cursor # 
Center vertically y_offset = (app_bounds['height'] - packed_strip_length) / 2 for i, draw_item in enumerate(dock_draw_items): px, py = draw_item["hitbox_position"] draw_item["hitbox_position"] = (px, py + y_offset) dock_windows = [window for window in all_windows if window["role"] == "dock"] # Step 3: Draw dock items using packed and recentered positions for draw_item in dock_draw_items: item = draw_item["item"] source_position = draw_item["source_position"] source_size = draw_item["source_size"] target_position = draw_item["target_position"] target_size = draw_item["target_size"] # flip target position y target_position = (target_position[0], app_bounds['height'] - target_position[1] - target_size[1]) source_rect = Quartz.CGRectMake(*source_position, *source_size) target_rect = Quartz.CGRectMake(*target_position, *target_size) _draw_layer(cg_context, dock_windows, source_rect, target_rect) hitbox_position = draw_item["hitbox_position"] hitbox_size = draw_item["hitbox_size"] # Debug: Draw true hitbox rect (packed position, unscaled size) # # Flip y like target_rect # hitbox_position_flipped = ( # hitbox_position[0], # app_bounds['height'] - hitbox_position[1] - hitbox_size[1] # ) # hitbox_rect = Quartz.CGRectMake(*hitbox_position_flipped, *hitbox_size) # Quartz.CGContextSetStrokeColorWithColor(cg_context, Quartz.CGColorCreateGenericRGB(0, 1, 0, 1)) # Quartz.CGContextStrokeRect(cg_context, hitbox_rect) hitboxes.append({ "hitbox": [*hitbox_position, hitbox_position[0] + hitbox_size[0], hitbox_position[1] + hitbox_size[1]], "target": [*draw_item["screen_position"], draw_item["screen_position"][0] + draw_item["screen_size"][0], draw_item["screen_position"][1] + draw_item["screen_size"][1]] }) # Convert composited context to CGImage final_cg_image = Quartz.CGBitmapContextCreateImage(cg_context) ns_image = AppKit.NSImage.alloc().initWithCGImage_size_(final_cg_image, Foundation.NSZeroSize) ns_data = ns_image.TIFFRepresentation() bitmap_rep = AppKit.NSBitmapImageRep.imageRepWithData_(ns_data) png_data = bitmap_rep.representationUsingType_properties_(AppKit.NSBitmapImageFileTypePNG, None) image_data = io.BytesIO(png_data) return Image.open(image_data), hitboxes @timing_decorator def get_menubar_items(active_app_pid: int = None) -> List[Dict[str, Any]]: """Get menubar items from the active application using Accessibility API Args: active_app_pid: PID of the active application Returns: List of dictionaries with menubar item information """ menubar_items = [] if active_app_pid is None: # Get the frontmost application's PID if none provided frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication() if frontmost_app: active_app_pid = frontmost_app.processIdentifier() else: logger.error("Error: Could not determine frontmost application") return menubar_items # Create an accessibility element for the application app_element = AXUIElementCreateApplication(active_app_pid) if app_element is None: logger.error(f"Error: Could not create accessibility element for PID {active_app_pid}") return menubar_items # Get the menubar menubar = element_attribute(app_element, kAXMenuBarAttribute) if menubar is None: logger.error(f"Error: Could not get menubar for application with PID {active_app_pid}") return menubar_items # Get the menubar items children = element_attribute(menubar, kAXChildrenAttribute) if children is None: logger.error("Error: Could not get menubar items") return menubar_items # Process each menubar item for i in range(len(children)): item = children[i] # Get item title title = 
element_attribute(item, kAXTitleAttribute) or "Untitled" # Create bounding box bounds = { "x": 0, "y": 0, "width": 0, "height": 0 } # Get item position position_value = element_attribute(item, kAXPositionAttribute) if position_value: position_value = element_value(position_value, kAXValueCGPointType) bounds["x"] = position_value.x bounds["y"] = position_value.y # Get item size size_value = element_attribute(item, kAXSizeAttribute) if size_value: size_value = element_value(size_value, kAXValueCGSizeType) bounds["width"] = size_value.width bounds["height"] = size_value.height # Add to list menubar_items.append({ "title": title, "bounds": bounds, "index": i, "app_pid": active_app_pid }) return menubar_items @timing_decorator def get_dock_items() -> List[Dict[str, Any]]: """Get all items in the macOS Dock Returns: List of dictionaries with Dock item information """ dock_items = [] # Find the Dock process dock_pid = None running_apps = get_running_apps() for app in running_apps: if app.localizedName() == "Dock" and app.bundleIdentifier() == "com.apple.dock": dock_pid = app.processIdentifier() break if dock_pid is None: logger.error("Error: Could not find Dock process") return dock_items # Create an accessibility element for the Dock dock_element = AXUIElementCreateApplication(dock_pid) if dock_element is None: logger.error(f"Error: Could not create accessibility element for Dock (PID {dock_pid})") return dock_items # Get the Dock's main element dock_list = element_attribute(dock_element, kAXChildrenAttribute) if dock_list is None or len(dock_list) == 0: logger.error("Error: Could not get Dock children") return dock_items # Find the Dock's application list (usually the first child) dock_app_list = None for child in dock_list: role = element_attribute(child, kAXRoleAttribute) if role == "AXList": dock_app_list = child break if dock_app_list is None: logger.error("Error: Could not find Dock application list") return dock_items # Get all items in the Dock items = element_attribute(dock_app_list, kAXChildrenAttribute) if items is None: logger.error("Error: Could not get Dock items") return dock_items # Process each Dock item for i, item in enumerate(items): # Get item attributes title = element_attribute(item, kAXTitleAttribute) or "Untitled" description = element_attribute(item, "AXDescription") or "" role = element_attribute(item, kAXRoleAttribute) or "" subrole = element_attribute(item, "AXSubrole") or "" # Create bounding box bounds = { "x": 0, "y": 0, "width": 0, "height": 0 } # Get item position position_value = element_attribute(item, kAXPositionAttribute) if position_value: position_value = element_value(position_value, kAXValueCGPointType) bounds["x"] = position_value.x bounds["y"] = position_value.y # Get item size size_value = element_attribute(item, kAXSizeAttribute) if size_value: size_value = element_value(size_value, kAXValueCGSizeType) bounds["width"] = size_value.width bounds["height"] = size_value.height # Determine if this is an application, file/folder, or separator item_type = "unknown" if subrole == "AXApplicationDockItem": item_type = "application" elif subrole == "AXFolderDockItem": item_type = "folder" elif subrole == "AXDocumentDockItem": item_type = "document" elif subrole == "AXSeparatorDockItem" or role == "AXSeparator": item_type = "separator" elif "trash" in title.lower(): item_type = "trash" # Add to list dock_items.append({ "title": title, "description": description, "bounds": bounds, "index": i, "type": item_type, "role": role, "subrole": subrole }) return 
dock_items class AppActivationContext: def __init__(self, active_app_pid=None, active_app_to_use="", logger=None): self.active_app_pid = active_app_pid self.active_app_to_use = active_app_to_use self.logger = logger self.frontmost_app = None def __enter__(self): from AppKit import NSWorkspace if self.active_app_pid: if self.logger and self.active_app_to_use: self.logger.debug(f"Automatically activating app '{self.active_app_to_use}' for screenshot composition") self.frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication() running_apps_list = NSWorkspace.sharedWorkspace().runningApplications() for app in running_apps_list: if app.processIdentifier() == self.active_app_pid: app.activateWithOptions_(0) # sleep for 0.5 seconds time.sleep(0.5) break return self def __exit__(self, exc_type, exc_val, exc_tb): if self.frontmost_app: # sleep for 0.5 seconds time.sleep(0.5) self.frontmost_app.activateWithOptions_(0) def get_frontmost_and_active_app(all_windows, running_apps, app_whitelist): from AppKit import NSWorkspace frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication() active_app_to_use = None active_app_pid = None # Find the topmost (highest z_index) non-filtered app for window in reversed(all_windows): owner = window.get("owner") role = window.get("role") is_on_screen = window.get("is_on_screen") # Skip non-app windows if role != "app": continue # Skip not-on-screen windows if not is_on_screen: continue # Skip filtered apps if app_whitelist is not None and owner not in app_whitelist: continue # Found a suitable app active_app_to_use = owner active_app_pid = window.get("pid") break # If no suitable app found, use Finder if active_app_to_use is None: active_app_to_use = "Finder" for app in running_apps: if app.localizedName() == "Finder": active_app_pid = app.processIdentifier() break return frontmost_app, active_app_to_use, active_app_pid def capture_all_apps(save_to_disk: bool = False, app_whitelist: List[str] = None, output_dir: str = None, take_focus: bool = True) -> Tuple[Dict[str, Any], Optional[Image.Image]]: """Capture screenshots of all running applications Args: save_to_disk: Whether to save screenshots to disk app_whitelist: Optional list of app names to include in the recomposited screenshot (will always include 'Window Server' and 'Dock') Returns: Dictionary with application information and screenshots Optional PIL Image of the recomposited screenshot """ result = { "timestamp": time.time(), "applications": [], "windows": [], # New array to store all windows, including those without apps "menubar_items": [], # New array to store menubar items "dock_items": [] # New array to store dock items } # Get all windows with z-order information all_windows = get_all_windows() # Get all running applications running_apps = get_running_apps() frontmost_app, active_app_to_use, active_app_pid = get_frontmost_and_active_app(all_windows, running_apps, app_whitelist) if take_focus else (None, None, None) # Use AppActivationContext to activate the app and restore focus with AppActivationContext(active_app_pid, active_app_to_use, logger): # Process applications for app in running_apps: # Skip system apps without a bundle ID if app.bundleIdentifier() is None: continue app_info = get_app_info(app) app_windows = get_app_windows(app.processIdentifier(), all_windows) app_data = { "info": app_info, "windows": [ window["id"] for window in app_windows ] } result["applications"].append(app_data) # Add all windows to the result result["windows"] = all_windows # Get menubar items from 
the active application menubar_items = get_menubar_items(active_app_pid) result["menubar_items"] = menubar_items # Get dock items dock_items = get_dock_items() result["dock_items"] = dock_items # Get menubar bounds menubar_bounds = get_menubar_bounds() result["menubar_bounds"] = menubar_bounds # Get dock bounds dock_bounds = get_dock_bounds() result["dock_bounds"] = dock_bounds # Capture the entire desktop using Quartz compositing desktop_screenshot, hitboxes = draw_desktop_screenshot(app_whitelist, all_windows, dock_bounds, dock_items, menubar_bounds, menubar_items) result["hitboxes"] = hitboxes from PIL import Image, ImageDraw, ImageChops def _draw_hitboxes(img, hitboxes, key="target"): """ Overlay opaque colored rectangles for each hitbox (using hitbox[key]) with color depending on index, then multiply overlay onto img. Args: img: PIL.Image (RGBA or RGB) hitboxes: list of dicts with 'hitbox' and 'target' keys key: 'hitbox' or 'target' Returns: PIL.Image with overlayed hitboxes (same mode/size as input) """ # Ensure RGBA mode for blending base = img.convert("RGBA") overlay = Image.new("RGBA", base.size, (0, 0, 0, 0)) draw = ImageDraw.Draw(overlay) # Distinct colors for order colors = [ (255, 0, 0, 180), # Red (0, 255, 0, 180), # Green (0, 0, 255, 180), # Blue (255, 255, 0, 180), # Yellow (0, 255, 255, 180), # Cyan (255, 0, 255, 180), # Magenta (255, 128, 0, 180), # Orange (128, 0, 255, 180), # Purple (0, 128, 255, 180), # Sky blue (128, 255, 0, 180), # Lime ] # Set minimum brightness for colors min_brightness = 0 colors = [ (max(min_brightness, c[0]), max(min_brightness, c[1]), max(min_brightness, c[2]), c[3]) for c in colors ] for i, h in enumerate(hitboxes): rect = h.get(key) color = colors[i % len(colors)] if rect: draw.rectangle(rect, fill=color) # Multiply blend overlay onto base result = ImageChops.multiply(base, overlay) return result # DEBUG: Save hitboxes to disk if desktop_screenshot and save_to_disk and output_dir: desktop_path = os.path.join(output_dir, "desktop.png") desktop_screenshot.save(desktop_path) result["desktop_screenshot"] = desktop_path logger.info(f"Saved desktop screenshot to {desktop_path}") if app_whitelist: # Take screenshot without whitelist desktop_screenshot_full, hitboxes_full = draw_desktop_screenshot( None, all_windows, dock_bounds, dock_items, menubar_bounds, menubar_items) # Draw hitboxes on both images using overlay img1 = _draw_hitboxes(desktop_screenshot.copy(), hitboxes, key="hitbox") img2 = _draw_hitboxes(desktop_screenshot_full.copy(), hitboxes, key="target") if desktop_screenshot_full else None if img2 and hitboxes_full: # Compose side-by-side from PIL import Image width = img1.width + img2.width height = max(img1.height, img2.height) combined = Image.new('RGBA', (width, height), (0, 0, 0, 0)) combined.paste(img1, (0, 0)) combined.paste(img2, (img1.width, 0)) side_by_side_path = os.path.join(output_dir, "side_by_side_hitboxes.png") combined.save(side_by_side_path) result["side_by_side_hitboxes"] = side_by_side_path else: # Overlay hitboxes using new function hitbox_img = _draw_hitboxes(desktop_screenshot.copy(), hitboxes, key="hitbox") hitbox_path = os.path.join(output_dir, "hitboxes.png") hitbox_img.save(hitbox_path) result["hitbox_screenshot"] = hitbox_path # Focus restoration is now handled by AppActivationContext return result, desktop_screenshot async def run_capture(): """Run the screenshot capture asynchronously""" # Parse command line arguments parser = argparse.ArgumentParser(description="Capture screenshots of running macOS 
applications") parser.add_argument("--output", "-o", help="Output directory for screenshots", default="app_screenshots") parser.add_argument("--filter", "-f", nargs="+", help="Filter recomposited screenshot to only include specified apps") parser.add_argument("--menubar", "-m", action="store_true", help="List menubar and status items with their bounding boxes") parser.add_argument("--dock", "-d", action="store_true", help="List Dock items with their bounding boxes") parser.add_argument("--demo", nargs="*", help="Demo mode: pass app names to capture individual and combinations, create mosaic PNG") args = parser.parse_args() # Create output directory in the current directory if not absolute if not os.path.isabs(args.output): output_dir = os.path.join(os.getcwd(), args.output) else: output_dir = args.output # DEMO MODE: capture each app and all non-empty combinations, then mosaic if args.demo: from PIL import Image demo_apps = args.demo print(f"Running in DEMO mode for apps: {demo_apps}") groups = [] for item in demo_apps: if "/" in item: group = [x.strip() for x in item.split("/") if x.strip()] else: group = [item.strip()] if group: groups.append(group) screenshots = [] for group in groups: print(f"Capturing for apps: {group}") _, img = capture_all_apps(app_whitelist=group) if img: screenshots.append((group, img)) if not screenshots: print("No screenshots captured in demo mode.") return # Mosaic-pack: grid (rows of sqrt(N)) def make_mosaic(images, pad=64, bg=(30,30,30)): import rpack sizes = [(img.width + pad, img.height + pad) for _, img in images] positions = rpack.pack(sizes) # Find the bounding box for the mosaic max_x = max(x + w for (x, y), (w, h) in zip(positions, sizes)) max_y = max(y + h for (x, y), (w, h) in zip(positions, sizes)) mosaic = Image.new("RGBA", (max_x, max_y), bg) for (group, img), (x, y) in zip(images, positions): mosaic.paste(img, (x, y)) return mosaic mosaic_img = make_mosaic(screenshots) mosaic_path = os.path.join(output_dir, "demo_mosaic.png") os.makedirs(output_dir, exist_ok=True) mosaic_img.save(mosaic_path) print(f"Demo mosaic saved to: {mosaic_path}") return # Capture all apps and save to disk, including a recomposited screenshot print(f"Capturing screenshots of all running applications...") print(f"Saving screenshots to: {output_dir}") # If filter is provided, show what we're filtering by if args.filter: print(f"Filtering recomposited screenshot to only include: {', '.join(args.filter)} (plus Window Server and Dock)") result, img = capture_all_apps( save_to_disk=True, app_whitelist=args.filter, output_dir=output_dir, take_focus=True ) # Print summary print(f"\nCapture complete!") print(f"Captured {len(result['applications'])} applications") total_app_windows = sum(len(app["windows"]) for app in result["applications"]) print(f"Total application windows captured: {total_app_windows}") print(f"Total standalone windows captured: {len(result['windows'])}") # Print details of each application print("\nApplication details:") for app in result["applications"]: app_info = app["info"] windows = app["windows"] print(f" - {app_info['name']} ({len(windows)} windows)") # Print recomposited screenshot path if available if "desktop_screenshot" in result: print(f"\nRecomposited screenshot saved to: {result['desktop_screenshot']}") # Print menubar items if requested if args.menubar and "menubar_items" in result: print("\nMenubar items:") # Find app name for the PID app_name_by_pid = {} for app in result["applications"]: app_info = app["info"] app_name_by_pid[app_info["pid"]] = 
app_info["name"] for item in result["menubar_items"]: print(f" - {item['title']}") print(f" Bounds: x={item['bounds']['x']}, y={item['bounds']['y']}, width={item['bounds']['width']}, height={item['bounds']['height']}") if "app_pid" in item: app_name = app_name_by_pid.get(item["app_pid"], f"Unknown App (PID: {item['app_pid']})") print(f" App: {app_name} (PID: {item['app_pid']})") if "window_id" in item: print(f" Window ID: {item['window_id']}") if "owner" in item: print(f" Owner: {item['owner']}") if "layer" in item and "z_index" in item: print(f" Layer: {item['layer']}, Z-Index: {item['z_index']}") print("") # Print dock items if requested if args.dock and "dock_items" in result: print("\nDock items:") for item in result["dock_items"]: print(f" - {item['title']} ({item['type']})") print(f" Description: {item['description']}") print(f" Bounds: x={item['bounds']['x']}, y={item['bounds']['y']}, width={item['bounds']['width']}, height={item['bounds']['height']}") print(f" Role: {item['role']}, Subrole: {item['subrole']}") print(f" Index: {item['index']}") print("") # Save the metadata to a JSON file metadata_path = os.path.join(output_dir, "metadata.json") with open(metadata_path, "w") as f: json.dump(result, f, indent=2) print(f"\nMetadata saved to: {metadata_path}") if __name__ == "__main__": asyncio.run(run_capture()) ```