This is page 10 of 20. Use http://codebase.md/trycua/cua?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── scripts
│ │ ├── get_pyproject_version.py
│ │ └── tests
│ │ ├── __init__.py
│ │ ├── README.md
│ │ └── test_get_pyproject_version.py
│ └── workflows
│ ├── bump-version.yml
│ ├── ci-lume.yml
│ ├── docker-publish-cua-linux.yml
│ ├── docker-publish-cua-windows.yml
│ ├── docker-publish-kasm.yml
│ ├── docker-publish-xfce.yml
│ ├── docker-reusable-publish.yml
│ ├── link-check.yml
│ ├── lint.yml
│ ├── npm-publish-cli.yml
│ ├── npm-publish-computer.yml
│ ├── npm-publish-core.yml
│ ├── publish-lume.yml
│ ├── pypi-publish-agent.yml
│ ├── pypi-publish-computer-server.yml
│ ├── pypi-publish-computer.yml
│ ├── pypi-publish-core.yml
│ ├── pypi-publish-mcp-server.yml
│ ├── pypi-publish-som.yml
│ ├── pypi-reusable-publish.yml
│ ├── python-tests.yml
│ ├── test-cua-models.yml
│ └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│ ├── docs.code-workspace
│ ├── extensions.json
│ ├── launch.json
│ ├── libs-ts.code-workspace
│ ├── lume.code-workspace
│ ├── lumier.code-workspace
│ ├── py.code-workspace
│ └── settings.json
├── blog
│ ├── app-use.md
│ ├── assets
│ │ ├── composite-agents.png
│ │ ├── docker-ubuntu-support.png
│ │ ├── hack-booth.png
│ │ ├── hack-closing-ceremony.jpg
│ │ ├── hack-cua-ollama-hud.jpeg
│ │ ├── hack-leaderboard.png
│ │ ├── hack-the-north.png
│ │ ├── hack-winners.jpeg
│ │ ├── hack-workshop.jpeg
│ │ ├── hud-agent-evals.png
│ │ └── trajectory-viewer.jpeg
│ ├── bringing-computer-use-to-the-web.md
│ ├── build-your-own-operator-on-macos-1.md
│ ├── build-your-own-operator-on-macos-2.md
│ ├── cloud-windows-ga-macos-preview.md
│ ├── composite-agents.md
│ ├── computer-use-agents-for-growth-hacking.md
│ ├── cua-hackathon.md
│ ├── cua-playground-preview.md
│ ├── cua-vlm-router.md
│ ├── hack-the-north.md
│ ├── hud-agent-evals.md
│ ├── human-in-the-loop.md
│ ├── introducing-cua-cli.md
│ ├── introducing-cua-cloud-containers.md
│ ├── lume-to-containerization.md
│ ├── neurips-2025-cua-papers.md
│ ├── sandboxed-python-execution.md
│ ├── training-computer-use-models-trajectories-1.md
│ ├── trajectory-viewer.md
│ ├── ubuntu-docker-support.md
│ └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│ ├── .env.example
│ ├── .gitignore
│ ├── content
│ │ └── docs
│ │ ├── agent-sdk
│ │ │ ├── agent-loops.mdx
│ │ │ ├── benchmarks
│ │ │ │ ├── index.mdx
│ │ │ │ ├── interactive.mdx
│ │ │ │ ├── introduction.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── osworld-verified.mdx
│ │ │ │ ├── screenspot-pro.mdx
│ │ │ │ └── screenspot-v2.mdx
│ │ │ ├── callbacks
│ │ │ │ ├── agent-lifecycle.mdx
│ │ │ │ ├── cost-saving.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── logging.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── pii-anonymization.mdx
│ │ │ │ └── trajectories.mdx
│ │ │ ├── chat-history.mdx
│ │ │ ├── custom-tools.mdx
│ │ │ ├── customizing-computeragent.mdx
│ │ │ ├── integrations
│ │ │ │ ├── hud.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── observability.mdx
│ │ │ ├── mcp-server
│ │ │ │ ├── client-integrations.mdx
│ │ │ │ ├── configuration.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── llm-integrations.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── tools.mdx
│ │ │ │ └── usage.mdx
│ │ │ ├── message-format.mdx
│ │ │ ├── meta.json
│ │ │ ├── migration-guide.mdx
│ │ │ ├── prompt-caching.mdx
│ │ │ ├── supported-agents
│ │ │ │ ├── composed-agents.mdx
│ │ │ │ ├── computer-use-agents.mdx
│ │ │ │ ├── grounding-models.mdx
│ │ │ │ ├── human-in-the-loop.mdx
│ │ │ │ └── meta.json
│ │ │ ├── supported-model-providers
│ │ │ │ ├── cua-vlm-router.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ └── local-models.mdx
│ │ │ ├── telemetry.mdx
│ │ │ └── usage-tracking.mdx
│ │ ├── cli-playbook
│ │ │ ├── commands.mdx
│ │ │ ├── index.mdx
│ │ │ └── meta.json
│ │ ├── computer-sdk
│ │ │ ├── cloud-vm-management.mdx
│ │ │ ├── commands.mdx
│ │ │ ├── computer-server
│ │ │ │ ├── Commands.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── REST-API.mdx
│ │ │ │ └── WebSocket-API.mdx
│ │ │ ├── computer-ui.mdx
│ │ │ ├── computers.mdx
│ │ │ ├── custom-computer-handlers.mdx
│ │ │ ├── meta.json
│ │ │ ├── sandboxed-python.mdx
│ │ │ └── tracing-api.mdx
│ │ ├── example-usecases
│ │ │ ├── form-filling.mdx
│ │ │ ├── gemini-complex-ui-navigation.mdx
│ │ │ ├── meta.json
│ │ │ ├── post-event-contact-export.mdx
│ │ │ └── windows-app-behind-vpn.mdx
│ │ ├── get-started
│ │ │ ├── meta.json
│ │ │ └── quickstart.mdx
│ │ ├── index.mdx
│ │ ├── macos-vm-cli-playbook
│ │ │ ├── lume
│ │ │ │ ├── cli-reference.mdx
│ │ │ │ ├── faq.md
│ │ │ │ ├── http-api.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── prebuilt-images.mdx
│ │ │ ├── lumier
│ │ │ │ ├── building-lumier.mdx
│ │ │ │ ├── docker-compose.mdx
│ │ │ │ ├── docker.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ └── meta.json
│ │ │ └── meta.json
│ │ └── meta.json
│ ├── next.config.mjs
│ ├── package-lock.json
│ ├── package.json
│ ├── pnpm-lock.yaml
│ ├── postcss.config.mjs
│ ├── public
│ │ └── img
│ │ ├── agent_gradio_ui.png
│ │ ├── agent.png
│ │ ├── bg-dark.jpg
│ │ ├── bg-light.jpg
│ │ ├── cli.png
│ │ ├── computer.png
│ │ ├── grounding-with-gemini3.gif
│ │ ├── hero.png
│ │ ├── laminar_trace_example.png
│ │ ├── som_box_threshold.png
│ │ └── som_iou_threshold.png
│ ├── README.md
│ ├── source.config.ts
│ ├── src
│ │ ├── app
│ │ │ ├── (home)
│ │ │ │ ├── [[...slug]]
│ │ │ │ │ └── page.tsx
│ │ │ │ └── layout.tsx
│ │ │ ├── api
│ │ │ │ ├── posthog
│ │ │ │ │ └── [...path]
│ │ │ │ │ └── route.ts
│ │ │ │ └── search
│ │ │ │ └── route.ts
│ │ │ ├── favicon.ico
│ │ │ ├── global.css
│ │ │ ├── layout.config.tsx
│ │ │ ├── layout.tsx
│ │ │ ├── llms.mdx
│ │ │ │ └── [[...slug]]
│ │ │ │ └── route.ts
│ │ │ ├── llms.txt
│ │ │ │ └── route.ts
│ │ │ ├── robots.ts
│ │ │ └── sitemap.ts
│ │ ├── assets
│ │ │ ├── discord-black.svg
│ │ │ ├── discord-white.svg
│ │ │ ├── logo-black.svg
│ │ │ └── logo-white.svg
│ │ ├── components
│ │ │ ├── analytics-tracker.tsx
│ │ │ ├── cookie-consent.tsx
│ │ │ ├── doc-actions-menu.tsx
│ │ │ ├── editable-code-block.tsx
│ │ │ ├── footer.tsx
│ │ │ ├── hero.tsx
│ │ │ ├── iou.tsx
│ │ │ ├── mermaid.tsx
│ │ │ └── page-feedback.tsx
│ │ ├── lib
│ │ │ ├── llms.ts
│ │ │ └── source.ts
│ │ ├── mdx-components.tsx
│ │ └── providers
│ │ └── posthog-provider.tsx
│ └── tsconfig.json
├── examples
│ ├── agent_examples.py
│ ├── agent_ui_examples.py
│ ├── browser_tool_example.py
│ ├── cloud_api_examples.py
│ ├── computer_examples_windows.py
│ ├── computer_examples.py
│ ├── computer_ui_examples.py
│ ├── computer-example-ts
│ │ ├── .env.example
│ │ ├── .gitignore
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── README.md
│ │ ├── src
│ │ │ ├── helpers.ts
│ │ │ └── index.ts
│ │ └── tsconfig.json
│ ├── docker_examples.py
│ ├── evals
│ │ ├── hud_eval_examples.py
│ │ └── wikipedia_most_linked.txt
│ ├── pylume_examples.py
│ ├── sandboxed_functions_examples.py
│ ├── som_examples.py
│ ├── tracing_examples.py
│ ├── utils.py
│ └── winsandbox_example.py
├── img
│ ├── agent_gradio_ui.png
│ ├── agent.png
│ ├── cli.png
│ ├── computer.png
│ ├── logo_black.png
│ └── logo_white.png
├── libs
│ ├── kasm
│ │ ├── Dockerfile
│ │ ├── LICENSE
│ │ ├── README.md
│ │ └── src
│ │ └── ubuntu
│ │ └── install
│ │ └── firefox
│ │ ├── custom_startup.sh
│ │ ├── firefox.desktop
│ │ └── install_firefox.sh
│ ├── lume
│ │ ├── .cursorignore
│ │ ├── CONTRIBUTING.md
│ │ ├── Development.md
│ │ ├── img
│ │ │ └── cli.png
│ │ ├── Package.resolved
│ │ ├── Package.swift
│ │ ├── README.md
│ │ ├── resources
│ │ │ └── lume.entitlements
│ │ ├── scripts
│ │ │ ├── build
│ │ │ │ ├── build-debug.sh
│ │ │ │ ├── build-release-notarized.sh
│ │ │ │ └── build-release.sh
│ │ │ └── install.sh
│ │ ├── src
│ │ │ ├── Commands
│ │ │ │ ├── Clone.swift
│ │ │ │ ├── Config.swift
│ │ │ │ ├── Create.swift
│ │ │ │ ├── Delete.swift
│ │ │ │ ├── Get.swift
│ │ │ │ ├── Images.swift
│ │ │ │ ├── IPSW.swift
│ │ │ │ ├── List.swift
│ │ │ │ ├── Logs.swift
│ │ │ │ ├── Options
│ │ │ │ │ └── FormatOption.swift
│ │ │ │ ├── Prune.swift
│ │ │ │ ├── Pull.swift
│ │ │ │ ├── Push.swift
│ │ │ │ ├── Run.swift
│ │ │ │ ├── Serve.swift
│ │ │ │ ├── Set.swift
│ │ │ │ └── Stop.swift
│ │ │ ├── ContainerRegistry
│ │ │ │ ├── ImageContainerRegistry.swift
│ │ │ │ ├── ImageList.swift
│ │ │ │ └── ImagesPrinter.swift
│ │ │ ├── Errors
│ │ │ │ └── Errors.swift
│ │ │ ├── FileSystem
│ │ │ │ ├── Home.swift
│ │ │ │ ├── Settings.swift
│ │ │ │ ├── VMConfig.swift
│ │ │ │ ├── VMDirectory.swift
│ │ │ │ └── VMLocation.swift
│ │ │ ├── LumeController.swift
│ │ │ ├── Main.swift
│ │ │ ├── Server
│ │ │ │ ├── Handlers.swift
│ │ │ │ ├── HTTP.swift
│ │ │ │ ├── Requests.swift
│ │ │ │ ├── Responses.swift
│ │ │ │ └── Server.swift
│ │ │ ├── Utils
│ │ │ │ ├── CommandRegistry.swift
│ │ │ │ ├── CommandUtils.swift
│ │ │ │ ├── Logger.swift
│ │ │ │ ├── NetworkUtils.swift
│ │ │ │ ├── Path.swift
│ │ │ │ ├── ProcessRunner.swift
│ │ │ │ ├── ProgressLogger.swift
│ │ │ │ ├── String.swift
│ │ │ │ └── Utils.swift
│ │ │ ├── Virtualization
│ │ │ │ ├── DarwinImageLoader.swift
│ │ │ │ ├── DHCPLeaseParser.swift
│ │ │ │ ├── ImageLoaderFactory.swift
│ │ │ │ └── VMVirtualizationService.swift
│ │ │ ├── VM
│ │ │ │ ├── DarwinVM.swift
│ │ │ │ ├── LinuxVM.swift
│ │ │ │ ├── VM.swift
│ │ │ │ ├── VMDetails.swift
│ │ │ │ ├── VMDetailsPrinter.swift
│ │ │ │ ├── VMDisplayResolution.swift
│ │ │ │ └── VMFactory.swift
│ │ │ └── VNC
│ │ │ ├── PassphraseGenerator.swift
│ │ │ └── VNCService.swift
│ │ └── tests
│ │ ├── Mocks
│ │ │ ├── MockVM.swift
│ │ │ ├── MockVMVirtualizationService.swift
│ │ │ └── MockVNCService.swift
│ │ ├── VM
│ │ │ └── VMDetailsPrinterTests.swift
│ │ ├── VMTests.swift
│ │ ├── VMVirtualizationServiceTests.swift
│ │ └── VNCServiceTests.swift
│ ├── lumier
│ │ ├── .dockerignore
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── bin
│ │ │ └── entry.sh
│ │ ├── config
│ │ │ └── constants.sh
│ │ ├── hooks
│ │ │ └── on-logon.sh
│ │ └── lib
│ │ ├── utils.sh
│ │ └── vm.sh
│ ├── python
│ │ ├── agent
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── agent
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── adapters
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── cua_adapter.py
│ │ │ │ │ ├── huggingfacelocal_adapter.py
│ │ │ │ │ ├── human_adapter.py
│ │ │ │ │ ├── mlxvlm_adapter.py
│ │ │ │ │ └── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ └── qwen2_5_vl.py
│ │ │ │ ├── agent.py
│ │ │ │ ├── callbacks
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── budget_manager.py
│ │ │ │ │ ├── image_retention.py
│ │ │ │ │ ├── logging.py
│ │ │ │ │ ├── operator_validator.py
│ │ │ │ │ ├── pii_anonymization.py
│ │ │ │ │ ├── prompt_instructions.py
│ │ │ │ │ ├── telemetry.py
│ │ │ │ │ └── trajectory_saver.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── computers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cua.py
│ │ │ │ │ └── custom.py
│ │ │ │ ├── decorators.py
│ │ │ │ ├── human_tool
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ ├── server.py
│ │ │ │ │ └── ui.py
│ │ │ │ ├── integrations
│ │ │ │ │ └── hud
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── agent.py
│ │ │ │ │ └── proxy.py
│ │ │ │ ├── loops
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── anthropic.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── composed_grounded.py
│ │ │ │ │ ├── gelato.py
│ │ │ │ │ ├── gemini.py
│ │ │ │ │ ├── generic_vlm.py
│ │ │ │ │ ├── glm45v.py
│ │ │ │ │ ├── gta1.py
│ │ │ │ │ ├── holo.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── model_types.csv
│ │ │ │ │ ├── moondream3.py
│ │ │ │ │ ├── omniparser.py
│ │ │ │ │ ├── openai.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ ├── uiins.py
│ │ │ │ │ ├── uitars.py
│ │ │ │ │ └── uitars2.py
│ │ │ │ ├── proxy
│ │ │ │ │ ├── examples.py
│ │ │ │ │ └── handlers.py
│ │ │ │ ├── responses.py
│ │ │ │ ├── tools
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── browser_tool.py
│ │ │ │ ├── types.py
│ │ │ │ └── ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ └── gradio
│ │ │ │ ├── __init__.py
│ │ │ │ ├── app.py
│ │ │ │ └── ui_components.py
│ │ │ ├── benchmarks
│ │ │ │ ├── .gitignore
│ │ │ │ ├── contrib.md
│ │ │ │ ├── interactive.py
│ │ │ │ ├── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ └── gta1.py
│ │ │ │ ├── README.md
│ │ │ │ ├── ss-pro.py
│ │ │ │ ├── ss-v2.py
│ │ │ │ └── utils.py
│ │ │ ├── example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer_agent.py
│ │ ├── bench-ui
│ │ │ ├── bench_ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api.py
│ │ │ │ └── child.py
│ │ │ ├── examples
│ │ │ │ ├── folder_example.py
│ │ │ │ ├── gui
│ │ │ │ │ ├── index.html
│ │ │ │ │ ├── logo.svg
│ │ │ │ │ └── styles.css
│ │ │ │ ├── output_overlay.png
│ │ │ │ └── simple_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ └── test_port_detection.py
│ │ ├── computer
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer
│ │ │ │ ├── __init__.py
│ │ │ │ ├── computer.py
│ │ │ │ ├── diorama_computer.py
│ │ │ │ ├── helpers.py
│ │ │ │ ├── interface
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ ├── models.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── logger.py
│ │ │ │ ├── models.py
│ │ │ │ ├── providers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cloud
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── docker
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── lume
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── lume_api.py
│ │ │ │ │ ├── lumier
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── types.py
│ │ │ │ │ └── winsandbox
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── provider.py
│ │ │ │ │ └── setup_script.ps1
│ │ │ │ ├── tracing_wrapper.py
│ │ │ │ ├── tracing.py
│ │ │ │ ├── ui
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ └── gradio
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── app.py
│ │ │ │ └── utils.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer.py
│ │ ├── computer-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── browser.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── diorama
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── diorama_computer.py
│ │ │ │ │ ├── diorama.py
│ │ │ │ │ ├── draw.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── safezone.py
│ │ │ │ ├── handlers
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── main.py
│ │ │ │ ├── server.py
│ │ │ │ ├── utils
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── wallpaper.py
│ │ │ │ └── watchdog.py
│ │ │ ├── examples
│ │ │ │ ├── __init__.py
│ │ │ │ └── usage_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ ├── run_server.py
│ │ │ ├── test_connection.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_server.py
│ │ ├── core
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── core
│ │ │ │ ├── __init__.py
│ │ │ │ └── telemetry
│ │ │ │ ├── __init__.py
│ │ │ │ └── posthog.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_telemetry.py
│ │ ├── mcp-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── build-extension.py
│ │ │ ├── CONCURRENT_SESSIONS.md
│ │ │ ├── desktop-extension
│ │ │ │ ├── cua-extension.mcpb
│ │ │ │ ├── desktop_extension.png
│ │ │ │ ├── manifest.json
│ │ │ │ ├── README.md
│ │ │ │ ├── requirements.txt
│ │ │ │ ├── run_server.sh
│ │ │ │ └── setup.py
│ │ │ ├── mcp_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── server.py
│ │ │ │ └── session_manager.py
│ │ │ ├── pdm.lock
│ │ │ ├── pyproject.toml
│ │ │ ├── QUICK_TEST_COMMANDS.sh
│ │ │ ├── quick_test_local_option.py
│ │ │ ├── README.md
│ │ │ ├── scripts
│ │ │ │ ├── install_mcp_server.sh
│ │ │ │ └── start_mcp_server.sh
│ │ │ ├── test_mcp_server_local_option.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_mcp_server.py
│ │ ├── pylume
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_pylume.py
│ │ └── som
│ │ ├── .bumpversion.cfg
│ │ ├── LICENSE
│ │ ├── poetry.toml
│ │ ├── pyproject.toml
│ │ ├── README.md
│ │ ├── som
│ │ │ ├── __init__.py
│ │ │ ├── detect.py
│ │ │ ├── detection.py
│ │ │ ├── models.py
│ │ │ ├── ocr.py
│ │ │ ├── util
│ │ │ │ └── utils.py
│ │ │ └── visualization.py
│ │ └── tests
│ │ ├── conftest.py
│ │ └── test_omniparser.py
│ ├── qemu-docker
│ │ ├── linux
│ │ │ ├── Dockerfile
│ │ │ ├── README.md
│ │ │ └── src
│ │ │ ├── entry.sh
│ │ │ └── vm
│ │ │ ├── image
│ │ │ │ └── README.md
│ │ │ └── setup
│ │ │ ├── install.sh
│ │ │ ├── setup-cua-server.sh
│ │ │ └── setup.sh
│ │ ├── README.md
│ │ └── windows
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── entry.sh
│ │ └── vm
│ │ ├── image
│ │ │ └── README.md
│ │ └── setup
│ │ ├── install.bat
│ │ ├── on-logon.ps1
│ │ ├── setup-cua-server.ps1
│ │ ├── setup-utils.psm1
│ │ └── setup.ps1
│ ├── typescript
│ │ ├── .gitignore
│ │ ├── .nvmrc
│ │ ├── agent
│ │ │ ├── examples
│ │ │ │ ├── playground-example.html
│ │ │ │ └── README.md
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── client.ts
│ │ │ │ ├── index.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ └── client.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── computer
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── computer
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── providers
│ │ │ │ │ │ ├── base.ts
│ │ │ │ │ │ ├── cloud.ts
│ │ │ │ │ │ └── index.ts
│ │ │ │ │ └── types.ts
│ │ │ │ ├── index.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── base.ts
│ │ │ │ │ ├── factory.ts
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── linux.ts
│ │ │ │ │ ├── macos.ts
│ │ │ │ │ └── windows.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ ├── computer
│ │ │ │ │ └── cloud.test.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── factory.test.ts
│ │ │ │ │ ├── index.test.ts
│ │ │ │ │ ├── linux.test.ts
│ │ │ │ │ ├── macos.test.ts
│ │ │ │ │ └── windows.test.ts
│ │ │ │ └── setup.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── core
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── index.ts
│ │ │ │ └── telemetry
│ │ │ │ ├── clients
│ │ │ │ │ ├── index.ts
│ │ │ │ │ └── posthog.ts
│ │ │ │ └── index.ts
│ │ │ ├── tests
│ │ │ │ └── telemetry.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── cua-cli
│ │ │ ├── .gitignore
│ │ │ ├── .prettierrc
│ │ │ ├── bun.lock
│ │ │ ├── CLAUDE.md
│ │ │ ├── index.ts
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── auth.ts
│ │ │ │ ├── cli.ts
│ │ │ │ ├── commands
│ │ │ │ │ ├── auth.ts
│ │ │ │ │ └── sandbox.ts
│ │ │ │ ├── config.ts
│ │ │ │ ├── http.ts
│ │ │ │ ├── storage.ts
│ │ │ │ └── util.ts
│ │ │ └── tsconfig.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── pnpm-workspace.yaml
│ │ └── README.md
│ └── xfce
│ ├── .dockerignore
│ ├── .gitignore
│ ├── Development.md
│ ├── Dockerfile
│ ├── Dockerfile.dev
│ ├── README.md
│ └── src
│ ├── scripts
│ │ ├── resize-display.sh
│ │ ├── start-computer-server.sh
│ │ ├── start-novnc.sh
│ │ ├── start-vnc.sh
│ │ └── xstartup.sh
│ ├── supervisor
│ │ └── supervisord.conf
│ └── xfce-config
│ ├── helpers.rc
│ ├── xfce4-power-manager.xml
│ └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│ ├── agent_nb.ipynb
│ ├── blog
│ │ ├── build-your-own-operator-on-macos-1.ipynb
│ │ └── build-your-own-operator-on-macos-2.ipynb
│ ├── composite_agents_docker_nb.ipynb
│ ├── computer_nb.ipynb
│ ├── computer_server_nb.ipynb
│ ├── customizing_computeragent.ipynb
│ ├── eval_osworld.ipynb
│ ├── ollama_nb.ipynb
│ ├── README.md
│ ├── sota_hackathon_cloud.ipynb
│ └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── install-cli.ps1
│ ├── install-cli.sh
│ ├── playground-docker.sh
│ ├── playground.sh
│ ├── run-docker-dev.sh
│ └── typescript-typecheck.js
├── TESTING.md
├── tests
│ ├── agent_loop_testing
│ │ ├── agent_test.py
│ │ └── README.md
│ ├── pytest.ini
│ ├── shell_cmd.py
│ ├── test_files.py
│ ├── test_mcp_server_session_management.py
│ ├── test_mcp_server_streaming.py
│ ├── test_shell_bash.py
│ ├── test_telemetry.py
│ ├── test_tracing.py
│ ├── test_venv.py
│ └── test_watchdog.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/docs/content/docs/computer-sdk/commands.mdx:
--------------------------------------------------------------------------------
```markdown
---
title: Commands
description: Computer commands and interface methods
---
This page describes the set of supported **commands** you can use to control a Cua Computer directly via the Python or TypeScript SDK.
These commands map to the same actions available in the [Computer Server API Commands Reference](/computer-sdk/computer-server/Commands), and provide low-level, async access to system operations from your agent or automation code.
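All of these commands are async methods on `computer.interface`, so they are awaited inside a connected session. A minimal sketch (assuming a Computer Server reachable on the local host; provider and connection options are covered on the Computers page):
```python
import asyncio
from computer import Computer

async def main():
    # use_host_computer_server=True connects to a locally running Computer Server;
    # cloud, docker, lume, and other providers are selected via Computer(...) options.
    computer = Computer(use_host_computer_server=True)
    await computer.run()
    result = await computer.interface.run_command("echo hello")
    print(result.stdout)

asyncio.run(main())
```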
## Shell Actions
Execute shell commands and get detailed results:
<Tabs items={['Python', 'TypeScript']}>
<Tab value="Python">
```python
# Run shell command
result = await computer.interface.run_command(cmd) # result.stdout, result.stderr, result.returncode
```
</Tab>
<Tab value="TypeScript">
```typescript
// Run shell command
const result = await computer.interface.runCommand(cmd); // result.stdout, result.stderr, result.returncode
```
</Tab>
</Tabs>
## Window Management
Control application launching and windows:
<Tabs items={['Python', 'TypeScript']}>
<Tab value="Python">
```python
# Launch applications
await computer.interface.launch("xfce4-terminal")
await computer.interface.launch("libreoffice --writer")
await computer.interface.open("https://www.google.com")
# Window management
windows = await computer.interface.get_application_windows("xfce4-terminal")
window_id = windows[0]
await computer.interface.activate_window(window_id)
window_id = await computer.interface.get_current_window_id() # get the current active window id
await computer.interface.window_size(window_id)
await computer.interface.get_window_title(window_id)
await computer.interface.get_window_position(window_id)
await computer.interface.set_window_size(window_id, 1200, 800)
await computer.interface.set_window_position(window_id, 100, 100)
await computer.interface.maximize_window(window_id)
await computer.interface.minimize_window(window_id)
await computer.interface.close_window(window_id)
```
</Tab>
<Tab value="TypeScript">
```typescript
// Launch applications
await computer.interface.launch("xfce4-terminal");
await computer.interface.launch("libreoffice --writer");
await computer.interface.open("https://www.google.com");
// Window management
const windows = await computer.interface.getApplicationWindows("xfce4-terminal");
let windowId = windows[0];
await computer.interface.activateWindow(windowId);
windowId = await computer.interface.getCurrentWindowId(); // current active window id
await computer.interface.getWindowSize(windowId);
await computer.interface.getWindowName(windowId);
await computer.interface.getWindowPosition(windowId);
await computer.interface.setWindowSize(windowId, 1200, 800);
await computer.interface.setWindowPosition(windowId, 100, 100);
await computer.interface.maximizeWindow(windowId);
await computer.interface.minimizeWindow(windowId);
await computer.interface.closeWindow(windowId);
```
</Tab>
</Tabs>
## Mouse Actions
Precise mouse control and interaction:
<Tabs items={['Python', 'TypeScript']}>
<Tab value="Python">
```python
# Basic clicks
await computer.interface.left_click(x, y) # Left click at coordinates
await computer.interface.right_click(x, y) # Right click at coordinates
await computer.interface.double_click(x, y) # Double click at coordinates
# Cursor movement and dragging
await computer.interface.move_cursor(x, y) # Move cursor to coordinates
await computer.interface.drag_to(x, y, duration) # Drag to coordinates
await computer.interface.get_cursor_position() # Get current cursor position
# Advanced mouse control
await computer.interface.mouse_down(x, y, button="left") # Press and hold a mouse button
await computer.interface.mouse_up(x, y, button="left") # Release a mouse button
```
</Tab>
<Tab value="TypeScript">
```typescript
// Basic clicks
await computer.interface.leftClick(x, y); // Left click at coordinates
await computer.interface.rightClick(x, y); // Right click at coordinates
await computer.interface.doubleClick(x, y); // Double click at coordinates
// Cursor movement and dragging
await computer.interface.moveCursor(x, y); // Move cursor to coordinates
await computer.interface.dragTo(x, y, duration); // Drag to coordinates
await computer.interface.getCursorPosition(); // Get current cursor position
// Advanced mouse control
await computer.interface.mouseDown(x, y, "left"); // Press and hold a mouse button
await computer.interface.mouseUp(x, y, "left"); // Release a mouse button
```
</Tab>
</Tabs>
## Keyboard Actions
Text input and key combinations:
<Tabs items={['Python', 'TypeScript']}>
<Tab value="Python">
```python
# Text input
await computer.interface.type_text("Hello") # Type text
await computer.interface.press_key("enter") # Press a single key
# Key combinations and advanced control
await computer.interface.hotkey("command", "c") # Press key combination
await computer.interface.key_down("command") # Press and hold a key
await computer.interface.key_up("command") # Release a key
```
</Tab>
<Tab value="TypeScript">
```typescript
// Text input
await computer.interface.typeText("Hello"); // Type text
await computer.interface.pressKey("enter"); // Press a single key
// Key combinations and advanced control
await computer.interface.hotkey("command", "c"); // Press key combination
await computer.interface.keyDown("command"); // Press and hold a key
await computer.interface.keyUp("command"); // Release a key
```
</Tab>
</Tabs>
## Scrolling Actions
Mouse wheel and scrolling control:
<Tabs items={['Python', 'TypeScript']}>
<Tab value="Python">
```python
# Scrolling
await computer.interface.scroll(x, y) # Scroll the mouse wheel
await computer.interface.scroll_down(clicks) # Scroll down
await computer.interface.scroll_up(clicks) # Scroll up
```
</Tab>
<Tab value="TypeScript">
```typescript
// Scrolling
await computer.interface.scroll(x, y); // Scroll the mouse wheel
await computer.interface.scrollDown(clicks); // Scroll down
await computer.interface.scrollUp(clicks); // Scroll up
```
</Tab>
</Tabs>
## Screen Actions
Screen capture and display information:
<Tabs items={['Python', 'TypeScript']}>
<Tab value="Python">
```python
# Screen operations
await computer.interface.screenshot() # Take a screenshot
await computer.interface.get_screen_size() # Get screen dimensions
```
</Tab>
<Tab value="TypeScript">
```typescript
// Screen operations
await computer.interface.screenshot(); // Take a screenshot
await computer.interface.getScreenSize(); // Get screen dimensions
```
</Tab>
</Tabs>
## Desktop Actions
Control desktop environment features like wallpaper:
<Tabs items={['Python', 'TypeScript']}>
<Tab value="Python">
```python
# Get current desktop environment (e.g., 'xfce4', 'gnome', 'kde', 'mac', 'windows')
env = await computer.interface.get_desktop_environment()
print(env) # "xfce4"
# Set desktop wallpaper to an image file accessible on the sandbox
await computer.interface.set_wallpaper("/home/cua/shared/wallpaper.png")
```
</Tab>
<Tab value="TypeScript">
```typescript
// Get current desktop environment
const env = await computer.interface.getDesktopEnvironment();
console.log(env); // "xfce4"
// Set desktop wallpaper to an image file accessible on the sandbox
await computer.interface.setWallpaper('/home/cua/shared/wallpaper.png');
```
</Tab>
</Tabs>
## Clipboard Actions
System clipboard management:
<Tabs items={['Python', 'TypeScript']}>
<Tab value="Python">
```python
# Clipboard operations
await computer.interface.set_clipboard(text) # Set clipboard content
await computer.interface.copy_to_clipboard() # Get clipboard content
```
</Tab>
<Tab value="TypeScript">
```typescript
// Clipboard operations
await computer.interface.setClipboard(text); // Set clipboard content
await computer.interface.copyToClipboard(); // Get clipboard content
```
</Tab>
</Tabs>
## File System Operations
Direct file and directory manipulation:
<Tabs items={['Python', 'TypeScript']}>
<Tab value="Python">
```python
# File existence checks
await computer.interface.file_exists(path) # Check if file exists
await computer.interface.directory_exists(path) # Check if directory exists
# File content operations
await computer.interface.read_text(path, encoding="utf-8") # Read file content
await computer.interface.write_text(path, content, encoding="utf-8") # Write file content
await computer.interface.read_bytes(path) # Read file content as bytes
await computer.interface.write_bytes(path, content) # Write file content as bytes
# File and directory management
await computer.interface.delete_file(path) # Delete file
await computer.interface.create_dir(path) # Create directory
await computer.interface.delete_dir(path) # Delete directory
await computer.interface.list_dir(path) # List directory contents
```
</Tab>
<Tab value="TypeScript">
```typescript
// File existence checks
await computer.interface.fileExists(path); // Check if file exists
await computer.interface.directoryExists(path); // Check if directory exists
// File content operations
await computer.interface.readText(path, "utf-8"); // Read file content
await computer.interface.writeText(path, content, "utf-8"); // Write file content
await computer.interface.readBytes(path); // Read file content as bytes
await computer.interface.writeBytes(path, content); // Write file content as bytes
// File and directory management
await computer.interface.deleteFile(path); // Delete file
await computer.interface.createDir(path); // Create directory
await computer.interface.deleteDir(path); // Delete directory
await computer.interface.listDir(path); // List directory contents
```
</Tab>
</Tabs>
## Accessibility
Access system accessibility information:
<Tabs items={['Python', 'TypeScript']}>
<Tab value="Python">
```python
# Get accessibility tree
await computer.interface.get_accessibility_tree()
```
</Tab>
<Tab value="TypeScript">
```typescript
// Get accessibility tree
await computer.interface.getAccessibilityTree();
```
</Tab>
</Tabs>
## Delay Configuration
Control timing between actions:
<Tabs items={['Python']}>
<Tab value="Python">
```python
# Set default delay between all actions (in seconds)
computer.interface.delay = 0.5 # 500ms delay between actions
# Or specify delay for individual actions
await computer.interface.left_click(x, y, delay=1.0) # 1 second delay after click
await computer.interface.type_text("Hello", delay=0.2) # 200ms delay after typing
await computer.interface.press_key("enter", delay=0.5) # 500ms delay after key press
```
</Tab>
</Tabs>
## Python Virtual Environment Operations
Manage Python environments:
<Tabs items={['Python']}>
<Tab value="Python">
```python
# Virtual environment management
await computer.venv_install("demo_venv", ["requests", "macos-pyxa"]) # Install packages in a virtual environment
await computer.venv_cmd("demo_venv", "python -c 'import requests; print(requests.get(\"https://httpbin.org/ip\").json())'") # Run a shell command in a virtual environment
await computer.venv_exec("demo_venv", python_function_or_code, *args, **kwargs) # Run a Python function in a virtual environment and return the result / raise an exception
```
</Tab>
</Tabs>
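For example, `venv_exec` takes a regular Python function and runs it inside the virtual environment. A minimal sketch (the function below is illustrative; its imports are done inside the function body so they resolve in the target venv):
```python
def get_ip():
    import requests  # resolved inside "demo_venv", where requests was installed above
    return requests.get("https://httpbin.org/ip").json()

ip_info = await computer.venv_exec("demo_venv", get_ip)
print(ip_info)
```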
```
--------------------------------------------------------------------------------
/libs/python/mcp-server/mcp_server/session_manager.py:
--------------------------------------------------------------------------------
```python
"""
Session Manager for MCP Server - Handles concurrent client sessions with proper resource isolation.
This module provides:
- Per-session computer instance management
- Resource pooling and lifecycle management
- Graceful session cleanup
- Concurrent task execution support
"""
import asyncio
import logging
import os
import time
import uuid
import weakref
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set
logger = logging.getLogger("mcp-server.session_manager")
@dataclass
class SessionInfo:
"""Information about an active session."""
session_id: str
computer: Any # Computer instance
created_at: float
last_activity: float
active_tasks: Set[str] = field(default_factory=set)
is_shutting_down: bool = False
class ComputerPool:
"""Pool of computer instances for efficient resource management."""
def __init__(self, max_size: int = 5, idle_timeout: float = 300.0):
self.max_size = max_size
self.idle_timeout = idle_timeout
self._available: List[Any] = []
self._in_use: Set[Any] = set()
self._creation_lock = asyncio.Lock()
async def acquire(self) -> Any:
"""Acquire a computer instance from the pool."""
# Try to get an available instance
if self._available:
computer = self._available.pop()
self._in_use.add(computer)
logger.debug("Reusing computer instance from pool")
return computer
# Check if we can create a new one
async with self._creation_lock:
if len(self._in_use) < self.max_size:
logger.debug("Creating new computer instance")
from computer import Computer
# Check if we should use host computer server
use_host = os.getenv("CUA_USE_HOST_COMPUTER_SERVER", "false").lower() in (
"true",
"1",
"yes",
)
computer = Computer(verbosity=logging.INFO, use_host_computer_server=use_host)
await computer.run()
self._in_use.add(computer)
return computer
# Wait for an instance to become available
logger.debug("Waiting for computer instance to become available")
while not self._available:
await asyncio.sleep(0.1)
computer = self._available.pop()
self._in_use.add(computer)
return computer
async def release(self, computer: Any) -> None:
"""Release a computer instance back to the pool."""
if computer in self._in_use:
self._in_use.remove(computer)
self._available.append(computer)
logger.debug("Released computer instance back to pool")
async def cleanup_idle(self) -> None:
"""Clean up idle computer instances."""
current_time = time.time()
idle_instances = []
for computer in self._available[:]:
# Check if computer has been idle too long
# Note: We'd need to track last use time per instance for this
# For now, we'll keep instances in the pool
pass
async def shutdown(self) -> None:
"""Shutdown all computer instances in the pool."""
logger.info("Shutting down computer pool")
# Close all available instances
for computer in self._available:
try:
if hasattr(computer, "close"):
await computer.close()
elif hasattr(computer, "stop"):
await computer.stop()
except Exception as e:
logger.warning(f"Error closing computer instance: {e}")
# Close all in-use instances
for computer in self._in_use:
try:
if hasattr(computer, "close"):
await computer.close()
elif hasattr(computer, "stop"):
await computer.stop()
except Exception as e:
logger.warning(f"Error closing computer instance: {e}")
self._available.clear()
self._in_use.clear()
class SessionManager:
"""Manages concurrent client sessions with proper resource isolation."""
def __init__(self, max_concurrent_sessions: int = 10):
self.max_concurrent_sessions = max_concurrent_sessions
self._sessions: Dict[str, SessionInfo] = {}
self._computer_pool = ComputerPool()
self._session_lock = asyncio.Lock()
self._cleanup_task: Optional[asyncio.Task] = None
self._shutdown_event = asyncio.Event()
async def start(self) -> None:
"""Start the session manager and cleanup task."""
logger.info("Starting session manager")
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
async def stop(self) -> None:
"""Stop the session manager and cleanup all resources."""
logger.info("Stopping session manager")
self._shutdown_event.set()
if self._cleanup_task:
self._cleanup_task.cancel()
try:
await self._cleanup_task
except asyncio.CancelledError:
pass
# Force cleanup all sessions
async with self._session_lock:
session_ids = list(self._sessions.keys())
for session_id in session_ids:
await self._force_cleanup_session(session_id)
await self._computer_pool.shutdown()
@asynccontextmanager
async def get_session(self, session_id: Optional[str] = None) -> Any:
"""Get or create a session with proper resource management."""
if session_id is None:
session_id = str(uuid.uuid4())
# Check if session exists and is not shutting down
async with self._session_lock:
if session_id in self._sessions:
session = self._sessions[session_id]
if session.is_shutting_down:
raise RuntimeError(f"Session {session_id} is shutting down")
session.last_activity = time.time()
computer = session.computer
else:
# Create new session
if len(self._sessions) >= self.max_concurrent_sessions:
raise RuntimeError(
f"Maximum concurrent sessions ({self.max_concurrent_sessions}) reached"
)
computer = await self._computer_pool.acquire()
session = SessionInfo(
session_id=session_id,
computer=computer,
created_at=time.time(),
last_activity=time.time(),
)
self._sessions[session_id] = session
logger.info(f"Created new session: {session_id}")
try:
yield session
finally:
# Update last activity
async with self._session_lock:
if session_id in self._sessions:
self._sessions[session_id].last_activity = time.time()
async def register_task(self, session_id: str, task_id: str) -> None:
"""Register a task for a session."""
async with self._session_lock:
if session_id in self._sessions:
self._sessions[session_id].active_tasks.add(task_id)
logger.debug(f"Registered task {task_id} for session {session_id}")
async def unregister_task(self, session_id: str, task_id: str) -> None:
"""Unregister a task from a session."""
async with self._session_lock:
if session_id in self._sessions:
self._sessions[session_id].active_tasks.discard(task_id)
logger.debug(f"Unregistered task {task_id} from session {session_id}")
async def cleanup_session(self, session_id: str) -> None:
"""Cleanup a specific session."""
async with self._session_lock:
if session_id not in self._sessions:
return
session = self._sessions[session_id]
# Check if session has active tasks
if session.active_tasks:
logger.info(f"Session {session_id} has active tasks, marking for shutdown")
session.is_shutting_down = True
return
# Actually cleanup the session
await self._force_cleanup_session(session_id)
async def _force_cleanup_session(self, session_id: str) -> None:
"""Force cleanup a session regardless of active tasks."""
async with self._session_lock:
if session_id not in self._sessions:
return
session = self._sessions[session_id]
logger.info(f"Cleaning up session: {session_id}")
# Release computer back to pool
await self._computer_pool.release(session.computer)
# Remove session
del self._sessions[session_id]
async def _cleanup_loop(self) -> None:
"""Background task to cleanup idle sessions."""
while not self._shutdown_event.is_set():
try:
await asyncio.sleep(60) # Run cleanup every minute
current_time = time.time()
idle_timeout = 600.0 # 10 minutes
async with self._session_lock:
idle_sessions = []
for session_id, session in self._sessions.items():
if not session.is_shutting_down and not session.active_tasks:
if current_time - session.last_activity > idle_timeout:
idle_sessions.append(session_id)
# Cleanup idle sessions
for session_id in idle_sessions:
await self._force_cleanup_session(session_id)
logger.info(f"Cleaned up idle session: {session_id}")
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in cleanup loop: {e}")
def get_session_stats(self) -> Dict[str, Any]:
"""Get statistics about active sessions."""
async def _get_stats():
async with self._session_lock:
return {
"total_sessions": len(self._sessions),
"max_concurrent": self.max_concurrent_sessions,
"sessions": {
session_id: {
"created_at": session.created_at,
"last_activity": session.last_activity,
"active_tasks": len(session.active_tasks),
"is_shutting_down": session.is_shutting_down,
}
for session_id, session in self._sessions.items()
},
}
# Run in current event loop or create new one
try:
loop = asyncio.get_running_loop()
return asyncio.run_coroutine_threadsafe(_get_stats(), loop).result()
except RuntimeError:
# No event loop running, create a new one
return asyncio.run(_get_stats())
# Global session manager instance
_session_manager: Optional[SessionManager] = None
def get_session_manager() -> SessionManager:
"""Get the global session manager instance."""
global _session_manager
if _session_manager is None:
_session_manager = SessionManager()
return _session_manager
async def initialize_session_manager() -> SessionManager:
"""Initialize the global session manager."""
global _session_manager
if _session_manager is None:
_session_manager = SessionManager()
await _session_manager.start()
return _session_manager
async def shutdown_session_manager() -> None:
"""Shutdown the global session manager."""
global _session_manager
if _session_manager is not None:
await _session_manager.stop()
_session_manager = None
```
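A minimal usage sketch for the session manager above (assuming the `mcp_server` package is importable and a Computer Server is available for the pooled `Computer` instances):
```python
import asyncio

from mcp_server.session_manager import (
    get_session_manager,
    initialize_session_manager,
    shutdown_session_manager,
)

async def main():
    await initialize_session_manager()
    manager = get_session_manager()
    # get_session() is an async context manager yielding a SessionInfo;
    # the pooled Computer instance is available as session.computer.
    async with manager.get_session() as session:
        await manager.register_task(session.session_id, "demo-task")
        try:
            screenshot = await session.computer.interface.screenshot()
            print(f"{session.session_id}: captured {len(screenshot)} bytes")
        finally:
            await manager.unregister_task(session.session_id, "demo-task")
    await shutdown_session_manager()

asyncio.run(main())
```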
--------------------------------------------------------------------------------
/libs/python/computer/computer/tracing.py:
--------------------------------------------------------------------------------
```python
"""
Computer tracing functionality for recording sessions.
This module provides a Computer.tracing API inspired by Playwright's tracing functionality,
allowing users to record computer interactions for debugging, training, and analysis.
"""
import asyncio
import base64
import io
import json
import time
import uuid
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from PIL import Image
class ComputerTracing:
"""
Computer tracing class that records computer interactions and saves them to disk.
This class provides a flexible API for recording computer sessions with configurable
options for what to record (screenshots, API calls, video, etc.).
"""
def __init__(self, computer_instance):
"""
Initialize the tracing instance.
Args:
computer_instance: The Computer instance to trace
"""
self._computer = computer_instance
self._is_tracing = False
self._trace_config: Dict[str, Any] = {}
self._trace_data: List[Dict[str, Any]] = []
self._trace_start_time: Optional[float] = None
self._trace_id: Optional[str] = None
self._trace_dir: Optional[Path] = None
self._screenshot_count = 0
@property
def is_tracing(self) -> bool:
"""Check if tracing is currently active."""
return self._is_tracing
async def start(self, config: Optional[Dict[str, Any]] = None) -> None:
"""
Start tracing with the specified configuration.
Args:
config: Tracing configuration dict with options:
- video: bool - Record video frames (default: False)
- screenshots: bool - Record screenshots (default: True)
- api_calls: bool - Record API calls and results (default: True)
- accessibility_tree: bool - Record accessibility tree snapshots (default: False)
- metadata: bool - Record custom metadata (default: True)
- name: str - Custom trace name (default: auto-generated)
- path: str - Custom trace directory path (default: auto-generated)
"""
if self._is_tracing:
raise RuntimeError("Tracing is already active. Call stop() first.")
# Set default configuration
default_config = {
"video": False,
"screenshots": True,
"api_calls": True,
"accessibility_tree": False,
"metadata": True,
"name": None,
"path": None,
}
self._trace_config = {**default_config, **(config or {})}
# Generate trace ID and directory
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self._trace_id = (
self._trace_config.get("name") or f"trace_{timestamp}_{str(uuid.uuid4())[:8]}"
)
if self._trace_config.get("path"):
self._trace_dir = Path(self._trace_config["path"])
else:
self._trace_dir = Path.cwd() / "traces" / self._trace_id
# Create trace directory
self._trace_dir.mkdir(parents=True, exist_ok=True)
# Initialize trace data
self._trace_data = []
self._trace_start_time = time.time()
self._screenshot_count = 0
self._is_tracing = True
# Record initial metadata
await self._record_event(
"trace_start",
{
"trace_id": self._trace_id,
"config": self._trace_config,
"timestamp": self._trace_start_time,
"computer_info": {
"os_type": self._computer.os_type,
"provider_type": str(self._computer.provider_type),
"image": self._computer.image,
},
},
)
# Take initial screenshot if enabled
if self._trace_config.get("screenshots"):
await self._take_screenshot("initial_screenshot")
async def stop(self, options: Optional[Dict[str, Any]] = None) -> str:
"""
Stop tracing and save the trace data.
Args:
options: Stop options dict with:
- path: str - Custom output path for the trace archive
- format: str - Output format ('zip' or 'dir', default: 'zip')
Returns:
str: Path to the saved trace file or directory
"""
if not self._is_tracing:
raise RuntimeError("Tracing is not active. Call start() first.")
if self._trace_start_time is None or self._trace_dir is None or self._trace_id is None:
raise RuntimeError("Tracing state is invalid.")
# Record final metadata
await self._record_event(
"trace_end",
{
"timestamp": time.time(),
"duration": time.time() - self._trace_start_time,
"total_events": len(self._trace_data),
"screenshot_count": self._screenshot_count,
},
)
# Take final screenshot if enabled
if self._trace_config.get("screenshots"):
await self._take_screenshot("final_screenshot")
# Save trace metadata
metadata_path = self._trace_dir / "trace_metadata.json"
with open(metadata_path, "w") as f:
json.dump(
{
"trace_id": self._trace_id,
"config": self._trace_config,
"start_time": self._trace_start_time,
"end_time": time.time(),
"duration": time.time() - self._trace_start_time,
"total_events": len(self._trace_data),
"screenshot_count": self._screenshot_count,
"events": self._trace_data,
},
f,
indent=2,
default=str,
)
# Determine output format and path
output_format = options.get("format", "zip") if options else "zip"
custom_path = options.get("path") if options else None
if output_format == "zip":
# Create zip file
if custom_path:
zip_path = Path(custom_path)
else:
zip_path = self._trace_dir.parent / f"{self._trace_id}.zip"
await self._create_zip_archive(zip_path)
output_path = str(zip_path)
else:
# Return directory path
if custom_path:
# Move directory to custom path
custom_dir = Path(custom_path)
if custom_dir.exists():
import shutil
shutil.rmtree(custom_dir)
self._trace_dir.rename(custom_dir)
output_path = str(custom_dir)
else:
output_path = str(self._trace_dir)
# Reset tracing state
self._is_tracing = False
self._trace_config = {}
self._trace_data = []
self._trace_start_time = None
self._trace_id = None
self._screenshot_count = 0
return output_path
async def _record_event(self, event_type: str, data: Dict[str, Any]) -> None:
"""
Record a trace event.
Args:
event_type: Type of event (e.g., 'click', 'type', 'screenshot')
data: Event data
"""
if not self._is_tracing or self._trace_start_time is None or self._trace_dir is None:
return
event = {
"type": event_type,
"timestamp": time.time(),
"relative_time": time.time() - self._trace_start_time,
"data": data,
}
self._trace_data.append(event)
# Save event to individual file for large traces
event_file = self._trace_dir / f"event_{len(self._trace_data):06d}_{event_type}.json"
with open(event_file, "w") as f:
json.dump(event, f, indent=2, default=str)
async def _take_screenshot(self, name: str = "screenshot") -> Optional[str]:
"""
Take a screenshot and save it to the trace.
Args:
name: Name for the screenshot
Returns:
Optional[str]: Path to the saved screenshot, or None if screenshots disabled
"""
if (
not self._trace_config.get("screenshots")
or not self._computer.interface
or self._trace_dir is None
):
return None
try:
screenshot_bytes = await self._computer.interface.screenshot()
self._screenshot_count += 1
screenshot_filename = f"{self._screenshot_count:06d}_{name}.png"
screenshot_path = self._trace_dir / screenshot_filename
with open(screenshot_path, "wb") as f:
f.write(screenshot_bytes)
return str(screenshot_path)
except Exception as e:
# Log error but don't fail the trace
if hasattr(self._computer, "logger"):
self._computer.logger.warning(f"Failed to take screenshot: {e}")
return None
async def _create_zip_archive(self, zip_path: Path) -> None:
"""
Create a zip archive of the trace directory.
Args:
zip_path: Path where to save the zip file
"""
if self._trace_dir is None:
raise RuntimeError("Trace directory is not set")
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
for file_path in self._trace_dir.rglob("*"):
if file_path.is_file():
arcname = file_path.relative_to(self._trace_dir)
zipf.write(file_path, arcname)
async def record_api_call(
self,
method: str,
args: Dict[str, Any],
result: Any = None,
error: Optional[Exception] = None,
) -> None:
"""
Record an API call event.
Args:
method: The method name that was called
args: Arguments passed to the method
result: Result returned by the method
error: Exception raised by the method, if any
"""
if not self._trace_config.get("api_calls"):
return
# Take screenshot after certain actions if enabled
screenshot_path = None
screenshot_actions = [
"left_click",
"right_click",
"double_click",
"type_text",
"press_key",
"hotkey",
]
if method in screenshot_actions and self._trace_config.get("screenshots"):
screenshot_path = await self._take_screenshot(f"after_{method}")
# Record accessibility tree after certain actions if enabled
if method in screenshot_actions and self._trace_config.get("accessibility_tree"):
await self.record_accessibility_tree()
await self._record_event(
"api_call",
{
"method": method,
"args": args,
"result": str(result) if result is not None else None,
"error": str(error) if error else None,
"screenshot": screenshot_path,
"success": error is None,
},
)
async def record_accessibility_tree(self) -> None:
"""Record the current accessibility tree if enabled."""
if not self._trace_config.get("accessibility_tree") or not self._computer.interface:
return
try:
accessibility_tree = await self._computer.interface.get_accessibility_tree()
await self._record_event("accessibility_tree", {"tree": accessibility_tree})
except Exception as e:
if hasattr(self._computer, "logger"):
self._computer.logger.warning(f"Failed to record accessibility tree: {e}")
async def add_metadata(self, key: str, value: Any) -> None:
"""
Add custom metadata to the trace.
Args:
key: Metadata key
value: Metadata value
"""
if not self._trace_config.get("metadata"):
return
await self._record_event("metadata", {"key": key, "value": value})
```
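A minimal sketch of driving this tracing class (constructing `ComputerTracing` directly here; in the SDK it is surfaced through the Computer's tracing API, so the exact entry point may differ):
```python
from computer import Computer
from computer.tracing import ComputerTracing

async def record_demo(computer: Computer) -> str:
    tracing = ComputerTracing(computer)
    await tracing.start({"screenshots": True, "api_calls": True, "name": "demo_trace"})
    await computer.interface.left_click(100, 200)
    # Record the call explicitly for this sketch; it captures an "after_left_click" screenshot.
    await tracing.record_api_call("left_click", {"x": 100, "y": 200})
    await tracing.add_metadata("note", "clicked the demo button")
    return await tracing.stop({"format": "zip"})  # path to the saved .zip archive
```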
--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/composed_grounded.py:
--------------------------------------------------------------------------------
```python
"""
Composed-grounded agent loop implementation that combines grounding and thinking models.
Uses a two-stage approach: grounding model for element detection, thinking model for reasoning.
"""
import asyncio
import base64
import json
import uuid
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple
import litellm
from PIL import Image
from ..agent import find_agent_config
from ..decorators import register_agent
from ..loops.base import AsyncAgentConfig
from ..responses import (
convert_completion_messages_to_responses_items,
convert_computer_calls_desc2xy,
convert_computer_calls_xy2desc,
convert_responses_items_to_completion_messages,
get_all_element_descriptions,
)
from ..types import AgentCapability, AgentResponse, Messages, Tools
GROUNDED_COMPUTER_TOOL_SCHEMA = {
"type": "function",
"function": {
"name": "computer",
"description": "Control a computer by taking screenshots and interacting with UI elements. This tool uses element descriptions to locate and interact with UI elements on the screen (e.g., 'red submit button', 'search text field', 'hamburger menu icon', 'close button in top right corner').",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": [
"screenshot",
"click",
"double_click",
"drag",
"type",
"keypress",
"scroll",
"move",
"wait",
"get_current_url",
"get_dimensions",
"get_environment",
],
"description": "The action to perform (required for all actions)",
},
"element_description": {
"type": "string",
"description": "Description of the element to interact with (required for click, double_click, move, scroll actions)",
},
"start_element_description": {
"type": "string",
"description": "Description of the element to start dragging from (required for drag action)",
},
"end_element_description": {
"type": "string",
"description": "Description of the element to drag to (required for drag action)",
},
"text": {
"type": "string",
"description": "The text to type (required for type action)",
},
"keys": {
"type": "array",
"items": {"type": "string"},
"description": "Key(s) to press (required for keypress action)",
},
"button": {
"type": "string",
"enum": ["left", "right", "wheel", "back", "forward"],
"description": "The mouse button to use for click action (required for click and double_click action)",
},
"scroll_x": {
"type": "integer",
"description": "Horizontal scroll amount for scroll action (required for scroll action)",
},
"scroll_y": {
"type": "integer",
"description": "Vertical scroll amount for scroll action (required for scroll action)",
},
},
"required": ["action"],
},
},
}
def _prepare_tools_for_grounded(tool_schemas: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Prepare tools for grounded API format"""
grounded_tools = []
for schema in tool_schemas:
if schema["type"] == "computer":
grounded_tools.append(GROUNDED_COMPUTER_TOOL_SCHEMA)
else:
grounded_tools.append(schema)
return grounded_tools
def get_last_computer_call_image(messages: List[Dict[str, Any]]) -> Optional[str]:
"""Get the last computer call output image from messages."""
for message in reversed(messages):
if (
isinstance(message, dict)
and message.get("type") == "computer_call_output"
and isinstance(message.get("output"), dict)
and message["output"].get("type") == "input_image"
):
image_url = message["output"].get("image_url", "")
if image_url.startswith("data:image/png;base64,"):
return image_url.split(",", 1)[1]
return None
@register_agent(r".*\+.*", priority=1)
class ComposedGroundedConfig(AsyncAgentConfig):
"""
Composed-grounded agent configuration that uses both grounding and thinking models.
The model parameter should be in format: "grounding_model+thinking_model"
e.g., "huggingface-local/HelloKKMe/GTA1-7B+gemini/gemini-1.5-pro"
"""
def __init__(self):
self.desc2xy: Dict[str, Tuple[float, float]] = {}
async def predict_step(
self,
messages: List[Dict[str, Any]],
model: str,
tools: Optional[List[Dict[str, Any]]] = None,
max_retries: Optional[int] = None,
stream: bool = False,
computer_handler=None,
use_prompt_caching: Optional[bool] = False,
_on_api_start=None,
_on_api_end=None,
_on_usage=None,
_on_screenshot=None,
**kwargs,
) -> Dict[str, Any]:
"""
Composed-grounded predict step implementation.
Process:
0. Store last computer call image, if none then take a screenshot
1. Convert computer calls from xy to descriptions
2. Convert responses items to completion messages
3. Call thinking model with litellm.acompletion
4. Convert completion messages to responses items
5. Get all element descriptions and populate desc2xy mapping
6. Convert computer calls from descriptions back to xy coordinates
7. Return output and usage
"""
# Parse the composed model
if "+" not in model:
raise ValueError(
f"Composed model must be in format 'grounding_model+thinking_model', got: {model}"
)
grounding_model, thinking_model = model.split("+", 1)
pre_output_items = []
# Step 0: Store last computer call image, if none then take a screenshot
last_image_b64 = get_last_computer_call_image(messages)
if last_image_b64 is None:
# Take a screenshot
screenshot_b64 = await computer_handler.screenshot() # type: ignore
if screenshot_b64:
call_id = uuid.uuid4().hex
pre_output_items += [
{
"type": "message",
"role": "assistant",
"content": [
{
"type": "output_text",
"text": "Taking a screenshot to see the current computer screen.",
}
],
},
{
"action": {"type": "screenshot"},
"call_id": call_id,
"status": "completed",
"type": "computer_call",
},
{
"type": "computer_call_output",
"call_id": call_id,
"output": {
"type": "input_image",
"image_url": f"data:image/png;base64,{screenshot_b64}",
},
},
]
last_image_b64 = screenshot_b64
# Call screenshot callback if provided
if _on_screenshot:
await _on_screenshot(screenshot_b64)
tool_schemas = _prepare_tools_for_grounded(tools) # type: ignore
# Step 1: Convert computer calls from xy to descriptions
input_messages = messages + pre_output_items
messages_with_descriptions = convert_computer_calls_xy2desc(input_messages, self.desc2xy)
# Step 2: Convert responses items to completion messages
completion_messages = convert_responses_items_to_completion_messages(
messages_with_descriptions, allow_images_in_tool_results=False
)
# Step 3: Call thinking model with litellm.acompletion
api_kwargs = {
"model": thinking_model,
"messages": completion_messages,
"tools": tool_schemas,
"max_retries": max_retries,
"stream": stream,
**kwargs,
}
if use_prompt_caching:
api_kwargs["use_prompt_caching"] = use_prompt_caching
# Call API start hook
if _on_api_start:
await _on_api_start(api_kwargs)
# Make the completion call
response = await litellm.acompletion(**api_kwargs)
# Call API end hook
if _on_api_end:
await _on_api_end(api_kwargs, response)
# Extract usage information
usage = {
**response.usage.model_dump(), # type: ignore
"response_cost": response._hidden_params.get("response_cost", 0.0),
}
if _on_usage:
await _on_usage(usage)
# Step 4: Convert completion messages back to responses items format
response_dict = response.model_dump() # type: ignore
choice_messages = [choice["message"] for choice in response_dict["choices"]]
thinking_output_items = []
for choice_message in choice_messages:
thinking_output_items.extend(
convert_completion_messages_to_responses_items([choice_message])
)
# Step 5: Get all element descriptions and populate desc2xy mapping
element_descriptions = get_all_element_descriptions(thinking_output_items)
if element_descriptions and last_image_b64:
# Use grounding model to predict coordinates for each description
grounding_agent_conf = find_agent_config(grounding_model)
if grounding_agent_conf:
grounding_agent = grounding_agent_conf.agent_class()
for desc in element_descriptions:
for _ in range(3): # try 3 times
coords = await grounding_agent.predict_click(
model=grounding_model, image_b64=last_image_b64, instruction=desc
)
if coords:
self.desc2xy[desc] = coords
break
# Step 6: Convert computer calls from descriptions back to xy coordinates
final_output_items = convert_computer_calls_desc2xy(thinking_output_items, self.desc2xy)
# Step 7: Return output and usage
return {"output": pre_output_items + final_output_items, "usage": usage}
async def predict_click(
self, model: str, image_b64: str, instruction: str, **kwargs
) -> Optional[Tuple[int, int]]:
"""
Predict click coordinates using the grounding model.
For composed models, uses only the grounding model part for click prediction.
"""
# Parse the composed model to get grounding model
if "+" not in model:
raise ValueError(
f"Composed model must be in format 'grounding_model+thinking_model', got: {model}"
)
grounding_model, thinking_model = model.split("+", 1)
# Find and use the grounding agent
grounding_agent_conf = find_agent_config(grounding_model)
if grounding_agent_conf:
grounding_agent = grounding_agent_conf.agent_class()
return await grounding_agent.predict_click(
model=grounding_model, image_b64=image_b64, instruction=instruction, **kwargs
)
return None
def get_capabilities(self) -> List[AgentCapability]:
"""Return the capabilities supported by this agent."""
return ["click", "step"]
```
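For reference, the composed `grounding_model+thinking_model` convention can be exercised directly. The sketch below is a minimal, illustrative use of `ComposedGroundedConfig.predict_click`; the import path and the screenshot argument are assumptions, not part of the module above.
```python
import asyncio

# Hypothetical import path -- adjust to wherever ComposedGroundedConfig is exported.
from agent.agents.composed_grounded import ComposedGroundedConfig


async def locate_button(screenshot_b64: str):
    """Resolve a natural-language target to (x, y) using only the grounding half."""
    config = ComposedGroundedConfig()
    # Composed model string: any model matching r".*\+.*" routes to this config.
    model = "huggingface-local/HelloKKMe/GTA1-7B+gemini/gemini-1.5-pro"
    coords = await config.predict_click(
        model=model,
        image_b64=screenshot_b64,
        instruction="the blue 'Submit' button",
    )
    return coords  # (x, y) on success, or None if the grounding model finds nothing


# Example: asyncio.run(locate_button(my_screenshot_b64))
```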
--------------------------------------------------------------------------------
/.github/workflows/pypi-reusable-publish.yml:
--------------------------------------------------------------------------------
```yaml
name: Reusable Package Publish Workflow
on:
workflow_call:
inputs:
package_name:
description: "Name of the package (e.g. computer, agent)"
required: true
type: string
package_dir:
description: "Directory containing the package relative to workspace root (e.g. libs/python/computer)"
required: true
type: string
version:
description: "Version to publish"
required: true
type: string
is_lume_package:
description: "Whether this package includes the lume binary"
required: false
type: boolean
default: false
base_package_name:
description: "PyPI package name (e.g. cua-agent)"
required: true
type: string
make_latest:
description: "Whether to mark this release as latest (should only be true for lume)"
required: false
type: boolean
default: false
secrets:
PYPI_TOKEN:
required: true
outputs:
version:
description: "The version that was published"
value: ${{ jobs.build-and-publish.outputs.version }}
jobs:
build-and-publish:
runs-on: macos-latest
permissions:
contents: write # This permission is needed for creating releases
outputs:
version: ${{ steps.set-version.outputs.version }}
steps:
- uses: actions/checkout@v4
with:
ref: main
fetch-depth: 0 # Full history for release creation
- name: Ensure latest main branch
run: |
git fetch origin main
git reset --hard origin/main
echo "Current HEAD commit:"
git log -1 --oneline
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.11"
- name: Create root pdm.lock file
run: |
# Create an empty pdm.lock file in the root
touch pdm.lock
- name: Install PDM
uses: pdm-project/setup-pdm@v3
with:
python-version: "3.11"
cache: true
- name: Set version
id: set-version
run: |
echo "VERSION=${{ inputs.version }}" >> $GITHUB_ENV
echo "version=${{ inputs.version }}" >> $GITHUB_OUTPUT
- name: Verify version consistency
run: |
# Install toml parser
pip install toml
# Verify version matches using script (exits with error if mismatch)
python ${GITHUB_WORKSPACE}/.github/scripts/get_pyproject_version.py \
${GITHUB_WORKSPACE}/${{ inputs.package_dir }}/pyproject.toml \
${{ inputs.version }}
- name: Initialize PDM in package directory
run: |
# Make sure we're working with a properly initialized PDM project
cd ${{ inputs.package_dir }}
# Create pdm.lock if it doesn't exist
if [ ! -f "pdm.lock" ]; then
echo "No pdm.lock found, initializing PDM project..."
pdm lock
fi
# Conditional step for lume binary download (only for pylume package)
- name: Download and setup lume binary
if: inputs.is_lume_package
run: |
# Create a temporary directory for extraction
mkdir -p temp_lume
# Download the latest lume release directly
echo "Downloading latest lume version..."
curl -sL "https://github.com/trycua/lume/releases/latest/download/lume.tar.gz" -o temp_lume/lume.tar.gz
# Extract the tar file (ignore ownership and suppress warnings)
cd temp_lume && tar --no-same-owner -xzf lume.tar.gz
# Make the binary executable
chmod +x lume
# Copy the lume binary to the correct location in the pylume package
mkdir -p "${GITHUB_WORKSPACE}/${{ inputs.package_dir }}/pylume"
cp lume "${GITHUB_WORKSPACE}/${{ inputs.package_dir }}/pylume/lume"
# Verify the binary exists and is executable
test -x "${GITHUB_WORKSPACE}/${{ inputs.package_dir }}/pylume/lume" || { echo "lume binary not found or not executable"; exit 1; }
# Get the version from the downloaded binary for reference
LUME_VERSION=$(./lume --version | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' || echo "unknown")
echo "Using lume version: $LUME_VERSION"
# Cleanup
cd "${GITHUB_WORKSPACE}" && rm -rf temp_lume
# Save the lume version for reference
echo "LUME_VERSION=${LUME_VERSION}" >> $GITHUB_ENV
- name: Build and publish
env:
PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }}
run: |
cd ${{ inputs.package_dir }}
# Build with PDM
pdm build
# For pylume package, verify the binary is in the wheel
if [ "${{ inputs.is_lume_package }}" = "true" ]; then
python -m pip install wheel
wheel unpack dist/*.whl --dest temp_wheel
echo "Listing contents of wheel directory:"
find temp_wheel -type f
test -f temp_wheel/pylume-*/pylume/lume || { echo "lume binary not found in wheel"; exit 1; }
rm -rf temp_wheel
echo "Publishing ${{ inputs.base_package_name }} ${VERSION} with lume ${LUME_VERSION}"
else
echo "Publishing ${{ inputs.base_package_name }} ${VERSION}"
fi
# Install and use twine directly instead of PDM publish
echo "Installing twine for direct publishing..."
pip install twine
echo "Publishing to PyPI using twine..."
TWINE_USERNAME="__token__" TWINE_PASSWORD="$PYPI_TOKEN" python -m twine upload dist/*
# Save the wheel file path for the release
WHEEL_FILE=$(ls dist/*.whl | head -1)
echo "WHEEL_FILE=${WHEEL_FILE}" >> $GITHUB_ENV
- name: Prepare Simple Release Notes
if: startsWith(github.ref, 'refs/tags/')
run: |
# Create release notes based on package type
echo "# ${{ inputs.base_package_name }} v${VERSION}" > release_notes.md
echo "" >> release_notes.md
if [ "${{ inputs.package_name }}" = "pylume" ]; then
echo "## Python SDK for lume - run macOS and Linux VMs on Apple Silicon" >> release_notes.md
echo "" >> release_notes.md
echo "This package provides Python bindings for the lume virtualization tool." >> release_notes.md
echo "" >> release_notes.md
echo "## Dependencies" >> release_notes.md
echo "* lume binary: v${LUME_VERSION}" >> release_notes.md
elif [ "${{ inputs.package_name }}" = "computer" ]; then
echo "## Computer control library for the Computer Universal Automation (CUA) project" >> release_notes.md
echo "" >> release_notes.md
echo "## Dependencies" >> release_notes.md
echo "* pylume: ${PYLUME_VERSION:-latest}" >> release_notes.md
elif [ "${{ inputs.package_name }}" = "agent" ]; then
echo "## Dependencies" >> release_notes.md
echo "* cua-computer: ${COMPUTER_VERSION:-latest}" >> release_notes.md
echo "* cua-som: ${SOM_VERSION:-latest}" >> release_notes.md
echo "" >> release_notes.md
echo "## Installation Options" >> release_notes.md
echo "" >> release_notes.md
echo "### Basic installation with Anthropic" >> release_notes.md
echo '```bash' >> release_notes.md
echo "pip install cua-agent[anthropic]==${VERSION}" >> release_notes.md
echo '```' >> release_notes.md
echo "" >> release_notes.md
echo "### With SOM (recommended)" >> release_notes.md
echo '```bash' >> release_notes.md
echo "pip install cua-agent[som]==${VERSION}" >> release_notes.md
echo '```' >> release_notes.md
echo "" >> release_notes.md
echo "### All features" >> release_notes.md
echo '```bash' >> release_notes.md
echo "pip install cua-agent[all]==${VERSION}" >> release_notes.md
echo '```' >> release_notes.md
elif [ "${{ inputs.package_name }}" = "som" ]; then
echo "## Computer Vision and OCR library for detecting and analyzing UI elements" >> release_notes.md
echo "" >> release_notes.md
echo "This package provides enhanced UI understanding capabilities through computer vision and OCR." >> release_notes.md
elif [ "${{ inputs.package_name }}" = "computer-server" ]; then
echo "## Computer Server for the Computer Universal Automation (CUA) project" >> release_notes.md
echo "" >> release_notes.md
echo "A FastAPI-based server implementation for computer control." >> release_notes.md
echo "" >> release_notes.md
echo "## Dependencies" >> release_notes.md
echo "* cua-computer: ${COMPUTER_VERSION:-latest}" >> release_notes.md
echo "" >> release_notes.md
echo "## Usage" >> release_notes.md
echo '```bash' >> release_notes.md
echo "# Run the server" >> release_notes.md
echo "cua-computer-server" >> release_notes.md
echo '```' >> release_notes.md
elif [ "${{ inputs.package_name }}" = "mcp-server" ]; then
echo "## MCP Server for the Computer-Use Agent (CUA)" >> release_notes.md
echo "" >> release_notes.md
echo "This package provides MCP (Model Context Protocol) integration for CUA agents, allowing them to be used with Claude Desktop, Cursor, and other MCP clients." >> release_notes.md
echo "" >> release_notes.md
echo "## Dependencies" >> release_notes.md
echo "* cua-computer: ${COMPUTER_VERSION:-latest}" >> release_notes.md
echo "* cua-agent: ${AGENT_VERSION:-latest}" >> release_notes.md
echo "" >> release_notes.md
echo "## Usage" >> release_notes.md
echo '```bash' >> release_notes.md
echo "# Run the MCP server directly" >> release_notes.md
echo "cua-mcp-server" >> release_notes.md
echo '```' >> release_notes.md
echo "" >> release_notes.md
echo "## Claude Desktop Integration" >> release_notes.md
echo "Add to your Claude Desktop configuration (~/.config/claude-desktop/claude_desktop_config.json or OS-specific location):" >> release_notes.md
echo '```json' >> release_notes.md
echo '"mcpServers": {' >> release_notes.md
echo ' "cua-agent": {' >> release_notes.md
echo ' "command": "cua-mcp-server",' >> release_notes.md
echo ' "args": [],' >> release_notes.md
echo ' "env": {' >> release_notes.md
echo ' "CUA_AGENT_LOOP": "OMNI",' >> release_notes.md
echo ' "CUA_MODEL_PROVIDER": "ANTHROPIC",' >> release_notes.md
echo ' "CUA_MODEL_NAME": "claude-3-opus-20240229",' >> release_notes.md
echo ' "ANTHROPIC_API_KEY": "your-api-key",' >> release_notes.md
echo ' "PYTHONIOENCODING": "utf-8"' >> release_notes.md
echo ' }' >> release_notes.md
echo ' }' >> release_notes.md
echo '}' >> release_notes.md
echo '```' >> release_notes.md
fi
# Add installation section if not agent (which has its own installation section)
if [ "${{ inputs.package_name }}" != "agent" ]; then
echo "" >> release_notes.md
echo "## Installation" >> release_notes.md
echo '```bash' >> release_notes.md
echo "pip install ${{ inputs.base_package_name }}==${VERSION}" >> release_notes.md
echo '```' >> release_notes.md
fi
echo "Release notes created:"
cat release_notes.md
- name: Create GitHub Release
uses: softprops/action-gh-release@v2
if: startsWith(github.ref, 'refs/tags/')
with:
name: "${{ inputs.base_package_name }} v${{ env.VERSION }}"
body_path: release_notes.md
files: ${{ inputs.package_dir }}/${{ env.WHEEL_FILE }}
draft: false
prerelease: false
make_latest: ${{ inputs.make_latest }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
```
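A caller workflow invokes this reusable workflow through `jobs.<job_id>.uses`, passing the inputs defined above. The snippet below is an illustrative sketch only; the trigger, package values, and version are placeholders rather than an existing workflow in this repository.
```yaml
name: Publish cua-computer (example caller)
on:
  workflow_dispatch:

jobs:
  publish:
    uses: ./.github/workflows/pypi-reusable-publish.yml
    with:
      package_name: "computer"
      package_dir: "libs/python/computer"
      version: "0.4.7"
      is_lume_package: false
      base_package_name: "cua-computer"
    secrets:
      PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }}
```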
--------------------------------------------------------------------------------
/libs/python/agent/agent/adapters/human_adapter.py:
--------------------------------------------------------------------------------
```python
import asyncio
import os
from typing import Any, AsyncIterator, Dict, Iterator, List
import requests
from litellm import acompletion, completion
from litellm.llms.custom_llm import CustomLLM
from litellm.types.utils import GenericStreamingChunk, ModelResponse
class HumanAdapter(CustomLLM):
"""Human Adapter for human-in-the-loop completions.
This adapter sends completion requests to a human completion server
where humans can review and respond to AI requests.
"""
def __init__(self, base_url: str | None = None, timeout: float = 300.0, **kwargs):
"""Initialize the human adapter.
Args:
base_url: Base URL for the human completion server.
Defaults to HUMAN_BASE_URL environment variable or http://localhost:8002
timeout: Timeout in seconds for waiting for human response
**kwargs: Additional arguments
"""
super().__init__()
self.base_url = base_url or os.getenv("HUMAN_BASE_URL", "http://localhost:8002")
self.timeout = timeout
# Ensure base_url doesn't end with slash
self.base_url = self.base_url.rstrip("/")
def _queue_completion(self, messages: List[Dict[str, Any]], model: str) -> str:
"""Queue a completion request and return the call ID.
Args:
messages: Messages in OpenAI format
model: Model name
Returns:
Call ID for tracking the request
Raises:
Exception: If queueing fails
"""
try:
response = requests.post(
f"{self.base_url}/queue", json={"messages": messages, "model": model}, timeout=10
)
response.raise_for_status()
return response.json()["id"]
except requests.RequestException as e:
raise Exception(f"Failed to queue completion request: {e}")
def _wait_for_completion(self, call_id: str) -> Dict[str, Any]:
"""Wait for human to complete the call.
Args:
call_id: ID of the queued completion call
Returns:
Dict containing response and/or tool_calls
Raises:
TimeoutError: If timeout is exceeded
Exception: If completion fails
"""
import time
start_time = time.time()
while True:
try:
# Check status
status_response = requests.get(f"{self.base_url}/status/{call_id}")
status_response.raise_for_status()
status_data = status_response.json()
if status_data["status"] == "completed":
result = {}
if "response" in status_data and status_data["response"]:
result["response"] = status_data["response"]
if "tool_calls" in status_data and status_data["tool_calls"]:
result["tool_calls"] = status_data["tool_calls"]
return result
elif status_data["status"] == "failed":
error_msg = status_data.get("error", "Unknown error")
raise Exception(f"Completion failed: {error_msg}")
# Check timeout
if time.time() - start_time > self.timeout:
raise TimeoutError(
f"Timeout waiting for human response after {self.timeout} seconds"
)
# Wait before checking again
time.sleep(1.0)
except requests.RequestException as e:
if time.time() - start_time > self.timeout:
raise TimeoutError(f"Timeout waiting for human response: {e}")
# Continue trying if we haven't timed out
time.sleep(1.0)
async def _async_wait_for_completion(self, call_id: str) -> Dict[str, Any]:
"""Async version of wait_for_completion.
Args:
call_id: ID of the queued completion call
Returns:
Dict containing response and/or tool_calls
Raises:
TimeoutError: If timeout is exceeded
Exception: If completion fails
"""
import time
import aiohttp
start_time = time.time()
async with aiohttp.ClientSession() as session:
while True:
try:
# Check status
async with session.get(f"{self.base_url}/status/{call_id}") as response:
response.raise_for_status()
status_data = await response.json()
if status_data["status"] == "completed":
result = {}
if "response" in status_data and status_data["response"]:
result["response"] = status_data["response"]
if "tool_calls" in status_data and status_data["tool_calls"]:
result["tool_calls"] = status_data["tool_calls"]
return result
elif status_data["status"] == "failed":
error_msg = status_data.get("error", "Unknown error")
raise Exception(f"Completion failed: {error_msg}")
# Check timeout
if time.time() - start_time > self.timeout:
raise TimeoutError(
f"Timeout waiting for human response after {self.timeout} seconds"
)
# Wait before checking again
await asyncio.sleep(1.0)
except Exception as e:
if time.time() - start_time > self.timeout:
raise TimeoutError(f"Timeout waiting for human response: {e}")
# Continue trying if we haven't timed out
await asyncio.sleep(1.0)
def _generate_response(self, messages: List[Dict[str, Any]], model: str) -> Dict[str, Any]:
"""Generate a human response for the given messages.
Args:
messages: Messages in OpenAI format
model: Model name
Returns:
Dict containing response and/or tool_calls
"""
# Queue the completion request
call_id = self._queue_completion(messages, model)
# Wait for human response
response = self._wait_for_completion(call_id)
return response
async def _async_generate_response(
self, messages: List[Dict[str, Any]], model: str
) -> Dict[str, Any]:
"""Async version of _generate_response.
Args:
messages: Messages in OpenAI format
model: Model name
Returns:
Dict containing response and/or tool_calls
"""
# Queue the completion request (sync operation)
call_id = self._queue_completion(messages, model)
# Wait for human response (async)
response = await self._async_wait_for_completion(call_id)
return response
def completion(self, *args, **kwargs) -> ModelResponse:
"""Synchronous completion method.
Returns:
ModelResponse with human-generated text or tool calls
"""
messages = kwargs.get("messages", [])
model = kwargs.get("model", "human")
# Generate human response
human_response_data = self._generate_response(messages, model)
# Create ModelResponse with proper structure
import time
import uuid
from litellm.types.utils import Choices, Message, ModelResponse
# Create message content based on response type
if "tool_calls" in human_response_data and human_response_data["tool_calls"]:
# Tool calls response
message = Message(
role="assistant",
content=human_response_data.get("response", ""),
tool_calls=human_response_data["tool_calls"],
)
else:
# Text response
message = Message(role="assistant", content=human_response_data.get("response", ""))
choice = Choices(finish_reason="stop", index=0, message=message)
result = ModelResponse(
id=f"human-{uuid.uuid4()}",
choices=[choice],
created=int(time.time()),
model=f"human/{model}",
object="chat.completion",
)
return result
async def acompletion(self, *args, **kwargs) -> ModelResponse:
"""Asynchronous completion method.
Returns:
ModelResponse with human-generated text or tool calls
"""
messages = kwargs.get("messages", [])
model = kwargs.get("model", "human")
# Generate human response
human_response_data = await self._async_generate_response(messages, model)
# Create ModelResponse with proper structure
import time
import uuid
from litellm.types.utils import Choices, Message, ModelResponse
# Create message content based on response type
if "tool_calls" in human_response_data and human_response_data["tool_calls"]:
# Tool calls response
message = Message(
role="assistant",
content=human_response_data.get("response", ""),
tool_calls=human_response_data["tool_calls"],
)
else:
# Text response
message = Message(role="assistant", content=human_response_data.get("response", ""))
choice = Choices(finish_reason="stop", index=0, message=message)
result = ModelResponse(
id=f"human-{uuid.uuid4()}",
choices=[choice],
created=int(time.time()),
model=f"human/{model}",
object="chat.completion",
)
return result
def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]:
"""Synchronous streaming method.
Yields:
Streaming chunks with human-generated text or tool calls
"""
messages = kwargs.get("messages", [])
model = kwargs.get("model", "human")
# Generate human response
human_response_data = self._generate_response(messages, model)
import time
# Handle tool calls vs text response
if "tool_calls" in human_response_data and human_response_data["tool_calls"]:
# Stream tool calls as a single chunk
generic_chunk: GenericStreamingChunk = {
"finish_reason": "tool_calls",
"index": 0,
"is_finished": True,
"text": human_response_data.get("response", ""),
"tool_use": human_response_data["tool_calls"],
"usage": {"completion_tokens": 1, "prompt_tokens": 0, "total_tokens": 1},
}
yield generic_chunk
else:
# Stream text response
response_text = human_response_data.get("response", "")
generic_chunk: GenericStreamingChunk = {
"finish_reason": "stop",
"index": 0,
"is_finished": True,
"text": response_text,
"tool_use": None,
"usage": {
"completion_tokens": len(response_text.split()),
"prompt_tokens": 0,
"total_tokens": len(response_text.split()),
},
}
yield generic_chunk
async def astreaming(self, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]:
"""Asynchronous streaming method.
Yields:
Streaming chunks with human-generated text or tool calls
"""
messages = kwargs.get("messages", [])
model = kwargs.get("model", "human")
# Generate human response
human_response_data = await self._async_generate_response(messages, model)
# Handle tool calls vs text response (mirrors the synchronous streaming path)
if "tool_calls" in human_response_data and human_response_data["tool_calls"]:
# Stream tool calls as a single chunk
generic_streaming_chunk: GenericStreamingChunk = {
"finish_reason": "tool_calls",
"index": 0,
"is_finished": True,
"text": human_response_data.get("response", ""),
"tool_use": human_response_data["tool_calls"],
"usage": {"completion_tokens": 1, "prompt_tokens": 0, "total_tokens": 1},
}
yield generic_streaming_chunk
else:
# Stream text response as a single chunk
response_text = human_response_data.get("response", "")
generic_streaming_chunk: GenericStreamingChunk = {
"finish_reason": "stop",
"index": 0,
"is_finished": True,
"text": response_text,
"tool_use": None,
"usage": {
"completion_tokens": len(response_text.split()),
"prompt_tokens": 0,
"total_tokens": len(response_text.split()),
},
}
yield generic_streaming_chunk
```
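To route requests to this adapter through litellm, it has to be registered as a custom provider. Below is a minimal sketch, assuming litellm's `custom_provider_map` registration and a human completion server already running at the default `http://localhost:8002`; the provider name `human` and the model suffix are illustrative.
```python
import litellm

from agent.adapters.human_adapter import HumanAdapter

# Register the adapter so "human/..." model strings resolve to it.
# The provider name "human" is an assumption made for this example.
litellm.custom_provider_map = [
    {"provider": "human", "custom_handler": HumanAdapter()}
]

response = litellm.completion(
    model="human/manual-review",
    messages=[{"role": "user", "content": "Please approve or reject this plan."}],
)
print(response.choices[0].message.content)
```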
--------------------------------------------------------------------------------
/examples/tracing_examples.py:
--------------------------------------------------------------------------------
```python
"""
Examples demonstrating the Computer.tracing API for recording sessions.
This module shows various use cases for the new Computer.tracing functionality,
including training data collection, debugging, and compliance recording.
"""
import asyncio
import logging
from pathlib import Path
from agent import ComputerAgent
from computer import Computer
async def basic_tracing_example():
"""
Basic example showing how to use Computer.tracing for recording a simple session.
"""
print("=== Basic Tracing Example ===")
# Initialize computer
computer = Computer(os_type="macos", provider_type="lume")
await computer.run()
try:
# Start tracing with basic configuration
await computer.tracing.start(
{"screenshots": True, "api_calls": True, "metadata": True, "name": "basic_session"}
)
print("Tracing started...")
# Perform some computer operations
await computer.interface.move_cursor(100, 100)
await computer.interface.left_click()
await computer.interface.type_text("Hello, tracing!")
await computer.interface.press_key("enter")
# Add custom metadata
await computer.tracing.add_metadata("session_type", "basic_demo")
await computer.tracing.add_metadata("user_notes", "Testing basic functionality")
# Stop tracing and save
trace_path = await computer.tracing.stop({"format": "zip"})
print(f"Trace saved to: {trace_path}")
finally:
await computer.stop()
async def agent_tracing_example():
"""
Example showing how to use tracing with ComputerAgent for enhanced session recording.
"""
print("=== Agent with Tracing Example ===")
# Initialize computer and agent
computer = Computer(os_type="macos", provider_type="lume")
await computer.run()
try:
# Start comprehensive tracing
await computer.tracing.start(
{
"screenshots": True,
"api_calls": True,
"accessibility_tree": True, # Include accessibility data for training
"metadata": True,
"name": "agent_session",
}
)
# Create agent
agent = ComputerAgent(
model="openai/computer-use-preview", tools=[computer], verbosity=logging.INFO
)
# Add metadata about the agent session
await computer.tracing.add_metadata("agent_model", "openai/computer-use-preview")
await computer.tracing.add_metadata("task_type", "web_search")
# Run agent task
async for message in agent.run(
"Open a web browser and search for 'computer use automation'"
):
print(f"Agent: {message}")
# Stop tracing
trace_path = await computer.tracing.stop({"format": "zip"})
print(f"Agent trace saved to: {trace_path}")
finally:
await computer.stop()
async def custom_agent_tracing_example():
"""
Example showing tracing with custom agent implementations.
"""
print("=== Custom Agent Tracing Example ===")
computer = Computer(os_type="macos", provider_type="lume")
await computer.run()
try:
# Start tracing with custom path
trace_dir = Path.cwd() / "custom_traces" / "my_agent_session"
await computer.tracing.start(
{
"screenshots": True,
"api_calls": True,
"accessibility_tree": False,
"metadata": True,
"path": str(trace_dir),
}
)
# Custom agent logic using direct computer calls
await computer.tracing.add_metadata("session_type", "custom_agent")
await computer.tracing.add_metadata("purpose", "RPA_workflow")
# Take initial screenshot
screenshot = await computer.interface.screenshot()
# Simulate RPA workflow
await computer.interface.move_cursor(500, 300)
await computer.interface.left_click()
await computer.interface.type_text("automation workflow test")
# Add workflow checkpoint
await computer.tracing.add_metadata("checkpoint", "text_input_complete")
await computer.interface.hotkey("command", "a") # Select all
await computer.interface.hotkey("command", "c") # Copy
# Stop tracing and save as directory
trace_path = await computer.tracing.stop({"format": "dir"})
print(f"Custom agent trace saved to: {trace_path}")
finally:
await computer.stop()
async def training_data_collection_example():
"""
Example for collecting training data with rich context.
"""
print("=== Training Data Collection Example ===")
computer = Computer(os_type="macos", provider_type="lume")
await computer.run()
try:
# Start tracing optimized for training data
await computer.tracing.start(
{
"screenshots": True, # Essential for visual training
"api_calls": True, # Capture action sequences
"accessibility_tree": True, # Rich semantic context
"metadata": True, # Custom annotations
"name": "training_session",
}
)
# Add training metadata
await computer.tracing.add_metadata("data_type", "training")
await computer.tracing.add_metadata("task_category", "ui_automation")
await computer.tracing.add_metadata("difficulty", "intermediate")
await computer.tracing.add_metadata("annotator", "human_expert")
# Simulate human demonstration
await computer.interface.screenshot() # Baseline screenshot
# Step 1: Navigate to application
await computer.tracing.add_metadata("step", "1_navigate_to_app")
await computer.interface.move_cursor(100, 50)
await computer.interface.left_click()
# Step 2: Input data
await computer.tracing.add_metadata("step", "2_input_data")
await computer.interface.type_text("training example data")
# Step 3: Process
await computer.tracing.add_metadata("step", "3_process")
await computer.interface.press_key("tab")
await computer.interface.press_key("enter")
# Final metadata
await computer.tracing.add_metadata("success", True)
await computer.tracing.add_metadata("completion_time", "45_seconds")
trace_path = await computer.tracing.stop()
print(f"Training data collected: {trace_path}")
finally:
await computer.stop()
async def debugging_session_example():
"""
Example for debugging agent behavior with detailed tracing.
"""
print("=== Debugging Session Example ===")
computer = Computer(os_type="macos", provider_type="lume")
await computer.run()
try:
# Start tracing for debugging
await computer.tracing.start(
{
"screenshots": True,
"api_calls": True,
"accessibility_tree": True,
"metadata": True,
"name": "debug_session",
}
)
# Debug metadata
await computer.tracing.add_metadata("session_type", "debugging")
await computer.tracing.add_metadata("issue", "click_target_detection")
await computer.tracing.add_metadata("expected_behavior", "click_on_button")
try:
# Problematic sequence that needs debugging
await computer.interface.move_cursor(200, 150)
await computer.interface.left_click()
# This might fail - let's trace it
await computer.interface.type_text("debug test")
await computer.tracing.add_metadata("action_result", "successful_typing")
except Exception as e:
# Record the error in tracing
await computer.tracing.add_metadata("error_encountered", str(e))
await computer.tracing.add_metadata("error_type", type(e).__name__)
print(f"Error occurred: {e}")
# Stop tracing
trace_path = await computer.tracing.stop()
print(f"Debug trace saved: {trace_path}")
print("Use this trace to analyze the failure and improve the agent")
finally:
await computer.stop()
async def human_in_the_loop_example():
"""
Example for recording mixed human/agent sessions.
"""
print("=== Human-in-the-Loop Example ===")
computer = Computer(os_type="macos", provider_type="lume")
await computer.run()
try:
# Start tracing for hybrid session
await computer.tracing.start(
{
"screenshots": True,
"api_calls": True,
"metadata": True,
"name": "human_agent_collaboration",
}
)
# Initial agent phase
await computer.tracing.add_metadata("phase", "agent_autonomous")
await computer.tracing.add_metadata("agent_model", "computer-use-preview")
# Agent performs initial task
await computer.interface.move_cursor(300, 200)
await computer.interface.left_click()
await computer.interface.type_text("automated input")
# Transition to human intervention
await computer.tracing.add_metadata("phase", "human_intervention")
await computer.tracing.add_metadata("intervention_reason", "complex_ui_element")
print("Human intervention phase - manual actions will be recorded...")
# At this point, human can take control while tracing continues
# Simulate human input (in practice, this would be actual human interaction)
await computer.interface.move_cursor(500, 400)
await computer.interface.double_click()
await computer.tracing.add_metadata("human_action", "double_click_complex_element")
# Back to agent
await computer.tracing.add_metadata("phase", "agent_completion")
await computer.interface.press_key("enter")
trace_path = await computer.tracing.stop()
print(f"Human-agent collaboration trace saved: {trace_path}")
finally:
await computer.stop()
async def performance_monitoring_example():
"""
Example for performance monitoring and analysis.
"""
print("=== Performance Monitoring Example ===")
computer = Computer(os_type="macos", provider_type="lume")
await computer.run()
try:
# Start tracing for performance analysis
await computer.tracing.start(
{
"screenshots": False, # Skip screenshots for performance
"api_calls": True,
"metadata": True,
"name": "performance_test",
}
)
# Performance test metadata
await computer.tracing.add_metadata("test_type", "performance_benchmark")
await computer.tracing.add_metadata("expected_duration", "< 30 seconds")
import time
start_time = time.time()
# Perform a series of rapid actions
for i in range(10):
await computer.tracing.add_metadata("iteration", i)
await computer.interface.move_cursor(100 + i * 50, 100)
await computer.interface.left_click()
await computer.interface.type_text(f"Test {i}")
await computer.interface.press_key("tab")
end_time = time.time()
# Record performance metrics
await computer.tracing.add_metadata(
"actual_duration", f"{end_time - start_time:.2f} seconds"
)
await computer.tracing.add_metadata(
"actions_per_second", f"{40 / (end_time - start_time):.2f}"
)
trace_path = await computer.tracing.stop()
print(f"Performance trace saved: {trace_path}")
finally:
await computer.stop()
async def main():
"""
Run all tracing examples.
"""
print("Computer.tracing API Examples")
print("=" * 50)
examples = [
basic_tracing_example,
agent_tracing_example,
custom_agent_tracing_example,
training_data_collection_example,
debugging_session_example,
human_in_the_loop_example,
performance_monitoring_example,
]
for example in examples:
try:
await example()
print()
except Exception as e:
print(f"Error in {example.__name__}: {e}")
print()
print("All examples completed!")
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/.github/scripts/tests/test_get_pyproject_version.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive tests for get_pyproject_version.py script using unittest.
This test suite covers:
- Version matching validation
- Error handling for missing versions
- Invalid input handling
- File not found scenarios
- Malformed TOML handling
"""
import sys
import tempfile
import unittest
from io import StringIO
from pathlib import Path
from unittest.mock import patch
# Add parent directory to path to import the module
sys.path.insert(0, str(Path(__file__).parent.parent))
# Import after path is modified
import get_pyproject_version
class TestGetPyprojectVersion(unittest.TestCase):
"""Test suite for get_pyproject_version.py functionality."""
def setUp(self):
"""Reset sys.argv before each test."""
self.original_argv = sys.argv.copy()
def tearDown(self):
"""Restore sys.argv after each test."""
sys.argv = self.original_argv
def create_pyproject_toml(self, version: str) -> Path:
"""Helper to create a temporary pyproject.toml file with a given version."""
temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".toml", delete=False)
temp_file.write(
f"""
[project]
name = "test-project"
version = "{version}"
description = "A test project"
"""
)
temp_file.close()
return Path(temp_file.name)
def create_pyproject_toml_no_version(self) -> Path:
"""Helper to create a pyproject.toml without a version field."""
temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".toml", delete=False)
temp_file.write(
"""
[project]
name = "test-project"
description = "A test project without version"
"""
)
temp_file.close()
return Path(temp_file.name)
def create_pyproject_toml_no_project(self) -> Path:
"""Helper to create a pyproject.toml without a project section."""
temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".toml", delete=False)
temp_file.write(
"""
[tool.poetry]
name = "test-project"
version = "1.0.0"
"""
)
temp_file.close()
return Path(temp_file.name)
def create_malformed_toml(self) -> Path:
"""Helper to create a malformed TOML file."""
temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".toml", delete=False)
temp_file.write(
"""
[project
name = "test-project
version = "1.0.0"
"""
)
temp_file.close()
return Path(temp_file.name)
# Test: Successful version match
def test_matching_versions(self):
"""Test that matching versions result in success."""
pyproject_file = self.create_pyproject_toml("1.2.3")
try:
sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.2.3"]
# Capture stdout
captured_output = StringIO()
with patch("sys.stdout", captured_output):
with self.assertRaises(SystemExit) as cm:
get_pyproject_version.main()
self.assertEqual(cm.exception.code, 0)
self.assertIn("✅ Version consistency check passed: 1.2.3", captured_output.getvalue())
finally:
pyproject_file.unlink()
# Test: Version mismatch
def test_version_mismatch(self):
"""Test that mismatched versions result in failure with appropriate error message."""
pyproject_file = self.create_pyproject_toml("1.2.3")
try:
sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.2.4"]
# Capture stderr
captured_error = StringIO()
with patch("sys.stderr", captured_error):
with self.assertRaises(SystemExit) as cm:
get_pyproject_version.main()
self.assertEqual(cm.exception.code, 1)
error_output = captured_error.getvalue()
self.assertIn("❌ Version mismatch detected!", error_output)
self.assertIn("pyproject.toml version: 1.2.3", error_output)
self.assertIn("Expected version: 1.2.4", error_output)
self.assertIn("Please update pyproject.toml to version 1.2.4", error_output)
finally:
pyproject_file.unlink()
# Test: Missing version in pyproject.toml
def test_missing_version_field(self):
"""Test handling of pyproject.toml without a version field."""
pyproject_file = self.create_pyproject_toml_no_version()
try:
sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.0.0"]
captured_error = StringIO()
with patch("sys.stderr", captured_error):
with self.assertRaises(SystemExit) as cm:
get_pyproject_version.main()
self.assertEqual(cm.exception.code, 1)
self.assertIn("❌ ERROR: No version found in pyproject.toml", captured_error.getvalue())
finally:
pyproject_file.unlink()
# Test: Missing project section
def test_missing_project_section(self):
"""Test handling of pyproject.toml without a project section."""
pyproject_file = self.create_pyproject_toml_no_project()
try:
sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.0.0"]
captured_error = StringIO()
with patch("sys.stderr", captured_error):
with self.assertRaises(SystemExit) as cm:
get_pyproject_version.main()
self.assertEqual(cm.exception.code, 1)
self.assertIn("❌ ERROR: No version found in pyproject.toml", captured_error.getvalue())
finally:
pyproject_file.unlink()
# Test: File not found
def test_file_not_found(self):
"""Test handling of non-existent pyproject.toml file."""
sys.argv = ["get_pyproject_version.py", "/nonexistent/pyproject.toml", "1.0.0"]
with self.assertRaises(SystemExit) as cm:
get_pyproject_version.main()
self.assertEqual(cm.exception.code, 1)
# Test: Malformed TOML
def test_malformed_toml(self):
"""Test handling of malformed TOML file."""
pyproject_file = self.create_malformed_toml()
try:
sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.0.0"]
with self.assertRaises(SystemExit) as cm:
get_pyproject_version.main()
self.assertEqual(cm.exception.code, 1)
finally:
pyproject_file.unlink()
# Test: Incorrect number of arguments - too few
def test_too_few_arguments(self):
"""Test that providing too few arguments results in usage error."""
sys.argv = ["get_pyproject_version.py", "pyproject.toml"]
captured_error = StringIO()
with patch("sys.stderr", captured_error):
with self.assertRaises(SystemExit) as cm:
get_pyproject_version.main()
self.assertEqual(cm.exception.code, 1)
self.assertIn(
"Usage: python get_pyproject_version.py <pyproject_path> <expected_version>",
captured_error.getvalue(),
)
# Test: Incorrect number of arguments - too many
def test_too_many_arguments(self):
"""Test that providing too many arguments results in usage error."""
sys.argv = ["get_pyproject_version.py", "pyproject.toml", "1.0.0", "extra"]
captured_error = StringIO()
with patch("sys.stderr", captured_error):
with self.assertRaises(SystemExit) as cm:
get_pyproject_version.main()
self.assertEqual(cm.exception.code, 1)
self.assertIn(
"Usage: python get_pyproject_version.py <pyproject_path> <expected_version>",
captured_error.getvalue(),
)
# Test: No arguments
def test_no_arguments(self):
"""Test that providing no arguments results in usage error."""
sys.argv = ["get_pyproject_version.py"]
captured_error = StringIO()
with patch("sys.stderr", captured_error):
with self.assertRaises(SystemExit) as cm:
get_pyproject_version.main()
self.assertEqual(cm.exception.code, 1)
self.assertIn(
"Usage: python get_pyproject_version.py <pyproject_path> <expected_version>",
captured_error.getvalue(),
)
# Test: Version with pre-release tags
def test_version_with_prerelease_tags(self):
"""Test matching versions with pre-release tags like alpha, beta, rc."""
pyproject_file = self.create_pyproject_toml("1.2.3-rc.1")
try:
sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.2.3-rc.1"]
captured_output = StringIO()
with patch("sys.stdout", captured_output):
with self.assertRaises(SystemExit) as cm:
get_pyproject_version.main()
self.assertEqual(cm.exception.code, 0)
self.assertIn(
"✅ Version consistency check passed: 1.2.3-rc.1", captured_output.getvalue()
)
finally:
pyproject_file.unlink()
# Test: Version with build metadata
def test_version_with_build_metadata(self):
"""Test matching versions with build metadata."""
pyproject_file = self.create_pyproject_toml("1.2.3+build.123")
try:
sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.2.3+build.123"]
captured_output = StringIO()
with patch("sys.stdout", captured_output):
with self.assertRaises(SystemExit) as cm:
get_pyproject_version.main()
self.assertEqual(cm.exception.code, 0)
self.assertIn(
"✅ Version consistency check passed: 1.2.3+build.123", captured_output.getvalue()
)
finally:
pyproject_file.unlink()
# Test: Various semantic version formats
def test_semantic_version_0_0_1(self):
"""Test semantic version 0.0.1."""
self._test_version_format("0.0.1")
def test_semantic_version_1_0_0(self):
"""Test semantic version 1.0.0."""
self._test_version_format("1.0.0")
def test_semantic_version_10_20_30(self):
"""Test semantic version 10.20.30."""
self._test_version_format("10.20.30")
def test_semantic_version_alpha(self):
"""Test semantic version with alpha tag."""
self._test_version_format("1.2.3-alpha")
def test_semantic_version_beta(self):
"""Test semantic version with beta tag."""
self._test_version_format("1.2.3-beta.1")
def test_semantic_version_rc_with_build(self):
"""Test semantic version with rc and build metadata."""
self._test_version_format("1.2.3-rc.1+build.456")
def _test_version_format(self, version: str):
"""Helper method to test various semantic version formats."""
pyproject_file = self.create_pyproject_toml(version)
try:
sys.argv = ["get_pyproject_version.py", str(pyproject_file), version]
captured_output = StringIO()
with patch("sys.stdout", captured_output):
with self.assertRaises(SystemExit) as cm:
get_pyproject_version.main()
self.assertEqual(cm.exception.code, 0)
self.assertIn(
f"✅ Version consistency check passed: {version}", captured_output.getvalue()
)
finally:
pyproject_file.unlink()
# Test: Empty version string
def test_empty_version_string(self):
"""Test handling of empty version string."""
pyproject_file = self.create_pyproject_toml("")
try:
sys.argv = ["get_pyproject_version.py", str(pyproject_file), "1.0.0"]
captured_error = StringIO()
with patch("sys.stderr", captured_error):
with self.assertRaises(SystemExit) as cm:
get_pyproject_version.main()
self.assertEqual(cm.exception.code, 1)
# Empty string is falsy, so it should trigger error
self.assertIn("❌", captured_error.getvalue())
finally:
pyproject_file.unlink()
class TestSuiteInfo(unittest.TestCase):
"""Test suite metadata."""
def test_suite_info(self):
"""Display test suite information."""
print("\n" + "=" * 70)
print("Test Suite: get_pyproject_version.py")
print("Framework: unittest (Python built-in)")
print("TOML Library: tomllib (Python 3.11+ built-in)")
print("=" * 70)
self.assertTrue(True)
if __name__ == "__main__":
# Run tests with verbose output
unittest.main(verbosity=2)
```
--------------------------------------------------------------------------------
/Development.md:
--------------------------------------------------------------------------------
```markdown
# Getting Started
## Project Structure
The project is organized as a monorepo with these main packages:
- `libs/core/` - Base package with telemetry support
- `libs/computer/` - Computer-use interface (CUI) library
- `libs/agent/` - AI agent library with multi-provider support
- `libs/som/` - Set-of-Mark parser
- `libs/computer-server/` - Server component for VM
- `libs/lume/` - Lume CLI
These packages are part of a uv workspace which manages a shared virtual environment and dependencies.
## Local Development Setup
1. Install Lume CLI:
```bash
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/trycua/cua/main/libs/lume/scripts/install.sh)"
```
2. Clone the repository:
```bash
git clone https://github.com/trycua/cua.git
cd cua
```
3. Create a `.env.local` file in the root directory with your API keys:
```bash
# Required for Anthropic provider
ANTHROPIC_API_KEY=your_anthropic_key_here
# Required for OpenAI provider
OPENAI_API_KEY=your_openai_key_here
```
4. Install Node.js dependencies for Prettier and other scripts:
```bash
# Install pnpm if you don't have it
npm install -g pnpm
# Install all JS/TS dependencies
pnpm install
```
5. Install Python dependencies and workspace packages:
```bash
# First install uv if you don't have it
pip install uv
# Then install all Python dependencies
uv sync
```
6. Open the workspace in VSCode or Cursor:
```bash
# For Cua Python development
code .vscode/py.code-workspace
# For Lume (Swift) development
code .vscode/lume.code-workspace
```
7. Install Pre-commit hooks:
This ensures code formatting and validation run automatically on each commit.
```bash
uv run pre-commit install
```
Using the workspace file is strongly recommended as it:
- Sets up correct Python environments for each package
- Configures proper import paths
- Enables debugging configurations
- Maintains consistent settings across packages
## Lume Development
Refer to the [Lume README](./libs/lume/Development.md) for instructions on how to develop the Lume CLI.
## Python Development
### Setup
Install all of the workspace dependencies with a single command:
```bash
uv sync
```
This installs all dependencies in the virtual environment `.venv`.
Each Cua package is installed in editable mode, which means changes to the source code are immediately reflected in the installed package.
The `.venv` environment is also configured as the default VS Code Python interpreter in `.vscode/settings.json`.
### Running Python Scripts
To run Python scripts in the workspace, use the `uv run` command:
```bash
uv run python examples/agent_examples.py
```
Or activate the virtual environment manually:
```bash
source .venv/bin/activate
python examples/agent_examples.py
```
## Running Examples
The Python workspace includes launch configurations for all packages:
- "Run Computer Examples" - Runs computer examples
- "Run Agent Examples" - Runs agent examples
- "SOM" configurations - Various settings for running SOM
To run examples from VSCode / Cursor:
1. Press F5 or use the Run/Debug view
2. Select the desired configuration
The workspace also includes compound launch configurations:
- "Run Computer Examples + Server" - Runs both the Computer Examples and Server simultaneously
## Code Formatting Standards
The Cua project follows strict code formatting standards to ensure consistency across all packages.
### Python Code Formatting
#### Tools
The project uses the following tools for code formatting and linting:
- **[Black](https://black.readthedocs.io/)**: Code formatter
- **[isort](https://pycqa.github.io/isort/)**: Import sorter
- **[Ruff](https://beta.ruff.rs/docs/)**: Fast linter and formatter
- **[MyPy](https://mypy.readthedocs.io/)**: Static type checker
These tools are automatically installed when you set up the development environment.
#### Configuration
The formatting configuration is defined in the root `pyproject.toml` file:
```toml
[tool.black]
line-length = 100
target-version = ["py311"]
[tool.ruff]
fix = true
line-length = 100
target-version = "py311"
[tool.ruff.lint]
select = ["E", "F", "B", "I"]
ignore = [
"E501", "E402", "I001", "I002", "B007", "B023", "B024", "B027", "B028",
"B904", "B905", "E711", "E712", "E722", "E731", "F401", "F403", "F405",
"F811", "F821", "F841"
]
fix = true
[tool.ruff.format]
docstring-code-format = true
[tool.mypy]
check_untyped_defs = true
disallow_untyped_defs = true
ignore_missing_imports = true
python_version = "3.11"
show_error_codes = true
strict = true
warn_return_any = true
warn_unused_ignores = false
[tool.isort]
profile = "black"
```
#### Key Formatting Rules
- **Line Length**: Maximum of 100 characters
- **Python Version**: Code should be compatible with Python 3.11+
- **Imports**: Automatically sorted (using Ruff's "I" rule)
- **Type Hints**: Required for all function definitions (strict mypy mode)
#### IDE Integration
The repository includes VSCode workspace configurations that enable automatic formatting. When you open the workspace files (as recommended in the setup instructions), the correct formatting settings are automatically applied.
##### Python-specific settings
These are configured in `.vscode/settings.json`:
```json
{
"python.defaultInterpreterPath": "${workspaceFolder}/.venv/bin/python",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit",
"source.fixAll": "explicit"
},
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
},
"python.formatting.provider": "black",
"ruff.configuration": "${workspaceFolder}/pyproject.toml",
"mypy-type-checker.args": ["--config-file", "${workspaceFolder}/pyproject.toml"],
"mypy-type-checker.path": ["${workspaceFolder}"]
}
```
##### **JS/TS-specific settings**
```json
"[javascript][typescript][typescriptreact][javascriptreact]": {
"editor.defaultFormatter": "esbenp.prettier-vscode"
}
```
- Ensures Prettier is used for all JS/TS files for consistent formatting.
##### Recommended VS Code Extensions
- **Black Formatter** – `ms-python.black-formatter`
- **Ruff** – `charliermarsh.ruff`
- **Pylance** – `ms-python.vscode-pylance`
- **isort** – `ms-python.isort`
- **Prettier** – `esbenp.prettier-vscode`
- **Mypy Type Checker** – `ms-python.mypy-type-checker`
> VSCode will automatically suggest installing the recommended extensions when you open the workspace.
#### Manual Formatting
To manually format code:
```bash
# Format all Python files using Black
uv run black .
# Sort imports using isort
uv run isort .
# Run Ruff linter with auto-fix
uv run ruff check .
# Run type checking with MyPy
uv run mypy .
```
#### Pre-commit Validation
Before submitting a pull request, ensure your code passes all formatting checks:
**Option 1: Run all hooks via pre-commit (all in a single command)**
```bash
# Run hooks on staged files (recommended for quick checks)
uv run pre-commit run
```
- Automatically runs Black, Ruff, isort, Mypy, Prettier, and any other configured hooks.
**Option 2: Run individual tools manually**
```bash
# Python checks
uv run black --check .
uv run isort --check .
uv run ruff check .
uv run mypy .
# JavaScript/TypeScript checks
uv run prettier --check "**/*.{ts,tsx,js,jsx,json,md,yaml,yml}"
# TypeScript typecheck
node ./scripts/typescript-typecheck.js
```
### JavaScript / TypeScript Formatting (Prettier)
The project uses **Prettier** to ensure consistent formatting across all JS/TS/JSON/Markdown/YAML files.
#### Installation
All Node.js dependencies are managed via `pnpm`. Make sure you have run:
```bash
# Install pnpm if you don't have it
npm install -g pnpm
# Install project dependencies
pnpm install
```
This installs Prettier and other JS/TS dependencies defined in `package.json`.
#### Usage
- **Check formatting** (without making changes):
```bash
pnpm prettier:check
```
- **Automatically format files**:
```bash
pnpm prettier:format
```
#### Type Checking (TypeScript)
- Run the TypeScript type checker:
```bash
node ./scripts/typescript-typecheck.js
```
#### VSCode Integration
- The workspace config ensures Prettier is used automatically for JS/TS/JSON/Markdown/YAML files.
- Recommended extension: Prettier – Code Formatter
- Ensure `editor.formatOnSave` is enabled in VSCode for automatic formatting.
### Swift Code (Lume)
For Swift code in the `libs/lume` directory:
- Follow the [Swift API Design Guidelines](https://www.swift.org/documentation/api-design-guidelines/)
- Use SwiftFormat for consistent formatting
- Code will be automatically formatted on save when using the lume workspace
## Releasing Packages
Cua uses an automated GitHub Actions workflow to bump package versions.
> **Note:** The main branch is currently not protected. If branch protection is enabled in the future, the github-actions bot must be added to the bypass list for these workflows to commit directly.
### Version Bump Workflow
All packages are managed through a single consolidated workflow: [Bump Version](https://github.com/trycua/cua/actions/workflows/bump-version.yml)
**Supported packages:**
- cua-agent
- cua-computer
- cua-computer-server
- cua-core
- cua-mcp-server
- cua-som
- pylume
**How to use:**
1. Navigate to the [Bump Version workflow](https://github.com/trycua/cua/actions/workflows/bump-version.yml)
2. Click the "Run workflow" button in the GitHub UI
3. Select the **service/package** you want to bump from the first dropdown
4. Select the **bump type** (patch/minor/major) from the second dropdown
5. Click "Run workflow" to start the version bump
6. The workflow will automatically commit changes and push to main
## Releasing a New CLI Version
To release a new version of the CUA CLI, follow these steps:
### 1. Update the Version
1. Update the version in `libs/typescript/cua-cli/package.json`
2. Commit the version change with a message like "Bump version to x.y.z"
3. Push the changes to the main branch
### 2. Trigger the Release Workflow
1. Go to the GitHub Actions tab in the repository
2. Select the "Publish @trycua/cli" workflow
3. Click "Run workflow"
4. Optionally, specify a version (e.g., "1.2.3") or leave empty to use the version from package.json
5. Click "Run workflow"
The workflow will:
- Build single-file executables for all supported platforms
- Publish the package to npm
- Create a GitHub release with the version tag (format: `cua-vX.Y.Z`)
- Attach all platform-specific binaries to the release
### 3. Verify the Release
1. Check the GitHub Releases page to ensure the new version is published
2. Verify the npm package was published to the registry
3. Test installation on different platforms:
```bash
# Test Linux/macOS installation
curl -fsSL https://cua.ai/install.sh | sh
# Test Windows installation (PowerShell)
irm https://cua.ai/install.ps1 | iex
```
### 4. Update Documentation
Update any relevant documentation with the new version number, including:
- Example code in documentation
- Any version-specific instructions
- Compatibility matrices
### 5. Announce the Release
- Create a new GitHub release with release notes
- Update the changelog if maintained separately
- Announce in relevant channels (Slack, Discord, etc.)
---
### Rolling Back a Version Bump
If you need to revert a version bump, follow these steps:
**Step 1: Find the version bump commit**
```bash
# List recent commits
git log --oneline | grep "Bump"
# Example output:
# a1b2c3d Bump cua-core to v0.1.9
```
**Step 2: Revert the commit**
```bash
# Revert the specific commit
git revert <commit-hash>
# Example:
# git revert a1b2c3d
```
**Step 3: Delete the git tag**
```bash
# List tags to find the version tag
git tag -l
# Delete the tag locally (use the correct package-specific format)
git tag -d core-v0.1.9
# Delete the tag remotely
git push origin :refs/tags/core-v0.1.9
```
**Step 4: Push the revert**
```bash
git push origin main
```
**Per-package tag patterns:**
Each package uses its own tag format defined in `.bumpversion.cfg`:
- **cua-core**: `core-v{version}` (e.g., `core-v0.1.9`)
- **cua-computer**: `computer-v{version}` (e.g., `computer-v0.4.7`)
- **cua-agent**: `agent-v{version}` (e.g., `agent-v0.4.35`)
- **cua-som**: `som-v{version}` (e.g., `som-v0.1.3`)
- **pylume**: `pylume-v{version}` (e.g., `pylume-v0.2.1`)
- **cua-computer-server**: `computer-server-v{version}` (e.g., `computer-server-v0.1.27`)
- **cua-mcp-server**: `mcp-server-v{version}` (e.g., `mcp-server-v0.1.14`)
### Local Testing (Advanced)
The Makefile targets are kept for local testing only:
```bash
# Test version bump locally (dry run)
make dry-run-patch-core
# View current versions
make show-versions
```
**Note:** For production releases, always use the GitHub Actions workflows above instead of running Makefile commands directly.
```
--------------------------------------------------------------------------------
/blog/bringing-computer-use-to-the-web.md:
--------------------------------------------------------------------------------
```markdown
# Bringing Computer-Use to the Web
_Published on August 5, 2025 by Morgan Dean_
In two of our earlier posts, we explored building Computer-Use Operators on macOS - first with a [manual implementation](build-your-own-operator-on-macos-1.md) using OpenAI's `computer-use-preview` model, then with our [cua-agent framework](build-your-own-operator-on-macos-2.md) for Python developers. While these tutorials have been incredibly popular, we've received consistent feedback from our community: **"Can we use Cua with JavaScript and TypeScript?"**
Today, we're excited to announce the release of the **`@trycua/computer` Web SDK** - a new library that allows you to control your Cua cloud containers from any JavaScript or TypeScript project. With this library, you can click, type, and grab screenshots from your cloud containers - no extra servers required.
With this new SDK, you can easily develop CUA experiences like the one below, which we will release soon as open source.
<div align="center">
<video src="https://github.com/user-attachments/assets/e213d6c3-73b6-48dd-a7d9-ed761ed74f89" width="600" controls></video>
</div>
Let’s see how it works.
## What You'll Learn
By the end of this tutorial, you'll be able to:
- Set up the `@trycua/computer` npm library in any JavaScript/TypeScript project
- Connect OpenAI's computer-use model to Cua cloud containers from web applications
- Build computer-use agents that work in Node.js, React, Vue, or any web framework
- Handle different types of computer actions (clicking, typing, scrolling) from web code
- Implement the complete computer-use loop in JavaScript/TypeScript
- Integrate AI automation into existing web applications and workflows
**Prerequisites:**
- Node.js 16+ and npm/yarn/pnpm
- Basic JavaScript or TypeScript knowledge
- OpenAI API access (Tier 3+ for computer-use-preview)
- Cua cloud container credits ([get started here](https://cua.ai/pricing))
**Estimated Time:** 45-60 minutes
## Access Requirements
### OpenAI Model Availability
At the time of writing, the **computer-use-preview** model has limited availability:
- Only accessible to OpenAI tier 3+ users
- Additional application process may be required even for eligible users
- Cannot be used in the OpenAI Playground
- Outside of ChatGPT Operator, usage is restricted to the new **Responses API**
Luckily, the `@trycua/computer` library can be used in conjunction with other models, like [Anthropic’s Computer Use](https://docs.anthropic.com/en/docs/agents-and-tools/tool-use/computer-use-tool) or [UI-TARS](https://huggingface.co/ByteDance-Seed/UI-TARS-1.5-7B). You’ll just have to write your own handler to parse the model output for interfacing with the container.
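For example, a handler for an Anthropic-style `computer` tool call might look roughly like the sketch below. The `action`, `coordinate`, and `text` field names are assumptions modeled on Anthropic's computer-use tool input schema; treat this as an illustrative outline rather than a drop-in implementation, and check the provider's docs for the exact shape.
```typescript
import type { Computer } from '@trycua/computer';

// Illustrative only: the input shape is an assumption modeled on
// Anthropic's computer-use tool ("action", "coordinate", "text").
interface AnthropicComputerInput {
  action: string; // e.g. 'left_click', 'type', 'key', 'screenshot'
  coordinate?: [number, number];
  text?: string;
}

export async function executeAnthropicAction(
  computer: Computer,
  input: AnthropicComputerInput
) {
  switch (input.action) {
    case 'left_click':
      if (input.coordinate) {
        await computer.interface.moveCursor(input.coordinate[0], input.coordinate[1]);
      }
      await computer.interface.leftClick();
      break;
    case 'type':
      if (input.text) await computer.interface.typeText(input.text);
      break;
    case 'key':
      if (input.text) await computer.interface.pressKey(input.text);
      break;
    case 'screenshot':
      return await computer.interface.screenshot();
    default:
      console.log(`Unhandled action: ${input.action}`);
  }
}
```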
### Cua Cloud Containers
To follow this guide, you’ll need access to a Cua cloud container.
Getting access is simple: purchase credits from our [pricing page](https://cua.ai/pricing), then create and provision a new container instance from the [dashboard](https://cua.ai/dashboard/containers). With your container running, you'll be ready to leverage the web SDK and bring automation to your JavaScript or TypeScript applications.
## Understanding the Flow
### OpenAI API Overview
Let's start with the basics. In our case, we'll use OpenAI's API to communicate with their computer-use model.
Think of it like this:
1. We send the model a screenshot of our container and tell it what we want it to do
2. The model looks at the screenshot and decides what actions to take
3. It sends back instructions (like "click here" or "type this")
4. We execute those instructions in our container.
### Model Setup
Here's how we set up the computer-use model for web development:
```javascript
const res = await openai.responses.create({
model: 'computer-use-preview',
tools: [
{
type: 'computer_use_preview',
display_width: 1024,
display_height: 768,
environment: 'linux', // we're using a linux container
},
],
input: [
{
role: 'user',
content: [
// what we want the ai to do
{ type: 'input_text', text: 'Open firefox and go to cua.ai' },
// first screenshot of the vm
{
type: 'input_image',
image_url: `data:image/png;base64,${screenshotBase64}`,
detail: 'auto',
},
],
},
],
truncation: 'auto',
});
```
### Understanding the Response
When we send a request, the API sends back a response that looks like this:
```json
"output": [
{
"type": "reasoning", // The AI explains what it's thinking
"id": "rs_67cc...",
"summary": [
{
"type": "summary_text",
"text": "Clicking on the browser address bar."
}
]
},
{
"type": "computer_call", // The actual action to perform
"id": "cu_67cc...",
"call_id": "call_zw3...", // Used to track previous calls
"action": {
"type": "click", // What kind of action (click, type, etc.)
"button": "left", // Which mouse button to use
"x": 156, // Where to click (coordinates)
"y": 50
},
"pending_safety_checks": [], // Any safety warnings to consider
"status": "completed" // Whether the action was successful
}
]
```
Each response contains:
1. **Reasoning**: The AI's explanation of what it's doing
2. **Action**: The specific computer action to perform
3. **Safety Checks**: Any potential risks to review
4. **Status**: Whether everything worked as planned
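In code, you would typically pull the `computer_call` items out of `res.output` and inspect their actions and safety checks before executing anything. A minimal sketch, assuming `res` is the value returned by `openai.responses.create(...)` (the full loop later in this post does the same thing):
```typescript
// Minimal sketch: separate the computer_call items from a Responses API result.
const computerCalls = res.output.filter((item) => item.type === 'computer_call');

for (const call of computerCalls) {
  console.log('Action to perform:', call.action);
  if (call.pending_safety_checks.length > 0) {
    console.log('Safety checks to review:', call.pending_safety_checks);
  }
}
```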
## Implementation Guide
### Provision a Cua Cloud Container
1. Visit [cua.ai](https://cua.ai), sign up, purchase [credits](https://cua.ai/pricing), and create a new container instance from the [dashboard](https://cua.ai/dashboard).
2. Create an API key from the dashboard — be sure to save it in a secure location before continuing.
3. Start the cloud container from the dashboard.
### Environment Setup
1. Install required packages with your preferred package manager:
```bash
npm install --save @trycua/computer # or yarn, pnpm, bun
npm install --save openai # or yarn, pnpm, bun
```
The SDK works with any JavaScript/TypeScript project setup - whether you're using Create React App, Next.js, Vue, Angular, or plain JavaScript.
2. Save your OpenAI API key, Cua API key, and container name to a `.env` file:
```bash
OPENAI_API_KEY=openai-api-key
CUA_API_KEY=cua-api-key
CUA_CONTAINER_NAME=cua-cloud-container-name
```
These environment variables work the same whether you're using vanilla JavaScript, TypeScript, or any web framework.
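If you're running the example as a plain Node.js script, one convenient way to load these values is the `dotenv` package (an assumption for this sketch, not a requirement of the SDK); frameworks like Next.js or Vite typically load `.env` files through their own mechanisms instead:
```typescript
// Minimal sketch: load .env values in a Node.js script (dotenv is optional).
import 'dotenv/config';

const openaiKey = process.env.OPENAI_API_KEY;
const cuaKey = process.env.CUA_API_KEY;
const containerName = process.env.CUA_CONTAINER_NAME;

if (!openaiKey || !cuaKey || !containerName) {
  throw new Error('Missing OPENAI_API_KEY, CUA_API_KEY, or CUA_CONTAINER_NAME');
}
```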
## Building the Agent
### Mapping API Actions to `@trycua/computer` Interface Methods
This helper function handles a `computer_call` action from the OpenAI API — converting the action into an equivalent action from the `@trycua/computer` interface. These actions will execute on the initialized `Computer` instance. For example, `await computer.interface.leftClick()` sends a mouse left click to the current cursor position.
Whether you're using JavaScript or TypeScript, the interface remains the same:
```typescript
import type OpenAI from 'openai';
import type { Computer } from '@trycua/computer';

export async function executeAction(
computer: Computer,
action: OpenAI.Responses.ResponseComputerToolCall['action']
) {
switch (action.type) {
case 'click':
const { x, y, button } = action;
console.log(`Executing click at (${x}, ${y}) with button '${button}'.`);
await computer.interface.moveCursor(x, y);
if (button === 'right') await computer.interface.rightClick();
else await computer.interface.leftClick();
break;
case 'type':
const { text } = action;
console.log(`Typing text: ${text}`);
await computer.interface.typeText(text);
break;
case 'scroll':
const { x: locX, y: locY, scroll_x, scroll_y } = action;
console.log(
`Scrolling at (${locX}, ${locY}) with offsets (scroll_x=${scroll_x}, scroll_y=${scroll_y}).`
);
await computer.interface.moveCursor(locX, locY);
await computer.interface.scroll(scroll_x, scroll_y);
break;
case 'keypress':
const { keys } = action;
for (const key of keys) {
console.log(`Pressing key: ${key}.`);
// Map common key names to CUA equivalents
if (key.toLowerCase() === 'enter') {
await computer.interface.pressKey('return');
} else if (key.toLowerCase() === 'space') {
await computer.interface.pressKey('space');
} else {
await computer.interface.pressKey(key);
}
}
break;
case 'wait':
console.log(`Waiting for 3 seconds.`);
await new Promise((resolve) => setTimeout(resolve, 3 * 1000));
break;
case 'screenshot':
console.log('Taking screenshot.');
// This is handled automatically in the main loop, but we can take an extra one if requested
const screenshot = await computer.interface.screenshot();
return screenshot;
default:
console.log(`Unrecognized action: ${action.type}`);
break;
}
}
```
### Implementing the Computer-Use Loop
This section defines a loop that:
1. Initializes the `Computer` instance (connecting to a Linux cloud container).
2. Captures a screenshot of the current state.
3. Sends the screenshot (with a user prompt) to the OpenAI Responses API using the `computer-use-preview` model.
4. Processes the returned `computer_call` action and executes it using our helper function.
5. Captures an updated screenshot after the action.
6. Sends the updated screenshot back to the model and loops until no more actions are returned.
```typescript
import OpenAI from 'openai';
import { Computer, OSType } from '@trycua/computer';
// `executeAction` is the helper defined in the previous section.

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
// Initialize the Computer Connection
const computer = new Computer({
apiKey: process.env.CUA_API_KEY!,
name: process.env.CUA_CONTAINER_NAME!,
osType: OSType.LINUX,
});
await computer.run();
// Take the initial screenshot
const screenshot = await computer.interface.screenshot();
const screenshotBase64 = screenshot.toString('base64');
// Setup openai config for computer use
const computerUseConfig: OpenAI.Responses.ResponseCreateParamsNonStreaming = {
model: 'computer-use-preview',
tools: [
{
type: 'computer_use_preview',
display_width: 1024,
display_height: 768,
environment: 'linux', // we're using a linux vm
},
],
truncation: 'auto',
};
// Send initial screenshot to the openai computer use model
let res = await openai.responses.create({
...computerUseConfig,
input: [
{
role: 'user',
content: [
// what we want the ai to do
{ type: 'input_text', text: 'open firefox and go to cua.ai' },
// current screenshot of the vm
{
type: 'input_image',
image_url: `data:image/png;base64,${screenshotBase64}`,
detail: 'auto',
},
],
},
],
});
// Loop until there are no more computer use actions.
while (true) {
const computerCalls = res.output.filter((o) => o.type === 'computer_call');
if (computerCalls.length < 1) {
console.log('No more computer calls. Loop complete.');
break;
}
// Get the first call
const call = computerCalls[0];
const action = call.action;
console.log('Received action from OpenAI Responses API:', action);
let ackChecks: OpenAI.Responses.ResponseComputerToolCall.PendingSafetyCheck[] =
[];
if (call.pending_safety_checks.length > 0) {
console.log('Safety checks pending:', call.pending_safety_checks);
// In a real implementation, you would want to get user confirmation here.
ackChecks = call.pending_safety_checks;
}
// Execute the action in the container
await executeAction(computer, action);
// Wait for changes to process within the container (1sec)
await new Promise((resolve) => setTimeout(resolve, 1000));
// Capture new screenshot
const newScreenshot = await computer.interface.screenshot();
const newScreenshotBase64 = newScreenshot.toString('base64');
// Screenshot back as computer_call_output
res = await openai.responses.create({
...computerUseConfig,
previous_response_id: res.id,
input: [
{
type: 'computer_call_output',
call_id: call.call_id,
acknowledged_safety_checks: ackChecks,
output: {
type: 'computer_screenshot',
image_url: `data:image/png;base64,${newScreenshotBase64}`,
},
},
],
});
}
```
You can find the full example on [GitHub](https://github.com/trycua/cua/tree/main/examples/computer-example-ts).
## What's Next?
The `@trycua/computer` Web SDK opens up some interesting possibilities. You could build browser-based testing tools, create interactive demos for your products, or automate repetitive workflows directly from your web apps.
We're working on more examples and better documentation - if you build something cool with this SDK, we'd love to see it. Drop by our [Discord](https://discord.gg/cua-ai) and share what you're working on.
Happy automating on the web!
```
--------------------------------------------------------------------------------
/tests/test_mcp_server_session_management.py:
--------------------------------------------------------------------------------
```python
"""
Tests for MCP Server Session Management functionality.
This module tests the new concurrent session management and resource lifecycle features.
"""
import asyncio
import importlib.util
import sys
import time
import types
from pathlib import Path
import pytest
def _install_stub_module(
name: str, module: types.ModuleType, registry: dict[str, types.ModuleType | None]
) -> None:
registry[name] = sys.modules.get(name)
sys.modules[name] = module
@pytest.fixture
def server_module():
"""Create a server module with stubbed dependencies for testing."""
stubbed_modules: dict[str, types.ModuleType | None] = {}
# Stub MCP Context primitives
mcp_module = types.ModuleType("mcp")
mcp_module.__path__ = [] # mark as package
mcp_server_module = types.ModuleType("mcp.server")
mcp_server_module.__path__ = []
fastmcp_module = types.ModuleType("mcp.server.fastmcp")
class _StubContext:
async def yield_message(self, *args, **kwargs):
return None
async def yield_tool_call(self, *args, **kwargs):
return None
async def yield_tool_output(self, *args, **kwargs):
return None
def report_progress(self, *_args, **_kwargs):
return None
def info(self, *_args, **_kwargs):
return None
def error(self, *_args, **_kwargs):
return None
class _StubImage:
def __init__(self, format: str, data: bytes):
self.format = format
self.data = data
class _StubFastMCP:
def __init__(self, name: str):
self.name = name
self._tools: dict[str, types.FunctionType] = {}
def tool(self, *args, **kwargs):
def decorator(func):
self._tools[func.__name__] = func
return func
return decorator
def run(self):
return None
fastmcp_module.Context = _StubContext
fastmcp_module.FastMCP = _StubFastMCP
fastmcp_module.Image = _StubImage
_install_stub_module("mcp", mcp_module, stubbed_modules)
_install_stub_module("mcp.server", mcp_server_module, stubbed_modules)
_install_stub_module("mcp.server.fastmcp", fastmcp_module, stubbed_modules)
# Stub Computer module
computer_module = types.ModuleType("computer")
class _StubInterface:
async def screenshot(self) -> bytes:
return b"test-screenshot-data"
class _StubComputer:
def __init__(self, *args, **kwargs):
self.interface = _StubInterface()
async def run(self):
return None
computer_module.Computer = _StubComputer
_install_stub_module("computer", computer_module, stubbed_modules)
# Stub agent module
agent_module = types.ModuleType("agent")
class _StubComputerAgent:
def __init__(self, *args, **kwargs):
pass
async def run(self, *_args, **_kwargs):
# Simulate agent execution with streaming
yield {
"output": [
{
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": "Task completed"}],
}
]
}
agent_module.ComputerAgent = _StubComputerAgent
_install_stub_module("agent", agent_module, stubbed_modules)
# Stub session manager module
session_manager_module = types.ModuleType("mcp_server.session_manager")
class _StubSessionInfo:
def __init__(self, session_id: str, computer, created_at: float, last_activity: float):
self.session_id = session_id
self.computer = computer
self.created_at = created_at
self.last_activity = last_activity
self.active_tasks = set()
self.is_shutting_down = False
class _StubSessionManager:
def __init__(self):
self._sessions = {}
self._session_lock = asyncio.Lock()
async def get_session(self, session_id=None):
"""Context manager that returns a session."""
if session_id is None:
session_id = "test-session-123"
async with self._session_lock:
if session_id not in self._sessions:
computer = _StubComputer()
session = _StubSessionInfo(
session_id=session_id,
computer=computer,
created_at=time.time(),
last_activity=time.time(),
)
self._sessions[session_id] = session
return self._sessions[session_id]
async def register_task(self, session_id: str, task_id: str):
pass
async def unregister_task(self, session_id: str, task_id: str):
pass
async def cleanup_session(self, session_id: str):
async with self._session_lock:
self._sessions.pop(session_id, None)
def get_session_stats(self):
return {
"total_sessions": len(self._sessions),
"max_concurrent": 10,
"sessions": {sid: {"active_tasks": 0} for sid in self._sessions},
}
_stub_session_manager = _StubSessionManager()
def get_session_manager():
return _stub_session_manager
async def initialize_session_manager():
return _stub_session_manager
async def shutdown_session_manager():
pass
session_manager_module.get_session_manager = get_session_manager
session_manager_module.initialize_session_manager = initialize_session_manager
session_manager_module.shutdown_session_manager = shutdown_session_manager
_install_stub_module("mcp_server.session_manager", session_manager_module, stubbed_modules)
# Load the actual server module
module_name = "mcp_server_server_under_test"
module_path = Path("libs/python/mcp-server/mcp_server/server.py").resolve()
spec = importlib.util.spec_from_file_location(module_name, module_path)
server_module = importlib.util.module_from_spec(spec)
assert spec and spec.loader
spec.loader.exec_module(server_module)
server_instance = getattr(server_module, "server", None)
if server_instance is not None and hasattr(server_instance, "_tools"):
for name, func in server_instance._tools.items():
setattr(server_module, name, func)
try:
yield server_module
finally:
sys.modules.pop(module_name, None)
for name, original in stubbed_modules.items():
if original is None:
sys.modules.pop(name, None)
else:
sys.modules[name] = original
class FakeContext:
"""Fake context for testing."""
def __init__(self) -> None:
self.events: list[tuple] = []
self.progress_updates: list[float] = []
def info(self, message: str) -> None:
self.events.append(("info", message))
def error(self, message: str) -> None:
self.events.append(("error", message))
def report_progress(self, value: float) -> None:
self.progress_updates.append(value)
async def yield_message(self, *, role: str, content):
timestamp = asyncio.get_running_loop().time()
self.events.append(("message", role, content, timestamp))
async def yield_tool_call(self, *, name: str | None, call_id: str, input):
timestamp = asyncio.get_running_loop().time()
self.events.append(("tool_call", name, call_id, input, timestamp))
async def yield_tool_output(self, *, call_id: str, output, is_error: bool = False):
timestamp = asyncio.get_running_loop().time()
self.events.append(("tool_output", call_id, output, is_error, timestamp))
def test_screenshot_cua_with_session_id(server_module):
"""Test that screenshot_cua works with session management."""
async def _run_test():
ctx = FakeContext()
result = await server_module.screenshot_cua(ctx, session_id="test-session")
assert result.format == "png"
assert result.data == b"test-screenshot-data"
asyncio.run(_run_test())
def test_screenshot_cua_creates_new_session(server_module):
"""Test that screenshot_cua creates a new session when none provided."""
async def _run_test():
ctx = FakeContext()
result = await server_module.screenshot_cua(ctx)
assert result.format == "png"
assert result.data == b"test-screenshot-data"
asyncio.run(_run_test())
def test_run_cua_task_with_session_management(server_module):
"""Test that run_cua_task works with session management."""
async def _run_test():
ctx = FakeContext()
task = "Test task"
session_id = "test-session-456"
text_result, image = await server_module.run_cua_task(ctx, task, session_id)
assert "Task completed" in text_result
assert image.format == "png"
assert image.data == b"test-screenshot-data"
asyncio.run(_run_test())
def test_run_multi_cua_tasks_sequential(server_module):
"""Test that run_multi_cua_tasks works sequentially."""
async def _run_test():
ctx = FakeContext()
tasks = ["Task 1", "Task 2", "Task 3"]
results = await server_module.run_multi_cua_tasks(ctx, tasks, concurrent=False)
assert len(results) == 3
for i, (text, image) in enumerate(results):
assert "Task completed" in text
assert image.format == "png"
asyncio.run(_run_test())
def test_run_multi_cua_tasks_concurrent(server_module):
"""Test that run_multi_cua_tasks works concurrently."""
async def _run_test():
ctx = FakeContext()
tasks = ["Task 1", "Task 2", "Task 3"]
results = await server_module.run_multi_cua_tasks(ctx, tasks, concurrent=True)
assert len(results) == 3
for i, (text, image) in enumerate(results):
assert "Task completed" in text
assert image.format == "png"
asyncio.run(_run_test())
def test_get_session_stats(server_module):
"""Test that get_session_stats returns proper statistics."""
async def _run_test():
ctx = FakeContext()
stats = await server_module.get_session_stats()
assert "total_sessions" in stats
assert "max_concurrent" in stats
assert "sessions" in stats
asyncio.run(_run_test())
def test_cleanup_session(server_module):
"""Test that cleanup_session works properly."""
async def _run_test():
ctx = FakeContext()
session_id = "test-cleanup-session"
result = await server_module.cleanup_session(ctx, session_id)
assert f"Session {session_id} cleanup initiated" in result
asyncio.run(_run_test())
def test_concurrent_sessions_isolation(server_module):
"""Test that concurrent sessions are properly isolated."""
async def _run_test():
ctx = FakeContext()
# Run multiple tasks with different session IDs concurrently
task1 = asyncio.create_task(
server_module.run_cua_task(ctx, "Task for session 1", "session-1")
)
task2 = asyncio.create_task(
server_module.run_cua_task(ctx, "Task for session 2", "session-2")
)
results = await asyncio.gather(task1, task2)
assert len(results) == 2
for text, image in results:
assert "Task completed" in text
assert image.format == "png"
asyncio.run(_run_test())
def test_session_reuse_with_same_id(server_module):
"""Test that sessions are reused when the same session ID is provided."""
async def _run_test():
ctx = FakeContext()
session_id = "reuse-session"
# First call
result1 = await server_module.screenshot_cua(ctx, session_id)
# Second call with same session ID
result2 = await server_module.screenshot_cua(ctx, session_id)
assert result1.format == result2.format
assert result1.data == result2.data
asyncio.run(_run_test())
def test_error_handling_with_session_management(server_module):
"""Test that errors are handled properly with session management."""
async def _run_test():
# Mock an agent that raises an exception
class _FailingAgent:
def __init__(self, *args, **kwargs):
pass
async def run(self, *_args, **_kwargs):
raise RuntimeError("Simulated agent failure")
# Replace the ComputerAgent with our failing one
original_agent = server_module.ComputerAgent
server_module.ComputerAgent = _FailingAgent
try:
ctx = FakeContext()
task = "This will fail"
text_result, image = await server_module.run_cua_task(ctx, task, "error-session")
assert "Error during task execution" in text_result
assert image.format == "png"
finally:
# Restore original agent
server_module.ComputerAgent = original_agent
asyncio.run(_run_test())
```
--------------------------------------------------------------------------------
/libs/python/computer/computer/providers/cloud/provider.py:
--------------------------------------------------------------------------------
```python
"""Cloud VM provider implementation using CUA Public API.
Implements the following public API endpoints:
- GET /v1/vms
- POST /v1/vms/:name/start
- POST /v1/vms/:name/stop
- POST /v1/vms/:name/restart
"""
import logging
from typing import Any, Dict, List, Optional
from ..base import BaseVMProvider, VMProviderType
from ..types import ListVMsResponse, MinimalVM
# Setup logging
logger = logging.getLogger(__name__)
import asyncio
import os
from urllib.parse import urlparse
import aiohttp
DEFAULT_API_BASE = os.getenv("CUA_API_BASE", "https://api.cua.ai")
class CloudProvider(BaseVMProvider):
"""Cloud VM Provider implementation."""
def __init__(
self,
api_key: Optional[str] = None,
verbose: bool = False,
api_base: Optional[str] = None,
**kwargs,
):
"""
Args:
api_key: API key for authentication (defaults to CUA_API_KEY environment variable)
            api_base: Base URL for the CUA API (defaults to the CUA_API_BASE environment variable or https://api.cua.ai)
verbose: Enable verbose logging
"""
# Fall back to environment variable if api_key not provided
if api_key is None:
api_key = os.getenv("CUA_API_KEY")
assert (
api_key
), "api_key required for CloudProvider (provide via parameter or CUA_API_KEY environment variable)"
self.api_key = api_key
self.verbose = verbose
self.api_base = (api_base or DEFAULT_API_BASE).rstrip("/")
# Host caching dictionary: {vm_name: host_string}
self._host_cache: Dict[str, str] = {}
@property
def provider_type(self) -> VMProviderType:
return VMProviderType.CLOUD
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
pass
async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Get VM information by querying the VM status endpoint.
- Build hostname via _get_host_for_vm(name) using cached host or fallback
- Probe https://{hostname}:8443/status with a short timeout
- If JSON contains a "status" field, return it; otherwise infer
- Fallback to DNS resolve check to distinguish unknown vs not_found
"""
hostname = await self._get_host_for_vm(name)
# Try HTTPS probe to the computer-server status endpoint (8443)
try:
timeout = aiohttp.ClientTimeout(total=3)
async with aiohttp.ClientSession(timeout=timeout) as session:
url = f"https://{hostname}:8443/status"
async with session.get(url, allow_redirects=False) as resp:
status_code = resp.status
vm_status: str
vm_os_type: Optional[str] = None
if status_code == 200:
try:
data = await resp.json(content_type=None)
vm_status = str(data.get("status", "ok"))
vm_os_type = str(data.get("os_type"))
except Exception:
vm_status = "unknown"
elif status_code < 500:
vm_status = "unknown"
else:
vm_status = "unknown"
return {
"name": name,
"status": "running" if vm_status == "ok" else vm_status,
"api_url": f"https://{hostname}:8443",
"os_type": vm_os_type,
}
except Exception:
return {"name": name, "status": "not_found", "api_url": f"https://{hostname}:8443"}
async def list_vms(self) -> ListVMsResponse:
url = f"{self.api_base}/v1/vms"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Accept": "application/json",
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as resp:
if resp.status == 200:
try:
data = await resp.json(content_type=None)
except Exception:
text = await resp.text()
logger.error(f"Failed to parse list_vms JSON: {text}")
return []
if isinstance(data, list):
# Enrich with convenience URLs when possible.
enriched: List[Dict[str, Any]] = []
for item in data:
vm = dict(item) if isinstance(item, dict) else {}
name = vm.get("name")
password = vm.get("password")
api_host = vm.get("host") # Read host from API response
if isinstance(name, str) and name:
# Use host from API if available, otherwise fallback to legacy format
if isinstance(api_host, str) and api_host:
host = api_host
# Cache the host for this VM
self._host_cache[name] = host
else:
# Legacy fallback
host = f"{name}.containers.cloud.trycua.com"
# Cache the legacy host
self._host_cache[name] = host
# api_url: always set if missing
if not vm.get("api_url"):
vm["api_url"] = f"https://{host}:8443"
# vnc_url: only when password available
if not vm.get("vnc_url") and isinstance(password, str) and password:
vm["vnc_url"] = (
f"https://{host}/vnc.html?autoconnect=true&password={password}"
)
enriched.append(vm)
return enriched # type: ignore[return-value]
logger.warning("Unexpected response for list_vms; expected list")
return []
elif resp.status == 401:
logger.error("Unauthorized: invalid CUA API key for list_vms")
return []
else:
text = await resp.text()
logger.error(f"list_vms failed: HTTP {resp.status} - {text}")
return []
async def run_vm(
self,
name: str,
image: Optional[str] = None,
run_opts: Optional[Dict[str, Any]] = None,
storage: Optional[str] = None,
) -> Dict[str, Any]:
"""Start a VM via public API. Returns a minimal status."""
url = f"{self.api_base}/v1/vms/{name}/start"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Accept": "application/json",
}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers) as resp:
if resp.status in (200, 201, 202, 204):
return {"name": name, "status": "starting"}
elif resp.status == 404:
return {"name": name, "status": "not_found"}
elif resp.status == 401:
return {"name": name, "status": "unauthorized"}
else:
text = await resp.text()
return {"name": name, "status": "error", "message": text}
async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Stop a VM via public API."""
url = f"{self.api_base}/v1/vms/{name}/stop"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Accept": "application/json",
}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers) as resp:
if resp.status in (200, 202):
# Spec says 202 with {"status":"stopping"}
body_status: Optional[str] = None
try:
data = await resp.json(content_type=None)
body_status = data.get("status") if isinstance(data, dict) else None
except Exception:
body_status = None
return {"name": name, "status": body_status or "stopping"}
elif resp.status == 404:
return {"name": name, "status": "not_found"}
elif resp.status == 401:
return {"name": name, "status": "unauthorized"}
else:
text = await resp.text()
return {"name": name, "status": "error", "message": text}
async def restart_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Restart a VM via public API."""
url = f"{self.api_base}/v1/vms/{name}/restart"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Accept": "application/json",
}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers) as resp:
if resp.status in (200, 202):
# Spec says 202 with {"status":"restarting"}
body_status: Optional[str] = None
try:
data = await resp.json(content_type=None)
body_status = data.get("status") if isinstance(data, dict) else None
except Exception:
body_status = None
return {"name": name, "status": body_status or "restarting"}
elif resp.status == 404:
return {"name": name, "status": "not_found"}
elif resp.status == 401:
return {"name": name, "status": "unauthorized"}
else:
text = await resp.text()
return {"name": name, "status": "error", "message": text}
async def update_vm(
self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None
) -> Dict[str, Any]:
logger.warning("CloudProvider.update_vm is not implemented via public API")
return {
"name": name,
"status": "unchanged",
"message": "update_vm not supported by public API",
}
async def _get_host_for_vm(self, name: str) -> str:
"""
Get the host for a VM, trying multiple approaches:
1. Check cache first
2. Try to refresh cache by calling list_vms
3. Try .sandbox.cua.ai format
4. Fallback to legacy .containers.cloud.trycua.com format
Args:
name: VM name
Returns:
Host string for the VM
"""
# Check cache first
if name in self._host_cache:
return self._host_cache[name]
# Try to refresh cache by calling list_vms
try:
await self.list_vms()
# Check cache again after refresh
if name in self._host_cache:
return self._host_cache[name]
except Exception as e:
logger.warning(f"Failed to refresh VM list for host lookup: {e}")
# Try .sandbox.cua.ai format first
sandbox_host = f"{name}.sandbox.cua.ai"
if await self._test_host_connectivity(sandbox_host):
self._host_cache[name] = sandbox_host
return sandbox_host
# Fallback to legacy format
legacy_host = f"{name}.containers.cloud.trycua.com"
# Cache the legacy host
self._host_cache[name] = legacy_host
return legacy_host
async def _test_host_connectivity(self, hostname: str) -> bool:
"""
Test if a host is reachable by trying to connect to its status endpoint.
Args:
hostname: Host to test
Returns:
True if host is reachable, False otherwise
"""
try:
timeout = aiohttp.ClientTimeout(total=2) # Short timeout for connectivity test
async with aiohttp.ClientSession(timeout=timeout) as session:
url = f"https://{hostname}:8443/status"
async with session.get(url, allow_redirects=False) as resp:
# Any response (even error) means the host is reachable
return True
except Exception:
return False
async def get_ip(
self, name: Optional[str] = None, storage: Optional[str] = None, retry_delay: int = 2
) -> str:
"""
Return the VM's host address, trying to use cached host from API or falling back to legacy format.
Uses the provided 'name' argument (the VM name requested by the caller).
"""
if name is None:
raise ValueError("VM name is required for CloudProvider.get_ip")
return await self._get_host_for_vm(name)
```
--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/utils/wallpaper.py:
--------------------------------------------------------------------------------
```python
"""Set the desktop wallpaper."""
import os
import subprocess
import sys
from pathlib import Path
def get_desktop_environment() -> str:
"""
Returns the name of the current desktop environment.
"""
# From https://stackoverflow.com/a/21213358/2624876
# which takes from:
# http://stackoverflow.com/questions/2035657/what-is-my-current-desktop-environment
# and http://ubuntuforums.org/showthread.php?t=652320
# and http://ubuntuforums.org/showthread.php?t=1139057
if sys.platform in ["win32", "cygwin"]:
return "windows"
elif sys.platform == "darwin":
return "mac"
    else: # Most likely either a POSIX system or something less common
desktop_session = os.environ.get("DESKTOP_SESSION")
if (
desktop_session is not None
        ): # easier to match if we don't have to deal with character cases
desktop_session = desktop_session.lower()
if desktop_session in [
"gnome",
"unity",
"cinnamon",
"mate",
"xfce4",
"lxde",
"fluxbox",
"blackbox",
"openbox",
"icewm",
"jwm",
"afterstep",
"trinity",
"kde",
]:
return desktop_session
## Special cases ##
# Canonical sets $DESKTOP_SESSION to Lubuntu rather than LXDE if using LXDE.
# There is no guarantee that they will not do the same with the other desktop environments.
elif "xfce" in desktop_session or desktop_session.startswith("xubuntu"):
return "xfce4"
elif desktop_session.startswith("ubuntustudio"):
return "kde"
elif desktop_session.startswith("ubuntu"):
return "gnome"
elif desktop_session.startswith("lubuntu"):
return "lxde"
elif desktop_session.startswith("kubuntu"):
return "kde"
elif desktop_session.startswith("razor"): # e.g. razorkwin
return "razor-qt"
elif desktop_session.startswith("wmaker"): # e.g. wmaker-common
return "windowmaker"
gnome_desktop_session_id = os.environ.get("GNOME_DESKTOP_SESSION_ID")
if os.environ.get("KDE_FULL_SESSION") == "true":
return "kde"
elif gnome_desktop_session_id:
if "deprecated" not in gnome_desktop_session_id:
return "gnome2"
# From http://ubuntuforums.org/showthread.php?t=652320
elif is_running("xfce-mcs-manage"):
return "xfce4"
elif is_running("ksmserver"):
return "kde"
return "unknown"
def is_running(process: str) -> bool:
"""Returns whether a process with the given name is (likely) currently running.
Uses a basic text search, and so may have false positives.
"""
# From http://www.bloggerpolis.com/2011/05/how-to-check-if-a-process-is-running-using-python/
# and http://richarddingwall.name/2009/06/18/windows-equivalents-of-ps-and-kill-commands/
try: # Linux/Unix
s = subprocess.Popen(["ps", "axw"], stdout=subprocess.PIPE)
except: # Windows
s = subprocess.Popen(["tasklist", "/v"], stdout=subprocess.PIPE)
assert s.stdout is not None
for x in s.stdout:
# if re.search(process, x):
if process in str(x):
return True
return False
def set_wallpaper(file_loc: str, first_run: bool = True):
"""Sets the wallpaper to the given file location."""
# From https://stackoverflow.com/a/21213504/2624876
# I have not personally tested most of this. -- @1j01
# -----------------------------------------
# Note: There are two common Linux desktop environments where
# I have not been able to set the desktop background from
# command line: KDE, Enlightenment
desktop_env = get_desktop_environment()
if desktop_env in ["gnome", "unity", "cinnamon"]:
# Tested on Ubuntu 22 -- @1j01
uri = Path(file_loc).as_uri()
SCHEMA = "org.gnome.desktop.background"
KEY = "picture-uri"
# Needed for Ubuntu 22 in dark mode
# Might be better to set only one or the other, depending on the current theme
# In the settings it will say "This background selection only applies to the dark style"
# even if it's set for both, arguably referring to the selection that you can make on that page.
# -- @1j01
KEY_DARK = "picture-uri-dark"
try:
from gi.repository import Gio # type: ignore
gsettings = Gio.Settings.new(SCHEMA) # type: ignore
gsettings.set_string(KEY, uri)
gsettings.set_string(KEY_DARK, uri)
except Exception:
# Fallback tested on Ubuntu 22 -- @1j01
args = ["gsettings", "set", SCHEMA, KEY, uri]
subprocess.Popen(args)
args = ["gsettings", "set", SCHEMA, KEY_DARK, uri]
subprocess.Popen(args)
elif desktop_env == "mate":
try: # MATE >= 1.6
# info from http://wiki.mate-desktop.org/docs:gsettings
args = ["gsettings", "set", "org.mate.background", "picture-filename", file_loc]
subprocess.Popen(args)
except Exception: # MATE < 1.6
# From https://bugs.launchpad.net/variety/+bug/1033918
args = [
"mateconftool-2",
"-t",
"string",
"--set",
"/desktop/mate/background/picture_filename",
file_loc,
]
subprocess.Popen(args)
elif desktop_env == "gnome2": # Not tested
# From https://bugs.launchpad.net/variety/+bug/1033918
args = [
"gconftool-2",
"-t",
"string",
"--set",
"/desktop/gnome/background/picture_filename",
file_loc,
]
subprocess.Popen(args)
## KDE4 is difficult
## see http://blog.zx2c4.com/699 for a solution that might work
elif desktop_env in ["kde3", "trinity"]:
# From http://ubuntuforums.org/archive/index.php/t-803417.html
args = ["dcop", "kdesktop", "KBackgroundIface", "setWallpaper", "0", file_loc, "6"]
subprocess.Popen(args)
elif desktop_env == "xfce4":
# Iterate over all wallpaper-related keys and set to file_loc
try:
list_proc = subprocess.run(
["xfconf-query", "-c", "xfce4-desktop", "-l"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
)
keys = []
if list_proc.stdout:
for line in list_proc.stdout.splitlines():
line = line.strip()
if not line:
continue
# Common keys: .../last-image and .../image-path
if "/last-image" in line or "/image-path" in line:
keys.append(line)
# Fallback: known defaults if none were listed
if not keys:
keys = [
"/backdrop/screen0/monitorVNC-0/workspace0/last-image",
"/backdrop/screen0/monitor0/image-path",
]
for key in keys:
subprocess.run(
[
"xfconf-query",
"-c",
"xfce4-desktop",
"-p",
key,
"-s",
file_loc,
],
check=False,
)
except Exception:
pass
# Reload xfdesktop to apply changes
subprocess.Popen(["xfdesktop", "--reload"])
elif desktop_env == "razor-qt": # TODO: implement reload of desktop when possible
if first_run:
import configparser
desktop_conf = configparser.ConfigParser()
# Development version
desktop_conf_file = os.path.join(get_config_dir("razor"), "desktop.conf")
if os.path.isfile(desktop_conf_file):
config_option = R"screens\1\desktops\1\wallpaper"
else:
desktop_conf_file = os.path.join(get_home_dir(), ".razor/desktop.conf")
config_option = R"desktops\1\wallpaper"
desktop_conf.read(os.path.join(desktop_conf_file))
try:
if desktop_conf.has_option("razor", config_option): # only replacing a value
desktop_conf.set("razor", config_option, file_loc)
with open(desktop_conf_file, "w", encoding="utf-8", errors="replace") as f:
desktop_conf.write(f)
except Exception:
pass
else:
# TODO: reload desktop when possible
pass
elif desktop_env in ["fluxbox", "jwm", "openbox", "afterstep"]:
# http://fluxbox-wiki.org/index.php/Howto_set_the_background
# used fbsetbg on jwm too since I am too lazy to edit the XML configuration
# now where fbsetbg does the job excellent anyway.
# and I have not figured out how else it can be set on Openbox and AfterSTep
# but fbsetbg works excellent here too.
try:
args = ["fbsetbg", file_loc]
subprocess.Popen(args)
except Exception:
sys.stderr.write("ERROR: Failed to set wallpaper with fbsetbg!\n")
sys.stderr.write("Please make sre that You have fbsetbg installed.\n")
elif desktop_env == "icewm":
# command found at http://urukrama.wordpress.com/2007/12/05/desktop-backgrounds-in-window-managers/
args = ["icewmbg", file_loc]
subprocess.Popen(args)
elif desktop_env == "blackbox":
# command found at http://blackboxwm.sourceforge.net/BlackboxDocumentation/BlackboxBackground
args = ["bsetbg", "-full", file_loc]
subprocess.Popen(args)
elif desktop_env == "lxde":
args = ["pcmanfm", "--set-wallpaper", file_loc, "--wallpaper-mode=scaled"]
subprocess.Popen(args)
elif desktop_env == "windowmaker":
# From http://www.commandlinefu.com/commands/view/3857/set-wallpaper-on-windowmaker-in-one-line
args = ["wmsetbg", "-s", "-u", file_loc]
subprocess.Popen(args)
# elif desktop_env == "enlightenment": # I have not been able to make it work on e17. On e16 it would have been something in this direction
# args = ["enlightenment_remote", "-desktop-bg-add", "0", "0", "0", "0", file_loc]
# subprocess.Popen(args)
elif desktop_env == "windows":
# From https://stackoverflow.com/questions/1977694/change-desktop-background
# Tested on Windows 10. -- @1j01
import ctypes
SPI_SETDESKWALLPAPER = 20
ctypes.windll.user32.SystemParametersInfoW(SPI_SETDESKWALLPAPER, 0, file_loc, 0) # type: ignore
elif desktop_env == "mac":
# From https://stackoverflow.com/questions/431205/how-can-i-programatically-change-the-background-in-mac-os-x
try:
# Tested on macOS 10.14.6 (Mojave) -- @1j01
assert (
sys.platform == "darwin"
) # ignore `Import "appscript" could not be resolved` for other platforms
from appscript import app, mactypes
app("Finder").desktop_picture.set(mactypes.File(file_loc))
except ImportError:
# Tested on macOS 10.14.6 (Mojave) -- @1j01
# import subprocess
# SCRIPT = f"""/usr/bin/osascript<<END
# tell application "Finder" to set desktop picture to POSIX file "{file_loc}"
# END"""
# subprocess.Popen(SCRIPT, shell=True)
# Safer version, avoiding string interpolation,
# to protect against command injection (both in the shell and in AppleScript):
OSASCRIPT = """
on run (clp)
if clp's length is not 1 then error "Incorrect Parameters"
local file_loc
set file_loc to clp's item 1
tell application "Finder" to set desktop picture to POSIX file file_loc
end run
"""
subprocess.Popen(["osascript", "-e", OSASCRIPT, "--", file_loc])
else:
if first_run: # don't spam the user with the same message over and over again
            sys.stderr.write(
                "Warning: Failed to set wallpaper. Your desktop environment is not supported.\n"
            )
            sys.stderr.write(f"You can try to set your wallpaper manually to {file_loc}\n")
return False
return True
def get_config_dir(app_name: str) -> str:
"""Returns the configuration directory for the given application name."""
if "XDG_CONFIG_HOME" in os.environ:
config_home = os.environ["XDG_CONFIG_HOME"]
elif "APPDATA" in os.environ: # On Windows
config_home = os.environ["APPDATA"]
else:
try:
from xdg import BaseDirectory
config_home = BaseDirectory.xdg_config_home
except ImportError: # Most likely a Linux/Unix system anyway
config_home = os.path.join(get_home_dir(), ".config")
config_dir = os.path.join(config_home, app_name)
return config_dir
def get_home_dir() -> str:
"""Returns the home directory of the current user."""
return os.path.expanduser("~")
```
--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/watchdog.py:
--------------------------------------------------------------------------------
```python
"""
Watchdog module for monitoring the Computer API server health.
Unix/Linux only - provides process management and restart capabilities.
"""
import asyncio
import fcntl
import json
import logging
import os
import platform
import subprocess
import sys
import time
from typing import Optional
import websockets
logger = logging.getLogger(__name__)
def instance_already_running(label="watchdog"):
"""
    Detect if an instance with the label is already running, globally
at the operating system level.
Using `os.open` ensures that the file pointer won't be closed
by Python's garbage collector after the function's scope is exited.
The lock will be released when the program exits, or could be
released if the file pointer were closed.
"""
lock_file_pointer = os.open(f"/tmp/instance_{label}.lock", os.O_WRONLY | os.O_CREAT)
try:
fcntl.lockf(lock_file_pointer, fcntl.LOCK_EX | fcntl.LOCK_NB)
already_running = False
except IOError:
already_running = True
return already_running
class Watchdog:
"""Watchdog class to monitor server health via WebSocket connection.
Unix/Linux only - provides restart capabilities.
"""
def __init__(self, cli_args: Optional[dict] = None, ping_interval: int = 30):
"""
Initialize the watchdog.
Args:
cli_args: Dictionary of CLI arguments to replicate when restarting
ping_interval: Interval between ping checks in seconds
"""
# Check if running on Unix/Linux
if platform.system() not in ["Linux", "Darwin"]:
raise RuntimeError("Watchdog is only supported on Unix/Linux systems")
# Store CLI arguments for restart
self.cli_args = cli_args or {}
self.host = self.cli_args.get("host", "localhost")
self.port = self.cli_args.get("port", 8000)
self.ping_interval = ping_interval
self.container_name = os.environ.get("CONTAINER_NAME")
self.running = False
self.restart_enabled = True
@property
def ws_uri(self) -> str:
"""Get the WebSocket URI using the current IP address.
Returns:
WebSocket URI for the Computer API Server
"""
if not self.container_name:
return "ws://localhost:8000/ws"
# Try .sandbox.cua.ai first, fallback to .containers.cloud.trycua.com
return f"wss://{self.container_name}.sandbox.cua.ai:8443/ws"
@property
def ws_uri_fallback(self) -> str:
"""Get the fallback WebSocket URI using legacy hostname.
Returns:
Fallback WebSocket URI for the Computer API Server
"""
if not self.container_name:
return "ws://localhost:8000/ws"
return f"wss://{self.container_name}.containers.cloud.trycua.com:8443/ws"
async def ping(self) -> bool:
"""
Test connection to the WebSocket endpoint.
Returns:
True if connection successful, False otherwise
"""
# Create a simple ping message
ping_message = {"command": "get_screen_size", "params": {}}
# Try primary URI first (.sandbox.cua.ai)
try:
async with websockets.connect(
self.ws_uri, max_size=1024 * 1024 * 10 # 10MB limit to match server
) as websocket:
# Send ping message
await websocket.send(json.dumps(ping_message))
# Wait for any response or just close
try:
response = await asyncio.wait_for(websocket.recv(), timeout=5)
logger.debug(f"Ping response received from primary URI: {response[:100]}...")
return True
except asyncio.TimeoutError:
return False
except Exception as e:
logger.debug(f"Primary URI ping failed: {e}")
# Try fallback URI (.containers.cloud.trycua.com)
if self.container_name:
try:
async with websockets.connect(
self.ws_uri_fallback,
max_size=1024 * 1024 * 10, # 10MB limit to match server
) as websocket:
# Send ping message
await websocket.send(json.dumps(ping_message))
# Wait for any response or just close
try:
response = await asyncio.wait_for(websocket.recv(), timeout=5)
logger.debug(
f"Ping response received from fallback URI: {response[:100]}..."
)
return True
except asyncio.TimeoutError:
return False
except Exception as fallback_e:
logger.warning(
f"Both primary and fallback ping failed. Primary: {e}, Fallback: {fallback_e}"
)
return False
else:
logger.warning(f"Ping failed: {e}")
return False
def kill_processes_on_port(self, port: int) -> bool:
"""
Kill any processes using the specified port.
Args:
port: Port number to check and kill processes on
Returns:
True if processes were killed or none found, False on error
"""
try:
# Find processes using the port
result = subprocess.run(
["lsof", "-ti", f":{port}"], capture_output=True, text=True, timeout=10
)
if result.returncode == 0 and result.stdout.strip():
pids = result.stdout.strip().split("\n")
logger.info(f"Found {len(pids)} processes using port {port}: {pids}")
# Kill each process
for pid in pids:
if pid.strip():
try:
subprocess.run(["kill", "-9", pid.strip()], timeout=5)
logger.info(f"Killed process {pid}")
except subprocess.TimeoutExpired:
logger.warning(f"Timeout killing process {pid}")
except Exception as e:
logger.warning(f"Error killing process {pid}: {e}")
return True
else:
logger.debug(f"No processes found using port {port}")
return True
except subprocess.TimeoutExpired:
logger.error(f"Timeout finding processes on port {port}")
return False
except Exception as e:
logger.error(f"Error finding processes on port {port}: {e}")
return False
def restart_server(self) -> bool:
"""
Attempt to restart the server by killing existing processes and starting new one.
Returns:
True if restart was attempted, False on error
"""
if not self.restart_enabled:
logger.info("Server restart is disabled")
return False
try:
logger.info("Attempting to restart server...")
# Kill processes on the port
port_to_kill = 8443 if self.container_name else self.port
if not self.kill_processes_on_port(port_to_kill):
logger.error("Failed to kill processes on port, restart aborted")
return False
# Wait a moment for processes to die
time.sleep(2)
# Try to restart the server
# In container mode, we can't easily restart, so just log
if self.container_name:
logger.warning("Container mode detected - cannot restart server automatically")
logger.warning("Container orchestrator should handle restart")
return False
else:
# For local mode, try to restart the CLI
logger.info("Attempting to restart local server...")
# Get the current Python executable and script
python_exe = sys.executable
# Try to find the CLI module
try:
# Build command with all original CLI arguments
cmd = [python_exe, "-m", "computer_server.cli"]
# Add all CLI arguments except watchdog-related ones
for key, value in self.cli_args.items():
if key in ["watchdog", "watchdog_interval", "no_restart"]:
continue # Skip watchdog args to avoid recursive watchdog
# Convert underscores to hyphens for CLI args
arg_name = f"--{key.replace('_', '-')}"
if isinstance(value, bool):
if value: # Only add flag if True
cmd.append(arg_name)
else:
cmd.extend([arg_name, str(value)])
logger.info(f"Starting server with command: {' '.join(cmd)}")
# Start process in background
subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
logger.info("Server restart initiated")
return True
except Exception as e:
logger.error(f"Failed to restart server: {e}")
return False
except Exception as e:
logger.error(f"Error during server restart: {e}")
return False
async def start_monitoring(self) -> None:
"""Start the watchdog monitoring loop."""
self.running = True
logger.info(f"Starting watchdog monitoring for {self.ws_uri}")
logger.info(f"Ping interval: {self.ping_interval} seconds")
if self.container_name:
logger.info(f"Container mode detected: {self.container_name}")
consecutive_failures = 0
max_failures = 3
while self.running:
try:
success = await self.ping()
if success:
if consecutive_failures > 0:
logger.info("Server connection restored")
consecutive_failures = 0
logger.debug("Ping successful")
else:
consecutive_failures += 1
logger.warning(f"Ping failed ({consecutive_failures}/{max_failures})")
if consecutive_failures >= max_failures:
logger.error(
f"Server appears to be down after {max_failures} consecutive failures"
)
# Attempt to restart the server
if self.restart_enabled:
logger.info("Attempting automatic server restart...")
restart_success = self.restart_server()
if restart_success:
logger.info("Server restart initiated, waiting before next ping...")
# Wait longer after restart attempt
await asyncio.sleep(self.ping_interval * 2)
consecutive_failures = 0 # Reset counter after restart attempt
else:
logger.error("Server restart failed")
else:
logger.warning("Automatic restart is disabled")
# Wait for next ping interval
await asyncio.sleep(self.ping_interval)
except asyncio.CancelledError:
logger.info("Watchdog monitoring cancelled")
break
except Exception as e:
logger.error(f"Unexpected error in watchdog loop: {e}")
await asyncio.sleep(self.ping_interval)
def stop_monitoring(self) -> None:
"""Stop the watchdog monitoring."""
self.running = False
logger.info("Stopping watchdog monitoring")
async def run_watchdog(cli_args: Optional[dict] = None, ping_interval: int = 30) -> None:
"""
Run the watchdog monitoring.
Args:
cli_args: Dictionary of CLI arguments to replicate when restarting
ping_interval: Interval between ping checks in seconds
"""
watchdog = Watchdog(cli_args=cli_args, ping_interval=ping_interval)
try:
await watchdog.start_monitoring()
except KeyboardInterrupt:
logger.info("Watchdog stopped by user")
finally:
watchdog.stop_monitoring()
if __name__ == "__main__":
# For testing the watchdog standalone
import argparse
parser = argparse.ArgumentParser(description="Run Computer API server watchdog")
parser.add_argument("--host", default="localhost", help="Server host to monitor")
parser.add_argument("--port", type=int, default=8000, help="Server port to monitor")
parser.add_argument("--ping-interval", type=int, default=30, help="Ping interval in seconds")
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
cli_args = {"host": args.host, "port": args.port}
asyncio.run(run_watchdog(cli_args, args.ping_interval))
```