#
tokens: 48194/50000 14/616 files (page 10/20)
This is page 10 of 20. Use http://codebase.md/trycua/cua?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── bump-version.yml
│       ├── ci-lume.yml
│       ├── docker-publish-cua-linux.yml
│       ├── docker-publish-cua-windows.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── link-check.yml
│       ├── lint.yml
│       ├── npm-publish-cli.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       ├── python-tests.yml
│       ├── test-cua-models.yml
│       └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│   ├── docs.code-workspace
│   ├── extensions.json
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── cloud-windows-ga-macos-preview.md
│   ├── composite-agents.md
│   ├── computer-use-agents-for-growth-hacking.md
│   ├── cua-hackathon.md
│   ├── cua-playground-preview.md
│   ├── cua-vlm-router.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cli.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── neurips-2025-cua-papers.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .env.example
│   ├── .gitignore
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── observability.mdx
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── cua-vlm-router.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   ├── telemetry.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── cli-playbook
│   │       │   ├── commands.mdx
│   │       │   ├── index.mdx
│   │       │   └── meta.json
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── meta.json
│   │       │   ├── sandboxed-python.mdx
│   │       │   └── tracing-api.mdx
│   │       ├── example-usecases
│   │       │   ├── form-filling.mdx
│   │       │   ├── gemini-complex-ui-navigation.mdx
│   │       │   ├── meta.json
│   │       │   ├── post-event-contact-export.mdx
│   │       │   └── windows-app-behind-vpn.mdx
│   │       ├── get-started
│   │       │   ├── meta.json
│   │       │   └── quickstart.mdx
│   │       ├── index.mdx
│   │       ├── macos-vm-cli-playbook
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   └── meta.json
│   │       └── meta.json
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── bg-dark.jpg
│   │       ├── bg-light.jpg
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── grounding-with-gemini3.gif
│   │       ├── hero.png
│   │       ├── laminar_trace_example.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   ├── posthog
│   │   │   │   │   └── [...path]
│   │   │   │   │       └── route.ts
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   ├── llms.txt
│   │   │   │   └── route.ts
│   │   │   ├── robots.ts
│   │   │   └── sitemap.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── analytics-tracker.tsx
│   │   │   ├── cookie-consent.tsx
│   │   │   ├── doc-actions-menu.tsx
│   │   │   ├── editable-code-block.tsx
│   │   │   ├── footer.tsx
│   │   │   ├── hero.tsx
│   │   │   ├── iou.tsx
│   │   │   ├── mermaid.tsx
│   │   │   └── page-feedback.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   ├── mdx-components.tsx
│   │   └── providers
│   │       └── posthog-provider.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── browser_tool_example.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── tracing_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── cua_adapter.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gelato.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── generic_vlm.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   ├── uiins.py
│   │   │   │   │   ├── uitars.py
│   │   │   │   │   └── uitars2.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── tools
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   └── browser_tool.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_computer_agent.py
│   │   ├── bench-ui
│   │   │   ├── bench_ui
│   │   │   │   ├── __init__.py
│   │   │   │   ├── api.py
│   │   │   │   └── child.py
│   │   │   ├── examples
│   │   │   │   ├── folder_example.py
│   │   │   │   ├── gui
│   │   │   │   │   ├── index.html
│   │   │   │   │   ├── logo.svg
│   │   │   │   │   └── styles.css
│   │   │   │   ├── output_overlay.png
│   │   │   │   └── simple_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       └── test_port_detection.py
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── tracing_wrapper.py
│   │   │   │   ├── tracing.py
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_computer.py
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── browser.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   ├── utils
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   └── wallpaper.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   ├── test_connection.py
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_server.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_telemetry.py
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── build-extension.py
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── desktop-extension
│   │   │   │   ├── cua-extension.mcpb
│   │   │   │   ├── desktop_extension.png
│   │   │   │   ├── manifest.json
│   │   │   │   ├── README.md
│   │   │   │   ├── requirements.txt
│   │   │   │   ├── run_server.sh
│   │   │   │   └── setup.py
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── QUICK_TEST_COMMANDS.sh
│   │   │   ├── quick_test_local_option.py
│   │   │   ├── README.md
│   │   │   ├── scripts
│   │   │   │   ├── install_mcp_server.sh
│   │   │   │   └── start_mcp_server.sh
│   │   │   ├── test_mcp_server_local_option.py
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_mcp_server.py
│   │   ├── pylume
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_pylume.py
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           ├── conftest.py
│   │           └── test_omniparser.py
│   ├── qemu-docker
│   │   ├── linux
│   │   │   ├── Dockerfile
│   │   │   ├── README.md
│   │   │   └── src
│   │   │       ├── entry.sh
│   │   │       └── vm
│   │   │           ├── image
│   │   │           │   └── README.md
│   │   │           └── setup
│   │   │               ├── install.sh
│   │   │               ├── setup-cua-server.sh
│   │   │               └── setup.sh
│   │   ├── README.md
│   │   └── windows
│   │       ├── Dockerfile
│   │       ├── README.md
│   │       └── src
│   │           ├── entry.sh
│   │           └── vm
│   │               ├── image
│   │               │   └── README.md
│   │               └── setup
│   │                   ├── install.bat
│   │                   ├── on-logon.ps1
│   │                   ├── setup-cua-server.ps1
│   │                   ├── setup-utils.psm1
│   │                   └── setup.ps1
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── cua-cli
│   │   │   ├── .gitignore
│   │   │   ├── .prettierrc
│   │   │   ├── bun.lock
│   │   │   ├── CLAUDE.md
│   │   │   ├── index.ts
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── auth.ts
│   │   │   │   ├── cli.ts
│   │   │   │   ├── commands
│   │   │   │   │   ├── auth.ts
│   │   │   │   │   └── sandbox.ts
│   │   │   │   ├── config.ts
│   │   │   │   ├── http.ts
│   │   │   │   ├── storage.ts
│   │   │   │   └── util.ts
│   │   │   └── tsconfig.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Development.md
│       ├── Dockerfile
│       ├── Dockerfile.dev
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│   ├── install-cli.ps1
│   ├── install-cli.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   ├── run-docker-dev.sh
│   └── typescript-typecheck.js
├── TESTING.md
├── tests
│   ├── agent_loop_testing
│   │   ├── agent_test.py
│   │   └── README.md
│   ├── pytest.ini
│   ├── shell_cmd.py
│   ├── test_files.py
│   ├── test_mcp_server_session_management.py
│   ├── test_mcp_server_streaming.py
│   ├── test_shell_bash.py
│   ├── test_telemetry.py
│   ├── test_tracing.py
│   ├── test_venv.py
│   └── test_watchdog.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/docs/content/docs/computer-sdk/commands.mdx:
--------------------------------------------------------------------------------

```markdown
---
title: Commands
description: Computer commands and interface methods
---

This page describes the set of supported **commands** you can use to control a Cua Computer directly via the Python and TypeScript SDKs.

These commands map to the same actions available in the [Computer Server API Commands Reference](/computer-sdk/computer-server/Commands), and provide low-level, async access to system operations from your agent or automation code.
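
All examples below assume a connected `computer` instance. The snippet below is a minimal, illustrative setup sketch (the `use_host_computer_server` option is just one way to connect; the exact constructor arguments depend on your provider):

<Tabs items={['Python']}>
  <Tab value="Python">

    ```python
    # Minimal setup sketch (illustrative; constructor options vary by provider)
    from computer import Computer

    computer = Computer(use_host_computer_server=True)  # connect to a local Computer Server
    await computer.run()  # establish the connection before calling interface methods
    ```

  </Tab>
</Tabs>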

## Shell Actions

Execute shell commands and get detailed results:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">

    ```python
    # Run shell command
    result = await computer.interface.run_command(cmd) # result.stdout, result.stderr, result.returncode
    ```

  </Tab>
  <Tab value="TypeScript">

    ```typescript
    // Run shell command
    const result = await computer.interface.runCommand(cmd); // result.stdout, result.stderr, result.returncode
    ```

  </Tab>
</Tabs>

## Window Management

Control application launching and windows:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">

    ```python
    # Launch applications
    await computer.interface.launch("xfce4-terminal")
    await computer.interface.launch("libreoffice --writer")
    await computer.interface.open("https://www.google.com")

    # Window management
    windows = await computer.interface.get_application_windows("xfce4-terminal")
    window_id = windows[0]
    await computer.interface.activate_window(window_id)

    window_id = await computer.interface.get_current_window_id()  # get the current active window id
    await computer.interface.window_size(window_id)
    await computer.interface.get_window_title(window_id)
    await computer.interface.get_window_position(window_id)
    await computer.interface.set_window_size(window_id, 1200, 800)
    await computer.interface.set_window_position(window_id, 100, 100)
    await computer.interface.maximize_window(window_id)
    await computer.interface.minimize_window(window_id)
    await computer.interface.close_window(window_id)
    ```

  </Tab>
  <Tab value="TypeScript">

    ```typescript
    // Launch applications
    await computer.interface.launch("xfce4-terminal");
    await computer.interface.launch("libreoffice --writer");
    await computer.interface.open("https://www.google.com");

    // Window management
    const windows = await computer.interface.getApplicationWindows("xfce4-terminal");
    let windowId = windows[0];
    await computer.interface.activateWindow(windowId);

    windowId = await computer.interface.getCurrentWindowId(); // current active window id
    await computer.interface.getWindowSize(windowId);
    await computer.interface.getWindowName(windowId);
    await computer.interface.getWindowPosition(windowId);
    await computer.interface.setWindowSize(windowId, 1200, 800);
    await computer.interface.setWindowPosition(windowId, 100, 100);
    await computer.interface.maximizeWindow(windowId);
    await computer.interface.minimizeWindow(windowId);
    await computer.interface.closeWindow(windowId);
    ```

  </Tab>
</Tabs>

## Mouse Actions

Precise mouse control and interaction:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">

    ```python
    # Basic clicks
    await computer.interface.left_click(x, y)       # Left click at coordinates
    await computer.interface.right_click(x, y)      # Right click at coordinates
    await computer.interface.double_click(x, y)     # Double click at coordinates

    # Cursor movement and dragging
    await computer.interface.move_cursor(x, y)      # Move cursor to coordinates
    await computer.interface.drag_to(x, y, duration)  # Drag to coordinates
    await computer.interface.get_cursor_position()  # Get current cursor position

    # Advanced mouse control
    await computer.interface.mouse_down(x, y, button="left")  # Press and hold a mouse button
    await computer.interface.mouse_up(x, y, button="left")    # Release a mouse button
    ```

  </Tab>
  <Tab value="TypeScript">

    ```typescript
    // Basic clicks
    await computer.interface.leftClick(x, y);       // Left click at coordinates
    await computer.interface.rightClick(x, y);      // Right click at coordinates
    await computer.interface.doubleClick(x, y);     // Double click at coordinates

    // Cursor movement and dragging
    await computer.interface.moveCursor(x, y);      // Move cursor to coordinates
    await computer.interface.dragTo(x, y, duration);  // Drag to coordinates
    await computer.interface.getCursorPosition();  // Get current cursor position

    // Advanced mouse control
    await computer.interface.mouseDown(x, y, "left");  // Press and hold a mouse button
    await computer.interface.mouseUp(x, y, "left");    // Release a mouse button
    ```

  </Tab>
</Tabs>

## Keyboard Actions

Text input and key combinations:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">

    ```python
    # Text input
    await computer.interface.type_text("Hello")     # Type text
    await computer.interface.press_key("enter")     # Press a single key

    # Key combinations and advanced control
    await computer.interface.hotkey("command", "c") # Press key combination
    await computer.interface.key_down("command")    # Press and hold a key
    await computer.interface.key_up("command")      # Release a key
    ```

  </Tab>
  <Tab value="TypeScript">

    ```typescript
    // Text input
    await computer.interface.typeText("Hello");     // Type text
    await computer.interface.pressKey("enter");     // Press a single key

    // Key combinations and advanced control
    await computer.interface.hotkey("command", "c"); // Press key combination
    await computer.interface.keyDown("command");    // Press and hold a key
    await computer.interface.keyUp("command");      // Release a key
    ```

  </Tab>
</Tabs>

## Scrolling Actions

Mouse wheel and scrolling control:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">

    ```python
    # Scrolling
    await computer.interface.scroll(x, y) # Scroll the mouse wheel
    await computer.interface.scroll_down(clicks) # Scroll down
    await computer.interface.scroll_up(clicks) # Scroll up
    ```

  </Tab>
  <Tab value="TypeScript">

    ```typescript
    // Scrolling
    await computer.interface.scroll(x, y); // Scroll the mouse wheel
    await computer.interface.scrollDown(clicks); // Scroll down
    await computer.interface.scrollUp(clicks); // Scroll up
    ```

  </Tab>
</Tabs>

## Screen Actions

Screen capture and display information:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">

    ```python
    # Screen operations
    await computer.interface.screenshot() # Take a screenshot
    await computer.interface.get_screen_size() # Get screen dimensions
    ```

  </Tab>
  <Tab value="TypeScript">

    ```typescript
    // Screen operations
    await computer.interface.screenshot(); // Take a screenshot
    await computer.interface.getScreenSize(); // Get screen dimensions
    ```

  </Tab>
</Tabs>

## Desktop Actions

Control desktop environment features like wallpaper:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">
    ```python 
    # Get current desktop environment (e.g., 'xfce4', 'gnome', 'kde', 'mac', 'windows')
    env = await computer.interface.get_desktop_environment()
    print(env) # "xfce4"

    # Set desktop wallpaper to an image file accessible on the sandbox
    await computer.interface.set_wallpaper("/home/cua/shared/wallpaper.png")
    ```

  </Tab>
  <Tab value="TypeScript">
    ```typescript 
    // Get current desktop environment
    const env = await computer.interface.getDesktopEnvironment();
    print(env) # "xfce4"

    // Set desktop wallpaper to an image file accessible on the sandbox
    await computer.interface.setWallpaper('/home/cua/shared/wallpaper.png');
    ```

  </Tab>
</Tabs>

## Clipboard Actions

System clipboard management:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">

    ```python
    # Clipboard operations
    await computer.interface.set_clipboard(text) # Set clipboard content
    await computer.interface.copy_to_clipboard() # Get clipboard content
    ```

  </Tab>
  <Tab value="TypeScript">

    ```typescript
    // Clipboard operations
    await computer.interface.setClipboard(text); // Set clipboard content
    await computer.interface.copyToClipboard(); // Get clipboard content
    ```

  </Tab>
</Tabs>

## File System Operations

Direct file and directory manipulation:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">

    ```python
    # File existence checks
    await computer.interface.file_exists(path)      # Check if file exists
    await computer.interface.directory_exists(path) # Check if directory exists

    # File content operations
    await computer.interface.read_text(path, encoding="utf-8")        # Read file content
    await computer.interface.write_text(path, content, encoding="utf-8") # Write file content
    await computer.interface.read_bytes(path)       # Read file content as bytes
    await computer.interface.write_bytes(path, content) # Write file content as bytes

    # File and directory management
    await computer.interface.delete_file(path)      # Delete file
    await computer.interface.create_dir(path)       # Create directory
    await computer.interface.delete_dir(path)       # Delete directory
    await computer.interface.list_dir(path)         # List directory contents
    ```

  </Tab>
  <Tab value="TypeScript">

    ```typescript
    // File existence checks
    await computer.interface.fileExists(path);      // Check if file exists
    await computer.interface.directoryExists(path); // Check if directory exists

    // File content operations
    await computer.interface.readText(path, "utf-8");        // Read file content
    await computer.interface.writeText(path, content, "utf-8"); // Write file content
    await computer.interface.readBytes(path);       // Read file content as bytes
    await computer.interface.writeBytes(path, content); // Write file content as bytes

    // File and directory management
    await computer.interface.deleteFile(path);      // Delete file
    await computer.interface.createDir(path);       // Create directory
    await computer.interface.deleteDir(path);       // Delete directory
    await computer.interface.listDir(path);         // List directory contents
    ```

  </Tab>
</Tabs>

## Accessibility

Access system accessibility information:

<Tabs items={['Python', 'TypeScript']}>
  <Tab value="Python">

    ```python
    # Get accessibility tree
    await computer.interface.get_accessibility_tree()
    ```

  </Tab>
  <Tab value="TypeScript">

    ```typescript
    // Get accessibility tree
    await computer.interface.getAccessibilityTree();
    ```

  </Tab>
</Tabs>

## Delay Configuration

Control timing between actions:

<Tabs items={['Python']}>
  <Tab value="Python">

    ```python
    # Set default delay between all actions (in seconds)
    computer.interface.delay = 0.5  # 500ms delay between actions

    # Or specify delay for individual actions
    await computer.interface.left_click(x, y, delay=1.0)     # 1 second delay after click
    await computer.interface.type_text("Hello", delay=0.2)   # 200ms delay after typing
    await computer.interface.press_key("enter", delay=0.5)   # 500ms delay after key press
    ```

  </Tab>
</Tabs>

## Python Virtual Environment Operations

Manage Python environments:

<Tabs items={['Python']}>
  <Tab value="Python">

    ```python
    # Virtual environment management
    await computer.venv_install("demo_venv", ["requests", "macos-pyxa"]) # Install packages in a virtual environment
    await computer.venv_cmd("demo_venv", "python -c \"import requests; print(requests.get('https://httpbin.org/ip').json())\"") # Run a shell command in a virtual environment
    await computer.venv_exec("demo_venv", python_function_or_code, *args, **kwargs) # Run a Python function in a virtual environment and return the result / raise an exception
    ```

  </Tab>
</Tabs>
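
For example, `venv_exec` can run a plain Python function inside the virtual environment and return its result. The sketch below is illustrative and assumes `demo_venv` already exists (for example, created via `venv_install` as shown above):

<Tabs items={['Python']}>
  <Tab value="Python">

    ```python
    # Illustrative sketch: run a local function inside the "demo_venv" environment
    def get_ip():
        import requests
        return requests.get("https://httpbin.org/ip").json()

    ip_info = await computer.venv_exec("demo_venv", get_ip)
    print(ip_info)
    ```

  </Tab>
</Tabs>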

```

--------------------------------------------------------------------------------
/libs/python/mcp-server/mcp_server/session_manager.py:
--------------------------------------------------------------------------------

```python
"""
Session Manager for MCP Server - Handles concurrent client sessions with proper resource isolation.

This module provides:
- Per-session computer instance management
- Resource pooling and lifecycle management
- Graceful session cleanup
- Concurrent task execution support
"""

import asyncio
import logging
import os
import time
import uuid
import weakref
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set

logger = logging.getLogger("mcp-server.session_manager")


@dataclass
class SessionInfo:
    """Information about an active session."""

    session_id: str
    computer: Any  # Computer instance
    created_at: float
    last_activity: float
    active_tasks: Set[str] = field(default_factory=set)
    is_shutting_down: bool = False


class ComputerPool:
    """Pool of computer instances for efficient resource management."""

    def __init__(self, max_size: int = 5, idle_timeout: float = 300.0):
        self.max_size = max_size
        self.idle_timeout = idle_timeout
        self._available: List[Any] = []
        self._in_use: Set[Any] = set()
        self._creation_lock = asyncio.Lock()

    async def acquire(self) -> Any:
        """Acquire a computer instance from the pool."""
        # Try to get an available instance
        if self._available:
            computer = self._available.pop()
            self._in_use.add(computer)
            logger.debug("Reusing computer instance from pool")
            return computer

        # Check if we can create a new one
        async with self._creation_lock:
            if len(self._in_use) < self.max_size:
                logger.debug("Creating new computer instance")
                from computer import Computer

                # Check if we should use host computer server
                use_host = os.getenv("CUA_USE_HOST_COMPUTER_SERVER", "false").lower() in (
                    "true",
                    "1",
                    "yes",
                )

                computer = Computer(verbosity=logging.INFO, use_host_computer_server=use_host)
                await computer.run()
                self._in_use.add(computer)
                return computer

        # Wait for an instance to become available
        logger.debug("Waiting for computer instance to become available")
        while not self._available:
            await asyncio.sleep(0.1)

        computer = self._available.pop()
        self._in_use.add(computer)
        return computer

    async def release(self, computer: Any) -> None:
        """Release a computer instance back to the pool."""
        if computer in self._in_use:
            self._in_use.remove(computer)
            self._available.append(computer)
            logger.debug("Released computer instance back to pool")

    async def cleanup_idle(self) -> None:
        """Clean up idle computer instances."""
        current_time = time.time()
        idle_instances = []

        for computer in self._available[:]:
            # Check if computer has been idle too long
            # Note: We'd need to track last use time per instance for this
            # For now, we'll keep instances in the pool
            pass

    async def shutdown(self) -> None:
        """Shutdown all computer instances in the pool."""
        logger.info("Shutting down computer pool")

        # Close all available instances
        for computer in self._available:
            try:
                if hasattr(computer, "close"):
                    await computer.close()
                elif hasattr(computer, "stop"):
                    await computer.stop()
            except Exception as e:
                logger.warning(f"Error closing computer instance: {e}")

        # Close all in-use instances
        for computer in self._in_use:
            try:
                if hasattr(computer, "close"):
                    await computer.close()
                elif hasattr(computer, "stop"):
                    await computer.stop()
            except Exception as e:
                logger.warning(f"Error closing computer instance: {e}")

        self._available.clear()
        self._in_use.clear()


class SessionManager:
    """Manages concurrent client sessions with proper resource isolation."""

    def __init__(self, max_concurrent_sessions: int = 10):
        self.max_concurrent_sessions = max_concurrent_sessions
        self._sessions: Dict[str, SessionInfo] = {}
        self._computer_pool = ComputerPool()
        self._session_lock = asyncio.Lock()
        self._cleanup_task: Optional[asyncio.Task] = None
        self._shutdown_event = asyncio.Event()

    async def start(self) -> None:
        """Start the session manager and cleanup task."""
        logger.info("Starting session manager")
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def stop(self) -> None:
        """Stop the session manager and cleanup all resources."""
        logger.info("Stopping session manager")
        self._shutdown_event.set()

        if self._cleanup_task:
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass

        # Force cleanup all sessions
        async with self._session_lock:
            session_ids = list(self._sessions.keys())

        for session_id in session_ids:
            await self._force_cleanup_session(session_id)

        await self._computer_pool.shutdown()

    @asynccontextmanager
    async def get_session(self, session_id: Optional[str] = None) -> Any:
        """Get or create a session with proper resource management."""
        if session_id is None:
            session_id = str(uuid.uuid4())

        # Check if session exists and is not shutting down
        async with self._session_lock:
            if session_id in self._sessions:
                session = self._sessions[session_id]
                if session.is_shutting_down:
                    raise RuntimeError(f"Session {session_id} is shutting down")
                session.last_activity = time.time()
                computer = session.computer
            else:
                # Create new session
                if len(self._sessions) >= self.max_concurrent_sessions:
                    raise RuntimeError(
                        f"Maximum concurrent sessions ({self.max_concurrent_sessions}) reached"
                    )

                computer = await self._computer_pool.acquire()
                session = SessionInfo(
                    session_id=session_id,
                    computer=computer,
                    created_at=time.time(),
                    last_activity=time.time(),
                )
                self._sessions[session_id] = session
                logger.info(f"Created new session: {session_id}")

        try:
            yield session
        finally:
            # Update last activity
            async with self._session_lock:
                if session_id in self._sessions:
                    self._sessions[session_id].last_activity = time.time()

    async def register_task(self, session_id: str, task_id: str) -> None:
        """Register a task for a session."""
        async with self._session_lock:
            if session_id in self._sessions:
                self._sessions[session_id].active_tasks.add(task_id)
                logger.debug(f"Registered task {task_id} for session {session_id}")

    async def unregister_task(self, session_id: str, task_id: str) -> None:
        """Unregister a task from a session."""
        async with self._session_lock:
            if session_id in self._sessions:
                self._sessions[session_id].active_tasks.discard(task_id)
                logger.debug(f"Unregistered task {task_id} from session {session_id}")

    async def cleanup_session(self, session_id: str) -> None:
        """Cleanup a specific session."""
        async with self._session_lock:
            if session_id not in self._sessions:
                return

            session = self._sessions[session_id]

            # Check if session has active tasks
            if session.active_tasks:
                logger.info(f"Session {session_id} has active tasks, marking for shutdown")
                session.is_shutting_down = True
                return

        # Actually cleanup the session outside the lock: _force_cleanup_session
        # re-acquires self._session_lock, which is not reentrant
        await self._force_cleanup_session(session_id)

    async def _force_cleanup_session(self, session_id: str) -> None:
        """Force cleanup a session regardless of active tasks."""
        async with self._session_lock:
            if session_id not in self._sessions:
                return

            session = self._sessions[session_id]
            logger.info(f"Cleaning up session: {session_id}")

            # Release computer back to pool
            await self._computer_pool.release(session.computer)

            # Remove session
            del self._sessions[session_id]

    async def _cleanup_loop(self) -> None:
        """Background task to cleanup idle sessions."""
        while not self._shutdown_event.is_set():
            try:
                await asyncio.sleep(60)  # Run cleanup every minute

                current_time = time.time()
                idle_timeout = 600.0  # 10 minutes

                async with self._session_lock:
                    idle_sessions = []
                    for session_id, session in self._sessions.items():
                        if not session.is_shutting_down and not session.active_tasks:
                            if current_time - session.last_activity > idle_timeout:
                                idle_sessions.append(session_id)

                # Cleanup idle sessions
                for session_id in idle_sessions:
                    await self._force_cleanup_session(session_id)
                    logger.info(f"Cleaned up idle session: {session_id}")

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in cleanup loop: {e}")

    def get_session_stats(self) -> Dict[str, Any]:
        """Get statistics about active sessions."""

        async def _get_stats():
            async with self._session_lock:
                return {
                    "total_sessions": len(self._sessions),
                    "max_concurrent": self.max_concurrent_sessions,
                    "sessions": {
                        session_id: {
                            "created_at": session.created_at,
                            "last_activity": session.last_activity,
                            "active_tasks": len(session.active_tasks),
                            "is_shutting_down": session.is_shutting_down,
                        }
                        for session_id, session in self._sessions.items()
                    },
                }

        # Run in current event loop or create new one
        try:
            loop = asyncio.get_running_loop()
            return asyncio.run_coroutine_threadsafe(_get_stats(), loop).result()
        except RuntimeError:
            # No event loop running, create a new one
            return asyncio.run(_get_stats())


# Global session manager instance
_session_manager: Optional[SessionManager] = None


def get_session_manager() -> SessionManager:
    """Get the global session manager instance."""
    global _session_manager
    if _session_manager is None:
        _session_manager = SessionManager()
    return _session_manager


async def initialize_session_manager() -> SessionManager:
    """Initialize the global session manager and return it."""
    global _session_manager
    if _session_manager is None:
        _session_manager = SessionManager()
        await _session_manager.start()
    return _session_manager


async def shutdown_session_manager() -> None:
    """Shutdown the global session manager."""
    global _session_manager
    if _session_manager is not None:
        await _session_manager.stop()
        _session_manager = None
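

# Illustrative usage sketch (an addition for documentation purposes, not part of
# the original module): driving the global helpers above from application code.
# The run_command call assumes the Computer interface documented in the
# Computer SDK.
async def _example_session_usage() -> None:
    await initialize_session_manager()
    manager = get_session_manager()
    async with manager.get_session() as session:
        result = await session.computer.interface.run_command("echo hello")
        print(result.stdout)
    await shutdown_session_manager()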

```

--------------------------------------------------------------------------------
/libs/python/computer/computer/tracing.py:
--------------------------------------------------------------------------------

```python
"""
Computer tracing functionality for recording sessions.

This module provides a Computer.tracing API inspired by Playwright's tracing functionality,
allowing users to record computer interactions for debugging, training, and analysis.
"""

import asyncio
import base64
import io
import json
import time
import uuid
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from PIL import Image
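
# Illustrative usage sketch (an addition for documentation purposes, not part of
# the original module). It assumes the Computer instance exposes ComputerTracing
# as `computer.tracing`, as described in the module docstring above:
#
#     await computer.tracing.start({"screenshots": True, "api_calls": True})
#     # ... perform computer actions to record ...
#     trace_path = await computer.tracing.stop({"format": "zip"})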


class ComputerTracing:
    """
    Computer tracing class that records computer interactions and saves them to disk.

    This class provides a flexible API for recording computer sessions with configurable
    options for what to record (screenshots, API calls, video, etc.).
    """

    def __init__(self, computer_instance):
        """
        Initialize the tracing instance.

        Args:
            computer_instance: The Computer instance to trace
        """
        self._computer = computer_instance
        self._is_tracing = False
        self._trace_config: Dict[str, Any] = {}
        self._trace_data: List[Dict[str, Any]] = []
        self._trace_start_time: Optional[float] = None
        self._trace_id: Optional[str] = None
        self._trace_dir: Optional[Path] = None
        self._screenshot_count = 0

    @property
    def is_tracing(self) -> bool:
        """Check if tracing is currently active."""
        return self._is_tracing

    async def start(self, config: Optional[Dict[str, Any]] = None) -> None:
        """
        Start tracing with the specified configuration.

        Args:
            config: Tracing configuration dict with options:
                - video: bool - Record video frames (default: False)
                - screenshots: bool - Record screenshots (default: True)
                - api_calls: bool - Record API calls and results (default: True)
                - accessibility_tree: bool - Record accessibility tree snapshots (default: False)
                - metadata: bool - Record custom metadata (default: True)
                - name: str - Custom trace name (default: auto-generated)
                - path: str - Custom trace directory path (default: auto-generated)
        """
        if self._is_tracing:
            raise RuntimeError("Tracing is already active. Call stop() first.")

        # Set default configuration
        default_config = {
            "video": False,
            "screenshots": True,
            "api_calls": True,
            "accessibility_tree": False,
            "metadata": True,
            "name": None,
            "path": None,
        }

        self._trace_config = {**default_config, **(config or {})}

        # Generate trace ID and directory
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self._trace_id = (
            self._trace_config.get("name") or f"trace_{timestamp}_{str(uuid.uuid4())[:8]}"
        )

        if self._trace_config.get("path"):
            self._trace_dir = Path(self._trace_config["path"])
        else:
            self._trace_dir = Path.cwd() / "traces" / self._trace_id

        # Create trace directory
        self._trace_dir.mkdir(parents=True, exist_ok=True)

        # Initialize trace data
        self._trace_data = []
        self._trace_start_time = time.time()
        self._screenshot_count = 0
        self._is_tracing = True

        # Record initial metadata
        await self._record_event(
            "trace_start",
            {
                "trace_id": self._trace_id,
                "config": self._trace_config,
                "timestamp": self._trace_start_time,
                "computer_info": {
                    "os_type": self._computer.os_type,
                    "provider_type": str(self._computer.provider_type),
                    "image": self._computer.image,
                },
            },
        )

        # Take initial screenshot if enabled
        if self._trace_config.get("screenshots"):
            await self._take_screenshot("initial_screenshot")

    async def stop(self, options: Optional[Dict[str, Any]] = None) -> str:
        """
        Stop tracing and save the trace data.

        Args:
            options: Stop options dict with:
                - path: str - Custom output path for the trace archive
                - format: str - Output format ('zip' or 'dir', default: 'zip')

        Returns:
            str: Path to the saved trace file or directory
        """
        if not self._is_tracing:
            raise RuntimeError("Tracing is not active. Call start() first.")

        if self._trace_start_time is None or self._trace_dir is None or self._trace_id is None:
            raise RuntimeError("Tracing state is invalid.")

        # Record final metadata
        await self._record_event(
            "trace_end",
            {
                "timestamp": time.time(),
                "duration": time.time() - self._trace_start_time,
                "total_events": len(self._trace_data),
                "screenshot_count": self._screenshot_count,
            },
        )

        # Take final screenshot if enabled
        if self._trace_config.get("screenshots"):
            await self._take_screenshot("final_screenshot")

        # Save trace metadata
        metadata_path = self._trace_dir / "trace_metadata.json"
        with open(metadata_path, "w") as f:
            json.dump(
                {
                    "trace_id": self._trace_id,
                    "config": self._trace_config,
                    "start_time": self._trace_start_time,
                    "end_time": time.time(),
                    "duration": time.time() - self._trace_start_time,
                    "total_events": len(self._trace_data),
                    "screenshot_count": self._screenshot_count,
                    "events": self._trace_data,
                },
                f,
                indent=2,
                default=str,
            )

        # Determine output format and path
        output_format = options.get("format", "zip") if options else "zip"
        custom_path = options.get("path") if options else None

        if output_format == "zip":
            # Create zip file
            if custom_path:
                zip_path = Path(custom_path)
            else:
                zip_path = self._trace_dir.parent / f"{self._trace_id}.zip"

            await self._create_zip_archive(zip_path)
            output_path = str(zip_path)
        else:
            # Return directory path
            if custom_path:
                # Move directory to custom path
                custom_dir = Path(custom_path)
                if custom_dir.exists():
                    import shutil

                    shutil.rmtree(custom_dir)
                self._trace_dir.rename(custom_dir)
                output_path = str(custom_dir)
            else:
                output_path = str(self._trace_dir)

        # Reset tracing state
        self._is_tracing = False
        self._trace_config = {}
        self._trace_data = []
        self._trace_start_time = None
        self._trace_id = None
        self._trace_dir = None
        self._screenshot_count = 0

        return output_path

    async def _record_event(self, event_type: str, data: Dict[str, Any]) -> None:
        """
        Record a trace event.

        Args:
            event_type: Type of event (e.g., 'click', 'type', 'screenshot')
            data: Event data
        """
        if not self._is_tracing or self._trace_start_time is None or self._trace_dir is None:
            return

        event = {
            "type": event_type,
            "timestamp": time.time(),
            "relative_time": time.time() - self._trace_start_time,
            "data": data,
        }

        self._trace_data.append(event)

        # Save event to individual file for large traces
        event_file = self._trace_dir / f"event_{len(self._trace_data):06d}_{event_type}.json"
        with open(event_file, "w") as f:
            json.dump(event, f, indent=2, default=str)

    async def _take_screenshot(self, name: str = "screenshot") -> Optional[str]:
        """
        Take a screenshot and save it to the trace.

        Args:
            name: Name for the screenshot

        Returns:
            Optional[str]: Path to the saved screenshot, or None if screenshots are disabled
        """
        if (
            not self._trace_config.get("screenshots")
            or not self._computer.interface
            or self._trace_dir is None
        ):
            return None

        try:
            screenshot_bytes = await self._computer.interface.screenshot()
            self._screenshot_count += 1

            screenshot_filename = f"{self._screenshot_count:06d}_{name}.png"
            screenshot_path = self._trace_dir / screenshot_filename

            with open(screenshot_path, "wb") as f:
                f.write(screenshot_bytes)

            return str(screenshot_path)
        except Exception as e:
            # Log error but don't fail the trace
            if hasattr(self._computer, "logger"):
                self._computer.logger.warning(f"Failed to take screenshot: {e}")
            return None

    async def _create_zip_archive(self, zip_path: Path) -> None:
        """
        Create a zip archive of the trace directory.

        Args:
            zip_path: Path where the zip archive will be saved
        """
        if self._trace_dir is None:
            raise RuntimeError("Trace directory is not set")

        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            for file_path in self._trace_dir.rglob("*"):
                if file_path.is_file():
                    arcname = file_path.relative_to(self._trace_dir)
                    zipf.write(file_path, arcname)

    async def record_api_call(
        self,
        method: str,
        args: Dict[str, Any],
        result: Any = None,
        error: Optional[Exception] = None,
    ) -> None:
        """
        Record an API call event.

        Args:
            method: The method name that was called
            args: Arguments passed to the method
            result: Result returned by the method
            error: Exception raised by the method, if any
        """
        if not self._trace_config.get("api_calls"):
            return

        # Take screenshot after certain actions if enabled
        screenshot_path = None
        screenshot_actions = [
            "left_click",
            "right_click",
            "double_click",
            "type_text",
            "press_key",
            "hotkey",
        ]
        if method in screenshot_actions and self._trace_config.get("screenshots"):
            screenshot_path = await self._take_screenshot(f"after_{method}")

        # Record accessibility tree after certain actions if enabled
        if method in screenshot_actions and self._trace_config.get("accessibility_tree"):
            await self.record_accessibility_tree()

        await self._record_event(
            "api_call",
            {
                "method": method,
                "args": args,
                "result": str(result) if result is not None else None,
                "error": str(error) if error else None,
                "screenshot": screenshot_path,
                "success": error is None,
            },
        )

    async def record_accessibility_tree(self) -> None:
        """Record the current accessibility tree if enabled."""
        if not self._trace_config.get("accessibility_tree") or not self._computer.interface:
            return

        try:
            accessibility_tree = await self._computer.interface.get_accessibility_tree()
            await self._record_event("accessibility_tree", {"tree": accessibility_tree})
        except Exception as e:
            if hasattr(self._computer, "logger"):
                self._computer.logger.warning(f"Failed to record accessibility tree: {e}")

    async def add_metadata(self, key: str, value: Any) -> None:
        """
        Add custom metadata to the trace.

        Args:
            key: Metadata key
            value: Metadata value
        """
        if not self._trace_config.get("metadata"):
            return

        await self._record_event("metadata", {"key": key, "value": value})

```
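
The tracing module above also defines the on-disk trace layout: `trace_metadata.json` bundles the full event list, each event is additionally written to its own `event_NNNNNN_<type>.json`, and screenshots are saved as numbered PNGs, with everything optionally zipped by `stop()`. As a quick orientation, here is a minimal sketch of reading a saved zip archive back; the archive path is a hypothetical example and the field names simply follow the code above, so treat this as an illustration rather than a supported reader API.

```python
# Minimal sketch: inspect a trace archive produced by Tracing.stop({"format": "zip"}).
# The path below is a hypothetical example; substitute a real archive.
import json
import zipfile


def summarize_trace(zip_path: str) -> None:
    with zipfile.ZipFile(zip_path) as zf:
        # trace_metadata.json is written by stop() and embeds the full event list
        meta = json.loads(zf.read("trace_metadata.json"))
        print(
            f"trace {meta['trace_id']}: {meta['total_events']} events, "
            f"{meta['screenshot_count']} screenshots, {meta['duration']:.1f}s"
        )
        for event in meta["events"]:
            if event["type"] == "api_call":
                print(f"  +{event['relative_time']:.2f}s {event['data']['method']}")
        # Screenshots live alongside the metadata as <counter>_<name>.png files
        screenshots = [name for name in zf.namelist() if name.endswith(".png")]
        print(f"  {len(screenshots)} screenshot files in archive")


summarize_trace("traces/trace_20240101_120000_abcd1234.zip")
```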

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/composed_grounded.py:
--------------------------------------------------------------------------------

```python
"""
Composed-grounded agent loop implementation that combines grounding and thinking models.
Uses a two-stage approach: grounding model for element detection, thinking model for reasoning.
"""

import asyncio
import base64
import json
import uuid
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple

import litellm
from PIL import Image

from ..agent import find_agent_config
from ..decorators import register_agent
from ..loops.base import AsyncAgentConfig
from ..responses import (
    convert_completion_messages_to_responses_items,
    convert_computer_calls_desc2xy,
    convert_computer_calls_xy2desc,
    convert_responses_items_to_completion_messages,
    get_all_element_descriptions,
)
from ..types import AgentCapability, AgentResponse, Messages, Tools

GROUNDED_COMPUTER_TOOL_SCHEMA = {
    "type": "function",
    "function": {
        "name": "computer",
        "description": "Control a computer by taking screenshots and interacting with UI elements. This tool uses element descriptions to locate and interact with UI elements on the screen (e.g., 'red submit button', 'search text field', 'hamburger menu icon', 'close button in top right corner').",
        "parameters": {
            "type": "object",
            "properties": {
                "action": {
                    "type": "string",
                    "enum": [
                        "screenshot",
                        "click",
                        "double_click",
                        "drag",
                        "type",
                        "keypress",
                        "scroll",
                        "move",
                        "wait",
                        "get_current_url",
                        "get_dimensions",
                        "get_environment",
                    ],
                    "description": "The action to perform (required for all actions)",
                },
                "element_description": {
                    "type": "string",
                    "description": "Description of the element to interact with (required for click, double_click, move, scroll actions)",
                },
                "start_element_description": {
                    "type": "string",
                    "description": "Description of the element to start dragging from (required for drag action)",
                },
                "end_element_description": {
                    "type": "string",
                    "description": "Description of the element to drag to (required for drag action)",
                },
                "text": {
                    "type": "string",
                    "description": "The text to type (required for type action)",
                },
                "keys": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Key(s) to press (required for keypress action)",
                },
                "button": {
                    "type": "string",
                    "enum": ["left", "right", "wheel", "back", "forward"],
                    "description": "The mouse button to use for click action (required for click and double_click action)",
                },
                "scroll_x": {
                    "type": "integer",
                    "description": "Horizontal scroll amount for scroll action (required for scroll action)",
                },
                "scroll_y": {
                    "type": "integer",
                    "description": "Vertical scroll amount for scroll action (required for scroll action)",
                },
            },
            "required": ["action"],
        },
    },
}
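
# For reference, a tool call conforming to the schema above would look roughly like
# this (an illustrative example only, not output captured from any particular model):
#   {
#       "name": "computer",
#       "arguments": {
#           "action": "click",
#           "element_description": "blue 'Sign in' button in the top-right corner",
#           "button": "left",
#       },
#   }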


def _prepare_tools_for_grounded(tool_schemas: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Prepare tools for grounded API format"""
    grounded_tools = []

    for schema in tool_schemas:
        if schema["type"] == "computer":
            grounded_tools.append(GROUNDED_COMPUTER_TOOL_SCHEMA)
        else:
            grounded_tools.append(schema)

    return grounded_tools


def get_last_computer_call_image(messages: List[Dict[str, Any]]) -> Optional[str]:
    """Get the last computer call output image from messages."""
    for message in reversed(messages):
        if (
            isinstance(message, dict)
            and message.get("type") == "computer_call_output"
            and isinstance(message.get("output"), dict)
            and message["output"].get("type") == "input_image"
        ):
            image_url = message["output"].get("image_url", "")
            if image_url.startswith("data:image/png;base64,"):
                return image_url.split(",", 1)[1]
    return None


@register_agent(r".*\+.*", priority=1)
class ComposedGroundedConfig(AsyncAgentConfig):
    """
    Composed-grounded agent configuration that uses both grounding and thinking models.

    The model parameter should be in format: "grounding_model+thinking_model"
    e.g., "huggingface-local/HelloKKMe/GTA1-7B+gemini/gemini-1.5-pro"
    """

    def __init__(self):
        self.desc2xy: Dict[str, Tuple[float, float]] = {}

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Composed-grounded predict step implementation.

        Process:
        0. Use the last computer call output image; if none exists, take a screenshot
        1. Convert computer calls from xy to descriptions
        2. Convert responses items to completion messages
        3. Call thinking model with litellm.acompletion
        4. Convert completion messages to responses items
        5. Get all element descriptions and populate desc2xy mapping
        6. Convert computer calls from descriptions back to xy coordinates
        7. Return output and usage
        """
        # Parse the composed model
        if "+" not in model:
            raise ValueError(
                f"Composed model must be in format 'grounding_model+thinking_model', got: {model}"
            )
        grounding_model, thinking_model = model.split("+", 1)

        pre_output_items = []

        # Step 0: Use the last computer call output image; if none exists, take a screenshot
        last_image_b64 = get_last_computer_call_image(messages)
        if last_image_b64 is None:
            # Take a screenshot
            screenshot_b64 = await computer_handler.screenshot()  # type: ignore
            if screenshot_b64:
                call_id = uuid.uuid4().hex
                pre_output_items += [
                    {
                        "type": "message",
                        "role": "assistant",
                        "content": [
                            {
                                "type": "output_text",
                                "text": "Taking a screenshot to see the current computer screen.",
                            }
                        ],
                    },
                    {
                        "action": {"type": "screenshot"},
                        "call_id": call_id,
                        "status": "completed",
                        "type": "computer_call",
                    },
                    {
                        "type": "computer_call_output",
                        "call_id": call_id,
                        "output": {
                            "type": "input_image",
                            "image_url": f"data:image/png;base64,{screenshot_b64}",
                        },
                    },
                ]
                last_image_b64 = screenshot_b64

                # Call screenshot callback if provided
                if _on_screenshot:
                    await _on_screenshot(screenshot_b64)

        tool_schemas = _prepare_tools_for_grounded(tools)  # type: ignore

        # Step 1: Convert computer calls from xy to descriptions
        input_messages = messages + pre_output_items
        messages_with_descriptions = convert_computer_calls_xy2desc(input_messages, self.desc2xy)

        # Step 2: Convert responses items to completion messages
        completion_messages = convert_responses_items_to_completion_messages(
            messages_with_descriptions, allow_images_in_tool_results=False
        )

        # Step 3: Call thinking model with litellm.acompletion
        api_kwargs = {
            "model": thinking_model,
            "messages": completion_messages,
            "tools": tool_schemas,
            "max_retries": max_retries,
            "stream": stream,
            **kwargs,
        }

        if use_prompt_caching:
            api_kwargs["use_prompt_caching"] = use_prompt_caching

        # Call API start hook
        if _on_api_start:
            await _on_api_start(api_kwargs)

        # Make the completion call
        response = await litellm.acompletion(**api_kwargs)

        # Call API end hook
        if _on_api_end:
            await _on_api_end(api_kwargs, response)

        # Extract usage information
        usage = {
            **response.usage.model_dump(),  # type: ignore
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(usage)

        # Step 4: Convert completion messages back to responses items format
        response_dict = response.model_dump()  # type: ignore
        choice_messages = [choice["message"] for choice in response_dict["choices"]]
        thinking_output_items = []

        for choice_message in choice_messages:
            thinking_output_items.extend(
                convert_completion_messages_to_responses_items([choice_message])
            )

        # Step 5: Get all element descriptions and populate desc2xy mapping
        element_descriptions = get_all_element_descriptions(thinking_output_items)

        if element_descriptions and last_image_b64:
            # Use grounding model to predict coordinates for each description
            grounding_agent_conf = find_agent_config(grounding_model)
            if grounding_agent_conf:
                grounding_agent = grounding_agent_conf.agent_class()

                for desc in element_descriptions:
                    for _ in range(3):  # try 3 times
                        coords = await grounding_agent.predict_click(
                            model=grounding_model, image_b64=last_image_b64, instruction=desc
                        )
                        if coords:
                            self.desc2xy[desc] = coords
                            break

        # Step 6: Convert computer calls from descriptions back to xy coordinates
        final_output_items = convert_computer_calls_desc2xy(thinking_output_items, self.desc2xy)

        # Step 7: Return output and usage
        return {"output": pre_output_items + final_output_items, "usage": usage}

    async def predict_click(
        self, model: str, image_b64: str, instruction: str, **kwargs
    ) -> Optional[Tuple[int, int]]:
        """
        Predict click coordinates using the grounding model.

        For composed models, uses only the grounding model part for click prediction.
        """
        # Parse the composed model to get grounding model
        if "+" not in model:
            raise ValueError(
                f"Composed model must be in format 'grounding_model+thinking_model', got: {model}"
            )
        grounding_model, thinking_model = model.split("+", 1)

        # Find and use the grounding agent
        grounding_agent_conf = find_agent_config(grounding_model)
        if grounding_agent_conf:
            grounding_agent = grounding_agent_conf.agent_class()
            return await grounding_agent.predict_click(
                model=grounding_model, image_b64=image_b64, instruction=instruction, **kwargs
            )

        return None

    def get_capabilities(self) -> List[AgentCapability]:
        """Return the capabilities supported by this agent."""
        return ["click", "step"]

```
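
The composed loop above is addressed with a single model string of the form `grounding_model+thinking_model`, which is what the `.*\+.*` registration pattern matches. Below is a minimal sketch of exercising just the grounding half through `predict_click`; the import path and model names are assumptions based on this repository's layout and the docstring example, not a documented entry point.

```python
# Minimal sketch, assuming the `agent` package from this repo is importable and the
# grounding checkpoint named below is available locally (both are assumptions).
import asyncio

from agent.loops.composed_grounded import ComposedGroundedConfig


async def main() -> None:
    config = ComposedGroundedConfig()
    # Only the part before "+" is used for click prediction; the thinking model is ignored here.
    coords = await config.predict_click(
        model="huggingface-local/HelloKKMe/GTA1-7B+gemini/gemini-1.5-pro",
        image_b64="<base64-encoded PNG screenshot>",
        instruction="red submit button",
    )
    print(coords)  # (x, y) tuple, or None if no grounding agent config matched


asyncio.run(main())
```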

--------------------------------------------------------------------------------
/.github/workflows/pypi-reusable-publish.yml:
--------------------------------------------------------------------------------

```yaml
name: Reusable Package Publish Workflow

on:
  workflow_call:
    inputs:
      package_name:
        description: "Name of the package (e.g. computer, agent)"
        required: true
        type: string
      package_dir:
        description: "Directory containing the package relative to workspace root (e.g. libs/python/computer)"
        required: true
        type: string
      version:
        description: "Version to publish"
        required: true
        type: string
      is_lume_package:
        description: "Whether this package includes the lume binary"
        required: false
        type: boolean
        default: false
      base_package_name:
        description: "PyPI package name (e.g. cua-agent)"
        required: true
        type: string
      make_latest:
        description: "Whether to mark this release as latest (should only be true for lume)"
        required: false
        type: boolean
        default: false
    secrets:
      PYPI_TOKEN:
        required: true
    outputs:
      version:
        description: "The version that was published"
        value: ${{ jobs.build-and-publish.outputs.version }}

jobs:
  build-and-publish:
    runs-on: macos-latest
    permissions:
      contents: write # This permission is needed for creating releases
    outputs:
      version: ${{ steps.set-version.outputs.version }}
    steps:
      - uses: actions/checkout@v4
        with:
          ref: main
          fetch-depth: 0 # Full history for release creation

      - name: Ensure latest main branch
        run: |
          git fetch origin main
          git reset --hard origin/main
          echo "Current HEAD commit:"
          git log -1 --oneline

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.11"

      - name: Create root pdm.lock file
        run: |
          # Create an empty pdm.lock file in the root
          touch pdm.lock

      - name: Install PDM
        uses: pdm-project/setup-pdm@v3
        with:
          python-version: "3.11"
          cache: true

      - name: Set version
        id: set-version
        run: |
          echo "VERSION=${{ inputs.version }}" >> $GITHUB_ENV
          echo "version=${{ inputs.version }}" >> $GITHUB_OUTPUT

      - name: Verify version consistency
        run: |
          # Install toml parser
          pip install toml

          # Verify version matches using script (exits with error if mismatch)
          python ${GITHUB_WORKSPACE}/.github/scripts/get_pyproject_version.py \
            ${GITHUB_WORKSPACE}/${{ inputs.package_dir }}/pyproject.toml \
            ${{ inputs.version }}

      - name: Initialize PDM in package directory
        run: |
          # Make sure we're working with a properly initialized PDM project
          cd ${{ inputs.package_dir }}

          # Create pdm.lock if it doesn't exist
          if [ ! -f "pdm.lock" ]; then
            echo "No pdm.lock found, initializing PDM project..."
            pdm lock
          fi

      # Conditional step for lume binary download (only for pylume package)
      - name: Download and setup lume binary
        if: inputs.is_lume_package
        run: |
          # Create a temporary directory for extraction
          mkdir -p temp_lume

          # Download the latest lume release directly
          echo "Downloading latest lume version..."
          curl -sL "https://github.com/trycua/lume/releases/latest/download/lume.tar.gz" -o temp_lume/lume.tar.gz

          # Extract the tar file (ignore ownership and suppress warnings)
          cd temp_lume && tar --no-same-owner -xzf lume.tar.gz

          # Make the binary executable
          chmod +x lume

          # Copy the lume binary to the correct location in the pylume package
          mkdir -p "${GITHUB_WORKSPACE}/${{ inputs.package_dir }}/pylume"
          cp lume "${GITHUB_WORKSPACE}/${{ inputs.package_dir }}/pylume/lume"

          # Verify the binary exists and is executable
          test -x "${GITHUB_WORKSPACE}/${{ inputs.package_dir }}/pylume/lume" || { echo "lume binary not found or not executable"; exit 1; }

          # Get the version from the downloaded binary for reference
          LUME_VERSION=$(./lume --version | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' || echo "unknown")
          echo "Using lume version: $LUME_VERSION"

          # Cleanup
          cd "${GITHUB_WORKSPACE}" && rm -rf temp_lume

          # Save the lume version for reference
          echo "LUME_VERSION=${LUME_VERSION}" >> $GITHUB_ENV

      - name: Build and publish
        env:
          PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }}
        run: |
          cd ${{ inputs.package_dir }}
          # Build with PDM
          pdm build

          # For pylume package, verify the binary is in the wheel
          if [ "${{ inputs.is_lume_package }}" = "true" ]; then
            python -m pip install wheel
            wheel unpack dist/*.whl --dest temp_wheel
            echo "Listing contents of wheel directory:"
            find temp_wheel -type f
            test -f temp_wheel/pylume-*/pylume/lume || { echo "lume binary not found in wheel"; exit 1; }
            rm -rf temp_wheel
            echo "Publishing ${{ inputs.base_package_name }} ${VERSION} with lume ${LUME_VERSION}"
          else
            echo "Publishing ${{ inputs.base_package_name }} ${VERSION}"
          fi

          # Install and use twine directly instead of PDM publish
          echo "Installing twine for direct publishing..."
          pip install twine

          echo "Publishing to PyPI using twine..."
          TWINE_USERNAME="__token__" TWINE_PASSWORD="$PYPI_TOKEN" python -m twine upload dist/*

          # Save the wheel file path for the release
          WHEEL_FILE=$(ls dist/*.whl | head -1)
          echo "WHEEL_FILE=${WHEEL_FILE}" >> $GITHUB_ENV

      - name: Prepare Simple Release Notes
        if: startsWith(github.ref, 'refs/tags/')
        run: |
          # Create release notes based on package type
          echo "# ${{ inputs.base_package_name }} v${VERSION}" > release_notes.md
          echo "" >> release_notes.md

          if [ "${{ inputs.package_name }}" = "pylume" ]; then
            echo "## Python SDK for lume - run macOS and Linux VMs on Apple Silicon" >> release_notes.md
            echo "" >> release_notes.md
            echo "This package provides Python bindings for the lume virtualization tool." >> release_notes.md
            echo "" >> release_notes.md
            echo "## Dependencies" >> release_notes.md
            echo "* lume binary: v${LUME_VERSION}" >> release_notes.md
          elif [ "${{ inputs.package_name }}" = "computer" ]; then
            echo "## Computer control library for the Computer Universal Automation (CUA) project" >> release_notes.md
            echo "" >> release_notes.md
            echo "## Dependencies" >> release_notes.md
            echo "* pylume: ${PYLUME_VERSION:-latest}" >> release_notes.md
          elif [ "${{ inputs.package_name }}" = "agent" ]; then
            echo "## Dependencies" >> release_notes.md
            echo "* cua-computer: ${COMPUTER_VERSION:-latest}" >> release_notes.md
            echo "* cua-som: ${SOM_VERSION:-latest}" >> release_notes.md
            echo "" >> release_notes.md
            echo "## Installation Options" >> release_notes.md
            echo "" >> release_notes.md
            echo "### Basic installation with Anthropic" >> release_notes.md
            echo '```bash' >> release_notes.md
            echo "pip install cua-agent[anthropic]==${VERSION}" >> release_notes.md
            echo '```' >> release_notes.md
            echo "" >> release_notes.md
            echo "### With SOM (recommended)" >> release_notes.md
            echo '```bash' >> release_notes.md
            echo "pip install cua-agent[som]==${VERSION}" >> release_notes.md
            echo '```' >> release_notes.md
            echo "" >> release_notes.md
            echo "### All features" >> release_notes.md
            echo '```bash' >> release_notes.md
            echo "pip install cua-agent[all]==${VERSION}" >> release_notes.md
            echo '```' >> release_notes.md
          elif [ "${{ inputs.package_name }}" = "som" ]; then
            echo "## Computer Vision and OCR library for detecting and analyzing UI elements" >> release_notes.md
            echo "" >> release_notes.md
            echo "This package provides enhanced UI understanding capabilities through computer vision and OCR." >> release_notes.md
          elif [ "${{ inputs.package_name }}" = "computer-server" ]; then
            echo "## Computer Server for the Computer Universal Automation (CUA) project" >> release_notes.md
            echo "" >> release_notes.md
            echo "A FastAPI-based server implementation for computer control." >> release_notes.md
            echo "" >> release_notes.md
            echo "## Dependencies" >> release_notes.md
            echo "* cua-computer: ${COMPUTER_VERSION:-latest}" >> release_notes.md
            echo "" >> release_notes.md
            echo "## Usage" >> release_notes.md
            echo '```bash' >> release_notes.md
            echo "# Run the server" >> release_notes.md
            echo "cua-computer-server" >> release_notes.md
            echo '```' >> release_notes.md
          elif [ "${{ inputs.package_name }}" = "mcp-server" ]; then
            echo "## MCP Server for the Computer-Use Agent (CUA)" >> release_notes.md
            echo "" >> release_notes.md
            echo "This package provides MCP (Model Context Protocol) integration for CUA agents, allowing them to be used with Claude Desktop, Cursor, and other MCP clients." >> release_notes.md
            echo "" >> release_notes.md
            echo "## Dependencies" >> release_notes.md
            echo "* cua-computer: ${COMPUTER_VERSION:-latest}" >> release_notes.md
            echo "* cua-agent: ${AGENT_VERSION:-latest}" >> release_notes.md
            echo "" >> release_notes.md
            echo "## Usage" >> release_notes.md
            echo '```bash' >> release_notes.md
            echo "# Run the MCP server directly" >> release_notes.md
            echo "cua-mcp-server" >> release_notes.md
            echo '```' >> release_notes.md
            echo "" >> release_notes.md
            echo "## Claude Desktop Integration" >> release_notes.md
            echo "Add to your Claude Desktop configuration (~/.config/claude-desktop/claude_desktop_config.json or OS-specific location):" >> release_notes.md
            echo '```json' >> release_notes.md
            echo '"mcpServers": {' >> release_notes.md
            echo '  "cua-agent": {' >> release_notes.md
            echo '    "command": "cua-mcp-server",' >> release_notes.md
            echo '    "args": [],' >> release_notes.md
            echo '    "env": {' >> release_notes.md
            echo '      "CUA_AGENT_LOOP": "OMNI",' >> release_notes.md
            echo '      "CUA_MODEL_PROVIDER": "ANTHROPIC",' >> release_notes.md
            echo '      "CUA_MODEL_NAME": "claude-3-opus-20240229",' >> release_notes.md
            echo '      "ANTHROPIC_API_KEY": "your-api-key",' >> release_notes.md
            echo '      "PYTHONIOENCODING": "utf-8"' >> release_notes.md
            echo '    }' >> release_notes.md
            echo '  }' >> release_notes.md
            echo '}' >> release_notes.md
            echo '```' >> release_notes.md
          fi

          # Add installation section if not agent (which has its own installation section)
          if [ "${{ inputs.package_name }}" != "agent" ]; then
            echo "" >> release_notes.md
            echo "## Installation" >> release_notes.md
            echo '```bash' >> release_notes.md
            echo "pip install ${{ inputs.base_package_name }}==${VERSION}" >> release_notes.md
            echo '```' >> release_notes.md
          fi

          echo "Release notes created:"
          cat release_notes.md

      - name: Create GitHub Release
        uses: softprops/action-gh-release@v2
        if: startsWith(github.ref, 'refs/tags/')
        with:
          name: "${{ inputs.base_package_name }} v${{ env.VERSION }}"
          body_path: release_notes.md
          files: ${{ inputs.package_dir }}/${{ env.WHEEL_FILE }}
          draft: false
          prerelease: false
          make_latest: ${{ inputs.make_latest }}
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/adapters/human_adapter.py:
--------------------------------------------------------------------------------

```python
import asyncio
import os
from typing import Any, AsyncIterator, Dict, Iterator, List

import requests
from litellm import acompletion, completion
from litellm.llms.custom_llm import CustomLLM
from litellm.types.utils import GenericStreamingChunk, ModelResponse


class HumanAdapter(CustomLLM):
    """Human Adapter for human-in-the-loop completions.

    This adapter sends completion requests to a human completion server
    where humans can review and respond to AI requests.
    """

    def __init__(self, base_url: str | None = None, timeout: float = 300.0, **kwargs):
        """Initialize the human adapter.

        Args:
            base_url: Base URL for the human completion server.
                     Defaults to HUMAN_BASE_URL environment variable or http://localhost:8002
            timeout: Timeout in seconds for waiting for human response
            **kwargs: Additional arguments
        """
        super().__init__()
        self.base_url = base_url or os.getenv("HUMAN_BASE_URL", "http://localhost:8002")
        self.timeout = timeout

        # Ensure base_url doesn't end with slash
        self.base_url = self.base_url.rstrip("/")

    def _queue_completion(self, messages: List[Dict[str, Any]], model: str) -> str:
        """Queue a completion request and return the call ID.

        Args:
            messages: Messages in OpenAI format
            model: Model name

        Returns:
            Call ID for tracking the request

        Raises:
            Exception: If queueing fails
        """
        try:
            response = requests.post(
                f"{self.base_url}/queue", json={"messages": messages, "model": model}, timeout=10
            )
            response.raise_for_status()
            return response.json()["id"]
        except requests.RequestException as e:
            raise Exception(f"Failed to queue completion request: {e}")

    def _wait_for_completion(self, call_id: str) -> Dict[str, Any]:
        """Wait for human to complete the call.

        Args:
            call_id: ID of the queued completion call

        Returns:
            Dict containing response and/or tool_calls

        Raises:
            TimeoutError: If timeout is exceeded
            Exception: If completion fails
        """
        import time

        start_time = time.time()

        while True:
            try:
                # Check status
                status_response = requests.get(f"{self.base_url}/status/{call_id}")
                status_response.raise_for_status()
                status_data = status_response.json()

                if status_data["status"] == "completed":
                    result = {}
                    if "response" in status_data and status_data["response"]:
                        result["response"] = status_data["response"]
                    if "tool_calls" in status_data and status_data["tool_calls"]:
                        result["tool_calls"] = status_data["tool_calls"]
                    return result
                elif status_data["status"] == "failed":
                    error_msg = status_data.get("error", "Unknown error")
                    raise Exception(f"Completion failed: {error_msg}")

                # Check timeout
                if time.time() - start_time > self.timeout:
                    raise TimeoutError(
                        f"Timeout waiting for human response after {self.timeout} seconds"
                    )

                # Wait before checking again
                time.sleep(1.0)

            except requests.RequestException as e:
                if time.time() - start_time > self.timeout:
                    raise TimeoutError(f"Timeout waiting for human response: {e}")
                # Continue trying if we haven't timed out
                time.sleep(1.0)

    async def _async_wait_for_completion(self, call_id: str) -> Dict[str, Any]:
        """Async version of wait_for_completion.

        Args:
            call_id: ID of the queued completion call

        Returns:
            Dict containing response and/or tool_calls

        Raises:
            TimeoutError: If timeout is exceeded
            Exception: If completion fails
        """
        import time

        import aiohttp

        start_time = time.time()

        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    # Check status
                    async with session.get(f"{self.base_url}/status/{call_id}") as response:
                        response.raise_for_status()
                        status_data = await response.json()

                    if status_data["status"] == "completed":
                        result = {}
                        if "response" in status_data and status_data["response"]:
                            result["response"] = status_data["response"]
                        if "tool_calls" in status_data and status_data["tool_calls"]:
                            result["tool_calls"] = status_data["tool_calls"]
                        return result
                    elif status_data["status"] == "failed":
                        error_msg = status_data.get("error", "Unknown error")
                        raise Exception(f"Completion failed: {error_msg}")

                    # Check timeout
                    if time.time() - start_time > self.timeout:
                        raise TimeoutError(
                            f"Timeout waiting for human response after {self.timeout} seconds"
                        )

                    # Wait before checking again
                    await asyncio.sleep(1.0)

                except Exception as e:
                    if time.time() - start_time > self.timeout:
                        raise TimeoutError(f"Timeout waiting for human response: {e}")
                    # Continue trying if we haven't timed out
                    await asyncio.sleep(1.0)

    def _generate_response(self, messages: List[Dict[str, Any]], model: str) -> Dict[str, Any]:
        """Generate a human response for the given messages.

        Args:
            messages: Messages in OpenAI format
            model: Model name

        Returns:
            Dict containing response and/or tool_calls
        """
        # Queue the completion request
        call_id = self._queue_completion(messages, model)

        # Wait for human response
        response = self._wait_for_completion(call_id)

        return response

    async def _async_generate_response(
        self, messages: List[Dict[str, Any]], model: str
    ) -> Dict[str, Any]:
        """Async version of _generate_response.

        Args:
            messages: Messages in OpenAI format
            model: Model name

        Returns:
            Dict containing response and/or tool_calls
        """
        # Queue the completion request (sync operation)
        call_id = self._queue_completion(messages, model)

        # Wait for human response (async)
        response = await self._async_wait_for_completion(call_id)

        return response

    def completion(self, *args, **kwargs) -> ModelResponse:
        """Synchronous completion method.

        Returns:
            ModelResponse with human-generated text or tool calls
        """
        messages = kwargs.get("messages", [])
        model = kwargs.get("model", "human")

        # Generate human response
        human_response_data = self._generate_response(messages, model)

        # Create ModelResponse with proper structure
        import time
        import uuid

        from litellm.types.utils import Choices, Message, ModelResponse

        # Create message content based on response type
        if "tool_calls" in human_response_data and human_response_data["tool_calls"]:
            # Tool calls response
            message = Message(
                role="assistant",
                content=human_response_data.get("response", ""),
                tool_calls=human_response_data["tool_calls"],
            )
        else:
            # Text response
            message = Message(role="assistant", content=human_response_data.get("response", ""))

        choice = Choices(finish_reason="stop", index=0, message=message)

        result = ModelResponse(
            id=f"human-{uuid.uuid4()}",
            choices=[choice],
            created=int(time.time()),
            model=f"human/{model}",
            object="chat.completion",
        )

        return result

    async def acompletion(self, *args, **kwargs) -> ModelResponse:
        """Asynchronous completion method.

        Returns:
            ModelResponse with human-generated text or tool calls
        """
        messages = kwargs.get("messages", [])
        model = kwargs.get("model", "human")

        # Generate human response
        human_response_data = await self._async_generate_response(messages, model)

        # Create ModelResponse with proper structure
        import time
        import uuid

        from litellm.types.utils import Choices, Message, ModelResponse

        # Create message content based on response type
        if "tool_calls" in human_response_data and human_response_data["tool_calls"]:
            # Tool calls response
            message = Message(
                role="assistant",
                content=human_response_data.get("response", ""),
                tool_calls=human_response_data["tool_calls"],
            )
        else:
            # Text response
            message = Message(role="assistant", content=human_response_data.get("response", ""))

        choice = Choices(finish_reason="stop", index=0, message=message)

        result = ModelResponse(
            id=f"human-{uuid.uuid4()}",
            choices=[choice],
            created=int(time.time()),
            model=f"human/{model}",
            object="chat.completion",
        )

        return result

    def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]:
        """Synchronous streaming method.

        Yields:
            Streaming chunks with human-generated text or tool calls
        """
        messages = kwargs.get("messages", [])
        model = kwargs.get("model", "human")

        # Generate human response
        human_response_data = self._generate_response(messages, model)

        import time

        # Handle tool calls vs text response
        if "tool_calls" in human_response_data and human_response_data["tool_calls"]:
            # Stream tool calls as a single chunk
            generic_chunk: GenericStreamingChunk = {
                "finish_reason": "tool_calls",
                "index": 0,
                "is_finished": True,
                "text": human_response_data.get("response", ""),
                "tool_use": human_response_data["tool_calls"],
                "usage": {"completion_tokens": 1, "prompt_tokens": 0, "total_tokens": 1},
            }
            yield generic_chunk
        else:
            # Stream text response
            response_text = human_response_data.get("response", "")
            generic_chunk: GenericStreamingChunk = {
                "finish_reason": "stop",
                "index": 0,
                "is_finished": True,
                "text": response_text,
                "tool_use": None,
                "usage": {
                    "completion_tokens": len(response_text.split()),
                    "prompt_tokens": 0,
                    "total_tokens": len(response_text.split()),
                },
            }
            yield generic_chunk

    async def astreaming(self, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]:
        """Asynchronous streaming method.

        Yields:
            Streaming chunks with human-generated text or tool calls
        """
        messages = kwargs.get("messages", [])
        model = kwargs.get("model", "human")

        # Generate human response
        human_response_data = await self._async_generate_response(messages, model)

        # Handle tool calls vs text response (mirrors the synchronous streaming path)
        if "tool_calls" in human_response_data and human_response_data["tool_calls"]:
            # Stream tool calls as a single chunk
            generic_chunk: GenericStreamingChunk = {
                "finish_reason": "tool_calls",
                "index": 0,
                "is_finished": True,
                "text": human_response_data.get("response", ""),
                "tool_use": human_response_data["tool_calls"],
                "usage": {"completion_tokens": 1, "prompt_tokens": 0, "total_tokens": 1},
            }
            yield generic_chunk
        else:
            # Stream text response
            response_text = human_response_data.get("response", "")
            generic_chunk: GenericStreamingChunk = {
                "finish_reason": "stop",
                "index": 0,
                "is_finished": True,
                "text": response_text,
                "tool_use": None,
                "usage": {
                    "completion_tokens": len(response_text.split()),
                    "prompt_tokens": 0,
                    "total_tokens": len(response_text.split()),
                },
            }
            yield generic_chunk

```
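
Because `HumanAdapter` subclasses litellm's `CustomLLM`, it can be registered through litellm's custom provider map and then addressed with a prefixed model name. The sketch below shows one plausible wiring, assuming the human completion server described above (POST `/queue`, GET `/status/{id}`) is running at `http://localhost:8002`; the provider name `human` and the model suffix are illustrative choices, and the import path is an assumption.

```python
# Minimal sketch: route a litellm completion through a human reviewer.
import litellm

from agent.adapters.human_adapter import HumanAdapter  # import path is an assumption

human_llm = HumanAdapter(base_url="http://localhost:8002", timeout=600.0)

# Register the adapter so model names prefixed with "human/" resolve to it.
litellm.custom_provider_map = [{"provider": "human", "custom_handler": human_llm}]

response = litellm.completion(
    model="human/review",  # the "human/" prefix selects the registered custom provider
    messages=[{"role": "user", "content": "Please click the blue 'Sign in' button."}],
)
print(response.choices[0].message.content)
```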

--------------------------------------------------------------------------------
/examples/tracing_examples.py:
--------------------------------------------------------------------------------

```python
"""
Examples demonstrating the Computer.tracing API for recording sessions.

This module shows various use cases for the new Computer.tracing functionality,
including training data collection, debugging, and compliance recording.
"""

import asyncio
import logging
from pathlib import Path

from agent import ComputerAgent
from computer import Computer


async def basic_tracing_example():
    """
    Basic example showing how to use Computer.tracing for recording a simple session.
    """
    print("=== Basic Tracing Example ===")

    # Initialize computer
    computer = Computer(os_type="macos", provider_type="lume")
    await computer.run()

    try:
        # Start tracing with basic configuration
        await computer.tracing.start(
            {"screenshots": True, "api_calls": True, "metadata": True, "name": "basic_session"}
        )

        print("Tracing started...")

        # Perform some computer operations
        await computer.interface.move_cursor(100, 100)
        await computer.interface.left_click()
        await computer.interface.type_text("Hello, tracing!")
        await computer.interface.press_key("enter")

        # Add custom metadata
        await computer.tracing.add_metadata("session_type", "basic_demo")
        await computer.tracing.add_metadata("user_notes", "Testing basic functionality")

        # Stop tracing and save
        trace_path = await computer.tracing.stop({"format": "zip"})
        print(f"Trace saved to: {trace_path}")

    finally:
        await computer.stop()


async def agent_tracing_example():
    """
    Example showing how to use tracing with ComputerAgent for enhanced session recording.
    """
    print("=== Agent with Tracing Example ===")

    # Initialize computer and agent
    computer = Computer(os_type="macos", provider_type="lume")
    await computer.run()

    try:
        # Start comprehensive tracing
        await computer.tracing.start(
            {
                "screenshots": True,
                "api_calls": True,
                "accessibility_tree": True,  # Include accessibility data for training
                "metadata": True,
                "name": "agent_session",
            }
        )

        # Create agent
        agent = ComputerAgent(
            model="openai/computer-use-preview", tools=[computer], verbosity=logging.INFO
        )

        # Add metadata about the agent session
        await computer.tracing.add_metadata("agent_model", "openai/computer-use-preview")
        await computer.tracing.add_metadata("task_type", "web_search")

        # Run agent task
        async for message in agent.run(
            "Open a web browser and search for 'computer use automation'"
        ):
            print(f"Agent: {message}")

        # Stop tracing
        trace_path = await computer.tracing.stop({"format": "zip"})
        print(f"Agent trace saved to: {trace_path}")

    finally:
        await computer.stop()


async def custom_agent_tracing_example():
    """
    Example showing tracing with custom agent implementations.
    """
    print("=== Custom Agent Tracing Example ===")

    computer = Computer(os_type="macos", provider_type="lume")
    await computer.run()

    try:
        # Start tracing with custom path
        trace_dir = Path.cwd() / "custom_traces" / "my_agent_session"
        await computer.tracing.start(
            {
                "screenshots": True,
                "api_calls": True,
                "accessibility_tree": False,
                "metadata": True,
                "path": str(trace_dir),
            }
        )

        # Custom agent logic using direct computer calls
        await computer.tracing.add_metadata("session_type", "custom_agent")
        await computer.tracing.add_metadata("purpose", "RPA_workflow")

        # Take initial screenshot
        screenshot = await computer.interface.screenshot()

        # Simulate RPA workflow
        await computer.interface.move_cursor(500, 300)
        await computer.interface.left_click()
        await computer.interface.type_text("automation workflow test")

        # Add workflow checkpoint
        await computer.tracing.add_metadata("checkpoint", "text_input_complete")

        await computer.interface.hotkey("command", "a")  # Select all
        await computer.interface.hotkey("command", "c")  # Copy

        # Stop tracing and save as directory
        trace_path = await computer.tracing.stop({"format": "dir"})
        print(f"Custom agent trace saved to: {trace_path}")

    finally:
        await computer.stop()


async def training_data_collection_example():
    """
    Example for collecting training data with rich context.
    """
    print("=== Training Data Collection Example ===")

    computer = Computer(os_type="macos", provider_type="lume")
    await computer.run()

    try:
        # Start tracing optimized for training data
        await computer.tracing.start(
            {
                "screenshots": True,  # Essential for visual training
                "api_calls": True,  # Capture action sequences
                "accessibility_tree": True,  # Rich semantic context
                "metadata": True,  # Custom annotations
                "name": "training_session",
            }
        )

        # Add training metadata
        await computer.tracing.add_metadata("data_type", "training")
        await computer.tracing.add_metadata("task_category", "ui_automation")
        await computer.tracing.add_metadata("difficulty", "intermediate")
        await computer.tracing.add_metadata("annotator", "human_expert")

        # Simulate human demonstration
        await computer.interface.screenshot()  # Baseline screenshot

        # Step 1: Navigate to application
        await computer.tracing.add_metadata("step", "1_navigate_to_app")
        await computer.interface.move_cursor(100, 50)
        await computer.interface.left_click()

        # Step 2: Input data
        await computer.tracing.add_metadata("step", "2_input_data")
        await computer.interface.type_text("training example data")

        # Step 3: Process
        await computer.tracing.add_metadata("step", "3_process")
        await computer.interface.press_key("tab")
        await computer.interface.press_key("enter")

        # Final metadata
        await computer.tracing.add_metadata("success", True)
        await computer.tracing.add_metadata("completion_time", "45_seconds")

        trace_path = await computer.tracing.stop()
        print(f"Training data collected: {trace_path}")

    finally:
        await computer.stop()


async def debugging_session_example():
    """
    Example for debugging agent behavior with detailed tracing.
    """
    print("=== Debugging Session Example ===")

    computer = Computer(os_type="macos", provider_type="lume")
    await computer.run()

    try:
        # Start tracing for debugging
        await computer.tracing.start(
            {
                "screenshots": True,
                "api_calls": True,
                "accessibility_tree": True,
                "metadata": True,
                "name": "debug_session",
            }
        )

        # Debug metadata
        await computer.tracing.add_metadata("session_type", "debugging")
        await computer.tracing.add_metadata("issue", "click_target_detection")
        await computer.tracing.add_metadata("expected_behavior", "click_on_button")

        try:
            # Problematic sequence that needs debugging
            await computer.interface.move_cursor(200, 150)
            await computer.interface.left_click()

            # This might fail - let's trace it
            await computer.interface.type_text("debug test")
            await computer.tracing.add_metadata("action_result", "successful_typing")

        except Exception as e:
            # Record the error in tracing
            await computer.tracing.add_metadata("error_encountered", str(e))
            await computer.tracing.add_metadata("error_type", type(e).__name__)
            print(f"Error occurred: {e}")

        # Stop tracing
        trace_path = await computer.tracing.stop()
        print(f"Debug trace saved: {trace_path}")
        print("Use this trace to analyze the failure and improve the agent")

    finally:
        await computer.stop()


async def human_in_the_loop_example():
    """
    Example for recording mixed human/agent sessions.
    """
    print("=== Human-in-the-Loop Example ===")

    computer = Computer(os_type="macos", provider_type="lume")
    await computer.run()

    try:
        # Start tracing for hybrid session
        await computer.tracing.start(
            {
                "screenshots": True,
                "api_calls": True,
                "metadata": True,
                "name": "human_agent_collaboration",
            }
        )

        # Initial agent phase
        await computer.tracing.add_metadata("phase", "agent_autonomous")
        await computer.tracing.add_metadata("agent_model", "computer-use-preview")

        # Agent performs initial task
        await computer.interface.move_cursor(300, 200)
        await computer.interface.left_click()
        await computer.interface.type_text("automated input")

        # Transition to human intervention
        await computer.tracing.add_metadata("phase", "human_intervention")
        await computer.tracing.add_metadata("intervention_reason", "complex_ui_element")

        print("Human intervention phase - manual actions will be recorded...")
        # At this point, human can take control while tracing continues

        # Simulate human input (in practice, this would be actual human interaction)
        await computer.interface.move_cursor(500, 400)
        await computer.interface.double_click()
        await computer.tracing.add_metadata("human_action", "double_click_complex_element")

        # Back to agent
        await computer.tracing.add_metadata("phase", "agent_completion")
        await computer.interface.press_key("enter")

        trace_path = await computer.tracing.stop()
        print(f"Human-agent collaboration trace saved: {trace_path}")

    finally:
        await computer.stop()


async def performance_monitoring_example():
    """
    Example for performance monitoring and analysis.
    """
    print("=== Performance Monitoring Example ===")

    computer = Computer(os_type="macos", provider_type="lume")
    await computer.run()

    try:
        # Start tracing for performance analysis
        await computer.tracing.start(
            {
                "screenshots": False,  # Skip screenshots for performance
                "api_calls": True,
                "metadata": True,
                "name": "performance_test",
            }
        )

        # Performance test metadata
        await computer.tracing.add_metadata("test_type", "performance_benchmark")
        await computer.tracing.add_metadata("expected_duration", "< 30 seconds")

        import time

        start_time = time.time()

        # Perform a series of rapid actions
        for i in range(10):
            await computer.tracing.add_metadata("iteration", i)
            await computer.interface.move_cursor(100 + i * 50, 100)
            await computer.interface.left_click()
            await computer.interface.type_text(f"Test {i}")
            await computer.interface.press_key("tab")

        end_time = time.time()

        # Record performance metrics
        await computer.tracing.add_metadata(
            "actual_duration", f"{end_time - start_time:.2f} seconds"
        )
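        # 10 iterations x 4 interface actions (move, click, type, tab) = 40 total actions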
        await computer.tracing.add_metadata(
            "actions_per_second", f"{40 / (end_time - start_time):.2f}"
        )

        trace_path = await computer.tracing.stop()
        print(f"Performance trace saved: {trace_path}")

    finally:
        await computer.stop()


async def main():
    """
    Run all tracing examples.
    """
    print("Computer.tracing API Examples")
    print("=" * 50)

    examples = [
        basic_tracing_example,
        agent_tracing_example,
        custom_agent_tracing_example,
        training_data_collection_example,
        debugging_session_example,
        human_in_the_loop_example,
        performance_monitoring_example,
    ]

    for example in examples:
        try:
            await example()
            print()
        except Exception as e:
            print(f"Error in {example.__name__}: {e}")
            print()

    print("All examples completed!")


if __name__ == "__main__":
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/.github/scripts/tests/test_get_pyproject_version.py:
--------------------------------------------------------------------------------

```python
"""
Comprehensive tests for get_pyproject_version.py script using unittest.

This test suite covers:
- Version matching validation
- Error handling for missing versions
- Invalid input handling
- File not found scenarios
- Malformed TOML handling
"""

import sys
import tempfile
import unittest
from io import StringIO
from pathlib import Path
from unittest.mock import patch

# Add parent directory to path to import the module
sys.path.insert(0, str(Path(__file__).parent.parent))

# Import after path is modified
import get_pyproject_version


class TestGetPyprojectVersion(unittest.TestCase):
    """Test suite for get_pyproject_version.py functionality."""

    def setUp(self):
        """Reset sys.argv before each test."""
        self.original_argv = sys.argv.copy()

    def tearDown(self):
        """Restore sys.argv after each test."""
        sys.argv = self.original_argv

    def create_pyproject_toml(self, version: str) -> Path:
        """Helper to create a temporary pyproject.toml file with a given version."""
        temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".toml", delete=False)
        temp_file.write(
            f"""
[project]
name = "test-project"
version = "{version}"
description = "A test project"
"""
        )
        temp_file.close()
        return Path(temp_file.name)

    def create_pyproject_toml_no_version(self) -> Path:
        """Helper to create a pyproject.toml without a version field."""
        temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".toml", delete=False)
        temp_file.write(
            """
[project]
name = "test-project"
description = "A test project without version"
"""
        )
        temp_file.close()
        return Path(temp_file.name)

    def create_pyproject_toml_no_project(self) -> Path:
        """Helper to create a pyproject.toml without a project section."""
        temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".toml", delete=False)
        temp_file.write(
            """
[tool.poetry]
name = "test-project"
version = "1.0.0"
"""
        )
        temp_file.close()
        return Path(temp_file.name)

    def create_malformed_toml(self) -> Path:
        """Helper to create a malformed TOML file."""
        temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".toml", delete=False)
        temp_file.write(
            """
[project
name = "test-project
version = "1.0.0"
"""
        )
        temp_file.close()
        return Path(temp_file.name)

    # Test: Successful version match
    def test_matching_versions(self):
        """Test that matching versions result in success."""
        pyproject_file = self.create_pyproject_toml("1.2.3")

        try:
            sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.2.3"]

            # Capture stdout
            captured_output = StringIO()
            with patch("sys.stdout", captured_output):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 0)
            self.assertIn("✅ Version consistency check passed: 1.2.3", captured_output.getvalue())
        finally:
            pyproject_file.unlink()

    # Test: Version mismatch
    def test_version_mismatch(self):
        """Test that mismatched versions result in failure with appropriate error message."""
        pyproject_file = self.create_pyproject_toml("1.2.3")

        try:
            sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.2.4"]

            # Capture stderr
            captured_error = StringIO()
            with patch("sys.stderr", captured_error):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 1)
            error_output = captured_error.getvalue()
            self.assertIn("❌ Version mismatch detected!", error_output)
            self.assertIn("pyproject.toml version: 1.2.3", error_output)
            self.assertIn("Expected version: 1.2.4", error_output)
            self.assertIn("Please update pyproject.toml to version 1.2.4", error_output)
        finally:
            pyproject_file.unlink()

    # Test: Missing version in pyproject.toml
    def test_missing_version_field(self):
        """Test handling of pyproject.toml without a version field."""
        pyproject_file = self.create_pyproject_toml_no_version()

        try:
            sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.0.0"]

            captured_error = StringIO()
            with patch("sys.stderr", captured_error):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 1)
            self.assertIn("❌ ERROR: No version found in pyproject.toml", captured_error.getvalue())
        finally:
            pyproject_file.unlink()

    # Test: Missing project section
    def test_missing_project_section(self):
        """Test handling of pyproject.toml without a project section."""
        pyproject_file = self.create_pyproject_toml_no_project()

        try:
            sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.0.0"]

            captured_error = StringIO()
            with patch("sys.stderr", captured_error):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 1)
            self.assertIn("❌ ERROR: No version found in pyproject.toml", captured_error.getvalue())
        finally:
            pyproject_file.unlink()

    # Test: File not found
    def test_file_not_found(self):
        """Test handling of non-existent pyproject.toml file."""
        sys.argv = ["get_pyproject_version.py", "/nonexistent/pyproject.toml", "1.0.0"]

        with self.assertRaises(SystemExit) as cm:
            get_pyproject_version.main()

        self.assertEqual(cm.exception.code, 1)

    # Test: Malformed TOML
    def test_malformed_toml(self):
        """Test handling of malformed TOML file."""
        pyproject_file = self.create_malformed_toml()

        try:
            sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.0.0"]

            with self.assertRaises(SystemExit) as cm:
                get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 1)
        finally:
            pyproject_file.unlink()

    # Test: Incorrect number of arguments - too few
    def test_too_few_arguments(self):
        """Test that providing too few arguments results in usage error."""
        sys.argv = ["get_pyproject_version.py", "pyproject.toml"]

        captured_error = StringIO()
        with patch("sys.stderr", captured_error):
            with self.assertRaises(SystemExit) as cm:
                get_pyproject_version.main()

        self.assertEqual(cm.exception.code, 1)
        self.assertIn(
            "Usage: python get_pyproject_version.py <pyproject_path> <expected_version>",
            captured_error.getvalue(),
        )

    # Test: Incorrect number of arguments - too many
    def test_too_many_arguments(self):
        """Test that providing too many arguments results in usage error."""
        sys.argv = ["get_pyproject_version.py", "pyproject.toml", "1.0.0", "extra"]

        captured_error = StringIO()
        with patch("sys.stderr", captured_error):
            with self.assertRaises(SystemExit) as cm:
                get_pyproject_version.main()

        self.assertEqual(cm.exception.code, 1)
        self.assertIn(
            "Usage: python get_pyproject_version.py <pyproject_path> <expected_version>",
            captured_error.getvalue(),
        )

    # Test: No arguments
    def test_no_arguments(self):
        """Test that providing no arguments results in usage error."""
        sys.argv = ["get_pyproject_version.py"]

        captured_error = StringIO()
        with patch("sys.stderr", captured_error):
            with self.assertRaises(SystemExit) as cm:
                get_pyproject_version.main()

        self.assertEqual(cm.exception.code, 1)
        self.assertIn(
            "Usage: python get_pyproject_version.py <pyproject_path> <expected_version>",
            captured_error.getvalue(),
        )

    # Test: Version with pre-release tags
    def test_version_with_prerelease_tags(self):
        """Test matching versions with pre-release tags like alpha, beta, rc."""
        pyproject_file = self.create_pyproject_toml("1.2.3-rc.1")

        try:
            sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.2.3-rc.1"]

            captured_output = StringIO()
            with patch("sys.stdout", captured_output):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 0)
            self.assertIn(
                "✅ Version consistency check passed: 1.2.3-rc.1", captured_output.getvalue()
            )
        finally:
            pyproject_file.unlink()

    # Test: Version with build metadata
    def test_version_with_build_metadata(self):
        """Test matching versions with build metadata."""
        pyproject_file = self.create_pyproject_toml("1.2.3+build.123")

        try:
            sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.2.3+build.123"]

            captured_output = StringIO()
            with patch("sys.stdout", captured_output):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 0)
            self.assertIn(
                "✅ Version consistency check passed: 1.2.3+build.123", captured_output.getvalue()
            )
        finally:
            pyproject_file.unlink()

    # Test: Various semantic version formats
    def test_semantic_version_0_0_1(self):
        """Test semantic version 0.0.1."""
        self._test_version_format("0.0.1")

    def test_semantic_version_1_0_0(self):
        """Test semantic version 1.0.0."""
        self._test_version_format("1.0.0")

    def test_semantic_version_10_20_30(self):
        """Test semantic version 10.20.30."""
        self._test_version_format("10.20.30")

    def test_semantic_version_alpha(self):
        """Test semantic version with alpha tag."""
        self._test_version_format("1.2.3-alpha")

    def test_semantic_version_beta(self):
        """Test semantic version with beta tag."""
        self._test_version_format("1.2.3-beta.1")

    def test_semantic_version_rc_with_build(self):
        """Test semantic version with rc and build metadata."""
        self._test_version_format("1.2.3-rc.1+build.456")

    def _test_version_format(self, version: str):
        """Helper method to test various semantic version formats."""
        pyproject_file = self.create_pyproject_toml(version)

        try:
            sys.argv = ["get_pyproject_version.py", str(pyproject_file), version]

            captured_output = StringIO()
            with patch("sys.stdout", captured_output):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 0)
            self.assertIn(
                f"✅ Version consistency check passed: {version}", captured_output.getvalue()
            )
        finally:
            pyproject_file.unlink()

    # Test: Empty version string
    def test_empty_version_string(self):
        """Test handling of empty version string."""
        pyproject_file = self.create_pyproject_toml("")

        try:
            sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.0.0"]

            captured_error = StringIO()
            with patch("sys.stderr", captured_error):
                with self.assertRaises(SystemExit) as cm:
                    get_pyproject_version.main()

            self.assertEqual(cm.exception.code, 1)
            # Empty string is falsy, so it should trigger error
            self.assertIn("❌", captured_error.getvalue())
        finally:
            pyproject_file.unlink()


class TestSuiteInfo(unittest.TestCase):
    """Test suite metadata."""

    def test_suite_info(self):
        """Display test suite information."""
        print("\n" + "=" * 70)
        print("Test Suite: get_pyproject_version.py")
        print("Framework: unittest (Python built-in)")
        print("TOML Library: tomllib (Python 3.11+ built-in)")
        print("=" * 70)
        self.assertTrue(True)


if __name__ == "__main__":
    # Run tests with verbose output
    unittest.main(verbosity=2)

```

--------------------------------------------------------------------------------
/Development.md:
--------------------------------------------------------------------------------

```markdown
# Getting Started

## Project Structure

The project is organized as a monorepo with these main packages:

- `libs/core/` - Base package with telemetry support
- `libs/computer/` - Computer-use interface (CUI) library
- `libs/agent/` - AI agent library with multi-provider support
- `libs/som/` - Set-of-Mark parser
- `libs/computer-server/` - Server component for VM
- `libs/lume/` - Lume CLI

These packages are part of a uv workspace which manages a shared virtual environment and dependencies.

## Local Development Setup

1. Install Lume CLI:

   ```bash
   /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/trycua/cua/main/libs/lume/scripts/install.sh)"
   ```

2. Clone the repository:

   ```bash
   git clone https://github.com/trycua/cua.git
   cd cua
   ```

3. Create a `.env.local` file in the root directory with your API keys:

   ```bash
   # Required for Anthropic provider
   ANTHROPIC_API_KEY=your_anthropic_key_here

   # Required for OpenAI provider
   OPENAI_API_KEY=your_openai_key_here
   ```

4. Install Node.js dependencies for Prettier and other scripts:

   ```bash
   # Install pnpm if you don't have it
   npm install -g pnpm

   # Install all JS/TS dependencies
   pnpm install
   ```

5. Install Python dependencies and workspace packages:

   ```bash
   # First install uv if you don't have it
   pip install uv

   # Then install all Python dependencies
   uv sync
   ```

6. Open the workspace in VSCode or Cursor:

   ```bash
   # For Cua Python development
   code .vscode/py.code-workspace

   # For Lume (Swift) development
   code .vscode/lume.code-workspace
   ```

7. Install Pre-commit hooks:

   This ensures code formatting and validation run automatically on each commit.

   ```bash
   uv run pre-commit install
   ```

Using the workspace file is strongly recommended as it:

- Sets up correct Python environments for each package
- Configures proper import paths
- Enables debugging configurations
- Maintains consistent settings across packages

## Lume Development

Refer to the [Lume README](./libs/lume/Development.md) for instructions on how to develop the Lume CLI.

## Python Development

### Setup

Install all workspace dependencies with a single command:

```bash
uv sync
```

This installs all dependencies in the virtual environment `.venv`.

Each Cua package is installed in editable mode, which means changes to the source code are immediately reflected in the installed package.

The `.venv` environment is also configured as the default VS Code Python interpreter in `.vscode/settings.json`.

### Running Python Scripts

To run Python scripts in the workspace, use the `uv run` command:

```bash
uv run python examples/agent_examples.py
```

Or activate the virtual environment manually:

```bash
source .venv/bin/activate
python examples/agent_examples.py
```

## Running Examples

The Python workspace includes launch configurations for all packages:

- "Run Computer Examples" - Runs computer examples
- "Run Agent Examples" - Runs agent examples
- "SOM" configurations - Various settings for running SOM

To run examples from VSCode / Cursor:

1. Press F5 or use the Run/Debug view
2. Select the desired configuration

The workspace also includes compound launch configurations:

- "Run Computer Examples + Server" - Runs both the Computer Examples and Server simultaneously

## Code Formatting Standards

The Cua project follows strict code formatting standards to ensure consistency across all packages.

### Python Code Formatting

#### Tools

The project uses the following tools for code formatting and linting:

- **[Black](https://black.readthedocs.io/)**: Code formatter
- **[isort](https://pycqa.github.io/isort/)**: Import sorter
- **[Ruff](https://beta.ruff.rs/docs/)**: Fast linter and formatter
- **[MyPy](https://mypy.readthedocs.io/)**: Static type checker

These tools are automatically installed when you set up the development environment.

#### Configuration

The formatting configuration is defined in the root `pyproject.toml` file:

```toml
[tool.black]
line-length = 100
target-version = ["py311"]

[tool.ruff]
fix = true
line-length = 100
target-version = "py311"

[tool.ruff.lint]
select = ["E", "F", "B", "I"]
ignore = [
    "E501", "E402", "I001", "I002", "B007", "B023", "B024", "B027", "B028",
    "B904", "B905", "E711", "E712", "E722", "E731", "F401", "F403", "F405",
    "F811", "F821", "F841"
]
fix = true

[tool.ruff.format]
docstring-code-format = true

[tool.mypy]
check_untyped_defs = true
disallow_untyped_defs = true
ignore_missing_imports = true
python_version = "3.11"
show_error_codes = true
strict = true
warn_return_any = true
warn_unused_ignores = false

[tool.isort]
profile = "black"
```

#### Key Formatting Rules

- **Line Length**: Maximum of 100 characters
- **Python Version**: Code should be compatible with Python 3.11+
- **Imports**: Automatically sorted (using Ruff's "I" rule)
- **Type Hints**: Required for all function definitions (strict mypy mode)
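
As a minimal illustration (this helper is hypothetical, not taken from the codebase), a function that satisfies these rules looks like:

```python
from pathlib import Path


def count_lines(path: Path, encoding: str = "utf-8") -> int:
    """Return the number of lines in a text file."""
    with path.open(encoding=encoding) as handle:
        return sum(1 for _ in handle)
```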

#### IDE Integration

The repository includes VSCode workspace configurations that enable automatic formatting. When you open the workspace files (as recommended in the setup instructions), the correct formatting settings are automatically applied.

##### Python-specific settings

These are configured in `.vscode/settings.json`:

```json
{
  "python.defaultInterpreterPath": "${workspaceFolder}/.venv/bin/python",
  "editor.formatOnSave": true,
  "editor.codeActionsOnSave": {
    "source.organizeImports": "explicit",
    "source.fixAll": "explicit"
  },
  "[python]": {
    "editor.defaultFormatter": "ms-python.black-formatter"
  },
  "python.formatting.provider": "black",
  "ruff.configuration": "${workspaceFolder}/pyproject.toml",
  "mypy-type-checker.args": ["--config-file", "${workspaceFolder}/pyproject.toml"],
  "mypy-type-checker.path": ["${workspaceFolder}"]
}
```

##### JS/TS-specific settings

```json
"[javascript][typescript][typescriptreact][javascriptreact]": {
  "editor.defaultFormatter": "esbenp.prettier-vscode"
}
```

- Ensures Prettier is used for all JS/TS files for consistent formatting.

##### Recommended VS Code Extensions

- **Black Formatter** – `ms-python.black-formatter`
- **Ruff** – `charliermarsh.ruff`
- **Pylance** – `ms-python.vscode-pylance`
- **isort** – `ms-python.isort`
- **Prettier** – `esbenp.prettier-vscode`
- **Mypy Type Checker** – `ms-python.mypy-type-checker`

> VSCode will automatically suggest installing the recommended extensions when you open the workspace.

#### Manual Formatting

To manually format code:

```bash
# Format all Python files using Black
uv run black .

# Sort imports using isort
uv run isort .

# Run Ruff linter with auto-fix
uv run ruff check .

# Run type checking with MyPy
uv run mypy .
```

#### Pre-commit Validation

Before submitting a pull request, ensure your code passes all formatting checks:

**Option 1: Run all hooks via pre-commit (all in a single command)**

```bash
# Run hooks on staged files (recommended for quick checks)
uv run pre-commit run
```

- Automatically runs Black, Ruff, isort, Mypy, Prettier, and any other configured hooks.

**Option 2: Run individual tools manually**

```bash
# Python checks
uv run black --check .
uv run isort --check .
uv run ruff check .
uv run mypy .

# JavaScript/TypeScript checks
uv run prettier --check "**/*.{ts,tsx,js,jsx,json,md,yaml,yml}"

# TypeScript typecheck
node ./scripts/typescript-typecheck.js
```

### JavaScript / TypeScript Formatting (Prettier)

The project uses **Prettier** to ensure consistent formatting across all JS/TS/JSON/Markdown/YAML files.

#### Installation

All Node.js dependencies are managed via `pnpm`. Make sure you have run:

```bash
# Install pnpm if you don't have it
npm install -g pnpm

# Install project dependencies
pnpm install
```

This installs Prettier and other JS/TS dependencies defined in `package.json`.

#### Usage

- **Check formatting** (without making changes):

```bash
pnpm prettier:check
```

- **Automatically format files**:

```bash
pnpm prettier:format
```

#### Type Checking (TypeScript)

- Run the TypeScript type checker:

```bash
node ./scripts/typescript-typecheck.js
```

#### VSCode Integration

- The workspace config ensures Prettier is used automatically for JS/TS/JSON/Markdown/YAML files.
- Recommended extension: Prettier – Code Formatter
- Ensure `editor.formatOnSave` is enabled in VSCode for automatic formatting.

### Swift Code (Lume)

For Swift code in the `libs/lume` directory:

- Follow the [Swift API Design Guidelines](https://www.swift.org/documentation/api-design-guidelines/)
- Use SwiftFormat for consistent formatting
- Code will be automatically formatted on save when using the lume workspace

## Releasing Packages

Cua uses an automated GitHub Actions workflow to bump package versions.

> **Note:** The main branch is currently not protected. If branch protection is enabled in the future, the github-actions bot must be added to the bypass list for these workflows to commit directly.

### Version Bump Workflow

All packages are managed through a single consolidated workflow: [Bump Version](https://github.com/trycua/cua/actions/workflows/bump-version.yml)

**Supported packages:**

- cua-agent
- cua-computer
- cua-computer-server
- cua-core
- cua-mcp-server
- cua-som
- pylume

**How to use:**

1. Navigate to the [Bump Version workflow](https://github.com/trycua/cua/actions/workflows/bump-version.yml)
2. Click the "Run workflow" button in the GitHub UI
3. Select the **service/package** you want to bump from the first dropdown
4. Select the **bump type** (patch/minor/major) from the second dropdown
5. Click "Run workflow" to start the version bump
6. The workflow will automatically commit changes and push to main
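
If you prefer to trigger the bump programmatically rather than through the UI, a minimal sketch using GitHub's workflow-dispatch REST endpoint is shown below. The input names `service` and `bump_type` are assumptions; check the `workflow_dispatch` inputs declared in `.github/workflows/bump-version.yml` for the actual names.

```python
import os

import requests

# Hypothetical input names ("service", "bump_type"); verify them against the
# workflow_dispatch section of .github/workflows/bump-version.yml first.
resp = requests.post(
    "https://api.github.com/repos/trycua/cua/actions/workflows/bump-version.yml/dispatches",
    headers={
        "Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}",
        "Accept": "application/vnd.github+json",
    },
    json={"ref": "main", "inputs": {"service": "cua-core", "bump_type": "patch"}},
    timeout=30,
)
resp.raise_for_status()  # GitHub returns 204 No Content on success
```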

## Releasing a New CLI Version

To release a new version of the CUA CLI, follow these steps:

### 1. Update the Version

1. Update the version in `libs/typescript/cua-cli/package.json`
2. Commit the version change with a message like "Bump version to x.y.z"
3. Push the changes to the main branch

### 2. Trigger the Release Workflow

1. Go to the GitHub Actions tab in the repository
2. Select the "Publish @trycua/cli" workflow
3. Click "Run workflow"
4. Optionally, specify a version (e.g., "1.2.3") or leave empty to use the version from package.json
5. Click "Run workflow"

The workflow will:

- Build single-file executables for all supported platforms
- Publish the package to npm
- Create a GitHub release with the version tag (format: `cua-vX.Y.Z`)
- Attach all platform-specific binaries to the release

### 3. Verify the Release

1. Check the GitHub Releases page to ensure the new version is published
2. Verify the npm package was published to the registry
3. Test installation on different platforms:

   ```bash
   # Test Linux/macOS installation
   curl -fsSL https://cua.ai/install.sh | sh

   # Test Windows installation (PowerShell)
   irm https://cua.ai/install.ps1 | iex
   ```

### 4. Update Documentation

Update any relevant documentation with the new version number, including:

- Example code in documentation
- Any version-specific instructions
- Compatibility matrices

### 5. Announce the Release

- Create a new GitHub release with release notes
- Update the changelog if maintained separately
- Announce in relevant channels (Slack, Discord, etc.)

---

### Rolling Back a Version Bump

If you need to revert a version bump, follow these steps:

**Step 1: Find the version bump commit**

```bash
# List recent commits
git log --oneline | grep "Bump"

# Example output:
# a1b2c3d Bump cua-core to v0.1.9
```

**Step 2: Revert the commit**

```bash
# Revert the specific commit
git revert <commit-hash>

# Example:
# git revert a1b2c3d
```

**Step 3: Delete the git tag**

```bash
# List tags to find the version tag
git tag -l

# Delete the tag locally (use the correct package-specific format)
git tag -d core-v0.1.9

# Delete the tag remotely
git push origin :refs/tags/core-v0.1.9
```

**Step 4: Push the revert**

```bash
git push origin main
```

**Per-package tag patterns:**

Each package uses its own tag format defined in `.bumpversion.cfg`:

- **cua-core**: `core-v{version}` (e.g., `core-v0.1.9`)
- **cua-computer**: `computer-v{version}` (e.g., `computer-v0.4.7`)
- **cua-agent**: `agent-v{version}` (e.g., `agent-v0.4.35`)
- **cua-som**: `som-v{version}` (e.g., `som-v0.1.3`)
- **pylume**: `pylume-v{version}` (e.g., `pylume-v0.2.1`)
- **cua-computer-server**: `computer-server-v{version}` (e.g., `computer-server-v0.1.27`)
- **cua-mcp-server**: `mcp-server-v{version}` (e.g., `mcp-server-v0.1.14`)

### Local Testing (Advanced)

The Makefile targets are kept for local testing only:

```bash
# Test version bump locally (dry run)
make dry-run-patch-core

# View current versions
make show-versions
```

**Note:** For production releases, always use the GitHub Actions workflows above instead of running Makefile commands directly.

```

--------------------------------------------------------------------------------
/blog/bringing-computer-use-to-the-web.md:
--------------------------------------------------------------------------------

```markdown
# Bringing Computer-Use to the Web

_Published on August 5, 2025 by Morgan Dean_

In one of our original posts, we explored building Computer-Use Operators on macOS - first with a [manual implementation](build-your-own-operator-on-macos-1.md) using OpenAI's `computer-use-preview` model, then with our [cua-agent framework](build-your-own-operator-on-macos-2.md) for Python developers. While these tutorials have been incredibly popular, we've received consistent feedback from our community: **"Can we use Cua with JavaScript and TypeScript?"**

Today, we're excited to announce the release of the **`@trycua/computer` Web SDK** - a new library that allows you to control your Cua cloud containers from any JavaScript or TypeScript project. With this library, you can click, type, and grab screenshots from your cloud containers - no extra servers required.

With this new SDK, you can easily develop CUA experiences like the one below, which we will release soon as open source.

<div align="center">
  <video src="https://github.com/user-attachments/assets/e213d6c3-73b6-48dd-a7d9-ed761ed74f89" width="600" controls></video>
</div>

Let’s see how it works.

## What You'll Learn

By the end of this tutorial, you'll be able to:

- Set up the `@trycua/computer` npm library in any JavaScript/TypeScript project
- Connect OpenAI's computer-use model to Cua cloud containers from web applications
- Build computer-use agents that work in Node.js, React, Vue, or any web framework
- Handle different types of computer actions (clicking, typing, scrolling) from web code
- Implement the complete computer-use loop in JavaScript/TypeScript
- Integrate AI automation into existing web applications and workflows

**Prerequisites:**

- Node.js 16+ and npm/yarn/pnpm
- Basic JavaScript or TypeScript knowledge
- OpenAI API access (Tier 3+ for computer-use-preview)
- Cua cloud container credits ([get started here](https://cua.ai/pricing))

**Estimated Time:** 45-60 minutes

## Access Requirements

### OpenAI Model Availability

At the time of writing, the **computer-use-preview** model has limited availability:

- Only accessible to OpenAI tier 3+ users
- Additional application process may be required even for eligible users
- Cannot be used in the OpenAI Playground
- Outside of ChatGPT Operator, usage is restricted to the new **Responses API**

Luckily, the `@trycua/computer` library can be used in conjunction with other models, like [Anthropic’s Computer Use](https://docs.anthropic.com/en/docs/agents-and-tools/tool-use/computer-use-tool) or [UI-TARS](https://huggingface.co/ByteDance-Seed/UI-TARS-1.5-7B). You’ll just have to write your own handler to parse the model output for interfacing with the container.

### Cua Cloud Containers

To follow this guide, you’ll need access to a Cua cloud container.

Getting access is simple: purchase credits from our [pricing page](https://cua.ai/pricing), then create and provision a new container instance from the [dashboard](https://cua.ai/dashboard/containers). With your container running, you'll be ready to leverage the web SDK and bring automation to your JavaScript or TypeScript applications.

## Understanding the Flow

### OpenAI API Overview

Let's start with the basics. In our case, we'll use OpenAI's API to communicate with their computer-use model.

Think of it like this:

1. We send the model a screenshot of our container and tell it what we want it to do
2. The model looks at the screenshot and decides what actions to take
3. It sends back instructions (like "click here" or "type this")
4. We execute those instructions in our container.

### Model Setup

Here's how we set up the computer-use model for web development:

```javascript
const res = await openai.responses.create({
  model: 'computer-use-preview',
  tools: [
    {
      type: 'computer_use_preview',
      display_width: 1024,
      display_height: 768,
      environment: 'linux', // we're using a linux container
    },
  ],
  input: [
    {
      role: 'user',
      content: [
        // what we want the ai to do
        { type: 'input_text', text: 'Open firefox and go to cua.ai' },
        // first screenshot of the vm
        {
          type: 'input_image',
          image_url: `data:image/png;base64,${screenshotBase64}`,
          detail: 'auto',
        },
      ],
    },
  ],
  truncation: 'auto',
});
```

### Understanding the Response

When we send a request, the API sends back a response that looks like this:

```json
"output": [
    {
        "type": "reasoning",           // The AI explains what it's thinking
        "id": "rs_67cc...",
        "summary": [
            {
                "type": "summary_text",
                "text": "Clicking on the browser address bar."
            }
        ]
    },
    {
        "type": "computer_call",       // The actual action to perform
        "id": "cu_67cc...",
        "call_id": "call_zw3...",      // Used to track previous calls
        "action": {
            "type": "click",           // What kind of action (click, type, etc.)
            "button": "left",          // Which mouse button to use
            "x": 156,                  // Where to click (coordinates)
            "y": 50
        },
        "pending_safety_checks": [],   // Any safety warnings to consider
        "status": "completed"          // Whether the action was successful
    }
]

```

Each response contains:

1. **Reasoning**: The AI's explanation of what it's doing
2. **Action**: The specific computer action to perform
3. **Safety Checks**: Any potential risks to review
4. **Status**: Whether everything worked as planned

## Implementation Guide

### Provision a Cua Cloud Container

1. Visit [cua.ai](https://cua.ai), sign up, purchase [credits](https://cua.ai/pricing), and create a new container instance from the [dashboard](https://cua.ai/dashboard).
2. Create an API key from the dashboard — be sure to save it in a secure location before continuing.
3. Start the cloud container from the dashboard.

### Environment Setup

1. Install required packages with your preferred package manager:

   ```bash
   npm install --save @trycua/computer # or yarn, pnpm, bun
   npm install --save openai # or yarn, pnpm, bun
   ```

   Works with any JavaScript/TypeScript project setup - whether you're using Create React App, Next.js, Vue, Angular, or plain JavaScript.

2. Save your OpenAI API key, Cua API key, and container name to a `.env` file:

   ```bash
   OPENAI_API_KEY=openai-api-key
   CUA_API_KEY=cua-api-key
   CUA_CONTAINER_NAME=cua-cloud-container-name
   ```

   These environment variables work the same whether you're using vanilla JavaScript, TypeScript, or any web framework.

## Building the Agent

### Mapping API Actions to `@trycua/computer` Interface Methods

This helper function handles a `computer_call` action from the OpenAI API — converting the action into an equivalent action from the `@trycua/computer` interface. These actions will execute on the initialized `Computer` instance. For example, `await computer.interface.leftClick()` sends a mouse left click to the current cursor position.

Whether you're using JavaScript or TypeScript, the interface remains the same:

```typescript
export async function executeAction(
  computer: Computer,
  action: OpenAI.Responses.ResponseComputerToolCall['action']
) {
  switch (action.type) {
    case 'click':
      const { x, y, button } = action;
      console.log(`Executing click at (${x}, ${y}) with button '${button}'.`);
      await computer.interface.moveCursor(x, y);
      if (button === 'right') await computer.interface.rightClick();
      else await computer.interface.leftClick();
      break;
    case 'type':
      const { text } = action;
      console.log(`Typing text: ${text}`);
      await computer.interface.typeText(text);
      break;
    case 'scroll':
      const { x: locX, y: locY, scroll_x, scroll_y } = action;
      console.log(
        `Scrolling at (${locX}, ${locY}) with offsets (scroll_x=${scroll_x}, scroll_y=${scroll_y}).`
      );
      await computer.interface.moveCursor(locX, locY);
      await computer.interface.scroll(scroll_x, scroll_y);
      break;
    case 'keypress':
      const { keys } = action;
      for (const key of keys) {
        console.log(`Pressing key: ${key}.`);
        // Map common key names to CUA equivalents
        if (key.toLowerCase() === 'enter') {
          await computer.interface.pressKey('return');
        } else if (key.toLowerCase() === 'space') {
          await computer.interface.pressKey('space');
        } else {
          await computer.interface.pressKey(key);
        }
      }
      break;
    case 'wait':
      console.log(`Waiting for 3 seconds.`);
      await new Promise((resolve) => setTimeout(resolve, 3 * 1000));
      break;
    case 'screenshot':
      console.log('Taking screenshot.');
      // This is handled automatically in the main loop, but we can take an extra one if requested
      const screenshot = await computer.interface.screenshot();
      return screenshot;
    default:
      console.log(`Unrecognized action: ${action.type}`);
      break;
  }
}
```

### Implementing the Computer-Use Loop

This section defines a loop that:

1. Initializes the `Computer` instance (connecting to a Linux cloud container).
2. Captures a screenshot of the current state.
3. Sends the screenshot (with a user prompt) to the OpenAI Responses API using the `computer-use-preview` model.
4. Processes the returned `computer_call` action and executes it using our helper function.
5. Captures an updated screenshot after the action.
6. Sends the updated screenshot and loops until no more actions are returned.

```typescript
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

// Initialize the Computer Connection
const computer = new Computer({
  apiKey: process.env.CUA_API_KEY!,
  name: process.env.CUA_CONTAINER_NAME!,
  osType: OSType.LINUX,
});

await computer.run();
// Take the initial screenshot
const screenshot = await computer.interface.screenshot();
const screenshotBase64 = screenshot.toString('base64');

// Setup openai config for computer use
const computerUseConfig: OpenAI.Responses.ResponseCreateParamsNonStreaming = {
  model: 'computer-use-preview',
  tools: [
    {
      type: 'computer_use_preview',
      display_width: 1024,
      display_height: 768,
      environment: 'linux', // we're using a linux vm
    },
  ],
  truncation: 'auto',
};

// Send initial screenshot to the openai computer use model
let res = await openai.responses.create({
  ...computerUseConfig,
  input: [
    {
      role: 'user',
      content: [
        // what we want the ai to do
        { type: 'input_text', text: 'open firefox and go to cua.ai' },
        // current screenshot of the vm
        {
          type: 'input_image',
          image_url: `data:image/png;base64,${screenshotBase64}`,
          detail: 'auto',
        },
      ],
    },
  ],
});

// Loop until there are no more computer use actions.
while (true) {
  const computerCalls = res.output.filter((o) => o.type === 'computer_call');
  if (computerCalls.length < 1) {
    console.log('No more computer calls. Loop complete.');
    break;
  }
  // Get the first call
  const call = computerCalls[0];
  const action = call.action;
  console.log('Received action from OpenAI Responses API:', action);
  let ackChecks: OpenAI.Responses.ResponseComputerToolCall.PendingSafetyCheck[] =
    [];
  if (call.pending_safety_checks.length > 0) {
    console.log('Safety checks pending:', call.pending_safety_checks);
    // In a real implementation, you would want to get user confirmation here.
    ackChecks = call.pending_safety_checks;
  }

  // Execute the action in the container
  await executeAction(computer, action);
  // Wait for changes to process within the container (1sec)
  await new Promise((resolve) => setTimeout(resolve, 1000));

  // Capture new screenshot
  const newScreenshot = await computer.interface.screenshot();
  const newScreenshotBase64 = newScreenshot.toString('base64');

  // Send the screenshot back as a computer_call_output
  res = await openai.responses.create({
    ...computerUseConfig,
    previous_response_id: res.id,
    input: [
      {
        type: 'computer_call_output',
        call_id: call.call_id,
        acknowledged_safety_checks: ackChecks,
        output: {
          type: 'computer_screenshot',
          image_url: `data:image/png;base64,${newScreenshotBase64}`,
        },
      },
    ],
  });
}
```

You can find the full example on [GitHub](https://github.com/trycua/cua/tree/main/examples/computer-example-ts).

## What's Next?

The `@trycua/computer` Web SDK opens up some interesting possibilities. You could build browser-based testing tools, create interactive demos for your products, or automate repetitive workflows directly from your web apps.

We're working on more examples and better documentation - if you build something cool with this SDK, we'd love to see it. Drop by our [Discord](https://discord.gg/cua-ai) and share what you're working on.

Happy automating on the web!

```

--------------------------------------------------------------------------------
/tests/test_mcp_server_session_management.py:
--------------------------------------------------------------------------------

```python
"""
Tests for MCP Server Session Management functionality.

This module tests the new concurrent session management and resource lifecycle features.
"""

import asyncio
import importlib.util
import sys
import time
import types
from pathlib import Path

import pytest


def _install_stub_module(
    name: str, module: types.ModuleType, registry: dict[str, types.ModuleType | None]
) -> None:
    registry[name] = sys.modules.get(name)
    sys.modules[name] = module


@pytest.fixture
def server_module():
    """Create a server module with stubbed dependencies for testing."""
    stubbed_modules: dict[str, types.ModuleType | None] = {}

    # Stub MCP Context primitives
    mcp_module = types.ModuleType("mcp")
    mcp_module.__path__ = []  # mark as package

    mcp_server_module = types.ModuleType("mcp.server")
    mcp_server_module.__path__ = []

    fastmcp_module = types.ModuleType("mcp.server.fastmcp")

    class _StubContext:
        async def yield_message(self, *args, **kwargs):
            return None

        async def yield_tool_call(self, *args, **kwargs):
            return None

        async def yield_tool_output(self, *args, **kwargs):
            return None

        def report_progress(self, *_args, **_kwargs):
            return None

        def info(self, *_args, **_kwargs):
            return None

        def error(self, *_args, **_kwargs):
            return None

    class _StubImage:
        def __init__(self, format: str, data: bytes):
            self.format = format
            self.data = data

    class _StubFastMCP:
        def __init__(self, name: str):
            self.name = name
            self._tools: dict[str, types.FunctionType] = {}

        def tool(self, *args, **kwargs):
            def decorator(func):
                self._tools[func.__name__] = func
                return func

            return decorator

        def run(self):
            return None

    fastmcp_module.Context = _StubContext
    fastmcp_module.FastMCP = _StubFastMCP
    fastmcp_module.Image = _StubImage

    _install_stub_module("mcp", mcp_module, stubbed_modules)
    _install_stub_module("mcp.server", mcp_server_module, stubbed_modules)
    _install_stub_module("mcp.server.fastmcp", fastmcp_module, stubbed_modules)

    # Stub Computer module
    computer_module = types.ModuleType("computer")

    class _StubInterface:
        async def screenshot(self) -> bytes:
            return b"test-screenshot-data"

    class _StubComputer:
        def __init__(self, *args, **kwargs):
            self.interface = _StubInterface()

        async def run(self):
            return None

    computer_module.Computer = _StubComputer

    _install_stub_module("computer", computer_module, stubbed_modules)

    # Stub agent module
    agent_module = types.ModuleType("agent")

    class _StubComputerAgent:
        def __init__(self, *args, **kwargs):
            pass

        async def run(self, *_args, **_kwargs):
            # Simulate agent execution with streaming
            yield {
                "output": [
                    {
                        "type": "message",
                        "role": "assistant",
                        "content": [{"type": "output_text", "text": "Task completed"}],
                    }
                ]
            }

    agent_module.ComputerAgent = _StubComputerAgent

    _install_stub_module("agent", agent_module, stubbed_modules)

    # Stub session manager module
    session_manager_module = types.ModuleType("mcp_server.session_manager")

    class _StubSessionInfo:
        def __init__(self, session_id: str, computer, created_at: float, last_activity: float):
            self.session_id = session_id
            self.computer = computer
            self.created_at = created_at
            self.last_activity = last_activity
            self.active_tasks = set()
            self.is_shutting_down = False

    class _StubSessionManager:
        def __init__(self):
            self._sessions = {}
            self._session_lock = asyncio.Lock()

        async def get_session(self, session_id=None):
            """Context manager that returns a session."""
            if session_id is None:
                session_id = "test-session-123"

            async with self._session_lock:
                if session_id not in self._sessions:
                    computer = _StubComputer()
                    session = _StubSessionInfo(
                        session_id=session_id,
                        computer=computer,
                        created_at=time.time(),
                        last_activity=time.time(),
                    )
                    self._sessions[session_id] = session

                return self._sessions[session_id]

        async def register_task(self, session_id: str, task_id: str):
            pass

        async def unregister_task(self, session_id: str, task_id: str):
            pass

        async def cleanup_session(self, session_id: str):
            async with self._session_lock:
                self._sessions.pop(session_id, None)

        def get_session_stats(self):
            return {
                "total_sessions": len(self._sessions),
                "max_concurrent": 10,
                "sessions": {sid: {"active_tasks": 0} for sid in self._sessions},
            }

    _stub_session_manager = _StubSessionManager()

    def get_session_manager():
        return _stub_session_manager

    async def initialize_session_manager():
        return _stub_session_manager

    async def shutdown_session_manager():
        pass

    session_manager_module.get_session_manager = get_session_manager
    session_manager_module.initialize_session_manager = initialize_session_manager
    session_manager_module.shutdown_session_manager = shutdown_session_manager

    _install_stub_module("mcp_server.session_manager", session_manager_module, stubbed_modules)

    # Load the actual server module
    module_name = "mcp_server_server_under_test"
    module_path = Path("libs/python/mcp-server/mcp_server/server.py").resolve()
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    server_module = importlib.util.module_from_spec(spec)
    assert spec and spec.loader
    spec.loader.exec_module(server_module)

    server_instance = getattr(server_module, "server", None)
    if server_instance is not None and hasattr(server_instance, "_tools"):
        for name, func in server_instance._tools.items():
            setattr(server_module, name, func)

    try:
        yield server_module
    finally:
        sys.modules.pop(module_name, None)
        for name, original in stubbed_modules.items():
            if original is None:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = original


class FakeContext:
    """Fake context for testing."""

    def __init__(self) -> None:
        self.events: list[tuple] = []
        self.progress_updates: list[float] = []

    def info(self, message: str) -> None:
        self.events.append(("info", message))

    def error(self, message: str) -> None:
        self.events.append(("error", message))

    def report_progress(self, value: float) -> None:
        self.progress_updates.append(value)

    async def yield_message(self, *, role: str, content):
        timestamp = asyncio.get_running_loop().time()
        self.events.append(("message", role, content, timestamp))

    async def yield_tool_call(self, *, name: str | None, call_id: str, input):
        timestamp = asyncio.get_running_loop().time()
        self.events.append(("tool_call", name, call_id, input, timestamp))

    async def yield_tool_output(self, *, call_id: str, output, is_error: bool = False):
        timestamp = asyncio.get_running_loop().time()
        self.events.append(("tool_output", call_id, output, is_error, timestamp))


def test_screenshot_cua_with_session_id(server_module):
    """Test that screenshot_cua works with session management."""

    async def _run_test():
        ctx = FakeContext()
        result = await server_module.screenshot_cua(ctx, session_id="test-session")

        assert result.format == "png"
        assert result.data == b"test-screenshot-data"

    asyncio.run(_run_test())


def test_screenshot_cua_creates_new_session(server_module):
    """Test that screenshot_cua creates a new session when none provided."""

    async def _run_test():
        ctx = FakeContext()
        result = await server_module.screenshot_cua(ctx)

        assert result.format == "png"
        assert result.data == b"test-screenshot-data"

    asyncio.run(_run_test())


def test_run_cua_task_with_session_management(server_module):
    """Test that run_cua_task works with session management."""

    async def _run_test():
        ctx = FakeContext()
        task = "Test task"
        session_id = "test-session-456"

        text_result, image = await server_module.run_cua_task(ctx, task, session_id)

        assert "Task completed" in text_result
        assert image.format == "png"
        assert image.data == b"test-screenshot-data"

    asyncio.run(_run_test())


def test_run_multi_cua_tasks_sequential(server_module):
    """Test that run_multi_cua_tasks works sequentially."""

    async def _run_test():
        ctx = FakeContext()
        tasks = ["Task 1", "Task 2", "Task 3"]

        results = await server_module.run_multi_cua_tasks(ctx, tasks, concurrent=False)

        assert len(results) == 3
        for i, (text, image) in enumerate(results):
            assert "Task completed" in text
            assert image.format == "png"

    asyncio.run(_run_test())


def test_run_multi_cua_tasks_concurrent(server_module):
    """Test that run_multi_cua_tasks works concurrently."""

    async def _run_test():
        ctx = FakeContext()
        tasks = ["Task 1", "Task 2", "Task 3"]

        results = await server_module.run_multi_cua_tasks(ctx, tasks, concurrent=True)

        assert len(results) == 3
        for i, (text, image) in enumerate(results):
            assert "Task completed" in text
            assert image.format == "png"

    asyncio.run(_run_test())


def test_get_session_stats(server_module):
    """Test that get_session_stats returns proper statistics."""

    async def _run_test():
        ctx = FakeContext()
        stats = await server_module.get_session_stats()

        assert "total_sessions" in stats
        assert "max_concurrent" in stats
        assert "sessions" in stats

    asyncio.run(_run_test())


def test_cleanup_session(server_module):
    """Test that cleanup_session works properly."""

    async def _run_test():
        ctx = FakeContext()
        session_id = "test-cleanup-session"

        result = await server_module.cleanup_session(ctx, session_id)

        assert f"Session {session_id} cleanup initiated" in result

    asyncio.run(_run_test())


def test_concurrent_sessions_isolation(server_module):
    """Test that concurrent sessions are properly isolated."""

    async def _run_test():
        ctx = FakeContext()

        # Run multiple tasks with different session IDs concurrently
        task1 = asyncio.create_task(
            server_module.run_cua_task(ctx, "Task for session 1", "session-1")
        )
        task2 = asyncio.create_task(
            server_module.run_cua_task(ctx, "Task for session 2", "session-2")
        )

        results = await asyncio.gather(task1, task2)

        assert len(results) == 2
        for text, image in results:
            assert "Task completed" in text
            assert image.format == "png"

    asyncio.run(_run_test())


def test_session_reuse_with_same_id(server_module):
    """Test that sessions are reused when the same session ID is provided."""

    async def _run_test():
        ctx = FakeContext()
        session_id = "reuse-session"

        # First call
        result1 = await server_module.screenshot_cua(ctx, session_id)

        # Second call with same session ID
        result2 = await server_module.screenshot_cua(ctx, session_id)

        assert result1.format == result2.format
        assert result1.data == result2.data

    asyncio.run(_run_test())


def test_error_handling_with_session_management(server_module):
    """Test that errors are handled properly with session management."""

    async def _run_test():
        # Mock an agent that raises an exception
        class _FailingAgent:
            def __init__(self, *args, **kwargs):
                pass

            async def run(self, *_args, **_kwargs):
                raise RuntimeError("Simulated agent failure")

        # Replace the ComputerAgent with our failing one
        original_agent = server_module.ComputerAgent
        server_module.ComputerAgent = _FailingAgent

        try:
            ctx = FakeContext()
            task = "This will fail"

            text_result, image = await server_module.run_cua_task(ctx, task, "error-session")

            assert "Error during task execution" in text_result
            assert image.format == "png"

        finally:
            # Restore original agent
            server_module.ComputerAgent = original_agent

    asyncio.run(_run_test())

```

--------------------------------------------------------------------------------
/libs/python/computer/computer/providers/cloud/provider.py:
--------------------------------------------------------------------------------

```python
"""Cloud VM provider implementation using CUA Public API.

Implements the following public API endpoints:

- GET /v1/vms
- POST /v1/vms/:name/start
- POST /v1/vms/:name/stop
- POST /v1/vms/:name/restart
"""

import logging
from typing import Any, Dict, List, Optional

from ..base import BaseVMProvider, VMProviderType
from ..types import ListVMsResponse, MinimalVM

# Setup logging
logger = logging.getLogger(__name__)

import asyncio
import os
from urllib.parse import urlparse

import aiohttp

DEFAULT_API_BASE = os.getenv("CUA_API_BASE", "https://api.cua.ai")


class CloudProvider(BaseVMProvider):
    """Cloud VM Provider implementation."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        verbose: bool = False,
        api_base: Optional[str] = None,
        **kwargs,
    ):
        """
        Args:
            api_key: API key for authentication (defaults to CUA_API_KEY environment variable)
            name: Name of the VM
            verbose: Enable verbose logging
        """
        # Fall back to environment variable if api_key not provided
        if api_key is None:
            api_key = os.getenv("CUA_API_KEY")
        assert (
            api_key
        ), "api_key required for CloudProvider (provide via parameter or CUA_API_KEY environment variable)"
        self.api_key = api_key
        self.verbose = verbose
        self.api_base = (api_base or DEFAULT_API_BASE).rstrip("/")
        # Host caching dictionary: {vm_name: host_string}
        self._host_cache: Dict[str, str] = {}

    @property
    def provider_type(self) -> VMProviderType:
        return VMProviderType.CLOUD

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

    async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Get VM information by querying the VM status endpoint.

        - Build hostname via _get_host_for_vm(name) using cached host or fallback
        - Probe https://{hostname}:8443/status with a short timeout
        - If JSON contains a "status" field, return it; otherwise infer
        - Fallback to DNS resolve check to distinguish unknown vs not_found
        """
        hostname = await self._get_host_for_vm(name)

        # Try HTTPS probe to the computer-server status endpoint (8443)
        try:
            timeout = aiohttp.ClientTimeout(total=3)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                url = f"https://{hostname}:8443/status"
                async with session.get(url, allow_redirects=False) as resp:
                    status_code = resp.status
                    vm_status: str = "unknown"
                    vm_os_type: Optional[str] = None
                    if status_code == 200:
                        try:
                            data = await resp.json(content_type=None)
                            vm_status = str(data.get("status", "ok"))
                            os_type = data.get("os_type")
                            # Avoid turning a missing os_type into the literal string "None"
                            vm_os_type = str(os_type) if os_type is not None else None
                        except Exception:
                            vm_status = "unknown"
                    return {
                        "name": name,
                        "status": "running" if vm_status == "ok" else vm_status,
                        "api_url": f"https://{hostname}:8443",
                        "os_type": vm_os_type,
                    }
        except Exception:
            return {"name": name, "status": "not_found", "api_url": f"https://{hostname}:8443"}

    async def list_vms(self) -> ListVMsResponse:
        url = f"{self.api_base}/v1/vms"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Accept": "application/json",
        }
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as resp:
                if resp.status == 200:
                    try:
                        data = await resp.json(content_type=None)
                    except Exception:
                        text = await resp.text()
                        logger.error(f"Failed to parse list_vms JSON: {text}")
                        return []
                    if isinstance(data, list):
                        # Enrich with convenience URLs when possible.
                        enriched: List[Dict[str, Any]] = []
                        for item in data:
                            vm = dict(item) if isinstance(item, dict) else {}
                            name = vm.get("name")
                            password = vm.get("password")
                            api_host = vm.get("host")  # Read host from API response

                            if isinstance(name, str) and name:
                                # Use the host from the API if available, otherwise fall back to the legacy format
                                if isinstance(api_host, str) and api_host:
                                    host = api_host
                                else:
                                    host = f"{name}.containers.cloud.trycua.com"
                                # Cache the resolved host for this VM
                                self._host_cache[name] = host

                                # api_url: always set if missing
                                if not vm.get("api_url"):
                                    vm["api_url"] = f"https://{host}:8443"
                                # vnc_url: only when password available
                                if not vm.get("vnc_url") and isinstance(password, str) and password:
                                    vm["vnc_url"] = (
                                        f"https://{host}/vnc.html?autoconnect=true&password={password}"
                                    )
                            enriched.append(vm)
                        return enriched  # type: ignore[return-value]
                    logger.warning("Unexpected response for list_vms; expected list")
                    return []
                elif resp.status == 401:
                    logger.error("Unauthorized: invalid CUA API key for list_vms")
                    return []
                else:
                    text = await resp.text()
                    logger.error(f"list_vms failed: HTTP {resp.status} - {text}")
                    return []

    async def run_vm(
        self,
        name: str,
        image: Optional[str] = None,
        run_opts: Optional[Dict[str, Any]] = None,
        storage: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Start a VM via public API. Returns a minimal status."""
        url = f"{self.api_base}/v1/vms/{name}/start"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Accept": "application/json",
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=headers) as resp:
                if resp.status in (200, 201, 202, 204):
                    return {"name": name, "status": "starting"}
                elif resp.status == 404:
                    return {"name": name, "status": "not_found"}
                elif resp.status == 401:
                    return {"name": name, "status": "unauthorized"}
                else:
                    text = await resp.text()
                    return {"name": name, "status": "error", "message": text}

    async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Stop a VM via public API."""
        url = f"{self.api_base}/v1/vms/{name}/stop"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Accept": "application/json",
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=headers) as resp:
                if resp.status in (200, 202):
                    # Spec says 202 with {"status":"stopping"}
                    body_status: Optional[str] = None
                    try:
                        data = await resp.json(content_type=None)
                        body_status = data.get("status") if isinstance(data, dict) else None
                    except Exception:
                        body_status = None
                    return {"name": name, "status": body_status or "stopping"}
                elif resp.status == 404:
                    return {"name": name, "status": "not_found"}
                elif resp.status == 401:
                    return {"name": name, "status": "unauthorized"}
                else:
                    text = await resp.text()
                    return {"name": name, "status": "error", "message": text}

    async def restart_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        """Restart a VM via public API."""
        url = f"{self.api_base}/v1/vms/{name}/restart"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Accept": "application/json",
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=headers) as resp:
                if resp.status in (200, 202):
                    # Spec says 202 with {"status":"restarting"}
                    body_status: Optional[str] = None
                    try:
                        data = await resp.json(content_type=None)
                        body_status = data.get("status") if isinstance(data, dict) else None
                    except Exception:
                        body_status = None
                    return {"name": name, "status": body_status or "restarting"}
                elif resp.status == 404:
                    return {"name": name, "status": "not_found"}
                elif resp.status == 401:
                    return {"name": name, "status": "unauthorized"}
                else:
                    text = await resp.text()
                    return {"name": name, "status": "error", "message": text}

    async def update_vm(
        self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None
    ) -> Dict[str, Any]:
        logger.warning("CloudProvider.update_vm is not implemented via public API")
        return {
            "name": name,
            "status": "unchanged",
            "message": "update_vm not supported by public API",
        }

    async def _get_host_for_vm(self, name: str) -> str:
        """
        Get the host for a VM, trying multiple approaches:
        1. Check cache first
        2. Try to refresh cache by calling list_vms
        3. Try .sandbox.cua.ai format
        4. Fallback to legacy .containers.cloud.trycua.com format

        Args:
            name: VM name

        Returns:
            Host string for the VM
        """
        # Check cache first
        if name in self._host_cache:
            return self._host_cache[name]

        # Try to refresh cache by calling list_vms
        try:
            await self.list_vms()
            # Check cache again after refresh
            if name in self._host_cache:
                return self._host_cache[name]
        except Exception as e:
            logger.warning(f"Failed to refresh VM list for host lookup: {e}")

        # Try .sandbox.cua.ai format first
        sandbox_host = f"{name}.sandbox.cua.ai"
        if await self._test_host_connectivity(sandbox_host):
            self._host_cache[name] = sandbox_host
            return sandbox_host

        # Fallback to legacy format
        legacy_host = f"{name}.containers.cloud.trycua.com"
        # Cache the legacy host
        self._host_cache[name] = legacy_host
        return legacy_host

    async def _test_host_connectivity(self, hostname: str) -> bool:
        """
        Test if a host is reachable by trying to connect to its status endpoint.

        Args:
            hostname: Host to test

        Returns:
            True if host is reachable, False otherwise
        """
        try:
            timeout = aiohttp.ClientTimeout(total=2)  # Short timeout for connectivity test
            async with aiohttp.ClientSession(timeout=timeout) as session:
                url = f"https://{hostname}:8443/status"
                async with session.get(url, allow_redirects=False) as resp:
                    # Any response (even error) means the host is reachable
                    return True
        except Exception:
            return False

    async def get_ip(
        self, name: Optional[str] = None, storage: Optional[str] = None, retry_delay: int = 2
    ) -> str:
        """
        Return the VM's host address, trying to use cached host from API or falling back to legacy format.
        Uses the provided 'name' argument (the VM name requested by the caller).
        """
        if name is None:
            raise ValueError("VM name is required for CloudProvider.get_ip")

        return await self._get_host_for_vm(name)

```
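
A minimal usage sketch for the provider above, assuming `CUA_API_KEY` is set in the environment; the VM name `my-vm` and the import path are illustrative.

```python
# Usage sketch for CloudProvider (assumes CUA_API_KEY is set and "my-vm" exists).
import asyncio

from computer.providers.cloud.provider import CloudProvider


async def main():
    async with CloudProvider() as provider:
        vms = await provider.list_vms()
        print("Available VMs:", [vm.get("name") for vm in vms])

        # Start the VM, then probe its status endpoint.
        print(await provider.run_vm("my-vm"))
        print(await provider.get_vm("my-vm"))


asyncio.run(main())
```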

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/utils/wallpaper.py:
--------------------------------------------------------------------------------

```python
"""Set the desktop wallpaper."""

import os
import subprocess
import sys
from pathlib import Path


def get_desktop_environment() -> str:
    """
    Returns the name of the current desktop environment.
    """
    # From https://stackoverflow.com/a/21213358/2624876
    # which takes from:
    # http://stackoverflow.com/questions/2035657/what-is-my-current-desktop-environment
    # and http://ubuntuforums.org/showthread.php?t=652320
    # and http://ubuntuforums.org/showthread.php?t=1139057
    if sys.platform in ["win32", "cygwin"]:
        return "windows"
    elif sys.platform == "darwin":
        return "mac"
    else:  # Most likely a POSIX system or something less common
        desktop_session = os.environ.get("DESKTOP_SESSION")
        if (
            desktop_session is not None
        ):  # easier to match if we don't have to deal with character case
            desktop_session = desktop_session.lower()
            if desktop_session in [
                "gnome",
                "unity",
                "cinnamon",
                "mate",
                "xfce4",
                "lxde",
                "fluxbox",
                "blackbox",
                "openbox",
                "icewm",
                "jwm",
                "afterstep",
                "trinity",
                "kde",
            ]:
                return desktop_session
            ## Special cases ##
            # Canonical sets $DESKTOP_SESSION to Lubuntu rather than LXDE if using LXDE.
            # There is no guarantee that they will not do the same with the other desktop environments.
            elif "xfce" in desktop_session or desktop_session.startswith("xubuntu"):
                return "xfce4"
            elif desktop_session.startswith("ubuntustudio"):
                return "kde"
            elif desktop_session.startswith("ubuntu"):
                return "gnome"
            elif desktop_session.startswith("lubuntu"):
                return "lxde"
            elif desktop_session.startswith("kubuntu"):
                return "kde"
            elif desktop_session.startswith("razor"):  # e.g. razorkwin
                return "razor-qt"
            elif desktop_session.startswith("wmaker"):  # e.g. wmaker-common
                return "windowmaker"
        gnome_desktop_session_id = os.environ.get("GNOME_DESKTOP_SESSION_ID")
        if os.environ.get("KDE_FULL_SESSION") == "true":
            return "kde"
        elif gnome_desktop_session_id:
            if "deprecated" not in gnome_desktop_session_id:
                return "gnome2"
        # From http://ubuntuforums.org/showthread.php?t=652320
        elif is_running("xfce-mcs-manage"):
            return "xfce4"
        elif is_running("ksmserver"):
            return "kde"
    return "unknown"


def is_running(process: str) -> bool:
    """Returns whether a process with the given name is (likely) currently running.

    Uses a basic text search, and so may have false positives.
    """
    # From http://www.bloggerpolis.com/2011/05/how-to-check-if-a-process-is-running-using-python/
    # and http://richarddingwall.name/2009/06/18/windows-equivalents-of-ps-and-kill-commands/
    try:  # Linux/Unix
        s = subprocess.Popen(["ps", "axw"], stdout=subprocess.PIPE)
    except OSError:  # Windows (no "ps" command available)
        s = subprocess.Popen(["tasklist", "/v"], stdout=subprocess.PIPE)
    assert s.stdout is not None
    for x in s.stdout:
        # if re.search(process, x):
        if process in str(x):
            return True
    return False


def set_wallpaper(file_loc: str, first_run: bool = True) -> bool:
    """Sets the wallpaper to the given file location."""
    # From https://stackoverflow.com/a/21213504/2624876
    # I have not personally tested most of this. -- @1j01
    # -----------------------------------------

    # Note: There are two common Linux desktop environments where
    # I have not been able to set the desktop background from
    # command line: KDE, Enlightenment
    desktop_env = get_desktop_environment()
    if desktop_env in ["gnome", "unity", "cinnamon"]:
        # Tested on Ubuntu 22 -- @1j01
        uri = Path(file_loc).as_uri()
        SCHEMA = "org.gnome.desktop.background"
        KEY = "picture-uri"
        # Needed for Ubuntu 22 in dark mode
        # Might be better to set only one or the other, depending on the current theme
        # In the settings it will say "This background selection only applies to the dark style"
        # even if it's set for both, arguably referring to the selection that you can make on that page.
        # -- @1j01
        KEY_DARK = "picture-uri-dark"
        try:
            from gi.repository import Gio  # type: ignore

            gsettings = Gio.Settings.new(SCHEMA)  # type: ignore
            gsettings.set_string(KEY, uri)
            gsettings.set_string(KEY_DARK, uri)
        except Exception:
            # Fallback tested on Ubuntu 22 -- @1j01
            args = ["gsettings", "set", SCHEMA, KEY, uri]
            subprocess.Popen(args)
            args = ["gsettings", "set", SCHEMA, KEY_DARK, uri]
            subprocess.Popen(args)
    elif desktop_env == "mate":
        try:  # MATE >= 1.6
            # info from http://wiki.mate-desktop.org/docs:gsettings
            args = ["gsettings", "set", "org.mate.background", "picture-filename", file_loc]
            subprocess.Popen(args)
        except Exception:  # MATE < 1.6
            # From https://bugs.launchpad.net/variety/+bug/1033918
            args = [
                "mateconftool-2",
                "-t",
                "string",
                "--set",
                "/desktop/mate/background/picture_filename",
                file_loc,
            ]
            subprocess.Popen(args)
    elif desktop_env == "gnome2":  # Not tested
        # From https://bugs.launchpad.net/variety/+bug/1033918
        args = [
            "gconftool-2",
            "-t",
            "string",
            "--set",
            "/desktop/gnome/background/picture_filename",
            file_loc,
        ]
        subprocess.Popen(args)
    ## KDE4 is difficult
    ## see http://blog.zx2c4.com/699 for a solution that might work
    elif desktop_env in ["kde3", "trinity"]:
        # From http://ubuntuforums.org/archive/index.php/t-803417.html
        args = ["dcop", "kdesktop", "KBackgroundIface", "setWallpaper", "0", file_loc, "6"]
        subprocess.Popen(args)
    elif desktop_env == "xfce4":
        # Iterate over all wallpaper-related keys and set to file_loc
        try:
            list_proc = subprocess.run(
                ["xfconf-query", "-c", "xfce4-desktop", "-l"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
            )
            keys = []
            if list_proc.stdout:
                for line in list_proc.stdout.splitlines():
                    line = line.strip()
                    if not line:
                        continue
                    # Common keys: .../last-image and .../image-path
                    if "/last-image" in line or "/image-path" in line:
                        keys.append(line)
            # Fallback: known defaults if none were listed
            if not keys:
                keys = [
                    "/backdrop/screen0/monitorVNC-0/workspace0/last-image",
                    "/backdrop/screen0/monitor0/image-path",
                ]
            for key in keys:
                subprocess.run(
                    [
                        "xfconf-query",
                        "-c",
                        "xfce4-desktop",
                        "-p",
                        key,
                        "-s",
                        file_loc,
                    ],
                    check=False,
                )
        except Exception:
            pass
        # Reload xfdesktop to apply changes
        subprocess.Popen(["xfdesktop", "--reload"])
    elif desktop_env == "razor-qt":  # TODO: implement reload of desktop when possible
        if first_run:
            import configparser

            desktop_conf = configparser.ConfigParser()
            # Development version
            desktop_conf_file = os.path.join(get_config_dir("razor"), "desktop.conf")
            if os.path.isfile(desktop_conf_file):
                config_option = R"screens\1\desktops\1\wallpaper"
            else:
                desktop_conf_file = os.path.join(get_home_dir(), ".razor/desktop.conf")
                config_option = R"desktops\1\wallpaper"
            desktop_conf.read(desktop_conf_file)
            try:
                if desktop_conf.has_option("razor", config_option):  # only replacing a value
                    desktop_conf.set("razor", config_option, file_loc)
                    with open(desktop_conf_file, "w", encoding="utf-8", errors="replace") as f:
                        desktop_conf.write(f)
            except Exception:
                pass
        else:
            # TODO: reload desktop when possible
            pass
    elif desktop_env in ["fluxbox", "jwm", "openbox", "afterstep"]:
        # http://fluxbox-wiki.org/index.php/Howto_set_the_background
        # used fbsetbg on jwm too since I am too lazy to edit the XML configuration
        # now where fbsetbg does the job excellent anyway.
        # and I have not figured out how else it can be set on Openbox and AfterSTep
        # but fbsetbg works excellent here too.
        try:
            args = ["fbsetbg", file_loc]
            subprocess.Popen(args)
        except Exception:
            sys.stderr.write("ERROR: Failed to set wallpaper with fbsetbg!\n")
            sys.stderr.write("Please make sre that You have fbsetbg installed.\n")
    elif desktop_env == "icewm":
        # command found at http://urukrama.wordpress.com/2007/12/05/desktop-backgrounds-in-window-managers/
        args = ["icewmbg", file_loc]
        subprocess.Popen(args)
    elif desktop_env == "blackbox":
        # command found at http://blackboxwm.sourceforge.net/BlackboxDocumentation/BlackboxBackground
        args = ["bsetbg", "-full", file_loc]
        subprocess.Popen(args)
    elif desktop_env == "lxde":
        args = ["pcmanfm", "--set-wallpaper", file_loc, "--wallpaper-mode=scaled"]
        subprocess.Popen(args)
    elif desktop_env == "windowmaker":
        # From http://www.commandlinefu.com/commands/view/3857/set-wallpaper-on-windowmaker-in-one-line
        args = ["wmsetbg", "-s", "-u", file_loc]
        subprocess.Popen(args)
    # elif desktop_env == "enlightenment": # I have not been able to make it work on e17. On e16 it would have been something in this direction
    #     args = ["enlightenment_remote", "-desktop-bg-add", "0", "0", "0", "0", file_loc]
    #     subprocess.Popen(args)
    elif desktop_env == "windows":
        # From https://stackoverflow.com/questions/1977694/change-desktop-background
        # Tested on Windows 10. -- @1j01
        import ctypes

        SPI_SETDESKWALLPAPER = 20
        ctypes.windll.user32.SystemParametersInfoW(SPI_SETDESKWALLPAPER, 0, file_loc, 0)  # type: ignore
    elif desktop_env == "mac":
        # From https://stackoverflow.com/questions/431205/how-can-i-programatically-change-the-background-in-mac-os-x
        try:
            # Tested on macOS 10.14.6 (Mojave) -- @1j01
            assert (
                sys.platform == "darwin"
            )  # ignore `Import "appscript" could not be resolved` for other platforms
            from appscript import app, mactypes

            app("Finder").desktop_picture.set(mactypes.File(file_loc))
        except ImportError:
            # Tested on macOS 10.14.6 (Mojave) -- @1j01
            # import subprocess
            # SCRIPT = f"""/usr/bin/osascript<<END
            # tell application "Finder" to set desktop picture to POSIX file "{file_loc}"
            # END"""
            # subprocess.Popen(SCRIPT, shell=True)

            # Safer version, avoiding string interpolation,
            # to protect against command injection (both in the shell and in AppleScript):
            OSASCRIPT = """
            on run (clp)
                if clp's length is not 1 then error "Incorrect Parameters"
                local file_loc
                set file_loc to clp's item 1
                tell application "Finder" to set desktop picture to POSIX file file_loc
            end run
            """
            subprocess.Popen(["osascript", "-e", OSASCRIPT, "--", file_loc])
    else:
        if first_run:  # don't spam the user with the same message over and over again
            sys.stderr.write(
                "Warning: Failed to set wallpaper. Your desktop environment is not supported.\n"
            )
            sys.stderr.write(f"You can try setting your wallpaper manually to {file_loc}\n")
        return False
    return True


def get_config_dir(app_name: str) -> str:
    """Returns the configuration directory for the given application name."""
    if "XDG_CONFIG_HOME" in os.environ:
        config_home = os.environ["XDG_CONFIG_HOME"]
    elif "APPDATA" in os.environ:  # On Windows
        config_home = os.environ["APPDATA"]
    else:
        try:
            from xdg import BaseDirectory

            config_home = BaseDirectory.xdg_config_home
        except ImportError:  # Most likely a Linux/Unix system anyway
            config_home = os.path.join(get_home_dir(), ".config")
    config_dir = os.path.join(config_home, app_name)
    return config_dir


def get_home_dir() -> str:
    """Returns the home directory of the current user."""
    return os.path.expanduser("~")

```
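
A short usage sketch for the helpers above; the image path is illustrative and the import path assumes the module is importable as part of `computer_server`.

```python
# Usage sketch for the wallpaper helpers (the image path is illustrative).
from computer_server.utils.wallpaper import get_desktop_environment, set_wallpaper

print("Detected desktop environment:", get_desktop_environment())

# set_wallpaper returns False when the desktop environment is unsupported.
if not set_wallpaper("/path/to/wallpaper.png"):
    print("Could not set the wallpaper automatically.")
```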

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/watchdog.py:
--------------------------------------------------------------------------------

```python
"""
Watchdog module for monitoring the Computer API server health.
Unix/Linux only - provides process management and restart capabilities.
"""

import asyncio
import fcntl
import json
import logging
import os
import platform
import subprocess
import sys
import time
from typing import Optional

import websockets

logger = logging.getLogger(__name__)


def instance_already_running(label="watchdog"):
    """
    Detect if an instance with the label is already running, globally
    at the operating system level.

    Using `os.open` ensures that the file pointer won't be closed
    by Python's garbage collector after the function's scope is exited.

    The lock will be released when the program exits, or could be
    released if the file pointer were closed.
    """

    lock_file_pointer = os.open(f"/tmp/instance_{label}.lock", os.O_WRONLY | os.O_CREAT)

    try:
        fcntl.lockf(lock_file_pointer, fcntl.LOCK_EX | fcntl.LOCK_NB)
        already_running = False
    except IOError:
        already_running = True

    return already_running


class Watchdog:
    """Watchdog class to monitor server health via WebSocket connection.
    Unix/Linux only - provides restart capabilities.
    """

    def __init__(self, cli_args: Optional[dict] = None, ping_interval: int = 30):
        """
        Initialize the watchdog.

        Args:
            cli_args: Dictionary of CLI arguments to replicate when restarting
            ping_interval: Interval between ping checks in seconds
        """
        # Check if running on Unix/Linux
        if platform.system() not in ["Linux", "Darwin"]:
            raise RuntimeError("Watchdog is only supported on Unix/Linux systems")

        # Store CLI arguments for restart
        self.cli_args = cli_args or {}
        self.host = self.cli_args.get("host", "localhost")
        self.port = self.cli_args.get("port", 8000)
        self.ping_interval = ping_interval
        self.container_name = os.environ.get("CONTAINER_NAME")
        self.running = False
        self.restart_enabled = True

    @property
    def ws_uri(self) -> str:
        """Get the WebSocket URI using the current IP address.

        Returns:
            WebSocket URI for the Computer API Server
        """
        if not self.container_name:
            return "ws://localhost:8000/ws"

        # Try .sandbox.cua.ai first, fallback to .containers.cloud.trycua.com
        return f"wss://{self.container_name}.sandbox.cua.ai:8443/ws"

    @property
    def ws_uri_fallback(self) -> str:
        """Get the fallback WebSocket URI using legacy hostname.

        Returns:
            Fallback WebSocket URI for the Computer API Server
        """
        if not self.container_name:
            return "ws://localhost:8000/ws"

        return f"wss://{self.container_name}.containers.cloud.trycua.com:8443/ws"

    async def ping(self) -> bool:
        """
        Test connection to the WebSocket endpoint.

        Returns:
            True if connection successful, False otherwise
        """
        # Create a simple ping message
        ping_message = {"command": "get_screen_size", "params": {}}

        # Try primary URI first (.sandbox.cua.ai)
        try:
            async with websockets.connect(
                self.ws_uri, max_size=1024 * 1024 * 10  # 10MB limit to match server
            ) as websocket:
                # Send ping message
                await websocket.send(json.dumps(ping_message))

                # Wait for any response or just close
                try:
                    response = await asyncio.wait_for(websocket.recv(), timeout=5)
                    logger.debug(f"Ping response received from primary URI: {response[:100]}...")
                    return True
                except asyncio.TimeoutError:
                    return False
        except Exception as e:
            logger.debug(f"Primary URI ping failed: {e}")

            # Try fallback URI (.containers.cloud.trycua.com)
            if self.container_name:
                try:
                    async with websockets.connect(
                        self.ws_uri_fallback,
                        max_size=1024 * 1024 * 10,  # 10MB limit to match server
                    ) as websocket:
                        # Send ping message
                        await websocket.send(json.dumps(ping_message))

                        # Wait for any response or just close
                        try:
                            response = await asyncio.wait_for(websocket.recv(), timeout=5)
                            logger.debug(
                                f"Ping response received from fallback URI: {response[:100]}..."
                            )
                            return True
                        except asyncio.TimeoutError:
                            return False
                except Exception as fallback_e:
                    logger.warning(
                        f"Both primary and fallback ping failed. Primary: {e}, Fallback: {fallback_e}"
                    )
                    return False
            else:
                logger.warning(f"Ping failed: {e}")
                return False

    def kill_processes_on_port(self, port: int) -> bool:
        """
        Kill any processes using the specified port.

        Args:
            port: Port number to check and kill processes on

        Returns:
            True if processes were killed or none found, False on error
        """
        try:
            # Find processes using the port
            result = subprocess.run(
                ["lsof", "-ti", f":{port}"], capture_output=True, text=True, timeout=10
            )

            if result.returncode == 0 and result.stdout.strip():
                pids = result.stdout.strip().split("\n")
                logger.info(f"Found {len(pids)} processes using port {port}: {pids}")

                # Kill each process
                for pid in pids:
                    if pid.strip():
                        try:
                            subprocess.run(["kill", "-9", pid.strip()], timeout=5)
                            logger.info(f"Killed process {pid}")
                        except subprocess.TimeoutExpired:
                            logger.warning(f"Timeout killing process {pid}")
                        except Exception as e:
                            logger.warning(f"Error killing process {pid}: {e}")

                return True
            else:
                logger.debug(f"No processes found using port {port}")
                return True

        except subprocess.TimeoutExpired:
            logger.error(f"Timeout finding processes on port {port}")
            return False
        except Exception as e:
            logger.error(f"Error finding processes on port {port}: {e}")
            return False

    def restart_server(self) -> bool:
        """
        Attempt to restart the server by killing existing processes and starting new one.

        Returns:
            True if restart was attempted, False on error
        """
        if not self.restart_enabled:
            logger.info("Server restart is disabled")
            return False

        try:
            logger.info("Attempting to restart server...")

            # Kill processes on the port
            port_to_kill = 8443 if self.container_name else self.port
            if not self.kill_processes_on_port(port_to_kill):
                logger.error("Failed to kill processes on port, restart aborted")
                return False

            # Wait a moment for processes to die
            time.sleep(2)

            # Try to restart the server
            # In container mode, we can't easily restart, so just log
            if self.container_name:
                logger.warning("Container mode detected - cannot restart server automatically")
                logger.warning("Container orchestrator should handle restart")
                return False
            else:
                # For local mode, try to restart the CLI
                logger.info("Attempting to restart local server...")

                # Get the current Python executable and script
                python_exe = sys.executable

                # Try to find the CLI module
                try:
                    # Build command with all original CLI arguments
                    cmd = [python_exe, "-m", "computer_server.cli"]

                    # Add all CLI arguments except watchdog-related ones
                    for key, value in self.cli_args.items():
                        if key in ["watchdog", "watchdog_interval", "no_restart"]:
                            continue  # Skip watchdog args to avoid recursive watchdog

                        # Convert underscores to hyphens for CLI args
                        arg_name = f"--{key.replace('_', '-')}"

                        if isinstance(value, bool):
                            if value:  # Only add flag if True
                                cmd.append(arg_name)
                        else:
                            cmd.extend([arg_name, str(value)])

                    logger.info(f"Starting server with command: {' '.join(cmd)}")

                    # Start process in background
                    subprocess.Popen(
                        cmd,
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL,
                        start_new_session=True,
                    )

                    logger.info("Server restart initiated")
                    return True

                except Exception as e:
                    logger.error(f"Failed to restart server: {e}")
                    return False

        except Exception as e:
            logger.error(f"Error during server restart: {e}")
            return False

    async def start_monitoring(self) -> None:
        """Start the watchdog monitoring loop."""
        self.running = True
        logger.info(f"Starting watchdog monitoring for {self.ws_uri}")
        logger.info(f"Ping interval: {self.ping_interval} seconds")
        if self.container_name:
            logger.info(f"Container mode detected: {self.container_name}")

        consecutive_failures = 0
        max_failures = 3

        while self.running:
            try:
                success = await self.ping()

                if success:
                    if consecutive_failures > 0:
                        logger.info("Server connection restored")
                    consecutive_failures = 0
                    logger.debug("Ping successful")
                else:
                    consecutive_failures += 1
                    logger.warning(f"Ping failed ({consecutive_failures}/{max_failures})")

                    if consecutive_failures >= max_failures:
                        logger.error(
                            f"Server appears to be down after {max_failures} consecutive failures"
                        )

                        # Attempt to restart the server
                        if self.restart_enabled:
                            logger.info("Attempting automatic server restart...")
                            restart_success = self.restart_server()

                            if restart_success:
                                logger.info("Server restart initiated, waiting before next ping...")
                                # Wait longer after restart attempt
                                await asyncio.sleep(self.ping_interval * 2)
                                consecutive_failures = 0  # Reset counter after restart attempt
                            else:
                                logger.error("Server restart failed")
                        else:
                            logger.warning("Automatic restart is disabled")

                # Wait for next ping interval
                await asyncio.sleep(self.ping_interval)

            except asyncio.CancelledError:
                logger.info("Watchdog monitoring cancelled")
                break
            except Exception as e:
                logger.error(f"Unexpected error in watchdog loop: {e}")
                await asyncio.sleep(self.ping_interval)

    def stop_monitoring(self) -> None:
        """Stop the watchdog monitoring."""
        self.running = False
        logger.info("Stopping watchdog monitoring")


async def run_watchdog(cli_args: Optional[dict] = None, ping_interval: int = 30) -> None:
    """
    Run the watchdog monitoring.

    Args:
        cli_args: Dictionary of CLI arguments to replicate when restarting
        ping_interval: Interval between ping checks in seconds
    """
    watchdog = Watchdog(cli_args=cli_args, ping_interval=ping_interval)

    try:
        await watchdog.start_monitoring()
    except KeyboardInterrupt:
        logger.info("Watchdog stopped by user")
    finally:
        watchdog.stop_monitoring()


if __name__ == "__main__":
    # For testing the watchdog standalone
    import argparse

    parser = argparse.ArgumentParser(description="Run Computer API server watchdog")
    parser.add_argument("--host", default="localhost", help="Server host to monitor")
    parser.add_argument("--port", type=int, default=8000, help="Server port to monitor")
    parser.add_argument("--ping-interval", type=int, default=30, help="Ping interval in seconds")

    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    cli_args = {"host": args.host, "port": args.port}
    asyncio.run(run_watchdog(cli_args, args.ping_interval))

```
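
Besides the standalone `__main__` entrypoint, the watchdog can be embedded in an existing asyncio application. A hedged sketch, assuming a Unix-like host and the default local server flags:

```python
# Sketch: embedding the watchdog in an existing asyncio application
# (Unix/Linux only; cli_args mirror the flags the server was started with).
import asyncio

from computer_server.watchdog import run_watchdog


async def main():
    watchdog = asyncio.create_task(
        run_watchdog(cli_args={"host": "localhost", "port": 8000}, ping_interval=15)
    )
    try:
        await asyncio.sleep(60)  # ...application work would go here...
    finally:
        # start_monitoring() handles CancelledError, so cancelling is clean.
        watchdog.cancel()


asyncio.run(main())
```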