This is page 12 of 20. Use http://codebase.md/trycua/cua?page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── scripts
│ │ ├── get_pyproject_version.py
│ │ └── tests
│ │ ├── __init__.py
│ │ ├── README.md
│ │ └── test_get_pyproject_version.py
│ └── workflows
│ ├── bump-version.yml
│ ├── ci-lume.yml
│ ├── docker-publish-cua-linux.yml
│ ├── docker-publish-cua-windows.yml
│ ├── docker-publish-kasm.yml
│ ├── docker-publish-xfce.yml
│ ├── docker-reusable-publish.yml
│ ├── link-check.yml
│ ├── lint.yml
│ ├── npm-publish-cli.yml
│ ├── npm-publish-computer.yml
│ ├── npm-publish-core.yml
│ ├── publish-lume.yml
│ ├── pypi-publish-agent.yml
│ ├── pypi-publish-computer-server.yml
│ ├── pypi-publish-computer.yml
│ ├── pypi-publish-core.yml
│ ├── pypi-publish-mcp-server.yml
│ ├── pypi-publish-som.yml
│ ├── pypi-reusable-publish.yml
│ ├── python-tests.yml
│ ├── test-cua-models.yml
│ └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│ ├── docs.code-workspace
│ ├── extensions.json
│ ├── launch.json
│ ├── libs-ts.code-workspace
│ ├── lume.code-workspace
│ ├── lumier.code-workspace
│ ├── py.code-workspace
│ └── settings.json
├── blog
│ ├── app-use.md
│ ├── assets
│ │ ├── composite-agents.png
│ │ ├── docker-ubuntu-support.png
│ │ ├── hack-booth.png
│ │ ├── hack-closing-ceremony.jpg
│ │ ├── hack-cua-ollama-hud.jpeg
│ │ ├── hack-leaderboard.png
│ │ ├── hack-the-north.png
│ │ ├── hack-winners.jpeg
│ │ ├── hack-workshop.jpeg
│ │ ├── hud-agent-evals.png
│ │ └── trajectory-viewer.jpeg
│ ├── bringing-computer-use-to-the-web.md
│ ├── build-your-own-operator-on-macos-1.md
│ ├── build-your-own-operator-on-macos-2.md
│ ├── cloud-windows-ga-macos-preview.md
│ ├── composite-agents.md
│ ├── computer-use-agents-for-growth-hacking.md
│ ├── cua-hackathon.md
│ ├── cua-playground-preview.md
│ ├── cua-vlm-router.md
│ ├── hack-the-north.md
│ ├── hud-agent-evals.md
│ ├── human-in-the-loop.md
│ ├── introducing-cua-cli.md
│ ├── introducing-cua-cloud-containers.md
│ ├── lume-to-containerization.md
│ ├── neurips-2025-cua-papers.md
│ ├── sandboxed-python-execution.md
│ ├── training-computer-use-models-trajectories-1.md
│ ├── trajectory-viewer.md
│ ├── ubuntu-docker-support.md
│ └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│ ├── .env.example
│ ├── .gitignore
│ ├── content
│ │ └── docs
│ │ ├── agent-sdk
│ │ │ ├── agent-loops.mdx
│ │ │ ├── benchmarks
│ │ │ │ ├── index.mdx
│ │ │ │ ├── interactive.mdx
│ │ │ │ ├── introduction.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── osworld-verified.mdx
│ │ │ │ ├── screenspot-pro.mdx
│ │ │ │ └── screenspot-v2.mdx
│ │ │ ├── callbacks
│ │ │ │ ├── agent-lifecycle.mdx
│ │ │ │ ├── cost-saving.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── logging.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── pii-anonymization.mdx
│ │ │ │ └── trajectories.mdx
│ │ │ ├── chat-history.mdx
│ │ │ ├── custom-tools.mdx
│ │ │ ├── customizing-computeragent.mdx
│ │ │ ├── integrations
│ │ │ │ ├── hud.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── observability.mdx
│ │ │ ├── mcp-server
│ │ │ │ ├── client-integrations.mdx
│ │ │ │ ├── configuration.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── llm-integrations.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── tools.mdx
│ │ │ │ └── usage.mdx
│ │ │ ├── message-format.mdx
│ │ │ ├── meta.json
│ │ │ ├── migration-guide.mdx
│ │ │ ├── prompt-caching.mdx
│ │ │ ├── supported-agents
│ │ │ │ ├── composed-agents.mdx
│ │ │ │ ├── computer-use-agents.mdx
│ │ │ │ ├── grounding-models.mdx
│ │ │ │ ├── human-in-the-loop.mdx
│ │ │ │ └── meta.json
│ │ │ ├── supported-model-providers
│ │ │ │ ├── cua-vlm-router.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ └── local-models.mdx
│ │ │ ├── telemetry.mdx
│ │ │ └── usage-tracking.mdx
│ │ ├── cli-playbook
│ │ │ ├── commands.mdx
│ │ │ ├── index.mdx
│ │ │ └── meta.json
│ │ ├── computer-sdk
│ │ │ ├── cloud-vm-management.mdx
│ │ │ ├── commands.mdx
│ │ │ ├── computer-server
│ │ │ │ ├── Commands.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── REST-API.mdx
│ │ │ │ └── WebSocket-API.mdx
│ │ │ ├── computer-ui.mdx
│ │ │ ├── computers.mdx
│ │ │ ├── custom-computer-handlers.mdx
│ │ │ ├── meta.json
│ │ │ ├── sandboxed-python.mdx
│ │ │ └── tracing-api.mdx
│ │ ├── example-usecases
│ │ │ ├── form-filling.mdx
│ │ │ ├── gemini-complex-ui-navigation.mdx
│ │ │ ├── meta.json
│ │ │ ├── post-event-contact-export.mdx
│ │ │ └── windows-app-behind-vpn.mdx
│ │ ├── get-started
│ │ │ ├── meta.json
│ │ │ └── quickstart.mdx
│ │ ├── index.mdx
│ │ ├── macos-vm-cli-playbook
│ │ │ ├── lume
│ │ │ │ ├── cli-reference.mdx
│ │ │ │ ├── faq.md
│ │ │ │ ├── http-api.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── prebuilt-images.mdx
│ │ │ ├── lumier
│ │ │ │ ├── building-lumier.mdx
│ │ │ │ ├── docker-compose.mdx
│ │ │ │ ├── docker.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ └── meta.json
│ │ │ └── meta.json
│ │ └── meta.json
│ ├── next.config.mjs
│ ├── package-lock.json
│ ├── package.json
│ ├── pnpm-lock.yaml
│ ├── postcss.config.mjs
│ ├── public
│ │ └── img
│ │ ├── agent_gradio_ui.png
│ │ ├── agent.png
│ │ ├── bg-dark.jpg
│ │ ├── bg-light.jpg
│ │ ├── cli.png
│ │ ├── computer.png
│ │ ├── grounding-with-gemini3.gif
│ │ ├── hero.png
│ │ ├── laminar_trace_example.png
│ │ ├── som_box_threshold.png
│ │ └── som_iou_threshold.png
│ ├── README.md
│ ├── source.config.ts
│ ├── src
│ │ ├── app
│ │ │ ├── (home)
│ │ │ │ ├── [[...slug]]
│ │ │ │ │ └── page.tsx
│ │ │ │ └── layout.tsx
│ │ │ ├── api
│ │ │ │ ├── posthog
│ │ │ │ │ └── [...path]
│ │ │ │ │ └── route.ts
│ │ │ │ └── search
│ │ │ │ └── route.ts
│ │ │ ├── favicon.ico
│ │ │ ├── global.css
│ │ │ ├── layout.config.tsx
│ │ │ ├── layout.tsx
│ │ │ ├── llms.mdx
│ │ │ │ └── [[...slug]]
│ │ │ │ └── route.ts
│ │ │ ├── llms.txt
│ │ │ │ └── route.ts
│ │ │ ├── robots.ts
│ │ │ └── sitemap.ts
│ │ ├── assets
│ │ │ ├── discord-black.svg
│ │ │ ├── discord-white.svg
│ │ │ ├── logo-black.svg
│ │ │ └── logo-white.svg
│ │ ├── components
│ │ │ ├── analytics-tracker.tsx
│ │ │ ├── cookie-consent.tsx
│ │ │ ├── doc-actions-menu.tsx
│ │ │ ├── editable-code-block.tsx
│ │ │ ├── footer.tsx
│ │ │ ├── hero.tsx
│ │ │ ├── iou.tsx
│ │ │ ├── mermaid.tsx
│ │ │ └── page-feedback.tsx
│ │ ├── lib
│ │ │ ├── llms.ts
│ │ │ └── source.ts
│ │ ├── mdx-components.tsx
│ │ └── providers
│ │ └── posthog-provider.tsx
│ └── tsconfig.json
├── examples
│ ├── agent_examples.py
│ ├── agent_ui_examples.py
│ ├── browser_tool_example.py
│ ├── cloud_api_examples.py
│ ├── computer_examples_windows.py
│ ├── computer_examples.py
│ ├── computer_ui_examples.py
│ ├── computer-example-ts
│ │ ├── .env.example
│ │ ├── .gitignore
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── README.md
│ │ ├── src
│ │ │ ├── helpers.ts
│ │ │ └── index.ts
│ │ └── tsconfig.json
│ ├── docker_examples.py
│ ├── evals
│ │ ├── hud_eval_examples.py
│ │ └── wikipedia_most_linked.txt
│ ├── pylume_examples.py
│ ├── sandboxed_functions_examples.py
│ ├── som_examples.py
│ ├── tracing_examples.py
│ ├── utils.py
│ └── winsandbox_example.py
├── img
│ ├── agent_gradio_ui.png
│ ├── agent.png
│ ├── cli.png
│ ├── computer.png
│ ├── logo_black.png
│ └── logo_white.png
├── libs
│ ├── kasm
│ │ ├── Dockerfile
│ │ ├── LICENSE
│ │ ├── README.md
│ │ └── src
│ │ └── ubuntu
│ │ └── install
│ │ └── firefox
│ │ ├── custom_startup.sh
│ │ ├── firefox.desktop
│ │ └── install_firefox.sh
│ ├── lume
│ │ ├── .cursorignore
│ │ ├── CONTRIBUTING.md
│ │ ├── Development.md
│ │ ├── img
│ │ │ └── cli.png
│ │ ├── Package.resolved
│ │ ├── Package.swift
│ │ ├── README.md
│ │ ├── resources
│ │ │ └── lume.entitlements
│ │ ├── scripts
│ │ │ ├── build
│ │ │ │ ├── build-debug.sh
│ │ │ │ ├── build-release-notarized.sh
│ │ │ │ └── build-release.sh
│ │ │ └── install.sh
│ │ ├── src
│ │ │ ├── Commands
│ │ │ │ ├── Clone.swift
│ │ │ │ ├── Config.swift
│ │ │ │ ├── Create.swift
│ │ │ │ ├── Delete.swift
│ │ │ │ ├── Get.swift
│ │ │ │ ├── Images.swift
│ │ │ │ ├── IPSW.swift
│ │ │ │ ├── List.swift
│ │ │ │ ├── Logs.swift
│ │ │ │ ├── Options
│ │ │ │ │ └── FormatOption.swift
│ │ │ │ ├── Prune.swift
│ │ │ │ ├── Pull.swift
│ │ │ │ ├── Push.swift
│ │ │ │ ├── Run.swift
│ │ │ │ ├── Serve.swift
│ │ │ │ ├── Set.swift
│ │ │ │ └── Stop.swift
│ │ │ ├── ContainerRegistry
│ │ │ │ ├── ImageContainerRegistry.swift
│ │ │ │ ├── ImageList.swift
│ │ │ │ └── ImagesPrinter.swift
│ │ │ ├── Errors
│ │ │ │ └── Errors.swift
│ │ │ ├── FileSystem
│ │ │ │ ├── Home.swift
│ │ │ │ ├── Settings.swift
│ │ │ │ ├── VMConfig.swift
│ │ │ │ ├── VMDirectory.swift
│ │ │ │ └── VMLocation.swift
│ │ │ ├── LumeController.swift
│ │ │ ├── Main.swift
│ │ │ ├── Server
│ │ │ │ ├── Handlers.swift
│ │ │ │ ├── HTTP.swift
│ │ │ │ ├── Requests.swift
│ │ │ │ ├── Responses.swift
│ │ │ │ └── Server.swift
│ │ │ ├── Utils
│ │ │ │ ├── CommandRegistry.swift
│ │ │ │ ├── CommandUtils.swift
│ │ │ │ ├── Logger.swift
│ │ │ │ ├── NetworkUtils.swift
│ │ │ │ ├── Path.swift
│ │ │ │ ├── ProcessRunner.swift
│ │ │ │ ├── ProgressLogger.swift
│ │ │ │ ├── String.swift
│ │ │ │ └── Utils.swift
│ │ │ ├── Virtualization
│ │ │ │ ├── DarwinImageLoader.swift
│ │ │ │ ├── DHCPLeaseParser.swift
│ │ │ │ ├── ImageLoaderFactory.swift
│ │ │ │ └── VMVirtualizationService.swift
│ │ │ ├── VM
│ │ │ │ ├── DarwinVM.swift
│ │ │ │ ├── LinuxVM.swift
│ │ │ │ ├── VM.swift
│ │ │ │ ├── VMDetails.swift
│ │ │ │ ├── VMDetailsPrinter.swift
│ │ │ │ ├── VMDisplayResolution.swift
│ │ │ │ └── VMFactory.swift
│ │ │ └── VNC
│ │ │ ├── PassphraseGenerator.swift
│ │ │ └── VNCService.swift
│ │ └── tests
│ │ ├── Mocks
│ │ │ ├── MockVM.swift
│ │ │ ├── MockVMVirtualizationService.swift
│ │ │ └── MockVNCService.swift
│ │ ├── VM
│ │ │ └── VMDetailsPrinterTests.swift
│ │ ├── VMTests.swift
│ │ ├── VMVirtualizationServiceTests.swift
│ │ └── VNCServiceTests.swift
│ ├── lumier
│ │ ├── .dockerignore
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── bin
│ │ │ └── entry.sh
│ │ ├── config
│ │ │ └── constants.sh
│ │ ├── hooks
│ │ │ └── on-logon.sh
│ │ └── lib
│ │ ├── utils.sh
│ │ └── vm.sh
│ ├── python
│ │ ├── agent
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── agent
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── adapters
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── cua_adapter.py
│ │ │ │ │ ├── huggingfacelocal_adapter.py
│ │ │ │ │ ├── human_adapter.py
│ │ │ │ │ ├── mlxvlm_adapter.py
│ │ │ │ │ └── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ └── qwen2_5_vl.py
│ │ │ │ ├── agent.py
│ │ │ │ ├── callbacks
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── budget_manager.py
│ │ │ │ │ ├── image_retention.py
│ │ │ │ │ ├── logging.py
│ │ │ │ │ ├── operator_validator.py
│ │ │ │ │ ├── pii_anonymization.py
│ │ │ │ │ ├── prompt_instructions.py
│ │ │ │ │ ├── telemetry.py
│ │ │ │ │ └── trajectory_saver.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── computers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cua.py
│ │ │ │ │ └── custom.py
│ │ │ │ ├── decorators.py
│ │ │ │ ├── human_tool
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ ├── server.py
│ │ │ │ │ └── ui.py
│ │ │ │ ├── integrations
│ │ │ │ │ └── hud
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── agent.py
│ │ │ │ │ └── proxy.py
│ │ │ │ ├── loops
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── anthropic.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── composed_grounded.py
│ │ │ │ │ ├── gelato.py
│ │ │ │ │ ├── gemini.py
│ │ │ │ │ ├── generic_vlm.py
│ │ │ │ │ ├── glm45v.py
│ │ │ │ │ ├── gta1.py
│ │ │ │ │ ├── holo.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── model_types.csv
│ │ │ │ │ ├── moondream3.py
│ │ │ │ │ ├── omniparser.py
│ │ │ │ │ ├── openai.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ ├── uiins.py
│ │ │ │ │ ├── uitars.py
│ │ │ │ │ └── uitars2.py
│ │ │ │ ├── proxy
│ │ │ │ │ ├── examples.py
│ │ │ │ │ └── handlers.py
│ │ │ │ ├── responses.py
│ │ │ │ ├── tools
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── browser_tool.py
│ │ │ │ ├── types.py
│ │ │ │ └── ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ └── gradio
│ │ │ │ ├── __init__.py
│ │ │ │ ├── app.py
│ │ │ │ └── ui_components.py
│ │ │ ├── benchmarks
│ │ │ │ ├── .gitignore
│ │ │ │ ├── contrib.md
│ │ │ │ ├── interactive.py
│ │ │ │ ├── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ └── gta1.py
│ │ │ │ ├── README.md
│ │ │ │ ├── ss-pro.py
│ │ │ │ ├── ss-v2.py
│ │ │ │ └── utils.py
│ │ │ ├── example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer_agent.py
│ │ ├── bench-ui
│ │ │ ├── bench_ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api.py
│ │ │ │ └── child.py
│ │ │ ├── examples
│ │ │ │ ├── folder_example.py
│ │ │ │ ├── gui
│ │ │ │ │ ├── index.html
│ │ │ │ │ ├── logo.svg
│ │ │ │ │ └── styles.css
│ │ │ │ ├── output_overlay.png
│ │ │ │ └── simple_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ └── test_port_detection.py
│ │ ├── computer
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer
│ │ │ │ ├── __init__.py
│ │ │ │ ├── computer.py
│ │ │ │ ├── diorama_computer.py
│ │ │ │ ├── helpers.py
│ │ │ │ ├── interface
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ ├── models.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── logger.py
│ │ │ │ ├── models.py
│ │ │ │ ├── providers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cloud
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── docker
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── lume
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── lume_api.py
│ │ │ │ │ ├── lumier
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── types.py
│ │ │ │ │ └── winsandbox
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── provider.py
│ │ │ │ │ └── setup_script.ps1
│ │ │ │ ├── tracing_wrapper.py
│ │ │ │ ├── tracing.py
│ │ │ │ ├── ui
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ └── gradio
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── app.py
│ │ │ │ └── utils.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer.py
│ │ ├── computer-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── browser.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── diorama
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── diorama_computer.py
│ │ │ │ │ ├── diorama.py
│ │ │ │ │ ├── draw.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── safezone.py
│ │ │ │ ├── handlers
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── main.py
│ │ │ │ ├── server.py
│ │ │ │ ├── utils
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── wallpaper.py
│ │ │ │ └── watchdog.py
│ │ │ ├── examples
│ │ │ │ ├── __init__.py
│ │ │ │ └── usage_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ ├── run_server.py
│ │ │ ├── test_connection.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_server.py
│ │ ├── core
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── core
│ │ │ │ ├── __init__.py
│ │ │ │ └── telemetry
│ │ │ │ ├── __init__.py
│ │ │ │ └── posthog.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_telemetry.py
│ │ ├── mcp-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── build-extension.py
│ │ │ ├── CONCURRENT_SESSIONS.md
│ │ │ ├── desktop-extension
│ │ │ │ ├── cua-extension.mcpb
│ │ │ │ ├── desktop_extension.png
│ │ │ │ ├── manifest.json
│ │ │ │ ├── README.md
│ │ │ │ ├── requirements.txt
│ │ │ │ ├── run_server.sh
│ │ │ │ └── setup.py
│ │ │ ├── mcp_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── server.py
│ │ │ │ └── session_manager.py
│ │ │ ├── pdm.lock
│ │ │ ├── pyproject.toml
│ │ │ ├── QUICK_TEST_COMMANDS.sh
│ │ │ ├── quick_test_local_option.py
│ │ │ ├── README.md
│ │ │ ├── scripts
│ │ │ │ ├── install_mcp_server.sh
│ │ │ │ └── start_mcp_server.sh
│ │ │ ├── test_mcp_server_local_option.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_mcp_server.py
│ │ ├── pylume
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_pylume.py
│ │ └── som
│ │ ├── .bumpversion.cfg
│ │ ├── LICENSE
│ │ ├── poetry.toml
│ │ ├── pyproject.toml
│ │ ├── README.md
│ │ ├── som
│ │ │ ├── __init__.py
│ │ │ ├── detect.py
│ │ │ ├── detection.py
│ │ │ ├── models.py
│ │ │ ├── ocr.py
│ │ │ ├── util
│ │ │ │ └── utils.py
│ │ │ └── visualization.py
│ │ └── tests
│ │ ├── conftest.py
│ │ └── test_omniparser.py
│ ├── qemu-docker
│ │ ├── linux
│ │ │ ├── Dockerfile
│ │ │ ├── README.md
│ │ │ └── src
│ │ │ ├── entry.sh
│ │ │ └── vm
│ │ │ ├── image
│ │ │ │ └── README.md
│ │ │ └── setup
│ │ │ ├── install.sh
│ │ │ ├── setup-cua-server.sh
│ │ │ └── setup.sh
│ │ ├── README.md
│ │ └── windows
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── entry.sh
│ │ └── vm
│ │ ├── image
│ │ │ └── README.md
│ │ └── setup
│ │ ├── install.bat
│ │ ├── on-logon.ps1
│ │ ├── setup-cua-server.ps1
│ │ ├── setup-utils.psm1
│ │ └── setup.ps1
│ ├── typescript
│ │ ├── .gitignore
│ │ ├── .nvmrc
│ │ ├── agent
│ │ │ ├── examples
│ │ │ │ ├── playground-example.html
│ │ │ │ └── README.md
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── client.ts
│ │ │ │ ├── index.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ └── client.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── computer
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── computer
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── providers
│ │ │ │ │ │ ├── base.ts
│ │ │ │ │ │ ├── cloud.ts
│ │ │ │ │ │ └── index.ts
│ │ │ │ │ └── types.ts
│ │ │ │ ├── index.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── base.ts
│ │ │ │ │ ├── factory.ts
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── linux.ts
│ │ │ │ │ ├── macos.ts
│ │ │ │ │ └── windows.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ ├── computer
│ │ │ │ │ └── cloud.test.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── factory.test.ts
│ │ │ │ │ ├── index.test.ts
│ │ │ │ │ ├── linux.test.ts
│ │ │ │ │ ├── macos.test.ts
│ │ │ │ │ └── windows.test.ts
│ │ │ │ └── setup.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── core
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── index.ts
│ │ │ │ └── telemetry
│ │ │ │ ├── clients
│ │ │ │ │ ├── index.ts
│ │ │ │ │ └── posthog.ts
│ │ │ │ └── index.ts
│ │ │ ├── tests
│ │ │ │ └── telemetry.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── cua-cli
│ │ │ ├── .gitignore
│ │ │ ├── .prettierrc
│ │ │ ├── bun.lock
│ │ │ ├── CLAUDE.md
│ │ │ ├── index.ts
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── auth.ts
│ │ │ │ ├── cli.ts
│ │ │ │ ├── commands
│ │ │ │ │ ├── auth.ts
│ │ │ │ │ └── sandbox.ts
│ │ │ │ ├── config.ts
│ │ │ │ ├── http.ts
│ │ │ │ ├── storage.ts
│ │ │ │ └── util.ts
│ │ │ └── tsconfig.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── pnpm-workspace.yaml
│ │ └── README.md
│ └── xfce
│ ├── .dockerignore
│ ├── .gitignore
│ ├── Development.md
│ ├── Dockerfile
│ ├── Dockerfile.dev
│ ├── README.md
│ └── src
│ ├── scripts
│ │ ├── resize-display.sh
│ │ ├── start-computer-server.sh
│ │ ├── start-novnc.sh
│ │ ├── start-vnc.sh
│ │ └── xstartup.sh
│ ├── supervisor
│ │ └── supervisord.conf
│ └── xfce-config
│ ├── helpers.rc
│ ├── xfce4-power-manager.xml
│ └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│ ├── agent_nb.ipynb
│ ├── blog
│ │ ├── build-your-own-operator-on-macos-1.ipynb
│ │ └── build-your-own-operator-on-macos-2.ipynb
│ ├── composite_agents_docker_nb.ipynb
│ ├── computer_nb.ipynb
│ ├── computer_server_nb.ipynb
│ ├── customizing_computeragent.ipynb
│ ├── eval_osworld.ipynb
│ ├── ollama_nb.ipynb
│ ├── README.md
│ ├── sota_hackathon_cloud.ipynb
│ └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── install-cli.ps1
│ ├── install-cli.sh
│ ├── playground-docker.sh
│ ├── playground.sh
│ ├── run-docker-dev.sh
│ └── typescript-typecheck.js
├── TESTING.md
├── tests
│ ├── agent_loop_testing
│ │ ├── agent_test.py
│ │ └── README.md
│ ├── pytest.ini
│ ├── shell_cmd.py
│ ├── test_files.py
│ ├── test_mcp_server_session_management.py
│ ├── test_mcp_server_streaming.py
│ ├── test_shell_bash.py
│ ├── test_telemetry.py
│ ├── test_tracing.py
│ ├── test_venv.py
│ └── test_watchdog.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/test_files.py:
--------------------------------------------------------------------------------
```python
"""
File System Interface Tests
Tests for the file system methods of the Computer interface (runs against a Cua cloud Linux container by default).
Required environment variables:
- CUA_API_KEY: API key for Cua cloud provider
- CUA_CONTAINER_NAME: Name of the container to use
"""
import asyncio
import os
import sys
import traceback
from pathlib import Path
import pytest
# Load environment variables from .env file
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
print(f"Loading environment from: {env_file}")
from dotenv import load_dotenv
load_dotenv(env_file)
# Add paths to sys.path if needed
pythonpath = os.environ.get("PYTHONPATH", "")
for path in pythonpath.split(":"):
if path and path not in sys.path:
sys.path.insert(0, path) # Insert at beginning to prioritize
print(f"Added to sys.path: {path}")
from computer import Computer, VMProviderType
@pytest.fixture(scope="session")
async def computer():
"""Shared Computer instance for all test cases."""
# Create a remote Linux computer with Cua
computer = Computer(
os_type="linux",
api_key=os.getenv("CUA_API_KEY"),
name=str(os.getenv("CUA_CONTAINER_NAME")),
provider_type=VMProviderType.CLOUD,
)
# Create a local macOS computer with Cua
# computer = Computer()
# Connect to host computer
# computer = Computer(use_host_computer_server=True)
try:
await computer.run()
yield computer
finally:
await computer.disconnect()
@pytest.mark.asyncio(loop_scope="session")
async def test_file_exists(computer):
tmp_path = "test_file_exists.txt"
# Ensure file does not exist
if await computer.interface.file_exists(tmp_path):
await computer.interface.delete_file(tmp_path)
exists = await computer.interface.file_exists(tmp_path)
assert exists is False, f"File {tmp_path} should not exist"
# Create file and check again
await computer.interface.write_text(tmp_path, "hello")
exists = await computer.interface.file_exists(tmp_path)
assert exists is True, f"File {tmp_path} should exist"
await computer.interface.delete_file(tmp_path)
@pytest.mark.asyncio(loop_scope="session")
async def test_directory_exists(computer):
tmp_dir = "test_directory_exists"
if await computer.interface.directory_exists(tmp_dir):
# Remove all files in directory before removing directory
files = await computer.interface.list_dir(tmp_dir)
for fname in files:
await computer.interface.delete_file(f"{tmp_dir}/{fname}")
# Remove the directory itself
await computer.interface.delete_dir(tmp_dir)
exists = await computer.interface.directory_exists(tmp_dir)
assert exists is False, f"Directory {tmp_dir} should not exist"
await computer.interface.create_dir(tmp_dir)
exists = await computer.interface.directory_exists(tmp_dir)
assert exists is True, f"Directory {tmp_dir} should exist"
# Cleanup: remove files and directory
files = await computer.interface.list_dir(tmp_dir)
for fname in files:
await computer.interface.delete_file(f"{tmp_dir}/{fname}")
await computer.interface.delete_dir(tmp_dir)
@pytest.mark.asyncio(loop_scope="session")
async def test_list_dir(computer):
tmp_dir = "test_list_dir"
if not await computer.interface.directory_exists(tmp_dir):
await computer.interface.create_dir(tmp_dir)
files = ["foo.txt", "bar.txt"]
for fname in files:
await computer.interface.write_text(f"{tmp_dir}/{fname}", "hi")
result = await computer.interface.list_dir(tmp_dir)
assert set(result) >= set(files), f"Directory {tmp_dir} should contain files {files}"
for fname in files:
await computer.interface.delete_file(f"{tmp_dir}/{fname}")
await computer.interface.delete_dir(tmp_dir)
@pytest.mark.asyncio(loop_scope="session")
async def test_read_write_text(computer):
tmp_path = "test_rw_text.txt"
content = "sample text"
await computer.interface.write_text(tmp_path, content)
read = await computer.interface.read_text(tmp_path)
assert read == content, "File content should match"
await computer.interface.delete_file(tmp_path)
@pytest.mark.asyncio(loop_scope="session")
async def test_delete_file(computer):
tmp_path = "test_delete_file.txt"
await computer.interface.write_text(tmp_path, "bye")
exists = await computer.interface.file_exists(tmp_path)
assert exists is True, "File should exist"
await computer.interface.delete_file(tmp_path)
exists = await computer.interface.file_exists(tmp_path)
assert exists is False, "File should not exist"
@pytest.mark.asyncio(loop_scope="session")
async def test_create_dir(computer):
tmp_dir = "test_create_dir"
if await computer.interface.directory_exists(tmp_dir):
await computer.interface.delete_dir(tmp_dir)
await computer.interface.create_dir(tmp_dir)
exists = await computer.interface.directory_exists(tmp_dir)
assert exists is True, "Directory should exist"
await computer.interface.delete_dir(tmp_dir)
@pytest.mark.asyncio(loop_scope="session")
async def test_read_bytes_basic(computer):
"""Test basic read_bytes functionality."""
tmp_path = "test_read_bytes.bin"
test_data = b"Hello, World! This is binary data \x00\x01\x02\x03"
    # Write binary data via write_text; every byte here is < 0x80, so the latin-1 decode survives the text round-trip
await computer.interface.write_text(tmp_path, test_data.decode("latin-1"))
# Read all bytes
read_data = await computer.interface.read_bytes(tmp_path)
assert read_data == test_data, "Binary data should match"
await computer.interface.delete_file(tmp_path)
@pytest.mark.asyncio(loop_scope="session")
async def test_read_bytes_with_offset_and_length(computer):
"""Test read_bytes with offset and length parameters."""
tmp_path = "test_read_bytes_offset.bin"
test_data = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
# Write test data
await computer.interface.write_text(tmp_path, test_data.decode("latin-1"))
# Test reading with offset only
read_data = await computer.interface.read_bytes(tmp_path, offset=5)
expected = test_data[5:]
assert (
read_data == expected
), f"Data from offset 5 should match. Got: {read_data}, Expected: {expected}"
# Test reading with offset and length
read_data = await computer.interface.read_bytes(tmp_path, offset=10, length=5)
expected = test_data[10:15]
assert (
read_data == expected
), f"Data from offset 10, length 5 should match. Got: {read_data}, Expected: {expected}"
# Test reading from beginning with length
read_data = await computer.interface.read_bytes(tmp_path, offset=0, length=10)
expected = test_data[:10]
assert (
read_data == expected
), f"Data from beginning, length 10 should match. Got: {read_data}, Expected: {expected}"
await computer.interface.delete_file(tmp_path)
@pytest.mark.asyncio(loop_scope="session")
async def test_get_file_size(computer):
"""Test get_file_size functionality."""
tmp_path = "test_file_size.txt"
test_content = "A" * 1000 # 1000 bytes
await computer.interface.write_text(tmp_path, test_content)
file_size = await computer.interface.get_file_size(tmp_path)
assert file_size == 1000, f"File size should be 1000 bytes, got {file_size}"
await computer.interface.delete_file(tmp_path)
@pytest.mark.asyncio(loop_scope="session")
async def test_read_large_file(computer):
"""Test reading a file larger than 10MB to verify chunked reading."""
tmp_path = "test_large_file.bin"
# Create a file larger than 10MB (10 * 1024 * 1024 = 10,485,760 bytes)
total_size = 12 * 1024 * 1024 # 12MB
print(f"Creating large file of {total_size} bytes ({total_size / (1024*1024):.1f}MB)...")
# Create large file content (this will test the chunked writing functionality)
large_content = b"X" * total_size
# Write the large file using write_bytes (will automatically use chunked writing)
await computer.interface.write_bytes(tmp_path, large_content)
# Verify file size
file_size = await computer.interface.get_file_size(tmp_path)
assert file_size == total_size, f"Large file size should be {total_size} bytes, got {file_size}"
print(f"Large file created successfully: {file_size} bytes")
# Test reading the entire large file (should use chunked reading)
print("Reading large file...")
read_data = await computer.interface.read_bytes(tmp_path)
assert (
len(read_data) == total_size
), f"Read data size should match file size. Got {len(read_data)}, expected {total_size}"
# Verify content (should be all 'X' characters)
expected_data = b"X" * total_size
assert read_data == expected_data, "Large file content should be all 'X' characters"
print("Large file read successfully!")
# Test reading with offset and length on large file
offset = 5 * 1024 * 1024 # 5MB offset
length = 2 * 1024 * 1024 # 2MB length
read_data = await computer.interface.read_bytes(tmp_path, offset=offset, length=length)
assert len(read_data) == length, f"Partial read size should be {length}, got {len(read_data)}"
assert read_data == b"X" * length, "Partial read content should be all 'X' characters"
print("Large file partial read successful!")
# Clean up
await computer.interface.delete_file(tmp_path)
print("Large file test completed successfully!")
@pytest.mark.asyncio(loop_scope="session")
async def test_read_write_text_with_encoding(computer):
"""Test reading and writing text files with different encodings."""
print("Testing text file operations with different encodings...")
tmp_path = "test_encoding.txt"
# Test UTF-8 encoding (default)
utf8_content = "Hello, 世界! 🌍 Ñoño café"
await computer.interface.write_text(tmp_path, utf8_content, encoding="utf-8")
read_utf8 = await computer.interface.read_text(tmp_path, encoding="utf-8")
assert read_utf8 == utf8_content, "UTF-8 content should match"
# Test ASCII encoding
ascii_content = "Hello, World! Simple ASCII text."
await computer.interface.write_text(tmp_path, ascii_content, encoding="ascii")
read_ascii = await computer.interface.read_text(tmp_path, encoding="ascii")
assert read_ascii == ascii_content, "ASCII content should match"
# Test Latin-1 encoding
latin1_content = "Café, naïve, résumé"
await computer.interface.write_text(tmp_path, latin1_content, encoding="latin-1")
read_latin1 = await computer.interface.read_text(tmp_path, encoding="latin-1")
assert read_latin1 == latin1_content, "Latin-1 content should match"
# Clean up
await computer.interface.delete_file(tmp_path)
print("Text encoding test completed successfully!")
@pytest.mark.asyncio(loop_scope="session")
async def test_write_text_append_mode(computer):
"""Test appending text to files."""
print("Testing text file append mode...")
tmp_path = "test_append.txt"
# Write initial content
initial_content = "First line\n"
await computer.interface.write_text(tmp_path, initial_content)
# Append more content
append_content = "Second line\n"
await computer.interface.write_text(tmp_path, append_content, append=True)
# Read and verify
final_content = await computer.interface.read_text(tmp_path)
expected_content = initial_content + append_content
assert (
final_content == expected_content
), f"Expected '{expected_content}', got '{final_content}'"
# Append one more line
third_content = "Third line\n"
await computer.interface.write_text(tmp_path, third_content, append=True)
# Read and verify final result
final_content = await computer.interface.read_text(tmp_path)
expected_content = initial_content + append_content + third_content
assert (
final_content == expected_content
), f"Expected '{expected_content}', got '{final_content}'"
# Clean up
await computer.interface.delete_file(tmp_path)
print("Text append test completed successfully!")
@pytest.mark.asyncio(loop_scope="session")
async def test_large_text_file(computer):
"""Test reading and writing large text files (>5MB) to verify chunked operations."""
print("Testing large text file operations...")
tmp_path = "test_large_text.txt"
# Create a large text content (approximately 6MB)
# Each line is about 100 characters, so 60,000 lines ≈ 6MB
line_template = "This is line {:06d} with some additional text to make it longer and reach about 100 chars.\n"
large_content = ""
num_lines = 60000
print(f"Generating large text content with {num_lines} lines...")
for i in range(num_lines):
large_content += line_template.format(i)
content_size_mb = len(large_content.encode("utf-8")) / (1024 * 1024)
print(f"Generated text content size: {content_size_mb:.2f} MB")
# Write the large text file
print("Writing large text file...")
await computer.interface.write_text(tmp_path, large_content)
# Read the entire file back
print("Reading large text file...")
read_content = await computer.interface.read_text(tmp_path)
# Verify content matches
assert read_content == large_content, "Large text file content should match exactly"
# Test partial reading by reading as bytes and decoding specific portions
print("Testing partial text reading...")
# Read first 1000 characters worth of bytes
first_1000_chars = large_content[:1000]
first_1000_bytes = first_1000_chars.encode("utf-8")
read_bytes = await computer.interface.read_bytes(
tmp_path, offset=0, length=len(first_1000_bytes)
)
decoded_partial = read_bytes.decode("utf-8")
assert decoded_partial == first_1000_chars, "Partial text reading should match"
# Test appending to large file
print("Testing append to large text file...")
append_text = "\n--- APPENDED CONTENT ---\nThis content was appended to the large file.\n"
await computer.interface.write_text(tmp_path, append_text, append=True)
# Read and verify appended content
final_content = await computer.interface.read_text(tmp_path)
expected_final = large_content + append_text
assert final_content == expected_final, "Appended large text file should match"
# Clean up
await computer.interface.delete_file(tmp_path)
print("Large text file test completed successfully!")
@pytest.mark.asyncio(loop_scope="session")
async def test_text_file_edge_cases(computer):
"""Test edge cases for text file operations."""
print("Testing text file edge cases...")
tmp_path = "test_edge_cases.txt"
# Test empty file
empty_content = ""
await computer.interface.write_text(tmp_path, empty_content)
read_empty = await computer.interface.read_text(tmp_path)
assert read_empty == empty_content, "Empty file should return empty string"
# Test file with only whitespace
whitespace_content = " \n\t\r\n \n"
await computer.interface.write_text(tmp_path, whitespace_content)
read_whitespace = await computer.interface.read_text(tmp_path)
assert read_whitespace == whitespace_content, "Whitespace content should be preserved"
# Test file with special characters and newlines
special_content = "Line 1\nLine 2\r\nLine 3\tTabbed\nSpecial: !@#$%^&*()\n"
await computer.interface.write_text(tmp_path, special_content)
read_special = await computer.interface.read_text(tmp_path)
assert read_special == special_content, "Special characters should be preserved"
# Test very long single line (no newlines)
long_line = "A" * 10000 # 10KB single line
await computer.interface.write_text(tmp_path, long_line)
read_long_line = await computer.interface.read_text(tmp_path)
assert read_long_line == long_line, "Long single line should be preserved"
# Clean up
await computer.interface.delete_file(tmp_path)
print("Text file edge cases test completed successfully!")
if __name__ == "__main__":
# Run tests directly
pytest.main([__file__, "-v"])
```
--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/handlers/generic.py:
--------------------------------------------------------------------------------
```python
"""
Generic handlers for all OSes.
Includes:
- DesktopHandler
- FileHandler
"""
import base64
import os
import platform
import subprocess
import webbrowser
from pathlib import Path
from typing import Any, Dict, Optional
from ..utils import wallpaper
from .base import BaseDesktopHandler, BaseFileHandler, BaseWindowHandler
try:
import pywinctl as pwc
except Exception: # pragma: no cover
pwc = None # type: ignore
def resolve_path(path: str) -> Path:
"""Resolve a path to its absolute path. Expand ~ to the user's home directory.
Args:
path: The file or directory path to resolve
Returns:
Path: The resolved absolute path
"""
return Path(path).expanduser().resolve()
# ===== Cross-platform Desktop command handlers =====
class GenericDesktopHandler(BaseDesktopHandler):
"""
Generic desktop handler providing desktop-related operations.
Implements:
- get_desktop_environment: detect current desktop environment
- set_wallpaper: set desktop wallpaper path
"""
async def get_desktop_environment(self) -> Dict[str, Any]:
"""
Get the current desktop environment.
Returns:
Dict containing 'success' boolean and either 'environment' string or 'error' string
"""
try:
env = wallpaper.get_desktop_environment()
return {"success": True, "environment": env}
except Exception as e:
return {"success": False, "error": str(e)}
async def set_wallpaper(self, path: str) -> Dict[str, Any]:
"""
Set the desktop wallpaper to the specified path.
Args:
path: The file path to set as wallpaper
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
file_path = resolve_path(path)
ok = wallpaper.set_wallpaper(str(file_path))
return {"success": bool(ok)}
except Exception as e:
return {"success": False, "error": str(e)}
# ===== Cross-platform window control command handlers =====
class GenericWindowHandler(BaseWindowHandler):
"""
Cross-platform window management using pywinctl where possible.
"""
async def open(self, target: str) -> Dict[str, Any]:
try:
if target.startswith("http://") or target.startswith("https://"):
ok = webbrowser.open(target)
return {"success": bool(ok)}
path = str(resolve_path(target))
            system = platform.system().lower()
            if system == "darwin":
                subprocess.Popen(["open", path])
            elif system == "linux":
                subprocess.Popen(["xdg-open", path])
            elif system == "windows":
                os.startfile(path)  # type: ignore[attr-defined]
            else:
                return {"success": False, "error": f"Unsupported OS: {system}"}
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def launch(self, app: str, args: Optional[list[str]] = None) -> Dict[str, Any]:
try:
if args:
proc = subprocess.Popen([app, *args])
else:
# allow shell command like "libreoffice --writer"
proc = subprocess.Popen(app, shell=True)
return {"success": True, "pid": proc.pid}
except Exception as e:
return {"success": False, "error": str(e)}
def _get_window_by_id(self, window_id: int | str) -> Optional[Any]:
if pwc is None:
raise RuntimeError("pywinctl not available")
# Find by native handle among Window objects; getAllWindowsDict keys are titles
try:
for w in pwc.getAllWindows():
if str(w.getHandle()) == str(window_id):
return w
return None
except Exception:
return None
async def get_current_window_id(self) -> Dict[str, Any]:
try:
if pwc is None:
return {"success": False, "error": "pywinctl not available"}
win = pwc.getActiveWindow()
if not win:
return {"success": False, "error": "No active window"}
return {"success": True, "window_id": win.getHandle()}
except Exception as e:
return {"success": False, "error": str(e)}
async def get_application_windows(self, app: str) -> Dict[str, Any]:
try:
if pwc is None:
return {"success": False, "error": "pywinctl not available"}
wins = pwc.getWindowsWithTitle(app, condition=pwc.Re.CONTAINS, flags=pwc.Re.IGNORECASE)
ids = [w.getHandle() for w in wins]
return {"success": True, "windows": ids}
except Exception as e:
return {"success": False, "error": str(e)}
async def get_window_name(self, window_id: int | str) -> Dict[str, Any]:
try:
if pwc is None:
return {"success": False, "error": "pywinctl not available"}
w = self._get_window_by_id(window_id)
if not w:
return {"success": False, "error": "Window not found"}
return {"success": True, "name": w.title}
except Exception as e:
return {"success": False, "error": str(e)}
async def get_window_size(self, window_id: int | str) -> Dict[str, Any]:
try:
if pwc is None:
return {"success": False, "error": "pywinctl not available"}
w = self._get_window_by_id(window_id)
if not w:
return {"success": False, "error": "Window not found"}
width, height = w.size
return {"success": True, "width": int(width), "height": int(height)}
except Exception as e:
return {"success": False, "error": str(e)}
async def get_window_position(self, window_id: int | str) -> Dict[str, Any]:
try:
if pwc is None:
return {"success": False, "error": "pywinctl not available"}
w = self._get_window_by_id(window_id)
if not w:
return {"success": False, "error": "Window not found"}
x, y = w.position
return {"success": True, "x": int(x), "y": int(y)}
except Exception as e:
return {"success": False, "error": str(e)}
async def set_window_size(
self, window_id: int | str, width: int, height: int
) -> Dict[str, Any]:
try:
if pwc is None:
return {"success": False, "error": "pywinctl not available"}
w = self._get_window_by_id(window_id)
if not w:
return {"success": False, "error": "Window not found"}
ok = w.resizeTo(int(width), int(height))
return {"success": bool(ok)}
except Exception as e:
return {"success": False, "error": str(e)}
async def set_window_position(self, window_id: int | str, x: int, y: int) -> Dict[str, Any]:
try:
if pwc is None:
return {"success": False, "error": "pywinctl not available"}
w = self._get_window_by_id(window_id)
if not w:
return {"success": False, "error": "Window not found"}
ok = w.moveTo(int(x), int(y))
return {"success": bool(ok)}
except Exception as e:
return {"success": False, "error": str(e)}
async def maximize_window(self, window_id: int | str) -> Dict[str, Any]:
try:
if pwc is None:
return {"success": False, "error": "pywinctl not available"}
w = self._get_window_by_id(window_id)
if not w:
return {"success": False, "error": "Window not found"}
ok = w.maximize()
return {"success": bool(ok)}
except Exception as e:
return {"success": False, "error": str(e)}
async def minimize_window(self, window_id: int | str) -> Dict[str, Any]:
try:
if pwc is None:
return {"success": False, "error": "pywinctl not available"}
w = self._get_window_by_id(window_id)
if not w:
return {"success": False, "error": "Window not found"}
ok = w.minimize()
return {"success": bool(ok)}
except Exception as e:
return {"success": False, "error": str(e)}
async def activate_window(self, window_id: int | str) -> Dict[str, Any]:
try:
if pwc is None:
return {"success": False, "error": "pywinctl not available"}
w = self._get_window_by_id(window_id)
if not w:
return {"success": False, "error": "Window not found"}
ok = w.activate()
return {"success": bool(ok)}
except Exception as e:
return {"success": False, "error": str(e)}
async def close_window(self, window_id: int | str) -> Dict[str, Any]:
try:
if pwc is None:
return {"success": False, "error": "pywinctl not available"}
w = self._get_window_by_id(window_id)
if not w:
return {"success": False, "error": "Window not found"}
ok = w.close()
return {"success": bool(ok)}
except Exception as e:
return {"success": False, "error": str(e)}
# ===== Cross-platform file system command handlers =====
class GenericFileHandler(BaseFileHandler):
"""
Generic file handler that provides file system operations for all operating systems.
This class implements the BaseFileHandler interface and provides methods for
file and directory operations including reading, writing, creating, and deleting
files and directories.
"""
async def file_exists(self, path: str) -> Dict[str, Any]:
"""
Check if a file exists at the specified path.
Args:
path: The file path to check
Returns:
Dict containing 'success' boolean and either 'exists' boolean or 'error' string
"""
try:
return {"success": True, "exists": resolve_path(path).is_file()}
except Exception as e:
return {"success": False, "error": str(e)}
async def directory_exists(self, path: str) -> Dict[str, Any]:
"""
Check if a directory exists at the specified path.
Args:
path: The directory path to check
Returns:
Dict containing 'success' boolean and either 'exists' boolean or 'error' string
"""
try:
return {"success": True, "exists": resolve_path(path).is_dir()}
except Exception as e:
return {"success": False, "error": str(e)}
async def list_dir(self, path: str) -> Dict[str, Any]:
"""
List all files and directories in the specified directory.
Args:
path: The directory path to list
Returns:
Dict containing 'success' boolean and either 'files' list of names or 'error' string
"""
try:
return {
"success": True,
"files": [
p.name for p in resolve_path(path).iterdir() if p.is_file() or p.is_dir()
],
}
except Exception as e:
return {"success": False, "error": str(e)}
async def read_text(self, path: str) -> Dict[str, Any]:
"""
Read the contents of a text file.
Args:
path: The file path to read from
Returns:
Dict containing 'success' boolean and either 'content' string or 'error' string
"""
try:
return {"success": True, "content": resolve_path(path).read_text()}
except Exception as e:
return {"success": False, "error": str(e)}
async def write_text(self, path: str, content: str) -> Dict[str, Any]:
"""
Write text content to a file.
Args:
path: The file path to write to
content: The text content to write
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
resolve_path(path).write_text(content)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def write_bytes(
self, path: str, content_b64: str, append: bool = False
) -> Dict[str, Any]:
"""
Write binary content to a file from base64 encoded string.
Args:
path: The file path to write to
content_b64: Base64 encoded binary content
append: If True, append to existing file; if False, overwrite
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
mode = "ab" if append else "wb"
with open(resolve_path(path), mode) as f:
f.write(base64.b64decode(content_b64))
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def read_bytes(
self, path: str, offset: int = 0, length: Optional[int] = None
) -> Dict[str, Any]:
"""
Read binary content from a file and return as base64 encoded string.
Args:
path: The file path to read from
offset: Byte offset to start reading from
length: Number of bytes to read; if None, read entire file from offset
Returns:
Dict containing 'success' boolean and either 'content_b64' string or 'error' string
"""
try:
file_path = resolve_path(path)
with open(file_path, "rb") as f:
if offset > 0:
f.seek(offset)
if length is not None:
content = f.read(length)
else:
content = f.read()
return {"success": True, "content_b64": base64.b64encode(content).decode("utf-8")}
except Exception as e:
return {"success": False, "error": str(e)}
async def get_file_size(self, path: str) -> Dict[str, Any]:
"""
Get the size of a file in bytes.
Args:
path: The file path to get size for
Returns:
Dict containing 'success' boolean and either 'size' integer or 'error' string
"""
try:
file_path = resolve_path(path)
size = file_path.stat().st_size
return {"success": True, "size": size}
except Exception as e:
return {"success": False, "error": str(e)}
async def delete_file(self, path: str) -> Dict[str, Any]:
"""
Delete a file at the specified path.
Args:
path: The file path to delete
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
resolve_path(path).unlink()
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def create_dir(self, path: str) -> Dict[str, Any]:
"""
Create a directory at the specified path.
Creates parent directories if they don't exist and doesn't raise an error
if the directory already exists.
Args:
path: The directory path to create
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
resolve_path(path).mkdir(parents=True, exist_ok=True)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def delete_dir(self, path: str) -> Dict[str, Any]:
"""
Delete an empty directory at the specified path.
Args:
path: The directory path to delete
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
resolve_path(path).rmdir()
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
```
--------------------------------------------------------------------------------
/docs/content/docs/example-usecases/post-event-contact-export.mdx:
--------------------------------------------------------------------------------
```markdown
---
title: Post-Event Contact Export
description: Run overnight contact extraction from LinkedIn, X, or other social platforms after networking events
---
import { Step, Steps } from 'fumadocs-ui/components/steps';
import { Tab, Tabs } from 'fumadocs-ui/components/tabs';
## Overview
After networking events, you need to export new connections from LinkedIn, X, or other platforms into your CRM. This automation handles it for you.
**The workflow**: Kick off the script after an event and let it run overnight. Wake up to a clean CSV ready for your CRM or email tool.
This example focuses on LinkedIn but works across platforms. It uses [Cua Computer](/computer-sdk/computers) to interact with web interfaces and [Agent Loops](/agent-sdk/agent-loops) to iterate through connections with conversation history.
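The core loop pattern is worth seeing in isolation: every task is appended to a shared `history` list, and each batch of `agent.run()` output is appended back, so later tasks keep the full conversation context. A minimal sketch using the same `ComputerAgent` API as the full script below (the `run_tasks` wrapper and its arguments are illustrative):
```python
from agent import ComputerAgent

async def run_tasks(computer, tasks):
    # Minimal sketch of the history-threading loop from the full script below.
    agent = ComputerAgent(model="cua/anthropic/claude-sonnet-4.5", tools=[computer])
    history = []
    for task in tasks:
        history.append({"role": "user", "content": task})
        async for result in agent.run(history, stream=False):
            # Feed each output batch back into history so the next task
            # sees everything the agent has done so far.
            history += result.get("output", [])
    return history
```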
### Why Cua is Perfect for This
**Cua's VMs save your session data**, bypassing bot detection entirely:
- **Log in once manually** through the VM browser
- **Session persists** - you appear as a regular user, not a bot
- **No captchas** - the platform treats automation like normal browsing
- **No login code** - script doesn't handle authentication
- **Run overnight** - kick off and forget
Traditional web scraping triggers anti-bot measures immediately. Cua's approach works across all platforms.
### What You Get
The script generates two files with your extracted connections:
**CSV Export** (`linkedin_connections_20250116_143022.csv`):
```csv
first,last,role,company,met_at,linkedin
John,Smith,Software Engineer,Acme Corp,Google Devfest Toronto,https://www.linkedin.com/in/johnsmith
Sarah,Johnson,Product Manager,Tech Inc,Google Devfest Toronto,https://www.linkedin.com/in/sarahjohnson
```
**Messaging Links** (`linkedin_messaging_links_20250116_143022.txt`):
```
LinkedIn Messaging Compose Links
================================================================================
1. https://www.linkedin.com/messaging/compose/?recipient=johnsmith
2. https://www.linkedin.com/messaging/compose/?recipient=sarahjohnson
```
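Each compose link is just the profile URL's public ID dropped into LinkedIn's compose route; the `extract_public_id_from_linkedin_url` helper in the script below does exactly this:
```python
# Derive a compose link from a profile URL (mirrors the helper in the full script).
url = "https://www.linkedin.com/in/johnsmith/".split("?")[0].rstrip("/")
public_id = url.split("/in/")[-1]  # -> "johnsmith"
compose_url = f"https://www.linkedin.com/messaging/compose/?recipient={public_id}"
```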
---
<Steps>
<Step>
### Set Up Your Environment
First, install the required dependencies:
Create a `requirements.txt` file:
```text
cua-agent
cua-computer
python-dotenv>=1.0.0
```
Install the dependencies:
```bash
pip install -r requirements.txt
```
Create a `.env` file with your API keys:
```text
ANTHROPIC_API_KEY=your-anthropic-api-key # optional, BYOK. By default, this cookbook uses the CUA VLM Router
CUA_API_KEY=sk_cua-api01...
CUA_CONTAINER_NAME=m-linux-...
```
Finally, set up your VM. Refer to the [quickstart guide](https://cua.ai/docs/get-started/quickstart) for how to set up the computer environment.
</Step>
<Step>
### Log Into LinkedIn Manually
**Important**: Before running the script, manually log into LinkedIn through your VM:
1. Access your VM through the Cua dashboard
2. Open a browser and navigate to LinkedIn
3. Log in with your credentials (handle any captchas manually)
4. Close the browser but leave the VM running
5. Your session is now saved and ready for automation!
This one-time manual login bypasses all bot detection.
</Step>
<Step>
### Configure and Create Your Script
Create a Python file (e.g., `contact_export.py`). You can customize:
```python
# Where you met these connections (automatically added to CSV)
MET_AT_REASON = "Google Devfest Toronto"
# Number of contacts to extract (in the main loop)
for contact_num in range(1, 21): # Change 21 to extract more/fewer contacts
```
Select your environment:
<Tabs items={['Cloud Sandbox', 'Linux on Docker', 'macOS Sandbox', 'Windows Sandbox']}>
<Tab value="Cloud Sandbox">
```python
import asyncio
import csv
import logging
import os
import signal
import traceback
from datetime import datetime
from agent import ComputerAgent
from computer import Computer, VMProviderType
from dotenv import load_dotenv
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration: Define where you met these connections
MET_AT_REASON = "Google Devfest Toronto"
def handle_sigint(sig, frame):
print("\n\nExecution interrupted by user. Exiting gracefully...")
exit(0)
def extract_public_id_from_linkedin_url(linkedin_url):
"""Extract public ID from LinkedIn profile URL."""
if not linkedin_url:
return None
url = linkedin_url.split('?')[0].rstrip('/')
if '/in/' in url:
public_id = url.split('/in/')[-1]
return public_id
return None
def extract_contact_from_response(result_output):
"""
Extract contact information from agent's response.
Expects format:
FIRST: value
LAST: value
ROLE: value
COMPANY: value
LINKEDIN: value
"""
contact = {
'first': '',
'last': '',
'role': '',
'company': '',
'met_at': MET_AT_REASON,
'linkedin': ''
}
for item in result_output:
if item.get("type") == "message":
content = item.get("content", [])
for content_part in content:
text = content_part.get("text", "")
if text:
for line in text.split('\n'):
line = line.strip()
line_upper = line.upper()
if line_upper.startswith("FIRST:"):
value = line[6:].strip()
if value and value.upper() != "N/A":
contact['first'] = value
elif line_upper.startswith("LAST:"):
value = line[5:].strip()
if value and value.upper() != "N/A":
contact['last'] = value
elif line_upper.startswith("ROLE:"):
value = line[5:].strip()
if value and value.upper() != "N/A":
contact['role'] = value
elif line_upper.startswith("COMPANY:"):
value = line[8:].strip()
if value and value.upper() != "N/A":
contact['company'] = value
elif line_upper.startswith("LINKEDIN:"):
value = line[9:].strip()
if value and value.upper() != "N/A":
contact['linkedin'] = value
return contact
async def scrape_linkedin_connections():
"""Scrape LinkedIn connections and export to CSV."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
csv_filename = f"linkedin_connections_{timestamp}.csv"
csv_path = os.path.join(os.getcwd(), csv_filename)
# Initialize CSV file
with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=['first', 'last', 'role', 'company', 'met_at', 'linkedin'])
writer.writeheader()
print(f"\n🚀 Starting LinkedIn connections scraper")
print(f"📁 Output file: {csv_path}")
print(f"📍 Met at: {MET_AT_REASON}")
print("=" * 80)
try:
async with Computer(
os_type="linux",
provider_type=VMProviderType.CLOUD,
name=os.environ["CUA_CONTAINER_NAME"], # Your sandbox name
api_key=os.environ["CUA_API_KEY"],
verbosity=logging.INFO,
) as computer:
agent = ComputerAgent(
model="cua/anthropic/claude-sonnet-4.5",
tools=[computer],
only_n_most_recent_images=3,
verbosity=logging.INFO,
trajectory_dir="trajectories",
use_prompt_caching=True,
max_trajectory_budget=10.0,
)
history = []
# Task 1: Navigate to LinkedIn connections page
navigation_task = (
"STEP 1 - NAVIGATE TO LINKEDIN CONNECTIONS PAGE:\n"
"1. Open a web browser (Chrome or Firefox)\n"
"2. Navigate to https://www.linkedin.com/mynetwork/invite-connect/connections/\n"
"3. Wait for the page to fully load\n"
"4. Confirm you can see the list of connections\n"
"5. Ready to start extracting contacts"
)
print(f"\n[Task 1/21] Navigating to LinkedIn...")
history.append({"role": "user", "content": navigation_task})
async for result in agent.run(history, stream=False):
history += result.get("output", [])
print(f"✅ Navigation completed\n")
# Extract 20 contacts
contacts_extracted = 0
linkedin_urls = []
previous_contact_name = None
for contact_num in range(1, 21):
# Build extraction task
if contact_num == 1:
extraction_task = (
f"STEP {contact_num + 1} - EXTRACT CONTACT {contact_num} OF 20:\n"
f"1. Click on the first connection's profile\n"
f"2. Extract: FIRST, LAST, ROLE, COMPANY, LINKEDIN URL\n"
f"3. Return in exact format:\n"
f"FIRST: [value]\n"
f"LAST: [value]\n"
f"ROLE: [value]\n"
f"COMPANY: [value]\n"
f"LINKEDIN: [value]\n"
f"4. Navigate back to connections list"
)
else:
extraction_task = (
f"STEP {contact_num + 1} - EXTRACT CONTACT {contact_num} OF 20:\n"
f"1. Find '{previous_contact_name}' in the list\n"
f"2. Click on the contact BELOW them\n"
f"3. Extract: FIRST, LAST, ROLE, COMPANY, LINKEDIN URL\n"
f"4. Return in exact format:\n"
f"FIRST: [value]\n"
f"LAST: [value]\n"
f"ROLE: [value]\n"
f"COMPANY: [value]\n"
f"LINKEDIN: [value]\n"
f"5. Navigate back"
)
print(f"[Task {contact_num + 1}/21] Extracting contact {contact_num}/20...")
history.append({"role": "user", "content": extraction_task})
all_output = []
async for result in agent.run(history, stream=False):
output = result.get("output", [])
history += output
all_output.extend(output)
contact_data = extract_contact_from_response(all_output)
has_name = bool(contact_data['first'] and contact_data['last'])
has_linkedin = bool(contact_data['linkedin'] and 'linkedin.com' in contact_data['linkedin'])
if has_name or has_linkedin:
with open(csv_path, 'a', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=['first', 'last', 'role', 'company', 'met_at', 'linkedin'])
writer.writerow(contact_data)
contacts_extracted += 1
if contact_data['linkedin']:
linkedin_urls.append(contact_data['linkedin'])
if has_name:
previous_contact_name = f"{contact_data['first']} {contact_data['last']}".strip()
name_str = f"{contact_data['first']} {contact_data['last']}" if has_name else "[No name]"
print(f"✅ Contact {contact_num}/20 saved: {name_str}")
else:
print(f"⚠️ Could not extract valid data for contact {contact_num}")
if contact_num % 5 == 0:
print(f"\n📈 Progress: {contacts_extracted}/{contact_num} contacts extracted\n")
# Create messaging links file
messaging_filename = f"linkedin_messaging_links_{timestamp}.txt"
messaging_path = os.path.join(os.getcwd(), messaging_filename)
with open(messaging_path, 'w', encoding='utf-8') as txtfile:
txtfile.write("LinkedIn Messaging Compose Links\n")
txtfile.write("=" * 80 + "\n\n")
for i, linkedin_url in enumerate(linkedin_urls, 1):
public_id = extract_public_id_from_linkedin_url(linkedin_url)
if public_id:
messaging_url = f"https://www.linkedin.com/messaging/compose/?recipient={public_id}"
txtfile.write(f"{i}. {messaging_url}\n")
print("\n" + "="*80)
print("🎉 All tasks completed!")
print(f"📁 CSV file saved to: {csv_path}")
print(f"📊 Total contacts extracted: {contacts_extracted}/20")
print(f"💬 Messaging links saved to: {messaging_path}")
print("="*80)
except Exception as e:
print(f"\n❌ Error: {e}")
traceback.print_exc()
raise
def main():
try:
load_dotenv()
if "ANTHROPIC_API_KEY" not in os.environ:
raise RuntimeError("Please set ANTHROPIC_API_KEY in .env")
if "CUA_API_KEY" not in os.environ:
raise RuntimeError("Please set CUA_API_KEY in .env")
if "CUA_CONTAINER_NAME" not in os.environ:
raise RuntimeError("Please set CUA_CONTAINER_NAME in .env")
signal.signal(signal.SIGINT, handle_sigint)
asyncio.run(scrape_linkedin_connections())
except Exception as e:
print(f"\n❌ Error: {e}")
traceback.print_exc()
if __name__ == "__main__":
main()
```
</Tab>
<Tab value="Linux on Docker">
```python
# Same code as Cloud Sandbox, but change Computer initialization to:
async with Computer(
os_type="linux",
provider_type=VMProviderType.DOCKER,
image="trycua/cua-xfce:latest",
verbosity=logging.INFO,
) as computer:
```
And remove the `CUA_API_KEY` and `CUA_CONTAINER_NAME` requirements from `.env` and the validation checks.
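For example, the local `main()` shrinks to a single key check (a minimal sketch of the adjusted script; everything else stays the same):
```python
def main():
    try:
        load_dotenv()
        # Only the model provider key is still required for local sandboxes
        if "ANTHROPIC_API_KEY" not in os.environ:
            raise RuntimeError("Please set ANTHROPIC_API_KEY in .env")
        signal.signal(signal.SIGINT, handle_sigint)
        asyncio.run(scrape_linkedin_connections())
    except Exception as e:
        print(f"\n❌ Error: {e}")
        traceback.print_exc()
```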
</Tab>
<Tab value="macOS Sandbox">
```python
# Same code as Cloud Sandbox, but change Computer initialization to:
async with Computer(
os_type="macos",
provider_type=VMProviderType.LUME,
name="macos-sequoia-cua:latest",
verbosity=logging.INFO,
) as computer:
```
And remove the `CUA_API_KEY` and `CUA_CONTAINER_NAME` requirements from `.env` and the validation checks.
</Tab>
<Tab value="Windows Sandbox">
```python
# Same code as Cloud Sandbox, but change Computer initialization to:
async with Computer(
os_type="windows",
provider_type=VMProviderType.WINDOWS_SANDBOX,
verbosity=logging.INFO,
) as computer:
```
And remove the `CUA_API_KEY` and `CUA_CONTAINER_NAME` requirements from `.env` and the validation checks.
</Tab>
</Tabs>
</Step>
<Step>
### Run Your Script
Run the script to start extracting contacts:
```bash
python contact_export.py
```
The agent will:
1. Navigate to your LinkedIn connections page
2. Extract data from 20 contacts (first name, last name, role, company, LinkedIn URL)
3. Save contacts to a timestamped CSV file
4. Generate messaging compose links for easy follow-up
Monitor the output to see the agent's progress. The script will show a progress update every 5 contacts.
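The CSV columns match the `DictWriter` field names in the script; an illustrative row (hypothetical values) looks like:
```csv
first,last,role,company,met_at,linkedin
Jane,Doe,Software Engineer,Acme Corp,SF Tech Mixer 2025,https://www.linkedin.com/in/janedoe
```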
</Step>
</Steps>
---
## How It Works
This script demonstrates a practical workflow for extracting LinkedIn connection data:
1. **Session Persistence** - Manually log into LinkedIn through the VM once, and the VM saves your session
2. **Navigation** - The script navigates to your connections page using your saved authenticated session
3. **Data Extraction** - For each contact, the agent clicks their profile, extracts data, and navigates back
4. **Python Processing** - Python parses responses, validates data, and writes to CSV incrementally
5. **Output Files** - Generates a CSV with contact data and a text file with messaging URLs
## Next Steps
- Learn more about [Cua computers](/computer-sdk/computers) and [computer commands](/computer-sdk/commands)
- Read about [Agent loops](/agent-sdk/agent-loops), [tools](/agent-sdk/custom-tools), and [supported model providers](/agent-sdk/supported-model-providers/)
- Experiment with different [Models and Providers](/agent-sdk/supported-model-providers/)
- Adapt this script for other platforms (Twitter/X, email extraction, etc.)
- Join our [Discord community](https://discord.com/invite/mVnXXpdE85) for help
```
--------------------------------------------------------------------------------
/libs/python/mcp-server/mcp_server/server.py:
--------------------------------------------------------------------------------
```python
import asyncio
import base64
import inspect
import logging
import os
import signal
import sys
import traceback
import uuid
from typing import Any, Dict, List, Optional, Tuple, Union
import anyio
# Configure logging to output to stderr for debug visibility
logging.basicConfig(
level=logging.DEBUG, # Changed to DEBUG
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
stream=sys.stderr,
)
logger = logging.getLogger("mcp-server")
# More visible startup message
logger.debug("MCP Server module loading...")
try:
from mcp.server.fastmcp import Context, FastMCP
# Use the canonical Image type
from mcp.server.fastmcp.utilities.types import Image
logger.debug("Successfully imported FastMCP")
except ImportError as e:
logger.error(f"Failed to import FastMCP: {e}")
traceback.print_exc(file=sys.stderr)
sys.exit(1)
try:
from agent import ComputerAgent
from computer import Computer
logger.debug("Successfully imported Computer and Agent modules")
except ImportError as e:
logger.error(f"Failed to import Computer/Agent modules: {e}")
traceback.print_exc(file=sys.stderr)
sys.exit(1)
try:
from .session_manager import (
get_session_manager,
initialize_session_manager,
shutdown_session_manager,
)
logger.debug("Successfully imported session manager")
except ImportError as e:
logger.error(f"Failed to import session manager: {e}")
traceback.print_exc(file=sys.stderr)
sys.exit(1)
def get_env_bool(key: str, default: bool = False) -> bool:
"""Get boolean value from environment variable."""
return os.getenv(key, str(default)).lower() in ("true", "1", "yes")
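# Example (sketch): with the parsing above, get_env_bool("CUA_DEBUG") is False when
# the variable is unset, and True for values such as "true", "1", or "yes".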
async def _maybe_call_ctx_method(ctx: Context, method_name: str, *args, **kwargs) -> None:
"""Call a context helper if it exists, awaiting the result when necessary."""
method = getattr(ctx, method_name, None)
if not callable(method):
return
result = method(*args, **kwargs)
if inspect.isawaitable(result):
await result
def _normalise_message_content(content: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
"""Normalise message content to a list of structured parts."""
if isinstance(content, list):
return content
if content is None:
return []
return [{"type": "output_text", "text": str(content)}]
def _extract_text_from_content(content: Union[str, List[Dict[str, Any]]]) -> str:
"""Extract textual content for inclusion in the aggregated result string."""
if isinstance(content, str):
return content
texts: List[str] = []
for part in content or []:
if not isinstance(part, dict):
continue
if part.get("type") in {"output_text", "text"} and part.get("text"):
texts.append(str(part["text"]))
return "\n".join(texts)
def _serialise_tool_content(content: Any) -> str:
"""Convert tool outputs into a string for aggregation."""
if isinstance(content, str):
return content
if isinstance(content, list):
texts: List[str] = []
for part in content:
if (
isinstance(part, dict)
and part.get("type") in {"output_text", "text"}
and part.get("text")
):
texts.append(str(part["text"]))
if texts:
return "\n".join(texts)
if content is None:
return ""
return str(content)
def serve() -> FastMCP:
"""Create and configure the MCP server."""
# NOTE: Do not pass model_config here; FastMCP 2.12.x doesn't support it.
server = FastMCP(name="cua-agent")
@server.tool(structured_output=False)
async def screenshot_cua(ctx: Context, session_id: Optional[str] = None) -> Any:
"""
Take a screenshot of the current macOS VM screen and return the image.
Args:
session_id: Optional session ID for multi-client support. If not provided, a new session will be created.
"""
session_manager = get_session_manager()
async with session_manager.get_session(session_id) as session:
screenshot = await session.computer.interface.screenshot()
# Returning Image object is fine when structured_output=False
return Image(format="png", data=screenshot)
@server.tool(structured_output=False)
async def run_cua_task(ctx: Context, task: str, session_id: Optional[str] = None) -> Any:
"""
Run a Computer-Use Agent (CUA) task in a macOS VM and return (combined text, final screenshot).
Args:
task: The task description for the agent to execute
session_id: Optional session ID for multi-client support. If not provided, a new session will be created.
"""
session_manager = get_session_manager()
task_id = str(uuid.uuid4())
try:
logger.info(f"Starting CUA task: {task} (task_id: {task_id})")
async with session_manager.get_session(session_id) as session:
# Register this task with the session
await session_manager.register_task(session.session_id, task_id)
try:
# Get model name
model_name = os.getenv("CUA_MODEL_NAME", "anthropic/claude-sonnet-4-5-20250929")
logger.info(f"Using model: {model_name}")
# Create agent with the new v0.4.x API
agent = ComputerAgent(
model=model_name,
only_n_most_recent_images=int(os.getenv("CUA_MAX_IMAGES", "3")),
verbosity=logging.INFO,
tools=[session.computer],
)
messages = [{"role": "user", "content": task}]
# Collect all results
aggregated_messages: List[str] = []
async for result in agent.run(messages):
logger.info("Agent processing step")
ctx.info("Agent processing step")
outputs = result.get("output", [])
for output in outputs:
output_type = output.get("type")
if output_type == "message":
logger.debug("Streaming assistant message: %s", output)
content = _normalise_message_content(output.get("content"))
aggregated_text = _extract_text_from_content(content)
if aggregated_text:
aggregated_messages.append(aggregated_text)
await _maybe_call_ctx_method(
ctx,
"yield_message",
role=output.get("role", "assistant"),
content=content,
)
elif output_type in {"tool_use", "computer_call", "function_call"}:
logger.debug("Streaming tool call: %s", output)
call_id = output.get("id") or output.get("call_id")
tool_name = output.get("name") or output.get("action", {}).get(
"type"
)
tool_input = (
output.get("input")
or output.get("arguments")
or output.get("action")
)
if call_id:
await _maybe_call_ctx_method(
ctx,
"yield_tool_call",
name=tool_name,
call_id=call_id,
input=tool_input,
)
elif output_type in {
"tool_result",
"computer_call_output",
"function_call_output",
}:
logger.debug("Streaming tool output: %s", output)
call_id = output.get("call_id") or output.get("id")
content = output.get("content") or output.get("output")
aggregated_text = _serialise_tool_content(content)
if aggregated_text:
aggregated_messages.append(aggregated_text)
if call_id:
await _maybe_call_ctx_method(
ctx,
"yield_tool_output",
call_id=call_id,
output=content,
is_error=output.get("status") == "failed"
or output.get("is_error", False),
)
logger.info("CUA task completed successfully")
ctx.info("CUA task completed successfully")
screenshot_image = Image(
format="png",
data=await session.computer.interface.screenshot(),
)
return (
"\n".join(aggregated_messages).strip()
or "Task completed with no text output.",
screenshot_image,
)
finally:
# Unregister the task from the session
await session_manager.unregister_task(session.session_id, task_id)
except Exception as e:
error_msg = f"Error running CUA task: {str(e)}\n{traceback.format_exc()}"
logger.error(error_msg)
await ctx.error(error_msg)
# Try to get a screenshot from the session if available
try:
if session_id:
async with session_manager.get_session(session_id) as session:
screenshot = await session.computer.interface.screenshot()
return (
f"Error during task execution: {str(e)}",
Image(format="png", data=screenshot),
)
except Exception:
pass
# If we can't get a screenshot, return a placeholder
return (
f"Error during task execution: {str(e)}",
Image(format="png", data=b""),
)
@server.tool(structured_output=False)
async def run_multi_cua_tasks(
ctx: Context, tasks: List[str], session_id: Optional[str] = None, concurrent: bool = False
) -> Any:
"""
Run multiple CUA tasks and return a list of (combined text, screenshot).
Args:
tasks: List of task descriptions to execute
session_id: Optional session ID for multi-client support. If not provided, a new session will be created.
concurrent: If True, run tasks concurrently. If False, run sequentially (default).
"""
total_tasks = len(tasks)
if total_tasks == 0:
await ctx.report_progress(1.0)
return []
session_manager = get_session_manager()
if concurrent and total_tasks > 1:
# Run tasks concurrently
logger.info(f"Running {total_tasks} tasks concurrently")
ctx.info(f"Running {total_tasks} tasks concurrently")
# Create tasks with progress tracking
async def run_task_with_progress(
task_index: int, task: str
) -> Tuple[int, Tuple[str, Image]]:
await ctx.report_progress(task_index / total_tasks)
result = await run_cua_task(ctx, task, session_id)
await ctx.report_progress((task_index + 1) / total_tasks)
return task_index, result
# Create all task coroutines
task_coroutines = [run_task_with_progress(i, task) for i, task in enumerate(tasks)]
# Wait for all tasks to complete
results_with_indices = await asyncio.gather(*task_coroutines, return_exceptions=True)
# Sort results by original task order and handle exceptions
results: List[Tuple[str, Image]] = []
for result in results_with_indices:
if isinstance(result, Exception):
logger.error(f"Task failed with exception: {result}")
ctx.error(f"Task failed: {str(result)}")
results.append((f"Task failed: {str(result)}", Image(format="png", data=b"")))
else:
_, task_result = result
results.append(task_result)
return results
else:
# Run tasks sequentially (original behavior)
logger.info(f"Running {total_tasks} tasks sequentially")
ctx.info(f"Running {total_tasks} tasks sequentially")
results: List[Tuple[str, Image]] = []
for i, task in enumerate(tasks):
logger.info(f"Running task {i+1}/{total_tasks}: {task}")
ctx.info(f"Running task {i+1}/{total_tasks}: {task}")
ctx.report_progress(i / total_tasks)
task_result = await run_cua_task(ctx, task, session_id)
results.append(task_result)
await ctx.report_progress((i + 1) / total_tasks)
return results
@server.tool(structured_output=False)
async def get_session_stats(ctx: Context) -> Dict[str, Any]:
"""
Get statistics about active sessions and resource usage.
"""
session_manager = get_session_manager()
return session_manager.get_session_stats()
@server.tool(structured_output=False)
async def cleanup_session(ctx: Context, session_id: str) -> str:
"""
Cleanup a specific session and release its resources.
Args:
session_id: The session ID to cleanup
"""
session_manager = get_session_manager()
await session_manager.cleanup_session(session_id)
return f"Session {session_id} cleanup initiated"
return server
server = serve()
async def run_server():
"""Run the MCP server with proper lifecycle management."""
session_manager = None
try:
logger.debug("Starting MCP server...")
# Initialize session manager
session_manager = await initialize_session_manager()
logger.info("Session manager initialized")
# Set up signal handlers for graceful shutdown
def signal_handler(signum, frame):
logger.info(f"Received signal {signum}, initiating graceful shutdown...")
# Create a task to shutdown gracefully
asyncio.create_task(graceful_shutdown())
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Start the server
logger.info("Starting FastMCP server...")
# Use run_stdio_async directly instead of server.run() to avoid nested event loops
await server.run_stdio_async()
except Exception as e:
logger.error(f"Error starting server: {e}")
traceback.print_exc(file=sys.stderr)
raise
finally:
# Ensure cleanup happens
if session_manager:
logger.info("Shutting down session manager...")
await shutdown_session_manager()
async def graceful_shutdown():
"""Gracefully shutdown the server and all sessions."""
logger.info("Initiating graceful shutdown...")
try:
await shutdown_session_manager()
logger.info("Graceful shutdown completed")
except Exception as e:
logger.error(f"Error during graceful shutdown: {e}")
finally:
# Exit the process (os is already imported at module level)
os._exit(0)
def main():
"""Run the MCP server with proper async lifecycle management."""
try:
# Use anyio.run instead of asyncio.run to avoid nested event loop issues
anyio.run(run_server)
except KeyboardInterrupt:
logger.info("Server interrupted by user")
except Exception as e:
logger.error(f"Error starting server: {e}")
traceback.print_exc(file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/libs/lume/src/Commands/Logs.swift:
--------------------------------------------------------------------------------
```swift
import ArgumentParser
import Foundation
struct Logs: ParsableCommand {
static let configuration = CommandConfiguration(
abstract: "View lume serve logs",
subcommands: [Info.self, Error.self, All.self],
defaultSubcommand: All.self
)
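// Usage (sketch; assumes this command is registered under the `lume` root):
//   lume logs                 // print info and error logs once
//   lume logs info --follow   // tail /tmp/lume_daemon.log
//   lume logs error -l 50     // last 50 lines of the error log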
// Common functionality for reading log files
static func readLogFile(path: String, lines: Int? = nil) -> String {
let fileManager = FileManager.default
// Check if file exists
guard fileManager.fileExists(atPath: path) else {
return "Log file not found at \(path)"
}
do {
// Read file content
let content = try String(contentsOfFile: path, encoding: .utf8)
// If lines parameter is provided, return only the specified number of lines from the end
if let lineCount = lines {
let allLines = content.components(separatedBy: .newlines)
let startIndex = max(0, allLines.count - lineCount)
let lastLines = Array(allLines[startIndex...])
return lastLines.joined(separator: "\n")
}
return content
} catch {
return "Error reading log file: \(error.localizedDescription)"
}
}
// Method for tailing a log file (following new changes)
static func tailLogFile(path: String, initialLines: Int? = 10) {
let fileManager = FileManager.default
// Check if file exists
guard fileManager.fileExists(atPath: path) else {
print("Log file not found at \(path)")
return
}
do {
// Get initial content with only the specified number of lines from the end
var lastPosition: UInt64 = 0
let fileHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: path))
// First, print the last few lines of the file
if let lines = initialLines {
let content = try String(contentsOfFile: path, encoding: .utf8)
let allLines = content.components(separatedBy: .newlines)
let startIndex = max(0, allLines.count - lines)
let lastLines = Array(allLines[startIndex...])
print(lastLines.joined(separator: "\n"))
}
// Get current file size
lastPosition = UInt64(try fileManager.attributesOfItem(atPath: path)[.size] as? UInt64 ?? 0)
// Set up for continuous monitoring
print("\nTailing log file... Press Ctrl+C to stop")
// Monitor file for changes
while true {
// Brief pause to reduce CPU usage
Thread.sleep(forTimeInterval: 0.5)
// Get current size
let currentSize = try fileManager.attributesOfItem(atPath: path)[.size] as? UInt64 ?? 0
// If file has grown
if currentSize > lastPosition {
// Seek to where we last read
fileHandle.seek(toFileOffset: lastPosition)
// Read new content
if let newData = try? fileHandle.readToEnd() {
if let newContent = String(data: newData, encoding: .utf8) {
// Print new content without trailing newline
if newContent.hasSuffix("\n") {
print(newContent, terminator: "")
} else {
print(newContent)
}
}
}
// Update position
lastPosition = currentSize
}
// Handle file rotation (if file became smaller)
else if currentSize < lastPosition {
// File was probably rotated, start from beginning
lastPosition = 0
fileHandle.seek(toFileOffset: 0)
if let newData = try? fileHandle.readToEnd() {
if let newContent = String(data: newData, encoding: .utf8) {
print(newContent, terminator: "")
}
}
lastPosition = currentSize
}
}
} catch {
print("Error tailing log file: \(error.localizedDescription)")
}
}
// MARK: - Info Logs Subcommand
struct Info: ParsableCommand {
static let configuration = CommandConfiguration(
commandName: "info",
abstract: "View info logs from the daemon"
)
@Option(name: .shortAndLong, help: "Number of lines to display from the end of the file")
var lines: Int?
@Flag(name: .shortAndLong, help: "Follow log file continuously (like tail -f)")
var follow: Bool = false
func run() throws {
let logPath = "/tmp/lume_daemon.log"
print("=== Info Logs ===")
if follow {
// Use tailing functionality to continuously monitor the log
Logs.tailLogFile(path: logPath, initialLines: lines ?? 10)
} else {
// Regular one-time viewing of logs
let content = Logs.readLogFile(path: logPath, lines: lines)
print(content)
}
}
}
// MARK: - Error Logs Subcommand
struct Error: ParsableCommand {
static let configuration = CommandConfiguration(
commandName: "error",
abstract: "View error logs from the daemon"
)
@Option(name: .shortAndLong, help: "Number of lines to display from the end of the file")
var lines: Int?
@Flag(name: .shortAndLong, help: "Follow log file continuously (like tail -f)")
var follow: Bool = false
func run() throws {
let logPath = "/tmp/lume_daemon.error.log"
print("=== Error Logs ===")
if follow {
// Use tailing functionality to continuously monitor the log
Logs.tailLogFile(path: logPath, initialLines: lines ?? 10)
} else {
// Regular one-time viewing of logs
let content = Logs.readLogFile(path: logPath, lines: lines)
print(content)
}
}
}
// MARK: - All Logs Subcommand
struct All: ParsableCommand {
static let configuration = CommandConfiguration(
commandName: "all",
abstract: "View both info and error logs from the daemon"
)
@Option(name: .shortAndLong, help: "Number of lines to display from the end of each file")
var lines: Int?
@Flag(name: .shortAndLong, help: "Follow log files continuously (like tail -f)")
var follow: Bool = false
// Custom implementation to tail both logs simultaneously
private func tailBothLogs(infoPath: String, errorPath: String, initialLines: Int? = 10) {
let fileManager = FileManager.default
var infoExists = fileManager.fileExists(atPath: infoPath)
var errorExists = fileManager.fileExists(atPath: errorPath)
if !infoExists && !errorExists {
print("Neither info nor error log files found")
return
}
// Print initial content
print("=== Info Logs ===")
if infoExists {
if let lines = initialLines {
let content = (try? String(contentsOfFile: infoPath, encoding: .utf8)) ?? ""
let allLines = content.components(separatedBy: .newlines)
let startIndex = max(0, allLines.count - lines)
let lastLines = Array(allLines[startIndex...])
print(lastLines.joined(separator: "\n"))
}
} else {
print("Info log file not found")
}
print("\n=== Error Logs ===")
if errorExists {
if let lines = initialLines {
let content = (try? String(contentsOfFile: errorPath, encoding: .utf8)) ?? ""
let allLines = content.components(separatedBy: .newlines)
let startIndex = max(0, allLines.count - lines)
let lastLines = Array(allLines[startIndex...])
print(lastLines.joined(separator: "\n"))
}
} else {
print("Error log file not found")
}
print("\nTailing both log files... Press Ctrl+C to stop")
// Initialize file handles and positions
var infoHandle: FileHandle? = nil
var errorHandle: FileHandle? = nil
var infoPosition: UInt64 = 0
var errorPosition: UInt64 = 0
// Set up file handles
if infoExists {
do {
infoHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: infoPath))
infoPosition = UInt64(try fileManager.attributesOfItem(atPath: infoPath)[.size] as? UInt64 ?? 0)
} catch {
print("Error opening info log file: \(error.localizedDescription)")
}
}
if errorExists {
do {
errorHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: errorPath))
errorPosition = UInt64(try fileManager.attributesOfItem(atPath: errorPath)[.size] as? UInt64 ?? 0)
} catch {
print("Error opening error log file: \(error.localizedDescription)")
}
}
// Monitor both files for changes
while true {
Thread.sleep(forTimeInterval: 0.5)
// Check for new content in info log
if let handle = infoHandle {
do {
// Re-check existence in case file was deleted
infoExists = fileManager.fileExists(atPath: infoPath)
if !infoExists {
print("\n[Info log file was removed]")
infoHandle = nil
continue
}
let currentSize = try fileManager.attributesOfItem(atPath: infoPath)[.size] as? UInt64 ?? 0
if currentSize > infoPosition {
handle.seek(toFileOffset: infoPosition)
if let newData = try? handle.readToEnd() {
if let newContent = String(data: newData, encoding: .utf8) {
print("\n--- New Info Log Content ---")
if newContent.hasSuffix("\n") {
print(newContent, terminator: "")
} else {
print(newContent)
}
}
}
infoPosition = currentSize
} else if currentSize < infoPosition {
// File was rotated
print("\n[Info log was rotated]")
infoPosition = 0
handle.seek(toFileOffset: 0)
if let newData = try? handle.readToEnd() {
if let newContent = String(data: newData, encoding: .utf8) {
print("\n--- New Info Log Content ---")
print(newContent, terminator: "")
}
}
infoPosition = currentSize
}
} catch {
print("\nError reading info log: \(error.localizedDescription)")
}
} else if fileManager.fileExists(atPath: infoPath) && !infoExists {
// File exists again after being deleted
do {
infoHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: infoPath))
infoPosition = 0
infoExists = true
print("\n[Info log file reappeared]")
} catch {
print("\nError reopening info log: \(error.localizedDescription)")
}
}
// Check for new content in error log
if let handle = errorHandle {
do {
// Re-check existence in case file was deleted
errorExists = fileManager.fileExists(atPath: errorPath)
if !errorExists {
print("\n[Error log file was removed]")
errorHandle = nil
continue
}
let currentSize = try fileManager.attributesOfItem(atPath: errorPath)[.size] as? UInt64 ?? 0
if currentSize > errorPosition {
handle.seek(toFileOffset: errorPosition)
if let newData = try? handle.readToEnd() {
if let newContent = String(data: newData, encoding: .utf8) {
print("\n--- New Error Log Content ---")
if newContent.hasSuffix("\n") {
print(newContent, terminator: "")
} else {
print(newContent)
}
}
}
errorPosition = currentSize
} else if currentSize < errorPosition {
// File was rotated
print("\n[Error log was rotated]")
errorPosition = 0
handle.seek(toFileOffset: 0)
if let newData = try? handle.readToEnd() {
if let newContent = String(data: newData, encoding: .utf8) {
print("\n--- New Error Log Content ---")
print(newContent, terminator: "")
}
}
errorPosition = currentSize
}
} catch {
print("\nError reading error log: \(error.localizedDescription)")
}
} else if fileManager.fileExists(atPath: errorPath) && !errorExists {
// File exists again after being deleted
do {
errorHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: errorPath))
errorPosition = 0
errorExists = true
print("\n[Error log file reappeared]")
} catch {
print("\nError reopening error log: \(error.localizedDescription)")
}
}
}
}
func run() throws {
let infoLogPath = "/tmp/lume_daemon.log"
let errorLogPath = "/tmp/lume_daemon.error.log"
if follow {
// Use custom tailing implementation for both logs
tailBothLogs(infoPath: infoLogPath, errorPath: errorLogPath, initialLines: lines ?? 10)
} else {
// Regular one-time viewing of logs
let infoContent = Logs.readLogFile(path: infoLogPath, lines: lines)
let errorContent = Logs.readLogFile(path: errorLogPath, lines: lines)
print("=== Info Logs ===")
print(infoContent)
print("\n=== Error Logs ===")
print(errorContent)
}
}
}
}
```
--------------------------------------------------------------------------------
/examples/som_examples.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Example script demonstrating the usage of OmniParser's UI element detection functionality.
This script shows how to:
1. Initialize the OmniParser
2. Load and process images
3. Visualize detection results
4. Run threshold experiments and benchmark detection performance
"""
import argparse
import base64
import glob
import io
import logging
import os
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
import numpy as np
from PIL import Image
# Load environment variables from .env file
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
print(f"Loading environment from: {env_file}")
from dotenv import load_dotenv
load_dotenv(env_file)
# Add paths to sys.path if needed
pythonpath = os.environ.get("PYTHONPATH", "")
for path in pythonpath.split(":"):
if path and path not in sys.path:
sys.path.append(path)
print(f"Added to sys.path: {path}")
# Add the libs directory to the path to find som
libs_path = project_root / "libs"
if str(libs_path) not in sys.path:
sys.path.append(str(libs_path))
print(f"Added to sys.path: {libs_path}")
from som import IconElement, OmniParser, ParseResult, TextElement
from som.models import BoundingBox, ParserMetadata, UIElement
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
def setup_logging():
"""Configure logging with a nice format."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
class Timer:
"""Enhanced context manager for timing code blocks."""
def __init__(self, name: str, logger):
self.name = name
self.logger = logger
self.start_time: float = 0.0
self.elapsed_time: float = 0.0
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, *args):
self.elapsed_time = time.time() - self.start_time
self.logger.info(f"{self.name}: {self.elapsed_time:.3f}s")
return False
def image_to_bytes(image: Image.Image) -> bytes:
"""Convert PIL Image to PNG bytes."""
buf = io.BytesIO()
image.save(buf, format="PNG")
return buf.getvalue()
def process_image(
parser: OmniParser, image_path: str, output_dir: Path, use_ocr: bool = False
) -> None:
"""Process a single image and save the result."""
try:
# Load image
logger.info(f"Processing image: {image_path}")
image = Image.open(image_path).convert("RGB")
logger.info(f"Image loaded successfully, size: {image.size}")
# Create output filename
input_filename = Path(image_path).stem
output_path = output_dir / f"{input_filename}_analyzed.png"
# Convert image to PNG bytes
image_bytes = image_to_bytes(image)
# Process image
with Timer(f"Processing {input_filename}", logger):
result = parser.parse(image_bytes, use_ocr=use_ocr)
logger.info(
f"Found {result.metadata.num_icons} icons and {result.metadata.num_text} text elements"
)
# Save the annotated image
logger.info(f"Saving annotated image to: {output_path}")
try:
# Save image from base64
img_data = base64.b64decode(result.annotated_image_base64)
img = Image.open(io.BytesIO(img_data))
img.save(output_path)
# Print detailed results
logger.info("\nDetected Elements:")
for elem in result.elements:
if isinstance(elem, IconElement):
logger.info(
f"Icon: confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
)
elif isinstance(elem, TextElement):
logger.info(
f"Text: '{elem.content}', confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
)
# Verify file exists and log size
if output_path.exists():
logger.info(
f"Successfully saved image. File size: {output_path.stat().st_size} bytes"
)
else:
logger.error(f"Failed to verify file at {output_path}")
except Exception as e:
logger.error(f"Error saving image: {str(e)}", exc_info=True)
except Exception as e:
logger.error(f"Error processing image {image_path}: {str(e)}", exc_info=True)
def run_detection_benchmark(
input_path: str,
output_dir: Path,
use_ocr: bool = False,
box_threshold: float = 0.01,
iou_threshold: float = 0.1,
):
"""Run detection benchmark on images."""
logger.info(
f"Starting benchmark with OCR enabled: {use_ocr}, box_threshold: {box_threshold}, iou_threshold: {iou_threshold}"
)
try:
# Initialize parser
logger.info("Initializing OmniParser...")
parser = OmniParser()
# Create output directory
output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Output directory created at: {output_dir}")
# Get list of PNG files
if os.path.isdir(input_path):
image_files = glob.glob(os.path.join(input_path, "*.png"))
else:
image_files = [input_path]
logger.info(f"Found {len(image_files)} images to process")
# Process each image with specified thresholds
for image_path in image_files:
try:
# Load image
logger.info(f"Processing image: {image_path}")
image = Image.open(image_path).convert("RGB")
logger.info(f"Image loaded successfully, size: {image.size}")
# Create output filename
input_filename = Path(image_path).stem
output_path = output_dir / f"{input_filename}_analyzed.png"
# Convert image to PNG bytes
image_bytes = image_to_bytes(image)
# Process image with specified thresholds
with Timer(f"Processing {input_filename}", logger):
result = parser.parse(
image_bytes,
use_ocr=use_ocr,
box_threshold=box_threshold,
iou_threshold=iou_threshold,
)
logger.info(
f"Found {result.metadata.num_icons} icons and {result.metadata.num_text} text elements"
)
# Save the annotated image
logger.info(f"Saving annotated image to: {output_path}")
try:
# Save image from base64
img_data = base64.b64decode(result.annotated_image_base64)
img = Image.open(io.BytesIO(img_data))
img.save(output_path)
# Print detailed results
logger.info("\nDetected Elements:")
for elem in result.elements:
if isinstance(elem, IconElement):
logger.info(
f"Icon: confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
)
elif isinstance(elem, TextElement):
logger.info(
f"Text: '{elem.content}', confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
)
# Verify file exists and log size
if output_path.exists():
logger.info(
f"Successfully saved image. File size: {output_path.stat().st_size} bytes"
)
else:
logger.error(f"Failed to verify file at {output_path}")
except Exception as e:
logger.error(f"Error saving image: {str(e)}", exc_info=True)
except Exception as e:
logger.error(f"Error processing image {image_path}: {str(e)}", exc_info=True)
except Exception as e:
logger.error(f"Benchmark failed: {str(e)}", exc_info=True)
raise
def run_experiments(input_path: str, output_dir: Path, use_ocr: bool = False):
"""Run experiments with different threshold combinations."""
# Define threshold values to test
box_thresholds = [0.01, 0.05, 0.1, 0.3]
iou_thresholds = [0.05, 0.1, 0.2, 0.5]
logger.info("Starting threshold experiments...")
logger.info("Box thresholds to test: %s", box_thresholds)
logger.info("IOU thresholds to test: %s", iou_thresholds)
# Create results directory for this experiment
timestamp = time.strftime("%Y%m%d-%H%M%S")
ocr_suffix = "_ocr" if use_ocr else "_no_ocr"
exp_dir = output_dir / f"experiment_{timestamp}{ocr_suffix}"
exp_dir.mkdir(parents=True, exist_ok=True)
# Create a summary file
summary_file = exp_dir / "results_summary.txt"
with open(summary_file, "w") as f:
f.write("Threshold Experiments Results\n")
f.write("==========================\n\n")
f.write(f"Input: {input_path}\n")
f.write(f"OCR Enabled: {use_ocr}\n")
f.write(f"Date: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write("Results:\n")
f.write("-" * 80 + "\n")
f.write(
f"{'Box Thresh':^10} | {'IOU Thresh':^10} | {'Num Icons':^10} | {'Num Text':^10} | {'Time (s)':^10}\n"
)
f.write("-" * 80 + "\n")
# Initialize parser once for all experiments
parser = OmniParser()
# Run experiments with each combination
for box_thresh in box_thresholds:
for iou_thresh in iou_thresholds:
logger.info(f"\nTesting box_threshold={box_thresh}, iou_threshold={iou_thresh}")
# Create directory for this combination
combo_dir = exp_dir / f"box_{box_thresh}_iou_{iou_thresh}"
combo_dir.mkdir(exist_ok=True)
try:
# Process each image
if os.path.isdir(input_path):
image_files = glob.glob(os.path.join(input_path, "*.png"))
else:
image_files = [input_path]
total_icons = 0
total_text = 0
total_time = 0
for image_path in image_files:
# Load and process image
image = Image.open(image_path).convert("RGB")
image_bytes = image_to_bytes(image)
# Process with current thresholds
with Timer(f"Processing {Path(image_path).stem}", logger) as t:
result = parser.parse(
image_bytes,
use_ocr=use_ocr,
box_threshold=box_thresh,
iou_threshold=iou_thresh,
)
# Save annotated image
output_path = combo_dir / f"{Path(image_path).stem}_analyzed.png"
img_data = base64.b64decode(result.annotated_image_base64)
img = Image.open(io.BytesIO(img_data))
img.save(output_path)
# Update totals
total_icons += result.metadata.num_icons
total_text += result.metadata.num_text
# Log detailed results
detail_file = combo_dir / f"{Path(image_path).stem}_details.txt"
with open(detail_file, "w") as detail_f:
detail_f.write(f"Results for {Path(image_path).name}\n")
detail_f.write("-" * 40 + "\n")
detail_f.write(f"Number of icons: {result.metadata.num_icons}\n")
detail_f.write(
f"Number of text elements: {result.metadata.num_text}\n\n"
)
detail_f.write("Icon Detections:\n")
icon_count = 1
text_count = (
result.metadata.num_icons + 1
) # Text boxes start after icons
# First list all icons
for elem in result.elements:
if isinstance(elem, IconElement):
detail_f.write(f"Box #{icon_count}: Icon\n")
detail_f.write(f" - Confidence: {elem.confidence:.3f}\n")
detail_f.write(
f" - Coordinates: {elem.bbox.coordinates}\n"
)
icon_count += 1
if use_ocr:
detail_f.write("\nText Detections:\n")
for elem in result.elements:
if isinstance(elem, TextElement):
detail_f.write(f"Box #{text_count}: Text\n")
detail_f.write(f" - Content: '{elem.content}'\n")
detail_f.write(
f" - Confidence: {elem.confidence:.3f}\n"
)
detail_f.write(
f" - Coordinates: {elem.bbox.coordinates}\n"
)
text_count += 1
# Update timing totals
total_time += t.elapsed_time
# Write summary for this combination
avg_time = total_time / len(image_files)
f.write(
f"{box_thresh:^10.3f} | {iou_thresh:^10.3f} | {total_icons:^10d} | {total_text:^10d} | {avg_time:^10.3f}\n"
)
except Exception as e:
logger.error(
f"Error in experiment box={box_thresh}, iou={iou_thresh}: {str(e)}"
)
f.write(
f"{box_thresh:^10.3f} | {iou_thresh:^10.3f} | {'ERROR':^10s} | {'ERROR':^10s} | {'ERROR':^10s}\n"
)
# Write summary footer
f.write("-" * 80 + "\n")
f.write("\nExperiment completed successfully!\n")
logger.info(f"\nExperiment results saved to {exp_dir}")
logger.info(f"Summary file: {summary_file}")
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(description="Run OmniParser benchmark")
parser.add_argument("input_path", help="Path to input image or directory containing images")
parser.add_argument(
"--output-dir", default="examples/output", help="Output directory for annotated images"
)
parser.add_argument(
"--ocr",
choices=["none", "easyocr"],
default="none",
help="OCR engine to use (default: none)",
)
parser.add_argument(
"--mode",
choices=["single", "experiment"],
default="single",
help="Run mode: single run or threshold experiments (default: single)",
)
parser.add_argument(
"--box-threshold",
type=float,
default=0.01,
help="Confidence threshold for detection (default: 0.01)",
)
parser.add_argument(
"--iou-threshold",
type=float,
default=0.1,
help="IOU threshold for Non-Maximum Suppression (default: 0.1)",
)
args = parser.parse_args()
logger.info(f"Starting OmniParser with arguments: {args}")
use_ocr = args.ocr != "none"
output_dir = Path(args.output_dir)
try:
if args.mode == "experiment":
run_experiments(args.input_path, output_dir, use_ocr)
else:
run_detection_benchmark(
args.input_path, output_dir, use_ocr, args.box_threshold, args.iou_threshold
)
except Exception as e:
logger.error(f"Process failed: {str(e)}", exc_info=True)
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
```
--------------------------------------------------------------------------------
/libs/python/som/som/detect.py:
--------------------------------------------------------------------------------
```python
import argparse
import base64
import io
import logging
import signal
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, cast
import cv2
import numpy as np
import supervision as sv
import torch
import torchvision.ops
import torchvision.transforms as T
from huggingface_hub import hf_hub_download
from PIL import Image
from supervision.detection.core import Detections
from ultralytics import YOLO
from .detection import DetectionProcessor
from .models import (
BoundingBox,
IconElement,
ParseResult,
ParserMetadata,
TextElement,
UIElement,
)
from .ocr import OCRProcessor
from .visualization import BoxAnnotator
logger = logging.getLogger(__name__)
class TimeoutException(Exception):
pass
@contextmanager
def timeout(seconds: int):
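# NOTE: SIGALRM-based timeouts only work on Unix-like systems, and only in the main thread.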
def timeout_handler(signum, frame):
raise TimeoutException("OCR process timed out")
# Register the signal handler
original_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, original_handler)
def process_text_box(box, image):
"""Process a single text box with OCR."""
try:
from typing import Any, List, Sequence, Tuple
import easyocr
x1 = int(min(point[0] for point in box))
y1 = int(min(point[1] for point in box))
x2 = int(max(point[0] for point in box))
y2 = int(max(point[1] for point in box))
# Add padding
pad = 2
x1 = max(0, x1 - pad)
y1 = max(0, y1 - pad)
x2 = min(image.shape[1], x2 + pad)
y2 = min(image.shape[0], y2 + pad)
region = image[y1:y2, x1:x2]
if region.size > 0:
reader = easyocr.Reader(["en"])
results = reader.readtext(region)
if results and len(results) > 0:
# EasyOCR returns a list of tuples (bbox, text, confidence)
first_result = results[0]
if isinstance(first_result, (list, tuple)) and len(first_result) >= 3:
text = str(first_result[1])
confidence = float(first_result[2])
if confidence > 0.5:
return text, [x1, y1, x2, y2], confidence
except Exception:
pass
return None
def check_ocr_box(image_path: Union[str, Path]) -> Tuple[List[str], List[List[float]]]:
"""Check OCR box using EasyOCR."""
# Read image once
if isinstance(image_path, str):
image_path = Path(image_path)
# Read image into memory
image_cv = cv2.imread(str(image_path))
if image_cv is None:
logger.error(f"Failed to read image: {image_path}")
return [], []
# Get image dimensions
img_height, img_width = image_cv.shape[:2]
confidence_threshold = 0.5
# Use EasyOCR
import ssl
import easyocr
# Create unverified SSL context for development
ssl._create_default_https_context = ssl._create_unverified_context
try:
reader = easyocr.Reader(["en"])
with timeout(5): # 5 second timeout for EasyOCR
results = reader.readtext(image_cv, paragraph=False, text_threshold=0.5)
except TimeoutException:
logger.warning("EasyOCR timed out, returning no results")
return [], []
except Exception as e:
logger.warning(f"EasyOCR failed: {str(e)}")
return [], []
finally:
# Restore default SSL context
ssl._create_default_https_context = ssl.create_default_context
texts = []
boxes = []
for box, text, conf in results:
# Convert box format to [x1, y1, x2, y2]
x1 = min(point[0] for point in box)
y1 = min(point[1] for point in box)
x2 = max(point[0] for point in box)
y2 = max(point[1] for point in box)
if float(conf) > confidence_threshold: # Only keep higher-confidence detections
texts.append(text)
boxes.append([x1, y1, x2, y2])
return texts, boxes
class OmniParser:
"""Enhanced UI parser using computer vision and OCR for detecting interactive elements."""
def __init__(
self,
model_path: Optional[Union[str, Path]] = None,
cache_dir: Optional[Union[str, Path]] = None,
force_device: Optional[str] = None,
):
"""Initialize the OmniParser.
Args:
model_path: Optional path to the YOLO model
cache_dir: Optional directory to cache model files
force_device: Force specific device (cpu/cuda/mps)
"""
self.detector = DetectionProcessor(
model_path=Path(model_path) if model_path else None,
cache_dir=Path(cache_dir) if cache_dir else None,
force_device=force_device,
)
self.ocr = OCRProcessor()
self.visualizer = BoxAnnotator()
def process_image(
self,
image: Image.Image,
box_threshold: float = 0.3,
iou_threshold: float = 0.1,
use_ocr: bool = True,
) -> Tuple[Image.Image, List[UIElement]]:
"""Process an image to detect UI elements and optionally text.
Args:
image: Input PIL Image
box_threshold: Confidence threshold for detection
iou_threshold: IOU threshold for NMS
use_ocr: Whether to enable OCR processing
Returns:
Tuple of (annotated image, list of detections)
"""
try:
logger.info("Starting UI element detection...")
# Detect icons
icon_detections = self.detector.detect_icons(
image=image, box_threshold=box_threshold, iou_threshold=iou_threshold
)
logger.info(f"Found {len(icon_detections)} interactive elements")
# Convert icon detections to typed objects
elements: List[UIElement] = cast(
List[UIElement],
[
IconElement(
id=i + 1,
bbox=BoundingBox(
x1=det["bbox"][0],
y1=det["bbox"][1],
x2=det["bbox"][2],
y2=det["bbox"][3],
),
confidence=det["confidence"],
scale=det.get("scale"),
)
for i, det in enumerate(icon_detections)
],
)
# Run OCR if enabled
if use_ocr:
logger.info("Running OCR detection...")
text_detections = self.ocr.detect_text(image=image, confidence_threshold=0.5)
if text_detections is None:
text_detections = []
logger.info(f"Found {len(text_detections)} text regions")
# Convert text detections to typed objects
text_elements = cast(
List[UIElement],
[
TextElement(
id=len(elements) + i + 1,
bbox=BoundingBox(
x1=det["bbox"][0],
y1=det["bbox"][1],
x2=det["bbox"][2],
y2=det["bbox"][3],
),
content=det["content"],
confidence=det["confidence"],
)
for i, det in enumerate(text_detections)
],
)
if elements and text_elements:
# Drop icon (non-OCR) elements whose bounding box contains the center of any OCR text element
filtered_elements = []
for elem in elements: # elements at this point contains only non-OCR elements
should_keep = True
for text_elem in text_elements:
# Calculate center point of the text element
center_x = (text_elem.bbox.x1 + text_elem.bbox.x2) / 2
center_y = (text_elem.bbox.y1 + text_elem.bbox.y2) / 2
# Check if this center point is inside the non-OCR element
if (
center_x >= elem.bbox.x1
and center_x <= elem.bbox.x2
and center_y >= elem.bbox.y1
and center_y <= elem.bbox.y2
):
should_keep = False
break
if should_keep:
filtered_elements.append(elem)
elements = filtered_elements
# Merge detections using NMS
all_elements = elements + text_elements
boxes = torch.tensor([elem.bbox.coordinates for elem in all_elements])
scores = torch.tensor([elem.confidence for elem in all_elements])
keep_indices = torchvision.ops.nms(boxes, scores, iou_threshold)
elements = [all_elements[i] for i in keep_indices]
else:
# Just add text elements to the list if IOU doesn't need to be applied
elements.extend(text_elements)
# Calculate drawing parameters based on image size
box_overlay_ratio = max(image.size) / 3200
draw_config = {
"font_size": int(12 * box_overlay_ratio),
"box_thickness": max(int(2 * box_overlay_ratio), 1),
"text_padding": max(int(3 * box_overlay_ratio), 1),
}
# Convert elements back to dict format for visualization
detection_dicts = [
{
"type": elem.type,
"bbox": elem.bbox.coordinates,
"confidence": elem.confidence,
"content": elem.content if isinstance(elem, TextElement) else None,
}
for elem in elements
]
# Create visualization
logger.info("Creating visualization...")
annotated_image = self.visualizer.draw_boxes(
image=image.copy(), detections=detection_dicts, draw_config=draw_config
)
logger.info("Visualization complete")
return annotated_image, elements
except Exception as e:
logger.error(f"Error in process_image: {str(e)}")
import traceback
logger.error(traceback.format_exc())
raise
def parse(
self,
screenshot_data: Union[bytes, str],
box_threshold: float = 0.3,
iou_threshold: float = 0.1,
use_ocr: bool = True,
) -> ParseResult:
"""Parse a UI screenshot to detect interactive elements and text.
Args:
screenshot_data: Raw bytes or base64 string of the screenshot
box_threshold: Confidence threshold for detection
iou_threshold: IOU threshold for NMS
use_ocr: Whether to enable OCR processing
Returns:
ParseResult object containing elements, annotated image, and metadata
"""
try:
start_time = time.time()
# Convert input to PIL Image
if isinstance(screenshot_data, str):
screenshot_data = base64.b64decode(screenshot_data)
image = Image.open(io.BytesIO(screenshot_data)).convert("RGB")
# Process image
annotated_image, elements = self.process_image(
image=image,
box_threshold=box_threshold,
iou_threshold=iou_threshold,
use_ocr=use_ocr,
)
# Convert annotated image to base64
buffered = io.BytesIO()
annotated_image.save(buffered, format="PNG")
annotated_image_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
# Generate screen info text
screen_info = []
parsed_content_list = []
# Set element IDs and generate human-readable descriptions
for i, elem in enumerate(elements):
# Set the ID (1-indexed)
elem.id = i + 1
if isinstance(elem, IconElement):
screen_info.append(
f"Box #{i+1}: Icon (confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates})"
)
parsed_content_list.append(
{
"id": i + 1,
"type": "icon",
"bbox": elem.bbox.coordinates,
"confidence": elem.confidence,
"content": None,
}
)
elif isinstance(elem, TextElement):
screen_info.append(
f"Box #{i+1}: Text '{elem.content}' (confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates})"
)
parsed_content_list.append(
{
"id": i + 1,
"type": "text",
"bbox": elem.bbox.coordinates,
"confidence": elem.confidence,
"content": elem.content,
}
)
# Calculate metadata
latency = time.time() - start_time
width, height = image.size
# Create ParseResult object with enhanced properties
result = ParseResult(
elements=elements,
annotated_image_base64=annotated_image_base64,
screen_info=screen_info,
parsed_content_list=parsed_content_list,
metadata=ParserMetadata(
image_size=(width, height),
num_icons=len([e for e in elements if isinstance(e, IconElement)]),
num_text=len([e for e in elements if isinstance(e, TextElement)]),
device=self.detector.device,
ocr_enabled=use_ocr,
latency=latency,
),
)
# Return the ParseResult object directly
return result
except Exception as e:
logger.error(f"Error in parse: {str(e)}")
import traceback
logger.error(traceback.format_exc())
raise
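# Example usage (sketch): parse a screenshot from disk with the default thresholds:
#   parser = OmniParser()
#   result = parser.parse(Path("screenshot.png").read_bytes())
#   for line in result.screen_info:
#       print(line)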
def main():
"""Command line interface for UI element detection."""
parser = argparse.ArgumentParser(description="Detect UI elements and text in images")
parser.add_argument("image_path", help="Path to the input image")
parser.add_argument("--model-path", help="Path to YOLO model")
parser.add_argument(
"--box-threshold", type=float, default=0.3, help="Box confidence threshold (default: 0.3)"
)
parser.add_argument(
"--iou-threshold", type=float, default=0.1, help="IOU threshold (default: 0.1)"
)
parser.add_argument(
"--no-ocr", dest="ocr", action="store_false", help="Disable OCR processing (OCR is enabled by default)"
)
parser.add_argument("--output", help="Output path for annotated image")
args = parser.parse_args()
# Setup logging
logging.basicConfig(level=logging.INFO)
try:
# Initialize parser
parser = OmniParser(model_path=args.model_path)
# Load and process image
logger.info(f"Loading image from: {args.image_path}")
image = Image.open(args.image_path).convert("RGB")
logger.info(f"Image loaded successfully, size: {image.size}")
# Process image
annotated_image, elements = parser.process_image(
image=image,
box_threshold=args.box_threshold,
iou_threshold=args.iou_threshold,
use_ocr=args.ocr,
)
# Save output image
output_path = args.output or str(
Path(args.image_path).parent
/ f"{Path(args.image_path).stem}_analyzed{Path(args.image_path).suffix}"
)
logger.info(f"Saving annotated image to: {output_path}")
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
annotated_image.save(output_path)
logger.info(f"Image saved successfully to {output_path}")
# Print detections
logger.info("\nDetections:")
for i, elem in enumerate(elements):
if isinstance(elem, IconElement):
logger.info(
f"Interactive element {i}: confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
)
elif isinstance(elem, TextElement):
logger.info(f"Text {i}: '{elem.content}', bbox={elem.bbox.coordinates}")
except Exception as e:
logger.error(f"Error processing image: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return 1
return 0
if __name__ == "__main__":
import sys
sys.exit(main())
```
--------------------------------------------------------------------------------
/libs/python/agent/agent/cli.py:
--------------------------------------------------------------------------------
```python
"""
CLI chat interface for agent - Computer Use Agent
Usage:
python -m agent.cli <model_string>
Examples:
python -m agent.cli openai/computer-use-preview
python -m agent.cli anthropic/claude-sonnet-4-5-20250929
python -m agent.cli omniparser+anthropic/claude-sonnet-4-5-20250929
"""
try:
import argparse
import asyncio
import base64
import json
import os
import platform
import sys
import time
from pathlib import Path
from typing import Any, Dict, List
import dotenv
try:
from PIL import Image, ImageDraw
PIL_AVAILABLE = True
except Exception:
PIL_AVAILABLE = False
from yaspin import yaspin
except ImportError:
if __name__ == "__main__":
raise ImportError(
"CLI dependencies not found. " 'Please install with: pip install "cua-agent[cli]"'
)
# Load environment variables
dotenv.load_dotenv()
# Color codes for terminal output
class Colors:
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
# Text colors
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
WHITE = "\033[37m"
GRAY = "\033[90m"
# Background colors
BG_RED = "\033[41m"
BG_GREEN = "\033[42m"
BG_YELLOW = "\033[43m"
BG_BLUE = "\033[44m"
def print_colored(
text: str,
color: str = "",
bold: bool = False,
dim: bool = False,
end: str = "\n",
right: str = "",
):
"""Print colored text to terminal with optional right-aligned text."""
prefix = ""
if bold:
prefix += Colors.BOLD
if dim:
prefix += Colors.DIM
if color:
prefix += color
if right:
# Get terminal width (default to 80 if unable to determine)
try:
import shutil
terminal_width = shutil.get_terminal_size().columns
except Exception:
terminal_width = 80
# Add right margin
terminal_width -= 1
# Calculate padding needed
# Account for ANSI escape codes not taking visual space
visible_left_len = len(text)
visible_right_len = len(right)
padding = terminal_width - visible_left_len - visible_right_len
if padding > 0:
output = f"{prefix}{text}{' ' * padding}{right}{Colors.RESET}"
else:
# If not enough space, just put a single space between
output = f"{prefix}{text} {right}{Colors.RESET}"
else:
output = f"{prefix}{text}{Colors.RESET}"
print(output, end=end)
def print_action(action_type: str, details: Dict[str, Any], total_cost: float):
"""Print computer action with nice formatting."""
# Format action details
args_str = ""
if action_type == "click" and "x" in details and "y" in details:
args_str = f"_{details.get('button', 'left')}({details['x']}, {details['y']})"
elif action_type == "type" and "text" in details:
text = details["text"]
if len(text) > 50:
text = text[:47] + "..."
args_str = f'("{text}")'
elif action_type == "key" and "text" in details:
args_str = f"('{details['text']}')"
elif action_type == "scroll" and "x" in details and "y" in details:
args_str = f"({details['x']}, {details['y']})"
if total_cost > 0:
print_colored(f"🛠️ {action_type}{args_str}", dim=True, right=f"💸 ${total_cost:.2f}")
else:
print_colored(f"🛠️ {action_type}{args_str}", dim=True)
def print_welcome(model: str, agent_loop: str, container_name: str):
"""Print welcome message."""
print_colored(f"Connected to {container_name} ({model}, {agent_loop})")
print_colored("Type 'exit' to quit.", dim=True)
async def ainput(prompt: str = ""):
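    # input() blocks; run it in a worker thread so the event loop (and spinner) stay responsive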
return await asyncio.to_thread(input, prompt)
async def chat_loop(
agent, model: str, container_name: str, initial_prompt: str = "", show_usage: bool = True
):
"""Main chat loop with the agent."""
print_welcome(model, agent.agent_config_info.agent_class.__name__, container_name)
history = []
if initial_prompt:
history.append({"role": "user", "content": initial_prompt})
total_cost = 0
while True:
if len(history) == 0 or history[-1].get("role") != "user":
# Get user input with prompt
print_colored("> ", end="")
user_input = await ainput()
if user_input.lower() in ["exit", "quit", "q"]:
print_colored("\n👋 Goodbye!")
break
if not user_input:
continue
# Add user message to history
history.append({"role": "user", "content": user_input})
# Stream responses from the agent with spinner
with yaspin(text="Thinking...", spinner="line", attrs=["dark"]) as spinner:
spinner.hide()
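            # Each result from agent.run() is treated as a dict of the shape
            #   {"output": [message | computer_call | function_call | function_call_output, ...],
            #    "usage": {"response_cost": float, ...}}
            # (inferred from how the items are consumed below)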
async for result in agent.run(history):
# Add agent responses to history
history.extend(result.get("output", []))
if show_usage:
total_cost += result.get("usage", {}).get("response_cost", 0)
# Process and display the output
for item in result.get("output", []):
if item.get("type") == "message" and item.get("role") == "assistant":
# Display agent text response
content = item.get("content", [])
for content_part in content:
if content_part.get("text"):
text = content_part.get("text", "").strip()
if text:
spinner.hide()
print_colored(text)
elif item.get("type") == "computer_call":
# Display computer action
action = item.get("action", {})
action_type = action.get("type", "")
if action_type:
spinner.hide()
print_action(action_type, action, total_cost)
spinner.text = f"Performing {action_type}..."
spinner.show()
elif item.get("type") == "function_call":
# Display function call
function_name = item.get("name", "")
spinner.hide()
print_colored(f"🔧 Calling function: {function_name}", dim=True)
spinner.text = f"Calling {function_name}..."
spinner.show()
elif item.get("type") == "function_call_output":
# Display function output (dimmed)
output = item.get("output", "")
if output and len(output.strip()) > 0:
spinner.hide()
print_colored(f"📤 {output}", dim=True)
spinner.hide()
if show_usage and total_cost > 0:
print_colored(f"Total cost: ${total_cost:.2f}", dim=True)
async def main():
"""Main CLI function."""
parser = argparse.ArgumentParser(
description="CUA Agent CLI - Interactive computer use assistant",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python -m agent.cli openai/computer-use-preview
python -m agent.cli anthropic/claude-sonnet-4-5-20250929
python -m agent.cli omniparser+anthropic/claude-sonnet-4-5-20250929
python -m agent.cli huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B
""",
)
parser.add_argument(
"model",
help="Model string (e.g., 'openai/computer-use-preview', 'anthropic/claude-sonnet-4-5-20250929')",
)
parser.add_argument(
"--provider",
choices=["cloud", "lume", "winsandbox", "docker"],
default="cloud",
help="Computer provider to use: cloud (default), lume, winsandbox, or docker",
)
parser.add_argument(
"--images",
type=int,
default=3,
help="Number of recent images to keep in context (default: 3)",
)
parser.add_argument("--trajectory", action="store_true", help="Save trajectory for debugging")
parser.add_argument("--budget", type=float, help="Maximum budget for the session (in dollars)")
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
parser.add_argument(
"-p",
"--prompt",
type=str,
help="Initial prompt to send to the agent. Leave blank for interactive mode.",
)
parser.add_argument(
"--prompt-file",
type=Path,
help="Path to a UTF-8 text file whose contents will be used as the initial prompt. If provided, overrides --prompt.",
)
parser.add_argument(
"--predict-click",
dest="predict_click",
type=str,
help="Instruction for click prediction. If set, runs predict_click, draws crosshair on a fresh screenshot, saves and opens it.",
)
parser.add_argument("-c", "--cache", action="store_true", help="Tell the API to enable caching")
parser.add_argument(
"-u", "--usage", action="store_true", help="Show total cost of the agent runs"
)
parser.add_argument(
"-r",
"--max-retries",
type=int,
default=3,
help="Maximum number of retries for the LLM API calls",
)
# Provider override credentials
parser.add_argument(
"--api-key",
dest="api_key",
type=str,
help="API key override for the model provider (passed to ComputerAgent)",
)
parser.add_argument(
"--api-base",
dest="api_base",
type=str,
help="API base URL override for the model provider (passed to ComputerAgent)",
)
args = parser.parse_args()
# Check for required environment variables
container_name = os.getenv("CUA_CONTAINER_NAME")
cua_api_key = os.getenv("CUA_API_KEY")
# Prompt for missing environment variables (container name always required)
if not container_name:
if args.provider == "cloud":
print_colored("CUA_CONTAINER_NAME not set.", dim=True)
print_colored("You can get a CUA container at https://cua.ai/", dim=True)
container_name = input("Enter your CUA container name: ").strip()
if not container_name:
print_colored("❌ Container name is required.")
sys.exit(1)
else:
container_name = "cli-sandbox"
# Only require API key for cloud provider
if args.provider == "cloud" and not cua_api_key:
print_colored("CUA_API_KEY not set.", dim=True)
cua_api_key = input("Enter your CUA API key: ").strip()
if not cua_api_key:
print_colored("❌ API key is required for cloud provider.")
sys.exit(1)
# Check for provider-specific API keys based on model
provider_api_keys = {
"openai/": "OPENAI_API_KEY",
"anthropic/": "ANTHROPIC_API_KEY",
}
# Find matching provider and check for API key
for prefix, env_var in provider_api_keys.items():
if prefix in args.model:
if not os.getenv(env_var):
print_colored(f"{env_var} not set.", dim=True)
api_key = input(f"Enter your {env_var.replace('_', ' ').title()}: ").strip()
if not api_key:
print_colored(f"❌ {env_var.replace('_', ' ').title()} is required.")
sys.exit(1)
# Set the environment variable for the session
os.environ[env_var] = api_key
break
# Import here to avoid import errors if dependencies are missing
try:
from agent import ComputerAgent
from computer import Computer
except ImportError as e:
print_colored(f"❌ Import error: {e}", Colors.RED, bold=True)
print_colored("Make sure agent and computer libraries are installed.", Colors.YELLOW)
sys.exit(1)
# Resolve provider -> os_type, provider_type, api key requirement
provider_map = {
"cloud": ("linux", "cloud", True),
"lume": ("macos", "lume", False),
"winsandbox": ("windows", "winsandbox", False),
"docker": ("linux", "docker", False),
}
os_type, provider_type, needs_api_key = provider_map[args.provider]
computer_kwargs = {
"os_type": os_type,
"provider_type": provider_type,
"name": container_name,
}
if needs_api_key:
computer_kwargs["api_key"] = cua_api_key # type: ignore
# Create computer instance
async with Computer(**computer_kwargs) as computer: # type: ignore
# Create agent
agent_kwargs = {
"model": args.model,
"tools": [computer],
"trust_remote_code": True, # needed for some local models (e.g., InternVL, OpenCUA)
"verbosity": 20 if args.verbose else 30, # DEBUG vs WARNING
"max_retries": args.max_retries,
}
# Thread API credentials to agent if provided
if args.api_key:
agent_kwargs["api_key"] = args.api_key
if args.api_base:
agent_kwargs["api_base"] = args.api_base
if args.images > 0:
agent_kwargs["only_n_most_recent_images"] = args.images
if args.trajectory:
agent_kwargs["trajectory_dir"] = "trajectories"
if args.budget:
agent_kwargs["max_trajectory_budget"] = {
"max_budget": args.budget,
"raise_error": True,
"reset_after_each_run": False,
}
if args.cache:
agent_kwargs["use_prompt_caching"] = True
agent = ComputerAgent(**agent_kwargs)
# If predict-click mode is requested, run once and exit
if args.predict_click:
if not PIL_AVAILABLE:
print_colored(
"❌ Pillow (PIL) is required for --predict-click visualization. Install with: pip install pillow",
Colors.RED,
bold=True,
)
sys.exit(1)
instruction = args.predict_click
print_colored(f"Predicting click for: '{instruction}'", Colors.CYAN)
# Take a fresh screenshot FIRST
try:
img_bytes = await computer.interface.screenshot()
except Exception as e:
print_colored(f"❌ Failed to take screenshot: {e}", Colors.RED, bold=True)
sys.exit(1)
# Encode screenshot to base64 for predict_click
try:
image_b64 = base64.b64encode(img_bytes).decode("utf-8")
except Exception as e:
print_colored(f"❌ Failed to encode screenshot: {e}", Colors.RED, bold=True)
sys.exit(1)
try:
coords = await agent.predict_click(instruction, image_b64=image_b64)
except Exception as e:
print_colored(f"❌ predict_click failed: {e}", Colors.RED, bold=True)
sys.exit(1)
if not coords:
print_colored("⚠️ No coordinates returned.", Colors.YELLOW)
sys.exit(2)
x, y = coords
print_colored(f"✅ Predicted coordinates: ({x}, {y})", Colors.GREEN)
try:
from io import BytesIO
with Image.open(BytesIO(img_bytes)) as img:
img = img.convert("RGB")
draw = ImageDraw.Draw(img)
# Draw crosshair
size = 12
color = (255, 0, 0)
draw.line([(x - size, y), (x + size, y)], fill=color, width=3)
draw.line([(x, y - size), (x, y + size)], fill=color, width=3)
# Optional small circle
r = 6
draw.ellipse([(x - r, y - r), (x + r, y + r)], outline=color, width=2)
out_path = Path.cwd() / f"predict_click_{int(time.time())}.png"
img.save(out_path)
print_colored(f"🖼️ Saved to {out_path}")
# Open the image with default viewer
try:
system = platform.system().lower()
if system == "windows":
os.startfile(str(out_path)) # type: ignore[attr-defined]
elif system == "darwin":
os.system(f'open "{out_path}"')
else:
os.system(f'xdg-open "{out_path}"')
except Exception:
pass
except Exception as e:
print_colored(f"❌ Failed to render/save screenshot: {e}", Colors.RED, bold=True)
sys.exit(1)
# Done
sys.exit(0)
# Resolve initial prompt from --prompt-file or --prompt
initial_prompt = args.prompt or ""
if args.prompt_file:
try:
initial_prompt = args.prompt_file.read_text(encoding="utf-8")
except Exception as e:
print_colored(f"❌ Failed to read --prompt-file: {e}", Colors.RED, bold=True)
sys.exit(1)
# Start chat loop (default interactive mode)
await chat_loop(agent, args.model, container_name, initial_prompt, args.usage)
if __name__ == "__main__":
try:
asyncio.run(main())
    except (KeyboardInterrupt, EOFError):
print_colored("\n\n👋 Goodbye!")
```
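For orientation, the CLI above is a thin interactive wrapper around the `ComputerAgent` and `Computer` APIs it imports. A minimal non-interactive sketch of the same wiring, assuming a cloud container and the `CUA_CONTAINER_NAME`/`CUA_API_KEY` environment variables checked in `main()` (model string and prompt are illustrative):
```python
# Hedged sketch, not the shipped CLI: a one-shot run mirroring the kwargs
# assembled in main() above.
import asyncio
import os

from agent import ComputerAgent
from computer import Computer

async def run_once(prompt: str) -> None:
    async with Computer(
        os_type="linux",
        provider_type="cloud",
        name=os.environ["CUA_CONTAINER_NAME"],
        api_key=os.environ["CUA_API_KEY"],
    ) as computer:
        agent = ComputerAgent(
            model="anthropic/claude-sonnet-4-5-20250929",
            tools=[computer],
            only_n_most_recent_images=3,
            max_retries=3,
        )
        history = [{"role": "user", "content": prompt}]
        async for result in agent.run(history):
            history.extend(result.get("output", []))

asyncio.run(run_once("Open the browser and search for 'trycua'."))
```
Everything else in `main()` is argument plumbing, credential prompts, and the `--predict-click` visualization path.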
--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/moondream3.py:
--------------------------------------------------------------------------------
```python
"""
Moondream3+ composed-grounded agent loop implementation.
Grounding is handled by a local Moondream3 preview model via Transformers.
Thinking is delegated to the trailing LLM in the composed model string: "moondream3+<thinking_model>".
Differences from composed_grounded:
- Provides a singleton Moondream3 client outside the class.
- predict_click uses model.point(image, instruction, settings={"max_objects": 1}) and returns pixel coordinates.
- If the last image was a screenshot (or we take one), run model.detect(image, "all ui elements") to get bboxes, then
run model.query on each cropped bbox to caption it. Overlay labels on the screenshot and emit via _on_screenshot.
- Add a user message listing all detected form UI names so the thinker can reference them.
- If the thinking model doesn't support vision, filter out image content before calling litellm.
"""
from __future__ import annotations
import base64
import io
import uuid
from typing import Any, Dict, List, Optional, Tuple
import litellm
from PIL import Image, ImageDraw, ImageFont
from ..decorators import register_agent
from ..loops.base import AsyncAgentConfig
from ..responses import (
convert_completion_messages_to_responses_items,
convert_computer_calls_desc2xy,
convert_computer_calls_xy2desc,
convert_responses_items_to_completion_messages,
get_all_element_descriptions,
)
from ..types import AgentCapability
_MOONDREAM_SINGLETON = None
def get_moondream_model() -> Any:
"""Get a singleton instance of the Moondream3 preview model."""
global _MOONDREAM_SINGLETON
if _MOONDREAM_SINGLETON is None:
try:
import torch
from transformers import AutoModelForCausalLM
_MOONDREAM_SINGLETON = AutoModelForCausalLM.from_pretrained(
"moondream/moondream3-preview",
trust_remote_code=True,
torch_dtype=torch.bfloat16,
device_map="cuda",
)
except ImportError as e:
raise RuntimeError(
"moondream3 requires torch and transformers. Install with: pip install cua-agent[moondream3]"
) from e
return _MOONDREAM_SINGLETON
def _decode_image_b64(image_b64: str) -> Image.Image:
data = base64.b64decode(image_b64)
return Image.open(io.BytesIO(data)).convert("RGB")
def _image_to_b64(img: Image.Image) -> str:
buf = io.BytesIO()
img.save(buf, format="PNG")
return base64.b64encode(buf.getvalue()).decode("utf-8")
def _supports_vision(model: str) -> bool:
"""Heuristic vision support detection for thinking model."""
m = model.lower()
vision_markers = [
"gpt-4o",
"gpt-4.1",
"o1",
"o3",
"claude-3",
"claude-3.5",
"sonnet",
"haiku",
"opus",
"gemini-1.5",
"llava",
]
return any(v in m for v in vision_markers)
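# e.g. _supports_vision("anthropic/claude-3-5-sonnet-20241022") -> True (matches "sonnet")
#      _supports_vision("gpt-3.5-turbo") -> False (no marker; images get filtered out)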
def _filter_images_from_completion_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
filtered: List[Dict[str, Any]] = []
for msg in messages:
msg_copy = {**msg}
content = msg_copy.get("content")
if isinstance(content, list):
msg_copy["content"] = [c for c in content if c.get("type") != "image_url"]
filtered.append(msg_copy)
return filtered
def _annotate_detect_and_label_ui(base_img: Image.Image, model_md) -> Tuple[str, List[str]]:
"""Detect UI elements with Moondream, caption each, draw labels with backgrounds.
Args:
base_img: PIL image of the screenshot (RGB or RGBA). Will be copied/converted internally.
model_md: Moondream model instance with .detect() and .query() methods.
Returns:
A tuple of (annotated_image_base64_png, detected_names)
"""
# Ensure RGBA for semi-transparent fills
if base_img.mode != "RGBA":
base_img = base_img.convert("RGBA")
W, H = base_img.width, base_img.height
# Detect objects
try:
detect_result = model_md.detect(base_img, "all ui elements")
objects = detect_result.get("objects", []) if isinstance(detect_result, dict) else []
except Exception:
objects = []
draw = ImageDraw.Draw(base_img)
try:
font = ImageFont.load_default()
except Exception:
font = None
detected_names: List[str] = []
for i, obj in enumerate(objects):
try:
# Clamp normalized coords and crop
x_min = max(0.0, min(1.0, float(obj.get("x_min", 0.0))))
y_min = max(0.0, min(1.0, float(obj.get("y_min", 0.0))))
x_max = max(0.0, min(1.0, float(obj.get("x_max", 0.0))))
y_max = max(0.0, min(1.0, float(obj.get("y_max", 0.0))))
left, top, right, bottom = (
int(x_min * W),
int(y_min * H),
int(x_max * W),
int(y_max * H),
)
left, top = max(0, left), max(0, top)
right, bottom = min(W - 1, right), min(H - 1, bottom)
crop = base_img.crop((left, top, right, bottom))
# Prompted short caption
try:
result = model_md.query(crop, "Caption this UI element in few words.")
caption_text = (result or {}).get("answer", "")
except Exception:
caption_text = ""
name = (caption_text or "").strip() or f"element_{i+1}"
detected_names.append(name)
# Draw bbox
draw.rectangle([left, top, right, bottom], outline=(255, 215, 0, 255), width=2)
# Label background with padding and rounded corners
label = f"{i+1}. {name}"
padding = 3
if font:
text_bbox = draw.textbbox((0, 0), label, font=font)
else:
text_bbox = draw.textbbox((0, 0), label)
text_w = text_bbox[2] - text_bbox[0]
text_h = text_bbox[3] - text_bbox[1]
tx = left + 3
ty = top - (text_h + 2 * padding + 4)
if ty < 0:
ty = top + 3
bg_left = tx - padding
bg_top = ty - padding
bg_right = tx + text_w + padding
bg_bottom = ty + text_h + padding
try:
draw.rounded_rectangle(
[bg_left, bg_top, bg_right, bg_bottom],
radius=4,
fill=(0, 0, 0, 160),
outline=(255, 215, 0, 200),
width=1,
)
except Exception:
draw.rectangle(
[bg_left, bg_top, bg_right, bg_bottom],
fill=(0, 0, 0, 160),
outline=(255, 215, 0, 200),
width=1,
)
text_fill = (255, 255, 255, 255)
if font:
draw.text((tx, ty), label, fill=text_fill, font=font)
else:
draw.text((tx, ty), label, fill=text_fill)
except Exception:
continue
# Encode PNG base64
annotated = base_img
if annotated.mode not in ("RGBA", "RGB"):
annotated = annotated.convert("RGBA")
annotated_b64 = _image_to_b64(annotated)
return annotated_b64, detected_names
GROUNDED_COMPUTER_TOOL_SCHEMA = {
"type": "function",
"function": {
"name": "computer",
"description": (
"Control a computer by taking screenshots and interacting with UI elements. "
"The screenshot action will include a list of detected form UI element names when available. "
"Use element descriptions to locate and interact with UI elements on the screen."
),
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": [
"screenshot",
"click",
"double_click",
"drag",
"type",
"keypress",
"scroll",
"move",
"wait",
"get_current_url",
"get_dimensions",
"get_environment",
],
"description": "The action to perform (required for all actions)",
},
"element_description": {
"type": "string",
"description": "Description of the element to interact with (required for click/double_click/move/scroll)",
},
"start_element_description": {
"type": "string",
"description": "Description of the element to start dragging from (required for drag)",
},
"end_element_description": {
"type": "string",
"description": "Description of the element to drag to (required for drag)",
},
"text": {
"type": "string",
"description": "The text to type (required for type)",
},
"keys": {
"type": "array",
"items": {"type": "string"},
"description": "Key(s) to press (required for keypress)",
},
"button": {
"type": "string",
"enum": ["left", "right", "wheel", "back", "forward"],
"description": "The mouse button to use for click/double_click",
},
"scroll_x": {
"type": "integer",
"description": "Horizontal scroll amount (required for scroll)",
},
"scroll_y": {
"type": "integer",
"description": "Vertical scroll amount (required for scroll)",
},
},
"required": ["action"],
},
},
}
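# A (hypothetical) tool call the thinking model might emit against this schema:
#   {"name": "computer",
#    "arguments": {"action": "click", "element_description": "blue 'Submit' button"}}
# element_description is later grounded to pixel coordinates via Moondream's point().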
@register_agent(r"moondream3\+.*", priority=2)
class Moondream3PlusConfig(AsyncAgentConfig):
def __init__(self):
self.desc2xy: Dict[str, Tuple[float, float]] = {}
async def predict_step(
self,
messages: List[Dict[str, Any]],
model: str,
tools: Optional[List[Dict[str, Any]]] = None,
max_retries: Optional[int] = None,
stream: bool = False,
computer_handler=None,
use_prompt_caching: Optional[bool] = False,
_on_api_start=None,
_on_api_end=None,
_on_usage=None,
_on_screenshot=None,
**kwargs,
) -> Dict[str, Any]:
# Parse composed model: moondream3+<thinking_model>
if "+" not in model:
raise ValueError(f"Composed model must be 'moondream3+<thinking_model>', got: {model}")
_, thinking_model = model.split("+", 1)
pre_output_items: List[Dict[str, Any]] = []
# Acquire last screenshot; if missing, take one
last_image_b64: Optional[str] = None
for message in reversed(messages):
if (
isinstance(message, dict)
and message.get("type") == "computer_call_output"
and isinstance(message.get("output"), dict)
and message["output"].get("type") == "input_image"
):
image_url = message["output"].get("image_url", "")
if image_url.startswith("data:image/png;base64,"):
last_image_b64 = image_url.split(",", 1)[1]
break
if last_image_b64 is None and computer_handler is not None:
# Take a screenshot
screenshot_b64 = await computer_handler.screenshot() # type: ignore
if screenshot_b64:
call_id = uuid.uuid4().hex
pre_output_items += [
{
"type": "message",
"role": "assistant",
"content": [
{
"type": "output_text",
"text": "Taking a screenshot to analyze the current screen.",
}
],
},
{
"type": "computer_call",
"call_id": call_id,
"status": "completed",
"action": {"type": "screenshot"},
},
{
"type": "computer_call_output",
"call_id": call_id,
"output": {
"type": "input_image",
"image_url": f"data:image/png;base64,{screenshot_b64}",
},
},
]
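            # The message/computer_call/computer_call_output triple above makes the
            # synthetic screenshot read like a normal agent turn in the transcript.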
last_image_b64 = screenshot_b64
if _on_screenshot:
await _on_screenshot(screenshot_b64)
# If we have a last screenshot, run Moondream detection and labeling
detected_names: List[str] = []
if last_image_b64 is not None:
base_img = _decode_image_b64(last_image_b64)
model_md = get_moondream_model()
annotated_b64, detected_names = _annotate_detect_and_label_ui(base_img, model_md)
if _on_screenshot:
await _on_screenshot(annotated_b64, "annotated_form_ui")
# Also push a user message listing all detected names
if detected_names:
names_text = "\n".join(f"- {n}" for n in detected_names)
pre_output_items.append(
{
"type": "message",
"role": "user",
"content": [
{"type": "input_text", "text": "Detected form UI elements on screen:"},
{"type": "input_text", "text": names_text},
{
"type": "input_text",
"text": "Please continue with the next action needed to perform your task.",
},
],
}
)
tool_schemas = []
for schema in tools or []:
if schema.get("type") == "computer":
tool_schemas.append(GROUNDED_COMPUTER_TOOL_SCHEMA)
else:
tool_schemas.append(schema)
# Step 1: Convert computer calls from xy to descriptions
input_messages = messages + pre_output_items
messages_with_descriptions = convert_computer_calls_xy2desc(input_messages, self.desc2xy)
# Step 2: Convert responses items to completion messages
completion_messages = convert_responses_items_to_completion_messages(
messages_with_descriptions,
allow_images_in_tool_results=False,
)
# Optionally filter images if model lacks vision
if not _supports_vision(thinking_model):
completion_messages = _filter_images_from_completion_messages(completion_messages)
# Step 3: Call thinking model with litellm.acompletion
api_kwargs = {
"model": thinking_model,
"messages": completion_messages,
"tools": tool_schemas,
"max_retries": max_retries,
"stream": stream,
**kwargs,
}
if use_prompt_caching:
api_kwargs["use_prompt_caching"] = use_prompt_caching
if _on_api_start:
await _on_api_start(api_kwargs)
response = await litellm.acompletion(**api_kwargs)
if _on_api_end:
await _on_api_end(api_kwargs, response)
usage = {
**response.usage.model_dump(), # type: ignore
"response_cost": response._hidden_params.get("response_cost", 0.0),
}
if _on_usage:
await _on_usage(usage)
# Step 4: Convert completion messages back to responses items format
response_dict = response.model_dump() # type: ignore
choice_messages = [choice["message"] for choice in response_dict["choices"]]
thinking_output_items: List[Dict[str, Any]] = []
for choice_message in choice_messages:
thinking_output_items.extend(
convert_completion_messages_to_responses_items([choice_message])
)
# Step 5: Use Moondream to get coordinates for each description
element_descriptions = get_all_element_descriptions(thinking_output_items)
if element_descriptions and last_image_b64:
for desc in element_descriptions:
for _ in range(3): # try 3 times
coords = await self.predict_click(
model=model,
image_b64=last_image_b64,
instruction=desc,
)
if coords:
self.desc2xy[desc] = coords
break
# Step 6: Convert computer calls from descriptions back to xy coordinates
final_output_items = convert_computer_calls_desc2xy(thinking_output_items, self.desc2xy)
# Step 7: Return output and usage
return {"output": pre_output_items + final_output_items, "usage": usage}
async def predict_click(
self,
model: str,
image_b64: str,
instruction: str,
**kwargs,
) -> Optional[Tuple[float, float]]:
"""Predict click coordinates using Moondream3's point API.
Returns pixel coordinates (x, y) as floats.
"""
img = _decode_image_b64(image_b64)
W, H = img.width, img.height
model_md = get_moondream_model()
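        # point() is assumed to return normalized [0, 1] coordinates; they are
        # scaled by the image dimensions and clamped to pixel bounds below.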
try:
result = model_md.point(img, instruction, settings={"max_objects": 1})
except Exception:
return None
try:
pt = (result or {}).get("points", [])[0]
x_norm = float(pt.get("x", 0.0))
y_norm = float(pt.get("y", 0.0))
x_px = max(0.0, min(float(W - 1), x_norm * W))
y_px = max(0.0, min(float(H - 1), y_norm * H))
return (x_px, y_px)
except Exception:
return None
def get_capabilities(self) -> List[AgentCapability]:
return ["click", "step"]
```
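As a quick way to exercise the grounding half in isolation, `predict_click` can be called directly on a saved screenshot. A sketch, assuming the module is importable as `agent.loops.moondream3` and a CUDA-capable torch/transformers install is available (see `get_moondream_model`); the file path and instruction are illustrative:
```python
# Hedged sketch: drive only the Moondream3 grounding path defined above.
import asyncio
import base64

from agent.loops.moondream3 import Moondream3PlusConfig

async def main() -> None:
    with open("screenshot.png", "rb") as f:
        image_b64 = base64.b64encode(f.read()).decode("utf-8")
    loop = Moondream3PlusConfig()
    coords = await loop.predict_click(
        model="moondream3+gpt-4o",  # the thinking half is not used by predict_click
        image_b64=image_b64,
        instruction="the Save button",
    )
    print(coords)  # pixel (x, y) floats, or None if pointing failed

asyncio.run(main())
```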
--------------------------------------------------------------------------------
/docs/src/app/(home)/[[...slug]]/page.tsx:
--------------------------------------------------------------------------------
```typescript
import { getApiVersions, source } from '@/lib/source';
import { getMDXComponents } from '@/mdx-components';
import { buttonVariants } from 'fumadocs-ui/components/ui/button';
import { Popover, PopoverContent, PopoverTrigger } from 'fumadocs-ui/components/ui/popover';
import { createRelativeLink } from 'fumadocs-ui/mdx';
import { DocsBody, DocsDescription, DocsPage, DocsTitle } from 'fumadocs-ui/page';
import { cn } from 'fumadocs-ui/utils/cn';
import { ChevronDown, CodeXml, ExternalLink } from 'lucide-react';
import type { Metadata } from 'next';
import Link from 'next/link';
import { notFound } from 'next/navigation';
import { PageFeedback } from '@/components/page-feedback';
import { DocActionsMenu } from '@/components/doc-actions-menu';
export default async function Page(props: { params: Promise<{ slug?: string[] }> }) {
const params = await props.params;
const slug = params.slug || [];
const page = source.getPage(slug);
if (!page) notFound();
// Detect if this is an API reference page: /api/[section] or /api/[section]/[version]
let apiSection: string | null = null;
let apiVersionSlug: string[] = [];
if (slug[0] === 'api' && slug.length >= 2) {
apiSection = slug[1];
if (slug.length > 2) {
apiVersionSlug = slug.slice(2);
}
}
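  // e.g. slug ['api', 'computer', 'v0.3'] (illustrative) ->
  //   apiSection = 'computer', apiVersionSlug = ['v0.3']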
let versionItems: { label: string; slug: string[] }[] = [];
if (apiSection) {
versionItems = await getApiVersions(apiSection);
}
const macos = page.data.macos;
const windows = page.data.windows;
const linux = page.data.linux;
const pypi = page.data.pypi;
const npm = page.data.npm;
const github = page.data.github;
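  // These fields are assumed to come from each page's MDX frontmatter, e.g.
  //   macos: true, pypi: 'cua-computer', npm: '@trycua/computer' (values illustrative)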
const MDXContent = page.data.body;
// Platform icons component
const PlatformIcons = () => {
const hasAnyPlatform = macos || windows || linux;
if (!hasAnyPlatform && !pypi) return null;
return (
<div className="flex flex-col gap-2">
{hasAnyPlatform && (
<div className="flex flex-row gap-2 items-left dark:text-neutral-400">
{windows && (
<svg
xmlns="http://www.w3.org/2000/svg"
fill="currentColor"
className="h-5"
viewBox="0 0 448 512"
>
<title>Windows</title>
<path d="M0 93.7l183.6-25.3v177.4H0V93.7zm0 324.6l183.6 25.3V268.4H0v149.9zm203.8 28L448 480V268.4H203.8v177.9zm0-380.6v180.1H448V32L203.8 65.7z" />
</svg>
)}
{macos && (
<svg
xmlns="http://www.w3.org/2000/svg"
fill="currentColor"
className="h-5"
viewBox="0 0 384 512"
>
<title>macOS</title>
<path d="M318.7 268.7c-.2-36.7 16.4-64.4 50-84.8-18.8-26.9-47.2-41.7-84.7-44.6-35.5-2.8-74.3 20.7-88.5 20.7-15 0-49.4-19.7-76.4-19.7C63.3 141.2 4 184.8 4 273.5q0 39.3 14.4 81.2c12.8 36.7 59 126.7 107.2 125.2 25.2-.6 43-17.9 75.8-17.9 31.8 0 48.3 17.9 76.4 17.9 48.6-.7 90.4-82.5 102.6-119.3-65.2-30.7-61.7-90-61.7-91.9zm-56.6-164.2c27.3-32.4 24.8-61.9 24-72.5-24.1 1.4-52 16.4-67.9 34.9-17.5 19.8-27.8 44.3-25.6 71.9 26.1 2 49.9-11.4 69.5-34.3z" />
</svg>
)}
{linux && (
<svg
xmlns="http://www.w3.org/2000/svg"
fill="currentColor"
className="h-5"
viewBox="0 0 448 512"
>
<title>Linux</title>
<path d="M220.8 123.3c1 .5 1.8 1.7 3 1.7 1.1 0 2.8-.4 2.9-1.5 .2-1.4-1.9-2.3-3.2-2.9-1.7-.7-3.9-1-5.5-.1-.4 .2-.8 .7-.6 1.1 .3 1.3 2.3 1.1 3.4 1.7zm-21.9 1.7c1.2 0 2-1.2 3-1.7 1.1-.6 3.1-.4 3.5-1.6 .2-.4-.2-.9-.6-1.1-1.6-.9-3.8-.6-5.5 .1-1.3 .6-3.4 1.5-3.2 2.9 .1 1 1.8 1.5 2.8 1.4zM420 403.8c-3.6-4-5.3-11.6-7.2-19.7-1.8-8.1-3.9-16.8-10.5-22.4-1.3-1.1-2.6-2.1-4-2.9-1.3-.8-2.7-1.5-4.1-2 9.2-27.3 5.6-54.5-3.7-79.1-11.4-30.1-31.3-56.4-46.5-74.4-17.1-21.5-33.7-41.9-33.4-72C311.1 85.4 315.7 .1 234.8 0 132.4-.2 158 103.4 156.9 135.2c-1.7 23.4-6.4 41.8-22.5 64.7-18.9 22.5-45.5 58.8-58.1 96.7-6 17.9-8.8 36.1-6.2 53.3-6.5 5.8-11.4 14.7-16.6 20.2-4.2 4.3-10.3 5.9-17 8.3s-14 6-18.5 14.5c-2.1 3.9-2.8 8.1-2.8 12.4 0 3.9 .6 7.9 1.2 11.8 1.2 8.1 2.5 15.7 .8 20.8-5.2 14.4-5.9 24.4-2.2 31.7 3.8 7.3 11.4 10.5 20.1 12.3 17.3 3.6 40.8 2.7 59.3 12.5 19.8 10.4 39.9 14.1 55.9 10.4 11.6-2.6 21.1-9.6 25.9-20.2 12.5-.1 26.3-5.4 48.3-6.6 14.9-1.2 33.6 5.3 55.1 4.1 .6 2.3 1.4 4.6 2.5 6.7v.1c8.3 16.7 23.8 24.3 40.3 23 16.6-1.3 34.1-11 48.3-27.9 13.6-16.4 36-23.2 50.9-32.2 7.4-4.5 13.4-10.1 13.9-18.3 .4-8.2-4.4-17.3-15.5-29.7zM223.7 87.3c9.8-22.2 34.2-21.8 44-.4 6.5 14.2 3.6 30.9-4.3 40.4-1.6-.8-5.9-2.6-12.6-4.9 1.1-1.2 3.1-2.7 3.9-4.6 4.8-11.8-.2-27-9.1-27.3-7.3-.5-13.9 10.8-11.8 23-4.1-2-9.4-3.5-13-4.4-1-6.9-.3-14.6 2.9-21.8zM183 75.8c10.1 0 20.8 14.2 19.1 33.5-3.5 1-7.1 2.5-10.2 4.6 1.2-8.9-3.3-20.1-9.6-19.6-8.4 .7-9.8 21.2-1.8 28.1 1 .8 1.9-.2-5.9 5.5-15.6-14.6-10.5-52.1 8.4-52.1zm-13.6 60.7c6.2-4.6 13.6-10 14.1-10.5 4.7-4.4 13.5-14.2 27.9-14.2 7.1 0 15.6 2.3 25.9 8.9 6.3 4.1 11.3 4.4 22.6 9.3 8.4 3.5 13.7 9.7 10.5 18.2-2.6 7.1-11 14.4-22.7 18.1-11.1 3.6-19.8 16-38.2 14.9-3.9-.2-7-1-9.6-2.1-8-3.5-12.2-10.4-20-15-8.6-4.8-13.2-10.4-14.7-15.3-1.4-4.9 0-9 4.2-12.3zm3.3 334c-2.7 35.1-43.9 34.4-75.3 18-29.9-15.8-68.6-6.5-76.5-21.9-2.4-4.7-2.4-12.7 2.6-26.4v-.2c2.4-7.6 .6-16-.6-23.9-1.2-7.8-1.8-15 .9-20 3.5-6.7 8.5-9.1 14.8-11.3 10.3-3.7 11.8-3.4 19.6-9.9 5.5-5.7 9.5-12.9 14.3-18 5.1-5.5 10-8.1 17.7-6.9 8.1 1.2 15.1 6.8 21.9 16l19.6 35.6c9.5 19.9 43.1 48.4 41 68.9zm-1.4-25.9c-4.1-6.6-9.6-13.6-14.4-19.6 7.1 0 14.2-2.2 16.7-8.9 2.3-6.2 0-14.9-7.4-24.9-13.5-18.2-38.3-32.5-38.3-32.5-13.5-8.4-21.1-18.7-24.6-29.9s-3-23.3-.3-35.2c5.2-22.9 18.6-45.2 27.2-59.2 2.3-1.7 .8 3.2-8.7 20.8-8.5 16.1-24.4 53.3-2.6 82.4 .6-20.7 5.5-41.8 13.8-61.5 12-27.4 37.3-74.9 39.3-112.7 1.1 .8 4.6 3.2 6.2 4.1 4.6 2.7 8.1 6.7 12.6 10.3 12.4 10 28.5 9.2 42.4 1.2 6.2-3.5 11.2-7.5 15.9-9 9.9-3.1 17.8-8.6 22.3-15 7.7 30.4 25.7 74.3 37.2 95.7 6.1 11.4 18.3 35.5 23.6 64.6 3.3-.1 7 .4 10.9 1.4 13.8-35.7-11.7-74.2-23.3-84.9-4.7-4.6-4.9-6.6-2.6-6.5 12.6 11.2 29.2 33.7 35.2 59 2.8 11.6 3.3 23.7 .4 35.7 16.4 6.8 35.9 17.9 30.7 34.8-2.2-.1-3.2 0-4.2 0 3.2-10.1-3.9-17.6-22.8-26.1-19.6-8.6-36-8.6-38.3 12.5-12.1 4.2-18.3 14.7-21.4 27.3-2.8 11.2-3.6 24.7-4.4 39.9-.5 7.7-3.6 18-6.8 29-32.1 22.9-76.7 32.9-114.3 7.2zm257.4-11.5c-.9 16.8-41.2 19.9-63.2 46.5-13.2 15.7-29.4 24.4-43.6 25.5s-26.5-4.8-33.7-19.3c-4.7-11.1-2.4-23.1 1.1-36.3 3.7-14.2 9.2-28.8 9.9-40.6 .8-15.2 1.7-28.5 4.2-38.7 2.6-10.3 6.6-17.2 13.7-21.1 .3-.2 .7-.3 1-.5 .8 13.2 7.3 26.6 18.8 29.5 12.6 3.3 30.7-7.5 38.4-16.3 9-.3 15.7-.9 22.6 5.1 9.9 8.5 7.1 30.3 17.1 41.6 10.6 11.6 14 19.5 13.7 24.6zM173.3 148.7c2 1.9 4.7 4.5 8 7.1 6.6 5.2 15.8 10.6 27.3 10.6 11.6 0 22.5-5.9 31.8-10.8 4.9-2.6 10.9-7 14.8-10.4s5.9-6.3 3.1-6.6-2.6 2.6-6 5.1c-4.4 3.2-9.7 7.4-13.9 9.8-7.4 4.2-19.5 10.2-29.9 10.2s-18.7-4.8-24.9-9.7c-3.1-2.5-5.7-5-7.7-6.9-1.5-1.4-1.9-4.6-4.3-4.9-1.4-.1-1.8 3.7 1.7 6.5z" />
</svg>
)}
</div>
)}
<div className="flex flex-row gap-2 items-left">
{pypi && (
<a target="_blank" href={`https://pypi.org/project/${pypi}/`} rel="noreferrer">
<img
src={`https://img.shields.io/pypi/v/${pypi}?color=blue`}
className="h-5"
alt="PyPI"
/>
</a>
)}
{npm && (
<a target="_blank" href={`https://www.npmjs.com/package/${npm}`} rel="noreferrer">
<img
src={`https://img.shields.io/npm/v/${npm}?color=bf4c4b`}
className="h-5"
alt="NPM"
/>
</a>
)}
</div>
</div>
);
};
const tocHeader = () => {
return (
<div className="w-fit">
<PlatformIcons />
<div className="flex gap-2 mt-2">
{github &&
github.length > 0 &&
(github.length === 1 ? (
<a
href={github[0]}
rel="noreferrer noopener"
target="_blank"
className="inline-flex gap-2 w-fit items-center justify-center rounded-md text-sm font-medium transition-colors duration-100 disabled:pointer-events-none disabled:opacity-50 focus-visible:outline-none hover:bg-fd-accent hover:text-fd-accent-foreground p-1.5 [&_svg]:size-5 text-fd-muted-foreground md:[&_svg]:size-4.5"
aria-label="Source"
data-active="false"
>
<svg role="img" viewBox="0 0 24 24" fill="currentColor">
<path d="M12 .297c-6.63 0-12 5.373-12 12 0 5.303 3.438 9.8 8.205 11.385.6.113.82-.258.82-.577 0-.285-.01-1.04-.015-2.04-3.338.724-4.042-1.61-4.042-1.61C4.422 18.07 3.633 17.7 3.633 17.7c-1.087-.744.084-.729.084-.729 1.205.084 1.838 1.236 1.838 1.236 1.07 1.835 2.809 1.305 3.495.998.108-.776.417-1.305.76-1.605-2.665-.3-5.466-1.332-5.466-5.93 0-1.31.465-2.38 1.235-3.22-.135-.303-.54-1.523.105-3.176 0 0 1.005-.322 3.3 1.23.96-.267 1.98-.399 3-.405 1.02.006 2.04.138 3 .405 2.28-1.552 3.285-1.23 3.285-1.23.645 1.653.24 2.873.12 3.176.765.84 1.23 1.91 1.23 3.22 0 4.61-2.805 5.625-5.475 5.92.42.36.81 1.096.81 2.22 0 1.606-.015 2.896-.015 3.286 0 .315.21.69.825.57C20.565 22.092 24 17.592 24 12.297c0-6.627-5.373-12-12-12"></path>
</svg>
Source
<ExternalLink className="w-4 h-4 ml-auto" />
</a>
) : (
<Popover>
<PopoverTrigger className="inline-flex gap-2 w-fit items-center justify-center rounded-md text-sm font-medium transition-colors duration-100 disabled:pointer-events-none disabled:opacity-50 focus-visible:outline-none hover:bg-fd-accent hover:text-fd-accent-foreground p-1.5 [&_svg]:size-5 text-fd-muted-foreground md:[&_svg]:size-4.5">
<svg role="img" viewBox="0 0 24 24" fill="currentColor">
<path d="M12 .297c-6.63 0-12 5.373-12 12 0 5.303 3.438 9.8 8.205 11.385.6.113.82-.258.82-.577 0-.285-.01-1.04-.015-2.04-3.338.724-4.042-1.61-4.042-1.61C4.422 18.07 3.633 17.7 3.633 17.7c-1.087-.744.084-.729.084-.729 1.205.084 1.838 1.236 1.838 1.236 1.07 1.835 2.809 1.305 3.495.998.108-.776.417-1.305.76-1.605-2.665-.3-5.466-1.332-5.466-5.93 0-1.31.465-2.38 1.235-3.22-.135-.303-.54-1.523.105-3.176 0 0 1.005-.322 3.3 1.23.96-.267 1.98-.399 3-.405 1.02.006 2.04.138 3 .405 2.28-1.552 3.285-1.23 3.285-1.23.645 1.653.24 2.873.12 3.176.765.84 1.23 1.91 1.23 3.22 0 4.61-2.805 5.625-5.475 5.92.42.36.81 1.096.81 2.22 0 1.606-.015 2.896-.015 3.286 0 .315.21.69.825.57C20.565 22.092 24 17.592 24 12.297c0-6.627-5.373-12-12-12"></path>
</svg>
Source
<ChevronDown className="h-4 w-4" />
</PopoverTrigger>
<PopoverContent className="w-48 p-1">
<div className="flex flex-col gap-1">
{github.map((link, index) => (
<a
key={index}
href={link}
rel="noreferrer noopener"
target="_blank"
className="inline-flex gap-2 w-full items-center rounded-md p-2 text-sm hover:bg-fd-accent hover:text-fd-accent-foreground"
>
{link.includes('python')
? 'Python'
: link.includes('typescript')
? 'TypeScript'
: `Source ${index + 1}`}
<ExternalLink className="w-4 h-4 ml-auto" />
</a>
))}
</div>
</PopoverContent>
</Popover>
))}
{/*slug.includes('libraries') && (
<a
className="inline-flex gap-2 w-fit items-center justify-center rounded-md text-sm font-medium transition-colors duration-100 disabled:pointer-events-none disabled:opacity-50 focus-visible:outline-none hover:bg-fd-accent hover:text-fd-accent-foreground p-1.5 [&_svg]:size-5 text-fd-muted-foreground md:[&_svg]:size-4.5"
href={`/api/${page.data.title.toLowerCase()}`}>
<CodeXml size={12} />
Reference
</a>
)*/}
</div>
<hr className="my-2 border-t border-fd-border" />
</div>
);
};
const tocFooter = () => {
// Construct file path from slug
// For root index, use 'index.mdx', otherwise join slug parts
const filePath = slug.length === 0 ? 'index.mdx' : `${slug.join('/')}.mdx`;
return (
<div className="mt-4">
<DocActionsMenu pageUrl={page.url} pageTitle={page.data.title} filePath={filePath} />
</div>
);
};
return (
<DocsPage
toc={page.data.toc}
tableOfContent={{ header: tocHeader(), footer: tocFooter() }}
full={page.data.full}
>
<div className="flex flex-row w-full items-start">
<div className="flex-1">
<div className="flex flex-row w-full">
{slug.length > 0 && <DocsTitle>{page.data.title}</DocsTitle>}
<div className="ml-auto flex items-center gap-2">
{apiSection && versionItems.length > 1 && (
<Popover>
<PopoverTrigger
className={cn(
buttonVariants({
color: 'secondary',
size: 'sm',
className: 'gap-2',
})
)}
>
{(() => {
// Find the current version label
let currentLabel = 'Current';
if (apiVersionSlug.length > 0) {
const found = versionItems.find(
(item) => item.label !== 'Current' && apiVersionSlug[0] === item.label
);
if (found) currentLabel = found.label;
}
return (
<>
API Version: {currentLabel}
<ChevronDown className="size-3.5 text-fd-muted-foreground" />
</>
);
})()}
</PopoverTrigger>
<PopoverContent className="flex flex-col overflow-auto">
{versionItems.map((item) => {
// Build the href for each version
const href =
item.label === 'Current'
? `/api/${apiSection}`
: `/api/${apiSection}/${item.label}`;
// Highlight current version
const isCurrent =
(item.label === 'Current' && apiVersionSlug.length === 0) ||
(item.label !== 'Current' && apiVersionSlug[0] === item.label);
return (
<Link
key={item.label}
href={href}
className={cn(
'px-3 py-1 rounded hover:bg-fd-muted',
isCurrent && 'font-bold bg-fd-muted'
)}
>
API version: {item.label}
</Link>
);
})}
</PopoverContent>
</Popover>
)}
</div>
</div>
<DocsDescription className="text-md mt-1">{page.data.description}</DocsDescription>
</div>
</div>
<DocsBody>
<MDXContent
components={getMDXComponents({
// this allows you to link to other pages with relative file paths
a: createRelativeLink(source, page),
})}
/>
<PageFeedback />
</DocsBody>
</DocsPage>
);
}
export async function generateStaticParams() {
return source.generateParams();
}
export async function generateMetadata(props: {
params: Promise<{ slug?: string[] }>;
}): Promise<Metadata> {
const params = await props.params;
const page = source.getPage(params.slug);
if (!page) notFound();
let title = `${page.data.title} | Cua`;
if (page.url.includes('api')) title = `${page.data.title} | Cua API`;
  if (page.url.includes('guide')) title = `Guide: ${page.data.title} | Cua`;
// Canonical URL points to cua.ai to consolidate all SEO authority on main domain
const canonicalUrl = `https://cua.ai${page.url}`;
// Extract keywords from the page for SEO
const keywords = [
'computer use agent',
'computer use',
'AI automation',
'visual automation',
page.data.title,
];
// Structured data for better Google indexing (TechArticle schema)
const structuredData = {
'@context': 'https://schema.org',
'@type': 'TechArticle',
headline: page.data.title,
description: page.data.description,
url: canonicalUrl,
publisher: {
'@type': 'Organization',
name: 'Cua',
url: 'https://cua.ai',
logo: {
'@type': 'ImageObject',
url: 'https://cua.ai/cua_logo_black.svg',
},
},
mainEntityOfPage: {
'@type': 'WebPage',
'@id': canonicalUrl,
},
};
// Breadcrumb schema for better site structure understanding
const breadcrumbSchema = {
'@context': 'https://schema.org',
'@type': 'BreadcrumbList',
itemListElement: [
{
'@type': 'ListItem',
position: 1,
name: 'Cua',
item: 'https://cua.ai',
},
{
'@type': 'ListItem',
position: 2,
name: 'Documentation',
item: 'https://cua.ai/docs',
},
{
'@type': 'ListItem',
position: 3,
name: page.data.title,
item: canonicalUrl,
},
],
};
return {
title,
description: page.data.description,
keywords,
authors: [{ name: 'Cua', url: 'https://cua.ai' }],
robots: {
index: true,
follow: true,
googleBot: {
index: true,
follow: true,
'max-image-preview': 'large',
'max-snippet': -1,
},
},
alternates: {
canonical: canonicalUrl,
},
openGraph: {
title,
description: page.data.description,
type: 'article',
siteName: 'Cua',
url: canonicalUrl,
},
twitter: {
card: 'summary',
title,
description: page.data.description,
creator: '@trycua',
},
other: {
'script:ld+json': JSON.stringify([structuredData, breadcrumbSchema]),
},
};
}
```