This is page 19 of 20. Use http://codebase.md/trycua/cua?page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── bump-version.yml
│       ├── ci-lume.yml
│       ├── docker-publish-cua-linux.yml
│       ├── docker-publish-cua-windows.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── link-check.yml
│       ├── lint.yml
│       ├── npm-publish-cli.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       ├── python-tests.yml
│       ├── test-cua-models.yml
│       └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│   ├── docs.code-workspace
│   ├── extensions.json
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── cloud-windows-ga-macos-preview.md
│   ├── composite-agents.md
│   ├── computer-use-agents-for-growth-hacking.md
│   ├── cua-hackathon.md
│   ├── cua-playground-preview.md
│   ├── cua-vlm-router.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cli.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── neurips-2025-cua-papers.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .env.example
│   ├── .gitignore
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── observability.mdx
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── cua-vlm-router.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   ├── telemetry.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── cli-playbook
│   │       │   ├── commands.mdx
│   │       │   ├── index.mdx
│   │       │   └── meta.json
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── meta.json
│   │       │   ├── sandboxed-python.mdx
│   │       │   └── tracing-api.mdx
│   │       ├── example-usecases
│   │       │   ├── form-filling.mdx
│   │       │   ├── gemini-complex-ui-navigation.mdx
│   │       │   ├── meta.json
│   │       │   ├── post-event-contact-export.mdx
│   │       │   └── windows-app-behind-vpn.mdx
│   │       ├── get-started
│   │       │   ├── meta.json
│   │       │   └── quickstart.mdx
│   │       ├── index.mdx
│   │       ├── macos-vm-cli-playbook
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   └── meta.json
│   │       └── meta.json
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── bg-dark.jpg
│   │       ├── bg-light.jpg
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── grounding-with-gemini3.gif
│   │       ├── hero.png
│   │       ├── laminar_trace_example.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   ├── posthog
│   │   │   │   │   └── [...path]
│   │   │   │   │       └── route.ts
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   ├── llms.txt
│   │   │   │   └── route.ts
│   │   │   ├── robots.ts
│   │   │   └── sitemap.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── analytics-tracker.tsx
│   │   │   ├── cookie-consent.tsx
│   │   │   ├── doc-actions-menu.tsx
│   │   │   ├── editable-code-block.tsx
│   │   │   ├── footer.tsx
│   │   │   ├── hero.tsx
│   │   │   ├── iou.tsx
│   │   │   ├── mermaid.tsx
│   │   │   └── page-feedback.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   ├── mdx-components.tsx
│   │   └── providers
│   │       └── posthog-provider.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── browser_tool_example.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── tracing_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── cua_adapter.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gelato.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── generic_vlm.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   ├── uiins.py
│   │   │   │   │   ├── uitars.py
│   │   │   │   │   └── uitars2.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── tools
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   └── browser_tool.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_computer_agent.py
│   │   ├── bench-ui
│   │   │   ├── bench_ui
│   │   │   │   ├── __init__.py
│   │   │   │   ├── api.py
│   │   │   │   └── child.py
│   │   │   ├── examples
│   │   │   │   ├── folder_example.py
│   │   │   │   ├── gui
│   │   │   │   │   ├── index.html
│   │   │   │   │   ├── logo.svg
│   │   │   │   │   └── styles.css
│   │   │   │   ├── output_overlay.png
│   │   │   │   └── simple_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       └── test_port_detection.py
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── tracing_wrapper.py
│   │   │   │   ├── tracing.py
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_computer.py
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── browser.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   ├── utils
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   └── wallpaper.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   ├── test_connection.py
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_server.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_telemetry.py
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── build-extension.py
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── desktop-extension
│   │   │   │   ├── cua-extension.mcpb
│   │   │   │   ├── desktop_extension.png
│   │   │   │   ├── manifest.json
│   │   │   │   ├── README.md
│   │   │   │   ├── requirements.txt
│   │   │   │   ├── run_server.sh
│   │   │   │   └── setup.py
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── QUICK_TEST_COMMANDS.sh
│   │   │   ├── quick_test_local_option.py
│   │   │   ├── README.md
│   │   │   ├── scripts
│   │   │   │   ├── install_mcp_server.sh
│   │   │   │   └── start_mcp_server.sh
│   │   │   ├── test_mcp_server_local_option.py
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_mcp_server.py
│   │   ├── pylume
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_pylume.py
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           ├── conftest.py
│   │           └── test_omniparser.py
│   ├── qemu-docker
│   │   ├── linux
│   │   │   ├── Dockerfile
│   │   │   ├── README.md
│   │   │   └── src
│   │   │       ├── entry.sh
│   │   │       └── vm
│   │   │           ├── image
│   │   │           │   └── README.md
│   │   │           └── setup
│   │   │               ├── install.sh
│   │   │               ├── setup-cua-server.sh
│   │   │               └── setup.sh
│   │   ├── README.md
│   │   └── windows
│   │       ├── Dockerfile
│   │       ├── README.md
│   │       └── src
│   │           ├── entry.sh
│   │           └── vm
│   │               ├── image
│   │               │   └── README.md
│   │               └── setup
│   │                   ├── install.bat
│   │                   ├── on-logon.ps1
│   │                   ├── setup-cua-server.ps1
│   │                   ├── setup-utils.psm1
│   │                   └── setup.ps1
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── cua-cli
│   │   │   ├── .gitignore
│   │   │   ├── .prettierrc
│   │   │   ├── bun.lock
│   │   │   ├── CLAUDE.md
│   │   │   ├── index.ts
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── auth.ts
│   │   │   │   ├── cli.ts
│   │   │   │   ├── commands
│   │   │   │   │   ├── auth.ts
│   │   │   │   │   └── sandbox.ts
│   │   │   │   ├── config.ts
│   │   │   │   ├── http.ts
│   │   │   │   ├── storage.ts
│   │   │   │   └── util.ts
│   │   │   └── tsconfig.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Development.md
│       ├── Dockerfile
│       ├── Dockerfile.dev
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│   ├── install-cli.ps1
│   ├── install-cli.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   ├── run-docker-dev.sh
│   └── typescript-typecheck.js
├── TESTING.md
├── tests
│   ├── agent_loop_testing
│   │   ├── agent_test.py
│   │   └── README.md
│   ├── pytest.ini
│   ├── shell_cmd.py
│   ├── test_files.py
│   ├── test_mcp_server_session_management.py
│   ├── test_mcp_server_streaming.py
│   ├── test_shell_bash.py
│   ├── test_telemetry.py
│   ├── test_tracing.py
│   ├── test_venv.py
│   └── test_watchdog.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/libs/python/computer/computer/ui/gradio/app.py:
--------------------------------------------------------------------------------

```python
"""
Advanced Gradio UI for Computer Interface

This is a Gradio interface for the Computer Interface
"""

import asyncio
import base64
import glob
import hashlib
import io
import json
import os
import random
import random as rand
import uuid
from datetime import datetime

import datasets
import gradio as gr
import pandas as pd
from computer import Computer, VMProviderType
from datasets import Dataset, Features, Sequence, concatenate_datasets
from gradio.components import ChatMessage
from huggingface_hub import DatasetCard, DatasetCardData
from PIL import Image

# Task examples as dictionaries with a task string and an optional setup function
TASK_EXAMPLES = [
    {
        "task": "Open the shopping list on my desktop and add all the items to a Doordash cart",
        "setup": lambda computer: create_shopping_list_file(computer),
    },
    {
        "task": "Do a random miniwob++ task, output the task name in <task> </task> tags and your reward in <reward> </reward> tags"
    },
]
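

# Illustrative sketch (assumption; not called anywhere else in this module):
# apply a TASK_EXAMPLES entry's optional "setup" hook before handing its task
# string to an agent. `computer` must already be an initialized Computer instance.
async def _prepare_task_example(computer, example: dict) -> str:
    if "setup" in example:
        await example["setup"](computer)  # e.g. creates the shopping list file
    return example["task"]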


# Generate random shopping list and save to desktop using computer interface
async def create_shopping_list_file(computer):
    items = [
        "Milk",
        "Eggs",
        "Bread",
        "Apples",
        "Bananas",
        "Chicken",
        "Rice",
        "Cereal",
        "Coffee",
        "Cheese",
        "Pasta",
        "Tomatoes",
        "Potatoes",
        "Onions",
        "Carrots",
        "Ice Cream",
        "Yogurt",
        "Cookies",
    ]

    # Select 1-5 random items
    num_items = rand.randint(1, 5)
    selected_items = rand.sample(items, num_items)

    # Create shopping list content
    content = "SHOPPING LIST:\n\n"
    for item in selected_items:
        content += f"- {item}\n"

    # Use run_command to create the file on the desktop
    desktop_path = "~/Desktop"
    file_path = f"{desktop_path}/shopping_list.txt"

    # Create the file using echo command
    cmd = f"echo '{content}' > {file_path}"
    stdout, stderr = await computer.interface.run_command(cmd)

    print(f"Created shopping list at {file_path} with {num_items} items")
    if stderr:
        print(f"Error: {stderr}")

    return file_path


import typing

# Load valid keys from the Key enum in models.py
from computer.interface.models import Key

VALID_KEYS = [key.value for key in Key] + [
    "a",
    "b",
    "c",
    "d",
    "e",
    "f",
    "g",
    "h",
    "i",
    "j",
    "k",
    "l",
    "m",
    "n",
    "o",
    "p",
    "q",
    "r",
    "s",
    "t",
    "u",
    "v",
    "w",
    "x",
    "y",
    "z",
    "0",
    "1",
    "2",
    "3",
    "4",
    "5",
    "6",
    "7",
    "8",
    "9",
]
VALID_KEYS = list(dict.fromkeys(VALID_KEYS))  # remove duplicates, preserve order

# List of random words for demo naming
RANDOM_WORDS = [
    "apple",
    "banana",
    "cherry",
    "dolphin",
    "elephant",
    "forest",
    "giraffe",
    "harmony",
    "igloo",
    "jungle",
    "kangaroo",
    "lemon",
    "mountain",
    "notebook",
    "ocean",
    "penguin",
    "quasar",
    "rainbow",
    "ohana",
    "sunflower",
    "tiger",
    "umbrella",
    "volcano",
    "waterfall",
    "xylophone",
    "yellow",
    "zebra",
]


# Generate a random demo name with 3 words
def generate_random_demo_name():
    return " ".join(random.sample(RANDOM_WORDS, 3))


# Global session ID for tracking this run
session_id = str(uuid.uuid4())

# Global computer instance, tool call logs, memory, and screenshot state
computer = None
tool_call_logs = []
memory = ""
last_action = {"name": "", "action": "", "arguments": {}}
last_screenshot = None  # Store the most recent screenshot
last_screenshot_before = None  # Store the screenshot taken before the most recent one
screenshot_images = []  # Array to store all screenshot images

# Constants for the output and session directories
OUTPUT_DIR = "examples/output"
SESSION_DIR = os.path.join(OUTPUT_DIR, "sessions")


def load_all_sessions(with_images=False):
    """Load and concatenate all session datasets into a single Dataset"""
    try:
        # Get all session folders
        if not os.path.exists(SESSION_DIR):
            return None

        session_folders = glob.glob(os.path.join(SESSION_DIR, "*"))
        if not session_folders:
            return None

        # Load each dataset and concatenate
        all_datasets = []
        for folder in session_folders:
            try:
                ds = Dataset.load_from_disk(folder)
                if not with_images:
                    ds = ds.remove_columns("images")

                # Add folder name to identify the source
                folder_name = os.path.basename(folder)

                # Process the messages from tool_call_logs
                def process_messages(example):
                    messages_text = []
                    current_role = None

                    # Process the logs if they exist in the example
                    if "tool_calls" in example:
                        # Use the existing get_chatbot_messages function with explicit logs parameter
                        formatted_msgs = get_chatbot_messages(
                            logs=json.loads(example["tool_calls"])
                        )

                        # Process each ChatMessage and extract either title or content
                        for msg in formatted_msgs:
                            # Check if role has changed
                            if msg.role != current_role:
                                # Add a line with the new role if it changed
                                if current_role is not None:  # Skip for the first message
                                    messages_text.append(
                                        ""
                                    )  # Add an empty line between role changes
                                messages_text.append(f"{msg.role}")
                                current_role = msg.role

                            # Add the message content
                            if msg.metadata and "title" in msg.metadata:
                                # Use the title if available
                                messages_text.append(msg.metadata["title"])
                            else:
                                # Use just the content without role prefix since we're adding role headers
                                messages_text.append(msg.content)

                    # Join all messages with newlines
                    all_messages = "\n".join(messages_text)

                    return {
                        **example,
                        "source_folder": folder_name,
                        "messages": all_messages,
                    }

                # Apply the processing to each example
                ds = ds.map(process_messages)
                all_datasets.append(ds)
            except Exception as e:
                print(f"Error loading dataset from {folder}: {str(e)}")

        if not all_datasets:
            return None

        # Concatenate all datasets
        return concatenate_datasets(all_datasets)
    except Exception as e:
        print(f"Error loading sessions: {str(e)}")
        return None


def get_existing_tags():
    """Extract all existing tags from saved demonstrations"""
    all_sessions = load_all_sessions()
    if all_sessions is None:
        return [], []

    # Convert to pandas and extract tags
    df = all_sessions.to_pandas()

    if "tags" not in df.columns:
        return [], []

    # Extract all tags and flatten the list
    all_tags = []
    for tags in df["tags"].dropna():
        all_tags += list(tags)

    # Remove duplicates and sort
    unique_tags = sorted(list(set(all_tags)))
    return unique_tags, unique_tags


def get_sessions_data():
    """Load all sessions dataset"""

    combined_ds = load_all_sessions()
    if combined_ds:
        # Convert to pandas and select columns
        df = combined_ds.to_pandas()
        columns = ["name", "messages", "source_folder"]
        if "tags" in df.columns:
            columns.append("tags")
        return df[columns]
    else:
        return pd.DataFrame({"name": [""], "messages": [""], "source_folder": [""]})


def upload_to_huggingface(dataset_name, visibility, filter_tags=None):
    """Upload sessions to HuggingFace Datasets Hub, optionally filtered by tags

    Args:
        dataset_name: Name of the dataset on HuggingFace (format: username/dataset-name)
        visibility: 'public' or 'private'
        filter_tags: List of tags to filter by (optional)

    Returns:
        Status message
    """
    try:
        # Check if HF_TOKEN is available
        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            return "Error: HF_TOKEN environment variable not found. Please set it before uploading."

        # Check if dataset name is in the correct format
        if not dataset_name or "/" not in dataset_name:
            return "Dataset name must be in the format 'username/dataset-name'"

        # Load all sessions
        combined_ds = load_all_sessions(with_images=True)
        if combined_ds is None or len(combined_ds) == 0:
            return "No sessions found to upload."

        # If tag filtering is provided, filter the datasets
        if filter_tags:
            # Convert to pandas to filter
            df = combined_ds.to_pandas()

            if "tags" not in df.columns:
                return "No sessions with tags found to filter."

            # Get list of source folders for sessions that have any of the selected tags
            matching_folders = []
            for _, row in df.iterrows():
                if not len(row.get("tags")):
                    continue
                if any(tag in list(row.get("tags", [])) for tag in filter_tags):
                    matching_folders.append(row["source_folder"])

            if not matching_folders:
                return "No sessions matched the selected tag filters."

            # Load only the matching datasets
            filtered_datasets = []
            for folder in matching_folders:
                folder_path = os.path.join(SESSION_DIR, folder)
                if os.path.exists(folder_path):
                    try:
                        ds = Dataset.load_from_disk(folder_path)
                        filtered_datasets.append(ds)
                    except Exception as e:
                        print(f"Error loading dataset from {folder}: {str(e)}")

            if not len(filtered_datasets):
                return "Error loading the filtered sessions."

            # Create a new combined dataset with just the filtered sessions
            upload_ds = concatenate_datasets(filtered_datasets)
            session_count = len(upload_ds)
        else:
            # Use all sessions
            upload_ds = combined_ds
            session_count = len(upload_ds)

        tags = ["cua"]
        if isinstance(filter_tags, list):
            tags += filter_tags

        # Push to HuggingFace
        upload_ds.push_to_hub(
            dataset_name,
            private=visibility == "private",
            token=hf_token,
            commit_message="(Built with github.com/trycua/cua)",
        )

        # Create dataset card
        card_data = DatasetCardData(
            language="en", license="mit", task_categories=["visual-question-answering"], tags=tags
        )
        card = DatasetCard.from_template(
            card_data=card_data,
            template_str="---\n{{ card_data }}\n---\n\n# Uploaded computer interface trajectories\n\nThese trajectories were generated and uploaded using [cua](https://github.com/trycua/cua)",
        )
        card.push_to_hub(dataset_name, commit_message="Cua dataset card")

        return f"Successfully uploaded {session_count} sessions to HuggingFace Datasets Hub at https://huggingface.co/datasets/{dataset_name}"

    except Exception as e:
        return f"Error uploading to HuggingFace: {str(e)}"


def save_demonstration(log_data, demo_name=None, demo_tags=None):
    """Save the current tool call logs as a demonstration file using HuggingFace datasets"""
    global tool_call_logs, session_id

    if not tool_call_logs:
        return "No data to save", None

    # Create output directories if they don't exist
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)
    if not os.path.exists(SESSION_DIR):
        os.makedirs(SESSION_DIR)

    # Use default name if none provided
    if not demo_name or demo_name.strip() == "":
        demo_name = generate_random_demo_name()

    # Process tags
    tags = []
    if demo_tags:
        if isinstance(demo_tags, list):
            tags = demo_tags
        elif isinstance(demo_tags, str):
            # Split by comma if it's a comma-separated string
            tags = [tag.strip() for tag in demo_tags.split(",") if tag.strip()]

    log_time = datetime.now().isoformat()

    def msg_to_dict(msg: ChatMessage):
        return {"role": msg.role, "content": str(msg.content), "metadata": dict(msg.metadata)}

    # Create dataset
    demonstration_dataset = [
        {
            "timestamp": str(log_time),
            "session_id": str(session_id),
            "name": str(demo_name),
            "tool_calls": json.dumps(tool_call_logs),
            "messages": json.dumps(
                [msg_to_dict(msg) for msg in get_chatbot_messages(tool_call_logs)]
            ),
            "tags": list(tags),
            "images": [Image.open(io.BytesIO(img)) for img in screenshot_images],
        }
    ]

    try:
        # Create a new HuggingFace dataset from the current session
        new_session_ds = Dataset.from_list(
            demonstration_dataset,
            features=Features(
                {
                    "timestamp": datasets.Value("string"),
                    "session_id": datasets.Value("string"),
                    "name": datasets.Value("string"),
                    "tool_calls": datasets.Value("string"),
                    "messages": datasets.Value("string"),
                    "tags": Sequence(datasets.Value("string")),
                    "images": Sequence(datasets.Image()),
                }
            ),
        )

        # Create a unique folder name with demonstration name, session ID and timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_name = demo_name.replace(" ", "_").replace("/", "_").replace("\\", "_")[:50]
        session_folder = os.path.join(SESSION_DIR, f"{safe_name}_{session_id}_{timestamp}")

        # Create the directory if it doesn't exist
        if not os.path.exists(session_folder):
            os.makedirs(session_folder)

        # Save the dataset to the unique folder
        new_session_ds.save_to_disk(session_folder)

        return f"Session saved to {session_folder}"
    except Exception as e:
        return f"Error saving demonstration: {str(e)}"


def log_tool_call(name, action, arguments, result=None):
    """Log a tool call with unique IDs and results"""
    global tool_call_logs

    # Create arguments JSON that includes the action
    args = {"action": action, **arguments}

    # Process result for logging
    processed_result = {}
    if result:
        for key, value in result.items():
            if key == "screenshot" and isinstance(value, bytes):
                # Add screenshot to the array and get its index
                screenshot_index = len(screenshot_images)
                screenshot_images.append(value)
                # Create hash of screenshot data that includes the index
                hash_value = hashlib.md5(value).hexdigest()
                processed_result[key] = f"<Screenshot: MD5 {hash_value}:{screenshot_index}>"
            elif key == "clipboard" and isinstance(value, str):
                processed_result[key] = value
            elif isinstance(value, bytes):
                # Create hash for any binary data
                hash_value = hashlib.md5(value).hexdigest()
                processed_result[key] = f"<Binary data: MD5 {hash_value}>"
            else:
                processed_result[key] = value

    # Create the tool call log entry
    log_entry = {
        "type": "function_call",
        "name": name,
        "arguments": json.dumps(args),
        "result": processed_result if result else None,
    }

    # Add to logs and immediately flush by printing
    tool_call_logs.append(log_entry)
    print(f"Tool call logged: {json.dumps(log_entry)}")

    return log_entry
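

# Shape of one entry appended to tool_call_logs by log_tool_call (illustrative
# values only; screenshots and other binary data are replaced by MD5 hashes):
#
#     {
#         "type": "function_call",
#         "name": "computer",
#         "arguments": '{"action": "left_click", "x": 100, "y": 200}',
#         "result": {"screenshot": "<Screenshot: MD5 <hash>:0>"}
#     }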


async def execute(name, action, arguments):
    """Execute a tool call, log it, and return any results"""
    global computer, last_action, last_screenshot, last_screenshot_before

    last_screenshot_before = last_screenshot

    # Store last action for reasoning box
    last_action = {"name": name, "action": action, "arguments": arguments}

    results = {}

    # Execute the action based on name and action
    if name == "computer":
        if computer is None:
            return {}

        # Get the method from the computer interface
        if action == "initialize":
            # Already initialized, just log
            pass
        elif action == "wait":
            # Wait for 1 second
            await asyncio.sleep(1)
        elif action == "screenshot":
            pass
        elif action == "move_cursor":
            await computer.interface.move_cursor(arguments["x"], arguments["y"])
            await asyncio.sleep(0.2)
        elif action == "left_click":
            if "x" in arguments and "y" in arguments:
                await computer.interface.move_cursor(arguments["x"], arguments["y"])
            await computer.interface.left_click(arguments["x"], arguments["y"])
            await asyncio.sleep(0.5)
        elif action == "right_click":
            if "x" in arguments and "y" in arguments:
                await computer.interface.move_cursor(arguments["x"], arguments["y"])
            await computer.interface.right_click(arguments["x"], arguments["y"])
            await asyncio.sleep(0.5)
        elif action == "double_click":
            if "x" in arguments and "y" in arguments:
                await computer.interface.move_cursor(arguments["x"], arguments["y"])
            await computer.interface.double_click(arguments["x"], arguments["y"])
            await asyncio.sleep(0.5)
        elif action == "type_text":
            await computer.interface.type_text(arguments["text"])
            await asyncio.sleep(0.3)
            if "press_enter" in arguments and arguments["press_enter"]:
                await computer.interface.press_key("enter")
        elif action == "press_key":
            await computer.interface.press_key(arguments["key"])
            await asyncio.sleep(0.3)
        elif action == "scroll_up":
            await computer.interface.scroll_up(arguments["clicks"])
            await asyncio.sleep(0.3)
        elif action == "scroll_down":
            await computer.interface.scroll_down(arguments["clicks"])
            await asyncio.sleep(0.3)
        elif action == "send_hotkey":
            await computer.interface.hotkey(*arguments.get("keys", []))
            await asyncio.sleep(0.3)
        elif action == "copy_to_clipboard":
            results["clipboard"] = await computer.interface.copy_to_clipboard()
        elif action == "set_clipboard":
            await computer.interface.set_clipboard(arguments["text"])
        elif action == "run_command":
            stdout, stderr = await computer.interface.run_command(arguments["command"])
            results["stdout"] = stdout
            results["stderr"] = stderr
        elif action == "shutdown":
            await computer.stop()
        elif action == "done" or action == "fail":
            # Just a marker, doesn't do anything
            pass

        # Add a screenshot to the results for every action (if not already there)
        if action != "shutdown" and "screenshot" not in results:
            results["screenshot"] = await computer.interface.screenshot()
    elif name == "message":
        if action == "submit":
            # No action needed for message submission except logging
            # If requested, take a screenshot after message
            if arguments.get("screenshot_after", False) and computer is not None:
                results["screenshot"] = await computer.interface.screenshot()

    # Log the tool call with results
    log_tool_call(name, action, arguments, results)

    if "screenshot" in results:
        # Convert bytes to PIL Image
        screenshot_img = Image.open(io.BytesIO(results["screenshot"]))
        results["screenshot"] = screenshot_img
        # Update last_screenshot with the new screenshot
        last_screenshot = screenshot_img

    return results
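

# Dispatch sketch (placeholder values; mirrors what the handle_* helpers below
# do): every UI action goes through execute(), which logs the tool call and
# attaches a fresh screenshot to the returned results.
#
#     results = await execute("computer", "type_text",
#                             {"text": "hello world", "press_enter": True})
#     screenshot = results["screenshot"]  # PIL.Image of the screen after typing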


async def handle_init_computer(
    os_choice: str, app_list=None, provider="lume", container_name=None, api_key=None
):
    """Initialize the computer instance and tools for macOS or Ubuntu or Windows

    Args:
        os_choice: The OS to use ("macOS" or "Ubuntu" or "Windows")
        app_list: Optional list of apps to focus on using the app-use experiment
        provider: The provider to use ("lume" or "self" or "cloud")
        container_name: The container name to use for cloud provider
        api_key: The API key to use for cloud provider
    """
    global computer, tool_call_logs, tools

    # Check if we should enable app-use experiment
    use_app_experiment = app_list and len(app_list) > 0
    experiments = ["app-use"] if use_app_experiment else None

    # Determine if we should use host computer server
    use_host_computer_server = provider == "self"

    if os_choice == "Ubuntu":
        os_type_str = "linux"
        image_str = "ubuntu-noble-vanilla:latest"
    elif os_choice == "Windows":
        os_type_str = "windows"
        image_str = "windows-11-vanilla:latest"
    else:
        os_type_str = "macos"
        image_str = "macos-sequoia-cua:latest"

    # Create computer instance with appropriate configuration
    if use_host_computer_server:
        computer = Computer(
            os_type=os_type_str, use_host_computer_server=True, experiments=experiments
        )
    elif provider == "cloud":
        # Use API key from environment variable or field input
        cloud_api_key = os.environ.get("CUA_API_KEY") or api_key
        computer = Computer(
            os_type=os_type_str,
            provider_type=VMProviderType.CLOUD,
            name=container_name,
            api_key=cloud_api_key,
            experiments=experiments,
        )
    elif provider == "winsandbox":
        computer = Computer(
            os_type="windows", provider_type=VMProviderType.WINSANDBOX, experiments=experiments
        )
    else:
        computer = Computer(
            image=image_str,
            os_type=os_type_str,
            provider_type=VMProviderType.LUME,
            display="1024x768",
            memory="8GB",
            cpu="4",
            experiments=experiments,
        )

    await computer.run()

    # If app list is provided, create desktop from apps
    if use_app_experiment:
        computer = computer.create_desktop_from_apps(app_list)

    # Log computer initialization as a tool call
    init_params = {"os": os_type_str, "provider": provider}

    # Add VM-specific parameters if not using host computer server
    if not use_host_computer_server:
        init_params.update({"image": image_str, "display": "1024x768", "memory": "8GB", "cpu": "4"})

    # Add app list to the log if provided
    if use_app_experiment:
        init_params["apps"] = app_list
        init_params["experiments"] = ["app-use"]

    # Add container name to the log if using cloud provider
    if provider == "cloud":
        init_params["container_name"] = container_name

    result = await execute("computer", "initialize", init_params)

    return result["screenshot"], json.dumps(tool_call_logs, indent=2)


async def handle_screenshot():
    """Take a screenshot and return it as a PIL Image"""
    global computer
    if computer is None:
        return None

    result = await execute("computer", "screenshot", {})
    return result["screenshot"]


async def handle_wait():
    """Wait for 1 second and then take a screenshot"""
    global computer
    if computer is None:
        return None

    # Execute wait action
    result = await execute("computer", "wait", {})
    return result["screenshot"], json.dumps(tool_call_logs, indent=2)


async def handle_click(evt: gr.SelectData, img, click_type):
    """Handle click events on the image based on click type"""
    global computer
    if computer is None:
        return img, json.dumps(tool_call_logs, indent=2)

    # Get the coordinates of the click
    x, y = evt.index

    # Move cursor and perform click
    result = await execute("computer", click_type, {"x": x, "y": y})

    # Take a new screenshot to show the result
    return result["screenshot"], json.dumps(tool_call_logs, indent=2)


async def handle_type(text, press_enter=False):
    """Type text into the computer"""
    global computer
    if computer is None or not text:
        return await handle_screenshot(), json.dumps(tool_call_logs, indent=2)

    result = await execute("computer", "type_text", {"text": text, "press_enter": press_enter})

    return result["screenshot"], json.dumps(tool_call_logs, indent=2)


async def handle_copy():
    """Copy selected content to clipboard and return it"""
    global computer
    if computer is None:
        return "Computer not initialized", json.dumps(tool_call_logs, indent=2)

    result = await execute("computer", "copy_to_clipboard", {})
    content = result.get("clipboard", "No content copied")

    return content, json.dumps(tool_call_logs, indent=2)


async def handle_set_clipboard(text):
    """Set clipboard content"""
    global computer
    if computer is None:
        return "Computer not initialized", json.dumps(tool_call_logs, indent=2)

    await execute("computer", "set_clipboard", {"text": text})

    return f"Clipboard set to: {text}", json.dumps(tool_call_logs, indent=2)


async def handle_run_command(command):
    """Run a shell command"""
    global computer
    if computer is None:
        return "Computer not initialized", json.dumps(tool_call_logs, indent=2)

    # Execute the run_command action and log it
    result = await execute("computer", "run_command", {"command": command})

    # Get the result from the computer interface
    stdout, stderr = result.get("stdout"), result.get("stderr")

    # Format the output
    output = ""
    if stdout:
        output += f"STDOUT:\n{stdout}\n"
    if stderr:
        output += f"STDERR:\n{stderr}\n"

    if not output:
        output = "(No output)"

    return output, json.dumps(tool_call_logs, indent=2)


async def handle_shutdown():
    """Shutdown the computer instance"""
    global computer
    if computer is None:
        return "Computer not initialized", json.dumps(tool_call_logs, indent=2)

    await execute("computer", "shutdown", {})

    computer = None
    return "Computer shut down", json.dumps(tool_call_logs, indent=2)


async def handle_memory(memory_text):
    """Update the global memory"""
    global memory
    await execute("memory", "update", {"memory_text": memory_text})
    memory = memory_text
    return "Memory updated"


async def update_reasoning(reasoning_text, is_erroneous=False):
    """Update the reasoning for the last action"""
    global last_action, tool_call_logs

    if not last_action["name"]:
        return "No action to update reasoning for"

    # Find the last log entry that matches the last action
    for log_entry in reversed(tool_call_logs):
        if (
            log_entry["name"] == last_action["name"]
            and json.loads(log_entry["arguments"]).get("action") == last_action["action"]
        ):
            # Add reasoning to the log entry
            log_entry["reasoning"] = reasoning_text
            # If marked as erroneous, set weight to 0
            log_entry["weight"] = 0 if is_erroneous else 1
            break

    return "Reasoning updated"


async def clear_log():
    """Clear the tool call logs"""
    global tool_call_logs, screenshot_images
    screenshot_images = []
    tool_call_logs = []
    return json.dumps(tool_call_logs, indent=2)


def get_last_action_display():
    """Format the last action for display in the reasoning box"""
    global last_action
    if not last_action["name"]:
        return "No actions performed yet"

    action_str = f"Tool: {last_action['name']}\nAction: {last_action['action']}"

    if last_action["arguments"]:
        args_str = "\nArguments:\n"
        for k, v in last_action["arguments"].items():
            args_str += f"  {k}: {v}\n"
        action_str += args_str

    return action_str


def get_memory():
    """Get the current memory"""
    global memory
    return memory


def get_chatbot_messages(logs=None):
    """Format chat messages for gr.Chatbot component

    Args:
        logs: Optional list of tool call logs. If None, uses global tool_call_logs.

    Returns:
        List of ChatMessage objects
    """
    formatted_messages = []

    # Use provided logs if specified, otherwise use global tool_call_logs
    logs_to_process = logs if logs is not None else tool_call_logs

    for tool_call in logs_to_process:
        if tool_call["type"] != "function_call":
            continue

        name = tool_call["name"]
        arguments = json.loads(tool_call["arguments"])

        # Prefer a role on the log entry, then on the arguments, defaulting to assistant
        role = tool_call.get("role", arguments.get("role", "assistant"))

        if "reasoning" in tool_call:
            formatted_messages += [
                ChatMessage(
                    role=role, content=tool_call["reasoning"], metadata={"title": "🧠 Reasoning"}
                )
            ]

        # Format tool calls with titles
        if name == "message":
            formatted_messages += [ChatMessage(role=role, content=arguments["text"])]
        else:
            # Format tool calls with a title
            action = arguments.get("action", "")

            # Define dictionary for title mappings
            title_mappings = {
                "wait": "⏳ Waiting...",
                "done": "✅ Task Completed",
                "fail": "❌ Task Failed",
                "memory.update": "🧠 Memory Updated",
                "screenshot": "📸 Taking Screenshot",
                "move_cursor": "🖱️ Moving Cursor",
                "left_click": "🖱️ Left Click",
                "right_click": "🖱️ Right Click",
                "double_click": "🖱️ Double Click",
                "type_text": "⌨️ Typing Text",
                "press_key": "⌨️ Pressing Key",
                "send_hotkey": "⌨️ Sending Hotkey",
                "copy_to_clipboard": "📋 Copying to Clipboard",
                "set_clipboard": "📋 Setting Clipboard",
                "run_command": "🖥️ Running Shell Command",
                "initialize": "🚀 Initializing Computer",
                "shutdown": "🛑 Shutting Down",
            }

            # Look up title based on name.action or just action
            key = f"{name}.{action}"
            if key in title_mappings:
                title = title_mappings[key]
            elif action in title_mappings:
                title = title_mappings[action]
            else:
                title = f"🛠️ {name.capitalize()}: {action}"

            # Always set status to done
            status = "done"

            # Format the response content
            content_parts = []

            # Add arguments
            if arguments:
                content_parts.append("**Arguments:**")
                for k, v in arguments.items():
                    if k != "action":  # Skip action as it's in the title
                        content_parts.append(f"- {k}: {v}")

            # Add results if available
            if tool_call.get("result"):
                content_parts.append("\n**Results:**")
                content_parts.append(f"```json\n{json.dumps(tool_call['result'], indent=4)}\n```")
                # for k, v in tool_call['result'].items():
                #     content_parts.append(f"- {k}: {v}")

            # Join all content parts
            content = "\n".join(content_parts)

            formatted_messages += [
                ChatMessage(
                    role="assistant", content=content, metadata={"title": title, "status": status}
                )
            ]

    return formatted_messages


async def submit_message(message_text, role, screenshot_after=False):
    """Submit a message with specified role (user or assistant)"""
    global last_screenshot

    # Log the message submission and get result (may include screenshot)
    result = await execute(
        "message",
        "submit",
        {"role": role, "text": message_text, "screenshot_after": screenshot_after},
    )

    # Update return values based on whether a screenshot was taken
    if screenshot_after and "screenshot" in result:
        return (
            f"Message submitted as {role} with screenshot",
            get_chatbot_messages(),
            json.dumps(tool_call_logs, indent=2),
            result["screenshot"],
        )
    else:
        # Return last screenshot if available
        return (
            f"Message submitted as {role}",
            get_chatbot_messages(),
            json.dumps(tool_call_logs, indent=2),
            last_screenshot,
        )


def create_gradio_ui():
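    # Build the Gradio Blocks app: screenshot canvas, log tabs, and the control panels
    # for computer setup, typing, hotkeys, clipboard, shell commands, and annotations.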
    with gr.Blocks() as app:
        gr.Markdown("# Computer Interface Tool")

        with gr.Row():
            with gr.Column(scale=3):
                with gr.Group():
                    # Main screenshot display
                    img = gr.Image(
                        type="pil", label="Current Screenshot", show_label=False, interactive=False
                    )

                    # Click type selection
                    click_type = gr.Radio(
                        ["left_click", "right_click", "double_click", "move_cursor"],
                        label="Click Type",
                        value="left_click",
                    )

                    with gr.Row():
                        wait_btn = gr.Button("WAIT")
                        done_btn = gr.Button("DONE")
                        fail_btn = gr.Button("FAIL")

                # Tabbed logs: Tool logs, Conversational logs, and Demonstrations
                with gr.Tabs() as logs_tabs:
                    with gr.TabItem("Conversational Logs"):
                        chat_log = gr.Chatbot(
                            value=get_chatbot_messages,
                            label="Conversation",
                            elem_classes="chatbot",
                            height=400,
                            type="messages",
                            sanitize_html=True,
                            allow_tags=True,
                        )
                    with gr.TabItem("Function Logs"):
                        with gr.Group():
                            action_log = gr.JSON(label="Function Logs", every=0.2)
                            clear_log_btn = gr.Button("Clear Log")
                    with gr.TabItem("Save/Share Demonstrations"):
                        with gr.Row():
                            with gr.Column(scale=3):
                                # Dataset viewer - automatically loads sessions with selection column
                                dataset_viewer = gr.DataFrame(
                                    label="All Sessions",
                                    value=get_sessions_data,
                                    show_search="filter",
                                    max_height=300,
                                    interactive=True,  # Make it interactive for selection
                                )

                                # HuggingFace Upload UI
                                with gr.Group(visible=True):
                                    gr.Markdown("Upload Sessions to HuggingFace")
                                    with gr.Row():
                                        hf_dataset_name = gr.Textbox(
                                            label="HuggingFace Dataset Name",
                                            placeholder="username/dataset-name",
                                            info="Format: username/dataset-name",
                                        )
                                        hf_visibility = gr.Radio(
                                            choices=["public", "private"],
                                            label="Dataset Visibility",
                                            value="private",
                                        )

                                    # Tag filtering with a single multi-select dropdown
                                    filter_tags = gr.Dropdown(
                                        label="Filter by tags (optional)",
                                        choices=get_existing_tags()[0],
                                        multiselect=True,
                                        allow_custom_value=True,
                                        info="When tags are selected, only demonstrations with those tags will be uploaded. Leave empty to upload all sessions.",
                                    )

                                    # Function to update button text based on selected tags
                                    def get_upload_button_text(selected_tags=None):
                                        if not selected_tags:
                                            # Count all sessions
                                            session_folders = glob.glob(
                                                os.path.join(SESSION_DIR, "*")
                                            )
                                            count = len(session_folders) if session_folders else 0
                                            return f"Upload {count} Sessions to HuggingFace"
                                        else:
                                            # Count sessions with matching tags
                                            all_sessions = load_all_sessions()
                                            if all_sessions is None:
                                                return "Upload 0 Sessions to HuggingFace"

                                            df = all_sessions.to_pandas()
                                            if "tags" not in df.columns:
                                                return "Upload 0 Sessions to HuggingFace"

                                            # Filter by selected tags (sessions that have ANY of the selected tags)
                                            matching_count = 0
                                            for _, row in df.iterrows():
                                                tags = row.get("tags", [])
                                                if not len(tags):
                                                    continue

                                                # Check if any of the selected tags are in this session's tags
                                                if any(
                                                    tag in list(row["tags"])
                                                    for tag in selected_tags
                                                ):
                                                    matching_count += 1

                                            return (
                                                f"Upload {matching_count} Sessions to HuggingFace"
                                            )

                                    # Initial button text with all sessions
                                    hf_upload_btn = gr.Button(get_upload_button_text())

                                    # Update button text when filter changes
                                    def update_button_text(selected_tags):
                                        return get_upload_button_text(selected_tags)

                                    # Connect filter changes to update button text
                                    filter_tags.change(
                                        update_button_text,
                                        inputs=filter_tags,
                                        outputs=hf_upload_btn,
                                    )

                                    hf_upload_status = gr.Textbox(label="Upload Status", value="")
                            with gr.Column(scale=1):
                                # Demo name with random name button
                                with gr.Group():
                                    demo_name = gr.Textbox(
                                        label="Demonstration Name",
                                        value=generate_random_demo_name(),
                                        placeholder="Enter a name for this demonstration",
                                    )
                                    random_name_btn = gr.Button("🎲", scale=1)

                                    # Demo tags dropdown
                                    demo_tags = gr.Dropdown(
                                        label="Demonstration Tags",
                                        choices=get_existing_tags()[0],
                                        multiselect=True,
                                        allow_custom_value=True,
                                        info="Select existing tags or create new ones",
                                    )

                                    save_btn = gr.Button("Save Current Session")
                                save_status = gr.Textbox(label="Save Status", value="")

                                # Function to update the demo name with a new random name
                                def update_random_name():
                                    return generate_random_demo_name()

                                # Connect random name button
                                random_name_btn.click(update_random_name, outputs=[demo_name])

            with gr.Column(scale=1):
                with gr.Accordion("Memory / Scratchpad", open=False):
                    with gr.Group():
                        memory_display = gr.Textbox(
                            label="Current Memory", value=get_memory(), lines=5
                        )
                        with gr.Row():
                            memory_submit_btn = gr.Button("Submit Memory")
                            memory_refine_btn = gr.Button("Refine")
                    memory_status = gr.Textbox(label="Status", value="")

                with gr.Accordion("Tasks", open=True):
                    # Add current task display and controls
                    with gr.Group():
                        current_task = gr.Textbox(
                            label="Current Task", value=TASK_EXAMPLES[0]["task"], interactive=True
                        )
                        with gr.Row():
                            randomize_task_btn = gr.Button("🎲 Randomize Task")
                            run_setup_btn = gr.Button("⚙️ Run Task Setup")
                    # Setup status textbox
                    setup_status = gr.Textbox(label="Setup Status", value="")

                with gr.Group():
                    with gr.Accordion("Computer Configuration", open=False):
                        with gr.Row():
                            os_choice = gr.Radio(
                                label="OS",
                                choices=["macOS", "Ubuntu", "Windows"],
                                value="macOS",
                            )

                            # Provider selection radio
                            provider_choice = gr.Radio(
                                label="Provider",
                                choices=["lume", "self", "cloud", "winsandbox"],
                                value="lume",
                                info="'lume' uses a VM, 'self' uses the host computer server, 'cloud' uses a cloud container",
                            )

                        # Container name field for cloud provider (initially hidden)
                        container_name = gr.Textbox(
                            label="Container Name",
                            placeholder="Enter your container name",
                            visible=False,
                            info="Get your container from [cua.ai](https://cua.ai/)",
                        )

                        # Check if CUA_API_KEY is set in environment
                        has_cua_key = os.environ.get("CUA_API_KEY") is not None

                        # API key field for cloud provider (visible only if no env key and cloud selected)
                        api_key_field = gr.Textbox(
                            label="CUA API Key",
                            placeholder="Enter your CUA API key",
                            type="password",
                            visible=False,
                            info="Required for cloud provider. Set CUA_API_KEY environment variable to hide this field.",
                        )

                        # App filtering dropdown for app-use experiment
                        app_filter = gr.Dropdown(
                            label="Filter by apps (App-Use)",
                            multiselect=True,
                            allow_custom_value=True,
                            info="When apps are selected, the computer will focus on those apps using the app-use experiment",
                        )

                        # Function to show/hide container name and API key fields based on provider selection
                        def update_cloud_fields_visibility(provider):
                            show_container = provider == "cloud"
                            show_api_key = provider == "cloud" and not has_cua_key
                            return (
                                gr.update(visible=show_container),
                                gr.update(visible=show_api_key),
                            )

                        # Connect provider choice to field visibility
                        provider_choice.change(
                            update_cloud_fields_visibility,
                            inputs=provider_choice,
                            outputs=[container_name, api_key_field],
                        )

                    start_btn = gr.Button("Initialize Computer")

                with gr.Group():
                    input_text = gr.Textbox(label="Type Text")
                    with gr.Row():
                        press_enter_checkbox = gr.Checkbox(label="Press Enter", value=False)
                        submit_text_btn = gr.Button("Submit Text")
                        text_refine_btn = gr.Button("Refine")

                with gr.Group():
                    hotkey_keys = gr.Dropdown(
                        choices=VALID_KEYS,
                        label="Select Keys",
                        multiselect=True,
                        show_label=False,
                        allow_custom_value=True,
                        info="Select one or more keys to send as a hotkey",
                    )
                    hotkey_btn = gr.Button("Send Hotkey(s)")

                with gr.Accordion("Scrolling", open=False):
                    with gr.Group():
                        scroll_clicks = gr.Number(
                            label="Number of Clicks", value=1, minimum=1, step=1
                        )
                        with gr.Row():
                            scroll_up_btn = gr.Button("Scroll Up")
                            scroll_down_btn = gr.Button("Scroll Down")

                with gr.Accordion("Reasoning for Last Action", open=False):
                    with gr.Group():
                        last_action_display = gr.Textbox(
                            label="Last Action", value=get_last_action_display(), interactive=False
                        )
                        reasoning_text = gr.Textbox(
                            label="What was your thought process behind this action?",
                            placeholder="Enter your reasoning here...",
                            lines=3,
                        )
                        erroneous_checkbox = gr.Checkbox(
                            label="Mark this action as erroneous (sets weight to 0)", value=False
                        )
                        reasoning_submit_btn = gr.Button("Submit Reasoning")
                        reasoning_refine_btn = gr.Button("Refine")
                    reasoning_status = gr.Textbox(label="Status", value="")

                with gr.Accordion("Conversation Messages", open=False):
                    message_role = gr.Radio(
                        ["user", "assistant"], label="Message Role", value="user"
                    )
                    message_text = gr.Textbox(
                        label="Message Content", placeholder="Enter message here...", lines=3
                    )
                    screenshot_after_msg = gr.Checkbox(
                        label="Receive screenshot after message", value=False
                    )
                    message_submit_btn = gr.Button("Submit Message")
                    message_status = gr.Textbox(label="Status")

                with gr.Accordion("Clipboard Operations", open=False):
                    clipboard_content = gr.Textbox(label="Clipboard Content")
                    get_clipboard_btn = gr.Button("Get Clipboard Content")
                    set_clipboard_text = gr.Textbox(label="Set Clipboard Text")
                    set_clipboard_btn = gr.Button("Set Clipboard")
                    clipboard_status = gr.Textbox(label="Status")

                with gr.Accordion("Run Shell Commands", open=False):
                    command_input = gr.Textbox(label="Command to run", placeholder="ls -la")
                    run_command_btn = gr.Button("Run Command")
                    command_output = gr.Textbox(label="Command Output", lines=5)

                shutdown_btn = gr.Button("Shutdown Computer")

        # Handle save button
        save_btn.click(
            save_demonstration, inputs=[action_log, demo_name, demo_tags], outputs=[save_status]
        )

        # Function to refresh the dataset viewer
        def refresh_dataset_viewer():
            return get_sessions_data()

        # Also update the dataset viewer when saving
        save_btn.click(refresh_dataset_viewer, outputs=dataset_viewer)

        # Also update the tags dropdown when saving
        save_btn.click(get_existing_tags, outputs=[demo_tags, filter_tags])

        # Handle HuggingFace upload button
        hf_upload_btn.click(
            upload_to_huggingface,
            inputs=[hf_dataset_name, hf_visibility, filter_tags],
            outputs=[hf_upload_status],
        )

        # Function to randomize task
        def randomize_task():
            task_dict = random.choice(TASK_EXAMPLES)
            return task_dict["task"]

        # Function to run task setup
        async def run_task_setup(task_text):
            global computer

            # Check if computer is initialized
            if computer is None:
                return (
                    "Computer not initialized. Please initialize the computer first.",
                    img,
                    action_log,
                )

            # Find the task dict that matches the current task text
            for task_dict in TASK_EXAMPLES:
                if task_dict["task"] == task_text:
                    try:
                        # Run the setup function with the computer interface and return the result
                        setup_func = task_dict["setup"]
                        if setup_func:
                            await setup_func(computer)

                        # Send initial user message
                        _, _, logs_json, screenshot = await submit_message(
                            task_text, "user", screenshot_after=True
                        )

                        return f"Setup complete for: {task_text}", screenshot, logs_json
                    except Exception as e:
                        return f"Error during setup: {str(e)}", img, action_log

            return "Task not found in examples", img, action_log

        # Connect the randomize button to the function
        randomize_task_btn.click(randomize_task, outputs=[current_task])

        # Connect the setup button
        run_setup_btn.click(
            run_task_setup, inputs=[current_task], outputs=[setup_status, img, action_log]
        )

        # Event handlers
        action_log.change(get_chatbot_messages, outputs=[chat_log])

        img.select(handle_click, inputs=[img, click_type], outputs=[img, action_log])
        start_btn.click(
            handle_init_computer,
            inputs=[os_choice, app_filter, provider_choice, container_name, api_key_field],
            outputs=[img, action_log],
        )
        wait_btn.click(handle_wait, outputs=[img, action_log])

        # DONE and FAIL buttons just do a placeholder action
        async def handle_done():
            output = await execute("computer", "done", {})
            return output["screenshot"], json.dumps(tool_call_logs, indent=2)

        async def handle_fail():
            output = await execute("computer", "fail", {})
            return output["screenshot"], json.dumps(tool_call_logs, indent=2)

        done_btn.click(handle_done, outputs=[img, action_log])
        fail_btn.click(handle_fail, outputs=[img, action_log])

        # Handle hotkey button
        async def handle_hotkey(selected_keys):
            if not selected_keys or len(selected_keys) == 0:
                return await handle_screenshot(), json.dumps(tool_call_logs, indent=2)

            # All selected keys are sent together as a single hotkey combination
            output = await execute("computer", "send_hotkey", {"keys": selected_keys})
            return output["screenshot"], json.dumps(tool_call_logs, indent=2)

        hotkey_btn.click(handle_hotkey, inputs=[hotkey_keys], outputs=[img, action_log])

        # Define async handler for scrolling
        async def handle_scroll(direction, num_clicks=1):
            """Scroll the page up or down"""
            global computer
            if computer is None:
                return None, json.dumps(tool_call_logs, indent=2)

            # Convert num_clicks to integer with validation
            try:
                num_clicks = int(num_clicks)
                if num_clicks < 1:
                    num_clicks = 1
            except (ValueError, TypeError):
                num_clicks = 1

            # Execute the scroll action
            action = "scroll_up" if direction == "up" else "scroll_down"
            result = await execute("computer", action, {"clicks": num_clicks})

            return result["screenshot"], json.dumps(tool_call_logs, indent=2)

        # Connect scroll buttons
        scroll_up_btn.click(
            handle_scroll, inputs=[gr.State("up"), scroll_clicks], outputs=[img, action_log]
        )
        scroll_down_btn.click(
            handle_scroll, inputs=[gr.State("down"), scroll_clicks], outputs=[img, action_log]
        )

        submit_text_btn.click(
            handle_type, inputs=[input_text, press_enter_checkbox], outputs=[img, action_log]
        )
        get_clipboard_btn.click(handle_copy, outputs=[clipboard_content, action_log])
        set_clipboard_btn.click(
            handle_set_clipboard, inputs=set_clipboard_text, outputs=[clipboard_status, action_log]
        )
        run_command_btn.click(
            handle_run_command, inputs=command_input, outputs=[command_output, action_log]
        )
        shutdown_btn.click(handle_shutdown, outputs=[clipboard_status, action_log])
        clear_log_btn.click(clear_log, outputs=action_log)
        chat_log.clear(clear_log, outputs=action_log)

        # Update last action display after each action
        img.select(lambda *args: get_last_action_display(), outputs=last_action_display)
        start_btn.click(lambda: get_last_action_display(), outputs=last_action_display)
        wait_btn.click(lambda: get_last_action_display(), outputs=last_action_display)
        done_btn.click(lambda: get_last_action_display(), outputs=last_action_display)
        fail_btn.click(lambda: get_last_action_display(), outputs=last_action_display)
        hotkey_btn.click(lambda: get_last_action_display(), outputs=last_action_display)
        submit_text_btn.click(lambda: get_last_action_display(), outputs=last_action_display)
        message_submit_btn.click(lambda: get_last_action_display(), outputs=last_action_display)

        # Handle reasoning submission
        async def handle_reasoning_update(reasoning, is_erroneous):
            status = await update_reasoning(reasoning, is_erroneous)
            return status, json.dumps(tool_call_logs, indent=2)

        reasoning_submit_btn.click(
            handle_reasoning_update,
            inputs=[reasoning_text, erroneous_checkbox],
            outputs=[reasoning_status, action_log],
        )

        # Helper function for text refinement - used for all refine buttons
        async def handle_text_refinement(
            text_content, content_type="reasoning", task_text="", use_before=False
        ):
            global last_screenshot, last_action, tool_call_logs, last_screenshot_before

            screenshot = last_screenshot_before if use_before else last_screenshot

            # Check if we have the necessary components
            if not text_content.strip():
                return f"No {content_type} text to refine", text_content

            if screenshot is None:
                return "No screenshot available for refinement", text_content

            try:
                # Convert the PIL image to base64 if available
                screenshot_base64 = None
                if screenshot:
                    with io.BytesIO() as buffer:
                        screenshot.save(buffer, format="PNG")
                        screenshot_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")

                # Set up the OpenAI client for refinement
                # Try different API keys from environment in order of preference
                api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OMNI_OPENAI_API_KEY")

                if not api_key:
                    return "OpenAI API key not found in environment", text_content

                from libs.agent.agent.providers.omni.clients.openai import OpenAIClient

                # Create a client using a fixed GPT-4.1 snapshot for refinement
                model = "gpt-4.1-2025-04-14"

                client = OpenAIClient(
                    api_key=api_key,
                    model=model,
                    max_tokens=1024,
                    temperature=0.2,  # Low temperature for more focused refinement
                )

                # Get the last 3 messages from the chat history
                recent_messages = get_chatbot_messages(tool_call_logs)[-3:]

                # Format message history with titles when available
                formatted_messages = []
                for msg in recent_messages:
                    if msg.metadata and "title" in msg.metadata:
                        formatted_messages.append(
                            f"{msg.role} ({msg.metadata['title']}): {msg.content}"
                        )
                    else:
                        formatted_messages.append(f"{msg.role}: {msg.content}")

                formatted_messages = [f"<message>{msg}</message>" for msg in formatted_messages]
                # Pre-join: backslashes in f-string expressions require Python 3.12+
                recent_messages_text = "\n".join(formatted_messages)

                # Create different prompts based on content type
                if content_type == "reasoning":
                    message_prompt = f"""You are helping refine an explanation about why a specific computer UI action is about to be taken.

The screenshot below shows the state of the screen as I prepare to take this action.

TASK: <task_text>{task_text}</task_text>

ACTION I'M ABOUT TO TAKE:
<action_display>{get_last_action_display()}</action_display>

CURRENT EXPLANATION:
<reasoning_content>{text_content}</reasoning_content>

RECENT MESSAGES:
<recent_messages>{recent_messages_text}</recent_messages>

Make this into a concise reasoning / self-reflection trace, using phrases like "I should", "I need to", "let me", "it seems", "I see". The trace MUST demonstrate extensive planning before each function call and reflect on the outcomes of the previous function calls. DO NOT carry out this entire process through function calls alone, as that can impair your ability to solve the problem and think insightfully.

Provide ONLY the refined explanation text, with no additional commentary or markdown."""

                elif content_type == "memory":
                    message_prompt = f"""You are helping refine memory/scratchpad content for an AI assistant.

The screenshot below shows the current state of the computer interface.

TASK: <task_text>{task_text}</task_text>

CURRENT MEMORY CONTENT:
<memory_content>{text_content}</memory_content>

RECENT MESSAGES:
<recent_messages>{recent_messages_text}</recent_messages>

Refine this memory content to be more clear, organized, and useful for the assistant's task.
- Organize information into logical sections
- Prioritize key facts needed for the task
- Remove unnecessary or redundant information
- Make the format more readable with bullet points or other organizational elements if helpful

Provide ONLY the refined memory text, with no additional commentary or markdown."""

                elif content_type == "text":
                    message_prompt = f"""You are helping refine text that will be typed into a computer interface.

The screenshot below shows the current state of the computer interface.

TASK: <task_text>{task_text}</task_text>

CURRENT TEXT TO TYPE:
<text_content>{text_content}</text_content>

RECENT MESSAGES:
<recent_messages>{recent_messages_text}</recent_messages>

Refine this text to be more effective for the current context:
- Fix any spelling or grammar issues
- Improve clarity and conciseness
- Format appropriately for the context
- Optimize the text for the intended use

Provide ONLY the refined text, with no additional commentary or markdown."""

                else:
                    message_prompt = f"""You are helping refine text content.

The screenshot below shows the current state of the computer interface.

CURRENT TEXT:
{text_content}

RECENT MESSAGES:
<recent_messages>{recent_messages_text}</recent_messages>

Improve this text to be more clear, concise, and effective.

Provide ONLY the refined text, with no additional commentary or markdown."""

                # Create messages with the screenshot
                messages = []

                # Add message with image if available
                if screenshot_base64:
                    messages.append(
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": message_prompt},
                                {
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:image/png;base64,{screenshot_base64}"
                                    },
                                },
                            ],
                        }
                    )
                else:
                    # Fallback if screenshot isn't available
                    messages.append({"role": "user", "content": message_prompt})

                print(message_prompt)

                # Make the API call
                response = await client.run_interleaved(
                    messages=messages,
                    system="You are a helpful AI assistant that improves and refines text.",
                )

                # Extract the refined text from the response
                if "choices" in response and len(response["choices"]) > 0:
                    refined_text = response["choices"][0]["message"]["content"]
                    return f"{content_type.capitalize()} refined successfully", refined_text
                else:
                    return "Error: Unexpected API response format", text_content

            except Exception as e:
                return f"Error refining {content_type}: {str(e)}", text_content

        # Define async wrapper functions for each refine button
        async def handle_reasoning_refinement(reasoning, task):
            return await handle_text_refinement(reasoning, "reasoning", task, use_before=True)

        async def handle_memory_refinement(memory_text, task):
            return await handle_text_refinement(memory_text, "memory", task)

        async def handle_text_input_refinement(text, task):
            return await handle_text_refinement(text, "text", task)

        # Connect the refine buttons to the appropriate handlers
        reasoning_refine_btn.click(
            handle_reasoning_refinement,
            inputs=[reasoning_text, current_task],
            outputs=[reasoning_status, reasoning_text],
        )

        # Connect memory refine button
        memory_refine_btn.click(
            handle_memory_refinement,
            inputs=[memory_display, current_task],
            outputs=[memory_status, memory_display],
        )

        # Status element for type text section
        with gr.Group():
            type_text_status = gr.Textbox(label="Text Status", value="", visible=False)

        # Connect text refine button
        text_refine_btn.click(
            handle_text_input_refinement,
            inputs=[input_text, current_task],
            outputs=[type_text_status, input_text],
        )

        # Handle memory submission
        async def handle_memory_update(memory_text):
            status = await handle_memory(memory_text)
            return status, json.dumps(tool_call_logs, indent=2)

        memory_submit_btn.click(
            handle_memory_update, inputs=memory_display, outputs=[memory_status, action_log]
        )

        # Handle message submission
        async def handle_message_submit(message_content, role, screenshot_after):
            status, chat_messages, logs_json, screenshot = await submit_message(
                message_content, role, screenshot_after
            )
            if screenshot:
                return status, chat_messages, logs_json, screenshot
            else:
                return status, chat_messages, logs_json, last_screenshot

        message_submit_btn.click(
            handle_message_submit,
            inputs=[message_text, message_role, screenshot_after_msg],
            outputs=[message_status, chat_log, action_log, img],
        )

    return app


# Launch the app
if __name__ == "__main__":
    app = create_gradio_ui()
    app.launch()

```

--------------------------------------------------------------------------------
/libs/python/computer/computer/computer.py:
--------------------------------------------------------------------------------

```python
import asyncio
import io
import json
import logging
import os
import platform
import re
import time
import traceback
from functools import wraps
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Dict,
    List,
    Literal,
    Optional,
    TypeVar,
    Union,
    cast,
)

try:
    from typing import ParamSpec
except Exception:  # pragma: no cover
    from typing_extensions import ParamSpec  # type: ignore

P = ParamSpec("P")
R = TypeVar("R")

from core.telemetry import is_telemetry_enabled, record_event
from PIL import Image

from . import helpers
from .interface.factory import InterfaceFactory
from .logger import Logger, LogLevel
from .models import Computer as ComputerConfig
from .models import Display
from .tracing import ComputerTracing
from .tracing_wrapper import TracingInterfaceWrapper

SYSTEM_INFO = {
    "os": platform.system().lower(),
    "os_version": platform.release(),
    "python_version": platform.python_version(),
}

# Import provider related modules
from .providers.base import VMProviderType
from .providers.factory import VMProviderFactory

OSType = Literal["macos", "linux", "windows"]


class Computer:
    """Computer is the main class for interacting with the computer."""

    def create_desktop_from_apps(self, apps):
        """
        Create a virtual desktop from a list of app names, returning a DioramaComputer
        that proxies Diorama.Interface but uses diorama_cmds via the computer interface.

        Args:
            apps (list[str]): List of application names to include in the desktop.
        Returns:
            DioramaComputer: A proxy object with the Diorama interface, but using diorama_cmds.
        """
        assert (
            "app-use" in self.experiments
        ), "App Usage is an experimental feature. Enable it by passing experiments=['app-use'] to Computer()"
        from .diorama_computer import DioramaComputer

        return DioramaComputer(self, apps)

    def __init__(
        self,
        display: Union[Display, Dict[str, int], str] = "1024x768",
        memory: str = "8GB",
        cpu: str = "4",
        os_type: OSType = "macos",
        name: str = "",
        image: Optional[str] = None,
        shared_directories: Optional[List[str]] = None,
        use_host_computer_server: bool = False,
        verbosity: Union[int, LogLevel] = logging.INFO,
        telemetry_enabled: bool = True,
        provider_type: Union[str, VMProviderType] = VMProviderType.LUME,
        provider_port: Optional[int] = 7777,
        noVNC_port: Optional[int] = 8006,
        api_port: Optional[int] = None,
        host: str = os.environ.get("PYLUME_HOST", "localhost"),
        storage: Optional[str] = None,
        ephemeral: bool = False,
        api_key: Optional[str] = None,
        experiments: Optional[List[str]] = None,
    ):
        """Initialize a new Computer instance.

        Args:
            display: The display configuration. Can be:
                    - A Display object
                    - A dict with 'width' and 'height'
                    - A string in format "WIDTHxHEIGHT" (e.g. "1920x1080")
                    Defaults to "1024x768"
            memory: The VM memory allocation. Defaults to "8GB"
            cpu: The VM CPU allocation. Defaults to "4"
            os_type: The operating system type ('macos', 'linux', or 'windows')
            name: The VM name
            image: The VM image name
            shared_directories: Optional list of directory paths to share with the VM
            use_host_computer_server: If True, target localhost instead of starting a VM
            verbosity: Logging level (standard Python logging levels: logging.DEBUG, logging.INFO, etc.)
                      LogLevel enum values are still accepted for backward compatibility
            telemetry_enabled: Whether to enable telemetry tracking. Defaults to True.
            provider_type: The VM provider type to use (e.g. lume, lumier, cloud, winsandbox, docker)
            provider_port: Optional port to use for the VM provider server
            noVNC_port: Optional port for the noVNC web interface (Lumier provider)
            api_port: Optional port for the computer server API
            host: Host to use for VM provider connections (e.g. "localhost", "host.docker.internal")
            storage: Optional path for persistent VM storage (Lumier provider)
            ephemeral: Whether to use ephemeral storage
            api_key: Optional API key for cloud providers (defaults to CUA_API_KEY environment variable)
            experiments: Optional list of experimental features to enable (e.g. ["app-use"])
        """

        self.logger = Logger("computer", verbosity)
        self.logger.info("Initializing Computer...")

        # Fall back to environment variable for api_key if not provided
        if api_key is None:
            api_key = os.environ.get("CUA_API_KEY")

        if not image:
            if os_type == "macos":
                image = "macos-sequoia-cua:latest"
            elif os_type == "linux":
                image = "trycua/cua-ubuntu:latest"
        image = str(image)

        # Store original parameters
        self.image = image
        self.provider_port = provider_port
        self.noVNC_port = noVNC_port
        self.api_port = api_port
        self.host = host
        self.os_type = os_type
        self.provider_type = provider_type
        self.ephemeral = ephemeral
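        # The API key is only kept for the cloud provider; other providers ignore it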
        self.api_key = api_key if self.provider_type == VMProviderType.CLOUD else None

        # Set default API port if not specified
        if self.api_port is None:
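            # 8443 when an API key is present (cloud container), 8000 for a local computer server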
            self.api_port = 8443 if self.api_key else 8000

        self.experiments = experiments or []

        if "app-use" in self.experiments:
            assert self.os_type == "macos", "App use experiment is only supported on macOS"

        # The default is currently to use non-ephemeral storage
        if storage and ephemeral and storage != "ephemeral":
            raise ValueError("Storage path and ephemeral flag cannot be used together")

        # Windows Sandbox always uses ephemeral storage
        if self.provider_type == VMProviderType.WINSANDBOX:
            if not ephemeral and storage is not None and storage != "ephemeral":
                self.logger.warning(
                    "Windows Sandbox storage is always ephemeral. Setting ephemeral=True."
                )
            self.ephemeral = True
            self.storage = "ephemeral"
        else:
            self.storage = "ephemeral" if ephemeral else storage

        # For Lumier provider, store the first shared directory path to use
        # for VM file sharing
        self.shared_path = None
        if shared_directories and len(shared_directories) > 0:
            self.shared_path = shared_directories[0]
            self.logger.info(
                f"Using first shared directory for VM file sharing: {self.shared_path}"
            )

        # Store telemetry preference
        self._telemetry_enabled = telemetry_enabled

        # Set initialization flag
        self._initialized = False
        self._running = False

        # Configure root logger
        self.verbosity = verbosity
        self.logger = Logger("computer", verbosity)

        # Configure component loggers with proper hierarchy
        self.vm_logger = Logger("computer.vm", verbosity)
        self.interface_logger = Logger("computer.interface", verbosity)

        if not use_host_computer_server:
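            # Ensure the image reference includes an explicit tag (default to :latest)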
            if ":" not in image:
                image = f"{image}:latest"

            if not name:
                # Normalize the name to be used for the VM
                name = image.replace(":", "_")
                # Remove any forward slashes
                name = name.replace("/", "_")

            # Convert display parameter to Display object
            if isinstance(display, str):
                # Parse string format "WIDTHxHEIGHT"
                match = re.match(r"(\d+)x(\d+)", display)
                if not match:
                    raise ValueError(
                        "Display string must be in format 'WIDTHxHEIGHT' (e.g. '1024x768')"
                    )
                width, height = map(int, match.groups())
                display_config = Display(width=width, height=height)
            elif isinstance(display, dict):
                display_config = Display(**display)
            else:
                display_config = display

            self.config = ComputerConfig(
                image=image.split(":")[0],
                tag=image.split(":")[1],
                name=name,
                display=display_config,
                memory=memory,
                cpu=cpu,
            )
            # Initialize VM provider but don't start it yet - we'll do that in run()
            self.config.vm_provider = None  # Will be initialized in run()

        # Store shared directories config
        self.shared_directories = shared_directories or []

        # Placeholder for VM provider context manager
        self._provider_context = None

        # Initialize with proper typing - None at first, will be set in run()
        self._interface = None
        self._original_interface = None  # Keep reference to original interface
        self._tracing_wrapper = None  # Tracing wrapper for interface
        self.use_host_computer_server = use_host_computer_server

        # Initialize tracing
        self._tracing = ComputerTracing(self)

        # Record initialization in telemetry (if enabled)
        if telemetry_enabled and is_telemetry_enabled():
            record_event("computer_initialized", SYSTEM_INFO)
        else:
            self.logger.debug("Telemetry disabled - skipping initialization tracking")

    async def __aenter__(self):
        """Start the computer."""
        await self.run()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Stop the computer."""
        await self.disconnect()

    def __enter__(self):
        """Start the computer."""
        # Run the event loop to call the async enter method
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.__aenter__())
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the computer."""
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.__aexit__(exc_type, exc_val, exc_tb))

    async def run(self) -> Optional[str]:
        """Initialize the VM and computer interface."""
        if TYPE_CHECKING:
            from .interface.base import BaseComputerInterface

        # If already initialized, just log and return
        if hasattr(self, "_initialized") and self._initialized:
            self.logger.info("Computer already initialized, skipping initialization")
            return

        self.logger.info("Starting computer...")
        start_time = time.time()

        try:
            # If using host computer server
            if self.use_host_computer_server:
                self.logger.info("Using host computer server")
                # Set ip_address for host computer server mode
                ip_address = "localhost"
                # Create the interface with explicit type annotation
                from .interface.base import BaseComputerInterface

                interface = cast(
                    BaseComputerInterface,
                    InterfaceFactory.create_interface_for_os(
                        os=self.os_type, ip_address=ip_address, api_port=self.api_port  # type: ignore[arg-type]
                    ),
                )
                self._interface = interface
                self._original_interface = interface

                self.logger.info("Waiting for host computer server to be ready...")
                await self._interface.wait_for_ready()
                self.logger.info("Host computer server ready")
            else:
                # Start or connect to VM
                self.logger.info(f"Starting VM: {self.image}")
                if not self._provider_context:
                    try:
                        provider_type_name = (
                            self.provider_type.name
                            if isinstance(self.provider_type, VMProviderType)
                            else self.provider_type
                        )
                        self.logger.verbose(
                            f"Initializing {provider_type_name} provider context..."
                        )

                        # Explicitly set provider parameters
                        storage = "ephemeral" if self.ephemeral else self.storage
                        verbose = self.verbosity >= LogLevel.DEBUG
                        ephemeral = self.ephemeral
                        port = self.provider_port if self.provider_port is not None else 7777
                        host = self.host if self.host else "localhost"
                        image = self.image
                        shared_path = self.shared_path
                        noVNC_port = self.noVNC_port

                        # Create VM provider instance with explicit parameters
                        try:
                            if self.provider_type == VMProviderType.LUMIER:
                                self.logger.info(f"Using VM image for Lumier provider: {image}")
                                if shared_path:
                                    self.logger.info(
                                        f"Using shared path for Lumier provider: {shared_path}"
                                    )
                                if noVNC_port:
                                    self.logger.info(
                                        f"Using noVNC port for Lumier provider: {noVNC_port}"
                                    )
                                self.config.vm_provider = VMProviderFactory.create_provider(
                                    self.provider_type,
                                    port=port,
                                    host=host,
                                    storage=storage,
                                    shared_path=shared_path,
                                    image=image,
                                    verbose=verbose,
                                    ephemeral=ephemeral,
                                    noVNC_port=noVNC_port,
                                )
                            elif self.provider_type == VMProviderType.LUME:
                                self.config.vm_provider = VMProviderFactory.create_provider(
                                    self.provider_type,
                                    port=port,
                                    host=host,
                                    storage=storage,
                                    verbose=verbose,
                                    ephemeral=ephemeral,
                                )
                            elif self.provider_type == VMProviderType.CLOUD:
                                self.config.vm_provider = VMProviderFactory.create_provider(
                                    self.provider_type,
                                    api_key=self.api_key,
                                    verbose=verbose,
                                )
                            elif self.provider_type == VMProviderType.WINSANDBOX:
                                self.config.vm_provider = VMProviderFactory.create_provider(
                                    self.provider_type,
                                    port=port,
                                    host=host,
                                    storage=storage,
                                    verbose=verbose,
                                    ephemeral=ephemeral,
                                    noVNC_port=noVNC_port,
                                )
                            elif self.provider_type == VMProviderType.DOCKER:
                                self.config.vm_provider = VMProviderFactory.create_provider(
                                    self.provider_type,
                                    port=port,
                                    host=host,
                                    storage=storage,
                                    shared_path=shared_path,
                                    image=image or "trycua/cua-ubuntu:latest",
                                    verbose=verbose,
                                    ephemeral=ephemeral,
                                    noVNC_port=noVNC_port,
                                    api_port=self.api_port,
                                )
                            else:
                                raise ValueError(f"Unsupported provider type: {self.provider_type}")
                            self._provider_context = await self.config.vm_provider.__aenter__()
                            self.logger.verbose("VM provider context initialized successfully")
                        except ImportError as ie:
                            self.logger.error(f"Failed to import provider dependencies: {ie}")
                            if str(ie).find("lume") >= 0 and str(ie).find("lumier") < 0:
                                self.logger.error(
                                    "Please install with: pip install cua-computer[lume]"
                                )
                            elif str(ie).find("lumier") >= 0 or str(ie).find("docker") >= 0:
                                self.logger.error(
                                    "Please install with: pip install cua-computer[lumier] and make sure Docker is installed"
                                )
                            elif str(ie).find("cloud") >= 0:
                                self.logger.error(
                                    "Please install with: pip install cua-computer[cloud]"
                                )
                            raise
                    except Exception as e:
                        self.logger.error(f"Failed to initialize provider context: {e}")
                        raise RuntimeError(f"Failed to initialize VM provider: {e}")

                # Check if VM exists or create it
                is_running = False
                try:
                    if self.config.vm_provider is None:
                        raise RuntimeError(f"VM provider not initialized for {self.config.name}")

                    vm = await self.config.vm_provider.get_vm(self.config.name)
                    self.logger.verbose(f"Found existing VM: {self.config.name}")
                    is_running = vm.get("status") == "running"
                except Exception as e:
                    self.logger.error(f"VM not found: {self.config.name}")
                    self.logger.error(f"Error: {e}")
                    raise RuntimeError(f"VM {self.config.name} could not be found or created.")

                # Start the VM if it's not running
                if not is_running:
                    self.logger.info(f"VM {self.config.name} is not running, starting it...")

                    # Convert paths to dictionary format for shared directories
                    shared_dirs = []
                    for path in self.shared_directories:
                        self.logger.verbose(f"Adding shared directory: {path}")
                        path = os.path.abspath(os.path.expanduser(path))
                        if os.path.exists(path):
                            # Add path in format expected by Lume API
                            shared_dirs.append({"hostPath": path, "readOnly": False})
                        else:
                            self.logger.warning(f"Shared directory does not exist: {path}")

                    # Prepare run options to pass to the provider
                    run_opts = {}

                    # Add display information if available
                    if self.config.display is not None:
                        display_info = {
                            "width": self.config.display.width,
                            "height": self.config.display.height,
                        }

                        # Check if scale_factor exists before adding it
                        if hasattr(self.config.display, "scale_factor"):
                            display_info["scale_factor"] = self.config.display.scale_factor

                        run_opts["display"] = display_info

                    # Add shared directories if available
                    if self.shared_directories:
                        run_opts["shared_directories"] = shared_dirs.copy()

                    # Run the VM with the provider
                    try:
                        if self.config.vm_provider is None:
                            raise RuntimeError(
                                f"VM provider not initialized for {self.config.name}"
                            )

                        # Use the complete run_opts we prepared earlier
                        # Handle ephemeral storage for run_vm method too
                        storage_param = "ephemeral" if self.ephemeral else self.storage

                        # Log the image being used
                        self.logger.info(f"Running VM using image: {self.image}")

                        # Call provider.run_vm with explicit image parameter
                        response = await self.config.vm_provider.run_vm(
                            image=self.image,
                            name=self.config.name,
                            run_opts=run_opts,
                            storage=storage_param,
                        )
                        self.logger.info(f"VM run response: {response if response else 'None'}")
                    except Exception as run_error:
                        self.logger.error(f"Failed to run VM: {run_error}")
                        raise RuntimeError(f"Failed to start VM: {run_error}")

                # Wait for VM to be ready with a valid IP address
                self.logger.info("Waiting for VM to be ready with a valid IP address...")
                try:
                    if self.provider_type == VMProviderType.LUMIER:
                        max_retries = 60  # Increased for Lumier VM startup which takes longer
                        retry_delay = 3  # 3 seconds between retries for Lumier
                    else:
                        max_retries = 30  # Default for other providers
                        retry_delay = 2  # 2 seconds between retries

                    self.logger.info(
                        f"Waiting up to {max_retries * retry_delay} seconds for VM to be ready..."
                    )
                    ip = await self.get_ip(max_retries=max_retries, retry_delay=retry_delay)

                    # If we get here, we have a valid IP
                    self.logger.info(f"VM is ready with IP: {ip}")
                    ip_address = ip
                except TimeoutError as timeout_error:
                    self.logger.error(str(timeout_error))
                    raise RuntimeError(f"VM startup timed out: {timeout_error}")
                except Exception as wait_error:
                    self.logger.error(f"Error waiting for VM: {wait_error}")
                    raise RuntimeError(f"VM failed to become ready: {wait_error}")
        except Exception as e:
            self.logger.error(f"Failed to initialize computer: {e}")
            self.logger.error(traceback.format_exc())
            raise RuntimeError(f"Failed to initialize computer: {e}")

        try:
            # Verify we have a valid IP before initializing the interface
            if not ip_address or ip_address == "unknown" or ip_address == "0.0.0.0":
                raise RuntimeError(
                    f"Cannot initialize interface - invalid IP address: {ip_address}"
                )

            # Initialize the interface using the factory with the specified OS
            self.logger.info(f"Initializing interface for {self.os_type} at {ip_address}")
            from .interface.base import BaseComputerInterface

            # Pass authentication credentials if using cloud provider
            if self.provider_type == VMProviderType.CLOUD and self.api_key and self.config.name:
                interface = cast(
                    BaseComputerInterface,
                    InterfaceFactory.create_interface_for_os(
                        os=self.os_type,
                        ip_address=ip_address,
                        api_key=self.api_key,
                        vm_name=self.config.name,
                        api_port=self.api_port,
                    ),
                )
            else:
                interface = cast(
                    BaseComputerInterface,
                    InterfaceFactory.create_interface_for_os(
                        os=self.os_type, ip_address=ip_address, api_port=self.api_port
                    ),
                )

            self._interface = interface
            self._original_interface = interface

            # Wait for the WebSocket interface to be ready
            self.logger.info("Connecting to WebSocket interface...")

            try:
                # Use a single timeout for the entire connection process
                # The VM should already be ready at this point, so we're just establishing the connection
                await self._interface.wait_for_ready(timeout=30)
                self.logger.info("Sandbox interface connected successfully")
            except TimeoutError as e:
                port = getattr(self._interface, "_api_port", 8000)  # Default to 8000 if not set
                self.logger.error(f"Failed to connect to sandbox interface at {ip_address}:{port}")
                raise TimeoutError(
                    f"Could not connect to sandbox interface at {ip_address}:{port}: {str(e)}"
                )

            # Create an event to keep the VM running in background if needed
            if not self.use_host_computer_server:
                self._stop_event = asyncio.Event()
                self._keep_alive_task = asyncio.create_task(self._stop_event.wait())

            self.logger.info("Computer is ready")

            # Set the initialization flag and clear the initializing flag
            self._initialized = True

            # Set this instance as the default computer for remote decorators
            helpers.set_default_computer(self)

            self.logger.info("Computer successfully initialized")
        except Exception:
            raise
        finally:
            # Log initialization time for performance monitoring
            duration_ms = (time.time() - start_time) * 1000
            self.logger.debug(f"Computer initialization took {duration_ms:.2f}ms")
        return

    async def disconnect(self) -> None:
        """Disconnect from the computer's WebSocket interface."""
        if self._interface:
            self._interface.close()

    async def stop(self) -> None:
        """Disconnect from the computer's WebSocket interface and stop the computer."""
        start_time = time.time()

        try:
            self.logger.info("Stopping Computer...")

            # In VM mode, first explicitly stop the VM, then exit the provider context
            if (
                not self.use_host_computer_server
                and self._provider_context
                and self.config.vm_provider is not None
            ):
                try:
                    self.logger.info(f"Stopping VM {self.config.name}...")
                    await self.config.vm_provider.stop_vm(
                        name=self.config.name,
                        storage=self.storage,  # Pass storage explicitly for clarity
                    )
                except Exception as e:
                    self.logger.error(f"Error stopping VM: {e}")

                self.logger.verbose("Closing VM provider context...")
                await self.config.vm_provider.__aexit__(None, None, None)
                self._provider_context = None

            await self.disconnect()
            self.logger.info("Computer stopped")
        except Exception as e:
            self.logger.debug(
                f"Error during cleanup: {e}"
            )  # Log as debug since this might be expected
        finally:
            # Log stop time for performance monitoring
            duration_ms = (time.time() - start_time) * 1000
            self.logger.debug(f"Computer stop process took {duration_ms:.2f}ms")
        return

    async def start(self) -> None:
        """Start the computer."""
        await self.run()

    async def restart(self) -> None:
        """Restart the computer.

        If using a VM provider that supports restart, this will issue a restart
        without tearing down the provider context, then reconnect the interface.
        Falls back to stop()+run() when a provider restart is not available.
        """
        # Host computer server: just disconnect and run again
        if self.use_host_computer_server:
            try:
                await self.disconnect()
            finally:
                await self.run()
            return

        # If no VM provider context yet, fall back to full run
        if not getattr(self, "_provider_context", None) or self.config.vm_provider is None:
            self.logger.info("No provider context active; performing full restart via run()")
            await self.run()
            return

        # Gracefully close current interface connection if present
        if self._interface:
            try:
                self._interface.close()
            except Exception as e:
                self.logger.debug(f"Error closing interface prior to restart: {e}")

        # Attempt provider-level restart if implemented
        try:
            storage_param = "ephemeral" if self.ephemeral else self.storage
            if hasattr(self.config.vm_provider, "restart_vm"):
                self.logger.info(f"Restarting VM {self.config.name} via provider...")
                await self.config.vm_provider.restart_vm(
                    name=self.config.name, storage=storage_param
                )
            else:
                # Fallback: stop then start without leaving provider context
                self.logger.info(
                    f"Provider has no restart_vm; performing stop+start for {self.config.name}..."
                )
                await self.config.vm_provider.stop_vm(name=self.config.name, storage=storage_param)
                await self.config.vm_provider.run_vm(
                    image=self.image, name=self.config.name, run_opts={}, storage=storage_param
                )
        except Exception as e:
            self.logger.error(f"Failed to restart VM via provider: {e}")
            # As a last resort, do a full stop (with provider context exit) and run
            try:
                await self.stop()
            finally:
                await self.run()
            return

        # Wait for VM to be ready and reconnect interface
        try:
            self.logger.info("Waiting for VM to be ready after restart...")
            if self.provider_type == VMProviderType.LUMIER:
                max_retries = 60
                retry_delay = 3
            else:
                max_retries = 30
                retry_delay = 2
            ip_address = await self.get_ip(max_retries=max_retries, retry_delay=retry_delay)

            self.logger.info(f"Re-initializing interface for {self.os_type} at {ip_address}")
            from .interface.base import BaseComputerInterface

            if self.provider_type == VMProviderType.CLOUD and self.api_key and self.config.name:
                self._interface = cast(
                    BaseComputerInterface,
                    InterfaceFactory.create_interface_for_os(
                        os=self.os_type,
                        ip_address=ip_address,
                        api_key=self.api_key,
                        vm_name=self.config.name,
                        api_port=self.api_port,
                    ),
                )
            else:
                self._interface = cast(
                    BaseComputerInterface,
                    InterfaceFactory.create_interface_for_os(
                        os=self.os_type,
                        ip_address=ip_address,
                        api_port=self.api_port,
                    ),
                )

            self.logger.info("Connecting to WebSocket interface after restart...")
            await self._interface.wait_for_ready(timeout=30)
            self.logger.info("Computer reconnected and ready after restart")
        except Exception as e:
            self.logger.error(f"Failed to reconnect after restart: {e}")
            # Try a full reset if reconnection failed
            try:
                await self.stop()
            finally:
                await self.run()

    # @property
    async def get_ip(self, max_retries: int = 15, retry_delay: int = 3) -> str:
        """Get the IP address of the VM or localhost if using host computer server.

        This method delegates to the provider's get_ip method, which waits indefinitely
        until the VM has a valid IP address.

        Args:
            max_retries: Unused parameter, kept for backward compatibility
            retry_delay: Delay between retries in seconds (default: 3)

        Returns:
            IP address of the VM or localhost if using host computer server
        """
        # For host computer server, always return localhost immediately
        if self.use_host_computer_server:
            return "127.0.0.1"

        # Get IP from the provider - each provider implements its own waiting logic
        if self.config.vm_provider is None:
            raise RuntimeError("VM provider is not initialized")

        # Log that we're waiting for the IP
        self.logger.info(f"Waiting for VM {self.config.name} to get an IP address...")

        # Call the provider's get_ip method which will wait indefinitely
        storage_param = "ephemeral" if self.ephemeral else self.storage

        # Log the image associated with this VM
        self.logger.info(f"VM {self.config.name} uses image: {self.image}")

        # Call the provider's get_ip, which waits until a valid IP address is available
        ip = await self.config.vm_provider.get_ip(
            name=self.config.name, storage=storage_param, retry_delay=retry_delay
        )

        # Log success
        self.logger.info(f"VM {self.config.name} has IP address: {ip}")
        return ip

    async def wait_vm_ready(self) -> Optional[Dict[str, Any]]:
        """Wait for VM to be ready with an IP address.

        Returns:
            VM status information or None if using host computer server.
        """
        if self.use_host_computer_server:
            return None

        timeout = 600  # 10 minutes timeout (increased from 4 minutes)
        interval = 2.0  # 2 seconds between checks (increased to reduce API load)
        start_time = time.time()
        last_status = None
        attempts = 0

        self.logger.info(f"Waiting for VM {self.config.name} to be ready (timeout: {timeout}s)...")

        while time.time() - start_time < timeout:
            attempts += 1
            elapsed = time.time() - start_time

            try:
                # Keep polling for VM info
                if self.config.vm_provider is None:
                    self.logger.error("VM provider is not initialized")
                    vm = None
                else:
                    vm = await self.config.vm_provider.get_vm(self.config.name)

                # Log full VM properties for debugging (every 30 attempts)
                if attempts % 30 == 0:
                    self.logger.info(
                        f"VM properties at attempt {attempts}: {vm if vm else 'None'}"
                    )

                # Get current status for logging
                current_status = vm.get("status") if vm else None
                if current_status != last_status:
                    self.logger.info(
                        f"VM status changed to: {current_status} (after {elapsed:.1f}s)"
                    )
                    last_status = current_status

                # Check for IP address - ensure it's not None or empty
                ip = vm.get("ip_address") if vm else None
                if ip and ip.strip():  # Check for non-empty string
                    self.logger.info(
                        f"VM {self.config.name} got IP address: {ip} (after {elapsed:.1f}s)"
                    )
                    return vm

                if attempts % 10 == 0:  # Log every 10 attempts to avoid flooding
                    self.logger.info(
                        f"Still waiting for VM IP address... (elapsed: {elapsed:.1f}s)"
                    )
                else:
                    self.logger.debug(
                        f"Waiting for VM IP address... Current IP: {ip}, Status: {current_status}"
                    )

            except Exception as e:
                self.logger.warning(f"Error checking VM status (attempt {attempts}): {str(e)}")
                # If we've been trying for a while and still getting errors, log more details
                if elapsed > 60:  # After 1 minute of errors, log more details
                    self.logger.error(f"Persistent error getting VM status: {str(e)}")
                    self.logger.info("Trying to get VM list for debugging...")
                    try:
                        if self.config.vm_provider is not None:
                            vms = await self.config.vm_provider.list_vms()
                            self.logger.info(
                                f"Available VMs: {[getattr(vm, 'name', None) for vm in vms if hasattr(vm, 'name')]}"
                            )
                    except Exception as list_error:
                        self.logger.error(f"Failed to list VMs: {str(list_error)}")

            await asyncio.sleep(interval)

        # If we get here, we've timed out
        elapsed = time.time() - start_time
        self.logger.error(f"VM {self.config.name} not ready after {elapsed:.1f} seconds")

        # Try to get final VM status for debugging
        try:
            if self.config.vm_provider is not None:
                vm = await self.config.vm_provider.get_vm(self.config.name)
                # VM data is returned as a dictionary from the Lumier provider
                status = vm.get("status", "unknown") if vm else "unknown"
                ip = vm.get("ip_address") if vm else None
            else:
                status = "unknown"
                ip = None
            self.logger.error(f"Final VM status: {status}, IP: {ip}")
        except Exception as e:
            self.logger.error(f"Failed to get final VM status: {str(e)}")

        raise TimeoutError(
            f"VM {self.config.name} not ready after {elapsed:.1f} seconds - IP address not assigned"
        )

    async def update(self, cpu: Optional[int] = None, memory: Optional[str] = None):
        """Update VM settings."""
        self.logger.info(
            f"Updating VM settings: CPU={cpu or self.config.cpu}, Memory={memory or self.config.memory}"
        )
        update_opts = {"cpu": cpu or int(self.config.cpu), "memory": memory or self.config.memory}
        if self.config.vm_provider is not None:
            await self.config.vm_provider.update_vm(
                name=self.config.name,
                update_opts=update_opts,
                storage=self.storage,  # Pass storage explicitly for clarity
            )
        else:
            raise RuntimeError("VM provider not initialized")

    def get_screenshot_size(self, screenshot: bytes) -> Dict[str, int]:
        """Get the dimensions of a screenshot.

        Args:
            screenshot: The screenshot bytes

        Returns:
            Dict[str, int]: Dictionary containing 'width' and 'height' of the image
        """
        image = Image.open(io.BytesIO(screenshot))
        width, height = image.size
        return {"width": width, "height": height}

    @property
    def interface(self):
        """Get the computer interface for interacting with the VM.

        Returns:
            The computer interface (wrapped with tracing if tracing is active)
        """
        if not hasattr(self, "_interface") or self._interface is None:
            error_msg = "Computer interface not initialized. Call run() first."
            self.logger.error(error_msg)
            self.logger.error(
                "Make sure to call await computer.run() before using any interface methods."
            )
            raise RuntimeError(error_msg)

        # Return tracing wrapper if tracing is active and we have an original interface
        if (
            self._tracing.is_tracing
            and hasattr(self, "_original_interface")
            and self._original_interface is not None
        ):
            # Create wrapper if it doesn't exist or if the original interface changed
            if (
                not hasattr(self, "_tracing_wrapper")
                or self._tracing_wrapper is None
                or self._tracing_wrapper._original_interface != self._original_interface
            ):
                self._tracing_wrapper = TracingInterfaceWrapper(
                    self._original_interface, self._tracing
                )
            return self._tracing_wrapper

        return self._interface

    @property
    def tracing(self) -> ComputerTracing:
        """Get the computer tracing instance for recording sessions.

        Returns:
            ComputerTracing: The tracing instance
        """
        return self._tracing

    @property
    def telemetry_enabled(self) -> bool:
        """Check if telemetry is enabled for this computer instance.

        Returns:
            bool: True if telemetry is enabled, False otherwise
        """
        return self._telemetry_enabled

    async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
        """Convert normalized coordinates to screen coordinates.

        Args:
            x: X coordinate between 0 and 1
            y: Y coordinate between 0 and 1

        Returns:
            tuple[float, float]: Screen coordinates (x, y)
        """
        return await self.interface.to_screen_coordinates(x, y)

    async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
        """Convert screen coordinates to screenshot coordinates.

        Args:
            x: X coordinate in screen space
            y: Y coordinate in screen space

        Returns:
            tuple[float, float]: (x, y) coordinates in screenshot space
        """
        return await self.interface.to_screenshot_coordinates(x, y)

    async def playwright_exec(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """
        Execute a Playwright browser command.

        Args:
            command: The browser command to execute (visit_url, click, type, scroll, web_search)
            params: Command parameters

        Returns:
            Dict containing the command result

        Examples:
            # Navigate to a URL
            await computer.playwright_exec("visit_url", {"url": "https://example.com"})

            # Click at coordinates
            await computer.playwright_exec("click", {"x": 100, "y": 200})

            # Type text
            await computer.playwright_exec("type", {"text": "Hello, world!"})

            # Scroll
            await computer.playwright_exec("scroll", {"delta_x": 0, "delta_y": -100})

            # Web search
            await computer.playwright_exec("web_search", {"query": "computer use agent"})
        """
        return await self.interface.playwright_exec(command, params)

    # Add virtual environment management functions to computer interface
    async def venv_install(self, venv_name: str, requirements: list[str]):
        """Install packages in a virtual environment.

        Args:
            venv_name: Name of the virtual environment
            requirements: List of package requirements to install

        Returns:
            Tuple of (stdout, stderr) from the installation command
        """
        requirements = requirements or []
        # Windows vs POSIX handling
        if self.os_type == "windows":
            # Use %USERPROFILE% for home directory and cmd.exe semantics
            venv_path = f"%USERPROFILE%\\.venvs\\{venv_name}"
            ensure_dir_cmd = 'if not exist "%USERPROFILE%\\.venvs" mkdir "%USERPROFILE%\\.venvs"'
            create_cmd = f'if not exist "{venv_path}" python -m venv "{venv_path}"'
            requirements_str = " ".join(requirements)
            # Activate via activate.bat and install
            install_cmd = (
                f'call "{venv_path}\\Scripts\\activate.bat" && pip install {requirements_str}'
                if requirements_str
                else "echo No requirements to install"
            )
            await self.interface.run_command(ensure_dir_cmd)
            await self.interface.run_command(create_cmd)
            return await self.interface.run_command(install_cmd)
        else:
            # POSIX (macOS/Linux)
            venv_path = f"$HOME/.venvs/{venv_name}"
            create_cmd = f'mkdir -p "$HOME/.venvs" && python -m venv "{venv_path}"'
            # Check if venv exists, if not create it
            check_cmd = f'test -d "{venv_path}" || ({create_cmd})'
            _ = await self.interface.run_command(check_cmd)
            # Install packages
            requirements_str = " ".join(requirements)
            install_cmd = (
                f'. "{venv_path}/bin/activate" && pip install {requirements_str}'
                if requirements_str
                else "echo No requirements to install"
            )
        return await self.interface.run_command(install_cmd)
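
    # Usage sketch (illustrative): ensure a guest venv named "demo" exists and install
    # packages into it.
    #
    #     await computer.venv_install("demo", ["requests", "rich"])
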

    async def pip_install(self, requirements: list[str]):
        """Install packages using the system Python/pip (no venv).

        Args:
            requirements: List of package requirements to install globally/user site.

        Returns:
            Tuple of (stdout, stderr) from the installation command
        """
        requirements = requirements or []
        if not requirements:
            return await self.interface.run_command("echo No requirements to install")

        # Use python -m pip for cross-platform consistency
        reqs = " ".join(requirements)
        install_cmd = f"python -m pip install {reqs}"
        return await self.interface.run_command(install_cmd)
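
    # Usage sketch (illustrative): install packages with the guest's system Python
    # instead of a venv.
    #
    #     await computer.pip_install(["numpy"])
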

    async def venv_cmd(self, venv_name: str, command: str):
        """Execute a shell command in a virtual environment.

        Args:
            venv_name: Name of the virtual environment
            command: Shell command to execute in the virtual environment

        Returns:
            Tuple of (stdout, stderr) from the command execution
        """
        if self.os_type == "windows":
            # Windows (cmd.exe)
            venv_path = f"%USERPROFILE%\\.venvs\\{venv_name}"
            # Check existence and signal if missing
            check_cmd = f'if not exist "{venv_path}" (echo VENV_NOT_FOUND) else (echo VENV_FOUND)'
            result = await self.interface.run_command(check_cmd)
            if "VENV_NOT_FOUND" in getattr(result, "stdout", ""):
                # Auto-create the venv with no requirements
                await self.venv_install(venv_name, [])
            # Activate and run the command
            full_command = f'call "{venv_path}\\Scripts\\activate.bat" && {command}'
            return await self.interface.run_command(full_command)
        else:
            # POSIX (macOS/Linux)
            venv_path = f"$HOME/.venvs/{venv_name}"
            # Check if virtual environment exists
            check_cmd = f'test -d "{venv_path}"'
            result = await self.interface.run_command(check_cmd)
            if result.stderr or "test:" in result.stdout:  # venv doesn't exist
                # Auto-create the venv with no requirements
                await self.venv_install(venv_name, [])
            # Activate virtual environment and run command
            full_command = f'. "{venv_path}/bin/activate" && {command}'
            return await self.interface.run_command(full_command)
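
    # Usage sketch (illustrative): run a shell command inside the "demo" venv; the venv
    # is created automatically if it does not exist yet.
    #
    #     result = await computer.venv_cmd("demo", "pip list")
    #     print(result.stdout)
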

    async def venv_exec(self, venv_name: str, python_func, *args, **kwargs):
        """Execute Python function in a virtual environment using source code extraction.

        Args:
            venv_name: Name of the virtual environment
            python_func: A callable function to execute
            *args: Positional arguments to pass to the function
            **kwargs: Keyword arguments to pass to the function

        Returns:
            The result of the function execution, or raises any exception that occurred
        """
        import base64
        import inspect
        import json
        import textwrap

        try:
            # Get function source code using inspect.getsource
            source = inspect.getsource(python_func)
            # Remove common leading whitespace (dedent)
            func_source = textwrap.dedent(source).strip()

            # Remove decorators
            while func_source.lstrip().startswith("@"):
                func_source = func_source.split("\n", 1)[1].strip()

            # Get function name for execution
            func_name = python_func.__name__

            # Serialize args and kwargs as JSON (safer than dill for cross-version compatibility)
            args_json = json.dumps(args, default=str)
            kwargs_json = json.dumps(kwargs, default=str)

        except OSError as e:
            raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}")
        except Exception as e:
            raise Exception(f"Failed to reconstruct function source: {e}")

        # Create Python code that will define and execute the function
        args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
        kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")

        python_code = f'''
import json
import traceback
import base64

try:
    # Define the function from source
{textwrap.indent(func_source, "    ")}
    
    # Deserialize args and kwargs from base64 JSON
    _args_b64 = """{args_b64}"""
    _kwargs_b64 = """{kwargs_b64}"""
    args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
    kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))
    
    # Execute the function
    result = {func_name}(*args, **kwargs)

    # Create success output payload
    output_payload = {{
        "success": True,
        "result": result,
        "error": None
    }}
    
except Exception as e:
    # Create error output payload
    output_payload = {{
        "success": False,
        "result": None,
        "error": {{
            "type": type(e).__name__,
            "message": str(e),
            "traceback": traceback.format_exc()
        }}
    }}

# Serialize the output payload as JSON
import json
output_json = json.dumps(output_payload, default=str)

# Print the JSON output with markers
print(f"<<<VENV_EXEC_START>>>{{output_json}}<<<VENV_EXEC_END>>>")
'''

        # Encode the Python code in base64 to avoid shell escaping issues
        encoded_code = base64.b64encode(python_code.encode("utf-8")).decode("ascii")

        # Execute the Python code in the virtual environment
        python_command = (
            f"python -c \"import base64; exec(base64.b64decode('{encoded_code}').decode('utf-8'))\""
        )
        result = await self.venv_cmd(venv_name, python_command)

        # Parse the output to extract the payload
        start_marker = "<<<VENV_EXEC_START>>>"
        end_marker = "<<<VENV_EXEC_END>>>"

        # Print original stdout
        print(result.stdout[: result.stdout.find(start_marker)])

        if start_marker in result.stdout and end_marker in result.stdout:
            start_idx = result.stdout.find(start_marker) + len(start_marker)
            end_idx = result.stdout.find(end_marker)

            if start_idx < end_idx:
                output_json = result.stdout[start_idx:end_idx]

                try:
                    # Decode and deserialize the output payload from JSON
                    output_payload = json.loads(output_json)
                except Exception as e:
                    raise Exception(f"Failed to decode output payload: {e}")

                if output_payload["success"]:
                    return output_payload["result"]
                else:
                    import builtins

                    # Recreate and raise the original exception
                    error_info = output_payload.get("error", {}) or {}
                    err_type = error_info.get("type") or "Exception"
                    err_msg = error_info.get("message") or ""
                    err_tb = error_info.get("traceback") or ""

                    exc_cls = getattr(builtins, err_type, None)
                    if isinstance(exc_cls, type) and issubclass(exc_cls, BaseException):
                        # Built-in exception: rethrow with remote traceback appended
                        raise exc_cls(f"{err_msg}\n\nRemote traceback:\n{err_tb}")
                    else:
                        # Non built-in: raise a safe local error carrying full remote context
                        raise RuntimeError(f"{err_type}: {err_msg}\n\nRemote traceback:\n{err_tb}")
            else:
                raise Exception("Invalid output format: markers found but no content between them")
        else:
            # Fallback: return stdout/stderr if no payload markers found
            raise Exception(
                f"No output payload found. stdout: {result.stdout}, stderr: {result.stderr}"
            )
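
    # Usage sketch (illustrative): the function is shipped as source, so it must be
    # self-contained (imports inside the body) and its arguments and return value must
    # be JSON-serializable.
    #
    #     def add(a, b):
    #         return a + b
    #
    #     result = await computer.venv_exec("demo", add, 2, 3)  # -> 5
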

    async def venv_exec_background(
        self, venv_name: str, python_func, *args, requirements: Optional[List[str]] = None, **kwargs
    ) -> int:
        """Run the Python function in the venv in the background and return the PID.

        Uses a short Python launcher script that spawns a detached child process and exits immediately.
        """
        import base64
        import inspect
        import json
        import textwrap
        import time as _time

        try:
            source = inspect.getsource(python_func)
            func_source = textwrap.dedent(source).strip()
            while func_source.lstrip().startswith("@"):
                func_source = func_source.split("\n", 1)[1].strip()
            func_name = python_func.__name__
            args_json = json.dumps(args, default=str)
            kwargs_json = json.dumps(kwargs, default=str)
        except OSError as e:
            raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}")
        except Exception as e:
            raise Exception(f"Failed to reconstruct function source: {e}")

        reqs_list = requirements or []
        reqs_json = json.dumps(reqs_list)

        # Create Python code that will define and execute the function
        args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
        kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")

        payload_code = (
            f'''
import json
import traceback
import base64

try:
    # Define the function from source
{textwrap.indent(func_source, "    ")}
    
    # Deserialize args and kwargs from base64 JSON
    _args_b64 = """{args_b64}"""
    _kwargs_b64 = """{kwargs_b64}"""
    args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
    kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))
    
    # Ensure requirements inside the active venv
    for pkg in json.loads('''
            + repr(reqs_json)
            + """):
        if pkg:
            import subprocess, sys
            subprocess.run([sys.executable, '-m', 'pip', 'install', pkg], check=False)
    _ = {func_name}(*args, **kwargs)
except Exception:
    import sys
    sys.stderr.write(traceback.format_exc())
"""
        )
        payload_b64 = base64.b64encode(payload_code.encode("utf-8")).decode("ascii")

        if self.os_type == "windows":
            # Launcher spawns detached child and prints its PID
            launcher_code = f"""
import base64, subprocess, os, sys
DETACHED_PROCESS = 0x00000008
CREATE_NEW_PROCESS_GROUP = 0x00000200
creationflags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP
code = base64.b64decode("{payload_b64}").decode("utf-8")
p = subprocess.Popen(["python", "-c", code], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=creationflags)
print(p.pid)
"""
            launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii")
            venv_path = f"%USERPROFILE%\\.venvs\\{venv_name}"
            cmd = (
                'cmd /c "'
                f'call "{venv_path}\\Scripts\\activate.bat" && '
                f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\""
                '"'
            )
            result = await self.interface.run_command(cmd)
            pid_str = (result.stdout or "").strip().splitlines()[-1].strip()
            return int(pid_str)
        else:
            log = f"/tmp/cua_bg_{int(_time.time())}.log"
            launcher_code = f"""
import base64, subprocess, os, sys
code = base64.b64decode("{payload_b64}").decode("utf-8")
with open("{log}", "ab", buffering=0) as f:
    p = subprocess.Popen(["python", "-c", code], stdout=f, stderr=subprocess.STDOUT, preexec_fn=getattr(os, "setsid", None))
print(p.pid)
"""
            launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii")
            venv_path = f"$HOME/.venvs/{venv_name}"
            shell = (
                f'. "{venv_path}/bin/activate" && '
                f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\""
            )
            result = await self.interface.run_command(shell)
            pid_str = (result.stdout or "").strip().splitlines()[-1].strip()
            return int(pid_str)
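
    # Usage sketch (illustrative): launch a long-running function detached inside the
    # venv and get the child PID back immediately.
    #
    #     def busy_wait(seconds):
    #         import time
    #         time.sleep(seconds)
    #
    #     pid = await computer.venv_exec_background("demo", busy_wait, 600)
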

    async def python_exec(self, python_func, *args, **kwargs):
        """Execute a Python function using the system Python (no venv).

        Uses source extraction and base64 transport, mirroring venv_exec but
        without virtual environment activation.

        Returns the function result or raises a reconstructed exception with
        remote traceback context appended.
        """
        import base64
        import inspect
        import json
        import textwrap

        try:
            source = inspect.getsource(python_func)
            func_source = textwrap.dedent(source).strip()
            while func_source.lstrip().startswith("@"):
                func_source = func_source.split("\n", 1)[1].strip()
            func_name = python_func.__name__
            args_json = json.dumps(args, default=str)
            kwargs_json = json.dumps(kwargs, default=str)
        except OSError as e:
            raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}")
        except Exception as e:
            raise Exception(f"Failed to reconstruct function source: {e}")

        # Create Python code that will define and execute the function
        args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
        kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")

        python_code = f'''
import json
import traceback
import base64

try:
    # Define the function from source
{textwrap.indent(func_source, "    ")}
    
    # Deserialize args and kwargs from base64 JSON
    _args_b64 = """{args_b64}"""
    _kwargs_b64 = """{kwargs_b64}"""
    args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
    kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))
    
    # Execute the function
    result = {func_name}(*args, **kwargs)

    # Create success output payload
    output_payload = {{
        "success": True,
        "result": result,
        "error": None
    }}
    
except Exception as e:
    # Create error output payload
    output_payload = {{
        "success": False,
        "result": None,
        "error": {{
            "type": type(e).__name__,
            "message": str(e),
            "traceback": traceback.format_exc()
        }}
    }}

# Serialize the output payload as JSON
import json
output_json = json.dumps(output_payload, default=str)

# Print the JSON output with markers
print(f"<<<VENV_EXEC_START>>>{{output_json}}<<<VENV_EXEC_END>>>")
'''

        encoded_code = base64.b64encode(python_code.encode("utf-8")).decode("ascii")
        python_command = (
            f"python -c \"import base64; exec(base64.b64decode('{encoded_code}').decode('utf-8'))\""
        )
        result = await self.interface.run_command(python_command)

        start_marker = "<<<VENV_EXEC_START>>>"
        end_marker = "<<<VENV_EXEC_END>>>"

        print(result.stdout[: result.stdout.find(start_marker)])

        if start_marker in result.stdout and end_marker in result.stdout:
            start_idx = result.stdout.find(start_marker) + len(start_marker)
            end_idx = result.stdout.find(end_marker)
            if start_idx < end_idx:
                output_json = result.stdout[start_idx:end_idx]
                try:
                    output_payload = json.loads(output_json)
                except Exception as e:
                    raise Exception(f"Failed to decode output payload: {e}")

                if output_payload["success"]:
                    return output_payload["result"]
                else:
                    import builtins

                    error_info = output_payload.get("error", {}) or {}
                    err_type = error_info.get("type") or "Exception"
                    err_msg = error_info.get("message") or ""
                    err_tb = error_info.get("traceback") or ""
                    exc_cls = getattr(builtins, err_type, None)
                    if isinstance(exc_cls, type) and issubclass(exc_cls, BaseException):
                        raise exc_cls(f"{err_msg}\n\nRemote traceback:\n{err_tb}")
                    else:
                        raise RuntimeError(f"{err_type}: {err_msg}\n\nRemote traceback:\n{err_tb}")
            else:
                raise Exception("Invalid output format: markers found but no content between them")
        else:
            raise Exception(
                f"No output payload found. stdout: {result.stdout}, stderr: {result.stderr}"
            )
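
    # Usage sketch (illustrative): same contract as venv_exec, but executed with the
    # guest's system interpreter.
    #
    #     def hostname():
    #         import socket
    #         return socket.gethostname()
    #
    #     name = await computer.python_exec(hostname)
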

    async def python_exec_background(
        self, python_func, *args, requirements: Optional[List[str]] = None, **kwargs
    ) -> int:
        """Run a Python function with the system interpreter in the background and return PID.

        Uses a short launcher Python that spawns a detached child and exits immediately.
        """
        import base64
        import inspect
        import json
        import textwrap
        import time as _time

        try:
            source = inspect.getsource(python_func)
            func_source = textwrap.dedent(source).strip()
            while func_source.lstrip().startswith("@"):
                func_source = func_source.split("\n", 1)[1].strip()
            func_name = python_func.__name__
            args_json = json.dumps(args, default=str)
            kwargs_json = json.dumps(kwargs, default=str)
        except OSError as e:
            raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}")
        except Exception as e:
            raise Exception(f"Failed to reconstruct function source: {e}")

        # Create Python code that will define and execute the function
        args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
        kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")

        payload_code = f'''
import json
import traceback
import base64

try:
    # Define the function from source
{textwrap.indent(func_source, "    ")}
    
    # Deserialize args and kwargs from base64 JSON
    _args_b64 = """{args_b64}"""
    _kwargs_b64 = """{kwargs_b64}"""
    args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
    kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))
    
    _ = {func_name}(*args, **kwargs)
except Exception:
    import sys
    sys.stderr.write(traceback.format_exc())
'''
        payload_b64 = base64.b64encode(payload_code.encode("utf-8")).decode("ascii")

        if self.os_type == "windows":
            launcher_code = f"""
import base64, subprocess, os, sys
DETACHED_PROCESS = 0x00000008
CREATE_NEW_PROCESS_GROUP = 0x00000200
creationflags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP
code = base64.b64decode("{payload_b64}").decode("utf-8")
p = subprocess.Popen(["python", "-c", code], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=creationflags)
print(p.pid)
"""
            launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii")
            cmd = f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\""
            result = await self.interface.run_command(cmd)
            pid_str = (result.stdout or "").strip().splitlines()[-1].strip()
            return int(pid_str)
        else:
            log = f"/tmp/cua_bg_{int(_time.time())}.log"
            launcher_code = f"""
import base64, subprocess, os, sys
code = base64.b64decode("{payload_b64}").decode("utf-8")
with open("{log}", "ab", buffering=0) as f:
    p = subprocess.Popen(["python", "-c", code], stdout=f, stderr=subprocess.STDOUT, preexec_fn=getattr(os, "setsid", None))
print(p.pid)
"""
            launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii")
            cmd = f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\""
            result = await self.interface.run_command(cmd)
            pid_str = (result.stdout or "").strip().splitlines()[-1].strip()
            return int(pid_str)

    def python_command(
        self,
        requirements: Optional[List[str]] = None,
        *,
        venv_name: str = "default",
        use_system_python: bool = False,
        background: bool = False,
    ) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]:
        """Decorator to execute a Python function remotely in this Computer's venv.

        This mirrors `computer.helpers.sandboxed()` but binds to this instance and
        optionally ensures required packages are installed before execution.

        Args:
            requirements: Packages to install in the virtual environment.
            venv_name: Name of the virtual environment to use.
            use_system_python: If True, use the system Python/pip instead of a venv.
            background: If True, run the function detached and return the child PID immediately.

        Returns:
            A decorator that turns a local function into an async callable which
            runs remotely and returns the function's result.
        """

        reqs = list(requirements or [])

        def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
            @wraps(func)
            async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
                if use_system_python:
                    # For background, avoid blocking installs; install inside child process
                    if background:
                        return await self.python_exec_background(func, *args, requirements=reqs, **kwargs)  # type: ignore[return-value]
                    # Foreground: install first, then execute
                    if reqs:
                        await self.pip_install(reqs)
                    return await self.python_exec(func, *args, **kwargs)
                else:
                    # For background, avoid blocking installs; install inside child process under venv
                    if background:
                        return await self.venv_exec_background(venv_name, func, *args, requirements=reqs, **kwargs)  # type: ignore[return-value]
                    # Foreground: ensure venv and install, then execute
                    await self.venv_install(venv_name, reqs)
                    return await self.venv_exec(venv_name, func, *args, **kwargs)

            return wrapper

        return decorator
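
    # Usage sketch (illustrative): decorate a local function so that calling it runs it
    # remotely in this Computer's venv, installing requirements first.
    #
    #     @computer.python_command(requirements=["requests"], venv_name="demo")
    #     def fetch_status(url):
    #         import requests
    #         return requests.get(url).status_code
    #
    #     status = await fetch_status("https://example.com")
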

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/anthropic.py:
--------------------------------------------------------------------------------

```python
"""
Anthropic hosted tools agent loop implementation using liteLLM
"""

import asyncio
import json
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union

import litellm
from litellm.responses.litellm_completion_transformation.transformation import (
    LiteLLMCompletionResponsesConfig,
)

from ..decorators import register_agent
from ..loops.base import AsyncAgentConfig
from ..responses import (
    make_click_item,
    make_double_click_item,
    make_drag_item,
    make_failed_tool_call_items,
    make_input_image_item,
    make_keypress_item,
    make_left_mouse_down_item,
    make_left_mouse_up_item,
    make_move_item,
    make_output_text_item,
    make_reasoning_item,
    make_screenshot_item,
    make_scroll_item,
    make_type_item,
    make_wait_item,
)
from ..types import AgentCapability, AgentResponse, Messages, Tools

# Model version mapping to tool version and beta flag
MODEL_TOOL_MAPPING = [
    # Claude 4 models
    {
        "pattern": r"claude-4|claude-opus-4|claude-sonnet-4|claude-haiku-4",
        "tool_version": "computer_20250124",
        "beta_flag": "computer-use-2025-01-24",
    },
    # Claude 3.7 models
    {
        "pattern": r"claude-3\.?7|claude-3-7",
        "tool_version": "computer_20250124",
        "beta_flag": "computer-use-2025-01-24",
    },
    # Claude 3.5 models (fallback)
    {
        "pattern": r"claude-3\.?5|claude-3-5",
        "tool_version": "computer_20241022",
        "beta_flag": "computer-use-2024-10-22",
    },
]


def _get_tool_config_for_model(model: str) -> Dict[str, str]:
    """Get tool version and beta flag for the given model."""
    import re

    for mapping in MODEL_TOOL_MAPPING:
        if re.search(mapping["pattern"], model, re.IGNORECASE):
            return {"tool_version": mapping["tool_version"], "beta_flag": mapping["beta_flag"]}

    # Default to Claude 3.5 configuration
    return {"tool_version": "computer_20241022", "beta_flag": "computer-use-2024-10-22"}


async def _map_computer_tool_to_anthropic(computer_tool: Any, tool_version: str) -> Dict[str, Any]:
    """Map a computer tool to Anthropic's hosted tool schema."""
    # Get dimensions from the computer handler
    try:
        width, height = await computer_tool.get_dimensions()
    except Exception:
        # Fallback to default dimensions if method fails
        width, height = 1024, 768

    return {
        "type": tool_version,
        "function": {
            "name": "computer",
            "parameters": {
                "display_height_px": height,
                "display_width_px": width,
                "display_number": 1,
            },
        },
    }


async def _prepare_tools_for_anthropic(tool_schemas: List[Dict[str, Any]], model: str) -> Tools:
    """Prepare tools for Anthropic API format."""
    tool_config = _get_tool_config_for_model(model)
    anthropic_tools = []

    for schema in tool_schemas:
        if schema["type"] == "computer":
            # Map computer tool to Anthropic format
            anthropic_tools.append(
                await _map_computer_tool_to_anthropic(
                    schema["computer"], tool_config["tool_version"]
                )
            )
        elif schema["type"] == "function":
            # Function tools - convert to Anthropic format
            function_schema = schema["function"]
            anthropic_tools.append(
                {
                    "name": function_schema["name"],
                    "description": function_schema.get("description", ""),
                    "input_schema": function_schema.get("parameters", {}),
                }
            )

    return anthropic_tools
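
# Example, derived from the mapping above: a computer tool on a 1920x1080 display under
# a Claude 4 model is sent as
#   {"type": "computer_20250124",
#    "function": {"name": "computer",
#                 "parameters": {"display_height_px": 1080,
#                                "display_width_px": 1920,
#                                "display_number": 1}}}
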


def _convert_responses_items_to_completion_messages(messages: Messages) -> List[Dict[str, Any]]:
    """Convert responses_items message format to liteLLM completion format."""
    completion_messages = []
    call_id_to_fn_name = {}

    for message in messages:
        msg_type = message.get("type")
        role = message.get("role")

        # Handle user messages (both with and without explicit type)
        if role == "user" or msg_type == "user":
            content = message.get("content", "")
            if isinstance(content, list):
                # Multi-modal content - convert input_image to image format
                converted_content = []
                for item in content:
                    if isinstance(item, dict) and item.get("type") == "input_image":
                        # Convert input_image to OpenAI image format
                        image_url = item.get("image_url", "")
                        if image_url and image_url != "[omitted]":
                            converted_content.append(
                                {"type": "image_url", "image_url": {"url": image_url}}
                            )
                    elif isinstance(item, dict) and item.get("type") == "input_text":
                        # Convert input_text to OpenAI text format
                        text = item.get("text", "")
                        converted_content.append({"type": "text", "text": text})
                    else:
                        # Keep other content types as-is
                        converted_content.append(item)

                completion_messages.append(
                    {"role": "user", "content": converted_content if converted_content else content}
                )
            else:
                # Text content
                completion_messages.append({"role": "user", "content": content})

        # Handle assistant messages
        elif role == "assistant":
            content = message.get("content", [])
            if isinstance(content, str):
                content = [{"type": "output_text", "text": content}]

            content = "\n".join(item.get("text", "") for item in content)
            completion_messages.append({"role": "assistant", "content": content})

        elif msg_type == "reasoning":
            # Reasoning becomes part of assistant message
            summary = message.get("summary", [])
            reasoning_text = ""

            if isinstance(summary, list) and summary:
                # Extract text from summary items
                for item in summary:
                    if isinstance(item, dict) and item.get("type") == "summary_text":
                        reasoning_text = item.get("text", "")
                        break
            else:
                # Fallback to direct reasoning field
                reasoning_text = message.get("reasoning", "")

            if reasoning_text:
                completion_messages.append({"role": "assistant", "content": reasoning_text})

        elif msg_type == "function_call":
            fn_name = message.get("name")
            fn_args = message.get("arguments", "{}")
            call_id = message.get("call_id", "call_1")
            call_id_to_fn_name[call_id] = fn_name
            openai_tool_calls = [
                {
                    "id": call_id,
                    "type": "function",
                    "function": {"name": fn_name, "arguments": fn_args},
                }
            ]
            # If the last completion message is an assistant message, extend the tool_calls
            if completion_messages and completion_messages[-1].get("role") == "assistant":
                if "tool_calls" not in completion_messages[-1]:
                    completion_messages[-1]["tool_calls"] = []
                completion_messages[-1]["tool_calls"].extend(openai_tool_calls)
            else:
                # Create new assistant message with tool calls
                completion_messages.append(
                    {"role": "assistant", "content": None, "tool_calls": openai_tool_calls}
                )

        elif msg_type == "function_call_output":
            call_id = message.get("call_id", "call_1")
            fn_output = message.get("output", "")
            fn_name = call_id_to_fn_name.get(call_id, "computer")

            completion_messages.append(
                {
                    "role": "function",
                    "name": fn_name,
                    "tool_call_id": call_id,
                    "content": str(fn_output),
                }
            )

        elif msg_type == "computer_call":
            # Computer call becomes tool use in assistant message
            action = message.get("action", {})
            action_type = action.get("type")
            call_id = message.get("call_id", "call_1")

            tool_use_content = []

            # Basic actions (all versions)
            if action_type == "click":
                # Input:
                # {
                #     "type": "computer_call",
                #     "call_id": "call_1",
                #     "action": {
                #         "type": "click",
                #         "x": 100,
                #         "y": 200
                #     }
                # }

                # Output:
                # {
                #     "function": {
                #         "name": "computer",
                #         "arguments": json.dumps({
                #             "action": "click",
                #             "coordinate": [100, 200]
                #         })
                #     },
                #     "id": "call_1",
                #     "type": "function"
                # }
                button = action.get("button", "left")
                action_name = (
                    "right_click"
                    if button == "right"
                    else "middle_click" if button == "wheel" else "left_click"
                )
                tool_use_content.append(
                    {
                        "type": "tool_use",
                        "id": call_id,
                        "name": "computer",
                        "input": {
                            "action": action_name,
                            "coordinate": [action.get("x", 0), action.get("y", 0)],
                        },
                    }
                )
            elif action_type == "double_click":
                # Input:
                # {
                #     "type": "computer_call",
                #     "call_id": "call_1",
                #     "action": {
                #         "type": "double_click",
                #         "x": 160,
                #         "y": 240
                #     }
                # }

                # Output:
                # {
                #     "function": {
                #         "name": "computer",
                #         "arguments": json.dumps({
                #             "action": "double_click",
                #             "coordinate": [160, 240]
                #         })
                #     },
                #     "id": "call_1",
                #     "type": "function"
                # }
                tool_use_content.append(
                    {
                        "type": "tool_use",
                        "id": call_id,
                        "name": "computer",
                        "input": {
                            "action": "double_click",
                            "coordinate": [action.get("x", 0), action.get("y", 0)],
                        },
                    }
                )
            elif action_type == "type":
                # Input:
                # {
                #     "type": "computer_call",
                #     "call_id": "call_1",
                #     "action": {
                #         "type": "type",
                #         "text": "Hello World"
                #     }
                # }

                # Output:
                # {
                #     "function": {
                #         "name": "computer",
                #         "arguments": json.dumps({
                #             "action": "type",
                #             "text": "Hello World"
                #         })
                #     },
                #     "id": "call_1",
                #     "type": "function"
                # }
                tool_use_content.append(
                    {
                        "type": "tool_use",
                        "id": call_id,
                        "name": "computer",
                        "input": {"action": "type", "text": action.get("text", "")},
                    }
                )
            elif action_type == "keypress":
                # Input:
                # {
                #     "type": "computer_call",
                #     "call_id": "call_1",
                #     "action": {
                #         "type": "keypress",
                #         "keys": ["ctrl", "c"]
                #     }
                # }

                # Output:
                # {
                #     "function": {
                #         "name": "computer",
                #         "arguments": json.dumps({
                #             "action": "key",
                #             "text": "ctrl+c"
                #         })
                #     },
                #     "id": "call_1",
                #     "type": "function"
                # }
                tool_use_content.append(
                    {
                        "type": "tool_use",
                        "id": call_id,
                        "name": "computer",
                        "input": {"action": "key", "text": "+".join(action.get("keys", []))},
                    }
                )
            elif action_type in ["mouse_move", "move"]:
                # Input:
                # {
                #     "type": "computer_call",
                #     "call_id": "call_1",
                #     "action": {
                #         "type": "move",
                #         "x": 150,
                #         "y": 250
                #     }
                # }

                # Output:
                # {
                #     "function": {
                #         "name": "computer",
                #         "arguments": json.dumps({
                #             "action": "mouse_move",
                #             "coordinate": [150, 250]
                #         })
                #     },
                #     "id": "call_1",
                #     "type": "function"
                # }
                tool_use_content.append(
                    {
                        "type": "tool_use",
                        "id": call_id,
                        "name": "computer",
                        "input": {
                            "action": "mouse_move",
                            "coordinate": [action.get("x", 0), action.get("y", 0)],
                        },
                    }
                )
            elif action_type == "scroll":
                # Input:
                # {
                #     "type": "computer_call",
                #     "call_id": "call_1",
                #     "action": {
                #         "type": "scroll",
                #         "x": 300,
                #         "y": 400,
                #         "scroll_x": 0,
                #         "scroll_y": -5
                #     }
                # }

                # Output:
                # {
                #     "function": {
                #         "name": "computer",
                #         "arguments": json.dumps({
                #             "action": "scroll",
                #             "coordinate": [300, 400],
                #             "scroll_direction": "down",
                #             "scroll_amount": 5
                #         })
                #     },
                #     "id": "call_1",
                #     "type": "function"
                # }
                scroll_x = action.get("scroll_x", 0)
                scroll_y = action.get("scroll_y", 0)
                # Determine direction and amount from scroll values
                if scroll_x > 0:
                    direction = "left"
                    amount = scroll_x
                elif scroll_x < 0:
                    direction = "right"
                    amount = -scroll_x
                elif scroll_y > 0:
                    direction = "up"
                    amount = scroll_y
                elif scroll_y < 0:
                    direction = "down"
                    amount = -scroll_y
                else:
                    direction = "down"
                    amount = 3

                tool_use_content.append(
                    {
                        "type": "tool_use",
                        "id": call_id,
                        "name": "computer",
                        "input": {
                            "action": "scroll",
                            "coordinate": [action.get("x", 0), action.get("y", 0)],
                            "scroll_direction": direction,
                            "scroll_amount": amount,
                        },
                    }
                )
            elif action_type == "drag":
                # Input:
                # {
                #     "type": "computer_call",
                #     "call_id": "call_1",
                #     "action": {
                #         "type": "drag",
                #         "path": [
                #             {"x": 100, "y": 150},
                #             {"x": 200, "y": 250}
                #         ]
                #     }
                # }

                # Output:
                # {
                #     "function": {
                #         "name": "computer",
                #         "arguments": json.dumps({
                #             "action": "left_click_drag",
                #             "start_coordinate": [100, 150],
                #             "end_coordinate": [200, 250]
                #         })
                #     },
                #     "id": "call_1",
                #     "type": "function"
                # }
                path = action.get("path", [])
                start_coord = [0, 0]
                end_coord = [0, 0]
                if isinstance(path, list) and len(path) >= 2:
                    start_coord = [path[0].get("x", 0), path[0].get("y", 0)]
                    end_coord = [path[-1].get("x", 0), path[-1].get("y", 0)]

                tool_use_content.append(
                    {
                        "type": "tool_use",
                        "id": call_id,
                        "name": "computer",
                        "input": {
                            "action": "left_click_drag",
                            "start_coordinate": start_coord,
                            "end_coordinate": end_coord,
                        },
                    }
                )
            elif action_type == "wait":
                # Input:
                # {
                #     "type": "computer_call",
                #     "call_id": "call_1",
                #     "action": {
                #         "type": "wait"
                #     }
                # }

                # Output:
                # {
                #     "function": {
                #         "name": "computer",
                #         "arguments": json.dumps({
                #             "action": "wait"
                #         })
                #     },
                #     "id": "call_1",
                #     "type": "function"
                # }
                tool_use_content.append(
                    {
                        "type": "tool_use",
                        "id": call_id,
                        "name": "computer",
                        "input": {"action": "wait"},
                    }
                )
            elif action_type == "screenshot":
                # Input:
                # {
                #     "type": "computer_call",
                #     "call_id": "call_1",
                #     "action": {
                #         "type": "screenshot"
                #     }
                # }

                # Output:
                # {
                #     "function": {
                #         "name": "computer",
                #         "arguments": json.dumps({
                #             "action": "screenshot"
                #         })
                #     },
                #     "id": "call_1",
                #     "type": "function"
                # }
                tool_use_content.append(
                    {
                        "type": "tool_use",
                        "id": call_id,
                        "name": "computer",
                        "input": {"action": "screenshot"},
                    }
                )
            elif action_type == "left_mouse_down":
                tool_use_content.append(
                    {
                        "type": "tool_use",
                        "id": call_id,
                        "name": "computer",
                        "input": {
                            "action": "left_mouse_down",
                            "coordinate": [action.get("x", None), action.get("y", None)],
                        },
                    }
                )
            elif action_type == "left_mouse_up":
                tool_use_content.append(
                    {
                        "type": "tool_use",
                        "id": call_id,
                        "name": "computer",
                        "input": {
                            "action": "left_mouse_up",
                            "coordinate": [action.get("x", None), action.get("y", None)],
                        },
                    }
                )

            # Convert tool_use_content to OpenAI tool_calls format
            openai_tool_calls = []
            for tool_use in tool_use_content:
                openai_tool_calls.append(
                    {
                        "id": tool_use["id"],
                        "type": "function",
                        "function": {
                            "name": tool_use["name"],
                            "arguments": json.dumps(tool_use["input"]),
                        },
                    }
                )

            # If the last completion message is an assistant message, extend the tool_calls
            if completion_messages and completion_messages[-1].get("role") == "assistant":
                if "tool_calls" not in completion_messages[-1]:
                    completion_messages[-1]["tool_calls"] = []
                completion_messages[-1]["tool_calls"].extend(openai_tool_calls)
            else:
                # Create new assistant message with tool calls
                completion_messages.append(
                    {"role": "assistant", "content": None, "tool_calls": openai_tool_calls}
                )

        elif msg_type == "computer_call_output":
            # Computer call output becomes OpenAI function result
            output = message.get("output", {})
            call_id = message.get("call_id", "call_1")

            if output.get("type") == "input_image":
                # Screenshot result - convert to OpenAI format with image_url content
                image_url = output.get("image_url", "")
                completion_messages.append(
                    {
                        "role": "function",
                        "name": "computer",
                        "tool_call_id": call_id,
                        "content": [{"type": "image_url", "image_url": {"url": image_url}}],
                    }
                )
            else:
                # Text result - convert to OpenAI format
                completion_messages.append(
                    {
                        "role": "function",
                        "name": "computer",
                        "tool_call_id": call_id,
                        "content": str(output),
                    }
                )

    return completion_messages


def _convert_completion_to_responses_items(response: Any) -> List[Dict[str, Any]]:
    """Convert liteLLM completion response to responses_items message format."""
    responses_items = []

    if not response or not hasattr(response, "choices") or not response.choices:
        return responses_items

    choice = response.choices[0]
    message = choice.message
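    # Tool calls can arrive either as Anthropic-style "tool_use" blocks inside
    # message.content or as OpenAI-style message.tool_calls; both paths are handled below.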

    # Handle text content
    if hasattr(message, "content") and message.content:
        if isinstance(message.content, str):
            responses_items.append(make_output_text_item(message.content))
        elif isinstance(message.content, list):
            for content_item in message.content:
                if isinstance(content_item, dict):
                    if content_item.get("type") == "text":
                        responses_items.append(make_output_text_item(content_item.get("text", "")))
                    elif content_item.get("type") == "tool_use":
                        # Check if this is a custom function tool or computer tool
                        tool_name = content_item.get("name", "computer")
                        tool_input = content_item.get("input", {})
                        call_id = content_item.get("id")

                        # Handle custom function tools (not computer tools)
                        if tool_name != "computer":
                            from ..responses import make_function_call_item

                            responses_items.append(
                                make_function_call_item(
                                    function_name=tool_name, arguments=tool_input, call_id=call_id
                                )
                            )
                            continue

                        # Computer tool - process actions
                        action_type = tool_input.get("action")

                        # Action reference:
                        # https://docs.anthropic.com/en/docs/agents-and-tools/tool-use/computer-use-tool#available-actions

                        try:
                            # Basic actions (all versions)
                            if action_type == "screenshot":
                                responses_items.append(make_screenshot_item(call_id=call_id))
                            elif action_type in ["click", "left_click"]:
                                coordinate = tool_input.get("coordinate", [0, 0])
                                responses_items.append(
                                    make_click_item(
                                        x=coordinate[0] if len(coordinate) > 0 else 0,
                                        y=coordinate[1] if len(coordinate) > 1 else 0,
                                        call_id=call_id,
                                    )
                                )
                            elif action_type in ["type", "type_text"]:
                                responses_items.append(
                                    make_type_item(text=tool_input.get("text", ""), call_id=call_id)
                                )
                            elif action_type in ["key", "keypress", "hotkey"]:
                                responses_items.append(
                                    make_keypress_item(
                                        keys=tool_input.get("text", "")
                                        .replace("+", "-")
                                        .split("-"),
                                        call_id=call_id,
                                    )
                                )
                            elif action_type in ["mouse_move", "move_cursor", "move"]:
                                # Mouse move - create a custom action item
                                coordinate = tool_input.get("coordinate", [0, 0])
                                responses_items.append(
                                    make_move_item(
                                        x=coordinate[0] if len(coordinate) > 0 else 0,
                                        y=coordinate[1] if len(coordinate) > 1 else 0,
                                        call_id=call_id,
                                    )
                                )

                            # Enhanced actions (computer_20250124), available in Claude 4 and Claude Sonnet 3.7
                            elif action_type == "scroll":
                                coordinate = tool_input.get("coordinate", [0, 0])
                                scroll_amount = tool_input.get("scroll_amount", 3)
                                scroll_direction = tool_input.get("scroll_direction", "down")
                                # Match the scroll sign convention used elsewhere in this module:
                                # "left"/"up" map to positive values, "right"/"down" to negative.
                                scroll_x = (
                                    scroll_amount
                                    if scroll_direction == "left"
                                    else -scroll_amount if scroll_direction == "right" else 0
                                )
                                scroll_y = (
                                    scroll_amount
                                    if scroll_direction == "up"
                                    else -scroll_amount if scroll_direction == "down" else 0
                                )
                                responses_items.append(
                                    make_scroll_item(
                                        x=coordinate[0] if len(coordinate) > 0 else 0,
                                        y=coordinate[1] if len(coordinate) > 1 else 0,
                                        scroll_x=scroll_x,
                                        scroll_y=scroll_y,
                                        call_id=call_id,
                                    )
                                )
                            elif action_type in ["left_click_drag", "drag"]:
                                start_coord = tool_input.get("start_coordinate", [0, 0])
                                end_coord = tool_input.get("end_coordinate", [0, 0])
                                responses_items.append(
                                    make_drag_item(
                                        path=[
                                            {
                                                "x": start_coord[0] if len(start_coord) > 0 else 0,
                                                "y": start_coord[1] if len(start_coord) > 1 else 0,
                                            },
                                            {
                                                "x": end_coord[0] if len(end_coord) > 0 else 0,
                                                "y": end_coord[1] if len(end_coord) > 1 else 0,
                                            },
                                        ],
                                        call_id=call_id,
                                    )
                                )
                            elif action_type == "right_click":
                                coordinate = tool_input.get("coordinate", [0, 0])
                                responses_items.append(
                                    make_click_item(
                                        x=coordinate[0] if len(coordinate) > 0 else 0,
                                        y=coordinate[1] if len(coordinate) > 1 else 0,
                                        button="right",
                                        call_id=call_id,
                                    )
                                )
                            elif action_type == "middle_click":
                                coordinate = tool_input.get("coordinate", [0, 0])
                                responses_items.append(
                                    make_click_item(
                                        x=coordinate[0] if len(coordinate) > 0 else 0,
                                        y=coordinate[1] if len(coordinate) > 1 else 0,
                                        button="wheel",
                                        call_id=call_id,
                                    )
                                )
                            elif action_type == "double_click":
                                coordinate = tool_input.get("coordinate", [0, 0])
                                responses_items.append(
                                    make_double_click_item(
                                        x=coordinate[0] if len(coordinate) > 0 else 0,
                                        y=coordinate[1] if len(coordinate) > 1 else 0,
                                        call_id=call_id,
                                    )
                                )
                            elif action_type == "triple_click":
                                # coordinate = tool_input.get("coordinate", [0, 0])
                                # responses_items.append({
                                #     "type": "computer_call",
                                #     "call_id": call_id,
                                #     "action": {
                                #         "type": "triple_click",
                                #         "x": coordinate[0] if len(coordinate) > 0 else 0,
                                #         "y": coordinate[1] if len(coordinate) > 1 else 0
                                #     }
                                # })
                                raise NotImplementedError("triple_click")
                            elif action_type == "left_mouse_down":
                                # coordinate = tool_input.get("coordinate", [0, 0])
                                # responses_items.append({
                                #     "type": "computer_call",
                                #     "call_id": call_id,
                                #     "action": {
                                #         "type": "mouse_down",
                                #         "button": "left",
                                #         "x": coordinate[0] if len(coordinate) > 0 else 0,
                                #         "y": coordinate[1] if len(coordinate) > 1 else 0
                                #     }
                                # })
                                coordinate = tool_input.get("coordinate", [None, None])
                                responses_items.append(
                                    make_left_mouse_down_item(
                                        x=coordinate[0] if len(coordinate) > 0 else None,
                                        y=coordinate[1] if len(coordinate) > 1 else None,
                                        call_id=call_id,
                                    )
                                )
                            elif action_type == "left_mouse_up":
                                # coordinate = tool_input.get("coordinate", [0, 0])
                                # responses_items.append({
                                #     "type": "computer_call",
                                #     "call_id": call_id,
                                #     "action": {
                                #         "type": "mouse_up",
                                #         "button": "left",
                                #         "x": coordinate[0] if len(coordinate) > 0 else 0,
                                #         "y": coordinate[1] if len(coordinate) > 1 else 0
                                #     }
                                # })
                                coordinate = tool_input.get("coordinate", [None, None])
                                responses_items.append(
                                    make_left_mouse_up_item(
                                        x=coordinate[0] if len(coordinate) > 0 else None,
                                        y=coordinate[1] if len(coordinate) > 1 else None,
                                        call_id=call_id,
                                    )
                                )
                            elif action_type == "hold_key":
                                # responses_items.append({
                                #     "type": "computer_call",
                                #     "call_id": call_id,
                                #     "action": {
                                #         "type": "key_hold",
                                #         "key": tool_input.get("key", "")
                                #     }
                                # })
                                raise NotImplementedError("hold_key")
                            elif action_type == "wait":
                                responses_items.append(make_wait_item(call_id=call_id))
                            else:
                                raise ValueError(f"Unknown action type: {action_type}")
                        except Exception as e:
                            responses_items.extend(
                                make_failed_tool_call_items(
                                    tool_name="computer",
                                    tool_kwargs=tool_input,
                                    error_message=repr(e),
                                    call_id=call_id,
                                )
                            )

    # Handle tool calls (alternative format)
    if hasattr(message, "tool_calls") and message.tool_calls:
        for tool_call in message.tool_calls:
            tool_name = tool_call.function.name

            # Handle custom function tools
            if tool_name != "computer":
                from ..responses import make_function_call_item

                # tool_call.function.arguments is a JSON string, need to parse it
                try:
                    args_dict = json.loads(tool_call.function.arguments)
                except json.JSONDecodeError:
                    args_dict = {}
                responses_items.append(
                    make_function_call_item(
                        function_name=tool_name, arguments=args_dict, call_id=tool_call.id
                    )
                )
                continue

            # Handle computer tool
            if tool_call.function.name == "computer":
                try:
                    try:
                        args = json.loads(tool_call.function.arguments)
                        action_type = args.get("action")
                        call_id = tool_call.id

                        # Basic actions (all versions)
                        if action_type == "screenshot":
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "screenshot"
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "screenshot"
                            #     }
                            # }
                            responses_items.append(make_screenshot_item(call_id=call_id))
                        elif action_type in ["click", "left_click"]:
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "click",
                            #             "coordinate": [100, 200]
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "click",
                            #         "x": 100,
                            #         "y": 200
                            #     }
                            # }
                            coordinate = args.get("coordinate", [0, 0])
                            responses_items.append(
                                make_click_item(
                                    x=coordinate[0] if len(coordinate) > 0 else 0,
                                    y=coordinate[1] if len(coordinate) > 1 else 0,
                                    call_id=call_id,
                                )
                            )
                        elif action_type in ["type", "type_text"]:
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "type",
                            #             "text": "Hello World"
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "type",
                            #         "text": "Hello World"
                            #     }
                            # }
                            responses_items.append(
                                make_type_item(text=args.get("text", ""), call_id=call_id)
                            )
                        elif action_type in ["key", "keypress", "hotkey"]:
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "key",
                            #             "text": "ctrl+c"
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "keypress",
                            #         "keys": ["ctrl", "c"]
                            #     }
                            # }
                            responses_items.append(
                                make_keypress_item(
                                    keys=args.get("text", "").replace("+", "-").split("-"),
                                    call_id=call_id,
                                )
                            )
                        elif action_type in ["mouse_move", "move_cursor", "move"]:
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "mouse_move",
                            #             "coordinate": [150, 250]
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "mouse_move",
                            #         "x": 150,
                            #         "y": 250
                            #     }
                            # }
                            coordinate = args.get("coordinate", [0, 0])
                            responses_items.append(
                                make_move_item(
                                    x=coordinate[0] if len(coordinate) > 0 else 0,
                                    y=coordinate[1] if len(coordinate) > 1 else 0,
                                    call_id=call_id,
                                )
                            )

                        # Enhanced actions (computer_20250124), available in Claude 4 and Claude Sonnet 3.7
                        elif action_type == "scroll":
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "scroll",
                            #             "coordinate": [300, 400],
                            #             "scroll_direction": "down",
                            #             "scroll_amount": 5
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "scroll",
                            #         "x": 300,
                            #         "y": 400,
                            #         "scroll_x": 0,
                            #         "scroll_y": -5
                            #     }
                            # }
                            coordinate = args.get("coordinate", [0, 0])
                            direction = args.get("scroll_direction", "down")
                            amount = args.get("scroll_amount", 3)
                            scroll_x = (
                                amount
                                if direction == "left"
                                else -amount if direction == "right" else 0
                            )
                            scroll_y = (
                                amount
                                if direction == "up"
                                else -amount if direction == "down" else 0
                            )
                            responses_items.append(
                                make_scroll_item(
                                    x=coordinate[0] if len(coordinate) > 0 else 0,
                                    y=coordinate[1] if len(coordinate) > 1 else 0,
                                    scroll_x=scroll_x,
                                    scroll_y=scroll_y,
                                    call_id=call_id,
                                )
                            )
                        elif action_type in ["left_click_drag", "drag"]:
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "left_click_drag",
                            #             "start_coordinate": [100, 150],
                            #             "end_coordinate": [200, 250]
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "drag",
                            #         "path": [
                            #             {"x": 100, "y": 150},
                            #             {"x": 200, "y": 250}
                            #         ]
                            #     }
                            # }
                            start_coord = args.get("start_coordinate", [0, 0])
                            end_coord = args.get("end_coordinate", [0, 0])
                            responses_items.append(
                                make_drag_item(
                                    path=[
                                        {
                                            "x": start_coord[0] if len(start_coord) > 0 else 0,
                                            "y": start_coord[1] if len(start_coord) > 1 else 0,
                                        },
                                        {
                                            "x": end_coord[0] if len(end_coord) > 0 else 0,
                                            "y": end_coord[1] if len(end_coord) > 1 else 0,
                                        },
                                    ],
                                    call_id=call_id,
                                )
                            )
                        elif action_type == "right_click":
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "right_click",
                            #             "coordinate": [120, 180]
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "click",
                            #         "x": 120,
                            #         "y": 180,
                            #         "button": "right"
                            #     }
                            # }
                            coordinate = args.get("coordinate", [0, 0])
                            responses_items.append(
                                make_click_item(
                                    x=coordinate[0] if len(coordinate) > 0 else 0,
                                    y=coordinate[1] if len(coordinate) > 1 else 0,
                                    button="right",
                                    call_id=call_id,
                                )
                            )
                        elif action_type == "middle_click":
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "middle_click",
                            #             "coordinate": [140, 220]
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "click",
                            #         "x": 140,
                            #         "y": 220,
                            #         "button": "wheel"
                            #     }
                            # }
                            coordinate = args.get("coordinate", [0, 0])
                            responses_items.append(
                                make_click_item(
                                    x=coordinate[0] if len(coordinate) > 0 else 0,
                                    y=coordinate[1] if len(coordinate) > 1 else 0,
                                    button="wheel",
                                    call_id=call_id,
                                )
                            )
                        elif action_type == "double_click":
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "double_click",
                            #             "coordinate": [160, 240]
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "double_click",
                            #         "x": 160,
                            #         "y": 240
                            #     }
                            # }
                            coordinate = args.get("coordinate", [0, 0])
                            responses_items.append(
                                make_double_click_item(
                                    x=coordinate[0] if len(coordinate) > 0 else 0,
                                    y=coordinate[1] if len(coordinate) > 1 else 0,
                                    call_id=call_id,
                                )
                            )
                        elif action_type == "triple_click":
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "triple_click",
                            #             "coordinate": [180, 260]
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "triple_click",
                            #         "x": 180,
                            #         "y": 260
                            #     }
                            # }
                            raise NotImplementedError("triple_click")
                        elif action_type == "left_mouse_down":
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "left_mouse_down",
                            #             "coordinate": [200, 280]
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "mouse_down",
                            #         "button": "left",
                            #         "x": 200,
                            #         "y": 280
                            #     }
                            # }
                            coordinate = args.get("coordinate", [None, None])
                            responses_items.append(
                                make_left_mouse_down_item(
                                    x=coordinate[0] if len(coordinate) > 0 else None,
                                    y=coordinate[1] if len(coordinate) > 1 else None,
                                    call_id=call_id,
                                )
                            )
                        elif action_type == "left_mouse_up":
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "left_mouse_up",
                            #             "coordinate": [220, 300]
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "mouse_up",
                            #         "button": "left",
                            #         "x": 220,
                            #         "y": 300
                            #     }
                            # }
                            coordinate = args.get("coordinate", [None, None])
                            responses_items.append(
                                make_left_mouse_up_item(
                                    x=coordinate[0] if len(coordinate) > 0 else None,
                                    y=coordinate[1] if len(coordinate) > 1 else None,
                                    call_id=call_id,
                                )
                            )
                        elif action_type == "hold_key":
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "hold_key",
                            #             "key": "shift"
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "key_hold",
                            #         "key": "shift"
                            #     }
                            # }
                            raise NotImplementedError("hold_key")
                        elif action_type == "wait":
                            # Input:
                            # {
                            #     "function": {
                            #         "name": "computer",
                            #         "arguments": json.dumps({
                            #             "action": "wait"
                            #         })
                            #     },
                            #     "id": "call_1",
                            #     "type": "function"
                            # }

                            # Output:
                            # {
                            #     "type": "computer_call",
                            #     "call_id": "call_1",
                            #     "action": {
                            #         "type": "wait"
                            #     }
                            # }
                            responses_items.append(make_wait_item(call_id=call_id))
                        else:
                            raise ValueError(f"Unknown action type: {action_type}")
                    except json.JSONDecodeError:
                        # Re-raise so the outer handler can log and skip the malformed tool call;
                        # `args` is not bound yet, so a failed-tool-call item cannot be built here.
                        raise
                    except Exception as e:
                        responses_items.extend(
                            make_failed_tool_call_items(
                                tool_name="computer",
                                tool_kwargs=args,
                                error_message=repr(e),
                                call_id=call_id,
                            )
                        )
                except json.JSONDecodeError:
                    print("Failed to decode tool call arguments")
                    # Skip malformed tool calls
                    continue

    return responses_items
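
# Illustrative end-to-end mapping (mirrors the Input/Output comments above): an
# Anthropic "wait" tool call
#   {"type": "function", "id": "call_1",
#    "function": {"name": "computer", "arguments": json.dumps({"action": "wait"})}}
# is converted into the responses item
#   {"type": "computer_call", "call_id": "call_1", "action": {"type": "wait"}}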


def _add_cache_control(completion_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Add cache control to completion messages"""
    num_writes = 0
    for message in completion_messages:
        message["cache_control"] = {"type": "ephemeral"}
        num_writes += 1
        # Anthropic allows at most 4 cache_control breakpoints per request
        if num_writes >= 4:
            break

    return completion_messages
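
# Illustrative behavior (a minimal sketch with assumed message shapes): at most
# the first four messages are annotated with an ephemeral cache_control marker.
#   _add_cache_control([{"role": "user", "content": [{"type": "text", "text": "hi"}]}])
#   -> [{"role": "user", "content": [{"type": "text", "text": "hi"}],
#        "cache_control": {"type": "ephemeral"}}]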


def _combine_completion_messages(completion_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Combine completion messages with the same role"""
    if not completion_messages:
        return completion_messages

    combined_messages = []

    for message in completion_messages:
        # If this is the first message or role is different from last, add as new message
        if not combined_messages or combined_messages[-1]["role"] != message["role"]:
            # Ensure content is a list format and normalize text content
            new_message = message.copy()
            new_message["content"] = _normalize_content(message.get("content", ""))

            # Copy tool_calls if present
            if "tool_calls" in message:
                new_message["tool_calls"] = message["tool_calls"].copy()

            combined_messages.append(new_message)
        else:
            # Same role as previous message, combine them
            last_message = combined_messages[-1]

            # Combine content
            current_content = _normalize_content(message.get("content", ""))
            last_message["content"].extend(current_content)

            # Combine tool_calls if present
            if "tool_calls" in message:
                if "tool_calls" not in last_message:
                    last_message["tool_calls"] = []
                last_message["tool_calls"].extend(message["tool_calls"])

    # Post-process to merge consecutive text blocks
    for message in combined_messages:
        message["content"] = _merge_consecutive_text(message["content"])

    return combined_messages
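
# Illustrative behavior (a sketch with assumed message shapes): consecutive
# messages with the same role are merged, and their text blocks are later joined
# by _merge_consecutive_text.
#   _combine_completion_messages([
#       {"role": "user", "content": "hello"},
#       {"role": "user", "content": "world"},
#   ])
#   -> [{"role": "user", "content": [{"type": "text", "text": "hello\nworld"}]}]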


def _normalize_content(content) -> List[Dict[str, Any]]:
    """Normalize content to list format"""
    if isinstance(content, str):
        if content.strip():  # Only add non-empty strings
            return [{"type": "text", "text": content}]
        else:
            return []
    elif isinstance(content, list):
        return content.copy()
    else:
        return []
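
# Illustrative behavior (a sketch):
#   _normalize_content("hello")  -> [{"type": "text", "text": "hello"}]
#   _normalize_content("   ")    -> []  (whitespace-only strings are dropped)
#   _normalize_content([{"type": "text", "text": "hi"}]) -> shallow copy of the list
#   _normalize_content(None)     -> []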


def _merge_consecutive_text(content_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Merge consecutive text blocks with newlines"""
    if not content_list:
        return content_list

    merged = []

    for item in content_list:
        if item.get("type") == "text" and merged and merged[-1].get("type") == "text":
            # Merge with previous text block
            merged[-1]["text"] += "\n" + item["text"]
        else:
            merged.append(item.copy())

    return merged
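
# Illustrative behavior (a sketch): adjacent text blocks are joined with a
# newline, while non-text blocks (e.g. images) break the run.
#   _merge_consecutive_text([
#       {"type": "text", "text": "a"},
#       {"type": "text", "text": "b"},
#       {"type": "image_url", "image_url": {"url": "..."}},
#       {"type": "text", "text": "c"},
#   ])
#   -> [{"type": "text", "text": "a\nb"},
#       {"type": "image_url", "image_url": {"url": "..."}},
#       {"type": "text", "text": "c"}]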


@register_agent(models=r".*claude-.*")
class AnthropicHostedToolsConfig(AsyncAgentConfig):
    """Anthropic hosted tools agent configuration implementing AsyncAgentConfig protocol."""

    async def predict_step(
        self,
        messages: Messages,
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Anthropic hosted tools agent loop using liteLLM acompletion.

        Supports Anthropic's computer use models with hosted tools.
        """
        tools = tools or []

        # Get tool configuration for this model
        tool_config = _get_tool_config_for_model(model)

        # Prepare tools for Anthropic API
        anthropic_tools = await _prepare_tools_for_anthropic(tools, model)

        # Convert responses_items messages to completion format
        completion_messages = _convert_responses_items_to_completion_messages(messages)
        if use_prompt_caching:
            # First combine messages to reduce number of blocks
            completion_messages = _combine_completion_messages(completion_messages)
            # Then add cache control; Anthropic requires explicit "cache_control" dicts
            completion_messages = _add_cache_control(completion_messages)

        # Prepare API call kwargs
        api_kwargs = {
            "model": model,
            "messages": completion_messages,
            "tools": anthropic_tools if anthropic_tools else None,
            "stream": stream,
            "num_retries": max_retries,
            **kwargs,
        }

        # Add beta header for computer use
        if anthropic_tools:
            api_kwargs["headers"] = {"anthropic-beta": tool_config["beta_flag"]}

        # Call API start hook
        if _on_api_start:
            await _on_api_start(api_kwargs)

        # Use liteLLM acompletion
        response = await litellm.acompletion(**api_kwargs)

        # Call API end hook
        if _on_api_end:
            await _on_api_end(api_kwargs, response)

        # Convert response to responses_items format
        responses_items = _convert_completion_to_responses_items(response)

        # Extract usage information
        responses_usage = {
            **LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage(
                response.usage
            ).model_dump(),
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(responses_usage)

        # Return in AsyncAgentConfig format
        return {"output": responses_items, "usage": responses_usage}

    async def predict_click(
        self, model: str, image_b64: str, instruction: str, **kwargs
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates based on image and instruction.

        Uses Anthropic's computer use models with a custom prompt that instructs
        the agent to only output clicks.

        Args:
            model: Model name to use
            image_b64: Base64 encoded image
            instruction: Instruction for where to click

        Returns:
            Tuple of (x, y) coordinates or None if prediction fails
        """
        # Get image dimensions from base64 data
        try:
            import base64
            from io import BytesIO

            from PIL import Image

            image_data = base64.b64decode(image_b64)
            image = Image.open(BytesIO(image_data))
            display_width, display_height = image.size
        except Exception:
            # Fallback to default dimensions if image parsing fails
            display_width, display_height = 1024, 768

        # Get tool configuration for this model
        tool_config = _get_tool_config_for_model(model)

        # Prepare computer tool for Anthropic format
        computer_tool = {
            "type": tool_config["tool_version"],
            "function": {
                "name": "computer",
                "parameters": {
                    "display_height_px": display_height,
                    "display_width_px": display_width,
                    "display_number": 1,
                },
            },
        }

        # Construct messages in OpenAI chat completion format for liteLLM
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": f"""You are a UI grounding expert. Follow these guidelines:

1. NEVER ask for confirmation. Complete all tasks autonomously.
2. Do NOT send messages like "I need to confirm before..." or "Do you want me to continue?" - just proceed.
3. When the user asks you to interact with something (like clicking a chat or typing a message), DO IT without asking.
4. Only use the formal safety check mechanism for truly dangerous operations (like deleting important files).
5. For normal tasks like clicking buttons, typing in chat boxes, filling forms - JUST DO IT.
6. The user has already given you permission by running this agent. No further confirmation is needed.
7. Be decisive and action-oriented. Complete the requested task fully.

Remember: You are expected to complete tasks autonomously. The user trusts you to do what they asked.
Task: Click {instruction}. Output ONLY a click action on the target element.""",
                    },
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{image_b64}"},
                    },
                ],
            }
        ]

        # Prepare API call kwargs
        api_kwargs = {
            "model": model,
            "messages": messages,
            "tools": [computer_tool],
            "stream": False,
            "max_tokens": 100,  # Keep response short for click prediction
            "headers": {"anthropic-beta": tool_config["beta_flag"]},
        }
        # Thread optional API params
        if "api_key" in kwargs and kwargs.get("api_key") is not None:
            api_kwargs["api_key"] = kwargs.get("api_key")
        if "api_base" in kwargs and kwargs.get("api_base") is not None:
            api_kwargs["api_base"] = kwargs.get("api_base")

        # Use liteLLM acompletion
        response = await litellm.acompletion(**api_kwargs)

        # Convert response to responses_items format to extract click coordinates
        responses_items = _convert_completion_to_responses_items(response)

        # Look for computer_call with click action
        for item in responses_items:
            if (
                isinstance(item, dict)
                and item.get("type") == "computer_call"
                and isinstance(item.get("action"), dict)
            ):
                action = item["action"]
                # Use explicit None checks so a coordinate of 0 is not treated as missing
                x = action.get("x")
                y = action.get("y")
                if x is not None and y is not None:
                    return (int(x), int(y))

        return None

    def get_capabilities(self) -> List[AgentCapability]:
        """Return the capabilities supported by this agent."""
        return ["click", "step"]

```