#
tokens: 47104/50000 14/513 files (page 8/16)
lines: off (toggle) GitHub
raw markdown copy
This is page 8 of 16. Use http://codebase.md/trycua/cua?page={x} to view the full context.

# Directory Structure

```
├── .all-contributorsrc
├── .cursorignore
├── .devcontainer
│   ├── devcontainer.json
│   ├── post-install.sh
│   └── README.md
├── .dockerignore
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── ci-lume.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-pylume.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       └── test-validation-script.yml
├── .gitignore
├── .vscode
│   ├── docs.code-workspace
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── composite-agents.md
│   ├── cua-hackathon.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .gitignore
│   ├── .prettierrc
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   └── meta.json
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── meta.json
│   │       │   └── sandboxed-python.mdx
│   │       ├── index.mdx
│   │       ├── libraries
│   │       │   ├── agent
│   │       │   │   └── index.mdx
│   │       │   ├── computer
│   │       │   │   └── index.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── core
│   │       │   │   └── index.mdx
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   └── som
│   │       │       ├── configuration.mdx
│   │       │       └── index.mdx
│   │       ├── meta.json
│   │       ├── quickstart-cli.mdx
│   │       ├── quickstart-devs.mdx
│   │       └── telemetry.mdx
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   └── llms.txt
│   │   │       └── route.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── iou.tsx
│   │   │   └── mermaid.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   └── mdx-components.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── .prettierrc
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   └── uitars.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   └── test_connection.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── scripts
│   │   │       ├── install_mcp_server.sh
│   │   │       └── start_mcp_server.sh
│   │   ├── pylume
│   │   │   ├── __init__.py
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── pylume
│   │   │   │   ├── __init__.py
│   │   │   │   ├── client.py
│   │   │   │   ├── exceptions.py
│   │   │   │   ├── lume
│   │   │   │   ├── models.py
│   │   │   │   ├── pylume.py
│   │   │   │   └── server.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           └── test_omniparser.py
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── biome.json
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Dockerfile
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── pylume_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── pdm.lock
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── samples
│   └── community
│       ├── global-online
│       │   └── README.md
│       └── hack-the-north
│           └── README.md
├── scripts
│   ├── build-uv.sh
│   ├── build.ps1
│   ├── build.sh
│   ├── cleanup.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   └── run-docker-dev.sh
└── tests
    ├── pytest.ini
    ├── shell_cmd.py
    ├── test_files.py
    ├── test_mcp_server_session_management.py
    ├── test_mcp_server_streaming.py
    ├── test_shell_bash.py
    ├── test_telemetry.py
    ├── test_venv.py
    └── test_watchdog.py
```

# Files

--------------------------------------------------------------------------------
/examples/evals/wikipedia_most_linked.txt:
--------------------------------------------------------------------------------

```
ISBN (identifier)
United States
Main Page
Tilde
Doi (identifier)
Fair use
Association football
Years
Wayback Machine
ISSN (identifier)
India
Wikimedia Foundation
Wikidata
Animal
Taxonomy (biology)
Australia
France
Eukaryote
IP address
U.S. state
Time zone
City
Copyright
Canada
Town
ASCII
Greek alphabet
Typographic ligature
Diacritical mark
Wikipedia
Germany
Human settlement
Open Tree of Life
IMDb (identifier)
United Kingdom
Catalogue of Life
Insect
Russia
Japan
Italy
Arthropod
Television show
Public domain
INaturalist
Poland
England
PMID (identifier)
Daylight saving time
S2CID (identifier)
China
Encyclopedia of Life
Spain
OCLC (identifier)
Plant
Flickr
Wikispecies
Africa
Song
Record label
Lepidoptera
Iran
English language
Music genre
News aggregator
Web feed
Proxy server
X-Forwarded-For
College football
World War II
Brazil
Sweden
Politics
Olympics
Netherlands
Record producer
California
New York City
Surname
The New York Times
London
New Zealand
PMC (identifier)
Logo
Synonym (taxonomy)
Switzerland
Turkey
Sport
Video game
Architecture
Norway
Bibcode (identifier)
Mexico
Botany
JSTOR (identifier)
Rail transport
Field hockey
Ireland
Scotland
Belgium
South Africa
Common name
Professional sports
Sport governing body
Sport industry
Olympic games
Election
Austria
Ukraine
Anthroponymy
Pakistan
Baseball
Denmark
Christianity
Philippines
Woman
Romania
Czech Republic
Album
Godzilla Minus One
Single (music)
Electoral reform
Nofollow
Basketball
New York (state)
Argentina
Finland
Soviet Union
Greece
Russian language
Historic site
Free content
YouTube
Catholic Church
Hungary
Kingdom Hearts
Beetle
Company
Tetris
Portugal
BioShock
Abandonware
Deus Ex (video game)
4A Engine
Yoshi's New Island
Kaboom! (video game)
Rain World
Juno (Overwatch)
Crash Team Rumble
Vault 101
Tales of Commons
NHL Hockey
Clutch Gaming
Haseo
Allin Kempthorne
Ilyas El Maliki
Ratalaika Games
3D mousepad
HaptX
Walid Sultan Midani
Rustler (video game)
Look Outside
Ducks Ahoy!
Fusion Engine
Cricket
Geography
Chordate
The Guardian
Israel
Billboard (magazine)
Ice hockey
Given name
Chicago
World War I
Pennsylvania
Indonesia
Alma mater
Vascular plant
Amorphea
Wikimedia Commons
Novel
Village
Visual arts
Film poster
Flowering plant
Opisthokont
Obazoa
County seat
Short story
First-class cricket
Law
Europe
University
Croatia
Sport of athletics
Holozoa
Choanozoa
Filozoa
German language
Tennis
Eumetazoa
Serbia
ParaHoxozoa
Thailand
History
Midfielder
Bilateria
Unincorporated area
French language
AllMusic
Astronomy
Nephrozoa
Novella
Ship
Twitter
Character (arts)
College
Malaysia
Conflict of interest
Higher education
IUCN Red List
Rock music
Gastropoda
Creative Commons
Wales
Bulgaria
UTC+2
Paris
Species
Illinois
HTML element
South Korea
BBC
Persian language
Moth
Conservation status
Pop music
Colombia
Wicket
American football
Jazz
World Flora Online
Los Angeles
Songwriter
Hong Kong
Hdl (identifier)
Genus
Spanish language
Egypt
Not out
Slovenia
Chile
Korea
Tropicos
Slovakia
Bishop
Family (biology)
Rugby union
Women's history
Nigeria
College basketball
Sports Reference
Washington, D.C.
GFDL
Afghanistan
Sri Lanka
Newspapers.com
UTC+1
Eudicots
Estonia
Los Angeles Times
Olympedia
Bangladesh
Peru
Singapore
Typographical error
UTC
Virginia
Taiwan
Fast bowling
COVID-19 pandemic
Food
Fish
River
Republic of Ireland
Beer
Caribbean
Michigan
Drink
Chinese language
Business
Leg break
Women's Test cricket
Women's cricket
Innings
New Jersey
Protostome
Spin bowling
Sugar
Underarm bowling
Roger Federer
Googly
Apple
Comics
Cricket Australia XI
Fair and unfair play
Anime
Rafael Nadal
Leander Paes
Kazakhstan
Capital city
Blessed Virgin Mary
Venezuela
Case sensitivity
Arabic language
North America
Texas
Burger King
The Plant List
Justine Henin
Sushi
Angelus
Beef
Sanctification
Cuthbert Tunstall
Bread
Saint Mungo
Incumbent
Americanism (heresy)
Curry
Ensoulment
Associated Press
Adolph John Paschang
French cuisine
Altar Society
UTC-5
Philadelphia
Bill Mallon
Yogurt
Soy sauce
Open Era (tennis)
Belarus
Manga
English Wikipedia
Islam
Trademark
ISO 4
Wisconsin
Lithuania
The Washington Post
Agaricus bisporus
Reptile
Sociology
Organizations
Death
Ham and eggs
Asia
Swimming (sport)
South America
Northern Ireland
Observation.org
European Union
Astronomical object
Georgia (U.S. state)
Gmina
Provinces of Iran
Computing
Counties of Iran
Discogs
Mathematics
Powiat
Missouri
Bachelor of Arts
Iran Standard Time
Florida
Bakhsh
Minnesota
Oregon
Nepal
Variety (magazine)
Japanese language
Journalism
Rome
Computer
Ohio
Ontario
Internet Archive
Latvia
Comedy
Azerbaijan
BBC News
Morocco
Ecdysozoa
Print-on-demand
Bengali language
A5 paper
Pedia Press
Education
Mollusca
American Civil War
Berlin
Taxon
Maryland
Panarthropoda
Hebrew language
Toronto
Tactopoda
Episode
Cuba
Country music
Religion
Rotten Tomatoes
Georgia (country)
Classical music
Month
Puerto Rico
GEOnet Names Server
Sydney
The Times
Iraq
Polyphaga
Derivative work
Lisbon
Syria
Ecuador
Uzbekistan
Greek language
Latin
United Nations
Literature
Animation
Physics
Amphibian
Romanize
List of countries
Moscow
Politician
Philosophy
Metacritic
Mammal
Pinyin
Open access
New South Wales
Theatre
Allmusic
Syntax
Women in music
Fly
Colorado
Academic journal
LGBTQ
Seal (emblem)
Rolling Stone
Saudi Arabia
Science fiction
Tweet (social media)
Heavy metal music
Boston
Vietnam
Molecular biology
Facebook
Iceland
Albania
Cycling
Tennessee
Armenia
Massachusetts
Mandibulata
United States Navy
Communes of France
Census
Algeria
United States Army
Wikilink
Pancrustacea
Alternative rock
American English
Radio stations
History of Romania
Endemism
San Francisco
Award
Ghana
Judaism
Alabama
Blog
The Independent
Melbourne
Cantons of France
Lebanon
West Germany
Quotation mark
Regions of France
Chernivtsi Oblast
Tokyo
Italian language
Connecticut
Country
Screenshot
Ghost town
Iran Daylight Time
NatureServe
Mongolia
Cyprus
Northern Bukovina
Rugby league
Northern Bessarabia
State highway
Harvard University
Yorkshire
Pterygota
Slash (punctuation)
Prize
Science
Asian Games
Eastern Time Zone
Myanmar
Nazi Germany
Ottoman Empire
Quebec
Billboard Hot 100
United Arab Emirates
Neoptera
Hexapoda
Least Concern
Type species
EPPO Code
Wikisource
Kyrgyzstan
Allotriocarida
Volleyball
Geology
Second World War
British Columbia
Socialism
Zoology
The Daily Telegraph
Paleontology
Vienna
Dicondylia
BugGuide
United States Senate
Hermit crab
Paraphrase
CNN
Royal Navy
Indian Standard Time
Billboard 200
Kenya
DVD
Sipuncula
Tajikistan
National park
Economics
Heterocyathus
Uruguay
Heteropsammia
Road
Spanish name
Luxembourg
Korean language
UK Singles Chart
Queensland
Montreal
New York Times
Bolivia
CP/M
Timestamp
Electronic music
INSEE code
ArXiv (identifier)
PubMed
SVG
USA Today
Omnivore
Tunisia
Psychology
ESPN
UEFA
Hawaii
Gastropod
Aliyah
North Carolina
Russian Empire
Tibet
Fungi
Oklahoma
Fauna Europaea
Turkmenistan
British English
The London Gazette
Civil township
Boxing
Barack Obama
Animal Diversity Web
Reuters
Eumetabola
Voter turnout
Transport
False positive
Donald Trump
Kansas
Antarctica
Lake
Ethiopia
Time (magazine)
Marriage
NBC
Beijing
Vertebrate
Czechoslovakia
Protected area
Energy
Poetry
Archaeology
Columbia University
Poverty line
Alaska
Computing platform
British Empire
University of Oxford
Costa Rica
Dublin
A-side and B-side
ZIP code
Actinopterygii
UTC-6
Photoperiodism
Mayor
Sphaeriidae
Animal suicide
Atka mackerel
Starling
Arizona
Entertainment Weekly
Sphaerium beckmani
Junqueira cow
Zaniolepis frenata
Campocraspedon
Zimbabwe
Motorsport
Bird flight
Cnemophilidae
Hinduism
Phalarope
Indiana
Museums
Holometabola
Pytilia
North Macedonia
Malta
Cathartiformes
Darter
Saker falcon
Cathartes
Avian malaria
Coal tit
Magpie duck
Video game developer
Bird bath
Vesper sparrow
Gouldian finch
Debeaking
Vector graphics
Semiplumbeous hawk
Scottish crossbill
Bullfinch
Fregata
Nidicolous
Plushcap
Pallid scops owl
Hip-hop
Blyth's frogmouth
Sunda scops owl
Argus (bird)
Operation Migration
Nik Borrow
Per capita income
Guy Oseary
Madrid
Buddhism
Drainage basin
Sephardic Haredim
Rami Kleinstein
Guy Bavli
David Bar-Hayim
Levin Kipnis
Edna Arbel
Prisoner of Zion
Ayala Procaccia
Nachum Heiman
Zman Tel Aviv
CBS
ARIA Charts
Cucujiformia
Away colours
Regex
2019 African Games
1962 Asian Games
1958 Asian Games
Chemistry
Olympic Games
The Middle Ages
Central Asia
Bengalis
Southeast Asia
Find a Grave
Microsoft Windows
Swing (politics)
White (U.S. Census)
Roman Catholic
Maine
The Times of India
Season (sports)
Jamaica
Video game genre
Munich
Asterids
Rosids
Golf
Language
Hangul
Atlanta
Glasgow
UTC+3
Library of Congress
Deuterostome
COVID-19
Video game publisher
Montenegro
ESPNcricinfo
Brand
UTC-4
IGN
Stockholm
Istanbul
NASA
Gnathostomata
Ukrainian language
Human rights
Chicago Tribune
ProQuest
IMDb
River mouth
Hip hop music
Gene
Netflix
Moldova
Barcelona
Paraguay
Olfactores
Labour Party (UK)
United States dollar
Qatar
Photography
Guatemala
Summit
Cold War
Running
First World War
Precipitation
Edinburgh
Amsterdam
Lima
New Eskaton
Computer program
Xinjiang
Women in science
Manhattan
Warsaw
Magazine
Horror film
Deadline Hollywood
Jordan
Aparaglossata
Agriculture
Internet
Prague
The Hindu
Cretaceous
Latino (U.S. Census)
Vietnam War
Music download
Encyclopedia
Chemical compounds
Pittsburgh
Soap opera
Budapest
George W. Bush
Seattle
Extended play
Washington (state)
Listed building
Palestine
LCCN (identifier)
Portland, Oregon
Panama
Plagiarism
Brooklyn
Teleostomi
Manchester
Bird
Mollusk
Automobile
Historic England
Linguistics
Dependent territory
Athens
Civil engineering
Sea snail
Population density
Finance
Disaster management
Tanzania
Jurassic
Districts of Russia
Western Australia
Louisiana
Portuguese language
Anatomy
The Beatles
Tamil language
Milan
Uganda
Natural environment
FIFA
Cameroon
Blu-ray
Mexico City
Chemical formula
Jimmy Wales
Papua New Guinea
Diaphoretickes
UNESCO
Forbes
Technology
Buenos Aires
Vancouver
Dominican Republic
2007
Species description
East Germany
Folk music
Kentucky
Multimedia
Monocotyledon
Rio de Janeiro
Automated
Hindi
Houston
Google
Devonian
Member of Parliament
Bible
Mumbai
FishBase
African diaspora
Carboniferous
Cambrian
Triassic
Montana
Handball
Ordovician
San Diego
Archive.today
Stanford University
British Army
Middle Ages
Frequency
Ultratop
Permian
Detroit
Earth
Precambrian
Hamburg
Alberta
Tamil Nadu
Madagascar
Lancashire
Guitar
Trade union
Instagram
Engineering
2006
Silurian
NPR
Railway station
CAS Registry Number
Yemen
Noctuoidea
Fiji
Haiti
Rowing (sport)
New Orleans
NME
Alternative media
North Korea
Microsoft
Jerusalem
Paleogene
Audery Mill Creek
Horse racing
Post town
Piano
Bavaria
Polish language
Horror fiction
Neogene
Kerala
Copenhagen
Google Books
Central Time Zone
Island
Birmingham
Anglicanism
Software
Mountain range
Investment
Brussels
Muhammad Ali
Asian (U.S. Census)
Video game culture
Brisbane
Church of England
Kosovo
Bachelor of Science
Molar mass
Arachnid
Own goal
Yale University
Caenogastropoda
Auckland
World Athletics
Trinidad and Tobago
Hanyu Pinyin
Sound bite
Time
El Salvador
Microbiology
Columbia Records
Seoul
Cerambycidae
Maharashtra
Chelicerata
Fungus
Media influence
South Carolina
Radio
Telenovela
FA Cup
Senegal
Internet trolling
Nashville, Tennessee
Demonym
Standard Chinese
Sculpture
Liverpool
Thesis
Bass guitar
Chess
Women artists
Icon (computing)
PubChem
UK Albums Chart
Head coach
Roman Empire
Grand Slam (tennis)
JSmol
Formula One
Biology
Kent
Ancient Rome
Inner Carniola
Oslo
Dutch language
Wingspan
Archaeplastida
MTV
Edvard Ravnikar
ITunes
Feminism
German Empire
Pacific Ocean
Atlantic Ocean
Pharmacology
Track gauge
ChemSpider
Doctor of Philosophy
Regions of England
Districts of England
Christmas
Pavel Golia
Predjama Castle
Overtime (sports)
Forum
Swiss Hitparade
Stumped
Majority
Male
Shanghai
Siddharta (band)
```

--------------------------------------------------------------------------------
/blog/training-computer-use-models-trajectories-1.md:
--------------------------------------------------------------------------------

```markdown
# Training Computer-Use Models: Creating Human Trajectories with Cua

*Published on May 1, 2025 by Dillon DuPont*

In our previous posts, we covered [building your own Computer-Use Operator](build-your-own-operator-on-macos-1) and [using the Agent framework](build-your-own-operator-on-macos-2) to simplify development. Today, we'll focus on a critical aspect of improving computer-use agents and models: gathering high-quality demonstration data using Cua's Computer-Use Interface (CUI) and its Gradio UI to create and share human-generated trajectories.

Why is this important? The models underlying computer-use agents need examples of how humans interact with computers in order to learn effectively. By creating a dataset of diverse, well-executed tasks, we can help train better models that understand how to navigate user interfaces and accomplish real tasks.

<video src="https://github.com/user-attachments/assets/c586d460-3877-4b5f-a736-3248886d2134" controls width="600"></video>


## What You'll Learn

By the end of this tutorial, you'll be able to:
- Set up the Computer-Use Interface (CUI) with Gradio UI support
- Record your own computer interaction trajectories
- Organize and tag your demonstrations
- Upload your datasets to Hugging Face for community sharing
- Contribute to improving computer-use AI for everyone

**Prerequisites:**
- macOS Sonoma (14.0) or later
- Python 3.10+
- Basic familiarity with Python and terminal commands
- A Hugging Face account (for uploading datasets)

**Estimated Time:** 20-30 minutes

## Understanding Human Trajectories

### What are Human Trajectories?

Human trajectories, in the context of computer-use AI agents, are recordings of how humans interact with computer interfaces to complete tasks. These interactions include:

- Mouse movements, clicks, and scrolls
- Keyboard input
- Changes in the UI state
- Time spent on different elements

These trajectories serve as examples for AI models to learn from, helping them understand the relationship between:
1. The visual state of the screen
2. The user's goal or task
3. The most appropriate action to take

### Why Human Demonstrations Matter

Unlike synthetic data or rule-based automation, human demonstrations capture the nuanced decision-making that happens during computer interaction:

- **Natural Pacing**: Humans pause to think, accelerate through familiar patterns, and adjust to unexpected UI changes
- **Error Recovery**: Humans demonstrate how to recover from mistakes or handle unexpected states
- **Context-Sensitive Actions**: The same UI element might be used differently depending on the task context

By contributing high-quality demonstrations, you're helping to create more capable, human-like computer-use AI systems.

## Setting Up Your Environment

### Installing the CUI with Gradio Support

The Computer-Use Interface includes an optional Gradio UI specifically designed to make recording and sharing demonstrations easy. Let's set it up:

1. **Create a Python environment** (optional but recommended):
   ```bash
   # Using conda
   conda create -n cua-trajectories python=3.10
   conda activate cua-trajectories
   
   # Using venv
   python -m venv cua-trajectories
   source cua-trajectories/bin/activate  # On macOS/Linux
   ```

2. **Install the CUI package with UI support**:
   ```bash
   pip install "cua-computer[ui]"
   ```

3. **Set up your Hugging Face access token**:
   Create a `.env` file in your project directory and add your Hugging Face token:
   ```bash
   echo "HF_TOKEN=your_huggingface_token" > .env
   ```
   You can get your token from your [Hugging Face account settings](https://huggingface.co/settings/tokens).

### Understanding the Gradio UI

The Computer-Use Interface Gradio UI provides three main components:

1. **Recording Panel**: Captures your screen, mouse, and keyboard activity during demonstrations
2. **Review Panel**: Allows you to review, tag, and organize your demonstration recordings
3. **Upload Panel**: Lets you share your demonstrations with the community via Hugging Face

The UI is designed to make the entire process seamless, from recording to sharing, without requiring deep technical knowledge of the underlying systems.

## Creating Your First Trajectory Dataset

### Launching the UI

To get started, create a simple Python script to launch the Gradio UI:

```python
# launch_trajectory_ui.py
from computer.ui.gradio.app import create_gradio_ui
from dotenv import load_dotenv

# Load your Hugging Face token from .env
load_dotenv('.env')

# Create and launch the UI
app = create_gradio_ui()
app.launch(share=False)
```

Run this script to start the UI:

```bash
python launch_trajectory_ui.py
```

### Recording a Demonstration

Let's walk through the process of recording your first demonstration:

1. **Start the VM**: Click the "Initialize Computer" button in the UI to initialize a fresh macOS sandbox. This ensures your demonstrations are clean and reproducible.
2. **Perform a Task**: Complete a simple task like creating a document, organizing files, or searching for information. Natural, everyday tasks make the best demonstrations.
3. **Review Recording**: Click the "Conversation Logs" or "Function Logs" tabs to review your captured interactions, making sure there is no personal information that you wouldn't want to share.
4. **Add Metadata**: In the "Save/Share Demonstrations" tab, give your recording a descriptive name (e.g., "Creating a Calendar Event") and add relevant tags (e.g., "productivity", "time-management").
5. **Save Your Demonstration**: Click "Save" to store your recording locally.

<video src="https://github.com/user-attachments/assets/de3c3477-62fe-413c-998d-4063e48de176" controls width="600"></video>

### Key Tips for Quality Demonstrations

To create the most valuable demonstrations:

- **Start and end at logical points**: Begin with a clear starting state and end when the task is visibly complete
- **Narrate your thought process**: Use the message input to describe what you're trying to do and why
- **Move at a natural pace**: Don't rush or perform actions artificially slowly
- **Include error recovery**: If you make a mistake, keep going and show how to correct it
- **Demonstrate variations**: Record multiple ways to complete the same task

## Organizing and Tagging Demonstrations

Effective tagging and organization make your demonstrations more valuable to researchers and model developers. Consider these tagging strategies:

### Task-Based Tags

Describe what the demonstration accomplishes:
- `web-browsing`
- `document-editing`
- `file-management`
- `email`
- `scheduling`

### Application Tags

Identify the applications used:
- `finder`
- `safari`
- `notes`
- `terminal`
- `calendar`

### Complexity Tags

Indicate the difficulty level:
- `beginner`
- `intermediate`
- `advanced`
- `multi-application`

### UI Element Tags

Highlight specific UI interactions:
- `drag-and-drop`
- `menu-navigation`
- `form-filling`
- `search`

The Computer-Use Interface UI allows you to apply and manage these tags across all your saved demonstrations, making it easy to create cohesive, well-organized datasets.

<video src="https://github.com/user-attachments/assets/5ad1df37-026a-457f-8b49-922ae805faef" controls width="600"></video>

## Uploading to Hugging Face

Sharing your demonstrations helps advance research in computer-use AI. The Gradio UI makes uploading to Hugging Face simple:

### Preparing for Upload

1. **Review Your Demonstrations**: Use the review panel to ensure all demonstrations are complete and correctly tagged.

2. **Select Demonstrations to Upload**: You can upload all demonstrations or filter by specific tags.

3. **Configure Dataset Information**:
   - **Repository Name**: Format as `{your_username}/{dataset_name}`, e.g., `johndoe/productivity-tasks`
   - **Visibility**: Choose `public` to contribute to the community or `private` for personal use
   - **License**: Standard licenses like CC-BY or MIT are recommended for public datasets

### The Upload Process

1. **Click "Upload to Hugging Face"**: This initiates the upload preparation.

2. **Review Dataset Summary**: Confirm the number of demonstrations and total size.

3. **Confirm Upload**: The UI will show progress as files are transferred.

4. **Receive Confirmation**: Once complete, you'll see a link to your new dataset on Hugging Face.

<video src="https://github.com/user-attachments/assets/c586d460-3877-4b5f-a736-3248886d2134" controls width="600"></video>

Your uploaded dataset will have a standardized format with the following structure:

```json
{
  "timestamp": "2025-05-01T09:20:40.594878",
  "session_id": "1fe9f0fe-9331-4078-aacd-ec7ffb483b86",
  "name": "penguin lemon forest",
  "tool_calls": [...],  // Detailed interaction records
  "messages": [...],    // User/assistant messages
  "tags": ["highquality", "tasks"],
  "images": [...]       // Screenshots of each state
}
```

This structured format makes it easy for researchers to analyze patterns across different demonstrations and build better computer-use models.

```python
from computer import Computer

computer = Computer(os_type="macos", display="1024x768", memory="8GB", cpu="4")
try:
    await computer.run()
    
    screenshot = await computer.interface.screenshot()
    with open("screenshot.png", "wb") as f:
        f.write(screenshot)
    
    await computer.interface.move_cursor(100, 100)
    await computer.interface.left_click()
    await computer.interface.right_click(300, 300)
    await computer.interface.double_click(400, 400)

    await computer.interface.type("Hello, World!")
    await computer.interface.press_key("enter")

    await computer.interface.set_clipboard("Test clipboard")
    content = await computer.interface.copy_to_clipboard()
    print(f"Clipboard content: {content}")
finally:
    await computer.stop()
```

## Example: Shopping List Demonstration

Let's walk through a concrete example of creating a valuable demonstration:

### Task: Adding Shopping List Items to a Doordash Cart

1. **Start Recording**: Begin with a clean desktop and a text file containing a shopping list.

2. **Task Execution**: Open the file, read the list, open Safari, navigate to Doordash, and add each item to the cart.

3. **Narration**: Add messages like "Reading the shopping list" and "Searching for rice on Doordash" to provide context.

4. **Completion**: Verify all items are in the cart and end the recording.

5. **Tagging**: Add tags like `shopping`, `web-browsing`, `task-completion`, and `multi-step`.

This type of demonstration is particularly valuable because it showcases real-world task completion requiring multiple applications and context switching.

### Exploring Community Datasets

You can also learn from existing trajectory datasets contributed by the community:

1. Visit [Hugging Face Datasets tagged with 'cua'](https://huggingface.co/datasets?other=cua)
2. Explore different approaches to similar tasks
3. Download and analyze high-quality demonstrations

## Conclusion

### Summary

In this guide, we've covered how to:
- Set up the Computer-Use Interface with Gradio UI
- Record high-quality human demonstrations
- Organize and tag your trajectories
- Share your datasets with the community

By contributing your own demonstrations, you're helping to build more capable, human-like AI systems that can understand and execute complex computer tasks.

### Next Steps

Now that you know how to create and share trajectories, consider these advanced techniques:

- Create themed collections around specific productivity workflows
- Collaborate with others to build comprehensive datasets
- Use your datasets to fine-tune your own computer-use models

### Resources

- [Computer-Use Interface GitHub](https://github.com/trycua/cua/tree/main/libs/computer)
- [Hugging Face Datasets Documentation](https://huggingface.co/docs/datasets)
- [Example Dataset: ddupont/test-dataset](https://huggingface.co/datasets/ddupont/test-dataset)

```

--------------------------------------------------------------------------------
/libs/python/pylume/pylume/pylume.py:
--------------------------------------------------------------------------------

```python
import os
import sys
import json
import time
import asyncio
import subprocess
from typing import Optional, List, Union, Callable, TypeVar, Any
from functools import wraps
import re
import signal

from .server import LumeServer
from .client import LumeClient
from .models import (
    VMConfig,
    VMStatus,
    VMRunOpts,
    VMUpdateOpts,
    ImageRef,
    CloneSpec,
    SharedDirectory,
    ImageList,
)
from .exceptions import (
    LumeError,
    LumeServerError,
    LumeConnectionError,
    LumeTimeoutError,
    LumeNotFoundError,
    LumeConfigError,
    LumeVMError,
    LumeImageError,
)

# Generic return-type variable shared by the decorator below.
T = TypeVar("T")


def ensure_server(func: Callable[..., T]) -> Callable[..., T]:
    """Decorator that guarantees the lume server is up before a method runs.

    Awaits the server's ``ensure_running`` coroutine, lazily initializes the
    HTTP client on the instance, and only then delegates to the wrapped
    coroutine method.
    """

    @wraps(func)
    async def _guarded(self: "PyLume", *args: Any, **kwargs: Any) -> T:
        # Bring the backing server up (or verify it is already running).
        await self.server.ensure_running()
        # Lazily create the HTTP client once the base URL is known.
        await self._init_client()
        return await func(self, *args, **kwargs)  # type: ignore

    return _guarded  # type: ignore


class PyLume:
    """High-level async client for the lume virtualization server.

    Combines a `LumeServer` (process lifecycle / connection management) and a
    `LumeClient` (HTTP transport), exposing VM lifecycle and image operations.
    Designed for use as an async context manager::

        async with PyLume() as pylume:
            vms = await pylume.list_vms()
    """

    def __init__(
        self,
        debug: bool = False,
        server_start_timeout: int = 60,
        port: Optional[int] = None,
        use_existing_server: bool = False,
        host: str = "localhost",
    ):
        """Initialize the async PyLume client.

        Args:
            debug: Enable debug logging
            server_start_timeout: Timeout in seconds to wait for server to start
            port: Port number for the lume server. Required when use_existing_server is True.
            use_existing_server: If True, will try to connect to an existing server on the specified port
                               instead of starting a new one.
            host: Host to use for connections (e.g., "localhost", "127.0.0.1", "host.docker.internal")

        Raises:
            LumeConfigError: If use_existing_server is True but no port is given.
        """
        if use_existing_server and port is None:
            raise LumeConfigError("Port must be specified when using an existing server")

        self.server = LumeServer(
            debug=debug,
            server_start_timeout=server_start_timeout,
            port=port,
            use_existing_server=use_existing_server,
            host=host,
        )
        # Created lazily by _init_client() once the server's base URL is known.
        self.client: Optional[LumeClient] = None

    async def __aenter__(self) -> "PyLume":
        """Async context manager entry: ensure the server is reachable and the client is ready."""
        if self.server.use_existing_server:
            # Just ensure base_url is set for existing server
            if self.server.requested_port is None:
                raise LumeConfigError("Port must be specified when using an existing server")

            if not self.server.base_url:
                self.server.port = self.server.requested_port
                self.server.base_url = f"http://{self.server.host}:{self.server.port}/lume"

        # Ensure the server is running (will connect to existing or start new as needed)
        await self.server.ensure_running()

        # Initialize the client
        await self._init_client()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit: close the client, then stop the server."""
        if self.client is not None:
            await self.client.close()
        await self.server.stop()

    async def _init_client(self) -> None:
        """Initialize the client if not already initialized.

        Raises:
            RuntimeError: If the server's base URL has not been set yet.
        """
        if self.client is None:
            if self.server.base_url is None:
                raise RuntimeError("Server base URL not set")
            self.client = LumeClient(self.server.base_url, debug=self.server.debug)

    def _log_debug(self, message: str, **kwargs) -> None:
        """Log debug information if debug mode is enabled.

        Extra keyword arguments are pretty-printed as JSON below the message.
        """
        if self.server.debug:
            print(f"DEBUG: {message}")
            if kwargs:
                print(json.dumps(kwargs, indent=2))

    async def _handle_api_error(self, e: Exception, operation: str) -> None:
        """Handle API errors and raise appropriate custom exceptions.

        Maps transport-level failures (subprocess/timeout) and HTTP-style
        status codes (read from an optional ``status`` attribute on the
        exception) onto the library's Lume* exception hierarchy.

        Args:
            e: The original exception raised by the client or server layer.
            operation: Human-readable name of the operation, used in messages.

        Raises:
            LumeConnectionError, LumeTimeoutError, LumeNotFoundError,
            LumeConfigError, LumeServerError: Always raises one of these.
        """
        if isinstance(e, subprocess.SubprocessError):
            raise LumeConnectionError(f"Failed to connect to PyLume server: {str(e)}")
        elif isinstance(e, asyncio.TimeoutError):
            raise LumeTimeoutError(f"Request timed out: {str(e)}")

        if not hasattr(e, "status") and not isinstance(e, subprocess.CalledProcessError):
            raise LumeServerError(f"Unknown error during {operation}: {str(e)}")

        # Exceptions without a status attribute default to a 500-style error.
        status_code = getattr(e, "status", 500)
        response_text = str(e)

        self._log_debug(
            f"{operation} request failed", status_code=status_code, response_text=response_text
        )

        if status_code == 404:
            raise LumeNotFoundError(f"Resource not found during {operation}")
        elif status_code == 400:
            raise LumeConfigError(f"Invalid configuration for {operation}: {response_text}")
        elif status_code >= 500:
            raise LumeServerError(
                f"Server error during {operation}",
                status_code=status_code,
                response_text=response_text,
            )
        else:
            raise LumeServerError(
                f"Error during {operation}", status_code=status_code, response_text=response_text
            )

    async def _read_output(self) -> None:
        """Read and log server output.

        Polls the server process's stdout/stderr, logging each line, until the
        process exits, a "Server started" line is seen on stdout, or an
        "error" line appears on stderr.

        NOTE(review): this helper is not called by any other method in this
        class — presumably invoked by external code; confirm before removing.
        It also mixes ``strip()`` results with ``decode("utf-8")``, which
        assumes the process pipes yield bytes — TODO confirm how
        ``server_process`` is created.

        Raises:
            RuntimeError: If a line containing "error" is read from stderr.
        """
        try:
            while True:
                if not self.server.server_process or self.server.server_process.poll() is not None:
                    self._log_debug("Server process ended")
                    break

                # Read stdout without blocking
                if self.server.server_process.stdout:
                    while True:
                        line = self.server.server_process.stdout.readline()
                        if not line:
                            break
                        line = line.strip()
                        self._log_debug(f"Server stdout: {line}")
                        if "Server started" in line.decode("utf-8"):
                            self._log_debug("Detected server started message")
                            return

                # Read stderr without blocking
                if self.server.server_process.stderr:
                    while True:
                        line = self.server.server_process.stderr.readline()
                        if not line:
                            break
                        line = line.strip()
                        self._log_debug(f"Server stderr: {line}")
                        if "error" in line.decode("utf-8").lower():
                            raise RuntimeError(f"Server error: {line}")

                await asyncio.sleep(0.1)  # Small delay to prevent CPU spinning
        except Exception as e:
            self._log_debug(f"Error in output reader: {str(e)}")
            raise

    @ensure_server
    async def create_vm(self, spec: Union[VMConfig, dict]) -> None:
        """Create a VM with the given configuration.

        Args:
            spec: VM configuration, either a VMConfig model or an equivalent dict.
        """
        # Ensure client is initialized
        await self._init_client()

        if isinstance(spec, VMConfig):
            spec = spec.model_dump(by_alias=True, exclude_none=True)

        # Suppress optional attribute access errors
        self.client.print_curl("POST", "/vms", spec)  # type: ignore[attr-defined]
        await self.client.post("/vms", spec)  # type: ignore[attr-defined]

    @ensure_server
    async def run_vm(self, name: str, opts: Optional[Union[VMRunOpts, dict]] = None) -> None:
        """Run a VM.

        Args:
            name: Name of the VM to start.
            opts: Optional run options; defaults to running with a display.
        """
        if opts is None:
            opts = VMRunOpts(no_display=False)  # type: ignore[attr-defined]
        elif isinstance(opts, dict):
            opts = VMRunOpts(**opts)

        payload = opts.model_dump(by_alias=True, exclude_none=True)
        self.client.print_curl("POST", f"/vms/{name}/run", payload)  # type: ignore[attr-defined]
        await self.client.post(f"/vms/{name}/run", payload)  # type: ignore[attr-defined]

    @ensure_server
    async def list_vms(self) -> List[VMStatus]:
        """List all VMs known to the server."""
        data = await self.client.get("/vms")  # type: ignore[attr-defined]
        return [VMStatus.model_validate(vm) for vm in data]

    @ensure_server
    async def get_vm(self, name: str) -> VMStatus:
        """Get details for a single VM by name."""
        data = await self.client.get(f"/vms/{name}")  # type: ignore[attr-defined]
        return VMStatus.model_validate(data)

    @ensure_server
    async def update_vm(self, name: str, params: Union[VMUpdateOpts, dict]) -> None:
        """Update VM settings.

        Args:
            name: Name of the VM to update.
            params: Settings to change, as a VMUpdateOpts model or equivalent dict.
        """
        if isinstance(params, dict):
            params = VMUpdateOpts(**params)

        payload = params.model_dump(by_alias=True, exclude_none=True)
        self.client.print_curl("PATCH", f"/vms/{name}", payload)  # type: ignore[attr-defined]
        await self.client.patch(f"/vms/{name}", payload)  # type: ignore[attr-defined]

    @ensure_server
    async def stop_vm(self, name: str) -> None:
        """Stop a running VM by name."""
        await self.client.post(f"/vms/{name}/stop")  # type: ignore[attr-defined]

    @ensure_server
    async def delete_vm(self, name: str) -> None:
        """Delete a VM by name."""
        await self.client.delete(f"/vms/{name}")  # type: ignore[attr-defined]

    @ensure_server
    async def pull_image(
        self, spec: Union[ImageRef, dict, str], name: Optional[str] = None
    ) -> None:
        """Pull a VM image.

        Args:
            spec: Image reference as an ImageRef model, a dict, or a plain
                string ("image" or "image:tag"; the tag defaults to "latest").
                String and dict forms default registry/organization to
                "ghcr.io"/"trycua".
            name: Optional name for the resulting VM.
        """
        await self._init_client()
        if isinstance(spec, str):
            # Bare image names get an implicit ":latest" tag.
            if ":" in spec:
                image_str = spec
            else:
                image_str = f"{spec}:latest"
            registry = "ghcr.io"
            organization = "trycua"
        elif isinstance(spec, dict):
            image = spec.get("image", "")
            tag = spec.get("tag", "latest")
            image_str = f"{image}:{tag}"
            registry = spec.get("registry", "ghcr.io")
            organization = spec.get("organization", "trycua")
        else:
            image_str = f"{spec.image}:{spec.tag}"
            registry = spec.registry
            organization = spec.organization

        payload = {
            "image": image_str,
            "name": name,
            "registry": registry,
            "organization": organization,
        }

        # Image pulls can be slow, so this request uses a longer timeout.
        self.client.print_curl("POST", "/pull", payload)  # type: ignore[attr-defined]
        await self.client.post("/pull", payload, timeout=300.0)  # type: ignore[attr-defined]

    @ensure_server
    async def clone_vm(self, name: str, new_name: str) -> None:
        """Clone a VM with the given name to a new VM with new_name."""
        config = CloneSpec(name=name, newName=new_name)
        self.client.print_curl("POST", "/vms/clone", config.model_dump())  # type: ignore[attr-defined]
        await self.client.post("/vms/clone", config.model_dump())  # type: ignore[attr-defined]

    @ensure_server
    async def get_latest_ipsw_url(self) -> str:
        """Get the download URL of the latest IPSW (macOS restore image)."""
        await self._init_client()
        data = await self.client.get("/ipsw")  # type: ignore[attr-defined]
        return data["url"]

    @ensure_server
    async def get_images(self, organization: Optional[str] = None) -> ImageList:
        """Get list of available images, optionally filtered by organization."""
        await self._init_client()
        params = {"organization": organization} if organization else None
        data = await self.client.get("/images", params)  # type: ignore[attr-defined]
        return ImageList(root=data)

    async def close(self) -> None:
        """Close the client and stop the server.

        A short pause between closing the client and stopping the server
        gives in-flight requests a chance to finish.
        """
        if self.client is not None:
            await self.client.close()
            self.client = None
        await asyncio.sleep(1)
        await self.server.stop()

    async def _ensure_client(self) -> None:
        """Ensure client is initialized (thin alias for _init_client)."""
        if self.client is None:
            await self._init_client()

```

--------------------------------------------------------------------------------
/libs/python/mcp-server/mcp_server/session_manager.py:
--------------------------------------------------------------------------------

```python
"""
Session Manager for MCP Server - Handles concurrent client sessions with proper resource isolation.

This module provides:
- Per-session computer instance management
- Resource pooling and lifecycle management
- Graceful session cleanup
- Concurrent task execution support
"""

import asyncio
import logging
import time
import uuid
from typing import Dict, Optional, Any, List, Set
from dataclasses import dataclass, field
from contextlib import asynccontextmanager
import weakref

logger = logging.getLogger("mcp-server.session_manager")

@dataclass
class SessionInfo:
    """Information about an active session (bookkeeping record for SessionManager)."""
    # Unique identifier for the client session.
    session_id: str
    computer: Any  # Computer instance bound to this session (project type, left untyped here)
    # Unix timestamp (time.time()) when the session was created.
    created_at: float
    # Unix timestamp of the most recent activity, used for idle cleanup.
    last_activity: float
    # IDs of tasks currently executing within this session.
    active_tasks: Set[str] = field(default_factory=set)
    # Set while the session is being torn down so new work is rejected.
    is_shutting_down: bool = False

class ComputerPool:
    """Pool of computer instances for efficient resource management.

    Instances are handed out by `acquire()` and returned by `release()`.
    At most `max_size` instances may be checked out at once; additional
    callers poll until one is released.
    """

    def __init__(self, max_size: int = 5, idle_timeout: float = 300.0):
        """Create a pool.

        Args:
            max_size: Maximum number of instances that may be in use at once.
            idle_timeout: Intended idle expiry in seconds (not enforced yet;
                see `cleanup_idle`).
        """
        self.max_size = max_size
        self.idle_timeout = idle_timeout
        self._available: List[Any] = []  # instances ready for reuse
        self._in_use: Set[Any] = set()  # instances currently checked out
        self._creation_lock = asyncio.Lock()  # serializes new-instance creation

    async def acquire(self) -> Any:
        """Acquire a computer instance, reusing, creating, or waiting as needed.

        Returns:
            A computer instance, marked as in-use until `release()` is called.
        """
        # Fast path: reuse an instance that was previously released.
        if self._available:
            computer = self._available.pop()
            self._in_use.add(computer)
            logger.debug("Reusing computer instance from pool")
            return computer

        # Create a new instance if we are under the cap. The lock ensures
        # only one coroutine performs the (slow) creation/startup at a time.
        async with self._creation_lock:
            if len(self._in_use) < self.max_size:
                logger.debug("Creating new computer instance")
                from computer import Computer

                computer = Computer(verbosity=logging.INFO)
                await computer.run()
                self._in_use.add(computer)
                return computer

        # Pool is at capacity: poll until another coroutine releases one.
        # (In single-threaded asyncio the check-then-pop below is race-free
        # because there is no await between the loop exit and the pop.)
        logger.debug("Waiting for computer instance to become available")
        while not self._available:
            await asyncio.sleep(0.1)

        computer = self._available.pop()
        self._in_use.add(computer)
        return computer

    async def release(self, computer: Any) -> None:
        """Return a checked-out instance to the pool.

        Instances not tracked as in-use are silently ignored.
        """
        if computer in self._in_use:
            self._in_use.remove(computer)
            self._available.append(computer)
            logger.debug("Released computer instance back to pool")

    async def cleanup_idle(self) -> None:
        """Clean up idle computer instances.

        Currently a no-op: per-instance last-use timestamps are not tracked
        yet, so available instances remain in the pool indefinitely.
        """
        # TODO: record a last-use time per instance on release() and close
        # those that have been idle longer than `idle_timeout`.
        return

    @staticmethod
    async def _close_instance(computer: Any) -> None:
        """Best-effort shutdown of one instance via close() or, failing that, stop()."""
        try:
            if hasattr(computer, "close"):
                await computer.close()
            elif hasattr(computer, "stop"):
                await computer.stop()
        except Exception as e:
            # Deliberately swallow: shutdown should proceed past bad instances.
            logger.warning(f"Error closing computer instance: {e}")

    async def shutdown(self) -> None:
        """Shutdown all computer instances in the pool, both available and in-use."""
        logger.info("Shutting down computer pool")

        for computer in self._available:
            await self._close_instance(computer)
        for computer in self._in_use:
            await self._close_instance(computer)

        self._available.clear()
        self._in_use.clear()

class SessionManager:
    """Manages concurrent client sessions with proper resource isolation.

    Each session owns one computer instance checked out of a shared
    ComputerPool. A background task reclaims sessions that have been idle
    longer than a timeout. Access to the session table is serialized
    through a single asyncio.Lock.

    NOTE: asyncio.Lock is NOT reentrant. Helper methods suffixed "_locked"
    assume the caller already holds self._session_lock; everything else
    acquires it itself.
    """

    def __init__(self, max_concurrent_sessions: int = 10):
        # Hard cap on simultaneously live sessions.
        self.max_concurrent_sessions = max_concurrent_sessions
        # session_id -> SessionInfo for every live session.
        self._sessions: Dict[str, SessionInfo] = {}
        self._computer_pool = ComputerPool()
        # Guards self._sessions (see class docstring on reentrancy).
        self._session_lock = asyncio.Lock()
        self._cleanup_task: Optional[asyncio.Task] = None
        self._shutdown_event = asyncio.Event()

    async def start(self) -> None:
        """Start the session manager and its background cleanup task."""
        logger.info("Starting session manager")
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def stop(self) -> None:
        """Stop the session manager and cleanup all resources."""
        logger.info("Stopping session manager")
        self._shutdown_event.set()

        if self._cleanup_task:
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass

        # Snapshot ids under the lock, then clean up WITHOUT holding it so
        # _force_cleanup_session can take the lock itself.
        async with self._session_lock:
            session_ids = list(self._sessions.keys())

        for session_id in session_ids:
            await self._force_cleanup_session(session_id)

        await self._computer_pool.shutdown()

    @asynccontextmanager
    async def get_session(self, session_id: Optional[str] = None) -> Any:
        """Get or create a session with proper resource management.

        Yields the SessionInfo for the (possibly new) session and bumps its
        last_activity timestamp on exit.

        Raises:
            RuntimeError: if the session is shutting down or the concurrent
                session limit has been reached.
        """
        if session_id is None:
            session_id = str(uuid.uuid4())

        # Fast path: reuse an existing session; fail fast on capacity.
        async with self._session_lock:
            session = self._sessions.get(session_id)
            if session is not None:
                if session.is_shutting_down:
                    raise RuntimeError(f"Session {session_id} is shutting down")
                session.last_activity = time.time()
            elif len(self._sessions) >= self.max_concurrent_sessions:
                raise RuntimeError(f"Maximum concurrent sessions ({self.max_concurrent_sessions}) reached")

        if session is None:
            # BUGFIX: acquire the pool instance OUTSIDE the session lock.
            # Pool acquisition can block until another session releases an
            # instance, and the release path (_force_cleanup_session) needs
            # the session lock — awaiting acquire() while holding the lock
            # could therefore deadlock once the pool was exhausted.
            computer = await self._computer_pool.acquire()
            try:
                async with self._session_lock:
                    existing = self._sessions.get(session_id)
                    if existing is not None:
                        # Lost a creation race; keep the existing session.
                        if existing.is_shutting_down:
                            raise RuntimeError(f"Session {session_id} is shutting down")
                        existing.last_activity = time.time()
                        session = existing
                    elif len(self._sessions) >= self.max_concurrent_sessions:
                        # Capacity re-check: another creator may have filled
                        # the table while we were waiting on the pool.
                        raise RuntimeError(f"Maximum concurrent sessions ({self.max_concurrent_sessions}) reached")
                    else:
                        now = time.time()
                        session = SessionInfo(
                            session_id=session_id,
                            computer=computer,
                            created_at=now,
                            last_activity=now,
                        )
                        self._sessions[session_id] = session
                        computer = None  # ownership transferred to the session
                        logger.info(f"Created new session: {session_id}")
            finally:
                if computer is not None:
                    # Creation raced or failed: return the unused instance.
                    await self._computer_pool.release(computer)

        try:
            yield session
        finally:
            # Record activity so the idle reaper does not reclaim a session
            # that was just used.
            async with self._session_lock:
                if session_id in self._sessions:
                    self._sessions[session_id].last_activity = time.time()

    async def register_task(self, session_id: str, task_id: str) -> None:
        """Register a task for a session."""
        async with self._session_lock:
            if session_id in self._sessions:
                self._sessions[session_id].active_tasks.add(task_id)
                logger.debug(f"Registered task {task_id} for session {session_id}")

    async def unregister_task(self, session_id: str, task_id: str) -> None:
        """Unregister a task from a session."""
        async with self._session_lock:
            if session_id in self._sessions:
                self._sessions[session_id].active_tasks.discard(task_id)
                logger.debug(f"Unregistered task {task_id} from session {session_id}")

    async def cleanup_session(self, session_id: str) -> None:
        """Cleanup a specific session (deferred while tasks are active)."""
        async with self._session_lock:
            if session_id not in self._sessions:
                return

            session = self._sessions[session_id]

            # Sessions with active tasks are only marked; the actual cleanup
            # happens later (idle reaper or explicit force cleanup).
            if session.active_tasks:
                logger.info(f"Session {session_id} has active tasks, marking for shutdown")
                session.is_shutting_down = True
                return

            # BUGFIX: the original awaited _force_cleanup_session() here
            # while already holding the non-reentrant session lock, which
            # deadlocked. Delegate to the _locked helper instead.
            await self._cleanup_session_locked(session_id)

    async def _force_cleanup_session(self, session_id: str) -> None:
        """Force cleanup a session regardless of active tasks."""
        async with self._session_lock:
            await self._cleanup_session_locked(session_id)

    async def _cleanup_session_locked(self, session_id: str) -> None:
        """Remove a session and return its computer to the pool.

        Caller MUST hold self._session_lock.
        """
        session = self._sessions.pop(session_id, None)
        if session is None:
            return
        logger.info(f"Cleaning up session: {session_id}")
        await self._computer_pool.release(session.computer)

    async def _cleanup_loop(self) -> None:
        """Background task that reclaims idle sessions once a minute."""
        while not self._shutdown_event.is_set():
            try:
                await asyncio.sleep(60)  # Run cleanup every minute

                current_time = time.time()
                idle_timeout = 600.0  # 10 minutes

                async with self._session_lock:
                    idle_sessions = [
                        session_id
                        for session_id, session in self._sessions.items()
                        if not session.is_shutting_down
                        and not session.active_tasks
                        and current_time - session.last_activity > idle_timeout
                    ]

                # Cleanup outside the lock; _force_cleanup_session locks.
                for session_id in idle_sessions:
                    await self._force_cleanup_session(session_id)
                    logger.info(f"Cleaned up idle session: {session_id}")

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in cleanup loop: {e}")

    def get_session_stats(self) -> Dict[str, Any]:
        """Get statistics about active sessions.

        BUGFIX: the original scheduled a coroutine on the running loop and
        then blocked on .result() from that same loop's thread, which
        deadlocks. This method is synchronous and the session table is only
        mutated from the event loop, so a plain read here cannot interleave
        with a coroutine on that loop. (Not safe to call from OTHER threads
        while the loop is mutating the table.)
        """
        return {
            "total_sessions": len(self._sessions),
            "max_concurrent": self.max_concurrent_sessions,
            "sessions": {
                session_id: {
                    "created_at": session.created_at,
                    "last_activity": session.last_activity,
                    "active_tasks": len(session.active_tasks),
                    "is_shutting_down": session.is_shutting_down,
                }
                for session_id, session in self._sessions.items()
            },
        }

# Global session manager instance
_session_manager: Optional[SessionManager] = None

def get_session_manager() -> SessionManager:
    """Return the process-wide SessionManager, creating it on first use."""
    global _session_manager
    if _session_manager is not None:
        return _session_manager
    _session_manager = SessionManager()
    return _session_manager

async def initialize_session_manager() -> "SessionManager":
    """Initialize and start the global session manager.

    Creates the manager on first call and starts its background cleanup
    task; subsequent calls return the existing instance without starting
    it again.

    BUGFIX: the original was annotated ``-> None`` while actually returning
    the manager instance; the annotation now matches the behavior (as a
    string forward reference, since callers already rely on the return
    value).

    Returns:
        The global SessionManager instance.
    """
    global _session_manager
    if _session_manager is None:
        _session_manager = SessionManager()
        await _session_manager.start()
    return _session_manager

async def shutdown_session_manager() -> None:
    """Stop the global session manager and clear the module-level reference."""
    global _session_manager
    if _session_manager is None:
        return
    await _session_manager.stop()
    _session_manager = None

```

--------------------------------------------------------------------------------
/.github/workflows/pypi-reusable-publish.yml:
--------------------------------------------------------------------------------

```yaml
name: Reusable Package Publish Workflow

on:
  workflow_call:
    inputs:
      package_name:
        description: "Name of the package (e.g. pylume, computer, agent)"
        required: true
        type: string
      package_dir:
        description: "Directory containing the package relative to workspace root (e.g. libs/python/pylume)"
        required: true
        type: string
      version:
        description: "Version to publish"
        required: true
        type: string
      is_lume_package:
        description: "Whether this package includes the lume binary"
        required: false
        type: boolean
        default: false
      base_package_name:
        description: "PyPI package name (e.g. pylume, cua-agent)"
        required: true
        type: string
      make_latest:
        description: "Whether to mark this release as latest (should only be true for lume)"
        required: false
        type: boolean
        default: false
    secrets:
      PYPI_TOKEN:
        required: true
    outputs:
      version:
        description: "The version that was published"
        value: ${{ jobs.build-and-publish.outputs.version }}

jobs:
  build-and-publish:
    runs-on: macos-latest
    permissions:
      contents: write # This permission is needed for creating releases
    outputs:
      version: ${{ steps.set-version.outputs.version }}
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0 # Full history for release creation

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.11"

      - name: Create root pdm.lock file
        run: |
          # Create an empty pdm.lock file in the root
          touch pdm.lock

      - name: Install PDM
        uses: pdm-project/setup-pdm@v3
        with:
          python-version: "3.11"
          cache: true

      - name: Set version
        id: set-version
        run: |
          echo "VERSION=${{ inputs.version }}" >> $GITHUB_ENV
          echo "version=${{ inputs.version }}" >> $GITHUB_OUTPUT

      - name: Verify version consistency
        run: |
          # Install toml parser
          pip install toml

          # Verify version matches using script (exits with error if mismatch)
          python ${GITHUB_WORKSPACE}/.github/scripts/get_pyproject_version.py \
            ${{ inputs.package_dir }}/pyproject.toml \
            ${{ inputs.version }}

      - name: Initialize PDM in package directory
        run: |
          # Make sure we're working with a properly initialized PDM project
          cd ${{ inputs.package_dir }}

          # Create pdm.lock if it doesn't exist
          if [ ! -f "pdm.lock" ]; then
            echo "No pdm.lock found, initializing PDM project..."
            pdm lock
          fi

      # Conditional step for lume binary download (only for pylume package)
      - name: Download and setup lume binary
        if: inputs.is_lume_package
        run: |
          # Create a temporary directory for extraction
          mkdir -p temp_lume

          # Download the latest lume release directly
          echo "Downloading latest lume version..."
          curl -sL "https://github.com/trycua/lume/releases/latest/download/lume.tar.gz" -o temp_lume/lume.tar.gz

          # Extract the tar file (ignore ownership and suppress warnings)
          cd temp_lume && tar --no-same-owner -xzf lume.tar.gz

          # Make the binary executable
          chmod +x lume

          # Copy the lume binary to the correct location in the pylume package
          mkdir -p "${GITHUB_WORKSPACE}/${{ inputs.package_dir }}/pylume"
          cp lume "${GITHUB_WORKSPACE}/${{ inputs.package_dir }}/pylume/lume"

          # Verify the binary exists and is executable
          test -x "${GITHUB_WORKSPACE}/${{ inputs.package_dir }}/pylume/lume" || { echo "lume binary not found or not executable"; exit 1; }

          # Get the version from the downloaded binary for reference
          LUME_VERSION=$(./lume --version | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' || echo "unknown")
          echo "Using lume version: $LUME_VERSION"

          # Cleanup
          cd "${GITHUB_WORKSPACE}" && rm -rf temp_lume

          # Save the lume version for reference
          echo "LUME_VERSION=${LUME_VERSION}" >> $GITHUB_ENV

      - name: Build and publish
        env:
          PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }}
        run: |
          cd ${{ inputs.package_dir }}
          # Build with PDM
          pdm build

          # For pylume package, verify the binary is in the wheel
          if [ "${{ inputs.is_lume_package }}" = "true" ]; then
            python -m pip install wheel
            wheel unpack dist/*.whl --dest temp_wheel
            echo "Listing contents of wheel directory:"
            find temp_wheel -type f
            test -f temp_wheel/pylume-*/pylume/lume || { echo "lume binary not found in wheel"; exit 1; }
            rm -rf temp_wheel
            echo "Publishing ${{ inputs.base_package_name }} ${VERSION} with lume ${LUME_VERSION}"
          else
            echo "Publishing ${{ inputs.base_package_name }} ${VERSION}"
          fi

          # Install and use twine directly instead of PDM publish
          echo "Installing twine for direct publishing..."
          pip install twine

          echo "Publishing to PyPI using twine..."
          TWINE_USERNAME="__token__" TWINE_PASSWORD="$PYPI_TOKEN" python -m twine upload dist/*

          # Save the wheel file path for the release
          WHEEL_FILE=$(ls dist/*.whl | head -1)
          echo "WHEEL_FILE=${WHEEL_FILE}" >> $GITHUB_ENV

      - name: Prepare Simple Release Notes
        if: startsWith(github.ref, 'refs/tags/')
        run: |
          # Create release notes based on package type
          echo "# ${{ inputs.base_package_name }} v${VERSION}" > release_notes.md
          echo "" >> release_notes.md

          if [ "${{ inputs.package_name }}" = "pylume" ]; then
            echo "## Python SDK for lume - run macOS and Linux VMs on Apple Silicon" >> release_notes.md
            echo "" >> release_notes.md
            echo "This package provides Python bindings for the lume virtualization tool." >> release_notes.md
            echo "" >> release_notes.md
            echo "## Dependencies" >> release_notes.md
            echo "* lume binary: v${LUME_VERSION}" >> release_notes.md
          elif [ "${{ inputs.package_name }}" = "computer" ]; then
            echo "## Computer control library for the Computer Universal Automation (CUA) project" >> release_notes.md
            echo "" >> release_notes.md
            echo "## Dependencies" >> release_notes.md
            echo "* pylume: ${PYLUME_VERSION:-latest}" >> release_notes.md
          elif [ "${{ inputs.package_name }}" = "agent" ]; then
            echo "## Dependencies" >> release_notes.md
            echo "* cua-computer: ${COMPUTER_VERSION:-latest}" >> release_notes.md
            echo "* cua-som: ${SOM_VERSION:-latest}" >> release_notes.md
            echo "" >> release_notes.md
            echo "## Installation Options" >> release_notes.md
            echo "" >> release_notes.md
            echo "### Basic installation with Anthropic" >> release_notes.md
            echo '```bash' >> release_notes.md
            echo "pip install cua-agent[anthropic]==${VERSION}" >> release_notes.md
            echo '```' >> release_notes.md
            echo "" >> release_notes.md
            echo "### With SOM (recommended)" >> release_notes.md
            echo '```bash' >> release_notes.md
            echo "pip install cua-agent[som]==${VERSION}" >> release_notes.md
            echo '```' >> release_notes.md
            echo "" >> release_notes.md
            echo "### All features" >> release_notes.md
            echo '```bash' >> release_notes.md
            echo "pip install cua-agent[all]==${VERSION}" >> release_notes.md
            echo '```' >> release_notes.md
          elif [ "${{ inputs.package_name }}" = "som" ]; then
            echo "## Computer Vision and OCR library for detecting and analyzing UI elements" >> release_notes.md
            echo "" >> release_notes.md
            echo "This package provides enhanced UI understanding capabilities through computer vision and OCR." >> release_notes.md
          elif [ "${{ inputs.package_name }}" = "computer-server" ]; then
            echo "## Computer Server for the Computer Universal Automation (CUA) project" >> release_notes.md
            echo "" >> release_notes.md
            echo "A FastAPI-based server implementation for computer control." >> release_notes.md
            echo "" >> release_notes.md
            echo "## Dependencies" >> release_notes.md
            echo "* cua-computer: ${COMPUTER_VERSION:-latest}" >> release_notes.md
            echo "" >> release_notes.md
            echo "## Usage" >> release_notes.md
            echo '```bash' >> release_notes.md
            echo "# Run the server" >> release_notes.md
            echo "cua-computer-server" >> release_notes.md
            echo '```' >> release_notes.md
          elif [ "${{ inputs.package_name }}" = "mcp-server" ]; then
            echo "## MCP Server for the Computer-Use Agent (CUA)" >> release_notes.md
            echo "" >> release_notes.md
            echo "This package provides MCP (Model Context Protocol) integration for CUA agents, allowing them to be used with Claude Desktop, Cursor, and other MCP clients." >> release_notes.md
            echo "" >> release_notes.md
            echo "## Dependencies" >> release_notes.md
            echo "* cua-computer: ${COMPUTER_VERSION:-latest}" >> release_notes.md
            echo "* cua-agent: ${AGENT_VERSION:-latest}" >> release_notes.md
            echo "" >> release_notes.md
            echo "## Usage" >> release_notes.md
            echo '```bash' >> release_notes.md
            echo "# Run the MCP server directly" >> release_notes.md
            echo "cua-mcp-server" >> release_notes.md
            echo '```' >> release_notes.md
            echo "" >> release_notes.md
            echo "## Claude Desktop Integration" >> release_notes.md
            echo "Add to your Claude Desktop configuration (~/.config/claude-desktop/claude_desktop_config.json or OS-specific location):" >> release_notes.md
            echo '```json' >> release_notes.md
            echo '"mcpServers": {' >> release_notes.md
            echo '  "cua-agent": {' >> release_notes.md
            echo '    "command": "cua-mcp-server",' >> release_notes.md
            echo '    "args": [],' >> release_notes.md
            echo '    "env": {' >> release_notes.md
            echo '      "CUA_AGENT_LOOP": "OMNI",' >> release_notes.md
            echo '      "CUA_MODEL_PROVIDER": "ANTHROPIC",' >> release_notes.md
            echo '      "CUA_MODEL_NAME": "claude-3-opus-20240229",' >> release_notes.md
            echo '      "ANTHROPIC_API_KEY": "your-api-key",' >> release_notes.md
            echo '      "PYTHONIOENCODING": "utf-8"' >> release_notes.md
            echo '    }' >> release_notes.md
            echo '  }' >> release_notes.md
            echo '}' >> release_notes.md
            echo '```' >> release_notes.md
          fi

          # Add installation section if not agent (which has its own installation section)
          if [ "${{ inputs.package_name }}" != "agent" ]; then
            echo "" >> release_notes.md
            echo "## Installation" >> release_notes.md
            echo '```bash' >> release_notes.md
            echo "pip install ${{ inputs.base_package_name }}==${VERSION}" >> release_notes.md
            echo '```' >> release_notes.md
          fi

          echo "Release notes created:"
          cat release_notes.md

      - name: Create GitHub Release
        uses: softprops/action-gh-release@v2
        if: startsWith(github.ref, 'refs/tags/')
        with:
          name: "${{ inputs.base_package_name }} v${{ env.VERSION }}"
          body_path: release_notes.md
          files: ${{ inputs.package_dir }}/${{ env.WHEEL_FILE }}
          draft: false
          prerelease: false
          # Use the caller-provided make_latest input declared in the
          # workflow_call interface instead of hard-coding a package-name
          # comparison; the input's description already documents that only
          # lume should set it to true.
          make_latest: ${{ inputs.make_latest }}
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/composed_grounded.py:
--------------------------------------------------------------------------------

```python
"""
Composed-grounded agent loop implementation that combines grounding and thinking models.
Uses a two-stage approach: grounding model for element detection, thinking model for reasoning.
"""

import uuid
import asyncio
import json
import base64
from typing import Dict, List, Any, Optional, Tuple
from io import BytesIO
from PIL import Image
import litellm

from ..decorators import register_agent
from ..types import Messages, AgentResponse, Tools, AgentCapability
from ..loops.base import AsyncAgentConfig
from ..responses import (
    convert_computer_calls_xy2desc,
    convert_responses_items_to_completion_messages,
    convert_completion_messages_to_responses_items,
    convert_computer_calls_desc2xy,
    get_all_element_descriptions
)
from ..agent import find_agent_config

# OpenAI-style function-calling schema for the grounded "computer" tool.
# Unlike the raw computer tool, targets are given as natural-language
# element descriptions (resolved to coordinates later by the grounding
# model) rather than x/y positions. Only "action" is strictly required;
# the per-action requirements are described in each property's text.
GROUNDED_COMPUTER_TOOL_SCHEMA = {
  "type": "function",
  "function": {
    "name": "computer",
    "description": "Control a computer by taking screenshots and interacting with UI elements. This tool uses element descriptions to locate and interact with UI elements on the screen (e.g., 'red submit button', 'search text field', 'hamburger menu icon', 'close button in top right corner').",
    "parameters": {
        "type": "object",
        "properties": {
        "action": {
            "type": "string",
            "enum": [
            "screenshot",
            "click",
            "double_click",
            "drag",
            "type",
            "keypress",
            "scroll",
            "move",
            "wait",
            "get_current_url",
            "get_dimensions",
            "get_environment"
            ],
            "description": "The action to perform (required for all actions)"
        },
        "element_description": {
            "type": "string",
            "description": "Description of the element to interact with (required for click, double_click, move, scroll actions)"
        },
        "start_element_description": {
            "type": "string",
            "description": "Description of the element to start dragging from (required for drag action)"
        },
        "end_element_description": {
            "type": "string",
            "description": "Description of the element to drag to (required for drag action)"
        },
        "text": {
            "type": "string",
            "description": "The text to type (required for type action)"
        },
        "keys": {
            "type": "array",
            "items": {
                "type": "string"
            },
            "description": "Key(s) to press (required for keypress action)"
        },
        "button": {
            "type": "string",
            "enum": [
                "left",
                "right",
                "wheel",
                "back",
                "forward"
            ],
            "description": "The mouse button to use for click action (required for click and double_click action)",
        },
        "scroll_x": {
            "type": "integer",
            "description": "Horizontal scroll amount for scroll action (required for scroll action)",
        },
        "scroll_y": {
            "type": "integer",
            "description": "Vertical scroll amount for scroll action (required for scroll action)",
        },
        },
        "required": [
            "action"
        ]
    }
  }
}

def _prepare_tools_for_grounded(tool_schemas: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Prepare tools for grounded API format"""
    grounded_tools = []
    
    for schema in tool_schemas:
        if schema["type"] == "computer":
            grounded_tools.append(GROUNDED_COMPUTER_TOOL_SCHEMA)
        else:
            grounded_tools.append(schema)
    
    return grounded_tools

def get_last_computer_call_image(messages: List[Dict[str, Any]]) -> Optional[str]:
    """Return the base64 payload of the most recent screenshot output, if any.

    Scans the message list from newest to oldest for a computer_call_output
    whose output is an input_image carrying a PNG data URL, and returns the
    base64 portion of that URL. Returns None when no such message exists.
    """
    prefix = "data:image/png;base64,"
    for msg in reversed(messages):
        if not isinstance(msg, dict) or msg.get("type") != "computer_call_output":
            continue
        output = msg.get("output")
        if not isinstance(output, dict) or output.get("type") != "input_image":
            continue
        url = output.get("image_url", "")
        if url.startswith(prefix):
            return url[len(prefix):]
    return None


# Matches any model string containing "+", i.e. a composed
# "grounding_model+thinking_model" pair.
@register_agent(r".*\+.*", priority=1)
class ComposedGroundedConfig(AsyncAgentConfig):
    """
    Composed-grounded agent configuration that uses both grounding and thinking models.
    
    The model parameter should be in format: "grounding_model+thinking_model"
    e.g., "huggingface-local/HelloKKMe/GTA1-7B+gemini/gemini-1.5-pro"
    """
    
    def __init__(self) -> None:
        # Cache mapping element descriptions to (x, y) coordinates resolved
        # by the grounding model; persists across predict_step calls so
        # previously grounded elements are not re-resolved.
        self.desc2xy: Dict[str, Tuple[float, float]] = {}
    
    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Composed-grounded predict step implementation.
        
        Process:
        0. Store last computer call image, if none then take a screenshot
        1. Convert computer calls from xy to descriptions
        2. Convert responses items to completion messages
        3. Call thinking model with litellm.acompletion
        4. Convert completion messages to responses items
        5. Get all element descriptions and populate desc2xy mapping
        6. Convert computer calls from descriptions back to xy coordinates
        7. Return output and usage

        Args:
            messages: Conversation history as responses-style items.
            model: Composed model string "grounding_model+thinking_model".
            tools: Tool schemas; any "computer" tool is swapped for the
                grounded (description-based) schema.
            computer_handler: Object used to take screenshots when the
                history has none. Assumes it exposes an async screenshot()
                returning a base64 string — TODO confirm against callers.
            _on_api_start/_on_api_end/_on_usage/_on_screenshot: Optional
                async hooks invoked around the API call and on new data.

        Returns:
            Dict with "output" (responses items, xy-grounded) and "usage".

        Raises:
            ValueError: if `model` does not contain a "+" separator.
        """
        # Parse the composed model
        if "+" not in model:
            raise ValueError(f"Composed model must be in format 'grounding_model+thinking_model', got: {model}")
        grounding_model, thinking_model = model.split("+", 1)
        
        # Items synthesized before the thinking-model call (e.g. an initial
        # screenshot triple) — prepended to the returned output.
        pre_output_items = []
        
        # Step 0: Store last computer call image, if none then take a screenshot
        last_image_b64 = get_last_computer_call_image(messages)
        if last_image_b64 is None:
            # Take a screenshot
            # NOTE(review): assumes computer_handler is not None here; a
            # None handler would raise AttributeError — confirm callers
            # always pass one when the history lacks a screenshot.
            screenshot_b64 = await computer_handler.screenshot() # type: ignore
            if screenshot_b64:
                
                # Synthesize an assistant message + computer_call +
                # computer_call_output triple so the thinking model sees a
                # coherent screenshot exchange.
                call_id = uuid.uuid4().hex
                pre_output_items += [
                    {
                        "type": "message",
                        "role": "assistant",
                        "content": [
                            {
                                "type": "output_text",
                                "text": "Taking a screenshot to see the current computer screen."
                            }
                        ]
                    },
                    {
                        "action": {
                            "type": "screenshot"
                        },
                        "call_id": call_id,
                        "status": "completed",
                        "type": "computer_call"
                    },
                    {
                        "type": "computer_call_output",
                        "call_id": call_id,
                        "output": {
                            "type": "input_image",
                            "image_url": f"data:image/png;base64,{screenshot_b64}"
                        }
                    },
                ]
                last_image_b64 = screenshot_b64
                
                # Call screenshot callback if provided
                if _on_screenshot:
                    await _on_screenshot(screenshot_b64)
        
        tool_schemas = _prepare_tools_for_grounded(tools) # type: ignore

        # Step 1: Convert computer calls from xy to descriptions
        input_messages = messages + pre_output_items
        messages_with_descriptions = convert_computer_calls_xy2desc(input_messages, self.desc2xy)
        
        # Step 2: Convert responses items to completion messages
        completion_messages = convert_responses_items_to_completion_messages(
            messages_with_descriptions, 
            allow_images_in_tool_results=False
        )
        
        # Step 3: Call thinking model with litellm.acompletion
        api_kwargs = {
            "model": thinking_model,
            "messages": completion_messages,
            "tools": tool_schemas,
            "max_retries": max_retries,
            "stream": stream,
            **kwargs
        }

        if use_prompt_caching:
            api_kwargs["use_prompt_caching"] = use_prompt_caching
        
        # Call API start hook
        if _on_api_start:
            await _on_api_start(api_kwargs)
        
        # Make the completion call
        response = await litellm.acompletion(**api_kwargs)
        
        # Call API end hook
        if _on_api_end:
            await _on_api_end(api_kwargs, response)
        
        # Extract usage information
        usage = {
            **response.usage.model_dump(), # type: ignore
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(usage)
        
        # Step 4: Convert completion messages back to responses items format
        response_dict = response.model_dump() # type: ignore
        choice_messages = [choice["message"] for choice in response_dict["choices"]]
        thinking_output_items = []
        
        for choice_message in choice_messages:
            thinking_output_items.extend(convert_completion_messages_to_responses_items([choice_message]))
        
        # Step 5: Get all element descriptions and populate desc2xy mapping
        element_descriptions = get_all_element_descriptions(thinking_output_items)
        
        if element_descriptions and last_image_b64:
            # Use grounding model to predict coordinates for each description
            grounding_agent_conf = find_agent_config(grounding_model)
            if grounding_agent_conf:
                grounding_agent = grounding_agent_conf.agent_class()
                
                for desc in element_descriptions:
                    # Retry grounding up to 3 times; keep the first non-None
                    # coordinates. Descriptions that never resolve are left
                    # out of desc2xy (desc2xy handling downstream decides
                    # what happens to unresolved calls).
                    for _ in range(3): # try 3 times
                        coords = await grounding_agent.predict_click(
                            model=grounding_model,
                            image_b64=last_image_b64,
                            instruction=desc
                        )
                        if coords:
                            self.desc2xy[desc] = coords
                            break
        
        # Step 6: Convert computer calls from descriptions back to xy coordinates
        final_output_items = convert_computer_calls_desc2xy(thinking_output_items, self.desc2xy)
        
        # Step 7: Return output and usage
        return {
            "output": pre_output_items + final_output_items,
            "usage": usage
        }
    
    async def predict_click(
        self,
        model: str,
        image_b64: str,
        instruction: str,
        **kwargs
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates using the grounding model.
        
        For composed models, uses only the grounding model part for click
        prediction; the thinking-model half of the composed string is parsed
        but ignored.

        Args:
            model: Composed "grounding_model+thinking_model" string.
            image_b64: Base64-encoded screenshot to ground against.
            instruction: Natural-language description of the click target.

        Returns:
            (x, y) coordinates, or None when no grounding agent config is
            registered for the grounding model.

        Raises:
            ValueError: if `model` does not contain a "+" separator.
        """
        # Parse the composed model to get grounding model
        if "+" not in model:
            raise ValueError(f"Composed model must be in format 'grounding_model+thinking_model', got: {model}")
        grounding_model, thinking_model = model.split("+", 1)
        
        # Find and use the grounding agent
        grounding_agent_conf = find_agent_config(grounding_model)
        if grounding_agent_conf:
            grounding_agent = grounding_agent_conf.agent_class()
            return await grounding_agent.predict_click(
                model=grounding_model,
                image_b64=image_b64,
                instruction=instruction,
                **kwargs
            )
        
        return None
    
    def get_capabilities(self) -> List[AgentCapability]:
        """Return the capabilities supported by this agent."""
        return ["click", "step"]

```

--------------------------------------------------------------------------------
/.github/scripts/tests/test_get_pyproject_version.py:
--------------------------------------------------------------------------------

```python
"""
Comprehensive tests for get_pyproject_version.py script using unittest.

This test suite covers:
- Version matching validation
- Error handling for missing versions
- Invalid input handling
- File not found scenarios
- Malformed TOML handling
"""

import sys
import unittest
import tempfile
from pathlib import Path
from io import StringIO
from unittest.mock import patch

# Add parent directory to path to import the module
sys.path.insert(0, str(Path(__file__).parent.parent))

# Import after path is modified
import get_pyproject_version


class TestGetPyprojectVersion(unittest.TestCase):
    """Test suite for get_pyproject_version.py functionality.

    Every test installs a fake sys.argv, invokes get_pyproject_version.main(),
    and checks the resulting SystemExit code plus the captured stdout/stderr.
    Shared fixture/invocation logic lives in the private helpers below.
    """

    # Expected usage string shown on argument errors.
    USAGE = "Usage: python get_pyproject_version.py <pyproject_path> <expected_version>"

    def setUp(self):
        """Snapshot sys.argv so each test can overwrite it safely."""
        self.original_argv = sys.argv.copy()

    def tearDown(self):
        """Restore sys.argv after each test."""
        sys.argv = self.original_argv

    # ------------------------------------------------------------------
    # Fixture helpers
    # ------------------------------------------------------------------

    def _write_toml(self, content: str) -> Path:
        """Write ``content`` to a temporary .toml file and return its path.

        delete=False keeps the file on disk after the handle is closed;
        callers are responsible for unlinking it in a ``finally`` block.
        """
        temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False)
        temp_file.write(content)
        temp_file.close()
        return Path(temp_file.name)

    def create_pyproject_toml(self, version: str) -> Path:
        """Create a pyproject.toml whose [project] table has the given version."""
        return self._write_toml(f"""
[project]
name = "test-project"
version = "{version}"
description = "A test project"
""")

    def create_pyproject_toml_no_version(self) -> Path:
        """Create a pyproject.toml whose [project] table has no version field."""
        return self._write_toml("""
[project]
name = "test-project"
description = "A test project without version"
""")

    def create_pyproject_toml_no_project(self) -> Path:
        """Create a pyproject.toml without a [project] section at all."""
        return self._write_toml("""
[tool.poetry]
name = "test-project"
version = "1.0.0"
""")

    def create_malformed_toml(self) -> Path:
        """Create a syntactically invalid TOML file."""
        return self._write_toml("""
[project
name = "test-project
version = "1.0.0"
""")

    # ------------------------------------------------------------------
    # Invocation helpers
    # ------------------------------------------------------------------

    def _run_main(self, argv, capture: str):
        """Run main() under a fake argv, capturing one output stream.

        Args:
            argv: Full argv list (program name first) to install as sys.argv.
            capture: Which stream to capture -- 'stdout' or 'stderr'.

        Returns:
            Tuple of (exit_code, captured_text).
        """
        sys.argv = argv
        captured = StringIO()
        with patch(f'sys.{capture}', captured):
            with self.assertRaises(SystemExit) as cm:
                get_pyproject_version.main()
        return cm.exception.code, captured.getvalue()

    def _assert_usage_error(self, argv):
        """Assert that running main() with ``argv`` exits 1 and prints usage."""
        code, err = self._run_main(argv, 'stderr')
        self.assertEqual(code, 1)
        self.assertIn(self.USAGE, err)

    def _test_version_format(self, version: str):
        """Assert that a given version string round-trips successfully."""
        pyproject_file = self.create_pyproject_toml(version)
        try:
            code, out = self._run_main(
                ['get_pyproject_version.py', str(pyproject_file), version], 'stdout')
            self.assertEqual(code, 0)
            self.assertIn(f"✅ Version consistency check passed: {version}", out)
        finally:
            pyproject_file.unlink()

    # ------------------------------------------------------------------
    # Success and failure paths
    # ------------------------------------------------------------------

    def test_matching_versions(self):
        """Test that matching versions result in success."""
        pyproject_file = self.create_pyproject_toml("1.2.3")
        try:
            code, out = self._run_main(
                ['get_pyproject_version.py', str(pyproject_file), '1.2.3'], 'stdout')
            self.assertEqual(code, 0)
            self.assertIn("✅ Version consistency check passed: 1.2.3", out)
        finally:
            pyproject_file.unlink()

    def test_version_mismatch(self):
        """Test that mismatched versions result in failure with appropriate error message."""
        pyproject_file = self.create_pyproject_toml("1.2.3")
        try:
            code, err = self._run_main(
                ['get_pyproject_version.py', str(pyproject_file), '1.2.4'], 'stderr')
            self.assertEqual(code, 1)
            self.assertIn("❌ Version mismatch detected!", err)
            self.assertIn("pyproject.toml version: 1.2.3", err)
            self.assertIn("Expected version: 1.2.4", err)
            self.assertIn("Please update pyproject.toml to version 1.2.4", err)
        finally:
            pyproject_file.unlink()

    def test_missing_version_field(self):
        """Test handling of pyproject.toml without a version field."""
        pyproject_file = self.create_pyproject_toml_no_version()
        try:
            code, err = self._run_main(
                ['get_pyproject_version.py', str(pyproject_file), '1.0.0'], 'stderr')
            self.assertEqual(code, 1)
            self.assertIn("❌ ERROR: No version found in pyproject.toml", err)
        finally:
            pyproject_file.unlink()

    def test_missing_project_section(self):
        """Test handling of pyproject.toml without a project section."""
        pyproject_file = self.create_pyproject_toml_no_project()
        try:
            code, err = self._run_main(
                ['get_pyproject_version.py', str(pyproject_file), '1.0.0'], 'stderr')
            self.assertEqual(code, 1)
            self.assertIn("❌ ERROR: No version found in pyproject.toml", err)
        finally:
            pyproject_file.unlink()

    def test_file_not_found(self):
        """Test handling of non-existent pyproject.toml file."""
        sys.argv = ['get_pyproject_version.py', '/nonexistent/pyproject.toml', '1.0.0']
        with self.assertRaises(SystemExit) as cm:
            get_pyproject_version.main()
        self.assertEqual(cm.exception.code, 1)

    def test_malformed_toml(self):
        """Test handling of malformed TOML file."""
        pyproject_file = self.create_malformed_toml()
        try:
            sys.argv = ['get_pyproject_version.py', str(pyproject_file), '1.0.0']
            with self.assertRaises(SystemExit) as cm:
                get_pyproject_version.main()
            self.assertEqual(cm.exception.code, 1)
        finally:
            pyproject_file.unlink()

    # ------------------------------------------------------------------
    # Argument validation
    # ------------------------------------------------------------------

    def test_too_few_arguments(self):
        """Test that providing too few arguments results in usage error."""
        self._assert_usage_error(['get_pyproject_version.py', 'pyproject.toml'])

    def test_too_many_arguments(self):
        """Test that providing too many arguments results in usage error."""
        self._assert_usage_error(
            ['get_pyproject_version.py', 'pyproject.toml', '1.0.0', 'extra'])

    def test_no_arguments(self):
        """Test that providing no arguments results in usage error."""
        self._assert_usage_error(['get_pyproject_version.py'])

    # ------------------------------------------------------------------
    # Semantic-version variants
    # ------------------------------------------------------------------

    def test_version_with_prerelease_tags(self):
        """Test matching versions with pre-release tags like alpha, beta, rc."""
        self._test_version_format("1.2.3-rc.1")

    def test_version_with_build_metadata(self):
        """Test matching versions with build metadata."""
        self._test_version_format("1.2.3+build.123")

    def test_semantic_version_0_0_1(self):
        """Test semantic version 0.0.1."""
        self._test_version_format("0.0.1")

    def test_semantic_version_1_0_0(self):
        """Test semantic version 1.0.0."""
        self._test_version_format("1.0.0")

    def test_semantic_version_10_20_30(self):
        """Test semantic version 10.20.30."""
        self._test_version_format("10.20.30")

    def test_semantic_version_alpha(self):
        """Test semantic version with alpha tag."""
        self._test_version_format("1.2.3-alpha")

    def test_semantic_version_beta(self):
        """Test semantic version with beta tag."""
        self._test_version_format("1.2.3-beta.1")

    def test_semantic_version_rc_with_build(self):
        """Test semantic version with rc and build metadata."""
        self._test_version_format("1.2.3-rc.1+build.456")

    def test_empty_version_string(self):
        """Test handling of empty version string."""
        pyproject_file = self.create_pyproject_toml("")
        try:
            code, err = self._run_main(
                ['get_pyproject_version.py', str(pyproject_file), '1.0.0'], 'stderr')
            self.assertEqual(code, 1)
            # Empty string is falsy, so it should trigger error
            self.assertIn("❌", err)
        finally:
            pyproject_file.unlink()


class TestSuiteInfo(unittest.TestCase):
    """Test suite metadata."""

    def test_suite_info(self):
        """Display test suite information."""
        banner = "=" * 70
        info = [
            "",
            banner,
            "Test Suite: get_pyproject_version.py",
            "Framework: unittest (Python built-in)",
            "TOML Library: tomllib (Python 3.11+ built-in)",
            banner,
        ]
        print("\n".join(info))
        self.assertTrue(True)


if __name__ == '__main__':
    # Execute the full suite with verbose per-test output when run directly.
    unittest.main(verbosity=2)

```

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/watchdog.py:
--------------------------------------------------------------------------------

```python
"""
Watchdog module for monitoring the Computer API server health.
Unix/Linux only - provides process management and restart capabilities.
"""

import asyncio
import fcntl
import json
import logging
import os
import platform
import subprocess
import sys
import time
import websockets
from typing import Optional

logger = logging.getLogger(__name__)


def instance_already_running(label="watchdog"):
    """
    Detect whether an instance with the given label is already running,
    globally at the operating system level.

    Using `os.open` ensures that the file descriptor won't be closed
    by Python's garbage collector after the function's scope is exited.

    The lock will be released when the program exits, or could be
    released if the file descriptor were closed.
    """
    lock_fd = os.open(f"/tmp/instance_{label}.lock", os.O_WRONLY | os.O_CREAT)

    # A non-blocking exclusive lock succeeds only when no other process
    # holds a lock on the same file.
    try:
        fcntl.lockf(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError:
        return True
    return False


class Watchdog:
    """Watchdog class to monitor server health via WebSocket connection.
    Unix/Linux only - provides restart capabilities.
    """
    
    def __init__(self, cli_args: Optional[dict] = None, ping_interval: int = 30):
        """
        Initialize the watchdog.
        
        Args:
            cli_args: Dictionary of CLI arguments to replicate when restarting.
                      'host' and 'port' (defaults: 'localhost', 8000) also
                      determine the local WebSocket endpoint to ping.
            ping_interval: Interval between ping checks in seconds
        
        Raises:
            RuntimeError: If run on a non-Unix platform.
        """
        # Check if running on Unix/Linux
        if platform.system() not in ['Linux', 'Darwin']:
            raise RuntimeError("Watchdog is only supported on Unix/Linux systems")
        
        # Store CLI arguments for restart
        self.cli_args = cli_args or {}
        self.host = self.cli_args.get('host', 'localhost')
        self.port = self.cli_args.get('port', 8000)
        self.ping_interval = ping_interval
        # When set, the server is assumed to run in a cloud container and is
        # reachable through the container's public hostname instead.
        self.container_name = os.environ.get("CONTAINER_NAME")
        self.running = False
        self.restart_enabled = True
    
    @property
    def ws_uri(self) -> str:
        """Get the WebSocket URI of the endpoint being monitored.
        
        Container mode uses the fixed cloud hostname and port 8443 over wss;
        local mode uses the configured host/port over ws.  (Previously the
        local case hard-coded localhost:8000, ignoring the CLI --host/--port.)
        
        Returns:
            WebSocket URI for the Computer API Server
        """
        ip_address = self.host if not self.container_name else f"{self.container_name}.containers.cloud.trycua.com"
        protocol = "wss" if self.container_name else "ws"
        port = "8443" if self.container_name else str(self.port)
        return f"{protocol}://{ip_address}:{port}/ws"
        
    async def ping(self) -> bool:
        """
        Test connection to the WebSocket endpoint.
        
        Sends a lightweight 'get_screen_size' command and waits up to 5
        seconds for any response.
        
        Returns:
            True if connection successful, False otherwise
        """
        try:
            # Create a simple ping message
            ping_message = {
                "command": "get_screen_size",
                "params": {}
            }
            
            # Try to connect to the WebSocket
            async with websockets.connect(
                self.ws_uri,
                max_size=1024 * 1024 * 10  # 10MB limit to match server
            ) as websocket:
                # Send ping message
                await websocket.send(json.dumps(ping_message))
                
                # Wait for any response or just close
                try:
                    response = await asyncio.wait_for(websocket.recv(), timeout=5)
                    logger.debug(f"Ping response received: {response[:100]}...")
                    return True
                except asyncio.TimeoutError:
                    return False
        except Exception as e:
            logger.warning(f"Ping failed: {e}")
            return False
    
    def kill_processes_on_port(self, port: int) -> bool:
        """
        Kill any processes using the specified port.
        
        Uses `lsof -ti :<port>` to find PIDs, then SIGKILLs each one.
        
        Args:
            port: Port number to check and kill processes on
            
        Returns:
            True if processes were killed or none found, False on error
        """
        try:
            # Find processes using the port
            result = subprocess.run(
                ["lsof", "-ti", f":{port}"],
                capture_output=True,
                text=True,
                timeout=10
            )
            
            if result.returncode == 0 and result.stdout.strip():
                pids = result.stdout.strip().split('\n')
                logger.info(f"Found {len(pids)} processes using port {port}: {pids}")
                
                # Kill each process; failures to kill one PID do not abort the rest.
                for pid in pids:
                    if pid.strip():
                        try:
                            subprocess.run(["kill", "-9", pid.strip()], timeout=5)
                            logger.info(f"Killed process {pid}")
                        except subprocess.TimeoutExpired:
                            logger.warning(f"Timeout killing process {pid}")
                        except Exception as e:
                            logger.warning(f"Error killing process {pid}: {e}")
                            
                return True
            else:
                # lsof exits non-zero when nothing matches; treat as success.
                logger.debug(f"No processes found using port {port}")
                return True
                
        except subprocess.TimeoutExpired:
            logger.error(f"Timeout finding processes on port {port}")
            return False
        except Exception as e:
            logger.error(f"Error finding processes on port {port}: {e}")
            return False
    
    def restart_server(self) -> bool:
        """
        Attempt to restart the server by killing existing processes and starting new one.
        
        In container mode this only logs a warning (the orchestrator owns
        restarts); locally it respawns `python -m computer_server.cli` with
        the original CLI arguments minus the watchdog-related ones.
        
        Returns:
            True if restart was attempted, False on error
        """
        if not self.restart_enabled:
            logger.info("Server restart is disabled")
            return False
            
        try:
            logger.info("Attempting to restart server...")
            
            # Kill processes on the port
            port_to_kill = 8443 if self.container_name else self.port
            if not self.kill_processes_on_port(port_to_kill):
                logger.error("Failed to kill processes on port, restart aborted")
                return False
            
            # Wait a moment for processes to die
            time.sleep(2)
            
            # Try to restart the server
            # In container mode, we can't easily restart, so just log
            if self.container_name:
                logger.warning("Container mode detected - cannot restart server automatically")
                logger.warning("Container orchestrator should handle restart")
                return False
            else:
                # For local mode, try to restart the CLI
                logger.info("Attempting to restart local server...")
                
                # Get the current Python executable and script
                python_exe = sys.executable
                
                # Try to find the CLI module
                try:
                    # Build command with all original CLI arguments
                    cmd = [python_exe, "-m", "computer_server.cli"]
                    
                    # Add all CLI arguments except watchdog-related ones
                    for key, value in self.cli_args.items():
                        if key in ['watchdog', 'watchdog_interval', 'no_restart']:
                            continue  # Skip watchdog args to avoid recursive watchdog
                        
                        # Convert underscores to hyphens for CLI args
                        arg_name = f"--{key.replace('_', '-')}"
                        
                        if isinstance(value, bool):
                            if value:  # Only add flag if True
                                cmd.append(arg_name)
                        else:
                            cmd.extend([arg_name, str(value)])
                    
                    logger.info(f"Starting server with command: {' '.join(cmd)}")
                    
                    # Start process in background, detached from this process
                    # group so it survives the watchdog exiting.
                    subprocess.Popen(
                        cmd,
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL,
                        start_new_session=True
                    )
                    
                    logger.info("Server restart initiated")
                    return True
                    
                except Exception as e:
                    logger.error(f"Failed to restart server: {e}")
                    return False
                    
        except Exception as e:
            logger.error(f"Error during server restart: {e}")
            return False
    
    async def start_monitoring(self) -> None:
        """Start the watchdog monitoring loop.
        
        Pings the server every `ping_interval` seconds; after 3 consecutive
        failures, attempts a restart (if enabled) and then waits a doubled
        interval before resuming checks.  Runs until stop_monitoring() is
        called or the task is cancelled.
        """
        self.running = True
        logger.info(f"Starting watchdog monitoring for {self.ws_uri}")
        logger.info(f"Ping interval: {self.ping_interval} seconds")
        if self.container_name:
            logger.info(f"Container mode detected: {self.container_name}")
        
        consecutive_failures = 0
        max_failures = 3
        
        while self.running:
            try:
                success = await self.ping()
                
                if success:
                    if consecutive_failures > 0:
                        logger.info("Server connection restored")
                    consecutive_failures = 0
                    logger.debug("Ping successful")
                else:
                    consecutive_failures += 1
                    logger.warning(f"Ping failed ({consecutive_failures}/{max_failures})")
                    
                    if consecutive_failures >= max_failures:
                        logger.error(f"Server appears to be down after {max_failures} consecutive failures")
                        
                        # Attempt to restart the server
                        if self.restart_enabled:
                            logger.info("Attempting automatic server restart...")
                            restart_success = self.restart_server()
                            
                            if restart_success:
                                logger.info("Server restart initiated, waiting before next ping...")
                                # Wait longer after restart attempt
                                await asyncio.sleep(self.ping_interval * 2)
                                consecutive_failures = 0  # Reset counter after restart attempt
                            else:
                                logger.error("Server restart failed")
                        else:
                            logger.warning("Automatic restart is disabled")
                        
                # Wait for next ping interval
                await asyncio.sleep(self.ping_interval)
                
            except asyncio.CancelledError:
                logger.info("Watchdog monitoring cancelled")
                break
            except Exception as e:
                # Keep the loop alive on unexpected errors; retry next interval.
                logger.error(f"Unexpected error in watchdog loop: {e}")
                await asyncio.sleep(self.ping_interval)
    
    def stop_monitoring(self) -> None:
        """Stop the watchdog monitoring loop after its current iteration."""
        self.running = False
        logger.info("Stopping watchdog monitoring")


async def run_watchdog(cli_args: Optional[dict] = None, ping_interval: int = 30) -> None:
    """
    Create a Watchdog and run its monitoring loop until interrupted.
    
    Args:
        cli_args: Dictionary of CLI arguments to replicate when restarting
        ping_interval: Interval between ping checks in seconds
    """
    monitor = Watchdog(cli_args=cli_args, ping_interval=ping_interval)
    try:
        await monitor.start_monitoring()
    except KeyboardInterrupt:
        logger.info("Watchdog stopped by user")
    finally:
        monitor.stop_monitoring()


if __name__ == "__main__":
    # Standalone entry point for testing the watchdog without the full CLI.
    import argparse
    
    arg_parser = argparse.ArgumentParser(description="Run Computer API server watchdog")
    arg_parser.add_argument("--host", default="localhost", help="Server host to monitor")
    arg_parser.add_argument("--port", type=int, default=8000, help="Server port to monitor")
    arg_parser.add_argument("--ping-interval", type=int, default=30, help="Ping interval in seconds")
    parsed = arg_parser.parse_args()
    
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    
    # Forward only host/port; the watchdog fills in its own defaults.
    asyncio.run(run_watchdog({'host': parsed.host, 'port': parsed.port}, parsed.ping_interval))

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/adapters/human_adapter.py:
--------------------------------------------------------------------------------

```python
import os
import asyncio
import requests
from typing import List, Dict, Any, Iterator, AsyncIterator
from litellm.types.utils import GenericStreamingChunk, ModelResponse
from litellm.llms.custom_llm import CustomLLM
from litellm import completion, acompletion


class HumanAdapter(CustomLLM):
    """Human Adapter for human-in-the-loop completions.

    This adapter sends completion requests to a human completion server
    where humans can review and respond to AI requests.  A request is
    queued over HTTP, then polled until a human marks it completed (or
    failed), or the configured timeout elapses.
    """

    def __init__(self, base_url: str | None = None, timeout: float = 300.0, **kwargs):
        """Initialize the human adapter.

        Args:
            base_url: Base URL for the human completion server.
                     Defaults to HUMAN_BASE_URL environment variable or http://localhost:8002
            timeout: Timeout in seconds for waiting for human response
            **kwargs: Additional arguments (ignored)
        """
        super().__init__()
        # Strip any trailing slash so f"{base_url}/path" stays well-formed.
        self.base_url = (base_url or os.getenv('HUMAN_BASE_URL', 'http://localhost:8002')).rstrip('/')
        self.timeout = timeout

    def _queue_completion(self, messages: List[Dict[str, Any]], model: str) -> str:
        """Queue a completion request and return the call ID.

        Args:
            messages: Messages in OpenAI format
            model: Model name

        Returns:
            Call ID for tracking the request

        Raises:
            Exception: If queueing fails
        """
        try:
            response = requests.post(
                f"{self.base_url}/queue",
                json={"messages": messages, "model": model},
                timeout=10
            )
            response.raise_for_status()
            return response.json()["id"]
        except requests.RequestException as e:
            raise Exception(f"Failed to queue completion request: {e}")

    @staticmethod
    def _extract_result(status_data: Dict[str, Any]) -> Dict[str, Any]:
        """Build the result dict from a completed status payload.

        Only truthy "response" / "tool_calls" values are included, matching
        the shape consumed by the response/chunk builders below.
        """
        result: Dict[str, Any] = {}
        if status_data.get("response"):
            result["response"] = status_data["response"]
        if status_data.get("tool_calls"):
            result["tool_calls"] = status_data["tool_calls"]
        return result

    def _wait_for_completion(self, call_id: str) -> Dict[str, Any]:
        """Wait for human to complete the call.

        Args:
            call_id: ID of the queued completion call

        Returns:
            Dict containing response and/or tool_calls

        Raises:
            TimeoutError: If timeout is exceeded
            Exception: If completion fails
        """
        import time

        start_time = time.time()

        while True:
            try:
                # Check status
                status_response = requests.get(f"{self.base_url}/status/{call_id}")
                status_response.raise_for_status()
                status_data = status_response.json()

                if status_data["status"] == "completed":
                    return self._extract_result(status_data)
                elif status_data["status"] == "failed":
                    error_msg = status_data.get("error", "Unknown error")
                    raise Exception(f"Completion failed: {error_msg}")

                # Check timeout
                if time.time() - start_time > self.timeout:
                    raise TimeoutError(f"Timeout waiting for human response after {self.timeout} seconds")

                # Wait before checking again
                time.sleep(1.0)

            except requests.RequestException as e:
                # Transient network errors are retried until the timeout.
                if time.time() - start_time > self.timeout:
                    raise TimeoutError(f"Timeout waiting for human response: {e}")
                time.sleep(1.0)

    async def _async_wait_for_completion(self, call_id: str) -> Dict[str, Any]:
        """Async version of _wait_for_completion.

        Args:
            call_id: ID of the queued completion call

        Returns:
            Dict containing response and/or tool_calls

        Raises:
            TimeoutError: If timeout is exceeded
            Exception: If completion fails
        """
        import aiohttp
        import time

        start_time = time.time()

        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    # Check status
                    async with session.get(f"{self.base_url}/status/{call_id}") as response:
                        response.raise_for_status()
                        status_data = await response.json()

                    if status_data["status"] == "completed":
                        return self._extract_result(status_data)
                    elif status_data["status"] == "failed":
                        error_msg = status_data.get("error", "Unknown error")
                        raise Exception(f"Completion failed: {error_msg}")

                    # Check timeout
                    if time.time() - start_time > self.timeout:
                        raise TimeoutError(f"Timeout waiting for human response after {self.timeout} seconds")

                    # Wait before checking again
                    await asyncio.sleep(1.0)

                except aiohttp.ClientError as e:
                    # Only network errors are retried, mirroring the sync
                    # version's requests.RequestException handling.  A bare
                    # ``except Exception`` here previously swallowed the
                    # "failed" exception and kept polling until the timeout.
                    if time.time() - start_time > self.timeout:
                        raise TimeoutError(f"Timeout waiting for human response: {e}")
                    await asyncio.sleep(1.0)

    def _generate_response(self, messages: List[Dict[str, Any]], model: str) -> Dict[str, Any]:
        """Generate a human response for the given messages.

        Args:
            messages: Messages in OpenAI format
            model: Model name

        Returns:
            Dict containing response and/or tool_calls
        """
        call_id = self._queue_completion(messages, model)
        return self._wait_for_completion(call_id)

    async def _async_generate_response(self, messages: List[Dict[str, Any]], model: str) -> Dict[str, Any]:
        """Async version of _generate_response.

        Args:
            messages: Messages in OpenAI format
            model: Model name

        Returns:
            Dict containing response and/or tool_calls
        """
        # Queueing is a quick sync HTTP call; only the wait is async.
        call_id = self._queue_completion(messages, model)
        return await self._async_wait_for_completion(call_id)

    @staticmethod
    def _build_model_response(human_response_data: Dict[str, Any], model: str) -> ModelResponse:
        """Build a litellm ModelResponse from a human response payload.

        Used by both completion() and acompletion() so the response shape
        stays identical between the sync and async paths.
        """
        from litellm.types.utils import Choices, Message
        import uuid
        import time

        tool_calls = human_response_data.get("tool_calls")
        if tool_calls:
            # Tool-call responses: report finish_reason="tool_calls" for
            # consistency with the streaming path and OpenAI semantics.
            message = Message(
                role="assistant",
                content=human_response_data.get("response", ""),
                tool_calls=tool_calls
            )
            finish_reason = "tool_calls"
        else:
            # Plain text response.
            message = Message(
                role="assistant",
                content=human_response_data.get("response", "")
            )
            finish_reason = "stop"

        choice = Choices(
            finish_reason=finish_reason,
            index=0,
            message=message
        )

        return ModelResponse(
            id=f"human-{uuid.uuid4()}",
            choices=[choice],
            created=int(time.time()),
            model=f"human/{model}",
            object="chat.completion"
        )

    @staticmethod
    def _build_streaming_chunk(human_response_data: Dict[str, Any]) -> GenericStreamingChunk:
        """Build the single streaming chunk for a human response payload.

        Used by both streaming() and astreaming(); the whole human answer is
        emitted as one finished chunk.
        """
        response_text = human_response_data.get("response", "")
        tool_calls = human_response_data.get("tool_calls")

        if tool_calls:
            return {
                "finish_reason": "tool_calls",
                "index": 0,
                "is_finished": True,
                "text": response_text,
                "tool_use": tool_calls,
                "usage": {"completion_tokens": 1, "prompt_tokens": 0, "total_tokens": 1},
            }

        # Rough token count: whitespace-separated words.
        token_count = len(response_text.split())
        return {
            "finish_reason": "stop",
            "index": 0,
            "is_finished": True,
            "text": response_text,
            "tool_use": None,
            "usage": {"completion_tokens": token_count, "prompt_tokens": 0, "total_tokens": token_count},
        }

    def completion(self, *args, **kwargs) -> ModelResponse:
        """Synchronous completion method.

        Returns:
            ModelResponse with human-generated text or tool calls
        """
        messages = kwargs.get('messages', [])
        model = kwargs.get('model', 'human')

        human_response_data = self._generate_response(messages, model)
        return self._build_model_response(human_response_data, model)

    async def acompletion(self, *args, **kwargs) -> ModelResponse:
        """Asynchronous completion method.

        Returns:
            ModelResponse with human-generated text or tool calls
        """
        messages = kwargs.get('messages', [])
        model = kwargs.get('model', 'human')

        human_response_data = await self._async_generate_response(messages, model)
        return self._build_model_response(human_response_data, model)

    def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]:
        """Synchronous streaming method.

        Yields:
            A single finished chunk with human-generated text or tool calls
        """
        messages = kwargs.get('messages', [])
        model = kwargs.get('model', 'human')

        human_response_data = self._generate_response(messages, model)
        yield self._build_streaming_chunk(human_response_data)

    async def astreaming(self, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]:
        """Asynchronous streaming method.

        Yields:
            A single finished chunk with human-generated text or tool calls
        """
        messages = kwargs.get('messages', [])
        model = kwargs.get('model', 'human')

        # _async_generate_response returns a dict; the previous implementation
        # treated it as a string (calling .split() on it) and ignored tool
        # calls, which raised AttributeError at runtime.
        human_response_data = await self._async_generate_response(messages, model)
        yield self._build_streaming_chunk(human_response_data)
```

--------------------------------------------------------------------------------
/blog/bringing-computer-use-to-the-web.md:
--------------------------------------------------------------------------------

```markdown
# Bringing Computer-Use to the Web

*Published on August 5, 2025 by Morgan Dean*

In one of our original posts, we explored building Computer-Use Operators on macOS - first with a [manual implementation](build-your-own-operator-on-macos-1.md) using OpenAI's `computer-use-preview` model, then with our [cua-agent framework](build-your-own-operator-on-macos-2.md) for Python developers. While these tutorials have been incredibly popular, we've received consistent feedback from our community: **"Can we use C/ua with JavaScript and TypeScript?"**

Today, we're excited to announce the release of the **`@trycua/computer` Web SDK** - a new library that allows you to control your C/ua cloud containers from any JavaScript or TypeScript project. With this library, you can click, type, and grab screenshots from your cloud containers - no extra servers required.

With this new SDK, you can easily develop CUA experiences like the one below, which we will release soon as open source.

<div align="center">
  <video src="https://github.com/user-attachments/assets/e213d6c3-73b6-48dd-a7d9-ed761ed74f89" width="600" controls></video>
</div>

Let’s see how it works.

## What You'll Learn

By the end of this tutorial, you'll be able to:

- Set up the `@trycua/computer` npm library in any JavaScript/TypeScript project
- Connect OpenAI's computer-use model to C/ua cloud containers from web applications
- Build computer-use agents that work in Node.js, React, Vue, or any web framework
- Handle different types of computer actions (clicking, typing, scrolling) from web code
- Implement the complete computer-use loop in JavaScript/TypeScript
- Integrate AI automation into existing web applications and workflows

**Prerequisites:**

- Node.js 16+ and npm/yarn/pnpm
- Basic JavaScript or TypeScript knowledge
- OpenAI API access (Tier 3+ for computer-use-preview)
- C/ua cloud container credits ([get started here](https://trycua.com/pricing))

**Estimated Time:** 45-60 minutes

## Access Requirements

### OpenAI Model Availability

At the time of writing, the **computer-use-preview** model has limited availability:

- Only accessible to OpenAI tier 3+ users
- Additional application process may be required even for eligible users
- Cannot be used in the OpenAI Playground
- Outside of ChatGPT Operator, usage is restricted to the new **Responses API**

Luckily, the `@trycua/computer` library can be used in conjunction with other models, like [Anthropic’s Computer Use](https://docs.anthropic.com/en/docs/agents-and-tools/tool-use/computer-use-tool) or [UI-TARS](https://huggingface.co/ByteDance-Seed/UI-TARS-1.5-7B). You’ll just have to write your own handler to parse the model output for interfacing with the container.

### C/ua Cloud Containers

To follow this guide, you’ll need access to a C/ua cloud container.

Getting access is simple: purchase credits from our [pricing page](https://trycua.com/pricing), then create and provision a new container instance from the [dashboard](https://trycua.com/dashboard/containers). With your container running, you'll be ready to leverage the web SDK and bring automation to your JavaScript or TypeScript applications.

## Understanding the Flow

### OpenAI API Overview

Let's start with the basics. In our case, we'll use OpenAI's API to communicate with their computer-use model.

Think of it like this:

1. We send the model a screenshot of our container and tell it what we want it to do
2. The model looks at the screenshot and decides what actions to take
3. It sends back instructions (like "click here" or "type this")
4. We execute those instructions in our container.

### Model Setup

Here's how we set up the computer-use model for web development:

```javascript
const res = await openai.responses.create({
  model: 'computer-use-preview',
  tools: [
    {
      type: 'computer_use_preview',
      display_width: 1024,
      display_height: 768,
      environment: 'linux', // we're using a linux container
    },
  ],
  input: [
    {
      role: 'user',
      content: [
        // what we want the ai to do
        { type: 'input_text', text: 'Open firefox and go to trycua.com' },
        // first screenshot of the vm
        {
          type: 'input_image',
          image_url: `data:image/png;base64,${screenshotBase64}`,
          detail: 'auto',
        },
      ],
    },
  ],
  truncation: 'auto'
});
```

### Understanding the Response

When we send a request, the API sends back a response that looks like this:

```json
"output": [
    {
        "type": "reasoning",           // The AI explains what it's thinking
        "id": "rs_67cc...",
        "summary": [
            {
                "type": "summary_text",
                "text": "Clicking on the browser address bar."
            }
        ]
    },
    {
        "type": "computer_call",       // The actual action to perform
        "id": "cu_67cc...",
        "call_id": "call_zw3...",      // Used to track previous calls
        "action": {
            "type": "click",           // What kind of action (click, type, etc.)
            "button": "left",          // Which mouse button to use
            "x": 156,                  // Where to click (coordinates)
            "y": 50
        },
        "pending_safety_checks": [],   // Any safety warnings to consider
        "status": "completed"          // Whether the action was successful
    }
]

```

Each response contains:

1. **Reasoning**: The AI's explanation of what it's doing
2. **Action**: The specific computer action to perform
3. **Safety Checks**: Any potential risks to review
4. **Status**: Whether everything worked as planned

## Implementation Guide

### Provision a C/ua Cloud Container

  1. Visit [trycua.com](https://trycua.com), sign up, purchase [credits](https://trycua.com/pricing), and create a new container instance from the [dashboard](https://trycua.com/dashboard).
  2. Create an API key from the dashboard — be sure to save it in a secure location before continuing.
  3. Start the cloud container from the dashboard.

### Environment Setup

  1. Install required packages with your preferred package manager:

      ```bash
      npm install --save @trycua/computer # or yarn, pnpm, bun
      npm install --save openai # or yarn, pnpm, bun
      ```

      Works with any JavaScript/TypeScript project setup - whether you're using Create React App, Next.js, Vue, Angular, or plain JavaScript.

  2. Save your OpenAI API key, C/ua API key, and container name to a `.env` file:

      ```bash
      OPENAI_API_KEY=openai-api-key
      CUA_API_KEY=cua-api-key
      CUA_CONTAINER_NAME=cua-cloud-container-name
      ```

      These environment variables work the same whether you're using vanilla JavaScript, TypeScript, or any web framework.

## Building the Agent

### Mapping API Actions to `@trycua/computer` Interface Methods

This helper function handles a `computer_call` action from the OpenAI API — converting the action into an equivalent action from the `@trycua/computer` interface. These actions will execute on the initialized `Computer` instance. For example, `await computer.interface.leftClick()` sends a mouse left click to the current cursor position.

Whether you're using JavaScript or TypeScript, the interface remains the same:

```typescript
export async function executeAction(
  computer: Computer,
  action: OpenAI.Responses.ResponseComputerToolCall['action']
) {
  switch (action.type) {
    case 'click':
      const { x, y, button } = action;
      console.log(`Executing click at (${x}, ${y}) with button '${button}'.`);
      await computer.interface.moveCursor(x, y);
      if (button === 'right') await computer.interface.rightClick();
      else await computer.interface.leftClick();
      break;
    case 'type':
      const { text } = action;
      console.log(`Typing text: ${text}`);
      await computer.interface.typeText(text);
      break;
    case 'scroll':
      const { x: locX, y: locY, scroll_x, scroll_y } = action;
      console.log(
        `Scrolling at (${locX}, ${locY}) with offsets (scroll_x=${scroll_x}, scroll_y=${scroll_y}).`
      );
      await computer.interface.moveCursor(locX, locY);
      await computer.interface.scroll(scroll_x, scroll_y);
      break;
    case 'keypress':
      const { keys } = action;
      for (const key of keys) {
        console.log(`Pressing key: ${key}.`);
        // Map common key names to CUA equivalents
        if (key.toLowerCase() === 'enter') {
          await computer.interface.pressKey('return');
        } else if (key.toLowerCase() === 'space') {
          await computer.interface.pressKey('space');
        } else {
          await computer.interface.pressKey(key);
        }
      }
      break;
    case 'wait':
      console.log(`Waiting for 3 seconds.`);
      await new Promise((resolve) => setTimeout(resolve, 3 * 1000));
      break;
    case 'screenshot':
      console.log('Taking screenshot.');
      // This is handled automatically in the main loop, but we can take an extra one if requested
      const screenshot = await computer.interface.screenshot();
      return screenshot;
    default:
      console.log(`Unrecognized action: ${action.type}`);
      break;
  }
}
```

### Implementing the Computer-Use Loop

This section defines a loop that:

1. Initializes the `Computer` instance (connecting to a Linux cloud container).
2. Captures a screenshot of the current state.
3. Sends the screenshot (with a user prompt) to the OpenAI Responses API using the `computer-use-preview` model.
4. Processes the returned `computer_call` action and executes it using our helper function.
5. Captures an updated screenshot after the action.
6. Sends the updated screenshot back and loops until no more actions are returned.

```typescript
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

// Initialize the Computer Connection
const computer = new Computer({
  apiKey: process.env.CUA_API_KEY!,
  name: process.env.CUA_CONTAINER_NAME!,
  osType: OSType.LINUX,
});

await computer.run();
// Take the initial screenshot
const screenshot = await computer.interface.screenshot();
const screenshotBase64 = screenshot.toString('base64');

// Setup openai config for computer use
const computerUseConfig: OpenAI.Responses.ResponseCreateParamsNonStreaming = {
  model: 'computer-use-preview',
  tools: [
    {
      type: 'computer_use_preview',
      display_width: 1024,
      display_height: 768,
      environment: 'linux', // we're using a linux vm
    },
  ],
  truncation: 'auto',
};

// Send initial screenshot to the openai computer use model
let res = await openai.responses.create({
  ...computerUseConfig,
  input: [
    {
      role: 'user',
      content: [
        // what we want the ai to do
        { type: 'input_text', text: 'open firefox and go to trycua.com' },
        // current screenshot of the vm
        {
          type: 'input_image',
          image_url: `data:image/png;base64,${screenshotBase64}`,
          detail: 'auto',
        },
      ],
    },
  ],
});

// Loop until there are no more computer use actions.
while (true) {
  const computerCalls = res.output.filter((o) => o.type === 'computer_call');
  if (computerCalls.length < 1) {
    console.log('No more computer calls. Loop complete.');
    break;
  }
  // Get the first call
  const call = computerCalls[0];
  const action = call.action;
  console.log('Received action from OpenAI Responses API:', action);
  let ackChecks: OpenAI.Responses.ResponseComputerToolCall.PendingSafetyCheck[] =
    [];
  if (call.pending_safety_checks.length > 0) {
    console.log('Safety checks pending:', call.pending_safety_checks);
    // In a real implementation, you would want to get user confirmation here.
    ackChecks = call.pending_safety_checks;
  }

  // Execute the action in the container
  await executeAction(computer, action);
  // Wait for changes to process within the container (1sec)
  await new Promise((resolve) => setTimeout(resolve, 1000));

  // Capture new screenshot
  const newScreenshot = await computer.interface.screenshot();
  const newScreenshotBase64 = newScreenshot.toString('base64');

  // Screenshot back as computer_call_output

  res = await openai.responses.create({
    ...computerUseConfig,
    previous_response_id: res.id,
    input: [
      {
        type: 'computer_call_output',
        call_id: call.call_id,
        acknowledged_safety_checks: ackChecks,
        output: {
          type: 'computer_screenshot',
          image_url: `data:image/png;base64,${newScreenshotBase64}`,
        },
      },
    ],
  });
}
```

You can find the full example on [GitHub](https://github.com/trycua/cua/tree/main/examples/computer-example-ts).

## What's Next?

The `@trycua/computer` Web SDK opens up some interesting possibilities. You could build browser-based testing tools, create interactive demos for your products, or automate repetitive workflows directly from your web apps.

We're working on more examples and better documentation - if you build something cool with this SDK, we'd love to see it. Drop by our [Discord](https://discord.gg/cua-ai) and share what you're working on.

Happy automating on the web!

```

--------------------------------------------------------------------------------
/tests/test_mcp_server_session_management.py:
--------------------------------------------------------------------------------

```python
"""
Tests for MCP Server Session Management functionality.

This module tests the new concurrent session management and resource lifecycle features.
"""

import asyncio
import importlib.util
import sys
import types
import time
from pathlib import Path

import pytest


def _install_stub_module(name: str, module: types.ModuleType, registry: dict[str, types.ModuleType | None]) -> None:
    registry[name] = sys.modules.get(name)
    sys.modules[name] = module


@pytest.fixture
def server_module():
    """Create a server module with stubbed dependencies for testing.

    Installs stub modules for ``mcp``, ``computer``, ``agent`` and
    ``mcp_server.session_manager`` into ``sys.modules``, then loads the real
    ``mcp_server/server.py`` against those stubs.  Tool functions registered
    on the stub FastMCP instance are re-exported as attributes of the loaded
    module so tests can call them directly.  Teardown restores the original
    ``sys.modules`` entries.
    """
    # name -> previous sys.modules entry (or None); used to restore on teardown.
    stubbed_modules: dict[str, types.ModuleType | None] = {}

    # Stub MCP Context primitives
    mcp_module = types.ModuleType("mcp")
    mcp_module.__path__ = []  # mark as package

    mcp_server_module = types.ModuleType("mcp.server")
    mcp_server_module.__path__ = []

    fastmcp_module = types.ModuleType("mcp.server.fastmcp")

    class _StubContext:
        # No-op async/sync stand-ins for the MCP context callbacks the server uses.
        async def yield_message(self, *args, **kwargs):
            return None

        async def yield_tool_call(self, *args, **kwargs):
            return None

        async def yield_tool_output(self, *args, **kwargs):
            return None

        def report_progress(self, *_args, **_kwargs):
            return None

        def info(self, *_args, **_kwargs):
            return None

        def error(self, *_args, **_kwargs):
            return None

    class _StubImage:
        # Mimics fastmcp's Image: just records format and raw bytes.
        def __init__(self, format: str, data: bytes):
            self.format = format
            self.data = data

    class _StubFastMCP:
        # Captures functions registered via the @server.tool() decorator so the
        # fixture can expose them on the module after loading.
        def __init__(self, name: str):
            self.name = name
            self._tools: dict[str, types.FunctionType] = {}

        def tool(self, *args, **kwargs):
            # Register the function under its own name; return it unchanged.
            def decorator(func):
                self._tools[func.__name__] = func
                return func

            return decorator

        def run(self):
            return None

    fastmcp_module.Context = _StubContext
    fastmcp_module.FastMCP = _StubFastMCP
    fastmcp_module.Image = _StubImage

    _install_stub_module("mcp", mcp_module, stubbed_modules)
    _install_stub_module("mcp.server", mcp_server_module, stubbed_modules)
    _install_stub_module("mcp.server.fastmcp", fastmcp_module, stubbed_modules)

    # Stub Computer module
    computer_module = types.ModuleType("computer")

    class _StubInterface:
        # Fixed payload lets tests assert on screenshot bytes deterministically.
        async def screenshot(self) -> bytes:
            return b"test-screenshot-data"

    class _StubComputer:
        def __init__(self, *args, **kwargs):
            self.interface = _StubInterface()

        async def run(self):
            return None

    computer_module.Computer = _StubComputer

    _install_stub_module("computer", computer_module, stubbed_modules)

    # Stub agent module
    agent_module = types.ModuleType("agent")

    class _StubComputerAgent:
        def __init__(self, *args, **kwargs):
            pass

        async def run(self, *_args, **_kwargs):
            # Simulate agent execution with streaming
            yield {
                "output": [
                    {
                        "type": "message",
                        "role": "assistant",
                        "content": [{"type": "output_text", "text": "Task completed"}]
                    }
                ]
            }

    agent_module.ComputerAgent = _StubComputerAgent

    _install_stub_module("agent", agent_module, stubbed_modules)

    # Stub session manager module
    session_manager_module = types.ModuleType("mcp_server.session_manager")

    class _StubSessionInfo:
        # Mirrors the fields the server reads from a real session record.
        def __init__(self, session_id: str, computer, created_at: float, last_activity: float):
            self.session_id = session_id
            self.computer = computer
            self.created_at = created_at
            self.last_activity = last_activity
            self.active_tasks = set()
            self.is_shutting_down = False

    class _StubSessionManager:
        def __init__(self):
            self._sessions = {}
            self._session_lock = asyncio.Lock()

        async def get_session(self, session_id=None):
            """Context manager that returns a session."""
            # A missing session_id falls back to a fixed default for tests.
            if session_id is None:
                session_id = "test-session-123"
            
            async with self._session_lock:
                # Lazily create the session on first access.
                if session_id not in self._sessions:
                    computer = _StubComputer()
                    session = _StubSessionInfo(
                        session_id=session_id,
                        computer=computer,
                        created_at=time.time(),
                        last_activity=time.time()
                    )
                    self._sessions[session_id] = session
                
                return self._sessions[session_id]

        async def register_task(self, session_id: str, task_id: str):
            pass

        async def unregister_task(self, session_id: str, task_id: str):
            pass

        async def cleanup_session(self, session_id: str):
            async with self._session_lock:
                self._sessions.pop(session_id, None)

        def get_session_stats(self):
            return {
                "total_sessions": len(self._sessions),
                "max_concurrent": 10,
                "sessions": {sid: {"active_tasks": 0} for sid in self._sessions}
            }

    _stub_session_manager = _StubSessionManager()

    def get_session_manager():
        return _stub_session_manager

    async def initialize_session_manager():
        return _stub_session_manager

    async def shutdown_session_manager():
        pass

    session_manager_module.get_session_manager = get_session_manager
    session_manager_module.initialize_session_manager = initialize_session_manager
    session_manager_module.shutdown_session_manager = shutdown_session_manager

    _install_stub_module("mcp_server.session_manager", session_manager_module, stubbed_modules)

    # Load the actual server module
    module_name = "mcp_server_server_under_test"
    module_path = Path("libs/python/mcp-server/mcp_server/server.py").resolve()
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    server_module = importlib.util.module_from_spec(spec)
    assert spec and spec.loader
    spec.loader.exec_module(server_module)

    # Re-export the tools registered on the stub FastMCP server as module
    # attributes so tests can call e.g. server_module.screenshot_cua directly.
    server_instance = getattr(server_module, "server", None)
    if server_instance is not None and hasattr(server_instance, "_tools"):
        for name, func in server_instance._tools.items():
            setattr(server_module, name, func)

    try:
        yield server_module
    finally:
        # Remove the loaded module and restore any modules we shadowed.
        sys.modules.pop(module_name, None)
        for name, original in stubbed_modules.items():
            if original is None:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = original


class FakeContext:
    """Fake context for testing."""
    
    def __init__(self) -> None:
        self.events: list[tuple] = []
        self.progress_updates: list[float] = []

    def info(self, message: str) -> None:
        self.events.append(("info", message))

    def error(self, message: str) -> None:
        self.events.append(("error", message))

    def report_progress(self, value: float) -> None:
        self.progress_updates.append(value)

    async def yield_message(self, *, role: str, content):
        timestamp = asyncio.get_running_loop().time()
        self.events.append(("message", role, content, timestamp))

    async def yield_tool_call(self, *, name: str | None, call_id: str, input):
        timestamp = asyncio.get_running_loop().time()
        self.events.append(("tool_call", name, call_id, input, timestamp))

    async def yield_tool_output(self, *, call_id: str, output, is_error: bool = False):
        timestamp = asyncio.get_running_loop().time()
        self.events.append(("tool_output", call_id, output, is_error, timestamp))


def test_screenshot_cua_with_session_id(server_module):
    """Test that screenshot_cua works with session management."""

    async def scenario():
        ctx = FakeContext()
        shot = await server_module.screenshot_cua(ctx, session_id="test-session")

        assert shot.format == "png"
        assert shot.data == b"test-screenshot-data"

    asyncio.run(scenario())


def test_screenshot_cua_creates_new_session(server_module):
    """Test that screenshot_cua creates a new session when none provided."""

    async def scenario():
        ctx = FakeContext()
        shot = await server_module.screenshot_cua(ctx)

        assert shot.format == "png"
        assert shot.data == b"test-screenshot-data"

    asyncio.run(scenario())


def test_run_cua_task_with_session_management(server_module):
    """Test that run_cua_task works with session management."""

    async def scenario():
        ctx = FakeContext()
        text, shot = await server_module.run_cua_task(ctx, "Test task", "test-session-456")

        assert "Task completed" in text
        assert shot.format == "png"
        assert shot.data == b"test-screenshot-data"

    asyncio.run(scenario())


def test_run_multi_cua_tasks_sequential(server_module):
    """Test that run_multi_cua_tasks works sequentially.

    Submits three tasks with concurrent=False and checks that each returns a
    success message plus a PNG screenshot.
    """
    async def _run_test():
        ctx = FakeContext()
        tasks = ["Task 1", "Task 2", "Task 3"]

        results = await server_module.run_multi_cua_tasks(ctx, tasks, concurrent=False)

        assert len(results) == 3
        # The index was never used, so iterate the pairs directly.
        for text, image in results:
            assert "Task completed" in text
            assert image.format == "png"

    asyncio.run(_run_test())


def test_run_multi_cua_tasks_concurrent(server_module):
    """Test that run_multi_cua_tasks works concurrently.

    Submits three tasks with concurrent=True and checks that each returns a
    success message plus a PNG screenshot.
    """
    async def _run_test():
        ctx = FakeContext()
        tasks = ["Task 1", "Task 2", "Task 3"]

        results = await server_module.run_multi_cua_tasks(ctx, tasks, concurrent=True)

        assert len(results) == 3
        # The index was never used, so iterate the pairs directly.
        for text, image in results:
            assert "Task completed" in text
            assert image.format == "png"

    asyncio.run(_run_test())


def test_get_session_stats(server_module):
    """Test that get_session_stats returns proper statistics."""
    async def _run_test():
        ctx = FakeContext()
        stats = await server_module.get_session_stats()
        
        assert "total_sessions" in stats
        assert "max_concurrent" in stats
        assert "sessions" in stats

    asyncio.run(_run_test())


def test_cleanup_session(server_module):
    """Test that cleanup_session works properly."""

    async def scenario():
        ctx = FakeContext()
        sid = "test-cleanup-session"

        message = await server_module.cleanup_session(ctx, sid)

        assert f"Session {sid} cleanup initiated" in message

    asyncio.run(scenario())


def test_concurrent_sessions_isolation(server_module):
    """Test that concurrent sessions are properly isolated."""

    async def scenario():
        ctx = FakeContext()

        # Launch two tasks under distinct session IDs and await both together.
        first = asyncio.create_task(
            server_module.run_cua_task(ctx, "Task for session 1", "session-1")
        )
        second = asyncio.create_task(
            server_module.run_cua_task(ctx, "Task for session 2", "session-2")
        )
        outcomes = await asyncio.gather(first, second)

        assert len(outcomes) == 2
        for text, image in outcomes:
            assert "Task completed" in text
            assert image.format == "png"

    asyncio.run(scenario())


def test_session_reuse_with_same_id(server_module):
    """Test that sessions are reused when the same session ID is provided."""

    async def scenario():
        ctx = FakeContext()
        sid = "reuse-session"

        # Two consecutive calls against the same session should yield
        # identical screenshots.
        first = await server_module.screenshot_cua(ctx, sid)
        second = await server_module.screenshot_cua(ctx, sid)

        assert first.format == second.format
        assert first.data == second.data

    asyncio.run(scenario())


def test_error_handling_with_session_management(server_module):
    """Test that errors are handled properly with session management."""

    async def scenario():
        class _FailingAgent:
            """Agent double whose run() always raises."""

            def __init__(self, *args, **kwargs):
                pass

            async def run(self, *_args, **_kwargs):
                raise RuntimeError("Simulated agent failure")

        # Swap in the failing agent; restore the real one no matter what.
        saved_agent = server_module.ComputerAgent
        server_module.ComputerAgent = _FailingAgent
        try:
            ctx = FakeContext()
            text, image = await server_module.run_cua_task(ctx, "This will fail", "error-session")

            assert "Error during task execution" in text
            assert image.format == "png"
        finally:
            server_module.ComputerAgent = saved_agent

    asyncio.run(scenario())

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/gemini.py:
--------------------------------------------------------------------------------

```python
"""
Gemini 2.5 Computer Use agent loop

Maps internal Agent SDK message format to Google's Gemini Computer Use API and back.

Key features:
- Lazy import of google.genai
- Configure Computer Use tool with excluded browser-specific predefined functions
- Optional custom function declarations hook for computer-call specific functions
- Convert Gemini function_call parts into internal computer_call actions
"""

from __future__ import annotations

import base64
import io
import uuid
from typing import Any, Dict, List, Optional, Tuple

from PIL import Image

from ..decorators import register_agent
from ..loops.base import AsyncAgentConfig
from ..types import AgentCapability


def _lazy_import_genai():
    """Import google.genai lazily to avoid hard dependency unless used."""
    try:
        from google import genai  # type: ignore
        from google.genai import types  # type: ignore
        return genai, types
    except Exception as e:  # pragma: no cover
        raise RuntimeError(
            "google.genai is required for the Gemini Computer Use loop. Install the Google Gemini SDK."
        ) from e


def _data_url_to_bytes(data_url: str) -> Tuple[bytes, str]:
    """Convert a data URL to raw bytes and mime type."""
    if not data_url.startswith("data:"):
        # Assume it's base64 png payload
        try:
            return base64.b64decode(data_url), "image/png"
        except Exception:
            return b"", "application/octet-stream"
    header, b64 = data_url.split(",", 1)
    mime = "image/png"
    if ";" in header:
        mime = header.split(";")[0].split(":", 1)[1] or "image/png"
    return base64.b64decode(b64), mime


def _bytes_image_size(img_bytes: bytes) -> Tuple[int, int]:
    """Return (width, height) of an encoded image, or a 1024x768 fallback."""
    try:
        with Image.open(io.BytesIO(img_bytes)) as img:
            return img.size
    except Exception:
        # Undecodable payload: fall back to a conventional screen size.
        return (1024, 768)


def _find_last_user_text(messages: List[Dict[str, Any]]) -> List[str]:
    texts: List[str] = []
    for msg in reversed(messages):
        if msg.get("type") in (None, "message") and msg.get("role") == "user":
            content = msg.get("content")
            if isinstance(content, str):
                return [content]
            elif isinstance(content, list):
                for c in content:
                    if c.get("type") in ("input_text", "output_text") and c.get("text"):
                        texts.append(c["text"])  # newest first
                if texts:
                    return list(reversed(texts))
    return []


def _find_last_screenshot(messages: List[Dict[str, Any]]) -> Optional[bytes]:
    """Return the decoded bytes of the newest screenshot output, if any."""
    for msg in reversed(messages):
        if msg.get("type") != "computer_call_output":
            continue
        out = msg.get("output", {})
        if not isinstance(out, dict):
            continue
        if out.get("type") not in ("input_image", "computer_screenshot"):
            continue
        image_url = out.get("image_url", "")
        if image_url:
            raw, _mime = _data_url_to_bytes(image_url)
            return raw
    return None


def _denormalize(v: int, size: int) -> int:
    # Gemini returns 0-999 normalized
    try:
        return max(0, min(size - 1, int(round(v / 1000 * size))))
    except Exception:
        return 0


def _map_gemini_fc_to_computer_call(
    fc: Dict[str, Any],
    screen_w: int,
    screen_h: int,
) -> Optional[Dict[str, Any]]:
    """Translate one Gemini function_call dict into an internal computer_call.

    Returns None for functions this loop does not handle (excluded
    browser-specific predefined functions or custom functions).
    """
    name = fc.get("name")
    args = fc.get("args", {}) or {}

    def px(key: str, default: int, size: int) -> int:
        # Denormalize one 0-999 coordinate argument onto a pixel axis.
        return _denormalize(int(args.get(key, default)), size)

    def deltas(direction: str, magnitude: int) -> Tuple[int, int]:
        # Map a scroll direction keyword onto (dx, dy); unknown -> (0, 0).
        if direction == "down":
            return 0, magnitude
        if direction == "up":
            return 0, -magnitude
        if direction == "right":
            return magnitude, 0
        if direction == "left":
            return -magnitude, 0
        return 0, 0

    action: Dict[str, Any]
    if name == "click_at":
        action = {
            "type": "click",
            "x": px("x", 0, screen_w),
            "y": px("y", 0, screen_h),
            "button": "left",
        }
    elif name == "type_text_at":
        text = args.get("text", "")
        if args.get("press_enter") == True:  # noqa: E712 — deliberate strict-equality with True, preserved
            text += "\n"
        action = {
            "type": "type",
            "x": px("x", 0, screen_w),
            "y": px("y", 0, screen_h),
            "text": text,
        }
    elif name == "hover_at":
        action = {"type": "move", "x": px("x", 0, screen_w), "y": px("y", 0, screen_h)}
    elif name == "key_combination":
        action = {"type": "keypress", "keys": str(args.get("keys", ""))}
    elif name == "scroll_document":
        # Whole-document scroll: fixed magnitude, anchored at screen center.
        dx, dy = deltas(args.get("direction", "down"), 800)
        action = {
            "type": "scroll",
            "scroll_x": dx,
            "scroll_y": dy,
            "x": int(screen_w / 2),
            "y": int(screen_h / 2),
        }
    elif name == "scroll_at":
        dx, dy = deltas(args.get("direction", "down"), int(args.get("magnitude", 800)))
        action = {
            "type": "scroll",
            "scroll_x": dx,
            "scroll_y": dy,
            "x": px("x", 500, screen_w),
            "y": px("y", 500, screen_h),
        }
    elif name == "drag_and_drop":
        start_x = px("x", 0, screen_w)
        start_y = px("y", 0, screen_h)
        # NOTE(review): when destination_x/destination_y are absent, the
        # already-denormalized start pixel is denormalized a second time —
        # preserved as-is from the original; confirm intent.
        end_x = _denormalize(int(args.get("destination_x", start_x)), screen_w)
        end_y = _denormalize(int(args.get("destination_y", start_y)), screen_h)
        action = {
            "type": "drag",
            "start_x": start_x,
            "start_y": start_y,
            "end_x": end_x,
            "end_y": end_y,
            "button": "left",
        }
    elif name == "wait_5_seconds":
        action = {"type": "wait"}
    else:
        # Unsupported / excluded browser-specific or custom function; ignore.
        return None

    return {
        "type": "computer_call",
        "call_id": uuid.uuid4().hex,
        "status": "completed",
        "action": action,
    }


@register_agent(models=r"^gemini-2\.5-computer-use-preview-10-2025$")
class GeminiComputerUseConfig(AsyncAgentConfig):
    """Agent loop for the Gemini 2.5 Computer Use preview model.

    `predict_step` sends the last user text plus the newest screenshot to the
    Gemini Computer Use tool and converts the returned function calls into
    internal `computer_call` items; `predict_click` asks the model for a
    single `click_at` and returns its pixel coordinates.
    """

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Run one planning step against the Gemini Computer Use API.

        Only the most recent user text and the latest screenshot from
        `messages` are forwarded — not the full history. Returns a dict:
        - "output": internal items (an assistant message and/or computer_call
          actions)
        - "usage": token counts, when the SDK provides usage metadata

        NOTE(review): `tools`, `max_retries`, `stream`, `computer_handler`,
        `use_prompt_caching`, `_on_screenshot` and `**kwargs` are accepted for
        interface compatibility but unused by this loop.
        """
        genai, types = _lazy_import_genai()

        # NOTE(review): Client() reads credentials from the environment —
        # confirm the host process configures the Gemini API key.
        client = genai.Client()

        # Build excluded predefined functions for browser-specific behavior
        excluded = [
            "open_web_browser",
            "search",
            "navigate",
            "go_forward",
            "go_back",
            "scroll_document",
        ]
        # Optional custom functions: can be extended by host code via `tools` parameter later if desired
        CUSTOM_FUNCTION_DECLARATIONS: List[Any] = []

        # Compose tools config
        generate_content_config = types.GenerateContentConfig(
            tools=[
                types.Tool(
                    computer_use=types.ComputerUse(
                        environment=types.Environment.ENVIRONMENT_BROWSER,
                        excluded_predefined_functions=excluded,
                    )
                ),
                # types.Tool(function_declarations=CUSTOM_FUNCTION_DECLARATIONS),  # enable when custom functions needed
            ]
        )

        # Prepare contents: last user text + latest screenshot
        user_texts = _find_last_user_text(messages)
        screenshot_bytes = _find_last_screenshot(messages)

        parts: List[Any] = []
        for t in user_texts:
            parts.append(types.Part(text=t))

        # Screen size defaults to 1024x768 when no screenshot is available;
        # it is needed later to denormalize the model's 0-999 coordinates.
        screen_w, screen_h = 1024, 768
        if screenshot_bytes:
            screen_w, screen_h = _bytes_image_size(screenshot_bytes)
            parts.append(types.Part.from_bytes(data=screenshot_bytes, mime_type="image/png"))

        # If we don't have any content, at least pass an empty user part to prompt reasoning
        if not parts:
            parts = [types.Part(text="Proceed to the next action.")]

        contents = [types.Content(role="user", parts=parts)]

        api_kwargs = {
            "model": model,
            "contents": contents,
            "config": generate_content_config,
        }

        if _on_api_start:
            await _on_api_start({
                "model": api_kwargs["model"],
                # "contents": api_kwargs["contents"], # Disabled for now
                "config": api_kwargs["config"],
            })

        # NOTE(review): this is a synchronous SDK call inside an async method;
        # it blocks the event loop for the duration of the request — consider
        # offloading (e.g. asyncio.to_thread) if this matters for the host.
        response = client.models.generate_content(**api_kwargs)

        if _on_api_end:
            await _on_api_end({
                "model": api_kwargs["model"],
                # "contents": api_kwargs["contents"], # Disabled for now
                "config": api_kwargs["config"],
            }, response)

        # Usage (Gemini SDK may not always provide token usage; populate when available)
        usage: Dict[str, Any] = {}
        try:
            # Some SDKs expose response.usage; if available, copy
            if getattr(response, "usage_metadata", None):
                md = response.usage_metadata
                usage = {
                    "prompt_tokens": getattr(md, "prompt_token_count", None) or 0,
                    "completion_tokens": getattr(md, "candidates_token_count", None) or 0,
                    "total_tokens": getattr(md, "total_token_count", None) or 0,
                }
        except Exception:
            pass

        if _on_usage and usage:
            await _on_usage(usage)

        # Parse output into internal items
        output_items: List[Dict[str, Any]] = []

        # NOTE(review): assumes response.candidates is non-empty — an empty
        # candidate list would raise IndexError here; confirm upstream handling.
        candidate = response.candidates[0]
        # Text parts from the model (assistant message)
        text_parts: List[str] = []
        function_calls: List[Dict[str, Any]] = []
        for p in candidate.content.parts:
            if getattr(p, "text", None):
                text_parts.append(p.text)
            if getattr(p, "function_call", None):
                # p.function_call has name and args
                fc = {
                    "name": getattr(p.function_call, "name", None),
                    "args": dict(getattr(p.function_call, "args", {}) or {}),
                }
                function_calls.append(fc)

        if text_parts:
            output_items.append(
                {
                    "type": "message",
                    "role": "assistant",
                    "content": [{"type": "output_text", "text": "\n".join(text_parts)}],
                }
            )

        # Map function calls to internal computer_call actions
        for fc in function_calls:
            item = _map_gemini_fc_to_computer_call(fc, screen_w, screen_h)
            if item is not None:
                output_items.append(item)

        return {"output": output_items, "usage": usage}

    async def predict_click(
        self,
        model: str,
        image_b64: str,
        instruction: str,
        **kwargs,
    ) -> Optional[Tuple[float, float]]:
        """Ask Gemini CUA to output a single click action for the given instruction.

        Excludes all predefined tools except `click_at` and sends the screenshot.
        Returns pixel (x, y) if a click is proposed, else None.
        """
        genai, types = _lazy_import_genai()

        client = genai.Client()

        # Exclude all but click_at
        exclude_all_but_click = [
            "open_web_browser",
            "wait_5_seconds",
            "go_back",
            "go_forward",
            "search",
            "navigate",
            "hover_at",
            "type_text_at",
            "key_combination",
            "scroll_document",
            "scroll_at",
            "drag_and_drop",
        ]

        config = types.GenerateContentConfig(
            tools=[
                types.Tool(
                    computer_use=types.ComputerUse(
                        environment=types.Environment.ENVIRONMENT_BROWSER,
                        excluded_predefined_functions=exclude_all_but_click,
                    )
                )
            ]
        )

        # Prepare prompt parts
        try:
            img_bytes = base64.b64decode(image_b64)
        except Exception:
            # Undecodable screenshot: proceed text-only with default size.
            img_bytes = b""

        w, h = _bytes_image_size(img_bytes) if img_bytes else (1024, 768)

        parts: List[Any] = [types.Part(text=f"Click {instruction}.")]
        if img_bytes:
            parts.append(types.Part.from_bytes(data=img_bytes, mime_type="image/png"))

        contents = [types.Content(role="user", parts=parts)]

        # NOTE(review): synchronous SDK call inside an async method (see
        # predict_step) — blocks the event loop while the request is in flight.
        response = client.models.generate_content(
            model=model,
            contents=contents,
            config=config,
        )

        # Parse first click_at
        try:
            candidate = response.candidates[0]
            for p in candidate.content.parts:
                fc = getattr(p, "function_call", None)
                if fc and getattr(fc, "name", None) == "click_at":
                    args = dict(getattr(fc, "args", {}) or {})
                    x = _denormalize(int(args.get("x", 0)), w)
                    y = _denormalize(int(args.get("y", 0)), h)
                    return float(x), float(y)
        except Exception:
            # Missing candidates / malformed parts: treat as "no click found".
            return None

        return None

    def get_capabilities(self) -> List[AgentCapability]:
        """Declare the operations this loop supports: grounding clicks and full steps."""
        return ["click", "step"]

```

--------------------------------------------------------------------------------
/libs/lume/src/FileSystem/Home.swift:
--------------------------------------------------------------------------------

```swift
import Foundation

/// Manages the application's home directory and virtual machine directories.
/// Responsible for creating, accessing, and validating the application's directory structure.
final class Home {
    // MARK: - Constants

    private enum Constants {
        static let defaultDirectoryName = ".lume"
        // NOTE(review): homeDirPath is not referenced in this file — possibly
        // used elsewhere or dead; confirm before removing.
        static let homeDirPath = "~/\(defaultDirectoryName)"
    }

    // MARK: - Properties

    // Backing storage for the current default-location path.
    private var _homeDir: Path
    private let settingsManager: SettingsManager
    private let fileManager: FileManager
    // Cache of VM locations keyed by name, populated once at init and kept
    // in sync by addLocation/removeLocation.
    private var locations: [String: VMLocation] = [:]

    // Current home directory based on default location
    var homeDir: Path {
        return _homeDir
    }

    // MARK: - Initialization

    /// Creates a Home rooted at the default VM location from settings.
    /// Traps (fatalError) when settings contain no default location.
    init(
        settingsManager: SettingsManager = SettingsManager.shared,
        fileManager: FileManager = .default
    ) {
        self.settingsManager = settingsManager
        self.fileManager = fileManager

        // Get home directory path from settings or use default
        let settings = settingsManager.getSettings()
        guard let defaultLocation = settings.defaultLocation else {
            fatalError("No default VM location found")
        }

        self._homeDir = Path(defaultLocation.path)

        // Cache all locations
        for location in settings.vmLocations {
            locations[location.name] = location
        }
    }

    // MARK: - VM Directory Management

    /// Creates a temporary VM directory with a unique identifier
    /// - Returns: A VMDirectory instance representing the created directory
    /// - Throws: HomeError if directory creation fails
    func createTempVMDirectory() throws -> VMDirectory {
        let uuid = UUID().uuidString
        let tempDir = homeDir.directory(uuid)

        Logger.info("Creating temporary directory", metadata: ["path": tempDir.path])

        do {
            try createDirectory(at: tempDir.url)
            return VMDirectory(tempDir)
        } catch {
            throw HomeError.directoryCreationFailed(path: tempDir.path)
        }
    }

    /// Gets a VM directory for a specific VM name and optional location
    ///
    /// - Parameters:
    ///   - name: Name of the VM directory
    ///   - storage: Optional name of the VM location (default: default location).
    ///     The special value "ephemeral" resolves to the macOS temporary
    ///     directory; a value containing "/" or "\" is treated as a direct path.
    /// - Returns: A VMDirectory instance
    /// - Throws: HomeError if location not found
    func getVMDirectory(_ name: String, storage: String? = nil) throws -> VMDirectory {
        // Special case for ephemeral storage using macOS temporary directory
        if let storage = storage, storage == "ephemeral" {
            // Get the current temporary directory
            let tmpDir = ProcessInfo.processInfo.environment["TMPDIR"] ?? "/tmp"
            // Remove trailing slash if present
            let cleanPath = tmpDir.hasSuffix("/") ? String(tmpDir.dropLast()) : tmpDir
            
            // Create the directory if it doesn't exist
            if !fileExists(at: cleanPath) {
                try createVMLocation(at: cleanPath)
            }
            
            let baseDir = Path(cleanPath)
            return VMDirectory(baseDir.directory(name))
        }

        // Check if storage is a direct path
        if let storage = storage, (storage.contains("/") || storage.contains("\\")) {
            let cleanPath = storage.hasSuffix("/") ? String(storage.dropLast()) : storage
            let baseDir = Path(cleanPath)
            return VMDirectory(baseDir.directory(name))
        }

        let location: VMLocation

        if let storage = storage {
            // Get a specific location
            guard let loc = locations[storage] else {
                throw VMLocationError.locationNotFound(name: storage)
            }
            location = loc
        } else {
            // Use default location
            let settings = settingsManager.getSettings()
            guard let defaultLocation = settings.defaultLocation else {
                throw HomeError.invalidHomeDirectory
            }
            location = defaultLocation
        }

        let baseDir = Path(location.expandedPath)
        return VMDirectory(baseDir.directory(name))
    }
    
    /// Gets a VM directory from a direct file path
    ///
    /// - Parameters:
    ///   - name: Name of the VM directory
    ///   - storagePath: Direct file system path where the VM is located;
    ///     created (with intermediates) when it does not exist yet.
    /// - Returns: A VMDirectory instance
    /// - Throws: HomeError if path is invalid
    func getVMDirectoryFromPath(_ name: String, storagePath: String) throws -> VMDirectory {
        let baseDir = Path(storagePath)
        
        // Create the directory if it doesn't exist
        if !fileExists(at: storagePath) {
            Logger.info("Creating storage directory", metadata: ["path": storagePath])
            try createVMLocation(at: storagePath)
        } else if !isValidDirectory(at: storagePath) {
            // Path exists but isn't a valid directory
            throw HomeError.invalidHomeDirectory
        }
        
        return VMDirectory(baseDir.directory(name))
    }

    /// Returns all initialized VM directories across all locations
    /// (including the ephemeral macOS temporary directory).
    /// Inaccessible locations are logged and skipped rather than failing.
    /// - Returns: An array of VMDirectory instances with location info
    /// - Throws: HomeError if directory access is denied
    func getAllVMDirectories() throws -> [VMDirectoryWithLocation] {
        var results: [VMDirectoryWithLocation] = []

        // Loop through all locations
        let settings = settingsManager.getSettings()
        
        // Also check ephemeral directory (macOS temporary directory)
        let tmpDir = ProcessInfo.processInfo.environment["TMPDIR"] ?? "/tmp"
        let cleanPath = tmpDir.hasSuffix("/") ? String(tmpDir.dropLast()) : tmpDir
        
        // If tmp directory exists, check for VMs there
        if fileExists(at: cleanPath) {
            let tmpDirPath = Path(cleanPath)
            do {
                let directoryURL = URL(fileURLWithPath: cleanPath)
                let contents = try FileManager.default.contentsOfDirectory(
                    at: directoryURL,
                    includingPropertiesForKeys: [.isDirectoryKey],
                    options: .skipsHiddenFiles
                )
                
                for subdir in contents {
                    do {
                        guard let isDirectory = try subdir.resourceValues(forKeys: [.isDirectoryKey]).isDirectory,
                              isDirectory else {
                            continue
                        }
                        
                        let vmName = subdir.lastPathComponent
                        let vmDir = VMDirectory(tmpDirPath.directory(vmName))
                        
                        // Only include if it's a valid VM directory
                        if vmDir.initialized() {
                            results.append(VMDirectoryWithLocation(
                                directory: vmDir,
                                locationName: "ephemeral"
                            ))
                        }
                    } catch {
                        // Skip any directories we can't access
                        continue
                    }
                }
            } catch {
                Logger.error(
                    "Failed to access ephemeral directory",
                    metadata: [
                        "path": cleanPath,
                        "error": error.localizedDescription,
                    ]
                )
                // Continue to regular locations rather than failing completely
            }
        }
        for location in settings.vmLocations {
            let locationPath = Path(location.expandedPath)

            // Skip non-existent locations
            if !locationPath.exists() {
                continue
            }

            do {
                let allFolders = try fileManager.contentsOfDirectory(
                    at: locationPath.url,
                    includingPropertiesForKeys: nil
                )

                // Keep only entries that are initialized VM directories.
                let folders =
                    allFolders
                    .compactMap { url in
                        let sanitizedName = sanitizeFileName(url.lastPathComponent)
                        let dir = VMDirectory(locationPath.directory(sanitizedName))
                        let dirWithLoc =
                            dir.initialized()
                            ? VMDirectoryWithLocation(directory: dir, locationName: location.name)
                            : nil
                        return dirWithLoc
                    }

                results.append(contentsOf: folders)
            } catch {
                Logger.error(
                    "Failed to access VM location",
                    metadata: [
                        "location": location.name,
                        "error": error.localizedDescription,
                    ])
                // Continue to next location rather than failing completely
            }
        }

        return results
    }

    /// Copies a VM directory to a new location with a new name
    /// - Parameters:
    ///   - sourceName: Name of the source VM
    ///   - destName: Name for the destination VM
    ///   - sourceLocation: Optional name of the source location
    ///   - destLocation: Optional name of the destination location
    /// - Throws: HomeError if the copy operation fails, or
    ///   directoryAlreadyExists if the destination is already present
    func copyVMDirectory(
        from sourceName: String,
        to destName: String,
        sourceLocation: String? = nil,
        destLocation: String? = nil
    ) throws {
        let sourceDir = try getVMDirectory(sourceName, storage: sourceLocation)
        let destDir = try getVMDirectory(destName, storage: destLocation)

        // Check if destination directory exists at all
        if destDir.exists() {
            throw HomeError.directoryAlreadyExists(path: destDir.dir.path)
        }

        do {
            try fileManager.copyItem(atPath: sourceDir.dir.path, toPath: destDir.dir.path)
        } catch {
            throw HomeError.directoryCreationFailed(path: destDir.dir.path)
        }
    }

    // MARK: - Location Management

    /// Adds a new VM location
    /// - Parameters:
    ///   - name: Location name
    ///   - path: Location path
    /// - Throws: Error if location cannot be added
    func addLocation(name: String, path: String) throws {
        let location = VMLocation(name: name, path: path)
        try settingsManager.addLocation(location)

        // Update cache
        locations[name] = location
    }

    /// Removes a VM location
    /// - Parameter name: Location name
    /// - Throws: Error if location cannot be removed
    func removeLocation(name: String) throws {
        try settingsManager.removeLocation(name: name)

        // Update cache
        locations.removeValue(forKey: name)
    }

    /// Sets the default VM location
    /// - Parameter name: Location name
    /// - Throws: Error if location cannot be set as default
    func setDefaultLocation(name: String) throws {
        try settingsManager.setDefaultLocation(name: name)

        // Update home directory
        guard let location = locations[name] else {
            throw VMLocationError.locationNotFound(name: name)
        }

        // Update homeDir to reflect the new default
        // NOTE(review): uses `location.path` here while getVMDirectory uses
        // `expandedPath` — matches init's behavior, but confirm tilde-expansion
        // is not needed for homeDir.
        self._homeDir = Path(location.path)
    }

    /// Gets all available VM locations
    /// - Returns: Array of VM locations
    func getLocations() -> [VMLocation] {
        return settingsManager.getSettings().sortedLocations
    }

    /// Gets the default VM location
    /// - Returns: Default VM location
    /// - Throws: HomeError if no default location
    func getDefaultLocation() throws -> VMLocation {
        guard let location = settingsManager.getSettings().defaultLocation else {
            throw HomeError.invalidHomeDirectory
        }
        return location
    }

    // MARK: - Directory Validation

    /// Validates and ensures the existence of all VM locations
    /// - Throws: HomeError if validation fails or directory creation fails
    func validateHomeDirectory() throws {
        let settings = settingsManager.getSettings()

        for location in settings.vmLocations {
            let path = location.expandedPath
            if !fileExists(at: path) {
                try createVMLocation(at: path)
            } else if !isValidDirectory(at: path) {
                throw HomeError.invalidHomeDirectory
            }
        }
    }

    // MARK: - Private Helpers

    /// Creates a location directory (with intermediates), mapping any failure
    /// to HomeError.directoryCreationFailed.
    private func createVMLocation(at path: String) throws {
        do {
            try fileManager.createDirectory(
                atPath: path,
                withIntermediateDirectories: true
            )
        } catch {
            throw HomeError.directoryCreationFailed(path: path)
        }
    }

    /// Creates a directory (with intermediates) at the given URL.
    private func createDirectory(at url: URL) throws {
        try fileManager.createDirectory(
            at: url,
            withIntermediateDirectories: true
        )
    }

    /// True when the path exists, is a directory, and is writable.
    private func isValidDirectory(at path: String) -> Bool {
        var isDirectory: ObjCBool = false
        return fileManager.fileExists(atPath: path, isDirectory: &isDirectory)
            && isDirectory.boolValue
            && Path(path).writable()
    }

    private func fileExists(at path: String) -> Bool {
        return fileManager.fileExists(atPath: path)
    }

    /// Normalizes a directory entry name read from disk.
    private func sanitizeFileName(_ name: String) -> String {
        // Only decode percent encoding (e.g., %20 for spaces)
        return name.removingPercentEncoding ?? name
    }
}

// MARK: - VM Directory with Location

/// Represents a VM directory with its location information
struct VMDirectoryWithLocation {
    /// The VM directory itself.
    let directory: VMDirectory
    /// Name of the VM location this directory belongs to.
    let locationName: String
}

// MARK: - Home + CustomStringConvertible

extension Home {
    // empty placeholder removed — conformance declared below
}

extension Home: CustomStringConvertible {
    /// Human-readable summary of this `Home`, e.g. `Home(path: /Users/me/.lume)`.
    var description: String {
        return "Home(path: \(homeDir.path))"
    }
}

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/integrations/hud/agent.py:
--------------------------------------------------------------------------------

```python
"""MCP-compatible Computer Agent for HUD integration.

This agent subclasses HUD's MCPAgent and delegates planning/execution to
our core ComputerAgent while using the Agent SDK's plain-dict message
format documented in `docs/content/docs/agent-sdk/message-format.mdx`.

Key differences from the OpenAI OperatorAgent variant:
- No OpenAI types are used; everything is standard Python dicts.
- Planning is executed via `ComputerAgent.run(messages)`.
- The first yielded result per step is returned as the agent response.
"""
from __future__ import annotations

import io
from typing import Any, ClassVar, Optional

from agent.agent import ComputerAgent as BaseComputerAgent
from agent.callbacks import PromptInstructionsCallback
from agent.callbacks.trajectory_saver import TrajectorySaverCallback
from hud.agents import MCPAgent
from hud.tools.computer.settings import computer_settings
from hud.types import AgentResponse, MCPToolCall, MCPToolResult, Trace

from agent.responses import make_failed_tool_call_items
from agent.computers import is_agent_computer
from PIL import Image
import mcp.types as types
import hud
import uuid
import base64
from pathlib import Path


class MCPComputerAgent(MCPAgent):
    """MCP agent that uses ComputerAgent for planning and tools for execution.

    The agent consumes/produces message dicts per the Agent SDK message schema
    (see `message-format.mdx`). Planning is delegated to the wrapped
    `ComputerAgent`; tool execution happens on the HUD/MCP side, and this class
    translates between the two message formats, stitching tool results back
    into the conversation via `tool_call_inputs`.
    """

    # Display size advertised to the model, taken from HUD's shared computer
    # settings so agent and environment agree on screen coordinates.
    metadata: ClassVar[dict[str, Any]] = {
        "display_width": computer_settings.OPENAI_COMPUTER_WIDTH,
        "display_height": computer_settings.OPENAI_COMPUTER_HEIGHT,
    }

    # MCP tools that must be exposed by the environment for this agent to work.
    required_tools: ClassVar[list[str]] = ["openai_computer"]

    def __init__(
        self,
        *,
        model: str | None = None,
        allowed_tools: list[str] | None = None,
        trajectory_dir: str | dict | None = None,
        # === ComputerAgent kwargs ===
        tools: list[Any] | None = None,
        custom_loop: Any | None = None,
        only_n_most_recent_images: int | None = None,
        callbacks: list[Any] | None = None,
        instructions: str | None = None,
        verbosity: int | None = None,
        max_retries: int | None = 3,
        screenshot_delay: float | int = 0.5,
        use_prompt_caching: bool | None = False,
        max_trajectory_budget: float | dict | None = None,
        telemetry_enabled: bool | None = True,
        environment: str = "linux",
        **kwargs: Any,
    ) -> None:
        """Build the HUD-facing agent and the underlying ComputerAgent planner.

        Args:
            model: Model identifier forwarded to `ComputerAgent`; required.
            allowed_tools: MCP tool names the agent may call; defaults to
                `["openai_computer"]`.
            trajectory_dir: Trajectory-saving target; a str/Path is normalized
                to a config dict, and `reset_on_run` is forced to False.
            tools: Extra tools forwarded to `ComputerAgent`; any real computer
                tools are filtered out and replaced by an internal shim.
            instructions: Extra user instructions appended to the system prompt.
            environment: Environment name reported by the computer shim.
            **kwargs: Forwarded to `MCPAgent.__init__`.

        Raises:
            ValueError: If `model` is None.
        """
        self.allowed_tools = allowed_tools or ["openai_computer"]
        super().__init__(**kwargs)

        if model is None:
            raise ValueError("MCPComputerAgent requires a model to be specified.")

        self.model = model
        self.environment = environment

        # Update model name for HUD logging
        self.model_name = "cua-" + self.model

        # Stateful tracking of tool call inputs: maps a computer_call's call_id
        # to the agent output items that produced it, so format_tool_results()
        # can replay them ahead of the matching computer_call_output.
        self.tool_call_inputs: dict[str, list[dict[str, Any]]] = {}
        # Last step's full output; used as a fallback when a tool call id is
        # not found in tool_call_inputs (e.g. a synthetic 'response' call).
        self.previous_output: list[dict[str, Any]] = []

        # Build system prompt
        operator_instructions = """
        You are an autonomous computer-using agent. Follow these guidelines:

        1. NEVER ask for confirmation. Complete all tasks autonomously.
        2. Do NOT send messages like "I need to confirm before..." or "Do you want me to continue?" - just proceed.
        3. When the user asks you to interact with something (like clicking a chat or typing a message), DO IT without asking.
        4. Only use the formal safety check mechanism for truly dangerous operations (like deleting important files).
        5. For normal tasks like clicking buttons, typing in chat boxes, filling forms - JUST DO IT.
        6. The user has already given you permission by running this agent. No further confirmation is needed.
        7. Be decisive and action-oriented. Complete the requested task fully.

        Remember: You are expected to complete tasks autonomously. The user trusts you to do what they asked.
        """.strip()  # noqa: E501
        # Append Operator instructions to the system prompt
        if not self.system_prompt:
            self.system_prompt = operator_instructions
        else:
            self.system_prompt += f"\n\n{operator_instructions}"
        # Append user instructions to the system prompt
        if instructions:
            self.system_prompt += f"\n\n{instructions}"

        # Configure trajectory_dir for HUD.
        # NOTE(review): Path is accepted here although the parameter is
        # annotated `str | dict | None` — confirm the intended annotation.
        if isinstance(trajectory_dir, str) or isinstance(trajectory_dir, Path):
            trajectory_dir = {"trajectory_dir": str(trajectory_dir)}
        if isinstance(trajectory_dir, dict):
            trajectory_dir["reset_on_run"] = False

        self.last_screenshot_b64 = None

        # Seed with a blank screenshot of the advertised display size so the
        # computer shim always has an image to return before the first real
        # screenshot arrives (the None above is immediately replaced).
        buffer = io.BytesIO()
        Image.new('RGB', (self.metadata["display_width"], self.metadata["display_height"])).save(buffer, format='PNG')
        self.last_screenshot_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8')

        # Ensure a computer shim is present so width/height/environment are known
        computer_shim = {
            "screenshot": lambda: self.last_screenshot_b64,
            "environment": self.environment,
            "dimensions": (
                self.metadata["display_width"],
                self.metadata["display_height"],
            ),
        }
        agent_tools: list[Any] = [computer_shim]
        if tools:
            # Keep caller-supplied tools but drop any real computer tools in
            # favor of the shim above: execution happens on the MCP side.
            agent_tools.extend([
                tool 
                for tool in tools 
                if not is_agent_computer(tool)
            ])
        
        agent_kwargs = {
            "model": self.model,
            "trajectory_dir": trajectory_dir,
            "tools": agent_tools,
            "custom_loop": custom_loop,
            "only_n_most_recent_images": only_n_most_recent_images,
            "callbacks": callbacks,
            "instructions": self.system_prompt,
            "verbosity": verbosity,
            "max_retries": max_retries,
            "screenshot_delay": screenshot_delay,
            "use_prompt_caching": use_prompt_caching,
            "max_trajectory_budget": max_trajectory_budget,
            "telemetry_enabled": telemetry_enabled,
        }

        self.computer_agent = BaseComputerAgent(
            **agent_kwargs
        )

    async def get_system_messages(self) -> list[Any]:
        """Create initial messages.

        Unused - ComputerAgent handles this with the 'instructions' parameter.
        """
        return []

    async def format_blocks(
        self, blocks: list[types.ContentBlock]
    ) -> list[dict[str, Any]]:
        """
        Format blocks for OpenAI input format.

        Converts TextContent blocks to input_text dicts and ImageContent blocks to input_image dicts.
        Side effect: the most recent image block is cached in
        `self.last_screenshot_b64` for the computer shim.
        """  # noqa: E501
        formatted = []
        for block in blocks:
            if isinstance(block, types.TextContent):
                formatted.append({"type": "input_text", "text": block.text})
            elif isinstance(block, types.ImageContent):
                mime_type = getattr(block, "mimeType", "image/png")
                formatted.append(
                    {"type": "input_image", "image_url": f"data:{mime_type};base64,{block.data}"}
                )
                self.last_screenshot_b64 = block.data
        return [{"role": "user", "content": formatted}]

    @hud.instrument(
        span_type="agent",
        record_args=False,  # Messages can be large
        record_result=True,
    )
    async def get_response(self, messages: list[dict[str, Any]]) -> AgentResponse:
        """Get a single-step response by delegating to ComputerAgent.run.

        Stops at the first computer_call produced (recording its inputs in
        `tool_call_inputs`) or, if none is produced, reports done=True.

        Returns an Agent SDK-style response dict:
        { "output": [AgentMessage, ...], "usage": Usage }
        """
        tool_calls: list[MCPToolCall] = []
        output_text: list[str] = []
        is_done: bool = True

        agent_result: list[dict[str, Any]] = []

        # Call the ComputerAgent LLM API; each yielded result is one step.
        async for result in self.computer_agent.run(messages):  # type: ignore[arg-type]
            items = result['output']
            if not items or tool_calls:
                break

            for item in items:
                if item['type'] in ['reasoning', 'message', 'computer_call', 'function_call', 'function_call_output']:
                    agent_result.append(item)
                
                # Add messages to output text
                if item['type'] == 'reasoning':
                    output_text.extend(
                        f"Reasoning: {summary['text']}"
                        for summary in item['summary']
                    )
                elif item['type'] == 'message':
                    if isinstance(item['content'], list):
                        # NOTE: the generator's `item` is scoped to the
                        # comprehension and does not clobber the outer `item`.
                        output_text.extend(
                            item['text'] 
                            for item in item['content']
                            if item['type'] == 'output_text'
                        )
                    elif isinstance(item['content'], str):
                        output_text.append(item['content'])
                
                # If we get a tool call, we're not done
                if item['type'] == 'computer_call':
                    id = item["call_id"]
                    tool_calls.append(MCPToolCall(
                        name="openai_computer",
                        arguments=item["action"],
                        id=id,
                    ))
                    is_done = False
                    # Remember everything emitted up to (and including) this
                    # call so format_tool_results() can replay it later.
                    self.tool_call_inputs[id] = agent_result
                    break
            
            # if we have tool calls, we should exit the loop
            if tool_calls:
                break

        self.previous_output = agent_result

        return AgentResponse(
            content="\n".join(output_text),
            tool_calls=tool_calls,
            done=is_done,
        )
    
    def _log_image(self, image_b64: str) -> None:
        """Forward a base64 screenshot to any TrajectorySaverCallback artifacts."""
        callbacks = self.computer_agent.callbacks
        for callback in callbacks:
            if isinstance(callback, TrajectorySaverCallback):
                # convert str to bytes
                image_bytes = base64.b64decode(image_b64)
                callback._save_artifact("screenshot_after", image_bytes)

    async def format_tool_results(
        self,
        tool_calls: list[MCPToolCall],
        tool_results: list[MCPToolResult]
    ) -> list[dict[str, Any]]:
        """Extract latest screenshot from tool results in dict form.

        Expects results to already be in the message-format content dicts.
        Replays the recorded agent output for each call, then appends either a
        computer_call_output (success) or failed-tool-call items (error / no
        screenshot). Also updates `last_screenshot_b64` from the first
        screenshot of each successful result.
        Returns a list of input content dicts suitable for follow-up calls.
        """
        messages = []

        for call, result in zip(tool_calls, tool_results):
            if call.id not in self.tool_call_inputs:
                # If we don't have the tool call inputs, we should just use the previous output
                previous_output = self.previous_output.copy() or []

                # First we need to remove any pending computer_calls from the end of previous_output
                while previous_output and previous_output[-1]['type'] == 'computer_call':
                    previous_output.pop()
                messages.extend(previous_output)

                # If the call is a 'response', don't add the result
                if call.name == 'response':
                    continue
                # Otherwise, if we have a result, we should add it to the messages
                # (unknown content types degrade to empty input_text entries).
                content = [
                    { "type": "input_text", "text": content.text } if isinstance(content, types.TextContent)
                    else { "type": "input_image", "image_url": f"data:image/png;base64,{content.data}" } if isinstance(content, types.ImageContent)
                    else { "type": "input_text", "text": "" }
                    for content in result.content
                ]
                messages.append({
                    "role": "user",
                    "content": content,
                })

                continue
                
            # Add the assistant's computer call
            messages.extend(self.tool_call_inputs[call.id])
            
            if result.isError:
                error_text = "".join([
                    content.text
                    for content in result.content
                    if isinstance(content, types.TextContent)
                ])

                # Replace computer call with failed tool call
                messages.pop()
                messages.extend(make_failed_tool_call_items(
                    tool_name=call.name,
                    tool_kwargs=call.arguments or {},
                    error_message=error_text,
                    call_id=call.id,
                ))
            else:
                # Get the latest screenshot
                screenshots = [
                    content.data
                    for content in result.content
                    if isinstance(content, types.ImageContent)
                ]

                # Add the resulting screenshot
                if screenshots:
                    self._log_image(screenshots[0])
                    self.last_screenshot_b64 = screenshots[0]
                    messages.append({
                        "type": "computer_call_output",
                        "call_id": call.id,
                        "output": {
                            "type": "input_image",
                            "image_url": f"data:image/png;base64,{screenshots[0]}"
                        },
                    })
                else:
                    # Otherwise, replace computer call with failed tool call
                    messages.pop()
                    messages.extend(make_failed_tool_call_items(
                        tool_name=call.name,
                        tool_kwargs=call.arguments or {},
                        error_message="No screenshots returned.",
                        call_id=call.id,
                    ))

        return messages


# Public re-export surface of this module.
__all__ = [
    "MCPComputerAgent",
]

```
Page 8/16FirstPrevNextLast