This is page 12 of 16. Use http://codebase.md/trycua/cua?page={x} to view the full context.

# Directory Structure

```
├── .all-contributorsrc
├── .cursorignore
├── .devcontainer
│   ├── devcontainer.json
│   ├── post-install.sh
│   └── README.md
├── .dockerignore
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── ci-lume.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-pylume.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       └── test-validation-script.yml
├── .gitignore
├── .vscode
│   ├── docs.code-workspace
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── composite-agents.md
│   ├── cua-hackathon.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .gitignore
│   ├── .prettierrc
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   └── meta.json
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── meta.json
│   │       │   └── sandboxed-python.mdx
│   │       ├── index.mdx
│   │       ├── libraries
│   │       │   ├── agent
│   │       │   │   └── index.mdx
│   │       │   ├── computer
│   │       │   │   └── index.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── core
│   │       │   │   └── index.mdx
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   └── som
│   │       │       ├── configuration.mdx
│   │       │       └── index.mdx
│   │       ├── meta.json
│   │       ├── quickstart-cli.mdx
│   │       ├── quickstart-devs.mdx
│   │       └── telemetry.mdx
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   └── llms.txt
│   │   │       └── route.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── iou.tsx
│   │   │   └── mermaid.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   └── mdx-components.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── .prettierrc
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   └── uitars.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   └── test_connection.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── scripts
│   │   │       ├── install_mcp_server.sh
│   │   │       └── start_mcp_server.sh
│   │   ├── pylume
│   │   │   ├── __init__.py
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── pylume
│   │   │   │   ├── __init__.py
│   │   │   │   ├── client.py
│   │   │   │   ├── exceptions.py
│   │   │   │   ├── lume
│   │   │   │   ├── models.py
│   │   │   │   ├── pylume.py
│   │   │   │   └── server.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           └── test_omniparser.py
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── biome.json
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Dockerfile
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── pylume_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── pdm.lock
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── samples
│   └── community
│       ├── global-online
│       │   └── README.md
│       └── hack-the-north
│           └── README.md
├── scripts
│   ├── build-uv.sh
│   ├── build.ps1
│   ├── build.sh
│   ├── cleanup.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   └── run-docker-dev.sh
└── tests
    ├── pytest.ini
    ├── shell_cmd.py
    ├── test_files.py
    ├── test_mcp_server_session_management.py
    ├── test_mcp_server_streaming.py
    ├── test_shell_bash.py
    ├── test_telemetry.py
    ├── test_venv.py
    └── test_watchdog.py
```

# Files

--------------------------------------------------------------------------------
/docs/content/docs/libraries/lume/http-api.mdx:
--------------------------------------------------------------------------------

```markdown
---
title: HTTP Server API
description: Lume exposes a local HTTP API server that listens on localhost for programmatic management of VMs.
---

import { Tabs, Tab } from 'fumadocs-ui/components/tabs';
import { Callout } from 'fumadocs-ui/components/callout';

## Default URL

```
http://localhost:7777
```

<Callout type="info">
  The HTTP API service runs on port `7777` by default. If you'd like to use a
  different port, pass the `--port` option during installation or when running
  `lume serve`.
</Callout>
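
For example, after starting the server on a custom port with `lume serve --port 8080`, point requests at that port instead. A minimal sketch using the List VMs endpoint documented below:

```python
import requests

# Assumes the server was started with: lume serve --port 8080
r = requests.get("http://localhost:8080/lume/vms", timeout=50)
print(r.json())
```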

## Endpoints

---

### Create VM

Create a new virtual machine.

`POST: /lume/vms`

#### Parameters

| Name     | Type    | Required | Description                          |
| -------- | ------- | -------- | ------------------------------------ |
| name     | string  | Yes      | Name of the VM                       |
| os       | string  | Yes      | Guest OS (`macOS`, `linux`, etc.)    |
| cpu      | integer | Yes      | Number of CPU cores                  |
| memory   | string  | Yes      | Memory size (e.g. `4GB`)             |
| diskSize | string  | Yes      | Disk size (e.g. `64GB`)              |
| display  | string  | No       | Display resolution (e.g. `1024x768`) |
| ipsw     | string  | No       | IPSW version (e.g. `latest`)         |
| storage  | string  | No       | Storage type (`ssd`, etc.)           |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "lume_vm",
    "os": "macOS",
    "cpu": 2,
    "memory": "4GB",
    "diskSize": "64GB",
    "display": "1024x768",
    "ipsw": "latest",
    "storage": "ssd"
  }' \
  http://localhost:7777/lume/vms
```

  </Tab>
  <Tab value="Python">

```python
import requests

payload = {
    "name": "lume_vm",
    "os": "macOS",
    "cpu": 2,
    "memory": "4GB",
    "diskSize": "64GB",
    "display": "1024x768",
    "ipsw": "latest",
    "storage": "ssd"
}
r = requests.post("http://localhost:7777/lume/vms", json=payload, timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const payload = {
  name: 'lume_vm',
  os: 'macOS',
  cpu: 2,
  memory: '4GB',
  diskSize: '64GB',
  display: '1024x768',
  ipsw: 'latest',
  storage: 'ssd',
};

const res = await fetch('http://localhost:7777/lume/vms', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```

  </Tab>
</Tabs>

---

### Run VM

Run a virtual machine instance.

`POST: /lume/vms/:name/run`

#### Parameters

| Name              | Type            | Required | Description                                         |
| ----------------- | --------------- | -------- | --------------------------------------------------- |
| noDisplay         | boolean         | No       | If true, do not start VNC client                    |
| sharedDirectories | array of object | No       | List of shared directories (`hostPath`, `readOnly`) |
| recoveryMode      | boolean         | No       | Start in recovery mode                              |
| storage           | string          | No       | Storage type (`ssd`, etc.)                          |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
# Basic run
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  http://localhost:7777/lume/vms/my-vm-name/run

# Run with VNC client started and shared directory
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "noDisplay": false,
    "sharedDirectories": [
      {
        "hostPath": "~/Projects",
        "readOnly": false
      }
    ],
    "recoveryMode": false,
    "storage": "ssd"
  }' \
  http://localhost:7777/lume/vms/lume_vm/run
```

  </Tab>
  <Tab value="Python">

```python
import requests

# Basic run
r = requests.post("http://localhost:7777/lume/vms/my-vm-name/run", timeout=50)
print(r.json())

# With VNC and shared directory
payload = {
    "noDisplay": False,
    "sharedDirectories": [
        {"hostPath": "~/Projects", "readOnly": False}
    ],
    "recoveryMode": False,
    "storage": "ssd"
}
r = requests.post("http://localhost:7777/lume/vms/lume_vm/run", json=payload, timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
// Basic run
let res = await fetch('http://localhost:7777/lume/vms/my-vm-name/run', {
  method: 'POST',
});
console.log(await res.json());

// With VNC and shared directory
const payload = {
  noDisplay: false,
  sharedDirectories: [{ hostPath: '~/Projects', readOnly: false }],
  recoveryMode: false,
  storage: 'ssd',
};
res = await fetch('http://localhost:7777/lume/vms/lume_vm/run', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```

  </Tab>
</Tabs>

---

### List VMs

List all virtual machines.

`GET: /lume/vms`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  http://localhost:7777/lume/vms
```

  </Tab>
  <Tab value="Python">

```python
import requests

r = requests.get("http://localhost:7777/lume/vms", timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const res = await fetch('http://localhost:7777/lume/vms');
console.log(await res.json());
```

  </Tab>
</Tabs>

```json
[
  {
    "name": "my-vm",
    "state": "stopped",
    "os": "macOS",
    "cpu": 2,
    "memory": "4GB",
    "diskSize": "64GB"
  },
  {
    "name": "my-vm-2",
    "state": "stopped",
    "os": "linux",
    "cpu": 2,
    "memory": "4GB",
    "diskSize": "64GB"
  }
]
```

---

### Get VM Details

Get details for a specific virtual machine.

`GET: /lume/vms/:name`

#### Parameters

| Name    | Type   | Required | Description                |
| ------- | ------ | -------- | -------------------------- |
| storage | string | No       | Storage type (`ssd`, etc.) |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
# Basic get
curl --connect-timeout 6000 \
  --max-time 5000 \
  http://localhost:7777/lume/vms/lume_vm

# Get with specific storage
curl --connect-timeout 6000 \
  --max-time 5000 \
  http://localhost:7777/lume/vms/lume_vm?storage=ssd
```

  </Tab>
  <Tab value="Python">

```python
import requests

# Basic get
details = requests.get("http://localhost:7777/lume/vms/lume_vm", timeout=50)
print(details.json())

# Get with specific storage
details = requests.get("http://localhost:7777/lume/vms/lume_vm", params={"storage": "ssd"}, timeout=50)
print(details.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
// Basic get
let res = await fetch('http://localhost:7777/lume/vms/lume_vm');
console.log(await res.json());

// Get with specific storage
res = await fetch('http://localhost:7777/lume/vms/lume_vm?storage=ssd');
console.log(await res.json());
```

  </Tab>
</Tabs>

```json
{
  "name": "lume_vm",
  "state": "stopped",
  "os": "macOS",
  "cpu": 2,
  "memory": "4GB",
  "diskSize": "64GB",
  "display": "1024x768",
  "ipAddress": "192.168.65.2",
  "vncPort": 5900,
  "sharedDirectories": [
    {
      "hostPath": "~/Projects",
      "readOnly": false,
      "tag": "com.apple.virtio-fs.automount"
    }
  ]
}
```

---

### Update VM Configuration

Update the configuration of a virtual machine.

`PATCH: /lume/vms/:name`

#### Parameters

| Name     | Type    | Required | Description                           |
| -------- | ------- | -------- | ------------------------------------- |
| cpu      | integer | No       | Number of CPU cores                   |
| memory   | string  | No       | Memory size (e.g. `8GB`)              |
| diskSize | string  | No       | Disk size (e.g. `100GB`)              |
| display  | string  | No       | Display resolution (e.g. `1920x1080`) |
| storage  | string  | No       | Storage type (`ssd`, etc.)            |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X PATCH \
  -H "Content-Type: application/json" \
  -d '{
    "cpu": 4,
    "memory": "8GB",
    "diskSize": "100GB",
    "display": "1920x1080",
    "storage": "ssd"
  }' \
  http://localhost:7777/lume/vms/lume_vm
```

  </Tab>
  <Tab value="Python">

```python
import requests

payload = {
    "cpu": 4,
    "memory": "8GB",
    "diskSize": "100GB",
    "display": "1920x1080",
    "storage": "ssd"
}
r = requests.patch("http://localhost:7777/lume/vms/lume_vm", json=payload, timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const payload = {
  cpu: 4,
  memory: '8GB',
  diskSize: '100GB',
  display: '1920x1080',
  storage: 'ssd',
};
const res = await fetch('http://localhost:7777/lume/vms/lume_vm', {
  method: 'PATCH',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```

  </Tab>
</Tabs>

---

### Stop VM

Stop a running virtual machine.

`POST: /lume/vms/:name/stop`

#### Parameters

| Name    | Type   | Required | Description                |
| ------- | ------ | -------- | -------------------------- |
| storage | string | No       | Storage type (`ssd`, etc.) |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
# Basic stop
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  http://localhost:7777/lume/vms/lume_vm/stop

# Stop with storage location specified
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  http://localhost:7777/lume/vms/lume_vm/stop?storage=ssd
```

  </Tab>
  <Tab value="Python">

```python
import requests

# Basic stop
r = requests.post("http://localhost:7777/lume/vms/lume_vm/stop", timeout=50)
print(r.json())

# Stop with storage location specified
r = requests.post("http://localhost:7777/lume/vms/lume_vm/stop", params={"storage": "ssd"}, timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
// Basic stop
let res = await fetch('http://localhost:7777/lume/vms/lume_vm/stop', {
  method: 'POST',
});
console.log(await res.json());

// Stop with storage location specified
res = await fetch('http://localhost:7777/lume/vms/lume_vm/stop?storage=ssd', {
  method: 'POST',
});
console.log(await res.json());
```

  </Tab>
</Tabs>

---

### Delete VM

Delete a virtual machine instance.

`DELETE: /lume/vms/:name`

#### Parameters

| Name    | Type   | Required | Description                |
| ------- | ------ | -------- | -------------------------- |
| storage | string | No       | Storage type (`ssd`, etc.) |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
# Basic delete
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X DELETE \
  http://localhost:7777/lume/vms/lume_vm

# Delete with specific storage
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X DELETE \
  http://localhost:7777/lume/vms/lume_vm?storage=ssd
```

  </Tab>
  <Tab value="Python">

```python
import requests

# Basic delete
r = requests.delete("http://localhost:7777/lume/vms/lume_vm", timeout=50)
print(r.status_code)

# Delete with specific storage
r = requests.delete("http://localhost:7777/lume/vms/lume_vm", params={"storage": "ssd"}, timeout=50)
print(r.status_code)
```

  </Tab>
  <Tab value="TypeScript">

```typescript
// Basic delete
let res = await fetch('http://localhost:7777/lume/vms/lume_vm', {
  method: 'DELETE',
});
console.log(res.status);

// Delete with specific storage
res = await fetch('http://localhost:7777/lume/vms/lume_vm?storage=ssd', {
  method: 'DELETE',
});
console.log(res.status);
```

  </Tab>
</Tabs>

---

### Clone VM

Clone an existing virtual machine.

`POST: /lume/vms/clone`

#### Parameters

| Name           | Type   | Required | Description                         |
| -------------- | ------ | -------- | ----------------------------------- |
| name           | string | Yes      | Source VM name                      |
| newName        | string | Yes      | New VM name                         |
| sourceLocation | string | No       | Source storage location (`default`) |
| destLocation   | string | No       | Destination storage location        |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "source-vm",
    "newName": "cloned-vm",
    "sourceLocation": "default",
    "destLocation": "ssd"
  }' \
  http://localhost:7777/lume/vms/clone
```

  </Tab>
  <Tab value="Python">

```python
import requests

payload = {
    "name": "source-vm",
    "newName": "cloned-vm",
    "sourceLocation": "default",
    "destLocation": "ssd"
}
r = requests.post("http://localhost:7777/lume/vms/clone", json=payload, timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const payload = {
  name: 'source-vm',
  newName: 'cloned-vm',
  sourceLocation: 'default',
  destLocation: 'ssd',
};
const res = await fetch('http://localhost:7777/lume/vms/clone', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```

  </Tab>
</Tabs>

---

### Pull VM Image

Pull a VM image from a registry.

`POST: /lume/pull`

#### Parameters

| Name         | Type   | Required | Description                           |
| ------------ | ------ | -------- | ------------------------------------- |
| image        | string | Yes      | Image name (e.g. `macos-sequoia-...`) |
| name         | string | No       | VM name for the pulled image          |
| registry     | string | No       | Registry host (e.g. `ghcr.io`)        |
| organization | string | No       | Organization name                     |
| storage      | string | No       | Storage type (`ssd`, etc.)            |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "image": "macos-sequoia-vanilla:latest",
    "name": "my-vm-name",
    "registry": "ghcr.io",
    "organization": "trycua",
    "storage": "ssd"
  }' \
  http://localhost:7777/lume/pull
```

  </Tab>
  <Tab value="Python">

```python
import requests

payload = {
    "image": "macos-sequoia-vanilla:latest",
    "name": "my-vm-name",
    "registry": "ghcr.io",
    "organization": "trycua",
    "storage": "ssd"
}
r = requests.post("http://localhost:7777/lume/pull", json=payload, timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const payload = {
  image: 'macos-sequoia-vanilla:latest',
  name: 'my-vm-name',
  registry: 'ghcr.io',
  organization: 'trycua',
  storage: 'ssd',
};
const res = await fetch('http://localhost:7777/lume/pull', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```

  </Tab>
</Tabs>

---

### Push VM Image

Push a VM to a registry as an image. This operation is asynchronous: the server acknowledges the request immediately with `202 Accepted` and performs the upload in the background.

`POST: /lume/vms/push`

#### Parameters

| Name         | Type         | Required | Description                                     |
| ------------ | ------------ | -------- | ----------------------------------------------- |
| name         | string       | Yes      | Local VM name to push                           |
| imageName    | string       | Yes      | Image name in registry                          |
| tags         | array        | Yes      | Image tags (e.g. `["latest", "v1"]`)           |
| organization | string       | Yes      | Organization name                               |
| registry     | string       | No       | Registry host (e.g. `ghcr.io`)                  |
| chunkSizeMb  | integer      | No       | Chunk size in MB for upload                     |
| storage      | string/null  | No       | Storage type (`ssd`, etc.)                      |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-local-vm", 
    "imageName": "my-image",
    "tags": ["latest", "v1"],
    "organization": "my-org", 
    "registry": "ghcr.io",
    "chunkSizeMb": 512,
    "storage": null 
  }' \
  http://localhost:7777/lume/vms/push
```

  </Tab>
  <Tab value="Python">

```python
import requests

payload = {
    "name": "my-local-vm",
    "imageName": "my-image",
    "tags": ["latest", "v1"],
    "organization": "my-org",
    "registry": "ghcr.io",
    "chunkSizeMb": 512,
    "storage": None
}
r = requests.post("http://localhost:7777/lume/vms/push", json=payload, timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const payload = {
  name: 'my-local-vm',
  imageName: 'my-image',
  tags: ['latest', 'v1'],
  organization: 'my-org',
  registry: 'ghcr.io',
  chunkSizeMb: 512,
  storage: null,
};
const res = await fetch('http://localhost:7777/lume/vms/push', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```

  </Tab>
</Tabs>

**Response (202 Accepted):**

```json
{
  "message": "Push initiated in background",
  "name": "my-local-vm",
  "imageName": "my-image",
  "tags": [
    "latest",
    "v1"
  ]
}
```
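
Because the push runs in the background, the `202 Accepted` response only confirms that the push was initiated, not that the upload finished. A minimal sketch checking the acknowledgement (the payload fields mirror the example above):

```python
import requests

payload = {
    "name": "my-local-vm",
    "imageName": "my-image",
    "tags": ["latest", "v1"],
    "organization": "my-org",
}
r = requests.post("http://localhost:7777/lume/vms/push", json=payload, timeout=50)

# 202 means the push was accepted and continues in the background;
# it does not mean the image upload has completed.
if r.status_code == 202:
    print("Push initiated:", r.json())
else:
    print("Push not accepted:", r.status_code, r.text)
```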

---

### List Images

List available VM images.

`GET: /lume/images`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  http://localhost:7777/lume/images
```

  </Tab>
  <Tab value="Python">

```python
import requests

r = requests.get("http://localhost:7777/lume/images", timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const res = await fetch('http://localhost:7777/lume/images');
console.log(await res.json());
```

  </Tab>
</Tabs>

```json
{
  "local": [
    "macos-sequoia-xcode:latest",
    "macos-sequoia-vanilla:latest"
  ]
}
```

---

### Prune Images

Remove unused VM images to free up disk space.

`POST: /lume/prune`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  http://localhost:7777/lume/prune
```

  </Tab>
  <Tab value="Python">

```python
import requests

r = requests.post("http://localhost:7777/lume/prune", timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const res = await fetch('http://localhost:7777/lume/prune', {
  method: 'POST',
});
console.log(await res.json());
```

  </Tab>
</Tabs>

---

### Get Latest IPSW URL

Get the download URL for the latest macOS restore image (IPSW).

`GET: /lume/ipsw`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  http://localhost:7777/lume/ipsw
```

  </Tab>
  <Tab value="Python">

```python
import requests

r = requests.get("http://localhost:7777/lume/ipsw", timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const res = await fetch('http://localhost:7777/lume/ipsw');
console.log(await res.json());
```

  </Tab>
</Tabs>

---

## Configuration Management

### Get Configuration

Get current Lume configuration settings.

`GET: /lume/config`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  http://localhost:7777/lume/config
```

  </Tab>
  <Tab value="Python">

```python
import requests

r = requests.get("http://localhost:7777/lume/config", timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const res = await fetch('http://localhost:7777/lume/config');
console.log(await res.json());
```

  </Tab>
</Tabs>

```json
{
  "homeDirectory": "~/.lume",
  "cacheDirectory": "~/.lume/cache",
  "cachingEnabled": true
}
```

### Update Configuration

Update Lume configuration settings.

`POST: /lume/config`

#### Parameters

| Name            | Type    | Required | Description                      |
| --------------- | ------- | -------- | -------------------------------- |
| homeDirectory   | string  | No       | Lume home directory path         |
| cacheDirectory  | string  | No       | Cache directory path             |
| cachingEnabled  | boolean | No       | Enable or disable caching        |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "homeDirectory": "~/custom/lume",
    "cacheDirectory": "~/custom/lume/cache",
    "cachingEnabled": true
  }' \
  http://localhost:7777/lume/config
```

  </Tab>
  <Tab value="Python">

```python
import requests

payload = {
    "homeDirectory": "~/custom/lume",
    "cacheDirectory": "~/custom/lume/cache",
    "cachingEnabled": True
}
r = requests.post("http://localhost:7777/lume/config", json=payload, timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const payload = {
  homeDirectory: '~/custom/lume',
  cacheDirectory: '~/custom/lume/cache',
  cachingEnabled: true,
};
const res = await fetch('http://localhost:7777/lume/config', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```

  </Tab>
</Tabs>

---

## Storage Location Management

### Get VM Storage Locations

List all configured VM storage locations.

`GET: /lume/config/locations`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  http://localhost:7777/lume/config/locations
```

  </Tab>
  <Tab value="Python">

```python
import requests

r = requests.get("http://localhost:7777/lume/config/locations", timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const res = await fetch('http://localhost:7777/lume/config/locations');
console.log(await res.json());
```

  </Tab>
</Tabs>

```json
[
  {
    "name": "default",
    "path": "~/.lume/vms",
    "isDefault": true
  },
  {
    "name": "ssd",
    "path": "/Volumes/SSD/lume/vms",
    "isDefault": false
  }
]
```

### Add VM Storage Location

Add a new VM storage location.

`POST: /lume/config/locations`

#### Parameters

| Name | Type   | Required | Description                  |
| ---- | ------ | -------- | ---------------------------- |
| name | string | Yes      | Storage location name        |
| path | string | Yes      | File system path for storage |

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "ssd",
    "path": "/Volumes/SSD/lume/vms"
  }' \
  http://localhost:7777/lume/config/locations
```

  </Tab>
  <Tab value="Python">

```python
import requests

payload = {
    "name": "ssd",
    "path": "/Volumes/SSD/lume/vms"
}
r = requests.post("http://localhost:7777/lume/config/locations", json=payload, timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const payload = {
  name: 'ssd',
  path: '/Volumes/SSD/lume/vms',
};
const res = await fetch('http://localhost:7777/lume/config/locations', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(payload),
});
console.log(await res.json());
```

  </Tab>
</Tabs>

### Remove VM Storage Location

Remove a VM storage location.

`DELETE: /lume/config/locations/:name`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X DELETE \
  http://localhost:7777/lume/config/locations/ssd
```

  </Tab>
  <Tab value="Python">

```python
import requests

r = requests.delete("http://localhost:7777/lume/config/locations/ssd", timeout=50)
print(r.status_code)
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const res = await fetch('http://localhost:7777/lume/config/locations/ssd', {
  method: 'DELETE',
});
console.log(res.status);
```

  </Tab>
</Tabs>

### Set Default VM Storage Location

Set a storage location as the default.

`POST: /lume/config/locations/default/:name`

#### Example Request

<Tabs groupId="language" persist items={['Curl', 'Python', 'TypeScript']}>
  <Tab value="Curl">

```bash
curl --connect-timeout 6000 \
  --max-time 5000 \
  -X POST \
  http://localhost:7777/lume/config/locations/default/ssd
```

  </Tab>
  <Tab value="Python">

```python
import requests

r = requests.post("http://localhost:7777/lume/config/locations/default/ssd", timeout=50)
print(r.json())
```

  </Tab>
  <Tab value="TypeScript">

```typescript
const res = await fetch('http://localhost:7777/lume/config/locations/default/ssd', {
  method: 'POST',
});
console.log(await res.json());
```

  </Tab>
</Tabs>

```

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/main.py:
--------------------------------------------------------------------------------

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, HTTPException, Header
from fastapi.responses import StreamingResponse, JSONResponse
from typing import List, Dict, Any, Optional, Union, Literal, cast
import uvicorn
import logging
import asyncio
import json
import traceback
import inspect
from contextlib import redirect_stdout, redirect_stderr
from io import StringIO
from .handlers.factory import HandlerFactory
import os
import aiohttp
import hashlib
import time
import platform
from fastapi.middleware.cors import CORSMiddleware

# Authentication session TTL (in seconds). Override via env var CUA_AUTH_TTL_SECONDS. Default: 60s
AUTH_SESSION_TTL_SECONDS: int = int(os.environ.get("CUA_AUTH_TTL_SECONDS", "60"))

try:
    from agent import ComputerAgent
    HAS_AGENT = True
except ImportError:
    HAS_AGENT = False

# Set up logging with more detail
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Configure WebSocket with larger message size
WEBSOCKET_MAX_SIZE = 1024 * 1024 * 10  # 10MB limit

# Configure application with WebSocket settings
app = FastAPI(
    title="Computer API",
    description="API for the Computer project",
    version="0.1.0",
    websocket_max_size=WEBSOCKET_MAX_SIZE,
)

# CORS configuration
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

protocol_version = 1
try:
    from importlib.metadata import version
    package_version = version("cua-computer-server")
except Exception:
    # Fallback for cases where package is not installed or importlib.metadata is not available
    try:
        import pkg_resources
        package_version = pkg_resources.get_distribution("cua-computer-server").version
    except Exception:
        package_version = "unknown"

accessibility_handler, automation_handler, diorama_handler, file_handler = HandlerFactory.create_handlers()
handlers = {
    "version": lambda: {"protocol": protocol_version, "package": package_version},
    # App-Use commands
    "diorama_cmd": diorama_handler.diorama_cmd,
    # Accessibility commands
    "get_accessibility_tree": accessibility_handler.get_accessibility_tree,
    "find_element": accessibility_handler.find_element,
    # Shell commands
    "run_command": automation_handler.run_command,
    # File system commands
    "file_exists": file_handler.file_exists,
    "directory_exists": file_handler.directory_exists,
    "list_dir": file_handler.list_dir,
    "read_text": file_handler.read_text,
    "write_text": file_handler.write_text,
    "read_bytes": file_handler.read_bytes,
    "write_bytes": file_handler.write_bytes,
    "get_file_size": file_handler.get_file_size,
    "delete_file": file_handler.delete_file,
    "create_dir": file_handler.create_dir,
    "delete_dir": file_handler.delete_dir,
    # Mouse commands
    "mouse_down": automation_handler.mouse_down,
    "mouse_up": automation_handler.mouse_up,
    "left_click": automation_handler.left_click,
    "right_click": automation_handler.right_click,
    "double_click": automation_handler.double_click,
    "move_cursor": automation_handler.move_cursor,
    "drag_to": automation_handler.drag_to,
    "drag": automation_handler.drag,
    # Keyboard commands
    "key_down": automation_handler.key_down,
    "key_up": automation_handler.key_up,
    "type_text": automation_handler.type_text,
    "press_key": automation_handler.press_key,
    "hotkey": automation_handler.hotkey,
    # Scrolling actions
    "scroll": automation_handler.scroll,
    "scroll_down": automation_handler.scroll_down,
    "scroll_up": automation_handler.scroll_up,
    # Screen actions
    "screenshot": automation_handler.screenshot,
    "get_cursor_position": automation_handler.get_cursor_position,
    "get_screen_size": automation_handler.get_screen_size,
    # Clipboard actions
    "copy_to_clipboard": automation_handler.copy_to_clipboard,
    "set_clipboard": automation_handler.set_clipboard,
}


class AuthenticationManager:
    def __init__(self):
        self.sessions: Dict[str, Dict[str, Any]] = {}
        self.container_name = os.environ.get("CONTAINER_NAME")
    
    def _hash_credentials(self, container_name: str, api_key: str) -> str:
        """Create a hash of container name and API key for session identification"""
        combined = f"{container_name}:{api_key}"
        return hashlib.sha256(combined.encode()).hexdigest()
    
    def _is_session_valid(self, session_data: Dict[str, Any]) -> bool:
        """Check if a session is still valid based on expiration time"""
        if not session_data.get('valid', False):
            return False
        
        expires_at = session_data.get('expires_at', 0)
        return time.time() < expires_at
    
    async def auth(self, container_name: str, api_key: str) -> bool:
        """Authenticate container name and API key, using cached sessions when possible"""
        # If no CONTAINER_NAME is set, always allow access (local development)
        if not self.container_name:
            logger.info("No CONTAINER_NAME set in environment. Allowing access (local development mode)")
            return True
        
        # Layer 1: VM Identity Verification
        if container_name != self.container_name:
            logger.warning(f"VM name mismatch. Expected: {self.container_name}, Got: {container_name}")
            return False
        
        # Create hash for session lookup
        session_hash = self._hash_credentials(container_name, api_key)
        
        # Check if we have a valid cached session
        if session_hash in self.sessions:
            session_data = self.sessions[session_hash]
            if self._is_session_valid(session_data):
                logger.info(f"Using cached authentication for container: {container_name}")
                return session_data['valid']
            else:
                # Remove expired session
                del self.sessions[session_hash]
        
        # No valid cached session, authenticate with API
        logger.info(f"Authenticating with TryCUA API for container: {container_name}")
        
        try:
            async with aiohttp.ClientSession() as session:
                headers = {
                    "Authorization": f"Bearer {api_key}"
                }
                
                async with session.get(
                    f"https://www.trycua.com/api/vm/auth?container_name={container_name}",
                    headers=headers,
                ) as resp:
                    is_valid = resp.status == 200 and bool((await resp.text()).strip())
                    
                    # Cache the result with configurable expiration
                    self.sessions[session_hash] = {
                        'valid': is_valid,
                        'expires_at': time.time() + AUTH_SESSION_TTL_SECONDS
                    }
                    
                    if is_valid:
                        logger.info(f"Authentication successful for container: {container_name}")
                    else:
                        logger.warning(f"Authentication failed for container: {container_name}. Status: {resp.status}")
                    
                    return is_valid
        
        except aiohttp.ClientError as e:
            logger.error(f"Failed to validate API key with TryCUA API: {str(e)}")
            # Cache failed result to avoid repeated requests
            self.sessions[session_hash] = {
                'valid': False,
                'expires_at': time.time() + AUTH_SESSION_TTL_SECONDS
            }
            return False
        except Exception as e:
            logger.error(f"Unexpected error during authentication: {str(e)}")
            # Cache failed result to avoid repeated requests
            self.sessions[session_hash] = {
                'valid': False,
                'expires_at': time.time() + AUTH_SESSION_TTL_SECONDS
            }
            return False


class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)


manager = ConnectionManager()
auth_manager = AuthenticationManager()

@app.get("/status")
async def status():
    sys = platform.system().lower()
    # get os type
    if "darwin" in sys or sys == "macos" or sys == "mac":
        os_type = "macos"
    elif "windows" in sys:
        os_type = "windows"
    else:
        os_type = "linux"
    # get computer-server features
    features = []
    if HAS_AGENT:
        features.append("agent")
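    # e.g. {"status": "ok", "os_type": "macos", "features": ["agent"]}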
    return {"status": "ok", "os_type": os_type, "features": features}

@app.websocket("/ws", name="websocket_endpoint")
async def websocket_endpoint(websocket: WebSocket):
    global handlers

    # WebSocket message size is configured at the app or endpoint level, not on the instance
    await manager.connect(websocket)
    
    # Check if CONTAINER_NAME is set (indicating cloud provider)
    server_container_name = os.environ.get("CONTAINER_NAME")
    
    # If cloud provider, perform authentication handshake
    if server_container_name:
        try:
            logger.info(f"Cloud provider detected. CONTAINER_NAME: {server_container_name}. Waiting for authentication...")
            
            # Wait for authentication message
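            # Expected first message shape (hypothetical values shown):
            #   {"command": "authenticate",
            #    "params": {"api_key": "sk-...", "container_name": "my-container"}}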
            auth_data = await websocket.receive_json()
            
            # Validate auth message format
            if auth_data.get("command") != "authenticate":
                await websocket.send_json({
                    "success": False,
                    "error": "First message must be authentication"
                })
                await websocket.close()
                manager.disconnect(websocket)
                return
            
            # Extract credentials
            client_api_key = auth_data.get("params", {}).get("api_key")
            client_container_name = auth_data.get("params", {}).get("container_name")
            
            # Validate credentials using AuthenticationManager
            if not client_api_key:
                await websocket.send_json({
                    "success": False,
                    "error": "API key required"
                })
                await websocket.close()
                manager.disconnect(websocket)
                return
            
            if not client_container_name:
                await websocket.send_json({
                    "success": False,
                    "error": "Container name required"
                })
                await websocket.close()
                manager.disconnect(websocket)
                return
            
            # Use AuthenticationManager for validation
            is_authenticated = await auth_manager.auth(client_container_name, client_api_key)
            if not is_authenticated:
                await websocket.send_json({
                    "success": False,
                    "error": "Authentication failed"
                })
                await websocket.close()
                manager.disconnect(websocket)
                return
            
            logger.info(f"Authentication successful for VM: {client_container_name}")
            await websocket.send_json({
                "success": True,
                "message": "Authentication successful"
            })
        
        except Exception as e:
            logger.error(f"Error during authentication handshake: {str(e)}")
            await websocket.send_json({
                "success": False,
                "error": "Authentication failed"
            })
            await websocket.close()
            manager.disconnect(websocket)
            return

    try:
        while True:
            try:
                data = await websocket.receive_json()
                command = data.get("command")
                params = data.get("params", {})

                if command not in handlers:
                    await websocket.send_json(
                        {"success": False, "error": f"Unknown command: {command}"}
                    )
                    continue

                try:
                    # Filter params to only include those accepted by the handler function
                    handler_func = handlers[command]
                    sig = inspect.signature(handler_func)
                    filtered_params = {k: v for k, v in params.items() if k in sig.parameters}
                    
                    # Handle both sync and async functions
                    if asyncio.iscoroutinefunction(handler_func):
                        result = await handler_func(**filtered_params)
                    else:
                        # Run sync functions in thread pool to avoid blocking event loop
                        result = await asyncio.to_thread(handler_func, **filtered_params)
                    await websocket.send_json({"success": True, **result})
                except Exception as cmd_error:
                    logger.error(f"Error executing command {command}: {str(cmd_error)}")
                    logger.error(traceback.format_exc())
                    await websocket.send_json({"success": False, "error": str(cmd_error)})

            except WebSocketDisconnect:
                raise
            except json.JSONDecodeError as json_err:
                logger.error(f"JSON decode error: {str(json_err)}")
                await websocket.send_json(
                    {"success": False, "error": f"Invalid JSON: {str(json_err)}"}
                )
            except Exception as loop_error:
                logger.error(f"Error in message loop: {str(loop_error)}")
                logger.error(traceback.format_exc())
                await websocket.send_json({"success": False, "error": str(loop_error)})

    except WebSocketDisconnect:
        logger.info("Client disconnected")
        manager.disconnect(websocket)
    except Exception as e:
        logger.error(f"Fatal error in websocket connection: {str(e)}")
        logger.error(traceback.format_exc())
        try:
            await websocket.close()
        except Exception:
            pass
        manager.disconnect(websocket)

@app.post("/cmd")
async def cmd_endpoint(
    request: Request,
    container_name: Optional[str] = Header(None, alias="X-Container-Name"),
    api_key: Optional[str] = Header(None, alias="X-API-Key")
):
    """
    Backup endpoint for when WebSocket connections fail.
    Accepts commands via HTTP POST with streaming response.
    
    Headers:
    - X-Container-Name: Container name for cloud authentication
    - X-API-Key: API key for cloud authentication
    
    Body:
    {
        "command": "command_name",
        "params": {...}
    }
    """
    global handlers
    
    # Parse request body
    try:
        body = await request.json()
        command = body.get("command")
        params = body.get("params", {})
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON body: {str(e)}")
    
    if not command:
        raise HTTPException(status_code=400, detail="Command is required")
    
    # Check if CONTAINER_NAME is set (indicating cloud provider)
    server_container_name = os.environ.get("CONTAINER_NAME")
    
    # If cloud provider, perform authentication
    if server_container_name:
        logger.info(f"Cloud provider detected. CONTAINER_NAME: {server_container_name}. Performing authentication...")
        
        # Validate required headers
        if not container_name:
            raise HTTPException(status_code=401, detail="Container name required")
        
        if not api_key:
            raise HTTPException(status_code=401, detail="API key required")
        
        # Validate with AuthenticationManager
        is_authenticated = await auth_manager.auth(container_name, api_key)
        if not is_authenticated:
            raise HTTPException(status_code=401, detail="Authentication failed")
    
    if command not in handlers:
        raise HTTPException(status_code=400, detail=f"Unknown command: {command}")
    
    async def generate_response():
        """Generate streaming response for the command execution"""
        try:
            # Filter params to only include those accepted by the handler function
            handler_func = handlers[command]
            sig = inspect.signature(handler_func)
            filtered_params = {k: v for k, v in params.items() if k in sig.parameters}
            
            # Handle both sync and async functions
            if asyncio.iscoroutinefunction(handler_func):
                result = await handler_func(**filtered_params)
            else:
                # Run sync functions in thread pool to avoid blocking event loop
                result = await asyncio.to_thread(handler_func, **filtered_params)
            
            # Stream the successful result
            response_data = {"success": True, **result}
            yield f"data: {json.dumps(response_data)}\n\n"
            
        except Exception as cmd_error:
            logger.error(f"Error executing command {command}: {str(cmd_error)}")
            logger.error(traceback.format_exc())
            
            # Stream the error result
            error_data = {"success": False, "error": str(cmd_error)}
            yield f"data: {json.dumps(error_data)}\n\n"
    
    return StreamingResponse(
        generate_response(),
        media_type="text/plain",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )

@app.post("/responses")
async def agent_response_endpoint(
    request: Request,
    api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """
    Minimal proxy that runs ComputerAgent until no computer calls remain pending.

    Security:
    - If CONTAINER_NAME is set on the server, require X-API-Key
      and validate using AuthenticationManager unless CUA_ENABLE_PUBLIC_PROXY is true.

    Body JSON:
    {
      "model": "...",                 # required
      "input": "... or messages[]",   # required
      "agent_kwargs": { ... },         # optional, passed directly to ComputerAgent
      "env": { ... }                   # optional env overrides for agent
    }
    """
    if not HAS_AGENT:
        raise HTTPException(status_code=501, detail="ComputerAgent not available")
    
    # Authenticate via AuthenticationManager if running in cloud (CONTAINER_NAME set)
    container_name = os.environ.get("CONTAINER_NAME")
    if container_name:
        is_public = os.environ.get("CUA_ENABLE_PUBLIC_PROXY", "").lower().strip() in ["1", "true", "yes", "y", "on"]
        if not is_public:
            if not api_key:
                raise HTTPException(status_code=401, detail="Missing AGENT PROXY auth headers")
            ok = await auth_manager.auth(container_name, api_key)
            if not ok:
                raise HTTPException(status_code=401, detail="Unauthorized")

    # Parse request body
    try:
        body = await request.json()
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON body: {str(e)}")

    model = body.get("model")
    input_data = body.get("input")
    if not model or input_data is None:
        raise HTTPException(status_code=400, detail="'model' and 'input' are required")

    agent_kwargs: Dict[str, Any] = body.get("agent_kwargs") or {}
    env_overrides: Dict[str, str] = body.get("env") or {}

    # Simple env override context
    class _EnvOverride:
        def __init__(self, overrides: Dict[str, str]):
            self.overrides = overrides
            self._original: Dict[str, Optional[str]] = {}
        def __enter__(self):
            for k, v in (self.overrides or {}).items():
                self._original[k] = os.environ.get(k)
                os.environ[k] = str(v)
        def __exit__(self, exc_type, exc, tb):
            for k, old in self._original.items():
                if old is None:
                    os.environ.pop(k, None)
                else:
                    os.environ[k] = old

    # Convert input to messages
    def _to_messages(data: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
        if isinstance(data, str):
            return [{"role": "user", "content": data}]
        if isinstance(data, list):
            return data
        raise HTTPException(status_code=400, detail="'input' must be a string or a list of messages")

    messages = _to_messages(input_data)

    # Define a direct computer tool that implements the AsyncComputerHandler protocol
    # and delegates to our existing automation/file/accessibility handlers.
    from agent.computers import AsyncComputerHandler  # runtime-checkable Protocol

    class DirectComputer(AsyncComputerHandler):
        def __init__(self):
            # use module-scope handler singletons created by HandlerFactory
            self._auto = automation_handler
            self._file = file_handler
            self._access = accessibility_handler

        async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]:
            sys = platform.system().lower()
            if "darwin" in sys or sys in ("macos", "mac"):
                return "mac"
            if "windows" in sys:
                return "windows"
            return "linux"

        async def get_dimensions(self) -> tuple[int, int]:
            size = await self._auto.get_screen_size()
            return size["width"], size["height"]

        async def screenshot(self) -> str:
            img_b64 = await self._auto.screenshot()
            return img_b64["image_data"]

        async def click(self, x: int, y: int, button: str = "left") -> None:
            if button == "left":
                await self._auto.left_click(x, y)
            elif button == "right":
                await self._auto.right_click(x, y)
            else:
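                # Unsupported buttons (e.g. "middle") currently fall back to a left click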
                await self._auto.left_click(x, y)

        async def double_click(self, x: int, y: int) -> None:
            await self._auto.double_click(x, y)

        async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
            await self._auto.move_cursor(x, y)
            await self._auto.scroll(scroll_x, scroll_y)

        async def type(self, text: str) -> None:
            await self._auto.type_text(text)

        async def wait(self, ms: int = 1000) -> None:
            await asyncio.sleep(ms / 1000.0)

        async def move(self, x: int, y: int) -> None:
            await self._auto.move_cursor(x, y)

        async def keypress(self, keys: Union[List[str], str]) -> None:
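            # Accepts "ctrl+c", "ctrl-c", or ["ctrl", "c"]; a single key is pressed directly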
            if isinstance(keys, str):
                parts = keys.replace("-", "+").split("+") if len(keys) > 1 else [keys]
            else:
                parts = keys
            if len(parts) == 1:
                await self._auto.press_key(parts[0])
            else:
                await self._auto.hotkey(parts)

        async def drag(self, path: List[Dict[str, int]]) -> None:
            if not path:
                return
            start = path[0]
            await self._auto.mouse_down(start["x"], start["y"])
            for pt in path[1:]:
                await self._auto.move_cursor(pt["x"], pt["y"])
            end = path[-1]
            await self._auto.mouse_up(end["x"], end["y"])

        async def get_current_url(self) -> str:
            # Not available in this server context
            return ""

        async def left_mouse_down(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
            await self._auto.mouse_down(x, y, button="left")

        async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
            await self._auto.mouse_up(x, y, button="left")

    # # Inline image URLs to base64
    # import base64, mimetypes, requests
    # # Use a browser-like User-Agent to avoid 403s from some CDNs (e.g., Wikimedia)
    # HEADERS = {
    #     "User-Agent": (
    #         "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    #         "AppleWebKit/537.36 (KHTML, like Gecko) "
    #         "Chrome/124.0.0.0 Safari/537.36"
    #     )
    # }
    # def _to_data_url(content_bytes: bytes, url: str, resp: requests.Response) -> str:
    #     ctype = resp.headers.get("Content-Type") or mimetypes.guess_type(url)[0] or "application/octet-stream"
    #     b64 = base64.b64encode(content_bytes).decode("utf-8")
    #     return f"data:{ctype};base64,{b64}"
    # def inline_image_urls(messages):
    #     # messages: List[{"role": "...","content":[...]}]
    #     out = []
    #     for m in messages:
    #         if not isinstance(m.get("content"), list):
    #             out.append(m)
    #             continue
    #         new_content = []
    #         for part in (m.get("content") or []):
    #             if part.get("type") == "input_image" and (url := part.get("image_url")):
    #                 resp = requests.get(url, headers=HEADERS, timeout=30)
    #                 resp.raise_for_status()
    #                 new_content.append({
    #                     "type": "input_image",
    #                     "image_url": _to_data_url(resp.content, url, resp)
    #                 })
    #             else:
    #                 new_content.append(part)
    #         out.append({**m, "content": new_content})
    #     return out
    # messages = inline_image_urls(messages)

    error = None

    with _EnvOverride(env_overrides):
        # Prepare tools: if caller did not pass tools, inject our DirectComputer
        tools = agent_kwargs.get("tools")
        if not tools:
            tools = [DirectComputer()]
            agent_kwargs = {**agent_kwargs, "tools": tools}
        # Instantiate agent with our tools
        agent = ComputerAgent(model=model, **agent_kwargs)  # type: ignore[arg-type]

        total_output: List[Any] = []
        total_usage: Dict[str, Any] = {}

        pending_computer_call_ids = set()
        try:
            async for result in agent.run(messages):
                total_output += result.get("output", [])
                # Try to collect usage if present
                if isinstance(result, dict) and "usage" in result and isinstance(result["usage"], dict):
                    # Merge usage counters
                    for k, v in result["usage"].items():
                        if isinstance(v, (int, float)):
                            total_usage[k] = total_usage.get(k, 0) + v
                        else:
                            total_usage[k] = v
                for msg in result.get("output", []):
                    if msg.get("type") == "computer_call":
                        pending_computer_call_ids.add(msg["call_id"])
                    elif msg.get("type") == "computer_call_output":
                        pending_computer_call_ids.discard(msg["call_id"])
                # exit if no pending computer calls
                if not pending_computer_call_ids:
                    break
        except Exception as e:
            logger.error(f"Error running agent: {str(e)}")
            logger.error(traceback.format_exc())
            error = str(e)
    
    # Build response payload
    payload = {
        "model": model,
        "error": error,
        "output": total_output,
        "usage": total_usage,
        "status": "completed" if not error else "failed"
    }

    # Response headers: disable caching and keep the connection alive
    headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
    }

    return JSONResponse(content=payload, headers=headers)


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

```

--------------------------------------------------------------------------------
/libs/lume/src/Server/Handlers.swift:
--------------------------------------------------------------------------------

```swift
import ArgumentParser
import Foundation
import Virtualization

@MainActor
extension Server {
    // MARK: - VM Management Handlers

    func handleListVMs(storage: String? = nil) async throws -> HTTPResponse {
        do {
            let vmController = LumeController()
            let vms = try vmController.list(storage: storage)
            return try .json(vms)
        } catch {
            print(
                "ERROR: Failed to list VMs: \(error.localizedDescription), storage=\(String(describing: storage))"
            )
            return .badRequest(message: error.localizedDescription)
        }
    }

    func handleGetVM(name: String, storage: String? = nil) async throws -> HTTPResponse {
        print("Getting VM details: name=\(name), storage=\(String(describing: storage))")

        do {
            let vmController = LumeController()
            print("Created VM controller, attempting to get VM")
            let vm = try vmController.get(name: name, storage: storage)
            print("Successfully retrieved VM")

            // Check for nil values that might cause crashes
            if vm.vmDirContext.config.macAddress == nil {
                print("ERROR: VM has nil macAddress")
                return .badRequest(message: "VM configuration is invalid (nil macAddress)")
            }
            print("MacAddress check passed")

            // Log that we're about to access details
            print("Preparing VM details response")

            // Print the full details object for debugging
            let details = vm.details
            print("VM DETAILS: \(details)")
            print("  name: \(details.name)")
            print("  os: \(details.os)")
            print("  cpuCount: \(details.cpuCount)")
            print("  memorySize: \(details.memorySize)")
            print("  diskSize: \(details.diskSize)")
            print("  display: \(details.display)")
            print("  status: \(details.status)")
            print("  vncUrl: \(String(describing: details.vncUrl))")
            print("  ipAddress: \(String(describing: details.ipAddress))")
            print("  locationName: \(details.locationName)")

            // Serialize the VM details
            print("About to serialize VM details")
            let response = try HTTPResponse.json(vm.details)
            print("Successfully serialized VM details")
            return response

        } catch {
            // This will catch errors from both vmController.get and the json serialization
            print("ERROR: Failed to get VM details: \(error.localizedDescription)")
            return .badRequest(message: error.localizedDescription)
        }
    }

    func handleCreateVM(_ body: Data?) async throws -> HTTPResponse {
        guard let body = body,
            let request = try? JSONDecoder().decode(CreateVMRequest.self, from: body)
        else {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: "Invalid request body"))
            )
        }

        do {
            let sizes = try request.parse()
            let vmController = LumeController()
            try await vmController.create(
                name: request.name,
                os: request.os,
                diskSize: sizes.diskSize,
                cpuCount: request.cpu,
                memorySize: sizes.memory,
                display: request.display,
                ipsw: request.ipsw,
                storage: request.storage
            )

            return HTTPResponse(
                statusCode: .ok,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode([
                    "message": "VM created successfully", "name": request.name,
                ])
            )
        } catch {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
            )
        }
    }

    func handleDeleteVM(name: String, storage: String? = nil) async throws -> HTTPResponse {
        do {
            let vmController = LumeController()
            try await vmController.delete(name: name, storage: storage)
            return HTTPResponse(
                statusCode: .ok, headers: ["Content-Type": "application/json"], body: Data())
        } catch {
            return HTTPResponse(
                statusCode: .badRequest, headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: error.localizedDescription)))
        }
    }

    func handleCloneVM(_ body: Data?) async throws -> HTTPResponse {
        guard let body = body,
            let request = try? JSONDecoder().decode(CloneRequest.self, from: body)
        else {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: "Invalid request body"))
            )
        }

        do {
            let vmController = LumeController()
            try vmController.clone(
                name: request.name,
                newName: request.newName,
                sourceLocation: request.sourceLocation,
                destLocation: request.destLocation
            )

            return HTTPResponse(
                statusCode: .ok,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode([
                    "message": "VM cloned successfully",
                    "source": request.name,
                    "destination": request.newName,
                ])
            )
        } catch {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
            )
        }
    }

    // MARK: - VM Operation Handlers

    func handleSetVM(name: String, body: Data?) async throws -> HTTPResponse {
        guard let body = body,
            let request = try? JSONDecoder().decode(SetVMRequest.self, from: body)
        else {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: "Invalid request body"))
            )
        }

        do {
            let vmController = LumeController()
            let sizes = try request.parse()
            try vmController.updateSettings(
                name: name,
                cpu: request.cpu,
                memory: sizes.memory,
                diskSize: sizes.diskSize,
                display: sizes.display?.string,
                storage: request.storage
            )

            return HTTPResponse(
                statusCode: .ok,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(["message": "VM settings updated successfully"])
            )
        } catch {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
            )
        }
    }

    func handleStopVM(name: String, storage: String? = nil) async throws -> HTTPResponse {
        Logger.info(
            "Stopping VM", metadata: ["name": name, "storage": String(describing: storage)])

        do {
            Logger.info("Creating VM controller", metadata: ["name": name])
            let vmController = LumeController()

            Logger.info("Calling stopVM on controller", metadata: ["name": name])
            try await vmController.stopVM(name: name, storage: storage)

            Logger.info(
                "VM stopped, waiting 5 seconds for locks to clear", metadata: ["name": name])

            // Add a delay to ensure locks are fully released before returning
            for i in 1...5 {
                try? await Task.sleep(nanoseconds: 1_000_000_000)
                Logger.info("Lock clearing delay", metadata: ["name": name, "seconds": "\(i)/5"])
            }

            // Verify the VM is really in a stopped state
            Logger.info("Verifying VM is stopped", metadata: ["name": name])
            let vm = try? vmController.get(name: name, storage: storage)
            if let vm = vm, vm.details.status == "running" {
                Logger.info(
                    "VM still reports as running despite stop operation",
                    metadata: ["name": name, "severity": "warning"])
            } else {
                Logger.info(
                    "Verification complete: VM is in stopped state", metadata: ["name": name])
            }

            Logger.info("Returning successful response", metadata: ["name": name])
            return HTTPResponse(
                statusCode: .ok,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(["message": "VM stopped successfully"])
            )
        } catch {
            Logger.error(
                "Failed to stop VM",
                metadata: [
                    "name": name,
                    "error": error.localizedDescription,
                    "storage": String(describing: storage),
                ])
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
            )
        }
    }

    func handleRunVM(name: String, body: Data?) async throws -> HTTPResponse {
        Logger.info("Running VM", metadata: ["name": name])

        // Log the raw body data if available
        if let body = body, let bodyString = String(data: body, encoding: .utf8) {
            Logger.info("Run VM raw request body", metadata: ["name": name, "body": bodyString])
        } else {
            Logger.info("No request body or could not decode as string", metadata: ["name": name])
        }

        do {
            Logger.info("Creating VM controller and parsing request", metadata: ["name": name])
            let request =
                body.flatMap { try? JSONDecoder().decode(RunVMRequest.self, from: $0) }
                ?? RunVMRequest(
                    noDisplay: nil, sharedDirectories: nil, recoveryMode: nil, storage: nil)

            Logger.info(
                "Parsed request",
                metadata: [
                    "name": name,
                    "noDisplay": String(describing: request.noDisplay),
                    "sharedDirectories": "\(request.sharedDirectories?.count ?? 0)",
                    "storage": String(describing: request.storage),
                ])

            Logger.info("Parsing shared directories", metadata: ["name": name])
            let dirs = try request.parse()
            Logger.info(
                "Successfully parsed shared directories",
                metadata: ["name": name, "count": "\(dirs.count)"])

            // Start VM in background
            Logger.info("Starting VM in background", metadata: ["name": name])
            startVM(
                name: name,
                noDisplay: request.noDisplay ?? false,
                sharedDirectories: dirs,
                recoveryMode: request.recoveryMode ?? false,
                storage: request.storage
            )
            Logger.info("VM start initiated in background", metadata: ["name": name])

            // Return response immediately
            return HTTPResponse(
                statusCode: .accepted,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode([
                    "message": "VM start initiated",
                    "name": name,
                    "status": "pending",
                ])
            )
        } catch {
            Logger.error(
                "Failed to run VM",
                metadata: [
                    "name": name,
                    "error": error.localizedDescription,
                ])
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
            )
        }
    }

    // MARK: - Image Management Handlers

    func handleIPSW() async throws -> HTTPResponse {
        do {
            let vmController = LumeController()
            let url = try await vmController.getLatestIPSWURL()
            return HTTPResponse(
                statusCode: .ok,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(["url": url.absoluteString])
            )
        } catch {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
            )
        }
    }

    func handlePull(_ body: Data?) async throws -> HTTPResponse {
        guard let body = body,
            let request = try? JSONDecoder().decode(PullRequest.self, from: body)
        else {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: "Invalid request body"))
            )
        }

        do {
            let vmController = LumeController()
            try await vmController.pullImage(
                image: request.image,
                name: request.name,
                registry: request.registry,
                organization: request.organization,
                storage: request.storage
            )

            return HTTPResponse(
                statusCode: .ok,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode([
                    "message": "Image pulled successfully",
                    "image": request.image,
                    "name": request.name ?? "default",
                ])
            )
        } catch {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
            )
        }
    }

    func handlePruneImages() async throws -> HTTPResponse {
        do {
            let vmController = LumeController()
            try await vmController.pruneImages()
            return HTTPResponse(
                statusCode: .ok,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(["message": "Successfully removed cached images"])
            )
        } catch {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
            )
        }
    }

    func handlePush(_ body: Data?) async throws -> HTTPResponse {
        guard let body = body,
            let request = try? JSONDecoder().decode(PushRequest.self, from: body)
        else {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: "Invalid request body"))
            )
        }

        // Trigger push asynchronously, return Accepted immediately
        Task.detached { @MainActor @Sendable in
            do {
                let vmController = LumeController()
                try await vmController.pushImage(
                    name: request.name,
                    imageName: request.imageName,
                    tags: request.tags,
                    registry: request.registry,
                    organization: request.organization,
                    storage: request.storage,
                    chunkSizeMb: request.chunkSizeMb,
                    verbose: false,  // Verbose typically handled by server logs
                    dryRun: false,  // Default API behavior is likely non-dry-run
                    reassemble: false  // Default API behavior is likely non-reassemble
                )
                print(
                    "Background push completed successfully for image: \(request.imageName):\(request.tags.joined(separator: ","))"
                )
            } catch {
                print(
                    "Background push failed for image: \(request.imageName):\(request.tags.joined(separator: ",")) - Error: \(error.localizedDescription)"
                )
            }
        }

        return HTTPResponse(
            statusCode: .accepted,
            headers: ["Content-Type": "application/json"],
            body: try JSONEncoder().encode([
                "message": AnyEncodable("Push initiated in background"),
                "name": AnyEncodable(request.name),
                "imageName": AnyEncodable(request.imageName),
                "tags": AnyEncodable(request.tags),
            ])
        )
    }

    func handleGetImages(_ request: HTTPRequest) async throws -> HTTPResponse {
        let pathAndQuery = request.path.split(separator: "?", maxSplits: 1)
        let queryParams =
            pathAndQuery.count > 1
            ? pathAndQuery[1]
                .split(separator: "&")
                .reduce(into: [String: String]()) { dict, param in
                    let parts = param.split(separator: "=", maxSplits: 1)
                    if parts.count == 2 {
                        dict[String(parts[0])] = String(parts[1])
                    }
                } : [:]
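        // e.g. a request path ending in "?organization=acme" yields ["organization": "acme"]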

        let organization = queryParams["organization"] ?? "trycua"

        do {
            let vmController = LumeController()
            let imageList = try await vmController.getImages(organization: organization)

            // Create a response format that matches the CLI output
            let response = imageList.local.map {
                [
                    "repository": $0.repository,
                    "imageId": $0.imageId,
                ]
            }

            return HTTPResponse(
                statusCode: .ok,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(response)
            )
        } catch {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
            )
        }
    }

    // MARK: - Config Management Handlers

    func handleGetConfig() async throws -> HTTPResponse {
        do {
            let vmController = LumeController()
            let settings = vmController.getSettings()
            return try .json(settings)
        } catch {
            return .badRequest(message: error.localizedDescription)
        }
    }

    struct ConfigRequest: Codable {
        let homeDirectory: String?
        let cacheDirectory: String?
        let cachingEnabled: Bool?
    }

    func handleUpdateConfig(_ body: Data?) async throws -> HTTPResponse {
        guard let body = body,
            let request = try? JSONDecoder().decode(ConfigRequest.self, from: body)
        else {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: "Invalid request body"))
            )
        }

        do {
            let vmController = LumeController()

            if let homeDir = request.homeDirectory {
                try vmController.setHomeDirectory(homeDir)
            }

            if let cacheDir = request.cacheDirectory {
                try vmController.setCacheDirectory(path: cacheDir)
            }

            if let cachingEnabled = request.cachingEnabled {
                try vmController.setCachingEnabled(cachingEnabled)
            }

            return HTTPResponse(
                statusCode: .ok,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(["message": "Configuration updated successfully"])
            )
        } catch {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
            )
        }
    }

    func handleGetLocations() async throws -> HTTPResponse {
        do {
            let vmController = LumeController()
            let locations = vmController.getLocations()
            return try .json(locations)
        } catch {
            return .badRequest(message: error.localizedDescription)
        }
    }

    struct LocationRequest: Codable {
        let name: String
        let path: String
    }

    func handleAddLocation(_ body: Data?) async throws -> HTTPResponse {
        guard let body = body,
            let request = try? JSONDecoder().decode(LocationRequest.self, from: body)
        else {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: "Invalid request body"))
            )
        }

        do {
            let vmController = LumeController()
            try vmController.addLocation(name: request.name, path: request.path)

            return HTTPResponse(
                statusCode: .ok,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode([
                    "message": "Location added successfully",
                    "name": request.name,
                    "path": request.path,
                ])
            )
        } catch {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
            )
        }
    }

    func handleRemoveLocation(_ name: String) async throws -> HTTPResponse {
        do {
            let vmController = LumeController()
            try vmController.removeLocation(name: name)
            return HTTPResponse(
                statusCode: .ok,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(["message": "Location removed successfully"])
            )
        } catch {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
            )
        }
    }

    func handleSetDefaultLocation(_ name: String) async throws -> HTTPResponse {
        do {
            let vmController = LumeController()
            try vmController.setDefaultLocation(name: name)
            return HTTPResponse(
                statusCode: .ok,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(["message": "Default location set successfully"])
            )
        } catch {
            return HTTPResponse(
                statusCode: .badRequest,
                headers: ["Content-Type": "application/json"],
                body: try JSONEncoder().encode(APIError(message: error.localizedDescription))
            )
        }
    }

    // MARK: - Log Handlers

    func handleGetLogs(type: String?, lines: Int?) async throws -> HTTPResponse {
        do {
            let logType = type?.lowercased() ?? "all"
            let infoPath = "/tmp/lume_daemon.log"
            let errorPath = "/tmp/lume_daemon.error.log"

            let fileManager = FileManager.default
            var response: [String: String] = [:]

            // Function to read log files
            func readLogFile(path: String) -> String? {
                guard fileManager.fileExists(atPath: path) else {
                    return nil
                }

                do {
                    let content = try String(contentsOfFile: path, encoding: .utf8)

                    // If lines parameter is provided, return only the specified number of lines from the end
                    if let lineCount = lines {
                        let allLines = content.components(separatedBy: .newlines)
                        let startIndex = max(0, allLines.count - lineCount)
                        let lastLines = Array(allLines[startIndex...])
                        return lastLines.joined(separator: "\n")
                    }

                    return content
                } catch {
                    return "Error reading log file: \(error.localizedDescription)"
                }
            }

            // Get logs based on requested type
            if logType == "info" || logType == "all" {
                response["info"] = readLogFile(path: infoPath) ?? "Info log file not found"
            }

            if logType == "error" || logType == "all" {
                response["error"] = readLogFile(path: errorPath) ?? "Error log file not found"
            }

            return try .json(response)
        } catch {
            return .badRequest(message: error.localizedDescription)
        }
    }

    // MARK: - Private Helper Methods

    nonisolated private func startVM(
        name: String,
        noDisplay: Bool,
        sharedDirectories: [SharedDirectory] = [],
        recoveryMode: Bool = false,
        storage: String? = nil
    ) {
        Logger.info(
            "Starting VM in detached task",
            metadata: [
                "name": name,
                "noDisplay": "\(noDisplay)",
                "recoveryMode": "\(recoveryMode)",
                "storage": String(describing: storage),
            ])

        Task.detached { @MainActor @Sendable in
            Logger.info("Background task started for VM", metadata: ["name": name])
            do {
                Logger.info("Creating VM controller in background task", metadata: ["name": name])
                let vmController = LumeController()

                Logger.info(
                    "Calling runVM on controller",
                    metadata: [
                        "name": name,
                        "noDisplay": "\(noDisplay)",
                    ])
                try await vmController.runVM(
                    name: name,
                    noDisplay: noDisplay,
                    sharedDirectories: sharedDirectories,
                    recoveryMode: recoveryMode,
                    storage: storage
                )
                Logger.info("VM started successfully in background task", metadata: ["name": name])
            } catch {
                Logger.error(
                    "Failed to start VM in background task",
                    metadata: [
                        "name": name,
                        "error": error.localizedDescription,
                    ])
            }
        }
        Logger.info("Background task dispatched for VM", metadata: ["name": name])
    }
}

```

--------------------------------------------------------------------------------
/blog/build-your-own-operator-on-macos-2.md:
--------------------------------------------------------------------------------

```markdown
# Build Your Own Operator on macOS - Part 2

*Published on April 27, 2025 by Francesco Bonacci*

In our [previous post](build-your-own-operator-on-macos-1.md), we built a basic Computer-Use Operator from scratch using OpenAI's `computer-use-preview` model and our [cua-computer](https://pypi.org/project/cua-computer) package. While educational, implementing the control loop manually can be tedious and error-prone.

In this follow-up, we'll explore our [cua-agent](https://pypi.org/project/cua-agent) framework - a high-level abstraction that handles all the complexity of VM interaction, screenshot processing, model communication, and action execution automatically.

<div align="center">
  <video src="https://github.com/user-attachments/assets/0be7e3e3-eead-4646-a4a3-5bb392501ee7" width="600" controls></video>
</div>

## What You'll Learn

By the end of this tutorial, you'll be able to:
- Set up the `cua-agent` framework with various agent loop types and model providers
- Understand the different agent loop types and their capabilities
- Work with local models for cost-effective workflows
- Use a simple UI for your operator

**Prerequisites:**
- Completed setup from Part 1 ([lume CLI installed](https://github.com/trycua/cua?tab=readme-ov-file#option-2-full-computer-use-agent-capabilities), macOS CUA image already pulled)
- Python 3.10+. We recommend using Conda (or Anaconda) to create a dedicated Python environment.
- API keys for OpenAI and/or Anthropic (optional for local models)

**Estimated Time:** 30-45 minutes

## Introduction to cua-agent

The `cua-agent` framework is designed to simplify building Computer-Use Agents. It abstracts away the complex interaction loop we built manually in Part 1, letting you focus on defining tasks rather than implementing the machinery. Among other features, it includes:

- **Multiple Provider Support**: Works with OpenAI, Anthropic, UI-Tars, local models (via Ollama), or any OpenAI-compatible model (e.g. LM Studio, vLLM, LocalAI, OpenRouter, Groq, etc.)
- **Flexible Loop Types**: Different implementations optimized for various models (e.g. OpenAI vs. Anthropic)
- **Structured Responses**: Clean, consistent output following the OpenAI Agent SDK specification we touched on in Part 1
- **Local Model Support**: Run cost-effectively with locally hosted models (Ollama, LM Studio, vLLM, LocalAI, etc.)
- **Gradio UI**: Optional visual interface for interacting with your agent

## Installation

Let's start by installing the `cua-agent` package. You can install it with all features or selectively install only what you need.

From your Python 3.10+ environment, run:

```bash
# For all features
pip install "cua-agent[all]"

# Or selectively install only what you need
pip install "cua-agent[openai]"    # OpenAI support
pip install "cua-agent[anthropic]"  # Anthropic support
pip install "cua-agent[uitars]"    # UI-Tars support
pip install "cua-agent[omni]"       # OmniParser + VLMs support
pip install "cua-agent[ui]"         # Gradio UI
```

## Setting Up Your Environment

Before running any code examples, let's set up a proper environment:

1. **Create a new directory** for your project:
   ```bash
   mkdir cua-agent-tutorial
   cd cua-agent-tutorial
   ```

2. **Set up a Python environment** using one of these methods:

   **Option A: Using conda command line**
   ```bash
   # Using conda
   conda create -n cua-agent python=3.10
   conda activate cua-agent
   ```
   
   **Option B: Using Anaconda Navigator UI**
   - Open Anaconda Navigator
   - Click on "Environments" in the left sidebar
   - Click the "Create" button at the bottom
   - Name your environment "cua-agent"
   - Select Python 3.10
   - Click "Create"
   - Once created, select the environment and click "Open Terminal" to activate it
   
   **Option C: Using venv**
   ```bash
   python -m venv cua-env
   source cua-env/bin/activate  # On macOS/Linux
   ```

3. **Install the cua-agent package**:
   ```bash
   pip install "cua-agent[all]"
   ```

4. **Set up your API keys as environment variables**:
   ```bash
   # For OpenAI models
   export OPENAI_API_KEY=your_openai_key_here
   
   # For Anthropic models (if needed)
   export ANTHROPIC_API_KEY=your_anthropic_key_here
   ```

5. **Create a Python file or notebook**:
   
   **Option A: Create a Python script**
   ```bash
   # For a Python script
   touch cua_agent_example.py
   ```
   
   **Option B: Use VS Code notebooks**
   - Open VS Code
   - Install the Python extension if you haven't already
   - Create a new file with a `.ipynb` extension (e.g., `cua_agent_tutorial.ipynb`)
   - Select your Python environment when prompted
   - You can now create and run code cells in the notebook interface

Now you're ready to run the code examples!

## Understanding Agent Loops

If you recall from Part 1, we had to implement a custom interaction loop to drive the `computer-use-preview` model.

In the `cua-agent` framework, an **Agent Loop** is the core abstraction that implements the continuous interaction cycle between an AI model and the computer environment. It manages the flow of:
1. Capturing screenshots of the computer's state
2. Processing these screenshots (with or without UI element detection)
3. Sending this visual context to an AI model along with the task instructions
4. Receiving the model's decisions on what actions to take
5. Safely executing these actions in the environment
6. Repeating this cycle until the task is complete

The loop handles all the complex error handling, retries, context management, and model-specific interaction patterns so you don't have to implement them yourself.
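
To make the cycle concrete, here is a deliberately simplified sketch of what an agent loop does internally. The helpers `decide_next_action` and `execute_action` are illustrative stand-ins for the model- and provider-specific logic that `cua-agent` implements for you:

```python
# Conceptual sketch of an agent loop -- not the actual cua-agent implementation.
async def agent_loop(computer, model, task: str):
    messages = [{"role": "user", "content": task}]
    while True:
        # 1-2. Capture and process the current state of the VM
        screenshot = await computer.interface.screenshot()
        # 3-4. Send visual context + instructions, receive the next action
        action = await decide_next_action(model, messages, screenshot)
        if action is None:  # the model signals the task is complete
            return messages
        # 5. Safely execute the action in the environment
        await execute_action(computer, action)
        # 6. Record the step and repeat
        messages.append({"role": "assistant", "content": str(action)})
```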

While the core concept remains the same across all agent loops, different AI models require specialized handling for optimal performance. To address this, the framework provides four agent loop implementations, each designed for a different computer-use modality.

| Agent Loop | Supported Models | Description | Set-Of-Marks |
|:-----------|:-----------------|:------------|:-------------|
| `AgentLoop.OPENAI` | • `computer_use_preview` | Use OpenAI Operator CUA Preview model | Not Required |
| `AgentLoop.ANTHROPIC` | • `claude-3-5-sonnet-20240620`<br>• `claude-3-7-sonnet-20250219` | Use Anthropic Computer-Use Beta Tools | Not Required |
| `AgentLoop.UITARS` | • `ByteDance-Seed/UI-TARS-1.5-7B` | Uses ByteDance's UI-TARS 1.5 model | Not Required |
| `AgentLoop.OMNI` | • `claude-3-5-sonnet-20240620`<br>• `claude-3-7-sonnet-20250219`<br>• `gpt-4.5-preview`<br>• `gpt-4o`<br>• `gpt-4`<br>• `phi4`<br>• `phi4-mini`<br>• `gemma3`<br>• `...`<br>• `Any Ollama or OpenAI-compatible model` | Use OmniParser for element pixel-detection (SoM) and any VLMs for UI Grounding and Reasoning | OmniParser |

Each loop handles the same basic pattern we implemented manually in Part 1:
1. Take a screenshot of the VM
2. Send the screenshot and task to the AI model
3. Receive an action to perform
4. Execute the action
5. Repeat until the task is complete

### Why Different Agent Loops?

The `cua-agent` framework provides multiple agent loop implementations to abstract away the complexity of interacting with different CUA models. Each provider has unique API structures, response formats, conventions and capabilities that require specialized handling:

- **OpenAI Loop**: Uses the Responses API with a specific `computer_call_output` format for sending screenshots after actions. Requires handling safety checks and maintains a chain of requests using `previous_response_id`.

- **Anthropic Loop**: Implements a [multi-agent loop pattern](https://docs.anthropic.com/en/docs/agents-and-tools/computer-use#understanding-the-multi-agent-loop) with a sophisticated message handling system, supporting various API providers (Anthropic, Bedrock, Vertex) with token management and prompt caching capabilities.

- **UI-TARS Loop**: Requires custom message formatting and specialized parsing to extract actions from text responses using a "box token" system for UI element identification.

- **OMNI Loop**: Uses [Microsoft's OmniParser](https://github.com/microsoft/OmniParser) to create a [Set-of-Marks (SoM)](https://arxiv.org/abs/2310.11441) representation of the UI, enabling any vision-language model to interact with interfaces without specialized UI training.

These abstractions allow you to easily switch between providers without changing your application code. All loop implementations are available in the [cua-agent GitHub repository](https://github.com/trycua/cua/tree/main/libs/agent/agent/providers).
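
For a concrete sense of what each loop abstracts away, here is the shape of the `computer_call_output` message the OpenAI loop sends back after executing an action. This mirrors the `call_output` dictionary constructed in the framework's `agent.py` (reproduced later in this document); the `call_id` value below is illustrative:

```python
# Shape of the screenshot-bearing output the OpenAI loop returns after an action.
# Mirrors the `call_output` dict built in agent.py; the call_id here is made up.
call_output = {
    "type": "computer_call_output",
    "call_id": "call_abc123",              # links this output to the model's computer_call
    "acknowledged_safety_checks": [],      # safety checks the client has approved
    "output": {
        "type": "input_image",
        "image_url": "data:image/png;base64,<screenshot bytes>",
    },
}
```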

Choosing the right agent loop depends not only on your API access and technical requirements but also on the specific tasks you need to accomplish. To make an informed decision, it's helpful to understand how these underlying models perform across different computing environments – from desktop operating systems to web browsers and mobile interfaces.

## Computer-Use Model Capabilities

The performance of different Computer-Use models varies significantly across tasks. These benchmark evaluations measure an agent's ability to follow instructions and complete real-world tasks in different computing environments.

| Benchmark type | Benchmark                                                                                                                                       | UI-TARS-1.5 | OpenAI CUA | Claude 3.7 | Previous SOTA       | Human       |
|----------------|--------------------------------------------------------------------------------------------------------------------------------------------------|-------------|-------------|-------------|----------------------|-------------|
| **Computer Use** | [OSWorld](https://arxiv.org/abs/2404.07972) (100 steps)                                                                                        | **42.5**     | 36.4        | 28          | 38.1 (200 steps)     | 72.4        |
|                | [Windows Agent Arena](https://arxiv.org/abs/2409.08264) (50 steps)                                                                              | **42.1**     | -           | -           | 29.8                 | -           |
| **Browser Use**  | [WebVoyager](https://arxiv.org/abs/2401.13919)                                                                                                 | 84.8         | **87**      | 84.1        | 87                   | -           |
|                | [Online-Mind2web](https://arxiv.org/abs/2504.01382)                                                                                              | **75.8**     | 71          | 62.9        | 71                   | -           |
| **Phone Use**    | [Android World](https://arxiv.org/abs/2405.14573)                                                                                              | **64.2**     | -           | -           | 59.5                 | -           |

### When to Use Each Loop

- **AgentLoop.OPENAI**: Choose when you have OpenAI Tier 3 access and need the most capable computer-use agent for web-based tasks. Uses the same [OpenAI Computer-Use Loop](https://platform.openai.com/docs/guides/tools-computer-use) as Part 1, delivering strong performance on browser-based benchmarks.

- **AgentLoop.ANTHROPIC**: Ideal for users with Anthropic API access who need strong reasoning capabilities with computer-use abilities. Works with `claude-3-5-sonnet-20240620` and `claude-3-7-sonnet-20250219` models following [Anthropic's Computer-Use tools](https://docs.anthropic.com/en/docs/agents-and-tools/computer-use#understanding-the-multi-agent-loop).

- **AgentLoop.UITARS**: Best for OS/desktop automation and latency-sensitive scenarios, as UI-TARS-1.5 leads the OS benchmarks above. Requires running the model locally or accessing it through compatible endpoints (e.g. on Hugging Face).

- **AgentLoop.OMNI**: The most flexible option that works with virtually any vision-language model including local and open-source ones. Perfect for cost-effective development or when you need to use models without native computer-use capabilities.

Now that we understand the capabilities and strengths of the different models, let's see how straightforward it is to implement a Computer-Use Agent with the `cua-agent` framework.

## Creating Your First Computer-Use Agent

With the `cua-agent` framework, creating a Computer-Use Agent becomes remarkably straightforward. The framework handles all the complexities of model interaction, screenshot processing, and action execution behind the scenes. Let's look at a simple example of how to build your first agent:

**How to run this example:**

1. Create a new file named `simple_task.py` in your text editor or IDE (like VS Code, PyCharm, or Cursor)
2. Copy and paste the following code:

```python
import asyncio
from computer import Computer
from agent import ComputerAgent

async def run_simple_task():
    async with Computer() as macos_computer:
        # Create agent with OpenAI loop
        agent = ComputerAgent(
            model="openai/computer-use-preview",
            tools=[macos_computer]
        )
        
        # Define a simple task
        task = "Open Safari and search for 'Python tutorials'"
        
        # Run the task and process responses
        async for result in agent.run(task):
            print(f"Action: {result.get('text')}")

# Run the example
if __name__ == "__main__":
    asyncio.run(run_simple_task())
```

3. Save the file
4. Open a terminal, navigate to your project directory, and run:
   ```bash
   python simple_task.py
   ```

5. The code will initialize the macOS virtual machine, create an agent, and execute the task of opening Safari and searching for Python tutorials.

You can also run this in a VS Code notebook:
1. Create a new notebook in VS Code (.ipynb file)
2. Copy the code into a cell (without the `if __name__ == "__main__":` part)
3. Run the cell to execute the code

You can find the full code in our [notebook](https://github.com/trycua/cua/blob/main/notebooks/blog/build-your-own-operator-on-macos-2.ipynb).

Compare this to the manual implementation from Part 1: we've reduced dozens of lines of code to just a few. The `cua-agent` framework handles all the complex logic internally, letting you focus on the overarching agentic system.

## Working with Multiple Tasks

Another advantage of the `cua-agent` framework is how easily it chains multiple tasks. Instead of managing complex state between tasks, you can simply provide a sequence of instructions to be executed in order:

**How to run this example:**

1. Create a new file named `multi_task.py` with the following code:

```python
import asyncio
from computer import Computer
from agent import ComputerAgent

async def run_multi_task_workflow():
    async with Computer() as macos_computer:
        agent = ComputerAgent(
            model="anthropic/claude-3-5-sonnet-20241022",
            tools=[macos_computer]
        )
        
        tasks = [
            "Open Safari and go to github.com",
            "Search for 'trycua/cua'",
            "Open the repository page",
            "Click on the 'Issues' tab",
            "Read the first open issue"
        ]
        
        for i, task in enumerate(tasks):
            print(f"\nTask {i+1}/{len(tasks)}: {task}")
            async for result in agent.run(task):
                # Print just the action description for brevity
                if result.get("text"):
                    print(f"  → {result.get('text')}")
            print(f"✅ Task {i+1} completed")

if __name__ == "__main__":
    asyncio.run(run_multi_task_workflow())
```

2. Save the file
3. Make sure you have set your Anthropic API key:
   ```bash
   export ANTHROPIC_API_KEY=your_anthropic_key_here
   ```
4. Run the script:
   ```bash
   python multi_task.py
   ```

This pattern is particularly useful for creating workflows that navigate through multiple steps of an application or process. The agent maintains visual context between tasks, making it more likely to successfully complete complex sequences of actions.

## Understanding the Response Format

Each action taken by the agent returns a structured response following the OpenAI Agent SDK specification. This standardized format makes it easy to extract detailed information about what the agent is doing and why:

```python
async for result in agent.run(task):
    # Basic information
    print(f"Response ID: {result.get('id')}")
    print(f"Response Text: {result.get('text')}")
    
    # Detailed token usage statistics
    usage = result.get('usage')
    if usage:
        print(f"Input Tokens: {usage.get('input_tokens')}")
        print(f"Output Tokens: {usage.get('output_tokens')}")
    
    # Reasoning and actions
    for output in result.get('output', []):
        if output.get('type') == 'reasoning':
            print(f"Reasoning: {output.get('summary', [{}])[0].get('text')}")
        elif output.get('type') == 'computer_call':
            action = output.get('action', {})
            print(f"Action: {action.get('type')} at ({action.get('x')}, {action.get('y')})")
```

This structured format allows you to:
- Log detailed information about agent actions
- Provide real-time feedback to users
- Track token usage for cost monitoring (see the example below)
- Access the reasoning behind decisions for debugging or user explanation
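
For example, because token usage is surfaced on every response, a rough cost tracker takes only a few lines. The per-token prices below are placeholders; substitute your provider's actual rates:

```python
# Rough cost tracking across a run; the price constants are placeholders.
INPUT_PRICE_PER_1M = 3.00    # hypothetical USD per 1M input tokens
OUTPUT_PRICE_PER_1M = 15.00  # hypothetical USD per 1M output tokens

total_cost = 0.0
async for result in agent.run(task):
    usage = result.get("usage") or {}
    total_cost += usage.get("input_tokens", 0) / 1_000_000 * INPUT_PRICE_PER_1M
    total_cost += usage.get("output_tokens", 0) / 1_000_000 * OUTPUT_PRICE_PER_1M
print(f"Estimated run cost: ${total_cost:.4f}")
```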

## Using Local Models with OMNI

One of the most powerful features of the framework is the ability to use local models via the OMNI loop. This approach dramatically reduces costs while maintaining acceptable reliability for many agentic workflows:

**How to run this example:**

1. First, you'll need to install Ollama for running local models:
   - Visit [ollama.com](https://ollama.com) and download the installer for your OS
   - Follow the installation instructions
   - Pull the Gemma 3 model:
     ```bash
     ollama pull gemma3:4b-it-q4_K_M
     ```

2. Create a file named `local_model.py` with this code:

```python
import asyncio
from computer import Computer
from agent import ComputerAgent

async def run_with_local_model():
    async with Computer() as macos_computer:
        agent = ComputerAgent(
            model="omniparser+ollama_chat/gemma3",
            tools=[macos_computer]
        )
        
        task = "Open the Calculator app and perform a simple calculation"
        
        async for result in agent.run(task):
            print(f"Action: {result.get('text')}")

if __name__ == "__main__":
    asyncio.run(run_with_local_model())
```

3. Run the script:
   ```bash
   python local_model.py
   ```

You can also use other local model servers with the OAICOMPAT provider, which enables compatibility with any API endpoint following the OpenAI API structure:

```python
agent = ComputerAgent(
    model=LLM(
        provider=LLMProvider.OAICOMPAT,
        name="gemma-3-12b-it",
        provider_base_url="http://localhost:1234/v1"  # LM Studio endpoint
    ),
    tools=[macos_computer]
)
```

Common local endpoints include:
- LM Studio: `http://localhost:1234/v1`
- vLLM: `http://localhost:8000/v1`
- LocalAI: `http://localhost:8080/v1`
- Ollama with OpenAI compat: `http://localhost:11434/v1`
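
Switching between these servers only requires changing the base URL and model name. For instance, pointing the same agent at Ollama's OpenAI-compatible endpoint might look like this (the model name is an assumption; use whatever model you have pulled locally):

```python
# Same pattern as above, pointed at Ollama's OpenAI-compatible endpoint.
# The model name is illustrative; use the name of a model you have pulled.
agent = ComputerAgent(
    model=LLM(
        provider=LLMProvider.OAICOMPAT,
        name="gemma3",
        provider_base_url="http://localhost:11434/v1"  # Ollama endpoint
    ),
    tools=[macos_computer]
)
```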

This approach is perfect for:
- Development and testing without incurring API costs
- Offline or air-gapped environments where API access isn't possible
- Privacy-sensitive applications where data can't leave your network
- Experimenting with different models to find the best fit for your use case

## Deploying and Using UI-TARS

UI-TARS is ByteDance's Computer-Use model designed for navigating OS-level interfaces. It shows excellent performance on desktop OS tasks. To use UI-TARS, you'll first need to deploy the model.

### Deployment Options

1. **Local Deployment**: Follow the [UI-TARS deployment guide](https://github.com/bytedance/UI-TARS/blob/main/README_deploy.md) to run the model locally.

2. **Hugging Face Endpoint**: Deploy UI-TARS on Hugging Face Inference Endpoints, which will give you a URL like:
   `https://**************.us-east-1.aws.endpoints.huggingface.cloud/v1`

3. **Using with cua-agent**: Once deployed, you can use UI-TARS with the cua-agent framework:

```python
agent = ComputerAgent(
    model=LLM(
        provider=LLMProvider.OAICOMPAT, 
        name="tgi", 
        provider_base_url="https://**************.us-east-1.aws.endpoints.huggingface.cloud/v1"
    ),
    tools=[macos_computer]
)
```

UI-TARS is particularly useful for desktop automation tasks, as it shows the highest performance on OS-level benchmarks like OSWorld and Windows Agent Arena.

## Understanding Agent Responses in Detail

The `run()` method of your agent yields structured responses that follow the OpenAI Agent SDK specification. This provides a rich set of information beyond just the basic action text:

```python
async for result in agent.run(task):
    # Basic ID and text
    print("Response ID:", result.get("id"))
    print("Response Text:", result.get("text"))

    # Token usage statistics
    usage = result.get("usage")
    if usage:
        print("\nUsage Details:")
        print(f"  Input Tokens: {usage.get('input_tokens')}")
        if "input_tokens_details" in usage:
            print(f"  Input Tokens Details: {usage.get('input_tokens_details')}")
        print(f"  Output Tokens: {usage.get('output_tokens')}")
        if "output_tokens_details" in usage:
            print(f"  Output Tokens Details: {usage.get('output_tokens_details')}")
        print(f"  Total Tokens: {usage.get('total_tokens')}")

    # Detailed reasoning and actions
    outputs = result.get("output", [])
    for output in outputs:
        output_type = output.get("type")
        if output_type == "reasoning":
            print("\nReasoning:")
            for summary in output.get("summary", []):
                print(f"  {summary.get('text')}")
        elif output_type == "computer_call":
            action = output.get("action", {})
            print("\nComputer Action:")
            print(f"  Type: {action.get('type')}")
            print(f"  Position: ({action.get('x')}, {action.get('y')})")
            if action.get("text"):
                print(f"  Text: {action.get('text')}")
```

This detailed information is invaluable for debugging, logging, and understanding the agent's decision-making process in an agentic system. More details can be found in the [OpenAI Agent SDK Specification](https://platform.openai.com/docs/guides/responses-vs-chat-completions).

## Building a Gradio UI

For a visual interface to your agent, the package also includes a Gradio UI:

**How to run the Gradio UI:**

1. Create a file named `launch_ui.py` with the following code:

```python
from agent.ui.gradio.app import create_gradio_ui

# Create and launch the UI
if __name__ == "__main__":
    app = create_gradio_ui()
    app.launch(share=False)  # Set share=False for local access only
```

2. Install the UI dependencies if you haven't already:
   ```bash
   pip install "cua-agent[ui]"
   ```

3. Run the script:
   ```bash
   python launch_ui.py
   ```

4. Open your browser to the displayed URL (usually http://127.0.0.1:7860)

**Creating a Shareable Link (Optional):**

You can also create a temporary public URL to access your Gradio UI from anywhere:

```python
# In launch_ui.py
if __name__ == "__main__":
    app = create_gradio_ui()
    app.launch(share=True)  # Creates a public link
```

When you run this, Gradio will display both a local URL and a public URL like:
```
Running on local URL:  http://127.0.0.1:7860
Running on public URL: https://abcd1234.gradio.live
```

**Security Note:** Be cautious when sharing your Gradio UI publicly:
- The public URL gives anyone with the link full access to your agent
- Consider using basic authentication for additional protection:
  ```python
  app.launch(share=True, auth=("username", "password"))
  ```
- Only use this feature for personal or team use, not for production environments
- The temporary link expires when you stop the Gradio application

The Gradio UI provides:
- Model provider selection
- Agent loop selection
- Task input field
- Real-time display of VM screenshots
- Action history

### Setting API Keys for the UI

To use the UI with different providers, set your API keys as environment variables:

```bash
# For OpenAI models
export OPENAI_API_KEY=your_openai_key_here

# For Anthropic models
export ANTHROPIC_API_KEY=your_anthropic_key_here

# Launch with both keys set
OPENAI_API_KEY=your_key ANTHROPIC_API_KEY=your_key python launch_ui.py
```

### UI Settings Persistence

The Gradio UI automatically saves your configuration to maintain your preferences between sessions:

- Settings such as Agent Loop, Model Choice, and Custom Base URL are saved to `.gradio_settings.json` in the project's root directory
- These settings are loaded automatically when you restart the UI
- API keys entered in the custom provider field are **not** saved for security reasons
- It's recommended to add `.gradio_settings.json` to your `.gitignore` file

## Advanced Example: GitHub Repository Workflow

Let's look at a more complex example that automates a GitHub workflow:

**How to run this advanced example:**

1. Create a file named `github_workflow.py` with the following code:

```python
import asyncio
import logging
from computer import Computer
from agent import ComputerAgent

async def github_workflow():
    async with Computer(verbosity=logging.INFO) as macos_computer:
        agent = ComputerAgent(
            model="openai/computer-use-preview",
            save_trajectory=True,  # Save screenshots for debugging
            only_n_most_recent_images=3,  # Only keep last 3 images in context
            verbosity=logging.INFO,
            tools=[macos_computer]
        )
        
        tasks = [
            "Look for a repository named trycua/cua on GitHub.",
            "Check the open issues, open the most recent one and read it.",
            "Clone the repository in users/lume/projects if it doesn't exist yet.",
            "Open the repository with Cursor (on the dock, black background and white cube icon).",
            "From Cursor, open Composer if not already open.",
            "Focus on the Composer text area, then write and submit a task to help resolve the GitHub issue.",
        ]
        
        for i, task in enumerate(tasks):
            print(f"\nExecuting task {i+1}/{len(tasks)}: {task}")
            async for result in agent.run(task):
                print(f"Action: {result.get('text')}")
            print(f"✅ Task {i+1}/{len(tasks)} completed")

if __name__ == "__main__":
    asyncio.run(github_workflow())
```

2. Make sure your OpenAI API key is set:
   ```bash
   export OPENAI_API_KEY=your_openai_key_here
   ```

3. Run the script:
   ```bash
   python github_workflow.py
   ```

4. Watch as the agent completes the entire workflow:
   - The agent will navigate to GitHub
   - Find and investigate issues in the repository
   - Clone the repository to the local machine
   - Open it in Cursor
   - Use Cursor's AI features to work on a solution

This example:
1. Searches GitHub for a repository
2. Reads an issue
3. Clones the repository
4. Opens it in an IDE
5. Uses AI to write a solution

## Comparing Implementation Approaches

Let's compare our manual implementation from Part 1 with the framework approach:

### Manual Implementation (Part 1)
- Required writing custom code for the interaction loop
- Needed explicit handling of different action types
- Required direct management of the OpenAI API calls
- Around 50-100 lines of code for basic functionality
- Limited to OpenAI's computer-use model

### Framework Implementation (Part 2)
- Abstracts the interaction loop
- Handles all action types automatically
- Manages API calls internally
- Only 10-15 lines of code for the same functionality
- Works with multiple model providers
- Includes UI capabilities

## Conclusion

The `cua-agent` framework transforms what was a complex implementation task into a simple, high-level interface for building Computer-Use Agents. By abstracting away the technical details, it lets you focus on defining the tasks rather than the machinery.

### When to Use Each Approach
- **Manual Implementation (Part 1)**: When you need complete control over the interaction loop or are implementing a custom solution
- **Framework (Part 2)**: For most applications where you want to quickly build and deploy Computer-Use Agents

### Next Steps
With the basics covered, you might want to explore:
- Customizing the agent's behavior with additional parameters
- Building more complex workflows spanning multiple applications
- Integrating your agent into other applications
- Contributing to the open-source project on GitHub

### Resources
- [cua-agent GitHub repository](https://github.com/trycua/cua/tree/main/libs/agent)
- [Agent Notebook Examples](https://github.com/trycua/cua/blob/main/notebooks/agent_nb.ipynb)
- [OpenAI Agent SDK Specification](https://platform.openai.com/docs/api-reference/responses)
- [Anthropic API Documentation](https://docs.anthropic.com/en/api/getting-started)
- [UI-TARS GitHub](https://github.com/ByteDance/UI-TARS)
- [OmniParser GitHub](https://github.com/microsoft/OmniParser)

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/agent.py:
--------------------------------------------------------------------------------

```python
"""
ComputerAgent - Main agent class that selects and runs agent loops
"""

import asyncio
from pathlib import Path
from typing import Dict, List, Any, Optional, AsyncGenerator, Union, cast, Callable, Set, Tuple

from litellm.responses.utils import Usage

from .types import (
    Messages,
    AgentCapability,
    ToolError,
    IllegalArgumentError
)
from .responses import make_tool_error_item, replace_failed_computer_calls_with_function_calls
from .decorators import find_agent_config
import json
import litellm
import litellm.utils
import inspect
from .adapters import (
    HuggingFaceLocalAdapter,
    HumanAdapter,
    MLXVLMAdapter,
)
from .callbacks import (
    ImageRetentionCallback, 
    LoggingCallback, 
    TrajectorySaverCallback, 
    BudgetManagerCallback,
    TelemetryCallback,
    OperatorNormalizerCallback,
    PromptInstructionsCallback,
)
from .computers import (
    AsyncComputerHandler,
    is_agent_computer,
    make_computer_handler
)

def assert_callable_with(f, *args, **kwargs):
    """Assert that `f` can be called with the given arguments.

    Returns True on success; raises IllegalArgumentError otherwise.
    """
    try:
        inspect.signature(f).bind(*args, **kwargs)
        return True
    except TypeError as e:
        sig = inspect.signature(f)
        raise IllegalArgumentError(f"Expected {sig}, got args={args} kwargs={kwargs}") from e

def get_json(obj: Any, max_depth: int = 10) -> Any:
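    """Recursively convert `obj` into JSON-serializable data.

    Uses `model_dump()` when available, replaces Computer objects and
    circular references with placeholder strings, and truncates anything
    nested deeper than `max_depth`.
    """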
    def custom_serializer(o: Any, depth: int = 0, seen: Optional[Set[int]] = None) -> Any:
        if seen is None:
            seen = set()
        
        # Use model_dump() if available
        if hasattr(o, 'model_dump'):
            return o.model_dump()
        
        # Check depth limit
        if depth > max_depth:
            return f"<max_depth_exceeded:{max_depth}>"
        
        # Check for circular references using object id
        obj_id = id(o)
        if obj_id in seen:
            return f"<circular_reference:{type(o).__name__}>"
        
        # Handle Computer objects
        if hasattr(o, '__class__') and 'computer' in getattr(o, '__class__').__name__.lower():
            return f"<computer:{o.__class__.__name__}>"

        # Handle objects with __dict__
        if hasattr(o, '__dict__'):
            seen.add(obj_id)
            try:
                result = {}
                for k, v in o.__dict__.items():
                    if v is not None:
                        # Recursively serialize with updated depth and seen set
                        serialized_value = custom_serializer(v, depth + 1, seen.copy())
                        result[k] = serialized_value
                return result
            finally:
                seen.discard(obj_id)
        
        # Handle common types that might contain nested objects
        elif isinstance(o, dict):
            seen.add(obj_id)
            try:
                return {
                    k: custom_serializer(v, depth + 1, seen.copy())
                    for k, v in o.items()
                    if v is not None
                }
            finally:
                seen.discard(obj_id)
        
        elif isinstance(o, (list, tuple, set)):
            seen.add(obj_id)
            try:
                return [
                    custom_serializer(item, depth + 1, seen.copy())
                    for item in o
                    if item is not None
                ]
            finally:
                seen.discard(obj_id)
        
        # For basic types that json.dumps can handle
        elif isinstance(o, (str, int, float, bool)) or o is None:
            return o
        
        # Fallback to string representation
        else:
            return str(o)
    
    def remove_nones(obj: Any) -> Any:
        if isinstance(obj, dict):
            return {k: remove_nones(v) for k, v in obj.items() if v is not None}
        elif isinstance(obj, list):
            return [remove_nones(item) for item in obj if item is not None]
        return obj
    
    # Serialize with circular reference and depth protection
    serialized = custom_serializer(obj)
    
    # Convert to JSON string and back to ensure JSON compatibility
    json_str = json.dumps(serialized)
    parsed = json.loads(json_str)
    
    # Final cleanup of any remaining None values
    return remove_nones(parsed)

def sanitize_message(msg: Any) -> Any:
    """Return a copy of the message with image_url omitted for computer_call_output messages."""
    if msg.get("type") == "computer_call_output":
        output = msg.get("output", {})
        if isinstance(output, dict):
            sanitized = msg.copy()
            sanitized["output"] = {**output, "image_url": "[omitted]"}
            return sanitized
    return msg

def get_output_call_ids(messages: List[Dict[str, Any]]) -> List[str]:
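    """Collect the call_ids of computer_call_output / function_call_output messages."""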
    call_ids = []
    for message in messages:
        if message.get("type") == "computer_call_output" or message.get("type") == "function_call_output":
            call_ids.append(message.get("call_id"))
    return call_ids

class ComputerAgent:
    """
    Main agent class that automatically selects the appropriate agent loop
    based on the model and executes tool calls.
    """
    
    def __init__(
        self,
        model: str,
        tools: Optional[List[Any]] = None,
        custom_loop: Optional[Callable] = None,
        only_n_most_recent_images: Optional[int] = None,
        callbacks: Optional[List[Any]] = None,
        instructions: Optional[str] = None,
        verbosity: Optional[int] = None,
        trajectory_dir: Optional[str | Path | dict] = None,
        max_retries: Optional[int] = 3,
        screenshot_delay: Optional[float | int] = 0.5,
        use_prompt_caching: Optional[bool] = False,
        max_trajectory_budget: Optional[float | dict] = None,
        telemetry_enabled: Optional[bool] = True,
        trust_remote_code: Optional[bool] = False,
        **kwargs
    ):
        """
        Initialize ComputerAgent.
        
        Args:
            model: Model name (e.g., "claude-3-5-sonnet-20241022", "computer-use-preview", "omni+vertex_ai/gemini-pro")
            tools: List of tools (computer objects, decorated functions, etc.)
            custom_loop: Custom agent loop function to use instead of auto-selection
            only_n_most_recent_images: If set, only keep the N most recent images in message history. Adds ImageRetentionCallback automatically.
            callbacks: List of AsyncCallbackHandler instances for preprocessing/postprocessing
            instructions: Optional system instructions to be passed to the model
            verbosity: Logging level (logging.DEBUG, logging.INFO, etc.). If set, adds LoggingCallback automatically
            trajectory_dir: If set, saves trajectory data (screenshots, responses) to this directory. Adds TrajectorySaverCallback automatically.
            max_retries: Maximum number of retries for failed API calls
            screenshot_delay: Delay before screenshots in seconds
            use_prompt_caching: If set, use prompt caching to avoid reprocessing the same prompt. Intended for use with anthropic providers.
            max_trajectory_budget: If set, adds BudgetManagerCallback to track usage costs and stop when budget is exceeded
            telemetry_enabled: If set, adds TelemetryCallback to track anonymized usage data. Enabled by default.
            trust_remote_code: If set, trust remote code when loading local models. Disabled by default.
            **kwargs: Additional arguments passed to the agent loop
        """        
        # If the loop is "human/human", we need to prefix a grounding model fallback
        if model in ["human/human", "human"]:
            model = "openai/computer-use-preview+human/human"
        
        self.model = model
        self.tools = tools or []
        self.custom_loop = custom_loop
        self.only_n_most_recent_images = only_n_most_recent_images
        self.callbacks = callbacks or []
        self.instructions = instructions
        self.verbosity = verbosity
        self.trajectory_dir = trajectory_dir
        self.max_retries = max_retries
        self.screenshot_delay = screenshot_delay
        self.use_prompt_caching = use_prompt_caching
        self.telemetry_enabled = telemetry_enabled
        self.kwargs = kwargs
        self.trust_remote_code = trust_remote_code

        # == Add built-in callbacks ==

        # Prepend operator normalizer callback
        self.callbacks.insert(0, OperatorNormalizerCallback())

        # Add prompt instructions callback if provided
        if self.instructions:
            self.callbacks.append(PromptInstructionsCallback(self.instructions))

        # Add telemetry callback if telemetry_enabled is set
        if self.telemetry_enabled:
            if isinstance(self.telemetry_enabled, bool):
                self.callbacks.append(TelemetryCallback(self))
            else:
                self.callbacks.append(TelemetryCallback(self, **self.telemetry_enabled))

        # Add logging callback if verbosity is set
        if self.verbosity is not None:
            self.callbacks.append(LoggingCallback(level=self.verbosity))

        # Add image retention callback if only_n_most_recent_images is set
        if self.only_n_most_recent_images:
            self.callbacks.append(ImageRetentionCallback(self.only_n_most_recent_images))
        
        # Add trajectory saver callback if trajectory_dir is set
        if self.trajectory_dir:
            if isinstance(self.trajectory_dir, dict):
                self.callbacks.append(TrajectorySaverCallback(**self.trajectory_dir))
            elif isinstance(self.trajectory_dir, (str, Path)):
                self.callbacks.append(TrajectorySaverCallback(str(self.trajectory_dir)))
        
        # Add budget manager if max_trajectory_budget is set
        if max_trajectory_budget:
            if isinstance(max_trajectory_budget, dict):
                self.callbacks.append(BudgetManagerCallback(**max_trajectory_budget))
            else:
                self.callbacks.append(BudgetManagerCallback(max_trajectory_budget))
        
        # == Enable local model providers w/ LiteLLM ==

        # Register local model providers
        hf_adapter = HuggingFaceLocalAdapter(
            device="auto",
            trust_remote_code=self.trust_remote_code or False
        )
        human_adapter = HumanAdapter()
        mlx_adapter = MLXVLMAdapter()
        litellm.custom_provider_map = [
            {"provider": "huggingface-local", "custom_handler": hf_adapter},
            {"provider": "human", "custom_handler": human_adapter},
            {"provider": "mlx", "custom_handler": mlx_adapter}
        ]
        litellm.suppress_debug_info = True

        # == Initialize computer agent ==

        # Find the appropriate agent loop
        if custom_loop:
            self.agent_loop = custom_loop
            self.agent_config_info = None
        else:
            config_info = find_agent_config(model)
            if not config_info:
                raise ValueError(f"No agent config found for model: {model}")
            # Instantiate the agent config class
            self.agent_loop = config_info.agent_class()
            self.agent_config_info = config_info
        
        self.tool_schemas = []
        self.computer_handler = None
        
    async def _initialize_computers(self):
        """Initialize computer objects"""
        if not self.tool_schemas:
            # Process tools and create tool schemas
            self.tool_schemas = self._process_tools()
            
            # Find computer tool and create interface adapter
            computer_handler = None
            for schema in self.tool_schemas:
                if schema["type"] == "computer":
                    computer_handler = await make_computer_handler(schema["computer"])
                    break
            self.computer_handler = computer_handler
    
    def _process_input(self, input: Messages) -> List[Dict[str, Any]]:
        """Process input messages and create schemas for the agent loop"""
        if isinstance(input, str):
            return [{"role": "user", "content": input}]
        return [get_json(msg) for msg in input]

    def _process_tools(self) -> List[Dict[str, Any]]:
        """Process tools and create schemas for the agent loop"""
        schemas = []
        
        for tool in self.tools:
            # Check if it's a computer object (has interface attribute)
            if is_agent_computer(tool):
                # This is a computer tool - will be handled by agent loop
                schemas.append({
                    "type": "computer",
                    "computer": tool
                })
            elif callable(tool):
                # Use litellm.utils.function_to_dict to extract schema from docstring
                try:
                    function_schema = litellm.utils.function_to_dict(tool)
                    schemas.append({
                        "type": "function",
                        "function": function_schema
                    })
                except Exception as e:
                    print(f"Warning: Could not process tool {tool}: {e}")
            else:
                print(f"Warning: Unknown tool type: {tool}")
        
        return schemas
    
    def _get_tool(self, name: str) -> Optional[Callable]:
        """Get a tool by name"""
        for tool in self.tools:
            if hasattr(tool, '__name__') and tool.__name__ == name:
                return tool
            elif hasattr(tool, 'func') and tool.func.__name__ == name:
                return tool
        return None
    
    # ============================================================================
    # AGENT RUN LOOP LIFECYCLE HOOKS
    # ============================================================================
    
    async def _on_run_start(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]]) -> None:
        """Initialize run tracking by calling callbacks."""
        for callback in self.callbacks:
            if hasattr(callback, 'on_run_start'):
                await callback.on_run_start(kwargs, old_items)
    
    async def _on_run_end(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]], new_items: List[Dict[str, Any]]) -> None:
        """Finalize run tracking by calling callbacks."""
        for callback in self.callbacks:
            if hasattr(callback, 'on_run_end'):
                await callback.on_run_end(kwargs, old_items, new_items)
    
    async def _on_run_continue(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]], new_items: List[Dict[str, Any]]) -> bool:
        """Check if run should continue by calling callbacks."""
        for callback in self.callbacks:
            if hasattr(callback, 'on_run_continue'):
                should_continue = await callback.on_run_continue(kwargs, old_items, new_items)
                if not should_continue:
                    return False
        return True
    
    async def _on_llm_start(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Prepare messages for the LLM call by applying callbacks."""
        result = messages
        for callback in self.callbacks:
            if hasattr(callback, 'on_llm_start'):
                result = await callback.on_llm_start(result)
        return result

    async def _on_llm_end(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Postprocess messages after the LLM call by applying callbacks."""
        result = messages
        for callback in self.callbacks:
            if hasattr(callback, 'on_llm_end'):
                result = await callback.on_llm_end(result)
        return result

    async def _on_responses(self, kwargs: Dict[str, Any], responses: Dict[str, Any]) -> None:
        """Called when responses are received."""
        for callback in self.callbacks:
            if hasattr(callback, 'on_responses'):
                await callback.on_responses(get_json(kwargs), get_json(responses))
    
    async def _on_computer_call_start(self, item: Dict[str, Any]) -> None:
        """Called when a computer call is about to start."""
        for callback in self.callbacks:
            if hasattr(callback, 'on_computer_call_start'):
                await callback.on_computer_call_start(get_json(item))
    
    async def _on_computer_call_end(self, item: Dict[str, Any], result: List[Dict[str, Any]]) -> None:
        """Called when a computer call has completed."""
        for callback in self.callbacks:
            if hasattr(callback, 'on_computer_call_end'):
                await callback.on_computer_call_end(get_json(item), get_json(result))
    
    async def _on_function_call_start(self, item: Dict[str, Any]) -> None:
        """Called when a function call is about to start."""
        for callback in self.callbacks:
            if hasattr(callback, 'on_function_call_start'):
                await callback.on_function_call_start(get_json(item))
    
    async def _on_function_call_end(self, item: Dict[str, Any], result: List[Dict[str, Any]]) -> None:
        """Called when a function call has completed."""
        for callback in self.callbacks:
            if hasattr(callback, 'on_function_call_end'):
                await callback.on_function_call_end(get_json(item), get_json(result))
    
    async def _on_text(self, item: Dict[str, Any]) -> None:
        """Called when a text message is encountered."""
        for callback in self.callbacks:
            if hasattr(callback, 'on_text'):
                await callback.on_text(get_json(item))
    
    async def _on_api_start(self, kwargs: Dict[str, Any]) -> None:
        """Called when an LLM API call is about to start."""
        for callback in self.callbacks:
            if hasattr(callback, 'on_api_start'):
                await callback.on_api_start(get_json(kwargs))
    
    async def _on_api_end(self, kwargs: Dict[str, Any], result: Any) -> None:
        """Called when an LLM API call has completed."""
        for callback in self.callbacks:
            if hasattr(callback, 'on_api_end'):
                await callback.on_api_end(get_json(kwargs), get_json(result))

    async def _on_usage(self, usage: Dict[str, Any]) -> None:
        """Called when usage information is received."""
        for callback in self.callbacks:
            if hasattr(callback, 'on_usage'):
                await callback.on_usage(get_json(usage))

    async def _on_screenshot(self, screenshot: Union[str, bytes], name: str = "screenshot") -> None:
        """Called when a screenshot is taken."""
        for callback in self.callbacks:
            if hasattr(callback, 'on_screenshot'):
                await callback.on_screenshot(screenshot, name)

    # ============================================================================
    # AGENT OUTPUT PROCESSING
    # ============================================================================
    
    async def _handle_item(self, item: Any, computer: Optional[AsyncComputerHandler] = None, ignore_call_ids: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Handle each item; may cause a computer action + screenshot."""
        call_id = item.get("call_id")
        if ignore_call_ids and call_id and call_id in ignore_call_ids:
            return []
        
        item_type = item.get("type", None)
        
        if item_type == "message":
            await self._on_text(item)
            # # Print messages
            # if item.get("content"):
            #     for content_item in item.get("content"):
            #         if content_item.get("text"):
            #             print(content_item.get("text"))
            return []
        
        try:
            if item_type == "computer_call":
                await self._on_computer_call_start(item)
                if not computer:
                    raise ValueError("Computer handler is required for computer calls")

                # Perform computer actions
                action = item.get("action")
                action_type = action.get("type")
                if action_type is None:
                    print(f"Action type cannot be `None`: action={action}, action_type={action_type}")
                    return []
                
                # Extract action arguments (all fields except 'type')
                action_args = {k: v for k, v in action.items() if k != "type"}
                
                # print(f"{action_type}({action_args})")
                
                # Execute the computer action
                computer_method = getattr(computer, action_type, None)
                if computer_method:
                    assert_callable_with(computer_method, **action_args)
                    await computer_method(**action_args)
                else:
                    raise ToolError(f"Unknown computer action: {action_type}")
                
                # Take screenshot after action
                if self.screenshot_delay and self.screenshot_delay > 0:
                    await asyncio.sleep(self.screenshot_delay)
                screenshot_base64 = await computer.screenshot()
                await self._on_screenshot(screenshot_base64, "screenshot_after")
                
                # Handle safety checks
                pending_checks = item.get("pending_safety_checks", [])
                acknowledged_checks = []
                for check in pending_checks:
                    check_message = check.get("message", str(check))
                    acknowledged_checks.append(check)
                    # TODO: implement a callback for safety checks
                    # if acknowledge_safety_check_callback(check_message, allow_always=True):
                    #     acknowledged_checks.append(check)
                    # else:
                    #     raise ValueError(f"Safety check failed: {check_message}")
                
                # Create call output
                call_output = {
                    "type": "computer_call_output",
                    "call_id": item.get("call_id"),
                    "acknowledged_safety_checks": acknowledged_checks,
                    "output": {
                        "type": "input_image",
                        "image_url": f"data:image/png;base64,{screenshot_base64}",
                    },
                }
                
                # # Additional URL safety checks for browser environments
                # if await computer.get_environment() == "browser":
                #     current_url = await computer.get_current_url()
                #     call_output["output"]["current_url"] = current_url
                #     # TODO: implement a callback for URL safety checks
                #     # check_blocklisted_url(current_url)
                
                result = [call_output]
                await self._on_computer_call_end(item, result)
                return result
            
            if item_type == "function_call":
                await self._on_function_call_start(item)
                # Perform function call
                function = self._get_tool(item.get("name"))
                if not function:
                    raise ToolError(f"Function {item.get("name")} not found")
            
                args = json.loads(item.get("arguments"))

                # Validate arguments before execution
                assert_callable_with(function, **args)

                # Execute function - use asyncio.to_thread for non-async functions
                if inspect.iscoroutinefunction(function):
                    result = await function(**args)
                else:
                    result = await asyncio.to_thread(function, **args)
            
                # Create function call output
                call_output = {
                    "type": "function_call_output",
                    "call_id": item.get("call_id"),
                    "output": str(result),
                }
            
                result = [call_output]
                await self._on_function_call_end(item, result)
                return result
        except ToolError as e:
            return [make_tool_error_item(repr(e), call_id)]

        return []

    # ============================================================================
    # MAIN AGENT LOOP
    # ============================================================================
    
    async def run(
        self,
        messages: Messages,
        stream: bool = False,
        **kwargs
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Run the agent with the given messages using Computer protocol handler pattern.
        
        Args:
            messages: List of message dictionaries
            stream: Whether to stream the response
            **kwargs: Additional arguments
            
        Returns:
            AsyncGenerator that yields response chunks
        """
        if not self.agent_config_info:
            raise ValueError("Agent configuration not found")
        
        capabilities = self.get_capabilities()
        if "step" not in capabilities:
            raise ValueError(f"Agent loop {self.agent_config_info.agent_class.__name__} does not support step predictions")

        await self._initialize_computers()
        
        # Merge kwargs
        merged_kwargs = {**self.kwargs, **kwargs}
        
        old_items = self._process_input(messages)
        new_items = []

        # Initialize run tracking
        run_kwargs = {
            "messages": messages,
            "stream": stream,
            "model": self.model,
            "agent_loop": self.agent_config_info.agent_class.__name__,
            **merged_kwargs
        }
        await self._on_run_start(run_kwargs, old_items)

        while new_items[-1].get("role") != "assistant" if new_items else True:
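            # The loop runs until the most recent generated item is a plain assistant
            # message, i.e. the model produced a final answer instead of another tool call.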
            # Lifecycle hook: Check if we should continue based on callbacks (e.g., budget manager)
            should_continue = await self._on_run_continue(run_kwargs, old_items, new_items)
            if not should_continue:
                break

            # Lifecycle hook: Prepare messages for the LLM call
            # Use cases:
            # - PII anonymization
            # - Image retention policy
            combined_messages = old_items + new_items
            combined_messages = replace_failed_computer_calls_with_function_calls(combined_messages)
            preprocessed_messages = await self._on_llm_start(combined_messages)
            
            loop_kwargs = {
                "messages": preprocessed_messages,
                "model": self.model,
                "tools": self.tool_schemas,
                "stream": False,
                "computer_handler": self.computer_handler,
                "max_retries": self.max_retries,
                "use_prompt_caching": self.use_prompt_caching,
                **merged_kwargs
            }

            # Run agent loop iteration
            result = await self.agent_loop.predict_step(
                **loop_kwargs,
                _on_api_start=self._on_api_start,
                _on_api_end=self._on_api_end,
                _on_usage=self._on_usage,
                _on_screenshot=self._on_screenshot,
            )
            result = get_json(result)
            
            # Lifecycle hook: Postprocess messages after the LLM call
            # Use cases:
            # - PII deanonymization (if you want tool calls to see PII)
            result["output"] = await self._on_llm_end(result.get("output", []))
            await self._on_responses(loop_kwargs, result)
            
            # Yield agent response
            yield result

            # Add agent response to new_items
            new_items += result.get("output")

            # Get output call ids
            output_call_ids = get_output_call_ids(result.get("output", []))

            # Handle computer actions
            for item in result.get("output"):
                partial_items = await self._handle_item(item, self.computer_handler, ignore_call_ids=output_call_ids)
                new_items += partial_items

                # Yield partial response
                yield {
                    "output": partial_items,
                    "usage": Usage(
                        prompt_tokens=0,
                        completion_tokens=0,
                        total_tokens=0,
                    )
                }
        
        await self._on_run_end(run_kwargs, old_items, new_items)
    
    async def predict_click(
        self,
        instruction: str,
        image_b64: Optional[str] = None
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates based on image and instruction.
        
        Args:
            instruction: Instruction for where to click
            image_b64: Base64 encoded image (optional, will take screenshot if not provided)
            
        Returns:
            None or tuple with (x, y) coordinates
        """
        if not self.agent_config_info:
            raise ValueError("Agent configuration not found")
        
        capabilities = self.get_capabilities()
        if "click" not in capabilities:
            raise ValueError(f"Agent loop {self.agent_config_info.agent_class.__name__} does not support click predictions")
        if hasattr(self.agent_loop, 'predict_click'):
            if not image_b64:
                if not self.computer_handler:
                    raise ValueError("Computer tool or image_b64 is required for predict_click")
                image_b64 = await self.computer_handler.screenshot()
            return await self.agent_loop.predict_click(
                model=self.model,
                image_b64=image_b64,
                instruction=instruction
            )
        return None
    
    def get_capabilities(self) -> List[AgentCapability]:
        """
        Get list of capabilities supported by the current agent config.
        
        Returns:
            List of capability strings (e.g., ["step", "click"])
        """
        if not self.agent_config_info:
            raise ValueError("Agent configuration not found")
        
        if hasattr(self.agent_loop, 'get_capabilities'):
            return self.agent_loop.get_capabilities()
        return ["step"]  # Default capability
```

--------------------------------------------------------------------------------
/libs/python/agent/agent/human_tool/ui.py:
--------------------------------------------------------------------------------

```python
import gradio as gr
import json
import time
from typing import List, Dict, Any, Optional
from datetime import datetime
import requests
from .server import completion_queue
import base64
import io
from PIL import Image

class HumanCompletionUI:
    def __init__(self, server_url: str = "http://localhost:8002"):
        self.server_url = server_url
        self.current_call_id: Optional[str] = None
        self.refresh_interval = 2.0  # seconds
        self.last_image = None  # Store the last image for display
        # Track current interactive action controls
        self.current_action_type: str = "click"
        self.current_button: str = "left"
        self.current_scroll_x: int = 0
        self.current_scroll_y: int = -120
    
    def format_messages_for_chatbot(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format messages for display in gr.Chatbot with type='messages'."""
        formatted = []
        for msg in messages:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            tool_calls = msg.get("tool_calls", [])
            
            # Handle different content formats
            if isinstance(content, list):
                # Multi-modal content - can include text and images
                formatted_content = []
                for item in content:
                    if item.get("type") == "text":
                        text = item.get("text", "")
                        if text.strip():  # Only add non-empty text
                            formatted_content.append(text)
                    elif item.get("type") == "image_url":
                        image_url = item.get("image_url", {}).get("url", "")
                        if image_url:
                            # Check if it's a base64 image or URL
                            if image_url.startswith("data:image"):
                                # For base64 images, decode and create gr.Image
                                try:
                                    header, data = image_url.split(",", 1)
                                    image_data = base64.b64decode(data)
                                    image = Image.open(io.BytesIO(image_data))
                                    formatted_content.append(gr.Image(value=image))
                                except Exception as e:
                                    print(f"Error loading image: {e}")
                                    formatted_content.append(f"[Image loading error: {e}]")
                            else:
                                # For URL images, create gr.Image with URL
                                formatted_content.append(gr.Image(value=image_url))
                
                # Determine final content format
                if len(formatted_content) == 1:
                    content = formatted_content[0]
                elif len(formatted_content) > 1:
                    content = formatted_content
                else:
                    content = "[Empty content]"
            
            # Ensure role is valid for Gradio Chatbot
            if role not in ["user", "assistant"]:
                role = "assistant" if role == "system" else "user"
            
            # Invert roles for better display in human UI context
            # (what the AI says becomes "user", what human should respond becomes "assistant")
            if role == "user":
                role = "assistant"
            else:
                role = "user"
            
            # Add the main message if it has content
            if content and str(content).strip():
                formatted.append({"role": role, "content": content})
            
            # Handle tool calls - create separate messages for each tool call
            if tool_calls:
                for tool_call in tool_calls:
                    function_name = tool_call.get("function", {}).get("name", "unknown")
                    arguments_str = tool_call.get("function", {}).get("arguments", "{}")
                    
                    try:
                        # Parse arguments to format them nicely
                        arguments = json.loads(arguments_str)
                        formatted_args = json.dumps(arguments, indent=2)
                    except json.JSONDecodeError:
                        # If parsing fails, use the raw string
                        formatted_args = arguments_str
                    
                    # Create a formatted message for the tool call
                    tool_call_content = f"```json\n{formatted_args}\n```"
                    
                    formatted.append({
                        "role": role,
                        "content": tool_call_content,
                        "metadata": {"title": f"🛠️ Used {function_name}"}
                    })
        
        return formatted
    
    def get_pending_calls(self) -> List[Dict[str, Any]]:
        """Get pending calls from the server."""
        try:
            response = requests.get(f"{self.server_url}/pending", timeout=5)
            if response.status_code == 200:
                return response.json().get("pending_calls", [])
        except Exception as e:
            print(f"Error fetching pending calls: {e}")
        return []
    
    def complete_call_with_response(self, call_id: str, response: str) -> bool:
        """Complete a call with a text response."""
        try:
            response_data = {"response": response}
            response_obj = requests.post(
                f"{self.server_url}/complete/{call_id}",
                json=response_data,
                timeout=10
            )
            response_obj.raise_for_status()
            return True
        except requests.RequestException as e:
            print(f"Error completing call: {e}")
            return False
    
    def complete_call_with_tool_calls(self, call_id: str, tool_calls: List[Dict[str, Any]]) -> bool:
        """Complete a call with tool calls."""
        try:
            response_data = {"tool_calls": tool_calls}
            response_obj = requests.post(
                f"{self.server_url}/complete/{call_id}",
                json=response_data,
                timeout=10
            )
            response_obj.raise_for_status()
            return True
        except requests.RequestException as e:
            print(f"Error completing call: {e}")
            return False
    
    def complete_call(self, call_id: str, response: Optional[str] = None, tool_calls: Optional[List[Dict[str, Any]]] = None) -> bool:
        """Complete a call with either a response or tool calls."""
        try:
            response_data = {}
            if response:
                response_data["response"] = response
            if tool_calls:
                response_data["tool_calls"] = tool_calls
            
            response_obj = requests.post(
                f"{self.server_url}/complete/{call_id}",
                json=response_data,
                timeout=10
            )
            response_obj.raise_for_status()
            return True
        except requests.RequestException as e:
            print(f"Error completing call: {e}")
            return False
    
    def get_last_image_from_messages(self, messages: List[Dict[str, Any]]) -> Optional[Any]:
        """Extract the last image from the messages for display above conversation."""
        last_image = None
        
        for msg in reversed(messages):  # Start from the last message
            content = msg.get("content", "")
            
            if isinstance(content, list):
                for item in reversed(content):  # Get the last image in the message
                    if item.get("type") == "image_url":
                        image_url = item.get("image_url", {}).get("url", "")
                        if image_url:
                            if image_url.startswith("data:image"):
                                # For base64 images, decode into a PIL image
                                try:
                                    header, data = image_url.split(",", 1)
                                    image_data = base64.b64decode(data)
                                    image = Image.open(io.BytesIO(image_data))
                                    return image
                                except Exception as e:
                                    print(f"Error loading image: {e}")
                                    continue
                            else:
                                # For URL images, return the URL
                                return image_url
        
        return last_image
    
    def refresh_pending_calls(self):
        """Refresh the list of pending calls."""
        pending_calls = self.get_pending_calls()
        
        if not pending_calls:
            return (
                gr.update(choices=["latest"], value="latest"),  # dropdown
                gr.update(value=None),  # image (no image)
                gr.update(value=[]),  # chatbot (empty messages)
                gr.update(interactive=False),  # submit button
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )
        
        # Sort pending calls by created_at to get oldest first
        sorted_calls = sorted(pending_calls, key=lambda x: x.get("created_at", ""))
        
        # Create choices for dropdown
        choices = [("latest", "latest")]  # Add "latest" option first
        
        for call in sorted_calls:
            call_id = call["id"]
            model = call.get("model", "unknown")
            created_at = call.get("created_at", "")
            # Format timestamp
            try:
                dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
                time_str = dt.strftime("%H:%M:%S")
            except ValueError:
                time_str = created_at
            
            choice_label = f"{call_id[:8]}... ({model}) - {time_str}"
            choices.append((choice_label, call_id))
        
        # Default to "latest" which shows the oldest pending conversation
        selected_call_id = "latest"
        if selected_call_id == "latest" and sorted_calls:
            # Use the oldest call (first in sorted list)
            selected_call = sorted_calls[0]
            conversation = self.format_messages_for_chatbot(selected_call.get("messages", []))
            self.current_call_id = selected_call["id"]
            # Get the last image from messages
            self.last_image = self.get_last_image_from_messages(selected_call.get("messages", []))
        else:
            conversation = []
            self.current_call_id = None
            self.last_image = None
        
        return (
            gr.update(choices=choices, value="latest"),
            gr.update(value=self.last_image),
            gr.update(value=conversation),
            gr.update(interactive=bool(choices)),
            gr.update(visible=True),  # click_actions_group visible when there is a call
            gr.update(visible=True),  # actions_group visible when there is a call
        )
    
    def on_call_selected(self, selected_choice):
        """Handle when a call is selected from the dropdown."""
        if not selected_choice:
            return (
                gr.update(value=None),  # no image
                gr.update(value=[]),  # empty chatbot
                gr.update(interactive=False),
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )
        
        pending_calls = self.get_pending_calls()
        if not pending_calls:
            return (
                gr.update(value=None),  # no image
                gr.update(value=[]),  # empty chatbot
                gr.update(interactive=False),
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )
        
        # Handle "latest" option
        if selected_choice == "latest":
            # Sort calls by created_at to get oldest first
            sorted_calls = sorted(pending_calls, key=lambda x: x.get("created_at", ""))
            selected_call = sorted_calls[0]  # Get the oldest call
            call_id = selected_call["id"]
        else:
            # Extract call_id from the choice for specific calls
            call_id = None
            for call in pending_calls:
                call_id_short = call["id"][:8]
                if call_id_short in selected_choice:
                    call_id = call["id"]
                    break
            
            if not call_id:
                return (
                    gr.update(value=None),  # no image
                    gr.update(value=[]),  # empty chatbot
                    gr.update(interactive=False),
                    gr.update(visible=False),  # click_actions_group hidden
                    gr.update(visible=False),  # actions_group hidden
                )
            
            # Find the selected call
            selected_call = next((c for c in pending_calls if c["id"] == call_id), None)
        
        if not selected_call:
            return (
                gr.update(value=None),  # no image
                gr.update(value=[]),  # empty chatbot
                gr.update(interactive=False),
                gr.update(visible=False),  # click_actions_group hidden
                gr.update(visible=False),  # actions_group hidden
            )
        
        conversation = self.format_messages_for_chatbot(selected_call.get("messages", []))
        self.current_call_id = call_id
        # Get the last image from messages
        self.last_image = self.get_last_image_from_messages(selected_call.get("messages", []))
        
        return (
            gr.update(value=self.last_image),
            gr.update(value=conversation),
            gr.update(interactive=True),
            gr.update(visible=True),  # click_actions_group visible
            gr.update(visible=True),  # actions_group visible
        )
    
    def submit_response(self, response_text: str):
        """Submit a text response to the current call."""
        if not self.current_call_id:
            return (
                gr.update(value=response_text),  # keep response text
                gr.update(value="❌ No call selected")  # status
            )
        
        if not response_text.strip():
            return (
                gr.update(value=response_text),  # keep response text
                gr.update(value="❌ Response cannot be empty")  # status
            )
        
        success = self.complete_call_with_response(self.current_call_id, response_text)
        
        if success:
            status_msg = "✅ Response submitted successfully!"
            return (
                gr.update(value=""),  # clear response text
                gr.update(value=status_msg)  # status
            )
        else:
            return (
                gr.update(value=response_text),  # keep response text
                gr.update(value="❌ Failed to submit response")  # status
            )
    
    def submit_action(self, action_type: str, **kwargs) -> str:
        """Submit a computer action as a tool call."""
        if not self.current_call_id:
            return "❌ No call selected"
        
        import uuid
        
        # Create tool call structure
        action_data = {"type": action_type, **kwargs}
        tool_call = {
            "id": f"call_{uuid.uuid4().hex[:24]}",
            "type": "function",
            "function": {
                "name": "computer",
                "arguments": json.dumps(action_data)
            }
        }
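        # For example, a left click at (120, 340) serializes to:
        #   {"id": "call_<24 hex chars>", "type": "function",
        #    "function": {"name": "computer",
        #                 "arguments": '{"type": "click", "x": 120, "y": 340, "button": "left"}'}}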
        
        success = self.complete_call_with_tool_calls(self.current_call_id, [tool_call])
        
        if success:
            return f"✅ {action_type.capitalize()} action submitted as tool call"
        else:
            return f"❌ Failed to submit {action_type} action"
    
    def submit_click_action(self, x: int, y: int, action_type: str = "click", button: str = "left") -> str:
        """Submit a coordinate-based action."""
        if action_type == "click":
            return self.submit_action(action_type, x=x, y=y, button=button)
        else:
            return self.submit_action(action_type, x=x, y=y)
    
    def submit_type_action(self, text: str) -> str:
        """Submit a type action."""
        return self.submit_action("type", text=text)
    
    def submit_hotkey_action(self, keys: str) -> str:
        """Submit a hotkey action."""
        return self.submit_action("keypress", keys=keys)
    
    def submit_wait_action(self) -> str:
        """Submit a wait action with no kwargs."""
        return self.submit_action("wait")
    
    def submit_description_click(self, description: str, action_type: str = "click", button: str = "left") -> str:
        """Submit a description-based action."""
        if action_type == "click":
            return self.submit_action(action_type, element_description=description, button=button)
        else:
            return self.submit_action(action_type, element_description=description)
    
    def wait_for_pending_calls(self, max_seconds: float = 10.0, check_interval: float = 0.2):
        """Wait for pending calls to appear or until max_seconds elapsed.
        
        This method loops and checks for pending calls at regular intervals,
        returning as soon as a pending call is found or the maximum wait time is reached.
        
        Args:
            max_seconds: Maximum number of seconds to wait
            check_interval: How often to check for pending calls (in seconds)
        """
        import time
        
        start_time = time.time()
        
        while time.time() - start_time < max_seconds:
            # Check if there are any pending calls
            pending_calls = self.get_pending_calls()
            if pending_calls:
                # Found pending calls, return immediately
                return self.refresh_pending_calls()
            
            # Wait before checking again
            time.sleep(check_interval)
        
        # Max wait time reached, return current state
        return self.refresh_pending_calls()


def create_ui():
    """Create the Gradio interface."""
    ui_handler = HumanCompletionUI()
    
    with gr.Blocks(title="Human-in-the-Loop Agent Tool", fill_width=True) as demo:
        gr.Markdown("# 🤖 Human-in-the-Loop Agent Tool")
        gr.Markdown("Review AI conversation requests and provide human responses.")
        
        with gr.Row():
            with gr.Column(scale=2):
                with gr.Group():
                    screenshot_image = gr.Image(
                        label="Interactive Screenshot",
                        interactive=False,
                        height=600
                    )
                    
                    # Action type selection for image clicks (wrapped for visibility control)
                    with gr.Group(visible=False) as click_actions_group:
                        with gr.Row():
                            action_type_radio = gr.Dropdown(
                                label="Interactive Action",
                                choices=["click", "double_click", "move", "left_mouse_up", "left_mouse_down", "scroll"],
                                value="click",
                                scale=2
                            )
                            action_button_radio = gr.Dropdown(
                                label="Button",
                                choices=["left", "right", "wheel", "back", "forward"],
                                value="left",
                                visible=True,
                                scale=1
                            )
                            scroll_x_input = gr.Number(
                                label="scroll_x",
                                value=0,
                                visible=False,
                                scale=1
                            )
                            scroll_y_input = gr.Number(
                                label="scroll_y",
                                value=-120,
                                visible=False,
                                scale=1
                            )
                    
                    conversation_chatbot = gr.Chatbot(
                        label="Conversation",
                        type="messages",
                        height=500,
                        show_copy_button=True
                    )
            
            with gr.Column(scale=1):
                with gr.Group():
                    call_dropdown = gr.Dropdown(
                        label="Select a pending conversation request",
                        choices=["latest"],
                        interactive=True,
                        value="latest"
                    )
                    refresh_btn = gr.Button("🔄 Refresh", variant="secondary")
                    status_display = gr.Textbox(
                        label="Status",
                        interactive=False,
                        value="Ready to receive requests..."
                    )

                with gr.Group():
                    response_text = gr.Textbox(
                        label="Message",
                        lines=3,
                        placeholder="Enter your message here..."
                    )
                    submit_btn = gr.Button("📤 Submit Message", variant="primary", interactive=False)
                
                # Action Accordions (wrapped for visibility control)
                with gr.Group(visible=False) as actions_group:
                    with gr.Tabs():
                        with gr.Tab("🖱️ Click Actions"):
                            with gr.Group():
                                description_text = gr.Textbox(
                                    label="Element Description",
                                    placeholder="e.g., 'Privacy and security option in left sidebar'"
                                )
                                with gr.Row():
                                    description_action_type = gr.Dropdown(
                                        label="Action",
                                        choices=["click", "double_click", "move", "left_mouse_up", "left_mouse_down"],
                                        value="click"
                                    )
                                    description_button = gr.Dropdown(
                                        label="Button",
                                        choices=["left", "right", "wheel", "back", "forward"],
                                        value="left"
                                    )
                                description_submit_btn = gr.Button("Submit Click Action")
                        
                        with gr.Tab("📝 Type Action"):
                            with gr.Group():
                                type_text = gr.Textbox(
                                    label="Text to Type",
                                    placeholder="Enter text to type..."
                                )
                                type_submit_btn = gr.Button("Submit Type")
                        
                        with gr.Tab("⌨️ Keypress Action"):
                            with gr.Group():
                                keypress_text = gr.Textbox(
                                    label="Keys",
                                    placeholder="e.g., ctrl+c, alt+tab"
                                )
                                keypress_submit_btn = gr.Button("Submit Keypress")
                        
                        with gr.Tab("🧰 Misc Actions"):
                            with gr.Group():
                                misc_action_dropdown = gr.Dropdown(
                                    label="Action",
                                    choices=["wait"],
                                    value="wait"
                                )
                                misc_submit_btn = gr.Button("Submit Action")
        
        # Event handlers
        refresh_btn.click(
            fn=ui_handler.refresh_pending_calls,
            outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group]
        )
        
        call_dropdown.change(
            fn=ui_handler.on_call_selected,
            inputs=[call_dropdown],
            outputs=[screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group]
        )
        
        def handle_image_click(evt: gr.SelectData):
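            # evt.index carries the (x, y) pixel coordinates of the click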
            if evt.index is not None:
                x, y = evt.index
                action_type = ui_handler.current_action_type or "click"
                button = ui_handler.current_button or "left"
                if action_type == "scroll":
                    sx_i = int(ui_handler.current_scroll_x or 0)
                    sy_i = int(ui_handler.current_scroll_y or 0)
                    # Submit a scroll action with x,y position and scroll deltas
                    result = ui_handler.submit_action("scroll", x=x, y=y, scroll_x=sx_i, scroll_y=sy_i)
                else:
                    result = ui_handler.submit_click_action(x, y, action_type, button)
                ui_handler.wait_for_pending_calls()
                return result
            return "No coordinates selected"

        screenshot_image.select(
            fn=handle_image_click,
            outputs=[status_display]
        ).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group]
        )

        # Response submission
        submit_btn.click(
            fn=ui_handler.submit_response,
            inputs=[response_text],
            outputs=[response_text, status_display]
        ).then(
            fn=ui_handler.refresh_pending_calls,
            outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group]
        )
        
        # Toggle visibility of controls based on action type
        def toggle_action_controls(action_type):
            # Button visible only for click
            button_vis = gr.update(visible=(action_type == "click"))
            # Scroll inputs visible only for scroll
            scroll_x_vis = gr.update(visible=(action_type == "scroll"))
            scroll_y_vis = gr.update(visible=(action_type == "scroll"))
            # Update state
            ui_handler.current_action_type = action_type or "click"
            return button_vis, scroll_x_vis, scroll_y_vis
        
        action_type_radio.change(
            fn=toggle_action_controls,
            inputs=[action_type_radio],
            outputs=[action_button_radio, scroll_x_input, scroll_y_input]
        )

        # Keep other control values in ui_handler state
        def on_button_change(val):
            ui_handler.current_button = (val or "left")
        action_button_radio.change(
            fn=on_button_change,
            inputs=[action_button_radio]
        )

        def on_scroll_x_change(val):
            try:
                ui_handler.current_scroll_x = int(val) if val is not None else 0
            except (TypeError, ValueError):
                ui_handler.current_scroll_x = 0
        scroll_x_input.change(
            fn=on_scroll_x_change,
            inputs=[scroll_x_input]
        )

        def on_scroll_y_change(val):
            try:
                ui_handler.current_scroll_y = int(val) if val is not None else 0
            except (TypeError, ValueError):
                ui_handler.current_scroll_y = 0
        scroll_y_input.change(
            fn=on_scroll_y_change,
            inputs=[scroll_y_input]
        )
        
        type_submit_btn.click(
            fn=ui_handler.submit_type_action,
            inputs=[type_text],
            outputs=[status_display]
        ).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group]
        )
        
        keypress_submit_btn.click(
            fn=ui_handler.submit_hotkey_action,
            inputs=[keypress_text],
            outputs=[status_display]
        ).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group]
        )
        
        def handle_description_submit(description, action_type, button):
            if description:
                result = ui_handler.submit_description_click(description, action_type, button)
                ui_handler.wait_for_pending_calls()
                return result
            return "Please enter a description"

        description_submit_btn.click(
            fn=handle_description_submit,
            inputs=[description_text, description_action_type, description_button],
            outputs=[status_display]
        ).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group]
        )
        
        # Misc action handler
        def handle_misc_submit(selected_action):
            if selected_action == "wait":
                result = ui_handler.submit_wait_action()
                ui_handler.wait_for_pending_calls()
                return result
            return f"Unsupported misc action: {selected_action}"

        misc_submit_btn.click(
            fn=handle_misc_submit,
            inputs=[misc_action_dropdown],
            outputs=[status_display]
        ).then(
            fn=ui_handler.wait_for_pending_calls,
            outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group]
        )
        
        # Load initial data
        demo.load(
            fn=ui_handler.refresh_pending_calls,
            outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn, click_actions_group, actions_group]
        )
    
    return demo


if __name__ == "__main__":
    demo = create_ui()
    demo.queue()
    demo.launch(server_name="0.0.0.0", server_port=7860)

```
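
To sanity-check the UI without wiring up a full agent, you can exercise the two endpoints it depends on directly. Here is a minimal sketch; the server URL is an assumption (point it at wherever your completion server actually runs), and the calls mirror the `GET /pending` and `POST /complete/{call_id}` requests the UI makes above:

```python
import requests

SERVER_URL = "http://localhost:8002"  # assumption: adjust to your server's address

# List pending completion calls (the same endpoint the UI polls)
pending = requests.get(f"{SERVER_URL}/pending", timeout=5).json().get("pending_calls", [])
print(f"{len(pending)} pending call(s)")

# Complete the oldest call with a plain text reply, mirroring submit_response
if pending:
    oldest = sorted(pending, key=lambda c: c.get("created_at", ""))[0]
    requests.post(
        f"{SERVER_URL}/complete/{oldest['id']}",
        json={"response": "Looks good - proceed."},
        timeout=10,
    ).raise_for_status()
```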