This is page 10 of 16. Use http://codebase.md/trycua/cua?lines=false&page={x} to view the full context. # Directory Structure ``` ├── .all-contributorsrc ├── .cursorignore ├── .devcontainer │ ├── devcontainer.json │ ├── post-install.sh │ └── README.md ├── .dockerignore ├── .gitattributes ├── .github │ ├── FUNDING.yml │ ├── scripts │ │ ├── get_pyproject_version.py │ │ └── tests │ │ ├── __init__.py │ │ ├── README.md │ │ └── test_get_pyproject_version.py │ └── workflows │ ├── ci-lume.yml │ ├── docker-publish-kasm.yml │ ├── docker-publish-xfce.yml │ ├── docker-reusable-publish.yml │ ├── npm-publish-computer.yml │ ├── npm-publish-core.yml │ ├── publish-lume.yml │ ├── pypi-publish-agent.yml │ ├── pypi-publish-computer-server.yml │ ├── pypi-publish-computer.yml │ ├── pypi-publish-core.yml │ ├── pypi-publish-mcp-server.yml │ ├── pypi-publish-pylume.yml │ ├── pypi-publish-som.yml │ ├── pypi-reusable-publish.yml │ └── test-validation-script.yml ├── .gitignore ├── .vscode │ ├── docs.code-workspace │ ├── launch.json │ ├── libs-ts.code-workspace │ ├── lume.code-workspace │ ├── lumier.code-workspace │ ├── py.code-workspace │ └── settings.json ├── blog │ ├── app-use.md │ ├── assets │ │ ├── composite-agents.png │ │ ├── docker-ubuntu-support.png │ │ ├── hack-booth.png │ │ ├── hack-closing-ceremony.jpg │ │ ├── hack-cua-ollama-hud.jpeg │ │ ├── hack-leaderboard.png │ │ ├── hack-the-north.png │ │ ├── hack-winners.jpeg │ │ ├── hack-workshop.jpeg │ │ ├── hud-agent-evals.png │ │ └── trajectory-viewer.jpeg │ ├── bringing-computer-use-to-the-web.md │ ├── build-your-own-operator-on-macos-1.md │ ├── build-your-own-operator-on-macos-2.md │ ├── composite-agents.md │ ├── cua-hackathon.md │ ├── hack-the-north.md │ ├── hud-agent-evals.md │ ├── human-in-the-loop.md │ ├── introducing-cua-cloud-containers.md │ ├── lume-to-containerization.md │ ├── sandboxed-python-execution.md │ ├── training-computer-use-models-trajectories-1.md │ ├── trajectory-viewer.md │ ├── ubuntu-docker-support.md │ └── windows-sandbox.md ├── CONTRIBUTING.md ├── Development.md ├── Dockerfile ├── docs │ ├── .gitignore │ ├── .prettierrc │ ├── content │ │ └── docs │ │ ├── agent-sdk │ │ │ ├── agent-loops.mdx │ │ │ ├── benchmarks │ │ │ │ ├── index.mdx │ │ │ │ ├── interactive.mdx │ │ │ │ ├── introduction.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── osworld-verified.mdx │ │ │ │ ├── screenspot-pro.mdx │ │ │ │ └── screenspot-v2.mdx │ │ │ ├── callbacks │ │ │ │ ├── agent-lifecycle.mdx │ │ │ │ ├── cost-saving.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── logging.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── pii-anonymization.mdx │ │ │ │ └── trajectories.mdx │ │ │ ├── chat-history.mdx │ │ │ ├── custom-computer-handlers.mdx │ │ │ ├── custom-tools.mdx │ │ │ ├── customizing-computeragent.mdx │ │ │ ├── integrations │ │ │ │ ├── hud.mdx │ │ │ │ └── meta.json │ │ │ ├── message-format.mdx │ │ │ ├── meta.json │ │ │ ├── migration-guide.mdx │ │ │ ├── prompt-caching.mdx │ │ │ ├── supported-agents │ │ │ │ ├── composed-agents.mdx │ │ │ │ ├── computer-use-agents.mdx │ │ │ │ ├── grounding-models.mdx │ │ │ │ ├── human-in-the-loop.mdx │ │ │ │ └── meta.json │ │ │ ├── supported-model-providers │ │ │ │ ├── index.mdx │ │ │ │ └── local-models.mdx │ │ │ └── usage-tracking.mdx │ │ ├── computer-sdk │ │ │ ├── cloud-vm-management.mdx │ │ │ ├── commands.mdx │ │ │ ├── computer-ui.mdx │ │ │ ├── computers.mdx │ │ │ ├── meta.json │ │ │ └── sandboxed-python.mdx │ │ ├── index.mdx │ │ ├── libraries │ │ │ ├── agent │ │ │ │ └── index.mdx │ │ │ ├── computer │ │ │ │ └── index.mdx │ │ │ ├── computer-server │ │ │ │ ├── 
Commands.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── REST-API.mdx │ │ │ │ └── WebSocket-API.mdx │ │ │ ├── core │ │ │ │ └── index.mdx │ │ │ ├── lume │ │ │ │ ├── cli-reference.mdx │ │ │ │ ├── faq.md │ │ │ │ ├── http-api.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ ├── meta.json │ │ │ │ └── prebuilt-images.mdx │ │ │ ├── lumier │ │ │ │ ├── building-lumier.mdx │ │ │ │ ├── docker-compose.mdx │ │ │ │ ├── docker.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ └── meta.json │ │ │ ├── mcp-server │ │ │ │ ├── client-integrations.mdx │ │ │ │ ├── configuration.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ ├── llm-integrations.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── tools.mdx │ │ │ │ └── usage.mdx │ │ │ └── som │ │ │ ├── configuration.mdx │ │ │ └── index.mdx │ │ ├── meta.json │ │ ├── quickstart-cli.mdx │ │ ├── quickstart-devs.mdx │ │ └── telemetry.mdx │ ├── next.config.mjs │ ├── package-lock.json │ ├── package.json │ ├── pnpm-lock.yaml │ ├── postcss.config.mjs │ ├── public │ │ └── img │ │ ├── agent_gradio_ui.png │ │ ├── agent.png │ │ ├── cli.png │ │ ├── computer.png │ │ ├── som_box_threshold.png │ │ └── som_iou_threshold.png │ ├── README.md │ ├── source.config.ts │ ├── src │ │ ├── app │ │ │ ├── (home) │ │ │ │ ├── [[...slug]] │ │ │ │ │ └── page.tsx │ │ │ │ └── layout.tsx │ │ │ ├── api │ │ │ │ └── search │ │ │ │ └── route.ts │ │ │ ├── favicon.ico │ │ │ ├── global.css │ │ │ ├── layout.config.tsx │ │ │ ├── layout.tsx │ │ │ ├── llms.mdx │ │ │ │ └── [[...slug]] │ │ │ │ └── route.ts │ │ │ └── llms.txt │ │ │ └── route.ts │ │ ├── assets │ │ │ ├── discord-black.svg │ │ │ ├── discord-white.svg │ │ │ ├── logo-black.svg │ │ │ └── logo-white.svg │ │ ├── components │ │ │ ├── iou.tsx │ │ │ └── mermaid.tsx │ │ ├── lib │ │ │ ├── llms.ts │ │ │ └── source.ts │ │ └── mdx-components.tsx │ └── tsconfig.json ├── examples │ ├── agent_examples.py │ ├── agent_ui_examples.py │ ├── cloud_api_examples.py │ ├── computer_examples_windows.py │ ├── computer_examples.py │ ├── computer_ui_examples.py │ ├── computer-example-ts │ │ ├── .env.example │ │ ├── .gitignore │ │ ├── .prettierrc │ │ ├── package-lock.json │ │ ├── package.json │ │ ├── pnpm-lock.yaml │ │ ├── README.md │ │ ├── src │ │ │ ├── helpers.ts │ │ │ └── index.ts │ │ └── tsconfig.json │ ├── docker_examples.py │ ├── evals │ │ ├── hud_eval_examples.py │ │ └── wikipedia_most_linked.txt │ ├── pylume_examples.py │ ├── sandboxed_functions_examples.py │ ├── som_examples.py │ ├── utils.py │ └── winsandbox_example.py ├── img │ ├── agent_gradio_ui.png │ ├── agent.png │ ├── cli.png │ ├── computer.png │ ├── logo_black.png │ └── logo_white.png ├── libs │ ├── kasm │ │ ├── Dockerfile │ │ ├── LICENSE │ │ ├── README.md │ │ └── src │ │ └── ubuntu │ │ └── install │ │ └── firefox │ │ ├── custom_startup.sh │ │ ├── firefox.desktop │ │ └── install_firefox.sh │ ├── lume │ │ ├── .cursorignore │ │ ├── CONTRIBUTING.md │ │ ├── Development.md │ │ ├── img │ │ │ └── cli.png │ │ ├── Package.resolved │ │ ├── Package.swift │ │ ├── README.md │ │ ├── resources │ │ │ └── lume.entitlements │ │ ├── scripts │ │ │ ├── build │ │ │ │ ├── build-debug.sh │ │ │ │ ├── build-release-notarized.sh │ │ │ │ └── build-release.sh │ │ │ └── install.sh │ │ ├── src │ │ │ ├── Commands │ │ │ │ ├── Clone.swift │ │ │ │ ├── Config.swift │ │ │ │ ├── Create.swift │ │ │ │ ├── Delete.swift │ │ │ │ ├── Get.swift │ │ │ │ ├── Images.swift │ │ │ │ ├── IPSW.swift │ │ │ │ ├── List.swift │ │ │ │ ├── Logs.swift │ │ │ │ ├── Options │ │ │ │ │ └── FormatOption.swift │ │ │ │ ├── Prune.swift │ │ │ │ ├── Pull.swift │ 
│ │ │ ├── Push.swift │ │ │ │ ├── Run.swift │ │ │ │ ├── Serve.swift │ │ │ │ ├── Set.swift │ │ │ │ └── Stop.swift │ │ │ ├── ContainerRegistry │ │ │ │ ├── ImageContainerRegistry.swift │ │ │ │ ├── ImageList.swift │ │ │ │ └── ImagesPrinter.swift │ │ │ ├── Errors │ │ │ │ └── Errors.swift │ │ │ ├── FileSystem │ │ │ │ ├── Home.swift │ │ │ │ ├── Settings.swift │ │ │ │ ├── VMConfig.swift │ │ │ │ ├── VMDirectory.swift │ │ │ │ └── VMLocation.swift │ │ │ ├── LumeController.swift │ │ │ ├── Main.swift │ │ │ ├── Server │ │ │ │ ├── Handlers.swift │ │ │ │ ├── HTTP.swift │ │ │ │ ├── Requests.swift │ │ │ │ ├── Responses.swift │ │ │ │ └── Server.swift │ │ │ ├── Utils │ │ │ │ ├── CommandRegistry.swift │ │ │ │ ├── CommandUtils.swift │ │ │ │ ├── Logger.swift │ │ │ │ ├── NetworkUtils.swift │ │ │ │ ├── Path.swift │ │ │ │ ├── ProcessRunner.swift │ │ │ │ ├── ProgressLogger.swift │ │ │ │ ├── String.swift │ │ │ │ └── Utils.swift │ │ │ ├── Virtualization │ │ │ │ ├── DarwinImageLoader.swift │ │ │ │ ├── DHCPLeaseParser.swift │ │ │ │ ├── ImageLoaderFactory.swift │ │ │ │ └── VMVirtualizationService.swift │ │ │ ├── VM │ │ │ │ ├── DarwinVM.swift │ │ │ │ ├── LinuxVM.swift │ │ │ │ ├── VM.swift │ │ │ │ ├── VMDetails.swift │ │ │ │ ├── VMDetailsPrinter.swift │ │ │ │ ├── VMDisplayResolution.swift │ │ │ │ └── VMFactory.swift │ │ │ └── VNC │ │ │ ├── PassphraseGenerator.swift │ │ │ └── VNCService.swift │ │ └── tests │ │ ├── Mocks │ │ │ ├── MockVM.swift │ │ │ ├── MockVMVirtualizationService.swift │ │ │ └── MockVNCService.swift │ │ ├── VM │ │ │ └── VMDetailsPrinterTests.swift │ │ ├── VMTests.swift │ │ ├── VMVirtualizationServiceTests.swift │ │ └── VNCServiceTests.swift │ ├── lumier │ │ ├── .dockerignore │ │ ├── Dockerfile │ │ ├── README.md │ │ └── src │ │ ├── bin │ │ │ └── entry.sh │ │ ├── config │ │ │ └── constants.sh │ │ ├── hooks │ │ │ └── on-logon.sh │ │ └── lib │ │ ├── utils.sh │ │ └── vm.sh │ ├── python │ │ ├── agent │ │ │ ├── .bumpversion.cfg │ │ │ ├── agent │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── adapters │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── huggingfacelocal_adapter.py │ │ │ │ │ ├── human_adapter.py │ │ │ │ │ ├── mlxvlm_adapter.py │ │ │ │ │ └── models │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── internvl.py │ │ │ │ │ ├── opencua.py │ │ │ │ │ └── qwen2_5_vl.py │ │ │ │ ├── agent.py │ │ │ │ ├── callbacks │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── budget_manager.py │ │ │ │ │ ├── image_retention.py │ │ │ │ │ ├── logging.py │ │ │ │ │ ├── operator_validator.py │ │ │ │ │ ├── pii_anonymization.py │ │ │ │ │ ├── prompt_instructions.py │ │ │ │ │ ├── telemetry.py │ │ │ │ │ └── trajectory_saver.py │ │ │ │ ├── cli.py │ │ │ │ ├── computers │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── cua.py │ │ │ │ │ └── custom.py │ │ │ │ ├── decorators.py │ │ │ │ ├── human_tool │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── __main__.py │ │ │ │ │ ├── server.py │ │ │ │ │ └── ui.py │ │ │ │ ├── integrations │ │ │ │ │ └── hud │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── agent.py │ │ │ │ │ └── proxy.py │ │ │ │ ├── loops │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── anthropic.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── composed_grounded.py │ │ │ │ │ ├── gemini.py │ │ │ │ │ ├── glm45v.py │ │ │ │ │ ├── gta1.py │ │ │ │ │ ├── holo.py │ │ │ │ │ ├── internvl.py │ │ │ │ │ ├── model_types.csv │ │ │ │ │ ├── moondream3.py │ │ │ │ │ ├── omniparser.py │ │ │ │ │ ├── openai.py │ │ │ │ │ ├── opencua.py │ │ │ │ │ └── uitars.py │ │ │ │ ├── proxy │ │ │ │ │ ├── examples.py │ │ │ │ │ └── handlers.py │ │ │ │ ├── responses.py │ │ │ │ 
├── types.py │ │ │ │ └── ui │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ └── gradio │ │ │ │ ├── __init__.py │ │ │ │ ├── app.py │ │ │ │ └── ui_components.py │ │ │ ├── benchmarks │ │ │ │ ├── .gitignore │ │ │ │ ├── contrib.md │ │ │ │ ├── interactive.py │ │ │ │ ├── models │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ └── gta1.py │ │ │ │ ├── README.md │ │ │ │ ├── ss-pro.py │ │ │ │ ├── ss-v2.py │ │ │ │ └── utils.py │ │ │ ├── example.py │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── computer │ │ │ ├── .bumpversion.cfg │ │ │ ├── computer │ │ │ │ ├── __init__.py │ │ │ │ ├── computer.py │ │ │ │ ├── diorama_computer.py │ │ │ │ ├── helpers.py │ │ │ │ ├── interface │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── linux.py │ │ │ │ │ ├── macos.py │ │ │ │ │ ├── models.py │ │ │ │ │ └── windows.py │ │ │ │ ├── logger.py │ │ │ │ ├── models.py │ │ │ │ ├── providers │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── cloud │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── docker │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── lume │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── lume_api.py │ │ │ │ │ ├── lumier │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── types.py │ │ │ │ │ └── winsandbox │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── provider.py │ │ │ │ │ └── setup_script.ps1 │ │ │ │ ├── ui │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── __main__.py │ │ │ │ │ └── gradio │ │ │ │ │ ├── __init__.py │ │ │ │ │ └── app.py │ │ │ │ └── utils.py │ │ │ ├── poetry.toml │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── computer-server │ │ │ ├── .bumpversion.cfg │ │ │ ├── computer_server │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── cli.py │ │ │ │ ├── diorama │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── diorama_computer.py │ │ │ │ │ ├── diorama.py │ │ │ │ │ ├── draw.py │ │ │ │ │ ├── macos.py │ │ │ │ │ └── safezone.py │ │ │ │ ├── handlers │ │ │ │ │ ├── base.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── linux.py │ │ │ │ │ ├── macos.py │ │ │ │ │ └── windows.py │ │ │ │ ├── main.py │ │ │ │ ├── server.py │ │ │ │ └── watchdog.py │ │ │ ├── examples │ │ │ │ ├── __init__.py │ │ │ │ └── usage_example.py │ │ │ ├── pyproject.toml │ │ │ ├── README.md │ │ │ ├── run_server.py │ │ │ └── test_connection.py │ │ ├── core │ │ │ ├── .bumpversion.cfg │ │ │ ├── core │ │ │ │ ├── __init__.py │ │ │ │ └── telemetry │ │ │ │ ├── __init__.py │ │ │ │ └── posthog.py │ │ │ ├── poetry.toml │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── mcp-server │ │ │ ├── .bumpversion.cfg │ │ │ ├── CONCURRENT_SESSIONS.md │ │ │ ├── mcp_server │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── server.py │ │ │ │ └── session_manager.py │ │ │ ├── pdm.lock │ │ │ ├── pyproject.toml │ │ │ ├── README.md │ │ │ └── scripts │ │ │ ├── install_mcp_server.sh │ │ │ └── start_mcp_server.sh │ │ ├── pylume │ │ │ ├── __init__.py │ │ │ ├── .bumpversion.cfg │ │ │ ├── pylume │ │ │ │ ├── __init__.py │ │ │ │ ├── client.py │ │ │ │ ├── exceptions.py │ │ │ │ ├── lume │ │ │ │ ├── models.py │ │ │ │ ├── pylume.py │ │ │ │ └── server.py │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ └── som │ │ ├── .bumpversion.cfg │ │ ├── LICENSE │ │ ├── poetry.toml │ │ ├── pyproject.toml │ │ ├── README.md │ │ ├── som │ │ │ ├── __init__.py │ │ │ ├── detect.py │ │ │ ├── detection.py │ │ │ ├── models.py │ │ │ ├── ocr.py │ │ │ ├── util │ │ │ │ └── utils.py 
│ │ │ └── visualization.py │ │ └── tests │ │ └── test_omniparser.py │ ├── typescript │ │ ├── .gitignore │ │ ├── .nvmrc │ │ ├── agent │ │ │ ├── examples │ │ │ │ ├── playground-example.html │ │ │ │ └── README.md │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── client.ts │ │ │ │ ├── index.ts │ │ │ │ └── types.ts │ │ │ ├── tests │ │ │ │ └── client.test.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── biome.json │ │ ├── computer │ │ │ ├── .editorconfig │ │ │ ├── .gitattributes │ │ │ ├── .gitignore │ │ │ ├── LICENSE │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── computer │ │ │ │ │ ├── index.ts │ │ │ │ │ ├── providers │ │ │ │ │ │ ├── base.ts │ │ │ │ │ │ ├── cloud.ts │ │ │ │ │ │ └── index.ts │ │ │ │ │ └── types.ts │ │ │ │ ├── index.ts │ │ │ │ ├── interface │ │ │ │ │ ├── base.ts │ │ │ │ │ ├── factory.ts │ │ │ │ │ ├── index.ts │ │ │ │ │ ├── linux.ts │ │ │ │ │ ├── macos.ts │ │ │ │ │ └── windows.ts │ │ │ │ └── types.ts │ │ │ ├── tests │ │ │ │ ├── computer │ │ │ │ │ └── cloud.test.ts │ │ │ │ ├── interface │ │ │ │ │ ├── factory.test.ts │ │ │ │ │ ├── index.test.ts │ │ │ │ │ ├── linux.test.ts │ │ │ │ │ ├── macos.test.ts │ │ │ │ │ └── windows.test.ts │ │ │ │ └── setup.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── core │ │ │ ├── .editorconfig │ │ │ ├── .gitattributes │ │ │ ├── .gitignore │ │ │ ├── LICENSE │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── index.ts │ │ │ │ └── telemetry │ │ │ │ ├── clients │ │ │ │ │ ├── index.ts │ │ │ │ │ └── posthog.ts │ │ │ │ └── index.ts │ │ │ ├── tests │ │ │ │ └── telemetry.test.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── package.json │ │ ├── pnpm-lock.yaml │ │ ├── pnpm-workspace.yaml │ │ └── README.md │ └── xfce │ ├── .dockerignore │ ├── .gitignore │ ├── Dockerfile │ ├── README.md │ └── src │ ├── scripts │ │ ├── resize-display.sh │ │ ├── start-computer-server.sh │ │ ├── start-novnc.sh │ │ ├── start-vnc.sh │ │ └── xstartup.sh │ ├── supervisor │ │ └── supervisord.conf │ └── xfce-config │ ├── helpers.rc │ ├── xfce4-power-manager.xml │ └── xfce4-session.xml ├── LICENSE.md ├── Makefile ├── notebooks │ ├── agent_nb.ipynb │ ├── blog │ │ ├── build-your-own-operator-on-macos-1.ipynb │ │ └── build-your-own-operator-on-macos-2.ipynb │ ├── composite_agents_docker_nb.ipynb │ ├── computer_nb.ipynb │ ├── computer_server_nb.ipynb │ ├── customizing_computeragent.ipynb │ ├── eval_osworld.ipynb │ ├── ollama_nb.ipynb │ ├── pylume_nb.ipynb │ ├── README.md │ ├── sota_hackathon_cloud.ipynb │ └── sota_hackathon.ipynb ├── pdm.lock ├── pyproject.toml ├── pyrightconfig.json ├── README.md ├── samples │ └── community │ ├── global-online │ │ └── README.md │ └── hack-the-north │ └── README.md ├── scripts │ ├── build-uv.sh │ ├── build.ps1 │ ├── build.sh │ ├── cleanup.sh │ ├── playground-docker.sh │ ├── playground.sh │ └── run-docker-dev.sh └── tests ├── pytest.ini ├── shell_cmd.py ├── test_files.py ├── test_mcp_server_session_management.py ├── test_mcp_server_streaming.py ├── test_shell_bash.py ├── test_telemetry.py ├── test_venv.py └── test_watchdog.py ``` # Files -------------------------------------------------------------------------------- /libs/lume/src/Commands/Logs.swift: -------------------------------------------------------------------------------- ```swift import ArgumentParser import Foundation struct Logs: ParsableCommand { static let configuration = CommandConfiguration( abstract: "View lume serve 
logs", subcommands: [Info.self, Error.self, All.self], defaultSubcommand: All.self ) // Common functionality for reading log files static func readLogFile(path: String, lines: Int? = nil, follow: Bool = false) -> String { let fileManager = FileManager.default // Check if file exists guard fileManager.fileExists(atPath: path) else { return "Log file not found at \(path)" } do { // Read file content let content = try String(contentsOfFile: path, encoding: .utf8) // If lines parameter is provided, return only the specified number of lines from the end if let lineCount = lines { let allLines = content.components(separatedBy: .newlines) let startIndex = max(0, allLines.count - lineCount) let lastLines = Array(allLines[startIndex...]) return lastLines.joined(separator: "\n") } return content } catch { return "Error reading log file: \(error.localizedDescription)" } } // Method for tailing a log file (following new changes) static func tailLogFile(path: String, initialLines: Int? = 10) { let fileManager = FileManager.default // Check if file exists guard fileManager.fileExists(atPath: path) else { print("Log file not found at \(path)") return } do { // Get initial content with only the specified number of lines from the end var lastPosition: UInt64 = 0 let fileHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: path)) // First, print the last few lines of the file if let lines = initialLines { let content = try String(contentsOfFile: path, encoding: .utf8) let allLines = content.components(separatedBy: .newlines) let startIndex = max(0, allLines.count - lines) let lastLines = Array(allLines[startIndex...]) print(lastLines.joined(separator: "\n")) } // Get current file size lastPosition = UInt64(try fileManager.attributesOfItem(atPath: path)[.size] as? UInt64 ?? 0) // Set up for continuous monitoring print("\nTailing log file... Press Ctrl+C to stop") // Monitor file for changes while true { // Brief pause to reduce CPU usage Thread.sleep(forTimeInterval: 0.5) // Get current size let currentSize = try fileManager.attributesOfItem(atPath: path)[.size] as? UInt64 ?? 0 // If file has grown if currentSize > lastPosition { // Seek to where we last read fileHandle.seek(toFileOffset: lastPosition) // Read new content if let newData = try? fileHandle.readToEnd() { if let newContent = String(data: newData, encoding: .utf8) { // Print new content without trailing newline if newContent.hasSuffix("\n") { print(newContent, terminator: "") } else { print(newContent) } } } // Update position lastPosition = currentSize } // Handle file rotation (if file became smaller) else if currentSize < lastPosition { // File was probably rotated, start from beginning lastPosition = 0 fileHandle.seek(toFileOffset: 0) if let newData = try? fileHandle.readToEnd() { if let newContent = String(data: newData, encoding: .utf8) { print(newContent, terminator: "") } } lastPosition = currentSize } } } catch { print("Error tailing log file: \(error.localizedDescription)") } } // MARK: - Info Logs Subcommand struct Info: ParsableCommand { static let configuration = CommandConfiguration( commandName: "info", abstract: "View info logs from the daemon" ) @Option(name: .shortAndLong, help: "Number of lines to display from the end of the file") var lines: Int? 
@Flag(name: .shortAndLong, help: "Follow log file continuously (like tail -f)") var follow: Bool = false func run() throws { let logPath = "/tmp/lume_daemon.log" print("=== Info Logs ===") if follow { // Use tailing functionality to continuously monitor the log Logs.tailLogFile(path: logPath, initialLines: lines ?? 10) } else { // Regular one-time viewing of logs let content = Logs.readLogFile(path: logPath, lines: lines) print(content) } } } // MARK: - Error Logs Subcommand struct Error: ParsableCommand { static let configuration = CommandConfiguration( commandName: "error", abstract: "View error logs from the daemon" ) @Option(name: .shortAndLong, help: "Number of lines to display from the end of the file") var lines: Int? @Flag(name: .shortAndLong, help: "Follow log file continuously (like tail -f)") var follow: Bool = false func run() throws { let logPath = "/tmp/lume_daemon.error.log" print("=== Error Logs ===") if follow { // Use tailing functionality to continuously monitor the log Logs.tailLogFile(path: logPath, initialLines: lines ?? 10) } else { // Regular one-time viewing of logs let content = Logs.readLogFile(path: logPath, lines: lines) print(content) } } } // MARK: - All Logs Subcommand struct All: ParsableCommand { static let configuration = CommandConfiguration( commandName: "all", abstract: "View both info and error logs from the daemon" ) @Option(name: .shortAndLong, help: "Number of lines to display from the end of each file") var lines: Int? @Flag(name: .shortAndLong, help: "Follow log files continuously (like tail -f)") var follow: Bool = false // Custom implementation to tail both logs simultaneously private func tailBothLogs(infoPath: String, errorPath: String, initialLines: Int? = 10) { let fileManager = FileManager.default var infoExists = fileManager.fileExists(atPath: infoPath) var errorExists = fileManager.fileExists(atPath: errorPath) if !infoExists && !errorExists { print("Neither info nor error log files found") return } // Print initial content print("=== Info Logs ===") if infoExists { if let lines = initialLines { let content = (try? String(contentsOfFile: infoPath, encoding: .utf8)) ?? "" let allLines = content.components(separatedBy: .newlines) let startIndex = max(0, allLines.count - lines) let lastLines = Array(allLines[startIndex...]) print(lastLines.joined(separator: "\n")) } } else { print("Info log file not found") } print("\n=== Error Logs ===") if errorExists { if let lines = initialLines { let content = (try? String(contentsOfFile: errorPath, encoding: .utf8)) ?? "" let allLines = content.components(separatedBy: .newlines) let startIndex = max(0, allLines.count - lines) let lastLines = Array(allLines[startIndex...]) print(lastLines.joined(separator: "\n")) } } else { print("Error log file not found") } print("\nTailing both log files... Press Ctrl+C to stop") // Initialize file handles and positions var infoHandle: FileHandle? = nil var errorHandle: FileHandle? = nil var infoPosition: UInt64 = 0 var errorPosition: UInt64 = 0 // Set up file handles if infoExists { do { infoHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: infoPath)) infoPosition = UInt64(try fileManager.attributesOfItem(atPath: infoPath)[.size] as? UInt64 ?? 0) } catch { print("Error opening info log file: \(error.localizedDescription)") } } if errorExists { do { errorHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: errorPath)) errorPosition = UInt64(try fileManager.attributesOfItem(atPath: errorPath)[.size] as? UInt64 ?? 
0) } catch { print("Error opening error log file: \(error.localizedDescription)") } } // Monitor both files for changes while true { Thread.sleep(forTimeInterval: 0.5) // Check for new content in info log if let handle = infoHandle { do { // Re-check existence in case file was deleted infoExists = fileManager.fileExists(atPath: infoPath) if !infoExists { print("\n[Info log file was removed]") infoHandle = nil continue } let currentSize = try fileManager.attributesOfItem(atPath: infoPath)[.size] as? UInt64 ?? 0 if currentSize > infoPosition { handle.seek(toFileOffset: infoPosition) if let newData = try? handle.readToEnd() { if let newContent = String(data: newData, encoding: .utf8) { print("\n--- New Info Log Content ---") if newContent.hasSuffix("\n") { print(newContent, terminator: "") } else { print(newContent) } } } infoPosition = currentSize } else if currentSize < infoPosition { // File was rotated print("\n[Info log was rotated]") infoPosition = 0 handle.seek(toFileOffset: 0) if let newData = try? handle.readToEnd() { if let newContent = String(data: newData, encoding: .utf8) { print("\n--- New Info Log Content ---") print(newContent, terminator: "") } } infoPosition = currentSize } } catch { print("\nError reading info log: \(error.localizedDescription)") } } else if fileManager.fileExists(atPath: infoPath) && !infoExists { // File exists again after being deleted do { infoHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: infoPath)) infoPosition = 0 infoExists = true print("\n[Info log file reappeared]") } catch { print("\nError reopening info log: \(error.localizedDescription)") } } // Check for new content in error log if let handle = errorHandle { do { // Re-check existence in case file was deleted errorExists = fileManager.fileExists(atPath: errorPath) if !errorExists { print("\n[Error log file was removed]") errorHandle = nil continue } let currentSize = try fileManager.attributesOfItem(atPath: errorPath)[.size] as? UInt64 ?? 0 if currentSize > errorPosition { handle.seek(toFileOffset: errorPosition) if let newData = try? handle.readToEnd() { if let newContent = String(data: newData, encoding: .utf8) { print("\n--- New Error Log Content ---") if newContent.hasSuffix("\n") { print(newContent, terminator: "") } else { print(newContent) } } } errorPosition = currentSize } else if currentSize < errorPosition { // File was rotated print("\n[Error log was rotated]") errorPosition = 0 handle.seek(toFileOffset: 0) if let newData = try? handle.readToEnd() { if let newContent = String(data: newData, encoding: .utf8) { print("\n--- New Error Log Content ---") print(newContent, terminator: "") } } errorPosition = currentSize } } catch { print("\nError reading error log: \(error.localizedDescription)") } } else if fileManager.fileExists(atPath: errorPath) && !errorExists { // File exists again after being deleted do { errorHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: errorPath)) errorPosition = 0 errorExists = true print("\n[Error log file reappeared]") } catch { print("\nError reopening error log: \(error.localizedDescription)") } } } } func run() throws { let infoLogPath = "/tmp/lume_daemon.log" let errorLogPath = "/tmp/lume_daemon.error.log" if follow { // Use custom tailing implementation for both logs tailBothLogs(infoPath: infoLogPath, errorPath: errorLogPath, initialLines: lines ?? 
10) } else { // Regular one-time viewing of logs let infoContent = Logs.readLogFile(path: infoLogPath, lines: lines) let errorContent = Logs.readLogFile(path: errorLogPath, lines: lines) print("=== Info Logs ===") print(infoContent) print("\n=== Error Logs ===") print(errorContent) } } } } ``` -------------------------------------------------------------------------------- /examples/som_examples.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Example script demonstrating the usage of OmniParser's UI element detection functionality. This script shows how to: 1. Initialize the OmniParser 2. Load and process images 3. Visualize detection results 4. Compare performance between CPU and MPS (Apple Silicon) """ import argparse import logging import sys from pathlib import Path import time from PIL import Image from typing import Dict, Any, List, Optional import numpy as np import io import base64 import glob import os # Load environment variables from .env file project_root = Path(__file__).parent.parent env_file = project_root / ".env" print(f"Loading environment from: {env_file}") from dotenv import load_dotenv load_dotenv(env_file) # Add paths to sys.path if needed pythonpath = os.environ.get("PYTHONPATH", "") for path in pythonpath.split(":"): if path and path not in sys.path: sys.path.append(path) print(f"Added to sys.path: {path}") # Add the libs directory to the path to find som libs_path = project_root / "libs" if str(libs_path) not in sys.path: sys.path.append(str(libs_path)) print(f"Added to sys.path: {libs_path}") from som import OmniParser, ParseResult, IconElement, TextElement from som.models import UIElement, ParserMetadata, BoundingBox # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger(__name__) def setup_logging(): """Configure logging with a nice format.""" logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) class Timer: """Enhanced context manager for timing code blocks.""" def __init__(self, name: str, logger): self.name = name self.logger = logger self.start_time: float = 0.0 self.elapsed_time: float = 0.0 def __enter__(self): self.start_time = time.time() return self def __exit__(self, *args): self.elapsed_time = time.time() - self.start_time self.logger.info(f"{self.name}: {self.elapsed_time:.3f}s") return False def image_to_bytes(image: Image.Image) -> bytes: """Convert PIL Image to PNG bytes.""" buf = io.BytesIO() image.save(buf, format="PNG") return buf.getvalue() def process_image( parser: OmniParser, image_path: str, output_dir: Path, use_ocr: bool = False ) -> None: """Process a single image and save the result.""" try: # Load image logger.info(f"Processing image: {image_path}") image = Image.open(image_path).convert("RGB") logger.info(f"Image loaded successfully, size: {image.size}") # Create output filename input_filename = Path(image_path).stem output_path = output_dir / f"{input_filename}_analyzed.png" # Convert image to PNG bytes image_bytes = image_to_bytes(image) # Process image with Timer(f"Processing {input_filename}", logger): result = parser.parse(image_bytes, use_ocr=use_ocr) logger.info( f"Found {result.metadata.num_icons} icons and {result.metadata.num_text} text elements" ) # Save the annotated image logger.info(f"Saving annotated image to: {output_path}") try: # Save image from 
base64 img_data = base64.b64decode(result.annotated_image_base64) img = Image.open(io.BytesIO(img_data)) img.save(output_path) # Print detailed results logger.info("\nDetected Elements:") for elem in result.elements: if isinstance(elem, IconElement): logger.info( f"Icon: confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}" ) elif isinstance(elem, TextElement): logger.info( f"Text: '{elem.content}', confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}" ) # Verify file exists and log size if output_path.exists(): logger.info( f"Successfully saved image. File size: {output_path.stat().st_size} bytes" ) else: logger.error(f"Failed to verify file at {output_path}") except Exception as e: logger.error(f"Error saving image: {str(e)}", exc_info=True) except Exception as e: logger.error(f"Error processing image {image_path}: {str(e)}", exc_info=True) def run_detection_benchmark( input_path: str, output_dir: Path, use_ocr: bool = False, box_threshold: float = 0.01, iou_threshold: float = 0.1, ): """Run detection benchmark on images.""" logger.info( f"Starting benchmark with OCR enabled: {use_ocr}, box_threshold: {box_threshold}, iou_threshold: {iou_threshold}" ) try: # Initialize parser logger.info("Initializing OmniParser...") parser = OmniParser() # Create output directory output_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Output directory created at: {output_dir}") # Get list of PNG files if os.path.isdir(input_path): image_files = glob.glob(os.path.join(input_path, "*.png")) else: image_files = [input_path] logger.info(f"Found {len(image_files)} images to process") # Process each image with specified thresholds for image_path in image_files: try: # Load image logger.info(f"Processing image: {image_path}") image = Image.open(image_path).convert("RGB") logger.info(f"Image loaded successfully, size: {image.size}") # Create output filename input_filename = Path(image_path).stem output_path = output_dir / f"{input_filename}_analyzed.png" # Convert image to PNG bytes image_bytes = image_to_bytes(image) # Process image with specified thresholds with Timer(f"Processing {input_filename}", logger): result = parser.parse( image_bytes, use_ocr=use_ocr, box_threshold=box_threshold, iou_threshold=iou_threshold, ) logger.info( f"Found {result.metadata.num_icons} icons and {result.metadata.num_text} text elements" ) # Save the annotated image logger.info(f"Saving annotated image to: {output_path}") try: # Save image from base64 img_data = base64.b64decode(result.annotated_image_base64) img = Image.open(io.BytesIO(img_data)) img.save(output_path) # Print detailed results logger.info("\nDetected Elements:") for elem in result.elements: if isinstance(elem, IconElement): logger.info( f"Icon: confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}" ) elif isinstance(elem, TextElement): logger.info( f"Text: '{elem.content}', confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}" ) # Verify file exists and log size if output_path.exists(): logger.info( f"Successfully saved image. 
File size: {output_path.stat().st_size} bytes" ) else: logger.error(f"Failed to verify file at {output_path}") except Exception as e: logger.error(f"Error saving image: {str(e)}", exc_info=True) except Exception as e: logger.error(f"Error processing image {image_path}: {str(e)}", exc_info=True) except Exception as e: logger.error(f"Benchmark failed: {str(e)}", exc_info=True) raise def run_experiments(input_path: str, output_dir: Path, use_ocr: bool = False): """Run experiments with different threshold combinations.""" # Define threshold values to test box_thresholds = [0.01, 0.05, 0.1, 0.3] iou_thresholds = [0.05, 0.1, 0.2, 0.5] logger.info("Starting threshold experiments...") logger.info("Box thresholds to test: %s", box_thresholds) logger.info("IOU thresholds to test: %s", iou_thresholds) # Create results directory for this experiment timestamp = time.strftime("%Y%m%d-%H%M%S") ocr_suffix = "_ocr" if use_ocr else "_no_ocr" exp_dir = output_dir / f"experiment_{timestamp}{ocr_suffix}" exp_dir.mkdir(parents=True, exist_ok=True) # Create a summary file summary_file = exp_dir / "results_summary.txt" with open(summary_file, "w") as f: f.write("Threshold Experiments Results\n") f.write("==========================\n\n") f.write(f"Input: {input_path}\n") f.write(f"OCR Enabled: {use_ocr}\n") f.write(f"Date: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n") f.write("Results:\n") f.write("-" * 80 + "\n") f.write( f"{'Box Thresh':^10} | {'IOU Thresh':^10} | {'Num Icons':^10} | {'Num Text':^10} | {'Time (s)':^10}\n" ) f.write("-" * 80 + "\n") # Initialize parser once for all experiments parser = OmniParser() # Run experiments with each combination for box_thresh in box_thresholds: for iou_thresh in iou_thresholds: logger.info(f"\nTesting box_threshold={box_thresh}, iou_threshold={iou_thresh}") # Create directory for this combination combo_dir = exp_dir / f"box_{box_thresh}_iou_{iou_thresh}" combo_dir.mkdir(exist_ok=True) try: # Process each image if os.path.isdir(input_path): image_files = glob.glob(os.path.join(input_path, "*.png")) else: image_files = [input_path] total_icons = 0 total_text = 0 total_time = 0 for image_path in image_files: # Load and process image image = Image.open(image_path).convert("RGB") image_bytes = image_to_bytes(image) # Process with current thresholds with Timer(f"Processing {Path(image_path).stem}", logger) as t: result = parser.parse( image_bytes, use_ocr=use_ocr, box_threshold=box_thresh, iou_threshold=iou_thresh, ) # Save annotated image output_path = combo_dir / f"{Path(image_path).stem}_analyzed.png" img_data = base64.b64decode(result.annotated_image_base64) img = Image.open(io.BytesIO(img_data)) img.save(output_path) # Update totals total_icons += result.metadata.num_icons total_text += result.metadata.num_text # Log detailed results detail_file = combo_dir / f"{Path(image_path).stem}_details.txt" with open(detail_file, "w") as detail_f: detail_f.write(f"Results for {Path(image_path).name}\n") detail_f.write("-" * 40 + "\n") detail_f.write(f"Number of icons: {result.metadata.num_icons}\n") detail_f.write( f"Number of text elements: {result.metadata.num_text}\n\n" ) detail_f.write("Icon Detections:\n") icon_count = 1 text_count = ( result.metadata.num_icons + 1 ) # Text boxes start after icons # First list all icons for elem in result.elements: if isinstance(elem, IconElement): detail_f.write(f"Box #{icon_count}: Icon\n") detail_f.write(f" - Confidence: {elem.confidence:.3f}\n") detail_f.write( f" - Coordinates: {elem.bbox.coordinates}\n" ) icon_count += 1 if use_ocr: 
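                                # Text detections are listed only when OCR is enabled; their box numbers
                                # continue from text_count, i.e. after the icon boxes written above.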
detail_f.write("\nText Detections:\n") for elem in result.elements: if isinstance(elem, TextElement): detail_f.write(f"Box #{text_count}: Text\n") detail_f.write(f" - Content: '{elem.content}'\n") detail_f.write( f" - Confidence: {elem.confidence:.3f}\n" ) detail_f.write( f" - Coordinates: {elem.bbox.coordinates}\n" ) text_count += 1 # Update timing totals total_time += t.elapsed_time # Write summary for this combination avg_time = total_time / len(image_files) f.write( f"{box_thresh:^10.3f} | {iou_thresh:^10.3f} | {total_icons:^10d} | {total_text:^10d} | {avg_time:^10.3f}\n" ) except Exception as e: logger.error( f"Error in experiment box={box_thresh}, iou={iou_thresh}: {str(e)}" ) f.write( f"{box_thresh:^10.3f} | {iou_thresh:^10.3f} | {'ERROR':^10s} | {'ERROR':^10s} | {'ERROR':^10s}\n" ) # Write summary footer f.write("-" * 80 + "\n") f.write("\nExperiment completed successfully!\n") logger.info(f"\nExperiment results saved to {exp_dir}") logger.info(f"Summary file: {summary_file}") def main(): """Main entry point.""" parser = argparse.ArgumentParser(description="Run OmniParser benchmark") parser.add_argument("input_path", help="Path to input image or directory containing images") parser.add_argument( "--output-dir", default="examples/output", help="Output directory for annotated images" ) parser.add_argument( "--ocr", choices=["none", "easyocr"], default="none", help="OCR engine to use (default: none)", ) parser.add_argument( "--mode", choices=["single", "experiment"], default="single", help="Run mode: single run or threshold experiments (default: single)", ) parser.add_argument( "--box-threshold", type=float, default=0.01, help="Confidence threshold for detection (default: 0.01)", ) parser.add_argument( "--iou-threshold", type=float, default=0.1, help="IOU threshold for Non-Maximum Suppression (default: 0.1)", ) args = parser.parse_args() logger.info(f"Starting OmniParser with arguments: {args}") use_ocr = args.ocr != "none" output_dir = Path(args.output_dir) try: if args.mode == "experiment": run_experiments(args.input_path, output_dir, use_ocr) else: run_detection_benchmark( args.input_path, output_dir, use_ocr, args.box_threshold, args.iou_threshold ) except Exception as e: logger.error(f"Process failed: {str(e)}", exc_info=True) return 1 return 0 if __name__ == "__main__": sys.exit(main()) ``` -------------------------------------------------------------------------------- /libs/python/som/som/detect.py: -------------------------------------------------------------------------------- ```python from pathlib import Path from typing import Union, List, Dict, Any, Tuple, Optional, cast import logging import torch import torchvision.ops import cv2 import numpy as np import time import torchvision.transforms as T from PIL import Image import io import base64 import argparse import signal from contextlib import contextmanager from ultralytics import YOLO from huggingface_hub import hf_hub_download import supervision as sv from supervision.detection.core import Detections from .detection import DetectionProcessor from .ocr import OCRProcessor from .visualization import BoxAnnotator from .models import BoundingBox, UIElement, IconElement, TextElement, ParserMetadata, ParseResult logger = logging.getLogger(__name__) class TimeoutException(Exception): pass @contextmanager def timeout(seconds: int): def timeout_handler(signum, frame): raise TimeoutException("OCR process timed out") # Register the signal handler original_handler = signal.signal(signal.SIGALRM, timeout_handler) 
signal.alarm(seconds) try: yield finally: signal.alarm(0) signal.signal(signal.SIGALRM, original_handler) def process_text_box(box, image): """Process a single text box with OCR.""" try: import easyocr from typing import List, Tuple, Any, Sequence x1 = int(min(point[0] for point in box)) y1 = int(min(point[1] for point in box)) x2 = int(max(point[0] for point in box)) y2 = int(max(point[1] for point in box)) # Add padding pad = 2 x1 = max(0, x1 - pad) y1 = max(0, y1 - pad) x2 = min(image.shape[1], x2 + pad) y2 = min(image.shape[0], y2 + pad) region = image[y1:y2, x1:x2] if region.size > 0: reader = easyocr.Reader(["en"]) results = reader.readtext(region) if results and len(results) > 0: # EasyOCR returns a list of tuples (bbox, text, confidence) first_result = results[0] if isinstance(first_result, (list, tuple)) and len(first_result) >= 3: text = str(first_result[1]) confidence = float(first_result[2]) if confidence > 0.5: return text, [x1, y1, x2, y2], confidence except Exception: pass return None def check_ocr_box(image_path: Union[str, Path]) -> Tuple[List[str], List[List[float]]]: """Check OCR box using EasyOCR.""" # Read image once if isinstance(image_path, str): image_path = Path(image_path) # Read image into memory image_cv = cv2.imread(str(image_path)) if image_cv is None: logger.error(f"Failed to read image: {image_path}") return [], [] # Get image dimensions img_height, img_width = image_cv.shape[:2] confidence_threshold = 0.5 # Use EasyOCR import ssl import easyocr # Create unverified SSL context for development ssl._create_default_https_context = ssl._create_unverified_context try: reader = easyocr.Reader(["en"]) with timeout(5): # 5 second timeout for EasyOCR results = reader.readtext(image_cv, paragraph=False, text_threshold=0.5) except TimeoutException: logger.warning("EasyOCR timed out, returning no results") return [], [] except Exception as e: logger.warning(f"EasyOCR failed: {str(e)}") return [], [] finally: # Restore default SSL context ssl._create_default_https_context = ssl.create_default_context texts = [] boxes = [] for box, text, conf in results: # Convert box format to [x1, y1, x2, y2] x1 = min(point[0] for point in box) y1 = min(point[1] for point in box) x2 = max(point[0] for point in box) y2 = max(point[1] for point in box) if float(conf) > 0.5: # Only keep higher confidence detections texts.append(text) boxes.append([x1, y1, x2, y2]) return texts, boxes class OmniParser: """Enhanced UI parser using computer vision and OCR for detecting interactive elements.""" def __init__( self, model_path: Optional[Union[str, Path]] = None, cache_dir: Optional[Union[str, Path]] = None, force_device: Optional[str] = None, ): """Initialize the OmniParser. Args: model_path: Optional path to the YOLO model cache_dir: Optional directory to cache model files force_device: Force specific device (cpu/cuda/mps) """ self.detector = DetectionProcessor( model_path=Path(model_path) if model_path else None, cache_dir=Path(cache_dir) if cache_dir else None, force_device=force_device, ) self.ocr = OCRProcessor() self.visualizer = BoxAnnotator() def process_image( self, image: Image.Image, box_threshold: float = 0.3, iou_threshold: float = 0.1, use_ocr: bool = True, ) -> Tuple[Image.Image, List[UIElement]]: """Process an image to detect UI elements and optionally text. 
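        Icon detection always runs; when use_ocr is True, text regions are detected as well,
        icons whose box contains a detected text's center are dropped, and the remaining
        boxes are merged with non-maximum suppression.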
Args: image: Input PIL Image box_threshold: Confidence threshold for detection iou_threshold: IOU threshold for NMS use_ocr: Whether to enable OCR processing Returns: Tuple of (annotated image, list of detections) """ try: logger.info("Starting UI element detection...") # Detect icons icon_detections = self.detector.detect_icons( image=image, box_threshold=box_threshold, iou_threshold=iou_threshold ) logger.info(f"Found {len(icon_detections)} interactive elements") # Convert icon detections to typed objects elements: List[UIElement] = cast( List[UIElement], [ IconElement( id=i + 1, bbox=BoundingBox( x1=det["bbox"][0], y1=det["bbox"][1], x2=det["bbox"][2], y2=det["bbox"][3], ), confidence=det["confidence"], scale=det.get("scale"), ) for i, det in enumerate(icon_detections) ], ) # Run OCR if enabled if use_ocr: logger.info("Running OCR detection...") text_detections = self.ocr.detect_text(image=image, confidence_threshold=0.5) if text_detections is None: text_detections = [] logger.info(f"Found {len(text_detections)} text regions") # Convert text detections to typed objects text_elements = cast( List[UIElement], [ TextElement( id=len(elements) + i + 1, bbox=BoundingBox( x1=det["bbox"][0], y1=det["bbox"][1], x2=det["bbox"][2], y2=det["bbox"][3], ), content=det["content"], confidence=det["confidence"], ) for i, det in enumerate(text_detections) ], ) if elements and text_elements: # Filter out non-OCR elements that have OCR elements with center points colliding with them filtered_elements = [] for elem in elements: # elements at this point contains only non-OCR elements should_keep = True for text_elem in text_elements: # Calculate center point of the text element center_x = (text_elem.bbox.x1 + text_elem.bbox.x2) / 2 center_y = (text_elem.bbox.y1 + text_elem.bbox.y2) / 2 # Check if this center point is inside the non-OCR element if (center_x >= elem.bbox.x1 and center_x <= elem.bbox.x2 and center_y >= elem.bbox.y1 and center_y <= elem.bbox.y2): should_keep = False break if should_keep: filtered_elements.append(elem) elements = filtered_elements # Merge detections using NMS all_elements = elements + text_elements boxes = torch.tensor([elem.bbox.coordinates for elem in all_elements]) scores = torch.tensor([elem.confidence for elem in all_elements]) keep_indices = torchvision.ops.nms(boxes, scores, iou_threshold) elements = [all_elements[i] for i in keep_indices] else: # Just add text elements to the list if IOU doesn't need to be applied elements.extend(text_elements) # Calculate drawing parameters based on image size box_overlay_ratio = max(image.size) / 3200 draw_config = { "font_size": int(12 * box_overlay_ratio), "box_thickness": max(int(2 * box_overlay_ratio), 1), "text_padding": max(int(3 * box_overlay_ratio), 1), } # Convert elements back to dict format for visualization detection_dicts = [ { "type": elem.type, "bbox": elem.bbox.coordinates, "confidence": elem.confidence, "content": elem.content if isinstance(elem, TextElement) else None, } for elem in elements ] # Create visualization logger.info("Creating visualization...") annotated_image = self.visualizer.draw_boxes( image=image.copy(), detections=detection_dicts, draw_config=draw_config ) logger.info("Visualization complete") return annotated_image, elements except Exception as e: logger.error(f"Error in process_image: {str(e)}") import traceback logger.error(traceback.format_exc()) raise def parse( self, screenshot_data: Union[bytes, str], box_threshold: float = 0.3, iou_threshold: float = 0.1, use_ocr: bool = True, ) -> 
ParseResult: """Parse a UI screenshot to detect interactive elements and text. Args: screenshot_data: Raw bytes or base64 string of the screenshot box_threshold: Confidence threshold for detection iou_threshold: IOU threshold for NMS use_ocr: Whether to enable OCR processing Returns: ParseResult object containing elements, annotated image, and metadata """ try: start_time = time.time() # Convert input to PIL Image if isinstance(screenshot_data, str): screenshot_data = base64.b64decode(screenshot_data) image = Image.open(io.BytesIO(screenshot_data)).convert("RGB") # Process image annotated_image, elements = self.process_image( image=image, box_threshold=box_threshold, iou_threshold=iou_threshold, use_ocr=use_ocr, ) # Convert annotated image to base64 buffered = io.BytesIO() annotated_image.save(buffered, format="PNG") annotated_image_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8") # Generate screen info text screen_info = [] parsed_content_list = [] # Set element IDs and generate human-readable descriptions for i, elem in enumerate(elements): # Set the ID (1-indexed) elem.id = i + 1 if isinstance(elem, IconElement): screen_info.append( f"Box #{i+1}: Icon (confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates})" ) parsed_content_list.append( { "id": i + 1, "type": "icon", "bbox": elem.bbox.coordinates, "confidence": elem.confidence, "content": None, } ) elif isinstance(elem, TextElement): screen_info.append( f"Box #{i+1}: Text '{elem.content}' (confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates})" ) parsed_content_list.append( { "id": i + 1, "type": "text", "bbox": elem.bbox.coordinates, "confidence": elem.confidence, "content": elem.content, } ) # Calculate metadata latency = time.time() - start_time width, height = image.size # Create ParseResult object with enhanced properties result = ParseResult( elements=elements, annotated_image_base64=annotated_image_base64, screen_info=screen_info, parsed_content_list=parsed_content_list, metadata=ParserMetadata( image_size=(width, height), num_icons=len([e for e in elements if isinstance(e, IconElement)]), num_text=len([e for e in elements if isinstance(e, TextElement)]), device=self.detector.device, ocr_enabled=use_ocr, latency=latency, ), ) # Return the ParseResult object directly return result except Exception as e: logger.error(f"Error in parse: {str(e)}") import traceback logger.error(traceback.format_exc()) raise def main(): """Command line interface for UI element detection.""" parser = argparse.ArgumentParser(description="Detect UI elements and text in images") parser.add_argument("image_path", help="Path to the input image") parser.add_argument("--model-path", help="Path to YOLO model") parser.add_argument( "--box-threshold", type=float, default=0.3, help="Box confidence threshold (default: 0.3)" ) parser.add_argument( "--iou-threshold", type=float, default=0.1, help="IOU threshold (default: 0.1)" ) parser.add_argument( "--ocr", action="store_true", default=True, help="Enable OCR processing (default: True)" ) parser.add_argument("--output", help="Output path for annotated image") args = parser.parse_args() # Setup logging logging.basicConfig(level=logging.INFO) try: # Initialize parser parser = OmniParser(model_path=args.model_path) # Load and process image logger.info(f"Loading image from: {args.image_path}") image = Image.open(args.image_path).convert("RGB") logger.info(f"Image loaded successfully, size: {image.size}") # Process image annotated_image, elements = parser.process_image( image=image, 
box_threshold=args.box_threshold, iou_threshold=args.iou_threshold, use_ocr=args.ocr, ) # Save output image output_path = args.output or str( Path(args.image_path).parent / f"{Path(args.image_path).stem}_analyzed{Path(args.image_path).suffix}" ) logger.info(f"Saving annotated image to: {output_path}") Path(output_path).parent.mkdir(parents=True, exist_ok=True) annotated_image.save(output_path) logger.info(f"Image saved successfully to {output_path}") # Print detections logger.info("\nDetections:") for i, elem in enumerate(elements): if isinstance(elem, IconElement): logger.info( f"Interactive element {i}: confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}" ) elif isinstance(elem, TextElement): logger.info(f"Text {i}: '{elem.content}', bbox={elem.bbox.coordinates}") except Exception as e: logger.error(f"Error processing image: {str(e)}") import traceback logger.error(traceback.format_exc()) return 1 return 0 if __name__ == "__main__": import sys sys.exit(main()) ``` -------------------------------------------------------------------------------- /libs/python/agent/agent/cli.py: -------------------------------------------------------------------------------- ```python """ CLI chat interface for agent - Computer Use Agent Usage: python -m agent.cli <model_string> Examples: python -m agent.cli openai/computer-use-preview python -m agent.cli anthropic/claude-3-5-sonnet-20241022 python -m agent.cli omniparser+anthropic/claude-3-5-sonnet-20241022 """ try: import asyncio import argparse import os import sys import json from typing import List, Dict, Any import dotenv import base64 import time import platform from pathlib import Path try: from PIL import Image, ImageDraw PIL_AVAILABLE = True except Exception: PIL_AVAILABLE = False from yaspin import yaspin except ImportError: if __name__ == "__main__": raise ImportError( "CLI dependencies not found. 
" "Please install with: pip install \"cua-agent[cli]\"" ) # Load environment variables dotenv.load_dotenv() # Color codes for terminal output class Colors: RESET = '\033[0m' BOLD = '\033[1m' DIM = '\033[2m' # Text colors RED = '\033[31m' GREEN = '\033[32m' YELLOW = '\033[33m' BLUE = '\033[34m' MAGENTA = '\033[35m' CYAN = '\033[36m' WHITE = '\033[37m' GRAY = '\033[90m' # Background colors BG_RED = '\033[41m' BG_GREEN = '\033[42m' BG_YELLOW = '\033[43m' BG_BLUE = '\033[44m' def print_colored(text: str, color: str = "", bold: bool = False, dim: bool = False, end: str = "\n", right: str = ""): """Print colored text to terminal with optional right-aligned text.""" prefix = "" if bold: prefix += Colors.BOLD if dim: prefix += Colors.DIM if color: prefix += color if right: # Get terminal width (default to 80 if unable to determine) try: import shutil terminal_width = shutil.get_terminal_size().columns except: terminal_width = 80 # Add right margin terminal_width -= 1 # Calculate padding needed # Account for ANSI escape codes not taking visual space visible_left_len = len(text) visible_right_len = len(right) padding = terminal_width - visible_left_len - visible_right_len if padding > 0: output = f"{prefix}{text}{' ' * padding}{right}{Colors.RESET}" else: # If not enough space, just put a single space between output = f"{prefix}{text} {right}{Colors.RESET}" else: output = f"{prefix}{text}{Colors.RESET}" print(output, end=end) def print_action(action_type: str, details: Dict[str, Any], total_cost: float): """Print computer action with nice formatting.""" # Format action details args_str = "" if action_type == "click" and "x" in details and "y" in details: args_str = f"_{details.get('button', 'left')}({details['x']}, {details['y']})" elif action_type == "type" and "text" in details: text = details["text"] if len(text) > 50: text = text[:47] + "..." 
args_str = f'("{text}")' elif action_type == "key" and "text" in details: args_str = f"('{details['text']}')" elif action_type == "scroll" and "x" in details and "y" in details: args_str = f"({details['x']}, {details['y']})" if total_cost > 0: print_colored(f"🛠️ {action_type}{args_str}", dim=True, right=f"💸 ${total_cost:.2f}") else: print_colored(f"🛠️ {action_type}{args_str}", dim=True) def print_welcome(model: str, agent_loop: str, container_name: str): """Print welcome message.""" print_colored(f"Connected to {container_name} ({model}, {agent_loop})") print_colored("Type 'exit' to quit.", dim=True) async def ainput(prompt: str = ""): return await asyncio.to_thread(input, prompt) async def chat_loop(agent, model: str, container_name: str, initial_prompt: str = "", show_usage: bool = True): """Main chat loop with the agent.""" print_welcome(model, agent.agent_config_info.agent_class.__name__, container_name) history = [] if initial_prompt: history.append({"role": "user", "content": initial_prompt}) total_cost = 0 while True: if len(history) == 0 or history[-1].get("role") != "user": # Get user input with prompt print_colored("> ", end="") user_input = await ainput() if user_input.lower() in ['exit', 'quit', 'q']: print_colored("\n👋 Goodbye!") break if not user_input: continue # Add user message to history history.append({"role": "user", "content": user_input}) # Stream responses from the agent with spinner with yaspin(text="Thinking...", spinner="line", attrs=["dark"]) as spinner: spinner.hide() async for result in agent.run(history): # Add agent responses to history history.extend(result.get("output", [])) if show_usage: total_cost += result.get("usage", {}).get("response_cost", 0) # Process and display the output for item in result.get("output", []): if item.get("type") == "message" and item.get("role") == "assistant": # Display agent text response content = item.get("content", []) for content_part in content: if content_part.get("text"): text = content_part.get("text", "").strip() if text: spinner.hide() print_colored(text) elif item.get("type") == "computer_call": # Display computer action action = item.get("action", {}) action_type = action.get("type", "") if action_type: spinner.hide() print_action(action_type, action, total_cost) spinner.text = f"Performing {action_type}..." spinner.show() elif item.get("type") == "function_call": # Display function call function_name = item.get("name", "") spinner.hide() print_colored(f"🔧 Calling function: {function_name}", dim=True) spinner.text = f"Calling {function_name}..." 
spinner.show() elif item.get("type") == "function_call_output": # Display function output (dimmed) output = item.get("output", "") if output and len(output.strip()) > 0: spinner.hide() print_colored(f"📤 {output}", dim=True) spinner.hide() if show_usage and total_cost > 0: print_colored(f"Total cost: ${total_cost:.2f}", dim=True) async def main(): """Main CLI function.""" parser = argparse.ArgumentParser( description="CUA Agent CLI - Interactive computer use assistant", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python -m agent.cli openai/computer-use-preview python -m agent.cli anthropic/claude-3-5-sonnet-20241022 python -m agent.cli omniparser+anthropic/claude-3-5-sonnet-20241022 python -m agent.cli huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B """ ) parser.add_argument( "model", help="Model string (e.g., 'openai/computer-use-preview', 'anthropic/claude-3-5-sonnet-20241022')" ) parser.add_argument( "--provider", choices=["cloud", "lume", "winsandbox", "docker"], default="cloud", help="Computer provider to use: cloud (default), lume, winsandbox, or docker" ) parser.add_argument( "--images", type=int, default=3, help="Number of recent images to keep in context (default: 3)" ) parser.add_argument( "--trajectory", action="store_true", help="Save trajectory for debugging" ) parser.add_argument( "--budget", type=float, help="Maximum budget for the session (in dollars)" ) parser.add_argument( "--verbose", action="store_true", help="Enable verbose logging" ) parser.add_argument( "-p", "--prompt", type=str, help="Initial prompt to send to the agent. Leave blank for interactive mode." ) parser.add_argument( "--prompt-file", type=Path, help="Path to a UTF-8 text file whose contents will be used as the initial prompt. If provided, overrides --prompt." ) parser.add_argument( "--predict-click", dest="predict_click", type=str, help="Instruction for click prediction. If set, runs predict_click, draws crosshair on a fresh screenshot, saves and opens it." 
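# Illustrative invocation of this mode (model string and instruction are placeholders):
#   python -m agent.cli openai/computer-use-preview --predict-click "the search box"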
) parser.add_argument( "-c", "--cache", action="store_true", help="Tell the API to enable caching" ) parser.add_argument( "-u", "--usage", action="store_true", help="Show total cost of the agent runs" ) parser.add_argument( "-r", "--max-retries", type=int, default=3, help="Maximum number of retries for the LLM API calls" ) args = parser.parse_args() # Check for required environment variables container_name = os.getenv("CUA_CONTAINER_NAME") cua_api_key = os.getenv("CUA_API_KEY") # Prompt for missing environment variables (container name always required) if not container_name: if args.provider == "cloud": print_colored("CUA_CONTAINER_NAME not set.", dim=True) print_colored("You can get a CUA container at https://www.trycua.com/", dim=True) container_name = input("Enter your CUA container name: ").strip() if not container_name: print_colored("❌ Container name is required.") sys.exit(1) else: container_name = "cli-sandbox" # Only require API key for cloud provider if args.provider == "cloud" and not cua_api_key: print_colored("CUA_API_KEY not set.", dim=True) cua_api_key = input("Enter your CUA API key: ").strip() if not cua_api_key: print_colored("❌ API key is required for cloud provider.") sys.exit(1) # Check for provider-specific API keys based on model provider_api_keys = { "openai/": "OPENAI_API_KEY", "anthropic/": "ANTHROPIC_API_KEY", } # Find matching provider and check for API key for prefix, env_var in provider_api_keys.items(): if prefix in args.model: if not os.getenv(env_var): print_colored(f"{env_var} not set.", dim=True) api_key = input(f"Enter your {env_var.replace('_', ' ').title()}: ").strip() if not api_key: print_colored(f"❌ {env_var.replace('_', ' ').title()} is required.") sys.exit(1) # Set the environment variable for the session os.environ[env_var] = api_key break # Import here to avoid import errors if dependencies are missing try: from agent import ComputerAgent from computer import Computer except ImportError as e: print_colored(f"❌ Import error: {e}", Colors.RED, bold=True) print_colored("Make sure agent and computer libraries are installed.", Colors.YELLOW) sys.exit(1) # Resolve provider -> os_type, provider_type, api key requirement provider_map = { "cloud": ("linux", "cloud", True), "lume": ("macos", "lume", False), "winsandbox": ("windows", "winsandbox", False), "docker": ("linux", "docker", False), } os_type, provider_type, needs_api_key = provider_map[args.provider] computer_kwargs = { "os_type": os_type, "provider_type": provider_type, "name": container_name, } if needs_api_key: computer_kwargs["api_key"] = cua_api_key # type: ignore # Create computer instance async with Computer(**computer_kwargs) as computer: # type: ignore # Create agent agent_kwargs = { "model": args.model, "tools": [computer], "trust_remote_code": True, # needed for some local models (e.g., InternVL, OpenCUA) "verbosity": 20 if args.verbose else 30, # DEBUG vs WARNING "max_retries": args.max_retries } if args.images > 0: agent_kwargs["only_n_most_recent_images"] = args.images if args.trajectory: agent_kwargs["trajectory_dir"] = "trajectories" if args.budget: agent_kwargs["max_trajectory_budget"] = { "max_budget": args.budget, "raise_error": True, "reset_after_each_run": False } if args.cache: agent_kwargs["use_prompt_caching"] = True agent = ComputerAgent(**agent_kwargs) # If predict-click mode is requested, run once and exit if args.predict_click: if not PIL_AVAILABLE: print_colored("❌ Pillow (PIL) is required for --predict-click visualization. 
Install with: pip install pillow", Colors.RED, bold=True) sys.exit(1) instruction = args.predict_click print_colored(f"Predicting click for: '{instruction}'", Colors.CYAN) # Take a fresh screenshot FIRST try: img_bytes = await computer.interface.screenshot() except Exception as e: print_colored(f"❌ Failed to take screenshot: {e}", Colors.RED, bold=True) sys.exit(1) # Encode screenshot to base64 for predict_click try: image_b64 = base64.b64encode(img_bytes).decode("utf-8") except Exception as e: print_colored(f"❌ Failed to encode screenshot: {e}", Colors.RED, bold=True) sys.exit(1) try: coords = await agent.predict_click(instruction, image_b64=image_b64) except Exception as e: print_colored(f"❌ predict_click failed: {e}", Colors.RED, bold=True) sys.exit(1) if not coords: print_colored("⚠️ No coordinates returned.", Colors.YELLOW) sys.exit(2) x, y = coords print_colored(f"✅ Predicted coordinates: ({x}, {y})", Colors.GREEN) try: from io import BytesIO with Image.open(BytesIO(img_bytes)) as img: img = img.convert("RGB") draw = ImageDraw.Draw(img) # Draw crosshair size = 12 color = (255, 0, 0) draw.line([(x - size, y), (x + size, y)], fill=color, width=3) draw.line([(x, y - size), (x, y + size)], fill=color, width=3) # Optional small circle r = 6 draw.ellipse([(x - r, y - r), (x + r, y + r)], outline=color, width=2) out_path = Path.cwd() / f"predict_click_{int(time.time())}.png" img.save(out_path) print_colored(f"🖼️ Saved to {out_path}") # Open the image with default viewer try: system = platform.system().lower() if system == "windows": os.startfile(str(out_path)) # type: ignore[attr-defined] elif system == "darwin": os.system(f"open \"{out_path}\"") else: os.system(f"xdg-open \"{out_path}\"") except Exception: pass except Exception as e: print_colored(f"❌ Failed to render/save screenshot: {e}", Colors.RED, bold=True) sys.exit(1) # Done sys.exit(0) # Resolve initial prompt from --prompt-file or --prompt initial_prompt = args.prompt or "" if args.prompt_file: try: initial_prompt = args.prompt_file.read_text(encoding="utf-8") except Exception as e: print_colored(f"❌ Failed to read --prompt-file: {e}", Colors.RED, bold=True) sys.exit(1) # Start chat loop (default interactive mode) await chat_loop(agent, args.model, container_name, initial_prompt, args.usage) if __name__ == "__main__": try: asyncio.run(main()) except (KeyboardInterrupt, EOFError) as _: print_colored("\n\n👋 Goodbye!") ``` -------------------------------------------------------------------------------- /libs/python/agent/agent/loops/moondream3.py: -------------------------------------------------------------------------------- ```python """ Moondream3+ composed-grounded agent loop implementation. Grounding is handled by a local Moondream3 preview model via Transformers. Thinking is delegated to the trailing LLM in the composed model string: "moondream3+<thinking_model>". Differences from composed_grounded: - Provides a singleton Moondream3 client outside the class. - predict_click uses model.point(image, instruction, settings={"max_objects": 1}) and returns pixel coordinates. - If the last image was a screenshot (or we take one), run model.detect(image, "all form ui") to get bboxes, then run model.caption on each cropped bbox to label it. Overlay labels on the screenshot and emit via _on_screenshot. - Add a user message listing all detected form UI names so the thinker can reference them. - If the thinking model doesn't support vision, filter out image content before calling litellm. 
""" from __future__ import annotations import uuid import base64 import io from typing import Dict, List, Any, Optional, Tuple, Any from PIL import Image, ImageDraw, ImageFont import torch from transformers import AutoModelForCausalLM import litellm from ..decorators import register_agent from ..types import AgentCapability from ..loops.base import AsyncAgentConfig from ..responses import ( convert_computer_calls_xy2desc, convert_responses_items_to_completion_messages, convert_completion_messages_to_responses_items, convert_computer_calls_desc2xy, get_all_element_descriptions, ) _MOONDREAM_SINGLETON = None def get_moondream_model() -> Any: """Get a singleton instance of the Moondream3 preview model.""" global _MOONDREAM_SINGLETON if _MOONDREAM_SINGLETON is None: _MOONDREAM_SINGLETON = AutoModelForCausalLM.from_pretrained( "moondream/moondream3-preview", trust_remote_code=True, torch_dtype=torch.bfloat16, device_map="cuda", ) return _MOONDREAM_SINGLETON def _decode_image_b64(image_b64: str) -> Image.Image: data = base64.b64decode(image_b64) return Image.open(io.BytesIO(data)).convert("RGB") def _image_to_b64(img: Image.Image) -> str: buf = io.BytesIO() img.save(buf, format="PNG") return base64.b64encode(buf.getvalue()).decode("utf-8") def _supports_vision(model: str) -> bool: """Heuristic vision support detection for thinking model.""" m = model.lower() vision_markers = [ "gpt-4o", "gpt-4.1", "o1", "o3", "claude-3", "claude-3.5", "sonnet", "haiku", "opus", "gemini-1.5", "llava", ] return any(v in m for v in vision_markers) def _filter_images_from_completion_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: filtered: List[Dict[str, Any]] = [] for msg in messages: msg_copy = {**msg} content = msg_copy.get("content") if isinstance(content, list): msg_copy["content"] = [c for c in content if c.get("type") != "image_url"] filtered.append(msg_copy) return filtered def _annotate_detect_and_label_ui(base_img: Image.Image, model_md) -> Tuple[str, List[str]]: """Detect UI elements with Moondream, caption each, draw labels with backgrounds. Args: base_img: PIL image of the screenshot (RGB or RGBA). Will be copied/converted internally. model_md: Moondream model instance with .detect() and .query() methods. 
Returns: A tuple of (annotated_image_base64_png, detected_names) """ # Ensure RGBA for semi-transparent fills if base_img.mode != "RGBA": base_img = base_img.convert("RGBA") W, H = base_img.width, base_img.height # Detect objects try: detect_result = model_md.detect(base_img, "all ui elements") objects = detect_result.get("objects", []) if isinstance(detect_result, dict) else [] except Exception: objects = [] draw = ImageDraw.Draw(base_img) try: font = ImageFont.load_default() except Exception: font = None detected_names: List[str] = [] for i, obj in enumerate(objects): try: # Clamp normalized coords and crop x_min = max(0.0, min(1.0, float(obj.get("x_min", 0.0)))) y_min = max(0.0, min(1.0, float(obj.get("y_min", 0.0)))) x_max = max(0.0, min(1.0, float(obj.get("x_max", 0.0)))) y_max = max(0.0, min(1.0, float(obj.get("y_max", 0.0)))) left, top, right, bottom = int(x_min * W), int(y_min * H), int(x_max * W), int(y_max * H) left, top = max(0, left), max(0, top) right, bottom = min(W - 1, right), min(H - 1, bottom) crop = base_img.crop((left, top, right, bottom)) # Prompted short caption try: result = model_md.query(crop, "Caption this UI element in few words.") caption_text = (result or {}).get("answer", "") except Exception: caption_text = "" name = (caption_text or "").strip() or f"element_{i+1}" detected_names.append(name) # Draw bbox draw.rectangle([left, top, right, bottom], outline=(255, 215, 0, 255), width=2) # Label background with padding and rounded corners label = f"{i+1}. {name}" padding = 3 if font: text_bbox = draw.textbbox((0, 0), label, font=font) else: text_bbox = draw.textbbox((0, 0), label) text_w = text_bbox[2] - text_bbox[0] text_h = text_bbox[3] - text_bbox[1] tx = left + 3 ty = top - (text_h + 2 * padding + 4) if ty < 0: ty = top + 3 bg_left = tx - padding bg_top = ty - padding bg_right = tx + text_w + padding bg_bottom = ty + text_h + padding try: draw.rounded_rectangle( [bg_left, bg_top, bg_right, bg_bottom], radius=4, fill=(0, 0, 0, 160), outline=(255, 215, 0, 200), width=1, ) except Exception: draw.rectangle( [bg_left, bg_top, bg_right, bg_bottom], fill=(0, 0, 0, 160), outline=(255, 215, 0, 200), width=1, ) text_fill = (255, 255, 255, 255) if font: draw.text((tx, ty), label, fill=text_fill, font=font) else: draw.text((tx, ty), label, fill=text_fill) except Exception: continue # Encode PNG base64 annotated = base_img if annotated.mode not in ("RGBA", "RGB"): annotated = annotated.convert("RGBA") annotated_b64 = _image_to_b64(annotated) return annotated_b64, detected_names GROUNDED_COMPUTER_TOOL_SCHEMA = { "type": "function", "function": { "name": "computer", "description": ( "Control a computer by taking screenshots and interacting with UI elements. " "The screenshot action will include a list of detected form UI element names when available. " "Use element descriptions to locate and interact with UI elements on the screen." 
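# Illustrative arguments the thinking model might emit against this schema
# (the element description is a free-form phrase, not a coordinate):
#   {"action": "click", "element_description": "the blue Submit button"}
#   {"action": "type", "text": "hello world"}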
), "parameters": { "type": "object", "properties": { "action": { "type": "string", "enum": [ "screenshot", "click", "double_click", "drag", "type", "keypress", "scroll", "move", "wait", "get_current_url", "get_dimensions", "get_environment", ], "description": "The action to perform (required for all actions)", }, "element_description": { "type": "string", "description": "Description of the element to interact with (required for click/double_click/move/scroll)", }, "start_element_description": { "type": "string", "description": "Description of the element to start dragging from (required for drag)", }, "end_element_description": { "type": "string", "description": "Description of the element to drag to (required for drag)", }, "text": { "type": "string", "description": "The text to type (required for type)", }, "keys": { "type": "array", "items": {"type": "string"}, "description": "Key(s) to press (required for keypress)", }, "button": { "type": "string", "enum": ["left", "right", "wheel", "back", "forward"], "description": "The mouse button to use for click/double_click", }, "scroll_x": { "type": "integer", "description": "Horizontal scroll amount (required for scroll)", }, "scroll_y": { "type": "integer", "description": "Vertical scroll amount (required for scroll)", }, }, "required": ["action"], }, }, } @register_agent(r"moondream3\+.*", priority=2) class Moondream3PlusConfig(AsyncAgentConfig): def __init__(self): self.desc2xy: Dict[str, Tuple[float, float]] = {} async def predict_step( self, messages: List[Dict[str, Any]], model: str, tools: Optional[List[Dict[str, Any]]] = None, max_retries: Optional[int] = None, stream: bool = False, computer_handler=None, use_prompt_caching: Optional[bool] = False, _on_api_start=None, _on_api_end=None, _on_usage=None, _on_screenshot=None, **kwargs, ) -> Dict[str, Any]: # Parse composed model: moondream3+<thinking_model> if "+" not in model: raise ValueError(f"Composed model must be 'moondream3+<thinking_model>', got: {model}") _, thinking_model = model.split("+", 1) pre_output_items: List[Dict[str, Any]] = [] # Acquire last screenshot; if missing, take one last_image_b64: Optional[str] = None for message in reversed(messages): if ( isinstance(message, dict) and message.get("type") == "computer_call_output" and isinstance(message.get("output"), dict) and message["output"].get("type") == "input_image" ): image_url = message["output"].get("image_url", "") if image_url.startswith("data:image/png;base64,"): last_image_b64 = image_url.split(",", 1)[1] break if last_image_b64 is None and computer_handler is not None: # Take a screenshot screenshot_b64 = await computer_handler.screenshot() # type: ignore if screenshot_b64: call_id = uuid.uuid4().hex pre_output_items += [ { "type": "message", "role": "assistant", "content": [ {"type": "output_text", "text": "Taking a screenshot to analyze the current screen."} ], }, {"type": "computer_call", "call_id": call_id, "status": "completed", "action": {"type": "screenshot"}}, { "type": "computer_call_output", "call_id": call_id, "output": {"type": "input_image", "image_url": f"data:image/png;base64,{screenshot_b64}"}, }, ] last_image_b64 = screenshot_b64 if _on_screenshot: await _on_screenshot(screenshot_b64) # If we have a last screenshot, run Moondream detection and labeling detected_names: List[str] = [] if last_image_b64 is not None: base_img = _decode_image_b64(last_image_b64) model_md = get_moondream_model() annotated_b64, detected_names = _annotate_detect_and_label_ui(base_img, model_md) if _on_screenshot: 
await _on_screenshot(annotated_b64, "annotated_form_ui") # Also push a user message listing all detected names if detected_names: names_text = "\n".join(f"- {n}" for n in detected_names) pre_output_items.append( { "type": "message", "role": "user", "content": [ {"type": "input_text", "text": "Detected form UI elements on screen:"}, {"type": "input_text", "text": names_text}, {"type": "input_text", "text": "Please continue with the next action needed to perform your task."} ], } ) tool_schemas = [] for schema in (tools or []): if schema.get("type") == "computer": tool_schemas.append(GROUNDED_COMPUTER_TOOL_SCHEMA) else: tool_schemas.append(schema) # Step 1: Convert computer calls from xy to descriptions input_messages = messages + pre_output_items messages_with_descriptions = convert_computer_calls_xy2desc(input_messages, self.desc2xy) # Step 2: Convert responses items to completion messages completion_messages = convert_responses_items_to_completion_messages( messages_with_descriptions, allow_images_in_tool_results=False, ) # Optionally filter images if model lacks vision if not _supports_vision(thinking_model): completion_messages = _filter_images_from_completion_messages(completion_messages) # Step 3: Call thinking model with litellm.acompletion api_kwargs = { "model": thinking_model, "messages": completion_messages, "tools": tool_schemas, "max_retries": max_retries, "stream": stream, **kwargs, } if use_prompt_caching: api_kwargs["use_prompt_caching"] = use_prompt_caching if _on_api_start: await _on_api_start(api_kwargs) response = await litellm.acompletion(**api_kwargs) if _on_api_end: await _on_api_end(api_kwargs, response) usage = { **response.usage.model_dump(), # type: ignore "response_cost": response._hidden_params.get("response_cost", 0.0), } if _on_usage: await _on_usage(usage) # Step 4: Convert completion messages back to responses items format response_dict = response.model_dump() # type: ignore choice_messages = [choice["message"] for choice in response_dict["choices"]] thinking_output_items: List[Dict[str, Any]] = [] for choice_message in choice_messages: thinking_output_items.extend( convert_completion_messages_to_responses_items([choice_message]) ) # Step 5: Use Moondream to get coordinates for each description element_descriptions = get_all_element_descriptions(thinking_output_items) if element_descriptions and last_image_b64: for desc in element_descriptions: for _ in range(3): # try 3 times coords = await self.predict_click( model=model, image_b64=last_image_b64, instruction=desc, ) if coords: self.desc2xy[desc] = coords break # Step 6: Convert computer calls from descriptions back to xy coordinates final_output_items = convert_computer_calls_desc2xy(thinking_output_items, self.desc2xy) # Step 7: Return output and usage return {"output": pre_output_items + final_output_items, "usage": usage} async def predict_click( self, model: str, image_b64: str, instruction: str, **kwargs, ) -> Optional[Tuple[float, float]]: """Predict click coordinates using Moondream3's point API. Returns pixel coordinates (x, y) as floats. 
""" img = _decode_image_b64(image_b64) W, H = img.width, img.height model_md = get_moondream_model() try: result = model_md.point(img, instruction, settings={"max_objects": 1}) except Exception: return None try: pt = (result or {}).get("points", [])[0] x_norm = float(pt.get("x", 0.0)) y_norm = float(pt.get("y", 0.0)) x_px = max(0.0, min(float(W - 1), x_norm * W)) y_px = max(0.0, min(float(H - 1), y_norm * H)) return (x_px, y_px) except Exception: return None def get_capabilities(self) -> List[AgentCapability]: return ["click", "step"] ``` -------------------------------------------------------------------------------- /libs/typescript/computer/src/interface/macos.ts: -------------------------------------------------------------------------------- ```typescript /** * macOS computer interface implementation. */ import type { ScreenSize } from '../types'; import { BaseComputerInterface } from './base'; import type { AccessibilityNode, CursorPosition, MouseButton } from './base'; export class MacOSComputerInterface extends BaseComputerInterface { // Mouse Actions /** * Press and hold a mouse button at the specified coordinates. * @param {number} [x] - X coordinate for the mouse action * @param {number} [y] - Y coordinate for the mouse action * @param {MouseButton} [button='left'] - Mouse button to press down * @returns {Promise<void>} */ async mouseDown( x?: number, y?: number, button: MouseButton = 'left' ): Promise<void> { await this.sendCommand('mouse_down', { x, y, button }); } /** * Release a mouse button at the specified coordinates. * @param {number} [x] - X coordinate for the mouse action * @param {number} [y] - Y coordinate for the mouse action * @param {MouseButton} [button='left'] - Mouse button to release * @returns {Promise<void>} */ async mouseUp( x?: number, y?: number, button: MouseButton = 'left' ): Promise<void> { await this.sendCommand('mouse_up', { x, y, button }); } /** * Perform a left mouse click at the specified coordinates. * @param {number} [x] - X coordinate for the click * @param {number} [y] - Y coordinate for the click * @returns {Promise<void>} */ async leftClick(x?: number, y?: number): Promise<void> { await this.sendCommand('left_click', { x, y }); } /** * Perform a right mouse click at the specified coordinates. * @param {number} [x] - X coordinate for the click * @param {number} [y] - Y coordinate for the click * @returns {Promise<void>} */ async rightClick(x?: number, y?: number): Promise<void> { await this.sendCommand('right_click', { x, y }); } /** * Perform a double click at the specified coordinates. * @param {number} [x] - X coordinate for the double click * @param {number} [y] - Y coordinate for the double click * @returns {Promise<void>} */ async doubleClick(x?: number, y?: number): Promise<void> { await this.sendCommand('double_click', { x, y }); } /** * Move the cursor to the specified coordinates. * @param {number} x - X coordinate to move to * @param {number} y - Y coordinate to move to * @returns {Promise<void>} */ async moveCursor(x: number, y: number): Promise<void> { await this.sendCommand('move_cursor', { x, y }); } /** * Drag from current position to the specified coordinates. 
* @param {number} x - X coordinate to drag to * @param {number} y - Y coordinate to drag to * @param {MouseButton} [button='left'] - Mouse button to use for dragging * @param {number} [duration=0.5] - Duration of the drag operation in seconds * @returns {Promise<void>} */ async dragTo( x: number, y: number, button: MouseButton = 'left', duration = 0.5 ): Promise<void> { await this.sendCommand('drag_to', { x, y, button, duration }); } /** * Drag along a path of coordinates. * @param {Array<[number, number]>} path - Array of [x, y] coordinate pairs to drag through * @param {MouseButton} [button='left'] - Mouse button to use for dragging * @param {number} [duration=0.5] - Duration of the drag operation in seconds * @returns {Promise<void>} */ async drag( path: Array<[number, number]>, button: MouseButton = 'left', duration = 0.5 ): Promise<void> { await this.sendCommand('drag', { path, button, duration }); } // Keyboard Actions /** * Press and hold a key. * @param {string} key - Key to press down * @returns {Promise<void>} */ async keyDown(key: string): Promise<void> { await this.sendCommand('key_down', { key }); } /** * Release a key. * @param {string} key - Key to release * @returns {Promise<void>} */ async keyUp(key: string): Promise<void> { await this.sendCommand('key_up', { key }); } /** * Type text as if entered from keyboard. * @param {string} text - Text to type * @returns {Promise<void>} */ async typeText(text: string): Promise<void> { await this.sendCommand('type_text', { text }); } /** * Press and release a key. * @param {string} key - Key to press * @returns {Promise<void>} */ async pressKey(key: string): Promise<void> { await this.sendCommand('press_key', { key }); } /** * Press multiple keys simultaneously as a hotkey combination. * @param {...string} keys - Keys to press together * @returns {Promise<void>} */ async hotkey(...keys: string[]): Promise<void> { await this.sendCommand('hotkey', { keys }); } // Scrolling Actions /** * Scroll by the specified amount in x and y directions. * @param {number} x - Horizontal scroll amount * @param {number} y - Vertical scroll amount * @returns {Promise<void>} */ async scroll(x: number, y: number): Promise<void> { await this.sendCommand('scroll', { x, y }); } /** * Scroll down by the specified number of clicks. * @param {number} [clicks=1] - Number of scroll clicks * @returns {Promise<void>} */ async scrollDown(clicks = 1): Promise<void> { await this.sendCommand('scroll_down', { clicks }); } /** * Scroll up by the specified number of clicks. * @param {number} [clicks=1] - Number of scroll clicks * @returns {Promise<void>} */ async scrollUp(clicks = 1): Promise<void> { await this.sendCommand('scroll_up', { clicks }); } // Screen Actions /** * Take a screenshot of the screen. * @returns {Promise<Buffer>} Screenshot image data as a Buffer * @throws {Error} If screenshot fails */ async screenshot(): Promise<Buffer> { const response = await this.sendCommand('screenshot'); if (!response.image_data) { throw new Error('Failed to take screenshot'); } return Buffer.from(response.image_data as string, 'base64'); } /** * Get the current screen size. * @returns {Promise<ScreenSize>} Screen dimensions * @throws {Error} If unable to get screen size */ async getScreenSize(): Promise<ScreenSize> { const response = await this.sendCommand('get_screen_size'); if (!response.success || !response.size) { throw new Error('Failed to get screen size'); } return response.size as ScreenSize; } /** * Get the current cursor position. 
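   *
   * Illustrative usage (`iface` is a placeholder): `const pos = await iface.getCursorPosition();`
   * resolves with the position reported by the `get_cursor_position` command.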
* @returns {Promise<CursorPosition>} Current cursor coordinates * @throws {Error} If unable to get cursor position */ async getCursorPosition(): Promise<CursorPosition> { const response = await this.sendCommand('get_cursor_position'); if (!response.success || !response.position) { throw new Error('Failed to get cursor position'); } return response.position as CursorPosition; } // Clipboard Actions /** * Copy current selection to clipboard and return the content. * @returns {Promise<string>} Clipboard content * @throws {Error} If unable to get clipboard content */ async copyToClipboard(): Promise<string> { const response = await this.sendCommand('copy_to_clipboard'); if (!response.success || !response.content) { throw new Error('Failed to get clipboard content'); } return response.content as string; } /** * Set the clipboard content to the specified text. * @param {string} text - Text to set in clipboard * @returns {Promise<void>} */ async setClipboard(text: string): Promise<void> { await this.sendCommand('set_clipboard', { text }); } // File System Actions /** * Check if a file exists at the specified path. * @param {string} path - Path to the file * @returns {Promise<boolean>} True if file exists, false otherwise */ async fileExists(path: string): Promise<boolean> { const response = await this.sendCommand('file_exists', { path }); return (response.exists as boolean) || false; } /** * Check if a directory exists at the specified path. * @param {string} path - Path to the directory * @returns {Promise<boolean>} True if directory exists, false otherwise */ async directoryExists(path: string): Promise<boolean> { const response = await this.sendCommand('directory_exists', { path }); return (response.exists as boolean) || false; } /** * List the contents of a directory. * @param {string} path - Path to the directory * @returns {Promise<string[]>} Array of file and directory names * @throws {Error} If unable to list directory */ async listDir(path: string): Promise<string[]> { const response = await this.sendCommand('list_dir', { path }); if (!response.success) { throw new Error((response.error as string) || 'Failed to list directory'); } return (response.files as string[]) || []; } /** * Get the size of a file in bytes. * @param {string} path - Path to the file * @returns {Promise<number>} File size in bytes * @throws {Error} If unable to get file size */ async getFileSize(path: string): Promise<number> { const response = await this.sendCommand('get_file_size', { path }); if (!response.success) { throw new Error((response.error as string) || 'Failed to get file size'); } return (response.size as number) || 0; } /** * Read file content in chunks for large files. 
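   *
   * Descriptive note: the content is fetched with repeated `read_bytes` commands of at
   * most `chunkSize` (1 MiB by default) bytes each, advancing the offset until
   * `totalLength` bytes have been received, then concatenated into a single Buffer.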
* @private * @param {string} path - Path to the file * @param {number} offset - Starting byte offset * @param {number} totalLength - Total number of bytes to read * @param {number} [chunkSize=1048576] - Size of each chunk in bytes * @returns {Promise<Buffer>} File content as Buffer * @throws {Error} If unable to read file chunk */ private async readBytesChunked( path: string, offset: number, totalLength: number, chunkSize: number = 1024 * 1024 ): Promise<Buffer> { const chunks: Buffer[] = []; let currentOffset = offset; let remaining = totalLength; while (remaining > 0) { const readSize = Math.min(chunkSize, remaining); const response = await this.sendCommand('read_bytes', { path, offset: currentOffset, length: readSize, }); if (!response.success) { throw new Error( (response.error as string) || 'Failed to read file chunk' ); } const chunkData = Buffer.from(response.content_b64 as string, 'base64'); chunks.push(chunkData); currentOffset += readSize; remaining -= readSize; } return Buffer.concat(chunks); } /** * Write file content in chunks for large files. * @private * @param {string} path - Path to the file * @param {Buffer} content - Content to write * @param {boolean} [append=false] - Whether to append to existing file * @param {number} [chunkSize=1048576] - Size of each chunk in bytes * @returns {Promise<void>} * @throws {Error} If unable to write file chunk */ private async writeBytesChunked( path: string, content: Buffer, append: boolean = false, chunkSize: number = 1024 * 1024 ): Promise<void> { const totalSize = content.length; let currentOffset = 0; while (currentOffset < totalSize) { const chunkEnd = Math.min(currentOffset + chunkSize, totalSize); const chunkData = content.subarray(currentOffset, chunkEnd); // First chunk uses the original append flag, subsequent chunks always append const chunkAppend = currentOffset === 0 ? append : true; const response = await this.sendCommand('write_bytes', { path, content_b64: chunkData.toString('base64'), append: chunkAppend, }); if (!response.success) { throw new Error( (response.error as string) || 'Failed to write file chunk' ); } currentOffset = chunkEnd; } } /** * Read text from a file with specified encoding. * @param {string} path - Path to the file to read * @param {BufferEncoding} [encoding='utf8'] - Text encoding to use * @returns {Promise<string>} The decoded text content of the file */ async readText(path: string, encoding: BufferEncoding = 'utf8'): Promise<string> { const contentBytes = await this.readBytes(path); return contentBytes.toString(encoding); } /** * Write text to a file with specified encoding. * @param {string} path - Path to the file to write * @param {string} content - Text content to write * @param {BufferEncoding} [encoding='utf8'] - Text encoding to use * @param {boolean} [append=false] - Whether to append to the file instead of overwriting * @returns {Promise<void>} */ async writeText( path: string, content: string, encoding: BufferEncoding = 'utf8', append: boolean = false ): Promise<void> { const contentBytes = Buffer.from(content, encoding); await this.writeBytes(path, contentBytes, append); } /** * Read bytes from a file, with optional offset and length. 
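   *
   * Descriptive note: when no explicit length is given and the file exceeds 5 MB, the
   * read transparently switches to the chunked path above. Illustrative usage
   * (placeholder path): `const data = await iface.readBytes('/tmp/example.bin');`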
* @param {string} path - Path to the file * @param {number} [offset=0] - Starting byte offset * @param {number} [length] - Number of bytes to read (reads entire file if not specified) * @returns {Promise<Buffer>} File content as Buffer * @throws {Error} If unable to read file */ async readBytes(path: string, offset: number = 0, length?: number): Promise<Buffer> { // For large files, use chunked reading if (length === undefined) { // Get file size first to determine if we need chunking const fileSize = await this.getFileSize(path); // If file is larger than 5MB, read in chunks if (fileSize > 5 * 1024 * 1024) { const readLength = offset > 0 ? fileSize - offset : fileSize; return await this.readBytesChunked(path, offset, readLength); } } const response = await this.sendCommand('read_bytes', { path, offset, length, }); if (!response.success) { throw new Error((response.error as string) || 'Failed to read file'); } return Buffer.from(response.content_b64 as string, 'base64'); } /** * Write bytes to a file. * @param {string} path - Path to the file * @param {Buffer} content - Content to write as Buffer * @param {boolean} [append=false] - Whether to append to existing file * @returns {Promise<void>} * @throws {Error} If unable to write file */ async writeBytes(path: string, content: Buffer, append: boolean = false): Promise<void> { // For large files, use chunked writing if (content.length > 5 * 1024 * 1024) { // 5MB threshold await this.writeBytesChunked(path, content, append); return; } const response = await this.sendCommand('write_bytes', { path, content_b64: content.toString('base64'), append, }); if (!response.success) { throw new Error((response.error as string) || 'Failed to write file'); } } /** * Delete a file at the specified path. * @param {string} path - Path to the file to delete * @returns {Promise<void>} * @throws {Error} If unable to delete file */ async deleteFile(path: string): Promise<void> { const response = await this.sendCommand('delete_file', { path }); if (!response.success) { throw new Error((response.error as string) || 'Failed to delete file'); } } /** * Create a directory at the specified path. * @param {string} path - Path where to create the directory * @returns {Promise<void>} * @throws {Error} If unable to create directory */ async createDir(path: string): Promise<void> { const response = await this.sendCommand('create_dir', { path }); if (!response.success) { throw new Error( (response.error as string) || 'Failed to create directory' ); } } /** * Delete a directory at the specified path. * @param {string} path - Path to the directory to delete * @returns {Promise<void>} * @throws {Error} If unable to delete directory */ async deleteDir(path: string): Promise<void> { const response = await this.sendCommand('delete_dir', { path }); if (!response.success) { throw new Error( (response.error as string) || 'Failed to delete directory' ); } } /** * Execute a shell command and return stdout and stderr. * @param {string} command - Command to execute * @returns {Promise<[string, string]>} Tuple of [stdout, stderr] * @throws {Error} If command execution fails */ async runCommand(command: string): Promise<[string, string]> { const response = await this.sendCommand('run_command', { command }); if (!response.success) { throw new Error((response.error as string) || 'Failed to run command'); } return [ (response.stdout as string) || '', (response.stderr as string) || '', ]; } // Accessibility Actions /** * Get the accessibility tree of the current screen. 
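   *
   * Illustrative usage (`iface` is a placeholder): `const tree = await iface.getAccessibilityTree();`
   * returns the raw `get_accessibility_tree` response cast to the root node type.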
* @returns {Promise<AccessibilityNode>} Root accessibility node * @throws {Error} If unable to get accessibility tree */ async getAccessibilityTree(): Promise<AccessibilityNode> { const response = await this.sendCommand('get_accessibility_tree'); if (!response.success) { throw new Error( (response.error as string) || 'Failed to get accessibility tree' ); } return response as unknown as AccessibilityNode; } /** * Convert coordinates to screen coordinates. * @param {number} x - X coordinate to convert * @param {number} y - Y coordinate to convert * @returns {Promise<[number, number]>} Converted screen coordinates as [x, y] * @throws {Error} If coordinate conversion fails */ async toScreenCoordinates(x: number, y: number): Promise<[number, number]> { const response = await this.sendCommand('to_screen_coordinates', { x, y }); if (!response.success || !response.coordinates) { throw new Error('Failed to convert to screen coordinates'); } return response.coordinates as [number, number]; } /** * Convert coordinates to screenshot coordinates. * @param {number} x - X coordinate to convert * @param {number} y - Y coordinate to convert * @returns {Promise<[number, number]>} Converted screenshot coordinates as [x, y] * @throws {Error} If coordinate conversion fails */ async toScreenshotCoordinates( x: number, y: number ): Promise<[number, number]> { const response = await this.sendCommand('to_screenshot_coordinates', { x, y, }); if (!response.success || !response.coordinates) { throw new Error('Failed to convert to screenshot coordinates'); } return response.coordinates as [number, number]; } } ``` -------------------------------------------------------------------------------- /libs/lume/src/Virtualization/VMVirtualizationService.swift: -------------------------------------------------------------------------------- ```swift import Foundation import Virtualization /// Framework-agnostic VM configuration struct VMVirtualizationServiceContext { let cpuCount: Int let memorySize: UInt64 let display: String let sharedDirectories: [SharedDirectory]? let mount: Path? let hardwareModel: Data? let machineIdentifier: Data? let macAddress: String let diskPath: Path let nvramPath: Path let recoveryMode: Bool let usbMassStoragePaths: [Path]? 
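    // Illustrative note: `display` is a resolution string (for example "1024x768") that is
    // parsed by VMDisplayResolution further below; `hardwareModel` and `machineIdentifier`
    // carry the raw data representations of the corresponding VZMac* types and may be nil
    // for Linux guests, which boot through EFI instead.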
} /// Protocol defining the interface for virtualization operations @MainActor protocol VMVirtualizationService { var state: VZVirtualMachine.State { get } func start() async throws func stop() async throws func pause() async throws func resume() async throws func getVirtualMachine() -> Any } /// Base implementation of VMVirtualizationService using VZVirtualMachine @MainActor class BaseVirtualizationService: VMVirtualizationService { let virtualMachine: VZVirtualMachine let recoveryMode: Bool // Store whether we should start in recovery mode var state: VZVirtualMachine.State { virtualMachine.state } init(virtualMachine: VZVirtualMachine, recoveryMode: Bool = false) { self.virtualMachine = virtualMachine self.recoveryMode = recoveryMode } func start() async throws { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in Task { @MainActor in if #available(macOS 13, *) { let startOptions = VZMacOSVirtualMachineStartOptions() startOptions.startUpFromMacOSRecovery = recoveryMode if recoveryMode { Logger.info("Starting VM in recovery mode") } virtualMachine.start(options: startOptions) { error in if let error = error { continuation.resume(throwing: error) } else { continuation.resume() } } } else { Logger.info("Starting VM in normal mode") virtualMachine.start { result in switch result { case .success: continuation.resume() case .failure(let error): continuation.resume(throwing: error) } } } } } } func stop() async throws { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in virtualMachine.stop { error in if let error = error { continuation.resume(throwing: error) } else { continuation.resume() } } } } func pause() async throws { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in virtualMachine.start { result in switch result { case .success: continuation.resume() case .failure(let error): continuation.resume(throwing: error) } } } } func resume() async throws { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in virtualMachine.start { result in switch result { case .success: continuation.resume() case .failure(let error): continuation.resume(throwing: error) } } } } func getVirtualMachine() -> Any { return virtualMachine } // Helper methods for creating common configurations static func createStorageDeviceConfiguration(diskPath: Path, readOnly: Bool = false) throws -> VZStorageDeviceConfiguration { return VZVirtioBlockDeviceConfiguration( attachment: try VZDiskImageStorageDeviceAttachment( url: diskPath.url, readOnly: readOnly, cachingMode: VZDiskImageCachingMode.automatic, synchronizationMode: VZDiskImageSynchronizationMode.fsync ) ) } static func createUSBMassStorageDeviceConfiguration(diskPath: Path, readOnly: Bool = false) throws -> VZStorageDeviceConfiguration { if #available(macOS 15.0, *) { return VZUSBMassStorageDeviceConfiguration( attachment: try VZDiskImageStorageDeviceAttachment( url: diskPath.url, readOnly: readOnly, cachingMode: VZDiskImageCachingMode.automatic, synchronizationMode: VZDiskImageSynchronizationMode.fsync ) ) } else { // Fallback to normal storage device if USB mass storage not available return try createStorageDeviceConfiguration(diskPath: diskPath, readOnly: readOnly) } } static func createNetworkDeviceConfiguration(macAddress: String) throws -> VZNetworkDeviceConfiguration { let network = VZVirtioNetworkDeviceConfiguration() guard let vzMacAddress = VZMACAddress(string: macAddress) else { 
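            // Descriptive note: an unparseable MAC address string lands here and is currently
            // reported using the `invalidMachineIdentifier` error case.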
throw VMConfigError.invalidMachineIdentifier } network.attachment = VZNATNetworkDeviceAttachment() network.macAddress = vzMacAddress return network } static func createDirectorySharingDevices(sharedDirectories: [SharedDirectory]?) -> [VZDirectorySharingDeviceConfiguration] { return sharedDirectories?.map { sharedDir in let device = VZVirtioFileSystemDeviceConfiguration(tag: sharedDir.tag) let url = URL(fileURLWithPath: sharedDir.hostPath) device.share = VZSingleDirectoryShare( directory: VZSharedDirectory(url: url, readOnly: sharedDir.readOnly)) return device } ?? [] } } /// macOS-specific virtualization service @MainActor final class DarwinVirtualizationService: BaseVirtualizationService { static func createConfiguration(_ config: VMVirtualizationServiceContext) throws -> VZVirtualMachineConfiguration { let vzConfig = VZVirtualMachineConfiguration() vzConfig.cpuCount = config.cpuCount vzConfig.memorySize = config.memorySize // Platform configuration guard let machineIdentifier = config.machineIdentifier else { throw VMConfigError.emptyMachineIdentifier } guard let hardwareModel = config.hardwareModel else { throw VMConfigError.emptyHardwareModel } let platform = VZMacPlatformConfiguration() platform.auxiliaryStorage = VZMacAuxiliaryStorage(url: config.nvramPath.url) Logger.info("Pre-VZMacHardwareModel: hardwareModel=\(hardwareModel)") guard let vzHardwareModel = VZMacHardwareModel(dataRepresentation: hardwareModel) else { throw VMConfigError.invalidHardwareModel } platform.hardwareModel = vzHardwareModel guard let vzMachineIdentifier = VZMacMachineIdentifier(dataRepresentation: machineIdentifier) else { throw VMConfigError.invalidMachineIdentifier } platform.machineIdentifier = vzMachineIdentifier vzConfig.platform = platform vzConfig.bootLoader = VZMacOSBootLoader() // Graphics configuration let display = VMDisplayResolution(string: config.display)! 
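        // Descriptive note: the display string (illustratively "1920x1080") is force-unwrapped,
        // so an unparseable resolution crashes here rather than throwing a VMConfigError.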
let graphics = VZMacGraphicsDeviceConfiguration() graphics.displays = [ VZMacGraphicsDisplayConfiguration( widthInPixels: display.width, heightInPixels: display.height, pixelsPerInch: 220 // Retina display density ) ] vzConfig.graphicsDevices = [graphics] // Common configurations vzConfig.keyboards = [VZUSBKeyboardConfiguration()] vzConfig.pointingDevices = [VZUSBScreenCoordinatePointingDeviceConfiguration()] var storageDevices = [try createStorageDeviceConfiguration(diskPath: config.diskPath)] if let mount = config.mount { storageDevices.append( try createStorageDeviceConfiguration(diskPath: mount, readOnly: true)) } // Add USB mass storage devices if specified if #available(macOS 15.0, *), let usbPaths = config.usbMassStoragePaths, !usbPaths.isEmpty { for usbPath in usbPaths { storageDevices.append( try createUSBMassStorageDeviceConfiguration(diskPath: usbPath, readOnly: true)) } } vzConfig.storageDevices = storageDevices vzConfig.networkDevices = [ try createNetworkDeviceConfiguration(macAddress: config.macAddress) ] vzConfig.memoryBalloonDevices = [VZVirtioTraditionalMemoryBalloonDeviceConfiguration()] vzConfig.entropyDevices = [VZVirtioEntropyDeviceConfiguration()] // Audio configuration let soundDeviceConfiguration = VZVirtioSoundDeviceConfiguration() let inputAudioStreamConfiguration = VZVirtioSoundDeviceInputStreamConfiguration() let outputAudioStreamConfiguration = VZVirtioSoundDeviceOutputStreamConfiguration() inputAudioStreamConfiguration.source = VZHostAudioInputStreamSource() outputAudioStreamConfiguration.sink = VZHostAudioOutputStreamSink() soundDeviceConfiguration.streams = [inputAudioStreamConfiguration, outputAudioStreamConfiguration] vzConfig.audioDevices = [soundDeviceConfiguration] // Clipboard sharing via Spice agent let spiceAgentConsoleDevice = VZVirtioConsoleDeviceConfiguration() let spiceAgentPort = VZVirtioConsolePortConfiguration() spiceAgentPort.name = VZSpiceAgentPortAttachment.spiceAgentPortName let spiceAgentPortAttachment = VZSpiceAgentPortAttachment() spiceAgentPortAttachment.sharesClipboard = true spiceAgentPort.attachment = spiceAgentPortAttachment spiceAgentConsoleDevice.ports[0] = spiceAgentPort vzConfig.consoleDevices.append(spiceAgentConsoleDevice) // Directory sharing let directorySharingDevices = createDirectorySharingDevices( sharedDirectories: config.sharedDirectories) if !directorySharingDevices.isEmpty { vzConfig.directorySharingDevices = directorySharingDevices } // USB Controller configuration if #available(macOS 15.0, *) { let usbControllerConfiguration = VZXHCIControllerConfiguration() vzConfig.usbControllers = [usbControllerConfiguration] } try vzConfig.validate() return vzConfig } static func generateMacAddress() -> String { VZMACAddress.randomLocallyAdministered().string } static func generateMachineIdentifier() -> Data { VZMacMachineIdentifier().dataRepresentation } func createAuxiliaryStorage(at path: Path, hardwareModel: Data) throws { guard let vzHardwareModel = VZMacHardwareModel(dataRepresentation: hardwareModel) else { throw VMConfigError.invalidHardwareModel } _ = try VZMacAuxiliaryStorage(creatingStorageAt: path.url, hardwareModel: vzHardwareModel) } init(configuration: VMVirtualizationServiceContext) throws { let vzConfig = try Self.createConfiguration(configuration) super.init( virtualMachine: VZVirtualMachine(configuration: vzConfig), recoveryMode: configuration.recoveryMode) } func installMacOS(imagePath: Path, progressHandler: (@Sendable (Double) -> Void)?) 
async throws { var observers: [NSKeyValueObservation] = [] // must hold observer references during installation to print process try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in Task { let installer = VZMacOSInstaller( virtualMachine: virtualMachine, restoringFromImageAt: imagePath.url) Logger.info("Starting macOS installation") if let progressHandler = progressHandler { let observer = installer.progress.observe( \.fractionCompleted, options: [.initial, .new] ) { (progress, change) in if let newValue = change.newValue { progressHandler(newValue) } } observers.append(observer) } installer.install { result in switch result { case .success: continuation.resume() case .failure(let error): Logger.error("Failed to install, error=\(error))") continuation.resume(throwing: error) } } } } Logger.info("macOS installation finished") } } /// Linux-specific virtualization service @MainActor final class LinuxVirtualizationService: BaseVirtualizationService { static func createConfiguration(_ config: VMVirtualizationServiceContext) throws -> VZVirtualMachineConfiguration { let vzConfig = VZVirtualMachineConfiguration() vzConfig.cpuCount = config.cpuCount vzConfig.memorySize = config.memorySize // Platform configuration let platform = VZGenericPlatformConfiguration() if #available(macOS 15, *) { platform.isNestedVirtualizationEnabled = VZGenericPlatformConfiguration.isNestedVirtualizationSupported } vzConfig.platform = platform let bootLoader = VZEFIBootLoader() bootLoader.variableStore = VZEFIVariableStore(url: config.nvramPath.url) vzConfig.bootLoader = bootLoader // Graphics configuration let display = VMDisplayResolution(string: config.display)! let graphics = VZVirtioGraphicsDeviceConfiguration() graphics.scanouts = [ VZVirtioGraphicsScanoutConfiguration( widthInPixels: display.width, heightInPixels: display.height ) ] vzConfig.graphicsDevices = [graphics] // Common configurations vzConfig.keyboards = [VZUSBKeyboardConfiguration()] vzConfig.pointingDevices = [VZUSBScreenCoordinatePointingDeviceConfiguration()] var storageDevices = [try createStorageDeviceConfiguration(diskPath: config.diskPath)] if let mount = config.mount { storageDevices.append( try createStorageDeviceConfiguration(diskPath: mount, readOnly: true)) } // Add USB mass storage devices if specified if #available(macOS 15.0, *), let usbPaths = config.usbMassStoragePaths, !usbPaths.isEmpty { for usbPath in usbPaths { storageDevices.append( try createUSBMassStorageDeviceConfiguration(diskPath: usbPath, readOnly: true)) } } vzConfig.storageDevices = storageDevices vzConfig.networkDevices = [ try createNetworkDeviceConfiguration(macAddress: config.macAddress) ] vzConfig.memoryBalloonDevices = [VZVirtioTraditionalMemoryBalloonDeviceConfiguration()] vzConfig.entropyDevices = [VZVirtioEntropyDeviceConfiguration()] // Audio configuration let soundDeviceConfiguration = VZVirtioSoundDeviceConfiguration() let inputAudioStreamConfiguration = VZVirtioSoundDeviceInputStreamConfiguration() let outputAudioStreamConfiguration = VZVirtioSoundDeviceOutputStreamConfiguration() inputAudioStreamConfiguration.source = VZHostAudioInputStreamSource() outputAudioStreamConfiguration.sink = VZHostAudioOutputStreamSink() soundDeviceConfiguration.streams = [inputAudioStreamConfiguration, outputAudioStreamConfiguration] vzConfig.audioDevices = [soundDeviceConfiguration] // Clipboard sharing via Spice agent let spiceAgentConsoleDevice = VZVirtioConsoleDeviceConfiguration() let spiceAgentPort = 
VZVirtioConsolePortConfiguration() spiceAgentPort.name = VZSpiceAgentPortAttachment.spiceAgentPortName let spiceAgentPortAttachment = VZSpiceAgentPortAttachment() spiceAgentPortAttachment.sharesClipboard = true spiceAgentPort.attachment = spiceAgentPortAttachment spiceAgentConsoleDevice.ports[0] = spiceAgentPort vzConfig.consoleDevices.append(spiceAgentConsoleDevice) // Directory sharing var directorySharingDevices = createDirectorySharingDevices( sharedDirectories: config.sharedDirectories) // Add Rosetta support if available if #available(macOS 13.0, *) { if VZLinuxRosettaDirectoryShare.availability == .installed { do { let rosettaShare = try VZLinuxRosettaDirectoryShare() let rosettaDevice = VZVirtioFileSystemDeviceConfiguration(tag: "rosetta") rosettaDevice.share = rosettaShare directorySharingDevices.append(rosettaDevice) Logger.info("Added Rosetta support to Linux VM") } catch { Logger.info("Failed to add Rosetta support: \(error.localizedDescription)") } } else { Logger.info("Rosetta not installed, skipping Rosetta support") } } if !directorySharingDevices.isEmpty { vzConfig.directorySharingDevices = directorySharingDevices } // USB Controller configuration if #available(macOS 15.0, *) { let usbControllerConfiguration = VZXHCIControllerConfiguration() vzConfig.usbControllers = [usbControllerConfiguration] } try vzConfig.validate() return vzConfig } func generateMacAddress() -> String { VZMACAddress.randomLocallyAdministered().string } func createNVRAM(at path: Path) throws { _ = try VZEFIVariableStore(creatingVariableStoreAt: path.url) } init(configuration: VMVirtualizationServiceContext) throws { let vzConfig = try Self.createConfiguration(configuration) super.init(virtualMachine: VZVirtualMachine(configuration: vzConfig)) } } ``` -------------------------------------------------------------------------------- /libs/python/computer/computer/providers/lume_api.py: -------------------------------------------------------------------------------- ```python """Shared API utilities for Lume and Lumier providers. This module contains shared functions for interacting with the Lume API, used by both the LumeProvider and LumierProvider classes. """ import logging import json import subprocess import urllib.parse from typing import Dict, List, Optional, Any # Setup logging logger = logging.getLogger(__name__) # Check if curl is available try: subprocess.run(["curl", "--version"], capture_output=True, check=True) HAS_CURL = True except (subprocess.SubprocessError, FileNotFoundError): HAS_CURL = False def lume_api_get( vm_name: str, host: str, port: int, storage: Optional[str] = None, debug: bool = False, verbose: bool = False ) -> Dict[str, Any]: """Use curl to get VM information from Lume API. 
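    Illustrative request (host, port, VM name and storage are placeholders): with
    storage="/Volumes/My Disk" the constructed URL is
    http://<host>:<port>/lume/vms/my-vm?storage=%2FVolumes%2FMy%20Disk, i.e. the
    storage path is percent-encoded with no characters treated as safe.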
Args: vm_name: Name of the VM to get info for host: API host port: API port storage: Storage path for the VM debug: Whether to show debug output verbose: Enable verbose logging Returns: Dictionary with VM status information parsed from JSON response """ # URL encode the storage parameter for the query encoded_storage = "" storage_param = "" if storage: # First encode the storage path properly encoded_storage = urllib.parse.quote(storage, safe='') storage_param = f"?storage={encoded_storage}" # Construct API URL with encoded storage parameter if needed api_url = f"http://{host}:{port}/lume/vms/{vm_name}{storage_param}" # Construct the curl command with increased timeouts for more reliability # --connect-timeout: Time to establish connection (15 seconds) # --max-time: Maximum time for the whole operation (20 seconds) # -f: Fail silently (no output at all) on server errors # Add single quotes around URL to ensure special characters are handled correctly cmd = ["curl", "--connect-timeout", "15", "--max-time", "20", "-s", "-f", f"'{api_url}'"] # For logging and display, show the properly escaped URL display_cmd = ["curl", "--connect-timeout", "15", "--max-time", "20", "-s", "-f", api_url] # Only print the curl command when debug is enabled display_curl_string = ' '.join(display_cmd) logger.debug(f"Executing API request: {display_curl_string}") # Execute the command - for execution we need to use shell=True to handle URLs with special characters try: # Use a single string with shell=True for proper URL handling shell_cmd = ' '.join(cmd) result = subprocess.run(shell_cmd, shell=True, capture_output=True, text=True) # Handle curl exit codes if result.returncode != 0: curl_error = "Unknown error" # Map common curl error codes to helpful messages if result.returncode == 7: curl_error = "Failed to connect to the API server - it might still be starting up" elif result.returncode == 22: curl_error = "HTTP error returned from API server" elif result.returncode == 28: curl_error = "Operation timeout - the API server is taking too long to respond" elif result.returncode == 52: curl_error = "Empty reply from server - the API server is starting but not fully ready yet" elif result.returncode == 56: curl_error = "Network problem during data transfer - check container networking" # Only log at debug level to reduce noise during retries logger.debug(f"API request failed with code {result.returncode}: {curl_error}") # Return a more useful error message return { "error": f"API request failed: {curl_error}", "curl_code": result.returncode, "vm_name": vm_name, "status": "unknown" # We don't know the actual status due to API error } # Try to parse the response as JSON if result.stdout and result.stdout.strip(): try: vm_status = json.loads(result.stdout) if debug or verbose: logger.info(f"Successfully parsed VM status: {vm_status.get('status', 'unknown')}") return vm_status except json.JSONDecodeError as e: # Return the raw response if it's not valid JSON logger.warning(f"Invalid JSON response: {e}") if "Virtual machine not found" in result.stdout: return {"status": "not_found", "message": "VM not found in Lume API"} return {"error": f"Invalid JSON response: {result.stdout[:100]}...", "status": "unknown"} else: return {"error": "Empty response from API", "status": "unknown"} except subprocess.SubprocessError as e: logger.error(f"Failed to execute API request: {e}") return {"error": f"Failed to execute API request: {str(e)}", "status": "unknown"} def lume_api_run( vm_name: str, host: str, port: int, run_opts: 
Dict[str, Any], storage: Optional[str] = None, debug: bool = False, verbose: bool = False ) -> Dict[str, Any]: """Run a VM using curl. Args: vm_name: Name of the VM to run host: API host port: API port run_opts: Dictionary of run options storage: Storage path for the VM debug: Whether to show debug output verbose: Enable verbose logging Returns: Dictionary with API response or error information """ # Construct API URL api_url = f"http://{host}:{port}/lume/vms/{vm_name}/run" # Prepare JSON payload with required parameters payload = {} # Add CPU cores if specified if "cpu" in run_opts: payload["cpu"] = run_opts["cpu"] # Add memory if specified if "memory" in run_opts: payload["memory"] = run_opts["memory"] # Add storage parameter if specified if storage: payload["storage"] = storage elif "storage" in run_opts: payload["storage"] = run_opts["storage"] # Add shared directories if specified if "shared_directories" in run_opts and run_opts["shared_directories"]: payload["sharedDirectories"] = run_opts["shared_directories"] # Log the payload for debugging logger.debug(f"API payload: {json.dumps(payload, indent=2)}") # Construct the curl command cmd = [ "curl", "--connect-timeout", "30", "--max-time", "30", "-s", "-X", "POST", "-H", "Content-Type: application/json", "-d", json.dumps(payload), api_url ] # Execute the command try: result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: logger.warning(f"API request failed with code {result.returncode}: {result.stderr}") return {"error": f"API request failed: {result.stderr}"} # Try to parse the response as JSON if result.stdout and result.stdout.strip(): try: response = json.loads(result.stdout) return response except json.JSONDecodeError: # Return the raw response if it's not valid JSON return {"success": True, "message": "VM started successfully", "raw_response": result.stdout} else: return {"success": True, "message": "VM started successfully"} except subprocess.SubprocessError as e: logger.error(f"Failed to execute run request: {e}") return {"error": f"Failed to execute run request: {str(e)}"} def lume_api_stop( vm_name: str, host: str, port: int, storage: Optional[str] = None, debug: bool = False, verbose: bool = False ) -> Dict[str, Any]: """Stop a VM using curl. 
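    Example (illustrative; assumes a Lume API server on localhost:7777 and a
    running VM named "macos-vm", both placeholders):

        result = lume_api_stop("macos-vm", host="localhost", port=7777)
        if result.get("error"):
            print(f"stop failed: {result['error']}")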
Args: vm_name: Name of the VM to stop host: API host port: API port storage: Storage path for the VM debug: Whether to show debug output verbose: Enable verbose logging Returns: Dictionary with API response or error information """ # Construct API URL api_url = f"http://{host}:{port}/lume/vms/{vm_name}/stop" # Prepare JSON payload with required parameters payload = {} # Add storage path if specified if storage: payload["storage"] = storage # Construct the curl command cmd = [ "curl", "--connect-timeout", "15", "--max-time", "20", "-s", "-X", "POST", "-H", "Content-Type: application/json", "-d", json.dumps(payload), api_url ] # Execute the command try: if debug or verbose: logger.info(f"Executing: {' '.join(cmd)}") result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: logger.warning(f"API request failed with code {result.returncode}: {result.stderr}") return {"error": f"API request failed: {result.stderr}"} # Try to parse the response as JSON if result.stdout and result.stdout.strip(): try: response = json.loads(result.stdout) return response except json.JSONDecodeError: # Return the raw response if it's not valid JSON return {"success": True, "message": "VM stopped successfully", "raw_response": result.stdout} else: return {"success": True, "message": "VM stopped successfully"} except subprocess.SubprocessError as e: logger.error(f"Failed to execute stop request: {e}") return {"error": f"Failed to execute stop request: {str(e)}"} def lume_api_update( vm_name: str, host: str, port: int, update_opts: Dict[str, Any], storage: Optional[str] = None, debug: bool = False, verbose: bool = False ) -> Dict[str, Any]: """Update VM settings using curl. Args: vm_name: Name of the VM to update host: API host port: API port update_opts: Dictionary of update options storage: Storage path for the VM debug: Whether to show debug output verbose: Enable verbose logging Returns: Dictionary with API response or error information """ # Construct API URL api_url = f"http://{host}:{port}/lume/vms/{vm_name}/update" # Prepare JSON payload with required parameters payload = {} # Add CPU cores if specified if "cpu" in update_opts: payload["cpu"] = update_opts["cpu"] # Add memory if specified if "memory" in update_opts: payload["memory"] = update_opts["memory"] # Add storage path if specified if storage: payload["storage"] = storage # Construct the curl command cmd = [ "curl", "--connect-timeout", "15", "--max-time", "20", "-s", "-X", "POST", "-H", "Content-Type: application/json", "-d", json.dumps(payload), api_url ] # Execute the command try: if debug: logger.info(f"Executing: {' '.join(cmd)}") result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: logger.warning(f"API request failed with code {result.returncode}: {result.stderr}") return {"error": f"API request failed: {result.stderr}"} # Try to parse the response as JSON if result.stdout and result.stdout.strip(): try: response = json.loads(result.stdout) return response except json.JSONDecodeError: # Return the raw response if it's not valid JSON return {"success": True, "message": "VM updated successfully", "raw_response": result.stdout} else: return {"success": True, "message": "VM updated successfully"} except subprocess.SubprocessError as e: logger.error(f"Failed to execute update request: {e}") return {"error": f"Failed to execute update request: {str(e)}"} def lume_api_pull( image: str, name: str, host: str, port: int, storage: Optional[str] = None, registry: str = "ghcr.io", organization: 
str = "trycua", debug: bool = False, verbose: bool = False ) -> Dict[str, Any]: """Pull a VM image from a registry using curl. Args: image: Name/tag of the image to pull name: Name to give the VM after pulling host: API host port: API port storage: Storage path for the VM registry: Registry to pull from (default: ghcr.io) organization: Organization in registry (default: trycua) debug: Whether to show debug output verbose: Enable verbose logging Returns: Dictionary with pull status and information """ # Prepare pull request payload pull_payload = { "image": image, # Use provided image name "name": name, # Always use name as the target VM name "registry": registry, "organization": organization } if storage: pull_payload["storage"] = storage # Construct pull command with proper JSON payload pull_cmd = [ "curl" ] if not verbose: pull_cmd.append("-s") pull_cmd.extend([ "-X", "POST", "-H", "Content-Type: application/json", "-d", json.dumps(pull_payload), f"http://{host}:{port}/lume/pull" ]) logger.debug(f"Executing API request: {' '.join(pull_cmd)}") try: # Execute pull command result = subprocess.run(pull_cmd, capture_output=True, text=True) if result.returncode != 0: error_msg = f"Failed to pull VM {name}: {result.stderr}" logger.error(error_msg) return {"error": error_msg} try: response = json.loads(result.stdout) logger.info(f"Successfully initiated pull for VM {name}") return response except json.JSONDecodeError: if result.stdout: logger.info(f"Pull response: {result.stdout}") return {"success": True, "message": f"Successfully initiated pull for VM {name}"} except subprocess.SubprocessError as e: error_msg = f"Failed to execute pull command: {str(e)}" logger.error(error_msg) return {"error": error_msg} def lume_api_delete( vm_name: str, host: str, port: int, storage: Optional[str] = None, debug: bool = False, verbose: bool = False ) -> Dict[str, Any]: """Delete a VM using curl. 
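    Example (illustrative; the VM name, port and custom storage path are
    placeholder values):

        result = lume_api_delete(
            "old-vm", host="localhost", port=7777, storage="/Volumes/ext/lume"
        )
        if result.get("error"):
            print(f"delete failed: {result['error']} (curl exit {result.get('curl_code')})")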
Args: vm_name: Name of the VM to delete host: API host port: API port storage: Storage path for the VM debug: Whether to show debug output verbose: Enable verbose logging Returns: Dictionary with API response or error information """ # URL encode the storage parameter for the query encoded_storage = "" storage_param = "" if storage: # First encode the storage path properly encoded_storage = urllib.parse.quote(storage, safe='') storage_param = f"?storage={encoded_storage}" # Construct API URL with encoded storage parameter if needed api_url = f"http://{host}:{port}/lume/vms/{vm_name}{storage_param}" # Construct the curl command for DELETE operation - using much longer timeouts matching shell implementation cmd = ["curl", "--connect-timeout", "6000", "--max-time", "5000", "-s", "-X", "DELETE", f"'{api_url}'"] # For logging and display, show the properly escaped URL display_cmd = ["curl", "--connect-timeout", "6000", "--max-time", "5000", "-s", "-X", "DELETE", api_url] # Only print the curl command when debug is enabled display_curl_string = ' '.join(display_cmd) logger.debug(f"Executing API request: {display_curl_string}") # Execute the command - for execution we need to use shell=True to handle URLs with special characters try: # Use a single string with shell=True for proper URL handling shell_cmd = ' '.join(cmd) result = subprocess.run(shell_cmd, shell=True, capture_output=True, text=True) # Handle curl exit codes if result.returncode != 0: curl_error = "Unknown error" # Map common curl error codes to helpful messages if result.returncode == 7: curl_error = "Failed to connect to the API server - it might still be starting up" elif result.returncode == 22: curl_error = "HTTP error returned from API server" elif result.returncode == 28: curl_error = "Operation timeout - the API server is taking too long to respond" elif result.returncode == 52: curl_error = "Empty reply from server - the API server is starting but not fully ready yet" elif result.returncode == 56: curl_error = "Network problem during data transfer - check container networking" # Only log at debug level to reduce noise during retries logger.debug(f"API request failed with code {result.returncode}: {curl_error}") # Return a more useful error message return { "error": f"API request failed: {curl_error}", "curl_code": result.returncode, "vm_name": vm_name, "storage": storage } # Try to parse the response as JSON if result.stdout and result.stdout.strip(): try: response = json.loads(result.stdout) return response except json.JSONDecodeError: # Return the raw response if it's not valid JSON return {"success": True, "message": "VM deleted successfully", "raw_response": result.stdout} else: return {"success": True, "message": "VM deleted successfully"} except subprocess.SubprocessError as e: logger.error(f"Failed to execute delete request: {e}") return {"error": f"Failed to execute delete request: {str(e)}"} def parse_memory(memory_str: str) -> int: """Parse memory string to MB integer. 
Examples: "8GB" -> 8192 "1024MB" -> 1024 "512" -> 512 Returns: Memory value in MB """ if isinstance(memory_str, int): return memory_str if isinstance(memory_str, str): # Extract number and unit import re match = re.match(r"(\d+)([A-Za-z]*)", memory_str) if match: value, unit = match.groups() value = int(value) unit = unit.upper() if unit == "GB" or unit == "G": return value * 1024 elif unit == "MB" or unit == "M" or unit == "": return value # Default fallback logger.warning(f"Could not parse memory string '{memory_str}', using 8GB default") return 8192 # Default to 8GB ``` -------------------------------------------------------------------------------- /libs/python/pylume/pylume/server.py: -------------------------------------------------------------------------------- ```python import os import time import asyncio import subprocess import tempfile import logging import socket from typing import Optional import sys from .exceptions import LumeConnectionError import signal import json import shlex import random from logging import getLogger class LumeServer: def __init__( self, debug: bool = False, server_start_timeout: int = 60, port: Optional[int] = None, use_existing_server: bool = False, host: str = "localhost", ): """Initialize the LumeServer. Args: debug: Enable debug logging server_start_timeout: Timeout in seconds to wait for server to start port: Specific port to use for the server use_existing_server: If True, will try to connect to an existing server instead of starting a new one host: Host to use for connections (e.g., "localhost", "127.0.0.1", "host.docker.internal") """ self.debug = debug self.server_start_timeout = server_start_timeout self.server_process = None self.output_file = None self.requested_port = port self.port = None self.base_url = None self.use_existing_server = use_existing_server self.host = host # Configure logging self.logger = getLogger("pylume.server") if not self.logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") handler.setFormatter(formatter) self.logger.addHandler(handler) self.logger.setLevel(logging.DEBUG if debug else logging.INFO) self.logger.debug(f"Server initialized with host: {self.host}") def _check_port_available(self, port: int) -> bool: """Check if a port is available.""" try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(0.5) result = s.connect_ex(("127.0.0.1", port)) if result == 0: # Port is in use on localhost return False except: pass # Check the specified host (e.g., "host.docker.internal") if it's not a localhost alias if self.host not in ["localhost", "127.0.0.1"]: try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(0.5) result = s.connect_ex((self.host, port)) if result == 0: # Port is in use on host return False except: pass return True def _get_server_port(self) -> int: """Get an available port for the server.""" # Use requested port if specified if self.requested_port is not None: if not self._check_port_available(self.requested_port): raise RuntimeError(f"Requested port {self.requested_port} is not available") return self.requested_port # Find a free port for _ in range(10): # Try up to 10 times port = random.randint(49152, 65535) if self._check_port_available(port): return port raise RuntimeError("Could not find an available port") async def _ensure_server_running(self) -> None: """Ensure the lume server is running, start it if it's not.""" try: self.logger.debug("Checking if lume server 
is running...") # Try to connect to the server with a short timeout cmd = ["curl", "-s", "-w", "%{http_code}", "-m", "5", f"{self.base_url}/vms"] process = await asyncio.create_subprocess_exec( *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode == 0: response = stdout.decode() status_code = int(response[-3:]) if status_code == 200: self.logger.debug("PyLume server is running") return self.logger.debug("PyLume server not running, attempting to start it") # Server not running, try to start it lume_path = os.path.join(os.path.dirname(__file__), "lume") if not os.path.exists(lume_path): raise RuntimeError(f"Could not find lume binary at {lume_path}") # Make sure the file is executable os.chmod(lume_path, 0o755) # Create a temporary file for server output self.output_file = tempfile.NamedTemporaryFile(mode="w+", delete=False) self.logger.debug(f"Using temporary file for server output: {self.output_file.name}") # Start the server self.logger.debug(f"Starting lume server with: {lume_path} serve --port {self.port}") # Start server in background using subprocess.Popen try: self.server_process = subprocess.Popen( [lume_path, "serve", "--port", str(self.port)], stdout=self.output_file, stderr=self.output_file, cwd=os.path.dirname(lume_path), start_new_session=True, # Run in new session to avoid blocking ) except Exception as e: self.output_file.close() os.unlink(self.output_file.name) raise RuntimeError(f"Failed to start lume server process: {str(e)}") # Wait for server to start self.logger.debug( f"Waiting up to {self.server_start_timeout} seconds for server to start..." ) start_time = time.time() server_ready = False last_size = 0 while time.time() - start_time < self.server_start_timeout: if self.server_process.poll() is not None: # Process has terminated self.output_file.seek(0) output = self.output_file.read() self.output_file.close() os.unlink(self.output_file.name) error_msg = ( f"Server process terminated unexpectedly.\n" f"Exit code: {self.server_process.returncode}\n" f"Output: {output}" ) raise RuntimeError(error_msg) # Check output file for server ready message self.output_file.seek(0, os.SEEK_END) size = self.output_file.tell() if size > last_size: # Only read if there's new content self.output_file.seek(last_size) new_output = self.output_file.read() if new_output.strip(): # Only log non-empty output self.logger.debug(f"Server output: {new_output.strip()}") last_size = size if "Server started" in new_output: server_ready = True self.logger.debug("Server startup detected") break # Try to connect to the server periodically try: cmd = ["curl", "-s", "-w", "%{http_code}", "-m", "5", f"{self.base_url}/vms"] process = await asyncio.create_subprocess_exec( *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode == 0: response = stdout.decode() status_code = int(response[-3:]) if status_code == 200: server_ready = True self.logger.debug("Server is responding to requests") break except: pass # Server not ready yet await asyncio.sleep(1.0) if not server_ready: # Cleanup if server didn't start if self.server_process: self.server_process.terminate() try: self.server_process.wait(timeout=5) except subprocess.TimeoutExpired: self.server_process.kill() self.output_file.close() os.unlink(self.output_file.name) raise RuntimeError( f"Failed to start lume server after {self.server_start_timeout} seconds. " "Check the debug output for more details." 
) # Give the server a moment to fully initialize await asyncio.sleep(2.0) # Verify server is responding try: cmd = ["curl", "-s", "-w", "%{http_code}", "-m", "10", f"{self.base_url}/vms"] process = await asyncio.create_subprocess_exec( *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: raise RuntimeError(f"Curl command failed: {stderr.decode()}") response = stdout.decode() status_code = int(response[-3:]) if status_code != 200: raise RuntimeError(f"Server returned status code {status_code}") self.logger.debug("PyLume server started successfully") except Exception as e: self.logger.debug(f"Server verification failed: {str(e)}") if self.server_process: self.server_process.terminate() try: self.server_process.wait(timeout=5) except subprocess.TimeoutExpired: self.server_process.kill() self.output_file.close() os.unlink(self.output_file.name) raise RuntimeError(f"Server started but is not responding: {str(e)}") self.logger.debug("Server startup completed successfully") except Exception as e: raise RuntimeError(f"Failed to start lume server: {str(e)}") async def _start_server(self) -> None: """Start the lume server using the lume executable.""" self.logger.debug("Starting PyLume server") # Get absolute path to lume executable in the same directory as this file lume_path = os.path.join(os.path.dirname(__file__), "lume") if not os.path.exists(lume_path): raise RuntimeError(f"Could not find lume binary at {lume_path}") try: # Make executable os.chmod(lume_path, 0o755) # Get and validate port self.port = self._get_server_port() self.base_url = f"http://{self.host}:{self.port}/lume" # Set up output handling self.output_file = tempfile.NamedTemporaryFile(mode="w+", delete=False) # Start the server process with the lume executable env = os.environ.copy() env["RUST_BACKTRACE"] = "1" # Enable backtrace for better error reporting # Specify the host to bind to (0.0.0.0 to allow external connections) self.server_process = subprocess.Popen( [lume_path, "serve", "--port", str(self.port)], stdout=self.output_file, stderr=subprocess.STDOUT, cwd=os.path.dirname(lume_path), # Run from same directory as executable env=env, ) # Wait for server to initialize await asyncio.sleep(2) await self._wait_for_server() except Exception as e: await self._cleanup() raise RuntimeError(f"Failed to start lume server process: {str(e)}") async def _tail_log(self) -> None: """Read and display server log output in debug mode.""" while True: try: self.output_file.seek(0, os.SEEK_END) # type: ignore[attr-defined] line = self.output_file.readline() # type: ignore[attr-defined] if line: line = line.strip() if line: print(f"SERVER: {line}") if self.server_process.poll() is not None: # type: ignore[attr-defined] print("Server process ended") break await asyncio.sleep(0.1) except Exception as e: print(f"Error reading log: {e}") await asyncio.sleep(0.1) async def _wait_for_server(self) -> None: """Wait for server to start and become responsive with increased timeout.""" start_time = time.time() while time.time() - start_time < self.server_start_timeout: if self.server_process.poll() is not None: # type: ignore[attr-defined] error_msg = await self._get_error_output() await self._cleanup() raise RuntimeError(error_msg) try: await self._verify_server() self.logger.debug("Server is now responsive") return except Exception as e: self.logger.debug(f"Server not ready yet: {str(e)}") await asyncio.sleep(1.0) await self._cleanup() raise RuntimeError(f"Server failed to start 
after {self.server_start_timeout} seconds") async def _verify_server(self) -> None: """Verify server is responding to requests.""" try: cmd = [ "curl", "-s", "-w", "%{http_code}", "-m", "10", f"http://{self.host}:{self.port}/lume/vms", ] process = await asyncio.create_subprocess_exec( *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: raise RuntimeError(f"Curl command failed: {stderr.decode()}") response = stdout.decode() status_code = int(response[-3:]) if status_code != 200: raise RuntimeError(f"Server returned status code {status_code}") self.logger.debug("PyLume server started successfully") except Exception as e: raise RuntimeError(f"Server not responding: {str(e)}") async def _get_error_output(self) -> str: """Get error output from the server process.""" if not self.output_file: return "No output available" self.output_file.seek(0) output = self.output_file.read() return ( f"Server process terminated unexpectedly.\n" f"Exit code: {self.server_process.returncode}\n" # type: ignore[attr-defined] f"Output: {output}" ) async def _cleanup(self) -> None: """Clean up all server resources.""" if self.server_process: try: self.server_process.terminate() try: self.server_process.wait(timeout=5) except subprocess.TimeoutExpired: self.server_process.kill() except: pass self.server_process = None # Clean up output file if self.output_file: try: self.output_file.close() os.unlink(self.output_file.name) except Exception as e: self.logger.debug(f"Error cleaning up output file: {e}") self.output_file = None async def ensure_running(self) -> None: """Ensure the server is running. If use_existing_server is True, will only try to connect to an existing server. Otherwise will: 1. Try to connect to an existing server on the specified port 2. If that fails and not in Docker, start a new server 3. 
If in Docker and no existing server is found, raise an error """ # First check if we're in Docker in_docker = os.path.exists("/.dockerenv") or ( os.path.exists("/proc/1/cgroup") and "docker" in open("/proc/1/cgroup", "r").read() ) # If using a non-localhost host like host.docker.internal, set up the connection details if self.host not in ["localhost", "127.0.0.1"]: if self.requested_port is None: raise RuntimeError("Port must be specified when using a remote host") self.port = self.requested_port self.base_url = f"http://{self.host}:{self.port}/lume" self.logger.debug(f"Using remote host server at {self.base_url}") # Try to verify the server is accessible try: await self._verify_server() self.logger.debug("Successfully connected to remote server") return except Exception as e: if self.use_existing_server or in_docker: # If explicitly requesting an existing server or in Docker, we can't start a new one raise RuntimeError( f"Failed to connect to remote server at {self.base_url}: {str(e)}" ) else: self.logger.debug(f"Remote server not available at {self.base_url}: {str(e)}") # Fall back to localhost for starting a new server self.host = "localhost" # If explicitly using an existing server, verify it's running if self.use_existing_server: if self.requested_port is None: raise RuntimeError("Port must be specified when using an existing server") self.port = self.requested_port self.base_url = f"http://{self.host}:{self.port}/lume" try: await self._verify_server() self.logger.debug("Successfully connected to existing server") except Exception as e: raise RuntimeError( f"Failed to connect to existing server at {self.base_url}: {str(e)}" ) else: # Try to connect to an existing server first if self.requested_port is not None: self.port = self.requested_port self.base_url = f"http://{self.host}:{self.port}/lume" try: await self._verify_server() self.logger.debug("Successfully connected to existing server") return except Exception: self.logger.debug(f"No existing server found at {self.base_url}") # If in Docker and can't connect to existing server, raise an error if in_docker: raise RuntimeError( f"Failed to connect to server at {self.base_url} and cannot start a new server in Docker" ) # Start a new server self.logger.debug("Starting a new server instance") await self._start_server() async def stop(self) -> None: """Stop the server if we're managing it.""" if not self.use_existing_server: self.logger.debug("Stopping lume server...") await self._cleanup() ``` -------------------------------------------------------------------------------- /libs/python/computer-server/computer_server/handlers/linux.py: -------------------------------------------------------------------------------- ```python """ Linux implementation of automation and accessibility handlers. This implementation attempts to use pyautogui for GUI automation when available. If running in a headless environment without X11, it will fall back to simulated responses. To use GUI automation in a headless environment: 1. Install Xvfb: sudo apt-get install xvfb 2. 
Run with virtual display: xvfb-run python -m computer_server """ from typing import Dict, Any, List, Tuple, Optional import logging import subprocess import asyncio import base64 import os import json from io import BytesIO # Configure logger logger = logging.getLogger(__name__) # Try to import pyautogui, but don't fail if it's not available # This allows the server to run in headless environments try: import pyautogui pyautogui.FAILSAFE = False logger.info("pyautogui successfully imported, GUI automation available") except Exception as e: logger.warning(f"pyautogui import failed: {str(e)}. GUI operations will be simulated.") from pynput.mouse import Button, Controller as MouseController from pynput.keyboard import Key, Controller as KeyboardController from .base import BaseAccessibilityHandler, BaseAutomationHandler class LinuxAccessibilityHandler(BaseAccessibilityHandler): """Linux implementation of accessibility handler.""" async def get_accessibility_tree(self) -> Dict[str, Any]: """Get the accessibility tree of the current window. Returns: Dict[str, Any]: A dictionary containing success status and a simulated tree structure since Linux doesn't have equivalent accessibility API like macOS. """ # Linux doesn't have equivalent accessibility API like macOS # Return a minimal dummy tree logger.info("Getting accessibility tree (simulated, no accessibility API available on Linux)") return { "success": True, "tree": { "role": "Window", "title": "Linux Window", "position": {"x": 0, "y": 0}, "size": {"width": 1920, "height": 1080}, "children": [] } } async def find_element(self, role: Optional[str] = None, title: Optional[str] = None, value: Optional[str] = None) -> Dict[str, Any]: """Find an element in the accessibility tree by criteria. Args: role: The role of the element to find. title: The title of the element to find. value: The value of the element to find. Returns: Dict[str, Any]: A dictionary indicating that element search is not supported on Linux. """ logger.info(f"Finding element with role={role}, title={title}, value={value} (not supported on Linux)") return { "success": False, "message": "Element search not supported on Linux" } def get_cursor_position(self) -> Tuple[int, int]: """Get the current cursor position. Returns: Tuple[int, int]: The x and y coordinates of the cursor position. Returns (0, 0) if pyautogui is not available. """ try: pos = pyautogui.position() return pos.x, pos.y except Exception as e: logger.warning(f"Failed to get cursor position with pyautogui: {e}") logger.info("Getting cursor position (simulated)") return 0, 0 def get_screen_size(self) -> Tuple[int, int]: """Get the screen size. Returns: Tuple[int, int]: The width and height of the screen in pixels. Returns (1920, 1080) if pyautogui is not available. """ try: size = pyautogui.size() return size.width, size.height except Exception as e: logger.warning(f"Failed to get screen size with pyautogui: {e}") logger.info("Getting screen size (simulated)") return 1920, 1080 class LinuxAutomationHandler(BaseAutomationHandler): """Linux implementation of automation handler using pyautogui.""" keyboard = KeyboardController() mouse = MouseController() # Mouse Actions async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]: """Press and hold a mouse button at the specified coordinates. Args: x: The x coordinate to move to before pressing. If None, uses current position. y: The y coordinate to move to before pressing. If None, uses current position. 
button: The mouse button to press ("left", "right", or "middle"). Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: if x is not None and y is not None: pyautogui.moveTo(x, y) pyautogui.mouseDown(button=button) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]: """Release a mouse button at the specified coordinates. Args: x: The x coordinate to move to before releasing. If None, uses current position. y: The y coordinate to move to before releasing. If None, uses current position. button: The mouse button to release ("left", "right", or "middle"). Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: if x is not None and y is not None: pyautogui.moveTo(x, y) pyautogui.mouseUp(button=button) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def move_cursor(self, x: int, y: int) -> Dict[str, Any]: """Move the cursor to the specified coordinates. Args: x: The x coordinate to move to. y: The y coordinate to move to. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: pyautogui.moveTo(x, y) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]: """Perform a left mouse click at the specified coordinates. Args: x: The x coordinate to click at. If None, clicks at current position. y: The y coordinate to click at. If None, clicks at current position. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: if x is not None and y is not None: pyautogui.moveTo(x, y) pyautogui.click() return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]: """Perform a right mouse click at the specified coordinates. Args: x: The x coordinate to click at. If None, clicks at current position. y: The y coordinate to click at. If None, clicks at current position. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: if x is not None and y is not None: pyautogui.moveTo(x, y) pyautogui.rightClick() return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]: """Perform a double click at the specified coordinates. Args: x: The x coordinate to double click at. If None, clicks at current position. y: The y coordinate to double click at. If None, clicks at current position. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: if x is not None and y is not None: pyautogui.moveTo(x, y) pyautogui.doubleClick(interval=0.1) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def click(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]: """Perform a mouse click with the specified button at the given coordinates. Args: x: The x coordinate to click at. If None, clicks at current position. y: The y coordinate to click at. If None, clicks at current position. 
button: The mouse button to click ("left", "right", or "middle"). Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: if x is not None and y is not None: pyautogui.moveTo(x, y) pyautogui.click(button=button) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> Dict[str, Any]: """Drag from the current position to the specified coordinates. Args: x: The x coordinate to drag to. y: The y coordinate to drag to. button: The mouse button to use for dragging. duration: The time in seconds to take for the drag operation. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: pyautogui.dragTo(x, y, duration=duration, button=button) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def drag(self, start_x: int, start_y: int, end_x: int, end_y: int, button: str = "left") -> Dict[str, Any]: """Drag from start coordinates to end coordinates. Args: start_x: The starting x coordinate. start_y: The starting y coordinate. end_x: The ending x coordinate. end_y: The ending y coordinate. button: The mouse button to use for dragging. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: pyautogui.moveTo(start_x, start_y) pyautogui.dragTo(end_x, end_y, duration=0.5, button=button) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def drag_path(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> Dict[str, Any]: """Drag along a path defined by a list of coordinates. Args: path: A list of (x, y) coordinate tuples defining the drag path. button: The mouse button to use for dragging. duration: The time in seconds to take for each segment of the drag. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: if not path: return {"success": False, "error": "Path is empty"} pyautogui.moveTo(*path[0]) for x, y in path[1:]: pyautogui.dragTo(x, y, duration=duration, button=button) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} # Keyboard Actions async def key_down(self, key: str) -> Dict[str, Any]: """Press and hold a key. Args: key: The key to press down. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: pyautogui.keyDown(key) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def key_up(self, key: str) -> Dict[str, Any]: """Release a key. Args: key: The key to release. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: pyautogui.keyUp(key) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def type_text(self, text: str) -> Dict[str, Any]: """Type the specified text using the keyboard. Args: text: The text to type. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: # use pynput for Unicode support self.keyboard.type(text) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def press_key(self, key: str) -> Dict[str, Any]: """Press and release a key. Args: key: The key to press. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. 
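        Example (illustrative, run from inside an async context; assumes an X
        display is available so pyautogui can deliver the key press, e.g. when
        started under xvfb-run):

            handler = LinuxAutomationHandler()
            result = await handler.press_key("enter")
            if not result["success"]:
                print(result["error"])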
""" try: pyautogui.press(key) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def hotkey(self, keys: List[str]) -> Dict[str, Any]: """Press a combination of keys simultaneously. Args: keys: A list of keys to press together as a hotkey combination. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: pyautogui.hotkey(*keys) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} # Scrolling Actions async def scroll(self, x: int, y: int) -> Dict[str, Any]: """Scroll the mouse wheel. Args: x: The horizontal scroll amount. y: The vertical scroll amount. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: self.mouse.scroll(x, y) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]: """Scroll down by the specified number of clicks. Args: clicks: The number of scroll clicks to perform downward. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: pyautogui.scroll(-clicks) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]: """Scroll up by the specified number of clicks. Args: clicks: The number of scroll clicks to perform upward. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: pyautogui.scroll(clicks) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} # Screen Actions async def screenshot(self) -> Dict[str, Any]: """Take a screenshot of the current screen. Returns: Dict[str, Any]: A dictionary containing success status and base64-encoded image data, or error message if failed. """ try: from PIL import Image screenshot = pyautogui.screenshot() if not isinstance(screenshot, Image.Image): return {"success": False, "error": "Failed to capture screenshot"} buffered = BytesIO() screenshot.save(buffered, format="PNG", optimize=True) buffered.seek(0) image_data = base64.b64encode(buffered.getvalue()).decode() return {"success": True, "image_data": image_data} except Exception as e: return {"success": False, "error": f"Screenshot error: {str(e)}"} async def get_screen_size(self) -> Dict[str, Any]: """Get the size of the screen. Returns: Dict[str, Any]: A dictionary containing success status and screen dimensions, or error message if failed. """ try: size = pyautogui.size() return {"success": True, "size": {"width": size.width, "height": size.height}} except Exception as e: return {"success": False, "error": str(e)} async def get_cursor_position(self) -> Dict[str, Any]: """Get the current position of the cursor. Returns: Dict[str, Any]: A dictionary containing success status and cursor coordinates, or error message if failed. """ try: pos = pyautogui.position() return {"success": True, "position": {"x": pos.x, "y": pos.y}} except Exception as e: return {"success": False, "error": str(e)} # Clipboard Actions async def copy_to_clipboard(self) -> Dict[str, Any]: """Get the current content of the clipboard. Returns: Dict[str, Any]: A dictionary containing success status and clipboard content, or error message if failed. 
""" try: import pyperclip content = pyperclip.paste() return {"success": True, "content": content} except Exception as e: return {"success": False, "error": str(e)} async def set_clipboard(self, text: str) -> Dict[str, Any]: """Set the clipboard content to the specified text. Args: text: The text to copy to the clipboard. Returns: Dict[str, Any]: A dictionary with success status and error message if failed. """ try: import pyperclip pyperclip.copy(text) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} # Command Execution async def run_command(self, command: str) -> Dict[str, Any]: """Execute a shell command asynchronously. Args: command: The shell command to execute. Returns: Dict[str, Any]: A dictionary containing success status, stdout, stderr, and return code, or error message if failed. """ try: # Create subprocess process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # Wait for the subprocess to finish stdout, stderr = await process.communicate() # Return decoded output return { "success": True, "stdout": stdout.decode() if stdout else "", "stderr": stderr.decode() if stderr else "", "return_code": process.returncode } except Exception as e: return {"success": False, "error": str(e)} ```