This is page 8 of 16. Use http://codebase.md/trycua/cua?page={x} to view the full context. # Directory Structure ``` ├── .all-contributorsrc ├── .cursorignore ├── .devcontainer │ ├── devcontainer.json │ ├── post-install.sh │ └── README.md ├── .dockerignore ├── .gitattributes ├── .github │ ├── FUNDING.yml │ ├── scripts │ │ ├── get_pyproject_version.py │ │ └── tests │ │ ├── __init__.py │ │ ├── README.md │ │ └── test_get_pyproject_version.py │ └── workflows │ ├── ci-lume.yml │ ├── docker-publish-kasm.yml │ ├── docker-publish-xfce.yml │ ├── docker-reusable-publish.yml │ ├── npm-publish-computer.yml │ ├── npm-publish-core.yml │ ├── publish-lume.yml │ ├── pypi-publish-agent.yml │ ├── pypi-publish-computer-server.yml │ ├── pypi-publish-computer.yml │ ├── pypi-publish-core.yml │ ├── pypi-publish-mcp-server.yml │ ├── pypi-publish-pylume.yml │ ├── pypi-publish-som.yml │ ├── pypi-reusable-publish.yml │ └── test-validation-script.yml ├── .gitignore ├── .vscode │ ├── docs.code-workspace │ ├── launch.json │ ├── libs-ts.code-workspace │ ├── lume.code-workspace │ ├── lumier.code-workspace │ ├── py.code-workspace │ └── settings.json ├── blog │ ├── app-use.md │ ├── assets │ │ ├── composite-agents.png │ │ ├── docker-ubuntu-support.png │ │ ├── hack-booth.png │ │ ├── hack-closing-ceremony.jpg │ │ ├── hack-cua-ollama-hud.jpeg │ │ ├── hack-leaderboard.png │ │ ├── hack-the-north.png │ │ ├── hack-winners.jpeg │ │ ├── hack-workshop.jpeg │ │ ├── hud-agent-evals.png │ │ └── trajectory-viewer.jpeg │ ├── bringing-computer-use-to-the-web.md │ ├── build-your-own-operator-on-macos-1.md │ ├── build-your-own-operator-on-macos-2.md │ ├── composite-agents.md │ ├── cua-hackathon.md │ ├── hack-the-north.md │ ├── hud-agent-evals.md │ ├── human-in-the-loop.md │ ├── introducing-cua-cloud-containers.md │ ├── lume-to-containerization.md │ ├── sandboxed-python-execution.md │ ├── training-computer-use-models-trajectories-1.md │ ├── trajectory-viewer.md │ ├── ubuntu-docker-support.md │ └── windows-sandbox.md 
├── CONTRIBUTING.md ├── Development.md ├── Dockerfile ├── docs │ ├── .gitignore │ ├── .prettierrc │ ├── content │ │ └── docs │ │ ├── agent-sdk │ │ │ ├── agent-loops.mdx │ │ │ ├── benchmarks │ │ │ │ ├── index.mdx │ │ │ │ ├── interactive.mdx │ │ │ │ ├── introduction.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── osworld-verified.mdx │ │ │ │ ├── screenspot-pro.mdx │ │ │ │ └── screenspot-v2.mdx │ │ │ ├── callbacks │ │ │ │ ├── agent-lifecycle.mdx │ │ │ │ ├── cost-saving.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── logging.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── pii-anonymization.mdx │ │ │ │ └── trajectories.mdx │ │ │ ├── chat-history.mdx │ │ │ ├── custom-computer-handlers.mdx │ │ │ ├── custom-tools.mdx │ │ │ ├── customizing-computeragent.mdx │ │ │ ├── integrations │ │ │ │ ├── hud.mdx │ │ │ │ └── meta.json │ │ │ ├── message-format.mdx │ │ │ ├── meta.json │ │ │ ├── migration-guide.mdx │ │ │ ├── prompt-caching.mdx │ │ │ ├── supported-agents │ │ │ │ ├── composed-agents.mdx │ │ │ │ ├── computer-use-agents.mdx │ │ │ │ ├── grounding-models.mdx │ │ │ │ ├── human-in-the-loop.mdx │ │ │ │ └── meta.json │ │ │ ├── supported-model-providers │ │ │ │ ├── index.mdx │ │ │ │ └── local-models.mdx │ │ │ └── usage-tracking.mdx │ │ ├── computer-sdk │ │ │ ├── cloud-vm-management.mdx │ │ │ ├── commands.mdx │ │ │ ├── computer-ui.mdx │ │ │ ├── computers.mdx │ │ │ ├── meta.json │ │ │ └── sandboxed-python.mdx │ │ ├── index.mdx │ │ ├── libraries │ │ │ ├── agent │ │ │ │ └── index.mdx │ │ │ ├── computer │ │ │ │ └── index.mdx │ │ │ ├── computer-server │ │ │ │ ├── Commands.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── REST-API.mdx │ │ │ │ └── WebSocket-API.mdx │ │ │ ├── core │ │ │ │ └── index.mdx │ │ │ ├── lume │ │ │ │ ├── cli-reference.mdx │ │ │ │ ├── faq.md │ │ │ │ ├── http-api.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ ├── meta.json │ │ │ │ └── prebuilt-images.mdx │ │ │ ├── lumier │ │ │ │ ├── building-lumier.mdx │ │ │ │ ├── docker-compose.mdx │ │ │ │ ├── docker.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── 
installation.mdx │ │ │ │ └── meta.json │ │ │ ├── mcp-server │ │ │ │ ├── client-integrations.mdx │ │ │ │ ├── configuration.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ ├── llm-integrations.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── tools.mdx │ │ │ │ └── usage.mdx │ │ │ └── som │ │ │ ├── configuration.mdx │ │ │ └── index.mdx │ │ ├── meta.json │ │ ├── quickstart-cli.mdx │ │ ├── quickstart-devs.mdx │ │ └── telemetry.mdx │ ├── next.config.mjs │ ├── package-lock.json │ ├── package.json │ ├── pnpm-lock.yaml │ ├── postcss.config.mjs │ ├── public │ │ └── img │ │ ├── agent_gradio_ui.png │ │ ├── agent.png │ │ ├── cli.png │ │ ├── computer.png │ │ ├── som_box_threshold.png │ │ └── som_iou_threshold.png │ ├── README.md │ ├── source.config.ts │ ├── src │ │ ├── app │ │ │ ├── (home) │ │ │ │ ├── [[...slug]] │ │ │ │ │ └── page.tsx │ │ │ │ └── layout.tsx │ │ │ ├── api │ │ │ │ └── search │ │ │ │ └── route.ts │ │ │ ├── favicon.ico │ │ │ ├── global.css │ │ │ ├── layout.config.tsx │ │ │ ├── layout.tsx │ │ │ ├── llms.mdx │ │ │ │ └── [[...slug]] │ │ │ │ └── route.ts │ │ │ └── llms.txt │ │ │ └── route.ts │ │ ├── assets │ │ │ ├── discord-black.svg │ │ │ ├── discord-white.svg │ │ │ ├── logo-black.svg │ │ │ └── logo-white.svg │ │ ├── components │ │ │ ├── iou.tsx │ │ │ └── mermaid.tsx │ │ ├── lib │ │ │ ├── llms.ts │ │ │ └── source.ts │ │ └── mdx-components.tsx │ └── tsconfig.json ├── examples │ ├── agent_examples.py │ ├── agent_ui_examples.py │ ├── cloud_api_examples.py │ ├── computer_examples_windows.py │ ├── computer_examples.py │ ├── computer_ui_examples.py │ ├── computer-example-ts │ │ ├── .env.example │ │ ├── .gitignore │ │ ├── .prettierrc │ │ ├── package-lock.json │ │ ├── package.json │ │ ├── pnpm-lock.yaml │ │ ├── README.md │ │ ├── src │ │ │ ├── helpers.ts │ │ │ └── index.ts │ │ └── tsconfig.json │ ├── docker_examples.py │ ├── evals │ │ ├── hud_eval_examples.py │ │ └── wikipedia_most_linked.txt │ ├── pylume_examples.py │ ├── sandboxed_functions_examples.py │ ├── 
som_examples.py │ ├── utils.py │ └── winsandbox_example.py ├── img │ ├── agent_gradio_ui.png │ ├── agent.png │ ├── cli.png │ ├── computer.png │ ├── logo_black.png │ └── logo_white.png ├── libs │ ├── kasm │ │ ├── Dockerfile │ │ ├── LICENSE │ │ ├── README.md │ │ └── src │ │ └── ubuntu │ │ └── install │ │ └── firefox │ │ ├── custom_startup.sh │ │ ├── firefox.desktop │ │ └── install_firefox.sh │ ├── lume │ │ ├── .cursorignore │ │ ├── CONTRIBUTING.md │ │ ├── Development.md │ │ ├── img │ │ │ └── cli.png │ │ ├── Package.resolved │ │ ├── Package.swift │ │ ├── README.md │ │ ├── resources │ │ │ └── lume.entitlements │ │ ├── scripts │ │ │ ├── build │ │ │ │ ├── build-debug.sh │ │ │ │ ├── build-release-notarized.sh │ │ │ │ └── build-release.sh │ │ │ └── install.sh │ │ ├── src │ │ │ ├── Commands │ │ │ │ ├── Clone.swift │ │ │ │ ├── Config.swift │ │ │ │ ├── Create.swift │ │ │ │ ├── Delete.swift │ │ │ │ ├── Get.swift │ │ │ │ ├── Images.swift │ │ │ │ ├── IPSW.swift │ │ │ │ ├── List.swift │ │ │ │ ├── Logs.swift │ │ │ │ ├── Options │ │ │ │ │ └── FormatOption.swift │ │ │ │ ├── Prune.swift │ │ │ │ ├── Pull.swift │ │ │ │ ├── Push.swift │ │ │ │ ├── Run.swift │ │ │ │ ├── Serve.swift │ │ │ │ ├── Set.swift │ │ │ │ └── Stop.swift │ │ │ ├── ContainerRegistry │ │ │ │ ├── ImageContainerRegistry.swift │ │ │ │ ├── ImageList.swift │ │ │ │ └── ImagesPrinter.swift │ │ │ ├── Errors │ │ │ │ └── Errors.swift │ │ │ ├── FileSystem │ │ │ │ ├── Home.swift │ │ │ │ ├── Settings.swift │ │ │ │ ├── VMConfig.swift │ │ │ │ ├── VMDirectory.swift │ │ │ │ └── VMLocation.swift │ │ │ ├── LumeController.swift │ │ │ ├── Main.swift │ │ │ ├── Server │ │ │ │ ├── Handlers.swift │ │ │ │ ├── HTTP.swift │ │ │ │ ├── Requests.swift │ │ │ │ ├── Responses.swift │ │ │ │ └── Server.swift │ │ │ ├── Utils │ │ │ │ ├── CommandRegistry.swift │ │ │ │ ├── CommandUtils.swift │ │ │ │ ├── Logger.swift │ │ │ │ ├── NetworkUtils.swift │ │ │ │ ├── Path.swift │ │ │ │ ├── ProcessRunner.swift │ │ │ │ ├── ProgressLogger.swift │ │ │ │ ├── String.swift 
│ │ │ │ └── Utils.swift │ │ │ ├── Virtualization │ │ │ │ ├── DarwinImageLoader.swift │ │ │ │ ├── DHCPLeaseParser.swift │ │ │ │ ├── ImageLoaderFactory.swift │ │ │ │ └── VMVirtualizationService.swift │ │ │ ├── VM │ │ │ │ ├── DarwinVM.swift │ │ │ │ ├── LinuxVM.swift │ │ │ │ ├── VM.swift │ │ │ │ ├── VMDetails.swift │ │ │ │ ├── VMDetailsPrinter.swift │ │ │ │ ├── VMDisplayResolution.swift │ │ │ │ └── VMFactory.swift │ │ │ └── VNC │ │ │ ├── PassphraseGenerator.swift │ │ │ └── VNCService.swift │ │ └── tests │ │ ├── Mocks │ │ │ ├── MockVM.swift │ │ │ ├── MockVMVirtualizationService.swift │ │ │ └── MockVNCService.swift │ │ ├── VM │ │ │ └── VMDetailsPrinterTests.swift │ │ ├── VMTests.swift │ │ ├── VMVirtualizationServiceTests.swift │ │ └── VNCServiceTests.swift │ ├── lumier │ │ ├── .dockerignore │ │ ├── Dockerfile │ │ ├── README.md │ │ └── src │ │ ├── bin │ │ │ └── entry.sh │ │ ├── config │ │ │ └── constants.sh │ │ ├── hooks │ │ │ └── on-logon.sh │ │ └── lib │ │ ├── utils.sh │ │ └── vm.sh │ ├── python │ │ ├── agent │ │ │ ├── .bumpversion.cfg │ │ │ ├── agent │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── adapters │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── huggingfacelocal_adapter.py │ │ │ │ │ ├── human_adapter.py │ │ │ │ │ ├── mlxvlm_adapter.py │ │ │ │ │ └── models │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── internvl.py │ │ │ │ │ ├── opencua.py │ │ │ │ │ └── qwen2_5_vl.py │ │ │ │ ├── agent.py │ │ │ │ ├── callbacks │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── budget_manager.py │ │ │ │ │ ├── image_retention.py │ │ │ │ │ ├── logging.py │ │ │ │ │ ├── operator_validator.py │ │ │ │ │ ├── pii_anonymization.py │ │ │ │ │ ├── prompt_instructions.py │ │ │ │ │ ├── telemetry.py │ │ │ │ │ └── trajectory_saver.py │ │ │ │ ├── cli.py │ │ │ │ ├── computers │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── cua.py │ │ │ │ │ └── custom.py │ │ │ │ ├── decorators.py │ │ │ │ ├── human_tool │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── __main__.py 
│ │ │ │ │ ├── server.py │ │ │ │ │ └── ui.py │ │ │ │ ├── integrations │ │ │ │ │ └── hud │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── agent.py │ │ │ │ │ └── proxy.py │ │ │ │ ├── loops │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── anthropic.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── composed_grounded.py │ │ │ │ │ ├── gemini.py │ │ │ │ │ ├── glm45v.py │ │ │ │ │ ├── gta1.py │ │ │ │ │ ├── holo.py │ │ │ │ │ ├── internvl.py │ │ │ │ │ ├── model_types.csv │ │ │ │ │ ├── moondream3.py │ │ │ │ │ ├── omniparser.py │ │ │ │ │ ├── openai.py │ │ │ │ │ ├── opencua.py │ │ │ │ │ └── uitars.py │ │ │ │ ├── proxy │ │ │ │ │ ├── examples.py │ │ │ │ │ └── handlers.py │ │ │ │ ├── responses.py │ │ │ │ ├── types.py │ │ │ │ └── ui │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ └── gradio │ │ │ │ ├── __init__.py │ │ │ │ ├── app.py │ │ │ │ └── ui_components.py │ │ │ ├── benchmarks │ │ │ │ ├── .gitignore │ │ │ │ ├── contrib.md │ │ │ │ ├── interactive.py │ │ │ │ ├── models │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ └── gta1.py │ │ │ │ ├── README.md │ │ │ │ ├── ss-pro.py │ │ │ │ ├── ss-v2.py │ │ │ │ └── utils.py │ │ │ ├── example.py │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── computer │ │ │ ├── .bumpversion.cfg │ │ │ ├── computer │ │ │ │ ├── __init__.py │ │ │ │ ├── computer.py │ │ │ │ ├── diorama_computer.py │ │ │ │ ├── helpers.py │ │ │ │ ├── interface │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── linux.py │ │ │ │ │ ├── macos.py │ │ │ │ │ ├── models.py │ │ │ │ │ └── windows.py │ │ │ │ ├── logger.py │ │ │ │ ├── models.py │ │ │ │ ├── providers │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── cloud │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── docker │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── lume │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── lume_api.py │ │ │ │ │ ├── lumier │ │ │ │ │ │ ├── __init__.py │ │ │ │ 
│ │ └── provider.py │ │ │ │ │ ├── types.py │ │ │ │ │ └── winsandbox │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── provider.py │ │ │ │ │ └── setup_script.ps1 │ │ │ │ ├── ui │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── __main__.py │ │ │ │ │ └── gradio │ │ │ │ │ ├── __init__.py │ │ │ │ │ └── app.py │ │ │ │ └── utils.py │ │ │ ├── poetry.toml │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── computer-server │ │ │ ├── .bumpversion.cfg │ │ │ ├── computer_server │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── cli.py │ │ │ │ ├── diorama │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── diorama_computer.py │ │ │ │ │ ├── diorama.py │ │ │ │ │ ├── draw.py │ │ │ │ │ ├── macos.py │ │ │ │ │ └── safezone.py │ │ │ │ ├── handlers │ │ │ │ │ ├── base.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── linux.py │ │ │ │ │ ├── macos.py │ │ │ │ │ └── windows.py │ │ │ │ ├── main.py │ │ │ │ ├── server.py │ │ │ │ └── watchdog.py │ │ │ ├── examples │ │ │ │ ├── __init__.py │ │ │ │ └── usage_example.py │ │ │ ├── pyproject.toml │ │ │ ├── README.md │ │ │ ├── run_server.py │ │ │ └── test_connection.py │ │ ├── core │ │ │ ├── .bumpversion.cfg │ │ │ ├── core │ │ │ │ ├── __init__.py │ │ │ │ └── telemetry │ │ │ │ ├── __init__.py │ │ │ │ └── posthog.py │ │ │ ├── poetry.toml │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── mcp-server │ │ │ ├── .bumpversion.cfg │ │ │ ├── CONCURRENT_SESSIONS.md │ │ │ ├── mcp_server │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── server.py │ │ │ │ └── session_manager.py │ │ │ ├── pdm.lock │ │ │ ├── pyproject.toml │ │ │ ├── README.md │ │ │ └── scripts │ │ │ ├── install_mcp_server.sh │ │ │ └── start_mcp_server.sh │ │ ├── pylume │ │ │ ├── __init__.py │ │ │ ├── .bumpversion.cfg │ │ │ ├── pylume │ │ │ │ ├── __init__.py │ │ │ │ ├── client.py │ │ │ │ ├── exceptions.py │ │ │ │ ├── lume │ │ │ │ ├── models.py │ │ │ │ ├── pylume.py │ │ │ │ └── server.py │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ └── som │ │ ├── .bumpversion.cfg │ │ 
├── LICENSE │ │ ├── poetry.toml │ │ ├── pyproject.toml │ │ ├── README.md │ │ ├── som │ │ │ ├── __init__.py │ │ │ ├── detect.py │ │ │ ├── detection.py │ │ │ ├── models.py │ │ │ ├── ocr.py │ │ │ ├── util │ │ │ │ └── utils.py │ │ │ └── visualization.py │ │ └── tests │ │ └── test_omniparser.py │ ├── typescript │ │ ├── .gitignore │ │ ├── .nvmrc │ │ ├── agent │ │ │ ├── examples │ │ │ │ ├── playground-example.html │ │ │ │ └── README.md │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── client.ts │ │ │ │ ├── index.ts │ │ │ │ └── types.ts │ │ │ ├── tests │ │ │ │ └── client.test.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── biome.json │ │ ├── computer │ │ │ ├── .editorconfig │ │ │ ├── .gitattributes │ │ │ ├── .gitignore │ │ │ ├── LICENSE │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── computer │ │ │ │ │ ├── index.ts │ │ │ │ │ ├── providers │ │ │ │ │ │ ├── base.ts │ │ │ │ │ │ ├── cloud.ts │ │ │ │ │ │ └── index.ts │ │ │ │ │ └── types.ts │ │ │ │ ├── index.ts │ │ │ │ ├── interface │ │ │ │ │ ├── base.ts │ │ │ │ │ ├── factory.ts │ │ │ │ │ ├── index.ts │ │ │ │ │ ├── linux.ts │ │ │ │ │ ├── macos.ts │ │ │ │ │ └── windows.ts │ │ │ │ └── types.ts │ │ │ ├── tests │ │ │ │ ├── computer │ │ │ │ │ └── cloud.test.ts │ │ │ │ ├── interface │ │ │ │ │ ├── factory.test.ts │ │ │ │ │ ├── index.test.ts │ │ │ │ │ ├── linux.test.ts │ │ │ │ │ ├── macos.test.ts │ │ │ │ │ └── windows.test.ts │ │ │ │ └── setup.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── core │ │ │ ├── .editorconfig │ │ │ ├── .gitattributes │ │ │ ├── .gitignore │ │ │ ├── LICENSE │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── index.ts │ │ │ │ └── telemetry │ │ │ │ ├── clients │ │ │ │ │ ├── index.ts │ │ │ │ │ └── posthog.ts │ │ │ │ └── index.ts │ │ │ ├── tests │ │ │ │ └── telemetry.test.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── package.json │ │ ├── 
pnpm-lock.yaml │ │ ├── pnpm-workspace.yaml │ │ └── README.md │ └── xfce │ ├── .dockerignore │ ├── .gitignore │ ├── Dockerfile │ ├── README.md │ └── src │ ├── scripts │ │ ├── resize-display.sh │ │ ├── start-computer-server.sh │ │ ├── start-novnc.sh │ │ ├── start-vnc.sh │ │ └── xstartup.sh │ ├── supervisor │ │ └── supervisord.conf │ └── xfce-config │ ├── helpers.rc │ ├── xfce4-power-manager.xml │ └── xfce4-session.xml ├── LICENSE.md ├── Makefile ├── notebooks │ ├── agent_nb.ipynb │ ├── blog │ │ ├── build-your-own-operator-on-macos-1.ipynb │ │ └── build-your-own-operator-on-macos-2.ipynb │ ├── composite_agents_docker_nb.ipynb │ ├── computer_nb.ipynb │ ├── computer_server_nb.ipynb │ ├── customizing_computeragent.ipynb │ ├── eval_osworld.ipynb │ ├── ollama_nb.ipynb │ ├── pylume_nb.ipynb │ ├── README.md │ ├── sota_hackathon_cloud.ipynb │ └── sota_hackathon.ipynb ├── pdm.lock ├── pyproject.toml ├── pyrightconfig.json ├── README.md ├── samples │ └── community │ ├── global-online │ │ └── README.md │ └── hack-the-north │ └── README.md ├── scripts │ ├── build-uv.sh │ ├── build.ps1 │ ├── build.sh │ ├── cleanup.sh │ ├── playground-docker.sh │ ├── playground.sh │ └── run-docker-dev.sh └── tests ├── pytest.ini ├── shell_cmd.py ├── test_files.py ├── test_mcp_server_session_management.py ├── test_mcp_server_streaming.py ├── test_shell_bash.py ├── test_telemetry.py ├── test_venv.py └── test_watchdog.py ``` # Files -------------------------------------------------------------------------------- /examples/evals/wikipedia_most_linked.txt: -------------------------------------------------------------------------------- ``` ISBN (identifier) United States Main Page Tilde Doi (identifier) Fair use Association football Years Wayback Machine ISSN (identifier) India Wikimedia Foundation Wikidata Animal Taxonomy (biology) Australia France Eukaryote IP address U.S. 
state Time zone City Copyright Canada Town ASCII Greek alphabet Typographic ligature Diacritical mark Wikipedia Germany Human settlement Open Tree of Life IMDb (identifier) United Kingdom Catalogue of Life Insect Russia Japan Italy Arthropod Television show Public domain INaturalist Poland England PMID (identifier) Daylight saving time S2CID (identifier) China Encyclopedia of Life Spain OCLC (identifier) Plant Flickr Wikispecies Africa Song Record label Lepidoptera Iran English language Music genre News aggregator Web feed Proxy server X-Forwarded-For College football World War II Brazil Sweden Politics Olympics Netherlands Record producer California New York City Surname The New York Times London New Zealand PMC (identifier) Logo Synonym (taxonomy) Switzerland Turkey Sport Video game Architecture Norway Bibcode (identifier) Mexico Botany JSTOR (identifier) Rail transport Field hockey Ireland Scotland Belgium South Africa Common name Professional sports Sport governing body Sport industry Olympic games Election Austria Ukraine Anthroponymy Pakistan Baseball Denmark Christianity Philippines Woman Romania Czech Republic Album Godzilla Minus One Single (music) Electoral reform Nofollow Basketball New York (state) Argentina Finland Soviet Union Greece Russian language Historic site Free content YouTube Catholic Church Hungary Kingdom Hearts Beetle Company Tetris Portugal BioShock Abandonware Deus Ex (video game) 4A Engine Yoshi's New Island Kaboom! (video game) Rain World Juno (Overwatch) Crash Team Rumble Vault 101 Tales of Commons NHL Hockey Clutch Gaming Haseo Allin Kempthorne Ilyas El Maliki Ratalaika Games 3D mousepad HaptX Walid Sultan Midani Rustler (video game) Look Outside Ducks Ahoy! 
Fusion Engine Cricket Geography Chordate The Guardian Israel Billboard (magazine) Ice hockey Given name Chicago World War I Pennsylvania Indonesia Alma mater Vascular plant Amorphea Wikimedia Commons Novel Village Visual arts Film poster Flowering plant Opisthokont Obazoa County seat Short story First-class cricket Law Europe University Croatia Sport of athletics Holozoa Choanozoa Filozoa German language Tennis Eumetazoa Serbia ParaHoxozoa Thailand History Midfielder Bilateria Unincorporated area French language AllMusic Astronomy Nephrozoa Novella Ship Twitter Character (arts) College Malaysia Conflict of interest Higher education IUCN Red List Rock music Gastropoda Creative Commons Wales Bulgaria UTC+2 Paris Species Illinois HTML element South Korea BBC Persian language Moth Conservation status Pop music Colombia Wicket American football Jazz World Flora Online Los Angeles Songwriter Hong Kong Hdl (identifier) Genus Spanish language Egypt Not out Slovenia Chile Korea Tropicos Slovakia Bishop Family (biology) Rugby union Women's history Nigeria College basketball Sports Reference Washington, D.C. 
GFDL Afghanistan Sri Lanka Newspapers.com UTC+1 Eudicots Estonia Los Angeles Times Olympedia Bangladesh Peru Singapore Typographical error UTC Virginia Taiwan Fast bowling COVID-19 pandemic Food Fish River Republic of Ireland Beer Caribbean Michigan Drink Chinese language Business Leg break Women's Test cricket Women's cricket Innings New Jersey Protostome Spin bowling Sugar Underarm bowling Roger Federer Googly Apple Comics Cricket Australia XI Fair and unfair play Anime Rafael Nadal Leander Paes Kazakhstan Capital city Blessed Virgin Mary Venezuela Case sensitivity Arabic language North America Texas Burger King The Plant List Justine Henin Sushi Angelus Beef Sanctification Cuthbert Tunstall Bread Saint Mungo Incumbent Americanism (heresy) Curry Ensoulment Associated Press Adolph John Paschang French cuisine Altar Society UTC-5 Philadelphia Bill Mallon Yogurt Soy sauce Open Era (tennis) Belarus Manga English Wikipedia Islam Trademark ISO 4 Wisconsin Lithuania The Washington Post Agaricus bisporus Reptile Sociology Organizations Death Ham and eggs Asia Swimming (sport) South America Northern Ireland Observation.org European Union Astronomical object Georgia (U.S. 
state) Gmina Provinces of Iran Computing Counties of Iran Discogs Mathematics Powiat Missouri Bachelor of Arts Iran Standard Time Florida Bakhsh Minnesota Oregon Nepal Variety (magazine) Japanese language Journalism Rome Computer Ohio Ontario Internet Archive Latvia Comedy Azerbaijan BBC News Morocco Ecdysozoa Print-on-demand Bengali language A5 paper Pedia Press Education Mollusca American Civil War Berlin Taxon Maryland Panarthropoda Hebrew language Toronto Tactopoda Episode Cuba Country music Religion Rotten Tomatoes Georgia (country) Classical music Month Puerto Rico GEOnet Names Server Sydney The Times Iraq Polyphaga Derivative work Lisbon Syria Ecuador Uzbekistan Greek language Latin United Nations Literature Animation Physics Amphibian Romanize List of countries Moscow Politician Philosophy Metacritic Mammal Pinyin Open access New South Wales Theatre Allmusic Syntax Women in music Fly Colorado Academic journal LGBTQ Seal (emblem) Rolling Stone Saudi Arabia Science fiction Tweet (social media) Heavy metal music Boston Vietnam Molecular biology Facebook Iceland Albania Cycling Tennessee Armenia Massachusetts Mandibulata United States Navy Communes of France Census Algeria United States Army Wikilink Pancrustacea Alternative rock American English Radio stations History of Romania Endemism San Francisco Award Ghana Judaism Alabama Blog The Independent Melbourne Cantons of France Lebanon West Germany Quotation mark Regions of France Chernivtsi Oblast Tokyo Italian language Connecticut Country Screenshot Ghost town Iran Daylight Time NatureServe Mongolia Cyprus Northern Bukovina Rugby league Northern Bessarabia State highway Harvard University Yorkshire Pterygota Slash (punctuation) Prize Science Asian Games Eastern Time Zone Myanmar Nazi Germany Ottoman Empire Quebec Billboard Hot 100 United Arab Emirates Neoptera Hexapoda Least Concern Type species EPPO Code Wikisource Kyrgyzstan Allotriocarida Volleyball Geology Second World War British Columbia Socialism 
Zoology The Daily Telegraph Paleontology Vienna Dicondylia BugGuide United States Senate Hermit crab Paraphrase CNN Royal Navy Indian Standard Time Billboard 200 Kenya DVD Sipuncula Tajikistan National park Economics Heterocyathus Uruguay Heteropsammia Road Spanish name Luxembourg Korean language UK Singles Chart Queensland Montreal New York Times Bolivia CP/M Timestamp Electronic music INSEE code ArXiv (identifier) PubMed SVG USA Today Omnivore Tunisia Psychology ESPN UEFA Hawaii Gastropod Aliyah North Carolina Russian Empire Tibet Fungi Oklahoma Fauna Europaea Turkmenistan British English The London Gazette Civil township Boxing Barack Obama Animal Diversity Web Reuters Eumetabola Voter turnout Transport False positive Donald Trump Kansas Antarctica Lake Ethiopia Time (magazine) Marriage NBC Beijing Vertebrate Czechoslovakia Protected area Energy Poetry Archaeology Columbia University Poverty line Alaska Computing platform British Empire University of Oxford Costa Rica Dublin A-side and B-side ZIP code Actinopterygii UTC-6 Photoperiodism Mayor Sphaeriidae Animal suicide Atka mackerel Starling Arizona Entertainment Weekly Sphaerium beckmani Junqueira cow Zaniolepis frenata Campocraspedon Zimbabwe Motorsport Bird flight Cnemophilidae Hinduism Phalarope Indiana Museums Holometabola Pytilia North Macedonia Malta Cathartiformes Darter Saker falcon Cathartes Avian malaria Coal tit Magpie duck Video game developer Bird bath Vesper sparrow Gouldian finch Debeaking Vector graphics Semiplumbeous hawk Scottish crossbill Bullfinch Fregata Nidicolous Plushcap Pallid scops owl Hip-hop Blyth's frogmouth Sunda scops owl Argus (bird) Operation Migration Nik Borrow Per capita income Guy Oseary Madrid Buddhism Drainage basin Sephardic Haredim Rami Kleinstein Guy Bavli David Bar-Hayim Levin Kipnis Edna Arbel Prisoner of Zion Ayala Procaccia Nachum Heiman Zman Tel Aviv CBS ARIA Charts Cucujiformia Away colours Regex 2019 African Games 1962 Asian Games 1958 Asian Games Chemistry 
Olympic Games The Middle Ages Central Asia Bengalis Southeast Asia Find a Grave Microsoft Windows Swing (politics) White (U.S. Census) Roman Catholic Maine The Times of India Season (sports) Jamaica Video game genre Munich Asterids Rosids Golf Language Hangul Atlanta Glasgow UTC+3 Library of Congress Deuterostome COVID-19 Video game publisher Montenegro ESPNcricinfo Brand UTC-4 IGN Stockholm Istanbul NASA Gnathostomata Ukrainian language Human rights Chicago Tribune ProQuest IMDb River mouth Hip hop music Gene Netflix Moldova Barcelona Paraguay Olfactores Labour Party (UK) United States dollar Qatar Photography Guatemala Summit Cold War Running First World War Precipitation Edinburgh Amsterdam Lima New Eskaton Computer program Xinjiang Women in science Manhattan Warsaw Magazine Horror film Deadline Hollywood Jordan Aparaglossata Agriculture Internet Prague The Hindu Cretaceous Latino (U.S. Census) Vietnam War Music download Encyclopedia Chemical compounds Pittsburgh Soap opera Budapest George W. 
Bush Seattle Extended play Washington (state) Listed building Palestine LCCN (identifier) Portland, Oregon Panama Plagiarism Brooklyn Teleostomi Manchester Bird Mollusk Automobile Historic England Linguistics Dependent territory Athens Civil engineering Sea snail Population density Finance Disaster management Tanzania Jurassic Districts of Russia Western Australia Louisiana Portuguese language Anatomy The Beatles Tamil language Milan Uganda Natural environment FIFA Cameroon Blu-ray Mexico City Chemical formula Jimmy Wales Papua New Guinea Diaphoretickes UNESCO Forbes Technology Buenos Aires Vancouver Dominican Republic 2007 Species description East Germany Folk music Kentucky Multimedia Monocotyledon Rio de Janeiro Automated Hindi Houston Google Devonian Member of Parliament Bible Mumbai FishBase African diaspora Carboniferous Cambrian Triassic Montana Handball Ordovician San Diego Archive.today Stanford University British Army Middle Ages Frequency Ultratop Permian Detroit Earth Precambrian Hamburg Alberta Tamil Nadu Madagascar Lancashire Guitar Trade union Instagram Engineering 2006 Silurian NPR Railway station CAS Registry Number Yemen Noctuoidea Fiji Haiti Rowing (sport) New Orleans NME Alternative media North Korea Microsoft Jerusalem Paleogene Audery Mill Creek Horse racing Post town Piano Bavaria Polish language Horror fiction Neogene Kerala Copenhagen Google Books Central Time Zone Island Birmingham Anglicanism Software Mountain range Investment Brussels Muhammad Ali Asian (U.S. 
Census) Video game culture Brisbane Church of England Kosovo Bachelor of Science Molar mass Arachnid Own goal Yale University Caenogastropoda Auckland World Athletics Trinidad and Tobago Hanyu Pinyin Sound bite Time El Salvador Microbiology Columbia Records Seoul Cerambycidae Maharashtra Chelicerata Fungus Media influence South Carolina Radio Telenovela FA Cup Senegal Internet trolling Nashville, Tennessee Demonym Standard Chinese Sculpture Liverpool Thesis Bass guitar Chess Women artists Icon (computing) PubChem UK Albums Chart Head coach Roman Empire Grand Slam (tennis) JSmol Formula One Biology Kent Ancient Rome Inner Carniola Oslo Dutch language Wingspan Archaeplastida MTV Edvard Ravnikar ITunes Feminism German Empire Pacific Ocean Atlantic Ocean Pharmacology Track gauge ChemSpider Doctor of Philosophy Regions of England Districts of England Christmas Pavel Golia Predjama Castle Overtime (sports) Forum Swiss Hitparade Stumped Majority Male Shanghai Siddharta (band) ``` -------------------------------------------------------------------------------- /blog/training-computer-use-models-trajectories-1.md: -------------------------------------------------------------------------------- ```markdown # Training Computer-Use Models: Creating Human Trajectories with Cua *Published on May 1, 2025 by Dillon DuPont* In our previous posts, we covered [building your own Computer-Use Operator](build-your-own-operator-on-macos-1) and [using the Agent framework](build-your-own-operator-on-macos-2) to simplify development. Today, we'll focus on a critical aspect of improving computer-use agents and models: gathering high-quality demonstration data using Cua's Computer-Use Interface (CUI) and its Gradio UI to create and share human-generated trajectories. Why is this important? Underlying models used by Computer-use agents need examples of how humans interact with computers to learn effectively. 
By creating a dataset of diverse, well-executed tasks, we can help train better models that understand how to navigate user interfaces and accomplish real tasks. <video src="https://github.com/user-attachments/assets/c586d460-3877-4b5f-a736-3248886d2134" controls width="600"></video> ## What You'll Learn By the end of this tutorial, you'll be able to: - Set up the Computer-Use Interface (CUI) with Gradio UI support - Record your own computer interaction trajectories - Organize and tag your demonstrations - Upload your datasets to Hugging Face for community sharing - Contribute to improving computer-use AI for everyone **Prerequisites:** - macOS Sonoma (14.0) or later - Python 3.10+ - Basic familiarity with Python and terminal commands - A Hugging Face account (for uploading datasets) **Estimated Time:** 20-30 minutes ## Understanding Human Trajectories ### What are Human Trajectories? Human trajectories, in the context of Computer-use AI Agents, are recordings of how humans interact with computer interfaces to complete tasks. These interactions include: - Mouse movements, clicks, and scrolls - Keyboard input - Changes in the UI state - Time spent on different elements These trajectories serve as examples for AI models to learn from, helping them understand the relationship between: 1. The visual state of the screen 2. The user's goal or task 3. 
The most appropriate action to take ### Why Human Demonstrations Matter Unlike synthetic data or rule-based automation, human demonstrations capture the nuanced decision-making that happens during computer interaction: - **Natural Pacing**: Humans pause to think, accelerate through familiar patterns, and adjust to unexpected UI changes - **Error Recovery**: Humans demonstrate how to recover from mistakes or handle unexpected states - **Context-Sensitive Actions**: The same UI element might be used differently depending on the task context By contributing high-quality demonstrations, you're helping to create more capable, human-like computer-use AI systems. ## Setting Up Your Environment ### Installing the CUI with Gradio Support The Computer-Use Interface includes an optional Gradio UI specifically designed to make recording and sharing demonstrations easy. Let's set it up: 1. **Create a Python environment** (optional but recommended): ```bash # Using conda conda create -n cua-trajectories python=3.10 conda activate cua-trajectories # Using venv python -m venv cua-trajectories source cua-trajectories/bin/activate # On macOS/Linux ``` 2. **Install the CUI package with UI support**: ```bash pip install "cua-computer[ui]" ``` 3. **Set up your Hugging Face access token**: Create a `.env` file in your project directory and add your Hugging Face token: ```bash echo "HF_TOKEN=your_huggingface_token" > .env ``` You can get your token from your [Hugging Face account settings](https://huggingface.co/settings/tokens). ### Understanding the Gradio UI The Computer-Use Interface Gradio UI provides three main components: 1. **Recording Panel**: Captures your screen, mouse, and keyboard activity during demonstrations 2. **Review Panel**: Allows you to review, tag, and organize your demonstration recordings 3. 
**Upload Panel**: Lets you share your demonstrations with the community via Hugging Face The UI is designed to make the entire process seamless, from recording to sharing, without requiring deep technical knowledge of the underlying systems. ## Creating Your First Trajectory Dataset ### Launching the UI To get started, create a simple Python script to launch the Gradio UI: ```python # launch_trajectory_ui.py from computer.ui.gradio.app import create_gradio_ui from dotenv import load_dotenv # Load your Hugging Face token from .env load_dotenv('.env') # Create and launch the UI app = create_gradio_ui() app.launch(share=False) ``` Run this script to start the UI: ```bash python launch_trajectory_ui.py ``` ### Recording a Demonstration Let's walk through the process of recording your first demonstration: 1. **Start the VM**: Click the "Initialize Computer" button in the UI to initialize a fresh macOS sandbox. This ensures your demonstrations are clean and reproducible. 2. **Perform a Task**: Complete a simple task like creating a document, organizing files, or searching for information. Natural, everyday tasks make the best demonstrations. 3. **Review Recording**: Click the "Conversation Logs" or "Function Logs" tabs to review your captured interactions, making sure there is no personal information that you wouldn't want to share. 4. **Add Metadata**: In the "Save/Share Demonstrations" tab, give your recording a descriptive name (e.g., "Creating a Calendar Event") and add relevant tags (e.g., "productivity", "time-management"). 5. **Save Your Demonstration**: Click "Save" to store your recording locally. 
<video src="https://github.com/user-attachments/assets/de3c3477-62fe-413c-998d-4063e48de176" controls width="600"></video> ### Key Tips for Quality Demonstrations To create the most valuable demonstrations: - **Start and end at logical points**: Begin with a clear starting state and end when the task is visibly complete - **Narrate your thought process**: Use the message input to describe what you're trying to do and why - **Move at a natural pace**: Don't rush or perform actions artificially slowly - **Include error recovery**: If you make a mistake, keep going and show how to correct it - **Demonstrate variations**: Record multiple ways to complete the same task ## Organizing and Tagging Demonstrations Effective tagging and organization make your demonstrations more valuable to researchers and model developers. Consider these tagging strategies: ### Task-Based Tags Describe what the demonstration accomplishes: - `web-browsing` - `document-editing` - `file-management` - `email` - `scheduling` ### Application Tags Identify the applications used: - `finder` - `safari` - `notes` - `terminal` - `calendar` ### Complexity Tags Indicate the difficulty level: - `beginner` - `intermediate` - `advanced` - `multi-application` ### UI Element Tags Highlight specific UI interactions: - `drag-and-drop` - `menu-navigation` - `form-filling` - `search` The Computer-Use Interface UI allows you to apply and manage these tags across all your saved demonstrations, making it easy to create cohesive, well-organized datasets. <video src="https://github.com/user-attachments/assets/5ad1df37-026a-457f-8b49-922ae805faef" controls width="600"></video> ## Uploading to Hugging Face Sharing your demonstrations helps advance research in computer-use AI. The Gradio UI makes uploading to Hugging Face simple: ### Preparing for Upload 1. **Review Your Demonstrations**: Use the review panel to ensure all demonstrations are complete and correctly tagged. 2. 
**Select Demonstrations to Upload**: You can upload all demonstrations or filter by specific tags. 3. **Configure Dataset Information**: - **Repository Name**: Format as `{your_username}/{dataset_name}`, e.g., `johndoe/productivity-tasks` - **Visibility**: Choose `public` to contribute to the community or `private` for personal use - **License**: Standard licenses like CC-BY or MIT are recommended for public datasets ### The Upload Process 1. **Click "Upload to Hugging Face"**: This initiates the upload preparation. 2. **Review Dataset Summary**: Confirm the number of demonstrations and total size. 3. **Confirm Upload**: The UI will show progress as files are transferred. 4. **Receive Confirmation**: Once complete, you'll see a link to your new dataset on Hugging Face. <video src="https://github.com/user-attachments/assets/c586d460-3877-4b5f-a736-3248886d2134" controls width="600"></video> Your uploaded dataset will have a standardized format with the following structure: ```json { "timestamp": "2025-05-01T09:20:40.594878", "session_id": "1fe9f0fe-9331-4078-aacd-ec7ffb483b86", "name": "penguin lemon forest", "tool_calls": [...], // Detailed interaction records "messages": [...], // User/assistant messages "tags": ["highquality", "tasks"], "images": [...] // Screenshots of each state } ``` This structured format makes it easy for researchers to analyze patterns across different demonstrations and build better computer-use models. 
```python from computer import Computer computer = Computer(os_type="macos", display="1024x768", memory="8GB", cpu="4") try: await computer.run() screenshot = await computer.interface.screenshot() with open("screenshot.png", "wb") as f: f.write(screenshot) await computer.interface.move_cursor(100, 100) await computer.interface.left_click() await computer.interface.right_click(300, 300) await computer.interface.double_click(400, 400) await computer.interface.type("Hello, World!") await computer.interface.press_key("enter") await computer.interface.set_clipboard("Test clipboard") content = await computer.interface.copy_to_clipboard() print(f"Clipboard content: {content}") finally: await computer.stop() ``` ## Example: Shopping List Demonstration Let's walk through a concrete example of creating a valuable demonstration: ### Task: Adding Shopping List Items to a Doordash Cart 1. **Start Recording**: Begin with a clean desktop and a text file containing a shopping list. 2. **Task Execution**: Open the file, read the list, open Safari, navigate to Doordash, and add each item to the cart. 3. **Narration**: Add messages like "Reading the shopping list" and "Searching for rice on Doordash" to provide context. 4. **Completion**: Verify all items are in the cart and end the recording. 5. **Tagging**: Add tags like `shopping`, `web-browsing`, `task-completion`, and `multi-step`. This type of demonstration is particularly valuable because it showcases real-world task completion requiring multiple applications and context switching. ### Exploring Community Datasets You can also learn from existing trajectory datasets contributed by the community: 1. Visit [Hugging Face Datasets tagged with 'cua'](https://huggingface.co/datasets?other=cua) 2. Explore different approaches to similar tasks 3. 
Download and analyze high-quality demonstrations ## Conclusion ### Summary In this guide, we've covered how to: - Set up the Computer-Use Interface with Gradio UI - Record high-quality human demonstrations - Organize and tag your trajectories - Share your datasets with the community By contributing your own demonstrations, you're helping to build more capable, human-like AI systems that can understand and execute complex computer tasks. ### Next Steps Now that you know how to create and share trajectories, consider these advanced techniques: - Create themed collections around specific productivity workflows - Collaborate with others to build comprehensive datasets - Use your datasets to fine-tune your own computer-use models ### Resources - [Computer-Use Interface GitHub](https://github.com/trycua/cua/tree/main/libs/computer) - [Hugging Face Datasets Documentation](https://huggingface.co/docs/datasets) - [Example Dataset: ddupont/test-dataset](https://huggingface.co/datasets/ddupont/test-dataset) ``` -------------------------------------------------------------------------------- /libs/python/pylume/pylume/pylume.py: -------------------------------------------------------------------------------- ```python import os import sys import json import time import asyncio import subprocess from typing import Optional, List, Union, Callable, TypeVar, Any from functools import wraps import re import signal from .server import LumeServer from .client import LumeClient from .models import ( VMConfig, VMStatus, VMRunOpts, VMUpdateOpts, ImageRef, CloneSpec, SharedDirectory, ImageList, ) from .exceptions import ( LumeError, LumeServerError, LumeConnectionError, LumeTimeoutError, LumeNotFoundError, LumeConfigError, LumeVMError, LumeImageError, ) # Type variable for the decorator T = TypeVar("T") def ensure_server(func: Callable[..., T]) -> Callable[..., T]: """Decorator to ensure server is running before executing the method.""" @wraps(func) async def wrapper(self: "PyLume", 
    def __init__(
        self,
        debug: bool = False,
        server_start_timeout: int = 60,
        port: Optional[int] = None,
        use_existing_server: bool = False,
        host: str = "localhost",
    ):
        """Initialize the async PyLume client.

        Args:
            debug: Enable debug logging for both the server and the client.
            server_start_timeout: Timeout in seconds to wait for the server to start.
            port: Port number for the lume server. Required when
                use_existing_server is True.
            use_existing_server: If True, will try to connect to an existing server
                on the specified port instead of starting a new one.
            host: Host to use for connections (e.g., "localhost", "127.0.0.1",
                "host.docker.internal")

        Raises:
            LumeConfigError: If use_existing_server is True but no port was given.
        """
        if use_existing_server and port is None:
            raise LumeConfigError("Port must be specified when using an existing server")
        self.server = LumeServer(
            debug=debug,
            server_start_timeout=server_start_timeout,
            port=port,
            use_existing_server=use_existing_server,
            host=host,
        )
        # The LumeClient is created lazily (see _init_client) once the
        # server's base URL is known.
        self.client = None
    async def _handle_api_error(self, e: Exception, operation: str) -> None:
        """Handle API errors and raise appropriate custom exceptions.

        Translates low-level failures into the package's exception hierarchy:

        - subprocess.SubprocessError  -> LumeConnectionError
        - asyncio.TimeoutError        -> LumeTimeoutError
        - HTTP 404                    -> LumeNotFoundError
        - HTTP 400                    -> LumeConfigError
        - HTTP 5xx / anything else    -> LumeServerError

        Args:
            e: The exception raised by the underlying client call.
            operation: Short human-readable name of the operation, used in
                the raised error messages.

        Raises:
            One of the Lume* exceptions above; this method never returns
            normally when called with a genuine error.
        """
        if isinstance(e, subprocess.SubprocessError):
            raise LumeConnectionError(f"Failed to connect to PyLume server: {str(e)}")
        elif isinstance(e, asyncio.TimeoutError):
            raise LumeTimeoutError(f"Request timed out: {str(e)}")

        # Errors without an HTTP status (and which are not process failures)
        # cannot be classified further; report them as generic server errors.
        if not hasattr(e, "status") and not isinstance(e, subprocess.CalledProcessError):
            raise LumeServerError(f"Unknown error during {operation}: {str(e)}")

        # Default to 500 so unclassified statuses fall into the server-error branch.
        status_code = getattr(e, "status", 500)
        response_text = str(e)
        self._log_debug(
            f"{operation} request failed", status_code=status_code, response_text=response_text
        )

        if status_code == 404:
            raise LumeNotFoundError(f"Resource not found during {operation}")
        elif status_code == 400:
            raise LumeConfigError(f"Invalid configuration for {operation}: {response_text}")
        elif status_code >= 500:
            raise LumeServerError(
                f"Server error during {operation}",
                status_code=status_code,
                response_text=response_text,
            )
        else:
            raise LumeServerError(
                f"Error during {operation}", status_code=status_code, response_text=response_text
            )
line: break line = line.strip() self._log_debug(f"Server stdout: {line}") if "Server started" in line.decode("utf-8"): self._log_debug("Detected server started message") return # Read stderr without blocking if self.server.server_process.stderr: while True: line = self.server.server_process.stderr.readline() if not line: break line = line.strip() self._log_debug(f"Server stderr: {line}") if "error" in line.decode("utf-8").lower(): raise RuntimeError(f"Server error: {line}") await asyncio.sleep(0.1) # Small delay to prevent CPU spinning except Exception as e: self._log_debug(f"Error in output reader: {str(e)}") raise @ensure_server async def create_vm(self, spec: Union[VMConfig, dict]) -> None: """Create a VM with the given configuration.""" # Ensure client is initialized await self._init_client() if isinstance(spec, VMConfig): spec = spec.model_dump(by_alias=True, exclude_none=True) # Suppress optional attribute access errors self.client.print_curl("POST", "/vms", spec) # type: ignore[attr-defined] await self.client.post("/vms", spec) # type: ignore[attr-defined] @ensure_server async def run_vm(self, name: str, opts: Optional[Union[VMRunOpts, dict]] = None) -> None: """Run a VM.""" if opts is None: opts = VMRunOpts(no_display=False) # type: ignore[attr-defined] elif isinstance(opts, dict): opts = VMRunOpts(**opts) payload = opts.model_dump(by_alias=True, exclude_none=True) self.client.print_curl("POST", f"/vms/{name}/run", payload) # type: ignore[attr-defined] await self.client.post(f"/vms/{name}/run", payload) # type: ignore[attr-defined] @ensure_server async def list_vms(self) -> List[VMStatus]: """List all VMs.""" data = await self.client.get("/vms") # type: ignore[attr-defined] return [VMStatus.model_validate(vm) for vm in data] @ensure_server async def get_vm(self, name: str) -> VMStatus: """Get VM details.""" data = await self.client.get(f"/vms/{name}") # type: ignore[attr-defined] return VMStatus.model_validate(data) @ensure_server async def update_vm(self, 
name: str, params: Union[VMUpdateOpts, dict]) -> None: """Update VM settings.""" if isinstance(params, dict): params = VMUpdateOpts(**params) payload = params.model_dump(by_alias=True, exclude_none=True) self.client.print_curl("PATCH", f"/vms/{name}", payload) # type: ignore[attr-defined] await self.client.patch(f"/vms/{name}", payload) # type: ignore[attr-defined] @ensure_server async def stop_vm(self, name: str) -> None: """Stop a VM.""" await self.client.post(f"/vms/{name}/stop") # type: ignore[attr-defined] @ensure_server async def delete_vm(self, name: str) -> None: """Delete a VM.""" await self.client.delete(f"/vms/{name}") # type: ignore[attr-defined] @ensure_server async def pull_image( self, spec: Union[ImageRef, dict, str], name: Optional[str] = None ) -> None: """Pull a VM image.""" await self._init_client() if isinstance(spec, str): if ":" in spec: image_str = spec else: image_str = f"{spec}:latest" registry = "ghcr.io" organization = "trycua" elif isinstance(spec, dict): image = spec.get("image", "") tag = spec.get("tag", "latest") image_str = f"{image}:{tag}" registry = spec.get("registry", "ghcr.io") organization = spec.get("organization", "trycua") else: image_str = f"{spec.image}:{spec.tag}" registry = spec.registry organization = spec.organization payload = { "image": image_str, "name": name, "registry": registry, "organization": organization, } self.client.print_curl("POST", "/pull", payload) # type: ignore[attr-defined] await self.client.post("/pull", payload, timeout=300.0) # type: ignore[attr-defined] @ensure_server async def clone_vm(self, name: str, new_name: str) -> None: """Clone a VM with the given name to a new VM with new_name.""" config = CloneSpec(name=name, newName=new_name) self.client.print_curl("POST", "/vms/clone", config.model_dump()) # type: ignore[attr-defined] await self.client.post("/vms/clone", config.model_dump()) # type: ignore[attr-defined] @ensure_server async def get_latest_ipsw_url(self) -> str: """Get the latest IPSW 
URL.""" await self._init_client() data = await self.client.get("/ipsw") # type: ignore[attr-defined] return data["url"] @ensure_server async def get_images(self, organization: Optional[str] = None) -> ImageList: """Get list of available images.""" await self._init_client() params = {"organization": organization} if organization else None data = await self.client.get("/images", params) # type: ignore[attr-defined] return ImageList(root=data) async def close(self) -> None: """Close the client and stop the server.""" if self.client is not None: await self.client.close() self.client = None await asyncio.sleep(1) await self.server.stop() async def _ensure_client(self) -> None: """Ensure client is initialized.""" if self.client is None: await self._init_client() ``` -------------------------------------------------------------------------------- /libs/python/mcp-server/mcp_server/session_manager.py: -------------------------------------------------------------------------------- ```python """ Session Manager for MCP Server - Handles concurrent client sessions with proper resource isolation. 
    async def acquire(self) -> Any:
        """Acquire a computer instance from the pool.

        Returns an idle instance when one exists, creates a new one while the
        pool is below ``max_size``, and otherwise polls until another caller
        releases an instance.
        """
        # Fast path: reuse an idle instance straight away.
        if self._available:
            computer = self._available.pop()
            self._in_use.add(computer)
            logger.debug(f"Reusing computer instance from pool")
            return computer

        # Creation is serialized so concurrent callers cannot overshoot
        # max_size between the size check and the add.
        async with self._creation_lock:
            if len(self._in_use) < self.max_size:
                logger.debug("Creating new computer instance")
                from computer import Computer

                computer = Computer(verbosity=logging.INFO)
                await computer.run()
                self._in_use.add(computer)
                return computer

        # Pool exhausted: poll for a release every 100 ms.
        # NOTE(review): this busy-wait has no timeout — a caller can block
        # forever if nothing is ever released; an asyncio.Condition signalled
        # by release() would be more robust. Confirm intended behavior.
        logger.debug("Waiting for computer instance to become available")
        while not self._available:
            await asyncio.sleep(0.1)
        computer = self._available.pop()
        self._in_use.add(computer)
        return computer
logger.debug("Released computer instance back to pool") async def cleanup_idle(self) -> None: """Clean up idle computer instances.""" current_time = time.time() idle_instances = [] for computer in self._available[:]: # Check if computer has been idle too long # Note: We'd need to track last use time per instance for this # For now, we'll keep instances in the pool pass async def shutdown(self) -> None: """Shutdown all computer instances in the pool.""" logger.info("Shutting down computer pool") # Close all available instances for computer in self._available: try: if hasattr(computer, 'close'): await computer.close() elif hasattr(computer, 'stop'): await computer.stop() except Exception as e: logger.warning(f"Error closing computer instance: {e}") # Close all in-use instances for computer in self._in_use: try: if hasattr(computer, 'close'): await computer.close() elif hasattr(computer, 'stop'): await computer.stop() except Exception as e: logger.warning(f"Error closing computer instance: {e}") self._available.clear() self._in_use.clear() class SessionManager: """Manages concurrent client sessions with proper resource isolation.""" def __init__(self, max_concurrent_sessions: int = 10): self.max_concurrent_sessions = max_concurrent_sessions self._sessions: Dict[str, SessionInfo] = {} self._computer_pool = ComputerPool() self._session_lock = asyncio.Lock() self._cleanup_task: Optional[asyncio.Task] = None self._shutdown_event = asyncio.Event() async def start(self) -> None: """Start the session manager and cleanup task.""" logger.info("Starting session manager") self._cleanup_task = asyncio.create_task(self._cleanup_loop()) async def stop(self) -> None: """Stop the session manager and cleanup all resources.""" logger.info("Stopping session manager") self._shutdown_event.set() if self._cleanup_task: self._cleanup_task.cancel() try: await self._cleanup_task except asyncio.CancelledError: pass # Force cleanup all sessions async with self._session_lock: session_ids = 
    @asynccontextmanager
    async def get_session(self, session_id: Optional[str] = None) -> Any:
        """Get or create a session with proper resource management.

        Yields the SessionInfo for ``session_id``; when no id is supplied a
        fresh UUID is generated and a new session is created. The session's
        ``last_activity`` is refreshed both on entry and on exit.

        Raises:
            RuntimeError: If the session is shutting down, or if creating a
                new session would exceed ``max_concurrent_sessions``.
        """
        if session_id is None:
            session_id = str(uuid.uuid4())

        # Check if session exists and is not shutting down
        async with self._session_lock:
            if session_id in self._sessions:
                session = self._sessions[session_id]
                if session.is_shutting_down:
                    raise RuntimeError(f"Session {session_id} is shutting down")
                session.last_activity = time.time()
                computer = session.computer
            else:
                # Create new session
                if len(self._sessions) >= self.max_concurrent_sessions:
                    raise RuntimeError(f"Maximum concurrent sessions ({self.max_concurrent_sessions}) reached")
                # NOTE(review): the pool is drained while _session_lock is held;
                # if acquire() has to wait for a release, every other session
                # operation stalls until it returns — confirm this is acceptable.
                computer = await self._computer_pool.acquire()
                session = SessionInfo(
                    session_id=session_id,
                    computer=computer,
                    created_at=time.time(),
                    last_activity=time.time()
                )
                self._sessions[session_id] = session
                logger.info(f"Created new session: {session_id}")

        try:
            yield session
        finally:
            # Update last activity
            async with self._session_lock:
                if session_id in self._sessions:
                    self._sessions[session_id].last_activity = time.time()
self._sessions: return session = self._sessions[session_id] # Check if session has active tasks if session.active_tasks: logger.info(f"Session {session_id} has active tasks, marking for shutdown") session.is_shutting_down = True return # Actually cleanup the session await self._force_cleanup_session(session_id) async def _force_cleanup_session(self, session_id: str) -> None: """Force cleanup a session regardless of active tasks.""" async with self._session_lock: if session_id not in self._sessions: return session = self._sessions[session_id] logger.info(f"Cleaning up session: {session_id}") # Release computer back to pool await self._computer_pool.release(session.computer) # Remove session del self._sessions[session_id] async def _cleanup_loop(self) -> None: """Background task to cleanup idle sessions.""" while not self._shutdown_event.is_set(): try: await asyncio.sleep(60) # Run cleanup every minute current_time = time.time() idle_timeout = 600.0 # 10 minutes async with self._session_lock: idle_sessions = [] for session_id, session in self._sessions.items(): if not session.is_shutting_down and not session.active_tasks: if current_time - session.last_activity > idle_timeout: idle_sessions.append(session_id) # Cleanup idle sessions for session_id in idle_sessions: await self._force_cleanup_session(session_id) logger.info(f"Cleaned up idle session: {session_id}") except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in cleanup loop: {e}") def get_session_stats(self) -> Dict[str, Any]: """Get statistics about active sessions.""" async def _get_stats(): async with self._session_lock: return { "total_sessions": len(self._sessions), "max_concurrent": self.max_concurrent_sessions, "sessions": { session_id: { "created_at": session.created_at, "last_activity": session.last_activity, "active_tasks": len(session.active_tasks), "is_shutting_down": session.is_shutting_down } for session_id, session in self._sessions.items() } } # Run in current event 
async def initialize_session_manager() -> "SessionManager":
    """Initialize the global session manager.

    Creates and starts the global SessionManager on first call; subsequent
    calls are no-ops that return the existing instance.

    Returns:
        The started global SessionManager instance. (The previous signature
        claimed ``-> None`` while returning the manager; the annotation now
        matches the behavior, and the instance is returned on every call.)
    """
    global _session_manager
    if _session_manager is None:
        _session_manager = SessionManager()
        await _session_manager.start()
    return _session_manager
pylume, cua-agent)" required: true type: string make_latest: description: "Whether to mark this release as latest (should only be true for lume)" required: false type: boolean default: false secrets: PYPI_TOKEN: required: true outputs: version: description: "The version that was published" value: ${{ jobs.build-and-publish.outputs.version }} jobs: build-and-publish: runs-on: macos-latest permissions: contents: write # This permission is needed for creating releases outputs: version: ${{ steps.set-version.outputs.version }} steps: - uses: actions/checkout@v4 with: fetch-depth: 0 # Full history for release creation - name: Set up Python uses: actions/setup-python@v4 with: python-version: "3.11" - name: Create root pdm.lock file run: | # Create an empty pdm.lock file in the root touch pdm.lock - name: Install PDM uses: pdm-project/setup-pdm@v3 with: python-version: "3.11" cache: true - name: Set version id: set-version run: | echo "VERSION=${{ inputs.version }}" >> $GITHUB_ENV echo "version=${{ inputs.version }}" >> $GITHUB_OUTPUT - name: Verify version consistency run: | # Install toml parser pip install toml # Verify version matches using script (exits with error if mismatch) python ${GITHUB_WORKSPACE}/.github/scripts/get_pyproject_version.py \ ${{ inputs.package_dir }}/pyproject.toml \ ${{ inputs.version }} - name: Initialize PDM in package directory run: | # Make sure we're working with a properly initialized PDM project cd ${{ inputs.package_dir }} # Create pdm.lock if it doesn't exist if [ ! -f "pdm.lock" ]; then echo "No pdm.lock found, initializing PDM project..." pdm lock fi # Conditional step for lume binary download (only for pylume package) - name: Download and setup lume binary if: inputs.is_lume_package run: | # Create a temporary directory for extraction mkdir -p temp_lume # Download the latest lume release directly echo "Downloading latest lume version..." 
curl -sL "https://github.com/trycua/lume/releases/latest/download/lume.tar.gz" -o temp_lume/lume.tar.gz # Extract the tar file (ignore ownership and suppress warnings) cd temp_lume && tar --no-same-owner -xzf lume.tar.gz # Make the binary executable chmod +x lume # Copy the lume binary to the correct location in the pylume package mkdir -p "${GITHUB_WORKSPACE}/${{ inputs.package_dir }}/pylume" cp lume "${GITHUB_WORKSPACE}/${{ inputs.package_dir }}/pylume/lume" # Verify the binary exists and is executable test -x "${GITHUB_WORKSPACE}/${{ inputs.package_dir }}/pylume/lume" || { echo "lume binary not found or not executable"; exit 1; } # Get the version from the downloaded binary for reference LUME_VERSION=$(./lume --version | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' || echo "unknown") echo "Using lume version: $LUME_VERSION" # Cleanup cd "${GITHUB_WORKSPACE}" && rm -rf temp_lume # Save the lume version for reference echo "LUME_VERSION=${LUME_VERSION}" >> $GITHUB_ENV - name: Build and publish env: PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }} run: | cd ${{ inputs.package_dir }} # Build with PDM pdm build # For pylume package, verify the binary is in the wheel if [ "${{ inputs.is_lume_package }}" = "true" ]; then python -m pip install wheel wheel unpack dist/*.whl --dest temp_wheel echo "Listing contents of wheel directory:" find temp_wheel -type f test -f temp_wheel/pylume-*/pylume/lume || { echo "lume binary not found in wheel"; exit 1; } rm -rf temp_wheel echo "Publishing ${{ inputs.base_package_name }} ${VERSION} with lume ${LUME_VERSION}" else echo "Publishing ${{ inputs.base_package_name }} ${VERSION}" fi # Install and use twine directly instead of PDM publish echo "Installing twine for direct publishing..." pip install twine echo "Publishing to PyPI using twine..." 
TWINE_USERNAME="__token__" TWINE_PASSWORD="$PYPI_TOKEN" python -m twine upload dist/* # Save the wheel file path for the release WHEEL_FILE=$(ls dist/*.whl | head -1) echo "WHEEL_FILE=${WHEEL_FILE}" >> $GITHUB_ENV - name: Prepare Simple Release Notes if: startsWith(github.ref, 'refs/tags/') run: | # Create release notes based on package type echo "# ${{ inputs.base_package_name }} v${VERSION}" > release_notes.md echo "" >> release_notes.md if [ "${{ inputs.package_name }}" = "pylume" ]; then echo "## Python SDK for lume - run macOS and Linux VMs on Apple Silicon" >> release_notes.md echo "" >> release_notes.md echo "This package provides Python bindings for the lume virtualization tool." >> release_notes.md echo "" >> release_notes.md echo "## Dependencies" >> release_notes.md echo "* lume binary: v${LUME_VERSION}" >> release_notes.md elif [ "${{ inputs.package_name }}" = "computer" ]; then echo "## Computer control library for the Computer Universal Automation (CUA) project" >> release_notes.md echo "" >> release_notes.md echo "## Dependencies" >> release_notes.md echo "* pylume: ${PYLUME_VERSION:-latest}" >> release_notes.md elif [ "${{ inputs.package_name }}" = "agent" ]; then echo "## Dependencies" >> release_notes.md echo "* cua-computer: ${COMPUTER_VERSION:-latest}" >> release_notes.md echo "* cua-som: ${SOM_VERSION:-latest}" >> release_notes.md echo "" >> release_notes.md echo "## Installation Options" >> release_notes.md echo "" >> release_notes.md echo "### Basic installation with Anthropic" >> release_notes.md echo '```bash' >> release_notes.md echo "pip install cua-agent[anthropic]==${VERSION}" >> release_notes.md echo '```' >> release_notes.md echo "" >> release_notes.md echo "### With SOM (recommended)" >> release_notes.md echo '```bash' >> release_notes.md echo "pip install cua-agent[som]==${VERSION}" >> release_notes.md echo '```' >> release_notes.md echo "" >> release_notes.md echo "### All features" >> release_notes.md echo '```bash' >> 
release_notes.md echo "pip install cua-agent[all]==${VERSION}" >> release_notes.md echo '```' >> release_notes.md elif [ "${{ inputs.package_name }}" = "som" ]; then echo "## Computer Vision and OCR library for detecting and analyzing UI elements" >> release_notes.md echo "" >> release_notes.md echo "This package provides enhanced UI understanding capabilities through computer vision and OCR." >> release_notes.md elif [ "${{ inputs.package_name }}" = "computer-server" ]; then echo "## Computer Server for the Computer Universal Automation (CUA) project" >> release_notes.md echo "" >> release_notes.md echo "A FastAPI-based server implementation for computer control." >> release_notes.md echo "" >> release_notes.md echo "## Dependencies" >> release_notes.md echo "* cua-computer: ${COMPUTER_VERSION:-latest}" >> release_notes.md echo "" >> release_notes.md echo "## Usage" >> release_notes.md echo '```bash' >> release_notes.md echo "# Run the server" >> release_notes.md echo "cua-computer-server" >> release_notes.md echo '```' >> release_notes.md elif [ "${{ inputs.package_name }}" = "mcp-server" ]; then echo "## MCP Server for the Computer-Use Agent (CUA)" >> release_notes.md echo "" >> release_notes.md echo "This package provides MCP (Model Context Protocol) integration for CUA agents, allowing them to be used with Claude Desktop, Cursor, and other MCP clients." 
>> release_notes.md echo "" >> release_notes.md echo "## Dependencies" >> release_notes.md echo "* cua-computer: ${COMPUTER_VERSION:-latest}" >> release_notes.md echo "* cua-agent: ${AGENT_VERSION:-latest}" >> release_notes.md echo "" >> release_notes.md echo "## Usage" >> release_notes.md echo '```bash' >> release_notes.md echo "# Run the MCP server directly" >> release_notes.md echo "cua-mcp-server" >> release_notes.md echo '```' >> release_notes.md echo "" >> release_notes.md echo "## Claude Desktop Integration" >> release_notes.md echo "Add to your Claude Desktop configuration (~/.config/claude-desktop/claude_desktop_config.json or OS-specific location):" >> release_notes.md echo '```json' >> release_notes.md echo '"mcpServers": {' >> release_notes.md echo ' "cua-agent": {' >> release_notes.md echo ' "command": "cua-mcp-server",' >> release_notes.md echo ' "args": [],' >> release_notes.md echo ' "env": {' >> release_notes.md echo ' "CUA_AGENT_LOOP": "OMNI",' >> release_notes.md echo ' "CUA_MODEL_PROVIDER": "ANTHROPIC",' >> release_notes.md echo ' "CUA_MODEL_NAME": "claude-3-opus-20240229",' >> release_notes.md echo ' "ANTHROPIC_API_KEY": "your-api-key",' >> release_notes.md echo ' "PYTHONIOENCODING": "utf-8"' >> release_notes.md echo ' }' >> release_notes.md echo ' }' >> release_notes.md echo '}' >> release_notes.md echo '```' >> release_notes.md fi # Add installation section if not agent (which has its own installation section) if [ "${{ inputs.package_name }}" != "agent" ]; then echo "" >> release_notes.md echo "## Installation" >> release_notes.md echo '```bash' >> release_notes.md echo "pip install ${{ inputs.base_package_name }}==${VERSION}" >> release_notes.md echo '```' >> release_notes.md fi echo "Release notes created:" cat release_notes.md - name: Create GitHub Release uses: softprops/action-gh-release@v2 if: startsWith(github.ref, 'refs/tags/') with: name: "${{ inputs.base_package_name }} v${{ env.VERSION }}" body_path: release_notes.md files: ${{ 
inputs.package_dir }}/${{ env.WHEEL_FILE }} draft: false prerelease: false make_latest: ${{ inputs.package_name == 'lume' }} env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} ``` -------------------------------------------------------------------------------- /libs/python/agent/agent/loops/composed_grounded.py: -------------------------------------------------------------------------------- ```python """ Composed-grounded agent loop implementation that combines grounding and thinking models. Uses a two-stage approach: grounding model for element detection, thinking model for reasoning. """ import uuid import asyncio import json import base64 from typing import Dict, List, Any, Optional, Tuple from io import BytesIO from PIL import Image import litellm from ..decorators import register_agent from ..types import Messages, AgentResponse, Tools, AgentCapability from ..loops.base import AsyncAgentConfig from ..responses import ( convert_computer_calls_xy2desc, convert_responses_items_to_completion_messages, convert_completion_messages_to_responses_items, convert_computer_calls_desc2xy, get_all_element_descriptions ) from ..agent import find_agent_config GROUNDED_COMPUTER_TOOL_SCHEMA = { "type": "function", "function": { "name": "computer", "description": "Control a computer by taking screenshots and interacting with UI elements. 
This tool uses element descriptions to locate and interact with UI elements on the screen (e.g., 'red submit button', 'search text field', 'hamburger menu icon', 'close button in top right corner').",
        "parameters": {
            "type": "object",
            "properties": {
                "action": {
                    "type": "string",
                    "enum": [
                        "screenshot",
                        "click",
                        "double_click",
                        "drag",
                        "type",
                        "keypress",
                        "scroll",
                        "move",
                        "wait",
                        "get_current_url",
                        "get_dimensions",
                        "get_environment"
                    ],
                    "description": "The action to perform (required for all actions)"
                },
                "element_description": {
                    "type": "string",
                    "description": "Description of the element to interact with (required for click, double_click, move, scroll actions)"
                },
                "start_element_description": {
                    "type": "string",
                    "description": "Description of the element to start dragging from (required for drag action)"
                },
                "end_element_description": {
                    "type": "string",
                    "description": "Description of the element to drag to (required for drag action)"
                },
                "text": {
                    "type": "string",
                    "description": "The text to type (required for type action)"
                },
                "keys": {
                    "type": "array",
                    "items": {
                        "type": "string"
                    },
                    "description": "Key(s) to press (required for keypress action)"
                },
                "button": {
                    "type": "string",
                    "enum": [
                        "left",
                        "right",
                        "wheel",
                        "back",
                        "forward"
                    ],
                    "description": "The mouse button to use for click action (required for click and double_click action)",
                },
                "scroll_x": {
                    "type": "integer",
                    "description": "Horizontal scroll amount for scroll action (required for scroll action)",
                },
                "scroll_y": {
                    "type": "integer",
                    "description": "Vertical scroll amount for scroll action (required for scroll action)",
                },
            },
            "required": [
                "action"
            ]
        }
    }
}


def _prepare_tools_for_grounded(tool_schemas: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Prepare tools for grounded API format.

    Replaces every "computer" tool schema with the description-based
    GROUNDED_COMPUTER_TOOL_SCHEMA; all other schemas pass through unchanged.
    """
    grounded_tools = []
    for schema in tool_schemas:
        if schema["type"] == "computer":
            grounded_tools.append(GROUNDED_COMPUTER_TOOL_SCHEMA)
        else:
            grounded_tools.append(schema)
    return grounded_tools


def get_last_computer_call_image(messages: List[Dict[str, Any]]) -> Optional[str]:
    """Get the last computer call output image from messages.

    Scans newest-first and returns the raw base64 payload (data-URL prefix
    stripped) of the most recent ``computer_call_output`` screenshot.
    NOTE(review): only ``data:image/png;base64,`` URLs are recognized — other
    image formats are silently skipped; confirm screenshots are always PNG.
    """
    for message in reversed(messages):
        if (isinstance(message, dict) and
                message.get("type") == "computer_call_output" and
                isinstance(message.get("output"), dict) and
                message["output"].get("type") == "input_image"):
            image_url = message["output"].get("image_url", "")
            if image_url.startswith("data:image/png;base64,"):
                return image_url.split(",", 1)[1]
    return None


@register_agent(r".*\+.*", priority=1)
class ComposedGroundedConfig(AsyncAgentConfig):
    """
    Composed-grounded agent configuration that uses both grounding and thinking models.

    The model parameter should be in format: "grounding_model+thinking_model"
    e.g., "huggingface-local/HelloKKMe/GTA1-7B+gemini/gemini-1.5-pro"
    """

    def __init__(self):
        # Cache of element description -> (x, y) coordinates resolved by the
        # grounding model; persists across predict_step calls on this instance.
        self.desc2xy: Dict[str, Tuple[float, float]] = {}

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Composed-grounded predict step implementation.

        Process:
        0. Store last computer call image, if none then take a screenshot
        1. Convert computer calls from xy to descriptions
        2. Convert responses items to completion messages
        3. Call thinking model with litellm.acompletion
        4. Convert completion messages to responses items
        5. Get all element descriptions and populate desc2xy mapping
        6. Convert computer calls from descriptions back to xy coordinates
        7. Return output and usage
        """
        # Parse the composed model
        if "+" not in model:
            raise ValueError(f"Composed model must be in format 'grounding_model+thinking_model', got: {model}")
        grounding_model, thinking_model = model.split("+", 1)

        pre_output_items = []

        # Step 0: Store last computer call image, if none then take a screenshot
        last_image_b64 = get_last_computer_call_image(messages)
        if last_image_b64 is None:
            # Take a screenshot
            screenshot_b64 = await computer_handler.screenshot()  # type: ignore
            if screenshot_b64:
                call_id = uuid.uuid4().hex
                # Synthesize a message/call/output triple so the transcript
                # records that (and why) a screenshot was taken.
                pre_output_items += [
                    {
                        "type": "message",
                        "role": "assistant",
                        "content": [
                            {
                                "type": "output_text",
                                "text": "Taking a screenshot to see the current computer screen."
                            }
                        ]
                    },
                    {
                        "action": {
                            "type": "screenshot"
                        },
                        "call_id": call_id,
                        "status": "completed",
                        "type": "computer_call"
                    },
                    {
                        "type": "computer_call_output",
                        "call_id": call_id,
                        "output": {
                            "type": "input_image",
                            "image_url": f"data:image/png;base64,{screenshot_b64}"
                        }
                    },
                ]
                last_image_b64 = screenshot_b64

                # Call screenshot callback if provided
                if _on_screenshot:
                    await _on_screenshot(screenshot_b64)

        tool_schemas = _prepare_tools_for_grounded(tools)  # type: ignore

        # Step 1: Convert computer calls from xy to descriptions
        input_messages = messages + pre_output_items
        messages_with_descriptions = convert_computer_calls_xy2desc(input_messages, self.desc2xy)

        # Step 2: Convert responses items to completion messages
        completion_messages = convert_responses_items_to_completion_messages(
            messages_with_descriptions,
            allow_images_in_tool_results=False
        )

        # Step 3: Call thinking model with litellm.acompletion
        api_kwargs = {
            "model": thinking_model,
            "messages": completion_messages,
            "tools": tool_schemas,
            "max_retries": max_retries,
            "stream": stream,
            **kwargs
        }
        if use_prompt_caching:
            api_kwargs["use_prompt_caching"] = use_prompt_caching

        # Call API start hook
        if _on_api_start:
            await _on_api_start(api_kwargs)

        # Make the completion call
        response = await litellm.acompletion(**api_kwargs)

        # Call API end hook
        if _on_api_end:
            await _on_api_end(api_kwargs, response)

        # Extract usage information
        usage = {
            **response.usage.model_dump(),  # type: ignore
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(usage)

        # Step 4: Convert completion messages back to responses items format
        response_dict = response.model_dump()  # type: ignore
        choice_messages = [choice["message"] for choice in response_dict["choices"]]
        thinking_output_items = []
        for choice_message in choice_messages:
            thinking_output_items.extend(convert_completion_messages_to_responses_items([choice_message]))

        # Step 5: Get all element descriptions and populate desc2xy mapping
        element_descriptions = get_all_element_descriptions(thinking_output_items)
        if element_descriptions and last_image_b64:
            # Use grounding model to predict coordinates for each description
            grounding_agent_conf = find_agent_config(grounding_model)
            if grounding_agent_conf:
                grounding_agent = grounding_agent_conf.agent_class()
                for desc in element_descriptions:
                    for _ in range(3):  # try 3 times
                        coords = await grounding_agent.predict_click(
                            model=grounding_model,
                            image_b64=last_image_b64,
                            instruction=desc
                        )
                        if coords:
                            self.desc2xy[desc] = coords
                            break

        # Step 6: Convert computer calls from descriptions back to xy coordinates
        final_output_items = convert_computer_calls_desc2xy(thinking_output_items, self.desc2xy)

        # Step 7: Return output and usage
        return {
            "output": pre_output_items + final_output_items,
            "usage": usage
        }

    async def predict_click(
        self,
        model: str,
        image_b64: str,
        instruction: str,
        **kwargs
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates using the grounding model.

        For composed models, uses only the grounding model part for click prediction.
        """
        # Parse the composed model to get grounding model
        if "+" not in model:
            raise ValueError(f"Composed model must be in format 'grounding_model+thinking_model', got: {model}")

        # NOTE(review): thinking_model is unpacked but unused here — the split
        # is retained only to validate/parse the composed model string.
        grounding_model, thinking_model = model.split("+", 1)

        # Find and use the grounding agent
        grounding_agent_conf = find_agent_config(grounding_model)
        if grounding_agent_conf:
            grounding_agent = grounding_agent_conf.agent_class()
            return await grounding_agent.predict_click(
                model=grounding_model,
                image_b64=image_b64,
                instruction=instruction,
                **kwargs
            )

        return None

    def get_capabilities(self) -> List[AgentCapability]:
        """Return the capabilities supported by this agent."""
        return ["click", "step"]
```

--------------------------------------------------------------------------------
/.github/scripts/tests/test_get_pyproject_version.py:
--------------------------------------------------------------------------------

```python
"""
Comprehensive tests for get_pyproject_version.py script using unittest.

This test suite covers:
- Version matching validation
- Error handling for missing versions
- Invalid input handling
- File not found scenarios
- Malformed TOML handling
"""

import sys
import unittest
import tempfile
from pathlib import Path
from io import StringIO
from unittest.mock import patch

# Add parent directory to path to import the module
sys.path.insert(0, str(Path(__file__).parent.parent))

# Import after path is modified
import get_pyproject_version


class TestGetPyprojectVersion(unittest.TestCase):
    """Test suite for get_pyproject_version.py functionality."""

    def setUp(self):
        """Reset sys.argv before each test."""
        self.original_argv = sys.argv.copy()

    def tearDown(self):
        """Restore sys.argv after each test."""
        sys.argv = self.original_argv

    def create_pyproject_toml(self, version: str) -> Path:
        """Helper to create a temporary pyproject.toml file with a given version."""
        # delete=False: the path must outlive this handle; tests unlink it themselves.
        temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False)
        temp_file.write(f"""
[project]
name = "test-project"
version = "{version}"
description = "A test project"
""")
        temp_file.close()
        return Path(temp_file.name)

    def create_pyproject_toml_no_version(self) -> Path:
        """Helper to create a pyproject.toml without a version field."""
        temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False)
        temp_file.write("""
[project]
name = "test-project"
description = "A test project without version"
""")
        temp_file.close()
        return Path(temp_file.name)

    def create_pyproject_toml_no_project(self) -> Path:
        """Helper to create a pyproject.toml without a project section."""
        temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False)
        temp_file.write("""
[tool.poetry]
name = "test-project"
version = "1.0.0"
""")
        temp_file.close()
        return Path(temp_file.name)

    def create_malformed_toml(self) -> Path:
        """Helper to create a malformed TOML file."""
        # Unclosed table header and string literals make this invalid TOML.
        temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False)
        temp_file.write("""
[project
name = "test-project
version = "1.0.0"
""")
        temp_file.close()
        return Path(temp_file.name)

    # Test: Successful version match
    def test_matching_versions(self):
        """Test that matching versions result in success."""
        pyproject_file = self.create_pyproject_toml("1.2.3")
        try:
            sys.argv = ['get_pyproject_version.py', str(pyproject_file), '1.2.3']

            # Capture stdout
            captured_output = StringIO()
            with patch('sys.stdout', captured_output):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 0)
            self.assertIn("✅ Version consistency check passed: 1.2.3", captured_output.getvalue())
        finally:
            pyproject_file.unlink()

    # Test: Version mismatch
    def test_version_mismatch(self):
        """Test that mismatched versions result in failure with appropriate error message."""
        pyproject_file = self.create_pyproject_toml("1.2.3")
        try:
            sys.argv = ['get_pyproject_version.py', str(pyproject_file), '1.2.4']

            # Capture stderr
            captured_error = StringIO()
            with patch('sys.stderr', captured_error):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 1)
            error_output = captured_error.getvalue()
            self.assertIn("❌ Version mismatch detected!", error_output)
            self.assertIn("pyproject.toml version: 1.2.3", error_output)
            self.assertIn("Expected version: 1.2.4", error_output)
            self.assertIn("Please update pyproject.toml to version 1.2.4", error_output)
        finally:
            pyproject_file.unlink()

    # Test: Missing version in pyproject.toml
    def test_missing_version_field(self):
        """Test handling of pyproject.toml without a version field."""
        pyproject_file = self.create_pyproject_toml_no_version()
        try:
            sys.argv = ['get_pyproject_version.py', str(pyproject_file), '1.0.0']

            captured_error = StringIO()
            with patch('sys.stderr', captured_error):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 1)
            self.assertIn("❌ ERROR: No version found in pyproject.toml", captured_error.getvalue())
        finally:
            pyproject_file.unlink()

    # Test: Missing project section
    def test_missing_project_section(self):
        """Test handling of pyproject.toml without a project section."""
        pyproject_file = self.create_pyproject_toml_no_project()
        try:
            sys.argv = ['get_pyproject_version.py', str(pyproject_file), '1.0.0']

            captured_error = StringIO()
            with patch('sys.stderr', captured_error):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 1)
            self.assertIn("❌ ERROR: No version found in pyproject.toml", captured_error.getvalue())
        finally:
            pyproject_file.unlink()

    # Test: File not found
    def test_file_not_found(self):
        """Test handling of non-existent pyproject.toml file."""
        sys.argv = ['get_pyproject_version.py', '/nonexistent/pyproject.toml', '1.0.0']

        with self.assertRaises(SystemExit) as cm:
            get_pyproject_version.main()

        self.assertEqual(cm.exception.code, 1)

    # Test: Malformed TOML
    def test_malformed_toml(self):
        """Test handling of malformed TOML file."""
        pyproject_file = self.create_malformed_toml()
        try:
            sys.argv = ['get_pyproject_version.py', str(pyproject_file), '1.0.0']

            with self.assertRaises(SystemExit) as cm:
                get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 1)
        finally:
            pyproject_file.unlink()

    # Test: Incorrect number of arguments - too few
    def test_too_few_arguments(self):
        """Test that providing too few arguments results in usage error."""
        sys.argv = ['get_pyproject_version.py', 'pyproject.toml']

        captured_error = StringIO()
        with patch('sys.stderr', captured_error):
            with self.assertRaises(SystemExit) as cm:
                get_pyproject_version.main()

        self.assertEqual(cm.exception.code, 1)
        self.assertIn("Usage: python get_pyproject_version.py <pyproject_path> <expected_version>", captured_error.getvalue())

    # Test: Incorrect number of arguments - too many
    def test_too_many_arguments(self):
        """Test that providing too many arguments results in usage error."""
        sys.argv = ['get_pyproject_version.py', 'pyproject.toml', '1.0.0', 'extra']

        captured_error = StringIO()
        with patch('sys.stderr', captured_error):
            with self.assertRaises(SystemExit) as cm:
                get_pyproject_version.main()

        self.assertEqual(cm.exception.code, 1)
        self.assertIn("Usage: python get_pyproject_version.py <pyproject_path> <expected_version>", captured_error.getvalue())

    # Test: No arguments
    def test_no_arguments(self):
        """Test that providing no arguments results in usage error."""
        sys.argv = ['get_pyproject_version.py']

        captured_error = StringIO()
        with patch('sys.stderr', captured_error):
            with self.assertRaises(SystemExit) as cm:
                get_pyproject_version.main()

        self.assertEqual(cm.exception.code, 1)
        self.assertIn("Usage: python get_pyproject_version.py <pyproject_path> <expected_version>", captured_error.getvalue())

    # Test: Version with pre-release tags
    def test_version_with_prerelease_tags(self):
        """Test matching versions with pre-release tags like alpha, beta, rc."""
        pyproject_file = self.create_pyproject_toml("1.2.3-rc.1")
        try:
            sys.argv = ['get_pyproject_version.py', str(pyproject_file), '1.2.3-rc.1']

            captured_output = StringIO()
            with patch('sys.stdout', captured_output):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 0)
            self.assertIn("✅ Version consistency check passed: 1.2.3-rc.1", captured_output.getvalue())
        finally:
            pyproject_file.unlink()

    # Test: Version with build metadata
    def test_version_with_build_metadata(self):
        """Test matching versions with build metadata."""
        pyproject_file = self.create_pyproject_toml("1.2.3+build.123")
        try:
            sys.argv = ['get_pyproject_version.py', str(pyproject_file), '1.2.3+build.123']

            captured_output = StringIO()
            with patch('sys.stdout', captured_output):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 0)
            self.assertIn("✅ Version consistency check passed: 1.2.3+build.123", captured_output.getvalue())
        finally:
            pyproject_file.unlink()

    # Test: Various semantic version formats
    def test_semantic_version_0_0_1(self):
        """Test semantic version 0.0.1."""
        self._test_version_format("0.0.1")

    def test_semantic_version_1_0_0(self):
        """Test semantic version 1.0.0."""
        self._test_version_format("1.0.0")

    def test_semantic_version_10_20_30(self):
        """Test semantic version 10.20.30."""
        self._test_version_format("10.20.30")

    def test_semantic_version_alpha(self):
        """Test semantic version with alpha tag."""
        self._test_version_format("1.2.3-alpha")

    def test_semantic_version_beta(self):
        """Test semantic version with beta tag."""
        self._test_version_format("1.2.3-beta.1")

    def test_semantic_version_rc_with_build(self):
        """Test semantic version with rc and build metadata."""
        self._test_version_format("1.2.3-rc.1+build.456")

    def _test_version_format(self, version: str):
        """Helper method to test various semantic version formats."""
        pyproject_file = self.create_pyproject_toml(version)
        try:
            sys.argv = ['get_pyproject_version.py', str(pyproject_file), version]

            captured_output = StringIO()
            with patch('sys.stdout', captured_output):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 0)
            self.assertIn(f"✅ Version consistency check passed: {version}", captured_output.getvalue())
        finally:
            pyproject_file.unlink()

    # Test: Empty version string
    def test_empty_version_string(self):
        """Test handling of empty version string."""
        pyproject_file = self.create_pyproject_toml("")
        try:
            sys.argv = ['get_pyproject_version.py', str(pyproject_file), '1.0.0']

            captured_error = StringIO()
            with patch('sys.stderr', captured_error):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 1)
            # Empty string is falsy, so it should trigger error
            self.assertIn("❌", captured_error.getvalue())
        finally:
            pyproject_file.unlink()


class TestSuiteInfo(unittest.TestCase):
    """Test suite metadata."""

    def test_suite_info(self):
        """Display test suite information."""
        print("\n" + "="*70)
        print("Test Suite: get_pyproject_version.py")
        print("Framework: unittest (Python built-in)")
        print("TOML Library: tomllib (Python 3.11+ built-in)")
        print("="*70)
        self.assertTrue(True)


if __name__ == '__main__':
    # Run tests with verbose output
    unittest.main(verbosity=2)
```

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/watchdog.py:
--------------------------------------------------------------------------------

```python
"""
Watchdog module for monitoring the Computer API server health.
Unix/Linux only - provides process management and restart capabilities.
"""

import asyncio
import fcntl
import json
import logging
import os
import platform
import subprocess
import sys
import time
import websockets
from typing import Optional

logger = logging.getLogger(__name__)


def instance_already_running(label="watchdog"):
    """
    Detect if an instance with the label is already running, globally
    at the operating system level.
Using `os.open` ensures that the file pointer won't be closed by
    Python's garbage collector after the function's scope is exited. The
    lock will be released when the program exits, or could be released if
    the file pointer were closed.
    """
    lock_file_pointer = os.open(f"/tmp/instance_{label}.lock", os.O_WRONLY | os.O_CREAT)
    try:
        # Non-blocking exclusive lock: raises immediately if another process holds it.
        fcntl.lockf(lock_file_pointer, fcntl.LOCK_EX | fcntl.LOCK_NB)
        already_running = False
    except IOError:
        already_running = True
    return already_running


class Watchdog:
    """Watchdog class to monitor server health via WebSocket connection.

    Unix/Linux only - provides restart capabilities.
    """

    def __init__(self, cli_args: Optional[dict] = None, ping_interval: int = 30):
        """
        Initialize the watchdog.

        Args:
            cli_args: Dictionary of CLI arguments to replicate when restarting
            ping_interval: Interval between ping checks in seconds

        Raises:
            RuntimeError: if run on a platform other than Linux/macOS.
        """
        # Check if running on Unix/Linux
        if platform.system() not in ['Linux', 'Darwin']:
            raise RuntimeError("Watchdog is only supported on Unix/Linux systems")

        # Store CLI arguments for restart
        self.cli_args = cli_args or {}
        self.host = self.cli_args.get('host', 'localhost')
        self.port = self.cli_args.get('port', 8000)
        self.ping_interval = ping_interval
        # CONTAINER_NAME being set marks "container mode" (cloud deployment).
        self.container_name = os.environ.get("CONTAINER_NAME")
        self.running = False
        self.restart_enabled = True

    @property
    def ws_uri(self) -> str:
        """Get the WebSocket URI using the current IP address.

        Returns:
            WebSocket URI for the Computer API Server
        """
        # Container mode goes through the cloud proxy over TLS (wss/8443);
        # local mode is plain ws on port 8000.
        ip_address = "localhost" if not self.container_name else f"{self.container_name}.containers.cloud.trycua.com"
        protocol = "wss" if self.container_name else "ws"
        port = "8443" if self.container_name else "8000"
        return f"{protocol}://{ip_address}:{port}/ws"

    async def ping(self) -> bool:
        """
        Test connection to the WebSocket endpoint.

        Returns:
            True if connection successful, False otherwise
        """
        try:
            # Create a simple ping message
            ping_message = {
                "command": "get_screen_size",
                "params": {}
            }

            # Try to connect to the WebSocket
            async with websockets.connect(
                self.ws_uri,
                max_size=1024 * 1024 * 10  # 10MB limit to match server
            ) as websocket:
                # Send ping message
                await websocket.send(json.dumps(ping_message))

                # Wait for any response or just close
                try:
                    response = await asyncio.wait_for(websocket.recv(), timeout=5)
                    logger.debug(f"Ping response received: {response[:100]}...")
                    return True
                except asyncio.TimeoutError:
                    return False

        except Exception as e:
            logger.warning(f"Ping failed: {e}")
            return False

    def kill_processes_on_port(self, port: int) -> bool:
        """
        Kill any processes using the specified port.

        Args:
            port: Port number to check and kill processes on

        Returns:
            True if processes were killed or none found, False on error
        """
        try:
            # Find processes using the port (lsof -ti prints one PID per line)
            result = subprocess.run(
                ["lsof", "-ti", f":{port}"],
                capture_output=True,
                text=True,
                timeout=10
            )

            if result.returncode == 0 and result.stdout.strip():
                pids = result.stdout.strip().split('\n')
                logger.info(f"Found {len(pids)} processes using port {port}: {pids}")

                # Kill each process
                for pid in pids:
                    if pid.strip():
                        try:
                            subprocess.run(["kill", "-9", pid.strip()], timeout=5)
                            logger.info(f"Killed process {pid}")
                        except subprocess.TimeoutExpired:
                            logger.warning(f"Timeout killing process {pid}")
                        except Exception as e:
                            logger.warning(f"Error killing process {pid}: {e}")

                return True
            else:
                logger.debug(f"No processes found using port {port}")
                return True

        except subprocess.TimeoutExpired:
            logger.error(f"Timeout finding processes on port {port}")
            return False
        except Exception as e:
            logger.error(f"Error finding processes on port {port}: {e}")
            return False

    def restart_server(self) -> bool:
        """
        Attempt to restart the server by killing existing processes and starting new one.

        Returns:
            True if restart was attempted, False on error
        """
        if not self.restart_enabled:
            logger.info("Server restart is disabled")
            return False

        try:
            logger.info("Attempting to restart server...")

            # Kill processes on the port
            port_to_kill = 8443 if self.container_name else self.port
            if not self.kill_processes_on_port(port_to_kill):
                logger.error("Failed to kill processes on port, restart aborted")
                return False

            # Wait a moment for processes to die
            time.sleep(2)

            # Try to restart the server
            # In container mode, we can't easily restart, so just log
            if self.container_name:
                logger.warning("Container mode detected - cannot restart server automatically")
                logger.warning("Container orchestrator should handle restart")
                return False
            else:
                # For local mode, try to restart the CLI
                logger.info("Attempting to restart local server...")

                # Get the current Python executable and script
                python_exe = sys.executable

                # Try to find the CLI module
                try:
                    # Build command with all original CLI arguments
                    cmd = [python_exe, "-m", "computer_server.cli"]

                    # Add all CLI arguments except watchdog-related ones
                    for key, value in self.cli_args.items():
                        if key in ['watchdog', 'watchdog_interval', 'no_restart']:
                            continue  # Skip watchdog args to avoid recursive watchdog

                        # Convert underscores to hyphens for CLI args
                        arg_name = f"--{key.replace('_', '-')}"
                        if isinstance(value, bool):
                            if value:  # Only add flag if True
                                cmd.append(arg_name)
                        else:
                            cmd.extend([arg_name, str(value)])

                    logger.info(f"Starting server with command: {' '.join(cmd)}")

                    # Start process in background, detached from this session so
                    # it survives the watchdog process exiting.
                    subprocess.Popen(
                        cmd,
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL,
                        start_new_session=True
                    )

                    logger.info("Server restart initiated")
                    return True

                except Exception as e:
                    logger.error(f"Failed to restart server: {e}")
                    return False

        except Exception as e:
            logger.error(f"Error during server restart: {e}")
            return False

    async def start_monitoring(self) -> None:
        """Start the watchdog monitoring loop."""
        self.running = True
        logger.info(f"Starting watchdog monitoring for {self.ws_uri}")
        logger.info(f"Ping interval: {self.ping_interval} seconds")

        if self.container_name:
            logger.info(f"Container mode detected: {self.container_name}")

        consecutive_failures = 0
        max_failures = 3

        while self.running:
            try:
                success = await self.ping()

                if success:
                    if consecutive_failures > 0:
                        logger.info("Server connection restored")
                    consecutive_failures = 0
                    logger.debug("Ping successful")
                else:
                    consecutive_failures += 1
                    logger.warning(f"Ping failed ({consecutive_failures}/{max_failures})")

                    if consecutive_failures >= max_failures:
                        logger.error(f"Server appears to be down after {max_failures} consecutive failures")

                        # Attempt to restart the server
                        if self.restart_enabled:
                            logger.info("Attempting automatic server restart...")
                            restart_success = self.restart_server()

                            if restart_success:
                                logger.info("Server restart initiated, waiting before next ping...")
                                # Wait longer after restart attempt
                                await asyncio.sleep(self.ping_interval * 2)
                                consecutive_failures = 0  # Reset counter after restart attempt
                            else:
                                logger.error("Server restart failed")
                        else:
                            logger.warning("Automatic restart is disabled")

                # Wait for next ping interval
                await asyncio.sleep(self.ping_interval)

            except asyncio.CancelledError:
                logger.info("Watchdog monitoring cancelled")
                break
            except Exception as e:
                logger.error(f"Unexpected error in watchdog loop: {e}")
                await asyncio.sleep(self.ping_interval)

    def stop_monitoring(self) -> None:
        """Stop the watchdog monitoring."""
        self.running = False
        logger.info("Stopping watchdog monitoring")


async def run_watchdog(cli_args: Optional[dict] = None, ping_interval: int = 30) -> None:
    """
    Run the watchdog monitoring.
Args: cli_args: Dictionary of CLI arguments to replicate when restarting ping_interval: Interval between ping checks in seconds """ watchdog = Watchdog(cli_args=cli_args, ping_interval=ping_interval) try: await watchdog.start_monitoring() except KeyboardInterrupt: logger.info("Watchdog stopped by user") finally: watchdog.stop_monitoring() if __name__ == "__main__": # For testing the watchdog standalone import argparse parser = argparse.ArgumentParser(description="Run Computer API server watchdog") parser.add_argument("--host", default="localhost", help="Server host to monitor") parser.add_argument("--port", type=int, default=8000, help="Server port to monitor") parser.add_argument("--ping-interval", type=int, default=30, help="Ping interval in seconds") args = parser.parse_args() logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) cli_args = { 'host': args.host, 'port': args.port } asyncio.run(run_watchdog(cli_args, args.ping_interval)) ``` -------------------------------------------------------------------------------- /libs/python/agent/agent/adapters/human_adapter.py: -------------------------------------------------------------------------------- ```python import os import asyncio import requests from typing import List, Dict, Any, Iterator, AsyncIterator from litellm.types.utils import GenericStreamingChunk, ModelResponse from litellm.llms.custom_llm import CustomLLM from litellm import completion, acompletion class HumanAdapter(CustomLLM): """Human Adapter for human-in-the-loop completions. This adapter sends completion requests to a human completion server where humans can review and respond to AI requests. """ def __init__(self, base_url: str | None = None, timeout: float = 300.0, **kwargs): """Initialize the human adapter. Args: base_url: Base URL for the human completion server. 
Defaults to HUMAN_BASE_URL environment variable or http://localhost:8002 timeout: Timeout in seconds for waiting for human response **kwargs: Additional arguments """ super().__init__() self.base_url = base_url or os.getenv('HUMAN_BASE_URL', 'http://localhost:8002') self.timeout = timeout # Ensure base_url doesn't end with slash self.base_url = self.base_url.rstrip('/') def _queue_completion(self, messages: List[Dict[str, Any]], model: str) -> str: """Queue a completion request and return the call ID. Args: messages: Messages in OpenAI format model: Model name Returns: Call ID for tracking the request Raises: Exception: If queueing fails """ try: response = requests.post( f"{self.base_url}/queue", json={"messages": messages, "model": model}, timeout=10 ) response.raise_for_status() return response.json()["id"] except requests.RequestException as e: raise Exception(f"Failed to queue completion request: {e}") def _wait_for_completion(self, call_id: str) -> Dict[str, Any]: """Wait for human to complete the call. 
Args: call_id: ID of the queued completion call Returns: Dict containing response and/or tool_calls Raises: TimeoutError: If timeout is exceeded Exception: If completion fails """ import time start_time = time.time() while True: try: # Check status status_response = requests.get(f"{self.base_url}/status/{call_id}") status_response.raise_for_status() status_data = status_response.json() if status_data["status"] == "completed": result = {} if "response" in status_data and status_data["response"]: result["response"] = status_data["response"] if "tool_calls" in status_data and status_data["tool_calls"]: result["tool_calls"] = status_data["tool_calls"] return result elif status_data["status"] == "failed": error_msg = status_data.get("error", "Unknown error") raise Exception(f"Completion failed: {error_msg}") # Check timeout if time.time() - start_time > self.timeout: raise TimeoutError(f"Timeout waiting for human response after {self.timeout} seconds") # Wait before checking again time.sleep(1.0) except requests.RequestException as e: if time.time() - start_time > self.timeout: raise TimeoutError(f"Timeout waiting for human response: {e}") # Continue trying if we haven't timed out time.sleep(1.0) async def _async_wait_for_completion(self, call_id: str) -> Dict[str, Any]: """Async version of wait_for_completion. 
Args: call_id: ID of the queued completion call Returns: Dict containing response and/or tool_calls Raises: TimeoutError: If timeout is exceeded Exception: If completion fails """ import aiohttp import time start_time = time.time() async with aiohttp.ClientSession() as session: while True: try: # Check status async with session.get(f"{self.base_url}/status/{call_id}") as response: response.raise_for_status() status_data = await response.json() if status_data["status"] == "completed": result = {} if "response" in status_data and status_data["response"]: result["response"] = status_data["response"] if "tool_calls" in status_data and status_data["tool_calls"]: result["tool_calls"] = status_data["tool_calls"] return result elif status_data["status"] == "failed": error_msg = status_data.get("error", "Unknown error") raise Exception(f"Completion failed: {error_msg}") # Check timeout if time.time() - start_time > self.timeout: raise TimeoutError(f"Timeout waiting for human response after {self.timeout} seconds") # Wait before checking again await asyncio.sleep(1.0) except Exception as e: if time.time() - start_time > self.timeout: raise TimeoutError(f"Timeout waiting for human response: {e}") # Continue trying if we haven't timed out await asyncio.sleep(1.0) def _generate_response(self, messages: List[Dict[str, Any]], model: str) -> Dict[str, Any]: """Generate a human response for the given messages. Args: messages: Messages in OpenAI format model: Model name Returns: Dict containing response and/or tool_calls """ # Queue the completion request call_id = self._queue_completion(messages, model) # Wait for human response response = self._wait_for_completion(call_id) return response async def _async_generate_response(self, messages: List[Dict[str, Any]], model: str) -> Dict[str, Any]: """Async version of _generate_response. 
Args: messages: Messages in OpenAI format model: Model name Returns: Dict containing response and/or tool_calls """ # Queue the completion request (sync operation) call_id = self._queue_completion(messages, model) # Wait for human response (async) response = await self._async_wait_for_completion(call_id) return response def completion(self, *args, **kwargs) -> ModelResponse: """Synchronous completion method. Returns: ModelResponse with human-generated text or tool calls """ messages = kwargs.get('messages', []) model = kwargs.get('model', 'human') # Generate human response human_response_data = self._generate_response(messages, model) # Create ModelResponse with proper structure from litellm.types.utils import ModelResponse, Choices, Message import uuid import time # Create message content based on response type if "tool_calls" in human_response_data and human_response_data["tool_calls"]: # Tool calls response message = Message( role="assistant", content=human_response_data.get("response", ""), tool_calls=human_response_data["tool_calls"] ) else: # Text response message = Message( role="assistant", content=human_response_data.get("response", "") ) choice = Choices( finish_reason="stop", index=0, message=message ) result = ModelResponse( id=f"human-{uuid.uuid4()}", choices=[choice], created=int(time.time()), model=f"human/{model}", object="chat.completion" ) return result async def acompletion(self, *args, **kwargs) -> ModelResponse: """Asynchronous completion method. 
Returns: ModelResponse with human-generated text or tool calls """ messages = kwargs.get('messages', []) model = kwargs.get('model', 'human') # Generate human response human_response_data = await self._async_generate_response(messages, model) # Create ModelResponse with proper structure from litellm.types.utils import ModelResponse, Choices, Message import uuid import time # Create message content based on response type if "tool_calls" in human_response_data and human_response_data["tool_calls"]: # Tool calls response message = Message( role="assistant", content=human_response_data.get("response", ""), tool_calls=human_response_data["tool_calls"] ) else: # Text response message = Message( role="assistant", content=human_response_data.get("response", "") ) choice = Choices( finish_reason="stop", index=0, message=message ) result = ModelResponse( id=f"human-{uuid.uuid4()}", choices=[choice], created=int(time.time()), model=f"human/{model}", object="chat.completion" ) return result def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]: """Synchronous streaming method. 
Yields: Streaming chunks with human-generated text or tool calls """ messages = kwargs.get('messages', []) model = kwargs.get('model', 'human') # Generate human response human_response_data = self._generate_response(messages, model) import time # Handle tool calls vs text response if "tool_calls" in human_response_data and human_response_data["tool_calls"]: # Stream tool calls as a single chunk generic_chunk: GenericStreamingChunk = { "finish_reason": "tool_calls", "index": 0, "is_finished": True, "text": human_response_data.get("response", ""), "tool_use": human_response_data["tool_calls"], "usage": {"completion_tokens": 1, "prompt_tokens": 0, "total_tokens": 1}, } yield generic_chunk else: # Stream text response response_text = human_response_data.get("response", "") generic_chunk: GenericStreamingChunk = { "finish_reason": "stop", "index": 0, "is_finished": True, "text": response_text, "tool_use": None, "usage": {"completion_tokens": len(response_text.split()), "prompt_tokens": 0, "total_tokens": len(response_text.split())}, } yield generic_chunk async def astreaming(self, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]: """Asynchronous streaming method. 
Yields: Streaming chunks with human-generated text or tool calls """ messages = kwargs.get('messages', []) model = kwargs.get('model', 'human') # Generate human response human_response = await self._async_generate_response(messages, model) # Return as single streaming chunk generic_streaming_chunk: GenericStreamingChunk = { "finish_reason": "stop", "index": 0, "is_finished": True, "text": human_response, "tool_use": None, "usage": {"completion_tokens": len(human_response.split()), "prompt_tokens": 0, "total_tokens": len(human_response.split())}, } yield generic_streaming_chunk ``` -------------------------------------------------------------------------------- /blog/bringing-computer-use-to-the-web.md: -------------------------------------------------------------------------------- ```markdown # Bringing Computer-Use to the Web *Published on August 5, 2025 by Morgan Dean* In one of our original posts, we explored building Computer-Use Operators on macOS - first with a [manual implementation](build-your-own-operator-on-macos-1.md) using OpenAI's `computer-use-preview` model, then with our [cua-agent framework](build-your-own-operator-on-macos-2.md) for Python developers. While these tutorials have been incredibly popular, we've received consistent feedback from our community: **"Can we use C/ua with JavaScript and TypeScript?"** Today, we're excited to announce the release of the **`@trycua/computer` Web SDK** - a new library that allows you to control your C/ua cloud containers from any JavaScript or TypeScript project. With this library, you can click, type, and grab screenshots from your cloud containers - no extra servers required. With this new SDK, you can easily develop CUA experiences like the one below, which we will release soon as open source. <div align="center"> <video src="https://github.com/user-attachments/assets/e213d6c3-73b6-48dd-a7d9-ed761ed74f89" width="600" controls></video> </div> Let’s see how it works. 
## What You'll Learn By the end of this tutorial, you'll be able to: - Set up the `@trycua/computer` npm library in any JavaScript/TypeScript project - Connect OpenAI's computer-use model to C/ua cloud containers from web applications - Build computer-use agents that work in Node.js, React, Vue, or any web framework - Handle different types of computer actions (clicking, typing, scrolling) from web code - Implement the complete computer-use loop in JavaScript/TypeScript - Integrate AI automation into existing web applications and workflows **Prerequisites:** - Node.js 16+ and npm/yarn/pnpm - Basic JavaScript or TypeScript knowledge - OpenAI API access (Tier 3+ for computer-use-preview) - C/ua cloud container credits ([get started here](https://trycua.com/pricing)) **Estimated Time:** 45-60 minutes ## Access Requirements ### OpenAI Model Availability At the time of writing, the **computer-use-preview** model has limited availability: - Only accessible to OpenAI tier 3+ users - Additional application process may be required even for eligible users - Cannot be used in the OpenAI Playground - Outside of ChatGPT Operator, usage is restricted to the new **Responses API** Luckily, the `@trycua/computer` library can be used in conjunction with other models, like [Anthropic’s Computer Use](https://docs.anthropic.com/en/docs/agents-and-tools/tool-use/computer-use-tool) or [UI-TARS](https://huggingface.co/ByteDance-Seed/UI-TARS-1.5-7B). You’ll just have to write your own handler to parse the model output for interfacing with the container. ### C/ua Cloud Containers To follow this guide, you’ll need access to a C/ua cloud container. Getting access is simple: purchase credits from our [pricing page](https://trycua.com/pricing), then create and provision a new container instance from the [dashboard](https://trycua.com/dashboard/containers). With your container running, you'll be ready to leverage the web SDK and bring automation to your JavaScript or TypeScript applications. 
## Understanding the Flow ### OpenAI API Overview Let's start with the basics. In our case, we'll use OpenAI's API to communicate with their computer-use model. Think of it like this: 1. We send the model a screenshot of our container and tell it what we want it to do 2. The model looks at the screenshot and decides what actions to take 3. It sends back instructions (like "click here" or "type this") 4. We execute those instructions in our container. ### Model Setup Here's how we set up the computer-use model for web development: ```javascript const res = await openai.responses.create({ model: 'computer-use-preview', tools: [ { type: 'computer_use_preview', display_width: 1024, display_height: 768, environment: 'linux', // we're using a linux container }, ], input: [ { role: 'user', content: [ // what we want the ai to do { type: 'input_text', text: 'Open firefox and go to trycua.com' }, // first screenshot of the vm { type: 'input_image', image_url: `data:image/png;base64,${screenshotBase64}`, detail: 'auto', }, ], }, ], truncation: 'auto' }); ``` ### Understanding the Response When we send a request, the API sends back a response that looks like this: ```json "output": [ { "type": "reasoning", // The AI explains what it's thinking "id": "rs_67cc...", "summary": [ { "type": "summary_text", "text": "Clicking on the browser address bar." } ] }, { "type": "computer_call", // The actual action to perform "id": "cu_67cc...", "call_id": "call_zw3...", // Used to track previous calls "action": { "type": "click", // What kind of action (click, type, etc.) "button": "left", // Which mouse button to use "x": 156, // Where to click (coordinates) "y": 50 }, "pending_safety_checks": [], // Any safety warnings to consider "status": "completed" // Whether the action was successful } ] ``` Each response contains: 1. **Reasoning**: The AI's explanation of what it's doing 2. **Action**: The specific computer action to perform 3. **Safety Checks**: Any potential risks to review 4. 
**Status**: Whether everything worked as planned ## Implementation Guide ### Provision a C/ua Cloud Container 1. Visit [trycua.com](https://trycua.com), sign up, purchase [credits](https://trycua.com/pricing), and create a new container instance from the [dashboard](https://trycua.com/dashboard). 2. Create an API key from the dashboard — be sure to save it in a secure location before continuing. 3. Start the cloud container from the dashboard. ### Environment Setup 1. Install required packages with your preferred package manager: ```bash npm install --save @trycua/computer # or yarn, pnpm, bun npm install --save openai # or yarn, pnpm, bun ``` Works with any JavaScript/TypeScript project setup - whether you're using Create React App, Next.js, Vue, Angular, or plain JavaScript. 2. Save your OpenAI API key, C/ua API key, and container name to a `.env` file: ```bash OPENAI_API_KEY=openai-api-key CUA_API_KEY=cua-api-key CUA_CONTAINER_NAME=cua-cloud-container-name ``` These environment variables work the same whether you're using vanilla JavaScript, TypeScript, or any web framework. ## Building the Agent ### Mapping API Actions to `@trycua/computer` Interface Methods This helper function handles a `computer_call` action from the OpenAI API — converting the action into an equivalent action from the `@trycua/computer` interface. These actions will execute on the initialized `Computer` instance. For example, `await computer.interface.leftClick()` sends a mouse left click to the current cursor position. 
Whether you're using JavaScript or TypeScript, the interface remains the same: ```javascript export async function executeAction( computer: Computer, action: OpenAI.Responses.ResponseComputerToolCall['action'] ) { switch (action.type) { case 'click': const { x, y, button } = action; console.log(`Executing click at (${x}, ${y}) with button '${button}'.`); await computer.interface.moveCursor(x, y); if (button === 'right') await computer.interface.rightClick(); else await computer.interface.leftClick(); break; case 'type': const { text } = action; console.log(`Typing text: ${text}`); await computer.interface.typeText(text); break; case 'scroll': const { x: locX, y: locY, scroll_x, scroll_y } = action; console.log( `Scrolling at (${locX}, ${locY}) with offsets (scroll_x=${scroll_x}, scroll_y=${scroll_y}).` ); await computer.interface.moveCursor(locX, locY); await computer.interface.scroll(scroll_x, scroll_y); break; case 'keypress': const { keys } = action; for (const key of keys) { console.log(`Pressing key: ${key}.`); // Map common key names to CUA equivalents if (key.toLowerCase() === 'enter') { await computer.interface.pressKey('return'); } else if (key.toLowerCase() === 'space') { await computer.interface.pressKey('space'); } else { await computer.interface.pressKey(key); } } break; case 'wait': console.log(`Waiting for 3 seconds.`); await new Promise((resolve) => setTimeout(resolve, 3 * 1000)); break; case 'screenshot': console.log('Taking screenshot.'); // This is handled automatically in the main loop, but we can take an extra one if requested const screenshot = await computer.interface.screenshot(); return screenshot; default: console.log(`Unrecognized action: ${action.type}`); break; } } ``` ### Implementing the Computer-Use Loop This section defines a loop that: 1. Initializes the `Computer` instance (connecting to a Linux cloud container). 2. Captures a screenshot of the current state. 3. 
Sends the screenshot (with a user prompt) to the OpenAI Responses API using the `computer-use-preview` model. 4. Processes the returned `computer_call` action and executes it using our helper function. 5. Captures an updated screenshot after the action. 6. Send the updated screenshot and loops until no more actions are returned. ```javascript const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY }); // Initialize the Computer Connection const computer = new Computer({ apiKey: process.env.CUA_API_KEY!, name: process.env.CUA_CONTAINER_NAME!, osType: OSType.LINUX, }); await computer.run(); // Take the initial screenshot const screenshot = await computer.interface.screenshot(); const screenshotBase64 = screenshot.toString('base64'); // Setup openai config for computer use const computerUseConfig: OpenAI.Responses.ResponseCreateParamsNonStreaming = { model: 'computer-use-preview', tools: [ { type: 'computer_use_preview', display_width: 1024, display_height: 768, environment: 'linux', // we're using a linux vm }, ], truncation: 'auto', }; // Send initial screenshot to the openai computer use model let res = await openai.responses.create({ ...computerUseConfig, input: [ { role: 'user', content: [ // what we want the ai to do { type: 'input_text', text: 'open firefox and go to trycua.com' }, // current screenshot of the vm { type: 'input_image', image_url: `data:image/png;base64,${screenshotBase64}`, detail: 'auto', }, ], }, ], }); // Loop until there are no more computer use actions. while (true) { const computerCalls = res.output.filter((o) => o.type === 'computer_call'); if (computerCalls.length < 1) { console.log('No more computer calls. 
Loop complete.'); break; } // Get the first call const call = computerCalls[0]; const action = call.action; console.log('Received action from OpenAI Responses API:', action); let ackChecks: OpenAI.Responses.ResponseComputerToolCall.PendingSafetyCheck[] = []; if (call.pending_safety_checks.length > 0) { console.log('Safety checks pending:', call.pending_safety_checks); // In a real implementation, you would want to get user confirmation here. ackChecks = call.pending_safety_checks; } // Execute the action in the container await executeAction(computer, action); // Wait for changes to process within the container (1sec) await new Promise((resolve) => setTimeout(resolve, 1000)); // Capture new screenshot const newScreenshot = await computer.interface.screenshot(); const newScreenshotBase64 = newScreenshot.toString('base64'); // Screenshot back as computer_call_output res = await openai.responses.create({ ...computerUseConfig, previous_response_id: res.id, input: [ { type: 'computer_call_output', call_id: call.call_id, acknowledged_safety_checks: ackChecks, output: { type: 'computer_screenshot', image_url: `data:image/png;base64,${newScreenshotBase64}`, }, }, ], }); } ``` You can find the full example on [GitHub](https://github.com/trycua/cua/tree/main/examples/computer-example-ts). ## What's Next? The `@trycua/computer` Web SDK opens up some interesting possibilities. You could build browser-based testing tools, create interactive demos for your products, or automate repetitive workflows directly from your web apps. We're working on more examples and better documentation - if you build something cool with this SDK, we'd love to see it. Drop by our [Discord](https://discord.gg/cua-ai) and share what you're working on. Happy automating on the web! 
``` -------------------------------------------------------------------------------- /tests/test_mcp_server_session_management.py: -------------------------------------------------------------------------------- ```python """ Tests for MCP Server Session Management functionality. This module tests the new concurrent session management and resource lifecycle features. """ import asyncio import importlib.util import sys import types import time from pathlib import Path import pytest def _install_stub_module(name: str, module: types.ModuleType, registry: dict[str, types.ModuleType | None]) -> None: registry[name] = sys.modules.get(name) sys.modules[name] = module @pytest.fixture def server_module(): """Create a server module with stubbed dependencies for testing.""" stubbed_modules: dict[str, types.ModuleType | None] = {} # Stub MCP Context primitives mcp_module = types.ModuleType("mcp") mcp_module.__path__ = [] # mark as package mcp_server_module = types.ModuleType("mcp.server") mcp_server_module.__path__ = [] fastmcp_module = types.ModuleType("mcp.server.fastmcp") class _StubContext: async def yield_message(self, *args, **kwargs): return None async def yield_tool_call(self, *args, **kwargs): return None async def yield_tool_output(self, *args, **kwargs): return None def report_progress(self, *_args, **_kwargs): return None def info(self, *_args, **_kwargs): return None def error(self, *_args, **_kwargs): return None class _StubImage: def __init__(self, format: str, data: bytes): self.format = format self.data = data class _StubFastMCP: def __init__(self, name: str): self.name = name self._tools: dict[str, types.FunctionType] = {} def tool(self, *args, **kwargs): def decorator(func): self._tools[func.__name__] = func return func return decorator def run(self): return None fastmcp_module.Context = _StubContext fastmcp_module.FastMCP = _StubFastMCP fastmcp_module.Image = _StubImage _install_stub_module("mcp", mcp_module, stubbed_modules) 
_install_stub_module("mcp.server", mcp_server_module, stubbed_modules) _install_stub_module("mcp.server.fastmcp", fastmcp_module, stubbed_modules) # Stub Computer module computer_module = types.ModuleType("computer") class _StubInterface: async def screenshot(self) -> bytes: return b"test-screenshot-data" class _StubComputer: def __init__(self, *args, **kwargs): self.interface = _StubInterface() async def run(self): return None computer_module.Computer = _StubComputer _install_stub_module("computer", computer_module, stubbed_modules) # Stub agent module agent_module = types.ModuleType("agent") class _StubComputerAgent: def __init__(self, *args, **kwargs): pass async def run(self, *_args, **_kwargs): # Simulate agent execution with streaming yield { "output": [ { "type": "message", "role": "assistant", "content": [{"type": "output_text", "text": "Task completed"}] } ] } agent_module.ComputerAgent = _StubComputerAgent _install_stub_module("agent", agent_module, stubbed_modules) # Stub session manager module session_manager_module = types.ModuleType("mcp_server.session_manager") class _StubSessionInfo: def __init__(self, session_id: str, computer, created_at: float, last_activity: float): self.session_id = session_id self.computer = computer self.created_at = created_at self.last_activity = last_activity self.active_tasks = set() self.is_shutting_down = False class _StubSessionManager: def __init__(self): self._sessions = {} self._session_lock = asyncio.Lock() async def get_session(self, session_id=None): """Context manager that returns a session.""" if session_id is None: session_id = "test-session-123" async with self._session_lock: if session_id not in self._sessions: computer = _StubComputer() session = _StubSessionInfo( session_id=session_id, computer=computer, created_at=time.time(), last_activity=time.time() ) self._sessions[session_id] = session return self._sessions[session_id] async def register_task(self, session_id: str, task_id: str): pass async def 
unregister_task(self, session_id: str, task_id: str): pass async def cleanup_session(self, session_id: str): async with self._session_lock: self._sessions.pop(session_id, None) def get_session_stats(self): return { "total_sessions": len(self._sessions), "max_concurrent": 10, "sessions": {sid: {"active_tasks": 0} for sid in self._sessions} } _stub_session_manager = _StubSessionManager() def get_session_manager(): return _stub_session_manager async def initialize_session_manager(): return _stub_session_manager async def shutdown_session_manager(): pass session_manager_module.get_session_manager = get_session_manager session_manager_module.initialize_session_manager = initialize_session_manager session_manager_module.shutdown_session_manager = shutdown_session_manager _install_stub_module("mcp_server.session_manager", session_manager_module, stubbed_modules) # Load the actual server module module_name = "mcp_server_server_under_test" module_path = Path("libs/python/mcp-server/mcp_server/server.py").resolve() spec = importlib.util.spec_from_file_location(module_name, module_path) server_module = importlib.util.module_from_spec(spec) assert spec and spec.loader spec.loader.exec_module(server_module) server_instance = getattr(server_module, "server", None) if server_instance is not None and hasattr(server_instance, "_tools"): for name, func in server_instance._tools.items(): setattr(server_module, name, func) try: yield server_module finally: sys.modules.pop(module_name, None) for name, original in stubbed_modules.items(): if original is None: sys.modules.pop(name, None) else: sys.modules[name] = original class FakeContext: """Fake context for testing.""" def __init__(self) -> None: self.events: list[tuple] = [] self.progress_updates: list[float] = [] def info(self, message: str) -> None: self.events.append(("info", message)) def error(self, message: str) -> None: self.events.append(("error", message)) def report_progress(self, value: float) -> None: 
self.progress_updates.append(value) async def yield_message(self, *, role: str, content): timestamp = asyncio.get_running_loop().time() self.events.append(("message", role, content, timestamp)) async def yield_tool_call(self, *, name: str | None, call_id: str, input): timestamp = asyncio.get_running_loop().time() self.events.append(("tool_call", name, call_id, input, timestamp)) async def yield_tool_output(self, *, call_id: str, output, is_error: bool = False): timestamp = asyncio.get_running_loop().time() self.events.append(("tool_output", call_id, output, is_error, timestamp)) def test_screenshot_cua_with_session_id(server_module): """Test that screenshot_cua works with session management.""" async def _run_test(): ctx = FakeContext() result = await server_module.screenshot_cua(ctx, session_id="test-session") assert result.format == "png" assert result.data == b"test-screenshot-data" asyncio.run(_run_test()) def test_screenshot_cua_creates_new_session(server_module): """Test that screenshot_cua creates a new session when none provided.""" async def _run_test(): ctx = FakeContext() result = await server_module.screenshot_cua(ctx) assert result.format == "png" assert result.data == b"test-screenshot-data" asyncio.run(_run_test()) def test_run_cua_task_with_session_management(server_module): """Test that run_cua_task works with session management.""" async def _run_test(): ctx = FakeContext() task = "Test task" session_id = "test-session-456" text_result, image = await server_module.run_cua_task(ctx, task, session_id) assert "Task completed" in text_result assert image.format == "png" assert image.data == b"test-screenshot-data" asyncio.run(_run_test()) def test_run_multi_cua_tasks_sequential(server_module): """Test that run_multi_cua_tasks works sequentially.""" async def _run_test(): ctx = FakeContext() tasks = ["Task 1", "Task 2", "Task 3"] results = await server_module.run_multi_cua_tasks(ctx, tasks, concurrent=False) assert len(results) == 3 for i, (text, 
image) in enumerate(results): assert "Task completed" in text assert image.format == "png" asyncio.run(_run_test()) def test_run_multi_cua_tasks_concurrent(server_module): """Test that run_multi_cua_tasks works concurrently.""" async def _run_test(): ctx = FakeContext() tasks = ["Task 1", "Task 2", "Task 3"] results = await server_module.run_multi_cua_tasks(ctx, tasks, concurrent=True) assert len(results) == 3 for i, (text, image) in enumerate(results): assert "Task completed" in text assert image.format == "png" asyncio.run(_run_test()) def test_get_session_stats(server_module): """Test that get_session_stats returns proper statistics.""" async def _run_test(): ctx = FakeContext() stats = await server_module.get_session_stats() assert "total_sessions" in stats assert "max_concurrent" in stats assert "sessions" in stats asyncio.run(_run_test()) def test_cleanup_session(server_module): """Test that cleanup_session works properly.""" async def _run_test(): ctx = FakeContext() session_id = "test-cleanup-session" result = await server_module.cleanup_session(ctx, session_id) assert f"Session {session_id} cleanup initiated" in result asyncio.run(_run_test()) def test_concurrent_sessions_isolation(server_module): """Test that concurrent sessions are properly isolated.""" async def _run_test(): ctx = FakeContext() # Run multiple tasks with different session IDs concurrently task1 = asyncio.create_task( server_module.run_cua_task(ctx, "Task for session 1", "session-1") ) task2 = asyncio.create_task( server_module.run_cua_task(ctx, "Task for session 2", "session-2") ) results = await asyncio.gather(task1, task2) assert len(results) == 2 for text, image in results: assert "Task completed" in text assert image.format == "png" asyncio.run(_run_test()) def test_session_reuse_with_same_id(server_module): """Test that sessions are reused when the same session ID is provided.""" async def _run_test(): ctx = FakeContext() session_id = "reuse-session" # First call result1 = await 
server_module.screenshot_cua(ctx, session_id) # Second call with same session ID result2 = await server_module.screenshot_cua(ctx, session_id) assert result1.format == result2.format assert result1.data == result2.data asyncio.run(_run_test()) def test_error_handling_with_session_management(server_module): """Test that errors are handled properly with session management.""" async def _run_test(): # Mock an agent that raises an exception class _FailingAgent: def __init__(self, *args, **kwargs): pass async def run(self, *_args, **_kwargs): raise RuntimeError("Simulated agent failure") # Replace the ComputerAgent with our failing one original_agent = server_module.ComputerAgent server_module.ComputerAgent = _FailingAgent try: ctx = FakeContext() task = "This will fail" text_result, image = await server_module.run_cua_task(ctx, task, "error-session") assert "Error during task execution" in text_result assert image.format == "png" finally: # Restore original agent server_module.ComputerAgent = original_agent asyncio.run(_run_test()) ``` -------------------------------------------------------------------------------- /libs/python/agent/agent/loops/gemini.py: -------------------------------------------------------------------------------- ```python """ Gemini 2.5 Computer Use agent loop Maps internal Agent SDK message format to Google's Gemini Computer Use API and back. 
Key features: - Lazy import of google.genai - Configure Computer Use tool with excluded browser-specific predefined functions - Optional custom function declarations hook for computer-call specific functions - Convert Gemini function_call parts into internal computer_call actions """ from __future__ import annotations import base64 import io import uuid from typing import Any, Dict, List, Optional, Tuple from PIL import Image from ..decorators import register_agent from ..loops.base import AsyncAgentConfig from ..types import AgentCapability def _lazy_import_genai(): """Import google.genai lazily to avoid hard dependency unless used.""" try: from google import genai # type: ignore from google.genai import types # type: ignore return genai, types except Exception as e: # pragma: no cover raise RuntimeError( "google.genai is required for the Gemini Computer Use loop. Install the Google Gemini SDK." ) from e def _data_url_to_bytes(data_url: str) -> Tuple[bytes, str]: """Convert a data URL to raw bytes and mime type.""" if not data_url.startswith("data:"): # Assume it's base64 png payload try: return base64.b64decode(data_url), "image/png" except Exception: return b"", "application/octet-stream" header, b64 = data_url.split(",", 1) mime = "image/png" if ";" in header: mime = header.split(";")[0].split(":", 1)[1] or "image/png" return base64.b64decode(b64), mime def _bytes_image_size(img_bytes: bytes) -> Tuple[int, int]: try: img = Image.open(io.BytesIO(img_bytes)) return img.size except Exception: return (1024, 768) def _find_last_user_text(messages: List[Dict[str, Any]]) -> List[str]: texts: List[str] = [] for msg in reversed(messages): if msg.get("type") in (None, "message") and msg.get("role") == "user": content = msg.get("content") if isinstance(content, str): return [content] elif isinstance(content, list): for c in content: if c.get("type") in ("input_text", "output_text") and c.get("text"): texts.append(c["text"]) # newest first if texts: return 
list(reversed(texts)) return [] def _find_last_screenshot(messages: List[Dict[str, Any]]) -> Optional[bytes]: for msg in reversed(messages): if msg.get("type") == "computer_call_output": out = msg.get("output", {}) if isinstance(out, dict) and out.get("type") in ("input_image", "computer_screenshot"): image_url = out.get("image_url", "") if image_url: data, _ = _data_url_to_bytes(image_url) return data return None def _denormalize(v: int, size: int) -> int: # Gemini returns 0-999 normalized try: return max(0, min(size - 1, int(round(v / 1000 * size)))) except Exception: return 0 def _map_gemini_fc_to_computer_call( fc: Dict[str, Any], screen_w: int, screen_h: int, ) -> Optional[Dict[str, Any]]: name = fc.get("name") args = fc.get("args", {}) or {} action: Dict[str, Any] = {} if name == "click_at": x = _denormalize(int(args.get("x", 0)), screen_w) y = _denormalize(int(args.get("y", 0)), screen_h) action = {"type": "click", "x": x, "y": y, "button": "left"} elif name == "type_text_at": x = _denormalize(int(args.get("x", 0)), screen_w) y = _denormalize(int(args.get("y", 0)), screen_h) text = args.get("text", "") if args.get("press_enter") == True: text += "\n" action = {"type": "type", "x": x, "y": y, "text": text} elif name == "hover_at": x = _denormalize(int(args.get("x", 0)), screen_w) y = _denormalize(int(args.get("y", 0)), screen_h) action = {"type": "move", "x": x, "y": y} elif name == "key_combination": keys = str(args.get("keys", "")) action = {"type": "keypress", "keys": keys} elif name == "scroll_document": direction = args.get("direction", "down") magnitude = 800 dx, dy = 0, 0 if direction == "down": dy = magnitude elif direction == "up": dy = -magnitude elif direction == "right": dx = magnitude elif direction == "left": dx = -magnitude action = {"type": "scroll", "scroll_x": dx, "scroll_y": dy, "x": int(screen_w / 2), "y": int(screen_h / 2)} elif name == "scroll_at": x = _denormalize(int(args.get("x", 500)), screen_w) y = _denormalize(int(args.get("y", 
500)), screen_h) direction = args.get("direction", "down") magnitude = int(args.get("magnitude", 800)) dx, dy = 0, 0 if direction == "down": dy = magnitude elif direction == "up": dy = -magnitude elif direction == "right": dx = magnitude elif direction == "left": dx = -magnitude action = {"type": "scroll", "scroll_x": dx, "scroll_y": dy, "x": x, "y": y} elif name == "drag_and_drop": x = _denormalize(int(args.get("x", 0)), screen_w) y = _denormalize(int(args.get("y", 0)), screen_h) dx = _denormalize(int(args.get("destination_x", x)), screen_w) dy = _denormalize(int(args.get("destination_y", y)), screen_h) action = {"type": "drag", "start_x": x, "start_y": y, "end_x": dx, "end_y": dy, "button": "left"} elif name == "wait_5_seconds": action = {"type": "wait"} else: # Unsupported / excluded browser-specific or custom function; ignore return None return { "type": "computer_call", "call_id": uuid.uuid4().hex, "status": "completed", "action": action, } @register_agent(models=r"^gemini-2\.5-computer-use-preview-10-2025$") class GeminiComputerUseConfig(AsyncAgentConfig): async def predict_step( self, messages: List[Dict[str, Any]], model: str, tools: Optional[List[Dict[str, Any]]] = None, max_retries: Optional[int] = None, stream: bool = False, computer_handler=None, use_prompt_caching: Optional[bool] = False, _on_api_start=None, _on_api_end=None, _on_usage=None, _on_screenshot=None, **kwargs, ) -> Dict[str, Any]: genai, types = _lazy_import_genai() client = genai.Client() # Build excluded predefined functions for browser-specific behavior excluded = [ "open_web_browser", "search", "navigate", "go_forward", "go_back", "scroll_document", ] # Optional custom functions: can be extended by host code via `tools` parameter later if desired CUSTOM_FUNCTION_DECLARATIONS: List[Any] = [] # Compose tools config generate_content_config = types.GenerateContentConfig( tools=[ types.Tool( computer_use=types.ComputerUse( environment=types.Environment.ENVIRONMENT_BROWSER, 
excluded_predefined_functions=excluded, ) ), # types.Tool(function_declarations=CUSTOM_FUNCTION_DECLARATIONS), # enable when custom functions needed ] ) # Prepare contents: last user text + latest screenshot user_texts = _find_last_user_text(messages) screenshot_bytes = _find_last_screenshot(messages) parts: List[Any] = [] for t in user_texts: parts.append(types.Part(text=t)) screen_w, screen_h = 1024, 768 if screenshot_bytes: screen_w, screen_h = _bytes_image_size(screenshot_bytes) parts.append(types.Part.from_bytes(data=screenshot_bytes, mime_type="image/png")) # If we don't have any content, at least pass an empty user part to prompt reasoning if not parts: parts = [types.Part(text="Proceed to the next action.")] contents = [types.Content(role="user", parts=parts)] api_kwargs = { "model": model, "contents": contents, "config": generate_content_config, } if _on_api_start: await _on_api_start({ "model": api_kwargs["model"], # "contents": api_kwargs["contents"], # Disabled for now "config": api_kwargs["config"], }) response = client.models.generate_content(**api_kwargs) if _on_api_end: await _on_api_end({ "model": api_kwargs["model"], # "contents": api_kwargs["contents"], # Disabled for now "config": api_kwargs["config"], }, response) # Usage (Gemini SDK may not always provide token usage; populate when available) usage: Dict[str, Any] = {} try: # Some SDKs expose response.usage; if available, copy if getattr(response, "usage_metadata", None): md = response.usage_metadata usage = { "prompt_tokens": getattr(md, "prompt_token_count", None) or 0, "completion_tokens": getattr(md, "candidates_token_count", None) or 0, "total_tokens": getattr(md, "total_token_count", None) or 0, } except Exception: pass if _on_usage and usage: await _on_usage(usage) # Parse output into internal items output_items: List[Dict[str, Any]] = [] candidate = response.candidates[0] # Text parts from the model (assistant message) text_parts: List[str] = [] function_calls: List[Dict[str, Any]] = 
[] for p in candidate.content.parts: if getattr(p, "text", None): text_parts.append(p.text) if getattr(p, "function_call", None): # p.function_call has name and args fc = { "name": getattr(p.function_call, "name", None), "args": dict(getattr(p.function_call, "args", {}) or {}), } function_calls.append(fc) if text_parts: output_items.append( { "type": "message", "role": "assistant", "content": [{"type": "output_text", "text": "\n".join(text_parts)}], } ) # Map function calls to internal computer_call actions for fc in function_calls: item = _map_gemini_fc_to_computer_call(fc, screen_w, screen_h) if item is not None: output_items.append(item) return {"output": output_items, "usage": usage} async def predict_click( self, model: str, image_b64: str, instruction: str, **kwargs, ) -> Optional[Tuple[float, float]]: """Ask Gemini CUA to output a single click action for the given instruction. Excludes all predefined tools except `click_at` and sends the screenshot. Returns pixel (x, y) if a click is proposed, else None. 
""" genai, types = _lazy_import_genai() client = genai.Client() # Exclude all but click_at exclude_all_but_click = [ "open_web_browser", "wait_5_seconds", "go_back", "go_forward", "search", "navigate", "hover_at", "type_text_at", "key_combination", "scroll_document", "scroll_at", "drag_and_drop", ] config = types.GenerateContentConfig( tools=[ types.Tool( computer_use=types.ComputerUse( environment=types.Environment.ENVIRONMENT_BROWSER, excluded_predefined_functions=exclude_all_but_click, ) ) ] ) # Prepare prompt parts try: img_bytes = base64.b64decode(image_b64) except Exception: img_bytes = b"" w, h = _bytes_image_size(img_bytes) if img_bytes else (1024, 768) parts: List[Any] = [types.Part(text=f"Click {instruction}.")] if img_bytes: parts.append(types.Part.from_bytes(data=img_bytes, mime_type="image/png")) contents = [types.Content(role="user", parts=parts)] response = client.models.generate_content( model=model, contents=contents, config=config, ) # Parse first click_at try: candidate = response.candidates[0] for p in candidate.content.parts: fc = getattr(p, "function_call", None) if fc and getattr(fc, "name", None) == "click_at": args = dict(getattr(fc, "args", {}) or {}) x = _denormalize(int(args.get("x", 0)), w) y = _denormalize(int(args.get("y", 0)), h) return float(x), float(y) except Exception: return None return None def get_capabilities(self) -> List[AgentCapability]: return ["click", "step"] ``` -------------------------------------------------------------------------------- /libs/lume/src/FileSystem/Home.swift: -------------------------------------------------------------------------------- ```swift import Foundation /// Manages the application's home directory and virtual machine directories. /// Responsible for creating, accessing, and validating the application's directory structure. 
final class Home {
    // MARK: - Constants

    private enum Constants {
        static let defaultDirectoryName = ".lume"
        static let homeDirPath = "~/\(defaultDirectoryName)"
    }

    // MARK: - Properties

    private var _homeDir: Path
    private let settingsManager: SettingsManager
    private let fileManager: FileManager
    private var locations: [String: VMLocation] = [:]

    // Current home directory based on default location
    var homeDir: Path {
        return _homeDir
    }

    // MARK: - Initialization

    init(
        settingsManager: SettingsManager = SettingsManager.shared,
        fileManager: FileManager = .default
    ) {
        self.settingsManager = settingsManager
        self.fileManager = fileManager

        // Get home directory path from settings or use default
        let settings = settingsManager.getSettings()
        guard let defaultLocation = settings.defaultLocation else {
            fatalError("No default VM location found")
        }
        // NOTE(review): uses `defaultLocation.path` while VM lookups use
        // `expandedPath` — confirm `Path` expands "~" itself.
        self._homeDir = Path(defaultLocation.path)

        // Cache all locations
        for location in settings.vmLocations {
            locations[location.name] = location
        }
    }

    // MARK: - VM Directory Management

    /// Creates a temporary VM directory with a unique identifier
    /// - Returns: A VMDirectory instance representing the created directory
    /// - Throws: HomeError if directory creation fails
    func createTempVMDirectory() throws -> VMDirectory {
        let uuid = UUID().uuidString
        let tempDir = homeDir.directory(uuid)

        Logger.info("Creating temporary directory", metadata: ["path": tempDir.path])

        do {
            try createDirectory(at: tempDir.url)
            return VMDirectory(tempDir)
        } catch {
            throw HomeError.directoryCreationFailed(path: tempDir.path)
        }
    }

    /// Gets a VM directory for a specific VM name and optional location
    ///
    /// - Parameters:
    ///   - name: Name of the VM directory
    ///   - storage: Optional name of the VM location (default: default location)
    /// - Returns: A VMDirectory instance
    /// - Throws: HomeError if location not found
    func getVMDirectory(_ name: String, storage: String? = nil) throws -> VMDirectory {
        // Special case for ephemeral storage using macOS temporary directory
        if let storage = storage, storage == "ephemeral" {
            // Get the current temporary directory
            let tmpDir = ProcessInfo.processInfo.environment["TMPDIR"] ?? "/tmp"
            // Remove trailing slash if present
            let cleanPath = tmpDir.hasSuffix("/") ? String(tmpDir.dropLast()) : tmpDir

            // Create the directory if it doesn't exist
            if !fileExists(at: cleanPath) {
                try createVMLocation(at: cleanPath)
            }

            let baseDir = Path(cleanPath)
            return VMDirectory(baseDir.directory(name))
        }

        // Check if storage is a direct path
        if let storage = storage, (storage.contains("/") || storage.contains("\\")) {
            // NOTE(review): only a trailing "/" is stripped here even though
            // "\\" is accepted above — confirm Windows-style paths are unreachable.
            let cleanPath = storage.hasSuffix("/") ? String(storage.dropLast()) : storage
            let baseDir = Path(cleanPath)
            return VMDirectory(baseDir.directory(name))
        }

        let location: VMLocation
        if let storage = storage {
            // Get a specific location
            guard let loc = locations[storage] else {
                throw VMLocationError.locationNotFound(name: storage)
            }
            location = loc
        } else {
            // Use default location
            let settings = settingsManager.getSettings()
            guard let defaultLocation = settings.defaultLocation else {
                throw HomeError.invalidHomeDirectory
            }
            location = defaultLocation
        }

        let baseDir = Path(location.expandedPath)
        return VMDirectory(baseDir.directory(name))
    }

    /// Gets a VM directory from a direct file path
    ///
    /// - Parameters:
    ///   - name: Name of the VM directory
    ///   - storagePath: Direct file system path where the VM is located
    /// - Returns: A VMDirectory instance
    /// - Throws: HomeError if path is invalid
    func getVMDirectoryFromPath(_ name: String, storagePath: String) throws -> VMDirectory {
        let baseDir = Path(storagePath)

        // Create the directory if it doesn't exist
        if !fileExists(at: storagePath) {
            Logger.info("Creating storage directory", metadata: ["path": storagePath])
            try createVMLocation(at: storagePath)
        } else if !isValidDirectory(at: storagePath) {
            // Path exists but isn't a valid directory
            throw HomeError.invalidHomeDirectory
        }

        return VMDirectory(baseDir.directory(name))
    }

    /// Returns all initialized VM directories across all locations
    /// - Returns: An array of VMDirectory instances with location info
    /// - Throws: HomeError if directory access is denied
    func getAllVMDirectories() throws -> [VMDirectoryWithLocation] {
        var results: [VMDirectoryWithLocation] = []

        // Loop through all locations
        let settings = settingsManager.getSettings()

        // Also check ephemeral directory (macOS temporary directory)
        let tmpDir = ProcessInfo.processInfo.environment["TMPDIR"] ?? "/tmp"
        let cleanPath = tmpDir.hasSuffix("/") ? String(tmpDir.dropLast()) : tmpDir

        // If tmp directory exists, check for VMs there
        if fileExists(at: cleanPath) {
            let tmpDirPath = Path(cleanPath)
            do {
                let directoryURL = URL(fileURLWithPath: cleanPath)
                let contents = try FileManager.default.contentsOfDirectory(
                    at: directoryURL,
                    includingPropertiesForKeys: [.isDirectoryKey],
                    options: .skipsHiddenFiles
                )

                for subdir in contents {
                    do {
                        guard
                            let isDirectory = try subdir.resourceValues(
                                forKeys: [.isDirectoryKey]).isDirectory,
                            isDirectory
                        else { continue }

                        let vmName = subdir.lastPathComponent
                        let vmDir = VMDirectory(tmpDirPath.directory(vmName))

                        // Only include if it's a valid VM directory
                        if vmDir.initialized() {
                            results.append(
                                VMDirectoryWithLocation(
                                    directory: vmDir,
                                    locationName: "ephemeral"
                                ))
                        }
                    } catch {
                        // Skip any directories we can't access
                        continue
                    }
                }
            } catch {
                Logger.error(
                    "Failed to access ephemeral directory",
                    metadata: [
                        "path": cleanPath,
                        "error": error.localizedDescription,
                    ]
                )
                // Continue to regular locations rather than failing completely
            }
        }

        for location in settings.vmLocations {
            let locationPath = Path(location.expandedPath)

            // Skip non-existent locations
            if !locationPath.exists() {
                continue
            }

            do {
                let allFolders = try fileManager.contentsOfDirectory(
                    at: locationPath.url,
                    includingPropertiesForKeys: nil
                )

                let folders = allFolders
                    .compactMap { url in
                        let sanitizedName = sanitizeFileName(url.lastPathComponent)
                        let dir = VMDirectory(locationPath.directory(sanitizedName))
                        let dirWithLoc =
                            dir.initialized()
                            ? VMDirectoryWithLocation(directory: dir, locationName: location.name)
                            : nil
                        return dirWithLoc
                    }
                results.append(contentsOf: folders)
            } catch {
                Logger.error(
                    "Failed to access VM location",
                    metadata: [
                        "location": location.name,
                        "error": error.localizedDescription,
                    ])
                // Continue to next location rather than failing completely
            }
        }

        return results
    }

    /// Copies a VM directory to a new location with a new name
    /// - Parameters:
    ///   - sourceName: Name of the source VM
    ///   - destName: Name for the destination VM
    ///   - sourceLocation: Optional name of the source location
    ///   - destLocation: Optional name of the destination location
    /// - Throws: HomeError if the copy operation fails
    func copyVMDirectory(
        from sourceName: String,
        to destName: String,
        sourceLocation: String? = nil,
        destLocation: String? = nil
    ) throws {
        let sourceDir = try getVMDirectory(sourceName, storage: sourceLocation)
        let destDir = try getVMDirectory(destName, storage: destLocation)

        // Check if destination directory exists at all
        if destDir.exists() {
            throw HomeError.directoryAlreadyExists(path: destDir.dir.path)
        }

        do {
            try fileManager.copyItem(atPath: sourceDir.dir.path, toPath: destDir.dir.path)
        } catch {
            throw HomeError.directoryCreationFailed(path: destDir.dir.path)
        }
    }

    // MARK: - Location Management

    /// Adds a new VM location
    /// - Parameters:
    ///   - name: Location name
    ///   - path: Location path
    /// - Throws: Error if location cannot be added
    func addLocation(name: String, path: String) throws {
        let location = VMLocation(name: name, path: path)
        try settingsManager.addLocation(location)

        // Update cache
        locations[name] = location
    }

    /// Removes a VM location
    /// - Parameter name: Location name
    /// - Throws: Error if location cannot be removed
    func removeLocation(name: String) throws {
        try settingsManager.removeLocation(name: name)

        // Update cache
        locations.removeValue(forKey: name)
    }

    /// Sets the default VM location
    /// - Parameter name: Location name
    /// - Throws: Error if location cannot be set as default
    func setDefaultLocation(name: String) throws {
        try settingsManager.setDefaultLocation(name: name)

        // Update home directory
        guard let location = locations[name] else {
            throw VMLocationError.locationNotFound(name: name)
        }

        // Update homeDir to reflect the new default
        // NOTE(review): uses `location.path` (not `expandedPath`) — confirm
        // the tilde form is intended here.
        self._homeDir = Path(location.path)
    }

    /// Gets all available VM locations
    /// - Returns: Array of VM locations
    func getLocations() -> [VMLocation] {
        return settingsManager.getSettings().sortedLocations
    }

    /// Gets the default VM location
    /// - Returns: Default VM location
    /// - Throws: HomeError if no default location
    func getDefaultLocation() throws -> VMLocation {
        guard let location = settingsManager.getSettings().defaultLocation else {
            throw HomeError.invalidHomeDirectory
        }
        return location
    }

    // MARK: - Directory Validation

    /// Validates and ensures the existence of all VM locations
    /// - Throws: HomeError if validation fails or directory creation fails
    func validateHomeDirectory() throws {
        let settings = settingsManager.getSettings()
        for location in settings.vmLocations {
            let path = location.expandedPath
            if !fileExists(at: path) {
                try createVMLocation(at: path)
            } else if !isValidDirectory(at: path) {
                throw HomeError.invalidHomeDirectory
            }
        }
    }

    // MARK: - Private Helpers

    private func createVMLocation(at path: String) throws {
        do {
            try fileManager.createDirectory(
                atPath: path,
                withIntermediateDirectories: true
            )
        } catch {
            throw HomeError.directoryCreationFailed(path: path)
        }
    }

    private func createDirectory(at url: URL) throws {
        try fileManager.createDirectory(
            at: url,
            withIntermediateDirectories: true
        )
    }

    private func isValidDirectory(at path: String) -> Bool {
        var isDirectory: ObjCBool = false
        return fileManager.fileExists(atPath: path, isDirectory: &isDirectory)
            && isDirectory.boolValue
            && Path(path).writable()
    }

    private func fileExists(at
path: String) -> Bool {
        return fileManager.fileExists(atPath: path)
    }

    private func sanitizeFileName(_ name: String) -> String {
        // Only decode percent encoding (e.g., %20 for spaces)
        return name.removingPercentEncoding ?? name
    }
}

// MARK: - VM Directory with Location

/// Represents a VM directory with its location information
struct VMDirectoryWithLocation {
    let directory: VMDirectory
    let locationName: String
}

// MARK: - Home + CustomStringConvertible

extension Home: CustomStringConvertible {
    var description: String {
        "Home(path: \(homeDir.path))"
    }
}
```

--------------------------------------------------------------------------------
/libs/python/agent/agent/integrations/hud/agent.py:
--------------------------------------------------------------------------------

```python
"""MCP-compatible Computer Agent for HUD integration.

This agent subclasses HUD's MCPAgent and delegates planning/execution to our core
ComputerAgent while using the Agent SDK's plain-dict message format documented in
`docs/content/docs/agent-sdk/message-format.mdx`.

Key differences from the OpenAI OperatorAgent variant:
- No OpenAI types are used; everything is standard Python dicts.
- Planning is executed via `ComputerAgent.run(messages)`.
- The first yielded result per step is returned as the agent response.
""" from __future__ import annotations import io from typing import Any, ClassVar, Optional from agent.agent import ComputerAgent as BaseComputerAgent from agent.callbacks import PromptInstructionsCallback from agent.callbacks.trajectory_saver import TrajectorySaverCallback from hud.agents import MCPAgent from hud.tools.computer.settings import computer_settings from hud.types import AgentResponse, MCPToolCall, MCPToolResult, Trace from agent.responses import make_failed_tool_call_items from agent.computers import is_agent_computer from PIL import Image import mcp.types as types import hud import uuid import base64 from pathlib import Path class MCPComputerAgent(MCPAgent): """MCP agent that uses ComputerAgent for planning and tools for execution. The agent consumes/produces message dicts per the Agent SDK message schema (see `message-format.mdx`). """ metadata: ClassVar[dict[str, Any]] = { "display_width": computer_settings.OPENAI_COMPUTER_WIDTH, "display_height": computer_settings.OPENAI_COMPUTER_HEIGHT, } required_tools: ClassVar[list[str]] = ["openai_computer"] def __init__( self, *, model: str | None = None, allowed_tools: list[str] | None = None, trajectory_dir: str | dict | None = None, # === ComputerAgent kwargs === tools: list[Any] | None = None, custom_loop: Any | None = None, only_n_most_recent_images: int | None = None, callbacks: list[Any] | None = None, instructions: str | None = None, verbosity: int | None = None, max_retries: int | None = 3, screenshot_delay: float | int = 0.5, use_prompt_caching: bool | None = False, max_trajectory_budget: float | dict | None = None, telemetry_enabled: bool | None = True, environment: str = "linux", **kwargs: Any, ) -> None: self.allowed_tools = allowed_tools or ["openai_computer"] super().__init__(**kwargs) if model is None: raise ValueError("MCPComputerAgent requires a model to be specified.") self.model = model self.environment = environment # Update model name for HUD logging self.model_name = "cua-" + 
self.model # Stateful tracking of tool call inputs self.tool_call_inputs: dict[str, list[dict[str, Any]]] = {} self.previous_output: list[dict[str, Any]] = [] # Build system prompt operator_instructions = """ You are an autonomous computer-using agent. Follow these guidelines: 1. NEVER ask for confirmation. Complete all tasks autonomously. 2. Do NOT send messages like "I need to confirm before..." or "Do you want me to continue?" - just proceed. 3. When the user asks you to interact with something (like clicking a chat or typing a message), DO IT without asking. 4. Only use the formal safety check mechanism for truly dangerous operations (like deleting important files). 5. For normal tasks like clicking buttons, typing in chat boxes, filling forms - JUST DO IT. 6. The user has already given you permission by running this agent. No further confirmation is needed. 7. Be decisive and action-oriented. Complete the requested task fully. Remember: You are expected to complete tasks autonomously. The user trusts you to do what they asked. 
""".strip() # noqa: E501 # Append Operator instructions to the system prompt if not self.system_prompt: self.system_prompt = operator_instructions else: self.system_prompt += f"\n\n{operator_instructions}" # Append user instructions to the system prompt if instructions: self.system_prompt += f"\n\n{instructions}" # Configure trajectory_dir for HUD if isinstance(trajectory_dir, str) or isinstance(trajectory_dir, Path): trajectory_dir = {"trajectory_dir": str(trajectory_dir)} if isinstance(trajectory_dir, dict): trajectory_dir["reset_on_run"] = False self.last_screenshot_b64 = None buffer = io.BytesIO() Image.new('RGB', (self.metadata["display_width"], self.metadata["display_height"])).save(buffer, format='PNG') self.last_screenshot_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8') # Ensure a computer shim is present so width/height/environment are known computer_shim = { "screenshot": lambda: self.last_screenshot_b64, "environment": self.environment, "dimensions": ( self.metadata["display_width"], self.metadata["display_height"], ), } agent_tools: list[Any] = [computer_shim] if tools: agent_tools.extend([ tool for tool in tools if not is_agent_computer(tool) ]) agent_kwargs = { "model": self.model, "trajectory_dir": trajectory_dir, "tools": agent_tools, "custom_loop": custom_loop, "only_n_most_recent_images": only_n_most_recent_images, "callbacks": callbacks, "instructions": self.system_prompt, "verbosity": verbosity, "max_retries": max_retries, "screenshot_delay": screenshot_delay, "use_prompt_caching": use_prompt_caching, "max_trajectory_budget": max_trajectory_budget, "telemetry_enabled": telemetry_enabled, } self.computer_agent = BaseComputerAgent( **agent_kwargs ) async def get_system_messages(self) -> list[Any]: """Create initial messages. Unused - ComputerAgent handles this with the 'instructions' parameter. 
""" return [] async def format_blocks( self, blocks: list[types.ContentBlock] ) -> list[dict[str, Any]]: """ Format blocks for OpenAI input format. Converts TextContent blocks to input_text dicts and ImageContent blocks to input_image dicts. """ # noqa: E501 formatted = [] for block in blocks: if isinstance(block, types.TextContent): formatted.append({"type": "input_text", "text": block.text}) elif isinstance(block, types.ImageContent): mime_type = getattr(block, "mimeType", "image/png") formatted.append( {"type": "input_image", "image_url": f"data:{mime_type};base64,{block.data}"} ) self.last_screenshot_b64 = block.data return [{"role": "user", "content": formatted}] @hud.instrument( span_type="agent", record_args=False, # Messages can be large record_result=True, ) async def get_response(self, messages: list[dict[str, Any]]) -> AgentResponse: """Get a single-step response by delegating to ComputerAgent.run. Returns an Agent SDK-style response dict: { "output": [AgentMessage, ...], "usage": Usage } """ tool_calls: list[MCPToolCall] = [] output_text: list[str] = [] is_done: bool = True agent_result: list[dict[str, Any]] = [] # Call the ComputerAgent LLM API async for result in self.computer_agent.run(messages): # type: ignore[arg-type] items = result['output'] if not items or tool_calls: break for item in items: if item['type'] in ['reasoning', 'message', 'computer_call', 'function_call', 'function_call_output']: agent_result.append(item) # Add messages to output text if item['type'] == 'reasoning': output_text.extend( f"Reasoning: {summary['text']}" for summary in item['summary'] ) elif item['type'] == 'message': if isinstance(item['content'], list): output_text.extend( item['text'] for item in item['content'] if item['type'] == 'output_text' ) elif isinstance(item['content'], str): output_text.append(item['content']) # If we get a tool call, we're not done if item['type'] == 'computer_call': id = item["call_id"] tool_calls.append(MCPToolCall( 
name="openai_computer", arguments=item["action"], id=id, )) is_done = False self.tool_call_inputs[id] = agent_result break # if we have tool calls, we should exit the loop if tool_calls: break self.previous_output = agent_result return AgentResponse( content="\n".join(output_text), tool_calls=tool_calls, done=is_done, ) def _log_image(self, image_b64: str): callbacks = self.computer_agent.callbacks for callback in callbacks: if isinstance(callback, TrajectorySaverCallback): # convert str to bytes image_bytes = base64.b64decode(image_b64) callback._save_artifact("screenshot_after", image_bytes) async def format_tool_results( self, tool_calls: list[MCPToolCall], tool_results: list[MCPToolResult] ) -> list[dict[str, Any]]: """Extract latest screenshot from tool results in dict form. Expects results to already be in the message-format content dicts. Returns a list of input content dicts suitable for follow-up calls. """ messages = [] for call, result in zip(tool_calls, tool_results): if call.id not in self.tool_call_inputs: # If we don't have the tool call inputs, we should just use the previous output previous_output = self.previous_output.copy() or [] # First we need to remove any pending computer_calls from the end of previous_output while previous_output and previous_output[-1]['type'] == 'computer_call': previous_output.pop() messages.extend(previous_output) # If the call is a 'response', don't add the result if call.name == 'response': continue # Otherwise, if we have a result, we should add it to the messages content = [ { "type": "input_text", "text": content.text } if isinstance(content, types.TextContent) else { "type": "input_image", "image_url": f"data:image/png;base64,{content.data}" } if isinstance(content, types.ImageContent) else { "type": "input_text", "text": "" } for content in result.content ] messages.append({ "role": "user", "content": content, }) continue # Add the assistant's computer call messages.extend(self.tool_call_inputs[call.id]) if 
result.isError: error_text = "".join([ content.text for content in result.content if isinstance(content, types.TextContent) ]) # Replace computer call with failed tool call messages.pop() messages.extend(make_failed_tool_call_items( tool_name=call.name, tool_kwargs=call.arguments or {}, error_message=error_text, call_id=call.id, )) else: # Get the latest screenshot screenshots = [ content.data for content in result.content if isinstance(content, types.ImageContent) ] # Add the resulting screenshot if screenshots: self._log_image(screenshots[0]) self.last_screenshot_b64 = screenshots[0] messages.append({ "type": "computer_call_output", "call_id": call.id, "output": { "type": "input_image", "image_url": f"data:image/png;base64,{screenshots[0]}" }, }) else: # Otherwise, replace computer call with failed tool call messages.pop() messages.extend(make_failed_tool_call_items( tool_name=call.name, tool_kwargs=call.arguments or {}, error_message="No screenshots returned.", call_id=call.id, )) return messages __all__ = [ "MCPComputerAgent", ] ```