This is page 11 of 16. Use http://codebase.md/trycua/cua?page={x} to view the full context. # Directory Structure ``` ├── .all-contributorsrc ├── .cursorignore ├── .devcontainer │ ├── devcontainer.json │ ├── post-install.sh │ └── README.md ├── .dockerignore ├── .gitattributes ├── .github │ ├── FUNDING.yml │ ├── scripts │ │ ├── get_pyproject_version.py │ │ └── tests │ │ ├── __init__.py │ │ ├── README.md │ │ └── test_get_pyproject_version.py │ └── workflows │ ├── ci-lume.yml │ ├── docker-publish-kasm.yml │ ├── docker-publish-xfce.yml │ ├── docker-reusable-publish.yml │ ├── npm-publish-computer.yml │ ├── npm-publish-core.yml │ ├── publish-lume.yml │ ├── pypi-publish-agent.yml │ ├── pypi-publish-computer-server.yml │ ├── pypi-publish-computer.yml │ ├── pypi-publish-core.yml │ ├── pypi-publish-mcp-server.yml │ ├── pypi-publish-pylume.yml │ ├── pypi-publish-som.yml │ ├── pypi-reusable-publish.yml │ └── test-validation-script.yml ├── .gitignore ├── .vscode │ ├── docs.code-workspace │ ├── launch.json │ ├── libs-ts.code-workspace │ ├── lume.code-workspace │ ├── lumier.code-workspace │ ├── py.code-workspace │ └── settings.json ├── blog │ ├── app-use.md │ ├── assets │ │ ├── composite-agents.png │ │ ├── docker-ubuntu-support.png │ │ ├── hack-booth.png │ │ ├── hack-closing-ceremony.jpg │ │ ├── hack-cua-ollama-hud.jpeg │ │ ├── hack-leaderboard.png │ │ ├── hack-the-north.png │ │ ├── hack-winners.jpeg │ │ ├── hack-workshop.jpeg │ │ ├── hud-agent-evals.png │ │ └── trajectory-viewer.jpeg │ ├── bringing-computer-use-to-the-web.md │ ├── build-your-own-operator-on-macos-1.md │ ├── build-your-own-operator-on-macos-2.md │ ├── composite-agents.md │ ├── cua-hackathon.md │ ├── hack-the-north.md │ ├── hud-agent-evals.md │ ├── human-in-the-loop.md │ ├── introducing-cua-cloud-containers.md │ ├── lume-to-containerization.md │ ├── sandboxed-python-execution.md │ ├── training-computer-use-models-trajectories-1.md │ ├── trajectory-viewer.md │ ├── ubuntu-docker-support.md │ └── windows-sandbox.md ├── CONTRIBUTING.md ├── Development.md ├── Dockerfile ├── docs │ ├── .gitignore │ ├── .prettierrc │ ├── content │ │ └── docs │ │ ├── agent-sdk │ │ │ ├── agent-loops.mdx │ │ │ ├── benchmarks │ │ │ │ ├── index.mdx │ │ │ │ ├── interactive.mdx │ │ │ │ ├── introduction.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── osworld-verified.mdx │ │ │ │ ├── screenspot-pro.mdx │ │ │ │ └── screenspot-v2.mdx │ │ │ ├── callbacks │ │ │ │ ├── agent-lifecycle.mdx │ │ │ │ ├── cost-saving.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── logging.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── pii-anonymization.mdx │ │ │ │ └── trajectories.mdx │ │ │ ├── chat-history.mdx │ │ │ ├── custom-computer-handlers.mdx │ │ │ ├── custom-tools.mdx │ │ │ ├── customizing-computeragent.mdx │ │ │ ├── integrations │ │ │ │ ├── hud.mdx │ │ │ │ └── meta.json │ │ │ ├── message-format.mdx │ │ │ ├── meta.json │ │ │ ├── migration-guide.mdx │ │ │ ├── prompt-caching.mdx │ │ │ ├── supported-agents │ │ │ │ ├── composed-agents.mdx │ │ │ │ ├── computer-use-agents.mdx │ │ │ │ ├── grounding-models.mdx │ │ │ │ ├── human-in-the-loop.mdx │ │ │ │ └── meta.json │ │ │ ├── supported-model-providers │ │ │ │ ├── index.mdx │ │ │ │ └── local-models.mdx │ │ │ └── usage-tracking.mdx │ │ ├── computer-sdk │ │ │ ├── cloud-vm-management.mdx │ │ │ ├── commands.mdx │ │ │ ├── computer-ui.mdx │ │ │ ├── computers.mdx │ │ │ ├── meta.json │ │ │ └── sandboxed-python.mdx │ │ ├── index.mdx │ │ ├── libraries │ │ │ ├── agent │ │ │ │ └── index.mdx │ │ │ ├── computer │ │ │ │ └── index.mdx │ │ │ ├── computer-server │ │ │ │ ├── Commands.mdx │ │ │ │ 
├── index.mdx │ │ │ │ ├── REST-API.mdx │ │ │ │ └── WebSocket-API.mdx │ │ │ ├── core │ │ │ │ └── index.mdx │ │ │ ├── lume │ │ │ │ ├── cli-reference.mdx │ │ │ │ ├── faq.md │ │ │ │ ├── http-api.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ ├── meta.json │ │ │ │ └── prebuilt-images.mdx │ │ │ ├── lumier │ │ │ │ ├── building-lumier.mdx │ │ │ │ ├── docker-compose.mdx │ │ │ │ ├── docker.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ └── meta.json │ │ │ ├── mcp-server │ │ │ │ ├── client-integrations.mdx │ │ │ │ ├── configuration.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ ├── llm-integrations.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── tools.mdx │ │ │ │ └── usage.mdx │ │ │ └── som │ │ │ ├── configuration.mdx │ │ │ └── index.mdx │ │ ├── meta.json │ │ ├── quickstart-cli.mdx │ │ ├── quickstart-devs.mdx │ │ └── telemetry.mdx │ ├── next.config.mjs │ ├── package-lock.json │ ├── package.json │ ├── pnpm-lock.yaml │ ├── postcss.config.mjs │ ├── public │ │ └── img │ │ ├── agent_gradio_ui.png │ │ ├── agent.png │ │ ├── cli.png │ │ ├── computer.png │ │ ├── som_box_threshold.png │ │ └── som_iou_threshold.png │ ├── README.md │ ├── source.config.ts │ ├── src │ │ ├── app │ │ │ ├── (home) │ │ │ │ ├── [[...slug]] │ │ │ │ │ └── page.tsx │ │ │ │ └── layout.tsx │ │ │ ├── api │ │ │ │ └── search │ │ │ │ └── route.ts │ │ │ ├── favicon.ico │ │ │ ├── global.css │ │ │ ├── layout.config.tsx │ │ │ ├── layout.tsx │ │ │ ├── llms.mdx │ │ │ │ └── [[...slug]] │ │ │ │ └── route.ts │ │ │ └── llms.txt │ │ │ └── route.ts │ │ ├── assets │ │ │ ├── discord-black.svg │ │ │ ├── discord-white.svg │ │ │ ├── logo-black.svg │ │ │ └── logo-white.svg │ │ ├── components │ │ │ ├── iou.tsx │ │ │ └── mermaid.tsx │ │ ├── lib │ │ │ ├── llms.ts │ │ │ └── source.ts │ │ └── mdx-components.tsx │ └── tsconfig.json ├── examples │ ├── agent_examples.py │ ├── agent_ui_examples.py │ ├── cloud_api_examples.py │ ├── computer_examples_windows.py │ ├── computer_examples.py │ ├── computer_ui_examples.py │ ├── computer-example-ts │ │ ├── .env.example │ │ ├── .gitignore │ │ ├── .prettierrc │ │ ├── package-lock.json │ │ ├── package.json │ │ ├── pnpm-lock.yaml │ │ ├── README.md │ │ ├── src │ │ │ ├── helpers.ts │ │ │ └── index.ts │ │ └── tsconfig.json │ ├── docker_examples.py │ ├── evals │ │ ├── hud_eval_examples.py │ │ └── wikipedia_most_linked.txt │ ├── pylume_examples.py │ ├── sandboxed_functions_examples.py │ ├── som_examples.py │ ├── utils.py │ └── winsandbox_example.py ├── img │ ├── agent_gradio_ui.png │ ├── agent.png │ ├── cli.png │ ├── computer.png │ ├── logo_black.png │ └── logo_white.png ├── libs │ ├── kasm │ │ ├── Dockerfile │ │ ├── LICENSE │ │ ├── README.md │ │ └── src │ │ └── ubuntu │ │ └── install │ │ └── firefox │ │ ├── custom_startup.sh │ │ ├── firefox.desktop │ │ └── install_firefox.sh │ ├── lume │ │ ├── .cursorignore │ │ ├── CONTRIBUTING.md │ │ ├── Development.md │ │ ├── img │ │ │ └── cli.png │ │ ├── Package.resolved │ │ ├── Package.swift │ │ ├── README.md │ │ ├── resources │ │ │ └── lume.entitlements │ │ ├── scripts │ │ │ ├── build │ │ │ │ ├── build-debug.sh │ │ │ │ ├── build-release-notarized.sh │ │ │ │ └── build-release.sh │ │ │ └── install.sh │ │ ├── src │ │ │ ├── Commands │ │ │ │ ├── Clone.swift │ │ │ │ ├── Config.swift │ │ │ │ ├── Create.swift │ │ │ │ ├── Delete.swift │ │ │ │ ├── Get.swift │ │ │ │ ├── Images.swift │ │ │ │ ├── IPSW.swift │ │ │ │ ├── List.swift │ │ │ │ ├── Logs.swift │ │ │ │ ├── Options │ │ │ │ │ └── FormatOption.swift │ │ │ │ ├── Prune.swift │ │ │ │ ├── Pull.swift │ │ │ │ ├── Push.swift 
│ │ │ │ ├── Run.swift │ │ │ │ ├── Serve.swift │ │ │ │ ├── Set.swift │ │ │ │ └── Stop.swift │ │ │ ├── ContainerRegistry │ │ │ │ ├── ImageContainerRegistry.swift │ │ │ │ ├── ImageList.swift │ │ │ │ └── ImagesPrinter.swift │ │ │ ├── Errors │ │ │ │ └── Errors.swift │ │ │ ├── FileSystem │ │ │ │ ├── Home.swift │ │ │ │ ├── Settings.swift │ │ │ │ ├── VMConfig.swift │ │ │ │ ├── VMDirectory.swift │ │ │ │ └── VMLocation.swift │ │ │ ├── LumeController.swift │ │ │ ├── Main.swift │ │ │ ├── Server │ │ │ │ ├── Handlers.swift │ │ │ │ ├── HTTP.swift │ │ │ │ ├── Requests.swift │ │ │ │ ├── Responses.swift │ │ │ │ └── Server.swift │ │ │ ├── Utils │ │ │ │ ├── CommandRegistry.swift │ │ │ │ ├── CommandUtils.swift │ │ │ │ ├── Logger.swift │ │ │ │ ├── NetworkUtils.swift │ │ │ │ ├── Path.swift │ │ │ │ ├── ProcessRunner.swift │ │ │ │ ├── ProgressLogger.swift │ │ │ │ ├── String.swift │ │ │ │ └── Utils.swift │ │ │ ├── Virtualization │ │ │ │ ├── DarwinImageLoader.swift │ │ │ │ ├── DHCPLeaseParser.swift │ │ │ │ ├── ImageLoaderFactory.swift │ │ │ │ └── VMVirtualizationService.swift │ │ │ ├── VM │ │ │ │ ├── DarwinVM.swift │ │ │ │ ├── LinuxVM.swift │ │ │ │ ├── VM.swift │ │ │ │ ├── VMDetails.swift │ │ │ │ ├── VMDetailsPrinter.swift │ │ │ │ ├── VMDisplayResolution.swift │ │ │ │ └── VMFactory.swift │ │ │ └── VNC │ │ │ ├── PassphraseGenerator.swift │ │ │ └── VNCService.swift │ │ └── tests │ │ ├── Mocks │ │ │ ├── MockVM.swift │ │ │ ├── MockVMVirtualizationService.swift │ │ │ └── MockVNCService.swift │ │ ├── VM │ │ │ └── VMDetailsPrinterTests.swift │ │ ├── VMTests.swift │ │ ├── VMVirtualizationServiceTests.swift │ │ └── VNCServiceTests.swift │ ├── lumier │ │ ├── .dockerignore │ │ ├── Dockerfile │ │ ├── README.md │ │ └── src │ │ ├── bin │ │ │ └── entry.sh │ │ ├── config │ │ │ └── constants.sh │ │ ├── hooks │ │ │ └── on-logon.sh │ │ └── lib │ │ ├── utils.sh │ │ └── vm.sh │ ├── python │ │ ├── agent │ │ │ ├── .bumpversion.cfg │ │ │ ├── agent │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── adapters │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── huggingfacelocal_adapter.py │ │ │ │ │ ├── human_adapter.py │ │ │ │ │ ├── mlxvlm_adapter.py │ │ │ │ │ └── models │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── internvl.py │ │ │ │ │ ├── opencua.py │ │ │ │ │ └── qwen2_5_vl.py │ │ │ │ ├── agent.py │ │ │ │ ├── callbacks │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── budget_manager.py │ │ │ │ │ ├── image_retention.py │ │ │ │ │ ├── logging.py │ │ │ │ │ ├── operator_validator.py │ │ │ │ │ ├── pii_anonymization.py │ │ │ │ │ ├── prompt_instructions.py │ │ │ │ │ ├── telemetry.py │ │ │ │ │ └── trajectory_saver.py │ │ │ │ ├── cli.py │ │ │ │ ├── computers │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── cua.py │ │ │ │ │ └── custom.py │ │ │ │ ├── decorators.py │ │ │ │ ├── human_tool │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── __main__.py │ │ │ │ │ ├── server.py │ │ │ │ │ └── ui.py │ │ │ │ ├── integrations │ │ │ │ │ └── hud │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── agent.py │ │ │ │ │ └── proxy.py │ │ │ │ ├── loops │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── anthropic.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── composed_grounded.py │ │ │ │ │ ├── gemini.py │ │ │ │ │ ├── glm45v.py │ │ │ │ │ ├── gta1.py │ │ │ │ │ ├── holo.py │ │ │ │ │ ├── internvl.py │ │ │ │ │ ├── model_types.csv │ │ │ │ │ ├── moondream3.py │ │ │ │ │ ├── omniparser.py │ │ │ │ │ ├── openai.py │ │ │ │ │ ├── opencua.py │ │ │ │ │ └── uitars.py │ │ │ │ ├── proxy │ │ │ │ │ ├── examples.py │ │ │ │ │ └── handlers.py │ │ │ │ ├── responses.py │ │ │ │ ├── types.py │ │ │ │ 
└── ui │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ └── gradio │ │ │ │ ├── __init__.py │ │ │ │ ├── app.py │ │ │ │ └── ui_components.py │ │ │ ├── benchmarks │ │ │ │ ├── .gitignore │ │ │ │ ├── contrib.md │ │ │ │ ├── interactive.py │ │ │ │ ├── models │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ └── gta1.py │ │ │ │ ├── README.md │ │ │ │ ├── ss-pro.py │ │ │ │ ├── ss-v2.py │ │ │ │ └── utils.py │ │ │ ├── example.py │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── computer │ │ │ ├── .bumpversion.cfg │ │ │ ├── computer │ │ │ │ ├── __init__.py │ │ │ │ ├── computer.py │ │ │ │ ├── diorama_computer.py │ │ │ │ ├── helpers.py │ │ │ │ ├── interface │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── linux.py │ │ │ │ │ ├── macos.py │ │ │ │ │ ├── models.py │ │ │ │ │ └── windows.py │ │ │ │ ├── logger.py │ │ │ │ ├── models.py │ │ │ │ ├── providers │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── cloud │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── docker │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── lume │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── lume_api.py │ │ │ │ │ ├── lumier │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── types.py │ │ │ │ │ └── winsandbox │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── provider.py │ │ │ │ │ └── setup_script.ps1 │ │ │ │ ├── ui │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── __main__.py │ │ │ │ │ └── gradio │ │ │ │ │ ├── __init__.py │ │ │ │ │ └── app.py │ │ │ │ └── utils.py │ │ │ ├── poetry.toml │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── computer-server │ │ │ ├── .bumpversion.cfg │ │ │ ├── computer_server │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── cli.py │ │ │ │ ├── diorama │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── diorama_computer.py │ │ │ │ │ ├── diorama.py │ │ │ │ │ ├── draw.py │ │ │ │ │ ├── macos.py │ │ │ │ │ └── safezone.py │ │ │ │ ├── handlers │ │ │ │ │ ├── base.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── linux.py │ │ │ │ │ ├── macos.py │ │ │ │ │ └── windows.py │ │ │ │ ├── main.py │ │ │ │ ├── server.py │ │ │ │ └── watchdog.py │ │ │ ├── examples │ │ │ │ ├── __init__.py │ │ │ │ └── usage_example.py │ │ │ ├── pyproject.toml │ │ │ ├── README.md │ │ │ ├── run_server.py │ │ │ └── test_connection.py │ │ ├── core │ │ │ ├── .bumpversion.cfg │ │ │ ├── core │ │ │ │ ├── __init__.py │ │ │ │ └── telemetry │ │ │ │ ├── __init__.py │ │ │ │ └── posthog.py │ │ │ ├── poetry.toml │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── mcp-server │ │ │ ├── .bumpversion.cfg │ │ │ ├── CONCURRENT_SESSIONS.md │ │ │ ├── mcp_server │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── server.py │ │ │ │ └── session_manager.py │ │ │ ├── pdm.lock │ │ │ ├── pyproject.toml │ │ │ ├── README.md │ │ │ └── scripts │ │ │ ├── install_mcp_server.sh │ │ │ └── start_mcp_server.sh │ │ ├── pylume │ │ │ ├── __init__.py │ │ │ ├── .bumpversion.cfg │ │ │ ├── pylume │ │ │ │ ├── __init__.py │ │ │ │ ├── client.py │ │ │ │ ├── exceptions.py │ │ │ │ ├── lume │ │ │ │ ├── models.py │ │ │ │ ├── pylume.py │ │ │ │ └── server.py │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ └── som │ │ ├── .bumpversion.cfg │ │ ├── LICENSE │ │ ├── poetry.toml │ │ ├── pyproject.toml │ │ ├── README.md │ │ ├── som │ │ │ ├── __init__.py │ │ │ ├── detect.py │ │ │ ├── detection.py │ │ │ ├── models.py │ │ │ ├── ocr.py │ │ │ ├── util │ │ │ │ └── utils.py │ │ │ └── 
visualization.py │ │ └── tests │ │ └── test_omniparser.py │ ├── typescript │ │ ├── .gitignore │ │ ├── .nvmrc │ │ ├── agent │ │ │ ├── examples │ │ │ │ ├── playground-example.html │ │ │ │ └── README.md │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── client.ts │ │ │ │ ├── index.ts │ │ │ │ └── types.ts │ │ │ ├── tests │ │ │ │ └── client.test.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── biome.json │ │ ├── computer │ │ │ ├── .editorconfig │ │ │ ├── .gitattributes │ │ │ ├── .gitignore │ │ │ ├── LICENSE │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── computer │ │ │ │ │ ├── index.ts │ │ │ │ │ ├── providers │ │ │ │ │ │ ├── base.ts │ │ │ │ │ │ ├── cloud.ts │ │ │ │ │ │ └── index.ts │ │ │ │ │ └── types.ts │ │ │ │ ├── index.ts │ │ │ │ ├── interface │ │ │ │ │ ├── base.ts │ │ │ │ │ ├── factory.ts │ │ │ │ │ ├── index.ts │ │ │ │ │ ├── linux.ts │ │ │ │ │ ├── macos.ts │ │ │ │ │ └── windows.ts │ │ │ │ └── types.ts │ │ │ ├── tests │ │ │ │ ├── computer │ │ │ │ │ └── cloud.test.ts │ │ │ │ ├── interface │ │ │ │ │ ├── factory.test.ts │ │ │ │ │ ├── index.test.ts │ │ │ │ │ ├── linux.test.ts │ │ │ │ │ ├── macos.test.ts │ │ │ │ │ └── windows.test.ts │ │ │ │ └── setup.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── core │ │ │ ├── .editorconfig │ │ │ ├── .gitattributes │ │ │ ├── .gitignore │ │ │ ├── LICENSE │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── index.ts │ │ │ │ └── telemetry │ │ │ │ ├── clients │ │ │ │ │ ├── index.ts │ │ │ │ │ └── posthog.ts │ │ │ │ └── index.ts │ │ │ ├── tests │ │ │ │ └── telemetry.test.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── package.json │ │ ├── pnpm-lock.yaml │ │ ├── pnpm-workspace.yaml │ │ └── README.md │ └── xfce │ ├── .dockerignore │ ├── .gitignore │ ├── Dockerfile │ ├── README.md │ └── src │ ├── scripts │ │ ├── resize-display.sh │ │ ├── start-computer-server.sh │ │ ├── start-novnc.sh │ │ ├── start-vnc.sh │ │ └── xstartup.sh │ ├── supervisor │ │ └── supervisord.conf │ └── xfce-config │ ├── helpers.rc │ ├── xfce4-power-manager.xml │ └── xfce4-session.xml ├── LICENSE.md ├── Makefile ├── notebooks │ ├── agent_nb.ipynb │ ├── blog │ │ ├── build-your-own-operator-on-macos-1.ipynb │ │ └── build-your-own-operator-on-macos-2.ipynb │ ├── composite_agents_docker_nb.ipynb │ ├── computer_nb.ipynb │ ├── computer_server_nb.ipynb │ ├── customizing_computeragent.ipynb │ ├── eval_osworld.ipynb │ ├── ollama_nb.ipynb │ ├── pylume_nb.ipynb │ ├── README.md │ ├── sota_hackathon_cloud.ipynb │ └── sota_hackathon.ipynb ├── pdm.lock ├── pyproject.toml ├── pyrightconfig.json ├── README.md ├── samples │ └── community │ ├── global-online │ │ └── README.md │ └── hack-the-north │ └── README.md ├── scripts │ ├── build-uv.sh │ ├── build.ps1 │ ├── build.sh │ ├── cleanup.sh │ ├── playground-docker.sh │ ├── playground.sh │ └── run-docker-dev.sh └── tests ├── pytest.ini ├── shell_cmd.py ├── test_files.py ├── test_mcp_server_session_management.py ├── test_mcp_server_streaming.py ├── test_shell_bash.py ├── test_telemetry.py ├── test_venv.py └── test_watchdog.py ``` # Files -------------------------------------------------------------------------------- /libs/python/computer/computer/providers/winsandbox/provider.py: -------------------------------------------------------------------------------- ```python """Windows Sandbox VM provider implementation using pywinsandbox.""" import os import asyncio import logging import time from 
typing import Dict, Any, Optional, List from pathlib import Path from ..base import BaseVMProvider, VMProviderType # Setup logging logger = logging.getLogger(__name__) try: import winsandbox HAS_WINSANDBOX = True except ImportError: HAS_WINSANDBOX = False class WinSandboxProvider(BaseVMProvider): """Windows Sandbox VM provider implementation using pywinsandbox. This provider uses Windows Sandbox to create isolated Windows environments. Storage is always ephemeral with Windows Sandbox. """ def __init__( self, port: int = 7777, host: str = "localhost", storage: Optional[str] = None, verbose: bool = False, ephemeral: bool = True, # Windows Sandbox is always ephemeral memory_mb: int = 4096, networking: bool = True, **kwargs ): """Initialize the Windows Sandbox provider. Args: port: Port for the computer server (default: 7777) host: Host to use for connections (default: localhost) storage: Storage path (ignored - Windows Sandbox is always ephemeral) verbose: Enable verbose logging ephemeral: Always True for Windows Sandbox memory_mb: Memory allocation in MB (default: 4096) networking: Enable networking in sandbox (default: True) """ if not HAS_WINSANDBOX: raise ImportError( "pywinsandbox is required for WinSandboxProvider. " "Please install it with 'pip install pywinsandbox'" ) self.host = host self.port = port self.verbose = verbose self.memory_mb = memory_mb self.networking = networking # Windows Sandbox is always ephemeral if not ephemeral: logger.warning("Windows Sandbox storage is always ephemeral. Ignoring ephemeral=False.") self.ephemeral = True # Storage is always ephemeral for Windows Sandbox if storage and storage != "ephemeral": logger.warning("Windows Sandbox does not support persistent storage. Using ephemeral storage.") self.storage = "ephemeral" self.logger = logging.getLogger(__name__) # Track active sandboxes self._active_sandboxes: Dict[str, Any] = {} @property def provider_type(self) -> VMProviderType: """Get the provider type.""" return VMProviderType.WINSANDBOX async def __aenter__(self): """Enter async context manager.""" # Verify Windows Sandbox is available if not HAS_WINSANDBOX: raise ImportError("pywinsandbox is not available") return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Exit async context manager.""" # Clean up any active sandboxes for name, sandbox in self._active_sandboxes.items(): try: sandbox.shutdown() self.logger.info(f"Terminated sandbox: {name}") except Exception as e: self.logger.error(f"Error terminating sandbox {name}: {e}") self._active_sandboxes.clear() async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: """Get VM information by name. Args: name: Name of the VM to get information for storage: Ignored for Windows Sandbox (always ephemeral) Returns: Dictionary with VM information including status, IP address, etc. 
""" if name not in self._active_sandboxes: return { "name": name, "status": "stopped", "ip_address": None, "storage": "ephemeral" } sandbox = self._active_sandboxes[name] # Check if sandbox is still running try: # Try to ping the sandbox to see if it's responsive try: sandbox.rpyc.modules.os.getcwd() sandbox_responsive = True except Exception: sandbox_responsive = False if not sandbox_responsive: return { "name": name, "status": "starting", "ip_address": None, "storage": "ephemeral", "memory_mb": self.memory_mb, "networking": self.networking } # Check for computer server address file server_address_file = r"C:\Users\WDAGUtilityAccount\Desktop\shared_windows_sandbox_dir\server_address" try: # Check if the server address file exists file_exists = sandbox.rpyc.modules.os.path.exists(server_address_file) if file_exists: # Read the server address file with sandbox.rpyc.builtin.open(server_address_file, 'r') as f: server_address = f.read().strip() if server_address and ':' in server_address: # Parse IP:port from the file ip_address, port = server_address.split(':', 1) # Verify the server is actually responding try: import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(3) result = sock.connect_ex((ip_address, int(port))) sock.close() if result == 0: # Server is responding status = "running" self.logger.debug(f"Computer server found at {ip_address}:{port}") else: # Server file exists but not responding status = "starting" ip_address = None except Exception as e: self.logger.debug(f"Error checking server connectivity: {e}") status = "starting" ip_address = None else: # File exists but doesn't contain valid address status = "starting" ip_address = None else: # Server address file doesn't exist yet status = "starting" ip_address = None except Exception as e: self.logger.debug(f"Error checking server address file: {e}") status = "starting" ip_address = None except Exception as e: self.logger.error(f"Error checking sandbox status: {e}") status = "error" ip_address = None return { "name": name, "status": status, "ip_address": ip_address, "storage": "ephemeral", "memory_mb": self.memory_mb, "networking": self.networking } async def list_vms(self) -> List[Dict[str, Any]]: """List all available VMs.""" vms = [] for name in self._active_sandboxes.keys(): vm_info = await self.get_vm(name) vms.append(vm_info) return vms async def run_vm(self, image: str, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: """Run a VM with the given options. Args: image: Image name (ignored for Windows Sandbox - always uses host Windows) name: Name of the VM to run run_opts: Dictionary of run options (memory, cpu, etc.) 
storage: Ignored for Windows Sandbox (always ephemeral) Returns: Dictionary with VM run status and information """ if name in self._active_sandboxes: return { "success": False, "error": f"Sandbox {name} is already running" } try: # Extract options from run_opts memory_mb = run_opts.get("memory_mb", self.memory_mb) if isinstance(memory_mb, str): # Convert memory string like "4GB" to MB if memory_mb.upper().endswith("GB"): memory_mb = int(float(memory_mb[:-2]) * 1024) elif memory_mb.upper().endswith("MB"): memory_mb = int(memory_mb[:-2]) else: memory_mb = self.memory_mb networking = run_opts.get("networking", self.networking) # Create folder mappers; always map a persistent venv directory on host for caching packages folder_mappers = [] # Ensure host side persistent venv directory exists (Path.home()/wsb_venv) host_wsb_env = Path.home() / ".cua" / "wsb_cache" try: host_wsb_env.mkdir(parents=True, exist_ok=True) except Exception: # If cannot create, continue without persistent mapping host_wsb_env = None shared_directories = run_opts.get("shared_directories", []) for shared_dir in shared_directories: if isinstance(shared_dir, dict): host_path = shared_dir.get("hostPath", "") elif isinstance(shared_dir, str): host_path = shared_dir else: continue if host_path and os.path.exists(host_path): folder_mappers.append(winsandbox.FolderMapper(host_path)) # Add mapping for the persistent venv directory (read/write) so it appears in Sandbox Desktop if host_wsb_env is not None and host_wsb_env.exists(): try: folder_mappers.append( winsandbox.FolderMapper(str(host_wsb_env), read_only=False) ) except Exception as e: self.logger.warning(f"Failed to map host winsandbox_venv: {e}") self.logger.info(f"Creating Windows Sandbox: {name}") self.logger.info(f"Memory: {memory_mb}MB, Networking: {networking}") if folder_mappers: self.logger.info(f"Shared directories: {len(folder_mappers)}") # Create the sandbox without logon script try: # Try with memory_mb parameter (newer pywinsandbox version) sandbox = winsandbox.new_sandbox( memory_mb=str(memory_mb), networking=networking, folder_mappers=folder_mappers ) except TypeError as e: if "memory_mb" in str(e): # Fallback for older pywinsandbox version that doesn't support memory_mb self.logger.warning( f"Your pywinsandbox version doesn't support memory_mb parameter. " f"Using default memory settings. To use custom memory settings, " f"please update pywinsandbox: pip install -U git+https://github.com/karkason/pywinsandbox.git" ) sandbox = winsandbox.new_sandbox( networking=networking, folder_mappers=folder_mappers ) else: # Re-raise if it's a different TypeError raise # Store the sandbox self._active_sandboxes[name] = sandbox self.logger.info(f"Windows Sandbox {name} created successfully") venv_exists = (host_wsb_env / "venv" / "Lib" / "site-packages" / "computer_server").exists() if host_wsb_env else False # Setup the computer server in the sandbox await self._setup_computer_server(sandbox, name, wait_for_venv=(not venv_exists)) return { "success": True, "name": name, "status": "starting", "memory_mb": memory_mb, "networking": networking, "storage": "ephemeral" } except Exception as e: self.logger.error(f"Failed to create Windows Sandbox {name}: {e}") # stack trace import traceback self.logger.error(f"Stack trace: {traceback.format_exc()}") return { "success": False, "error": f"Failed to create sandbox: {str(e)}" } async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: """Stop a running VM. 
Args: name: Name of the VM to stop storage: Ignored for Windows Sandbox Returns: Dictionary with stop status and information """ if name not in self._active_sandboxes: return { "success": False, "error": f"Sandbox {name} is not running" } try: sandbox = self._active_sandboxes[name] # Terminate the sandbox sandbox.shutdown() # Remove from active sandboxes del self._active_sandboxes[name] self.logger.info(f"Windows Sandbox {name} stopped successfully") return { "success": True, "name": name, "status": "stopped" } except Exception as e: self.logger.error(f"Failed to stop Windows Sandbox {name}: {e}") return { "success": False, "error": f"Failed to stop sandbox: {str(e)}" } async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: """Update VM configuration. Note: Windows Sandbox does not support runtime configuration updates. The sandbox must be stopped and restarted with new configuration. Args: name: Name of the VM to update update_opts: Dictionary of update options storage: Ignored for Windows Sandbox Returns: Dictionary with update status and information """ return { "success": False, "error": "Windows Sandbox does not support runtime configuration updates. " "Please stop and restart the sandbox with new configuration." } async def restart_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: raise NotImplementedError("WinSandboxProvider does not support restarting VMs.") async def get_ip(self, name: str, storage: Optional[str] = None, retry_delay: int = 2) -> str: """Get the IP address of a VM, waiting indefinitely until it's available. Args: name: Name of the VM to get the IP for storage: Ignored for Windows Sandbox retry_delay: Delay between retries in seconds (default: 2) Returns: IP address of the VM when it becomes available """ total_attempts = 0 # Loop indefinitely until we get a valid IP while True: total_attempts += 1 # Log retry message but not on first attempt if total_attempts > 1: self.logger.info(f"Waiting for Windows Sandbox {name} IP address (attempt {total_attempts})...") try: # Get VM information vm_info = await self.get_vm(name, storage=storage) # Check if we got a valid IP ip = vm_info.get("ip_address", None) if ip and ip != "unknown" and not ip.startswith("0.0.0.0"): self.logger.info(f"Got valid Windows Sandbox IP address: {ip}") return ip # Check the VM status status = vm_info.get("status", "unknown") # If VM is not running yet, log and wait if status != "running": self.logger.info(f"Windows Sandbox is not running yet (status: {status}). Waiting...") # If VM is running but no IP yet, wait and retry else: self.logger.info("Windows Sandbox is running but no valid IP address yet. Waiting...") except Exception as e: self.logger.warning(f"Error getting Windows Sandbox {name} IP: {e}, continuing to wait...") # Wait before next retry await asyncio.sleep(retry_delay) # Add progress log every 10 attempts if total_attempts % 10 == 0: self.logger.info(f"Still waiting for Windows Sandbox {name} IP after {total_attempts} attempts...") async def _setup_computer_server(self, sandbox, name: str, visible: bool = False, wait_for_venv: bool = True): """Setup the computer server in the Windows Sandbox using RPyC. 
Args: sandbox: The Windows Sandbox instance name: Name of the sandbox visible: Whether the opened process should be visible (default: False) """ try: self.logger.info(f"Setting up computer server in sandbox {name}...") # Read the PowerShell setup script script_path = os.path.join(os.path.dirname(__file__), "setup_script.ps1") with open(script_path, 'r', encoding='utf-8') as f: setup_script_content = f.read() # Write the setup script to the sandbox using RPyC script_dest_path = r"C:\Users\WDAGUtilityAccount\setup_cua.ps1" self.logger.info(f"Writing setup script to {script_dest_path}") with sandbox.rpyc.builtin.open(script_dest_path, 'w') as f: f.write(setup_script_content) # Execute the PowerShell script in the background self.logger.info("Executing setup script in sandbox...") # Use subprocess to run PowerShell script import subprocess powershell_cmd = [ "powershell.exe", "-ExecutionPolicy", "Bypass", "-NoExit", # Keep window open after script completes "-File", script_dest_path ] # Set creation flags based on visibility preference if visible: # CREATE_NEW_CONSOLE - creates a new console window (visible) creation_flags = 0x00000010 else: creation_flags = 0x08000000 # CREATE_NO_WINDOW # Start the process using RPyC process = sandbox.rpyc.modules.subprocess.Popen( powershell_cmd, creationflags=creation_flags, shell=False ) if wait_for_venv: print("Waiting for venv to be created for the first time setup of Windows Sandbox...") print("This may take a minute...") await asyncio.sleep(120) ip = await self.get_ip(name) self.logger.info(f"Sandbox IP: {ip}") self.logger.info(f"Setup script started in background in sandbox {name} with PID: {process.pid}") except Exception as e: self.logger.error(f"Failed to setup computer server in sandbox {name}: {e}") import traceback self.logger.error(f"Stack trace: {traceback.format_exc()}") ``` -------------------------------------------------------------------------------- /libs/python/computer/computer/providers/lume/provider.py: -------------------------------------------------------------------------------- ```python """Lume VM provider implementation using curl commands. This provider uses direct curl commands to interact with the Lume API, removing the dependency on the pylume Python package. """ import os import re import asyncio import json import logging import subprocess import urllib.parse from typing import Dict, Any, Optional, List, Tuple from ..base import BaseVMProvider, VMProviderType from ...logger import Logger, LogLevel from ..lume_api import ( lume_api_get, lume_api_run, lume_api_stop, lume_api_update, lume_api_pull, HAS_CURL, parse_memory ) # Setup logging logger = logging.getLogger(__name__) class LumeProvider(BaseVMProvider): """Lume VM provider implementation using direct curl commands. This provider uses curl to interact with the Lume API server, removing the dependency on the pylume Python package. """ def __init__( self, port: int = 7777, host: str = "localhost", storage: Optional[str] = None, verbose: bool = False, ephemeral: bool = False, ): """Initialize the Lume provider. Args: port: Port for the Lume API server (default: 7777) host: Host to use for API connections (default: localhost) storage: Path to store VM data verbose: Enable verbose logging """ if not HAS_CURL: raise ImportError( "curl is required for LumeProvider. " "Please ensure it is installed and in your PATH." 
) self.host = host self.port = port # Default port for Lume API self.storage = storage self.verbose = verbose self.ephemeral = ephemeral # If True, VMs will be deleted after stopping # Base API URL for Lume API calls self.api_base_url = f"http://{self.host}:{self.port}" self.logger = logging.getLogger(__name__) @property def provider_type(self) -> VMProviderType: """Get the provider type.""" return VMProviderType.LUME async def __aenter__(self): """Enter async context manager.""" # No initialization needed, just return self return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Exit async context manager.""" # No cleanup needed pass def _lume_api_get(self, vm_name: str = "", storage: Optional[str] = None, debug: bool = False) -> Dict[str, Any]: """Get VM information using shared lume_api function. Args: vm_name: Optional name of the VM to get info for. If empty, lists all VMs. storage: Optional storage path override. If provided, this will be used instead of self.storage debug: Whether to show debug output Returns: Dictionary with VM status information parsed from JSON response """ # Use the shared implementation from lume_api module return lume_api_get( vm_name=vm_name, host=self.host, port=self.port, storage=storage if storage is not None else self.storage, debug=debug, verbose=self.verbose ) def _lume_api_run(self, vm_name: str, run_opts: Dict[str, Any], debug: bool = False) -> Dict[str, Any]: """Run a VM using shared lume_api function. Args: vm_name: Name of the VM to run run_opts: Dictionary of run options debug: Whether to show debug output Returns: Dictionary with API response or error information """ # Use the shared implementation from lume_api module return lume_api_run( vm_name=vm_name, host=self.host, port=self.port, run_opts=run_opts, storage=self.storage, debug=debug, verbose=self.verbose ) def _lume_api_stop(self, vm_name: str, debug: bool = False) -> Dict[str, Any]: """Stop a VM using shared lume_api function. Args: vm_name: Name of the VM to stop debug: Whether to show debug output Returns: Dictionary with API response or error information """ # Use the shared implementation from lume_api module return lume_api_stop( vm_name=vm_name, host=self.host, port=self.port, storage=self.storage, debug=debug, verbose=self.verbose ) def _lume_api_update(self, vm_name: str, update_opts: Dict[str, Any], debug: bool = False) -> Dict[str, Any]: """Update VM configuration using shared lume_api function. Args: vm_name: Name of the VM to update update_opts: Dictionary of update options debug: Whether to show debug output Returns: Dictionary with API response or error information """ # Use the shared implementation from lume_api module return lume_api_update( vm_name=vm_name, host=self.host, port=self.port, update_opts=update_opts, storage=self.storage, debug=debug, verbose=self.verbose ) async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: """Get VM information by name. Args: name: Name of the VM to get information for storage: Optional storage path override. If provided, this will be used instead of the provider's default storage path. Returns: Dictionary with VM information including status, IP address, etc. Note: If storage is not provided, the provider's default storage path will be used. The storage parameter allows overriding the storage location for this specific call. """ if not HAS_CURL: logger.error("curl is not available. 
Cannot get VM status.") return { "name": name, "status": "unavailable", "error": "curl is not available" } # First try to get detailed VM info from the API try: # Query the Lume API for VM status using the provider's storage_path vm_info = self._lume_api_get( vm_name=name, storage=storage if storage is not None else self.storage, debug=self.verbose ) # Check for API errors if "error" in vm_info: logger.debug(f"API request error: {vm_info['error']}") # If we got an error from the API, report the VM as not ready yet return { "name": name, "status": "starting", # VM is still starting - do not attempt to connect yet "api_status": "error", "error": vm_info["error"] } # Process the VM status information vm_status = vm_info.get("status", "unknown") # Check if VM is stopped or not running - don't wait for IP in this case if vm_status == "stopped": logger.info(f"VM {name} is in '{vm_status}' state - not waiting for IP address") # Return the status as-is without waiting for an IP result = { "name": name, "status": vm_status, **vm_info # Include all original fields from the API response } return result # Handle field name differences between APIs # Some APIs use camelCase, others use snake_case if "vncUrl" in vm_info: vnc_url = vm_info["vncUrl"] elif "vnc_url" in vm_info: vnc_url = vm_info["vnc_url"] else: vnc_url = "" if "ipAddress" in vm_info: ip_address = vm_info["ipAddress"] elif "ip_address" in vm_info: ip_address = vm_info["ip_address"] else: # If no IP address is provided and VM is supposed to be running, # report it as still starting ip_address = None logger.info(f"VM {name} is in '{vm_status}' state but no IP address found - reporting as still starting") logger.info(f"VM {name} status: {vm_status}") # Return the complete status information result = { "name": name, "status": vm_status if vm_status else "running", "ip_address": ip_address, "vnc_url": vnc_url, "api_status": "ok" } # Include all original fields from the API response if isinstance(vm_info, dict): for key, value in vm_info.items(): if key not in result: # Don't override our carefully processed fields result[key] = value return result except Exception as e: logger.error(f"Failed to get VM status: {e}") # Return a fallback status that indicates the VM is not ready yet return { "name": name, "status": "initializing", # VM is still initializing "error": f"Failed to get VM status: {str(e)}" } async def list_vms(self) -> List[Dict[str, Any]]: """List all available VMs.""" result = self._lume_api_get(debug=self.verbose) # Extract the VMs list from the response if "vms" in result and isinstance(result["vms"], list): return result["vms"] elif "error" in result: logger.error(f"Error listing VMs: {result['error']}") return [] else: return [] async def run_vm(self, image: str, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: """Run a VM with the given options. If the VM does not exist in the storage location, this will attempt to pull it from the Lume registry first. Args: image: Image name to use when pulling the VM if it doesn't exist name: Name of the VM to run run_opts: Dictionary of run options (memory, cpu, etc.) storage: Optional storage path override. If provided, this will be used instead of the provider's default storage path. 
Returns: Dictionary with VM run status and information """ # First check if VM exists by trying to get its info vm_info = await self.get_vm(name, storage=storage) if "error" in vm_info: # VM doesn't exist, try to pull it self.logger.info(f"VM {name} not found, attempting to pull image {image} from registry...") # Call pull_vm with the image parameter pull_result = await self.pull_vm( name=name, image=image, storage=storage ) # Check if pull was successful if "error" in pull_result: self.logger.error(f"Failed to pull VM image: {pull_result['error']}") return pull_result # Return the error from pull self.logger.info(f"Successfully pulled VM image {image} as {name}") # Now run the VM with the given options self.logger.info(f"Running VM {name} with options: {run_opts}") from ..lume_api import lume_api_run return lume_api_run( vm_name=name, host=self.host, port=self.port, run_opts=run_opts, storage=storage if storage is not None else self.storage, debug=self.verbose, verbose=self.verbose ) async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: """Stop a running VM. If this provider was initialized with ephemeral=True, the VM will also be deleted after it is stopped. Args: name: Name of the VM to stop storage: Optional storage path override Returns: Dictionary with stop status and information """ # Stop the VM first stop_result = self._lume_api_stop(name, debug=self.verbose) # Log ephemeral status for debugging self.logger.info(f"Ephemeral mode status: {self.ephemeral}") # If ephemeral mode is enabled, delete the VM after stopping if self.ephemeral and (stop_result.get("success", False) or "error" not in stop_result): self.logger.info(f"Ephemeral mode enabled - deleting VM {name} after stopping") try: delete_result = await self.delete_vm(name, storage=storage) # Return combined result return { **stop_result, # Include all stop result info "deleted": True, "delete_result": delete_result } except Exception as e: self.logger.error(f"Failed to delete ephemeral VM {name}: {e}") # Include the error but still return stop result return { **stop_result, "deleted": False, "delete_error": str(e) } # Just return the stop result if not ephemeral return stop_result async def pull_vm( self, name: str, image: str, storage: Optional[str] = None, registry: str = "ghcr.io", organization: str = "trycua", pull_opts: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Pull a VM image from the registry. Args: name: Name for the VM after pulling image: The image name to pull (e.g. 
'macos-sequoia-cua:latest') storage: Optional storage path to use registry: Registry to pull from (default: ghcr.io) organization: Organization in registry (default: trycua) pull_opts: Additional options for pulling the VM (optional) Returns: Dictionary with information about the pulled VM Raises: RuntimeError: If pull operation fails or image is not provided """ # Validate image parameter if not image: raise ValueError("Image parameter is required for pull_vm") self.logger.info(f"Pulling VM image '{image}' as '{name}'") self.logger.info("You can check the pull progress using: lume logs -f") # Set default pull_opts if not provided if pull_opts is None: pull_opts = {} # Log information about the operation self.logger.debug(f"Pull storage location: {storage or 'default'}") try: # Call the lume_api_pull function from lume_api.py from ..lume_api import lume_api_pull result = lume_api_pull( image=image, name=name, host=self.host, port=self.port, storage=storage if storage is not None else self.storage, registry=registry, organization=organization, debug=self.verbose, verbose=self.verbose ) # Check for errors in the result if "error" in result: self.logger.error(f"Failed to pull VM image: {result['error']}") return result self.logger.info(f"Successfully pulled VM image '{image}' as '{name}'") return result except Exception as e: self.logger.error(f"Failed to pull VM image '{image}': {e}") return {"error": f"Failed to pull VM: {str(e)}"} async def delete_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: """Delete a VM permanently. Args: name: Name of the VM to delete storage: Optional storage path override Returns: Dictionary with delete status and information """ self.logger.info(f"Deleting VM {name}...") try: # Call the lume_api_delete function we created from ..lume_api import lume_api_delete result = lume_api_delete( vm_name=name, host=self.host, port=self.port, storage=storage if storage is not None else self.storage, debug=self.verbose, verbose=self.verbose ) # Check for errors in the result if "error" in result: self.logger.error(f"Failed to delete VM: {result['error']}") return result self.logger.info(f"Successfully deleted VM '{name}'") return result except Exception as e: self.logger.error(f"Failed to delete VM '{name}': {e}") return {"error": f"Failed to delete VM: {str(e)}"} async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: """Update VM configuration.""" return self._lume_api_update(name, update_opts, debug=self.verbose) async def restart_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: raise NotImplementedError("LumeProvider does not support restarting VMs.") async def get_ip(self, name: str, storage: Optional[str] = None, retry_delay: int = 2) -> str: """Get the IP address of a VM, waiting indefinitely until it's available. 
Args: name: Name of the VM to get the IP for storage: Optional storage path override retry_delay: Delay between retries in seconds (default: 2) Returns: IP address of the VM when it becomes available """ # Track total attempts for logging purposes total_attempts = 0 # Loop indefinitely until we get a valid IP while True: total_attempts += 1 # Log retry message but not on first attempt if total_attempts > 1: self.logger.info(f"Waiting for VM {name} IP address (attempt {total_attempts})...") try: # Get VM information vm_info = await self.get_vm(name, storage=storage) # Check if we got a valid IP ip = vm_info.get("ip_address", None) if ip and ip != "unknown" and not ip.startswith("0.0.0.0"): self.logger.info(f"Got valid VM IP address: {ip}") return ip # Check the VM status status = vm_info.get("status", "unknown") # If VM is not running yet, log and wait if status != "running": self.logger.info(f"VM is not running yet (status: {status}). Waiting...") # If VM is running but no IP yet, wait and retry else: self.logger.info("VM is running but no valid IP address yet. Waiting...") except Exception as e: self.logger.warning(f"Error getting VM {name} IP: {e}, continuing to wait...") # Wait before next retry await asyncio.sleep(retry_delay) # Add progress log every 10 attempts if total_attempts % 10 == 0: self.logger.info(f"Still waiting for VM {name} IP after {total_attempts} attempts...") ``` -------------------------------------------------------------------------------- /libs/python/computer/computer/providers/docker/provider.py: -------------------------------------------------------------------------------- ```python """ Docker VM provider implementation. This provider uses Docker containers running the CUA Ubuntu image to create Linux VMs with computer-server. It handles VM lifecycle operations through Docker commands and container management. """ import logging import json import asyncio from typing import Dict, List, Optional, Any import subprocess import time import re from ..base import BaseVMProvider, VMProviderType # Setup logging logger = logging.getLogger(__name__) # Check if Docker is available try: subprocess.run(["docker", "--version"], capture_output=True, check=True) HAS_DOCKER = True except (subprocess.SubprocessError, FileNotFoundError): HAS_DOCKER = False class DockerProvider(BaseVMProvider): """ Docker VM Provider implementation using Docker containers. This provider uses Docker to run containers with the CUA Ubuntu image that includes computer-server for remote computer use. """ def __init__( self, port: Optional[int] = 8000, host: str = "localhost", storage: Optional[str] = None, shared_path: Optional[str] = None, image: str = "trycua/cua-ubuntu:latest", verbose: bool = False, ephemeral: bool = False, vnc_port: Optional[int] = 6901, ): """Initialize the Docker VM Provider. 
Args: port: Currently unused (VM provider port) host: Hostname for the API server (default: localhost) storage: Path for persistent VM storage shared_path: Path for shared folder between host and container image: Docker image to use (default: "trycua/cua-ubuntu:latest") Supported images: - "trycua/cua-ubuntu:latest" (Kasm-based) - "trycua/cua-docker-xfce:latest" (vanilla XFCE) verbose: Enable verbose logging ephemeral: Use ephemeral (temporary) storage vnc_port: Port for VNC interface (default: 6901) """ self.host = host self.api_port = 8000 self.vnc_port = vnc_port self.ephemeral = ephemeral # Handle ephemeral storage (temporary directory) if ephemeral: self.storage = "ephemeral" else: self.storage = storage self.shared_path = shared_path self.image = image self.verbose = verbose self._container_id = None self._running_containers = {} # Track running containers by name # Detect image type and configure user directory accordingly self._detect_image_config() def _detect_image_config(self): """Detect image type and configure paths accordingly.""" # Detect if this is a docker-xfce image or Kasm image if "docker-xfce" in self.image.lower() or "xfce" in self.image.lower(): self._home_dir = "/home/cua" self._image_type = "docker-xfce" logger.info(f"Detected docker-xfce image: using {self._home_dir}") else: # Default to Kasm configuration self._home_dir = "/home/kasm-user" self._image_type = "kasm" logger.info(f"Detected Kasm image: using {self._home_dir}") @property def provider_type(self) -> VMProviderType: """Return the provider type.""" return VMProviderType.DOCKER def _parse_memory(self, memory_str: str) -> str: """Parse memory string to Docker format. Examples: "8GB" -> "8g" "1024MB" -> "1024m" "512" -> "512m" """ if isinstance(memory_str, int): return f"{memory_str}m" if isinstance(memory_str, str): # Extract number and unit match = re.match(r"(\d+)([A-Za-z]*)", memory_str) if match: value, unit = match.groups() unit = unit.upper() if unit == "GB" or unit == "G": return f"{value}g" elif unit == "MB" or unit == "M" or unit == "": return f"{value}m" # Default fallback logger.warning(f"Could not parse memory string '{memory_str}', using 4g default") return "4g" # Default to 4GB async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: """Get VM information by name. Args: name: Name of the VM to get information for storage: Optional storage path override. If provided, this will be used instead of the provider's default storage path. Returns: Dictionary with VM information including status, IP address, etc. 
""" try: # Check if container exists and get its status cmd = ["docker", "inspect", name] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: # Container doesn't exist return { "name": name, "status": "not_found", "ip_address": None, "ports": {}, "image": self.image, "provider": "docker" } # Parse container info container_info = json.loads(result.stdout)[0] state = container_info["State"] network_settings = container_info["NetworkSettings"] # Determine status if state["Running"]: status = "running" elif state["Paused"]: status = "paused" else: status = "stopped" # Get IP address ip_address = network_settings.get("IPAddress", "") if not ip_address and "Networks" in network_settings: # Try to get IP from bridge network for network_name, network_info in network_settings["Networks"].items(): if network_info.get("IPAddress"): ip_address = network_info["IPAddress"] break # Get port mappings ports = {} if "Ports" in network_settings and network_settings["Ports"]: # network_settings["Ports"] is a dict like: # {'6901/tcp': [{'HostIp': '0.0.0.0', 'HostPort': '6901'}, ...], ...} for container_port, port_mappings in network_settings["Ports"].items(): if port_mappings: # Check if there are any port mappings # Take the first mapping (usually the IPv4 one) for mapping in port_mappings: if mapping.get("HostPort"): ports[container_port] = mapping["HostPort"] break # Use the first valid mapping return { "name": name, "status": status, "ip_address": ip_address or "127.0.0.1", # Use localhost if no IP "ports": ports, "image": container_info["Config"]["Image"], "provider": "docker", "container_id": container_info["Id"][:12], # Short ID "created": container_info["Created"], "started": state.get("StartedAt", ""), } except Exception as e: logger.error(f"Error getting VM info for {name}: {e}") import traceback traceback.print_exc() return { "name": name, "status": "error", "error": str(e), "provider": "docker" } async def list_vms(self) -> List[Dict[str, Any]]: """List all Docker containers managed by this provider.""" try: # List all containers (running and stopped) with the CUA image cmd = ["docker", "ps", "-a", "--filter", f"ancestor={self.image}", "--format", "json"] result = subprocess.run(cmd, capture_output=True, text=True, check=True) containers = [] if result.stdout.strip(): for line in result.stdout.strip().split('\n'): if line.strip(): container_data = json.loads(line) vm_info = await self.get_vm(container_data["Names"]) containers.append(vm_info) return containers except subprocess.CalledProcessError as e: logger.error(f"Error listing containers: {e.stderr}") return [] except Exception as e: logger.error(f"Error listing VMs: {e}") import traceback traceback.print_exc() return [] async def run_vm(self, image: str, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: """Run a VM with the given options. 
Args: image: Name/tag of the Docker image to use name: Name of the container to run run_opts: Options for running the VM, including: - memory: Memory limit (e.g., "4GB", "2048MB") - cpu: CPU limit (e.g., 2 for 2 cores) - vnc_port: Specific port for VNC interface - api_port: Specific port for computer-server API Returns: Dictionary with VM status information """ try: # Check if container already exists existing_vm = await self.get_vm(name, storage) if existing_vm["status"] == "running": logger.info(f"Container {name} is already running") return existing_vm elif existing_vm["status"] in ["stopped", "paused"]: # Start existing container logger.info(f"Starting existing container {name}") start_cmd = ["docker", "start", name] result = subprocess.run(start_cmd, capture_output=True, text=True, check=True) # Wait for container to be ready await self._wait_for_container_ready(name) return await self.get_vm(name, storage) # Use provided image or default docker_image = image if image != "default" else self.image # Build docker run command cmd = ["docker", "run", "-d", "--name", name] # Add memory limit if specified if "memory" in run_opts: memory_limit = self._parse_memory(run_opts["memory"]) cmd.extend(["--memory", memory_limit]) # Add CPU limit if specified if "cpu" in run_opts: cpu_count = str(run_opts["cpu"]) cmd.extend(["--cpus", cpu_count]) # Add port mappings vnc_port = run_opts.get("vnc_port", self.vnc_port) api_port = run_opts.get("api_port", self.api_port) if vnc_port: cmd.extend(["-p", f"{vnc_port}:6901"]) # VNC port if api_port: cmd.extend(["-p", f"{api_port}:8000"]) # computer-server API port # Add volume mounts if storage is specified storage_path = storage or self.storage if storage_path and storage_path != "ephemeral": # Mount storage directory using detected home directory cmd.extend(["-v", f"{storage_path}:{self._home_dir}/storage"]) # Add shared path if specified if self.shared_path: # Mount shared directory using detected home directory cmd.extend(["-v", f"{self.shared_path}:{self._home_dir}/shared"]) # Add environment variables cmd.extend(["-e", "VNC_PW=password"]) # Set VNC password cmd.extend(["-e", "VNCOPTIONS=-disableBasicAuth"]) # Disable VNC basic auth # Add the image cmd.append(docker_image) logger.info(f"Running Docker container with command: {' '.join(cmd)}") # Run the container result = subprocess.run(cmd, capture_output=True, text=True, check=True) container_id = result.stdout.strip() logger.info(f"Container {name} started with ID: {container_id[:12]}") # Store container info self._container_id = container_id self._running_containers[name] = container_id # Wait for container to be ready await self._wait_for_container_ready(name) # Return VM info vm_info = await self.get_vm(name, storage) vm_info["container_id"] = container_id[:12] return vm_info except subprocess.CalledProcessError as e: error_msg = f"Failed to run container {name}: {e.stderr}" logger.error(error_msg) return { "name": name, "status": "error", "error": error_msg, "provider": "docker" } except Exception as e: error_msg = f"Error running VM {name}: {e}" logger.error(error_msg) return { "name": name, "status": "error", "error": error_msg, "provider": "docker" } async def _wait_for_container_ready(self, container_name: str, timeout: int = 60) -> bool: """Wait for the Docker container to be fully ready. 
Args: container_name: Name of the Docker container to check timeout: Maximum time to wait in seconds (default: 60 seconds) Returns: True if the container is running and ready """ logger.info(f"Waiting for container {container_name} to be ready...") start_time = time.time() while time.time() - start_time < timeout: try: # Check if container is running vm_info = await self.get_vm(container_name) if vm_info["status"] == "running": logger.info(f"Container {container_name} is running") # Additional check: try to connect to computer-server API # This is optional - we'll just wait a bit more for services to start await asyncio.sleep(5) return True except Exception as e: logger.debug(f"Container {container_name} not ready yet: {e}") await asyncio.sleep(2) logger.warning(f"Container {container_name} did not become ready within {timeout} seconds") return False async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: """Stop a running VM by stopping the Docker container.""" try: logger.info(f"Stopping container {name}") # Stop the container cmd = ["docker", "stop", name] result = subprocess.run(cmd, capture_output=True, text=True, check=True) # Remove from running containers tracking if name in self._running_containers: del self._running_containers[name] logger.info(f"Container {name} stopped successfully") return { "name": name, "status": "stopped", "message": "Container stopped successfully", "provider": "docker" } except subprocess.CalledProcessError as e: error_msg = f"Failed to stop container {name}: {e.stderr}" logger.error(error_msg) return { "name": name, "status": "error", "error": error_msg, "provider": "docker" } except Exception as e: error_msg = f"Error stopping VM {name}: {e}" logger.error(error_msg) return { "name": name, "status": "error", "error": error_msg, "provider": "docker" } async def restart_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: raise NotImplementedError("DockerProvider does not support restarting VMs.") async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: """Update VM configuration. Note: Docker containers cannot be updated while running. This method will return an error suggesting to recreate the container. """ return { "name": name, "status": "error", "error": "Docker containers cannot be updated while running. Please stop and recreate the container with new options.", "provider": "docker" } async def get_ip(self, name: str, storage: Optional[str] = None, retry_delay: int = 2) -> str: """Get the IP address of a VM, waiting indefinitely until it's available. Args: name: Name of the VM to get the IP for storage: Optional storage path override retry_delay: Delay between retries in seconds (default: 2) Returns: IP address of the VM when it becomes available """ logger.info(f"Getting IP address for container {name}") total_attempts = 0 while True: total_attempts += 1 try: vm_info = await self.get_vm(name, storage) if vm_info["status"] == "error": raise Exception(f"VM is in error state: {vm_info.get('error', 'Unknown error')}") # TODO: for now, return localhost # it seems the docker container is not accessible from the host # on WSL2, unless you port forward? 
not sure if True: logger.warning("Overriding container IP with localhost") return "localhost" # Check if we got a valid IP ip = vm_info.get("ip_address", None) if ip and ip != "unknown" and not ip.startswith("0.0.0.0"): logger.info(f"Got valid container IP address: {ip}") return ip # For Docker containers, we can also use localhost if ports are mapped if vm_info["status"] == "running" and vm_info.get("ports"): logger.info(f"Container is running with port mappings, using localhost") return "127.0.0.1" # Check the container status status = vm_info.get("status", "unknown") if status == "stopped": logger.info(f"Container status is {status}, but still waiting for it to start") elif status != "running": logger.info(f"Container is not running yet (status: {status}). Waiting...") else: logger.info("Container is running but no valid IP address yet. Waiting...") except Exception as e: logger.warning(f"Error getting container {name} IP: {e}, continuing to wait...") # Wait before next retry await asyncio.sleep(retry_delay) # Add progress log every 10 attempts if total_attempts % 10 == 0: logger.info(f"Still waiting for container {name} IP after {total_attempts} attempts...") async def __aenter__(self): """Async context manager entry.""" logger.debug("Entering DockerProvider context") return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit. This method handles cleanup of running containers if needed. """ logger.debug(f"Exiting DockerProvider context, handling exceptions: {exc_type}") try: # Optionally stop running containers on context exit # For now, we'll leave containers running as they might be needed # Users can manually stop them if needed pass except Exception as e: logger.error(f"Error during DockerProvider cleanup: {e}") if exc_type is None: raise return False ``` -------------------------------------------------------------------------------- /blog/build-your-own-operator-on-macos-1.md: -------------------------------------------------------------------------------- ```markdown # Build Your Own Operator on macOS - Part 1 *Published on March 31, 2025 by Francesco Bonacci* In this first blogpost, we'll learn how to build our own Computer-Use Operator using OpenAI's `computer-use-preview` model. But first, let's understand what some common terms mean: - A **Virtual Machine (VM)** is like a computer within your computer - a safe, isolated environment where the AI can work without affecting your main system. - **computer-use-preview** is OpenAI's specialized language model trained to understand and interact with computer interfaces through screenshots. - A **Computer-Use Agent** is an AI agent that can control a computer just like a human would - clicking buttons, typing text, and interacting with applications. Our Operator will run in an isolated macOS VM, by making use of our [cua-computer](https://github.com/trycua/cua/tree/main/libs/computer) package and [lume virtualization CLI](https://github.com/trycua/cua/tree/main/libs/lume). Check out what it looks like to use your own Operator from a Gradio app: <div align="center"> <video src="https://github.com/user-attachments/assets/a2cf69ad-2ab2-4eb9-8e1a-45606dd7eec6" width="600" controls></video> </div> ## What You'll Learn By the end of this tutorial, you'll be able to: - Set up a macOS virtual machine for AI automation - Connect OpenAI's computer-use model to your VM - Create a basic loop for the AI to interact with your VM - Handle different types of computer actions (clicking, typing, etc.) 
- Implement safety checks and error handling

**Prerequisites:**

- macOS Sonoma (14.0) or later
- 8GB RAM minimum (16GB recommended)
- OpenAI API access (Tier 3+)
- Basic Python knowledge
- Familiarity with terminal commands

**Estimated Time:** 45-60 minutes

## Introduction to Computer-Use Agents

Last March OpenAI released a fine-tuned version of GPT-4o, namely [CUA](https://openai.com/index/computer-using-agent/), introducing pixel-level vision capabilities with advanced reasoning through reinforcement learning. This fine-tuning enables the computer-use model to interpret screenshots and interact with graphical user interfaces at the pixel level - buttons, menus, and text fields - mimicking human interactions on a computer screen. It scores a remarkable 38.1% success rate on [OSWorld](https://os-world.github.io), a benchmark for Computer-Use agents on Linux and Windows.

It is the second model available, after Anthropic's [Claude 3.5 Sonnet](https://www.anthropic.com/news/3-5-models-and-computer-use), to support computer-use capabilities natively, with no external models required (e.g. accessory [SoM (Set-of-Mark)](https://arxiv.org/abs/2310.11441) or OCR runs).

Professor Ethan Mollick provides an excellent explanation of computer-use agents in this article: [When you give a Claude a mouse](https://www.oneusefulthing.org/p/when-you-give-a-claude-a-mouse).

### ChatGPT Operator

OpenAI's computer-use model powers [ChatGPT Operator](https://openai.com/index/introducing-operator), a Chromium-based interface exclusively available to ChatGPT Pro subscribers. Users leverage this functionality to automate web-based tasks such as online shopping, expense report submission, and booking reservations by interacting with websites in a human-like manner.

## Benefits of Custom Operators

### Why Build Your Own?

While OpenAI's Operator uses a controlled Chromium VM instance, there are scenarios where you may want to use your own VM with full desktop capabilities. Here are some examples:

- Automating native macOS apps like Finder and Xcode
- Managing files, changing settings, and running terminal commands
- Testing desktop software and applications
- Creating workflows that combine web and desktop tasks
- Automating media editing in apps like Final Cut Pro and Blender

This gives you more control and flexibility to automate tasks beyond just web browsing, with full access to interact with native applications and system-level operations. Additionally, running your own VM locally provides better privacy for sensitive user files and delivers superior performance by leveraging your own hardware instead of renting expensive Cloud VMs.

## Access Requirements

### Model Availability

At the time of writing, the **computer-use-preview** model has limited availability:

- Only accessible to OpenAI tier 3+ users
- An additional application process may be required even for eligible users
- Cannot be used in the OpenAI Playground
- Outside of ChatGPT Operator, usage is restricted to the new **Responses API**

## Understanding the OpenAI API

### Responses API Overview

Let's start with the basics. In our case, we'll use OpenAI's Responses API to communicate with their computer-use model. Think of it like this:

1. We send the model a screenshot of our VM and tell it what we want it to do
2. The model looks at the screenshot and decides what actions to take
3. It sends back instructions (like "click here" or "type this")
4. We execute those instructions in our VM
The [Responses API](https://platform.openai.com/docs/guides/responses) is OpenAI's newest way to interact with their AI models. It comes with several built-in tools:

- **Web search**: Let the AI search the internet
- **File search**: Help the AI find documents
- **Computer use**: Allow the AI to control a computer (what we'll be using)

At the time of writing, the computer-use model is only available through the Responses API.

### Responses API Examples

Let's look at some simple examples. We'll start with the traditional way of using OpenAI's API with Chat Completions, then show the new Responses API primitive.

Chat Completions:

```python
# The old way required managing conversation history manually
messages = [{"role": "user", "content": "Hello"}]
response = client.chat.completions.create(
    model="gpt-4",
    messages=messages  # We had to track all messages ourselves
)
messages.append(response.choices[0].message)  # Manual message tracking
```

Responses API:

```python
# Example 1: Simple web search
# The API handles all the complexity for us
response = client.responses.create(
    model="gpt-4",
    input=[{
        "role": "user",
        "content": "What's the latest news about AI?"
    }],
    tools=[{
        "type": "web_search",  # Tell the API to use web search
        "search_query": "latest AI news"
    }]
)

# Example 2: File search
# Looking for specific documents becomes easy
response = client.responses.create(
    model="gpt-4",
    input=[{
        "role": "user",
        "content": "Find documents about project X"
    }],
    tools=[{
        "type": "file_search",
        "query": "project X",
        "file_types": ["pdf", "docx"]  # Specify which file types to look for
    }]
)
```

### Computer-Use Model Setup

For our operator, we'll use the computer-use model. Here's how we set it up:

```python
# Set up the computer-use model to control our VM
response = client.responses.create(
    model="computer-use-preview",  # Special model for computer control
    tools=[{
        "type": "computer_use_preview",
        "display_width": 1024,  # Size of our VM screen
        "display_height": 768,
        "environment": "mac"  # Tell it we're using macOS.
    }],
    input=[
        {
            "role": "user",
            "content": [
                # What we want the AI to do
                {"type": "input_text", "text": "Open Safari and go to google.com"},
                # Current screenshot of our VM
                {"type": "input_image", "image_url": f"data:image/png;base64,{screenshot_base64}"}
            ]
        }
    ],
    truncation="auto"  # Let OpenAI handle message length
)
```

### Understanding the Response

When we send a request, the API sends back a response that looks like this:

```json
"output": [
    {
        "type": "reasoning",  # The AI explains what it's thinking
        "id": "rs_67cc...",
        "summary": [
            {
                "type": "summary_text",
                "text": "Clicking on the browser address bar."
            }
        ]
    },
    {
        "type": "computer_call",  # The actual action to perform
        "id": "cu_67cc...",
        "call_id": "call_zw3...",
        "action": {
            "type": "click",  # What kind of action (click, type, etc.)
            "button": "left",  # Which mouse button to use
            "x": 156,  # Where to click (coordinates)
            "y": 50
        },
        "pending_safety_checks": [],  # Any safety warnings to consider
        "status": "completed"  # Whether the action was successful
    }
]
```

Each response contains:

1. **Reasoning**: The AI's explanation of what it's doing
2. **Action**: The specific computer action to perform
3. **Safety Checks**: Any potential risks to review
4. **Status**: Whether everything worked as planned
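To make that structure concrete, here is a minimal sketch (not part of the original tutorial) of how you might pull the reasoning summaries and `computer_call` actions out of `response.output`. It only assumes the item shapes shown in the annotated response above.

```python
def summarize_output(response):
    """Split a Responses API result into reasoning text and pending actions."""
    reasoning_texts = []
    actions = []
    for item in response.output:
        if item.type == "reasoning":
            # Each reasoning item carries one or more summary entries
            reasoning_texts.extend(entry.text for entry in item.summary)
        elif item.type == "computer_call":
            # The action object describes what to do (click, type, scroll, ...)
            actions.append((item.call_id, item.action))
    return reasoning_texts, actions

# Hypothetical usage once you have a `response` from client.responses.create(...):
# thoughts, actions = summarize_output(response)
# for call_id, action in actions:
#     print(call_id, action.type)
```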
## CUA-Computer Interface

### Architecture Overview

Let's break down the main components of our system and how they work together:

1. **The Virtual Machine (VM)**
   - Think of this as a safe playground for our AI
   - It's a complete macOS system running inside your computer
   - Anything the AI does stays inside this VM, keeping your main system safe
   - We use `lume` to create and manage this VM

2. **The Computer Interface (CUI)**
   - This is how we control the VM
   - It can move the mouse, type text, and take screenshots
   - Works like a remote control for the VM
   - Built using our `cua-computer` package

3. **The OpenAI Model**
   - This is the brain of our operator
   - It looks at screenshots of the VM
   - Decides what actions to take
   - Sends back instructions like "click here" or "type this"

Here's how they all work together:

```mermaid
sequenceDiagram
    participant User as You
    participant CUI as Computer Interface
    participant VM as Virtual Machine
    participant AI as OpenAI API

    Note over User,AI: The Main Loop

    User->>CUI: Start the operator
    CUI->>VM: Create macOS sandbox
    activate VM
    VM-->>CUI: VM is ready

    loop Action Loop
        Note over CUI,AI: Each iteration
        CUI->>VM: Take a screenshot
        VM-->>CUI: Return current screen
        CUI->>AI: Send screenshot + instructions
        AI-->>CUI: Return next action

        Note over CUI,VM: Execute the action
        alt Mouse Click
            CUI->>VM: Move and click mouse
        else Type Text
            CUI->>VM: Type characters
        else Scroll Screen
            CUI->>VM: Scroll window
        else Press Keys
            CUI->>VM: Press keyboard keys
        else Wait
            CUI->>VM: Pause for a moment
        end
    end

    VM-->>CUI: Task finished
    deactivate VM
    CUI-->>User: All done!
```

The diagram above shows how information flows through our system:

1. You start the operator
2. The Computer Interface creates a virtual macOS
3. Then it enters a loop:
   - Take a picture of the VM screen
   - Send it to OpenAI with instructions
   - Get back an action to perform
   - Execute that action in the VM
   - Repeat until the task is done

This design keeps everything organized and safe. The AI can only interact with the VM through our controlled interface, and the VM keeps the AI's actions isolated from your main system.

---

## Implementation Guide

### Prerequisites

1. **Lume CLI Setup**

   For installing the standalone lume binary, run the following command from a terminal, or download the [latest pkg](https://github.com/trycua/cua/releases/latest/download/lume.pkg.tar.gz).

   ```bash
   sudo /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/trycua/cua/main/libs/lume/scripts/install.sh)"
   ```

   **Important Storage Notes:**

   - Initial download requires 80GB of free space
   - After first run, space usage reduces to ~30GB due to macOS's sparse file system
   - VMs are stored in `~/.lume`
   - Cached images are stored in `~/.lume/cache`

   You can check your downloaded VM images anytime:

   ```bash
   lume ls
   ```

   Example output:

   | name | os | cpu | memory | disk | display | status | ip | vnc |
   |--------------------------|---------|-------|---------|----------------|-----------|-----------|----------------|---------------------------------------------------|
   | macos-sequoia-cua:latest | macOS | 12 | 16.00G | 64.5GB/80.0GB | 1024x768 | running | 192.168.64.78 | vnc://:[email protected]:56085 |

   After checking your available images, you can run the VM to ensure everything is working correctly:

   ```bash
   lume run macos-sequoia-cua:latest
   ```

2. **Python Environment Setup**

   **Note**: The `cua-computer` package requires Python 3.10 or later.
   We recommend creating a dedicated Python environment:

   **Using venv:**

   ```bash
   python -m venv cua-env
   source cua-env/bin/activate
   ```

   **Using conda:**

   ```bash
   conda create -n cua-env python=3.10
   conda activate cua-env
   ```

   Then install the required packages:

   ```bash
   pip install openai
   pip install cua-computer
   ```

   Ensure you have an OpenAI API key (set as an environment variable or in your OpenAI configuration).

### Building the Operator

#### Importing Required Modules

With the prerequisites installed and configured, we're ready to build our first operator. The following example uses asynchronous Python (async/await). You can run it either in a VS Code Notebook or as a standalone Python script.

```python
import asyncio
import base64
import openai
from computer import Computer
```

#### Mapping API Actions to CUA Methods

The following helper function converts a `computer_call` action from the OpenAI Responses API into corresponding commands on the CUI interface. For example, if the API instructs a `click` action, we move the cursor and perform a left click on the lume VM sandbox. We will use the computer interface to execute the actions.

```python
async def execute_action(computer, action):
    action_type = action.type

    if action_type == "click":
        x = action.x
        y = action.y
        button = action.button
        print(f"Executing click at ({x}, {y}) with button '{button}'")
        await computer.interface.move_cursor(x, y)
        if button == "right":
            await computer.interface.right_click()
        else:
            await computer.interface.left_click()

    elif action_type == "type":
        text = action.text
        print(f"Typing text: {text}")
        await computer.interface.type_text(text)

    elif action_type == "scroll":
        x = action.x
        y = action.y
        scroll_x = action.scroll_x
        scroll_y = action.scroll_y
        print(f"Scrolling at ({x}, {y}) with offsets (scroll_x={scroll_x}, scroll_y={scroll_y})")
        await computer.interface.move_cursor(x, y)
        await computer.interface.scroll(scroll_y)  # Using vertical scroll only

    elif action_type == "keypress":
        keys = action.keys
        for key in keys:
            print(f"Pressing key: {key}")
            # Map common key names to CUA equivalents
            if key.lower() == "enter":
                await computer.interface.press_key("return")
            elif key.lower() == "space":
                await computer.interface.press_key("space")
            else:
                await computer.interface.press_key(key)

    elif action_type == "wait":
        wait_time = action.time
        print(f"Waiting for {wait_time} seconds")
        await asyncio.sleep(wait_time)

    elif action_type == "screenshot":
        print("Taking screenshot")
        # This is handled automatically in the main loop, but we can take an extra one if requested
        screenshot = await computer.interface.screenshot()
        return screenshot

    else:
        print(f"Unrecognized action: {action_type}")
```
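If you want to sanity-check this mapping before involving the model, you can drive `execute_action` by hand with stand-in action objects. This is a minimal sketch and not part of the original tutorial: `SimpleNamespace` is only an assumed stand-in for the attribute-style action objects the Responses API returns, and `computer` is an already-started `Computer` instance like the one created in the loop below.

```python
from types import SimpleNamespace

async def smoke_test(computer):
    # Hand-built stand-ins mirroring the fields execute_action reads
    fake_click = SimpleNamespace(type="click", x=200, y=300, button="left")
    fake_typing = SimpleNamespace(type="type", text="hello from the operator")

    await execute_action(computer, fake_click)   # moves the cursor and left-clicks
    await execute_action(computer, fake_typing)  # types into whatever has focus
```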
#### Implementing the Computer-Use Loop

This section defines a loop that:

1. Initializes the cua-computer instance (connecting to a macOS sandbox).
2. Captures a screenshot of the current state.
3. Sends the screenshot (with a user prompt) to the OpenAI Responses API using the `computer-use-preview` model.
4. Processes the returned `computer_call` action and executes it using our helper function.
5. Captures an updated screenshot after the action (this example runs one iteration, but you can wrap it in a loop).

For a full loop, you would repeat these steps until no further actions are returned.

```python
async def cua_openai_loop():
    # Initialize the lume computer instance (macOS sandbox)
    async with Computer(
        display="1024x768",
        memory="4GB",
        cpu="2",
        os_type="macos"
    ) as computer:
        await computer.run()  # Start the lume VM

        # Capture the initial screenshot
        screenshot = await computer.interface.screenshot()
        screenshot_base64 = base64.b64encode(screenshot).decode('utf-8')

        # Initial request to start the loop
        response = openai.responses.create(
            model="computer-use-preview",
            tools=[{
                "type": "computer_use_preview",
                "display_width": 1024,
                "display_height": 768,
                "environment": "mac"
            }],
            input=[
                {
                    "role": "user",
                    "content": [
                        {"type": "input_text", "text": "Open Safari, download and install Cursor."},
                        {"type": "input_image", "image_url": f"data:image/png;base64,{screenshot_base64}"}
                    ]
                }
            ],
            truncation="auto"
        )

        # Continue the loop until no more computer_call actions
        while True:
            # Check for computer_call actions
            computer_calls = [item for item in response.output if item and item.type == "computer_call"]
            if not computer_calls:
                print("No more computer calls. Loop complete.")
                break

            # Get the first computer call
            call = computer_calls[0]
            last_call_id = call.call_id
            action = call.action
            print("Received action from OpenAI Responses API:", action)

            # Handle any pending safety checks
            if call.pending_safety_checks:
                print("Safety checks pending:", call.pending_safety_checks)
                # In a real implementation, you would want to get user confirmation here
                acknowledged_checks = call.pending_safety_checks
            else:
                acknowledged_checks = []

            # Execute the action
            await execute_action(computer, action)
            await asyncio.sleep(1)  # Allow time for changes to take effect

            # Capture new screenshot after action
            new_screenshot = await computer.interface.screenshot()
            new_screenshot_base64 = base64.b64encode(new_screenshot).decode('utf-8')

            # Send the screenshot back as computer_call_output
            response = openai.responses.create(
                model="computer-use-preview",
                tools=[{
                    "type": "computer_use_preview",
                    "display_width": 1024,
                    "display_height": 768,
                    "environment": "mac"
                }],
                input=[{
                    "type": "computer_call_output",
                    "call_id": last_call_id,
                    "acknowledged_safety_checks": acknowledged_checks,
                    "output": {
                        "type": "input_image",
                        "image_url": f"data:image/png;base64,{new_screenshot_base64}"
                    }
                }],
                truncation="auto"
            )

        # End the session
        await computer.stop()

# Run the loop
if __name__ == "__main__":
    asyncio.run(cua_openai_loop())
```

You can find the full code in our [notebook](https://github.com/trycua/cua/blob/main/notebooks/blog/build-your-own-operator-on-macos-1.ipynb).

#### Request Handling Differences

The first request to the OpenAI Responses API is special in that it includes the initial screenshot and prompt. Subsequent requests are handled differently, using the `computer_call_output` type to provide feedback on the executed action.

##### Initial Request Format

- We use `role: "user"` with `content` that contains both `input_text` (the prompt) and `input_image` (the screenshot)

##### Subsequent Request Format

- We use `type: "computer_call_output"` instead of the user role
- We include the `call_id` to link the output to the specific previous action that was executed
- We provide any `acknowledged_safety_checks` that were approved
- We include the new screenshot in the `output` field

This structured approach allows the API to maintain context and continuity throughout the interaction session.
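For quick reference, here is a compact sketch of the two request shapes side by side. The values are placeholders standing in for variables produced inside the loop above, so treat it as an illustration of the payloads rather than runnable wiring.

```python
# Placeholders for values produced inside the loop above
screenshot_base64 = "<base64 of the first screenshot>"
new_screenshot_base64 = "<base64 of the screenshot taken after the action>"
last_call_id = "call_zw3..."
acknowledged_checks = []

# Initial request: a user message carrying the prompt plus the first screenshot
initial_input = [{
    "role": "user",
    "content": [
        {"type": "input_text", "text": "Open Safari and go to google.com"},
        {"type": "input_image", "image_url": f"data:image/png;base64,{screenshot_base64}"},
    ],
}]

# Follow-up request: a computer_call_output tied to the action that was just executed
followup_input = [{
    "type": "computer_call_output",
    "call_id": last_call_id,
    "acknowledged_safety_checks": acknowledged_checks,
    "output": {
        "type": "input_image",
        "image_url": f"data:image/png;base64,{new_screenshot_base64}",
    },
}]
```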
**Note**: For multi-turn conversations, you should include the `previous_response_id` in your initial requests when starting a new conversation with prior context. However, when using `computer_call_output` for action feedback, you don't need to explicitly manage the conversation history - OpenAI's API automatically tracks the context using the `call_id`. The `previous_response_id` is primarily important when the user provides additional instructions or when starting a new request that should continue from a previous session.

## Conclusion

### Summary

This blogpost demonstrates a single iteration of an OpenAI Computer-Use loop where:

- A macOS sandbox is controlled using the CUA interface.
- A screenshot and prompt are sent to the OpenAI Responses API.
- The returned action (e.g. a click or type command) is executed via the CUI interface.

In a production setting, you would wrap the action-response cycle in a loop, handling multiple actions and safety checks as needed.

### Next Steps

In the next blogpost, we'll introduce our Agent framework, which abstracts away all these tedious implementation steps. This framework provides a higher-level API that handles the interaction loop between OpenAI's computer-use model and the macOS sandbox, allowing you to focus on building sophisticated applications rather than managing the low-level details we've explored here.

Can't wait? Check out the [cua-agent](https://github.com/trycua/cua/tree/main/libs/agent) package!

### Resources

- [OpenAI Computer-Use docs](https://platform.openai.com/docs/guides/tools-computer-use)
- [cua-computer](https://github.com/trycua/cua/tree/main/libs/computer)
- [lume](https://github.com/trycua/cua/tree/main/libs/lume)
```

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/diorama/diorama.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""Diorama: A virtual desktop manager for macOS"""
import os
import asyncio
import logging
import sys
import io
from typing import Union
from PIL import Image, ImageDraw

from computer_server.diorama.draw import capture_all_apps, AppActivationContext, get_frontmost_and_active_app, get_all_windows, get_running_apps
from computer_server.diorama.diorama_computer import DioramaComputer
from computer_server.handlers.macos import *

# simple, nicely formatted logging
logger = logging.getLogger(__name__)

automation_handler = MacOSAutomationHandler()

class Diorama:
    """Virtual desktop manager that provides automation capabilities for macOS applications.

    Manages application windows and provides an interface for taking screenshots,
    mouse interactions, keyboard input, and coordinate transformations between
    screenshot space and screen space.
    """
    _scheduler_queue = None
    _scheduler_task = None
    _loop = None
    _scheduler_started = False

    @classmethod
    def create_from_apps(cls, *args) -> DioramaComputer:
        """Create a DioramaComputer instance from a list of application names.

        Args:
            *args: Variable number of application names to include in the desktop

        Returns:
            DioramaComputer: A computer interface for the specified applications
        """
        cls._ensure_scheduler()
        return cls(args).computer

    # Dictionary to store cursor positions for each unique app_list hash
    _cursor_positions = {}

    def __init__(self, app_list):
        """Initialize a Diorama instance for the specified applications.
Args: app_list: List of application names to manage """ self.app_list = app_list self.interface = self.Interface(self) self.computer = DioramaComputer(self) self.focus_context = None # Create a hash for this app_list to use as a key self.app_list_hash = hash(tuple(sorted(app_list))) # Initialize cursor position for this app_list if it doesn't exist if self.app_list_hash not in Diorama._cursor_positions: Diorama._cursor_positions[self.app_list_hash] = (0, 0) @classmethod def _ensure_scheduler(cls): """Ensure the async scheduler loop is running. Creates and starts the scheduler task if it hasn't been started yet. """ if not cls._scheduler_started: logger.info("Starting Diorama scheduler loop…") cls._scheduler_queue = asyncio.Queue() cls._loop = asyncio.get_event_loop() cls._scheduler_task = cls._loop.create_task(cls._scheduler_loop()) cls._scheduler_started = True @classmethod async def _scheduler_loop(cls): """Main scheduler loop that processes automation commands. Continuously processes commands from the scheduler queue, handling screenshots, mouse actions, keyboard input, and scrolling operations. """ while True: cmd = await cls._scheduler_queue.get() action = cmd.get("action") args = cmd.get("arguments", {}) future = cmd.get("future") logger.info(f"Processing command: {action} | args={args}") app_whitelist = args.get("app_list", []) all_windows = get_all_windows() running_apps = get_running_apps() frontmost_app, active_app_to_use, active_app_pid = get_frontmost_and_active_app(all_windows, running_apps, app_whitelist) focus_context = AppActivationContext(active_app_pid, active_app_to_use, logger) with focus_context: try: if action == "screenshot": logger.info(f"Taking screenshot for apps: {app_whitelist}") result, img = capture_all_apps( app_whitelist=app_whitelist, save_to_disk=False, take_focus=False ) logger.info("Screenshot complete.") if future: future.set_result((result, img)) # Mouse actions elif action in ["left_click", "right_click", "double_click", "move_cursor", "drag_to"]: x = args.get("x") y = args.get("y") duration = args.get("duration", 0.5) if action == "left_click": await automation_handler.left_click(x, y) elif action == "right_click": await automation_handler.right_click(x, y) elif action == "double_click": await automation_handler.double_click(x, y) elif action == "move_cursor": await automation_handler.move_cursor(x, y) elif action == "drag_to": await automation_handler.drag_to(x, y, duration=duration) if future: future.set_result(None) elif action in ["scroll_up", "scroll_down"]: x = args.get("x") y = args.get("y") if x is not None and y is not None: await automation_handler.move_cursor(x, y) clicks = args.get("clicks", 1) if action == "scroll_up": await automation_handler.scroll_up(clicks) else: await automation_handler.scroll_down(clicks) if future: future.set_result(None) # Keyboard actions elif action == "type_text": text = args.get("text") await automation_handler.type_text(text) if future: future.set_result(None) elif action == "press_key": key = args.get("key") await automation_handler.press_key(key) if future: future.set_result(None) elif action == "hotkey": keys = args.get("keys", []) await automation_handler.hotkey(keys) if future: future.set_result(None) elif action == "get_cursor_position": pos = await automation_handler.get_cursor_position() if future: future.set_result(pos) else: logger.warning(f"Unknown action: {action}") if future: future.set_exception(ValueError(f"Unknown action: {action}")) except Exception as e: logger.error(f"Exception during 
{action}: {e}", exc_info=True) if future: future.set_exception(e) class Interface(): """Interface for interacting with the virtual desktop. Provides methods for taking screenshots, mouse interactions, keyboard input, and coordinate transformations between screenshot and screen coordinates. """ def __init__(self, diorama): """Initialize the interface with a reference to the parent Diorama instance. Args: diorama: The parent Diorama instance """ self._diorama = diorama self._scene_hitboxes = [] self._scene_size = None async def _send_cmd(self, action, arguments=None): """Send a command to the scheduler queue. Args: action (str): The action to perform arguments (dict, optional): Arguments for the action Returns: The result of the command execution """ Diorama._ensure_scheduler() loop = asyncio.get_event_loop() future = loop.create_future() logger.info(f"Enqueuing {action} command for apps: {self._diorama.app_list}") await Diorama._scheduler_queue.put({ "action": action, "arguments": {"app_list": self._diorama.app_list, **(arguments or {})}, "future": future }) try: return await future except asyncio.CancelledError: logger.warning(f"Command was cancelled: {action}") return None async def screenshot(self, as_bytes: bool = True) -> Union[str, Image.Image]: """Take a screenshot of the managed applications. Args: as_bytes (bool): If True, return base64-encoded bytes; if False, return PIL Image Returns: Union[str, Image.Image]: Base64-encoded PNG bytes or PIL Image object """ import base64 result, img = await self._send_cmd("screenshot") self._scene_hitboxes = result.get("hitboxes", []) self._scene_size = img.size if as_bytes: # PIL Image to bytes, then base64 encode for JSON import io img_byte_arr = io.BytesIO() img.save(img_byte_arr, format="PNG") img_bytes = img_byte_arr.getvalue() img_b64 = base64.b64encode(img_bytes).decode("ascii") return img_b64 else: return img async def left_click(self, x, y): """Perform a left mouse click at the specified coordinates. Args: x (int): X coordinate in screenshot space (or None to use last position) y (int): Y coordinate in screenshot space (or None to use last position) """ # Get last cursor position for this app_list hash app_list_hash = hash(tuple(sorted(self._diorama.app_list))) last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0)) x, y = x or last_pos[0], y or last_pos[1] # Update cursor position for this app_list hash Diorama._cursor_positions[app_list_hash] = (x, y) sx, sy = await self.to_screen_coordinates(x, y) await self._send_cmd("left_click", {"x": sx, "y": sy}) async def right_click(self, x, y): """Perform a right mouse click at the specified coordinates. Args: x (int): X coordinate in screenshot space (or None to use last position) y (int): Y coordinate in screenshot space (or None to use last position) """ # Get last cursor position for this app_list hash app_list_hash = hash(tuple(sorted(self._diorama.app_list))) last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0)) x, y = x or last_pos[0], y or last_pos[1] # Update cursor position for this app_list hash Diorama._cursor_positions[app_list_hash] = (x, y) sx, sy = await self.to_screen_coordinates(x, y) await self._send_cmd("right_click", {"x": sx, "y": sy}) async def double_click(self, x, y): """Perform a double mouse click at the specified coordinates. 
Args: x (int): X coordinate in screenshot space (or None to use last position) y (int): Y coordinate in screenshot space (or None to use last position) """ # Get last cursor position for this app_list hash app_list_hash = hash(tuple(sorted(self._diorama.app_list))) last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0)) x, y = x or last_pos[0], y or last_pos[1] # Update cursor position for this app_list hash Diorama._cursor_positions[app_list_hash] = (x, y) sx, sy = await self.to_screen_coordinates(x, y) await self._send_cmd("double_click", {"x": sx, "y": sy}) async def move_cursor(self, x, y): """Move the mouse cursor to the specified coordinates. Args: x (int): X coordinate in screenshot space (or None to use last position) y (int): Y coordinate in screenshot space (or None to use last position) """ # Get last cursor position for this app_list hash app_list_hash = hash(tuple(sorted(self._diorama.app_list))) last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0)) x, y = x or last_pos[0], y or last_pos[1] # Update cursor position for this app_list hash Diorama._cursor_positions[app_list_hash] = (x, y) sx, sy = await self.to_screen_coordinates(x, y) await self._send_cmd("move_cursor", {"x": sx, "y": sy}) async def drag_to(self, x, y, duration=0.5): """Drag the mouse from current position to the specified coordinates. Args: x (int): X coordinate in screenshot space (or None to use last position) y (int): Y coordinate in screenshot space (or None to use last position) duration (float): Duration of the drag operation in seconds """ # Get last cursor position for this app_list hash app_list_hash = hash(tuple(sorted(self._diorama.app_list))) last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0)) x, y = x or last_pos[0], y or last_pos[1] # Update cursor position for this app_list hash Diorama._cursor_positions[app_list_hash] = (x, y) sx, sy = await self.to_screen_coordinates(x, y) await self._send_cmd("drag_to", {"x": sx, "y": sy, "duration": duration}) async def get_cursor_position(self): """Get the current cursor position in screen coordinates. Returns: tuple: (x, y) coordinates of the cursor in screen space """ return await self._send_cmd("get_cursor_position") async def type_text(self, text): """Type the specified text using the keyboard. Args: text (str): The text to type """ await self._send_cmd("type_text", {"text": text}) async def press_key(self, key): """Press a single key on the keyboard. Args: key (str): The key to press """ await self._send_cmd("press_key", {"key": key}) async def hotkey(self, keys): """Press a combination of keys simultaneously. Args: keys (list): List of keys to press together """ await self._send_cmd("hotkey", {"keys": list(keys)}) async def scroll_up(self, clicks: int = 1): """Scroll up at the current cursor position. Args: clicks (int): Number of scroll clicks to perform """ # Get last cursor position for this app_list hash app_list_hash = hash(tuple(sorted(self._diorama.app_list))) last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0)) x, y = last_pos[0], last_pos[1] await self._send_cmd("scroll_up", {"clicks": clicks, "x": x, "y": y}) async def scroll_down(self, clicks: int = 1): """Scroll down at the current cursor position. 
Args: clicks (int): Number of scroll clicks to perform """ # Get last cursor position for this app_list hash app_list_hash = hash(tuple(sorted(self._diorama.app_list))) last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0)) x, y = last_pos[0], last_pos[1] await self._send_cmd("scroll_down", {"clicks": clicks, "x": x, "y": y}) async def get_screen_size(self) -> dict[str, int]: """Get the size of the screenshot area. Returns: dict[str, int]: Dictionary with 'width' and 'height' keys """ if not self._scene_size: await self.screenshot() return { "width": self._scene_size[0], "height": self._scene_size[1] } async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]: """Convert screenshot coordinates to screen coordinates. Args: x: X absolute coordinate in screenshot space y: Y absolute coordinate in screenshot space Returns: tuple[float, float]: (x, y) absolute coordinates in screen space """ if not self._scene_hitboxes: await self.screenshot() # get hitboxes # Try all hitboxes for h in self._scene_hitboxes[::-1]: rect_from = h.get("hitbox") rect_to = h.get("target") if not rect_from or len(rect_from) != 4: continue # check if (x, y) is inside rect_from x0, y0, x1, y1 = rect_from if x0 <= x <= x1 and y0 <= y <= y1: logger.info(f"Found hitbox: {h}") # remap (x, y) to rect_to tx0, ty0, tx1, ty1 = rect_to # calculate offset from x0, y0 offset_x = x - x0 offset_y = y - y0 # remap offset to rect_to tx = tx0 + offset_x ty = ty0 + offset_y return tx, ty return x, y async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]: """Convert screen coordinates to screenshot coordinates. Args: x: X absolute coordinate in screen space y: Y absolute coordinate in screen space Returns: tuple[float, float]: (x, y) absolute coordinates in screenshot space """ if not self._scene_hitboxes: await self.screenshot() # get hitboxes # Try all hitboxes for h in self._scene_hitboxes[::-1]: rect_from = h.get("target") rect_to = h.get("hitbox") if not rect_from or len(rect_from) != 4: continue # check if (x, y) is inside rect_from x0, y0, x1, y1 = rect_from if x0 <= x <= x1 and y0 <= y <= y1: # remap (x, y) to rect_to tx0, ty0, tx1, ty1 = rect_to # calculate offset from x0, y0 offset_x = x - x0 offset_y = y - y0 # remap offset to rect_to tx = tx0 + offset_x ty = ty0 + offset_y return tx, ty return x, y import pyautogui import time async def main(): """Main function demonstrating Diorama usage with multiple desktops and mouse tracking.""" desktop1 = Diorama.create_from_apps(["Discord", "Notes"]) desktop2 = Diorama.create_from_apps(["Terminal"]) img1 = await desktop1.interface.screenshot(as_bytes=False) img2 = await desktop2.interface.screenshot(as_bytes=False) img1.save("app_screenshots/desktop1.png") img2.save("app_screenshots/desktop2.png") # Initialize Diorama desktop desktop3 = Diorama.create_from_apps("Safari") screen_size = await desktop3.interface.get_screen_size() print(screen_size) # Take initial screenshot img = await desktop3.interface.screenshot(as_bytes=False) img.save("app_screenshots/desktop3.png") # Prepare hitboxes and draw on the single screenshot hitboxes = desktop3.interface._scene_hitboxes[::-1] base_img = img.copy() draw = ImageDraw.Draw(base_img) for h in hitboxes: rect = h.get("hitbox") if not rect or len(rect) != 4: continue draw.rectangle(rect, outline="red", width=2) # Track and draw mouse position in real time (single screenshot size) last_mouse_pos = None print("Tracking mouse... 
Press Ctrl+C to stop.") try: while True: mouse_x, mouse_y = pyautogui.position() if last_mouse_pos != (mouse_x, mouse_y): last_mouse_pos = (mouse_x, mouse_y) # Map to screenshot coordinates sx, sy = await desktop3.interface.to_screenshot_coordinates(mouse_x, mouse_y) # Draw on a copy of the screenshot frame = base_img.copy() frame_draw = ImageDraw.Draw(frame) frame_draw.ellipse((sx-5, sy-5, sx+5, sy+5), fill="blue", outline="blue") # Save the frame frame.save("app_screenshots/desktop3_mouse.png") print(f"Mouse at screen ({mouse_x}, {mouse_y}) -> screenshot ({sx:.1f}, {sy:.1f})") time.sleep(0.05) # Throttle updates to ~20 FPS except KeyboardInterrupt: print("Stopped tracking.") draw.text((rect[0], rect[1]), str(idx), fill="red") canvas.save("app_screenshots/desktop3_hitboxes.png") # move mouse in a square spiral around the screen import math import random step = 20 # pixels per move dot_radius = 10 width = screen_size["width"] height = screen_size["height"] x, y = 0, 10 while x < width and y < height: await desktop3.interface.move_cursor(x, y) img = await desktop3.interface.screenshot(as_bytes=False) draw = ImageDraw.Draw(img) draw.ellipse((x-dot_radius, y-dot_radius, x+dot_radius, y+dot_radius), fill="red") img.save("current.png") await asyncio.sleep(0.03) x += step y = math.sin(x / width * math.pi * 2) * 50 + 25 if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------------------------------------------------------- /libs/lume/src/Server/Server.swift: -------------------------------------------------------------------------------- ```swift import Darwin import Foundation import Network // MARK: - Error Types enum PortError: Error, LocalizedError { case alreadyInUse(port: UInt16) var errorDescription: String? { switch self { case .alreadyInUse(let port): return "Port \(port) is already in use by another process" } } } // MARK: - Server Class @MainActor final class Server { // MARK: - Route Type private struct Route { let method: String let path: String let handler: (HTTPRequest) async throws -> HTTPResponse func matches(_ request: HTTPRequest) -> Bool { if method != request.method { return false } // Handle path parameters let routeParts = path.split(separator: "/") let requestParts = request.path.split(separator: "/") if routeParts.count != requestParts.count { return false } for (routePart, requestPart) in zip(routeParts, requestParts) { if routePart.hasPrefix(":") { continue } // Path parameter if routePart != requestPart { return false } } return true } func extractParams(_ request: HTTPRequest) -> [String: String] { var params: [String: String] = [:] let routeParts = path.split(separator: "/") // Split request path to remove query parameters let requestPathOnly = request.path.split(separator: "?", maxSplits: 1)[0] let requestParts = requestPathOnly.split(separator: "/") for (routePart, requestPart) in zip(routeParts, requestParts) { if routePart.hasPrefix(":") { let paramName = String(routePart.dropFirst()) params[paramName] = String(requestPart) } } return params } } // MARK: - Properties private let port: NWEndpoint.Port private let controller: LumeController private var isRunning = false private var listener: NWListener? private var routes: [Route] // MARK: - Initialization init(port: UInt16 = 7777) { self.port = NWEndpoint.Port(rawValue: port)! 
self.controller = LumeController() self.routes = [] // Define API routes after self is fully initialized self.setupRoutes() } // MARK: - Route Setup private func setupRoutes() { routes = [ Route( method: "GET", path: "/lume/vms", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } // Extract storage from query params if present let storage = self.extractQueryParam(request: request, name: "storage") return try await self.handleListVMs(storage: storage) }), Route( method: "GET", path: "/lume/vms/:name", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } let params = Route( method: "GET", path: "/lume/vms/:name", handler: { _ in HTTPResponse(statusCode: .ok, body: "") } ).extractParams(request) guard let name = params["name"] else { return HTTPResponse(statusCode: .badRequest, body: "Missing VM name") } // Extract storage from query params if present let storage = self.extractQueryParam(request: request, name: "storage") return try await self.handleGetVM(name: name, storage: storage) }), Route( method: "DELETE", path: "/lume/vms/:name", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } let params = Route( method: "DELETE", path: "/lume/vms/:name", handler: { _ in HTTPResponse(statusCode: .ok, body: "") } ).extractParams(request) guard let name = params["name"] else { return HTTPResponse(statusCode: .badRequest, body: "Missing VM name") } // Extract storage from query params if present let storage = self.extractQueryParam(request: request, name: "storage") return try await self.handleDeleteVM(name: name, storage: storage) }), Route( method: "POST", path: "/lume/vms", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } return try await self.handleCreateVM(request.body) }), Route( method: "POST", path: "/lume/vms/clone", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } return try await self.handleCloneVM(request.body) }), Route( method: "PATCH", path: "/lume/vms/:name", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } let params = Route( method: "PATCH", path: "/lume/vms/:name", handler: { _ in HTTPResponse(statusCode: .ok, body: "") } ).extractParams(request) guard let name = params["name"] else { return HTTPResponse(statusCode: .badRequest, body: "Missing VM name") } return try await self.handleSetVM(name: name, body: request.body) }), Route( method: "POST", path: "/lume/vms/:name/run", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } let params = Route( method: "POST", path: "/lume/vms/:name/run", handler: { _ in HTTPResponse(statusCode: .ok, body: "") } ).extractParams(request) guard let name = params["name"] else { return HTTPResponse(statusCode: .badRequest, body: "Missing VM name") } return try await self.handleRunVM(name: name, body: request.body) }), Route( method: "POST", path: "/lume/vms/:name/stop", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } let params = Route( method: "POST", path: "/lume/vms/:name/stop", handler: { _ in HTTPResponse(statusCode: .ok, body: "") } ).extractParams(request) guard let name = params["name"] else { return HTTPResponse(statusCode: .badRequest, body: "Missing VM name") } Logger.info("Processing stop VM request", metadata: ["method": request.method, "path": request.path]) // Extract storage from the request body var storage: String? 
= nil if let bodyData = request.body, !bodyData.isEmpty { do { if let json = try JSONSerialization.jsonObject(with: bodyData) as? [String: Any], let bodyStorage = json["storage"] as? String { storage = bodyStorage Logger.info("Extracted storage from request body", metadata: ["storage": bodyStorage]) } } catch { Logger.error("Failed to parse request body JSON", metadata: ["error": error.localizedDescription]) } } return try await self.handleStopVM(name: name, storage: storage) }), Route( method: "GET", path: "/lume/ipsw", handler: { [weak self] _ in guard let self else { throw HTTPError.internalError } return try await self.handleIPSW() }), Route( method: "POST", path: "/lume/pull", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } return try await self.handlePull(request.body) }), Route( method: "POST", path: "/lume/prune", handler: { [weak self] _ in guard let self else { throw HTTPError.internalError } return try await self.handlePruneImages() }), Route( method: "GET", path: "/lume/images", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } return try await self.handleGetImages(request) }), // New config endpoint Route( method: "GET", path: "/lume/config", handler: { [weak self] _ in guard let self else { throw HTTPError.internalError } return try await self.handleGetConfig() }), Route( method: "POST", path: "/lume/config", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } return try await self.handleUpdateConfig(request.body) }), Route( method: "GET", path: "/lume/config/locations", handler: { [weak self] _ in guard let self else { throw HTTPError.internalError } return try await self.handleGetLocations() }), Route( method: "POST", path: "/lume/config/locations", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } return try await self.handleAddLocation(request.body) }), Route( method: "DELETE", path: "/lume/config/locations/:name", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } let params = Route( method: "DELETE", path: "/lume/config/locations/:name", handler: { _ in HTTPResponse(statusCode: .ok, body: "") } ).extractParams(request) guard let name = params["name"] else { return HTTPResponse(statusCode: .badRequest, body: "Missing location name") } return try await self.handleRemoveLocation(name) }), // Logs retrieval route Route( method: "GET", path: "/lume/logs", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } // Extract query parameters let type = self.extractQueryParam(request: request, name: "type") // "info", "error", or "all" let linesParam = self.extractQueryParam(request: request, name: "lines") let lines = linesParam.flatMap { Int($0) } // Convert to Int if present return try await self.handleGetLogs(type: type, lines: lines) }), Route( method: "POST", path: "/lume/config/locations/default/:name", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } let params = Route( method: "POST", path: "/lume/config/locations/default/:name", handler: { _ in HTTPResponse(statusCode: .ok, body: "") } ).extractParams(request) guard let name = params["name"] else { return HTTPResponse(statusCode: .badRequest, body: "Missing location name") } return try await self.handleSetDefaultLocation(name) }), Route( method: "POST", path: "/lume/vms/push", handler: { [weak self] request in guard let self else { throw HTTPError.internalError } 
return try await self.handlePush(request.body) }), ] } // Helper to extract query parameters from the URL private func extractQueryParam(request: HTTPRequest, name: String) -> String? { // Extract only the query part by splitting on '?' let parts = request.path.split(separator: "?", maxSplits: 1) guard parts.count > 1 else { return nil } // No query parameters let queryString = String(parts[1]) // Create a placeholder URL with the query string if let urlComponents = URLComponents(string: "http://placeholder.com?"+queryString), let queryItems = urlComponents.queryItems { return queryItems.first(where: { $0.name == name })?.value?.removingPercentEncoding } return nil } // MARK: - Port Utilities private func isPortAvailable(port: Int) async -> Bool { // Create a socket let socketFD = socket(AF_INET, SOCK_STREAM, 0) if socketFD == -1 { return false } // Set socket options to allow reuse var value: Int32 = 1 if setsockopt( socketFD, SOL_SOCKET, SO_REUSEADDR, &value, socklen_t(MemoryLayout<Int32>.size)) == -1 { close(socketFD) return false } // Set up the address structure var addr = sockaddr_in() addr.sin_family = sa_family_t(AF_INET) addr.sin_port = UInt16(port).bigEndian addr.sin_addr.s_addr = INADDR_ANY.bigEndian // Bind to the port let bindResult = withUnsafePointer(to: &addr) { addrPtr in addrPtr.withMemoryRebound(to: sockaddr.self, capacity: 1) { addrPtr in Darwin.bind(socketFD, addrPtr, socklen_t(MemoryLayout<sockaddr_in>.size)) } } // Clean up close(socketFD) // If bind failed, the port is in use return bindResult == 0 } // MARK: - Server Lifecycle func start() async throws { // First check if the port is already in use if !(await isPortAvailable(port: Int(port.rawValue))) { // Don't log anything here, just throw the error throw PortError.alreadyInUse(port: port.rawValue) } let parameters = NWParameters.tcp listener = try NWListener(using: parameters, on: port) // Create an actor to safely manage state transitions actor StartupState { var error: Error? var isComplete = false func setError(_ error: Error) { self.error = error self.isComplete = true } func setComplete() { self.isComplete = true } func checkStatus() -> (isComplete: Bool, error: Error?) { return (isComplete, error) } } let startupState = StartupState() // Set up a state update handler to detect port binding errors listener?.stateUpdateHandler = { state in Task { switch state { case .setup: // Initial state, no action needed Logger.info("Listener setup", metadata: ["port": "\(self.port.rawValue)"]) break case .waiting(let error): // Log the full error details to see what we're getting Logger.error( "Listener waiting", metadata: [ "error": error.localizedDescription, "debugDescription": error.debugDescription, "localizedDescription": error.localizedDescription, "port": "\(self.port.rawValue)", ]) // Check for different port in use error messages if error.debugDescription.contains("Address already in use") || error.localizedDescription.contains("in use") || error.localizedDescription.contains("address already in use") { Logger.error( "Port conflict detected", metadata: ["port": "\(self.port.rawValue)"]) await startupState.setError( PortError.alreadyInUse(port: self.port.rawValue)) } else { // Wait for a short period to see if the listener recovers // Some network errors are transient try? 
await Task.sleep(nanoseconds: 1_000_000_000) // 1 second // If we're still waiting after delay, consider it an error if case .waiting = await self.listener?.state { await startupState.setError(error) } } case .failed(let error): // Log the full error details Logger.error( "Listener failed", metadata: [ "error": error.localizedDescription, "debugDescription": error.debugDescription, "port": "\(self.port.rawValue)", ]) await startupState.setError(error) case .ready: // Listener successfully bound to port Logger.info("Listener ready", metadata: ["port": "\(self.port.rawValue)"]) await startupState.setComplete() case .cancelled: // Listener was cancelled Logger.info("Listener cancelled", metadata: ["port": "\(self.port.rawValue)"]) break @unknown default: Logger.info( "Unknown listener state", metadata: ["state": "\(state)", "port": "\(self.port.rawValue)"]) break } } } listener?.newConnectionHandler = { [weak self] connection in Task { @MainActor [weak self] in guard let self else { return } self.handleConnection(connection) } } listener?.start(queue: .main) // Wait for either successful startup or an error var status: (isComplete: Bool, error: Error?) = (false, nil) repeat { try await Task.sleep(nanoseconds: 100_000_000) // 100ms status = await startupState.checkStatus() } while !status.isComplete // If there was a startup error, throw it if let error = status.error { self.stop() throw error } isRunning = true Logger.info("Server started", metadata: ["port": "\(port.rawValue)"]) // Keep the server running while isRunning { try await Task.sleep(nanoseconds: 1_000_000_000) } } func stop() { isRunning = false listener?.cancel() } // MARK: - Connection Handling private func handleConnection(_ connection: NWConnection) { connection.stateUpdateHandler = { [weak self] state in switch state { case .ready: Task { @MainActor [weak self] in guard let self else { return } self.receiveData(connection) } case .failed(let error): Logger.error("Connection failed", metadata: ["error": error.localizedDescription]) connection.cancel() case .cancelled: // Connection is already cancelled, no need to cancel again break default: break } } connection.start(queue: .main) } private func receiveData(_ connection: NWConnection) { connection.receive(minimumIncompleteLength: 1, maximumLength: 65536) { [weak self] content, _, isComplete, error in if let error = error { Logger.error("Receive error", metadata: ["error": error.localizedDescription]) connection.cancel() return } guard let data = content, !data.isEmpty else { if isComplete { connection.cancel() } return } Task { @MainActor [weak self] in guard let self else { return } do { let response = try await self.handleRequest(data) self.send(response, on: connection) } catch { let errorResponse = self.errorResponse(error) self.send(errorResponse, on: connection) } } } } private func send(_ response: HTTPResponse, on connection: NWConnection) { let data = response.serialize() Logger.info( "Serialized response", metadata: ["data": String(data: data, encoding: .utf8) ?? 
""]) connection.send( content: data, completion: .contentProcessed { [weak connection] error in if let error = error { Logger.error( "Failed to send response", metadata: ["error": error.localizedDescription]) } else { Logger.info("Response sent successfully") } if connection?.state != .cancelled { connection?.cancel() } }) } // MARK: - Request Handling private func handleRequest(_ data: Data) async throws -> HTTPResponse { Logger.info( "Received request data", metadata: ["data": String(data: data, encoding: .utf8) ?? ""]) guard let request = HTTPRequest(data: data) else { Logger.error("Failed to parse request") return HTTPResponse(statusCode: .badRequest, body: "Invalid request") } Logger.info( "Parsed request", metadata: [ "method": request.method, "path": request.path, "headers": "\(request.headers)", "body": String(data: request.body ?? Data(), encoding: .utf8) ?? "", ]) // Find matching route guard let route = routes.first(where: { $0.matches(request) }) else { return HTTPResponse(statusCode: .notFound, body: "Not found") } // Handle the request let response = try await route.handler(request) Logger.info( "Sending response", metadata: [ "statusCode": "\(response.statusCode.rawValue)", "headers": "\(response.headers)", "body": String(data: response.body ?? Data(), encoding: .utf8) ?? "", ]) return response } private func errorResponse(_ error: Error) -> HTTPResponse { HTTPResponse( statusCode: .internalServerError, headers: ["Content-Type": "application/json"], body: try! JSONEncoder().encode(APIError(message: error.localizedDescription)) ) } } ``` -------------------------------------------------------------------------------- /libs/typescript/computer/tests/interface/macos.test.ts: -------------------------------------------------------------------------------- ```typescript import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import { WebSocket, WebSocketServer } from 'ws'; import { MacOSComputerInterface } from '../../src/interface/macos.ts'; describe('MacOSComputerInterface', () => { // Define test parameters const testParams = { ipAddress: 'localhost', username: 'testuser', password: 'testpass', // apiKey: "test-api-key", No API Key for local testing vmName: 'test-vm', }; // WebSocket server mock let wss: WebSocketServer; let serverPort: number; let connectedClients: WebSocket[] = []; // Track received messages for verification interface ReceivedMessage { action: string; [key: string]: unknown; } let receivedMessages: ReceivedMessage[] = []; // Set up WebSocket server before all tests beforeEach(async () => { receivedMessages = []; connectedClients = []; // Create WebSocket server on a random available port wss = new WebSocketServer({ port: 0 }); serverPort = (wss.address() as { port: number }).port; // Update test params with the actual server address testParams.ipAddress = `localhost:${serverPort}`; // Handle WebSocket connections wss.on('connection', (ws) => { connectedClients.push(ws); // Handle incoming messages ws.on('message', (data) => { try { const message = JSON.parse(data.toString()); receivedMessages.push(message); // Send appropriate responses based on action switch (message.command) { case 'screenshot': ws.send( JSON.stringify({ image_data: Buffer.from('fake-screenshot-data').toString( 'base64' ), success: true, }) ); break; case 'get_screen_size': ws.send( JSON.stringify({ size: { width: 1920, height: 1080 }, success: true, }) ); break; case 'get_cursor_position': ws.send( JSON.stringify({ position: { x: 100, y: 200 }, success: true, }) ); break; 
case 'copy_to_clipboard': ws.send( JSON.stringify({ content: 'clipboard content', success: true, }) ); break; case 'file_exists': ws.send( JSON.stringify({ exists: true, success: true, }) ); break; case 'directory_exists': ws.send( JSON.stringify({ exists: true, success: true, }) ); break; case 'list_dir': ws.send( JSON.stringify({ files: ['file1.txt', 'file2.txt'], success: true, }) ); break; case 'read_text': ws.send( JSON.stringify({ content: 'file content', success: true, }) ); break; case 'read_bytes': ws.send( JSON.stringify({ content_b64: Buffer.from('binary content').toString('base64'), success: true, }) ); break; case 'run_command': ws.send( JSON.stringify({ stdout: 'command output', stderr: '', success: true, }) ); break; case 'get_accessibility_tree': ws.send( JSON.stringify({ role: 'window', title: 'Test Window', bounds: { x: 0, y: 0, width: 1920, height: 1080 }, children: [], success: true, }) ); break; case 'to_screen_coordinates': case 'to_screenshot_coordinates': ws.send( JSON.stringify({ coordinates: [message.params?.x || 0, message.params?.y || 0], success: true, }) ); break; default: // For all other actions, just send success ws.send(JSON.stringify({ success: true })); break; } } catch (error) { ws.send(JSON.stringify({ error: (error as Error).message })); } }); ws.on('error', (error) => { console.error('WebSocket error:', error); }); }); }); // Clean up WebSocket server after each test afterEach(async () => { // Close all connected clients for (const client of connectedClients) { if (client.readyState === WebSocket.OPEN) { client.close(); } } // Close the server await new Promise<void>((resolve) => { wss.close(() => resolve()); }); }); describe('Connection Management', () => { it('should connect with proper authentication headers', async () => { const macosInterface = new MacOSComputerInterface( testParams.ipAddress, testParams.username, testParams.password, undefined, testParams.vmName ); await macosInterface.connect(); // Verify the interface is connected expect(macosInterface.isConnected()).toBe(true); expect(connectedClients.length).toBe(1); await macosInterface.disconnect(); }); it('should handle connection without API key', async () => { // Create a separate server that doesn't check auth const noAuthWss = new WebSocketServer({ port: 0 }); const noAuthPort = (noAuthWss.address() as { port: number }).port; noAuthWss.on('connection', (ws) => { ws.on('message', () => { ws.send(JSON.stringify({ success: true })); }); }); const macosInterface = new MacOSComputerInterface( `localhost:${noAuthPort}`, testParams.username, testParams.password, undefined, undefined ); await macosInterface.connect(); expect(macosInterface.isConnected()).toBe(true); await macosInterface.disconnect(); await new Promise<void>((resolve) => { noAuthWss.close(() => resolve()); }); }); }); describe('Mouse Actions', () => { let macosInterface: MacOSComputerInterface; beforeEach(async () => { macosInterface = new MacOSComputerInterface( testParams.ipAddress, testParams.username, testParams.password, undefined, testParams.vmName ); await macosInterface.connect(); }); afterEach(async () => { if (macosInterface) { await macosInterface.disconnect(); } }); it('should send mouse_down command', async () => { await macosInterface.mouseDown(100, 200, 'left'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'mouse_down', params: { x: 100, y: 200, button: 'left', }, }); }); it('should send mouse_up command', async () => { await 
macosInterface.mouseUp(100, 200, 'right'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'mouse_up', params: { x: 100, y: 200, button: 'right', }, }); }); it('should send left_click command', async () => { await macosInterface.leftClick(150, 250); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'left_click', params: { x: 150, y: 250, }, }); }); it('should send right_click command', async () => { await macosInterface.rightClick(200, 300); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'right_click', params: { x: 200, y: 300, }, }); }); it('should send double_click command', async () => { await macosInterface.doubleClick(250, 350); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'double_click', params: { x: 250, y: 350, }, }); }); it('should send move_cursor command', async () => { await macosInterface.moveCursor(300, 400); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'move_cursor', params: { x: 300, y: 400, }, }); }); it('should send drag_to command', async () => { await macosInterface.dragTo(400, 500, 'left', 1.5); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'drag_to', params: { x: 400, y: 500, button: 'left', duration: 1.5, }, }); }); it('should send drag command with path', async () => { const path: Array<[number, number]> = [ [100, 100], [200, 200], [300, 300], ]; await macosInterface.drag(path, 'middle', 2.0); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'drag', params: { path: path, button: 'middle', duration: 2.0, }, }); }); }); describe('Keyboard Actions', () => { let macosInterface: MacOSComputerInterface; beforeEach(async () => { macosInterface = new MacOSComputerInterface( testParams.ipAddress, testParams.username, testParams.password, undefined, testParams.vmName ); await macosInterface.connect(); }); afterEach(async () => { if (macosInterface) { await macosInterface.disconnect(); } }); it('should send key_down command', async () => { await macosInterface.keyDown('a'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'key_down', params: { key: 'a', }, }); }); it('should send key_up command', async () => { await macosInterface.keyUp('b'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'key_up', params: { key: 'b', }, }); }); it('should send type_text command', async () => { await macosInterface.typeText('Hello, World!'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'type_text', params: { text: 'Hello, World!', }, }); }); it('should send press_key command', async () => { await macosInterface.pressKey('enter'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'press_key', params: { key: 'enter', }, }); }); it('should send hotkey command', async () => { await macosInterface.hotkey('cmd', 'c'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'hotkey', params: { keys: ['cmd', 'c'], }, }); }); }); describe('Scrolling Actions', () => { let macosInterface: MacOSComputerInterface; 
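    // Shared fixture pattern: each suite below builds a fresh
    // MacOSComputerInterface against the mock server in beforeEach and
    // disconnects it in afterEach, so every test only needs to inspect the
    // last entry pushed onto receivedMessages.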
beforeEach(async () => { macosInterface = new MacOSComputerInterface( testParams.ipAddress, testParams.username, testParams.password, undefined, testParams.vmName ); await macosInterface.connect(); }); afterEach(async () => { if (macosInterface) { await macosInterface.disconnect(); } }); it('should send scroll command', async () => { await macosInterface.scroll(10, -5); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'scroll', params: { x: 10, y: -5, }, }); }); it('should send scroll_down command', async () => { await macosInterface.scrollDown(3); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'scroll_down', params: { clicks: 3, }, }); }); it('should send scroll_up command', async () => { await macosInterface.scrollUp(2); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'scroll_up', params: { clicks: 2, }, }); }); }); describe('Screen Actions', () => { let macosInterface: MacOSComputerInterface; beforeEach(async () => { macosInterface = new MacOSComputerInterface( testParams.ipAddress, testParams.username, testParams.password, undefined, testParams.vmName ); await macosInterface.connect(); }); afterEach(async () => { if (macosInterface) { await macosInterface.disconnect(); } }); it('should get screenshot', async () => { const screenshot = await macosInterface.screenshot(); expect(screenshot).toBeInstanceOf(Buffer); expect(screenshot.toString()).toBe('fake-screenshot-data'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'screenshot', params: {}, }); }); it('should get screen size', async () => { const size = await macosInterface.getScreenSize(); expect(size).toEqual({ width: 1920, height: 1080 }); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'get_screen_size', params: {}, }); }); it('should get cursor position', async () => { const position = await macosInterface.getCursorPosition(); expect(position).toEqual({ x: 100, y: 200 }); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'get_cursor_position', params: {}, }); }); }); describe('Clipboard Actions', () => { let macosInterface: MacOSComputerInterface; beforeEach(async () => { macosInterface = new MacOSComputerInterface( testParams.ipAddress, testParams.username, testParams.password, undefined, testParams.vmName ); await macosInterface.connect(); }); afterEach(async () => { if (macosInterface) { await macosInterface.disconnect(); } }); it('should copy to clipboard', async () => { const text = await macosInterface.copyToClipboard(); expect(text).toBe('clipboard content'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'copy_to_clipboard', params: {}, }); }); it('should set clipboard', async () => { await macosInterface.setClipboard('new clipboard text'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'set_clipboard', params: { text: 'new clipboard text', }, }); }); }); describe('File System Actions', () => { let macosInterface: MacOSComputerInterface; beforeEach(async () => { macosInterface = new MacOSComputerInterface( testParams.ipAddress, testParams.username, testParams.password, undefined, testParams.vmName ); await macosInterface.connect(); }); afterEach(async () => { if 
(macosInterface) { await macosInterface.disconnect(); } }); it('should check file exists', async () => { const exists = await macosInterface.fileExists('/path/to/file'); expect(exists).toBe(true); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'file_exists', params: { path: '/path/to/file', }, }); }); it('should check directory exists', async () => { const exists = await macosInterface.directoryExists('/path/to/dir'); expect(exists).toBe(true); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'directory_exists', params: { path: '/path/to/dir', }, }); }); it('should list directory', async () => { const files = await macosInterface.listDir('/path/to/dir'); expect(files).toEqual(['file1.txt', 'file2.txt']); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'list_dir', params: { path: '/path/to/dir', }, }); }); it('should read text file', async () => { const content = await macosInterface.readText('/path/to/file.txt'); expect(content).toBe('file content'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'read_text', params: { path: '/path/to/file.txt', }, }); }); it('should write text file', async () => { await macosInterface.writeText('/path/to/file.txt', 'new content'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'write_text', params: { path: '/path/to/file.txt', content: 'new content', }, }); }); it('should read binary file', async () => { const content = await macosInterface.readBytes('/path/to/file.bin'); expect(content).toBeInstanceOf(Buffer); expect(content.toString()).toBe('binary content'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'read_bytes', params: { path: '/path/to/file.bin', }, }); }); it('should write binary file', async () => { const buffer = Buffer.from('binary data'); await macosInterface.writeBytes('/path/to/file.bin', buffer); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'write_bytes', params: { path: '/path/to/file.bin', content_b64: buffer.toString('base64'), }, }); }); it('should delete file', async () => { await macosInterface.deleteFile('/path/to/file'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'delete_file', params: { path: '/path/to/file', }, }); }); it('should create directory', async () => { await macosInterface.createDir('/path/to/new/dir'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'create_dir', params: { path: '/path/to/new/dir', }, }); }); it('should delete directory', async () => { await macosInterface.deleteDir('/path/to/dir'); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'delete_dir', params: { path: '/path/to/dir', }, }); }); it('should run command', async () => { const [stdout, stderr] = await macosInterface.runCommand('ls -la'); expect(stdout).toBe('command output'); expect(stderr).toBe(''); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'run_command', params: { command: 'ls -la', }, }); }); }); describe('Accessibility Actions', () => { let macosInterface: MacOSComputerInterface; beforeEach(async () 
=> { macosInterface = new MacOSComputerInterface( testParams.ipAddress, testParams.username, testParams.password, undefined, testParams.vmName ); await macosInterface.connect(); }); afterEach(async () => { if (macosInterface) { await macosInterface.disconnect(); } }); it('should get accessibility tree', async () => { const tree = await macosInterface.getAccessibilityTree(); expect(tree).toEqual({ role: 'window', title: 'Test Window', bounds: { x: 0, y: 0, width: 1920, height: 1080 }, children: [], success: true, }); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'get_accessibility_tree', params: {}, }); }); it('should convert to screen coordinates', async () => { const [x, y] = await macosInterface.toScreenCoordinates(100, 200); expect(x).toBe(100); expect(y).toBe(200); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'to_screen_coordinates', params: { x: 100, y: 200, }, }); }); it('should convert to screenshot coordinates', async () => { const [x, y] = await macosInterface.toScreenshotCoordinates(300, 400); expect(x).toBe(300); expect(y).toBe(400); const lastMessage = receivedMessages[receivedMessages.length - 1]; expect(lastMessage).toEqual({ command: 'to_screenshot_coordinates', params: { x: 300, y: 400, }, }); }); }); describe('Error Handling', () => { it('should handle WebSocket connection errors', async () => { // Use a valid but unreachable IP to avoid DNS errors const macosInterface = new MacOSComputerInterface( 'localhost:9999', testParams.username, testParams.password, undefined, testParams.vmName ); // Connection should fail await expect(macosInterface.connect()).rejects.toThrow(); }); it('should handle command errors', async () => { // Create a server that returns errors const errorWss = new WebSocketServer({ port: 0 }); const errorPort = (errorWss.address() as { port: number }).port; errorWss.on('connection', (ws) => { ws.on('message', () => { ws.send(JSON.stringify({ error: 'Command failed', success: false })); }); }); const macosInterface = new MacOSComputerInterface( `localhost:${errorPort}`, testParams.username, testParams.password, undefined, testParams.vmName ); await macosInterface.connect(); // Command should throw error await expect(macosInterface.leftClick(100, 100)).rejects.toThrow( 'Command failed' ); await macosInterface.disconnect(); await new Promise<void>((resolve) => { errorWss.close(() => resolve()); }); }); it('should handle disconnection gracefully', async () => { const macosInterface = new MacOSComputerInterface( testParams.ipAddress, testParams.username, testParams.password, undefined, testParams.vmName ); await macosInterface.connect(); expect(macosInterface.isConnected()).toBe(true); // Disconnect macosInterface.disconnect(); expect(macosInterface.isConnected()).toBe(false); // Should reconnect automatically on next command await macosInterface.leftClick(100, 100); expect(macosInterface.isConnected()).toBe(true); await macosInterface.disconnect(); }); it('should handle force close', async () => { const macosInterface = new MacOSComputerInterface( testParams.ipAddress, testParams.username, testParams.password, undefined, testParams.vmName ); await macosInterface.connect(); expect(macosInterface.isConnected()).toBe(true); // Force close macosInterface.forceClose(); expect(macosInterface.isConnected()).toBe(false); }); }); }); ``` -------------------------------------------------------------------------------- 
/libs/python/computer-server/computer_server/handlers/windows.py: -------------------------------------------------------------------------------- ```python """ Windows implementation of automation and accessibility handlers. This implementation uses pyautogui for GUI automation and Windows-specific APIs for accessibility and system operations. """ from typing import Dict, Any, List, Tuple, Optional import logging import subprocess import asyncio import base64 import os from io import BytesIO from pynput.mouse import Controller as MouseController from pynput.keyboard import Controller as KeyboardController # Configure logger logger = logging.getLogger(__name__) # Try to import pyautogui try: import pyautogui pyautogui.FAILSAFE = False logger.info("pyautogui successfully imported, GUI automation available") except Exception as e: logger.error(f"pyautogui import failed: {str(e)}. GUI operations will not work.") pyautogui = None # Try to import Windows-specific modules try: import win32gui import win32con import win32api logger.info("Windows API modules successfully imported") WINDOWS_API_AVAILABLE = True except Exception as e: logger.error(f"Windows API modules import failed: {str(e)}. Some Windows-specific features will be unavailable.") WINDOWS_API_AVAILABLE = False from .base import BaseAccessibilityHandler, BaseAutomationHandler class WindowsAccessibilityHandler(BaseAccessibilityHandler): """Windows implementation of accessibility handler.""" async def get_accessibility_tree(self) -> Dict[str, Any]: """Get the accessibility tree of the current window. Returns: Dict[str, Any]: A dictionary containing the success status and either the accessibility tree or an error message. Structure: {"success": bool, "tree": dict} or {"success": bool, "error": str} """ if not WINDOWS_API_AVAILABLE: return {"success": False, "error": "Windows API not available"} try: # Get the foreground window hwnd = win32gui.GetForegroundWindow() if not hwnd: return {"success": False, "error": "No foreground window found"} # Get window information window_text = win32gui.GetWindowText(hwnd) rect = win32gui.GetWindowRect(hwnd) tree = { "role": "Window", "title": window_text, "position": {"x": rect[0], "y": rect[1]}, "size": {"width": rect[2] - rect[0], "height": rect[3] - rect[1]}, "children": [] } # Enumerate child windows def enum_child_proc(hwnd_child, children_list): """Callback function to enumerate child windows and collect their information. Args: hwnd_child: Handle to the child window being enumerated. children_list: List to append child window information to. Returns: bool: True to continue enumeration, False to stop. """ try: child_text = win32gui.GetWindowText(hwnd_child) child_rect = win32gui.GetWindowRect(hwnd_child) child_class = win32gui.GetClassName(hwnd_child) child_info = { "role": child_class, "title": child_text, "position": {"x": child_rect[0], "y": child_rect[1]}, "size": {"width": child_rect[2] - child_rect[0], "height": child_rect[3] - child_rect[1]}, "children": [] } children_list.append(child_info) except Exception as e: logger.debug(f"Error getting child window info: {e}") return True win32gui.EnumChildWindows(hwnd, enum_child_proc, tree["children"]) return {"success": True, "tree": tree} except Exception as e: logger.error(f"Error getting accessibility tree: {e}") return {"success": False, "error": str(e)} async def find_element(self, role: Optional[str] = None, title: Optional[str] = None, value: Optional[str] = None) -> Dict[str, Any]: """Find an element in the accessibility tree by criteria. 
Args: role (Optional[str]): The role or class name of the element to find. title (Optional[str]): The title or text of the element to find. value (Optional[str]): The value of the element (not used in Windows implementation). Returns: Dict[str, Any]: A dictionary containing the success status and either the found element or an error message. Structure: {"success": bool, "element": dict} or {"success": bool, "error": str} """ if not WINDOWS_API_AVAILABLE: return {"success": False, "error": "Windows API not available"} try: # Find window by title if specified if title: hwnd = win32gui.FindWindow(None, title) if hwnd: rect = win32gui.GetWindowRect(hwnd) return { "success": True, "element": { "role": "Window", "title": title, "position": {"x": rect[0], "y": rect[1]}, "size": {"width": rect[2] - rect[0], "height": rect[3] - rect[1]} } } # Find window by class name if role is specified if role: hwnd = win32gui.FindWindow(role, None) if hwnd: window_text = win32gui.GetWindowText(hwnd) rect = win32gui.GetWindowRect(hwnd) return { "success": True, "element": { "role": role, "title": window_text, "position": {"x": rect[0], "y": rect[1]}, "size": {"width": rect[2] - rect[0], "height": rect[3] - rect[1]} } } return {"success": False, "error": "Element not found"} except Exception as e: logger.error(f"Error finding element: {e}") return {"success": False, "error": str(e)} class WindowsAutomationHandler(BaseAutomationHandler): """Windows implementation of automation handler using pyautogui and Windows APIs.""" mouse = MouseController() keyboard = KeyboardController() # Mouse Actions async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]: """Press and hold a mouse button at the specified coordinates. Args: x (Optional[int]): The x-coordinate to move to before pressing. If None, uses current position. y (Optional[int]): The y-coordinate to move to before pressing. If None, uses current position. button (str): The mouse button to press ("left", "right", or "middle"). Returns: Dict[str, Any]: A dictionary with success status and optional error message. """ if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: if x is not None and y is not None: pyautogui.moveTo(x, y) pyautogui.mouseDown(button=button) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]: """Release a mouse button at the specified coordinates. Args: x (Optional[int]): The x-coordinate to move to before releasing. If None, uses current position. y (Optional[int]): The y-coordinate to move to before releasing. If None, uses current position. button (str): The mouse button to release ("left", "right", or "middle"). Returns: Dict[str, Any]: A dictionary with success status and optional error message. """ if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: if x is not None and y is not None: pyautogui.moveTo(x, y) pyautogui.mouseUp(button=button) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def move_cursor(self, x: int, y: int) -> Dict[str, Any]: """Move the mouse cursor to the specified coordinates. Args: x (int): The x-coordinate to move to. y (int): The y-coordinate to move to. Returns: Dict[str, Any]: A dictionary with success status and optional error message. 
""" if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: pyautogui.moveTo(x, y) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]: """Perform a left mouse click at the specified coordinates. Args: x (Optional[int]): The x-coordinate to click at. If None, clicks at current position. y (Optional[int]): The y-coordinate to click at. If None, clicks at current position. Returns: Dict[str, Any]: A dictionary with success status and optional error message. """ if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: if x is not None and y is not None: pyautogui.moveTo(x, y) pyautogui.click() return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]: """Perform a right mouse click at the specified coordinates. Args: x (Optional[int]): The x-coordinate to click at. If None, clicks at current position. y (Optional[int]): The y-coordinate to click at. If None, clicks at current position. Returns: Dict[str, Any]: A dictionary with success status and optional error message. """ if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: if x is not None and y is not None: pyautogui.moveTo(x, y) pyautogui.rightClick() return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]: """Perform a double left mouse click at the specified coordinates. Args: x (Optional[int]): The x-coordinate to double-click at. If None, clicks at current position. y (Optional[int]): The y-coordinate to double-click at. If None, clicks at current position. Returns: Dict[str, Any]: A dictionary with success status and optional error message. """ if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: if x is not None and y is not None: pyautogui.moveTo(x, y) pyautogui.doubleClick(interval=0.1) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> Dict[str, Any]: """Drag from the current position to the specified coordinates. Args: x (int): The x-coordinate to drag to. y (int): The y-coordinate to drag to. button (str): The mouse button to use for dragging ("left", "right", or "middle"). duration (float): The time in seconds to take for the drag operation. Returns: Dict[str, Any]: A dictionary with success status and optional error message. """ if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: pyautogui.dragTo(x, y, duration=duration, button=button) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def drag(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> Dict[str, Any]: """Drag the mouse through a series of coordinates. Args: path (List[Tuple[int, int]]): A list of (x, y) coordinate tuples to drag through. button (str): The mouse button to use for dragging ("left", "right", or "middle"). duration (float): The total time in seconds for the entire drag operation. Returns: Dict[str, Any]: A dictionary with success status and optional error message. 
""" if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: if not path: return {"success": False, "error": "Path is empty"} # Move to first position pyautogui.moveTo(*path[0]) # Drag through all positions for x, y in path[1:]: pyautogui.dragTo(x, y, duration=duration/len(path), button=button) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} # Keyboard Actions async def key_down(self, key: str) -> Dict[str, Any]: """Press and hold a keyboard key. Args: key (str): The key to press down (e.g., 'ctrl', 'shift', 'a'). Returns: Dict[str, Any]: A dictionary with success status and optional error message. """ if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: pyautogui.keyDown(key) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def key_up(self, key: str) -> Dict[str, Any]: """Release a keyboard key. Args: key (str): The key to release (e.g., 'ctrl', 'shift', 'a'). Returns: Dict[str, Any]: A dictionary with success status and optional error message. """ if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: pyautogui.keyUp(key) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def type_text(self, text: str) -> Dict[str, Any]: """Type the specified text. Args: text (str): The text to type. Returns: Dict[str, Any]: A dictionary with success status and optional error message. """ try: # use pynput for Unicode support self.keyboard.type(text) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def press_key(self, key: str) -> Dict[str, Any]: """Press and release a keyboard key. Args: key (str): The key to press (e.g., 'enter', 'space', 'tab'). Returns: Dict[str, Any]: A dictionary with success status and optional error message. """ if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: pyautogui.press(key) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def hotkey(self, keys: List[str]) -> Dict[str, Any]: """Press a combination of keys simultaneously. Args: keys (List[str]): The keys to press together (e.g., ['ctrl', 'c'], ['alt', 'tab']). Returns: Dict[str, Any]: A dictionary with success status and optional error message. """ if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: pyautogui.hotkey(*keys) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} # Scrolling Actions async def scroll(self, x: int, y: int) -> Dict[str, Any]: """Scroll vertically at the current cursor position. Args: x (int): Horizontal scroll amount (not used in pyautogui implementation). y (int): Vertical scroll amount. Positive values scroll up, negative values scroll down. Returns: Dict[str, Any]: A dictionary with success status and optional error message. """ if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: self.mouse.scroll(x, y) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]: """Scroll down by the specified number of clicks. Args: clicks (int): The number of scroll clicks to perform downward. Returns: Dict[str, Any]: A dictionary with success status and optional error message. 
""" if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: pyautogui.scroll(-clicks) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]: """Scroll up by the specified number of clicks. Args: clicks (int): The number of scroll clicks to perform upward. Returns: Dict[str, Any]: A dictionary with success status and optional error message. """ if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: pyautogui.scroll(clicks) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} # Screen Actions async def screenshot(self) -> Dict[str, Any]: """Capture a screenshot of the entire screen. Returns: Dict[str, Any]: A dictionary containing the success status and either base64-encoded image data or an error message. Structure: {"success": bool, "image_data": str} or {"success": bool, "error": str} """ if not pyautogui: return {"success": False, "error": "pyautogui not available"} try: from PIL import Image screenshot = pyautogui.screenshot() if not isinstance(screenshot, Image.Image): return {"success": False, "error": "Failed to capture screenshot"} buffered = BytesIO() screenshot.save(buffered, format="PNG", optimize=True) buffered.seek(0) image_data = base64.b64encode(buffered.getvalue()).decode() return {"success": True, "image_data": image_data} except Exception as e: return {"success": False, "error": f"Screenshot error: {str(e)}"} async def get_screen_size(self) -> Dict[str, Any]: """Get the size of the screen in pixels. Returns: Dict[str, Any]: A dictionary containing the success status and either screen size information or an error message. Structure: {"success": bool, "size": {"width": int, "height": int}} or {"success": bool, "error": str} """ try: if pyautogui: size = pyautogui.size() return {"success": True, "size": {"width": size.width, "height": size.height}} elif WINDOWS_API_AVAILABLE: # Fallback to Windows API width = win32api.GetSystemMetrics(win32con.SM_CXSCREEN) height = win32api.GetSystemMetrics(win32con.SM_CYSCREEN) return {"success": True, "size": {"width": width, "height": height}} else: return {"success": False, "error": "No screen size detection method available"} except Exception as e: return {"success": False, "error": str(e)} async def get_cursor_position(self) -> Dict[str, Any]: """Get the current position of the mouse cursor. Returns: Dict[str, Any]: A dictionary containing the success status and either cursor position or an error message. Structure: {"success": bool, "position": {"x": int, "y": int}} or {"success": bool, "error": str} """ try: if pyautogui: pos = pyautogui.position() return {"success": True, "position": {"x": pos.x, "y": pos.y}} elif WINDOWS_API_AVAILABLE: # Fallback to Windows API pos = win32gui.GetCursorPos() return {"success": True, "position": {"x": pos[0], "y": pos[1]}} else: return {"success": False, "error": "No cursor position detection method available"} except Exception as e: return {"success": False, "error": str(e)} # Clipboard Actions async def copy_to_clipboard(self) -> Dict[str, Any]: """Get the current content of the clipboard. Returns: Dict[str, Any]: A dictionary containing the success status and either clipboard content or an error message. 
Structure: {"success": bool, "content": str} or {"success": bool, "error": str} """ try: import pyperclip content = pyperclip.paste() return {"success": True, "content": content} except Exception as e: return {"success": False, "error": str(e)} async def set_clipboard(self, text: str) -> Dict[str, Any]: """Set the clipboard content to the specified text. Args: text (str): The text to copy to the clipboard. Returns: Dict[str, Any]: A dictionary with success status and optional error message. """ try: import pyperclip pyperclip.copy(text) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} # Command Execution async def run_command(self, command: str) -> Dict[str, Any]: """Execute a shell command asynchronously. Args: command (str): The shell command to execute. Returns: Dict[str, Any]: A dictionary containing the success status and either command output or an error message. Structure: {"success": bool, "stdout": str, "stderr": str, "return_code": int} or {"success": bool, "error": str} """ try: # Create subprocess process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # Wait for the subprocess to finish stdout, stderr = await process.communicate() # Return decoded output return { "success": True, "stdout": stdout.decode() if stdout else "", "stderr": stderr.decode() if stderr else "", "return_code": process.returncode } except Exception as e: return {"success": False, "error": str(e)} ```